Source code for fcpgtools.terrainengine.pysheds_engine

import xarray as xr
import numpy as np
from pysheds.grid import Grid
from pysheds.view import Raster as PyShedsRaster
from pysheds.view import ViewFinder
from pathlib import Path
from typing import Union, Optional
import fcpgtools.tools as tools
import fcpgtools.utilities as utilities
import fcpgtools.custom_types as custom_types
from fcpgtools.custom_types import Raster, PyShedsInputDict, PourPointValuesDict


[docs]class PyShedsEngine: d8_format = 'esri' function_kwargs = { 'accumulate_flow': custom_types.PyShedsFACkwargsDict.__annotations__, 'accumulate_parameter': custom_types.PyShedsFACkwargsDict.__annotations__, } @staticmethod def _prep_fdr_for_pysheds( array: xr.DataArray, ) -> PyShedsInputDict: """Converts a three dimension (i.e. value = f(x, y)) xr.DataArray Flow Direction Raster into necessary pysheds inputs. Args: array: A 3-dimension array. Returns: A dict storing PyShed's relevant data formats of the following form {'input_array': param:array, 'raster': pysheds.Raster(), 'grid': pysheds.Grid()} """ array.rio.write_transform() affine = array.rio.transform() # get nodata value nodata_val = array.rio.nodata array_np = array.values.astype(dtype=str(array.dtype)).squeeze() # make a mask for the grid object mask = array.astype('bool') mask = mask.where(array != array.rio.nodata, False).values view = ViewFinder( shape=array_np.shape, affine=affine, nodata=nodata_val, mask=mask, ) raster_obj = PyShedsRaster( array_np, view, ) # note: edits to this dictionary should be reflected in the PyShedsInputDict TypedDict instance out_dict = { 'input_array': array, 'raster': raster_obj, 'grid': Grid().from_raster(raster_obj, affine=affine), } return out_dict @staticmethod def _pysheds_to_xarray( pysheds_io_dict: PyShedsInputDict, name: str = 'pysheds_output', ) -> xr.DataArray: """Backend function used to convert PySheds objects back into an xarray.DataArray.""" array = xr.DataArray( pysheds_io_dict['raster'], coords=pysheds_io_dict['input_array'].squeeze().coords, name=name, attrs=pysheds_io_dict['input_array'].attrs, ) return array
[docs] @staticmethod def accumulate_flow( d8_fdr: Raster, upstream_pour_points: Optional[PourPointValuesDict] = None, weights: Optional[xr.DataArray] = None, out_path: Optional[Union[str, Path]] = None, **kwargs, ) -> xr.DataArray: """Create a Flow Accumulation Cell (FAC) raster from a ESRI format D8 Flow Direction Raster. NOTE: Replaces tools.tauFlowAccum() from V1 FCPGtools. Args: d8_fdr: A ESRI format D8 Flow Direction Raster (dtype=Int). upstream_pour_points: A list of lists each with with coordinate tuples as the first item [0], and updated cell values as the second [1]. This allows the FAC to be made with boundary conditions such as upstream basin pour points. weights: A grid defining the value to accumulate from each cell. Default is a grid of 1s. out_path: Defines a path to save the output raster. **kwargs: Can pass in optional pysheds.Grid.accumulation kwargs. Returns: The output Flow Accumulation Cells (FAC) raster. """ d8_fdr = tools.load_raster(d8_fdr) d8_fdr = d8_fdr.where( (d8_fdr.values != d8_fdr.rio.nodata), 0, ) pysheds_input_dict = PyShedsEngine._prep_fdr_for_pysheds(d8_fdr) # prep kwargs to be passed into accumulate_flow() if 'kwargs' in kwargs.keys(): kwargs = kwargs['kwargs'] # add weights if necessary if weights is not None or upstream_pour_points is not None: if weights is not None: pass elif upstream_pour_points is not None: weights = xr.zeros_like( d8_fdr, dtype=np.dtype('float64'), ) + 1 weights = tools.adjust_parameter_raster( weights, d8_fdr, upstream_pour_points, ) weights = PyShedsRaster( tools.make_fac_weights( weights, d8_fdr, np.nan, ).values, pysheds_input_dict['raster'].viewfinder, ) else: weights = None # apply accumulate function accumulate = pysheds_input_dict['grid'].accumulation( pysheds_input_dict['raster'], nodata_in=pysheds_input_dict['input_array'].rio.nodata, weights=weights, kwargs=kwargs, ) # export back to DataArray out_raster = PyShedsEngine._pysheds_to_xarray( pysheds_io_dict={ 'grid': pysheds_input_dict['grid'], 'raster': accumulate, 'input_array': pysheds_input_dict['input_array'], }, name='accumulate', ) # convert out of bounds values to np.nan out_raster = out_raster.where( d8_fdr.values != d8_fdr.rio.nodata, out_raster.rio.nodata, ) out_raster = utilities._change_nodata_value( out_raster, np.nan, ) # save if necessary if out_path is not None: tools.save_raster( out_raster, out_path, ) return out_raster
[docs] @staticmethod def accumulate_parameter( d8_fdr: Raster, parameter_raster: Raster, upstream_pour_points: Optional[PourPointValuesDict] = None, out_path: Optional[Union[str, Path]] = None, **kwargs, ) -> xr.DataArray: """Create a parameter accumulation raster from a ESRI format D8 Flow Direction Raster and a parameter raster. A key aspect of this function is that the output DataArray will have dimensions matching param:parameter_raster. NOTE: Replaces tools.accumulateParam() from V1 FCPGtools. Args: d8_fdr: A ESRI format D8 Flow Direction Raster (dtype=Int). parameter_raster: A parameter raster aligned via tools.align_raster() with the us_fdr. This can be multi-dimensional (i.e. f(x, y, t)), and if so, a multi-dimensional output is returned. upstream_pour_points: A list of lists each with with coordinate tuples as the first item [0], and updated cell values as the second [1]. This allows the FAC to be made with boundary conditions such as upstream basin pour points. out_path: Defines a path to save the output raster. **kwargs: Can pass in optional pysheds.Grid.accumulation kwargs. Returns: The output parameter accumulation raster. """ d8_fdr = tools.load_raster(d8_fdr) parameter_raster = tools.load_raster(parameter_raster) # add any pour point accumulation via utilities.tools.adjust_parameter_raster() if upstream_pour_points is not None: parameter_raster = tools.adjust_parameter_raster( parameter_raster, d8_fdr, upstream_pour_points, ) # prep kwargs to be passed into accumulate_flow() if 'kwargs' in kwargs.keys(): kwargs = kwargs['kwargs'] # split if multi-dimensional if len(parameter_raster.shape) > 2: raster_bands = utilities._split_bands(parameter_raster) else: raster_bands = {(0, 0): parameter_raster} # create weighted accumulation rasters out_dict = {} for index_tuple, array in raster_bands.items(): i, dim_name = index_tuple accumulated = PyShedsEngine.accumulate_flow( d8_fdr, upstream_pour_points=upstream_pour_points, weights=array, kwargs=kwargs, ) out_dict[(i, dim_name)] = accumulated.copy() # re-combine into DataArray if len(out_dict.keys()) > 1: out_raster = utilities._combine_split_bands(out_dict) else: out_raster = list(out_dict.items())[0][1] # save if necessary if out_path is not None: tools.save_raster( out_raster, out_path, ) return out_raster