Skip to content

Instantly share code, notes, and snippets.

@aallan
Last active September 17, 2024 08:17
Show Gist options
  • Save aallan/510847c5ca0d74843d0fb8818a06df0c to your computer and use it in GitHub Desktop.
Save aallan/510847c5ca0d74843d0fb8818a06df0c to your computer and use it in GitHub Desktop.
Vindriktning webserver for Raspberry Pi Pico W written in MicroPython
from micropython import const
from ustruct import unpack as unp
# Author David Stenwall (david at stenwall.io)
# See https://github.com/dafvid/micropython-bmp280
# SPDX-License-Identifier: MIT
# Power Modes
BMP280_POWER_SLEEP = const(0)
BMP280_POWER_FORCED = const(1)
BMP280_POWER_NORMAL = const(3)
BMP280_SPI3W_ON = const(1)
BMP280_SPI3W_OFF = const(0)
BMP280_TEMP_OS_SKIP = const(0)
BMP280_TEMP_OS_1 = const(1)
BMP280_TEMP_OS_2 = const(2)
BMP280_TEMP_OS_4 = const(3)
BMP280_TEMP_OS_8 = const(4)
BMP280_TEMP_OS_16 = const(5)
BMP280_PRES_OS_SKIP = const(0)
BMP280_PRES_OS_1 = const(1)
BMP280_PRES_OS_2 = const(2)
BMP280_PRES_OS_4 = const(3)
BMP280_PRES_OS_8 = const(4)
BMP280_PRES_OS_16 = const(5)
# Standby settings in ms
BMP280_STANDBY_0_5 = const(0)
BMP280_STANDBY_62_5 = const(1)
BMP280_STANDBY_125 = const(2)
BMP280_STANDBY_250 = const(3)
BMP280_STANDBY_500 = const(4)
BMP280_STANDBY_1000 = const(5)
BMP280_STANDBY_2000 = const(6)
BMP280_STANDBY_4000 = const(7)
# IIR Filter setting
BMP280_IIR_FILTER_OFF = const(0)
BMP280_IIR_FILTER_2 = const(1)
BMP280_IIR_FILTER_4 = const(2)
BMP280_IIR_FILTER_8 = const(3)
BMP280_IIR_FILTER_16 = const(4)
# Oversampling setting
BMP280_OS_ULTRALOW = const(0)
BMP280_OS_LOW = const(1)
BMP280_OS_STANDARD = const(2)
BMP280_OS_HIGH = const(3)
BMP280_OS_ULTRAHIGH = const(4)
# Oversampling matrix
# (PRESS_OS, TEMP_OS, sample time in ms)
_BMP280_OS_MATRIX = [
[BMP280_PRES_OS_1, BMP280_TEMP_OS_1, 7],
[BMP280_PRES_OS_2, BMP280_TEMP_OS_1, 9],
[BMP280_PRES_OS_4, BMP280_TEMP_OS_1, 14],
[BMP280_PRES_OS_8, BMP280_TEMP_OS_1, 23],
[BMP280_PRES_OS_16, BMP280_TEMP_OS_2, 44]
]
# Use cases
BMP280_CASE_HANDHELD_LOW = const(0)
BMP280_CASE_HANDHELD_DYN = const(1)
BMP280_CASE_WEATHER = const(2)
BMP280_CASE_FLOOR = const(3)
BMP280_CASE_DROP = const(4)
BMP280_CASE_INDOOR = const(5)
_BMP280_CASE_MATRIX = [
[BMP280_POWER_NORMAL, BMP280_OS_ULTRAHIGH, BMP280_IIR_FILTER_4, BMP280_STANDBY_62_5],
[BMP280_POWER_NORMAL, BMP280_OS_STANDARD, BMP280_IIR_FILTER_16, BMP280_STANDBY_0_5],
[BMP280_POWER_FORCED, BMP280_OS_ULTRALOW, BMP280_IIR_FILTER_OFF, BMP280_STANDBY_0_5],
[BMP280_POWER_NORMAL, BMP280_OS_STANDARD, BMP280_IIR_FILTER_4, BMP280_STANDBY_125],
[BMP280_POWER_NORMAL, BMP280_OS_LOW, BMP280_IIR_FILTER_OFF, BMP280_STANDBY_0_5],
[BMP280_POWER_NORMAL, BMP280_OS_ULTRAHIGH, BMP280_IIR_FILTER_16, BMP280_STANDBY_0_5]
]
_BMP280_REGISTER_ID = const(0xD0)
_BMP280_REGISTER_RESET = const(0xE0)
_BMP280_REGISTER_STATUS = const(0xF3)
_BMP280_REGISTER_CONTROL = const(0xF4)
_BMP280_REGISTER_CONFIG = const(0xF5) # IIR filter config
_BMP280_REGISTER_DATA = const(0xF7)
class BMP280:
def __init__(self, i2c_bus, addr=0x77, use_case=BMP280_CASE_HANDHELD_DYN):
self._bmp_i2c = i2c_bus
self._i2c_addr = addr
# read calibration data
# < little-endian
# H unsigned short
# h signed short
self._T1 = unp('<H', self._read(0x88, 2))[0]
self._T2 = unp('<h', self._read(0x8A, 2))[0]
self._T3 = unp('<h', self._read(0x8C, 2))[0]
self._P1 = unp('<H', self._read(0x8E, 2))[0]
self._P2 = unp('<h', self._read(0x90, 2))[0]
self._P3 = unp('<h', self._read(0x92, 2))[0]
self._P4 = unp('<h', self._read(0x94, 2))[0]
self._P5 = unp('<h', self._read(0x96, 2))[0]
self._P6 = unp('<h', self._read(0x98, 2))[0]
self._P7 = unp('<h', self._read(0x9A, 2))[0]
self._P8 = unp('<h', self._read(0x9C, 2))[0]
self._P9 = unp('<h', self._read(0x9E, 2))[0]
# output raw
self._t_raw = 0
self._t_fine = 0
self._t = 0
self._p_raw = 0
self._p = 0
self.read_wait_ms = 0 # interval between forced measure and readout
self._new_read_ms = 200 # interval between
self._last_read_ts = 0
if use_case is not None:
self.use_case(use_case)
def _read(self, addr, size=1):
return self._bmp_i2c.readfrom_mem(self._i2c_addr, addr, size)
def _write(self, addr, b_arr):
if not type(b_arr) is bytearray:
b_arr = bytearray([b_arr])
return self._bmp_i2c.writeto_mem(self._i2c_addr, addr, b_arr)
def _gauge(self):
# TODO limit new reads
# read all data at once (as by spec)
d = self._read(_BMP280_REGISTER_DATA, 6)
self._p_raw = (d[0] << 12) + (d[1] << 4) + (d[2] >> 4)
self._t_raw = (d[3] << 12) + (d[4] << 4) + (d[5] >> 4)
self._t_fine = 0
self._t = 0
self._p = 0
def reset(self):
self._write(_BMP280_REGISTER_RESET, 0xB6)
def load_test_calibration(self):
self._T1 = 27504
self._T2 = 26435
self._T3 = -1000
self._P1 = 36477
self._P2 = -10685
self._P3 = 3024
self._P4 = 2855
self._P5 = 140
self._P6 = -7
self._P7 = 15500
self._P8 = -14600
self._P9 = 6000
def load_test_data(self):
self._t_raw = 519888
self._p_raw = 415148
def print_calibration(self):
print("T1: {} {}".format(self._T1, type(self._T1)))
print("T2: {} {}".format(self._T2, type(self._T2)))
print("T3: {} {}".format(self._T3, type(self._T3)))
print("P1: {} {}".format(self._P1, type(self._P1)))
print("P2: {} {}".format(self._P2, type(self._P2)))
print("P3: {} {}".format(self._P3, type(self._P3)))
print("P4: {} {}".format(self._P4, type(self._P4)))
print("P5: {} {}".format(self._P5, type(self._P5)))
print("P6: {} {}".format(self._P6, type(self._P6)))
print("P7: {} {}".format(self._P7, type(self._P7)))
print("P8: {} {}".format(self._P8, type(self._P8)))
print("P9: {} {}".format(self._P9, type(self._P9)))
def _calc_t_fine(self):
# From datasheet page 22
self._gauge()
if self._t_fine == 0:
var1 = (((self._t_raw >> 3) - (self._T1 << 1)) * self._T2) >> 11
var2 = (((((self._t_raw >> 4) - self._T1)
* ((self._t_raw >> 4)
- self._T1)) >> 12)
* self._T3) >> 14
self._t_fine = var1 + var2
@property
def temperature(self):
self._calc_t_fine()
if self._t == 0:
self._t = ((self._t_fine * 5 + 128) >> 8) / 100.
return self._t
@property
def pressure(self):
# From datasheet page 22
self._calc_t_fine()
if self._p == 0:
var1 = self._t_fine - 128000
var2 = var1 * var1 * self._P6
var2 = var2 + ((var1 * self._P5) << 17)
var2 = var2 + (self._P4 << 35)
var1 = ((var1 * var1 * self._P3) >> 8) + ((var1 * self._P2) << 12)
var1 = (((1 << 47) + var1) * self._P1) >> 33
if var1 == 0:
return 0
p = 1048576 - self._p_raw
p = int((((p << 31) - var2) * 3125) / var1)
var1 = (self._P9 * (p >> 13) * (p >> 13)) >> 25
var2 = (self._P8 * p) >> 19
p = ((p + var1 + var2) >> 8) + (self._P7 << 4)
self._p = p / 256.0
return self._p
def _write_bits(self, address, value, length, shift=0):
d = self._read(address)[0]
m = int('1' * length, 2) << shift
d &= ~m
d |= m & value << shift
self._write(address, d)
def _read_bits(self, address, length, shift=0):
d = self._read(address)[0]
return d >> shift & int('1' * length, 2)
@property
def standby(self):
return self._read_bits(_BMP280_REGISTER_CONFIG, 3, 5)
@standby.setter
def standby(self, v):
assert 0 <= v <= 7
self._write_bits(_BMP280_REGISTER_CONFIG, v, 3, 5)
@property
def iir(self):
return self._read_bits(_BMP280_REGISTER_CONFIG, 3, 2)
@iir.setter
def iir(self, v):
assert 0 <= v <= 4
self._write_bits(_BMP280_REGISTER_CONFIG, v, 3, 2)
@property
def spi3w(self):
return self._read_bits(_BMP280_REGISTER_CONFIG, 1)
@spi3w.setter
def spi3w(self, v):
assert v in (0, 1)
self._write_bits(_BMP280_REGISTER_CONFIG, v, 1)
@property
def temp_os(self):
return self._read_bits(_BMP280_REGISTER_CONTROL, 3, 5)
@temp_os.setter
def temp_os(self, v):
assert 0 <= v <= 5
self._write_bits(_BMP280_REGISTER_CONTROL, v, 3, 5)
@property
def press_os(self):
return self._read_bits(_BMP280_REGISTER_CONTROL, 3, 2)
@press_os.setter
def press_os(self, v):
assert 0 <= v <= 5
self._write_bits(_BMP280_REGISTER_CONTROL, v, 3, 2)
@property
def power_mode(self):
return self._read_bits(_BMP280_REGISTER_CONTROL, 2)
@power_mode.setter
def power_mode(self, v):
assert 0 <= v <= 3
self._write_bits(_BMP280_REGISTER_CONTROL, v, 2)
@property
def is_measuring(self):
return bool(self._read_bits(_BMP280_REGISTER_STATUS, 1, 3))
@property
def is_updating(self):
return bool(self._read_bits(_BMP280_REGISTER_STATUS, 1))
@property
def chip_id(self):
return self._read(_BMP280_REGISTER_ID, 1)
@property
def in_normal_mode(self):
return self.power_mode == BMP280_POWER_NORMAL
def force_measure(self):
self.power_mode = BMP280_POWER_FORCED
def normal_measure(self):
self.power_mode = BMP280_POWER_NORMAL
def sleep(self):
self.power_mode = BMP280_POWER_SLEEP
def use_case(self, uc):
assert 0 <= uc <= 5
pm, oss, iir, sb = _BMP280_CASE_MATRIX[uc]
p_os, t_os, self.read_wait_ms = _BMP280_OS_MATRIX[oss]
self._write(_BMP280_REGISTER_CONFIG, (iir << 2) + (sb << 5))
self._write(_BMP280_REGISTER_CONTROL, pm + (p_os << 2) + (t_os << 5))
def oversample(self, oss):
assert 0 <= oss <= 4
p_os, t_os, self.read_wait_ms = _BMP280_OS_MATRIX[oss]
self._write_bits(_BMP280_REGISTER_CONTROL, p_os + (t_os << 3), 2)
# Vindriktning web server for Raspberry Pi Pico W
# Author: Alasdair Allan (https://twitter.com/aallan)
#
# Adapted from: https://learn.adafruit.com/ikea-vindriktning-hack-with-qt-py-esp32-s3-and-adafruit-io/
# Original author: Liz Clark (https://twitter.com/BlitzCityDIY)
#
# Adapted from: https://www.raspberrypi.com/news/how-to-run-a-webserver-on-raspberry-pi-pico-w/
# Original author: Alasdair Allan (https://twitter.com/aallan)
#
# SPDX-License-Identifier: MIT
# PM1006 Air Quality Sensor
# http://www.jdscompany.co.kr/download.asp?gubun=07&filename=PM1006_LED_PARTICLE_SENSOR_MODULE_SPECIFICATIONS.pdf
#
# Signal PM1006 Pico
# +5V +5V VSYS
# GND GND GND
# TX REST GP1 (UART0 RX)
# BMP280 Pressure and Temperature Sensor
# https://www.adafruit.com/product/2651
#
# Signal BMP280 Pico
# +3V3 VIN 3V3(OUT)
# GND GND GND
# SCL SCK GP15 (I2C1 SCL)
# SDA SDI GP14 (I2C1 SDA)
#
# NOTE: BMP280 sensor can be found at I2C address 0x76 or 0x77. Edit line 91 of bmp280.py appropriately.
# Debug Probe
# https://www.raspberrypi.com/documentation/microcontrollers/debug-probe.html
#
# Signal Probe Pico
# GND GND GND
# RX RX GP4 (UART1 TX)
# TX TX GP5 (UART1 RX)
# WARNING: DO NOT POWER THE RASPBERRY PI PICO VIA MICRO USB and VSYS AT THE SAME TIME WITHOUT USING A SCHOTTLY DIODE,
# SEE SECTION 4.5 OF THE RASPBERRY PI PICO DATASHEET https://datasheets.raspberrypi.com/pico/pico-datasheet.pdf
import network
import socket
import time
from machine import Pin
from machine import UART
from machine import I2C
from bmp280 import *
I2C1_SDA = 14
I2C1_SCL = 15
measurements = [0, 0, 0, 0, 0]
measurement_idx = 0
def valid_header(d):
uart1.write("data = " + str(d) + "\n")
headerValid = (d[0] == 0x16 and d[1] == 0x11 and d[2] == 0x0B)
if headerValid:
uart1.write("msg header valid\n")
else:
uart1.write("msg header not valid\n")
uart1.flush()
return headerValid
led = Pin("LED", Pin.OUT)
led.on()
uart0 = UART(0, baudrate=9600, tx=Pin(0), rx=Pin(1))
uart1 = UART(1, baudrate=9600, tx=Pin(4), rx=Pin(5))
i2c1 = I2C(1, scl=Pin(I2C1_SCL), sda=Pin(I2C1_SDA), freq=100000)
uart1.write("I2C scan" + str(I2C.scan(i2c1)) + "\n")
uart1.flush()
bmp = BMP280(i2c1)
bmp.use_case(BMP280_CASE_WEATHER)
bmp.oversample(BMP280_OS_HIGH)
bmp.temp_os = BMP280_TEMP_OS_8
bmp.press_os = BMP280_PRES_OS_4
bmp.standby = BMP280_STANDBY_250
bmp.iir = BMP280_IIR_FILTER_2
bmp.spi3w = BMP280_SPI3W_ON
bmp.power_mode = BMP280_POWER_NORMAL
ssid = 'Babilim'
password = '21cm Line of Hydrogen'
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
json = """{
"sensor":"vindriktning",
"pm25":%s,
"temperature":%s,
"pressure":%s
}
"""
#html = """<!DOCTYPE html>
#<html>
# <head> <title>Vindriktning</title> </head>
# <body> <h1>Vindriktning</h1>
# <p>pm25 = %s</p>
# <p>temp = %s</p>
# <p>pressure = %s</p>
# </body>
#</html>
#"""
max_wait = 10
while max_wait > 0:
if wlan.status() < 0 or wlan.status() >= 3:
break
max_wait -= 1
uart1.write('waiting for connection...\n')
time.sleep(1)
if wlan.status() != 3:
raise RuntimeError('network connection failed\n')
else:
uart1.write('connected\n')
status = wlan.ifconfig()
uart1.write( 'ip = ' + status[0] + "\n" )
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr)
s.listen(1)
uart1.write('listening on %s\n' % str(addr))
led.off()
# Listen for connections
while True:
try:
cl, addr = s.accept()
uart1.write('client connected from %s\n' % str(addr))
request = cl.recv(1024)
#uart1.write(request)
request = str(request)
reading = request.find('/reading')
stateis = "ERROR"
if reading == 6:
uart1.write("want a reading\n")
uart1.flush()
led.on()
v = False
while v is not True:
uart1.write("reading 32 bytes from UART0\n")
uart1.flush()
data = uart0.read(32)
if data is not None:
v = valid_header(data)
uart1.write("v = " + str(v) +"\n")
uart1.flush()
else:
uart1.write("data is NONE\n")
uart1.flush()
uart1.write("have data frame\n")
uart1.write("frame = " + str(data) + "\n")
measurement_idx = 0
start_read = True
while True:
if start_read is True:
uart1.write("reading... " + str(measurement_idx) + "\n")
uart1.flush()
pm25 = (data[5] << 8) | data[6]
uart1.write('pm25 = ' + str(pm25) + "\n")
uart1.flush()
measurements[measurement_idx] = pm25
if measurement_idx == 4:
start_read = False
measurement_idx = (measurement_idx + 1) % 5
uart1.write("measurements = " + str(measurements) + "\n")
else:
break
uart1.write( "temp = " + str(bmp.temperature) + "\n" )
uart1.write( "press = " + str(bmp.pressure) + "\n")
uart1.flush()
stateis = str(measurements)
#response = html % (stateis, str(bmp.temperature), str(bmp.pressure))
response = json % (stateis, str(bmp.temperature), str(bmp.pressure))
#cl.send('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
cl.send('HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n')
cl.send(response)
cl.close()
led.off()
except OSError as e:
cl.close()
uart1.write('connection closed')
uart1.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment