Skip to content

Instantly share code, notes, and snippets.

@jomido
Created July 19, 2017 14:44
Show Gist options
  • Save jomido/bc78b109925396f5cc075420eff2799a to your computer and use it in GitHub Desktop.
Messing About with Elasticsearch (in Python)
# stdlib
import json
import os
from functools import partial
# 3rd party
import certifi
import elasticsearch
# TODO: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html
# use bool queries too (maybe instead of and and or)
def And(*clauses):
    """Build a filtered query requiring every clause in *clauses* to match.

    NOTE(review): the ``filtered``/``and`` filter syntax is deprecated in
    Elasticsearch 2.x — see the TODO above about migrating to ``bool``.
    """
    combined = {"and": clauses}
    return {"query": {"filtered": {"filter": combined}}}
def Or(*clauses):
    """Build a filtered query requiring at least one clause in *clauses* to match.

    NOTE(review): the ``filtered``/``or`` filter syntax is deprecated in
    Elasticsearch 2.x — see the TODO above about migrating to ``bool``.
    """
    combined = {"or": clauses}
    return {"query": {"filtered": {"filter": combined}}}
def Exists(field):
    """Filter clause: match documents that have a value for :field."""
    return {"exists": {"field": field}}
def Match(field, value):
    """Filter clause: match documents where :field equals :value."""
    return {"match": {field: value}}
def Script(*statements):
    """Join *statements* into a single semicolon-separated inline script body."""
    body = '; '.join(statements)
    return {"inline": body}
def debug_elasticsearch(log='elastic.log', trace='elastic.trace.log'):
    """
    Call this one time to turn on a trace to log files. This will log the
    HTTP calls, which you can then execute using curl.

    :param log: path of the rotating file for the 'elasticsearch' logger (INFO)
    :param trace: path of the rotating file for 'elasticsearch.trace' (DEBUG)
    """
    import logging
    import logging.handlers

    def _attach_rotating_handler(logger_name, path, level):
        # Configure one logger with a ~500 MB rotating file and 3 backups.
        logger = logging.getLogger(logger_name)
        logger.propagate = False
        logger.setLevel(level)
        handler = logging.handlers.RotatingFileHandler(
            path,
            # Fix: was 0.5*10**9, a float — RotatingFileHandler's maxBytes
            # is documented as an integer byte count.
            maxBytes=500 * 10**6,
            backupCount=3
        )
        logger.addHandler(handler)

    _attach_rotating_handler('elasticsearch', log, logging.INFO)
    _attach_rotating_handler('elasticsearch.trace', trace, logging.DEBUG)
def get_elastic(url):
    """
    Return an Elasticsearch instance for :url, over SSL with certificate
    verification against the certifi CA bundle.
    """
    return elasticsearch.Elasticsearch(
        hosts=[url],
        use_ssl=True,
        verify_certs=True,
        ca_certs=certifi.where()
    )
def get_operation(elastic, method, **kwargs):
    """
    Return an es operation bound to **kwargs: look up :method on :elastic
    and pre-apply the keyword arguments.
    """
    operation = getattr(elastic, method)
    return partial(operation, **kwargs)
def get_document_op(elastic, index, doc_type):
    """
    Get an operation factory bound to an :index and :doc_type, so that only
    the :method remains to be specified when it is called.
    """
    bound = partial(get_operation, elastic, doc_type=doc_type, index=index)
    return bound
def enumerate_call(search, call_id, link, call_side, show=None):
    """
    List the fields on a call document, showing any in :show.

    :param search: bound es search operation (accepts body/size/sort kwargs)
    :param call_id: call identifier (first segment of the document id)
    :param link: link segment of the document id
    :param call_side: side segment of the document id (e.g. 'caller')
    :param show: optional list of field names whose values to print.
        Bug fix: the old code iterated ``show`` unconditionally, so omitting
        it (the ``None`` default) raised ``TypeError``.
    """
    if show is None:
        show = []
    doc_id = '_'.join([call_id, link, call_side])
    print('Fetching {}...'.format(doc_id))
    query = And([
        Match('call_id', call_id),
        Match('call_side', call_side),
        Match('link', link)
    ])
    result = search(
        body=query,
        size=2,
        sort=['created:desc', 'call_id']
    )
    documents = result['hits']['hits']
    for doc in documents:
        # Full document, then just its sorted field names.
        print(json.dumps(doc, indent=2, sort_keys=True))
        print(json.dumps(
            sorted(doc['_source'].keys()),
            indent=2,
            sort_keys=True
        ))
        for field in show:
            print('{}:\n'.format(field))
            print(json.dumps(
                doc['_source'].get(field),
                indent=2,
                sort_keys=True
            ))
def doc_by_id(search, doc_id):
    """Fetch the document whose doc_id field is :doc_id and print its fields."""
    result = search(body=And([Match('doc_id', doc_id)]), size=1)
    hits = result['hits']['hits']
    print(hits)
    for document in hits:
        print('doc id: {}'.format(document['_id']))
        field_names = sorted(document['_source'].keys())
        print(json.dumps(field_names, indent=2, sort_keys=True))
def docs_with_fields(search, fields, size=1):
    """
    Get documents that have any field in :fields, newest first.
    """
    existence_clauses = [Exists(name) for name in fields]
    return search(
        body=Or(existence_clauses),
        size=size,
        sort=['created:desc', 'call_id']
    )
def count_docs_with_fields(search, fields):
    """
    Get a count of documents that have any field in :fields.
    """
    return docs_with_fields(search, fields)['hits']['total']
def list_docs_with_fields(search, fields, size=1):
    """
    Get the ids of up to :size documents that have any field in :fields.

    Docstring fix: the old docstring was copy-pasted from
    count_docs_with_fields and wrongly claimed to return a count.
    """
    result = docs_with_fields(search, fields, size=size)
    return [d['_id'] for d in result['hits']['hits']]
def last_N_processed(search, fields, N=1):
    """
    Return ids of recent documents having any of :fields whose id contains
    'caller'.

    NOTE(review): over-fetches N * 2 ids then filters — a heuristic, so the
    result may contain fewer or more than N ids; confirm this is intended.
    """
    candidates = list_docs_with_fields(search, fields, size=N * 2)
    return [doc_id for doc_id in candidates if 'caller' in doc_id]
def get_operations(elastic, doc_type, read_idx, write_idx, read_ops,
                   write_ops):
    """
    Binds indexes and doc_type to es operations: :read_ops are bound to
    :read_idx and :write_ops to :write_idx, returned in that order.
    """
    bind_read = get_document_op(elastic, read_idx, doc_type)
    bind_write = get_document_op(elastic, write_idx, doc_type)
    ops = [bind_read(name) for name in read_ops]
    ops.extend(bind_write(name) for name in write_ops)
    return ops
def get_ops(url, doc_type, read_index, write_index):
    """
    Build the bound es operations: search against :read_index; index,
    update and update_by_query against :write_index.
    """
    elastic = get_elastic(url)
    search, index, update, update_by_query = get_operations(
        elastic, doc_type, read_index, write_index,
        ['search'],
        ['index', 'update', 'update_by_query']
    )
    return search, index, update, update_by_query
def transcript_context():
    """Return the (url, doc_type, read_index, write_index) transcript context."""
    return (
        os.environ["ELASTICSEARCH_WRITE_URL"],
        'transcripts',
        'echelon-read',
        'echelon-write',
    )
def main():
    """Fetch one known call document and print its fields."""
    # Fix: the four context values were duplicated verbatim from
    # transcript_context() and could silently drift — take them from it.
    url, doc_type, read_index, write_index = transcript_context()
    # create the operations (these are functions)
    search, index, update, update_by_query = get_ops(
        url,
        doc_type,
        read_index,
        write_index
    )
    # uncomment this to turn on tracing/logging
    # debug_elasticsearch()
    call = '00ad34d8-d09e-1235-06b0-0cc47a392728_0_caller'
    call_id, link, call_side = call.split('_')
    enumerate_call(search, call_id, link, call_side, show=[
        'post_features',  # 'model_events' # 'post_events'
    ])
    # doc_ids = last_N_processed(search, ['post_features'], N=50)
    # for id in doc_ids:
    #     print(id)
    # doc_id = 'jdobson-test-12345'
    # doc_by_id(search, doc_id)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment