Skip to content

Instantly share code, notes, and snippets.

@OnePro
Last active June 4, 2020 21:56
Show Gist options
  • Save OnePro/87376f241e866e6c5b6bd22f6f4a77c9 to your computer and use it in GitHub Desktop.
Save OnePro/87376f241e866e6c5b6bd22f6f4a77c9 to your computer and use it in GitHub Desktop.
from concurrent import futures
from random import randint
import time, os, math, logging, logging.handlers, pprint
import multiprocessing
# ---------------------------------------------------------------------------
# Logging: the root logger emits DEBUG and above to both a file and the console.
# ---------------------------------------------------------------------------
logger = logging.getLogger()
logger.setLevel("DEBUG")

# One formatter shared by both handlers so file and console lines look alike.
formatter = logging.Formatter('[%(asctime)s] - %(name)s - %(levelname)s - %(message)s')

# File output. WatchedFileHandler reopens 'status.log' if the file is moved or
# replaced on disk while the program is running.
handler = logging.handlers.WatchedFileHandler('status.log')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Console output (StreamHandler defaults to stderr).
cons_log = logging.StreamHandler()
cons_log.setFormatter(formatter)
logger.addHandler(cons_log)

# ---------------------------------------------------------------------------
# Shared state: a Manager server process owns the containers; the proxies
# created here can be handed to worker processes.
# NOTE(review): this runs at import time. Under the "spawn" start method the
# main module is re-imported in every child, which would try to start another
# Manager -- confirm the script is only run where "fork" is the default.
# ---------------------------------------------------------------------------
manager = multiprocessing.Manager()
shared_ids = manager.list()   # not referenced in the visible part of the file
share_dict = manager.dict()   # record id -> record dict
def generate_records(count=10):
    """Seed the shared record store with freshly created records.

    Records get the ids ``1 .. count``. Each record is a plain dict with the
    keys ``title``, ``id``, ``status`` (initially ``'new'``), ``source`` (equal
    to the id) and ``result`` (initially ``0``).

    Args:
        count: Number of records to create. Defaults to 10, which reproduces
            the original hard-coded behaviour. A value <= 0 creates nothing.
    """
    records = {
        rec_id: {
            'title': f'Record #{rec_id}',
            'id': rec_id,
            'status': 'new',
            'source': rec_id,
            'result': 0,
        }
        for rec_id in range(1, count + 1)
    }
    # A single update() sends the whole batch to the shared dict in one call
    # instead of one call per record.
    share_dict.update(records)
def step_one(id, share_rec):
    """Run the first pipeline stage for one record, then chain into step two.

    Marks the record as ``'step one'``, logs it, pauses for one second and
    hands the record over to ``step_two``.

    Args:
        id: Key of the record inside ``share_rec``.
        share_rec: Mapping of record id -> record dict. ``multi_main_loop``
            passes the Manager dict proxy here.
    """
    # With a Manager dict proxy the lookup yields a copy of the nested dict,
    # so the modified record has to be assigned back to become visible.
    record = share_rec[id]
    record.update(status='step one')
    share_rec[id] = record

    logger.info(f"Step one - {share_rec[id]}")
    time.sleep(1)
    step_two(id, share_rec)
def step_two(id, share_rec):
    """Run the second pipeline stage for one record, then chain into step three.

    Marks the record as ``'step two'``, logs it, pauses for one second and
    hands the record over to ``step_three``.

    Args:
        id: Key of the record inside ``share_rec``.
        share_rec: Mapping of record id -> record dict (the Manager dict proxy
            when reached through ``step_one`` from ``multi_main_loop``).
    """
    # Read, modify, write back: with a Manager dict proxy, changes made to the
    # looked-up nested dict are not shared until it is reassigned.
    record = share_rec[id]
    record.update(status='step two')
    share_rec[id] = record

    logger.info(f"Step two - {share_rec[id]}")
    time.sleep(1)
    step_three(id, share_rec)
def step_three(id, share_rec):
    """Run the final pipeline stage: compute the result and mark the record done.

    Stores ``sqrt(source)`` in ``result``, sets the status to
    ``'step three DONE'``, writes the record back, logs it and pauses for two
    seconds.

    Args:
        id: Key of the record inside ``share_rec``.
        share_rec: Mapping of record id -> record dict.

    Raises:
        ValueError: From ``math.sqrt`` if the record's ``source`` is negative.
    """
    record = share_rec[id]
    record.update(
        result=math.sqrt(record['source']),
        status='step three DONE',
    )

    # Record 10 is held back for 15 seconds before its result is published.
    # NOTE(review): presumably this simulates a slow task so that the 5 second
    # wait timeout in multi_main_loop is exercised -- confirm.
    if id == 10:
        time.sleep(15)

    share_rec[id] = record
    logger.info(f"Step three - {share_rec[id]}")
    time.sleep(2)
def fetch_new_records(limit=3):
    """Return the ids of records that are still waiting to be processed.

    Scans the shared record store and picks the records whose ``status`` is
    ``'new'``, in the store's iteration order.

    Args:
        limit: Maximum number of ids to return. Defaults to 3, the original
            hard-coded batch size. Expected to be a non-negative int;
            ``None`` returns every new record.

    Returns:
        A list with at most ``limit`` record ids (empty if nothing is new).
    """
    new_ids = [
        rec_id
        for rec_id, rec in share_dict.items()
        if rec['status'] == 'new'
    ]
    return new_ids[:limit]
def multi_main_loop(iterations=20, max_workers=5, wait_timeout=5, pause=1):
    """Poll for new records and process each batch in a process pool.

    Every iteration fetches a batch of new record ids, submits ``step_one``
    for each of them to a fresh ``ProcessPoolExecutor``, waits up to
    ``wait_timeout`` seconds for the batch, reports the outcome, and then
    sleeps for ``pause`` seconds before polling again.

    Note that leaving the ``with`` block shuts the pool down and waits for
    every submitted task, so a task that outlives ``wait_timeout`` still delays
    the next iteration until it finishes. That blocking also guarantees each
    submitted record has left the ``'new'`` status before the next fetch.

    Args:
        iterations: Number of polling rounds (default 20).
        max_workers: Size of the per-batch process pool (default 5).
        wait_timeout: Seconds to wait for a batch before reporting stragglers
            (default 5).
        pause: Seconds to sleep at the end of every round (default 1).
    """
    for _ in range(iterations):
        batch = fetch_new_records()
        if batch:
            with futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
                # Remember which record each future belongs to so that the
                # log messages below can name it.
                running = {
                    pool.submit(step_one, rec_id, share_dict): rec_id
                    for rec_id in batch
                }
                done, not_done = futures.wait(running, timeout=wait_timeout)

                for future in done:
                    exc = future.exception()
                    if exc is not None:
                        logger.error(
                            "record %s failed: %r", running[future], exc
                        )

                for future in not_done:
                    # cancel() only succeeds for tasks that have not started;
                    # a running call cannot be interrupted.
                    if future.cancel():
                        logger.warning(
                            "record %s did not start within %s s; cancelled",
                            running[future], wait_timeout,
                        )
                    else:
                        logger.warning(
                            "record %s still running after %s s; it cannot be "
                            "cancelled and pool shutdown will wait for it",
                            running[future], wait_timeout,
                        )
        logger.info('sleeping')
        time.sleep(pause)
# Script entry point: seed the records, run the processing loop, then dump the
# final state of every record to stdout.
if __name__ == '__main__':
    generate_records()
    multi_main_loop()
    # Plain loop instead of a list comprehension: the prints are side effects,
    # there is no list worth building.
    for k, v in share_dict.items():
        print(f"{k}:{v}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment