Skip to content

Instantly share code, notes, and snippets.

@4poc
Last active December 1, 2016 10:45
Show Gist options
  • Save 4poc/02a6d791f35d47cbfa91cae0690ea33b to your computer and use it in GitHub Desktop.
Save 4poc/02a6d791f35d47cbfa91cae0690ea33b to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# Wifibot: Reads station dumps from a router and shows new/dropped wifi clients in a matrix channel
# by john & matthias
import re
import sys
import time
import subprocess
import requests # pip install requests
from threading import Thread
from matrix_client.client import MatrixClient # pip install matrix_client
# --- Matrix account and target room ---
MATRIX_HOST = 'https://[host]'
MATRIX_USER = '[user]'
MATRIX_PASS = '[pass]'
ROOM_ID = '![blub]:[host]'

# URLs of the station dumps that are polled.
# configure your router to make that file available:
# */1 * * * * iw dev wlan0 station dump > /www/wlan0_station_dump.txt
STATION_DUMP_URLS = [
    'http://192.168.0.1/wlan0_station_dump.txt',
]

# Friendly names for known stations; stations not listed here are shown
# by the ID found in the dump.
STATION_MAP = {
    '00:11:22:fe:fe:fe': 'user',
}

# Seconds to sleep between two polls.
CHECK_INTERVAL = 1 * 60
# connect to matrix:
# Runs at module level: logs in with the configured credentials and joins
# the configured room. `client` and `room` stay module-level because the
# code further down (listener setup, notifications) refers to them.
client = MatrixClient(MATRIX_HOST)
client.login_with_password(MATRIX_USER, MATRIX_PASS)
room = client.join_room(ROOM_ID)
def get_active_stations(timeout=10):
    """Download every configured station dump and return the station IDs.

    Each URL in STATION_DUMP_URLS is fetched and the combined text is
    scanned for ``Station <id> `` entries, the format produced by
    ``iw dev <interface> station dump``.

    Args:
        timeout: Seconds to wait for each HTTP request. Without a timeout
            ``requests`` waits indefinitely, which would stall polling if
            the router stops answering.

    Returns:
        A list of the captured station IDs in order of appearance. A
        station present in more than one dump is listed more than once.

    Raises:
        requests.RequestException: If a dump cannot be downloaded
            (connection failure, timeout or HTTP error status).
    """
    dumps = []
    for url in STATION_DUMP_URLS:
        response = requests.get(url, timeout=timeout)
        # An error page must not be parsed as an (empty) station list,
        # otherwise every known client would be reported as gone.
        response.raise_for_status()
        dumps.append(response.text)
    return re.findall(r"Station ([^ ]+) ", ''.join(dumps))
# Set of stations seen by the last poll of station_dump_thread;
# None until the first poll has stored its result.
previous = None
def format_stations(stations, station_map=None):
    """Return the given stations as one comma-separated string.

    Args:
        stations: Iterable of station IDs; the output keeps its order.
        station_map: Optional mapping from station ID to display name.
            Defaults to the module-level STATION_MAP.

    Returns:
        The display name of every station that has one and the raw ID of
        every other station, joined with ``', '``. An empty iterable
        yields an empty string.
    """
    if station_map is None:
        station_map = STATION_MAP
    return ', '.join(station_map.get(station, station) for station in stations)
def station_dump_thread():
    """Poll the station dumps forever and announce changes in the room.

    Every CHECK_INTERVAL seconds the current station set is compared with
    the set stored in the module-level ``previous``. Stations that appeared
    or disappeared are printed and sent to ``room``. The first successful
    poll only records the baseline and sends nothing.

    A poll whose download fails is logged and skipped; ``previous`` is left
    untouched so the next successful poll is compared against the last
    known state. This function never returns.
    """
    global previous
    while True:
        print('gather wifi...')
        try:
            stations = set(get_active_stations())
        except requests.RequestException as err:
            # A temporary router/network problem must not end the loop.
            print('could not fetch station dump:', err)
            time.sleep(CHECK_INTERVAL)
            continue
        print(stations)
        # Compare against None explicitly: an empty set is a valid previous
        # state (no clients connected) and must still be diffed, otherwise
        # the first clients to show up would never be announced.
        if previous is not None:
            new_stations = stations - previous
            gone_stations = previous - stations
            print('new stations:', new_stations)
            print('gone stations:', gone_stations)
            # Sets have no defined order; sort so messages are stable.
            if new_stations:
                room.send_text('new stations:' + format_stations(sorted(new_stations)))
            if gone_stations:
                room.send_text('gone stations:' + format_stations(sorted(gone_stations)))
        previous = stations
        time.sleep(CHECK_INTERVAL)
# Called when a message is received.
def on_message(room_object, event):
    """Print a short description of a room event to stdout.

    Args:
        room_object: The room the event belongs to (unused).
        event: The event as a dict. Missing keys are tolerated so that an
            unexpected event shape does not raise inside the listener.

    Behaviour:
        * ``m.room.member`` with membership ``join``: prints "<name> joined".
        * ``m.room.message`` with msgtype ``m.text``: prints "<sender>: <body>".
        * any other event type: prints the type.
    """
    print('event', event)
    event_type = event.get('type')
    content = event.get('content') or {}
    if event_type == "m.room.member":
        # The Matrix spec places `membership` inside `content`; the
        # top-level key read by the original code is still honoured first.
        membership = event.get('membership', content.get('membership'))
        if membership == "join":
            # `displayname` is optional; fall back to the user ID fields.
            name = (content.get('displayname')
                    or event.get('state_key')
                    or event.get('sender'))
            print("{0} joined".format(name))
    elif event_type == "m.room.message":
        if content.get('msgtype') == "m.text":
            user = event.get('sender')
            message = content.get('body')
            print("{0}: {1}".format(user, message))
    else:
        print(event_type)
# Register on_message for events of the joined room, then start the
# client's listener thread so those events are delivered.
room.add_listener(on_message)
client.start_listener_thread()
# Run the polling loop in a worker thread and block the main thread on it.
# station_dump_thread loops forever, so join() is not expected to return.
t = Thread(None, station_dump_thread)
t.start()
t.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment