Skip to content

Instantly share code, notes, and snippets.

@ocuil
Created July 6, 2021 20:18
Show Gist options
  • Save ocuil/12cbe4e9e2b82d9b7609d6fd8f22f17e to your computer and use it in GitHub Desktop.
Save ocuil/12cbe4e9e2b82d9b7609d6fd8f22f17e to your computer and use it in GitHub Desktop.
python deduplicate docs stored in elasticsearch
# This code come from https://www.elastic.co/es/blog/how-to-find-and-remove-duplicate-documents-in-elasticsearch
# Just add the step to clear duplicate docs :-)
# Thanks @elastic !!! =)
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(
cloud_id="XXXXXXXXX:YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY",
http_auth=("USER", "PASSWORD"),
)
dict_of_duplicate_docs = {}
# La línea siguiente define los campos que
# se usarán para determinar si un documento es un duplicado.
keys_to_include_in_hash = ["key1", "key2",
"key3", "key4", "key5"]
# Procesar documentos que devuelve search/scroll actual
def populate_dict_of_duplicate_docs(hits):
for item in hits:
combined_key = ""
for mykey in keys_to_include_in_hash:
combined_key += str(item['_source'][mykey])
_id = item["_id"]
hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
# Si el hashval es nuevo, crearemos una clave nueva
# en dict_of_duplicate_docs, a la que se
# asignará un valor de una matriz vacía.
# Luego insertamos inmediatamente el _id en la matriz.
# Si el hashval ya existe, entonces
# solo insertaremos el _id nuevo en la matriz existente.
dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Realiza el bucle por todos los documentos del índice y completa
# la estructura de datos dict_of_duplicate_docs.
def scroll_over_all_docs():
data = es.search(index="INDEX", scroll='1m',
body={"query": {"match_all": {}}})
# Obtén la ID de scroll.
sid = data['_scroll_id']
scroll_size = len(data['hits']['hits'])
# Antes del scroll, procesa el batch actual de resultados.
populate_dict_of_duplicate_docs(data['hits']['hits'])
while scroll_size > 0:
data = es.scroll(scroll_id=sid, scroll='2m')
# Procesa el batch actual de resultados.
populate_dict_of_duplicate_docs(data['hits']['hits'])
# Actualiza la ID de scroll.
sid = data['_scroll_id']
# Obtén la cantidad de resultados del último scroll.
scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
count = 0
# Busca en el hash de valores de documentos para comprobar
# si se encontraron hashes duplicados.
for hashval, array_of_ids in dict_of_duplicate_docs.items():
if len(array_of_ids) > 1:
count += 1
print("********** Duplicate docs hash=%s **********" % hashval)
# Obtén los documentos mapeados al hashval actual.
matching_docs = es.mget(index="INDEX", doc_type="doc", body={
"ids": array_of_ids})
es.delete(index=matching_docs['docs'][1]['_index'],
id=matching_docs['docs'][1]['_id']
)
print(count)
def main():
scroll_over_all_docs()
loop_over_hashes_and_remove_duplicates()
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment