Created
January 16, 2018 00:31
-
-
Save Moving-Electrons/31a024ce0898a7c7415eecdefb4a087c to your computer and use it in GitHub Desktop.
This script checks ambient conditions every 2 minutes and updates an LED matrix graph accordingly. It uses a Raspberry Pi Zero W, an Adafruit VCNL4010 proximity sensor and a Sense HAT. More info can be found at www.movingelectrons.net.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# Import the VCNL40xx module. | |
import Adafruit_VCNL40xx | |
from sense_hat import SenseHat | |
import time | |
import threading | |
import subprocess | |
# Create a VCNL4010 instance when the module is loaded. check_events()
# polls this object for proximity readings.
vcnl = Adafruit_VCNL40xx.VCNL4010()
def pixel_image(orientation, temp):
    '''
    Returns a list with a pixel image that can be sent to the
    SenseHat. The image is a flat list of 64 RGB tuples laid out row by
    row (8 rows of 8 pixels).

    The shape drawn depends on the orientation. The colour of the lit
    pixels depends on the temperature, following a predefined gradient
    color range.

    orientation: 'up', 'down' or 'same'.
    temp: Float. Temperature used to pick the colour. The band limits
        are the values conditions() passes in Fahrenheit.

    Raises ValueError if orientation is not one of the three names
    above.
    '''
    o = (0, 0, 0)  # Unlit pixel.

    # Gradient codes from http://www.colorhexa.com/0055e5-to-bf2300
    # Each entry is (inclusive upper limit, colour). The bands are
    # contiguous, so a value such as 68.005 lands in the next band
    # instead of falling between two of them.
    bands = (
        (68.0, (0, 85, 229)),
        (71.0, (0, 149, 226)),
        (73.0, (0, 213, 41)),
        (75.0, (192, 200, 0)),
        (78.0, (194, 91, 0)),
    )
    for upper_limit, colour in bands:
        if temp <= upper_limit:
            x = colour
            break
    else:
        # Anything above the last limit gets the hottest colour. A NaN
        # fails every comparison and keeps the white fallback.
        x = (191, 35, 0) if temp > 78.0 else (255, 255, 255)

    images = {
        'up': [
            o, o, o, x, x, o, o, o,
            o, o, x, x, x, x, o, o,
            o, x, o, x, x, o, x, o,
            x, o, o, x, x, o, o, x,
            o, o, o, x, x, o, o, o,
            o, o, o, x, x, o, o, o,
            o, o, o, x, x, o, o, o,
            o, o, o, x, x, o, o, o,
        ],
        'down': [
            o, o, o, x, x, o, o, o,
            o, o, o, x, x, o, o, o,
            o, o, o, x, x, o, o, o,
            o, o, o, x, x, o, o, o,
            x, o, o, x, x, o, o, x,
            o, x, o, x, x, o, x, o,
            o, o, x, x, x, x, o, o,
            o, o, o, x, x, o, o, o,
        ],
        'same': [
            o, o, o, o, o, o, o, o,
            x, x, x, x, x, x, x, x,
            x, x, x, x, x, x, x, x,
            o, o, o, o, o, o, o, o,
            o, o, o, o, o, o, o, o,
            x, x, x, x, x, x, x, x,
            x, x, x, x, x, x, x, x,
            o, o, o, o, o, o, o, o,
        ],
    }

    # Fail with a clear message instead of an UnboundLocalError when the
    # orientation is not recognised.
    if orientation not in images:
        raise ValueError(
            "orientation must be 'up', 'down' or 'same', got {!r}".format(
                orientation))
    return images[orientation]
def calc_temp(temp, cpu_temp_path='/sys/class/thermal/thermal_zone0/temp',
              temp_factor=None):
    '''
    Calculates adjusted temperature to account for
    CPU temperature and measured ambient temperature.
    Celsius.

    The result is: temp - (cpu_temp - temp) / 2 + temp_factor

    temp: Float. Measured ambient temperature in Celsius.
    cpu_temp_path: File holding the CPU temperature as an integer in
        thousandths of a degree (the value is divided by 1000).
    temp_factor: Offset added to the result. When None, the module
        level TEMP_FACTOR is used.

    Raises OSError if the file cannot be read and ValueError if its
    content is not an integer.
    '''
    if temp_factor is None:
        temp_factor = TEMP_FACTOR

    # Read the sysfs file directly; no shell or child process is needed
    # just to fetch a single number.
    with open(cpu_temp_path) as cpu_file:
        cpu_temp = int(cpu_file.read().strip()) / 1000

    return temp - ((cpu_temp - temp) / 2.0) + temp_factor
def turn_off():
    '''
    Prompts for a shutdown confirmation on the LED matrix and shuts the
    device down when the user confirms.

    The confirmation is the next joystick event: it must be a press in
    the STICK_TRIGGER direction. Any other event leaves the device on.
    '''
    prompt = 'Turn off?'
    print(prompt)
    sense.show_message(prompt, scroll_speed=SCROLL_SPEED)

    # Block until the next joystick event arrives.
    reply = sense.stick.wait_for_event(emptybuffer=True)
    time.sleep(0.3)

    confirmed = reply.direction == STICK_TRIGGER and reply.action == 'pressed'
    if not confirmed:
        return

    sense.show_message(
        'Turning off device. Wait 15 sec. before disconnect...',
        scroll_speed=SCROLL_SPEED)
    subprocess.run(['sudo', 'shutdown', 'now'])
def check_events():
    '''
    Body of the Event Thread. Every 0.25 seconds it does two things:
    a. Reads the proximity sensor. When the reading is above the
    threshold it sets the in_range flag, which the other thread sees
    and uses to interrupt its wait.
    b. Reads the SenseHat joystick events. When the first event in the
    list points in the STICK_TRIGGER direction it calls turn_off().

    The loop never ends, so this function does not return.
    '''
    while True:
        time.sleep(0.25)

        # Sample both inputs once per cycle.
        proximity_reading = vcnl.read_proximity()
        joystick_events = sense.stick.get_events()

        # Object at about 4.5 cms or closer.
        if proximity_reading > 2050:
            print('Object in range')
            in_range.set()

        # An empty list means the joystick was not touched.
        if joystick_events and joystick_events[0].direction == STICK_TRIGGER:
            turn_off()
def conditions():
    '''
    Main Thread. It shows an image on the LED matrix representing the change in
    temperature since the last measurement.

    When the in_range flag is set, it scrolls the latest temperature,
    humidity and pressure across the LED matrix and clears the flag.
    Otherwise it reads the sensors, updates the image and waits
    READINGS_INTERVAL minutes, or less if the flag is set meanwhile.

    The loop never ends, so this function does not return.
    '''
    prev_temp = 65  # Arbitrary initial temperature - Fahrenheit.
    latest = None   # (temp F, humidity, pressure mb) of the last reading.

    while True:
        # Only scroll the message once a reading exists. If the flag is
        # raised before the first reading, take the reading first; the
        # wait below then returns at once and the message is shown on
        # the next pass instead of failing on unset values.
        if in_range.is_set() and latest is not None:
            ambient_str = 'Temp: {} F Humidity: {} % Pressure: {} mb'.format(*latest)
            sense.show_message(ambient_str, SCROLL_SPEED)
            in_range.clear()
            continue

        # Read sensors
        temp_c = sense.get_temperature()
        humidity = sense.get_humidity()
        pressure_mb = sense.get_pressure()
        adjted_temp_c = calc_temp(temp_c)

        # Format data (two decimals).
        adjted_temp_f = adjted_temp_c*(9.0/5.0) + 32.0
        adjted_temp_f = float("{0:.2f}".format(adjted_temp_f))
        humidity = float("{0:.2f}".format(humidity))
        pressure_mb = float("{0:.2f}".format(pressure_mb))
        latest = (adjted_temp_f, humidity, pressure_mb)

        print('SenseHat measured temperature: {} C'.format(temp_c))
        print('Adjusted temperature: {} C'.format(adjted_temp_c))
        print('Adjusted temperature: {} F'.format(adjted_temp_f))

        # Determine arrow direction
        if adjted_temp_f < prev_temp:
            arrow = 'down'
        elif adjted_temp_f > prev_temp:
            arrow = 'up'
        else:
            arrow = 'same'
        prev_temp = adjted_temp_f

        image = pixel_image(arrow, adjted_temp_f)
        sense.set_pixels(image)

        # This is used instead of time.sleep() to be able to interrupt
        # the 'sleep' when the event flag is raised.
        in_range.wait(READINGS_INTERVAL*60)
if __name__ == '__main__':
    # Settings read as module globals by the functions in this file.
    READINGS_INTERVAL = 2  # in minutes.
    TEMP_FACTOR = 1.65  # Temp. adjustment factor in Celsius.
    SCROLL_SPEED = 0.015
    STICK_TRIGGER = 'down'

    sense = SenseHat()
    sense.set_rotation(180)

    # Threading event to be used as flag to communicate between threads.
    in_range = threading.Event()

    event_thread = threading.Thread(target=check_events, name='check events')
    main_thread = threading.Thread(target=conditions, name='conditions')

    # Start the event thread first, then the measuring thread.
    for worker in (event_thread, main_thread):
        worker.start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment