Application Use Cases

The application layer orchestrates domain objects to fulfil business use cases. It has no knowledge of FastAPI, DuckDB, or pickle files — those are injected via the repository and model ports.

Predict Churn

src.application.use_cases.predict_churn

PredictChurnUseCase – application layer use case.

Orchestrates domain objects to produce a churn prediction. This layer has no knowledge of FastAPI, DuckDB, or pickle files.

Classes

PredictChurnRequest dataclass

Input DTO for the PredictChurnUseCase.

Source code in src/application/use_cases/predict_churn.py
@dataclass
class PredictChurnRequest:
    """Input DTO for the PredictChurnUseCase."""

    customer_id: str
    lookback_days: int = field(default=30)
PredictChurnUseCase

Coordinates retrieval, feature engineering, and prediction for one customer.

Parameters:

- customer_repo (CustomerRepository, required): Repository for fetching Customer entities.
- usage_repo (UsageRepository, required): Repository for fetching UsageEvent sequences (retained for future use; feature extraction now happens inside the extractor).
- churn_service (ChurnModelService, required): Domain service that runs the churn model.
- risk_service (RiskModelService, required): Domain service that computes the composite risk score.
- risk_signals_repo (RiskSignalsRepository | None, default None): Optional repository for real risk signal data. Falls back to zeroed signals when not provided, which preserves backward compatibility for unit tests.
Source code in src/application/use_cases/predict_churn.py
class PredictChurnUseCase:
    """Coordinates retrieval, feature engineering, and prediction for one customer.

    Args:
        customer_repo: Repository for fetching Customer entities.
        usage_repo: Repository for fetching UsageEvent sequences (retained for
                    future use; feature extraction now happens inside the extractor).
        churn_service: Domain service that runs the churn model.
        risk_service: Domain service that computes the composite risk score.
        risk_signals_repo: Optional repository for real risk signal data.
                           Falls back to zeroed signals when not provided,
                           which preserves backward compatibility for unit tests.
    """

    def __init__(
        self,
        customer_repo: CustomerRepository,
        usage_repo: UsageRepository,
        churn_service: ChurnModelService,
        risk_service: RiskModelService,
        risk_signals_repo: RiskSignalsRepository | None = None,
    ) -> None:
        self._customer_repo = customer_repo
        self._usage_repo = usage_repo
        self._churn_service = churn_service
        self._risk_service = risk_service
        self._risk_signals_repo = risk_signals_repo

    def execute(self, request: PredictChurnRequest) -> PredictionResult:
        """Run the end-to-end churn prediction pipeline for a single customer.

        Business Context: Fetches customer state, resolves real risk signals
        from raw.risk_signals (compliance gaps + vendor flags + usage decay),
        and delegates to ChurnModelService which queries the dbt feature mart
        for all ML features. One DB read per layer, < 5ms total.

        Args:
            request: Contains customer_id and optional lookback window.

        Returns:
            PredictionResult with calibrated churn probability, composite risk
            score, top-5 SHAP feature drivers, and a recommended CS action.

        Raises:
            ValueError: If the customer is not found or has already churned.
        """
        customer = self._customer_repo.get_by_id(request.customer_id)
        if customer is None:
            raise ValueError(f"Customer {request.customer_id} not found.")
        if not customer.is_active:
            raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

        # Resolve real risk signals when the infrastructure repo is available
        if self._risk_signals_repo is not None:
            risk_signals = self._risk_signals_repo.get_signals(request.customer_id)
        else:
            # Fallback for unit tests that don't wire up real infrastructure
            risk_signals = RiskSignals(
                compliance_gap_score=0.0,
                vendor_risk_flags=0,
                usage_decay_score=0.0,
            )

        risk_score = self._risk_service.compute(risk_signals)
        return self._churn_service.predict(customer, risk_score)
Functions
execute
execute(request: PredictChurnRequest) -> PredictionResult

Run the end-to-end churn prediction pipeline for a single customer.

Business Context: Fetches customer state, resolves real risk signals from raw.risk_signals (compliance gaps + vendor flags + usage decay), and delegates to ChurnModelService which queries the dbt feature mart for all ML features. One DB read per layer, < 5ms total.

Parameters:

- request (PredictChurnRequest, required): Contains customer_id and optional lookback window.

Returns:

- PredictionResult: Calibrated churn probability, composite risk score, top-5 SHAP feature drivers, and a recommended CS action.

Raises:

- ValueError: If the customer is not found or has already churned.

Source code in src/application/use_cases/predict_churn.py
def execute(self, request: PredictChurnRequest) -> PredictionResult:
    """Run the end-to-end churn prediction pipeline for a single customer.

    Business Context: Fetches customer state, resolves real risk signals
    from raw.risk_signals (compliance gaps + vendor flags + usage decay),
    and delegates to ChurnModelService which queries the dbt feature mart
    for all ML features. One DB read per layer, < 5ms total.

    Args:
        request: Contains customer_id and optional lookback window.

    Returns:
        PredictionResult with calibrated churn probability, composite risk
        score, top-5 SHAP feature drivers, and a recommended CS action.

    Raises:
        ValueError: If the customer is not found or has already churned.
    """
    customer = self._customer_repo.get_by_id(request.customer_id)
    if customer is None:
        raise ValueError(f"Customer {request.customer_id} not found.")
    if not customer.is_active:
        raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

    # Resolve real risk signals when the infrastructure repo is available
    if self._risk_signals_repo is not None:
        risk_signals = self._risk_signals_repo.get_signals(request.customer_id)
    else:
        # Fallback for unit tests that don't wire up real infrastructure
        risk_signals = RiskSignals(
            compliance_gap_score=0.0,
            vendor_risk_flags=0,
            usage_decay_score=0.0,
        )

    risk_score = self._risk_service.compute(risk_signals)
    return self._churn_service.predict(customer, risk_score)
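To see how the pieces wire together, here is a hedged, self-contained sketch: every collaborator below (Customer, RiskSignals, the repository and services) is a minimal hypothetical stand-in for the real domain types under src/domain, and the use case is a trimmed copy of the source above (usage_repo and risk_signals_repo omitted for brevity; the zeroed-signals fallback is kept).

```python
from __future__ import annotations

from dataclasses import dataclass, field


# Hypothetical minimal stand-ins; the real versions live under src/domain.
@dataclass
class Customer:
    customer_id: str
    is_active: bool = True
    churn_date: str | None = None


@dataclass
class RiskSignals:
    compliance_gap_score: float = 0.0
    vendor_risk_flags: int = 0
    usage_decay_score: float = 0.0


@dataclass
class PredictChurnRequest:
    customer_id: str
    lookback_days: int = field(default=30)


class InMemoryCustomerRepo:
    """Test double for CustomerRepository."""

    def __init__(self, customers: list[Customer]) -> None:
        self._by_id = {c.customer_id: c for c in customers}

    def get_by_id(self, customer_id: str) -> Customer | None:
        return self._by_id.get(customer_id)


class StubRiskService:
    """Stand-in for RiskModelService: returns a fixed scalar score."""

    def compute(self, signals: RiskSignals) -> float:
        return 0.5


class StubChurnService:
    """Stand-in for ChurnModelService: returns a dict instead of PredictionResult."""

    def predict(self, customer: Customer, risk_score: float) -> dict:
        return {"customer_id": customer.customer_id, "risk_score": risk_score}


class PredictChurnUseCase:
    """Trimmed copy of the use case above."""

    def __init__(self, customer_repo, churn_service, risk_service) -> None:
        self._customer_repo = customer_repo
        self._churn_service = churn_service
        self._risk_service = risk_service

    def execute(self, request: PredictChurnRequest) -> dict:
        customer = self._customer_repo.get_by_id(request.customer_id)
        if customer is None:
            raise ValueError(f"Customer {request.customer_id} not found.")
        if not customer.is_active:
            raise ValueError(f"Customer {request.customer_id} has already churned.")
        risk_signals = RiskSignals()  # zeroed fallback, as in the source
        risk_score = self._risk_service.compute(risk_signals)
        return self._churn_service.predict(customer, risk_score)


use_case = PredictChurnUseCase(
    InMemoryCustomerRepo([Customer("c-1")]),
    StubChurnService(),
    StubRiskService(),
)
result = use_case.execute(PredictChurnRequest(customer_id="c-1"))
```

In production wiring, the real repositories and model services are injected at the composition root; the stubs here exist only to exercise the guard clauses and the fallback path.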

Compute Risk Score

src.application.use_cases.compute_risk_score

ComputeRiskScoreUseCase – application layer use case.

Classes

ComputeRiskScoreRequest dataclass

Input DTO for the ComputeRiskScoreUseCase.

Source code in src/application/use_cases/compute_risk_score.py
@dataclass
class ComputeRiskScoreRequest:
    """Input DTO for the ComputeRiskScoreUseCase."""

    customer_id: str
    compliance_gap_score: float
    vendor_risk_flags: int
    usage_decay_score: float
ComputeRiskScoreUseCase

Computes a composite risk score from pre-fetched signal values.

Parameters:

- risk_service (RiskModelService, required): Domain service that applies the weighting formula.
Source code in src/application/use_cases/compute_risk_score.py
class ComputeRiskScoreUseCase:
    """Computes a composite risk score from pre-fetched signal values.

    Args:
        risk_service: Domain service that applies the weighting formula.
    """

    def __init__(self, risk_service: RiskModelService) -> None:
        self._risk_service = risk_service

    def execute(self, request: ComputeRiskScoreRequest) -> RiskScore:
        """Compute and return a RiskScore for the given signals.

        Args:
            request: Pre-fetched signal values for the customer.

        Returns:
            RiskScore value object in [0, 1] with risk tier.
        """
        signals = RiskSignals(
            compliance_gap_score=request.compliance_gap_score,
            vendor_risk_flags=request.vendor_risk_flags,
            usage_decay_score=request.usage_decay_score,
        )
        return self._risk_service.compute(signals)
Functions
execute
execute(request: ComputeRiskScoreRequest) -> RiskScore

Compute and return a RiskScore for the given signals.

Parameters:

- request (ComputeRiskScoreRequest, required): Pre-fetched signal values for the customer.

Returns:

- RiskScore: Value object in [0, 1] with risk tier.

Source code in src/application/use_cases/compute_risk_score.py
def execute(self, request: ComputeRiskScoreRequest) -> RiskScore:
    """Compute and return a RiskScore for the given signals.

    Args:
        request: Pre-fetched signal values for the customer.

    Returns:
        RiskScore value object in [0, 1] with risk tier.
    """
    signals = RiskSignals(
        compliance_gap_score=request.compliance_gap_score,
        vendor_risk_flags=request.vendor_risk_flags,
        usage_decay_score=request.usage_decay_score,
    )
    return self._risk_service.compute(signals)
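The weighting formula itself lives in the domain layer and is not shown in this section. The sketch below therefore assumes a simple weighted sum with illustrative weights and tier cut-offs, purely to make the RiskSignals-to-RiskScore flow concrete; the names RiskSignals, RiskScore, and RiskModelService match the source, but the numbers are invented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RiskSignals:
    compliance_gap_score: float  # expected in [0, 1]
    vendor_risk_flags: int       # count of open vendor flags
    usage_decay_score: float     # expected in [0, 1]


@dataclass(frozen=True)
class RiskScore:
    value: float  # composite score in [0, 1]
    tier: str     # LOW | MEDIUM | HIGH | CRITICAL


class RiskModelService:
    """Hypothetical weighting; the real formula is defined in the domain layer."""

    WEIGHTS = (0.4, 0.2, 0.4)  # compliance, vendor, usage decay (assumed)

    def compute(self, signals: RiskSignals) -> RiskScore:
        w_c, w_v, w_u = self.WEIGHTS
        # Cap the flag count so the vendor component also lands in [0, 1].
        vendor_component = min(signals.vendor_risk_flags / 5.0, 1.0)
        value = (
            w_c * signals.compliance_gap_score
            + w_v * vendor_component
            + w_u * signals.usage_decay_score
        )
        value = max(0.0, min(1.0, value))
        if value >= 0.75:
            tier = "CRITICAL"
        elif value >= 0.5:
            tier = "HIGH"
        elif value >= 0.25:
            tier = "MEDIUM"
        else:
            tier = "LOW"
        return RiskScore(value=value, tier=tier)


score = RiskModelService().compute(
    RiskSignals(compliance_gap_score=0.8, vendor_risk_flags=2, usage_decay_score=0.6)
)
```

Freezing both dataclasses keeps them value objects, mirroring the RiskScore contract documented above (a score in [0, 1] plus a tier).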

Get Customer 360

src.application.use_cases.get_customer_360

GetCustomer360UseCase – assembles a full Customer 360 profile.

Orchestrates the customer domain, prediction domain, and raw DuckDB queries to produce a single rich response for the Customer 360 API endpoint.

Classes

ShapFeatureSummary dataclass

Application-layer DTO for a single SHAP feature contribution.

Parameters:

- feature (str, required): Feature name as returned by the model.
- value (float, required): Raw feature value.
- shap_impact (float, required): Signed SHAP contribution (positive = increases churn risk).
Source code in src/application/use_cases/get_customer_360.py
@dataclass
class ShapFeatureSummary:
    """Application-layer DTO for a single SHAP feature contribution.

    Args:
        feature: Feature name as returned by the model.
        value: Raw feature value.
        shap_impact: Signed SHAP contribution (positive = increases churn risk).
    """

    feature: str
    value: float
    shap_impact: float
Customer360Profile dataclass

Application-layer DTO returned by GetCustomer360UseCase.

This DTO is mapped to Customer360Response by the delivery layer (FastAPI router). Keeping it here ensures the application layer has no dependency on app/schemas/.

Parameters:

- customer_id (str, required): Unique customer identifier.
- plan_tier (str, required): Commercial tier (starter / growth / enterprise).
- industry (str, required): Vertical segment.
- mrr (float, required): Monthly Recurring Revenue in USD.
- tenure_days (int, required): Days since signup.
- churn_probability (float, required): Calibrated P(churn in 90 days), 0–1.
- risk_tier (str, required): LOW | MEDIUM | HIGH | CRITICAL.
- top_shap_features (list[ShapFeatureSummary], default list()): Top SHAP drivers (sorted by |impact|).
- events_last_30d (int, default 0): Product events in the last 30 days.
- open_ticket_count (int, default 0): Currently open support tickets.
- gtm_stage (str | None, default None): Most recent GTM opportunity stage, if any.
- latest_prediction_at (str, default ''): ISO-8601 UTC timestamp of the prediction.
Source code in src/application/use_cases/get_customer_360.py
@dataclass
class Customer360Profile:
    """Application-layer DTO returned by GetCustomer360UseCase.

    This DTO is mapped to Customer360Response by the delivery layer (FastAPI router).
    Keeping it here ensures the application layer has no dependency on app/schemas/.

    Args:
        customer_id: Unique customer identifier.
        plan_tier: Commercial tier (starter / growth / enterprise).
        industry: Vertical segment.
        mrr: Monthly Recurring Revenue in USD.
        tenure_days: Days since signup.
        churn_probability: Calibrated P(churn in 90 days), 0–1.
        risk_tier: LOW | MEDIUM | HIGH | CRITICAL.
        top_shap_features: Top SHAP drivers (sorted by |impact|).
        events_last_30d: Product events in the last 30 days.
        open_ticket_count: Currently open support tickets.
        gtm_stage: Most recent GTM opportunity stage, if any.
        latest_prediction_at: ISO-8601 UTC timestamp of the prediction.
    """

    customer_id: str
    plan_tier: str
    industry: str
    mrr: float
    tenure_days: int
    churn_probability: float
    risk_tier: str
    top_shap_features: list[ShapFeatureSummary] = field(default_factory=list)
    events_last_30d: int = 0
    open_ticket_count: int = 0
    gtm_stage: str | None = None
    latest_prediction_at: str = ""
GetCustomer360Request dataclass

Input DTO for GetCustomer360UseCase.

Parameters:

- customer_id (str, required): UUID of the customer to profile.
Source code in src/application/use_cases/get_customer_360.py
@dataclass
class GetCustomer360Request:
    """Input DTO for GetCustomer360UseCase.

    Args:
        customer_id: UUID of the customer to profile.
    """

    customer_id: str
GetCustomer360UseCase

Assembles a Customer 360 profile from multiple domain sources.

Business Context: CS teams spend 10–15 min per customer pulling data from 3+ tools before each call. This use case collapses that into a single <50ms API response — enabling same-day at-risk customer triage.

Parameters:

- customer_repo (CustomerRepository, required): Reads customer master data.
- predict_use_case (PredictChurnUseCase, required): Reuses the existing churn prediction pipeline.
Source code in src/application/use_cases/get_customer_360.py
class GetCustomer360UseCase:
    """Assembles a Customer 360 profile from multiple domain sources.

    Business Context: CS teams spend 10–15 min per customer pulling data from
    3+ tools before each call. This use case collapses that into a single <50ms
    API response — enabling same-day at-risk customer triage.

    Args:
        customer_repo: Reads customer master data.
        predict_use_case: Reuses the existing churn prediction pipeline.
    """

    def __init__(
        self,
        customer_repo: CustomerRepository,
        predict_use_case: PredictChurnUseCase,
    ) -> None:
        self._customer_repo = customer_repo
        self._predict_use_case = predict_use_case

    def execute(self, request: GetCustomer360Request) -> Customer360Profile:
        """Build and return the Customer 360 profile.

        Business Context: Single entrypoint for all customer health data —
        churn score, feature drivers, usage velocity, support load, and
        GTM pipeline stage.

        Args:
            request: Contains customer_id.

        Returns:
            Customer360Profile DTO with all fields populated.

        Raises:
            ValueError: If the customer is not found.
        """
        customer = self._customer_repo.get_by_id(request.customer_id)
        if customer is None:
            raise ValueError(f"Customer {request.customer_id} not found.")

        prediction = self._predict_use_case.execute(PredictChurnRequest(customer_id=request.customer_id))

        events_last_30d, open_ticket_count, gtm_stage = self._query_supplemental(request.customer_id)

        shap_features = [
            ShapFeatureSummary(
                feature=f.feature_name,
                value=f.feature_value,
                shap_impact=f.shap_impact,
            )
            for f in prediction.top_shap_features
        ]

        return Customer360Profile(
            customer_id=customer.customer_id,
            plan_tier=customer.plan_tier.value,
            industry=customer.industry.value,
            mrr=float(customer.mrr.amount),
            tenure_days=customer.tenure_days,
            churn_probability=prediction.churn_probability.value,
            risk_tier=prediction.risk_score.tier,
            top_shap_features=shap_features,
            events_last_30d=events_last_30d,
            open_ticket_count=open_ticket_count,
            gtm_stage=gtm_stage,
            latest_prediction_at=prediction.predicted_at.isoformat(),
        )

    def _query_supplemental(self, customer_id: str) -> tuple[int, int, str | None]:
        """Fetch events_last_30d, open_ticket_count, and GTM stage from DuckDB.

        Business Context: These signals are leading indicators of churn risk —
        low event velocity + high ticket volume = imminent churn pattern.

        Args:
            customer_id: The customer to query supplemental data for.

        Returns:
            Tuple of (events_last_30d, open_ticket_count, gtm_stage).
        """
        try:
            with get_connection() as conn:
                event_row = conn.execute(
                    """
                    SELECT COUNT(*) AS cnt
                    FROM raw.usage_events
                    WHERE customer_id = ?
                      AND timestamp >= CURRENT_DATE - INTERVAL 30 DAY
                    """,
                    [customer_id],
                ).fetchone()

                ticket_row = conn.execute(
                    """
                    SELECT COUNT(*) AS cnt
                    FROM raw.support_tickets
                    WHERE customer_id = ?
                      AND resolution_time IS NULL
                    """,
                    [customer_id],
                ).fetchone()

                gtm_row = conn.execute(
                    """
                    SELECT stage
                    FROM raw.gtm_opportunities
                    WHERE customer_id = ?
                    ORDER BY close_date DESC
                    LIMIT 1
                    """,
                    [customer_id],
                ).fetchone()

            events_last_30d = int(event_row[0]) if event_row else 0
            open_ticket_count = int(ticket_row[0]) if ticket_row else 0
            gtm_stage = str(gtm_row[0]) if gtm_row else None
        except Exception:
            # Graceful degradation — return zeros if DuckDB unavailable
            events_last_30d = 0
            open_ticket_count = 0
            gtm_stage = None

        return events_last_30d, open_ticket_count, gtm_stage
Functions
execute
execute(request: GetCustomer360Request) -> Customer360Profile

Build and return the Customer 360 profile.

Business Context: Single entrypoint for all customer health data — churn score, feature drivers, usage velocity, support load, and GTM pipeline stage.

Parameters:

- request (GetCustomer360Request, required): Contains customer_id.

Returns:

- Customer360Profile: DTO with all fields populated.

Raises:

- ValueError: If the customer is not found.

Source code in src/application/use_cases/get_customer_360.py
def execute(self, request: GetCustomer360Request) -> Customer360Profile:
    """Build and return the Customer 360 profile.

    Business Context: Single entrypoint for all customer health data —
    churn score, feature drivers, usage velocity, support load, and
    GTM pipeline stage.

    Args:
        request: Contains customer_id.

    Returns:
        Customer360Profile DTO with all fields populated.

    Raises:
        ValueError: If the customer is not found.
    """
    customer = self._customer_repo.get_by_id(request.customer_id)
    if customer is None:
        raise ValueError(f"Customer {request.customer_id} not found.")

    prediction = self._predict_use_case.execute(PredictChurnRequest(customer_id=request.customer_id))

    events_last_30d, open_ticket_count, gtm_stage = self._query_supplemental(request.customer_id)

    shap_features = [
        ShapFeatureSummary(
            feature=f.feature_name,
            value=f.feature_value,
            shap_impact=f.shap_impact,
        )
        for f in prediction.top_shap_features
    ]

    return Customer360Profile(
        customer_id=customer.customer_id,
        plan_tier=customer.plan_tier.value,
        industry=customer.industry.value,
        mrr=float(customer.mrr.amount),
        tenure_days=customer.tenure_days,
        churn_probability=prediction.churn_probability.value,
        risk_tier=prediction.risk_score.tier,
        top_shap_features=shap_features,
        events_last_30d=events_last_30d,
        open_ticket_count=open_ticket_count,
        gtm_stage=gtm_stage,
        latest_prediction_at=prediction.predicted_at.isoformat(),
    )

Functions

Generate Expansion Summary

src.application.use_cases.generate_expansion_summary

GenerateExpansionSummaryUseCase — orchestrates the expansion narrative pipeline.

Translates a high-propensity ExpansionResult into an AE tactical brief and optional email draft, validated by ExpansionGuardrailsService before returning.

Classes

PropensityTooLowError

Bases: ValueError

Raised when propensity is below the API-layer minimum threshold (0.15).

Business Context: Accounts with propensity < 0.15 are not expansion candidates. Calling the LLM for these accounts wastes tokens and produces misleading briefs. The API layer maps this to HTTP 422 so callers know the account is not ready.

Source code in src/application/use_cases/generate_expansion_summary.py
class PropensityTooLowError(ValueError):
    """Raised when propensity is below the API-layer minimum threshold (0.15).

    Business Context: Accounts with propensity < 0.15 are not expansion candidates.
    Calling the LLM for these accounts wastes tokens and produces misleading briefs.
    The API layer maps this to HTTP 422 so callers know the account is not ready.
    """
GenerateExpansionSummaryRequest dataclass

Input DTO for GenerateExpansionSummaryUseCase.

Parameters:

- customer_id (str, required): UUID of the active customer to generate a brief for.
- audience (Literal['account_executive', 'csm'], default 'account_executive'): 'account_executive' (tactical brief + optional email) or 'csm' (nurture brief only; email_draft forced to None).
- include_email_draft (bool, default False): If True and audience is 'account_executive', the response includes a 3-sentence email draft.
Source code in src/application/use_cases/generate_expansion_summary.py
@dataclass
class GenerateExpansionSummaryRequest:
    """Input DTO for GenerateExpansionSummaryUseCase.

    Args:
        customer_id: UUID of the active customer to generate a brief for.
        audience: 'account_executive' (tactical brief + optional email) or
                  'csm' (nurture brief only; email_draft forced to None).
        include_email_draft: If True and audience is 'account_executive',
                             the response will include a 3-sentence email draft.
    """

    customer_id: str
    audience: Literal["account_executive", "csm"] = field(default="account_executive")
    include_email_draft: bool = False
GenerateExpansionSummaryUseCase

Generates a personalised AE brief grounded in the expansion propensity model.

Business Context: Reduces AE prep time from ~20 minutes to 30 seconds. Personalisation via SHAP signals drives 10–15% conversion lift vs generic outreach. The correlation_id in each result enables the data team to join brief quality (fact_confidence) to close rates in the V2 fine-tuning flywheel.

Pipeline:
  1. Fetch Customer entity (raises ValueError if not found / churned)
  2. Run PredictExpansionUseCase → ExpansionResult
  3. Propensity < 0.15 → raise PropensityTooLowError (API → 422)
  4. Propensity < 0.35 → return "not ready" result without LLM call
  5. CSM audience override: force include_email_draft=False
  6. Build expansion prompt via PromptBuilder
  7. Call LLM via SummaryPort.generate_from_prompt()
  8. Validate + transform via ExpansionGuardrailsService
  9. Return ExpansionSummaryResult
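Steps 3 and 4 apply two distinct thresholds. A minimal sketch of that gate logic follows; the function name propensity_gate and the constant spellings are illustrative (the real module constants are _MIN_PROPENSITY_FOR_API and _MIN_PROPENSITY_FOR_LLM), but the 0.15 and 0.35 cut-offs are the ones documented here.

```python
# Thresholds as documented in the pipeline above.
MIN_PROPENSITY_FOR_API = 0.15
MIN_PROPENSITY_FOR_LLM = 0.35


class PropensityTooLowError(ValueError):
    """Mirror of the error above; the API layer maps it to HTTP 422."""


def propensity_gate(propensity: float) -> str:
    """Return which pipeline branch a propensity value takes (steps 3-4)."""
    if propensity < MIN_PROPENSITY_FOR_API:
        # Step 3: not an expansion candidate at all, reject before any LLM work.
        raise PropensityTooLowError(
            f"Propensity {propensity:.2f} is below minimum threshold "
            f"{MIN_PROPENSITY_FOR_API} for expansion brief generation."
        )
    if propensity < MIN_PROPENSITY_FOR_LLM:
        # Step 4: candidate, but not ready; respond without calling the LLM.
        return "not_ready"
    # Steps 6-9: build the prompt, call the LLM, run guardrails.
    return "generate_brief"
```

Keeping the cheap gates ahead of the LLM call is what makes the token-cost argument in PropensityTooLowError's docstring hold: low-propensity accounts never reach the model.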

Parameters:

- customer_repo (CustomerRepository, required): Repository for fetching Customer entities.
- expansion_use_case (PredictExpansionUseCase, required): Provides propensity + SHAP features.
- summary_service (SummaryPort, required): SummaryPort implementation (Groq or Ollama).
- guardrails (ExpansionGuardrailsService, required): Validation + watermark service.
Source code in src/application/use_cases/generate_expansion_summary.py
class GenerateExpansionSummaryUseCase:
    """Generates a personalised AE brief grounded in the expansion propensity model.

    Business Context: Reduces AE prep time from ~20 minutes to 30 seconds.
    Personalisation via SHAP signals drives 10–15% conversion lift vs generic
    outreach. The correlation_id in each result enables the data team to join
    brief quality (fact_confidence) to close rates in the V2 fine-tuning flywheel.

    Pipeline:
      1. Fetch Customer entity (raises ValueError if not found / churned)
      2. Run PredictExpansionUseCase → ExpansionResult
      3. Propensity < 0.15 → raise PropensityTooLowError (API → 422)
      4. Propensity < 0.35 → return "not ready" result without LLM call
      5. CSM audience override: force include_email_draft=False
      6. Build expansion prompt via PromptBuilder
      7. Call LLM via SummaryPort.generate_from_prompt()
      8. Validate + transform via ExpansionGuardrailsService
      9. Return ExpansionSummaryResult

    Args:
        customer_repo: Repository for fetching Customer entities.
        expansion_use_case: PredictExpansionUseCase for propensity + SHAP.
        summary_service: SummaryPort implementation (Groq or Ollama).
        guardrails: ExpansionGuardrailsService for validation + watermark.
    """

    def __init__(
        self,
        customer_repo: CustomerRepository,
        expansion_use_case: PredictExpansionUseCase,
        summary_service: SummaryPort,
        guardrails: ExpansionGuardrailsService,
    ) -> None:
        self._customer_repo = customer_repo
        self._expansion_use_case = expansion_use_case
        self._summary_service = summary_service
        self._guardrails = guardrails
        self._prompt_builder = PromptBuilder()

    def execute(self, request: GenerateExpansionSummaryRequest) -> ExpansionSummaryResult:
        """Run the full expansion narrative pipeline for a single customer.

        Business Context: All LLM calls are grounded in verified model outputs
        (ExpansionResult SHAP features). The guardrail layer ensures hallucinated
        signals are caught before the brief reaches an AE's CRM.

        Args:
            request: Contains customer_id, audience, and email draft flag.

        Returns:
            ExpansionSummaryResult with brief, guardrail result, and provenance.

        Raises:
            ValueError: If the customer is not found or has already churned.
            PropensityTooLowError: If propensity < 0.15 (API maps this to 422).
        """
        log = logger.bind(
            customer_id=request.customer_id,
            audience=request.audience,
        )
        log.info("expansion_summary.generate.start")

        # Step 1 — fetch customer
        customer = self._customer_repo.get_by_id(request.customer_id)
        if customer is None:
            raise ValueError(f"Customer {request.customer_id} not found.")
        if not customer.is_active:
            raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

        # Step 2 — run expansion prediction
        expansion_result = self._expansion_use_case.execute(PredictExpansionRequest(customer_id=request.customer_id))
        propensity = expansion_result.propensity.value

        # Step 3 — API-layer propensity gate
        if propensity < _MIN_PROPENSITY_FOR_API:
            raise PropensityTooLowError(
                f"Propensity {propensity:.2f} is below minimum threshold "
                f"{_MIN_PROPENSITY_FOR_API} for expansion brief generation."
            )

        correlation_id = uuid.uuid4().hex

        # Step 4 — LLM propensity gate: return "not ready" without calling the LLM
        if propensity < _MIN_PROPENSITY_FOR_LLM:
            log.info(
                "expansion_summary.not_ready",
                propensity=propensity,
                threshold=_MIN_PROPENSITY_FOR_LLM,
            )
            return self._not_ready_result(expansion_result, correlation_id)

        # Step 5 — CSM audience override: suppress email draft
        include_draft = request.include_email_draft
        if request.audience == "csm":
            if include_draft:
                log.warning(
                    "expansion_summary.csm_email_suppressed",
                    hint="email_draft is not available for csm audience",
                )
            include_draft = False

        # Step 6 — build expansion prompt
        prompt = self._prompt_builder.build_expansion_prompt(
            expansion_result=expansion_result,
            audience=request.audience,
            include_email_draft=include_draft,
        )

        # Step 7 — call LLM
        raw_text = self._summary_service.generate_from_prompt(prompt)
        log.info("expansion_summary.llm.response_received", length=len(raw_text))

        # Step 8 — parse email draft from LLM output (if requested)
        ae_brief_raw, email_draft_raw = self._split_llm_output(raw_text, include_draft)

        # Step 9 — validate + transform via guardrails
        guardrail_result = self._guardrails.validate(
            ae_tactical_brief=ae_brief_raw,
            email_draft=email_draft_raw,
            expansion_result=expansion_result,
            propensity=propensity,
        )
        if guardrail_result.guardrail_status != "PASSED":
            log.warning(
                "expansion_summary.guardrail.flags",
                status=guardrail_result.guardrail_status,
                flags=guardrail_result.flags,
                confidence=guardrail_result.fact_confidence,
            )

        return self._build_result(
            request=request,
            expansion_result=expansion_result,
            guardrail_result=guardrail_result,
            correlation_id=correlation_id,
        )

    # ── helpers ───────────────────────────────────────────────────────────────

    def _not_ready_result(self, expansion_result: ExpansionResult, correlation_id: str) -> ExpansionSummaryResult:
        """Return a 'not ready' result without invoking the LLM."""
        propensity = expansion_result.propensity.value
        tier = str(expansion_result.propensity.tier.value)
        target = expansion_result.target.next_tier.value if expansion_result.target.next_tier else "N/A"
        return ExpansionSummaryResult(
            customer_id=expansion_result.customer_id,
            propensity_summary=(
                f"Account not ready for outreach — propensity {propensity:.0%} "
                f"is below the {_MIN_PROPENSITY_FOR_LLM:.0%} threshold."
            ),
            key_narrative_drivers=[],
            ae_tactical_brief=(
                f"Account not ready for outreach. "
                f"Propensity {propensity:.0%} ({tier}) is below the activation "
                f"threshold. Monitor for rising usage signals before scheduling "
                f"an upgrade conversation toward {target}.\n\n{WATERMARK}"
            ),
            email_draft=None,
            guardrail_status="PASSED",
            fact_confidence=1.0,
            generated_at=datetime.now(UTC),
            model_used=self._summary_service.model_name,
            llm_provider=self._summary_service.provider_name,
            propensity_score=propensity,
            propensity_tier=tier,
            target_tier=target if target != "N/A" else None,
            expected_arr_uplift=expansion_result.expected_arr_uplift,
            correlation_id=correlation_id,
        )

    def _split_llm_output(self, raw_text: str, include_draft: bool) -> tuple[str, str | None]:
        """Split LLM output into ae_brief and optional email_draft.

        The prompt instructs the LLM to introduce the email section with an
        [EMAIL_DRAFT] marker when include_email_draft=True. If the marker is
        absent, email_draft is None.
        """
        if not include_draft or "[EMAIL_DRAFT]" not in raw_text:
            return raw_text.strip(), None

        # The marker is guaranteed present here (checked above), so a split
        # with maxsplit=1 always yields exactly two parts.
        ae_brief, email_draft = raw_text.split("[EMAIL_DRAFT]", maxsplit=1)
        return ae_brief.strip(), email_draft.strip()

    def _build_result(
        self,
        request: GenerateExpansionSummaryRequest,
        expansion_result: ExpansionResult,
        guardrail_result: ExpansionGuardrailResult,
        correlation_id: str,
    ) -> ExpansionSummaryResult:
        """Assemble the final ExpansionSummaryResult from pipeline outputs."""
        propensity = expansion_result.propensity.value
        tier = str(expansion_result.propensity.tier.value)
        target = expansion_result.target.next_tier.value if expansion_result.target.next_tier else None

        key_drivers = [_FEATURE_LABELS.get(f.feature_name, f.feature_name) for f in expansion_result.top_features[:3]]

        propensity_summary = (
            f"This account has {tier.upper()} expansion propensity "
            f"({propensity:.0%}) toward {target or 'the next tier'}. "
            f"Expected ARR uplift: ${expansion_result.expected_arr_uplift:,.0f}."
        )

        # CSM audience never includes an email draft — enforce at result level.
        email_draft = guardrail_result.email_draft if request.audience == "account_executive" else None

        return ExpansionSummaryResult(
            customer_id=request.customer_id,
            propensity_summary=propensity_summary,
            key_narrative_drivers=key_drivers,
            ae_tactical_brief=guardrail_result.ae_tactical_brief,
            email_draft=email_draft,
            guardrail_status=guardrail_result.guardrail_status,
            fact_confidence=guardrail_result.fact_confidence,
            generated_at=datetime.now(UTC),
            model_used=self._summary_service.model_name,
            llm_provider=self._summary_service.provider_name,
            propensity_score=propensity,
            propensity_tier=tier,
            target_tier=target,
            expected_arr_uplift=expansion_result.expected_arr_uplift,
            correlation_id=correlation_id,
        )
Functions
execute
execute(request: GenerateExpansionSummaryRequest) -> ExpansionSummaryResult

Run the full expansion narrative pipeline for a single customer.

Business Context: All LLM calls are grounded in verified model outputs (ExpansionResult SHAP features). The guardrail layer ensures hallucinated signals are caught before the brief reaches an AE's CRM.
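The grounding idea can be illustrated with a toy fact check: every numeric claim in the generated brief is compared against the verified model output. This is a deliberately minimal sketch of the concept, not the real guardrail implementation (which lives in the guardrails service and also checks SHAP-derived drivers):

```python
import re

def check_propensity_claims(brief: str, propensity: float) -> list[str]:
    """Flag any whole-number percentage in the brief that doesn't match
    the verified propensity score (toy illustration of fact grounding)."""
    verified = f"{propensity:.0%}"  # e.g. 0.72 -> "72%"
    flags = []
    for claim in re.findall(r"\d+%", brief):
        if claim != verified:
            flags.append(f"unverified percentage: {claim}")
    return flags

flags = check_propensity_claims("Propensity is 85% and climbing.", propensity=0.72)
# → ["unverified percentage: 85%"]
```

A real guardrail would also normalize rounding and verify dollar figures against `expected_arr_uplift`, but the shape is the same: extracted claims either match a verified fact or lower `fact_confidence`.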

Parameters:

Name Type Description Default
request GenerateExpansionSummaryRequest

Contains customer_id, audience, and email draft flag.

required

Returns:

Type Description
ExpansionSummaryResult

ExpansionSummaryResult with brief, guardrail result, and provenance.

Raises:

Type Description
ValueError

If the customer is not found or has already churned.

PropensityTooLowError

If propensity < 0.15 (API maps this to 422).
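Since this layer has no knowledge of FastAPI, the 422 mapping happens in an adapter. A minimal sketch of that mapping, with `PropensityTooLowError` re-declared locally for self-containment (the real class lives in the module); only the 422 case is documented above, the 404 and 500 choices are assumptions:

```python
class PropensityTooLowError(ValueError):
    """Re-declared for this sketch; raised when propensity < 0.15."""

def to_http_status(exc: Exception) -> int:
    """Hypothetical adapter-layer mapping from domain errors to status codes."""
    if isinstance(exc, PropensityTooLowError):
        return 422  # documented mapping for the API propensity gate
    if isinstance(exc, ValueError):
        return 404  # assumption: "not found" / "already churned" cases
    return 500      # assumption: anything else is a server error

status = to_http_status(PropensityTooLowError("0.10 below threshold"))
# → 422
```

The subclass check must precede the plain `ValueError` check, otherwise every gate failure would be swallowed by the broader case.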

Source code in src/application/use_cases/generate_expansion_summary.py
def execute(self, request: GenerateExpansionSummaryRequest) -> ExpansionSummaryResult:
    """Run the full expansion narrative pipeline for a single customer.

    Business Context: All LLM calls are grounded in verified model outputs
    (ExpansionResult SHAP features). The guardrail layer ensures hallucinated
    signals are caught before the brief reaches an AE's CRM.

    Args:
        request: Contains customer_id, audience, and email draft flag.

    Returns:
        ExpansionSummaryResult with brief, guardrail result, and provenance.

    Raises:
        ValueError: If the customer is not found or has already churned.
        PropensityTooLowError: If propensity < 0.15 (API maps this to 422).
    """
    log = logger.bind(
        customer_id=request.customer_id,
        audience=request.audience,
    )
    log.info("expansion_summary.generate.start")

    # Step 1 — fetch customer
    customer = self._customer_repo.get_by_id(request.customer_id)
    if customer is None:
        raise ValueError(f"Customer {request.customer_id} not found.")
    if not customer.is_active:
        raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

    # Step 2 — run expansion prediction
    expansion_result = self._expansion_use_case.execute(PredictExpansionRequest(customer_id=request.customer_id))
    propensity = expansion_result.propensity.value

    # Step 3 — API-layer propensity gate
    if propensity < _MIN_PROPENSITY_FOR_API:
        raise PropensityTooLowError(
            f"Propensity {propensity:.2f} is below minimum threshold "
            f"{_MIN_PROPENSITY_FOR_API} for expansion brief generation."
        )

    correlation_id = uuid.uuid4().hex

    # Step 4 — LLM propensity gate: return "not ready" without calling the LLM
    if propensity < _MIN_PROPENSITY_FOR_LLM:
        log.info(
            "expansion_summary.not_ready",
            propensity=propensity,
            threshold=_MIN_PROPENSITY_FOR_LLM,
        )
        return self._not_ready_result(expansion_result, correlation_id)

    # Step 5 — CSM audience override: suppress email draft
    include_draft = request.include_email_draft
    if request.audience == "csm":
        if include_draft:
            log.warning(
                "expansion_summary.csm_email_suppressed",
                hint="email_draft is not available for csm audience",
            )
        include_draft = False

    # Step 6 — build expansion prompt
    prompt = self._prompt_builder.build_expansion_prompt(
        expansion_result=expansion_result,
        audience=request.audience,
        include_email_draft=include_draft,
    )

    # Step 7 — call LLM
    raw_text = self._summary_service.generate_from_prompt(prompt)
    log.info("expansion_summary.llm.response_received", length=len(raw_text))

    # Step 8 — parse email draft from LLM output (if requested)
    ae_brief_raw, email_draft_raw = self._split_llm_output(raw_text, include_draft)

    # Step 9 — validate + transform via guardrails
    guardrail_result = self._guardrails.validate(
        ae_tactical_brief=ae_brief_raw,
        email_draft=email_draft_raw,
        expansion_result=expansion_result,
        propensity=propensity,
    )
    if guardrail_result.guardrail_status != "PASSED":
        log.warning(
            "expansion_summary.guardrail.flags",
            status=guardrail_result.guardrail_status,
            flags=guardrail_result.flags,
            confidence=guardrail_result.fact_confidence,
        )

    return self._build_result(
        request=request,
        expansion_result=expansion_result,
        guardrail_result=guardrail_result,
        correlation_id=correlation_id,
    )
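The two propensity gates in Steps 3–4 can be condensed into a small decision sketch. `_MIN_PROPENSITY_FOR_API` is documented as 0.15; the LLM threshold value below is an assumption, since the real constant lives in the module:

```python
_MIN_PROPENSITY_FOR_API = 0.15  # documented: below this, raise (API maps to 422)
_MIN_PROPENSITY_FOR_LLM = 0.30  # assumed value — real constant is in the module

class PropensityTooLowError(ValueError):
    """Re-declared locally so this sketch is self-contained."""

def gate(propensity: float) -> str:
    """Return which path execute() takes for a given propensity."""
    if propensity < _MIN_PROPENSITY_FOR_API:
        raise PropensityTooLowError(
            f"Propensity {propensity:.2f} below {_MIN_PROPENSITY_FOR_API}"
        )
    if propensity < _MIN_PROPENSITY_FOR_LLM:
        return "not_ready"     # canned summary, no LLM call, no token spend
    return "llm_pipeline"      # prompt -> LLM -> guardrails -> result

path = gate(0.20)
# → "not_ready"
```

The key design point is that the middle band still returns a well-formed `ExpansionSummaryResult` (via `_not_ready_result`) with `guardrail_status="PASSED"` and full confidence, so callers never need to branch on a missing brief.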