#!/usr/bin/env python

# library imports
import asynchat
import asyncore
import socket
import threading
import time
from collections import deque


class TwitchBot(asynchat.async_chat):
    """Protocol-handling base class for a Twitch.tv chat bot.

    TwitchBot connects to a Twitch.tv channel chat, parses the incoming
    chat activity and handles the IRC protocol (login, join confirmation,
    PING/PONG keep-alive).  It also runs a producer-consumer thread that
    limits the bot's outgoing messages to at most one every
    MESSAGE_INTERVAL seconds (3.1 by default) to avoid losing messages to
    the Twitch.tv chat spam filter.

    This bot is mainly meant to be subclassed: subclasses override
    do_command() to react to chat messages and call say() to answer.
    TwitchBot subclasses asynchat.async_chat.

    Running an instance of TwitchBot requires a twitch account name, a
    password for that account, and a channel name.

    Keyword arguments:
    verbose: a boolean value controlling print statements for debugging
    and possible logging purposes. Defaults True.
    test: a boolean value that controls whether output should be made to
    sys.stdout or to the twitch channel chat. Defaults False.
    """

    # Minimum number of seconds between two outgoing chat messages.
    # Subclasses may override this to change the throttle.
    MESSAGE_INTERVAL = 3.1

    def __init__(self, nickname, passw, stream, verbose=True, test=False):
        asynchat.async_chat.__init__(self)
        self.set_terminator(b'\n')
        self.received = []
        self.verbose_m = verbose
        self.joined = False
        self.test_m = test
        self.nick = nickname
        self.pw = passw
        self.streamName = stream
        self.channel = '#' + stream
        # NOTE(review): per-channel jtvirc.com hosts look like a legacy
        # endpoint; confirm against the current Twitch IRC documentation
        # and pass host= to run() if this name no longer resolves.
        self.host = "%s.jtvirc.com" % (self.streamName)
        # Serialises initiate_send() between the asyncore loop and the
        # message dispatch thread.
        self.sending = threading.RLock()

        # prepare/start producer/consumer message processing
        self.msg_queue = deque()
        self.msg_lock = threading.Condition()
        self.speaker_continue = True
        self.speaker_thread = threading.Thread(target=self.dispatch_message)
        self.speaker_thread.start()

    def run(self, host=None, port=6667):
        """Handles connecting to the Twitch.tv chat server.

        Blocks inside the asyncore loop until the loop ends.

        Keyword arguments:
        host: host name of the server. Defaults to the name computed from
        the stream name given on creation.
        port: port to connect to. Defaults to 6667.
        """
        if host is None:
            host = self.host
        self.__connect_to_chat(host, port)

    def __connect_to_chat(self, host, port):
        """Create a socket, connect to the chat and run the asyncore loop.

        Raises SystemExit(0) when the loop is interrupted from the
        keyboard.
        """
        if self.verbose_m:
            print('Connecting to %s:%s . . .' % (host, port))
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, port))
        try:
            asyncore.loop()
        except KeyboardInterrupt:
            # The dispatch thread is not a daemon: unless it is told to
            # stop, the interpreter would wait for it forever on exit.
            self._stop_speaker()
            raise SystemExit(0)

    def handle_connect(self):
        """Sends the proper IRC formatted messages required to join the
        Twitch.tv channel chat (password, nickname, then channel join)."""
        self.push(bytes("Pass %s\r\n" % (self.pw), 'UTF-8'))
        self.push(bytes("NICK %s\r\n" % (self.nick), 'UTF-8'))
        self.push(bytes("JOIN %s\r\n" % (self.channel), 'UTF-8'))

    def collect_incoming_data(self, data):
        """Buffers a chunk of incoming chat data until the line terminator
        is seen."""
        self.received.append(data)

    def initiate_send(self):
        """Attempts to make the async_chat method of the same name thread
        safe by running it under the ``sending`` lock."""
        # ``with`` guarantees the lock is released even if sending raises.
        with self.sending:
            asynchat.async_chat.initiate_send(self)

    @staticmethod
    def _split_irc_line(line):
        """Split one IRC line into ``(prefix, command, params)``.

        ``prefix`` is the sender part without the leading ':' ('' when
        absent), ``command`` is upper-cased, and ``params`` is a list whose
        last element is the trailing (" :"-introduced) parameter when the
        line has one.  A leading '@tags' section, if present, is skipped.
        """
        if line.startswith('@'):
            line = line.partition(' ')[2].lstrip(' ')
        prefix = ''
        if line.startswith(':'):
            prefix, _, line = line[1:].partition(' ')
        # Only the FIRST " :" starts the trailing parameter; any later
        # " :" (e.g. a ":)" smiley) belongs to the message text.
        head, has_trailing, trailing = line.partition(' :')
        params = head.split()
        if has_trailing:
            params.append(trailing)
        command = params.pop(0).upper() if params else ''
        return prefix, command, params

    def found_terminator(self):
        """Handle one complete received line.

        Recognises the bot's own JOIN (which marks the chat as joined),
        answers the server's periodic PING with the matching PONG, and
        forwards chat messages (PRIVMSG) to do_command() as
        ``(nick, message)``.
        """
        ircmsg = self.__parse_data()
        if self.verbose_m:
            print(ircmsg)

        prefix, command, params = self._split_irc_line(ircmsg)
        nick = prefix.partition('!')[0]

        if command == 'JOIN':
            # check that the bot has joined the chat
            if not self.joined and params:
                chat = params[0].lstrip('#')
                if (chat.lower() == self.streamName.lower()
                        and nick.lower() == self.nick.lower()):
                    self.joined = True
                    if self.verbose_m:
                        print('joined %s chat successfully'
                              % (self.streamName))
        elif command == 'PING':
            # A PING must be answered with PONG echoing the same payload.
            if params:
                reply = 'PONG :%s\r\n' % (params[-1])
            else:
                reply = 'PONG\r\n'
            self.push(bytes(reply, 'UTF-8'))
        elif command == 'PRIVMSG' and len(params) >= 2:
            self.do_command(nick, params[-1])

    def __parse_data(self):
        """Joins the buffered chunks into one IRC line, clears the buffer
        and decodes the line (UTF-8, falling back to ISO-8859-1)."""
        data = b''.join(self.received).rstrip(b'\r')
        del self.received[:]
        try:
            ircmsg = data.decode('utf-8')
        except UnicodeDecodeError:
            # iso-8859-1 maps every byte, so this fallback cannot fail.
            ircmsg = data.decode('iso-8859-1')
        return ircmsg

    def do_command(self, nick, msg):
        """Called for every chat message with the sender's nick and the
        message text. Does nothing here; implemented by subclasses."""
        pass  # implemented by subclasses

    def say(self, msg):
        """Produces a message that, when consumed, will be pushed to the
        chat by adding the message to a queue.

        The message is silently dropped unless the bot has joined the chat
        or runs in test mode.
        """
        if self.joined or self.test_m:
            with self.msg_lock:
                self.msg_queue.append(msg)
                self.msg_lock.notify()

    def __message_available(self):
        """Returns True when at least one message is waiting in the
        queue."""
        with self.msg_lock:
            return bool(self.msg_queue)

    def _stop_speaker(self):
        """Ask the dispatch thread to stop and wake it if it is waiting."""
        with self.msg_lock:
            self.speaker_continue = False
            self.msg_lock.notify_all()

    def dispatch_message(self):
        """This function is run exclusively by an outside thread.

        Consumes an available message in the message queue (printing it in
        test mode, pushing it to the chat otherwise) and then sleeps for
        MESSAGE_INTERVAL seconds.  Returns once a stop has been requested.
        """
        while self.speaker_continue:
            with self.msg_lock:
                # Wake up for a new message OR for a stop request, so a
                # stop can never leave this thread blocked forever.
                self.msg_lock.wait_for(
                    lambda: (self.__message_available()
                             or not self.speaker_continue))
                if not self.msg_queue:
                    break  # woken by a stop request with nothing to send
                message = self.msg_queue.popleft()
            line = " PRIVMSG {!s} : {!s}\r\n".format(self.channel, message)
            if self.test_m:
                print(line)
            else:
                self.push(bytes(line, 'UTF-8'))
            # Throttle: wait out the interval, but return early when a
            # stop is requested so terminate() is not held up.
            with self.msg_lock:
                self.msg_lock.wait_for(
                    lambda: not self.speaker_continue,
                    timeout=self.MESSAGE_INTERVAL)

    def terminate(self):
        """Attempt to safely terminate the bot by waiting for the consumer
        thread to close, then close the connection once pending output has
        been sent."""
        if self.verbose_m:
            print('preparing to exit (closing message dispatch thread)...')
        # Queue the sign-off first (it is dropped when the bot never
        # joined), then explicitly signal the stop so join() cannot
        # deadlock on an empty queue.
        self.say("vespabot, signing off")
        self._stop_speaker()
        self.speaker_thread.join()
        if self.verbose_m:
            print('exiting')
        self.close_when_done()