add !motd and set_motd (channe point reward)

This commit is contained in:
Anthony Sottile 2020-07-17 15:01:59 -07:00
parent b0587f1c68
commit 931fd8e9c2
1 changed files with 99 additions and 9 deletions

108
bot.py
View File

@ -35,7 +35,11 @@ from humanize import naturaldelta
HOST = 'irc.chat.twitch.tv' HOST = 'irc.chat.twitch.tv'
PORT = 6697 PORT = 6697
MSG_RE = re.compile('^@([^ ]+) :([^!]+).* PRIVMSG #[^ ]+ :([^\r]+)') MSG_RE = re.compile(
'^@(?P<info>[^ ]+) :(?P<user>[^!]+).* '
'PRIVMSG #(?P<channel>[^ ]+) '
':(?P<msg>[^\r]+)',
)
PRIVMSG = 'PRIVMSG #{channel} : {msg}\r\n' PRIVMSG = 'PRIVMSG #{channel} : {msg}\r\n'
SEND_MSG_RE = re.compile('^PRIVMSG #[^ ]+ :(?P<msg>[^\r]+)') SEND_MSG_RE = re.compile('^PRIVMSG #[^ ]+ :(?P<msg>[^\r]+)')
@ -192,6 +196,16 @@ def handle_message(
) )
_POINTS_HANDLERS = {}
def channel_points_handler(reward_id: str) -> Callable[[Callback], Callback]:
def channel_points_handler_decorator(func: Callback) -> Callback:
_POINTS_HANDLERS[reward_id] = func
return func
return channel_points_handler_decorator
@handler('^PING (.*)') @handler('^PING (.*)')
def pong(match: Match[str]) -> Response: def pong(match: Match[str]) -> Response:
return CmdResponse(f'PONG {match.group(1)}\r\n') return CmdResponse(f'PONG {match.group(1)}\r\n')
@ -295,7 +309,7 @@ def cmd_bonk(match: Match[str]) -> Response:
) )
async def ensure_table_exists(db: aiosqlite.Connection) -> None: async def ensure_today_table_exists(db: aiosqlite.Connection) -> None:
await db.execute( await db.execute(
'CREATE TABLE IF NOT EXISTS today (' 'CREATE TABLE IF NOT EXISTS today ('
' msg TEXT NOT NULL,' ' msg TEXT NOT NULL,'
@ -306,13 +320,13 @@ async def ensure_table_exists(db: aiosqlite.Connection) -> None:
async def set_today(db: aiosqlite.Connection, msg: str) -> None: async def set_today(db: aiosqlite.Connection, msg: str) -> None:
await ensure_table_exists(db) await ensure_today_table_exists(db)
await db.execute('INSERT INTO today (msg) VALUES (?)', (msg,)) await db.execute('INSERT INTO today (msg) VALUES (?)', (msg,))
await db.commit() await db.commit()
async def get_today(db: aiosqlite.Connection) -> str: async def get_today(db: aiosqlite.Connection) -> str:
await ensure_table_exists(db) await ensure_today_table_exists(db)
query = 'SELECT msg FROM today ORDER BY ROWID DESC LIMIT 1' query = 'SELECT msg FROM today ORDER BY ROWID DESC LIMIT 1'
async with db.execute(query) as cursor: async with db.execute(query) as cursor:
row = await cursor.fetchone() row = await cursor.fetchone()
@ -358,6 +372,68 @@ def cmd_settoday(match: Match[str]) -> Response:
return SetTodayResponse(match, rest) return SetTodayResponse(match, rest)
async def ensure_motd_table_exists(db: aiosqlite.Connection) -> None:
await db.execute(
'CREATE TABLE IF NOT EXISTS motd ('
' user TEXT NOT NULL,'
' msg TEXT NOT NULL,'
' points INT NOT NULL,'
' timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP'
')',
)
await db.commit()
async def set_motd(db: aiosqlite.Connection, user: str, msg: str) -> None:
await ensure_motd_table_exists(db)
query = 'INSERT INTO motd (user, msg, points) VALUES (?, ?, ?)'
await db.execute(query, (user, msg, 250))
await db.commit()
async def get_motd(db: aiosqlite.Connect) -> str:
await ensure_motd_table_exists(db)
query = 'SELECT msg FROM motd ORDER BY ROWID DESC LIMIT 1'
async with db.execute(query) as cursor:
row = await cursor.fetchone()
if row is None:
return 'nothing???'
else:
return esc(row[0])
class SetMotdResponse(MessageResponse):
def __init__(self, match: Match[str]) -> None:
super().__init__(match, 'motd updated! thanks for spending points!')
self.user = match['user']
self.msg = match['msg']
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
await set_motd(db, self.user, self.msg)
return await super().__call__(config)
@channel_points_handler('a2fa47a2-851e-40db-b909-df001801cade')
def cmd_set_motd(match: Match[str]) -> Response:
return SetMotdResponse(match)
class MotdResponse(MessageResponse):
def __init__(self, match: Match[str]) -> None:
super().__init__(match, '')
async def __call__(self, config: Config) -> Optional[str]:
async with aiosqlite.connect('db.db') as db:
self.msg_fmt = await get_motd(db)
return await super().__call__(config)
@handle_message('!motd')
def cmd_motd(match: Match[str]) -> Response:
return MotdResponse(match)
async def check_call(*cmd: str) -> None: async def check_call(*cmd: str) -> None:
proc = await asyncio.subprocess.create_subprocess_exec( proc = await asyncio.subprocess.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.DEVNULL, *cmd, stdout=asyncio.subprocess.DEVNULL,
@ -720,7 +796,7 @@ async def amain(config: Config, *, quiet: bool) -> None:
msg_match = MSG_RE.match(msg) msg_match = MSG_RE.match(msg)
if msg_match: if msg_match:
info = _parse_badge_info(msg_match[1]) info = _parse_badge_info(msg_match['info'])
if info['color']: if info['color']:
r, g, b = _parse_color(info['color']) r, g, b = _parse_color(info['color'])
else: else:
@ -728,18 +804,20 @@ async def amain(config: Config, *, quiet: bool) -> None:
color_start = f'\033[1m\033[38;2;{r};{g};{b}m' color_start = f'\033[1m\033[38;2;{r};{g};{b}m'
if msg_match[3].startswith('\x01ACTION '): if msg_match['msg'].startswith('\x01ACTION '):
log_writer.write_message( log_writer.write_message(
f'{dt_str()}' f'{dt_str()}'
f'{_badges(info["badges"])}' f'{_badges(info["badges"])}'
f'{color_start}\033[3m * {info["display-name"]}\033[22m ' f'{color_start}\033[3m * {info["display-name"]}\033[22m '
f'{msg_match[3][8:-1]}\033[m', f'{msg_match["msg"][8:-1]}\033[m',
) )
else: else:
if info.get('msg-id') == 'highlighted-message': if info.get('msg-id') == 'highlighted-message':
msg_s = f'\033[48;2;117;094;188m{msg_match[3]}\033[m' msg_s = f'\033[48;2;117;094;188m{msg_match["msg"]}\033[m'
elif 'custom-reward-id' in info:
msg_s = f'\033[48;2;029;091;130m{msg_match["msg"]}\033[m'
else: else:
msg_s = msg_match[3] msg_s = msg_match['msg']
log_writer.write_message( log_writer.write_message(
f'{dt_str()}' f'{dt_str()}'
@ -748,6 +826,18 @@ async def amain(config: Config, *, quiet: bool) -> None:
f'{msg_s}', f'{msg_s}',
) )
if 'custom-reward-id' in info:
if info['custom-reward-id'] in _POINTS_HANDLERS:
handler = _POINTS_HANDLERS[info['custom-reward-id']]
coro = handle_response(
config, msg_match, handler, writer, log_writer,
quiet=quiet,
)
loop.create_task(coro)
elif not quiet:
print(f'UNHANDLED reward({info["custom-reward-id"]})')
continue
for pattern, handler in HANDLERS: for pattern, handler in HANDLERS:
match = pattern.match(msg) match = pattern.match(msg)
if match: if match: