|
import configargparse |
|
import math |
|
import datetime |
|
import time |
|
from time import sleep |
|
from dateutil.parser import parse |
|
from elasticsearch import Elasticsearch |
|
from elasticsearch.helpers import bulk, parallel_bulk |
|
from multiprocessing import Queue, Value, Process |
|
from queue import Full, Empty |
|
import random |
|
import sys |
|
import select |
|
import threading |
|
|
|
# Remember that, while this program is running, a minimal user interaction is allowed to send commands. Just type |
|
# the command and press ENTER to have it processed. Commands like 'inc', 'dec', 'stat' and 'stop' are supported to |
|
# add/pause consumer threads, see current statistics, stop the whole process and more. |
|
# Once the 'stop' command has been issued, allow for consumer threads to stop their currently active tasks. |
|
|
|
|
|
def getBooleanBitValue(value, bit_mask): |
|
if value & bit_mask: |
|
return True |
|
else: |
|
return False |
|
|
|
|
|
def printStatus(): |
|
paused = 0 |
|
pausedStr = '' |
|
speedStr = '' |
|
processingStr = '' |
|
for idx in range(len(consumers)): |
|
pausedStr = (pausedStr + ' ' + str(getBooleanBitValue(consumerStatus[idx].value, CONSUMER_PAUSED))).strip() |
|
processingStr = (processingStr + ' ' + str(getBooleanBitValue(consumerStatus[idx].value, CONSUMER_PROCESSING))).strip() |
|
if consumerStatus[idx].value & CONSUMER_PAUSED: |
|
paused = paused + 1 |
|
with consumerSpeeds[idx].get_lock(): |
|
if consumerSpeeds[idx].value == 0: |
|
speedStr = ('{0} 0'.format(speedStr)).strip() |
|
else: |
|
speedStr = ('{0} {1:.0f}'.format(speedStr, 1000000 / consumerSpeeds[idx].value)).strip() |
|
print('Consumer threads status: {0} total - {1} paused'.format(len(consumers), paused)) |
|
print(' Paused: {0}'.format(pausedStr)) |
|
print(' Processing: {0}'.format(processingStr)) |
|
print(' Speeds: {0} records / sec'.format(speedStr)) |
|
return |
|
|
|
|
|
def increaseActiveConsumers(): |
|
print('Increasing active consumers...') |
|
done = False |
|
# search for a paused consumer |
|
for idx in range(len(consumers)): |
|
if consumerStatus[idx].value & CONSUMER_PAUSED == 1: |
|
print('Unpausing consumer thread {0}'.format(idx + 1)) |
|
with consumerStatus[idx].get_lock(): |
|
consumerStatus[idx].value = consumerStatus[idx].value & ~CONSUMER_PAUSED |
|
done = True |
|
break |
|
|
|
if not done: |
|
print('Creating new consumer thread') |
|
idx = len(consumers) + 1 |
|
consumerSpeeds.append(Value('d', 0)) |
|
consumerStatus.append(Value('i', 0)) |
|
consumers.append(Process(target=consumerThread, |
|
args=('thread{0}'.format(idx), |
|
histTasksQueue, |
|
liveTasksQueue, |
|
stopProcessing, |
|
seeds, |
|
client_options, |
|
bulkDocsSize, |
|
parallelBulkThreadCount, |
|
indexMapping, |
|
consumerSpeeds[-1], |
|
consumerStatus[-1]))) |
|
# start consumer |
|
consumers[-1].start() |
|
|
|
printStatus() |
|
return |
|
|
|
|
|
def decreaseActiveConsumers(): |
|
print('Decreasing active consumers') |
|
done = False |
|
# search for a non-paused consumer |
|
for idx in range(len(consumers) - 1, -1, -1): |
|
if consumerStatus[idx].value & CONSUMER_PAUSED == 0: |
|
print('Pausing consumer thread {0}'.format(idx + 1)) |
|
with consumerStatus[idx].get_lock(): |
|
consumerStatus[idx].value = consumerStatus[idx].value | CONSUMER_PAUSED |
|
done = True |
|
break |
|
|
|
if not done: |
|
print('All consumer threads are already paused') |
|
|
|
printStatus() |
|
return |
|
|
|
|
|
def processUserInput(): |
|
print('Starting user input processor') |
|
askedToQuit = False |
|
while stopProcessing.value == 0: |
|
while sys.stdin in select.select([sys.stdin], [], [], 0)[0]: |
|
line = sys.stdin.readline() |
|
if line: |
|
printHelp = False |
|
line = line.strip('\n\r') |
|
if line: |
|
print('GOT <{0}> command'.format(line)) |
|
if line == 'quit': |
|
askedToQuit = True |
|
elif line == 'inc': |
|
increaseActiveConsumers() |
|
elif line == 'dec': |
|
decreaseActiveConsumers() |
|
elif line == 'stat': |
|
printStatus() |
|
elif line == 'help': |
|
printHelp = True |
|
elif line == 'stop': |
|
printStatus() |
|
with stopProcessing.get_lock(): |
|
print('Initiated shutdown procedure...') |
|
stopProcessing.value = 1 |
|
if printHelp: |
|
print('Supported commands:') |
|
print(' - inc : increases the number of unpaused consumer threads (creating new ones if needed)') |
|
print(' - dec : pauses one more consumer thread') |
|
print(' - stat : print some statistics') |
|
print(' - stop : starts the shutdown procedure for this whole process') |
|
print(' - help : show this message') |
|
if askedToQuit: |
|
print('Quitting on user request...') |
|
break |
|
sleep(0.5) |
|
print('Stopped user input processor') |
|
return |
|
|
|
|
|
def computeGlobalStats(stats_): |
|
result = 0 |
|
secs = 0 |
|
recs = 0 |
|
samples = 0 |
|
for s in stats_: |
|
if s.value != 0: |
|
secs = secs + s.value |
|
recs = recs + 1000000 |
|
samples = samples + 1 |
|
if secs != 0: |
|
result = (recs / secs) * samples |
|
return result |
|
|
|
|
|
# Generate a position inside a square starting from a seed |
|
def WhereAmINow(time_, seed_, MaxXPos_, MinXPos_, MaxYPos_, MinYPos_): |
|
shifter1 = (seed_ % 100) / 100 # a number between 0 and 1 dependant on the seed (E.g. userID) |
|
shifter2 = (seed_ % 33) / 33 |
|
lat = MinXPos_ + ((MaxXPos_ - MinXPos_) / 2) * math.sin(time_ * (1 + shifter2 / 4) / 10000) + ( |
|
MaxXPos_ - MinXPos_) * 0.1 * shifter1 |
|
lon = MinYPos_ + ((MaxYPos_ - MinYPos_) / 2) * math.cos(time_ * (1 + shifter2 / 3) / 12000) |
|
return lat, lon |
|
|
|
|
|
def genIndexName(flag_, data_timeStart): |
|
if flag_ == "hist": |
|
index_name = index_prefix + str(datetime.datetime.fromtimestamp(data_timeStart).date()) |
|
else: |
|
index_name = index_prefix + "live-" + str(datetime.datetime.fromtimestamp(time.time()).date()) |
|
return index_name |
|
|
|
|
|
# Generate a a set of positions inside a square for a bunch of people |
|
def generatePositionsForAll(data_timeStart, startUser_, endUser_, flag_, readingInterval_): |
|
index_name = genIndexName(flag_, data_timeStart) |
|
for user in range(startUser_, endUser_ + 1): |
|
baseDoc = { |
|
"msisdn": int("49" + str(user).rjust(10, "0")), |
|
"imei": int("7777" + str(user).rjust(10, "0")), |
|
"imsi": int("98888" + str(user).rjust(8, "0")), |
|
"time": data_timeStart + user % readingInterval_, |
|
"hpe": random.uniform(1, 10), |
|
"type": "GNSS" |
|
} |
|
|
|
position = WhereAmINow(data_timeStart, user, MaxXPos, MinXPos, MaxYPos, MinYPos) |
|
baseDoc["location"] = {"lat": position[0], "lon": position[1]} |
|
|
|
baseDoc["geotimehash"] = ((int((float(baseDoc["location"]["lat"])) * 2000) * |
|
int((float(baseDoc["location"]["lon"])) * 2000) * |
|
int(baseDoc["time"] / 600))) % 2000000000 |
|
|
|
document = baseDoc.copy() |
|
|
|
""" |
|
yield { |
|
"_index": index_name, |
|
"_type": "doc", |
|
"_source": document |
|
} |
|
""" |
|
yield { |
|
"_index": index_name, |
|
"_source": document |
|
} |
|
|
|
return |
|
|
|
|
|
# Consumer thread |
|
def consumerThread(threadName_, histTasksQueue_, liveTasksQueue_, stopProcessing_, |
|
esSeeds_, esClientOptions_, |
|
bulkDocsSize_, bulkThreadCount_, |
|
indexMapping_, |
|
speedStat_, |
|
status_): |
|
print('Starting thread {0}'.format(threadName_)) |
|
|
|
# Initialize connection object |
|
client = Elasticsearch(esSeeds_, **esClientOptions_) |
|
|
|
while stopProcessing_.value == 0: |
|
if status_.value & CONSUMER_PAUSED: |
|
# print('Thread {0} is paused...'.format(threadName_)) |
|
with speedStat_.get_lock(): |
|
speedStat_.value = 0 |
|
sleep(1) |
|
continue |
|
# search for a pending task first in the 'live' queue and then in the 'historical' one |
|
try: |
|
task = liveTasksQueue_.get(True, 1) |
|
except Empty: |
|
task = None |
|
if task is None: |
|
try: |
|
task = histTasksQueue_.get(True, 1) |
|
except Empty: |
|
continue |
|
with status_.get_lock(): |
|
status_.value = status_.value | CONSUMER_PROCESSING |
|
print('Thread {0} - Starting task {1} ({2})'.format(threadName_, |
|
task, |
|
datetime.datetime.fromtimestamp( |
|
task['timeStart']).isoformat())) |
|
starttime = time.time() |
|
indexName = genIndexName(task['flag'], task['timeStart']) |
|
try: |
|
client.indices.create(index=indexName, body=indexMapping_) |
|
print('Thread {0} created index {1}'.format(threadName_, indexName)) |
|
except: |
|
pass |
|
|
|
for success, info in parallel_bulk(client, |
|
generatePositionsForAll(task['timeStart'], |
|
task['startUser'], |
|
task['endUser'], |
|
task['flag'], |
|
task['readingInterval']), |
|
thread_count=bulkThreadCount_, |
|
chunk_size=bulkDocsSize_, |
|
request_timeout=240, |
|
raise_on_exception=True, |
|
raise_on_error=True): |
|
pass |
|
with speedStat_.get_lock(): |
|
speedStat_.value = (time.time() - starttime) / (task['endUser'] - task['startUser'] + 1) * 1000000 |
|
with status_.get_lock(): |
|
status_.value = status_.value & ~CONSUMER_PROCESSING |
|
print('Thread {0} has finished'.format(threadName_)) |
|
return |
|
|
|
|
|
def produceLiveTasks(timeStart_, interval_, totalUsers_, queue_): |
|
""" |
|
Produces tasks to create live data. Live data is divided in bursts, one every 'interval' seconds. Every |
|
burst will contain a sample for each user (phone number) |
|
:param timeStart_: |
|
:param interval_: seconds between bursts |
|
:param totalUsers_: |
|
:param queue_: queue where to add tasks |
|
:return: |
|
""" |
|
print('Starting producer of live data. Will start at {0}...'.format(datetime.datetime.fromtimestamp( |
|
timeStart_).isoformat())) |
|
while stopProcessing.value == 0: |
|
if time.time() < timeStart_: |
|
sleep(1) |
|
else: |
|
startUser = 0 |
|
while (startUser < totalUsers_) and (stopProcessing.value == 0): |
|
endUser = startUser + 1000000 - 1 |
|
if endUser >= totalUsers_: |
|
endUser = totalUsers_ - 1 |
|
task = {'timeStart': timeStart_, |
|
'startUser': startUser, |
|
'endUser': endUser, |
|
'flag': 'hist', |
|
'readingInterval': interval_} |
|
print('Creating task {0} ({1})'.format(task, |
|
datetime.datetime.fromtimestamp(timeStart_).isoformat())) |
|
queue_.put(task) |
|
startUser = endUser + 1 |
|
|
|
timeStart_ = timeStart_ + interval_ |
|
print('Will produce new live data at {0}...'.format(datetime.datetime.fromtimestamp( |
|
timeStart_).isoformat())) |
|
|
|
print('Producer of live data has stopped') |
|
return |
|
|
|
|
|
def produceHistoricalTasks(timeStart_, timeEnd_, interval_, totalUsers_, consumers_, queue_): |
|
while (timeStart_ <= timeEnd_) and (stopProcessing.value == 0): |
|
startUser = 0 |
|
while (startUser < totalUsers_) and (stopProcessing.value == 0): |
|
endUser = startUser + 1000000 - 1 |
|
if endUser >= totalUsers_: |
|
endUser = totalUsers_ - 1 |
|
task = {'timeStart': timeStart_, |
|
'startUser': startUser, |
|
'endUser': endUser, |
|
'flag': 'hist', |
|
'readingInterval': interval_} |
|
# wait for taskQueue to be below a certain level |
|
while (queue_.qsize() > len(consumers_) * 2) and (stopProcessing.value == 0): |
|
sleep(1) |
|
if stopProcessing.value == 0: |
|
print('Creating task {0} ({1})'.format(task, |
|
datetime.datetime.fromtimestamp(timeStart_).isoformat())) |
|
queue_.put(task) |
|
totalSpeed = computeGlobalStats(consumerSpeeds) |
|
if totalSpeed > 0: |
|
print('STATS: {0:.0f} records / sec'.format(totalSpeed)) |
|
startUser = endUser + 1 |
|
timeStart_ += interval_ |
|
print('Finished creating historical tasks') |
|
return |
|
|
|
|
|
def firstLiveDataTimestamp(interval_): |
|
""" |
|
:param interval_: sampling interval in seconds |
|
:return: the first timestamp to use for live data. It's rounded based on interval_ starting at midnight |
|
""" |
|
result = math.floor(time.time()) + interval_ - 1 |
|
result = result - result % interval_ |
|
print('First live sample will be at {0}'.format(datetime.datetime.fromtimestamp(result).isoformat())) |
|
return result |
|
|
|
|
|
def proceedWithIngestion(timeStart_, timeEnd_, interval_, totalUsers_, consumerCount_, indexPrefix_): |
|
result = True |
|
|
|
# Compute how many blocks we need to send to ES according to the time interval we want to fill |
|
missingDataInSec = timeEnd_ - timeStart_ |
|
missingBlocks = math.ceil(missingDataInSec / interval_) |
|
blocksXDay = (24 * 3600) / interval_ |
|
|
|
# Computing estimates |
|
bytes1doc = 180 |
|
totalDocNumber = totalUsers_ * missingBlocks |
|
days = totalDocNumber / (blocksXDay * totalUsers_) |
|
totalDocBytes = bytes1doc * totalDocNumber |
|
estimatedRateOfIngestion = 25000 |
|
estimatedTime = round(totalDocNumber / estimatedRateOfIngestion / 3600 / consumerCount_, 2) |
|
|
|
# Do you want to ingest? |
|
while True: |
|
print('{0} documents will be sent to {1}:'.format(totalDocNumber, seeds)) |
|
print(' Total Days: {0}'.format(days)) |
|
print(' Index Prefix: {0}'.format(indexPrefix_)) |
|
print(' Estimated Size: {0:.0f} Bytes (approx {1:.2f} GB)'.format(totalDocBytes, |
|
(totalDocBytes / 1073741824))) |
|
print(' Estimated Size x day: {0:.0f} Bytes (approx {1:.2f} GB)'.format(totalDocBytes / days, |
|
(totalDocBytes / days / 1073741824))) |
|
print(' Estimated Time of Completion: {0} hours'.format(estimatedTime)) |
|
|
|
startIngestionWarning = input("Are you sure you want to continue? y/n \n") |
|
|
|
if startIngestionWarning.lower() in ["y", "yes"]: |
|
break |
|
elif startIngestionWarning.lower() in ["n", "no"]: |
|
result = False |
|
break |
|
else: |
|
print("please enter valid answer (accepted y,n,yes,no)") |
|
return result |
|
|
|
|
|
# Imaginary square around Berlin in coords, X = lat, Y = long |
|
MaxXPos = 53.000000 |
|
MinXPos = 51.000000 |
|
MaxYPos = 15.000000 |
|
MinYPos = 11.000000 |
|
|
|
parser = configargparse.ArgumentParser('bigDataCreator', |
|
formatter_class=configargparse.ArgumentDefaultsHelpFormatter) |
|
parser.add_argument('--start-date', action='store', dest='startDate', required=True, help='Start date in string format') |
|
parser.add_argument('--end-date', action='store', dest='endDate', required=True, help='Start date in string format') |
|
parser.add_argument('--phone-numbers', action='store', dest='phoneNumbers', required=True, type=int, |
|
help='Number of unique phone numbers to create') |
|
parser.add_argument('--live-data', action='store', dest='liveData', default=False, |
|
type=lambda x: (str(x).lower() in ['true', '1', 'yes']), |
|
help='True if live data must be generated') |
|
|
|
parser.add_argument('--es-seeds', action='store', dest='esSeeds', required=True, type=str, nargs='+', |
|
help='One or more ES nodes to connect to (separated with space)') |
|
parser.add_argument('--es-port', action='store', dest='esPort', default=9200, type=int, |
|
help='TCP/IP port to connect to ES') |
|
parser.add_argument('--es-scheme', action='store', dest='esScheme', default='http', |
|
help='Scheme to use to connect to ES (http or https)') |
|
parser.add_argument('--es-http-compression', action='store', dest='esHttpCompression', default=False, |
|
type=lambda x: (str(x).lower() in ['true', '1', 'yes']), |
|
help='Set to True to enable ES http compression') |
|
parser.add_argument('--es-username', action='store', dest='esUsername', default='', |
|
help='Username to connect to ES') |
|
parser.add_argument('--es-password', action='store', dest='esPassword', default='', |
|
help='Password to connect to ES') |
|
parser.add_argument('--es-index-prefix', action='store', dest='esIndexPrefix', default='positions-', |
|
help='ES index prefix to be used') |
|
parser.add_argument('--es-refresh-interval', action='store', dest='esRefreshInterval', type=int, default=120, |
|
help='Refresh interval set for generated ES indexes') |
|
parser.add_argument('--es-shards', action='store', dest='esShards', type=int, default=1, |
|
help='Number of shards per ES index') |
|
parser.add_argument('--es-replicas', action='store', dest='esReplicas', type=int, default=1, |
|
help='Number of replicas per ES index') |
|
|
|
parser.add_argument('--sampling-interval', action='store', dest='samplingInterval', type=int, default=3600, |
|
help='Sampling interval (in seconds)') |
|
parser.add_argument('--bulk-thread-count', action='store', dest='bulkThreadCount', type=int, default=4, |
|
help='Size of the threadpool to use for the bulk requests') |
|
parser.add_argument('--bulk-docs-size', action='store', dest='bulkDocsSize', type=int, default=600, |
|
help='Number of docs in one chunk sent to ES') |
|
parser.add_argument('--consumer-count', action='store', dest='consumerCount', type=int, default=4, |
|
help='Number of parallel threads used to send data to ES') |
|
|
|
args = parser.parse_args() |
|
|
|
seeds = args.esSeeds |
|
esPort = args.esPort |
|
esScheme = args.esScheme |
|
esHttpCompression = args.esHttpCompression |
|
esUsername = args.esUsername |
|
esPassword = args.esPassword |
|
index_prefix = args.esIndexPrefix |
|
refresh_interval = args.esRefreshInterval |
|
createLiveData = args.liveData |
|
esShards = args.esShards |
|
esReplicas = args.esReplicas |
|
timeStart = round(parse(args.startDate).timestamp()) |
|
timeEnd = round(parse(args.endDate).timestamp()) |
|
totalUsers = args.phoneNumbers |
|
ReadingInterval = args.samplingInterval |
|
parallelBulkThreadCount = args.bulkThreadCount |
|
bulkDocsSize = args.bulkDocsSize |
|
consumerCount = args.consumerCount |
|
print('ES seeds: {0} - Port: {1} - Scheme: {2} - Compression: {3}'.format(seeds, esPort, esScheme, esHttpCompression)) |
|
print('ES username: <{0}> - Password: <{1}>'.format(esUsername, esPassword)) |
|
print('ES index prefix: {0} - Refresh interval: {1} - Shards: {2} - Replicas: {3}'.format(index_prefix, |
|
refresh_interval, |
|
esShards, |
|
esReplicas)) |
|
print('Dates from {0} to {1}'.format(datetime.datetime.fromtimestamp(timeStart).isoformat(), |
|
datetime.datetime.fromtimestamp(timeEnd).isoformat())) |
|
print('Phone numbers to create: {0}'.format(totalUsers)) |
|
print('Sampling interval: {0}s'.format(ReadingInterval)) |
|
print('Generate live data: {0}'.format(createLiveData)) |
|
print('Parallel bulk thread count: {0}'.format(parallelBulkThreadCount)) |
|
print('Bulk size: {0}'.format(bulkDocsSize)) |
|
print('Consumer count: {0}'.format(consumerCount)) |
|
|
|
client_options = { |
|
"http_auth": (esUsername, esPassword), |
|
"scheme": esScheme, |
|
"port": esPort, |
|
"sniff_on_start": True, |
|
"ca_certs": False, |
|
"verify_certs": False, |
|
"max_size": 200, |
|
"http_compress": esHttpCompression |
|
} |
|
|
|
# Queues used for communication between producer and consumers |
|
histTasksQueue = Queue() |
|
liveTasksQueue = Queue() |
|
|
|
# Structures used for/by consumers |
|
consumers = [] |
|
consumerSpeeds = [] |
|
consumerStatus = [] |
|
CONSUMER_PAUSED = 1 << 0 # bit used to pause a consumer |
|
CONSUMER_PROCESSING = 1 << 1 # bit set to 1 if a consumer is processing a task |
|
|
|
# Variable that, if set to 1, instructs everything to stop |
|
# This variable can be safely shared between threads and processes |
|
stopProcessing = Value('i', 0) |
|
|
|
indexMapping = { |
|
'settings': { |
|
'number_of_shards': esShards, |
|
'number_of_replicas': esReplicas, |
|
'refresh_interval': f'{refresh_interval}s' |
|
}, |
|
"mappings": { |
|
"properties": { |
|
"geotimehash": { |
|
"type": "long" |
|
}, |
|
"hpe": { |
|
"type": "float" |
|
}, |
|
"imei": { |
|
"type": "long" |
|
}, |
|
"imsi": { |
|
"type": "long" |
|
}, |
|
"location": { |
|
"type": "geo_point" |
|
}, |
|
"msisdn": { |
|
"type": "long" |
|
}, |
|
"time": { |
|
"type": "date", |
|
"format": "epoch_second" |
|
}, |
|
"type": { |
|
"type": "keyword" |
|
} |
|
} |
|
} |
|
} |
|
|
|
if not proceedWithIngestion(timeStart, timeEnd, ReadingInterval, totalUsers, consumerCount, index_prefix): |
|
print('Aborting as per user request.') |
|
exit(1) |
|
|
|
# create consumers |
|
for c in range(1, consumerCount + 1): |
|
consumerSpeeds.append(Value('d', 0)) |
|
consumerStatus.append(Value('i', 0)) |
|
consumers.append(Process(target=consumerThread, |
|
args=('thread{0}'.format(c), |
|
histTasksQueue, |
|
liveTasksQueue, |
|
stopProcessing, |
|
seeds, |
|
client_options, |
|
bulkDocsSize, |
|
parallelBulkThreadCount, |
|
indexMapping, |
|
consumerSpeeds[-1], |
|
consumerStatus[-1]))) |
|
|
|
# start consumers |
|
for c in consumers: |
|
c.start() |
|
|
|
# create the background thread that will intercept user input |
|
userInputThread = threading.Thread(target=processUserInput) |
|
userInputThread.start() |
|
|
|
# create the background thread that will produce live tasks, if needed |
|
liveDataThread = None |
|
if createLiveData: |
|
liveDataThread = threading.Thread(target=produceLiveTasks, |
|
args=(firstLiveDataTimestamp(ReadingInterval), |
|
ReadingInterval, |
|
totalUsers, |
|
liveTasksQueue)) |
|
liveDataThread.start() |
|
|
|
# execute procedure that will produce historical tasks |
|
produceHistoricalTasks(timeStart, timeEnd, ReadingInterval, totalUsers, consumers, histTasksQueue) |
|
|
|
if stopProcessing.value == 0: |
|
if not liveTasksQueue.empty(): |
|
print('Waiting for live tasks queue to become empty...') |
|
while not liveTasksQueue.empty(): |
|
sleep(5) |
|
if not histTasksQueue.empty(): |
|
print('Waiting for historical tasks queue to become empty...') |
|
while not histTasksQueue.empty(): |
|
sleep(5) |
|
|
|
print('Stopping threads...') |
|
with stopProcessing.get_lock(): |
|
stopProcessing.value = 1 |
|
|
|
for c in consumers: |
|
c.join() |
|
|
|
userInputThread.join() |
|
if liveDataThread: |
|
liveDataThread.join() |
|
print('Done.') |