twitch-chat-bot/bot.py

542 lines
16 KiB
Python
Raw Normal View History

2019-03-03 01:00:14 +00:00
import argparse
2020-05-04 19:35:44 +00:00
import asyncio.subprocess
2019-03-03 01:00:14 +00:00
import datetime
2020-05-04 19:41:31 +00:00
import hashlib
2019-03-03 01:00:14 +00:00
import json
2020-05-04 19:35:44 +00:00
import os.path
2019-07-20 21:00:08 +00:00
import random
2019-03-03 01:00:14 +00:00
import re
2020-05-04 19:41:31 +00:00
import struct
2019-03-03 01:00:14 +00:00
import sys
2020-05-04 19:35:44 +00:00
import tempfile
2019-03-03 01:00:14 +00:00
import traceback
from typing import Callable
2020-05-04 19:41:31 +00:00
from typing import Dict
2019-03-03 01:00:14 +00:00
from typing import List
from typing import Match
from typing import NamedTuple
from typing import NoReturn
from typing import Optional
from typing import Pattern
from typing import Tuple
import aiohttp
2019-08-11 00:19:15 +00:00
import aiosqlite
2019-03-03 01:00:14 +00:00
# TODO: allow host / port to be configurable
HOST = 'irc.chat.twitch.tv'
PORT = 6697
2020-05-04 19:41:31 +00:00
MSG_RE = re.compile('^@([^ ]+) :([^!]+).* PRIVMSG #[^ ]+ :([^\r]+)')
PRIVMSG = 'PRIVMSG #{channel} :{msg}\r\n'
SEND_MSG_RE = re.compile('^PRIVMSG #[^ ]+ :(?P<msg>[^\r]+)')
2019-03-03 01:00:14 +00:00
class Config(NamedTuple):
username: str
channel: str
oauth_token: str
client_id: str
def __repr__(self) -> str:
return (
f'{type(self).__name__}('
f'username={self.username!r}, '
f'channel={self.channel!r}, '
f'oauth_token={"***"!r}, '
f'client_id={"***"!r}, '
f')'
)
def esc(s: str) -> str:
return s.replace('{', '{{').replace('}', '}}')
2020-05-04 19:41:31 +00:00
def _parse_badge_info(s: str) -> Dict[str, str]:
ret = {}
for part in s.split(';'):
k, v = part.split('=', 1)
ret[k] = v
return ret
def _parse_color(s: str) -> Tuple[int, int, int]:
return int(s[1:3], 16), int(s[3:5], 16), int(s[5:7], 16)
2020-05-04 22:19:50 +00:00
def _badges(badges: str) -> str:
ret = ''
for reg, s in (
(re.compile('^vip/'), '\033[48;2;224;5;185m[♦]\033[m'),
(re.compile('^broadcaster/'), '\033[48;2;233;25;22m[☞]\033[m'),
(re.compile('^subscriber/'), '\033[48;2;130;5;180m[★]\033[m'),
(re.compile('^premium/'), '\033[48;2;0;160;214m[♕]\033[m'),
(re.compile('^sub-gifter/'), '\033[48;2;88;226;193m[◘]\033[m'),
(re.compile('^bits/'), '\033[48;2;203;200;208m[▴]\033[m'),
(re.compile('^bits-leader/'), '\033[48;2;230;186;72m[♦]\033[m'),
):
for badge in badges.split(','):
if reg.match(badge):
ret += s
return ret
2020-05-04 19:41:31 +00:00
def _gen_color(name: str) -> Tuple[int, int, int]:
h = hashlib.sha256(name.encode())
n, = struct.unpack('Q', h.digest()[:8])
bits = [int(s) for s in bin(n)[2:]]
r = bits[0] * 0b1111111 + (bits[1] << 7)
g = bits[2] * 0b1111111 + (bits[3] << 7)
b = bits[4] * 0b1111111 + (bits[5] << 7)
return r, g, b
2019-03-03 01:00:14 +00:00
async def send(
writer: asyncio.StreamWriter,
msg: str,
*,
quiet: bool = False,
) -> None:
if not quiet:
print(f'< {msg}', end='', flush=True, file=sys.stderr)
writer.write(msg.encode())
return await writer.drain()
async def recv(
reader: asyncio.StreamReader,
*,
quiet: bool = False,
) -> bytes:
data = await reader.readline()
2020-05-04 19:28:30 +00:00
if not data:
raise SystemExit('unexpected EOF')
2019-03-03 01:00:14 +00:00
if not quiet:
sys.stderr.buffer.write(b'> ')
sys.stderr.buffer.write(data)
sys.stderr.flush()
return data
class Response:
async def __call__(self, config: Config) -> Optional[str]:
return None
class CmdResponse(Response):
def __init__(self, cmd: str) -> None:
self.cmd = cmd
async def __call__(self, config: Config) -> Optional[str]:
return self.cmd
class MessageResponse(Response):
def __init__(self, match: Match[str], msg_fmt: str) -> None:
self.match = match
self.msg_fmt = msg_fmt
async def __call__(self, config: Config) -> Optional[str]:
params = self.match.groupdict()
params['msg'] = self.msg_fmt.format(**params)
return PRIVMSG.format(**params)
Callback = Callable[[Match[str]], Response]
HANDLERS: List[Tuple[Pattern[str], Callable[[Match[str]], Response]]]
HANDLERS = []
2019-10-19 19:10:47 +00:00
def handler(
*prefixes: str,
flags: re.RegexFlag = re.U,
) -> Callable[[Callback], Callback]:
2019-03-03 01:00:14 +00:00
def handler_decorator(func: Callback) -> Callback:
2019-08-30 04:19:09 +00:00
for prefix in prefixes:
HANDLERS.append((re.compile(prefix + '\r\n$', flags=flags), func))
2019-03-03 01:00:14 +00:00
return func
return handler_decorator
2019-10-19 19:10:47 +00:00
def handle_message(
*message_prefixes: str,
flags: re.RegexFlag = re.U,
) -> Callable[[Callback], Callback]:
2019-03-03 01:00:14 +00:00
return handler(
2019-08-30 04:19:09 +00:00
*(
2020-05-04 19:41:31 +00:00
f'^@(?P<info>[^ ]+) :(?P<user>[^!]+).* '
2019-08-30 04:19:09 +00:00
f'PRIVMSG #(?P<channel>[^ ]+) '
f':(?P<msg>{message_prefix}.*)'
for message_prefix in message_prefixes
), flags=flags,
2019-03-03 01:00:14 +00:00
)
@handler('^PING (.*)')
def pong(match: Match[str]) -> Response:
return CmdResponse(f'PONG {match.group(1)}\r\n')
@handle_message('!ohai')
def cmd_ohai(match: Match[str]) -> Response:
return MessageResponse(match, 'ohai, {user}!')
2019-08-30 03:55:52 +00:00
@handle_message('!lurk')
def cmd_lurk(match: Match[str]) -> Response:
return MessageResponse(match, 'thanks for lurking, {user}!')
2019-06-08 20:26:57 +00:00
@handle_message('!discord')
def cmd_discord(match: Match[str]) -> Response:
2019-06-08 20:51:36 +00:00
return MessageResponse(
match,
'We do have Discord, you are welcome to join: '
'https://discord.gg/HxpQ3px',
)
2019-06-08 20:26:57 +00:00
@handle_message('!homeland')
def cmd_russians(match: Match[str]) -> Response:
return MessageResponse(match, 'WE WILL PROTECT OUR HOMELAND!')
2019-07-20 18:56:17 +00:00
@handle_message('!emoji')
def cmd_emoji(match: Match[str]) -> Response:
2019-08-30 20:43:39 +00:00
return MessageResponse(match, 'anthon63DumpsterFire anthon63Pythonk')
2019-07-20 18:56:17 +00:00
2020-04-04 17:51:28 +00:00
@handle_message('!explain')
def cmd_explain(match: Match[str]) -> Response:
return MessageResponse(
match,
'https://www.youtube.com/playlist?list=PLWBKAf81pmOaP9naRiNAqug6EBnkPakvY', # noqa: E501
)
2019-07-13 23:37:25 +00:00
@handle_message('!keyboard2')
def keyboard2(match: Match[str]) -> Response:
return MessageResponse(
match,
'this is my second mechanical keyboard: '
'https://i.fluffy.cc/CDtRzWX1JZTbqzKswHrZsF7HPX2zfLL1.png',
)
@handle_message('!keyboard')
def keyboard(match: Match[str]) -> Response:
return MessageResponse(
match,
'this is my streaming keyboard (contributed by PhillipWei): '
2019-11-17 00:00:30 +00:00
'https://www.wasdkeyboards.com/code-v3-87-key-mechanical-keyboard-zealio-67g.html', # noqa: E501
2019-07-13 23:37:25 +00:00
)
2019-10-12 21:06:44 +00:00
@handle_message('!github')
def github(match: Match[str]) -> Response:
return MessageResponse(
match,
"anthony's github is https://github.com/asottile -- stream github is "
"https://github.com/anthonywritescode",
)
2019-07-20 21:00:08 +00:00
@handle_message('!still')
def cmd_still(match: Match[str]) -> Response:
2020-05-04 19:41:31 +00:00
_, _, rest = match['msg'].partition(' ')
2019-07-20 21:00:08 +00:00
year = datetime.date.today().year
lol = random.choice(['LOL', 'LOLW', 'LMAO', 'NUUU'])
return MessageResponse(match, f'{esc(rest)}, in {year} - {lol}!')
2019-08-11 00:19:15 +00:00
async def ensure_table_exists(db: aiosqlite.Connection) -> None:
await db.execute(
'CREATE TABLE IF NOT EXISTS today ('
' msg TEXT NOT NULL,'
' timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
')',
)
await db.commit()
async def set_today(db: aiosqlite.Connection, msg: str) -> None:
await ensure_table_exists(db)
await db.execute('INSERT INTO today (msg) VALUES (?)', (msg,))
await db.commit()
async def get_today(db: aiosqlite.Connection) -> str:
await ensure_table_exists(db)
query = 'SELECT msg FROM today ORDER BY ROWID DESC LIMIT 1'
async with db.execute(query) as cursor:
row = await cursor.fetchone()
if row is None:
return 'not working on anything?'
else:
return esc(row[0])
class TodayResponse(MessageResponse):
def __init__(self, match: Match[str]) -> None:
super().__init__(match, '')
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
self.msg_fmt = await get_today(db)
return await super().__call__(config)
2019-08-30 04:19:09 +00:00
@handle_message('!today', '!project')
2019-08-11 00:19:15 +00:00
def cmd_today(match: Match[str]) -> Response:
return TodayResponse(match)
class SetTodayResponse(MessageResponse):
def __init__(self, match: Match[str], msg: str) -> None:
super().__init__(match, 'updated!')
self.msg = msg
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
await set_today(db, self.msg)
return await super().__call__(config)
@handle_message('!settoday')
def cmd_settoday(match: Match[str]) -> Response:
if match['user'] != match['channel']:
return MessageResponse(
match, 'https://www.youtube.com/watch?v=RfiQYRn7fBg',
)
2020-05-04 19:41:31 +00:00
_, _, rest = match['msg'].partition(' ')
2019-08-11 00:19:15 +00:00
return SetTodayResponse(match, rest)
2020-05-04 19:35:44 +00:00
async def check_call(*cmd: str) -> None:
proc = await asyncio.subprocess.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.DEVNULL,
)
await proc.communicate()
if proc.returncode != 0:
raise ValueError(cmd, proc.returncode)
class VideoIdeaResponse(MessageResponse):
def __init__(self, match: Match[str], videoidea: str) -> None:
super().__init__(
match,
'added! https://github.com/asottile/scratch/wiki/anthony-explains-ideas', # noqa: E501
)
self.videoidea = videoidea
async def __call__(self, config: Config) -> Optional[str]:
async def _git(*cmd: str) -> None:
await check_call('git', '-C', tmpdir, *cmd)
with tempfile.TemporaryDirectory() as tmpdir:
await _git(
'clone', '--depth=1', '--quiet',
'git@github.com:asottile/scratch.wiki', '.',
)
ideas_file = os.path.join(tmpdir, 'anthony-explains-ideas.md')
with open(ideas_file, 'rb+') as f:
f.seek(-1, os.SEEK_END)
c = f.read()
if c != b'\n':
f.write(b'\n')
f.write(f'- {self.videoidea}\n'.encode())
await _git('add', '.')
await _git('commit', '-q', '-m', 'idea added by !videoidea')
await _git('push', '-q', 'origin', 'HEAD')
return await super().__call__(config)
@handle_message('![wv]ideoidea')
def cmd_videoidea(match: Match[str]) -> Response:
if match['user'] != match['channel']:
return MessageResponse(
match, 'https://www.youtube.com/watch?v=RfiQYRn7fBg',
)
_, _, rest = match['msg'].partition(' ')
return VideoIdeaResponse(match, rest)
2019-03-03 01:00:14 +00:00
class UptimeResponse(Response):
async def __call__(self, config: Config) -> Optional[str]:
url = f'https://api.twitch.tv/helix/streams?user_login={config.channel}' # noqa: E501
2020-05-04 22:19:25 +00:00
headers = {
'Authorization': f'Bearer {config.oauth_token.split(":")[1]}',
'Client-ID': config.client_id,
}
2019-03-03 01:00:14 +00:00
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
text = await response.text()
2019-03-07 01:31:25 +00:00
data = json.loads(text)['data']
if not data:
msg = 'not currently streaming!'
return PRIVMSG.format(channel=config.channel, msg=msg)
start_time_s = data[0]['started_at']
2019-03-03 01:00:14 +00:00
start_time = datetime.datetime.strptime(
start_time_s, '%Y-%m-%dT%H:%M:%SZ',
)
elapsed = (datetime.datetime.utcnow() - start_time).seconds
parts = []
for n, unit in (
(60 * 60, 'hours'),
(60, 'minutes'),
(1, 'seconds'),
):
if elapsed // n:
parts.append(f'{elapsed // n} {unit}')
elapsed %= n
msg = f'streaming for: {", ".join(parts)}'
return PRIVMSG.format(channel=config.channel, msg=msg)
@handle_message('!uptime')
def cmd_uptime(match: Match[str]) -> Response:
return UptimeResponse()
@handle_message(r'!pep[ ]?(?P<pep_num>\d{1,4})')
def cmd_pep(match: Match[str]) -> Response:
2020-05-04 19:41:31 +00:00
n = str(int(match['pep_num'])).zfill(4)
2019-11-23 23:47:20 +00:00
return MessageResponse(match, f'https://www.python.org/dev/peps/pep-{n}/')
2020-05-04 19:38:28 +00:00
@handle_message(r'!water')
def cmd_water(match: Match[str]) -> Response:
return MessageResponse(match, 'DRINK WATER, BITCH')
@handle_message(r'!levelup')
def cmd_levelup(match: Match[str]) -> Response:
return MessageResponse(match, 'https://i.imgur.com/Uoq5vGx.gif')
2019-06-08 20:51:36 +00:00
COMMAND_RE = re.compile(r'!\w+')
2019-08-11 00:19:15 +00:00
SECRET_CMDS = frozenset(('!settoday',))
2019-06-08 20:51:36 +00:00
2019-03-03 01:00:14 +00:00
@handle_message(r'!\w')
def cmd_help(match: Match[str]) -> Response:
2019-06-08 20:51:36 +00:00
possible = [COMMAND_RE.search(reg.pattern) for reg, _ in HANDLERS]
2019-08-11 00:19:15 +00:00
possible_cmds = {match[0] for match in possible if match} - SECRET_CMDS
commands = ['!help'] + sorted(possible_cmds)
2019-06-08 20:51:36 +00:00
msg = f'possible commands: {", ".join(commands)}'
2019-03-03 01:00:14 +00:00
if not match['msg'].startswith('!help'):
msg = f'unknown command ({esc(match["msg"].split()[0])}), {msg}'
return MessageResponse(match, msg)
@handle_message('PING')
def msg_ping(match: Match[str]) -> Response:
2020-05-04 19:41:31 +00:00
_, _, rest = match['msg'].partition(' ')
2019-03-03 01:00:14 +00:00
return MessageResponse(match, f'PONG {esc(rest)}')
2019-10-19 21:10:46 +00:00
@handle_message(r'.*\b(nano|linux|windows|emacs)\b', flags=re.IGNORECASE)
2019-05-25 23:12:27 +00:00
def msg_gnu_please(match: Match[str]) -> Response:
msg, word = match[3], match[4]
query = re.search(f'gnu[/+]{word}', msg, flags=re.IGNORECASE)
if query:
2019-10-19 22:03:52 +00:00
return MessageResponse(match, f'YES! {query[0]}')
2019-05-25 23:12:27 +00:00
else:
2019-10-19 22:04:50 +00:00
return MessageResponse(match, f"Um please, it's GNU+{esc(word)}!")
2019-05-25 23:12:27 +00:00
2019-12-09 04:30:55 +00:00
@handle_message(r'.*\bth[oi]nk(?:ing)?\b', flags=re.IGNORECASE)
def msg_think(match: Match[str]) -> Response:
return MessageResponse(match, 'anthon63Pythonk ' * 5)
2019-03-03 01:00:14 +00:00
# TODO: !tags, only allowed by stream admin / mods????
def dt_str() -> str:
dt_now = datetime.datetime.now()
2019-12-09 07:05:00 +00:00
return f'[{dt_now.hour:02}:{dt_now.minute:02}]'
2019-03-03 01:00:14 +00:00
async def amain(config: Config, *, quiet: bool) -> NoReturn:
2019-03-03 01:00:14 +00:00
reader, writer = await asyncio.open_connection(HOST, PORT, ssl=True)
await send(writer, f'PASS {config.oauth_token}\r\n', quiet=True)
await send(writer, f'NICK {config.username}\r\n', quiet=quiet)
await send(writer, f'JOIN #{config.channel}\r\n', quiet=quiet)
2020-05-04 19:41:31 +00:00
await send(writer, 'CAP REQ :twitch.tv/tags\r\n', quiet=quiet)
2019-03-03 01:00:14 +00:00
while True:
data = await recv(reader, quiet=quiet)
2019-03-03 01:00:14 +00:00
msg = data.decode('UTF-8', errors='backslashreplace')
msg_match = MSG_RE.match(msg)
if msg_match:
2020-05-04 19:41:31 +00:00
info = _parse_badge_info(msg_match[1])
if info['color']:
r, g, b = _parse_color(info['color'])
else:
r, g, b = _gen_color(info['display-name'])
2020-05-04 22:42:48 +00:00
color_start = f'\033[1m\033[38;2;{r};{g};{b}m'
if msg_match[3].startswith('\x01ACTION '):
print(
f'{dt_str()}'
f'{_badges(info["badges"])}'
f'{color_start}\033[3m * {info["display-name"]}\033[22m '
f'{msg_match[3][8:-1]}\033[m',
)
else:
print(
f'{dt_str()}'
f'{_badges(info["badges"])}'
f'<{color_start}{info["display-name"]}\033[m> '
f'{msg_match[3]}',
)
2019-03-03 01:00:14 +00:00
for pattern, handler in HANDLERS:
match = pattern.match(msg)
if match:
try:
res = await handler(match)(config)
except Exception as e:
traceback.print_exc()
res = PRIVMSG.format(
channel=config.channel,
msg=f'*** unhandled {type(e).__name__} -- see logs',
)
if res is not None:
send_match = SEND_MSG_RE.match(res)
if send_match:
2020-05-04 19:41:31 +00:00
color = '\033[1m\033[3m\033[38;5;21m'
print(
f'{dt_str()}'
f'<{color}{config.username}\033[m> '
f'{send_match[1]}',
)
await send(writer, res, quiet=quiet)
2019-03-03 01:00:14 +00:00
break
else:
if not quiet:
print(f'UNHANDLED: {msg}', end='')
2019-03-03 01:00:14 +00:00
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.json')
parser.add_argument('--verbose', action='store_true')
2019-03-03 01:00:14 +00:00
args = parser.parse_args()
with open(args.config) as f:
config = Config(**json.load(f))
asyncio.run(amain(config, quiet=not args.verbose))
2019-03-03 01:00:14 +00:00
return 0
if __name__ == '__main__':
exit(main())