some broken json work

This commit is contained in:
2022-06-28 19:23:12 +03:00
parent 9f5b8c664f
commit 144af6ea0a

183
test.py
View File

@@ -4,18 +4,19 @@ import sqlite3
import sys
import threading
from os import walk, path, makedirs, remove, rename, listdir
from typing import List, Union, Any
from typing import List, Union, Any, Dict
import discord
from discord.ext import commands
from dislash import InteractionClient, Option, OptionType
from dislash import InteractionClient, Option, OptionType, OptionChoice
bot_owner = 386629192743256065
class DB:
@staticmethod
def _prepare_db(guild: int):
def prepare_db(guild: int):
try:
connect = sqlite3.connect('user.db')
cursor = connect.cursor()
@@ -28,7 +29,7 @@ class DB:
logging.error("failed to connecct db", error)
@staticmethod
def _work_with_db(db_func, data_turple):
def work_with_db(db_func, data_turple):
try:
connect = sqlite3.connect('user.db')
cursor = connect.cursor()
@@ -40,21 +41,21 @@ class DB:
logging.error("failed to connecct db", error)
@staticmethod
def _fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
(username, userid, nick, isbot)
VALUES (?, ?, ?, ?)""")
data_tuple = (name, userid, nick, isbot)
DB._work_with_db(sqlite_insert_with_param, data_tuple)
DB.work_with_db(sqlite_insert_with_param, data_tuple)
@staticmethod
def _add_audio(guildid: int, user: int, audio: str):
def add_audio(guildid: int, user: int, audio: str):
sql_update_query = f"""UPDATE "{guildid}" set track = ? where userid = ?"""
data_tuple = (audio, user)
DB._work_with_db(sql_update_query, data_tuple)
DB.work_with_db(sql_update_query, data_tuple)
@staticmethod
def _read_db(guild: int, user: int):
def read_db(guild: int, user: int):
try:
sql_con = sqlite3.connect("user.db")
cursor = sql_con.cursor()
@@ -67,9 +68,9 @@ class DB:
logging.error("Failed to read sqlite table", error)
class Cogs_prepare:
class CogsPrepare:
@staticmethod
def _cog_list():
def cog_list():
cogs_list = []
for filename in listdir('./cogs'):
if filename.endswith('.py'):
@@ -77,26 +78,30 @@ class Cogs_prepare:
return cogs_list
@staticmethod
async def _cogs_load(ctx):
for filename in Cogs_prepare._cog_list():
if filename.endswith('.py'):
bot.load_extension(f'cogs.{filename[:-3]}')
print(f'Load cog {filename[:-3]}')
else:
print(f'Unable to load {filename[:-3]}')
await ctx.reply('Cogs loaded')
async def cogs_load():
for filename in CogsPrepare.cog_list():
bot.load_extension(f'cogs.{filename}')
print(f'Load cog {filename}')
@staticmethod
async def _cogs_unload(ctx):
for filename in Cogs_prepare._cog_list():
bot.unload_extension(f'cogs.{filename[:-3]}')
print(f'Unload cog {filename[:-3]}')
async def cogs_unload():
for filename in CogsPrepare.cog_list():
bot.unload_extension(f'cogs.{filename}')
print(f'Unload cog {filename}')
await ctx.reply('Cogs unloaded')
@classmethod
async def work_with_cogs(what_do, cogs):
for filename in cogs:
if what_do == "load":
bot.load_extension(f'cogs.{filename}')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{filename}')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{filename}')
logging.info(f'{what_do.capitalize()}ing cog {filename}')
@staticmethod
async def _cog_load(ctx, cog):
async def cog_load(ctx, cog):
try:
bot.load_extension(f'cogs.{cog}')
await ctx.reply(f'Loaded cog {cog}')
@@ -125,7 +130,7 @@ if not path.isfile('prefix.json'):
def determine_prefix(bot, msg):
guild = msg.guild
base = Commands._read_json(guild.id, 'prefix') or DEFAULT_PREFIX # Get bot role
base = Commands.read_json(guild.id, 'prefix') or DEFAULT_PREFIX # Get bot role
return base
@@ -135,9 +140,7 @@ bot = commands.Bot(command_prefix=determine_prefix, guild_subscriptions=True, in
inter_client = InteractionClient(bot)
print(Cogs_prepare._cog_list())
for filename in Cogs_prepare._cog_list():
for filename in CogsPrepare.cog_list():
try:
bot.load_extension(f'cogs.{filename}')
logging.info(f'Loaded cog {filename}')
@@ -149,19 +152,19 @@ for filename in Cogs_prepare._cog_list():
logging.error(f'Error: {filename} failed to load properly.')
except commands.ExtensionError:
logging.error(f'Error: unknown error with {filename[:-3]}')
logging.error(f'Error: unknown error with {filename}')
class Commands:
@staticmethod
async def set_prefix(ctx, prefix: str):
await Commands._write_json(ctx.guild.id, "prefix", prefix)
await Commands.write_json(ctx.guild.id, "prefix", prefix)
await ctx.send(f"Prefix set to: `{prefix}`")
@staticmethod
def list_files(_fold: str):
def list_files(fold: str):
fl = []
for filenames in walk(_fold):
for filenames in walk(fold):
fl.extend(filenames)
break
files = {}
@@ -171,14 +174,14 @@ class Commands:
return files
@staticmethod
async def _read_json(guild: int, param):
async def read_json(guild: int, param):
parameter = None
try:
with open('prefix.json', 'r', encoding='utf-8') as fp: # Open the JSON
jsonObject = json.load(fp) # Load the custom prefixes
except TypeError:
jsonObject = {}
print(type(jsonObject))
if guild: # If the guild exists
try:
guild_conf = jsonObject[f"{guild}"]
@@ -192,29 +195,46 @@ class Commands:
return parameter
@staticmethod
async def _write_json(guild, param_name, param):
async def write_json(guild, param_name, param):
with open('prefix.json', 'r', encoding='utf-8') as fp:
try:
jsonObject = json.load(fp)
except TypeError:
jsonObject: dict = json.load(fp)
except:
jsonObject = {}
print(type(jsonObject))
try:
jsonObject[f"{guild}"][f"{param_name}"] = param
except KeyError:
json_param = jsonObject[f"{guild}"] = {f'{param_name}': ''}
jsonObject[str(guild)] = json_param.update({f"{param_name}": param})
print(jsonObject)
print('--------')
new = {f"{guild}": "{}"}
print(new)
jsonObject.update(new)
try:
json_param = json.load(jsonObject[f"{guild}"])
except KeyError:
json_param = {}
print(type(json_param))
print(param_name)
print(param)
new = {f"{param_name}": f"{param}"}
print(type(json_param))
print(new)
json_param.update(new)
# jsonObject[str(guild)] = json_param
with open("prefix.json", "w") as fl:
json.dump(jsonObject, fl)
fl.close()
@bot.event
async def on_ready():
for g in bot.get_all_members():
DB._prepare_db(g.guild.id)
DB.prepare_db(g.guild.id)
for g in bot.get_all_members():
DB._fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
DB.fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
logging.info(f'Bot started')
logging.info('We have logged in as {0.user}'.format(bot))
@@ -222,15 +242,15 @@ async def on_ready():
@bot.event
async def on_guild_join(guild):
for g in guild.members:
DB._fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
DB.fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@bot.event
async def on_member_join(member):
DB._fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
DB.fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
bot_role = Commands._read_json(member.guild.id, 'bot_role') # Get bot role
guest_role = Commands._read_json(member.guild.id, 'guest_role') # Get guest role
bot_role = Commands.read_json(member.guild.id, 'bot_role') # Get bot role
guest_role = Commands.read_json(member.guild.id, 'guest_role') # Get guest role
if bot_role and guest_role:
if member.bot == 0:
@@ -245,7 +265,7 @@ async def on_member_join(member):
async def on_member_update(before: discord.Member, after: discord.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id)
DB._work_with_db(sql_update_query, data_tuple)
DB.work_with_db(sql_update_query, data_tuple)
@bot.command(name="upload_audio")
@@ -276,13 +296,13 @@ async def upload_audio(ctx, user=None):
if duration > 15:
await ctx.reply(f'Audio duration is {duration}, but max is 15')
else:
a = DB._read_db(ctx.guild.id, user.id)
a = DB.read_db(ctx.guild.id, user.id)
if a is None:
audiolist = f'{user.id}/{at.filename}'
else:
audiolist = DB._read_db(ctx.guild.id, user.id) + ", " + f'{user.id}/{at.filename}'
audiolist = DB.read_db(ctx.guild.id, user.id) + ", " + f'{user.id}/{at.filename}'
DB._add_audio(ctx.guild.id, user.id, audiolist)
DB.add_audio(ctx.guild.id, user.id, audiolist)
rename(f'tmp/{user.id}/{at.filename}', f'audio/{user.id}/{at.filename}')
else:
await ctx.reply(f'Failed to find MIME type of {at.filename}')
@@ -310,6 +330,29 @@ async def slash_setprefix(ctx, prefix: str):
await Commands.set_prefix(ctx, prefix)
# @inter_client.slash_command(
# name="cogs_load",
# description="Load cogs",
# )
# async def slash_cogs_load(ctx):
# if ctx.author.id == bot_owner:
# await CogsPrepare.cogs_load()
# await ctx.reply('Cogs loaded')
# else:
# await ctx.reply('You`re not bot owner')
#
# @inter_client.slash_command(
# name="cogs_unload",
# description="Unload cogs",
# )
# async def slash_cogs_unload(ctx):
# if ctx.author.id == bot_owner:
# await CogsPrepare.cogs_unload()
# await ctx.reply('Cogs unloaded')
# else:
# await ctx.reply('You`re not bot owner')
@inter_client.slash_command(
name="info",
description="Read list of tracks for user",
@@ -319,7 +362,7 @@ async def slash_setprefix(ctx, prefix: str):
)
async def info(ctx, user=None):
user = user or ctx.author
audio = DB._read_db(ctx.guild.id, user.id)
audio = DB.read_db(ctx.guild.id, user.id)
rolelist = [r.mention for r in user.roles if r != ctx.guild.default_role]
if rolelist:
roles = "\n".join(rolelist)
@@ -357,7 +400,7 @@ async def info(ctx, user=None):
)
@commands.has_permissions(administrator=True)
async def set_trigger_role(ctx, role):
await Commands._write_json(ctx.guild.id, "tigger_role", role.id)
await Commands.write_json(ctx.guild.id, "tigger_role", role.id)
await ctx.send(f"Role to trigger set to : `{role.name}`")
@@ -370,10 +413,40 @@ async def set_trigger_role(ctx, role):
)
@commands.has_permissions(administrator=True)
async def set_bot_role(ctx, role):
await Commands._write_json(ctx.guild.id, "bot_role", role.id)
await Commands.write_json(ctx.guild.id, "bot_role", role.id)
await ctx.send(f"Setted up bot role to: `{role.name}`")
@inter_client.slash_command(
name="cogs",
description="Work with cogs",
options=[
Option("what do",
description="Specify what do with cogs",
type=OptionType.STRING,
required=True,
choices=[
OptionChoice("load", "load"),
OptionChoice("unload", "unload"),
OptionChoice("reload", "reload"),
]
),
Option('cog', "Specify cog if need", OptionType.STRING)
]
)
async def slash_cogs(ctx, what_do, cogs):
if cogs is None:
cogs = CogsPrepare.cog_list()
if ctx.author.id == bot_owner:
await CogsPrepare.work_with_cogs(what_do, cogs)
if cogs is CogsPrepare.cog_list():
await ctx.reply(f'Cogs are f{what_do}ed')
else:
await ctx.reply(f'Cog {cogs} is {what_do}ed')
else:
await ctx.reply('You`re not bot owner')
@inter_client.slash_command(
name="set_guest_role",
description="Set Default bot role",
@@ -383,7 +456,7 @@ async def set_bot_role(ctx, role):
)
@commands.has_permissions(administrator=True)
async def set_guest_role(ctx, role):
await Commands._write_json(ctx.guild.id, "guest_role", role.id)
await Commands.write_json(ctx.guild.id, "guest_role", role.id)
await ctx.send(f"Setted up bot role to: `{role.name}`")