Skip to content

Instantly share code, notes, and snippets.

@djs-basil-sys
Last active September 12, 2022 19:51
Show Gist options
  • Save djs-basil-sys/6a0cd30f6d6758be4473aadb126959ed to your computer and use it in GitHub Desktop.
import pandas as pd
from datetime import datetime
from rapidfuzz.process import extractOne
from string_grouper import match_strings, group_similar_strings
from sys import argv
from time import monotonic
from uuid import uuid4
class Constants:
    """Shared run-wide configuration values and ID/timestamp helpers."""

    # Short pseudo-random run identifier: the second dash-delimited group
    # of a UUID4 (4 hex characters), captured once at import time.
    UUID: str = str(uuid4()).split("-")[1]
    # Compact UTC timestamp captured once at import time.
    TIMESTAMP: str = datetime.utcnow().strftime("%Y%m%dT%H%M%S")
    MINIMUM_TOKEN_LENGTH: int = 2
    # Field separator used by the pipe-delimited input/output files.
    DEFAULT_SEPR: str = "|"
    # Default minimum similarity for the fuzzy matchers.
    THRESHOLD: float = 0.75

    @staticmethod
    def return_uuid() -> str:
        """Return a fresh 4-hex-character pseudo-random identifier."""
        fresh = str(uuid4())
        return fresh.split("-")[1]

    @staticmethod
    def return_ts() -> str:
        """Return the current UTC time, e.g. '2022-09-12T19.51.00'."""
        now = datetime.utcnow()
        return now.strftime("%Y-%m-%dT%H.%M.%S")
class Matchers:
    """Static fuzzy-matching helpers used to build company "families"."""

    @staticmethod
    def levenshtein_matcher(
        names: list[str],
        df: pd.DataFrame,
        find_in: str,
        match_against: str,
        threshold: float = Constants.THRESHOLD,
    ) -> pd.DataFrame:
        """
        Pick exactly one best candidate per name via `rapidfuzz.extractOne`.

        `names`: Values to look up.
        `find_in`: Column within which `names` may be found.
        `match_against`: Column against which `names` are compared.
        `threshold`: Intentionally unused -- kept for signature symmetry
            with the other matchers; no candidate is filtered out.

        Returns a DataFrame with columns [find_in, match_against, "similarity"].
        """
        matched = []
        for name in names:
            against = df[df[find_in] == name][match_against].unique().tolist()
            # Don't want to "filter out" matches, so no threshold. However,
            # still need to pick one (and only one) match.
            matches = extractOne(name, against)
            if not matches:
                # No candidates at all: record an explicit non-match.
                matches = (None, 0.0)
            matched.append([name, matches[0], matches[1]])
        return pd.DataFrame(matched, columns=[find_in, match_against, "similarity"])

    @staticmethod
    def cosine_matcher(
        df_1: pd.DataFrame,
        col_1: str,
        df_2: pd.DataFrame,
        col_2: str,
        threshold: float = Constants.THRESHOLD,
    ) -> pd.DataFrame:
        """
        Use the `string_grouper.match_strings` method (which uses cosine
        similarity) to match a "best fit" for each name.

        Returns one row per unique `df_1[col_1]` value -- the highest
        similarity candidate wins -- with columns [col_1, "similarity", col_2].
        """
        df = match_strings(
            pd.Series(df_1[col_1].unique().tolist()).astype(str),
            pd.Series(df_2[col_2].unique().tolist()).astype(str),
            ignore_index=True,
            min_similarity=threshold,
        )
        # Keep only the best-scoring match for each left-hand value.
        df = df.sort_values(by=["left_side", "similarity"], ascending=[True, False])
        df = df.drop_duplicates(subset="left_side")
        df.columns = [col_1, "similarity", col_2]
        return df

    @staticmethod
    def deduplicate(
        df: pd.DataFrame, col: str, threshold: float = Constants.THRESHOLD
    ) -> pd.DataFrame:
        """
        Group near-identical values of `col` and store each row's group
        representative in a new `deduplicated_<col>` column.

        NOTE: mutates `df` in place (adds the new column) and returns it.
        """
        df[f"deduplicated_{col}"] = group_similar_strings(
            df[col], ignore_index=True, min_similarity=threshold
        )
        return df

    @staticmethod
    def get_best_match(
        df_1: pd.DataFrame,
        df_1_col: str,
        df_2: pd.DataFrame,
        df_2_cols: list[str],  # [join_column, target_column]
        threshold: float = Constants.THRESHOLD,
    ) -> pd.DataFrame:
        """
        Merge `df_2[df_2_cols]` onto `df_1` via `df_2_cols[0]`, then resolve
        the duplicate rows the (many-to-many) merge produces by keeping, for
        each `df_1_col` value, the Levenshtein-best `df_2_cols[1]` candidate.

        Returns a de-duplicated DataFrame with columns
        [df_1_col, df_2_cols[0], df_2_cols[1]].
        """
        start = monotonic()
        print(f"Starting process to get best matching {df_1_col} from {df_2_cols[0]}.")
        print(f"Original DataFrame has {df_1.shape[0]} records.")
        df_1 = df_1.merge(
            df_2[df_2_cols].drop_duplicates(),
            how="left",
            on=df_2_cols[0],
            suffixes=["", "_"],
        )
        print(
            f"DataFrame now has {df_1.shape[0]} following merge to secondary DataFrame."
        )
        counts = (
            df_1[df_1_col].value_counts().reset_index()
        )  # Get duplicates; given many-to-many nature of different levels, need to get best matches.
        counts.columns = [df_1_col, "count"]
        counts = counts[counts["count"] > 1]
        print(f"There were {counts.shape[0]} duplicates post-merge.")
        # Names that merged to a single row need no disambiguation.
        df_1_singl = df_1[~df_1[df_1_col].isin(counts[df_1_col])]
        df_1_singl = df_1_singl[
            [df_1_col, df_2_cols[0], df_2_cols[1]]
        ].drop_duplicates()
        print(
            f"There are {df_1_singl.shape[0]} unique values that do not require further matching."
        )
        df_1_multi = df_1[df_1[df_1_col].isin(counts[df_1_col])]
        print(
            f"There are now {df_1_multi.shape[0]} names following merge of trade names to establishment names."
        )
        print(f"Getting Levenshtein distance between {df_1_col} and {df_2_cols[1]}.")
        best_match = Matchers.levenshtein_matcher(
            df_1_multi[df_1_col].unique().tolist(),
            df_1_multi,
            df_1_col,
            df_2_cols[1],
            threshold,
        )
        df_1_multi = df_1_multi.merge(
            best_match, how="left", on=df_1_col, suffixes=["", "_"]
        )
        # The merge can yield both `<target>` and `<target>_` columns; the
        # longest-named (suffixed) one carries the best-match values --
        # prefer it, falling back to the shorter-named original.
        score_hierarchy = [
            col for col in df_1_multi.columns if col.startswith(df_2_cols[1])
        ]
        score_hierarchy = sorted(score_hierarchy, key=len, reverse=True)
        df_1_multi[df_2_cols[1]] = df_1_multi[score_hierarchy[0]].fillna(
            df_1_multi[score_hierarchy[1]]
        )
        df_1_multi = df_1_multi[
            [df_1_col, df_2_cols[0], df_2_cols[1]]
        ].drop_duplicates()
        df = pd.concat([df_1_singl, df_1_multi])
        print(
            f"Refining results complete; final DataFrame has {df.shape[0]} records. Total process took: {monotonic() - start}"
        )
        return df
class DfReader:
    """
    Pipe-delimited file I/O helpers.

    ERDL File:
    registration_id|level_1|level_2|level_3
    7542|DENTSPLY SIRONA INC|DEGUDENT GMBH|DENTSPLY SIRONA INC
    69194||BIO PLAS INC|BIO PLAS INC
    6805|LUV N CARE LTD|LUV N CARE LTD|LUV N CARE LTD

    Source File
    id|name
    1|VELA DIAGNOSTICS USA INC
    2|OHMEDA MEDICAL
    3|UROSURGE INC
    """

    @staticmethod
    def load_df(filepath: str) -> pd.DataFrame:
        """Load a pipe-delimited file as strings, de-duplicated and NULL-filled."""
        started = monotonic()
        print(f"Starting load of `{filepath}`: {Constants.return_ts()}")
        frame = (
            pd.read_csv(filepath, dtype=str, sep=Constants.DEFAULT_SEPR)
            .drop_duplicates()
            .fillna("NULL")
        )
        if "id" in frame.columns:
            # Numeric sort on the id column when one is present.
            frame["id"] = frame["id"].astype(int)
            frame = frame.sort_values(by="id")
        print(f"`{filepath}` loading took: {monotonic() - started}")
        return frame

    @staticmethod
    def write_df(df: pd.DataFrame, filepath):
        """Write `df` pipe-delimited to `filepath`, with NaNs written as NULL."""
        filled = df.fillna("NULL")
        filled.to_csv(filepath, sep="|", index=False)
        print(f"DataFrame saved to:\n{filepath}")
def main(source_filepath: str, erdl_filepath: str, col: str, threshold: float):
    """
    Build company "families": match names in `source_filepath[col]` against
    the ERDL hierarchy (level_1 trade names -> level_2 establishments ->
    level_3 owner operators), then de-duplicate the resulting family names.

    `source_filepath`: pipe-delimited source file containing column `col`.
    `erdl_filepath`: pipe-delimited ERDL file with level_1/level_2/level_3.
    `col`: name column of the source file to match on.
    `threshold`: minimum cosine similarity; coerced with float() so the CLI
        can pass a string.

    Side effects: writes `<source_filepath>.<timestamp>.processed` and
    prints progress/timing information throughout.
    """
    # Track the whole run separately; `start` below is reset per phase.
    overall_start = monotonic()
    threshold = float(threshold)
    print(
        f"Starting the creation of families against Establishment Registrations. Started: {Constants.return_ts()}"
    )
    df = DfReader.load_df(source_filepath)
    print(f"Initial DataFrame has {df.shape[0]} records.")
    # Normalize names into a temp column (random name avoids collisions):
    # strip punctuation, collapse runs of whitespace.
    temp_col = Constants.return_uuid()
    df[temp_col] = df[col].str.replace(r"[^A-Za-z0-9\s]", " ", regex=True)
    df[temp_col] = df[temp_col].apply(lambda x: " ".join(x.split()).strip())
    erdl = DfReader.load_df(erdl_filepath)
    print(f"Establishment registrations file has {erdl.shape[0]} records.")
    start = monotonic()
    print("Starting initial match to business trade names and establishment names.")
    matches_trade = Matchers.cosine_matcher(df, temp_col, erdl, "level_1", threshold)
    print(f"Made {matches_trade.shape[0]} matches by trade name.")
    matches_estab = Matchers.cosine_matcher(df, temp_col, erdl, "level_2", threshold)
    matches_estab = matches_estab[[temp_col, "level_2"]].drop_duplicates()
    print(f"Made {matches_estab.shape[0]} matches by establishment name.")
    # Establishment-name matches take precedence over trade-name matches.
    matches_trade = matches_trade[
        ~matches_trade[temp_col].isin(matches_estab[temp_col])
    ]
    print(
        f"Filtered duplicate matches from trade names using establishment names: {matches_trade.shape[0]}"
    )
    matches_trade = Matchers.get_best_match(
        matches_trade, temp_col, erdl, ["level_1", "level_2"], threshold
    )
    sub_family_names = pd.concat([matches_estab, matches_trade])
    print(
        f"Match to business trade names and establishment names done. Took: {monotonic() - start}"
    )
    start = monotonic()
    print("Attempting to match establishment names to operators.")
    sub_family_names = Matchers.get_best_match(
        sub_family_names, temp_col, erdl, ["level_2", "level_3"], threshold
    )
    print(f"Matched {sub_family_names.shape[0]} records to an establishment.")
    print(
        f"Matching establishments to owner operators complete. Took: {monotonic() - start}"
    )
    start = monotonic()
    print("Starting matching of companies without matched sub families.")
    # Essentially same as merging sub_family_names into df, then getting
    # values where level_2 or level_3 are null.
    orphans = df[~df[temp_col].isin(sub_family_names[temp_col])]
    print(f"There are {orphans.shape[0]} companies without a matching establishment.")
    matches_operator = Matchers.cosine_matcher(
        orphans, temp_col, erdl, "level_3", threshold
    )
    matches_operator = matches_operator[[temp_col, "level_3"]].drop_duplicates()
    matches_operator["level_2"] = pd.NA
    matches_operator = matches_operator[
        sub_family_names.columns
    ]  # Make sure order of columns is the same.
    print(f"Matched {matches_operator.shape[0]} records to an owner operator.")
    matches = pd.concat([matches_operator, sub_family_names])
    print(
        f"Matching companies to owner operators complete. Took: {monotonic() - start}"
    )
    start = monotonic()
    print("Filling out families and removing duplicates.")
    df = df.merge(matches, how="left", on=temp_col)
    # Rows with no match fall back to their own original name as the family.
    df["pre_family"] = df["level_3"].fillna(df[col])
    df = Matchers.deduplicate(
        df, "pre_family", 0.85
    )  # Use a 10% higher threshold for de-duplicating.
    df = df.drop(columns=[temp_col])
    starting_size = len(df[col].unique())
    ending_size = len(df["deduplicated_pre_family"].unique())
    reduction_rate = (ending_size - starting_size) / starting_size
    print(
        f"Families condensed from {starting_size} to {ending_size} for a reduction rate of {abs(reduction_rate) * 100:.3f}%."
    )
    print(f"De-duplication complete. Took: {monotonic() - start}")
    DfReader.write_df(df, f"{source_filepath}.{Constants.return_ts()}.processed")
    # Use overall_start here: `start` only covers the last phase.
    print(
        f"Completed creation of families. Ended: {Constants.return_ts()}. Took: {monotonic() - overall_start}"
    )
if __name__ == "__main__":
    # CLI usage: script.py <source_filepath> <erdl_filepath> <name_column> <threshold>
    main(argv[1], argv[2], argv[3], argv[4])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment