Source code for pandora.interval_tools

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of PANDORA
#
#     https://github.com/CNES/Pandora
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions associated to confidence intervals.
"""

import os
from ast import literal_eval
from typing import Tuple

import numpy as np
from numba import njit, prange


@njit("b1[:,:](i8[:,:], i8[:,:], i8)", parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "True")))
[docs] def create_connected_graph(border_left: np.ndarray, border_right: np.ndarray, depth: int) -> np.ndarray: """ Create a boolean connection matrix from segment coordinates :param border_left: array containing the coordinates of segments left border :type border_left: (n, 2) np.ndarray where n is the number of segments :param border_right: array containing the coordinates of segments right border :type border_right: (n, 2) np.ndarray where n is the number of segments :param depth: the depth for regularization. It corresponds to the number of rows to explore below and above. :return: A symmetric boolean matrix of shape (n, n). 1 indicating that the segment are connected :rtype: np.ndarray of shape (n, n) """ # border_left and border_right are already sorted by argwhere # we only need to create a connection graph by looking at neighboors from below n_segments = len(border_left) if depth == 0: aggregated_graph = np.eye(n_segments, dtype=np.bool_) else: connection_graph = np.full((n_segments, n_segments), False, dtype=np.bool_) for i in prange(n_segments): # pylint: disable=not-an-iterable row_i = border_left[i, 0] for k in range(i + 1, n_segments): if border_left[k, 0] == row_i: continue if border_left[k, 0] > row_i + 1: break if (border_left[k, 1] <= border_right[i, 1]) & (border_right[k, 1] >= border_left[i, 1]): connection_graph[i, k] = connection_graph[k, i] = True aggregated_graph = np.full((n_segments, n_segments), False, dtype=np.bool_) for i in prange(connection_graph.shape[0]): # pylint: disable=not-an-iterable list_lines = connection_graph[i, :].copy() for _ in range(1, depth): new_points = connection_graph[list_lines, :].copy() for j in prange(connection_graph.shape[0]): # pylint: disable=not-an-iterable list_lines[j] = np.bitwise_or(new_points[:, j].any(), list_lines[j]) aggregated_graph[i, :] = list_lines.copy() aggregated_graph[i, i] = 1 return aggregated_graph
@njit( "Tuple([f4[:,:],f4[:,:],b1[:,:]])(f4[:,:],f4[:,:],i8[:,:],i8[:,:],b1[:,:],f8)", parallel=literal_eval(os.environ.get("PANDORA_NUMBA_PARALLEL", "True")), )
[docs] def graph_regularization( interval_inf: np.ndarray, interval_sup: np.ndarray, border_left: np.ndarray, border_right: np.ndarray, connection_graph: np.ndarray, quantile: float, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Regularize the intervals based on quantiles and a given connection graph. :param interval_inf: The lower estimation of the disparity to regularize :type interval_inf: (row, col) np.ndarray :param interval_sup: The upper estimation of the disparity to regularize :type interval_sup: (row, col) np.ndarray :param border_left: array containing the coordinates of segments left border :type border_left: (n, 2) np.ndarray where n is the number of segments :param border_right: array containing the coordinates of segments right border :type border_right: (n, 2) np.ndarray where n is the number of segments :param connection graph: A matrix indicating if the segments (n in total) are connected :type connection graph: (n, n) boolean symmetric np.ndarray :param quantile: Which quantile to select for the regularized value :type quantile: float. 0 <= quantile <= 1 :return: The regularized inf and sup of the disparity, and a boolean mask indicating regularization :rtype: Tuple[np.ndarray, np.ndarray, np.ndarray] """ interval_inf_reg = interval_inf.copy() interval_sup_reg = interval_sup.copy() mask_regularization = np.full(interval_inf.shape, False, dtype=np.bool_) for i in prange(connection_graph.shape[0]): # pylint: disable=not-an-iterable left_i, right_i = border_left[connection_graph[i, :]], border_right[connection_graph[i, :]] # Contains the lengths of the segments n_pixels = np.hstack((np.array([0]), (right_i[:, 1] - left_i[:, 1] + 1).cumsum())) agg_inf = np.full(n_pixels[-1], 0, dtype=np.float32) agg_sup = np.full(n_pixels[-1], 0, dtype=np.float32) for j in range(len(n_pixels) - 1): agg_inf[n_pixels[j] : n_pixels[j + 1]] = interval_inf[left_i[j, 0], left_i[j, 1] : right_i[j, 1] + 1] agg_sup[n_pixels[j] : n_pixels[j + 1]] = interval_sup[left_i[j, 0], left_i[j, 1] : right_i[j, 1] + 1] interval_inf_reg[border_left[i, 0], border_left[i, 1] : border_right[i, 1] + 1] = np.nanquantile( agg_inf, 1 - quantile ) interval_sup_reg[border_left[i, 0], border_left[i, 1] : border_right[i, 1] + 1] = np.nanquantile( agg_sup, quantile ) mask_regularization[border_left[i, 0], border_left[i, 1] : border_right[i, 1] + 1] = True return interval_inf_reg, interval_sup_reg, mask_regularization
[docs] def interval_regularization( interval_inf: np.ndarray, interval_sup: np.ndarray, ambiguity: np.ndarray, ambiguity_threshold: float, ambiguity_kernel_size: int, vertical_depth: int = 0, quantile_regularization: float = 1.0, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Regularize interval bounds in ambiguous zones. :param interval_inf: lower bound of the confidence interval :type cv: 2D np.ndarray (row, col) :param interval_sup: upper bound of the confidence interval :type cv: 2D np.ndarray (row, col) :param ambiguity: ambiguity confidence map :type cv: 2D np.ndarray (row, col) :param ambiguity_threshold: threshold used for detecting ambiguous zones :type ambiguity_threshold: float :param ambiguity_kernel_size: number of columns for the minimitive kernel applied to ambiguity :type ambiguity_kernel_size: int :param vertical_depth: The number of lines above and below to look for adjacent segment during the regularization :type vertical_depth: int >= 0 :param quantile_regularization: The quantile used for selecting the disparity value in the regularization step :type quantile_regularization: float between 0 and 1 :return: the regularized infimum and supremum of the set containing the true disparity and the mask of pixel that have been regularized :rtype: Tuple(2D np.array (row, col) dtype = float32, 2D np.array (row, col) dtype = float32, 2D np.array (row, col) dtype = np.bool) """ n_row, _ = ambiguity.shape pad = ambiguity_kernel_size // 2 minimized_conf_from_amb = np.hstack((np.ones((n_row, pad)), ambiguity, np.ones((n_row, pad)))) # H-stacking is to conserve the same shape after view. # Because we take the minimum afterwards, it needs to be ones minimized_conf_from_amb = np.nanmin( np.lib.stride_tricks.sliding_window_view(minimized_conf_from_amb, ambiguity_kernel_size, axis=1), axis=-1, ) # Final column to 1 and H-stacking is to unsure that # we always start with a left border and end with a right border minimized_conf_from_amb[:, -1] = 1 border = np.diff( np.hstack([np.ones((minimized_conf_from_amb.shape[0], 1)), minimized_conf_from_amb >= ambiguity_threshold]), axis=-1, ) border_left = np.argwhere(border == -1) border_right = np.argwhere(border == 1) # If border[i,j]==1, then minimized_conf_from_amb[i,j]>=ambiguity_threshold and we keep i, j-1 border_right[:, 1] = border_right[:, 1] - 1 graph = create_connected_graph(border_left, border_right, vertical_depth) return graph_regularization(interval_inf, interval_sup, border_left, border_right, graph, quantile_regularization)