# Source code for marivo.semantic.catalog

"""SemanticCatalog — unified agent-facing read surface for marivo.semantic.

Public entrypoint: ms.load() -> SemanticCatalog
"""

from __future__ import annotations

import contextlib
from collections.abc import Iterable, Iterator, Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Literal, NoReturn

from marivo.datasource.ir import AiContextIR, DatasourceIR, DatasourceSourceLocation
from marivo.datasource.scan import ScanScope
from marivo.preview import (
    METRIC_PREVIEW_SAMPLE_SIZE,
    PREVIEW_DEFAULT_LIMIT,
    PreviewResult,
    PreviewSamplePolicy,
    PreviewWarning,
    preview_ibis_table,
    preview_ibis_value,
    validate_preview_limit,
)
from marivo.refs import SemanticRef
from marivo.render import format_bounded_card, result_repr
from marivo.semantic.constraints import ConstraintId
from marivo.semantic.dtos import DatasetSource
from marivo.semantic.errors import ErrorKind, SemanticLoadFailed, SemanticRuntimeError, _raise
from marivo.semantic.ir import (
    DateParse,
    DatetimeParse,
    DimensionIR,
    DomainIR,
    EntityIR,
    EntityVersioningIR,
    HourPrefixParse,
    LinearComposition,
    MeasureIR,
    MetricIR,
    ParityStatus,
    RelationshipIR,
    SampleIntervalIR,
    SemiAdditive,
    SnapshotVersioningIR,
    SourceLocation,
    SqlProvenance,
    StrptimeParse,
    SymbolKind,
    TimestampParse,
    ValidityVersioningIR,
    additivity_bucket,
    composition_components,
)
from marivo.semantic.parity import propagated_parity_status
from marivo.semantic.refs import as_ref_id, make_ref

if TYPE_CHECKING:
    from marivo.semantic.dtos import VerifyResult
    from marivo.semantic.reader import SemanticProject
    from marivo.semantic.readiness import ReadinessReport
    from marivo.semantic.resolver import SemanticResolver
    from marivo.semantic.validator import Registry

from marivo.semantic.reader import _suggest_ref_level

# SemanticCatalog has a method named list(), which shadows the built-in list
# type inside the class body, so a ``list[SemanticObject]`` return annotation
# written there would refer to the method instead of the built-in.  Use this
# module-level alias to avoid the name collision.
_ListOfSemanticObject = list["SemanticObject"]

# Names exported by ``from marivo.semantic.catalog import *``.  Entries are
# kept in sorted order (uppercase names first, then lowercase).
__all__ = [
    "AiContextView",
    "DatasourceDetails",
    "DerivedMetricDetails",
    "DimensionDetails",
    "DomainDetails",
    "EntityDetails",
    "EntityVersioning",
    "MeasureDetails",
    "MetricDetails",
    "RelationshipDetails",
    "SemanticCatalog",
    "SemanticKind",
    "SemanticKindInput",
    "SemanticObject",
    "SemanticObjectDetails",
    "SemanticObjectList",
    "SemanticRef",
    "SemanticRefInput",
    "SimpleMetricDetails",
    "SnapshotVersioning",
    "TimeDimensionDetails",
    "ValidityVersioning",
    "load",
]

# SemanticKind is a stable alias for the internal SymbolKind enum.
# Both share the same values: domain, datasource, entity, dimension,
# measure, time_dimension, metric, relationship.
SemanticKind = SymbolKind
# The remaining names are plain aliases of IR value types, re-exported under
# catalog-facing names.  Because they are aliases (not subclasses or copies),
# isinstance checks against either name are interchangeable.
AiContextView = AiContextIR
SnapshotVersioning = SnapshotVersioningIR
ValidityVersioning = ValidityVersioningIR
EntityVersioning = EntityVersioningIR


# ---------------------------------------------------------------------------
# Kind-specific details
# ---------------------------------------------------------------------------


def _render_details_card(
    *,
    identity: str,
    status: str | None = None,
    extra_lines: tuple[str, ...] = (),
) -> str:
    """Assemble a plain-text details card with no trailing newline.

    Args:
        identity: First line of the card.
        status: Optional status text; a ``status:`` line is emitted only when
            this is truthy (``None`` and ``""`` are both skipped).
        extra_lines: Pre-formatted lines placed after the status line.

    Returns:
        The card lines joined with newlines, always ending with the fixed
        ``available:`` footer that advertises ``.show()``.
    """
    status_lines = [f"status: {status}"] if status else []
    card = [identity, *status_lines, *extra_lines, "available:", "- .show()"]
    return "\n".join(card)


def _source_location_text(source_location: SourceLocation) -> str:
    """Format a source location as ``<file>:<line>``."""
    return "{}:{}".format(source_location.file, source_location.line)


def _format_ref(ref: SemanticRef | None) -> str:
    """Return the ref's ``id``, or the placeholder ``(none)`` when there is no ref."""
    if ref is None:
        return "(none)"
    return ref.id


def _format_refs(refs: tuple[SemanticRef, ...], *, limit: int = 6) -> str:
    """Render ref ids as a comma-separated list, truncated after ``limit`` entries.

    An empty tuple renders as ``(none)``.  When more than ``limit`` refs are
    given, the surplus is summarised by a trailing ``... (+N more)`` entry.
    """
    if not refs:
        return "(none)"
    shown = [ref.id for ref in refs[:limit]]
    hidden = len(refs) - limit
    if hidden > 0:
        shown.append(f"... (+{hidden} more)")
    return ", ".join(shown)


def _format_tuple_values(values: tuple[str, ...], *, limit: int = 6) -> str:
    """Render strings as a comma-separated list, truncated after ``limit`` entries.

    An empty tuple renders as ``(none)``.  When more than ``limit`` values are
    given, the surplus is summarised by a trailing ``... (+N more)`` entry.
    """
    if not values:
        return "(none)"
    shown = [*values[:limit]]
    hidden = len(values) - limit
    if hidden > 0:
        shown.append(f"... (+{hidden} more)")
    return ", ".join(shown)


def _format_mapping(mapping: dict[str, object] | dict[str, str]) -> str:
    """Render a mapping as ``key: value`` pairs, sorted and comma-separated.

    An empty mapping renders as ``(none)``.
    """
    if not mapping:
        return "(none)"
    rendered_pairs = [f"{key}: {value}" for key, value in sorted(mapping.items())]
    return ", ".join(rendered_pairs)


def _source_text(source: DatasetSource) -> str:
    """Describe a dataset source as text.

    Sources that expose a ``to_dict`` attribute are shown as the string form
    of ``to_dict()``; anything else falls back to ``repr``.
    """
    if not hasattr(source, "to_dict"):
        return repr(source)
    return str(source.to_dict())


def _versioning_text(versioning: EntityVersioning | None) -> str:
    """Return ``repr(versioning)``, or the placeholder ``(none)`` when it is None."""
    return "(none)" if versioning is None else repr(versioning)


def _provenance_text(provenance: SqlProvenance | None) -> str:
    """Summarise SQL provenance on one line, or ``(none)`` when absent.

    The line has the form ``<kind> dialect=<dialect> sql=<repr of sql>``.
    """
    if provenance is None:
        return "(none)"
    parts = (
        f"{provenance.kind}",
        f"dialect={provenance.dialect}",
        f"sql={provenance.sql!r}",
    )
    return " ".join(parts)


def _common_detail_lines(
    *,
    context: AiContextView,
    python_symbol: str,
    source_location: SourceLocation,
    parents: tuple[SemanticRef, ...],
    children: tuple[SemanticRef, ...],
    dependents: tuple[SemanticRef, ...],
) -> list[str]:
    """Build the card lines shared by every details variant.

    The AI-context portion comes first: the business definition and the
    guardrails are always present, while synonyms, examples, instructions and
    owner notes appear only when set.  Guardrails are capped at six entries
    and examples at three; anything past the cap is summarised as
    ``... (+N more)``.  The source location, Python symbol and graph
    neighbours (parents, children, dependents) close the list.

    Returns:
        A new list the caller may extend with kind-specific lines.
    """
    guardrail_cap = 6
    example_cap = 3

    out: list[str] = [
        f"business_definition: {context.business_definition or '(none)'}",
        "guardrails:",
    ]

    guardrails = context.guardrails
    for guardrail in guardrails[:guardrail_cap]:
        out.append(f"- {guardrail}")
    if not guardrails:
        out.append("- (none)")
    elif len(guardrails) > guardrail_cap:
        out.append(f"- ... (+{len(guardrails) - guardrail_cap} more)")

    if context.synonyms:
        out.append(f"synonyms: {_format_tuple_values(context.synonyms)}")

    examples = context.examples
    if examples:
        out.append("examples:")
        for example in examples[:example_cap]:
            out.append(f"- {example}")
        surplus = len(examples) - example_cap
        if surplus > 0:
            out.append(f"- ... (+{surplus} more)")

    if context.instructions:
        out.append(f"instructions: {context.instructions}")
    if context.owner_notes:
        out.append(f"owner_notes: {context.owner_notes}")

    out += [
        f"source_location: {_source_location_text(source_location)}",
        f"python_symbol: {python_symbol or '(none)'}",
        f"parents: {_format_refs(parents)}",
        f"children: {_format_refs(children)}",
        f"dependents: {_format_refs(dependents)}",
    ]
    return out


@dataclass(frozen=True, repr=False)
class _DetailsBase:
    """Fields and display protocol common to every ``*Details`` class.

    Subclasses append their kind-specific fields and must override
    ``render()``; the base implementation only raises.
    """

    ref: SemanticRef
    kind: SemanticKind
    name: str
    domain: str | None
    context: AiContextView
    source_location: SourceLocation
    parents: tuple[SemanticRef, ...]
    children: tuple[SemanticRef, ...]
    dependents: tuple[SemanticRef, ...]
    python_symbol: str

    def _repr_identity(self) -> str:
        """Return ``<ClassName> ref=<ref id>`` using the concrete class name."""
        class_name = self.__class__.__name__
        return f"{class_name} ref={self.ref.id}"

    def __repr__(self) -> str:
        identity = self._repr_identity()
        return result_repr(identity)

    def render(self) -> str:
        """Return the plain-text card; concrete details classes override this."""
        raise NotImplementedError

    def show(self) -> None:
        """Print ``render()`` output and return None."""
        text = self.render()
        print(text)


@dataclass(frozen=True, repr=False)
class DatasourceDetails(_DetailsBase):
    """Kind-specific details for a datasource object.

    Extends the common details fields with the backend type and the
    ``fields`` / ``env_refs`` mappings.
    """

    backend_type: str
    fields: dict[str, object]
    env_refs: dict[str, str]

    def render(self) -> str:
        """Return the plain-text details card for this datasource."""
        card_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        card_lines += [
            f"backend_type: {self.backend_type}",
            f"fields: {_format_mapping(self.fields)}",
            f"env_refs: {_format_mapping(self.env_refs)}",
        ]
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
@dataclass(frozen=True, repr=False)
class DomainDetails(_DetailsBase):
    """Kind-specific details for a domain object.

    Adds the ``default`` flag to the common details fields.
    """

    default: bool

    def render(self) -> str:
        """Return the plain-text details card for this domain."""
        card_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        card_lines += [f"default: {self.default}"]
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
@dataclass(frozen=True, repr=False)
class EntityDetails(_DetailsBase):
    """Kind-specific details for an entity object.

    Adds the owning datasource ref, the dataset source, the primary-key
    columns and the optional versioning descriptor.
    """

    datasource: SemanticRef
    source: DatasetSource
    primary_key: tuple[str, ...]
    versioning: EntityVersioning | None

    def render(self) -> str:
        """Return the plain-text details card for this entity."""
        card_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        card_lines += [
            f"datasource: {self.datasource.id}",
            f"source: {_source_text(self.source)}",
            f"primary_key: {_format_tuple_values(self.primary_key)}",
            f"versioning: {_versioning_text(self.versioning)}",
        ]
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
@dataclass(frozen=True, repr=False)
class DimensionDetails(_DetailsBase):
    """Kind-specific details for a categorical dimension object.

    Adds the ref of the entity the dimension belongs to.
    """

    entity: SemanticRef

    def render(self) -> str:
        """Return the plain-text details card for this dimension."""
        card_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        card_lines += [f"entity: {self.entity.id}"]
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
@dataclass(frozen=True, repr=False)
class MeasureDetails(_DetailsBase):
    """Kind-specific details for a row-level quantitative measure object.

    Adds the owning entity ref, the additivity bucket and an optional unit.
    """

    entity: SemanticRef
    additivity: Literal["additive", "semi_additive", "non_additive"]
    unit: str | None

    def render(self) -> str:
        """Return the plain-text details card; the unit line appears only when set."""
        card_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        card_lines.append(f"entity: {self.entity.id}")
        card_lines.append(f"additivity: {self.additivity}")
        if self.unit:
            card_lines.append(f"unit: {self.unit}")
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
@dataclass(frozen=True, repr=False)
class TimeDimensionDetails(_DetailsBase):
    """Kind-specific details for a time dimension object.

    Adds the owning entity ref and the time-parsing metadata: parse kind,
    data type, granularity, format, timezone, default flag and sample
    interval.
    """

    entity: SemanticRef
    parse_kind: Literal["date", "datetime", "timestamp", "strptime", "hour_prefix"] | None
    data_type: str | None
    granularity: str | None
    format: str | None
    timezone: str | None
    is_default: bool
    sample_interval: SampleIntervalIR | None

    def render(self) -> str:
        """Return the plain-text details card for this time dimension.

        A missing parse kind is shown as ``(inferred)`` and a missing sample
        interval as ``(none)``; ``format`` and ``timezone`` are shown via
        ``repr`` so that ``None`` stays distinguishable from a string.
        """
        card_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        parse_kind_text = self.parse_kind or "(inferred)"
        if self.sample_interval:
            interval_text = self.sample_interval.to_token()
        else:
            interval_text = "(none)"
        card_lines += [
            f"entity: {self.entity.id}",
            f"parse_kind: {parse_kind_text}",
            f"granularity: {self.granularity}",
            f"format: {self.format!r}",
            f"timezone: {self.timezone!r}",
            f"is_default: {self.is_default}",
            f"sample_interval: {interval_text}",
        ]
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
def _metric_common_lines(
    *,
    entities: tuple[SemanticRef, ...],
    root_entity: SemanticRef | None,
    metric_type: Literal["simple", "derived"],
    additivity: Literal["additive", "semi_additive", "non_additive"],
    fold: str | None,
    status_time_dimension: str | None,
    fanout_policy: Literal["block", "aggregate_then_join"],
    unit: str | None,
    provenance: SqlProvenance | None,
    parity_status: ParityStatus,
) -> list[str]:
    """Build the card lines shared by the simple and derived metric details.

    The ``fold`` line is emitted only when ``fold`` is not None, and the
    ``unit`` line only when ``unit`` is truthy; every other line is always
    present.
    """
    fold_lines = [] if fold is None else [f"fold: {fold} over {status_time_dimension}"]
    unit_lines = [f"unit: {unit}"] if unit else []
    return [
        f"entities: {_format_refs(entities)}",
        f"root_entity: {_format_ref(root_entity)}",
        f"type: {metric_type}",
        f"additivity: {additivity}",
        *fold_lines,
        f"fanout_policy: {fanout_policy}",
        *unit_lines,
        f"provenance: {_provenance_text(provenance)}",
        f"parity_status: {parity_status}",
    ]
@dataclass(frozen=True, repr=False)
class SimpleMetricDetails(_DetailsBase):
    """Details for a simple (entity-backed) metric.

    Simple metrics are declared with ``@ms.metric(...)`` or
    ``ms.aggregate(...)``.  Aggregation and measure reference are both
    optional, and this class carries no composition, components, or
    linear_terms fields.
    """

    entities: tuple[SemanticRef, ...]
    root_entity: SemanticRef | None
    aggregation: str | None
    measure: SemanticRef | None
    additivity: Literal["additive", "semi_additive", "non_additive"]
    fold: str | None
    status_time_dimension: str | None
    fanout_policy: Literal["block", "aggregate_then_join"]
    unit: str | None
    provenance: SqlProvenance | None
    parity_status: ParityStatus

    @property
    def metric_type(self) -> Literal["simple"]:
        """Constant discriminator for this details variant."""
        return "simple"

    def render(self) -> str:
        """Return the plain-text details card for this simple metric.

        The aggregation and measure lines are appended only when the
        corresponding field is not None.
        """
        shared_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        metric_lines = _metric_common_lines(
            entities=self.entities,
            root_entity=self.root_entity,
            metric_type=self.metric_type,
            additivity=self.additivity,
            fold=self.fold,
            status_time_dimension=self.status_time_dimension,
            fanout_policy=self.fanout_policy,
            unit=self.unit,
            provenance=self.provenance,
            parity_status=self.parity_status,
        )
        card_lines = [*shared_lines, *metric_lines]
        if self.aggregation is not None:
            card_lines.append(f"aggregation: {self.aggregation}")
        if self.measure is not None:
            card_lines.append(f"measure: {self.measure.id}")
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
@dataclass(frozen=True, repr=False)
class DerivedMetricDetails(_DetailsBase):
    """Details for a derived (composed) metric.

    Derived metrics are declared with ``ms.ratio(...)``,
    ``ms.weighted_average(...)``, or ``ms.linear(...)``.  They carry a
    composition kind and its components; this class has no aggregation or
    measure fields.
    """

    entities: tuple[SemanticRef, ...]
    root_entity: SemanticRef | None
    composition: Literal["ratio", "weighted_average", "linear"]
    components: tuple[tuple[str, SemanticRef], ...]
    linear_terms: tuple[tuple[str, str], ...]
    required_relationships: tuple[SemanticRef, ...]
    additivity: Literal["additive", "semi_additive", "non_additive"]
    fold: str | None
    status_time_dimension: str | None
    fanout_policy: Literal["block", "aggregate_then_join"]
    unit: str | None
    provenance: SqlProvenance | None
    parity_status: ParityStatus

    @property
    def metric_type(self) -> Literal["derived"]:
        """Constant discriminator for this details variant."""
        return "derived"

    def render(self) -> str:
        """Return the plain-text details card for this derived metric.

        The composition line is always present; components, linear terms and
        required relationships are listed only when non-empty.
        """
        shared_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        metric_lines = _metric_common_lines(
            entities=self.entities,
            root_entity=self.root_entity,
            metric_type=self.metric_type,
            additivity=self.additivity,
            fold=self.fold,
            status_time_dimension=self.status_time_dimension,
            fanout_policy=self.fanout_policy,
            unit=self.unit,
            provenance=self.provenance,
            parity_status=self.parity_status,
        )
        card_lines = [*shared_lines, *metric_lines, f"composition: {self.composition}"]
        if self.components:
            component_text = ", ".join(f"{role}={ref.id}" for role, ref in self.components)
            card_lines.append("components: " + component_text)
        if self.linear_terms:
            term_text = ", ".join(f"{sign}{metric}" for sign, metric in self.linear_terms)
            card_lines.append("linear_terms: " + term_text)
        if self.required_relationships:
            card_lines.append(
                f"required_relationships: {_format_refs(self.required_relationships)}"
            )
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
# Either metric details variant; the ``metric_type`` property on each class
# ("simple" vs "derived") tells them apart.
MetricDetails = SimpleMetricDetails | DerivedMetricDetails
@dataclass(frozen=True, repr=False)
class RelationshipDetails(_DetailsBase):
    """Kind-specific details for a relationship between two entities.

    ``from_keys`` and ``to_keys`` are parallel tuples: the key at position
    *i* on the from-side joins to the key at position *i* on the to-side.
    """

    from_entity: SemanticRef
    to_entity: SemanticRef
    from_keys: tuple[str, ...]
    to_keys: tuple[str, ...]

    def __post_init__(self) -> None:
        # Compatibility note: from_keys / to_keys are no longer stored
        # directly on RelationshipIR, but RelationshipDetails keeps exposing
        # them for catalog consumers.  _build_relationship_object fills them
        # in from the JoinKey pairs, so there is nothing to do here.
        pass

    def render(self) -> str:
        """Return the plain-text details card for this relationship.

        Raises:
            ValueError: If ``from_keys`` and ``to_keys`` differ in length
                (the pairs are zipped with ``strict=True``).
        """
        card_lines = _common_detail_lines(
            context=self.context,
            python_symbol=self.python_symbol,
            source_location=self.source_location,
            parents=self.parents,
            children=self.children,
            dependents=self.dependents,
        )
        card_lines.append(f"from: {self.from_entity.id}")
        card_lines.append(f"to: {self.to_entity.id}")
        key_pairs = zip(self.from_keys, self.to_keys, strict=True)
        pair_text = ", ".join(f"{left}={right}" for left, right in key_pairs)
        card_lines.append("join_keys: " + pair_text)
        return _render_details_card(
            identity=self._repr_identity(),
            status=self.context.business_definition,
            extra_lines=tuple(card_lines),
        )
# Union of every kind-specific details class; this is the declared type of
# SemanticObject._details and the return type of SemanticObject.details().
SemanticObjectDetails = (
    DatasourceDetails
    | DomainDetails
    | EntityDetails
    | DimensionDetails
    | MeasureDetails
    | TimeDimensionDetails
    | MetricDetails
    | RelationshipDetails
)
@dataclass(frozen=True, repr=False)
class SemanticObject:
    """Single read shape for all loaded semantic objects.

    Args:
        ref: Stable semantic identifier, passable directly to analysis APIs.
        kind: Semantic kind of this object.
        name: Short leaf name (no domain prefix).
        domain: Owning domain name, or None for datasources.
        context: Business meaning, guardrails, and usage guidance from
            ai_context.
        source_location: Location in the user-authored semantic file.
        python_symbol: Python symbol recorded for the object (may be empty).

    Returns:
        SemanticObject with the common fields above and kind-specific detail
        available via ``details()``.

    Example:
        >>> revenue = catalog.get("sales.revenue")
        >>> revenue.ref
        >>> revenue.context.business_definition
        >>> revenue.details().additivity
        >>> revenue.children  # tuple[SemanticRef, ...]

    Constraints:
        Business meaning and guardrails live under ``context``. Use
        ``catalog.list(parent=...)`` for hierarchy browsing — SemanticObject
        does not expose navigation methods.
    """

    ref: SemanticRef
    kind: SemanticKind
    name: str
    domain: str | None
    context: AiContextView
    source_location: SourceLocation
    python_symbol: str
    _details: SemanticObjectDetails

    @property
    def children(self) -> tuple[SemanticRef, ...]:
        """Return the refs of this object's children.

        Returns:
            Tuple of SemanticRef values taken from the details object.
            Non-container objects (metrics, dimensions, relationships) have
            an empty tuple.

        Example:
            >>> domain = catalog.get("sales")
            >>> domain.children  # (SemanticRef("sales.orders", ...), ...)

        Constraints:
            The returned refs are read-only; they cannot be used to modify
            the semantic model.
        """
        details = self._details
        return details.children

    def details(self) -> SemanticObjectDetails:
        """Return the typed kind-specific details for this object.

        Args:
            None

        Returns:
            The kind-specific details dataclass (EntityDetails,
            SimpleMetricDetails, ...) carrying parents, children, dependents
            and structural facts.

        Example:
            >>> d = catalog.get("sales.revenue").details()
            >>> d.additivity
            >>> d.parents

        Constraints:
            The same details instance is returned on every call; it is a
            frozen dataclass.
        """
        return self._details

    def _repr_identity(self) -> str:
        """Return ``SemanticObject kind=<kind> ref=<ref id>``."""
        ref_id = self.ref.id
        return f"SemanticObject kind={self.kind} ref={ref_id}"

    def render(self) -> str:
        """Return a bounded plain-text object card without a trailing newline."""
        identity = self._repr_identity()
        return format_bounded_card(
            identity=identity,
            status=self.context.business_definition,
            available=(".details()", ".show()"),
        )

    def __repr__(self) -> str:
        identity = self._repr_identity()
        return result_repr(identity)

    def show(self) -> None:
        """Print render() output and return None."""
        text = self.render()
        print(text)
class SemanticObjectList:
    """Browsing result returned by catalog.list(...).

    Args:
        items: Ordered tuple of SemanticObject results.
        parent_label: String label of the parent used for rendering
            (e.g. 'sales.orders'), or None.
        kind_filter: Kind filter string used in the request, or None.

    Returns:
        SemanticObjectList with render/show for display and
        refs()/ids()/objects for consumption.  It also supports ``len()``,
        iteration and integer indexing over the objects.

    Example:
        >>> result = catalog.list("sales.orders")
        >>> result.show()
        >>> result.refs()     # tuple[SemanticRef, ...]
        >>> result.objects    # tuple[SemanticObject, ...]

    Constraints:
        render() lists every object in the objects tuple; nothing is
        truncated.
    """

    def __init__(
        self,
        items: tuple[SemanticObject, ...],
        parent_label: str | None,
        kind_filter: str | None,
    ) -> None:
        self._items = items
        self._parent_label = parent_label
        self._kind_filter = kind_filter

    @property
    def objects(self) -> tuple[SemanticObject, ...]:
        """Return all SemanticObject results."""
        return self._items

    def refs(self) -> tuple[SemanticRef, ...]:
        """Return the SemanticRef for every object in this list."""
        return tuple(item.ref for item in self._items)

    def ids(self) -> list[str]:
        """Return plain-string refs for every object in this list."""
        return [item.ref.id for item in self._items]

    def __len__(self) -> int:
        return len(self._items)

    def __iter__(self) -> Iterator[SemanticObject]:
        return iter(self._items)

    def __getitem__(self, index: int) -> SemanticObject:
        return self._items[index]

    def render(self) -> str:
        """Return bounded plain-text browsing card without a trailing newline.

        The parent label (when set) heads the card.  An empty list yields a
        "no objects found" note plus a hint to browse from the top level;
        otherwise one row per object is followed by next-step hints that use
        the first object's ref as the example.
        """
        header = [self._parent_label] if self._parent_label else []

        if not self._items:
            scope = self._parent_label or "catalog"
            kind_note = f" kind={self._kind_filter!r}" if self._kind_filter else ""
            return "\n".join(
                [
                    *header,
                    f" (no objects found under {scope!r}{kind_note})",
                    "next steps:",
                    " catalog.list().show() # browse top-level domains and datasources",
                ]
            )

        rows = [f" {str(item.kind):<12}{item.ref.id}" for item in self._items]
        first_id = self._items[0].ref.id
        gap = " " * 4  # fixed gap between the example call and its comment
        return "\n".join(
            [
                *header,
                *rows,
                "",
                "next steps:",
                f" catalog.get({first_id!r}){gap}# retrieve a SemanticObject by full ref",
                " result.refs() # obtain all SemanticRef values for analysis handoff",
            ]
        )

    def _repr_identity(self) -> str:
        """Return the one-line identity: parent label, optional kind filter, count."""
        parent = self._parent_label or "catalog"
        kind_note = f" kind={self._kind_filter}" if self._kind_filter else ""
        count = len(self._items)
        return f"SemanticObjectList parent={parent}{kind_note} count={count}"

    def __repr__(self) -> str:
        identity = self._repr_identity()
        return result_repr(identity)

    def show(self) -> None:
        """Print render() output and return None."""
        text = self.render()
        print(text)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

# Accepted input spellings for refs and kinds: the typed value or its string.
SemanticRefInput = SemanticRef | str
SemanticKindInput = SemanticKind | str

# String form of every SymbolKind member; used to validate kind inputs.
_VALID_KINDS: frozenset[str] = frozenset(str(k) for k in SymbolKind)

_BROWSABLE_PARENT_KINDS: frozenset[str] = frozenset(
    {
        str(SymbolKind.DOMAIN),
        str(SymbolKind.ENTITY),
        str(SymbolKind.DATASOURCE),
    }
)


def _to_ref_str(ref: SemanticRefInput) -> str:
    """Return the plain-string id for a ref input (delegates to ``as_ref_id``)."""
    return as_ref_id(ref)


def _validate_kind(kind_input: SemanticKindInput) -> SemanticKind:
    """Normalise a kind input to a SemanticKind member.

    The input is stringified and lower-cased before lookup.  Unknown values
    are reported through ``_raise`` as ``ErrorKind.UNSUPPORTED_KIND`` with a
    SemanticRuntimeError, listing the supported values in sorted order.
    """
    kind_str = str(kind_input).lower()
    if kind_str not in _VALID_KINDS:
        sorted_values = ", ".join(sorted(_VALID_KINDS))
        _raise(
            ErrorKind.UNSUPPORTED_KIND,
            f"Unsupported semantic kind {kind_input!r}. Supported values: {sorted_values}.",
            cls=SemanticRuntimeError,
        )
    return SymbolKind(kind_str)


def _normalize_location(loc: SourceLocation | DatasourceSourceLocation) -> SourceLocation:
    """Copy ``file`` and ``line`` from either location type into a SourceLocation."""
    return SourceLocation(file=loc.file, line=loc.line)


def _build_datasource_object(ds_ir: DatasourceIR, reg: Registry) -> SemanticObject:
    """Build the catalog object for a datasource.

    Datasources have no domain, parents or children; their dependents are the
    registry entities whose ``datasource`` equals this datasource's id.
    """
    ref = make_ref(ds_ir.semantic_id, SemanticKind.DATASOURCE)
    dependents = tuple(
        make_ref(d.semantic_id, SemanticKind.ENTITY)
        for d in reg.entities.values()
        if d.datasource == ds_ir.semantic_id
    )
    details = DatasourceDetails(
        ref=ref,
        kind=SemanticKind.DATASOURCE,
        name=ds_ir.name,
        domain=None,
        context=ds_ir.ai_context,
        source_location=_normalize_location(ds_ir.location),
        parents=(),
        children=(),
        dependents=dependents,
        python_symbol=ds_ir.python_symbol,
        # Copy the mappings so the details object does not alias the IR's.
        backend_type=ds_ir.backend_type,
        fields=dict(ds_ir.fields),
        env_refs=dict(ds_ir.env_refs),
    )
    return SemanticObject(
        ref=ref,
        kind=SemanticKind.DATASOURCE,
        name=ds_ir.name,
        domain=None,
        context=ds_ir.ai_context,
        source_location=_normalize_location(ds_ir.location),
        python_symbol=ds_ir.python_symbol,
        _details=details,
    )


def _build_domain_object(model_ir: DomainIR, reg: Registry) -> SemanticObject:
    """Build the catalog object for a domain.

    Children are the domain's entities followed by its metrics.  Domains have
    no parents or dependents and an empty ``python_symbol``.
    """
    ref = make_ref(model_ir.name, SemanticKind.DOMAIN)
    datasets_refs = tuple(
        make_ref(d.semantic_id, SemanticKind.ENTITY)
        for d in reg.entities.values()
        if d.domain == model_ir.name
    )
    metrics_refs = tuple(
        make_ref(m.semantic_id, SemanticKind.METRIC)
        for m in reg.metrics.values()
        if m.domain == model_ir.name
    )
    children = datasets_refs + metrics_refs
    details = DomainDetails(
        ref=ref,
        kind=SemanticKind.DOMAIN,
        name=model_ir.name,
        domain=model_ir.name,
        context=model_ir.ai_context,
        source_location=model_ir.location,
        parents=(),
        children=children,
        dependents=(),
        python_symbol="",
        default=model_ir.default,
    )
    return SemanticObject(
        ref=ref,
        kind=SemanticKind.DOMAIN,
        name=model_ir.name,
        domain=model_ir.name,
        context=model_ir.ai_context,
        source_location=model_ir.location,
        python_symbol="",
        _details=details,
    )


def _build_entity_object(ds_ir: EntityIR, reg: Registry) -> SemanticObject:
    """Build the catalog object for an entity.

    The parent is the entity's datasource.  Children are, in order: its
    dimensions (time dimensions get the TIME_DIMENSION kind), its measures,
    the metrics that list it among their entities, and the relationships it
    participates in on either side.  The same metrics are also reported as
    dependents.
    """
    ref = make_ref(ds_ir.semantic_id, SemanticKind.ENTITY)
    ds_ref = make_ref(ds_ir.datasource, SemanticKind.DATASOURCE)
    fields_refs = tuple(
        make_ref(
            f.semantic_id,
            SemanticKind.TIME_DIMENSION if f.is_time_dimension else SemanticKind.DIMENSION,
        )
        for f in reg.dimensions.values()
        if f.entity == ds_ir.semantic_id
    )
    measure_refs = tuple(
        make_ref(m.semantic_id, SemanticKind.MEASURE)
        for m in reg.measures.values()
        if m.entity == ds_ir.semantic_id
    )
    rels_refs = tuple(
        make_ref(r.semantic_id, SemanticKind.RELATIONSHIP)
        for r in reg.relationships.values()
        if r.from_entity == ds_ir.semantic_id or r.to_entity == ds_ir.semantic_id
    )
    # Built once and used for both children and dependents: the two lists
    # select exactly the same metrics.
    metric_refs = tuple(
        make_ref(m.semantic_id, SemanticKind.METRIC)
        for m in reg.metrics.values()
        if ds_ir.semantic_id in m.entities
    )
    children = fields_refs + measure_refs + metric_refs + rels_refs
    details = EntityDetails(
        ref=ref,
        kind=SemanticKind.ENTITY,
        name=ds_ir.name,
        domain=ds_ir.domain,
        context=ds_ir.ai_context,
        source_location=ds_ir.location,
        parents=(ds_ref,),
        children=children,
        dependents=metric_refs,
        python_symbol=ds_ir.python_symbol,
        datasource=ds_ref,
        source=ds_ir.source,
        primary_key=ds_ir.primary_key,
        versioning=ds_ir.versioning,
    )
    return SemanticObject(
        ref=ref,
        kind=SemanticKind.ENTITY,
        name=ds_ir.name,
        domain=ds_ir.domain,
        context=ds_ir.ai_context,
        source_location=ds_ir.location,
        python_symbol=ds_ir.python_symbol,
        _details=details,
    )


def _preview_timezones_for_field(
    *,
    column_name: str,
    field_ir: DimensionIR,
    datasource_timezone: object | None,
    report_tz: str,
) -> dict[str, dict[str, str | None]]:
    """Describe timezone handling for one previewed time-dimension column.

    Returns an empty mapping unless ``field_ir`` is a time dimension with a
    parse spec.  Otherwise returns a single entry keyed by ``column_name``.
    The read timezone is the parse spec's declared ``timezone`` when present
    (resolution ``"declared"``); failing that, it falls back to the
    datasource's ``engine_timezone_name`` / ``read_tz_resolution``
    attributes.  ``kind`` is ``"instant"`` when no read timezone could be
    resolved and ``"localizable_wall_clock"`` otherwise.
    """
    if not field_ir.is_time_dimension or field_ir.parse is None:
        return {}
    # Not every parse variant carries a timezone attribute, hence getattr.
    declared = getattr(field_ir.parse, "timezone", None)
    read_tz = declared
    read_resolution: str | None = "declared" if declared is not None else None
    if read_tz is None and datasource_timezone is not None:
        read_tz = getattr(datasource_timezone, "engine_timezone_name", None)
        read_resolution = getattr(datasource_timezone, "read_tz_resolution", None)
    kind = "instant" if read_tz is None else "localizable_wall_clock"
    return {
        column_name: {
            "kind": kind,
            "read_tz": read_tz,
            "report_tz": report_tz,
            "read_tz_resolution": read_resolution,
        }
    }


def _build_dimension_object(f_ir: DimensionIR, reg: Registry) -> SemanticObject:
    """Build the catalog object for a categorical or time dimension.

    Time dimensions get TimeDimensionDetails, with parse kind, data type,
    format, timezone and sample interval derived from the parse variant;
    all other dimensions get DimensionDetails.  The parent is the owning
    entity in both cases.

    Raises:
        AssertionError: If a time dimension carries a parse variant this
            function does not know about.
    """
    is_time = f_ir.is_time_dimension
    kind = SemanticKind.TIME_DIMENSION if is_time else SemanticKind.DIMENSION
    ref = make_ref(f_ir.semantic_id, kind)
    ds_ref = make_ref(f_ir.entity, SemanticKind.ENTITY)

    if is_time:
        # Extract time-dimension metadata from the parse variant
        parse = f_ir.parse
        data_type: str | None = None
        fmt: str | None = None
        tz: str | None = None
        sample_interval: SampleIntervalIR | None = None
        if parse is None:
            parse_kind: (
                Literal["date", "datetime", "timestamp", "strptime", "hour_prefix"] | None
            ) = None
        elif isinstance(parse, DateParse):
            parse_kind = "date"
            data_type = "date"
        elif isinstance(parse, DatetimeParse):
            parse_kind = "datetime"
            data_type = "datetime"
            tz = parse.timezone
            sample_interval = parse.sample_interval
        elif isinstance(parse, TimestampParse):
            parse_kind = "timestamp"
            data_type = "timestamp"
            tz = parse.timezone
            sample_interval = parse.sample_interval
        elif isinstance(parse, StrptimeParse):
            parse_kind = "strptime"
            fmt = parse.format
            tz = parse.timezone
            sample_interval = parse.sample_interval
        elif isinstance(parse, HourPrefixParse):
            parse_kind = "hour_prefix"
            sample_interval = parse.sample_interval
        else:
            raise AssertionError(f"unsupported time parse variant: {type(parse).__name__}")
        details: SemanticObjectDetails = TimeDimensionDetails(
            ref=ref,
            kind=kind,
            name=f_ir.name,
            domain=f_ir.domain,
            context=f_ir.ai_context,
            source_location=f_ir.location,
            parents=(ds_ref,),
            children=(),
            dependents=(),
            python_symbol=f_ir.python_symbol,
            entity=ds_ref,
            parse_kind=parse_kind,
            data_type=data_type,
            granularity=f_ir.granularity,
            format=fmt,
            timezone=tz,
            is_default=f_ir.is_default,
            sample_interval=sample_interval,
        )
    else:
        details = DimensionDetails(
            ref=ref,
            kind=kind,
            name=f_ir.name,
            domain=f_ir.domain,
            context=f_ir.ai_context,
            source_location=f_ir.location,
            parents=(ds_ref,),
            children=(),
            dependents=(),
            python_symbol=f_ir.python_symbol,
            entity=ds_ref,
        )
    return SemanticObject(
        ref=ref,
        kind=kind,
        name=f_ir.name,
        domain=f_ir.domain,
        context=f_ir.ai_context,
        source_location=f_ir.location,
        python_symbol=f_ir.python_symbol,
        _details=details,
    )


def _build_measure_object(m_ir: MeasureIR, reg: Registry) -> SemanticObject:
    """Build the catalog object for a measure.

    The parent is the owning entity; dependents are the metrics whose
    ``measure`` equals this measure's id.
    """
    ref = make_ref(m_ir.semantic_id, SemanticKind.MEASURE)
    entity_ref = make_ref(m_ir.entity, SemanticKind.ENTITY)
    dependents = tuple(
        make_ref(metric.semantic_id, SemanticKind.METRIC)
        for metric in reg.metrics.values()
        if metric.measure == m_ir.semantic_id
    )
    details = MeasureDetails(
        ref=ref,
        kind=SemanticKind.MEASURE,
        name=m_ir.name,
        domain=m_ir.domain,
        context=m_ir.ai_context,
        source_location=m_ir.location,
        parents=(entity_ref,),
        children=(),
        dependents=dependents,
        python_symbol=m_ir.python_symbol,
        entity=entity_ref,
        additivity=additivity_bucket(m_ir.additivity),
        unit=m_ir.unit,
    )
    return SemanticObject(
        ref=ref,
        kind=SemanticKind.MEASURE,
        name=m_ir.name,
        domain=m_ir.domain,
        context=m_ir.ai_context,
        source_location=m_ir.location,
        python_symbol=m_ir.python_symbol,
        _details=details,
    )


def _format_agg(agg: object) -> str | None:
    """Render an aggregation for display.

    ``None`` stays ``None``; a tuple renders as ``<first>(<second>)``; any
    other value is stringified.
    """
    if agg is None:
        return None
    if isinstance(agg, tuple):
        return f"{agg[0]}({agg[1]})"
    return str(agg)


def _build_metric_object(m_ir: MetricIR, reg: Registry, project: SemanticProject) -> SemanticObject:
    """Build the catalog object for a simple or derived metric.

    Parents are the metric's entities, then its composition components, then
    (for multi-entity metrics only) the same-domain relationships whose two
    endpoints are both among the metric's entities.  Dependents are the
    metrics whose composition uses this metric as a component.  Metrics with
    ``metric_type == "derived"`` get DerivedMetricDetails; all others get
    SimpleMetricDetails.
    """
    ref = make_ref(m_ir.semantic_id, SemanticKind.METRIC)
    entity_refs = tuple(make_ref(ds, SemanticKind.ENTITY) for ds in m_ir.entities)
    root_entity_ref = make_ref(m_ir.root_entity, SemanticKind.ENTITY) if m_ir.root_entity else None

    comp_map = composition_components(m_ir.composition) if m_ir.composition is not None else {}
    components = tuple(
        (role, make_ref(comp_ref, SemanticKind.METRIC)) for role, comp_ref in comp_map.items()
    )
    component_refs = tuple(r for _, r in components)
    linear_terms = (
        tuple((t.sign, t.metric) for t in m_ir.composition.terms)
        if isinstance(m_ir.composition, LinearComposition)
        else ()
    )

    required_rels: tuple[SemanticRef, ...] = ()
    if len(m_ir.entities) > 1:
        required_rels = tuple(
            make_ref(r.semantic_id, SemanticKind.RELATIONSHIP)
            for r in reg.relationships.values()
            if r.domain == m_ir.domain
            and r.from_entity in m_ir.entities
            and r.to_entity in m_ir.entities
        )

    parents = entity_refs + component_refs + required_rels
    dependents = tuple(
        make_ref(m2.semantic_id, SemanticKind.METRIC)
        for m2 in reg.metrics.values()
        if m2.composition is not None
        and m_ir.semantic_id in composition_components(m2.composition).values()
    )
    parity_status = propagated_parity_status(project, m_ir.semantic_id)

    # Additivity-derived display values are identical for both variants, so
    # derive them once.  A metric without additivity info is reported as
    # non-additive; fold/status only exist for semi-additive metrics.
    add = m_ir.additivity
    additivity: Literal["additive", "semi_additive", "non_additive"] = (
        additivity_bucket(add) if add is not None else "non_additive"
    )
    fold = add.fold.label() if isinstance(add, SemiAdditive) else None
    status_time_dimension = add.over if isinstance(add, SemiAdditive) else None

    if m_ir.metric_type == "derived":
        assert m_ir.composition is not None, (
            f"Derived metric {m_ir.semantic_id!r} has no composition IR"
        )
        details: MetricDetails = DerivedMetricDetails(
            ref=ref,
            kind=SemanticKind.METRIC,
            name=m_ir.name,
            domain=m_ir.domain,
            context=m_ir.ai_context,
            source_location=m_ir.location,
            parents=parents,
            children=(),
            dependents=dependents,
            python_symbol=m_ir.python_symbol,
            entities=entity_refs,
            root_entity=root_entity_ref,
            composition=m_ir.composition.kind,
            components=components,
            linear_terms=linear_terms,
            required_relationships=required_rels,
            additivity=additivity,
            fold=fold,
            status_time_dimension=status_time_dimension,
            fanout_policy=m_ir.fanout_policy,
            unit=m_ir.unit,
            provenance=m_ir.provenance,
            parity_status=parity_status,
        )
    else:
        details = SimpleMetricDetails(
            ref=ref,
            kind=SemanticKind.METRIC,
            name=m_ir.name,
            domain=m_ir.domain,
            context=m_ir.ai_context,
            source_location=m_ir.location,
            parents=parents,
            children=(),
            dependents=dependents,
            python_symbol=m_ir.python_symbol,
            entities=entity_refs,
            root_entity=root_entity_ref,
            aggregation=_format_agg(m_ir.aggregation),
            measure=make_ref(m_ir.measure, SemanticKind.MEASURE) if m_ir.measure else None,
            additivity=additivity,
            fold=fold,
            status_time_dimension=status_time_dimension,
            fanout_policy=m_ir.fanout_policy,
            unit=m_ir.unit,
            provenance=m_ir.provenance,
            parity_status=parity_status,
        )
    return SemanticObject(
        ref=ref,
        kind=SemanticKind.METRIC,
        name=m_ir.name,
        domain=m_ir.domain,
        context=m_ir.ai_context,
        source_location=m_ir.location,
        python_symbol=m_ir.python_symbol,
        _details=details,
    )


def _build_relationship_object(r_ir: RelationshipIR, reg: Registry) -> SemanticObject:
    """Build the catalog object for a relationship.

    Parents are the from- and to-entities.  The parallel ``from_keys`` /
    ``to_keys`` tuples are split out of the IR's join-key pairs.
    Relationships have no children or dependents and an empty
    ``python_symbol``.
    """
    ref = make_ref(r_ir.semantic_id, SemanticKind.RELATIONSHIP)
    from_ref = make_ref(r_ir.from_entity, SemanticKind.ENTITY)
    to_ref = make_ref(r_ir.to_entity, SemanticKind.ENTITY)
    details = RelationshipDetails(
        ref=ref,
        kind=SemanticKind.RELATIONSHIP,
        name=r_ir.name,
        domain=r_ir.domain,
        context=r_ir.ai_context,
        source_location=r_ir.location,
        parents=(from_ref, to_ref),
        children=(),
        dependents=(),
        python_symbol="",
        from_entity=from_ref,
        to_entity=to_ref,
        from_keys=tuple(k.from_key for k in r_ir.keys),
        to_keys=tuple(k.to_key for k in r_ir.keys),
    )
    return SemanticObject(
        ref=ref,
        kind=SemanticKind.RELATIONSHIP,
        name=r_ir.name,
        domain=r_ir.domain,
        context=r_ir.ai_context,
        source_location=r_ir.location,
        python_symbol="",
        _details=details,
    )


# ---------------------------------------------------------------------------
# SemanticCatalog
# ---------------------------------------------------------------------------
class SemanticCatalog:
    """Read-only object graph over a loaded semantic project.

    Args:
        project: A loaded SemanticProject instance (status must be 'ready').

    Returns:
        SemanticCatalog with list(), get(), preview(), readiness(), and
        verify_object() methods.

    Example:
        >>> catalog = ms.load()
        >>> catalog.list().show()
        >>> catalog.list("sales").show()
        >>> catalog.list(kind="metric").show()  # all metrics across domains
        >>> catalog.list(domain="sales", kind="metric").show()
        >>> revenue = catalog.get("sales.revenue")
        >>> revenue.details().additivity

    Constraints:
        catalog is obtained via ms.load(), not constructed directly.
        SemanticCatalog objects do not expose internal IR instances.
    """

    def __init__(self, project: SemanticProject) -> None:
        self._project = project
        # Cached registry; refreshed by load() and verify_object().
        self._reg = project._registry

    @property
    def semantic_root(self) -> Path:
        """Return the semantic root path (models/semantic/)."""
        return self._project.semantic_root

    @property
    def workspace_dir(self) -> Path:
        """Return the workspace directory path."""
        return self._project.workspace_dir

    def load(
        self,
        *,
        domains: str | Sequence[str] | None = None,
    ) -> None:
        """Reload the semantic project from disk and refresh the catalog registry.

        Args:
            domains: When specified, only those domain directories are loaded.
                Pass a single domain name as a string or a list of names.
                When omitted, the previously active filter (if any) is reused.

        Raises:
            SemanticLoadFailed: When the reload does not reach the ``"ready"``
                status. The cached registry is refreshed before raising.

        Example:
            >>> catalog.load(domains="sales")
            >>> catalog.load(domains=["sales", "inventory"])
        """
        if isinstance(domains, str):
            domains = [domains]
        resolved = (
            domains
            if domains is not None
            else (
                list(self._project._filtered_domains)
                if self._project._filtered_domains
                else None
            )
        )
        result = self._project.load(domains=resolved)
        self._reg = self._project._registry
        if result.status != "ready":
            raise SemanticLoadFailed(result.errors)

    def _require_ready(self) -> Registry:
        """Return the registry, or raise when the project is not browsable.

        Raises:
            SemanticLoadFailed: When the project reports load errors.
            SemanticRuntimeError: With ``PROJECT_NOT_LOADED`` when the project is
                not ready and reports no errors.
        """
        reg = self._reg
        if self._project.is_ready() and reg is not None:
            return reg
        errors = self._project.errors()
        if errors:
            raise SemanticLoadFailed(errors)
        _raise(
            ErrorKind.PROJECT_NOT_LOADED,
            "Semantic catalog is not loaded. Call catalog.load() before browsing.",
            cls=SemanticRuntimeError,
        )

    def _known_datasources(self, reg: Registry) -> Iterable[DatasourceIR]:
        """Return the project's datasource IRs, falling back to the registry's.

        The project-level collection wins whenever it is non-empty; otherwise
        the datasources recorded in ``reg`` are used.
        """
        return self._project._datasource_irs or tuple(reg.datasources.values())

    def list(
        self,
        parent: SemanticRefInput | None = None,
        *,
        kind: SemanticKindInput | None = None,
        domain: str | None = None,
    ) -> SemanticObjectList:
        """Browse the semantic hierarchy under the given parent ref.

        Args:
            parent: Full semantic ref of the parent to browse under.
                None returns top-level domains and datasources.
                A domain ref (e.g. "sales") returns entities, metrics,
                relationships, and measures.
                A datasource ref returns its entities and their measures.
                An entity ref (e.g. "sales.orders") returns dimensions, time
                dimensions, measures, relationships touching the entity, and
                the metrics that reference it.
            kind: Optional kind filter. Accepts SemanticKind values or strings
                such as "metric", "dimension". Raises an error on unsupported
                values. At the top level (no parent, no domain), leaf kinds
                such as "metric" search across all domains.
            domain: Optional domain name to scope results. Equivalent to using
                ``parent`` with a domain ref, but can be combined with ``kind``
                for filtered domain-level browsing. Mutually exclusive with
                ``parent``.

        Returns:
            SemanticObjectList with .show(), .refs(), and .objects.

        Example:
            >>> catalog.list().show()
            >>> catalog.list("sales").show()
            >>> catalog.list("sales.orders", kind="metric").show()
            >>> catalog.list(kind="metric").show()  # all metrics
            >>> catalog.list(domain="sales", kind="metric").show()  # metrics in one domain

        Constraints:
            Only full semantic refs are accepted as parents.
            Refs whose kind is not browsable (for example a metric, dimension,
            time dimension, measure, or relationship) raise an
            unsupported-parent error.
            ``parent`` and ``domain`` are mutually exclusive.
        """
        reg = self._require_ready()

        if parent is not None and domain is not None:
            _raise(
                ErrorKind.CONFLICTING_PARAMETERS,
                "catalog.list() 'parent' and 'domain' are mutually exclusive. "
                "Use catalog.list(domain=...) with an optional kind= filter, "
                "or catalog.list(parent=...) for hierarchy browsing.",
                cls=SemanticRuntimeError,
                constraint_id=ConstraintId.CATALOG_PARAMETERS_COMPATIBLE,
            )

        validated_kind = _validate_kind(kind) if kind is not None else None

        # Domain shortcut: scope to a single domain.
        if domain is not None:
            if domain not in reg.domains:
                available = sorted(reg.domains.keys())
                _raise(
                    ErrorKind.NOT_FOUND,
                    f"Domain {domain!r} was not found. Available domains: {available}.",
                    cls=SemanticRuntimeError,
                    refs=(domain,),
                )
            items = self._list_under_model(domain, reg, validated_kind)
            return SemanticObjectList(
                items=tuple(items),
                parent_label=domain,
                kind_filter=str(kind) if kind else None,
            )

        if parent is None:
            items = self._list_top_level(reg, validated_kind)
            return SemanticObjectList(
                items=tuple(items),
                parent_label=None,
                kind_filter=str(kind) if kind else None,
            )

        parent_str = _to_ref_str(parent)

        # Resolve the parent's kind from the registry.
        parent_kind = self._resolve_kind_of(parent_str, reg)
        if parent_kind is None:
            self._raise_not_found(parent_str)

        # Guard: only kinds listed in _BROWSABLE_PARENT_KINDS can be browsed.
        if str(parent_kind) not in _BROWSABLE_PARENT_KINDS:
            _raise(
                ErrorKind.UNSUPPORTED_LIST_PARENT,
                f"Semantic object {parent_str!r} is a {parent_kind} and cannot be used as a "
                f"catalog list parent. Use catalog.get({parent_str!r}).details() to inspect dependencies.",
                cls=SemanticRuntimeError,
                refs=(parent_str,),
            )

        if parent_kind == SemanticKind.DOMAIN:
            items = self._list_under_model(parent_str, reg, validated_kind)
        elif parent_kind == SemanticKind.DATASOURCE:
            items = self._list_under_datasource(parent_str, reg, validated_kind)
        else:
            items = self._list_under_dataset(parent_str, reg, validated_kind)

        return SemanticObjectList(
            items=tuple(items),
            parent_label=parent_str,
            kind_filter=str(kind) if kind else None,
        )

    def _list_top_level(
        self,
        reg: Registry,
        kind_filter: SemanticKind | None,
    ) -> _ListOfSemanticObject:
        """List objects when no parent is given.

        Without a filter only domains and datasources are returned; a leaf-kind
        filter returns every object of that kind across the whole registry.
        """
        items: list[SemanticObject] = []
        if kind_filter is None or kind_filter == SemanticKind.DOMAIN:
            items.extend(_build_domain_object(model_ir, reg) for model_ir in reg.domains.values())
        if kind_filter is None or kind_filter == SemanticKind.DATASOURCE:
            items.extend(
                _build_datasource_object(ds_ir, reg) for ds_ir in self._known_datasources(reg)
            )
        if kind_filter == SemanticKind.ENTITY:
            items.extend(_build_entity_object(entity_ir, reg) for entity_ir in reg.entities.values())
        if kind_filter == SemanticKind.DIMENSION:
            items.extend(
                _build_dimension_object(f_ir, reg)
                for f_ir in reg.dimensions.values()
                if not f_ir.is_time_dimension
            )
        if kind_filter == SemanticKind.TIME_DIMENSION:
            items.extend(
                _build_dimension_object(f_ir, reg)
                for f_ir in reg.dimensions.values()
                if f_ir.is_time_dimension
            )
        if kind_filter == SemanticKind.METRIC:
            items.extend(
                _build_metric_object(m_ir, reg, self._project) for m_ir in reg.metrics.values()
            )
        if kind_filter == SemanticKind.RELATIONSHIP:
            items.extend(
                _build_relationship_object(r_ir, reg) for r_ir in reg.relationships.values()
            )
        if kind_filter == SemanticKind.MEASURE:
            items.extend(_build_measure_object(meas_ir, reg) for meas_ir in reg.measures.values())
        return items

    def _list_under_model(
        self,
        model_name: str,
        reg: Registry,
        kind_filter: SemanticKind | None,
    ) -> _ListOfSemanticObject:
        """List entities, metrics, relationships, and measures of one domain."""
        items: list[SemanticObject] = []
        if kind_filter is None or kind_filter == SemanticKind.ENTITY:
            items.extend(
                _build_entity_object(ds_ir, reg)
                for ds_ir in reg.entities.values()
                if ds_ir.domain == model_name
            )
        if kind_filter is None or kind_filter == SemanticKind.METRIC:
            items.extend(
                _build_metric_object(m_ir, reg, self._project)
                for m_ir in reg.metrics.values()
                if m_ir.domain == model_name
            )
        if kind_filter is None or kind_filter == SemanticKind.RELATIONSHIP:
            items.extend(
                _build_relationship_object(r_ir, reg)
                for r_ir in reg.relationships.values()
                if r_ir.domain == model_name
            )
        if kind_filter is None or kind_filter == SemanticKind.MEASURE:
            items.extend(
                _build_measure_object(meas_ir, reg)
                for meas_ir in reg.measures.values()
                if meas_ir.domain == model_name
            )
        return items

    def _list_under_datasource(
        self,
        datasource_ref: str,
        reg: Registry,
        kind_filter: SemanticKind | None,
    ) -> _ListOfSemanticObject:
        """List the entities bound to a datasource and the measures on those entities."""
        items: list[SemanticObject] = []
        if kind_filter is None or kind_filter == SemanticKind.ENTITY:
            items.extend(
                _build_entity_object(ds_ir, reg)
                for ds_ir in reg.entities.values()
                if ds_ir.datasource == datasource_ref
            )
        if kind_filter is None or kind_filter == SemanticKind.MEASURE:
            # Measures are attached to entities, so go through the entity ids.
            entity_ids_for_datasource = {
                e.semantic_id for e in reg.entities.values() if e.datasource == datasource_ref
            }
            items.extend(
                _build_measure_object(meas_ir, reg)
                for meas_ir in reg.measures.values()
                if meas_ir.entity in entity_ids_for_datasource
            )
        return items

    def _list_under_dataset(
        self,
        dataset_ref: str,
        reg: Registry,
        kind_filter: SemanticKind | None,
    ) -> _ListOfSemanticObject:
        """List the objects attached to one entity.

        Returns dimensions, time dimensions, measures, relationships that start
        or end at the entity, and metrics that list the entity among their
        entities, in that order.
        """
        items: list[SemanticObject] = []
        if kind_filter is None or kind_filter == SemanticKind.DIMENSION:
            items.extend(
                _build_dimension_object(f_ir, reg)
                for f_ir in reg.dimensions.values()
                if f_ir.entity == dataset_ref and not f_ir.is_time_dimension
            )
        if kind_filter is None or kind_filter == SemanticKind.TIME_DIMENSION:
            items.extend(
                _build_dimension_object(f_ir, reg)
                for f_ir in reg.dimensions.values()
                if f_ir.entity == dataset_ref and f_ir.is_time_dimension
            )
        if kind_filter is None or kind_filter == SemanticKind.MEASURE:
            items.extend(
                _build_measure_object(meas_ir, reg)
                for meas_ir in reg.measures.values()
                if meas_ir.entity == dataset_ref
            )
        if kind_filter is None or kind_filter == SemanticKind.RELATIONSHIP:
            items.extend(
                _build_relationship_object(r_ir, reg)
                for r_ir in reg.relationships.values()
                if r_ir.from_entity == dataset_ref or r_ir.to_entity == dataset_ref
            )
        if kind_filter is None or kind_filter == SemanticKind.METRIC:
            # Guard against the same semantic id being emitted twice.
            seen: set[str] = set()
            for m_ir in reg.metrics.values():
                if dataset_ref in m_ir.entities and m_ir.semantic_id not in seen:
                    seen.add(m_ir.semantic_id)
                    items.append(_build_metric_object(m_ir, reg, self._project))
        return items

    def _resolve_kind_of(self, ref_str: str, reg: Registry) -> SemanticKind | None:
        """Return the kind registered for ``ref_str``, or None when it is unknown.

        Lookup order: domain, datasource, entity, dimension / time dimension,
        measure, metric, relationship.
        """
        if ref_str in reg.domains:
            return SemanticKind.DOMAIN
        for ds_ir in self._known_datasources(reg):
            if ds_ir.semantic_id == ref_str:
                return SemanticKind.DATASOURCE
        if ref_str in reg.entities:
            return SemanticKind.ENTITY
        if ref_str in reg.dimensions:
            f = reg.dimensions[ref_str]
            return SemanticKind.TIME_DIMENSION if f.is_time_dimension else SemanticKind.DIMENSION
        if ref_str in reg.measures:
            return SemanticKind.MEASURE
        if ref_str in reg.metrics:
            return SemanticKind.METRIC
        if ref_str in reg.relationships:
            return SemanticKind.RELATIONSHIP
        return None

    def _raise_not_found(self, ref_str: str) -> NoReturn:
        """Raise a NOT_FOUND error for ``ref_str``.

        Uses the hint from ``_suggest_ref_level`` when one is available,
        otherwise falls back to generic browse guidance.
        """
        reg = self._reg
        suggestion = _suggest_ref_level(reg, ref_str) if reg is not None else None
        if suggestion is not None:
            message = f"Semantic object {ref_str!r} was not found. {suggestion}"
        else:
            message = (
                f"Semantic object {ref_str!r} was not found. "
                f"`catalog.get(...)` requires a full semantic ref such as 'sales.revenue'.\n"
                f"Use catalog.list().show(), catalog.list('<domain>').show(), and then\n"
                f"catalog.list('<domain.entity>').show() to browse object refs."
            )
        _raise(
            ErrorKind.NOT_FOUND,
            message,
            cls=SemanticRuntimeError,
            refs=(ref_str,),
        )

    def get(self, ref: SemanticRefInput) -> SemanticObject:
        """Retrieve a single semantic object by full ref.

        Args:
            ref: Full semantic ref string or SemanticRef (e.g. "sales.revenue").

        Returns:
            SemanticObject for the requested ref.

        Example:
            >>> revenue = catalog.get("sales.revenue")
            >>> revenue.details().additivity

        Constraints:
            Raises a typed not-found error when no object exists. Does not return None.
            Short names such as "revenue" raise the not-found error with browse guidance.
        """
        reg = self._require_ready()
        ref_str = _to_ref_str(ref)
        obj = self._get_object(ref_str, reg)
        if obj is None:
            self._raise_not_found(ref_str)
        return obj

    def _get_object(self, ref_str: str, reg: Registry) -> SemanticObject | None:
        """Build the object registered under ``ref_str``, or return None.

        Uses the same lookup order as ``_resolve_kind_of``.
        """
        if ref_str in reg.domains:
            return _build_domain_object(reg.domains[ref_str], reg)
        for ds_ir in self._known_datasources(reg):
            if ds_ir.semantic_id == ref_str:
                return _build_datasource_object(ds_ir, reg)
        if ref_str in reg.entities:
            return _build_entity_object(reg.entities[ref_str], reg)
        if ref_str in reg.dimensions:
            return _build_dimension_object(reg.dimensions[ref_str], reg)
        if ref_str in reg.measures:
            return _build_measure_object(reg.measures[ref_str], reg)
        if ref_str in reg.metrics:
            return _build_metric_object(reg.metrics[ref_str], reg, self._project)
        if ref_str in reg.relationships:
            return _build_relationship_object(reg.relationships[ref_str], reg)
        return None

    def readiness(
        self,
        refs: Sequence[SemanticRefInput] | None = None,
    ) -> ReadinessReport:
        """Run structural readiness check for the given semantic refs.

        Performs pure in-memory checks without datasource connectivity.
        For runtime validation, use ``catalog.preview(...)``,
        ``project.parity_check(...)``, and ``project.richness()``.

        Args:
            refs: Semantic refs to check. Resolves the full dependency closure
                for each ref. None checks all loaded objects.

        Returns:
            ReadinessReport indicating whether analysis handoff is safe.

        Example:
            >>> report = catalog.readiness(refs=[revenue.ref, region.ref])
            >>> if report.status == "blocked":
            ...     report.show()
            ...     raise SystemExit

        Constraints:
            This is the required semantic gate before passing refs to analysis APIs.
        """
        self._require_ready()
        str_refs = [_to_ref_str(r) for r in refs] if refs is not None else None
        return self._project.readiness(refs=str_refs)

    def verify_object(
        self,
        ref: SemanticRefInput,
        *,
        scope: ScanScope | None = None,
    ) -> VerifyResult:
        """Verify a single authored semantic object is reachable and valid.

        Automatically reloads the catalog from disk so that newly authored
        objects are visible without a separate ``catalog.load()`` call.

        For domains, relationships, and dimensions this is a static-only check.
        For entities, a scoped preview confirms the datasource is reachable and
        the expression is valid. For time dimensions, metrics, and derived
        metrics, the check is static and auto-records a decision into the
        evidence ledger (``time_dimension_identity`` or ``metric_composition``
        respectively).

        Args:
            ref: Full semantic ref string or SemanticRef to verify.
            scope: Scan scope controlling partition, max rows, and timeout.
                Defaults to ``ScanScope()``.

        Returns:
            VerifyResult with status, issues, and optional scan report.

        Example:
            >>> result = catalog.verify_object("sales.orders")
            >>> if result.status == "failed":
            ...     result.show()

        Constraints:
            ``verify_object`` is enforced by the authoring ladder: prepare APIs
            for dimensions, time dimensions, metrics, relationships, and
            cross-entity metrics raise ``LadderOrderError`` if the entity has
            not passed verification.
        """
        with contextlib.suppress(SemanticLoadFailed):
            # Project failed to load; let _project.verify_object handle it
            # so we get a proper VerifyResult with the real load errors
            # instead of an unhandled exception.
            self.load()
        ref_str = _to_ref_str(ref)
        result = self._project.verify_object(ref_str, scope=scope)
        # Verification may have reloaded the project; keep the cached registry in sync.
        self._reg = self._project._registry
        return result

    def _resolver(
        self,
        *,
        connections: object | None = None,
        sample_size: int | None = None,
    ) -> SemanticResolver:
        """Return an internal resolver backed by Materializer.

        Args:
            connections: Connection service to use. When None, the project's
                own connection service is requested.
            sample_size: Forwarded unchanged to ``SemanticResolver``.
        """
        self._require_ready()
        if connections is None:
            connections = self._project._connection_service()
        from marivo.semantic.resolver import SemanticResolver

        return SemanticResolver(self, connections=connections, sample_size=sample_size)

    def preview(
        self,
        ref: SemanticRefInput,
        *,
        limit: int = PREVIEW_DEFAULT_LIMIT,
        include_types: bool = True,
        context_columns: Iterable[str] | None = None,
    ) -> PreviewResult:
        """Return a bounded preview for an entity, dimension, time dimension, measure, or metric.

        Args:
            ref: Full semantic ref string or SemanticRef to preview.
            limit: Maximum number of preview rows to return.
            include_types: Whether to include backend schema type strings.
            context_columns: Optional parent-entity columns to include before a
                dimension or time-dimension preview value. A single column may
                be passed as a bare string. When omitted, up to three parent
                columns (excluding one named like the previewed dimension) are
                chosen automatically.

        Returns:
            PreviewResult with bounded rows, display columns, warnings, and
            sample policy metadata.

        Raises:
            SemanticRuntimeError: When the ref is unknown, its kind cannot be
                previewed, ``context_columns`` is given for a non-dimension
                ref, or a requested context column does not exist on the
                parent entity.

        Example:
            >>> catalog.preview("sales.orders.region", context_columns=("order_id",))
            >>> catalog.preview("sales.orders.amount")
            >>> catalog.preview("sales.revenue").warnings

        Constraints:
            ``context_columns`` is valid only for dimension and time-dimension refs.
            Measure previews show bounded row-level values. Metric previews use the
            existing approximate pre-aggregate sample behavior.
        """
        reg = self._require_ready()
        ref_str = _to_ref_str(ref)
        kind = self._resolve_kind_of(ref_str, reg)
        if kind is None:
            self._raise_not_found(ref_str)

        # Validate the request before building a resolver, so rejected calls
        # never touch the connection service.
        is_dimension = kind in {SemanticKind.DIMENSION, SemanticKind.TIME_DIMENSION}
        if not is_dimension and kind not in {
            SemanticKind.ENTITY,
            SemanticKind.MEASURE,
            SemanticKind.METRIC,
        }:
            _raise(
                ErrorKind.MATERIALIZE_FAILED,
                f"catalog.preview() does not support {kind} refs.",
                cls=SemanticRuntimeError,
                refs=(ref_str,),
                details={"kind": str(kind)},
            )
        if context_columns is not None and not is_dimension:
            _raise(
                ErrorKind.MATERIALIZE_FAILED,
                "catalog.preview(..., context_columns=...) is only valid for dimension refs.",
                cls=SemanticRuntimeError,
                refs=(ref_str,),
            )
        preview_limit = validate_preview_limit(limit)

        from marivo.datasource.timezone import system_timezone_name

        resolver = self._resolver(
            sample_size=METRIC_PREVIEW_SAMPLE_SIZE if kind == SemanticKind.METRIC else None
        )

        if kind == SemanticKind.ENTITY:
            table = resolver.table(make_ref(ref_str, SemanticKind.ENTITY))
            report_tz = system_timezone_name()
            return preview_ibis_table(
                table,
                kind="semantic_dataset",
                ref=ref_str,
                limit=preview_limit,
                sample_policy=PreviewSamplePolicy(method="bounded_limit", limit=preview_limit),
                include_types=include_types,
                report_tz=report_tz,
            )

        if kind == SemanticKind.MEASURE:
            measure_ir = reg.measures[ref_str]
            parent_table = resolver.table(make_ref(measure_ir.entity, SemanticKind.ENTITY))
            measure_value = resolver.measure(make_ref(ref_str, SemanticKind.MEASURE))
            # The preview column is named after the last segment of the ref.
            measure_column_name = ref_str.rsplit(".", 1)[-1]
            preview_table = parent_table.select(measure_value.name(measure_column_name))
            report_tz = system_timezone_name()
            return preview_ibis_table(
                preview_table,
                kind="semantic_measure",
                ref=ref_str,
                limit=preview_limit,
                sample_policy=PreviewSamplePolicy(method="bounded_limit", limit=preview_limit),
                include_types=include_types,
                report_tz=report_tz,
            )

        if is_dimension:
            field_ir = reg.dimensions[ref_str]
            parent_table = resolver.table(make_ref(field_ir.entity, SemanticKind.ENTITY))
            field_value = resolver.dimension(make_ref(ref_str, kind))
            field_column_name = ref_str.rsplit(".", 1)[-1]
            report_tz = system_timezone_name()
            datasource_timezone = None
            if kind == SemanticKind.TIME_DIMENSION:
                entity_ir = reg.entities[field_ir.entity]
                # The engine timezone is optional: only query it when the
                # resolver's connection service offers the hook.
                connections = getattr(resolver, "connections", None)
                engine_tz_method = getattr(connections, "engine_timezone", None)
                if callable(engine_tz_method):
                    datasource_timezone = engine_tz_method(entity_ir.datasource)
            if context_columns is None:
                # Default context: the first three parent columns that do not
                # collide with the preview column's name.
                selected_context = tuple(
                    column for column in parent_table.columns if column != field_column_name
                )[:3]
            else:
                if isinstance(context_columns, str):
                    # A bare string is a single column name, not an iterable
                    # of one-character names.
                    context_columns = (context_columns,)
                selected_context = tuple(context_columns)
                missing_context = [
                    column for column in selected_context if column not in parent_table.columns
                ]
                if missing_context:
                    _raise(
                        ErrorKind.MATERIALIZE_FAILED,
                        f"Field preview context columns are not present on parent dataset: {missing_context}",
                        cls=SemanticRuntimeError,
                        refs=(ref_str,),
                    )
            preview_table = parent_table.select(
                *[parent_table[column] for column in selected_context],
                field_value.name(field_column_name),
            )
            return preview_ibis_table(
                preview_table,
                kind="semantic_field",
                ref=ref_str,
                limit=preview_limit,
                sample_policy=PreviewSamplePolicy(method="bounded_limit", limit=preview_limit),
                include_types=include_types,
                timezones=_preview_timezones_for_field(
                    column_name=field_column_name,
                    field_ir=field_ir,
                    datasource_timezone=datasource_timezone,
                    report_tz=report_tz,
                ),
                report_tz=report_tz,
            )

        # Only SemanticKind.METRIC remains after the validation above.
        metric_value = resolver.metric(make_ref(ref_str, SemanticKind.METRIC))
        result = preview_ibis_value(
            metric_value,
            kind="semantic_metric",
            ref=ref_str,
            limit=preview_limit,
            column_name="value",
            sample_policy=PreviewSamplePolicy(
                method="pre_aggregate_limit", limit=preview_limit
            ),
            include_types=include_types,
        )
        # Re-wrap the result to append the approximation warning.
        return PreviewResult(
            kind=result.kind,
            ref=result.ref,
            columns=result.columns,
            types=result.types,
            rows=result.rows,
            requested_limit=result.requested_limit,
            returned_row_count=result.returned_row_count,
            is_truncated=result.is_truncated,
            warnings=(
                *result.warnings,
                PreviewWarning(
                    kind="approximate_preview",
                    message=f"metric computed on {METRIC_PREVIEW_SAMPLE_SIZE} row sample, result is approximate",
                ),
            ),
            sample_policy=result.sample_policy,
            timezones=result.timezones,
        )
def load(
    *,
    workspace_dir: str | Path | None = None,
    domains: str | Sequence[str] | None = None,
) -> SemanticCatalog:
    """Load a semantic project and return a browseable SemanticCatalog.

    Args:
        workspace_dir: Path to the project root containing ``marivo.toml``.
            When omitted, the ``MARIVO_PROJECT_ROOT`` environment variable is
            used if it is set and non-empty; otherwise the current working
            directory.
        domains: When specified, only those domain directories are loaded.
            Pass a single domain name as a string or a list of names.
            Cross-domain references to filtered-out domains produce warnings
            instead of errors, so the registry remains usable.

    Returns:
        SemanticCatalog on success.

    Raises:
        SemanticLoadFailed: When the project does not reach the ``"ready"`` status.

    Example:
        >>> import marivo.semantic as ms
        >>> catalog = ms.load()
        >>> catalog.list().show()
        >>> catalog = ms.load(domains=["sales"])
        >>> catalog.list().show()

    Constraints:
        Raises a typed load error on failure. Does not return a partial catalog.
        Does not print to stdout.
    """
    import os

    from marivo.semantic.reader import SemanticProject

    if workspace_dir is None:
        # An empty MARIVO_PROJECT_ROOT is treated the same as an unset one.
        workspace_dir = os.environ.get("MARIVO_PROJECT_ROOT") or Path.cwd()
    if isinstance(domains, str):
        # Normalise a single name the same way SemanticCatalog.load() does, so
        # the string is never iterated character by character downstream.
        domains = [domains]
    project = SemanticProject(workspace_dir=workspace_dir)
    result = project.load(domains=domains)
    if result.status != "ready":
        raise SemanticLoadFailed(result.errors)
    return SemanticCatalog(project)