Phone network generation

Synthetic generation of a phone locations network.

This utility generates documents with positions of mobile phones over time, stored in time-based Elasticsearch indices.
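
Each document carries a synthetic subscriber identity, a timestamp, and a position inside a bounding box around Berlin. An illustrative document (the values are made up, but follow the shapes produced by generator.py below):

{
  "msisdn": 490000000042,
  "imei": 77770000000042,
  "imsi": 9888800000042,
  "time": 1596240042,
  "hpe": 4.27,
  "type": "GNSS",
  "location": {"lat": 52.13, "lon": 12.86},
  "geotimehash": 1083120640
}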

To set up a Python 3 virtual environment to run the script and install its dependencies, execute the following:

python3 -m venv env
env/bin/pip install -U pip
env/bin/pip install -r requirements.txt

Example:

env/bin/python3 -W ignore generator.py \
  --start-date "2020-08-01 00:00:00" \
  --end-date "2020-08-31 23:59:59" \
  --es-seeds es-data-4 \
  --es-port 9200 \
  --sampling-interval 3600 \
  --phone-numbers 6500000 \
  --es-scheme https \
  --es-username admin \
  --es-password password \
  --es-shards 8 \
  --es-replicas 1 \
  --es-index-prefix positions- \
  --bulk-thread-count 45 \
  --consumer-count 40
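
With the options above, the script writes one index per generated day (see genIndexName in generator.py), e.g.:

positions-2020-08-01
positions-2020-08-02
...
positions-2020-08-31

Live data, when enabled with --live-data, goes to indices named positions-live-<current date> instead. As a rough volume check using the script's own 180-bytes-per-document estimate, this invocation produces:

6,500,000 phones x 744 hourly samples = 4,836,000,000 documents
4,836,000,000 documents x 180 bytes ≈ 870 GB before replication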

Arguments:

  • start-date: the UTC starting date
  • end-date: the UTC ending date
  • es-seeds: one or more nodes used to sniff information about the Elasticsearch cluster
  • es-port: the Elasticsearch port
  • sampling-interval: the interval between generated documents for a phone number
  • phone-numbers: the amount of unique phone numbers to use during the generation process
  • es-scheme: the scheme of the Elasticsearch URL (http or https)
  • es-username: the username of the Elasticsearch user that will index the data
  • es-password: the password of the Elasticsearch user above
  • es-shards: the number of shards per index
  • es-replicas: the number of replicas per index
  • es-index-prefix: the prefix of the generated indices
  • bulk-thread-count: the number of threads each consumer uses to send bulk requests
  • consumer-count: the number of consumer processes generating and indexing data
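
Once a run completes, the generated indices can be queried like any other Elasticsearch data. A minimal sketch using the elasticsearch 7.x client pinned in requirements.txt (the connection settings mirror the example invocation above; the msisdn value is hypothetical) that fetches the track of a single phone ordered by time:

from elasticsearch import Elasticsearch

client = Elasticsearch(["es-data-4"], port=9200, scheme="https",
                       http_auth=("admin", "password"), verify_certs=False)
resp = client.search(index="positions-*",
                     body={"query": {"term": {"msisdn": 490000000042}},
                           "sort": [{"time": "asc"}],
                           "size": 100})
for hit in resp["hits"]["hits"]:
    doc = hit["_source"]
    print(doc["time"], doc["location"])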
generator.py

import configargparse
import math
import datetime
import time
from time import sleep
from dateutil.parser import parse
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, parallel_bulk
from multiprocessing import Queue, Value, Process
from queue import Full, Empty
import random
import sys
import select
import threading
# While this program is running, a minimal command interface is available on stdin: type a command and press
# ENTER to have it processed. The commands 'inc', 'dec', 'stat', 'stop', 'quit' and 'help' are supported to
# add/pause consumer threads, show current statistics, stop the whole process and more.
# Once the 'stop' command has been issued, allow some time for consumer threads to finish their active tasks.
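# An illustrative session (output abbreviated; see processUserInput below for the full command handling):
#
#   stat
#   Consumer threads status: 40 total - 0 paused
#   dec
#   Pausing consumer thread 40
#   stop
#   Initiated shutdown procedure...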
def getBooleanBitValue(value, bit_mask):
    if value & bit_mask:
        return True
    else:
        return False
def printStatus():
    paused = 0
    pausedStr = ''
    speedStr = ''
    processingStr = ''
    for idx in range(len(consumers)):
        pausedStr = (pausedStr + ' ' + str(getBooleanBitValue(consumerStatus[idx].value, CONSUMER_PAUSED))).strip()
        processingStr = (processingStr + ' ' + str(getBooleanBitValue(consumerStatus[idx].value, CONSUMER_PROCESSING))).strip()
        if consumerStatus[idx].value & CONSUMER_PAUSED:
            paused = paused + 1
        with consumerSpeeds[idx].get_lock():
            if consumerSpeeds[idx].value == 0:
                speedStr = ('{0} 0'.format(speedStr)).strip()
            else:
                # consumerSpeeds stores seconds per million records, hence the inversion
                speedStr = ('{0} {1:.0f}'.format(speedStr, 1000000 / consumerSpeeds[idx].value)).strip()
    print('Consumer threads status: {0} total - {1} paused'.format(len(consumers), paused))
    print(' Paused: {0}'.format(pausedStr))
    print(' Processing: {0}'.format(processingStr))
    print(' Speeds: {0} records / sec'.format(speedStr))
    return
def increaseActiveConsumers():
    print('Increasing active consumers...')
    done = False
    # search for a paused consumer
    for idx in range(len(consumers)):
        if consumerStatus[idx].value & CONSUMER_PAUSED == 1:
            print('Unpausing consumer thread {0}'.format(idx + 1))
            with consumerStatus[idx].get_lock():
                consumerStatus[idx].value = consumerStatus[idx].value & ~CONSUMER_PAUSED
            done = True
            break
    if not done:
        print('Creating new consumer thread')
        idx = len(consumers) + 1
        consumerSpeeds.append(Value('d', 0))
        consumerStatus.append(Value('i', 0))
        consumers.append(Process(target=consumerThread,
                                 args=('thread{0}'.format(idx),
                                       histTasksQueue,
                                       liveTasksQueue,
                                       stopProcessing,
                                       seeds,
                                       client_options,
                                       bulkDocsSize,
                                       parallelBulkThreadCount,
                                       indexMapping,
                                       consumerSpeeds[-1],
                                       consumerStatus[-1])))
        # start consumer
        consumers[-1].start()
    printStatus()
    return
def decreaseActiveConsumers():
    print('Decreasing active consumers')
    done = False
    # search for a non-paused consumer
    for idx in range(len(consumers) - 1, -1, -1):
        if consumerStatus[idx].value & CONSUMER_PAUSED == 0:
            print('Pausing consumer thread {0}'.format(idx + 1))
            with consumerStatus[idx].get_lock():
                consumerStatus[idx].value = consumerStatus[idx].value | CONSUMER_PAUSED
            done = True
            break
    if not done:
        print('All consumer threads are already paused')
    printStatus()
    return
def processUserInput():
    print('Starting user input processor')
    askedToQuit = False
    while stopProcessing.value == 0:
        # select on sys.stdin polls for pending input without blocking (POSIX only)
        while sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
            line = sys.stdin.readline()
            if line:
                printHelp = False
                line = line.strip('\n\r')
                if line:
                    print('GOT <{0}> command'.format(line))
                    if line == 'quit':
                        askedToQuit = True
                    elif line == 'inc':
                        increaseActiveConsumers()
                    elif line == 'dec':
                        decreaseActiveConsumers()
                    elif line == 'stat':
                        printStatus()
                    elif line == 'help':
                        printHelp = True
                    elif line == 'stop':
                        printStatus()
                        with stopProcessing.get_lock():
                            print('Initiated shutdown procedure...')
                            stopProcessing.value = 1
                if printHelp:
                    print('Supported commands:')
                    print(' - inc : increases the number of unpaused consumer threads (creating new ones if needed)')
                    print(' - dec : pauses one more consumer thread')
                    print(' - stat : print some statistics')
                    print(' - stop : starts the shutdown procedure for this whole process')
                    print(' - help : show this message')
        if askedToQuit:
            print('Quitting on user request...')
            break
        sleep(0.5)
    print('Stopped user input processor')
    return
def computeGlobalStats(stats_):
    # each entry holds the seconds a consumer took to index one million records;
    # aggregate them into an overall records / sec estimate
    result = 0
    secs = 0
    recs = 0
    samples = 0
    for s in stats_:
        if s.value != 0:
            secs = secs + s.value
            recs = recs + 1000000
            samples = samples + 1
    if secs != 0:
        result = (recs / secs) * samples
    return result
# Generate a position inside a square starting from a seed
def WhereAmINow(time_, seed_, MaxXPos_, MinXPos_, MaxYPos_, MinYPos_):
    shifter1 = (seed_ % 100) / 100  # a number between 0 and 1 dependent on the seed (e.g. userID)
    shifter2 = (seed_ % 33) / 33
    # each seed traces a smooth, slightly different curve around the configured area
    lat = MinXPos_ + ((MaxXPos_ - MinXPos_) / 2) * math.sin(time_ * (1 + shifter2 / 4) / 10000) + (
            MaxXPos_ - MinXPos_) * 0.1 * shifter1
    lon = MinYPos_ + ((MaxYPos_ - MinYPos_) / 2) * math.cos(time_ * (1 + shifter2 / 3) / 12000)
    return lat, lon
def genIndexName(flag_, data_timeStart):
    if flag_ == "hist":
        index_name = index_prefix + str(datetime.datetime.fromtimestamp(data_timeStart).date())
    else:
        index_name = index_prefix + "live-" + str(datetime.datetime.fromtimestamp(time.time()).date())
    return index_name
# Generate a set of positions inside a square for a bunch of people
def generatePositionsForAll(data_timeStart, startUser_, endUser_, flag_, readingInterval_):
    index_name = genIndexName(flag_, data_timeStart)
    for user in range(startUser_, endUser_ + 1):
        baseDoc = {
            "msisdn": int("49" + str(user).rjust(10, "0")),
            "imei": int("7777" + str(user).rjust(10, "0")),
            "imsi": int("98888" + str(user).rjust(8, "0")),
            "time": data_timeStart + user % readingInterval_,
            "hpe": random.uniform(1, 10),
            "type": "GNSS"
        }
        position = WhereAmINow(data_timeStart, user, MaxXPos, MinXPos, MaxYPos, MinYPos)
        baseDoc["location"] = {"lat": position[0], "lon": position[1]}
        # combine the quantized position with a 10-minute time bucket into a single long
        baseDoc["geotimehash"] = ((int((float(baseDoc["location"]["lat"])) * 2000) *
                                   int((float(baseDoc["location"]["lon"])) * 2000) *
                                   int(baseDoc["time"] / 600))) % 2000000000
        document = baseDoc.copy()
        # older bulk action including a mapping type, kept for reference:
        # yield {
        #     "_index": index_name,
        #     "_type": "doc",
        #     "_source": document
        # }
        yield {
            "_index": index_name,
            "_source": document
        }
    return
# Consumer thread
def consumerThread(threadName_, histTasksQueue_, liveTasksQueue_, stopProcessing_,
                   esSeeds_, esClientOptions_,
                   bulkDocsSize_, bulkThreadCount_,
                   indexMapping_,
                   speedStat_,
                   status_):
    print('Starting thread {0}'.format(threadName_))
    # Initialize connection object
    client = Elasticsearch(esSeeds_, **esClientOptions_)
    while stopProcessing_.value == 0:
        if status_.value & CONSUMER_PAUSED:
            # print('Thread {0} is paused...'.format(threadName_))
            with speedStat_.get_lock():
                speedStat_.value = 0
            sleep(1)
            continue
        # search for a pending task first in the 'live' queue and then in the 'historical' one
        try:
            task = liveTasksQueue_.get(True, 1)
        except Empty:
            task = None
        if task is None:
            try:
                task = histTasksQueue_.get(True, 1)
            except Empty:
                continue
        with status_.get_lock():
            status_.value = status_.value | CONSUMER_PROCESSING
        print('Thread {0} - Starting task {1} ({2})'.format(threadName_,
                                                            task,
                                                            datetime.datetime.fromtimestamp(
                                                                task['timeStart']).isoformat()))
        starttime = time.time()
        indexName = genIndexName(task['flag'], task['timeStart'])
        try:
            client.indices.create(index=indexName, body=indexMapping_)
            print('Thread {0} created index {1}'.format(threadName_, indexName))
        except Exception:
            # the index most likely exists already
            pass
        for success, info in parallel_bulk(client,
                                           generatePositionsForAll(task['timeStart'],
                                                                   task['startUser'],
                                                                   task['endUser'],
                                                                   task['flag'],
                                                                   task['readingInterval']),
                                           thread_count=bulkThreadCount_,
                                           chunk_size=bulkDocsSize_,
                                           request_timeout=240,
                                           raise_on_exception=True,
                                           raise_on_error=True):
            pass
        # record the time needed to index one million records
        with speedStat_.get_lock():
            speedStat_.value = (time.time() - starttime) / (task['endUser'] - task['startUser'] + 1) * 1000000
        with status_.get_lock():
            status_.value = status_.value & ~CONSUMER_PROCESSING
    print('Thread {0} has finished'.format(threadName_))
    return
def produceLiveTasks(timeStart_, interval_, totalUsers_, queue_):
    """
    Produces tasks to create live data. Live data is divided in bursts, one every 'interval' seconds. Every
    burst will contain a sample for each user (phone number)
    :param timeStart_: timestamp of the first burst
    :param interval_: seconds between bursts
    :param totalUsers_: number of phone numbers to produce samples for
    :param queue_: queue where to add tasks
    :return:
    """
    print('Starting producer of live data. Will start at {0}...'.format(datetime.datetime.fromtimestamp(
        timeStart_).isoformat()))
    while stopProcessing.value == 0:
        if time.time() < timeStart_:
            sleep(1)
        else:
            startUser = 0
            while (startUser < totalUsers_) and (stopProcessing.value == 0):
                # each task covers at most one million users
                endUser = startUser + 1000000 - 1
                if endUser >= totalUsers_:
                    endUser = totalUsers_ - 1
                task = {'timeStart': timeStart_,
                        'startUser': startUser,
                        'endUser': endUser,
                        'flag': 'live',
                        'readingInterval': interval_}
                print('Creating task {0} ({1})'.format(task,
                                                       datetime.datetime.fromtimestamp(timeStart_).isoformat()))
                queue_.put(task)
                startUser = endUser + 1
            timeStart_ = timeStart_ + interval_
            print('Will produce new live data at {0}...'.format(datetime.datetime.fromtimestamp(
                timeStart_).isoformat()))
    print('Producer of live data has stopped')
    return
def produceHistoricalTasks(timeStart_, timeEnd_, interval_, totalUsers_, consumers_, queue_):
    while (timeStart_ <= timeEnd_) and (stopProcessing.value == 0):
        startUser = 0
        while (startUser < totalUsers_) and (stopProcessing.value == 0):
            endUser = startUser + 1000000 - 1
            if endUser >= totalUsers_:
                endUser = totalUsers_ - 1
            task = {'timeStart': timeStart_,
                    'startUser': startUser,
                    'endUser': endUser,
                    'flag': 'hist',
                    'readingInterval': interval_}
            # wait for the task queue to be below a certain level
            while (queue_.qsize() > len(consumers_) * 2) and (stopProcessing.value == 0):
                sleep(1)
            if stopProcessing.value == 0:
                print('Creating task {0} ({1})'.format(task,
                                                       datetime.datetime.fromtimestamp(timeStart_).isoformat()))
                queue_.put(task)
                totalSpeed = computeGlobalStats(consumerSpeeds)
                if totalSpeed > 0:
                    print('STATS: {0:.0f} records / sec'.format(totalSpeed))
            startUser = endUser + 1
        timeStart_ += interval_
    print('Finished creating historical tasks')
    return
def firstLiveDataTimestamp(interval_):
    """
    :param interval_: sampling interval in seconds
    :return: the first timestamp to use for live data, rounded up to the next multiple of interval_
             (counted from the epoch, so e.g. hourly sampling starts on the hour)
    """
    result = math.floor(time.time()) + interval_ - 1
    result = result - result % interval_
    print('First live sample will be at {0}'.format(datetime.datetime.fromtimestamp(result).isoformat()))
    return result
def proceedWithIngestion(timeStart_, timeEnd_, interval_, totalUsers_, consumerCount_, indexPrefix_):
    result = True
    # Compute how many blocks we need to send to ES according to the time interval we want to fill
    missingDataInSec = timeEnd_ - timeStart_
    missingBlocks = math.ceil(missingDataInSec / interval_)
    blocksXDay = (24 * 3600) / interval_
    # Compute estimates, assuming roughly 180 bytes per document and 25000 docs / sec per consumer
    bytes1doc = 180
    totalDocNumber = totalUsers_ * missingBlocks
    days = totalDocNumber / (blocksXDay * totalUsers_)
    totalDocBytes = bytes1doc * totalDocNumber
    estimatedRateOfIngestion = 25000
    estimatedTime = round(totalDocNumber / estimatedRateOfIngestion / 3600 / consumerCount_, 2)
    # Ask for confirmation before ingesting
    while True:
        print('{0} documents will be sent to {1}:'.format(totalDocNumber, seeds))
        print(' Total Days: {0}'.format(days))
        print(' Index Prefix: {0}'.format(indexPrefix_))
        print(' Estimated Size: {0:.0f} Bytes (approx {1:.2f} GB)'.format(totalDocBytes,
                                                                          (totalDocBytes / 1073741824)))
        print(' Estimated Size x day: {0:.0f} Bytes (approx {1:.2f} GB)'.format(totalDocBytes / days,
                                                                                (totalDocBytes / days / 1073741824)))
        print(' Estimated Time of Completion: {0} hours'.format(estimatedTime))
        startIngestionWarning = input("Are you sure you want to continue? y/n \n")
        if startIngestionWarning.lower() in ["y", "yes"]:
            break
        elif startIngestionWarning.lower() in ["n", "no"]:
            result = False
            break
        else:
            print("Please enter a valid answer (accepted: y, n, yes, no)")
    return result
# Imaginary square around Berlin in coords, X = lat, Y = long
MaxXPos = 53.000000
MinXPos = 51.000000
MaxYPos = 15.000000
MinYPos = 11.000000
parser = configargparse.ArgumentParser('bigDataCreator',
                                       formatter_class=configargparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--start-date', action='store', dest='startDate', required=True, help='Start date in string format')
parser.add_argument('--end-date', action='store', dest='endDate', required=True, help='End date in string format')
parser.add_argument('--phone-numbers', action='store', dest='phoneNumbers', required=True, type=int,
                    help='Number of unique phone numbers to create')
parser.add_argument('--live-data', action='store', dest='liveData', default=False,
                    type=lambda x: (str(x).lower() in ['true', '1', 'yes']),
                    help='True if live data must be generated')
parser.add_argument('--es-seeds', action='store', dest='esSeeds', required=True, type=str, nargs='+',
                    help='One or more ES nodes to connect to (separated with space)')
parser.add_argument('--es-port', action='store', dest='esPort', default=9200, type=int,
                    help='TCP/IP port to connect to ES')
parser.add_argument('--es-scheme', action='store', dest='esScheme', default='http',
                    help='Scheme to use to connect to ES (http or https)')
parser.add_argument('--es-http-compression', action='store', dest='esHttpCompression', default=False,
                    type=lambda x: (str(x).lower() in ['true', '1', 'yes']),
                    help='Set to True to enable ES http compression')
parser.add_argument('--es-username', action='store', dest='esUsername', default='',
                    help='Username to connect to ES')
parser.add_argument('--es-password', action='store', dest='esPassword', default='',
                    help='Password to connect to ES')
parser.add_argument('--es-index-prefix', action='store', dest='esIndexPrefix', default='positions-',
                    help='ES index prefix to be used')
parser.add_argument('--es-refresh-interval', action='store', dest='esRefreshInterval', type=int, default=120,
                    help='Refresh interval set for generated ES indexes')
parser.add_argument('--es-shards', action='store', dest='esShards', type=int, default=1,
                    help='Number of shards per ES index')
parser.add_argument('--es-replicas', action='store', dest='esReplicas', type=int, default=1,
                    help='Number of replicas per ES index')
parser.add_argument('--sampling-interval', action='store', dest='samplingInterval', type=int, default=3600,
                    help='Sampling interval (in seconds)')
parser.add_argument('--bulk-thread-count', action='store', dest='bulkThreadCount', type=int, default=4,
                    help='Size of the threadpool to use for the bulk requests')
parser.add_argument('--bulk-docs-size', action='store', dest='bulkDocsSize', type=int, default=600,
                    help='Number of docs in one chunk sent to ES')
parser.add_argument('--consumer-count', action='store', dest='consumerCount', type=int, default=4,
                    help='Number of parallel threads used to send data to ES')
args = parser.parse_args()
args = parser.parse_args()
seeds = args.esSeeds
esPort = args.esPort
esScheme = args.esScheme
esHttpCompression = args.esHttpCompression
esUsername = args.esUsername
esPassword = args.esPassword
index_prefix = args.esIndexPrefix
refresh_interval = args.esRefreshInterval
createLiveData = args.liveData
esShards = args.esShards
esReplicas = args.esReplicas
timeStart = round(parse(args.startDate).timestamp())
timeEnd = round(parse(args.endDate).timestamp())
totalUsers = args.phoneNumbers
ReadingInterval = args.samplingInterval
parallelBulkThreadCount = args.bulkThreadCount
bulkDocsSize = args.bulkDocsSize
consumerCount = args.consumerCount
print('ES seeds: {0} - Port: {1} - Scheme: {2} - Compression: {3}'.format(seeds, esPort, esScheme, esHttpCompression))
print('ES username: <{0}> - Password: <{1}>'.format(esUsername, esPassword))
print('ES index prefix: {0} - Refresh interval: {1} - Shards: {2} - Replicas: {3}'.format(index_prefix,
                                                                                          refresh_interval,
                                                                                          esShards,
                                                                                          esReplicas))
print('Dates from {0} to {1}'.format(datetime.datetime.fromtimestamp(timeStart).isoformat(),
                                     datetime.datetime.fromtimestamp(timeEnd).isoformat()))
print('Phone numbers to create: {0}'.format(totalUsers))
print('Sampling interval: {0}s'.format(ReadingInterval))
print('Generate live data: {0}'.format(createLiveData))
print('Parallel bulk thread count: {0}'.format(parallelBulkThreadCount))
print('Bulk size: {0}'.format(bulkDocsSize))
print('Consumer count: {0}'.format(consumerCount))
client_options = {
    "http_auth": (esUsername, esPassword),
    "scheme": esScheme,
    "port": esPort,
    "sniff_on_start": True,
    "ca_certs": False,
    "verify_certs": False,
    "max_size": 200,
    "http_compress": esHttpCompression
}
# Queues used for communication between producer and consumers
histTasksQueue = Queue()
liveTasksQueue = Queue()
# Structures used for/by consumers
consumers = []
consumerSpeeds = []
consumerStatus = []
CONSUMER_PAUSED = 1 << 0 # bit used to pause a consumer
CONSUMER_PROCESSING = 1 << 1 # bit set to 1 if a consumer is processing a task
# Variable that, if set to 1, instructs everything to stop
# This variable can be safely shared between threads and processes
stopProcessing = Value('i', 0)
indexMapping = {
    'settings': {
        'number_of_shards': esShards,
        'number_of_replicas': esReplicas,
        'refresh_interval': f'{refresh_interval}s'
    },
    "mappings": {
        "properties": {
            "geotimehash": {
                "type": "long"
            },
            "hpe": {
                "type": "float"
            },
            "imei": {
                "type": "long"
            },
            "imsi": {
                "type": "long"
            },
            "location": {
                "type": "geo_point"
            },
            "msisdn": {
                "type": "long"
            },
            "time": {
                "type": "date",
                "format": "epoch_second"
            },
            "type": {
                "type": "keyword"
            }
        }
    }
}
if not proceedWithIngestion(timeStart, timeEnd, ReadingInterval, totalUsers, consumerCount, index_prefix):
    print('Aborting as per user request.')
    exit(1)
# create consumers
for c in range(1, consumerCount + 1):
    consumerSpeeds.append(Value('d', 0))
    consumerStatus.append(Value('i', 0))
    consumers.append(Process(target=consumerThread,
                             args=('thread{0}'.format(c),
                                   histTasksQueue,
                                   liveTasksQueue,
                                   stopProcessing,
                                   seeds,
                                   client_options,
                                   bulkDocsSize,
                                   parallelBulkThreadCount,
                                   indexMapping,
                                   consumerSpeeds[-1],
                                   consumerStatus[-1])))
# start consumers
for c in consumers:
    c.start()
# create the background thread that will intercept user input
userInputThread = threading.Thread(target=processUserInput)
userInputThread.start()
# create the background thread that will produce live tasks, if needed
liveDataThread = None
if createLiveData:
    liveDataThread = threading.Thread(target=produceLiveTasks,
                                      args=(firstLiveDataTimestamp(ReadingInterval),
                                            ReadingInterval,
                                            totalUsers,
                                            liveTasksQueue))
    liveDataThread.start()
# execute procedure that will produce historical tasks
produceHistoricalTasks(timeStart, timeEnd, ReadingInterval, totalUsers, consumers, histTasksQueue)
if stopProcessing.value == 0:
    if not liveTasksQueue.empty():
        print('Waiting for live tasks queue to become empty...')
        while not liveTasksQueue.empty():
            sleep(5)
    if not histTasksQueue.empty():
        print('Waiting for historical tasks queue to become empty...')
        while not histTasksQueue.empty():
            sleep(5)
print('Stopping threads...')
with stopProcessing.get_lock():
    stopProcessing.value = 1
for c in consumers:
    c.join()
userInputThread.join()
if liveDataThread:
    liveDataThread.join()
print('Done.')

requirements.txt

certifi==2020.6.20
ConfigArgParse==1.2.3
elasticsearch==7.9.1
pipi==1.0.1
python-dateutil==2.8.1
six==1.15.0
urllib3==1.25.10