Skip to content

Instantly share code, notes, and snippets.

@jason-green-io
Last active March 8, 2017 21:49
Show Gist options
  • Save jason-green-io/cb9a602c57c7927712ae0ef6a11db314 to your computer and use it in GitHub Desktop.
Save jason-green-io/cb9a602c57c7927712ae0ef6a11db314 to your computer and use it in GitHub Desktop.
gradient.py runs on the Pi and minecraft-serveractivity.py runs on the server, the server portion is part of a larger project, it has pieces that aren't used yet or not required for the server clock
import colour
import collections
import blinkt
import time
import threading
import paho.mqtt.client as paho
import os
import socket
import ssl
import json
from blinkt import set_clear_on_exit, set_brightness, set_pixel, show
# clear the Blinkt! on exit
set_clear_on_exit()
# this is plenty bright, these light are super bright
set_brightness(0.1)
# various colors
skyColor = "#000022"
sunsetColor = "#FF4500"
sunriseColor = "#FF4500"
nightColor = "#010101"
# this is what we're using as sunset times in game ticks, taken from the wiki
sunset = (7698, 13805)
sunrise = (22550, 450)
# initial values, these get changed later
moonPhase = 0
serverTime = 12000
def on_connect(client, userdata, flags, rc):
''' used by paho, notice we're subscribing to the servertim topic coming from the server '''
print("Connection returned result: " + str(rc) )
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("servertime" , 1 )
def on_message(client, userdata, msg):
'''runs every time paho gets a message '''
# grab the global objects for the moon and time
global serverTime
global moonPhase
print("topic: "+ msg.topic)
print("payload: "+ str(msg.payload))
# parse it, since the server sends it to us via json
payload = json.loads(msg.payload.decode("utf-8"))
# update the values
serverTime = payload[1]
moonPhase = divmod(payload[0], 8)[1]
def hex_to_RGB(hex):
''' "#FFFFFF" -> [255,255,255] '''
# Pass 16 to the integer function for change of base
return [int(hex[i:i+2], 16) for i in range(1,6,2)]
def RGB_to_hex(RGB):
''' [255,255,255] -> "#FFFFFF" '''
# Components need to be integers for hex to make sense
RGB = [int(x) for x in RGB]
return "#"+"".join(["0{0:x}".format(v) if v < 16 else
"{0:x}".format(v) for v in RGB])
def color_dict(gradient):
''' Takes in a list of RGB sub-lists and returns dictionary of
colors in RGB and hex form for use in a graphing function
defined later on '''
return {"hex":[RGB_to_hex(RGB) for RGB in gradient],
"r":[RGB[0] for RGB in gradient],
"g":[RGB[1] for RGB in gradient],
"b":[RGB[2] for RGB in gradient]}
def linear_gradient(start_hex, finish_hex="#FFFFFF", n=10):
''' returns a gradient list of (n) colors between
two hex colors. start_hex and finish_hex
should be the full six-digit color string,
inlcuding the number sign ("#FFFFFF") '''
# Starting and ending colors in RGB form
s = hex_to_RGB(start_hex)
f = hex_to_RGB(finish_hex)
# Initilize a list of the output colors with the starting color
RGB_list = [s]
# Calcuate a color at each evenly spaced value of t from 1 to n
for t in range(1, n):
# Interpolate RGB vector for color at the current value of t
curr_vector = [
int(s[j] + (float(t)/(n-1))*(f[j]-s[j]))
for j in range(3)
]
# Add it to our list of output colors
RGB_list.append(curr_vector)
return RGB_list
# here we assemble a list of all the color values for the sky, calculating gradients between the different milstones
# 450 - 7698
day = (7698 - 450) * [hex_to_RGB(skyColor)]
#7698 - 13048
sunsetPart1 = linear_gradient(skyColor, sunsetColor, n=13048 - 7698)
#13048 - 13805
sunsetPart2 = linear_gradient(sunsetColor, nightColor, n=13805 - 13048)
#13805 - 22549
night = (22549 - 13805) * [hex_to_RGB(nightColor)]
# 22549 - 22925
sunrisePart1 = linear_gradient(nightColor, sunriseColor, n=22925 - 22549)
#22925 - 450
sunrisePart2 = linear_gradient(sunriseColor, skyColor, n=24000 - 22925 + 450)
# stick them all together
gradient = collections.deque(day + sunsetPart1 + sunsetPart2 + night + sunrisePart1 + sunrisePart2)
# and rotate them so the index corresponds to the tick
gradient.rotate(450)
def getMoonPixel(time):
''' return which pixel the moon is in'''
moon = (12575, 23459)
if not (time >= moon[0] and time < moon[1]):
return None
else:
bodyRange = moon[1] - moon[0]
time = time - moon[0]
return int(time / float(bodyRange) * 8)
def getSunPixel(time):
''' return which pixel the sun is in '''
sun = (22925, 13048)
if not (time >= sun[0] or time < sun[1]):
return None
else:
if time < sun[1]:
time += 24000
bodyRange = sun[1] + 24000 - sun[0]
time = time - sun[0]
return int(time / float(bodyRange) * 8)
def display():
''' continually update the display based on the global values of the moon phase and server time'''
global serverTime
while True:
for x in range(8):
r, g, b = gradient[serverTime]
set_pixel(x,r,g,b)
moonPixel = getMoonPixel(serverTime)
if moonPixel != None:
c = abs(4 - moonPhase) / 4 * 255
set_pixel(moonPixel, c, c, c)
sunPixel = getSunPixel(serverTime)
if sunPixel != None:
set_pixel(sunPixel, 255, 255, 0)
show()
time.sleep(0.011)
#print(each)
#time.sleep(0.0001)
def genTicks():
''' used to generate ticks to test the display '''
global serverTime
while True:
serverTime = divmod(serverTime + 10, 24000)[1]
print(serverTime)
time.sleep(0.01)
# connect to AWS IoT
mqttc = paho.Client()
mqttc.on_connect = on_connect
mqttc.on_message = on_message
#mqttc.on_log = on_log
# if you plan on using this code, you'll need to change this stuff
awshost = "data.iot.eu-west-1.amazonaws.com"
awsport = 8883
clientId = "endermite"
thingName = "endermite"
caPath = "aws-iot-rootCA.crt"
certPath = "8cb2484aa1-certificate.pem.crt"
keyPath = "8cb2484aa1-private.pem.key"
mqttc.tls_set(caPath, certfile=certPath, keyfile=keyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
mqttc.connect(awshost, awsport, keepalive=60)
# start the display
displayThread = threading.Thread(target=display)
displayThread.start()
# for debug purposes
#tickThread = threading.Thread(target=genTicks)
#tickThread.start()
# start getting values from server
mqttc.loop_forever()
#!/usr/bin/python -u
import paho.mqtt.client as paho
import socket
import ssl
import threading
import shutil
import sys
import time
import os
import yaml
import sqlite3
import json
import queue
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from watchdog.events import FileSystemEventHandler
sys.path.append("/minecraft/NBT")
from nbt.nbt import NBTFile, TAG_Long, TAG_Int, TAG_String, TAG_List, TAG_Compound
# load some server config stuff (this is from littlepod, the server wrapper)
with open('/minecraft/host/config/server.yaml', 'r') as configfile:
config = yaml.load(configfile)
mcfolder = config['mcdata']
dbfile = config['dbfile']
otherdata = config["otherdata"]
level = []
scoreboard = []
def unpack_nbt(tag):
"""
Unpack an NBT tag into a native Python data structure.
"""
if isinstance(tag, TAG_List):
return [unpack_nbt(i) for i in tag.tags]
elif isinstance(tag, TAG_Compound):
return dict((i.name, unpack_nbt(i)) for i in tag.tags)
else:
return tag.value
def nbt_to_object(filename):
''' return an object from an NBT file'''
nbtData = NBTFile(filename)
return unpack_nbt(nbtData)
# start a queue for database stuff
q = queue.Queue()
def writeToDB():
''' execute a query, and wait for the next one '''
while True:
DBWriter(q.get())
q.task_done()
def DBWriter(queryArgs):
''' spin on database locks '''
conn = sqlite3.connect(dbfile)
cur = conn.cursor()
fail = True
while(fail):
try:
cur.execute(*queryArgs)
conn.commit()
fail = False
except sqlite3.OperationalError:
print("Locked")
fail = True
# start the database stuff
threadDBWriter = threading.Thread(target=writeToDB)
threadDBWriter.setDaemon(True)
threadDBWriter.start()
connflag = False
def on_connect(client, userdata, flags, rc):
''' connect to AWS IoT '''
global connflag
connflag = True
print("Connection returned result: " + str(rc) )
def on_message(client, userdata, msg):
''' publish a message to a topic '''
print(msg.topic+" "+str(msg.payload))
# set some paho stuff
mqttc = paho.Client()
mqttc.on_connect = on_connect
mqttc.on_message = on_message
# if you're using this code, you'll need to change this stuff
awshost = "data.iot.eu-west-1.amazonaws.com"
awsport = 8883
clientId = "barlynaland"
thingName = "barlynaland"
caPath = "aws-iot-rootCA.crt"
certPath = "7b5fede5e4-certificate.pem.crt"
keyPath = "7b5fede5e4-private.pem.key"
mqttc.tls_set(caPath, certfile=certPath, keyfile=keyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
mqttc.connect(awshost, awsport, keepalive=60)
# start the thread to publish
mqttc.loop_start()
class LevelHandler(PatternMatchingEventHandler):
#class FileHandler(FileSystemEventHandler):
'''
Overwrite the methods for creation, deletion, modification, and moving
to get more information as to what is happening on output
'''
patterns = ["*/level.dat"]
def on_moved(self, event):
''' listen to changes in level.dat '''
if not event.is_directory and event.dest_path.endswith("level.dat"):
time.sleep(1)
filename = event.dest_path
# get the level.dat as a python object
level = nbt_to_object(filename)
# letus know something happened
print("Moved", event.src_path, event.dest_path)
# go get the server total ticks, and modulo 24000 to get days and ticks in the day
servertime = divmod(level["Data"]["DayTime"], 24000)
# send it to the cloud, AWS IoT
mqttc.publish("servertime", json.dumps(servertime), qos=1)
class ScoreboardHandler(PatternMatchingEventHandler):
'''
Overwrite the methods for creation, deletion, modification, and moving
to get more information as to what is happening on output
'''
patterns = ["*/scoreboard.dat"]
def on_modified(self, event):
''' listen for changes to the scorboard '''
if not event.is_directory:
time.sleep(1)
filename = event.src_path
scoreboard = nbt_to_object(filename)
print("Modified", filename)
print([team["Players"] for team in scoreboard["data"]["Teams"] if team["Name"] == "mute"])
file_watch_directory = mcfolder + "/world/" # Get watch_directory parameter
# get the handlers ready
level_event_handler = LevelHandler()
scoreboard_event_handler = ScoreboardHandler()
# start them up
level_observer = Observer()
level_observer.schedule(level_event_handler, file_watch_directory, True)
scoreboard_observer = Observer()
scoreboard_observer.schedule(scoreboard_event_handler, file_watch_directory, True)
level_observer.start()
scoreboard_observer.start()
'''
Keep the script running or else python closes without stopping the observer
thread and this causes an error.
'''
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
level_observer.stop()
scoreboard_observer.stop()
level_observer.join()
scoreboard_observer.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment