Skip to content

Instantly share code, notes, and snippets.

@Aluriak
Created September 10, 2021 10:43
Show Gist options
  • Save Aluriak/1eaa3084659ea031992d77e681429ad1 to your computer and use it in GitHub Desktop.
Save Aluriak/1eaa3084659ea031992d77e681429ad1 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python3
# coding: utf-8
#
# Module to generate the code of Netcdf4 Driver (Mbg...)
#
import ntpath
import netCDF4 as nc
import numpy as np
# Global parameters
nc_file = '/home/lucas/shared/filtri_Pr_023_tide_COR__MAREE.mbg'
driver_name = "Mbg"
layer_prefix = "mb"  # None if the variable names do not start with a common prefix
def camel_to_snake(s: str) -> str:
    """Convert a CamelCase identifier to snake_case (e.g. ``FooBar`` -> ``foo_bar``)."""
    pieces = []
    for char in s:
        if char.isupper():
            pieces.append("_")
            pieces.append(char.lower())
        else:
            pieces.append(char)
    # A leading uppercase letter produces a leading underscore: strip it.
    return "".join(pieces).lstrip("_")
def variable_to_layer(variable_name: str) -> str:
    """Map a NetCDF variable name to the generated layer-constant name.

    Strips the module-level ``layer_prefix`` when the name carries it, then
    converts the remainder to UPPER_SNAKE_CASE.
    """
    stripped = variable_name
    if layer_prefix is not None and stripped.startswith(layer_prefix):
        stripped = stripped[len(layer_prefix) :]
    return camel_to_snake(stripped).upper()
def print_variable_initialization(variable, variable_name, max_var_name_len: int):
    """Print one ``LAYERS`` dict entry for *variable* in the generated driver.

    The emitted tuple is ``(numpy dtype, has valid_maximum, valid_maximum,
    has scale_factor, scale_factor is a float)``; the key is right-justified
    so all entries line up.
    """
    attributes = variable.ncattrs()
    has_valid_max = "valid_maximum" in attributes
    has_scale = "scale_factor" in attributes
    valid_max = variable.valid_maximum if has_valid_max else None
    scale_is_float = isinstance(variable.scale_factor, float) if has_scale else None
    fields = ["np." + repr(variable.dtype)]
    fields += [repr(flag) for flag in (has_valid_max, valid_max, has_scale, scale_is_float)]
    key = repr(variable_to_layer(variable_name)).rjust(max_var_name_len + 5)
    print(f' {key}: ({", ".join(fields)}),')
# Main generation pass: read the source NetCDF file and print the driver
# module's source code to stdout.
# NOTE(review): the original paste lost all leading whitespace, both in the
# generator and inside its code templates; indentation below (including the
# templates' contents) is reconstructed — confirm against the generated file.
with nc.Dataset(nc_file) as dataset:
    # --- Generated module header: imports, constants and name helpers. ---
    print(
        """
#! /usr/bin/env python3
# coding: utf-8
import netCDF4 as nc
import numpy as np
from functools import partial as partialmethod # for currying
BYTE_MAX = (1 << 7) - 1
SHORT_MAX = (1 << 15) - 1
INT_MAX = (1 << 31) - 1
layer_prefix = "mb" # None if variables does start by any prefix
def camel_to_snake(s: str) -> str:
    return "".join(["_" + c.lower() if c.isupper() else c for c in s]).lstrip("_")
def variable_to_layer(variable_name: str) -> str:
    if layer_prefix is not None and variable_name.startswith(layer_prefix):
        variable_name = variable_name[len(layer_prefix) :]
    return camel_to_snake(variable_name).upper()
"""
    )
    # --- Generated constants: one name per dimension and per variable. ---
    print("# Dimensions")
    dimensions = list(dataset.dimensions)
    dimensions.sort()
    for variable_name in dimensions:
        print(f'{variable_to_layer(variable_name)} = "{variable_name}"')
    print("# Layers")
    variables = list(dataset.variables.keys())
    variables.sort()
    for variable_name in variables:
        print(f'{variable_to_layer(variable_name)} = "{variable_name}"')
    # Class definition
    print(
        f"""
class {driver_name}Driver():
    \"""
    Driver class of {driver_name} files to encapsulate the Netcdf4 access
    \"""
    @property
    def dataset(self) -> nc.Dataset:
        return self._dataset
"""
    )
    # --- Generated LAYERS table: per-layer (dtype, valid_maximum, scale) info. ---
    print('    LAYERS = {')
    max_var_name_len = max(map(len, variables))
    for variable_name in variables:
        print_variable_initialization(dataset.variables[variable_name], variable_name, max_var_name_len)
    print('    }')
    # --- Generated accessor machinery: __accessor_for picks the right read
    # method (and an unsigned reinterpretation when valid_maximum exceeds the
    # signed dtype's range); __new__ installs one read_<layer> bound accessor
    # per LAYERS entry. The __getattr__ alternative is kept commented out.
    # NOTE(review): `dtype is int` in the generated code can never be true
    # since LAYERS stores numpy dtypes — kept as-is to preserve output.
    print(f'''
    # def __getattr__(self, k):
    #     \"""Return the data of asked variable as a numpy array.
    #     Generated with {ntpath.basename(__file__)}
    #     \"""
    #     if k.startswith('read_'):
    #         layer = variable_to_layer(k[len('read_'):])
    #         if layer in self.LAYERS:
    #             return self.__accessor_for(layer)
    #     # in any other case…
    #     raise AttributeError
    def __accessor_for(self, layer: str):
        dtype, has_valid_max, valid_maximum, has_sf, sf = self.LAYERS[layer]
        numpy_dtype = None
        if has_valid_max:
            if dtype == np.short and valid_maximum > SHORT_MAX:
                numpy_dtype = np.ushort
            elif dtype is int and valid_maximum > INT_MAX:
                numpy_dtype = np.uintc
            elif dtype == np.byte and valid_maximum > BYTE_MAX:
                numpy_dtype = np.ubyte
        elif dtype == np.dtype("|S1"):
            numpy_dtype = np.int8
        if numpy_dtype:
            if has_sf and isinstance(sf, float):
                return partialmethod(self.__read_layer_as, layer_name=layer, from_numpy_dtype=numpy_dtype.__name__, to_numpy_dtype=float)
            else:
                return partialmethod(self.__read_layer_as, layer_name=layer, from_numpy_dtype=numpy_dtype.__name__, to_numpy_dtype=None)
        else:
            return partialmethod(self.__read_layer, layer_name=layer)
    def __new__(cls, *args, **kwargs):
        obj = super().__new__(cls)
        # create accessors
        for layer in cls.LAYERS:
            layername = layer.lower()
            setattr(obj, f'read_{{layername}}', obj.__accessor_for(layer))
        return obj
''')
    # Factorized methods
    # NOTE(review): the generated __init__/open/close reference `Optional`,
    # `self.sounder_file`, CYCLE_NBR and BEAM_NBR, which the generated header
    # does not define — presumably provided by a SounderDriver base class or
    # enclosing project; confirm before regenerating.
    print(f'''
    def __apply_offset_and_scale(self, variable : nc.Variable, data : np.ndarray) -> None:
        \"""
        Apply the offset and scale if present
        Generated with {ntpath.basename(__file__)}
        \"""
        if "scale_factor" in variable.ncattrs():
            np.multiply(data, variable.scale_factor, out=data)
        if "add_offset" in variable.ncattrs():
            np.add(data, variable.add_offset, out=data)
    def __read_layer(self, layer_name:str, from_index: int = None, to_index: int = None) -> np.ndarray:
        \"""
        return the data of the specified variable as a numpy array.
        Generated with {ntpath.basename(__file__)}
        \"""
        if from_index is None and to_index is None:
            return self.dataset[layer_name][:]
        if from_index is not None and to_index is None:
            return self.dataset[layer_name][from_index:]
        if from_index is None and to_index is not None:
            return self.dataset[layer_name][:to_index]
        return self.dataset[layer_name][from_index:to_index]
    def __read_layer_as(
        self, layer_name: str, from_numpy_dtype=np.int8, to_numpy_dtype=None, from_index: int = None, to_index: int = None
    ) -> np.ndarray:
        \"""
        return the data of the specified variable as a numpy array of a specific type.
        Generated with {ntpath.basename(__file__)}
        \"""
        variable = self.dataset[layer_name]
        variable.set_auto_maskandscale(False)
        data = self.__read_layer(layer_name, from_index, to_index)
        result = np.frombuffer(data, dtype=from_numpy_dtype).reshape(data.shape)
        if to_numpy_dtype is not None:
            result = result.astype(to_numpy_dtype)
        self.__apply_offset_and_scale(variable, result)
        return result
    def __init__(self, file_path: str):
        self._dataset = None
        # Keep this layers in memory
        self._antennas: Optional[np.ndarray] = None
        self._fcs_depths: Optional[np.ndarray] = None
        self._scs_depths: Optional[np.ndarray] = None
        self._distance_scales: Optional[np.ndarray] = None
        self._reflectivities: Optional[np.ndarray] = None
    def open(self, mode: str = "r") -> nc.Dataset:
        """
        Open the file and return the resulting Dataset
        Implementation of SounderDriver abstract method
        """
        self._dataset = nc.Dataset(self.sounder_file.file_path, mode)
        self.sounder_file.south = self.dataset.mbSouthLatitude
        self.sounder_file.north = self.dataset.mbNorthLatitude
        self.sounder_file.west = self.dataset.mbWestLongitude
        self.sounder_file.east = self.dataset.mbEastLongitude
        self.sounder_file.swath_count = self.dataset.dimensions[CYCLE_NBR].size
        self.sounder_file.beam_count = self.dataset.dimensions[BEAM_NBR].size
        return self.dataset
    def close(self) -> None:
        """
        Close the dataset if opened
        Implementation of SounderDriver abstract method
        """
        if self.dataset and self.dataset.isopen():
            self.dataset.close()
        self._dataset = None
        self._antennas = None
        self._distance_scales = None
        self._reflectivities = None
'''
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment