Created
February 1, 2024 11:42
-
-
Save YuriFontella/ebe771b7ade392bf3af37e3546c18c0d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import httpx | |
import asyncio | |
from fastapi import FastAPI, Request | |
from tortoise import Tortoise, Model, fields, connections | |
from WPP_Whatsapp import Create | |
from urllib.parse import parse_qs | |
from contextlib import asynccontextmanager | |
# Slack Web API endpoint used by slack() to post chat messages.
URL = 'https://slack.com/api/chat.postMessage'
# Slack credentials/destination; blank in this file — presumably filled in
# before the service is run (TODO confirm how they are provisioned).
TOKEN = ''
CHANNEL = ''

# Open the WhatsApp session at import time and keep the client that
# start() hands back for the request handlers.
creator = Create(session='iphone', browser='firefox')
client = creator.start()

# Echo the state when the session is connected; any other state aborts the
# import by raising with that state as the message.
if creator.state == 'CONNECTED':
    print(creator.state)
else:
    raise Exception(creator.state)
class Contacts(Model):
    """A WhatsApp contact, unique per phone number.

    No primary key is declared, so the ORM's implicit ``id`` column is what
    the request handlers expose as the contact ID.
    """

    # Display name taken from the sender's WhatsApp push name.
    name = fields.CharField(max_length=255)
    # Full international number (the ``user`` part of the WhatsApp id).
    # Such numbers routinely exceed the 32-bit range of IntField, so a
    # 64-bit column is used instead.
    number = fields.BigIntField(unique=True)
async def slack(text, name, image):
    """Post a message to the configured Slack channel.

    Args:
        text: Message body sent as Slack's ``text`` field.
        name: Sent as Slack's ``username`` field.
        image: Sent as Slack's ``icon_url`` field.

    Prints ``True`` when Slack answers HTTP 200 with ``ok`` set. Every other
    outcome (non-200 status, ``ok`` false, non-JSON body) prints the response
    payload so that no failure passes silently.
    """
    payload = {
        'token': TOKEN,
        'channel': CHANNEL,
        'text': text,
        'icon_url': image,
        'username': name,
    }

    # Named `http` so the module-level WhatsApp `client` is not shadowed.
    async with httpx.AsyncClient() as http:
        response = await http.post(url=URL, data=payload)

    # Parse the body once; an error response is not guaranteed to be JSON.
    try:
        result = response.json()
    except ValueError:
        print(response.text)
        return

    delivered = (
        response.status_code == 200
        and isinstance(result, dict)
        and result.get('ok', False)
    )
    if delivered:
        print(True)
    else:
        print(result)
async def whatsapp():
    """Register the incoming-message handler on the WhatsApp client."""

    async def on_message(message):
        """Store the sender and relay the text of a direct message to Slack.

        Status broadcasts, group messages, business senders and messages
        with an empty body are ignored.
        """
        sender = message.get('sender')
        origin = message.get('from')
        body = message.get('body')

        if origin == 'status@broadcast':
            return
        # Without sender details the message cannot be attributed to a
        # contact; bail out instead of failing on `None.get(...)`.
        if not sender:
            return
        if message.get("isGroupMsg") or sender.get('isBusiness'):
            return

        print(sender)
        print(body)
        if not body:
            return

        pushname = sender['pushname']
        number = sender['id']['user']
        # NOTE(review): the thumbnail object is presumably absent/empty for
        # contacts without a profile picture — look it up tolerantly so such
        # messages are still relayed (with no icon).
        image = (sender.get('profilePicThumbObj') or {}).get('eurl')

        await Contacts.update_or_create(defaults={'name': pushname}, number=number)
        await slack(text=body, name=pushname, image=image)

    creator.client.onMessage(on_message)
@asynccontextmanager
async def lifespan(app):
    """Application lifespan: set up state on startup, release it on shutdown.

    Startup exposes the WhatsApp client on ``app.state``, initialises the
    in-memory SQLite database, creates the schema and schedules the WhatsApp
    listener registration. Shutdown closes every database connection.

    Args:
        app: The FastAPI application being started.
    """
    app.state.client = client
    # NOTE(review): the models module is named "main", so this presumably
    # requires the file to be importable as `main` — confirm.
    await Tortoise.init(db_url='sqlite://:memory:', modules={'models': ["main"]})
    try:
        await Tortoise.generate_schemas()
        # The event loop only keeps weak references to tasks; store the task
        # on the app so it cannot be garbage-collected before it runs.
        app.state.whatsapp_task = asyncio.create_task(whatsapp())
        yield
    finally:
        # Close connections even when startup or the application itself
        # raised, so the database is never left open.
        await connections.close_all()


app = FastAPI(lifespan=lifespan)
@app.post('/whatsapp')
async def start(request: Request):
    """Run a command received as a form-encoded body with a ``text`` field.

    ``text`` has the form ``option | arg | ...``:

    * ``contacts`` — list the stored contacts.
    * ``message | <id> | <text>`` — send ``<text>`` over WhatsApp to the
      contact whose ID is ``<id>``.

    Args:
        request: Incoming request; its raw body is parsed as a query string.

    Returns:
        A dict with ``text``/``attachments`` for ``contacts``; a short
        explanatory string when the command is missing, malformed or names
        an unknown contact; ``True`` otherwise.
    """
    raw_body = await request.body()
    form = parse_qs(raw_body.decode('utf-8'))
    if 'text' not in form:
        return 'Envie uma opção'

    # Split at most twice so the message text itself may contain '|'.
    parts = [item.strip() for item in form['text'][0].split('|', 2)]
    option = parts[0]

    if option == 'contacts':
        contacts = await Contacts.all().values()
        return {
            "text": "Lista de contatos:",
            "attachments": [
                {"text": f"ID {contact['id']} - {str(contact['name']).capitalize()}"}
                for contact in contacts
            ],
        }

    if option == 'message':
        # Reject incomplete commands up front instead of failing with an
        # IndexError (HTTP 500) or handing a non-numeric ID to the ORM.
        if len(parts) < 3 or not parts[1].isdecimal() or not parts[2]:
            return 'Use: message | ID | texto'

        contact = await Contacts.get_or_none(id=int(parts[1]))
        if contact is None:
            return 'Contato não encontrado'

        whatsapp_client = request.app.state.client
        # The recipient is addressed as "<number>@c.us".
        whatsapp_client.sendText(str(contact.number) + '@c.us', parts[2])

    return True
@app.post('/contacts')
async def on_post(request: Request):
    """Create a contact, or update its name, from a JSON body.

    The body must carry ``name`` and ``number``; the contact is matched by
    ``number`` and its ``name`` is set from the payload.

    Returns:
        Always ``True``.
    """
    payload = await request.json()
    name, number = payload['name'], payload['number']
    await Contacts.update_or_create(defaults={'name': name}, number=number)
    return True
@app.get('/contacts')
async def on_get():
    """Return every stored contact."""
    every_contact = await Contacts.all()
    return every_contact
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment