#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2026 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of PANDORA
#
# https://github.com/CNES/Pandora_pandora
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions for estimating the risk.
"""
from typing import Dict, Tuple, Union
import numpy as np
import xarray as xr
from json_checker import And, Checker
from pandora.profiler import profile
from .cpp import cost_volume_confidence_cpp
from . import cost_volume_confidence
@cost_volume_confidence.AbstractCostVolumeConfidence.register_subclass("risk")
[docs]
class Risk(cost_volume_confidence.AbstractCostVolumeConfidence):
"""
Allows to estimate a risk confidence from the cost volume
"""
# Default configuration, do not change this value
# Percentile value to normalize ambiguity
# Method name
[docs]
_method_max = "risk_max"
[docs]
_method_min = "risk_min"
[docs]
_method_disp_inf = "disp_inf_from_risk"
[docs]
_method_disp_sup = "disp_sup_from_risk"
@profile("risk.__init__")
def __init__(self, **cfg: str) -> None:
"""
:param cfg: optional configuration, {'confidence_method': 'risk', 'eta_min': float, 'eta_max': float,
'eta_step': float}
:type cfg: dict
:return: None
"""
[docs]
self.cfg = self.check_conf(**cfg)
[docs]
self._eta_min = self._ETA_MIN
[docs]
self._percentile = self._PERCENTILE
[docs]
self._eta_step = float(self.cfg["eta_step"])
[docs]
self._eta_max = float(self.cfg["eta_max"])
[docs]
self._indicator_max = self._method_max + str(self.cfg["indicator"])
[docs]
self._indicator_min = self._method_min + str(self.cfg["indicator"])
[docs]
self._indicator_disp_sup = self._method_disp_sup + str(self.cfg["indicator"])
[docs]
self._indicator_disp_inf = self._method_disp_inf + str(self.cfg["indicator"])
[docs]
self._etas = np.arange(self._eta_min, self._eta_max, self._eta_step)
[docs]
self._nbr_etas = self._etas.shape[0]
[docs]
def check_conf(self, **cfg: Union[str, float]) -> Dict[str, Union[str, float]]:
"""
Add default values to the dictionary if there are missing elements and check if the dictionary is correct
:param cfg: ambiguity configuration
:type cfg: dict
:return cfg: ambiguity configuration updated
:rtype: dict
"""
if "eta_max" not in cfg:
cfg["eta_max"] = self._ETA_MAX
if "eta_step" not in cfg:
cfg["eta_step"] = self._ETA_STEP
if "indicator" not in cfg:
cfg["indicator"] = self._indicator
schema = {
"confidence_method": And(str, lambda input: "risk"),
"eta_max": And(float, lambda input: 0 < input < 1),
"eta_step": And(float, lambda input: 0 < input < 1),
"indicator": str,
}
checker = Checker(schema)
checker.validate(cfg)
return cfg
[docs]
def desc(self) -> None:
"""
Describes the confidence method
:return: None
"""
print("Risk method")
@profile("risk.confidence_prediction")
[docs]
def confidence_prediction(
self,
disp: xr.Dataset,
img_left: xr.Dataset = None,
img_right: xr.Dataset = None,
cv: xr.Dataset = None,
) -> Tuple[xr.Dataset, xr.Dataset]:
"""
Computes a risk confidence measure that evaluates the matching cost function at each point
:param disp: the disparity map dataset
:type disp: xarray.Dataset
:param img_left: left Dataset image
:tye img_left: xarray.Dataset
:param img_right: right Dataset image
:type img_right: xarray.Dataset
:param cv: cost volume dataset
:type cv: xarray.Dataset
:return: the disparity map and the cost volume with new indicators 'risk_max_confidence'
and 'risk_min_confidence' in the
:rtype: Tuple(xarray.Dataset, xarray.Dataset)
"""
type_measure_max = cv.attrs["type_measure"] == "max"
if type_measure_max:
cv["cost_volume"].data *= -1
grids = np.array(
[img_left["disparity"].sel(band_disp="min"), img_left["disparity"].sel(band_disp="max")], dtype=np.int64
)
# Get disparity intervals parameters
disparity_range = cv["disp"].data.astype(np.float32)
_, sampled_ambiguity = cost_volume_confidence_cpp.compute_ambiguity_and_sampled_ambiguity(
cv["cost_volume"].data, self._etas, self._nbr_etas, grids, disparity_range, True
)
risk_max, risk_min, disp_sup, disp_inf = self.compute_risk(
cv["cost_volume"].data,
sampled_ambiguity,
self._etas,
self._nbr_etas,
grids,
disparity_range,
)
disp, cv = self.allocate_confidence_map(self._indicator_max, risk_max, disp, cv)
disp, cv = self.allocate_confidence_map(self._indicator_min, risk_min, disp, cv)
disp, cv = self.allocate_confidence_map(self._indicator_disp_sup, disp_sup, disp, cv)
disp, cv = self.allocate_confidence_map(self._indicator_disp_inf, disp_inf, disp, cv)
if type_measure_max:
cv["cost_volume"].data *= -1
return disp, cv
@staticmethod
[docs]
def compute_risk(
cv: np.ndarray,
sampled_ambiguity: np.ndarray,
etas: np.ndarray,
nbr_etas: int,
grids: np.ndarray,
disparity_range: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Computes minimum and maximum risk.
Cost Volume must correspond to min similarity measure.
:param cv: cost volume
:type cv: 3D np.ndarray (row, col, disp)
:param sampled_ambiguity: sampled cost volume ambiguity
:type sampled_ambiguity: 3D np.ndarray (row, col, eta)
:param etas: range between eta_min and eta_max with step eta_step
:type etas: np.ndarray
:param nbr_etas: number of etas
:type nbr_etas: int
:param grids: array containing min and max disparity grids
:type grids: 2D np.ndarray (min, max)
:param disparity_range: array containing disparity range
:type disparity_range: np.ndarray
:return: the minimum and maximum risk
:rtype: Tuple(2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32\
2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32)
"""
return cost_volume_confidence_cpp.compute_risk_and_sampled_risk(
cv, sampled_ambiguity, etas, nbr_etas, grids, disparity_range, False
)
@staticmethod
[docs]
def compute_risk_and_sampled_risk(
cv: np.ndarray,
sampled_ambiguity: np.ndarray,
etas: np.ndarray,
nbr_etas: int,
grids: np.ndarray,
disparity_range: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Computes minimum and maximum risk and sampled_risk.
Cost Volume must correspond to min similarity measure.
:param cv: cost volume
:type cv: 3D np.ndarray (row, col, disp)
:param sampled_ambiguity: sampled cost volume ambiguity
:type sampled_ambiguity: 3D np.ndarray (row, col, eta)
:param etas: range between eta_min and eta_max with step eta_step
:type etas: np.ndarray
:param nbr_etas: nuber of etas
:type nbr_etas: int
:param grids: array containing min and max disparity grids
:type grids: 2D np.ndarray (min, max)
:param disparity_range: array containing disparity range
:type disparity_range: np.ndarray
:return: the risk
:rtype: Tuple(2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32,
2D np.ndarray (row, col) dtype = float32, 2D np.ndarray (row, col) dtype = float32,
3D np.ndarray (row, col) dtype = float32, 3D np.ndarray (row, col) dtype = float32)
"""
return cost_volume_confidence_cpp.compute_risk_and_sampled_risk(
cv, sampled_ambiguity, etas, nbr_etas, grids, disparity_range, True
)