"""Session class and session-local summaries."""
from __future__ import annotations
import json
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from datetime import datetime, tzinfo
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, cast
from marivo.analysis.session._layout import PersistenceLayout, read_job_record
from marivo.analysis.timezone import resolve_system_timezone
from marivo.render import format_bounded_card, result_repr
if TYPE_CHECKING:
import pandas as pd
from marivo.analysis.escape_hatch import DimensionAnchorInput
from marivo.analysis.evidence import (
Assessment,
EvidenceTrace,
Finding,
Proposition,
SessionKnowledge,
)
from marivo.analysis.evidence.store import JudgmentStore
from marivo.analysis.frames.association import AssociationResult
from marivo.analysis.frames.attribution import AttributionFrame
from marivo.analysis.frames.base import BaseFrame
from marivo.analysis.frames.candidate import CandidateSet
from marivo.analysis.frames.delta import DeltaFrame
from marivo.analysis.frames.exploration import ExplorationResult
from marivo.analysis.frames.forecast import ForecastFrame
from marivo.analysis.frames.hypothesis import HypothesisTestResult
from marivo.analysis.frames.metric import MetricFrame
from marivo.analysis.frames.quality import QualityReport
from marivo.analysis.intents._shape import SemanticShape
from marivo.analysis.intents._types import SliceValue
from marivo.analysis.intents.transform import NormalizeKind
from marivo.analysis.policies import AlignmentPolicy, PromotionPolicy, SamplingPolicy
from marivo.analysis.publish.publish_targets import PublishTarget
from marivo.analysis.publish.report_models import (
MarivoReportArtifact,
ReportPackageValidationResult,
)
from marivo.analysis.publish.report_publish import PublishReportResult
from marivo.analysis.refs import ArtifactRef
from marivo.analysis.semantic_inputs import DimensionInput, MetricInput
from marivo.analysis.session._store import SessionStore
from marivo.analysis.windows.spec import GrainInput, TimeScopeInput
from marivo.semantic.catalog import SemanticCatalog
# Closed set of semantic-kind labels; the intent docstrings in this module use
# these same names for frame shapes.
SemanticKind = Literal["scalar", "time_series", "segmented", "panel"]
[docs]
@dataclass(frozen=True, repr=False)
class JobSummary:
    """Immutable, lightweight summary of a single recorded job.

    Attributes:
        id: Job identifier.
        intent: Intent name stored on the job record.
        status: Status string stored on the job record.
        started_at: Start timestamp exactly as stored on the job record.
        duration_ms: Recorded duration in milliseconds.
        output_frame_ref: Ref of the output frame, or ``None`` when the
            record carries none.
    """

    id: str
    intent: str
    status: str
    started_at: str
    duration_ms: int
    output_frame_ref: str | None

    def __repr__(self) -> str:
        return result_repr(self._repr_identity())

    def show(self) -> None:
        """Print the rendered card to stdout."""
        print(self.render())

    def render(self) -> str:
        """Return the bounded card text describing this job."""
        status_line = f"duration={self.duration_ms}ms frame={self.output_frame_ref}"
        return format_bounded_card(
            identity=self._repr_identity(),
            status=status_line,
            available=(".render()", ".show()"),
        )

    def _repr_identity(self) -> str:
        """Return the one-line identity shared by ``repr`` and ``render``."""
        fragments = (
            "JobSummary",
            f"id={self.id}",
            f"intent={self.intent}",
            f"status={self.status}",
        )
        return " ".join(fragments)
[docs]
@dataclass(frozen=True, repr=False)
class FrameSummaryEntry:
    """Immutable metadata summary of one persisted frame.

    Attributes:
        ref: Frame ref.
        kind: Frame kind string.
        metric_id: Metric id, when the frame metadata records one.
        semantic_kind: Semantic kind label, when recorded.
        semantic_model: Semantic model name, when recorded.
        created_at: Creation timestamp string, when recorded.
        row_count: Row count, when recorded.
    """

    ref: str
    kind: str
    metric_id: str | None
    semantic_kind: str | None
    semantic_model: str | None
    created_at: str | None
    row_count: int | None = None

    def __repr__(self) -> str:
        return result_repr(self._repr_identity())

    def show(self) -> None:
        """Print the rendered card to stdout."""
        print(self.render())

    def render(self) -> str:
        """Return the bounded card text describing this frame."""
        status_line = f"metric={self.metric_id} created={self.created_at}"
        return format_bounded_card(
            identity=self._repr_identity(),
            status=status_line,
            available=(".render()", ".show()"),
        )

    def _repr_identity(self) -> str:
        """Return the one-line identity; the metric is appended only when truthy."""
        base = f"FrameSummaryEntry ref={self.ref} kind={self.kind}"
        if not self.metric_id:
            return base
        return f"{base} metric={self.metric_id}"
[docs]
@dataclass(frozen=True)
class ReportRegistration:
    """Immutable result of persisting a report package under a session.

    Attributes:
        report_id: The report identifier (generated or user-supplied).
        package_dir: Absolute path to the on-disk report package directory.
        entrypoint: The primary entrypoint file name within the package.
        content_hash: Deterministic ``sha256:`` hash of the package contents.
    """

    report_id: str
    package_dir: Path
    entrypoint: str
    content_hash: str
[docs]
class Session:
    """Analysis session: identity, persistence handles, and report timezone.

    Instance state lives in ``__slots__``. Attributes with a single leading
    underscore are hidden from ``dir()``.
    """

    __slots__ = (
        "_calendars",
        "_catalog",
        "_connection_runtime",
        "_created_at",
        "_cwd",
        "_default_calendar",
        "_id",
        "_judgment_store",
        "_judgment_store_unavailable",
        "_layout",
        "_name",
        "_project_root",
        "_question",
        "_report_tz_name",
        "_report_tz_resolution",
        "_report_tz_warning",
        "_store",
        "_tz",
        "_updated_at",
    )

    def __init__(
        self,
        id: str,
        name: str,
        question: str | None,
        cwd: Path,
        project_root: Path,
        created_at: datetime,
        updated_at: datetime,
        connection_runtime: Any,
        layout: PersistenceLayout,
        semantic_catalog: SemanticCatalog,
        store: SessionStore,
        report_tz: tzinfo | None = None,
        report_tz_name: str | None = None,
        report_tz_resolution: str | None = None,
        report_tz_warning: str | None = None,
        default_calendar: str | None = None,
        calendars: Any = None,
        judgment_store: JudgmentStore | None = None,
        judgment_store_unavailable: bool = False,
    ) -> None:
        """Store the session state and resolve the report timezone.

        When ``report_tz`` is given it is used as-is; a missing name falls
        back to ``str(report_tz)`` and a missing resolution to ``"iana"``.
        Otherwise the system timezone is resolved and supplies the defaults
        for any name / resolution / warning that was not passed explicitly.
        When ``calendars`` is ``None`` a ``CalendarCache`` rooted at
        ``project_root`` is created.
        """
        # Identity and environment.
        self._id = id
        self._name = name
        self._question = question
        self._cwd = cwd
        self._project_root = project_root
        self._created_at = created_at
        self._updated_at = updated_at

        # Persistence and execution handles.
        self._connection_runtime = connection_runtime
        self._layout = layout
        self._catalog = semantic_catalog
        self._store = store

        # Report timezone: explicit labels always win over derived defaults.
        if report_tz is None:
            system_tz = resolve_system_timezone()
            self._tz = system_tz.tz
            if report_tz_name is None:
                report_tz_name = system_tz.name
            if report_tz_resolution is None:
                report_tz_resolution = system_tz.resolution
            if report_tz_warning is None:
                report_tz_warning = system_tz.warning
        else:
            self._tz = report_tz
            if report_tz_name is None:
                report_tz_name = str(report_tz)
            if report_tz_resolution is None:
                report_tz_resolution = "iana"
        self._report_tz_name = report_tz_name
        self._report_tz_resolution = report_tz_resolution
        self._report_tz_warning = report_tz_warning

        self._default_calendar = default_calendar
        self._judgment_store = judgment_store
        self._judgment_store_unavailable = judgment_store_unavailable

        if calendars is None:
            # Imported lazily, only when the caller did not supply calendars.
            from marivo.analysis.calendar.loader import CalendarCache

            calendars = CalendarCache(self._project_root)
        self._calendars = calendars

    def __repr__(self) -> str:
        return f"Session(name={self._name!r}, id={self._id!r})"

    def __dir__(self) -> list[str]:
        """Return sorted attribute names, omitting single-underscore privates."""
        every_name = super().__dir__()
        visible = [
            attr
            for attr in every_name
            if attr.startswith("__") or not attr.startswith("_")
        ]
        visible.sort()
        return visible
# -- Public identity properties (read-only) --
@property
def id(self) -> str:
    """Return the session identifier passed at construction."""
    return self._id

@property
def name(self) -> str:
    """Return the session name."""
    return self._name

@property
def question(self) -> str | None:
    """Return the session question, or ``None`` when none was given."""
    return self._question

@property
def cwd(self) -> Path:
    """Return the working directory recorded for this session."""
    return self._cwd

@property
def project_root(self) -> Path:
    """Return the project root recorded for this session."""
    return self._project_root

@property
def catalog(self) -> SemanticCatalog:
    """Return the session semantic catalog."""
    return self._catalog

@property
def created_at(self) -> datetime:
    """Return the creation timestamp passed at construction."""
    return self._created_at

@property
def updated_at(self) -> datetime:
    """Return the last-update timestamp passed at construction."""
    return self._updated_at

@property
def tz(self) -> tzinfo:
    """Return the report timezone (same object as :attr:`report_tz`)."""
    return self._tz

@property
def report_tz(self) -> tzinfo:
    """Return the report timezone (same object as :attr:`tz`)."""
    return self._tz

@property
def report_tz_name(self) -> str:
    """Return the display name recorded for the report timezone."""
    return self._report_tz_name

@property
def report_tz_resolution(self) -> str:
    """Return the label describing how the report timezone was resolved."""
    return self._report_tz_resolution

@property
def report_tz_warning(self) -> str | None:
    """Return the timezone-resolution warning, or ``None`` when there is none."""
    return self._report_tz_warning

@property
def default_calendar(self) -> str | None:
    """Return the default calendar name, or ``None`` when unset."""
    return self._default_calendar
@property
def is_read_only(self) -> bool:
    """Whether this session is unable to execute queries against datasources.

    Returns ``True`` when the connection runtime's service has no datasource
    resolution path configured (no backend overrides, no backend factory,
    and datasources not enabled), meaning the session can read persisted
    artifacts but cannot run new analysis that touches a datasource.

    A runtime that exposes no ``service`` attribute is reported as *not*
    read-only.
    """
    service = getattr(self._connection_runtime, "service", None)
    if service is None:
        return False
    # Any single configured resolution path makes the session executable.
    if getattr(service, "_backend_overrides", {}):
        return False
    if getattr(service, "_backend_factory", None) is not None:
        return False
    return not getattr(service, "_use_datasources", False)
[docs]
def jobs(self) -> list[JobSummary]:
    """Return lightweight summaries for every recorded job, oldest first.

    Each entry is a :class:`JobSummary` (id, intent, status, timing, output
    frame ref) built from the persisted job record. Entries are ordered by
    ``(started_at, id)``. For the full record of a single job, use
    :meth:`job`.
    """
    records = (
        read_job_record(self._layout, listed["job_id"])
        for listed in self._store.list_jobs(self.id)
    )
    unsorted = [
        JobSummary(
            id=record["id"],
            intent=record["intent"],
            status=record["status"],
            started_at=record["started_at"],
            duration_ms=record["duration_ms"],
            output_frame_ref=record.get("output_frame_ref"),
        )
        for record in records
    ]
    return sorted(unsorted, key=lambda summary: (summary.started_at, summary.id))
[docs]
def recent_jobs(self, limit: int = 5) -> list[JobSummary]:
    """Return the most recent ``limit`` job summaries, oldest first.

    The result is the tail of :meth:`jobs`. A non-positive ``limit`` returns
    an empty list without listing any jobs.
    """
    if limit > 0:
        return self.jobs()[-limit:]
    return []
[docs]
def job(self, job_id: str) -> dict[str, Any]:
    """Return the full record for a single job as a dict.

    Unlike :meth:`jobs`, which returns lightweight :class:`JobSummary`
    objects, this returns the complete persisted record including fields
    such as ``params``.

    Raises:
        JobNotFoundError: The store has no job ``job_id`` for this session.
    """
    from marivo.analysis.errors import JobNotFoundError

    stored = self._store.get_job(self.id, job_id)
    if stored is not None:
        return read_job_record(self._layout, job_id)
    raise JobNotFoundError(
        message=f"no job '{job_id}' in session {self.id!r}",
        details={"session_id": self.id, "job_id": job_id},
    )
[docs]
def get_frame(self, ref: str) -> BaseFrame:
    """Load a persisted frame by ref or artifact_id.

    Reconstructs a live frame object from the on-disk parquet and
    meta.json. The returned frame is fully functional and can be
    passed to any intent (compare, decompose, etc.).

    Args:
        ref: The frame ref string. After observe() or compare()
            returns, ``frame.ref`` equals the deterministic
            artifact_id, so ``session.get_frame(prev_frame.ref)``
            works across script boundaries.

    Raises:
        FrameRefNotFound: No frame with this ref exists in this session.
        CrossSessionFrameError: The frame belongs to a different session.
        FrameCacheCorruptedError: The frame data is on disk but unreadable.
    """
    from marivo.analysis.session._load import load_frame as _load_frame

    loaded = _load_frame(ref, session=self)
    return loaded
[docs]
def frame_summaries(self) -> list[FrameSummaryEntry]:
    """Return rich metadata for each persisted frame in this session.

    Unlike :meth:`frames` which returns lightweight (ref, kind) pairs,
    this method includes metric_id, semantic_kind, and other fields
    needed for semantic lookup across script boundaries.

    Artifacts whose ``meta.json`` is missing on disk are skipped. Entries
    are ordered by ``(created_at, ref)``, with a missing ``created_at``
    sorting first.

    Returns:
        One :class:`FrameSummaryEntry` per artifact with a readable meta file.
    """
    entries: list[FrameSummaryEntry] = []
    for row in self._store.list_artifacts(self.id):
        abs_meta = self._project_root / row["meta_path"]
        if not abs_meta.is_file():
            continue
        # JSON is UTF-8 by specification; decode explicitly instead of
        # relying on the platform's locale default encoding.
        meta = json.loads(abs_meta.read_text(encoding="utf-8"))
        entries.append(
            FrameSummaryEntry(
                ref=meta["ref"],
                kind=meta["kind"],
                metric_id=meta.get("metric_id"),
                semantic_kind=meta.get("semantic_kind"),
                semantic_model=meta.get("semantic_model"),
                created_at=meta.get("created_at"),
                row_count=meta.get("row_count"),
            )
        )
    entries.sort(key=lambda e: (e.created_at or "", e.ref))
    return entries
[docs]
def close(self) -> None:
    """Release session resources: the evidence store and cached backends.

    Safe to call more than once. After closing, the evidence store is
    reopened lazily on next access via :meth:`_evidence_store`.

    The store reference is dropped before it is closed and the backends are
    closed in a ``finally`` clause, so a failure while closing the store
    neither leaks backend connections nor leaves a half-closed store cached
    on the session. The store's exception still propagates to the caller.
    """
    store, self._judgment_store = self._judgment_store, None
    try:
        if store is not None:
            store.close()
    finally:
        runtime = self._connection_runtime
        if runtime is not None:
            runtime.close_all()
# -- Report methods -----------------------------------------------------
[docs]
def save_report(
    self,
    artifact: MarivoReportArtifact,
    *,
    report_id: str | None = None,
    adapter: Literal["mcp", "package"] = "package",
) -> ReportRegistration:
    """Persist a report package under this session and register it in the store.

    Package files are written first and the store is updated afterwards, so
    the store never references a package that has not been written to disk.

    Args:
        artifact: The report artifact to persist.
        report_id: Optional report identifier. When ``None``, a unique id
            is generated with the ``rpt_`` prefix.
        adapter: Materialization adapter to use:

            - ``"package"`` (default) writes only the canonical JSON
              package via :func:`write_report_artifact`.
            - ``"mcp"`` writes the canonical package plus MCP adapter files
              via :func:`materialize_mcp_adapter`.

    Returns:
        A :class:`ReportRegistration` with the report id, package dir,
        entrypoint, and content hash.

    Raises:
        ValueError: If *report_id* is supplied but not safe for use as a
            directory name, or if *adapter* is not a known adapter.

    Example:
        >>> registration = session.save_report(artifact)
        >>> registration.report_id
        'rpt_abc123'
    """
    import secrets

    from marivo.analysis.publish.publish_hash import compute_package_hash
    from marivo.analysis.session._layout import report_dir

    if report_id is None:
        resolved_id = f"rpt_{secrets.token_hex(6)}"
    else:
        resolved_id = report_id
    pkg_dir = report_dir(self._layout, resolved_id)

    # Step 1: write the package files with the requested adapter.
    if adapter == "package":
        from marivo.analysis.publish.report_package import write_report_artifact

        write_report_artifact(artifact, pkg_dir)
        written = artifact
    elif adapter == "mcp":
        from marivo.analysis.publish.report_mcp_adapter import materialize_mcp_adapter

        written = materialize_mcp_adapter(artifact, pkg_dir)
    else:
        raise ValueError(f"unknown adapter: {adapter!r}")

    # Step 2: the primary entrypoint is the first one the manifest lists.
    manifest_entrypoints = written.manifest.entrypoints
    if manifest_entrypoints:
        entrypoint = next(iter(manifest_entrypoints.values()))
    else:
        entrypoint = ""

    # Step 3: hash only after every file is on disk.
    content_hash = compute_package_hash(pkg_dir)

    # Step 4: register the package in the store, using a layout-relative path.
    self._store.record_report(
        session_id=self.id,
        report_id=resolved_id,
        package_dir=self._layout.relative_path(pkg_dir),
        entrypoint=entrypoint,
        package_hash=content_hash,
    )
    return ReportRegistration(
        report_id=resolved_id,
        package_dir=pkg_dir,
        entrypoint=entrypoint,
        content_hash=content_hash,
    )
[docs]
def validate_report(self, report_id: str) -> ReportPackageValidationResult:
    """Resolve a registered report's package dir and validate it.

    The package directory recorded in the store is resolved against the
    session's project root, the artifact is loaded from it, and the loaded
    artifact is validated.

    Args:
        report_id: The report identifier previously used with
            :meth:`save_report`.

    Returns:
        A :class:`~marivo.analysis.publish.ReportPackageValidationResult`
        indicating whether the package is valid.

    Raises:
        ReportPublishError: If *report_id* is not registered in this
            session.

    Example:
        >>> result = session.validate_report("rpt_abc123")
        >>> result.ok
        True
    """
    from marivo.analysis.errors import ReportPublishError
    from marivo.analysis.publish.report_validation import validate_report_artifact

    registered = self._store.get_report(self.id, report_id)
    if registered is None:
        raise ReportPublishError(
            message=f"report {report_id!r} is not registered in session {self.name!r}",
            details={"report_id": report_id, "session_name": self.name},
        )

    from marivo.analysis.publish.report_package import load_report_artifact

    loaded = load_report_artifact(self._project_root / registered["package_dir"])
    return validate_report_artifact(loaded)
[docs]
def publish_report(
    self,
    report_id: str,
    *,
    exported_by: str | None = None,
    exported_at: str | None = None,
    target: str | PublishTarget | None = None,
    project_root: str | Path | None = None,
) -> PublishReportResult:
    """Publish a registered session report and record the published URL.

    Resolves the package directory from the store, delegates to
    ``publish_report_package`` (which is expected to validate the package),
    then stores the resulting URI against the report.

    Args:
        report_id: The report identifier previously used with
            :meth:`save_report`.
        exported_by: Exporter name. Defaults to the current OS user.
        exported_at: ISO-8601 export timestamp. Defaults to now.
        target: Publish target (a path string for local filesystem, or a
            :class:`~marivo.analysis.publish.PublishTarget` instance).
        project_root: Project root for resolving publish config. Defaults
            to this session's project root.

    Returns:
        A :class:`~marivo.analysis.publish.PublishReportResult` with the
        published URI, content hash, exporter info, and file count.

    Raises:
        ReportPublishError: If *report_id* is not registered in this
            session.
        ReportPublishValidationError: If the package fails validation.

    Example:
        >>> result = session.publish_report("rpt_abc123", target="/published")
        >>> result.uri
        'file:///published/...'
    """
    from marivo.analysis.errors import ReportPublishError
    from marivo.analysis.publish.report_publish import publish_report_package

    registered = self._store.get_report(self.id, report_id)
    if registered is None:
        raise ReportPublishError(
            message=f"report {report_id!r} is not registered in session {self.name!r}",
            details={"report_id": report_id, "session_name": self.name},
        )

    source_dir = self._project_root / registered["package_dir"]
    # A falsy project_root (None or empty) falls back to the session's own root.
    effective_root = project_root or self._project_root
    outcome = publish_report_package(
        source_dir,
        exported_by=exported_by,
        exported_at=exported_at,
        target=target,
        project_root=effective_root,
    )

    # Remember where the report ended up.
    self._store.update_report_published_url(self.id, report_id, outcome.uri)
    return outcome
def _evidence_store(self) -> JudgmentStore | None:
    """Return the lazily-opened JudgmentStore, or None if unavailable.

    A previously opened store is reused. Once opening has failed with
    ``EvidenceStoreUnavailableError`` the session remembers that and
    returns ``None`` without retrying. After a successful open, startup GC
    runs against the layout's frames directory before the store is cached.
    """
    cached = self._judgment_store
    if cached is not None:
        return cached
    if self._judgment_store_unavailable:
        return None

    from marivo.analysis.errors import EvidenceStoreUnavailableError
    from marivo.analysis.evidence.store import open_judgment_store, run_startup_gc

    judgment_db = self._layout.session_dir / "judgment.db"
    try:
        opened = open_judgment_store(judgment_db)
    except EvidenceStoreUnavailableError:
        self._judgment_store_unavailable = True
        return None
    else:
        run_startup_gc(opened, self._layout.frames_dir)
        self._judgment_store = opened
        return opened
[docs]
def knowledge(self) -> SessionKnowledge:
    """Return a SessionKnowledge projection for this session.

    When the session's ``judgment.db`` exists, the projection is built from
    it. Otherwise an empty projection is returned with
    ``evidence_completeness="unavailable"`` and a snapshot id derived from
    the current UTC time in microseconds.
    """
    from marivo.analysis.evidence.knowledge import build_session_knowledge

    db_path = self._layout.session_dir / "judgment.db"
    if db_path.exists():
        return build_session_knowledge(db_path=db_path, session_id=self.id)

    from datetime import UTC

    from marivo.analysis.evidence.knowledge import SessionKnowledge

    now = datetime.now(UTC)
    micros = int(now.timestamp() * 1_000_000)
    return SessionKnowledge(
        session_id=self.id,
        snapshot_id=f"snap_{self.id}_{micros}",
        snapshot_at=now,
        evidence_completeness="unavailable",
    )
@property
def evidence(self) -> EvidenceNamespace:
    """Return Surface 3 evidence lookup helpers.

    A new ``EvidenceNamespace`` bound to this session is created on every
    access.
    """
    return EvidenceNamespace(self)

@property
def discover(self) -> SessionDiscoverNamespace:
    """Return session-bound candidate discovery helpers.

    A new ``SessionDiscoverNamespace`` bound to this session is created on
    every access.
    """
    return SessionDiscoverNamespace(self)

@property
def transform(self) -> SessionTransformNamespace:
    """Return session-bound transform helpers.

    A new ``SessionTransformNamespace`` bound to this session is created on
    every access.
    """
    return SessionTransformNamespace(self)
[docs]
def observe(
    self,
    metric: MetricInput,
    *,
    timescope: TimeScopeInput = None,
    grain: GrainInput = None,
    dimensions: list[DimensionInput] | None = None,
    where: dict[DimensionInput, SliceValue] | None = None,
    time_dimension: DimensionInput | None = None,
    expect_shape: SemanticShape | None = None,
) -> MetricFrame:
    """Materialize a metric into a typed MetricFrame.

    When to use: starting point for any metric analysis workflow.

    Resolves ``metric`` against the active semantic catalog, applies the
    optional ``timescope`` / ``grain`` / ``dimensions`` / ``where`` filters,
    executes against the session's backend, and persists the result as a
    MetricFrame on disk. This method delegates to the ``observe`` intent
    with ``session=self``.

    Args:
        metric: Catalog metric object or ``SemanticRef`` from ``session.catalog``.
            Bare strings are rejected.
        timescope: Half-open time range ``{"start": ..., "end": ...}`` — start is
            inclusive, end is exclusive. For date-only strings, ``end="2026-08-01"``
            means data from August 1 is **not** included.
        grain: Optional time bucket grain. When present, observe returns a time
            series or panel depending on ``dimensions``.
        dimensions: Segment axes. In v1 all dimensions must resolve to the same
            entity as ``metric``.
        where: Pre-aggregation row filter. Keys are catalog dimension objects/refs for
            the filtered dimension; values are either a scalar (``==``), a list
            (``in``), or ``{"op": "<op>", "value": ...}`` where op is one of
            ``==, !=, in, >, >=, <, <=, between``.
        time_dimension: Pick the entity time axis as
            a catalog time-dimension object/ref when an entity declares multiple
            ``@ms.time_dimension`` columns. Omit when the entity has a single (or
            default) time dimension.
        expect_shape: Optional guard. If set, observe predicts the output shape
            from ``grain``/``dimensions`` and raises ``SemanticKindMismatchError``
            before any backend work when the prediction differs.

    Raises:
        MetricNotFoundError: The metric id is unknown or not ``<domain>.<metric>``.
        SemanticKindMismatchError: ``metric`` is not a catalog metric object/ref,
            ``time_dimension`` is not a catalog dimension object/ref, or a ``where``
            key is not a catalog dimension object/ref.
        ObservePlanningError: Planning failed (e.g. cross-datasource plan, missing
            path, ambiguous dimension). Check ``details["code"]`` for the specific
            error code.

    Example:
        >>> catalog = session.catalog
        >>> revenue = catalog.get("sales.revenue")
        >>> country = catalog.get("sales.orders.country").ref
        >>> frame = session.observe(
        ...     revenue,
        ...     timescope={"start": "2026-07-01", "end": "2026-10-01"},
        ...     grain="day",
        ...     dimensions=[country],
        ... )
        >>> frame.summary()
    """
    from marivo.analysis.intents.observe import observe as _observe

    metric_frame = _observe(
        metric,
        timescope=timescope,
        grain=grain,
        dimensions=dimensions,
        where=where,
        time_dimension=time_dimension,
        expect_shape=expect_shape,
        session=self,
    )
    return metric_frame
[docs]
def compare(
    self,
    current: MetricFrame,
    baseline: MetricFrame,
    *,
    alignment: AlignmentPolicy | None = None,
) -> DeltaFrame:
    """Compute the typed delta between two MetricFrames (current minus baseline).

    When to use: quantify change between two periods; produces a DeltaFrame
    for decompose or discover.

    The two frames must share ``metric_id`` and ``semantic_kind``. ``segmented``
    frames must share segment columns; ``panel`` frames must share grain.
    This method delegates to the ``compare`` intent with ``session=self``.

    Args:
        current: Current-period MetricFrame.
        baseline: Baseline-period MetricFrame.
        alignment: Defaults to ``mv.window_bucket()``. For
            ``segmented`` frames, only ``window_bucket`` is supported in v1.

    Raises:
        SemanticKindMismatchError: Different ``metric_id``, ``semantic_kind``, or
            ``current``/``baseline`` is not a MetricFrame.
        SegmentDimensionMismatchError: ``segmented`` frames disagree on segment columns.
        PanelGrainMismatchError: ``panel`` frames disagree on time grain.
        AlignmentPolicyNotApplicableError: Alignment kind incompatible with the frame shape.
        CrossSessionFrameError: A frame belongs to a different session.

    Example:
        >>> revenue = session.catalog.get("sales.revenue")
        >>> cur = session.observe(revenue, timescope={"start": "2026-07-01", "end": "2026-10-01"})
        >>> base = session.observe(revenue, timescope={"start": "2025-07-01", "end": "2025-10-01"})
        >>> delta = session.compare(cur, base, alignment=mv.window_bucket())
    """
    from marivo.analysis.intents.compare import compare as _compare

    delta_frame = _compare(current, baseline, alignment=alignment, session=self)
    return delta_frame
[docs]
def decompose(
    self,
    frame: DeltaFrame,
    *,
    axis: DimensionInput,
) -> AttributionFrame:
    """Attribute a DeltaFrame's movement across a chosen segment axis.

    When to use: attribute a delta to dimension segments (why did revenue drop?).

    For ``panel`` deltas, ``axis`` must be one of the frame's segment dimensions.
    For ``time_series`` deltas, ``axis`` is the bucket-start column.
    This method delegates to the ``decompose`` intent with ``session=self``.

    Args:
        frame: A DeltaFrame produced by ``session.compare``.
        axis: Catalog dimension ref/object to attribute over. Dotted ids
            resolve to the persisted DeltaFrame column leaf when present.

    Raises:
        SemanticKindMismatchError: ``frame`` is not a DeltaFrame, or ``axis`` is
            not a catalog dimension.
        AxisNotInPanelDimensionsError: ``axis`` is not a segment column of the panel.
        CrossSessionFrameError: ``frame`` belongs to a different session.

    Example:
        >>> delta = session.compare(cur, base, alignment=mv.window_bucket())
        >>> attribution = session.decompose(delta, axis=session.catalog.get("sales.orders.country").ref)
        >>> attribution.summary()
    """
    from marivo.analysis.intents.decompose import decompose as _decompose

    attribution = _decompose(frame, axis=axis, session=self)
    return attribution
[docs]
def correlate(
    self,
    a: MetricFrame,
    b: MetricFrame,
    *,
    measure_a: str | None = None,
    measure_b: str | None = None,
    alignment: AlignmentPolicy | None = None,
    method: Literal["pearson"] = "pearson",
) -> AssociationResult:
    """Measure the association between two MetricFrames over aligned buckets.

    When to use: measure statistical association between two metrics over
    aligned time buckets.

    v1 only supports Pearson correlation under ``window_bucket`` alignment with
    zero-lag behavior. Both frames must belong to the active session.
    This method delegates to the ``correlate`` intent with ``session=self``.

    Args:
        a: First MetricFrame.
        b: Second MetricFrame.
        measure_a: Numeric column on ``a``. Defaults to the frame's measure column.
        measure_b: Numeric column on ``b``. Defaults to the frame's measure column.
        alignment: Defaults to ``mv.window_bucket()``.
        method: Only ``"pearson"`` in v1.

    Raises:
        SemanticKindMismatchError: Inputs are not MetricFrames, or alignment
            kinds are unsupported.
        AlignmentFailedError: Frames cannot be aligned (e.g. no overlapping buckets).
        CrossSessionFrameError: A frame belongs to a different session.

    Example:
        >>> result = session.correlate(
        ...     a, b,
        ...     alignment=mv.window_bucket(),
        ... )
        >>> result.summary()
    """
    from marivo.analysis.intents.correlate import correlate as _correlate

    association = _correlate(
        a,
        b,
        measure_a=measure_a,
        measure_b=measure_b,
        alignment=alignment,
        method=method,
        session=self,
    )
    return association
[docs]
def forecast(
    self,
    history: MetricFrame,
    *,
    horizon: int,
    model: Literal["naive", "seasonal_naive", "drift"] = "seasonal_naive",
    seasonality_period: int | None = None,
    interval_level: float = 0.95,
    measure_column: str | None = None,
) -> ForecastFrame:
    """Project a time_series or panel MetricFrame forward by ``horizon`` buckets.

    When to use: project a time series forward; requires time_series or panel shape.

    v1 requires continuous time buckets and no NaN values. Impute or re-observe
    before forecasting. ``seasonal_naive`` needs at least
    ``seasonality_period + 1`` training rows per series.
    This method delegates to the ``forecast`` intent with ``session=self``.

    Args:
        history: A ``time_series`` or ``panel`` MetricFrame.
        horizon: Number of buckets to project. Must be >= 1.
        model: Forecast strategy. ``seasonal_naive`` defaults to the grain-typical period.
        seasonality_period: Override for the seasonality period. Defaults by grain
            (day=7, week=52, month=12, quarter=4).
        interval_level: Confidence level for prediction intervals. Must be in (0, 1).
        measure_column: Numeric column to forecast. Defaults to the frame's measure column.

    Raises:
        ForecastShapeUnsupportedError: ``history`` is not a time_series / panel MetricFrame,
            or its grain is not in {day, week, month, quarter}.
        ForecastPolicyError: ``horizon`` or ``interval_level`` is out of range.
        ForecastInsufficientHistoryError: Not enough rows for the chosen model.
        ForecastInputQualityError: ``history`` contains NaN values in ``value``.
        CrossSessionFrameError: ``history`` belongs to a different session.

    Example:
        >>> history = session.observe(
        ...     session.catalog.get("sales.revenue"),
        ...     timescope={"start": "2026-01-01", "end": "2026-04-01"}, grain="day",
        ... )
        >>> forecast = session.forecast(history, horizon=30)
        >>> forecast.summary()
    """
    from marivo.analysis.intents.forecast import forecast as _forecast

    projection = _forecast(
        history,
        horizon=horizon,
        model=model,
        seasonality_period=seasonality_period,
        interval_level=interval_level,
        measure_column=measure_column,
        session=self,
    )
    return projection
[docs]
def assess_quality(self, frame: BaseFrame) -> QualityReport:
    """Run quality checks over a MetricFrame and return a structured report.

    When to use: check data quality (nulls, outliers, coverage) before analysis.

    v1 accepts only MetricFrames. Reports for DeltaFrame / CandidateSet /
    ForecastFrame / AttributionFrame are planned for later releases. The
    returned QualityReport carries per-check rows, blocking issues, and a list
    of recommended follow-up intents.
    This method delegates to the ``assess_quality`` intent with ``session=self``.

    Args:
        frame: A MetricFrame to inspect.

    Raises:
        QualityShapeUnsupportedError: ``frame`` is not a MetricFrame.
        CrossSessionFrameError: ``frame`` belongs to a different session.

    Example:
        >>> report = session.assess_quality(frame)
        >>> for issue in report.blocking_issues:
        ...     print(issue)
    """
    from marivo.analysis.intents.assess_quality import assess_quality as _assess_quality

    report = _assess_quality(frame, session=self)
    return report
[docs]
def hypothesis_test(
    self,
    a: MetricFrame,
    b: MetricFrame,
    *,
    hypothesis: Literal["mean_changed"] = "mean_changed",
    value_a: str | None = None,
    value_b: str | None = None,
    alignment: AlignmentPolicy | None = None,
    sampling: SamplingPolicy | None = None,
    alpha: float = 0.05,
) -> HypothesisTestResult:
    """Run a paired hypothesis test over two compatible MetricFrames.

    When to use: statistically validate whether a metric changed between two periods.

    v1 only supports ``hypothesis="mean_changed"`` under ``window_bucket``
    alignment. Scalar MetricFrames are not testable. ``a`` and ``b`` must share
    ``semantic_kind`` and ``semantic_model``; ``sampling.pairing`` must match
    the frame shape (``segment_key`` for segmented, ``window_bucket`` for
    time_series / panel).
    This method delegates to the ``hypothesis_test`` intent with ``session=self``.

    Args:
        a: Current MetricFrame.
        b: Baseline MetricFrame.
        hypothesis: Only ``"mean_changed"`` in v1.
        value_a: Numeric column on ``a``. Defaults to the frame's measure column.
        value_b: Numeric column on ``b``. Defaults to the frame's measure column.
        alignment: Defaults to ``mv.window_bucket()``.
        sampling: Defaults to ``SamplingPolicy()`` (pairing inferred from shape).
        alpha: Significance level in (0, 0.5].

    Raises:
        SemanticKindMismatchError: Inputs are not MetricFrames, or differ in
            ``semantic_kind`` / ``semantic_model``.
        TestPolicyError: ``hypothesis`` / ``alpha`` / ``alignment.kind`` is unsupported.
        TestAlignmentError: Frames cannot be paired under the alignment.
        TestShapeNotTestableError: Frame shape is scalar or otherwise untestable.
        CrossSessionFrameError: A frame belongs to a different session.

    Example:
        >>> result = session.hypothesis_test(cur, base)
        >>> result.summary()
    """
    from marivo.analysis.intents.test import hypothesis_test as _hypothesis_test

    outcome = _hypothesis_test(
        a,
        b,
        hypothesis=hypothesis,
        value_a=value_a,
        value_b=value_b,
        alignment=alignment,
        sampling=sampling,
        alpha=alpha,
        session=self,
    )
    return outcome
[docs]
def from_pandas(
    self,
    df: Any,
    *,
    description: str | None = None,
    sources: list[Any] | None = None,
) -> ExplorationResult:
    """Import a pandas DataFrame into the session as an ExplorationResult.

    Use this when you have data from an external source (CSV, API response,
    manual construction) that you want to bring into the Marivo analysis
    pipeline. The returned ExplorationResult is an untyped scratch frame;
    promote it with ``promote_metric_frame`` or similar before passing to
    typed intents like ``compare`` or ``decompose``.
    This method delegates to the escape-hatch ``from_pandas`` with
    ``session=self``.

    Args:
        df: Source DataFrame. A defensive copy is made internally.
        description: Human-readable note stored in frame metadata.
        sources: Optional lineage references to upstream artifacts that
            produced this data.

    Returns:
        An ExplorationResult persisted to the session's frame store.

    Raises:
        SessionNotWritableError: If the resolved session is read-only.

    Example:
        >>> result = session.from_pandas(my_df, description="daily sales extract")
        >>> mf = session.promote_metric_frame(result, metric=metric_ref, ...)
    """
    from marivo.analysis.escape_hatch import from_pandas as _from_pandas

    imported = _from_pandas(df, session=self, description=description, sources=sources)
    return imported
[docs]
def explore_ibis(
    self,
    query_builder: Callable[[Any], Any],
    *,
    datasource: str,
    description: str | None = None,
    sources: list[Any] | None = None,
) -> ExplorationResult:
    """Execute a custom ibis query on a datasource and capture the result.

    Reach for this when the semantic model does not expose the logic you
    need and you want to query a registered datasource directly. The query
    runs immediately and its result is persisted as an untyped scratch
    frame; promote that frame before handing it to typed intents.

    Args:
        query_builder: A callable that receives an ibis backend connection
            and returns an ibis expression (table or column expression).
            The callable runs in its own closure scope, so the defining
            module must ``import ibis`` (or ``from ibis import _``) before
            top-level ibis names such as ``ibis.desc()`` or ``_`` are used
            inside it.
        datasource: Name of the datasource registered in the session's
            backend cache.
        description: Human-readable note stored in frame metadata.
        sources: Optional lineage references to upstream artifacts.

    Returns:
        An ExplorationResult containing the query result, persisted to disk.

    Raises:
        TypeError: If ``query_builder`` does not return a valid ibis expression.
        NameError: If the callable references ibis names not in scope
            (e.g. forgot ``import ibis``).
        SessionNotWritableError: If the resolved session is read-only.

    Example:
        >>> import ibis
        >>> result = session.explore_ibis(
        ...     lambda con: con.table("orders").order_by(ibis.desc("amount")),
        ...     datasource="warehouse",
        ... )
    """
    # Function-scope import; aliased because the helper shares this
    # method's name.
    from marivo.analysis.escape_hatch import explore_ibis as run_ibis_query

    return run_ibis_query(
        query_builder,
        datasource=datasource,
        session=self,
        description=description,
        sources=sources,
    )
def ensure_session_can_execute(session: Session) -> None:
    """Reject sessions that cannot execute because they lack a backend factory.

    The check is driven by ``session.is_read_only``: a session reporting
    itself as read-only is treated as having no backend factory configured.

    Args:
        session: The session about to be used for execution.

    Raises:
        NoBackendFactoryError: If ``session.is_read_only`` is truthy. The
            error carries the session name in its ``details``.
    """
    from marivo.analysis.errors import NoBackendFactoryError

    # Guard clause: sessions that are not read-only pass straight through.
    if not session.is_read_only:
        return

    raise NoBackendFactoryError(
        message=f"session '{session.name}' has no backend factory configured",
        details={"session_name": session.name},
    )
