"""Shared bounded preview DTOs and normalization helpers."""
from __future__ import annotations
from collections.abc import Iterable, Mapping, Sequence
from dataclasses import dataclass, field
from datetime import date, datetime, time
from typing import Any, Literal, TypedDict
from zoneinfo import ZoneInfo
import pandas as pd
from marivo.render import format_bounded_card, result_repr
# Closed set of tags accepted for the ``kind`` field of a preview result.
PreviewKind = Literal[
    "datasource_table",
    "semantic_dataset",
    "semantic_dimension",
    "semantic_measure",
    "semantic_field",
    "semantic_metric",
    "analysis_frame",
]
# Closed set of tags accepted for the ``kind`` field of a preview warning.
PreviewWarningKind = Literal[
    "wide_table",
    "null_heavy_column",
    "constant_column",
    "time_parse_risk",
    "empty_preview",
    "backend_limit_unknown",
    "approximate_preview",
]
PreviewSampleMethod = Literal["head", "bounded_limit", "ordered_limit", "pre_aggregate_limit"]
PREVIEW_DEFAULT_LIMIT = 20
PREVIEW_MAX_LIMIT = 100
METRIC_PREVIEW_SAMPLE_SIZE = 10_000
_PREVIEW_MIN_LIMIT = 1
_WIDE_TABLE_THRESHOLD = 50
class PreviewFilter(TypedDict, total=False):
    """Dictionary shape of one filter recorded on a preview sample policy.

    Every key is optional because the class is declared with ``total=False``.
    """

    # Column the filter refers to.
    column: str
    # Comparison operator, restricted to the listed spellings.
    op: Literal[
        "=",
        "!=",
        "<",
        "<=",
        ">",
        ">=",
        "in",
        "is_null",
        "is_not_null",
    ]
    # Operand for the operator.
    # NOTE(review): presumably omitted for is_null / is_not_null -- confirm.
    value: object
class PreviewOrder(TypedDict, total=False):
    """Dictionary shape of one ordering instruction for a preview.

    Every key is optional because the class is declared with ``total=False``.
    """

    # Column to order by.
    column: str
    # Sort direction, either ascending or descending.
    direction: Literal["asc", "desc"]
# Per-column timezone labels; ``PreviewResult.render`` reads the "read_tz" and
# "report_tz" keys, either of which may be absent or None.
PreviewTimezoneInfo = dict[str, str | None]
class PreviewLimitError(ValueError):
    """Raised when a preview limit is outside the public contract.

    Attributes:
        limit: The rejected value exactly as the caller supplied it.
        min_limit: Smallest accepted limit (inclusive).
        max_limit: Largest accepted limit (inclusive).
    """

    def __init__(self, limit: int, *, min_limit: int, max_limit: int) -> None:
        """Record the rejected limit together with the allowed bounds."""
        super().__init__(f"preview limit must be between {min_limit} and {max_limit}")
        self.limit = limit
        self.min_limit = min_limit
        self.max_limit = max_limit

    def __reduce__(self) -> tuple[object, ...]:
        """Support ``copy`` and ``pickle`` despite the keyword-only constructor.

        The default ``BaseException.__reduce__`` rebuilds the instance by
        calling the class with ``self.args`` (only the message here), which
        fails because ``min_limit`` and ``max_limit`` are required keyword
        arguments. Binding them with ``partial`` lets the instance be rebuilt
        from its original inputs.
        """
        from functools import partial

        rebuild = partial(
            type(self),
            min_limit=self.min_limit,
            max_limit=self.max_limit,
        )
        return (rebuild, (self.limit,))
@dataclass(frozen=True)
class PreviewWarning:
    """Immutable advisory attached to a preview result.

    Attributes:
        kind: Machine-readable warning tag.
        message: Human-readable description of the warning.
        columns: Names of the columns the warning refers to; empty by default.
    """

    kind: PreviewWarningKind
    message: str
    columns: tuple[str, ...] = ()
@dataclass(frozen=True)
class PreviewSamplePolicy:
    """Immutable description of how the rows of a preview were selected.

    Attributes:
        method: Sampling method tag.
        limit: Row limit associated with the sample.
        order_by: Ordering column names; empty by default.
        filters: Filters recorded for the sample; empty by default.
    """

    method: PreviewSampleMethod
    limit: int
    order_by: tuple[str, ...] = ()
    filters: tuple[PreviewFilter, ...] = ()
@dataclass(frozen=True, repr=False)
class PreviewResult:
    """Immutable, bounded preview of tabular data.

    Attributes:
        kind: Tag naming what was previewed.
        ref: Reference string identifying the previewed object.
        columns: Column names, in display order.
        types: Mapping of column name to a type string.
        rows: Row dictionaries keyed by column name.
        requested_limit: Row limit the preview was requested with.
        returned_row_count: Number of rows carried in ``rows``.
        is_truncated: Whether more rows were available than were returned.
        warnings: Advisories attached to the preview; empty by default.
        timezones: Per-column timezone labels; empty by default.
        sample_policy: How the rows were selected; defaults to a
            ``bounded_limit`` policy using ``PREVIEW_DEFAULT_LIMIT``.
    """

    kind: PreviewKind
    ref: str
    columns: tuple[str, ...]
    types: dict[str, str]
    rows: tuple[dict[str, object], ...]
    requested_limit: int
    returned_row_count: int
    is_truncated: bool
    warnings: tuple[PreviewWarning, ...] = field(default_factory=tuple)
    timezones: dict[str, PreviewTimezoneInfo] = field(default_factory=dict)
    sample_policy: PreviewSamplePolicy = field(
        default_factory=lambda: PreviewSamplePolicy(
            method="bounded_limit",
            limit=PREVIEW_DEFAULT_LIMIT,
        )
    )

    def _repr_identity(self) -> str:
        """Return the one-line identity shared by ``render`` and ``repr``."""
        return (
            f"PreviewResult kind={self.kind} ref={self.ref} "
            f"rows={self.returned_row_count}/{self.requested_limit}"
        )

    def render(self) -> str:
        """Return the preview formatted as a bounded text card.

        Each cell is stringified with ``str``; a column missing from a row
        dictionary renders as an empty string. The status line reports the
        truncation flag and, when timezone labels are present, one
        ``read_tz``/``report_tz`` entry per column in sorted column order.
        """
        preview_rows = [[str(row.get(col, "")) for col in self.columns] for row in self.rows]
        status_parts = [f"truncated={self.is_truncated}"]
        if self.timezones:
            labels = [
                f"{column}:read_tz={info.get('read_tz')} report_tz={info.get('report_tz')}"
                for column, info in sorted(self.timezones.items())
            ]
            status_parts.append("; ".join(labels))
        return format_bounded_card(
            identity=self._repr_identity(),
            status=" ".join(status_parts),
            columns=list(self.columns),
            rows=preview_rows,
            row_count=self.returned_row_count,
            preview_truncation_hint="call preview(limit=...)",
            available=(".render()", ".show()"),
        )

    def __repr__(self) -> str:
        """Return the compact representation built from the identity line."""
        return result_repr(self._repr_identity())

    def show(self) -> None:
        """Print the rendered card to standard output."""
        print(self.render())
def validate_preview_limit(
    limit: int,
    *,
    min_limit: int = _PREVIEW_MIN_LIMIT,
    max_limit: int = PREVIEW_MAX_LIMIT,
) -> int:
    """Return ``limit`` unchanged when it is an acceptable preview limit.

    Args:
        limit: Candidate limit. Must be an ``int``; ``bool`` is rejected even
            though it is an ``int`` subclass.
        min_limit: Smallest accepted value (inclusive).
        max_limit: Largest accepted value (inclusive).

    Raises:
        PreviewLimitError: If ``limit`` has the wrong type or lies outside
            ``[min_limit, max_limit]``.
    """
    is_plain_int = isinstance(limit, int) and not isinstance(limit, bool)
    # The range comparison is only reached for genuine integers.
    if is_plain_int and not (limit < min_limit or limit > max_limit):
        return limit
    raise PreviewLimitError(limit, min_limit=min_limit, max_limit=max_limit)
def display_column_names(columns: Iterable[Any]) -> tuple[str, ...]:
    """Return unique string display names for ``columns``, preserving order.

    Each column is converted with ``str``. The first occurrence of a name is
    kept as is; a later collision gets ``#<n>`` appended, where ``n`` is the
    smallest integer >= 2 that yields a name not already used.

    Args:
        columns: Column labels of any type.

    Returns:
        A tuple of unique names with the same length as ``columns``.
    """
    display_columns: list[str] = []
    used_columns: set[str] = set()
    # Next suffix worth trying per base name. Every smaller suffix for that
    # base is already taken, so resuming here gives the same names as
    # rescanning from 2 without the quadratic cost for repeated names.
    next_suffix: dict[str, int] = {}
    for column in columns:
        column_name = str(column)
        display_name = column_name
        if display_name in used_columns:
            suffix = next_suffix.get(column_name, 2)
            display_name = f"{column_name}#{suffix}"
            while display_name in used_columns:
                suffix += 1
                display_name = f"{column_name}#{suffix}"
            next_suffix[column_name] = suffix + 1
        used_columns.add(display_name)
        display_columns.append(display_name)
    return tuple(display_columns)
def _is_missing(value: Any) -> bool:
try:
missing = pd.isna(value)
except (TypeError, ValueError):
return False
if isinstance(missing, bool):
return missing
item = getattr(missing, "item", None)
if callable(item):
try:
scalar = item()
except (TypeError, ValueError):
return False
return scalar if isinstance(scalar, bool) else False
return False
def normalize_preview_cell(value: Any, *, report_tz: str | None = None) -> object:
    """Convert one cell into a plain value suitable for a preview row.

    Args:
        value: Raw cell value.
        report_tz: Optional timezone name. When given, timezone-aware
            ``pd.Timestamp`` values are converted to it and then made naive
            before formatting.

    Returns:
        ``None`` for missing values; an ISO-8601 string for timestamps,
        datetimes, dates and times; ``str(value)`` for ``pd.Timedelta``; the
        result of ``value.item()`` when such a method exists and succeeds;
        otherwise ``value`` itself.
    """
    if _is_missing(value):
        return None

    # pd.Timestamp is checked before the generic datetime branch because it
    # is a datetime subclass and needs the timezone-aware handling.
    if isinstance(value, pd.Timestamp):
        needs_conversion = value.tzinfo is not None and report_tz is not None
        if not needs_conversion:
            return value.isoformat()
        in_report_zone = value.tz_convert(ZoneInfo(report_tz))
        return in_report_zone.tz_localize(None).isoformat()

    if isinstance(value, (datetime, date, time)):
        return value.isoformat()

    if isinstance(value, pd.Timedelta):
        return str(value)

    # Objects exposing ``item()`` are unwrapped through it; a TypeError or
    # ValueError from the call leaves the value as it was.
    unwrap = getattr(value, "item", None)
    if not callable(unwrap):
        return value
    try:
        return unwrap()
    except (TypeError, ValueError):
        return value
def _types_from_dataframe(
dataframe: pd.DataFrame,
display_columns: Sequence[str],
explicit_types: Mapping[str, str] | None,
) -> dict[str, str]:
if explicit_types is not None:
return {str(key): str(value) for key, value in explicit_types.items()}
return {
display_column: str(dtype)
for display_column, dtype in zip(display_columns, dataframe.dtypes, strict=True)
}
def preview_from_pandas(
    dataframe: pd.DataFrame,
    *,
    kind: PreviewKind,
    ref: str,
    requested_limit: int,
    sample_policy: PreviewSamplePolicy,
    types: Mapping[str, str] | None = None,
    timezones: Mapping[str, PreviewTimezoneInfo] | None = None,
    report_tz: str | None = None,
    warnings: Iterable[PreviewWarning] = (),
) -> PreviewResult:
    """Build a ``PreviewResult`` from an already materialized DataFrame.

    Args:
        dataframe: Source rows; only the first ``requested_limit`` are kept.
        kind: Tag stored on the result.
        ref: Reference string stored on the result.
        requested_limit: Row limit, validated with ``validate_preview_limit``.
        sample_policy: Policy stored on the result.
        types: Optional explicit column types; when None the frame's dtypes
            are used.
        timezones: Optional per-column timezone labels, shallow-copied into
            the result.
        report_tz: Optional timezone name forwarded to cell normalization.
        warnings: Caller-supplied warnings, placed before generated ones.

    Returns:
        The preview, flagged as truncated when the frame holds more rows than
        the limit.

    Raises:
        PreviewLimitError: If ``requested_limit`` is not acceptable.
    """
    limit = validate_preview_limit(requested_limit)
    display_columns = display_column_names(dataframe.columns)

    # Plain tuples (name=None) are paired positionally with the display names.
    rows = tuple(
        {
            column: normalize_preview_cell(cell, report_tz=report_tz)
            for column, cell in zip(display_columns, record, strict=True)
        }
        for record in dataframe.head(limit).itertuples(index=False, name=None)
    )

    collected_warnings = [*warnings]
    if len(display_columns) > _WIDE_TABLE_THRESHOLD:
        wide_warning = PreviewWarning(
            kind="wide_table",
            message=f"preview has {len(display_columns)} columns",
            columns=display_columns,
        )
        collected_warnings.append(wide_warning)
    if len(rows) == 0:
        empty_warning = PreviewWarning(kind="empty_preview", message="preview returned no rows")
        collected_warnings.append(empty_warning)

    column_types = _types_from_dataframe(dataframe, display_columns, types)
    timezone_labels = dict(timezones or {})
    return PreviewResult(
        kind=kind,
        ref=ref,
        columns=display_columns,
        types=column_types,
        rows=rows,
        requested_limit=limit,
        returned_row_count=len(rows),
        is_truncated=len(dataframe) > limit,
        warnings=tuple(collected_warnings),
        sample_policy=sample_policy,
        timezones=timezone_labels,
    )
def preview_ibis_table(
    table: Any,
    *,
    kind: PreviewKind,
    ref: str,
    limit: int,
    sample_policy: PreviewSamplePolicy,
    include_types: bool = True,
    timezones: Mapping[str, PreviewTimezoneInfo] | None = None,
    report_tz: str | None = None,
) -> PreviewResult:
    """Execute a bounded query on ``table`` and wrap the rows as a preview.

    Args:
        table: Object providing ``limit(n).execute()`` and ``schema().items()``
            (NOTE(review): presumably an ibis table expression -- confirm).
        kind: Tag stored on the result.
        ref: Reference string stored on the result.
        limit: Row limit, validated with ``validate_preview_limit``.
        sample_policy: Policy stored on the result.
        include_types: When False the result carries an empty type mapping
            and the schema is not consulted.
        timezones: Optional per-column timezone labels.
        report_tz: Optional timezone name forwarded to cell normalization.

    Raises:
        PreviewLimitError: If ``limit`` is not acceptable.
    """
    limit = validate_preview_limit(limit)

    # One extra row is fetched so truncation can be detected downstream.
    fetched = table.limit(limit + 1).execute()

    schema_types: dict[str, str] = {}
    if include_types:
        for name, dtype in table.schema().items():
            schema_types[name] = str(dtype)

    return preview_from_pandas(
        fetched,
        kind=kind,
        ref=ref,
        requested_limit=limit,
        sample_policy=sample_policy,
        types=schema_types,
        timezones=timezones,
        report_tz=report_tz,
    )
def preview_ibis_value(
    value: Any,
    *,
    kind: PreviewKind,
    ref: str,
    limit: int,
    column_name: str,
    sample_policy: PreviewSamplePolicy,
    include_types: bool = True,
    timezones: Mapping[str, PreviewTimezoneInfo] | None = None,
    report_tz: str | None = None,
) -> PreviewResult:
    """Preview a single value expression as a one-column table.

    The value is renamed to ``column_name`` when it exposes a callable
    ``name`` attribute, converted with ``as_table()``, and then handed to
    ``preview_ibis_table`` with the remaining arguments unchanged.

    Args:
        value: Object providing ``as_table()`` and optionally ``name(str)``
            (NOTE(review): presumably an ibis value expression -- confirm).
        kind: Tag stored on the result.
        ref: Reference string stored on the result.
        limit: Row limit for the preview.
        column_name: Name given to the value when renaming is possible.
        sample_policy: Policy stored on the result.
        include_types: Forwarded to ``preview_ibis_table``.
        timezones: Optional per-column timezone labels.
        report_tz: Optional timezone name forwarded to cell normalization.
    """
    rename = getattr(value, "name", None)
    if callable(rename):
        named_value = rename(column_name)
    else:
        # Without a callable ``name`` the value is used as given.
        named_value = value

    return preview_ibis_table(
        named_value.as_table(),
        kind=kind,
        ref=ref,
        limit=limit,
        sample_policy=sample_policy,
        include_types=include_types,
        timezones=timezones,
        report_tz=report_tz,
    )