deepecgkit.evaluation

Model evaluation, metrics computation, and visualization tools.

Evaluator

ECGEvaluator

Comprehensive evaluator for ECG models.

This class provides a unified interface for evaluating ECG models with various metrics and analysis tools.

Parameters:

    metrics (Optional[List[str]], default None): List of metrics to compute.
    task_type (str, default 'auto'): Type of task ("classification", "regression", "auto").
    device (str, default 'auto'): Device for model evaluation.

Examples:

>>> evaluator = ECGEvaluator(metrics=["accuracy", "auc", "f1"])
>>> results = evaluator.evaluate(model, test_data)
Source code in deepecgkit/evaluation/evaluator.py
class ECGEvaluator:
    """
    Comprehensive evaluator for ECG models.

    This class provides a unified interface for evaluating ECG models with
    various metrics and analysis tools.

    Args:
        metrics: List of metrics to compute
        task_type: Type of task ("classification", "regression", "auto")
        device: Device for model evaluation

    Examples:
        >>> evaluator = ECGEvaluator(metrics=["accuracy", "auc", "f1"])
        >>> results = evaluator.evaluate(model, test_data)
    """

    def __init__(
        self,
        metrics: Optional[List[str]] = None,
        task_type: str = "auto",
        device: str = "auto",
    ):
        self.metrics = metrics or ["accuracy", "precision", "recall", "f1", "auc"]
        self.task_type = task_type

        if device == "auto":
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device

    def evaluate(
        self,
        model: Optional[Union[torch.nn.Module, np.ndarray]] = None,
        test_data: Optional[Union[torch.utils.data.DataLoader, np.ndarray, tuple]] = None,
        return_predictions: bool = False,
        y_scores: Optional[np.ndarray] = None,
    ) -> Dict[str, float]:
        """
        Evaluate model performance.

        Args:
            model: PyTorch model or numpy array of predictions
            test_data: Test data loader or (predictions, targets) tuple
            return_predictions: Whether to return predictions along with metrics
            y_scores: Scores/probabilities for AUC calculation (optional)

        Returns:
            Dictionary of metric values
        """

        self.y_scores = y_scores

        predictions, targets = self._process_input_data(model, test_data)
        metrics = self._calculate_metrics(predictions, targets)

        if return_predictions:
            return metrics, predictions
        return metrics

    def _process_input_data(
        self,
        model: Optional[Union[torch.nn.Module, np.ndarray]],
        test_data: Optional[Union[torch.utils.data.DataLoader, np.ndarray, tuple]],
    ) -> tuple:
        """Process input data to get predictions and targets."""
        if isinstance(model, np.ndarray):
            return self._process_numpy_input(model, test_data)
        return self._get_predictions(model, test_data)

    def _process_numpy_input(
        self,
        model: np.ndarray,
        test_data: Optional[Union[torch.utils.data.DataLoader, np.ndarray, tuple]],
    ) -> tuple:
        """Process input when model is a numpy array."""
        predictions = model
        if not isinstance(test_data, tuple):
            return predictions, test_data

        if len(test_data) == 3:
            targets = test_data[1]
            self.y_scores = test_data[2]
        elif len(test_data) == 2:
            targets = test_data[1]
        else:
            targets = test_data
        return predictions, targets

    def _calculate_metrics(self, predictions: np.ndarray, targets: np.ndarray) -> Dict[str, float]:
        """Calculate metrics based on predictions and targets."""
        metrics = {}
        for metric_name in self.metrics:
            if metric_name in ["accuracy", "precision", "recall", "f1"]:
                metrics[metric_name] = calculate_classification_metrics(
                    targets, predictions, metrics=[metric_name]
                )[metric_name]
            elif metric_name in ["mse", "mae", "r2"]:
                metrics[metric_name] = calculate_regression_metrics(
                    targets, predictions, metrics=[metric_name]
                )[metric_name]
            elif metric_name == "auc":
                if self.y_scores is None:
                    raise ValueError("y_scores required for AUC calculation")
                scores = self.y_scores
                n_classes = len(np.unique(targets))
                if n_classes <= 2 and scores.ndim > 1:
                    scores = scores[:, 1]
                metrics[metric_name] = roc_auc_score(targets, scores)
        return metrics

    def _get_predictions(
        self,
        model: Optional[Union[torch.nn.Module, Any]],
        test_data: Optional[Union[torch.utils.data.DataLoader, np.ndarray, tuple]],
    ) -> tuple:
        """Extract predictions and targets from model and data."""
        if model is None and isinstance(test_data, tuple):
            return self._handle_tuple_test_data(test_data)

        if isinstance(test_data, torch.utils.data.DataLoader):
            return self._process_dataloader(model, test_data)
        elif isinstance(test_data, tuple) and len(test_data) == 2:
            return self._process_tuple_data(model, test_data)
        elif isinstance(test_data, np.ndarray):
            return self._process_numpy_data(model, test_data)
        else:
            raise ValueError(f"Unsupported test data type: {type(test_data)}")

    def _handle_tuple_test_data(self, test_data: tuple) -> tuple:
        """Handle test data when it's a tuple."""
        if len(test_data) == 3:
            y_true, y_pred, y_scores = test_data
            self.y_scores = y_scores
            return y_pred, y_true
        elif len(test_data) == 2:
            return test_data[0], test_data[1]
        return test_data, None

    def _process_dataloader(
        self, model: torch.nn.Module, test_data: torch.utils.data.DataLoader
    ) -> tuple:
        """Process data from a DataLoader."""
        predictions = []
        targets = []
        with torch.no_grad():
            for batch in test_data:
                if isinstance(batch, (list, tuple)) and len(batch) == 2:
                    x, y = batch
                else:
                    x, y = batch, None

                if torch.is_tensor(x):
                    x = x.to(self.device)

                pred = model(x)

                if torch.is_tensor(pred):
                    pred = pred.cpu().numpy()
                predictions.append(pred)

                if y is not None:
                    if torch.is_tensor(y):
                        y = y.cpu().numpy()
                    targets.append(y)

        return np.concatenate(predictions), np.concatenate(targets)

    def _process_tuple_data(self, model: torch.nn.Module, test_data: tuple) -> tuple:
        """Process data when it's a tuple."""
        x, y = test_data
        if torch.is_tensor(x):
            x = x.to(self.device)

        with torch.no_grad():
            pred = model(x)

        if torch.is_tensor(pred):
            pred = pred.cpu().numpy()
        if torch.is_tensor(y):
            y = y.cpu().numpy()

        return pred, y

    def _process_numpy_data(self, model: torch.nn.Module, test_data: np.ndarray) -> tuple:
        """Process data when it's a numpy array."""
        x = torch.tensor(test_data, dtype=torch.float32).to(self.device)

        with torch.no_grad():
            pred = model(x)

        if torch.is_tensor(pred):
            pred = pred.cpu().numpy()
        return pred, np.zeros(len(pred))

    def _detect_task_type(self, targets: np.ndarray) -> str:
        """Detect whether task is classification or regression."""
        if targets is None:
            return "regression"

        unique_values = np.unique(targets)

        if len(unique_values) <= MAX_CLASSIFICATION_CLASSES and np.all(
            np.equal(np.mod(targets, 1), 0)
        ):
            return "classification"
        return "regression"

    def cross_validate(
        self, model_class: type, data: Any, k_folds: int = 5, **model_kwargs
    ) -> Dict[str, List[float]]:
        """
        Perform k-fold cross-validation.

        Args:
            model_class: Class of model to evaluate
            data: Dataset for cross-validation
            k_folds: Number of folds
            **model_kwargs: Keyword arguments for model initialization

        Returns:
            Dictionary of metric scores across folds
        """
        if isinstance(data, tuple):
            x, y = data
        else:
            raise ValueError("Data must be (X, y) tuple for cross-validation")

        kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)
        fold_results = {metric: [] for metric in self.metrics}

        for fold, (train_idx, val_idx) in enumerate(kf.split(x)):
            print(f"Evaluating fold {fold + 1}/{k_folds}")

            x_train, x_val = x[train_idx], x[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]

            model = model_class(**model_kwargs)

            if hasattr(model, "fit"):
                model.fit(x_train, y_train)

            results = self.evaluate(model, (x_val, y_val))

            for metric in self.metrics:
                if metric in results:
                    fold_results[metric].append(results[metric])

        return fold_results

    def bootstrap_evaluate(
        self,
        model: Any,
        test_data: Any,
        n_bootstrap: int = 1000,
        confidence_level: float = 0.95,
    ) -> Dict[str, Dict[str, float]]:
        """
        Bootstrap evaluation for confidence intervals.

        Args:
            model: Trained model
            test_data: Test dataset
            n_bootstrap: Number of bootstrap samples
            confidence_level: Confidence level for intervals

        Returns:
            Dictionary with mean, std, and confidence intervals for each metric
        """
        predictions, targets = self._get_predictions(model, test_data)

        if targets is None:
            raise ValueError("Bootstrap evaluation requires ground truth targets")

        n_samples = len(targets)
        bootstrap_results = {metric: [] for metric in self.metrics}

        for _ in range(n_bootstrap):
            indices = np.random.choice(n_samples, n_samples, replace=True)
            boot_predictions = predictions[indices]
            boot_targets = targets[indices]

            if (
                self.task_type == "classification"
                or self._detect_task_type(boot_targets) == "classification"
            ):
                results = calculate_classification_metrics(
                    boot_targets, boot_predictions, metrics=self.metrics
                )
            else:
                results = calculate_regression_metrics(
                    boot_targets, boot_predictions, metrics=self.metrics
                )

            for metric in self.metrics:
                if metric in results:
                    bootstrap_results[metric].append(results[metric])

        alpha = 1 - confidence_level
        lower_percentile = (alpha / 2) * 100
        upper_percentile = (1 - alpha / 2) * 100

        final_results = {}
        for metric, values in bootstrap_results.items():
            if values:
                final_results[metric] = {
                    "mean": np.mean(values),
                    "std": np.std(values),
                    "lower_ci": np.percentile(values, lower_percentile),
                    "upper_ci": np.percentile(values, upper_percentile),
                }

        return final_results

    def generate_report(
        self,
        model: Any,
        test_data: Any,
        save_path: Optional[str] = None,
        y_scores: Optional[np.ndarray] = None,
    ) -> pd.DataFrame:
        """
        Generate comprehensive evaluation report.

        Args:
            model: Trained model
            test_data: Test dataset
            save_path: Path to save report (optional)
            y_scores: Scores/probabilities for AUC calculation (optional)

        Returns:
            DataFrame with evaluation results
        """

        results = self.evaluate(model, test_data, return_predictions=True, y_scores=y_scores)
        if isinstance(results, tuple):
            metrics_dict = results[0]
        else:
            metrics_dict = results

        try:
            bootstrap_results = self.bootstrap_evaluate(model, test_data)
        except Exception:
            bootstrap_results = {}

        report_data = []

        for metric in self.metrics:
            if metric in metrics_dict and metric not in {"predictions", "targets"}:
                row = {
                    "Metric": metric,
                    "Value": metrics_dict[metric],
                }

                if metric in bootstrap_results:
                    row.update(
                        {
                            "Std": bootstrap_results[metric]["std"],
                            "95% CI Lower": bootstrap_results[metric]["lower_ci"],
                            "95% CI Upper": bootstrap_results[metric]["upper_ci"],
                        }
                    )

                report_data.append(row)

        report_df = pd.DataFrame(report_data)

        if save_path:
            report_df.to_csv(save_path, index=False)
            print(f"Report saved to {save_path}")

        return report_df

evaluate

evaluate(
    model: Optional[Union[Module, ndarray]] = None,
    test_data: Optional[
        Union[DataLoader, ndarray, tuple]
    ] = None,
    return_predictions: bool = False,
    y_scores: Optional[ndarray] = None,
) -> Dict[str, float]

Evaluate model performance.

Parameters:

    model (Optional[Union[Module, ndarray]], default None): PyTorch model or numpy array of predictions.
    test_data (Optional[Union[DataLoader, ndarray, tuple]], default None): Test data loader or (predictions, targets) tuple.
    return_predictions (bool, default False): Whether to return predictions along with metrics.
    y_scores (Optional[ndarray], default None): Scores/probabilities for AUC calculation (optional).

Returns:

    Dict[str, float]: Dictionary of metric values.

Source code in deepecgkit/evaluation/evaluator.py
def evaluate(
    self,
    model: Optional[Union[torch.nn.Module, np.ndarray]] = None,
    test_data: Optional[Union[torch.utils.data.DataLoader, np.ndarray, tuple]] = None,
    return_predictions: bool = False,
    y_scores: Optional[np.ndarray] = None,
) -> Dict[str, float]:
    """
    Evaluate model performance.

    Args:
        model: PyTorch model or numpy array of predictions
        test_data: Test data loader or (predictions, targets) tuple
        return_predictions: Whether to return predictions along with metrics
        y_scores: Scores/probabilities for AUC calculation (optional)

    Returns:
        Dictionary of metric values
    """

    self.y_scores = y_scores

    predictions, targets = self._process_input_data(model, test_data)
    metrics = self._calculate_metrics(predictions, targets)

    if return_predictions:
        return metrics, predictions
    return metrics
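
A minimal usage sketch with synthetic NumPy arrays (not taken from the library's test suite); it assumes ECGEvaluator can be imported from deepecgkit.evaluation.evaluator, the source path shown above:

>>> import numpy as np
>>> from deepecgkit.evaluation.evaluator import ECGEvaluator
>>> rng = np.random.default_rng(0)
>>> y_true = rng.integers(0, 2, size=200)        # synthetic binary labels
>>> y_scores = rng.random(200)                   # synthetic positive-class probabilities
>>> y_pred = (y_scores > 0.5).astype(int)        # hard predictions derived from the scores
>>> evaluator = ECGEvaluator(metrics=["accuracy", "f1", "auc"])
>>> results = evaluator.evaluate(y_pred, (y_pred, y_true), y_scores=y_scores)
>>> sorted(results)
['accuracy', 'auc', 'f1']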

cross_validate

cross_validate(
    model_class: type,
    data: Any,
    k_folds: int = 5,
    **model_kwargs,
) -> Dict[str, List[float]]

Perform k-fold cross-validation.

Parameters:

    model_class (type, required): Class of model to evaluate.
    data (Any, required): Dataset for cross-validation.
    k_folds (int, default 5): Number of folds.
    **model_kwargs: Keyword arguments for model initialization.

Returns:

    Dict[str, List[float]]: Dictionary of metric scores across folds.

Source code in deepecgkit/evaluation/evaluator.py
def cross_validate(
    self, model_class: type, data: Any, k_folds: int = 5, **model_kwargs
) -> Dict[str, List[float]]:
    """
    Perform k-fold cross-validation.

    Args:
        model_class: Class of model to evaluate
        data: Dataset for cross-validation
        k_folds: Number of folds
        **model_kwargs: Keyword arguments for model initialization

    Returns:
        Dictionary of metric scores across folds
    """
    if isinstance(data, tuple):
        x, y = data
    else:
        raise ValueError("Data must be (X, y) tuple for cross-validation")

    kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    fold_results = {metric: [] for metric in self.metrics}

    for fold, (train_idx, val_idx) in enumerate(kf.split(x)):
        print(f"Evaluating fold {fold + 1}/{k_folds}")

        x_train, x_val = x[train_idx], x[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        model = model_class(**model_kwargs)

        if hasattr(model, "fit"):
            model.fit(x_train, y_train)

        results = self.evaluate(model, (x_val, y_val))

        for metric in self.metrics:
            if metric in results:
                fold_results[metric].append(results[metric])

    return fold_results
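
Per the source above, model_class instances only need to be constructible with **model_kwargs, optionally expose fit(x, y), and be callable on the validation inputs. MajorityClassModel below is a hypothetical stand-in used purely to illustrate that contract:

>>> import numpy as np
>>> from deepecgkit.evaluation.evaluator import ECGEvaluator
>>> class MajorityClassModel:
...     """Hypothetical baseline: always predicts the most frequent training label."""
...     def fit(self, x, y):
...         self.label_ = np.bincount(y.astype(int)).argmax()
...     def __call__(self, x):
...         return np.full(len(x), self.label_)
>>> rng = np.random.default_rng(0)
>>> x = rng.normal(size=(100, 12))
>>> y = rng.integers(0, 2, size=100)
>>> evaluator = ECGEvaluator(metrics=["accuracy", "f1"])
>>> fold_scores = evaluator.cross_validate(MajorityClassModel, (x, y), k_folds=5)  # prints per-fold progress
>>> len(fold_scores["accuracy"])
5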

bootstrap_evaluate

bootstrap_evaluate(
    model: Any,
    test_data: Any,
    n_bootstrap: int = 1000,
    confidence_level: float = 0.95,
) -> Dict[str, Dict[str, float]]

Bootstrap evaluation for confidence intervals.

Parameters:

    model (Any, required): Trained model.
    test_data (Any, required): Test dataset.
    n_bootstrap (int, default 1000): Number of bootstrap samples.
    confidence_level (float, default 0.95): Confidence level for intervals.

Returns:

    Dict[str, Dict[str, float]]: Dictionary with mean, std, and confidence intervals for each metric.

Source code in deepecgkit/evaluation/evaluator.py
def bootstrap_evaluate(
    self,
    model: Any,
    test_data: Any,
    n_bootstrap: int = 1000,
    confidence_level: float = 0.95,
) -> Dict[str, Dict[str, float]]:
    """
    Bootstrap evaluation for confidence intervals.

    Args:
        model: Trained model
        test_data: Test dataset
        n_bootstrap: Number of bootstrap samples
        confidence_level: Confidence level for intervals

    Returns:
        Dictionary with mean, std, and confidence intervals for each metric
    """
    predictions, targets = self._get_predictions(model, test_data)

    if targets is None:
        raise ValueError("Bootstrap evaluation requires ground truth targets")

    n_samples = len(targets)
    bootstrap_results = {metric: [] for metric in self.metrics}

    for _ in range(n_bootstrap):
        indices = np.random.choice(n_samples, n_samples, replace=True)
        boot_predictions = predictions[indices]
        boot_targets = targets[indices]

        if (
            self.task_type == "classification"
            or self._detect_task_type(boot_targets) == "classification"
        ):
            results = calculate_classification_metrics(
                boot_targets, boot_predictions, metrics=self.metrics
            )
        else:
            results = calculate_regression_metrics(
                boot_targets, boot_predictions, metrics=self.metrics
            )

        for metric in self.metrics:
            if metric in results:
                bootstrap_results[metric].append(results[metric])

    alpha = 1 - confidence_level
    lower_percentile = (alpha / 2) * 100
    upper_percentile = (1 - alpha / 2) * 100

    final_results = {}
    for metric, values in bootstrap_results.items():
        if values:
            final_results[metric] = {
                "mean": np.mean(values),
                "std": np.std(values),
                "lower_ci": np.percentile(values, lower_percentile),
                "upper_ci": np.percentile(values, upper_percentile),
            }

    return final_results
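
A sketch using precomputed predictions on synthetic arrays; passing model=None together with a (predictions, targets) tuple relies on the _get_predictions branch shown above:

>>> import numpy as np
>>> from deepecgkit.evaluation.evaluator import ECGEvaluator
>>> rng = np.random.default_rng(0)
>>> y_true = rng.integers(0, 2, size=300)
>>> y_pred = rng.integers(0, 2, size=300)
>>> evaluator = ECGEvaluator(metrics=["accuracy", "f1"], task_type="classification")
>>> ci = evaluator.bootstrap_evaluate(None, (y_pred, y_true), n_bootstrap=200)
>>> sorted(ci["accuracy"])
['lower_ci', 'mean', 'std', 'upper_ci']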

generate_report

generate_report(
    model: Any,
    test_data: Any,
    save_path: Optional[str] = None,
    y_scores: Optional[ndarray] = None,
) -> pd.DataFrame

Generate comprehensive evaluation report.

Parameters:

    model (Any, required): Trained model.
    test_data (Any, required): Test dataset.
    save_path (Optional[str], default None): Path to save report (optional).
    y_scores (Optional[ndarray], default None): Scores/probabilities for AUC calculation (optional).

Returns:

    DataFrame: DataFrame with evaluation results.

Source code in deepecgkit/evaluation/evaluator.py
def generate_report(
    self,
    model: Any,
    test_data: Any,
    save_path: Optional[str] = None,
    y_scores: Optional[np.ndarray] = None,
) -> pd.DataFrame:
    """
    Generate comprehensive evaluation report.

    Args:
        model: Trained model
        test_data: Test dataset
        save_path: Path to save report (optional)
        y_scores: Scores/probabilities for AUC calculation (optional)

    Returns:
        DataFrame with evaluation results
    """

    results = self.evaluate(model, test_data, return_predictions=True, y_scores=y_scores)
    if isinstance(results, tuple):
        metrics_dict = results[0]
    else:
        metrics_dict = results

    try:
        bootstrap_results = self.bootstrap_evaluate(model, test_data)
    except Exception:
        bootstrap_results = {}

    report_data = []

    for metric in self.metrics:
        if metric in metrics_dict and metric not in {"predictions", "targets"}:
            row = {
                "Metric": metric,
                "Value": metrics_dict[metric],
            }

            if metric in bootstrap_results:
                row.update(
                    {
                        "Std": bootstrap_results[metric]["std"],
                        "95% CI Lower": bootstrap_results[metric]["lower_ci"],
                        "95% CI Upper": bootstrap_results[metric]["upper_ci"],
                    }
                )

            report_data.append(row)

    report_df = pd.DataFrame(report_data)

    if save_path:
        report_df.to_csv(save_path, index=False)
        print(f"Report saved to {save_path}")

    return report_df
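
A hedged end-to-end sketch on synthetic arrays; the file name report.csv is illustrative, and the Std/CI columns only appear for metrics that bootstrap_evaluate could compute:

>>> import numpy as np
>>> from deepecgkit.evaluation.evaluator import ECGEvaluator
>>> rng = np.random.default_rng(0)
>>> y_true = rng.integers(0, 2, size=300)
>>> y_scores = rng.random(300)
>>> y_pred = (y_scores > 0.5).astype(int)
>>> evaluator = ECGEvaluator(metrics=["accuracy", "f1", "auc"], task_type="classification")
>>> report = evaluator.generate_report(None, (y_pred, y_true), save_path="report.csv", y_scores=y_scores)  # writes report.csv and prints the save path
>>> list(report.columns)[:2]
['Metric', 'Value']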

Metrics

calculate_classification_metrics

calculate_classification_metrics(
    y_true: ndarray,
    y_pred: ndarray,
    metrics: Optional[List[str]] = None,
) -> Dict[str, float]

Calculate classification metrics.

Parameters:

    y_true (ndarray, required): True labels.
    y_pred (ndarray, required): Predicted labels or probabilities.
    metrics (Optional[List[str]], default None): List of metrics to compute.

Returns:

    Dict[str, float]: Dictionary of computed metrics.

Source code in deepecgkit/evaluation/metrics.py
def calculate_classification_metrics(
    y_true: np.ndarray, y_pred: np.ndarray, metrics: Optional[List[str]] = None
) -> Dict[str, float]:
    """
    Calculate classification metrics.

    Args:
        y_true: True labels
        y_pred: Predicted labels or probabilities
        metrics: List of metrics to compute

    Returns:
        Dictionary of computed metrics
    """
    if metrics is None:
        metrics = ["accuracy", "precision", "recall", "f1", "auc", "mcc"]

    results = {}

    if y_pred.ndim > 1:
        y_pred_labels = np.argmax(y_pred, axis=1)
        y_scores = y_pred
    elif (
        y_pred.dtype == float
        and np.max(y_pred) <= 1.0
        and len(np.unique(y_true)) <= MULTICLASS_THRESHOLD
    ):
        y_pred_labels = (y_pred > BINARY_THRESHOLD).astype(int)
        y_scores = y_pred
    else:
        y_pred_labels = y_pred.astype(int)
        y_scores = y_pred

    n_classes = len(np.unique(y_true))
    avg = "binary" if n_classes <= MULTICLASS_THRESHOLD else "macro"

    def _compute_metric(metric):
        result = np.nan
        if metric == "accuracy":
            result = accuracy_score(y_true, y_pred_labels)
        elif metric in ["precision", "recall", "f1"]:
            precision, recall, f1, _ = precision_recall_fscore_support(
                y_true, y_pred_labels, average=avg, zero_division=0
            )
            if metric == "precision":
                result = precision
            elif metric == "recall":
                result = recall
            elif metric == "f1":
                result = f1
        elif metric == "auc":
            if n_classes > MULTICLASS_THRESHOLD:
                result = roc_auc_score(y_true, y_scores, multi_class="ovr")
            else:
                # Binary AUC expects 1D scores (positive class probability)
                scores = y_scores[:, 1] if y_scores.ndim > 1 else y_scores
                result = roc_auc_score(y_true, scores)
        elif metric == "mcc":
            result = matthews_corrcoef(y_true, y_pred_labels)
        return result

    for metric in metrics:
        try:
            results[metric] = _compute_metric(metric)
        except Exception as e:
            print(f"Warning: Could not compute {metric}: {e}")
            results[metric] = np.nan

    return results
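
A short example with hand-made arrays; a 2-D probability matrix is argmaxed into labels, as the source above shows, and column 1 is treated as the positive-class score for binary AUC:

>>> import numpy as np
>>> from deepecgkit.evaluation.metrics import calculate_classification_metrics
>>> y_true = np.array([0, 1, 1, 0, 1])
>>> y_prob = np.array([[0.9, 0.1], [0.2, 0.8], [0.3, 0.7], [0.6, 0.4], [0.4, 0.6]])
>>> results = calculate_classification_metrics(y_true, y_prob, metrics=["accuracy", "auc"])
>>> float(results["accuracy"]), float(results["auc"])
(1.0, 1.0)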

calculate_regression_metrics

calculate_regression_metrics(
    y_true: ndarray,
    y_pred: ndarray,
    metrics: Optional[List[str]] = None,
) -> Dict[str, float]

Calculate regression metrics.

Parameters:

    y_true (ndarray, required): True values.
    y_pred (ndarray, required): Predicted values.
    metrics (Optional[List[str]], default None): List of metrics to compute.

Returns:

    Dict[str, float]: Dictionary of computed metrics.

Source code in deepecgkit/evaluation/metrics.py
def calculate_regression_metrics(
    y_true: np.ndarray, y_pred: np.ndarray, metrics: Optional[List[str]] = None
) -> Dict[str, float]:
    """
    Calculate regression metrics.

    Args:
        y_true: True values
        y_pred: Predicted values
        metrics: List of metrics to compute

    Returns:
        Dictionary of computed metrics
    """
    if metrics is None:
        metrics = ["mse", "mae", "r2"]

    results = {}

    for metric in metrics:
        try:
            if metric in ["mse", "mean_squared_error"]:
                results[metric] = mean_squared_error(y_true, y_pred)
            elif metric in ["mae", "mean_absolute_error"]:
                results[metric] = mean_absolute_error(y_true, y_pred)
            elif metric in ["r2", "r2_score"]:
                results[metric] = r2_score(y_true, y_pred)
            elif metric == "rmse":
                results[metric] = np.sqrt(mean_squared_error(y_true, y_pred))
        except Exception as e:
            print(f"Warning: Could not compute {metric}: {e}")
            results[metric] = np.nan

    return results
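
A short example on hand-made arrays; mae here is (0.1 + 0.1 + 0.2 + 0.2) / 4 = 0.15:

>>> import numpy as np
>>> from deepecgkit.evaluation.metrics import calculate_regression_metrics
>>> y_true = np.array([1.0, 2.0, 3.0, 4.0])
>>> y_pred = np.array([1.1, 1.9, 3.2, 3.8])
>>> results = calculate_regression_metrics(y_true, y_pred, metrics=["mae", "rmse"])
>>> float(round(results["mae"], 3))
0.15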

confusion_matrix_analysis

confusion_matrix_analysis(
    y_true: ndarray, y_pred: ndarray
) -> Dict[str, Any]

Detailed confusion matrix analysis.

Parameters:

    y_true (ndarray, required): True labels.
    y_pred (ndarray, required): Predicted labels.

Returns:

    Dict[str, Any]: Dictionary with confusion matrix and derived metrics.

Source code in deepecgkit/evaluation/metrics.py
def confusion_matrix_analysis(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, Any]:
    """
    Detailed confusion matrix analysis.

    Args:
        y_true: True labels
        y_pred: Predicted labels

    Returns:
        Dictionary with confusion matrix and derived metrics
    """
    cm = confusion_matrix(y_true, y_pred)

    n_classes = cm.shape[0]
    precision = np.zeros(n_classes)
    recall = np.zeros(n_classes)
    f1 = np.zeros(n_classes)

    for i in range(n_classes):
        if cm[:, i].sum() > 0:
            precision[i] = cm[i, i] / cm[:, i].sum()
        if cm[i, :].sum() > 0:
            recall[i] = cm[i, i] / cm[i, :].sum()
        if precision[i] + recall[i] > 0:
            f1[i] = 2 * precision[i] * recall[i] / (precision[i] + recall[i])

    return {
        "confusion_matrix": cm,
        "per_class_precision": precision,
        "per_class_recall": recall,
        "per_class_f1": f1,
        "macro_precision": np.mean(precision),
        "macro_recall": np.mean(recall),
        "macro_f1": np.mean(f1),
    }
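
A small example; rows of the returned matrix are true labels and columns are predicted labels:

>>> import numpy as np
>>> from deepecgkit.evaluation.metrics import confusion_matrix_analysis
>>> y_true = np.array([0, 0, 1, 1, 2, 2])
>>> y_pred = np.array([0, 1, 1, 1, 2, 0])
>>> analysis = confusion_matrix_analysis(y_true, y_pred)
>>> analysis["confusion_matrix"]
array([[1, 1, 0],
       [0, 2, 0],
       [1, 0, 1]])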

Visualization

plot_confusion_matrix

plot_confusion_matrix(
    y_true: ndarray,
    y_pred: ndarray,
    class_names: Optional[List[str]] = None,
    title: str = "Confusion Matrix",
    save_path: Optional[str] = None,
)

Plot confusion matrix.

Source code in deepecgkit/evaluation/visualization.py
def plot_confusion_matrix(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    class_names: Optional[List[str]] = None,
    title: str = "Confusion Matrix",
    save_path: Optional[str] = None,
):
    """Plot confusion matrix."""
    labels = sorted(set(y_true) | set(y_pred))
    cm = confusion_matrix(y_true, y_pred, labels=labels)

    if class_names is None or len(class_names) != len(labels):
        class_names = [str(lbl) for lbl in labels]

    _, ax = plt.subplots(figsize=(8, 6))

    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
        ax=ax,
    )

    ax.set_title(title)
    ax.set_ylabel("True Label")
    ax.set_xlabel("Predicted Label")

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches="tight")
        plt.close()
    else:
        plt.show()
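
A usage sketch with synthetic labels; the class names and the output path cm.png are illustrative:

>>> import numpy as np
>>> from deepecgkit.evaluation.visualization import plot_confusion_matrix
>>> y_true = np.array([0, 1, 1, 0, 2, 2])
>>> y_pred = np.array([0, 1, 0, 0, 2, 1])
>>> plot_confusion_matrix(y_true, y_pred, class_names=["Normal", "AFib", "Other"], save_path="cm.png")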

plot_roc_curve

plot_roc_curve(
    y_true: ndarray,
    y_scores: ndarray,
    title: str = "ROC Curve",
    save_path: Optional[str] = None,
)

Plot ROC curve.

Source code in deepecgkit/evaluation/visualization.py
def plot_roc_curve(
    y_true: np.ndarray,
    y_scores: np.ndarray,
    title: str = "ROC Curve",
    save_path: Optional[str] = None,
):
    """Plot ROC curve."""
    fpr, tpr, _ = roc_curve(y_true, y_scores)
    roc_auc = auc(fpr, tpr)

    plt.figure(figsize=(8, 6))
    plt.plot(
        fpr,
        tpr,
        color="darkorange",
        lw=2,
        label=f"ROC curve (area = {roc_auc:.2f})",
    )
    plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(title)
    plt.legend(loc="lower right")

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches="tight")
        plt.close()
    else:
        plt.show()
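
A usage sketch with synthetic binary labels and scores loosely correlated with them; omit save_path to display the figure instead:

>>> import numpy as np
>>> from deepecgkit.evaluation.visualization import plot_roc_curve
>>> rng = np.random.default_rng(0)
>>> y_true = rng.integers(0, 2, size=100)
>>> y_scores = 0.6 * y_true + 0.4 * rng.random(100)   # higher scores for the positive class
>>> plot_roc_curve(y_true, y_scores, save_path="roc.png")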

plot_training_curves

plot_training_curves(
    metrics_path: str,
    save_dir: Optional[str] = None,
    save_path: Optional[str] = None,
)

Plot training and validation loss and accuracy as separate figures.

Parameters:

    metrics_path (str, required): Path to CSVLogger metrics.csv file.
    save_dir (Optional[str], default None): Directory to save separate loss.png and accuracy.png files.
    save_path (Optional[str], default None): Deprecated combined path; if provided and save_dir is not, saves loss plot to this path for backward compatibility.

Source code in deepecgkit/evaluation/visualization.py
def plot_training_curves(
    metrics_path: str,
    save_dir: Optional[str] = None,
    save_path: Optional[str] = None,
):
    """Plot training and validation loss and accuracy as separate figures.

    Args:
        metrics_path: Path to CSVLogger metrics.csv file.
        save_dir: Directory to save separate loss.png and accuracy.png files.
        save_path: Deprecated combined path; if provided and save_dir is not,
            saves loss plot to this path for backward compatibility.
    """
    df = pd.read_csv(metrics_path)

    has_acc = "train_acc" in df.columns

    resolved_dir = None
    if save_dir:
        resolved_dir = Path(save_dir)
    elif save_path:
        resolved_dir = Path(save_path).parent

    train_loss = df.dropna(subset=["train_loss"]).groupby("epoch")["train_loss"].mean()
    val_loss = df.dropna(subset=["val_loss"]).groupby("epoch")["val_loss"].mean()

    plt.figure(figsize=(10, 5))
    plt.plot(train_loss.index, train_loss.values, label="Train Loss")
    plt.plot(val_loss.index, val_loss.values, label="Val Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training & Validation Loss")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()

    if resolved_dir:
        plt.savefig(str(resolved_dir / "loss.png"), dpi=300, bbox_inches="tight")
        plt.close()
    else:
        plt.show()

    if has_acc:
        train_acc = df.dropna(subset=["train_acc"]).groupby("epoch")["train_acc"].mean()
        val_acc = df.dropna(subset=["val_acc"]).groupby("epoch")["val_acc"].mean()

        plt.figure(figsize=(10, 5))
        plt.plot(train_acc.index, train_acc.values, label="Train Acc")
        plt.plot(val_acc.index, val_acc.values, label="Val Acc")
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy")
        plt.title("Training & Validation Accuracy")
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()

        if resolved_dir:
            plt.savefig(str(resolved_dir / "accuracy.png"), dpi=300, bbox_inches="tight")
            plt.close()
        else:
            plt.show()
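
A self-contained sketch that fabricates a tiny CSVLogger-style metrics file just to exercise the function; the column names (epoch, train_loss, val_loss) follow the source above, and accuracy curves are only drawn when train_acc/val_acc columns exist:

>>> import pandas as pd
>>> from deepecgkit.evaluation.visualization import plot_training_curves
>>> pd.DataFrame({
...     "epoch": [0, 0, 1, 1],
...     "train_loss": [0.9, None, 0.6, None],
...     "val_loss": [None, 0.8, None, 0.55],
... }).to_csv("metrics.csv", index=False)
>>> plot_training_curves("metrics.csv", save_dir=".")   # writes ./loss.png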

plot_ecg_signals

plot_ecg_signals(
    data: ndarray,
    sampling_rate: float = 500.0,
    leads: Optional[List[str]] = None,
    title: str = "ECG Signals",
    save_path: Optional[str] = None,
)

Plot ECG signals.

Source code in deepecgkit/evaluation/visualization.py
def plot_ecg_signals(
    data: np.ndarray,
    sampling_rate: float = 500.0,
    leads: Optional[List[str]] = None,
    title: str = "ECG Signals",
    save_path: Optional[str] = None,
):
    """Plot ECG signals."""
    if data.ndim == 1:
        data = data.reshape(-1, 1)

    n_samples, n_leads = data.shape
    time = np.arange(n_samples) / sampling_rate

    if leads is None:
        leads = [f"Lead {i + 1}" for i in range(n_leads)]

    _fig, axes = plt.subplots(n_leads, 1, figsize=(12, 2 * n_leads), sharex=True)
    if n_leads == 1:
        axes = [axes]

    for i, ax in enumerate(axes):
        ax.plot(time, data[:, i])
        ax.set_ylabel(f"{leads[i]} (mV)")
        ax.grid(True, alpha=0.3)

    axes[-1].set_xlabel("Time (s)")
    plt.suptitle(title)
    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches="tight")
        plt.close()
    else:
        plt.show()
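
A usage sketch plotting two synthetic sinusoidal "leads" sampled at 500 Hz; real ECG arrays of shape (n_samples, n_leads) work the same way:

>>> import numpy as np
>>> from deepecgkit.evaluation.visualization import plot_ecg_signals
>>> fs = 500.0
>>> t = np.arange(0, 2.0, 1.0 / fs)                       # 2 seconds of signal
>>> ecg = np.stack([np.sin(2 * np.pi * 1.2 * t),
...                 0.5 * np.sin(2 * np.pi * 1.2 * t + 0.5)], axis=1)
>>> plot_ecg_signals(ecg, sampling_rate=fs, leads=["I", "II"], save_path="ecg.png")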

plot_predictions

plot_predictions(
    y_true: ndarray,
    y_pred: ndarray,
    title: str = "Predictions vs True Values",
    save_path: Optional[str] = None,
)

Plot predictions vs true values.

Source code in deepecgkit/evaluation/visualization.py
def plot_predictions(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    title: str = "Predictions vs True Values",
    save_path: Optional[str] = None,
):
    """Plot predictions vs true values."""
    plt.figure(figsize=(8, 6))
    plt.scatter(y_true, y_pred, alpha=0.6)
    plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], "r--", lw=2)
    plt.xlabel("True Values")
    plt.ylabel("Predictions")
    plt.title(title)

    if save_path:
        plt.savefig(save_path, dpi=300, bbox_inches="tight")
        plt.close()
    else:
        plt.show()
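
A usage sketch for a regression-style comparison on synthetic values:

>>> import numpy as np
>>> from deepecgkit.evaluation.visualization import plot_predictions
>>> rng = np.random.default_rng(0)
>>> y_true = rng.normal(60, 10, size=200)            # e.g. synthetic heart-rate targets
>>> y_pred = y_true + rng.normal(0, 5, size=200)     # noisy predictions
>>> plot_predictions(y_true, y_pred, save_path="pred_vs_true.png")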

plot_calibration_curve

plot_calibration_curve(
    y_true: ndarray,
    y_prob: ndarray,
    class_names: Optional[List[str]] = None,
    n_bins: int = 10,
    title: str = "Calibration Plot",
    save_dir: Optional[str] = None,
    save_path: Optional[str] = None,
)

Plot reliability diagram and prediction distribution as separate figures.

Parameters:

    y_true (ndarray, required): True labels.
    y_prob (ndarray, required): Predicted probabilities (n_samples, n_classes).
    class_names (Optional[List[str]], default None): Optional list of class names.
    n_bins (int, default 10): Number of bins for calibration curve.
    title (str, default 'Calibration Plot'): Title for the calibration curve plot.
    save_dir (Optional[str], default None): Directory to save calibration_curve.png and prediction_distribution.png.
    save_path (Optional[str], default None): Deprecated combined path; if provided and save_dir is not, uses its parent directory for backward compatibility.

Source code in deepecgkit/evaluation/visualization.py
def plot_calibration_curve(
    y_true: np.ndarray,
    y_prob: np.ndarray,
    class_names: Optional[List[str]] = None,
    n_bins: int = 10,
    title: str = "Calibration Plot",
    save_dir: Optional[str] = None,
    save_path: Optional[str] = None,
):
    """Plot reliability diagram and prediction distribution as separate figures.

    Args:
        y_true: True labels.
        y_prob: Predicted probabilities (n_samples, n_classes).
        class_names: Optional list of class names.
        n_bins: Number of bins for calibration curve.
        title: Title for the calibration curve plot.
        save_dir: Directory to save calibration_curve.png and prediction_distribution.png.
        save_path: Deprecated combined path; if provided and save_dir is not,
            uses its parent directory for backward compatibility.
    """
    num_classes = y_prob.shape[1]
    if class_names is None:
        class_names = [f"Class {i}" for i in range(num_classes)]

    resolved_dir = None
    if save_dir:
        resolved_dir = Path(save_dir)
    elif save_path:
        resolved_dir = Path(save_path).parent

    plt.figure(figsize=(8, 6))
    for i in range(num_classes):
        y_binary = (y_true == i).astype(int)
        prob_class = y_prob[:, i]
        if y_binary.sum() == 0:
            continue
        try:
            fraction_of_positives, mean_predicted_value = calibration_curve(
                y_binary, prob_class, n_bins=n_bins, strategy="uniform"
            )
            plt.plot(mean_predicted_value, fraction_of_positives, "s-", label=class_names[i])
        except ValueError:
            continue

    plt.plot([0, 1], [0, 1], "k--", label="Perfectly calibrated")
    plt.xlabel("Mean Predicted Probability")
    plt.ylabel("Fraction of Positives")
    plt.title(title)
    plt.legend(loc="lower right", fontsize="small")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()

    if resolved_dir:
        plt.savefig(str(resolved_dir / "calibration_curve.png"), dpi=300, bbox_inches="tight")
        plt.close()
    else:
        plt.show()

    plt.figure(figsize=(8, 6))
    for i in range(num_classes):
        plt.hist(y_prob[:, i], bins=n_bins, alpha=0.5, label=class_names[i])
    plt.xlabel("Predicted Probability")
    plt.ylabel("Count")
    plt.title("Prediction Distribution")
    plt.legend(fontsize="small")
    plt.grid(True, alpha=0.3)
    plt.tight_layout()

    if resolved_dir:
        plt.savefig(str(resolved_dir / "prediction_distribution.png"), dpi=300, bbox_inches="tight")
        plt.close()
    else:
        plt.show()
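
A usage sketch with synthetic three-class probabilities whose rows sum to 1; the class names are illustrative, and save_dir="." writes calibration_curve.png and prediction_distribution.png into the current directory:

>>> import numpy as np
>>> from deepecgkit.evaluation.visualization import plot_calibration_curve
>>> rng = np.random.default_rng(0)
>>> y_true = rng.integers(0, 3, size=300)
>>> raw = rng.random((300, 3)) + np.eye(3)[y_true]    # favour the true class
>>> y_prob = raw / raw.sum(axis=1, keepdims=True)     # normalize rows to probabilities
>>> plot_calibration_curve(y_true, y_prob, class_names=["Normal", "AFib", "Other"], save_dir=".")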