Source code for pandora.aggregation.cbca

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of PANDORA
#
#     https://github.com/CNES/Pandora
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions associated to the Cross Based Cost Aggregation (cbca) method.
"""

from typing import Dict, Union, Tuple, List
import os
from ast import literal_eval

import numpy as np
import xarray as xr
from json_checker import Checker, And
from numba import njit

from pandora.filter import AbstractFilter
from pandora.img_tools import shift_right_img
from . import aggregation


@aggregation.AbstractAggregation.register_subclass("cbca")
[docs] class CrossBasedCostAggregation(aggregation.AbstractAggregation): """ CrossBasedCostAggregation class, allows to perform the aggregation step """ # Default configuration, do not change these values
[docs] _CBCA_INTENSITY = 30.0
[docs] _CBCA_DISTANCE = 5
def __init__(self, **cfg: dict): """ :param cfg: optional configuration, {'cbca_intensity': value, 'cbca_distance': value} :type cfg: dict """
[docs] self.cfg = self.check_conf(**cfg) # type: ignore
[docs] self._cbca_intensity = self.cfg["cbca_intensity"]
[docs] self._cbca_distance = self.cfg["cbca_distance"]
[docs] def check_conf(self, **cfg: Union[str, float, int]) -> Dict[str, Union[str, float, int]]: """ Add default values to the dictionary if there are missing elements and check if the dictionary is correct :param cfg: aggregation configuration :type cfg: dict :return cfg: aggregation configuration updated :rtype: dict """ # Give the default value if the required element is not in the configuration if "cbca_intensity" not in cfg: cfg["cbca_intensity"] = self._CBCA_INTENSITY if "cbca_distance" not in cfg: cfg["cbca_distance"] = self._CBCA_DISTANCE schema = { "aggregation_method": And(str, lambda input: "cbca"), "cbca_intensity": And(float, lambda input: input > 0), "cbca_distance": And(int, lambda input: input > 0), } checker = Checker(schema) checker.validate(cfg) return cfg
[docs] def desc(self): """ Describes the aggregation method """ print("CrossBasedCostAggregation method")
[docs] def cost_volume_aggregation( self, img_left: xr.Dataset, img_right: xr.Dataset, cv: xr.Dataset, **cfg: Union[str, int] ) -> None: """ Aggregated the cost volume with Cross-Based Cost Aggregation, using the pipeline define in Zhang, K., Lu, J., & Lafruit, G. (2009). Cross-based local stereo matching using orthogonal integral images. IEEE transactions on circuits and systems for video technology, 19(7), 1073-1079. :param img_left: left Dataset image containing : - im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32 - disparity (optional): 3D (disp, row, col) xarray.DataArray float32 - msk (optional): 2D (row, col) xarray.DataArray int16 - classif (optional): 3D (band_classif, row, col) xarray.DataArray int16 - segm (optional): 2D (row, col) xarray.DataArray int16 :type img_left: xarray.Dataset :param img_right: right Dataset image containing : - im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32 - disparity (optional): 3D (disp, row, col) xarray.DataArray float32 - msk (optional): 2D (row, col) xarray.DataArray int16 - classif (optional): 3D (band_classif, row, col) xarray.DataArray int16 - segm (optional): 2D (row, col) xarray.DataArray int16 :type img_right: xarray.Dataset :param cv: cost volume dataset with the data variables: - cost_volume 3D xarray.DataArray (row, col, disp) - confidence_measure 3D xarray.DataArray (row, col, indicator) :type cv: xarray.Dataset :param cfg: images configuration containing the mask convention : valid_pixels, no_data :type cfg: dict :return: None """ cross_left, cross_right = self.computes_cross_supports(img_left, img_right, cv) offset = int(cv.attrs["offset_row_col"]) # Cost volume has input image size, if offset > 0 do not consider the marge if offset > 0: cv_data = cv["cost_volume"].data[offset:-offset, offset:-offset] else: cv_data = cv["cost_volume"].data n_col_, n_row_, nb_disp = cv_data.shape # Allocate the numpy aggregated cost volume cv = (disp, col, row), for efficient memory management agg = np.zeros((nb_disp, n_row_, n_col_), dtype=np.float32) # Add invalid costs (i.e = np.nan ) to the output aggregated cost volume (because the step 1 of cbca do not # propagate invalid pixels, we need to retrieve them at the end of aggregation ) # Much faster than : # id_nan = np.isnan(cv['cost_volume'].data) # compute the aggregation .. # cv['cost_volume'].data[id_nan] = np.nan agg += np.swapaxes(cv_data, 0, 2) agg *= 0 disparity_range = cv.coords["disp"].data range_col = np.arange(0, n_row_) for dsp in range(nb_disp): i_right = int((disparity_range[dsp] % 1) * cv.attrs["subpixel"]) # Step 1 : horizontal integral image step1 = cbca_step_1(cv_data[:, :, dsp]) range_col_right = range_col + disparity_range[dsp] valid_index = np.where((range_col_right >= 0) & (range_col_right < cross_right[i_right].shape[1])) # Step 2 : horizontal matching cost step2, sum2 = cbca_step_2( step1, cross_left, cross_right[i_right], range_col[valid_index], range_col_right[valid_index].astype(int), ) # Step 3 : vertical integral image step3 = cbca_step_3(step2) # Step 4 : aggregate cost volume step4, sum4 = cbca_step_4( step3, sum2, cross_left, cross_right[i_right], range_col[valid_index], range_col_right[valid_index].astype(int), ) # Added the pixel anchor pixel to the number of support pixels used during the aggregation sum4 += 1 # Add the aggregate cost to the output agg[dsp, :, :] += np.swapaxes(step4, 0, 1) # Normalize the aggregated cost agg[dsp, :, :] /= np.swapaxes(sum4, 0, 1) cv_data = np.swapaxes(agg, 0, 2) if offset > 0: cv["cost_volume"].data[offset:-offset, offset:-offset] = cv_data else: cv["cost_volume"].data = cv_data cv.attrs["aggregation"] = "cbca" # Maximal cost of the cost volume after agregation cmax = cv.attrs["cmax"] * ((self._cbca_distance * 2) - 1) ** 2 # type: ignore cv.attrs["cmax"] = cmax
[docs] def computes_cross_supports( self, img_left: xr.Dataset, img_right: xr.Dataset, cv: xr.Dataset ) -> Tuple[np.ndarray, List[np.ndarray]]: """ Prepare images and compute the cross support region of the left and right images. A 3x3 median filter is applied to the images before calculating the cross support region. :param img_left: left Dataset image containing : - im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32 - disparity (optional): 3D (disp, row, col) xarray.DataArray float32 - msk (optional): 2D (row, col) xarray.DataArray int16 - classif (optional): 3D (band_classif, row, col) xarray.DataArray int16 - segm (optional): 2D (row, col) xarray.DataArray int16 :type img_left: xarray.Dataset :param img_right: right Dataset image containing : - im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32 - disparity (optional): 3D (disp, row, col) xarray.DataArray float32 - msk (optional): 2D (row, col) xarray.DataArray int16 - classif (optional): 3D (band_classif, row, col) xarray.DataArray int16 - segm (optional): 2D (row, col) xarray.DataArray int16 :type img_right: xarray.Dataset :param cv: cost volume dataset with the data variables: - cost_volume 3D xarray.DataArray (row, col, disp) - confidence_measure 3D xarray.DataArray (row, col, indicator) :type cv: xarray.Dataset :return: the left and right cross support region :rtype: Tuples(left cross support region, List(right cross support region)) """ subpix = cv.attrs["subpixel"] offset = int(cv.attrs["offset_row_col"]) # shift the right image img_right_shift = shift_right_img(img_right, subpix) # Median filter on valid pixels filter_ = AbstractFilter(cfg={"filter_method": "median", "filter_size": 3}) # type: ignore # Invalid and no data pixels are masked with np.nan to avoid propagating the values with the median filter left_masked = np.copy(img_left["im"].data) if "msk" in img_left.data_vars: left_masked[np.where(img_left["msk"].data != img_left.attrs["valid_pixels"])] = np.nan left_masked = filter_.median_filter(left_masked) # type: ignore # Convert nan to inf to be able to use the comparison operators < and > in cross_support function np.nan_to_num(left_masked, copy=False, nan=np.inf) # Compute left cross support using numba to reduce running time if offset != 0: # Cross support to the size of the cost volume cross_left = cross_support( left_masked[offset:-offset, offset:-offset], self._cbca_distance, self._cbca_intensity, ) else: cross_left = cross_support(left_masked, self._cbca_distance, self._cbca_intensity) # Compute the right cross support. Apply a 3×3 median filter to the input image cross_right = [] for shift, img in enumerate(img_right_shift): # Invalid and nodata pixels are masked with np.nan to avoid propagating the values with the median filter right_masked = np.copy(img["im"].data) # Pixel precision if ("msk" in img_right.data_vars) and (shift == 0): right_masked[np.where(img_right["msk"].data != img_right.attrs["valid_pixels"])] = np.nan # Subpixel precision : computes the shifted right mask if ("msk" in img_right.data_vars) and (shift != 0): shift_mask = np.zeros(img_right["msk"].data.shape) shift_mask[np.where(img_right["msk"].data != img_right.attrs["valid_pixels"])] = np.nan # Since the interpolation of the right image is of order 1, the shifted right mask corresponds # to an aggregation of two columns of the right mask # Create a sliding window of shape 2 using as_strided function : this function create a new a view (by # manipulating data pointer)of the shift_mask array with a different shape. The new view pointing to the # same memory block as shift_mask so it does not consume any additional memory. ( # pylint: disable=unpacking-non-sequence str_row, str_col, ) = shift_mask.strides shape_windows = (shift_mask.shape[0], shift_mask.shape[1] - 1, 2) strides_windows = (str_row, str_col, str_col) aggregation_window = np.lib.stride_tricks.as_strided( shift_mask, shape_windows, strides_windows, writeable=False ) shift_mask = np.sum(aggregation_window, 2) right_masked += shift_mask # Apply a 3×3 median filter to the input image right_masked = filter_.median_filter(right_masked) # type: ignore # Convert nan to inf to be able to use the comparison operators < and > in cross_support function np.nan_to_num(right_masked, copy=False, nan=np.inf) # Compute right cross support using numba to reduce running time if offset != 0: # Cross support to the size of the cost volume cross_right.append( cross_support( right_masked[offset:-offset, offset:-offset], self._cbca_distance, self._cbca_intensity, ) ) else: cross_right.append(cross_support(right_masked, self._cbca_distance, self._cbca_intensity)) return cross_left, cross_right
@njit("f4[:, :](f4[:, :])", cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")))
[docs] def cbca_step_1(cv: np.ndarray) -> np.ndarray: """ Giving the matching cost for one disparity, build a horizontal integral image storing the cumulative row sum, S_h(row, col) = S_h(row-1, col) + cv(row, col) :param cv: cost volume for the current disparity :type cv: 2D np.array (row, col) dtype = np.float32 :return: the horizontal integral image, step 1 :rtype: 2D np.array (row, col + 1) dtype = np.float32 """ n_col_, n_row_ = cv.shape # Allocate the intermediate cost volume S_h # added a column to manage the case in the step 2 : row - left_arm_length -1 = -1 step1 = np.zeros((n_col_, n_row_ + 1), dtype=np.float32) for col in range(n_col_): for row in range(n_row_): # Do not propagate nan if not np.isnan(cv[col, row]): step1[col, row] = step1[col, row - 1] + cv[col, row] else: step1[col, row] = step1[col, row - 1] return step1
@njit( "(f4[:, :], i2[:, :, :], i2[:, :, :], i8[:], i8[:])", cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")), )
[docs] def cbca_step_2( step1: np.ndarray, cross_left: np.ndarray, cross_right: np.ndarray, range_col: np.ndarray, range_col_right: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray]: """ Giving the horizontal integral image, computed the horizontal matching cost for one disparity, E_h(row, col) = S_h(row + right_arm_length, col) - S_h(row - left_arm_length -1, col) :param step1: horizontal integral image, from the cbca_step1, with an extra column that contains 0 :type step1: 2D np.array (row, col + 1) dtype = np.float32 :param cross_left: cross support of the left image :type cross_left: 3D np.array (row, col, [left, right, top, bot]) dtype=np.int16 :param cross_right: cross support of the right image :type cross_right: 3D np.array (row, col, [left, right, tpo, bot]) dtype=np.int16 :param range_col: left column for the current disparity (i.e : np.arrange(nb columns), where the correspondent \ in the right image is reachable) :type range_col: 1D np.array :param range_col_right: right column for the current disparity (i.e : np.arrange(nb columns) - disparity, where \ column - disparity >= 0 and <= nb columns) :type range_col_right: 1D np.array :return: the horizontal matching cost for the current disparity, and the number of support pixels used for the \ step 2 :rtype: tuple (2D np.array (row, col) dtype = np.float32, 2D np.array (row, col) dtype = np.float32) """ n_col_, n_row_ = step1.shape # Allocate the intermediate cost volume E_h # , remove the extra column from the step 1 step2 = np.zeros((n_col_, n_row_ - 1), dtype=np.float32) sum_step2 = np.zeros((n_col_, n_row_ - 1), dtype=np.float32) for col in range(step1.shape[0]): for row in range(range_col.shape[0]): right = min( cross_left[col, range_col[row], 1], cross_right[col, range_col_right[row], 1], ) left = min( cross_left[col, range_col[row], 0], cross_right[col, range_col_right[row], 0], ) step2[col, range_col[row]] = step1[col, range_col[row] + right] - step1[col, range_col[row] - left - 1] sum_step2[col, range_col[row]] += right + left return step2, sum_step2
@njit("f4[:, :](f4[:, :])", cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")))
[docs] def cbca_step_3(step2: np.ndarray) -> np.ndarray: """ Giving the horizontal matching cost, build a vertical integral image for one disparity, S_v = S_v(row, col - 1) + E_h(row, col) :param step2: horizontal matching cost, from the cbca_step2 :type step2: 3D xarray.DataArray (row, col, disp) :return: the vertical integral image for the current disparity :rtype: 2D np.array (row + 1, col) dtype = np.float32 """ n_col_, n_row_ = step2.shape # Allocate the intermediate cost volume S_v # added a row to manage the case in the step 4 : col - up_arm_length -1 = -1 step3 = np.zeros((n_col_ + 1, n_row_), dtype=np.float32) step3[0, :] = step2[0, :] for col in range(1, n_col_): for row in range(n_row_): step3[col, row] = step3[col - 1, row] + step2[col, row] return step3
@njit( "(f4[:, :], f4[:, :], i2[:, :, :], i2[:, :, :], i8[:], i8[:])", cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")), )
[docs] def cbca_step_4( step3: np.ndarray, sum2: np.ndarray, cross_left: np.ndarray, cross_right: np.ndarray, range_col: np.ndarray, range_col_right: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray]: """ Giving the vertical integral image, build the fully aggregated matching cost for one disparity, E = S_v(row, col + bottom_arm_length) - S_v(row, col - top_arm_length - 1) :param step3: vertical integral image, from the cbca_step3, with an extra row that contains 0 :type step3: 2D np.array (row + 1, col) dtype = np.float32 :param sum2: the number of support pixels used for the step 2 :type sum2: 2D np.array (row, col) dtype = np.float32 :param cross_left: cross support of the left image :type cross_left: 3D np.array (row, col, [left, right, top, bot]) dtype=np.int16 :param cross_right: cross support of the right image :type cross_right: 3D np.array (row, col, [left, right, tpo, bot]) dtype=np.int16 :param range_col: left column for the current disparity (i.e : np.arrange(nb columns), where the correspondent \ in the right image is reachable) :type range_col: 1D np.array :param range_col_right: right column for the current disparity (i.e : np.arrange(nb columns) - disparity, where \ column - disparity >= 0 and <= nb columns) :type range_col_right: 1D np.array :return: the fully aggregated matching cost, and the total number of support pixels used for the aggregation :rtype: tuple(2D np.array (row , col) dtype = np.float32, 2D np.array (row , col) dtype = np.float32) """ n_col_, n_row_ = step3.shape # Allocate the final cost volume E # , remove the extra row from the step 3 step4 = np.zeros((n_col_ - 1, n_row_), dtype=np.float32) sum4 = np.copy(sum2) for col in range(step4.shape[0]): for row in range(range_col.shape[0]): top = min( cross_left[col, range_col[row], 2], cross_right[col, range_col_right[row], 2], ) bot = min( cross_left[col, range_col[row], 3], cross_right[col, range_col_right[row], 3], ) step4[col, range_col[row]] = step3[col + bot, range_col[row]] - step3[col - top - 1, range_col[row]] sum4[col, range_col[row]] += top + bot if top != 0: sum4[col, range_col[row]] += np.sum(sum2[col - top : col, range_col[row]]) if bot != 0: sum4[col, range_col[row]] += np.sum(sum2[col + 1 : col + bot + 1, range_col[row]]) return step4, sum4
@njit("i2[:, :, :](f4[:, :], i2, f4)", cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")))
[docs] def cross_support(image: np.ndarray, len_arms: int, intensity: float) -> np.ndarray: """ Compute the cross support for an image: find the 4 arms. Enforces a minimum support region of 3×3 if pixels are valid. The cross support of invalid pixels (pixels that are np.inf) is 0 for the 4 arms. :param image: image :type image: 2D np.array (row , col) dtype = np.float32 :param len_arms: maximal length arms :param len_arms: int16 :param intensity: maximal intensity :param intensity: float 32 :return: a 3D np.array ( row, col, [left, right, top, bot] ), with the four arms lengths computes for each pixel :rtype: 3D np.array ( row, col, [left, right, top, bot] ), dtype=np.int16 """ n_col_, n_row_ = image.shape # By default, all cross supports are 0 cross = np.zeros((n_col_, n_row_, 4), dtype=np.int16) for col in range(n_col_): for row in range(n_row_): # If the pixel is valid (np.isfinite = True) compute the cross support # Else (np.isfinite = False) the pixel is not valid (no data or invalid) and the cross support value is 0 # for the 4 arms (default value of the variable cross). if np.isfinite(image[col, row]): left_len = 0 left = row for left in range(row - 1, max(row - len_arms, -1), -1): if abs(image[col, row] - image[col, left]) >= intensity: break left_len += 1 # enforces a minimum support region of 3×3 if pixels are valid cross[col, row, 0] = max(left_len, 1 * (row >= 1) * np.isfinite(image[col, left])) right_len = 0 right = row for right in range(row + 1, min(row + len_arms, n_row_)): if abs(image[col, row] - image[col, right]) >= intensity: break right_len += 1 # enforces a minimum support region of 3×3 if pixels are valid cross[col, row, 1] = max(right_len, 1 * (row < n_row_ - 1) * np.isfinite(image[col, right])) up_len = 0 up_col = col for up_col in range(col - 1, max(col - len_arms, -1), -1): if abs(image[col, row] - image[up_col, row]) >= intensity: break up_len += 1 # enforces a minimum support region of 3×3 if pixels are valid cross[col, row, 2] = max(up_len, 1 * (col >= 1) * np.isfinite(image[up_col, row])) bot_len = 0 bot = col for bot in range(col + 1, min(col + len_arms, n_col_)): if abs(image[col, row] - image[bot, row]) >= intensity: break bot_len += 1 # enforces a minimum support region of 3×3 if pixels are valid cross[col, row, 3] = max(bot_len, 1 * (col < n_col_ - 1) * np.isfinite(image[bot, row])) return cross