Skip to content

Instantly share code, notes, and snippets.

@spencerkclark
Created March 26, 2021 10:47
Show Gist options
  • Save spencerkclark/c1555533f8c74bb9722a70486b372256 to your computer and use it in GitHub Desktop.
Save spencerkclark/c1555533f8c74bb9722a70486b372256 to your computer and use it in GitHub Desktop.
Code for computing regressions of variables with an arbitrary number of dimensions against a 1D index
import dask.array as darray
import numpy as np
import xarray as xr
from collections import defaultdict
def _matmul(arr1, arr2):
if isinstance(arr1, darray.core.Array):
return darray.matmul(arr1, arr2)
else:
return np.matmul(arr1, arr2)
def regress(ds, index, sampling_dim):
"""Regress a variable against a given index along a sampling dimension.
Assumes the dimension of the index is the sampling dimension.
Parameters
----------
ds : xr.Dataset
Input Dataset
index : xr.DataArray
Index DataArray; must be 1D
sampling_dim : xr.DataArray
Dimension name
Returns
-------
xr.Dataset or xr.DataArray
"""
if isinstance(ds, xr.DataArray):
ds = ds.to_dataset()
structure_dims = defaultdict(list)
for var in ds.data_vars:
sdims = set(ds[var].dims) - set([sampling_dim])
sdims = tuple(sorted(tuple(sdims)))
structure_dims[sdims].append(var)
datasets = []
for sdims, variables in structure_dims.items():
structure = ds[variables]
structure = structure.stack(structure=tuple(sdims))
structure = structure.transpose('structure', sampling_dim)
# Chunk to preserve block size
structure_chunk_sizes = [ds.chunks[dim][0] for dim in sdims]
structure_chunk_size = np.product(structure_chunk_sizes)
structure = structure.chunk({'structure': structure_chunk_size})
result = xr.apply_ufunc(
_matmul, structure.fillna(0.0), index,
input_core_dims=[('structure', sampling_dim), (sampling_dim, )],
output_core_dims=[('structure', )], dask='allowed'
)
samples = xr.apply_ufunc(
np.isfinite, structure, dask='parallelized',
output_dtypes=[np.float]).sum(sampling_dim)
result = result / samples
datasets.append(result.unstack('structure'))
return xr.merge(datasets)
def regress_(da, index, sampling_dim):
"""Regress a variable against a given index along a sampling dimension.
Assumes the dimension of the index is the sampling dimension.
Parameters
----------
da : xr.DataArray
Input DataArray
index : xr.DataArray
Index DataArray; must be 1D
sampling_dim : xr.DataArray
Dimension name
Returns
-------
xr.DataArray
"""
structure_dims = set(da.dims) - set([sampling_dim])
structure = da.stack(structure=tuple(structure_dims))
structure = structure.transpose('structure', sampling_dim)
result = xr.apply_ufunc(
_matmul, structure.fillna(0.0), index,
input_core_dims=[('structure', sampling_dim), (sampling_dim, )],
output_core_dims=[('structure', )], dask='allowed'
)
samples = xr.apply_ufunc(
np.isfinite, structure, dask='parallelized',
output_dtypes=[np.float]).sum(sampling_dim)
result = result / samples
return result.unstack('structure')
def lag_regress(da, index, sampling_dim, lag):
"""Lag regress a variable against a given index along a sampling dimension
Parameters
----------
da : xr.DataArray
Input DataArray
index : xr.DataArray
Index DataArray; must be 1D
sampling_dim : xr.DataArray
Dimension name
lag : int
Integer lag
Returns
-------
xr.DataArray
"""
shifted = da.shift(**{sampling_dim: -lag})
shifted, index = xr.align(shifted, index)
return regress(shifted, index, sampling_dim)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment