Files
discord_bot/test.py
2022-06-27 21:50:26 +03:00

390 lines
12 KiB
Python

import json
import logging
import sqlite3
import sys
import threading
from os import walk, path, makedirs, remove, rename
from typing import List, Union, Any
import discord
from discord.ext import commands
from dislash import InteractionClient, Option, OptionType
class DB:
@staticmethod
def _prepare_db(guild):
try:
connect = sqlite3.connect('user.db')
cursor = connect.cursor()
create_table = (f'''CREATE TABLE IF NOT EXISTS "{guild}"
([userid] INTEGER PRIMARY KEY, [username] TEXT, [nick] TEXT, [isbot] BOOL, [track] TEXT)
''')
cursor.execute(create_table)
cursor.close()
except sqlite3.Error as error:
logging.error("failed to connecct db", error)
@staticmethod
def _work_with_db(db_func, data_turple):
try:
connect = sqlite3.connect('user.db')
cursor = connect.cursor()
cursor.execute(db_func, data_turple)
connect.commit()
cursor.close()
except sqlite3.Error as error:
logging.error("failed to connecct db", error)
@staticmethod
def _fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
(username, userid, nick, isbot)
VALUES (?, ?, ?, ?)""")
data_tuple = (name, userid, nick, isbot)
DB._work_with_db(sqlite_insert_with_param, data_tuple)
@staticmethod
def _add_audio(guildid, user, audio):
sql_update_query = f"""UPDATE "{guildid}" set track = ? where userid = ?"""
data_tuple = (audio, user)
DB._work_with_db(sql_update_query, data_tuple)
@staticmethod
def _read_db(guildid, user):
try:
sql_con = sqlite3.connect("user.db")
cursor = sql_con.cursor()
sql_read = f"""SELECT * FROM "{guildid}" where userid = {user}"""
cursor.execute(sql_read)
record = cursor.fetchone()
# if record[4] is None:
# return "None"
# else:
# return record[4]
return record[4]
except sqlite3.Error as error:
logging.error("Failed to read sqlite table", error)
threading.current_thread().name = "main"
logging.basicConfig(stream=sys.stdout, filemode='w', level='INFO',
format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s')
DEFAULT_PREFIX = "$" # The prefix you want everyone to have if you don't define your own
if not path.isfile('prefix.json'):
with open('prefix.json', 'w+') as f:
f.write("")
f.close()
def determine_prefix(bot, msg):
guild = msg.guild
base = DEFAULT_PREFIX
try:
with open('prefix.json', 'r', encoding='utf-8') as fp: # Open the JSON
jsonObject = json.load(fp) # Load the custom prefixes
except:
jsonObject = {}
if guild: # If the guild exists
try:
guild_conf = jsonObject[f"{guild.id}"]
try:
prefix = guild_conf["prefix"] # Get the prefix
base = prefix
except:
pass
except KeyError:
pass
return base
intents = discord.Intents.default()
intents.members = True
bot = commands.Bot(command_prefix=determine_prefix, guild_subscriptions=True, intents=intents)
inter_client = InteractionClient(bot)
class Commands:
async def setprefix(ctx, prefix: str):
with open('prefix.json', 'r', encoding='utf-8') as fp:
try:
jsonObject = json.load(fp)
except:
jsonObject = {}
try:
jsonObject[f"{ctx.guild.id}"]["prefix"] = prefix
except KeyError:
guild = jsonObject[f"{ctx.guild.id}"] = {'prefix': ''}
new = {"prefix": prefix}
guild.update(new)
jsonObject[str(ctx.guild.id)] = guild
await ctx.send(f"Prefix set to: `{prefix}`")
with open("prefix.json", "w") as fl:
json.dump(jsonObject, fl)
fl.close()
def list_files(folder):
f = []
for filenames in walk(folder):
f.extend(filenames)
break
f = f[2]
files = {}
for x in f:
files[x] = x
return files
@bot.event
async def on_ready():
for g in bot.get_all_members():
DB._prepare_db(g.guild.id)
for g in bot.get_all_members():
DB._fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
logging.info(f'Bot started')
logging.info('We have logged in as {0.user}'.format(bot))
@bot.event
async def on_guild_join(guild):
for g in guild.members:
DB._fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@bot.event
async def on_member_join(member):
DB._fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
try:
with open('prefix.json', 'r', encoding='utf-8') as fp: # Open the JSON
jsonObject = json.load(fp) # Load the custom prefixes
except:
jsonObject = {}
if member.guild.id: # If the guild exists
try:
guild_conf = jsonObject[f"{member.guild.id}"]
try:
bot_role = guild_conf["bot_role"] # Get bot role
guest_role = guild_conf["guest_role"] # Get guest role
except:
pass
except KeyError:
pass
if bot_role and guest_role:
if member.bot == 0:
role = discord.utils.get(member.guild.roles, id=guest_role)
else:
role = discord.utils.get(member.guild.roles, id=bot_role)
logging.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@bot.event
async def on_member_update(before: discord.Member, after: discord.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id)
DB._work_with_db(sql_update_query, data_tuple)
@bot.command(name="upload_audio")
async def upload_audio(ctx, user=None):
user = user or ctx.author
if ctx.author.guild_permissions.administrator or user is ctx.author:
if ctx.message.attachments:
from os import error
if not path.isdir(f'tmp/{user.id}'):
try:
makedirs(f'tmp/{user.id}')
except error as error:
logging.info(f"Failed to create dir", error)
if not path.isdir(f'audio/{user.id}'):
try:
makedirs(f'audio/{user.id}')
except error as error:
logging.info(f"Failed to create dir", error)
for at in ctx.message.attachments:
import mimetypes
await at.save(f'tmp/{user.id}/{at.filename}')
guess = mimetypes.guess_type(f'tmp/{user.id}/{at.filename}')
if guess[0]:
from pymediainfo import MediaInfo
file = f'tmp/{user.id}/{at.filename}'
duration = round(MediaInfo.parse(file).tracks[0].duration / 1000)
if duration > 15:
await ctx.reply(f'Audio duration is {duration}, but max is 15')
else:
a = DB._read_db(ctx.guild.id, user.id)
if a is None:
audiolist = f'{user.id}/{at.filename}'
else:
audiolist = DB._read_db(ctx.guild.id, user.id) + ", " + f'{user.id}/{at.filename}'
DB._add_audio(ctx.guild.id, user.id, audiolist)
rename(f'tmp/{user.id}/{at.filename}', f'audio/{user.id}/{at.filename}')
else:
await ctx.reply(f'Failed to find MIME type of {at.filename}')
remove(f'tmp/{user.id}/{at.filename}')
else:
await ctx.reply("Has no Attachment")
else:
await ctx.reply(f'You`re not admin. You can add audio only for your own account')
@bot.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_setprefix(ctx, prefix: str):
await Commands.setprefix(ctx, prefix)
@inter_client.slash_command(
name="set_prefix",
description="Settin up bot prefix",
options=[
Option("prefix", "Specify prefix", OptionType.STRING, required=True),
]
)
async def slash_setprefix(ctx, prefix: str):
await Commands.setprefix(ctx, prefix)
@inter_client.slash_command(
name="info",
description="Read list of tracks for user",
options=[
Option("user", "Specify any user", OptionType.USER),
]
)
async def info(ctx, user=None):
user = user or ctx.author
audio = DB._read_db(ctx.guild.id, user.id)
rolelist = [r.mention for r in user.roles if r != ctx.guild.default_role]
if rolelist:
roles = "\n".join(rolelist)
else:
roles = "Not added any role"
if audio is None:
audios = "Not selected audio"
else:
audios = "" + "\n".join(sorted(audio.split(", ")))
emb = discord.Embed(
title=f"General information",
description=f"General information on server about {user}",
icon=user.avatar_url
)
emb.set_thumbnail(url=user.avatar_url)
emb.add_field(name="General info",
value=f"Username: {user}\n"
f"Nickname: {user.nick}\n"
f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
emb.add_field(name="Audio list", value=f"{audios}", inline=True)
emb.add_field(name="Roles list", value=f"{roles}", inline=True)
emb.set_footer(text="Information requested by: {}".format(ctx.author.display_name))
await ctx.reply(embed=emb)
@inter_client.slash_command(
name="set_trigger_role",
description="Setting up role to trigger bot",
options=[
Option("role", "Specify role", OptionType.ROLE, required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_trigger_role(ctx, role):
with open('prefix.json', 'r', encoding='utf-8') as fp:
try:
jsonObject = json.load(fp)
except:
jsonObject = {}
try:
jsonObject[f"{ctx.guild.id}"]["tigger_role"] = role.id
except KeyError:
trigger_role = jsonObject[f"{ctx.guild.id}"] = {'tigger_role': ''}
new = {"tigger_role": role.id}
trigger_role.update(new)
jsonObject[str(ctx.guild.id)] = trigger_role
await ctx.send(f"Role to trigger set to : `{role.name}`")
with open("prefix.json", "w") as fl:
json.dump(jsonObject, fl)
fl.close()
@inter_client.slash_command(
name="set_bot_role",
description="Set Default bot role",
options=[
Option("role", "Specify role", OptionType.ROLE, required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_bot_role(ctx, role):
with open('prefix.json', 'r', encoding='utf-8') as fp:
try:
jsonObject = json.load(fp)
except:
jsonObject = {}
try:
jsonObject[f"{ctx.guild.id}"]["bot_role"] = role.id
except KeyError:
bot_role = jsonObject[f"{ctx.guild.id}"] = {'bot_role': ''}
new = {"bot_role": role.id}
bot_role.update(new)
jsonObject[str(ctx.guild.id)] = bot_role
await ctx.send(f"Setted up bot role to: `{role.name}`")
with open("prefix.json", "w") as fl:
json.dump(jsonObject, fl)
fl.close()
@inter_client.slash_command(
name="set_guest_role",
description="Set Default bot role",
options=[
Option("role", "Specify role", OptionType.ROLE, required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_guest_role(ctx, role):
with open('prefix.json', 'r', encoding='utf-8') as fp:
try:
jsonObject = json.load(fp)
except:
jsonObject = {}
try:
jsonObject[f"{ctx.guild.id}"]["guest_role"] = role.id
except KeyError:
bot_role = jsonObject[f"{ctx.guild.id}"] = {'guest_role': ''}
new = {"guest_role": role.id}
bot_role.update(new)
jsonObject[str(ctx.guild.id)] = bot_role
await ctx.send(f"Setted up bot role to: `{role.name}`")
with open("prefix.json", "w") as fl:
json.dump(jsonObject, fl)
fl.close()
bot.run('OTQ3OTUzOTAxNzgzNjIxNjYy.GTXbMv.KrztaTO7-ivsPEAVjsyikSQ-GP-ANwULmDraig')