API Reference

Auto-generated from Google-style docstrings in src/. Any function or class with a docstring is reflected here — no manual updates needed.


Domain Layer

The domain layer is pure Python — no infrastructure dependencies. All business rules live here.

Customer Domain

src.domain.customer.entities

Customer entity – core of the Customer bounded context.

An Entity has a unique identity (customer_id) and mutable lifecycle state. Business rules about customers live here, not in the API or infrastructure layers.

Classes

Customer dataclass

Represents a B2B SaaS customer account.

The customer entity owns lifecycle state (active vs churned) and exposes domain methods that encapsulate churn-relevant business rules.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| customer_id | str | Unique identifier (UUID string). | required |
| industry | Industry | Vertical segment, used for cohort analysis. | required |
| plan_tier | PlanTier | Commercial tier (free / starter / growth / enterprise / custom). | required |
| signup_date | date | Date of first contract activation. | required |
| mrr | MRR | Monthly Recurring Revenue value object. | required |
| churn_date | date \| None | Date of cancellation; None means still active (right-censored). | None |

Source code in src/domain/customer/entities.py
@dataclass
class Customer:
    """Represents a B2B SaaS customer account.

    The customer entity owns lifecycle state (active vs churned) and exposes
    domain methods that encapsulate churn-relevant business rules.

    Args:
        customer_id: Unique identifier (UUID string).
        industry: Vertical segment, used for cohort analysis.
        plan_tier: Commercial tier (free / starter / growth / enterprise / custom).
        signup_date: Date of first contract activation.
        mrr: Monthly Recurring Revenue value object.
        churn_date: Date of cancellation; None means still active (right-censored).
    """

    customer_id: str
    industry: Industry
    plan_tier: PlanTier
    signup_date: date
    mrr: MRR
    churn_date: date | None = field(default=None)

    @property
    def is_active(self) -> bool:
        """True if the customer has not churned."""
        return self.churn_date is None

    @property
    def tenure_days(self) -> int:
        """Days from signup to churn (or today if still active).

        Used as the time axis in survival analysis models.
        """
        end_date = self.churn_date if self.churn_date else date.today()
        return (end_date - self.signup_date).days

    @property
    def is_early_stage(self) -> bool:
        """True if customer is within the critical first-90-day onboarding window.

        20–25% of voluntary churn occurs in this window (per Forrester data).
        """
        return self.tenure_days <= 90

    @property
    def annual_revenue_at_risk(self) -> str:
        """Human-readable annual revenue that would be lost if this customer churns."""
        return str(self.mrr.revenue_at_risk)

    def mark_churned(self, churn_date: date) -> None:
        """Record the churn event.

        Args:
            churn_date: The date cancellation was confirmed.

        Raises:
            ValueError: If churn_date precedes signup_date or customer already churned.
        """
        if not self.is_active:
            raise ValueError(f"Customer {self.customer_id} has already churned.")
        if churn_date < self.signup_date:
            raise ValueError(f"churn_date {churn_date} cannot precede signup_date {self.signup_date}")
        self.churn_date = churn_date
Attributes
is_active property
is_active: bool

True if the customer has not churned.

tenure_days property
tenure_days: int

Days from signup to churn (or today if still active).

Used as the time axis in survival analysis models.

is_early_stage property
is_early_stage: bool

True if customer is within the critical first-90-day onboarding window.

20–25% of voluntary churn occurs in this window (per Forrester data).

annual_revenue_at_risk property
annual_revenue_at_risk: str

Human-readable annual revenue that would be lost if this customer churns.

Functions
mark_churned
mark_churned(churn_date: date) -> None

Record the churn event.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| churn_date | date | The date cancellation was confirmed. | required |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If churn_date precedes signup_date or customer already churned. |

Source code in src/domain/customer/entities.py
def mark_churned(self, churn_date: date) -> None:
    """Record the churn event.

    Args:
        churn_date: The date cancellation was confirmed.

    Raises:
        ValueError: If churn_date precedes signup_date or customer already churned.
    """
    if not self.is_active:
        raise ValueError(f"Customer {self.customer_id} has already churned.")
    if churn_date < self.signup_date:
        raise ValueError(f"churn_date {churn_date} cannot precede signup_date {self.signup_date}")
    self.churn_date = churn_date

src.domain.customer.value_objects

Value objects for the Customer domain.

Value objects are immutable and compared by value, not identity. They encapsulate business rules about what constitutes a valid value.

Classes

PlanTier

Bases: StrEnum

The commercial tier a customer is on.

Tier correlates with feature access, support SLA, and churn risk profile. Enterprise customers have dedicated CSMs and lower base churn rates. CUSTOM represents bespoke enterprise contracts — the ceiling of the tier ladder. FREE represents the freemium tier (zero MRR); the highest-leverage conversion event in SaaS is free → paid when a customer hits a feature data-sharing limit.

Source code in src/domain/customer/value_objects.py
class PlanTier(StrEnum):
    """The commercial tier a customer is on.

    Tier correlates with feature access, support SLA, and churn risk profile.
    Enterprise customers have dedicated CSMs and lower base churn rates.
    CUSTOM represents bespoke enterprise contracts — the ceiling of the tier ladder.
    FREE represents the freemium tier (zero MRR); the highest-leverage conversion
    event in SaaS is free → paid when a customer hits a feature data-sharing limit.
    """

    FREE = "free"
    STARTER = "starter"
    GROWTH = "growth"
    ENTERPRISE = "enterprise"
    CUSTOM = "custom"
Industry

Bases: StrEnum

Vertical industry segment.

Used for cohort segmentation and industry-specific churn benchmarking.

Source code in src/domain/customer/value_objects.py
class Industry(StrEnum):
    """Vertical industry segment.

    Used for cohort segmentation and industry-specific churn benchmarking.
    """

    FINTECH = "FinTech"
    HEALTHTECH = "HealthTech"
    LEGALTECH = "LegalTech"
    HR_TECH = "HR Tech"
    EDTECH = "EdTech"
    INSURTECH = "InsurTech"
    PROPTECH = "PropTech"
    RETAILTECH = "RetailTech"
    OTHER = "Other"
MRR dataclass

Monthly Recurring Revenue in USD.

Enforces non-negative constraint. Used for business impact calculations (e.g. revenue at risk = MRR × churn_probability).

Source code in src/domain/customer/value_objects.py
@dataclass(frozen=True)
class MRR:
    """Monthly Recurring Revenue in USD.

    Enforces non-negative constraint. Used for business impact calculations
    (e.g. revenue at risk = MRR × churn_probability).
    """

    amount: Decimal

    def __post_init__(self) -> None:
        if self.amount < Decimal("0"):
            raise ValueError(f"MRR cannot be negative, got {self.amount}")

    @classmethod
    def from_float(cls, value: float) -> MRR:
        """Create MRR from a float, rounding to 2 decimal places."""
        return cls(amount=Decimal(str(round(value, 2))))

    @property
    def revenue_at_risk(self) -> Decimal:
        """Annual revenue at risk if customer churns (MRR × 12)."""
        return self.amount * Decimal("12")

    def __str__(self) -> str:
        return f"${self.amount:,.2f}"
Attributes
revenue_at_risk property
revenue_at_risk: Decimal

Annual revenue at risk if customer churns (MRR × 12).

Functions
from_float classmethod
from_float(value: float) -> MRR

Create MRR from a float, rounding to 2 decimal places.

Source code in src/domain/customer/value_objects.py
@classmethod
def from_float(cls, value: float) -> MRR:
    """Create MRR from a float, rounding to 2 decimal places."""
    return cls(amount=Decimal(str(round(value, 2))))
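
A short example may help show how from_float, revenue_at_risk, and the string form fit together. The class is copied from the listing above so the snippet runs on its own (the return annotation is quoted instead of relying on a future import); the sample amounts are arbitrary.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class MRR:
    """Copy of the value object above so the example runs on its own."""

    amount: Decimal

    def __post_init__(self) -> None:
        if self.amount < Decimal("0"):
            raise ValueError(f"MRR cannot be negative, got {self.amount}")

    @classmethod
    def from_float(cls, value: float) -> "MRR":
        return cls(amount=Decimal(str(round(value, 2))))

    @property
    def revenue_at_risk(self) -> Decimal:
        return self.amount * Decimal("12")

    def __str__(self) -> str:
        return f"${self.amount:,.2f}"


mrr = MRR.from_float(1234.567)
annual = mrr.revenue_at_risk  # Decimal arithmetic from here on, no float drift
label = str(mrr)

# round() operates on the binary float, so a value that looks like an exact
# half can round down: 2.675 is stored slightly below 2.675.
edge = MRR.from_float(2.675)

try:
    MRR(amount=Decimal("-1"))
    negative_rejected = False
except ValueError:
    negative_rejected = True
```

Because the rounding happens before the value becomes a Decimal, callers that already hold exact amounts (for example strings from a billing export) may prefer to construct MRR(amount=Decimal(...)) directly.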

src.domain.customer.repository

CustomerRepository – abstract port (interface) for the Customer domain.

Infrastructure implementations live in src/infrastructure/repositories/. The domain layer depends only on this interface, never on DuckDB directly.

Classes

CustomerRepository

Bases: ABC

Port (interface) for persisting and retrieving Customer entities.

Source code in src/domain/customer/repository.py
class CustomerRepository(ABC):
    """Port (interface) for persisting and retrieving Customer entities."""

    @abstractmethod
    def get_by_id(self, customer_id: str) -> Customer | None:
        """Retrieve a customer by their unique ID.

        Returns:
            Customer entity, or None if not found.
        """
        ...

    @abstractmethod
    def get_all_active(self) -> Sequence[Customer]:
        """Return all customers who have not yet churned.

        Used for batch churn scoring runs.
        """
        ...

    @abstractmethod
    def get_sample(self, n: int) -> Sequence[Customer]:
        """Return a random sample of n customers (any churn status).

        Business Context:
            Used by the demo list endpoint to seed load-test tooling and
            the UI customer picker. Deterministic seed (REPEATABLE 42) ensures
            the same customers are returned across requests for reproducibility.

        Args:
            n: Number of customers to sample (caller must clamp to safe max).
        """
        ...

    @abstractmethod
    def save(self, customer: Customer) -> None:
        """Persist a new or updated Customer entity."""
        ...
Functions
get_by_id abstractmethod
get_by_id(customer_id: str) -> Customer | None

Retrieve a customer by their unique ID.

Returns:

| Type | Description |
| --- | --- |
| Customer \| None | Customer entity, or None if not found. |

Source code in src/domain/customer/repository.py
@abstractmethod
def get_by_id(self, customer_id: str) -> Customer | None:
    """Retrieve a customer by their unique ID.

    Returns:
        Customer entity, or None if not found.
    """
    ...
get_all_active abstractmethod
get_all_active() -> Sequence[Customer]

Return all customers who have not yet churned.

Used for batch churn scoring runs.

Source code in src/domain/customer/repository.py
@abstractmethod
def get_all_active(self) -> Sequence[Customer]:
    """Return all customers who have not yet churned.

    Used for batch churn scoring runs.
    """
    ...
get_sample abstractmethod
get_sample(n: int) -> Sequence[Customer]

Return a random sample of n customers (any churn status).

Business Context

Used by the demo list endpoint to seed load-test tooling and the UI customer picker. Deterministic seed (REPEATABLE 42) ensures the same customers are returned across requests for reproducibility.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| n | int | Number of customers to sample (caller must clamp to safe max). | required |

Source code in src/domain/customer/repository.py
@abstractmethod
def get_sample(self, n: int) -> Sequence[Customer]:
    """Return a random sample of n customers (any churn status).

    Business Context:
        Used by the demo list endpoint to seed load-test tooling and
        the UI customer picker. Deterministic seed (REPEATABLE 42) ensures
        the same customers are returned across requests for reproducibility.

    Args:
        n: Number of customers to sample (caller must clamp to safe max).
    """
    ...
save abstractmethod
save(customer: Customer) -> None

Persist a new or updated Customer entity.

Source code in src/domain/customer/repository.py
@abstractmethod
def save(self, customer: Customer) -> None:
    """Persist a new or updated Customer entity."""
    ...

Usage Domain

src.domain.usage.entities

UsageEvent entity – core of the Usage bounded context.

Classes

UsageEvent dataclass

A single product interaction by a customer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_id | str | UUID primary key. | required |
| customer_id | str | FK to Customer entity. | required |
| timestamp | datetime | UTC datetime of the event. | required |
| event_type | EventType | The type of product action taken. | required |
| feature_adoption_score | FeatureAdoptionScore | Adoption score snapshot at the time of the event. | required |

Source code in src/domain/usage/entities.py
@dataclass
class UsageEvent:
    """A single product interaction by a customer.

    Args:
        event_id: UUID primary key.
        customer_id: FK to Customer entity.
        timestamp: UTC datetime of the event.
        event_type: The type of product action taken.
        feature_adoption_score: Adoption score snapshot at the time of the event.
    """

    event_id: str
    customer_id: str
    timestamp: datetime
    event_type: EventType
    feature_adoption_score: FeatureAdoptionScore

    @property
    def is_retention_signal(self) -> bool:
        """True if this event type is a known strong retention indicator.

        Integration connects, API calls, and monitoring runs indicate deep
        product embedding, which significantly reduces churn probability.
        """
        return self.event_type in {
            EventType.INTEGRATION_CONNECT,
            EventType.API_CALL,
            EventType.MONITORING_RUN,
        }
Attributes
is_retention_signal property
is_retention_signal: bool

True if this event type is a known strong retention indicator.

Integration connects, API calls, and monitoring runs indicate deep product embedding, which significantly reduces churn probability.

src.domain.usage.value_objects

Value objects for the Usage domain.

Classes

EventType

Bases: StrEnum

Product event types tracked in usage_events.

integration_connect events are strong retention signals. Absence of monitoring_run events is an early churn indicator.

Source code in src/domain/usage/value_objects.py
class EventType(StrEnum):
    """Product event types tracked in usage_events.

    `integration_connect` events are strong retention signals.
    Absence of `monitoring_run` events is an early churn indicator.
    """

    EVIDENCE_UPLOAD = "evidence_upload"
    MONITORING_RUN = "monitoring_run"
    REPORT_VIEW = "report_view"
    USER_INVITE = "user_invite"
    INTEGRATION_CONNECT = "integration_connect"
    API_CALL = "api_call"
FeatureAdoptionScore dataclass

Composite 0–1 score reflecting breadth and depth of feature usage.

Scores below 0.2 combined with declining event frequency are the strongest leading indicators of churn in the first 90 days.

Source code in src/domain/usage/value_objects.py
@dataclass(frozen=True)
class FeatureAdoptionScore:
    """Composite 0–1 score reflecting breadth and depth of feature usage.

    Scores below 0.2 combined with declining event frequency are the
    strongest leading indicators of churn in the first 90 days.
    """

    value: float

    def __post_init__(self) -> None:
        if not (0.0 <= self.value <= 1.0):
            raise ValueError(f"FeatureAdoptionScore must be in [0, 1], got {self.value}")

    @property
    def is_low(self) -> bool:
        """True if adoption is critically low (below 0.2)."""
        return self.value < 0.2

    @property
    def label(self) -> str:
        if self.value < 0.2:
            return "critical"
        if self.value < 0.5:
            return "low"
        if self.value < 0.75:
            return "moderate"
        return "high"
Attributes
is_low property
is_low: bool

True if adoption is critically low (below 0.2).
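
The label bands are easiest to read at their boundaries. The snippet below copies the value object from the listing above so it runs standalone; the probe values are arbitrary.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FeatureAdoptionScore:
    """Copy of the value object above so the example runs on its own."""

    value: float

    def __post_init__(self) -> None:
        if not (0.0 <= self.value <= 1.0):
            raise ValueError(f"FeatureAdoptionScore must be in [0, 1], got {self.value}")

    @property
    def is_low(self) -> bool:
        return self.value < 0.2

    @property
    def label(self) -> str:
        if self.value < 0.2:
            return "critical"
        if self.value < 0.5:
            return "low"
        if self.value < 0.75:
            return "moderate"
        return "high"


# Each threshold belongs to the band above it because the comparisons are strict.
labels = {v: FeatureAdoptionScore(v).label for v in (0.0, 0.19, 0.2, 0.5, 0.75, 1.0)}

# Naming wrinkle: is_low matches the "critical" band, not the "low" band.
mid_low = FeatureAdoptionScore(0.3)
```

Note that a score of 0.3 is labelled "low" while is_low is False, so callers should pick one of the two and not mix them.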

src.domain.usage.repository

UsageRepository – abstract port for the Usage domain.

Classes

UsageRepository

Bases: ABC

Port for retrieving usage events.

Source code in src/domain/usage/repository.py
class UsageRepository(ABC):
    """Port for retrieving usage events."""

    @abstractmethod
    def get_events_for_customer(
        self,
        customer_id: str,
        since: datetime | None = None,
    ) -> Sequence[UsageEvent]:
        """Retrieve all usage events for a customer, optionally filtered by date.

        Args:
            customer_id: The customer whose events to fetch.
            since: If provided, return only events after this UTC datetime.
        """
        ...

    @abstractmethod
    def get_event_count_last_n_days(self, customer_id: str, days: int) -> int:
        """Count events in the last N days.

        Used as a feature for churn model: event decay is a leading indicator.
        """
        ...
Functions
get_events_for_customer abstractmethod
get_events_for_customer(customer_id: str, since: datetime | None = None) -> Sequence[UsageEvent]

Retrieve all usage events for a customer, optionally filtered by date.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| customer_id | str | The customer whose events to fetch. | required |
| since | datetime \| None | If provided, return only events after this UTC datetime. | None |

Source code in src/domain/usage/repository.py
@abstractmethod
def get_events_for_customer(
    self,
    customer_id: str,
    since: datetime | None = None,
) -> Sequence[UsageEvent]:
    """Retrieve all usage events for a customer, optionally filtered by date.

    Args:
        customer_id: The customer whose events to fetch.
        since: If provided, return only events after this UTC datetime.
    """
    ...
get_event_count_last_n_days abstractmethod
get_event_count_last_n_days(customer_id: str, days: int) -> int

Count events in the last N days.

Used as a feature for churn model: event decay is a leading indicator.

Source code in src/domain/usage/repository.py
@abstractmethod
def get_event_count_last_n_days(self, customer_id: str, days: int) -> int:
    """Count events in the last N days.

    Used as a feature for churn model: event decay is a leading indicator.
    """
    ...

Prediction Domain

src.domain.prediction.entities

PredictionResult entity – output of the Prediction domain services.

Classes

ShapFeature dataclass

A single SHAP feature contribution to a prediction.

Provides model explainability for CS teams to understand why a customer was flagged, enabling targeted interventions.

Source code in src/domain/prediction/entities.py
@dataclass
class ShapFeature:
    """A single SHAP feature contribution to a prediction.

    Provides model explainability for CS teams to understand why a
    customer was flagged, enabling targeted interventions.
    """

    feature_name: str
    feature_value: float
    shap_impact: float  # Positive = increases churn risk
PredictionResult dataclass

The complete output of a churn + risk prediction for one customer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| customer_id | str | The customer this prediction belongs to. | required |
| churn_probability | ChurnProbability | Calibrated P(churn in 90 days). | required |
| risk_score | RiskScore | Composite compliance/usage risk score. | required |
| top_shap_features | list[ShapFeature] | Top-N SHAP drivers (sorted by \|shap_impact\|). | list() (default factory) |
| model_version | str | Semantic version of the model artifact used. | '0.0.0' |
| predicted_at | datetime | UTC timestamp of when the prediction was generated. | datetime.now(UTC) (default factory) |

Source code in src/domain/prediction/entities.py
@dataclass
class PredictionResult:
    """The complete output of a churn + risk prediction for one customer.

    Args:
        customer_id: The customer this prediction belongs to.
        churn_probability: Calibrated P(churn in 90 days).
        risk_score: Composite compliance/usage risk score.
        top_shap_features: Top-N SHAP drivers (sorted by |shap_impact|).
        model_version: Semantic version of the model artifact used.
        predicted_at: UTC timestamp of when the prediction was generated.
    """

    customer_id: str
    churn_probability: ChurnProbability
    risk_score: RiskScore
    top_shap_features: list[ShapFeature] = field(default_factory=list)
    model_version: str = "0.0.0"
    predicted_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    @property
    def recommended_action(self) -> str:
        """Natural-language CS recommendation based on prediction outputs.

        This is a deterministic rule — LLM summaries build on top of this
        in the AI/LLM layer (Phase 5).
        """
        if self.churn_probability.value >= 0.75:
            return "CRITICAL – Escalate to senior CSM immediately. Schedule EBR within 7 days."
        if self.churn_probability.value >= 0.5:
            return "HIGH RISK – Trigger CS outreach within 48 hours. Review top SHAP drivers."
        if self.churn_probability.value >= 0.25:
            return "MEDIUM RISK – Add to CSM watch list. Schedule check-in call."
        return "LOW RISK – No immediate action required. Monitor monthly."
Attributes
recommended_action property
recommended_action: str

Natural-language CS recommendation based on prediction outputs.

This is a deterministic rule — LLM summaries build on top of this in the AI/LLM layer (Phase 5).

src.domain.prediction.value_objects

Value objects for the Prediction domain.

Classes

ChurnProbability dataclass

P(churn in next 90 days) output from the churn model.

Calibrated probability in [0, 1]. The 0.5 threshold is the default operating point; business impact analysis should inform the actual threshold used for CS outreach triggers.

Source code in src/domain/prediction/value_objects.py
@dataclass(frozen=True)
class ChurnProbability:
    """P(churn in next 90 days) output from the churn model.

    Calibrated probability in [0, 1]. The 0.5 threshold is the default
    operating point; business impact analysis should inform the actual
    threshold used for CS outreach triggers.
    """

    value: float

    def __post_init__(self) -> None:
        if not (0.0 <= self.value <= 1.0):
            raise ValueError(f"ChurnProbability must be in [0, 1], got {self.value}")

    @property
    def risk_tier(self) -> RiskTier:
        if self.value >= 0.75:
            return RiskTier.CRITICAL
        if self.value >= 0.5:
            return RiskTier.HIGH
        if self.value >= 0.25:
            return RiskTier.MEDIUM
        return RiskTier.LOW

    @property
    def requires_immediate_action(self) -> bool:
        """True if CS outreach should be triggered within 48 hours."""
        return self.value >= 0.5
Attributes
requires_immediate_action property
requires_immediate_action: bool

True if CS outreach should be triggered within 48 hours.

RiskScore dataclass

Composite compliance + usage risk score in [0, 1].

Combines compliance_gap_score, vendor_risk_flags, and usage_decay_score. Distinct from churn probability — a customer can have high risk score but low churn probability if they are contractually locked in.

Source code in src/domain/prediction/value_objects.py
@dataclass(frozen=True)
class RiskScore:
    """Composite compliance + usage risk score in [0, 1].

    Combines compliance_gap_score, vendor_risk_flags, and usage_decay_score.
    Distinct from churn probability — a customer can have high risk score
    but low churn probability if they are contractually locked in.
    """

    value: float

    def __post_init__(self) -> None:
        if not (0.0 <= self.value <= 1.0):
            raise ValueError(f"RiskScore must be in [0, 1], got {self.value}")

    @property
    def tier(self) -> RiskTier:
        if self.value >= 0.75:
            return RiskTier.CRITICAL
        if self.value >= 0.5:
            return RiskTier.HIGH
        if self.value >= 0.25:
            return RiskTier.MEDIUM
        return RiskTier.LOW
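
ChurnProbability.risk_tier and RiskScore.tier apply the same threshold ladder. The sketch below factors that ladder into a single function to make the boundaries explicit. RiskTier is referenced above but not shown in this section, so the enum here is a hypothetical stand-in, and tier_for is an illustrative helper, not part of the documented module.

```python
from dataclasses import dataclass
from enum import Enum


class RiskTier(Enum):
    """Hypothetical stand-in: the real RiskTier is not shown in this section."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


def tier_for(value: float) -> RiskTier:
    """Threshold ladder shared by ChurnProbability.risk_tier and RiskScore.tier."""
    if value >= 0.75:
        return RiskTier.CRITICAL
    if value >= 0.5:
        return RiskTier.HIGH
    if value >= 0.25:
        return RiskTier.MEDIUM
    return RiskTier.LOW


@dataclass(frozen=True)
class ChurnProbability:
    """Same validation and properties as above, delegating to tier_for."""

    value: float

    def __post_init__(self) -> None:
        if not (0.0 <= self.value <= 1.0):
            raise ValueError(f"ChurnProbability must be in [0, 1], got {self.value}")

    @property
    def risk_tier(self) -> RiskTier:
        return tier_for(self.value)

    @property
    def requires_immediate_action(self) -> bool:
        return self.value >= 0.5


probes = (0.0, 0.2499, 0.25, 0.5, 0.75, 1.0)
boundaries = {v: ChurnProbability(v).risk_tier for v in probes}
```

With these thresholds, requires_immediate_action is True exactly when the tier is HIGH or CRITICAL, which matches the 48-hour outreach wording in PredictionResult.recommended_action.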

src.domain.prediction.churn_model_service

ChurnModelService – domain service for churn probability prediction.

Domain services encapsulate operations that don't naturally belong to a single entity. The model artifact is injected as a dependency (no direct file I/O here).

Classes

ChurnFeatureVector

Bases: Protocol

Protocol for feature extraction – implemented in infrastructure layer.

Phase 4 update: the extractor queries the dbt mart directly (single DuckDB read), so events no longer need to be passed from the use case layer. This keeps the protocol minimal and moves feature logic into dbt.

Source code in src/domain/prediction/churn_model_service.py
class ChurnFeatureVector(Protocol):
    """Protocol for feature extraction – implemented in infrastructure layer.

    Phase 4 update: the extractor queries the dbt mart directly (single DuckDB
    read), so events no longer need to be passed from the use case layer.
    This keeps the protocol minimal and moves feature logic into dbt.
    """

    def extract(self, customer: Customer) -> dict[str, float | str]:
        """Extract the model's feature vector for a customer.

        Args:
            customer: Active Customer entity (used to look up mart row by ID).

        Returns:
            Flat dict of feature_name → value (numerics as float, categoricals
            as lowercase string for sklearn OrdinalEncoder compatibility).
            All feature engineering lives in mart_customer_churn_features.

        Raises:
            ValueError: If the customer is not found in the mart (e.g. churned
                        customers are excluded from the mart).
        """
        ...
Functions
extract
extract(customer: Customer) -> dict[str, float | str]

Extract the model's feature vector for a customer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| customer | Customer | Active Customer entity (used to look up mart row by ID). | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, float \| str] | Flat dict of feature_name → value (numerics as float, categoricals as lowercase string for sklearn OrdinalEncoder compatibility). All feature engineering lives in mart_customer_churn_features. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the customer is not found in the mart (e.g. churned customers are excluded from the mart). |

Source code in src/domain/prediction/churn_model_service.py
def extract(self, customer: Customer) -> dict[str, float | str]:
    """Extract the model's feature vector for a customer.

    Args:
        customer: Active Customer entity (used to look up mart row by ID).

    Returns:
        Flat dict of feature_name → value (numerics as float, categoricals
        as lowercase string for sklearn OrdinalEncoder compatibility).
        All feature engineering lives in mart_customer_churn_features.

    Raises:
        ValueError: If the customer is not found in the mart (e.g. churned
                    customers are excluded from the mart).
    """
    ...
ChurnModelPort

Bases: ABC

Abstract port for the underlying ML model.

Concrete implementations in src/infrastructure/ml/ load the trained XGBoost/survival model artifact.

Source code in src/domain/prediction/churn_model_service.py
class ChurnModelPort(ABC):
    """Abstract port for the underlying ML model.

    Concrete implementations in src/infrastructure/ml/
    load the trained XGBoost/survival model artifact.
    """

    @abstractmethod
    def predict_proba(self, features: dict[str, float | str]) -> float:
        """Return P(churn in 90 days) for the given feature vector."""
        ...

    @abstractmethod
    def explain(self, features: dict[str, float | str]) -> list[ShapFeature]:
        """Return SHAP feature contributions for explainability."""
        ...

    @property
    @abstractmethod
    def version(self) -> str:
        """Semantic version of the loaded model artifact."""
        ...
Attributes
version abstractmethod property
version: str

Semantic version of the loaded model artifact.

Functions
predict_proba abstractmethod
predict_proba(features: dict[str, float | str]) -> float

Return P(churn in 90 days) for the given feature vector.

Source code in src/domain/prediction/churn_model_service.py
@abstractmethod
def predict_proba(self, features: dict[str, float | str]) -> float:
    """Return P(churn in 90 days) for the given feature vector."""
    ...
explain abstractmethod
explain(features: dict[str, float | str]) -> list[ShapFeature]

Return SHAP feature contributions for explainability.

Source code in src/domain/prediction/churn_model_service.py
@abstractmethod
def explain(self, features: dict[str, float | str]) -> list[ShapFeature]:
    """Return SHAP feature contributions for explainability."""
    ...
ChurnModelService

Orchestrates feature extraction → model inference → result construction.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | ChurnModelPort | Concrete ML model (injected from infrastructure layer). | required |
| feature_extractor | ChurnFeatureVector | Queries the dbt mart for the customer's feature vector. | required |

Source code in src/domain/prediction/churn_model_service.py
class ChurnModelService:
    """Orchestrates feature extraction → model inference → result construction.

    Args:
        model: Concrete ML model (injected from infrastructure layer).
        feature_extractor: Queries the dbt mart for the customer's feature vector.
    """

    def __init__(self, model: ChurnModelPort, feature_extractor: ChurnFeatureVector) -> None:
        self._model = model
        self._feature_extractor = feature_extractor

    def predict(
        self,
        customer: Customer,
        risk_score: RiskScore,
    ) -> PredictionResult:
        """Generate a full PredictionResult for a customer.

        Business Context: Feature extraction, model inference, and SHAP
        computation are all delegated to injected dependencies. This service
        only owns the assembly logic, keeping it testable in isolation.

        Args:
            customer: Active Customer entity.
            risk_score: Pre-computed composite risk score (from RiskModelService).

        Returns:
            PredictionResult with calibrated churn probability, SHAP explanations,
            and a deterministic recommended CS action.
        """
        features = self._feature_extractor.extract(customer)
        churn_prob = self._model.predict_proba(features)
        shap_features = self._model.explain(features)

        return PredictionResult(
            customer_id=customer.customer_id,
            churn_probability=ChurnProbability(value=churn_prob),
            risk_score=risk_score,
            top_shap_features=sorted(shap_features, key=lambda f: abs(f.shap_impact), reverse=True)[:5],
            model_version=self._model.version,
        )
Functions
predict
predict(customer: Customer, risk_score: RiskScore) -> PredictionResult

Generate a full PredictionResult for a customer.

Business Context: Feature extraction, model inference, and SHAP computation are all delegated to injected dependencies. This service only owns the assembly logic, keeping it testable in isolation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| customer | Customer | Active Customer entity. | required |
| risk_score | RiskScore | Pre-computed composite risk score (from RiskModelService). | required |

Returns:

| Type | Description |
| --- | --- |
| PredictionResult | PredictionResult with calibrated churn probability, SHAP explanations, and a deterministic recommended CS action. |

Source code in src/domain/prediction/churn_model_service.py
def predict(
    self,
    customer: Customer,
    risk_score: RiskScore,
) -> PredictionResult:
    """Generate a full PredictionResult for a customer.

    Business Context: Feature extraction, model inference, and SHAP
    computation are all delegated to injected dependencies. This service
    only owns the assembly logic, keeping it testable in isolation.

    Args:
        customer: Active Customer entity.
        risk_score: Pre-computed composite risk score (from RiskModelService).

    Returns:
        PredictionResult with calibrated churn probability, SHAP explanations,
        and a deterministic recommended CS action.
    """
    features = self._feature_extractor.extract(customer)
    churn_prob = self._model.predict_proba(features)
    shap_features = self._model.explain(features)

    return PredictionResult(
        customer_id=customer.customer_id,
        churn_probability=ChurnProbability(value=churn_prob),
        risk_score=risk_score,
        top_shap_features=sorted(shap_features, key=lambda f: abs(f.shap_impact), reverse=True)[:5],
        model_version=self._model.version,
    )
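
The ranking step in `predict` can be tried in isolation. The sketch below is a minimal illustration, assuming a hypothetical `ShapFeature` stand-in (only the `shap_impact` attribute appears in the source; `feature_name` and the sample values are invented for the example). It shows that features are ordered by the magnitude of their SHAP impact, regardless of sign, and truncated to five.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ShapFeature:
    """Hypothetical stand-in; only `shap_impact` is named in the source."""

    feature_name: str  # assumed field, for readability of the example
    shap_impact: float


def top_features(features, limit=5):
    """Sort by absolute SHAP impact, largest first, and keep `limit` items."""
    return sorted(features, key=lambda f: abs(f.shap_impact), reverse=True)[:limit]


# Invented sample values: negative impacts push the prediction down,
# positive ones push it up; the ranking uses magnitude only.
features = [
    ShapFeature("logins_last_30d", -0.31),
    ShapFeature("open_tickets", 0.12),
    ShapFeature("seats_used", -0.05),
    ShapFeature("mrr", 0.02),
    ShapFeature("tenure_days", -0.40),
    ShapFeature("integrations", 0.08),
]

ranked = top_features(features)
print([f.feature_name for f in ranked])
# ['tenure_days', 'logins_last_30d', 'open_tickets', 'integrations', 'seats_used']
```

Because the sort key is `abs(f.shap_impact)`, a strongly negative contribution ranks above a weakly positive one; the smallest-magnitude feature (`mrr` here) is the one dropped.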

src.domain.prediction.risk_model_service

RiskModelService – domain service for compliance + usage risk scoring.

Classes

RiskSignals dataclass

Raw risk inputs from the risk_signals table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| compliance_gap_score | float | 0–1 score of open compliance gaps. | required |
| vendor_risk_flags | int | Count of third-party vendor risk alerts. | required |
| usage_decay_score | float | 0–1 score of recent usage decline (computed from events). | required |
Source code in src/domain/prediction/risk_model_service.py
@dataclass(frozen=True)
class RiskSignals:
    """Raw risk inputs from the risk_signals table.

    Args:
        compliance_gap_score: 0–1 score of open compliance gaps.
        vendor_risk_flags: Count of third-party vendor risk alerts.
        usage_decay_score: 0–1 score of recent usage decline (computed from events).
    """

    compliance_gap_score: float
    vendor_risk_flags: int
    usage_decay_score: float
RiskModelService

Computes a composite risk score from compliance and usage signals.

Weights are calibrated to business impact:

- Usage decay is the strongest leading indicator of near-term churn
- Compliance gaps drive risk but not always churn (contractual stickiness)
- Vendor risk flags have lower weight but non-zero contribution

These weights should be revisited quarterly using SHAP analysis on the full churn model to ensure they remain calibrated to observed outcomes.
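
As a reading aid, the weighting can be written as a single formula. The symbols below are notation introduced here, not names from the source: $u$ is `usage_decay_score`, $c$ is `compliance_gap_score`, $f$ is `vendor_risk_flags`, and $s$ is the composite score. The coefficients are the class constants of `RiskModelService`. The bound holds under the assumption that $u$ and $c$ really lie in $[0, 1]$ and that $f$ is non-negative.

```latex
\[
  s = 0.50\,u + 0.35\,c + 0.15\,\min\!\left(\tfrac{f}{5},\, 1\right)
\]
\[
  u, c \in [0, 1],\; f \ge 0
  \;\Longrightarrow\;
  0 \le s \le 0.50 + 0.35 + 0.15 = 1
\]
```

Since the three weights sum to 1, the composite stays in $[0, 1]$ only as long as that sum is preserved; a quarterly recalibration that changes one weight would need to rebalance the others.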

Source code in src/domain/prediction/risk_model_service.py
class RiskModelService:
    """Computes a composite risk score from compliance and usage signals.

    Weights are calibrated to business impact:
    - Usage decay is the strongest leading indicator of near-term churn
    - Compliance gaps drive risk but not always churn (contractual stickiness)
    - Vendor risk flags have lower weight but non-zero contribution

    These weights should be revisited quarterly using SHAP analysis on
    the full churn model to ensure they remain calibrated to observed outcomes.
    """

    USAGE_WEIGHT: float = 0.50
    COMPLIANCE_WEIGHT: float = 0.35
    VENDOR_WEIGHT: float = 0.15
    VENDOR_FLAG_NORMALISER: float = 5.0  # treat 5+ flags as max risk

    def compute(self, signals: RiskSignals) -> RiskScore:
        """Compute a composite RiskScore from raw signals.

        Args:
            signals: The three risk signal components.

        Returns:
            RiskScore value object in [0, 1].
        """
        vendor_normalised = min(signals.vendor_risk_flags / self.VENDOR_FLAG_NORMALISER, 1.0)
        composite = (
            self.USAGE_WEIGHT * signals.usage_decay_score
            + self.COMPLIANCE_WEIGHT * signals.compliance_gap_score
            + self.VENDOR_WEIGHT * vendor_normalised
        )
        return RiskScore(value=round(composite, 4))
Functions
compute
compute(signals: RiskSignals) -> RiskScore

Compute a composite RiskScore from raw signals.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| signals | RiskSignals | The three risk signal components. | required |

Returns:

| Type | Description |
| --- | --- |
| RiskScore | RiskScore value object in [0, 1]. |

Source code in src/domain/prediction/risk_model_service.py
def compute(self, signals: RiskSignals) -> RiskScore:
    """Compute a composite RiskScore from raw signals.

    Args:
        signals: The three risk signal components.

    Returns:
        RiskScore value object in [0, 1].
    """
    vendor_normalised = min(signals.vendor_risk_flags / self.VENDOR_FLAG_NORMALISER, 1.0)
    composite = (
        self.USAGE_WEIGHT * signals.usage_decay_score
        + self.COMPLIANCE_WEIGHT * signals.compliance_gap_score
        + self.VENDOR_WEIGHT * vendor_normalised
    )
    return RiskScore(value=round(composite, 4))
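
A worked example may help make the arithmetic concrete. The sketch below re-declares `RiskSignals` and the weights so that it runs on its own, and returns a plain `float` in place of the `RiskScore` value object, whose definition is not shown in this section (that substitution is an assumption of the example). The input values are invented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RiskSignals:
    compliance_gap_score: float
    vendor_risk_flags: int
    usage_decay_score: float


USAGE_WEIGHT = 0.50
COMPLIANCE_WEIGHT = 0.35
VENDOR_WEIGHT = 0.15
VENDOR_FLAG_NORMALISER = 5.0  # treat 5+ flags as max risk


def composite_risk(signals: RiskSignals) -> float:
    """Same arithmetic as RiskModelService.compute, returning a bare float."""
    # Scale the flag count to [0, 1], capping at 1.0 from 5 flags upward.
    vendor_normalised = min(signals.vendor_risk_flags / VENDOR_FLAG_NORMALISER, 1.0)
    composite = (
        USAGE_WEIGHT * signals.usage_decay_score
        + COMPLIANCE_WEIGHT * signals.compliance_gap_score
        + VENDOR_WEIGHT * vendor_normalised
    )
    return round(composite, 4)


# 0.50 * 0.8 + 0.35 * 0.4 + 0.15 * (2 / 5) = 0.40 + 0.14 + 0.06
moderate = composite_risk(
    RiskSignals(compliance_gap_score=0.4, vendor_risk_flags=2, usage_decay_score=0.8)
)
print(moderate)  # 0.6

# Nine flags are capped to the same contribution as five.
worst_case = composite_risk(
    RiskSignals(compliance_gap_score=1.0, vendor_risk_flags=9, usage_decay_score=1.0)
)
print(worst_case)  # 1.0
```

The cap on vendor flags means that accounts with five flags and accounts with fifty contribute the same 0.15 to the score.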

GTM Domain

src.domain.gtm.entities

Opportunity entity for the GTM bounded context.

Classes

Opportunity dataclass

A sales or expansion opportunity linked to a customer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| opp_id | str | UUID primary key. | required |
| customer_id | str | FK to Customer entity. | required |
| stage | SalesStage | Current CRM pipeline stage. | required |
| close_date | date | Actual or expected close date. | required |
| amount | Decimal | USD opportunity value. | required |
| sales_owner | str | Anonymised sales rep identifier. | required |
Source code in src/domain/gtm/entities.py
@dataclass
class Opportunity:
    """A sales or expansion opportunity linked to a customer.

    Args:
        opp_id: UUID primary key.
        customer_id: FK to Customer entity.
        stage: Current CRM pipeline stage.
        close_date: Actual or expected close date.
        amount: USD opportunity value.
        sales_owner: Anonymised sales rep identifier.
    """

    opp_id: str
    customer_id: str
    stage: SalesStage
    close_date: date
    amount: Decimal
    sales_owner: str

    @property
    def is_at_risk(self) -> bool:
        """True if an open opportunity exists for a customer who may churn.

        Used in the GTM domain to flag revenue at risk in the sales pipeline.
        """
        return self.stage.is_open and self.close_date >= date.today()
Attributes
is_at_risk property
is_at_risk: bool

True if an open opportunity exists for a customer who may churn.

Used in the GTM domain to flag revenue at risk in the sales pipeline.
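
As written in the source listing, the property checks only two things: the stage is open, and the close date is today or later. It does not itself consult any churn prediction, so callers presumably combine it with a churn signal elsewhere. The sketch below re-declares both classes so that it runs standalone; the `StrEnum` fallback for interpreters older than Python 3.11, the `make` helper, and all sample values are assumptions of the example.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from decimal import Decimal

try:
    from enum import StrEnum  # Python 3.11+
except ImportError:  # assumed fallback for older interpreters, not in the source
    from enum import Enum

    class StrEnum(str, Enum):
        pass


class SalesStage(StrEnum):
    PROSPECTING = "prospecting"
    QUALIFICATION = "qualification"
    PROPOSAL = "proposal"
    CLOSED_WON = "closed_won"
    CLOSED_LOST = "closed_lost"

    @property
    def is_open(self) -> bool:
        return self not in {SalesStage.CLOSED_WON, SalesStage.CLOSED_LOST}


@dataclass
class Opportunity:
    opp_id: str
    customer_id: str
    stage: SalesStage
    close_date: date
    amount: Decimal
    sales_owner: str

    @property
    def is_at_risk(self) -> bool:
        return self.stage.is_open and self.close_date >= date.today()


def make(stage: SalesStage, days_from_today: int) -> Opportunity:
    """Hypothetical helper that builds an opportunity relative to today."""
    return Opportunity(
        opp_id="opp-1",
        customer_id="cust-1",
        stage=stage,
        close_date=date.today() + timedelta(days=days_from_today),
        amount=Decimal("12000.00"),
        sales_owner="rep-42",
    )


open_future = make(SalesStage.PROPOSAL, 30)    # open, closes in the future
open_overdue = make(SalesStage.PROPOSAL, -30)  # open, close date has passed
closed_won = make(SalesStage.CLOSED_WON, 30)   # already closed

print(open_future.is_at_risk, open_overdue.is_at_risk, closed_won.is_at_risk)
# True False False
```

Note that an open opportunity whose expected close date has slipped into the past is reported as not at risk under this rule.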

src.domain.gtm.value_objects

Value objects for the GTM domain.

Classes

SalesStage

Bases: StrEnum

CRM pipeline stage for an opportunity.

Source code in src/domain/gtm/value_objects.py
class SalesStage(StrEnum):
    """CRM pipeline stage for an opportunity."""

    PROSPECTING = "prospecting"
    QUALIFICATION = "qualification"
    PROPOSAL = "proposal"
    CLOSED_WON = "closed_won"
    CLOSED_LOST = "closed_lost"

    @property
    def is_open(self) -> bool:
        """True unless the stage is CLOSED_WON or CLOSED_LOST."""
        return self not in {SalesStage.CLOSED_WON, SalesStage.CLOSED_LOST}

src.domain.gtm.repository

OpportunityRepository – abstract port for the GTM domain.

Classes

OpportunityRepository

Bases: ABC

Port for persisting and retrieving Opportunity entities.

Source code in src/domain/gtm/repository.py
class OpportunityRepository(ABC):
    """Port for persisting and retrieving Opportunity entities."""

    @abstractmethod
    def get_open_for_customer(self, customer_id: str) -> Sequence[Opportunity]:
        """Return all open opportunities for a given customer."""
        ...
Functions
get_open_for_customer abstractmethod
get_open_for_customer(customer_id: str) -> Sequence[Opportunity]

Return all open opportunities for a given customer.

Source code in src/domain/gtm/repository.py
@abstractmethod
def get_open_for_customer(self, customer_id: str) -> Sequence[Opportunity]:
    """Return all open opportunities for a given customer."""
    ...
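
Because the repository is an abstract port, tests and local runs need some concrete adapter. The sketch below shows one possible in-memory implementation. `InMemoryOpportunityRepository` is a hypothetical name that does not appear in the source, and `Opportunity` is trimmed to the three fields the filter needs. The `StrEnum` fallback for pre-3.11 interpreters is likewise an assumption.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterable, List, Sequence

try:
    from enum import StrEnum  # Python 3.11+
except ImportError:  # assumed fallback for older interpreters, not in the source
    from enum import Enum

    class StrEnum(str, Enum):
        pass


class SalesStage(StrEnum):
    PROPOSAL = "proposal"
    CLOSED_WON = "closed_won"
    CLOSED_LOST = "closed_lost"

    @property
    def is_open(self) -> bool:
        return self not in {SalesStage.CLOSED_WON, SalesStage.CLOSED_LOST}


@dataclass
class Opportunity:
    """Trimmed stand-in for the real entity (fewer fields, for brevity)."""

    opp_id: str
    customer_id: str
    stage: SalesStage


class OpportunityRepository(ABC):
    """Port for persisting and retrieving Opportunity entities."""

    @abstractmethod
    def get_open_for_customer(self, customer_id: str) -> Sequence[Opportunity]:
        """Return all open opportunities for a given customer."""
        ...


class InMemoryOpportunityRepository(OpportunityRepository):
    """Hypothetical adapter backed by a plain list, intended for tests."""

    def __init__(self, opportunities: Iterable[Opportunity]) -> None:
        self._opportunities: List[Opportunity] = list(opportunities)

    def get_open_for_customer(self, customer_id: str) -> Sequence[Opportunity]:
        # Keep only this customer's opportunities whose stage is still open.
        return [
            opp
            for opp in self._opportunities
            if opp.customer_id == customer_id and opp.stage.is_open
        ]


repo = InMemoryOpportunityRepository(
    [
        Opportunity("opp-1", "cust-1", SalesStage.PROPOSAL),
        Opportunity("opp-2", "cust-1", SalesStage.CLOSED_WON),
        Opportunity("opp-3", "cust-2", SalesStage.PROPOSAL),
    ]
)
print([opp.opp_id for opp in repo.get_open_for_customer("cust-1")])  # ['opp-1']
```

Domain code that depends only on `OpportunityRepository` can be exercised against this adapter without a database, in line with the domain layer's rule of having no infrastructure dependencies.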