"""Scan DTOs and source constructors for marivo.datasource.
Provides agent-safe scan configuration (ScanScope), scan result reporting
(ScanReport), column profiling (ColumnProfile, ColumnInspection), and
join-key probing (JoinSide, JoinKeyProbe) DTOs, plus the canonical
table/parquet/csv source constructors that the semantic authoring surface reuses.
"""
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Literal
from marivo.datasource.ir import CsvSourceIR, EntitySourceIR, ParquetSourceIR, TableSourceIR
from marivo.render import format_bounded_card, result_repr
ScanPartition = Mapping[str, str] | Literal["latest"] | None
PartitionResolution = Literal["explicit", "latest", "none", "unpruned"]
[docs]
@dataclass(frozen=True)
class ScanScope:
"""Agent-safe scan configuration with bounded defaults.
Defaults are chosen to keep automated scans safe and fast: at most
1000 rows, 100 columns, and a 30-second timeout.
Attributes:
partition: Partition filter; ``"latest"`` (default), an explicit
mapping like ``{"dt": "20260612"}``, or ``None`` for unpruned.
max_rows: Maximum rows returned from the scan.
max_columns: Maximum columns returned from the scan.
timeout_seconds: Scan timeout in seconds; ``None`` means no limit.
"""
partition: ScanPartition = "latest"
max_rows: int = 1000
max_columns: int = 100
timeout_seconds: int | None = 30
def __post_init__(self) -> None:
if self.max_rows < 1:
raise ValueError("ScanScope.max_rows must be positive.")
if self.max_columns < 1:
raise ValueError("ScanScope.max_columns must be positive.")
if self.timeout_seconds is not None and self.timeout_seconds < 1:
raise ValueError("ScanScope.timeout_seconds must be positive when provided.")
[docs]
@dataclass(frozen=True, repr=False)
class ScanReport:
"""Summary of a completed datasource scan.
Attributes:
partition_used: The partition that was actually used, or ``None``.
partition_resolution: How the partition was resolved.
rows_scanned: Number of rows observed.
columns_scanned: Column names in scan order.
truncated: Whether the result was truncated by ScanScope limits.
elapsed_seconds: Wall-clock scan duration.
warnings: Non-fatal warnings encountered during the scan.
"""
partition_used: Mapping[str, str] | None
partition_resolution: PartitionResolution
rows_scanned: int
columns_scanned: tuple[str, ...]
truncated: bool
elapsed_seconds: float
warnings: tuple[str, ...]
def _repr_identity(self) -> str:
return (
f"ScanReport rows={self.rows_scanned} "
f"columns={len(self.columns_scanned)} "
f"partition={self.partition_resolution}"
)
def render(self) -> str:
partition = (
"none"
if self.partition_used is None
else ", ".join(f"{key}={value}" for key, value in self.partition_used.items())
)
warnings = "none" if not self.warnings else "; ".join(self.warnings[:3])
return format_bounded_card(
identity=self._repr_identity(),
status=f"partition={partition} truncated={self.truncated} warnings={warnings}",
columns=list(self.columns_scanned),
available=(".render()", ".show()"),
)
def __repr__(self) -> str:
return result_repr(self._repr_identity())
def show(self) -> None:
print(self.render())
[docs]
@dataclass(frozen=True)
class ColumnProfile:
"""Statistical profile of a single column from a datasource scan.
Attributes:
name: Column name.
data_type: Resolved data type label.
nullable: Whether the column can contain nulls, if known.
comment: Column comment from the catalog, if any.
null_count: Number of null values observed.
empty_count: Number of empty string values observed.
distinct_count: Number of distinct non-null values.
top_values: Most frequent values with their counts.
sample_values: Representative sample of non-null values.
min_value: Minimum value (for orderable types).
max_value: Maximum value (for orderable types).
"""
name: str
data_type: str
nullable: bool | None
comment: str | None
null_count: int
empty_count: int
distinct_count: int
top_values: tuple[tuple[object, int], ...]
sample_values: tuple[object, ...]
min_value: object | None
max_value: object | None
[docs]
@dataclass(frozen=True, repr=False)
class ColumnInspection:
"""Column inspection result for a single datasource source.
Attributes:
datasource: Datasource short name.
source: Physical source that was inspected.
profiles: Column profiles in scan order.
scan: The scan report for this inspection.
"""
datasource: str
source: EntitySourceIR
profiles: tuple[ColumnProfile, ...]
scan: ScanReport
def _repr_identity(self) -> str:
return f"ColumnInspection datasource={self.datasource} columns={len(self.profiles)}"
def render(self) -> str:
return format_bounded_card(
identity=self._repr_identity(),
columns=[profile.name for profile in self.profiles],
available=(".render()", ".show()"),
)
def __repr__(self) -> str:
return result_repr(self._repr_identity())
def show(self) -> None:
print(self.render())
[docs]
@dataclass(frozen=True)
class JoinSide:
"""One side of a join-key probe.
Attributes:
datasource: Datasource short name.
source: Physical source on this side of the join.
columns: Column names participating in the join key.
"""
datasource: str
source: EntitySourceIR
columns: tuple[str, ...]
[docs]
@dataclass(frozen=True)
class JoinKeyProbe:
"""Result of probing join compatibility between two sources.
Attributes:
type_compatible: Whether the join key types are compatible.
sampled_key_count: Number of distinct keys in the sampled range.
matched_key_count: Number of keys present on both sides.
match_rate: Fraction of sampled keys that matched.
max_rows_per_key: Maximum fan-out on any single key.
avg_rows_per_key: Average fan-out across all keys.
cardinality_estimate: Estimated join cardinality.
from_scan: Scan report for the left (from) side.
to_scan: Scan report for the right (to) side.
"""
type_compatible: bool
sampled_key_count: int
matched_key_count: int
match_rate: float
max_rows_per_key: int
avg_rows_per_key: float
cardinality_estimate: Literal["one_to_one", "many_to_one", "indeterminate"]
from_scan: ScanReport
to_scan: ScanReport
[docs]
def table(name: str, /, *, database: str | tuple[str, ...] | None = None) -> TableSourceIR:
"""Build a structured table source reference.
Args:
name: Table name.
database: Optional database/catalog name or tuple of namespace
parts (e.g. ``("catalog", "schema")``).
Returns:
A ``TableSourceIR`` suitable for ``ms.entity(source=...)`` or
``md.entity(source=...)``.
"""
return TableSourceIR(table=name, database=database)
[docs]
def parquet(
path: str,
/,
*,
hive_partitioning: bool = False,
columns: tuple[str, ...] | list[str] | None = None,
) -> ParquetSourceIR:
"""Build a structured parquet source reference.
Args:
path: File path or glob pattern.
hive_partitioning: Whether the parquet source uses hive partitioning.
columns: Optional subset of columns to read.
Returns:
A ``ParquetSourceIR`` suitable for ``ms.entity(source=...)`` or
``md.entity(source=...)``.
"""
return ParquetSourceIR(
path=path,
hive_partitioning=hive_partitioning,
columns=tuple(columns) if columns is not None else None,
)
[docs]
def csv(
path: str,
/,
*,
header: bool = True,
delimiter: str = ",",
columns: tuple[str, ...] | list[str] | None = None,
) -> CsvSourceIR:
"""Build a structured CSV source reference.
Args:
path: File path or glob pattern.
header: Whether the CSV file has a header row. Defaults to True.
delimiter: Column delimiter. Defaults to ``","``.
columns: Optional subset of columns to read.
Returns:
A ``CsvSourceIR`` suitable for ``ms.entity(source=...)`` or
``md.entity(source=...)``.
"""
return CsvSourceIR(
path=path,
header=header,
delimiter=delimiter,
columns=tuple(columns) if columns is not None else None,
)