
AI Summary Domain

AI/LLM layer — executive summaries and RAG Q&A grounded in DuckDB customer data.

Domain Entities

src.domain.ai_summary.entities

AI Summary domain entities.

Defines the core data structures for the AI/LLM layer:

- SummaryContext: all facts retrieved from DuckDB that ground the LLM prompt
- GuardrailResult: outcome of the hallucination + fact-grounding validation pass
- ExecutiveSummary: the final entity returned by the use case, with full audit trail

Classes

SummaryContext dataclass

All structured facts retrieved from DuckDB that will ground the LLM prompt.

Business Context: This is the "retrieval" step of our RAG strategy. A single B2B customer's full history fits comfortably in Llama-3's 128k context window, so we use context-stuffing rather than a vector database. The LLM is explicitly constrained to only reference facts present here.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer` | `Customer` | The Customer entity with profile and MRR data. | *required* |
| `prediction` | `PredictionResult` | The PredictionResult including churn probability and SHAP features. | *required* |
| `events_last_30d_by_type` | `dict[str, int]` | Count of usage events by type in the last 30 days. | *required* |
| `open_tickets` | `list[dict[str, object]]` | List of open support tickets with priority, topic, and age. | *required* |
| `gtm_opportunity` | `dict[str, object] \| None` | Active GTM opportunity dict (stage, amount) if one exists. | *required* |
| `cohort_churn_rate` | `float` | Churn rate for customers in the same tier + industry cohort. | *required* |
Source code in src/domain/ai_summary/entities.py
@dataclass
class SummaryContext:
    """All structured facts retrieved from DuckDB that will ground the LLM prompt.

    Business Context: This is the "retrieval" step of our RAG strategy.
    A single B2B customer's full history fits comfortably in Llama-3's 128k
    context window, so we use context-stuffing rather than a vector database.
    The LLM is explicitly constrained to only reference facts present here.

    Args:
        customer: The Customer entity with profile and MRR data.
        prediction: The PredictionResult including churn probability and SHAP features.
        events_last_30d_by_type: Count of usage events by type in the last 30 days.
        open_tickets: List of open support tickets with priority, topic, and age.
        gtm_opportunity: Active GTM opportunity dict (stage, amount) if one exists.
        cohort_churn_rate: Churn rate for customers in the same tier + industry cohort.
    """

    customer: Customer
    prediction: PredictionResult
    events_last_30d_by_type: dict[str, int]
    open_tickets: list[dict[str, object]]
    gtm_opportunity: dict[str, object] | None
    cohort_churn_rate: float
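As a sketch of the context-stuffing idea, the retrieved facts can be rendered into a grounding block for the prompt. The helper below is purely illustrative (the project's actual prompt assembly lives in its PromptBuilder); the function name and formatting are assumptions.

```python
def render_context_block(events_by_type: dict[str, int],
                         cohort_churn_rate: float) -> str:
    """Render a subset of SummaryContext facts as a [CONTEXT] block.

    Hypothetical helper for illustration only; the real prompt assembly
    is done elsewhere in the project.
    """
    lines = ["[CONTEXT]"]
    for event_type, count in sorted(events_by_type.items()):
        lines.append(f"- {event_type}: {count} events in last 30 days")
    lines.append(f"- cohort churn rate: {cohort_churn_rate:.1%}")
    return "\n".join(lines)

block = render_context_block({"api_call": 120, "login": 4}, 0.18)
```

Because the whole block is passed verbatim, the LLM can be instructed to cite only lines that appear in it, which is what makes the downstream guardrail checks tractable.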
GuardrailResult dataclass

Outcome of the GuardrailsService validation pass.

Business Context: All LLM outputs must pass a validation layer before reaching CS teams or executives. A flawed summary (wrong probability, hallucinated feature) could trigger the wrong CS action or damage trust.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `passed` | `bool` | True if all checks passed. | *required* |
| `flags` | `list[str]` | List of specific violations detected (e.g. 'probability_mismatch'). | *required* |
| `confidence_score` | `float` | 1.0 if fully clean; decreases 0.2 per flag. Minimum 0.0. | *required* |
Source code in src/domain/ai_summary/entities.py
@dataclass(frozen=True)
class GuardrailResult:
    """Outcome of the GuardrailsService validation pass.

    Business Context: All LLM outputs must pass a validation layer before
    reaching CS teams or executives. A flawed summary (wrong probability,
    hallucinated feature) could trigger the wrong CS action or damage trust.

    Args:
        passed: True if all checks passed.
        flags: List of specific violations detected (e.g. 'probability_mismatch').
        confidence_score: 1.0 if fully clean; decreases 0.2 per flag. Minimum 0.0.
    """

    passed: bool
    flags: list[str]
    confidence_score: float
ExecutiveSummary dataclass

The final output entity of the AI Summary bounded context.

Contains the LLM-generated narrative, guardrail validation result, and full provenance metadata for audit and human review.

Business Context: CSMs use this for pre-meeting prep (~30 sec vs 15 min manual writing). Executives use it for portfolio risk reviews. The guardrail result and watermark ensure human-in-the-loop accountability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer_id` | `str` | UUID of the customer this summary is about. | *required* |
| `audience` | `str` | Target audience — 'csm' (tactical) or 'executive' (strategic). | *required* |
| `content` | `str` | LLM-generated narrative with guardrail watermark appended. | *required* |
| `guardrail` | `GuardrailResult` | Validation result including flags and confidence score. | *required* |
| `generated_at` | `datetime` | UTC timestamp of when the summary was created. | *required* |
| `model_used` | `str` | Name of the LLM model used (e.g. 'llama-3.1-8b-instant'). | *required* |
| `llm_provider` | `str` | Inference provider — 'groq' or 'ollama'. | *required* |
Source code in src/domain/ai_summary/entities.py
@dataclass
class ExecutiveSummary:
    """The final output entity of the AI Summary bounded context.

    Contains the LLM-generated narrative, guardrail validation result,
    and full provenance metadata for audit and human review.

    Business Context: CSMs use this for pre-meeting prep (~30 sec vs 15 min
    manual writing). Executives use it for portfolio risk reviews. The
    guardrail result and watermark ensure human-in-the-loop accountability.

    Args:
        customer_id: UUID of the customer this summary is about.
        audience: Target audience — 'csm' (tactical) or 'executive' (strategic).
        content: LLM-generated narrative with guardrail watermark appended.
        guardrail: Validation result including flags and confidence score.
        generated_at: UTC timestamp of when the summary was created.
        model_used: Name of the LLM model used (e.g. 'llama-3.1-8b-instant').
        llm_provider: Inference provider — 'groq' or 'ollama'.
    """

    customer_id: str
    audience: str
    content: str
    guardrail: GuardrailResult
    generated_at: datetime
    model_used: str
    llm_provider: str
    prediction: PredictionResult | None = None

Summary Port (ABC)

src.domain.ai_summary.summary_port

SummaryPort – abstract base class for LLM backend adapters.

Both GroqSummaryService and OllamaSummaryService implement this port, making the LLM backend swappable without touching domain or application code.

Classes

SummaryPort

Bases: ABC

Abstract port for LLM text generation.

Business Context: This port decouples the domain from any specific LLM provider. The application layer only knows about SummaryPort — swapping Groq for Ollama (or a future provider) requires only a config change, not a code change in the use case.

Implementations must be stateless and thread-safe; FastAPI may call generate() concurrently from multiple request handlers.

Source code in src/domain/ai_summary/summary_port.py
class SummaryPort(ABC):
    """Abstract port for LLM text generation.

    Business Context: This port decouples the domain from any specific LLM
    provider. The application layer only knows about SummaryPort — swapping
    Groq for Ollama (or a future provider) requires only a config change,
    not a code change in the use case.

    Implementations must be stateless and thread-safe; FastAPI may call
    generate() concurrently from multiple request handlers.
    """

    @abstractmethod
    def generate(self, context: SummaryContext, audience: str) -> str:
        """Generate a raw LLM narrative grounded in the provided context.

        Business Context: Guardrails are applied by the caller (GuardrailsService)
        after this method returns. This method is responsible only for making
        the API call and returning the raw text.

        Args:
            context: Structured facts from DuckDB that ground the prompt.
            audience: Target audience — 'csm' (tactical) or 'executive' (strategic).

        Returns:
            Raw LLM-generated text string (watermark NOT yet appended).
        """

    @abstractmethod
    def generate_from_prompt(self, prompt: str) -> str:
        """Generate a raw LLM response from a pre-assembled prompt string.

        Business Context: Used by the expansion narrative pipeline where
        PromptBuilder.build_expansion_prompt() assembles the full prompt
        before calling the LLM. This avoids coupling the LLM backend to
        SummaryContext and allows expansion-specific prompt engineering.

        Args:
            prompt: Fully assembled prompt string (including [CONTEXT],
                    [INSTRUCTION], and [CONSTRAINT] blocks).

        Returns:
            Raw LLM-generated text string (watermark NOT yet appended).
        """

    @property
    @abstractmethod
    def model_name(self) -> str:
        """Name of the underlying LLM model (e.g. 'llama-3.1-8b-instant')."""

    @property
    @abstractmethod
    def provider_name(self) -> str:
        """Name of the inference provider (e.g. 'groq' or 'ollama')."""
Attributes
model_name abstractmethod property
model_name: str

Name of the underlying LLM model (e.g. 'llama-3.1-8b-instant').

provider_name abstractmethod property
provider_name: str

Name of the inference provider (e.g. 'groq' or 'ollama').

Functions
generate abstractmethod
generate(context: SummaryContext, audience: str) -> str

Generate a raw LLM narrative grounded in the provided context.

Business Context: Guardrails are applied by the caller (GuardrailsService) after this method returns. This method is responsible only for making the API call and returning the raw text.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `SummaryContext` | Structured facts from DuckDB that ground the prompt. | *required* |
| `audience` | `str` | Target audience — 'csm' (tactical) or 'executive' (strategic). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | Raw LLM-generated text string (watermark NOT yet appended). |

Source code in src/domain/ai_summary/summary_port.py
@abstractmethod
def generate(self, context: SummaryContext, audience: str) -> str:
    """Generate a raw LLM narrative grounded in the provided context.

    Business Context: Guardrails are applied by the caller (GuardrailsService)
    after this method returns. This method is responsible only for making
    the API call and returning the raw text.

    Args:
        context: Structured facts from DuckDB that ground the prompt.
        audience: Target audience — 'csm' (tactical) or 'executive' (strategic).

    Returns:
        Raw LLM-generated text string (watermark NOT yet appended).
    """
generate_from_prompt abstractmethod
generate_from_prompt(prompt: str) -> str

Generate a raw LLM response from a pre-assembled prompt string.

Business Context: Used by the expansion narrative pipeline where PromptBuilder.build_expansion_prompt() assembles the full prompt before calling the LLM. This avoids coupling the LLM backend to SummaryContext and allows expansion-specific prompt engineering.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str` | Fully assembled prompt string (including [CONTEXT], [INSTRUCTION], and [CONSTRAINT] blocks). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | Raw LLM-generated text string (watermark NOT yet appended). |

Source code in src/domain/ai_summary/summary_port.py
@abstractmethod
def generate_from_prompt(self, prompt: str) -> str:
    """Generate a raw LLM response from a pre-assembled prompt string.

    Business Context: Used by the expansion narrative pipeline where
    PromptBuilder.build_expansion_prompt() assembles the full prompt
    before calling the LLM. This avoids coupling the LLM backend to
    SummaryContext and allows expansion-specific prompt engineering.

    Args:
        prompt: Fully assembled prompt string (including [CONTEXT],
                [INSTRUCTION], and [CONSTRAINT] blocks).

    Returns:
        Raw LLM-generated text string (watermark NOT yet appended).
    """

Guardrails Service

src.domain.ai_summary.guardrails_service

GuardrailsService – validates LLM output before returning to callers.

Three-layer defence
  1. Feature name whitelist — reject summaries that mention made-up model features
  2. Probability accuracy — flag if stated probability deviates > 2pp from model output
  3. Watermark — always append human-in-loop annotation to every output

Business Context: In a CS context, a hallucinated summary (wrong probability, invented feature name) could trigger the wrong intervention or erode trust with CS teams. The guardrail layer ensures all LLM outputs are fact-grounded before reaching customer-facing workflows.

Classes

GuardrailsService

Validates LLM output and appends the human-in-loop watermark.

Business Context: The three validation layers (feature whitelist, probability accuracy, watermark) implement the ethical guardrails described in docs/ethical-guardrails.md. A confidence_score < 0.5 should trigger human review before the summary is used.

Source code in src/domain/ai_summary/guardrails_service.py
class GuardrailsService:
    """Validates LLM output and appends the human-in-loop watermark.

    Business Context: The three validation layers (feature whitelist,
    probability accuracy, watermark) implement the ethical guardrails
    described in docs/ethical-guardrails.md. A confidence_score < 0.5
    should trigger human review before the summary is used.
    """

    def validate(self, raw_text: str, context: SummaryContext) -> tuple[str, GuardrailResult]:
        """Validate raw LLM output and append the required watermark.

        Business Context: Called by GenerateExecutiveSummaryUseCase after
        every LLM call. Returns the final text (with watermark) and a
        GuardrailResult for audit logging and confidence scoring.

        Args:
            raw_text: The raw string returned by the LLM backend.
            context: The SummaryContext used to generate the text (for fact-checking).

        Returns:
            Tuple of (final_text_with_watermark, GuardrailResult).
        """
        flags: list[str] = []

        # 1. Check for hallucinated feature names
        # Scan for tokens that look like ML/feature engineering names:
        # snake_case compound words (≥2 parts, all lowercase + digits) that are
        # NOT in the KNOWN_FEATURES whitelist.
        # This catches both explicit suffix patterns (e.g. _score, _days) and
        # general feature-name patterns like "days_until_renewal".
        _FEATURE_PATTERN = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+){1,}$")  # noqa: N806
        # Tokens that look like feature names but are safe (common English compounds)
        _SAFE_TOKENS: frozenset[str] = frozenset(  # noqa: N806
            [
                "plan_tier",
                "customer_id",
                "risk_tier",
                "churn_date",
                "sign_up",
                "follow_up",
                "well_known",
                "opt_in",
                "check_in",
                "log_in",
            ]
        )
        seen_hallucinations: set[str] = set()
        tokens = raw_text.split()
        for token in tokens:
            clean_token = token.strip(".,;:!?()'\"")
            if (
                _FEATURE_PATTERN.match(clean_token)
                and clean_token not in KNOWN_FEATURES
                and clean_token not in _SAFE_TOKENS
                and clean_token not in seen_hallucinations
                and len(clean_token) >= 8  # avoid short words like "re_run"
            ):
                flags.append(f"hallucinated_feature:{clean_token}")
                seen_hallucinations.add(clean_token)

        # 2. Check probability accuracy
        stated_pct = _extract_percentage(raw_text)
        if stated_pct is not None:
            model_pct = context.prediction.churn_probability.value * 100
            if abs(stated_pct - model_pct) > _PROBABILITY_TOLERANCE_PP:
                flags.append("probability_mismatch")

        # 3. Append watermark
        final_text = f"{raw_text.strip()}\n\n{WATERMARK}"

        # Compute confidence: 1.0 degraded by 0.2 per flag, floored at 0.0
        confidence = max(0.0, 1.0 - _CONFIDENCE_PENALTY_PER_FLAG * len(flags))

        return final_text, GuardrailResult(
            passed=len(flags) == 0,
            flags=flags,
            confidence_score=confidence,
        )
Functions
validate
validate(raw_text: str, context: SummaryContext) -> tuple[str, GuardrailResult]

Validate raw LLM output and append the required watermark.

Business Context: Called by GenerateExecutiveSummaryUseCase after every LLM call. Returns the final text (with watermark) and a GuardrailResult for audit logging and confidence scoring.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `raw_text` | `str` | The raw string returned by the LLM backend. | *required* |
| `context` | `SummaryContext` | The SummaryContext used to generate the text (for fact-checking). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `tuple[str, GuardrailResult]` | Tuple of (final_text_with_watermark, GuardrailResult). |

Source code in src/domain/ai_summary/guardrails_service.py
def validate(self, raw_text: str, context: SummaryContext) -> tuple[str, GuardrailResult]:
    """Validate raw LLM output and append the required watermark.

    Business Context: Called by GenerateExecutiveSummaryUseCase after
    every LLM call. Returns the final text (with watermark) and a
    GuardrailResult for audit logging and confidence scoring.

    Args:
        raw_text: The raw string returned by the LLM backend.
        context: The SummaryContext used to generate the text (for fact-checking).

    Returns:
        Tuple of (final_text_with_watermark, GuardrailResult).
    """
    flags: list[str] = []

    # 1. Check for hallucinated feature names
    # Scan for tokens that look like ML/feature engineering names:
    # snake_case compound words (≥2 parts, all lowercase + digits) that are
    # NOT in the KNOWN_FEATURES whitelist.
    # This catches both explicit suffix patterns (e.g. _score, _days) and
    # general feature-name patterns like "days_until_renewal".
    _FEATURE_PATTERN = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+){1,}$")  # noqa: N806
    # Tokens that look like feature names but are safe (common English compounds)
    _SAFE_TOKENS: frozenset[str] = frozenset(  # noqa: N806
        [
            "plan_tier",
            "customer_id",
            "risk_tier",
            "churn_date",
            "sign_up",
            "follow_up",
            "well_known",
            "opt_in",
            "check_in",
            "log_in",
        ]
    )
    seen_hallucinations: set[str] = set()
    tokens = raw_text.split()
    for token in tokens:
        clean_token = token.strip(".,;:!?()'\"")
        if (
            _FEATURE_PATTERN.match(clean_token)
            and clean_token not in KNOWN_FEATURES
            and clean_token not in _SAFE_TOKENS
            and clean_token not in seen_hallucinations
            and len(clean_token) >= 8  # avoid short words like "re_run"
        ):
            flags.append(f"hallucinated_feature:{clean_token}")
            seen_hallucinations.add(clean_token)

    # 2. Check probability accuracy
    stated_pct = _extract_percentage(raw_text)
    if stated_pct is not None:
        model_pct = context.prediction.churn_probability.value * 100
        if abs(stated_pct - model_pct) > _PROBABILITY_TOLERANCE_PP:
            flags.append("probability_mismatch")

    # 3. Append watermark
    final_text = f"{raw_text.strip()}\n\n{WATERMARK}"

    # Compute confidence: 1.0 degraded by 0.2 per flag, floored at 0.0
    confidence = max(0.0, 1.0 - _CONFIDENCE_PENALTY_PER_FLAG * len(flags))

    return final_text, GuardrailResult(
        passed=len(flags) == 0,
        flags=flags,
        confidence_score=confidence,
    )
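The feature-whitelist scan can be exercised in isolation. The snippet below reimplements the token loop from `validate()` with a stand-in `KNOWN_FEATURES` set (the real whitelist is a module-level constant not shown on this page):

```python
import re

_FEATURE_PATTERN = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+){1,}$")
_CONFIDENCE_PENALTY_PER_FLAG = 0.2
# Stand-in whitelist for illustration; the real KNOWN_FEATURES differs.
KNOWN_FEATURES = frozenset({"days_since_last_login", "ticket_count_90d"})


def detect_hallucinated_features(raw_text: str) -> list[str]:
    """Flag snake_case tokens (>= 8 chars) not in the feature whitelist."""
    flags: list[str] = []
    seen: set[str] = set()
    for token in raw_text.split():
        clean = token.strip(".,;:!?()'\"")
        if (_FEATURE_PATTERN.match(clean)
                and clean not in KNOWN_FEATURES
                and clean not in seen
                and len(clean) >= 8):
            flags.append(f"hallucinated_feature:{clean}")
            seen.add(clean)
    return flags


flags = detect_hallucinated_features(
    "Risk is driven by days_since_last_login and invented_risk_index."
)
confidence = max(0.0, 1.0 - _CONFIDENCE_PENALTY_PER_FLAG * len(flags))
```

`days_since_last_login` passes because it is whitelisted; `invented_risk_index` matches the snake_case pattern, is not whitelisted, and is at least 8 characters, so it is flagged once and confidence drops to 0.8.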

Expansion Guardrails Service

src.domain.ai_summary.expansion_guardrails_service

ExpansionGuardrailsService — three-gate LLM output validation for expansion briefs.

Three-layer defence
  1. Feature name whitelist (Gate 1) — flag hallucinated snake_case signals; 2+ flags → REJECTED
  2. Tone calibration (Gate 2) — strip urgency language when propensity < 0.50
  3. PII/jargon scrub (Gate 3) — remove UUIDs and ML terms from email_draft only; append watermark to ae_tactical_brief

Business Context: An AE acting on a hallucinated signal (fabricated feature name, wrong propensity tier) in an outreach email destroys trust and may misrepresent product capabilities. The three gates ensure every expansion brief delivered to Sales is factually grounded and appropriately calibrated to actual propensity.
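Gate 3's PII scrub can be illustrated with the UUID redaction alone. The pattern below is an assumed equivalent of the module's private `_UUID_PATTERN`, matching canonical 8-4-4-4-12 hex UUIDs:

```python
import re

# Assumed equivalent of _UUID_PATTERN (canonical UUID layout).
_UUID_PATTERN = re.compile(
    r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}"
    r"-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
)

draft = ("Hi Sam, account 123e4567-e89b-12d3-a456-426614174000 "
         "shows strong API adoption.")
scrubbed = _UUID_PATTERN.sub("[REDACTED]", draft)
```

The surrounding prose is untouched; only the internal identifier is replaced, so the email remains sendable after a human review.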

Classes

ExpansionGuardrailResult dataclass

Result of ExpansionGuardrailsService.validate().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ae_tactical_brief` | `str` | Brief with watermark appended (urgency stripped if applicable). | *required* |
| `email_draft` | `str \| None` | Scrubbed email draft, or None. | *required* |
| `guardrail_status` | `Literal['PASSED', 'FLAGGED', 'REJECTED']` | 'PASSED' / 'FLAGGED' / 'REJECTED' based on Gate 1 flags. | *required* |
| `fact_confidence` | `float` | 1.0 − (0.25 × n_flags), floored at 0.0. | *required* |
| `flags` | `list[str]` | List of Gate 1 flag strings for audit logging. | *required* |
Source code in src/domain/ai_summary/expansion_guardrails_service.py
@dataclass(frozen=True)
class ExpansionGuardrailResult:
    """Result of ExpansionGuardrailsService.validate().

    Args:
        ae_tactical_brief: Brief with watermark appended (urgency stripped if applicable).
        email_draft: Scrubbed email draft, or None.
        guardrail_status: 'PASSED' / 'FLAGGED' / 'REJECTED' based on Gate 1 flags.
        fact_confidence: 1.0 − (0.25 × n_flags), floored at 0.0.
        flags: List of Gate 1 flag strings for audit logging.
    """

    ae_tactical_brief: str
    email_draft: str | None
    guardrail_status: Literal["PASSED", "FLAGGED", "REJECTED"]
    fact_confidence: float
    flags: list[str]
ExpansionGuardrailsService

Validates and transforms LLM output for expansion briefs.

Business Context: Mirrors GuardrailsService for the churn domain but scoped to expansion signals. Gate 1 uses the expansion_result's actual top_features as the fact whitelist, preventing the LLM from referencing signals outside the model's output.

Gate thresholds
  • 0 flags → PASSED, confidence 1.0
  • 1 flag → FLAGGED, confidence 0.75
  • 2+ flags → REJECTED, confidence ≤ 0.50
Source code in src/domain/ai_summary/expansion_guardrails_service.py
class ExpansionGuardrailsService:
    """Validates and transforms LLM output for expansion briefs.

    Business Context: Mirrors GuardrailsService for the churn domain but
    scoped to expansion signals. Gate 1 uses the expansion_result's actual
    top_features as the fact whitelist, preventing the LLM from referencing
    signals outside the model's output.

    Gate thresholds:
      - 0 flags → PASSED, confidence 1.0
      - 1 flag  → FLAGGED, confidence 0.75
      - 2+ flags → REJECTED, confidence ≤ 0.50
    """

    def validate(
        self,
        ae_tactical_brief: str,
        email_draft: str | None,
        expansion_result: object,
        propensity: float,
    ) -> ExpansionGuardrailResult:
        """Run all three gates and return the validated/transformed result.

        Business Context: Called by GenerateExpansionSummaryUseCase after every
        LLM call. Returns the final texts (with watermark and scrubbing) plus
        a result entity for audit logging and confidence scoring.

        Args:
            ae_tactical_brief: Raw LLM-generated AE brief.
            email_draft: Optional raw LLM-generated email draft (None for CSM).
            expansion_result: ExpansionResult entity (provides top_features whitelist).
            propensity: Calibrated upgrade propensity in [0, 1].

        Returns:
            ExpansionGuardrailResult with all transformations applied.
        """
        # Build the fact whitelist from the expansion result's actual features.
        top_feature_names: frozenset[str] = frozenset(
            f.feature_name
            for f in expansion_result.top_features  # type: ignore[attr-defined]
        )
        allowed_tokens: frozenset[str] = top_feature_names | _EXPANSION_KNOWN_FEATURES | _SAFE_TOKENS

        # ── Gate 1: hallucination detection ───────────────────────────────────
        flags: list[str] = []
        seen: set[str] = set()
        for token in ae_tactical_brief.split():
            clean = token.strip(".,;:!?()'\"")
            if (
                len(clean) >= 8
                and _FEATURE_PATTERN.match(clean)
                and clean not in allowed_tokens
                and clean not in seen
                and clean not in EXPANSION_TECHNICAL_TERMS
            ):
                flags.append(f"hallucinated_feature:{clean}")
                seen.add(clean)

        # Determine status from Gate 1 flags.
        if len(flags) >= 2:
            status: Literal["PASSED", "FLAGGED", "REJECTED"] = "REJECTED"
        elif len(flags) == 1:
            status = "FLAGGED"
        else:
            status = "PASSED"

        # ── Gate 2: tone calibration ───────────────────────────────────────────
        brief_body = ae_tactical_brief
        if propensity < 0.50:
            for phrase in _URGENCY_PHRASES:
                # Case-insensitive replacement — preserves surrounding text.
                brief_body = re.sub(
                    re.escape(phrase),
                    "",
                    brief_body,
                    flags=re.IGNORECASE,
                )
            # Collapse extra whitespace left by removals.
            brief_body = re.sub(r"  +", " ", brief_body).strip()

        # ── Gate 3: PII + jargon scrub (email_draft only) ────────────────────
        scrubbed_draft: str | None = None
        if email_draft is not None:
            scrubbed = _UUID_PATTERN.sub("[REDACTED]", email_draft)
            for term in EXPANSION_TECHNICAL_TERMS:
                scrubbed = re.sub(
                    r"\b" + re.escape(term) + r"\b",
                    "[REDACTED]",
                    scrubbed,
                    flags=re.IGNORECASE,
                )
            scrubbed_draft = scrubbed

        # Append watermark to ae_tactical_brief.
        final_brief = f"{brief_body}\n\n{WATERMARK}"

        # Confidence: 1.0 − (0.25 × n_flags), floored at 0.0.
        confidence = max(0.0, 1.0 - _CONFIDENCE_PENALTY_PER_FLAG * len(flags))

        return ExpansionGuardrailResult(
            ae_tactical_brief=final_brief,
            email_draft=scrubbed_draft,
            guardrail_status=status,
            fact_confidence=confidence,
            flags=flags,
        )
Functions
validate
validate(ae_tactical_brief: str, email_draft: str | None, expansion_result: object, propensity: float) -> ExpansionGuardrailResult

Run all three gates and return the validated/transformed result.

Business Context: Called by GenerateExpansionSummaryUseCase after every LLM call. Returns the final texts (with watermark and scrubbing) plus a result entity for audit logging and confidence scoring.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ae_tactical_brief` | `str` | Raw LLM-generated AE brief. | *required* |
| `email_draft` | `str \| None` | Optional raw LLM-generated email draft (None for CSM). | *required* |
| `expansion_result` | `object` | ExpansionResult entity (provides top_features whitelist). | *required* |
| `propensity` | `float` | Calibrated upgrade propensity in [0, 1]. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ExpansionGuardrailResult` | ExpansionGuardrailResult with all transformations applied. |

Source code in src/domain/ai_summary/expansion_guardrails_service.py
def validate(
    self,
    ae_tactical_brief: str,
    email_draft: str | None,
    expansion_result: object,
    propensity: float,
) -> ExpansionGuardrailResult:
    """Run all three gates and return the validated/transformed result.

    Business Context: Called by GenerateExpansionSummaryUseCase after every
    LLM call. Returns the final texts (with watermark and scrubbing) plus
    a result entity for audit logging and confidence scoring.

    Args:
        ae_tactical_brief: Raw LLM-generated AE brief.
        email_draft: Optional raw LLM-generated email draft (None for CSM).
        expansion_result: ExpansionResult entity (provides top_features whitelist).
        propensity: Calibrated upgrade propensity in [0, 1].

    Returns:
        ExpansionGuardrailResult with all transformations applied.
    """
    # Build the fact whitelist from the expansion result's actual features.
    top_feature_names: frozenset[str] = frozenset(
        f.feature_name
        for f in expansion_result.top_features  # type: ignore[attr-defined]
    )
    allowed_tokens: frozenset[str] = top_feature_names | _EXPANSION_KNOWN_FEATURES | _SAFE_TOKENS

    # ── Gate 1: hallucination detection ───────────────────────────────────
    flags: list[str] = []
    seen: set[str] = set()
    for token in ae_tactical_brief.split():
        clean = token.strip(".,;:!?()'\"")
        if (
            len(clean) >= 8
            and _FEATURE_PATTERN.match(clean)
            and clean not in allowed_tokens
            and clean not in seen
            and clean not in EXPANSION_TECHNICAL_TERMS
        ):
            flags.append(f"hallucinated_feature:{clean}")
            seen.add(clean)

    # Determine status from Gate 1 flags.
    if len(flags) >= 2:
        status: Literal["PASSED", "FLAGGED", "REJECTED"] = "REJECTED"
    elif len(flags) == 1:
        status = "FLAGGED"
    else:
        status = "PASSED"

    # ── Gate 2: tone calibration ───────────────────────────────────────────
    brief_body = ae_tactical_brief
    if propensity < 0.50:
        for phrase in _URGENCY_PHRASES:
            # Case-insensitive replacement — preserves surrounding text.
            brief_body = re.sub(
                re.escape(phrase),
                "",
                brief_body,
                flags=re.IGNORECASE,
            )
        # Collapse extra whitespace left by removals.
        brief_body = re.sub(r"  +", " ", brief_body).strip()

    # ── Gate 3: PII + jargon scrub (email_draft only) ────────────────────
    scrubbed_draft: str | None = None
    if email_draft is not None:
        scrubbed = _UUID_PATTERN.sub("[REDACTED]", email_draft)
        for term in EXPANSION_TECHNICAL_TERMS:
            scrubbed = re.sub(
                r"\b" + re.escape(term) + r"\b",
                "[REDACTED]",
                scrubbed,
                flags=re.IGNORECASE,
            )
        scrubbed_draft = scrubbed

    # Append watermark to ae_tactical_brief.
    final_brief = f"{brief_body}\n\n{WATERMARK}"

    # Confidence: 1.0 − (0.25 × n_flags), floored at 0.0.
    confidence = max(0.0, 1.0 - _CONFIDENCE_PENALTY_PER_FLAG * len(flags))

    return ExpansionGuardrailResult(
        ae_tactical_brief=final_brief,
        email_draft=scrubbed_draft,
        guardrail_status=status,
        fact_confidence=confidence,
        flags=flags,
    )
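The flag-count-to-status mapping at the heart of Gate 1 is small enough to factor out and check directly against the documented thresholds:

```python
from typing import Literal

_CONFIDENCE_PENALTY_PER_FLAG = 0.25

Status = Literal["PASSED", "FLAGGED", "REJECTED"]


def gate1_outcome(n_flags: int) -> tuple[Status, float]:
    """Map a Gate 1 flag count to (guardrail_status, fact_confidence)."""
    if n_flags >= 2:
        status: Status = "REJECTED"
    elif n_flags == 1:
        status = "FLAGGED"
    else:
        status = "PASSED"
    confidence = max(0.0, 1.0 - _CONFIDENCE_PENALTY_PER_FLAG * n_flags)
    return status, confidence
```

This reproduces the documented table: 0 flags gives PASSED at 1.0, 1 flag gives FLAGGED at 0.75, and 2 or more flags give REJECTED at 0.50 or below.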

Application Use Cases

src.application.use_cases.generate_executive_summary

GenerateExecutiveSummaryUseCase – orchestrates the AI summary pipeline.

Fetches customer + prediction data, builds SummaryContext from DuckDB, calls the LLM via SummaryPort, and validates output with GuardrailsService.

Classes

GenerateSummaryRequest dataclass

Input DTO for GenerateExecutiveSummaryUseCase.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer_id` | `str` | UUID of the active customer to summarise. | *required* |
| `audience` | `str` | 'csm' (tactical briefing) or 'executive' (revenue-focused). | `'csm'` |
Source code in src/application/use_cases/generate_executive_summary.py
@dataclass
class GenerateSummaryRequest:
    """Input DTO for GenerateExecutiveSummaryUseCase.

    Args:
        customer_id: UUID of the active customer to summarise.
        audience: 'csm' (tactical briefing) or 'executive' (revenue-focused).
    """

    customer_id: str
    audience: str = field(default="csm")
GenerateExecutiveSummaryUseCase

Generates an AI executive summary grounded in DuckDB customer data.

Business Context: Replaces ~15 min of manual CSM research with a 30-second API call. The output is grounded in the Phase 4 prediction pipeline (churn probability, SHAP drivers) and enriched with usage events, support tickets, and GTM signals from DuckDB.

Pipeline
  1. Fetch Customer entity (raises ValueError if not found / churned)
  2. Run PredictChurnUseCase to get calibrated probability + SHAP features
  3. Query DuckDB for events, tickets, GTM context
  4. Build SummaryContext (the RAG "retrieval" step)
  5. Call LLM via SummaryPort (no DB access in LLM layer)
  6. Validate output + append watermark via GuardrailsService
  7. Return ExecutiveSummary entity
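
The seven steps above can be sketched end to end with minimal stand-in stubs (the real ports, CustomerRepository, PredictChurnUseCase, SummaryPort, and GuardrailsService, are far richer than these fakes):

```python
from dataclasses import dataclass

@dataclass
class StubCustomer:
    customer_id: str
    is_active: bool = True

def run_pipeline(customer_id: str) -> dict:
    # Step 1: fetch customer (stub repo always finds an active customer)
    customer = StubCustomer(customer_id)
    if not customer.is_active:
        raise ValueError(f"Customer {customer_id} has already churned.")
    # Step 2: churn prediction (stubbed calibrated probability)
    churn_probability = 0.42
    # Steps 3-4: assemble the grounding context (the RAG "retrieval" step)
    context = {"customer_id": customer_id, "churn_probability": churn_probability}
    # Step 5: LLM call (stubbed; the real code goes through SummaryPort)
    raw_text = f"Churn risk is {churn_probability:.0%} for {customer_id}."
    # Step 6: guardrail check that the stated probability matches the context
    passed = f"{context['churn_probability']:.0%}" in raw_text
    # Step 7: return the final entity (a plain dict here)
    return {"content": raw_text, "guardrail_passed": passed}

result = run_pipeline("c-123")
```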

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer_repo` | `CustomerRepository` | Repository for fetching Customer entities. | *required* |
| `predict_use_case` | `PredictChurnUseCase` | PredictChurnUseCase for calibrated probability + SHAP. | *required* |
| `usage_repo` | `UsageRepository` | UsageRepository for event lookups. | *required* |
| `summary_service` | `SummaryPort` | SummaryPort implementation (Groq or Ollama). | *required* |
| `guardrails` | `GuardrailsService` | GuardrailsService for hallucination detection + watermark. | *required* |
Source code in src/application/use_cases/generate_executive_summary.py
class GenerateExecutiveSummaryUseCase:
    """Generates an AI executive summary grounded in DuckDB customer data.

    Business Context: Replaces ~15 min of manual CSM research with a 30-second
    API call. The output is grounded in the Phase 4 prediction pipeline (churn
    probability, SHAP drivers) and enriched with usage events, support tickets,
    and GTM signals from DuckDB.

    Pipeline:
      1. Fetch Customer entity (raises ValueError if not found / churned)
      2. Run PredictChurnUseCase to get calibrated probability + SHAP features
      3. Query DuckDB for events, tickets, GTM context
      4. Build SummaryContext (the RAG "retrieval" step)
      5. Call LLM via SummaryPort (no DB access in LLM layer)
      6. Validate output + append watermark via GuardrailsService
      7. Return ExecutiveSummary entity

    Args:
        customer_repo: Repository for fetching Customer entities.
        predict_use_case: PredictChurnUseCase for calibrated probability + SHAP.
        usage_repo: UsageRepository for event lookups.
        summary_service: SummaryPort implementation (Groq or Ollama).
        guardrails: GuardrailsService for hallucination detection + watermark.
    """

    def __init__(
        self,
        customer_repo: CustomerRepository,
        predict_use_case: PredictChurnUseCase,
        usage_repo: UsageRepository,
        summary_service: SummaryPort,
        guardrails: GuardrailsService,
    ) -> None:
        self._customer_repo = customer_repo
        self._predict_use_case = predict_use_case
        self._usage_repo = usage_repo
        self._summary_service = summary_service
        self._guardrails = guardrails

    def execute(self, request: GenerateSummaryRequest) -> ExecutiveSummary:
        """Run the full AI summary pipeline for a single customer.

        Business Context: All LLM calls are grounded in verified DuckDB data.
        The guardrail layer ensures hallucinated features or wrong probabilities
        are flagged before the summary reaches a CS workflow.

        Args:
            request: Contains customer_id and target audience.

        Returns:
            ExecutiveSummary with content, guardrail result, and provenance metadata.

        Raises:
            ValueError: If the customer is not found or has already churned.
        """
        log = logger.bind(customer_id=request.customer_id, audience=request.audience)
        log.info("summary.generate.start")

        # Step 1 — fetch customer
        customer = self._customer_repo.get_by_id(request.customer_id)
        if customer is None:
            raise ValueError(f"Customer {request.customer_id} not found.")
        if not customer.is_active:
            raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

        # Step 2 — run churn prediction (Phase 4 pipeline)
        prediction = self._predict_use_case.execute(PredictChurnRequest(customer_id=request.customer_id))

        # Step 3 — build context (RAG retrieval from DuckDB)
        context = self._build_context(customer, prediction)

        # Step 4 — call LLM
        raw_text = self._summary_service.generate(context, request.audience)
        log.info("summary.llm.response_received", length=len(raw_text))

        # Step 5 — validate + watermark
        final_text, guardrail = self._guardrails.validate(raw_text, context)
        if not guardrail.passed:
            log.warning(
                "summary.guardrail.flags",
                flags=guardrail.flags,
                confidence=guardrail.confidence_score,
            )

        return ExecutiveSummary(
            customer_id=request.customer_id,
            audience=request.audience,
            content=final_text,
            guardrail=guardrail,
            generated_at=datetime.now(UTC),
            model_used=self._summary_service.model_name,
            llm_provider=self._summary_service.provider_name,
            prediction=prediction,
        )

    def _build_context(self, customer: object, prediction: object) -> SummaryContext:
        """Fetch all context data from DuckDB and assemble SummaryContext.

        Business Context: This is the "retrieval" step — pulls events, tickets,
        and GTM signals for the customer. All data is fetched fresh per request
        to ensure the LLM sees current state.

        Args:
            customer: The Customer entity.
            prediction: The PredictionResult from the churn pipeline.

        Returns:
            A SummaryContext with all structured facts for the customer.
        """
        from src.domain.customer.entities import Customer
        from src.domain.prediction.entities import PredictionResult

        assert isinstance(customer, Customer)
        assert isinstance(prediction, PredictionResult)

        # Fetch events in last 30 days and aggregate by type
        since_30d = datetime.now(UTC) - timedelta(days=30)
        events = self._usage_repo.get_events_for_customer(customer.customer_id, since=since_30d)
        events_by_type: dict[str, int] = {}
        for event in events:
            key = str(event.event_type)
            events_by_type[key] = events_by_type.get(key, 0) + 1

        # Fetch open tickets and GTM from DuckDB if the repos are available
        open_tickets = self._fetch_open_tickets(customer.customer_id)
        gtm_opportunity = self._fetch_gtm_opportunity(customer.customer_id)
        cohort_churn_rate = self._fetch_cohort_churn_rate(customer)

        return SummaryContext(
            customer=customer,
            prediction=prediction,
            events_last_30d_by_type=events_by_type,
            open_tickets=open_tickets,
            gtm_opportunity=gtm_opportunity,
            cohort_churn_rate=cohort_churn_rate,
        )

    def _fetch_open_tickets(self, customer_id: str) -> list[dict[str, object]]:
        """Query DuckDB for the 5 most recent support tickets (open or resolved).

        Includes resolved tickets from the last 90 days so the LLM can answer
        questions like "what was the customer's last inquiry?" — not just open items.
        Falls back to empty list if DuckDB is unavailable.
        """
        try:
            from src.infrastructure.db.duckdb_adapter import get_connection

            with get_connection() as conn:
                rows = conn.execute(
                    """
                    SELECT priority, topic,
                           DATEDIFF('day', created_date, CURRENT_DATE) AS age_days,
                           CASE WHEN resolution_time IS NULL THEN 'open' ELSE 'resolved' END AS status
                    FROM raw.support_tickets
                    WHERE customer_id = ?
                      AND (
                          resolution_time IS NULL
                          OR CAST(created_date AS DATE) >= CURRENT_DATE - INTERVAL '90 days'
                      )
                    ORDER BY created_date DESC
                    LIMIT 5
                    """,
                    [customer_id],
                ).fetchall()
            return [{"priority": r[0], "topic": r[1], "age_days": r[2], "status": r[3]} for r in rows]
        except Exception:
            return []

    def _fetch_gtm_opportunity(self, customer_id: str) -> dict[str, object] | None:
        """Query DuckDB for the most recent active GTM opportunity."""
        try:
            from src.infrastructure.db.duckdb_adapter import get_connection

            with get_connection() as conn:
                row = conn.execute(
                    """
                    SELECT stage, amount, sales_owner, close_date
                    FROM raw.gtm_opportunities
                    WHERE customer_id = ?
                      AND stage NOT IN ('closed_won', 'closed_lost')
                    ORDER BY close_date DESC
                    LIMIT 1
                    """,
                    [customer_id],
                ).fetchone()
            if row:
                return {
                    "stage": row[0],
                    "amount": row[1],
                    "sales_owner": row[2],
                    "close_date": str(row[3]),
                }
            return None
        except Exception:
            return None

    def _fetch_cohort_churn_rate(self, customer: object) -> float:
        """Query the dbt mart for churn rate in the same tier + industry cohort."""
        from src.domain.customer.entities import Customer

        assert isinstance(customer, Customer)
        try:
            from src.infrastructure.db.duckdb_adapter import get_connection

            with get_connection() as conn:
                row = conn.execute(
                    """
                    SELECT AVG(CASE WHEN churn_date IS NOT NULL THEN 1.0 ELSE 0.0 END)
                    FROM raw.customers
                    WHERE plan_tier = ?
                      AND industry = ?
                    """,
                    [str(customer.plan_tier), str(customer.industry)],
                ).fetchone()
            if row and row[0] is not None:
                return float(row[0])
            return 0.0
        except Exception:
            return 0.0
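
The per-type aggregation in `_build_context` is a plain counting loop; the same result can be obtained with `collections.Counter` (the `Event` dataclass below is a stand-in for illustration):

```python
from collections import Counter
from dataclasses import dataclass

@dataclass
class Event:
    event_type: str

# Aggregate 30-day events by type, as _build_context does with a manual loop.
events = [Event("login"), Event("api_call"), Event("login")]
events_by_type: dict[str, int] = dict(Counter(str(e.event_type) for e in events))
print(events_by_type)  # {'login': 2, 'api_call': 1}
```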
Functions
execute
execute(request: GenerateSummaryRequest) -> ExecutiveSummary

Run the full AI summary pipeline for a single customer.

Business Context: All LLM calls are grounded in verified DuckDB data. The guardrail layer ensures hallucinated features or wrong probabilities are flagged before the summary reaches a CS workflow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `request` | `GenerateSummaryRequest` | Contains customer_id and target audience. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ExecutiveSummary` | ExecutiveSummary with content, guardrail result, and provenance metadata. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the customer is not found or has already churned. |

Source code in src/application/use_cases/generate_executive_summary.py
def execute(self, request: GenerateSummaryRequest) -> ExecutiveSummary:
    """Run the full AI summary pipeline for a single customer.

    Business Context: All LLM calls are grounded in verified DuckDB data.
    The guardrail layer ensures hallucinated features or wrong probabilities
    are flagged before the summary reaches a CS workflow.

    Args:
        request: Contains customer_id and target audience.

    Returns:
        ExecutiveSummary with content, guardrail result, and provenance metadata.

    Raises:
        ValueError: If the customer is not found or has already churned.
    """
    log = logger.bind(customer_id=request.customer_id, audience=request.audience)
    log.info("summary.generate.start")

    # Step 1 — fetch customer
    customer = self._customer_repo.get_by_id(request.customer_id)
    if customer is None:
        raise ValueError(f"Customer {request.customer_id} not found.")
    if not customer.is_active:
        raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

    # Step 2 — run churn prediction (Phase 4 pipeline)
    prediction = self._predict_use_case.execute(PredictChurnRequest(customer_id=request.customer_id))

    # Step 3 — build context (RAG retrieval from DuckDB)
    context = self._build_context(customer, prediction)

    # Step 4 — call LLM
    raw_text = self._summary_service.generate(context, request.audience)
    log.info("summary.llm.response_received", length=len(raw_text))

    # Step 5 — validate + watermark
    final_text, guardrail = self._guardrails.validate(raw_text, context)
    if not guardrail.passed:
        log.warning(
            "summary.guardrail.flags",
            flags=guardrail.flags,
            confidence=guardrail.confidence_score,
        )

    return ExecutiveSummary(
        customer_id=request.customer_id,
        audience=request.audience,
        content=final_text,
        guardrail=guardrail,
        generated_at=datetime.now(UTC),
        model_used=self._summary_service.model_name,
        llm_provider=self._summary_service.provider_name,
        prediction=prediction,
    )

src.application.use_cases.ask_customer_question

AskCustomerQuestionUseCase – RAG chatbot for free-text questions about a customer.

Answers questions like "Why is this customer at risk?" by building a SummaryContext from DuckDB and passing it to the LLM with a strict grounding constraint.
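
Out-of-scope detection hinges on a sentinel phrase the prompt instructs the model to emit when the answer is not in the retrieved context. The exact phrase lives in `_SCOPE_EXCEEDED_PHRASE`; the value below is a placeholder, not the real constant:

```python
# Placeholder sentinel; the real _SCOPE_EXCEEDED_PHRASE is defined in the module.
SCOPE_EXCEEDED_PHRASE = "not available in the customer context"

def scope_exceeded(raw_answer: str) -> bool:
    # Case-insensitive containment check, mirroring the use case logic.
    return SCOPE_EXCEEDED_PHRASE.lower() in raw_answer.lower()

print(scope_exceeded("That detail is NOT AVAILABLE in the customer context."))  # True
print(scope_exceeded("The customer has 3 open tickets."))  # False
```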

Classes

AskCustomerRequest dataclass

Input DTO for AskCustomerQuestionUseCase.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer_id` | `str` | UUID of the customer to ask about. | *required* |
| `question` | `str` | Free-text question from the CSM (5–500 characters). | *required* |
Source code in src/application/use_cases/ask_customer_question.py
@dataclass
class AskCustomerRequest:
    """Input DTO for AskCustomerQuestionUseCase.

    Args:
        customer_id: UUID of the customer to ask about.
        question: Free-text question from the CSM (5–500 characters).
    """

    customer_id: str
    question: str
AskCustomerResponse dataclass

Output DTO for AskCustomerQuestionUseCase.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer_id` | `str` | UUID of the customer asked about. | *required* |
| `question` | `str` | The original question. | *required* |
| `answer` | `str` | LLM-generated answer with guardrail watermark. | *required* |
| `confidence_score` | `float` | 0–1; degrades with guardrail flags. | *required* |
| `guardrail_flags` | `list[str]` | List of detected violations (e.g. 'probability_mismatch'). | *required* |
| `scope_exceeded` | `bool` | True if the question couldn't be answered from available data. | *required* |
| `generated_at` | `datetime` | UTC timestamp of the response. | *required* |
| `model_used` | `str` | LLM model name for audit. | *required* |
| `llm_provider` | `str` | Provider name for audit. | *required* |
Source code in src/application/use_cases/ask_customer_question.py
@dataclass
class AskCustomerResponse:
    """Output DTO for AskCustomerQuestionUseCase.

    Args:
        customer_id: UUID of the customer asked about.
        question: The original question.
        answer: LLM-generated answer with guardrail watermark.
        confidence_score: 0–1; degrades with guardrail flags.
        guardrail_flags: List of detected violations (e.g. 'probability_mismatch').
        scope_exceeded: True if the question couldn't be answered from available data.
        generated_at: UTC timestamp of the response.
        model_used: LLM model name for audit.
        llm_provider: Provider name for audit.
    """

    customer_id: str
    question: str
    answer: str
    confidence_score: float
    guardrail_flags: list[str]
    scope_exceeded: bool
    generated_at: datetime
    model_used: str
    llm_provider: str
AskCustomerQuestionUseCase

Answers free-text questions about a customer using their DuckDB history as context.

Business Context: CSMs can ask questions like "Why is this customer at risk?" or "What support tickets are open?" and get answers grounded in real data. Questions outside available context return a 'scope_exceeded' flag rather than hallucinated answers — protecting CSM trust in the tool.

The underlying RAG strategy is context-stuffing: the customer's full history fits in Llama-3's 128k context window, so no vector database is required.
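
A quick back-of-envelope check of why context-stuffing works here (the ~4 characters per token figure is a rough heuristic, not a Llama-3 tokenizer guarantee):

```python
def fits_in_context(context_text: str, window_tokens: int = 128_000) -> bool:
    """Estimate whether a stuffed context fits, leaving 20% headroom
    for the system prompt and the model's answer."""
    estimated_tokens = len(context_text) / 4  # rough chars-per-token heuristic
    return estimated_tokens < window_tokens * 0.8

# A 100k-character customer history is roughly 25k tokens: well inside 128k.
print(fits_in_context("x" * 100_000))  # True
```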

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer_repo` | `CustomerRepository` | Repository for fetching Customer entities. | *required* |
| `predict_use_case` | `PredictChurnUseCase` | PredictChurnUseCase for calibrated probability + SHAP. | *required* |
| `usage_repo` | `UsageRepository` | UsageRepository for event lookups. | *required* |
| `summary_service` | `SummaryPort` | SummaryPort with answer_question() method. | *required* |
| `guardrails` | `GuardrailsService` | GuardrailsService for hallucination detection + watermark. | *required* |
Source code in src/application/use_cases/ask_customer_question.py
class AskCustomerQuestionUseCase:
    """Answers free-text questions about a customer using their DuckDB history as context.

    Business Context: CSMs can ask questions like "Why is this customer at risk?"
    or "What support tickets are open?" and get answers grounded in real data.
    Questions outside available context return a 'scope_exceeded' flag rather
    than hallucinated answers — protecting CSM trust in the tool.

    The underlying RAG strategy is context-stuffing: the customer's full history
    fits in Llama-3's 128k context window, so no vector database is required.

    Args:
        customer_repo: Repository for fetching Customer entities.
        predict_use_case: PredictChurnUseCase for calibrated probability + SHAP.
        usage_repo: UsageRepository for event lookups.
        summary_service: SummaryPort with answer_question() method.
        guardrails: GuardrailsService for hallucination detection + watermark.
    """

    def __init__(
        self,
        customer_repo: CustomerRepository,
        predict_use_case: PredictChurnUseCase,
        usage_repo: UsageRepository,
        summary_service: SummaryPort,
        guardrails: GuardrailsService,
    ) -> None:
        self._customer_repo = customer_repo
        self._predict_use_case = predict_use_case
        self._usage_repo = usage_repo
        self._summary_service = summary_service
        self._guardrails = guardrails
        # Reuse the context-building logic from the summary use case
        self._summary_uc = GenerateExecutiveSummaryUseCase(
            customer_repo=customer_repo,
            predict_use_case=predict_use_case,
            usage_repo=usage_repo,
            summary_service=summary_service,
            guardrails=guardrails,
        )

    def execute(self, request: AskCustomerRequest) -> AskCustomerResponse:
        """Answer a free-text question about a customer.

        Business Context: Retrieves full customer context from DuckDB, passes
        question + context to the LLM, validates the answer, and returns a
        structured response with scope_exceeded flag for out-of-context questions.

        Args:
            request: Contains customer_id and the question to answer.

        Returns:
            AskCustomerResponse with grounded answer and audit metadata.

        Raises:
            ValueError: If the customer is not found or has already churned.
        """
        log = logger.bind(customer_id=request.customer_id)
        log.info("ask.question.start", question=request.question[:100])

        # Validate customer exists
        customer = self._customer_repo.get_by_id(request.customer_id)
        if customer is None:
            raise ValueError(f"Customer {request.customer_id} not found.")
        if not customer.is_active:
            raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

        # Get prediction for context
        prediction = self._predict_use_case.execute(PredictChurnRequest(customer_id=request.customer_id))

        # Build full context (RAG retrieval)
        context = self._summary_uc._build_context(customer, prediction)

        # Call LLM with question
        if hasattr(self._summary_service, "answer_question"):
            raw_answer = self._summary_service.answer_question(context, request.question)
        else:
            # Fallback: use generate with a question-framed prompt
            from src.infrastructure.llm.prompt_builder import PromptBuilder

            _ = PromptBuilder().build_question_prompt(context, request.question)
            raw_answer = self._summary_service.generate(context, "csm")

        log.info("ask.llm.response_received", length=len(raw_answer))

        # Validate + watermark
        final_answer, guardrail = self._guardrails.validate(raw_answer, context)

        # Detect scope-exceeded sentinel
        scope_exceeded = _SCOPE_EXCEEDED_PHRASE.lower() in raw_answer.lower()

        return AskCustomerResponse(
            customer_id=request.customer_id,
            question=request.question,
            answer=final_answer,
            confidence_score=guardrail.confidence_score,
            guardrail_flags=guardrail.flags,
            scope_exceeded=scope_exceeded,
            generated_at=datetime.now(UTC),
            model_used=self._summary_service.model_name,
            llm_provider=self._summary_service.provider_name,
        )
Functions
execute
execute(request: AskCustomerRequest) -> AskCustomerResponse

Answer a free-text question about a customer.

Business Context: Retrieves full customer context from DuckDB, passes question + context to the LLM, validates the answer, and returns a structured response with scope_exceeded flag for out-of-context questions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `request` | `AskCustomerRequest` | Contains customer_id and the question to answer. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `AskCustomerResponse` | AskCustomerResponse with grounded answer and audit metadata. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the customer is not found or has already churned. |

Source code in src/application/use_cases/ask_customer_question.py
def execute(self, request: AskCustomerRequest) -> AskCustomerResponse:
    """Answer a free-text question about a customer.

    Business Context: Retrieves full customer context from DuckDB, passes
    question + context to the LLM, validates the answer, and returns a
    structured response with scope_exceeded flag for out-of-context questions.

    Args:
        request: Contains customer_id and the question to answer.

    Returns:
        AskCustomerResponse with grounded answer and audit metadata.

    Raises:
        ValueError: If the customer is not found or has already churned.
    """
    log = logger.bind(customer_id=request.customer_id)
    log.info("ask.question.start", question=request.question[:100])

    # Validate customer exists
    customer = self._customer_repo.get_by_id(request.customer_id)
    if customer is None:
        raise ValueError(f"Customer {request.customer_id} not found.")
    if not customer.is_active:
        raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

    # Get prediction for context
    prediction = self._predict_use_case.execute(PredictChurnRequest(customer_id=request.customer_id))

    # Build full context (RAG retrieval)
    context = self._summary_uc._build_context(customer, prediction)

    # Call LLM with question
    if hasattr(self._summary_service, "answer_question"):
        raw_answer = self._summary_service.answer_question(context, request.question)
    else:
        # Fallback: use generate with a question-framed prompt
        from src.infrastructure.llm.prompt_builder import PromptBuilder

        _ = PromptBuilder().build_question_prompt(context, request.question)
        raw_answer = self._summary_service.generate(context, "csm")

    log.info("ask.llm.response_received", length=len(raw_answer))

    # Validate + watermark
    final_answer, guardrail = self._guardrails.validate(raw_answer, context)

    # Detect scope-exceeded sentinel
    scope_exceeded = _SCOPE_EXCEEDED_PHRASE.lower() in raw_answer.lower()

    return AskCustomerResponse(
        customer_id=request.customer_id,
        question=request.question,
        answer=final_answer,
        confidence_score=guardrail.confidence_score,
        guardrail_flags=guardrail.flags,
        scope_exceeded=scope_exceeded,
        generated_at=datetime.now(UTC),
        model_used=self._summary_service.model_name,
        llm_provider=self._summary_service.provider_name,
    )

src.application.use_cases.generate_expansion_summary

GenerateExpansionSummaryUseCase — orchestrates the expansion narrative pipeline.

Translates a high-propensity ExpansionResult into an AE tactical brief and optional email draft, validated by ExpansionGuardrailsService before returning.

Classes

PropensityTooLowError

Bases: ValueError

Raised when propensity is below the API-layer minimum threshold (0.15).

Business Context: Accounts with propensity < 0.15 are not expansion candidates. Calling the LLM for these accounts wastes tokens and produces misleading briefs. The API layer maps this to HTTP 422 so callers know the account is not ready.

Source code in src/application/use_cases/generate_expansion_summary.py
class PropensityTooLowError(ValueError):
    """Raised when propensity is below the API-layer minimum threshold (0.15).

    Business Context: Accounts with propensity < 0.15 are not expansion candidates.
    Calling the LLM for these accounts wastes tokens and produces misleading briefs.
    The API layer maps this to HTTP 422 so callers know the account is not ready.
    """
GenerateExpansionSummaryRequest dataclass

Input DTO for GenerateExpansionSummaryUseCase.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer_id` | `str` | UUID of the active customer to generate a brief for. | *required* |
| `audience` | `Literal['account_executive', 'csm']` | `'account_executive'` (tactical brief + optional email) or `'csm'` (nurture brief only; email_draft forced to None). | `'account_executive'` |
| `include_email_draft` | `bool` | If True and audience is `'account_executive'`, the response will include a 3-sentence email draft. | `False` |
Source code in src/application/use_cases/generate_expansion_summary.py
@dataclass
class GenerateExpansionSummaryRequest:
    """Input DTO for GenerateExpansionSummaryUseCase.

    Args:
        customer_id: UUID of the active customer to generate a brief for.
        audience: 'account_executive' (tactical brief + optional email) or
                  'csm' (nurture brief only; email_draft forced to None).
        include_email_draft: If True and audience is 'account_executive',
                             the response will include a 3-sentence email draft.
    """

    customer_id: str
    audience: Literal["account_executive", "csm"] = field(default="account_executive")
    include_email_draft: bool = False
GenerateExpansionSummaryUseCase

Generates a personalised AE brief grounded in the expansion propensity model.

Business Context: Reduces AE prep time from ~20 minutes to 30 seconds. Personalisation via SHAP signals drives 10–15% conversion lift vs generic outreach. The correlation_id in each result enables the data team to join brief quality (fact_confidence) to close rates in the V2 fine-tuning flywheel.

Pipeline
  1. Fetch Customer entity (raises ValueError if not found / churned)
  2. Run PredictExpansionUseCase → ExpansionResult
  3. Propensity < 0.15 → raise PropensityTooLowError (API → 422)
  4. Propensity < 0.35 → return "not ready" result without LLM call
  5. CSM audience override: force include_email_draft=False
  6. Build expansion prompt via PromptBuilder
  7. Call LLM via SummaryPort.generate_from_prompt()
  8. Validate + transform via ExpansionGuardrailsService
  9. Return ExpansionSummaryResult
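
Steps 3-4 implement a two-tier propensity gate. The thresholds (0.15 and 0.35) come from the source; the function shape below is an illustrative condensation, not the real control flow:

```python
MIN_PROPENSITY_FOR_API = 0.15   # below: PropensityTooLowError, mapped to HTTP 422
MIN_PROPENSITY_FOR_LLM = 0.35   # below: "not ready" result, LLM never called

class PropensityTooLowError(ValueError):
    """Account is not an expansion candidate."""

def propensity_gate(propensity: float) -> str:
    if propensity < MIN_PROPENSITY_FOR_API:
        raise PropensityTooLowError(
            f"Propensity {propensity:.2f} is below minimum threshold "
            f"{MIN_PROPENSITY_FOR_API} for expansion brief generation."
        )
    if propensity < MIN_PROPENSITY_FOR_LLM:
        return "not_ready"   # skip the LLM, return a canned result
    return "generate_brief"  # proceed to prompt building + LLM call

print(propensity_gate(0.50))  # generate_brief
print(propensity_gate(0.20))  # not_ready
```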

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `customer_repo` | `CustomerRepository` | Repository for fetching Customer entities. | *required* |
| `expansion_use_case` | `PredictExpansionUseCase` | PredictExpansionUseCase for propensity + SHAP. | *required* |
| `summary_service` | `SummaryPort` | SummaryPort implementation (Groq or Ollama). | *required* |
| `guardrails` | `ExpansionGuardrailsService` | ExpansionGuardrailsService for validation + watermark. | *required* |
Source code in src/application/use_cases/generate_expansion_summary.py
class GenerateExpansionSummaryUseCase:
    """Generates a personalised AE brief grounded in the expansion propensity model.

    Business Context: Reduces AE prep time from ~20 minutes to 30 seconds.
    Personalisation via SHAP signals drives 10–15% conversion lift vs generic
    outreach. The correlation_id in each result enables the data team to join
    brief quality (fact_confidence) to close rates in the V2 fine-tuning flywheel.

    Pipeline:
      1. Fetch Customer entity (raises ValueError if not found / churned)
      2. Run PredictExpansionUseCase → ExpansionResult
      3. Propensity < 0.15 → raise PropensityTooLowError (API → 422)
      4. Propensity < 0.35 → return "not ready" result without LLM call
      5. CSM audience override: force include_email_draft=False
      6. Build expansion prompt via PromptBuilder
      7. Call LLM via SummaryPort.generate_from_prompt()
      8. Validate + transform via ExpansionGuardrailsService
      9. Return ExpansionSummaryResult

    Args:
        customer_repo: Repository for fetching Customer entities.
        expansion_use_case: PredictExpansionUseCase for propensity + SHAP.
        summary_service: SummaryPort implementation (Groq or Ollama).
        guardrails: ExpansionGuardrailsService for validation + watermark.
    """

    def __init__(
        self,
        customer_repo: CustomerRepository,
        expansion_use_case: PredictExpansionUseCase,
        summary_service: SummaryPort,
        guardrails: ExpansionGuardrailsService,
    ) -> None:
        self._customer_repo = customer_repo
        self._expansion_use_case = expansion_use_case
        self._summary_service = summary_service
        self._guardrails = guardrails
        self._prompt_builder = PromptBuilder()

    def execute(self, request: GenerateExpansionSummaryRequest) -> ExpansionSummaryResult:
        """Run the full expansion narrative pipeline for a single customer.

        Business Context: All LLM calls are grounded in verified model outputs
        (ExpansionResult SHAP features). The guardrail layer ensures hallucinated
        signals are caught before the brief reaches an AE's CRM.

        Args:
            request: Contains customer_id, audience, and email draft flag.

        Returns:
            ExpansionSummaryResult with brief, guardrail result, and provenance.

        Raises:
            ValueError: If the customer is not found or has already churned.
            PropensityTooLowError: If propensity < 0.15 (API maps this to 422).
        """
        log = logger.bind(
            customer_id=request.customer_id,
            audience=request.audience,
        )
        log.info("expansion_summary.generate.start")

        # Step 1 — fetch customer
        customer = self._customer_repo.get_by_id(request.customer_id)
        if customer is None:
            raise ValueError(f"Customer {request.customer_id} not found.")
        if not customer.is_active:
            raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

        # Step 2 — run expansion prediction
        expansion_result = self._expansion_use_case.execute(PredictExpansionRequest(customer_id=request.customer_id))
        propensity = expansion_result.propensity.value

        # Step 3 — API-layer propensity gate
        if propensity < _MIN_PROPENSITY_FOR_API:
            raise PropensityTooLowError(
                f"Propensity {propensity:.2f} is below minimum threshold "
                f"{_MIN_PROPENSITY_FOR_API} for expansion brief generation."
            )

        correlation_id = uuid.uuid4().hex

        # Step 4 — LLM propensity gate: return "not ready" without calling the LLM
        if propensity < _MIN_PROPENSITY_FOR_LLM:
            log.info(
                "expansion_summary.not_ready",
                propensity=propensity,
                threshold=_MIN_PROPENSITY_FOR_LLM,
            )
            return self._not_ready_result(expansion_result, correlation_id)

        # Step 5 — CSM audience override: suppress email draft
        include_draft = request.include_email_draft
        if request.audience == "csm":
            if include_draft:
                log.warning(
                    "expansion_summary.csm_email_suppressed",
                    hint="email_draft is not available for csm audience",
                )
            include_draft = False

        # Step 6 — build expansion prompt
        prompt = self._prompt_builder.build_expansion_prompt(
            expansion_result=expansion_result,
            audience=request.audience,
            include_email_draft=include_draft,
        )

        # Step 7 — call LLM
        raw_text = self._summary_service.generate_from_prompt(prompt)
        log.info("expansion_summary.llm.response_received", length=len(raw_text))

        # Step 8 — parse email draft from LLM output (if requested)
        ae_brief_raw, email_draft_raw = self._split_llm_output(raw_text, include_draft)

        # Step 9 — validate + transform via guardrails
        guardrail_result = self._guardrails.validate(
            ae_tactical_brief=ae_brief_raw,
            email_draft=email_draft_raw,
            expansion_result=expansion_result,
            propensity=propensity,
        )
        if guardrail_result.guardrail_status != "PASSED":
            log.warning(
                "expansion_summary.guardrail.flags",
                status=guardrail_result.guardrail_status,
                flags=guardrail_result.flags,
                confidence=guardrail_result.fact_confidence,
            )

        return self._build_result(
            request=request,
            expansion_result=expansion_result,
            guardrail_result=guardrail_result,
            correlation_id=correlation_id,
        )

    # ── helpers ───────────────────────────────────────────────────────────────

    def _not_ready_result(self, expansion_result: ExpansionResult, correlation_id: str) -> ExpansionSummaryResult:
        """Return a 'not ready' result without invoking the LLM."""
        propensity = expansion_result.propensity.value
        tier = str(expansion_result.propensity.tier.value)
        target = expansion_result.target.next_tier.value if expansion_result.target.next_tier else "N/A"
        return ExpansionSummaryResult(
            customer_id=expansion_result.customer_id,
            propensity_summary=(
                f"Account not ready for outreach — propensity {propensity:.0%} "
                f"is below the {_MIN_PROPENSITY_FOR_LLM:.0%} threshold."
            ),
            key_narrative_drivers=[],
            ae_tactical_brief=(
                f"Account not ready for outreach. "
                f"Propensity {propensity:.0%} ({tier}) is below the activation "
                f"threshold. Monitor for rising usage signals before scheduling "
                f"an upgrade conversation toward {target}.\n\n{WATERMARK}"
            ),
            email_draft=None,
            guardrail_status="PASSED",
            fact_confidence=1.0,
            generated_at=datetime.now(UTC),
            model_used=self._summary_service.model_name,
            llm_provider=self._summary_service.provider_name,
            propensity_score=propensity,
            propensity_tier=tier,
            target_tier=target if target != "N/A" else None,
            expected_arr_uplift=expansion_result.expected_arr_uplift,
            correlation_id=correlation_id,
        )

    def _split_llm_output(self, raw_text: str, include_draft: bool) -> tuple[str, str | None]:
        """Split LLM output into ae_brief and optional email_draft.

        The prompt instructs the LLM to label the email section as [EMAIL_DRAFT]:
        when include_email_draft=True. If the section is absent, email_draft is None.
        """
        if not include_draft or "[EMAIL_DRAFT]" not in raw_text:
            return raw_text.strip(), None

        parts = raw_text.split("[EMAIL_DRAFT]", maxsplit=1)
        ae_brief = parts[0].strip()
        email_draft = parts[1].strip() if len(parts) > 1 else None
        return ae_brief, email_draft

    def _build_result(
        self,
        request: GenerateExpansionSummaryRequest,
        expansion_result: ExpansionResult,
        guardrail_result: ExpansionGuardrailResult,
        correlation_id: str,
    ) -> ExpansionSummaryResult:
        """Assemble the final ExpansionSummaryResult from pipeline outputs."""
        propensity = expansion_result.propensity.value
        tier = str(expansion_result.propensity.tier.value)
        target = expansion_result.target.next_tier.value if expansion_result.target.next_tier else None

        key_drivers = [_FEATURE_LABELS.get(f.feature_name, f.feature_name) for f in expansion_result.top_features[:3]]

        propensity_summary = (
            f"This account has {tier.upper()} expansion propensity "
            f"({propensity:.0%}) toward {target or 'the next tier'}. "
            f"Expected ARR uplift: ${expansion_result.expected_arr_uplift:,.0f}."
        )

        # CSM audience never includes an email draft — enforce at result level.
        email_draft = guardrail_result.email_draft if request.audience == "account_executive" else None

        return ExpansionSummaryResult(
            customer_id=request.customer_id,
            propensity_summary=propensity_summary,
            key_narrative_drivers=key_drivers,
            ae_tactical_brief=guardrail_result.ae_tactical_brief,
            email_draft=email_draft,
            guardrail_status=guardrail_result.guardrail_status,
            fact_confidence=guardrail_result.fact_confidence,
            generated_at=datetime.now(UTC),
            model_used=self._summary_service.model_name,
            llm_provider=self._summary_service.provider_name,
            propensity_score=propensity,
            propensity_tier=tier,
            target_tier=target,
            expected_arr_uplift=expansion_result.expected_arr_uplift,
            correlation_id=correlation_id,
        )
Functions
execute
execute(request: GenerateExpansionSummaryRequest) -> ExpansionSummaryResult

Run the full expansion narrative pipeline for a single customer.

Business Context: All LLM calls are grounded in verified model outputs (ExpansionResult SHAP features). The guardrail layer ensures hallucinated signals are caught before the brief reaches an AE's CRM.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | GenerateExpansionSummaryRequest | Contains customer_id, audience, and email draft flag. | required |

Returns:

| Type | Description |
| --- | --- |
| ExpansionSummaryResult | ExpansionSummaryResult with brief, guardrail result, and provenance. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If the customer is not found or has already churned. |
| PropensityTooLowError | If propensity < 0.15 (API maps this to 422). |

Source code in src/application/use_cases/generate_expansion_summary.py
def execute(self, request: GenerateExpansionSummaryRequest) -> ExpansionSummaryResult:
    """Run the full expansion narrative pipeline for a single customer.

    Business Context: All LLM calls are grounded in verified model outputs
    (ExpansionResult SHAP features). The guardrail layer ensures hallucinated
    signals are caught before the brief reaches an AE's CRM.

    Args:
        request: Contains customer_id, audience, and email draft flag.

    Returns:
        ExpansionSummaryResult with brief, guardrail result, and provenance.

    Raises:
        ValueError: If the customer is not found or has already churned.
        PropensityTooLowError: If propensity < 0.15 (API maps this to 422).
    """
    log = logger.bind(
        customer_id=request.customer_id,
        audience=request.audience,
    )
    log.info("expansion_summary.generate.start")

    # Step 1 — fetch customer
    customer = self._customer_repo.get_by_id(request.customer_id)
    if customer is None:
        raise ValueError(f"Customer {request.customer_id} not found.")
    if not customer.is_active:
        raise ValueError(f"Customer {request.customer_id} has already churned on {customer.churn_date}.")

    # Step 2 — run expansion prediction
    expansion_result = self._expansion_use_case.execute(PredictExpansionRequest(customer_id=request.customer_id))
    propensity = expansion_result.propensity.value

    # Step 3 — API-layer propensity gate
    if propensity < _MIN_PROPENSITY_FOR_API:
        raise PropensityTooLowError(
            f"Propensity {propensity:.2f} is below minimum threshold "
            f"{_MIN_PROPENSITY_FOR_API} for expansion brief generation."
        )

    correlation_id = uuid.uuid4().hex

    # Step 4 — LLM propensity gate: return "not ready" without calling the LLM
    if propensity < _MIN_PROPENSITY_FOR_LLM:
        log.info(
            "expansion_summary.not_ready",
            propensity=propensity,
            threshold=_MIN_PROPENSITY_FOR_LLM,
        )
        return self._not_ready_result(expansion_result, correlation_id)

    # Step 5 — CSM audience override: suppress email draft
    include_draft = request.include_email_draft
    if request.audience == "csm":
        if include_draft:
            log.warning(
                "expansion_summary.csm_email_suppressed",
                hint="email_draft is not available for csm audience",
            )
        include_draft = False

    # Step 6 — build expansion prompt
    prompt = self._prompt_builder.build_expansion_prompt(
        expansion_result=expansion_result,
        audience=request.audience,
        include_email_draft=include_draft,
    )

    # Step 7 — call LLM
    raw_text = self._summary_service.generate_from_prompt(prompt)
    log.info("expansion_summary.llm.response_received", length=len(raw_text))

    # Step 8 — parse email draft from LLM output (if requested)
    ae_brief_raw, email_draft_raw = self._split_llm_output(raw_text, include_draft)

    # Step 9 — validate + transform via guardrails
    guardrail_result = self._guardrails.validate(
        ae_tactical_brief=ae_brief_raw,
        email_draft=email_draft_raw,
        expansion_result=expansion_result,
        propensity=propensity,
    )
    if guardrail_result.guardrail_status != "PASSED":
        log.warning(
            "expansion_summary.guardrail.flags",
            status=guardrail_result.guardrail_status,
            flags=guardrail_result.flags,
            confidence=guardrail_result.fact_confidence,
        )

    return self._build_result(
        request=request,
        expansion_result=expansion_result,
        guardrail_result=guardrail_result,
        correlation_id=correlation_id,
    )
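
The two gates in steps 3 and 4 can be sketched as a pure decision function. The 0.15 API threshold matches the docstring above; the value of _MIN_PROPENSITY_FOR_LLM is not shown in this listing, so the 0.40 below is purely an illustrative placeholder:

```python
MIN_PROPENSITY_FOR_API = 0.15  # documented API gate (maps to HTTP 422)
MIN_PROPENSITY_FOR_LLM = 0.40  # placeholder — the real constant is defined elsewhere


class PropensityTooLowError(ValueError):
    """Raised when propensity is below the API threshold."""


def gate(propensity: float) -> str:
    """Return which pipeline branch a propensity score takes."""
    if propensity < MIN_PROPENSITY_FOR_API:
        raise PropensityTooLowError(f"Propensity {propensity:.2f} below API minimum.")
    if propensity < MIN_PROPENSITY_FOR_LLM:
        return "not_ready"  # short-circuit without calling the LLM
    return "generate"       # proceed to prompt build + LLM call
```

Keeping the gate ahead of the LLM call means low-propensity accounts never incur inference latency or cost.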

Infrastructure — LLM Adapters

src.infrastructure.llm.groq_summary_service

GroqSummaryService – primary LLM backend via Groq Cloud API.

Uses llama-3.1-8b-instant by default: fast inference, free tier, 128k context. Temperature is kept low (0.2) for factual grounding in production.

Classes

GroqSummaryService

Bases: SummaryPort

Implements SummaryPort using Groq Cloud inference API.

Business Context: Groq provides sub-3-second inference for Llama-3 models on free tier, making it cost-effective for per-request executive summaries. No infrastructure to manage — just an API key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| api_key | str | Groq API key (from GROQ_API_KEY environment variable). | required |
| model | str | Groq model ID. Defaults to 'llama-3.1-8b-instant' for speed; use 'llama-3.1-70b-versatile' for higher quality at higher cost. | 'llama-3.1-8b-instant' |
Source code in src/infrastructure/llm/groq_summary_service.py
class GroqSummaryService(SummaryPort):
    """Implements SummaryPort using Groq Cloud inference API.

    Business Context: Groq provides sub-3-second inference for Llama-3 models
    on free tier, making it cost-effective for per-request executive summaries.
    No infrastructure to manage — just an API key.

    Args:
        api_key: Groq API key (from GROQ_API_KEY environment variable).
        model: Groq model ID. Defaults to 'llama-3.1-8b-instant' for speed;
               use 'llama-3.1-70b-versatile' for higher quality at higher cost.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "llama-3.1-8b-instant",
    ) -> None:
        self._client = groq_sdk.Groq(api_key=api_key)
        self._model = model
        self._prompt_builder = PromptBuilder()

    def generate(self, context: SummaryContext, audience: str) -> str:
        """Call Groq API and return the raw LLM-generated narrative.

        Business Context: Low temperature (0.2) keeps the output factual and
        consistent. max_tokens=400 enforces the 3-5 sentence length constraint
        for CSM briefings. Guardrails are applied by the caller.

        Args:
            context: Structured facts from DuckDB that ground the prompt.
            audience: 'csm' or 'executive' — controls tone and focus.

        Returns:
            Raw LLM text string (no watermark; guardrails applied by caller).
        """
        prompt = self._prompt_builder.build_summary_prompt(context, audience)
        try:
            response = self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": _SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=400,
                temperature=0.2,
            )
        except groq_sdk.AuthenticationError as exc:
            raise RuntimeError(
                "Groq API key is missing or invalid. Set GROQ_API_KEY in your environment (Railway Variables or .env)."
            ) from exc
        except groq_sdk.APIError as exc:
            raise RuntimeError(f"Groq API error: {exc}") from exc
        return response.choices[0].message.content or ""

    def generate_from_prompt(self, prompt: str) -> str:
        """Call Groq API with a pre-assembled prompt and return raw LLM text.

        Business Context: Used by the expansion narrative pipeline where the
        full prompt is built by PromptBuilder.build_expansion_prompt() before
        the LLM call. max_tokens=600 accommodates brief + optional email draft.

        Args:
            prompt: Fully assembled prompt string.

        Returns:
            Raw LLM-generated text string (no watermark; guardrails applied by caller).
        """
        try:
            response = self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": _SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=600,
                temperature=0.2,
            )
        except groq_sdk.AuthenticationError as exc:
            raise RuntimeError("Groq API key is missing or invalid. Set GROQ_API_KEY in your environment.") from exc
        except groq_sdk.APIError as exc:
            raise RuntimeError(f"Groq API error: {exc}") from exc
        return response.choices[0].message.content or ""

    def answer_question(self, context: SummaryContext, question: str) -> str:
        """Answer a free-text question about a customer, constrained to DuckDB context.

        Business Context: Used by the RAG chatbot endpoint. The prompt includes
        a strict [CONSTRAINT] block that prevents the LLM from fabricating answers
        to questions outside the available customer data.

        Args:
            context: All structured facts for the customer from DuckDB.
            question: CSM's free-text question (5–500 chars).

        Returns:
            LLM answer string, or the scope-exceeded sentinel phrase.
        """
        prompt = self._prompt_builder.build_question_prompt(context, question)
        try:
            response = self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": _SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=300,
                temperature=0.1,
            )
        except groq_sdk.AuthenticationError as exc:
            raise RuntimeError(
                "Groq API key is missing or invalid. Set GROQ_API_KEY in your environment (Railway Variables or .env)."
            ) from exc
        except groq_sdk.APIError as exc:
            raise RuntimeError(f"Groq API error: {exc}") from exc
        return response.choices[0].message.content or ""

    @property
    def model_name(self) -> str:
        """LLM model identifier reported in ExecutiveSummary provenance."""
        return self._model

    @property
    def provider_name(self) -> str:
        """Inference provider name reported in ExecutiveSummary provenance."""
        return "groq"
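
A minimal wiring sketch, assuming the key is read from the GROQ_API_KEY variable the constructor docstring mentions. The helper name and the commented construction line are illustrative, not the project's actual factory:

```python
import os


def require_groq_key() -> str:
    """Fail fast with an actionable message when GROQ_API_KEY is unset."""
    key = os.environ.get("GROQ_API_KEY", "").strip()
    if not key:
        raise RuntimeError(
            "Groq API key is missing. Set GROQ_API_KEY in your environment "
            "(Railway Variables or .env)."
        )
    return key


# service = GroqSummaryService(api_key=require_groq_key())  # illustrative wiring
```

Failing at startup keeps the missing-key error out of the per-request path, where GroqSummaryService would otherwise surface it as an AuthenticationError on first use.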
Attributes
model_name property
model_name: str

LLM model identifier reported in ExecutiveSummary provenance.

provider_name property
provider_name: str

Inference provider name reported in ExecutiveSummary provenance.

Functions
generate
generate(context: SummaryContext, audience: str) -> str

Call Groq API and return the raw LLM-generated narrative.

Business Context: Low temperature (0.2) keeps the output factual and consistent. max_tokens=400 enforces the 3-5 sentence length constraint for CSM briefings. Guardrails are applied by the caller.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | SummaryContext | Structured facts from DuckDB that ground the prompt. | required |
| audience | str | 'csm' or 'executive' — controls tone and focus. | required |

Returns:

| Type | Description |
| --- | --- |
| str | Raw LLM text string (no watermark; guardrails applied by caller). |

Source code in src/infrastructure/llm/groq_summary_service.py
def generate(self, context: SummaryContext, audience: str) -> str:
    """Call Groq API and return the raw LLM-generated narrative.

    Business Context: Low temperature (0.2) keeps the output factual and
    consistent. max_tokens=400 enforces the 3-5 sentence length constraint
    for CSM briefings. Guardrails are applied by the caller.

    Args:
        context: Structured facts from DuckDB that ground the prompt.
        audience: 'csm' or 'executive' — controls tone and focus.

    Returns:
        Raw LLM text string (no watermark; guardrails applied by caller).
    """
    prompt = self._prompt_builder.build_summary_prompt(context, audience)
    try:
        response = self._client.chat.completions.create(
            model=self._model,
            messages=[
                {"role": "system", "content": _SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            max_tokens=400,
            temperature=0.2,
        )
    except groq_sdk.AuthenticationError as exc:
        raise RuntimeError(
            "Groq API key is missing or invalid. Set GROQ_API_KEY in your environment (Railway Variables or .env)."
        ) from exc
    except groq_sdk.APIError as exc:
        raise RuntimeError(f"Groq API error: {exc}") from exc
    return response.choices[0].message.content or ""
generate_from_prompt
generate_from_prompt(prompt: str) -> str

Call Groq API with a pre-assembled prompt and return raw LLM text.

Business Context: Used by the expansion narrative pipeline where the full prompt is built by PromptBuilder.build_expansion_prompt() before the LLM call. max_tokens=600 accommodates brief + optional email draft.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| prompt | str | Fully assembled prompt string. | required |

Returns:

| Type | Description |
| --- | --- |
| str | Raw LLM-generated text string (no watermark; guardrails applied by caller). |

Source code in src/infrastructure/llm/groq_summary_service.py
def generate_from_prompt(self, prompt: str) -> str:
    """Call Groq API with a pre-assembled prompt and return raw LLM text.

    Business Context: Used by the expansion narrative pipeline where the
    full prompt is built by PromptBuilder.build_expansion_prompt() before
    the LLM call. max_tokens=600 accommodates brief + optional email draft.

    Args:
        prompt: Fully assembled prompt string.

    Returns:
        Raw LLM-generated text string (no watermark; guardrails applied by caller).
    """
    try:
        response = self._client.chat.completions.create(
            model=self._model,
            messages=[
                {"role": "system", "content": _SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            max_tokens=600,
            temperature=0.2,
        )
    except groq_sdk.AuthenticationError as exc:
        raise RuntimeError("Groq API key is missing or invalid. Set GROQ_API_KEY in your environment.") from exc
    except groq_sdk.APIError as exc:
        raise RuntimeError(f"Groq API error: {exc}") from exc
    return response.choices[0].message.content or ""
answer_question
answer_question(context: SummaryContext, question: str) -> str

Answer a free-text question about a customer, constrained to DuckDB context.

Business Context: Used by the RAG chatbot endpoint. The prompt includes a strict [CONSTRAINT] block that prevents the LLM from fabricating answers to questions outside the available customer data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | SummaryContext | All structured facts for the customer from DuckDB. | required |
| question | str | CSM's free-text question (5–500 chars). | required |

Returns:

| Type | Description |
| --- | --- |
| str | LLM answer string, or the scope-exceeded sentinel phrase. |

Source code in src/infrastructure/llm/groq_summary_service.py
def answer_question(self, context: SummaryContext, question: str) -> str:
    """Answer a free-text question about a customer, constrained to DuckDB context.

    Business Context: Used by the RAG chatbot endpoint. The prompt includes
    a strict [CONSTRAINT] block that prevents the LLM from fabricating answers
    to questions outside the available customer data.

    Args:
        context: All structured facts for the customer from DuckDB.
        question: CSM's free-text question (5–500 chars).

    Returns:
        LLM answer string, or the scope-exceeded sentinel phrase.
    """
    prompt = self._prompt_builder.build_question_prompt(context, question)
    try:
        response = self._client.chat.completions.create(
            model=self._model,
            messages=[
                {"role": "system", "content": _SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            max_tokens=300,
            temperature=0.1,
        )
    except groq_sdk.AuthenticationError as exc:
        raise RuntimeError(
            "Groq API key is missing or invalid. Set GROQ_API_KEY in your environment (Railway Variables or .env)."
        ) from exc
    except groq_sdk.APIError as exc:
        raise RuntimeError(f"Groq API error: {exc}") from exc
    return response.choices[0].message.content or ""

src.infrastructure.llm.ollama_summary_service

OllamaSummaryService – local LLM backend via Ollama for offline/dev use.

Ollama runs as a Docker sidecar (dev profile). Zero API cost, fully offline. Latency target: < 15s p95 on CPU. Used when LLM_PROVIDER=ollama.
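
The request body sent to /api/generate can be sketched as a small helper. The field names (model, prompt, stream, options) mirror the payload built in _call_ollama below; the helper itself is illustrative:

```python
def build_generate_payload(model: str, prompt: str) -> dict:
    """Build the non-streaming Ollama /api/generate request body."""
    return {
        "model": model,
        "prompt": prompt,
        "stream": False,  # one JSON object instead of a token stream
        "options": {
            "temperature": 0.2,  # low temperature for factual grounding
            "num_predict": 400,  # cap output length, mirroring Groq's max_tokens
        },
    }
```

Isolating payload construction makes the adapter's request contract testable without a running Ollama instance.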

Classes

OllamaSummaryService

Bases: SummaryPort

Implements SummaryPort using a local Ollama instance.

Business Context: Ollama is the local fallback for development environments without Groq API access. It runs the same Llama-3 family model, ensuring prompt/response behaviour is consistent across environments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| host | str | Ollama API base URL. Defaults to http://localhost:11434. | 'http://localhost:11434' |
| model | str | Ollama model tag. Defaults to 'llama3.1:8b'. | 'llama3.1:8b' |
| timeout | float | HTTP timeout in seconds for the synchronous API call. | 60.0 |
Source code in src/infrastructure/llm/ollama_summary_service.py
class OllamaSummaryService(SummaryPort):
    """Implements SummaryPort using a local Ollama instance.

    Business Context: Ollama is the local fallback for development environments
    without Groq API access. It runs the same Llama-3 family model, ensuring
    prompt/response behaviour is consistent across environments.

    Args:
        host: Ollama API base URL. Defaults to http://localhost:11434.
        model: Ollama model tag. Defaults to 'llama3.1:8b'.
        timeout: HTTP timeout in seconds for the synchronous API call.
    """

    def __init__(
        self,
        host: str = "http://localhost:11434",
        model: str = "llama3.1:8b",
        timeout: float = 60.0,
    ) -> None:
        self._host = host.rstrip("/")
        self._model = model
        self._timeout = timeout
        self._prompt_builder = PromptBuilder()

    def generate(self, context: SummaryContext, audience: str) -> str:
        """Call local Ollama API and return the raw LLM-generated narrative.

        Business Context: Ollama uses the /api/generate endpoint with stream=False
        for simplicity. The same prompt structure as Groq is used to ensure
        output quality is comparable across providers.

        Args:
            context: Structured facts from DuckDB that ground the prompt.
            audience: 'csm' or 'executive' — controls tone and focus.

        Returns:
            Raw LLM text string (no watermark; guardrails applied by caller).

        Raises:
            httpx.HTTPError: If the Ollama service is unreachable.
        """
        prompt = self._prompt_builder.build_summary_prompt(context, audience)
        full_prompt = f"{_SYSTEM_PROMPT}\n\n{prompt}"
        return self._call_ollama(full_prompt)

    def generate_from_prompt(self, prompt: str) -> str:
        """Call local Ollama API with a pre-assembled prompt and return raw LLM text.

        Business Context: Used by the expansion narrative pipeline. Delegates to
        _call_ollama with the system prompt prepended for consistent behaviour
        across providers.

        Args:
            prompt: Fully assembled prompt string.

        Returns:
            Raw LLM-generated text string (no watermark; guardrails applied by caller).
        """
        full_prompt = f"{_SYSTEM_PROMPT}\n\n{prompt}"
        return self._call_ollama(full_prompt)

    def answer_question(self, context: SummaryContext, question: str) -> str:
        """Answer a free-text question using local Ollama inference.

        Args:
            context: All structured facts for the customer from DuckDB.
            question: CSM's free-text question.

        Returns:
            LLM answer string constrained to available context.
        """
        prompt = self._prompt_builder.build_question_prompt(context, question)
        full_prompt = f"{_SYSTEM_PROMPT}\n\n{prompt}"
        return self._call_ollama(full_prompt)

    def _call_ollama(self, prompt: str) -> str:
        """Make a synchronous POST to the Ollama /api/generate endpoint.

        Args:
            prompt: Full prompt string including system instructions.

        Returns:
            Generated text from the model response.
        """
        payload = {
            "model": self._model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": 0.2,
                "num_predict": 400,
            },
        }
        response = httpx.post(
            f"{self._host}/api/generate",
            json=payload,
            timeout=self._timeout,
        )
        response.raise_for_status()
        data = response.json()
        return str(data.get("response", ""))

    @property
    def model_name(self) -> str:
        """LLM model identifier reported in ExecutiveSummary provenance."""
        return self._model

    @property
    def provider_name(self) -> str:
        """Inference provider name reported in ExecutiveSummary provenance."""
        return "ollama"
Attributes
model_name property
model_name: str

LLM model identifier reported in ExecutiveSummary provenance.

provider_name property
provider_name: str

Inference provider name reported in ExecutiveSummary provenance.

Functions
generate
generate(context: SummaryContext, audience: str) -> str

Call local Ollama API and return the raw LLM-generated narrative.

Business Context: Ollama uses the /api/generate endpoint with stream=False for simplicity. The same prompt structure as Groq is used to ensure output quality is comparable across providers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | SummaryContext | Structured facts from DuckDB that ground the prompt. | required |
| audience | str | 'csm' or 'executive' — controls tone and focus. | required |

Returns:

| Type | Description |
| --- | --- |
| str | Raw LLM text string (no watermark; guardrails applied by caller). |

Raises:

| Type | Description |
| --- | --- |
| HTTPError | If the Ollama service is unreachable. |

Source code in src/infrastructure/llm/ollama_summary_service.py
def generate(self, context: SummaryContext, audience: str) -> str:
    """Call local Ollama API and return the raw LLM-generated narrative.

    Business Context: Ollama uses the /api/generate endpoint with stream=False
    for simplicity. The same prompt structure as Groq is used to ensure
    output quality is comparable across providers.

    Args:
        context: Structured facts from DuckDB that ground the prompt.
        audience: 'csm' or 'executive' — controls tone and focus.

    Returns:
        Raw LLM text string (no watermark; guardrails applied by caller).

    Raises:
        httpx.HTTPError: If the Ollama service is unreachable.
    """
    prompt = self._prompt_builder.build_summary_prompt(context, audience)
    full_prompt = f"{_SYSTEM_PROMPT}\n\n{prompt}"
    return self._call_ollama(full_prompt)
generate_from_prompt
generate_from_prompt(prompt: str) -> str

Call local Ollama API with a pre-assembled prompt and return raw LLM text.

Business Context: Used by the expansion narrative pipeline. Delegates to _call_ollama with the system prompt prepended for consistent behaviour across providers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| prompt | str | Fully assembled prompt string. | required |

Returns:

| Type | Description |
| --- | --- |
| str | Raw LLM-generated text string (no watermark; guardrails applied by caller). |

Source code in src/infrastructure/llm/ollama_summary_service.py
def generate_from_prompt(self, prompt: str) -> str:
    """Call local Ollama API with a pre-assembled prompt and return raw LLM text.

    Business Context: Used by the expansion narrative pipeline. Delegates to
    _call_ollama with the system prompt prepended for consistent behaviour
    across providers.

    Args:
        prompt: Fully assembled prompt string.

    Returns:
        Raw LLM-generated text string (no watermark; guardrails applied by caller).
    """
    full_prompt = f"{_SYSTEM_PROMPT}\n\n{prompt}"
    return self._call_ollama(full_prompt)
answer_question
answer_question(context: SummaryContext, question: str) -> str

Answer a free-text question using local Ollama inference.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | SummaryContext | All structured facts for the customer from DuckDB. | required |
| question | str | CSM's free-text question. | required |

Returns:

| Type | Description |
| --- | --- |
| str | LLM answer string constrained to available context. |

Source code in src/infrastructure/llm/ollama_summary_service.py
def answer_question(self, context: SummaryContext, question: str) -> str:
    """Answer a free-text question using local Ollama inference.

    Args:
        context: All structured facts for the customer from DuckDB.
        question: CSM's free-text question.

    Returns:
        LLM answer string constrained to available context.
    """
    prompt = self._prompt_builder.build_question_prompt(context, question)
    full_prompt = f"{_SYSTEM_PROMPT}\n\n{prompt}"
    return self._call_ollama(full_prompt)
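Callers of `answer_question` may want to distinguish a grounded answer from the refusal sentinel that the Q&A prompt instructs the model to emit. A hypothetical caller-side helper (the function name is illustrative, not part of the project's API):

```python
# Sentinel the Q&A prompt instructs the model to emit when the context
# cannot support an answer (see build_question_prompt's [CONSTRAINT] block).
REFUSAL_SENTINEL = "I cannot answer this from the available customer data."


def is_refusal(answer: str) -> bool:
    """True if the LLM declined to answer rather than risk fabricating facts."""
    return REFUSAL_SENTINEL in answer.strip()
```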

src.infrastructure.llm.prompt_builder

PromptBuilder – assembles structured prompts from SummaryContext.

Every prompt contains a [CONTEXT] block with ONLY the facts from DuckDB. The system prompt explicitly constrains the LLM to reference only those facts. This is the primary hallucination-prevention mechanism at the prompt level.

Classes

PromptBuilder

Builds grounded prompts for the LLM from structured SummaryContext data.

Business Context: Prompt structure is the first line of defence against hallucination. By providing a [CONTEXT] block with only verified facts and a [CONSTRAINT] that explicitly forbids extrapolation, we reduce the LLM's tendency to invent figures or feature names.

Source code in src/infrastructure/llm/prompt_builder.py
class PromptBuilder:
    """Builds grounded prompts for the LLM from structured SummaryContext data.

    Business Context: Prompt structure is the first line of defence against
    hallucination. By providing a [CONTEXT] block with only verified facts and
    a [CONSTRAINT] that explicitly forbids extrapolation, we reduce the LLM's
    tendency to invent figures or feature names.
    """

    def build_summary_prompt(self, context: SummaryContext, audience: str) -> str:
        """Assemble a summary generation prompt for the given audience.

        Business Context: CSM prompts focus on actionable tactics; executive
        prompts focus on revenue impact and ROI of intervention. Both include
        the same [CONTEXT] block so the same facts ground both narratives.

        Args:
            context: All verified facts from DuckDB for this customer.
            audience: 'csm' for Customer Success Manager, 'expansion' for a
                Sales/CS expansion briefing, 'executive' for VP/C-suite.

        Returns:
            Complete prompt string ready to send to the LLM.
        """
        ctx_block = self._format_context(context)

        if audience == "csm":
            instruction = (
                "Write a 3-5 sentence briefing for a Customer Success Manager. "
                "Refer to the customer by their industry and plan "
                "(e.g. 'this EdTech Growth customer') — never use their ID or UUID. "
                "Lead with the single most important churn driver in plain business language "
                "(e.g. 'declining product activity' or 'rising support load'). "
                "If the top signals are all healthy, say so and note what to watch. "
                "Include 2 specific recommended actions grounded in the data. "
                "Mention any recent support tickets if present. "
                "Tone: practical, direct, urgent if risk tier is HIGH or CRITICAL. "
                "Never use the words SHAP, shap_impact, feature_name, or any column names."
            )
        elif audience == "expansion":
            instruction = (
                "Write a 3-sentence expansion opportunity briefing for a Sales/CS Manager. "
                "Refer to the customer by their industry and plan — never their ID or UUID. "
                "Sentence 1: state the upgrade propensity tier and target plan, quantifying the "
                "expected ARR uplift opportunity. "
                "Sentence 2: name the top 2 signals driving the upgrade intent "
                "(e.g. premium feature trials, feature requests, or tier-ceiling pressure). "
                "Sentence 3: provide one specific, actionable next step grounded in the data. "
                "Tone: opportunity-focused, ARR-quantified, concise. "
                "Never use the words SHAP, shap_impact, feature_name, column names, or the customer UUID."
            )
        else:  # executive
            instruction = (
                "Write a 3-sentence executive summary for a VP of Customer Success. "
                "Refer to the customer by their industry and plan — never their ID. "
                "Sentence 1: state the ARR at risk and the current churn outlook (risk tier + probability). "
                "Sentence 2: name the single most concerning business signal IF any signal increases churn risk — "
                "if all signals are healthy, explain what is keeping the customer retained. "
                "Sentence 3: state the estimated ROI of CS intervention (10-15% churn reduction on the ARR). "
                "Tone: concise, quantified, boardroom-ready. "
                "Never use the words SHAP, shap_impact, feature_name, column names, or the customer UUID."
            )

        return (
            f"[CONTEXT]\n{ctx_block}\n\n"
            f"[INSTRUCTION]\n{instruction}\n\n"
            f"[CONSTRAINT]\n"
            f"You may ONLY reference facts listed in [CONTEXT]. "
            f"Do not infer, extrapolate, or add information not explicitly stated. "
            f"Do not use phrases like 'I think', 'probably', or 'might be'. "
            f"Do not repeat metric names, column names, or the customer's UUID in your output."
        )

    def build_expansion_prompt(
        self,
        expansion_result: ExpansionResult,
        audience: str,
        include_email_draft: bool = False,
    ) -> str:
        """Assemble an expansion narrative prompt grounded in ExpansionResult facts.

        Business Context: Only injects facts from expansion_result.to_summary_context()
        and the top 3 SHAP features. The LLM cannot reference signals outside
        this set — primary hallucination-prevention at the prompt level.

        Args:
            expansion_result: ExpansionResult entity from the expansion pipeline.
            audience: 'account_executive' (tactical brief + optional email) or
                      'csm' (nurture brief only, no email).
            include_email_draft: If True AND audience is 'account_executive',
                                 append email draft instructions.

        Returns:
            Complete prompt string ready to send to the LLM.
        """
        ctx = expansion_result.to_summary_context()
        top3 = expansion_result.top_features[:3]

        signal_lines = "\n".join(
            "  - {label} (impact: {direction})".format(
                label=_FEATURE_LABELS.get(f.feature_name, f.feature_name),
                direction="positive" if f.shap_impact > 0 else "negative",
            )
            for f in top3
        )
        if not signal_lines:
            signal_lines = "  (no top signals available)"

        ctx_block = (
            f"Customer expansion profile:\n"
            f"  Upgrade propensity: {ctx.get('propensity_score')} "
            f"(tier: {ctx.get('propensity_tier')})\n"
            f"  Target upgrade tier: {ctx.get('target_tier')}\n"
            f"  Expected ARR uplift: {ctx.get('expected_uplift')}\n"
            f"\n"
            f"Top 3 expansion signals (SHAP drivers):\n"
            f"{signal_lines}\n"
        )

        if audience == "account_executive":
            instruction = (
                "Write a 3-sentence AE tactical brief. "
                "Do NOT use the customer's ID or UUID — refer to them by their "
                "upgrade propensity tier and target plan only. "
                "Sentence 1: state the propensity tier and target tier, quantifying "
                "the expected ARR uplift. "
                "Sentence 2: name the top 2 signals driving upgrade intent in plain "
                "business language (e.g. premium feature adoption, feature requests). "
                "Sentence 3: one specific, actionable next step grounded in the data. "
                "Tone: opportunity-focused, ARR-quantified, concise."
            )
            if include_email_draft:
                instruction += (
                    "\n\nThen write a SEPARATE section labelled [EMAIL_DRAFT]: "
                    "a 3-sentence outreach email from the AE to the economic buyer. "
                    "Sentence 1: reference the upgrade opportunity and expected value. "
                    "Sentence 2: connect to the top 2 business signals. "
                    "Sentence 3: one clear call-to-action (schedule a call, etc.). "
                    "Do NOT include any customer UUID, internal ID, or ML terms. "
                    "Close with a professional sign-off."
                )
        else:  # csm
            instruction = (
                "Write a 3-sentence CSM nurture brief. "
                "Do NOT use the customer's ID or UUID. "
                "Sentence 1: state the upgrade propensity and potential ARR uplift. "
                "Sentence 2: describe the top 2 product-usage signals driving intent. "
                "Sentence 3: recommend one nurture action for the next 30 days. "
                "Tone: growth-oriented, product-led, no urgency language."
            )

        return (
            f"[CONTEXT]\n{ctx_block}\n"
            f"[INSTRUCTION]\n{instruction}\n\n"
            f"[CONSTRAINT]\nBe concise. Temperature: 0.2. "
            f"Do not invent signals not listed in [CONTEXT]. "
            f"Do not use the words SHAP, shap_impact, xgboost, lightgbm, "
            f"propensity_score, or any internal column names. "
            f"Do not repeat the customer UUID."
        )

    def build_question_prompt(self, context: SummaryContext, question: str) -> str:
        """Assemble a Q&A prompt that constrains answers to available customer data.

        Business Context: The RAG chatbot answers free-text questions from CSMs
        about a specific customer. The [CONSTRAINT] block prevents the LLM from
        fabricating information not present in the customer's DuckDB history.

        Args:
            context: All verified facts from DuckDB for this customer.
            question: The CSM's free-text question (5–500 characters).

        Returns:
            Complete prompt string ready to send to the LLM.
        """
        ctx_block = self._format_context(context)
        return (
            f"[CONTEXT]\n{ctx_block}\n\n"
            f"[QUESTION]\n{question}\n\n"
            f"[CONSTRAINT]\n"
            f"Answer using ONLY facts in [CONTEXT]. "
            f"If the question cannot be answered from the context, reply exactly: "
            f"'I cannot answer this from the available customer data.'"
        )

    def _format_context(self, context: SummaryContext) -> str:
        """Format a SummaryContext into a structured text block for the prompt.

        Args:
            context: The SummaryContext to format.

        Returns:
            Multi-line string with labelled sections for each data source.
        """
        c = context.customer
        p = context.prediction

        shap_lines = "\n".join(
            "  - {label}: {direction} churn risk  (value: {value})".format(
                label=_FEATURE_LABELS.get(f.feature_name, f.feature_name),
                direction="increases" if f.shap_impact > 0 else "reduces",
                value=(
                    f"{f.feature_value:.0f}"
                    if f.feature_name
                    in (
                        "events_last_30d",
                        "events_last_7d",
                        "total_events",
                        "tenure_days",
                        "days_since_last_event",
                        "retention_signal_count",
                        "integration_connects_first_30d",
                        "tickets_last_30d",
                        "high_priority_tickets",
                    )
                    else f"{f.feature_value:.2f}"
                ),
            )
            for f in p.top_shap_features[:5]
        )

        ticket_lines = (
            "\n".join(
                "  - [{status}] {topic} | priority: {priority} | {age} days ago".format(
                    status=str(t.get("status", "open")).upper(),
                    topic=t.get("topic", "—"),
                    priority=t.get("priority", "—"),
                    age=t.get("age_days", "?"),
                )
                for t in context.open_tickets
            )
            if context.open_tickets
            else "  (none on record)"
        )

        event_lines = (
            "\n".join(f"  - {etype}: {count}" for etype, count in context.events_last_30d_by_type.items())
            if context.events_last_30d_by_type
            else "  (none)"
        )

        gtm_block = (
            f"  stage={context.gtm_opportunity.get('stage')}, amount={context.gtm_opportunity.get('amount')}"
            if context.gtm_opportunity
            else "  (none)"
        )

        early_stage_note = "Yes — in critical onboarding window (first 90 days)" if c.is_early_stage else "No"
        tenure_years = c.tenure_days // 365
        tenure_months = (c.tenure_days % 365) // 30
        # Show whole years + months; fall back to days for accounts
        # under one month old.
        if tenure_years:
            tenure_str = f"{tenure_years}y {tenure_months}mo"
        elif tenure_months:
            tenure_str = f"{tenure_months}mo"
        else:
            tenure_str = f"{c.tenure_days}d"

        return (
            f"Customer Profile:\n"
            f"  Segment: {c.industry} | {c.plan_tier} plan\n"
            f"  MRR: ${c.mrr.amount:,.2f}/mo  |  ARR: ${c.mrr.amount * 12:,.2f}\n"
            f"  Account tenure: {tenure_str}\n"
            f"  Early onboarding stage: {early_stage_note}\n"
            f"\n"
            f"Churn Risk Assessment:\n"
            f"  90-day churn probability: {p.churn_probability.value:.1%}\n"
            f"  Risk tier: {p.churn_probability.risk_tier.upper()}\n"
            f"  Compliance & usage risk: {p.risk_score.value:.1%}\n"
            f"  Recommended action: {p.recommended_action}\n"
            f"  Cohort churn rate (same segment): {context.cohort_churn_rate:.1%}\n"
            f"\n"
            f"Key Business Signals (what is driving this churn outlook):\n"
            f"{shap_lines}\n"
            f"\n"
            f"Product Usage (last 30 days by activity type):\n"
            f"{event_lines}\n"
            f"\n"
            f"Recent Support Tickets (last 90 days, newest first):\n"
            f"{ticket_lines}\n"
            f"\n"
            f"GTM Opportunity:\n"
            f"{gtm_block}\n"
        )
Functions
build_summary_prompt
build_summary_prompt(context: SummaryContext, audience: str) -> str

Assemble a summary generation prompt for the given audience.

Business Context: CSM prompts focus on actionable tactics; executive prompts focus on revenue impact and ROI of intervention. Both include the same [CONTEXT] block so the same facts ground both narratives.

Parameters:

    context (SummaryContext, required): All verified facts from DuckDB for this customer.
    audience (str, required): 'csm' for Customer Success Manager, 'expansion' for a Sales/CS expansion briefing, 'executive' for VP/C-suite.

Returns:

    str: Complete prompt string ready to send to the LLM.

Source code in src/infrastructure/llm/prompt_builder.py
def build_summary_prompt(self, context: SummaryContext, audience: str) -> str:
    """Assemble a summary generation prompt for the given audience.

    Business Context: CSM prompts focus on actionable tactics; executive
    prompts focus on revenue impact and ROI of intervention. Both include
    the same [CONTEXT] block so the same facts ground both narratives.

    Args:
        context: All verified facts from DuckDB for this customer.
        audience: 'csm' for Customer Success Manager, 'expansion' for a
            Sales/CS expansion briefing, 'executive' for VP/C-suite.

    Returns:
        Complete prompt string ready to send to the LLM.
    """
    ctx_block = self._format_context(context)

    if audience == "csm":
        instruction = (
            "Write a 3-5 sentence briefing for a Customer Success Manager. "
            "Refer to the customer by their industry and plan "
            "(e.g. 'this EdTech Growth customer') — never use their ID or UUID. "
            "Lead with the single most important churn driver in plain business language "
            "(e.g. 'declining product activity' or 'rising support load'). "
            "If the top signals are all healthy, say so and note what to watch. "
            "Include 2 specific recommended actions grounded in the data. "
            "Mention any recent support tickets if present. "
            "Tone: practical, direct, urgent if risk tier is HIGH or CRITICAL. "
            "Never use the words SHAP, shap_impact, feature_name, or any column names."
        )
    elif audience == "expansion":
        instruction = (
            "Write a 3-sentence expansion opportunity briefing for a Sales/CS Manager. "
            "Refer to the customer by their industry and plan — never their ID or UUID. "
            "Sentence 1: state the upgrade propensity tier and target plan, quantifying the "
            "expected ARR uplift opportunity. "
            "Sentence 2: name the top 2 signals driving the upgrade intent "
            "(e.g. premium feature trials, feature requests, or tier-ceiling pressure). "
            "Sentence 3: provide one specific, actionable next step grounded in the data. "
            "Tone: opportunity-focused, ARR-quantified, concise. "
            "Never use the words SHAP, shap_impact, feature_name, column names, or the customer UUID."
        )
    else:  # executive
        instruction = (
            "Write a 3-sentence executive summary for a VP of Customer Success. "
            "Refer to the customer by their industry and plan — never their ID. "
            "Sentence 1: state the ARR at risk and the current churn outlook (risk tier + probability). "
            "Sentence 2: name the single most concerning business signal IF any signal increases churn risk — "
            "if all signals are healthy, explain what is keeping the customer retained. "
            "Sentence 3: state the estimated ROI of CS intervention (10-15% churn reduction on the ARR). "
            "Tone: concise, quantified, boardroom-ready. "
            "Never use the words SHAP, shap_impact, feature_name, column names, or the customer UUID."
        )

    return (
        f"[CONTEXT]\n{ctx_block}\n\n"
        f"[INSTRUCTION]\n{instruction}\n\n"
        f"[CONSTRAINT]\n"
        f"You may ONLY reference facts listed in [CONTEXT]. "
        f"Do not infer, extrapolate, or add information not explicitly stated. "
        f"Do not use phrases like 'I think', 'probably', or 'might be'. "
        f"Do not repeat metric names, column names, or the customer's UUID in your output."
    )
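Because every prompt follows the same [CONTEXT] / [INSTRUCTION] / [CONSTRAINT] layout, a test can split a built prompt back into its sections and verify ordering. A hypothetical helper (not part of the project):

```python
import re


def split_prompt_sections(prompt: str) -> dict[str, str]:
    """Split a built prompt into its bracketed sections, preserving order."""
    sections: dict[str, str] = {}
    # Section headers look like "[CONTEXT]" on a line of their own; the
    # capturing group makes re.split interleave names with bodies.
    parts = re.split(r"^\[([A-Z_]+)\]\n", prompt, flags=re.MULTILINE)
    # parts == ["", name1, body1, name2, body2, ...]
    for name, body in zip(parts[1::2], parts[2::2]):
        sections[name] = body.strip()
    return sections
```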
build_expansion_prompt
build_expansion_prompt(expansion_result: ExpansionResult, audience: str, include_email_draft: bool = False) -> str

Assemble an expansion narrative prompt grounded in ExpansionResult facts.

Business Context: Only injects facts from expansion_result.to_summary_context() and the top 3 SHAP features. The LLM cannot reference signals outside this set — primary hallucination-prevention at the prompt level.

Parameters:

    expansion_result (ExpansionResult, required): ExpansionResult entity from the expansion pipeline.
    audience (str, required): 'account_executive' (tactical brief + optional email) or 'csm' (nurture brief only, no email).
    include_email_draft (bool, default False): If True AND audience is 'account_executive', append email draft instructions.

Returns:

    str: Complete prompt string ready to send to the LLM.

Source code in src/infrastructure/llm/prompt_builder.py
def build_expansion_prompt(
    self,
    expansion_result: ExpansionResult,
    audience: str,
    include_email_draft: bool = False,
) -> str:
    """Assemble an expansion narrative prompt grounded in ExpansionResult facts.

    Business Context: Only injects facts from expansion_result.to_summary_context()
    and the top 3 SHAP features. The LLM cannot reference signals outside
    this set — primary hallucination-prevention at the prompt level.

    Args:
        expansion_result: ExpansionResult entity from the expansion pipeline.
        audience: 'account_executive' (tactical brief + optional email) or
                  'csm' (nurture brief only, no email).
        include_email_draft: If True AND audience is 'account_executive',
                             append email draft instructions.

    Returns:
        Complete prompt string ready to send to the LLM.
    """
    ctx = expansion_result.to_summary_context()
    top3 = expansion_result.top_features[:3]

    signal_lines = "\n".join(
        "  - {label} (impact: {direction})".format(
            label=_FEATURE_LABELS.get(f.feature_name, f.feature_name),
            direction="positive" if f.shap_impact > 0 else "negative",
        )
        for f in top3
    )
    if not signal_lines:
        signal_lines = "  (no top signals available)"

    ctx_block = (
        f"Customer expansion profile:\n"
        f"  Upgrade propensity: {ctx.get('propensity_score')} "
        f"(tier: {ctx.get('propensity_tier')})\n"
        f"  Target upgrade tier: {ctx.get('target_tier')}\n"
        f"  Expected ARR uplift: {ctx.get('expected_uplift')}\n"
        f"\n"
        f"Top 3 expansion signals (SHAP drivers):\n"
        f"{signal_lines}\n"
    )

    if audience == "account_executive":
        instruction = (
            "Write a 3-sentence AE tactical brief. "
            "Do NOT use the customer's ID or UUID — refer to them by their "
            "upgrade propensity tier and target plan only. "
            "Sentence 1: state the propensity tier and target tier, quantifying "
            "the expected ARR uplift. "
            "Sentence 2: name the top 2 signals driving upgrade intent in plain "
            "business language (e.g. premium feature adoption, feature requests). "
            "Sentence 3: one specific, actionable next step grounded in the data. "
            "Tone: opportunity-focused, ARR-quantified, concise."
        )
        if include_email_draft:
            instruction += (
                "\n\nThen write a SEPARATE section labelled [EMAIL_DRAFT]: "
                "a 3-sentence outreach email from the AE to the economic buyer. "
                "Sentence 1: reference the upgrade opportunity and expected value. "
                "Sentence 2: connect to the top 2 business signals. "
                "Sentence 3: one clear call-to-action (schedule a call, etc.). "
                "Do NOT include any customer UUID, internal ID, or ML terms. "
                "Close with a professional sign-off."
            )
    else:  # csm
        instruction = (
            "Write a 3-sentence CSM nurture brief. "
            "Do NOT use the customer's ID or UUID. "
            "Sentence 1: state the upgrade propensity and potential ARR uplift. "
            "Sentence 2: describe the top 2 product-usage signals driving intent. "
            "Sentence 3: recommend one nurture action for the next 30 days. "
            "Tone: growth-oriented, product-led, no urgency language."
        )

    return (
        f"[CONTEXT]\n{ctx_block}\n"
        f"[INSTRUCTION]\n{instruction}\n\n"
        f"[CONSTRAINT]\nBe concise. Temperature: 0.2. "
        f"Do not invent signals not listed in [CONTEXT]. "
        f"Do not use the words SHAP, shap_impact, xgboost, lightgbm, "
        f"propensity_score, or any internal column names. "
        f"Do not repeat the customer UUID."
    )
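The `signal_lines` construction above maps each SHAP impact's sign to a plain-language direction. A self-contained sketch of the same formatting; the label map here is a toy stand-in for the project's `_FEATURE_LABELS`:

```python
# Toy stand-in for the project's _FEATURE_LABELS mapping.
FEATURE_LABELS = {
    "premium_feature_trials": "Premium feature trials",
    "feature_requests": "Feature requests",
}


def format_signal_lines(features: list[tuple[str, float]]) -> str:
    """Render (feature_name, shap_impact) pairs as labelled signal lines."""
    lines = "\n".join(
        "  - {label} (impact: {direction})".format(
            label=FEATURE_LABELS.get(name, name),  # fall back to the raw name
            direction="positive" if impact > 0 else "negative",
        )
        for name, impact in features
    )
    return lines or "  (no top signals available)"
```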
build_question_prompt
build_question_prompt(context: SummaryContext, question: str) -> str

Assemble a Q&A prompt that constrains answers to available customer data.

Business Context: The RAG chatbot answers free-text questions from CSMs about a specific customer. The [CONSTRAINT] block prevents the LLM from fabricating information not present in the customer's DuckDB history.

Parameters:

    context (SummaryContext, required): All verified facts from DuckDB for this customer.
    question (str, required): The CSM's free-text question (5–500 characters).

Returns:

    str: Complete prompt string ready to send to the LLM.

Source code in src/infrastructure/llm/prompt_builder.py
def build_question_prompt(self, context: SummaryContext, question: str) -> str:
    """Assemble a Q&A prompt that constrains answers to available customer data.

    Business Context: The RAG chatbot answers free-text questions from CSMs
    about a specific customer. The [CONSTRAINT] block prevents the LLM from
    fabricating information not present in the customer's DuckDB history.

    Args:
        context: All verified facts from DuckDB for this customer.
        question: The CSM's free-text question (5–500 characters).

    Returns:
        Complete prompt string ready to send to the LLM.
    """
    ctx_block = self._format_context(context)
    return (
        f"[CONTEXT]\n{ctx_block}\n\n"
        f"[QUESTION]\n{question}\n\n"
        f"[CONSTRAINT]\n"
        f"Answer using ONLY facts in [CONTEXT]. "
        f"If the question cannot be answered from the context, reply exactly: "
        f"'I cannot answer this from the available customer data.'"
    )
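Putting the pieces together, a minimal stand-alone replica of the Q&A prompt shape. The context block here is a placeholder string rather than a real `_format_context` output:

```python
REFUSAL = "I cannot answer this from the available customer data."


def build_question_prompt(ctx_block: str, question: str) -> str:
    """Assemble a grounded Q&A prompt with an explicit refusal instruction."""
    return (
        f"[CONTEXT]\n{ctx_block}\n\n"
        f"[QUESTION]\n{question}\n\n"
        f"[CONSTRAINT]\n"
        f"Answer using ONLY facts in [CONTEXT]. "
        f"If the question cannot be answered from the context, reply exactly: "
        f"'{REFUSAL}'"
    )
```

Pinning the refusal to an exact string lets downstream code detect a declined answer with a simple substring check instead of a second LLM call.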