"""
Alpha estimation helpers for spectral bleed-through correction.
Author: Fabrizio Musacchio
Date: June 2026
"""
# %% IMPORTS
from __future__ import annotations
from collections.abc import Sequence
import numpy as np
from scipy.optimize import minimize_scalar
# %% CONSTANTS
MIN_MASK_VOXELS = 16
DEFAULT_ALPHA_MAX = 1.0
DEFAULT_MI_BINS = 64
DEFAULT_MAX_ALPHA_VOXELS = 500_000
DEFAULT_RANDOM_STATE = 0
EPSILON = 1e-12
SUPPORTED_ALPHA_ESTIMATION_METHODS = {
"mean_ratio",
"linear_fit",
"corr_min",
"mi_min",
}
# %% ESTIMATION HELPERS
def _validate_percentile(name: str, value: float | None) -> float | None:
"""Validate an optional percentile argument and return it as ``float``."""
if value is None:
return None
value = float(value)
if not 0.0 <= value <= 100.0:
raise ValueError(f"{name} must be between 0 and 100. Got {value!r}.")
return value
def _validate_alpha_estimation_method(method: str) -> str:
"""Normalize and validate a two-channel alpha-estimation method name."""
method = str(method).strip().lower()
if method not in SUPPORTED_ALPHA_ESTIMATION_METHODS:
raise ValueError(
f"method must be one of {sorted(SUPPORTED_ALPHA_ESTIMATION_METHODS)}. "
f"Got {method!r}."
)
return method
def _validate_positive_int(name: str, value: int, *, minimum: int = 1) -> int:
"""Validate that an integer parameter satisfies a lower bound."""
value = int(value)
if value < minimum:
raise ValueError(f"{name} must be >= {minimum}. Got {value!r}.")
return value
def _validate_positive_float(name: str, value: float, *, strictly_positive: bool = True) -> float:
"""Validate that a floating-point parameter satisfies a lower bound."""
value = float(value)
if strictly_positive and value <= 0.0:
raise ValueError(f"{name} must be > 0. Got {value!r}.")
if not strictly_positive and value < 0.0:
raise ValueError(f"{name} must be >= 0. Got {value!r}.")
return value
def _safe_correlation(x: np.ndarray, y: np.ndarray) -> float:
"""Return a finite Pearson correlation when both vectors contain variance."""
if x.size == 0 or y.size == 0:
return float("nan")
if np.std(x) <= EPSILON or np.std(y) <= EPSILON:
return float("nan")
return float(np.corrcoef(x, y)[0, 1])
def _prepare_single_volume_for_alpha(
volume,
*,
background_percentile: float = 1.0,
preprocess_alpha_inputs: bool = True,
) -> tuple[np.ndarray, float]:
"""Prepare one image volume for alpha estimation and return its background."""
volume = np.asarray(volume, dtype=np.float32)
background_percentile = _validate_percentile(
"background_percentile",
background_percentile,
)
if not preprocess_alpha_inputs:
return volume.copy(), 0.0
background = float(np.percentile(volume, background_percentile))
prepared = np.clip(volume - background, a_min=0.0, a_max=None)
return prepared.astype(np.float32, copy=False), background
[docs]
def prepare_source_target_for_alpha(
source_volume,
target_volume,
*,
background_percentile: float = 1.0,
preprocess_alpha_inputs: bool = True,
) -> tuple[np.ndarray, np.ndarray, float, float]:
"""
Convert, optionally background-correct, and clip source and target volumes.
Parameters
----------
source_volume, target_volume : array-like
Matching source and target image volumes. Any matching shape is
accepted, for example ``ZYX`` or flattened arrays.
background_percentile : float, optional
Low percentile used to estimate a rough background in each input.
preprocess_alpha_inputs : bool, optional
If ``True``, subtract the percentile-based background from each input
and clip negative values to zero. If ``False``, the inputs are only
converted to ``float32``.
Returns
-------
tuple
``(source_prepared, target_prepared, source_background, target_background)``.
Raises
------
ValueError
If the input volumes do not share the same shape or are empty.
"""
source_f = np.asarray(source_volume, dtype=np.float32)
target_f = np.asarray(target_volume, dtype=np.float32)
if source_f.shape != target_f.shape:
raise ValueError(
"source and target must have the same shape. "
f"Got {source_f.shape!r} and {target_f.shape!r}."
)
if source_f.size == 0:
raise ValueError("source and target must not be empty.")
source_prepared, source_background = _prepare_single_volume_for_alpha(
source_f,
background_percentile=background_percentile,
preprocess_alpha_inputs=preprocess_alpha_inputs,
)
target_prepared, target_background = _prepare_single_volume_for_alpha(
target_f,
background_percentile=background_percentile,
preprocess_alpha_inputs=preprocess_alpha_inputs,
)
return source_prepared, target_prepared, source_background, target_background
[docs]
def make_alpha_mask(
source,
target=None,
*,
signal_percentile: float = 99.0,
target_low_percentile: float | None = None,
min_voxels: int = MIN_MASK_VOXELS,
) -> tuple[np.ndarray, dict]:
"""
Create a robust mask for alpha estimation.
Parameters
----------
source : array-like
Prepared source intensities used to define the bright-source mask.
target : array-like or None, optional
Optional prepared target intensities used to additionally constrain the
mask to low-target voxels.
signal_percentile : float, optional
Source percentile above which voxels are considered signal-rich.
target_low_percentile : float or None, optional
If provided, only voxels at or below this target percentile are kept,
unless that mask would become too small and the source-only fallback is
used instead.
min_voxels : int, optional
Minimum number of voxels required for a valid estimation mask.
Returns
-------
tuple
``(mask, details)`` where ``mask`` is a boolean array and ``details`` is
a metadata dictionary describing thresholds, fallback behavior, and voxel
counts.
"""
source = np.asarray(source, dtype=np.float32)
target = None if target is None else np.asarray(target, dtype=np.float32)
signal_percentile = _validate_percentile("signal_percentile", signal_percentile)
target_low_percentile = _validate_percentile(
"target_low_percentile",
target_low_percentile,
)
min_voxels = _validate_positive_int("min_voxels", min_voxels, minimum=1)
if target is not None and source.shape != target.shape:
raise ValueError(
"source and target must have the same shape for mask creation. "
f"Got {source.shape!r} and {target.shape!r}."
)
source_threshold = float(np.percentile(source, signal_percentile))
mask_source_only = source >= source_threshold
source_only_voxels = int(np.count_nonzero(mask_source_only))
mask = mask_source_only
target_threshold = None
fallback_used = False
strategy = "source_only"
if target is not None and target_low_percentile is not None:
target_threshold = float(np.percentile(target, target_low_percentile))
mask_candidate = mask_source_only & (target <= target_threshold)
candidate_voxels = int(np.count_nonzero(mask_candidate))
if candidate_voxels >= min_voxels:
mask = mask_candidate
strategy = "source_and_low_target"
elif source_only_voxels >= min_voxels:
fallback_used = True
else:
raise ValueError(
"Alpha mask is too small even after fallback to the source-only mask. "
f"Candidate voxels={candidate_voxels}, source-only voxels={source_only_voxels}, "
f"required min_voxels={min_voxels}."
)
elif source_only_voxels < min_voxels:
raise ValueError(
"Source signal mask does not contain enough voxels for alpha estimation. "
f"Found {source_only_voxels}, need at least {min_voxels}."
)
mask_voxels = int(np.count_nonzero(mask))
if mask_voxels < min_voxels:
raise ValueError(
"Final alpha mask does not contain enough voxels for estimation. "
f"Found {mask_voxels}, need at least {min_voxels}."
)
details = {
"signal_percentile": float(signal_percentile),
"target_low_percentile": None
if target_low_percentile is None
else float(target_low_percentile),
"source_threshold": source_threshold,
"target_threshold": target_threshold,
"mask_strategy": strategy,
"mask_fallback_used": bool(fallback_used),
"mask_voxel_count": mask_voxels,
"source_only_mask_voxel_count": source_only_voxels,
"min_mask_voxels": int(min_voxels),
}
return mask, details
def _subsample_masked_vectors(
x: np.ndarray,
y: np.ndarray,
*,
max_alpha_voxels: int | None = DEFAULT_MAX_ALPHA_VOXELS,
random_state: int = DEFAULT_RANDOM_STATE,
) -> tuple[np.ndarray, np.ndarray, dict]:
"""Optionally subsample masked source/target vectors for faster estimation."""
x = np.asarray(x, dtype=np.float64).ravel()
y = np.asarray(y, dtype=np.float64).ravel()
if x.shape != y.shape:
raise ValueError(
f"x and y must have the same shape. Got {x.shape!r} and {y.shape!r}."
)
original_count = int(x.size)
if max_alpha_voxels is None or original_count <= int(max_alpha_voxels):
return x, y, {
"subsampled": False,
"voxel_count_before_subsampling": original_count,
"voxel_count_after_subsampling": original_count,
"max_alpha_voxels": None if max_alpha_voxels is None else int(max_alpha_voxels),
"random_state": int(random_state),
}
max_alpha_voxels = _validate_positive_int(
"max_alpha_voxels",
max_alpha_voxels,
minimum=1,
)
rng = np.random.default_rng(int(random_state))
indices = rng.choice(original_count, size=max_alpha_voxels, replace=False)
details = {
"subsampled": True,
"voxel_count_before_subsampling": original_count,
"voxel_count_after_subsampling": int(max_alpha_voxels),
"max_alpha_voxels": int(max_alpha_voxels),
"random_state": int(random_state),
}
return x[indices], y[indices], details
def _estimate_alpha_mean_ratio(x: np.ndarray, y: np.ndarray) -> tuple[float, dict]:
"""Estimate alpha from the ratio of masked mean target and source intensities."""
denominator = float(np.mean(x))
if denominator <= EPSILON:
raise ValueError("Mean source intensity inside the estimation mask must be > 0.")
alpha = max(float(np.mean(y)) / denominator, 0.0)
return alpha, {
"numerator_mean": float(np.mean(y)),
"denominator_mean": denominator,
}
def _estimate_alpha_linear_fit(x: np.ndarray, y: np.ndarray) -> tuple[float, dict]:
"""Estimate alpha by least-squares fitting of ``y β alpha * x`` without intercept."""
denominator = float(np.sum(x * x))
if denominator <= EPSILON:
raise ValueError(
"Sum of squared source intensities inside the estimation mask must be > 0."
)
numerator = float(np.sum(x * y))
alpha = max(numerator / denominator, 0.0)
return alpha, {
"numerator_sum_xy": numerator,
"denominator_sum_x2": denominator,
}
def _estimate_alpha_corr_min(
x: np.ndarray,
y: np.ndarray,
*,
alpha_max: float,
) -> tuple[float, dict]:
"""Estimate alpha by minimizing residual Pearson correlation after correction."""
alpha_max = _validate_positive_float("alpha_max", alpha_max)
def objective(alpha_value: float) -> float:
corrected = y - float(alpha_value) * x
r = _safe_correlation(x, corrected)
if not np.isfinite(r):
return float("inf")
return float(r * r)
result = minimize_scalar(objective, bounds=(0.0, alpha_max), method="bounded")
if not result.success or not np.isfinite(result.x):
raise ValueError("Correlation-minimization alpha estimation failed.")
alpha = max(float(result.x), 0.0)
corrected = y - alpha * x
r = _safe_correlation(x, corrected)
return alpha, {
"optimization_success": bool(result.success),
"optimization_nfev": int(getattr(result, "nfev", -1)),
"objective_value": float(result.fun),
"post_correlation": None if not np.isfinite(r) else float(r),
}
def _estimate_alpha_mi_min(
x: np.ndarray,
y: np.ndarray,
*,
alpha_max: float,
mi_bins: int,
) -> tuple[float, dict]:
"""Estimate alpha by minimizing residual mutual information after correction."""
alpha_max = _validate_positive_float("alpha_max", alpha_max)
mi_bins = _validate_positive_int("mi_bins", mi_bins, minimum=2)
def objective(alpha_value: float) -> float:
corrected = y - float(alpha_value) * x
mi = mutual_information_1d(x, corrected, bins=mi_bins)
if not np.isfinite(mi):
return float("inf")
return float(mi)
result = minimize_scalar(objective, bounds=(0.0, alpha_max), method="bounded")
if not result.success or not np.isfinite(result.x):
raise ValueError("Mutual-information-minimization alpha estimation failed.")
alpha = max(float(result.x), 0.0)
corrected = y - alpha * x
mi = mutual_information_1d(x, corrected, bins=mi_bins)
return alpha, {
"optimization_success": bool(result.success),
"optimization_nfev": int(getattr(result, "nfev", -1)),
"objective_value": float(result.fun),
"post_mutual_information": float(mi),
}
[docs]
def estimate_alpha_from_volume(
source,
target,
signal_percentile: float = 99.0,
background_percentile: float = 1.0,
min_mask_voxels: int = MIN_MASK_VOXELS,
*,
method: str = "mean_ratio",
target_low_percentile: float | None = None,
preprocess_alpha_inputs: bool = True,
alpha_max: float = DEFAULT_ALPHA_MAX,
mi_bins: int = DEFAULT_MI_BINS,
max_alpha_voxels: int | None = DEFAULT_MAX_ALPHA_VOXELS,
random_state: int = DEFAULT_RANDOM_STATE,
return_details: bool = False,
) -> float | tuple[float, dict]:
"""
Estimate a bleed-through coefficient alpha from matching source and target volumes.
Parameters
----------
source, target : array-like
Matching source and target image volumes. Typical microscopy input is a
``ZYX`` volume, but any matching shape is accepted.
signal_percentile : float, optional
Percentile used to define a bright-source signal mask.
background_percentile : float, optional
Low percentile used for optional percentile-based background subtraction
before alpha estimation.
min_mask_voxels : int, optional
Minimum number of voxels required in the final alpha-estimation mask.
method : {"mean_ratio", "linear_fit", "corr_min", "mi_min"}, optional
Scalar alpha-estimation strategy.
target_low_percentile : float or None, optional
Optional extra target constraint for the estimation mask. If provided,
voxels are restricted to bright source signal and low target signal
whenever that yields enough voxels.
preprocess_alpha_inputs : bool, optional
If ``True``, apply percentile-based background subtraction and clip
negative values to zero before alpha estimation. If ``False``, estimate
alpha directly on the original intensities converted to ``float32``.
alpha_max : float, optional
Upper search bound for optimization-based methods ``"corr_min"`` and
``"mi_min"``.
mi_bins : int, optional
Number of histogram bins used by the mutual-information estimator.
max_alpha_voxels : int or None, optional
Optional cap on the number of voxels used after masking. If the mask is
larger, voxels are subsampled without replacement.
random_state : int, optional
Random seed used for optional voxel subsampling.
return_details : bool, optional
If ``True``, also return a dictionary describing preprocessing,
thresholds, mask size, subsampling, and method-specific diagnostics.
Returns
-------
float or tuple
Estimated alpha, or ``(alpha, details)`` if ``return_details=True``.
Raises
------
ValueError
If the input data are incompatible, the estimation mask is too small,
or the estimated alpha is invalid.
"""
method = _validate_alpha_estimation_method(method)
signal_percentile = _validate_percentile("signal_percentile", signal_percentile)
background_percentile = _validate_percentile(
"background_percentile",
background_percentile,
)
target_low_percentile = _validate_percentile(
"target_low_percentile",
target_low_percentile,
)
min_mask_voxels = _validate_positive_int(
"min_mask_voxels",
min_mask_voxels,
minimum=1,
)
alpha_max = _validate_positive_float("alpha_max", alpha_max)
mi_bins = _validate_positive_int("mi_bins", mi_bins, minimum=2)
source_prepared, target_prepared, source_background, target_background = (
prepare_source_target_for_alpha(
source,
target,
background_percentile=background_percentile,
preprocess_alpha_inputs=preprocess_alpha_inputs,
)
)
mask, mask_details = make_alpha_mask(
source_prepared,
target_prepared,
signal_percentile=signal_percentile,
target_low_percentile=target_low_percentile,
min_voxels=min_mask_voxels,
)
x = np.asarray(source_prepared[mask], dtype=np.float64)
y = np.asarray(target_prepared[mask], dtype=np.float64)
x, y, subsampling_details = _subsample_masked_vectors(
x,
y,
max_alpha_voxels=max_alpha_voxels,
random_state=random_state,
)
if x.size < 2:
raise ValueError(
"Not enough masked voxels remain after optional subsampling for alpha estimation."
)
if method == "mean_ratio":
alpha, method_details = _estimate_alpha_mean_ratio(x, y)
elif method == "linear_fit":
alpha, method_details = _estimate_alpha_linear_fit(x, y)
elif method == "corr_min":
alpha, method_details = _estimate_alpha_corr_min(
x,
y,
alpha_max=alpha_max,
)
else:
alpha, method_details = _estimate_alpha_mi_min(
x,
y,
alpha_max=alpha_max,
mi_bins=mi_bins,
)
if not np.isfinite(alpha):
raise ValueError(f"Estimated alpha is not finite: {alpha!r}.")
details = {
"method": method,
"alpha": float(alpha),
"signal_percentile": float(signal_percentile),
"target_low_percentile": None
if target_low_percentile is None
else float(target_low_percentile),
"background_percentile": float(background_percentile),
"preprocess_alpha_inputs": bool(preprocess_alpha_inputs),
"source_background": float(source_background),
"target_background": float(target_background),
"alpha_max": float(alpha_max),
"mi_bins": int(mi_bins),
"mask_voxel_count": int(mask_details["mask_voxel_count"]),
"mask_details": mask_details,
"subsampling_details": subsampling_details,
"estimation_voxel_count": int(x.size),
"method_details": method_details,
}
if return_details:
return float(alpha), details
return float(alpha)
[docs]
def estimate_picasso_unmixing_matrix_from_volume(
channel_volumes,
*,
background_percentile: float = 1.0,
preprocess_alpha_inputs: bool = True,
mi_bins: int = DEFAULT_MI_BINS,
alpha_max: float = DEFAULT_ALPHA_MAX,
max_iter: int = 10,
tolerance: float = 1e-4,
max_alpha_voxels: int | None = DEFAULT_MAX_ALPHA_VOXELS,
random_state: int = DEFAULT_RANDOM_STATE,
) -> tuple[np.ndarray, dict]:
"""
Estimate a PICASSO-like blind unmixing matrix from multi-channel image data.
This estimation logic is motivated by the PICASSO publication:
Seo, J., Sim, Y., Kim, J. et al. PICASSO allows ultra-multiplexed
fluorescence imaging of spatially overlapping proteins without reference
spectra measurements. Nature Communications 13, 2475 (2022).
https://doi.org/10.1038/s41467-022-30168-z
Parameters
----------
channel_volumes : array-like
Multi-channel image data with channel as the first axis, for example
``(C, Z, Y, X)`` or ``(C, N)``.
background_percentile : float, optional
Low percentile used for optional per-channel background subtraction.
preprocess_alpha_inputs : bool, optional
If ``True``, apply percentile-based background subtraction and clipping
before estimating the unmixing matrix.
mi_bins : int, optional
Number of histogram bins used by the mutual-information estimator.
alpha_max : float, optional
Upper bound for each pairwise subtraction coefficient.
max_iter : int, optional
Maximum number of pairwise update sweeps.
tolerance : float, optional
Convergence criterion on the largest pairwise coefficient update in one
iteration.
max_alpha_voxels : int or None, optional
Optional cap on the number of voxels used for matrix estimation.
random_state : int, optional
Random seed used for optional subsampling.
Returns
-------
tuple
``(matrix, details)`` with the estimated unmixing matrix and a metadata
dictionary describing convergence and pairwise updates.
Notes
-----
This implements an iterative, pairwise, mutual-information-minimizing blind
unmixing routine. It is inspired by the PICASSO criterion, but it is not a
deep-learning approach and not a full spectral reference-based method.
"""
channels_f = np.asarray(channel_volumes, dtype=np.float32)
if channels_f.ndim < 2:
raise ValueError(
"channel_volumes must have channel as the first axis and at least one "
f"additional data axis. Got shape {channels_f.shape!r}."
)
n_channels = int(channels_f.shape[0])
if n_channels < 2:
raise ValueError("PICASSO-style blind unmixing requires at least two channels.")
alpha_max = _validate_positive_float("alpha_max", alpha_max)
mi_bins = _validate_positive_int("mi_bins", mi_bins, minimum=2)
max_iter = _validate_positive_int("max_iter", max_iter, minimum=1)
tolerance = _validate_positive_float("tolerance", tolerance)
prepared_channels = np.empty_like(channels_f, dtype=np.float32)
channel_backgrounds: list[float] = []
for c in range(n_channels):
prepared_channels[c], background = _prepare_single_volume_for_alpha(
channels_f[c],
background_percentile=background_percentile,
preprocess_alpha_inputs=preprocess_alpha_inputs,
)
channel_backgrounds.append(float(background))
flattened = prepared_channels.reshape(n_channels, -1).astype(np.float64, copy=False)
if max_alpha_voxels is not None and flattened.shape[1] > int(max_alpha_voxels):
max_alpha_voxels = _validate_positive_int(
"max_alpha_voxels",
max_alpha_voxels,
minimum=1,
)
rng = np.random.default_rng(int(random_state))
indices = rng.choice(flattened.shape[1], size=max_alpha_voxels, replace=False)
flattened = flattened[:, indices]
estimation_voxel_count = int(max_alpha_voxels)
subsampled = True
else:
estimation_voxel_count = int(flattened.shape[1])
subsampled = False
F = flattened.copy()
U = np.eye(n_channels, dtype=np.float64)
pairwise_updates: list[dict] = []
converged = False
iterations_run = 0
for iteration in range(max_iter):
max_change = 0.0
for i in range(n_channels):
for j in range(n_channels):
if i == j:
continue
x = F[i]
y = F[j]
if np.std(x) <= EPSILON or np.std(y) <= EPSILON:
alpha = 0.0
mi_before = mutual_information_1d(x, y, bins=mi_bins)
mi_after = mi_before
else:
alpha, method_details = _estimate_alpha_mi_min(
x,
y,
alpha_max=alpha_max,
mi_bins=mi_bins,
)
mi_before = mutual_information_1d(x, y, bins=mi_bins)
y_new = y - alpha * x
mi_after = mutual_information_1d(x, y_new, bins=mi_bins)
F[j] = y_new
U[j, :] = U[j, :] - alpha * U[i, :]
max_change = max(max_change, float(alpha))
pairwise_updates.append(
{
"iteration": int(iteration),
"source_index": int(i),
"target_index": int(j),
"alpha": float(alpha),
"mi_before": float(mi_before),
"mi_after": float(mi_after),
"optimization_nfev": int(
method_details.get("optimization_nfev", -1)
),
}
)
iterations_run = iteration + 1
if max_change < tolerance:
converged = True
break
details = {
"background_percentile": float(background_percentile),
"preprocess_alpha_inputs": bool(preprocess_alpha_inputs),
"channel_backgrounds": channel_backgrounds,
"alpha_max": float(alpha_max),
"mi_bins": int(mi_bins),
"max_iter": int(max_iter),
"tolerance": float(tolerance),
"iterations_run": int(iterations_run),
"converged": bool(converged),
"estimation_voxel_count": int(estimation_voxel_count),
"subsampled": bool(subsampled),
"max_alpha_voxels": None if max_alpha_voxels is None else int(max_alpha_voxels),
"random_state": int(random_state),
"pairwise_updates": pairwise_updates,
"unmixing_matrix": U.tolist(),
}
return U.astype(np.float64, copy=False), details
# %% PUBLIC API
__all__ = [
"DEFAULT_ALPHA_MAX",
"DEFAULT_MAX_ALPHA_VOXELS",
"DEFAULT_MI_BINS",
"DEFAULT_RANDOM_STATE",
"MIN_MASK_VOXELS",
"SUPPORTED_ALPHA_ESTIMATION_METHODS",
"estimate_alpha_from_volume",
"estimate_picasso_unmixing_matrix_from_volume",
"make_alpha_mask",
"mutual_information_1d",
"prepare_source_target_for_alpha",
]
# %% END