"""Authoring decorators and builders for marivo.semantic v1.1.
All authoring symbols (domain, entity, dimension, measure, metric,
time_dimension, aggregate, ratio, weighted_average, linear,
semi_additive, relationship, ref, from_sql, join_on) are defined here.
"""
from __future__ import annotations
import ast
import hashlib
import inspect
import textwrap
from collections.abc import Callable
from collections.abc import Sequence as _Sequence
from typing import Any, Literal
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from marivo.datasource.authoring import DatasourceRef
from marivo.datasource.scan import csv as _datasource_csv
from marivo.datasource.scan import parquet as _datasource_parquet
from marivo.datasource.scan import table as _datasource_table
from marivo.semantic.constraints import ConstraintId
from marivo.semantic.errors import ErrorKind, SemanticDecoratorError, _raise
from marivo.semantic.ir import (
Additivity,
AggKind,
AiContextIR,
Composition,
CsvSourceIR,
DateParse,
DatetimeParse,
DimensionIR,
DimensionKind,
DomainIR,
EntityIR,
EntitySourceIR,
HourPrefixParse,
JoinKey,
LinearComposition,
LinearTerm,
MeasureIR,
MetricIR,
ParquetSourceIR,
RatioComposition,
RelationshipIR,
SampleIntervalIR,
SemanticParse,
SemiAdditive,
SnapshotVersioningIR,
SourceLocation,
SqlProvenance,
StrptimeParse,
TableSourceIR,
TimeFoldIR,
TimestampParse,
ValidityVersioningIR,
WeightedAverageComposition,
is_time_bearing_format,
)
from marivo.semantic.loader import _LOADER_CTX, LoaderContext
from marivo.semantic.refs import (
DimensionRef,
DomainRef,
EntityRef,
MeasureRef,
MetricRef,
RelationshipRef,
TimeDimensionRef,
)
from marivo.semantic.time_format import normalize_strptime
from marivo.semantic.typing import AiContextValue
from marivo.semantic.validator import validate_metric_body_ast
__all__ = [
"DomainRef",
"aggregate",
"ai_context",
"csv",
"datetime",
"dimension",
"domain",
"entity",
"from_sql",
"hour_prefix",
"join_on",
"linear",
"measure",
"metric",
"parquet",
"ratio",
"ref",
"relationship",
"semi_additive",
"snapshot",
"strptime",
"table",
"time_dimension",
"timestamp",
"validity",
"weighted_average",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _require_ctx() -> LoaderContext:
"""Get the current LoaderContext or raise OutsideLoaderContextError."""
ctx = _LOADER_CTX.get()
if ctx is None:
_raise(
ErrorKind.OUTSIDE_LOADER_CONTEXT,
"Semantic decorators can only be used inside files loaded by the semantic project loader.",
cls=SemanticDecoratorError,
)
return ctx
def _resolve_domain(explicit: DomainRef | None, ctx: LoaderContext) -> str:
"""Resolve the domain name: explicit ref > default_domain > error."""
if isinstance(explicit, DomainRef):
return explicit.id
if explicit is not None:
return explicit
if ctx.default_domain is not None:
return ctx.default_domain
_raise(
ErrorKind.MISSING_DOMAIN,
"No domain name specified and no default domain is set. "
"Call ms.domain(name=...) before declaring semantic objects.",
cls=SemanticDecoratorError,
)
def _ir_kind(ir: Any) -> str:
"""Return a human-readable kind label for an IR object."""
if isinstance(ir, DimensionIR):
return "time dimension" if ir.is_time_dimension else "dimension"
if isinstance(ir, MeasureIR):
return "measure"
if isinstance(ir, MetricIR):
return "metric"
if isinstance(ir, EntityIR):
return "entity"
if isinstance(ir, RelationshipIR):
return "relationship"
return type(ir).__name__
def _check_duplicate(
ctx: LoaderContext,
semantic_id: str,
ir_type: type[EntityIR | DimensionIR | MeasureIR | MetricIR | RelationshipIR],
) -> None:
"""Raise DUPLICATE_NAME if semantic_id already in pending_objects of the same kind.
Also checks for cross-kind collisions between DimensionIR and MeasureIR,
which share the entity-qualified namespace (``<domain>.<entity>.<field>``).
"""
# DimensionIR and MeasureIR share the same semantic_id namespace, so a
# dimension "sales.orders.amount" and a measure "sales.orders.amount" would
# silently collide at the catalog/assembly layer. Other kinds (EntityIR,
# MetricIR, RelationshipIR) use the domain-qualified namespace and are
# intentionally allowed to share names with different kinds.
_cross_kinds: set[type[DimensionIR | MeasureIR]] = {DimensionIR, MeasureIR}
for ir, _ in ctx.pending_objects:
if not isinstance(ir, (EntityIR, DimensionIR, MeasureIR, MetricIR, RelationshipIR)):
continue
if ir.semantic_id != semantic_id:
continue
existing_kind = _ir_kind(ir)
if isinstance(ir, ir_type):
_raise(
ErrorKind.DUPLICATE_NAME,
f"Name conflict: {semantic_id!r} is already declared as a {existing_kind}.",
cls=SemanticDecoratorError,
refs=(semantic_id,),
)
# Cross-kind collision: only dimension vs measure (same entity-qualified namespace)
if ir_type in _cross_kinds and type(ir) in _cross_kinds:
_raise(
ErrorKind.DUPLICATE_NAME,
f"Name conflict: {semantic_id!r} is already claimed by a {existing_kind}. "
f"Use a different name for this object.",
cls=SemanticDecoratorError,
refs=(semantic_id,),
)
[docs]
def ai_context(
*,
business_definition: str | None = None,
guardrails: _Sequence[str] | None = None,
synonyms: _Sequence[str] | None = None,
examples: _Sequence[str] | None = None,
instructions: str | None = None,
owner_notes: str | None = None,
) -> AiContextValue:
"""Construct a validated AiContext for semantic objects.
Provides typed, IDE-friendly construction of AI context with eager
validation. Invalid key names are caught at call time by Python's
keyword argument checking; value-type mismatches raise
``SemanticDecoratorError`` with ``[invalid_ai_context]`` including
the caller's file and line.
Args:
business_definition: Plain-language description of what the object represents.
guardrails: Constraints on how the object should be used.
synonyms: Alternative names for the object.
examples: Example questions or use cases.
instructions: Usage guidance for AI agents.
owner_notes: Team or ownership notes.
Returns:
A validated ``AiContextValue`` for use with ``ai_context=`` parameters.
Example:
>>> ctx = ms.ai_context(
... business_definition="Total revenue from all orders",
... guardrails=["Do not use for margin calculations"],
... synonyms=["total_sales"],
... )
>>> revenue = ms.aggregate(name="revenue", measure=amount, agg="sum", ai_context=ctx)
Raises:
SemanticDecoratorError: If any value has the wrong type.
"""
location = _user_caller_location()
# Validate list-type fields
for label, value in (
("guardrails", guardrails),
("synonyms", synonyms),
("examples", examples),
):
if value is None:
continue
if not isinstance(value, list | tuple) or not all(isinstance(item, str) for item in value):
_raise(
ErrorKind.INVALID_AI_CONTEXT,
f"ms.ai_context({label}=...) requires list[str] or tuple[str, ...], "
f"got {type(value).__name__}.",
cls=SemanticDecoratorError,
location=location,
)
# Validate string-type fields
for label, value in (
("business_definition", business_definition),
("instructions", instructions),
("owner_notes", owner_notes),
):
if value is None:
continue
if not isinstance(value, str):
_raise(
ErrorKind.INVALID_AI_CONTEXT,
f"ms.ai_context({label}=...) requires str, got {type(value).__name__}.",
cls=SemanticDecoratorError,
location=location,
)
return AiContextValue(
business_definition=business_definition,
guardrails=tuple(guardrails) if guardrails is not None else (),
synonyms=tuple(synonyms) if synonyms is not None else (),
examples=tuple(examples) if examples is not None else (),
instructions=instructions,
owner_notes=owner_notes,
)
def _build_ai_context(ai_context: AiContextValue | None) -> AiContextIR:
"""Convert a validated AiContextValue into an AiContextIR.
Rejects raw dicts with a teachable error directing the user to
``ms.ai_context(...)``. Since ``AiContextValue`` is validated at
construction time by ``ms.ai_context()`` or ``__post_init__``, no
further validation is needed for genuine ``AiContextValue`` instances.
"""
if ai_context is None:
return AiContextIR()
if not isinstance(ai_context, AiContextValue):
_raise(
ErrorKind.INVALID_AI_CONTEXT,
"ai_context no longer accepts raw dicts. "
"Use ms.ai_context(business_definition=..., guardrails=[...]) "
"to construct an AiContextValue.",
cls=SemanticDecoratorError,
)
return AiContextIR(
business_definition=ai_context.business_definition,
guardrails=ai_context.guardrails,
synonyms=ai_context.synonyms,
examples=ai_context.examples,
instructions=ai_context.instructions,
owner_notes=ai_context.owner_notes,
)
def _compute_body_ast_hash(fn: Callable[..., Any]) -> str:
"""Compute a SHA-256 hash of the function body AST."""
try:
source = inspect.getsource(fn)
# Dedent to handle functions defined inside decorators/tests
source = textwrap.dedent(source)
tree = ast.parse(source)
# Find the function definition node
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
# Hash just the body statements (not the decorator or signature)
body_source = ast.get_source_segment(source, node)
if body_source is not None:
return hashlib.sha256(body_source.encode()).hexdigest()[:16]
# Fallback: hash the entire source
return hashlib.sha256(source.encode()).hexdigest()[:16]
except (OSError, TypeError, IndentationError):
return hashlib.sha256(b"<unavailable>").hexdigest()[:16]
def _compute_agg_hash(measure_id: str, agg: Any, fold: TimeFoldIR | None) -> str:
payload = repr(
{"measure": measure_id, "agg": agg, "fold": (fold.kind, fold.q) if fold else None}
)
return hashlib.sha256(payload.encode()).hexdigest()[:16]
def _validate_unit(unit: str | None, semantic_id: str, object_kind: str = "metric") -> None:
if unit is None:
return
if unit == "" or any(not (0x21 <= ord(ch) <= 0x7E) for ch in unit):
_raise(
ErrorKind.INVALID_REF,
f"{object_kind} {semantic_id!r}: unit must be a non-empty token of printable "
f"ASCII without whitespace (UCUM case-sensitive code such as 'CNY', "
f"'%', '1', 'ms', '{{order}}'); got {unit!r}.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
)
def _normalize_sample_interval(
sample_interval: tuple[int, str] | None,
*,
semantic_id: str,
data_type: str,
granularity: str,
) -> SampleIntervalIR | None:
if sample_interval is None:
return None
count, unit = sample_interval
if unit not in {"minute", "hour"} or count <= 0:
_raise(
ErrorKind.INVALID_SAMPLE_INTERVAL,
"sample_interval must use a positive minute or hour interval.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.SAMPLE_INTERVAL_VALID,
)
seconds = count * (60 if unit == "minute" else 3600)
if 86400 % seconds != 0:
_raise(
ErrorKind.INVALID_SAMPLE_INTERVAL,
"sample_interval must divide one day evenly.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.SAMPLE_INTERVAL_VALID,
)
if data_type not in {"datetime", "timestamp", "string", "integer"}:
_raise(
ErrorKind.INVALID_SAMPLE_INTERVAL,
"sample_interval is only supported on datetime, timestamp, string, or integer time dimensions.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.SAMPLE_INTERVAL_VALID,
)
rank = {
"second": 0,
"minute": 1,
"hour": 2,
"day": 3,
"week": 4,
"month": 5,
"quarter": 6,
"year": 7,
}
if rank[granularity] > rank[unit]:
allowed = [g for g, r in sorted(rank.items(), key=lambda kv: kv[1]) if r <= rank[unit]]
allowed_list = ", ".join(repr(g) for g in allowed)
_raise(
ErrorKind.INVALID_SAMPLE_INTERVAL,
f"time dimension {semantic_id!r}: physical granularity {granularity!r} cannot "
f"be coarser than sample_interval unit {unit!r}. Set granularity to {unit!r} or "
f"finer (one of: {allowed_list}).",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.SAMPLE_INTERVAL_VALID,
)
return SampleIntervalIR(count=count, unit=unit) # type: ignore[arg-type]
def _validate_timezone(timezone: str) -> None:
"""Validate that timezone is a valid IANA timezone name."""
try:
ZoneInfo(timezone)
except ZoneInfoNotFoundError:
_raise(
ErrorKind.INVALID_REF,
f"timezone {timezone!r} is not a valid IANA timezone name.",
cls=SemanticDecoratorError,
constraint_id=ConstraintId.REF_SHAPE,
)
def _normalize_sample_interval_value(
sample_interval: tuple[int, Literal["minute", "hour"]] | None,
) -> SampleIntervalIR | None:
"""Normalize a sample_interval tuple from a parse builder, using a synthetic semantic_id."""
if sample_interval is None:
return None
_count, unit = sample_interval
return _normalize_sample_interval(
sample_interval,
semantic_id="<parse>",
data_type="datetime",
granularity=unit,
)
def _normalize_time_fold(
time_fold: str | tuple[str, float] | None,
*,
semantic_id: str,
) -> TimeFoldIR | None:
if time_fold is None:
return None
if isinstance(time_fold, str):
if time_fold not in {"mean", "min", "max", "first", "last"}:
_raise(
ErrorKind.INVALID_TIME_FOLD,
f"time_fold {time_fold!r} is not supported.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.TIME_FOLD_VALID,
)
return TimeFoldIR(kind=time_fold) # type: ignore[arg-type]
kind, q = time_fold
if kind != "quantile" or not isinstance(q, (float, int)) or not 0 < float(q) < 1:
_raise(
ErrorKind.INVALID_TIME_FOLD,
"quantile time_fold must be ('quantile', q) with 0 < q < 1.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.TIME_FOLD_VALID,
)
return TimeFoldIR(kind="quantile", q=float(q))
def _normalize_additivity(additivity: Additivity, *, semantic_id: str) -> Additivity:
"""Validate an Additivity value (literal or SemiAdditive variant)."""
if isinstance(additivity, SemiAdditive):
return additivity
if additivity in ("additive", "non_additive"):
return additivity
_raise(
ErrorKind.INVALID_REF,
f"Metric {semantic_id!r}: additivity must be 'additive', 'non_additive', "
"or ms.semi_additive(over=..., fold=...).",
cls=SemanticDecoratorError,
constraint_id=ConstraintId.REF_SHAPE,
)
def _caller_location() -> SourceLocation:
"""Best-effort source location from the caller's frame.
Walks up 1 frame: ``_caller_location`` -> caller (decorator).
Reports the file/line of the decorator call site.
"""
frame = inspect.currentframe()
# Walk up: _caller_location -> decorator
try:
if frame is not None and frame.f_back is not None:
caller_frame = frame.f_back
if caller_frame is not None:
filename = caller_frame.f_code.co_filename
lineno = caller_frame.f_lineno
return SourceLocation(file=filename, line=lineno)
except AttributeError:
pass
return SourceLocation(file="<unknown>", line=0)
def _user_caller_location() -> SourceLocation:
"""Best-effort source location of the user's call site.
Walks up 2 frames: ``_user_caller_location`` -> internal wrapper
(e.g. ``ai_context``) -> user code. Reports the file/line where
the user called the public function.
"""
frame = inspect.currentframe()
try:
if frame is not None and frame.f_back is not None:
wrapper_frame = frame.f_back
if wrapper_frame.f_back is not None:
user_frame = wrapper_frame.f_back
filename = user_frame.f_code.co_filename
lineno = user_frame.f_lineno
return SourceLocation(file=filename, line=lineno)
# Fallback: report the wrapper itself
filename = wrapper_frame.f_code.co_filename
lineno = wrapper_frame.f_lineno
return SourceLocation(file=filename, line=lineno)
except AttributeError:
pass
return SourceLocation(file="<unknown>", line=0)
def _resolve_ref_string(
ref: EntityRef
| DimensionRef
| TimeDimensionRef
| MetricRef
| MeasureRef
| RelationshipRef
| DatasourceRef
| str,
) -> str:
"""Extract id string from a ref object or pass through a string."""
if isinstance(ref, str):
return ref
return ref.id
def _resolve_datasource_ref(ref: DatasourceRef | str) -> str:
"""Extract global datasource short name from a datasource ref or string."""
if isinstance(ref, str):
return ref
if isinstance(ref, DatasourceRef):
return ref.id
_raise(
ErrorKind.INVALID_REF,
"ms.entity(datasource=...) accepts a datasource ref or global datasource name string.",
cls=SemanticDecoratorError,
constraint_id=ConstraintId.REF_SHAPE,
)
def _resolve_dimension_refs(refs: list[DimensionRef | str]) -> tuple[str, ...]:
"""Convert a list of dimension refs/strings to tuple of semantic_ids."""
return tuple(_resolve_ref_string(r) for r in refs)
def _resolve_entity_refs(refs: list[EntityRef | str] | None) -> tuple[str, ...]:
"""Convert a list of entity refs/strings to tuple of semantic_ids."""
if refs is None:
return ()
return tuple(_resolve_ref_string(r) for r in refs)
def _push_ir(ctx: LoaderContext, ir: Any, callable_: Callable[..., Any] | None) -> None:
"""Push an (IR, callable) pair onto ctx.pending_objects."""
ctx.pending_objects.append((ir, callable_))
# ---------------------------------------------------------------------------
# Top-level calls
# ---------------------------------------------------------------------------
[docs]
def domain(
*,
name: str,
default: bool = True,
ai_context: AiContextValue | None = None,
) -> DomainRef:
"""Declare a semantic domain namespace inside a project file.
A domain groups entities, dimensions, metrics, and relationships under a single
qualified name (``<domain>.<object>``). Must be called at module top-level
inside a ``models/semantic/<model>/*.py`` project file.
Args:
name: Domain namespace, e.g. ``"sales"``.
default: If True, subsequent decorators in this file resolve to this
domain when no explicit ``domain=`` kwarg is passed.
ai_context: Optional ``AiContextValue`` from ``ms.ai_context(...)`` with extra
agent-facing hints.
Returns:
A ``DomainRef`` that can be passed as the ``domain=`` kwarg to other
decorators to override the default domain context.
Raises:
OutsideLoaderContextError: Called outside a semantic loader pass.
SemanticDecoratorError: ``name`` collides with another domain in the project.
Example:
>>> import marivo.semantic as ms
>>> sales = ms.domain(name="sales", default=True)
"""
ctx = _require_ctx()
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
ir = DomainIR(
name=name,
default=default,
ai_context=ai_ctx,
location=location,
)
_push_ir(ctx, ir, None)
if default:
ctx.default_domain = name
return DomainRef(semantic_id=name)
[docs]
def aggregate(
*,
name: str,
measure: MeasureRef | str,
agg: AggKind,
fold: str | tuple[Literal["quantile"], float] | None = None,
unit: str | None = None,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> MetricRef:
"""Declare a tier-1 simple metric: an aggregation over a measure.
The metric inherits its additivity nature from ``measure`` (resolved at load);
``fold`` overrides the time-fold for semi-additive measures only. No function body.
Args:
name: Metric name (required).
measure: Measure to aggregate (``MeasureRef`` or qualified string).
agg: Aggregation kind (``"sum"``, ``"mean"``, ``"count"``, etc.).
fold: Time-fold override for semi-additive measures.
unit: Override the unit derived from ``measure`` at load. Leave None to
inherit the measure's unit (count/count_distinct derive nothing).
domain: Override the active domain.
ai_context: Optional ``AiContextValue`` from ``ms.ai_context(...)`` with extra agent-facing hints.
Example:
>>> revenue = ms.aggregate(name="revenue", measure=amount, agg="sum")
>>> average_inventory = ms.aggregate(name="avg_inv", measure=quantity, agg="sum", fold="avg")
"""
ctx = _require_ctx()
resolved_domain = _resolve_domain(domain, ctx)
measure_id = _resolve_ref_string(measure)
# measure ids are entity-qualified: "<domain>.<entity>.<column>"
entity_id = measure_id.rsplit(".", 1)[0]
obj_name = name
semantic_id = f"{resolved_domain}.{obj_name}"
_check_duplicate(ctx, semantic_id, MetricIR)
_validate_unit(unit, semantic_id)
fold_ir = _normalize_time_fold(fold, semantic_id=semantic_id) if fold is not None else None
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
metric_ir = MetricIR(
semantic_id=semantic_id,
domain=resolved_domain,
name=obj_name,
metric_type="simple",
entities=(entity_id,),
aggregation=agg,
measure=measure_id,
composition=None,
additivity=None, # resolved at load: downgrade(measure.additivity, agg) + fold override
provenance=None,
ai_context=ai_ctx,
body_ast_hash=_compute_agg_hash(measure_id, agg, fold_ir),
python_symbol=obj_name,
location=location,
root_entity=entity_id,
fold_override=fold_ir,
unit=unit,
)
_push_ir(ctx, metric_ir, None)
return MetricRef(semantic_id)
[docs]
def metric(
*,
name: str | None = None,
entities: list[EntityRef | str],
additivity: Additivity,
root_entity: EntityRef | str | None = None,
fanout_policy: Literal["block", "aggregate_then_join"] = "block",
unit: str | None = None,
provenance: SqlProvenance | None = None,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> Callable[[Callable[..., Any]], MetricRef]:
"""Declare a metric from an ibis body. Declares ``additivity`` directly.
Args:
name: Metric name. Defaults to the function name.
entities: List of entity refs or qualified strings.
additivity: ``"additive"``, ``"non_additive"``, or ``ms.semi_additive(over, fold)``.
root_entity: Required when more than one entity is provided.
fanout_policy: ``"block"`` (default) or ``"aggregate_then_join"``.
unit: UCUM unit token.
provenance: Optional ``SqlProvenance`` from ``ms.from_sql(sql=..., dialect=...)``.
domain: Override the active domain namespace.
ai_context: Optional ``AiContextValue`` from ``ms.ai_context(...)`` with extra agent-facing hints.
Returns:
A decorator that returns a ``MetricRef``.
Example:
>>> @ms.metric(entities=[orders], additivity="additive")
... def gmv(orders):
... return (orders.price * orders.qty).sum()
"""
ctx = _require_ctx()
resolved_domain = _resolve_domain(domain, ctx)
def decorator(fn: Callable[..., Any]) -> MetricRef:
obj_name = name or fn.__name__
semantic_id = f"{resolved_domain}.{obj_name}"
_check_duplicate(ctx, semantic_id, MetricIR)
_validate_unit(unit, semantic_id)
entity_refs = _resolve_entity_refs(entities)
if len(entity_refs) == 0:
_raise(
ErrorKind.MISSING_ENTITIES,
"@ms.metric(...) requires non-empty entities.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.METRIC_ENTITIES_REQUIRED,
)
body_hash = validate_metric_body_ast(fn, "base", body_kind="metric")
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
root_ref = _resolve_ref_string(root_entity) if root_entity is not None else None
if root_ref is None and len(entity_refs) == 1:
root_ref = entity_refs[0]
if root_ref is None:
_raise(
ErrorKind.MISSING_METRIC_ROOT_ENTITY,
"@ms.metric(...) with more than one entity requires root_entity=...",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.METRIC_ROOT_ENTITY_REQUIRED,
)
metric_ir = MetricIR(
semantic_id=semantic_id,
domain=resolved_domain,
name=obj_name,
metric_type="simple",
entities=entity_refs,
aggregation=None,
measure=None,
composition=None,
additivity=_normalize_additivity(additivity, semantic_id=semantic_id),
provenance=provenance,
ai_context=ai_ctx,
body_ast_hash=body_hash,
python_symbol=fn.__name__,
location=location,
root_entity=root_ref,
fanout_policy=fanout_policy,
unit=unit,
)
_push_ir(ctx, metric_ir, fn)
return MetricRef(semantic_id)
return decorator
# ---------------------------------------------------------------------------
# Decorators
# ---------------------------------------------------------------------------
[docs]
def table(name: str, /, *, database: str | tuple[str, ...] | None = None) -> TableSourceIR:
"""Build a structured table source for ``ms.entity(source=...)``."""
return _datasource_table(name, database=database)
[docs]
def parquet(
path: str,
/,
*,
hive_partitioning: bool = False,
columns: tuple[str, ...] | list[str] | None = None,
) -> ParquetSourceIR:
"""Build a structured parquet source for ``ms.entity(source=...)``."""
return _datasource_parquet(path, hive_partitioning=hive_partitioning, columns=columns)
[docs]
def csv(
path: str,
/,
*,
header: bool = True,
delimiter: str = ",",
columns: tuple[str, ...] | list[str] | None = None,
) -> CsvSourceIR:
"""Build a structured CSV source for ``ms.entity(source=...)``."""
return _datasource_csv(path, header=header, delimiter=delimiter, columns=columns)
[docs]
def entity(
*,
name: str,
datasource: DatasourceRef | str,
source: EntitySourceIR,
primary_key: list[str] | None = None,
versioning: SnapshotVersioningIR | ValidityVersioningIR | None = None,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> EntityRef:
"""Declare an entity over a structured physical source.
Args:
name: Entity name.
datasource: Datasource ref returned by ``md.ref(...)`` or a global
datasource name string declared in ``models/datasources/*.py``.
source: Structured physical source, usually ``ms.table(...)`` or
``ms.parquet(...)``, or ``ms.csv(...)``.
primary_key: Optional list of column names forming the primary key.
domain: Override the active domain namespace with a ``DomainRef`` returned
by ``ms.domain(...)``. Defaults to the file's default domain.
ai_context: Optional ``AiContextValue`` from ``ms.ai_context(...)`` with extra agent-facing hints.
Returns:
An ``EntityRef`` usable by ``@ms.dimension`` and ``@ms.metric``.
Raises:
SemanticDecoratorError: ``datasource`` is not a datasource ref or string, ``name``
collides with another object, or ``source`` is not an entity source.
Example:
>>> orders = ms.entity(
... name="orders",
... datasource="warehouse",
... source=ms.table("orders", database="sales_mart"),
... )
"""
ctx = _require_ctx()
resolved_domain = _resolve_domain(domain, ctx)
semantic_id = f"{resolved_domain}.{name}"
_check_duplicate(ctx, semantic_id, EntityIR)
if not isinstance(source, (TableSourceIR, ParquetSourceIR, CsvSourceIR)):
_raise(
ErrorKind.INVALID_REF,
"ms.entity(source=...) accepts ms.table(...), ms.parquet(...), or ms.csv(...).",
cls=SemanticDecoratorError,
refs=(semantic_id,),
constraint_id=ConstraintId.REF_SHAPE,
)
ds_ref = _resolve_datasource_ref(datasource)
pk = tuple(primary_key) if primary_key else ()
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
ir = EntityIR(
semantic_id=semantic_id,
domain=resolved_domain,
name=name,
datasource=ds_ref,
source=source,
primary_key=pk,
ai_context=ai_ctx,
python_symbol=name,
location=location,
versioning=versioning,
)
_push_ir(ctx, ir, None)
return EntityRef(semantic_id)
[docs]
def dimension(
*,
name: str | None = None,
entity: EntityRef | str,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> Callable[[Callable[..., Any]], DimensionRef]:
"""Declare a categorical dimension whose body returns an ibis expression over its entity.
The decorated function takes the entity table and returns a single
expression (single-return AST). Use this for both raw columns and derived
expressions (e.g. ``table.region``).
For quantitative measures, use ``@ms.measure(entity=..., additivity=...)``
instead.
Args:
name: Dimension name. Defaults to the function name.
entity: Owning entity, either an ``EntityRef`` or a qualified
``"<domain>.<entity>"`` string.
domain: Override the active domain namespace with a ``DomainRef`` returned
by ``ms.domain(...)``. Defaults to the file's default domain.
ai_context: Optional ``AiContextValue`` from ``ms.ai_context(...)`` with extra agent-facing hints.
Returns:
A decorator that returns a ``DimensionRef``.
Raises:
SemanticDecoratorError: ``entity`` is unknown, ``name`` collides, or the
body violates the AST whitelist.
Example:
>>> @ms.dimension(entity=orders)
... def region(orders_table):
... return orders_table.region
"""
ctx = _require_ctx()
resolved_domain = _resolve_domain(domain, ctx)
def decorator(fn: Callable[..., Any]) -> DimensionRef:
obj_name = name or fn.__name__
entity_ref = _resolve_ref_string(entity)
semantic_id = f"{entity_ref}.{obj_name}"
entity_domain = entity_ref.split(".", 1)[0]
if entity_domain != resolved_domain:
_raise(
ErrorKind.INVALID_REF,
f"Dimension {semantic_id!r} belongs to entity in domain {entity_domain!r}, "
f"but the active domain is {resolved_domain!r}.",
cls=SemanticDecoratorError,
refs=(semantic_id,),
constraint_id=ConstraintId.REF_SHAPE,
)
_check_duplicate(ctx, semantic_id, DimensionIR)
validate_metric_body_ast(fn, "base", body_kind="dimension")
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
ir = DimensionIR(
semantic_id=semantic_id,
domain=resolved_domain,
entity=entity_ref,
name=obj_name,
ai_context=ai_ctx,
is_time_dimension=False,
kind=DimensionKind.CATEGORICAL,
python_symbol=fn.__name__,
location=location,
)
_push_ir(ctx, ir, fn)
ref = DimensionRef(semantic_id)
ctx.pending_refs.append(ref)
return ref
return decorator
[docs]
def measure(
*,
name: str | None = None,
entity: EntityRef | str,
additivity: Additivity,
unit: str | None = None,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> Callable[[Callable[..., Any]], MeasureRef]:
"""Declare a row-level quantitative measure whose expression can be aggregated.
Measures represent quantitative facts (e.g. amount, quantity) that can be
aggregated using ``ms.aggregate()``. The decorated function takes the entity
table and returns a single ibis expression.
Args:
name: Measure name. Defaults to the function name.
entity: Owning entity, either an ``EntityRef`` or a qualified
``"<domain>.<entity>"`` string.
additivity: Whether the measure is ``"additive"``, ``"non_additive"``,
or ``ms.semi_additive(over=..., fold=...)``.
unit: UCUM unit token (e.g. ``"USD"``, ``"CNY"``, ``"%"``).
domain: Override the active domain namespace with a ``DomainRef`` returned
by ``ms.domain(...)``. Defaults to the file's default domain.
ai_context: Optional ``AiContextValue`` from ``ms.ai_context(...)`` with extra agent-facing hints.
Returns:
A decorator that returns a ``MeasureRef``.
Raises:
SemanticDecoratorError: ``entity`` is unknown, ``name`` collides, or the
body violates the AST whitelist.
Example:
>>> @ms.measure(entity=orders, additivity="additive", unit="USD")
... def amount(orders_table):
... return orders_table.amount
"""
ctx = _require_ctx()
resolved_domain = _resolve_domain(domain, ctx)
def decorator(fn: Callable[..., Any]) -> MeasureRef:
obj_name = name or fn.__name__
entity_ref = _resolve_ref_string(entity)
semantic_id = f"{entity_ref}.{obj_name}"
entity_domain = entity_ref.split(".", 1)[0]
if entity_domain != resolved_domain:
_raise(
ErrorKind.INVALID_REF,
f"Measure {semantic_id!r} belongs to entity in domain {entity_domain!r}, "
f"but the active domain is {resolved_domain!r}.",
cls=SemanticDecoratorError,
refs=(semantic_id,),
constraint_id=ConstraintId.REF_SHAPE,
)
_check_duplicate(ctx, semantic_id, MeasureIR)
_validate_unit(unit, semantic_id, "measure")
validate_metric_body_ast(fn, "base", body_kind="measure")
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
ir = MeasureIR(
semantic_id=semantic_id,
domain=resolved_domain,
entity=entity_ref,
name=obj_name,
ai_context=ai_ctx,
additivity=_normalize_additivity(additivity, semantic_id=semantic_id),
unit=unit,
python_symbol=fn.__name__,
location=location,
)
_push_ir(ctx, ir, fn)
ref = MeasureRef(semantic_id)
ctx.pending_refs.append(ref)
return ref
return decorator
def _validate_time_parse_granularity(
*,
semantic_id: str,
granularity: str,
parse: SemanticParse | None,
) -> None:
"""Validate that the parse variant is compatible with the declared granularity."""
if parse is None:
return
if isinstance(parse, HourPrefixParse) and granularity != "hour":
_raise(
ErrorKind.INVALID_REF,
f"time dimension {semantic_id!r}: ms.hour_prefix(...) requires granularity='hour'.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.TIME_GRANULARITY_PARSE_COMPATIBLE,
)
if isinstance(parse, DateParse) and granularity in {"hour", "minute", "second"}:
_raise(
ErrorKind.INVALID_REF,
f"time dimension {semantic_id!r}: granularity={granularity!r} requires ms.datetime(...) or ms.timestamp(...).",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.TIME_GRANULARITY_PARSE_COMPATIBLE,
)
if isinstance(parse, HourPrefixParse) and granularity in {"minute", "second"}:
_raise(
ErrorKind.INVALID_REF,
f"time dimension {semantic_id!r}: granularity={granularity!r} requires ms.datetime(...) or ms.timestamp(...).",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.TIME_GRANULARITY_PARSE_COMPATIBLE,
)
if isinstance(parse, StrptimeParse) and granularity in {"hour", "minute", "second"}:
import re as _re
fmt = parse.format or ""
date_directives = {"%Y", "%y", "%m", "%d", "%j", "%U", "%W"}
time_directives = {"%H", "%I", "%k", "%l", "%M", "%S", "%T", "%p"}
tokens = set(_re.findall(r"%[a-zA-Z]", fmt))
if not tokens & time_directives:
_raise(
ErrorKind.INVALID_REF,
f"time dimension {semantic_id!r}: granularity={granularity!r} requires a time-bearing "
f"format, but strptime format {fmt!r} has no hour/minute/second directives.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.TIME_GRANULARITY_PARSE_COMPATIBLE,
)
if parse.sample_interval is not None and not tokens & date_directives:
_raise(
ErrorKind.INVALID_SAMPLE_INTERVAL,
f"time dimension {semantic_id!r}: sampled strptime format {fmt!r} must include date context.",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.SAMPLE_INTERVAL_VALID,
)
def _validate_sample_interval_granularity(
*,
semantic_id: str,
granularity: str,
parse: SemanticParse | None,
) -> None:
"""Validate that sample_interval unit is not coarser than the declared granularity."""
if parse is None:
return
sample_ir: SampleIntervalIR | None = None
data_type_str: str | None = None
if isinstance(parse, DatetimeParse):
sample_ir = parse.sample_interval
data_type_str = "datetime"
elif isinstance(parse, TimestampParse):
sample_ir = parse.sample_interval
data_type_str = "timestamp"
elif isinstance(parse, StrptimeParse):
sample_ir = parse.sample_interval
data_type_str = "strptime"
elif isinstance(parse, HourPrefixParse):
sample_ir = parse.sample_interval
data_type_str = "hour_prefix"
if sample_ir is None or data_type_str is None:
return
# Re-run the granularity check from _normalize_sample_interval with the real granularity
rank = {
"second": 0,
"minute": 1,
"hour": 2,
"day": 3,
"week": 4,
"month": 5,
"quarter": 6,
"year": 7,
}
if rank.get(granularity, 99) > rank.get(sample_ir.unit, 0):
allowed = [
g for g, r in sorted(rank.items(), key=lambda kv: kv[1]) if r <= rank[sample_ir.unit]
]
allowed_list = ", ".join(repr(g) for g in allowed)
_raise(
ErrorKind.INVALID_SAMPLE_INTERVAL,
f"time dimension {semantic_id!r}: physical granularity {granularity!r} cannot "
f"be coarser than sample_interval unit {sample_ir.unit!r}. Set granularity to {sample_ir.unit!r} or "
f"finer (one of: {allowed_list}).",
refs=(semantic_id,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.SAMPLE_INTERVAL_VALID,
)
[docs]
def time_dimension(
*,
name: str | None = None,
entity: EntityRef | str,
granularity: Literal["year", "quarter", "month", "week", "day", "hour", "minute", "second"],
parse: SemanticParse | None = None,
is_default: bool = False,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> Callable[[Callable[..., Any]], TimeDimensionRef]:
"""Declare a time-aware dimension that carries grain and parsing metadata.
Time dimensions are the only dimensions usable as window axes by ``session.observe``.
The body may return any ibis expression that represents the intended time
axis. When ``parse`` is omitted, the parse variant is inferred from the
column type at analysis time. Use ``ms.datetime(...)``, ``ms.timestamp(...)``,
``ms.strptime(...)``, or ``ms.hour_prefix(...)`` to declare a parse variant
explicitly when you need timezone, sample_interval, or string/integer parsing.
Args:
name: Dimension name. Defaults to the function name.
entity: Owning entity (``EntityRef`` or qualified string).
granularity: ``year | quarter | month | week | day | hour | minute | second`` — the
finest grain at which queries are meaningful.
parse: Optional parse variant. Omit for native temporal columns (the parse
is inferred at analysis time). Use ``ms.datetime(timezone=...)``,
``ms.timestamp(timezone=...)``, ``ms.strptime(format)``, or
``ms.hour_prefix(prefix)`` when explicit configuration is needed.
is_default: Mark this dimension as the default time axis when multiple time dimensions
exist on the entity. At most one time dimension per entity may carry
is_default=True. When observe() is called without time_dimension=, the default
dimension is used automatically.
domain: Override the active domain namespace with a ``DomainRef`` returned
by ``ms.domain(...)``. Defaults to the file's default domain.
ai_context: Optional ``AiContextValue`` from ``ms.ai_context(...)`` with extra agent-facing hints.
Returns:
A decorator that returns a ``TimeDimensionRef``.
Raises:
SemanticDecoratorError: ``entity`` is unknown, ``name`` collides, the
body violates the AST whitelist, or the parse variant is incompatible
with the declared granularity.
Example:
>>> @ms.time_dimension(entity=orders, granularity="day")
... def created_at(orders):
... return orders.created_at
"""
ctx = _require_ctx()
resolved_domain = _resolve_domain(domain, ctx)
def decorator(fn: Callable[..., Any]) -> TimeDimensionRef:
obj_name = name or fn.__name__
ds_ref = _resolve_ref_string(entity)
semantic_id = f"{ds_ref}.{obj_name}"
ds_domain = ds_ref.split(".", 1)[0]
if ds_domain != resolved_domain:
_raise(
ErrorKind.INVALID_REF,
f"Time dimension {semantic_id!r} belongs to entity in domain {ds_domain!r}, "
f"but the active domain is {resolved_domain!r}.",
cls=SemanticDecoratorError,
refs=(semantic_id,),
constraint_id=ConstraintId.REF_SHAPE,
)
_check_duplicate(ctx, semantic_id, DimensionIR)
validate_metric_body_ast(fn, "base", body_kind="time_dimension")
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
_validate_time_parse_granularity(
semantic_id=semantic_id, granularity=granularity, parse=parse
)
_validate_sample_interval_granularity(
semantic_id=semantic_id, granularity=granularity, parse=parse
)
ir = DimensionIR(
semantic_id=semantic_id,
domain=resolved_domain,
entity=ds_ref,
name=obj_name,
ai_context=ai_ctx,
is_time_dimension=True,
kind=DimensionKind.TIME,
granularity=granularity,
parse=parse,
is_default=is_default,
python_symbol=fn.__name__,
location=location,
)
_push_ir(ctx, ir, fn)
ref = TimeDimensionRef(semantic_id)
ctx.pending_refs.append(ref)
return ref
return decorator
[docs]
def relationship(
*,
name: str,
from_entity: EntityRef | str,
to_entity: EntityRef | str,
keys: list[JoinKey],
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> RelationshipRef:
"""Declare a join relationship between two entities.
Top-level call (not a decorator). Used by the compiler to plan joins when a
metric or dimension references dimensions across related entities.
Args:
name: Required relationship name.
from_entity: Source entity (``EntityRef`` or qualified string).
to_entity: Target entity (``EntityRef`` or qualified string).
keys: List of ``ms.join_on(from_key, to_key)`` pairs.
domain: Override the active domain namespace with a ``DomainRef`` returned
by ``ms.domain(...)``. Defaults to the file's default domain.
ai_context: Optional ``AiContextValue`` from ``ms.ai_context(...)`` with extra agent-facing hints.
Returns:
A ``RelationshipRef``.
Raises:
SemanticDecoratorError: ``name`` is missing, the entities are unknown, or
``keys`` is empty.
Example:
>>> ms.relationship(
... name="orders_to_customers",
... from_entity=orders, to_entity=customers,
... keys=[ms.join_on(customer_id, id)],
... )
"""
ctx = _require_ctx()
resolved_domain = _resolve_domain(domain, ctx)
semantic_id = f"{resolved_domain}.{name}"
_check_duplicate(ctx, semantic_id, RelationshipIR)
from_ds = _resolve_ref_string(from_entity)
to_ds = _resolve_ref_string(to_entity)
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
resolved_keys: tuple[JoinKey, ...] = tuple(keys)
if not resolved_keys:
_raise(
ErrorKind.INVALID_REF,
"ms.relationship(keys=...) requires at least one ms.join_on(from_key, to_key) pair.",
cls=SemanticDecoratorError,
refs=(semantic_id,),
constraint_id=ConstraintId.REF_SHAPE,
)
ir = RelationshipIR(
semantic_id=semantic_id,
domain=resolved_domain,
name=name,
from_entity=from_ds,
to_entity=to_ds,
keys=resolved_keys,
ai_context=ai_ctx,
location=location,
)
_push_ir(ctx, ir, None)
return RelationshipRef(semantic_id)
# ---------------------------------------------------------------------------
# Builder functions
# ---------------------------------------------------------------------------
[docs]
def snapshot(
*,
partition_field: DimensionRef | TimeDimensionRef | str,
grain: Literal["day"],
timezone: str | None = None,
format: str | None = None,
) -> SnapshotVersioningIR:
"""Declare daily snapshot partition versioning for an entity."""
if isinstance(partition_field, (DimensionRef, TimeDimensionRef)):
partition_ref = partition_field.id
else:
partition_ref = partition_field
if grain != "day":
_raise(
ErrorKind.INVALID_REF,
"snapshot versioning currently supports only grain='day'.",
cls=SemanticDecoratorError,
)
if timezone is not None:
try:
ZoneInfo(timezone)
except ZoneInfoNotFoundError:
_raise(
ErrorKind.INVALID_REF,
f"timezone {timezone!r} is not a valid IANA timezone name.",
cls=SemanticDecoratorError,
)
return SnapshotVersioningIR(
kind="snapshot",
partition_field=partition_ref,
grain="day",
timezone=timezone,
format=format,
)
[docs]
def validity(
*,
valid_from: DimensionRef | str,
valid_to: DimensionRef | str,
interval: Literal["closed_open", "closed_closed"],
open_end: tuple[str | None, ...],
timezone: str | None = None,
) -> ValidityVersioningIR:
"""Declare SCD2 validity interval versioning for an entity.
Args:
valid_from: Dimension semantic id (or DimensionRef) for the interval start column.
valid_to: Dimension semantic id (or DimensionRef) for the interval end column.
interval: ``"closed_open"`` (``[valid_from, valid_to)``) or
``"closed_closed"`` (``[valid_from, valid_to]``).
open_end: Non-empty tuple of sentinel values that mean "still current"
in the ``valid_to`` column. Use ``None`` for SQL NULL, or a string
sentinel such as ``"9999-12-31"``.
timezone: Optional IANA timezone name for anchor date casting.
Returns:
A ``ValidityVersioningIR`` for use in ``ms.entity(versioning=...)``.
Raises:
SemanticDecoratorError: ``interval`` is not one of the two allowed values,
``open_end`` is empty, or ``timezone`` is not a valid IANA name.
"""
if interval not in ("closed_open", "closed_closed"):
_raise(
ErrorKind.INVALID_ENTITY_VERSIONING,
f"validity versioning interval must be 'closed_open' or 'closed_closed', "
f"got {interval!r}.",
cls=SemanticDecoratorError,
details={"field": "interval", "reason": f"unsupported interval value {interval!r}"},
)
if not open_end:
_raise(
ErrorKind.INVALID_ENTITY_VERSIONING,
"validity versioning open_end must be a non-empty tuple.",
cls=SemanticDecoratorError,
details={"field": "open_end", "reason": "empty tuple is not allowed"},
)
if timezone is not None:
try:
ZoneInfo(timezone)
except ZoneInfoNotFoundError:
_raise(
ErrorKind.INVALID_ENTITY_VERSIONING,
f"timezone {timezone!r} is not a valid IANA timezone name.",
cls=SemanticDecoratorError,
details={"field": "timezone", "reason": f"unknown IANA timezone {timezone!r}"},
)
valid_from_ref = (
valid_from.id if isinstance(valid_from, (DimensionRef, TimeDimensionRef)) else valid_from
)
valid_to_ref = (
valid_to.id if isinstance(valid_to, (DimensionRef, TimeDimensionRef)) else valid_to
)
return ValidityVersioningIR(
kind="validity",
valid_from=valid_from_ref,
valid_to=valid_to_ref,
interval=interval,
open_end=open_end,
timezone=timezone,
)
[docs]
def semi_additive(
*,
over: TimeDimensionRef,
fold: str | tuple[Literal["quantile"], float],
) -> SemiAdditive:
"""Declare a semi-additive nature: additive off the ``over`` time axis, folded by ``fold``.
``over`` must be a ``TimeDimensionRef`` returned by ``@ms.time_dimension``.
Use as the ``additivity=`` value on a measure or a metric::
@ms.measure(entity=inventory,
additivity=ms.semi_additive(over=snapshot_date, fold="last"))
def quantity(inventory):
return inventory.qty
"""
if not isinstance(over, TimeDimensionRef):
received = getattr(over, "id", over)
_raise(
ErrorKind.INVALID_REF,
"ms.semi_additive(...) over must be a TimeDimensionRef returned by "
f"@ms.time_dimension(...); got {type(over).__name__}: {received!r}.",
cls=SemanticDecoratorError,
constraint_id=ConstraintId.REF_SHAPE,
)
over_id = over.id
fold_ir = _normalize_time_fold(fold, semantic_id=over_id)
if fold_ir is None:
_raise(
ErrorKind.INVALID_REF,
"ms.semi_additive(...) requires a fold (e.g. 'last', 'max', ('quantile', 0.9)).",
cls=SemanticDecoratorError,
constraint_id=ConstraintId.REF_SHAPE,
)
return SemiAdditive(over=over_id, fold=fold_ir)
def _compute_composition_hash(composition: Composition) -> str:
if isinstance(composition, RatioComposition):
text = repr(("ratio", composition.numerator, composition.denominator))
elif isinstance(composition, WeightedAverageComposition):
text = repr(("weighted_average", composition.value, composition.weight))
else: # LinearComposition
text = repr(("linear", tuple((t.sign, t.metric) for t in composition.terms)))
return hashlib.sha256(text.encode()).hexdigest()[:16]
def _derived(
*,
name: str,
composition: Composition,
unit: str | None,
domain: DomainRef | None,
ai_context: AiContextValue | None,
) -> MetricRef:
ctx = _require_ctx()
resolved_domain = _resolve_domain(domain, ctx)
semantic_id = f"{resolved_domain}.{name}"
_check_duplicate(ctx, semantic_id, MetricIR)
_validate_unit(unit, semantic_id)
ai_ctx = _build_ai_context(ai_context)
location = _caller_location()
metric_ir = MetricIR(
semantic_id=semantic_id,
domain=resolved_domain,
name=name,
metric_type="derived",
entities=(),
aggregation=None,
measure=None,
composition=composition,
additivity=None, # propagated at load from components
provenance=None,
ai_context=ai_ctx,
body_ast_hash=_compute_composition_hash(composition),
python_symbol=name,
location=location,
unit=unit,
)
_push_ir(ctx, metric_ir, None)
return MetricRef(semantic_id)
[docs]
def ratio(
*,
name: str,
numerator: MetricRef | str,
denominator: MetricRef | str,
unit: str | None = None,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> MetricRef:
"""Declare a derived ratio metric (no body). Override the unit derived from the components at load.
Example::
loss_rate = ms.ratio(name="loss_rate", numerator=lost, denominator=total, unit="1")
"""
return _derived(
name=name,
composition=RatioComposition(
numerator=_resolve_ref_string(numerator),
denominator=_resolve_ref_string(denominator),
),
unit=unit,
domain=domain,
ai_context=ai_context,
)
[docs]
def weighted_average(
*,
name: str,
value: MetricRef | str,
weight: MetricRef | str,
unit: str | None = None,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> MetricRef:
"""Declare a derived weighted-average metric (no body). Override the unit derived from the components at load.
Roles are ``value`` / ``weight``."""
return _derived(
name=name,
composition=WeightedAverageComposition(
value=_resolve_ref_string(value),
weight=_resolve_ref_string(weight),
),
unit=unit,
domain=domain,
ai_context=ai_context,
)
[docs]
def linear(
*,
name: str,
add: list[MetricRef | str],
subtract: list[MetricRef | str] | tuple[MetricRef | str, ...] = (),
unit: str | None = None,
domain: DomainRef | None = None,
ai_context: AiContextValue | None = None,
) -> MetricRef:
"""Declare a derived linear metric (no body): sum of ``add`` minus ``subtract``. Override the unit derived from the components at load.
Example::
net_revenue = ms.linear(name="net_revenue", add=[gross], subtract=[refunds])
"""
terms = tuple(LinearTerm("+", _resolve_ref_string(m)) for m in add) + tuple(
LinearTerm("-", _resolve_ref_string(m)) for m in subtract
)
if len(terms) < 2:
_raise(
ErrorKind.INVALID_REF,
f"ms.linear(name={name!r}) requires at least two metric terms.",
refs=(name,),
cls=SemanticDecoratorError,
constraint_id=ConstraintId.REF_SHAPE,
)
return _derived(
name=name,
composition=LinearComposition(terms=terms),
unit=unit,
domain=domain,
ai_context=ai_context,
)
[docs]
def ref(id: str) -> str:
"""Reference a semantic object by qualified ``"<domain>.<object>"`` string.
Pass-through helper: it returns ``id`` unchanged but makes intent
explicit at the call site (``datasets=[ms.ref("sales.orders")]``).
"""
return id
[docs]
def from_sql(*, sql: str, dialect: str) -> SqlProvenance:
"""Declare SQL parity provenance for a Python metric body.
Use as the ``provenance=`` value on ``@ms.metric(...)``::
@ms.metric(entities=[orders], additivity="additive",
provenance=ms.from_sql(sql="select sum(amount) from orders", dialect="duckdb"))
def revenue(orders_table):
return orders_table.amount.sum()
"""
return SqlProvenance(sql=sql, dialect=dialect)
[docs]
def join_on(from_key: DimensionRef | str, to_key: DimensionRef | str, /) -> JoinKey:
"""Build one relationship key pair for ``ms.relationship(keys=[...])``.
Each call creates one (from_key, to_key) pairing. Pass a list of
``ms.join_on(...)`` calls to ``keys=``.
Example::
ms.relationship(
name="orders_to_customers",
from_entity=orders, to_entity=customers,
keys=[ms.join_on(customer_id, id)],
)
"""
return JoinKey(from_key=_resolve_ref_string(from_key), to_key=_resolve_ref_string(to_key))
# ---------------------------------------------------------------------------
# Time parse variant constructors
# ---------------------------------------------------------------------------
[docs]
def datetime(
*,
timezone: str | None = None,
sample_interval: tuple[int, Literal["minute", "hour"]] | None = None,
) -> DatetimeParse:
"""Declare an already-temporal datetime column parse.
Use as the ``parse=`` value on ``@ms.time_dimension(...)`` when the
source column is a native datetime type.
Args:
timezone: Optional IANA timezone name. When omitted, Marivo interprets
the naive source column in the datasource engine default timezone.
sample_interval: Optional periodic sampling interval for sampled time
dimensions, e.g. ``(5, "minute")`` or ``(1, "hour")``.
Returns:
A ``DatetimeParse`` value object.
Raises:
SemanticDecoratorError: ``timezone`` is not a valid IANA name.
Example:
>>> @ms.time_dimension(entity=events, granularity="minute",
... parse=ms.datetime(timezone="UTC"))
... def ts(events):
... return events.ts
"""
if timezone is not None:
_validate_timezone(timezone)
return DatetimeParse(
timezone=timezone,
sample_interval=_normalize_sample_interval_value(sample_interval),
)
[docs]
def timestamp(
*,
timezone: str | None = None,
sample_interval: tuple[int, Literal["minute", "hour"]] | None = None,
) -> TimestampParse:
"""Declare an already-temporal timestamp column parse.
Use as the ``parse=`` value on ``@ms.time_dimension(...)`` when the
source column is a native timestamp type.
Args:
timezone: Optional IANA timezone name. When omitted, Marivo interprets
the naive source column in the datasource engine default timezone.
sample_interval: Optional periodic sampling interval for sampled time
dimensions, e.g. ``(5, "minute")`` or ``(1, "hour")``.
Returns:
A ``TimestampParse`` value object.
Raises:
SemanticDecoratorError: ``timezone`` is not a valid IANA name.
Example:
>>> @ms.time_dimension(entity=events, granularity="second",
... parse=ms.timestamp(timezone="UTC"))
... def ts(events):
... return events.ts
"""
if timezone is not None:
_validate_timezone(timezone)
return TimestampParse(
timezone=timezone,
sample_interval=_normalize_sample_interval_value(sample_interval),
)
[docs]
def strptime(
format: str,
/,
*,
timezone: str | None = None,
sample_interval: tuple[int, Literal["minute", "hour"]] | None = None,
) -> StrptimeParse:
"""Declare a string/integer strptime parse.
Use as the ``parse=`` value on ``@ms.time_dimension(...)`` when the
source column is a string or integer that must be parsed with a Python
strptime format. The physical column type (string or integer) is inferred
from the ibis expression at analysis time.
Args:
format: Canonical Python strptime format string (e.g. ``"%Y%m%d"``,
``"%Y-%m-%d %H:%M:%S"``). Must be ``%``-prefixed.
timezone: Optional IANA timezone for time-bearing formats.
sample_interval: Optional periodic sampling interval for sampled time
dimensions, e.g. ``(5, "minute")`` or ``(1, "hour")``.
Returns:
A ``StrptimeParse`` value object.
Raises:
SemanticDecoratorError: ``format`` is not a valid strptime format, or
``timezone`` is not a valid IANA name.
Example:
>>> @ms.time_dimension(entity=orders, granularity="day",
... parse=ms.strptime("%Y%m%d"))
... def dt(orders):
... return orders.dt
"""
normalized = normalize_strptime(format)
if timezone is not None:
_validate_timezone(timezone)
if not is_time_bearing_format(normalized):
_raise(
ErrorKind.INVALID_REF,
"timezone is only supported for time-bearing strptime formats, not date-only formats.",
cls=SemanticDecoratorError,
details={"field": "timezone", "format": normalized},
)
return StrptimeParse(
format=normalized,
timezone=timezone,
sample_interval=_normalize_sample_interval_value(
sample_interval,
),
)
[docs]
def hour_prefix(
prefix: str,
/,
*,
sample_interval: tuple[int, Literal["minute", "hour"]] | None = None,
) -> HourPrefixParse:
"""Declare an hour-only partition parse using a day prefix column.
Use as the ``parse=`` value on ``@ms.time_dimension(...)`` when the
source column encodes only the hour component (e.g. ``"01"``, ``"23"``)
and must be combined with a day-level time dimension prefix. The physical
column type (string or integer) is inferred from the ibis expression at
analysis time.
Args:
prefix: The semantic id of a day-level time dimension that supplies
the date context for this hour column.
sample_interval: Optional ``(count, unit)`` declaring the periodic
sampling cadence (e.g. ``(1, "hour")`` for hourly samples).
When set, the time dimension can serve as a sampled-fold axis.
Returns:
An ``HourPrefixParse`` value object.
Example:
>>> @ms.time_dimension(entity=logs, granularity="hour",
... parse=ms.hour_prefix("ops.logs.dt"))
... def hh(logs):
... return logs.hh
>>> @ms.time_dimension(entity=logs, granularity="hour",
... parse=ms.hour_prefix("ops.logs.dt",
... sample_interval=(1, "hour")))
... def hh(logs):
... return logs.hh
"""
return HourPrefixParse(
prefix=prefix,
sample_interval=_normalize_sample_interval_value(
sample_interval,
),
)