#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of PANDORA
#
# https://github.com/CNES/Pandora
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions associated to the Cross Based Cost Aggregation (cbca) method.
"""
from typing import Dict, Union, Tuple, List
import numpy as np
import xarray as xr
from json_checker import Checker, And
from numba import njit
from pandora.filter import AbstractFilter
from pandora.img_tools import shift_right_img
from . import aggregation
@aggregation.AbstractAggregation.register_subclass("cbca")
[docs]
class CrossBasedCostAggregation(aggregation.AbstractAggregation):
"""
CrossBasedCostAggregation class, allows to perform the aggregation step
"""
# Default configuration, do not change these values
def __init__(self, **cfg: dict):
"""
:param cfg: optional configuration, {'cbca_intensity': value, 'cbca_distance': value}
:type cfg: dict
"""
self.cfg = self.check_conf(**cfg) # type: ignore
self._cbca_intensity = self.cfg["cbca_intensity"]
self._cbca_distance = self.cfg["cbca_distance"]
[docs]
def check_conf(self, **cfg: Union[str, float, int]) -> Dict[str, Union[str, float, int]]:
"""
Add default values to the dictionary if there are missing elements and check if the dictionary is correct
:param cfg: aggregation configuration
:type cfg: dict
:return cfg: aggregation configuration updated
:rtype: dict
"""
# Give the default value if the required element is not in the configuration
if "cbca_intensity" not in cfg:
cfg["cbca_intensity"] = self._CBCA_INTENSITY
if "cbca_distance" not in cfg:
cfg["cbca_distance"] = self._CBCA_DISTANCE
schema = {
"aggregation_method": And(str, lambda input: "cbca"),
"cbca_intensity": And(float, lambda input: input > 0),
"cbca_distance": And(int, lambda input: input > 0),
}
checker = Checker(schema)
checker.validate(cfg)
return cfg
[docs]
def desc(self):
"""
Describes the aggregation method
"""
print("CrossBasedCostAggregation method")
[docs]
def cost_volume_aggregation(
self, img_left: xr.Dataset, img_right: xr.Dataset, cv: xr.Dataset, **cfg: Union[str, int]
) -> None:
"""
Aggregated the cost volume with Cross-Based Cost Aggregation, using the pipeline define in
Zhang, K., Lu, J., & Lafruit, G. (2009).
Cross-based local stereo matching using orthogonal integral images.
IEEE transactions on circuits and systems for video technology, 19(7), 1073-1079.
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param img_right: right Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
:type img_right: xarray.Dataset
:param cv: cost volume dataset with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
- confidence_measure 3D xarray.DataArray (row, col, indicator)
:type cv: xarray.Dataset
:param cfg: images configuration containing the mask convention : valid_pixels, no_data
:type cfg: dict
:return: None
"""
cross_left, cross_right = self.computes_cross_supports(img_left, img_right, cv)
offset = int(cv.attrs["offset_row_col"])
# Cost volume has input image size, if offset > 0 do not consider the marge
if offset > 0:
cv_data = cv["cost_volume"].data[offset:-offset, offset:-offset]
else:
cv_data = cv["cost_volume"].data
n_col_, n_row_, nb_disp = cv_data.shape
# Allocate the numpy aggregated cost volume cv = (disp, col, row), for efficient memory management
agg = np.zeros((nb_disp, n_row_, n_col_), dtype=np.float32)
# Add invalid costs (i.e = np.nan ) to the output aggregated cost volume (because the step 1 of cbca do not
# propagate invalid pixels, we need to retrieve them at the end of aggregation )
# Much faster than :
# id_nan = np.isnan(cv['cost_volume'].data)
# compute the aggregation ..
# cv['cost_volume'].data[id_nan] = np.nan
agg += np.swapaxes(cv_data, 0, 2)
agg *= 0
disparity_range = cv.coords["disp"].data
range_col = np.arange(0, n_row_)
for dsp in range(nb_disp):
i_right = int((disparity_range[dsp] % 1) * cv.attrs["subpixel"])
# Step 1 : horizontal integral image
step1 = cbca_step_1(cv_data[:, :, dsp])
range_col_right = range_col + disparity_range[dsp]
valid_index = np.where((range_col_right >= 0) & (range_col_right < cross_right[i_right].shape[1]))
# Step 2 : horizontal matching cost
step2, sum2 = cbca_step_2(
step1,
cross_left,
cross_right[i_right],
range_col[valid_index],
range_col_right[valid_index].astype(int),
)
# Step 3 : vertical integral image
step3 = cbca_step_3(step2)
# Step 4 : aggregate cost volume
step4, sum4 = cbca_step_4(
step3,
sum2,
cross_left,
cross_right[i_right],
range_col[valid_index],
range_col_right[valid_index].astype(int),
)
# Added the pixel anchor pixel to the number of support pixels used during the aggregation
sum4 += 1
# Add the aggregate cost to the output
agg[dsp, :, :] += np.swapaxes(step4, 0, 1)
# Normalize the aggregated cost
agg[dsp, :, :] /= np.swapaxes(sum4, 0, 1)
cv_data = np.swapaxes(agg, 0, 2)
if offset > 0:
cv["cost_volume"].data[offset:-offset, offset:-offset] = cv_data
else:
cv["cost_volume"].data = cv_data
cv.attrs["aggregation"] = "cbca"
# Maximal cost of the cost volume after agregation
cmax = cv.attrs["cmax"] * ((self._cbca_distance * 2) - 1) ** 2 # type: ignore
cv.attrs["cmax"] = cmax
[docs]
def computes_cross_supports(
self, img_left: xr.Dataset, img_right: xr.Dataset, cv: xr.Dataset
) -> Tuple[np.ndarray, List[np.ndarray]]:
"""
Prepare images and compute the cross support region of the left and right images.
A 3x3 median filter is applied to the images before calculating the cross support region.
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param img_right: right Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
:type img_right: xarray.Dataset
:param cv: cost volume dataset with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
- confidence_measure 3D xarray.DataArray (row, col, indicator)
:type cv: xarray.Dataset
:return: the left and right cross support region
:rtype: Tuples(left cross support region, List(right cross support region))
"""
subpix = cv.attrs["subpixel"]
offset = int(cv.attrs["offset_row_col"])
# shift the right image
img_right_shift = shift_right_img(img_right, subpix)
# Median filter on valid pixels
filter_ = AbstractFilter(cfg={"filter_method": "median", "filter_size": 3}) # type: ignore
# Invalid and no data pixels are masked with np.nan to avoid propagating the values with the median filter
left_masked = np.copy(img_left["im"].data)
if "msk" in img_left.data_vars:
left_masked[np.where(img_left["msk"].data != img_left.attrs["valid_pixels"])] = np.nan
left_masked = filter_.median_filter(left_masked) # type: ignore
# Convert nan to inf to be able to use the comparison operators < and > in cross_support function
np.nan_to_num(left_masked, copy=False, nan=np.inf)
# Compute left cross support using numba to reduce running time
if offset != 0:
# Cross support to the size of the cost volume
cross_left = cross_support(
left_masked[offset:-offset, offset:-offset],
self._cbca_distance,
self._cbca_intensity,
)
else:
cross_left = cross_support(left_masked, self._cbca_distance, self._cbca_intensity)
# Compute the right cross support. Apply a 3×3 median filter to the input image
cross_right = []
for shift, img in enumerate(img_right_shift):
# Invalid and nodata pixels are masked with np.nan to avoid propagating the values with the median filter
right_masked = np.copy(img["im"].data)
# Pixel precision
if ("msk" in img_right.data_vars) and (shift == 0):
right_masked[np.where(img_right["msk"].data != img_right.attrs["valid_pixels"])] = np.nan
# Subpixel precision : computes the shifted right mask
if ("msk" in img_right.data_vars) and (shift != 0):
shift_mask = np.zeros(img_right["msk"].data.shape)
shift_mask[np.where(img_right["msk"].data != img_right.attrs["valid_pixels"])] = np.nan
# Since the interpolation of the right image is of order 1, the shifted right mask corresponds
# to an aggregation of two columns of the right mask
# Create a sliding window of shape 2 using as_strided function : this function create a new a view (by
# manipulating data pointer)of the shift_mask array with a different shape. The new view pointing to the
# same memory block as shift_mask so it does not consume any additional memory.
( # pylint: disable=unpacking-non-sequence
str_row,
str_col,
) = shift_mask.strides
shape_windows = (shift_mask.shape[0], shift_mask.shape[1] - 1, 2)
strides_windows = (str_row, str_col, str_col)
aggregation_window = np.lib.stride_tricks.as_strided(
shift_mask, shape_windows, strides_windows, writeable=False
)
shift_mask = np.sum(aggregation_window, 2)
right_masked += shift_mask
# Apply a 3×3 median filter to the input image
right_masked = filter_.median_filter(right_masked) # type: ignore
# Convert nan to inf to be able to use the comparison operators < and > in cross_support function
np.nan_to_num(right_masked, copy=False, nan=np.inf)
# Compute right cross support using numba to reduce running time
if offset != 0:
# Cross support to the size of the cost volume
cross_right.append(
cross_support(
right_masked[offset:-offset, offset:-offset],
self._cbca_distance,
self._cbca_intensity,
)
)
else:
cross_right.append(cross_support(right_masked, self._cbca_distance, self._cbca_intensity))
return cross_left, cross_right
@njit("f4[:, :](f4[:, :])", cache=True)
[docs]
def cbca_step_1(cv: np.ndarray) -> np.ndarray:
"""
Giving the matching cost for one disparity, build a horizontal integral image storing the cumulative row sum,
S_h(row, col) = S_h(row-1, col) + cv(row, col)
:param cv: cost volume for the current disparity
:type cv: 2D np.array (row, col) dtype = np.float32
:return: the horizontal integral image, step 1
:rtype: 2D np.array (row, col + 1) dtype = np.float32
"""
n_col_, n_row_ = cv.shape
# Allocate the intermediate cost volume S_h
# added a column to manage the case in the step 2 : row - left_arm_length -1 = -1
step1 = np.zeros((n_col_, n_row_ + 1), dtype=np.float32)
for col in range(n_col_):
for row in range(n_row_):
# Do not propagate nan
if not np.isnan(cv[col, row]):
step1[col, row] = step1[col, row - 1] + cv[col, row]
else:
step1[col, row] = step1[col, row - 1]
return step1
@njit("(f4[:, :], i2[:, :, :], i2[:, :, :], i8[:], i8[:])", cache=True)
[docs]
def cbca_step_2(
step1: np.ndarray,
cross_left: np.ndarray,
cross_right: np.ndarray,
range_col: np.ndarray,
range_col_right: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Giving the horizontal integral image, computed the horizontal matching cost for one disparity,
E_h(row, col) = S_h(row + right_arm_length, col) - S_h(row - left_arm_length -1, col)
:param step1: horizontal integral image, from the cbca_step1, with an extra column that contains 0
:type step1: 2D np.array (row, col + 1) dtype = np.float32
:param cross_left: cross support of the left image
:type cross_left: 3D np.array (row, col, [left, right, top, bot]) dtype=np.int16
:param cross_right: cross support of the right image
:type cross_right: 3D np.array (row, col, [left, right, tpo, bot]) dtype=np.int16
:param range_col: left column for the current disparity (i.e : np.arrange(nb columns), where the correspondent \
in the right image is reachable)
:type range_col: 1D np.array
:param range_col_right: right column for the current disparity (i.e : np.arrange(nb columns) - disparity, where \
column - disparity >= 0 and <= nb columns)
:type range_col_right: 1D np.array
:return: the horizontal matching cost for the current disparity, and the number of support pixels used for the \
step 2
:rtype: tuple (2D np.array (row, col) dtype = np.float32, 2D np.array (row, col) dtype = np.float32)
"""
n_col_, n_row_ = step1.shape
# Allocate the intermediate cost volume E_h
# , remove the extra column from the step 1
step2 = np.zeros((n_col_, n_row_ - 1), dtype=np.float32)
sum_step2 = np.zeros((n_col_, n_row_ - 1), dtype=np.float32)
for col in range(step1.shape[0]):
for row in range(range_col.shape[0]):
right = min(
cross_left[col, range_col[row], 1],
cross_right[col, range_col_right[row], 1],
)
left = min(
cross_left[col, range_col[row], 0],
cross_right[col, range_col_right[row], 0],
)
step2[col, range_col[row]] = step1[col, range_col[row] + right] - step1[col, range_col[row] - left - 1]
sum_step2[col, range_col[row]] += right + left
return step2, sum_step2
@njit("f4[:, :](f4[:, :])", cache=True)
[docs]
def cbca_step_3(step2: np.ndarray) -> np.ndarray:
"""
Giving the horizontal matching cost, build a vertical integral image for one disparity,
S_v = S_v(row, col - 1) + E_h(row, col)
:param step2: horizontal matching cost, from the cbca_step2
:type step2: 3D xarray.DataArray (row, col, disp)
:return: the vertical integral image for the current disparity
:rtype: 2D np.array (row + 1, col) dtype = np.float32
"""
n_col_, n_row_ = step2.shape
# Allocate the intermediate cost volume S_v
# added a row to manage the case in the step 4 : col - up_arm_length -1 = -1
step3 = np.zeros((n_col_ + 1, n_row_), dtype=np.float32)
step3[0, :] = step2[0, :]
for col in range(1, n_col_):
for row in range(n_row_):
step3[col, row] = step3[col - 1, row] + step2[col, row]
return step3
@njit("(f4[:, :], f4[:, :], i2[:, :, :], i2[:, :, :], i8[:], i8[:])", cache=True)
[docs]
def cbca_step_4(
step3: np.ndarray,
sum2: np.ndarray,
cross_left: np.ndarray,
cross_right: np.ndarray,
range_col: np.ndarray,
range_col_right: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Giving the vertical integral image, build the fully aggregated matching cost for one disparity,
E = S_v(row, col + bottom_arm_length) - S_v(row, col - top_arm_length - 1)
:param step3: vertical integral image, from the cbca_step3, with an extra row that contains 0
:type step3: 2D np.array (row + 1, col) dtype = np.float32
:param sum2: the number of support pixels used for the step 2
:type sum2: 2D np.array (row, col) dtype = np.float32
:param cross_left: cross support of the left image
:type cross_left: 3D np.array (row, col, [left, right, top, bot]) dtype=np.int16
:param cross_right: cross support of the right image
:type cross_right: 3D np.array (row, col, [left, right, tpo, bot]) dtype=np.int16
:param range_col: left column for the current disparity (i.e : np.arrange(nb columns), where the correspondent \
in the right image is reachable)
:type range_col: 1D np.array
:param range_col_right: right column for the current disparity (i.e : np.arrange(nb columns) - disparity, where \
column - disparity >= 0 and <= nb columns)
:type range_col_right: 1D np.array
:return: the fully aggregated matching cost, and the total number of support pixels used for the aggregation
:rtype: tuple(2D np.array (row , col) dtype = np.float32, 2D np.array (row , col) dtype = np.float32)
"""
n_col_, n_row_ = step3.shape
# Allocate the final cost volume E
# , remove the extra row from the step 3
step4 = np.zeros((n_col_ - 1, n_row_), dtype=np.float32)
sum4 = np.copy(sum2)
for col in range(step4.shape[0]):
for row in range(range_col.shape[0]):
top = min(
cross_left[col, range_col[row], 2],
cross_right[col, range_col_right[row], 2],
)
bot = min(
cross_left[col, range_col[row], 3],
cross_right[col, range_col_right[row], 3],
)
step4[col, range_col[row]] = step3[col + bot, range_col[row]] - step3[col - top - 1, range_col[row]]
sum4[col, range_col[row]] += top + bot
if top != 0:
sum4[col, range_col[row]] += np.sum(sum2[col - top : col, range_col[row]])
if bot != 0:
sum4[col, range_col[row]] += np.sum(sum2[col + 1 : col + bot + 1, range_col[row]])
return step4, sum4
@njit("i2[:, :, :](f4[:, :], i2, f4)", cache=True)
[docs]
def cross_support(image: np.ndarray, len_arms: int, intensity: float) -> np.ndarray:
"""
Compute the cross support for an image: find the 4 arms.
Enforces a minimum support region of 3×3 if pixels are valid.
The cross support of invalid pixels (pixels that are np.inf) is 0 for the 4 arms.
:param image: image
:type image: 2D np.array (row , col) dtype = np.float32
:param len_arms: maximal length arms
:param len_arms: int16
:param intensity: maximal intensity
:param intensity: float 32
:return: a 3D np.array ( row, col, [left, right, top, bot] ), with the four arms lengths computes for each pixel
:rtype: 3D np.array ( row, col, [left, right, top, bot] ), dtype=np.int16
"""
n_col_, n_row_ = image.shape
# By default, all cross supports are 0
cross = np.zeros((n_col_, n_row_, 4), dtype=np.int16)
for col in range(n_col_):
for row in range(n_row_):
# If the pixel is valid (np.isfinite = True) compute the cross support
# Else (np.isfinite = False) the pixel is not valid (no data or invalid) and the cross support value is 0
# for the 4 arms (default value of the variable cross).
if np.isfinite(image[col, row]):
left_len = 0
left = row
for left in range(row - 1, max(row - len_arms, -1), -1):
if abs(image[col, row] - image[col, left]) >= intensity:
break
left_len += 1
# enforces a minimum support region of 3×3 if pixels are valid
cross[col, row, 0] = max(left_len, 1 * (row >= 1) * np.isfinite(image[col, left]))
right_len = 0
right = row
for right in range(row + 1, min(row + len_arms, n_row_)):
if abs(image[col, row] - image[col, right]) >= intensity:
break
right_len += 1
# enforces a minimum support region of 3×3 if pixels are valid
cross[col, row, 1] = max(right_len, 1 * (row < n_row_ - 1) * np.isfinite(image[col, right]))
up_len = 0
up_col = col
for up_col in range(col - 1, max(col - len_arms, -1), -1):
if abs(image[col, row] - image[up_col, row]) >= intensity:
break
up_len += 1
# enforces a minimum support region of 3×3 if pixels are valid
cross[col, row, 2] = max(up_len, 1 * (col >= 1) * np.isfinite(image[up_col, row]))
bot_len = 0
bot = col
for bot in range(col + 1, min(col + len_arms, n_col_)):
if abs(image[col, row] - image[bot, row]) >= intensity:
break
bot_len += 1
# enforces a minimum support region of 3×3 if pixels are valid
cross[col, row, 3] = max(bot_len, 1 * (col < n_col_ - 1) * np.isfinite(image[bot, row]))
return cross