import sqlite3 from typing import List from lib import logger class DB_Reader: def __init__(self, guildid: int = None): self._guildid = guildid self.list = self._read_db(self._guildid) self._current_index = 0 def __str__(self) -> str: return str(self._guildid) @classmethod def _read_db(cls, guildid: int) -> list: try: sql_con = sqlite3.connect("user.db") cursor = sql_con.cursor() cursor.execute(f"""SELECT * FROM "{guildid}" """) record = cursor.fetchall() return record except sqlite3.Error as _e: logger.info(f'Error reading DB\n{_e}') def __iter__(self): return _ListGenerationIter(self) class _DBAttrs: def __init__(self, userid: int, username: str, nick: str, isbot: bool, defaulttracks: None or list, usertracks: None or list): self.id = userid self.username = username self.nick = nick self.isbot = isbot if defaulttracks is not None: self._def_list = defaulttracks self.defaulttracks = self._defaulttracks else: self.defaulttracks = None if usertracks is not None: self._user_list = usertracks self.usertracks = self._usertracks else: self.usertracks = None def __str__(self): return self.username def __repr__(self): return f'' @property def _defaulttracks(self) -> List[str]: return self._def_list.split(', ') @property def _usertracks(self) -> List[str]: return self._user_list.split(', ') class _ListGenerationIter: def __init__(self, user_class): self._current_index = 0 self._list = user_class.list self._size = len(self._list) def __iter__(self): return self def __next__(self): if self._current_index < self._size: _userid = self._list[self._current_index][0] _username = self._list[self._current_index][1] _nick = self._list[self._current_index][2] _isbot = bool(self._list[self._current_index][3]) _defaulttracks = self._list[self._current_index][4] _usertracks = self._list[self._current_index][5] self._current_index += 1 memb = _DBAttrs(_userid, _username, _nick, _isbot, _defaulttracks, _usertracks) return memb raise StopIteration async def prepare_db(guild: int): try: connect = sqlite3.connect('user.db') cursor = connect.cursor() create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT, [nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT) ''') cursor.execute(create_table) cursor.close() except sqlite3.Error as _error: pass async def work_with_db(db_func: str, data_turple: tuple): """ Writing to db per server userinfo :param db_func: :param data_turple: """ try: connect = sqlite3.connect('user.db') cursor = connect.cursor() cursor.execute(db_func, data_turple) connect.commit() cursor.close() except sqlite3.Error as _error: pass async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int): sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}" (username, userid, nick, isbot) VALUES (?, ?, ?, ?)""") data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot) await work_with_db(sqlite_insert_with_param, data_tuple) async def add_audio(guild: int, user: int, audio: str, track: str = 'usertracks'): """ Adding audio into а folder and DB :param guild: Guild id :param user: :param audio: :param track: usertracks or defaulttracks. """ # audio = f'{DB.read_db(guild, user, track)}, {audio}' sql_update_query = f"""UPDATE "{guild}" set {track} = ? where userid = ?""" data_tuple = (audio, user) await work_with_db(sql_update_query, data_tuple) async def read_db(guild: int, user: int, column: str): _col_dict = {'userid': 0, 'username': 1, 'nick': 2, 'isbot': 3, 'defaulttracks': 4, 'usertracks': 5} try: sql_con = sqlite3.connect("user.db") cursor = sql_con.cursor() sql_read = f"""SELECT * FROM "{guild}" where userid = {user}""" cursor.execute(sql_read) record = cursor.fetchone() return record[_col_dict[column]] except sqlite3.Error as _error: pass async def check_exist_audio(guild: int, user: int, audio: str): _users_db = DB_Reader(guild) for _user in _users_db: if not _user.isbot: if _user.id == user and _user.usertracks is not None and audio in _user.usertracks: return True else: return False