#!/usr/bin/env python3
# coding: utf-8
#
# Module to generate the code of a NetCDF4 driver (Mbg...)
#
import ntpath

import netCDF4 as nc
import numpy as np

# Global parameters
nc_file = '/home/lucas/shared/filtri_Pr_023_tide_COR__MAREE.mbg'
driver_name = "Mbg"
layer_prefix = "mb"  # set to None if the variables do not share a common prefix


def camel_to_snake(s: str) -> str:
    return "".join(["_" + c.lower() if c.isupper() else c for c in s]).lstrip("_")


def variable_to_layer(variable_name: str) -> str:
    if layer_prefix is not None and variable_name.startswith(layer_prefix):
        variable_name = variable_name[len(layer_prefix):]
    return camel_to_snake(variable_name).upper()
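
# Worked example (hypothetical variable names, not tied to any specific .mbg file):
#   variable_to_layer("mbCycleNbr")  ->  "CYCLE_NBR"
#   variable_to_layer("mbDepth")     ->  "DEPTH"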


def print_variable_initialization(variable, variable_name, max_var_name_len: int):
    """Print one LAYERS dict entry: the layer name mapped to a 5-tuple
    (dtype, has valid_maximum, valid_maximum, has scale_factor, scale factor is a float).
    """
    dtype = 'np.' + repr(variable.dtype)
    has_vm = "valid_maximum" in variable.ncattrs()
    vm = variable.valid_maximum if has_vm else None
    has_sf = "scale_factor" in variable.ncattrs()
    sf = isinstance(variable.scale_factor, float) if has_sf else None
    r = dtype + ', ' + ', '.join(map(repr, (has_vm, vm, has_sf, sf)))
    print(f'        {repr(variable_to_layer(variable_name)).rjust(max_var_name_len + 5)}: ({r}),')
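
# A hypothetical emitted entry, for a variable "mbDepth" of dtype int16
# with valid_maximum=65534 and a float scale_factor:
#     'DEPTH': (np.dtype('int16'), True, 65534, True, True),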


with nc.Dataset(nc_file) as dataset:
    print(
        """
#!/usr/bin/env python3
# coding: utf-8
import netCDF4 as nc
import numpy as np
from functools import partial  # binds the layer name into each generated accessor
from typing import Optional  # needed by the cached-layer annotations

BYTE_MAX = (1 << 7) - 1
SHORT_MAX = (1 << 15) - 1
INT_MAX = (1 << 31) - 1

layer_prefix = "mb"  # set to None if the variables do not share a common prefix


def camel_to_snake(s: str) -> str:
    return "".join(["_" + c.lower() if c.isupper() else c for c in s]).lstrip("_")


def variable_to_layer(variable_name: str) -> str:
    if layer_prefix is not None and variable_name.startswith(layer_prefix):
        variable_name = variable_name[len(layer_prefix):]
    return camel_to_snake(variable_name).upper()
"""
    )
print("# Dimensions") | |
dimensions = list(dataset.dimensions) | |
dimensions.sort() | |
for variable_name in dimensions: | |
print(f'{variable_to_layer(variable_name)} = "{variable_name}"') | |
print("# Layers") | |
variables = list(dataset.variables.keys()) | |
variables.sort() | |
for variable_name in variables: | |
print(f'{variable_to_layer(variable_name)} = "{variable_name}"') | |

    # Class definition
    print(
        f"""
class {driver_name}Driver:
    \"""
    Driver class for {driver_name} files, encapsulating the NetCDF4 access.
    \"""

    @property
    def dataset(self) -> nc.Dataset:
        return self._dataset
"""
    )
    print('    LAYERS = {')
    max_var_name_len = max(map(len, variables))
    for variable_name in variables:
        print_variable_initialization(dataset.variables[variable_name], variable_name, max_var_name_len)
    print('    }')
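
    # Note on the generated __accessor_for below: when a variable declares a
    # valid_maximum above the maximum of its signed dtype, the data is assumed
    # to actually be unsigned, so the accessor reinterprets the raw bytes with
    # the matching unsigned dtype (short -> ushort, intc -> uintc, byte -> ubyte).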
    print(f'''
    # def __getattr__(self, k):
    #     """Return the data of the asked variable as a numpy array.
    #     Generated with {ntpath.basename(__file__)}
    #     """
    #     if k.startswith('read_'):
    #         layer = variable_to_layer(k[len('read_'):])
    #         if layer in self.LAYERS:
    #             return self.__accessor_for(layer)
    #     # in any other case…
    #     raise AttributeError

    def __accessor_for(self, layer: str):
        dtype, has_valid_max, valid_maximum, has_sf, sf = self.LAYERS[layer]
        numpy_dtype = None
        if has_valid_max:
            if dtype == np.short and valid_maximum > SHORT_MAX:
                numpy_dtype = np.ushort
            elif dtype == np.intc and valid_maximum > INT_MAX:
                numpy_dtype = np.uintc
            elif dtype == np.byte and valid_maximum > BYTE_MAX:
                numpy_dtype = np.ubyte
        elif dtype == np.dtype("|S1"):
            numpy_dtype = np.int8
        if numpy_dtype:
            # sf is True when the scale factor is a float, in which case the
            # scaled data must be returned as floats
            if has_sf and sf:
                return partial(self.__read_layer_as, layer_name=layer,
                               from_numpy_dtype=numpy_dtype.__name__, to_numpy_dtype=float)
            else:
                return partial(self.__read_layer_as, layer_name=layer,
                               from_numpy_dtype=numpy_dtype.__name__, to_numpy_dtype=None)
        else:
            return partial(self.__read_layer, layer_name=layer)

    def __new__(cls, *args, **kwargs):
        obj = super().__new__(cls)
        # create one read_<layer>() accessor per entry of LAYERS
        for layer in cls.LAYERS:
            layername = layer.lower()
            setattr(obj, f'read_{{layername}}', obj.__accessor_for(layer))
        return obj
''')

    # Factorized methods
    print(f'''
    def __apply_offset_and_scale(self, variable: nc.Variable, data: np.ndarray) -> None:
        """
        Apply the scale factor and offset in place, if present.
        Generated with {ntpath.basename(__file__)}
        """
        if "scale_factor" in variable.ncattrs():
            np.multiply(data, variable.scale_factor, out=data)
        if "add_offset" in variable.ncattrs():
            np.add(data, variable.add_offset, out=data)

    def __read_layer(self, layer_name: str,
                     from_index: Optional[int] = None, to_index: Optional[int] = None) -> np.ndarray:
        """
        Return the data of the specified variable as a numpy array.
        Generated with {ntpath.basename(__file__)}
        """
        # slicing with a None bound reads the whole dimension on that side
        return self.dataset[layer_name][from_index:to_index]

    def __read_layer_as(
        self, layer_name: str, from_numpy_dtype=np.int8, to_numpy_dtype=None,
        from_index: Optional[int] = None, to_index: Optional[int] = None,
    ) -> np.ndarray:
        """
        Return the data of the specified variable as a numpy array of a specific type.
        Generated with {ntpath.basename(__file__)}
        """
        variable = self.dataset[layer_name]
        variable.set_auto_maskandscale(False)  # read the raw, unscaled values
        data = self.__read_layer(layer_name, from_index, to_index)
        # reinterpret the raw bytes with the expected dtype, without copying
        result = np.frombuffer(data, dtype=from_numpy_dtype).reshape(data.shape)
        if to_numpy_dtype is not None:
            result = result.astype(to_numpy_dtype)
        self.__apply_offset_and_scale(variable, result)
        return result
    def __init__(self, file_path: str):
        # file_path handling (self.sounder_file) is assumed to be provided by
        # the surrounding codebase, e.g. a SounderDriver base class.
        self._dataset = None
        # Keep these layers in memory
        self._antennas: Optional[np.ndarray] = None
        self._fcs_depths: Optional[np.ndarray] = None
        self._scs_depths: Optional[np.ndarray] = None
        self._distance_scales: Optional[np.ndarray] = None
        self._reflectivities: Optional[np.ndarray] = None
    def open(self, mode: str = "r") -> nc.Dataset:
        """
        Open the file and return the resulting Dataset.
        Implementation of SounderDriver abstract method
        """
        self._dataset = nc.Dataset(self.sounder_file.file_path, mode)
        self.sounder_file.south = self.dataset.mbSouthLatitude
        self.sounder_file.north = self.dataset.mbNorthLatitude
        self.sounder_file.west = self.dataset.mbWestLongitude
        self.sounder_file.east = self.dataset.mbEastLongitude
        self.sounder_file.swath_count = self.dataset.dimensions[CYCLE_NBR].size
        self.sounder_file.beam_count = self.dataset.dimensions[BEAM_NBR].size
        return self.dataset

    def close(self) -> None:
        """
        Close the dataset if opened.
        Implementation of SounderDriver abstract method
        """
        if self.dataset and self.dataset.isopen():
            self.dataset.close()
        self._dataset = None
        # drop all cached layers
        self._antennas = None
        self._fcs_depths = None
        self._scs_depths = None
        self._distance_scales = None
        self._reflectivities = None
''')
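
# Usage sketch (assumptions: this script is saved as generate_mbg_driver.py,
# nc_file points at a valid .mbg file, and the surrounding codebase wires up
# the sounder_file attribute expected by the generated open()):
#
#   $ python3 generate_mbg_driver.py > mbg_driver.py
#
# The generated MbgDriver exposes one read_<layer>() accessor per variable,
# with names depending on the file's variables:
#
#   driver = MbgDriver("/path/to/file.mbg")
#   driver.open()
#   depths = driver.read_fcs_depth()  # hypothetical accessor name
#   driver.close()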