# pylint: disable=invalid-name
#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2026 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of PANDORA
#
# https://github.com/CNES/Pandora
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions associated to the cost volume measure step.
"""
# pylint:disable=too-many-branches
import sys
from abc import ABCMeta, abstractmethod
from math import ceil, floor
from typing import Tuple, List, Union, Dict
import operator
from json_checker import And, Or
import numpy as np
import xarray as xr
from scipy.ndimage import binary_dilation
from pandora.profiler import profile
from pandora.margins.descriptors import HalfWindowMargins
from pandora.criteria import mask_invalid_variable_disparity_range, mask_border
from .cpp import matching_cost_cpp
[docs]
class AbstractMatchingCost:
"""
Abstract Matching Cost class
"""
[docs]
matching_cost_methods_avail: Dict = {}
[docs]
_subpix: int | None = None
[docs]
_window_size: int | None = None
[docs]
cfg: Dict[str, Union[str, int]] | None = None
[docs]
_band: str | None = None
[docs]
_step_col: int | None = None
[docs]
_method: str | None = None
[docs]
_spline_order: int | None = None
# Default configuration, do not change these values
[docs]
_BAND: str | None = None
# Matching cost schema confi
[docs]
schema = {
"subpix": And(int, lambda sp: sp in [1, 2, 4]),
"band": Or(str, lambda input: input is None),
"step": And(int, lambda y: y >= 1),
"spline_order": And(int, lambda y: 1 <= y <= 5),
}
[docs]
margins = HalfWindowMargins()
[docs]
ops = {"+": operator.add, "-": operator.sub}
def __new__(cls, **cfg: Union[str, int]):
"""
Return the plugin associated with the matching_cost_method given in the configuration
:param cfg: configuration {'matching_cost_method': value}
:type cfg: dictionary
"""
if cls is AbstractMatchingCost:
if isinstance(cfg["matching_cost_method"], str):
try:
return super(AbstractMatchingCost, cls).__new__(
cls.matching_cost_methods_avail[cfg["matching_cost_method"]]
)
except:
raise KeyError("No matching cost method named {} supported".format(cfg["matching_cost_method"]))
else:
if isinstance(cfg["matching_cost_method"], unicode): # type: ignore # pylint:disable=undefined-variable
# creating a plugin from registered short name given as unicode (py2 & 3 compatibility)
try:
return super(AbstractMatchingCost, cls).__new__(
cls.matching_cost_methods_avail[cfg["matching_cost_method"].encode("utf-8")]
)
except:
raise KeyError("No matching cost method named {} supported".format(cfg["matching_cost_method"]))
else:
return super(AbstractMatchingCost, cls).__new__(cls)
return None
@classmethod
[docs]
def register_subclass(cls, short_name: str, *args):
"""
Allows to register the subclass with its short name
:param short_name: the subclass to be registered
:type short_name: string
:param args: allows to register one plugin that contains different methods
"""
def decorator(subclass):
"""
Registers the subclass in the available methods
:param subclass: the subclass to be registered
:type subclass: object
"""
cls.matching_cost_methods_avail[short_name] = subclass
for arg in args:
cls.matching_cost_methods_avail[arg] = subclass
return subclass
return decorator
[docs]
def desc(self) -> None:
"""
Describes the matching cost method
:return: None
"""
print(f"{self._method} similarity measure")
[docs]
def instantiate_class(self, **cfg: Union[str, int]) -> None:
"""
:param cfg: optional configuration, {'window_size': int, 'subpix': int,
'band': str}
:type cfg: dictionary
:return: None
"""
self.cfg = self.check_conf(**cfg) # type: ignore
self._window_size = int(self.cfg["window_size"])
self._subpix = int(self.cfg["subpix"])
self._band = self.cfg["band"] # type: ignore
self._step_col = int(self.cfg["step"])
self._method = str(self.cfg["matching_cost_method"])
self._spline_order = int(self.cfg["spline_order"])
# Remove spline_order key because it is a pandora2d setting and a need
del self.cfg["spline_order"]
[docs]
def check_conf(self, **cfg: Dict[str, Union[str, int]]) -> Dict[str, Union[str, int]]:
"""
Add default values to the dictionary if there are missing elements and check if the dictionary is correct
:param cfg: matching cost configuration
:type cfg: dict
:return cfg: matching cost configuration updated
:rtype: dict
"""
# Give the default value if the required element is not in the conf
if "window_size" not in cfg:
cfg["window_size"] = self._WINDOW_SIZE # type: ignore
if "subpix" not in cfg:
cfg["subpix"] = self._SUBPIX # type: ignore
if "band" not in cfg:
cfg["band"] = self._BAND # type: ignore
if "pandora2d" not in sys.modules:
if "step" in cfg and cfg["step"] != 1:
raise ValueError("Step parameter cannot be different from 1")
if "step" not in cfg:
cfg["step"] = self._STEP_COL # type: ignore
if "spline_order" not in cfg:
cfg["spline_order"] = self._SPLINE_ORDER # type: ignore
return cfg # type: ignore
@abstractmethod
[docs]
def compute_cost_volume(
self,
img_left: xr.Dataset,
img_right: xr.Dataset,
cost_volume: xr.Dataset,
) -> xr.Dataset:
"""
Computes the cost volume for a pair of images
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param img_right: right Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_right: xarray.Dataset
:param cost_volume: an empty cost volume
:type cost_volume: xr.Dataset
:return: the cost volume dataset , with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
:rtype: xarray.Dataset
"""
@staticmethod
[docs]
def get_coordinates(margin: int, img_coordinates: np.ndarray, step: int) -> np.ndarray:
"""
In the case of a ROI, computes the right coordinates to be sure to process the first point of the ROI.
:param margin: ROI margin
:type margin: int
:param img_coordinates: coordinates of the ROI with margins
:type img_coordinates: np.ndarray
:param step: matching cost step value
:type step: int
:return: a np.ndarray that contains the right coordinates
:rtype: np.ndarray
"""
index_compute = np.arange(img_coordinates[0], img_coordinates[-1] + 1, step) # type: np.ndarray
# Check if the first column of roi is inside the final CV (with the step)
if margin % step != 0:
if margin < step:
# For example, given left_margin = 2 and step = 3
#
# roi_img = M M M M M M M
# M M 1 2 3 M M
# M M 4 5 6 M M
# M M 7 8 9 M M
# M M M M M M M
#
# Our starting point would be at index left_margin = 2 --> starting point = 1st point of ROI
# We are directly on the first point to compute
index_compute = np.arange(img_coordinates[0] + margin, img_coordinates[-1] + 1, step)
else:
# For example, given left_margin = 3 and step = 2
#
# roi_img = M M M M M M M M M
# M M M 1 2 3 M M M
# M M M 4 5 6 M M M
# M M M 7 8 9 M M M
# M M M M M M M M M
#
# Our starting point would be at index 1 --> starting point = 2nd M
# With a step of 2, the first point of ROI is calculated without cropping margins too much
#
# For example, given left_margin = 4 and step = 3
#
# roi_img = M M M M M M M M M M M
# M M M M 1 2 3 M M M M
# M M M M 4 5 6 M M M M
# M M M M 7 8 9 M M M M
# M M M M M M M M M M M
#
# Our starting point would be at index 1 --> starting point = 2nd M
# With a step of 3, the first point of ROI is calculated without cropping margins too much
# give the number of the first column to compute
start = step - (AbstractMatchingCost.find_nearest_multiple_of_step(margin, step) - margin)
index_compute = np.arange(img_coordinates[0] + start, img_coordinates[-1] + 1, step)
return index_compute
[docs]
def grid_estimation(
self, img: xr.Dataset, cfg: Union[Dict[str, dict], None], disparity_grids: Tuple[np.ndarray, np.ndarray]
) -> xr.Dataset:
"""
Estimation of the grid xarray dataset that will store the cost volume.
:param img: left Dataset image
:type img: xarray.Dataset
:param cfg: user configuration
:type cfg: dict
:param disparity_grids: Tuple of disparity grids min and max
:type disparity_grids: Tuple[np.ndarray, np.ndarray]
:return: a grid with coordinates and a attributes list with:
- indexs of columns to compute
- size
- sampling interval
:rtype: xarray.Dataset
"""
# Get col dimension
c_col = img["im"].coords["col"].values
# Get the index of the columns that should be computed
if cfg and "ROI" in cfg:
index_compute_col = self.get_coordinates(cfg["ROI"]["margins"][0], c_col, self._step_col)
else:
index_compute_col = np.arange(c_col[0], c_col[-1] + 1, self._step_col) # type: np.ndarray # type: ignore
# get disparity_range
disparity_min, disparity_max = self.get_min_max_from_grid(*disparity_grids)
disparity_range = self.get_disparity_range(disparity_min, disparity_max, self._subpix)
# Instantiate grid
grid = xr.Dataset(
{},
coords={"row": img["im"].coords["row"], "col": index_compute_col, "disp": disparity_range},
)
# Add img attributes
grid.attrs = img.attrs
# Add step in grid attributes
grid.attrs["sampling_interval"] = self._step_col
# Add index of columns to compute in grid attributes
grid.attrs["col_to_compute"] = index_compute_col
return grid
@profile("matching_cost.allocate_cost_volume")
[docs]
def allocate_cost_volume(
self, image: xr.Dataset, disparity_grids: Tuple[np.ndarray, np.ndarray], cfg: Dict = None
) -> xr.Dataset:
"""
Create a cost_volume dataset.
:param image: Image to compute cost volume from
:type image: xr.Dataset
:param cfg: user configuration
:type cfg: Dict
:param disparity_grids: Tuple of disparity grids min and max
:type disparity_grids: Tuple[np.ndarray, np.ndarray]
:return: a empty grid
:rtype: xr.Dataset
"""
cv = self.grid_estimation(image, cfg, disparity_grids)
cv["cost_volume"] = (
["row", "col", "disp"],
np.full((cv.sizes["row"], cv.sizes["col"], cv.sizes["disp"]), np.nan, dtype=np.float32),
)
cv.attrs.update(
{
"window_size": self._window_size,
"subpixel": self._subpix,
"band_correl": self._band,
"offset_row_col": int((self._window_size - 1) / 2),
"measure": self._method,
}
)
return cv
@staticmethod
[docs]
def get_disparity_range(disparity_min: int, disparity_max: int, subpix: int) -> np.ndarray:
"""
Build disparity range and return it.
:param disparity_min: minimum disparity
:type disparity_min: int
:param disparity_max: maximum disparity
:type disparity_max: int
:param subpix: subpixel precision = (1 or 2 or 4)
:return: disparity range
:rtype: np.ndarray
"""
if subpix == 1:
disparity_range = np.arange(disparity_min, disparity_max + 1)
else:
disparity_range = np.arange(disparity_min, disparity_max, 1 / float(subpix), dtype=np.float64)
disparity_range = np.append(disparity_range, [disparity_max])
return disparity_range
[docs]
def point_interval(
self, img_left: xr.Dataset, img_right: xr.Dataset, disp: float
) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
Computes the range of points over which the similarity measure will be applied
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param img_right: right Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_right: xarray.Dataset
:param disp: current disparity
:type disp: float
:return: the range of the left and right image over which the similarity measure will be applied
:rtype: tuple
"""
nx_left = int(img_left.sizes["col"])
nx_right = int(img_right.sizes["col"])
# range in the left image
# if disp is outside the image, point_p corresponds to an empty range
if abs(disp) > nx_left:
point_p = (nx_left, nx_left)
else:
point_p = (max(0 - disp, 0), min(nx_left - disp, nx_left))
# range in the right image
# if disp is outside the image, point_q corresponds to an empty range
if abs(disp) > nx_right:
point_q = (nx_right, nx_right)
else:
point_q = (max(0 + disp, 0), min(nx_right + disp, nx_right))
# Because the disparity can be floating
if disp < 0:
point_p = (int(ceil(point_p[0])), int(ceil(point_p[1])))
point_q = (int(ceil(point_q[0])), int(ceil(point_q[1])))
else:
point_p = (int(floor(point_p[0])), int(floor(point_p[1])))
point_q = (int(floor(point_q[0])), int(floor(point_q[1])))
return point_p, point_q
@staticmethod
[docs]
def masks_dilatation(
img_left: xr.Dataset, img_right: xr.Dataset, window_size: int, subp: int
) -> Tuple[xr.DataArray, List[xr.DataArray]]:
"""
Return the left and right mask with the convention :
- Invalid pixels are nan
- No_data pixels are nan
- Valid pixels are 0
Apply dilation on no_data : if a pixel contains a no_data in its aggregation window, then the central pixel
becomes no_data
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param img_right: right Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_right: xarray.Dataset
:param window_size: window size of the measure
:type window_size: int
:param subp: subpixel precision = (1 or 2 or 4)
:type subp: int
:return: the left mask and the right masks:
- left mask : xarray.DataArray msk 2D(row, col)
- right mask : xarray.DataArray msk 2D(row, col)
- right mask shifted : xarray.DataArray msk 2D(row, shifted col by 0.5)
:rtype: tuple (left mask, list[right mask, right mask shifted by 0.5])
"""
# Create the left mask with the convention : 0 = valid, nan = invalid and no_data
if "msk" in img_left.data_vars:
dilatate_left_mask = np.zeros(img_left["msk"].shape)
# Invalid pixels are nan
dilatate_left_mask[
np.where(
(img_left["msk"].data != img_left.attrs["valid_pixels"])
& (img_left["msk"].data != img_left.attrs["no_data_mask"])
)
] = np.nan
# Dilatation : pixels that contains no_data in their aggregation window become no_data = nan
dil = binary_dilation(
img_left["msk"].data == img_left.attrs["no_data_mask"],
structure=np.ones((window_size, window_size)),
iterations=1,
)
dilatate_left_mask[dil] = np.nan
else:
# All pixels are valid
dilatate_left_mask = np.zeros((img_left.sizes["row"], img_left.sizes["col"]))
# Create the right mask with the convention : 0 = valid, nan = invalid and no_data
if "msk" in img_right.data_vars:
dilatate_right_mask = np.zeros(img_right["msk"].shape)
# Invalid pixels are nan
dilatate_right_mask[
np.where(
(img_right["msk"].data != img_right.attrs["valid_pixels"])
& (img_right["msk"].data != img_right.attrs["no_data_mask"])
)
] = np.nan
# Dilatation : pixels that contains no_data in their aggregation window become no_data = nan
dil = binary_dilation(
img_right["msk"].data == img_right.attrs["no_data_mask"],
structure=np.ones((window_size, window_size)),
iterations=1,
)
dilatate_right_mask[dil] = np.nan
else:
# All pixels are valid
dilatate_right_mask = np.zeros((img_left.sizes["row"], img_left.sizes["col"]))
nx_ = img_left.sizes["col"]
row = img_left.coords["row"]
col = img_left.coords["col"]
# Shift the right mask, for sub-pixel precision. If an no_data or invalid pixel was used to create the
# sub-pixel point, then the sub-pixel point is invalidated / no_data.
dilatate_right_mask_shift = xr.DataArray()
if subp != 1:
# Since the interpolation of the right image is of order 1, the shifted right mask corresponds
# to an aggregation of two columns of the dilated right mask
str_row, str_col = dilatate_right_mask.strides
shape_windows = (
dilatate_right_mask.shape[0],
dilatate_right_mask.shape[1] - 1,
2,
)
strides_windows = (str_row, str_col, str_col)
aggregation_window = np.lib.stride_tricks.as_strided(dilatate_right_mask, shape_windows, strides_windows)
dilatate_right_mask_shift = np.sum(aggregation_window, 2)
# Whatever the sub-pixel precision, only one sub-pixel mask is created,
# since 0.5 shifted mask == 0.25 shifted mask
col_shift = np.arange(col.values[0] + 0.5, col.values[0] + nx_ - 1, step=1) # type: np.ndarray
dilatate_right_mask_shift = xr.DataArray(
dilatate_right_mask_shift, coords=[row, col_shift], dims=["row", "col"]
)
dilatate_left_mask_xr = xr.DataArray(dilatate_left_mask, coords=[row, col], dims=["row", "col"])
dilatate_right_mask_xr = xr.DataArray(dilatate_right_mask, coords=[row, col], dims=["row", "col"])
return dilatate_left_mask_xr, [dilatate_right_mask_xr, dilatate_right_mask_shift]
@staticmethod
[docs]
def get_min_max_from_grid(disp_min: np.ndarray, disp_max: np.ndarray) -> Tuple[int, int]:
"""
Find the smallest disparity present in disp_min, and the highest disparity present in disp_max
:param disp_min: minimum disparity
:type disp_min: np.ndarray
:param disp_max: maximum disparity
:type disp_max: np.ndarray
:return: dmin_min: the smallest disparity in disp_min, dmax_max: the highest disparity in disp_max
:rtype: Tuple(int, int)
"""
return int(np.nanmin(disp_min)), int(np.nanmax(disp_max))
@staticmethod
[docs]
def find_nearest_multiple_of_step(value: int, step: int) -> int:
"""
In case value is not a multiple of step, find nearest greater value for which it is the case.
:param value: Initial value.
:type: value: int
:param step: matching cost step value
:type step: int
:return: nearest multiple of step.
:rtype: int
"""
while value % step != 0:
value += 1
return value
[docs]
def find_nearest_column(self, value: int, index: np.ndarray, s: str = "+") -> int:
"""
Find the nearest number in a list in ascending or descending order
:param value: Initial value.
:type: value: int
:param index: List with all values
:type: index: np.ndarray
:param s: operator more ('+') or less ('-')
:type: s: str
:return: the value in column_index
:rtype: int
"""
shift = 1 / self._subpix
while value not in index:
value = self.ops[s](value, shift)
if s == "+" and value > index[-1]:
value = index[-1]
break
if s == "-" and value < index[0]:
value = index[0]
break
return value
[docs]
def mask_column_interval_without_step(
self, cost_volume: xr.Dataset, coord_mask_right: np.ndarray, disp: float
) -> Tuple[np.ndarray, np.ndarray]:
"""
Computes the index columns over which the similarity measure will be applied
:param cost_volume: the cost_volume DataSet with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
:type cost_volume: xarray.Dataset
:param coord_mask_right: coordinate of right mask
:type coord_mask_right: np.ndarray
:param disp: current disparity
:type disp: float
:return: the range of the left and right image over which the similarity measure will be applied
:rtype: tuple
"""
# Get coordinates
coords_column_left = cost_volume.coords["col"].data
ind = int((disp % 1) * self._subpix)
if self._subpix != 1 and ind != 0:
coords_column_right = coord_mask_right
else:
coords_column_right = cost_volume.coords["col"].data
# range in the left image
point_p = (
max(coords_column_left[0] - disp, coords_column_left[0]),
min(coords_column_left[-1] - disp, coords_column_left[-1]),
)
# range in the right image
point_q = (
max(coords_column_right[0] + disp, coords_column_right[0]),
min(coords_column_right[-1] + disp, coords_column_right[-1]),
)
# Only look for the right interval if it exists
if self._subpix != 1 and point_p[0] <= point_p[1]:
if disp < 0:
point_p = (self.find_nearest_column(point_p[0], coords_column_left, "+"), point_p[1])
point_q = (point_q[0], self.find_nearest_column(point_q[1], coords_column_right, "+"))
else:
point_p = (point_p[0], self.find_nearest_column(point_p[1], coords_column_left, "-"))
point_q = (self.find_nearest_column(point_q[0], coords_column_right, "-"), point_q[1])
# if disparity is outside the image, returned column_interval_left and column_interval right are empty
if abs(disp) > len(coords_column_right):
column_interval_left = np.array([])
column_interval_right = np.array([])
else:
column_interval_left = np.arange(point_p[0], point_p[-1] + 1)
column_interval_right = np.arange(point_q[0], point_q[-1] + 1)
return column_interval_left, column_interval_right
[docs]
def mask_column_interval(
self, cost_volume: xr.Dataset, coord_mask_left: np.ndarray, coord_mask_right: np.ndarray, disp: float
) -> Tuple[np.ndarray, np.ndarray]:
"""
Computes the index columns over which the similarity measure will be applied
:param cost_volume: the cost_volume DataSet with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
:type cost_volume: xarray.Dataset
:param coord_mask_right: coordinate of right mask
:type coord_mask_right: np.ndarray
:param disp: current disparity
:type disp: float
:return: the range of the left and right image over which the similarity measure will be applied
:rtype: tuple
"""
# OLD METHOD
if self._step_col == 1:
return self.mask_column_interval_without_step(cost_volume, coord_mask_right, disp)
# range in the left image
p0 = cost_volume.coords["col"].data[0]
p1 = cost_volume.coords["col"].data[-1]
if disp < 0:
idx = 1
while (p0 + disp) < coord_mask_left[0]:
# Check coordinates list exist
if cost_volume.coords["col"].data[idx:].size == 0:
p0 = p1 + self._step_col
break
p0 = self.find_nearest_column(p0, cost_volume.coords["col"].data[idx:])
idx += 1
if disp > 0:
idx = cost_volume.sizes["col"] - 1
while (p1 + disp) > coord_mask_left[-1]:
# Check coordinates list exist
if cost_volume.coords["col"].data[:idx].size == 0:
p1 = p0 - self._step_col
break
p1 = self.find_nearest_column(p1, cost_volume.coords["col"].data[:idx], "-")
idx -= 1
column_interval_left = np.arange(p0, p1 + self._step_col, self._step_col)
# range in the right image
# there is a small change to the disp variable because there is no additional mask to create
# when slef._subpix = 4 is used. In other words, the mask created for .5 is the same as for .25 and .75.
if self._subpix == 4 and disp % 1:
if disp < 0:
disp = ceil(disp) - 0.5
else:
disp = floor(disp) + 0.5
column_interval_right = column_interval_left + disp
return column_interval_left, column_interval_right
@profile("matching_cost.cv_masked")
[docs]
def cv_masked(
self,
img_left: xr.Dataset,
img_right: xr.Dataset,
cost_volume: xr.Dataset,
disp_min: np.ndarray,
disp_max: np.ndarray,
) -> None:
"""
Masks the cost volume by:
- Setting entire disparity range to NaN for pixels masked in the
left image (including the window of matching cost when left pixel is NaN)
- Setting disparities outside local [disp_min, disp_max] range to NaN
- Setting disparities to NaN if corresponding right pixel is
masked (including the window of matching cost when right pixel is NaN)
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param img_right: right Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_right: xarray.Dataset
:param cost_volume: the cost_volume DataSet with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
:type cost_volume: xarray.Dataset
:param disp_min: minimum disparity
:type disp_min: np.ndarray
:param disp_max: maximum disparity
:type disp_max: np.ndarray
:return: None
"""
dmin, _ = self.get_min_max_from_grid(disp_min, disp_max)
# ----- Masking invalid pixels and local disparity range -----
# Computes the validity mask of the cost volume : invalid pixels or no_data are masked with the value nan.
# Valid pixels are = 0
mask_left, mask_right = self.masks_dilatation(img_left, img_right, self._window_size, self._subpix)
# Depending on subpix, create an empty mask or use the shifted right mask
if self._subpix > 1:
mask_right_shift = mask_right[1].data.astype(np.float32)
else:
mask_right_shift = np.empty((0, 0), dtype=np.float32)
# mask the cost volume
matching_cost_cpp.cv_masked(
cost_volume["cost_volume"].data,
mask_left.data,
mask_right[0].data,
mask_right_shift,
disp_min.astype(np.float32),
disp_max.astype(np.float32),
cost_volume.coords["disp"].data.astype(np.float32),
dmin,
self._subpix,
)
# The disp_min and disp_max used to search missing disparity interval are not the local disp_min and disp_max
# in case of a variable range of disparities. So there may be pixels that have missing disparity range (all
# cost are np.nan), but are not detected in the validity_mask function. To find the pixels that have a missing
# disparity range, we search in the cost volume pixels where cost_volume(row,col, for all d) = np.nan
mask_invalid_variable_disparity_range(cost_volume)
# Mask border pixels
offset = cost_volume.attrs["offset_row_col"]
if offset > 0:
mask_border(cost_volume)
@profile("matching_cost.allocate_numpy_cost_volume")
[docs]
def allocate_numpy_cost_volume(self, img_left: xr.Dataset, disparity_range: Union[np.ndarray, List]) -> np.ndarray:
"""
Allocate the numpy cost volume cv = (disp, col, row), for efficient memory management
:param img_left: left Dataset image containing :
- im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32
- disparity (optional): 3D (disp, row, col) xarray.DataArray float32
- msk (optional): 2D (row, col) xarray.DataArray int16
- classif (optional): 3D (band_classif, row, col) xarray.DataArray int16
- segm (optional): 2D (row, col) xarray.DataArray int16
- edges (optional): 2D (row, col) xarray.DataArray int16
:type img_left: xarray.Dataset
:param disparity_range: disparity range
:type disparity_range: np.ndarray
:param offset_row_col: offset in row and col
:type offset_row_col: int
:return: the cost volume dataset , with the data variables:
- cost_volume 3D xarray.DataArray (row, col, disp)
:rtype: xarray.Dataset
"""
return np.full(
(len(disparity_range), int(img_left.sizes["col"]), int(img_left.sizes["row"])),
np.nan,
dtype=np.float32,
)
@staticmethod
@profile("matching_cost.crop_cost_volume")
[docs]
def crop_cost_volume(cost_volume: np.ndarray, offset: int = 0) -> np.ndarray:
"""
Return a cropped view of cost_volume.
If offset, do not consider border position for cost computation.
:param cost_volume: cost volume to crop
:type cost_volume: np.ndarray
:param offset: offset used to crop cost volume
:type offset: int
:return: cropped view of cost_volume.
:rtype: np.ndarray
"""
return cost_volume[:, offset:-offset, offset:-offset] if offset else cost_volume
@staticmethod
@profile("matching_cost.reverse_cost_volume")
[docs]
def reverse_cost_volume(left_cv: np.ndarray, disp_min: int) -> np.ndarray:
"""
Create the right cost volume from the left cost volume, by reindexing.
(i,j,d) -> (i, j + d, -d)
:param left_cv: the 3D cost_colume data array, with dimensions row, col, disp
:type left_cv: np.ndarray(dtype=float32)
:param disp_min: the minimum of the right disparities
:type min_disp: int64
:return: The right cost volume data
:rtype: 3D np.ndarray of type float32
"""
return matching_cost_cpp.reverse_cost_volume(left_cv, disp_min)
@staticmethod
@profile("matching_cost.reverse_disp_range")
[docs]
def reverse_disp_range(left_min: np.ndarray, left_max: np.ndarray) -> np.ndarray:
"""
Create the right disp ranges from the left disp ranges
:param left_min: the 2D left disp min array, with dimensions row, col
:type left_min: np.ndarray(dtype=float32)
:param left_max: the 2D left disp max array, with dimensions row, col
:type left_max: np.ndarray(dtype=float32)
:return: The min and max disp ranges for the right image
:rtype: Tuple[np.ndarray(dtype=float32), np.ndarray(dtype=float32)]
"""
return matching_cost_cpp.reverse_disp_range(left_min, left_max)