9 Commits
v0.1 ... master

Author SHA1 Message Date
Slava
0946a7f51e Update .gitlab-ci.yml file 2024-05-28 21:24:14 +00:00
Slava
3e0a0c197a Update .gitlab-ci.yml file 2024-05-28 19:58:20 +00:00
Slava
b43c5cf5c1 Удалить check.py 2024-05-28 19:56:55 +00:00
bot
a136491362 restore 2024-05-28 22:55:47 +03:00
bacon
413736b44d changed logger 2024-04-13 20:03:57 +03:00
bot
78be10a88b v0.0.8 2024-03-25 10:14:57 +03:00
Slava
c3140403b5 Update .gitlab-ci.yml file 2024-03-16 17:51:18 +00:00
Slava
c60502a241 Update .gitlab-ci.yml file 2024-03-16 10:46:43 +00:00
Slava
17f1972104 Update .gitlab-ci.yml file 2024-03-16 10:43:16 +00:00
54 changed files with 996 additions and 1255 deletions

View File

@@ -1,11 +0,0 @@
/tmp/
audio/*/
/.idea
user.db
*.pyc
/.run/
.env
*.exe
/venv/
config.json
.editorconfig

11
.gitignore vendored
View File

@@ -1,11 +1,14 @@
/tmp/
audio/*/
/audio/*/
/.idea
user.db
/user.db
*.pyc
/.run/
.env
/.env
*.exe
/venv/
/fun_and_admin_bot.egg-info/
/.YMcache/
config.json
.editorconfig
/env/
/.project

View File

@@ -1,111 +1,19 @@
# You can override the included template(s) by including variable overrides
# SAST customization: https://docs.gitlab.com/ee/user/application_security/sast/#customizing-the-sast-settings
# Secret Detection customization: https://docs.gitlab.com/ee/user/application_security/secret_detection/#customizing-settings
# Dependency Scanning customization: https://docs.gitlab.com/ee/user/application_security/dependency_scanning/#customizing-the-dependency-scanning-settings
# Container Scanning customization: https://docs.gitlab.com/ee/user/application_security/container_scanning/#customizing-the-container-scanning-settings
# Note that environment variables can be set in several places
# See https://docs.gitlab.com/ee/ci/variables/#cicd-variable-precedence
stages:
- 'test'
- 'build'
- 'push'
- 'code_quality'
- 'deploy'
pytest:
sast:
stage: 'test'
image: python
only:
- test
- tags
cache:
paths:
- $CI_PROJECT_DIR/venv/
before_script:
- cd $CI_PROJECT_DIR
- python -m venv venv
- chmod u+x venv/bin/activate
- source venv/bin/activate
- pip install --upgrade --quiet pip
- pip install --quiet -U -r requirements.txt
- pip install --quiet -U -r tests/requirements.txt
script:
- pytest -v
variables:
# DOCKER_HOST: tcp://docker:2375
# fill those if you have a proxy in your environment
DOCKER_DRIVER: overlay2
# See https://github.com/docker-library/docker/pull/166
DOCKER_TLS_CERTDIR: "/certs"
make_image:
stage: build
needs:
- pytest
image: docker:26.1.3
services:
- docker:26.1.3-dind
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
# fetches the latest image (not failing if image is not found)
- docker pull $CI_REGISTRY_IMAGE:latest || true
# builds the project, passing proxy variables, using OCI labels
# notice the cache-from, which is going to use the image we just pulled locally
# the built image is tagged locally with the commit SHA, and then pushed to
# the GitLab registry
- >
DOCKER_BUILDKIT=1 docker build
--pull
--cache-from $CI_REGISTRY_IMAGE:latest
--label "org.opencontainers.image.title=$CI_PROJECT_TITLE"
--label "org.opencontainers.image.url=$CI_PROJECT_URL"
--label "org.opencontainers.image.created=$CI_JOB_STARTED_AT"
--label "org.opencontainers.image.revision=$CI_COMMIT_SHA"
--label "org.opencontainers.image.version=$CI_COMMIT_REF_NAME"
--tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
.
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
# Here, the goal is to tag the "master" branch as "latest"
Push latest:
variables:
# We are just playing with Docker here.
# We do not need GitLab to clone the source code.
GIT_STRATEGY: none
stage: push
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
# Because we have no guarantee that this job will be picked up by the same runner
# that built the image in the previous step, we pull it again locally
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
# Then we tag it "latest"
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA $CI_REGISTRY_IMAGE:latest
# Annnd we push it.
- docker push $CI_REGISTRY_IMAGE:latest
# Finally, the goal here is to Docker tag any Git tag
# GitLab will start a new pipeline everytime a Git tag is created, which is pretty awesome
Push commit:
variables:
# Again, we do not need the source code here. Just playing with Docker.
GIT_STRATEGY: none
stage: push
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_NAME
Push tag:
variables:
# Again, we do not need the source code here. Just playing with Docker.
GIT_STRATEGY: none
stage: push
only:
# We want this job to be run on tags only.
- tags
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
script:
- docker pull $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA
- docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG
include:
- template: Security/SAST.gitlab-ci.yml
- template: Security/Dependency-Scanning.gitlab-ci.yml
- template: Security/SAST-IaC.gitlab-ci.yml

2
CHANGELOG Normal file
View File

@@ -0,0 +1,2 @@
0.0.5
Initial

View File

@@ -1,22 +0,0 @@
FROM python:3.12.2-slim-bookworm as builder
LABEL authors="bacon"
WORKDIR /app
COPY requirements.txt /app
RUN pip wheel install \
--no-cache-dir -q \
--no-deps --wheel-dir /app/wheels \
-r requirements.txt
FROM python:3.12.2-slim-bookworm as runner
WORKDIR /app
COPY --from=builder /app/wheels /wheels
RUN apt-get update -qq && \
apt-get install -qq --no-install-recommends ffmpeg -y && \
apt-get clean -qq autoclean && \
pip install --no-cache -q /wheels/*
COPY src/ /app
ENTRYPOINT ["python", "bot.py"]
#ENTRYPOINT ["ffmpeg"]

View File

@@ -1,16 +1,16 @@
__version__ = '0.0.7'
__title__ = "src"
__author__ = "baconborn"
from typing import NamedTuple, Literal
class VersionInfo(NamedTuple):
major: int
minor: int
micro: int
releaselevel: Literal["alpha", "beta", "candidate", "final"]
serial: int
version_info: VersionInfo = VersionInfo(major=0, minor=0, micro=7, releaselevel="alpha", serial=0)
__version__ = '0.0.8'
__title__ = "Pisya_bot"
__author__ = "baconborn"
from typing import NamedTuple, Literal
class VersionInfo(NamedTuple):
major: int
minor: int
micro: int
releaselevel: Literal["alpha", "beta", "candidate", "final"]
serial: int
version_info: VersionInfo = VersionInfo(major=0, minor=0, micro=8, releaselevel="alpha", serial=0)

319
src/bot.py → bot.py Normal file → Executable file
View File

@@ -1,158 +1,161 @@
#!/usr/bin/env python3
from os import getenv
from os.path import isfile
from disnake import OptionType, Option, Localized, ApplicationCommandInteraction, Intents, __version__
from disnake.ext.commands import Bot, is_owner
from dotenv import load_dotenv
from integral_lib.CogsPrep import work_with_cogs, cog_list
from integral_lib.Comands import determine_prefix
from integral_lib.Logger import logger
load_dotenv()
if not isfile('.env') or not getenv('CONF_FILE'):
with open('.env', 'a', encoding='utf-8') as f:
f.write("CONF_FILE='config.json'\n")
load_dotenv()
if not isfile(getenv('CONF_FILE')):
with open(getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
f.write("")
intents = Intents(messages=True,
guilds=True,
message_content=True,
voice_states=True,
members=True,
presences=True
)
bot = Bot(command_prefix=determine_prefix,
intents=intents,
reload=True,
test_guilds=[648126669122568215]
)
bot.i18n.load("locale/")
work_with_cogs('load', bot, cog_list())
@bot.event
async def on_ready():
logger.info('Bot started')
logger.info(f'Disnake version {__version__}')
logger.info(f'We have logged in as {bot.user}')
@bot.slash_command(
name='cog',
options=[
Option(
name=Localized('cog', key="COG".lower()),
description=Localized("cog file", key="COG_FILE"),
type=OptionType.string
)
]
)
@is_owner()
async def slash_cogs(inter: ApplicationCommandInteraction):
"""
Working with cogs (modules) {{SLASH_COG}}
Parameters
----------
:param inter:
"""
pass
@slash_cogs.sub_command(description=Localized("Enables Cog", key="ENABLE_COG"))
async def enable(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('enable', bot, cog)
await inter.response.send_message(f'Cog {cog} is enabled', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Disables Cog", key="DISABLE_COG"))
async def disable(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('disable', bot, cog)
await inter.response.send_message(f'Cog {cog} is disabled', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Loads Cog", key="LOAD_COG"))
async def load(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('load', bot, cog)
await inter.response.send_message(f'Cog {cog} is loaded', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Unload Cog", key="UNLOAD_COG"))
async def unload(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('unload', bot, cog)
await inter.response.send_message(f'Cog {cog} is unload', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Reloads Cog", key="RELOAD_COG"))
async def reload(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('reload', bot, cog)
await inter.response.send_message(f'Cog {cog} is reloaded', ephemeral=True)
@disable.autocomplete('cog')
@unload.autocomplete('cog')
@load.autocomplete('cog')
@reload.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = cog_list(fold='./cogs/')
return [choice for choice in _list if current in choice.lower()]
@enable.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = cog_list(fold='./cogs/disabled/')
return [choice for choice in _list if current in choice.lower()]
@slash_cogs.error
async def cogs_error(inter: ApplicationCommandInteraction):
await inter.response.send_message(Localized("Error", key="EROR"), ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func')
bot.run(getenv('TOKEN'))
#!/usr/bin/env python3
import asyncio
from os import getenv
from os.path import isfile
from disnake import OptionType, Option, Localized, ApplicationCommandInteraction, Intents, __version__
from disnake.ext.commands import Bot, is_owner
from dotenv import load_dotenv
from loguru import logger
from __init__ import __version__ as ver
from lib.CogsPrep import work_with_cogs, cog_list
from lib.Comands import determine_prefix
load_dotenv()
if not isfile('.env') or not getenv('CONF_FILE'):
with open('.env', 'a', encoding='utf-8') as f:
f.write("CONF_FILE='config.json'\n")
load_dotenv()
if not isfile(getenv('CONF_FILE')):
with open(getenv('CONF_FILE'), 'a', encoding='utf-8') as f:
f.write("")
intents = Intents(messages=True,
guilds=True,
message_content=True,
voice_states=True,
members=True,
presences=True
)
bot = Bot(command_prefix=determine_prefix,
intents=intents,
reload=True,
test_guilds=[648126669122568215]
)
bot.i18n.load("locale/")
asyncio.run(work_with_cogs('load', bot, asyncio.run(cog_list())))
@bot.event
async def on_ready():
logger.info('Bot started')
logger.info(f'Disnake version {__version__}')
logger.info(f'We have logged in as {bot.user}')
logger.info(f'Version of bot is - v{ver}')
@bot.slash_command(
name='cog',
options=[
Option(
name=Localized('cog', key="COG".lower()),
description=Localized("cog file", key="COG_FILE"),
type=OptionType.string
)
]
)
@is_owner()
async def slash_cogs(inter: ApplicationCommandInteraction):
"""
Working with cogs (modules) {{SLASH_COG}}
Parameters
----------
:param inter:
"""
pass
@slash_cogs.sub_command(description=Localized("Enables Cog", key="ENABLE_COG"))
async def enable(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('enable', bot, cog)
await inter.response.send_message(f'Cog {cog} is enabled', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Disables Cog", key="DISABLE_COG"))
async def disable(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('disable', bot, cog)
await inter.response.send_message(f'Cog {cog} is disabled', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Loads Cog", key="LOAD_COG"))
async def load(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('load', bot, cog)
await inter.response.send_message(f'Cog {cog} is loaded', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Unload Cog", key="UNLOAD_COG"))
async def unload(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('unload', bot, cog)
await inter.response.send_message(f'Cog {cog} is unload', ephemeral=True)
@slash_cogs.sub_command(description=Localized("Reloads Cog", key="RELOAD_COG"))
async def reload(inter: ApplicationCommandInteraction, cog: str):
"""
Parameters
----------
:param inter:
:param cog: Select Cogfile {{COG_FILE}}
"""
await work_with_cogs('reload', bot, cog)
await inter.response.send_message(f'Cog {cog} is reloaded', ephemeral=True)
@disable.autocomplete('cog')
@unload.autocomplete('cog')
@load.autocomplete('cog')
@reload.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = await cog_list(fold='./cogs/')
return [choice for choice in _list if current in choice.lower()]
@enable.autocomplete('cog')
async def _cog_opt(inter: ApplicationCommandInteraction, current: str):
current = current.lower()
_list = await cog_list(fold='./cogs/disabled/')
return [choice for choice in _list if current in choice.lower()]
@slash_cogs.error
async def cogs_error(inter: ApplicationCommandInteraction):
await inter.response.send_message(Localized("Error", key="EROR"), ephemeral=True)
logger.error(f'User {inter.author} tries to use cogs func')
bot.run(getenv('TOKEN'))

View File

@@ -1,148 +1,151 @@
from asyncio import sleep
import disnake
from disnake import Option, OptionType, Localized
from disnake.ext import tasks
from integral_lib.Comands import *
from integral_lib.DB_Worker import fill_bd, prepare_db, work_with_db
from integral_lib.Logger import logger
class Admin(commands.Cog, name='Admin'):
def __init__(self, bot: commands.Bot):
self.bot = bot # a defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
for g in self.bot.get_all_members():
await prepare_db(g.guild.id)
for g in self.bot.get_all_members():
await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
self.activity.start()
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@tasks.loop(seconds=20)
async def activity(self):
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at users: {str(len(self.bot.users))}',
type=3
)
)
await sleep(10)
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at servers: {str(len(self.bot.guilds))}',
type=3
)
)
@commands.Cog.listener()
async def on_member_update(self, before: disnake.Member, after: disnake.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id)
await work_with_db(sql_update_query, data_tuple)
@commands.Cog.listener()
async def on_guild_join(self, guild):
for g in guild.members:
await fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@commands.Cog.listener()
async def on_member_join(self, member):
await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
bot_role = read_json(member.guild.id, 'bot_role') # Get src role
guest_role = read_json(member.guild.id, 'guest_role') # Get guest role
if bot_role or guest_role:
if member.bot == 0:
role = disnake.utils.get(member.guild.roles, id=guest_role)
else:
role = disnake.utils.get(member.guild.roles, id=bot_role)
logger.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@commands.slash_command(
name="set_default_role",
description=Localized("Set Default src role", key="DEF_ROLE"),
options=[
Option("role",
"Specify role",
OptionType.role,
required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_guest_role(self, inter: disnake.ApplicationCommandInteraction, role):
await write_json(inter.guild.id, "guest_role", role.id)
await inter.response.send_message(f"Set up src role to: `{role.name}`", ephemeral=True)
@commands.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_set_prefix(self, ctx, prefix: str):
await write_json(ctx.guild.id, "prefix", prefix)
await ctx.reply(f"Prefix set to: `{prefix}`")
@commands.guild_only()
@commands.slash_command(
name="set_prefix",
description="Setting up src prefix",
options=[
Option("prefix",
"Specify prefix",
OptionType.string,
required=True),
]
)
@commands.has_permissions(administrator=True)
async def slash_set_prefix(self, inter: disnake.ApplicationCommandInteraction, prefix: str):
await write_json(inter.guild.id, "prefix", prefix)
await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True)
@commands.guild_only()
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_trigger_role",
description=Localized("Setting up role to trigger src", key="KEY_ROLE"),
options=[
Option("role",
Localized("Specify role", key="SETUP_ROLE"),
OptionType.role,
required=True),
]
)
async def set_trigger_role(self, inter: disnake.ApplicationCommandInteraction, role):
await write_json(inter.guild.id, "tigger_role", role.id)
await inter.response.send_message(f"Role set to: `{role.name}`", ephemeral=True)
@commands.slash_command(
name="set_bot_role",
description=Localized("Set src role", key="BOT_ROLE"),
options=[
Option("role",
Localized("Specify role", key="SETUP_ROLE"),
OptionType.role,
required=True)
]
)
@commands.guild_only()
@commands.has_permissions(administrator=True)
async def set_bot_role(self, ctx, role):
await write_json(ctx.guild.id,
"bot_role",
role.id)
await ctx.send(f"Set up src role to: `{role.name}`", ephemeral=True)
@set_bot_role.error
@set_trigger_role.error
@slash_set_prefix.error
async def set_prefix_error(self, inter: disnake.ApplicationCommandInteraction, prefix):
await inter.response.send_message("You don`t have permissions", ephemeral=True)
logger.error(prefix)
def setup(bot): # an extension must have a setup function
bot.add_cog(Admin(bot)) # adding a cog
from asyncio import sleep
import disnake
from disnake import Option, OptionType, Localized
from disnake.ext import commands, tasks
from loguru import logger
from lib.Comands import read_json, write_json
from lib.DB_Worker import fill_bd, prepare_db, work_with_db
class Admin(commands.Cog, name='Admin'):
def __init__(self, bot: commands.Bot):
self.bot = bot # a defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
for g in self.bot.get_all_members():
await prepare_db(g.guild.id)
for g in self.bot.get_all_members():
await fill_bd(g.name, g.id, g.bot, g.nick, g.guild.id)
self.activity.start()
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@tasks.loop(seconds=20)
async def activity(self):
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at users: {str(len(self.bot.users))}',
type=3
)
)
await sleep(10)
await self.bot.change_presence(
activity=disnake.Activity(
name=f'at servers: {str(len(self.bot.guilds))}',
type=3
)
)
@commands.Cog.listener()
async def on_member_update(self, before: disnake.Member, after: disnake.Member):
sql_update_query = f"""UPDATE "{after.guild.id}" set nick = ? where userid = ?"""
data_tuple = (after.nick, before.id)
await work_with_db(sql_update_query, data_tuple)
@commands.Cog.listener()
async def on_guild_join(self, guild):
for g in guild.members:
await fill_bd(g.name, g.id, g.bot, g.nick, guild.id)
@commands.Cog.listener()
async def on_member_join(self, member):
await fill_bd(member.name, member.id, member.bot, member.nick, member.guild.id)
bot_role = read_json(member.guild.id, 'bot_role') # Get bot role
guest_role = read_json(member.guild.id, 'guest_role') # Get guest role
logger.info(f"Bot role: {bot_role} | Guest role: {guest_role}")
logger.info(f'type bot_role: {type(bot_role)} | type guest_role: {type(guest_role)}')
if bot_role or guest_role:
if member.bot == 0:
role = disnake.utils.get(member.guild.roles, id=guest_role)
else:
role = disnake.utils.get(member.guild.roles, id=bot_role)
logger.info(f"Adding to {member} role {role}")
await member.add_roles(role)
@commands.slash_command(
name="set_default_role",
description=Localized("Set Default bot role", key="DEF_ROLE"),
options=[
Option("role",
"Specify role",
OptionType.role,
required=True),
]
)
@commands.has_permissions(administrator=True)
async def set_guest_role(self, inter: disnake.ApplicationCommandInteraction, role):
print(type(role.id))
await write_json(inter.guild.id, "guest_role", role.id)
await inter.response.send_message(f"Set up bot role to: `{role.name}`", ephemeral=True)
@commands.command(name="set_prefix")
@commands.has_permissions(administrator=True)
async def command_set_prefix(self, ctx, prefix: str):
await write_json(ctx.guild.id, "prefix", prefix)
await ctx.reply(f"Prefix set to: `{prefix}`")
@commands.guild_only()
@commands.slash_command(
name="set_prefix",
description="Setting up bot prefix",
options=[
Option("prefix",
"Specify prefix",
OptionType.string,
required=True),
]
)
@commands.has_permissions(administrator=True)
async def slash_set_prefix(self, inter: disnake.ApplicationCommandInteraction, prefix: str):
await write_json(inter.guild.id, "prefix", prefix)
await inter.response.send_message(f"Prefix set to: `{prefix}`", ephemeral=True)
@commands.guild_only()
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_trigger_role",
description=Localized("Setting up role to trigger bot", key="KEY_ROLE"),
options=[
Option("role",
Localized("Specify role", key="SETUP_ROLE"),
OptionType.role,
required=True),
]
)
async def set_trigger_role(self, inter: disnake.ApplicationCommandInteraction, role):
await write_json(inter.guild.id, "tigger_role", role.id)
await inter.response.send_message(f"Role set to: `{role.name}`", ephemeral=True)
@commands.slash_command(
name="set_bot_role",
description=Localized("Set bot role", key="BOT_ROLE"),
options=[
Option("role",
Localized("Specify role", key="SETUP_ROLE"),
OptionType.role,
required=True)
]
)
@commands.guild_only()
@commands.has_permissions(administrator=True)
async def set_bot_role(self, ctx, role):
await write_json(ctx.guild.id,
"bot_role",
role.id)
await ctx.send(f"Set up bot role to: `{role.name}`", ephemeral=True)
@set_bot_role.error
@set_trigger_role.error
@slash_set_prefix.error
async def set_prefix_error(self, inter: disnake.ApplicationCommandInteraction, prefix):
await inter.response.send_message("You don`t have permissions", ephemeral=True)
logger.error(prefix)
def setup(bot): # an extension must have a setup function
bot.add_cog(Admin(bot)) # adding a cog

View File

@@ -1,84 +1,96 @@
import random
import disnake
from disnake import OptionChoice, Option, OptionType, Member, VoiceState, ApplicationCommandInteraction
from disnake.ext import commands
from integral_lib.ListGenerator import ListGenerator
from integral_lib.Logger import logger
from integral_lib.Player import play_audio
class Audio(commands.Cog, name='Audio'):
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.Cog.listener()
async def on_voice_state_update(self, member: Member,
before: VoiceState,
after: VoiceState):
if before.channel is None and not member.bot:
logger.info(f'Coneccted {member.name} to {after.channel.name}')
if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members):
logger.info('Skip playing by Game')
else:
# Prepare list of audio
from integral_lib.Comands import read_json
_role = await read_json(member.guild.id, 'tigger_role')
audio: list = []
for _a in ListGenerator('audio'):
audio.append(_a.name)
if len(member.roles) == 1 or _role is None:
logger.info('Skip playing by role')
elif any(str(role.id) in _role for role in member.roles):
logger.info('Play audio from list by role')
await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel)
else:
logger.info('Skip playing by any else')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command",
options=[
Option(name="audio",
type=OptionType.string,
required=True
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}', ephemeral=True)
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: ApplicationCommandInteraction, current: str):
"""
Asynchronously generates a list of OptionChoices for the given audio files based on the user input.
Parameters:
- inter: disnake.ApplicationCommandInteraction - The interaction context for the command.
- current: str - The current user input to filter the audio file choices.
Returns:
- list[OptionChoice] - A list of OptionChoice objects representing the available audio file choices.
:param current:
:param inter: ApplicationCommandInteraction
"""
current = current.lower()
_dict: dict = {}
for f in ListGenerator('audio'):
_dict[f.name] = f'{f.path}/{f.name}'
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current in choice.lower()
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Audio(bot)) # adding a cog
import random
import disnake
from disnake import OptionChoice, Option, OptionType, Member, VoiceState
from disnake.ext import commands
from disnake.utils import get
from loguru import logger
from lib.ListGenerator import ListGenerator
from lib.Player import play_audio
class Audio(commands.Cog, name='Audio'):
def __init__(self, bot: commands.Bot):
self.bot = bot
@commands.Cog.listener()
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.Cog.listener()
async def on_voice_state_update(self, member: Member,
before: VoiceState,
after: VoiceState):
if before.channel is None and not member.bot:
logger.info(f'Coneccted {member.name} to {after.channel.name}')
if any('Escape from Tarkov' in str(user.activity) for user in after.channel.members):
logger.info('Skip playing by Game')
else:
# Prepare list of audio
from lib.Comands import read_json
_role = get(member.guild.roles, id=read_json(member.guild.id, 'tigger_role'))
audio: list = []
for _a in ListGenerator('audio'):
audio.append(_a.name)
if len(member.roles) == 1 or _role is None:
logger.info('Skip playing by role')
elif _role in member.roles:
logger.info('Play audio from list by role')
await play_audio(f'audio/{random.choice(audio)}', self.bot, after.channel)
else:
logger.info('Skip playing by any else')
@commands.slash_command(name="play_audio",
description="Make possible playing audio by command",
options=[
Option(name="audio",
type=OptionType.string,
required=True
)
])
async def playaudio(self, inter: disnake.ApplicationCommandInteraction,
audio: str
):
if inter.author.voice is not None:
await inter.response.send_message(f'Played {audio}', ephemeral=True)
await play_audio(audio, self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@commands.slash_command(name="play_random",
description="Make possible playing audio by command",
options=[
Option(name="num",
type=OptionType.integer,
)
])
async def playrandom(self, inter: disnake.ApplicationCommandInteraction,
num: int
):
if inter.author.voice is not None:
audio: list = []
for _a in ListGenerator('audio'):
audio.append(_a.name)
for i in range(num):
logger.info(f'Played {i+1} times')
await play_audio(f'audio/{random.choice(audio)}', self.bot, inter.author.voice.channel)
else:
await inter.response.send_message('You`re not in voice', ephemeral=True)
@playaudio.autocomplete('audio')
async def list_to_play(self, inter: disnake.ApplicationCommandInteraction, current: str):
current = current.lower()
_dict: dict = {}
for f in ListGenerator('audio'):
_dict[f.name] = f'{f.path}/{f.name}'
return [
OptionChoice(name=choice, value=f'{_dict[choice]}')
for choice in _dict if current in choice.lower()
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Audio(bot)) # adding a cog

View File

@@ -1,43 +1,43 @@
from typing import List
import disnake
from disnake import OptionChoice
from disnake.ext import commands
from integral_lib.Comands import write_json
from integral_lib.Logger import logger
class Fun(commands.Cog, name='Fun'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_bot_channel",
description="Set channel which iterate with src",
)
async def set_bot_channel(self, inter: disnake.ApplicationCommandInteraction, channel: str):
await write_json(inter.guild.id,
"channel",
disnake.utils.find(lambda d: d.name == channel, inter.guild.channels).id)
await inter.response.send_message(f"Channel set up to {channel}", ephemeral=True)
@set_bot_channel.autocomplete('channel')
async def _list_text_channels(self,
inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]:
_list = [r.name for r in inter.guild.text_channels]
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current in choice
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Fun(bot)) # adding a cog
from typing import List
import disnake
from disnake import OptionChoice
from disnake.ext import commands
from loguru import logger
from lib.Comands import write_json
class Fun(commands.Cog, name='Fun'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.has_permissions(administrator=True)
@commands.slash_command(
name="set_bot_channel",
description="Set channel which iterate with bot",
)
async def set_bot_channel(self, inter: disnake.ApplicationCommandInteraction, channel: str):
await write_json(inter.guild.id,
"channel",
disnake.utils.find(lambda d: d.name == channel, inter.guild.channels).id)
await inter.response.send_message(f"Channel set up to {channel}", ephemeral=True)
@set_bot_channel.autocomplete('channel')
async def _list_text_channels(self,
inter: disnake.ApplicationCommandInteraction,
current: str) -> List[OptionChoice]:
_list = [r.name for r in inter.guild.text_channels]
return [
OptionChoice(name=choice, value=choice)
for choice in _list if current in choice
]
def setup(bot): # an extension must have a setup function
bot.add_cog(Fun(bot)) # adding a cog

View File

@@ -1,29 +1,29 @@
import disnake
from disnake import Option
from disnake.ext import commands
from integral_lib import YandexPlayer
from integral_lib.Logger import logger
class Testing(commands.Cog, name='Testing'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name='play', description='play audio test from yandex.music', options=[
Option(name='search',
description='seach track/artist',
required=True),
])
async def play(self, inter: disnake.ApplicationCommandInteraction, search: str):
# TODO add yandex_music player with queue, playlists
result = YandexPlayer.search(search)
await inter.response.send_message(result, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog
import disnake
from disnake import Option
from disnake.ext import commands
from loguru import logger
from lib import YandexPlayer
class Testing(commands.Cog, name='Testing'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name='play', description='play audio test from yandex.music', options=[
Option(name='search',
description='seach track/artist',
required=True),
])
async def play(self, inter: disnake.ApplicationCommandInteraction, search: str):
# TODO add yandex_music player with queue, playlists
result = YandexPlayer.search(search)
await inter.response.send_message(result, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(Testing(bot)) # adding a cog

View File

@@ -1,59 +1,60 @@
import disnake
from disnake import Option, OptionType, Colour
from disnake.ext import commands
from integral_lib.Logger import logger
class General(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name="info",
options=[
Option("user",
"Specify any user",
OptionType.user),
]
)
async def info(self, inter: disnake.ApplicationCommandInteraction, user=None):
"""
Shows user info {{SLASH_INFO}}
Parameters
----------
:param inter:
:param user:
:return:
"""
user = user or inter.author
rolelist = [r.mention for r in user.roles if r != inter.guild.default_role]
if rolelist:
roles = "\n".join(rolelist)
else:
roles = "Not added any role"
emb = disnake.Embed(
title="General information",
description=f"General information on server about {user}",
color=Colour.random()
)
emb.set_thumbnail(url=user.display_avatar)
emb.add_field(name="General info",
value=f"Username: {user}\n"
f"Nickname: {user.nick}\n"
f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
emb.add_field(name="Roles list", value=f"{roles}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(General(bot)) # adding a cog
import disnake
from disnake import Option, OptionType, Colour
from disnake.ext import commands
# from lib.Logger import logger
from loguru import logger
class General(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(
name="info",
options=[
Option("user",
"Specify any user",
OptionType.user),
]
)
async def info(self, inter: disnake.ApplicationCommandInteraction, user=None):
"""
Shows user info {{SLASH_INFO}}
Parameters
----------
:param inter:
:param user:
:return:
"""
user = user or inter.author
rolelist = [r.mention for r in user.roles if r != inter.guild.default_role]
if rolelist:
roles = "\n".join(rolelist)
else:
roles = "Not added any role"
emb = disnake.Embed(
title="General information",
description=f"General information on server about {user}",
color=Colour.random()
)
emb.set_thumbnail(url=user.display_avatar)
emb.add_field(name="General info",
value=f"Username: {user}\n"
f"Nickname: {user.nick}\n"
f"Joined at: {user.joined_at.strftime('%A, %B %d %Y @ %H:%M:%S')}", inline=False)
emb.add_field(name="Roles list", value=f"{roles}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(General(bot)) # adding a cog

View File

@@ -1,44 +1,45 @@
import os
import disnake
import psutil
from disnake import ApplicationCommandInteraction
from disnake.ext import commands
from __init__ import version_info as ver
from integral_lib.Comands import determine_prefix
from integral_lib.Logger import logger
class BotInfo(commands.Cog, name='Bot Info'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining src as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="info_bot",
description='Shows general info about src') # this is for making a command
async def info_bot(self, inter: ApplicationCommandInteraction):
_pid = os.getpid()
_process = psutil.Process(_pid)
emb = disnake.Embed(
title="General information",
description="General information on about src",
)
emb.set_thumbnail(self.bot.user.avatar.url)
emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n"
f"CPU Usage: {_process.cpu_percent()}%\n"
f'Bot ping: {round(self.bot.latency * 1000)}\n'
f'Prefix: `{determine_prefix(self.bot, inter)}\n`'
)
emb.add_field(name="Bot info:", value="Bot owner: <@386629192743256065>\n"
f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(BotInfo(bot)) # adding a cog
import os
import disnake
import psutil
from disnake.ext import commands
from loguru import logger
from __init__ import version_info as ver
from lib.Comands import determine_prefix
# from lib.Logger import logger
class BotInfo(commands.Cog, name='Bot Info'):
def __init__(self, bot: commands.Bot):
self.bot = bot # defining bot as global var in class
@commands.Cog.listener() # this is a decorator for events/listeners
async def on_ready(self):
logger.info(f'Cog {__name__.split(".")[1]} is ready!.')
@commands.slash_command(name="info_bot",
description='Shows general info about bot') # this is for making a command
async def info_bot(self, inter: disnake.ApplicationCommandInteraction):
_pid = os.getpid()
_process = psutil.Process(_pid)
emb = disnake.Embed(
title="General information",
description="General information on about bot",
)
emb.set_thumbnail(self.bot.user.avatar.url)
emb.add_field(name="System info:", value=f"Memory Usage: {round(_process.memory_info().rss / 2 ** 20, 2)} Mb\n"
f"CPU Usage: {_process.cpu_percent()}%\n"
f'Bot ping: {round(self.bot.latency * 1000)}\n'
f'Prefix: `{determine_prefix(self.bot, inter)}\n`'
)
emb.add_field(name="Bot info:", value="Bot owner: <@386629192743256065>\n"
f"Bot version: {ver.major}.{ver.minor}.{ver.micro}-{ver.releaselevel}")
emb.set_footer(text=f"Information requested by: {inter.author.display_name}")
await inter.response.send_message(embed=emb, ephemeral=True)
def setup(bot): # an extension must have a setup function
bot.add_cog(BotInfo(bot)) # adding a cog

View File

45
lib/CogsPrep.py Normal file
View File

@@ -0,0 +1,45 @@
"""
lib.CogsPrepare
~~~~~~~~~~~~~
Loads, unloads Cogs files
cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
from os import listdir, rename
from typing import List
from disnake.ext import commands
# from .Logger import logger
from loguru import logger
async def cog_list(fold: str = './cogs') -> List[str]:
cogs_list = []
for _filename in listdir(fold):
if _filename.endswith('.py'):
cogs_list.append(_filename[:-3])
return cogs_list
async def work_with_cogs(what_do, bot: commands.Bot, cog):
if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if what_do == "load":
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Loaded cog {_filename}')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')
elif what_do == 'disable':
bot.unload_extension(f'cogs.{_filename}')
rename(f'cogs/{_filename}.py', f'cogs/disabled/{_filename}.py')
logger.info(f'Cog {_filename} stopped and disabled')
elif what_do == 'enable':
rename(f'cogs/disabled/{_filename}.py', f'cogs/{_filename}.py')
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} started and enabled')

View File

@@ -1,86 +1,73 @@
"""
integral_lib.Commands
~~~~~~~~~~~~~~
Some prepare for commands
"""
from json import load, decoder, dump, JSONDecodeError
from os import getenv
from disnake.ext import commands
async def read_json(guild: int, _param: str):
"""
Reads Json file to determite config strings
:param guild: ID of Guild
:param _param: Parameter in json file
:return: value of parameter.
"""
parameter = None
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except decoder.JSONDecodeError:
_json = {}
if guild: # If the guild exists
try:
guild_conf = _json[f"{guild}"]
try:
parameter = guild_conf[f"{_param}"]
except KeyError:
pass
except KeyError:
pass
return parameter
async def write_json(guild: int, param_name: str, param: str or int):
"""
A function to write JSON data to a file after updating or adding a parameter value.
Parameters:
- guild: an integer representing the guild ID
- param_name: a string representing the parameter name
- param: a string or integer representing the parameter value
Returns:
This function does not return anything.
"""
with open(getenv('CONF_FILE'), encoding='utf-8') as f:
try:
_json = load(f)
except decoder.JSONDecodeError:
_json = {}
try:
_guild = _json[f'{guild}']
except KeyError:
_json.update({f'{guild}': {}})
_guild = _json[f'{guild}']
_guild.update({f'{param_name}': f'{param}'})
with open(getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
dump(_json, f, indent=4)
def determine_prefix(bot: commands.Bot, msg):
"""
Determine the per-server src prefix based on the given message object.
:param bot: Disnake Bot object
:param msg: Disnake msg object
:return: prefix for server, default is $
"""
parameter = '$'
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter
"""
lib.Commands
~~~~~~~~~~~~~~
Some prepare for commands
"""
from json import load, decoder, dump, JSONDecodeError
from os import getenv
from disnake.ext import commands
def read_json(guild: int, _param: str):
"""
Reads Json file to determite config strings
:param guild: ID of Guild
:param _param: Parameter in json file
:return: value of parameter.
"""
parameter = None
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except decoder.JSONDecodeError:
_json = {}
if guild: # If the guild exists
try:
guild_conf = _json[f"{guild}"]
try:
parameter = guild_conf[f"{_param}"]
except KeyError:
pass
except KeyError:
pass
return parameter
async def write_json(guild: int, param_name: str, param: str or int):
print(type(param))
with open(getenv('CONF_FILE'), encoding='utf-8') as f:
try:
_json = load(f)
except decoder.JSONDecodeError:
_json = {}
try:
_guild = _json[f'{guild}']
except KeyError:
_json.update({f'{guild}': {}})
_guild = _json[f'{guild}']
_guild.update({f'{param_name}': param})
_json.update({f'{guild}': _guild})
with open(getenv('CONF_FILE'), 'w', encoding='utf-8') as f:
dump(_json, f, indent=4)
def determine_prefix(bot: commands.Bot, msg):
"""
Determite per-server bot prefix
:param bot: Disnake Bot object
:param msg: Disnake msg object
:return: prefix for server, default is $
"""
parameter = '$'
with open(getenv('CONF_FILE'), encoding='utf-8') as f: # Open the JSON
try:
_json = load(f) # Load the custom prefixes
except JSONDecodeError:
_json = {}
try:
parameter = _json[f"{msg.guild.id}"]["prefix"] # Read prefix from json if is setted up
except KeyError:
pass
return parameter

View File

@@ -1,134 +1,136 @@
from sqlite3 import connect, Error
from .Logger import logger
class DBReader:
def __init__(self, guildid: int = None):
self._guildid = guildid
self.list = self._read_db(self._guildid)
self._current_index = 0
def __str__(self) -> str:
return str(self._guildid)
@classmethod
def _read_db(cls, guildid: int) -> list:
try:
sql_con = connect("user.db")
cursor = sql_con.cursor()
cursor.execute(f"""SELECT * FROM "{guildid}" """)
record = cursor.fetchall()
return record
except Error as _e:
logger.info(f'Error reading DB\n{_e}')
def __iter__(self):
return _ListGenerationIter(self)
class _DBAttrs:
def __init__(self,
userid: int,
username: str,
nick: str,
isbot: bool,
defaulttracks: None or list,
usertracks: None or list):
self.userid = userid
self.username = username
self.nick = nick
self.isbot = isbot
self.defaulttracks = defaulttracks
self.usertracks = usertracks
def __str__(self):
return self.username
def __repr__(self):
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
class _ListGenerationIter:
def __init__(self, user_class):
self._current_index = 0
self._list = user_class.list
self._size = len(self._list)
def __iter__(self):
return self
def __next__(self):
if self._current_index < self._size:
_userid = self._list[self._current_index][0]
_username = self._list[self._current_index][1]
_nick = self._list[self._current_index][2]
_isbot = bool(self._list[self._current_index][3])
_defaulttracks = self._list[self._current_index][4]
_usertracks = self._list[self._current_index][5]
self._current_index += 1
memb = _DBAttrs(_userid,
_username,
_nick,
_isbot,
_defaulttracks,
_usertracks)
return memb
raise StopIteration
async def prepare_db(guild: int):
try:
_connect = connect('user.db')
cursor = _connect.cursor()
cursor.execute(f'CREATE TABLE IF NOT EXISTS "{guild}"\n'
f' ([userid] INTEGER PRIMARY KEY, [username] TEXT,\n'
f' [nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT)')
cursor.close()
except Error as _error:
logger.info(_error)
async def work_with_db(db_func: str, data_turple: tuple):
"""
Writing to db per server userinfo
:param db_func:
:param data_turple:
"""
try:
_connect = connect('user.db')
cursor = _connect.cursor()
cursor.execute(db_func, data_turple)
_connect.commit()
cursor.close()
except Error as _error:
logger.critical(_error)
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO '{guild}'
(username, userid, nick, isbot)
VALUES (?, ?, ?, ?)""")
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
await work_with_db(sqlite_insert_with_param, data_tuple)
async def read_db(guild: int, user: int, column: str):
_col_dict = {'userid': 0,
'username': 1,
'nick': 2,
'isbot': 3,
'defaulttracks': 4,
'usertracks': 5}
try:
sql_con = connect("user.db")
cursor = sql_con.cursor()
cursor.execute(f"""SELECT * FROM "{guild}" where userid = {user}""")
record = cursor.fetchone()
return record[_col_dict[column]]
except Error as _error:
logger.critical(_error)
from sqlite3 import connect, Error
# from lib.Logger import logger
from loguru import logger
class DBReader:
def __init__(self, guildid: int = None):
self._guildid = guildid
self.list = self._read_db(self._guildid)
self._current_index = 0
def __str__(self) -> str:
return str(self._guildid)
@classmethod
def _read_db(cls, guildid: int) -> list:
try:
sql_con = connect("user.db")
cursor = sql_con.cursor()
cursor.execute("""SELECT * FROM "(guildid)" """,
{'guildid': guildid})
record = cursor.fetchall()
return record
except Error as _e:
logger.info(f'Error reading DB\n{_e}')
def __iter__(self):
return _ListGenerationIter(self)
class _DBAttrs:
def __init__(self,
userid: int,
username: str,
nick: str,
isbot: bool,
defaulttracks: None or list,
usertracks: None or list):
self.userid = userid
self.username = username
self.nick = nick
self.isbot = isbot
self.defaulttracks = defaulttracks
self.usertracks = usertracks
def __str__(self):
return self.username
def __repr__(self):
return f'<File attrs userid={self.userid} username={self.username} nick={self.nick} ' \
f'isbot={self.isbot} defaulttracks={self.defaulttracks} usertracks={self.usertracks}>'
class _ListGenerationIter:
def __init__(self, user_class):
self._current_index = 0
self._list = user_class.list
self._size = len(self._list)
def __iter__(self):
return self
def __next__(self):
if self._current_index < self._size:
_userid = self._list[self._current_index][0]
_username = self._list[self._current_index][1]
_nick = self._list[self._current_index][2]
_isbot = bool(self._list[self._current_index][3])
_defaulttracks = self._list[self._current_index][4]
_usertracks = self._list[self._current_index][5]
self._current_index += 1
memb = _DBAttrs(_userid,
_username,
_nick,
_isbot,
_defaulttracks,
_usertracks)
return memb
raise StopIteration
async def prepare_db(guild: int):
try:
_connect = connect('user.db')
cursor = _connect.cursor()
cursor.execute(f'''CREATE TABLE IF NOT EXISTS "{guild}"
([userid] INTEGER PRIMARY KEY, [username] TEXT,
[nick] TEXT, [isbot] BOOL, [defaulttracks] TEXT, [usertracks] TEXT)''')
cursor.close()
except Error as _error:
logger.info(_error)
async def work_with_db(db_func: str, data_turple: tuple):
"""
Writing to db per server userinfo
:param db_func:
:param data_turple:
"""
try:
_connect = connect('user.db')
cursor = _connect.cursor()
cursor.execute(db_func, data_turple)
_connect.commit()
cursor.close()
except Error as _error:
logger.critical(_error)
async def fill_bd(name: str, userid: int, isbot: bool, nick: str, guild: int):
sqlite_insert_with_param = (f"""INSERT OR IGNORE INTO "{guild}"
(username, userid, nick, isbot)
VALUES (?, ?, ?, ?)""")
data_tuple: tuple[str, int, str, bool] = (name, userid, nick, isbot)
await work_with_db(sqlite_insert_with_param, data_tuple)
async def read_db(guild: int, user: int, column: str):
_col_dict = {'userid': 0,
'username': 1,
'nick': 2,
'isbot': 3,
'defaulttracks': 4,
'usertracks': 5}
try:
sql_con = connect("user.db")
cursor = sql_con.cursor()
cursor.execute(f"""SELECT * FROM "{guild}" where userid = {user}""")
record = cursor.fetchone()
return record[_col_dict[column]]
except Error as _error:
logger.critical(_error)

View File

@@ -1,86 +1,86 @@
from mimetypes import guess_type
from os import walk
class ListGenerator:
def __init__(self, path: str = None):
self.path = path
self.list: list = self._lister(self.path)
try:
self._size = len(self.list)
except TypeError:
pass
self._current_index = 0
def __str__(self) -> str:
return 'Audio iter generator'
@classmethod
def _lister(cls, path) -> list:
_list: list = []
try:
for f in walk(path):
_list.extend(f)
break
return sorted(_list[2])
except TypeError:
pass
def __iter__(self):
return _ListGenerationIter(self)
def __next__(self):
if self._current_index < self._size:
self._current_index += 1
class _FileAttrs:
def __init__(self, name, path, mimetype, exc):
self.name = name
self.path = path
self.mimetype = mimetype
self.exc = exc
def __str__(self):
return self.name
def __repr__(self):
return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >'
class _ListGenerationIter:
def __init__(self, list_class):
self._current_index = 0
self._list = list_class.list
self._name = self._list[self._current_index]
self._path = list_class.path
self._size = len(self._list)
def __iter__(self):
return self
@property
def type(self) -> str:
guess = guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
return guess
@property
def _exc(self) -> str:
try:
_ret = self._list[self._current_index].split('.')[-1]
except AttributeError:
_ret = None
return _ret
def __next__(self):
if self._current_index < self._size:
_name = self._list[self._current_index]
_path = self._path
_type = self.type
_exc = self._exc
self._current_index += 1
_memb = _FileAttrs(_name, _path, _type, _exc)
return _memb
raise StopIteration
from mimetypes import guess_type
from os import walk
class ListGenerator:
def __init__(self, path: str = None):
self.path = path
self.list: list = self._lister(self.path)
try:
self._size = len(self.list)
except TypeError:
pass
self._current_index = 0
def __str__(self) -> str:
return 'Audio iter generator'
@classmethod
def _lister(cls, path) -> list:
_list: list = []
try:
for f in walk(path):
_list.extend(f)
break
return sorted(_list[2])
except TypeError:
pass
def __iter__(self):
return _ListGenerationIter(self)
def __next__(self):
if self._current_index < self._size:
self._current_index += 1
class _FileAttrs:
def __init__(self, name, path, mimetype, exc):
self.name = name
self.path = path
self.mimetype = mimetype
self.exc = exc
def __str__(self):
return self.name
def __repr__(self):
return f'<File attrs name={self.name} path={self.path} exc={self.exc} type=<{self.mimetype}> >'
class _ListGenerationIter:
def __init__(self, list_class):
self._current_index = 0
self._list = list_class.list
self._name = self._list[self._current_index]
self._path = list_class.path
self._size = len(self._list)
def __iter__(self):
return self
@property
def type(self) -> str:
guess = guess_type(f'{self._path}/{self._list[self._current_index]}')[0]
return guess
@property
def _exc(self) -> str:
try:
_ret = self._list[self._current_index].split('.')[-1]
except AttributeError:
_ret = None
return _ret
def __next__(self):
if self._current_index < self._size:
_name = self._list[self._current_index]
_path = self._path
_type = self.type
_exc = self._exc
self._current_index += 1
_memb = _FileAttrs(_name, _path, _type, _exc)
return _memb
raise StopIteration

View File

@@ -1,17 +1,20 @@
from asyncio import sleep
from disnake import FFmpegPCMAudio
from .Logger import logger
async def play_audio(audio, bot, vc):
if not bot.voice_clients:
logger.error(f'Playing: {audio}')
vp = await vc.connect()
if not vp.is_playing():
vp.play(FFmpegPCMAudio(f'{audio}', ))
while vp.is_playing():
await sleep(0.5)
await sleep(1)
await vp.disconnect()
from asyncio import sleep
from disnake import FFmpegOpusAudio
from loguru import logger
# from .Logger import logger
@logger.catch
async def play_audio(audio, bot, vc):
if not bot.voice_clients:
logger.error(f'Playing: {audio}')
vp = await vc.connect()
if not vp.is_playing():
vp.play(FFmpegOpusAudio(f'{audio}', executable='ffmpeg', options='-nostats -loglevel 0'))
while vp.is_playing():
await sleep(0.5)
await sleep(1)
await vp.disconnect()

5
lib/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""
lib
~~~~~~~~~~~~~
Some libs for the bot which help him
"""

6
qodana.yaml Normal file
View File

@@ -0,0 +1,6 @@
version: "1.0"
bootstrap: |
rm -rf .idea
pip install -U -r requirements.txt
profile:
name: qodana.recommended

Binary file not shown.

View File

@@ -1,78 +0,0 @@
"""
integral_lib.CogsPrepare
~~~~~~~~~~~~~
Loads, unloads Cogs files
cog_list: return list of cog filenames
work_with_cogs: loads, reloads and unloads cogs files
"""
from os import listdir, rename
from typing import List
from disnake.ext.commands.interaction_bot_base import InteractionBotBase
from .Logger import logger
def cog_list(fold: str = './cogs') -> List[str]:
"""
A function that generates a list of cog names based on the files present in a specified folder.
Parameters:
- fold (str): The directory path where the cog files are located. Defaults to './cogs'.
Returns:
- List[str]: A list of cog names without the '.py' extension.
"""
cogs_list = []
for _filename in listdir(fold):
if _filename.endswith('.py'):
cogs_list.append(_filename[:-3])
return cogs_list
def work_with_cogs(what_do: str,
bot: InteractionBotBase,
cog: str | list):
"""
Perform the specified action on the given cog or list of cogs.
Args:
what_do (str): The action to perform (load, unload, reload, disable, enable).
src (InteractionBotBase): The src instance to work with.
cog (str | list): The name of the cog or a list of cogs to work with.
Raises:
ValueError: If the action is not recognized.
Returns:
None
--------
:param cog: str | list
:param bot: InteractionBotBase
:param what_do: str = ['load', 'unload', 'reload', 'disable', 'enable']
"""
if isinstance(cog, str):
cog = cog.split()
for _filename in cog:
if _filename.endswith('.py'):
_filename = _filename.split('.')[0]
if what_do == "load":
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} loaded')
elif what_do == 'unload':
bot.unload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} unloaded')
elif what_do == 'reload':
bot.reload_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} reloaded')
elif what_do == 'disable':
bot.unload_extension(f'cogs.{_filename}')
rename(f'cogs/{_filename}.py',
f'cogs/disabled/{_filename}.py')
logger.info(f'Cog {_filename} stopped and disabled')
elif what_do == 'enable':
rename(f'cogs/disabled/{_filename}.py',
f'cogs/{_filename}.py')
bot.load_extension(f'cogs.{_filename}')
logger.info(f'Cog {_filename} started and enabled')
else:
raise ValueError(f"Unrecognized action: {what_do}")

View File

@@ -1,52 +0,0 @@
import logging
class _CustomFormatter(logging.Formatter):
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
fmt = "%(asctime)s - [%(levelname)s] -%(module)s- %(message)s"
FORMATS = {
logging.DEBUG: grey + fmt + reset,
logging.INFO: grey + fmt + reset,
logging.WARNING: yellow + fmt + reset,
logging.ERROR: red + fmt + reset,
logging.CRITICAL: bold_red + fmt + reset
}
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs):
record = _CustomFormatter.old_factory(*args, **kwargs)
_module = record.module
_levelname = record.levelname
if len(_module) % 2 == 0 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + _module + ' ' * ((12 - len(_module)) // 2)
elif len(_module) % 2 == 1 and len(_module) < 12:
_module = ' ' * ((12 - len(_module)) // 2) + record.module + ' ' * ((12 - len(_module)) // 2 + 1)
if len(_levelname) % 2 == 0 and len(_levelname) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2)
elif len(record.levelname) % 2 == 1 and len(record.module) < 8:
_levelname = ' ' * ((8 - len(_levelname)) // 2) + _levelname + ' ' * ((8 - len(_levelname)) // 2 + 1)
record.module = _module
record.levelname = _levelname
return record
logging.setLogRecordFactory(_record_factory)
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno)
formatter = logging.Formatter(log_fmt, "%d-%m-%Y %H:%M:%S")
return formatter.format(record)
logger = logging.getLogger('Pisya_Bot')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(_CustomFormatter())
logger.addHandler(ch)

View File

@@ -1,5 +0,0 @@
"""
integral_lib
~~~~~~~~~~~~~
Some libs for the src which help him
"""

Binary file not shown.

View File

@@ -1,41 +0,0 @@
from disnake import Intents
from disnake.ext import commands
intents = Intents(messages=True,
message_content=True,
)
class Cog(commands.Cog):
@commands.command()
async def my_cmd(self, ctx):
pass
class CogTwo(commands.Cog):
async def bot_check(self, ctx) -> bool:
pass
class CogThree(commands.Cog):
async def bot_check_once(self, ctx) -> bool:
pass
class CogFour(commands.Cog):
async def bot_slash_command_check(self, ctx) -> bool:
pass
class CogFive(commands.Cog):
async def can_run(self, ctx) -> bool:
pass
def test_bot():
bot = commands.Bot(command_prefix='$', intents=intents)
bot.add_cog(Cog())
bot.add_cog(CogTwo())
bot.add_cog(CogThree())
bot.add_cog(CogFour())
bot.add_cog(CogFive())

View File

@@ -1,31 +0,0 @@
import sys
import pytest
from disnake.ext.commands.common_bot_base import CommonBotBase
from mock import mock
sys.path.append('src')
from integral_lib import CogsPrep
def test_cog_list():
with mock.patch('integral_lib.CogsPrep.listdir') as MockClass:
MockClass.return_value = ['cog1.py', 'cog2.py']
result = CogsPrep.cog_list()
assert result == ['cog1', 'cog2']
@pytest.mark.parametrize("cog", ["cog1.py", "cog2"])
@pytest.mark.parametrize("what_do", ['load', 'unload', 'reload', 'disable', 'enable'])
def test_work_with_cogs(what_do, cog):
with mock.patch('integral_lib.CogsPrep.rename') as mock_rename:
mock_rename.return_value = None
mock_bot = mock.MagicMock(spec=CommonBotBase)
result = CogsPrep.work_with_cogs(what_do, mock_bot, cog)
if what_do in ['load', 'enable']:
assert mock_bot.load_extension.called
elif what_do in ['unload', 'disable']:
assert mock_bot.unload_extension.called
elif what_do == 'reload':
assert mock_bot.reload_extension.called