# Import Dependencies
import os

import discord
import pymongo
from discord.ext import commands, tasks

# Set Intents
intents = discord.Intents.all()
intents.typing = True
intents.presences = True
intents.members = True
caches = discord.MemberCacheFlags.all()

# Set Prefix
p = '¬'

# Define Global State Dictionary
# Maps str(guild.id) -> {'settings': {...}}; filled by dbGetSettings() and
# defineBotrole.
state = {}

# Create Clients
dbClient = pymongo.MongoClient(
    host='geasbot-db',
    username=os.environ['MONGO_INITDB_ROOT_USERNAME'],
    password=os.environ['MONGO_INITDB_ROOT_PASSWORD'],
    authsource='admin',
    serverSelectionTimeoutMS=1000,
)
client = commands.Bot(
    command_prefix=p,
    description=f'Geas Server Bot v {os.getenv("BOT_VERSION")}. This is a bot to facilitate setting up game channels on the Geas server. The prefix for the bot is {p}. You can interact with and manipulate game channels that have been created with this bot by @-mentioning the relevant role associated with the game.',
    intents=intents,
)

# Define Game Times Dictionary
# Keys are the time codes typed by users; values are the prefixes used in
# role/category names and as MongoDB collection names.
gameTimes = {
    'wed': 'WED',
    'sunaft': 'SUN AFT',
    'suneve': 'SUN EVE',
    'oneshot': 'ONE SHOT'
}


# Reference Time Codes
def gameTime(arg):
    """Return the time-slot name for time code *arg*, or 'OTHER' if unknown."""
    return gameTimes.get(arg, 'OTHER')


# List Time Codes
def timeSlotList():
    """Return every time-slot name, with the catch-all 'OTHER' last."""
    return [*gameTimes.values(), 'OTHER']


# Time Slot Prefix of a Role/Category Name
def _slotPrefix(name):
    """Return the part of *name* before the first ': ' separator."""
    return name.split(': ', maxsplit=1)[0]


# Split a Game Role/Category Name
def _splitGameName(name):
    """Split '<SLOT>: <game>' into (slot, game).

    Returns None when *name* has no ': ' separator or its prefix is not a
    known time slot, so callers never index past the end of a split.
    """
    slot, sep, game = name.partition(': ')
    if not sep or slot not in timeSlotList():
        return None
    return slot, game


# Find Role/Category by Exact Name
def _findByName(items, name):
    """Return the first element of *items* whose .name equals *name*, else None."""
    return next((item for item in items if item.name == name), None)


# Find the GM of a Category
def _findGM(category):
    """Return the first member overwrite on *category* that grants
    manage_channels, or None when there is no such member."""
    for target, overwrite in category.overwrites.items():
        if isinstance(target, discord.Member) and overwrite.manage_channels:
            return target
    return None


# Resolve a Role Mention
def _roleFromMention(guild, arg):
    """Resolve a '<@&id>' mention string to a role of *guild*.

    Raises commands.CommandError when *arg* is not a role mention or the
    role cannot be found in the guild.
    """
    if not arg.startswith('<@&'):
        raise commands.CommandError('Invalid argument. The argument must @ a role.')
    try:
        role = guild.get_role(int(arg[3:-1]))
    except ValueError:
        role = None
    if role is None:
        raise commands.CommandError('Invalid argument. The argument must @ a role.')
    return role


# Delete a Discord Object, Ignoring "Already Gone"
async def _deleteQuietly(obj):
    """Delete *obj*, tolerating the case where it was already deleted (the
    local cache can lag behind deletions made moments earlier)."""
    try:
        await obj.delete()
    except discord.NotFound:
        pass


# Lookup Game Time Slot
def dbFindTimeslot(guild, role):
    """Return the name of the collection (time slot) that holds *role*.

    Falls back to the prefix of the role's name when the database has no
    record for the role or cannot be queried.

    Raises commands.CommandError when the role's name does not start with a
    known time slot.
    """
    db = dbClient[str(guild.id)]
    slot = _slotPrefix(role.name)
    if slot not in timeSlotList():
        raise commands.CommandError(f'Invalid lookup value: {role.mention} is not a valid game role.')
    try:
        for c in db.list_collection_names():
            if c != 'settings' and db[c].find_one({'role': role.id}) is not None:
                return c
    except Exception as err:
        print('Failed to look up time slot in database due to error: ', err)
    # No record (or no database): trust the role's name instead of
    # implicitly returning None.
    return slot


# Lookup Category from Role
def dbLookupRole(guild, role):
    """Return the category channel belonging to game role *role*.

    The category ID stored in the database is tried first; if that is
    missing or stale, the category with the same name as the role is used.

    Raises commands.CommandError when no category can be matched.
    """
    db = dbClient[str(guild.id)]
    colName = dbFindTimeslot(guild, role)
    try:
        record = db[colName].find_one({'role': role.id})
    except Exception as err:
        print('Failed to look up category in database due to error: ', err)
        record = None
    catID = record.get('category') if record else None
    # get_channel returns None (it does not raise) for unknown IDs.
    category = guild.get_channel(catID) if catID is not None else None
    if category is None:
        category = _findByName(guild.categories, role.name)
    if category is None:
        raise commands.CommandError('Error: The game\'s corresponding category cannot be matched.')
    return category


# Get Settings from DB
def dbGetSettings():
    """Load each guild database's settings document (_id 0) into ``state``.

    Databases without a settings document are skipped so one unconfigured
    guild does not stop the others from loading.
    """
    try:
        for db in dbClient.list_database_names():
            if db in ('admin', 'config', 'local'):
                continue
            settings = dbClient[db]['settings'].find_one({'_id': 0})
            if settings is None:
                continue
            state[db] = {
                'settings': settings.copy()
            }
        print('Imported settings from database to local state.', state)
    except Exception as err:
        print('Failed to get Settings from Database due to error: ', err)


# Get list of game role IDs on server
def gameRoleIDList(guild):
    """Return the IDs of all guild roles whose name starts with a time slot."""
    slots = timeSlotList()
    return [r.id for r in guild.roles if _slotPrefix(r.name) in slots]


# Collect One Field from Every Game Record in DB
def _dbFieldList(guild, field):
    """Return *field* from every game record of *guild* across all time-slot
    collections, or None when the database cannot be read or a record lacks
    the field."""
    db = dbClient[str(guild.id)]
    try:
        return [e[field] for ts in timeSlotList() for e in db[ts].find()]
    except Exception as err:
        print(f'Failed to read {field} list from database due to error: ', err)
        return None


# Get list of game role IDs in DB
def gameRoleDBList(guild):
    """Return the role IDs of all games in the database (None on failure)."""
    return _dbFieldList(guild, 'role')


# Get list of game category IDs in DB
def gameCategoryDBList(guild):
    """Return the category IDs of all games in the database (None on failure)."""
    return _dbFieldList(guild, 'category')


# Get list of GM IDs in DB
def gmDBList(guild):
    """Return the GM IDs of all games in the database (None on failure)."""
    return _dbFieldList(guild, 'gm')


# Sync Games on Server
def syncGames(guild):
    """Bring the database for *guild* in line with its roles and categories.

    1. Delete game records whose role no longer exists on the server.
    2. Add a record (capacity 5) for every game role missing from the
       database, provided a same-named category and its GM can be found.
    3. For each game category, fill in the category ID and the GM on the
       record of the same-named role.

    Errors are reported on the console; the sync is best-effort.
    """
    db = dbClient[str(guild.id)]
    try:
        # 1. Prune records of deleted roles. Stale IDs are collected first
        #    so the collection is not modified while its cursor is read.
        liveRoleIDs = set(gameRoleIDList(guild))
        for ts in timeSlotList():
            stale = [e['role'] for e in db[ts].find() if e['role'] not in liveRoleIDs]
            for roleID in stale:
                db[ts].delete_many({'role': roleID})

        # 2. Add game roles that are not in the database yet.
        knownRoles = gameRoleDBList(guild)
        if knownRoles is None:
            print(f'Could not sync database for server {guild.name}: role list unavailable.')
            return
        for r in guild.roles:
            parsed = _splitGameName(r.name)
            if parsed is None or r.id in knownRoles:
                continue
            colName, gameName = parsed
            c = _findByName(guild.categories, r.name)
            gmMember = _findGM(c) if c is not None else None
            if c is None or gmMember is None:
                # Without a matching category and GM the record would hold
                # an unrelated category or a non-member ID; skip instead.
                print(f'Skipped syncing {r.name}: no matching category or GM found.')
                continue
            g = {
                'game': gameName,
                'gm': gmMember.id,
                'category': c.id,
                'capacity': 5,
                'role': r.id
            }
            db[colName].replace_one({'role': g['role']}, g, upsert=True)

        # 3. Fill in category and GM on existing records.
        knownCategories = gameCategoryDBList(guild)
        if knownCategories is None:
            print(f'Could not sync database for server {guild.name}: category list unavailable.')
            return
        for c in guild.categories:
            if _slotPrefix(c.name) not in timeSlotList():
                continue
            r = _findByName(guild.roles, c.name)
            if r is None:
                # A category without a role must not stop the remaining
                # categories from being synced.
                continue
            colName = _slotPrefix(r.name)
            if c.id not in knownCategories:
                db[colName].update_one({'role': r.id}, {'$set': {'category': c.id}})
            gmMember = _findGM(c)
            if gmMember is None:
                continue
            # Re-read per category: an update made for an earlier category
            # changes whether this GM already appears in the database.
            knownGMs = gmDBList(guild)
            if knownGMs is not None and gmMember.id not in knownGMs:
                db[colName].update_one({'role': r.id}, {'$set': {'gm': gmMember.id}})
        print(f'Synced database for server {guild.name}')
    except Exception as err:
        print(f'Failed to sync database for server {guild.name} due to error: ', err)


# Sync for Each Guild
@tasks.loop(hours=1.0)
async def syncGuilds():
    """Hourly task: run syncGames for every guild the bot is in."""
    try:
        for guild in client.guilds:
            syncGames(guild)
        print('Synced database with server games list.')
    except Exception as err:
        print('Failed to sync database with server games list due to error: ', err)


@client.event
async def on_command_error(ctx, error):
    """Report command errors back to the channel the command came from."""
    if isinstance(error, commands.CommandNotFound):
        await ctx.send(f'Invalid command. Please use `{p}help` to see a list of available commands.')
    else:
        await ctx.channel.send(error)


# On Ready
@client.event
async def on_ready():
    """Set the presence, load settings and start the hourly sync."""
    await client.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=f'{p} commands'))
    print(f'Bot has logged in as {client.user.name} responding to prefix {p}')
    print(f'Geas Server Bot version {os.getenv("BOT_VERSION")} by Vivek Santayana')
    dbGetSettings()
    # on_ready can fire again after a reconnect; starting a running loop
    # raises RuntimeError.
    if not syncGuilds.is_running():
        syncGuilds.start()


# Check Bot Role is Defined
def botrole_is_defined():
    """Command check that passes only when the guild has a bot role set.

    Raises commands.CommandError when the guild has no settings at all.
    """
    async def predicate(ctx):
        try:
            return state[str(ctx.guild.id)]['settings']['botrole'] is not None
        except (KeyError, AttributeError, TypeError):
            raise commands.CommandError(f'Bot role has not been defined. Please set the bot role using the `{p}definebotrole` command first.')
    return commands.check(predicate)


# Check if invoked in valid game channel
def in_game_channel(ctx):
    """Command check: the invoking channel must sit in a game category.

    A category qualifies when its ID is stored on a game record in the
    database or, failing that, when its name starts with a time slot.

    Raises commands.CommandError otherwise.
    """
    category = getattr(ctx.channel, 'category', None)
    # A channel with no category can never be a game channel.
    if ctx.guild is not None and category is not None:
        db = dbClient[str(ctx.guild.id)]
        try:
            for c in db.list_collection_names():
                if c == 'settings':
                    continue  # the settings document has no 'category'
                for e in db[c].find({}):
                    if e.get('category') == category.id:
                        return True
        except Exception as err:
            print('Failed to check game channel against database due to error: ', err)
        if _slotPrefix(category.name) in timeSlotList():
            return True
    if ctx.command.name == 'deletegame':
        raise commands.CommandError('Error: you must invoke this command in a text channel corresponding to the game you are attempting to delete.')
    raise commands.CommandError('Error: you must invoke this command in the text channel of your game.')


