|
### TELEGRAM bot #### |
|
|
|
# https://github.com/eternnoir/pyTelegramBotAPI |
|
import telebot |
|
from telebot.types import ReplyKeyboardMarkup, KeyboardButton |
|
|
|
# Fail fast when the token is missing: every Telegram helper below needs `bot`.
if not SECRETS.TELEGRAM_BOT_TOKEN:
    raise Exception("Telegram bot token is not set")

# Shared bot client used by sendMessage() and setWebhook().
bot = telebot.TeleBot(token=SECRETS.TELEGRAM_BOT_TOKEN)
|
|
|
def to_dict(obj):
    """Return a loggable representation of ``obj`` on a best-effort basis.

    Tries ``obj.to_dict()`` first, then ``obj.to_json()``; when neither call
    succeeds (method missing or it raises), ``obj`` itself is returned
    unchanged.

    Only ``Exception`` subclasses are treated as "conversion failed", so
    ``KeyboardInterrupt`` / ``SystemExit`` raised during a conversion are no
    longer silently swallowed.
    """
    try:
        return obj.to_dict()
    except Exception:
        # Deliberate fallthrough: the object has no usable to_dict().
        pass

    try:
        return obj.to_json()
    except Exception:
        # Nothing to convert with; hand the object back untouched.
        return obj
|
|
|
def jsonify(kwargs):
    """Return a copy of ``kwargs`` with every value passed through to_dict().

    Used to make keyword arguments loggable (fix for logging).
    """
    loggable = {}
    for key, value in kwargs.items():
        loggable[key] = to_dict(value)
    return loggable
|
|
|
def sendMessage(chat_id, *args, **kwargs):
    """Log an outgoing Telegram message, then send it through the bot.

    ``args`` and ``kwargs`` are forwarded unchanged to ``bot.send_message``;
    keyword values are converted with jsonify() only for the log entry.
    """
    log_title = "Message to %s@telegram" % chat_id
    log_body = MAGIC.json.dumps([args, jsonify(kwargs)])
    MAGIC.log_transmission(log_title, log_body)

    bot.send_message(chat_id, *args, **kwargs)
|
|
|
def setWebhook(*args, **kwargs):
    """Log a webhook registration request, then forward it to the bot.

    All arguments are passed unchanged to ``bot.set_webhook``.
    """
    log_body = MAGIC.json.dumps([args, kwargs])
    MAGIC.log_transmission("Telegram->setWebhook", log_body)

    bot.set_webhook(*args, **kwargs)
|
|
|
def parse_data(data):
    """Build a ``telebot.types.Update`` from ``data`` via ``Update.de_json``."""
    update_cls = telebot.types.Update
    return update_cls.de_json(data)
|
|
|
# Telegram helpers bundled for the export() call at the end of the file
# (passed there as `telegram=`). Presumably AttrDict exposes each function
# under its own name -- confirm against MAGIC.AttrDict.
telegram_export = MAGIC.AttrDict(
    sendMessage,
    setWebhook,
    parse_data,
)
|
|
|
|
|
### OPEN AI client ### |
|
from openai import OpenAI |
|
# Fail fast when the key is missing: every OpenAI helper below needs `client`.
if not SECRETS.OPENAI_API_KEY:
    raise Exception("OPENAI_API_KEY is not set")

# Shared OpenAI client used by the assistant and one-shot helpers.
client = OpenAI(api_key=SECRETS.OPENAI_API_KEY)
|
|
|
# Function-calling tool definitions handed to the assistant in
# update_assistant(). Each entry declares one function (name, description and
# a JSON-schema "parameters" object); the tool calls the assistant makes are
# dispatched through the `callback` argument of _chat().
odoo_tools = [
    {
        "type": "function",
        "function": {
            "name": "create_lead",
            "description": "Get ID for the new lead for the hotel guest request",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "Exact name of the service according to the data file, e.g. 'Colosseum Tour'",
                    },
                    "title": {
                        "type": "string",
                        "description": "Key information about client request, e.g. 'Three persons to Colosseum'",
                    },
                    "date": {
                        "type": "string",
                        "description": "Choosen date in format DD MMM YYYY",
                    },
                    "total_price": {
                        "type": "number",
                        "description": "Total price they must pay",
                    },
                },
                "required": ["date", "total_price", "title"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "create_issue",
            # Plain string: the original f-prefix had no placeholders.
            "description": "Get ID for the new issue for hotel guest concern.",
            "parameters": {
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Short information about the issue, e.g. 'Missing towels in the room #123'",
                    },
                    "details": {
                        "type": "string",
                        "description": "Full details, for example quotes from customer messages",
                    },
                },
                "required": ["title"],
            },
        },
    },
]
|
|
|
# Docs: |
|
# * https://platform.openai.com/docs/assistants/overview |
|
# * https://platform.openai.com/docs/api-reference/assistants |
|
# * https://github.com/openai/openai-python |
|
def update_assistant():
    """Create the OpenAI assistant if needed and push the current setup to it.

    Steps:
      1. When ``PARAMS.AI_ASSISTANT_ID`` is empty, create a new assistant and
         store its id with ``PARAMS._update_param``.
      2. Build the instructions from ``PARAMS.AI_INSTRUCTIONS``, the
         excursions / services / restaurant texts in ``DATA`` and any extra
         ``DATA`` entries whose key ends with "17".
      3. Log the payload and update the assistant's name, instructions, model
         and tools.
    """
    # Keep the id in a local variable so the update call below works in the
    # same run that created the assistant, without relying on PARAMS being
    # refreshed by _update_param().
    assistant_id = PARAMS.AI_ASSISTANT_ID
    if not assistant_id:
        assistant = client.beta.assistants.create(model=PARAMS.AI_MODEL)
        assistant_id = assistant.id
        PARAMS._update_param("AI_ASSISTANT_ID", assistant_id)

    bonus = """
Also, use the following information as absolute truth to mention whenever it makes sense.
"""
    for data_key in DATA:
        MAGIC.log(data_key)
        # NOTE(review): the "17" suffix appears to mark extra knowledge
        # entries; the suffix is stripped to form the heading -- confirm the
        # naming convention with whoever maintains DATA.
        if data_key.endswith("17"):
            bonus += "\n\n" + data_key[:-2]
            bonus += "\n\n" + DATA[data_key].text

    instructions = f"""{PARAMS.AI_INSTRUCTIONS}

Below are information about the services provided.

EXCURSIONS

{DATA.excursions.text}

HOTEL SERVICES

{DATA.services.text}

RESTAURANT

{DATA.restaurant.text}

{bonus}
"""
    #vector_store = client.beta.vector_stores.create(name="Price Lists")
    #file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
    #    vector_store_id=vector_store.id, files=[
    #        DATA.excursions.file_content,
    #        DATA.services.file_content,
    #        DATA.restaurant.file_content,
    #    ]
    #)
    MAGIC.log_transmission(
        "OpenAI", f"""Assistant Name: {PARAMS.AI_ASSISTANT_NAME}
AI Model: {PARAMS.AI_MODEL}

{instructions}"""
    )
    client.beta.assistants.update(
        assistant_id=assistant_id,
        name=PARAMS.AI_ASSISTANT_NAME,
        instructions=instructions,
        model=PARAMS.AI_MODEL,
        tools=odoo_tools,
        #tools=[{"type": "file_search"}] + odoo_tools,
        #tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
    )
|
|
|
# callback(partner, function_name, kwargs) -> value |
|
def chat_assistant(partner, prompt, callback):
    """Add ``prompt`` to the partner's thread with the "assistant" role.

    Thin wrapper around _chat(); ``callback`` handles tool calls.
    """
    role = "assistant"
    return _chat(partner, role, prompt, callback)
|
|
|
def chat_user(partner, prompt, callback):
    """Add ``prompt`` to the partner's thread with the "user" role.

    Thin wrapper around _chat(); ``callback`` handles tool calls.
    """
    role = "user"
    return _chat(partner, role, prompt, callback)
|
|
|
def _chat(partner, role, prompt, callback):
    """Post ``prompt`` to the partner's OpenAI thread and run the assistant.

    Args:
        partner: record providing ``telegram_ID``, ``id`` and the
            ``_get_sync_value`` / ``_set_sync_value`` helpers used to persist
            the thread id under "openai_thread_id".
        role: message role, "user" or "assistant".
        prompt: message text added to the thread.
        callback: ``callback(partner, function_name, kwargs) -> value``,
            invoked once for every tool call requested by the run.

    Returns:
        None. Replies are delivered by _process_run().

    Raises:
        MAGIC.ValidationError: when ``PARAMS.AI_ASSISTANT_ID`` or
            ``partner.telegram_ID`` is not set.
    """
    if not PARAMS.AI_ASSISTANT_ID:
        raise MAGIC.ValidationError("AI_ASSISTANT_ID is not set. Run Setup task first")
    if not partner.telegram_ID:
        raise MAGIC.ValidationError("Partner %s has no telegram_ID set" % partner.id)

    thread = None
    thread_id = partner._get_sync_value("openai_thread_id", "char")
    if thread_id:
        # TODO: ignore error when thread is not found or obsolete
        thread = client.beta.threads.retrieve(thread_id)

    if not thread:
        thread = client.beta.threads.create()
        partner._set_sync_value("openai_thread_id", "char", str(thread.id))

    MAGIC.log_transmission("OpenAI: %s message" % role, prompt)
    client.beta.threads.messages.create(
        thread_id=thread.id,
        role=role,
        content=prompt,
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=PARAMS.AI_ASSISTANT_ID,
    )
    _process_run(partner, run, thread)

    if not run.required_action:
        return
    MAGIC.log(str(run.required_action), MAGIC.LOG_DEBUG)

    tool_outputs = []
    for tool in run.required_action.submit_tool_outputs.tool_calls:
        result = callback(
            partner,
            tool.function.name,
            MAGIC.json.loads(tool.function.arguments),
        )
        # The API expects each tool output as a string; strings pass through
        # untouched, anything else (e.g. a numeric record id) is JSON-encoded.
        if not isinstance(result, str):
            result = MAGIC.json.dumps(result)
        tool_outputs.append({
            "tool_call_id": tool.id,
            "output": result,
        })

    # Submit all tool outputs at once after collecting them in a list
    if tool_outputs:
        run = client.beta.threads.runs.submit_tool_outputs_and_poll(
            thread_id=thread.id,
            run_id=run.id,
            tool_outputs=tool_outputs,
        )
        _process_run(partner, run, thread)
|
|
|
def _process_run(partner, run, thread):
    """Deliver the messages of a finished run, or log why it did not finish.

    * ``completed``: every message produced by the run is forwarded to the
      partner with send_telegram_message(), oldest first.
    * ``requires_action``: nothing happens here; _chat() handles tool calls.
    * any other status: a warning with the run's last error is logged.
    """
    if run.status == "completed":
        messages = client.beta.threads.messages.list(
            thread_id=thread.id,
            run_id=run.id,
            # The listing is newest-first by default; ask for chronological
            # order so several replies reach the guest in the right sequence.
            order="asc",
        )
        MAGIC.log(str(messages), MAGIC.LOG_DEBUG)
        for msg in messages.data:
            text = msg.content[0].text.value
            send_telegram_message(partner, text)
    elif run.status != "requires_action":
        # last_error may be absent for some statuses, so guard against None
        # instead of failing while trying to log.
        error = run.last_error
        error_code = error.code if error else None
        error_message = error.message if error else None
        # The original format string had two placeholders for three values
        # and raised TypeError before anything was logged.
        MAGIC.log(
            "run.status=%s, BUG=%s FIX=%s" % (run.status, error_code, error_message),
            MAGIC.LOG_WARNING,
        )
|
|
|
# one-shot text generation, e.g. to translate text |
|
def oracle(system_prompt, user_prompt):
    """Run a one-shot chat completion and return the reply text.

    ``system_prompt`` is sent as the system message and ``user_prompt`` as the
    user message, using ``PARAMS.AI_MODEL``. Both prompts are logged first.
    """
    transcript = f"""{system_prompt}

{user_prompt}"""
    MAGIC.log_transmission("OpenAI", transcript)

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    response = client.chat.completions.create(
        model=PARAMS.AI_MODEL,
        messages=conversation,
    )
    first_choice = response.choices[0]
    return first_choice.message.content
|
|
|
# OpenAI helpers bundled for the export() call at the end of the file
# (passed there as `openai=`). Presumably AttrDict exposes each function
# under its own name -- confirm against MAGIC.AttrDict.
openai_export = MAGIC.AttrDict(
    update_assistant,
    chat_assistant,
    chat_user,
    oracle,
)
|
|
|
### Extra Tools ### |
|
from lxml.html.clean import Cleaner |
|
import markdown |
|
|
|
|
|
def html_sanitize_telegram(html):
    """Reduce ``html`` to the small tag set used for Telegram messages.

    Only ``b``, ``i``, ``u``, ``s``, ``a``, ``code`` and ``pre`` tags are
    kept, and ``href`` is the only attribute allowed to survive.

    Returns the cleaned markup without the ``<div>`` wrapper the cleaner
    puts around it.
    """
    allowed_tags = {"b", "i", "u", "s", "a", "code", "pre"}
    cleaner = Cleaner(
        safe_attrs_only=True,
        # Must be a set containing the attribute name: set("href") would be
        # the four single letters, so links would lose their href.
        safe_attrs={"href"},
        allow_tags=allowed_tags,
        remove_unknown_tags=False,
    )
    cleaned = cleaner.clean_html(html)

    # remove surrounding div -- but only when it is really there; a blind
    # slice would eat content when the result is a bare allowed tag such as
    # "<b>text</b>".
    wrapper_open, wrapper_close = "<div>", "</div>"
    if cleaned.startswith(wrapper_open) and cleaned.endswith(wrapper_close):
        cleaned = cleaned[len(wrapper_open):-len(wrapper_close)]
    return cleaned
|
|
|
|
|
def markdown2html(markdown_content):
    """Render Markdown to HTML and sanitize it with html_sanitize_telegram()."""
    rendered = markdown.markdown(markdown_content)
    sanitized = html_sanitize_telegram(rendered)
    return sanitized
|
|
|
|
|
def send_telegram_message(partner, text):
    """Send Markdown ``text`` to the partner on Telegram and log it on the record.

    When the partner has no ``telegram_ID`` the call is logged and skipped.
    Otherwise the text is converted to sanitized HTML, sent via sendMessage()
    and posted on the partner record with ``message_post``.
    """
    if not partner.telegram_ID:
        MAGIC.log("Partner %s has no telegram_ID set" % partner.id)
        return

    # markdown2html() already returns sanitized markup; running the sanitizer
    # a second time is redundant and can only damage the content.
    html = markdown2html(text)
    # NOTE(review): no parse_mode is passed here and the bot is created with
    # a token only, so Telegram may display the tags literally -- confirm
    # whether parse_mode="HTML" is intended.
    sendMessage(partner.telegram_ID, html)

    html = html.replace("\n", "<br/>")
    partner.message_post(body=f"<em>Outgoing message via telegram:</em><br/><br/>{html}")
|
|
|
|
|
def request_telegram_phone(partner):
    """Ask the partner to share their phone number through a Telegram button.

    The message and button label come from PARAMS; when the partner's
    language is set and differs from ``PARAMS.DEFAULT_LANG`` both are
    translated with oracle() first.

    Raises:
        MAGIC.ValidationError: when the partner has no ``telegram_ID``.
    """
    if not partner.telegram_ID:
        raise MAGIC.ValidationError("Partner %s has no telegram_ID set" % partner.id)

    message_text = PARAMS.TELEGRAM_SHARE_CONTACT_MESSAGE
    button_name = PARAMS.TELEGRAM_SHARE_CONTACT_BUTTON

    needs_translation = partner.lang and partner.lang != PARAMS.DEFAULT_LANG
    if needs_translation:
        translate_prompt = f"Translate telegram message to {partner.lang}"
        message_text = oracle(translate_prompt, message_text)
        button_name = oracle(translate_prompt, button_name)

    # One-time reply keyboard holding a single "share contact" button.
    markup = ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True)
    markup.add(KeyboardButton(button_name, request_contact=True))
    sendMessage(partner.telegram_ID, message_text, reply_markup=markup)
|
|
|
|
|
# CORE.* |
|
# Publish the helpers of this file (see the "CORE.*" note above): two
# functions directly, the Telegram and OpenAI bundles as keyword entries,
# plus the database secret read from ir.config_parameter.
export(
    send_telegram_message,
    request_telegram_phone,
    telegram=telegram_export,
    openai=openai_export,
    SECRET=MAGIC.env["ir.config_parameter"].sudo().get_param("database.secret"),
)