updated structure

This commit is contained in:
bacon
2024-03-17 22:03:27 +03:00
parent 4e535f6580
commit d22c3ca6b6
44 changed files with 955 additions and 948 deletions

16
bot/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
__version__ = '0.0.7'
__title__ = "Pisya_bot"
__author__ = "baconborn"
from typing import NamedTuple, Literal
class VersionInfo(NamedTuple):
major: int
minor: int
micro: int
releaselevel: Literal["alpha", "beta", "candidate", "final"]
serial: int
version_info: VersionInfo = VersionInfo(major=0, minor=0, micro=7, releaselevel="alpha", serial=0)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

159
bot/bot.py Normal file
View File

@@ -0,0 +1,159 @@
#!/usr/bin/env python3
import asyncio
from os import getenv
from os.path import isfile
from disnake import OptionType, Option, Localized, ApplicationCommandInteraction, Intents, __version__
from disnake.ext.commands import Bot, is_owner
from dotenv import load_dotenv
from lib.CogsPrep import work_with_cogs, cog_list
from lib.Comands import determine_prefix
from lib.Logger import logger
load_dotenv()
if not isfile('.env') or not getenv('CONF_FILE'):
with open('.env', 'a', encoding='utf-8') as f:
f.write("CONF_FILE='config.json'\n")
load_dotenv()
if not isfile(getenv('CONF_FILE')):
with open(getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
f.write("")
intents = Intents(messages=True,
guilds=True,
message_content=True,
voice_states=True,
members=True,
presences=True
)
bot = Bot(command_prefix=determine_prefix,
intents=intents,
reload=True,
test_guilds=[648126669122568215]
)
bot.i18n.load("locale/")
asyncio.run(work_with_cogs('load', bot, cog_list()))
@bot.event
async def on_ready():
logger.info('Bot started')
logger.info(f'Disnake version {__version__}')
logger.info(f'We have logged in as {bot.user}')
@bot.slash_command(
name='cog',
options=[
Option(
name=Localized('cog', key="COG".lower()),
description=Localized("cog file", key="COG_FILE"),
type=OptionType.string
)
]
)
@is_owner()
async def slash_cogs(inter: ApplicationCommandInteraction):
"""
Working with cogs (modules) {{SLASH_COG}}
Parameters
----------
:param inter:
"""
pass
@slash_cogs.sub_command(description=Localized("Enables Cog", key="ENABLE_COG"))
async def enable(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('enable', bot, cog)
await inter.response.send_message(f'Cog {cog} is enabled', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Disables Cog", key="DISABLE_COG"))
async def disable(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('disable', bot, cog)
await inter.response.send_message(f'Cog {cog} is disabled', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Loads Cog", key="LOAD_COG"))
async def load(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('load', bot, cog)
await inter.response.send_message(f'Cog {cog} is loaded', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Unload Cog", key="UNLOAD_COG"))
async def unload(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('unload', bot, cog)
await inter.response.send_message(f'Cog {cog} is unload', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Reloads Cog", key="RELOAD_COG"))
async def reload(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('reload', bot, cog)
await inter.response.send_message(f'Cog {cog} is reloaded', ephemeral=True)
@disable.autocomplete('cog')
@unload.autocomplete('cog')
@load.autocomplete('cog')
@reload.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = cog_list(fold='./cogs/')
return [choice for choice in _list if current in choice.lower()]
@enable.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = cog_list(fold='./cogs/disabled/')
return [choice for choice in _list if current in choice.lower()]
@slash_cogs.error
async def cogs_error(inter: ApplicationCommandInteraction):
await inter.response.send_message(Localized("Error", key="EROR"), ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func')
bot.run(getenv('TOKEN'))

148
bot/cogs/admin.py Normal file
View File

@@ -0,0 +1,148 @@
from asyncio import sleep
import disnake
from disnake import Option, OptionType, Localized
from disnake.ext import commands, tasks
from bot.lib.Comands import read_json, write_json
from bot.lib.DB_Worker import fill_bd, prepare_db, work_with_db
from bot.lib.Logger import logger
class Admin(commands.Cog, name='Admin'):
def __init__(self, bot: commands.Bot):
self.bot = bot # a defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
for g in self.bot.get_all_members():
await prepare_db(g.guild.id)
for g in self.bot.get_all_members():
await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
self.activity.start()
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@tasks.loop(seconds=20)
async def activity(self):
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at users: {str(len(self.bot.users))}',
type=3
)
)
await sleep(10)
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at servers: {str(len(self.bot.guilds))}',
type=3
)
)
@commands.Cog.listener()
async def on_member_update(self, before: disnake.Member, after: disnake.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id)
await work_with_db(sql_update_query, data_tuple)
@commands.Cog.listener()
async def on_guild_join(self, guild):
for g in guild.members:
await fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@commands.Cog.listener()
async def on_member_join(self, member):
await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
bot_role = read_json(member.guild.id, 'bot_role') # Get bot role
guest_role = read_json(member.guild.id, 'guest_role') # Get guest role
if bot_role or guest_role:
if member.bot == 0:
role = disnake.utils.get(member.guild.roles, id=guest_role)
else:
role = disnake.utils.get(member.guild.roles, id=bot_role)
logger.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@commands.slash_command(
name="set_default_role",
description=Localized("Set Default bot role", key="DEF_ROLE"),
options=[
Option("role",
"Specify role",
OptionType.role,
required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_guest_role(self, inter: disnake.ApplicationCommandInteraction, role):
await write_json(inter.guild.id, "guest_role", role.id)
await inter.response.send_message(f"Set up bot role to: `{role.name}`", ephemeral=True)
@commands.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_set_prefix(self, ctx, prefix: str):
await write_json(ctx.guild.id, "prefix", prefix)
await ctx.reply(f"Prefix set to: `{prefix}`")
@commands.guild_only()
@commands.slash_command(
name="set_prefix",
description="Setting up bot prefix",
options=[
Option("prefix",
"Specify prefix",
OptionType.string,
required=True),
]
)
@commands.has_permissions(administrator=True)
async def slash_set_prefix(self, inter: disnake.ApplicationCommandInteraction, prefix: str):
await write_json(inter.guild.id, "prefix", prefix)
await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True)
@commands.guild_only()
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_trigger_role",
description=Localized("Setting up role to trigger bot", key="KEY_ROLE"),
options=[
Option("role",
Localized("Specify role", key="SETUP_ROLE"),
OptionType.role,
required=True),
]
)
async def set_trigger_role(self, inter: disnake.ApplicationCommandInteraction, role):
await write_json(inter.guild.id, "tigger_role", role.id)
await inter.response.send_message(f"Role set to: `{role.name}`", ephemeral=True)
@commands.slash_command(
name="set_bot_role",
description=Localized("Set bot role", key="BOT_ROLE"),
options=[
Option("role",
Localized("Specify role", key="SETUP_ROLE"),
OptionType.role,
required=True)
]
)
@commands.guild_only()
@commands.has_permissions(administrator=True)
async def set_bot_role(self, ctx, role):
await write_json(ctx.guild.id,
"bot_role",
role.id)
await ctx.send(f"Set up bot role to: `{role.name}`", ephemeral=True)
@set_bot_role.error
@set_trigger_role.error
@slash_set_prefix.error
async def set_prefix_error(self, inter: disnake.ApplicationCommandInteraction, prefix):
await inter.response.send_message("You don`t have permissions", ephemeral=True)
logger.error(prefix)
def setup(bot): # an extension must have a setup function
bot.add_cog(Admin(bot)) # adding a cog

74
bot/cogs/audio.py Normal file
View File

@@ -0,0 +1,74 @@
import random
import disnake
from disnake import OptionChoice, Option, OptionType, Member, VoiceState
from disnake.ext import commands
from bot.lib.ListGenerator import ListGenerator
from bot.lib.Logger import logger
from bot.lib.Player import play_audio
class Audio(commands.Cog, name='Audio'):
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.Cog.listener()
async def on_voice_state_update(self, member: Member,
before: VoiceState,
after: VoiceState):
if before.channel is None and not member.bot:
logger.info(f'Coneccted {member.name} to {after.channel.name}')
if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members):
logger.info('Skip playing by Game')
else:
# Prepare list of audio
from bot.lib.Comands import read_json
_role = await read_json(member.guild.id, 'tigger_role')
audio: list = []
for _a in ListGenerator('audio'):
audio.append(_a.name)
if len(member.roles) == 1 or _role is None:
logger.info('Skip playing by role')
elif any(str(role.id) in _role for role in member.roles):
logger.info('Play audio from list by role')
await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel)
else:
logger.info('Skip playing by any else')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command",
options=[
Option(name="audio",
type=OptionType.string,
required=True
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}', ephemeral=True)
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str):
current = current.lower()
_dict: dict = {}
for f in ListGenerator('audio'):
_dict[f.name] = f'{f.path}/{f.name}'
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current in choice.lower()
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Audio(bot)) # adding a cog

View File

@@ -0,0 +1,43 @@
from typing import List
import disnake
from disnake import OptionChoice
from disnake.ext import commands
from bot.lib import logger
from bot.lib import write_json
class Fun(commands.Cog, name='Fun'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_bot_channel",
description="Set channel which iterate with bot",
)
async def set_bot_channel(self, inter: disnake.ApplicationCommandInteraction, channel: str):
await write_json(inter.guild.id,
"channel",
disnake.utils.find(lambda d: d.name == channel, inter.guild.channels).id)
await inter.response.send_message(f"Channel set up to {channel}", ephemeral=True)
@set_bot_channel.autocomplete('channel')
async def _list_text_channels(self,
inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]:
_list = [r.name for r in inter.guild.text_channels]
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current in choice
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Fun(bot)) # adding a cog

29
bot/cogs/disabled/test.py Normal file
View File

@@ -0,0 +1,29 @@
import disnake
from disnake import Option
from disnake.ext import commands
from bot.lib import YandexPlayer
from bot.lib import logger
class Testing(commands.Cog, name='Testing'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name='play', description='play audio test from yandex.music', options=[
Option(name='search',
description='seach track/artist',
required=True),
])
async def play(self, inter: disnake.ApplicationCommandInteraction, search: str):
# TODO add yandex_music player with queue, playlists
result = YandexPlayer.search(search)
await inter.response.send_message(result, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog

59
bot/cogs/general.py Normal file
View File

@@ -0,0 +1,59 @@
import disnake
from disnake import Option, OptionType, Colour
from disnake.ext import commands
from bot.lib.Logger import logger
class General(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name="info",
options=[
Option("user",
"Specify any user",
OptionType.user),
]
)
async def info(self, inter: disnake.ApplicationCommandInteraction, user=None):
"""
Shows user info {{SLASH_INFO}}
Parameters
----------
:param inter:
:param user:
:return:
"""
user = user or inter.author
rolelist = [r.mention for r in user.roles if r != inter.guild.default_role]
if rolelist:
roles = "\n".join(rolelist)
else:
roles = "Not added any role"
emb = disnake.Embed(
title="General information",
description=f"General information on server about {user}",
color=Colour.random()
)
emb.set_thumbnail(url=user.display_avatar)
emb.add_field(name="General info",
value=f"Username: {user}\n"
f"Nickname: {user.nick}\n"
f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
emb.add_field(name="Roles list", value=f"{roles}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(General(bot)) # adding a cog

44
bot/cogs/info.py Normal file
View File

@@ -0,0 +1,44 @@
import os
import disnake
import psutil
from disnake import ApplicationCommandInteraction
from disnake.ext import commands
from bot.__init__ import version_info as ver
from bot.lib.Comands import determine_prefix
from bot.lib.Logger import logger
class BotInfo(commands.Cog, name='Bot Info'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="info_bot",
description='Shows general info about bot') # this is for making a command
async def info_bot(self, inter: ApplicationCommandInteraction):
_pid = os.getpid()
_process = psutil.Process(_pid)
emb = disnake.Embed(
title="General information",
description="General information on about bot",
)
emb.set_thumbnail(self.bot.user.avatar.url)
emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n"
f"CPU Usage: {_process.cpu_percent()}%\n"
f'Bot ping: {round(self.bot.latency * 1000)}\n'
f'Prefix: `{determine_prefix(self.bot, inter)}\n`'
)
emb.add_field(name="Bot info:", value="Bot owner: <@386629192743256065>\n"
f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(BotInfo(bot)) # adding a cog

50
bot/lib/CogsPrep.py Normal file
View File

@@ -0,0 +1,50 @@
"""
lib.CogsPrepare
~~~~~~~~~~~~~
Loads, unloads Cogs files
cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
from os import listdir, rename
from disnake.ext.commands.interaction_bot_base import InteractionBotBase
from .Logger import logger
def cog_list(fold: str = './cogs') -> list:
cogs_list = []
for _filename in listdir(fold):
if _filename.endswith('.py'):
cogs_list.append(_filename[:-3])
return cogs_list
async def work_with_cogs(what_do: str,
bot: InteractionBotBase,
cog: str | list):
if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if _filename.endswith('.py'):
_filename = _filename.split('.')[0]
if what_do == "load":
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} loaded')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')
elif what_do == 'disable':
bot.unload_extension(f'cogs.{_filename}')
rename(f'cogs/{_filename}.py',
f'cogs/disabled/{_filename}.py')
logger.info(f'Cog {_filename} stopped and disabled')
elif what_do == 'enable':
rename(f'cogs/disabled/{_filename}.py',
f'cogs/{_filename}.py')
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} started and enabled')

73
bot/lib/Comands.py Normal file
View File

@@ -0,0 +1,73 @@
"""
lib.Commands
~~~~~~~~~~~~~~
Some prepare for commands
"""
from json import load, decoder, dump, JSONDecodeError
from os import getenv
from disnake.ext import commands
async def read_json(guild: int, _param: str):
"""
Reads Json file to determite config strings
:param guild: ID of Guild
:param _param: Parameter in json file
:return: value of parameter.
"""
parameter = None
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except decoder.JSONDecodeError:
_json = {}
if guild: # If the guild exists
try:
guild_conf = _json[f"{guild}"]
try:
parameter = guild_conf[f"{_param}"]
except KeyError:
pass
except KeyError:
pass
return parameter
async def write_json(guild: int, param_name: str, param: str or int):
with open(getenv('CONF_FILE'), encoding='utf-8') as f:
try:
_json = load(f)
except decoder.JSONDecodeError:
_json = {}
try:
_guild = _json[f'{guild}']
except KeyError:
_json.update({f'{guild}': {}})
_guild = _json[f'{guild}']
_guild.update({f'{param_name}': f'{param}'})
with open(getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
dump(_json, f, indent=4)
def determine_prefix(bot: commands.Bot, msg):
"""
Determite per-server bot prefix
:param bot: Disnake Bot object
:param msg: Disnake msg object
:return: prefix for server, default is $
"""
parameter = '$'
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter

135
bot/lib/DB_Worker.py Normal file
View File

@@ -0,0 +1,135 @@
from sqlite3 import connect, Error
from .Logger import logger
class DBReader:
def __init__(self, guildid: int = None):
self._guildid = guildid
self.list = self._read_db(self._guildid)
self._current_index = 0
def __str__(self) -> str:
return str(self._guildid)
@classmethod
def _read_db(cls, guildid: int) -> list:
try:
sql_con = connect("user.db")
cursor = sql_con.cursor()
cursor.execute("""SELECT * FROM "(guildid)" """,
{'guildid': guildid})
record = cursor.fetchall()
return record
except Error as _e:
logger.info(f'Error reading DB\n{_e}')
def __iter__(self):
return _ListGenerationIter(self)
class _DBAttrs:
def __init__(self,
userid: int,
username: str,
nick: str,
isbot: bool,
defaulttracks: None or list,
usertracks: None or list):
self.userid = userid
self.username = username
self.nick = nick
self.isbot = isbot
self.defaulttracks = defaulttracks
self.usertracks = usertracks
def __str__(self):
return self.username
def __repr__(self):
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
class _ListGenerationIter:
def __init__(self, user_class):
self._current_index = 0
self._list = user_class.list
self._size = len(self._list)
def __iter__(self):
return self
def __next__(self):
if self._current_index < self._size:
_userid = self._list[self._current_index][0]
_username = self._list[self._current_index][1]
_nick = self._list[self._current_index][2]
_isbot = bool(self._list[self._current_index][3])
_defaulttracks = self._list[self._current_index][4]
_usertracks = self._list[self._current_index][5]
self._current_index += 1
memb = _DBAttrs(_userid,
_username,
_nick,
_isbot,
_defaulttracks,
_usertracks)
return memb
raise StopIteration
async def prepare_db(guild: int):
try:
_connect = connect('user.db')
cursor = _connect.cursor()
cursor.execute(f'''CREATE TABLE IF NOT EXISTS "{guild}"
([userid] INTEGER PRIMARY KEY, [username] TEXT,
[nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT)''')
cursor.close()
except Error as _error:
logger.info(_error)
async def work_with_db(db_func: str, data_turple: tuple):
"""
Writing to db per server userinfo
:param db_func:
:param data_turple:
"""
try:
_connect = connect('user.db')
cursor = _connect.cursor()
cursor.execute(db_func, data_turple)
_connect.commit()
cursor.close()
except Error as _error:
logger.critical(_error)
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
(username, userid, nick, isbot)
VALUES (?, ?, ?, ?)""")
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
await work_with_db(sqlite_insert_with_param, data_tuple)
async def read_db(guild: int, user: int, column: str):
_col_dict = {'userid': 0,
'username': 1,
'nick': 2,
'isbot': 3,
'defaulttracks': 4,
'usertracks': 5}
try:
sql_con = connect("user.db")
cursor = sql_con.cursor()
cursor.execute(f"""SELECT * FROM "{guild}" where userid = {user}""")
record = cursor.fetchone()
return record[_col_dict[column]]
except Error as _error:
logger.critical(_error)

86
bot/lib/ListGenerator.py Normal file
View File

@@ -0,0 +1,86 @@
from mimetypes import guess_type
from os import walk
class ListGenerator:
def __init__(self, path: str = None):
self.path = path
self.list: list = self._lister(self.path)
try:
self._size = len(self.list)
except TypeError:
pass
self._current_index = 0
def __str__(self) -> str:
return 'Audio iter generator'
@classmethod
def _lister(cls, path) -> list:
_list: list = []
try:
for f in walk(path):
_list.extend(f)
break
return sorted(_list[2])
except TypeError:
pass
def __iter__(self):
return _ListGenerationIter(self)
def __next__(self):
if self._current_index < self._size:
self._current_index += 1
class _FileAttrs:
def __init__(self, name, path, mimetype, exc):
self.name = name
self.path = path
self.mimetype = mimetype
self.exc = exc
def __str__(self):
return self.name
def __repr__(self):
return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >'
class _ListGenerationIter:
def __init__(self, list_class):
self._current_index = 0
self._list = list_class.list
self._name = self._list[self._current_index]
self._path = list_class.path
self._size = len(self._list)
def __iter__(self):
return self
@property
def type(self) -> str:
guess = guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
return guess
@property
def _exc(self) -> str:
try:
_ret = self._list[self._current_index].split('.')[-1]
except AttributeError:
_ret = None
return _ret
def __next__(self):
if self._current_index < self._size:
_name = self._list[self._current_index]
_path = self._path
_type = self.type
_exc = self._exc
self._current_index += 1
_memb = _FileAttrs(_name, _path, _type, _exc)
return _memb
raise StopIteration

52
bot/lib/Logger.py Normal file
View File

@@ -0,0 +1,52 @@
import logging
class _CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
fmt = "%(asctime)s - [%(levelname)s] -%(module)s- %(message)s"
FORMATS = {
logging.DEBUG: grey + fmt + reset,
logging.INFO: grey + fmt + reset,
logging.WARNING: yellow + fmt + reset,
logging.ERROR: red + fmt + reset,
logging.CRITICAL: bold_red + fmt + reset
}
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs):
record = _CustomFormatter.old_factory(*args, **kwargs)
_module = record.module
_levelname = record.levelname
if len(_module) % 2 == 0 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2)
elif len(_module) % 2 == 1 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1)
if len(_levelname) % 2 == 0 and len(_levelname) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2)
elif len(record.levelname) % 2 == 1 and len(record.module) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1)
record.module = _module
record.levelname = _levelname
return record
logging.setLogRecordFactory(_record_factory)
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S")
return formatter.format(record)
logger = logging.getLogger('Pisya_Bot')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(_CustomFormatter())
logger.addHandler(ch)

17
bot/lib/Player.py Normal file
View File

@@ -0,0 +1,17 @@
from asyncio import sleep
from disnake import FFmpegOpusAudio
from .Logger import logger
async def play_audio(audio, bot, vc):
if not bot.voice_clients:
logger.error(f'Playing: {audio}')
vp = await vc.connect()
if not vp.is_playing():
vp.play(FFmpegOpusAudio(f'{audio}', ))
while vp.is_playing():
await sleep(0.5)
await sleep(1)
await vp.disconnect()

83
bot/lib/YandexPlayer.py Normal file
View File

@@ -0,0 +1,83 @@
from os import getenv
from yandex_music import Client
client = Client(getenv('YANDEX_TOKEN')).init()
def search(_str: str, _type: str = 'all') -> dict:
album_id = None
album_title = None
artist_id = None
artist_name = None
result_json = {}
search_result = client.search(_str, type_=_type)
type_ = search_result.best.type
best = search_result.best.result
# print(search_result)
if type_ == 'track':
artists = ''
if best.artists:
artists = ', '.join(artist.name for artist in best.artists)
if best.albums:
for i in best.albums:
album_id = i.id
album_title = i.title
# Generate json for track
result_json = {
"artist": {
"id": best.id,
"name": artists
},
"track": {
"id": best.trackId,
"title": best.title
},
"album": {
"id": album_id,
"title": album_title
}
}
elif type_ == 'artist':
result_json = {
"artist": {
"id": best.id,
"name": best.name
}
}
elif type_ == 'album':
if best.artists:
for i in best.artists:
artist_id = i.id
artist_name = i.name
_tracks = []
tracks = {}
album = client.albums_with_tracks(best.id)
for i, volume in enumerate(album.volumes):
_tracks += volume
_i = 1
for track in _tracks:
print(track)
tracks[f"track_{_i}"] = {
"id": track.id,
"tittle": track.title,
}
_i += 1
result_json = {
"artist": {
"id": artist_id,
"name": artist_name
},
"album": {
"id": best.id,
"title": best.title
},
"tracks": tracks
}
print(_type)
return result_json

5
bot/lib/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""
lib
~~~~~~~~~~~~~
Some libs for the bot which help him
"""

17
bot/locale/ru.json Normal file
View File

@@ -0,0 +1,17 @@
{
"COG_FILE": "Выберите ког файл",
"ENABLE_COG": "Включение кога",
"DISABLE_COG": "Отключение кога",
"LOAD_COG": "Загрузка кога",
"UNLOAD_COG": "Выгрузка кога",
"RELOAD_COG": "Перезагрузка кога",
"SLASH_COG": "Работа с когами(модулями)",
"SLASH_INFO": "Отобрадение информации о пользователе",
"ERROR": "Ошибка",
"COG": "ког",
"DEF_ROLE": "Установка стандартной роли",
"KEY_ROLE": "Установка роли-триггера",
"BOT_ROLE": "Установка роли бота",
"SETUP_ROLE": "Укажите роль",
"SET_ROLE": "Роль установлена"
}