geas-bot/app/bot.py

458 lines
18 KiB
Python

# Import Dependencies
import os
import pymongo
import discord
from discord.ext import commands, tasks
# Set Intents
intents = discord.Intents.all()
intents.typing = True
intents.presences = True
intents.members = True
caches = discord.MemberCacheFlags.all()
# Set Prefix
p = '¬'
# Define Global State Dictionary
state = {}
# Create Clients
dbClient = pymongo.MongoClient(host='geasbot-db', username=os.environ['MONGO_INITDB_ROOT_USERNAME'], password=os.environ['MONGO_INITDB_ROOT_PASSWORD'], authsource='admin', serverSelectionTimeoutMS=1000)
client = commands.Bot(command_prefix=p, description=f'Geas Server Bot v {os.getenv("BOT_VERSION")}. This is a bot to facilitate setting up game channels on the Geas server. The prefix for the bot is {p}. You can interact with and manipulate game channels that have been created with this bot by @-mentioning the relevant role associated with the game.', intents=intents)
# Define Game Times Dictionary
gameTimes = {
'wed': 'WED',
'sunaft': 'SUN AFT',
'suneve': 'SUN EVE',
'oneshot': 'ONE SHOT'
}
# Reference Time Codes
def gameTime(arg):
return gameTimes.get(arg, 'OTHER')
# List Time Codes
def timeSlotList():
l = []
for t in gameTimes:
l.append(gameTimes[t])
l.append('OTHER')
return l
# Lookup Game Time Slot
def dbFindTimeslot(guild, role):
db = dbClient[str(guild.id)]
if role.name.split(': ',maxsplit=1)[0] not in timeSlotList():
raise commands.CommandError(f'Invalid lookup value: {role.mention} is not a valid game role.')
try:
for c in db.list_collection_names():
ret = db[c].find_one({'role':role.id})
if ret != None and c != 'settings':
return c
except:
return role.name.split(': ',maxsplit=1)[0]
# Lookup Category from Role
def dbLookupRole(guild, role):
db = dbClient[str(guild.id)]
colName = dbFindTimeslot(guild, role)
try:
catID = db[colName].find_one({'role':role.id})['category']
except:
for cat in guild.categories:
if cat.name == role.name:
catID = cat.id
break
try:
return guild.get_channel(catID)
except:
raise commands.CommandError('Error: The game\'s corresponding category cannot be matched.')
# Get Settings from DB
def dbGetSettings():
try:
for db in dbClient.list_database_names():
if db not in ['admin','config','local']:
state[db] = {
'settings': dbClient[db]['settings'].find_one({'_id':0}).copy()
}
print('Imported settings from database to local state.',state)
except Exception as err:
print('Failed to get Settings from Database due to error: ',err)
# Get list of game role IDs on server
def gameRoleIDList(guild):
l = []
for r in guild.roles:
if r.name.split(': ',maxsplit=1)[0] in timeSlotList():
l.append(r.id)
return l
# Get list of game role IDs in DB
def gameRoleDBList(guild):
dbName = str(guild.id)
db = dbClient[dbName]
l = []
try:
for ts in timeSlotList():
for e in db[ts].find():
l.append(e['role'])
return l
except:
pass
# Get list of game category IDs in DB
def gameCategoryDBList(guild):
dbName = str(guild.id)
db = dbClient[dbName]
l = []
try:
for ts in timeSlotList():
for e in db[ts].find():
l.append(e['category'])
return l
except:
pass
# Get list of GM IDs in DB
def gmDBList(guild):
dbName = str(guild.id)
db = dbClient[dbName]
l = []
try:
for ts in timeSlotList():
for e in db[ts].find():
l.append(e['gm'])
return l
except:
pass
# Sync Games on Server
def syncGames(guild):
dbName = str(guild.id)
db = dbClient[dbName]
try:
for ts in timeSlotList():
for e in db[ts].find():
if e['role'] not in gameRoleIDList(guild):
db[ts].delete_many({'role':e['role']})
for r in guild.roles:
if r.name.split(': ',maxsplit=1)[0] in timeSlotList():
if r.id not in gameRoleDBList(guild):
gameName = r.name.split(': ',maxsplit=1)[1]
colName = r.name.split(': ',maxsplit=1)[0]
for c in guild.categories:
if c.name == r.name:
break
permissions = c.overwrites
for p in permissions:
if isinstance(p,discord.Member) and permissions[p].manage_channels:
break
g = {
'game': gameName,
'gm': p.id,
'category': c.id,
'capacity': 5,
'role': r.id
}
db[colName].replace_one({'role':g['role']}, g, upsert=True)
for c in guild.categories:
if c.name.split(': ',maxsplit=1)[0] in timeSlotList():
if c.id not in gameCategoryDBList(guild):
for r in guild.roles:
if r.name == c.name:
break
if r.name != c.name:
break
colName = r.name.split(': ',maxsplit=1)[0]
db[colName].update_one({'role':r.id},{'$set':{'category':c.id}})
for p in c.overwrites:
if isinstance(p,discord.Member) and c.overwrites[p].manage_channels:
break
if p.id not in gmDBList(guild):
for r in guild.roles:
if r.name == c.name:
break
if r.name != c.name:
break
colName = r.name.split(': ',maxsplit=1)[0]
db[colName].update_one({'role':r.id},{'$set':{'gm':p.id}})
print(f'Synced database for server {guild.name}')
except:
pass
# Sync for Each Guild
@tasks.loop(hours=1.0)
async def syncGuilds():
try:
for guild in client.guilds:
syncGames(guild)
print('Synced database with server games list.')
except:
pass
@client.event
async def on_command_error(ctx,error):
if isinstance(error, commands.CommandNotFound):
await ctx.send(f'Invalid command. Please use `{p}help` to see a list of available commands.')
else:
await ctx.channel.send(error)
# On Ready
@client.event
async def on_ready():
await client.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=f'{p} commands'))
print(f'Bot has logged in as {client.user.name} responding to prefix {p}')
print(f'Geas Server Bot version {os.getenv("BOT_VERSION")} by Vivek Santayana')
dbGetSettings()
syncGuilds.start()
# Check Bot Role is Defined
def botrole_is_defined():
async def predicate(ctx):
try:
return state[str(ctx.guild.id)]['settings']['botrole'] != None
except:
raise commands.CommandError(f'Bot role has not been defined. Please set the bot role using the `{p}definebotrole` command first.')
return commands.check(predicate)
# Check if invoked in valid game channel
def in_game_channel(ctx):
categoriesList = []
db = dbClient[str(ctx.guild.id)]
try:
try:
for c in db.list_collection_names():
ret = db[c].find({})
for e in ret:
if ctx.channel.category == ctx.guild.get_channel(e['category']):
return True
except:
if ctx.channel.category.name.split(': ',maxsplit=1)[0] in timeSlotList():
return True
except:
pass
if ctx.command.name == 'deletegame':
raise commands.CommandError('Error: you must invoke this command in a text channel corresponding to the game you are attempting to delete.')
raise commands.CommandError(f'Error: you must invoke this command in the text channel of your game.')
# setupGame command: <when>, <@GM> <capacity>, <Name of game>
@client.command(name='setupgame', aliases=['setup','gamesetup','creategame','gamecreate','create'], description='Use this command to set up the roles and channels for a new game. The syntax is `setup {wed|sunaft|suneve|oneshot|other} {@GM Name} {Capacity} {Name of game}`')
@commands.has_permissions(administrator=True)
@botrole_is_defined()
async def setupGame(ctx,arg1,arg2, arg3: int, *, arg4):
if not (arg2.startswith('<@') and not (arg2.startswith('<@&'))):
raise commands.CommandError('Invalid argument. The second parameter must @ the GM.')
dbName = str(ctx.guild.id)
db = dbClient[dbName]
colName = gameTime(arg1.lower())
if colName == 'OTHER':
await ctx.channel.send('Time code not recognised. Game will be categorised as \'Other\'.')
await ctx.channel.trigger_typing()
gm = int(arg2.replace('<@', '').replace('>', '').replace('!', ''))
gmMember = await ctx.guild.fetch_member(gm)
cap = int(arg3)
gameTitle = f'{colName}: {arg4}'
roleExists = False
for r in ctx.guild.roles:
if r.name == gameTitle:
roleExists = True
break
if not roleExists:
r = await ctx.guild.create_role(name=gameTitle)
await r.edit(mentionable=True)
await gmMember.add_roles(r)
categoryExists = False
for c in ctx.guild.categories:
if c.name == gameTitle:
categoryExists = True
break
ret = state[dbName]['settings']
bots = ctx.guild.get_role(ret['botrole'])
permissions = {
ctx.guild.default_role: discord.PermissionOverwrite(read_messages=False),
r: discord.PermissionOverwrite(read_messages=True),
bots: discord.PermissionOverwrite(read_messages=True),
gmMember: discord.PermissionOverwrite(read_messages=True, manage_messages=True, manage_channels=True, manage_permissions=True,priority_speaker=True,move_members=True,mute_members=True,deafen_members=True),
}
if not categoryExists:
c = await ctx.guild.create_category(name=gameTitle, overwrites=permissions)
await c.create_voice_channel(name=f'voice: {gameTitle}', topic=f'Default voice channel for {gameTitle}')
t = await c.create_text_channel(name=f'text: {gameTitle}', topic=f'Default text channel for {gameTitle}')
await ctx.channel.send(f'Game {r.mention} has been created with GM {gmMember.mention} and space for {cap} players.')
await t.send(f'Hello, {gmMember.mention}! Your game channels for {gameTitle} have now been set up.\nYou can also ping your players or interact with the bot commands by mentioning the {r.mention} role.')
else:
await c.edit(overwrites=permissions)
tPos = len(ctx.guild.channels)
tFirst = None
vExists = False
for t in c.text_channels:
if t.position <= tPos:
tFirst = t
tPos = t.position
await t.edit(sync_permissions=True)
for v in c.voice_channels:
await v.edit(sync_permissions=True)
vExists = True
await ctx.channel.send(f'The category for game {r.mention} has been reset for GM {gmMember.mention} with space for {cap} players.')
if tFirst == None:
tFirst = await c.create_text_channel(name=f'text: {gameTitle}', topic=f'Default text channel for {gameTitle}')
if not vExists:
await c.create_voice_channel(name=f'voice: {gameTitle}', topic=f'Default voice channel for {gameTitle}')
await tFirst.send(f'Hello, {gmMember.mention}! Your game channels for {gameTitle} have now been set up.\nYou can also ping your players or interact with the bot commands by mentioning the {r.mention} role.')
g = {
'game': arg4,
'gm': gm,
'capacity': cap,
'category': c.id,
'role': r.id
}
try:
db[colName].replace_one({'role':g['role']}, g , upsert=True)
except:
pass
@setupGame.error
async def clear_error(ctx, error):
if isinstance(error, commands.BadArgument):
await ctx.channel.send('The number of players in the game must be an integer.')
raise error
# defineBotrole command
@client.command(name='definebotrole', aliases=['setbotrole','botrole','botroleset','botroledefine','br'], description='This is a set-up command to define which role is the botrole that the dice bot use. The syntax is `botrole {@Role}`')
@commands.has_permissions(administrator=True)
async def defineBotrole(ctx, arg):
if not arg.startswith('<@&'):
raise commands.CommandError('Invalid argument. The argument must @ a role.')
dbName = str(ctx.guild.id)
db = dbClient[dbName]
colName = 'settings'
r = ctx.guild.get_role(int(arg[3:-1]))
if dbName not in state:
state[dbName] = {}
if 'settings' not in state[dbName]:
state[dbName]['settings'] = {}
ret = state[dbName]['settings']
ret['_id'] = 0
if 'botrole' not in ret:
await ctx.channel.send(f'Bot role for server {ctx.guild.name} not set. Setting {r.mention} to bot role now.')
else:
await ctx.channel.send(f'Bot role for server {ctx.guild.name} has already been set to {ctx.guild.get_role(ret["botrole"]).mention}. Updating to {r.mention}.')
ret['botrole'] = r.id
try:
db[colName].update_one({'_id':0},{'$set':{'botrole': r.id}}, upsert=True)
except:
pass
# deleteGame command
@client.command(name='deletegame', aliases=['delete','del', 'removegame', 'delgame', 'gamedel', 'gamedelete'], description='Use this command to delete the role and associated channels for a game. The syntax is `delete {@Game Role}`. **It must be called in a text channel inside the relevant game.**')
@commands.has_permissions(administrator=True)
@commands.check(in_game_channel)
async def deleteGame(ctx, arg):
if not arg.startswith('<@&'):
raise commands.CommandError('Invalid argument. The argument must @ a role.')
r = ctx.guild.get_role(int(arg[3:-1]))
cat = dbLookupRole(ctx.guild, r)
dbName = str(ctx.guild.id)
db = dbClient[dbName]
await ctx.channel.trigger_typing()
try:
colName = dbFindTimeslot(ctx.guild,r)
except:
raise commands.CommandError(f'Invalid argument. {r.mention} role is not a valid game role.')
for t in ctx.guild.text_channels:
if t.category == cat:
await t.delete()
for v in ctx.guild.voice_channels:
if v.category == cat:
await v.delete()
await cat.delete()
try:
db[colName].delete_one({'role':r.id})
except:
pass
await r.delete()
# reset command reset wed|sunaft|suneve|oneshot|all|other
@client.command(name='reset', aliases=['deleteall','delall','cleargames','clear'], description='This deletes all games in a particular time slot category. This is a very powerful command. Be careful when you use it. The syntax is `reset {wed|sunaft|suneve|oneshot|other|all}`')
@commands.has_permissions(administrator=True)
async def reset(ctx, *args):
delList = []
dbName = str(ctx.guild.id)
db = dbClient[dbName]
await ctx.channel.trigger_typing()
for a in args:
if a.lower() not in ['all', 'other', 'wed', 'sunaft', 'suneve', 'oneshot']:
raise commands.CommandError(f'Invalid argument. {a} is not a valid flag for the command.')
if a.lower() == 'all':
for l in ['wed', 'sunaft', 'suneve', 'oneshot','other']:
if l not in delList:
delList.append(l)
else:
if a.lower() not in delList:
delList.append(a.lower())
for d in delList:
colName = gameTime(d)
try:
cur = db[colName].find({})
for g in cur:
await ctx.guild.get_role(g['role']).delete()
cat = ctx.guild.get_channel(g['category'])
for c in cat.channels:
await c.delete()
await cat.delete()
db[colName].deleteMany({})
except:
for r in ctx.guild.roles:
if r.name.startswith(colName):
for cat in ctx.guild.categories:
if cat.name == r.name:
for c in cat.channels:
await c.delete()
await cat.delete()
await r.delete()
await ctx.channel.send(f'All games for {colName} have been deleted.')
# Migrate Guild from Old Server Settings to New Settings
@client.command(name='migrate', aliases=['migrategames','migratedata'], description='A set-up command to migrate games from the old server settings to the new server settings using the database for the first time.')
@commands.has_permissions(administrator=True)
async def migrateData(ctx):
await ctx.channel.trigger_typing()
dbName = str(ctx.guild.id)
db = dbClient[dbName]
gNum = 0
for r in ctx.guild.roles:
if r.name.split(': ',maxsplit=1)[0] in timeSlotList():
gNum += 1
await r.edit(mentionable=True)
gameName = r.name.split(': ',maxsplit=1)[1]
colName = r.name.split(': ',maxsplit=1)[0]
for c in ctx.guild.categories:
if c.name == r.name:
break
permissions = c.overwrites
for p in permissions:
if isinstance(p,discord.Member) and permissions[p].manage_channels:
break
g = {
'game': gameName,
'gm': p.id,
'capacity': 5,
'category': c.id,
'role': r.id
}
try:
db[colName].replace_one({'role':g['role']}, g , upsert=True)
except:
raise commands.CommandError('Error: Database connection failed. Could not migrate games to database.')
await ctx.channel.send(f'Finished migrating {gNum} games onto the database.')
# Import Cogs
for cogfile in os.listdir('./cogs'):
if cogfile.endswith('.py'):
client.load_extension(f'cogs.{cogfile[:-3]}')
# Run Bot
client.run(os.getenv('TEST_TOKEN'))