# setupGame command: time code, <@GM>, capacity, name of game
# Creates (or resets) the role, category and default text/voice channels for
# a game, then stores the game in the database.
# NOTE: documented with comments rather than a docstring because discord.py
# uses a command callback's docstring as its help text.
@client.command(name='setupgame', aliases=['setup','gamesetup','creategame','gamecreate','create'], description='Use this command to set up the roles and channels for a new game. The syntax is `setup {wed|sunaft|suneve|oneshot|other} {@GM Name} {Capacity} {Name of game}`')
@commands.has_permissions(administrator=True)
@botrole_is_defined()
async def setupGame(ctx, arg1, arg2, arg3: int, *, arg4):
    if not arg2.startswith('<@') or arg2.startswith('<@&'):
        raise commands.CommandError('Invalid argument. The second parameter must @ the GM.')
    try:
        gm = int(arg2.replace('<@', '').replace('>', '').replace('!', ''))
    except ValueError:
        raise commands.CommandError('Invalid argument. The second parameter must @ the GM.')
    dbName = str(ctx.guild.id)
    db = dbClient[dbName]

    # Resolve the bot role before creating anything, so a deleted bot role
    # does not leave a half-built game behind.
    bots = ctx.guild.get_role(state[dbName]['settings']['botrole'])
    if bots is None:
        raise commands.CommandError(f'Bot role has not been defined. Please set the bot role using the `{p}definebotrole` command first.')

    colName = gameTime(arg1.lower())
    # 'other' is itself a valid code; only warn about unrecognised ones.
    if colName == 'OTHER' and arg1.lower() != 'other':
        await ctx.channel.send('Time code not recognised. Game will be categorised as \'Other\'.')
    await ctx.channel.trigger_typing()
    gmMember = await ctx.guild.fetch_member(gm)
    cap = arg3
    gameTitle = f'{colName}: {arg4}'

    # Reuse the role if it already exists.
    r = _findByName(ctx.guild.roles, gameTitle)
    if r is None:
        r = await ctx.guild.create_role(name=gameTitle)
    await r.edit(mentionable=True)
    await gmMember.add_roles(r)

    permissions = {
        ctx.guild.default_role: discord.PermissionOverwrite(read_messages=False),
        r: discord.PermissionOverwrite(read_messages=True),
        bots: discord.PermissionOverwrite(read_messages=True),
        gmMember: discord.PermissionOverwrite(read_messages=True, manage_messages=True, manage_channels=True, manage_permissions=True, priority_speaker=True, move_members=True, mute_members=True, deafen_members=True),
    }

    c = _findByName(ctx.guild.categories, gameTitle)
    if c is None:
        # Fresh game: create the category with its default channels.
        c = await ctx.guild.create_category(name=gameTitle, overwrites=permissions)
        await c.create_voice_channel(name=f'voice: {gameTitle}', topic=f'Default voice channel for {gameTitle}')
        t = await c.create_text_channel(name=f'text: {gameTitle}', topic=f'Default text channel for {gameTitle}')
        await ctx.channel.send(f'Game {r.mention} has been created with GM {gmMember.mention} and space for {cap} players.')
        await t.send(f'Hello, {gmMember.mention}! Your game channels for {gameTitle} have now been set up.\nYou can also ping your players or interact with the bot commands by mentioning the {r.mention} role.')
    else:
        # Existing game: reset permissions and re-sync every channel.
        await c.edit(overwrites=permissions)
        tPos = len(ctx.guild.channels)
        tFirst = None
        vExists = False
        for t in c.text_channels:
            # Track the top-most text channel for the welcome message.
            if t.position <= tPos:
                tFirst = t
                tPos = t.position
            await t.edit(sync_permissions=True)
        for v in c.voice_channels:
            await v.edit(sync_permissions=True)
            vExists = True
        await ctx.channel.send(f'The category for game {r.mention} has been reset for GM {gmMember.mention} with space for {cap} players.')
        if tFirst is None:
            tFirst = await c.create_text_channel(name=f'text: {gameTitle}', topic=f'Default text channel for {gameTitle}')
        if not vExists:
            await c.create_voice_channel(name=f'voice: {gameTitle}', topic=f'Default voice channel for {gameTitle}')
        await tFirst.send(f'Hello, {gmMember.mention}! Your game channels for {gameTitle} have now been set up.\nYou can also ping your players or interact with the bot commands by mentioning the {r.mention} role.')

    g = {
        'game': arg4,
        'gm': gm,
        'capacity': cap,
        'category': c.id,
        'role': r.id
    }
    try:
        db[colName].replace_one({'role': g['role']}, g, upsert=True)
    except Exception as err:
        # Best-effort: the Discord side is already set up.
        print('Failed to save game to database due to error: ', err)


# Friendlier message when the capacity argument is not an integer.
# NOTE(review): placement of `raise error` inside the `if` is reconstructed
# from whitespace-mangled source -- confirm against the original file.
@setupGame.error
async def clear_error(ctx, error):
    if isinstance(error, commands.BadArgument):
        await ctx.channel.send('The number of players in the game must be an integer.')
        raise error


# defineBotrole command
# Stores the given role as the guild's bot role, both in ``state`` and in the
# guild's settings document.
@client.command(name='definebotrole', aliases=['setbotrole','botrole','botroleset','botroledefine','br'], description='This is a set-up command to define which role is the botrole that the dice bot use. The syntax is `botrole {@Role}`')
@commands.has_permissions(administrator=True)
async def defineBotrole(ctx, arg):
    r = _roleFromMention(ctx.guild, arg)
    dbName = str(ctx.guild.id)
    db = dbClient[dbName]
    colName = 'settings'
    ret = state.setdefault(dbName, {}).setdefault('settings', {})
    ret['_id'] = 0
    # The previously stored role may have been deleted since it was set.
    oldRole = ctx.guild.get_role(ret['botrole']) if 'botrole' in ret else None
    if oldRole is None:
        await ctx.channel.send(f'Bot role for server {ctx.guild.name} not set. Setting {r.mention} to bot role now.')
    else:
        await ctx.channel.send(f'Bot role for server {ctx.guild.name} has already been set to {oldRole.mention}. Updating to {r.mention}.')
    ret['botrole'] = r.id
    try:
        db[colName].update_one({'_id': 0}, {'$set': {'botrole': r.id}}, upsert=True)
    except Exception as err:
        print('Failed to save bot role to database due to error: ', err)


# deleteGame command
# Deletes the channels, category, database record and role of a game.
@client.command(name='deletegame', aliases=['delete','del', 'removegame', 'delgame', 'gamedel', 'gamedelete'], description='Use this command to delete the role and associated channels for a game. The syntax is `delete {@Game Role}`. **It must be called in a text channel inside the relevant game.**')
@commands.has_permissions(administrator=True)
@commands.check(in_game_channel)
async def deleteGame(ctx, arg):
    r = _roleFromMention(ctx.guild, arg)
    cat = dbLookupRole(ctx.guild, r)
    dbName = str(ctx.guild.id)
    db = dbClient[dbName]
    await ctx.channel.trigger_typing()
    try:
        colName = dbFindTimeslot(ctx.guild, r)
    except commands.CommandError:
        raise commands.CommandError(f'Invalid argument. {r.mention} role is not a valid game role.')
    for t in ctx.guild.text_channels:
        if t.category == cat:
            await t.delete()
    for v in ctx.guild.voice_channels:
        if v.category == cat:
            await v.delete()
    await cat.delete()
    try:
        db[colName].delete_one({'role': r.id})
    except Exception as err:
        print('Failed to delete game from database due to error: ', err)
    await r.delete()


# reset command reset wed|sunaft|suneve|oneshot|all|other
# Deletes every game (role, category, channels, database records) in the
# requested time slots.
@client.command(name='reset', aliases=['deleteall','delall','cleargames','clear'], description='This deletes all games in a particular time slot category. This is a very powerful command. Be careful when you use it. The syntax is `reset {wed|sunaft|suneve|oneshot|other|all}`')
@commands.has_permissions(administrator=True)
async def reset(ctx, *args):
    slotFlags = ['wed', 'sunaft', 'suneve', 'oneshot', 'other']
    delList = []
    dbName = str(ctx.guild.id)
    db = dbClient[dbName]
    await ctx.channel.trigger_typing()

    # Validate every flag before deleting anything.
    for a in args:
        flag = a.lower()
        if flag != 'all' and flag not in slotFlags:
            raise commands.CommandError(f'Invalid argument. {a} is not a valid flag for the command.')
        for f in (slotFlags if flag == 'all' else [flag]):
            if f not in delList:
                delList.append(f)

    for d in delList:
        colName = gameTime(d)
        # Pass 1: delete the games recorded in the database.
        try:
            for g in list(db[colName].find({})):
                role = ctx.guild.get_role(g.get('role'))
                if role is not None:
                    await _deleteQuietly(role)
                cat = ctx.guild.get_channel(g.get('category'))
                if cat is not None:
                    for c in cat.channels:
                        await _deleteQuietly(c)
                    await _deleteQuietly(cat)
            db[colName].delete_many({})
        except Exception as err:
            print(f'Failed to reset {colName} from database due to error: ', err)
        # Pass 2: sweep up games that exist only on the server (matched by
        # name), including anything pass 1 could not reach.
        for r in ctx.guild.roles:
            if _slotPrefix(r.name) != colName:
                continue
            for cat in ctx.guild.categories:
                if cat.name == r.name:
                    for c in cat.channels:
                        await _deleteQuietly(c)
                    await _deleteQuietly(cat)
            await _deleteQuietly(r)
        await ctx.channel.send(f'All games for {colName} have been deleted.')


# Migrate Guild from Old Server Settings to New Settings
# Writes a database record (capacity 5) for every existing game role that has
# a same-named category with an identifiable GM.
@client.command(name='migrate', aliases=['migrategames','migratedata'], description='A set-up command to migrate games from the old server settings to the new server settings using the database for the first time.')
@commands.has_permissions(administrator=True)
async def migrateData(ctx):
    await ctx.channel.trigger_typing()
    dbName = str(ctx.guild.id)
    db = dbClient[dbName]
    gNum = 0
    for r in ctx.guild.roles:
        parsed = _splitGameName(r.name)
        if parsed is None:
            continue
        colName, gameName = parsed
        await r.edit(mentionable=True)
        c = _findByName(ctx.guild.categories, r.name)
        gmMember = _findGM(c) if c is not None else None
        if c is None or gmMember is None:
            # Nothing reliable to record for this role.
            print(f'Skipped migrating {r.name}: no matching category or GM found.')
            continue
        g = {
            'game': gameName,
            'gm': gmMember.id,
            'capacity': 5,
            'category': c.id,
            'role': r.id
        }
        try:
            db[colName].replace_one({'role': g['role']}, g, upsert=True)
        except Exception as err:
            raise commands.CommandError('Error: Database connection failed. Could not migrate games to database.') from err
        gNum += 1
    await ctx.channel.send(f'Finished migrating {gNum} games onto the database.')


# Easter egg for Alan
@client.command()
async def alan(ctx):
    if ctx.author.id != 355857207205691392:
        alan = await ctx.guild.fetch_member(355857207205691392)
        raise commands.CommandError(f'Error: This is a powerful, super secret master command that only {alan.mention} can use. Do **NOT** invoke this if you are not Alan. ~~It is a command that can do great evil in the wrong hands.~~')
    # The author *is* Alan here, so mention the author directly.
    await ctx.send(f'Hi {ctx.author.mention}. In order to execute this script, you must click the following link:\n\nUse it wisely.')


# Import Cogs
for cogfile in os.listdir('./cogs'):
    if cogfile.endswith('.py'):
        client.load_extension(f'cogs.{cogfile[:-3]}')

# Run Bot
client.run(os.getenv('TEST_TOKEN'))