# Deprecated alias: retained only so that intent modules which still import
# ``ensure_session_writable`` keep working. It is the very same function
# object as ``ensure_session_can_execute`` and will be removed once those
# modules have been migrated to the new name (Task 5).
ensure_session_writable = ensure_session_can_execute
@dataclass(frozen=True)
class SessionDiscoverNamespace:
    """Candidate-discovery helpers bound to a single session.

    Each method forwards its arguments to the matching function on
    ``marivo.analysis.intents.discover.discover`` and adds the bound session
    as the ``session`` keyword.
    """

    _session: Session

    def point_anomalies(
        self,
        source: MetricFrame,
        *,
        value: str | None = None,
        threshold: float | None = None,
    ) -> CandidateSet:
        """Find time-series points with unusual values.

        Args:
            source: A MetricFrame with time_series or panel shape.
            value: Passed through unchanged to the underlying intent.
            threshold: Absolute z-score cutoff (|z| >= threshold); default
                3.0. Lower values flag more candidates.

        Returns:
            The CandidateSet produced by the underlying intent.
        """
        from marivo.analysis.intents.discover import discover as finders

        return finders.point_anomalies(
            source, value=value, threshold=threshold, session=self._session
        )

    def period_shifts(
        self,
        source: DeltaFrame,
        *,
        value: str | None = None,
        threshold: float | None = None,
    ) -> CandidateSet:
        """Find period-shift candidates from a DeltaFrame.

        Args:
            source: A DeltaFrame. A time-series delta needs at least four
                time buckets; a panel delta needs at least one series with
                four time buckets.
            value: Passed through unchanged to the underlying intent.
            threshold: Absolute z-score cutoff on rolling window means
                (|z| >= threshold); default 2.0.

        Returns:
            The CandidateSet produced by the underlying intent.
        """
        from marivo.analysis.intents.discover import discover as finders

        return finders.period_shifts(
            source, value=value, threshold=threshold, session=self._session
        )

    def driver_axes(
        self,
        source: DeltaFrame,
        *,
        search_space: list[DimensionInput],
        value: str | None = None,
        limit: int | None = None,
    ) -> CandidateSet:
        """Find dimensions that explain a delta.

        Args:
            source: A DeltaFrame.
            search_space: Required. The candidate dimensions to evaluate
                for explanatory power.
            value: Passed through unchanged to the underlying intent.
            limit: Passed through unchanged to the underlying intent.

        Returns:
            The CandidateSet produced by the underlying intent.
        """
        from marivo.analysis.intents.discover import discover as finders

        return finders.driver_axes(
            source,
            search_space=search_space,
            value=value,
            limit=limit,
            session=self._session,
        )

    def interesting_slices(
        self,
        source: MetricFrame | DeltaFrame,
        *,
        search_space: list[DimensionInput] | None = None,
        value: str | None = None,
        threshold: float | None = None,
        limit: int | None = None,
    ) -> CandidateSet:
        """Find dimension slices with notable values.

        Args:
            source: A MetricFrame or DeltaFrame.
            search_space: Optional narrowing of the search; when omitted,
                all available dimensions are probed.
            value: Passed through unchanged to the underlying intent.
            threshold: Absolute z-score for a MetricFrame
                (|z| >= threshold) or absolute delta value for a
                DeltaFrame; default 2.0.
            limit: Passed through unchanged to the underlying intent.

        Returns:
            The CandidateSet produced by the underlying intent.
        """
        from marivo.analysis.intents.discover import discover as finders

        return finders.interesting_slices(
            source,
            search_space=search_space,
            value=value,
            threshold=threshold,
            limit=limit,
            session=self._session,
        )

    def interesting_windows(
        self,
        source: MetricFrame | DeltaFrame,
        *,
        value: str | None = None,
        threshold: float | None = None,
    ) -> CandidateSet:
        """Find time windows with notable behavior.

        Returns windows where the metric exhibits significant trends, level
        shifts, or volatility.

        Args:
            source: A frame with time_series or panel shape.
            value: Passed through unchanged to the underlying intent.
            threshold: Absolute z-score cutoff (|z| >= threshold);
                default 2.0.

        Returns:
            The CandidateSet produced by the underlying intent.
        """
        from marivo.analysis.intents.discover import discover as finders

        return finders.interesting_windows(
            source, value=value, threshold=threshold, session=self._session
        )

    def cross_sectional_outliers(
        self,
        source: MetricFrame,
        *,
        peer_scope: list[DimensionInput] | None = None,
        value: str | None = None,
        threshold: float | None = None,
    ) -> CandidateSet:
        """Find segments that are outliers compared to their peers.

        Args:
            source: A MetricFrame with segmented or panel shape.
            peer_scope: Grouping used for the peer comparison; defaults to
                all non-time axes.
            value: Passed through unchanged to the underlying intent.
            threshold: Robust z-score cutoff using MAD
                (|robust_z| >= threshold); default 3.0.

        Returns:
            The CandidateSet produced by the underlying intent.
        """
        from marivo.analysis.intents.discover import discover as finders

        return finders.cross_sectional_outliers(
            source,
            peer_scope=peer_scope,
            value=value,
            threshold=threshold,
            session=self._session,
        )
@dataclass(frozen=True)
class SessionTransformNamespace:
    """Family-preserving transform helpers bound to a single session.

    Each method forwards its arguments to the matching function on
    ``marivo.analysis.intents.transform.transform`` and adds the bound
    session as the ``session`` keyword.
    """

    _session: Session

    def filter(
        self,
        frame: object,
        *,
        predicate: Callable[[Any], Any],
    ) -> MetricFrame | DeltaFrame:
        """Filter rows using a predicate function.

        Args:
            frame: The frame to filter.
            predicate: Callable that receives the underlying DataFrame and
                must return a boolean Series of the same length.

        Returns:
            The filtered frame.
        """
        from marivo.analysis.intents.transform import transform as ops

        return ops.filter(frame, predicate=predicate, session=self._session)

    def slice(
        self,
        frame: object,
        *,
        where: dict[DimensionInput, Any],
    ) -> MetricFrame | DeltaFrame:
        """Filter rows by exact axis values.

        Unlike ``filter``, this operates on raw axis values without a
        callable.

        Args:
            frame: The frame to slice.
            where: Maps catalog dimension refs/objects to the value(s) to
                keep.

        Returns:
            The sliced frame.
        """
        from marivo.analysis.intents.transform import transform as ops

        return ops.slice(frame, where=where, session=self._session)

    def rollup(
        self,
        frame: object,
        *,
        drop_axes: list[DimensionInput],
    ) -> MetricFrame | DeltaFrame:
        """Aggregate to coarser segments by dropping axes.

        Args:
            frame: The frame to roll up.
            drop_axes: Catalog dimensions to remove; measures are
                re-aggregated over the remaining axes.

        Returns:
            The rolled-up frame.
        """
        from marivo.analysis.intents.transform import transform as ops

        return ops.rollup(frame, drop_axes=drop_axes, session=self._session)

    def topk(
        self,
        frame: object,
        *,
        by: str,
        limit: int,
        order: str | None = None,
    ) -> MetricFrame | DeltaFrame:
        """Keep the top N rows ranked by a measure column.

        Args:
            frame: The frame to rank and truncate.
            by: Measure column used for ranking.
            limit: Number of rows to keep.
            order: Defaults to ``"decrease"`` (largest first). Use
                ``"increase"`` to select the smallest values instead.

        Returns:
            The truncated frame.
        """
        from marivo.analysis.intents.transform import transform as ops

        # ``order`` is typed as a plain ``str`` here, so it is cast before
        # being handed to the intent.
        return ops.topk(
            frame,
            by=by,
            limit=limit,
            order=cast("Any", order),
            session=self._session,
        )

    def bottomk(
        self,
        frame: object,
        *,
        by: str,
        limit: int,
    ) -> MetricFrame | DeltaFrame:
        """Keep the bottom N rows ranked by a measure column.

        Equivalent to ``topk(..., order="increase")``.

        Args:
            frame: The frame to rank and truncate.
            by: Measure column used for ranking.
            limit: Number of rows to keep.

        Returns:
            The rows with the smallest values in the ``by`` column.
        """
        from marivo.analysis.intents.transform import transform as ops

        return ops.bottomk(frame, by=by, limit=limit, session=self._session)

    def rank(
        self,
        frame: object,
        *,
        by: str,
        method: str = "ordinal",
        rank_column: str = "rank",
    ) -> MetricFrame | DeltaFrame:
        """Add a rank column ordered by a measure.

        Args:
            frame: The frame to rank.
            by: Measure column that determines the ordering.
            method: Tie-breaking rule: ``"ordinal"``, ``"dense"``,
                ``"min"``, or ``"max"``.
            rank_column: Name given to the new column.

        Returns:
            The frame with the rank column added.
        """
        from marivo.analysis.intents.transform import transform as ops

        # ``method`` is typed as a plain ``str`` here, so it is cast before
        # being handed to the intent.
        return ops.rank(
            frame,
            by=by,
            method=cast("Any", method),
            rank_column=rank_column,
            session=self._session,
        )

    def normalize(
        self,
        frame: MetricFrame,
        *,
        mode: NormalizeKind,
        baseline: object | None = None,
    ) -> MetricFrame:
        """Convert measure values to a normalized form (MetricFrame only).

        Args:
            frame: The MetricFrame to normalize.
            mode: One of ``"index"``, ``"share"``, ``"pct_change"``,
                ``"per_unit"``, ``"z_score"``.
            baseline: Reference point, when the chosen mode requires one.

        Returns:
            The normalized MetricFrame.
        """
        from marivo.analysis.intents.transform import transform as ops

        return ops.normalize(
            frame, mode=mode, baseline=baseline, session=self._session
        )

    def window(self, frame: object, *, window: object) -> MetricFrame | DeltaFrame:
        """Restrict a frame to a time window.

        Args:
            frame: The frame to restrict.
            window: An ``AbsoluteWindow`` or compatible specification that
                defines the start/end bounds.

        Returns:
            A frame containing only rows within those bounds, preserving
            the original frame kind.
        """
        from marivo.analysis.intents.transform import transform as ops

        return ops.window(frame, window=window, session=self._session)
@dataclass(frozen=True)
class EvidenceNamespace:
    """Session-scoped Surface 3 evidence object lookups.

    Every lookup delegates to ``marivo.analysis.evidence.audit`` and reads
    from the ``judgment.db`` file located in the session directory.
    """

    _session: Session

    @property
    def _judgment_db_path(self) -> Path:
        """Return the location of this session's ``judgment.db`` file.

        Centralised here so the file name and its parent directory are
        spelled out exactly once for all lookups below.
        """
        return self._session._layout.session_dir / "judgment.db"

    def findings(
        self,
        *,
        artifact_id: str | None = None,
        finding_type: str | None = None,
        subject: Any = None,
    ) -> Iterator[Finding]:
        """Return Surface 3 findings for this session.

        Args:
            artifact_id: Optional filter, forwarded to the audit query.
            finding_type: Optional filter, forwarded to the audit query.
            subject: Optional filter, forwarded to the audit query.

        Returns:
            An iterator over the matching findings.
        """
        from marivo.analysis.evidence.audit import query_findings

        return query_findings(
            db_path=self._judgment_db_path,
            session_id=self._session.id,
            artifact_id=artifact_id,
            finding_type=finding_type,
            subject=subject,
        )

    def propositions(
        self,
        *,
        proposition_type: str | None = None,
        subject: Any = None,
        status: str | None = None,
    ) -> Iterator[Proposition]:
        """Return Surface 3 propositions for this session.

        Args:
            proposition_type: Optional filter, forwarded to the audit query.
            subject: Optional filter, forwarded to the audit query.
            status: Optional filter, forwarded to the audit query.

        Returns:
            An iterator over the matching propositions.
        """
        from marivo.analysis.evidence.audit import query_propositions

        return query_propositions(
            db_path=self._judgment_db_path,
            session_id=self._session.id,
            proposition_type=proposition_type,
            subject=subject,
            status=status,
        )

    def assessments(
        self,
        *,
        proposition_id: str | None = None,
        latest_only: bool = True,
    ) -> Iterator[Assessment]:
        """Return Surface 3 assessments for this session.

        Args:
            proposition_id: Optional filter, forwarded to the audit query.
            latest_only: Forwarded to the audit query; ``True`` by default.

        Returns:
            An iterator over the matching assessments.
        """
        from marivo.analysis.evidence.audit import query_assessments

        return query_assessments(
            db_path=self._judgment_db_path,
            session_id=self._session.id,
            proposition_id=proposition_id,
            latest_only=latest_only,
        )

    def proposition(self, proposition_id: str) -> Proposition:
        """Return the proposition with the given id for this session.

        Args:
            proposition_id: Identifier of the proposition to load.
        """
        from marivo.analysis.evidence.audit import get_proposition

        return get_proposition(
            db_path=self._judgment_db_path,
            proposition_id=proposition_id,
        )

    def latest_assessment(self, proposition_id: str) -> Assessment | None:
        """Return the most recent assessment for a proposition, or None.

        Args:
            proposition_id: Identifier of the proposition to look up.

        Returns:
            The latest assessment, or ``None`` when the proposition has
            never been assessed.
        """
        from marivo.analysis.evidence.audit import get_latest_assessment

        return get_latest_assessment(
            db_path=self._judgment_db_path,
            proposition_id=proposition_id,
        )

    def trace(self, proposition_id: str) -> EvidenceTrace:
        """Return the full evidence trace for a proposition.

        The trace links the proposition to its supporting findings and
        assessments for audit and explanation.

        Args:
            proposition_id: Identifier of the proposition to trace.
        """
        from marivo.analysis.evidence.audit import build_evidence_trace

        return build_evidence_trace(
            db_path=self._judgment_db_path,
            proposition_id=proposition_id,
        )