"""Public DTOs for skill-driven semantic authoring and assessment."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from marivo.render import format_bounded_card, result_repr
from marivo.semantic.ir import (
CsvSourceIR,
EntitySourceIR,
ParquetSourceIR,
TableSourceIR,
)
if TYPE_CHECKING:
from marivo.datasource.metadata import TableMetadata
from marivo.datasource.scan import ColumnProfile as ScanColumnProfile
from marivo.datasource.scan import JoinKeyProbe, ScanReport
Severity = Literal["blocker", "warning", "info"]
IssueKind = Literal[
"missing_evidence",
"stale_metadata_evidence",
"missing_source",
"missing_column",
"missing_prerequisite",
"datasource_unreachable",
"static_check_failed",
"authored_object_invalid",
"unreachable_entity",
"ibis_attribute_shadowing",
"project_load_failed",
]
ReviewStatus = Literal[
"supported",
"needs_input",
"blocked",
]
AuthoringObjectKind = Literal[
"domain",
"entity",
"dimension",
"time_dimension",
"measure",
"metric",
"derived_metric",
"relationship",
]
AuthoringSourceRole = Literal["primary", "from", "to", "component"]
ReadinessEffect = Literal["blocks", "warns", "advisory"]
FileFormat = Literal["parquet", "csv"]
TableSource = TableSourceIR
FileSource = ParquetSourceIR | CsvSourceIR
DatasetSource = EntitySourceIR
@dataclass(frozen=True)
class AuthoringSourceInput:
role: AuthoringSourceRole
datasource: str
source: DatasetSource
columns: tuple[str, ...] = ()
def to_dict(self) -> dict[str, object]:
return {
"role": self.role,
"datasource": self.datasource,
"source": self.source.to_dict(),
"columns": list(self.columns),
}
[docs]
@dataclass(frozen=True)
class AssessmentIssue:
kind: IssueKind
severity: Severity
refs: tuple[str, ...]
message: str
rule_id: str
[docs]
@dataclass(frozen=True)
class AuthoringQuestion:
id: str
decision_kind: str
subject_refs: tuple[str, ...]
prompt: str
reason: str
options: tuple[str, ...] = ()
default_option: str | None = None
readiness_effect: ReadinessEffect = "blocks"
[docs]
@dataclass(frozen=True, repr=False)
class AuthoringAssessment:
status: ReviewStatus
issues: tuple[AssessmentIssue, ...]
questions: tuple[AuthoringQuestion, ...]
def _repr_identity(self) -> str:
return (
f"AuthoringAssessment status={self.status} "
f"issues={len(self.issues)} questions={len(self.questions)}"
)
def render(self) -> str:
issue_rows = [[str(issue.kind), str(issue.severity)] for issue in self.issues]
return format_bounded_card(
identity=self._repr_identity(),
columns=["issue", "severity"],
rows=issue_rows,
row_count=len(self.issues),
preview_truncation_hint="inspect .issues / .questions",
available=(".render()", ".show()"),
)
def __repr__(self) -> str:
return result_repr(self._repr_identity())
def show(self) -> None:
print(self.render())
def derive_status(
issues: tuple[AssessmentIssue, ...],
questions: tuple[AuthoringQuestion, ...],
) -> ReviewStatus:
if any(issue.severity == "blocker" for issue in issues):
return "blocked"
if any(question.readiness_effect == "blocks" for question in questions):
return "blocked"
if any(
issue.kind in {"missing_evidence", "missing_source"} and issue.severity != "info"
for issue in issues
):
return "needs_input"
if any(question.readiness_effect == "warns" for question in questions):
return "needs_input"
return "supported"
def derive_brief_status(
issues: tuple[AssessmentIssue, ...],
questions: tuple[AuthoringQuestion, ...],
) -> BriefStatus:
"""Derive BriefStatus from issues and questions.
Mirrors :func:`derive_status` but returns the ``BriefStatus``
vocabulary (``"sufficient"`` instead of ``"supported"``).
"""
if any(issue.severity == "blocker" for issue in issues):
return "blocked"
if any(question.readiness_effect == "blocks" for question in questions):
return "blocked"
if any(
issue.kind in {"missing_evidence", "missing_source"} and issue.severity != "info"
for issue in issues
):
return "needs_input"
if any(question.readiness_effect == "warns" for question in questions):
return "needs_input"
return "sufficient"
# ---------------------------------------------------------------------------
# Stepwise authoring: Brief DTOs and result objects
# ---------------------------------------------------------------------------
BriefStatus = Literal["sufficient", "needs_input", "blocked"]
RegisteredMatchBasis = Literal[
"name_exact",
"same_source",
"same_column",
"same_endpoints",
"synonym_exact",
]
[docs]
@dataclass(frozen=True)
class RegisteredMatch:
ref: str
basis: RegisteredMatchBasis
[docs]
@dataclass(frozen=True)
class PrimaryKeyCandidate:
columns: tuple[str, ...]
sampled_unique: bool
distinct_ratio: float
[docs]
@dataclass(frozen=True)
class VersioningHints:
snapshot_partition: str | None
cadence_estimate: str | None
validity_pair: tuple[str, str] | None
[docs]
@dataclass(frozen=True)
class DomainBriefSummary:
name: str
default: bool
object_counts: dict[str, int]
# Shared gloss for the common Brief envelope fields. Centralized so the wording
# stays consistent across the eight Brief dataclasses; help('<Brief>') renders
# these via FieldInfo.description.
_STATUS_DOC = (
"Authoring readiness: 'sufficient' (author one object, then verify_object), "
"'needs_input' (answer blocking AuthoringQuestions), or 'blocked' (fix the "
"blocker or record authoring_abandoned)."
)
_QUESTIONS_DOC = "Unresolved business decisions that block authoring until answered."
_ISSUES_DOC = "Structured problems found during preparation."
_MATCHES_DOC = "Already-registered candidates with the basis on which they matched."
_SCAN_DOC = "Scan scope and truncation details for the datasource read."
class _BriefResult:
"""Shared AgentResult rendering for authoring briefs.
Subclasses are frozen dataclasses that expose ``status``, ``questions``,
and ``issues`` and implement ``_repr_identity``. This mixin is local to
the brief family; it is not a cross-module result base.
"""
status: BriefStatus
questions: tuple[AuthoringQuestion, ...]
issues: tuple[AssessmentIssue, ...]
def _repr_identity(self) -> str:
raise NotImplementedError
def render(self) -> str:
return format_bounded_card(
identity=self._repr_identity(),
status=f"questions={len(self.questions)} issues={len(self.issues)}",
available=(".render()", ".show()"),
)
def __repr__(self) -> str:
return result_repr(self._repr_identity())
def show(self) -> None:
print(self.render())
[docs]
@dataclass(frozen=True, repr=False)
class DomainBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
proposed_name: str = field(
metadata={"description": "The domain name passed to prepare_domain."}
)
existing_domains: tuple[DomainBriefSummary, ...] = field(
metadata={"description": "Already-registered domains with descriptions and object counts."}
)
matches: tuple[RegisteredMatch, ...] = field(
metadata={"description": "name_exact or synonym_exact matches against existing domains."}
)
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
def _repr_identity(self) -> str:
return f"DomainBrief proposed_name={self.proposed_name} status={self.status}"
[docs]
@dataclass(frozen=True, repr=False)
class EntityBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
datasource: str = field(
metadata={"description": "Datasource name the entity source reads from."}
)
source: EntitySourceIR = field( # from marivo.datasource.ir
metadata={"description": "Physical source (table or file) for the entity."}
)
domain: str = field(metadata={"description": "Target domain name for the entity."})
table: TableMetadata = field( # from marivo.datasource.metadata
metadata={"description": "Full source metadata including columns and partitions."}
)
column_profiles: tuple[ScanColumnProfile, ...] = field( # from marivo.datasource.scan
metadata={"description": "Bounded-sample profiles for all columns."}
)
primary_key_candidates: tuple[PrimaryKeyCandidate, ...] = field(
metadata={"description": "Columns sampled as unique, candidate primary keys."}
)
versioning_hints: VersioningHints = field(
metadata={"description": "Snapshot, cadence, and validity evidence for the source."}
)
time_like_columns: tuple[str, ...] = field(
metadata={"description": "Columns whose values match temporal formats."}
)
matches: tuple[RegisteredMatch, ...] = field(metadata={"description": _MATCHES_DOC})
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
scan: ScanReport = field( # from marivo.datasource.scan
metadata={"description": _SCAN_DOC}
)
def _repr_identity(self) -> str:
return f"EntityBrief domain={self.domain} datasource={self.datasource} status={self.status}"
def render(self) -> str:
profile_rows = [
[p.name, p.data_type, str(p.distinct_count), str(p.null_count)]
for p in self.column_profiles[:8]
]
parts: list[str] = [f"questions={len(self.questions)} issues={len(self.issues)}"]
if self.primary_key_candidates:
pk_desc = ", ".join(
"(" + ", ".join(c.columns) + f" distinct={c.distinct_ratio:.2f})"
for c in self.primary_key_candidates[:5]
)
parts.append(f"pk_candidates=[{pk_desc}]")
if self.time_like_columns:
parts.append(f"time_like=[{', '.join(self.time_like_columns[:8])}]")
vh = self.versioning_hints
vh_parts: list[str] = []
if vh.snapshot_partition:
vh_parts.append(f"snapshot={vh.snapshot_partition}")
if vh.cadence_estimate:
vh_parts.append(f"cadence={vh.cadence_estimate}")
if vh.validity_pair:
vh_parts.append(f"validity={vh.validity_pair[0]}/{vh.validity_pair[1]}")
if vh_parts:
parts.append(" ".join(vh_parts))
return format_bounded_card(
identity=self._repr_identity(),
status=" ".join(parts),
columns=["column", "type", "distinct", "nulls"],
rows=profile_rows,
row_count=len(self.column_profiles),
preview_truncation_hint="inspect .column_profiles for all columns",
available=(".render()", ".show()"),
)
[docs]
@dataclass(frozen=True, repr=False)
class DimensionBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
entity: str = field(metadata={"description": "Entity ref the dimension column belongs to."})
column: str = field(metadata={"description": "The inspected source column."})
profile: ScanColumnProfile = field( # from marivo.datasource.scan
metadata={"description": "Bounded-sample profile for the column."}
)
value_shape: Literal[
"enum_like", "id_like", "numeric", "boolean_like", "temporal_like", "free_text"
] = field(metadata={"description": "Inferred value shape guiding the dimension kind."})
matches: tuple[RegisteredMatch, ...] = field(metadata={"description": _MATCHES_DOC})
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
scan: ScanReport = field( # from marivo.datasource.scan
metadata={"description": _SCAN_DOC}
)
def _repr_identity(self) -> str:
return f"DimensionBrief entity={self.entity} column={self.column} status={self.status}"
[docs]
@dataclass(frozen=True, repr=False)
class TimeDimensionBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
entity: str = field(metadata={"description": "Entity ref the time column belongs to."})
column: str = field(metadata={"description": "The inspected source column."})
profile: ScanColumnProfile = field(
metadata={"description": "Bounded-sample profile for the column."}
)
detected_formats: tuple[FormatCandidate, ...] = field(
metadata={
"description": "Parse-variant candidates describing which ms.date/datetime/timestamp/strptime/hour_prefix to author."
}
)
value_range: tuple[object | None, object | None] = field(
metadata={"description": "Sample-local (min, max) of the column."}
)
partition_aligned: bool = field(
metadata={"description": "Whether this column is a partition key of the source."}
)
granularity_evidence: str | None = field(
metadata={"description": "Granularity inferred from sampled values, if any."}
)
cadence_estimate: tuple[int, str] | None = field(
metadata={"description": "Sampled interval evidence as (count, unit), if any."}
)
existing_time_dimensions: tuple[str, ...] = field(
metadata={"description": "Time dimensions already registered on this entity."}
)
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
scan: ScanReport = field(metadata={"description": _SCAN_DOC})
def _repr_identity(self) -> str:
return f"TimeDimensionBrief entity={self.entity} column={self.column} status={self.status}"
[docs]
@dataclass(frozen=True)
class DimensionValueFact:
dimension: str
top_values: tuple[tuple[object, int], ...]
[docs]
@dataclass(frozen=True, repr=False)
class MetricBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
entity: str = field(metadata={"description": "Entity ref the measure columns belong to."})
measure_profiles: tuple[ScanColumnProfile, ...] = field(
metadata={"description": "Range, negatives, and null profiles for the measure columns."}
)
filter_dimension_values: tuple[DimensionValueFact, ...] = field(
metadata={"description": "Top values for any filter dimensions."}
)
time_dimensions: tuple[str, ...] = field(
metadata={
"description": "Time dimensions on the entity; empty triggers a ladder-order advisory."
}
)
matches: tuple[RegisteredMatch, ...] = field(metadata={"description": _MATCHES_DOC})
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
scan: ScanReport = field(metadata={"description": _SCAN_DOC})
def _repr_identity(self) -> str:
return f"MetricBrief entity={self.entity} status={self.status}"
[docs]
@dataclass(frozen=True, repr=False)
class MeasureBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
entity: str = field(metadata={"description": "Entity ref the measure column belongs to."})
column: str = field(metadata={"description": "The inspected source column."})
profile: ScanColumnProfile = field( # from marivo.datasource.scan
metadata={"description": "Bounded-sample profile for the column."}
)
additivity_hint: Literal["additive", "non_additive", "semi_additive", "unknown"] = field(
metadata={"description": "Inferred additivity hint based on column type and values."}
)
matches: tuple[RegisteredMatch, ...] = field(metadata={"description": _MATCHES_DOC})
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
scan: ScanReport = field( # from marivo.datasource.scan
metadata={"description": _SCAN_DOC}
)
def _repr_identity(self) -> str:
return f"MeasureBrief entity={self.entity} column={self.column} status={self.status}"
[docs]
@dataclass(frozen=True, repr=False)
class RelationshipBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
from_entity: str = field(metadata={"description": "From-side entity ref."})
to_entity: str = field(metadata={"description": "To-side entity ref."})
keys: tuple[tuple[str, str], ...] = field(
metadata={
"description": "Join-key pairs as (from_key, to_key) matching ms.join_on(left, right)."
}
)
probe: JoinKeyProbe = field( # from marivo.datasource.scan
metadata={"description": "Key match rate, cardinality, and scan reports for the join."}
)
to_entity_versioning: str | None = field(
metadata={"description": "Snapshot or validity interaction note for the to-side entity."}
)
matches: tuple[RegisteredMatch, ...] = field(metadata={"description": _MATCHES_DOC})
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
def _repr_identity(self) -> str:
return f"RelationshipBrief from={self.from_entity} to={self.to_entity} status={self.status}"
[docs]
@dataclass(frozen=True)
class JoinPathFact:
from_ref: str
to_ref: str
relationship: str
cardinality: str
fanout_risk: bool
[docs]
@dataclass(frozen=True, repr=False)
class CrossEntityMetricBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
root_entity: str = field(metadata={"description": "Root entity ref the metric is measured on."})
entities: tuple[str, ...] = field(
metadata={"description": "Target entity refs to join from the root entity."}
)
join_paths: tuple[JoinPathFact, ...] = field(
metadata={"description": "Relationship paths between participating entities."}
)
unreachable_entities: tuple[str, ...] = field(
metadata={"description": "Entities with no relationship path (blocking)."}
)
measure_profiles: tuple[ScanColumnProfile, ...] = field(
metadata={"description": "Profiles for the root-entity measure columns."}
)
root_time_dimensions: tuple[str, ...] = field(
metadata={"description": "Time dimensions on the root entity."}
)
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
scan: ScanReport = field(metadata={"description": _SCAN_DOC})
def _repr_identity(self) -> str:
return f"CrossEntityMetricBrief root_entity={self.root_entity} status={self.status}"
[docs]
@dataclass(frozen=True)
class ComponentFact:
ref: str
role: Literal["numerator", "denominator", "value", "weight"]
additivity: str
composition_kind: str
verification_status: str
unit: str | None
[docs]
@dataclass(frozen=True, repr=False)
class DerivedMetricBrief(_BriefResult):
status: BriefStatus = field(metadata={"description": _STATUS_DOC})
composition_kind: Literal["ratio", "weighted_average", "linear"] = field(
metadata={"description": "Inferred composition type from the supplied components."}
)
components: tuple[ComponentFact, ...] = field(
metadata={"description": "Component metrics with additivity and verification facts."}
)
propagated_verification: str = field(
metadata={"description": "Projected verification status derived from components."}
)
unit_hint: str | None = field(
metadata={"description": "Suggested unit inferred from component units, if any."}
)
matches: tuple[RegisteredMatch, ...] = field(metadata={"description": _MATCHES_DOC})
questions: tuple[AuthoringQuestion, ...] = field(metadata={"description": _QUESTIONS_DOC})
issues: tuple[AssessmentIssue, ...] = field(metadata={"description": _ISSUES_DOC})
authoring_template: str | None = field(
default=None,
metadata={
"description": (
"Ready-to-use ms.ratio(...)/ms.weighted_average(...)/ms.linear(...) "
"call template derived from components. "
"Copy and fill in name= to declare the metric."
)
},
)
def _repr_identity(self) -> str:
return f"DerivedMetricBrief composition={self.composition_kind} status={self.status}"
def render(self) -> str:
parts: list[str] = []
if self.components:
comp_rows = [[c.ref, c.role, c.composition_kind] for c in self.components[:6]]
parts.append(
format_bounded_card(
identity=self._repr_identity(),
status=f"questions={len(self.questions)} issues={len(self.issues)}",
columns=["ref", "role", "composition"],
rows=comp_rows,
row_count=len(self.components),
preview_truncation_hint="inspect .components for all components",
available=(".render()", ".show()"),
)
)
else:
parts.append(
format_bounded_card(
identity=self._repr_identity(),
status=f"questions={len(self.questions)} issues={len(self.issues)}",
available=(".render()", ".show()"),
)
)
if self.authoring_template is not None:
parts.append(f"Template:\n{self.authoring_template}")
return "\n".join(parts)
[docs]
@dataclass(frozen=True)
class VerifyResult:
status: Literal["passed", "failed"]
ref: str
kind: AuthoringObjectKind
issues: tuple[AssessmentIssue, ...]
warnings: tuple[AssessmentIssue, ...]
scan: ScanReport | None
auto_recorded: tuple[str, ...]
def __repr__(self) -> str:
return f"<VerifyResult status={self.status} ref={self.ref} kind={self.kind}>"
def render(self) -> str:
return (
f"VerifyResult status={self.status} ref={self.ref} kind={self.kind} "
f"issues={len(self.issues)} warnings={len(self.warnings)}"
)
def show(self) -> None:
print(self.render())