optimised imports
This commit is contained in:
55
bot.py
55
bot.py
@@ -3,12 +3,11 @@ import asyncio
|
||||
from os import getenv
|
||||
from os.path import isfile
|
||||
|
||||
import disnake
|
||||
from disnake import OptionType, Option, Localized
|
||||
from disnake.ext import commands
|
||||
from disnake import OptionType, Option, Localized, ApplicationCommandInteraction, Intents, __version__
|
||||
from disnake.ext.commands import Bot, is_owner
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from __init__ import __version__
|
||||
from __init__ import __version__ as ver
|
||||
from lib.CogsPrep import work_with_cogs, cog_list
|
||||
from lib.Comands import determine_prefix
|
||||
from lib.Logger import logger
|
||||
@@ -22,19 +21,19 @@ if not isfile(getenv('CONF_FILE')):
|
||||
with open(getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
|
||||
f.write("")
|
||||
|
||||
intents = disnake.Intents(messages=True,
|
||||
guilds=True,
|
||||
message_content=True,
|
||||
voice_states=True,
|
||||
members=True,
|
||||
presences=True
|
||||
)
|
||||
intents = Intents(messages=True,
|
||||
guilds=True,
|
||||
message_content=True,
|
||||
voice_states=True,
|
||||
members=True,
|
||||
presences=True
|
||||
)
|
||||
|
||||
bot = commands.Bot(command_prefix=determine_prefix,
|
||||
intents=intents,
|
||||
reload=True,
|
||||
test_guilds=[648126669122568215]
|
||||
)
|
||||
bot = Bot(command_prefix=determine_prefix,
|
||||
intents=intents,
|
||||
reload=True,
|
||||
test_guilds=[648126669122568215]
|
||||
)
|
||||
|
||||
bot.i18n.load("locale/")
|
||||
|
||||
@@ -44,9 +43,9 @@ asyncio.run(work_with_cogs('load', bot, asyncio.run(cog_list())))
|
||||
@bot.event
|
||||
async def on_ready():
|
||||
logger.info('Bot started')
|
||||
logger.info(f'Disnake version {disnake.__version__}')
|
||||
logger.info(f'Disnake version {__version__}')
|
||||
logger.info(f'We have logged in as {bot.user}')
|
||||
logger.info(f'Version of bot is - v{__version__}')
|
||||
logger.info(f'Version of bot is - v{ver}')
|
||||
|
||||
|
||||
@bot.slash_command(
|
||||
@@ -59,8 +58,8 @@ async def on_ready():
|
||||
)
|
||||
]
|
||||
)
|
||||
@commands.is_owner()
|
||||
async def slash_cogs(inter: disnake.ApplicationCommandInteraction):
|
||||
@is_owner()
|
||||
async def slash_cogs(inter: ApplicationCommandInteraction):
|
||||
"""
|
||||
Working with cogs (modules) {{SLASH_COG}}
|
||||
|
||||
@@ -72,7 +71,7 @@ async def slash_cogs(inter: disnake.ApplicationCommandInteraction):
|
||||
|
||||
|
||||
@slash_cogs.sub_command(description=Localized("Enables Cog", key="ENABLE_COG"))
|
||||
async def enable(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
async def enable(inter: ApplicationCommandInteraction, cog: str):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
@@ -85,7 +84,7 @@ async def enable(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
|
||||
|
||||
@slash_cogs.sub_command(description=Localized("Disables Cog", key="DISABLE_COG"))
|
||||
async def disable(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
async def disable(inter: ApplicationCommandInteraction, cog: str):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
@@ -98,7 +97,7 @@ async def disable(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
|
||||
|
||||
@slash_cogs.sub_command(description=Localized("Loads Cog", key="LOAD_COG"))
|
||||
async def load(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
async def load(inter: ApplicationCommandInteraction, cog: str):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
@@ -111,7 +110,7 @@ async def load(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
|
||||
|
||||
@slash_cogs.sub_command(description=Localized("Unload Cog", key="UNLOAD_COG"))
|
||||
async def unload(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
async def unload(inter: ApplicationCommandInteraction, cog: str):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
@@ -124,7 +123,7 @@ async def unload(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
|
||||
|
||||
@slash_cogs.sub_command(description=Localized("Reloads Cog", key="RELOAD_COG"))
|
||||
async def reload(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
async def reload(inter: ApplicationCommandInteraction, cog: str):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
@@ -140,21 +139,21 @@ async def reload(inter: disnake.ApplicationCommandInteraction, cog: str):
|
||||
@unload.autocomplete('cog')
|
||||
@load.autocomplete('cog')
|
||||
@reload.autocomplete('cog')
|
||||
async def _cog_opt(inter: disnake.ApplicationCommandInteraction, current: str):
|
||||
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
|
||||
current = current.lower()
|
||||
_list = await cog_list(fold='./cogs/')
|
||||
return [choice for choice in _list if current in choice.lower()]
|
||||
|
||||
|
||||
@enable.autocomplete('cog')
|
||||
async def _cog_opt(inter: disnake.ApplicationCommandInteraction, current: str):
|
||||
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
|
||||
current = current.lower()
|
||||
_list = await cog_list(fold='./cogs/disabled/')
|
||||
return [choice for choice in _list if current in choice.lower()]
|
||||
|
||||
|
||||
@slash_cogs.error
|
||||
async def cogs_error(inter: disnake.ApplicationCommandInteraction):
|
||||
async def cogs_error(inter: ApplicationCommandInteraction):
|
||||
await inter.response.send_message(Localized("Error", key="EROR"), ephemeral=True)
|
||||
logger.error(f'User {inter.author} tries to use cogs func')
|
||||
|
||||
|
||||
@@ -6,11 +6,11 @@ cog_list: return list of cog filenames
|
||||
work_with_cogs: loads, reloads and unloads cogs files
|
||||
|
||||
"""
|
||||
import os
|
||||
from os import listdir
|
||||
from os import listdir, rename
|
||||
from typing import List
|
||||
|
||||
from disnake.ext import commands
|
||||
|
||||
from .Logger import logger
|
||||
|
||||
|
||||
@@ -37,9 +37,9 @@ async def work_with_cogs(what_do, bot: commands.Bot, cog):
|
||||
logger.info(f'Cog {_filename} reloaded')
|
||||
elif what_do == 'disable':
|
||||
bot.unload_extension(f'cogs.{_filename}')
|
||||
os.rename(f'cogs/{_filename}.py', f'cogs/disabled/{_filename}.py')
|
||||
rename(f'cogs/{_filename}.py', f'cogs/disabled/{_filename}.py')
|
||||
logger.info(f'Cog {_filename} stopped and disabled')
|
||||
elif what_do == 'enable':
|
||||
os.rename(f'cogs/disabled/{_filename}.py', f'cogs/{_filename}.py')
|
||||
rename(f'cogs/disabled/{_filename}.py', f'cogs/{_filename}.py')
|
||||
bot.load_extension(f'cogs.{_filename}')
|
||||
logger.info(f'Cog {_filename} started and enabled')
|
||||
|
||||
@@ -3,8 +3,9 @@ lib.Commands
|
||||
~~~~~~~~~~~~~~
|
||||
Some prepare for commands
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
from json import load, decoder, dump, JSONDecodeError
|
||||
from os import getenv
|
||||
|
||||
from disnake.ext import commands
|
||||
|
||||
|
||||
@@ -16,10 +17,10 @@ async def read_json(guild: int, _param: str):
|
||||
:return: value of parameter.
|
||||
"""
|
||||
parameter = None
|
||||
with open(os.getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
|
||||
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
|
||||
try:
|
||||
_json = json.load(f) # Load the custom prefixes
|
||||
except json.decoder.JSONDecodeError:
|
||||
_json = load(f) # Load the custom prefixes
|
||||
except decoder.JSONDecodeError:
|
||||
_json = {}
|
||||
if guild: # If the guild exists
|
||||
try:
|
||||
@@ -34,10 +35,10 @@ async def read_json(guild: int, _param: str):
|
||||
|
||||
|
||||
async def write_json(guild: int, param_name: str, param: str or int):
|
||||
with open(os.getenv('CONF_FILE'), encoding='utf-8') as f:
|
||||
with open(getenv('CONF_FILE'), encoding='utf-8') as f:
|
||||
try:
|
||||
_json = json.load(f)
|
||||
except json.decoder.JSONDecodeError:
|
||||
_json = load(f)
|
||||
except decoder.JSONDecodeError:
|
||||
_json = {}
|
||||
try:
|
||||
_guild = _json[f'{guild}']
|
||||
@@ -45,8 +46,8 @@ async def write_json(guild: int, param_name: str, param: str or int):
|
||||
_json.update({f'{guild}': {}})
|
||||
_guild = _json[f'{guild}']
|
||||
_guild.update({f'{param_name}': f'{param}'})
|
||||
with open(os.getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
|
||||
json.dump(_json, f, indent=4)
|
||||
with open(getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
|
||||
dump(_json, f, indent=4)
|
||||
|
||||
|
||||
def determine_prefix(bot: commands.Bot, msg):
|
||||
@@ -57,10 +58,10 @@ def determine_prefix(bot: commands.Bot, msg):
|
||||
:return: prefix for server, default is $
|
||||
"""
|
||||
parameter = '$'
|
||||
with open(os.getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
|
||||
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
|
||||
try:
|
||||
_json = json.load(f) # Load the custom prefixes
|
||||
except json.JSONDecodeError:
|
||||
_json = load(f) # Load the custom prefixes
|
||||
except JSONDecodeError:
|
||||
_json = {}
|
||||
try:
|
||||
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import sqlite3
|
||||
from sqlite3 import connect, Error
|
||||
|
||||
from lib.Logger import logger
|
||||
|
||||
@@ -16,13 +16,13 @@ class DBReader:
|
||||
@classmethod
|
||||
def _read_db(cls, guildid: int) -> list:
|
||||
try:
|
||||
sql_con = sqlite3.connect("user.db")
|
||||
sql_con = connect("user.db")
|
||||
cursor = sql_con.cursor()
|
||||
cursor.execute("""SELECT * FROM "(guildid)" """,
|
||||
{'guildid': guildid})
|
||||
record = cursor.fetchall()
|
||||
return record
|
||||
except sqlite3.Error as _e:
|
||||
except Error as _e:
|
||||
logger.info(f'Error reading DB\n{_e}')
|
||||
|
||||
def __iter__(self):
|
||||
@@ -84,13 +84,13 @@ class _ListGenerationIter:
|
||||
|
||||
async def prepare_db(guild: int):
|
||||
try:
|
||||
connect = sqlite3.connect('user.db')
|
||||
cursor = connect.cursor()
|
||||
_connect = connect('user.db')
|
||||
cursor = _connect.cursor()
|
||||
cursor.execute(f'''CREATE TABLE IF NOT EXISTS "{guild}"
|
||||
([userid] INTEGER PRIMARY KEY, [username] TEXT,
|
||||
[nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT)''')
|
||||
cursor.close()
|
||||
except sqlite3.Error as _error:
|
||||
except Error as _error:
|
||||
logger.info(_error)
|
||||
|
||||
|
||||
@@ -101,12 +101,12 @@ async def work_with_db(db_func: str, data_turple: tuple):
|
||||
:param data_turple:
|
||||
"""
|
||||
try:
|
||||
connect = sqlite3.connect('user.db')
|
||||
cursor = connect.cursor()
|
||||
_connect = connect('user.db')
|
||||
cursor = _connect.cursor()
|
||||
cursor.execute(db_func, data_turple)
|
||||
connect.commit()
|
||||
_connect.commit()
|
||||
cursor.close()
|
||||
except sqlite3.Error as _error:
|
||||
except Error as _error:
|
||||
logger.critical(_error)
|
||||
|
||||
|
||||
@@ -126,10 +126,10 @@ async def read_db(guild: int, user: int, column: str):
|
||||
'defaulttracks': 4,
|
||||
'usertracks': 5}
|
||||
try:
|
||||
sql_con = sqlite3.connect("user.db")
|
||||
sql_con = connect("user.db")
|
||||
cursor = sql_con.cursor()
|
||||
cursor.execute(f"""SELECT * FROM "{guild}" where userid = {user}""")
|
||||
record = cursor.fetchone()
|
||||
return record[_col_dict[column]]
|
||||
except sqlite3.Error as _error:
|
||||
except Error as _error:
|
||||
logger.critical(_error)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import mimetypes
|
||||
import os
|
||||
from mimetypes import guess_type
|
||||
from os import walk
|
||||
|
||||
|
||||
class ListGenerator:
|
||||
@@ -19,7 +19,7 @@ class ListGenerator:
|
||||
def _lister(cls, path) -> list:
|
||||
_list: list = []
|
||||
try:
|
||||
for f in os.walk(path):
|
||||
for f in walk(path):
|
||||
_list.extend(f)
|
||||
break
|
||||
return sorted(_list[2])
|
||||
@@ -63,7 +63,7 @@ class _ListGenerationIter:
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
guess = mimetypes.guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
|
||||
guess = guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
|
||||
return guess
|
||||
|
||||
@property
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
from .Logger import logger
|
||||
from asyncio import sleep
|
||||
|
||||
from disnake import FFmpegOpusAudio
|
||||
|
||||
from .Logger import logger
|
||||
|
||||
|
||||
async def play_audio(audio, bot, vc):
|
||||
if not bot.voice_clients:
|
||||
logger.error(f'Playing: {audio}')
|
||||
vp = await vc.connect()
|
||||
if not vp.is_playing():
|
||||
vp.play(FFmpegOpusAudio(f'{audio}'))
|
||||
vp.play(FFmpegOpusAudio(f'{audio}', ))
|
||||
while vp.is_playing():
|
||||
await sleep(0.5)
|
||||
await sleep(1)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import os
|
||||
from os import getenv
|
||||
|
||||
from yandex_music import Client
|
||||
|
||||
client = Client(os.getenv('YANDEX_TOKEN')).init()
|
||||
client = Client(getenv('YANDEX_TOKEN')).init()
|
||||
|
||||
|
||||
def search(_str: str, _type: str = 'all') -> dict:
|
||||
|
||||
Reference in New Issue
Block a user