425 lines
14 KiB
Python
425 lines
14 KiB
Python
import json
|
|
import logging
|
|
import sqlite3
|
|
import sys
|
|
import threading
|
|
from os import walk, path, makedirs, remove, rename, listdir
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
from dislash import InteractionClient, Option, OptionType, OptionChoice
|
|
|
|
|
|
async def determine_prefix(bot, msg):
    """Resolve the command prefix for the guild a message was sent in.

    Used as the ``command_prefix`` callable of ``commands.Bot``. It is a
    coroutine because ``Commands.read_json`` is one: calling that without
    ``await`` yields a coroutine object, which is always truthy, so the
    ``DEFAULT_PREFIX`` fallback could never apply.

    :param bot: the bot instance (unused, required by the callback signature).
    :param msg: the message whose prefix is being resolved.
    :return: the prefix stored for the guild, or ``DEFAULT_PREFIX``.
    """
    guild = msg.guild
    if guild is None:
        # Messages outside a guild (e.g. direct messages) have no stored prefix.
        return DEFAULT_PREFIX
    return await Commands.read_json(guild.id, 'prefix') or DEFAULT_PREFIX
|
|
|
|
|
|
# Prefix used by every guild that has not stored one of its own.
DEFAULT_PREFIX = "$"
# Discord user id that the /cogs command compares the caller against.
bot_owner = 386629192743256065

# Default intents with the members intent switched on.
intents = discord.Intents.default()
intents.members = True

bot = commands.Bot(
    command_prefix=determine_prefix,
    guild_subscriptions=True,
    intents=intents,
)
# Slash-command client bound to the bot.
inter_client = InteractionClient(bot)
|
|
|
|
|
|
class DB:
    """SQLite helpers for the per-guild member tables stored in ``user.db``.

    Each guild gets its own table, named after the guild id, with the columns
    ``userid``, ``username``, ``nick``, ``isbot`` and ``track``.
    """

    @staticmethod
    def prepare_db(guild: int):
        """Create the member table for *guild* if it does not exist yet."""
        create_table = f'''CREATE TABLE IF NOT EXISTS "{guild}"
                        ([userid] INTEGER PRIMARY KEY, [username] TEXT, [nick] TEXT, [isbot] BOOL, [track] TEXT)
                        '''
        DB.work_with_db(create_table, ())

    @staticmethod
    def work_with_db(db_func, data_turple):
        """Execute one write statement against ``user.db`` and commit it.

        :param db_func: SQL text, with ``?`` placeholders for the values.
        :param data_turple: values bound to the placeholders.

        SQLite errors are logged, not raised.
        """
        connect = None
        try:
            connect = sqlite3.connect('user.db')
            cursor = connect.cursor()
            cursor.execute(db_func, data_turple)
            connect.commit()
            cursor.close()
        except sqlite3.Error as error:
            # The error must go through a %s placeholder; passing it as a bare
            # extra argument makes the logging module fail to format the record.
            logging.error("failed to connect db: %s", error)
        finally:
            # Closing the cursor does not close the connection.
            if connect is not None:
                connect.close()

    @staticmethod
    def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
        """Insert a member row into the guild table unless one already exists."""
        sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
                                    (username, userid, nick, isbot)
                                    VALUES (?, ?, ?, ?)""")
        data_tuple = (name, userid, nick, isbot)
        DB.work_with_db(sqlite_insert_with_param, data_tuple)

    @staticmethod
    def add_audio(guild: int, user: int, audio: str):
        """Overwrite the ``track`` column of *user* in the guild table."""
        sql_update_query = f"""UPDATE "{guild}" set track = ? where userid = ?"""
        data_tuple = (audio, user)
        DB.work_with_db(sql_update_query, data_tuple)

    @staticmethod
    def read_db(guild: int, user: int):
        """Return the ``track`` value stored for *user* in the guild table.

        :return: the stored value, or ``None`` when the user has no row, the
            column is empty, or the query fails (the failure is logged).
        """
        connect = None
        try:
            connect = sqlite3.connect("user.db")
            cursor = connect.cursor()
            # The user id is bound as a parameter; only the table name, which
            # cannot be a bound parameter, is interpolated.
            cursor.execute(f"""SELECT track FROM "{guild}" where userid = ?""", (user,))
            record = cursor.fetchone()
            cursor.close()
        except sqlite3.Error as error:
            logging.error("Failed to read sqlite table: %s", error)
            return None
        finally:
            if connect is not None:
                connect.close()
        if record is None:
            # No row for this user.
            return None
        return record[0]
|
|
|
|
|
|
class CogsPrepare:
    """Helpers for discovering and (un)loading the extensions in ``./cogs``."""

    @staticmethod
    def cog_list():
        """Return the names, without ``.py``, of the Python files in ``./cogs``."""
        return [name[:-3] for name in listdir('./cogs') if name.endswith('.py')]

    @staticmethod
    def _as_cog_names(cogs):
        """Normalize *cogs* to a list of cog names.

        A bare string is one cog name; iterating it directly would yield its
        individual characters.
        """
        if isinstance(cogs, str):
            return [cogs]
        return list(cogs)

    @staticmethod
    async def cogs_load():
        """Load every cog returned by :meth:`cog_list`."""
        for _filename in CogsPrepare.cog_list():
            bot.load_extension(f'cogs.{_filename}')
            print(f'Load cog {_filename}')

    @staticmethod
    async def cogs_unload():
        """Unload every cog returned by :meth:`cog_list`."""
        for _filename in CogsPrepare.cog_list():
            bot.unload_extension(f'cogs.{_filename}')
            print(f'Unload cog {_filename}')

    @staticmethod
    async def work_with_cogs(what_do, cogs):
        """Apply one action to the given cogs.

        :param what_do: ``"load"``, ``"unload"`` or ``"reload"``; any other
            value performs no action but is still logged.
        :param cogs: a single cog name or an iterable of cog names.
        """
        actions = {
            'load': bot.load_extension,
            'unload': bot.unload_extension,
            'reload': bot.reload_extension,
        }
        action = actions.get(what_do)
        for _filename in CogsPrepare._as_cog_names(cogs):
            if action is not None:
                action(f'cogs.{_filename}')
            logging.info(f'{what_do}ing cog {_filename}')

    @staticmethod
    async def cog_load(ctx, cog):
        """Load a single cog and report the outcome to *ctx*."""
        try:
            bot.load_extension(f'cogs.{cog}')
            await ctx.reply(f'Loaded cog {cog}')

        except commands.ExtensionNotFound:
            await ctx.send(f"Error: {cog} couldn't be found to load.")

        except commands.ExtensionFailed:
            await ctx.send(f'Error: {cog} failed to load properly.')

        except commands.ExtensionError:
            await ctx.send(f'Error: unknown error with {cog}')
|
|
|
|
|
|
class Commands:
    """Per-guild settings kept in ``prefix.json`` plus small command helpers.

    The file holds one JSON object that maps a guild id (as a string) to a
    dict of that guild's parameters.
    """

    @staticmethod
    async def set_prefix(ctx, prefix: str):
        """Store *prefix* for the guild of *ctx* and confirm it in the channel."""
        await Commands.write_json(ctx.guild.id, "prefix", prefix)
        await ctx.send(f"Prefix set to: `{prefix}`")

    @staticmethod
    def list_files(fold: str):
        """Return ``{name: name}`` for the files directly inside *fold*.

        Sub-directories are not descended into. A folder that does not exist
        yields an empty dict.
        """
        # walk() yields nothing for a missing folder, hence the default triple.
        _, _, names = next(walk(fold), (fold, [], []))
        return {name: name for name in names}

    @staticmethod
    def _load_config():
        """Return the content of ``prefix.json`` as a dict.

        A missing file, an empty or malformed file, or a top-level value that
        is not an object all count as "no settings" and give ``{}``.
        """
        try:
            with open('prefix.json', 'r', encoding='utf-8') as fp:
                config = json.load(fp)
        except (FileNotFoundError, json.JSONDecodeError):
            # json.load raises JSONDecodeError (not TypeError) on an empty file.
            return {}
        return config if isinstance(config, dict) else {}

    @staticmethod
    async def read_json(guild: int, param: str):
        """Return the value stored under *param* for *guild*.

        :return: the stored value, or ``None`` when *guild* is falsy or the
            guild or the parameter has no entry.
        """
        if not guild:
            return None
        guild_conf = Commands._load_config().get(f"{guild}")
        if not isinstance(guild_conf, dict):
            return None
        return guild_conf.get(f"{param}")

    @staticmethod
    async def write_json(guild: int, param_name: str, param: 'str | int'):
        """Store *param* under *param_name* for *guild* and rewrite the file.

        Other parameters of the guild and other guilds' settings are kept.
        The value is stored with its own JSON type, so integer ids stay
        integers.
        """
        config = Commands._load_config()
        guild_conf = config.get(f'{guild}')
        if not isinstance(guild_conf, dict):
            guild_conf = config[f'{guild}'] = {}
        guild_conf[f'{param_name}'] = param
        with open('prefix.json', 'w', encoding='utf-8') as fp:
            json.dump(config, fp)
|
|
|
|
|
|
# Name the main thread so that %(threadName)s in the log format is readable.
threading.current_thread().name = "main"
# Log to stdout. ``filemode`` is not passed: it only applies together with ``filename``.
logging.basicConfig(stream=sys.stdout, level='INFO',
                    format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s')

if not path.isfile('prefix.json'):
    # Seed the settings file with an empty JSON object; an empty file is not
    # valid JSON and makes json.load raise on the first read.
    with open('prefix.json', 'w', encoding='utf-8') as f:
        f.write("{}")

# Load every cog found in ./cogs; a cog that fails is logged and skipped.
for filename in CogsPrepare.cog_list():
    try:
        bot.load_extension(f'cogs.{filename}')
        logging.info('Loaded cog %s', filename)

    except commands.ExtensionNotFound:
        logging.error("Error: %s couldn't be found to load.", filename)

    except commands.ExtensionFailed:
        logging.error('Error: %s failed to load properly.', filename)

    except commands.ExtensionError:
        logging.error('Error: unknown error with %s', filename)
|
|
|
|
|
|
@bot.event
async def on_ready():
    """Create the table of every guild the bot is in and record its members."""
    for guild in bot.guilds:
        # One table per guild: create it once per guild rather than once per member.
        DB.prepare_db(guild.id)
        for member in guild.members:
            DB.fill_bd(member.name, member.id, member.bot, member.nick, guild.id)
    logging.info('Bot started')
    logging.info('We have logged in as {0.user}'.format(bot))
|
|
|
|
|
|
@bot.event
async def on_guild_join(guild):
    """Create the table for a newly joined guild and record its members."""
    # The table must exist before the inserts below; nothing else in this
    # handler creates it for a guild joined after start-up.
    DB.prepare_db(guild.id)
    for member in guild.members:
        DB.fill_bd(member.name, member.id, member.bot, member.nick, guild.id)
|
|
|
|
|
|
@bot.event
async def on_member_join(member):
    """Record a new member and give it the configured bot or guest role.

    A role is only assigned when both ``bot_role`` and ``guest_role`` are
    configured for the guild.
    """
    DB.fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)

    # read_json is a coroutine function and has to be awaited; without the
    # await both values are coroutine objects, never role ids.
    bot_role = await Commands.read_json(member.guild.id, 'bot_role')
    guest_role = await Commands.read_json(member.guild.id, 'guest_role')
    if not (bot_role and guest_role):
        return

    role_id = bot_role if member.bot else guest_role
    try:
        # Accept ids stored either as JSON numbers or as strings.
        role_id = int(role_id)
    except (TypeError, ValueError):
        logging.error("Stored role id %r is not a number", role_id)
        return

    role = discord.utils.get(member.guild.roles, id=role_id)
    if role is None:
        logging.warning("Role %s not found in guild %s", role_id, member.guild.id)
        return

    logging.info(f"Adding to {member} role {role}")
    await member.add_roles(role)
|
|
|
|
|
|
@bot.event
async def on_member_update(before: discord.Member, after: discord.Member):
    """Write the member's current nickname into the guild's table."""
    DB.work_with_db(
        f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?""",
        (after.nick, before.id),
    )
|
|
|
|
|
|
def _media_duration_seconds(file):
    # Return the duration of *file* in whole seconds as reported by
    # MediaInfo's first track, or None when no duration is reported.
    from pymediainfo import MediaInfo

    tracks = MediaInfo.parse(file).tracks
    if not tracks or tracks[0].duration is None:
        return None
    return round(float(tracks[0].duration) / 1000)


@bot.command(name="upload_audio")
async def upload_audio(ctx, user: discord.Member = None):
    # Save the attachments of the invoking message as audio tracks of *user*.
    #
    # ctx:  invocation context; the files are taken from ctx.message.attachments.
    # user: member the tracks are stored for, defaults to the author. The
    #       annotation makes the framework convert the argument to a member;
    #       without it a supplied value is a plain string with no ``id``.
    #
    # Administrators may upload for anyone, other members only for themselves.
    # Each file is saved under tmp/<user id>/, must have a guessable MIME type
    # and last at most 15 seconds, and is then moved to audio/<user id>/ and
    # appended to the comma separated track list in the database.
    user = user or ctx.author
    # Compare by equality, not identity: two objects can describe one member.
    if not (ctx.author.guild_permissions.administrator or user == ctx.author):
        await ctx.reply('You`re not admin. You can add audio only for your own account')
        return
    if not ctx.message.attachments:
        await ctx.reply("Has no Attachment")
        return

    import mimetypes

    tmp_dir = f'tmp/{user.id}'
    audio_dir = f'audio/{user.id}'
    try:
        makedirs(tmp_dir, exist_ok=True)
        makedirs(audio_dir, exist_ok=True)
    except OSError as err:
        logging.error("Failed to create dir: %s", err)
        return

    for at in ctx.message.attachments:
        # The attachment name is supplied by the uploader; keep only its last
        # path component so the file cannot be written outside tmp_dir.
        filename = path.basename(at.filename)
        if not filename:
            continue
        tmp_file = f'{tmp_dir}/{filename}'
        await at.save(tmp_file)
        try:
            if not mimetypes.guess_type(tmp_file)[0]:
                await ctx.reply(f'Failed to find MIME type of {at.filename}')
                continue

            duration = _media_duration_seconds(tmp_file)
            if duration is None:
                await ctx.reply(f'Failed to read duration of {at.filename}')
                continue
            if duration > 15:
                await ctx.reply(f'Audio duration is {duration}, but max is 15')
                continue

            # Move the file first so the database never lists a missing track.
            rename(tmp_file, f'{audio_dir}/{filename}')
            track = f'{user.id}/{filename}'
            current = DB.read_db(ctx.guild.id, user.id)
            audiolist = track if current is None else f'{current}, {track}'
            DB.add_audio(ctx.guild.id, user.id, audiolist)
        finally:
            # Rejected uploads must not pile up in the temp folder; an accepted
            # file has already been moved away and is skipped here.
            if path.exists(tmp_file):
                remove(tmp_file)
|
|
|
|
|
|
@bot.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_set_prefix(ctx, prefix: str):
    # Text-command entry point for changing the guild prefix; the storing and
    # the confirmation message are done by Commands.set_prefix.
    await Commands.set_prefix(ctx, prefix=prefix)
|
|
|
|
|
|
@inter_client.slash_command(
    name="set_prefix",
    description="Settin up bot prefix",
    options=[
        Option("prefix", "Specify prefix", OptionType.STRING, required=True),
    ]
)
async def slash_set_prefix(ctx, prefix: str):
    """Slash-command entry point for changing the guild prefix.

    Restricted to administrators, like the ``set_prefix`` text command.
    """
    # Checked in the body so the restriction does not depend on how the slash
    # framework treats decorator-based checks.
    if not ctx.author.guild_permissions.administrator:
        await ctx.reply('You`re not admin')
        return
    await Commands.set_prefix(ctx, prefix)
|
|
|
|
|
|
@inter_client.slash_command(
    name="info",
    description="Read list of tracks for user",
    options=[
        Option("user", "Specify any user", OptionType.USER),
    ]
)
async def info(ctx, user=None):
    """Reply with an embed describing a member.

    The embed shows the username, nickname, join date, stored audio tracks
    and roles.

    :param ctx: the interaction context.
    :param user: the member to describe; defaults to the caller.
    """
    user = user or ctx.author
    audio = DB.read_db(ctx.guild.id, user.id)

    role_mentions = [r.mention for r in user.roles if r != ctx.guild.default_role]
    roles = "\n".join(role_mentions) if role_mentions else "Not added any role"

    if audio:
        # Tracks are stored as one ", "-separated string.
        audios = "• " + "\n• ".join(sorted(audio.split(", ")))
    else:
        audios = "Not selected audio"

    # joined_at is optional in discord.py; do not call strftime on None.
    if user.joined_at is not None:
        joined = user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')
    else:
        joined = "unknown"

    # Embed takes no ``icon`` argument; the avatar is shown via the thumbnail.
    emb = discord.Embed(
        title="General information",
        description=f"General information on server about {user}",
    )
    emb.set_thumbnail(url=user.avatar_url)
    emb.add_field(name="General info",
                  value=f"Username: {user}\n"
                        f"Nickname: {user.nick}\n"
                        f"Joined at: {joined}", inline=False)
    emb.add_field(name="Audio list", value=f"{audios}", inline=True)
    emb.add_field(name="Roles list", value=f"{roles}", inline=True)
    emb.set_footer(text="Information requested by: {}".format(ctx.author.display_name))

    await ctx.reply(embed=emb)
