"""
Core spectral bleed-through correction routines.
Author: Fabrizio Musacchio
Date: June 2026
"""
# %% IMPORTS
from __future__ import annotations
import json
import warnings
from pathlib import Path
import numpy as np
from .estimation import (
DEFAULT_ALPHA_MAX,
DEFAULT_MAX_ALPHA_VOXELS,
DEFAULT_MI_BINS,
DEFAULT_RANDOM_STATE,
MIN_MASK_VOXELS,
SUPPORTED_ALPHA_ESTIMATION_METHODS,
estimate_alpha_from_volume,
estimate_picasso_unmixing_matrix_from_volume,
)
from .io import CANONICAL_AXIS_ORDER, load_stack_with_omio, write_stack_with_omio
from .picasso_impl import (
build_source_sink_matrix,
DEFAULT_PICASSO_ALPHA_CLIP,
DEFAULT_PICASSO_BIN_FACTOR,
DEFAULT_PICASSO_CLIP_EVERY_N_ITERATIONS,
DEFAULT_PICASSO_IMPLEMENTATION,
DEFAULT_PICASSO_NEGATIVITY_THRESHOLD,
DEFAULT_PICASSO_QN,
DEFAULT_PICASSO_STEP_SIZE,
DEFAULT_SOURCE_SINK_JOINT_OPTIMIZATION,
DEFAULT_SOURCE_SINK_MAX_BACKGROUND,
DEFAULT_SOURCE_SINK_N_RESTARTS,
DEFAULT_SOURCE_SINK_OPTIMIZE_BACKGROUND,
default_source_sink_matrix,
apply_source_sink_parameters,
run_picasso_matlab_like,
run_source_sink_unmixing,
apply_matlab_incremental_sequence,
validate_picasso_implementation,
validate_source_sink_matrix,
)
# %% CONSTANTS
ALPHA_MODES = {"fixed", "reference_t", "per_t"}
SUPPORTED_UNMIX_METHODS = {"manual", *SUPPORTED_ALPHA_ESTIMATION_METHODS}
SUPPORTED_PICASSO_METHODS = {"picasso"}
PICASSO_ALPHA_MODES = {"reference_t", "per_t"}
# %% INTERNAL HELPERS
[docs]
def report_path_from_output_path(output_path: str | Path) -> Path:
"""Return the JSON sidecar path used for reproducibility metadata."""
output_path = Path(output_path)
return output_path.with_suffix(output_path.suffix + ".json")
def _print_verbose(verbose: bool, message: str) -> None:
"""Print a status message only when verbose mode is enabled."""
if verbose:
print(message)
def _validate_channel_index(name: str, channel: int, channel_count: int) -> int:
"""Validate a channel index against the available channel count."""
channel = int(channel)
if not 0 <= channel < channel_count:
raise ValueError(
f"{name} must be between 0 and {channel_count - 1}. Got {channel!r}."
)
return channel
def _validate_alpha_mode(alpha_mode: str) -> str:
"""Normalize and validate an explicitly requested scalar-alpha mode."""
alpha_mode = str(alpha_mode).strip().lower()
if alpha_mode not in ALPHA_MODES:
raise ValueError(
f"alpha_mode must be one of {sorted(ALPHA_MODES)}. Got {alpha_mode!r}."
)
return alpha_mode
def _resolve_unmix_alpha_mode(
*,
alpha_mode,
alpha,
method: str,
alpha_reference_t: int,
verbose: bool,
) -> tuple[str | None, str]:
"""
Resolve the effective alpha mode for ``unmix(...)``.
If ``alpha_mode`` is ``None``, the workflow uses ``"fixed"`` whenever a
user-provided ``alpha`` is available. Otherwise, ``method="manual"``
raises an error and all other methods default to ``"reference_t"`` with
the supplied ``alpha_reference_t``.
"""
if alpha_mode is not None:
normalized_mode = _validate_alpha_mode(alpha_mode)
return normalized_mode, normalized_mode
if alpha is not None:
_print_verbose(
verbose,
"alpha_mode was not provided; using alpha_mode='fixed' because alpha was provided.",
)
return None, "fixed"
if method == "manual":
raise ValueError(
"method='manual' requires a user-provided alpha when alpha_mode is None. "
"Provide alpha or set alpha_mode='reference_t' or alpha_mode='per_t'."
)
_print_verbose(
verbose,
(
"alpha_mode was not provided; using alpha_mode='reference_t' with "
f"alpha_reference_t={int(alpha_reference_t)} because no alpha was provided "
f"and method='{method}'."
),
)
return None, "reference_t"
def _validate_picasso_alpha_mode(alpha_mode: str) -> str:
"""Validate the subset of alpha modes currently supported by ``unmix_picasso``."""
alpha_mode = _validate_alpha_mode(alpha_mode)
if alpha_mode not in PICASSO_ALPHA_MODES:
raise ValueError(
"unmix_picasso currently supports only alpha_mode='reference_t' or "
f"'per_t'. Got {alpha_mode!r}."
)
return alpha_mode
def _validate_unmix_method(method: str) -> str:
"""Normalize and validate a two-channel unmixing method name."""
method = str(method).strip().lower()
if method not in SUPPORTED_UNMIX_METHODS:
raise ValueError(
f"method must be one of {sorted(SUPPORTED_UNMIX_METHODS)}. Got {method!r}."
)
return method
def _validate_picasso_method(method: str) -> str:
"""Normalize and validate the multi-channel blind-unmixing method name."""
method = str(method).strip().lower()
if method not in SUPPORTED_PICASSO_METHODS:
raise ValueError(
f"method must be one of {sorted(SUPPORTED_PICASSO_METHODS)}. Got {method!r}."
)
return method
def _validate_alpha_value(alpha: float) -> float:
"""Validate a user-provided fixed alpha value."""
alpha = float(alpha)
if not np.isfinite(alpha) or alpha < 0.0:
raise ValueError(
f"alpha must be finite and >= 0 for manual/fixed unmixing. Got {alpha!r}."
)
return alpha
def _validate_common_estimation_parameters(
*,
alpha_max: float,
mi_bins: int,
min_mask_voxels: int,
) -> tuple[float, int, int]:
"""Validate alpha-estimation parameters shared across multiple workflows."""
alpha_max = float(alpha_max)
if alpha_max <= 0.0:
raise ValueError(f"alpha_max must be > 0. Got {alpha_max!r}.")
mi_bins = int(mi_bins)
if mi_bins < 2:
raise ValueError(f"mi_bins must be >= 2. Got {mi_bins!r}.")
min_mask_voxels = int(min_mask_voxels)
if min_mask_voxels < 1:
raise ValueError(f"min_mask_voxels must be >= 1. Got {min_mask_voxels!r}.")
return alpha_max, mi_bins, min_mask_voxels
def _validate_channels(channels, channel_count: int) -> list[int]:
"""Normalize and validate the channel subset used for PICASSO-style unmixing."""
if channels is None:
return list(range(channel_count))
normalized = [_validate_channel_index("channel", channel, channel_count) for channel in channels]
if len(normalized) < 2:
raise ValueError("channels must contain at least two distinct channel indices.")
if len(set(normalized)) != len(normalized):
raise ValueError(f"channels must not contain duplicates. Got {normalized!r}.")
return normalized
def _resolve_reverse_parameter(forward_value, reverse_value):
"""Return a reverse-direction override or fall back to the forward value."""
return forward_value if reverse_value is None else reverse_value
def _estimate_reference_alpha(
stack: np.ndarray,
*,
alpha_reference_t: int,
source_channel: int,
target_channel: int,
signal_percentile: float,
target_low_percentile: float | None,
background_percentile: float,
method: str,
preprocess_alpha_inputs: bool,
alpha_max: float,
mi_bins: int,
max_alpha_voxels: int | None,
random_state: int,
min_mask_voxels: int,
) -> tuple[float, dict]:
"""Estimate one scalar alpha from all Z slices of a chosen reference time point."""
time_count = int(stack.shape[0])
if not 0 <= int(alpha_reference_t) < time_count:
raise ValueError(
f"alpha_reference_t must be between 0 and {time_count - 1}. "
f"Got {alpha_reference_t!r}."
)
source = stack[int(alpha_reference_t), :, source_channel, :, :]
target = stack[int(alpha_reference_t), :, target_channel, :, :]
return estimate_alpha_from_volume(
source=source,
target=target,
signal_percentile=signal_percentile,
target_low_percentile=target_low_percentile,
background_percentile=background_percentile,
min_mask_voxels=min_mask_voxels,
method=method,
preprocess_alpha_inputs=preprocess_alpha_inputs,
alpha_max=alpha_max,
mi_bins=mi_bins,
max_alpha_voxels=max_alpha_voxels,
random_state=random_state,
return_details=True,
)
def _estimate_per_t_alphas(
stack: np.ndarray,
*,
source_channel: int,
target_channel: int,
signal_percentile: float,
target_low_percentile: float | None,
background_percentile: float,
method: str,
preprocess_alpha_inputs: bool,
alpha_max: float,
mi_bins: int,
max_alpha_voxels: int | None,
random_state: int,
min_mask_voxels: int,
) -> tuple[np.ndarray, list[dict]]:
"""Estimate one scalar alpha per time point using all Z slices of each time point."""
alpha_values = np.empty(stack.shape[0], dtype=np.float32)
details_by_t: list[dict] = []
for t in range(stack.shape[0]):
alpha_value, details = estimate_alpha_from_volume(
source=stack[t, :, source_channel, :, :],
target=stack[t, :, target_channel, :, :],
signal_percentile=signal_percentile,
target_low_percentile=target_low_percentile,
background_percentile=background_percentile,
min_mask_voxels=min_mask_voxels,
method=method,
preprocess_alpha_inputs=preprocess_alpha_inputs,
alpha_max=alpha_max,
mi_bins=mi_bins,
max_alpha_voxels=max_alpha_voxels,
random_state=random_state + int(t),
return_details=True,
)
alpha_values[t] = alpha_value
details_by_t.append({"t": int(t), **details})
return alpha_values, details_by_t
def _estimate_directional_alpha(
stack: np.ndarray,
*,
alpha_mode: str,
alpha,
alpha_reference_t: int,
source_channel: int,
target_channel: int,
signal_percentile: float,
target_low_percentile: float | None,
background_percentile: float,
method: str,
preprocess_alpha_inputs: bool,
alpha_max: float,
mi_bins: int,
max_alpha_voxels: int | None,
random_state: int,
min_mask_voxels: int,
verbose: bool,
direction_label: str,
) -> tuple[float | None, np.ndarray | None, dict | None, list[dict] | None, str, str]:
"""Estimate or validate one directional bleed-through coefficient workflow."""
alpha_scalar: float | None = None
alpha_values: np.ndarray | None = None
alpha_details: dict | None = None
alpha_details_by_t: list[dict] | None = None
alpha_source = "estimated"
method_effective = method
if alpha_mode == "fixed":
if alpha is None:
raise ValueError(
f"alpha must be provided for {direction_label} correction when alpha_mode='fixed'."
)
alpha_scalar = _validate_alpha_value(alpha)
alpha_source = "user_provided"
method_effective = "manual"
_print_verbose(
verbose,
(
f"Using user-provided {direction_label} alpha={alpha_scalar:.6f} for "
f"source_channel={source_channel} -> target_channel={target_channel}."
),
)
elif alpha_mode == "reference_t":
alpha_scalar, alpha_details = _estimate_reference_alpha(
stack,
alpha_reference_t=int(alpha_reference_t),
source_channel=source_channel,
target_channel=target_channel,
signal_percentile=signal_percentile,
target_low_percentile=target_low_percentile,
background_percentile=background_percentile,
method=method,
preprocess_alpha_inputs=bool(preprocess_alpha_inputs),
alpha_max=alpha_max,
mi_bins=mi_bins,
max_alpha_voxels=max_alpha_voxels,
random_state=int(random_state),
min_mask_voxels=min_mask_voxels,
)
_print_verbose(
verbose,
(
f"Estimated {direction_label} reference alpha={alpha_scalar:.6f} from "
f"t={int(alpha_reference_t)} with method='{method}'."
),
)
else:
alpha_values, alpha_details_by_t = _estimate_per_t_alphas(
stack,
source_channel=source_channel,
target_channel=target_channel,
signal_percentile=signal_percentile,
target_low_percentile=target_low_percentile,
background_percentile=background_percentile,
method=method,
preprocess_alpha_inputs=bool(preprocess_alpha_inputs),
alpha_max=alpha_max,
mi_bins=mi_bins,
max_alpha_voxels=max_alpha_voxels,
random_state=int(random_state),
min_mask_voxels=min_mask_voxels,
)
_print_verbose(
verbose,
f"Estimated {direction_label} per-time-point alpha values: "
+ ", ".join(f"{float(value):.6f}" for value in np.asarray(alpha_values)),
)
return (
alpha_scalar,
alpha_values,
alpha_details,
alpha_details_by_t,
alpha_source,
method_effective,
)
def _validate_bidirectional_determinant(
alpha_forward,
alpha_reverse,
*,
context: str,
) -> float:
"""Validate the determinant of the 2x2 bidirectional mixing matrix."""
determinant = 1.0 - float(alpha_forward) * float(alpha_reverse)
if determinant <= 0.0 or not np.isfinite(determinant):
raise ValueError(
f"Invalid bidirectional unmixing matrix for {context}: "
f"determinant = 1 - alpha_forward * alpha_reverse = {determinant!r}. "
"Please choose coefficients with alpha_forward * alpha_reverse < 1."
)
return float(determinant)
def _apply_bidirectional_unmixing(
source_measured: np.ndarray,
target_measured: np.ndarray,
*,
alpha_forward: float,
alpha_reverse: float,
clip_negative: bool,
context: str,
) -> tuple[np.ndarray, np.ndarray, float]:
"""Unmix a two-channel bidirectional linear mixture by 2x2 matrix inversion."""
determinant = _validate_bidirectional_determinant(
alpha_forward,
alpha_reverse,
context=context,
)
corrected_source = (
source_measured - float(alpha_reverse) * target_measured
) / determinant
corrected_target = (
target_measured - float(alpha_forward) * source_measured
) / determinant
if clip_negative:
corrected_source = np.maximum(corrected_source, 0.0)
corrected_target = np.maximum(corrected_target, 0.0)
return corrected_source, corrected_target, determinant
def _cast_output_stack(stack: np.ndarray, output_dtype: str | np.dtype) -> np.ndarray:
"""Cast an output stack while clipping integer targets to their valid range."""
dtype = np.dtype(output_dtype)
if np.issubdtype(dtype, np.integer):
info = np.iinfo(dtype)
stack = np.clip(stack, info.min, info.max)
return stack.astype(dtype, copy=False)
def _write_report(report: dict, report_path: Path) -> Path:
"""Write a JSON sidecar report for a processing run."""
report_path.parent.mkdir(parents=True, exist_ok=True)
with report_path.open("w", encoding="utf-8") as handle:
json.dump(report, handle, indent=2)
handle.write("\n")
return report_path
def _derive_picasso_output_path(input_path: Path) -> Path:
"""Derive a default output path for PICASSO-style unmixing results."""
return input_path.with_name(f"{input_path.stem}_picasso.tif")
def _move_selected_channels_last(stack: np.ndarray, channels: list[int]) -> np.ndarray:
"""Extract selected channels and move channel to the trailing axis."""
selected = np.take(stack, channels, axis=2)
return np.moveaxis(selected, 2, -1)
def _apply_reference_unmixing_matrix(
stack: np.ndarray,
*,
channels: list[int],
matrix: np.ndarray,
) -> np.ndarray:
"""Apply one blind-unmixing matrix to the selected channels of an entire stack."""
moved = _move_selected_channels_last(stack, channels)
unmixed = np.einsum("ij,tzyxj->tzyxi", matrix, moved, optimize=True)
return np.moveaxis(unmixed, -1, 2)
def _apply_per_t_unmixing_matrices(
stack: np.ndarray,
*,
channels: list[int],
matrices: np.ndarray,
) -> np.ndarray:
"""Apply one blind-unmixing matrix per time point to selected stack channels."""
moved = _move_selected_channels_last(stack, channels)
unmixed = np.einsum("tij,tzyxj->tzyxi", matrices, moved, optimize=True)
return np.moveaxis(unmixed, -1, 2)
def _estimate_reference_picasso_matrix(
stack: np.ndarray,
*,
channels: list[int],
alpha_reference_t: int,
background_percentile: float,
preprocess_alpha_inputs: bool,
mi_bins: int,
alpha_max: float,
max_iter: int,
tolerance: float,
max_alpha_voxels: int | None,
random_state: int,
) -> tuple[np.ndarray, dict]:
"""Estimate one blind-unmixing matrix from a chosen reference time point."""
time_count = int(stack.shape[0])
if not 0 <= int(alpha_reference_t) < time_count:
raise ValueError(
f"alpha_reference_t must be between 0 and {time_count - 1}. "
f"Got {alpha_reference_t!r}."
)
channel_volumes = np.moveaxis(
np.take(stack[int(alpha_reference_t), :, :, :, :], channels, axis=1),
1,
0,
)
matrix, details = estimate_picasso_unmixing_matrix_from_volume(
channel_volumes,
background_percentile=background_percentile,
preprocess_alpha_inputs=preprocess_alpha_inputs,
mi_bins=mi_bins,
alpha_max=alpha_max,
max_iter=max_iter,
tolerance=tolerance,
max_alpha_voxels=max_alpha_voxels,
random_state=random_state,
)
return matrix, {"t": int(alpha_reference_t), **details}
def _estimate_per_t_picasso_matrices(
stack: np.ndarray,
*,
channels: list[int],
background_percentile: float,
preprocess_alpha_inputs: bool,
mi_bins: int,
alpha_max: float,
max_iter: int,
tolerance: float,
max_alpha_voxels: int | None,
random_state: int,
) -> tuple[np.ndarray, list[dict]]:
"""Estimate one blind-unmixing matrix per time point."""
matrices = np.empty((stack.shape[0], len(channels), len(channels)), dtype=np.float64)
details_by_t: list[dict] = []
for t in range(stack.shape[0]):
channel_volumes = np.moveaxis(
np.take(stack[t, :, :, :, :], channels, axis=1),
1,
0,
)
matrix, details = estimate_picasso_unmixing_matrix_from_volume(
channel_volumes,
background_percentile=background_percentile,
preprocess_alpha_inputs=preprocess_alpha_inputs,
mi_bins=mi_bins,
alpha_max=alpha_max,
max_iter=max_iter,
tolerance=tolerance,
max_alpha_voxels=max_alpha_voxels,
random_state=random_state + int(t),
)
matrices[t] = matrix
details_by_t.append({"t": int(t), **details})
return matrices, details_by_t
[docs]
def unmix(
input_path,
output_path,
alpha=None,
alpha_mode=None,
alpha_reference_t=0,
source_channel=0,
target_channel=1,
signal_percentile=99.0,
target_low_percentile=None,
background_percentile=1.0,
clip_negative=True,
output_dtype="float32",
verbose=True,
method="mean_ratio",
bidirectional=False,
alpha_reverse=None,
method_reverse=None,
preprocess_alpha_inputs=True,
alpha_max=DEFAULT_ALPHA_MAX,
signal_percentile_reverse=None,
background_percentile_reverse=None,
target_low_percentile_reverse=None,
alpha_max_reverse=None,
mi_bins=DEFAULT_MI_BINS,
mi_bins_reverse=None,
max_alpha_voxels=DEFAULT_MAX_ALPHA_VOXELS,
random_state=DEFAULT_RANDOM_STATE,
min_mask_voxels=MIN_MASK_VOXELS,
) -> Path:
"""
Remove bleed-through from one source channel into one target channel in a TZCYX stack.
Parameters
----------
input_path : str or Path
Path to the input microscopy stack readable by OMIO.
output_path : str or Path
Path to the output stack to be written via OMIO. A JSON sidecar report
with the same name plus ``.json`` is written alongside it.
alpha : float or None, optional
User-provided bleed-through coefficient. Required when the effective
alpha mode is ``"fixed"``.
alpha_mode : {"fixed", "reference_t", "per_t"} or None, optional
Strategy that determines whether alpha is taken directly from ``alpha``,
estimated once from a reference time point, or estimated separately for
each time point. If ``None``, a provided ``alpha`` implies
``"fixed"``; otherwise ``method="manual"`` raises an error and the
remaining methods default to ``"reference_t"`` with the supplied
``alpha_reference_t``.
alpha_reference_t : int, optional
Reference time point used when ``alpha_mode="reference_t"``.
source_channel : int, optional
Channel whose signal bleeds into the target channel.
target_channel : int, optional
Channel from which the source contribution should be removed.
signal_percentile : float, optional
Percentile used to define a bright-source signal mask for automatic
alpha estimation.
target_low_percentile : float or None, optional
Optional low-target constraint for the alpha-estimation mask.
background_percentile : float, optional
Low percentile used for optional percentile-based background subtraction
during alpha estimation.
clip_negative : bool, optional
If ``True``, clip negative corrected target values to zero.
output_dtype : str or numpy.dtype, optional
Data type used for the written output stack.
verbose : bool, optional
If ``True``, print processing progress and estimated coefficients.
method : {"manual", "mean_ratio", "linear_fit", "corr_min", "mi_min"}, optional
Method used to obtain alpha. ``"manual"`` is meaningful only together
with an effective ``alpha_mode="fixed"``; the other methods estimate
alpha from the data.
bidirectional : bool, optional
If ``True``, estimate or use coefficients for both
``source_channel -> target_channel`` and
``target_channel -> source_channel`` and solve the resulting 2x2 linear
mixing model by matrix inversion.
alpha_reverse : float or None, optional
Optional fixed reverse-direction bleed-through coefficient, i.e. from
``target_channel`` back into ``source_channel``. If ``None`` and
``bidirectional=True``, the forward ``alpha`` value is reused.
method_reverse : {"manual", "mean_ratio", "linear_fit", "corr_min", "mi_min"} or None, optional
Optional reverse-direction alpha-estimation method. If ``None`` and
``bidirectional=True``, the forward ``method`` value is reused.
preprocess_alpha_inputs : bool, optional
If ``True``, apply percentile-based background subtraction and clipping
before automatic alpha estimation.
alpha_max : float, optional
Upper search bound for optimization-based alpha-estimation methods.
signal_percentile_reverse : float or None, optional
Optional reverse-direction source-mask percentile. Falls back to
``signal_percentile`` when ``None``.
background_percentile_reverse : float or None, optional
Optional reverse-direction background percentile. Falls back to
``background_percentile`` when ``None``.
target_low_percentile_reverse : float or None, optional
Optional reverse-direction low-target percentile. Falls back to
``target_low_percentile`` when ``None``.
alpha_max_reverse : float or None, optional
Optional reverse-direction optimization bound. Falls back to
``alpha_max`` when ``None``.
mi_bins : int, optional
Number of histogram bins used by the mutual-information estimator.
mi_bins_reverse : int or None, optional
Optional reverse-direction histogram-bin count used by ``mi_min``.
Falls back to ``mi_bins`` when ``None``.
max_alpha_voxels : int or None, optional
Optional cap on the number of voxels used for alpha estimation after
masking.
random_state : int, optional
Random seed used for optional voxel subsampling during alpha estimation.
min_mask_voxels : int, optional
Minimum number of voxels required for a valid alpha-estimation mask.
Returns
-------
Path
Actual path of the written output stack.
Raises
------
ValueError
If the input configuration is inconsistent or would overwrite the input.
Notes
-----
Only ``target_channel`` is modified. ``source_channel`` remains unchanged in
the output stack. Automatic alpha estimation is performed on prepared data,
but the final subtraction is applied to the original stack intensities cast
to ``float32``.
If ``bidirectional=True``, both selected channels are corrected jointly by
inverting the 2x2 linear mixing model. In that mode, ``clip_negative``
applies to both corrected channels.
"""
input_path = Path(input_path)
output_path = Path(output_path)
report_path = report_path_from_output_path(output_path)
if input_path.resolve() == output_path.resolve():
raise ValueError(
"Refusing to overwrite the input file. Please choose a different output_path."
)
method = _validate_unmix_method(method)
alpha_mode_requested, alpha_mode = _resolve_unmix_alpha_mode(
alpha_mode=alpha_mode,
alpha=alpha,
method=method,
alpha_reference_t=int(alpha_reference_t),
verbose=bool(verbose),
)
alpha_max, mi_bins, min_mask_voxels = _validate_common_estimation_parameters(
alpha_max=alpha_max,
mi_bins=mi_bins,
min_mask_voxels=min_mask_voxels,
)
bidirectional = bool(bidirectional)
method_reverse_resolved = None if not bidirectional else _validate_unmix_method(
_resolve_reverse_parameter(method, method_reverse)
)
signal_percentile_reverse_resolved = _resolve_reverse_parameter(
signal_percentile,
signal_percentile_reverse,
)
background_percentile_reverse_resolved = _resolve_reverse_parameter(
background_percentile,
background_percentile_reverse,
)
target_low_percentile_reverse_resolved = _resolve_reverse_parameter(
target_low_percentile,
target_low_percentile_reverse,
)
alpha_max_reverse_resolved = _resolve_reverse_parameter(
alpha_max,
alpha_max_reverse,
)
mi_bins_reverse_resolved = _resolve_reverse_parameter(
mi_bins,
mi_bins_reverse,
)
if bidirectional:
alpha_max_reverse_resolved, mi_bins_reverse_resolved, _ = _validate_common_estimation_parameters(
alpha_max=alpha_max_reverse_resolved,
mi_bins=mi_bins_reverse_resolved,
min_mask_voxels=min_mask_voxels,
)
if method == "manual" and alpha_mode != "fixed":
raise ValueError("method='manual' is only valid with alpha_mode='fixed'.")
if bidirectional and method_reverse_resolved == "manual" and alpha_mode != "fixed":
raise ValueError("method_reverse='manual' is only valid with alpha_mode='fixed'.")
_print_verbose(verbose, f"Reading stack with OMIO: {input_path}")
stack, metadata = load_stack_with_omio(input_path)
channel_count = int(stack.shape[2])
time_count = int(stack.shape[0])
z_count = int(stack.shape[1])
source_channel = _validate_channel_index(
"source_channel",
source_channel,
channel_count,
)
target_channel = _validate_channel_index(
"target_channel",
target_channel,
channel_count,
)
if source_channel == target_channel:
raise ValueError("source_channel and target_channel must be different.")
_print_verbose(
verbose,
(
f"Loaded stack with shape {tuple(int(v) for v in stack.shape)} in "
f"{CANONICAL_AXIS_ORDER} order. T={'multiple' if time_count > 1 else 'single'} "
f"({time_count}), Z={'multiple' if z_count > 1 else 'single'} ({z_count})."
),
)
alpha_scalar: float | None = None
alpha_values: np.ndarray | None = None
alpha_details: dict | None = None
alpha_details_by_t: list[dict] | None = None
alpha_source = "estimated"
method_effective = method
(
alpha_scalar,
alpha_values,
alpha_details,
alpha_details_by_t,
alpha_source,
method_effective,
) = _estimate_directional_alpha(
stack,
alpha_mode=alpha_mode,
alpha=alpha,
alpha_reference_t=int(alpha_reference_t),
source_channel=source_channel,
target_channel=target_channel,
signal_percentile=signal_percentile,
target_low_percentile=target_low_percentile,
background_percentile=background_percentile,
method=method,
preprocess_alpha_inputs=bool(preprocess_alpha_inputs),
alpha_max=alpha_max,
mi_bins=mi_bins,
max_alpha_voxels=max_alpha_voxels,
random_state=int(random_state),
min_mask_voxels=min_mask_voxels,
verbose=bool(verbose),
direction_label="forward",
)
alpha_reverse_scalar: float | None = None
alpha_reverse_values: np.ndarray | None = None
alpha_reverse_details: dict | None = None
alpha_reverse_details_by_t: list[dict] | None = None
alpha_reverse_source: str | None = None
method_reverse_effective: str | None = None
if bidirectional:
reverse_alpha_input = _resolve_reverse_parameter(alpha, alpha_reverse)
(
alpha_reverse_scalar,
alpha_reverse_values,
alpha_reverse_details,
alpha_reverse_details_by_t,
alpha_reverse_source,
method_reverse_effective,
) = _estimate_directional_alpha(
stack,
alpha_mode=alpha_mode,
alpha=reverse_alpha_input,
alpha_reference_t=int(alpha_reference_t),
source_channel=target_channel,
target_channel=source_channel,
signal_percentile=signal_percentile_reverse_resolved,
target_low_percentile=target_low_percentile_reverse_resolved,
background_percentile=background_percentile_reverse_resolved,
method=method_reverse_resolved,
preprocess_alpha_inputs=bool(preprocess_alpha_inputs),
alpha_max=alpha_max_reverse_resolved,
mi_bins=mi_bins_reverse_resolved,
max_alpha_voxels=max_alpha_voxels,
random_state=int(random_state) + 10_000,
min_mask_voxels=min_mask_voxels,
verbose=bool(verbose),
direction_label="reverse",
)
working_stack = stack.astype(np.float32, copy=True)
source_view = working_stack[:, :, source_channel, :, :]
target_view = working_stack[:, :, target_channel, :, :]
bidirectional_determinants: list[float] | None = None
if not bidirectional and alpha_mode in {"fixed", "reference_t"}:
corrected_target = target_view - float(alpha_scalar) * source_view
if clip_negative:
corrected_target = np.maximum(corrected_target, 0.0)
working_stack[:, :, target_channel, :, :] = corrected_target
elif not bidirectional:
for t in range(working_stack.shape[0]):
corrected_target = target_view[t] - float(alpha_values[t]) * source_view[t]
if clip_negative:
corrected_target = np.maximum(corrected_target, 0.0)
working_stack[t, :, target_channel, :, :] = corrected_target
elif alpha_mode in {"fixed", "reference_t"}:
corrected_source, corrected_target, determinant = _apply_bidirectional_unmixing(
source_view,
target_view,
alpha_forward=float(alpha_scalar),
alpha_reverse=float(alpha_reverse_scalar),
clip_negative=bool(clip_negative),
context="global bidirectional unmixing",
)
working_stack[:, :, source_channel, :, :] = corrected_source
working_stack[:, :, target_channel, :, :] = corrected_target
bidirectional_determinants = [float(determinant)]
else:
bidirectional_determinants = []
for t in range(working_stack.shape[0]):
corrected_source, corrected_target, determinant = _apply_bidirectional_unmixing(
source_view[t],
target_view[t],
alpha_forward=float(alpha_values[t]),
alpha_reverse=float(alpha_reverse_values[t]),
clip_negative=bool(clip_negative),
context=f"bidirectional unmixing at t={t}",
)
working_stack[t, :, source_channel, :, :] = corrected_source
working_stack[t, :, target_channel, :, :] = corrected_target
bidirectional_determinants.append(float(determinant))
_print_verbose(verbose, f"Writing corrected stack to: {output_path}")
output_stack = _cast_output_stack(working_stack, output_dtype)
actual_output_path = write_stack_with_omio(output_path, output_stack, metadata)
report = {
"input_path": str(input_path),
"output_path": str(actual_output_path),
"report_path": str(report_path),
"alpha_mode": alpha_mode,
"alpha_mode_requested": alpha_mode_requested,
"alpha_mode_was_defaulted": bool(alpha_mode_requested is None),
"method": method,
"method_effective": method_effective,
"alpha_source": alpha_source,
"alpha": None if alpha_scalar is None else float(alpha_scalar),
"alpha_values": None
if alpha_values is None
else [float(value) for value in np.asarray(alpha_values)],
"alpha_by_t": None
if alpha_values is None
else [float(value) for value in np.asarray(alpha_values)],
"bidirectional": bool(bidirectional),
"alpha_reverse": None
if alpha_reverse_scalar is None
else float(alpha_reverse_scalar),
"alpha_reverse_values": None
if alpha_reverse_values is None
else [float(value) for value in np.asarray(alpha_reverse_values)],
"alpha_reverse_by_t": None
if alpha_reverse_values is None
else [float(value) for value in np.asarray(alpha_reverse_values)],
"alpha_reverse_inherited_from_forward": bool(bidirectional and alpha_reverse is None),
"method_reverse": None if not bidirectional else method_reverse_resolved,
"method_reverse_effective": method_reverse_effective,
"alpha_reverse_source": alpha_reverse_source,
"source_channel": int(source_channel),
"target_channel": int(target_channel),
"signal_percentile": float(signal_percentile),
"target_low_percentile": None
if target_low_percentile is None
else float(target_low_percentile),
"background_percentile": float(background_percentile),
"signal_percentile_reverse": None
if not bidirectional
else float(signal_percentile_reverse_resolved),
"target_low_percentile_reverse": None
if not bidirectional or target_low_percentile_reverse_resolved is None
else float(target_low_percentile_reverse_resolved),
"background_percentile_reverse": None
if not bidirectional
else float(background_percentile_reverse_resolved),
"preprocess_alpha_inputs": bool(preprocess_alpha_inputs),
"alpha_max": float(alpha_max),
"alpha_max_reverse": None if not bidirectional else float(alpha_max_reverse_resolved),
"mi_bins": int(mi_bins),
"mi_bins_reverse": None if not bidirectional else int(mi_bins_reverse_resolved),
"max_alpha_voxels": None if max_alpha_voxels is None else int(max_alpha_voxels),
"random_state": int(random_state),
"min_mask_voxels": int(min_mask_voxels),
"mask_voxel_count": None
if alpha_details is None
else int(alpha_details["mask_voxel_count"]),
"mask_voxel_count_reverse": None
if alpha_reverse_details is None
else int(alpha_reverse_details["mask_voxel_count"]),
"mask_voxel_count_by_t": None
if alpha_details_by_t is None
else [int(item["mask_voxel_count"]) for item in alpha_details_by_t],
"mask_voxel_count_reverse_by_t": None
if alpha_reverse_details_by_t is None
else [int(item["mask_voxel_count"]) for item in alpha_reverse_details_by_t],
"alpha_estimation": alpha_details,
"alpha_estimation_by_t": alpha_details_by_t,
"alpha_estimation_reverse": alpha_reverse_details,
"alpha_estimation_reverse_by_t": alpha_reverse_details_by_t,
"bidirectional_mixing_matrix_determinant": None
if bidirectional_determinants is None
else (
float(bidirectional_determinants[0])
if len(bidirectional_determinants) == 1
else None
),
"bidirectional_mixing_matrix_determinant_by_t": None
if bidirectional_determinants is None or len(bidirectional_determinants) == 1
else [float(value) for value in bidirectional_determinants],
"input_shape": tuple(int(v) for v in stack.shape),
"axis_order": CANONICAL_AXIS_ORDER,
"size_t": time_count,
"size_z": z_count,
"has_multiple_t": bool(time_count > 1),
"has_multiple_z": bool(z_count > 1),
"clip_negative": bool(clip_negative),
"output_dtype": str(np.dtype(output_dtype)),
}
actual_report_path = _write_report(report, report_path)
_print_verbose(verbose, f"Wrote processing report to: {actual_report_path}")
return actual_output_path
[docs]
def unmix_picasso(
input_path,
output_path=None,
channels=None,
*,
method="picasso",
implementation=DEFAULT_PICASSO_IMPLEMENTATION,
alpha_mode="reference_t",
alpha_reference_t=0,
source_sink_matrix=None,
sink_channels=None,
neutral_channels=None,
background_percentile=1.0,
preprocess_alpha_inputs=True,
mi_bins=DEFAULT_MI_BINS,
alpha_max=DEFAULT_ALPHA_MAX,
max_iter=200,
tolerance=1e-4,
max_alpha_voxels=DEFAULT_MAX_ALPHA_VOXELS,
random_state=DEFAULT_RANDOM_STATE,
step_size=DEFAULT_PICASSO_STEP_SIZE,
qn=DEFAULT_PICASSO_QN,
pixel_bin_size=DEFAULT_PICASSO_BIN_FACTOR,
alpha_clip=DEFAULT_PICASSO_ALPHA_CLIP,
negativity_threshold=DEFAULT_PICASSO_NEGATIVITY_THRESHOLD,
clip_every_n_iterations=DEFAULT_PICASSO_CLIP_EVERY_N_ITERATIONS,
source_sink_optimize_background=DEFAULT_SOURCE_SINK_OPTIMIZE_BACKGROUND,
source_sink_max_background=DEFAULT_SOURCE_SINK_MAX_BACKGROUND,
source_sink_n_restarts=DEFAULT_SOURCE_SINK_N_RESTARTS,
source_sink_joint_optimization=DEFAULT_SOURCE_SINK_JOINT_OPTIMIZATION,
clip_negative=True,
output_dtype="float32",
verbose=True,
) -> Path:
"""
Perform PICASSO-family multi-channel blind unmixing.
This workflow is motivated by the PICASSO publication:
Seo, J., Sim, Y., Kim, J. et al. PICASSO allows ultra-multiplexed
fluorescence imaging of spatially overlapping proteins without reference
spectra measurements. Nature Communications 13, 2475 (2022).
https://doi.org/10.1038/s41467-022-30168-z
Parameters
----------
input_path : str or Path
Path to the input microscopy stack readable by OMIO.
output_path : str or Path or None, optional
Output path for the unmixed stack written via OMIO. If ``None``, a
filename ending in ``"_picasso.tif"`` is created next to the input.
channels : sequence of int or None, optional
Channel indices to include in blind unmixing. If ``None``, all channels
are used.
method : {"picasso"}, optional
Method label reserved for the PICASSO-like workflow.
implementation : {"matlab_3c", "matlab_n", "source_sink_n"}, optional
Concrete implementation variant:
``"matlab_3c"`` for a close Python port of the original MATLAB 3-channel
workflow, ``"matlab_n"`` for an N-channel generalization of that
workflow, and ``"source_sink_n"`` for a source-sink multi-channel model
inspired by the napari plugin.
alpha_mode : {"reference_t", "per_t"}, optional
Whether to estimate one unmixing matrix from a reference time point or
one matrix per time point.
alpha_reference_t : int, optional
Reference time point used when ``alpha_mode="reference_t"``.
source_sink_matrix : array-like or None, optional
Square matrix used only by ``implementation="source_sink_n"``. The
selected channels define both rows and columns. The diagonal must be
``1``. Off-diagonal values must be ``-1`` for modeled source-to-sink
spillover and ``0`` otherwise. If ``None``, an all-to-all matrix with
diagonal ``1`` and off-diagonal ``-1`` is used.
sink_channels : sequence of int or None, optional
Convenience alternative to ``source_sink_matrix`` for
``implementation="source_sink_n"``. Provide actual channel indices from
``channels`` that should be corrected as sinks. All selected channels
except those listed in ``neutral_channels`` may contribute to these
sinks.
neutral_channels : sequence of int or None, optional
Convenience option for ``implementation="source_sink_n"``. Provide
actual channel indices from ``channels`` that should remain neutral,
meaning they are neither corrected as sinks nor used as sources when a
source-sink matrix is auto-generated.
background_percentile : float, optional
Low percentile used for per-channel background subtraction in the
selected PICASSO implementation.
preprocess_alpha_inputs : bool, optional
Backward-compatibility parameter retained from earlier PICASSO-like
implementations. The current PICASSO-family implementations define
their own internal preprocessing and therefore do not switch behavior
based on this flag. The value is nevertheless recorded in the JSON
sidecar report for reproducibility.
mi_bins : int, optional
Number of histogram bins used by the mutual-information estimator in the
source-sink implementation.
alpha_max : float, optional
Upper bound for source-to-sink coefficients in the source-sink
implementation.
max_iter : int, optional
Maximum number of iterations for the selected implementation.
tolerance : float, optional
Reserved for compatibility with the previous API. The MATLAB-like
implementations follow a fixed iteration count rather than tolerance-
based early stopping.
max_alpha_voxels : int or None, optional
Optional cap on the number of voxels used for coefficient estimation in
the source-sink implementation.
random_state : int, optional
Random seed used for optional subsampling during matrix estimation.
step_size : float, optional
Incremental update step size used by the MATLAB-style implementations.
qn : int, optional
MATLAB-style mutual-information quantization parameter.
pixel_bin_size : int, optional
MATLAB-style 2D binning factor applied before mutual-information
evaluation.
alpha_clip : float, optional
Absolute clipping bound applied to each MATLAB-style pairwise alpha
estimate before the incremental update matrix is constructed.
negativity_threshold : float, optional
MATLAB-style negativity ratio threshold above which a pairwise alpha is
divided by ten.
clip_every_n_iterations : int, optional
Frequency of positivity enforcement during the MATLAB-style iterations.
source_sink_optimize_background : bool, optional
If ``True``, ``implementation="source_sink_n"`` jointly optimizes one
background parameter ``beta`` per modeled source-to-sink relation,
following the source-sink formulation used by the napari PICASSO
plugin.
source_sink_max_background : float, optional
Upper bound for the optimized source-background parameters in the
source-sink implementation. The values operate on globally normalized
intensities in the interval ``[0, 1]``.
source_sink_n_restarts : int, optional
Number of multi-start optimizer initializations used by the
source-sink implementation for each sink.
source_sink_joint_optimization : bool, optional
If ``True`` (default), all modeled sources contributing to a given sink
are optimized jointly. If ``False``, the package falls back to the
older greedy pairwise update strategy.
clip_negative : bool, optional
If ``True``, clip negative unmixed intensities to zero before writing.
output_dtype : str or numpy.dtype, optional
Data type used for the written output stack.
verbose : bool, optional
If ``True``, print processing progress and output paths.
Returns
-------
Path
Actual path of the written output stack.
Notes
-----
``implementation="matlab_3c"`` is the default and aims to match the
published MATLAB 3-channel workflow as closely as possible.
``implementation="matlab_n"`` is an explicit N-channel generalization of
that MATLAB workflow.
``implementation="source_sink_n"`` uses a direct source-sink subtraction
model inspired by the napari plugin structure. It is not a neural-MINE port
of the plugin, but it follows the same idea of user-defined source/sink
relations.
"""
input_path = Path(input_path)
output_path = _derive_picasso_output_path(input_path) if output_path is None else Path(output_path)
report_path = report_path_from_output_path(output_path)
if input_path.resolve() == output_path.resolve():
raise ValueError(
"Refusing to overwrite the input file. Please choose a different output_path."
)
method = _validate_picasso_method(method)
alpha_mode = _validate_picasso_alpha_mode(alpha_mode)
implementation = validate_picasso_implementation(implementation)
alpha_max, mi_bins, _ = _validate_common_estimation_parameters(
alpha_max=alpha_max,
mi_bins=mi_bins,
min_mask_voxels=1,
)
_print_verbose(verbose, f"Reading stack with OMIO: {input_path}")
stack, metadata = load_stack_with_omio(input_path)
time_count = int(stack.shape[0])
channels = _validate_channels(channels, int(stack.shape[2]))
selected_stack = np.take(stack, channels, axis=2).astype(np.float32, copy=False)
if implementation == "matlab_3c" and len(channels) != 3:
raise ValueError(
"implementation='matlab_3c' requires exactly 3 selected channels. "
f"Got {len(channels)} channels: {channels!r}."
)
if source_sink_matrix is not None and implementation != "source_sink_n":
raise ValueError(
"source_sink_matrix is only used when implementation='source_sink_n'."
)
if (
sink_channels is not None or neutral_channels is not None
) and implementation != "source_sink_n":
raise ValueError(
"sink_channels and neutral_channels are only used when "
"implementation='source_sink_n'."
)
if implementation == "source_sink_n":
if source_sink_matrix is not None and (
sink_channels is not None or neutral_channels is not None
):
raise ValueError(
"Use either source_sink_matrix or sink_channels/neutral_channels, "
"not both at the same time."
)
if source_sink_matrix is None and (
sink_channels is not None or neutral_channels is not None
):
source_sink_matrix = build_source_sink_matrix(
channels,
sink_channels=sink_channels,
neutral_channels=neutral_channels,
)
elif source_sink_matrix is None:
source_sink_matrix = default_source_sink_matrix(len(channels))
source_sink_matrix = validate_source_sink_matrix(
source_sink_matrix,
n_channels=len(channels),
)
if not bool(preprocess_alpha_inputs):
_print_verbose(
verbose,
(
"Note: preprocess_alpha_inputs is retained for backward "
"compatibility but is not used to switch preprocessing inside "
f"implementation='{implementation}'."
),
)
_print_verbose(
verbose,
(
f"Starting PICASSO unmixing with implementation='{implementation}', "
f"method='{method}', alpha_mode='{alpha_mode}', channels={channels}."
),
)
transformed_selected = np.empty_like(selected_stack, dtype=np.float32)
matrix = None
matrices_by_t = None
matrix_details = None
matrix_details_by_t = None
if alpha_mode == "reference_t":
if not 0 <= int(alpha_reference_t) < time_count:
raise ValueError(
f"alpha_reference_t must be between 0 and {time_count - 1}. "
f"Got {alpha_reference_t!r}."
)
reference_channel_volumes = np.moveaxis(selected_stack[int(alpha_reference_t)], 1, 0)
if implementation in {"matlab_3c", "matlab_n"}:
reference_unmixed, matrix_details = run_picasso_matlab_like(
reference_channel_volumes,
background_percentile=float(background_percentile),
max_iter=int(max_iter),
step_size=float(step_size),
qn=int(qn),
pixel_bin_size=int(pixel_bin_size),
alpha_clip=float(alpha_clip),
negativity_threshold=float(negativity_threshold),
clip_every_n_iterations=int(clip_every_n_iterations),
require_three_channels=(implementation == "matlab_3c"),
)
matrix = np.asarray(matrix_details["unmixing_matrix"], dtype=np.float64)
transformed_selected[int(alpha_reference_t)] = np.moveaxis(reference_unmixed, 0, 1)
application_details_by_t: list[dict] = []
for t in range(time_count):
if t == int(alpha_reference_t):
application_details_by_t.append({"t": int(t), "mode": "estimated"})
continue
transformed_t, apply_details = apply_matlab_incremental_sequence(
np.moveaxis(selected_stack[t], 1, 0),
background_percentile=float(background_percentile),
incremental_matrices=matrix_details["incremental_matrices"],
clip_every_n_iterations=int(clip_every_n_iterations),
)
transformed_selected[t] = np.moveaxis(transformed_t, 0, 1)
application_details_by_t.append({"t": int(t), **apply_details})
matrix_details["application_details_by_t"] = application_details_by_t
else:
reference_unmixed, matrix_details = run_source_sink_unmixing(
reference_channel_volumes,
source_sink_matrix=source_sink_matrix,
background_percentile=float(background_percentile),
mi_bins=int(mi_bins),
alpha_max=float(alpha_max),
max_alpha_voxels=max_alpha_voxels,
random_state=int(random_state),
max_iter=int(max_iter),
tolerance=float(tolerance),
optimize_background=bool(source_sink_optimize_background),
max_background=float(source_sink_max_background),
n_restarts=int(source_sink_n_restarts),
joint_optimization=bool(source_sink_joint_optimization),
)
transformed_selected[int(alpha_reference_t)] = np.moveaxis(reference_unmixed, 0, 1)
matrix = np.asarray(matrix_details["alpha_parameters"], dtype=np.float64)
application_details_by_t = []
for t in range(time_count):
if t == int(alpha_reference_t):
application_details_by_t.append({"t": int(t), "mode": "estimated"})
continue
transformed_t = apply_source_sink_parameters(
np.moveaxis(selected_stack[t], 1, 0),
source_sink_matrix=source_sink_matrix,
alpha_parameters=matrix_details["alpha_parameters"],
background_values=matrix_details["background_values"],
background_parameters=matrix_details["background_parameters"],
)
transformed_selected[t] = np.moveaxis(transformed_t, 0, 1)
application_details_by_t.append({"t": int(t), "mode": "applied"})
matrix_details["application_details_by_t"] = application_details_by_t
else:
matrix_details_by_t = []
if implementation in {"matlab_3c", "matlab_n"}:
matrices_by_t = np.empty((time_count, len(channels), len(channels)), dtype=np.float64)
else:
matrices_by_t = np.empty((time_count, len(channels), len(channels)), dtype=np.float64)
for t in range(time_count):
channel_volumes_t = np.moveaxis(selected_stack[t], 1, 0)
if implementation in {"matlab_3c", "matlab_n"}:
transformed_t, details_t = run_picasso_matlab_like(
channel_volumes_t,
background_percentile=float(background_percentile),
max_iter=int(max_iter),
step_size=float(step_size),
qn=int(qn),
pixel_bin_size=int(pixel_bin_size),
alpha_clip=float(alpha_clip),
negativity_threshold=float(negativity_threshold),
clip_every_n_iterations=int(clip_every_n_iterations),
require_three_channels=(implementation == "matlab_3c"),
)
matrices_by_t[t] = np.asarray(details_t["unmixing_matrix"], dtype=np.float64)
else:
transformed_t, details_t = run_source_sink_unmixing(
channel_volumes_t,
source_sink_matrix=source_sink_matrix,
background_percentile=float(background_percentile),
mi_bins=int(mi_bins),
alpha_max=float(alpha_max),
max_alpha_voxels=max_alpha_voxels,
random_state=int(random_state) + int(t),
max_iter=int(max_iter),
tolerance=float(tolerance),
optimize_background=bool(source_sink_optimize_background),
max_background=float(source_sink_max_background),
n_restarts=int(source_sink_n_restarts),
joint_optimization=bool(source_sink_joint_optimization),
)
matrices_by_t[t] = np.asarray(details_t["alpha_parameters"], dtype=np.float64)
transformed_selected[t] = np.moveaxis(transformed_t, 0, 1)
matrix_details_by_t.append({"t": int(t), **details_t})
if clip_negative:
transformed_selected = np.maximum(transformed_selected, 0.0)
working_stack = stack.astype(np.float32, copy=True)
working_stack[:, :, channels, :, :] = transformed_selected
_print_verbose(verbose, f"Writing blind-unmixed stack to: {output_path}")
output_stack = _cast_output_stack(working_stack, output_dtype)
actual_output_path = write_stack_with_omio(output_path, output_stack, metadata)
report = {
"input_path": str(input_path),
"output_path": str(actual_output_path),
"report_path": str(report_path),
"method": method,
"implementation": implementation,
"alpha_mode": alpha_mode,
"alpha_reference_t": int(alpha_reference_t),
"channels": [int(channel) for channel in channels],
"source_sink_matrix": None
if source_sink_matrix is None
else np.asarray(source_sink_matrix, dtype=int).tolist(),
"sink_channels": None
if sink_channels is None
else [int(channel) for channel in sink_channels],
"neutral_channels": None
if neutral_channels is None
else [int(channel) for channel in neutral_channels],
"background_percentile": float(background_percentile),
"preprocess_alpha_inputs": bool(preprocess_alpha_inputs),
"alpha_max": float(alpha_max),
"mi_bins": int(mi_bins),
"max_iter": int(max_iter),
"tolerance": float(tolerance),
"max_alpha_voxels": None if max_alpha_voxels is None else int(max_alpha_voxels),
"random_state": int(random_state),
"step_size": float(step_size),
"qN": int(qn),
"pixel_bin_size": int(pixel_bin_size),
"alpha_clip": float(alpha_clip),
"negativity_threshold": float(negativity_threshold),
"clip_every_n_iterations": int(clip_every_n_iterations),
"source_sink_optimize_background": bool(source_sink_optimize_background),
"source_sink_max_background": float(source_sink_max_background),
"source_sink_n_restarts": int(source_sink_n_restarts),
"source_sink_joint_optimization": bool(source_sink_joint_optimization),
"clip_negative": bool(clip_negative),
"unmixing_matrix": None if matrix is None else matrix.tolist(),
"unmixing_matrix_by_t": None
if matrices_by_t is None
else matrices_by_t.tolist(),
"iterations_run": None
if matrix_details is None
else (
None
if matrix_details.get("iterations_run") is None
else int(matrix_details["iterations_run"])
),
"iterations_run_by_t": None
if matrix_details_by_t is None
else [
None if item.get("iterations_run") is None else int(item["iterations_run"])
for item in matrix_details_by_t
],
"converged": None,
"converged_by_t": None,
"picasso_estimation": matrix_details,
"picasso_estimation_by_t": matrix_details_by_t,
"input_shape": tuple(int(v) for v in stack.shape),
"axis_order": CANONICAL_AXIS_ORDER,
"size_t": int(stack.shape[0]),
"size_z": int(stack.shape[1]),
"output_dtype": str(np.dtype(output_dtype)),
}
actual_report_path = _write_report(report, report_path)
_print_verbose(verbose, f"Wrote processing report to: {actual_report_path}")
return actual_output_path
[docs]
def unmix_ch0_from_ch1(*args, **kwargs) -> Path:
"""
Backward-compatible wrapper for older code paths.
Deprecated in favor of :func:`unmix`.
"""
warnings.warn(
"unmix_ch0_from_ch1 is deprecated; use unmix with source_channel and "
"target_channel instead.",
DeprecationWarning,
stacklevel=2,
)
return unmix(*args, **kwargs)
# %% END