Examples of using Python 3.8+ shared_memory for working with pandas DataFrames and geopandas GeoDataFrames.
import pickle
import typing
from multiprocessing.shared_memory import SharedMemory, ShareableList

import numpy as np
import pandas as pd
import geopandas as gpd


def create_shared_memory_of_size(shared_memory_name: str, minimum_size_bytes: int) -> SharedMemory:
    """Create a named shared memory block, or attach to an existing one if it is large enough."""
    try:
        buffer = SharedMemory(name=shared_memory_name, size=minimum_size_bytes, create=True)
    except FileExistsError:
        # A block with this name already exists; attach to it (size is ignored when attaching).
        buffer = SharedMemory(name=shared_memory_name, size=minimum_size_bytes)
        if buffer.size < minimum_size_bytes:
            # The existing block is too small, so replace it with a larger one.
            buffer.close()
            buffer.unlink()
            buffer = SharedMemory(name=shared_memory_name, size=minimum_size_bytes, create=True)
    return buffer
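

# A minimal sketch (illustrative, not part of the original gist) of the attach-or-create
# behavior above: the first call creates the block, the second attaches to the same bytes.
# The block name '_demo_block' is arbitrary.
def _demo_attach_or_create():
    writer = create_shared_memory_of_size('_demo_block', minimum_size_bytes=4)
    writer.buf[:4] = b'abcd'
    reader = create_shared_memory_of_size('_demo_block', minimum_size_bytes=4)
    assert bytes(reader.buf[:4]) == b'abcd'
    reader.close()
    writer.close()
    writer.unlink()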


def share_df(df: pd.DataFrame, df_id: str, extra_meta: dict = None) -> str:
    """Copy a DataFrame into named shared memory blocks so other processes can attach to it by df_id."""
    df_columns = sorted(df.columns)
    indexed_df = df.reset_index()
    # The reset index becomes an extra column; put it first so readers can restore it.
    index_name = list(set(indexed_df.columns).difference(df.columns))[0]
    df_columns.insert(0, index_name)
    length = len(df)
    column_memory_buffer_id = f'{df_id}_columns'
    meta = dict(column_memory_buffer_id=column_memory_buffer_id, columns=len(df_columns), length=length)
    if isinstance(extra_meta, dict):
        meta.update(extra_meta)
    column_data = []
    buffer_boundaries = {}
    buffer_size = 0
    string_mappings = {}
    for column in df_columns:
        if indexed_df[column].dtype == object:
            # Encode string columns as float indices into a sorted vocabulary; None/NaN map
            # to the sentinel None entry appended at the end of the vocabulary.
            strings = sorted(s for s in set(indexed_df[column].unique()) if isinstance(s, str))
            strings.append(None)
            string_to_index = {k: i for i, k in enumerate(strings)}
            encoded_values = indexed_df[column].apply(lambda _x: string_to_index.get(_x, -1))
            missing_values = indexed_df.loc[encoded_values == -1, column]
            if missing_values.notna().any():
                print(f'Missing values: {missing_values[missing_values.notna()]}!')
            indexed_df[column] = encoded_values.astype('float')
            string_mappings[column] = strings
        column_dtype = indexed_df[column].dtype
        column_size_bytes = indexed_df[column].nbytes
        buffer_end = buffer_size + column_size_bytes
        buffer_boundaries[column] = buffer_size, buffer_end
        column_data.extend([column, pickle.dumps(column_dtype), buffer_size])
        buffer_size = buffer_end  # the next column starts immediately after this one
    if len(string_mappings) > 0:
        strings_data = pickle.dumps(string_mappings)
        meta['string_mappings_buffer_id'] = f'{df_id}_string_columns'
        strings_shared_memory = create_shared_memory_of_size(meta['string_mappings_buffer_id'],
                                                             minimum_size_bytes=len(strings_data))
        strings_shared_memory.buf[:len(strings_data)] = strings_data
    shared_buffer = create_shared_memory_of_size(column_memory_buffer_id, minimum_size_bytes=buffer_size)
    for column in df_columns:
        # View the column's slice of the shared buffer as an ndarray and copy the values in.
        column_values_np = np.ndarray((length,), dtype=indexed_df[column].dtype,
                                      buffer=shared_buffer.buf, offset=buffer_boundaries[column][0])
        column_values_np[:] = indexed_df[column].to_numpy()
    # Serialize the metadata as a flat ShareableList: a pair count, the flattened meta dict,
    # then one (name, pickled dtype, offset) triple per column.
    meta_list = [len(meta)]
    for k, v in meta.items():
        meta_list.extend([k, v])
    meta_list.extend(column_data)
    try:
        meta_shared_list = ShareableList(name=df_id, sequence=meta_list)
    except FileExistsError:
        # A stale list with this name exists; unlink it and recreate.
        meta_shared_list = ShareableList(name=df_id)
        meta_shared_list.shm.close()
        meta_shared_list.shm.unlink()
        meta_shared_list = ShareableList(name=df_id, sequence=meta_list)
    return df_id
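

# For orientation (an illustration, not code from the original gist): for a frame with two
# float64 columns 'a' and 'b' and 10,000 rows shared as df_id='t', the ShareableList holds
# roughly
#   [3,
#    'column_memory_buffer_id', 't_columns', 'columns', 3, 'length', 10000,
#    'index', <pickled int64 dtype>, 0,
#    'a', <pickled float64 dtype>, 80000,
#    'b', <pickled float64 dtype>, 160000]
# which parse_table_meta below unpacks in the same order.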


def share_gdf(gdf: gpd.GeoDataFrame, df_id: str) -> str:
    """Share a GeoDataFrame by serializing its geometry columns to WKT strings."""
    df = pd.DataFrame(gdf)
    extra_meta = dict()
    geometry_columns = []
    # getattr() swallows the AttributeError that GeoDataFrame.geometry raises when no
    # active geometry column is set.
    if getattr(gdf, 'geometry', None) is not None:
        geometry_columns.append(gdf.geometry.name)
    for column in gdf.columns:
        if isinstance(gdf[column], gpd.GeoSeries) and column not in geometry_columns:
            geometry_columns.append(column)
    if len(geometry_columns) > 0:
        for column in geometry_columns:
            df[column] = gdf[column].to_wkt()
        # Remember each geometry column's CRS so readers can rebuild the GeoSeries.
        geometry_columns = [(column, gdf[column].crs) for column in geometry_columns]
        extra_meta.update(geometry_columns=pickle.dumps(geometry_columns))
    return share_df(df, df_id, extra_meta=extra_meta)
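

# A small sketch (hypothetical, not in the original gist) of the WKT round trip share_gdf
# relies on: geometries survive to_wkt()/from_wkt() intact, with the CRS reattached from
# the metadata on the reading side.
def _demo_wkt_round_trip():
    points = gpd.GeoSeries.from_wkt(['POINT (0 0)', 'POINT (1 2)'], crs='EPSG:4326')
    wkt = points.to_wkt()  # plain object-dtype strings, shareable like any text column
    restored = gpd.GeoSeries.from_wkt(wkt, crs=points.crs)
    assert restored.geom_equals(points).all()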


def share_table(df: pd.DataFrame, df_id: str):
    if isinstance(df, gpd.GeoDataFrame):
        return share_gdf(df, df_id)
    return share_df(df, df_id)


def parse_table_meta(df_id: str) -> dict:
    """Rebuild the metadata dict and per-column dtype/offset table from the ShareableList."""
    meta_list = list(ShareableList(name=df_id))
    meta_header_length = meta_list.pop(0)
    meta = {}
    for i in range(meta_header_length):
        k, v = meta_list[2 * i:2 * (i + 1)]
        meta[k] = v
    column_meta = meta_list[meta_header_length * 2:]
    column_entry_length = 3  # entries are (name, pickled dtype, buffer offset) triples
    column_dtypes = {}
    columns = []
    for i in range(meta['columns']):
        column_name, column_dtype_pickle_string, buffer_offset = \
            column_meta[column_entry_length * i:column_entry_length * (i + 1)]
        columns.append(column_name)
        column_dtype = pickle.loads(column_dtype_pickle_string)
        column_dtypes[column_name] = dict(dtype=column_dtype, offset=buffer_offset)
    meta['columns'] = columns
    meta['column_dtypes'] = column_dtypes
    string_mappings = meta.get('string_mappings_buffer_id')
    if isinstance(string_mappings, str):
        string_mappings_data = SharedMemory(name=string_mappings, create=False)
        # bytearray() copies the buffer, so the block can be closed right after unpickling.
        meta['string_mappings'] = pickle.loads(bytearray(string_mappings_data.buf))
        string_mappings_data.close()
    return meta


def access_shared_df(df_id: str) -> typing.Tuple[pd.DataFrame, dict]:
    """Attach to a shared DataFrame by id and rebuild it from the shared column buffer."""
    meta = parse_table_meta(df_id)
    column_data_buffer = SharedMemory(name=meta['column_memory_buffer_id'])
    columns = {}
    for column_name, column_meta in meta['column_dtypes'].items():
        columns[column_name] = np.ndarray(shape=(meta['length'],), dtype=column_meta['dtype'],
                                          buffer=column_data_buffer.buf, offset=column_meta['offset'])
    string_mappings = meta.get('string_mappings')
    df = pd.DataFrame(columns)
    if isinstance(string_mappings, dict):
        # Decode string columns; index -1 (and the trailing None entry) mark missing values.
        for column_name, words in string_mappings.items():
            assert isinstance(words, list)
            df[column_name] = df[column_name].apply(lambda _x: words[-1] if pd.isna(_x) else words[int(_x)])
    df = df.set_index(meta['columns'][0], drop=True)
    return df, meta
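

# A sketch (hypothetical helper, not in the original gist) for callers that need to mutate
# shared values in place: attach to the raw column buffer directly. pd.DataFrame() above
# generally copies its input arrays into internal blocks, so edits to the returned frame do
# not write back to shared memory.
def _demo_raw_column_view(df_id: str, column_name: str):
    meta = parse_table_meta(df_id)
    buffer = SharedMemory(name=meta['column_memory_buffer_id'])
    column_meta = meta['column_dtypes'][column_name]
    view = np.ndarray((meta['length'],), dtype=column_meta['dtype'],
                      buffer=buffer.buf, offset=column_meta['offset'])
    # Keep the SharedMemory object alive for as long as the view is used.
    return buffer, view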


def access_shared_gdf(df_id: str) -> typing.Tuple[pd.DataFrame, dict]:
    df, meta = access_shared_df(df_id)
    geometry_columns = meta.get('geometry_columns')
    if geometry_columns:
        geometry_columns = pickle.loads(geometry_columns)
        meta['geometry_columns'] = geometry_columns
        for column, crs in geometry_columns:
            df[column] = gpd.GeoSeries.from_wkt(df[column], crs=crs)
        # The first recorded geometry column becomes the active geometry again.
        df = gpd.GeoDataFrame(df, geometry=geometry_columns[0][0])
    return df, meta


def access_shared_table(df_id: str) -> typing.Tuple[pd.DataFrame, dict]:
    # Safe for plain DataFrames too: their metadata simply carries no geometry columns.
    return access_shared_gdf(df_id)


def close_shared_table(df_id: str, unlink: bool = False):
    meta = parse_table_meta(df_id)
    buffer_ids = filter(None, map(meta.get, ('column_memory_buffer_id', 'string_mappings_buffer_id')))
    shared_memory_buffers = [SharedMemory(name=buffer_id, create=False) for buffer_id in buffer_ids]
    shared_memory_buffers.append(ShareableList(name=df_id).shm)
    for buffer in shared_memory_buffers:
        buffer.close()
    if unlink:
        for buffer in shared_memory_buffers:
            buffer.unlink()


def dispose_shared_table(df_id: str):
    close_shared_table(df_id, unlink=True)
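

# End-to-end usage sketch (illustrative, not from the original gist): share a frame in the
# parent process and attach to it from a child. The id 'demo_table' is arbitrary. Note that
# on some platforms CPython's resource tracker can tear down segments a child process only
# attached to (see bpo-38119), so fork-based start methods are the safest fit for this code.
def _demo_child(df_id: str):
    df, _ = access_shared_table(df_id)
    print(f'child sees {len(df)} rows')
    close_shared_table(df_id)


def _demo_cross_process():
    from multiprocessing import Process
    df = pd.DataFrame({'x': np.arange(5, dtype='i8'), 'y': np.arange(5, dtype='f8')})
    share_table(df, 'demo_table')
    # _demo_child is defined at module level so it stays picklable under 'spawn'.
    worker = Process(target=_demo_child, args=('demo_table',))
    worker.start()
    worker.join()
    dispose_shared_table('demo_table')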


def main():
    # Round-trip every bundled geopandas dataset plus a synthetic DataFrame through shared
    # memory. (The bundled gpd.datasets were deprecated and removed in geopandas 1.0; this
    # test block targets the older versions that still ship them.)
    tests = {
        f'gdf_{i}': gpd.read_file(gpd.datasets.get_path(data_set))
        for i, data_set in enumerate(gpd.datasets.available)
    }
    df1 = pd.DataFrame()
    df1_columns = ['a', 'b']
    rows = 10000
    for i, column in enumerate(df1_columns):
        df1[f'{column}_int'] = np.array([_x * (i + 1) for _x in range(rows)], dtype='i4')
        df1[f'{column}_long'] = np.array([_x * (i + 1) for _x in range(rows)], dtype='i8')
        df1[f'{column}_float'] = np.array([_x * (i + 1) for _x in range(rows)], dtype='f4')
        df1[f'{column}_double'] = np.array([_x * (i + 1) for _x in range(rows)], dtype='f8')
        df1[f'{column}_string'] = np.array([str(_x * (i + 1)) * (1 + (i % 10)) for _x in range(rows)], dtype=object)
    for column in ('a_string', 'a_float', 'a_double'):
        # Exercise both flavors of missing data.
        df1.loc[df1.index.to_series() % 3 == 0, column] = None
        df1.loc[df1.index.to_series() % 3 == 1, column] = np.nan
    tests['test_df1'] = df1
    for table_id, table_df in tests.items():
        share_table(table_df, table_id)
        shared_df, meta = access_shared_table(table_id)
        assert isinstance(shared_df, pd.DataFrame)
        assert len(shared_df) == len(table_df)
        assert set(shared_df.columns) == set(table_df.columns), \
            f'Mismatched columns: {set(shared_df.columns)} != {set(table_df.columns)}'
        for column in table_df.columns:
            if isinstance(shared_df[column], gpd.GeoSeries):
                geometries1 = shared_df[column].to_wkt()
                geometries2 = table_df[column].to_wkt()
                assert geometries1.equals(geometries2), \
                    f'Column {column} does not match: {geometries1} != {geometries2}'
            else:
                assert shared_df[column].equals(table_df[column]), \
                    f'Column {column} does not match: {shared_df[column]} != {table_df[column]}'
    for table_id in tests:
        dispose_shared_table(table_id)


if __name__ == '__main__':
    main()