|
|
|
|
|
|
@inter_client.slash_command(
    name="set_trigger_role",
    description="Setting up role to trigger bot",
    options=[Option("role", "Specify role", OptionType.ROLE, required=True)],
)
@commands.has_permissions(administrator=True)
async def set_trigger_role(ctx, role):
    """Store the chosen role's id for the guild and confirm it in the channel."""
    # NOTE(review): the settings key is spelled "tigger_role"; it is kept
    # as-is because whatever reads it is not visible here - confirm.
    guild_id, role_id = ctx.guild.id, role.id
    await Commands.write_json(guild_id, "tigger_role", role_id)
    await ctx.send(f"Role to trigger set to : `{role.name}`")
|
|
|
|
|
|
@inter_client.slash_command(
    name="set_bot_role",
    description="Set Default bot role",
    options=[Option("role", "Specify role", OptionType.ROLE, required=True)],
)
@commands.has_permissions(administrator=True)
async def set_bot_role(ctx, role):
    """Store the chosen role's id as the guild's ``bot_role`` and confirm it."""
    guild_id, role_id = ctx.guild.id, role.id
    await Commands.write_json(guild_id, "bot_role", role_id)
    await ctx.send(f"Setted up bot role to: `{role.name}`")
|
|
|
|
|
|
@inter_client.slash_command(
    name="cogs",
    description="Work with cogs",
    options=[
        # Option names are matched to the parameters of slash_cogs below
        # (the role commands in this file rely on the same name matching),
        # and an option name may not contain a space.
        Option("what_do",
               description="Specify what do with cogs",
               type=OptionType.STRING,
               required=True,
               choices=[
                   OptionChoice("load", "load"),
                   OptionChoice("unload", "unload"),
                   OptionChoice("reload", "reload"),
               ]
               ),
        Option('cogs', "Specify cog if need", OptionType.STRING)
    ]
)
async def slash_cogs(ctx, what_do, cogs=None):
    """Load, unload or reload cogs; only the bot owner may use it.

    :param ctx: the interaction context.
    :param what_do: ``"load"``, ``"unload"`` or ``"reload"``.
    :param cogs: name of a single cog; when omitted every cog found in
        ``./cogs`` is processed.
    """
    if ctx.author.id != bot_owner:
        await ctx.reply('You`re not bot owner')
        return

    all_cogs = cogs is None
    # A single name is wrapped in a list; iterating the bare string would
    # process it character by character.
    targets = CogsPrepare.cog_list() if all_cogs else [cogs]
    try:
        await CogsPrepare.work_with_cogs(what_do, targets)
    except commands.ExtensionError as error:
        logging.error("Failed to %s cogs %s: %s", what_do, targets, error)
        await ctx.reply(f'Failed to {what_do} cogs: {error}')
        return

    if all_cogs:
        await ctx.reply(f'Cogs are {what_do}ed')
    else:
        await ctx.reply(f'Cog {cogs} is {what_do}ed')
|
|
|
|
|
|
@inter_client.slash_command(
    name="set_guest_role",
    description="Set Default guest role",
    options=[
        Option("role", "Specify role", OptionType.ROLE, required=True),
    ]
)
@commands.has_permissions(administrator=True)
async def set_guest_role(ctx, role):
    """Store the chosen role's id as the guild's ``guest_role`` and confirm it.

    The description and the confirmation text name the guest role; they were
    previously copies of the bot-role command's texts.
    """
    await Commands.write_json(ctx.guild.id, "guest_role", role.id)
    await ctx.send(f"Setted up guest role to: `{role.name}`")
|
|
|
|
|
|
# The bot token is a secret and must not live in the source file. It is read
# from the DISCORD_BOT_TOKEN environment variable instead.
# NOTE(review): the token that was committed here has to be revoked and
# regenerated; it stays exposed through version-control history.
import os

_token = os.environ.get('DISCORD_BOT_TOKEN')
if not _token:
    raise SystemExit('DISCORD_BOT_TOKEN environment variable is not set')
bot.run(_token)
|