some code changes

This commit is contained in:
2022-08-31 18:28:07 +03:00
parent 4a33b35283
commit 5b134e6446
19 changed files with 288 additions and 145 deletions

View File

@@ -1,22 +0,0 @@
import platform
import sys
from __init__ import version_info
import pkg_resources
def show_version():
entries = []
entries.append(
"- Python v{0.major}.{0.minor}.{0.micro}-{0.releaselevel}".format(sys.version_info)
)
entries.append("- Pisya_bot v{0.major}.{0.minor}.{0.micro}-{0.releaselevel}".format(version_info))
if pkg := pkg_resources.get_distribution("disnake"):
entries.append(f" - disnake pkg_resources: v{pkg.version}")
uname = platform.uname()
entries.append("- system info: {0.system} {0.release} {0.version}".format(uname))
print("\n".join(entries))
show_version()

Binary file not shown.

Binary file not shown.

Binary file not shown.

42
bot.py Normal file → Executable file
View File

@@ -1,18 +1,17 @@
#!/usr/bin/env python3
import asyncio
import logging
import os
import sys
import disnake
from disnake import OptionChoice, OptionType, Option
from disnake.ext import commands
from dotenv import load_dotenv
from __init__ import version_info
from lib.CogsPrepare import work_with_cogs
from lib.Comands import determine_prefix, check_conf
from lib import work_with_cogs
from lib import preload_checks, determine_prefix
from lib import logger
preload_checks()
load_dotenv()
check_conf()
intents = disnake.Intents(messages=True,
guilds=True,
@@ -27,21 +26,19 @@ bot = commands.Bot(command_prefix=determine_prefix,
reload=True
)
logging.basicConfig(stream=sys.stdout, filemode='w', level='INFO',
format='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
asyncio.run(work_with_cogs('load', bot))
@bot.event
async def on_ready():
slash_commands = [r.name for r in bot.slash_commands]
commands = [r.name for r in bot.commands]
logging.info(f'Bot started')
logging.info('We have logged in as {0.user}'.format(bot))
logging.info('Version of bot is - v{0.major}.{0.minor}.{0.micro}-{0.releaselevel}'.format(version_info))
logging.info(f'list of all slash commmmands: \n{slash_commands}')
logging.info(f'list of all commmmands: \n{commands}')
slash_commands = '\n\t* '.join(f'{r.name}: {r.description}' for r in bot.slash_commands)
_commands = '\n\t* '.join(f'{r.name}: {r.description}' for r in bot.commands)
logger.info(f'Bot started')
logger.info('We have logged in as {0.user}'.format(bot))
logger.info('Version of bot is - v{0.major}.{0.minor}.{0.micro}-{0.releaselevel}'.format(version_info))
logger.info(f'list of all slash commands: \n\t* {slash_commands}')
logger.info(f'list of all commands: \n\t* {_commands}')
@bot.slash_command(
@@ -61,13 +58,16 @@ async def on_ready():
)
]
)
@commands.is_owner()
async def slash_cogs(inter, what_do):
if inter.author.id == bot.owner_id:
await work_with_cogs(what_do, bot)
await inter.response.send_message(f'Cogs are {what_do}ed', ephemeral=True)
await work_with_cogs(what_do, bot)
await inter.response.send_message(f'Cogs are {what_do}ed', ephemeral=True)
else:
await inter.response.send_message('You`re not bot owner', ephemeral=True)
@slash_cogs.error
async def cogs_error(inter, what_do):
await inter.response.send_message(f'{what_do}', ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func')
bot.run(os.getenv('TOKEN'))

View File

@@ -1,4 +1,3 @@
import logging
from asyncio import sleep
import disnake
@@ -6,8 +5,9 @@ from disnake import Option, OptionType
from disnake.ext import commands, tasks
from lib.Comands import read_json, write_json
from lib.DB import fill_bd, prepare_db, work_with_db
from lib import read_json, write_json
from lib import fill_bd, prepare_db, work_with_db
from lib import logger
class Admin(commands.Cog, name='Admin'):
@@ -22,7 +22,7 @@ class Admin(commands.Cog, name='Admin'):
await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
self.activity.start()
logging.info(f'Cog {__name__.split(".")[1]} is ready!.')
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@tasks.loop(seconds=20)
async def activity(self):
@@ -63,7 +63,7 @@ class Admin(commands.Cog, name='Admin'):
role = disnake.utils.get(member.guild.roles, id=guest_role)
else:
role = disnake.utils.get(member.guild.roles, id=bot_role)
logging.info(f"Adding to {member} role {role}")
logger.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@commands.slash_command(
@@ -137,7 +137,9 @@ class Admin(commands.Cog, name='Admin'):
Option("seconds", "specify max duration", OptionType.integer, required=True),
]
)
async def set_time(self, inter, seconds: commands.Range[5, 30]):
async def set_time(self,
inter: disnake.ApplicationCommandInteraction,
seconds: commands.Range[5, 30]):
await write_json(inter.guild.id, "seconds", seconds)
await inter.response.send_message(f"Change max audio duration to {seconds} sec", ephemeral=True)

View File

@@ -1,32 +1,30 @@
import logging
import random
from os import path, makedirs, rename, remove
from typing import Optional
import disnake
from disnake import Option, OptionType
from disnake.ext import commands
from lib.Comands import determine_time
from lib.DB import read_db, check_exist_audio, add_audio
from lib.Player import play_audio, get_audio_list
from lib import logger
from lib import determine_time
from lib import read_db, check_exist_audio, add_audio
from lib import play_audio
# todo: write chose audio from list by slash command
class Audio(commands.Cog):
class Audio(commands.Cog, name='Audio'):
def __init__(self, bot):
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
logging.info(f'Cog {__name__.split(".")[1]} is ready!.')
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
# todo: complete check activity
@commands.Cog.listener()
async def on_voice_state_update(self, member, before, after):
print(type(member.roles))
if before.channel is None and not member.bot:
if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members):
logging.info('Skip playing by Game')
logger.info('Skip playing by Game')
else:
# Prepare list of audio
from lib.Comands import read_json
@@ -46,35 +44,30 @@ class Audio(commands.Cog):
if def_audio_db or audio_db is not None:
if def_audio_db is None: def_audio_db = []
if audio_db is None: audio_db = []
logging.info(f'Play audio from DB')
logger.info(f'Play audio from DB')
full_audio = def_audio_db + audio_db
await play_audio(random.choice(full_audio), self.bot, after.channel)
elif len(member.roles) == 1 or _role is None:
logging.info(f'Skip playing by role')
logger.info(f'Skip playing by role')
elif any(str(role.id) in _role for role in member.roles):
logging.info(f'Play audio from list by role')
logger.info(f'Play audio from list by role')
await play_audio(random.choice(def_audio_ls), self.bot, after.channel)
else:
logging.info(f'Skip playing by any else')
logger.info(f'Skip playing by any else')
@commands.slash_command(name="play_audio",
description="Set channel whitch itterate with bot",
options=[
Option("audio", "select audio", OptionType.string, required=True),
]
)
async def play_audio(self,
integration: disnake.ApplicationCommandInteraction,
audio: Optional[str]
) -> None:
await get_audio_list(integration, audio)
@commands.command(name="upload_audio")
@commands.command(name="upload_audio",
description=f"Add audio to bot")
async def upload_audio(self, ctx, user=None):
user = user or ctx.author
if ctx.author.guild_permissions.administrator or user is ctx.author:
if ctx.message.attachments:
from os import error
if not path.isdir(f'tmp/{user.id}'):
try:
makedirs(f'tmp/{user.id}')
except error as _error:
pass
if not path.isdir(f'audio/{user.id}'):
try:
makedirs(f'audio/{user.id}')
@@ -83,16 +76,17 @@ class Audio(commands.Cog):
for at in ctx.message.attachments:
import mimetypes
await at.save(f'tmp/{user.id}")/{at.filename}')
guess = mimetypes.guess_type(f'tmp/{user.id}")/{at.filename}')
await at.save(f'tmp/{user.id}/{at.filename}')
guess = mimetypes.guess_type(f'tmp/{user.id}/{at.filename}')
if guess[0].split('/')[0] == 'audio':
from pymediainfo import MediaInfo
file = f'tmp/{user.id}")/{at.filename}'
file = f'tmp/{user.id}/{at.filename}'
duration = round(MediaInfo.parse(file).tracks[0].duration / 1000)
max_duration = determine_time(ctx)
max_duration = int(determine_time(ctx))
print(type(max_duration))
if duration > max_duration:
await ctx.reply(f'Audio duration is {duration}, but max is {max_duration}')
remove(f'tmp/{user.id}")/{at.filename}')
remove(f'tmp/{user.id}/{at.filename}')
else:
a = await read_db(ctx.guild.id, user.id, 'usertracks')
if a:
@@ -102,10 +96,10 @@ class Audio(commands.Cog):
await check_exist_audio(ctx, ctx.guild.id, user.id, 'usertracks', at.filename)
await add_audio(ctx.guild.id, user.id, audiolist)
rename(f'tmp/{user.id}")/{at.filename}', f'audio/{user.id}/{at.filename}')
rename(f'tmp/{user.id}/{at.filename}', f'audio/{user.id}/{at.filename}')
elif guess[0].split('/')[0] != 'audio':
await ctx.reply(f'It not audio {at.filename}\n it`s {guess[0]}')
remove(f'tmp/{user.id}")/{at.filename}')
remove(f'tmp/{user.id}/{at.filename}')
else:
await ctx.reply("Has no Attachment")
else:

