tea_cah/botcmd.py
Wolfgang Müller b38912b41d Add support for a random pick from a selection of cards
!card now accepts a comma-separated list of numbers from which it will
select one randomly, with equal probability. This selection is made
before the card event is sent out and is fully transparent to the rest
of the game logic.
2019-05-13 22:57:21 +03:00

433 lines
10 KiB
Python

import threading
import time
import channel
import gameloop
import random
import re
# NickServ password as read from the config by initialize(); an empty
# string makes on_connect() skip identification.
nickserv_pass = None
# Name of the game channel as bytes; set by initialize() to the first
# channel listed in the config.
irc_chan = None
# channel.Channel feeding events to the running GameLoop thread, or None
# when start_gameloop() has not run / stop_gameloop() has cleared it.
game_channel = None
def chunk(l, n):
    """Split an iterable into consecutive lists of at most n items.

    Every list except possibly the last one holds exactly n items; an
    empty input gives an empty result. n must be positive.
    """
    assert 0 < n

    groups = []
    for element in l:
        # Open a new group when none exists yet or the current one is full.
        if not groups or len(groups[-1]) >= n:
            groups.append([])
        groups[-1].append(element)
    return groups
class GameLoop(threading.Thread):
    """Thread that runs the game and connects it to IRC.

    gameloop.game() is handed this object's send/notice/voice/devoice/
    get_event methods as callbacks; the event channel is closed when the
    game function returns or raises.
    """

    # Byte budget for the words packed into one outgoing line.
    # NOTE(review): presumably chosen to stay under IRC's 512-byte line
    # limit once the command and target are added — confirm.
    _MAX_LINE_BYTES = 440
    # Number of nicks (de)voiced per MODE command.
    # NOTE(review): presumably a server-side limit on modes per command.
    _NICKS_PER_MODE = 4

    def __init__(self, irc, chan, irc_chan):
        """Store the collaborators and initialise the thread.

        irc: IRC API object used for all output.
        chan: event channel the game reads events from.
        irc_chan: name of the game channel, as bytes.
        """
        self.irc = irc
        self.chan = chan
        self.irc_chan = irc_chan
        threading.Thread.__init__(self)

    @classmethod
    def _wrap_message(cls, message):
        """Encode message and yield it as byte lines split on spaces.

        Words are packed greedily; a line is flushed before the word that
        would push it past _MAX_LINE_BYTES. A single word longer than the
        budget is emitted on a line of its own (it is not split).
        """
        line = []
        line_len = 0
        for part in message.encode().split(b' '):
            # Only flush when something is pending: an over-long first word
            # would otherwise cause an empty line to be sent.
            if line and len(part) + line_len > cls._MAX_LINE_BYTES:
                yield b' '.join(line)
                line = []
                line_len = 0
            line.append(part)
            line_len += len(part) + 1  # +1 accounts for the joining space
        if line:
            yield b' '.join(line)

    def send(self, message):
        """Send message (str) to the game channel, wrapped over several lines if needed."""
        for line in self._wrap_message(message):
            self.irc.bot_response(self.irc_chan, line)

    def notice(self, recipient, message):
        """Send message (str) to recipient (str) as NOTICEs, each prefixed with the game channel name."""
        target = recipient.encode()
        for line in self._wrap_message(message):
            self.irc.send_raw(b'NOTICE %s :%s %s' % (target, self.irc_chan, line))

    def get_event(self):
        """Return the next event received from the event channel."""
        return self.chan.recv()

    def _change_voice(self, sign, nicks):
        """Send MODE commands adding (sign=b'+') or removing (sign=b'-') voice.

        nicks is a single nick (str) or an iterable of nicks.
        """
        if isinstance(nicks, str):
            nicks = [nicks]
        for group in chunk(nicks, self._NICKS_PER_MODE):
            targets = b' '.join(nick.encode() for nick in group)
            self.irc.send_raw(b'MODE %s %s%s %s' % (self.irc_chan, sign, b'v' * len(group), targets))

    def voice(self, nicks):
        """Give voice (+v) in the game channel to one nick or a list of nicks."""
        self._change_voice(b'+', nicks)

    def devoice(self, nicks):
        """Remove voice (-v) in the game channel from one nick or a list of nicks."""
        self._change_voice(b'-', nicks)

    def run(self):
        """Run the game until it returns; report a crash to the channel and always close the event channel."""
        try:
            gameloop.game(self.send, self.notice, self.voice, self.devoice, self.get_event)
        except Exception as err:
            # Thread boundary: surface the failure in the channel instead of dying silently.
            self.send('Crash! (%s, %s)' % (type(err), repr(err)))
        finally:
            self.chan.close()
def start_gameloop(irc):
    """Start a GameLoop thread unless game_channel is already set.

    A fresh event channel is created, a GameLoop bound to it and to the
    configured game channel is started, and the event channel is then
    recorded in game_channel.
    """
    global game_channel

    if game_channel is None:
        event_chan = channel.Channel()
        worker = GameLoop(irc, event_chan, irc_chan)
        worker.start()
        game_channel = event_chan
def stop_gameloop():
    """Send the quit event to the game loop and clear game_channel.

    Does nothing when game_channel is None.
    """
    global game_channel

    if game_channel is not None:
        game_channel.send((gameloop.events.quit,))
        game_channel = None
def send_event(event):
    """Pass an event tuple to the game loop through game_channel."""
    # game_channel is only read here, so no `global` declaration is needed.
    game_channel.send(event)
# Listing returned when no command is given.
# NOTE(review): !bot has help text below but is not listed here — confirm
# whether leaving it out is intentional.
_USAGE_OVERVIEW = '!status !start !ready !unready !kill !join !leave !players !kick !deck !limit !card !cards !origins !redeal !help'

# Commands that take no arguments at all.
_USAGE_PLAIN_COMMANDS = ('status', 'ready', 'unready', 'kill', 'leave', 'players', 'cards', 'origins', 'redeal')

# Help text for commands that take arguments or have subcommands.
_USAGE_COMMANDS = {
    'start': 'Usage: !start [<preset>]',
    'join': 'Usage: !join [<message>]',
    'kick': 'Usage: !kick <nick>',
    'card': 'Usage: [!card] <number[,number,...]> ...',
    'deck': 'Subcommands: !deck add | remove | list',
    'bot': 'Subcommands: !bot add | remove',
    'limit': 'Usage: !limit [<number> [<type>]]',
    'help': 'Usage: !help [command [subcommand]]',
}

# Help text per (command, subcommand).
_USAGE_SUBCOMMANDS = {
    'deck': {
        'add': 'Usage: !deck add <code> | random',
        'remove': 'Usage: !deck remove <code>',
        'list': 'Usage: !deck list',
    },
    'bot': {
        'add': 'Usage: !bot add <type> [<name>]',
        'remove': 'Usage: !bot remove <name>',
    },
}


def usage(command):
    """Return the help text for a command.

    command is either a single command name (str) or a sequence of up to
    two words: the command and an optional subcommand. A leading '!' on
    the command name is ignored. An empty sequence yields the overview of
    all commands. The argument is never modified.
    """
    # Work on a copy so stripping the '!' does not alter the caller's list.
    words = [command] if isinstance(command, str) else list(command)
    if words and words[0].startswith('!'):
        words[0] = words[0][1:]

    if not words:
        return _USAGE_OVERVIEW

    if len(words) == 1:
        name = words[0]
        if name in _USAGE_PLAIN_COMMANDS:
            return 'Usage: !%s' % name
        if name in _USAGE_COMMANDS:
            return _USAGE_COMMANDS[name]
        return 'No such command !%s' % name

    if len(words) == 2:
        name, sub = words
        text = _USAGE_SUBCOMMANDS.get(name, {}).get(sub)
        if text is None:
            return 'No such subcommand !%s %s' % (name, sub)
        return text

    return 'Uh, how did we get %i args?' % len(words)
def parse_command(message, nick, irc):
    """Interpret one chat line from nick and emit the matching game event.

    The line is split on whitespace. Recognised !commands are checked for
    their argument count and forwarded to the game loop via send_event();
    a wrong argument count or an unknown subcommand is answered with the
    usage text instead. A line made up solely of card choices (numbers or
    comma-separated lists of numbers) is handled like !card without the
    prefix. Any other line is ignored.
    """
    events = gameloop.events
    words = message.split()
    if not words:
        return
    command = words[0]

    def reply(text):
        # Answers always go to the configured game channel.
        irc.bot_response(irc_chan, text)

    def take_args(allowed, skip=1):
        """Return the words after the first `skip` ones.

        allowed is an int or a list of ints naming the acceptable argument
        counts. On a mismatch the usage text for the leading words is sent
        and None is returned.
        """
        counts = [allowed] if type(allowed) == int else allowed
        if len(words) - skip in counts:
            return words[skip:]
        reply(usage(words[:skip]))
        return None

    def is_choice(word):
        # One number, or several numbers separated by commas.
        return re.fullmatch(r'\d+(,\d+)*', word)

    # Commands without arguments; the event tuple is built lazily, only
    # after the argument count has been validated.
    no_arg_events = {
        '!status': lambda: (events.status,),
        '!ready': lambda: (events.ready, nick),
        '!unready': lambda: (events.unready, nick),
        '!kill': lambda: (events.kill,),
        '!leave': lambda: (events.leave, nick),
        '!players': lambda: (events.players,),
        '!cards': lambda: (events.cards, nick),
        '!origins': lambda: (events.origins, nick),
        '!redeal': lambda: (events.redeal, nick),
    }
    if command in no_arg_events:
        if take_args(0) is not None:
            send_event(no_arg_events[command]())
        return

    if command == '!start':
        args = take_args([0, 1])
        if args is not None:
            # Zero or one argument: the optional preset is appended as-is.
            send_event((events.start, nick, *args))

    elif command == '!join':
        join_words = words[1:]
        if join_words:
            send_event((events.join, nick, ' '.join(join_words)))
        else:
            send_event((events.join, nick))

    elif command == '!kick':
        args = take_args(1)
        if args is not None:
            send_event((events.kick, nick, args[0]))

    elif command == '!deck':
        if len(words) < 2:
            reply(usage('!deck'))
            return
        subcommand = words[1]
        if subcommand == 'add':
            args = take_args(1, 2)
            if args is None:
                return
            code = args[0]
            if code == 'random':
                send_event((events.deck_add_random,))
            else:
                send_event((events.deck_add, code))
        elif subcommand == 'remove':
            args = take_args(1, 2)
            if args is not None:
                send_event((events.deck_remove, args[0]))
        elif subcommand == 'list':
            if take_args(0, 2) is not None:
                send_event((events.deck_list,))
        else:
            reply(usage('!deck'))

    elif command == '!bot':
        if len(words) < 2:
            reply(usage('!bot'))
            return
        subcommand = words[1]
        if subcommand == 'add':
            args = take_args([1, 2], 2)
            if args is None:
                return
            # With a single argument the bot is named after its type.
            bot_type, name = args[0], args[-1]
            if bot_type == 'rando':
                send_event((events.bot_add_rando, name))
            else:
                reply('Allowed bot types: rando')
        elif subcommand == 'remove':
            args = take_args(1, 2)
            if args is not None:
                send_event((events.bot_remove, args[0]))
        else:
            reply(usage('!bot'))

    elif command == '!limit':
        args = take_args([0, 1, 2])
        if args is None:
            return
        if not args:
            send_event((events.limit,))
            return
        if not args[0].isdecimal():
            reply(usage('!limit'))
            return
        amount = int(args[0])
        # A bare number is a points limit.
        kind = args[1] if len(args) == 2 else 'points'
        if kind in ('p', 'points'):
            send_event((events.limit, gameloop.limit_types.points, amount))
        elif kind in ('r', 'rounds'):
            send_event((events.limit, gameloop.limit_types.rounds, amount))
        else:
            reply('Allowed limit types: p(oints), r(ounds)')

    elif command == '!card' or all(is_choice(word) for word in words):
        picks = words[1:] if command == '!card' else words
        if not all(is_choice(word) for word in picks):
            reply(usage('!card'))
            return
        choices = []
        for word in picks:
            # A comma-separated list means "pick one of these at random".
            if not word.isdecimal():
                word = random.choice(word.split(','))
            choices.append(int(word))
        send_event((events.card, nick, choices))

    elif command == '!help':
        args = take_args([0, 1, 2])
        if args is not None:
            reply(usage(args))
# initialize(*, config)
# Called to initialize the IRC bot
# Runs before even the logger is brought up, and blocks further bringup until it's done
# config is a configparser.ConfigParser object containing the contents of bot.conf
def initialize(*, config):
    """Read the bot settings this module needs from the parsed config.

    Stores the NickServ password in nickserv_pass and the first entry of
    the whitespace-separated server channel list, encoded to bytes, in
    irc_chan.
    """
    global nickserv_pass, irc_chan

    nickserv_pass = config['nickserv']['password']

    channel_names = config['server']['channels'].split()
    # Only the first listed channel is used as the game channel.
    irc_chan = channel_names[0].encode()
# on_connect(*, irc)
# Called after the IRC bot has connected and sent the USER/NICK commands but not yet attempted anything else
# Called for every reconnect
# Blocks the bot until it's done, including PING/PONG handling
# irc is the IRC API object
def on_connect(*, irc):
    """Identify with NickServ if a password is configured, then restart the game loop."""
    if nickserv_pass != '':
        identify = b'IDENTIFY ' + nickserv_pass.encode()
        irc.msg(b'nickserv', identify)
        # NOTE(review): the wait is assumed to belong to the identify
        # branch (giving NickServ time to answer) — confirm.
        time.sleep(30)  # One day I will do this correctly. Today is not the day

    # Called for every reconnect, so drop any previous loop before starting anew.
    stop_gameloop()
    start_gameloop(irc)
# on_quit(*, irc)
# Called just before IRC bot sends QUIT
# Blocks the bot until it's done, including PING/PONG handling
# irc is the IRC API object
def on_quit(*, irc):
    """Stop the game loop before the bot sends QUIT; irc is not used."""
    stop_gameloop()
# handle_message(*, prefix, message, nick, channel, irc)
# Called for PRIVMSGs.
# prefix is the prefix at the start of the message, without the leading ':'
# message is the contents of the message
# nick is who sent the message
# channel is where you should send the response (note: in queries nick == channel)
# irc is the IRC API object
# All strings are bytestrings
def handle_message(*, prefix, message, nick, channel, irc):
    """Feed PRIVMSGs from the game channel to the command parser.

    All string arguments are bytes. Messages whose target is not the
    configured game channel are ignored. Text is decoded as UTF-8 with
    undecodable bytes replaced, so arbitrary bytes from the network cannot
    raise UnicodeDecodeError here.
    """
    if channel != irc_chan:
        return

    text = message.decode('utf-8', errors='replace')
    sender = nick.decode('utf-8', errors='replace')
    parse_command(text, sender, irc)
# handle_nonmessage(*, prefix, command, arguments, irc)
# Called for all other commands than PINGs and PRIVMSGs.
# prefix is the prefix at the start of the message, without the leading ':'
# command is the command or number code
# arguments is rest of the arguments of the command, represented as a list. ':'-arguments are handled automatically
# irc is the IRC API object
# All strings are bytestrings
def handle_nonmessage(*, prefix, command, arguments, irc):
    """Translate NICK/PART/QUIT/KICK into game events.

    All string arguments are bytes; arguments is a list. A NICK produces a
    nick_change event. A QUIT, or a PART/KICK whose first argument is the
    game channel, produces a leave event for the affected nick; PART and
    KICK in any other channel are ignored, mirroring the channel check in
    handle_message. Other commands are ignored.
    """
    def decode(raw):
        # Tolerate arbitrary bytes from the network instead of raising.
        return raw.decode('utf-8', errors='replace')

    def sender():
        # The nick is the part of the prefix before '!'.
        return decode(prefix.split(b'!')[0])

    def in_game_channel():
        return len(arguments) > 0 and arguments[0] == irc_chan

    if command == b'NICK':
        send_event((gameloop.events.nick_change, sender(), decode(arguments[0])))
    elif command == b'QUIT':
        send_event((gameloop.events.leave, sender()))
    elif command == b'PART':
        if in_game_channel():
            send_event((gameloop.events.leave, sender()))
    elif command == b'KICK':
        # arguments: channel, kicked nick, [reason]
        if in_game_channel():
            send_event((gameloop.events.leave, decode(arguments[1])))