v0.0.6\nplay_audio fixes

This commit is contained in:
bot
2024-03-07 02:18:09 +03:00
parent d75add614b
commit 53939b4bc3
22 changed files with 1126 additions and 1104 deletions

22
.gitignore vendored
View File

@@ -1,11 +1,11 @@
/tmp/
/audio/*/
/.idea
/user.db
*.json
*.pyc
/.run/
/.env
*.exe
/venv/
/fun_and_admin_bot.egg-info/
/tmp/
/audio/*/
/.idea
/user.db
*.json
*.pyc
/.run/
/.env
*.exe
/venv/
/fun_and_admin_bot.egg-info/

View File

@@ -1,2 +1,2 @@
0.0.5
Initial
0.0.5
Initial

View File

@@ -1,16 +1,16 @@
__version__ = '0.0.5'
__title__ = "Pisya_bot"
__author__ = "beaconborn"
from typing import NamedTuple, Literal
class VersionInfo(NamedTuple):
major: int
minor: int
micro: int
releaselevel: Literal["alpha", "beta", "candidate", "final"]
serial: int
version_info: VersionInfo = VersionInfo(major=5, minor=0, micro=5, releaselevel="alpha", serial=0)
__version__ = '0.0.6'
__title__ = "Pisya_bot"
__author__ = "baconborn"
from typing import NamedTuple, Literal
class VersionInfo(NamedTuple):
major: int
minor: int
micro: int
releaselevel: Literal["alpha", "beta", "candidate", "final"]
serial: int
version_info: VersionInfo = VersionInfo(major=0, minor=0, micro=6, releaselevel="alpha", serial=0)

173
bot.py
View File

@@ -1,90 +1,83 @@
#!/usr/bin/env python3
import asyncio
import os
from typing import List
import disnake
from disnake import OptionChoice, OptionType, Option
from disnake.ext import commands
from __init__ import version_info as ver
from lib import work_with_cogs, cog_list
from lib import preload_checks, determine_prefix
from lib import logger
preload_checks()
intents = disnake.Intents(messages=True,
guilds=True,
message_content=True,
voice_states=True,
members=True,
presences=True
)
bot = commands.Bot(command_prefix=determine_prefix,
intents=intents,
reload=True
)
asyncio.run(work_with_cogs('load', bot, asyncio.run(cog_list())))
@bot.event
async def on_ready():
logger.info(f'Bot started')
logger.info('We have logged in as {0.user}'.format(bot))
logger.info(f'Version of bot is - v{ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}')
@bot.slash_command(
name="cog",
description="Work with cogs",
options=[
Option(
"what_do",
description="Specify what do with cogs",
type=OptionType.string,
required=True,
choices=[
OptionChoice("load", "load"),
OptionChoice("unload", "unload"),
OptionChoice("reload", "reload"),
OptionChoice("enable", "enable"),
OptionChoice("disable", "disable"),
]
),
Option(
'cog',
description="specify cog",
type=OptionType.string
)
]
)
@commands.is_owner()
async def slash_cogs(inter, what_do, cog: str = asyncio.run(cog_list())) -> None:
await work_with_cogs(what_do, bot, cog)
await inter.response.send_message(f'Cog {cog} is {what_do}ed', ephemeral=True)
@slash_cogs.autocomplete('cog')
async def _cog_opt(inter: disnake.ApplicationCommandInteraction, current: str, what_do) -> List[OptionChoice]:
_what = ['load', 'reload', 'unload', 'disable']
if what_do in _what:
_list = await cog_list()
elif what_do == 'enable':
_list = await cog_list('./cogs/disabled/')
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current.lower() in choice.lower()
]
@slash_cogs.error
async def cogs_error(inter, what_do):
await inter.response.send_message(f'Error', ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func\n{what_do}\n')
bot.run(os.getenv('TOKEN'))
#!/usr/bin/env python3
import asyncio
import os
from typing import List
import disnake
from disnake import OptionChoice, OptionType, Option
from disnake.ext import commands
from __init__ import version_info as ver
from lib import work_with_cogs, cog_list
from lib import preload_checks, determine_prefix
from lib import logger
preload_checks()
intents = disnake.Intents(messages=True,
guilds=True,
message_content=True,
voice_states=True,
members=True,
presences=True
)
bot = commands.Bot(command_prefix=determine_prefix,
intents=intents,
reload=True,
)
asyncio.run(work_with_cogs('load', bot, asyncio.run(cog_list())))
@bot.event
async def on_ready():
logger.info(f'Bot started')
logger.info(f'Disnake version {disnake.__version__}')
logger.info('We have logged in as {0.user}'.format(bot))
logger.info(f'Version of bot is - v{ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}')
@bot.slash_command(
name="cog",
description="Work with cogs",
options=[
Option(
"what_do",
description="Specify what do with cogs",
type=OptionType.string,
required=True,
choices=["load", "unload", "reload", "enable", "disable"]
),
Option(
'cog',
description="specify cog",
type=OptionType.string
)
]
)
@commands.is_owner()
async def slash_cogs(inter: disnake.ApplicationCommandInteraction, what_do, cog: str):
await work_with_cogs(what_do, bot, cog)
await inter.response.send_message(f'Cog {cog} is {what_do}ed', ephemeral=True)
@slash_cogs.autocomplete('cog')
async def _cog_opt(inter: disnake.ApplicationCommandInteraction, current: str, what_do):
current = current.lower()
if what_do == 'enable':
_list = await cog_list('./cogs/disabled/')
else:
_list = await cog_list()
return [choice for choice in _list if current in choice.lower()]
@slash_cogs.error
async def cogs_error(inter, what_do):
await inter.response.send_message(f'Error', ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func\n{what_do}\n')
bot.run(os.getenv('TOKEN'))

View File

@@ -1,174 +1,172 @@
from asyncio import sleep
from typing import List
import disnake
from disnake import Option, OptionType, OptionChoice
from disnake.ext import commands, tasks
from lib import read_json, write_json
from lib import fill_bd, prepare_db, work_with_db
from lib import logger
class Admin(commands.Cog, name='Admin'):
def __init__(self, bot):
self.bot = bot # a defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
for g in self.bot.get_all_members():
await prepare_db(g.guild.id)
for g in self.bot.get_all_members():
await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
self.activity.start()
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@tasks.loop(seconds=20)
async def activity(self):
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at users: {str(len(self.bot.users))}',
type=3
)
)
await sleep(10)
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at servers: {str(len(self.bot.guilds))}',
type=3
)
)
@commands.Cog.listener()
async def on_member_update(self, before: disnake.Member, after: disnake.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id)
await work_with_db(sql_update_query, data_tuple)
@commands.Cog.listener()
async def on_guild_join(self, guild):
for g in guild.members:
await fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@commands.Cog.listener()
async def on_member_join(self, member):
await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
bot_role = read_json(member.guild.id, 'bot_role') # Get bot role
guest_role = read_json(member.guild.id, 'guest_role') # Get guest role
if bot_role or guest_role:
if member.bot == 0:
role = disnake.utils.get(member.guild.roles, id=guest_role)
else:
role = disnake.utils.get(member.guild.roles, id=bot_role)
logger.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@commands.slash_command(
name="set_guest_role",
description="Set Default bot role",
options=[
Option("role", "Specify role", OptionType.role, required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_guest_role(self, inter, role):
await write_json(inter.guild.id, "guest_role", role.id)
await inter.response.send_message(f"Set up bot role to: `{role.name}`", ephemeral=True)
@commands.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_set_prefix(self, ctx, prefix: str):
await write_json(ctx.guild.id, "prefix", prefix)
await ctx.reply(f"Prefix set to: `{prefix}`")
@commands.guild_only()
@commands.slash_command(
name="set_prefix",
description="Setting up bot prefix",
options=[
Option("prefix", "Specify prefix", OptionType.string, required=True),
]
)
@commands.has_permissions(administrator=True)
async def slash_set_prefix(self, inter, prefix: str):
await write_json(inter.guild.id, "prefix", prefix)
await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True)
@commands.guild_only()
@commands.slash_command(
name="set_trigger_role",
description="Setting up role to trigger bot",
options=[
Option("role", "Specify role", OptionType.role, required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_trigger_role(self, inter, role):
await write_json(inter.guild.id, "tigger_role", role.id)
await inter.response.send_message(f"Role to trigger set to : `{role.name}`", ephemeral=True)
@commands.slash_command(
name="set_bot_role",
description="Set Default bot role",
options=[
Option("role", "Specify role", OptionType.role, required=True),
]
)
@commands.guild_only()
@commands.has_permissions(administrator=True)
async def set_bot_role(self, ctx, role):
await write_json(ctx.guild.id, "bot_role", role.id)
await ctx.send(f"Set up bot role to: `{role.name}`", ephemeral=True)
@set_bot_role.error
@set_trigger_role.error
@slash_set_prefix.error
async def set_prefix_error(self, inter, prefix):
await inter.response.send_message("You don`t have permissions", ephemeral=True)
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_time",
description="Read list of tracks for user",
options=[
Option("seconds", "specify max duration", OptionType.integer, required=True),
]
)
async def set_time(self,
inter: disnake.ApplicationCommandInteraction,
seconds: commands.Range[5, 30]):
await write_json(inter.guild.id, "seconds", seconds)
await inter.response.send_message(f"Change max audio duration to {seconds} sec", ephemeral=True)
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_bot_channel",
description="Set channel which iterate with bot",
options=[
Option("channel", "specify channel", OptionType.string, required=True),
]
)
async def set_bot_channel(self, inter, channel):
print(type(inter.guild.text_channels))
await write_json(inter.guild.id, "channel", channel.id)
await inter.response.send_message(f"Channel set up to {channel.mention}", ephemeral=True)
@set_bot_channel.autocomplete('channel')
async def _list_text_channels(self,
inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]:
_list = []
for _channel in inter.guild.text_channels:
_list.append(_channel)
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current in choice
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Admin(bot)) # adding a cog
from asyncio import sleep
from typing import List
import disnake
from disnake import Option, OptionType, OptionChoice
from disnake.ext import commands, tasks
from lib import read_json, write_json
from lib import fill_bd, prepare_db, work_with_db
from lib import logger
class Admin(commands.Cog, name='Admin'):
def __init__(self, bot):
self.bot = bot # a defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
for g in self.bot.get_all_members():
await prepare_db(g.guild.id)
for g in self.bot.get_all_members():
await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
self.activity.start()
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@tasks.loop(seconds=20)
async def activity(self):
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at users: {str(len(self.bot.users))}',
type=3
)
)
await sleep(10)
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at servers: {str(len(self.bot.guilds))}',
type=3
)
)
@commands.Cog.listener()
async def on_member_update(self, before: disnake.Member, after: disnake.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id)
await work_with_db(sql_update_query, data_tuple)
@commands.Cog.listener()
async def on_guild_join(self, guild):
for g in guild.members:
await fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@commands.Cog.listener()
async def on_member_join(self, member):
await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
bot_role = read_json(member.guild.id, 'bot_role') # Get bot role
guest_role = read_json(member.guild.id, 'guest_role') # Get guest role
if bot_role or guest_role:
if member.bot == 0:
role = disnake.utils.get(member.guild.roles, id=guest_role)
else:
role = disnake.utils.get(member.guild.roles, id=bot_role)
logger.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@commands.slash_command(
name="set_guest_role",
description="Set Default bot role",
options=[
Option("role", "Specify role", OptionType.role, required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_guest_role(self, inter, role):
await write_json(inter.guild.id, "guest_role", role.id)
await inter.response.send_message(f"Set up bot role to: `{role.name}`", ephemeral=True)
@commands.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_set_prefix(self, ctx, prefix: str):
await write_json(ctx.guild.id, "prefix", prefix)
await ctx.reply(f"Prefix set to: `{prefix}`")
@commands.guild_only()
@commands.slash_command(
name="set_prefix",
description="Setting up bot prefix",
options=[
Option("prefix", "Specify prefix", OptionType.string, required=True),
]
)
@commands.has_permissions(administrator=True)
async def slash_set_prefix(self, inter, prefix: str):
await write_json(inter.guild.id, "prefix", prefix)
await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True)
@commands.guild_only()
@commands.slash_command(
name="set_trigger_role",
description="Setting up role to trigger bot",
options=[
Option("role", "Specify role", OptionType.role, required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_trigger_role(self, inter, role):
await write_json(inter.guild.id, "tigger_role", role.id)
await inter.response.send_message(f"Role to trigger set to : `{role.name}`", ephemeral=True)
@commands.slash_command(
name="set_bot_role",
description="Set Default bot role",
options=[
Option("role", "Specify role", OptionType.role, required=True),
]
)
@commands.guild_only()
@commands.has_permissions(administrator=True)
async def set_bot_role(self, ctx, role):
await write_json(ctx.guild.id, "bot_role", role.id)
await ctx.send(f"Set up bot role to: `{role.name}`", ephemeral=True)
@set_bot_role.error
@set_trigger_role.error
@slash_set_prefix.error
async def set_prefix_error(self, inter, prefix):
await inter.response.send_message("You don`t have permissions", ephemeral=True)
logger.error(prefix)
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_time",
description="Read list of tracks for user",
options=[
Option("seconds", "specify max duration", OptionType.integer, required=True),
]
)
async def set_time(self,
inter: disnake.ApplicationCommandInteraction,
seconds: commands.Range[int, 5, 30]):
await write_json(inter.guild.id, "seconds", seconds)
await inter.response.send_message(f"Change max audio duration to {seconds} sec", ephemeral=True)
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_bot_channel",
description="Set channel which iterate with bot",
)
async def set_bot_channel(self, inter, channel: str):
await write_json(inter.guild.id,
"channel",
disnake.utils.find(lambda d: d.name == channel, inter.guild.channels).id)
await inter.response.send_message(f"Channel set up to {channel}", ephemeral=True)
@set_bot_channel.autocomplete('channel')
async def _list_text_channels(self,
inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]:
_list = [r.name for r in inter.guild.text_channels]
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current in choice
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Admin(bot)) # adding a cog

View File

@@ -1,109 +1,83 @@
import random
from os import path, makedirs, rename, remove
from disnake.ext import commands
from lib import logger
from lib import determine_time
from lib import read_db, check_exist_audio, add_audio
from lib import play_audio
# todo: write chose audio from list by slash command
class Audio(commands.Cog, name='Audio'):
def __init__(self, bot):
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
# todo: complete check activity
@commands.Cog.listener()
async def on_voice_state_update(self, member, before, after):
if before.channel is None and not member.bot:
if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members):
logger.info('Skip playing by Game')
else:
# Prepare list of audio
from lib.Comands import read_json
_role = await read_json(member.guild.id, 'tigger_role')
# Read audio from DB
audio_db = await read_db(member.guild.id, member.id, 'usertracks')
def_audio_db = await read_db(member.guild.id, member.id, 'defaulttracks')
if audio_db is not None:
audio_db = audio_db.split(', ') # Need to fix creating list
for i in range(len(audio_db)):
audio_db[i] = f'{member.id}/{audio_db[i]}'
if def_audio_db is not None:
def_audio_db = def_audio_db.split(', ')
from lib.Comands import list_files
def_audio_ls = await list_files()
if def_audio_db or audio_db is not None:
if not def_audio_db: def_audio_db = []
if not audio_db: audio_db = []
logger.info(f'Play audio from DB')
full_audio = def_audio_db + audio_db
await play_audio(f'audio/{random.choice(full_audio)}', self.bot, after.channel)
elif len(member.roles) == 1 or _role is None:
logger.info(f'Skip playing by role')
elif any(str(role.id) in _role for role in member.roles):
logger.info(f'Play audio from list by role')
await play_audio(f'audio/{random.choice(def_audio_ls)}', self.bot, after.channel)
else:
logger.info(f'Skip playing by any else')
@commands.command(name="upload_audio",
description=f"Add audio to bot")
async def upload_audio(self, ctx, user=None):
user = user or ctx.author
if ctx.author.guild_permissions.administrator or user is ctx.author:
if ctx.message.attachments:
from os import error
if not path.isdir(f'tmp/{user.id}'):
try:
makedirs(f'tmp/{user.id}')
except error as _error:
pass
if not path.isdir(f'audio/{user.id}'):
try:
makedirs(f'audio/{user.id}')
except error as _error:
pass
for at in ctx.message.attachments:
import mimetypes
await at.save(f'tmp/{user.id}/{at.filename}')
guess = mimetypes.guess_type(f'tmp/{user.id}/{at.filename}')
if guess[0].split('/')[0] == 'audio':
from pymediainfo import MediaInfo
file = f'tmp/{user.id}/{at.filename}'
duration = round(MediaInfo.parse(file).tracks[0].duration / 1000)
max_duration = int(determine_time(ctx))
print(type(max_duration))
if duration > max_duration:
await ctx.reply(f'Audio duration is {duration}, but max is {max_duration}')
remove(f'tmp/{user.id}/{at.filename}')
else:
a = await read_db(ctx.guild.id, user.id, 'usertracks')
if a:
audiolist = a + ", " + f'{at.filename}'
else:
audiolist = f'{at.filename}'
await check_exist_audio(ctx, ctx.guild.id, user.id, 'usertracks', at.filename)
await add_audio(ctx.guild.id, user.id, audiolist)
rename(f'tmp/{user.id}/{at.filename}', f'audio/{user.id}/{at.filename}')
elif guess[0].split('/')[0] != 'audio':
await ctx.reply(f'It not audio {at.filename}\n it`s {guess[0]}')
remove(f'tmp/{user.id}/{at.filename}')
else:
await ctx.reply("Has no Attachment")
else:
await ctx.reply(f'You`re not admin. You can add audio only for your own account')
def setup(bot): # an extension must have a setup function
bot.add_cog(Audio(bot)) # adding a cog
import random
from itertools import islice
from typing import List
import disnake
from disnake import OptionChoice, Option, OptionType
from disnake.ext import commands
from lib import logger, ListGenerator
from lib import read_db
from lib import play_audio
# todo: write chose audio from list by slash command
class Audio(commands.Cog, name='Audio'):
def __init__(self, bot):
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
# todo: complete check activity
@commands.Cog.listener()
async def on_voice_state_update(self, member, before, after):
if before.channel is None and not member.bot:
if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members):
logger.info('Skip playing by Game')
else:
# Prepare list of audio
from lib.Comands import read_json
_role = await read_json(member.guild.id, 'tigger_role')
# Read audio from DB
audio_db = await read_db(member.guild.id, member.id, 'defaulttracks')
if audio_db is not None:
audio = audio_db.split(', ')
else:
from lib.Comands import list_files
audio = await list_files()
if audio_db is not None:
logger.info(f'Play audio from DB')
await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel)
elif len(member.roles) == 1 or _role is None:
logger.info(f'Skip playing by role')
elif any(str(role.id) in _role for role in member.roles):
logger.info(f'Play audio from list by role')
await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel)
else:
logger.info(f'Skip playing by any else')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command",
options=[
Option(name="audio",
type=OptionType.string,
required=True
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}', ephemeral=True)
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str):
current = current.lower()
_dict: dict = {}
for f in ListGenerator('audio'):
_dict[f.name] = f'{f.path}/{f.name}'
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current in choice.lower()
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Audio(bot)) # adding a cog

View File

@@ -1,15 +1,15 @@
from disnake.ext import commands
from lib import logger
class Fun(commands.Cog, name='Fun'):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
def setup(bot): # an extension must have a setup function
bot.add_cog(Fun(bot)) # adding a cog
from disnake.ext import commands
from lib import logger
class Fun(commands.Cog, name='Fun'):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
def setup(bot): # an extension must have a setup function
bot.add_cog(Fun(bot)) # adding a cog

33
cogs/disabled/test.py Normal file
View File

@@ -0,0 +1,33 @@
from typing import List
import disnake
from disnake import OptionChoice
from disnake.ext import commands
from lib import logger
class Testing(commands.Cog, name='Testing'):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name='select_audio',
description='Select Audios from List'
)
async def select_audio(self, inter, audios: str):
pass
@select_audio.autocomplete('audios')
async def _list_audios(self,
inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]:
pass
def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog

View File

@@ -1,69 +1,61 @@
import disnake
from disnake import Option, OptionType, Colour
from disnake.ext import commands
from lib import DB_Reader
from lib import logger
class General(commands.Cog):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name="info",
description="Read list of tracks for user",
options=[
Option("user", "Specify any user", OptionType.user),
]
)
async def info(self, inter, user=None):
user_audio = None
default_audio = None
user = user or inter.author
_user = DB_Reader(inter.guild.id)
for r in _user:
if r.userid == user.id:
user_audio = r.usertracks
default_audio = r.defaulttracks
rolelist = [r.mention for r in user.roles if r != inter.guild.default_role]
if rolelist:
roles = "\n".join(rolelist)
else:
roles = "Not added any role"
if user_audio:
audios = "" + "\n".join(sorted(user_audio.split(", ")))
else:
audios = "Not selected audio"
if default_audio:
audios2 = "" + "\n".join(sorted(default_audio.split(", ")))
else:
audios2 = "Not selected audio"
emb = disnake.Embed(
title=f"General information",
description=f"General information on server about {user}",
color=Colour.random()
)
emb.set_thumbnail(url=user.display_avatar)
emb.add_field(name="General info",
value=f"Username: {user}\n"
f"Nickname: {user.nick}\n"
f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
emb.add_field(name="User audio list", value=f"{audios}", inline=True)
emb.add_field(name="Default audio list", value=f"{audios2}", inline=True)
emb.add_field(name="Roles list", value=f"{roles}", inline=True)
emb.set_footer(text="Information requested by: {}".format(inter.author.display_name))
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(General(bot)) # adding a cog
import disnake
from disnake import Option, OptionType, Colour
from disnake.ext import commands
from lib import DBReader
from lib import logger
class General(commands.Cog):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name="info",
description="Read list of tracks for user",
options=[
Option("user", "Specify any user", OptionType.user),
]
)
async def info(self, inter, user=None):
audio = None
user = user or inter.author
_user = DBReader(inter.guild.id)
for r in _user:
if r.userid == user.id:
audio = r.defaulttracks
rolelist = [r.mention for r in user.roles if r != inter.guild.default_role]
if rolelist:
roles = "\n".join(rolelist)
else:
roles = "Not added any role"
if audio:
audios = "" + "\n".join(sorted(audio.split(", ")))
else:
audios = "Not selected audio"
emb = disnake.Embed(
title=f"General information",
description=f"General information on server about {user}",
color=Colour.random()
)
emb.set_thumbnail(url=user.display_avatar)
emb.add_field(name="General info",
value=f"Username: {user}\n"
f"Nickname: {user.nick}\n"
f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
emb.add_field(name="Default audio list", value=f"{audios}", inline=True)
emb.add_field(name="Roles list", value=f"{roles}", inline=True)
emb.set_footer(text="Information requested by: {}".format(inter.author.display_name))
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(General(bot)) # adding a cog

View File

@@ -1,44 +1,44 @@
import os
import disnake
import psutil
from disnake.ext import commands
from __init__ import version_info as ver
from lib import determine_prefix, determine_time
from lib import logger
class BotInfo(commands.Cog, name='Bot Info'):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="info_bot",
description='Shows general info about bot') # this is for making a command
async def info_bot(self, inter):
_pid = os.getpid()
_process = psutil.Process(_pid)
emb = disnake.Embed(
title=f"General information",
description=f"General information on about bot",
)
emb.set_thumbnail(self.bot.user.avatar.url)
emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n"
f"CPU Usage: {_process.cpu_percent()}%\n"
f'Bot ping: {round(self.bot.latency * 1000)}\n'
f'Prefix: `{determine_prefix(self.bot, inter)}\n`'
f"Max audio duration: {determine_time(inter)} sec\n"
)
emb.add_field(name="Bot info:", value=f"Bot owner: <@{self.bot.owner_id}>\n"
f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}")
emb.set_footer(text="Information requested by: {}".format(inter.author.display_name))
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(BotInfo(bot)) # adding a cog
import os
import disnake
import psutil
from disnake.ext import commands
from __init__ import version_info as ver
from lib import determine_prefix, determine_time
from lib import logger
class BotInfo(commands.Cog, name='Bot Info'):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="info_bot",
description='Shows general info about bot') # this is for making a command
async def info_bot(self, inter):
_pid = os.getpid()
_process = psutil.Process(_pid)
emb = disnake.Embed(
title=f"General information",
description=f"General information on about bot",
)
emb.set_thumbnail(self.bot.user.avatar.url)
emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n"
f"CPU Usage: {_process.cpu_percent()}%\n"
f'Bot ping: {round(self.bot.latency * 1000)}\n'
f'Prefix: `{determine_prefix(self.bot, inter)}\n`'
f"Max audio duration: {determine_time(inter)} sec\n"
)
emb.add_field(name="Bot info:", value=f"Bot owner: <@386629192743256065>\n"
f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}")
emb.set_footer(text="Information requested by: {}".format(inter.author.display_name))
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(BotInfo(bot)) # adding a cog

View File

@@ -1,67 +0,0 @@
from typing import List
import disnake
from disnake import Option, OptionType, OptionChoice
from disnake.ext import commands
from lib import ListGenerator
from lib import logger
from lib import play_audio
class Testing(commands.Cog, name='Testing'):
def __init__(self, bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command",
options=[
Option(name="audio",
type=OptionType.string,
required=True
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}', ephemeral=True)
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str) -> List[OptionChoice]:
_def_iter = ListGenerator('audio')
_def_dict: dict = {}
for f in _def_iter:
_def_dict[f.name] = f'{f.path}/{f.name}'
_user_dict: dict = {}
try:
_user_iter = ListGenerator(f'audio/{inter.author.id}')
for f in _user_iter:
_user_dict[f.name] = f'{f.path}/{f.name}'
# user_dict = []
# for _track in user_list:
except IndexError:
pass
_dict = {}
_dict.update(_def_dict)
_dict.update(_user_dict)
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current.lower() in choice.lower()
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog

View File

@@ -1,57 +1,57 @@
"""
lib.CogsPrepare
~~~~~~~~~~~~~
Loads, unloads Cogs files
cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
import os
import traceback
from os import listdir
from typing import List
from disnake.ext import commands
from .Logger import logger
async def cog_list(fold='./cogs') -> List[str]:
cogs_list = []
for _filename in listdir(fold):
if _filename.endswith('.py'):
cogs_list.append(_filename[:-3])
return cogs_list
async def work_with_cogs(what_do, bot, cog):
if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if what_do == "load":
try:
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Loaded cog {_filename}')
except commands.ExtensionNotFound:
logger.error(f"Error: {_filename} couldn't be find to load.")
except commands.ExtensionFailed as error:
logger.error(f'Error: {_filename} failed to load properly.\n\t{error}\n\n{traceback.format_exc()}')
except commands.ExtensionError:
logger.error(f'Error: unknown error with {_filename}')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')
elif what_do == 'disable':
bot.unload_extension(f'cogs.{_filename}')
os.rename(f'cogs/{_filename}.py', f'cogs/disabled/{_filename}.py')
logger.info(f'Cog {_filename} stopped and disabled')
elif what_do == 'enable':
os.rename(f'cogs/disabled/{_filename}.py', f'cogs/{_filename}.py')
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} started and enabled')
"""
lib.CogsPrepare
~~~~~~~~~~~~~
Loads, unloads Cogs files
cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
import os
import traceback
from os import listdir
from typing import List
from disnake.ext import commands
from .Logger import logger
async def cog_list(fold='./cogs') -> List[str]:
cogs_list = []
for _filename in listdir(fold):
if _filename.endswith('.py'):
cogs_list.append(_filename[:-3])
return cogs_list
async def work_with_cogs(what_do, bot, cog):
if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if what_do == "load":
try:
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Loaded cog {_filename}')
except commands.ExtensionNotFound:
logger.error(f"Error: {_filename} couldn't be find to load.")
except commands.ExtensionFailed as error:
logger.error(f'Error: {_filename} failed to load properly.\n\t{error}\n\n{traceback.format_exc()}')
except commands.ExtensionError:
logger.error(f'Error: unknown error with {_filename}')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')
elif what_do == 'disable':
bot.unload_extension(f'cogs.{_filename}')
os.rename(f'cogs/{_filename}.py', f'cogs/disabled/{_filename}.py')
logger.info(f'Cog {_filename} stopped and disabled')
elif what_do == 'enable':
os.rename(f'cogs/disabled/{_filename}.py', f'cogs/{_filename}.py')
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} started and enabled')

View File

@@ -1,113 +1,113 @@
"""
lib.Commands
~~~~~~~~~~~~~~
Some prepare for commands
"""
import json
import os
import dotenv
def preload_checks():
dotenv.load_dotenv()
if not os.path.isfile('.env') or not os.getenv('CONF_FILE'):
with open('.env', 'a', encoding='utf-8') as f:
f.write("CONF_FILE='config.json'\n")
dotenv.load_dotenv()
if not os.path.isfile(os.getenv('CONF_FILE')):
with open(os.getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
f.write("")
async def list_files(fold: str = 'audio'):
if fold != 'audio': fold = f'audio/{fold}'
fl = []
for filenames in os.walk(fold):
fl.extend(filenames)
break
files = {}
for x in fl[2]:
files[x] = x
return fl[2]
async def read_json(guild: int, _param: str):
"""
Reads Json file to determite config strings
:param guild: ID of Guild
:param _param: Parameter in json file
:return: value of parameter.
"""
parameter = None
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except json.decoder.JSONDecodeError:
_json = {}
if guild: # If the guild exists
try:
guild_conf = _json[f"{guild}"]
try:
parameter = guild_conf[f"{_param}"]
except KeyError:
pass
except KeyError:
pass
return parameter
async def write_json(guild: int, param_name: str, param: str or int):
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f:
try:
_json = json.load(f)
except json.decoder.JSONDecodeError:
_json = {}
try:
_guild = _json[f'{guild}']
except KeyError:
_json.update({f'{guild}': {}})
_guild = _json[f'{guild}']
_guild.update({f'{param_name}': f'{param}'})
with open(os.getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
json.dump(_json, f, indent=4)
def determine_prefix(bot, msg):
"""
Determite per-server bot prefix
:param bot: Disnake Bot object
:param msg: Disnake msg object
:return: prefix for server, default is $
"""
parameter = '$'
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except json.JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter
def determine_time(msg):
"""
Determite per-server bot prefix
:param msg: Disnake msg object
:return: prefix for server, default is $
"""
parameter = 15
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except json.JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["seconds"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter
"""
lib.Commands
~~~~~~~~~~~~~~
Some prepare for commands
"""
import json
import os
import dotenv
def preload_checks():
dotenv.load_dotenv()
if not os.path.isfile('.env') or not os.getenv('CONF_FILE'):
with open('.env', 'a', encoding='utf-8') as f:
f.write("CONF_FILE='config.json'\n")
dotenv.load_dotenv()
if not os.path.isfile(os.getenv('CONF_FILE')):
with open(os.getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
f.write("")
async def list_files(fold: str = 'audio'):
if fold != 'audio': fold = f'audio/{fold}'
fl = []
for filenames in os.walk(fold):
fl.extend(filenames)
break
files = {}
for x in fl[2]:
files[x] = x
return fl[2]
async def read_json(guild: int, _param: str):
"""
Reads Json file to determite config strings
:param guild: ID of Guild
:param _param: Parameter in json file
:return: value of parameter.
"""
parameter = None
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except json.decoder.JSONDecodeError:
_json = {}
if guild: # If the guild exists
try:
guild_conf = _json[f"{guild}"]
try:
parameter = guild_conf[f"{_param}"]
except KeyError:
pass
except KeyError:
pass
return parameter
async def write_json(guild: int, param_name: str, param: str or int):
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f:
try:
_json = json.load(f)
except json.decoder.JSONDecodeError:
_json = {}
try:
_guild = _json[f'{guild}']
except KeyError:
_json.update({f'{guild}': {}})
_guild = _json[f'{guild}']
_guild.update({f'{param_name}': f'{param}'})
with open(os.getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
json.dump(_json, f, indent=4)
def determine_prefix(bot, msg):
"""
Determite per-server bot prefix
:param bot: Disnake Bot object
:param msg: Disnake msg object
:return: prefix for server, default is $
"""
parameter = '$'
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except json.JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter
def determine_time(msg):
"""
Determite per-server bot prefix
:param msg: Disnake msg object
:return: prefix for server, default is $
"""
parameter = 15
with open(os.getenv('CONF_FILE'), 'r', encoding='utf-8') as f: # Open the JSON
try:
_json = json.load(f) # Load the custom prefixes
except json.JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["seconds"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter

View File

@@ -1,165 +1,165 @@
import sqlite3
from lib import logger
class DB_Reader:
def __init__(self, guildid: int = None):
self._guildid = guildid
self.list = self._read_db(self._guildid)
self._current_index = 0
def __str__(self) -> str:
return str(self._guildid)
@classmethod
def _read_db(cls, guildid: int) -> list:
try:
sql_con = sqlite3.connect("user.db")
cursor = sql_con.cursor()
cursor.execute(f"""SELECT * FROM "{guildid}" """)
record = cursor.fetchall()
return record
except sqlite3.Error as _e:
logger.info(f'Error reading DB\n{_e}')
def __iter__(self):
return _ListGenerationIter(self)
class _DBAttrs:
def __init__(self,
userid: int,
username: str,
nick: str,
isbot: bool,
defaulttracks: None or list,
usertracks: None or list):
self.userid = userid
self.username = username
self.nick = nick
self.isbot = isbot
self.defaulttracks = defaulttracks
self.usertracks = usertracks
def __str__(self):
return self.username
def __repr__(self):
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
class _ListGenerationIter:
def __init__(self, user_class):
self._current_index = 0
self._list = user_class.list
self._size = len(self._list)
def __iter__(self):
return self
def __next__(self):
if self._current_index < self._size:
_userid = self._list[self._current_index][0]
_username = self._list[self._current_index][1]
_nick = self._list[self._current_index][2]
_isbot = bool(self._list[self._current_index][3])
_defaulttracks = self._list[self._current_index][4]
_usertracks = self._list[self._current_index][5]
self._current_index += 1
memb = _DBAttrs(_userid,
_username,
_nick,
_isbot,
_defaulttracks,
_usertracks)
return memb
raise StopIteration
async def prepare_db(guild: int):
try:
connect = sqlite3.connect('user.db')
cursor = connect.cursor()
create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT,
[nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT) ''')
cursor.execute(create_table)
cursor.close()
except sqlite3.Error as _error:
pass
async def work_with_db(db_func: str, data_turple: tuple):
"""
Writing to db per server userinfo
:param db_func:
:param data_turple:
"""
try:
connect = sqlite3.connect('user.db')
cursor = connect.cursor()
cursor.execute(db_func, data_turple)
connect.commit()
cursor.close()
except sqlite3.Error as _error:
pass
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
(username, userid, nick, isbot)
VALUES (?, ?, ?, ?)""")
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
await work_with_db(sqlite_insert_with_param, data_tuple)
async def add_audio(guild: int, user: int, audio: str, track: str = 'usertracks'):
"""
Adding audio into а folder and DB
:param guild: Guild id
:param user:
:param audio:
:param track: usertracks or defaulttracks.
"""
# audio = f'{DB.read_db(guild, user, track)}, {audio}'
sql_update_query = f"""UPDATE "{guild}" set {track} = ? where userid = ?"""
data_tuple = (audio, user)
await work_with_db(sql_update_query, data_tuple)
async def read_db(guild: int, user: int, column: str):
_col_dict = {'userid': 0,
'username': 1,
'nick': 2,
'isbot': 3,
'defaulttracks': 4,
'usertracks': 5}
try:
sql_con = sqlite3.connect("user.db")
cursor = sql_con.cursor()
sql_read = f"""SELECT * FROM "{guild}" where userid = {user}"""
cursor.execute(sql_read)
record = cursor.fetchone()
return record[_col_dict[column]]
except sqlite3.Error as _error:
pass
async def check_exist_audio(ctx, guild: int, user: int, column: str, audio: str):
_list_str = await read_db(guild, user, column)
print(type(_list_str))
if _list_str is not None:
_list = _list_str.split(',')
if audio in _list:
await ctx.reply("File in list")
else:
pass
else:
_list = 'None'
import sqlite3
from lib import logger
class DBReader:
def __init__(self, guildid: int = None):
self._guildid = guildid
self.list = self._read_db(self._guildid)
self._current_index = 0
def __str__(self) -> str:
return str(self._guildid)
@classmethod
def _read_db(cls, guildid: int) -> list:
try:
sql_con = sqlite3.connect("user.db")
cursor = sql_con.cursor()
cursor.execute(f"""SELECT * FROM "{guildid}" """)
record = cursor.fetchall()
return record
except sqlite3.Error as _e:
logger.info(f'Error reading DB\n{_e}')
def __iter__(self):
return _ListGenerationIter(self)
class _DBAttrs:
def __init__(self,
userid: int,
username: str,
nick: str,
isbot: bool,
defaulttracks: None or list,
usertracks: None or list):
self.userid = userid
self.username = username
self.nick = nick
self.isbot = isbot
self.defaulttracks = defaulttracks
self.usertracks = usertracks
def __str__(self):
return self.username
def __repr__(self):
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
class _ListGenerationIter:
def __init__(self, user_class):
self._current_index = 0
self._list = user_class.list
self._size = len(self._list)
def __iter__(self):
return self
def __next__(self):
if self._current_index < self._size:
_userid = self._list[self._current_index][0]
_username = self._list[self._current_index][1]
_nick = self._list[self._current_index][2]
_isbot = bool(self._list[self._current_index][3])
_defaulttracks = self._list[self._current_index][4]
_usertracks = self._list[self._current_index][5]
self._current_index += 1
memb = _DBAttrs(_userid,
_username,
_nick,
_isbot,
_defaulttracks,
_usertracks)
return memb
raise StopIteration
async def prepare_db(guild: int):
try:
connect = sqlite3.connect('user.db')
cursor = connect.cursor()
create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT,
[nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT) ''')
cursor.execute(create_table)
cursor.close()
except sqlite3.Error as _error:
pass
async def work_with_db(db_func: str, data_turple: tuple):
"""
Writing to db per server userinfo
:param db_func:
:param data_turple:
"""
try:
connect = sqlite3.connect('user.db')
cursor = connect.cursor()
cursor.execute(db_func, data_turple)
connect.commit()
cursor.close()
except sqlite3.Error as _error:
pass
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
(username, userid, nick, isbot)
VALUES (?, ?, ?, ?)""")
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
await work_with_db(sqlite_insert_with_param, data_tuple)
async def add_audio(guild: int, user: int, audio: str, track: str = 'usertracks'):
"""
Adding audio into а folder and DB
:param guild: Guild id
:param user:
:param audio:
:param track: usertracks or defaulttracks.
"""
# audio = f'{DB.read_db(guild, user, track)}, {audio}'
sql_update_query = f"""UPDATE "{guild}" set {track} = ? where userid = ?"""
data_tuple = (audio, user)
await work_with_db(sql_update_query, data_tuple)
async def read_db(guild: int, user: int, column: str):
_col_dict = {'userid': 0,
'username': 1,
'nick': 2,
'isbot': 3,
'defaulttracks': 4,
'usertracks': 5}
try:
sql_con = sqlite3.connect("user.db")
cursor = sql_con.cursor()
sql_read = f"""SELECT * FROM "{guild}" where userid = {user}"""
cursor.execute(sql_read)
record = cursor.fetchone()
return record[_col_dict[column]]
except sqlite3.Error as _error:
pass
async def check_exist_audio(ctx, guild: int, user: int, column: str, audio: str):
_list_str = await read_db(guild, user, column)
print(type(_list_str))
if _list_str is not None:
_list = _list_str.split(',')
if audio in _list:
await ctx.reply("File in list")
else:
pass
else:
_list = 'None'

View File

@@ -1,87 +1,86 @@
import mimetypes
import os
from typing import Tuple, Optional
class ListGenerator:
def __init__(self, path: str = None):
self.path = path
self.list: list = self._lister(self.path)
try:
self._size = len(self.list)
except TypeError:
pass
self._current_index = 0
def __str__(self) -> str:
return 'Audio iter generator'
@classmethod
def _lister(cls, path) -> list:
_list: list = []
try:
for f in os.walk(path):
_list.extend(f)
break
return sorted(_list[2])
except TypeError:
pass
def __iter__(self):
return _ListGenerationIter(self)
def __next__(self):
if self._current_index < self._size:
self._current_index += 1
class _FileAttrs:
def __init__(self, name, path, type, exc):
self.name = name
self.path = path
self.mimetype = type
self.exc = exc
def __str__(self):
return self.name
def __repr__(self):
return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >'
class _ListGenerationIter:
def __init__(self, list_class):
self._current_index = 0
self._list = list_class.list
self._name = self._list[self._current_index]
self._path = list_class.path
self._size = len(self._list)
def __iter__(self):
return self
@property
def type(self) -> tuple[Optional[str], Optional[str]]:
guess = mimetypes.guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
return guess
@property
def _exc(self) -> str:
try:
_ret = self._list[self._current_index].split('.')[-1]
except AttributeError:
_ret = None
return _ret
def __next__(self):
if self._current_index < self._size:
_name = self._list[self._current_index]
_path = self._path
_type = self.type
_exc = self._exc
self._current_index += 1
memb = _FileAttrs(_name, _path, _type, _exc)
return memb
raise StopIteration
import mimetypes
import os
class ListGenerator:
def __init__(self, path: str = None):
self.path = path
self.list: list = self._lister(self.path)
try:
self._size = len(self.list)
except TypeError:
pass
self._current_index = 0
def __str__(self) -> str:
return 'Audio iter generator'
@classmethod
def _lister(cls, path) -> list:
_list: list = []
try:
for f in os.walk(path):
_list.extend(f)
break
return sorted(_list[2])
except TypeError:
pass
def __iter__(self):
return _ListGenerationIter(self)
def __next__(self):
if self._current_index < self._size:
self._current_index += 1
class _FileAttrs:
def __init__(self, name, path, mimetype, exc):
self.name = name
self.path = path
self.mimetype = mimetype
self.exc = exc
def __str__(self):
return self.name
def __repr__(self):
return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >'
class _ListGenerationIter:
def __init__(self, list_class):
self._current_index = 0
self._list = list_class.list
self._name = self._list[self._current_index]
self._path = list_class.path
self._size = len(self._list)
def __iter__(self):
return self
@property
def type(self) -> str:
guess = mimetypes.guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
return guess
@property
def _exc(self) -> str:
try:
_ret = self._list[self._current_index].split('.')[-1]
except AttributeError:
_ret = None
return _ret
def __next__(self):
if self._current_index < self._size:
_name = self._list[self._current_index]
_path = self._path
_type = self.type
_exc = self._exc
self._current_index += 1
_memb = _FileAttrs(_name, _path, _type, _exc)
return _memb
raise StopIteration

View File

@@ -1,52 +1,52 @@
import logging
class _CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
format = f"%(asctime)s - [%(levelname)s] -%(module)s- %(message)s"
FORMATS = {
logging.DEBUG: grey + format + reset,
logging.INFO: grey + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red + format + reset
}
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs):
record = _CustomFormatter.old_factory(*args, **kwargs)
_module = record.module
_levelname = record.levelname
if len(_module) % 2 == 0 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2)
elif len(_module) % 2 == 1 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1)
if len(_levelname) % 2 == 0 and len(_levelname) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2)
elif len(record.levelname) % 2 == 1 and len(record.module) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1)
record.module = _module
record.levelname = _levelname
return record
logging.setLogRecordFactory(_record_factory)
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S")
return formatter.format(record)
logger = logging.getLogger('Pisya_Bot')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(_CustomFormatter())
logger.addHandler(ch)
import logging
class _CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
format = f"%(asctime)s - [%(levelname)s] -%(module)s- %(message)s"
FORMATS = {
logging.DEBUG: grey + format + reset,
logging.INFO: grey + format + reset,
logging.WARNING: yellow + format + reset,
logging.ERROR: red + format + reset,
logging.CRITICAL: bold_red + format + reset
}
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs):
record = _CustomFormatter.old_factory(*args, **kwargs)
_module = record.module
_levelname = record.levelname
if len(_module) % 2 == 0 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2)
elif len(_module) % 2 == 1 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1)
if len(_levelname) % 2 == 0 and len(_levelname) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2)
elif len(record.levelname) % 2 == 1 and len(record.module) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1)
record.module = _module
record.levelname = _levelname
return record
logging.setLogRecordFactory(_record_factory)
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S")
return formatter.format(record)
logger = logging.getLogger('Pisya_Bot')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(_CustomFormatter())
logger.addHandler(ch)

View File

@@ -1,17 +1,17 @@
from .Logger import logger
from asyncio import sleep
from disnake import FFmpegPCMAudio
async def play_audio(audio, bot, vc):
if not bot.voice_clients:
logger.info(audio)
logger.error(f'Playing: {audio}')
await sleep(1)
vp = await vc.connect()
if not vp.is_playing():
vp.play(FFmpegPCMAudio(f'{audio}'), after=None)
while vp.is_playing():
await sleep(0.5)
await sleep(1)
await vp.disconnect()
from .Logger import logger
from asyncio import sleep
from disnake import FFmpegOpusAudio, opus
async def play_audio(audio, bot, vc):
if not bot.voice_clients:
# logger.info(audio)
logger.error(f'Playing: {audio}')
await sleep(1)
vp = await vc.connect()
if not vp.is_playing():
vp.play(FFmpegOpusAudio(f'{audio}', executable='ffmpeg'), after=lambda e: print('done', e))
while vp.is_playing():
await sleep(0.5)
await sleep(1)
await vp.disconnect()

View File

@@ -1,11 +1,11 @@
"""
lib
~~~~~~~~~~~~~
Some libs for the bot whitch help him
"""
from .ListGenerator import *
from .Logger import *
from .Comands import *
from .Player import *
from .DB_Worker import *
from .CogsPrep import *
"""
lib
~~~~~~~~~~~~~
Some libs for the bot whitch help him
"""
from .ListGenerator import *
from .Logger import *
from .Comands import *
from .Player import *
from .DB_Worker import *
from .CogsPrep import *

80
main.py Normal file
View File

@@ -0,0 +1,80 @@
import random
import sys
import threading
import logging
import discord
from asyncio import sleep
from os import walk
from discord import user, member
from discord import FFmpegPCMAudio
from discord.ext import commands
threading.current_thread().name = "main"
logging.basicConfig(stream=sys.stdout, filemode='w', level='INFO',
format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s')
intents = discord.Intents.default()
intents.members = True
bot = commands.Bot(command_prefix='$', guild_subscriptions=True, intents=intents)
f = []
for filenames in walk('audio'):
f.extend(filenames)
break
f = f[2]
@bot.event
async def on_voice_state_update(member, before, after):
# channel = bot.get_channel(783729824896122930)
_role = 929729495370461205
_memb = 375664768087752714
_bot_id = 946819004314570852
role = discord.utils.find(lambda r: r.name == 'тарковчане', member.roles)
if before.channel is None and role in member.roles:
track = random.randint(0, len(f) - 1)
audio_source = FFmpegPCMAudio(f'audio/{f[track]}')
logging.error(f'{track}\t\t{f[track]}')
if not bot.voice_clients:
await sleep(1)
_channel = after.channel
vc = await after.channel.connect()
if not vc.is_playing():
vc.play(audio_source, after=None)
while vc.is_playing():
await sleep(0.5)
await sleep(1)
await vc.disconnect()
if before.channel is None and member.id == _memb:
track = random.randint(0, len(f) - 1)
audio_source = FFmpegPCMAudio(f'audio/{_memb}/bear2_enemy_scav3.wav')
logging.error(f'{track}\t\t\t{f[track]}')
if not bot.voice_clients:
await sleep(1)
_channel = after.channel
vc = await after.channel.connect()
if not vc.is_playing():
vc.play(audio_source, after=None)
while vc.is_playing():
await sleep(0.5)
await sleep(1)
await vc.disconnect()
@bot.event
async def on_member_join(member):
if member.bot == 0:
role = discord.utils.get(member.guild.roles, id=734358428939452486)
else:
role = discord.utils.get(member.guild.roles, id=734358434945826858)
logging.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@bot.event
async def on_ready():
logging.info(f'Bot started')
bot.run('OTQ2ODE5MDA0MzE0NTcwODUy.YhkP6Q.dhFqi2MJMrxzHt5FtjK5Cl-5BI8')

View File

@@ -1,6 +1,6 @@
disnake[audio]~=2.5.2
psutil==5.9.2
pymediainfo~=5.1.0
pyNaCl~=1.5.0
python-dotenv==0.21.0
disnake[audio]==2.8.0
psutil==5.9.4
pymediainfo==6.0.1
pyNaCl~=1.5.0
python-dotenv==1.0.0
ffmpeg-python~=0.2.0

15
setup.py Normal file
View File

@@ -0,0 +1,15 @@
from setuptools import setup
import __init__
setup(
name='fun and admin bot',
version=__init__.__version__,
packages=['lib'],
url='',
platforms='POSIX',
license='',
author='riksl',
author_email='',
description='',
requires=[]
)

5
test.py Normal file
View File

@@ -0,0 +1,5 @@
from lib import ListGenerator
for i in ListGenerator('audio'):
print(i.__repr__())
print(f"{i.path}/{i.name}")