import json import os import sys import threading import logging import discord import sqlite3 from os import walk from discord.ext import commands from dislash import InteractionClient, Option, OptionType, SelectMenu, SelectOption class DB: def _prepare_db(guild): try: connect = sqlite3.connect('user.db') cursor = connect.cursor() create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}" ([userid] INTEGER PRIMARY KEY, [username] TEXT, [nick] TEXT, [isbot] BOOL, [track] TEXT) ''') cursor.execute(create_table) cursor.close() except sqlite3.Error as error: logging.error("failed to connecct db", error) def _work_with_db(db_func, data_turple): try: connect = sqlite3.connect('user.db') cursor = connect.cursor() cursor.execute(db_func, data_turple) connect.commit() cursor.close() except sqlite3.Error as error: logging.error("failed to connecct db", error) def _fill_bd(name, userid, isbot, nick, guild): sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}" (username, userid, nick, isbot) VALUES (?, ?, ?, ?)""") data_tuple = (name, userid, nick, isbot) DB._work_with_db(sqlite_insert_with_param, data_tuple) def _add_audio(guildid, user, audio): sql_update_query = f"""UPDATE "{guildid}" set track = ? where userid = ?""" data_tuple = (audio, user) DB._work_with_db(sql_update_query, data_tuple) def _read_db(guildid, user): try: sql_con = sqlite3.connect("user.db") cursor = sql_con.cursor() sql_read = f"""SELECT * FROM "{guildid}" where userid = {user}""" cursor.execute(sql_read) record = cursor.fetchone() if record[4] is None: return "None" else: return record[4] except sqlite3.Error as error: logging.error("Failed to read sqlite table", error) threading.current_thread().name = "main" logging.basicConfig(stream=sys.stdout, filemode='w', level='INFO', format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s') with open("config.json", 'r') as f: prefixes = json.load(f) default_prefix = "$" def prefix(bot, message): id = message.guild.id return prefixes.get(id, default_prefix) intents = discord.Intents.default() intents.members = True bot = commands.Bot(command_prefix=prefix, guild_subscriptions=True, intents=intents) inter_client = InteractionClient(bot) class Arg: f = [] for filenames in walk('audio'): f.extend(filenames) break f = f[2] dict = {} keys = range(len(f)) for i in keys: for x in f: dict[x] = x class Audio: def add(user,): pass @bot.event async def on_ready(): for g in bot.get_all_members(): DB._prepare_db(g.guild.id) for g in bot.get_all_members(): DB._fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id) logging.info(f'Bot started') logging.info('We have logged in as {0.user}'.format(bot)) @bot.event async def on_guild_join(guild): with open('config.json', 'r') as f: prefixes = json.load(f) prefixes[str(guild.id)] = '$' with open('config.json', 'r') as f: json.dump(prefixes, f, indent=4) for g in guild.members: DB._fill_bd(g.name, g.id, g.bot, g.nick, guild.id) @bot.event async def on_member_join(member): DB._fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id) @inter_client.slash_command( name="add-audio", description="Add audio track to user", options=[ Option("audio", "Specify audio name", OptionType.STRING, required=True), Option("user", "Specify any user", OptionType.USER) ] ) async def add_audio(ctx, audio, user=None): user = user or ctx.author a = DB._read_db(ctx.guild.id, user.id) if a == "None": audiolist = audio else: audiolist = DB._read_db(ctx.guild.id, user.id) + ", " + audio DB._add_audio(ctx.guild.id, user.id, audiolist) emb = discord.Embed( title=f"Sucseed added {audio} to {user}", color=discord.Color.blue() ) emb.set_thumbnail(url=user.avatar_url) await ctx.reply(embed=emb) @inter_client.slash_command( name="info", description="Read list of tracks for user", options=[ Option("user", "Specify any user", OptionType.USER), ] ) async def info(ctx, user=None): user = user or ctx.author audio = DB._read_db(ctx.guild.id, user.id) rolelist = [r.mention for r in user.roles if r != ctx.guild.default_role] if rolelist: roles = "\n".join(rolelist) else: roles = "Not added any role" if audio == "None": audios = "Not selected audio" else: audios = "• " + "\n• ".join(sorted(audio.split(", "))) emb = discord.Embed( title=f"General information", description=f"General information on server about {user}", icon=user.avatar_url ) emb.set_thumbnail(url=user.avatar_url) emb.add_field(name="General info", value=f"Username: {user}\n" f"Nickname: {user.nick}\n" f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False) emb.add_field(name="Audio list", value=f"{audios}", inline=True) emb.add_field(name="Roles list", value=f"{roles}", inline=True) emb.set_footer(text="Information requested by: {}".format(ctx.author.display_name)) await ctx.reply(embed=emb) @bot.command() async def select_audio(ctx, user=None): msg = await ctx.send( "Select audio for user!", components=[ SelectMenu( custom_id="test", placeholder="List of audio?", max_values=2, options=[ SelectOption("Option 1", "value 1"), SelectOption("Option 2", "value 2"), SelectOption("Option 3", "value 3") ] ) ] ) # Wait for someone to click on it inter = await msg.wait_for_dropdown() # Send what you received labels = [option.label for option in inter.select_menu.selected_options] await inter.reply(f"Options: {', '.join(labels)}") @inter_client.slash_command(description="Select audio from groups to user") async def groups(inter): pass @groups.sub_command_group() async def select_user_audio(inter, user=None): pass @bot.command(name="upload_audio") async def upload_audio(ctx, user=None): user = user or ctx.author if ctx.author.guild_permissions.administrator or user is ctx.author: if ctx.message.attachments: if not os.path.isdir(f'tmp/{user.id}'): try: os.makedirs(f'tmp/{user.id}') except os.error as error: logging.info(f"Failed to create dir", error) for a in ctx.message.attachments: import mimetypes await a.save(f'tmp/{user.id}/{a.filename}') guess = mimetypes.guess_type(f'tmp/{user.id}/{a.filename}') if guess[0]: from pymediainfo import MediaInfo file = f'tmp/{user.id}/{a.filename}' duration = round(MediaInfo.parse(file).tracks[0].duration / 1000) if duration > 15: await ctx.reply(f'Audio duration is {duration}, but max is 10') else: await ctx.reply(f'Failed to find MIME type of {a.filename}') os.remove(f'tmp/{user.id}/{a.filename}') else: await ctx.reply("Has no Attachment") else: await ctx.reply(f'You`re not admin. You can add audio only for your own account') @bot.event async def on_member_update(before: discord.Member, after: discord.Member): sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?""" data_tuple = (after.nick, before.id) DB._work_with_db(sql_update_query, data_tuple) @bot.command(pass_context=True) @commands.has_permissions(administrator=True) #ensure that only administrators can use this command async def changeprefix(ctx, prefix): #command: bl!changeprefix ... with open('config.json', 'r') as f: prefixes = json.load(f) prefixes[str(ctx.guild.id)] = prefix with open('config.json', 'w') as f: #writes the new prefix into the .json json.dump(prefixes, f, indent=4) await ctx.send(f'Prefix changed to: {prefix}') #confirms the prefix it's been changed to #next step completely optional: changes bot nickname to also have prefix in the nickname name=f'{prefix}BotBot' bot.run('OTQ3OTUzOTAxNzgzNjIxNjYy.GTXbMv.KrztaTO7-ivsPEAVjsyikSQ-GP-ANwULmDraig')