Last active
August 19, 2021 12:00
-
-
Save skeltonmod/9be3123485a8ddc2811817032984c4e9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Install the required dependencies:
#   pip install mysql-connector-python   (provides the `mysql.connector` module)
#   pip install schedule
import mysql.connector | |
from ast import literal_eval | |
from datetime import datetime | |
import schedule | |
import time | |
from multiprocessing import Process, Manager | |
# now = datetime.now() | |
# Worker processes started by the main loop; finished ones are reaped there.
processes = []

# Manager-backed namespace. `ns.time` is the polling interval in minutes:
# the main loop multiplies it by 60 to get a delay in seconds (0.1 -> 6 s).
ns = Manager().Namespace()
ns.time = 0.1

# One module-level connection and cursor, shared by updateVenue() and
# getSchedule().
database = mysql.connector.connect(host='localhost', user='root', password="", database="tabletime")
cursor = database.cursor()
def updateVenue(venue_id, isOpen):
    """Store a venue's open/closed flag in ``shop_venues.status``.

    Args:
        venue_id: Value matched against ``shop_venues.id``; coerced with ``int``.
        isOpen: Truthy when the venue is open; stored as ``int(isOpen)``.

    The statement is committed immediately on the module-level connection.
    """
    # Let the driver bind the values instead of formatting them into the SQL text.
    sql = "UPDATE `shop_venues` SET `status` = %s WHERE `id` = %s"
    cursor.execute(sql, (int(isOpen), int(venue_id)))
    database.commit()
# Check whether a time of day falls inside an opening window
def check_time(current, start, end):
    """Return True when ``current`` lies inside the window ``start`` -> ``end``.

    The window is half-open: ``start`` is included and ``end`` is excluded, so
    a venue counts as open from its opening minute up to, but not including,
    its closing minute.

    Args:
        current: The time being tested.
        start: Opening time.
        end: Closing time. When it is earlier than ``start`` the window is
            taken to wrap past midnight (e.g. 22:00 -> 02:00).

    Returns:
        bool: Whether ``current`` is inside the window. When ``start == end``
        only ``current == start`` matches.
    """
    if start < end:
        # Same-day window.
        return start <= current < end
    if start > end:
        # Overnight window: late evening part or early morning part.
        return current >= start or current < end
    # Degenerate window (start == end).
    return current == start
def _slot_edge(slot, edge):
    """Parse one edge ('start' or 'end') of a schedule slot into a ``datetime.time``.

    ``slot[edge]`` must provide ``'HH'`` and ``'mm'`` entries; they are joined
    as ``HH:mm`` and parsed with the ``%H:%M`` format.
    """
    clock = slot[edge]
    return datetime.strptime(f"{clock['HH']}:{clock['mm']}", '%H:%M').time()


def getSchedule():
    """Recompute and store the open/closed status of every venue.

    For each row of ``shop_venues`` the ``schedules`` column is parsed with
    ``literal_eval`` into a sequence of day entries. Every entry whose ``day``
    (capitalized) is a substring of today's weekday name is evaluated using
    only its first time slot (``entry['time'][0]``):

    * a slot starting at 00:00 is treated as closed;
    * otherwise the venue is open when the current time (truncated to the
      minute) passes ``check_time`` for the slot's start and end.

    The result is printed and written back through ``updateVenue``.
    """
    # Take the clock once so the weekday, hour and minute all come from the
    # same instant.
    now = datetime.now()
    print(f"Doing Job... Current time is: {now.strftime('%H:%M:%S')}")

    # Fetch id, name and schedule together so they always belong to the same
    # row; the real id is used for the update rather than the row position.
    cursor.execute('SELECT id, venue_name, schedules from shop_venues')
    venues = cursor.fetchall()

    today = now.strftime('%A')
    current_time = now.time().replace(second=0, microsecond=0)

    for venue_id, venue_name, raw_schedule in venues:
        if not raw_schedule:
            # No schedule stored: leave this venue's status untouched.
            continue

        for entry in literal_eval(raw_schedule):
            if str(entry['day']).capitalize() not in today:
                continue

            slot = entry['time'][0]
            start = _slot_edge(slot, 'start')
            if start.strftime('%H:%M') == '00:00':
                is_open = False
            else:
                is_open = check_time(current_time, start, _slot_edge(slot, 'end'))

            if is_open:
                print(f'{venue_id}: {venue_name} is currently open')
            else:
                print(f'{venue_id}: {venue_name} is currently close')
            updateVenue(venue_id, is_open)
# Registers getSchedule with the `schedule` library at import time. Nothing in
# this file calls schedule.run_pending(), so this registration never fires;
# the polling loop below is what actually runs the job.
schedule.every(5).to(7).seconds.do(getSchedule)

if __name__ == '__main__':
    # Poll forever: reap finished workers, wait out the interval stored in
    # ns.time (minutes), then run getSchedule in a fresh child process.
    # NOTE(review): the workers use the module-level `database`/`cursor`;
    # confirm that sharing (or re-creating) that connection per child process
    # is what is wanted for the start method in use.
    try:
        while True:
            deadline = time.time() + ns.time * 60

            # Join workers that have exited. The list is rebuilt rather than
            # mutated during iteration so no entry is skipped.
            still_running = []
            for worker in processes:
                if worker.is_alive():
                    still_running.append(worker)
                else:
                    worker.join()
            processes[:] = still_running

            # Clamp at zero: time.sleep() rejects negative values, which can
            # occur if reaping took longer than the interval.
            delay = max(0, round(deadline - time.time()))
            print('Sleeping :', delay)
            time.sleep(delay)

            worker = Process(target=getSchedule)
            worker.start()
            processes.append(worker)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the poller; any other error
        # propagates after the cleanup below instead of being hidden.
        pass
    finally:
        print('Killing all to prevent orphaning ...')
        for worker in processes:
            worker.terminate()
        for worker in processes:
            worker.join()
        processes.clear()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment