Add !chatrank and !top10chat commands

This commit is contained in:
Anthony Sottile 2020-07-18 15:08:20 -07:00
parent 931fd8e9c2
commit f360250839
1 changed files with 83 additions and 9 deletions

92
bot.py
View File

@ -2,6 +2,7 @@ from __future__ import annotations
import argparse
import asyncio.subprocess
import collections
import contextlib
import datetime
import functools
@ -17,8 +18,10 @@ import tempfile
import traceback
from typing import Any
from typing import Callable
from typing import Counter
from typing import Dict
from typing import List
from typing import Mapping
from typing import Match
from typing import NamedTuple
from typing import Optional
@ -117,6 +120,14 @@ def _gen_color(name: str) -> Tuple[int, int, int]:
return r, g, b
def _optional_user_arg(match: Match[str]) -> str:
_, _, rest = match['msg'].strip().partition(' ')
if rest:
return rest.lstrip('@')
else:
return match['user']
async def send(
writer: asyncio.StreamWriter,
msg: str,
@ -622,16 +633,9 @@ class FollowageResponse(Response):
# !followage anthonywritescode -> valid, checks the user passed in payload
# !followage foo bar -> still valid, however the whole
# "foo bar" will be processed as a username
@handle_message(r'!followage(?P<payload> .*)?')
@handle_message('!followage')
def cmd_followage(match: Match[str]) -> Response:
user = match['user']
# "" is a default value if group is missing
groupdict = match.groupdict('')
payload = groupdict['payload'].strip()
if payload:
user = payload.lstrip('@')
return FollowageResponse(user)
return FollowageResponse(_optional_user_arg(match))
@handle_message(r'!pep[ ]?(?P<pep_num>\d{1,4})')
@ -658,6 +662,76 @@ def cmd_shoutout(match: Match[str]) -> Response:
)
CHAT_LOG_RE = re.compile(
r'^\[[^]]+\][^<*]*(<(?P<chat_user>[^>]+)>|\* (?P<action_user>[^ ]+))',
)
@functools.lru_cache(maxsize=None)
def _counts_per_file(filename: str) -> Mapping[str, int]:
counts: Counter[str] = collections.Counter()
with open(filename) as f:
for line in f:
match = CHAT_LOG_RE.match(line)
assert match, line
user = match['chat_user'] or match['action_user']
assert user, line
counts[user.lower()] += 1
return counts
def _chat_rank_counts() -> Counter[str]:
total: Counter[str] = collections.Counter()
for filename in os.listdir('logs'):
full_filename = os.path.join('logs', filename)
if filename != f'{datetime.date.today()}.log':
total.update(_counts_per_file(full_filename))
else:
# don't use the cached version for today's logs
total.update(_counts_per_file.__wrapped__(full_filename))
return total
def chat_rank(username: str) -> Optional[Tuple[int, int]]:
total = _chat_rank_counts()
username = username.lower()
for i, (candidate, count) in enumerate(total.most_common(), start=1):
if candidate == username:
return i, count
else:
return None
@handle_message('!chatrank')
def cmd_chatrank(match: Match[str]) -> Response:
user = _optional_user_arg(match)
ret = chat_rank(user)
if ret is None:
return MessageResponse(match, f'user not found {esc(user)}')
else:
rank, n = ret
logs_start = min(os.listdir('logs'))
logs_start, _, _ = logs_start.partition('.')
return MessageResponse(
match,
f'{esc(user)} is ranked #{rank} with {n} messages '
f'(since {logs_start})',
)
@handle_message('!top10chat')
def cmd_top_10_chat(match: Match[str]) -> Response:
total = _chat_rank_counts()
return MessageResponse(
match,
', '.join(
f'{rank}. {user}({n})'
for rank, (user, n) in enumerate(total.most_common(10), start=1)
),
)
COMMAND_RE = re.compile(r'!\w+')
SECRET_CMDS = frozenset(('!settoday', '!so'))