Skip to content

API Reference

Everything below is generated from the source at build time with mkdocstrings + Griffe. It reflects the exact installed version — there is no hand-maintained copy to fall out of date.

Ledger

The one object you'll use most. Every method is tool-shaped — usable directly, over REST, or as an MCP tool.

The main entry point for model-ledger v0.3.0.

Every public method is designed to work as an agent tool call: clear inputs, JSON-serializable outputs, no side effects beyond the ledger.

Source code in src/model_ledger/sdk/ledger.py
def __init__(self, backend: LedgerBackend | None = None) -> None:
    """Create a ledger on top of ``backend``.

    Args:
        backend: Storage backend to read and write through. When ``None``
            (the default), an ``InMemoryLedgerBackend`` is created.
    """
    # Compare against None explicitly: ``backend or ...`` would silently swap
    # in an in-memory store for any backend object that happens to be falsy
    # (e.g. one defining ``__len__`` that returns 0 while it is empty).
    self._backend = backend if backend is not None else InMemoryLedgerBackend()
    self._name_cache: dict[str, ModelRef] = {}  # model name -> ModelRef
    self._cache_complete = False  # True after bulk preload — skip individual lookups
    self._node_cache: list = []  # DataNodes from add() — reused by connect()

from_sqlite classmethod

from_sqlite(db_path: str) -> Ledger

Create a Ledger backed by a SQLite database.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `db_path` | `str` | Path to the SQLite database file. Created if it doesn't exist. | required |
Example

ledger = Ledger.from_sqlite("./inventory.db")

Source code in src/model_ledger/sdk/ledger.py
@classmethod
def from_sqlite(cls, db_path: str) -> Ledger:
    """Build a Ledger that persists to a SQLite database file.

    Args:
        db_path: Location of the SQLite database file. Created if it doesn't exist.

    Example:
        >>> ledger = Ledger.from_sqlite("./inventory.db")
    """
    # Imported lazily so the SQLite backend is only loaded when requested.
    from model_ledger.backends.sqlite_ledger import SQLiteLedgerBackend

    sqlite_backend = SQLiteLedgerBackend(db_path)
    return cls(sqlite_backend)

from_snowflake classmethod

from_snowflake(
    connection: Any, schema: str = "MODEL_LEDGER"
) -> Ledger

Create a Ledger backed by Snowflake.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `connection` | `Any` | A Snowflake connection (`snowflake.connector` connection object). | required |
| `schema` | `str` | Fully qualified schema name (e.g., `"MY_DB.MODEL_LEDGER"`). | `'MODEL_LEDGER'` |
Example

ledger = Ledger.from_snowflake(conn, schema="ANALYTICS.MODEL_LEDGER")

Source code in src/model_ledger/sdk/ledger.py
@classmethod
def from_snowflake(cls, connection: Any, schema: str = "MODEL_LEDGER") -> Ledger:
    """Build a Ledger that persists to Snowflake.

    Args:
        connection: An open Snowflake connection (snowflake.connector connection object).
        schema: Schema name handed to the backend; may be fully qualified
            (e.g., "MY_DB.MODEL_LEDGER"). Defaults to ``"MODEL_LEDGER"``.

    Example:
        >>> ledger = Ledger.from_snowflake(conn, schema="ANALYTICS.MODEL_LEDGER")
    """
    # Imported lazily so the Snowflake backend is only loaded when requested.
    from model_ledger.backends.snowflake import SnowflakeLedgerBackend

    snowflake_backend = SnowflakeLedgerBackend(connection=connection, schema=schema)
    return cls(snowflake_backend)

add

add(nodes)

Register DataNodes. Each becomes a ModelRef + discovered Snapshot.

Skips writing if the discovered payload is identical to the last snapshot (content-hash dedup). Preloads existing models in bulk to avoid per-node queries.

Source code in src/model_ledger/sdk/ledger.py
def add(self, nodes):
    """Register DataNodes. Each becomes a ModelRef + discovered Snapshot.

    Skips writing if the discovered payload is identical to the last snapshot
    (content-hash dedup). Preloads existing models in bulk to avoid per-node queries.

    Args:
        nodes: A single ``DataNode`` or an iterable of them. One-shot iterables
            such as generators are supported; they are materialized once.

    Returns:
        ``{"added": n, "skipped": m}``. ``added`` counts nodes for which a new
        ``discovered`` snapshot was recorded. ``skipped`` counts nodes whose
        payload hash matched the latest known hash for that model.
    """
    import hashlib
    import json

    from model_ledger.graph.models import DataNode

    # Metadata keys that are not copied into the snapshot payload.
    _EXCLUDED_METADATA = (
        "owner",
        "node_type",
        "tier",
        "purpose",
        "model_origin",
        "source_updated_at",
    )
    # Content-hash dedup excludes volatile fields (timestamps change between
    # runs without the model actually changing).
    _VOLATILE = frozenset(
        {
            "created_at",
            "updated_at",
            "source_updated_at",
            "_content_hash",
            "change_detected",
            "change_occurred",
        }
    )

    if isinstance(nodes, DataNode):
        nodes = [nodes]
    # Materialize once: extending the cache would otherwise exhaust a
    # generator and the registration loop below would silently see nothing.
    nodes = list(nodes)
    self._node_cache.extend(nodes)

    # Bulk preload — one query for all models
    if not self._cache_complete:
        for m in self._backend.list_models():
            self._name_cache[m.name] = m
        self._cache_complete = True

    # Latest discovered content hash per model_hash, for dedup.
    # We store _content_hash in the payload itself, so we can read just that field.
    existing_hashes: dict[str, str] = {}
    if hasattr(self._backend, "list_snapshot_content_hashes"):
        # Copy: this dict is updated below as new snapshots are recorded.
        existing_hashes = dict(
            self._backend.list_snapshot_content_hashes(event_type="discovered")
        )
    elif hasattr(self._backend, "list_all_snapshots"):
        # Replay oldest-to-newest so the most recent hash per model wins,
        # regardless of the order the backend returns rows in.
        snapshots = sorted(
            self._backend.list_all_snapshots(event_type="discovered"),
            key=lambda s: s.timestamp,
        )
        for s in snapshots:
            h = s.payload.get("_content_hash")
            if h:
                existing_hashes[s.model_hash] = h

    added = 0
    skipped = 0
    for node in nodes:
        meta = node.metadata
        actor = f"connector:{node.platform}" if node.platform else "system"
        ref = self.register(
            name=node.name,
            owner=meta.get("owner") or "unknown",
            model_type=meta.get("model_type")
            or meta.get("node_type")
            or meta.get("type")
            or "unknown",
            tier=meta.get("tier") or "unclassified",
            purpose=meta.get("purpose") or "",
            model_origin=meta.get("model_origin") or "internal",
            actor=actor,
        )
        payload = {
            "platform": node.platform,
            "inputs": [{"identifier": p.identifier, **p.schema} for p in node.inputs],
            "outputs": [{"identifier": p.identifier, **p.schema} for p in node.outputs],
            **{k: v for k, v in meta.items() if k not in _EXCLUDED_METADATA},
        }

        stable_payload = {k: v for k, v in payload.items() if k not in _VOLATILE}
        content_hash = hashlib.sha256(
            json.dumps(stable_payload, sort_keys=True, default=str).encode()
        ).hexdigest()

        now = datetime.now(timezone.utc)
        # Update last_seen on every run, even if unchanged
        ref.last_seen = now
        self._backend.update_model(ref)

        if existing_hashes.get(ref.model_hash) == content_hash:
            skipped += 1
            continue

        payload["_content_hash"] = content_hash
        payload["change_detected"] = now.isoformat()
        if meta.get("source_updated_at"):
            payload["change_occurred"] = meta["source_updated_at"]
        self.record(ref, event="discovered", payload=payload, actor=actor)
        # Remember the new hash so a repeated node later in this same batch is
        # deduplicated too (existing_hashes was loaded before the loop).
        existing_hashes[ref.model_hash] = content_hash
        added += 1

    return {"added": added, "skipped": skipped}

connect

connect()

Match output ports to input ports. Write only new dependency links.

Uses cached nodes from add() if available (avoids re-reading from backend). Preloads existing edges to skip duplicates.

Source code in src/model_ledger/sdk/ledger.py
def connect(self):
    """Match output ports to input ports. Write only new dependency links.

    Uses cached nodes from add() if available (avoids re-reading from backend).
    Preloads existing edges to skip duplicates.

    Returns:
        ``{"links_created": int, "links_skipped": int}``. These count the links
        newly written and the (upstream, downstream) pairs that already had a
        ``depends_on`` snapshot.
    """
    from collections import defaultdict

    # Use cached nodes from add() if available, otherwise load from backend
    nodes = self._node_cache if self._node_cache else self._load_discovered_nodes()

    # Build output port index: identifier -> [(producing node, port)]
    output_index = defaultdict(list)
    for node in nodes:
        for port in node.outputs:
            output_index[port.identifier].append((node, port))

    # Preload existing edges to skip duplicates
    existing_edges: set[tuple[str, str]] = set()
    if hasattr(self._backend, "list_all_snapshots"):
        # Edge snapshots are stored under the downstream model's hash, so
        # mapping them back to names needs the full name cache. Without this
        # preload (e.g. connect() on a fresh Ledger), existing edges would go
        # unrecognized and be written again.
        if not self._cache_complete:
            for m in self._backend.list_models():
                self._name_cache[m.name] = m
            self._cache_complete = True
        hash_to_name = {ref.model_hash: name for name, ref in self._name_cache.items()}
        for s in self._backend.list_all_snapshots(event_type="depends_on"):
            upstream = s.payload.get("upstream")
            downstream = hash_to_name.get(s.model_hash)
            if upstream and downstream:
                existing_edges.add((upstream, downstream))

    # Match ports and write only new edges
    links_created = 0
    links_skipped = 0
    seen = set()
    for node in nodes:
        for in_port in node.inputs:
            for upstream_node, out_port in output_index.get(in_port.identifier, []):
                if upstream_node.name == node.name:
                    continue  # no self-loops
                if out_port != in_port:
                    continue  # same identifier but incompatible port
                key = (upstream_node.name, node.name)
                if key in seen:
                    continue
                seen.add(key)
                if key in existing_edges:
                    links_skipped += 1
                    continue
                try:
                    self.link_dependency(
                        upstream=upstream_node.name,
                        downstream=node.name,
                        relationship="data_flow",
                        actor="graph_builder",
                        metadata={
                            "via": in_port.identifier,
                            "via_schema": in_port.schema or None,
                        },
                    )
                    links_created += 1
                except ModelNotFoundError:
                    continue
    return {"links_created": links_created, "links_skipped": links_skipped}

