Extend commands

This commit is contained in:
2022-09-07 21:36:09 +03:00
parent 013b33731b
commit 5d96b76541
12 changed files with 136 additions and 110 deletions

1
.gitignore vendored
View File

@@ -8,3 +8,4 @@
/.env /.env
*.exe *.exe
/venv/ /venv/
/fun_and_admin_bot.egg-info/

28
bot.py
View File

@@ -1,12 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio import asyncio
import os import os
from typing import List
import disnake import disnake
from disnake import OptionChoice, OptionType, Option from disnake import OptionChoice, OptionType, Option
from disnake.ext import commands from disnake.ext import commands
from __init__ import version_info as ver from __init__ import version_info as ver
from lib import work_with_cogs from lib import work_with_cogs, cog_list
from lib import preload_checks, determine_prefix from lib import preload_checks, determine_prefix
from lib import logger from lib import logger
@@ -27,7 +28,7 @@ bot = commands.Bot(command_prefix=determine_prefix,
) )
asyncio.run(work_with_cogs('load', bot)) asyncio.run(work_with_cogs('load', bot, cog_list()))
@bot.event @bot.event
@@ -51,19 +52,34 @@ async def on_ready():
OptionChoice("unload", "unload"), OptionChoice("unload", "unload"),
OptionChoice("reload", "reload") OptionChoice("reload", "reload")
] ]
),
Option(
'cog',
description="specify cog",
type=OptionType.string
) )
] ]
) )
@commands.is_owner() @commands.is_owner()
async def slash_cogs(inter, what_do): async def slash_cogs(inter, what_do, cog: str = cog_list()) -> None:
await work_with_cogs(what_do, bot) print(type(inter.guild.text_channels))
await work_with_cogs(what_do, bot, cog)
await inter.response.send_message(f'Cogs are {what_do}ed', ephemeral=True) await inter.response.send_message(f'Cogs are {what_do}ed', ephemeral=True)
@slash_cogs.autocomplete('cog')
async def _cog_opt(inter: disnake.ApplicationCommandInteraction, current: str) -> List[OptionChoice]:
_list = cog_list()
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current.lower() in choice.lower()
]
@slash_cogs.error @slash_cogs.error
async def cogs_error(inter, what_do): async def cogs_error(inter, what_do):
await inter.response.send_message(f'{what_do}', ephemeral=True) await inter.response.send_message(f'Error', ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func') logger.error(f'User {inter.author} tries to use cogs func\n{what_do}\n')
bot.run(os.getenv('TOKEN')) bot.run(os.getenv('TOKEN'))

View File

@@ -1,7 +1,8 @@
from asyncio import sleep from asyncio import sleep
from typing import List
import disnake import disnake
from disnake import Option, OptionType from disnake import Option, OptionType, OptionChoice
from disnake.ext import commands, tasks from disnake.ext import commands, tasks
@@ -76,7 +77,7 @@ class Admin(commands.Cog, name='Admin'):
@commands.has_permissions(administrator=True) @commands.has_permissions(administrator=True)
async def set_guest_role(self, inter, role): async def set_guest_role(self, inter, role):
await write_json(inter.guild.id, "guest_role", role.id) await write_json(inter.guild.id, "guest_role", role.id)
await inter.response.send_message(f"Setted up dsss bot role to: `{role.name}`", ephemeral=True) await inter.response.send_message(f"Set up bot role to: `{role.name}`", ephemeral=True)
@commands.command(name="set_prefix") @commands.command(name="set_prefix")
@commands.has_permissions(administrator=True) @commands.has_permissions(administrator=True)
@@ -121,7 +122,7 @@ class Admin(commands.Cog, name='Admin'):
@commands.has_permissions(administrator=True) @commands.has_permissions(administrator=True)
async def set_bot_role(self, ctx, role): async def set_bot_role(self, ctx, role):
await write_json(ctx.guild.id, "bot_role", role.id) await write_json(ctx.guild.id, "bot_role", role.id)
await ctx.send(f"Setted up bot role to: `{role.name}`", ephemeral=True) await ctx.send(f"Set up bot role to: `{role.name}`", ephemeral=True)
@set_bot_role.error @set_bot_role.error
@set_trigger_role.error @set_trigger_role.error
@@ -146,14 +147,27 @@ class Admin(commands.Cog, name='Admin'):
@commands.has_permissions(administrator=True) @commands.has_permissions(administrator=True)
@commands.slash_command( @commands.slash_command(
name="set_bot_channel", name="set_bot_channel",
description="Set channel whitch itterate with bot", description="Set channel which iterate with bot",
options=[ options=[
Option("channel", "specify channel", OptionType.channel, required=True), Option("channel", "specify channel", OptionType.string, required=True),
] ]
) )
async def set_bot_channel(self, inter, channel): async def set_bot_channel(self, inter, channel):
print(type(inter.guild.text_channels))
await write_json(inter.guild.id, "channel", channel.id) await write_json(inter.guild.id, "channel", channel.id)
await inter.response.send_message(f"Channel setted up to {channel.mention}", ephemeral=True) await inter.response.send_message(f"Channel set up to {channel.mention}", ephemeral=True)
@set_bot_channel.autocomplete('channel')
async def _list_text_channels(self,
inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]:
_list = []
for _channel in inter.guild.text_channels:
_list.append(_channel)
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current in choice
]
def setup(bot): # an extension must have a setup function def setup(bot): # an extension must have a setup function

View File

@@ -1,18 +1,15 @@
import random import random
from os import path, makedirs, rename, remove from os import path, makedirs, rename, remove
from typing import List
import disnake
from disnake import OptionChoice, OptionType, Option
from disnake.ext import commands from disnake.ext import commands
from lib import logger, ListGenerator from lib import logger
from lib import determine_time from lib import determine_time
from lib import read_db, check_exist_audio, add_audio from lib import read_db, check_exist_audio, add_audio
from lib import play_audio from lib import play_audio
# todo: write set audio from list by slash command # todo: write chose audio from list by slash command
class Audio(commands.Cog, name='Audio'): class Audio(commands.Cog, name='Audio'):
def __init__(self, bot): def __init__(self, bot):
self.bot = bot self.bot = bot
@@ -44,8 +41,8 @@ class Audio(commands.Cog, name='Audio'):
def_audio_ls = await list_files() def_audio_ls = await list_files()
if def_audio_db or audio_db is not None: if def_audio_db or audio_db is not None:
def_audio_db = [] if not def_audio_db else def_audio_db if not def_audio_db: def_audio_db = []
audio_db = [] if not audio_db else audio_db if not audio_db: audio_db = []
logger.info(f'Play audio from DB') logger.info(f'Play audio from DB')
full_audio = def_audio_db + audio_db full_audio = def_audio_db + audio_db
await play_audio(f'audio/{random.choice(full_audio)}', self.bot, after.channel) await play_audio(f'audio/{random.choice(full_audio)}', self.bot, after.channel)
@@ -85,6 +82,7 @@ class Audio(commands.Cog, name='Audio'):
file = f'tmp/{user.id}/{at.filename}' file = f'tmp/{user.id}/{at.filename}'
duration = round(MediaInfo.parse(file).tracks[0].duration / 1000) duration = round(MediaInfo.parse(file).tracks[0].duration / 1000)
max_duration = int(determine_time(ctx)) max_duration = int(determine_time(ctx))
print(type(max_duration))
if duration > max_duration: if duration > max_duration:
await ctx.reply(f'Audio duration is {duration}, but max is {max_duration}') await ctx.reply(f'Audio duration is {duration}, but max is {max_duration}')
remove(f'tmp/{user.id}/{at.filename}') remove(f'tmp/{user.id}/{at.filename}')
@@ -95,13 +93,9 @@ class Audio(commands.Cog, name='Audio'):
else: else:
audiolist = f'{at.filename}' audiolist = f'{at.filename}'
if not await check_exist_audio(ctx.guild.id, user.id, at.filename): await check_exist_audio(ctx, ctx.guild.id, user.id, 'usertracks', at.filename)
rename(f'tmp/{user.id}/{at.filename}', f'audio/{user.id}/{at.filename}') await add_audio(ctx.guild.id, user.id, audiolist)
await add_audio(ctx.guild.id, user.id, audiolist) rename(f'tmp/{user.id}/{at.filename}', f'audio/{user.id}/{at.filename}')
await ctx.reply(f'Audio {at.filename.split(".")[0]} added to db')
else:
await ctx.reply(f'Audio {at.filename.split(".")[0]} already in db')
remove(f'tmp/{user.id}/{at.filename}')
elif guess[0].split('/')[0] != 'audio': elif guess[0].split('/')[0] != 'audio':
await ctx.reply(f'It not audio {at.filename}\n it`s {guess[0]}') await ctx.reply(f'It not audio {at.filename}\n it`s {guess[0]}')
remove(f'tmp/{user.id}/{at.filename}') remove(f'tmp/{user.id}/{at.filename}')
@@ -110,50 +104,6 @@ class Audio(commands.Cog, name='Audio'):
else: else:
await ctx.reply(f'You`re not admin. You can add audio only for your own account') await ctx.reply(f'You`re not admin. You can add audio only for your own account')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command",
options=[
Option(name="audio",
type=OptionType.string,
required=True
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}', ephemeral=True)
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str) -> List[OptionChoice]:
_def_iter = ListGenerator('audio')
_def_dict: dict = {}
for f in _def_iter:
_def_dict[f.name] = f'{f.path}/{f.name}'
_user_dict: dict = {}
try:
_user_iter = ListGenerator(f'audio/{inter.author.id}')
for f in _user_iter:
_user_dict[f.name] = f'{f.path}/{f.name}'
# user_dict = []
# for _track in user_list:
except IndexError:
pass
_dict = {}
_dict.update(_def_dict)
_dict.update(_user_dict)
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current.lower() in choice.lower()
]
def setup(bot): # an extension must have a setup function def setup(bot): # an extension must have a setup function
bot.add_cog(Audio(bot)) # adding a cog bot.add_cog(Audio(bot)) # adding a cog

View File

@@ -28,7 +28,7 @@ class General(commands.Cog):
user = user or inter.author user = user or inter.author
_user = DB_Reader(inter.guild.id) _user = DB_Reader(inter.guild.id)
for r in _user: for r in _user:
if r.id == user.id: if r.userid == user.id:
user_audio = r.usertracks user_audio = r.usertracks
default_audio = r.defaulttracks default_audio = r.defaulttracks

View File

@@ -1,6 +1,12 @@
from typing import List
import disnake
from disnake import Option, OptionType, OptionChoice
from disnake.ext import commands from disnake.ext import commands
from lib import ListGenerator
from lib import logger from lib import logger
from lib import play_audio
class Testing(commands.Cog, name='Testing'): class Testing(commands.Cog, name='Testing'):
@@ -11,6 +17,51 @@ class Testing(commands.Cog, name='Testing'):
async def on_ready(self): async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.') logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command",
options=[
Option(name="audio",
type=OptionType.string,
required=True
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}', ephemeral=True)
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str) -> List[OptionChoice]:
_def_iter = ListGenerator('audio')
_def_dict: dict = {}
for f in _def_iter:
_def_dict[f.name] = f'{f.path}/{f.name}'
_user_dict: dict = {}
try:
_user_iter = ListGenerator(f'audio/{inter.author.id}')
for f in _user_iter:
_user_dict[f.name] = f'{f.path}/{f.name}'
# user_dict = []
# for _track in user_list:
except IndexError:
pass
_dict = {}
_dict.update(_def_dict)
_dict.update(_user_dict)
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current.lower() in choice.lower()
]
def setup(bot): # an extension must have a setup function def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog bot.add_cog(Testing(bot)) # adding a cog

View File

@@ -20,8 +20,10 @@ def cog_list():
return cogs_list return cogs_list
async def work_with_cogs(what_do, bot): async def work_with_cogs(what_do, bot, cog):
for _filename in cog_list(): if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if what_do == "load": if what_do == "load":
try: try:
bot.load_extension(f'cogs.{_filename}') bot.load_extension(f'cogs.{_filename}')
@@ -38,5 +40,7 @@ async def work_with_cogs(what_do, bot):
elif what_do == 'unload': elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}') bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload': elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}') bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')

View File

@@ -20,7 +20,7 @@ def preload_checks():
async def list_files(fold: str = 'audio'): async def list_files(fold: str = 'audio'):
fold = f'audio/{fold}' if fold != 'audio' else fold if fold != 'audio': fold = f'audio/{fold}'
fl = [] fl = []
for filenames in os.walk(fold): for filenames in os.walk(fold):
fl.extend(filenames) fl.extend(filenames)

View File

@@ -1,5 +1,5 @@
import sqlite3 import sqlite3
from typing import List
from lib import logger from lib import logger
@@ -37,36 +37,20 @@ class _DBAttrs:
isbot: bool, isbot: bool,
defaulttracks: None or list, defaulttracks: None or list,
usertracks: None or list): usertracks: None or list):
self.id = userid self.userid = userid
self.username = username self.username = username
self.nick = nick self.nick = nick
self.isbot = isbot self.isbot = isbot
if defaulttracks is not None: self.defaulttracks = defaulttracks
self._def_list = defaulttracks self.usertracks = usertracks
self.defaulttracks = self._defaulttracks
else:
self.defaulttracks = None
if usertracks is not None:
self._user_list = usertracks
self.usertracks = self._usertracks
else:
self.usertracks = None
def __str__(self): def __str__(self):
return self.username return self.username
def __repr__(self): def __repr__(self):
return f'<File attrs userid={self.id} username={self.username} nick={self.nick} ' \ return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>' f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
@property
def _defaulttracks(self) -> List[str]:
return self._def_list.split(', ')
@property
def _usertracks(self) -> List[str]:
return self._user_list.split(', ')
class _ListGenerationIter: class _ListGenerationIter:
def __init__(self, user_class): def __init__(self, user_class):
@@ -166,11 +150,16 @@ async def read_db(guild: int, user: int, column: str):
pass pass
async def check_exist_audio(guild: int, user: int, audio: str): async def check_exist_audio(ctx, guild: int, user: int, column: str, audio: str):
_users_db = DB_Reader(guild) _list_str = await read_db(guild, user, column)
for _user in _users_db: print(type(_list_str))
if not _user.isbot: if _list_str is not None:
if _user.id == user and _user.usertracks is not None and audio in _user.usertracks: _list = _list_str.split(',')
return True if audio in _list:
else: await ctx.reply("File in list")
return False
else:
pass
else:
_list = 'None'

View File

@@ -1,6 +1,6 @@
import mimetypes import mimetypes
import os import os
from typing import Optional from typing import Tuple, Optional
class ListGenerator: class ListGenerator:
@@ -36,10 +36,10 @@ class ListGenerator:
class _FileAttrs: class _FileAttrs:
def __init__(self, name, path, _type, exc): def __init__(self, name, path, type, exc):
self.name = name self.name = name
self.path = path self.path = path
self.mimetype = _type self.mimetype = type
self.exc = exc self.exc = exc
def __str__(self): def __str__(self):
@@ -63,7 +63,7 @@ class _ListGenerationIter:
return self return self
@property @property
def _type(self) -> Optional[str]: def type(self) -> tuple[Optional[str], Optional[str]]:
guess = mimetypes.guess_type(f'{self._path}/{self._list[self._current_index]}')[0] guess = mimetypes.guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
return guess return guess

View File

@@ -5,6 +5,7 @@ from disnake import FFmpegPCMAudio
async def play_audio(audio, bot, vc): async def play_audio(audio, bot, vc):
if not bot.voice_clients: if not bot.voice_clients:
logger.info(audio)
logger.error(f'Playing: {audio}') logger.error(f'Playing: {audio}')
await sleep(1) await sleep(1)
vp = await vc.connect() vp = await vc.connect()

View File

@@ -1,7 +1,7 @@
disnake[audio]~=2.5.2 disnake[audio]~=2.5.2
setuptools==65.3.0 setuptools==65.3.0
psutil==5.9.2 psutil~=5.9.1
pymediainfo~=5.1.0 pymediainfo~=5.1.0
pyNaCl~=1.5.0 pyNaCl~=1.5.0
python-dotenv==0.21.0 python-dotenv~=0.20.0
ffmpeg-python~=0.2.0 ffmpeg