9 Commits
test ... master

Author SHA1 Message Date
Slava
0946a7f51e Update .gitlab-ci.yml file 2024-05-28 21:24:14 +00:00
Slava
3e0a0c197a Update .gitlab-ci.yml file 2024-05-28 19:58:20 +00:00
Slava
b43c5cf5c1 Удалить check.py 2024-05-28 19:56:55 +00:00
bot
a136491362 restore 2024-05-28 22:55:47 +03:00
bacon
413736b44d changed logger 2024-04-13 20:03:57 +03:00
bot
78be10a88b v0.0.8 2024-03-25 10:14:57 +03:00
Slava
c3140403b5 Update .gitlab-ci.yml file 2024-03-16 17:51:18 +00:00
Slava
c60502a241 Update .gitlab-ci.yml file 2024-03-16 10:46:43 +00:00
Slava
17f1972104 Update .gitlab-ci.yml file 2024-03-16 10:43:16 +00:00
54 changed files with 997 additions and 1281 deletions

View File

@@ -1,11 +0,0 @@
/tmp/
audio/*/
/.idea
user.db
*.pyc
/.run/
.env
*.exe
/venv/
config.json
.editorconfig

11
.gitignore vendored
View File

@@ -1,11 +1,14 @@
/tmp/ /tmp/
audio/*/ /audio/*/
/.idea /.idea
user.db /user.db
*.pyc *.pyc
/.run/ /.run/
.env /.env
*.exe *.exe
/venv/ /venv/
/fun_and_admin_bot.egg-info/
/.YMcache/
config.json config.json
.editorconfig /env/
/.project

View File

@@ -1,136 +1,19 @@
# You can override the included template(s) by including variable overrides
# SAST customization: https://docs.gitlab.com/ee/user/application_security/sast/#customizing-the-sast-settings
# Secret Detection customization: https://docs.gitlab.com/ee/user/application_security/secret_detection/#customizing-settings
# Dependency Scanning customization: https://docs.gitlab.com/ee/user/application_security/dependency_scanning/#customizing-the-dependency-scanning-settings
# Container Scanning customization: https://docs.gitlab.com/ee/user/application_security/container_scanning/#customizing-the-container-scanning-settings
# Note that environment variables can be set in several places
# See https://docs.gitlab.com/ee/ci/variables/#cicd-variable-precedence
stages: stages:
- 'build' - 'test'
- 'push' - 'code_quality'
- 'deploy'
variables:
# DOCKER_HOST: tcp://docker:2375
# fill those if you have a proxy in your environment
DOCKER_DRIVER: overlay2
# See https://github.com/docker-library/docker/pull/166
DOCKER_TLS_CERTDIR: "/certs"
make_image:
stage: build
image: docker:26.1.3
only:
# We want this job to be run on tags only.
refs:
- tags
- push
changes:
- ^README.md
- ^CHANGELOG
- ^.gitattribute
- ^.gitignore
- ^.gitlab-ci.yml
- src/**/*
- requirements.txt
services:
- docker:26.1.3-dind
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
# fetches the latest image (not failing if image is not found)
- docker pull $CI_REGISTRY_IMAGE:latest || true
# builds the project, passing proxy variables, using OCI labels
# notice the cache-from, which is going to use the image we just pulled locally
# the built image is tagged locally with the commit SHA, and then pushed to
# the GitLab registry
- >
DOCKER_BUILDKIT=1 docker build
--pull
--cache-from $CI_REGISTRY_IMAGE:latest
--label "org.opencontainers.image.title=$CI_PROJECT_TITLE"
--label "org.opencontainers.image.url=$CI_PROJECT_URL"
--label "org.opencontainers.image.created=$CI_JOB_STARTED_AT"
--label "org.opencontainers.image.revision=$CI_COMMIT_SHA"
--label "org.opencontainers.image.version=$CI_COMMIT_REF_NAME"
--tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
.
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
# Here, the goal is to tag the "master" branch as "latest"
Push latest:
needs: ['make_image']
variables:
# We are just playing with Docker here.
# We do not need GitLab to clone the source code.
GIT_STRATEGY: none
stage: push
only:
changes:
- ^README.md
- ^CHANGELOG
- ^.gitattribute
- ^.gitignore
- ^.gitlab-ci.yml
- src/**/*
- requirements.txt
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
# Because we have no guarantee that this job will be picked up by the same runner
# that built the image in the previous step, we pull it again locally
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
# Then we tag it "latest"
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA $CI_REGISTRY_IMAGE:latest
# Annnd we push it.
- docker push $CI_REGISTRY_IMAGE:latest
# Finally, the goal here is to Docker tag any Git tag
# GitLab will start a new pipeline everytime a Git tag is created, which is pretty awesome
Push commit:
needs: ['make_image']
variables:
# Again, we do not need the source code here. Just playing with Docker.
GIT_STRATEGY: none
stage: push
only:
# We want this job to be run on tags only.
refs:
- ^tags
- push
changes:
- ^README.md
- ^CHANGELOG
- ^.gitattribute
- ^.gitignore
- ^.gitlab-ci.yml
- src/**/*
- requirements.txt
before_script: sast:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY stage: 'test'
script: include:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA - template: Security/SAST.gitlab-ci.yml
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA - template: Security/Dependency-Scanning.gitlab-ci.yml
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME - template: Security/SAST-IaC.gitlab-ci.yml
Push tag:
needs: ['make_image']
variables:
# Again, we do not need the source code here. Just playing with Docker.
GIT_STRATEGY: none
stage: push
only:
# We want this job to be run on tags only.
refs:
- tags
changes:
- ^README.md
- ^CHANGELOG
- ^.gitattribute
- ^.gitignore
- ^.gitlab-ci.yml
- src/**/*
- requirements.txt
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG

2
CHANGELOG Normal file
View File

@@ -0,0 +1,2 @@
0.0.5
Initial

View File

@@ -1,22 +0,0 @@
FROM python:3.12.2-slim-bookworm as builder
LABEL authors="bacon"
WORKDIR /app
COPY requirements.txt /app
RUN pip wheel install \
--no-cache-dir -q \
--no-deps --wheel-dir /app/wheels \
-r requirements.txt
FROM python:3.12.2-slim-bookworm as runner
WORKDIR /app
COPY --from=builder /app/wheels /wheels
RUN apt-get update -qq && \
apt-get install -qq --no-install-recommends ffmpeg -y && \
apt-get clean -qq autoclean && \
pip install --no-cache -q /wheels/*
COPY src/ /app
ENTRYPOINT ["python", "bot.py"]
#ENTRYPOINT ["ffmpeg"]

View File

@@ -1,16 +1,16 @@
__version__ = '0.0.7' __version__ = '0.0.8'
__title__ = "src" __title__ = "Pisya_bot"
__author__ = "baconborn" __author__ = "baconborn"
from typing import NamedTuple, Literal from typing import NamedTuple, Literal
class VersionInfo(NamedTuple): class VersionInfo(NamedTuple):
major: int major: int
minor: int minor: int
micro: int micro: int
releaselevel: Literal["alpha", "beta", "candidate", "final"] releaselevel: Literal["alpha", "beta", "candidate", "final"]
serial: int serial: int
version_info: VersionInfo = VersionInfo(major=0, minor=0, micro=7, releaselevel="alpha", serial=0) version_info: VersionInfo = VersionInfo(major=0, minor=0, micro=8, releaselevel="alpha", serial=0)

319
src/bot.py → bot.py Normal file → Executable file
View File

@@ -1,158 +1,161 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from os import getenv import asyncio
from os.path import isfile from os import getenv
from os.path import isfile
from disnake import OptionType, Option, Localized, ApplicationCommandInteraction, Intents, __version__
from disnake.ext.commands import Bot, is_owner from disnake import OptionType, Option, Localized, ApplicationCommandInteraction, Intents, __version__
from dotenv import load_dotenv from disnake.ext.commands import Bot, is_owner
from dotenv import load_dotenv
from integral_lib.CogsPrep import work_with_cogs, cog_list from loguru import logger
from integral_lib.Comands import determine_prefix
from integral_lib.Logger import logger from __init__ import __version__ as ver
from lib.CogsPrep import work_with_cogs, cog_list
load_dotenv() from lib.Comands import determine_prefix
if not isfile('.env') or not getenv('CONF_FILE'):
with open('.env', 'a', encoding='utf-8') as f: load_dotenv()
f.write("CONF_FILE='config.json'\n") if not isfile('.env') or not getenv('CONF_FILE'):
load_dotenv() with open('.env', 'a', encoding='utf-8') as f:
if not isfile(getenv('CONF_FILE')): f.write("CONF_FILE='config.json'\n")
with open(getenv('CONF_FILE'), 'a', encoding='utf-8') as f: load_dotenv()
f.write("") if not isfile(getenv('CONF_FILE')):
with open(getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
intents = Intents(messages=True, f.write("")
guilds=True,
message_content=True, intents = Intents(messages=True,
voice_states=True, guilds=True,
members=True, message_content=True,
presences=True voice_states=True,
) members=True,
presences=True
bot = Bot(command_prefix=determine_prefix, )
intents=intents,
reload=True, bot = Bot(command_prefix=determine_prefix,
test_guilds=[648126669122568215] intents=intents,
) reload=True,
test_guilds=[648126669122568215]
bot.i18n.load("locale/") )
work_with_cogs('load', bot, cog_list()) bot.i18n.load("locale/")
asyncio.run(work_with_cogs('load', bot, asyncio.run(cog_list())))
@bot.event
async def on_ready():
logger.info('Bot started') @bot.event
logger.info(f'Disnake version {__version__}') async def on_ready():
logger.info(f'We have logged in as {bot.user}') logger.info('Bot started')
logger.info(f'Disnake version {__version__}')
logger.info(f'We have logged in as {bot.user}')
@bot.slash_command( logger.info(f'Version of bot is - v{ver}')
name='cog',
options=[
Option( @bot.slash_command(
name=Localized('cog', key="COG".lower()), name='cog',
description=Localized("cog file", key="COG_FILE"), options=[
type=OptionType.string Option(
) name=Localized('cog', key="COG".lower()),
] description=Localized("cog file", key="COG_FILE"),
) type=OptionType.string
@is_owner() )
async def slash_cogs(inter: ApplicationCommandInteraction): ]
""" )
Working with cogs (modules) {{SLASH_COG}} @is_owner()
async def slash_cogs(inter: ApplicationCommandInteraction):
Parameters """
---------- Working with cogs (modules) {{SLASH_COG}}
:param inter:
""" Parameters
pass ----------
:param inter:
"""
@slash_cogs.sub_command(description=Localized("Enables Cog", key="ENABLE_COG")) pass
async def enable(inter: ApplicationCommandInteraction, cog: str):
"""
@slash_cogs.sub_command(description=Localized("Enables Cog", key="ENABLE_COG"))
Parameters async def enable(inter: ApplicationCommandInteraction, cog: str):
---------- """
:param inter:
:param cog: Select Cogfile {{COG_FILE}} Parameters
""" ----------
await work_with_cogs('enable', bot, cog) :param inter:
await inter.response.send_message(f'Cog {cog} is enabled', ephemeral=True) :param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('enable', bot, cog)
@slash_cogs.sub_command(description=Localized("Disables Cog", key="DISABLE_COG")) await inter.response.send_message(f'Cog {cog} is enabled', ephemeral=True)
async def disable(inter: ApplicationCommandInteraction, cog: str):
"""
@slash_cogs.sub_command(description=Localized("Disables Cog", key="DISABLE_COG"))
Parameters async def disable(inter: ApplicationCommandInteraction, cog: str):
---------- """
:param inter:
:param cog: Select Cogfile {{COG_FILE}} Parameters
""" ----------
await work_with_cogs('disable', bot, cog) :param inter:
await inter.response.send_message(f'Cog {cog} is disabled', ephemeral=True) :param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('disable', bot, cog)
@slash_cogs.sub_command(description=Localized("Loads Cog", key="LOAD_COG")) await inter.response.send_message(f'Cog {cog} is disabled', ephemeral=True)
async def load(inter: ApplicationCommandInteraction, cog: str):
"""
@slash_cogs.sub_command(description=Localized("Loads Cog", key="LOAD_COG"))
Parameters async def load(inter: ApplicationCommandInteraction, cog: str):
---------- """
:param inter:
:param cog: Select Cogfile {{COG_FILE}} Parameters
""" ----------
await work_with_cogs('load', bot, cog) :param inter:
await inter.response.send_message(f'Cog {cog} is loaded', ephemeral=True) :param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('load', bot, cog)
@slash_cogs.sub_command(description=Localized("Unload Cog", key="UNLOAD_COG")) await inter.response.send_message(f'Cog {cog} is loaded', ephemeral=True)
async def unload(inter: ApplicationCommandInteraction, cog: str):
"""
@slash_cogs.sub_command(description=Localized("Unload Cog", key="UNLOAD_COG"))
Parameters async def unload(inter: ApplicationCommandInteraction, cog: str):
---------- """
:param inter:
:param cog: Select Cogfile {{COG_FILE}} Parameters
""" ----------
await work_with_cogs('unload', bot, cog) :param inter:
await inter.response.send_message(f'Cog {cog} is unload', ephemeral=True) :param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('unload', bot, cog)
@slash_cogs.sub_command(description=Localized("Reloads Cog", key="RELOAD_COG")) await inter.response.send_message(f'Cog {cog} is unload', ephemeral=True)
async def reload(inter: ApplicationCommandInteraction, cog: str):
"""
@slash_cogs.sub_command(description=Localized("Reloads Cog", key="RELOAD_COG"))
Parameters async def reload(inter: ApplicationCommandInteraction, cog: str):
---------- """
:param inter:
:param cog: Select Cogfile {{COG_FILE}} Parameters
""" ----------
await work_with_cogs('reload', bot, cog) :param inter:
await inter.response.send_message(f'Cog {cog} is reloaded', ephemeral=True) :param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('reload', bot, cog)
@disable.autocomplete('cog') await inter.response.send_message(f'Cog {cog} is reloaded', ephemeral=True)
@unload.autocomplete('cog')
@load.autocomplete('cog')
@reload.autocomplete('cog') @disable.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str): @unload.autocomplete('cog')
current = current.lower() @load.autocomplete('cog')
_list = cog_list(fold='./cogs/') @reload.autocomplete('cog')
return [choice for choice in _list if current in choice.lower()] async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = await cog_list(fold='./cogs/')
@enable.autocomplete('cog') return [choice for choice in _list if current in choice.lower()]
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = cog_list(fold='./cogs/disabled/') @enable.autocomplete('cog')
return [choice for choice in _list if current in choice.lower()] async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = await cog_list(fold='./cogs/disabled/')
@slash_cogs.error return [choice for choice in _list if current in choice.lower()]
async def cogs_error(inter: ApplicationCommandInteraction):
await inter.response.send_message(Localized("Error", key="EROR"), ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func') @slash_cogs.error
async def cogs_error(inter: ApplicationCommandInteraction):
await inter.response.send_message(Localized("Error", key="EROR"), ephemeral=True)
bot.run(getenv('TOKEN')) logger.error(f'User {inter.author} tries to use cogs func')
bot.run(getenv('TOKEN'))

View File

@@ -1,148 +1,151 @@
from asyncio import sleep from asyncio import sleep
import disnake import disnake
from disnake import Option, OptionType, Localized from disnake import Option, OptionType, Localized
from disnake.ext import tasks from disnake.ext import commands, tasks
from loguru import logger
from integral_lib.Comands import *
from integral_lib.DB_Worker import fill_bd, prepare_db, work_with_db from lib.Comands import read_json, write_json
from integral_lib.Logger import logger from lib.DB_Worker import fill_bd, prepare_db, work_with_db
class Admin(commands.Cog, name='Admin'): class Admin(commands.Cog, name='Admin'):
def __init__(self, bot: commands.Bot): def __init__(self, bot: commands.Bot):
self.bot = bot # a defining src as global var in class self.bot = bot # a defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners @commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self): async def on_ready(self):
for g in self.bot.get_all_members(): for g in self.bot.get_all_members():
await prepare_db(g.guild.id) await prepare_db(g.guild.id)
for g in self.bot.get_all_members(): for g in self.bot.get_all_members():
await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id) await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
self.activity.start() self.activity.start()
logger.info(f'Cog {__name__.split(".")[1]} is ready!.') logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@tasks.loop(seconds=20) @tasks.loop(seconds=20)
async def activity(self): async def activity(self):
await self.bot.change_presence( await self.bot.change_presence(
activity=disnake.Activity( activity=disnake.Activity(
name=f'at users: {str(len(self.bot.users))}', name=f'at users: {str(len(self.bot.users))}',
type=3 type=3
) )
) )
await sleep(10) await sleep(10)
await self.bot.change_presence( await self.bot.change_presence(
activity=disnake.Activity( activity=disnake.Activity(
name=f'at servers: {str(len(self.bot.guilds))}', name=f'at servers: {str(len(self.bot.guilds))}',
type=3 type=3
) )
) )
@commands.Cog.listener() @commands.Cog.listener()
async def on_member_update(self, before: disnake.Member, after: disnake.Member): async def on_member_update(self, before: disnake.Member, after: disnake.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?""" sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id) data_tuple = (after.nick, before.id)
await work_with_db(sql_update_query, data_tuple) await work_with_db(sql_update_query, data_tuple)
@commands.Cog.listener() @commands.Cog.listener()
async def on_guild_join(self, guild): async def on_guild_join(self, guild):
for g in guild.members: for g in guild.members:
await fill_bd(g.name, g.id, g.bot, g.nick, guild.id) await fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@commands.Cog.listener() @commands.Cog.listener()
async def on_member_join(self, member): async def on_member_join(self, member):
await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id) await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
bot_role = read_json(member.guild.id, 'bot_role') # Get src role bot_role = read_json(member.guild.id, 'bot_role') # Get bot role
guest_role = read_json(member.guild.id, 'guest_role') # Get guest role guest_role = read_json(member.guild.id, 'guest_role') # Get guest role
if bot_role or guest_role: logger.info(f"Bot role: {bot_role} | Guest role: {guest_role}")
if member.bot == 0: logger.info(f'type bot_role: {type(bot_role)} | type guest_role: {type(guest_role)}')
role = disnake.utils.get(member.guild.roles, id=guest_role) if bot_role or guest_role:
else: if member.bot == 0:
role = disnake.utils.get(member.guild.roles, id=bot_role) role = disnake.utils.get(member.guild.roles, id=guest_role)
logger.info(f"Adding to {member} role {role}") else:
await member.add_roles(role) role = disnake.utils.get(member.guild.roles, id=bot_role)
logger.info(f"Adding to {member} role {role}")
@commands.slash_command( await member.add_roles(role)
name="set_default_role",
description=Localized("Set Default src role", key="DEF_ROLE"), @commands.slash_command(
options=[ name="set_default_role",
Option("role", description=Localized("Set Default bot role", key="DEF_ROLE"),
"Specify role", options=[
OptionType.role, Option("role",
required=True), "Specify role",
] OptionType.role,
) required=True),
@commands.has_permissions(administrator=True) ]
async def set_guest_role(self, inter: disnake.ApplicationCommandInteraction, role): )
await write_json(inter.guild.id, "guest_role", role.id) @commands.has_permissions(administrator=True)
await inter.response.send_message(f"Set up src role to: `{role.name}`", ephemeral=True) async def set_guest_role(self, inter: disnake.ApplicationCommandInteraction, role):
print(type(role.id))
@commands.command(name="set_prefix") await write_json(inter.guild.id, "guest_role", role.id)
@commands.has_permissions(administrator=True) await inter.response.send_message(f"Set up bot role to: `{role.name}`", ephemeral=True)
async def command_set_prefix(self, ctx, prefix: str):
await write_json(ctx.guild.id, "prefix", prefix) @commands.command(name="set_prefix")
await ctx.reply(f"Prefix set to: `{prefix}`") @commands.has_permissions(administrator=True)
async def command_set_prefix(self, ctx, prefix: str):
@commands.guild_only() await write_json(ctx.guild.id, "prefix", prefix)
@commands.slash_command( await ctx.reply(f"Prefix set to: `{prefix}`")
name="set_prefix",
description="Setting up src prefix", @commands.guild_only()
options=[ @commands.slash_command(
Option("prefix", name="set_prefix",
"Specify prefix", description="Setting up bot prefix",
OptionType.string, options=[
required=True), Option("prefix",
] "Specify prefix",
) OptionType.string,
@commands.has_permissions(administrator=True) required=True),
async def slash_set_prefix(self, inter: disnake.ApplicationCommandInteraction, prefix: str): ]
await write_json(inter.guild.id, "prefix", prefix) )
await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True) @commands.has_permissions(administrator=True)
async def slash_set_prefix(self, inter: disnake.ApplicationCommandInteraction, prefix: str):
@commands.guild_only() await write_json(inter.guild.id, "prefix", prefix)
@commands.has_permissions(administrator=True) await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True)
@commands.slash_command(
name="set_trigger_role", @commands.guild_only()
description=Localized("Setting up role to trigger src", key="KEY_ROLE"), @commands.has_permissions(administrator=True)
options=[ @commands.slash_command(
Option("role", name="set_trigger_role",
Localized("Specify role", key="SETUP_ROLE"), description=Localized("Setting up role to trigger bot", key="KEY_ROLE"),
OptionType.role, options=[
required=True), Option("role",
] Localized("Specify role", key="SETUP_ROLE"),
) OptionType.role,
async def set_trigger_role(self, inter: disnake.ApplicationCommandInteraction, role): required=True),
await write_json(inter.guild.id, "tigger_role", role.id) ]
await inter.response.send_message(f"Role set to: `{role.name}`", ephemeral=True) )
async def set_trigger_role(self, inter: disnake.ApplicationCommandInteraction, role):
@commands.slash_command( await write_json(inter.guild.id, "tigger_role", role.id)
name="set_bot_role", await inter.response.send_message(f"Role set to: `{role.name}`", ephemeral=True)
description=Localized("Set src role", key="BOT_ROLE"),
options=[ @commands.slash_command(
Option("role", name="set_bot_role",
Localized("Specify role", key="SETUP_ROLE"), description=Localized("Set bot role", key="BOT_ROLE"),
OptionType.role, options=[
required=True) Option("role",
] Localized("Specify role", key="SETUP_ROLE"),
) OptionType.role,
@commands.guild_only() required=True)
@commands.has_permissions(administrator=True) ]
async def set_bot_role(self, ctx, role): )
await write_json(ctx.guild.id, @commands.guild_only()
"bot_role", @commands.has_permissions(administrator=True)
role.id) async def set_bot_role(self, ctx, role):
await ctx.send(f"Set up src role to: `{role.name}`", ephemeral=True) await write_json(ctx.guild.id,
"bot_role",
@set_bot_role.error role.id)
@set_trigger_role.error await ctx.send(f"Set up bot role to: `{role.name}`", ephemeral=True)
@slash_set_prefix.error
async def set_prefix_error(self, inter: disnake.ApplicationCommandInteraction, prefix): @set_bot_role.error
await inter.response.send_message("You don`t have permissions", ephemeral=True) @set_trigger_role.error
logger.error(prefix) @slash_set_prefix.error
async def set_prefix_error(self, inter: disnake.ApplicationCommandInteraction, prefix):
await inter.response.send_message("You don`t have permissions", ephemeral=True)
def setup(bot): # an extension must have a setup function logger.error(prefix)
bot.add_cog(Admin(bot)) # adding a cog
def setup(bot): # an extension must have a setup function
bot.add_cog(Admin(bot)) # adding a cog

View File

@@ -1,84 +1,96 @@
import random import random
import disnake import disnake
from disnake import OptionChoice, Option, OptionType, Member, VoiceState, ApplicationCommandInteraction from disnake import OptionChoice, Option, OptionType, Member, VoiceState
from disnake.ext import commands from disnake.ext import commands
from disnake.utils import get
from integral_lib.ListGenerator import ListGenerator from loguru import logger
from integral_lib.Logger import logger
from integral_lib.Player import play_audio from lib.ListGenerator import ListGenerator
from lib.Player import play_audio
class Audio(commands.Cog, name='Audio'):
def __init__(self, bot: commands.Bot): class Audio(commands.Cog, name='Audio'):
self.bot = bot def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.Cog.listener()
async def on_ready(self): @commands.Cog.listener()
logger.info(f'Cog {__name__.split(".")[1]} is ready!.') async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.Cog.listener()
async def on_voice_state_update(self, member: Member, @commands.Cog.listener()
before: VoiceState, async def on_voice_state_update(self, member: Member,
after: VoiceState): before: VoiceState,
if before.channel is None and not member.bot: after: VoiceState):
logger.info(f'Coneccted {member.name} to {after.channel.name}') if before.channel is None and not member.bot:
if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members): logger.info(f'Coneccted {member.name} to {after.channel.name}')
logger.info('Skip playing by Game') if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members):
else: logger.info('Skip playing by Game')
# Prepare list of audio else:
from integral_lib.Comands import read_json # Prepare list of audio
_role = await read_json(member.guild.id, 'tigger_role') from lib.Comands import read_json
audio: list = [] _role = get(member.guild.roles, id=read_json(member.guild.id, 'tigger_role'))
for _a in ListGenerator('audio'): audio: list = []
audio.append(_a.name) for _a in ListGenerator('audio'):
audio.append(_a.name)
if len(member.roles) == 1 or _role is None:
logger.info('Skip playing by role') if len(member.roles) == 1 or _role is None:
elif any(str(role.id) in _role for role in member.roles): logger.info('Skip playing by role')
logger.info('Play audio from list by role') elif _role in member.roles:
await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel) logger.info('Play audio from list by role')
else: await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel)
logger.info('Skip playing by any else') else:
logger.info('Skip playing by any else')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command", @commands.slash_command(name="play_audio",
options=[ description="Make possible playing audio by command",
Option(name="audio", options=[
type=OptionType.string, Option(name="audio",
required=True type=OptionType.string,
) required=True
]) )
async def playaudio(self, inter: disnake.ApplicationCommandInteraction, ])
audio: str async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
): audio: str
if inter.author.voice is not None: ):
await inter.response.send_message(f'Played {audio}', ephemeral=True) if inter.author.voice is not None:
await play_audio(audio, self.bot, inter.author.voice.channel) await inter.response.send_message(f'Played {audio}', ephemeral=True)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True) await play_audio(audio, self.bot, inter.author.voice.channel)
else:
@playaudio.autocomplete('audio') await inter.response.send_message('You`re not in voice', ephemeral=True)
async def list_to_play(self, inter: ApplicationCommandInteraction, current: str):
""" @commands.slash_command(name="play_random",
Asynchronously generates a list of OptionChoices for the given audio files based on the user input. description="Make possible playing audio by command",
Parameters: options=[
- inter: disnake.ApplicationCommandInteraction - The interaction context for the command. Option(name="num",
- current: str - The current user input to filter the audio file choices. type=OptionType.integer,
Returns: )
- list[OptionChoice] - A list of OptionChoice objects representing the available audio file choices. ])
:param current: async def playrandom(self, inter: disnake.ApplicationCommandInteraction,
:param inter: ApplicationCommandInteraction num: int
""" ):
current = current.lower() if inter.author.voice is not None:
_dict: dict = {} audio: list = []
for f in ListGenerator('audio'): for _a in ListGenerator('audio'):
_dict[f.name] = f'{f.path}/{f.name}' audio.append(_a.name)
return [ for i in range(num):
OptionChoice(name=choice, value=f'{_dict[choice]}') logger.info(f'Played {i+1} times')
for choice in _dict if current in choice.lower() await play_audio(f'audio/{random.choice(audio)}', self.bot, inter.author.voice.channel)
] else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
def setup(bot): # an extension must have a setup function @playaudio.autocomplete('audio')
bot.add_cog(Audio(bot)) # adding a cog async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str):
current = current.lower()
_dict: dict = {}
for f in ListGenerator('audio'):
_dict[f.name] = f'{f.path}/{f.name}'
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current in choice.lower()
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Audio(bot)) # adding a cog

View File

@@ -1,43 +1,43 @@
from typing import List from typing import List
import disnake import disnake
from disnake import OptionChoice from disnake import OptionChoice
from disnake.ext import commands from disnake.ext import commands
from loguru import logger
from integral_lib.Comands import write_json
from integral_lib.Logger import logger from lib.Comands import write_json
class Fun(commands.Cog, name='Fun'): class Fun(commands.Cog, name='Fun'):
def __init__(self, bot: commands.Bot): def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners @commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self): async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.') logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.has_permissions(administrator=True) @commands.has_permissions(administrator=True)
@commands.slash_command( @commands.slash_command(
name="set_bot_channel", name="set_bot_channel",
description="Set channel which iterate with src", description="Set channel which iterate with bot",
) )
async def set_bot_channel(self, inter: disnake.ApplicationCommandInteraction, channel: str): async def set_bot_channel(self, inter: disnake.ApplicationCommandInteraction, channel: str):
await write_json(inter.guild.id, await write_json(inter.guild.id,
"channel", "channel",
disnake.utils.find(lambda d: d.name == channel, inter.guild.channels).id) disnake.utils.find(lambda d: d.name == channel, inter.guild.channels).id)
await inter.response.send_message(f"Channel set up to {channel}", ephemeral=True) await inter.response.send_message(f"Channel set up to {channel}", ephemeral=True)
@set_bot_channel.autocomplete('channel') @set_bot_channel.autocomplete('channel')
async def _list_text_channels(self, async def _list_text_channels(self,
inter: disnake.ApplicationCommandInteraction, inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]: current: str) -> List[OptionChoice]:
_list = [r.name for r in inter.guild.text_channels] _list = [r.name for r in inter.guild.text_channels]
return [ return [
OptionChoice(name=choice, value=choice) OptionChoice(name=choice, value=choice)
for choice in _list if current in choice for choice in _list if current in choice
] ]
def setup(bot): # an extension must have a setup function def setup(bot): # an extension must have a setup function
bot.add_cog(Fun(bot)) # adding a cog bot.add_cog(Fun(bot)) # adding a cog

View File

@@ -1,29 +1,29 @@
import disnake import disnake
from disnake import Option from disnake import Option
from disnake.ext import commands from disnake.ext import commands
from loguru import logger
from integral_lib import YandexPlayer
from integral_lib.Logger import logger from lib import YandexPlayer
class Testing(commands.Cog, name='Testing'): class Testing(commands.Cog, name='Testing'):
def __init__(self, bot: commands.Bot): def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners @commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self): async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.') logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name='play', description='play audio test from yandex.music', options=[ @commands.slash_command(name='play', description='play audio test from yandex.music', options=[
Option(name='search', Option(name='search',
description='seach track/artist', description='seach track/artist',
required=True), required=True),
]) ])
async def play(self, inter: disnake.ApplicationCommandInteraction, search: str): async def play(self, inter: disnake.ApplicationCommandInteraction, search: str):
# TODO add yandex_music player with queue, playlists # TODO add yandex_music player with queue, playlists
result = YandexPlayer.search(search) result = YandexPlayer.search(search)
await inter.response.send_message(result, ephemeral=True) await inter.response.send_message(result, ephemeral=True)
def setup(bot): # an extension must have a setup function def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog bot.add_cog(Testing(bot)) # adding a cog

View File

@@ -1,59 +1,60 @@
import disnake import disnake
from disnake import Option, OptionType, Colour from disnake import Option, OptionType, Colour
from disnake.ext import commands from disnake.ext import commands
from integral_lib.Logger import logger # from lib.Logger import logger
from loguru import logger
class General(commands.Cog):
def __init__(self, bot: commands.Bot): class General(commands.Cog):
self.bot = bot # defining src as global var in class def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self): @commands.Cog.listener() # this is a decorator for events/listeners
logger.info(f'Cog {__name__.split(".")[1]} is ready!.') async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name="info", @commands.slash_command(
options=[ name="info",
Option("user", options=[
"Specify any user", Option("user",
OptionType.user), "Specify any user",
] OptionType.user),
) ]
async def info(self, inter: disnake.ApplicationCommandInteraction, user=None): )
""" async def info(self, inter: disnake.ApplicationCommandInteraction, user=None):
Shows user info {{SLASH_INFO}} """
Shows user info {{SLASH_INFO}}
Parameters
---------- Parameters
:param inter: ----------
:param user: :param inter:
:return: :param user:
""" :return:
user = user or inter.author """
user = user or inter.author
rolelist = [r.mention for r in user.roles if r != inter.guild.default_role]
if rolelist: rolelist = [r.mention for r in user.roles if r != inter.guild.default_role]
roles = "\n".join(rolelist) if rolelist:
else: roles = "\n".join(rolelist)
roles = "Not added any role" else:
roles = "Not added any role"
emb = disnake.Embed(
title="General information", emb = disnake.Embed(
description=f"General information on server about {user}", title="General information",
color=Colour.random() description=f"General information on server about {user}",
) color=Colour.random()
emb.set_thumbnail(url=user.display_avatar) )
emb.add_field(name="General info", emb.set_thumbnail(url=user.display_avatar)
value=f"Username: {user}\n" emb.add_field(name="General info",
f"Nickname: {user.nick}\n" value=f"Username: {user}\n"
f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False) f"Nickname: {user.nick}\n"
emb.add_field(name="Roles list", value=f"{roles}") f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
emb.set_footer(text=f"Information requested by: {inter.author.display_name}") emb.add_field(name="Roles list", value=f"{roles}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(General(bot)) # adding a cog def setup(bot): # an extension must have a setup function
bot.add_cog(General(bot)) # adding a cog

View File

@@ -1,44 +1,45 @@
import os import os
import disnake import disnake
import psutil import psutil
from disnake import ApplicationCommandInteraction from disnake.ext import commands
from disnake.ext import commands from loguru import logger
from __init__ import version_info as ver from __init__ import version_info as ver
from integral_lib.Comands import determine_prefix from lib.Comands import determine_prefix
from integral_lib.Logger import logger
# from lib.Logger import logger
class BotInfo(commands.Cog, name='Bot Info'):
def __init__(self, bot: commands.Bot): class BotInfo(commands.Cog, name='Bot Info'):
self.bot = bot # defining src as global var in class def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self): @commands.Cog.listener() # this is a decorator for events/listeners
logger.info(f'Cog {__name__.split(".")[1]} is ready!.') async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="info_bot",
description='Shows general info about src') # this is for making a command @commands.slash_command(name="info_bot",
async def info_bot(self, inter: ApplicationCommandInteraction): description='Shows general info about bot') # this is for making a command
_pid = os.getpid() async def info_bot(self, inter: disnake.ApplicationCommandInteraction):
_process = psutil.Process(_pid) _pid = os.getpid()
emb = disnake.Embed( _process = psutil.Process(_pid)
title="General information", emb = disnake.Embed(
description="General information on about src", title="General information",
) description="General information on about bot",
emb.set_thumbnail(self.bot.user.avatar.url) )
emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n" emb.set_thumbnail(self.bot.user.avatar.url)
f"CPU Usage: {_process.cpu_percent()}%\n" emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n"
f'Bot ping: {round(self.bot.latency * 1000)}\n' f"CPU Usage: {_process.cpu_percent()}%\n"
f'Prefix: `{determine_prefix(self.bot, inter)}\n`' f'Bot ping: {round(self.bot.latency * 1000)}\n'
) f'Prefix: `{determine_prefix(self.bot, inter)}\n`'
emb.add_field(name="Bot info:", value="Bot owner: <@386629192743256065>\n" )
f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}") emb.add_field(name="Bot info:", value="Bot owner: <@386629192743256065>\n"
emb.set_footer(text=f"Information requested by: {inter.author.display_name}") f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(BotInfo(bot)) # adding a cog def setup(bot): # an extension must have a setup function
bot.add_cog(BotInfo(bot)) # adding a cog

View File

45
lib/CogsPrep.py Normal file
View File

@@ -0,0 +1,45 @@
"""
lib.CogsPrepare
~~~~~~~~~~~~~
Loads, unloads Cogs files
cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
from os import listdir, rename
from typing import List
from disnake.ext import commands
# from .Logger import logger
from loguru import logger
async def cog_list(fold: str = './cogs') -> List[str]:
cogs_list = []
for _filename in listdir(fold):
if _filename.endswith('.py'):
cogs_list.append(_filename[:-3])
return cogs_list
async def work_with_cogs(what_do, bot: commands.Bot, cog):
if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if what_do == "load":
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Loaded cog {_filename}')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')
elif what_do == 'disable':
bot.unload_extension(f'cogs.{_filename}')
rename(f'cogs/{_filename}.py', f'cogs/disabled/{_filename}.py')
logger.info(f'Cog {_filename} stopped and disabled')
elif what_do == 'enable':
rename(f'cogs/disabled/{_filename}.py', f'cogs/{_filename}.py')
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} started and enabled')

View File

@@ -1,86 +1,73 @@
""" """
integral_lib.Commands lib.Commands
~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~
Some prepare for commands Some prepare for commands
"""
from json import load, decoder, dump, JSONDecodeError
""" from os import getenv
from json import load, decoder, dump, JSONDecodeError
from os import getenv from disnake.ext import commands
from disnake.ext import commands
def read_json(guild: int, _param: str):
"""
async def read_json(guild: int, _param: str): Reads Json file to determite config strings
""" :param guild: ID of Guild
Reads Json file to determite config strings :param _param: Parameter in json file
:param guild: ID of Guild :return: value of parameter.
:param _param: Parameter in json file """
:return: value of parameter. parameter = None
""" with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
parameter = None try:
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON _json = load(f) # Load the custom prefixes
try: except decoder.JSONDecodeError:
_json = load(f) # Load the custom prefixes _json = {}
except decoder.JSONDecodeError: if guild: # If the guild exists
_json = {} try:
if guild: # If the guild exists guild_conf = _json[f"{guild}"]
try: try:
guild_conf = _json[f"{guild}"] parameter = guild_conf[f"{_param}"]
try: except KeyError:
parameter = guild_conf[f"{_param}"] pass
except KeyError: except KeyError:
pass pass
except KeyError: return parameter
pass
return parameter
async def write_json(guild: int, param_name: str, param: str or int):
print(type(param))
async def write_json(guild: int, param_name: str, param: str or int): with open(getenv('CONF_FILE'), encoding='utf-8') as f:
""" try:
A function to write JSON data to a file after updating or adding a parameter value. _json = load(f)
except decoder.JSONDecodeError:
Parameters: _json = {}
try:
- guild: an integer representing the guild ID _guild = _json[f'{guild}']
- param_name: a string representing the parameter name except KeyError:
- param: a string or integer representing the parameter value _json.update({f'{guild}': {}})
_guild = _json[f'{guild}']
Returns: _guild.update({f'{param_name}': param})
This function does not return anything. _json.update({f'{guild}': _guild})
""" with open(getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
with open(getenv('CONF_FILE'), encoding='utf-8') as f: dump(_json, f, indent=4)
try:
_json = load(f)
except decoder.JSONDecodeError: def determine_prefix(bot: commands.Bot, msg):
_json = {} """
try: Determite per-server bot prefix
_guild = _json[f'{guild}'] :param bot: Disnake Bot object
except KeyError: :param msg: Disnake msg object
_json.update({f'{guild}': {}}) :return: prefix for server, default is $
_guild = _json[f'{guild}'] """
_guild.update({f'{param_name}': f'{param}'}) parameter = '$'
with open(getenv('CONF_FILE'), 'w', encoding='utf-8') as f: with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
dump(_json, f, indent=4) try:
_json = load(f) # Load the custom prefixes
except JSONDecodeError:
def determine_prefix(bot: commands.Bot, msg): _json = {}
""" try:
Determine the per-server src prefix based on the given message object. parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
:param bot: Disnake Bot object except KeyError:
:param msg: Disnake msg object pass
:return: prefix for server, default is $ return parameter
"""
parameter = '$'
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter

View File

@@ -1,134 +1,136 @@
from sqlite3 import connect, Error from sqlite3 import connect, Error
from .Logger import logger # from lib.Logger import logger
from loguru import logger
class DBReader:
class DBReader:
def __init__(self, guildid: int = None):
self._guildid = guildid def __init__(self, guildid: int = None):
self.list = self._read_db(self._guildid) self._guildid = guildid
self._current_index = 0 self.list = self._read_db(self._guildid)
self._current_index = 0
def __str__(self) -> str:
return str(self._guildid) def __str__(self) -> str:
return str(self._guildid)
@classmethod
def _read_db(cls, guildid: int) -> list: @classmethod
try: def _read_db(cls, guildid: int) -> list:
sql_con = connect("user.db") try:
cursor = sql_con.cursor() sql_con = connect("user.db")
cursor.execute(f"""SELECT * FROM "{guildid}" """) cursor = sql_con.cursor()
record = cursor.fetchall() cursor.execute("""SELECT * FROM "(guildid)" """,
return record {'guildid': guildid})
except Error as _e: record = cursor.fetchall()
logger.info(f'Error reading DB\n{_e}') return record
except Error as _e:
def __iter__(self): logger.info(f'Error reading DB\n{_e}')
return _ListGenerationIter(self)
def __iter__(self):
return _ListGenerationIter(self)
class _DBAttrs:
def __init__(self,
userid: int, class _DBAttrs:
username: str, def __init__(self,
nick: str, userid: int,
isbot: bool, username: str,
defaulttracks: None or list, nick: str,
usertracks: None or list): isbot: bool,
self.userid = userid defaulttracks: None or list,
self.username = username usertracks: None or list):
self.nick = nick self.userid = userid
self.isbot = isbot self.username = username
self.defaulttracks = defaulttracks self.nick = nick
self.usertracks = usertracks self.isbot = isbot
self.defaulttracks = defaulttracks
def __str__(self): self.usertracks = usertracks
return self.username
def __str__(self):
def __repr__(self): return self.username
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>' def __repr__(self):
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
class _ListGenerationIter:
def __init__(self, user_class):
self._current_index = 0 class _ListGenerationIter:
self._list = user_class.list def __init__(self, user_class):
self._current_index = 0
self._size = len(self._list) self._list = user_class.list
def __iter__(self): self._size = len(self._list)
return self
def __iter__(self):
def __next__(self): return self
if self._current_index < self._size:
_userid = self._list[self._current_index][0] def __next__(self):
_username = self._list[self._current_index][1] if self._current_index < self._size:
_nick = self._list[self._current_index][2] _userid = self._list[self._current_index][0]
_isbot = bool(self._list[self._current_index][3]) _username = self._list[self._current_index][1]
_defaulttracks = self._list[self._current_index][4] _nick = self._list[self._current_index][2]
_usertracks = self._list[self._current_index][5] _isbot = bool(self._list[self._current_index][3])
_defaulttracks = self._list[self._current_index][4]
self._current_index += 1 _usertracks = self._list[self._current_index][5]
memb = _DBAttrs(_userid,
_username, self._current_index += 1
_nick, memb = _DBAttrs(_userid,
_isbot, _username,
_defaulttracks, _nick,
_usertracks) _isbot,
return memb _defaulttracks,
raise StopIteration _usertracks)
return memb
raise StopIteration
async def prepare_db(guild: int):
try:
_connect = connect('user.db') async def prepare_db(guild: int):
cursor = _connect.cursor() try:
cursor.execute(f'CREATE TABLE IF NOT EXISTS "{guild}"\n' _connect = connect('user.db')
f' ([userid] INTEGER PRIMARY KEY, [username] TEXT,\n' cursor = _connect.cursor()
f' [nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT)') cursor.execute(f'''CREATE TABLE IF NOT EXISTS "{guild}"
cursor.close() ([userid] INTEGER PRIMARY KEY, [username] TEXT,
except Error as _error: [nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT)''')
logger.info(_error) cursor.close()
except Error as _error:
logger.info(_error)
async def work_with_db(db_func: str, data_turple: tuple):
"""
Writing to db per server userinfo async def work_with_db(db_func: str, data_turple: tuple):
:param db_func: """
:param data_turple: Writing to db per server userinfo
""" :param db_func:
try: :param data_turple:
_connect = connect('user.db') """
cursor = _connect.cursor() try:
cursor.execute(db_func, data_turple) _connect = connect('user.db')
_connect.commit() cursor = _connect.cursor()
cursor.close() cursor.execute(db_func, data_turple)
except Error as _error: _connect.commit()
logger.critical(_error) cursor.close()
except Error as _error:
logger.critical(_error)
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO '{guild}'
(username, userid, nick, isbot) async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
VALUES (?, ?, ?, ?)""") sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot) (username, userid, nick, isbot)
await work_with_db(sqlite_insert_with_param, data_tuple) VALUES (?, ?, ?, ?)""")
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
await work_with_db(sqlite_insert_with_param, data_tuple)
async def read_db(guild: int, user: int, column: str):
_col_dict = {'userid': 0,
'username': 1, async def read_db(guild: int, user: int, column: str):
'nick': 2, _col_dict = {'userid': 0,
'isbot': 3, 'username': 1,
'defaulttracks': 4, 'nick': 2,
'usertracks': 5} 'isbot': 3,
try: 'defaulttracks': 4,
sql_con = connect("user.db") 'usertracks': 5}
cursor = sql_con.cursor() try:
cursor.execute(f"""SELECT * FROM "{guild}" where userid = {user}""") sql_con = connect("user.db")
record = cursor.fetchone() cursor = sql_con.cursor()
return record[_col_dict[column]] cursor.execute(f"""SELECT * FROM "{guild}" where userid = {user}""")
except Error as _error: record = cursor.fetchone()
logger.critical(_error) return record[_col_dict[column]]
except Error as _error:
logger.critical(_error)

View File

@@ -1,86 +1,86 @@
from mimetypes import guess_type from mimetypes import guess_type
from os import walk from os import walk
class ListGenerator: class ListGenerator:
def __init__(self, path: str = None): def __init__(self, path: str = None):
self.path = path self.path = path
self.list: list = self._lister(self.path) self.list: list = self._lister(self.path)
try: try:
self._size = len(self.list) self._size = len(self.list)
except TypeError: except TypeError:
pass pass
self._current_index = 0 self._current_index = 0
def __str__(self) -> str: def __str__(self) -> str:
return 'Audio iter generator' return 'Audio iter generator'
@classmethod @classmethod
def _lister(cls, path) -> list: def _lister(cls, path) -> list:
_list: list = [] _list: list = []
try: try:
for f in walk(path): for f in walk(path):
_list.extend(f) _list.extend(f)
break break
return sorted(_list[2]) return sorted(_list[2])
except TypeError: except TypeError:
pass pass
def __iter__(self): def __iter__(self):
return _ListGenerationIter(self) return _ListGenerationIter(self)
def __next__(self): def __next__(self):
if self._current_index < self._size: if self._current_index < self._size:
self._current_index += 1 self._current_index += 1
class _FileAttrs: class _FileAttrs:
def __init__(self, name, path, mimetype, exc): def __init__(self, name, path, mimetype, exc):
self.name = name self.name = name
self.path = path self.path = path
self.mimetype = mimetype self.mimetype = mimetype
self.exc = exc self.exc = exc
def __str__(self): def __str__(self):
return self.name return self.name
def __repr__(self): def __repr__(self):
return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >' return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >'
class _ListGenerationIter: class _ListGenerationIter:
def __init__(self, list_class): def __init__(self, list_class):
self._current_index = 0 self._current_index = 0
self._list = list_class.list self._list = list_class.list
self._name = self._list[self._current_index] self._name = self._list[self._current_index]
self._path = list_class.path self._path = list_class.path
self._size = len(self._list) self._size = len(self._list)
def __iter__(self): def __iter__(self):
return self return self
@property @property
def type(self) -> str: def type(self) -> str:
guess = guess_type(f'{self._path}/{self._list[self._current_index]}')[0] guess = guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
return guess return guess
@property @property
def _exc(self) -> str: def _exc(self) -> str:
try: try:
_ret = self._list[self._current_index].split('.')[-1] _ret = self._list[self._current_index].split('.')[-1]
except AttributeError: except AttributeError:
_ret = None _ret = None
return _ret return _ret
def __next__(self): def __next__(self):
if self._current_index < self._size: if self._current_index < self._size:
_name = self._list[self._current_index] _name = self._list[self._current_index]
_path = self._path _path = self._path
_type = self.type _type = self.type
_exc = self._exc _exc = self._exc
self._current_index += 1 self._current_index += 1
_memb = _FileAttrs(_name, _path, _type, _exc) _memb = _FileAttrs(_name, _path, _type, _exc)
return _memb return _memb
raise StopIteration raise StopIteration

View File

@@ -1,17 +1,20 @@
from asyncio import sleep from asyncio import sleep
from disnake import FFmpegPCMAudio from disnake import FFmpegOpusAudio
from loguru import logger
from .Logger import logger
# from .Logger import logger
async def play_audio(audio, bot, vc):
if not bot.voice_clients:
logger.error(f'Playing: {audio}') @logger.catch
vp = await vc.connect() async def play_audio(audio, bot, vc):
if not vp.is_playing(): if not bot.voice_clients:
vp.play(FFmpegPCMAudio(f'{audio}', )) logger.error(f'Playing: {audio}')
while vp.is_playing(): vp = await vc.connect()
await sleep(0.5) if not vp.is_playing():
await sleep(1) vp.play(FFmpegOpusAudio(f'{audio}', executable='ffmpeg', options='-nostats -loglevel 0'))
await vp.disconnect() while vp.is_playing():
await sleep(0.5)
await sleep(1)
await vp.disconnect()

5
lib/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""
lib
~~~~~~~~~~~~~
Some libs for the bot which help him
"""

6
qodana.yaml Normal file
View File

@@ -0,0 +1,6 @@
version: "1.0"
bootstrap: |
rm -rf .idea
pip install -U -r requirements.txt
profile:
name: qodana.recommended

Binary file not shown.

View File

@@ -1,78 +0,0 @@
"""
integral_lib.CogsPrepare
~~~~~~~~~~~~~
Loads, unloads Cogs files
cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
from os import listdir, rename
from typing import List
from disnake.ext.commands.interaction_bot_base import InteractionBotBase
from .Logger import logger
def cog_list(fold: str = './cogs') -> List[str]:
"""
A function that generates a list of cog names based on the files present in a specified folder.
Parameters:
- fold (str): The directory path where the cog files are located. Defaults to './cogs'.
Returns:
- List[str]: A list of cog names without the '.py' extension.
"""
cogs_list = []
for _filename in listdir(fold):
if _filename.endswith('.py'):
cogs_list.append(_filename[:-3])
return cogs_list
def work_with_cogs(what_do: str,
bot: InteractionBotBase,
cog: str | list):
"""
Perform the specified action on the given cog or list of cogs.
Args:
what_do (str): The action to perform (load, unload, reload, disable, enable).
src (InteractionBotBase): The src instance to work with.
cog (str | list): The name of the cog or a list of cogs to work with.
Raises:
ValueError: If the action is not recognized.
Returns:
None
--------
:param cog: str | list
:param bot: InteractionBotBase
:param what_do: str = ['load', 'unload', 'reload', 'disable', 'enable']
"""
if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if _filename.endswith('.py'):
_filename = _filename.split('.')[0]
if what_do == "load":
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} loaded')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')
elif what_do == 'disable':
bot.unload_extension(f'cogs.{_filename}')
rename(f'cogs/{_filename}.py',
f'cogs/disabled/{_filename}.py')
logger.info(f'Cog {_filename} stopped and disabled')
elif what_do == 'enable':
rename(f'cogs/disabled/{_filename}.py',
f'cogs/{_filename}.py')
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} started and enabled')
else:
raise ValueError(f"Unrecognized action: {what_do}")

View File

@@ -1,52 +0,0 @@
import logging
class _CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
fmt = "%(asctime)s - [%(levelname)s] -%(module)s- %(message)s"
FORMATS = {
logging.DEBUG: grey + fmt + reset,
logging.INFO: grey + fmt + reset,
logging.WARNING: yellow + fmt + reset,
logging.ERROR: red + fmt + reset,
logging.CRITICAL: bold_red + fmt + reset
}
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs):
record = _CustomFormatter.old_factory(*args, **kwargs)
_module = record.module
_levelname = record.levelname
if len(_module) % 2 == 0 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2)
elif len(_module) % 2 == 1 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1)
if len(_levelname) % 2 == 0 and len(_levelname) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2)
elif len(record.levelname) % 2 == 1 and len(record.module) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1)
record.module = _module
record.levelname = _levelname
return record
logging.setLogRecordFactory(_record_factory)
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S")
return formatter.format(record)
logger = logging.getLogger('Pisya_Bot')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(_CustomFormatter())
logger.addHandler(ch)

View File

@@ -1,5 +0,0 @@
"""
integral_lib
~~~~~~~~~~~~~
Some libs for the src which help him
"""

Binary file not shown.

View File

@@ -1,41 +0,0 @@
from disnake import Intents
from disnake.ext import commands
intents = Intents(messages=True,
message_content=True,
)
class Cog(commands.Cog):
@commands.command()
async def my_cmd(self, ctx):
pass
class CogTwo(commands.Cog):
async def bot_check(self, ctx) -> bool:
pass
class CogThree(commands.Cog):
async def bot_check_once(self, ctx) -> bool:
pass
class CogFour(commands.Cog):
async def bot_slash_command_check(self, ctx) -> bool:
pass
class CogFive(commands.Cog):
async def can_run(self, ctx) -> bool:
pass
def test_bot():
bot = commands.Bot(command_prefix='$', intents=intents)
bot.add_cog(Cog())
bot.add_cog(CogTwo())
bot.add_cog(CogThree())
bot.add_cog(CogFour())
bot.add_cog(CogFive())

View File

@@ -1,31 +0,0 @@
import sys
import pytest
from disnake.ext.commands.common_bot_base import CommonBotBase
from mock import mock
sys.path.append('src')
from integral_lib import CogsPrep
def test_cog_list():
with mock.patch('integral_lib.CogsPrep.listdir') as MockClass:
MockClass.return_value = ['cog1.py', 'cog2.py']
result = CogsPrep.cog_list()
assert result == ['cog1', 'cog2']
@pytest.mark.parametrize("cog", ["cog1.py", "cog2"])
@pytest.mark.parametrize("what_do", ['load', 'unload', 'reload', 'disable', 'enable'])
def test_work_with_cogs(what_do, cog):
with mock.patch('integral_lib.CogsPrep.rename') as mock_rename:
mock_rename.return_value = None
mock_bot = mock.MagicMock(spec=CommonBotBase)
result = CogsPrep.work_with_cogs(what_do, mock_bot, cog)
if what_do in ['load', 'enable']:
assert mock_bot.load_extension.called
elif what_do in ['unload', 'disable']:
assert mock_bot.unload_extension.called
elif what_do == 'reload':
assert mock_bot.reload_extension.called