Setup:
pip install elasticsearch
Read https://elasticsearch-py.readthedocs.io/en/master/
Read example_es.py
# stdlib | |
import json | |
import os | |
from functools import partial | |
# 3rd party | |
import certifi | |
import elasticsearch | |
# TODO: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-bool-query.html
# use bool queries too (perhaps instead of the `and` and `or` filters below)
def And(*clauses):
    """
    Build a filtered query that ANDs :clauses together.

    Accepts either individual clause dicts as varargs, or a single
    list/tuple of clause dicts, so ``And(m1, m2)`` and ``And([m1, m2])``
    are equivalent. (Callers in this file pass a list; previously that
    produced a doubly-nested ``"and": [[...]]`` body.)

    :param clauses: filter clause dicts (e.g. from Match/Exists)
    :return: a query body dict suitable for the ``body=`` search kwarg
    """
    # Unwrap the single-list calling convention used elsewhere in this file.
    if len(clauses) == 1 and isinstance(clauses[0], (list, tuple)):
        clauses = clauses[0]
    return {
        "query": {
            "filtered": {
                "filter": {
                    "and": list(clauses)
                }
            }
        }
    }
def Or(*clauses):
    """
    Build a filtered query that ORs :clauses together.

    Accepts either individual clause dicts as varargs, or a single
    list/tuple of clause dicts, so ``Or(m1, m2)`` and ``Or([m1, m2])``
    are equivalent. (Callers in this file pass a list; previously that
    produced a doubly-nested ``"or": [[...]]`` body.)

    :param clauses: filter clause dicts (e.g. from Match/Exists)
    :return: a query body dict suitable for the ``body=`` search kwarg
    """
    # Unwrap the single-list calling convention used elsewhere in this file.
    if len(clauses) == 1 and isinstance(clauses[0], (list, tuple)):
        clauses = clauses[0]
    return {
        "query": {
            "filtered": {
                "filter": {
                    "or": list(clauses)
                }
            }
        }
    }
def Exists(field):
    """Build an ``exists`` filter clause matching docs that have :field."""
    clause = {"field": field}
    return {"exists": clause}
def Match(field, value):
    """Build a ``match`` clause requiring :field to match :value."""
    inner = {field: value}
    return {"match": inner}
def Script(*statements):
    """
    Build an inline script body by joining :statements with '; '.
    """
    body = '; '.join(statements)
    return {"inline": body}
def debug_elasticsearch(log='elastic.log', trace='elastic.trace.log'):
    """
    Call this one time to turn on a trace to log files. This will log the
    HTTP calls, which you can then execute using curl.

    :param log: path for the INFO-level 'elasticsearch' client log
    :param trace: path for the DEBUG-level 'elasticsearch.trace' HTTP log
    """
    import logging
    import logging.handlers

    def _attach_rotating_handler(name, level, path):
        # Shared setup for both loggers: no propagation to the root logger,
        # rotating file capped at ~500 MB with 3 backups.
        logger = logging.getLogger(name)
        logger.propagate = False
        logger.setLevel(level)
        handler = logging.handlers.RotatingFileHandler(
            path,
            maxBytes=500 * 10**6,  # int, not 0.5*10**9: maxBytes expects an int
            backupCount=3
        )
        logger.addHandler(handler)

    _attach_rotating_handler('elasticsearch', logging.INFO, log)
    _attach_rotating_handler('elasticsearch.trace', logging.DEBUG, trace)
def get_elastic(url):
    """
    Return an Elasticsearch instance connected to :url over verified SSL.
    """
    # certifi supplies a current CA bundle so certificate verification
    # works independently of the system certificate store.
    return elasticsearch.Elasticsearch(
        hosts=[url],
        use_ssl=True,
        verify_certs=True,
        ca_certs=certifi.where()
    )
def get_operation(elastic, method, **kwargs):
    """
    Return the es :method of :elastic pre-bound to **kwargs.
    """
    bound_method = getattr(elastic, method)
    return partial(bound_method, **kwargs)
def get_document_op(elastic, index, doc_type):
    """
    Get an operation factory bound to an :index and :doc_type, so that
    just the :method remains to be specified.
    """
    return partial(get_operation, elastic, index=index, doc_type=doc_type)
def enumerate_call(search, call_id, link, call_side, show=None):
    """
    List the fields on a call document, showing any in :show.

    :param search: bound es search operation (see get_operation)
    :param call_id: call identifier component of the doc id
    :param link: link component of the doc id
    :param call_side: side component of the doc id (e.g. 'caller')
    :param show: optional list of _source field names whose values to print
    """
    doc_id = '_'.join([call_id, link, call_side])
    print('Fetching {}...'.format(doc_id))
    query = And([
        Match('call_id', call_id),
        Match('call_side', call_side),
        Match('link', link)
    ])
    result = search(
        body=query,
        size=2,
        sort=['created:desc', 'call_id']
    )
    documents = result['hits']['hits']
    for doc in documents:
        print(json.dumps(doc, indent=2, sort_keys=True))
        print(json.dumps(
            sorted(doc['_source'].keys()),
            indent=2,
            sort_keys=True
        ))
        # `show` defaults to None; previously this crashed with a TypeError
        # when the caller did not pass it.
        for field in (show or []):
            print('{}:\n'.format(field))
            print(json.dumps(
                doc['_source'].get(field),
                indent=2,
                sort_keys=True
            ))
def doc_by_id(search, doc_id):
    """
    Fetch and print the document whose doc_id field matches :doc_id.
    """
    result = search(body=And([Match('doc_id', doc_id)]), size=1)
    hits = result['hits']['hits']
    print(hits)
    for doc in hits:
        print('doc id: {}'.format(doc['_id']))
        field_names = sorted(doc['_source'].keys())
        print(json.dumps(field_names, indent=2, sort_keys=True))
def docs_with_fields(search, fields, size=1):
    """
    Get documents that have any field in :fields.
    """
    existence_clauses = [Exists(f) for f in fields]
    return search(
        body=Or(existence_clauses),
        size=size,
        sort=['created:desc', 'call_id']
    )
def count_docs_with_fields(search, fields):
    """
    Get a count of documents that have any field in :fields.
    """
    return docs_with_fields(search, fields)['hits']['total']
def list_docs_with_fields(search, fields, size=1):
    """
    Get the ids of up to :size documents that have any field in :fields.

    (The docstring previously said "a count" — copy-paste from
    count_docs_with_fields — but this returns a list of document ids.)
    """
    result = docs_with_fields(search, fields, size=size)
    return [d['_id'] for d in result['hits']['hits']]
def last_N_processed(search, fields, N=1):
    """
    Return ids of recent documents having any of :fields whose id contains
    'caller'. Fetches N * 2 candidates before filtering — presumably to
    leave room for non-caller sides in the results; confirm with callers.
    """
    candidates = list_docs_with_fields(search, fields, size=N * 2)
    return [doc_id for doc_id in candidates if 'caller' in doc_id]
def get_operations(elastic, doc_type, read_idx, write_idx, read_ops,
                   write_ops):
    """
    Binds indexes and doc_type to es operations.

    :param read_ops: method names bound against :read_idx
    :param write_ops: method names bound against :write_idx
    :return: list of bound operations, read ops first, in input order
    """
    bind_read = get_document_op(elastic, read_idx, doc_type)
    bind_write = get_document_op(elastic, write_idx, doc_type)
    reads = [bind_read(name) for name in read_ops]
    writes = [bind_write(name) for name in write_ops]
    return reads + writes
def get_ops(url, doc_type, read_index, write_index):
    """
    Return (search, index, update, update_by_query) operations bound to
    the given indexes and :doc_type on an Elasticsearch instance at :url.
    """
    # instance
    elastic = get_elastic(url)
    # bound operations: search reads; the rest write
    ops = get_operations(
        elastic,
        doc_type,
        read_index,
        write_index,
        ['search'],
        ['index', 'update', 'update_by_query']
    )
    return tuple(ops)
def transcript_context():
    """
    Return the (url, doc_type, read_index, write_index) context for the
    transcripts doc type; the url comes from ELASTICSEARCH_WRITE_URL.
    """
    return (
        os.environ["ELASTICSEARCH_WRITE_URL"],
        'transcripts',
        'echelon-read',
        'echelon-write',
    )
def main():
    """
    Entry point: fetch one known call document and print its fields.
    """
    # context — reuse transcript_context() instead of duplicating the
    # url/doc_type/index literals it already defines
    url, doc_type, read_index, write_index = transcript_context()
    # create the operations (these are functions)
    search, index, update, update_by_query = get_ops(
        url,
        doc_type,
        read_index,
        write_index
    )
    # uncomment this to turn on tracing/logging
    # debug_elasticsearch()
    call = '00ad34d8-d09e-1235-06b0-0cc47a392728_0_caller'
    call_id, link, call_side = call.split('_')
    enumerate_call(search, call_id, link, call_side, show=[
        'post_features',  # 'model_events' # 'post_events'
    ])
    # doc_ids = last_N_processed(search, ['post_features'], N=50)
    # for id in doc_ids:
    #     print(id)
    # doc_id = 'jdobson-test-12345'
    # doc_by_id(search, doc_id)


if __name__ == "__main__":
    main()