Skip to content

Instantly share code, notes, and snippets.

@ian-r-rose
Created June 7, 2022 19:45
Show Gist options
  • Save ian-r-rose/433335cc26235452720567d1ee9a8513 to your computer and use it in GitHub Desktop.
Save ian-r-rose/433335cc26235452720567d1ee9a8513 to your computer and use it in GitHub Desktop.
import contextlib
import time
import coiled
import dask
import dask.dataframe as dd
import distributed
import pandas
from dask.datasets import timeseries
@contextlib.contextmanager
def timer(label="Block"):
    """
    Time a block of code and print out the result when done.

    Elapsed time is measured with ``time.perf_counter`` (monotonic and
    high-resolution) instead of ``time.time``. This keeps a long-running
    benchmark from being skewed by system clock adjustments mid-run.

    Parameters
    ----------
    label : str
        Name shown in the report line, which has the form
        ``**<label>** took **<seconds>** seconds``.

    Nothing is printed if the timed block raises.
    """
    start = time.perf_counter()
    yield
    elapsed = time.perf_counter() - start
    print(f"**{label}** took **{elapsed:.2f}** seconds")
# Create a Coiled software environment pinned to the dask/distributed
# release under test. Its name is "parquet-" followed by the version with
# dots turned into dashes; the cluster further down uses that same name
# for its `software=` argument.
new = "2022.5.2"
coiled.create_software_environment(
    "parquet-" + "-".join(new.split(".")),
    conda=dict(
        channels=["conda-forge"],
        dependencies=[
            "python=3.9",
            "dask==" + new,
            "distributed==" + new,
            "s3fs>=2022.5.0",
            "pyarrow=7",
            "fastparquet=0.8.1",
        ],
    ),
)
# Benchmark driver. It optionally generates a large synthetic Parquet dataset
# on S3, then times an ETL pass over it on a 500-worker Coiled cluster. The
# ETL pass reads the dataset, adds a constant timestamp column and writes it
# back out.
kind = new
# Cluster and software environment share the same "parquet-<version>" name.
# The environment is created earlier in this script under that name.
cluster_name = f"parquet-{kind.replace('.', '-')}"
INPUT_PATH = "s3://oss-shared-scratch/parquet-terabytes/"
OUTPUT_PATH = "s3://oss-shared-scratch/parquet-terabytes-transform/"

# Set to True to (re)generate the input dataset before running the ETL.
WRITE_INITIAL_DATA = False

with coiled.Cluster(
    name=cluster_name,
    software=cluster_name,
    n_workers=500,
    worker_vm_types=["t3.2xlarge"],
    scheduler_vm_types=["t3.2xlarge"],
    backend_options={"region": "us-east-1"},
) as cluster, distributed.Client(cluster) as client:
    if WRITE_INITIAL_DATA:
        # 200 synthetic columns, 50 of each dtype, in the order name-*,
        # price-*, id-*, cat-*. Rows are 10ms apart over 20 years, with one
        # partition per hour.
        column_kinds = [
            ("name", str),
            ("price", float),
            ("id", int),
            ("cat", "category"),
        ]
        ddf = timeseries(
            dtypes={
                f"{prefix}-{i}": dtype
                for prefix, dtype in column_kinds
                for i in range(50)
            },
            start="2002-01-01",
            end="2022-01-01",
            freq="10ms",
            partition_freq="1H",
        )
        print(ddf.npartitions)
        with timer("Writing 35TB of data"):
            ddf.to_parquet(
                INPUT_PATH,
                engine="pyarrow",
                write_metadata_file=False,
            )
        # Restart the workers so the ETL below starts from a clean cluster.
        # This is only needed after the write phase; restarting a freshly
        # created cluster would be wasted work.
        client.restart()

    # Tasks may be retried up to 5 times (presumably to ride out transient
    # S3/network failures on a run this size).
    with timer("ETL 35TB"), dask.annotate(retries=5):
        ddf2 = dd.read_parquet(INPUT_PATH, engine="pyarrow")
        # The timestamp is evaluated once, on the client, so every row gets
        # the same value.
        ddf3 = ddf2.assign(date=pandas.Timestamp.now())
        ddf3.to_parquet(
            OUTPUT_PATH,
            engine="pyarrow",
            write_metadata_file=False,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment