224 lines
6.0 KiB
Python
224 lines
6.0 KiB
Python
import argparse
|
|
import asyncio
|
|
import datetime
|
|
import json
|
|
import re
|
|
import sys
|
|
import traceback
|
|
from typing import Callable
|
|
from typing import List
|
|
from typing import Match
|
|
from typing import NamedTuple
|
|
from typing import NoReturn
|
|
from typing import Optional
|
|
from typing import Pattern
|
|
from typing import Tuple
|
|
|
|
import aiohttp
|
|
|
|
# Endpoint that ``amain`` connects to (it opens the connection with ssl=True).
# TODO: allow host / port to be configurable
HOST: str = 'irc.chat.twitch.tv'
PORT: int = 6697
|
|
|
|
|
|
class Config(NamedTuple):
|
|
username: str
|
|
channel: str
|
|
oauth_token: str
|
|
client_id: str
|
|
client_secret: str
|
|
|
|
def __repr__(self) -> str:
|
|
return (
|
|
f'{type(self).__name__}('
|
|
f'username={self.username!r}, '
|
|
f'channel={self.channel!r}, '
|
|
f'oauth_token={"***"!r}, '
|
|
f'client_id={"***"!r}, '
|
|
f'client_secret={"***"!r}, '
|
|
f')'
|
|
)
|
|
|
|
|
|
def esc(s: str) -> str:
    """Double every brace in *s* so ``str.format`` renders it literally."""
    return s.translate({ord('{'): '{{', ord('}'): '}}'})
|
|
|
|
|
|
async def send(
        writer: asyncio.StreamWriter,
        msg: str,
        *,
        quiet: bool = False,
) -> None:
    """Encode *msg*, write it to *writer*, and wait for the buffer to drain.

    Unless *quiet* is true, the outgoing text is first echoed to stderr
    with a ``'< '`` prefix.
    """
    if not quiet:
        sys.stderr.write(f'< {msg}')
        sys.stderr.flush()
    payload = msg.encode()
    writer.write(payload)
    await writer.drain()
|
|
|
|
|
|
async def recv(
        reader: asyncio.StreamReader,
        *,
        quiet: bool = False,
) -> bytes:
    """Read one line from *reader* and return it as raw bytes.

    Unless *quiet* is true, the line is echoed to stderr (as bytes, so no
    decoding is attempted) with a ``'> '`` prefix.
    """
    line = await reader.readline()
    if quiet:
        return line
    sys.stderr.buffer.write(b'> ' + line)
    sys.stderr.flush()
    return line
|
|
|
|
|
|
class Response:
    """Base reply: awaiting an instance yields text to send, or ``None``.

    This base implementation always yields ``None`` (nothing is sent).
    """

    async def __call__(self, config: Config) -> Optional[str]:
        """Return ``None``: there is nothing to send."""
|
|
|
|
|
|
class CmdResponse(Response):
    """Reply that yields a pre-built string exactly as it was given."""

    def __init__(self, cmd: str) -> None:
        # stored verbatim; the caller supplies any line terminator
        self.cmd = cmd

    async def __call__(self, config: Config) -> Optional[str]:
        """Return the stored command; *config* is not consulted."""
        return self.cmd
|
|
|
|
|
|
# ``str.format`` template for one chat line; takes ``channel`` and ``msg``.
PRIVMSG: str = 'PRIVMSG #{channel} :{msg}\r\n'
|
|
|
|
|
|
class MessageResponse(Response):
    """Reply with a chat message built from a format string and a match.

    *msg_fmt* is a ``str.format`` template; its fields are filled from the
    named groups of *match*.
    """

    def __init__(self, match: Match[str], msg_fmt: str) -> None:
        self.match = match
        self.msg_fmt = msg_fmt

    async def __call__(self, config: Config) -> Optional[str]:
        """Return the ``PRIVMSG`` line carrying the rendered message."""
        groups = self.match.groupdict()
        text = self.msg_fmt.format(**groups)
        # the rendered text replaces the incoming ``msg`` group
        return PRIVMSG.format(**{**groups, 'msg': text})
|
|
|
|
|
|
# A callback turns the regex match for an incoming line into a Response.
Callback = Callable[[Match[str]], Response]
# Registry of (compiled pattern, callback) pairs, in registration order.
HANDLERS: List[Tuple[Pattern[str], Callback]] = []
|
|
|
|
|
|
def handler(prefix: str) -> Callable[[Callback], Callback]:
    """Return a decorator that registers a callback for lines matching *prefix*.

    *prefix* is a regular expression; it is extended so that it must be
    followed by the ``\\r\\n`` line terminator at the end of the line.  The
    decorated function is appended to ``HANDLERS`` and returned unchanged.
    """
    def handler_decorator(func: Callback) -> Callback:
        compiled = re.compile(prefix + '\r\n$')
        entry = (compiled, func)
        HANDLERS.append(entry)
        return func

    return handler_decorator
|
|
|
|
|
|
def handle_message(message_prefix: str) -> Callable[[Callback], Callback]:
    """Return a decorator registering a callback for matching chat messages.

    The generated pattern exposes the named groups ``user``, ``channel``
    and ``msg``; *message_prefix* is a regex that the message text must
    start with.
    """
    line_pattern = (
        '^:(?P<user>[^!]+).* '
        'PRIVMSG #(?P<channel>[^ ]+) '
        f':(?P<msg>{message_prefix}.*)'
    )
    return handler(line_pattern)
|
|
|
|
|
|
@handler('^PING (.*)')
def pong(match: Match[str]) -> Response:
    """Answer a ``PING <payload>`` line with ``PONG <payload>``."""
    payload = match[1]
    return CmdResponse(f'PONG {payload}\r\n')
|
|
|
|
|
|
@handle_message('!ohai')
def cmd_ohai(match: Match[str]) -> Response:
    """Greet the user who sent ``!ohai``."""
    greeting_fmt = 'ohai, {user}!'  # {user} is filled from the match
    return MessageResponse(match, greeting_fmt)
|
|
|
|
|
|
def _format_elapsed(elapsed: int) -> str:
    """Render a whole number of seconds as e.g. ``'2 hours, 5 seconds'``."""
    parts = []
    for size, unit in ((60 * 60, 'hours'), (60, 'minutes'), (1, 'seconds')):
        count, elapsed = divmod(elapsed, size)
        if count:
            parts.append(f'{count} {unit}')
    # a zero-length duration would otherwise render as an empty string
    return ', '.join(parts) or '0 seconds'


class UptimeResponse(Response):
    """Reply with how long the configured channel has been streaming."""

    async def __call__(self, config: Config) -> Optional[str]:
        """Query the Twitch streams endpoint and return a ``PRIVMSG`` line.

        Returns a "not currently streaming!" message when the API reports
        no stream for the channel.

        Raises:
            KeyError: if the API response has no ``data`` key (for example
                an error payload).
        """
        url = 'https://api.twitch.tv/helix/streams'
        # passed as a query parameter so the channel name is URL-encoded
        params = {'user_login': config.channel}
        headers = {'Client-ID': config.client_id}
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    url, params=params, headers=headers,
            ) as response:
                text = await response.text()

        streams = json.loads(text)['data']
        if not streams:
            # an empty list is what the lookup yields when nothing is live
            msg = 'not currently streaming!'
        else:
            utc = datetime.timezone.utc
            start_time = datetime.datetime.strptime(
                streams[0]['started_at'], '%Y-%m-%dT%H:%M:%SZ',
            ).replace(tzinfo=utc)
            delta = datetime.datetime.now(utc) - start_time
            # total_seconds() keeps whole days; ``.seconds`` would drop them
            # and under-report streams longer than 24 hours.  Clamp at zero
            # in case the local clock is slightly behind.
            elapsed = max(int(delta.total_seconds()), 0)
            msg = f'streaming for: {_format_elapsed(elapsed)}'
        return PRIVMSG.format(channel=config.channel, msg=msg)
|
|
|
|
|
|
@handle_message('!uptime')
def cmd_uptime(match: Match[str]) -> Response:
    """Handle ``!uptime``; the match itself is not needed for the reply."""
    response = UptimeResponse()
    return response
|
|
|
|
|
|
@handle_message(r'!\w')
def cmd_help(match: Match[str]) -> Response:
    """List the available commands for ``!help`` or any other ``!word``.

    For anything other than ``!help`` the reply also names the unknown
    command that was attempted.
    """
    commands = 'possible commands: !help, !ohai, !uptime'
    text = match['msg']
    if text.startswith('!help'):
        return MessageResponse(match, commands)
    # the attempted command is user input: escape its braces because the
    # reply is later passed through str.format
    attempted = esc(text.split()[0])
    return MessageResponse(match, f'unknown command ({attempted}), {commands}')
|
|
|
|
|
|
@handle_message('PING')
def msg_ping(match: Match[str]) -> Response:
    """Reply ``PONG <rest>`` to a chat message that starts with ``PING``."""
    # everything after the first space is echoed back; braces are escaped
    # because the reply is later passed through str.format
    _head, _sep, rest = match['msg'].partition(' ')
    return MessageResponse(match, f'PONG {esc(rest)}')
|
|
|
|
|
|
# TODO: !tags, only allowed by stream admin / mods????
|
|
|
|
|
|
@handler('.*')
def unhandled(match: Match[str]) -> Response:
    """Catch-all: print the line to stdout and send nothing back."""
    line = match[0]
    # the line already carries its own terminator, hence end=''
    print(f'UNHANDLED: {line}', end='')
    return Response()
|
|
|
|
|
|
async def amain(config: Config) -> NoReturn:
    """Connect to the chat server, log in, and dispatch incoming lines.

    Each received line is decoded and offered to the ``HANDLERS`` patterns
    in order; the first one that matches produces the reply and the rest
    are skipped.  An exception raised while building a reply is logged and
    reported to the channel instead of stopping the bot.

    Raises:
        ConnectionError: if the server closes the connection.
    """
    reader, writer = await asyncio.open_connection(HOST, PORT, ssl=True)
    try:
        # quiet=True keeps the oauth token out of the stderr echo
        await send(writer, f'PASS {config.oauth_token}\r\n', quiet=True)
        await send(writer, f'NICK {config.username}\r\n')
        await send(writer, f'JOIN #{config.channel}\r\n')

        while True:
            data = await recv(reader)
            if not data:
                # readline() yields b'' only at EOF; without this check the
                # loop would spin forever on a closed connection
                raise ConnectionError(f'connection to {HOST}:{PORT} closed')
            msg = data.decode('UTF-8', errors='backslashreplace')
            for pattern, callback in HANDLERS:
                match = pattern.match(msg)
                if match is None:
                    continue
                try:
                    res = await callback(match)(config)
                except Exception as e:
                    traceback.print_exc()
                    res = PRIVMSG.format(
                        channel=config.channel,
                        msg=f'*** unhandled {type(e).__name__} -- see logs',
                    )
                if res is not None:
                    await send(writer, res)
                # only the first matching handler gets the line
                break
    finally:
        writer.close()
|
|
|
|
|
|
def main() -> int:
    """Parse the command line, load the JSON config, and run the bot.

    ``--config`` names the JSON file (default ``config.json``) whose keys
    become the ``Config`` fields.  Returns the process exit status.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--config', default='config.json')
    config_path = parser.parse_args().config

    with open(config_path) as f:
        settings = json.load(f)

    asyncio.run(amain(Config(**settings)))
    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # ``exit`` is a helper injected by the ``site`` module for interactive
    # use and is missing under ``python -S``; SystemExit always works.
    raise SystemExit(main())
|