diff --git a/bot.py b/bot.py index 587b8a3..86564e8 100644 --- a/bot.py +++ b/bot.py @@ -2,6 +2,7 @@ from __future__ import annotations import argparse import asyncio.subprocess +import collections import contextlib import datetime import functools @@ -17,8 +18,10 @@ import tempfile import traceback from typing import Any from typing import Callable +from typing import Counter from typing import Dict from typing import List +from typing import Mapping from typing import Match from typing import NamedTuple from typing import Optional @@ -117,6 +120,14 @@ def _gen_color(name: str) -> Tuple[int, int, int]: return r, g, b +def _optional_user_arg(match: Match[str]) -> str: + _, _, rest = match['msg'].strip().partition(' ') + if rest: + return rest.lstrip('@') + else: + return match['user'] + + async def send( writer: asyncio.StreamWriter, msg: str, @@ -622,16 +633,9 @@ class FollowageResponse(Response): # !followage anthonywritescode -> valid, checks the user passed in payload # !followage foo bar -> still valid, however the whole # "foo bar" will be processed as a username -@handle_message(r'!followage(?P .*)?') +@handle_message('!followage') def cmd_followage(match: Match[str]) -> Response: - user = match['user'] - # "" is a default value if group is missing - groupdict = match.groupdict('') - payload = groupdict['payload'].strip() - if payload: - user = payload.lstrip('@') - - return FollowageResponse(user) + return FollowageResponse(_optional_user_arg(match)) @handle_message(r'!pep[ ]?(?P\d{1,4})') @@ -658,6 +662,76 @@ def cmd_shoutout(match: Match[str]) -> Response: ) +CHAT_LOG_RE = re.compile( + r'^\[[^]]+\][^<*]*(<(?P[^>]+)>|\* (?P[^ ]+))', +) + + +@functools.lru_cache(maxsize=None) +def _counts_per_file(filename: str) -> Mapping[str, int]: + counts: Counter[str] = collections.Counter() + with open(filename) as f: + for line in f: + match = CHAT_LOG_RE.match(line) + assert match, line + user = match['chat_user'] or match['action_user'] + assert user, line + counts[user.lower()] += 1 + return counts + + +def _chat_rank_counts() -> Counter[str]: + total: Counter[str] = collections.Counter() + for filename in os.listdir('logs'): + full_filename = os.path.join('logs', filename) + if filename != f'{datetime.date.today()}.log': + total.update(_counts_per_file(full_filename)) + else: + # don't use the cached version for today's logs + total.update(_counts_per_file.__wrapped__(full_filename)) + return total + + +def chat_rank(username: str) -> Optional[Tuple[int, int]]: + total = _chat_rank_counts() + + username = username.lower() + for i, (candidate, count) in enumerate(total.most_common(), start=1): + if candidate == username: + return i, count + else: + return None + + +@handle_message('!chatrank') +def cmd_chatrank(match: Match[str]) -> Response: + user = _optional_user_arg(match) + ret = chat_rank(user) + if ret is None: + return MessageResponse(match, f'user not found {esc(user)}') + else: + rank, n = ret + logs_start = min(os.listdir('logs')) + logs_start, _, _ = logs_start.partition('.') + return MessageResponse( + match, + f'{esc(user)} is ranked #{rank} with {n} messages ' + f'(since {logs_start})', + ) + + +@handle_message('!top10chat') +def cmd_top_10_chat(match: Match[str]) -> Response: + total = _chat_rank_counts() + return MessageResponse( + match, + ', '.join( + f'{rank}. {user}({n})' + for rank, (user, n) in enumerate(total.most_common(10), start=1) + ), + ) + + COMMAND_RE = re.compile(r'!\w+') SECRET_CMDS = frozenset(('!settoday', '!so'))