Skip to content

Instantly share code, notes, and snippets.

@tikismoke
Created March 4, 2020 15:18
Show Gist options
  • Save tikismoke/00b1b360a771494a894880f6a0ed9271 to your computer and use it in GitHub Desktop.
Save tikismoke/00b1b360a771494a894880f6a0ed9271 to your computer and use it in GitHub Desktop.
from xee import Xee
import xee.entities as xee_entities
import sys
import os
import pickle
import pytz
import json
import paho.mqtt.client as paho
# MQTT broker endpoint handed to client.connect(); replace the placeholder
# host with the address of your own broker.
broker = "YOUR BROKER IP OR DNS NAME"
port = 1883
def on_publish(client, userdata, result):
    """Report on stdout that a publish completed.

    Assigned to the MQTT client's ``on_publish`` attribute; the three
    arguments are supplied by the client and are not used here.
    """
    print("data published \n")
# MQTT client shared by every publish call further down in the script.
# NOTE(review): "xee2mqtt" is the first positional argument (the client id in
# paho-mqtt 1.x); paho-mqtt 2.x changed this constructor — confirm the
# installed version.
client1 = paho.Client("xee2mqtt")

# Have the client call on_publish() for each completed publish.
client1.on_publish = on_publish

# Open the connection to the configured broker.
client1.connect(broker, port)
# Xee API client; fill in the credentials of your Xee developer application.
xee = Xee(
    client_id="YOUR_CLIENT_ID_FROM_XEE_DEV_ACCOUNT",
    client_secret="YOUR_CLIENT_SECRET_FROM_XEE_DEV_ACCOUNT",
    redirect_uri="http://localhost",
)

# URL shown to the user for authorising the app; the redirect URI is
# appended explicitly to whatever the SDK returns.
login_url = (
    xee.get_authentication_url()
    + "&redirect_uri=http://localhost"
)

# Token cache stored next to this script. Despite the ".json" name, the
# script reads and writes this file with pickle.
xee_config_file = os.path.join(
    os.path.dirname(__file__),
    'xee.json',
)
# --- Obtain a usable OAuth token ---------------------------------------------
# 1. Try the token cached in xee_config_file, refreshing it if the API
#    rejects it.
# 2. If the cache is missing/unreadable, a call raises, or the refresh fails,
#    fall back to the interactive authorization-code flow.
# 3. Write the resulting token back to the cache file.
#
# The fallback used to be triggered by a bare ``except:``, which also swallowed
# KeyboardInterrupt and the SystemExit raised by the old
# sys.exit("refreshing token failed from refresh_token") call, so that exit
# never actually happened. The fallback is now explicit and Ctrl-C propagates.
need_authorization = False
try:
    # SECURITY: pickle.load() can execute arbitrary code embedded in the file.
    # Only acceptable while xee_config_file is writable by trusted users only.
    with open(xee_config_file, 'rb') as xee_token_file:
        print("Opening File")
        token = pickle.load(xee_token_file)
    # The file is closed before any network call is made.
    print("Getting user")
    user, error = xee.get_user(token.access_token)
    print(error)
    if error is not None:
        print("Error getting user, try refreshing with token_refresh from file")
        print(error)
        token, error = xee.get_token_from_refresh_token(token.refresh_token)
        if error is not None:
            print(error)
            # Refresh failed: ask the user to authorise again.
            need_authorization = True
except Exception:
    # Missing/corrupt cache file or an exception from the SDK calls.
    need_authorization = True

if need_authorization:
    print("Error with file saved or no file saved")
    print("Go to %s allow the app and copy your oauth_verifier" % login_url)
    authorization_code = input('Please enter your authorization_code: ')
    token, error = xee.get_token_from_code(authorization_code)
    if error is not None:
        print("Error getting token from code")
        print(error)
        print("Exiting")
        sys.exit("refresh Error")

# Persist the (possibly refreshed or newly obtained) token for the next run.
with open(xee_config_file, 'wb') as xee_token_file:
    pickle.dump(token, xee_token_file)
# --- Publish each car's name, status values and latest location -------------
# Topics have the form /XEE/<car id>/<item>. SDK calls are treated as
# returning (result, error) pairs, the convention used by the rest of this
# script.
cars, err = xee.get_cars(token.access_token)
if err is not None:
    # Previously unchecked; presumably ``cars`` is unusable when an error is
    # reported, so stop here instead of failing inside the loop.
    print(err)
    sys.exit("Error getting cars")

for car in cars:
    car_topic = "/XEE/" + str(car.id)

    # Car name.
    try:
        client1.publish(car_topic + "/carname/", str(car.name))
    except Exception:
        print("error publishing this value")
        print(car)

    # Status: walk every non-None member of the status result and publish
    # each entry as <name> -> <value>. Entries without .name/.value raise
    # inside the try and are reported, not fatal.
    status, error = xee.get_status(car.id, token.access_token)
    if error is None:
        for group in status:
            if group is None:
                continue
            for signal in group:
                try:
                    client1.publish(
                        car_topic + "/" + str(signal.name),
                        str(signal.value),
                    )
                    print(signal)
                except Exception:
                    print("error publishing this value")
                    print(signal)
    else:
        # Previously skipped silently.
        print(error)

    # Latest location (limit=1), published as a JSON object.
    locations, error = xee.get_locations(car.id, token.access_token, limit=1)
    if error is not None:
        # Previously unchecked before iterating ``locations``.
        print(error)
        continue
    for location in locations:
        try:
            lat = location.latitude
            lon = location.longitude
            client1.publish(
                car_topic + "/location/",
                json.dumps({"longitude": lon, "latitude": lat}),
            )
            print(location)
        except Exception:
            print("error publishing this value")
            print(location)

# NOTE(review): no MQTT network loop (loop_start()/loop()) or disconnect() is
# called in the visible code — confirm the publishes are flushed before exit.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment