modulecore.py

Sat, 11 Apr 2015 21:02:54 +0300

author
Teemu Piippo <crimsondusk64@gmail.com>
date
Sat, 11 Apr 2015 21:02:54 +0300
changeset 122
f899af683bbe
parent 121
ac07779f788d
child 124
7b2cd8b1ba86
permissions
-rw-r--r--

- fixed a derp in commitsdb

import os
import re
import time
from configfile import Config

# Loaded command modules, keyed by module name (the file name without '.py')
Modules = {}
# Command descriptions, keyed by command name
Commands = {}
# Registered hooks: hook name -> list of {'name': ..., 'func': ...} entries
Hooks = {}

#
# CommandError
#
# Raised to abort a command. The value describes what went wrong and is
# what str() of the exception yields.
#
class CommandError (Exception):
	def __init__ (self, value):
		self.value = value

	def __str__ (self):
		# __str__ has to return a string even when the stored value is not one
		# (for instance a number or another exception object).
		return str (self.value)

#
# init_data()
#
# Initializes command module data: imports every mod_*.py file found in the
# current directory and fills in the Modules, Commands and Hooks tables from
# each module's ModuleData dictionary.
#
def init_data():
	numHooks = 0

	for fn in os.listdir ('.'):
		# Only files named mod_*.py are command modules
		if fn[0:4] != 'mod_' or fn[-3:] != '.py':
			continue

		# Strip the '.py' suffix to get the importable module name
		fn = fn[0:-3]
		module = __import__ (fn)
		Modules[fn] = module

		for cmd in module.ModuleData['commands']:
			if cmd['args'] is None:
				cmd['args'] = ''

			cmd['module'] = module
			cmd['regex'] = make_regex (cmd['args'])
			cmd['argnames'] = []
			Commands[cmd['name']] = cmd

			for arg in cmd['args'].split (' '):
				if arg == '':
					continue

				# Only <mandatory> and [optional] tokens get a capture group in
				# the regex built by make_regex. Literal tokens must not be
				# recorded as argument names, or the names would no longer line
				# up with the groups of the match.
				isOptional = arg[0] == '[' and arg[-1] == ']'
				isMandatory = arg[0] == '<' and arg[-1] == '>'

				if not (isOptional or isMandatory):
					continue

				argname = arg[1:-1]

				# A trailing '...' marks a variadic argument and is not part
				# of its name
				if argname[-3:] == '...':
					argname = argname[0:-3]

				if argname == '':
					continue

				cmd['argnames'].append (argname)

		if 'hooks' in module.ModuleData:
			for key, hooks in module.ModuleData['hooks'].iteritems():
				for hook in hooks:
					# Each hook is named after the module function implementing it
					Hooks.setdefault (key, []).append (
						{'name': hook, 'func': getattr (module, hook)})
					numHooks += 1

		print ("Loaded module %s" % fn)

	print ('Loaded %d commands and %d hooks in %d modules' %
		(len (Commands), numHooks, len (Modules)))

#
# command_error (message)
#
# Aborts the current command by raising a CommandError carrying the message
#
def command_error (message):
	failure = CommandError (message)
	raise failure

#
# is_available (cmd, ident, host)
#
# Tells whether the user identified by ident@host may use the given command.
# Commands whose level is 'admin' are restricted to the users listed in the
# 'admins' configuration value; every other command is open to anyone.
#
def is_available (cmd, ident, host):
	if cmd['level'] != 'admin':
		return True

	hostmask = "%s@%s" % (ident, host)
	return hostmask in Config.get_value ('admins', default=[])

#
# Response paging and confirmation state
#
# Replies are queued into pages by response_function and sent one page at a
# time by print_responses.
#
# List of pages; each page is a list of queued reply lines
g_responsePages = [[]]
# Index of the next page print_responses will send
g_responsePageNum = 0
# time.time() stamp taken when the latest confirmation prompt was issued
g_lastConfirm = 0
# Command object waiting for a .yes/.no answer, or None when nothing is pending
g_confirmCommand = None

#
# Confirmxeption
#
# Raised by a command (through confirm_function) to ask the caller for a
# confirmation. Carries the id of the request and the question to show.
#
class Confirmxeption (Exception):
	def __init__ (self, id, value):
		self.id = id
		self.message = value

	def __str__ (self):
		# __str__ has to return a string even when the question is not one
		return str (self.message)

#
# response_function (message)
#
# Queues a reply line. A page takes up to five lines; once the last page is
# full the message starts a new one.
#
def response_function (message):
	lastPage = g_responsePages[-1]

	if len (lastPage) <= 4:
		lastPage.append (message)
	else:
		g_responsePages.append ([message])

#
# confirm_function (id, message)
#
# Requests a confirmation by raising a Confirmxeption with the given id
# and question
#
def confirm_function (id, message):
	request = Confirmxeption (id, message)
	raise request

#
# print_responses (commandObject)
#
# Sends the current page of queued replies to commandObject['replyto'] using
# commandObject['bot'], then moves on to the next page.
#
def print_responses (commandObject):
	global g_responsePageNum
	bot = commandObject['bot']
	target = commandObject['replyto']

	# Every page has been shown already
	if g_responsePageNum >= len (g_responsePages):
		bot.privmsg (target, "No more messages.")
		return

	currentPage = g_responsePages[g_responsePageNum]

	for line in currentPage:
		bot.privmsg (target, line)

	g_responsePageNum += 1
	remaining = len (g_responsePages) - g_responsePageNum

	# Tell the user when there is still output left to page through
	if remaining != 0:
		plural = '' if remaining == 1 else 's'
		bot.privmsg (target, "%d more page%s, use .more to continue output" \
			% (remaining, plural))

#
# check_same_caller (comm1, comm2)
#
# Tells whether two command objects come from the same person: same bot name,
# sender, ident and host.
#
def check_same_caller (comm1, comm2):
	if not (comm1['bot'].name == comm2['bot'].name):
		return False

	for field in ('sender', 'ident', 'host'):
		if not (comm1[field] == comm2[field]):
			return False

	return True

#
# exec_command (commandObject)
#
# Runs the cmd_<cmdname> function of the command's module, passing the command
# object as keyword arguments. A confirmation request from the command is
# stored as the pending confirm; any other failure becomes a CommandError.
#
def exec_command (commandObject):
	global g_lastConfirm
	global g_confirmCommand
	cmdname = commandObject['cmdname']

	try:
		handler = getattr (commandObject['module'], 'cmd_' + cmdname)
	except AttributeError:
		command_error ('command "%s" is not defined!' % cmdname)

	try:
		handler (**commandObject)
	except Confirmxeption as request:
		# Only one confirmation may be pending within a 15 second window
		recent = time.time() - g_lastConfirm < 15
		pending = g_confirmCommand is not None

		if recent and pending:
			command_error ('''another confirm is underway''')

		g_lastConfirm = time.time()
		response_function (str (request) + ' (.yes/.no)')

		# Remember the command so that it can be re-run once confirmed
		commandObject['confirmed'] = request.id
		g_confirmCommand = commandObject
	except Exception as failure:
		command_error (str (failure))

#
# call_command (bot, message, cmdname, **kvargs)
#
# Calls a cobalt command: checks that the caller may use it, parses the
# arguments out of the message, builds the command object, runs the command
# and sends the first page of its replies. Unknown commands are ignored.
#
def call_command (bot, message, cmdname, **kvargs):
	global g_responsePages
	global g_responsePageNum

	if cmdname not in Commands:
		return

	cmd = Commands[cmdname]

	if not is_available (cmd, kvargs['ident'], kvargs['host']):
		command_error ("you may not use %s" % cmdname)

	match = re.match (cmd['regex'], message)

	if match is None:
		# the message does not fit the argument pattern
		command_error ('invalid arguments\nusage: %s %s' % (cmd['name'], cmd['args']))

	# .more is the interface to the page system and must leave the pages
	# alone; any other command starts from a clean slate.
	isMore = (cmdname == 'more')

	if not isMore:
		g_responsePages = [[]]
		g_responsePageNum = 0

	# Capture groups are numbered from 1, in the order of the argument names
	args = {}

	for index, argname in enumerate (cmd['argnames'], 1):
		args[argname] = match.group (index)

	print ("ModuleCore: %s called by %s" % (cmdname, kvargs['sender']))

	# The keyword arguments double as the command object handed to the command
	commandObject = kvargs
	commandObject.update ({
		'bot': bot,
		'cmdname': cmdname,
		'args': args,
		'reply': response_function,
		'confirm': confirm_function,
		'confirmed': 0,
		'info': cmd,
		'module': cmd['module'],
		'error': command_error,
	})
	commandObject['commandObject'] = commandObject
	exec_command (commandObject)

	# Send the first page of responses.
	if not isMore:
		print_responses (commandObject)

