#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of PANDORA
#
# https://github.com/CNES/Pandora_pandora
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions for estimating the risk.
"""
import os
import warnings
from typing import Dict, Tuple, Union
from ast import literal_eval
import numpy as np
from json_checker import Checker, And
from numba import njit, prange
import xarray as xr
from . import cost_volume_confidence
@cost_volume_confidence.AbstractCostVolumeConfidence.register_subclass("risk")
[docs]
class Risk(cost_volume_confidence.AbstractCostVolumeConfidence):
"""
Allows to estimate a risk confidence from the cost volume
"""
# Default configuration, do not change this value
# Percentile value to normalize ambiguity
# Method name
[docs]
_method_max = "risk_max"
[docs]
_method_min = "risk_min"
# Indicator
def __init__(self, **cfg: str) -> None:
"""
:param cfg: optional configuration, {'confidence_method': 'risk', 'eta_min': float, 'eta_max': float,
'eta_step': float}
:type cfg: dict
:return: None
"""
self.cfg = self.check_conf(**cfg)
self._eta_min = self._ETA_MIN
self._percentile = self._PERCENTILE
self._eta_step = float(self.cfg["eta_step"])
self._eta_max = float(self.cfg["eta_max"])
self._indicator_max = self._method_max + str(self.cfg["indicator"])
self._indicator_min = self._method_min + str(self.cfg["indicator"])
self._etas = np.arange(self._eta_min, self._eta_max, self._eta_step)
self._nbr_etas = self._etas.shape[0]
[docs]
def check_conf(self, **cfg: Union[str, float]) -> Dict[str, Union[str, float]]:
"""
Add default values to the dictionary if there are missing elements and check if the dictionary is correct
:param cfg: ambiguity configuration
:type cfg: dict
:return cfg: ambiguity configuration updated
:rtype: dict
"""
if "eta_max" not in cfg:
cfg["eta_max"] = self._ETA_MAX
if "eta_step" not in cfg:
cfg["eta_step"] = self._ETA_STEP
if "indicator" not in cfg:
cfg["indicator"] = self._indicator
schema = {
"confidence_method": And(str, lambda input: "risk"),
"eta_max": And(float, lambda input: 0 < input < 1),
"eta_step": And(float, lambda input: 0 < input < 1),
"indicator": str,
}
checker = Checker(schema)
checker.validate(cfg)
return cfg
[docs]
def desc(self) -> None:
"""
Describes the confidence method
:return: None
"""
print("Risk method")
[docs]
def confidence_prediction(
self,
disp: xr.Dataset,
img_left: xr.Dataset = None,
img_right: xr.Dataset = None,
cv: xr.Dataset = None,
) -> Tuple[xr.Dataset, xr.Dataset]:
"""
Computes a risk confidence measure that evaluates the matching cost function at each point
:param disp: the disparity map dataset
:type disp: xarray.Dataset
:param img_left: left Dataset image
:tye img_left: xarray.Dataset
:param img_right: right Dataset image
:type img_right: xarray.Dataset
:param cv: cost volume dataset
:type cv: xarray.Dataset
:return: the disparity map and the cost volume with new indicators 'risk_max_confidence'
and 'risk_min_confidence' in the
:rtype: Tuple(xarray.Dataset, xarray.Dataset)
"""
grids = np.array(
[img_left["disparity"].sel(band_disp="min"), img_left["disparity"].sel(band_disp="max")], dtype=np.int64
)
# Get disparity intervals parameters
disparity_range = cv["disp"].data.astype(np.float32)
# This silences numba's TBB threading layer warning
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
# Obtain sampled_ambiguity, necessary for risk_min computation
ambiguity_ = cost_volume_confidence.AbstractCostVolumeConfidence( # type: ignore
**{"confidence_method": "ambiguity"} # type: ignore
)
_, sampled_ambiguity = ambiguity_.compute_ambiguity_and_sampled_ambiguity( # type: ignore
cv["cost_volume"].data, self._etas, self._nbr_etas, grids, disparity_range
)
# Computes risk using numba in parallel for memory and computation time optimization
risk_max, risk_min = self.compute_risk(
cv["cost_volume"].data,
sampled_ambiguity,
self._etas,
self._nbr_etas,
grids,
disparity_range,
)
disp, cv = self.allocate_confidence_map(self._indicator_max, risk_max, disp, cv)
disp, cv = self.allocate_confidence_map(self._indicator_min, risk_min, disp, cv)
return disp, cv
@staticmethod
@njit(
"Tuple((f4[:, :],f4[:, :]))(f4[:, :, :], f4[:, :, :], f8[:], i8, i8[:, :, :], f4[:])",
parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "True")),
cache=True,
)
[docs]
def compute_risk(
cv: np.ndarray,
sampled_ambiguity: np.ndarray,
etas: np.ndarray,
nbr_etas: int,
grids: np.ndarray,
disparity_range: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Computes minimum and maximum risk.
:param cv: cost volume
:type cv: 3D np.ndarray (row, col, disp)
:param sampled_ambiguity: sampled cost volume ambiguity
:type sampled_ambiguity: 3D np.ndarray (row, col, eta)
:param etas: range between eta_min and eta_max with step eta_step
:type etas: np.ndarray
:param nbr_etas: number of etas
:type nbr_etas: int
:param grids: array containing min and max disparity grids
:type grids: 2D np.ndarray (min, max)
:param disparity_range: array containing disparity range
:type disparity_range: np.ndarray
:return: the minimum and maximum risk
:rtype: Tuple(2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32)
"""
# Minimum and maximum of all costs, useful to normalize the cost volume
min_cost = np.nanmin(cv)
max_cost = np.nanmax(cv)
n_row, n_col, nb_disps = cv.shape
# Numba does not support the np.tile operation
two_dim_etas = np.repeat(etas, nb_disps).reshape((-1, nb_disps)).T.flatten()
# Initialize min and max risk integral
risk_max = np.zeros((n_row, n_col), dtype=np.float32)
risk_min = np.zeros((n_row, n_col), dtype=np.float32)
for row in prange(n_row): # pylint: disable=not-an-iterable
for col in prange(n_col): # pylint: disable=not-an-iterable
# Normalized minimum cost for one point
normalized_min_cost = (np.nanmin(cv[row, col, :]) - min_cost) / (max_cost - min_cost)
# If all costs are at nan, set the risk at nan for this point
if np.isnan(normalized_min_cost):
risk_max[row, col] = np.nan
risk_min[row, col] = np.nan
else:
normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * nbr_etas)
# Normalized cost volume for one point
normalized_cv = (cv[row, col, :] - min_cost) / (max_cost - min_cost)
# Mask nan to -inf to later discard values out of [min; min + eta]
idx_disp_min = np.searchsorted(disparity_range, grids[0][row, col])
idx_disp_max = np.searchsorted(disparity_range, grids[1][row, col]) + 1
normalized_cv[idx_disp_min:idx_disp_max][
np.isnan(normalized_cv[idx_disp_min:idx_disp_max])
] = -np.inf
normalized_cv = np.repeat(normalized_cv, nbr_etas)
# Initialize all disparities
disp_cv = np.arange(nb_disps, dtype=np.float32)
disp_cv = np.repeat(disp_cv, nbr_etas)
# Remove disparities for every similarity value outside [min;min+eta[
disp_cv[normalized_cv > (normalized_min_cost + two_dim_etas)] = np.nan
# Reshape to distinguish each sample's disparity range
disp_cv = disp_cv.reshape((nb_disps, nbr_etas))
# Initialize minimum and maximum disparities
min_disp = np.zeros(nbr_etas)
max_disp = np.zeros(nbr_etas)
# Obtain min and max disparities for each sample
for i in range(nbr_etas):
min_disp[i] = np.nanmin(disp_cv[:, i])
max_disp[i] = np.nanmax(disp_cv[:, i])
# fill mean risks
risk_max[row, col] = np.nanmean(max_disp - min_disp)
# fill mean min risk. risk min is defined as mean( (1+risk(p,k)) - amb(p,k) )
risk_min[row, col] = np.nanmean((1 + (max_disp - min_disp)) - sampled_ambiguity[row, col, :])
return risk_max, risk_min
@staticmethod
@njit(
"Tuple((f4[:, :],f4[:, :],f4[:, :, :],f4[:, :, :]))(f4[:, :, :], f4[:, :, :], f8[:], i8, i8[:, :, :], f4[:])",
parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "True")),
cache=True,
)
[docs]
def compute_risk_and_sampled_risk(
cv: np.ndarray,
sampled_ambiguity: np.ndarray,
etas: np.ndarray,
nbr_etas: int,
grids: np.ndarray,
disparity_range: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Computes minimum and maximum risk and sampled_risk.
:param cv: cost volume
:type cv: 3D np.ndarray (row, col, disp)
:param sampled_ambiguity: sampled cost volume ambiguity
:type sampled_ambiguity: 3D np.ndarray (row, col, eta)
:param etas: range between eta_min and eta_max with step eta_step
:type etas: np.ndarray
:param nbr_etas: nuber of etas
:type nbr_etas: int
:param grids: array containing min and max disparity grids
:type grids: 2D np.ndarray (min, max)
:param disparity_range: array containing disparity range
:type disparity_range: np.ndarray
:return: the risk
:rtype: Tuple(2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32,
3D np.ndarray (row, col) dtype = float32, 3D np.ndarray (row, col) dtype = float32)
"""
# Minimum and maximum of all costs, useful to normalize the cost volume
min_cost = np.nanmin(cv)
max_cost = np.nanmax(cv)
n_row, n_col, nb_disps = cv.shape
# Numba does not support the np.tile operation
two_dim_etas = np.repeat(etas, nb_disps).reshape((-1, nb_disps)).T.flatten()
# Initialize min and max risk integral
risk_max = np.zeros((n_row, n_col), dtype=np.float32)
risk_min = np.zeros((n_row, n_col), dtype=np.float32)
# Initialize min and max sampled risks
sampled_risk_min = np.zeros((n_row, n_col, nbr_etas), dtype=np.float32)
sampled_risk_max = np.zeros((n_row, n_col, nbr_etas), dtype=np.float32)
for row in prange(n_row): # pylint: disable=not-an-iterable
for col in prange(n_col): # pylint: disable=not-an-iterable
# Normalized minimum cost for one point
normalized_min_cost = (np.nanmin(cv[row, col, :]) - min_cost) / (max_cost - min_cost)
# If all costs are at nan, set the risk at nan for this point
if np.isnan(normalized_min_cost):
sampled_risk_min[row, col, :] = np.nan
sampled_risk_max[row, col, :] = np.nan
risk_max[row, col] = np.nan
risk_min[row, col] = np.nan
else:
normalized_min_cost = np.repeat(normalized_min_cost, nb_disps * nbr_etas)
# Normalized cost volume for one point
normalized_cv = (cv[row, col, :] - min_cost) / (max_cost - min_cost)
# Mask nan to -inf to later discard values out of [min; min + eta]
idx_disp_min = np.searchsorted(disparity_range, grids[0][row, col])
idx_disp_max = np.searchsorted(disparity_range, grids[1][row, col]) + 1
normalized_cv[idx_disp_min:idx_disp_max][
np.isnan(normalized_cv[idx_disp_min:idx_disp_max])
] = -np.inf
normalized_cv[:idx_disp_min][np.isnan(normalized_cv[:idx_disp_min])] = np.nan
normalized_cv[idx_disp_max:][np.isnan(normalized_cv[idx_disp_max:])] = np.nan
normalized_cv = np.repeat(normalized_cv, nbr_etas)
# Initialize all disparities
disp_cv = np.arange(nb_disps, dtype=np.float32)
disp_cv = np.repeat(disp_cv, nbr_etas)
# Remove disparities for every similarity value outside [min;min+eta[
disp_cv[normalized_cv > (normalized_min_cost + two_dim_etas)] = np.nan
# Reshape to distinguish each sample's disparity range
disp_cv = disp_cv.reshape((nb_disps, nbr_etas))
# Initialize minimum and maximum disparities
min_disp = np.zeros(nbr_etas)
max_disp = np.zeros(nbr_etas)
# Obtain min and max disparities for each sample
for i in range(nbr_etas):
min_disp[i] = np.nanmin(disp_cv[:, i])
max_disp[i] = np.nanmax(disp_cv[:, i])
# fill sampled max risk
sampled_risk_max[row, col, :] += max_disp - min_disp
# fill sampled min risk. risk min is defined as ( (1+risk(p,k)) - amb(p,k) )
sampled_risk_min[row, col, :] += (1 + (max_disp - min_disp)) - sampled_ambiguity[row, col, :]
# fill mean risks
risk_max[row, col] = np.nanmean(max_disp - min_disp)
# fill mean min risk. risk min is defined as mean( (1+risk(p,k)) - amb(p,k) )
risk_min[row, col] = np.nanmean((1 + (max_disp - min_disp)) - sampled_ambiguity[row, col, :])
return risk_max, risk_min, sampled_risk_max, sampled_risk_min