View File

@@ -1,6 +1,5 @@
import logging
from disnake.ext import commands
from lib import logger
class Fun(commands.Cog, name='Fun'):
@@ -9,7 +8,7 @@ class Fun(commands.Cog, name='Fun'):
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logging.info(f'Cog {__name__.split(".")[1]} is ready!.')
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
def setup(bot): # an extension must have a setup function

View File

@@ -1,11 +1,10 @@
import logging
import disnake
from disnake import Option, OptionType, Colour
from disnake.ext import commands
from lib.Comands import write_json
from lib.DB import work_with_db, read_db
from lib import read_db
from lib import logger
class General(commands.Cog):
@@ -14,7 +13,7 @@ class General(commands.Cog):
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logging.info(f'Cog {__name__.split(".")[1]} is ready!.')
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name="info",

View File

@@ -1,11 +1,11 @@
import logging
import os
import disnake
import psutil
from disnake.ext import commands
from lib.Comands import determine_prefix, determine_time
from lib import determine_prefix, determine_time
from lib import logger
class Bot_info(commands.Cog, name='Bot Info'):
@@ -14,9 +14,10 @@ class Bot_info(commands.Cog, name='Bot Info'):
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logging.info(f'Cog {__name__.split(".")[1]} is ready!.')
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="info_bot") # this is for making a command
@commands.slash_command(name="info_bot",
description='Shows general info about bot') # this is for making a command
async def info_bot(self, inter):
_pid = os.getpid()
_process = psutil.Process(_pid)

56
cogs/test.py Normal file
View File

@@ -0,0 +1,56 @@
from typing import List
import disnake
from disnake import Option, OptionType, OptionChoice
from disnake.ext import commands
from lib import list_files
from lib import logger
from lib import play_audio
class Testing(commands.Cog, name='Testing'):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="play_audio",
options=[
Option(name="audio",
type=OptionType.string,
required=True
# choices=OptionChoice(list_to_play)
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}')
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice')
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str) -> List[OptionChoice]:
def_list = await list_files()
try:
user_list = await list_files(str(inter.author.id))
# user_dict = []
# for _track in user_list:
except IndexError:
user_list = []
_list = def_list + user_list
return [
OptionChoice(name=choice, value=choice)
for choice in def_list if current.lower() in choice.lower()
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog

View File

@@ -6,11 +6,10 @@ cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
import logging
import traceback
from os import listdir
from disnake.ext import commands
from .Logger import logger
def cog_list():
@@ -26,16 +25,16 @@ async def work_with_cogs(what_do, bot):
if what_do == "load":
try:
bot.load_extension(f'cogs.{_filename}')
logging.info(f'Loaded cog {_filename}')
logger.info(f'Loaded cog {_filename}')
except commands.ExtensionNotFound:
logging.error(f"Error: {_filename} couldn't be found to load.")
logger.error(f"Error: {_filename} couldn't be find to load.")
except commands.ExtensionFailed as error:
logging.error(f'Error: {_filename} failed to load properly.', error)
logger.error(f'Error: {_filename} failed to load properly.\n\t{error}\n\n{traceback.format_exc()}')
except commands.ExtensionError:
logging.error(f'Error: unknown error with {_filename}')
logger.error(f'Error: unknown error with {_filename}')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')

View File

@@ -1,23 +1,26 @@
import asyncio
"""
lib.Commands
~~~~~~~~~~~~~~
Some prepare for commands
"""
import json
import os
from typing import Union
import disnake
"""
Some prepare for commands
"""
import dotenv
def check_conf():
if not os.path.isfile(os.getenv('conf_file')):
with open(os.getenv('conf_file'), 'w+', encoding='utf-8') as f:
def preload_checks():
dotenv.load_dotenv()
if not os.path.isfile('.env') or not os.getenv('CONF_FILE'):
with open('.env', 'a', encoding='utf-8') as f:
f.write("CONF_FILE='config.json'\n")
dotenv.load_dotenv()
if not os.path.isfile(os.getenv('CONF_FILE')):
with open(os.getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
f.write("")
async def list_files(fold: str = 'audio'):
if fold != 'audio': fold = f'audio/{fold}'
fl = []
for filenames in os.walk(fold):
fl.extend(filenames)
@@ -36,7 +39,7 @@ async def read_json(guild: int, _param: str):
:return: value of parameter.
"""
parameter = None
with open(os.getenv('conf_file'), 'r', encoding='utf-8') as f: # Open the JSON
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except json.decoder.JSONDecodeError:
@@ -54,7 +57,7 @@ async def read_json(guild: int, _param: str):
async def write_json(guild: int, param_name: str, param: str or int):
with open(os.getenv('conf_file'), 'r', encoding='utf-8') as f:
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f:
try:
_json = json.load(f)
except json.decoder.JSONDecodeError:
@@ -65,7 +68,7 @@ async def write_json(guild: int, param_name: str, param: str or int):
_json.update({f'{guild}': {}})
_guild = _json[f'{guild}']
_guild.update({f'{param_name}': f'{param}'})
with open(os.getenv('conf_file'), 'w', encoding='utf-8') as f:
with open(os.getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
json.dump(_json, f, indent=4)
@@ -77,7 +80,7 @@ def determine_prefix(bot, msg):
:return: prefix for server, default is $
"""
parameter = '$'
with open(os.getenv('conf_file'), 'r', encoding='utf-8') as f: # Open the JSON
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except:
@@ -97,7 +100,7 @@ def determine_time(msg):
:return: prefix for server, default is $
"""
parameter = 15
with open(os.getenv('conf_file'), 'r', encoding='utf-8') as f: # Open the JSON
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except:
@@ -108,9 +111,3 @@ def determine_time(msg):
except:
pass
return parameter
def maybe_defer(inter: disnake.Interaction, *, delay: Union[float, int] = 2.0, **options) -> asyncio.Task:
"""Defer an interaction if it has not been responded to after ``delay`` seconds."""
loop = inter.bot.loop
if delay <= 0:
return loop.create_task(inter.response.defer(**options))

96
lib/ListGenerator.py Normal file
View File

@@ -0,0 +1,96 @@
import mimetypes
import os
class FileAttrs:
def __iter__(self, name, path, type):
self.name = name
self.path = path
self.type = type
@self.name.setter
def name(self, value):
self.name = value
@self.type.setter
def type(self, value):
self.type = value
@self.path.setter
def path(self, value):
self.path = value
class ListGenerator:
def __init__(self, path=None):
self._path = path
self.list: list = self._lister(self._path)
self._current_index = 0
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return (
f'<Audio file name={self.name} path={self.path} type={self.type} size={self._size}>'
)
@property
def name(self) -> str:
return self.list[self._current_index]
@property
def path(self) -> str:
return str(self._path)
@property
def type(self) -> str:
guess = mimetypes.guess_type(f'{self._path}/{self.name}')[0].split('/')[0]
return guess
@classmethod
def _lister(cls, path) -> list:
_dict: list = []
for _files in os.walk(path):
_dict.extend(_files)
break
return _dict[2]
def __iter__(self):
return ListGenerationIter(self)
@name.setter
def name(self, value):
self.name = value
@type.setter
def type(self, value):
self.type = value
@path.setter
def path(self, value):
self.path = value
class ListGenerationIter:
def __init__(self, list_class):
self._list = list_class.list
self._name = list_class.name
self._type = list_class.type
self._path = list_class.path
self._size = len(self._list)
self._current_index = 0
def __iter__(self):
return self
def __next__(self):
if self._current_index < self._size:
_name = self._list[self._current_index]
_path = self._path
_type = self._type
self._current_index += 1
memb = FileAttrs(_name, _path, _type)
return memb
raise StopIteration

33
lib/Logger.py Normal file
View File

@@ -0,0 +1,33 @@
import logging
class CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
format = "%(asctime)s - %(levelname)s - %(module)s \t- %(message)s"
FORMATS = {
logging.DEBUG: grey + format + reset,
logging.INFO: grey + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red + format + reset
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
logger = logging.getLogger('Pisya_Bot')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)

View File

@@ -1,16 +1,12 @@
import logging
from .Logger import logger
from asyncio import sleep
from typing import Union, Optional
import disnake
from disnake import FFmpegPCMAudio
from disnake.ext import commands
async def play_audio(audio, bot, vc):
if not bot.voice_clients:
logging.info(audio)
logging.error(f'Playing: {audio}')
logger.info(audio)
logger.error(f'Playing: {audio}')
await sleep(1)
vp = await vc.connect()
if not vp.is_playing():
@@ -19,11 +15,3 @@ async def play_audio(audio, bot, vc):
await sleep(0.5)
await sleep(1)
await vp.disconnect()
async def get_audio_list(inter: Union[disnake.ApplicationCommandInteraction, commands.Context],
search: Optional[str],
return_embed: bool = False,
) -> None:
await list_files()

View File

@@ -3,8 +3,9 @@ lib
~~~~~~~~~~~~~
Some libs for the bot whitch help him
"""
from .DB import *
from .Player import *
from .ListGenerator import *
from .Logger import *
from .Comands import *
from .CogsPrepare import *
from .Player import *
from .DB_Worker import *
from .CogsPrep import *

View File

@@ -1,9 +1,9 @@
from setuptools import setup
import test
import __init__
setup(
name='fun and admin bot',
version=test.__version__,
version=__init__.__version__,
packages=['lib'],
url='',
platforms='POSIX',