import json import logging import sqlite3 import sys import threading from os import walk, path, makedirs, remove, rename, listdir from typing import List, Union, Any import discord from discord.ext import commands from dislash import InteractionClient, Option, OptionType bot_owner = 386629192743256065 class DB: @staticmethod def _prepare_db(guild: int): try: connect = sqlite3.connect('user.db') cursor = connect.cursor() create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT, [nick] TEXT, [isbot] BOOL, [track] TEXT) ''') cursor.execute(create_table) cursor.close() except sqlite3.Error as error: logging.error("failed to connecct db", error) @staticmethod def _work_with_db(db_func, data_turple): try: connect = sqlite3.connect('user.db') cursor = connect.cursor() cursor.execute(db_func, data_turple) connect.commit() cursor.close() except sqlite3.Error as error: logging.error("failed to connecct db", error) @staticmethod def _fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int): sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}" (username, userid, nick, isbot) VALUES (?, ?, ?, ?)""") data_tuple = (name, userid, nick, isbot) DB._work_with_db(sqlite_insert_with_param, data_tuple) @staticmethod def _add_audio(guildid: int, user: int, audio: str): sql_update_query = f"""UPDATE "{guildid}" set track = ? where userid = ?""" data_tuple = (audio, user) DB._work_with_db(sql_update_query, data_tuple) @staticmethod def _read_db(guild: int, user: int): try: sql_con = sqlite3.connect("user.db") cursor = sql_con.cursor() sql_read = f"""SELECT * FROM "{guild}" where userid = {user}""" cursor.execute(sql_read) record = cursor.fetchone() return record[4] except sqlite3.Error as error: logging.error("Failed to read sqlite table", error) class Cogs_prepare: @staticmethod def _cog_list(): cogs_list = [] for filename in listdir('./cogs'): if filename.endswith('.py'): cogs_list.append(filename[:-3]) return cogs_list @staticmethod async def _cogs_load(ctx): for filename in Cogs_prepare._cog_list(): if filename.endswith('.py'): bot.load_extension(f'cogs.{filename[:-3]}') print(f'Load cog {filename[:-3]}') else: print(f'Unable to load {filename[:-3]}') await ctx.reply('Cogs loaded') @staticmethod async def _cogs_unload(ctx): for filename in Cogs_prepare._cog_list(): bot.unload_extension(f'cogs.{filename[:-3]}') print(f'Unload cog {filename[:-3]}') await ctx.reply('Cogs unloaded') @staticmethod async def _cog_load(ctx, cog): try: bot.load_extension(f'cogs.{cog}') await ctx.reply(f'Loaded cog {cog}') except commands.ExtensionNotFound: await ctx.send(f"Error: {cog} couldn't be found to load.") except commands.ExtensionFailed: await ctx.send(f'Error: {cog} failed to load properly.') except commands.ExtensionError: await ctx.send(f'Error: unknown error with {cog}') threading.current_thread().name = "main" logging.basicConfig(stream=sys.stdout, filemode='w', level='INFO', format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s') DEFAULT_PREFIX = "$" # The prefix you want everyone to have if you don't define your own if not path.isfile('prefix.json'): with open('prefix.json', 'w+') as f: f.write("") f.close() def determine_prefix(bot, msg): guild = msg.guild base = Commands._read_json(guild.id, 'prefix') or DEFAULT_PREFIX # Get bot role return base intents = discord.Intents.default() intents.members = True bot = commands.Bot(command_prefix=determine_prefix, guild_subscriptions=True, intents=intents) inter_client = InteractionClient(bot) print(Cogs_prepare._cog_list()) for filename in Cogs_prepare._cog_list(): try: bot.load_extension(f'cogs.{filename}') logging.info(f'Loaded cog {filename}') except commands.ExtensionNotFound: logging.error(f"Error: {filename} couldn't be found to load.") except commands.ExtensionFailed: logging.error(f'Error: {filename} failed to load properly.') except commands.ExtensionError: logging.error(f'Error: unknown error with {filename[:-3]}') class Commands: @staticmethod async def set_prefix(ctx, prefix: str): await Commands._write_json(ctx.guild.id, "prefix", prefix) await ctx.send(f"Prefix set to: `{prefix}`") @staticmethod def list_files(_fold: str): fl = [] for filenames in walk(_fold): fl.extend(filenames) break files = {} for x in fl[2]: files[x] = x return files @staticmethod async def _read_json(guild: int, param): parameter = None try: with open('prefix.json', 'r', encoding='utf-8') as fp: # Open the JSON jsonObject = json.load(fp) # Load the custom prefixes except TypeError: jsonObject = {} if guild: # If the guild exists try: guild_conf = jsonObject[f"{guild}"] try: parameter = guild_conf[f"{param}"] except TypeError: pass except KeyError: pass fp.close() return parameter @staticmethod async def _write_json(guild, param_name, param): with open('prefix.json', 'r', encoding='utf-8') as fp: try: jsonObject = json.load(fp) except TypeError: jsonObject = {} try: jsonObject[f"{guild}"][f"{param_name}"] = param except KeyError: json_param = jsonObject[f"{guild}"] = {f'{param_name}': ''} jsonObject[str(guild)] = json_param.update({f"{param_name}": param}) with open("prefix.json", "w") as fl: json.dump(jsonObject, fl) fl.close() @bot.event async def on_ready(): for g in bot.get_all_members(): DB._prepare_db(g.guild.id) for g in bot.get_all_members(): DB._fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id) logging.info(f'Bot started') logging.info('We have logged in as {0.user}'.format(bot)) @bot.event async def on_guild_join(guild): for g in guild.members: DB._fill_bd(g.name, g.id, g.bot, g.nick, guild.id) @bot.event async def on_member_join(member): DB._fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id) bot_role = Commands._read_json(member.guild.id, 'bot_role') # Get bot role guest_role = Commands._read_json(member.guild.id, 'guest_role') # Get guest role if bot_role and guest_role: if member.bot == 0: role = discord.utils.get(member.guild.roles, id=guest_role) else: role = discord.utils.get(member.guild.roles, id=bot_role) logging.info(f"Adding to {member} role {role}") await member.add_roles(role) @bot.event async def on_member_update(before: discord.Member, after: discord.Member): sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?""" data_tuple = (after.nick, before.id) DB._work_with_db(sql_update_query, data_tuple) @bot.command(name="upload_audio") async def upload_audio(ctx, user=None): user = user or ctx.author if ctx.author.guild_permissions.administrator or user is ctx.author: if ctx.message.attachments: from os import error if not path.isdir(f'tmp/{user.id}'): try: makedirs(f'tmp/{user.id}') except error as error: logging.info(f"Failed to create dir", error) if not path.isdir(f'audio/{user.id}'): try: makedirs(f'audio/{user.id}') except error as error: logging.info(f"Failed to create dir", error) for at in ctx.message.attachments: import mimetypes await at.save(f'tmp/{user.id}/{at.filename}') guess = mimetypes.guess_type(f'tmp/{user.id}/{at.filename}') if guess[0]: from pymediainfo import MediaInfo file = f'tmp/{user.id}/{at.filename}' duration = round(MediaInfo.parse(file).tracks[0].duration / 1000) if duration > 15: await ctx.reply(f'Audio duration is {duration}, but max is 15') else: a = DB._read_db(ctx.guild.id, user.id) if a is None: audiolist = f'{user.id}/{at.filename}' else: audiolist = DB._read_db(ctx.guild.id, user.id) + ", " + f'{user.id}/{at.filename}' DB._add_audio(ctx.guild.id, user.id, audiolist) rename(f'tmp/{user.id}/{at.filename}', f'audio/{user.id}/{at.filename}') else: await ctx.reply(f'Failed to find MIME type of {at.filename}') remove(f'tmp/{user.id}/{at.filename}') else: await ctx.reply("Has no Attachment") else: await ctx.reply(f'You`re not admin. You can add audio only for your own account') @bot.command(name="set_prefix") @commands.has_permissions(administrator=True) async def command_setprefix(ctx, prefix: str): await self.Commands.set_prefix(ctx, prefix) @inter_client.slash_command( name="set_prefix", description="Settin up bot prefix", options=[ Option("prefix", "Specify prefix", OptionType.STRING, required=True), ] ) async def slash_setprefix(ctx, prefix: str): await Commands.set_prefix(ctx, prefix) @inter_client.slash_command( name="info", description="Read list of tracks for user", options=[ Option("user", "Specify any user", OptionType.USER), ] ) async def info(ctx, user=None): user = user or ctx.author audio = DB._read_db(ctx.guild.id, user.id) rolelist = [r.mention for r in user.roles if r != ctx.guild.default_role] if rolelist: roles = "\n".join(rolelist) else: roles = "Not added any role" if audio is None: audios = "Not selected audio" else: audios = "• " + "\n• ".join(sorted(audio.split(", "))) emb = discord.Embed( title=f"General information", description=f"General information on server about {user}", icon=user.avatar_url ) emb.set_thumbnail(url=user.avatar_url) emb.add_field(name="General info", value=f"Username: {user}\n" f"Nickname: {user.nick}\n" f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False) emb.add_field(name="Audio list", value=f"{audios}", inline=True) emb.add_field(name="Roles list", value=f"{roles}", inline=True) emb.set_footer(text="Information requested by: {}".format(ctx.author.display_name)) await ctx.reply(embed=emb) @inter_client.slash_command( name="set_trigger_role", description="Setting up role to trigger bot", options=[ Option("role", "Specify role", OptionType.ROLE, required=True), ] ) @commands.has_permissions(administrator=True) async def set_trigger_role(ctx, role): await Commands._write_json(ctx.guild.id, "tigger_role", role.id) await ctx.send(f"Role to trigger set to : `{role.name}`") @inter_client.slash_command( name="set_bot_role", description="Set Default bot role", options=[ Option("role", "Specify role", OptionType.ROLE, required=True), ] ) @commands.has_permissions(administrator=True) async def set_bot_role(ctx, role): await Commands._write_json(ctx.guild.id, "bot_role", role.id) await ctx.send(f"Setted up bot role to: `{role.name}`") @inter_client.slash_command( name="set_guest_role", description="Set Default bot role", options=[ Option("role", "Specify role", OptionType.ROLE, required=True), ] ) @commands.has_permissions(administrator=True) async def set_guest_role(ctx, role): await Commands._write_json(ctx.guild.id, "guest_role", role.id) await ctx.send(f"Setted up bot role to: `{role.name}`") bot.run('OTQ3OTUzOTAxNzgzNjIxNjYy.GTXbMv.KrztaTO7-ivsPEAVjsyikSQ-GP-ANwULmDraig')