Files
discord_bot/test.py
2022-06-26 08:10:16 +03:00

185 lines
5.7 KiB
Python

import logging
import os
import sqlite3
import sys
import threading
from os import walk

import discord
from discord.ext import commands
from dislash import InteractionClient, Option, OptionType
class DB:
    """SQLite persistence helpers; every guild gets its own table in ``user.db``.

    All helpers are static and are called as ``DB._name(...)``. Each call
    opens its own short-lived connection and always closes it again, even
    when the statement fails.
    """

    @staticmethod
    def _table(guild):
        """Return ``guild`` as a double-quoted SQL identifier.

        Table names cannot be bound as ``?`` parameters, so embedded double
        quotes are doubled to keep the identifier from terminating early.
        """
        return '"' + str(guild).replace('"', '""') + '"'

    @staticmethod
    def _prepare_db(guild):
        """Create the table for ``guild`` if it does not exist yet.

        SQLite errors are logged and swallowed; nothing is returned.
        """
        connect = None
        try:
            connect = sqlite3.connect('user.db')
            connect.execute(
                f'''CREATE TABLE IF NOT EXISTS {DB._table(guild)}
                ([userid] INTEGER PRIMARY KEY, [username] TEXT, [nick] TEXT, [isbot] BOOL, [track] TEXT)
                ''')
            connect.commit()
        except sqlite3.Error as error:
            logging.error("failed to connect db: %s", error)
        finally:
            if connect is not None:
                connect.close()

    @staticmethod
    def _work_with_db(db_func, data_turple):
        """Execute one parameterized write statement and commit it.

        Args:
            db_func: SQL text containing ``?`` placeholders.
            data_turple: Values bound to the placeholders, in order.

        SQLite errors are logged and swallowed; nothing is returned.
        """
        connect = None
        try:
            connect = sqlite3.connect('user.db')
            connect.execute(db_func, data_turple)
            connect.commit()
        except sqlite3.Error as error:
            logging.error("failed to connect db: %s", error)
        finally:
            if connect is not None:
                connect.close()

    @staticmethod
    def _fill_bd(name, userid, isbot, nick, guild):
        """Insert a member row into the table of ``guild``.

        ``INSERT OR IGNORE`` leaves an already stored ``userid`` untouched.
        """
        sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO {DB._table(guild)}
                              (username, userid, nick, isbot)
                              VALUES (?, ?, ?, ?)""")
        data_tuple = (name, userid, nick, isbot)
        DB._work_with_db(sqlite_insert_with_param, data_tuple)

    @staticmethod
    def _add_audio(guildid, user, audio):
        """Overwrite the ``track`` column of ``user`` in the table of ``guildid``."""
        sql_update_query = f"""UPDATE {DB._table(guildid)} set track = ? where userid = ?"""
        data_tuple = (audio, user)
        DB._work_with_db(sql_update_query, data_tuple)

    @staticmethod
    def _read_db(guildid, user):
        """Return the stored ``track`` value of ``user`` in the table of ``guildid``.

        Returns:
            The track string; the literal string ``"None"`` when the user has
            no row or no track stored; ``None`` when SQLite raised an error
            (which is logged).
        """
        connect = None
        try:
            connect = sqlite3.connect("user.db")
            # Bind the user id as a parameter: callers may pass raw chat input.
            record = connect.execute(
                f"SELECT track FROM {DB._table(guildid)} WHERE userid = ?",
                (user,),
            ).fetchone()
        except sqlite3.Error as error:
            logging.error("Failed to read sqlite table: %s", error)
            return None
        finally:
            if connect is not None:
                connect.close()
        # fetchone() yields None for an unknown user; treat it like "no track".
        if record is None or record[0] is None:
            return "None"
        return record[0]
# Rename the current thread so %(threadName)s in log records reads "main".
threading.current_thread().name = "main"

# Log records go to stdout.
# NOTE(review): filemode presumably has no effect without a filename -- confirm.
logging.basicConfig(
    stream=sys.stdout,
    filemode='w',
    level='INFO',
    format='%(asctime)s - %(levelname)s - %(threadName)s - %(message)s',
)

# Default intents plus the members intent, which is switched on explicitly.
intents = discord.Intents.default()
intents.members = True

bot = commands.Bot(
    command_prefix='$',
    guild_subscriptions=True,
    intents=intents,
)
# Slash commands below are registered through this dislash client.
inter_client = InteractionClient(bot)
class Arg:
    """File names found directly inside the ``audio`` directory.

    Attributes are computed once, when the class body runs:
      f    -- list of file names in ``audio`` (sub-directories are not descended)
      dict -- mapping of every file name to itself
      keys -- ``range(len(f))``
    """

    # Only the first walk() entry (the top directory itself) is needed. The
    # default tuple keeps a missing ``audio`` directory from raising here.
    f = next(walk('audio'), ('audio', [], []))[2]
    # The attribute name shadows the builtin but is kept for existing users.
    dict = {x: x for x in f}
    keys = range(len(f))
@bot.event
async def on_ready():
    """Make sure every guild has a table and every cached member a row.

    The table is prepared once per guild, not once per member, so start-up
    opens one connection per guild for the schema step.
    """
    for guild in bot.guilds:
        DB._prepare_db(guild.id)
        for member in guild.members:
            DB._fill_bd(member.name, member.id, member.bot, member.nick, guild.id)
    logging.info('Bot started')
    logging.info('We have logged in as {0.user}'.format(bot))
@bot.event
async def on_guild_join(guild):
    """Register a newly joined guild: create its table, then store each member."""
    guild_id = guild.id
    DB._prepare_db(guild_id)
    for member in guild.members:
        DB._fill_bd(member.name, member.id, member.bot, member.nick, guild_id)
@bot.event
async def on_member_join(member):
    """Store a member who just joined in the table of their guild."""
    DB._fill_bd(
        name=member.name,
        userid=member.id,
        isbot=member.bot,
        nick=member.nick,
        guild=member.guild.id,
    )
# @bot.command()
# async def add_audio(ctx, user, audio):
# DB._add_audio(ctx.guild.id, user.removesuffix('>').removeprefix('<@'), audio)
@inter_client.slash_command(
    name="add-audio",
    description="Add audio track to user",
    options=[
        Option("audio", "Specify audio name", OptionType.STRING, required=True),
        Option("user", "Specify any user", OptionType.USER)
    ]
)
async def add_audio(ctx, audio, user=None):
    """Append ``audio`` to the track list stored for ``user``.

    Args:
        ctx: Interaction context of the slash command.
        audio: Track name to append.
        user: Target member; defaults to the command author.

    The track list is kept as one ``", "``-separated string.
    """
    user = user or ctx.author
    # Read the current list once and reuse it instead of querying twice.
    existing = DB._read_db(ctx.guild.id, user.id)
    if existing == "None":
        # "None" is the string DB._read_db returns when no track is stored.
        audiolist = audio
    else:
        audiolist = existing + ", " + audio
    DB._add_audio(ctx.guild.id, user.id, audiolist)
    # NOTE(review): "Sucseed" is a typo in the user-facing title; text left as is.
    emb = discord.Embed(
        title=f"Sucseed added {audio} to {user}",
        color=discord.Color.blue()
    )
    emb.set_thumbnail(url=user.avatar_url)
    await ctx.reply(embed=emb)
@inter_client.slash_command(
    name="info",
    description="Read list of tracks for user",
    options=[
        Option("user", "Specify any user", OptionType.USER),
    ]
)
async def info(ctx, user=None):
    """Reply with an embed showing general data, tracks and roles of ``user``.

    Args:
        ctx: Interaction context of the slash command.
        user: Member to describe; defaults to the command author.
    """
    user = user or ctx.author
    audio = DB._read_db(ctx.guild.id, user.id)
    role_mentions = [r.mention for r in user.roles if r != ctx.guild.default_role]
    # An embed field needs a non-empty value, so a member without extra roles
    # (or without a readable track list) is shown as "None".
    roles = "\n".join(role_mentions) or "None"
    audios = "\n".join((audio or "None").split(", ")) or "None"
    # The avatar is attached through set_thumbnail() below.
    emb = discord.Embed(
        title="General information",
        description=f"General information on server about {user}",
    )
    emb.set_thumbnail(url=user.avatar_url)
    emb.add_field(name="General info",
                  value=f"Username: {user}\n"
                        f"Nickname: {user.nick}\n"
                        f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
    emb.add_field(name="Audio list", value=audios, inline=True)
    emb.add_field(name="Roles list", value=roles, inline=True)
    emb.set_footer(text="Information requested by: {}".format(ctx.author.display_name))
    await ctx.reply(embed=emb)
@bot.command()
async def test(ctx, user):
    """Send whatever DB._read_db returns for ``user`` in the current guild."""
    stored = DB._read_db(ctx.guild.id, user)
    await ctx.send(stored)
@bot.event
async def on_member_update(before: discord.Member, after: discord.Member):
    """Write the member's nickname after the update into their guild's table."""
    DB._work_with_db(
        f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?""",
        (after.nick, before.id),
    )
# The bot token is a credential and must not be stored in the source file; it
# is read from the DISCORD_TOKEN environment variable instead.
# NOTE: a token that was ever committed should be regenerated.
_token = os.environ.get('DISCORD_TOKEN')
if not _token:
    raise SystemExit('DISCORD_TOKEN environment variable is not set')
bot.run(_token)