From 53939b4bc33cb5479321fdf0a75c5b7ba2fed634 Mon Sep 17 00:00:00 2001 From: bot Date: Thu, 7 Mar 2024 02:18:09 +0300 Subject: [PATCH] v0.0.6\nplay_audio fixes --- .gitignore | 22 +-- CHANGELOG | 4 +- __init__.py | 32 ++-- bot.py | 173 +++++++++--------- cogs/admin.py | 346 +++++++++++++++++------------------ cogs/audio.py | 192 +++++++++---------- cogs/{ => disabled}/funny.py | 30 +-- cogs/disabled/test.py | 33 ++++ cogs/general.py | 130 ++++++------- cogs/info.py | 88 ++++----- cogs/test.py | 67 ------- lib/CogsPrep.py | 114 ++++++------ lib/Comands.py | 226 +++++++++++------------ lib/DB_Worker.py | 330 ++++++++++++++++----------------- lib/ListGenerator.py | 173 +++++++++--------- lib/Logger.py | 104 +++++------ lib/Player.py | 34 ++-- lib/__init__.py | 22 +-- main.py | 80 ++++++++ requirements.txt | 10 +- setup.py | 15 ++ test.py | 5 + 22 files changed, 1126 insertions(+), 1104 deletions(-) rename cogs/{ => disabled}/funny.py (96%) create mode 100644 cogs/disabled/test.py delete mode 100644 cogs/test.py create mode 100644 main.py create mode 100644 setup.py create mode 100644 test.py diff --git a/.gitignore b/.gitignore index d40a592..426be7e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,11 @@ -/tmp/ -/audio/*/ -/.idea -/user.db -*.json -*.pyc -/.run/ -/.env -*.exe -/venv/ -/fun_and_admin_bot.egg-info/ +/tmp/ +/audio/*/ +/.idea +/user.db +*.json +*.pyc +/.run/ +/.env +*.exe +/venv/ +/fun_and_admin_bot.egg-info/ diff --git a/CHANGELOG b/CHANGELOG index 77d3c92..f8461e7 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,2 +1,2 @@ -0.0.5 -Initial +0.0.5 +Initial diff --git a/__init__.py b/__init__.py index 9b52901..1ea1da5 100644 --- a/__init__.py +++ b/__init__.py @@ -1,16 +1,16 @@ -__version__ = '0.0.5' -__title__ = "Pisya_bot" -__author__ = "beaconborn" - -from typing import NamedTuple, Literal - - -class VersionInfo(NamedTuple): - major: int - minor: int - micro: int - releaselevel: Literal["alpha", "beta", "candidate", "final"] - serial: int - - -version_info: VersionInfo = VersionInfo(major=5, minor=0, micro=5, releaselevel="alpha", serial=0) +__version__ = '0.0.6' +__title__ = "Pisya_bot" +__author__ = "baconborn" + +from typing import NamedTuple, Literal + + +class VersionInfo(NamedTuple): + major: int + minor: int + micro: int + releaselevel: Literal["alpha", "beta", "candidate", "final"] + serial: int + + +version_info: VersionInfo = VersionInfo(major=0, minor=0, micro=6, releaselevel="alpha", serial=0) diff --git a/bot.py b/bot.py index 488d2af..463b08a 100755 --- a/bot.py +++ b/bot.py @@ -1,90 +1,83 @@ -#!/usr/bin/env python3 -import asyncio -import os -from typing import List - -import disnake -from disnake import OptionChoice, OptionType, Option -from disnake.ext import commands -from __init__ import version_info as ver -from lib import work_with_cogs, cog_list -from lib import preload_checks, determine_prefix -from lib import logger - -preload_checks() - - -intents = disnake.Intents(messages=True, - guilds=True, - message_content=True, - voice_states=True, - members=True, - presences=True - ) - -bot = commands.Bot(command_prefix=determine_prefix, - intents=intents, - reload=True - ) - - -asyncio.run(work_with_cogs('load', bot, asyncio.run(cog_list()))) - - -@bot.event -async def on_ready(): - logger.info(f'Bot started') - logger.info('We have logged in as {0.user}'.format(bot)) - logger.info(f'Version of bot is - v{ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}') - - -@bot.slash_command( - name="cog", - description="Work with cogs", - options=[ - Option( - "what_do", - description="Specify what do with cogs", - type=OptionType.string, - required=True, - choices=[ - OptionChoice("load", "load"), - OptionChoice("unload", "unload"), - OptionChoice("reload", "reload"), - OptionChoice("enable", "enable"), - OptionChoice("disable", "disable"), - ] - ), - Option( - 'cog', - description="specify cog", - type=OptionType.string - ) - ] -) -@commands.is_owner() -async def slash_cogs(inter, what_do, cog: str = asyncio.run(cog_list())) -> None: - await work_with_cogs(what_do, bot, cog) - await inter.response.send_message(f'Cog {cog} is {what_do}ed', ephemeral=True) - - -@slash_cogs.autocomplete('cog') -async def _cog_opt(inter: disnake.ApplicationCommandInteraction, current: str, what_do) -> List[OptionChoice]: - _what = ['load', 'reload', 'unload', 'disable'] - if what_do in _what: - _list = await cog_list() - elif what_do == 'enable': - _list = await cog_list('./cogs/disabled/') - return [ - OptionChoice(name=choice, value=choice) - for choice in _list if current.lower() in choice.lower() - ] - - -@slash_cogs.error -async def cogs_error(inter, what_do): - await inter.response.send_message(f'Error', ephemeral=True) - logger.error(f'User {inter.author} tries to use cogs func\n{what_do}\n') - - -bot.run(os.getenv('TOKEN')) +#!/usr/bin/env python3 +import asyncio +import os +from typing import List + +import disnake +from disnake import OptionChoice, OptionType, Option +from disnake.ext import commands +from __init__ import version_info as ver +from lib import work_with_cogs, cog_list +from lib import preload_checks, determine_prefix +from lib import logger + +preload_checks() + + +intents = disnake.Intents(messages=True, + guilds=True, + message_content=True, + voice_states=True, + members=True, + presences=True + ) + +bot = commands.Bot(command_prefix=determine_prefix, + intents=intents, + reload=True, + + ) + + +asyncio.run(work_with_cogs('load', bot, asyncio.run(cog_list()))) + + +@bot.event +async def on_ready(): + logger.info(f'Bot started') + logger.info(f'Disnake version {disnake.__version__}') + logger.info('We have logged in as {0.user}'.format(bot)) + logger.info(f'Version of bot is - v{ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}') + + +@bot.slash_command( + name="cog", + description="Work with cogs", + options=[ + Option( + "what_do", + description="Specify what do with cogs", + type=OptionType.string, + required=True, + choices=["load", "unload", "reload", "enable", "disable"] + ), + Option( + 'cog', + description="specify cog", + type=OptionType.string + ) + ] +) +@commands.is_owner() +async def slash_cogs(inter: disnake.ApplicationCommandInteraction, what_do, cog: str): + await work_with_cogs(what_do, bot, cog) + await inter.response.send_message(f'Cog {cog} is {what_do}ed', ephemeral=True) + + +@slash_cogs.autocomplete('cog') +async def _cog_opt(inter: disnake.ApplicationCommandInteraction, current: str, what_do): + current = current.lower() + if what_do == 'enable': + _list = await cog_list('./cogs/disabled/') + else: + _list = await cog_list() + return [choice for choice in _list if current in choice.lower()] + + +@slash_cogs.error +async def cogs_error(inter, what_do): + await inter.response.send_message(f'Error', ephemeral=True) + logger.error(f'User {inter.author} tries to use cogs func\n{what_do}\n') + + +bot.run(os.getenv('TOKEN')) diff --git a/cogs/admin.py b/cogs/admin.py index 1e73b76..a947edc 100644 --- a/cogs/admin.py +++ b/cogs/admin.py @@ -1,174 +1,172 @@ -from asyncio import sleep -from typing import List - -import disnake -from disnake import Option, OptionType, OptionChoice - -from disnake.ext import commands, tasks - -from lib import read_json, write_json -from lib import fill_bd, prepare_db, work_with_db -from lib import logger - - -class Admin(commands.Cog, name='Admin'): - def __init__(self, bot): - self.bot = bot # a defining bot as global var in class - - @commands.Cog.listener() # this is a decorator for events/listeners - async def on_ready(self): - for g in self.bot.get_all_members(): - await prepare_db(g.guild.id) - for g in self.bot.get_all_members(): - await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id) - - self.activity.start() - logger.info(f'Cog {__name__.split(".")[1]} is ready!.') - - @tasks.loop(seconds=20) - async def activity(self): - await self.bot.change_presence( - activity=disnake.Activity( - name=f'at users: {str(len(self.bot.users))}', - type=3 - ) - ) - await sleep(10) - await self.bot.change_presence( - activity=disnake.Activity( - name=f'at servers: {str(len(self.bot.guilds))}', - type=3 - ) - ) - - @commands.Cog.listener() - async def on_member_update(self, before: disnake.Member, after: disnake.Member): - sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?""" - data_tuple = (after.nick, before.id) - await work_with_db(sql_update_query, data_tuple) - - @commands.Cog.listener() - async def on_guild_join(self, guild): - for g in guild.members: - await fill_bd(g.name, g.id, g.bot, g.nick, guild.id) - - @commands.Cog.listener() - async def on_member_join(self, member): - await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id) - - bot_role = read_json(member.guild.id, 'bot_role') # Get bot role - guest_role = read_json(member.guild.id, 'guest_role') # Get guest role - - if bot_role or guest_role: - if member.bot == 0: - role = disnake.utils.get(member.guild.roles, id=guest_role) - else: - role = disnake.utils.get(member.guild.roles, id=bot_role) - logger.info(f"Adding to {member} role {role}") - await member.add_roles(role) - - @commands.slash_command( - name="set_guest_role", - description="Set Default bot role", - options=[ - Option("role", "Specify role", OptionType.role, required=True), - ] - ) - @commands.has_permissions(administrator=True) - async def set_guest_role(self, inter, role): - await write_json(inter.guild.id, "guest_role", role.id) - await inter.response.send_message(f"Set up bot role to: `{role.name}`", ephemeral=True) - - @commands.command(name="set_prefix") - @commands.has_permissions(administrator=True) - async def command_set_prefix(self, ctx, prefix: str): - await write_json(ctx.guild.id, "prefix", prefix) - await ctx.reply(f"Prefix set to: `{prefix}`") - - @commands.guild_only() - @commands.slash_command( - name="set_prefix", - description="Setting up bot prefix", - options=[ - Option("prefix", "Specify prefix", OptionType.string, required=True), - ] - ) - @commands.has_permissions(administrator=True) - async def slash_set_prefix(self, inter, prefix: str): - await write_json(inter.guild.id, "prefix", prefix) - await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True) - - @commands.guild_only() - @commands.slash_command( - name="set_trigger_role", - description="Setting up role to trigger bot", - options=[ - Option("role", "Specify role", OptionType.role, required=True), - ] - ) - @commands.has_permissions(administrator=True) - async def set_trigger_role(self, inter, role): - await write_json(inter.guild.id, "tigger_role", role.id) - await inter.response.send_message(f"Role to trigger set to : `{role.name}`", ephemeral=True) - - @commands.slash_command( - name="set_bot_role", - description="Set Default bot role", - options=[ - Option("role", "Specify role", OptionType.role, required=True), - ] - ) - @commands.guild_only() - @commands.has_permissions(administrator=True) - async def set_bot_role(self, ctx, role): - await write_json(ctx.guild.id, "bot_role", role.id) - await ctx.send(f"Set up bot role to: `{role.name}`", ephemeral=True) - - @set_bot_role.error - @set_trigger_role.error - @slash_set_prefix.error - async def set_prefix_error(self, inter, prefix): - await inter.response.send_message("You don`t have permissions", ephemeral=True) - - @commands.has_permissions(administrator=True) - @commands.slash_command( - name="set_time", - description="Read list of tracks for user", - options=[ - Option("seconds", "specify max duration", OptionType.integer, required=True), - ] - ) - async def set_time(self, - inter: disnake.ApplicationCommandInteraction, - seconds: commands.Range[5, 30]): - await write_json(inter.guild.id, "seconds", seconds) - await inter.response.send_message(f"Change max audio duration to {seconds} sec", ephemeral=True) - - @commands.has_permissions(administrator=True) - @commands.slash_command( - name="set_bot_channel", - description="Set channel which iterate with bot", - options=[ - Option("channel", "specify channel", OptionType.string, required=True), - ] - ) - async def set_bot_channel(self, inter, channel): - print(type(inter.guild.text_channels)) - await write_json(inter.guild.id, "channel", channel.id) - await inter.response.send_message(f"Channel set up to {channel.mention}", ephemeral=True) - - @set_bot_channel.autocomplete('channel') - async def _list_text_channels(self, - inter: disnake.ApplicationCommandInteraction, - current: str) -> List[OptionChoice]: - _list = [] - for _channel in inter.guild.text_channels: - _list.append(_channel) - return [ - OptionChoice(name=choice, value=choice) - for choice in _list if current in choice - ] - - -def setup(bot): # an extension must have a setup function - bot.add_cog(Admin(bot)) # adding a cog +from asyncio import sleep +from typing import List + +import disnake +from disnake import Option, OptionType, OptionChoice + +from disnake.ext import commands, tasks + +from lib import read_json, write_json +from lib import fill_bd, prepare_db, work_with_db +from lib import logger + + +class Admin(commands.Cog, name='Admin'): + def __init__(self, bot): + self.bot = bot # a defining bot as global var in class + + @commands.Cog.listener() # this is a decorator for events/listeners + async def on_ready(self): + for g in self.bot.get_all_members(): + await prepare_db(g.guild.id) + for g in self.bot.get_all_members(): + await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id) + + self.activity.start() + logger.info(f'Cog {__name__.split(".")[1]} is ready!.') + + @tasks.loop(seconds=20) + async def activity(self): + await self.bot.change_presence( + activity=disnake.Activity( + name=f'at users: {str(len(self.bot.users))}', + type=3 + ) + ) + await sleep(10) + await self.bot.change_presence( + activity=disnake.Activity( + name=f'at servers: {str(len(self.bot.guilds))}', + type=3 + ) + ) + + @commands.Cog.listener() + async def on_member_update(self, before: disnake.Member, after: disnake.Member): + sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?""" + data_tuple = (after.nick, before.id) + await work_with_db(sql_update_query, data_tuple) + + @commands.Cog.listener() + async def on_guild_join(self, guild): + for g in guild.members: + await fill_bd(g.name, g.id, g.bot, g.nick, guild.id) + + @commands.Cog.listener() + async def on_member_join(self, member): + await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id) + + bot_role = read_json(member.guild.id, 'bot_role') # Get bot role + guest_role = read_json(member.guild.id, 'guest_role') # Get guest role + + if bot_role or guest_role: + if member.bot == 0: + role = disnake.utils.get(member.guild.roles, id=guest_role) + else: + role = disnake.utils.get(member.guild.roles, id=bot_role) + logger.info(f"Adding to {member} role {role}") + await member.add_roles(role) + + @commands.slash_command( + name="set_guest_role", + description="Set Default bot role", + options=[ + Option("role", "Specify role", OptionType.role, required=True), + ] + ) + @commands.has_permissions(administrator=True) + async def set_guest_role(self, inter, role): + await write_json(inter.guild.id, "guest_role", role.id) + await inter.response.send_message(f"Set up bot role to: `{role.name}`", ephemeral=True) + + @commands.command(name="set_prefix") + @commands.has_permissions(administrator=True) + async def command_set_prefix(self, ctx, prefix: str): + await write_json(ctx.guild.id, "prefix", prefix) + await ctx.reply(f"Prefix set to: `{prefix}`") + + @commands.guild_only() + @commands.slash_command( + name="set_prefix", + description="Setting up bot prefix", + options=[ + Option("prefix", "Specify prefix", OptionType.string, required=True), + ] + ) + @commands.has_permissions(administrator=True) + async def slash_set_prefix(self, inter, prefix: str): + await write_json(inter.guild.id, "prefix", prefix) + await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True) + + @commands.guild_only() + @commands.slash_command( + name="set_trigger_role", + description="Setting up role to trigger bot", + options=[ + Option("role", "Specify role", OptionType.role, required=True), + ] + ) + @commands.has_permissions(administrator=True) + async def set_trigger_role(self, inter, role): + await write_json(inter.guild.id, "tigger_role", role.id) + await inter.response.send_message(f"Role to trigger set to : `{role.name}`", ephemeral=True) + + @commands.slash_command( + name="set_bot_role", + description="Set Default bot role", + options=[ + Option("role", "Specify role", OptionType.role, required=True), + ] + ) + @commands.guild_only() + @commands.has_permissions(administrator=True) + async def set_bot_role(self, ctx, role): + await write_json(ctx.guild.id, "bot_role", role.id) + await ctx.send(f"Set up bot role to: `{role.name}`", ephemeral=True) + + @set_bot_role.error + @set_trigger_role.error + @slash_set_prefix.error + async def set_prefix_error(self, inter, prefix): + await inter.response.send_message("You don`t have permissions", ephemeral=True) + logger.error(prefix) + + @commands.has_permissions(administrator=True) + @commands.slash_command( + name="set_time", + description="Read list of tracks for user", + options=[ + Option("seconds", "specify max duration", OptionType.integer, required=True), + ] + ) + async def set_time(self, + inter: disnake.ApplicationCommandInteraction, + seconds: commands.Range[int, 5, 30]): + await write_json(inter.guild.id, "seconds", seconds) + await inter.response.send_message(f"Change max audio duration to {seconds} sec", ephemeral=True) + + @commands.has_permissions(administrator=True) + @commands.slash_command( + name="set_bot_channel", + description="Set channel which iterate with bot", + + ) + async def set_bot_channel(self, inter, channel: str): + await write_json(inter.guild.id, + "channel", + disnake.utils.find(lambda d: d.name == channel, inter.guild.channels).id) + await inter.response.send_message(f"Channel set up to {channel}", ephemeral=True) + + @set_bot_channel.autocomplete('channel') + async def _list_text_channels(self, + inter: disnake.ApplicationCommandInteraction, + current: str) -> List[OptionChoice]: + _list = [r.name for r in inter.guild.text_channels] + return [ + OptionChoice(name=choice, value=choice) + for choice in _list if current in choice + ] + + +def setup(bot): # an extension must have a setup function + bot.add_cog(Admin(bot)) # adding a cog diff --git a/cogs/audio.py b/cogs/audio.py index 87750e0..afa7725 100644 --- a/cogs/audio.py +++ b/cogs/audio.py @@ -1,109 +1,83 @@ -import random -from os import path, makedirs, rename, remove - -from disnake.ext import commands - -from lib import logger -from lib import determine_time -from lib import read_db, check_exist_audio, add_audio -from lib import play_audio - - -# todo: write chose audio from list by slash command -class Audio(commands.Cog, name='Audio'): - def __init__(self, bot): - self.bot = bot - - @commands.Cog.listener() - async def on_ready(self): - logger.info(f'Cog {__name__.split(".")[1]} is ready!.') - - # todo: complete check activity - @commands.Cog.listener() - async def on_voice_state_update(self, member, before, after): - if before.channel is None and not member.bot: - if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members): - logger.info('Skip playing by Game') - else: - # Prepare list of audio - from lib.Comands import read_json - _role = await read_json(member.guild.id, 'tigger_role') - # Read audio from DB - audio_db = await read_db(member.guild.id, member.id, 'usertracks') - def_audio_db = await read_db(member.guild.id, member.id, 'defaulttracks') - if audio_db is not None: - audio_db = audio_db.split(', ') # Need to fix creating list - for i in range(len(audio_db)): - audio_db[i] = f'{member.id}/{audio_db[i]}' - if def_audio_db is not None: - def_audio_db = def_audio_db.split(', ') - from lib.Comands import list_files - def_audio_ls = await list_files() - - if def_audio_db or audio_db is not None: - if not def_audio_db: def_audio_db = [] - if not audio_db: audio_db = [] - logger.info(f'Play audio from DB') - full_audio = def_audio_db + audio_db - await play_audio(f'audio/{random.choice(full_audio)}', self.bot, after.channel) - elif len(member.roles) == 1 or _role is None: - logger.info(f'Skip playing by role') - elif any(str(role.id) in _role for role in member.roles): - logger.info(f'Play audio from list by role') - await play_audio(f'audio/{random.choice(def_audio_ls)}', self.bot, after.channel) - else: - logger.info(f'Skip playing by any else') - - @commands.command(name="upload_audio", - description=f"Add audio to bot") - async def upload_audio(self, ctx, user=None): - user = user or ctx.author - if ctx.author.guild_permissions.administrator or user is ctx.author: - if ctx.message.attachments: - from os import error - if not path.isdir(f'tmp/{user.id}'): - try: - makedirs(f'tmp/{user.id}') - except error as _error: - pass - - if not path.isdir(f'audio/{user.id}'): - try: - makedirs(f'audio/{user.id}') - except error as _error: - pass - for at in ctx.message.attachments: - import mimetypes - - await at.save(f'tmp/{user.id}/{at.filename}') - guess = mimetypes.guess_type(f'tmp/{user.id}/{at.filename}') - if guess[0].split('/')[0] == 'audio': - from pymediainfo import MediaInfo - file = f'tmp/{user.id}/{at.filename}' - duration = round(MediaInfo.parse(file).tracks[0].duration / 1000) - max_duration = int(determine_time(ctx)) - print(type(max_duration)) - if duration > max_duration: - await ctx.reply(f'Audio duration is {duration}, but max is {max_duration}') - remove(f'tmp/{user.id}/{at.filename}') - else: - a = await read_db(ctx.guild.id, user.id, 'usertracks') - if a: - audiolist = a + ", " + f'{at.filename}' - else: - audiolist = f'{at.filename}' - - await check_exist_audio(ctx, ctx.guild.id, user.id, 'usertracks', at.filename) - await add_audio(ctx.guild.id, user.id, audiolist) - rename(f'tmp/{user.id}/{at.filename}', f'audio/{user.id}/{at.filename}') - elif guess[0].split('/')[0] != 'audio': - await ctx.reply(f'It not audio {at.filename}\n it`s {guess[0]}') - remove(f'tmp/{user.id}/{at.filename}') - else: - await ctx.reply("Has no Attachment") - else: - await ctx.reply(f'You`re not admin. You can add audio only for your own account') - - -def setup(bot): # an extension must have a setup function - bot.add_cog(Audio(bot)) # adding a cog +import random +from itertools import islice +from typing import List + +import disnake +from disnake import OptionChoice, Option, OptionType +from disnake.ext import commands + +from lib import logger, ListGenerator +from lib import read_db +from lib import play_audio + + +# todo: write chose audio from list by slash command +class Audio(commands.Cog, name='Audio'): + def __init__(self, bot): + self.bot = bot + + @commands.Cog.listener() + async def on_ready(self): + logger.info(f'Cog {__name__.split(".")[1]} is ready!.') + + # todo: complete check activity + @commands.Cog.listener() + async def on_voice_state_update(self, member, before, after): + if before.channel is None and not member.bot: + if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members): + logger.info('Skip playing by Game') + else: + # Prepare list of audio + from lib.Comands import read_json + _role = await read_json(member.guild.id, 'tigger_role') + # Read audio from DB + audio_db = await read_db(member.guild.id, member.id, 'defaulttracks') + + if audio_db is not None: + audio = audio_db.split(', ') + else: + from lib.Comands import list_files + audio = await list_files() + + if audio_db is not None: + logger.info(f'Play audio from DB') + await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel) + elif len(member.roles) == 1 or _role is None: + logger.info(f'Skip playing by role') + elif any(str(role.id) in _role for role in member.roles): + logger.info(f'Play audio from list by role') + await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel) + else: + logger.info(f'Skip playing by any else') + + @commands.slash_command(name="play_audio", + description="Make possible playing audio by command", + options=[ + Option(name="audio", + type=OptionType.string, + required=True + ) + ]) + async def playaudio(self, inter: disnake.ApplicationCommandInteraction, + audio: str + ): + if inter.author.voice is not None: + await inter.response.send_message(f'Played {audio}', ephemeral=True) + await play_audio(audio, self.bot, inter.author.voice.channel) + else: + await inter.response.send_message('You`re not in voice', ephemeral=True) + + @playaudio.autocomplete('audio') + async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str): + current = current.lower() + _dict: dict = {} + for f in ListGenerator('audio'): + _dict[f.name] = f'{f.path}/{f.name}' + return [ + OptionChoice(name=choice, value=f'{_dict[choice]}') + for choice in _dict if current in choice.lower() + ] + + +def setup(bot): # an extension must have a setup function + bot.add_cog(Audio(bot)) # adding a cog diff --git a/cogs/funny.py b/cogs/disabled/funny.py similarity index 96% rename from cogs/funny.py rename to cogs/disabled/funny.py index c7bf06c..f4b2a03 100644 --- a/cogs/funny.py +++ b/cogs/disabled/funny.py @@ -1,15 +1,15 @@ -from disnake.ext import commands -from lib import logger - - -class Fun(commands.Cog, name='Fun'): - def __init__(self, bot): - self.bot = bot # defining bot as global var in class - - @commands.Cog.listener() # this is a decorator for events/listeners - async def on_ready(self): - logger.info(f'Cog {__name__.split(".")[1]} is ready!.') - - -def setup(bot): # an extension must have a setup function - bot.add_cog(Fun(bot)) # adding a cog +from disnake.ext import commands +from lib import logger + + +class Fun(commands.Cog, name='Fun'): + def __init__(self, bot): + self.bot = bot # defining bot as global var in class + + @commands.Cog.listener() # this is a decorator for events/listeners + async def on_ready(self): + logger.info(f'Cog {__name__.split(".")[1]} is ready!.') + + +def setup(bot): # an extension must have a setup function + bot.add_cog(Fun(bot)) # adding a cog diff --git a/cogs/disabled/test.py b/cogs/disabled/test.py new file mode 100644 index 0000000..0895ce9 --- /dev/null +++ b/cogs/disabled/test.py @@ -0,0 +1,33 @@ +from typing import List + +import disnake +from disnake import OptionChoice +from disnake.ext import commands + +from lib import logger + + +class Testing(commands.Cog, name='Testing'): + def __init__(self, bot): + self.bot = bot # defining bot as global var in class + + @commands.Cog.listener() # this is a decorator for events/listeners + async def on_ready(self): + logger.info(f'Cog {__name__.split(".")[1]} is ready!.') + + @commands.slash_command( + name='select_audio', + description='Select Audios from List' + ) + async def select_audio(self, inter, audios: str): + pass + + @select_audio.autocomplete('audios') + async def _list_audios(self, + inter: disnake.ApplicationCommandInteraction, + current: str) -> List[OptionChoice]: + pass + + +def setup(bot): # an extension must have a setup function + bot.add_cog(Testing(bot)) # adding a cog diff --git a/cogs/general.py b/cogs/general.py index 2c97801..ca6bfdf 100644 --- a/cogs/general.py +++ b/cogs/general.py @@ -1,69 +1,61 @@ - -import disnake -from disnake import Option, OptionType, Colour -from disnake.ext import commands - -from lib import DB_Reader -from lib import logger - - -class General(commands.Cog): - def __init__(self, bot): - self.bot = bot # defining bot as global var in class - - @commands.Cog.listener() # this is a decorator for events/listeners - async def on_ready(self): - logger.info(f'Cog {__name__.split(".")[1]} is ready!.') - - @commands.slash_command( - name="info", - description="Read list of tracks for user", - options=[ - Option("user", "Specify any user", OptionType.user), - ] - ) - async def info(self, inter, user=None): - user_audio = None - default_audio = None - user = user or inter.author - _user = DB_Reader(inter.guild.id) - for r in _user: - if r.userid == user.id: - user_audio = r.usertracks - default_audio = r.defaulttracks - - rolelist = [r.mention for r in user.roles if r != inter.guild.default_role] - if rolelist: - roles = "\n".join(rolelist) - else: - roles = "Not added any role" - if user_audio: - audios = "• " + "\n• ".join(sorted(user_audio.split(", "))) - else: - audios = "Not selected audio" - - if default_audio: - audios2 = "• " + "\n• ".join(sorted(default_audio.split(", "))) - else: - audios2 = "Not selected audio" - - emb = disnake.Embed( - title=f"General information", - description=f"General information on server about {user}", - color=Colour.random() - ) - emb.set_thumbnail(url=user.display_avatar) - emb.add_field(name="General info", - value=f"Username: {user}\n" - f"Nickname: {user.nick}\n" - f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False) - emb.add_field(name="User audio list", value=f"{audios}", inline=True) - emb.add_field(name="Default audio list", value=f"{audios2}", inline=True) - emb.add_field(name="Roles list", value=f"{roles}", inline=True) - emb.set_footer(text="Information requested by: {}".format(inter.author.display_name)) - - await inter.response.send_message(embed=emb, ephemeral=True) - - -def setup(bot): # an extension must have a setup function - bot.add_cog(General(bot)) # adding a cog +import disnake +from disnake import Option, OptionType, Colour +from disnake.ext import commands + +from lib import DBReader +from lib import logger + + +class General(commands.Cog): + def __init__(self, bot): + self.bot = bot # defining bot as global var in class + + @commands.Cog.listener() # this is a decorator for events/listeners + async def on_ready(self): + logger.info(f'Cog {__name__.split(".")[1]} is ready!.') + + @commands.slash_command( + name="info", + description="Read list of tracks for user", + options=[ + Option("user", "Specify any user", OptionType.user), + ] + ) + async def info(self, inter, user=None): + audio = None + user = user or inter.author + _user = DBReader(inter.guild.id) + for r in _user: + if r.userid == user.id: + audio = r.defaulttracks + + rolelist = [r.mention for r in user.roles if r != inter.guild.default_role] + if rolelist: + roles = "\n".join(rolelist) + else: + roles = "Not added any role" + + if audio: + audios = "• " + "\n• ".join(sorted(audio.split(", "))) + else: + audios = "Not selected audio" + + emb = disnake.Embed( + title=f"General information", + description=f"General information on server about {user}", + color=Colour.random() + ) + emb.set_thumbnail(url=user.display_avatar) + emb.add_field(name="General info", + value=f"Username: {user}\n" + f"Nickname: {user.nick}\n" + f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False) + emb.add_field(name="Default audio list", value=f"{audios}", inline=True) + emb.add_field(name="Roles list", value=f"{roles}", inline=True) + emb.set_footer(text="Information requested by: {}".format(inter.author.display_name)) + + await inter.response.send_message(embed=emb, ephemeral=True) + + +def setup(bot): # an extension must have a setup function + bot.add_cog(General(bot)) # adding a cog diff --git a/cogs/info.py b/cogs/info.py index 88b86e9..f51b200 100644 --- a/cogs/info.py +++ b/cogs/info.py @@ -1,44 +1,44 @@ -import os - -import disnake -import psutil -from disnake.ext import commands - -from __init__ import version_info as ver -from lib import determine_prefix, determine_time -from lib import logger - - -class BotInfo(commands.Cog, name='Bot Info'): - def __init__(self, bot): - self.bot = bot # defining bot as global var in class - - @commands.Cog.listener() # this is a decorator for events/listeners - async def on_ready(self): - logger.info(f'Cog {__name__.split(".")[1]} is ready!.') - - @commands.slash_command(name="info_bot", - description='Shows general info about bot') # this is for making a command - async def info_bot(self, inter): - _pid = os.getpid() - _process = psutil.Process(_pid) - emb = disnake.Embed( - title=f"General information", - description=f"General information on about bot", - ) - emb.set_thumbnail(self.bot.user.avatar.url) - emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n" - f"CPU Usage: {_process.cpu_percent()}%\n" - f'Bot ping: {round(self.bot.latency * 1000)}\n' - f'Prefix: `{determine_prefix(self.bot, inter)}\n`' - f"Max audio duration: {determine_time(inter)} sec\n" - ) - emb.add_field(name="Bot info:", value=f"Bot owner: <@{self.bot.owner_id}>\n" - f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}") - emb.set_footer(text="Information requested by: {}".format(inter.author.display_name)) - - await inter.response.send_message(embed=emb, ephemeral=True) - - -def setup(bot): # an extension must have a setup function - bot.add_cog(BotInfo(bot)) # adding a cog +import os + +import disnake +import psutil +from disnake.ext import commands + +from __init__ import version_info as ver +from lib import determine_prefix, determine_time +from lib import logger + + +class BotInfo(commands.Cog, name='Bot Info'): + def __init__(self, bot): + self.bot = bot # defining bot as global var in class + + @commands.Cog.listener() # this is a decorator for events/listeners + async def on_ready(self): + logger.info(f'Cog {__name__.split(".")[1]} is ready!.') + + @commands.slash_command(name="info_bot", + description='Shows general info about bot') # this is for making a command + async def info_bot(self, inter): + _pid = os.getpid() + _process = psutil.Process(_pid) + emb = disnake.Embed( + title=f"General information", + description=f"General information on about bot", + ) + emb.set_thumbnail(self.bot.user.avatar.url) + emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n" + f"CPU Usage: {_process.cpu_percent()}%\n" + f'Bot ping: {round(self.bot.latency * 1000)}\n' + f'Prefix: `{determine_prefix(self.bot, inter)}\n`' + f"Max audio duration: {determine_time(inter)} sec\n" + ) + emb.add_field(name="Bot info:", value=f"Bot owner: <@386629192743256065>\n" + f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}") + emb.set_footer(text="Information requested by: {}".format(inter.author.display_name)) + + await inter.response.send_message(embed=emb, ephemeral=True) + + +def setup(bot): # an extension must have a setup function + bot.add_cog(BotInfo(bot)) # adding a cog diff --git a/cogs/test.py b/cogs/test.py deleted file mode 100644 index 30374c3..0000000 --- a/cogs/test.py +++ /dev/null @@ -1,67 +0,0 @@ -from typing import List - -import disnake -from disnake import Option, OptionType, OptionChoice -from disnake.ext import commands - -from lib import ListGenerator -from lib import logger -from lib import play_audio - - -class Testing(commands.Cog, name='Testing'): - def __init__(self, bot): - self.bot = bot # defining bot as global var in class - - @commands.Cog.listener() # this is a decorator for events/listeners - async def on_ready(self): - logger.info(f'Cog {__name__.split(".")[1]} is ready!.') - - @commands.slash_command(name="play_audio", - description="Make possible playing audio by command", - options=[ - Option(name="audio", - type=OptionType.string, - required=True - ) - ]) - async def playaudio(self, inter: disnake.ApplicationCommandInteraction, - audio: str - ): - if inter.author.voice is not None: - await inter.response.send_message(f'Played {audio}', ephemeral=True) - await play_audio(audio, self.bot, inter.author.voice.channel) - else: - await inter.response.send_message('You`re not in voice', ephemeral=True) - - @playaudio.autocomplete('audio') - async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str) -> List[OptionChoice]: - _def_iter = ListGenerator('audio') - _def_dict: dict = {} - for f in _def_iter: - _def_dict[f.name] = f'{f.path}/{f.name}' - - _user_dict: dict = {} - try: - _user_iter = ListGenerator(f'audio/{inter.author.id}') - for f in _user_iter: - _user_dict[f.name] = f'{f.path}/{f.name}' - - # user_dict = [] - # for _track in user_list: - - except IndexError: - pass - - _dict = {} - _dict.update(_def_dict) - _dict.update(_user_dict) - return [ - OptionChoice(name=choice, value=f'{_dict[choice]}') - for choice in _dict if current.lower() in choice.lower() - - ] - - -def setup(bot): # an extension must have a setup function - bot.add_cog(Testing(bot)) # adding a cog diff --git a/lib/CogsPrep.py b/lib/CogsPrep.py index b66cb4c..635fd6e 100644 --- a/lib/CogsPrep.py +++ b/lib/CogsPrep.py @@ -1,57 +1,57 @@ -""" -lib.CogsPrepare -~~~~~~~~~~~~~ -Loads, unloads Cogs files -cog_list: return list of cog filenames -work_with_cogs: loads, reloads and unloads cogs files - -""" -import os -import traceback -from os import listdir -from typing import List - -from disnake.ext import commands -from .Logger import logger - - -async def cog_list(fold='./cogs') -> List[str]: - cogs_list = [] - for _filename in listdir(fold): - if _filename.endswith('.py'): - cogs_list.append(_filename[:-3]) - return cogs_list - - -async def work_with_cogs(what_do, bot, cog): - if isinstance(cog, str): - cog = cog.split() - for _filename in cog: - if what_do == "load": - try: - bot.load_extension(f'cogs.{_filename}') - logger.info(f'Loaded cog {_filename}') - - except commands.ExtensionNotFound: - logger.error(f"Error: {_filename} couldn't be find to load.") - - except commands.ExtensionFailed as error: - logger.error(f'Error: {_filename} failed to load properly.\n\t{error}\n\n{traceback.format_exc()}') - - except commands.ExtensionError: - logger.error(f'Error: unknown error with {_filename}') - - elif what_do == 'unload': - bot.unload_extension(f'cogs.{_filename}') - logger.info(f'Cog {_filename} unloaded') - elif what_do == 'reload': - bot.reload_extension(f'cogs.{_filename}') - logger.info(f'Cog {_filename} reloaded') - elif what_do == 'disable': - bot.unload_extension(f'cogs.{_filename}') - os.rename(f'cogs/{_filename}.py', f'cogs/disabled/{_filename}.py') - logger.info(f'Cog {_filename} stopped and disabled') - elif what_do == 'enable': - os.rename(f'cogs/disabled/{_filename}.py', f'cogs/{_filename}.py') - bot.load_extension(f'cogs.{_filename}') - logger.info(f'Cog {_filename} started and enabled') +""" +lib.CogsPrepare +~~~~~~~~~~~~~ +Loads, unloads Cogs files +cog_list: return list of cog filenames +work_with_cogs: loads, reloads and unloads cogs files + +""" +import os +import traceback +from os import listdir +from typing import List + +from disnake.ext import commands +from .Logger import logger + + +async def cog_list(fold='./cogs') -> List[str]: + cogs_list = [] + for _filename in listdir(fold): + if _filename.endswith('.py'): + cogs_list.append(_filename[:-3]) + return cogs_list + + +async def work_with_cogs(what_do, bot, cog): + if isinstance(cog, str): + cog = cog.split() + for _filename in cog: + if what_do == "load": + try: + bot.load_extension(f'cogs.{_filename}') + logger.info(f'Loaded cog {_filename}') + + except commands.ExtensionNotFound: + logger.error(f"Error: {_filename} couldn't be find to load.") + + except commands.ExtensionFailed as error: + logger.error(f'Error: {_filename} failed to load properly.\n\t{error}\n\n{traceback.format_exc()}') + + except commands.ExtensionError: + logger.error(f'Error: unknown error with {_filename}') + + elif what_do == 'unload': + bot.unload_extension(f'cogs.{_filename}') + logger.info(f'Cog {_filename} unloaded') + elif what_do == 'reload': + bot.reload_extension(f'cogs.{_filename}') + logger.info(f'Cog {_filename} reloaded') + elif what_do == 'disable': + bot.unload_extension(f'cogs.{_filename}') + os.rename(f'cogs/{_filename}.py', f'cogs/disabled/{_filename}.py') + logger.info(f'Cog {_filename} stopped and disabled') + elif what_do == 'enable': + os.rename(f'cogs/disabled/{_filename}.py', f'cogs/{_filename}.py') + bot.load_extension(f'cogs.{_filename}') + logger.info(f'Cog {_filename} started and enabled') diff --git a/lib/Comands.py b/lib/Comands.py index fc6b020..026aae2 100644 --- a/lib/Comands.py +++ b/lib/Comands.py @@ -1,113 +1,113 @@ -""" -lib.Commands -~~~~~~~~~~~~~~ -Some prepare for commands -""" -import json -import os -import dotenv - - -def preload_checks(): - dotenv.load_dotenv() - if not os.path.isfile('.env') or not os.getenv('CONF_FILE'): - with open('.env', 'a', encoding='utf-8') as f: - f.write("CONF_FILE='config.json'\n") - dotenv.load_dotenv() - if not os.path.isfile(os.getenv('CONF_FILE')): - with open(os.getenv('CONF_FILE'), 'a', encoding='utf-8') as f: - f.write("") - - -async def list_files(fold: str = 'audio'): - if fold != 'audio': fold = f'audio/{fold}' - fl = [] - for filenames in os.walk(fold): - fl.extend(filenames) - break - files = {} - for x in fl[2]: - files[x] = x - return fl[2] - - -async def read_json(guild: int, _param: str): - """ - Reads Json file to determite config strings - :param guild: ID of Guild - :param _param: Parameter in json file - :return: value of parameter. - """ - parameter = None - with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON - try: - _json = json.load(f) # Load the custom prefixes - except json.decoder.JSONDecodeError: - _json = {} - if guild: # If the guild exists - try: - guild_conf = _json[f"{guild}"] - try: - parameter = guild_conf[f"{_param}"] - except KeyError: - pass - except KeyError: - pass - return parameter - - -async def write_json(guild: int, param_name: str, param: str or int): - with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: - try: - _json = json.load(f) - except json.decoder.JSONDecodeError: - _json = {} - try: - _guild = _json[f'{guild}'] - except KeyError: - _json.update({f'{guild}': {}}) - _guild = _json[f'{guild}'] - _guild.update({f'{param_name}': f'{param}'}) - with open(os.getenv('CONF_FILE'), 'w', encoding='utf-8') as f: - json.dump(_json, f, indent=4) - - -def determine_prefix(bot, msg): - """ - Determite per-server bot prefix - :param bot: Disnake Bot object - :param msg: Disnake msg object - :return: prefix for server, default is $ - """ - parameter = '$' - with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON - try: - _json = json.load(f) # Load the custom prefixes - except json.JSONDecodeError: - _json = {} - try: - parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up - - except KeyError: - pass - return parameter - - -def determine_time(msg): - """ - Determite per-server bot prefix - :param msg: Disnake msg object - :return: prefix for server, default is $ - """ - parameter = 15 - with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON - try: - _json = json.load(f) # Load the custom prefixes - except json.JSONDecodeError: - _json = {} - try: - parameter = _json[f"{msg.guild.id}"]["seconds"] # Read prefix from json if is setted up - - except KeyError: - pass - return parameter +""" +lib.Commands +~~~~~~~~~~~~~~ +Some prepare for commands +""" +import json +import os +import dotenv + + +def preload_checks(): + dotenv.load_dotenv() + if not os.path.isfile('.env') or not os.getenv('CONF_FILE'): + with open('.env', 'a', encoding='utf-8') as f: + f.write("CONF_FILE='config.json'\n") + dotenv.load_dotenv() + if not os.path.isfile(os.getenv('CONF_FILE')): + with open(os.getenv('CONF_FILE'), 'a', encoding='utf-8') as f: + f.write("") + + +async def list_files(fold: str = 'audio'): + if fold != 'audio': fold = f'audio/{fold}' + fl = [] + for filenames in os.walk(fold): + fl.extend(filenames) + break + files = {} + for x in fl[2]: + files[x] = x + return fl[2] + + +async def read_json(guild: int, _param: str): + """ + Reads Json file to determite config strings + :param guild: ID of Guild + :param _param: Parameter in json file + :return: value of parameter. + """ + parameter = None + with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON + try: + _json = json.load(f) # Load the custom prefixes + except json.decoder.JSONDecodeError: + _json = {} + if guild: # If the guild exists + try: + guild_conf = _json[f"{guild}"] + try: + parameter = guild_conf[f"{_param}"] + except KeyError: + pass + except KeyError: + pass + return parameter + + +async def write_json(guild: int, param_name: str, param: str or int): + with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: + try: + _json = json.load(f) + except json.decoder.JSONDecodeError: + _json = {} + try: + _guild = _json[f'{guild}'] + except KeyError: + _json.update({f'{guild}': {}}) + _guild = _json[f'{guild}'] + _guild.update({f'{param_name}': f'{param}'}) + with open(os.getenv('CONF_FILE'), 'w', encoding='utf-8') as f: + json.dump(_json, f, indent=4) + + +def determine_prefix(bot, msg): + """ + Determite per-server bot prefix + :param bot: Disnake Bot object + :param msg: Disnake msg object + :return: prefix for server, default is $ + """ + parameter = '$' + with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON + try: + _json = json.load(f) # Load the custom prefixes + except json.JSONDecodeError: + _json = {} + try: + parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up + + except KeyError: + pass + return parameter + + +def determine_time(msg): + """ + Determite per-server bot prefix + :param msg: Disnake msg object + :return: prefix for server, default is $ + """ + parameter = 15 + with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON + try: + _json = json.load(f) # Load the custom prefixes + except json.JSONDecodeError: + _json = {} + try: + parameter = _json[f"{msg.guild.id}"]["seconds"] # Read prefix from json if is setted up + + except KeyError: + pass + return parameter diff --git a/lib/DB_Worker.py b/lib/DB_Worker.py index 798514e..d46e849 100644 --- a/lib/DB_Worker.py +++ b/lib/DB_Worker.py @@ -1,165 +1,165 @@ -import sqlite3 - - -from lib import logger - - -class DB_Reader: - - def __init__(self, guildid: int = None): - self._guildid = guildid - self.list = self._read_db(self._guildid) - self._current_index = 0 - - def __str__(self) -> str: - return str(self._guildid) - - @classmethod - def _read_db(cls, guildid: int) -> list: - try: - sql_con = sqlite3.connect("user.db") - cursor = sql_con.cursor() - cursor.execute(f"""SELECT * FROM "{guildid}" """) - record = cursor.fetchall() - return record - except sqlite3.Error as _e: - logger.info(f'Error reading DB\n{_e}') - - def __iter__(self): - return _ListGenerationIter(self) - - -class _DBAttrs: - def __init__(self, - userid: int, - username: str, - nick: str, - isbot: bool, - defaulttracks: None or list, - usertracks: None or list): - self.userid = userid - self.username = username - self.nick = nick - self.isbot = isbot - self.defaulttracks = defaulttracks - self.usertracks = usertracks - - def __str__(self): - return self.username - - def __repr__(self): - return f'' - - -class _ListGenerationIter: - def __init__(self, user_class): - self._current_index = 0 - self._list = user_class.list - - self._size = len(self._list) - - def __iter__(self): - return self - - def __next__(self): - if self._current_index < self._size: - _userid = self._list[self._current_index][0] - _username = self._list[self._current_index][1] - _nick = self._list[self._current_index][2] - _isbot = bool(self._list[self._current_index][3]) - _defaulttracks = self._list[self._current_index][4] - _usertracks = self._list[self._current_index][5] - - self._current_index += 1 - memb = _DBAttrs(_userid, - _username, - _nick, - _isbot, - _defaulttracks, - _usertracks) - return memb - raise StopIteration - - -async def prepare_db(guild: int): - try: - connect = sqlite3.connect('user.db') - cursor = connect.cursor() - create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT, - [nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT) ''') - cursor.execute(create_table) - cursor.close() - except sqlite3.Error as _error: - pass - - -async def work_with_db(db_func: str, data_turple: tuple): - """ - Writing to db per server userinfo - :param db_func: - :param data_turple: - """ - try: - connect = sqlite3.connect('user.db') - cursor = connect.cursor() - cursor.execute(db_func, data_turple) - connect.commit() - cursor.close() - except sqlite3.Error as _error: - pass - - -async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int): - sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}" - (username, userid, nick, isbot) - VALUES (?, ?, ?, ?)""") - data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot) - await work_with_db(sqlite_insert_with_param, data_tuple) - - -async def add_audio(guild: int, user: int, audio: str, track: str = 'usertracks'): - """ - Adding audio into а folder and DB - :param guild: Guild id - :param user: - :param audio: - :param track: usertracks or defaulttracks. - """ - # audio = f'{DB.read_db(guild, user, track)}, {audio}' - sql_update_query = f"""UPDATE "{guild}" set {track} = ? where userid = ?""" - data_tuple = (audio, user) - await work_with_db(sql_update_query, data_tuple) - - -async def read_db(guild: int, user: int, column: str): - _col_dict = {'userid': 0, - 'username': 1, - 'nick': 2, - 'isbot': 3, - 'defaulttracks': 4, - 'usertracks': 5} - try: - sql_con = sqlite3.connect("user.db") - cursor = sql_con.cursor() - sql_read = f"""SELECT * FROM "{guild}" where userid = {user}""" - cursor.execute(sql_read) - record = cursor.fetchone() - return record[_col_dict[column]] - except sqlite3.Error as _error: - pass - - -async def check_exist_audio(ctx, guild: int, user: int, column: str, audio: str): - _list_str = await read_db(guild, user, column) - print(type(_list_str)) - if _list_str is not None: - _list = _list_str.split(',') - if audio in _list: - await ctx.reply("File in list") - - else: - pass - - else: - _list = 'None' +import sqlite3 + + +from lib import logger + + +class DBReader: + + def __init__(self, guildid: int = None): + self._guildid = guildid + self.list = self._read_db(self._guildid) + self._current_index = 0 + + def __str__(self) -> str: + return str(self._guildid) + + @classmethod + def _read_db(cls, guildid: int) -> list: + try: + sql_con = sqlite3.connect("user.db") + cursor = sql_con.cursor() + cursor.execute(f"""SELECT * FROM "{guildid}" """) + record = cursor.fetchall() + return record + except sqlite3.Error as _e: + logger.info(f'Error reading DB\n{_e}') + + def __iter__(self): + return _ListGenerationIter(self) + + +class _DBAttrs: + def __init__(self, + userid: int, + username: str, + nick: str, + isbot: bool, + defaulttracks: None or list, + usertracks: None or list): + self.userid = userid + self.username = username + self.nick = nick + self.isbot = isbot + self.defaulttracks = defaulttracks + self.usertracks = usertracks + + def __str__(self): + return self.username + + def __repr__(self): + return f'' + + +class _ListGenerationIter: + def __init__(self, user_class): + self._current_index = 0 + self._list = user_class.list + + self._size = len(self._list) + + def __iter__(self): + return self + + def __next__(self): + if self._current_index < self._size: + _userid = self._list[self._current_index][0] + _username = self._list[self._current_index][1] + _nick = self._list[self._current_index][2] + _isbot = bool(self._list[self._current_index][3]) + _defaulttracks = self._list[self._current_index][4] + _usertracks = self._list[self._current_index][5] + + self._current_index += 1 + memb = _DBAttrs(_userid, + _username, + _nick, + _isbot, + _defaulttracks, + _usertracks) + return memb + raise StopIteration + + +async def prepare_db(guild: int): + try: + connect = sqlite3.connect('user.db') + cursor = connect.cursor() + create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT, + [nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT) ''') + cursor.execute(create_table) + cursor.close() + except sqlite3.Error as _error: + pass + + +async def work_with_db(db_func: str, data_turple: tuple): + """ + Writing to db per server userinfo + :param db_func: + :param data_turple: + """ + try: + connect = sqlite3.connect('user.db') + cursor = connect.cursor() + cursor.execute(db_func, data_turple) + connect.commit() + cursor.close() + except sqlite3.Error as _error: + pass + + +async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int): + sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}" + (username, userid, nick, isbot) + VALUES (?, ?, ?, ?)""") + data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot) + await work_with_db(sqlite_insert_with_param, data_tuple) + + +async def add_audio(guild: int, user: int, audio: str, track: str = 'usertracks'): + """ + Adding audio into а folder and DB + :param guild: Guild id + :param user: + :param audio: + :param track: usertracks or defaulttracks. + """ + # audio = f'{DB.read_db(guild, user, track)}, {audio}' + sql_update_query = f"""UPDATE "{guild}" set {track} = ? where userid = ?""" + data_tuple = (audio, user) + await work_with_db(sql_update_query, data_tuple) + + +async def read_db(guild: int, user: int, column: str): + _col_dict = {'userid': 0, + 'username': 1, + 'nick': 2, + 'isbot': 3, + 'defaulttracks': 4, + 'usertracks': 5} + try: + sql_con = sqlite3.connect("user.db") + cursor = sql_con.cursor() + sql_read = f"""SELECT * FROM "{guild}" where userid = {user}""" + cursor.execute(sql_read) + record = cursor.fetchone() + return record[_col_dict[column]] + except sqlite3.Error as _error: + pass + + +async def check_exist_audio(ctx, guild: int, user: int, column: str, audio: str): + _list_str = await read_db(guild, user, column) + print(type(_list_str)) + if _list_str is not None: + _list = _list_str.split(',') + if audio in _list: + await ctx.reply("File in list") + + else: + pass + + else: + _list = 'None' diff --git a/lib/ListGenerator.py b/lib/ListGenerator.py index a91123f..0182530 100644 --- a/lib/ListGenerator.py +++ b/lib/ListGenerator.py @@ -1,87 +1,86 @@ -import mimetypes -import os -from typing import Tuple, Optional - - -class ListGenerator: - def __init__(self, path: str = None): - self.path = path - self.list: list = self._lister(self.path) - try: - self._size = len(self.list) - except TypeError: - pass - self._current_index = 0 - - def __str__(self) -> str: - return 'Audio iter generator' - - @classmethod - def _lister(cls, path) -> list: - _list: list = [] - try: - for f in os.walk(path): - _list.extend(f) - break - return sorted(_list[2]) - except TypeError: - pass - - def __iter__(self): - return _ListGenerationIter(self) - - def __next__(self): - if self._current_index < self._size: - self._current_index += 1 - - -class _FileAttrs: - def __init__(self, name, path, type, exc): - self.name = name - self.path = path - self.mimetype = type - self.exc = exc - - def __str__(self): - return self.name - - def __repr__(self): - return f' >' - - -class _ListGenerationIter: - - def __init__(self, list_class): - self._current_index = 0 - self._list = list_class.list - self._name = self._list[self._current_index] - self._path = list_class.path - - self._size = len(self._list) - - def __iter__(self): - return self - - @property - def type(self) -> tuple[Optional[str], Optional[str]]: - guess = mimetypes.guess_type(f'{self._path}/{self._list[self._current_index]}')[0] - return guess - - @property - def _exc(self) -> str: - try: - _ret = self._list[self._current_index].split('.')[-1] - except AttributeError: - _ret = None - return _ret - - def __next__(self): - if self._current_index < self._size: - _name = self._list[self._current_index] - _path = self._path - _type = self.type - _exc = self._exc - self._current_index += 1 - memb = _FileAttrs(_name, _path, _type, _exc) - return memb - raise StopIteration +import mimetypes +import os + + +class ListGenerator: + def __init__(self, path: str = None): + self.path = path + self.list: list = self._lister(self.path) + try: + self._size = len(self.list) + except TypeError: + pass + self._current_index = 0 + + def __str__(self) -> str: + return 'Audio iter generator' + + @classmethod + def _lister(cls, path) -> list: + _list: list = [] + try: + for f in os.walk(path): + _list.extend(f) + break + return sorted(_list[2]) + except TypeError: + pass + + def __iter__(self): + return _ListGenerationIter(self) + + def __next__(self): + if self._current_index < self._size: + self._current_index += 1 + + +class _FileAttrs: + def __init__(self, name, path, mimetype, exc): + self.name = name + self.path = path + self.mimetype = mimetype + self.exc = exc + + def __str__(self): + return self.name + + def __repr__(self): + return f' >' + + +class _ListGenerationIter: + + def __init__(self, list_class): + self._current_index = 0 + self._list = list_class.list + self._name = self._list[self._current_index] + self._path = list_class.path + + self._size = len(self._list) + + def __iter__(self): + return self + + @property + def type(self) -> str: + guess = mimetypes.guess_type(f'{self._path}/{self._list[self._current_index]}')[0] + return guess + + @property + def _exc(self) -> str: + try: + _ret = self._list[self._current_index].split('.')[-1] + except AttributeError: + _ret = None + return _ret + + def __next__(self): + if self._current_index < self._size: + _name = self._list[self._current_index] + _path = self._path + _type = self.type + _exc = self._exc + self._current_index += 1 + _memb = _FileAttrs(_name, _path, _type, _exc) + return _memb + raise StopIteration diff --git a/lib/Logger.py b/lib/Logger.py index a117046..9f7f127 100644 --- a/lib/Logger.py +++ b/lib/Logger.py @@ -1,52 +1,52 @@ -import logging - - -class _CustomFormatter(logging.Formatter): - grey = "\x1b[38;20m" - yellow = "\x1b[33;20m" - red = "\x1b[31;20m" - bold_red = "\x1b[31;1m" - reset = "\x1b[0m" - format = f"%(asctime)s - [%(levelname)s] -%(module)s- %(message)s" - - FORMATS = { - logging.DEBUG: grey + format + reset, - logging.INFO: grey + format + reset, - logging.WARNING: yellow + format + reset, - logging.ERROR: red + format + reset, - logging.CRITICAL: bold_red + format + reset - } - - old_factory = logging.getLogRecordFactory() - - def _record_factory(*args, **kwargs): - record = _CustomFormatter.old_factory(*args, **kwargs) - _module = record.module - _levelname = record.levelname - if len(_module) % 2 == 0 and len(_module) < 12: - _module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2) - elif len(_module) % 2 == 1 and len(_module) < 12: - _module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1) - if len(_levelname) % 2 == 0 and len(_levelname) < 8: - _levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2) - elif len(record.levelname) % 2 == 1 and len(record.module) < 8: - _levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1) - record.module = _module - record.levelname = _levelname - return record - - logging.setLogRecordFactory(_record_factory) - - def format(self, record): - log_fmt = self.FORMATS.get(record.levelno) - formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S") - return formatter.format(record) - - -logger = logging.getLogger('Pisya_Bot') -logger.setLevel(logging.DEBUG) - -ch = logging.StreamHandler() -ch.setLevel(logging.DEBUG) -ch.setFormatter(_CustomFormatter()) -logger.addHandler(ch) +import logging + + +class _CustomFormatter(logging.Formatter): + grey = "\x1b[38;20m" + yellow = "\x1b[33;20m" + red = "\x1b[31;20m" + bold_red = "\x1b[31;1m" + reset = "\x1b[0m" + format = f"%(asctime)s - [%(levelname)s] -%(module)s- %(message)s" + + FORMATS = { + logging.DEBUG: grey + format + reset, + logging.INFO: grey + format + reset, + logging.WARNING: yellow + format + reset, + logging.ERROR: red + format + reset, + logging.CRITICAL: bold_red + format + reset + } + + old_factory = logging.getLogRecordFactory() + + def _record_factory(*args, **kwargs): + record = _CustomFormatter.old_factory(*args, **kwargs) + _module = record.module + _levelname = record.levelname + if len(_module) % 2 == 0 and len(_module) < 12: + _module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2) + elif len(_module) % 2 == 1 and len(_module) < 12: + _module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1) + if len(_levelname) % 2 == 0 and len(_levelname) < 8: + _levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2) + elif len(record.levelname) % 2 == 1 and len(record.module) < 8: + _levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1) + record.module = _module + record.levelname = _levelname + return record + + logging.setLogRecordFactory(_record_factory) + + def format(self, record): + log_fmt = self.FORMATS.get(record.levelno) + formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S") + return formatter.format(record) + + +logger = logging.getLogger('Pisya_Bot') +logger.setLevel(logging.DEBUG) + +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +ch.setFormatter(_CustomFormatter()) +logger.addHandler(ch) diff --git a/lib/Player.py b/lib/Player.py index be4538b..b962874 100644 --- a/lib/Player.py +++ b/lib/Player.py @@ -1,17 +1,17 @@ -from .Logger import logger -from asyncio import sleep -from disnake import FFmpegPCMAudio - - -async def play_audio(audio, bot, vc): - if not bot.voice_clients: - logger.info(audio) - logger.error(f'Playing: {audio}') - await sleep(1) - vp = await vc.connect() - if not vp.is_playing(): - vp.play(FFmpegPCMAudio(f'{audio}'), after=None) - while vp.is_playing(): - await sleep(0.5) - await sleep(1) - await vp.disconnect() +from .Logger import logger +from asyncio import sleep +from disnake import FFmpegOpusAudio, opus + + +async def play_audio(audio, bot, vc): + if not bot.voice_clients: + # logger.info(audio) + logger.error(f'Playing: {audio}') + await sleep(1) + vp = await vc.connect() + if not vp.is_playing(): + vp.play(FFmpegOpusAudio(f'{audio}', executable='ffmpeg'), after=lambda e: print('done', e)) + while vp.is_playing(): + await sleep(0.5) + await sleep(1) + await vp.disconnect() diff --git a/lib/__init__.py b/lib/__init__.py index d1bbc5f..cdc88f4 100644 --- a/lib/__init__.py +++ b/lib/__init__.py @@ -1,11 +1,11 @@ -""" -lib -~~~~~~~~~~~~~ -Some libs for the bot whitch help him -""" -from .ListGenerator import * -from .Logger import * -from .Comands import * -from .Player import * -from .DB_Worker import * -from .CogsPrep import * +""" +lib +~~~~~~~~~~~~~ +Some libs for the bot whitch help him +""" +from .ListGenerator import * +from .Logger import * +from .Comands import * +from .Player import * +from .DB_Worker import * +from .CogsPrep import * diff --git a/main.py b/main.py new file mode 100644 index 0000000..a43c2d8 --- /dev/null +++ b/main.py @@ -0,0 +1,80 @@ +import random +import sys +import threading +import logging +import discord +from asyncio import sleep +from os import walk + +from discord import user, member +from discord import FFmpegPCMAudio +from discord.ext import commands + +threading.current_thread().name = "main" +logging.basicConfig(stream=sys.stdout, filemode='w', level='INFO', + format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s') + + +intents = discord.Intents.default() +intents.members = True +bot = commands.Bot(command_prefix='$', guild_subscriptions=True, intents=intents) +f = [] +for filenames in walk('audio'): + f.extend(filenames) + break +f = f[2] + + +@bot.event +async def on_voice_state_update(member, before, after): + # channel = bot.get_channel(783729824896122930) + _role = 929729495370461205 + _memb = 375664768087752714 + _bot_id = 946819004314570852 + role = discord.utils.find(lambda r: r.name == 'тарковчане', member.roles) + if before.channel is None and role in member.roles: + track = random.randint(0, len(f) - 1) + audio_source = FFmpegPCMAudio(f'audio/{f[track]}') + logging.error(f'{track}\t\t{f[track]}') + if not bot.voice_clients: + await sleep(1) + _channel = after.channel + vc = await after.channel.connect() + if not vc.is_playing(): + vc.play(audio_source, after=None) + while vc.is_playing(): + await sleep(0.5) + await sleep(1) + await vc.disconnect() + if before.channel is None and member.id == _memb: + track = random.randint(0, len(f) - 1) + audio_source = FFmpegPCMAudio(f'audio/{_memb}/bear2_enemy_scav3.wav') + logging.error(f'{track}\t\t\t{f[track]}') + if not bot.voice_clients: + await sleep(1) + _channel = after.channel + vc = await after.channel.connect() + if not vc.is_playing(): + vc.play(audio_source, after=None) + while vc.is_playing(): + await sleep(0.5) + await sleep(1) + await vc.disconnect() + + +@bot.event +async def on_member_join(member): + if member.bot == 0: + role = discord.utils.get(member.guild.roles, id=734358428939452486) + else: + role = discord.utils.get(member.guild.roles, id=734358434945826858) + logging.info(f"Adding to {member} role {role}") + await member.add_roles(role) + + +@bot.event +async def on_ready(): + logging.info(f'Bot started') + + +bot.run('OTQ2ODE5MDA0MzE0NTcwODUy.YhkP6Q.dhFqi2MJMrxzHt5FtjK5Cl-5BI8') diff --git a/requirements.txt b/requirements.txt index 17f89c0..e291100 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ -disnake[audio]~=2.5.2 -psutil==5.9.2 -pymediainfo~=5.1.0 -pyNaCl~=1.5.0 -python-dotenv==0.21.0 +disnake[audio]==2.8.0 +psutil==5.9.4 +pymediainfo==6.0.1 +pyNaCl~=1.5.0 +python-dotenv==1.0.0 ffmpeg-python~=0.2.0 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..a1a24d9 --- /dev/null +++ b/setup.py @@ -0,0 +1,15 @@ +from setuptools import setup +import __init__ + +setup( + name='fun and admin bot', + version=__init__.__version__, + packages=['lib'], + url='', + platforms='POSIX', + license='', + author='riksl', + author_email='', + description='', + requires=[] +) diff --git a/test.py b/test.py new file mode 100644 index 0000000..b2641b4 --- /dev/null +++ b/test.py @@ -0,0 +1,5 @@ +from lib import ListGenerator + +for i in ListGenerator('audio'): + print(i.__repr__()) + print(f"{i.path}/{i.name}") \ No newline at end of file