|
# SPDX-FileCopyrightText: 2024 Paulus @PaulskPt on Github |
|
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries |
|
# SPDX-FileCopyrightText: 2023 Kattni Rembor for Adafruit Industries |
|
# SPDX-License-Identifier: MIT |
|
""" |
|
This is a Python script to test: |
|
A Raspberry Pi 5B-8GB single board computer with, via I2C, connected: |
|
a) an Adafruit Mini I2C Gamepad with seesaw - STEMMA QT / Qwiic (https://www.adafruit.com/product/5743); |
|
b) an Adafruit AHT20 Temperature & Humidity Sensor Breakout Board - STEMMA QT / Qwiic (https://www.adafruit.com/product/4566). |
|
Added functionality to save data to two separate log files: |
|
1) gamepadqt.log for some system info and data from the Gamepad QT; |
|
2) aht20.log for some system info and data from the AHT20 sensor. |
|
At intervals the sensor data will be sent to a Google Apps Scrips script, which in turn will feed the received data |
|
to a Google Sheets spreadsheet. Times used are in UTC. |
|
This script has been successfully tested on a Raspberry Pi 5B-8GB |
|
within a virtual enviroment (env) from within an own project directory (/home/<user>/projects/rpi5_tmp_hum/). |
|
This script needs the Adafruit-Blinka repo to be installed on to Raspberry Pi 5B-8GB. |
|
It also uses the Adafruit-Python-extended-Bus repo to make I2C communication possible. |
|
The following lines need to be present in the file: /boot/firmware/config.txt: |
|
dtparam=i2c_arm=on |
|
dtoverlay=i2c-gpio,bus=3,i2c_gpio_delay_us=1,i2c_gpio_sda=17,i2c_gpio_scl=27 |
|
Note: the spreadsheet I have in my Google Drive, folder <G_drive>/data_fm_Python/DataToGoogle_RASPI5. |
|
Want more? See my repos on Github.com/PaulskPt and my gists on Github.com/Gist/PaulskPt. |
|
""" |
|
import time as _time |
|
from datetime import datetime, timedelta, timezone |
|
import os |
|
import sys |
|
import gc |
|
import board |
|
from micropython import const |
|
from lib.adafruit_ahtx0 import AHTx0 |
|
from adafruit_seesaw.seesaw import Seesaw |
|
from adafruit_extended_bus import ExtendedI2C as I2C |
|
import microcontroller |
|
import logging as logging # see: https://docs.python.org/3/howto/logging-cookbook.html#logging-cookbook |
|
import tomllib # see: https://docs.python.org/3/library/tomllib.html |
|
from urllib.request import urlopen |
|
import urllib.error |
|
from google_auth_oauthlib.flow import InstalledAppFlow |
|
|
|
with open("settings.toml", "rb") as f: |
|
data = tomllib.load(f) |
|
if f: |
|
f.close() |
|
del f |
|
|
|
class State: |
|
def __init__(self, saved_state_json=None): |
|
self.board_id = None |
|
self.USE_TAG = None |
|
self.my_debug = False |
|
self.auth_flow_set = False |
|
self.GOOGLE_APPS_SCRIPTS_REDIRECT_URI = None |
|
self.flow = None |
|
self.seesaw = None |
|
self.tempSensor = None |
|
self.tmp_cl = None |
|
self.hum_cl = None |
|
self.msg1_shown = False |
|
self.last_x = 0 |
|
self.last_y = 0 |
|
self.qt_btns_present = False |
|
self.aht20_present = False |
|
self.AHT20_LOGFILE = "" |
|
self.GAMEPAD_LOGFILE = "" |
|
self.AHT20_LOGGER_NAME = "" |
|
self.GAMEPAD_LOGGER_NAME = "" |
|
self.LOG_QT = None |
|
self.LOG_AHT = None |
|
self.PUSHING_BOX_DEVID = "" |
|
self.stat_result = "" |
|
self.msg_sent = False |
|
self.msg_nr_sent = 0 |
|
self.get_INT_RTC = True |
|
self.SYS_RTC_is_set = False |
|
self.curr_timestamp = None |
|
self.curr_tm = None # struct_time |
|
self.dst_offset = 0 # PT wintertime |
|
self.INTERVAL_SECONDS = 3600 # 10 Minutes |
|
self.RTCtpl = None |
|
self.RTCtpl_DOW = DOW = \ |
|
{ |
|
0: "Monday", |
|
1: "Tuesday", |
|
2: "Wednesday", |
|
3: "Thursday", |
|
4: "Friday", |
|
5: "Saturday", |
|
6: "Sunday" |
|
} |
|
self.RTCtpl_MONTH = \ |
|
{ |
|
0: "Dummy", |
|
1: "January", |
|
2: "February", |
|
3: "March", |
|
4: "April", |
|
5: "May", |
|
6: "June", |
|
7: "July", |
|
8: "August", |
|
9: "September", |
|
10: "October", |
|
11: "November", |
|
12: "December" |
|
} |
|
|
|
self.BUTTON_X = const(6) |
|
self.BUTTON_Y = const(2) |
|
self.BUTTON_A = const(5) |
|
self.BUTTON_B = const(1) |
|
self.BUTTON_SELECT = const(0) |
|
self.BUTTON_START = const(16) |
|
self.button_mask = const( |
|
(1 << self.BUTTON_X) |
|
| (1 << self.BUTTON_Y) |
|
| (1 << self.BUTTON_A) |
|
| (1 << self.BUTTON_B) |
|
| (1 << self.BUTTON_SELECT) |
|
| (1 << self.BUTTON_START) |
|
) |
|
|
|
state = State() |
|
|
|
state.my_debug = True if int(data["MY_DEBUG"]) else False |
|
state.USE_TAG = True if int(data["USE_TAG"]) else False |
|
state.LOG_QT = True if int(data["LOG_QT"]) else False |
|
state.LOG_AHT = True if int(data["LOG_AHT"]) else False |
|
state.INTERVAL_SECONDS = int(data["INTERVAL_SECONDS"]) |
|
state.PUSHING_BOX_DEVID = data["PUSHING_BOX_DEVID"] |
|
state.AHT20_LOGFILE = data["AHT20_LOGFILE"] |
|
state.GAMEPAD_LOGFILE = data["GAMEPAD_LOGFILE"] |
|
state.AHT20_LOGGER_NAME = data["AHT20_LOGGER_NAME"] |
|
state.GAMEPAD_LOGGER_NAME = data["GAMEPAD_LOGGER_NAME"] |
|
state.GOOGLE_APPS_SCRIPTS_REDIRECT_URI = data['GOOGLE_APPS_SCRIPTS_REDIRECT_URI'] |
|
|
|
del data |
|
|
|
logfile_aht = state.AHT20_LOGFILE |
|
logfile_qt = state.GAMEPAD_LOGFILE |
|
logging_level = logging.DEBUG |
|
|
|
logger_aht = logging.getLogger(state.AHT20_LOGGER_NAME) # .addHandler(logging.NullHandler()) |
|
logger_qt = logging.getLogger(state.GAMEPAD_LOGGER_NAME) # .addHandler(logging.NullHandler()) |
|
logger_aht.setLevel(logging_level) |
|
logger_qt.setLevel(logging_level) |
|
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
|
|
formatter = logging.Formatter(format, datefmt="%Y-%m-%d %H:%M:%S") |
|
|
|
file_handler_aht = logging.FileHandler(logfile_aht) |
|
file_handler_aht.setFormatter(formatter) |
|
file_handler_qt = logging.FileHandler(logfile_qt) |
|
file_handler_qt.setFormatter(formatter) |
|
|
|
#console = logging.StreamHandler() |
|
#console.setFormatter(formatter) |
|
|
|
if state.LOG_QT: |
|
logger_qt.addHandler(file_handler_qt) |
|
else: |
|
logger_qt.addHandler(logging.NullHandler()) # Don't log |
|
|
|
#logger_qt.addHandler(console) |
|
if state.LOG_AHT: |
|
logger_aht.addHandler(file_handler_aht) |
|
else: |
|
logger_aht.addHandler(logging.NullHandler()) # Don't log |
|
#logger_aht.addHandler(console) |
|
|
|
del logfile_qt |
|
del logfile_aht |
|
del logging_level |
|
del format |
|
del formatter |
|
del file_handler_qt |
|
del file_handler_aht |
|
|
|
|
|
class Date: |
|
def __init__(self): |
|
self.d1 = 0 # d |
|
self.m1 = 0 # m |
|
self.y1 = 0 # y |
|
self.d2 = 0 # d |
|
self.m2 = 0 # m |
|
self.y2 = 0 # y |
|
self.monthDays = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] |
|
# To store number of days in all months from |
|
# January to Dec. |
|
|
|
# This function counts number of leap years |
|
# before the given date |
|
# d = (dd, mo, yy) |
|
def countLeapYears(self, d): |
|
|
|
years = d[2] |
|
|
|
# Check if the current year needs to be considered |
|
# for the count of leap years or not |
|
if (d[1] <= 2): |
|
years -= 1 |
|
|
|
# An year is a leap year if it is a multiple of 4, |
|
# multiple of 400 and not a multiple of 100. |
|
return int(years / 4) - int(years / 100) + int(years / 400) |
|
|
|
|
|
# This function returns number of days between two |
|
# given dates |
|
def getDifference(self, dt1, dt2): |
|
|
|
# COUNT TOTAL NUMBER OF DAYS BEFORE FIRST DATE 'dt1' |
|
|
|
self.d1 = dt1[0] |
|
self.m1 = dt1[1] |
|
self.y1 = dt1[2] |
|
|
|
self.d2 = dt2[0] |
|
self.m2 = dt2[1] |
|
self.y2 = dt2[2] |
|
|
|
# initialize count using years and day |
|
n1 = self.y1 * 365 + self.d1 |
|
|
|
# Add days for months in given date |
|
for i in range(0, self.m1 - 1): |
|
n1 += self.monthDays[i] |
|
|
|
# Since every leap year is of 366 days, |
|
# Add a day for every leap year |
|
n1 += self.countLeapYears(dt1) |
|
|
|
# SIMILARLY, COUNT TOTAL NUMBER OF DAYS BEFORE 'dt2' |
|
|
|
n2 = self.y2 * 365 + self.d2 |
|
for i in range(0, self.m2 - 1): |
|
n2 += self.monthDays[i] |
|
n2 += self.countLeapYears(dt2) |
|
|
|
# return difference between two counts |
|
return (n2 - n1) |
|
|
|
|
|
class sensor_tmp: |
|
def __init__(self, tmp, dt): |
|
self._tmp = tmp |
|
self._tmp_old = 0.00 |
|
self._dt = dt |
|
self._dt_old = None |
|
|
|
@property |
|
def tmp(self): |
|
return self._tmp |
|
|
|
@property |
|
def tmp_old(self): |
|
return self._tmp_old |
|
|
|
@tmp.setter |
|
def tmp(self, tmp): |
|
if isinstance(tmp, float): |
|
self._tmp = tmp |
|
elif isinstance(tmp, int): |
|
self._tmp = float(tmp) |
|
|
|
@tmp_old.setter |
|
def tmp_old(self, tmp): |
|
if isinstance(tmp, float): |
|
self._tmp_old = tmp |
|
elif isinstance(tmp, int): |
|
self._tmp_old = float(tmp) |
|
|
|
@property |
|
def last_upd(self): |
|
return self._dt |
|
|
|
@last_upd.setter |
|
def last_upd(self, dt): |
|
if isinstance(dt, str): |
|
self._dt = dt |
|
|
|
@property |
|
def last_upd_old(self): |
|
return self._dt_old |
|
|
|
@last_upd_old.setter |
|
def last_upd_old(self, dt): |
|
if isinstance(dt, str): |
|
self._dt_old = dt |
|
|
|
class sensor_hum: |
|
def __init__(self, hum, dt): |
|
self._hum = hum |
|
self._hum_old = 0.00 |
|
self._dt = dt |
|
|
|
@property |
|
def hum(self): |
|
return self._hum |
|
|
|
@property |
|
def hum_old(self): |
|
return self._hum_old |
|
|
|
@hum.setter |
|
def hum(self, hum): |
|
if isinstance(hum, float): |
|
self._hum = hum |
|
elif isinstance(hum, int): |
|
self._hum = float(hum) |
|
|
|
@hum_old.setter |
|
def hum_old(self, hum): |
|
if isinstance(hum, float): |
|
self._hum_old = hum |
|
elif isinstance(hum, int): |
|
self._hum_old = float(hum) |
|
|
|
@property |
|
def last_upd(self): |
|
return self._dt |
|
|
|
@last_upd.setter |
|
def last_upd(self, dt): |
|
if isinstance(dt, str): |
|
self._dt = dt |
|
|
|
|
|
def get_dt() -> str: |
|
now = datetime.now(timezone.utc) |
|
dt = now.timetuple() |
|
dt1 = "{:s}, {:s} {:02d}, {:02d},".format(state.RTCtpl_DOW[dt.tm_wday], state.RTCtpl_MONTH[dt.tm_mon], dt.tm_mday, dt.tm_year) |
|
if dt.tm_hour < 12: |
|
hh = dt.tm_hour |
|
ampm = "am" |
|
elif dt.tm_hour == 12: |
|
hh = dt.tm_hour |
|
ampm = "pm" |
|
elif dt.tm_hour >= 13: |
|
hh = dt.tm_hour - 12 |
|
ampm = "pm" |
|
|
|
dt2 = "at: {:02d}:{:02d}:{:02d} {:s} UTC".format(hh, dt.tm_min, dt.tm_sec, ampm ) |
|
return dt1+" "+dt2 |
|
|
|
line = "-" * 55 |
|
print(line) |
|
logger_qt.info(line) |
|
logger_aht.info(line) |
|
s = "New run:" |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
s = f"from python script: \"{sys.argv[0]}\"" # __file__ includes the full path to the file |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
s = "Runtime: "+get_dt() |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
print(line) |
|
logger_qt.info(line) |
|
logger_aht.info(line) |
|
del line |
|
|
|
|
|
state.board_id = board.board_id |
|
logger_qt.info(f"board id: {state.board_id}") |
|
logger_aht.info(f"board id: {state.board_id}") |
|
|
|
s= "using Adafruit_Python_extended_Bus. Using I2C bus #3" # github.com/adafruit/Adafruit_Python_extended_Bus |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
del s |
|
|
|
i2c_bus = I2C(3) # I2C bus to be used: /dev/i2c-3 |
|
|
|
if i2c_bus is None: |
|
s = f"Error: type(i2c_bus)= {type(i2c_bus)}. Exiting..." |
|
logger_qt.error(s) |
|
logger_aht.error(s) |
|
sys.exit() |
|
|
|
e = None |
|
|
|
try: |
|
seesaw = Seesaw(i2c_bus, addr=0x50) |
|
seesaw.pin_mode_bulk(state.button_mask, seesaw.INPUT_PULLUP) |
|
if state.my_debug: |
|
print(f"global(): type(seesaw) = {type(seesaw)}") |
|
if seesaw: |
|
state.seesaw = seesaw |
|
state.qt_btns_present = True |
|
del seesaw |
|
except Exception as e: |
|
logger_qt.error(f"global(): Error while creating an instance seesaw class: {e}") |
|
state.qt_btns_present = False |
|
pass |
|
|
|
try: |
|
# tempSensor = PiicoDev_TMP117() # initialise the sensor |
|
tempSensor = AHTx0(i2c_bus) # initialise the sensor |
|
#logger_aht.info(f"type(tempSensor)= {type(tempSensor)}") |
|
if tempSensor: |
|
state.tempSensor = tempSensor |
|
state.aht20_present = True |
|
del tempSensor |
|
except Exception as e: |
|
logger_aht.error(f"global(): Error while creating an instance ath20 sensor class: {e}") |
|
state.aht20_present = False |
|
pass |
|
|
|
del e |
|
del i2c_bus |
|
|
|
s = "Gamepad QT is {} present".format("" if state.qt_btns_present else " not") |
|
logger_qt.info(s) |
|
s = "AHT20 sensor is {} present".format("" if state.aht20_present else " not") |
|
logger_aht.info(s) |
|
del s |
|
|
|
gc.collect() |
|
|
|
def test_msg(state: object): |
|
TAG = "gamepad_test(): " |
|
s1 = s2 = "" |
|
if state.qt_btns_present: |
|
s1 = "the Gamepad QT" |
|
if state.aht20_present: |
|
s2 = "and the AHT20 sensor" |
|
|
|
if not state.msg1_shown: |
|
if not state.qt_btns_present and not state.aht20_present: |
|
s = TAG+f"neither of the Gamepad QT or the AHT20 sensor is present. Check wiring. Exiting..." |
|
print(s) |
|
logger_qt.error(s) |
|
logger_aht.error(s) |
|
# _time.sleep(3) |
|
sys.exit() |
|
elif state.qt_btns_present or state.aht20_present: |
|
state.msg1_shown = True |
|
print(TAG+f"We\'re going to test {s1} {s2} with this {state.board_id}.") |
|
if state.qt_btns_present: |
|
print("\t\tPress any of the buttons (X, Y, A, B, Select or Start) on the Gamepad QT.\n\t\t" + \ |
|
f"To reboot {state.board_id} press Gamepad QT button Start.\n") |
|
|
|
|
|
def ck_usr_answr() -> bool: |
|
ret = False |
|
ays = "Are you sure? (Y/n)+<Enter>: " |
|
answer = "" |
|
while True: |
|
logger_qt.warning(ays) |
|
logger_aht.warning(ays) |
|
answer = input(ays) |
|
s = f"You answered: \'{answer}\'" |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
if answer.upper() == "Y": |
|
ret = True |
|
break |
|
elif answer.upper() == "N": |
|
s = "not rebooting" |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
break |
|
return ret |
|
|
|
|
|
def reboot(): |
|
s = "\nRebooting..." |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
_time.sleep(3) |
|
os.system("sudo reboot") # for Raspberry Pi boards |
|
#microcontroller.reset() # for CircuitPython boards |
|
|
|
|
|
def pr_btn_name(res: int): |
|
btns = ["X", "Y", "A", "B", "Select", "Start"] |
|
if res >= 0 and res < len(btns): |
|
#blink_led(state) |
|
s = "Button "+btns[res]+" pressed" |
|
print(s) |
|
logger_qt.info(s) |
|
|
|
|
|
# Check for button presses on the Gamepad QT |
|
def ck_qt_btns(state: object): |
|
TAG = "ck_qt_btns(): " |
|
|
|
if not state.qt_btns_present: |
|
print(TAG+f"state.qt_btns_present= {state.qt_btns_present}") |
|
return |
|
|
|
nr_btns = 6 |
|
res_x = res_y = res_a = res_b = res_sel = res_sta = -1 |
|
elapsed_t = None |
|
interval_t = 36 |
|
interval_cnt = 0 |
|
gc.collect() |
|
_time.sleep(0.2) |
|
x = 0 |
|
y = 0 |
|
|
|
try: |
|
# get the joystick x and y axis value |
|
x = 1023 - state.seesaw.analog_read(14) |
|
y = 1023 - state.seesaw.analog_read(15) |
|
except Exception as e: |
|
if e.errno == 121: # Remote I/O Error |
|
logger_qt.error(f"Error: {e}") |
|
pass |
|
if x >= state.last_x: |
|
diff_x = abs(x - state.last_x) |
|
else: |
|
diff_x = abs(state.last_x - x) |
|
|
|
if y >= state.last_y: |
|
diff_y = abs(y - state.last_y) |
|
else: |
|
diff_y = abs(state.last_y - y) |
|
|
|
if (diff_x > 3) or (diff_y > 3): |
|
s = f"joystick: (x, y)= ({x}, {y})" |
|
print(s) |
|
logger_qt.info(s) |
|
# print(TAG+f"diff_x= {diff_x}, diff_y= {diff_y}") |
|
state.last_x = x |
|
state.last_y = y |
|
|
|
# Get the button presses, if any... |
|
buttons = state.seesaw.digital_read_bulk(state.button_mask) |
|
if state.my_debug: |
|
s = "\n"+TAG+f"buttons = {buttons}" |
|
print(s) |
|
logger_qt.info(s) |
|
if buttons == 65639: |
|
if state.my_debug: |
|
logger_qt.info(TAG+f"Gamepad QT: no button pressed") |
|
return |
|
# _time.sleep(0.5) |
|
start_t = _time.monotonic() |
|
|
|
if buttons: |
|
res = -1 |
|
for _ in range(nr_btns): |
|
if _ == 0: |
|
bz = 1 << state.BUTTON_X |
|
if not buttons & (bz): |
|
res = _ |
|
if res_x != res: |
|
pr_btn_name(res) |
|
res_x = res |
|
break |
|
if _ == 1: |
|
bz = 1 << state.BUTTON_Y |
|
if not buttons & (bz): |
|
res = _ |
|
if res_y != res: |
|
pr_btn_name(res) |
|
res_y = res |
|
break |
|
if _ == 2: |
|
bz = 1 << state.BUTTON_A |
|
if not buttons & (bz): |
|
res = _ |
|
if res_a != res: |
|
pr_btn_name(res) |
|
res_a = res |
|
break |
|
if _ == 3: |
|
bz = 1 << state.BUTTON_B |
|
if not buttons & (bz): |
|
res = _ |
|
if res_b != res: |
|
pr_btn_name(res) |
|
res_b = res |
|
break |
|
if _ == 4: |
|
bz = 1 << state.BUTTON_SELECT |
|
if not buttons & (bz): |
|
res = _ |
|
if res_sel != res: |
|
pr_btn_name(res) |
|
res_sel = res |
|
break |
|
if _ == 5: |
|
bz = 1 << state.BUTTON_START |
|
if not buttons & (bz): |
|
res = _ |
|
if res_sta != res: |
|
pr_btn_name(res) |
|
res_sta = res |
|
s = f"About to reboot the {state.board_id}" |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
if ck_usr_answr(): |
|
reboot() # Reboot the board |
|
else: |
|
state.msg1_shown = False |
|
res_sta = -2 |
|
test_msg(state) |
|
break |
|
|
|
curr_t = _time.monotonic() |
|
elapsed_t = (curr_t - start_t) * 1000 |
|
if elapsed_t >= interval_t: |
|
interval_cnt += 1 |
|
if interval_cnt >= 100: |
|
interval_cnt = 0 |
|
res_x = res_y = res_a = res_b = res_sel = res_sta = -2 |
|
start_t = curr_t |
|
|
|
_time.sleep(0.01) |
|
|
|
|
|
def ck_sensor(state: object) -> tuple: |
|
TAG = "rd_sensor(): " |
|
|
|
tpl = ("", 0.00, 0.00) # default |
|
|
|
if not state.tempSensor: |
|
try: |
|
tempSensor = AHTx0(i2c_bus) # initialise the sensor |
|
if tempSensor: |
|
state.tempSensor = tempSensor |
|
state.aht20_present = True |
|
del tempSensor |
|
except ValueError as e: |
|
state.aht20_present = False |
|
s = TAG+f"Error: {e}. Check wiring!" |
|
print(s) |
|
logger_aht.error(s) |
|
return tpl |
|
|
|
try: |
|
tmp = state.tempSensor.temperature # Celsius |
|
hum = state.tempSensor.relative_humidity # |
|
dts = get_dt_for_google(state) # get datetime from builtin rtc |
|
|
|
state.tmp_cl.tmp = tmp |
|
state.tmp_cl.last_upd = dts |
|
|
|
if tmp != state.tmp_cl.tmp_old: |
|
state.tmp_cl.tmp_old = tmp |
|
|
|
state.hum_cl._hum = hum |
|
state.hum_cl.last_upd = dts |
|
|
|
if hum != state.hum_cl.hum_old: |
|
state.hum_cl.hum_old = hum |
|
|
|
tpl = (dts, tmp, hum) |
|
except ValueError as e: |
|
s = TAG+f"Error: {e}. Check wiring!" |
|
print(s) |
|
logger_aht.error(s) |
|
pass |
|
return tpl |
|
|
|
def get_temphum(state: object): |
|
TAG = "get_temphum(): " |
|
|
|
if not state.aht20_present: |
|
return |
|
|
|
tpl = ck_sensor(state) |
|
|
|
if isinstance(tpl, tuple): |
|
# Convert temperature and humidity values into a string and print the data |
|
lst = [ |
|
"last update: {:s}".format(tpl[0]), |
|
"temperature: {:5.2f}°C".format(tpl[1]), |
|
"humidity: {:5.2f} %".format(tpl[2]) |
|
] |
|
print() |
|
for _ in range(len(tpl)): |
|
s = TAG+f"{lst[_]}" # was: state.tmp_cl.last_upd |
|
print(s) |
|
logger_aht.info(s) |
|
|
|
|
|
def get_rnd_timestamp() -> int: |
|
return round(datetime.timestamp(datetime.now(timezone.utc))) |
|
|
|
|
|
# return a rounded timestamp in the future |
|
def calc_future_timestamp(n: datetime, nr_secs: int) -> int: |
|
n2 = n + timedelta(seconds=nr_secs) |
|
return round(n2.timestamp()) |
|
|
|
|
|
def get_dt() -> str: |
|
now_utc = datetime.now(timezone.utc) # get UTC time |
|
dt = now_utc.timetuple() |
|
dt1 = "{:d}/{:02d}/{:02d}".format(dt.tm_mon, dt.tm_mday, dt.tm_year) |
|
dt2 = "{:02d}:{:02d}:{:02d} weekday: {:s}".format(dt.tm_hour, dt.tm_min, dt.tm_sec, state.RTCtpl_DOW[dt.tm_wday]) |
|
return dt1+" "+dt2 |
|
|
|
|
|
def get_dt_for_google(state: object) -> str: |
|
TAG = "get_dt_for_google(): " |
|
now_utc = datetime.now(timezone.utc) # get UTC time |
|
dt = now_utc.timetuple() |
|
dts = "{:d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}Z".format(dt.tm_year, dt.tm_mon, dt.tm_mday, dt.tm_hour, dt.tm_min, dt.tm_sec) |
|
if state.my_debug: |
|
print(TAG+f"dts= {dts}") #, dt.tm_isdst= {dt.tm_isdst}") |
|
return dts |
|
|
|
|
|
def get_INT_RTC(state: object): |
|
if not state.get_INT_RTC: |
|
return |
|
|
|
TAG = "get_INT_RTC(): " |
|
dt = None |
|
now = None |
|
|
|
try: |
|
now_utc = datetime.now(timezone.utc) # get UTC time |
|
now_tpl = now_utc.timetuple() |
|
state.RTCtpl = now_tpl |
|
|
|
except OSError as e: |
|
logger_qt.error(TAG+f"Error: {e}") |
|
raise |
|
except Exception as e: |
|
raise |
|
|
|
if state.RTCtpl is not None: |
|
if state.my_debug: |
|
logger_aht.info(f"type(state.RTCtpl) = {type(state.RTCtpl)}") |
|
|
|
if state.my_debug: |
|
logger_qt.info(TAG+f"state.RTCtpl: {state.RTCtpl}") |
|
logger_aht.info(TAG+f"state.RTCtpl: {state.RTCtpl}") |
|
state.SYS_RTC_is_set = True |
|
|
|
dt = state.RTCtpl |
|
# print(TAG+f"dt= {dt}") |
|
|
|
dt1 =TAG+"{:d}/{:02d}/{:02d}".format(dt.tm_mon, dt.tm_mday, dt.tm_year) |
|
dt2 = TAG+"{:02d}:{:02d}:{:02d} UTC weekday: {:s}".format(dt.tm_hour, dt.tm_min, dt.tm_sec, state.RTCtpl_DOW[dt.tm_wday]) |
|
logger_qt.info(dt1) |
|
logger_aht.info(dt1) |
|
logger_qt.info(dt2) |
|
logger_aht.info(dt2) |
|
|
|
|
|
def get_th_direct(state: object): |
|
# datetime e.g.: 2022-08-10T16:40:16Z |
|
TAG = "get_th_direct(): " |
|
|
|
if not state.aht20_present: |
|
return |
|
|
|
if state.my_debug: |
|
tpl = ck_sensor(state) |
|
if isinstance(tpl, tuple): |
|
lst = ["updated at", "Temperature:", "Humidity"] |
|
for _ in range(len(tpl)): |
|
s = TAG+f"{lst[_]} {tpl[_]}" # was: state.tmp_cl.last_upd |
|
print(s) |
|
logger_aht.info(s) |
|
|
|
def pr_th_msg(state: object) -> bool: |
|
TAG = "pr_th_msg(): " |
|
if state.stat_result is None: |
|
state.stat_result = "" |
|
|
|
ret = True |
|
upd_dt = state.tmp_cl.last_upd # [:10] # e.g.: 2022-08-10T16:40:16Z |
|
tmp_s = str(round(state.tmp_cl.tmp, 4)) |
|
hum_s = str(round(state.hum_cl.hum, 4)) |
|
text_items = ["response to send: "+state.stat_result, "-" * 23, upd_dt, "Temperature: "+tmp_s+" C", "Humidity: "+hum_s+" %", "-" * 23] |
|
|
|
for _ in range(len(text_items)): |
|
s = text_items[_] |
|
print(s) |
|
logger_aht.info(s) |
|
print() |
|
#logger_aht.info("-" * 23) |
|
|
|
return ret |
|
|
|
def flow_create(stat) -> bool: |
|
TAG = "flow_create(): " |
|
ret = False |
|
creds_file = 'client_secrets.json' |
|
if not stat.auth_flow_set: |
|
# Create and update Google Apps Script projects |
|
# See, edit, create, and delete all your Google Sheets spreadsheets |
|
# https://developers.google.com/identity/protocols/oauth2/scopes |
|
try: |
|
s = TAG+f"using credential file: {creds_file}" |
|
logger_aht.info(s) |
|
if not state.my_debug: |
|
print(s) |
|
flow = InstalledAppFlow.from_client_secrets_file( |
|
creds_file, |
|
scopes=['scripts', 'spreadsheets'] # was: ['https://www.googleapis.com/auth/scripts.scripts','https://www.googleapis.com/auth/spreadsheets']) |
|
) |
|
if flow: |
|
# Next line in example was:'https://www.example.com/oauth2callback' |
|
flow.redirect_uri = state.GOOGLE_APPS_SCRIPTS_REDIRECT_URI |
|
state.flow = flow |
|
s0 = "google flow object created" |
|
if state.my_debug: |
|
s = TAG+s0+f". Flow = {flow}. type(flow) = {type(flow)}" |
|
else: |
|
s = TAG+s0 |
|
print(s) |
|
logger_aht.info(s) |
|
state.auth_flow_set = True |
|
ret = True |
|
else: |
|
ret = False |
|
except Exception as e: |
|
s = f"start_flow(): {e}" |
|
print(s) |
|
logger_aht.error(s) |
|
raise |
|
else: |
|
ret = True |
|
|
|
return ret |
|
|
|
def send_to_google_apps_scripts(state: object) -> bool: |
|
|
|
TAG = "send_to_google_apps_scripts(): " |
|
|
|
if not flow_create(state): |
|
s = TAG+" unable to create a google flow object. Exiting function." |
|
print(s) |
|
logger_aht.error(s) |
|
return False |
|
|
|
ret = True |
|
tmp_s = None |
|
hum_s = None |
|
response = None |
|
state.stat_result = "" |
|
|
|
upd_dt = state.tmp_cl.last_upd # [:10] # e.g.: 2022-08-10T16:40:16Z |
|
upd_dt_old = state.tmp_cl.last_upd_old |
|
|
|
# -------------------------------------------------------------- |
|
tmp = round(state.tmp_cl.tmp, 4) |
|
tmp_s = str(tmp) |
|
tmp_old = state.tmp_cl.tmp_old |
|
|
|
if state.my_debug: |
|
print(TAG+f"tmp_s= \'{tmp_s}\', tmp_old= {tmp_old}") |
|
# -------------------------------------------------------------- |
|
hum = round(state.hum_cl.hum, 4) |
|
hum_s = str(hum) |
|
hum_old = state.hum_cl.hum_old |
|
|
|
if state.my_debug: |
|
print(TAG+f"hum_s= \'{hum_s}\', hum_old= {hum_old}") |
|
# -------------------------------------------------------------- |
|
|
|
# le_old = len(upd_dt_old) |
|
# if (le_old > 0) and (tmp == tmp_old) and (hum == hum_old): # and upd_dt == upd_dt_old |
|
if (upd_dt_old == upd_dt) and (tmp == tmp_old) and (hum == hum_old): # and upd_dt == upd_dt_old |
|
print(TAG+"datetime stamp unchanged. Waiting for new sensor data...") |
|
return ret # We don't want to send duplicates |
|
else: |
|
upd_dt_old = upd_dt # |
|
state.tmp_cl.last_upd_old = upd_dt |
|
#upd_tm = t_dict["updated_at"][11:19] |
|
|
|
state.msg_nr_sent += 1 # increase the messages sent count |
|
# dteData = upd_dt # Added 2024-03-22 because this variable was missing. |
|
# ?date=$date$&time=$time$&temp=$temp$&hum=$hum$ |
|
# 2024-04-15 added board_id: &id=$id$ |
|
# ?date=$date$&time=$time$&temp=$temp$&hum=$hum$&id=$id$ |
|
|
|
s = state.GOOGLE_APPS_SCRIPTS_REDIRECT_URI+"?GET=" # New scipt DataToGoogle_RASPI5 deployment nr 1 |
|
s += "&date=\"" + upd_dt + "\"" |
|
s += "&temp=" + tmp_s |
|
s += "&hum=" + hum_s |
|
s += "&id=\"" + state.board_id + "\"" |
|
|
|
|
|
if state.my_debug: |
|
print(TAG+upd_dt, end='') |
|
print(". Sending Data message nr: ", end='') |
|
print(state.msg_nr_sent, end='') |
|
print(" to middle-man server...", end='\n') |
|
print(TAG+"Going to send:", end='') |
|
print("\n\""+s, end='') |
|
print("\"", end='\n') # To complete the message to the Serial (REPL) window |
|
|
|
try: |
|
webUrl=urllib.request.urlopen(s) |
|
response = str(webUrl.getcode()) |
|
except OSError as e: |
|
s = TAG+f"Error: {e}" |
|
print(s) |
|
logger_aht.error(s) |
|
if e.args[0] == -2: # gaierror |
|
# See: https://docs.circuitpython.org/_/downloads/en/6.3.x/pdf/, page 216 |
|
return False # do nothing |
|
except RuntimeError as e: # can be activated by adafruit_requests, OutOfRetries error |
|
# or by get_socket. RuntimeError, EHOSTUNREACH errno 118 |
|
s = TAG+f"Error: {e}" |
|
print(s) |
|
logger_aht.error(s) |
|
return False # do nothing |
|
|
|
if state.my_debug: |
|
s = TAG+"Data Sent" |
|
logger_aht.info(s) |
|
print(s) |
|
print() |
|
s = TAG+f"response= {response}, len(response)= {len(response)}, type(response)= {type(response)}" |
|
print(s) |
|
logger_aht.info(s) |
|
if response: |
|
le = len(response) |
|
if le == 3: |
|
status = int(response) |
|
print() |
|
print(TAG,end='') |
|
if status == 404: |
|
s = "error 404: file not found on this server." |
|
print(s) |
|
logger_aht.warning(s) |
|
elif status != 200 and status != 404: |
|
s = f"response= {response}" |
|
print(s) |
|
logger_aht.info(s) |
|
else: |
|
status = -1 |
|
|
|
state.stat_result = "OK" if status == 200 else "NO" |
|
s1 = "response=" |
|
s2 = "{} (= {})".format(status, state.stat_result) |
|
if state.my_debug: |
|
print() |
|
s = TAG+f"{s1} {s2}" |
|
print(s) |
|
logger_aht.info(s) |
|
#blink_leds(RED) |
|
|
|
s =TAG+f"send result: {status}" |
|
if status == 200: |
|
logger_aht.info(s) |
|
#blink_leds(GRN) |
|
else: |
|
logger_aht.warning(s) |
|
#pr_msg(["send result = "+s2]) |
|
response = None |
|
if state.my_debug: |
|
print(TAG+f"upd_dt= {upd_dt}, tmp_s= {tmp_s}, hum_s= {hum_s}") |
|
pr_th_msg(state) |
|
return ret |
|
|
|
def pr_msg(msg): |
|
text_items = [] |
|
le_max = 5 |
|
if isinstance(msg, list): |
|
le_msg = len(msg) |
|
if le_msg > 0 and le_msg <= le_max: |
|
for _ in range(len(msg)): |
|
text_items.append(msg[_]) |
|
le = len(text_items) |
|
if le_max - le > 0: # fill up with blank lines |
|
for _ in range(le_max - le): |
|
text_items.append(" ") |
|
for i in range(len(text_items)): |
|
s = text_items[i] # Display the dt, tm and tz |
|
print(s) |
|
logger_aht.info(s) |
|
|
|
def setup(state: object): |
|
TAG = "setup(): " |
|
|
|
if state.get_INT_RTC : |
|
if state.my_debug: |
|
logger_aht.info(TAG+"Going to get datetime from internal (SYS) RTC") |
|
get_INT_RTC(state) |
|
|
|
# Create instances of the sensor_tmp and sensor_hum classes |
|
state.tmp_cl = sensor_tmp(0.00, "") # create class instance with default values |
|
state.hum_cl = sensor_hum(0.00, "") # same |
|
|
|
gc.collect() |
|
|
|
|
|
def main(): |
|
TAG= "main(): " |
|
loopnr = 0 |
|
setup(state) |
|
# state.curr_tm = _time.time() # set curr_tm for the first time |
|
state.curr_timestamp = get_rnd_timestamp() |
|
start = True |
|
test_msg(state) |
|
|
|
s = "\n\nAdafruit Gamepad QT test:\n" |
|
logger_qt.info(s) |
|
s = "\n\nAdafruit AHT20 test:\n" |
|
logger_aht.info(s) |
|
s = f"DataToGoogle interval: {state.INTERVAL_SECONDS} seconds" |
|
print(s) |
|
logger_aht.info(s) |
|
|
|
if state.aht20_present: |
|
logger_aht.info('logged values of temperature and humidity from Adafruit AHT20 sensor:') |
|
|
|
if state.INTERVAL_SECONDS is None: |
|
state.INTERVAL_SECONDS = 3600 # Set for 1 hour |
|
elif state.INTERVAL_SECONDS == 0: |
|
state.INTERVAL_SECONDS = 3600 # Same |
|
|
|
interval = state.INTERVAL_SECONDS |
|
|
|
togo_send_secs_old = 0.00 |
|
togo_temphum_secs_old = 0.00 |
|
do_temphum = False |
|
do_send = False |
|
interval_short = 120 # default: 3600 // 6 = 600 seconds = 10 minutes. See setup(). |
|
interval_long = interval # interval # default: 3600 seconds = 1 hour |
|
future_send_timestamp = calc_future_timestamp(datetime.now(timezone.utc), interval_long) |
|
future_temphum_timestamp = calc_future_timestamp(datetime.now(timezone.utc), interval_short) |
|
while True: |
|
try: |
|
loopnr += 1 |
|
# print(f"\nLoopnr: {loopnr}") |
|
ck_qt_btns(state) |
|
# _time.sleep(0.5) |
|
|
|
if state.SYS_RTC_is_set: |
|
curr_timestamp = get_rnd_timestamp() |
|
do_send = True if curr_timestamp >= future_send_timestamp else False |
|
do_temphum = True if curr_timestamp >= future_temphum_timestamp else False |
|
togo_temphum_secs = future_temphum_timestamp - curr_timestamp |
|
togo_send_secs = future_send_timestamp - curr_timestamp |
|
|
|
if togo_temphum_secs_old != togo_temphum_secs: |
|
togo_temphum_secs_old = togo_temphum_secs |
|
if togo_temphum_secs % 10 == 0: |
|
print() |
|
print(TAG+"next get_temphum() in: {:4d} seconds".format(togo_temphum_secs)) |
|
|
|
if togo_send_secs_old != togo_send_secs: |
|
togo_send_secs_old = togo_send_secs |
|
if togo_send_secs % 10 == 0: |
|
print(TAG+"next SendToGoogle in: {:4d} seconds".format(togo_send_secs)) |
|
|
|
if start or do_temphum: |
|
if state.msg_sent: |
|
state.msg_sent = False |
|
get_temphum(state) |
|
future_temphum_timestamp = calc_future_timestamp(datetime.now(timezone.utc), interval_short) # in practice we will use var 'interval' |
|
|
|
if (start or (do_send) or (not state.tempSensor)) and (not state.msg_sent): |
|
start = False |
|
#blink_leds() # switch off LEDs |
|
get_th_direct(state) |
|
gc.collect() |
|
if not send_to_google_apps_scripts(state): |
|
s = TAG+f"failed to send data to Google Apps Scripts script!" |
|
print(s) |
|
logger_aht.error(s) |
|
else: |
|
state.msg_sent = True |
|
future_send_timestamp = calc_future_timestamp(datetime.now(timezone.utc), interval_long) |
|
# if c2 == 30: |
|
# sys.exit() # Temporary forced end of execution |
|
|
|
|
|
#_time.sleep(1) |
|
|
|
except KeyboardInterrupt: |
|
s = TAG+"KeyboardInterrrupt. Exiting..." |
|
print(s) |
|
logger_qt.info(s) |
|
logger_aht.info(s) |
|
break |
|
|
|
if loopnr >= 1000: |
|
loopnr = 0 |
|
# break |
|
|
|
sys.exit() |
|
|
|
if __name__ == '__main__': |
|
main() |
Example of the results in the Google Sheets sheet
Formula to represent as concatenated texts of average temperature and average humidity over a range of rows:
=CONCATENATE("Avg Temp: ";TEXT(ROUND(SUM(J2:J63)/ROWS(J2:J63);2);"00.00 ºC");", Avg Hum: ";TEXT(ROUND((SUM(K2:K63)/ROWS(K2:K63));2);"00.00 \%"))