"""Evidence ledger: provenance DTOs, structural fingerprint, and on-disk store.
Provenance metadata only — never executable expression bodies, never a parallel
semantic definition. Python files remain the only semantic source of truth.
Contracts spec sections 3, 5, 6.
"""
from __future__ import annotations
import hashlib
import json
from collections.abc import Mapping
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from marivo.datasource.metadata import TableMetadata
[docs]
@dataclass(frozen=True)
class DecisionRecord:
    """Immutable provenance record describing one decision.

    Construction fails when ``chosen`` is ``None`` or when ``blast_radius``
    is rejected by ``_validate_blast_radius``.
    """

    decision_kind: str
    chosen: object
    agreement_confidence: str
    qualifying_sources: tuple[str, ...]
    materiality: str
    blast_radius: int
    evidence_fingerprint: str
    question_id: str | None
    decided_at: str
    cited_source: dict[str, object] | None = None
    cited_columns: tuple[str, ...] = ()

    def __post_init__(self) -> None:
        """Check ``chosen`` first, then ``blast_radius``."""
        if self.chosen is None:
            raise ValueError("DecisionRecord.chosen must not be None")
        _validate_blast_radius(self.blast_radius)

    def to_dict(self) -> dict[str, object]:
        """Return the record as a plain dict; tuple fields are emitted as lists."""
        sources_as_list = list(self.qualifying_sources)
        columns_as_list = list(self.cited_columns)
        return {
            "decision_kind": self.decision_kind,
            "chosen": self.chosen,
            "agreement_confidence": self.agreement_confidence,
            "qualifying_sources": sources_as_list,
            "materiality": self.materiality,
            "blast_radius": self.blast_radius,
            "evidence_fingerprint": self.evidence_fingerprint,
            "question_id": self.question_id,
            "decided_at": self.decided_at,
            "cited_source": self.cited_source,
            "cited_columns": columns_as_list,
        }

    @classmethod
    def from_dict(cls, data: Mapping[str, object]) -> DecisionRecord:
        """Build a record from a mapping shaped like the output of ``to_dict``.

        ``cited_source`` and ``cited_columns`` are read with ``data.get`` and
        may be absent; every other key is read with ``data[...]``. A
        ``cited_source`` value that is not a mapping is stored as ``None``.
        """
        # Optional keys are fetched before the required ones; the remaining
        # values are then converted in field order.
        raw_columns = data.get("cited_columns", [])
        raw_source = data.get("cited_source")

        kind = str(data["decision_kind"])
        chosen = data["chosen"]
        confidence = str(data["agreement_confidence"])
        sources = tuple(str(s) for s in data["qualifying_sources"])  # type: ignore[attr-defined]
        materiality = str(data["materiality"])
        radius = _validate_blast_radius(data["blast_radius"])
        fingerprint = str(data["evidence_fingerprint"])

        if data["question_id"] is None:
            question = None
        else:
            question = str(data["question_id"])

        decided = str(data["decided_at"])

        if isinstance(raw_source, Mapping):
            source = dict(raw_source)
        else:
            source = None

        columns = tuple(str(c) for c in raw_columns)  # type: ignore[attr-defined]

        return cls(
            decision_kind=kind,
            chosen=chosen,
            agreement_confidence=confidence,
            qualifying_sources=sources,
            materiality=materiality,
            blast_radius=radius,
            evidence_fingerprint=fingerprint,
            question_id=question,
            decided_at=decided,
            cited_source=source,
            cited_columns=columns,
        )
def _validate_blast_radius(value: object, *, field: str = "DecisionRecord.blast_radius") -> int:
    """Return *value* unchanged when it is a plain, non-negative ``int``.

    Args:
        value: Candidate blast radius.
        field: Label used as the prefix of the error message.

    Raises:
        TypeError: If ``type(value)`` is not exactly ``int``.
        ValueError: If *value* is negative.
    """
    # Exact type comparison rather than isinstance: int subclasses, bool
    # included, do not pass and are reported as a type error.
    if type(value) is int:
        if value >= 0:
            return value
        raise ValueError(f"{field} must be a non-negative int; got {value!r}")
    raise TypeError(
        f"{field} must be a non-negative int; got {type(value).__name__}: {value!r}"
    )
@dataclass(frozen=True)
class RejectedCandidate:
    """Immutable record of a candidate that was rejected, with the reason."""

    decision_kind: str
    candidate: str
    reason: str
    evidence_fingerprint: str
    rejected_at: str

    def to_dict(self) -> dict[str, object]:
        """Return the record as a plain dict, keys in field-declaration order."""
        names = (
            "decision_kind",
            "candidate",
            "reason",
            "evidence_fingerprint",
            "rejected_at",
        )
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, data: Mapping[str, object]) -> RejectedCandidate:
        """Build a record from *data*; every key is required and passed through ``str``."""

        def text(key: str) -> str:
            return str(data[key])

        return cls(
            decision_kind=text("decision_kind"),
            candidate=text("candidate"),
            reason=text("reason"),
            evidence_fingerprint=text("evidence_fingerprint"),
            rejected_at=text("rejected_at"),
        )
def evidence_fingerprint(
    columns: Mapping[str, str],
    table_comment: str | None,
    column_comments: Mapping[str, str | None],
) -> str:
    """Compute the structural, single-tier fingerprint (contracts spec section 6).

    The digest covers the given column names and types, the table comment and
    the per-column comments. Sample values are not an input, so data drift is
    not detected here.

    Args:
        columns: Column name to type string.
        table_comment: Table-level comment, or ``None``.
        column_comments: Column name to comment (or ``None``).

    Returns:
        ``"sha256:"`` followed by the hex SHA-256 of the canonical JSON payload.
    """
    column_entries = [{"name": name, "type": columns[name]} for name in sorted(columns)]
    comment_entries = {name: column_comments[name] for name in sorted(column_comments)}
    # Sorted keys and compact separators make the serialization independent of
    # the insertion order of the input mappings.
    canonical = json.dumps(
        {
            "columns": column_entries,
            "table_comment": table_comment,
            "column_comments": comment_entries,
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = hashlib.sha256(canonical.encode()).hexdigest()
    return f"sha256:{digest}"
@dataclass(frozen=True)
class ObjectEvidence:
    """Immutable evidence bundle for one semantic object.

    Holds the decisions and the rejected candidates recorded for ``semantic_id``.
    """

    semantic_id: str
    authored_at: str
    decisions: tuple[DecisionRecord, ...]
    rejected_candidates: tuple[RejectedCandidate, ...]

    def to_dict(self) -> dict[str, object]:
        """Return the bundle as a plain dict with nested records converted via ``to_dict``."""
        decision_dicts = [decision.to_dict() for decision in self.decisions]
        rejected_dicts = [rejected.to_dict() for rejected in self.rejected_candidates]
        return {
            "semantic_id": self.semantic_id,
            "authored_at": self.authored_at,
            "decisions": decision_dicts,
            "rejected_candidates": rejected_dicts,
        }

    @classmethod
    def from_dict(cls, data: Mapping[str, object]) -> ObjectEvidence:
        """Build a bundle from a mapping shaped like the output of ``to_dict``.

        All four keys are required; nested entries are parsed with the
        ``from_dict`` of their respective record classes.
        """
        semantic_id = str(data["semantic_id"])
        authored_at = str(data["authored_at"])
        decisions = tuple(
            DecisionRecord.from_dict(entry)
            for entry in data["decisions"]  # type: ignore[attr-defined]
        )
        rejected = tuple(
            RejectedCandidate.from_dict(entry)
            for entry in data["rejected_candidates"]  # type: ignore[attr-defined]
        )
        return cls(
            semantic_id=semantic_id,
            authored_at=authored_at,
            decisions=decisions,
            rejected_candidates=rejected,
        )
def _model_of(semantic_id: str) -> str:
    """Return the part of *semantic_id* before its first ``"."`` (all of it if there is none)."""
    model, _, _ = semantic_id.partition(".")
    return model
def _read_object_evidence(path: Path) -> ObjectEvidence:
    """Load and parse the object-evidence JSON file at *path*.

    ``TypeError``, ``ValueError`` and ``KeyError`` raised while reading,
    decoding or converting the file are re-raised as the same base type with
    *path* added to the message and the original exception chained as the
    cause. Other exceptions propagate unchanged.
    """
    try:
        text = path.read_text()
        decoded = json.loads(text)
        return ObjectEvidence.from_dict(decoded)
    except KeyError as exc:
        raise KeyError(f"Invalid evidence ledger object at {path}: missing field {exc}") from exc
    except TypeError as exc:
        raise TypeError(f"Invalid evidence ledger object at {path}: {exc}") from exc
    except ValueError as exc:
        # json.JSONDecodeError is a ValueError subclass, so malformed JSON is
        # reported through this branch as a plain ValueError.
        raise ValueError(f"Invalid evidence ledger object at {path}: {exc}") from exc
class LedgerStore:
    """Canonical-JSON file IO under {state_root}/evidence/{model}/.

    Object evidence is stored one file per semantic id at
    ``evidence/{model}/objects/{semantic_id}.json``. Rejected candidates are
    stored together in a single ``evidence/rejected_candidates.json`` file.
    """

    def __init__(self, state_root: str | Path) -> None:
        """Remember *state_root* as the base directory; nothing is created yet."""
        self._root = Path(state_root)

    def _evidence_dir(self, model: str) -> Path:
        """Return the evidence directory for *model*."""
        return self._root / "evidence" / model

    def _object_path(self, semantic_id: str) -> Path:
        """Return the JSON file path holding the evidence for *semantic_id*."""
        return self._evidence_dir(_model_of(semantic_id)) / "objects" / f"{semantic_id}.json"

    def write_object(self, obj: ObjectEvidence) -> None:
        """Write *obj* as sorted, indented JSON, creating parent directories as needed."""
        path = self._object_path(obj.semantic_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(obj.to_dict(), sort_keys=True, indent=2) + "\n")

    def read_object(self, semantic_id: str) -> ObjectEvidence | None:
        """Return the stored evidence for *semantic_id*, or ``None`` if no file exists."""
        path = self._object_path(semantic_id)
        if not path.exists():
            return None
        return _read_object_evidence(path)

    def iter_object_records(self) -> tuple[ObjectEvidence, ...]:
        """Return every stored object evidence, ordered by sorted directory then file path."""
        return tuple(
            _read_object_evidence(path)
            for objects_dir in sorted(self._root.glob("evidence/*/objects"))
            for path in sorted(objects_dir.glob("*.json"))
        )

    def record_decision(self, semantic_id: str, record: DecisionRecord) -> None:
        """Append a decision record to the object evidence for *semantic_id*.

        Creates the object evidence file if it does not already exist; in that
        case ``authored_at`` is taken from ``record.decided_at``.
        """
        existing = self.read_object(semantic_id)
        if existing is None:
            updated = ObjectEvidence(
                semantic_id=semantic_id,
                authored_at=record.decided_at,
                decisions=(record,),
                rejected_candidates=(),
            )
        else:
            updated = ObjectEvidence(
                semantic_id=existing.semantic_id,
                authored_at=existing.authored_at,
                decisions=(*existing.decisions, record),
                rejected_candidates=existing.rejected_candidates,
            )
        self.write_object(updated)

    def write_rejected_candidate(self, candidate: RejectedCandidate) -> None:
        """Append *candidate* to the project-wide rejected-candidates file.

        All rejected candidates live in one flat list in a dedicated file so
        that ``list_rejected_candidates`` can enumerate them without scanning
        every object evidence file. The file is rewritten in full on each call.

        Note: the current contents are obtained through
        ``list_rejected_candidates``; a file it treats as empty (unreadable,
        malformed, or wrongly shaped) is therefore overwritten with just the
        new candidate.
        """
        path = self._rejected_candidates_path()
        existing = list(self.list_rejected_candidates())
        existing.append(candidate)
        path.parent.mkdir(parents=True, exist_ok=True)
        payload = {"rejected_candidates": [c.to_dict() for c in existing]}
        path.write_text(json.dumps(payload, sort_keys=True, indent=2) + "\n")

    def list_rejected_candidates(self) -> tuple[RejectedCandidate, ...]:
        """Return all rejected candidates recorded in this project.

        Best-effort read: a missing, unreadable, undecodable or malformed file
        yields ``()``, as does a file whose top-level JSON value is not an
        object or whose ``"rejected_candidates"`` value is not a list. Errors
        raised while parsing an individual entry still propagate.
        """
        path = self._rejected_candidates_path()
        if not path.exists():
            return ()
        try:
            payload = json.loads(path.read_text())
        except (json.JSONDecodeError, UnicodeDecodeError, OSError):
            return ()
        # Well-formed JSON of the wrong shape (e.g. a top-level list or null)
        # is treated like malformed JSON instead of failing on ``.get``.
        if not isinstance(payload, dict):
            return ()
        raw = payload.get("rejected_candidates", [])
        if not isinstance(raw, list):
            return ()
        return tuple(RejectedCandidate.from_dict(item) for item in raw)

    def _rejected_candidates_path(self) -> Path:
        """Return the path of the shared rejected-candidates file."""
        return self._root / "evidence" / "rejected_candidates.json"
def is_decision_stale(record: DecisionRecord, metadata: TableMetadata) -> bool:
    """Report whether *record*'s stored fingerprint no longer matches *metadata*.

    The structural fingerprint is recomputed from the cited columns that are
    present in *metadata* (their types and comments) plus the table comment,
    and compared with ``record.evidence_fingerprint``. A decision with no
    cited_source cannot be recomputed and is treated as not stale (contracts
    spec accepts this).
    """
    if record.cited_source is None:
        return False

    wanted = frozenset(record.cited_columns)

    types_by_name = {}
    for column in metadata.columns:
        if column.name in wanted:
            types_by_name[column.name] = column.type

    comments_by_name = {}
    for column in metadata.columns:
        if column.name in wanted:
            comments_by_name[column.name] = column.comment

    recomputed = evidence_fingerprint(types_by_name, metadata.comment, comments_by_name)
    return recomputed != record.evidence_fingerprint