Pandas + OpenTopoData.org
"""MIT Licence, 2023, Michael Feil """ | |
import pandas as pd | |
import os | |
import requests | |
import math | |
import numpy as np | |
from typing import Callable | |
import time | |
CACHE_DIR = str(os.path.join(os.path.dirname(__file__), ".geocache")) | |
KEY_LAT = "in_latitude" | |
KEY_LNG = "in_longitude" | |
KEY_ELE = "in_elevation" | |
def _get_elevation_opentopodata_simple(
    df: pd.DataFrame,
    url: str = "https://api.opentopodata.org/v1/etopo1?",
    additional_api_params="",
    sleep_between_request=1.0,
):
    """Queries opentopodata without prior validation or caching.

    params:
        df: pd.DataFrame with KEY_LAT="in_latitude", KEY_LNG="in_longitude"
        url: https://api.opentopodata.org/v1/{dataset}, see docs: https://www.opentopodata.org/api/
        additional_api_params: e.g. "&interpolation=cubic", see docs: https://www.opentopodata.org/api/
    returns:
        df: pd.DataFrame with
            KEY_LAT="in_latitude", KEY_LNG="in_longitude", KEY_ELE="in_elevation"
    """
    # build the REST query string: join coordinates into "lat1,lng1|lat2,lng2"
    location = "|".join(
        [f"{row[KEY_LAT]:.9f},{row[KEY_LNG]:.9f}" for _, row in df.iterrows()]
    )
    url_spec = url + "locations=" + location + additional_api_params
    # send the query and parse the response
    result = requests.get(url_spec)
    time.sleep(sleep_between_request)  # rate limit of the free api
    if result.status_code == 200:
        results = pd.json_normalize(result.json()["results"])
        s = df.copy()
        s[KEY_ELE] = results["elevation"].values
        return s
    else:
        print(result, result.json())
        return None
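
# A minimal sketch of calling the helper above directly, without caching or
# validation. The input frame must already use the KEY_LAT/KEY_LNG column
# names; the returned copy gains a KEY_ELE column. The Denver coordinates are
# the ones used in the test at the bottom of this file.
#
#     points = pd.DataFrame(
#         [[39.742043, -104.991531]], columns=[KEY_LAT, KEY_LNG]
#     )
#     with_ele = _get_elevation_opentopodata_simple(points)
#     print(with_ele[KEY_ELE])  # approx 1610m for Denver
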
def get_elevation_with_cache(
    lng_lat: pd.DataFrame,
    key_lat: str = "in_latitude",
    key_lng: str = "in_longitude",
    query_batch_size: int = 99,
    cache_path: str = os.path.join(CACHE_DIR, "elevation_etopo1.pickle"),
    api_function: Callable = _get_elevation_opentopodata_simple,
    max_cache_size: int = 10_000_000,
):
"""Query service using lat, lon. add the elevation values as a new column. | |
uses caching to cache_path, data validation and rounds to 9 decimal per | |
side effects: | |
- rounds location to 9 decimals after comma to improve caching with near duplicates | |
- attemps to query the API maximum of 4 times, with exponential backoff | |
Args: | |
lng_lat: pd.DataFrame with [key_lat, key_lng] columns, dtype float | |
key_lat: str, column name | |
key_lng: str, column name | |
query_batch_size: int | |
cache_path: str, path to cache, if "" then no cache | |
max_cache_size: int, max number of elevations to cache | |
default: 10_000_000 | |
Returns: | |
pd.Series elevation, Float64 | |
Example: | |
df = pd.DataFrame( | |
[ | |
[39.742043, -104.991531, "Denver, approx 1.610m"], | |
[49.37726, 10.19159, "Rothenburg ob der Tauber, approx 420m"], | |
], | |
columns=["lat", "LONG", "name"], | |
) | |
df["ele"] = get_elevation_with_cache( | |
df, key_lat="lat", key_lng="LONG" | |
) | |
Raises: | |
ValueError: For incorrect input data | |
""" | |
    lng_lat = (
        lng_lat[[key_lat] + [key_lng]]
        .copy()
        .rename(columns={key_lat: KEY_LAT, key_lng: KEY_LNG})
    )
    if cache_path:
        os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    # validate input
    if not all("float" in str(d).lower() for d in lng_lat.dtypes):
        raise ValueError(f"lng_lat.dtypes={lng_lat.dtypes} should be float")
    if lng_lat.isna().any().any():
        raise ValueError(
            f"lng_lat contains invalid NAN data: \n {lng_lat[lng_lat.isna().any(axis=1)]}."
        )
    # copy, convert to Float64, mark out-of-range coordinates as NA, then check
    _lon_lat = lng_lat.copy()
    lng_lat = lng_lat.astype("Float64").round(9)
    lng_lat[(lng_lat[KEY_LNG] > 180.0) | (lng_lat[KEY_LNG] <= -180.0)] = pd.NA
    lng_lat[(lng_lat[KEY_LAT] > 90.0) | (lng_lat[KEY_LAT] < -90.0)] = pd.NA
    if lng_lat.isna().any().any():
        raise ValueError(
            f"lng / lat contains invalid data: \n {_lon_lat[lng_lat.isna().any(axis=1)]}. \n"
            "After conversion, only WGS84 lng/lat e.g. "
            "pd.DataFrame([[51.958108, 7.300415]]) as float is valid."
        )
    lon_lat_in = lng_lat.copy()
    # lon_lat_in is treated as immutable from here on
    _cache_df = None
    # initialize the elevation column with NA's (multiplying by pd.NA keeps the Float64 dtype)
    lng_lat[KEY_ELE] = lng_lat[KEY_LAT] * pd.NA
    # merge cached elevations, if a cache exists
    if cache_path and os.path.exists(cache_path):
        _cache_df = pd.read_pickle(cache_path).drop_duplicates()
        lng_lat = pd.merge(
            lng_lat,
            _cache_df.rename(columns={key_lat: KEY_LAT, key_lng: KEY_LNG}),
            on=[KEY_LAT] + [KEY_LNG],
            how="left",
            suffixes=("", "_r"),
        )
        lng_lat[KEY_ELE] = lng_lat[KEY_ELE].combine_first(lng_lat[KEY_ELE + "_r"])
        lng_lat = lng_lat[[KEY_LAT] + [KEY_LNG] + [KEY_ELE]]
    else:
        lng_lat[KEY_ELE] = lng_lat[KEY_LAT] * pd.NA
    unique_pos = lng_lat.drop_duplicates(subset=[KEY_LAT] + [KEY_LNG])
    uniq_pos_new = []
    # positions that are not in the cache and have valid coordinates
    update_set = unique_pos[unique_pos[KEY_ELE].isna() & ~unique_pos[KEY_LNG].isna()]
    if update_set.size:
        print("querying server for elevation data")
        for subset in np.array_split(
            update_set, math.ceil(update_set.shape[0] / query_batch_size)
        ):
            backoff = 0
            for _ in range(4):
                time.sleep(backoff)
                newpos = api_function(subset)
                if newpos is not None:
                    uniq_pos_new.append(newpos)
                    break
                else:
                    # wait for 0, 2, 6, 38s (backoff -> 2 + backoff**2)
                    backoff = 2 + backoff**2
        # raises ValueError if every batch failed after all retries
        uniq_pos_new = pd.concat(uniq_pos_new, axis=0)
        # merge the new elevations on top of the existing positions
        uniq_pos_new = pd.merge(
            unique_pos,
            uniq_pos_new,
            on=[KEY_LAT] + [KEY_LNG],
            how="left",
            suffixes=("", "_r"),
        )
        uniq_pos_new[KEY_ELE] = uniq_pos_new[KEY_ELE + "_r"].combine_first(
            uniq_pos_new[KEY_ELE]
        )
        uniq_pos_new = uniq_pos_new[[KEY_LAT] + [KEY_LNG] + [KEY_ELE]]
    else:
        # all data was found in the cache
        uniq_pos_new = unique_pos
        print("not querying server for elevation data, only cache")
    merged_all = pd.merge(
        left=lon_lat_in,
        right=uniq_pos_new,
        on=[KEY_LAT] + [KEY_LNG],
        how="left",
        suffixes=("_x", ""),
    )[[KEY_LAT] + [KEY_LNG] + [KEY_ELE]]
    # restore the input order, not the order produced by batching / joins
    merged_all = merged_all.reindex(lon_lat_in.index)
    # done, merge with the existing cache and write it back
    if _cache_df is not None:
        all_data = pd.merge(
            merged_all,
            _cache_df.rename(columns={key_lat: KEY_LAT, key_lng: KEY_LNG}),
            on=[KEY_LAT] + [KEY_LNG],
            how="outer",
            suffixes=("", "_r"),
        )[[KEY_LAT] + [KEY_LNG] + [KEY_ELE] + [KEY_ELE + "_r"]]
        all_data[KEY_ELE] = all_data[KEY_ELE + "_r"].combine_first(all_data[KEY_ELE])
        all_data.drop([KEY_ELE + "_r"], axis=1, inplace=True)
        all_data.drop_duplicates().iloc[:max_cache_size].to_pickle(cache_path)
    elif cache_path:
        merged_all.drop_duplicates().iloc[:max_cache_size].to_pickle(cache_path)
    # return the elevation column
    return merged_all[KEY_ELE].astype("Float64")
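
# A hedged sketch of pointing the cached query at a different opentopodata
# dataset: functools.partial pre-binds the url and extra API parameters of the
# simple helper, and the result is passed in as api_function. The "aster30m"
# dataset name and the "&interpolation=cubic" parameter follow the
# opentopodata docs linked above; the cache file name is an illustrative
# choice (each dataset should get its own cache file).
#
#     from functools import partial
#
#     aster_fn = partial(
#         _get_elevation_opentopodata_simple,
#         url="https://api.opentopodata.org/v1/aster30m?",
#         additional_api_params="&interpolation=cubic",
#     )
#     df["ele"] = get_elevation_with_cache(
#         df,
#         key_lat="lat",
#         key_lng="LONG",
#         api_function=aster_fn,
#         cache_path=os.path.join(CACHE_DIR, "elevation_aster30m.pickle"),
#     )
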
def test_get_elevation():
    import tempfile
    import pytest

    df_base = pd.DataFrame(
        [
            [39.742043, -104.991531, "Denver, approx 1.610m"],
            [49.37726, 10.19159, "Rothenburg ob der Tauber, approx 420m"],
        ],
        columns=["lat", "LONG", "name"],
    )
    df_muc = pd.DataFrame(
        [
            [48.1351253, 11.5819806, "Munich, approx 520m"],
        ],
        columns=["lat", "LONG", "name"],
    )
    df = df_base.copy()
    df2 = df.copy()
    df3 = df.copy()
    fn = _get_elevation_opentopodata_simple
    with tempfile.NamedTemporaryFile(suffix=".pkl") as tempf:
        pd.DataFrame([], columns=[KEY_LAT] + [KEY_LNG] + [KEY_ELE]).to_pickle(
            tempf.name
        )
        df["ele"] = get_elevation_with_cache(
            df,
            key_lat="lat",
            key_lng="LONG",
            api_function=fn,
            cache_path=tempf.name,
            query_batch_size=10,
        )
        _cache_df = pd.read_pickle(tempf.name)
        # test that all data was written to the cache
        assert (df["ele"] == _cache_df[KEY_ELE]).all()
        # second call should read from the cache only
        df2["ele"] = get_elevation_with_cache(
            df2, key_lat="lat", key_lng="LONG", api_function=fn, cache_path=tempf.name
        )
        pd.testing.assert_frame_equal(df, df2)
        # test that the Munich data is additionally cached
        df_muc["ele"] = get_elevation_with_cache(
            df_muc,
            key_lat="lat",
            key_lng="LONG",
            api_function=fn,
            cache_path=tempf.name,
            query_batch_size=10,
        )
        _cache_df = pd.read_pickle(tempf.name)
        assert (
            pd.concat([df["ele"], df_muc["ele"]]).sort_values().values
            == _cache_df[KEY_ELE].sort_values().values
        ).all()
        # third call, without using the cache, with 2 api calls
        df3["ele"] = get_elevation_with_cache(
            df3,
            key_lat="lat",
            key_lng="LONG",
            api_function=fn,
            cache_path="",
            query_batch_size=1,
        )
        pd.testing.assert_frame_equal(df, df3)
        # fourth call, directly, without any caching or validation
        ele = fn(
            df[["LONG"] + ["lat"]].copy().rename(columns={"lat": KEY_LAT, "LONG": KEY_LNG})
        )
        with pytest.raises(ValueError):
            df_invalid = df_base.copy()
            df_invalid.loc[0, "LONG"] = -191.0
            get_elevation_with_cache(
                df_invalid,
                key_lat="lat",
                key_lng="LONG",
                api_function=fn,
            )
        with pytest.raises(ValueError):
            df_invalid = df_base.copy()
            df_invalid.loc[0, "LONG"] = np.nan
            get_elevation_with_cache(
                df_invalid,
                key_lat="lat",
                key_lng="LONG",
                api_function=fn,
            )
        assert (abs(df["ele"].values - ele[KEY_ELE].values) <= 1).all()
        print(df)


if __name__ == "__main__":
    test_get_elevation()