Skip to content

Instantly share code, notes, and snippets.

@ritchie46
Last active February 23, 2023 14:47
Show Gist options
  • Save ritchie46/3c4825cd28c867cc028626b920677497 to your computer and use it in GitHub Desktop.
Save ritchie46/3c4825cd28c867cc028626b920677497 to your computer and use it in GitHub Desktop.
Example of polars multithreading third party work pool
import polars as pl
import numpy as np
# Build 100 million random integers in [0, 100), group them by value and
# gather every group's members into a single list column named "list".
df = (
    pl.DataFrame({"a": np.random.randint(0, 100, size=int(1e8))})
    .groupby("a")
    .agg(pl.col("a").alias("list"))
)
# Printing `df` gives something like (row order varies):
# shape: (100, 2)
# ┌─────┬──────────────────┐
# │ a   ┆ list             │
# │ --- ┆ ---              │
# │ i64 ┆ list[i64]        │
# ╞═════╪══════════════════╡
# │ 16  ┆ [16, 16, ... 16] │
# │ 32  ┆ [32, 32, ... 32] │
# │ 64  ┆ [64, 64, ... 64] │
# │ 96  ┆ [96, 96, ... 96] │
# │ ... ┆ ...              │
# │ 65  ┆ [65, 65, ... 65] │
# │ 97  ┆ [97, 97, ... 97] │
# │ 33  ┆ [33, 33, ... 33] │
# │ 1   ┆ [1, 1, ... 1]    │
# └─────┴──────────────────┘
# an expensive numpy operation that releases the GIL
def third_party(s: pl.Series) -> float:
    """Run a deliberately expensive NumPy computation on one list element.

    Args:
        s: The values of a single row of the ``list`` column.

    Returns:
        The sum of the cumulative sum of ``s`` multiplied by the sum of the
        cumulative sum of ten million ones. The second factor is a float64
        array reduction, so the product is a NumPy float scalar.
    """
    series_total = np.sum(np.cumsum(s.to_numpy()))
    # Recomputed on every call on purpose: it stands in for heavy
    # third-party work, so it must not be hoisted or cached.
    busy_work = np.sum(np.cumsum(np.ones(int(1e7))))
    return series_total * busy_work
def my_computation(df: pl.DataFrame) -> pl.LazyFrame:
    """Describe, without running it, the per-row work for one DataFrame.

    The frame is turned into a LazyFrame and `apply` is used to call
    `third_party` from a Python loop over the "list" column.

    Args:
        df: A DataFrame that has a "list" column.

    Returns:
        A LazyFrame selecting the result of `third_party` for every row.
    """
    per_row = pl.col("list").apply(third_party)
    lazy_df = df.lazy()
    return lazy_df.select(per_row)
# Partition the work so it can run on (at most) 8 threads.
n_threads = 8
# Ceiling division: flooring would silently drop the trailing rows whenever
# the height is not a multiple of n_threads (100 rows // 8 covers only 96).
# max(1, ...) keeps the range step below valid when the frame is empty.
chunk_size = max(1, -(-df.height // n_threads))
# Create the partitions as LazyFrames.
# These are promises of a computation; nothing runs yet.
# The final slice may be shorter than chunk_size.
partitions = [
    my_computation(df[start: start + chunk_size])
    for start in range(0, df.height, chunk_size)
]
# Collect all LazyFrames in parallel and keep the resulting DataFrames.
results = pl.collect_all(partitions)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment