Skip to content

Instantly share code, notes, and snippets.

@leventerevesz
Created April 19, 2020 13:53
Show Gist options
  • Save leventerevesz/2a534b8dd2ab5c98e585fc8235d84cbe to your computer and use it in GitHub Desktop.
Save leventerevesz/2a534b8dd2ab5c98e585fc8235d84cbe to your computer and use it in GitHub Desktop.
Azure IoT Edge PythonModule example
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import time
import os
import sys
import asyncio
from six.moves import input
import threading
from azure.iot.device.aio import IoTHubModuleClient
# global counters
TEMPERATURE_THRESHOLD = 25
TWIN_CALLBACKS = 0
RECEIVED_MESSAGES = 0
async def main():
try:
print ( "IoT Hub Client for Python" )
# The client object is used to interact with your Azure IoT hub.
module_client = IoTHubModuleClient.create_from_edge_environment()
# connect the client.
await module_client.connect()
# Define behavior for receiving an input message on input1
# Because this is a filter module, we forward this message to the "output1" queue.
async def input1_listener(module_client):
global RECEIVED_MESSAGES
global TEMPERATURE_THRESHOLD
while True:
try:
input_message = await module_client.receive_message_on_input("input1") # blocking call
message = input_message.data
size = len(message)
message_text = message.decode('utf-8')
print ( " Data: <<<%s>>> & Size=%d" % (message_text, size) )
custom_properties = input_message.custom_properties
print ( " Properties: %s" % custom_properties )
RECEIVED_MESSAGES += 1
print ( " Total messages received: %d" % RECEIVED_MESSAGES )
data = json.loads(message_text)
if "machine" in data and "temperature" in data["machine"] and data["machine"]["temperature"] > TEMPERATURE_THRESHOLD:
custom_properties["MessageType"] = "Alert"
print ( "Machine temperature %s exceeds threshold %s" % (data["machine"]["temperature"], TEMPERATURE_THRESHOLD))
await module_client.send_message_to_output(input_message, "output1")
except Exception as ex:
print ( "Unexpected error in input1_listener: %s" % ex )
# twin_patch_listener is invoked when the module twin's desired properties are updated.
async def twin_patch_listener(module_client):
global TWIN_CALLBACKS
global TEMPERATURE_THRESHOLD
while True:
try:
data = await module_client.receive_twin_desired_properties_patch() # blocking call
print( "The data in the desired properties patch was: %s" % data)
if "TemperatureThreshold" in data:
TEMPERATURE_THRESHOLD = data["TemperatureThreshold"]
TWIN_CALLBACKS += 1
print ( "Total calls confirmed: %d\n" % TWIN_CALLBACKS )
except Exception as ex:
print ( "Unexpected error in twin_patch_listener: %s" % ex )
# define behavior for halting the application
def stdin_listener():
while True:
try:
selection = input("Press Q to quit\n")
if selection == "Q" or selection == "q":
print("Quitting...")
break
except:
time.sleep(10)
# Schedule task for C2D Listener
listeners = asyncio.gather(input1_listener(module_client), twin_patch_listener(module_client))
print ( "The sample is now waiting for messages. ")
# Run the stdin listener in the event loop
loop = asyncio.get_event_loop()
user_finished = loop.run_in_executor(None, stdin_listener)
# Wait for user to indicate they are done listening for messages
await user_finished
# Cancel listening
listeners.cancel()
# Finally, disconnect
await module_client.disconnect()
except Exception as e:
print ( "Unexpected error %s " % e )
raise
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment