### TELEGRAM bot #### |
# https://github.com/eternnoir/pyTelegramBotAPI |
import telebot |
from telebot.types import ReplyKeyboardMarkup, KeyboardButton |
# Build the bot client from the configured token; abort with an explicit
# error when the token is missing.
if SECRETS.TELEGRAM_BOT_TOKEN:
    bot = telebot.TeleBot(token=SECRETS.TELEGRAM_BOT_TOKEN)
else:
    raise Exception("Telegram bot token is not set")
def to_dict(obj):
    """Return a plain representation of *obj*, used when logging arguments.

    Tries ``obj.to_dict()`` first, then ``obj.to_json()``. When neither call
    succeeds, *obj* itself is returned unchanged.
    """
    for method_name in ("to_dict", "to_json"):
        try:
            return getattr(obj, method_name)()
        except Exception:
            # Best effort: a missing or failing method falls through to the
            # next option. ``Exception`` instead of a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still propagate.
            continue
    return obj
def jsonify(kwargs):
    """Return a new dict with every value of *kwargs* passed through ``to_dict``.

    Needed so keyword arguments can be dumped to JSON for logging.
    """
    loggable = {}
    for key, value in kwargs.items():
        loggable[key] = to_dict(value)
    return loggable
def sendMessage(chat_id, *args, **kwargs):
    """Log an outgoing Telegram message, then send it with ``bot.send_message``.

    Everything after ``chat_id`` is forwarded to the bot unchanged; keyword
    values are run through ``jsonify`` only for the log entry.
    """
    title = "Message to %s@telegram" % chat_id
    payload = MAGIC.json.dumps([args, jsonify(kwargs)])
    MAGIC.log_transmission(title, payload)
    bot.send_message(chat_id, *args, **kwargs)
def setWebhook(*args, **kwargs):
    """Log the webhook arguments, then forward them to ``bot.set_webhook``."""
    payload = MAGIC.json.dumps([args, kwargs])
    MAGIC.log_transmission("Telegram->setWebhook", payload)
    bot.set_webhook(*args, **kwargs)
def parse_data(data):
    """Deserialize *data* with ``telebot.types.Update.de_json`` and return it."""
    update = telebot.types.Update.de_json(data)
    return update
# Telegram helpers bundled together; handed to ``export`` as ``telegram``.
telegram_export = MAGIC.AttrDict(
    sendMessage,
    setWebhook,
    parse_data,
)
### OPEN AI client ### |
from openai import OpenAI |
# Build the OpenAI client from the configured key; abort with an explicit
# error when the key is missing.
if SECRETS.OPENAI_API_KEY:
    client = OpenAI(api_key=SECRETS.OPENAI_API_KEY)
else:
    raise Exception("OPENAI_API_KEY is not set")
# Function-calling tool definitions passed to the assistant as ``tools``.
# Each entry describes one function the model may request; the requested
# calls are dispatched by name to the callback given to the chat helpers.
odoo_tools = [
    {
        "type": "function",
        "function": {
            "name": "create_lead",
            "description": "Get ID for the new lead for the hotel guest request",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "Exact name of the service according to the data file, e.g. 'Colosseum Tour'",
                    },
                    "title": {
                        "type": "string",
                        "description": "Key information about client request, e.g. 'Three persons to Colosseum'",
                    },
                    "date": {
                        "type": "string",
                        "description": "Choosen date in format DD MMM YYYY",
                    },
                    "total_price": {
                        "type": "number",
                        "description": "Total price they must pay",
                    },
                },
                "required": ["date", "total_price", "title"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "create_issue",
            # Plain string: the original f-prefix had no placeholders.
            "description": "Get ID for the new issue for hotel guest concern.",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Short information about the issue, e.g. 'Missing towels in the room #123'",
                    },
                    "details": {
                        "type": "string",
                        "description": "Full details, for example quotes from customer messages",
                    },
                },
                "required": ["title"],
            },
        },
    },
]
# Docs: |
# * https://platform.openai.com/docs/assistants/overview |
# * https://platform.openai.com/docs/api-reference/assistants |
# * https://github.com/openai/openai-python |
def update_assistant():
    """Create the OpenAI assistant if needed and push fresh instructions to it.

    The instructions are assembled from ``PARAMS.AI_INSTRUCTIONS``, the texts
    of ``DATA.excursions``, ``DATA.services`` and ``DATA.restaurant``, plus
    every ``DATA`` entry whose key ends with ``"17"``. The result is logged and
    sent, together with ``odoo_tools``, via ``client.beta.assistants.update``.
    """
    # Reuse the stored assistant and create one only when none is recorded yet;
    # creating unconditionally would leave a new assistant behind on every
    # call. A local id is used so the update below does not depend on PARAMS
    # reflecting the value just written by ``_update_param``.
    assistant_id = PARAMS.AI_ASSISTANT_ID
    if not assistant_id:
        assistant = client.beta.assistants.create(model=PARAMS.AI_MODEL)
        assistant_id = assistant.id
        PARAMS._update_param("AI_ASSISTANT_ID", assistant_id)

    bonus_parts = ["""
Also, use the following information as absolute truth to mention whenever it makes sense.
"""]
    for key in DATA:
        MAGIC.log(key)
        if key.endswith("17"):
            # The key without its "17" suffix is emitted before the entry text.
            bonus_parts.append(key[:-2])
            bonus_parts.append(DATA[key].text)
    bonus = "\n\n".join(bonus_parts)

    instructions = f"""{PARAMS.AI_INSTRUCTIONS}
Below are information about the services provided.
{DATA.excursions.text}
{DATA.services.text}
{DATA.restaurant.text}
{bonus}
"""
    #vector_store = client.beta.vector_stores.create(name="Price Lists")
    #file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
    #    vector_store_id=vector_store.id, files=[
    #        DATA.excursions.file_content,
    #        DATA.services.file_content,
    #        DATA.restaurant.file_content,
    #    ]
    #)
    # NOTE(review): the assistant name is logged here but not passed to the
    # update call below — confirm whether ``name=`` should be sent as well.
    MAGIC.log_transmission(
        "OpenAI", f"""Assistant Name: {PARAMS.AI_ASSISTANT_NAME}
{instructions}"""
    )
    client.beta.assistants.update(
        assistant_id=assistant_id,
        instructions=instructions,
        tools=odoo_tools,
        #tools=[{"type": "file_search"}] + odoo_tools,
        #tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
    )
# callback(partner, function_name, kwargs) -> value |
def chat_assistant(partner, prompt, callback):
    """Add *prompt* to the partner's thread with the ``"assistant"`` role.

    ``callback(partner, function_name, kwargs) -> value`` handles tool calls.
    Returns whatever ``_chat`` returns.
    """
    return _chat(partner=partner, role="assistant", prompt=prompt, callback=callback)
def chat_user(partner, prompt, callback):
    """Add *prompt* to the partner's thread with the ``"user"`` role.

    ``callback(partner, function_name, kwargs) -> value`` handles tool calls.
    Returns whatever ``_chat`` returns.
    """
    return _chat(partner=partner, role="user", prompt=prompt, callback=callback)
def _chat(partner, role, prompt, callback):
    """Post *prompt* to the partner's OpenAI thread and run the assistant.

    Args:
        partner: record providing ``telegram_ID``, ``id`` and the
            ``_get_sync_value`` / ``_set_sync_value`` storage that keeps the
            thread id under ``"openai_thread_id"``.
        role: role of the new thread message (``"assistant"`` or ``"user"``).
        prompt: message text.
        callback: ``callback(partner, function_name, kwargs) -> value``,
            invoked once per tool call requested by the run; its return value
            is submitted as that tool's output.

    Raises:
        MAGIC.ValidationError: when ``PARAMS.AI_ASSISTANT_ID`` or the partner's
            ``telegram_ID`` is not set.
    """
    # Guard clauses: without the condition the first raise fired on every call
    # and nothing below was reachable.
    if not PARAMS.AI_ASSISTANT_ID:
        raise MAGIC.ValidationError("AI_ASSISTANT_ID is not set. Run Setup task first")
    if not partner.telegram_ID:
        raise MAGIC.ValidationError("Partner %s has no telegram_ID set" % partner.id)

    # Reuse the partner's stored thread, or start (and remember) a new one.
    thread_id = partner._get_sync_value("openai_thread_id", "char")
    thread = None
    if thread_id:
        # TODO: ignore error when thread is not found or obsolete
        thread = client.beta.threads.retrieve(thread_id)
    if not thread:
        thread = client.beta.threads.create()
        partner._set_sync_value("openai_thread_id", "char", str(thread.id))

    MAGIC.log_transmission("OpenAI: %s message" % role, prompt)
    client.beta.threads.messages.create(
        thread_id=thread.id,
        role=role,
        content=prompt,
    )
    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=PARAMS.AI_ASSISTANT_ID,
    )
    _process_run(partner, run, thread)
    if not run.required_action:
        return

    MAGIC.log(str(run.required_action), MAGIC.LOG_DEBUG)
    tool_outputs = [
        {
            "tool_call_id": tool.id,
            "output": callback(partner, tool.function.name, MAGIC.json.loads(tool.function.arguments)),
        }
        for tool in run.required_action.submit_tool_outputs.tool_calls
    ]
    # Submit all tool outputs at once after collecting them in a list
    if tool_outputs:
        run = client.beta.threads.runs.submit_tool_outputs_and_poll(
            thread_id=thread.id,
            run_id=run.id,
            tool_outputs=tool_outputs,
        )
        _process_run(partner, run, thread)
def _process_run(partner, run, thread):
    """Deliver the outcome of an assistant *run* to *partner*.

    A completed run has each of its messages forwarded through
    ``send_telegram_message``. A run with status ``requires_action`` is left
    to the caller. Any other status is logged as a warning together with the
    run's error code and message, when an error object is present.
    """
    if run.status == "completed":
        messages = client.beta.threads.messages.list(
            thread_id=thread.id,
            run_id=run.id,
        )
        MAGIC.log(str(messages), MAGIC.LOG_DEBUG)
        for msg in messages.data:
            text = msg.content[0].text.value
            send_telegram_message(partner, text)
    elif run.status != "requires_action":
        # ``last_error`` can be empty for a run that did not complete, so read
        # it defensively instead of dereferencing it directly.
        error = run.last_error
        code = getattr(error, "code", None)
        message = getattr(error, "message", None)
        # Three values need three placeholders; the original string had two
        # and raised TypeError instead of logging.
        MAGIC.log(
            "run.status=%s, BUG=%s FIX=%s" % (run.status, code, message),
            MAGIC.LOG_WARNING,
        )
# one-shot text generation, e.g. to translate text |
def oracle(system_prompt, user_prompt):
    """Return a one-shot chat completion for the given prompts.

    Both prompts are logged, then sent as a ``system`` and a ``user`` message;
    the content of the first returned choice is the result.
    """
    MAGIC.log_transmission("OpenAI", f"{system_prompt}\n{user_prompt}")
    response = client.chat.completions.create(
        # ``model`` is a required argument of the endpoint; use the model
        # already configured for the assistant.
        model=PARAMS.AI_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content
# OpenAI helpers bundled together; handed to ``export`` as ``openai``.
openai_export = MAGIC.AttrDict(
    update_assistant,
    chat_assistant,
    chat_user,
    oracle,
)
### Extra Tools ### |
from lxml.html.clean import Cleaner |
import markdown |
def html_sanitize_telegram(html):
    """Reduce *html* to the small tag set this module sends to Telegram.

    Only ``b``, ``i``, ``u``, ``s``, ``a``, ``code`` and ``pre`` tags are kept,
    and ``href`` is the only attribute that survives. The ``<div>`` wrapper
    around the cleaned output is removed when present.
    """
    allowed_tags = {"b", "i", "u", "s", "a", "code", "pre"}
    cleaner = Cleaner(
        safe_attrs_only=True,
        # Must be a set of attribute names. ``set("href")`` is the set of
        # letters {"h", "r", "e", "f"}, which made every href get stripped.
        safe_attrs={"href"},
        allow_tags=allowed_tags,
        remove_unknown_tags=False,
    )
    html = cleaner.clean_html(html)
    # remove surrounding div, but only when it is really there: blind slicing
    # cuts into the content when the result is not div-wrapped.
    prefix, suffix = "<div>", "</div>"
    if html.startswith(prefix) and html.endswith(suffix):
        html = html[len(prefix):-len(suffix)]
    return html
def markdown2html(markdown_content):
    """Render Markdown to HTML and pass it through ``html_sanitize_telegram``."""
    return html_sanitize_telegram(markdown.markdown(markdown_content))
def send_telegram_message(partner, text):
    """Send Markdown *text* to *partner* over Telegram and post a copy on the record.

    Does nothing except log when the partner has no ``telegram_ID``. The text
    is converted with ``markdown2html``, sent with ``sendMessage`` and then
    posted via ``partner.message_post`` with newlines turned into ``<br/>``.
    """
    if not partner.telegram_ID:
        MAGIC.log("Partner %s has no telegram_ID set" % partner.id)
        return
    # markdown2html already returns sanitized markup, so it is sent as is;
    # sanitizing a second time is redundant.
    html = markdown2html(text)
    # The payload is HTML, so Telegram must be told to parse it; without
    # parse_mode the tags and entities are shown to the user literally.
    sendMessage(partner.telegram_ID, html, parse_mode="HTML")
    body_html = html.replace("\n", "<br/>")
    partner.message_post(body=f"<em>Outgoing message via telegram:</em><br/><br/>{body_html}")
def request_telegram_phone(
    partner,
    message_text="Please share your phone number.",
    button_name="Share phone number",
):
    """Ask *partner* on Telegram to share their phone number.

    Sends ``message_text`` with a one-time reply keyboard holding a single
    contact-request button labelled ``button_name``. When the partner has a
    language different from ``PARAMS.DEFAULT_LANG``, both texts are first
    translated with ``oracle``.

    Args:
        partner: record providing ``telegram_ID``, ``id`` and ``lang``.
        message_text: text of the message.
        button_name: label of the contact-request button.

    Raises:
        MAGIC.ValidationError: when the partner has no ``telegram_ID``.
    """
    # NOTE(review): both texts were used without ever being defined, so the
    # function could not run. They are now optional parameters; the default
    # wording is a placeholder — confirm the intended texts.
    if not partner.telegram_ID:
        raise MAGIC.ValidationError("Partner %s has no telegram_ID set" % partner.id)
    if partner.lang and partner.lang != PARAMS.DEFAULT_LANG:
        translate_prompt = f"Translate telegram message to {partner.lang}"
        message_text = oracle(translate_prompt, message_text)
        button_name = oracle(translate_prompt, button_name)
    markup = ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True)
    markup.add(KeyboardButton(button_name, request_contact=True))
    sendMessage(partner.telegram_ID, message_text, reply_markup=markup)
# CORE.* |
# Publish two functions, the telegram and openai helper bundles, and the
# value of the "database.secret" system parameter (presumably reachable by
# other project code as CORE.* — confirm against the project runtime).
export(
    send_telegram_message,
    request_telegram_phone,
    telegram=telegram_export,
    openai=openai_export,
    SECRET=MAGIC.env["ir.config_parameter"].sudo().get_param("database.secret"),
)