Last active
August 25, 2017 21:17
-
-
Save gustabot42/28dbfbaac1afd7f8b043805b941178b0 to your computer and use it in GitHub Desktop.
Script para comparar gdb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!python3 | |
# Using python3 anaconda
# conda install -c conda-forge gdal | |
# gustavodiazjaimes@gmail.com | |
# versión 0.1 | |
#usage: | |
# | |
#from gdb_cmp import * | |
# | |
#ORIGIN_PATH = "/home/kiwi/grupo2/AMB-Fase2/BACKUP FGDB/SIG_AMB15082017.gdb/" | |
#EDITED_PATH = "" | |
#ORIGIN_LAYER = "VisitaRESPEL" | |
#EDITED_LAYER = "VisitaRESPELHistorico" | |
#ID_ATTRNAME = "IDVISITARESPEL" | |
# | |
#print_compare(ORIGIN_PATH, EDITED_PATH, ORIGIN_LAYER, ID_ATTRNAME, EDITED_LAYER, rstrip=True) | |
from pathlib import Path | |
from contextlib import contextmanager | |
from collections import Counter | |
import pandas as pd | |
from osgeo import ogr | |
@contextmanager
def _get_dataset(gdb_path):
    """Open a File Geodatabase read-only and yield the OGR dataset.

    Args:
        gdb_path: Path of the ``.gdb`` directory. It is converted with
            ``str()`` before being handed to the driver.

    Yields:
        The dataset object returned by the ``OpenFileGDB`` driver.

    Raises:
        RuntimeError: If the ``OpenFileGDB`` driver is not available or the
            geodatabase cannot be opened.
    """
    driver = ogr.GetDriverByName("OpenFileGDB")
    if driver is None:
        raise RuntimeError("OGR driver 'OpenFileGDB' is not available")

    # False = no edition; the OpenFileGDB driver is read only anyway.
    dataset = driver.Open(str(gdb_path), False)
    if dataset is None:
        # Fail here with a clear message instead of yielding None and letting
        # callers crash later with an unrelated AttributeError/TypeError.
        raise RuntimeError("Could not open geodatabase: {!r}".format(str(gdb_path)))

    # The try block starts only after ``dataset`` is bound, so the cleanup
    # below can never hit an unbound name and mask the original error.
    try:
        yield dataset
    finally:
        # Drops this generator's reference to the dataset.
        # NOTE(review): the caller's ``as`` variable still references it, so
        # the handle is presumably released only once that goes away too.
        del dataset
def get_layers(gdb_path):
    """Return the names of all layers found in the geodatabase.

    Args:
        gdb_path: Path of the geodatabase to open.

    Returns:
        A list with ``GetName()`` of every layer, in dataset iteration order.
    """
    with _get_dataset(gdb_path) as gdb:
        names = [lyr.GetName() for lyr in gdb]
    return names
def get_layer_schema(gdb_path, layer_name):
    """Return the field schema of one layer.

    Args:
        gdb_path: Path of the geodatabase to open.
        layer_name: Name of the layer whose schema is read.

    Returns:
        A list of ``(field_name, field_type_name)`` tuples, one per entry of
        the layer's ``schema``.
    """
    pairs = []
    with _get_dataset(gdb_path) as gdb:
        layer = gdb.GetLayerByName(layer_name)
        for field_defn in layer.schema:
            pairs.append((field_defn.GetName(), field_defn.GetTypeName()))
    return pairs
def get_layer_ids(gdb_path, layer_name, id_attrname):
    """Collect the value of the id attribute for every feature of a layer.

    Args:
        gdb_path: Path of the geodatabase to open.
        layer_name: Name of the layer to scan.
        id_attrname: Name of the field read from each feature.

    Returns:
        A list with one id value per feature, in layer iteration order
        (duplicates are kept).
    """
    collected = []
    with _get_dataset(gdb_path) as gdb:
        layer = gdb.GetLayerByName(layer_name)
        for feature in layer:
            collected.append(feature.GetField(id_attrname))
    return collected
def find_repeated_ids(gdb_path, layer_name, id_attrname):
    """List the features whose id value occurs more than once in a layer.

    Args:
        gdb_path: Path of the geodatabase to open.
        layer_name: Name of the layer to scan.
        id_attrname: Name of the field holding the id.

    Returns:
        A list of ``(fid, id_value)`` tuples for every feature whose id is
        repeated, sorted by id value and then by FID.
    """
    occurrences = Counter(get_layer_ids(gdb_path, layer_name, id_attrname))
    duplicated = {value for value, count in occurrences.items() if count > 1}

    hits = []
    with _get_dataset(gdb_path) as gdb:
        layer = gdb.GetLayerByName(layer_name)
        for feature in layer:
            if feature.GetField(id_attrname) in duplicated:
                hits.append((feature.GetFID(), feature.GetField(id_attrname)))

    # Group equal ids together; the FID breaks ties.
    hits.sort(key=lambda pair: (pair[1], pair[0]))
    return hits
def print_elements(origin_path, edited_path, origin_layer, id_attrname, edited_layer="", ids_toprint=()):
    """Gather the shared-attribute values of selected features from both layers.

    Despite its name, this function prints nothing; it returns the rows.

    Args:
        origin_path: Path of the original geodatabase.
        edited_path: Path of the edited geodatabase; falsy means
            ``origin_path``.
        origin_layer: Layer name in the original geodatabase.
        id_attrname: Name of the field holding the id.
        edited_layer: Layer name in the edited geodatabase; falsy means
            ``origin_layer``.
        ids_toprint: Iterable of id values selecting the features to return.

    Returns:
        A list whose first item is the list of shared attribute names
        (sorted, so the column order is stable between runs), followed by one
        list of values per selected feature: origin layer first, then the
        edited layer.
    """
    if not edited_path:
        edited_path = origin_path
    if not edited_layer:
        edited_layer = origin_layer

    *_, shared_schema = cmp_layer_schema(origin_path, edited_path, origin_layer, edited_layer)
    # The schema comes back as a set; sort the names for a deterministic order.
    shared_attrs = sorted(name for name, _ in shared_schema)

    # Build the lookup once: constant-time membership per feature, and safe
    # even when the caller passes a one-shot iterable.
    wanted = frozenset(ids_toprint)

    elements = [shared_attrs]
    for gdb_path, layer_name in [(origin_path, origin_layer), (edited_path, edited_layer)]:
        with _get_dataset(gdb_path) as dataset:
            layer = dataset.GetLayerByName(layer_name)
            for feature in layer:
                if feature.GetField(id_attrname) in wanted:
                    elements.append([feature.GetField(attr) for attr in shared_attrs])
    return elements
def gdb_to_pandas(gdb_path, layer_name, id_attrname, attrs=None, ids=None):
    """Load the features of a layer into a pandas DataFrame.

    Args:
        gdb_path: Path of the geodatabase to open.
        layer_name: Name of the layer to read.
        id_attrname: Name of the field holding the id (used only to filter).
        attrs: Field names to load as columns; falsy means every field
            reported by ``get_layer_schema``.
        ids: Collection of id values to keep; falsy (``None`` or empty) means
            no filtering, i.e. every feature is loaded.

    Returns:
        A DataFrame with one row per loaded feature and ``attrs`` as columns.
        When no feature is loaded the frame is empty but still has the
        requested columns.
    """
    if not attrs:
        attrs = [name for name, _ in get_layer_schema(gdb_path, layer_name)]

    # Hash the filter once so the per-feature test is O(1).
    wanted = set(ids) if ids else None

    rows = []
    with _get_dataset(gdb_path) as dataset:
        layer = dataset.GetLayerByName(layer_name)
        for feature in layer:
            if wanted is not None and feature.GetField(id_attrname) not in wanted:
                continue
            rows.append([feature.GetField(attr) for attr in attrs])

    # Passing the columns to the constructor (rather than assigning them
    # afterwards) also works when ``rows`` is empty.
    return pd.DataFrame(rows, columns=list(attrs))
def cmp_layer_schema(origin_path, edited_path, origin_layer, edited_layer=""):
    """Compare the field schemas of two layers.

    Args:
        origin_path: Path of the original geodatabase.
        edited_path: Path of the edited geodatabase; falsy means
            ``origin_path``.
        origin_layer: Layer name in the original geodatabase.
        edited_layer: Layer name in the edited geodatabase; falsy means
            ``origin_layer``.

    Returns:
        A tuple ``(origin_only, edited_only, shared)`` of sets of
        ``(field_name, field_type_name)`` pairs.
    """
    # Fall back to the origin values when the edited ones are not given.
    edited_path = edited_path or origin_path
    edited_layer = edited_layer or origin_layer

    before = set(get_layer_schema(origin_path, origin_layer))
    after = set(get_layer_schema(edited_path, edited_layer))

    return before - after, after - before, before & after
def find_related_ids(origin_path, edited_path, origin_layer, id_attrname, edited_layer=""):
    """Classify the ids of two layers as origin-only, edited-only or shared.

    Args:
        origin_path: Path of the original geodatabase.
        edited_path: Path of the edited geodatabase; falsy means
            ``origin_path``.
        origin_layer: Layer name in the original geodatabase.
        id_attrname: Name of the field holding the id.
        edited_layer: Layer name in the edited geodatabase; falsy means
            ``origin_layer``.

    Returns:
        A tuple ``(origin_list, edited_list, shared_ids)``. The first two are
        lists of the ids present in only one layer, in layer iteration order
        with repeated ids kept; ``shared_ids`` is a set of the ids present in
        both layers.
    """
    if not edited_path:
        edited_path = origin_path
    if not edited_layer:
        edited_layer = origin_layer

    # Each layer is read exactly once; the ordered lists below are derived
    # from these sequences instead of re-opening and re-scanning the layers.
    origin_seq = get_layer_ids(origin_path, origin_layer, id_attrname)
    edited_seq = get_layer_ids(edited_path, edited_layer, id_attrname)

    origin_ids = set(origin_seq)
    edited_ids = set(edited_seq)
    origin_only = origin_ids - edited_ids
    edited_only = edited_ids - origin_ids
    shared_ids = origin_ids & edited_ids

    origin_list = [_id for _id in origin_seq if _id in origin_only]
    edited_list = [_id for _id in edited_seq if _id in edited_only]
    return origin_list, edited_list, shared_ids
def cmp_related_ids(origin_path, edited_path, origin_layer, id_attrname, edited_layer="", rstrip=False):
    """Find which shared attributes differ for ids present in both layers.

    Args:
        origin_path: Path of the original geodatabase.
        edited_path: Path of the edited geodatabase; falsy means
            ``origin_path``.
        origin_layer: Layer name in the original geodatabase.
        id_attrname: Name of the field holding the id; it must be one of the
            attributes shared by both layers.
        edited_layer: Layer name in the edited geodatabase; falsy means
            ``origin_layer``.
        rstrip: When true, trailing whitespace of string values is ignored.

    Returns:
        A list of ``(id, changed_attrs)`` tuples, one per pairing of an
        origin row with an edited row carrying the same id. ``changed_attrs``
        lists, in alphabetical order, the shared attribute names whose
        normalised values differ; it is empty when the rows match.
    """
    def _clean(value):
        # Normalise a field value so that "no value" variants compare equal.
        # Missing values may show up as None or as float NaN once stored in a
        # DataFrame; NaN != NaN would otherwise be reported as a change.
        if value is None or (isinstance(value, float) and value != value):
            return ""
        if rstrip and isinstance(value, str):
            value = value.rstrip()
        # The string "0" is treated the same as an empty value.
        if value == "0":
            return ""
        return value

    if not edited_path:
        edited_path = origin_path
    if not edited_layer:
        edited_layer = origin_layer

    *_, shared_schema = cmp_layer_schema(origin_path, edited_path, origin_layer, edited_layer)
    # The schema is a set; sort the names so the reported order is stable.
    shared_attrs = sorted(name for name, _ in shared_schema)
    *_, shared_ids = find_related_ids(origin_path, edited_path, origin_layer, id_attrname, edited_layer)

    origin_df = gdb_to_pandas(origin_path, origin_layer, id_attrname, shared_attrs, shared_ids)
    edited_df = gdb_to_pandas(edited_path, edited_layer, id_attrname, shared_attrs, shared_ids)

    compare = []
    for _, o_row in origin_df.iterrows():
        _id = o_row[id_attrname]
        matches = edited_df[edited_df[id_attrname] == _id]
        for _, e_row in matches.iterrows():
            # Access by column label: integer keys on a label-indexed Series
            # rely on a deprecated positional fallback.
            attr_diff = [attr for attr in shared_attrs
                         if _clean(o_row[attr]) != _clean(e_row[attr])]
            compare.append((_id, attr_diff))
    return compare
def print_compare(origin_path, edited_path, origin_layer, id_attrname, edited_layer="", rstrip=False):
    """Print a text report comparing two geodatabase layers.

    The report has one section per layer (attributes found only in that
    layer, all id values, repeated ids), a section with the shared
    attributes, and a per-id listing: ``<`` for ids found only in the origin
    layer, ``>`` for ids found only in the edited layer, ``=`` for shared ids
    without differences and ``+`` for shared ids with the differing attribute
    names.

    Args:
        origin_path: Path of the original geodatabase.
        edited_path: Path of the edited geodatabase; falsy means
            ``origin_path``.
        origin_layer: Layer name in the original geodatabase.
        id_attrname: Name of the field holding the id.
        edited_layer: Layer name in the edited geodatabase; falsy means
            ``origin_layer``.
        rstrip: Forwarded to ``cmp_related_ids``.
    """
    def _names(schema):
        # Keep only the field names of (name, type) pairs.
        return [field_name for field_name, _ in schema]

    def _report_layer(attr_names, id_values, duplicates):
        # Emit the per-layer section of the report.
        section = [
            "# Layer",
            "## Atributos únicos = {}".format(len(attr_names)),
            ", ".join(attr_names),
            "## Elementos en total = {}".format(len(id_values)),
            ",".join(map(str, id_values)),
            "## Elementos repetidos = {}".format(len(duplicates)),
            "objectid, {}".format(id_attrname),
        ]
        for text in section:
            print(text)
        for entry in duplicates:
            print("{}, {}".format(*entry))
        print("")

    # Fall back to the origin values when the edited ones are not given.
    edited_path = edited_path or origin_path
    edited_layer = edited_layer or origin_layer

    # Gather everything first so nothing is printed if a read fails.
    only_origin, only_edited, common = cmp_layer_schema(
        origin_path, edited_path, origin_layer, edited_layer)
    origin_attrs = _names(only_origin)
    edited_attrs = _names(only_edited)
    shared_attrs = _names(common)

    origin_ids = get_layer_ids(origin_path, origin_layer, id_attrname)
    edited_ids = get_layer_ids(edited_path, edited_layer, id_attrname)

    origin_repeated = find_repeated_ids(origin_path, origin_layer, id_attrname)
    edited_repeated = find_repeated_ids(edited_path, edited_layer, id_attrname)

    origin_unqids, edited_unqids, _ = find_related_ids(
        origin_path, edited_path, origin_layer, id_attrname, edited_layer)
    compared = cmp_related_ids(
        origin_path, edited_path, origin_layer, id_attrname, edited_layer, rstrip)

    _report_layer(origin_attrs, origin_ids, origin_repeated)
    _report_layer(edited_attrs, edited_ids, edited_repeated)

    print("# Compartidos")
    print("## Atributos = {}".format(len(shared_attrs)))
    print(", ".join(shared_attrs))
    print("")

    print("Comparación de IDs")
    print("{}, Tipo, Attr modificados".format(id_attrname))
    for only_id in origin_unqids:
        print("{}, <".format(only_id))
    for only_id in edited_unqids:
        print("{}, >".format(only_id))
    for shared_id, changed in compared:
        marker = "+" if changed else "="
        print("{}, {}, {}".format(shared_id, marker, "|".join(changed)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment