#!/usr/bin/env python
u"""
raster.py
Written by Tyler Sutterley (05/2024)
Utilities for reading and operating on raster data
PYTHON DEPENDENCIES:
numpy: Scientific Computing Tools For Python
https://numpy.org
https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
netCDF4: Python interface to the netCDF C library
https://unidata.github.io/netcdf4-python/netCDF4/index.html
h5py: Pythonic interface to the HDF5 binary data format
https://www.h5py.org/
gdal: Pythonic interface to the Geospatial Data Abstraction Library (GDAL)
https://pypi.python.org/pypi/GDAL
PyYAML: YAML parser and emitter for Python
https://github.com/yaml/pyyaml
PROGRAM DEPENDENCIES:
spatial.py: utilities for reading and writing spatial data
UPDATE HISTORY:
Updated 05/2024: use wrapper to importlib for optional dependencies
make subscriptable and allow item assignment
Updated 11/2023: cache interpolators for improving computational times
Written 10/2023
"""
from __future__ import print_function, annotations
import collections
import numpy as np
import scipy.interpolate
import grounding_zones.spatial
from grounding_zones.utilities import import_dependency
# attempt imports
pyproj = import_dependency('pyproj')
pyTMD = import_dependency('pyTMD')
[docs]
class raster:
"""Utilities for using raster files
"""
np.seterr(invalid='ignore')
def __init__(self, **kwargs):
# set default class attributes
self.dims = []
self.fields = []
self.attributes = dict()
self.interpolator = collections.OrderedDict()
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
setattr(self, key, value)
# PURPOSE: read a raster file
[docs]
def from_file(self, input_file, format=None, **kwargs):
"""
Read a raster file from an input format
Parameters
----------
input_file: str
path or memory map for raster file
format: str
format of input file
**kwargs: dict
Keyword arguments for file reader
"""
dinput = grounding_zones.spatial.from_file(
filename=input_file,
format=format,
**kwargs
)
# convert from dictionary to class attributes
for key,val in dinput.items():
if key in ('x','y','time'):
self.dims.append(key)
elif key in ('attributes','filename'):
pass
else:
self.fields.append(key)
# set attribute
self[key] = val
# return the raster object
return self
[docs]
def interp(self, x, y, order=0, reducer=np.ceil):
"""
Sample raster data at points
Parameters
----------
datain: np.ndarray
input data grid to be interpolated
x: np.ndarray
output x-coordinates
y: np.ndarray
output y-coordinates
order: int, default 0
interpolation order
- ``0``: nearest-neighbor interpolation
- ``k``: bivariate spline interpolation of degree k
reducer: obj, default np.ceil
operation for converting mask to boolean
"""
# output dictionary with interpolated data
temp = collections.OrderedDict()
# copy dimensions
temp.x = np.copy(x)
temp.y = np.copy(y)
# copy field names
temp.fields = self.fields
# for each field in the input data
for field in self.fields:
# extract data for field
d_in = self[field]
# interpolate values
if (order == 0):
# interpolate with nearest-neighbors
xcoord = (len(self.x)-1)*(x-self.x[0])/(self.x[-1]-self.x[0])
ycoord = (len(self.y)-1)*(y-self.y[0])/(self.y[-1]-self.y[0])
xcoord = np.clip(xcoord, 0, len(self.x)-1)
ycoord = np.clip(ycoord, 0, len(self.y)-1)
XI = np.around(xcoord).astype(np.int32)
YI = np.around(ycoord).astype(np.int32)
# interpolate data and mask for field
d_out = d_in[YI, XI]
if np.ma.is_masked(d_in):
mask = reducer(d_in.mask[YI, XI])
d_out = np.ma.array(d_out, mask=mask.astype(bool))
# set interpolated data for field
temp[field] = d_out
else:
# interpolate with bivariate spline approximations
# cache interpolator for faster interpolation
if field not in self.interpolator:
self.interpolator[field] = \
scipy.interpolate.RectBivariateSpline(
self.x, self.y, d_in.T,
kx=order, ky=order
)
if np.ma.is_masked(d_in):
self.interpolator[field].mask = \
scipy.interpolate.RectBivariateSpline(
self.x, self.y, d_in.mask.T,
kx=order, ky=order
)
# interpolate data and mask for field
d_out = self.interpolator[field].ev(x, y)
if np.ma.is_masked(d_in):
mask = reducer(self.interpolator[field].mask.ev(x, y))
d_out = np.ma.array(d_out, mask=mask.astype(bool))
# set interpolated data for field
temp[field] = d_out
# return the interpolated data on the output grid
return temp
[docs]
def warp(self, x, y, order=0, reducer=np.ceil):
"""
Interpolate raster data to a new grid
Parameters
----------
datain: np.ndarray
input data grid to be interpolated
x: np.ndarray
output x-coordinate array
y: np.ndarray
output y-coordinate array
order: int, default 0
interpolation order
- ``0``: nearest-neighbor interpolation
- ``k``: bivariate spline interpolation of degree k
reducer: obj, default np.ceil
operation for converting mask to boolean
"""
temp = raster()
# copy field names
temp.fields = self.fields
# create meshgrid of output coordinates
xout, yout = np.meshgrid(x, y)
# for each field in the input data
for field in self.fields:
# extract data for field
d_in = self[field]
# interpolate values
if (order == 0):
# interpolate with nearest-neighbors
xcoord = (len(self.x)-1)*(xout-self.x[0])/(self.x[-1]-self.x[0])
ycoord = (len(self.y)-1)*(yout-self.y[0])/(self.y[-1]-self.y[0])
xcoord = np.clip(xcoord, 0, len(self.x)-1)
ycoord = np.clip(ycoord, 0, len(self.y)-1)
XI = np.around(xcoord).astype(np.int32)
YI = np.around(ycoord).astype(np.int32)
# interpolate data and mask for field
d_out = d_in[YI, XI]
if np.ma.is_masked(d_in):
mask = reducer(d_in.mask[YI, XI])
d_out = np.ma.array(d_out, mask=mask.astype(bool))
# set interpolated data for field
temp[field] = d_out
else:
# interpolate with bivariate spline approximations
# cache interpolator for faster interpolation
if field not in self.interpolator:
self.interpolator[field] = \
scipy.interpolate.RectBivariateSpline(
self.x, self.y, d_in.T,
kx=order, ky=order
)
if np.ma.is_masked(d_in):
self.interpolator[field].mask = \
scipy.interpolate.RectBivariateSpline(
self.x, self.y, d_in.mask.T,
kx=order, ky=order
)
# interpolate data and mask for field
d_out = self.interpolator[field].ev(xout, yout)
if np.ma.is_masked(d_in):
mask = reducer(self.interpolator[field].mask.ev(xout, yout))
d_out = np.ma.array(d_out, mask=mask.astype(bool))
# set interpolated data for field
temp[field] = d_out
# return the interpolated data on the output grid
return temp
[docs]
def get_latlon(self, srs_proj4=None, srs_wkt=None, srs_epsg=None):
"""
Get the latitude and longitude of grid cells
Parameters
----------
srs_proj4: str or NoneType, default None
PROJ4 projection string
srs_wkt: str or NoneType, default None
Well-Known Text (WKT) projection string
srs_epsg: int or NoneType, default None
EPSG projection code
Returns
-------
longitude: np.ndarray
longitude coordinates of grid cells
latitude: np.ndarray
latitude coordinates of grid cells
"""
# set the spatial projection reference information
if srs_proj4 is not None:
source = pyproj.CRS.from_proj4(srs_proj4)
elif srs_wkt is not None:
source = pyproj.CRS.from_wkt(srs_wkt)
elif srs_epsg is not None:
source = pyproj.CRS.from_epsg(srs_epsg)
else:
source = pyproj.CRS.from_string(self.projection)
# target spatial reference (WGS84 latitude and longitude)
target = pyproj.CRS.from_epsg(4326)
# create transformation
transformer = pyproj.Transformer.from_crs(source, target,
always_xy=True)
# create meshgrid of points in original projection
x, y = np.meshgrid(self.x, self.y)
# convert coordinates to latitude and longitude
self.lon, self.lat = transformer.transform(x, y)
return self
[docs]
def copy(self):
"""
Copy a ``raster`` object to a new ``raster`` object
"""
temp = raster()
# copy attributes or update attributes dictionary
if isinstance(self.attributes, list):
temp['attributes'] = self.attributes
elif isinstance(self.attributes, dict):
temp.attributes.update(self.attributes)
# get dimensions and field names
temp.dims = self.dims
temp.fields = self.fields
# assign variables to self
for key in [*self.dims, *self.fields]:
try:
temp[key] = self[key].copy()
except AttributeError:
pass
return temp
[docs]
def flip(self, axis=0):
"""
Reverse the order of data and dimensions along an axis
Parameters
----------
axis: int, default 0
axis to reorder
"""
# output spatial object
temp = self.copy()
# copy dimensions and reverse order
if (axis == 0):
temp.y = temp.y[::-1].copy()
elif (axis == 1):
temp.x = temp.x[::-1].copy()
# attempt to reverse possible data variables
for key in self.fields:
try:
temp[key] = np.flip(self[key], axis=axis)
except Exception as exc:
pass
return temp