157 lines
4.3 KiB
Python
157 lines
4.3 KiB
Python
import sqlite3
|
||
|
||
|
||
class User:
|
||
def __init__(self, userid: str = None):
|
||
self.userid = userid
|
||
self.list: list = self._lister(self.userid)
|
||
self._current_index = 0
|
||
|
||
def __str__(self) -> str:
|
||
return self.name
|
||
|
||
def __repr__(self) -> str:
|
||
return (
|
||
f'<Audio file name={self.name} path={self.userid} type={self.type}>'
|
||
)
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return self.list[self._current_index]
|
||
|
||
@property
|
||
def type(self) -> str:
|
||
guess = mimetypes.guess_type(f'{self.path}/{self.name}')[0].split('/')[0]
|
||
return guess
|
||
|
||
@classmethod
|
||
def _lister(cls, path) -> list:
|
||
_dict: list = []
|
||
for _files in os.walk(path):
|
||
_dict.extend(_files)
|
||
break
|
||
return _dict[2]
|
||
|
||
def __iter__(self):
|
||
return _ListGenerationIter(self)
|
||
|
||
|
||
class _FileAttrs:
|
||
def __init__(self, name, path, type):
|
||
self.name = name
|
||
self.path = path
|
||
self.type = type
|
||
|
||
def __str__(self):
|
||
return 'Audio Dict Generator'
|
||
|
||
def __repr__(self):
|
||
return f'<File attrs name={self.name} path={self.path} type={self.path}>'
|
||
|
||
|
||
class _ListGenerationIter:
|
||
def __init__(self, list_class):
|
||
self._list = list_class.list
|
||
self._name = list_class.name
|
||
self._type = list_class.type
|
||
self._path = list_class.path
|
||
|
||
self._size = len(self._list)
|
||
self._current_index = 0
|
||
|
||
def __iter__(self):
|
||
return self
|
||
|
||
def __next__(self):
|
||
if self._current_index < self._size:
|
||
_name = self._list[self._current_index]
|
||
_path = self._path
|
||
_type = self._type
|
||
self._current_index += 1
|
||
memb = _FileAttrs(_name, _path, _type)
|
||
return memb
|
||
raise StopIteration
|
||
|
||
|
||
async def prepare_db(guild: int):
|
||
try:
|
||
connect = sqlite3.connect('user.db')
|
||
cursor = connect.cursor()
|
||
create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT,
|
||
[nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT) ''')
|
||
cursor.execute(create_table)
|
||
cursor.close()
|
||
except sqlite3.Error as _error:
|
||
pass
|
||
|
||
|
||
async def work_with_db(db_func: str, data_turple: tuple):
|
||
"""
|
||
Writing to db per server userinfo
|
||
:param db_func:
|
||
:param data_turple:
|
||
"""
|
||
try:
|
||
connect = sqlite3.connect('user.db')
|
||
cursor = connect.cursor()
|
||
cursor.execute(db_func, data_turple)
|
||
connect.commit()
|
||
cursor.close()
|
||
except sqlite3.Error as _error:
|
||
pass
|
||
|
||
|
||
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
|
||
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
|
||
(username, userid, nick, isbot)
|
||
VALUES (?, ?, ?, ?)""")
|
||
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
|
||
await work_with_db(sqlite_insert_with_param, data_tuple)
|
||
|
||
|
||
async def add_audio(guild: int, user: int, audio: str, track: str = 'usertracks'):
|
||
"""
|
||
Adding audio into а folder and DB
|
||
:param guild: Guild id
|
||
:param user:
|
||
:param audio:
|
||
:param track: usertracks or defaulttracks.
|
||
"""
|
||
# audio = f'{DB.read_db(guild, user, track)}, {audio}'
|
||
sql_update_query = f"""UPDATE "{guild}" set {track} = ? where userid = ?"""
|
||
data_tuple = (audio, user)
|
||
await work_with_db(sql_update_query, data_tuple)
|
||
|
||
|
||
async def read_db(guild: int, user: int, column: str):
|
||
_col_dict = {'userid': 0,
|
||
'username': 1,
|
||
'nick': 2,
|
||
'isbot': 3,
|
||
'defaulttracks': 4,
|
||
'usertracks': 5}
|
||
try:
|
||
sql_con = sqlite3.connect("user.db")
|
||
cursor = sql_con.cursor()
|
||
sql_read = f"""SELECT * FROM "{guild}" where userid = {user}"""
|
||
cursor.execute(sql_read)
|
||
record = cursor.fetchone()
|
||
return record[_col_dict[column]]
|
||
except sqlite3.Error as _error:
|
||
pass
|
||
|
||
|
||
async def check_exist_audio(ctx, guild: int, user: int, column: str, audio: str):
|
||
_list_str = await read_db(guild, user, column)
|
||
print(type(_list_str))
|
||
if _list_str is not None:
|
||
_list = _list_str.split(',')
|
||
if audio in _list:
|
||
await ctx.reply("File in list")
|
||
|
||
else:
|
||
pass
|
||
|
||
else:
|
||
_list = 'None'
|