twitch-chat-bot/bot.py

943 lines
29 KiB
Python

from __future__ import annotations
import argparse
import asyncio.subprocess
import collections
import contextlib
import datetime
import functools
import hashlib
import json
import os.path
import random
import re
import signal
import struct
import sys
import tempfile
import traceback
from typing import Any
from typing import Callable
from typing import Counter
from typing import Dict
from typing import List
from typing import Mapping
from typing import Match
from typing import NamedTuple
from typing import Optional
from typing import Pattern
from typing import Tuple
import aiohttp
import aiosqlite
import async_lru
import pyjokes
from humanize import naturaldelta
# TODO: allow host / port to be configurable
HOST = 'irc.chat.twitch.tv'
PORT = 6697
MSG_RE = re.compile(
'^@(?P<info>[^ ]+) :(?P<user>[^!]+).* '
'PRIVMSG #(?P<channel>[^ ]+) '
':(?P<msg>[^\r]+)',
)
PRIVMSG = 'PRIVMSG #{channel} : {msg}\r\n'
SEND_MSG_RE = re.compile('^PRIVMSG #[^ ]+ :(?P<msg>[^\r]+)')
class Config(NamedTuple):
username: str
channel: str
oauth_token: str
client_id: str
def __repr__(self) -> str:
return (
f'{type(self).__name__}('
f'username={self.username!r}, '
f'channel={self.channel!r}, '
f'oauth_token={"***"!r}, '
f'client_id={"***"!r}, '
f')'
)
def esc(s: str) -> str:
return s.replace('{', '{{').replace('}', '}}')
def _parse_badge_info(s: str) -> Dict[str, str]:
ret = {}
for part in s.split(';'):
k, v = part.split('=', 1)
ret[k] = v
return ret
def _parse_color(s: str) -> Tuple[int, int, int]:
return int(s[1:3], 16), int(s[3:5], 16), int(s[5:7], 16)
def _badges(badges: str) -> str:
ret = ''
for s, reg in (
('\033[48;2;000;000;000m⚙\033[m', re.compile('^staff/')),
('\033[48;2;000;173;003m⚔\033[m', re.compile('^moderator/')),
('\033[48;2;224;005;185m♦\033[m', re.compile('^vip/')),
('\033[48;2;233;025;022m☞\033[m', re.compile('^broadcaster/')),
('\033[48;2;130;005;180m★\033[m', re.compile('^founder/')),
('\033[48;2;130;005;180m★\033[m', re.compile('^subscriber/')),
('\033[48;2;000;160;214m♕\033[m', re.compile('^premium/')),
('\033[48;2;089;057;154m♕\033[m', re.compile('^turbo/')),
('\033[48;2;230;186;072m◘\033[m', re.compile('^sub-gift-leader/')),
('\033[48;2;088;226;193m◘\033[m', re.compile('^sub-gifter/')),
('\033[48;2;183;125;029m♕\033[m', re.compile('^hype-train/')),
('\033[48;2;203;200;208m▴\033[m', re.compile('^bits/')),
('\033[48;2;230;186;072m♦\033[m', re.compile('^bits-leader/')),
):
for badge in badges.split(','):
if reg.match(badge):
ret += s
return ret
def _is_moderator(match: Match[str]) -> bool:
info = _parse_badge_info(match['info'])
badges = info['badges'].split(',')
return any(badge.startswith('moderator/') for badge in badges)
def _gen_color(name: str) -> Tuple[int, int, int]:
h = hashlib.sha256(name.encode())
n, = struct.unpack('Q', h.digest()[:8])
bits = [int(s) for s in bin(n)[2:]]
r = bits[0] * 0b1111111 + (bits[1] << 7)
g = bits[2] * 0b1111111 + (bits[3] << 7)
b = bits[4] * 0b1111111 + (bits[5] << 7)
return r, g, b
def _optional_user_arg(match: Match[str]) -> str:
_, _, rest = match['msg'].strip().partition(' ')
if rest:
return rest.lstrip('@')
else:
return match['user']
async def send(
writer: asyncio.StreamWriter,
msg: str,
*,
quiet: bool = False,
) -> None:
if not quiet:
print(f'< {msg}', end='', flush=True, file=sys.stderr)
writer.write(msg.encode())
return await writer.drain()
async def recv(
reader: asyncio.StreamReader,
*,
quiet: bool = False,
) -> bytes:
data = await reader.readline()
if not quiet:
sys.stderr.buffer.write(b'> ')
sys.stderr.buffer.write(data)
sys.stderr.flush()
return data
class Response:
async def __call__(self, config: Config) -> Optional[str]:
return None
class CmdResponse(Response):
def __init__(self, cmd: str) -> None:
self.cmd = cmd
async def __call__(self, config: Config) -> Optional[str]:
return self.cmd
class MessageResponse(Response):
def __init__(self, match: Match[str], msg_fmt: str) -> None:
self.match = match
self.msg_fmt = msg_fmt
async def __call__(self, config: Config) -> Optional[str]:
params = self.match.groupdict()
params['msg'] = self.msg_fmt.format(**params)
return PRIVMSG.format(**params)
Callback = Callable[[Match[str]], Response]
HANDLERS: List[Tuple[Pattern[str], Callable[[Match[str]], Response]]]
HANDLERS = []
def handler(
*prefixes: str,
flags: re.RegexFlag = re.U,
) -> Callable[[Callback], Callback]:
def handler_decorator(func: Callback) -> Callback:
for prefix in prefixes:
HANDLERS.append((re.compile(prefix + '\r\n$', flags=flags), func))
return func
return handler_decorator
def handle_message(
*message_prefixes: str,
flags: re.RegexFlag = re.U,
) -> Callable[[Callback], Callback]:
return handler(
*(
f'^@(?P<info>[^ ]+) :(?P<user>[^!]+).* '
f'PRIVMSG #(?P<channel>[^ ]+) '
f':(?P<msg>{message_prefix}.*)'
for message_prefix in message_prefixes
), flags=flags,
)
_POINTS_HANDLERS = {}
def channel_points_handler(reward_id: str) -> Callable[[Callback], Callback]:
def channel_points_handler_decorator(func: Callback) -> Callback:
_POINTS_HANDLERS[reward_id] = func
return func
return channel_points_handler_decorator
@handler('^PING (.*)')
def pong(match: Match[str]) -> Response:
return CmdResponse(f'PONG {match.group(1)}\r\n')
_TEXT_COMMANDS = (
# these have to be first so it does not get overridden by !keyboard
(
'!keyboard2',
'this is my second mechanical keyboard: '
'https://i.fluffy.cc/CDtRzWX1JZTbqzKswHrZsF7HPX2zfLL1.png',
),
(
'!keyboard3',
'this is my stream deck keyboard (cherry mx black silent):'
'https://keeb.io/products/bdn9-3x3-9-key-macropad-rotary-encoder-support ' # noqa: E501
'here is more info: https://www.youtube.com/watch?v=p2TyRIAxR48',
),
(
'!keyboard4',
'this is my streaming keyboard (cherry mx clears) '
'(contributed by PhillipWei): https://amzn.to/3jzmwh3',
),
# the rest of these are sorted by command
(
'!bot',
'I wrote the bot! https://github.com/anthonywritescode/twitch-chat-bot', # noqa: E501
),
(
'!discord',
'We do have Discord, you are welcome to join: '
'https://discord.gg/xDKGPaW',
),
(
'!donate',
"donations are appreciated but not necessary -- if you'd like to "
'donate, you can donate at https://streamlabs.com/anthonywritescode',
),
(
'!editor',
'this is my text editor I made, called babi! '
'https://github.com/asottile/babi '
'more info in this video: https://www.youtube.com/watch?v=WyR1hAGmR3g',
),
('!emoji', 'anthon63DumpsterFire anthon63Pythonk anthon63HelloHello'),
(
'!explain',
'https://www.youtube.com/playlist?list=PLWBKAf81pmOaP9naRiNAqug6EBnkPakvY', # noqa: E501
),
(
'!faq',
'https://www.youtube.com/playlist?list=PLWBKAf81pmOZEPeIV2_pIESK5hRMAo1hR', # noqa: E501
),
(
'!github',
"anthony's github is https://github.com/asottile -- stream github is "
'https://github.com/anthonywritescode',
),
('!homeland', 'WE WILL PROTECT OUR HOMELAND!'),
(
'!keyboard',
'kinesis freestyle pro (cherry mx reds) https://amzn.to/3jyN4PC',
),
('!levelup', 'https://i.imgur.com/Uoq5vGx.gif'),
('!lurk', 'thanks for lurking, {user}!'),
('!ohai', 'ohai, {user}!'),
('!playlist', 'HearWeGo: https://www.youtube.com/playlist?list=PL44UysF4ZQ23B_ITIqM8Fqt1UXgsA9yD6'), # noqa: E501
('!twitter', 'https://twitter.com/codewithanthony'),
('!water', 'DRINK WATER, BITCH'),
('!youtube', 'https://youtube.com/anthonywritescode'),
)
def _generic_msg_handler(match: Match[str], *, msg: str) -> Response:
return MessageResponse(match, msg)
for _cmd, _msg in _TEXT_COMMANDS:
handle_message(_cmd)(functools.partial(_generic_msg_handler, msg=_msg))
@handle_message('!still')
def cmd_still(match: Match[str]) -> Response:
_, _, rest = match['msg'].partition(' ')
year = datetime.date.today().year
lol = random.choice(['LOL', 'LOLW', 'LMAO', 'NUUU'])
return MessageResponse(match, f'{esc(rest)}, in {year} - {lol}!')
@handle_message('!bonk', flags=re.IGNORECASE)
def cmd_bonk(match: Match[str]) -> Response:
_, _, rest = match['msg'].partition(' ')
rest = rest.strip() or 'Makayla_Fox'
return MessageResponse(
match,
f'{esc(rest)}: '
f'https://i.fluffy.cc/DM4QqzjR7wCpkGPwTl6zr907X50XgtBL.png',
)
async def ensure_today_table_exists(db: aiosqlite.Connection) -> None:
await db.execute(
'CREATE TABLE IF NOT EXISTS today ('
' msg TEXT NOT NULL,'
' timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
')',
)
await db.commit()
async def set_today(db: aiosqlite.Connection, msg: str) -> None:
await ensure_today_table_exists(db)
await db.execute('INSERT INTO today (msg) VALUES (?)', (msg,))
await db.commit()
async def get_today(db: aiosqlite.Connection) -> str:
await ensure_today_table_exists(db)
query = 'SELECT msg FROM today ORDER BY ROWID DESC LIMIT 1'
async with db.execute(query) as cursor:
row = await cursor.fetchone()
if row is None:
return 'not working on anything?'
else:
return esc(row[0])
class TodayResponse(MessageResponse):
def __init__(self, match: Match[str]) -> None:
super().__init__(match, '')
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
self.msg_fmt = await get_today(db)
return await super().__call__(config)
@handle_message('!today', '!project')
def cmd_today(match: Match[str]) -> Response:
return TodayResponse(match)
class SetTodayResponse(MessageResponse):
def __init__(self, match: Match[str], msg: str) -> None:
super().__init__(match, 'updated!')
self.msg = msg
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
await set_today(db, self.msg)
return await super().__call__(config)
@handle_message('!settoday')
def cmd_settoday(match: Match[str]) -> Response:
if not _is_moderator(match) and match['user'] != match['channel']:
return MessageResponse(
match, 'https://www.youtube.com/watch?v=RfiQYRn7fBg',
)
_, _, rest = match['msg'].partition(' ')
return SetTodayResponse(match, rest)
async def ensure_motd_table_exists(db: aiosqlite.Connection) -> None:
await db.execute(
'CREATE TABLE IF NOT EXISTS motd ('
' user TEXT NOT NULL,'
' msg TEXT NOT NULL,'
' points INT NOT NULL,'
' timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
')',
)
await db.commit()
async def set_motd(db: aiosqlite.Connection, user: str, msg: str) -> None:
await ensure_motd_table_exists(db)
query = 'INSERT INTO motd (user, msg, points) VALUES (?, ?, ?)'
await db.execute(query, (user, msg, 250))
await db.commit()
async def get_motd(db: aiosqlite.Connect) -> str:
await ensure_motd_table_exists(db)
query = 'SELECT msg FROM motd ORDER BY ROWID DESC LIMIT 1'
async with db.execute(query) as cursor:
row = await cursor.fetchone()
if row is None:
return 'nothing???'
else:
return esc(row[0])
class SetMotdResponse(MessageResponse):
def __init__(self, match: Match[str]) -> None:
super().__init__(match, 'motd updated! thanks for spending points!')
self.user = match['user']
self.msg = match['msg']
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
await set_motd(db, self.user, self.msg)
return await super().__call__(config)
@channel_points_handler('a2fa47a2-851e-40db-b909-df001801cade')
def cmd_set_motd(match: Match[str]) -> Response:
return SetMotdResponse(match)
class MotdResponse(MessageResponse):
def __init__(self, match: Match[str]) -> None:
super().__init__(match, '')
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
self.msg_fmt = await get_motd(db)
return await super().__call__(config)
@handle_message('!motd')
def cmd_motd(match: Match[str]) -> Response:
return MotdResponse(match)
async def check_call(*cmd: str) -> None:
proc = await asyncio.subprocess.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.DEVNULL,
)
await proc.communicate()
if proc.returncode != 0:
raise ValueError(cmd, proc.returncode)
class VideoIdeaResponse(MessageResponse):
def __init__(self, match: Match[str], videoidea: str) -> None:
super().__init__(
match,
'added! https://github.com/asottile/scratch/wiki/anthony-explains-ideas', # noqa: E501
)
self.videoidea = videoidea
async def __call__(self, config: Config) -> Optional[str]:
async def _git(*cmd: str) -> None:
await check_call('git', '-C', tmpdir, *cmd)
with tempfile.TemporaryDirectory() as tmpdir:
await _git(
'clone', '--depth=1', '--quiet',
'git@github.com:asottile/scratch.wiki', '.',
)
ideas_file = os.path.join(tmpdir, 'anthony-explains-ideas.md')
with open(ideas_file, 'rb+') as f:
f.seek(-1, os.SEEK_END)
c = f.read()
if c != b'\n':
f.write(b'\n')
f.write(f'- {self.videoidea}\n'.encode())
await _git('add', '.')
await _git('commit', '-q', '-m', 'idea added by !videoidea')
await _git('push', '-q', 'origin', 'HEAD')
return await super().__call__(config)
@handle_message('![wv]ideoidea')
def cmd_videoidea(match: Match[str]) -> Response:
if not _is_moderator(match) and match['user'] != match['channel']:
return MessageResponse(
match, 'https://www.youtube.com/watch?v=RfiQYRn7fBg',
)
_, _, rest = match['msg'].partition(' ')
return VideoIdeaResponse(match, rest)
class UptimeResponse(Response):
async def __call__(self, config: Config) -> Optional[str]:
url = f'https://api.twitch.tv/helix/streams?user_login={config.channel}' # noqa: E501
headers = {
'Authorization': f'Bearer {config.oauth_token.split(":")[1]}',
'Client-ID': config.client_id,
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
text = await response.text()
data = json.loads(text)['data']
if not data:
msg = 'not currently streaming!'
return PRIVMSG.format(channel=config.channel, msg=msg)
start_time_s = data[0]['started_at']
start_time = datetime.datetime.strptime(
start_time_s, '%Y-%m-%dT%H:%M:%SZ',
)
elapsed = (datetime.datetime.utcnow() - start_time).seconds
parts = []
for n, unit in (
(60 * 60, 'hours'),
(60, 'minutes'),
(1, 'seconds'),
):
if elapsed // n:
parts.append(f'{elapsed // n} {unit}')
elapsed %= n
msg = f'streaming for: {", ".join(parts)}'
return PRIVMSG.format(channel=config.channel, msg=msg)
@handle_message('!uptime')
def cmd_uptime(match: Match[str]) -> Response:
return UptimeResponse()
@async_lru.alru_cache(maxsize=32)
async def fetch_twitch_user(
user: str,
*,
oauth_token: str,
client_id: str
) -> Optional[List[Dict[str, Any]]]:
url = 'https://api.twitch.tv/helix/users'
params = [('login', user)]
headers = {
'Authorization': f'Bearer {oauth_token}',
'Client-ID': client_id,
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, headers=headers) as resp:
json_resp = await resp.json()
return json_resp.get('data')
async def fetch_twitch_user_follows(
*,
from_id: int,
to_id: int,
oauth_token: str,
client_id: str
) -> Optional[List[Dict[str, Any]]]:
url = 'https://api.twitch.tv/helix/users/follows'
params = [('from_id', from_id), ('to_id', to_id)]
headers = {
'Authorization': f'Bearer {oauth_token}',
'Client-ID': client_id,
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, headers=headers) as resp:
json_resp = await resp.json()
return json_resp.get('data')
class FollowageResponse(Response):
def __init__(self, username: str) -> None:
self.username = username
async def __call__(self, config: Config) -> Optional[str]:
token = config.oauth_token.split(':')[1]
fetched_users = await fetch_twitch_user(
config.channel,
oauth_token=token,
client_id=config.client_id,
)
assert fetched_users is not None
me, = fetched_users
fetched_users = await fetch_twitch_user(
self.username,
oauth_token=token,
client_id=config.client_id,
)
if not fetched_users:
msg = f'user {esc(self.username)} not found!'
return PRIVMSG.format(channel=config.channel, msg=msg)
target_user, = fetched_users
# if streamer wants to check the followage to their own channel
if me['id'] == target_user['id']:
msg = (
f"@{esc(target_user['login'])}, you can't check !followage "
f'to your own channel. But I appreciate your curiosity!'
)
return PRIVMSG.format(channel=config.channel, msg=msg)
follow_age_results = await fetch_twitch_user_follows(
from_id=target_user['id'],
to_id=me['id'],
oauth_token=token,
client_id=config.client_id,
)
if not follow_age_results:
msg = f'{esc(target_user["login"])} is not a follower!'
return PRIVMSG.format(channel=config.channel, msg=msg)
follow_age, = follow_age_results
now = datetime.datetime.utcnow()
date_of_follow = datetime.datetime.fromisoformat(
# twitch sends ISO date string with "Z" at the end,
# which python's fromisoformat method does not like
follow_age['followed_at'].rstrip('Z'),
)
delta = now - date_of_follow
msg = (
f'{esc(follow_age["from_name"])} has been following for '
f'{esc(naturaldelta(delta))}!'
)
return PRIVMSG.format(channel=config.channel, msg=msg)
# !followage -> valid, checks the caller
# !followage anthonywritescode -> valid, checks the user passed in payload
# !followage foo bar -> still valid, however the whole
# "foo bar" will be processed as a username
@handle_message('!followage')
def cmd_followage(match: Match[str]) -> Response:
return FollowageResponse(_optional_user_arg(match))
@handle_message(r'!pep[ ]?(?P<pep_num>\d{1,4})')
def cmd_pep(match: Match[str]) -> Response:
n = str(int(match['pep_num'])).zfill(4)
return MessageResponse(match, f'https://www.python.org/dev/peps/pep-{n}/')
@handle_message('!joke')
def cmd_joke(match: Match[str]) -> Response:
return MessageResponse(match, esc(pyjokes.get_joke()))
@handle_message('!so (?P<user_channel>.+)')
def cmd_shoutout(match: Match[str]) -> Response:
if not _is_moderator(match) and match['user'] != match['channel']:
return MessageResponse(
match, 'https://www.youtube.com/watch?v=RfiQYRn7fBg',
)
user = match['user_channel']
return MessageResponse(
match,
f'you should check out https://twitch.tv/{esc(user)} !',
)
CHAT_LOG_RE = re.compile(
r'^\[[^]]+\][^<*]*(<(?P<chat_user>[^>]+)>|\* (?P<action_user>[^ ]+))',
)
@functools.lru_cache(maxsize=None)
def _counts_per_file(filename: str) -> Mapping[str, int]:
counts: Counter[str] = collections.Counter()
with open(filename) as f:
for line in f:
match = CHAT_LOG_RE.match(line)
assert match, line
user = match['chat_user'] or match['action_user']
assert user, line
counts[user.lower()] += 1
return counts
def _chat_rank_counts() -> Counter[str]:
total: Counter[str] = collections.Counter()
for filename in os.listdir('logs'):
full_filename = os.path.join('logs', filename)
if filename != f'{datetime.date.today()}.log':
total.update(_counts_per_file(full_filename))
else:
# don't use the cached version for today's logs
total.update(_counts_per_file.__wrapped__(full_filename))
return total
def chat_rank(username: str) -> Optional[Tuple[int, int]]:
total = _chat_rank_counts()
username = username.lower()
for i, (candidate, count) in enumerate(total.most_common(), start=1):
if candidate == username:
return i, count
else:
return None
@handle_message('!chatrank')
def cmd_chatrank(match: Match[str]) -> Response:
user = _optional_user_arg(match)
ret = chat_rank(user)
if ret is None:
return MessageResponse(match, f'user not found {esc(user)}')
else:
rank, n = ret
logs_start = min(os.listdir('logs'))
logs_start, _, _ = logs_start.partition('.')
return MessageResponse(
match,
f'{esc(user)} is ranked #{rank} with {n} messages '
f'(since {logs_start})',
)
@handle_message('!top10chat')
def cmd_top_10_chat(match: Match[str]) -> Response:
total = _chat_rank_counts()
return MessageResponse(
match,
', '.join(
f'{rank}. {user}({n})'
for rank, (user, n) in enumerate(total.most_common(10), start=1)
),
)
COMMAND_RE = re.compile(r'!\w+')
SECRET_CMDS = frozenset(('!settoday', '!so'))
@handle_message(r'!\w')
def cmd_help(match: Match[str]) -> Response:
possible = [COMMAND_RE.search(reg.pattern) for reg, _ in HANDLERS]
possible_cmds = {match[0] for match in possible if match} - SECRET_CMDS
commands = ['!help'] + sorted(possible_cmds)
msg = f'possible commands: {", ".join(commands)}'
if not match['msg'].startswith('!help'):
msg = f'unknown command ({esc(match["msg"].split()[0])}), {msg}'
return MessageResponse(match, msg)
@handle_message('PING')
def msg_ping(match: Match[str]) -> Response:
_, _, rest = match['msg'].partition(' ')
return MessageResponse(match, f'PONG {esc(rest)}')
@handle_message(
r'.*\b(?P<word>nano|linux|windows|emacs)\b', flags=re.IGNORECASE,
)
def msg_gnu_please(match: Match[str]) -> Response:
if random.randrange(0, 100) < 90:
return Response()
msg, word = match['msg'], match['word']
query = re.search(f'gnu[/+]{word}', msg, flags=re.IGNORECASE)
if query:
return MessageResponse(match, f'YES! {query[0]}')
else:
return MessageResponse(match, f"Um please, it's GNU+{esc(word)}!")
@handle_message(r'.*\bth[oi]nk(?:ing)?\b', flags=re.IGNORECASE)
def msg_think(match: Match[str]) -> Response:
return MessageResponse(match, 'anthon63Pythonk ' * 5)
# TODO: !tags, only allowed by stream admin / mods????
def dt_str() -> str:
dt_now = datetime.datetime.now()
return f'[{dt_now.hour:02}:{dt_now.minute:02}]'
def _shutdown(
writer: asyncio.StreamWriter,
loop: asyncio.AbstractEventLoop,
shutdown_task: Optional[asyncio.Task[Any]] = None,
) -> None:
print('bye!')
ignored_tasks = set()
if shutdown_task is not None:
ignored_tasks.add(shutdown_task)
if writer:
writer.close()
closing_task = loop.create_task(writer.wait_closed())
def cancel_tasks(fut: asyncio.Future[Any]) -> None:
tasks = [t for t in asyncio.all_tasks() if t not in ignored_tasks]
for task in tasks:
task.cancel()
closing_task.add_done_callback(cancel_tasks)
UNCOLOR_RE = re.compile(r'\033\[[^m]*m')
class LogWriter:
def __init__(self) -> None:
self.date = str(datetime.date.today())
def write_message(self, msg: str) -> None:
print(msg)
uncolored_msg = UNCOLOR_RE.sub('', msg)
os.makedirs('logs', exist_ok=True)
with open(os.path.join('logs', f'{self.date}.log'), 'a+') as f:
f.write(f'{uncolored_msg}\n')
async def handle_response(
config: Config,
match: Match[str],
handler: Callable[[Match[str]], Response],
writer: asyncio.StreamWriter,
log_writer: LogWriter,
*,
quiet: bool,
) -> None:
try:
res = await handler(match)(config)
except Exception as e:
traceback.print_exc()
res = PRIVMSG.format(
channel=config.channel,
msg=f'*** unhandled {type(e).__name__} -- see logs',
)
if res is not None:
send_match = SEND_MSG_RE.match(res)
if send_match:
color = '\033[1m\033[3m\033[38;5;21m'
log_writer.write_message(
f'{dt_str()}'
f'<{color}{config.username}\033[m> '
f'{send_match[1]}',
)
await send(writer, res, quiet=quiet)
async def amain(config: Config, *, quiet: bool) -> None:
log_writer = LogWriter()
reader, writer = await asyncio.open_connection(HOST, PORT, ssl=True)
loop = asyncio.get_event_loop()
shutdown_cb = functools.partial(_shutdown, writer, loop)
try:
loop.add_signal_handler(signal.SIGINT, shutdown_cb)
except NotImplementedError:
# Doh... Windows...
signal.signal(signal.SIGINT, lambda *_: shutdown_cb())
await send(writer, f'PASS {config.oauth_token}\r\n', quiet=True)
await send(writer, f'NICK {config.username}\r\n', quiet=quiet)
await send(writer, f'JOIN #{config.channel}\r\n', quiet=quiet)
await send(writer, 'CAP REQ :twitch.tv/tags\r\n', quiet=quiet)
while not writer.is_closing():
data = await recv(reader, quiet=quiet)
if not data:
return
msg = data.decode('UTF-8', errors='backslashreplace')
msg_match = MSG_RE.match(msg)
if msg_match:
info = _parse_badge_info(msg_match['info'])
if info['color']:
r, g, b = _parse_color(info['color'])
else:
r, g, b = _gen_color(info['display-name'])
color_start = f'\033[1m\033[38;2;{r};{g};{b}m'
if msg_match['msg'].startswith('\x01ACTION '):
log_writer.write_message(
f'{dt_str()}'
f'{_badges(info["badges"])}'
f'{color_start}\033[3m * {info["display-name"]}\033[22m '
f'{msg_match["msg"][8:-1]}\033[m',
)
else:
if info.get('msg-id') == 'highlighted-message':
msg_s = f'\033[48;2;117;094;188m{msg_match["msg"]}\033[m'
elif 'custom-reward-id' in info:
msg_s = f'\033[48;2;029;091;130m{msg_match["msg"]}\033[m'
else:
msg_s = msg_match['msg']
log_writer.write_message(
f'{dt_str()}'
f'{_badges(info["badges"])}'
f'<{color_start}{info["display-name"]}\033[m> '
f'{msg_s}',
)
if 'custom-reward-id' in info:
if info['custom-reward-id'] in _POINTS_HANDLERS:
handler = _POINTS_HANDLERS[info['custom-reward-id']]
coro = handle_response(
config, msg_match, handler, writer, log_writer,
quiet=quiet,
)
loop.create_task(coro)
elif not quiet:
print(f'UNHANDLED reward({info["custom-reward-id"]})')
continue
for pattern, handler in HANDLERS:
match = pattern.match(msg)
if match:
coro = handle_response(
config, match, handler, writer, log_writer, quiet=quiet,
)
loop.create_task(coro)
break
else:
if not quiet:
print(f'UNHANDLED: {msg}', end='')
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.json')
parser.add_argument('--verbose', action='store_true')
args = parser.parse_args()
with open(args.config) as f:
config = Config(**json.load(f))
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(amain(config, quiet=not args.verbose))
return 0
if __name__ == '__main__':
exit(main())