Skip to content

Instantly share code, notes, and snippets.

@pvergain
Last active March 2, 2018 10:40
Show Gist options
  • Save pvergain/b779d29ae8deb2949894921d702dbcbd to your computer and use it in GitHub Desktop.
Save pvergain/b779d29ae8deb2949894921d702dbcbd to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Exemple d'application Python basée sur pyserial, logging et click."""
# Import standard modules
from os.path import abspath, basename, dirname, join, normpath
import binascii
import csv
import string
import time
from collections import namedtuple
import logging
from logging.handlers import RotatingFileHandler
# Import addons modules
#########################
# https://pythonhosted.org/pyserial/
import serial
# http://click.pocoo.org
import click
__all__ = ('simul_beagle')
"""
Fonction exportée : simul_beagle
Voir http://www.dabeaz.com/modulepackage/ModulePackage.pdf
"""
# création de l'objet logger qui va nous servir à écrire dans les logs
# ===============================================
# CRITICAL 50 Le programme complet agonise.
# ERROR 40 Une opération a foirée.
# WARNING 30 Pour avertir que quelque chose mérite l’attention
# enclenchement d’un mode particulier, detection d’une situation
# rare, un lib optionelle peut être installée.
# INFO 20 Pour informer de la marche du programme.
# Par exemple : “Starting CSV parsing”
# DEBUG 10 Pour dumper des information quand vous débuggez.
logger = logging.getLogger()
# on met le niveau du logger à DEBUG, comme ça il écrit tout
logger.setLevel(logging.DEBUG)
# création d'un formateur qui va ajouter le temps, le niveau
# de chaque message quand on écrira un message dans le log
formatter = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s')
# création d'un handler qui va rediriger une écriture du log vers
# un fichier en mode 'append', avec 1 backup et une taille max de 1Mo
file_handler = RotatingFileHandler('test_serial_id3.log', 'a', 1000000, 1)
# on lui met le niveau sur DEBUG donc tous les messages
# de log vont se retrouver dans le fichier de logs
file_handler.setLevel(logging.DEBUG)
# si on met "INFO", les messages de DEBUG ne seront pas dans le fichier de log
# file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# création d'un second handler qui va rediriger chaque écriture de log
# sur la console
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
# stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
BeagleRecord = namedtuple('BeagleRecord', 'Index, Time, Dur, Len, Err, Dev, Ep, Level, Record Data')
"""
Définition d'un enregistrement beagle
"""
def beagle_records(filename):
"""Définition d'un générateur renvoyant un enregistrement 'beagle'.
Note::
En employant un générateur, on économise beaucoup de mémoire.
Args:
filename (str) : le nom complet du fichier.
Yields:
BeagleRecord: a beagle record.
Raises:
Exception: the filename does not exist.
"""
try:
for beagle_record in map(BeagleRecord._make, csv.reader(open(filename, "r"))):
yield beagle_record
except Exception as e:
logger.error('In beagle_records() exception:{}'.format(filename))
@click.command()
@click.option('--beagle_file', default='C:\projects_id3\P5E627\XLOG9X324_python_Simul\data\CR_beagle_2009_11_09_all_gen.csv',
help='Le nom du fichier contenant les enregistrements beagle')
@click.option('--nb_max_records', default=1000000,
help="Nombre maximal d'enregistrements a lire (1000000 par defaut)")
def read_beagle_file(beagle_file, nb_max_records):
"""read_beagle_file()."""
logger.info("beagle_file:{}".format(beagle_file))
logger.info("abspath:{}".format(abspath(beagle_file)))
logger.info("nb_max_records:{}".format(nb_max_records))
nb_records = 0
for beagle_record in beagle_records(abspath(beagle_file)):
nb_records=nb_records+1
if ((nb_records % 200) == 0):
logger.info("{} beagle records".format(nb_records))
if (nb_records > nb_max_records):
break;
@click.command()
@click.option('--serial_port', default=5,
help='Le numero de port COM')
@click.option('--beagle_file', default='../CR_beagle_2009_11_09_all_gen.csv',
help='Le nom du fichier contenant les enregistrements beagle')
@click.option('--nb_max_records', default=100000000000,
help="Nombre maximal d'enregistrements a lire (100000000000 par defaut)")
def simul_beagle(serial_port, beagle_file, nb_max_records):
"""simul_beagle.
Exemples d'appel:
> python serial_id3.py --serial_port=4
> python serial_id3.py --beagle_file='C:\projects_id3\P5E627\XLOG9X324_python_Simul\data\CR_beagle_20091109_small_gen.csv'
"""
logger.info("serial port:{}".format(serial_port))
logger.info("beagle_file:{}".format(beagle_file))
logger.info("normpath:{}".format(normpath(beagle_file)))
logger.info("nb_max_records:{}".format(nb_max_records))
try:
port_com = 'COM'+ str(serial_port-1)
ser = serial.Serial(port_com, 115200)
logger.info("serial port:{}".format(serial_port))
except serial.SerialException as e :
logger.error("Unable to open port:{}, please check port number".format(port_com))
return
try :
Table = string.maketrans('','')
RemoveNonHexCharsTable = string.translate(Table, Table, string.hexdigits)
CheckBadCharTable = string.translate(Table, Table, string.punctuation)
UniformSepTable = string.maketrans(string.punctuation,' '.ljust(len(string.punctuation)))
LastTime = time.clock()
PreviousTimeStamp = None
nb_records=0;
for beagle_record in beagle_records(normpath(beagle_file)):
nb_records=nb_records+1
if ((nb_records % 200) == 0):
logger.info("{} beagle records".format(nb_records))
if (nb_records > nb_max_records):
break;
SplittedTime = string.translate(beagle_record.Time, UniformSepTable).split()
Minutes, Secondes, Millisecondes = int(SplittedTime[0]), int(SplittedTime[1]), int(SplittedTime[2])
Microsecondes , Nanosecondes = int(SplittedTime[3]), int(SplittedTime[4])
CurrentTimeStamp = (Minutes * 60) + Secondes + (Millisecondes / 1000.0) + (Microsecondes / 1000000.0) + (Nanosecondes / 1000000000.0)
# to comment if no delay between frames.
if PreviousTimeStamp is not None :
if CurrentTimeStamp > PreviousTimeStamp :
time.sleep(CurrentTimeStamp-PreviousTimeStamp)
else:
PreviousTimeStamp = CurrentTimeStamp
else:
PreviousTimeStamp = CurrentTimeStamp
if len(string.translate(beagle_record.Data, Table, CheckBadCharTable)) == 0 :
try :
RawData = binascii.a2b_hex(string.translate(beagle_record.Data, Table, RemoveNonHexCharsTable))
ser.write(RawData)
except TypeError :
pass
PreviousTimeStamp = CurrentTimeStamp
finally :
logger.info("{} records processed".format(nb_records))
logger.info("Closing serial port ...")
ser.close()
if __name__ == '__main__':
simul_beagle()
# read_beagle_file()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment