twitch-chat-bot/bot.py

776 lines
24 KiB
Python
Raw Normal View History

from __future__ import annotations
2019-03-03 01:00:14 +00:00
import argparse
2020-05-04 19:35:44 +00:00
import asyncio.subprocess
import contextlib
2019-03-03 01:00:14 +00:00
import datetime
2020-05-05 18:42:18 +00:00
import functools
2020-05-04 19:41:31 +00:00
import hashlib
2019-03-03 01:00:14 +00:00
import json
2020-05-04 19:35:44 +00:00
import os.path
2019-07-20 21:00:08 +00:00
import random
2019-03-03 01:00:14 +00:00
import re
import signal
2020-05-04 19:41:31 +00:00
import struct
2019-03-03 01:00:14 +00:00
import sys
2020-05-04 19:35:44 +00:00
import tempfile
2019-03-03 01:00:14 +00:00
import traceback
2020-05-09 09:57:13 +00:00
from typing import Any
2019-03-03 01:00:14 +00:00
from typing import Callable
2020-05-04 19:41:31 +00:00
from typing import Dict
2019-03-03 01:00:14 +00:00
from typing import List
from typing import Match
from typing import NamedTuple
from typing import Optional
from typing import Pattern
from typing import Tuple
import aiohttp
2019-08-11 00:19:15 +00:00
import aiosqlite
2020-05-09 09:57:13 +00:00
import async_lru
2020-05-11 04:21:11 +00:00
import pyjokes
2020-05-09 09:57:13 +00:00
from humanize import naturaldelta
2019-03-03 01:00:14 +00:00
# TODO: allow host / port to be configurable
HOST = 'irc.chat.twitch.tv'
PORT = 6697
2020-05-04 19:41:31 +00:00
MSG_RE = re.compile('^@([^ ]+) :([^!]+).* PRIVMSG #[^ ]+ :([^\r]+)')
2020-05-09 09:57:13 +00:00
PRIVMSG = 'PRIVMSG #{channel} : {msg}\r\n'
SEND_MSG_RE = re.compile('^PRIVMSG #[^ ]+ :(?P<msg>[^\r]+)')
2019-03-03 01:00:14 +00:00
class Config(NamedTuple):
username: str
channel: str
oauth_token: str
client_id: str
def __repr__(self) -> str:
return (
f'{type(self).__name__}('
f'username={self.username!r}, '
f'channel={self.channel!r}, '
f'oauth_token={"***"!r}, '
f'client_id={"***"!r}, '
f')'
)
def esc(s: str) -> str:
return s.replace('{', '{{').replace('}', '}}')
2020-05-04 19:41:31 +00:00
def _parse_badge_info(s: str) -> Dict[str, str]:
ret = {}
for part in s.split(';'):
k, v = part.split('=', 1)
ret[k] = v
return ret
def _parse_color(s: str) -> Tuple[int, int, int]:
return int(s[1:3], 16), int(s[3:5], 16), int(s[5:7], 16)
2020-05-04 22:19:50 +00:00
def _badges(badges: str) -> str:
ret = ''
2020-05-07 17:25:25 +00:00
for s, reg in (
2020-05-11 21:07:29 +00:00
('\033[48;2;000;000;000m⚙\033[m', re.compile('^staff/')),
2020-05-18 22:23:05 +00:00
('\033[48;2;000;173;003m⚔\033[m', re.compile('^moderator/')),
2020-05-07 17:25:25 +00:00
('\033[48;2;224;005;185m♦\033[m', re.compile('^vip/')),
('\033[48;2;233;025;022m☞\033[m', re.compile('^broadcaster/')),
2020-05-09 17:48:40 +00:00
('\033[48;2;130;005;180m★\033[m', re.compile('^founder/')),
2020-05-07 17:25:25 +00:00
('\033[48;2;130;005;180m★\033[m', re.compile('^subscriber/')),
('\033[48;2;000;160;214m♕\033[m', re.compile('^premium/')),
2020-05-11 21:07:29 +00:00
('\033[48;2;089;057;154m♕\033[m', re.compile('^turbo/')),
2020-05-18 22:18:05 +00:00
('\033[48;2;230;186;072m◘\033[m', re.compile('^sub-gift-leader/')),
2020-05-07 17:25:25 +00:00
('\033[48;2;088;226;193m◘\033[m', re.compile('^sub-gifter/')),
2020-06-01 22:44:34 +00:00
('\033[48;2;183;125;029m♕\033[m', re.compile('^hype-train/')),
2020-05-07 17:25:25 +00:00
('\033[48;2;203;200;208m▴\033[m', re.compile('^bits/')),
('\033[48;2;230;186;072m♦\033[m', re.compile('^bits-leader/')),
2020-05-04 22:19:50 +00:00
):
for badge in badges.split(','):
if reg.match(badge):
ret += s
return ret
def _is_moderator(match: Match[str]) -> bool:
info = _parse_badge_info(match['info'])
badges = info['badges'].split(',')
return any(badge.startswith('moderator/') for badge in badges)
2020-05-04 19:41:31 +00:00
def _gen_color(name: str) -> Tuple[int, int, int]:
h = hashlib.sha256(name.encode())
n, = struct.unpack('Q', h.digest()[:8])
bits = [int(s) for s in bin(n)[2:]]
r = bits[0] * 0b1111111 + (bits[1] << 7)
g = bits[2] * 0b1111111 + (bits[3] << 7)
b = bits[4] * 0b1111111 + (bits[5] << 7)
return r, g, b
2019-03-03 01:00:14 +00:00
async def send(
writer: asyncio.StreamWriter,
msg: str,
*,
quiet: bool = False,
) -> None:
if not quiet:
print(f'< {msg}', end='', flush=True, file=sys.stderr)
writer.write(msg.encode())
return await writer.drain()
async def recv(
reader: asyncio.StreamReader,
*,
quiet: bool = False,
) -> bytes:
data = await reader.readline()
if not quiet:
sys.stderr.buffer.write(b'> ')
sys.stderr.buffer.write(data)
sys.stderr.flush()
return data
class Response:
async def __call__(self, config: Config) -> Optional[str]:
return None
class CmdResponse(Response):
def __init__(self, cmd: str) -> None:
self.cmd = cmd
async def __call__(self, config: Config) -> Optional[str]:
return self.cmd
class MessageResponse(Response):
def __init__(self, match: Match[str], msg_fmt: str) -> None:
self.match = match
self.msg_fmt = msg_fmt
async def __call__(self, config: Config) -> Optional[str]:
params = self.match.groupdict()
params['msg'] = self.msg_fmt.format(**params)
return PRIVMSG.format(**params)
Callback = Callable[[Match[str]], Response]
HANDLERS: List[Tuple[Pattern[str], Callable[[Match[str]], Response]]]
HANDLERS = []
2019-10-19 19:10:47 +00:00
def handler(
*prefixes: str,
flags: re.RegexFlag = re.U,
) -> Callable[[Callback], Callback]:
2019-03-03 01:00:14 +00:00
def handler_decorator(func: Callback) -> Callback:
2019-08-30 04:19:09 +00:00
for prefix in prefixes:
HANDLERS.append((re.compile(prefix + '\r\n$', flags=flags), func))
2019-03-03 01:00:14 +00:00
return func
return handler_decorator
2019-10-19 19:10:47 +00:00
def handle_message(
*message_prefixes: str,
flags: re.RegexFlag = re.U,
) -> Callable[[Callback], Callback]:
2019-03-03 01:00:14 +00:00
return handler(
2019-08-30 04:19:09 +00:00
*(
2020-05-04 19:41:31 +00:00
f'^@(?P<info>[^ ]+) :(?P<user>[^!]+).* '
2019-08-30 04:19:09 +00:00
f'PRIVMSG #(?P<channel>[^ ]+) '
f':(?P<msg>{message_prefix}.*)'
for message_prefix in message_prefixes
), flags=flags,
2019-03-03 01:00:14 +00:00
)
@handler('^PING (.*)')
def pong(match: Match[str]) -> Response:
return CmdResponse(f'PONG {match.group(1)}\r\n')
2020-05-05 18:42:18 +00:00
_TEXT_COMMANDS = (
2020-05-11 17:25:05 +00:00
# these have to be first so it does not get overridden by !keyboard
2020-05-05 19:27:14 +00:00
(
'!keyboard2',
'this is my second mechanical keyboard: '
'https://i.fluffy.cc/CDtRzWX1JZTbqzKswHrZsF7HPX2zfLL1.png',
),
2020-05-11 17:25:05 +00:00
(
'!keyboard3',
'this is my stream deck keyboard (cherry mx black silent):'
2020-07-13 20:09:16 +00:00
'https://keeb.io/products/bdn9-3x3-9-key-macropad-rotary-encoder-support ' # noqa: E501
'here is more info: https://www.youtube.com/watch?v=p2TyRIAxR48',
2020-05-11 17:25:05 +00:00
),
2020-06-01 22:37:20 +00:00
(
'!keyboard4',
'this is my streaming keyboard (cherry mx clears) '
'(contributed by PhillipWei): '
'https://www.wasdkeyboards.com/code-v3-87-key-mechanical-keyboard-zealio-67g.html', # noqa: E501
),
2020-05-05 19:27:14 +00:00
# the rest of these are sorted by command
2020-05-23 18:05:13 +00:00
(
'!bot',
'I wrote the bot! https://github.com/anthonywritescode/twitch-chat-bot', # noqa: E501
),
2020-05-05 18:42:18 +00:00
(
'!discord',
2019-06-08 20:51:36 +00:00
'We do have Discord, you are welcome to join: '
'https://discord.gg/xDKGPaW',
2020-05-05 18:42:18 +00:00
),
2020-07-06 20:48:01 +00:00
(
'!donate',
"donations are appreciated but not necessary -- if you'd like to "
'donate, you can donate at https://streamlabs.com/anthonywritescode',
),
2020-05-15 21:44:20 +00:00
(
'!editor',
'this is my text editor I made, called babi! '
'https://github.com/asottile/babi '
'more info in this video: https://www.youtube.com/watch?v=WyR1hAGmR3g',
2020-05-15 21:44:20 +00:00
),
2020-06-15 20:06:00 +00:00
('!emoji', 'anthon63DumpsterFire anthon63Pythonk anthon63HelloHello'),
2020-05-05 18:42:18 +00:00
(
'!explain',
2020-04-04 17:51:28 +00:00
'https://www.youtube.com/playlist?list=PLWBKAf81pmOaP9naRiNAqug6EBnkPakvY', # noqa: E501
2020-05-05 18:42:18 +00:00
),
2020-06-21 21:05:25 +00:00
(
'!faq',
'https://www.youtube.com/playlist?list=PLWBKAf81pmOZEPeIV2_pIESK5hRMAo1hR', # noqa: E501
),
2020-05-05 18:42:18 +00:00
(
'!github',
"anthony's github is https://github.com/asottile -- stream github is "
2020-05-09 09:57:13 +00:00
'https://github.com/anthonywritescode',
2020-05-05 18:42:18 +00:00
),
('!homeland', 'WE WILL PROTECT OUR HOMELAND!'),
(
'!keyboard',
2020-06-01 22:37:20 +00:00
'kinesis freestyle pro (cherry mx reds) '
'https://kinesis-ergo.com/shop/freestyle-pro/',
2020-05-05 18:42:18 +00:00
),
('!levelup', 'https://i.imgur.com/Uoq5vGx.gif'),
('!lurk', 'thanks for lurking, {user}!'),
('!ohai', 'ohai, {user}!'),
('!twitter', 'https://twitter.com/codewithanthony'),
('!water', 'DRINK WATER, BITCH'),
('!youtube', 'https://youtube.com/anthonywritescode'),
)
2019-07-13 23:37:25 +00:00
2020-05-05 18:42:18 +00:00
def _generic_msg_handler(match: Match[str], *, msg: str) -> Response:
return MessageResponse(match, msg)
2019-07-13 23:37:25 +00:00
2020-05-05 18:42:18 +00:00
for _cmd, _msg in _TEXT_COMMANDS:
handle_message(_cmd)(functools.partial(_generic_msg_handler, msg=_msg))
2019-10-12 21:06:44 +00:00
2019-07-20 21:00:08 +00:00
@handle_message('!still')
def cmd_still(match: Match[str]) -> Response:
2020-05-04 19:41:31 +00:00
_, _, rest = match['msg'].partition(' ')
2019-07-20 21:00:08 +00:00
year = datetime.date.today().year
lol = random.choice(['LOL', 'LOLW', 'LMAO', 'NUUU'])
return MessageResponse(match, f'{esc(rest)}, in {year} - {lol}!')
2020-07-13 18:54:37 +00:00
@handle_message('!bonk', flags=re.IGNORECASE)
2020-07-10 20:22:32 +00:00
def cmd_bonk(match: Match[str]) -> Response:
_, _, rest = match['msg'].partition(' ')
rest = rest.strip() or 'Makayla_Fox'
2020-07-10 20:22:32 +00:00
return MessageResponse(
match,
f'{esc(rest)}: '
f'https://i.fluffy.cc/DM4QqzjR7wCpkGPwTl6zr907X50XgtBL.png',
2020-07-10 20:22:32 +00:00
)
2019-08-11 00:19:15 +00:00
async def ensure_table_exists(db: aiosqlite.Connection) -> None:
await db.execute(
'CREATE TABLE IF NOT EXISTS today ('
' msg TEXT NOT NULL,'
' timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
')',
)
await db.commit()
async def set_today(db: aiosqlite.Connection, msg: str) -> None:
await ensure_table_exists(db)
await db.execute('INSERT INTO today (msg) VALUES (?)', (msg,))
await db.commit()
async def get_today(db: aiosqlite.Connection) -> str:
await ensure_table_exists(db)
query = 'SELECT msg FROM today ORDER BY ROWID DESC LIMIT 1'
async with db.execute(query) as cursor:
row = await cursor.fetchone()
if row is None:
return 'not working on anything?'
else:
return esc(row[0])
class TodayResponse(MessageResponse):
def __init__(self, match: Match[str]) -> None:
super().__init__(match, '')
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
self.msg_fmt = await get_today(db)
return await super().__call__(config)
2019-08-30 04:19:09 +00:00
@handle_message('!today', '!project')
2019-08-11 00:19:15 +00:00
def cmd_today(match: Match[str]) -> Response:
return TodayResponse(match)
class SetTodayResponse(MessageResponse):
def __init__(self, match: Match[str], msg: str) -> None:
super().__init__(match, 'updated!')
self.msg = msg
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
await set_today(db, self.msg)
return await super().__call__(config)
@handle_message('!settoday')
def cmd_settoday(match: Match[str]) -> Response:
2020-05-04 19:41:31 +00:00
_, _, rest = match['msg'].partition(' ')
2019-08-11 00:19:15 +00:00
return SetTodayResponse(match, rest)
2020-05-04 19:35:44 +00:00
async def check_call(*cmd: str) -> None:
proc = await asyncio.subprocess.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.DEVNULL,
)
await proc.communicate()
if proc.returncode != 0:
raise ValueError(cmd, proc.returncode)
class VideoIdeaResponse(MessageResponse):
def __init__(self, match: Match[str], videoidea: str) -> None:
super().__init__(
match,
'added! https://github.com/asottile/scratch/wiki/anthony-explains-ideas', # noqa: E501
)
self.videoidea = videoidea
async def __call__(self, config: Config) -> Optional[str]:
async def _git(*cmd: str) -> None:
await check_call('git', '-C', tmpdir, *cmd)
with tempfile.TemporaryDirectory() as tmpdir:
await _git(
'clone', '--depth=1', '--quiet',
'git@github.com:asottile/scratch.wiki', '.',
)
ideas_file = os.path.join(tmpdir, 'anthony-explains-ideas.md')
with open(ideas_file, 'rb+') as f:
f.seek(-1, os.SEEK_END)
c = f.read()
if c != b'\n':
f.write(b'\n')
f.write(f'- {self.videoidea}\n'.encode())
await _git('add', '.')
await _git('commit', '-q', '-m', 'idea added by !videoidea')
await _git('push', '-q', 'origin', 'HEAD')
return await super().__call__(config)
@handle_message('![wv]ideoidea')
def cmd_videoidea(match: Match[str]) -> Response:
if not _is_moderator(match) and match['user'] != match['channel']:
2020-05-04 19:35:44 +00:00
return MessageResponse(
match, 'https://www.youtube.com/watch?v=RfiQYRn7fBg',
)
_, _, rest = match['msg'].partition(' ')
return VideoIdeaResponse(match, rest)
2019-03-03 01:00:14 +00:00
class UptimeResponse(Response):
async def __call__(self, config: Config) -> Optional[str]:
url = f'https://api.twitch.tv/helix/streams?user_login={config.channel}' # noqa: E501
2020-05-04 22:19:25 +00:00
headers = {
'Authorization': f'Bearer {config.oauth_token.split(":")[1]}',
'Client-ID': config.client_id,
}
2019-03-03 01:00:14 +00:00
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
text = await response.text()
2019-03-07 01:31:25 +00:00
data = json.loads(text)['data']
if not data:
msg = 'not currently streaming!'
return PRIVMSG.format(channel=config.channel, msg=msg)
start_time_s = data[0]['started_at']
2019-03-03 01:00:14 +00:00
start_time = datetime.datetime.strptime(
start_time_s, '%Y-%m-%dT%H:%M:%SZ',
)
elapsed = (datetime.datetime.utcnow() - start_time).seconds
parts = []
for n, unit in (
(60 * 60, 'hours'),
(60, 'minutes'),
(1, 'seconds'),
):
if elapsed // n:
parts.append(f'{elapsed // n} {unit}')
elapsed %= n
msg = f'streaming for: {", ".join(parts)}'
return PRIVMSG.format(channel=config.channel, msg=msg)
@handle_message('!uptime')
def cmd_uptime(match: Match[str]) -> Response:
return UptimeResponse()
2020-05-09 09:57:13 +00:00
@async_lru.alru_cache(maxsize=32)
async def fetch_twitch_user(
user: str,
*,
oauth_token: str,
client_id: str
) -> Optional[List[Dict[str, Any]]]:
url = 'https://api.twitch.tv/helix/users'
params = [('login', user)]
headers = {
'Authorization': f'Bearer {oauth_token}',
'Client-ID': client_id,
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, headers=headers) as resp:
json_resp = await resp.json()
return json_resp.get('data')
async def fetch_twitch_user_follows(
*,
from_id: int,
to_id: int,
oauth_token: str,
client_id: str
) -> Optional[List[Dict[str, Any]]]:
url = 'https://api.twitch.tv/helix/users/follows'
params = [('from_id', from_id), ('to_id', to_id)]
headers = {
'Authorization': f'Bearer {oauth_token}',
'Client-ID': client_id,
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params, headers=headers) as resp:
json_resp = await resp.json()
return json_resp.get('data')
class FollowageResponse(Response):
def __init__(self, username: str) -> None:
self.username = username
async def __call__(self, config: Config) -> Optional[str]:
token = config.oauth_token.split(':')[1]
fetched_users = await fetch_twitch_user(
config.channel,
oauth_token=token,
client_id=config.client_id,
)
assert fetched_users is not None
me, = fetched_users
fetched_users = await fetch_twitch_user(
self.username,
oauth_token=token,
client_id=config.client_id,
)
if not fetched_users:
msg = f'user {esc(self.username)} not found!'
return PRIVMSG.format(channel=config.channel, msg=msg)
target_user, = fetched_users
# if streamer wants to check the followage to their own channel
if me['id'] == target_user['id']:
msg = (
f"@{esc(target_user['login'])}, you can't check !followage "
f'to your own channel. But I appreciate your curiosity!'
)
return PRIVMSG.format(channel=config.channel, msg=msg)
follow_age_results = await fetch_twitch_user_follows(
from_id=target_user['id'],
to_id=me['id'],
oauth_token=token,
client_id=config.client_id,
)
if not follow_age_results:
msg = f'{esc(target_user["login"])} is not a follower!'
return PRIVMSG.format(channel=config.channel, msg=msg)
follow_age, = follow_age_results
now = datetime.datetime.utcnow()
date_of_follow = datetime.datetime.fromisoformat(
# twitch sends ISO date string with "Z" at the end,
# which python's fromisoformat method does not like
follow_age['followed_at'].rstrip('Z'),
)
delta = now - date_of_follow
msg = (
f'{esc(follow_age["from_name"])} has been following for '
f'{esc(naturaldelta(delta))}!'
)
return PRIVMSG.format(channel=config.channel, msg=msg)
# !followage -> valid, checks the caller
# !followage anthonywritescode -> valid, checks the user passed in payload
# !followage foo bar -> still valid, however the whole
# "foo bar" will be processed as a username
@handle_message(r'!followage(?P<payload> .*)?')
def cmd_followage(match: Match[str]) -> Response:
user = match['user']
# "" is a default value if group is missing
groupdict = match.groupdict('')
payload = groupdict['payload'].strip()
if payload:
user = payload.lstrip('@')
return FollowageResponse(user)
@handle_message(r'!pep[ ]?(?P<pep_num>\d{1,4})')
def cmd_pep(match: Match[str]) -> Response:
2020-05-04 19:41:31 +00:00
n = str(int(match['pep_num'])).zfill(4)
2019-11-23 23:47:20 +00:00
return MessageResponse(match, f'https://www.python.org/dev/peps/pep-{n}/')
2020-05-11 04:21:11 +00:00
@handle_message('!joke')
def cmd_joke(match: Match[str]) -> Response:
return MessageResponse(match, esc(pyjokes.get_joke()))
2020-05-23 19:36:31 +00:00
@handle_message('!so (?P<user_channel>.+)')
def cmd_shoutout(match: Match[str]) -> Response:
if not _is_moderator(match) and match['user'] != match['channel']:
2020-05-23 19:44:43 +00:00
return MessageResponse(
match, 'https://www.youtube.com/watch?v=RfiQYRn7fBg',
)
2020-05-23 19:36:31 +00:00
user = match['user_channel']
return MessageResponse(
match,
f'you should check out https://twitch.tv/{esc(user)} !',
2020-05-23 19:36:31 +00:00
)
2019-06-08 20:51:36 +00:00
COMMAND_RE = re.compile(r'!\w+')
2020-05-23 19:44:43 +00:00
SECRET_CMDS = frozenset(('!settoday', '!so'))
2019-06-08 20:51:36 +00:00
2019-03-03 01:00:14 +00:00
@handle_message(r'!\w')
def cmd_help(match: Match[str]) -> Response:
2019-06-08 20:51:36 +00:00
possible = [COMMAND_RE.search(reg.pattern) for reg, _ in HANDLERS]
2019-08-11 00:19:15 +00:00
possible_cmds = {match[0] for match in possible if match} - SECRET_CMDS
commands = ['!help'] + sorted(possible_cmds)
2019-06-08 20:51:36 +00:00
msg = f'possible commands: {", ".join(commands)}'
2019-03-03 01:00:14 +00:00
if not match['msg'].startswith('!help'):
msg = f'unknown command ({esc(match["msg"].split()[0])}), {msg}'
return MessageResponse(match, msg)
@handle_message('PING')
def msg_ping(match: Match[str]) -> Response:
2020-05-04 19:41:31 +00:00
_, _, rest = match['msg'].partition(' ')
2019-03-03 01:00:14 +00:00
return MessageResponse(match, f'PONG {esc(rest)}')
@handle_message(
r'.*\b(?P<word>nano|linux|windows|emacs)\b', flags=re.IGNORECASE,
)
2019-05-25 23:12:27 +00:00
def msg_gnu_please(match: Match[str]) -> Response:
if random.randrange(0, 100) < 90:
return Response()
msg, word = match['msg'], match['word']
query = re.search(f'gnu[/+]{word}', msg, flags=re.IGNORECASE)
if query:
2019-10-19 22:03:52 +00:00
return MessageResponse(match, f'YES! {query[0]}')
2019-05-25 23:12:27 +00:00
else:
2019-10-19 22:04:50 +00:00
return MessageResponse(match, f"Um please, it's GNU+{esc(word)}!")
2019-05-25 23:12:27 +00:00
2019-12-09 04:30:55 +00:00
@handle_message(r'.*\bth[oi]nk(?:ing)?\b', flags=re.IGNORECASE)
def msg_think(match: Match[str]) -> Response:
return MessageResponse(match, 'anthon63Pythonk ' * 5)
2019-03-03 01:00:14 +00:00
# TODO: !tags, only allowed by stream admin / mods????
def dt_str() -> str:
dt_now = datetime.datetime.now()
2019-12-09 07:05:00 +00:00
return f'[{dt_now.hour:02}:{dt_now.minute:02}]'
2019-03-03 01:00:14 +00:00
def _shutdown(
writer: asyncio.StreamWriter,
loop: asyncio.AbstractEventLoop,
shutdown_task: Optional[asyncio.Task[Any]] = None,
) -> None:
print('bye!')
ignored_tasks = set()
if shutdown_task is not None:
ignored_tasks.add(shutdown_task)
if writer:
writer.close()
closing_task = loop.create_task(writer.wait_closed())
def cancel_tasks(fut: asyncio.Future[Any]) -> None:
tasks = [t for t in asyncio.all_tasks() if t not in ignored_tasks]
for task in tasks:
task.cancel()
closing_task.add_done_callback(cancel_tasks)
2020-05-20 20:47:03 +00:00
UNCOLOR_RE = re.compile(r'\033\[[^m]*m')
class LogWriter:
def __init__(self) -> None:
self.date = str(datetime.date.today())
def write_message(self, msg: str) -> None:
print(msg)
uncolored_msg = UNCOLOR_RE.sub('', msg)
os.makedirs('logs', exist_ok=True)
with open(os.path.join('logs', f'{self.date}.log'), 'a+') as f:
f.write(f'{uncolored_msg}\n')
2020-05-16 20:56:47 +00:00
async def handle_response(
config: Config,
match: Match[str],
handler: Callable[[Match[str]], Response],
writer: asyncio.StreamWriter,
2020-05-20 20:47:03 +00:00
log_writer: LogWriter,
2020-05-16 20:56:47 +00:00
*,
quiet: bool,
) -> None:
try:
res = await handler(match)(config)
except Exception as e:
traceback.print_exc()
res = PRIVMSG.format(
channel=config.channel,
msg=f'*** unhandled {type(e).__name__} -- see logs',
)
if res is not None:
send_match = SEND_MSG_RE.match(res)
if send_match:
color = '\033[1m\033[3m\033[38;5;21m'
2020-05-20 20:47:03 +00:00
log_writer.write_message(
2020-05-16 20:56:47 +00:00
f'{dt_str()}'
f'<{color}{config.username}\033[m> '
f'{send_match[1]}',
)
await send(writer, res, quiet=quiet)
async def amain(config: Config, *, quiet: bool) -> None:
2020-05-20 20:47:03 +00:00
log_writer = LogWriter()
2019-03-03 01:00:14 +00:00
reader, writer = await asyncio.open_connection(HOST, PORT, ssl=True)
loop = asyncio.get_event_loop()
shutdown_cb = functools.partial(_shutdown, writer, loop)
try:
loop.add_signal_handler(signal.SIGINT, shutdown_cb)
except NotImplementedError:
# Doh... Windows...
signal.signal(signal.SIGINT, lambda *_: shutdown_cb())
2019-03-03 01:00:14 +00:00
await send(writer, f'PASS {config.oauth_token}\r\n', quiet=True)
await send(writer, f'NICK {config.username}\r\n', quiet=quiet)
await send(writer, f'JOIN #{config.channel}\r\n', quiet=quiet)
2020-05-04 19:41:31 +00:00
await send(writer, 'CAP REQ :twitch.tv/tags\r\n', quiet=quiet)
2019-03-03 01:00:14 +00:00
while not writer.is_closing():
data = await recv(reader, quiet=quiet)
if not data:
return
2019-03-03 01:00:14 +00:00
msg = data.decode('UTF-8', errors='backslashreplace')
msg_match = MSG_RE.match(msg)
if msg_match:
2020-05-04 19:41:31 +00:00
info = _parse_badge_info(msg_match[1])
if info['color']:
r, g, b = _parse_color(info['color'])
else:
r, g, b = _gen_color(info['display-name'])
2020-05-04 22:42:48 +00:00
color_start = f'\033[1m\033[38;2;{r};{g};{b}m'
if msg_match[3].startswith('\x01ACTION '):
2020-05-20 20:47:03 +00:00
log_writer.write_message(
2020-05-04 22:42:48 +00:00
f'{dt_str()}'
f'{_badges(info["badges"])}'
f'{color_start}\033[3m * {info["display-name"]}\033[22m '
f'{msg_match[3][8:-1]}\033[m',
)
else:
if info.get('msg-id') == 'highlighted-message':
msg_s = f'\033[48;2;117;094;188m{msg_match[3]}\033[m'
else:
msg_s = msg_match[3]
2020-05-20 20:47:03 +00:00
log_writer.write_message(
2020-05-04 22:42:48 +00:00
f'{dt_str()}'
f'{_badges(info["badges"])}'
f'<{color_start}{info["display-name"]}\033[m> '
f'{msg_s}',
2020-05-04 22:42:48 +00:00
)
2019-03-03 01:00:14 +00:00
for pattern, handler in HANDLERS:
match = pattern.match(msg)
if match:
2020-05-16 20:56:47 +00:00
coro = handle_response(
2020-05-20 20:47:03 +00:00
config, match, handler, writer, log_writer, quiet=quiet,
2020-05-16 20:56:47 +00:00
)
loop.create_task(coro)
2019-03-03 01:00:14 +00:00
break
else:
if not quiet:
print(f'UNHANDLED: {msg}', end='')
2019-03-03 01:00:14 +00:00
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument('--config', default='config.json')
parser.add_argument('--verbose', action='store_true')
2019-03-03 01:00:14 +00:00
args = parser.parse_args()
with open(args.config) as f:
config = Config(**json.load(f))
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(amain(config, quiet=not args.verbose))
2019-03-03 01:00:14 +00:00
return 0
if __name__ == '__main__':
exit(main())