"""Demand-driven semantic richness report (advisory; never blocks)."""
from __future__ import annotations

from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from datetime import UTC, datetime
from itertools import combinations
from typing import TYPE_CHECKING, Literal

if TYPE_CHECKING:
    from marivo.semantic.ir import DimensionIR
    from marivo.semantic.reader import SemanticProject
    from marivo.semantic.validator import Registry
@dataclass(frozen=True)
class DemandSignal:
    """Immutable bundle of demand hints used to weight richness gaps.

    Every field is optional; an instance built with no arguments carries no
    demand evidence at all.
    """

    # Free-text sample questions; matched by substring against a gap's terms.
    example_questions: tuple[str, ...] = ()
    # Free-text intents; matched the same way as example questions.
    intents: tuple[str, ...] = ()
    # Semantic refs seen in run history; compared for equality with gap refs.
    run_history_refs: tuple[str, ...] = ()
    # Optional free-text statement of what the build is for.
    build_purpose: str | None = None
@dataclass(frozen=True)
class RichnessGap:
    """One detected shortfall in the semantic model, with its demand scoring."""

    kind: Literal["coverage", "depth"]  # broad gap family
    subkind: str  # label of the specific detector that fired
    refs: tuple[str, ...]  # semantic ids the gap is about
    demand_weight: float  # accumulated demand score for this gap
    demand_evidence: tuple[str, ...]  # labelled reasons behind the score
    suggested_action: str  # human-readable remediation hint

    def to_dict(self) -> dict[str, object]:
        """Return the gap as a plain dict, copying tuple fields into lists."""
        return dict(
            kind=self.kind,
            subkind=self.subkind,
            refs=[*self.refs],
            demand_weight=self.demand_weight,
            demand_evidence=[*self.demand_evidence],
            suggested_action=self.suggested_action,
        )
@dataclass(frozen=True)
class RichnessReport:
    """Immutable result of a richness check: the gaps found plus a timestamp."""

    gaps: tuple[RichnessGap, ...]
    checked_at: str

    def __repr__(self) -> str:
        """Return a one-line summary that points the reader at ``show()``."""
        return f"<RichnessReport gaps={len(self.gaps)}; call .show() to inspect>"

    def render(self) -> str:
        """Return bounded plain-text inspection card without a trailing newline.

        Only the first five gaps are listed; when more exist, one extra line
        states how many were left out. The card always ends with the
        ``checked_at`` value and the list of available inspection calls.
        """
        preview_limit = 5
        total = len(self.gaps)
        card: list[str] = [f"RichnessReport gaps={total}"]
        if total == 0:
            card.append(" (no gaps found)")
        else:
            card.extend(
                f" - {gap.kind}: {', '.join(gap.refs)}"
                for gap in self.gaps[:preview_limit]
            )
            hidden = total - preview_limit
            if hidden > 0:
                card.append(f" ... {hidden} more; call .to_dict() for full list")
        card.append(f"checked_at: {self.checked_at}")
        card.append("available:")
        card.extend(f"- {entry}" for entry in (".render()", ".to_dict()"))
        return "\n".join(card)

    def show(self) -> None:
        """Print render() output followed by a trailing newline and return None."""
        print(self.render())

    def to_dict(self) -> dict[str, object]:
        """Return the full report as a plain dict, with every gap serialised."""
        return {
            "gaps": [gap.to_dict() for gap in self.gaps],
            "checked_at": self.checked_at,
        }
@dataclass(frozen=True)
class RichnessSummary:
    """Compact richness result that carries its gap entries as plain strings."""

    gaps: tuple[str, ...]

    def to_dict(self) -> dict[str, object]:
        """Return a dict whose single ``gaps`` key holds a list copy of the entries."""
        return dict(gaps=[*self.gaps])
def _checked_at() -> str:
return datetime.now(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
def _detect_depth(reg: Registry) -> list[tuple[str, tuple[str, ...]]]:
gaps: list[tuple[str, tuple[str, ...]]] = []
objects = (
list(reg.entities.values())
+ list(reg.dimensions.values())
+ list(reg.measures.values())
+ list(reg.metrics.values())
)
for obj in objects:
ai = obj.ai_context
ref = (obj.semantic_id,)
if not (ai.business_definition and ai.business_definition.strip()):
gaps.append(("missing_business_definition", ref))
if not ai.guardrails:
gaps.append(("missing_guardrails", ref))
if not ai.synonyms:
gaps.append(("missing_synonyms", ref))
if not ai.examples:
gaps.append(("missing_examples", ref))
for metric in reg.metrics.values():
if metric.unit is None:
agg = metric.aggregation
agg_name = agg[0] if isinstance(agg, tuple) else agg
if agg_name in ("count", "count_distinct"):
gaps.append(("missing_unit_count", (metric.semantic_id,)))
else:
gaps.append(("missing_unit", (metric.semantic_id,)))
return gaps
def _detect_coverage(reg: Registry) -> list[tuple[str, tuple[str, ...]]]:
gaps: list[tuple[str, tuple[str, ...]]] = []
metric_datasets: set[str] = set()
for metric in reg.metrics.values():
metric_datasets.update(metric.entities)
fields_by_dataset: dict[str, list[DimensionIR]] = {}
for field_obj in reg.dimensions.values():
fields_by_dataset.setdefault(field_obj.entity, []).append(field_obj)
for dataset in reg.entities.values():
if dataset.semantic_id in metric_datasets:
continue
primary_key = set(dataset.primary_key)
has_measure_like = any(
(not field_obj.is_time_dimension) and (field_obj.name not in primary_key)
for field_obj in fields_by_dataset.get(dataset.semantic_id, [])
)
if has_measure_like:
gaps.append(("fact_table_no_metric", (dataset.semantic_id,)))
related_pairs = {
frozenset((rel.from_entity, rel.to_entity)) for rel in reg.relationships.values()
}
datasets = list(reg.entities.values())
for i in range(len(datasets)):
for j in range(i + 1, len(datasets)):
left, right = datasets[i], datasets[j]
if not (set(left.primary_key) & set(right.primary_key)):
continue
if frozenset((left.semantic_id, right.semantic_id)) in related_pairs:
continue
refs = tuple(sorted((left.semantic_id, right.semantic_id)))
gaps.append(("dataset_shares_keys_no_relationship", refs))
return gaps
# Demand-weight increments, one per kind of evidence that can match a gap.
# Added for each gap ref that also appears in DemandSignal.run_history_refs.
_W_HISTORY = 3.0
# Added for each example question that mentions one of the gap's terms.
_W_EXAMPLE = 1.0
# Added for each intent that mentions one of the gap's terms.
_W_INTENT = 1.0
# Added once when build_purpose mentions one of the gap's terms.
_W_PURPOSE = 0.5
def _gap_terms(
refs: tuple[str, ...],
objects: Mapping[str, object],
fields_by_dataset: Mapping[str, Sequence[DimensionIR]],
) -> set[str]:
terms: set[str] = set()
for ref in refs:
leaf = ref.rsplit(".", 1)[-1]
if leaf:
terms.add(leaf.lower())
obj = objects.get(ref)
if obj is not None:
name = getattr(obj, "name", None)
if name:
terms.add(str(name).lower())
ai = getattr(obj, "ai_context", None)
for synonym in getattr(ai, "synonyms", ()) or ():
terms.add(str(synonym).lower())
for example in getattr(ai, "examples", ()) or ():
terms.add(str(example).lower())
for field_obj in fields_by_dataset.get(ref, ()):
terms.add(field_obj.name.lower())
for synonym in field_obj.ai_context.synonyms:
terms.add(str(synonym).lower())
for example in field_obj.ai_context.examples:
terms.add(str(example).lower())
terms.discard("")
return terms
def _mentions(text: str, terms: set[str]) -> bool:
lowered = text.lower()
return any(term in lowered for term in terms)
def _demand_weight(
refs: tuple[str, ...],
terms: set[str],
demand: DemandSignal | None,
) -> tuple[float, tuple[str, ...]]:
if demand is None:
return 0.0, ()
weight = 0.0
evidence: list[str] = []
history = set(demand.run_history_refs)
for ref in refs:
if ref in history:
weight += _W_HISTORY
evidence.append(f"run_history:{ref}")
for question in demand.example_questions:
if _mentions(question, terms):
weight += _W_EXAMPLE
evidence.append(f"example:{question}")
for intent in demand.intents:
if _mentions(intent, terms):
weight += _W_INTENT
evidence.append(f"intent:{intent}")
if demand.build_purpose and _mentions(demand.build_purpose, terms):
weight += _W_PURPOSE
evidence.append(f"build_purpose:{demand.build_purpose}")
return weight, tuple(evidence)
# Remediation hint for each gap subkind emitted by _detect_coverage and
# _detect_depth. build_richness_report indexes this mapping directly, so a
# subkind without an entry here raises KeyError.
_SUGGESTED_ACTION = {
"fact_table_no_metric": "Declare a metric over this dataset or confirm it is dimension-only.",
"dataset_shares_keys_no_relationship": "Declare a relationship between these datasets or confirm they are independent.",
"missing_business_definition": "Add ai_context.business_definition for reuse and intent matching.",
"missing_guardrails": "Add ai_context.guardrails to record usage constraints.",
"missing_synonyms": "Add ai_context.synonyms for natural-language matching.",
"missing_examples": "Add ai_context.examples (sample questions) to seed demand.",
"missing_unit": (
"Add unit (UCUM case-sensitive code, e.g. 'CNY', '%', '{order}') so analysis "
"payloads and displays carry it. For a tier-1 metric, declare unit= on its "
"measure dimension so every aggregation over it inherits the unit."
),
"missing_unit_count": (
"Count metric: declare a counted-noun annotation like '{order}' (UCUM "
"curly-brace count unit); the entity name is not singularized automatically."
),
}
def build_richness_report(
    project: SemanticProject,
    *,
    demand: DemandSignal | None = None,
) -> RichnessReport:
    """Build the richness report for a project.

    Coverage and depth gaps are detected from the project's registry, scored
    against *demand*, and sorted by descending demand weight, then by kind,
    subkind and refs.

    Args:
        project: Project whose ``_registry`` is inspected.
        demand: Optional demand signal. When given, coverage gaps that
            attract no demand (weight ``0.0``) are left out; depth gaps are
            always kept. When omitted, every gap is reported with weight
            ``0.0`` and no evidence.

    Returns:
        A report of the detected gaps, stamped with the current UTC time.
        A project without a registry yields an empty report.
    """
    reg = project._registry
    if reg is None:
        return RichnessReport(gaps=(), checked_at=_checked_at())

    # Depth detection also reports measures, so they must be resolvable here
    # for their name/synonyms/examples to take part in demand matching.
    # Measures are merged first so that, on an id collision, the entity,
    # dimension or metric that was already resolved for that id still wins.
    objects: dict[str, object] = {
        **reg.measures,
        **reg.entities,
        **reg.dimensions,
        **reg.metrics,
    }

    fields_by_dataset: dict[str, list[DimensionIR]] = {}
    for field_obj in reg.dimensions.values():
        fields_by_dataset.setdefault(field_obj.entity, []).append(field_obj)

    # (kind, detected gaps, whether demand-less gaps are dropped under a signal)
    detections = (
        ("coverage", _detect_coverage(reg), True),
        ("depth", _detect_depth(reg), False),
    )

    gaps: list[RichnessGap] = []
    for kind, detected, needs_demand in detections:
        for subkind, refs in detected:
            terms = _gap_terms(refs, objects, fields_by_dataset)
            weight, evidence = _demand_weight(refs, terms, demand)
            if needs_demand and demand is not None and weight == 0.0:
                continue
            gaps.append(
                RichnessGap(
                    kind=kind,
                    subkind=subkind,
                    refs=refs,
                    demand_weight=weight,
                    demand_evidence=evidence,
                    suggested_action=_SUGGESTED_ACTION[subkind],
                )
            )

    gaps.sort(key=lambda gap: (-gap.demand_weight, gap.kind, gap.subkind, gap.refs))
    return RichnessReport(gaps=tuple(gaps), checked_at=_checked_at())