#
# call_hook (bot, hookname, **kvargs)
#
# Calls every function registered for the given hook, passing the bot and the
# keyword arguments. When the arguments include 'replyto', the hooks also get
# a 'reply' function and the first page of their replies is sent afterwards.
#
def call_hook (bot, hookname, **kvargs):
	global g_responsePages
	global g_responsePageNum
	hookObject = kvargs
	hookObject['bot'] = bot

	# Hooks start from an empty set of response pages
	g_responsePages = [[]]
	g_responsePageNum = 0
	canReply = 'replyto' in hookObject

	if canReply:
		hookObject['reply'] = response_function

	for hook in Hooks.get (hookname, []):
		hook['func'] (**hookObject)

	# print_responses needs a 'replyto' target; without one there is nowhere
	# to send output, so skip it instead of failing on the missing key.
	if canReply:
		print_responses (hookObject)

#
# confirm (cmd, yes)
#
# Answers the pending confirmation. Only the person who issued the pending
# command may answer; on yes the command is run again, otherwise it is dropped.
#
def confirm (cmd, yes):
	global g_confirmCommand
	pending = g_confirmCommand

	if pending is None:
		cmd['reply'] ('%s to what?' % cmd['cmdname'])
		return

	# Answers from anybody else are ignored and leave the confirm pending
	if check_same_caller (cmd, pending):
		if yes:
			exec_command (pending)
		else:
			cmd['reply'] ('okay then')

		g_confirmCommand = None

#
# get_available_commands (ident, host)
#
# Returns the names of the commands the user ident@host is allowed to use
#
def get_available_commands (ident, host):
	return [cmdname for cmdname, cmd in Commands.iteritems()
		if is_available (cmd, ident, host)]

#
# get_command_by_name (name)
#
# Returns the command registered under the given name, or None if there is
# no such command
#
def get_command_by_name (name):
	try:
		return Commands[name]
	except (KeyError, TypeError):
		# Unknown name, or a value that cannot be a dictionary key at all.
		# Anything else (e.g. KeyboardInterrupt) is left to propagate.
		return None

#
# make_regex (arglist)
#
# Takes the argument list and returns a corresponding regular expression.
#
# The argument list is a space-separated string of tokens:
#   <name>     mandatory argument, captured as one word
#   [name]     optional argument, captured as one word (possibly empty)
#   <name...>  variadic argument capturing the rest of the line; '[name...]'
#   [name...]  is the optional form. Must be the last token.
#   anything else is a literal and is put into the pattern as-is, without a
#   capture group.
#
# The pattern starts with a match for the command word itself. Raises
# CommandError if a mandatory argument follows an optional one or if a
# variadic argument is not last. A None argument list yields '^.+$'.
#
def make_regex (arglist):
	if arglist is None:
		return '^.+$'

	gotoptional = False
	gotvariadic = False
	regex = ''

	for arg in arglist.split (' '):
		# Empty tokens come from leading, trailing or repeated spaces. Skip
		# them before the variadic check so that stray whitespace after a
		# variadic argument is not mistaken for another argument.
		if arg == '':
			continue

		if gotvariadic:
			raise CommandError ('variadic argument is not last')

		gotliteral = False

		if arg[0] == '[' and arg[-1] == ']':
			arg = arg[1:-1]
			# Sticky: everything after the first optional argument is
			# separated by optional whitespace
			gotoptional = True
		elif arg[0] == '<' and arg[-1] == '>':
			if gotoptional:
				raise CommandError ('mandatory argument after optional one')

			arg = arg[1:-1]
		else:
			gotliteral = True

		if arg[-3:] == '...':
			gotvariadic = True
			arg = arg[0:-3]

		# Whitespace is required before mandatory parts, optional otherwise
		regex += r'\s*' if gotoptional else r'\s+'

		if gotliteral:
			regex += arg
		elif gotvariadic:
			regex += r'(.*)' if gotoptional else r'(.+)'
		else:
			regex += r'([^ ]*)' if gotoptional else r'([^ ]+)'

	# A variadic argument already swallows trailing whitespace
	if not gotvariadic:
		regex += r'\s*'

	return '^[^ ]+%s$' % regex

mercurial