Files
discord_bot/test.py
Slava 3456faed4e Fixed read prefix
First iteration work with cog from slash command
2022-07-01 00:23:20 +03:00

428 lines
14 KiB
Python

import json
import logging
import sqlite3
import sys
import threading
from os import walk, path, makedirs, remove, rename, listdir
import discord
import dislash
from discord.ext import commands
from dislash import InteractionClient, Option, OptionType, OptionChoice
# Fallback command prefix; determine_prefix returns it when no custom prefix is found.
DEFAULT_PREFIX = "$" # The prefix you want everyone to have if you don't define your own
# Discord user ID of the bot owner; slash_cogs compares ctx.author.id against it.
bot_owner = 386629192743256065
def determine_prefix(bot, msg):
    """Return the command prefix to use for ``msg``.

    The prefix is looked up in ``prefix.json`` under the message's guild ID
    (stored as a string key) and its ``"prefix"`` entry.

    Args:
        bot: The bot instance (unused; part of the callback signature).
        msg: The incoming message; only ``msg.guild`` is read.

    Returns:
        The guild's custom prefix, or ``DEFAULT_PREFIX`` when the message has
        no guild, the guild has no custom prefix, or ``prefix.json`` is
        missing, empty or not valid JSON.
    """
    guild = msg.guild
    if not guild:
        # Direct messages have no guild, so there is nothing to look up.
        return DEFAULT_PREFIX
    try:
        with open('prefix.json', 'r', encoding='utf-8') as fp:
            config = json.load(fp)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError, e.g. for the empty file
        # that is created at startup.
        return DEFAULT_PREFIX
    try:
        return config[f"{guild.id}"]["prefix"]
    except (KeyError, TypeError):
        # Unknown guild, no "prefix" entry, or an unexpected JSON shape.
        return DEFAULT_PREFIX
# Start from the default intents and additionally enable the members intent.
intents = discord.Intents.default()
intents.members = True
# The command prefix is resolved per message by determine_prefix.
bot = commands.Bot(command_prefix=determine_prefix, guild_subscriptions=True, intents=intents)
# dislash client bound to the bot; used below to register the slash commands.
inter_client = InteractionClient(bot, sync_commands=True)
class DB:
    """SQLite helpers for the per-guild member tables kept in ``user.db``.

    Every guild has its own table named after the guild ID, with the columns
    ``userid``, ``username``, ``nick``, ``isbot`` and ``track``.
    """

    @staticmethod
    def prepare_db(guild: int):
        """Create the member table for ``guild`` if it does not exist yet."""
        # Table names cannot be bound as SQL parameters, so the guild ID is
        # interpolated; it is expected to be an integer ID.
        create_table = f'''CREATE TABLE IF NOT EXISTS "{guild}"
            ([userid] INTEGER PRIMARY KEY, [username] TEXT, [nick] TEXT, [isbot] BOOL, [track] TEXT)
        '''
        DB.work_with_db(create_table, ())

    @staticmethod
    def work_with_db(db_func, data_turple):
        """Execute one write statement against ``user.db`` and commit it.

        Args:
            db_func: SQL text, using ``?`` placeholders.
            data_turple: Values bound to the placeholders.

        SQLite errors are logged, not raised.
        """
        connect = None
        try:
            connect = sqlite3.connect('user.db')
            cursor = connect.cursor()
            cursor.execute(db_func, data_turple)
            connect.commit()
            cursor.close()
        except sqlite3.Error as error:
            logging.error("failed to connecct db: %s", error)
        finally:
            # Always release the connection, even when the statement failed.
            if connect is not None:
                connect.close()

    @staticmethod
    def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
        """Insert a member row into the guild table; existing rows are kept."""
        sqlite_insert_with_param = f"""INSERT OR IGNORE INTO "{guild}"
            (username, userid, nick, isbot)
            VALUES (?, ?, ?, ?)"""
        data_tuple = (name, userid, nick, isbot)
        DB.work_with_db(sqlite_insert_with_param, data_tuple)

    @staticmethod
    def add_audio(guild: int, user: int, audio: str):
        """Overwrite the ``track`` value stored for ``user`` in ``guild``."""
        sql_update_query = f"""UPDATE "{guild}" set track = ? where userid = ?"""
        data_tuple = (audio, user)
        DB.work_with_db(sql_update_query, data_tuple)

    @staticmethod
    def read_db(guild: int, user: int):
        """Return the ``track`` value stored for ``user`` in ``guild``.

        Returns:
            The stored value, or ``None`` when nothing is stored, the user
            has no row, or the query fails (the failure is logged).
        """
        sql_con = None
        try:
            sql_con = sqlite3.connect("user.db")
            cursor = sql_con.cursor()
            # The user ID is bound as a parameter instead of being formatted
            # into the SQL text.
            cursor.execute(f'SELECT track FROM "{guild}" where userid = ?', (user,))
            record = cursor.fetchone()
        except sqlite3.Error as error:
            logging.error("Failed to read sqlite table: %s", error)
            return None
        finally:
            if sql_con is not None:
                sql_con.close()
        # fetchone() yields None when the user has no row in this table.
        if record is None:
            return None
        return record[0]
class CogsPrepare:
    """Helpers for discovering and loading extensions from ``./cogs``."""

    @staticmethod
    def cog_list():
        """Return the names, without ``.py``, of all ``.py`` entries in ``./cogs``."""
        return [entry[:-3] for entry in listdir('./cogs') if entry.endswith('.py')]

    @staticmethod
    async def work_with_cogs(what_do, cogs):
        """Apply ``what_do`` ("load", "unload" or "reload") to every cog in ``cogs``.

        Any other ``what_do`` value performs no action; the log line is
        written for every cog regardless.
        """
        for cog_name in cogs:
            extension = f'cogs.{cog_name}'
            if what_do == "load":
                bot.load_extension(extension)
            elif what_do == 'unload':
                bot.unload_extension(extension)
            elif what_do == 'reload':
                bot.reload_extension(extension)
            logging.info(f'{what_do}ing cog {cog_name}')

    @staticmethod
    async def cog_load(ctx, cog):
        """Load a single cog and report the outcome back through ``ctx``."""
        extension = f'cogs.{cog}'
        try:
            bot.load_extension(extension)
            await ctx.reply(f'Loaded cog {cog}', ephemeral=True)
        except commands.ExtensionNotFound:
            await ctx.send(f"Error: {cog} couldn't be found to load.", ephemeral=True)
        except commands.ExtensionFailed:
            await ctx.send(f'Error: {cog} failed to load properly.', ephemeral=True)
        except commands.ExtensionError:
            # Base class of the extension errors: anything not matched above.
            await ctx.send(f'Error: unknown error with {cog}', ephemeral=True)
class Commands:
    """Shared helpers for the bot commands and the ``prefix.json`` settings file.

    ``prefix.json`` maps a guild ID (as a string) to a dict of settings for
    that guild.
    """

    @staticmethod
    async def set_prefix(ctx, prefix: str):
        """Store ``prefix`` for the invoking guild and confirm it to the user."""
        await Commands.write_json(ctx.guild.id, "prefix", prefix)
        await ctx.send(f"Prefix set to: `{prefix}`", ephemeral=True)

    @staticmethod
    def list_files(fold: str):
        """Return ``{name: name}`` for every file directly inside ``fold``.

        Sub-directories are not descended into. A folder that does not exist
        yields an empty dict.
        """
        # Only the first triple produced by walk() is needed; the default
        # covers a missing folder, for which walk() produces nothing.
        _, _, filenames = next(walk(fold), (fold, [], []))
        return {name: name for name in filenames}

    @staticmethod
    async def read_json(guild: int, param: str):
        """Return setting ``param`` of ``guild`` from ``prefix.json``.

        Returns:
            The stored value, or ``None`` when ``guild`` is falsy, the guild
            or setting is absent, or the file is missing, empty or invalid.
        """
        try:
            with open('prefix.json', 'r', encoding='utf-8') as fp:
                _json = json.load(fp)
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError (empty or corrupt file).
            return None
        if not guild or not isinstance(_json, dict):
            return None
        guild_conf = _json.get(f"{guild}")
        if not isinstance(guild_conf, dict):
            return None
        return guild_conf.get(f"{param}")

    @staticmethod
    async def write_json(guild: int, param_name: str, param: "str | int"):
        """Store ``param`` under ``param_name`` for ``guild`` in ``prefix.json``.

        The value is always written as a string. A missing, empty or invalid
        file is treated as having no settings yet.
        """
        try:
            with open('prefix.json', 'r', encoding='utf-8') as _f:
                _json = json.load(_f)
        except (FileNotFoundError, ValueError):
            _json = {}
        _guild = _json.setdefault(f'{guild}', {})
        _guild[f'{param_name}'] = f'{param}'
        with open('prefix.json', 'w', encoding='utf-8') as _f:
            json.dump(_json, _f, indent=4)
# Name the current thread so the %(threadName)s field of the log format is readable.
threading.current_thread().name = "main"
logging.basicConfig(
    stream=sys.stdout,
    filemode='w',
    level='INFO',
    format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s',
)

# Make sure the settings file exists before anything tries to open it.
if not path.isfile('prefix.json'):
    with open('prefix.json', 'w+') as f:
        f.write("")
# Load every cog found in ./cogs; a failing cog is logged and the rest still load.
for filename in CogsPrepare.cog_list():
    extension_name = f'cogs.{filename}'
    try:
        bot.load_extension(extension_name)
        logging.info(f'Loaded cog {filename}')
    except commands.ExtensionNotFound:
        logging.error(f"Error: {filename} couldn't be found to load.")
    except commands.ExtensionFailed:
        logging.error(f'Error: {filename} failed to load properly.')
    except commands.ExtensionError:
        logging.error(f'Error: unknown error with {filename}')
@bot.event
async def on_ready():
    """Create the table of every guild and record its members once the bot is ready."""
    for guild in bot.guilds:
        # One table per guild: create it once, not once per member.
        DB.prepare_db(guild.id)
        for member in guild.members:
            DB.fill_bd(member.name, member.id, member.bot, member.nick, guild.id)
    logging.info('Bot started')
    logging.info('We have logged in as {0.user}'.format(bot))
@bot.event
async def on_guild_join(guild):
    """Create the table for a newly joined guild and record its members."""
    # The table must exist before rows can be inserted; a guild joined after
    # startup has not been seen by on_ready.
    DB.prepare_db(guild.id)
    for member in guild.members:
        DB.fill_bd(member.name, member.id, member.bot, member.nick, guild.id)
@bot.event
async def on_member_join(member):
    """Record a new member and give it the configured bot or guest role.

    Nothing is assigned unless both ``bot_role`` and ``guest_role`` are
    configured for the guild.
    """
    DB.fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
    # read_json is a coroutine function: without ``await`` these would be
    # coroutine objects (always truthy) instead of the stored values.
    bot_role = await Commands.read_json(member.guild.id, 'bot_role')
    guest_role = await Commands.read_json(member.guild.id, 'guest_role')
    if not (bot_role and guest_role):
        return
    stored_id = bot_role if member.bot else guest_role
    try:
        # write_json stores every value as a string, while role IDs are ints.
        role_id = int(stored_id)
    except (TypeError, ValueError):
        logging.error("Invalid role id %r configured for guild %s", stored_id, member.guild.id)
        return
    role = discord.utils.get(member.guild.roles, id=role_id)
    if role is None:
        logging.error("Role %s not found in guild %s", role_id, member.guild.id)
        return
    logging.info(f"Adding to {member} role {role}")
    await member.add_roles(role)
@bot.event
async def on_member_update(before: discord.Member, after: discord.Member):
    """Write the member's nickname after the update into the guild's table."""
    DB.work_with_db(
        f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?""",
        (after.nick, before.id),
    )
@bot.command(name="upload_audio")
async def upload_audio(ctx, user: discord.Member = None):
    """Store the attachments of the invoking message as audio tracks of ``user``.

    Args:
        ctx: Invocation context; the attachments are read from ``ctx.message``.
        user: Member to add the audio for; defaults to the author. Only
            administrators may upload for somebody else.

    Each attachment is saved to ``tmp/<user id>/``. It is moved to
    ``audio/<user id>/`` and appended to the user's track list when its MIME
    type can be guessed and its duration is at most 15 seconds. Whatever is
    left in ``tmp`` is removed afterwards.
    """
    import mimetypes

    user = user or ctx.author
    # Compare by equality: two objects describing the same member are not
    # necessarily the same instance.
    if not (ctx.author.guild_permissions.administrator or user == ctx.author):
        await ctx.reply(f'You`re not admin. You can add audio only for your own account', ephemeral=True)
        return
    if not ctx.message.attachments:
        await ctx.reply("Has no Attachment", ephemeral=True)
        return

    tmp_dir = f'tmp/{user.id}'
    audio_dir = f'audio/{user.id}'
    for folder in (tmp_dir, audio_dir):
        try:
            makedirs(folder, exist_ok=True)
        except OSError as err:
            logging.info("Failed to create dir: %s", err)

    for at in ctx.message.attachments:
        # The file name comes from the uploader: keep only its final path
        # component so it cannot point outside the target folders.
        filename = path.basename(at.filename)
        tmp_file = f'{tmp_dir}/{filename}'
        await at.save(tmp_file)
        try:
            if not mimetypes.guess_type(tmp_file)[0]:
                await ctx.reply(f'Failed to find MIME type of {at.filename}', ephemeral=True)
                continue
            from pymediainfo import MediaInfo
            raw_duration = MediaInfo.parse(tmp_file).tracks[0].duration
            if raw_duration is None:
                # No duration reported, so the file cannot be checked.
                await ctx.reply(f'Failed to read duration of {at.filename}', ephemeral=True)
                continue
            # NOTE(review): the division by 1000 assumes milliseconds — confirm.
            duration = round(raw_duration / 1000)
            if duration > 15:
                await ctx.reply(f'Audio duration is {duration}, but max is 15', ephemeral=True)
                continue
            track = f'{user.id}/{filename}'
            existing = DB.read_db(ctx.guild.id, user.id)
            audiolist = track if existing is None else f'{existing}, {track}'
            DB.add_audio(ctx.guild.id, user.id, audiolist)
            rename(tmp_file, f'{audio_dir}/{filename}')
        finally:
            # Rejected uploads must not pile up in tmp; accepted ones have
            # already been moved away.
            if path.exists(tmp_file):
                remove(tmp_file)
@bot.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_set_prefix(ctx, prefix: str):
    """Prefix-command entry point for changing the guild prefix (administrators only)."""
    await Commands.set_prefix(ctx, prefix)
@inter_client.slash_command(
    name="set_prefix",
    description="Settin up bot prefix",
    options=[
        Option("prefix", "Specify prefix", OptionType.STRING, required=True),
    ],
)
@dislash.has_permissions(administrator=True)
async def slash_set_prefix(ctx, prefix: str):
    """Slash-command entry point for changing the guild prefix (administrators only)."""
    await Commands.set_prefix(ctx, prefix)
@slash_set_prefix.error
@command_set_prefix.error
async def set_prefix_error(ctx, error=None):
    """Error handler shared by both ``set_prefix`` commands.

    Args:
        ctx: Context or interaction of the failed invocation.
        error: The exception handed over by the library. Error handlers are
            called with the error as a second argument, so it has to be
            accepted here; it defaults to ``None`` for direct calls.
    """
    if error is not None:
        logging.info("set_prefix failed: %s", error)
    await ctx.reply('You don`t have permissions', ephemeral=True)
@inter_client.slash_command(
    name="info",
    description="Read list of tracks for user",
    options=[
        Option("user", "Specify any user", OptionType.USER),
    ]
)
async def info(ctx, user=None):
    """Reply with an embed describing ``user`` (defaults to the invoking author).

    The embed shows the username, nickname and join date, the stored audio
    tracks and the roles other than the guild's default role.
    """
    user = user or ctx.author
    audio = DB.read_db(ctx.guild.id, user.id)
    rolelist = [r.mention for r in user.roles if r != ctx.guild.default_role]
    roles = "\n".join(rolelist) if rolelist else "Not added any role"
    # An empty stored value is treated like "nothing stored" so the embed
    # field never ends up empty.
    audios = "\n".join(sorted(audio.split(", "))) if audio else "Not selected audio"
    # joined_at can be None, so guard before formatting it.
    if user.joined_at:
        joined = user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')
    else:
        joined = "unknown"

    # The avatar is attached through set_thumbnail below.
    emb = discord.Embed(
        title="General information",
        description=f"General information on server about {user}",
    )
    emb.set_thumbnail(url=user.avatar_url)
    emb.add_field(name="General info",
                  value=f"Username: {user}\n"
                        f"Nickname: {user.nick}\n"
                        f"Joined at: {joined}", inline=False)
    emb.add_field(name="Audio list", value=f"{audios}", inline=True)
    emb.add_field(name="Roles list", value=f"{roles}", inline=True)
    emb.set_footer(text="Information requested by: {}".format(ctx.author.display_name))
    await ctx.reply(embed=emb, ephemeral=True)
@inter_client.slash_command(
    name="set_trigger_role",
    description="Setting up role to trigger bot",
    options=[
        Option("role", "Specify role", OptionType.ROLE, required=True),
    ]
)
# Use the dislash permission check, as slash_set_prefix does, since this is
# a slash command.
@dislash.has_permissions(administrator=True)
async def set_trigger_role(ctx, role):
    """Store ``role`` as the guild's trigger role (administrators only)."""
    # NOTE(review): the key is spelled "tigger_role"; it is left unchanged
    # because code reading prefix.json outside this file may rely on that
    # spelling — confirm before renaming.
    await Commands.write_json(ctx.guild.id, "tigger_role", role.id)
    await ctx.send(f"Role to trigger set to : `{role.name}`")
@inter_client.slash_command(
    name="set_bot_role",
    description="Set Default bot role",
    options=[
        Option("role", "Specify role", OptionType.ROLE, required=True),
    ]
)
# Use the dislash permission check, as slash_set_prefix does, since this is
# a slash command.
@dislash.has_permissions(administrator=True)
async def set_bot_role(ctx, role):
    """Store ``role`` as the guild's ``bot_role`` setting (administrators only)."""
    await Commands.write_json(ctx.guild.id, "bot_role", role.id)
    await ctx.send(f"Setted up bot role to: `{role.name}`")
@inter_client.slash_command(
    name="cog",
    description="Work with cogs",
    options=[
        Option(
            "what_do",
            description="Specify what do with cogs",
            type=OptionType.STRING,
            required=True,
            choices=[
                OptionChoice("load", "load"),
                OptionChoice("unload", "unload"),
                OptionChoice("reload", "reload")
            ]
        ),
        Option('cog', "Specify cog if need", OptionType.STRING)
    ]
)
async def slash_cogs(ctx, what_do, cogs=None, cog=None):
    """Load, unload or reload cogs; restricted to the bot owner.

    Args:
        ctx: The slash-command interaction.
        what_do: One of "load", "unload" or "reload".
        cogs: Cog name or iterable of cog names (kept for existing callers).
        cog: Cog name as delivered by the ``cog`` slash option.

    When neither ``cog`` nor ``cogs`` is given, every cog in ``./cogs`` is
    processed.
    """
    if ctx.author.id != bot_owner:
        await ctx.reply('You`re not bot owner', ephemeral=True)
        return

    # The slash option is called "cog", so that is the keyword a value
    # arrives under; "cogs" stays accepted for backward compatibility.
    selected = cog if cog is not None else cogs
    all_cogs = selected is None
    if all_cogs:
        targets = CogsPrepare.cog_list()
    elif isinstance(selected, str):
        # A bare string must be wrapped, otherwise it would be iterated
        # character by character.
        targets = [selected]
    else:
        targets = list(selected)

    try:
        await CogsPrepare.work_with_cogs(what_do, targets)
    except commands.ExtensionError as error:
        logging.error("Failed to %s cogs: %s", what_do, error)
        await ctx.reply(f'Error: failed to {what_do} cog: {error}', ephemeral=True)
        return

    if all_cogs:
        await ctx.reply(f'Cogs are {what_do}ed', ephemeral=True)
    else:
        await ctx.reply(f'Cog {", ".join(targets)} is {what_do}ed', ephemeral=True)
@inter_client.slash_command(
    name="set_guest_role",
    description="Set Default guest role",
    options=[
        Option("role", "Specify role", OptionType.ROLE, required=True),
    ]
)
# Use the dislash permission check, as slash_set_prefix does, since this is
# a slash command.
@dislash.has_permissions(administrator=True)
async def set_guest_role(ctx, role):
    """Store ``role`` as the guild's ``guest_role`` setting (administrators only)."""
    await Commands.write_json(ctx.guild.id, "guest_role", role.id)
    # The description and reply previously said "bot role", copied from set_bot_role.
    await ctx.send(f"Setted up guest role to: `{role.name}`")
# The bot token is a secret and must not be stored in source control; it is
# read from the DISCORD_TOKEN environment variable instead.
# NOTE: a token that has been committed to a repository should be treated as
# compromised and regenerated.
from os import environ

_token = environ.get("DISCORD_TOKEN")
if not _token:
    raise SystemExit("DISCORD_TOKEN environment variable is not set")
bot.run(_token)