Skip to content

Instantly share code, notes, and snippets.

@danield137
Created June 28, 2022 09:32
Show Gist options
  • Save danield137/47e56c7cfd49c8fc8d3c3ae80fcced36 to your computer and use it in GitHub Desktop.
Save danield137/47e56c7cfd49c8fc8d3c3ae80fcced36 to your computer and use it in GitHub Desktop.
Index a sample of a Kusto table into Elasticsearch
from typing import List
import requests
import json
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from datetime import datetime, timedelta
def main():
    """Copy a sample of a Kusto table into an Elasticsearch index.

    Starting 12 hours before "now", the table is read one second of
    ``Timestamp`` at a time (at most ``batch_size`` rows per slice); every row
    is packed into a single dynamic object with ``pack_all()`` and posted to
    Elasticsearch as one document. Queries are spaced at least
    ``courtesy_backoff_time_in_seconds`` apart.

    Relies on names defined outside this function: ``KUSTO_URI``,
    ``KUSTO_DB``, ``KUSTO_TABLE``, ``ES_URI`` and ``ES_AUTH``.

    Returns:
        The last batch of rows read from Kusto (possibly an empty list).

    Raises:
        requests.exceptions.HTTPError: if Elasticsearch rejects a document.
    """

    def read_kusto_data(q):
        """Run query ``q`` against ``KUSTO_DB`` and return column 0 of each row."""
        results = kc.execute_query(KUSTO_DB, q)
        return [row[0] for row in results.primary_results[0]]

    def push_objects_to_es(collection_name: str, objects: List[dict]):
        """POST every item of ``objects`` to ``collection_name`` as a document.

        Returns the response of the last POST, or ``None`` when ``objects`` is
        empty. Raises ``HTTPError`` on the first rejected document.
        """
        result = None
        # Iterate the argument itself, not the enclosing scope's `data`.
        for obj in objects:
            result = session.post(
                f'{ES_URI}/{collection_name}/_doc',
                json=obj,
                headers={'Content-Type': 'application/json'},
            )
            result.raise_for_status()
        return result

    # connect to es
    session = requests.session()
    session.auth = ES_AUTH
    # SECURITY NOTE: TLS certificate verification is disabled for every request
    # made through this session. Acceptable only for local/test clusters.
    session.verify = False

    # because elastic only knows how to handle lowercase index names
    es_index_name = KUSTO_TABLE.lower()

    # ensure index exists
    try:
        # `h=index` limits the listing to the index-name column, so an exact
        # name comparison is possible instead of a substring search.
        indices = session.get(f'{ES_URI}/_cat/indices', params={'h': 'index'})
        if indices.status_code != 200:
            print(f'Could not list indices (HTTP {indices.status_code}); assuming index {es_index_name} exists')
        elif es_index_name not in set(indices.text.split()):
            print(f'Creating index : {es_index_name}')
            result = session.put(f'{ES_URI}/{es_index_name}')
            print(f'Creating index : {es_index_name}, result: {result.json()}')
        else:
            print(f'Index {es_index_name} exists')
    except requests.exceptions.HTTPError as e:
        # requests raises HTTPError only from raise_for_status(), which is not
        # called above; the handler is kept as a defensive safety net.
        print('Index already exists', e)

    # connect to kusto
    kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_URI)
    kc = KustoClient(kcsb)

    # read sample data from kusto
    # NOTE(review): this is naive local time, while the query embeds it in a
    # Kusto datetime() literal -- confirm the intended time zone.
    d = datetime.now() - timedelta(hours=12)
    batch_size = 100
    last_query_time = 0
    courtesy_backoff_time_in_seconds = 15
    data = []

    for iteration in range(60 * 60):
        # Throttle: wait out the remainder of the backoff window, if any.
        time_since_last_query = time.time() - last_query_time
        if time_since_last_query < courtesy_backoff_time_in_seconds:
            sleep_time = int(min(courtesy_backoff_time_in_seconds - time_since_last_query, courtesy_backoff_time_in_seconds))
            print(f'#{iteration}) Sleeping for {sleep_time} seconds')
            time.sleep(sleep_time)

        # One-second window starting at `d`.
        time_range = f"(datetime({d.isoformat()}) .. 1s)"
        query = f'{KUSTO_TABLE} | where Timestamp between {time_range} | take {batch_size} | extend PackedRow = pack_all() | project PackedRow'
        data = read_kusto_data(query)
        last_query_time = time.time()

        if data:
            print(f"#{iteration}) submitting {len(data)} records in range {time_range} from {KUSTO_TABLE} to {es_index_name} ...")
            # push_objects_to_es already raises on any failed POST.
            result = push_objects_to_es(es_index_name, data)
            print(f"#{iteration}) Done. result: {result.status_code}")
        else:
            print(f"#{iteration}) No records in range {time_range} from {KUSTO_TABLE}, nothing to submit")

        d += timedelta(seconds=1)

    return data
# Script entry point: run the sample indexing only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment