fixed start after move
This commit is contained in:
51
bot/integral_lib/CogsPrep.py
Normal file
51
bot/integral_lib/CogsPrep.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""
|
||||
integral_lib.CogsPrepare
|
||||
~~~~~~~~~~~~~
|
||||
Loads, unloads Cogs files
|
||||
cog_list: return list of cog filenames
|
||||
work_with_cogs: loads, reloads and unloads cogs files
|
||||
|
||||
"""
|
||||
from os import listdir, rename
|
||||
from typing import List
|
||||
|
||||
from disnake.ext.commands.interaction_bot_base import InteractionBotBase
|
||||
|
||||
from .Logger import logger
|
||||
|
||||
|
||||
def cog_list(fold: str = './cogs') -> List[str]:
|
||||
cogs_list = []
|
||||
for _filename in listdir(fold):
|
||||
if _filename.endswith('.py'):
|
||||
cogs_list.append(_filename[:-3])
|
||||
return cogs_list
|
||||
|
||||
|
||||
def work_with_cogs(what_do: str,
|
||||
bot: InteractionBotBase,
|
||||
cog: str | list):
|
||||
if isinstance(cog, str):
|
||||
cog = cog.split()
|
||||
for _filename in cog:
|
||||
if _filename.endswith('.py'):
|
||||
_filename = _filename.split('.')[0]
|
||||
if what_do == "load":
|
||||
bot.load_extension(f'cogs.{_filename}')
|
||||
logger.info(f'Cog {_filename} loaded')
|
||||
elif what_do == 'unload':
|
||||
bot.unload_extension(f'cogs.{_filename}')
|
||||
logger.info(f'Cog {_filename} unloaded')
|
||||
elif what_do == 'reload':
|
||||
bot.reload_extension(f'cogs.{_filename}')
|
||||
logger.info(f'Cog {_filename} reloaded')
|
||||
elif what_do == 'disable':
|
||||
bot.unload_extension(f'cogs.{_filename}')
|
||||
rename(f'cogs/{_filename}.py',
|
||||
f'cogs/disabled/{_filename}.py')
|
||||
logger.info(f'Cog {_filename} stopped and disabled')
|
||||
elif what_do == 'enable':
|
||||
rename(f'cogs/disabled/{_filename}.py',
|
||||
f'cogs/{_filename}.py')
|
||||
bot.load_extension(f'cogs.{_filename}')
|
||||
logger.info(f'Cog {_filename} started and enabled')
|
||||
73
bot/integral_lib/Comands.py
Normal file
73
bot/integral_lib/Comands.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""
|
||||
integral_lib.Commands
|
||||
~~~~~~~~~~~~~~
|
||||
Some prepare for commands
|
||||
|
||||
|
||||
"""
|
||||
from json import load, decoder, dump, JSONDecodeError
|
||||
from os import getenv
|
||||
|
||||
from disnake.ext import commands
|
||||
|
||||
|
||||
async def read_json(guild: int, _param: str):
|
||||
"""
|
||||
Reads Json file to determite config strings
|
||||
:param guild: ID of Guild
|
||||
:param _param: Parameter in json file
|
||||
:return: value of parameter.
|
||||
"""
|
||||
parameter = None
|
||||
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
|
||||
try:
|
||||
_json = load(f) # Load the custom prefixes
|
||||
except decoder.JSONDecodeError:
|
||||
_json = {}
|
||||
if guild: # If the guild exists
|
||||
try:
|
||||
guild_conf = _json[f"{guild}"]
|
||||
try:
|
||||
parameter = guild_conf[f"{_param}"]
|
||||
except KeyError:
|
||||
pass
|
||||
except KeyError:
|
||||
pass
|
||||
return parameter
|
||||
|
||||
|
||||
async def write_json(guild: int, param_name: str, param: str or int):
|
||||
with open(getenv('CONF_FILE'), encoding='utf-8') as f:
|
||||
try:
|
||||
_json = load(f)
|
||||
except decoder.JSONDecodeError:
|
||||
_json = {}
|
||||
try:
|
||||
_guild = _json[f'{guild}']
|
||||
except KeyError:
|
||||
_json.update({f'{guild}': {}})
|
||||
_guild = _json[f'{guild}']
|
||||
_guild.update({f'{param_name}': f'{param}'})
|
||||
with open(getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
|
||||
dump(_json, f, indent=4)
|
||||
|
||||
|
||||
def determine_prefix(bot: commands.Bot, msg):
|
||||
"""
|
||||
Determite per-server bot prefix
|
||||
:param bot: Disnake Bot object
|
||||
:param msg: Disnake msg object
|
||||
:return: prefix for server, default is $
|
||||
"""
|
||||
parameter = '$'
|
||||
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
|
||||
try:
|
||||
_json = load(f) # Load the custom prefixes
|
||||
except JSONDecodeError:
|
||||
_json = {}
|
||||
try:
|
||||
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
|
||||
|
||||
except KeyError:
|
||||
pass
|
||||
return parameter
|
||||
135
bot/integral_lib/DB_Worker.py
Normal file
135
bot/integral_lib/DB_Worker.py
Normal file
@@ -0,0 +1,135 @@
|
||||
from sqlite3 import connect, Error
|
||||
|
||||
from .Logger import logger
|
||||
|
||||
|
||||
class DBReader:
|
||||
|
||||
def __init__(self, guildid: int = None):
|
||||
self._guildid = guildid
|
||||
self.list = self._read_db(self._guildid)
|
||||
self._current_index = 0
|
||||
|
||||
def __str__(self) -> str:
|
||||
return str(self._guildid)
|
||||
|
||||
@classmethod
|
||||
def _read_db(cls, guildid: int) -> list:
|
||||
try:
|
||||
sql_con = connect("user.db")
|
||||
cursor = sql_con.cursor()
|
||||
cursor.execute("""SELECT * FROM "(guildid)" """,
|
||||
{'guildid': guildid})
|
||||
record = cursor.fetchall()
|
||||
return record
|
||||
except Error as _e:
|
||||
logger.info(f'Error reading DB\n{_e}')
|
||||
|
||||
def __iter__(self):
|
||||
return _ListGenerationIter(self)
|
||||
|
||||
|
||||
class _DBAttrs:
|
||||
def __init__(self,
|
||||
userid: int,
|
||||
username: str,
|
||||
nick: str,
|
||||
isbot: bool,
|
||||
defaulttracks: None or list,
|
||||
usertracks: None or list):
|
||||
self.userid = userid
|
||||
self.username = username
|
||||
self.nick = nick
|
||||
self.isbot = isbot
|
||||
self.defaulttracks = defaulttracks
|
||||
self.usertracks = usertracks
|
||||
|
||||
def __str__(self):
|
||||
return self.username
|
||||
|
||||
def __repr__(self):
|
||||
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
|
||||
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
|
||||
|
||||
|
||||
class _ListGenerationIter:
|
||||
def __init__(self, user_class):
|
||||
self._current_index = 0
|
||||
self._list = user_class.list
|
||||
|
||||
self._size = len(self._list)
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
if self._current_index < self._size:
|
||||
_userid = self._list[self._current_index][0]
|
||||
_username = self._list[self._current_index][1]
|
||||
_nick = self._list[self._current_index][2]
|
||||
_isbot = bool(self._list[self._current_index][3])
|
||||
_defaulttracks = self._list[self._current_index][4]
|
||||
_usertracks = self._list[self._current_index][5]
|
||||
|
||||
self._current_index += 1
|
||||
memb = _DBAttrs(_userid,
|
||||
_username,
|
||||
_nick,
|
||||
_isbot,
|
||||
_defaulttracks,
|
||||
_usertracks)
|
||||
return memb
|
||||
raise StopIteration
|
||||
|
||||
|
||||
async def prepare_db(guild: int):
|
||||
try:
|
||||
_connect = connect('user.db')
|
||||
cursor = _connect.cursor()
|
||||
cursor.execute(f'''CREATE TABLE IF NOT EXISTS "{guild}"
|
||||
([userid] INTEGER PRIMARY KEY, [username] TEXT,
|
||||
[nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT)''')
|
||||
cursor.close()
|
||||
except Error as _error:
|
||||
logger.info(_error)
|
||||
|
||||
|
||||
async def work_with_db(db_func: str, data_turple: tuple):
|
||||
"""
|
||||
Writing to db per server userinfo
|
||||
:param db_func:
|
||||
:param data_turple:
|
||||
"""
|
||||
try:
|
||||
_connect = connect('user.db')
|
||||
cursor = _connect.cursor()
|
||||
cursor.execute(db_func, data_turple)
|
||||
_connect.commit()
|
||||
cursor.close()
|
||||
except Error as _error:
|
||||
logger.critical(_error)
|
||||
|
||||
|
||||
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
|
||||
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
|
||||
(username, userid, nick, isbot)
|
||||
VALUES (?, ?, ?, ?)""")
|
||||
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
|
||||
await work_with_db(sqlite_insert_with_param, data_tuple)
|
||||
|
||||
|
||||
async def read_db(guild: int, user: int, column: str):
|
||||
_col_dict = {'userid': 0,
|
||||
'username': 1,
|
||||
'nick': 2,
|
||||
'isbot': 3,
|
||||
'defaulttracks': 4,
|
||||
'usertracks': 5}
|
||||
try:
|
||||
sql_con = connect("user.db")
|
||||
cursor = sql_con.cursor()
|
||||
cursor.execute(f"""SELECT * FROM "{guild}" where userid = {user}""")
|
||||
record = cursor.fetchone()
|
||||
return record[_col_dict[column]]
|
||||
except Error as _error:
|
||||
logger.critical(_error)
|
||||
86
bot/integral_lib/ListGenerator.py
Normal file
86
bot/integral_lib/ListGenerator.py
Normal file
@@ -0,0 +1,86 @@
|
||||
from mimetypes import guess_type
|
||||
from os import walk
|
||||
|
||||
|
||||
class ListGenerator:
|
||||
def __init__(self, path: str = None):
|
||||
self.path = path
|
||||
self.list: list = self._lister(self.path)
|
||||
try:
|
||||
self._size = len(self.list)
|
||||
except TypeError:
|
||||
pass
|
||||
self._current_index = 0
|
||||
|
||||
def __str__(self) -> str:
|
||||
return 'Audio iter generator'
|
||||
|
||||
@classmethod
|
||||
def _lister(cls, path) -> list:
|
||||
_list: list = []
|
||||
try:
|
||||
for f in walk(path):
|
||||
_list.extend(f)
|
||||
break
|
||||
return sorted(_list[2])
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
def __iter__(self):
|
||||
return _ListGenerationIter(self)
|
||||
|
||||
def __next__(self):
|
||||
if self._current_index < self._size:
|
||||
self._current_index += 1
|
||||
|
||||
|
||||
class _FileAttrs:
|
||||
def __init__(self, name, path, mimetype, exc):
|
||||
self.name = name
|
||||
self.path = path
|
||||
self.mimetype = mimetype
|
||||
self.exc = exc
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
def __repr__(self):
|
||||
return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >'
|
||||
|
||||
|
||||
class _ListGenerationIter:
|
||||
|
||||
def __init__(self, list_class):
|
||||
self._current_index = 0
|
||||
self._list = list_class.list
|
||||
self._name = self._list[self._current_index]
|
||||
self._path = list_class.path
|
||||
|
||||
self._size = len(self._list)
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
guess = guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
|
||||
return guess
|
||||
|
||||
@property
|
||||
def _exc(self) -> str:
|
||||
try:
|
||||
_ret = self._list[self._current_index].split('.')[-1]
|
||||
except AttributeError:
|
||||
_ret = None
|
||||
return _ret
|
||||
|
||||
def __next__(self):
|
||||
if self._current_index < self._size:
|
||||
_name = self._list[self._current_index]
|
||||
_path = self._path
|
||||
_type = self.type
|
||||
_exc = self._exc
|
||||
self._current_index += 1
|
||||
_memb = _FileAttrs(_name, _path, _type, _exc)
|
||||
return _memb
|
||||
raise StopIteration
|
||||
52
bot/integral_lib/Logger.py
Normal file
52
bot/integral_lib/Logger.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import logging
|
||||
|
||||
|
||||
class _CustomFormatter(logging.Formatter):
|
||||
grey = "\x1b[38;20m"
|
||||
yellow = "\x1b[33;20m"
|
||||
red = "\x1b[31;20m"
|
||||
bold_red = "\x1b[31;1m"
|
||||
reset = "\x1b[0m"
|
||||
fmt = "%(asctime)s - [%(levelname)s] -%(module)s- %(message)s"
|
||||
|
||||
FORMATS = {
|
||||
logging.DEBUG: grey + fmt + reset,
|
||||
logging.INFO: grey + fmt + reset,
|
||||
logging.WARNING: yellow + fmt + reset,
|
||||
logging.ERROR: red + fmt + reset,
|
||||
logging.CRITICAL: bold_red + fmt + reset
|
||||
}
|
||||
|
||||
old_factory = logging.getLogRecordFactory()
|
||||
|
||||
def _record_factory(*args, **kwargs):
|
||||
record = _CustomFormatter.old_factory(*args, **kwargs)
|
||||
_module = record.module
|
||||
_levelname = record.levelname
|
||||
if len(_module) % 2 == 0 and len(_module) < 12:
|
||||
_module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2)
|
||||
elif len(_module) % 2 == 1 and len(_module) < 12:
|
||||
_module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1)
|
||||
if len(_levelname) % 2 == 0 and len(_levelname) < 8:
|
||||
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2)
|
||||
elif len(record.levelname) % 2 == 1 and len(record.module) < 8:
|
||||
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1)
|
||||
record.module = _module
|
||||
record.levelname = _levelname
|
||||
return record
|
||||
|
||||
logging.setLogRecordFactory(_record_factory)
|
||||
|
||||
def format(self, record):
|
||||
log_fmt = self.FORMATS.get(record.levelno)
|
||||
formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S")
|
||||
return formatter.format(record)
|
||||
|
||||
|
||||
logger = logging.getLogger('Pisya_Bot')
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(logging.DEBUG)
|
||||
ch.setFormatter(_CustomFormatter())
|
||||
logger.addHandler(ch)
|
||||
17
bot/integral_lib/Player.py
Normal file
17
bot/integral_lib/Player.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from asyncio import sleep
|
||||
|
||||
from disnake import FFmpegOpusAudio
|
||||
|
||||
from .Logger import logger
|
||||
|
||||
|
||||
async def play_audio(audio, bot, vc):
|
||||
if not bot.voice_clients:
|
||||
logger.error(f'Playing: {audio}')
|
||||
vp = await vc.connect()
|
||||
if not vp.is_playing():
|
||||
vp.play(FFmpegOpusAudio(f'{audio}', ))
|
||||
while vp.is_playing():
|
||||
await sleep(0.5)
|
||||
await sleep(1)
|
||||
await vp.disconnect()
|
||||
83
bot/integral_lib/YandexPlayer.py
Normal file
83
bot/integral_lib/YandexPlayer.py
Normal file
@@ -0,0 +1,83 @@
|
||||
from os import getenv
|
||||
|
||||
from yandex_music import Client
|
||||
|
||||
client = Client(getenv('YANDEX_TOKEN')).init()
|
||||
|
||||
|
||||
def search(_str: str, _type: str = 'all') -> dict:
|
||||
album_id = None
|
||||
album_title = None
|
||||
artist_id = None
|
||||
artist_name = None
|
||||
result_json = {}
|
||||
|
||||
search_result = client.search(_str, type_=_type)
|
||||
type_ = search_result.best.type
|
||||
best = search_result.best.result
|
||||
|
||||
# print(search_result)
|
||||
if type_ == 'track':
|
||||
artists = ''
|
||||
if best.artists:
|
||||
artists = ', '.join(artist.name for artist in best.artists)
|
||||
if best.albums:
|
||||
for i in best.albums:
|
||||
album_id = i.id
|
||||
album_title = i.title
|
||||
|
||||
# Generate json for track
|
||||
result_json = {
|
||||
"artist": {
|
||||
"id": best.id,
|
||||
"name": artists
|
||||
},
|
||||
"track": {
|
||||
"id": best.trackId,
|
||||
"title": best.title
|
||||
},
|
||||
"album": {
|
||||
"id": album_id,
|
||||
"title": album_title
|
||||
}
|
||||
}
|
||||
elif type_ == 'artist':
|
||||
result_json = {
|
||||
"artist": {
|
||||
"id": best.id,
|
||||
"name": best.name
|
||||
}
|
||||
}
|
||||
elif type_ == 'album':
|
||||
if best.artists:
|
||||
for i in best.artists:
|
||||
artist_id = i.id
|
||||
artist_name = i.name
|
||||
_tracks = []
|
||||
tracks = {}
|
||||
album = client.albums_with_tracks(best.id)
|
||||
for i, volume in enumerate(album.volumes):
|
||||
_tracks += volume
|
||||
_i = 1
|
||||
for track in _tracks:
|
||||
print(track)
|
||||
tracks[f"track_{_i}"] = {
|
||||
"id": track.id,
|
||||
"tittle": track.title,
|
||||
|
||||
}
|
||||
_i += 1
|
||||
|
||||
result_json = {
|
||||
"artist": {
|
||||
"id": artist_id,
|
||||
"name": artist_name
|
||||
},
|
||||
"album": {
|
||||
"id": best.id,
|
||||
"title": best.title
|
||||
},
|
||||
"tracks": tracks
|
||||
}
|
||||
print(_type)
|
||||
return result_json
|
||||
5
bot/integral_lib/__init__.py
Normal file
5
bot/integral_lib/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""
|
||||
integral_lib
|
||||
~~~~~~~~~~~~~
|
||||
Some libs for the bot which help him
|
||||
"""
|
||||
Reference in New Issue
Block a user