-
-
Save dpnkrg/369e26c51575aad7bd4a1f9ccbc7a1a3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is an initial implementation of ReadSerial.
This version is implemented directly from a GitHub project.
Implemented as per the initial requirements.
TODO: testing
Working:
1. Advanced version of Serial_v.0.1
   1. The file name is generated with the current date-time stamp.
   2. Before serial data is taken, test values are entered by the user.
   3. Code clean-up of mat-plot.
   4. Added sleep timer.
Changelog:
1. Seconds added to the file name.
2. OS type detection added.
""" | |
import serial | |
import csv | |
import re | |
import time | |
from time import sleep | |
import os | |
import redis | |
import uuid | |
""" | |
Setup Variables | |
""" | |
# Output CSV name carries the start timestamp, e.g. MAT_2020-01-31_13.05.59.csv
filename = "MAT_%s.csv" % time.strftime("%Y-%m-%d_%H.%M.%S")
baud = 115200  # Must match Arduino baud rate
timeout = 4  # Seconds; passed to the serial port as its read timeout
max_num_readings = 16000  # Cap on the number of lines captured per run
num_signals = 1
mylist = []  # Labels entered by the user in main(); written to CSV and Redis
mylist1 = []
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# Default serial port per platform family.  os.name only reports coarse
# families ('posix', 'nt', 'java'); it is never 'linux' or 'linux2' (those are
# sys.platform values), so Linux and macOS both take the 'posix' branch.
if os.name == 'posix':
    portPath = "/dev/tty.usbmodem1421"  # or "/dev/tty.usbmodem1421"
elif os.name == 'nt':
    portPath = "COM6"  # or "COM7"
else:
    # Catch-all for any other os.name: no default port is known.  This makes
    # explicit what the former always-true test
    # `os.name == 'linux' or 'linux2'` did in practice.
    portPath = ""
def create_serial_obj(portPath, baud_rate, tout):
    """
    Open a serial connection and hand back the pyserial object.

    portPath  -- device path or COM port name to open
    baud_rate -- baud rate to open the port with
    tout      -- value passed to pyserial as the port's timeout
    """
    connection = serial.Serial(portPath, baud_rate, timeout=tout)
    return connection
def read_serial_data(serial):
    """
    Read lines from an open pyserial object until the port goes quiet.

    The input buffer is flushed first, then lines are read one at a time.
    Reading stops when readline() returns nothing (the port's read timeout
    expired) or once max_num_readings lines have been collected.

    serial -- an open pyserial object (the name shadows the serial module
              inside this function; kept so existing callers keep working)

    Returns the list of raw lines exactly as readline() produced them.
    """
    serial.flushInput()
    serial_data = []
    while len(serial_data) < max_num_readings:
        serial_line = serial.readline()
        # An empty read means the timeout expired.  Test truthiness instead
        # of `== ''`: on Python 3 pyserial returns b'', which never equals ''.
        if not serial_line:
            break
        serial_data.append(serial_line)
    return serial_data
def is_number(string):
    """
    Return True if float() can parse `string`, False otherwise.

    Input that float() rejects with TypeError (for example None) is
    reported as False as well instead of letting the exception escape.
    """
    try:
        float(string)
    except (TypeError, ValueError):
        return False
    return True
# Matches unsigned decimal numbers: "12", "12.", "12.5" and ".5".  Unlike a
# `\d*\.\d*|\d*` pattern it never yields empty or lone-"." tokens, so every
# match is safe to hand to float().
_NUMBER_RE = re.compile(r"\d+\.\d*|\.\d+|\d+")


def clean_serial_data(data):
    r"""
    Extract the numeric fields from raw serial lines.

    Every unsigned decimal number found in a line is converted to float;
    all other characters (separators, line endings, signs, text) are
    discarded.  Lines yielding fewer than two numbers are dropped.  The
    cleaned result is also printed to stdout.

    data -- iterable of lines, either text or bytes

    Given something like: ['0.5000,33\r\n', '1.0000,283\r\n']
    Returns: [[0.5, 33.0], [1.0, 283.0]]
    """
    clean_data = []
    for line in data:
        if isinstance(line, bytes):
            # Bytes lines cannot be searched with a text pattern on Python 3.
            # "replace" keeps undecodable bytes as separators rather than
            # silently joining the digits on either side of them.
            line = line.decode("ascii", "replace")
        line_data = [float(token) for token in _NUMBER_RE.findall(line)]
        if len(line_data) >= 2:
            clean_data.append(line_data)
    print('clean data')
    print(clean_data)
    return clean_data
def save_to_csv(data, filename):
    """
    Write the captured readings to a CSV file.

    The file starts with the string form of the module-level mylist,
    followed by a blank line; the rows of data come after that.

    data     -- list of rows (each row a list of values)
    filename -- path of the file to create or overwrite
    """
    header = str(mylist)
    with open(filename, 'wb') as out_file:
        out_file.write(header)
        out_file.write('\n\n')
        csv.writer(out_file).writerows(data)
def read_serial():
    """
    Run one capture: read the serial port, clean the data, write it to the
    CSV file named by the module-level filename, and store a record in the
    Redis hash "data".

    Uses the module-level settings (portPath, baud, timeout, filename),
    the user-entered labels in mylist and the Redis client r.  Progress is
    printed to stdout.  Returns nothing.
    """
    print("Creating serial object...")
    serial_obj = create_serial_obj(portPath, baud, timeout)
    try:
        print("Reading serial data...")
        serial_data = read_serial_data(serial_obj)
    finally:
        # Release the port even if reading fails.
        serial_obj.close()
    print("serial data")
    print(serial_data)
    print("Data Length")
    print(len(serial_data))
    print("Cleaning data...")
    clean_data = clean_serial_data(serial_data)
    print("Saving to csv...")
    save_to_csv(clean_data, filename)
    print("Save to redis")
    # Formatted explicitly so Python 2 does not print a tuple.
    print("name : %s" % (mylist1,))
    # Plain string id; a trailing comma here would turn it into a 1-tuple.
    public_id = str(uuid.uuid1())
    values = {
        "uuid": public_id,
        "device_id": "raspberrypi",
        "client_id": "arduino",
        # Hash fields must be scalar: newer redis-py releases reject list
        # values, so the lists are stored in their string form.
        "Material": str(mylist),
        "Time": time.strftime("%Y-%m-%d_%H.%M.%S"),
        "RawData": str(clean_data),
    }
    r.hmset("data", values)
    print("End...")
def main():
    """
    Ask the user for the six test parameters, record them as prefixed
    labels in the module-level mylist, pause for three seconds and then
    start the serial capture.
    """
    # (prompt shown to the user, prefix put in front of the answer)
    prompts = (
        ('Enter Weight Value : ', 'Weight_'),
        ('Enter Top Layer : ', 'TL_'),
        ('Enter Bottom Layer : ', 'BL_'),
        ('Enter Piso Resistive : ', 'PR_'),
        ('Enter Conductive Foam : ', 'CF_'),
        ('Enter Non Conductive Foam : ', 'NCF_'),
    )
    # All answers are collected before any label is stored.
    labels = [prefix + str(raw_input(question)) for question, prefix in prompts]
    mylist.extend(labels)
    print("Processing...")
    sleep(3)
    read_serial()
# Start the prompt-and-capture flow only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
i