Created
November 1, 2023 16:10
-
-
Save cmw2/0364d7b7eeea482f8b058ed001d1ec3a to your computer and use it in GitHub Desktop.
Add Azure Monitor OpenTelemetry to Sample AOAISearchDemo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CMW: This is in app/backend | |
import datetime | |
import json | |
import mimetypes | |
import yaml | |
from azure.core.credentials import AzureKeyCredential | |
from azure.identity import DefaultAzureCredential | |
from azure.search.documents import SearchClient | |
from azure.storage.blob import BlobServiceClient | |
from backend.approaches.approach import Approach | |
from backend.approaches.approach_classifier import ApproachClassifier | |
from backend.approaches.chatstructured import ChatStructuredApproach | |
from backend.approaches.chatunstructured import ChatUnstructuredApproach | |
from backend.cognition.openai_client import OpenAIClient | |
from backend.config import DefaultConfig | |
from backend.contracts.chat_response import Answer, ApproachType, ChatResponse | |
from backend.contracts.error import ( | |
ContentFilterException, | |
OutOfScopeException, | |
UnauthorizedDBAccessException, | |
) | |
from backend.contracts.search_settings import SearchSettings | |
from backend.data_client.data_client import DataClient | |
from backend.utilities.access_management import AccessManager | |
from common.contracts.chat_session import ( | |
ChatSession, | |
DialogClassification, | |
ParticipantType, | |
) | |
from flask import Flask, jsonify, request | |
# CMW: Added this section for AZMon and OpenTelemetry | |
from opentelemetry import trace | |
from opentelemetry.instrumentation.flask import FlaskInstrumentor | |
from opentelemetry.instrumentation.requests import RequestsInstrumentor | |
from opentelemetry.sdk.resources import SERVICE_NAME, Resource | |
from opentelemetry.sdk.trace import TracerProvider | |
from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter | |
# Use the current user identity to authenticate with Azure OpenAI, Cognitive Search and Blob Storage (no secrets needed,
# just use 'az login' locally, and managed identity when deployed on Azure). If you need to use keys, use separate AzureKeyCredential instances with the
# keys for each service
# If you encounter a blocking error during a DefaultAzureCredential resolution, you can exclude the problematic credential by using a parameter (ex. exclude_shared_token_cache_credential=True)

# Configuration must be loaded first: every DefaultConfig attribute read below is
# populated by initialize().
DefaultConfig.initialize()
# NOTE(review): azure_credential is not referenced again in the code visible here; the
# search client below uses a key credential and blob storage uses a connection string.
azure_credential = DefaultAzureCredential()
search_credential = AzureKeyCredential(DefaultConfig.AZURE_SEARCH_KEY)
openai_client = OpenAIClient()

# Set up clients for Cognitive Search and Storage
search_client = SearchClient(
    endpoint=f"https://{DefaultConfig.AZURE_SEARCH_SERVICE}.search.windows.net",
    index_name=DefaultConfig.AZURE_SEARCH_INDEX,
    credential=search_credential,
)
blob_client = BlobServiceClient.from_connection_string(
    DefaultConfig.AZURE_BLOB_CONNECTION_STRING
)
blob_container = blob_client.get_container_client(DefaultConfig.AZURE_STORAGE_CONTAINER)

# get the logger that is already initialized (created by DefaultConfig.initialize())
logger = DefaultConfig.logger

# Approach implementations keyed by ApproachType member name; the /chat handler looks
# up the implementation to run by the classified approach's name.
chat_approaches = {
    ApproachType.unstructured.name: ChatUnstructuredApproach(
        search_client,
        DefaultConfig.KB_FIELDS_SOURCEPAGE,
        DefaultConfig.KB_FIELDS_CONTENT,
        logger,
        search_threshold_percentage=DefaultConfig.SEARCH_THRESHOLD_PERCENTAGE,
    ),
    ApproachType.structured.name: ChatStructuredApproach(
        DefaultConfig.SQL_CONNECTION_STRING, logger
    ),
}

# initialize data client
base_uri = DefaultConfig.DATA_SERVICE_URI
data_client = DataClient(base_uri, logger)
approach_classifier = ApproachClassifier(logger)
access_manager = AccessManager()
app = Flask(__name__)

# CMW: Added this code to instrument the flask app
# Install a global tracer provider whose resource carries the service name
# "backend-service".
trace.set_tracer_provider(
    TracerProvider(
        resource=Resource.create({SERVICE_NAME: "backend-service"}),
    )
)
# Instrument this Flask app and the `requests` library.
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
# Send spans to Azure Monitor using the Application Insights connection string that
# DefaultConfig exposes; spans go through a batching span processor.
trace_exporter = AzureMonitorTraceExporter(
    connection_string=DefaultConfig.APPLICATION_INSIGHTS_CNX_STR
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(trace_exporter)
)
@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def index(path):
    """Serve the static ``index.html`` page for the root URL and any other path.

    ``path`` is accepted because both routes supply it, but it is not used: the
    same static file is returned regardless of its value.
    """
    landing_page = "index.html"
    return app.send_static_file(landing_page)
@app.route("/assets/<path:rest_of_path>")
def assets(rest_of_path):
    """Serve a file from the ``assets/`` sub-folder of the app's static files.

    ``rest_of_path`` is the part of the URL that follows ``/assets/``.
    """
    static_relative_path = f"assets/{rest_of_path}"
    return app.send_static_file(static_relative_path)
# Serve content files from blob storage from within the app to keep the example self-contained.
# *** NOTE *** this assumes that the content files are public, or at least that all users of the app
# can access all the files. This is also slow and memory hungry.
@app.route("/content/<path>")
def content_file(path):
    """Download a blob by name and return it inline with a suitable content type.

    Args:
        path: Blob name taken from the URL; also used as the download file name.

    Returns:
        A ``(body, status, headers)`` tuple: the full blob content, status 200, and
        ``Content-Type`` / ``Content-Disposition`` headers.
    """
    blob = blob_container.get_blob_client(path).download_blob()
    mime_type = blob.properties["content_settings"]["content_type"]
    # Fall back to guessing from the file name when the stored type is missing or
    # is the generic binary type.
    if not mime_type or mime_type == "application/octet-stream":
        mime_type = mimetypes.guess_type(path)[0] or "application/octet-stream"
    # Emit the file name as a quoted-string so names containing spaces, semicolons
    # or quotes do not corrupt the header; backslashes and quotes are escaped.
    quoted_name = path.replace("\\", "\\\\").replace('"', '\\"')
    return (
        blob.readall(),
        200,
        {
            "Content-Type": mime_type,
            "Content-Disposition": f'inline; filename="{quoted_name}"',
        },
    )
# Replies returned directly, without running an approach, for these classifications.
_CANNED_RESPONSES = {
    ApproachType.chit_chat: "I'm sorry, but the question you've asked is outside my area of expertise. I'd be happy to help with any questions related to Microsoft Surface PCs and Laptops. Please feel free to ask about those, and I'll do my best to assist you!",
    ApproachType.inappropriate: "I'm sorry, but the question you've asked goes against our content safety policy due to harmful, offensive, or illegal content. I'd be happy to help with any questions related to Microsoft Surface PCs and Laptops. Please feel free to ask about those, and I'll do my best to assist you!",
}


def _load_bot_config():
    """Parse ``backend/bot_config.yaml`` and close the file handle afterwards."""
    with open("backend/bot_config.yaml", "r") as config_file:
        return yaml.safe_load(config_file)


def _store_exchange(
    user_id,
    conversation_id,
    user_message,
    response,
    classification,
    logged_answer,
    properties,
):
    """Append the user utterance and the assistant answer to the chat session, logging each step."""
    data_client.add_dialog_to_chat_session(
        user_id,
        conversation_id,
        ParticipantType.user,
        datetime.datetime.now(),
        user_message,
        classification,
    )
    logger.info(
        f"added dialog to chat session for user {user_id} and session {conversation_id}",
        extra=properties,
    )
    data_client.add_dialog_to_chat_session(
        user_id,
        conversation_id,
        ParticipantType.assistant,
        datetime.datetime.now(),
        json.dumps(response.answer.to_item()),
        classification,
    )
    logger.info(
        f"added response {logged_answer} to chat session for user {user_id} and session {conversation_id}",
        extra=properties,
    )


@app.route("/chat", methods=["POST"])
def chat():
    """Handle one chat turn posted as JSON.

    Reads ``conversation_id``, ``dialog_id``, ``user_id``, ``dialog`` and the
    optional ``overrides`` from the request body, classifies the question (or
    uses ``overrides["classification_override"]``), runs the matching approach
    and stores the exchange in the chat session.

    Returns:
        The serialized ``ChatResponse`` as JSON. Status is 400 for an unknown
        approach or a ``ContentFilterException``, 403 for an
        ``UnauthorizedDBAccessException`` and 500 for any other exception raised
        inside the classification/answering section.
    """
    payload = request.json
    # try get conversation_id and dialog_id needed for logging
    conversation_id = payload.get(
        "conversation_id", "no conversation_id found in request"
    )
    dialog_id = payload.get("dialog_id", "no dialog_id found in request")
    user_id = payload.get("user_id", "no user_id found in request")

    overrides = payload.get("overrides", None)
    classification_override = None
    if overrides:
        classification_override = overrides.get("classification_override", None)

    # fetch user profile
    user_profile = data_client.get_user_profile(user_id)
    # check user access rules
    allowed_resources = data_client.get_user_resources(user_id)
    allowed_approaches = access_manager.get_allowed_approaches(allowed_resources)

    logger.set_conversation_and_dialog_ids(conversation_id, dialog_id)
    properties = logger.get_updated_properties(
        {"conversation_id": conversation_id, "dialog_id": dialog_id, "user_id": user_id}
    )
    logger.info(f"request: {json.dumps(payload)}", extra=properties)
    user_message = payload.get("dialog")

    chat_session: ChatSession
    if not data_client.check_chat_session(user_id, conversation_id):
        chat_session = data_client.create_chat_session(user_id, conversation_id)
        logger.info(
            f"created new chat session for user {user_id} and session {conversation_id}",
            extra=properties,
        )
    else:
        chat_session = data_client.get_chat_session(user_id, conversation_id)
        logger.info(
            f"chat session for user {user_id} and session {conversation_id} already exists",
            extra=properties,
        )

    # History handed to the classifier: prior dialogs plus the new user message.
    history = [
        {
            "participant_type": dialog.participant_type.value,
            "utterance": dialog.utterance,
            "question_type": dialog.classification.value,
        }
        for dialog in chat_session.conversation
    ]
    history.append(
        {"participant_type": ParticipantType.user.value, "utterance": user_message}
    )

    bot_config = _load_bot_config()
    # Stays None until an approach has been selected; the OutOfScopeException
    # handler below reports whatever value is current at that point.
    question_classification = None
    try:
        if classification_override:
            approach_type = ApproachType(classification_override)
        else:
            approach_type = approach_classifier.run(history, bot_config, openai_client)
        logger.info(f"question_type: {approach_type.name}", extra=properties)

        canned_response = _CANNED_RESPONSES.get(approach_type)
        if canned_response is not None:
            response = ChatResponse(
                answer=Answer(formatted_answer=canned_response),
                classification=approach_type,
            )
            # Both canned cases are stored as chit_chat.
            # TODO: Use DialogClassification.inappropiate once data service has been updated.
            _store_exchange(
                user_id,
                conversation_id,
                user_message,
                response,
                DialogClassification.chit_chat,
                canned_response,
                properties,
            )
            return jsonify(response.to_item())

        # check if user is allowed to use the approach
        if not access_manager.is_user_allowed(allowed_approaches, approach_type):
            prohibited_resource = access_manager.map_approach_to_resource(approach_type)
            raise Exception(
                f"This query requires access to {prohibited_resource}\nUser: {user_profile.user_name} is not allowed to use this resource, please try another query or contact your administrator."
            )

        question_classification = (
            DialogClassification.unstructured_query
            if approach_type == ApproachType.unstructured
            else DialogClassification.structured_query
        )

        # The session is currently passed to the approach unfiltered.
        # filtered_chat_session = data_client.filter_chat_session(chat_session, filter=question_classification)
        filtered_chat_session = chat_session
        simplified_history = [
            {
                "participant_type": dialog.participant_type.value,
                "utterance": dialog.utterance,
            }
            for dialog in filtered_chat_session.conversation
        ]
        simplified_history.append(
            {"participant_type": ParticipantType.user.value, "utterance": user_message}
        )

        impl = chat_approaches.get(approach_type.name)
        if not impl:
            return jsonify({"error": "unknown approach"}), 400
        response = impl.run(
            simplified_history,
            bot_config,
            openai_client,
            overrides or None,
        )

        # state store update: only successful answers are written to the session
        if not response.error:
            _store_exchange(
                user_id,
                conversation_id,
                user_message,
                response,
                question_classification,
                response.answer.formatted_answer,
                properties,
            )
        return jsonify(response.to_item())
    except OutOfScopeException as e:
        logger.exception(f"Exception in /chat: {str(e)}", extra=properties)
        # Offer a retry with the suggested classification only when the user is
        # allowed to use it.
        if access_manager.is_user_allowed(
            allowed_approaches, e.suggested_classification
        ):
            response = ChatResponse(
                answer=Answer(
                    f"Error when querying knowledge-base: '{str(e.message)}'."
                ),
                show_retry=True,
                suggested_classification=e.suggested_classification,
                classification=question_classification,
            )
        else:
            response = ChatResponse(
                answer=Answer(str(e.message)), classification=question_classification
            )
        return jsonify(response.to_item())
    except UnauthorizedDBAccessException as e:
        logger.exception(
            f"UnauthorizedDBAccessExceptionException in /chat: {str(e)}",
            extra=properties,
        )
        response = ChatResponse(answer=Answer(), error=str(e.message))
        return jsonify(response.to_item()), 403
    except ContentFilterException as e:
        logger.exception(f"ContentFilterException in /chat: {str(e)}", extra=properties)
        response = ChatResponse(answer=Answer(), error=str(e.message))
        return jsonify(response.to_item()), 400
    except Exception as e:
        logger.exception(f"Exception in /chat: {e}", extra=properties)
        response = ChatResponse(answer=Answer(), error=str(e), show_retry=True)
        return jsonify(response.to_item()), 500
@app.route("/user-profiles", methods=["GET"])
def get_all_user_profiles():
    """Return every user profile from the data service as a JSON list.

    Any exception is logged and reported as ``{"error": ...}`` with status 500.
    """
    try:
        serialized_profiles = []
        for profile in data_client.get_all_user_profiles():
            serialized_profiles.append(profile.to_item())
        return jsonify(serialized_profiles)
    except Exception as err:
        logger.exception(f"Exception in /user-profiles: {err}")
        return jsonify({"error": str(err)}), 500
@app.route("/chat-sessions/<user_id>/<conversation_id>", methods=["DELETE"])
def clear_chat_session(user_id: str, conversation_id: str):
    """Clear the stored chat session for a user and conversation.

    Args:
        user_id: User identifier taken from the URL.
        conversation_id: Conversation identifier taken from the URL.

    Returns:
        A JSON confirmation message, or ``{"error": ...}`` with status 500 when
        the data client raises.
    """
    properties = logger.get_updated_properties(
        {"user_id": user_id, "conversation_id": conversation_id}
    )
    try:
        data_client.clear_chat_session(user_id, conversation_id)
        logger.info("cleared chat session.", extra=properties)
        return jsonify({"message": "cleared chat session"})
    except Exception as e:
        # Attach the same user/conversation properties to the failure log so
        # the error can be correlated with the request.
        logger.exception(
            f"Exception in /chat-sessions/<user_id>/<conversation_id>: {e}",
            extra=properties,
        )
        return jsonify({"error": str(e)}), 500
@app.route("/search-settings", methods=["GET"])
def get_search_settings():
    """Report whether vectorization is enabled, derived from SEARCH_SKIP_VECTORIZATION.

    The configured value is compared case-insensitively: "false" means
    vectorization is enabled and "true" means it is disabled. Any other value,
    like any other failure, is logged and returned as ``{"error": ...}`` with
    status 500.
    """
    try:
        skip_flag = DefaultConfig.SEARCH_SKIP_VECTORIZATION
        normalized_flag = skip_flag.lower()
        if normalized_flag == "false":
            vectorization_enabled = True
        elif normalized_flag == "true":
            vectorization_enabled = False
        else:
            raise Exception(
                f"Invalid value for SEARCH_SKIP_VECTORIZATION: {skip_flag}. Must be either 'true' or 'false'"
            )
        settings = SearchSettings(vectorization_enabled)
        return jsonify(settings.to_item())
    except Exception as err:
        logger.exception(f"Exception in /search-settings: {err}")
        return jsonify({"error": str(err)}), 500
# Start the Flask server with its default settings only when this module is run
# directly as a script.
if __name__ == "__main__":
    app.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CMW: This is in app/backend | |
#!/usr/bin/env python3 | |
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. | |
import os | |
from datetime import datetime | |
from azure.identity import DefaultAzureCredential | |
from azure.keyvault.secrets import SecretClient | |
from common.logging.log_helper import CustomLogger | |
from dotenv import load_dotenv | |
# load value from .env file if it exists, unless deploying in a production environment
# (i.e. unless ENVIRONMENT is exactly "PROD"). The file is looked up at
# <current working directory>/backend/.env, so the result depends on where the process
# is started from.
if os.getenv("ENVIRONMENT") != "PROD":
    load_dotenv(override=True, dotenv_path=f"{os.getcwd()}/backend/.env")
class Config_Reader():
    """Resolve configuration values from the environment, falling back to Azure Key Vault.

    A value is read from the process environment first. When it is unset or
    empty, the secret with the same name (underscores replaced by hyphens) is
    fetched from the vault named by the ``KEYVAULT_URI`` environment variable.
    """

    def __init__(self, logger: "CustomLogger | None") -> None:
        """Store the logger used for diagnostics; ``None`` disables logging."""
        self.logger = logger

    def set_logger(self, logger: "CustomLogger | None"):
        """Replace the logger used for diagnostics."""
        self.logger = logger

    def read_config_value(self, key_name: str) -> str:
        """Return the value for ``key_name``.

        Raises:
            Exception: if the value is found in neither the environment nor Key Vault.
        """
        return self._get_config_value(key_name)

    def _get_secret_from_keyvault(self, key_name: str):
        """Fetch ``key_name`` from Key Vault; return ``None`` when no vault is configured."""
        KEYVAULT_URI = os.getenv("KEYVAULT_URI", "")
        if not KEYVAULT_URI:
            # No vault configured: report the key as not found so the caller
            # raises an error that names the missing key.
            return None
        credential = DefaultAzureCredential()
        # Environment-style names use underscores; the secret is looked up with hyphens.
        key_name = key_name.replace("_", "-")
        client = SecretClient(vault_url=KEYVAULT_URI, credential=credential)
        return client.get_secret(key_name).value

    def _get_config_value(self, key_name: str) -> str:
        """Look up ``key_name`` in the environment, then in Key Vault, logging the vault latency."""
        value = os.getenv(key_name, None)
        if value is None or value == "":
            start = datetime.now()
            value = self._get_secret_from_keyvault(key_name)
            # total_seconds() covers the whole elapsed time; timedelta.microseconds
            # is only the sub-second component and under-reports slow lookups.
            duration = (datetime.now() - start).total_seconds() * 1000
            addl_dimension = {"keyvault_duration": duration}
            if self.logger:
                add_props = self.logger.get_updated_properties(addl_dimension)
                self.logger.info(f"key name: {key_name}, keyvault_duration: {duration}", extra=add_props)
        if value is None:
            # Name the missing key in the message (the value itself is always None here).
            message = f"Necessary value {key_name} couldn't be found in environment or Key Vault"
            if self.logger:
                self.logger.error(message)
            raise Exception(message)
        return value
class DefaultConfig:
    """Process-wide configuration holder; values are stored as class attributes.

    Call ``initialize()`` once before reading any attribute. Later calls are
    no-ops once initialization has completed successfully.
    """

    _initialized = False

    # Keys resolved through Config_Reader, in lookup order. Each value is stored
    # on the class under the key name with hyphens replaced by underscores.
    _CONFIG_KEYS = (
        "AZURE-OPENAI-GPT4-SERVICE",
        "AZURE-OPENAI-GPT4-API-KEY",
        "AZURE-OPENAI-CLASSIFIER-SERVICE",
        "AZURE-OPENAI-CLASSIFIER-API-KEY",
        "AZURE-OPENAI-EMBEDDINGS-SERVICE",
        "AZURE-OPENAI-EMBEDDINGS-API-KEY",
        "AZURE-SEARCH-SERVICE",
        "AZURE-SEARCH-INDEX",
        "AZURE-SEARCH-KEY",
        "KB-FIELDS-CONTENT",
        "KB-FIELDS-CATEGORY",
        "KB-FIELDS-SOURCEPAGE",
        "SEARCH-SKIP-VECTORIZATION",
        "AZURE-STORAGE-ACCOUNT",
        "AZURE-STORAGE-CONTAINER",
        "AZURE-BLOB-CONNECTION-STRING",
        "DATA-SERVICE-URI",
        "SQL-CONNECTION-STRING",
    )

    @staticmethod
    def _int_from_env(name, default):
        """Return the integer in environment variable ``name``, or ``default`` when unset or empty."""
        raw_value = os.getenv(name)
        if raw_value is None or raw_value == "":
            return default
        return int(raw_value)

    @classmethod
    def initialize(cls):
        """Load all configuration values and create the shared logger.

        Raises:
            Exception: re-raised after logging when any value cannot be loaded.
        """
        if cls._initialized:
            return

        reader = Config_Reader(None)
        # CMW: Put this on the class so can use it from app.py
        cls.APPLICATION_INSIGHTS_CNX_STR = reader.read_config_value("APPLICATION-INSIGHTS-CNX-STR")
        cls.logger = CustomLogger(cls.APPLICATION_INSIGHTS_CNX_STR)
        cls.logger.set_conversation_and_dialog_ids("BACKEND_APP", "NONE")
        # From here on the reader logs its lookups through the shared logger.
        reader.set_logger(cls.logger)
        try:
            for config_key in cls._CONFIG_KEYS:
                setattr(cls, config_key.replace("-", "_"), reader.read_config_value(config_key))
            # These two are read from the environment only and have defaults.
            cls.RATIO_OF_INDEX_TO_HISTORY = cls._int_from_env("RATIO_OF_INDEX_TO_HISTORY", 5)
            cls.SEARCH_THRESHOLD_PERCENTAGE = cls._int_from_env("SEARCH_THRESHOLD_PERCENTAGE", 50)
            cls.logger.info(f"SEARCH_THRESHOLD_PERCENTAGE: {cls.SEARCH_THRESHOLD_PERCENTAGE}")
            cls._initialized = True
        except Exception as e:
            cls.logger.error(f"Error while loading config: {e}")
            raise e
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CMW: This is in app/common/logging | |
import logging | |
from copy import deepcopy | |
# CMW: removed from opencensus.... and replaced with | |
from azure.monitor.opentelemetry import configure_azure_monitor | |
from opentelemetry._logs import set_logger_provider | |
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler | |
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor | |
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter | |
class CustomLogger(logging.LoggerAdapter):
    """Logger adapter that prefixes messages with the conversation id.

    The adapter keeps a ``custom_dimensions`` dictionary (conversation id and
    dialog id) in ``self.extra``. When a connection string is supplied, log
    records are also sent through an Azure Monitor log exporter.
    """

    def __init__(self, app_insights_cnx_str):
        """Create the adapter around this module's logger.

        Args:
            app_insights_cnx_str: Application Insights connection string; when
                empty or ``None`` only the console handler is attached.
        """
        self.app_insights_cnx_str = app_insights_cnx_str
        log_properties = {
            "custom_dimensions": {
                "conversation_id": "please set conversation_id before logging",
                "dialog_id": "please set dialog_id before logging",
            },
        }
        base_logger = logging.getLogger(__name__)
        base_logger.setLevel(logging.DEBUG)
        # Let LoggerAdapter set up its own state (logger, extra) instead of
        # assigning the attributes by hand.
        super().__init__(base_logger, log_properties)
        self.initialize_loggers()

    def set_conversation_id(self, conversation_id: str):
        """Set the conversation id used for subsequent log messages."""
        self.extra["custom_dimensions"]["conversation_id"] = conversation_id

    def set_dialog_id(self, dialog_id: str):
        """Set the dialog id stored in the default custom dimensions."""
        self.extra["custom_dimensions"]["dialog_id"] = dialog_id

    def set_conversation_and_dialog_ids(self, conversation_id: str, dialog_id: str):
        """Set both the conversation id and the dialog id."""
        self.set_conversation_id(conversation_id)
        self.set_dialog_id(dialog_id)

    def get_conversation_and_dialog_ids(self) -> dict:
        """Return the current conversation id and dialog id as a dictionary."""
        dimensions = self.extra["custom_dimensions"]
        return {
            "conversation_id": dimensions["conversation_id"],
            "dialog_id": dimensions["dialog_id"],
        }

    # Original (misspelled) name kept so existing callers continue to work.
    get_converation_and_dialog_ids = get_conversation_and_dialog_ids

    def initialize_loggers(self):
        """Attach the Azure Monitor and console handlers unless already present."""
        if self.app_insights_cnx_str:
            # add appInsights logger if it is not already added by another instance of CustomLogger
            # CMW: Is there something better to check for than just generic LoggingHandler?
            if not any(isinstance(handler, LoggingHandler) for handler in self.logger.handlers):
                # CMW: Added this new opentelemetry based code
                logger_provider = LoggerProvider()
                set_logger_provider(logger_provider)
                exporter = AzureMonitorLogExporter(
                    connection_string=self.app_insights_cnx_str
                )
                logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
                # Bind the handler to the provider that owns the exporter rather
                # than relying on whichever provider is registered globally.
                self.logger.addHandler(LoggingHandler(logger_provider=logger_provider))
        # add console logger if it is not already added by another instance of CustomLogger
        if not any(isinstance(handler, logging.StreamHandler) for handler in self.logger.handlers):
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.DEBUG)
            self.logger.addHandler(console_handler)

    def process(self, msg, kwargs):
        """
        Extract conversation_id and include it in the log message.

        Uses the ``extra`` passed with the logging call, or the adapter's own
        defaults when none (or an empty one) is given. Every key other than
        ``custom_dimensions`` is appended to the message as ``", key: value"``.
        Returns the rewritten message and the unchanged ``kwargs``.
        """
        extra = kwargs.get("extra") or self.extra
        # Fall back to a placeholder when the dimensions or the id are missing,
        # so that a logging call never fails on a KeyError.
        custom_dimensions = extra.get("custom_dimensions") or {}
        conversation_id = custom_dimensions.get(
            "conversation_id", "Conversation_id NOT SET while logging"
        )
        # include all properties except custom dimensions in extra dictionary to message
        for key, value in extra.items():
            if key != "custom_dimensions":
                msg = f"{msg}, {key}: {value}"
        return f"Conversation_id: {conversation_id}, {msg}", kwargs

    def get_updated_properties(self, additional_custom_dimensions: dict, additional_properties: dict = None) -> dict:
        """
        Return a copy of the default properties with extra dimensions and properties merged in.

        ``additional_custom_dimensions`` is merged into ``custom_dimensions``;
        ``additional_properties`` (optional) is merged at the top level, except
        for a ``custom_dimensions`` key, which is ignored. The adapter's own
        defaults are not modified.
        """
        custom_dimensions = self.get_updated_custom_dimensions(additional_custom_dimensions)
        properties = deepcopy(self.extra)
        for key, value in (additional_properties or {}).items():
            if key != "custom_dimensions":
                properties[key] = value
        properties["custom_dimensions"] = custom_dimensions
        return properties

    def get_updated_custom_dimensions(self, additional_custom_dimensions: dict) -> dict:
        """
        Return a copy of the default custom dimensions updated with the given entries.
        """
        dimensions = deepcopy(self.extra["custom_dimensions"])
        dimensions.update(additional_custom_dimensions)
        return dimensions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CMW removed opencensus items | |
# added | |
azure-monitor-opentelemetry==1.0.0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment