fixed start after move

This commit is contained in:
bacon
2024-03-18 02:27:01 +03:00
parent 76d1625346
commit 98e71f66e9
43 changed files with 30 additions and 27 deletions

16
src/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
__version__ = '0.0.7'
__title__ = "src"
__author__ = "baconborn"
from typing import NamedTuple, Literal
class VersionInfo(NamedTuple):
major: int
minor: int
micro: int
releaselevel: Literal["alpha", "beta", "candidate", "final"]
serial: int
version_info: VersionInfo = VersionInfo(major=0, minor=0, micro=7, releaselevel="alpha", serial=0)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

158
src/bot.py Normal file
View File

@@ -0,0 +1,158 @@
#!/usr/bin/env python3
from os import getenv
from os.path import isfile
from disnake import OptionType, Option, Localized, ApplicationCommandInteraction, Intents, __version__
from disnake.ext.commands import Bot, is_owner
from dotenv import load_dotenv
from integral_lib.CogsPrep import work_with_cogs, cog_list
from integral_lib.Comands import determine_prefix
from integral_lib.Logger import logger
load_dotenv()
if not isfile('.env') or not getenv('CONF_FILE'):
with open('.env', 'a', encoding='utf-8') as f:
f.write("CONF_FILE='config.json'\n")
load_dotenv()
if not isfile(getenv('CONF_FILE')):
with open(getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
f.write("")
intents = Intents(messages=True,
guilds=True,
message_content=True,
voice_states=True,
members=True,
presences=True
)
bot = Bot(command_prefix=determine_prefix,
intents=intents,
reload=True,
test_guilds=[648126669122568215]
)
bot.i18n.load("locale/")
work_with_cogs('load', bot, cog_list())
@bot.event
async def on_ready():
logger.info('Bot started')
logger.info(f'Disnake version {__version__}')
logger.info(f'We have logged in as {bot.user}')
@bot.slash_command(
name='cog',
options=[
Option(
name=Localized('cog', key="COG".lower()),
description=Localized("cog file", key="COG_FILE"),
type=OptionType.string
)
]
)
@is_owner()
async def slash_cogs(inter: ApplicationCommandInteraction):
"""
Working with cogs (modules) {{SLASH_COG}}
Parameters
----------
:param inter:
"""
pass
@slash_cogs.sub_command(description=Localized("Enables Cog", key="ENABLE_COG"))
async def enable(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('enable', bot, cog)
await inter.response.send_message(f'Cog {cog} is enabled', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Disables Cog", key="DISABLE_COG"))
async def disable(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('disable', bot, cog)
await inter.response.send_message(f'Cog {cog} is disabled', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Loads Cog", key="LOAD_COG"))
async def load(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('load', bot, cog)
await inter.response.send_message(f'Cog {cog} is loaded', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Unload Cog", key="UNLOAD_COG"))
async def unload(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('unload', bot, cog)
await inter.response.send_message(f'Cog {cog} is unload', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Reloads Cog", key="RELOAD_COG"))
async def reload(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('reload', bot, cog)
await inter.response.send_message(f'Cog {cog} is reloaded', ephemeral=True)
@disable.autocomplete('cog')
@unload.autocomplete('cog')
@load.autocomplete('cog')
@reload.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = cog_list(fold='./cogs/')
return [choice for choice in _list if current in choice.lower()]
@enable.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = cog_list(fold='./cogs/disabled/')
return [choice for choice in _list if current in choice.lower()]
@slash_cogs.error
async def cogs_error(inter: ApplicationCommandInteraction):
await inter.response.send_message(Localized("Error", key="EROR"), ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func')
bot.run(getenv('TOKEN'))

148
src/cogs/admin.py Normal file
View File

@@ -0,0 +1,148 @@
from asyncio import sleep
import disnake
from disnake import Option, OptionType, Localized
from disnake.ext import tasks
from integral_lib.Comands import *
from integral_lib.DB_Worker import fill_bd, prepare_db, work_with_db
from integral_lib.Logger import logger
class Admin(commands.Cog, name='Admin'):
def __init__(self, bot: commands.Bot):
self.bot = bot # a defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
for g in self.bot.get_all_members():
await prepare_db(g.guild.id)
for g in self.bot.get_all_members():
await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
self.activity.start()
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@tasks.loop(seconds=20)
async def activity(self):
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at users: {str(len(self.bot.users))}',
type=3
)
)
await sleep(10)
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at servers: {str(len(self.bot.guilds))}',
type=3
)
)
@commands.Cog.listener()
async def on_member_update(self, before: disnake.Member, after: disnake.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id)
await work_with_db(sql_update_query, data_tuple)
@commands.Cog.listener()
async def on_guild_join(self, guild):
for g in guild.members:
await fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@commands.Cog.listener()
async def on_member_join(self, member):
await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
bot_role = read_json(member.guild.id, 'bot_role') # Get src role
guest_role = read_json(member.guild.id, 'guest_role') # Get guest role
if bot_role or guest_role:
if member.bot == 0:
role = disnake.utils.get(member.guild.roles, id=guest_role)
else:
role = disnake.utils.get(member.guild.roles, id=bot_role)
logger.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@commands.slash_command(
name="set_default_role",
description=Localized("Set Default src role", key="DEF_ROLE"),
options=[
Option("role",
"Specify role",
OptionType.role,
required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_guest_role(self, inter: disnake.ApplicationCommandInteraction, role):
await write_json(inter.guild.id, "guest_role", role.id)
await inter.response.send_message(f"Set up src role to: `{role.name}`", ephemeral=True)
@commands.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_set_prefix(self, ctx, prefix: str):
await write_json(ctx.guild.id, "prefix", prefix)
await ctx.reply(f"Prefix set to: `{prefix}`")
@commands.guild_only()
@commands.slash_command(
name="set_prefix",
description="Setting up src prefix",
options=[
Option("prefix",
"Specify prefix",
OptionType.string,
required=True),
]
)
@commands.has_permissions(administrator=True)
async def slash_set_prefix(self, inter: disnake.ApplicationCommandInteraction, prefix: str):
await write_json(inter.guild.id, "prefix", prefix)
await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True)
@commands.guild_only()
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_trigger_role",
description=Localized("Setting up role to trigger src", key="KEY_ROLE"),
options=[
Option("role",
Localized("Specify role", key="SETUP_ROLE"),
OptionType.role,
required=True),
]
)
async def set_trigger_role(self, inter: disnake.ApplicationCommandInteraction, role):
await write_json(inter.guild.id, "tigger_role", role.id)
await inter.response.send_message(f"Role set to: `{role.name}`", ephemeral=True)
@commands.slash_command(
name="set_bot_role",
description=Localized("Set src role", key="BOT_ROLE"),
options=[
Option("role",
Localized("Specify role", key="SETUP_ROLE"),
OptionType.role,
required=True)
]
)
@commands.guild_only()
@commands.has_permissions(administrator=True)
async def set_bot_role(self, ctx, role):
await write_json(ctx.guild.id,
"bot_role",
role.id)
await ctx.send(f"Set up src role to: `{role.name}`", ephemeral=True)
@set_bot_role.error
@set_trigger_role.error
@slash_set_prefix.error
async def set_prefix_error(self, inter: disnake.ApplicationCommandInteraction, prefix):
await inter.response.send_message("You don`t have permissions", ephemeral=True)
logger.error(prefix)
def setup(bot): # an extension must have a setup function
bot.add_cog(Admin(bot)) # adding a cog

84
src/cogs/audio.py Normal file
View File

@@ -0,0 +1,84 @@
import random
import disnake
from disnake import OptionChoice, Option, OptionType, Member, VoiceState, ApplicationCommandInteraction
from disnake.ext import commands
from integral_lib.ListGenerator import ListGenerator
from integral_lib.Logger import logger
from integral_lib.Player import play_audio
class Audio(commands.Cog, name='Audio'):
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.Cog.listener()
async def on_voice_state_update(self, member: Member,
before: VoiceState,
after: VoiceState):
if before.channel is None and not member.bot:
logger.info(f'Coneccted {member.name} to {after.channel.name}')
if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members):
logger.info('Skip playing by Game')
else:
# Prepare list of audio
from src.integral_lib.Comands import read_json
_role = await read_json(member.guild.id, 'tigger_role')
audio: list = []
for _a in ListGenerator('audio'):
audio.append(_a.name)
if len(member.roles) == 1 or _role is None:
logger.info('Skip playing by role')
elif any(str(role.id) in _role for role in member.roles):
logger.info('Play audio from list by role')
await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel)
else:
logger.info('Skip playing by any else')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command",
options=[
Option(name="audio",
type=OptionType.string,
required=True
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}', ephemeral=True)
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: ApplicationCommandInteraction, current: str):
"""
Asynchronously generates a list of OptionChoices for the given audio files based on the user input.
Parameters:
- inter: disnake.ApplicationCommandInteraction - The interaction context for the command.
- current: str - The current user input to filter the audio file choices.
Returns:
- list[OptionChoice] - A list of OptionChoice objects representing the available audio file choices.
:param current:
:param inter: ApplicationCommandInteraction
"""
current = current.lower()
_dict: dict = {}
for f in ListGenerator('audio'):
_dict[f.name] = f'{f.path}/{f.name}'
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current in choice.lower()
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Audio(bot)) # adding a cog

View File

@@ -0,0 +1,43 @@
from typing import List
import disnake
from disnake import OptionChoice
from disnake.ext import commands
from integral_lib.Comands import write_json
from integral_lib.Logger import logger
class Fun(commands.Cog, name='Fun'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_bot_channel",
description="Set channel which iterate with src",
)
async def set_bot_channel(self, inter: disnake.ApplicationCommandInteraction, channel: str):
await write_json(inter.guild.id,
"channel",
disnake.utils.find(lambda d: d.name == channel, inter.guild.channels).id)
await inter.response.send_message(f"Channel set up to {channel}", ephemeral=True)
@set_bot_channel.autocomplete('channel')
async def _list_text_channels(self,
inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]:
_list = [r.name for r in inter.guild.text_channels]
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current in choice
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Fun(bot)) # adding a cog

29
src/cogs/disabled/test.py Normal file
View File

@@ -0,0 +1,29 @@
import disnake
from disnake import Option
from disnake.ext import commands
from integral_lib import YandexPlayer
from integral_lib.Logger import logger
class Testing(commands.Cog, name='Testing'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name='play', description='play audio test from yandex.music', options=[
Option(name='search',
description='seach track/artist',
required=True),
])
async def play(self, inter: disnake.ApplicationCommandInteraction, search: str):
# TODO add yandex_music player with queue, playlists
result = YandexPlayer.search(search)
await inter.response.send_message(result, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog

59
src/cogs/general.py Normal file
View File

@@ -0,0 +1,59 @@
import disnake
from disnake import Option, OptionType, Colour
from disnake.ext import commands
from integral_lib.Logger import logger
class General(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name="info",
options=[
Option("user",
"Specify any user",
OptionType.user),
]
)
async def info(self, inter: disnake.ApplicationCommandInteraction, user=None):
"""
Shows user info {{SLASH_INFO}}
Parameters
----------
:param inter:
:param user:
:return:
"""
user = user or inter.author
rolelist = [r.mention for r in user.roles if r != inter.guild.default_role]
if rolelist:
roles = "\n".join(rolelist)
else:
roles = "Not added any role"
emb = disnake.Embed(
title="General information",
description=f"General information on server about {user}",
color=Colour.random()
)
emb.set_thumbnail(url=user.display_avatar)
emb.add_field(name="General info",
value=f"Username: {user}\n"
f"Nickname: {user.nick}\n"
f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
emb.add_field(name="Roles list", value=f"{roles}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(General(bot)) # adding a cog

44
src/cogs/info.py Normal file
View File

@@ -0,0 +1,44 @@
import os
import disnake
import psutil
from disnake import ApplicationCommandInteraction
from disnake.ext import commands
from __init__ import version_info as ver
from integral_lib.Comands import determine_prefix
from integral_lib.Logger import logger
class BotInfo(commands.Cog, name='Bot Info'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="info_bot",
description='Shows general info about src') # this is for making a command
async def info_bot(self, inter: ApplicationCommandInteraction):
_pid = os.getpid()
_process = psutil.Process(_pid)
emb = disnake.Embed(
title="General information",
description="General information on about src",
)
emb.set_thumbnail(self.bot.user.avatar.url)
emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n"
f"CPU Usage: {_process.cpu_percent()}%\n"
f'Bot ping: {round(self.bot.latency * 1000)}\n'
f'Prefix: `{determine_prefix(self.bot, inter)}\n`'
)
emb.add_field(name="Bot info:", value="Bot owner: <@386629192743256065>\n"
f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(BotInfo(bot)) # adding a cog

View File

@@ -0,0 +1,78 @@
"""
integral_lib.CogsPrepare
~~~~~~~~~~~~~
Loads, unloads Cogs files
cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
from os import listdir, rename
from typing import List
from disnake.ext.commands.interaction_bot_base import InteractionBotBase
from .Logger import logger
def cog_list(fold: str = './cogs') -> List[str]:
"""
A function that generates a list of cog names based on the files present in a specified folder.
Parameters:
- fold (str): The directory path where the cog files are located. Defaults to './cogs'.
Returns:
- List[str]: A list of cog names without the '.py' extension.
"""
cogs_list = []
for _filename in listdir(fold):
if _filename.endswith('.py'):
cogs_list.append(_filename[:-3])
return cogs_list
def work_with_cogs(what_do: str,
bot: InteractionBotBase,
cog: str | list):
"""
Perform the specified action on the given cog or list of cogs.
Args:
what_do (str): The action to perform (load, unload, reload, disable, enable).
src (InteractionBotBase): The src instance to work with.
cog (str | list): The name of the cog or a list of cogs to work with.
Raises:
ValueError: If the action is not recognized.
Returns:
None
--------
:param cog: str | list
:param bot: InteractionBotBase
:param what_do: str = ['load', 'unload', 'reload', 'disable', 'enable']
"""
if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if _filename.endswith('.py'):
_filename = _filename.split('.')[0]
if what_do == "load":
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} loaded')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')
elif what_do == 'disable':
bot.unload_extension(f'cogs.{_filename}')
rename(f'cogs/{_filename}.py',
f'cogs/disabled/{_filename}.py')
logger.info(f'Cog {_filename} stopped and disabled')
elif what_do == 'enable':
rename(f'cogs/disabled/{_filename}.py',
f'cogs/{_filename}.py')
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} started and enabled')
else:
raise ValueError(f"Unrecognized action: {what_do}")

View File

@@ -0,0 +1,86 @@
"""
integral_lib.Commands
~~~~~~~~~~~~~~
Some prepare for commands
"""
from json import load, decoder, dump, JSONDecodeError
from os import getenv
from disnake.ext import commands
async def read_json(guild: int, _param: str):
"""
Reads Json file to determite config strings
:param guild: ID of Guild
:param _param: Parameter in json file
:return: value of parameter.
"""
parameter = None
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except decoder.JSONDecodeError:
_json = {}
if guild: # If the guild exists
try:
guild_conf = _json[f"{guild}"]
try:
parameter = guild_conf[f"{_param}"]
except KeyError:
pass
except KeyError:
pass
return parameter
async def write_json(guild: int, param_name: str, param: str or int):
"""
A function to write JSON data to a file after updating or adding a parameter value.
Parameters:
- guild: an integer representing the guild ID
- param_name: a string representing the parameter name
- param: a string or integer representing the parameter value
Returns:
This function does not return anything.
"""
with open(getenv('CONF_FILE'), encoding='utf-8') as f:
try:
_json = load(f)
except decoder.JSONDecodeError:
_json = {}
try:
_guild = _json[f'{guild}']
except KeyError:
_json.update({f'{guild}': {}})
_guild = _json[f'{guild}']
_guild.update({f'{param_name}': f'{param}'})
with open(getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
dump(_json, f, indent=4)
def determine_prefix(bot: commands.Bot, msg):
"""
Determine the per-server src prefix based on the given message object.
:param bot: Disnake Bot object
:param msg: Disnake msg object
:return: prefix for server, default is $
"""
parameter = '$'
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter

View File

@@ -0,0 +1,135 @@
from sqlite3 import connect, Error
from .Logger import logger
class DBReader:
def __init__(self, guildid: int = None):
self._guildid = guildid
self.list = self._read_db(self._guildid)
self._current_index = 0
def __str__(self) -> str:
return str(self._guildid)
@classmethod
def _read_db(cls, guildid: int) -> list:
try:
sql_con = connect("user.db")
cursor = sql_con.cursor()
cursor.execute("""SELECT * FROM "(guildid)" """,
{'guildid': guildid})
record = cursor.fetchall()
return record
except Error as _e:
logger.info(f'Error reading DB\n{_e}')
def __iter__(self):
return _ListGenerationIter(self)
class _DBAttrs:
def __init__(self,
userid: int,
username: str,
nick: str,
isbot: bool,
defaulttracks: None or list,
usertracks: None or list):
self.userid = userid
self.username = username
self.nick = nick
self.isbot = isbot
self.defaulttracks = defaulttracks
self.usertracks = usertracks
def __str__(self):
return self.username
def __repr__(self):
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
class _ListGenerationIter:
def __init__(self, user_class):
self._current_index = 0
self._list = user_class.list
self._size = len(self._list)
def __iter__(self):
return self
def __next__(self):
if self._current_index < self._size:
_userid = self._list[self._current_index][0]
_username = self._list[self._current_index][1]
_nick = self._list[self._current_index][2]
_isbot = bool(self._list[self._current_index][3])
_defaulttracks = self._list[self._current_index][4]
_usertracks = self._list[self._current_index][5]
self._current_index += 1
memb = _DBAttrs(_userid,
_username,
_nick,
_isbot,
_defaulttracks,
_usertracks)
return memb
raise StopIteration
async def prepare_db(guild: int):
try:
_connect = connect('user.db')
cursor = _connect.cursor()
cursor.execute(f'''CREATE TABLE IF NOT EXISTS "{guild}"
([userid] INTEGER PRIMARY KEY, [username] TEXT,
[nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT)''')
cursor.close()
except Error as _error:
logger.info(_error)
async def work_with_db(db_func: str, data_turple: tuple):
"""
Writing to db per server userinfo
:param db_func:
:param data_turple:
"""
try:
_connect = connect('user.db')
cursor = _connect.cursor()
cursor.execute(db_func, data_turple)
_connect.commit()
cursor.close()
except Error as _error:
logger.critical(_error)
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
(username, userid, nick, isbot)
VALUES (?, ?, ?, ?)""")
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
await work_with_db(sqlite_insert_with_param, data_tuple)
async def read_db(guild: int, user: int, column: str):
_col_dict = {'userid': 0,
'username': 1,
'nick': 2,
'isbot': 3,
'defaulttracks': 4,
'usertracks': 5}
try:
sql_con = connect("user.db")
cursor = sql_con.cursor()
cursor.execute(f"""SELECT * FROM "{guild}" where userid = {user}""")
record = cursor.fetchone()
return record[_col_dict[column]]
except Error as _error:
logger.critical(_error)

View File

@@ -0,0 +1,86 @@
from mimetypes import guess_type
from os import walk
class ListGenerator:
def __init__(self, path: str = None):
self.path = path
self.list: list = self._lister(self.path)
try:
self._size = len(self.list)
except TypeError:
pass
self._current_index = 0
def __str__(self) -> str:
return 'Audio iter generator'
@classmethod
def _lister(cls, path) -> list:
_list: list = []
try:
for f in walk(path):
_list.extend(f)
break
return sorted(_list[2])
except TypeError:
pass
def __iter__(self):
return _ListGenerationIter(self)
def __next__(self):
if self._current_index < self._size:
self._current_index += 1
class _FileAttrs:
def __init__(self, name, path, mimetype, exc):
self.name = name
self.path = path
self.mimetype = mimetype
self.exc = exc
def __str__(self):
return self.name
def __repr__(self):
return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >'
class _ListGenerationIter:
def __init__(self, list_class):
self._current_index = 0
self._list = list_class.list
self._name = self._list[self._current_index]
self._path = list_class.path
self._size = len(self._list)
def __iter__(self):
return self
@property
def type(self) -> str:
guess = guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
return guess
@property
def _exc(self) -> str:
try:
_ret = self._list[self._current_index].split('.')[-1]
except AttributeError:
_ret = None
return _ret
def __next__(self):
if self._current_index < self._size:
_name = self._list[self._current_index]
_path = self._path
_type = self.type
_exc = self._exc
self._current_index += 1
_memb = _FileAttrs(_name, _path, _type, _exc)
return _memb
raise StopIteration

View File

@@ -0,0 +1,52 @@
import logging
class _CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
fmt = "%(asctime)s - [%(levelname)s] -%(module)s- %(message)s"
FORMATS = {
logging.DEBUG: grey + fmt + reset,
logging.INFO: grey + fmt + reset,
logging.WARNING: yellow + fmt + reset,
logging.ERROR: red + fmt + reset,
logging.CRITICAL: bold_red + fmt + reset
}
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs):
record = _CustomFormatter.old_factory(*args, **kwargs)
_module = record.module
_levelname = record.levelname
if len(_module) % 2 == 0 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2)
elif len(_module) % 2 == 1 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1)
if len(_levelname) % 2 == 0 and len(_levelname) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2)
elif len(record.levelname) % 2 == 1 and len(record.module) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1)
record.module = _module
record.levelname = _levelname
return record
logging.setLogRecordFactory(_record_factory)
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S")
return formatter.format(record)
logger = logging.getLogger('Pisya_Bot')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(_CustomFormatter())
logger.addHandler(ch)

View File

@@ -0,0 +1,17 @@
from asyncio import sleep
from disnake import FFmpegOpusAudio
from .Logger import logger
async def play_audio(audio, bot, vc):
if not bot.voice_clients:
logger.error(f'Playing: {audio}')
vp = await vc.connect()
if not vp.is_playing():
vp.play(FFmpegOpusAudio(f'{audio}', ))
while vp.is_playing():
await sleep(0.5)
await sleep(1)
await vp.disconnect()

View File

@@ -0,0 +1,83 @@
from os import getenv
from yandex_music import Client
client = Client(getenv('YANDEX_TOKEN')).init()
def search(_str: str, _type: str = 'all') -> dict:
album_id = None
album_title = None
artist_id = None
artist_name = None
result_json = {}
search_result = client.search(_str, type_=_type)
type_ = search_result.best.type
best = search_result.best.result
# print(search_result)
if type_ == 'track':
artists = ''
if best.artists:
artists = ', '.join(artist.name for artist in best.artists)
if best.albums:
for i in best.albums:
album_id = i.id
album_title = i.title
# Generate json for track
result_json = {
"artist": {
"id": best.id,
"name": artists
},
"track": {
"id": best.trackId,
"title": best.title
},
"album": {
"id": album_id,
"title": album_title
}
}
elif type_ == 'artist':
result_json = {
"artist": {
"id": best.id,
"name": best.name
}
}
elif type_ == 'album':
if best.artists:
for i in best.artists:
artist_id = i.id
artist_name = i.name
_tracks = []
tracks = {}
album = client.albums_with_tracks(best.id)
for i, volume in enumerate(album.volumes):
_tracks += volume
_i = 1
for track in _tracks:
print(track)
tracks[f"track_{_i}"] = {
"id": track.id,
"tittle": track.title,
}
_i += 1
result_json = {
"artist": {
"id": artist_id,
"name": artist_name
},
"album": {
"id": best.id,
"title": best.title
},
"tracks": tracks
}
print(_type)
return result_json

View File

@@ -0,0 +1,5 @@
"""
integral_lib
~~~~~~~~~~~~~
Some libs for the src which help him
"""

17
src/locale/ru.json Normal file
View File

@@ -0,0 +1,17 @@
{
"COG_FILE": "Выберите ког файл",
"ENABLE_COG": "Включение кога",
"DISABLE_COG": "Отключение кога",
"LOAD_COG": "Загрузка кога",
"UNLOAD_COG": "Выгрузка кога",
"RELOAD_COG": "Перезагрузка кога",
"SLASH_COG": "Работа с когами(модулями)",
"SLASH_INFO": "Отобрадение информации о пользователе",
"ERROR": "Ошибка",
"COG": "ког",
"DEF_ROLE": "Установка стандартной роли",
"KEY_ROLE": "Установка роли-триггера",
"BOT_ROLE": "Установка роли бота",
"SETUP_ROLE": "Укажите роль",
"SET_ROLE": "Роль установлена"
}