trace

trace(name)

Topological path from sources to this node.

Source code in src/model_ledger/sdk/ledger.py
def trace(self, name):
    """Topological path from sources to this node.

    Walks upstream dependencies depth-first in post-order, so every model
    appears after all models it (transitively) depends on, and ``name`` itself
    is always the last element. Each model is visited once, so cycles
    terminate.

    The walk uses an explicit stack instead of recursion, so very deep
    dependency chains do not hit Python's recursion limit. The visiting order
    is the same as a recursive DFS.
    """
    self._resolve_model(name)  # validate that ``name`` refers to a known model
    visited = {name}
    order = []
    # Each frame: (model name, iterator over its remaining upstream deps).
    stack = [(name, iter(self.dependencies(name, direction="upstream")))]
    while stack:
        current, pending = stack[-1]
        for dep in pending:
            parent = dep["model"].name
            if parent not in visited:
                visited.add(parent)
                stack.append((parent, iter(self.dependencies(parent, direction="upstream"))))
                break
        else:
            # All upstream deps of ``current`` are emitted; emit it (post-order).
            stack.pop()
            order.append(current)
    return order

upstream

upstream(name)

All models this one depends on (transitive).

Source code in src/model_ledger/sdk/ledger.py
def upstream(self, name):
    """Every model ``name`` depends on, directly or transitively.

    Derived from ``trace`` with ``name`` itself filtered out, so ancestors
    keep their sources-first ordering.
    """
    ancestry = self.trace(name)
    return list(filter(lambda candidate: candidate != name, ancestry))

downstream

downstream(name)

All models that depend on this one (transitive).

Source code in src/model_ledger/sdk/ledger.py
def downstream(self, name):
    """All models that depend on this one (transitive).

    Returns model names in depth-first discovery (pre-order) order. ``name``
    itself is never included, even when a dependency cycle leads back to it.
    This matches ``upstream``, which also excludes the queried model.

    Uses an explicit stack instead of recursion, so very deep dependency
    chains do not hit Python's recursion limit.
    """
    self._resolve_model(name)  # validate that ``name`` refers to a known model
    visited = {name}  # seed with the root so cycles cannot re-add it
    result = []
    # Each frame is an iterator over one model's remaining downstream deps.
    stack = [iter(self.dependencies(name, direction="downstream"))]
    while stack:
        for dep in stack[-1]:
            child = dep["model"].name
            if child not in visited:
                visited.add(child)
                result.append(child)
                stack.append(iter(self.dependencies(child, direction="downstream")))
                break
        else:
            stack.pop()
    return result

register_group

register_group(
    *,
    name: str,
    owner: str,
    model_type: str,
    tier: str,
    purpose: str,
    members: list[str],
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> ModelRef

Register a governed model group and link its members.

A group is a business-level entity that aggregates technical components. Members are linked via relationship="member_of".

Example

group = ledger.register_group(
    name="Credit Scorecard", owner="risk-team",
    model_type="ml_model", tier="high",
    purpose="Credit risk scoring pipeline",
    members=["feature_pipeline", "scoring_model", "alert_queue"],
    actor="system",
)

Source code in src/model_ledger/sdk/ledger.py
def register_group(
    self,
    *,
    name: str,
    owner: str,
    model_type: str,
    tier: str,
    purpose: str,
    members: builtins.list[str],
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> ModelRef:
    """Register a governed model group and link its members.

    The group is registered like any other model, then each entry of
    ``members`` is linked to it as an upstream dependency with
    relationship="member_of". A group is a business-level entity that
    aggregates technical components.

    Example:
        >>> group = ledger.register_group(
        ...     name="Credit Scorecard", owner="risk-team",
        ...     model_type="ml_model", tier="high",
        ...     purpose="Credit risk scoring pipeline",
        ...     members=["feature_pipeline", "scoring_model", "alert_queue"],
        ...     actor="system",
        ... )
    """
    group_ref = self.register(
        name=name,
        owner=owner,
        model_type=model_type,
        tier=tier,
        purpose=purpose,
        actor=actor,
        metadata=metadata,
    )
    # A falsy ``members`` value is treated as "no members".
    component_names = members if members else []
    for component in component_names:
        self.link_dependency(
            upstream=component,
            downstream=name,
            relationship="member_of",
            actor=actor,
        )
    return group_ref

members

members(group: ModelRef | str) -> list[ModelRef]

Return current members of this group.

Replays member_added/member_removed snapshots to determine current membership. Falls back to dependency links if no membership events exist (backward compatible with register_group).

Mixed case: groups seeded via register_group() that later have add_member()/remove_member() called use dependency links as the baseline and overlay the event log on top.

Source code in src/model_ledger/sdk/ledger.py
def members(self, group: ModelRef | str) -> builtins.list[ModelRef]:
    """Return current members of this group.

    Membership starts from the group's upstream dependency links with
    relationship="member_of". If the group has member_added/member_removed
    snapshots, those are replayed in timestamp order on top of that baseline.
    Groups seeded via register_group() therefore keep their linked members
    until an event removes them.

    A member_added event is resolved by its ``member_hash`` first and by its
    ``member_name`` as a fallback. Events whose model cannot be found either
    way are ignored.
    """
    group_ref = self._resolve_model(group)
    events = [
        snap
        for snap in self._backend.list_snapshots(group_ref.model_hash)
        if snap.event_type in ("member_added", "member_removed")
    ]

    # Baseline: every model linked to the group with relationship="member_of".
    links = self.dependencies(group, direction="upstream") or []
    roster: dict[str, ModelRef] = {}
    for link in links:
        if link.get("relationship") == "member_of":
            roster[link["model"].model_hash] = link["model"]

    if events:
        events.sort(key=lambda snap: snap.timestamp)
        for snap in events:
            key = snap.payload.get("member_hash", "")
            if snap.event_type == "member_removed":
                roster.pop(key, None)
                continue
            # member_added: resolve only if not already present.
            if key in roster:
                continue
            try:
                roster[key] = self.get(key)
            except ModelNotFoundError:
                fallback_name = snap.payload.get("member_name", "")
                if not fallback_name:
                    continue
                try:
                    roster[key] = self.get(fallback_name)
                except ModelNotFoundError:
                    continue
    return list(roster.values())

groups

groups(model: ModelRef | str) -> list[ModelRef]

Return groups this model currently belongs to.

Replays member_added/member_removed events on each candidate group to exclude composites the model has been removed from.

Source code in src/model_ledger/sdk/ledger.py
def groups(self, model: ModelRef | str) -> builtins.list[ModelRef]:
    """Return groups this model currently belongs to.

    Candidates are the model's downstream links with relationship="member_of".
    Each distinct candidate group is checked once against ``members()``, which
    replays member_added/member_removed events. This excludes composites the
    model has since been removed from.

    The same group can be linked more than once (e.g. register_group()
    followed by add_member() for the same pair). Such groups are reported only
    once, in first-seen order.
    """
    deps = self.dependencies(model, direction="downstream") or []
    ref = self._resolve_model(model)
    result: builtins.list[ModelRef] = []
    checked: set[str] = set()
    for dep in deps:
        if dep.get("relationship") != "member_of":
            continue
        comp = dep["model"]
        if comp.model_hash in checked:
            continue  # duplicate link to an already-evaluated group
        checked.add(comp.model_hash)
        if any(m.model_hash == ref.model_hash for m in self.members(comp)):
            result.append(comp)
    return result

add_member

add_member(
    composite: ModelRef | str,
    member: ModelRef | str,
    *,
    role: str | None = None,
    actor: str,
) -> Snapshot

Add a member to a composite.

Records a member_added snapshot on the composite and creates a member_of dependency link.

Source code in src/model_ledger/sdk/ledger.py
def add_member(
    self,
    composite: ModelRef | str,
    member: ModelRef | str,
    *,
    role: str | None = None,
    actor: str,
) -> Snapshot:
    """Add ``member`` to ``composite``.

    First links the member upstream of the composite with
    relationship="member_of", then records a ``member_added`` snapshot on the
    composite carrying the member's name, hash and (when given) ``role``.
    Returns whatever ``record`` returns for that snapshot.
    """
    comp_ref, mem_ref = self._resolve_model(composite), self._resolve_model(member)
    self.link_dependency(
        upstream=mem_ref,
        downstream=comp_ref,
        relationship="member_of",
        actor=actor,
    )
    optional: dict[str, Any] = {} if role is None else {"role": role}
    payload: dict[str, Any] = {
        "member_name": mem_ref.name,
        "member_hash": mem_ref.model_hash,
        **optional,
    }
    return self.record(comp_ref, event="member_added", payload=payload, actor=actor)

remove_member

remove_member(
    composite: ModelRef | str,
    member: ModelRef | str,
    *,
    reason: str | None = None,
    actor: str,
) -> Snapshot

Remove a member from a composite.

Records a member_removed snapshot. The dependency link is NOT deleted (append-only event log). members() will exclude removed members by replaying member_added/member_removed events.

Source code in src/model_ledger/sdk/ledger.py
def remove_member(
    self,
    composite: ModelRef | str,
    member: ModelRef | str,
    *,
    reason: str | None = None,
    actor: str,
) -> Snapshot:
    """Remove ``member`` from ``composite``.

    Only a ``member_removed`` snapshot is recorded on the composite, carrying
    the member's name, hash and (when given) ``reason``. The member_of
    dependency link is left in place because the log is append-only.
    members() excludes the member by replaying member_added/member_removed
    events.
    """
    comp_ref, mem_ref = self._resolve_model(composite), self._resolve_model(member)
    optional: dict[str, Any] = {} if reason is None else {"reason": reason}
    payload: dict[str, Any] = {
        "member_name": mem_ref.name,
        "member_hash": mem_ref.model_hash,
        **optional,
    }
    return self.record(comp_ref, event="member_removed", payload=payload, actor=actor)

record_observation

record_observation(
    composite: ModelRef | str,
    *,
    observation_id: str,
    observation: str,
    status: str,
    severity: str | None = None,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot

Record an observation against a composite.

Source code in src/model_ledger/sdk/ledger.py
def record_observation(
    self,
    composite: ModelRef | str,
    *,
    observation_id: str,
    observation: str,
    status: str,
    severity: str | None = None,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot:
    """Record an observation against a composite.

    Writes an ``observation_issued`` snapshot whose payload holds
    ``observation_id``, ``observation``, ``status`` and, when given,
    ``severity``. Entries in ``metadata`` are added as extra payload keys but
    never override those fields.
    """
    payload: dict[str, Any] = {
        "observation_id": observation_id,
        "observation": observation,
        "status": status,
    }
    if severity is not None:
        payload["severity"] = severity
    if metadata:
        # Extra context only. A clobbered ``observation_id`` would break
        # issue/resolve pairing (open_observation_count keys on it).
        for key, value in metadata.items():
            payload.setdefault(key, value)
    return self.record(composite, event="observation_issued", payload=payload, actor=actor)

resolve_observation

resolve_observation(
    composite: ModelRef | str,
    *,
    observation_id: str,
    resolution: str,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot

Record the resolution of an observation.

Source code in src/model_ledger/sdk/ledger.py
def resolve_observation(
    self,
    composite: ModelRef | str,
    *,
    observation_id: str,
    resolution: str,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot:
    """Record the resolution of an observation.

    Writes an ``observation_resolved`` snapshot whose payload holds
    ``observation_id`` and ``resolution``. Entries in ``metadata`` are added as
    extra payload keys but never override those fields.
    """
    payload: dict[str, Any] = {
        "observation_id": observation_id,
        "resolution": resolution,
    }
    if metadata:
        # Extra context only. A clobbered ``observation_id`` would resolve
        # the wrong observation.
        for key, value in metadata.items():
            payload.setdefault(key, value)
    return self.record(composite, event="observation_resolved", payload=payload, actor=actor)

record_validation

record_validation(
    composite: ModelRef | str,
    *,
    result: str,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot

Record a validation result against a composite.

Source code in src/model_ledger/sdk/ledger.py
def record_validation(
    self,
    composite: ModelRef | str,
    *,
    result: str,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot:
    """Record a validation result against a composite.

    Writes a ``validated`` snapshot whose payload holds ``result``. Entries in
    ``metadata`` are added as extra payload keys but never override ``result``.
    """
    payload: dict[str, Any] = {"result": result}
    if metadata:
        # Extra context only; the explicit ``result`` argument always wins.
        for key, value in metadata.items():
            payload.setdefault(key, value)
    return self.record(composite, event="validated", payload=payload, actor=actor)

open_observation_count staticmethod

open_observation_count(snapshots: list[Snapshot]) -> int

Return the number of observations that have been issued but not resolved.

Accepts any iterable of Snapshots. Only observation_issued and observation_resolved events that carry an observation_id payload key are considered; all other snapshots are ignored.

Source code in src/model_ledger/sdk/ledger.py
@staticmethod
def open_observation_count(snapshots: builtins.list[Snapshot]) -> int:
    """Return the number of observations that are currently open.

    Accepts any iterable of Snapshots. Only observation_issued and
    observation_resolved events that carry an ``observation_id`` payload key
    are considered; all other snapshots are ignored.

    Events are replayed in timestamp order, so the latest event for an id
    decides its state. An observation re-issued after it was resolved counts
    as open again. Snapshots with equal timestamps keep their input order.
    """
    open_ids: set[str] = set()
    for s in sorted(snapshots, key=lambda snap: snap.timestamp):
        obs_id = s.payload.get("observation_id")
        if not obs_id:
            continue
        if s.event_type == "observation_issued":
            open_ids.add(obs_id)
        elif s.event_type == "observation_resolved":
            open_ids.discard(obs_id)
    return len(open_ids)

composite_summary

composite_summary(
    model_types: list[str] | None = None,
) -> list[dict[str, Any]]

Flat inventory of all composites with derived fields.

By default, returns models whose model_type == "composite". Pass model_types=["composite", "ml_model", "heuristic"] (or any subset) to include other types the caller treats as composites.

Source code in src/model_ledger/sdk/ledger.py
def composite_summary(
    self,
    model_types: builtins.list[str] | None = None,
) -> builtins.list[dict[str, Any]]:
    """Flat inventory of all composites with derived fields.

    By default only models whose ``model_type == "composite"`` are included.
    Pass ``model_types=["composite", "ml_model", "heuristic"]`` (or any subset)
    to include other types the caller treats as composites.

    If the backend provides its own ``composite_summary``, that result is
    returned as-is. Otherwise each row holds ``name``, ``owner``, ``tier``,
    ``status``, ``model_type``, ``member_count`` (via ``members()``),
    ``last_validated`` (latest ``validated`` snapshot timestamp, or ``None``),
    ``open_observation_count`` and ``metadata`` (``{}`` when unset).
    """
    requested = model_types or ["composite"]
    backend = self._backend
    if hasattr(backend, "composite_summary"):
        delegated: builtins.list[dict[str, Any]] = backend.composite_summary(
            model_types=requested
        )
        return delegated

    wanted = set(requested)
    composites = [model for model in backend.list_models() if model.model_type in wanted]
    rows: builtins.list[dict[str, Any]] = []
    for comp in composites:
        history = backend.list_snapshots(comp.model_hash)
        member_count = len(self.members(comp))
        validation_times = [snap.timestamp for snap in history if snap.event_type == "validated"]
        rows.append(
            {
                "name": comp.name,
                "owner": comp.owner,
                "tier": comp.tier,
                "status": comp.status,
                "model_type": comp.model_type,
                "member_count": member_count,
                "last_validated": max(validation_times, default=None),
                "open_observation_count": self.open_observation_count(history),
                "metadata": comp.metadata or {},
            }
        )
    return rows

Data models

The event-log primitives. A model is a ModelRef; every change is a Snapshot; a Tag is a mutable pointer.

ModelRef

Bases: BaseModel

Regulatory identity — the minimum a regulator needs.

Snapshot

Bases: BaseModel

Immutable, content-addressed observation of a model.

Tag

Bases: BaseModel

Mutable pointer from a name to a snapshot.

Graph

DataNode dataclass

DataNode(
    name: str,
    platform: str = "",
    inputs: list[DataPort] = list(),
    outputs: list[DataPort] = list(),
    metadata: dict[str, Any] = dict(),
)

DataPort

DataPort(identifier: str, **schema: str)

A connection point. Acts like a string, carries optional schema for precision.

Source code in src/model_ledger/graph/models.py
def __init__(self, identifier: str, **schema: str) -> None:
    """Create a port for ``identifier`` with optional schema attributes.

    The identifier is stored lower-cased. Any keyword arguments are kept
    unchanged as the ``schema`` dict.
    """
    self.identifier, self.schema = identifier.lower(), schema

Connectors

Factory functions that emit DataNodes from external sources. See Connectors & discovery for usage.

sql_connector

sql_connector(
    *,
    name: str,
    connection: Any,
    query: str,
    name_column: str,
    name_prefix: str = "",
    input_columns: list[str] | None = None,
    output_columns: list[str] | None = None,
    sql_column: str | None = None,
    sql_preprocessor: Callable[[str], str]
    | str
    | None = "default",
    shared_table_patterns: list[str] | None = None,
    shared_table_fallback: dict[str, str] | None = None,
    cron_column: str | None = None,
    input_port: dict[str, str] | None = None,
    output_port: dict[str, str] | None = None,
    metadata_columns: dict[str, str] | None = None,
) -> _SQLConnector

Create a SourceConnector that discovers models from a SQL query.

Parameters:

Name Type Description Default
name str

Platform name for discovered DataNodes.

required
connection Any

Database connection with execute() method.

required
query str

SQL query to run.

required
name_column str

Column containing the model name.

required
name_prefix str

Optional prefix for model names (e.g., "queue:").

''
input_columns list[str] | None

Columns containing input table/port identifiers.

None
output_columns list[str] | None

Columns containing output table/port identifiers.

None
sql_column str | None

Column containing SQL to parse for input/output tables.

None
sql_preprocessor Callable[[str], str] | str | None

Function to clean SQL before parsing (e.g., strip template variables). Default: strip_template_vars from model_ledger.adapters.sql. Pass None to disable preprocessing.

'default'
shared_table_patterns list[str] | None

When sql_column is set, table names matching any of these substrings will get model_name discriminators from the parsed SQL. Default: ["scores", "alert"].

None
shared_table_fallback dict[str, str] | None

When a write table matches shared_table_patterns but no model_name is found in the SQL, derive model_name from a column value. Dict with keys: source_column (required), strip_prefix (optional regex). Example: {"source_column": "NAME", "strip_prefix": "etl_"}

None
cron_column str | None

Column containing a cron expression. When set, adds both the raw cron and an English translation to metadata.

None
input_port dict[str, str] | None

Config dict with keys: column, fallback (optional), kind (optional).

None
output_port dict[str, str] | None

Config dict with keys: column, fallback (optional), kind (optional).

None
metadata_columns dict[str, str] | None

Explicit {metadata_key: column_name} mapping. If omitted, all unmapped columns become metadata automatically.

None

Returns:

Type Description
_SQLConnector

A SourceConnector with a discover() method.

Source code in src/model_ledger/connectors/sql.py
def sql_connector(
    *,
    name: str,
    connection: Any,
    query: str,
    name_column: str,
    name_prefix: str = "",
    input_columns: list[str] | None = None,
    output_columns: list[str] | None = None,
    sql_column: str | None = None,
    sql_preprocessor: Callable[[str], str] | str | None = "default",
    shared_table_patterns: list[str] | None = None,
    shared_table_fallback: dict[str, str] | None = None,
    cron_column: str | None = None,
    input_port: dict[str, str] | None = None,
    output_port: dict[str, str] | None = None,
    metadata_columns: dict[str, str] | None = None,
) -> _SQLConnector:
    """Create a SourceConnector that discovers models from a SQL query.

    Args:
        name: Platform name for discovered DataNodes.
        connection: Database connection with execute() method.
        query: SQL query to run.
        name_column: Column containing the model name.
        name_prefix: Optional prefix for model names (e.g., "queue:").
        input_columns: Columns containing input table/port identifiers.
        output_columns: Columns containing output table/port identifiers.
        sql_column: Column containing SQL to parse for input/output tables.
        sql_preprocessor: Function to clean SQL before parsing (e.g., strip template
            variables). Default: strip_template_vars from model_ledger.adapters.sql.
            Pass None to disable preprocessing. The only accepted string is
            "default".
        shared_table_patterns: When sql_column is set, table names matching any of
            these substrings will get model_name discriminators from the parsed SQL.
            Default: ["scores", "alert"].
        shared_table_fallback: When a write table matches shared_table_patterns but
            no model_name is found in the SQL, derive model_name from a column value.
            Dict with keys: source_column (required), strip_prefix (optional regex).
            Example: {"source_column": "NAME", "strip_prefix": "etl_"}
        cron_column: Column containing a cron expression. When set, adds both
            the raw cron and an English translation to metadata.
        input_port: Config dict with keys: column, fallback (optional), kind (optional).
        output_port: Config dict with keys: column, fallback (optional), kind (optional).
        metadata_columns: Explicit {metadata_key: column_name} mapping.
            If omitted, all unmapped columns become metadata automatically.

    Returns:
        A SourceConnector with a discover() method.

    Raises:
        ValueError: If ``sql_preprocessor`` is a string other than "default".
    """
    # Resolve the preprocessor. Reject unknown strings instead of silently
    # disabling preprocessing (e.g. a typo such as "defualt").
    resolved_preprocessor: Callable[[str], str] | None
    if isinstance(sql_preprocessor, str):
        if sql_preprocessor != "default":
            raise ValueError(
                "sql_preprocessor must be 'default', a callable, or None; "
                f"got {sql_preprocessor!r}"
            )
        from model_ledger.adapters.sql import strip_template_vars

        resolved_preprocessor = strip_template_vars
    elif callable(sql_preprocessor):
        resolved_preprocessor = sql_preprocessor
    else:
        # None (or any other non-callable) disables preprocessing.
        resolved_preprocessor = None

    return _SQLConnector(
        name=name,
        connection=connection,
        query=query,
        name_column=name_column,
        name_prefix=name_prefix,
        input_columns=input_columns or [],
        output_columns=output_columns or [],
        sql_column=sql_column,
        sql_preprocessor=resolved_preprocessor,
        shared_table_patterns=shared_table_patterns
        if shared_table_patterns is not None
        else ["scores", "alert"],
        shared_table_fallback=shared_table_fallback,
        cron_column=cron_column,
        input_port=input_port,
        output_port=output_port,
        metadata_columns=metadata_columns,
    )

rest_connector

rest_connector(
    *,
    name: str,
    url: str,
    items_path: str,
    name_field: str,
    headers: dict[str, str] | None = None,
    input_fields: list[str] | None = None,
    output_fields: list[str] | None = None,
    metadata_fields: dict[str, str] | None = None,
    pagination: dict[str, str] | None = None,
) -> _RESTConnector

Create a SourceConnector that discovers models from a REST API.

Parameters:

Name Type Description Default
name str

Platform name for discovered DataNodes.

required
url str

API endpoint URL.

required
items_path str

Dot-path to the array of items in JSON response.

required
name_field str

Field containing the model name.

required
headers dict[str, str] | None

HTTP headers (auth goes here).

None
input_fields list[str] | None

Dot-paths to input identifiers.

None
output_fields list[str] | None

Dot-paths to output identifiers.

None
metadata_fields dict[str, str] | None

Explicit {metadata_key: field_path} mapping. If omitted, all unmapped fields become metadata automatically.

None
pagination dict[str, str] | None

Config dict with keys: type (token/offset), token_field, param.

None

Returns:

Type Description
_RESTConnector

A SourceConnector with a discover() method.

Source code in src/model_ledger/connectors/rest.py
def rest_connector(
    *,
    name: str,
    url: str,
    items_path: str,
    name_field: str,
    headers: dict[str, str] | None = None,
    input_fields: list[str] | None = None,
    output_fields: list[str] | None = None,
    metadata_fields: dict[str, str] | None = None,
    pagination: dict[str, str] | None = None,
) -> _RESTConnector:
    """Create a SourceConnector that discovers models from a REST API.

    ``headers``, ``input_fields`` and ``output_fields`` that are ``None`` (or
    otherwise empty) are replaced with fresh empty containers before being
    handed to the connector. ``metadata_fields`` and ``pagination`` are passed
    through untouched, including ``None``.

    Args:
        name: Platform name for discovered DataNodes.
        url: API endpoint URL.
        items_path: Dot-path to the array of items in the JSON response.
        name_field: Field containing the model name.
        headers: HTTP headers (auth goes here).
        input_fields: Dot-paths to input identifiers.
        output_fields: Dot-paths to output identifiers.
        metadata_fields: Explicit {metadata_key: field_path} mapping.
            If omitted, all unmapped fields become metadata automatically.
        pagination: Config dict with keys: type (token/offset), token_field, param.

    Returns:
        A SourceConnector with a discover() method.
    """
    # Normalize the optional container arguments to empty values.
    request_headers = headers if headers else {}
    inputs = input_fields if input_fields else []
    outputs = output_fields if output_fields else []

    return _RESTConnector(
        name=name,
        url=url,
        items_path=items_path,
        name_field=name_field,
        headers=request_headers,
        input_fields=inputs,
        output_fields=outputs,
        # None is meaningful here (auto-collect unmapped fields), so forward as-is.
        metadata_fields=metadata_fields,
        pagination=pagination,
    )

github_connector

github_connector(
    *,
    name: str,
    repos: list[str],
    project_path: str,
    config_file: str,
    parser: Callable[[str, str], DataNode | None],
    token: str | None = None,
) -> _GitHubConnector

Create a SourceConnector that discovers models from GitHub repos.

For each repo, lists subdirectories under project_path, reads config_file from each, and passes the content to parser to produce DataNodes.

Parameters:

Name Type Description Default
name str

Platform name for discovered DataNodes.

required
repos list[str]

List of GitHub repos (org/repo format).

required
project_path str

Directory containing project subdirectories.

required
config_file str

Filename to read in each project directory.

required
parser Callable[[str, str], DataNode | None]

Function (project_name, file_content) -> DataNode | None.

required
token str | None

GitHub personal access token (optional).

None

Returns:

Type Description
_GitHubConnector

A SourceConnector with a discover() method.

Source code in src/model_ledger/connectors/github.py
def github_connector(
    *,
    name: str,
    repos: list[str],
    project_path: str,
    config_file: str,
    parser: Callable[[str, str], DataNode | None],
    token: str | None = None,
) -> _GitHubConnector:
    """Create a SourceConnector that discovers models from GitHub repos.

    For each repo, lists subdirectories under project_path, reads config_file
    from each, and passes the content to parser to produce DataNodes. Every
    argument is forwarded to the connector unchanged; this factory applies no
    defaults or normalization of its own.

    Args:
        name: Platform name for discovered DataNodes.
        repos: List of GitHub repos (org/repo format).
        project_path: Directory containing project subdirectories.
        config_file: Filename to read in each project directory.
        parser: Function (project_name, file_content) -> DataNode | None.
        token: GitHub personal access token (optional).

    Returns:
        A SourceConnector with a discover() method.
    """
    connector_kwargs = dict(
        name=name,
        repos=repos,
        project_path=project_path,
        config_file=config_file,
        parser=parser,
        token=token,
    )
    return _GitHubConnector(**connector_kwargs)

Introspection

introspect

Model introspection — plugin-based metadata extraction.

ComponentInfo

Bases: BaseModel

Maps to a ComponentNode in the version's tree.

IntrospectionResult

Bases: BaseModel

Typed contract returned by all introspectors.

Introspector

Bases: Protocol

Extracts metadata from a model object or config.

get_registry

get_registry() -> IntrospectorRegistry

Get or create the global introspector registry.

Source code in src/model_ledger/introspect/registry.py
def get_registry() -> IntrospectorRegistry:
    """Return the module-level introspector registry, creating it on first use.

    The instance is stored in the module global ``_registry``. Later calls
    return that same object.
    """
    global _registry
    if _registry is not None:
        return _registry
    _registry = IntrospectorRegistry()
    return _registry

register_introspector

register_introspector(introspector: Introspector) -> None

Convenience function to register an introspector globally.

Source code in src/model_ledger/introspect/registry.py
def register_introspector(introspector: Introspector) -> None:
    """Add ``introspector`` to the global registry returned by ``get_registry()``."""
    registry = get_registry()
    registry.register(introspector)

register_introspector

register_introspector(introspector: Introspector) -> None

Register a custom introspector plugin.

Manually registered introspectors take priority over entry-point-discovered ones.

Parameters:

Name Type Description Default
introspector Introspector

An object implementing the Introspector protocol (must have name, can_handle, and introspect attributes).

required
Example

>>> from model_ledger import register_introspector
>>> register_introspector(MyCustomIntrospector())

Source code in src/model_ledger/__init__.py
def register_introspector(introspector: Introspector) -> None:
    """Register a custom introspector plugin.

    Manually registered introspectors take priority over entry-point-discovered ones.

    Args:
        introspector: An object implementing the Introspector protocol
            (must have name, can_handle, and introspect attributes).

    Example:
        >>> from model_ledger import register_introspector
        >>> register_introspector(MyCustomIntrospector())
    """
    # Imported inside the function rather than at module top; presumably to
    # keep package import light or avoid a circular import — confirm.
    from model_ledger.introspect.registry import get_registry

    registry = get_registry()
    registry.register(introspector)