Created
January 18, 2022 00:10
-
-
Save Kanin/0e957d3a5530c359dd60026b4ca1105c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import re | |
from datetime import datetime, timedelta | |
import discord | |
from discord.ext import commands | |
from bot import Bot | |
from utils import checks | |
from utils.ctx import Context | |
humanized = dict() | |
class ShopItem(commands.Converter): | |
async def convert(self, ctx: Context, argument): | |
match = re.match("([0-9]{15,20})$", argument) or re.match(r"<@&([0-9]+)>$", argument) | |
if match: | |
result = ctx.guild.get_role(int(match.group(1))) | |
else: | |
result = discord.utils.get(ctx.guild._roles.values(), name=argument) | |
if result: | |
return result | |
argument = argument.lower() | |
shop_items = await ctx.pool.fetch("SELECT role_id FROM points_shop") | |
if not humanized or len(humanized) != len(shop_items): | |
shop_items = [ctx.guild.get_role(item["role_id"]) for item in shop_items] | |
for role in shop_items: | |
role_name = role.name.lower() | |
matches = re.findall(r"[^a-z0-9]", role_name) | |
for match in set(matches): | |
role_name = role_name.replace(match, match + "?") | |
humanized[role_name] = role | |
for role_name_humanized, role in humanized.items(): | |
if re.match(role_name_humanized, argument): | |
return role | |
raise commands.CommandInvokeError(f"Shop item {argument} doesn't exist!") | |
class Points(commands.Cog): | |
def __init__(self, bot): | |
self.bot: Bot = bot | |
self.tax = 25 | |
async def log(self, message): | |
channel = self.bot.get_channel(867327243020926987) | |
await channel.send(message, allowed_mentions=discord.AllowedMentions.none()) | |
# | |
# POINTS | |
# | |
@commands.group(invoke_without_command=True, case_insensitive=True) | |
@commands.guild_only() | |
async def points(self, ctx: Context, user: discord.User = None): | |
user = user or ctx.author | |
user_data = await self.bot.pool.fetchrow("SELECT * FROM points WHERE user_id=$1", user.id) | |
if not user_data: | |
return await ctx.reply(f"{user} does not have any points!") | |
await ctx.reply(f"{user} currently has {user_data['count']} points!") | |
@points.command(name="add", aliases=["give"]) | |
@checks.admin() | |
async def points_add(self, ctx: Context, user: discord.User, points: int): | |
user_data = await self.bot.pool.fetchrow("SELECT * FROM points WHERE user_id=$1", user.id) | |
if not user_data: | |
await self.bot.pool.execute( | |
"INSERT INTO points (count, last_added, user_id) VALUES ($1, $2, $3)", | |
points, | |
datetime.utcnow(), | |
user.id | |
) | |
await ctx.reply(f"Added {points} points to {user}!") | |
return await self.log(f"{ctx.author.mention} added {points} points to {user.mention}") | |
await self.bot.pool.execute("UPDATE points SET count=count + $1 WHERE user_id=$2", points, user.id) | |
await ctx.reply(f"Added {points} points to {user}!") | |
await self.log(f"{ctx.author.mention} added {points} points to {user.mention}") | |
@points.command(name="remove", aliases=["take"]) | |
@checks.admin() | |
async def points_remove(self, ctx: Context, user: discord.User, points: int): | |
user_data = await self.bot.pool.fetchrow("SELECT * FROM points WHERE user_id=$1", user.id) | |
if not user_data: | |
return await ctx.reply(f"{user} doesn't have any points to remove..?") | |
await self.bot.pool.execute("UPDATE points SET count=count - $1 WHERE user_id=$2", points, user.id) | |
await ctx.reply(f"Removed {points} points from {user}!") | |
await self.log(f"{ctx.author.mention} removed {points} points from {user.mention}") | |
# | |
# SHOP | |
# | |
@commands.group(invoke_without_command=True, case_insensitive=True) | |
@commands.guild_only() | |
async def shop(self, ctx: Context, page: int = 1): | |
items = await self.bot.pool.fetch("SELECT * FROM points_shop WHERE active=true ORDER BY position DESC") | |
if not items: | |
return await ctx.send_error("There's nothing in the shop!") | |
pages, leftover = divmod(len(items), 24) | |
if leftover: | |
pages += 1 | |
base = (page - 1) * 24 | |
items = items[base:base + 24] | |
em = discord.Embed(color=discord.Color.random()).set_author(name="Role shop!") | |
em.set_thumbnail(url=ctx.guild.icon.url) | |
for item in items: | |
role: discord.Role = ctx.guild.get_role(item["role_id"]) | |
desc = f"{role.mention}\n" | |
desc += f"**Price:** {item['price']}" | |
if item["stock"]: | |
desc += f"\n**Stock:** {item['stock_left']}/{item['stock']}" | |
em.add_field(name="", value=desc) | |
if not len(em.fields): | |
return await ctx.send_error("There's nothing in the shop!") | |
em.description = f"Buy a role with `{ctx.prefix}shop buy`\n" \ | |
f"Gift a role with `{ctx.prefix}shop gift`" | |
em.set_footer(text=f"Page: {page}/{pages}") | |
await ctx.reply(embed=em) | |
@shop.command(name="add") | |
@checks.admin() | |
async def shop_add(self, ctx: Context, role: discord.Role, price: int, stock: int = 0): | |
check = await self.bot.pool.fetchrow("SELECT * FROM points_shop WHERE role_id=$1", role.id) | |
if check: | |
return await ctx.send_error(f"{role.mention} is already in the shop!") | |
if not stock: | |
await self.bot.pool.execute( | |
"INSERT INTO points_shop (price, role_id, position) VALUES ($1, $2, $3)", | |
price, | |
role.id, | |
role.position | |
) | |
await ctx.reply(f"Added {role.name} to the shop for {price} points!") | |
return await self.log(f"{ctx.author.mention} added {role.mention} to the shop for {price} points") | |
await self.bot.pool.execute( | |
"INSERT INTO points_shop (price, role_id, stock, stock_left, position) VALUES ($1, $2, $3, $3, $4)", | |
price, | |
role.id, | |
stock, | |
role.position | |
) | |
await ctx.reply(f"Added {role.name} to the shop for {price} points with a stock of {stock}!") | |
await self.log( | |
f"{ctx.author.mention} added {role.mention} to the shop for {price} points with a stock of {stock}" | |
) | |
@shop.command(name="addstock") | |
@checks.admin() | |
async def shop_addstock(self, ctx: Context, role: ShopItem, amount: int): | |
shop_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop WHERE role_id=$1", role.id) | |
if not shop_item: | |
return await ctx.send_error(f"{role.mention} is not in the shop!") | |
if not shop_item["stock"]: | |
return await ctx.reply(f"{role.name} is an unlimited item, you cannot add stock!") | |
await self.bot.pool.execute( | |
"UPDATE points_shop SET stock=stock + $1, stock_left=stock_left + $1 WHERE role_id=$2", | |
amount, | |
role.id | |
) | |
await ctx.reply( | |
f"{role.name} now has {shop_item['stock_left'] + amount}/{shop_item['stock'] + amount} stock available." | |
) | |
await self.log(f"{ctx.author.mention} added {amount} stock to {role.mention}" | |
f" ({shop_item['stock_left'] + amount}/{shop_item['stock'] + amount})") | |
@shop.command(name="buy") | |
async def shop_buy(self, ctx: Context, *, role: ShopItem): | |
shop_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop WHERE role_id=$1", role.id) | |
if not shop_item: | |
return await ctx.send_error(f"{role.mention} is not in the shop!") | |
if not shop_item["active"]: | |
return await ctx.send_error(f"{role.mention} is not currently available!") | |
user_item = await self.bot.pool.fetchrow( | |
"SELECT * FROM points_shop_inventory WHERE role_id=$1 and user_id=$2", | |
role.id, | |
ctx.author.id | |
) | |
if user_item: | |
return await ctx.send_error(f"You already have {role.mention} in your inventory!") | |
if shop_item["stock"] and not shop_item["stock_left"]: | |
return await ctx.send_error(f"{role.mention} is out of stock!") | |
user_data = await self.bot.pool.fetchrow("SELECT * FROM points WHERE user_id=$1", ctx.author.id) | |
if shop_item["price"] > user_data["count"]: | |
return await ctx.send_error( | |
f"You do not have enough points to buy {role.mention}!\n{user_data['count']}/{shop_item['price']}" | |
) | |
async with self.bot.pool.acquire() as con: | |
async with con.transaction(): | |
await con.execute( | |
"UPDATE points SET count=count - $1 WHERE user_id=$2", | |
shop_item["price"], | |
ctx.author.id | |
) | |
await con.execute( | |
"INSERT INTO points_shop_inventory (role_id, user_id) VALUES ($1, $2)", | |
role.id, | |
ctx.author.id | |
) | |
if shop_item["stock"]: | |
await con.execute( | |
"UPDATE points_shop SET stock_left=stock_left - 1 WHERE role_id=$1", | |
role.id | |
) | |
await ctx.reply(f"You now own the {role.name} role, request it via `{ctx.prefix}shop equip {role.name}`!") | |
await self.log(f"{ctx.author.mention} bought {role.mention} from the shop") | |
@shop.command(name="equip") | |
async def shop_equip(self, ctx: Context, *, role: ShopItem): | |
user_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop_inventory WHERE role_id=$1", role.id) | |
if not user_item: | |
return await ctx.send_error("You don't have the item you requested!") | |
if role in ctx.author.roles: | |
return await ctx.send_error(f"You already have the {role.name} role equipped!") | |
await ctx.author.add_roles(role) | |
await ctx.reply(f"I have added the {role.name} role to you!") | |
@staticmethod | |
def get_level_xp(level): | |
return 5 * (level ** 2) + 50 * level + 100 | |
@classmethod | |
def get_level_from_xp(cls, xp, return_xp=False): | |
level = 0 | |
while xp >= cls.get_level_xp(level): | |
xp -= cls.get_level_xp(level) | |
level += 1 | |
if return_xp: | |
return level, xp | |
return level | |
@shop.command(name="gift") | |
async def shop_gift(self, ctx: Context, user: discord.Member, *, role: ShopItem): | |
shop_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop WHERE role_id=$1", role.id) | |
if not shop_item: | |
return await ctx.send_error(f"{role.mention} is not in the shop!") | |
if not shop_item["active"]: | |
return await ctx.send_error("This item is not currently available!") | |
user_item = await self.bot.pool.fetchrow( | |
"SELECT * FROM points_shop_inventory WHERE role_id=$1 and user_id=$2", | |
role.id, | |
user.id | |
) | |
if user_item: | |
return await ctx.send_error(f"{user.mention} already has the {role.mention} role in their inventory!") | |
user_xp = await self.bot.pool.fetchval("SELECT xp FROM levels WHERE user_id=$1", user.id) | |
if not user_xp: | |
return await ctx.send_error( | |
f"{user.mention} doesn't have any xp! They need to be at least level 10 to receive gifts!" | |
) | |
level = self.get_level_from_xp(user_xp) | |
if level < 10: | |
return await ctx.send_error( | |
f"{user.mention} needs to be level 10 to receive gifts, they are only level {level}!" | |
) | |
if shop_item["stock"] and not shop_item["stock_left"]: | |
return await ctx.send_error(f"{role.mention} is out of stock!") | |
author_data = await self.bot.pool.fetchrow("SELECT * FROM points WHERE user_id=$1", ctx.author.id) | |
if shop_item["price"] > author_data["count"]: | |
return await ctx.send_error( | |
f"You do not have enough points to buy {role.mention}!\n{author_data['count']}/{shop_item['price']}" | |
) | |
user_price = int(round((self.tax / 100) * shop_item["price"], 0)) | |
user_data = await self.bot.pool.fetchrow("SELECT * FROM points WHERE user_id=$1", user.id) | |
if user_price > user_data["count"]: | |
return await ctx.send_error( | |
f"{user.mention} does not have enough points to pay tax!\n{user_data['count']}/{user_price}" | |
) | |
em = discord.Embed(color=discord.Color.random()) | |
em.description = f"{ctx.author.mention} is trying to gift you the {role.mention} role, do you accept?" | |
em.add_field(name="Tax:", value=f"{user_price} points") | |
msg = await ctx.send(content=user.mention, embed=em) | |
await msg.add_reaction("✅") | |
await msg.add_reaction("❌") | |
def check(reaction, usr): | |
return usr.id == user.id and str(reaction.emoji) in ["✅", "❌"] and reaction.message.id == msg.id | |
try: | |
r, u = await self.bot.wait_for("reaction_add", timeout=60, check=check) | |
except asyncio.TimeoutError: | |
await msg.clear_reactions() | |
return await ctx.send_error(f"{user.mention} didn't respond in time!") | |
else: | |
if str(r.emoji) == "❌": | |
await msg.clear_reactions() | |
return await ctx.send_error(f"{user.mention} denied the transaction.") | |
await msg.clear_reactions() | |
async with self.bot.pool.acquire() as con: | |
async with con.transaction(): | |
await self.bot.pool.execute( | |
"UPDATE points SET count=count - $1 WHERE user_id=$2", | |
shop_item["price"], | |
ctx.author.id | |
) | |
await self.bot.pool.execute( | |
"UPDATE points SET count=count - $1 WHERE user_id=$2", | |
user_price, | |
user.id | |
) | |
await self.bot.pool.execute( | |
"INSERT INTO points_shop_inventory (role_id, user_id, gifter) VALUES ($1, $2, $3)", | |
role.id, | |
user.id, | |
ctx.author.id | |
) | |
if shop_item["stock"]: | |
await self.bot.pool.execute( | |
"UPDATE points_shop SET stock_left=stock_left - 1 WHERE role_id=$1", | |
role.id | |
) | |
await ctx.reply( | |
f"{user} now owns the {role.name} role, They can request it via `{ctx.prefix}shop equip {role.name}`!" | |
) | |
await self.log(f"{ctx.author.mention} gifted {role.mention} to {user.mention}") | |
@shop.command(name="inventory", aliases=["inv"]) | |
async def shop_inventory(self, ctx: Context, page: int = 1): | |
items = await self.bot.pool.fetch( | |
"SELECT * FROM points_shop_inventory NATURAL JOIN points_shop WHERE user_id=$1 ORDER BY position DESC", | |
ctx.author.id | |
) | |
if not items: | |
return await ctx.send_error("You don't have any roles!") | |
pages, leftover = divmod(len(items), 41) | |
if leftover: | |
pages += 1 | |
base = (page - 1) * 41 | |
items = items[base:base + 41] | |
items = [ctx.guild.get_role(x["role_id"]).mention for x in items] | |
em = discord.Embed(color=discord.Color.random()).set_author(name=f"{ctx.author.name}'s inventory!") | |
em.set_thumbnail(url=ctx.guild.icon.url) | |
em.description = f"`{ctx.prefix}shop equip <role>`\n`{ctx.prefix}shop unequip <role>`\n\n" | |
em.description += ", ".join(items) | |
em.set_footer(text=f"Page: {page}/{pages}") | |
await ctx.reply(embed=em) | |
@shop.command(name="remove") | |
@checks.admin() | |
async def shop_remove(self, ctx: Context, *, role: ShopItem): | |
shop_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop WHERE role_id=$1", role.id) | |
if not shop_item: | |
return await ctx.send_error(f"{role.mention} is not a known shop item!") | |
await self.bot.pool.execute("DELETE FROM points_shop_inventory WHERE role_id=$1", role.id) | |
await self.bot.pool.execute("DELETE FROM points_shop WHERE role_id=$1", role.id) | |
await ctx.reply(f"Removed {role.name} from the shop!") | |
await self.log(f"{ctx.author.mention} removed {role.mention} from the shop") | |
@shop.command(name="test") | |
@checks.admin() | |
async def shop_test(self, ctx: Context, *, role: ShopItem): | |
await ctx.reply(role.mention, allowed_mentions=discord.AllowedMentions.none()) | |
@shop.command(name="toggle") | |
@checks.admin() | |
async def shop_toggle(self, ctx: Context, *, role: ShopItem): | |
shop_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop WHERE role_id=$1", role.id) | |
if not shop_item: | |
return await ctx.send_error(f"{role.mention} is not in the shop!") | |
await self.bot.pool.execute("UPDATE points_shop SET active=not active WHERE role_id=$1", role.id) | |
word = "unavailable" if shop_item["active"] else "available" | |
await ctx.reply(f"{role.name} is now {word} in the shop!") | |
await self.log(f"{ctx.author.mention}: {role.mention} is now {word}") | |
@shop.command(name="unequip") | |
async def shop_unequip(self, ctx: Context, *, role: ShopItem): | |
user_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop_inventory WHERE role_id=$1", role.id) | |
if not user_item: | |
return await ctx.send_error("You don't have the item you requested!") | |
if role not in ctx.author.roles: | |
return await ctx.send_error(f"You don't have the {role.name} role equipped!") | |
await ctx.author.remove_roles(role) | |
await ctx.reply(f"I have removed the {role.name} role from you!") | |
# | |
# EVENTS | |
# | |
async def give_points(self, message: discord.Message): | |
# noinspection PyTypeChecker | |
guild: discord.Guild = message.guild | |
if not guild or guild.id != 694641646780022818 or message.author.bot: | |
return | |
user: discord.Member = message.author | |
member: discord.Role = guild.get_role(694641646780022826) | |
if member not in user.roles: | |
return | |
user_data = await self.bot.pool.fetchrow("SELECT * FROM points WHERE user_id=$1", user.id) | |
if not user_data: | |
await self.bot.pool.execute( | |
"INSERT INTO points (count, last_added, user_id) VALUES ($1, $2, $3)", | |
1, | |
datetime.utcnow(), | |
user.id | |
) | |
return | |
if datetime.utcnow() <= user_data["last_added"] + timedelta(seconds=10): | |
return | |
await self.bot.pool.execute( | |
"UPDATE points SET count=count + 1, last_added=$1 WHERE user_id=$2", | |
datetime.utcnow(), | |
user.id | |
) | |
@commands.Cog.listener() | |
async def on_guild_role_delete(self, role: discord.Role): | |
guild: discord.Guild = role.guild | |
if guild.id != 694641646780022818: | |
return | |
shop_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop WHERE role_id=$1", role.id) | |
if not shop_item: | |
return | |
await self.bot.pool.execute("DELETE FROM points_shop WHERE role_id=$1", role.id) | |
@commands.Cog.listener() | |
async def on_guild_role_update(self, before: discord.Role, after: discord.Role): | |
guild: discord.Guild = before.guild | |
if guild.id != 694641646780022818: | |
return | |
shop_item = await self.bot.pool.fetchrow("SELECT * FROM points_shop WHERE role_id=$1", before.id) | |
if not shop_item: | |
return | |
if before.position != after.position: | |
await self.bot.pool.execute("UPDATE points_shop SET position=$1 WHERE role_id=$2", after.position, after.id) | |
@commands.Cog.listener() | |
async def on_command_completion(self, ctx: Context): | |
await self.give_points(ctx.message) | |
@commands.Cog.listener() | |
async def on_message(self, message: discord.Message): | |
ctx: Context = await self.bot.get_context(message) | |
if ctx.command: | |
return | |
await self.give_points(message) | |
def setup(bot): | |
bot.add_cog(Points(bot)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
spaghetti