#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2024 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of PANDORA
#
# https://github.com/CNES/Pandora
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions associated to the Cross Based Cost Aggregation (cbca) method.
"""
from typing import Dict, Union, Tuple, List
import os
from ast import literal_eval
import numpy as np
import xarray as xr
from json_checker import Checker, And
from numba import njit
from pandora.filter import AbstractFilter
from pandora.img_tools import shift_right_img
from . import aggregation
@aggregation.AbstractAggregation.register_subclass("cbca")
[docs]
class CrossBasedCostAggregation(aggregation.AbstractAggregation):
"""
CrossBasedCostAggregation class, allows to perform the aggregation step
"""
# Default configuration, do not change these values
def __init__(self, **cfg: dict):
"""
:param cfg: optional configuration, {'cbca_intensity': value, 'cbca_distance': value}
:type cfg: dict
"""
[docs]
self.cfg = self.check_conf(**cfg) # type: ignore
[docs]
self._cbca_intensity = self.cfg["cbca_intensity"]
[docs]
self._cbca_distance = self.cfg["cbca_distance"]
[docs]
def check_conf(self, **cfg: Union[str, float, int]) -> Dict[str, Union[str, float, int]]:
"""
Add default values to the dictionary if there are missing elements and check if the dictionary is correct
:param cfg: aggregation configuration
:type cfg: dict
:return cfg: aggregation configuration updated
:rtype: dict
"""
# Give the default value if the required element is not in the configuration
if "cbca_intensity" not in cfg:
cfg["cbca_intensity"] = self._CBCA_INTENSITY
if "cbca_distance" not in cfg:
cfg["cbca_distance"] = self._CBCA_DISTANCE
schema = {
"aggregation_method": And(str, lambda input: "cbca"),
"cbca_intensity": And(float, lambda input: input > 0),
"cbca_distance": And(int, lambda input: input > 0),
}
checker = Checker(schema)
checker.validate(cfg)
return cfg
[docs]
def desc(self):
"""
Describes the aggregation method
"""
print("CrossBasedCostAggregation method")
[docs]
def cost_volume_aggregation(
self, img_left: xr.Dataset, img_right: xr.Dataset, cv: xr.Dataset, **cfg: Union[str, int]
) -> None:
"""
Aggregated the cost volume with Cross-Based Cost Aggregation, using the pipeline define in
Zhang, K., Lu, J., & Lafruit, G. (2009).
Cross-based local stereo matching using orthogonal integral images.
IEEE transactions on circuits and systems for video technology, 19(7), 1073-1079.
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param img_right: right Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
:type img_right: xarray.Dataset
:param cv: cost volume dataset with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
- confidence_measure 3D xarray.DataArray (row, col, indicator)
:type cv: xarray.Dataset
:param cfg: images configuration containing the mask convention : valid_pixels, no_data
:type cfg: dict
:return: None
"""
cross_left, cross_right = self.computes_cross_supports(img_left, img_right, cv)
offset = int(cv.attrs["offset_row_col"])
# Cost volume has input image size, if offset > 0 do not consider the marge
if offset > 0:
cv_data = cv["cost_volume"].data[offset:-offset, offset:-offset]
else:
cv_data = cv["cost_volume"].data
n_col_, n_row_, nb_disp = cv_data.shape
# Allocate the numpy aggregated cost volume cv = (disp, col, row), for efficient memory management
agg = np.zeros((nb_disp, n_row_, n_col_), dtype=np.float32)
# Add invalid costs (i.e = np.nan ) to the output aggregated cost volume (because the step 1 of cbca do not
# propagate invalid pixels, we need to retrieve them at the end of aggregation )
# Much faster than :
# id_nan = np.isnan(cv['cost_volume'].data)
# compute the aggregation ..
# cv['cost_volume'].data[id_nan] = np.nan
agg += np.swapaxes(cv_data, 0, 2)
agg *= 0
disparity_range = cv.coords["disp"].data
range_col = np.arange(0, n_row_)
for dsp in range(nb_disp):
i_right = int((disparity_range[dsp] % 1) * cv.attrs["subpixel"])
# Step 1 : horizontal integral image
step1 = cbca_step_1(cv_data[:, :, dsp])
range_col_right = range_col + disparity_range[dsp]
valid_index = np.where((range_col_right >= 0) & (range_col_right < cross_right[i_right].shape[1]))
# Step 2 : horizontal matching cost
step2, sum2 = cbca_step_2(
step1,
cross_left,
cross_right[i_right],
range_col[valid_index],
range_col_right[valid_index].astype(int),
)
# Step 3 : vertical integral image
step3 = cbca_step_3(step2)
# Step 4 : aggregate cost volume
step4, sum4 = cbca_step_4(
step3,
sum2,
cross_left,
cross_right[i_right],
range_col[valid_index],
range_col_right[valid_index].astype(int),
)
# Added the pixel anchor pixel to the number of support pixels used during the aggregation
sum4 += 1
# Add the aggregate cost to the output
agg[dsp, :, :] += np.swapaxes(step4, 0, 1)
# Normalize the aggregated cost
agg[dsp, :, :] /= np.swapaxes(sum4, 0, 1)
cv_data = np.swapaxes(agg, 0, 2)
if offset > 0:
cv["cost_volume"].data[offset:-offset, offset:-offset] = cv_data
else:
cv["cost_volume"].data = cv_data
cv.attrs["aggregation"] = "cbca"
# Maximal cost of the cost volume after agregation
cmax = cv.attrs["cmax"] * ((self._cbca_distance * 2) - 1) ** 2 # type: ignore
cv.attrs["cmax"] = cmax
[docs]
def computes_cross_supports(
self, img_left: xr.Dataset, img_right: xr.Dataset, cv: xr.Dataset
) -> Tuple[np.ndarray, List[np.ndarray]]:
"""
Prepare images and compute the cross support region of the left and right images.
A 3x3 median filter is applied to the images before calculating the cross support region.
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param img_right: right Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
:type img_right: xarray.Dataset
:param cv: cost volume dataset with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
- confidence_measure 3D xarray.DataArray (row, col, indicator)
:type cv: xarray.Dataset
:return: the left and right cross support region
:rtype: Tuples(left cross support region, List(right cross support region))
"""
subpix = cv.attrs["subpixel"]
offset = int(cv.attrs["offset_row_col"])
# shift the right image
img_right_shift = shift_right_img(img_right, subpix)
# Median filter on valid pixels
filter_ = AbstractFilter(cfg={"filter_method": "median", "filter_size": 3}) # type: ignore
# Invalid and no data pixels are masked with np.nan to avoid propagating the values with the median filter
left_masked = np.copy(img_left["im"].data)
if "msk" in img_left.data_vars:
left_masked[np.where(img_left["msk"].data != img_left.attrs["valid_pixels"])] = np.nan
left_masked = filter_.median_filter(left_masked) # type: ignore
# Convert nan to inf to be able to use the comparison operators < and > in cross_support function
np.nan_to_num(left_masked, copy=False, nan=np.inf)
# Compute left cross support using numba to reduce running time
if offset != 0:
# Cross support to the size of the cost volume
cross_left = cross_support(
left_masked[offset:-offset, offset:-offset],
self._cbca_distance,
self._cbca_intensity,
)
else:
cross_left = cross_support(left_masked, self._cbca_distance, self._cbca_intensity)
# Compute the right cross support. Apply a 3×3 median filter to the input image
cross_right = []
for shift, img in enumerate(img_right_shift):
# Invalid and nodata pixels are masked with np.nan to avoid propagating the values with the median filter
right_masked = np.copy(img["im"].data)
# Pixel precision
if ("msk" in img_right.data_vars) and (shift == 0):
right_masked[np.where(img_right["msk"].data != img_right.attrs["valid_pixels"])] = np.nan
# Subpixel precision : computes the shifted right mask
if ("msk" in img_right.data_vars) and (shift != 0):
shift_mask = np.zeros(img_right["msk"].data.shape)
shift_mask[np.where(img_right["msk"].data != img_right.attrs["valid_pixels"])] = np.nan
# Since the interpolation of the right image is of order 1, the shifted right mask corresponds
# to an aggregation of two columns of the right mask
# Create a sliding window of shape 2 using as_strided function : this function create a new a view (by
# manipulating data pointer)of the shift_mask array with a different shape. The new view pointing to the
# same memory block as shift_mask so it does not consume any additional memory.
( # pylint: disable=unpacking-non-sequence
str_row,
str_col,
) = shift_mask.strides
shape_windows = (shift_mask.shape[0], shift_mask.shape[1] - 1, 2)
strides_windows = (str_row, str_col, str_col)
aggregation_window = np.lib.stride_tricks.as_strided(
shift_mask, shape_windows, strides_windows, writeable=False
)
shift_mask = np.sum(aggregation_window, 2)
right_masked += shift_mask
# Apply a 3×3 median filter to the input image
right_masked = filter_.median_filter(right_masked) # type: ignore
# Convert nan to inf to be able to use the comparison operators < and > in cross_support function
np.nan_to_num(right_masked, copy=False, nan=np.inf)
# Compute right cross support using numba to reduce running time
if offset != 0:
# Cross support to the size of the cost volume
cross_right.append(
cross_support(
right_masked[offset:-offset, offset:-offset],
self._cbca_distance,
self._cbca_intensity,
)
)
else:
cross_right.append(cross_support(right_masked, self._cbca_distance, self._cbca_intensity))
return cross_left, cross_right
@njit("f4[:, :](f4[:, :])", cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")))
[docs]
def cbca_step_1(cv: np.ndarray) -> np.ndarray:
"""
Giving the matching cost for one disparity, build a horizontal integral image storing the cumulative row sum,
S_h(row, col) = S_h(row-1, col) + cv(row, col)
:param cv: cost volume for the current disparity
:type cv: 2D np.array (row, col) dtype = np.float32
:return: the horizontal integral image, step 1
:rtype: 2D np.array (row, col + 1) dtype = np.float32
"""
n_col_, n_row_ = cv.shape
# Allocate the intermediate cost volume S_h
# added a column to manage the case in the step 2 : row - left_arm_length -1 = -1
step1 = np.zeros((n_col_, n_row_ + 1), dtype=np.float32)
for col in range(n_col_):
for row in range(n_row_):
# Do not propagate nan
if not np.isnan(cv[col, row]):
step1[col, row] = step1[col, row - 1] + cv[col, row]
else:
step1[col, row] = step1[col, row - 1]
return step1
@njit(
"(f4[:, :], i2[:, :, :], i2[:, :, :], i8[:], i8[:])",
cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")),
)
[docs]
def cbca_step_2(
step1: np.ndarray,
cross_left: np.ndarray,
cross_right: np.ndarray,
range_col: np.ndarray,
range_col_right: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Giving the horizontal integral image, computed the horizontal matching cost for one disparity,
E_h(row, col) = S_h(row + right_arm_length, col) - S_h(row - left_arm_length -1, col)
:param step1: horizontal integral image, from the cbca_step1, with an extra column that contains 0
:type step1: 2D np.array (row, col + 1) dtype = np.float32
:param cross_left: cross support of the left image
:type cross_left: 3D np.array (row, col, [left, right, top, bot]) dtype=np.int16
:param cross_right: cross support of the right image
:type cross_right: 3D np.array (row, col, [left, right, tpo, bot]) dtype=np.int16
:param range_col: left column for the current disparity (i.e : np.arrange(nb columns), where the correspondent \
in the right image is reachable)
:type range_col: 1D np.array
:param range_col_right: right column for the current disparity (i.e : np.arrange(nb columns) - disparity, where \
column - disparity >= 0 and <= nb columns)
:type range_col_right: 1D np.array
:return: the horizontal matching cost for the current disparity, and the number of support pixels used for the \
step 2
:rtype: tuple (2D np.array (row, col) dtype = np.float32, 2D np.array (row, col) dtype = np.float32)
"""
n_col_, n_row_ = step1.shape
# Allocate the intermediate cost volume E_h
# , remove the extra column from the step 1
step2 = np.zeros((n_col_, n_row_ - 1), dtype=np.float32)
sum_step2 = np.zeros((n_col_, n_row_ - 1), dtype=np.float32)
for col in range(step1.shape[0]):
for row in range(range_col.shape[0]):
right = min(
cross_left[col, range_col[row], 1],
cross_right[col, range_col_right[row], 1],
)
left = min(
cross_left[col, range_col[row], 0],
cross_right[col, range_col_right[row], 0],
)
step2[col, range_col[row]] = step1[col, range_col[row] + right] - step1[col, range_col[row] - left - 1]
sum_step2[col, range_col[row]] += right + left
return step2, sum_step2
@njit("f4[:, :](f4[:, :])", cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")))
[docs]
def cbca_step_3(step2: np.ndarray) -> np.ndarray:
"""
Giving the horizontal matching cost, build a vertical integral image for one disparity,
S_v = S_v(row, col - 1) + E_h(row, col)
:param step2: horizontal matching cost, from the cbca_step2
:type step2: 3D xarray.DataArray (row, col, disp)
:return: the vertical integral image for the current disparity
:rtype: 2D np.array (row + 1, col) dtype = np.float32
"""
n_col_, n_row_ = step2.shape
# Allocate the intermediate cost volume S_v
# added a row to manage the case in the step 4 : col - up_arm_length -1 = -1
step3 = np.zeros((n_col_ + 1, n_row_), dtype=np.float32)
step3[0, :] = step2[0, :]
for col in range(1, n_col_):
for row in range(n_row_):
step3[col, row] = step3[col - 1, row] + step2[col, row]
return step3
@njit(
"(f4[:, :], f4[:, :], i2[:, :, :], i2[:, :, :], i8[:], i8[:])",
cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")),
)
[docs]
def cbca_step_4(
step3: np.ndarray,
sum2: np.ndarray,
cross_left: np.ndarray,
cross_right: np.ndarray,
range_col: np.ndarray,
range_col_right: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Giving the vertical integral image, build the fully aggregated matching cost for one disparity,
E = S_v(row, col + bottom_arm_length) - S_v(row, col - top_arm_length - 1)
:param step3: vertical integral image, from the cbca_step3, with an extra row that contains 0
:type step3: 2D np.array (row + 1, col) dtype = np.float32
:param sum2: the number of support pixels used for the step 2
:type sum2: 2D np.array (row, col) dtype = np.float32
:param cross_left: cross support of the left image
:type cross_left: 3D np.array (row, col, [left, right, top, bot]) dtype=np.int16
:param cross_right: cross support of the right image
:type cross_right: 3D np.array (row, col, [left, right, tpo, bot]) dtype=np.int16
:param range_col: left column for the current disparity (i.e : np.arrange(nb columns), where the correspondent \
in the right image is reachable)
:type range_col: 1D np.array
:param range_col_right: right column for the current disparity (i.e : np.arrange(nb columns) - disparity, where \
column - disparity >= 0 and <= nb columns)
:type range_col_right: 1D np.array
:return: the fully aggregated matching cost, and the total number of support pixels used for the aggregation
:rtype: tuple(2D np.array (row , col) dtype = np.float32, 2D np.array (row , col) dtype = np.float32)
"""
n_col_, n_row_ = step3.shape
# Allocate the final cost volume E
# , remove the extra row from the step 3
step4 = np.zeros((n_col_ - 1, n_row_), dtype=np.float32)
sum4 = np.copy(sum2)
for col in range(step4.shape[0]):
for row in range(range_col.shape[0]):
top = min(
cross_left[col, range_col[row], 2],
cross_right[col, range_col_right[row], 2],
)
bot = min(
cross_left[col, range_col[row], 3],
cross_right[col, range_col_right[row], 3],
)
step4[col, range_col[row]] = step3[col + bot, range_col[row]] - step3[col - top - 1, range_col[row]]
sum4[col, range_col[row]] += top + bot
if top != 0:
sum4[col, range_col[row]] += np.sum(sum2[col - top : col, range_col[row]])
if bot != 0:
sum4[col, range_col[row]] += np.sum(sum2[col + 1 : col + bot + 1, range_col[row]])
return step4, sum4
@njit("i2[:, :, :](f4[:, :], i2, f4)", cache=literal_eval(os.environ.get("PANDORA_NUMBA_CACHE", "True")))
[docs]
def cross_support(image: np.ndarray, len_arms: int, intensity: float) -> np.ndarray:
"""
Compute the cross support for an image: find the 4 arms.
Enforces a minimum support region of 3×3 if pixels are valid.
The cross support of invalid pixels (pixels that are np.inf) is 0 for the 4 arms.
:param image: image
:type image: 2D np.array (row , col) dtype = np.float32
:param len_arms: maximal length arms
:param len_arms: int16
:param intensity: maximal intensity
:param intensity: float 32
:return: a 3D np.array ( row, col, [left, right, top, bot] ), with the four arms lengths computes for each pixel
:rtype: 3D np.array ( row, col, [left, right, top, bot] ), dtype=np.int16
"""
n_col_, n_row_ = image.shape
# By default, all cross supports are 0
cross = np.zeros((n_col_, n_row_, 4), dtype=np.int16)
for col in range(n_col_):
for row in range(n_row_):
# If the pixel is valid (np.isfinite = True) compute the cross support
# Else (np.isfinite = False) the pixel is not valid (no data or invalid) and the cross support value is 0
# for the 4 arms (default value of the variable cross).
if np.isfinite(image[col, row]):
left_len = 0
left = row
for left in range(row - 1, max(row - len_arms, -1), -1):
if abs(image[col, row] - image[col, left]) >= intensity:
break
left_len += 1
# enforces a minimum support region of 3×3 if pixels are valid
cross[col, row, 0] = max(left_len, 1 * (row >= 1) * np.isfinite(image[col, left]))
right_len = 0
right = row
for right in range(row + 1, min(row + len_arms, n_row_)):
if abs(image[col, row] - image[col, right]) >= intensity:
break
right_len += 1
# enforces a minimum support region of 3×3 if pixels are valid
cross[col, row, 1] = max(right_len, 1 * (row < n_row_ - 1) * np.isfinite(image[col, right]))
up_len = 0
up_col = col
for up_col in range(col - 1, max(col - len_arms, -1), -1):
if abs(image[col, row] - image[up_col, row]) >= intensity:
break
up_len += 1
# enforces a minimum support region of 3×3 if pixels are valid
cross[col, row, 2] = max(up_len, 1 * (col >= 1) * np.isfinite(image[up_col, row]))
bot_len = 0
bot = col
for bot in range(col + 1, min(col + len_arms, n_col_)):
if abs(image[col, row] - image[bot, row]) >= intensity:
break
bot_len += 1
# enforces a minimum support region of 3×3 if pixels are valid
cross[col, row, 3] = max(bot_len, 1 * (col < n_col_ - 1) * np.isfinite(image[bot, row]))
return cross