import json import logging import sqlite3 from os import walk, listdir, path from enum import Enum class DB: @staticmethod async def prepare_db(guild: int): try: connect = sqlite3.connect('user.db') cursor = connect.cursor() create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT, [nick] TEXT, [isbot] BOOL, [track] TEXT) ''') cursor.execute(create_table) cursor.close() except sqlite3.Error as _error: logging.error("failed to connect db", _error) @staticmethod async def work_with_db(db_func: str, data_turple: tuple): """ Writing to db per server userinfo :param db_func: :param data_turple: """ try: connect = sqlite3.connect('user.db') cursor = connect.cursor() cursor.execute(db_func, data_turple) connect.commit() cursor.close() except sqlite3.Error as _error: logging.error("failed to connect db", _error) @staticmethod async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int): sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}" (username, userid, nick, isbot) VALUES (?, ?, ?, ?)""") data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot) await DB.work_with_db(sqlite_insert_with_param, data_tuple) @staticmethod async def add_audio(guild: int, user: int, audio: str, track: str = 'usertracks'): """ Adding audio into folder and DB :param guild: Guild id :param user: :param audio: :param track: usertracks or defaulttracks """ # audio = f'{DB.read_db(guild, user, track)}, {audio}' logging.error(f"Guild id is: {guild}\n" f"\t\t\t\t\tUser id is: {user}\n" f"\t\t\t\t\taudio is: {audio}\n" f"\t\t\t\t\tcolumn is {track}\n" f"\t\t\t\t\t---------------") sql_update_query = f"""UPDATE "{guild}" set {track} = ? where userid = ?""" data_tuple = (audio, user) logging.info(f'Func add_audio:\n' f'\t\t\t\t\tQuery: {sql_update_query}\n' f'\t\t\t\t\tTurple: {data_tuple}\n' f'\t\t\t\t\tAudio: {audio}\n' f'\t\t\t\t\t-----------------') await DB.work_with_db(sql_update_query, data_tuple) @staticmethod async def read_db(guild: int, user: int, column: str): _col_dict = {'userid': 0, 'username': 1, 'nick': 2, 'isbot': 3, 'defaulttracks': 4, 'usertracks': 5} try: sql_con = sqlite3.connect("user.db") cursor = sql_con.cursor() sql_read = f"""SELECT * FROM "{guild}" where userid = {user}""" cursor.execute(sql_read) record = cursor.fetchone() logging.info(f'Func read_db:\n' f'\t\t\t\t\tTrack: {column}\n' f'\t\t\t\t\tRecord: {record}\n' f'\t\t\t\t\t-----------------') return record[_col_dict[column]] except sqlite3.Error as _error: logging.error("Failed to read sqlite table", _error) @staticmethod async def check_exist_audio(ctx, guild: int, user: int, column: str, audio: str): _list_str = await DB.read_db(guild, user, column) print(type(_list_str)) if _list_str is not None: _list = _list_str.split(',') if audio in _list: await ctx.reply("File in list") logging.info(f'File {audio} in list') else: logging.info(f'File {audio} is not in list') else: _list = 'None' logging.info(f'check_exist_audio\n' f'\t\t\t\t\t{_list}' f'\t\t\t\t\t{audio}') class CogsPrepare: """ Loads, unloads Cogs files """ @staticmethod def cog_list(): cogs_list = [] for _filename in listdir('./cogs'): if _filename.endswith('.py'): cogs_list.append(_filename[:-3]) return cogs_list @staticmethod async def cogs_dict(): cog_dict = {} for _cog in CogsPrepare.cog_list(): cog_dict.update({f'{_cog}': f'{_cog}'}) return cog_dict @staticmethod async def work_with_cogs(what_do, bot): for _filename in CogsPrepare.cog_list(): if what_do == "load": bot.load_extension(f'cogs.{_filename}') elif what_do == 'unload': bot.unload_extension(f'cogs.{_filename}') elif what_do == 'reload': bot.reload_extension(f'cogs.{_filename}') logging.info(f'{what_do}ing cog {_filename}') class Commands: @staticmethod def check_json(): if not path.isfile('prefix.json'): with open('prefix.json', 'w+', encoding='utf-8') as f: f.write("") @staticmethod async def set_prefix(inter, prefix: str): await Commands.write_json(inter.guild.id, "prefix", prefix) @staticmethod async def list_files(fold: str): fl = [] for filenames in walk(fold): fl.extend(filenames) break files = {} for x in fl[2]: files[x] = x return files @staticmethod async def read_json(guild: int, param: str): with open('prefix.json', 'r', encoding='utf-8') as fp: # Open the JSON parameter = None try: _json = json.load(fp) # Load the custom prefixes except TypeError: _json = {} if guild: # If the guild exists try: guild_conf = _json[f"{guild}"] try: parameter = guild_conf[f"{param}"] except: pass except: pass return parameter @staticmethod async def write_json(guild: int, param_name: str, param: str or int): with open('prefix.json', 'r', encoding='utf-8') as f: try: _json = json.load(f) except json.decoder.JSONDecodeError: _json = {} try: _guild = _json[f'{guild}'] except KeyError: _json.update({f'{guild}': {}}) _guild = _json[f'{guild}'] _guild.update({f'{param_name}': f'{param}'}) with open('prefix.json', 'w', encoding='utf-8') as f: json.dump(_json, f, indent=4) @staticmethod async def determine_prefix(bot, msg): """ Determite perserver bot prefix :param bot: Disnake Bot object :param msg: Disnake msg object :return: prefix """ with open('prefix.json', 'r', encoding='utf-8') as fp: # Open the JSON parameter: str try: from json import load _json = load(fp) # Load the custom prefixes except: _json = {} try: parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up except: parameter = '$' return parameter