Skip to content

API Reference

Everything below is generated from the source at build time with mkdocstrings + Griffe. It reflects the exact installed version — there is no hand-maintained copy to fall out of date.

Ledger

The one object you'll use most. Every method is tool-shaped — usable directly, over REST, or as an MCP tool.

The main entry point for model-ledger.

Every public method is designed to work as an agent tool call: clear inputs, JSON-serializable outputs, no side effects beyond the ledger.

Source code in src/model_ledger/sdk/ledger.py
def __init__(self, backend: LedgerBackend | None = None) -> None:
    """Create a ledger over ``backend``, defaulting to an in-memory store.

    Args:
        backend: Storage backend used for every read and write. When
            ``None`` (the default) an ``InMemoryLedgerBackend`` is created.
    """
    # Compare against None explicitly: ``backend or ...`` would silently swap
    # in an in-memory store for any backend object that happens to be falsy
    # (e.g. one defining ``__len__`` that reports zero entries).
    self._backend = backend if backend is not None else InMemoryLedgerBackend()
    # Model name -> ModelRef cache; filled by the bulk preload in add().
    self._name_cache: dict[str, ModelRef] = {}
    self._cache_complete = False  # True after bulk preload — skip individual lookups
    self._node_cache: list = []  # DataNodes from add() — reused by connect()

from_sqlite classmethod

from_sqlite(db_path: str) -> Ledger

Create a Ledger backed by a SQLite database.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `db_path` | `str` | Path to the SQLite database file. Created if it doesn't exist. | *required* |
Example

ledger = Ledger.from_sqlite("./inventory.db")

Source code in src/model_ledger/sdk/ledger.py
@classmethod
def from_sqlite(cls, db_path: str) -> Ledger:
    """Build a Ledger whose storage is a SQLite database file.

    Args:
        db_path: Filesystem path of the SQLite database. Created if it doesn't exist.

    Returns:
        A new Ledger wrapping a ``SQLiteLedgerBackend`` for ``db_path``.

    Example:
        >>> ledger = Ledger.from_sqlite("./inventory.db")
    """
    # Deferred import: the SQLite backend module is only imported when this
    # constructor is actually called.
    from model_ledger.backends.sqlite_ledger import SQLiteLedgerBackend

    sqlite_backend = SQLiteLedgerBackend(db_path)
    return cls(sqlite_backend)

from_snowflake classmethod

from_snowflake(
    connection: Any, schema: str = "MODEL_LEDGER"
) -> Ledger

Create a Ledger backed by Snowflake.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `connection` | `Any` | A Snowflake connection (`snowflake.connector` connection object). | *required* |
| `schema` | `str` | Fully qualified schema name (e.g., `"MY_DB.MODEL_LEDGER"`). | `'MODEL_LEDGER'` |
Example

ledger = Ledger.from_snowflake(conn, schema="ANALYTICS.MODEL_LEDGER")

Source code in src/model_ledger/sdk/ledger.py
@classmethod
def from_snowflake(cls, connection: Any, schema: str = "MODEL_LEDGER") -> Ledger:
    """Build a Ledger whose storage lives in Snowflake.

    Args:
        connection: A Snowflake connection (a ``snowflake.connector``
            connection object).
        schema: Fully qualified schema name, e.g. ``"MY_DB.MODEL_LEDGER"``.
            Defaults to ``"MODEL_LEDGER"``.

    Returns:
        A new Ledger wrapping a ``SnowflakeLedgerBackend``.

    Example:
        >>> ledger = Ledger.from_snowflake(conn, schema="ANALYTICS.MODEL_LEDGER")
    """
    # Deferred import: the Snowflake backend module is only imported when
    # this constructor is actually called.
    from model_ledger.backends.snowflake import SnowflakeLedgerBackend

    snowflake_backend = SnowflakeLedgerBackend(connection=connection, schema=schema)
    return cls(snowflake_backend)

dependencies

dependencies(
    model: ModelRef | str,
    direction: str = "both",
    *,
    snapshots: list[Snapshot] | None = None,
) -> list[dict[str, Any]]

Direct dependency edges for a model.

Resolves every edge's target model in ONE batched lookup instead of a per-edge round trip. Pass snapshots (the model's full history) to reuse an already-fetched list and skip the list_snapshots call — the graph traversal and investigate use this to avoid refetching.

Source code in src/model_ledger/sdk/ledger.py
def dependencies(
    self,
    model: ModelRef | str,
    direction: str = "both",
    *,
    snapshots: builtins.list[Snapshot] | None = None,
) -> builtins.list[dict[str, Any]]:
    """Direct dependency edges for a model.

    Every edge target is resolved in ONE batched ``_resolve_hashes`` call
    rather than one round trip per edge.

    Args:
        model: The model, or its name, whose edges to list.
        direction: ``"upstream"``, ``"downstream"`` or ``"both"``. Any other
            value yields no edges.
        snapshots: The model's full snapshot history, if the caller already
            has it. Passing it skips the ``list_snapshots`` call; the graph
            traversal and ``investigate`` use this to avoid refetching.

    Returns:
        One dict per resolvable edge with keys ``"model"`` (target ModelRef),
        ``"relationship"`` and ``"direction"``. Upstream edges come first.
        Edges whose target hash does not resolve are dropped.
    """
    ref = self._resolve_model(model)
    history = self._backend.list_snapshots(ref.model_hash) if snapshots is None else snapshots

    # (edge direction, snapshot event type, payload key holding the target hash)
    edge_kinds: builtins.list[tuple[str, str, str]] = []
    if direction in ("upstream", "both"):
        edge_kinds.append(("upstream", "depends_on", "upstream_hash"))
    if direction in ("downstream", "both"):
        edge_kinds.append(("downstream", "has_dependent", "downstream_hash"))

    # Gather (direction, target_hash, relationship) first so every target can
    # be resolved in a single batch afterwards.
    pending: builtins.list[tuple[str, str, str]] = []
    for label, event_type, hash_key in edge_kinds:
        for snap in history:
            if snap.event_type != event_type:
                continue
            target_hash = snap.payload.get(hash_key)
            if not target_hash:
                continue
            pending.append((label, target_hash, snap.payload.get("relationship", "depends_on")))

    by_hash = self._resolve_hashes([target_hash for _, target_hash, _ in pending])

    edges: builtins.list[dict[str, Any]] = []
    for label, target_hash, relationship in pending:
        target = by_hash.get(target_hash)
        if target is not None:
            edges.append({"model": target, "relationship": relationship, "direction": label})
    return edges

add

add(nodes: DataNode | list[DataNode]) -> AddResult

Register DataNodes. Each becomes a ModelRef + discovered Snapshot.

Skips writing if the discovered payload is identical to the last snapshot (content-hash dedup). Preloads existing models in bulk to avoid per-node queries.

Recognized node.metadata keys map onto the model row: owner, model_type/node_type/type, tier, purpose, model_origin, and status. A discovered status (any ModelStatus value, case-insensitive) is propagated to the model — including already-registered models — so lifecycle changes detected at the source (e.g. deprecated for an entity deleted upstream) reach the model row on the next sync. An absent or unrecognized status leaves the stored status unchanged.

Source code in src/model_ledger/sdk/ledger.py
def add(self, nodes: DataNode | builtins.list[DataNode]) -> AddResult:
    """Register DataNodes. Each becomes a ModelRef + discovered Snapshot.

    Skips writing if the discovered payload is identical to the last snapshot
    (content-hash dedup). Preloads existing models in bulk to avoid per-node queries.
    A node repeated within the same call is deduplicated against the snapshot
    written for its first occurrence.

    Recognized ``node.metadata`` keys map onto the model row: ``owner``,
    ``model_type``/``node_type``/``type``, ``tier``, ``purpose``,
    ``model_origin``, and ``status``. A discovered ``status`` (any
    ``ModelStatus`` value, case-insensitive) is propagated to the model —
    including already-registered models — so lifecycle changes detected at
    the source (e.g. ``deprecated`` for an entity deleted upstream) reach
    the model row on the next sync. An absent or unrecognized status leaves
    the stored status unchanged.

    Args:
        nodes: A single DataNode, or a list (any iterable) of DataNodes.

    Returns:
        ``{"added": ..., "skipped": ...}``: the number of discovered
        snapshots written, and the number of nodes skipped because their
        payload was unchanged.
    """
    import hashlib
    import json

    from model_ledger.graph.models import DataNode

    # Metadata keys not copied into the discovered payload: most live on the
    # model row, and source_updated_at is recorded as change_occurred below.
    row_keys = (
        "owner",
        "node_type",
        "tier",
        "purpose",
        "model_origin",
        "source_updated_at",
    )
    # Excluded from the content hash: timestamps change between runs without
    # the model actually changing.
    volatile_keys = {
        "created_at",
        "updated_at",
        "source_updated_at",
        "_content_hash",
        "change_detected",
        "change_occurred",
    }

    # Materialize once. The nodes are iterated twice (node cache + main loop),
    # so a one-shot iterator would otherwise be exhausted by extend().
    nodes = [nodes] if isinstance(nodes, DataNode) else list(nodes)
    self._node_cache.extend(nodes)

    # Bulk preload — one query for all models.
    if not self._cache_complete:
        for m in self._backend.list_models():
            self._name_cache[m.name] = m
        self._cache_complete = True

    # Preload last discovered snapshot content hashes for dedup.
    # We store _content_hash in the payload itself, so we can read just that field.
    # Copied into a local dict because it is updated below as snapshots are written.
    existing_hashes: dict[str, str] = {}
    if hasattr(self._backend, "list_snapshot_content_hashes"):
        existing_hashes = dict(
            self._backend.list_snapshot_content_hashes(event_type="discovered")
        )
    elif hasattr(self._backend, "list_all_snapshots"):
        for s in self._backend.list_all_snapshots(event_type="discovered"):
            h = s.payload.get("_content_hash")
            if h:
                existing_hashes[s.model_hash] = h

    added = 0
    skipped = 0
    for node in nodes:
        actor = f"connector:{node.platform}" if node.platform else "system"
        node_status = _normalize_status(node.metadata.get("status"))
        ref = self.register(
            name=node.name,
            owner=node.metadata.get("owner") or "unknown",
            model_type=node.metadata.get("model_type")
            or node.metadata.get("node_type")
            or node.metadata.get("type")
            or "unknown",
            tier=node.metadata.get("tier") or "unclassified",
            purpose=node.metadata.get("purpose") or "",
            model_origin=node.metadata.get("model_origin") or "internal",
            status=node_status or "active",
            actor=actor,
        )
        payload = {
            "platform": node.platform,
            "inputs": [{"identifier": p.identifier, **p.schema} for p in node.inputs],
            "outputs": [{"identifier": p.identifier, **p.schema} for p in node.outputs],
            **{k: v for k, v in node.metadata.items() if k not in row_keys},
        }

        # Content-hash dedup: skip if the stable part of the payload is unchanged.
        stable_payload = {k: v for k, v in payload.items() if k not in volatile_keys}
        content_hash = hashlib.sha256(
            json.dumps(stable_payload, sort_keys=True, default=str).encode()
        ).hexdigest()
        # Update last_seen on every run, even if unchanged
        ref.last_seen = datetime.now(timezone.utc)
        # Propagate a discovered status onto the model row. register()
        # returns existing refs unchanged, so a connector-derived status
        # must be applied here for update_model() to persist it. This runs
        # before the dedup check so the row self-corrects even when the
        # snapshot is skipped as unchanged.
        if node_status is not None and node_status != ref.status:
            ref.status = node_status
        self._backend.update_model(ref)

        if existing_hashes.get(ref.model_hash) == content_hash:
            skipped += 1
            continue

        payload["_content_hash"] = content_hash
        payload["change_detected"] = datetime.now(timezone.utc).isoformat()
        if node.metadata.get("source_updated_at"):
            payload["change_occurred"] = node.metadata["source_updated_at"]
        self.record(
            ref,
            event="discovered",
            payload=payload,
            actor=actor,
        )
        # Remember what was just written so a repeat of the same node later in
        # this batch is skipped instead of producing a second snapshot.
        existing_hashes[ref.model_hash] = content_hash
        added += 1

    return {"added": added, "skipped": skipped}

connect

connect() -> ConnectResult

Match output ports to input ports. Write only new dependency links.

Uses cached nodes from add() if available (avoids re-reading from backend). Preloads existing edges to skip duplicates.

Source code in src/model_ledger/sdk/ledger.py
def connect(self) -> ConnectResult:
    """Match output ports to input ports. Write only new dependency links.

    Uses cached nodes from add() if available (avoids re-reading from backend).
    Preloads existing edges to skip duplicates.

    Each (upstream, downstream) pair is linked at most once per call, with
    ``relationship="data_flow"``. A node is never linked to itself, and a
    matching identifier whose ports compare unequal is not linked.

    Returns:
        ``{"links_created": ..., "links_skipped": ...}``: new links written,
        and matches skipped because an existing ``depends_on`` snapshot
        already records them. Matches whose model cannot be found count as
        neither.
    """
    from collections import defaultdict

    # Use cached nodes from add() if available, otherwise load from backend
    nodes = self._node_cache if self._node_cache else self._load_discovered_nodes()

    # Build output port index
    output_index = defaultdict(list)
    for node in nodes:
        for port in node.outputs:
            output_index[port.identifier].append((node, port))

    # Preload existing edges to skip duplicates
    existing_edges: set[tuple[str, str]] = set()
    if hasattr(self._backend, "list_all_snapshots"):
        # Map model hashes back to names. The name cache is only complete
        # after a bulk preload (done by add()). When connect() runs on its
        # own, read the model list directly so existing edges are still
        # recognised instead of being written a second time.
        if self._cache_complete:
            hash_to_name = {ref.model_hash: name for name, ref in self._name_cache.items()}
        else:
            hash_to_name = {m.model_hash: m.name for m in self._backend.list_models()}
        for s in self._backend.list_all_snapshots(event_type="depends_on"):
            upstream = s.payload.get("upstream")
            downstream = hash_to_name.get(s.model_hash)
            if upstream and downstream:
                existing_edges.add((upstream, downstream))

    # Match ports and write only new edges
    links_created = 0
    links_skipped = 0
    seen = set()
    for node in nodes:
        for in_port in node.inputs:
            for upstream_node, out_port in output_index.get(in_port.identifier, []):
                if upstream_node.name == node.name:
                    continue
                if out_port != in_port:
                    continue
                key = (upstream_node.name, node.name)
                if key in seen:
                    continue
                seen.add(key)
                if key in existing_edges:
                    links_skipped += 1
                    continue
                try:
                    self.link_dependency(
                        upstream=upstream_node.name,
                        downstream=node.name,
                        relationship="data_flow",
                        actor="graph_builder",
                        metadata={
                            "via": in_port.identifier,
                            "via_schema": in_port.schema or None,
                        },
                    )
                    links_created += 1
                except ModelNotFoundError:
                    continue
    return {"links_created": links_created, "links_skipped": links_skipped}

trace

trace(name: str) -> list[str]

Topological path from sources to this node.

Source code in src/model_ledger/sdk/ledger.py
def trace(self, name: str) -> builtins.list[str]:
    """Topological path from sources to this node.

    Walks upstream edges depth-first and returns the names in post-order:
    every transitive upstream model of ``name``, followed by ``name`` itself
    as the last entry. For an acyclic graph, each model appears after all of
    its upstream dependencies. Each name appears once.

    Args:
        name: Name of the model to trace back from.

    Returns:
        The ordered list of model names, ending with ``name``.
    """
    self._resolve_model(name)
    seen = {name}
    order: builtins.list[str] = []
    # Explicit stack of (node, iterator over its upstream edges) rather than
    # recursion, so very long lineage chains cannot hit the recursion limit.
    stack = [(name, iter(self.dependencies(name, direction="upstream")))]
    while stack:
        node, pending = stack[-1]
        for dep in pending:
            parent = dep["model"].name
            if parent not in seen:
                seen.add(parent)
                stack.append((parent, iter(self.dependencies(parent, direction="upstream"))))
                break
        else:
            # All upstream edges of ``node`` handled: emit it (post-order).
            stack.pop()
            order.append(node)
    return order

upstream

upstream(name: str) -> list[str]

All models this one depends on (transitive).

Source code in src/model_ledger/sdk/ledger.py
def upstream(self, name: str) -> builtins.list[str]:
    """All models this one depends on (transitive).

    Args:
        name: Name of the model whose ancestors to list.

    Returns:
        The names produced by ``trace(name)``, in the same order, with
        ``name`` itself removed.
    """
    lineage = self.trace(name)
    # trace() includes the starting node itself; only its ancestors are wanted.
    return [ancestor for ancestor in lineage if ancestor != name]

downstream

downstream(name: str) -> list[str]

All models that depend on this one (transitive).

Source code in src/model_ledger/sdk/ledger.py
def downstream(self, name: str) -> builtins.list[str]:
    """All models that depend on this one (transitive).

    Walks downstream edges depth-first and returns each reachable model once,
    in pre-order (the order it is first reached). Like ``upstream()``, the
    result never includes ``name`` itself, even when a cycle leads back to it.

    Args:
        name: Name of the model whose dependents to list.

    Returns:
        Names of every transitive dependent of ``name``.
    """
    self._resolve_model(name)
    # Seed with the start node so a cycle back to it is not reported.
    seen = {name}
    found: builtins.list[str] = []
    # Explicit stack of edge iterators instead of recursion, so deep chains
    # cannot exceed the recursion limit.
    stack = [iter(self.dependencies(name, direction="downstream"))]
    while stack:
        for dep in stack[-1]:
            child = dep["model"].name
            if child not in seen:
                seen.add(child)
                found.append(child)
                stack.append(iter(self.dependencies(child, direction="downstream")))
                break
        else:
            stack.pop()
    return found

register_group

register_group(
    *,
    name: str,
    owner: str,
    model_type: str,
    tier: str,
    purpose: str,
    members: list[str],
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> ModelRef

Register a governed model group and link its members.

A group is a business-level entity that aggregates technical components. Members are linked via relationship="member_of".

Example

```python
group = ledger.register_group(
    name="Credit Scorecard", owner="risk-team",
    model_type="ml_model", tier="high",
    purpose="Credit risk scoring pipeline",
    members=["feature_pipeline", "scoring_model", "alert_queue"],
    actor="system",
)
```

Source code in src/model_ledger/sdk/ledger.py
def register_group(
    self,
    *,
    name: str,
    owner: str,
    model_type: str,
    tier: str,
    purpose: str,
    members: builtins.list[str],
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> ModelRef:
    """Register a governed model group and link its members.

    A group is a business-level entity aggregating technical components.
    The group itself is registered via ``register()``; each member is then
    linked to it as an upstream dependency with relationship="member_of".

    Args:
        name: Name of the group model.
        owner: Owning team or person.
        model_type: Model type recorded on the group.
        tier: Risk tier recorded on the group.
        purpose: Free-text purpose of the group.
        members: Names of the member models (``None`` is treated as empty).
        actor: Who performed the registration.
        metadata: Optional extra metadata passed to ``register()``.

    Returns:
        The group's ModelRef as returned by ``register()``.

    Example:
        >>> group = ledger.register_group(
        ...     name="Credit Scorecard", owner="risk-team",
        ...     model_type="ml_model", tier="high",
        ...     purpose="Credit risk scoring pipeline",
        ...     members=["feature_pipeline", "scoring_model", "alert_queue"],
        ...     actor="system",
        ... )
    """
    group_ref = self.register(
        name=name,
        owner=owner,
        model_type=model_type,
        tier=tier,
        purpose=purpose,
        actor=actor,
        metadata=metadata,
    )
    # Members point at the group: member (upstream) -> group (downstream).
    for component in members or ():
        self.link_dependency(
            upstream=component,
            downstream=name,
            relationship="member_of",
            actor=actor,
        )
    return group_ref

members

members(
    group: ModelRef | str,
    *,
    snapshots: list[Snapshot] | None = None,
) -> list[ModelRef]

Return current members of this group.

Replays member_added/member_removed snapshots to determine current membership. Falls back to dependency links if no membership events exist (backward compatible with register_group).

Mixed case: groups seeded via register_group() that later have add_member()/remove_member() called use dependency links as the baseline and overlay the event log on top.

All member_added targets are resolved in a single batched lookup instead of one round trip per event. Pass snapshots (the group's full history) to reuse an already-fetched list.

Source code in src/model_ledger/sdk/ledger.py
def members(
    self,
    group: ModelRef | str,
    *,
    snapshots: builtins.list[Snapshot] | None = None,
) -> builtins.list[ModelRef]:
    """Return current members of this group.

    Replays member_added/member_removed snapshots to determine
    current membership. Falls back to dependency links if no
    membership events exist (backward compatible with register_group).

    Mixed case: groups seeded via register_group() that later have
    add_member()/remove_member() called use dependency links as the
    baseline and overlay the event log on top.

    All member_added targets are resolved in a single batched lookup
    instead of one round trip per event. A member that is removed and
    re-added reuses the ModelRef already known from the baseline. Pass
    ``snapshots`` (the group's full history) to reuse an already-fetched list.

    Args:
        group: The group model, or its name.
        snapshots: Optional pre-fetched history of the group.

    Returns:
        The current members, in the order they joined the replayed state.
    """
    ref = self._resolve_model(group)
    snaps = snapshots if snapshots is not None else self._backend.list_snapshots(ref.model_hash)
    membership_events = [s for s in snaps if s.event_type in ("member_added", "member_removed")]

    # Seed from dependency links (covers register_group() seeded members
    # and is always correct as the initial universe of linked models).
    # Reuse the snapshots we already have — dependency edges live in the
    # same history.
    deps = self.dependencies(group, direction="upstream", snapshots=snaps) or []
    current: dict[str, ModelRef] = {
        d["model"].model_hash: d["model"] for d in deps if d.get("relationship") == "member_of"
    }

    if not membership_events:
        # No events: dependency links are the full picture.
        return list(current.values())

    # Refs already resolved by the baseline. A baseline member that is
    # removed and later re-added is taken from here, instead of falling
    # back to a per-event name lookup that can raise ModelNotFoundError and
    # drop the member.
    baseline = dict(current)

    ordered_events = sorted(membership_events, key=lambda s: s.timestamp)

    # Batch-resolve every member_added hash not covered by the baseline, in
    # one round trip. The name fallback (for the rare unresolvable hash)
    # stays per-event but only fires when both lookups miss.
    added_hashes = [
        s.payload.get("member_hash", "")
        for s in ordered_events
        if s.event_type == "member_added" and s.payload.get("member_hash", "") not in baseline
    ]
    resolved = self._resolve_hashes([h for h in added_hashes if h])

    # Replay events on top of the dep-link baseline.
    for s in ordered_events:
        member_hash = s.payload.get("member_hash", "")
        if s.event_type == "member_added":
            if member_hash in current:
                continue
            ref_for_member = baseline.get(member_hash)
            if ref_for_member is None:
                ref_for_member = resolved.get(member_hash)
            if ref_for_member is not None:
                current[member_hash] = ref_for_member
                continue
            member_name = s.payload.get("member_name", "")
            if member_name:
                try:
                    current[member_hash] = self.get(member_name)
                except ModelNotFoundError:
                    continue
        elif s.event_type == "member_removed":
            current.pop(member_hash, None)
    return list(current.values())

groups

groups(
    model: ModelRef | str,
    *,
    snapshots: list[Snapshot] | None = None,
) -> list[ModelRef]

Return groups this model currently belongs to.

Replays member_added/member_removed events on each candidate group to exclude composites the model has been removed from. The candidate composites' membership histories are fetched in a single bulk list_all_snapshots scan when the backend supports it, so the cost is one query for the whole fan-out rather than one per candidate.

Pass snapshots (this model's full history) to reuse an already-fetched list for the downstream-edge discovery.

Source code in src/model_ledger/sdk/ledger.py
def groups(
    self,
    model: ModelRef | str,
    *,
    snapshots: builtins.list[Snapshot] | None = None,
) -> builtins.list[ModelRef]:
    """Return groups this model currently belongs to.

    Replays member_added/member_removed events on each candidate group
    to exclude composites the model has been removed from. The candidate
    composites' membership histories are fetched in a single bulk
    ``list_all_snapshots`` scan when the backend supports it, so the cost
    is one query for the whole fan-out rather than one per candidate.

    Pass ``snapshots`` (this model's full history) to reuse an
    already-fetched list for the downstream-edge discovery.

    Args:
        model: The model, or its name.
        snapshots: Optional pre-fetched history of ``model``.

    Returns:
        Each group containing the model, listed once, in the order its
        first ``member_of`` edge appears.
    """
    deps = self.dependencies(model, direction="downstream", snapshots=snapshots) or []
    # add_member() calls link_dependency() on every invocation (including a
    # re-add after remove_member()), so a model may carry several member_of
    # edges to the same composite. Keep each candidate once, first-seen order.
    unique: dict[str, ModelRef] = {}
    for d in deps:
        if d.get("relationship") == "member_of":
            unique.setdefault(d["model"].model_hash, d["model"])
    candidates = list(unique.values())
    if not candidates:
        return []
    ref = self._resolve_model(model)

    # Resolve each candidate's current members. Prefer a single bulk
    # snapshot scan over per-candidate list_snapshots round trips.
    snaps_by_group = self._membership_snapshots({c.model_hash for c in candidates})

    result: builtins.list[ModelRef] = []
    for comp in candidates:
        comp_snaps = snaps_by_group.get(comp.model_hash)
        current_members = self.members(comp, snapshots=comp_snaps)
        if any(m.model_hash == ref.model_hash for m in current_members):
            result.append(comp)
    return result

add_member

add_member(
    composite: ModelRef | str,
    member: ModelRef | str,
    *,
    role: str | None = None,
    actor: str,
) -> Snapshot

Add a member to a composite.

Records a member_added snapshot on the composite and creates a member_of dependency link.

Source code in src/model_ledger/sdk/ledger.py
def add_member(
    self,
    composite: ModelRef | str,
    member: ModelRef | str,
    *,
    role: str | None = None,
    actor: str,
) -> Snapshot:
    """Add a member to a composite.

    First links the member to the composite (relationship "member_of",
    member upstream), then records a ``member_added`` event on the composite.

    Args:
        composite: The composite model, or its name.
        member: The model joining it, or its name.
        role: Optional role stored in the event payload.
        actor: Who made the change.

    Returns:
        The ``member_added`` Snapshot returned by ``record()``.
    """
    composite_ref = self._resolve_model(composite)
    member_ref = self._resolve_model(member)

    self.link_dependency(
        upstream=member_ref,
        downstream=composite_ref,
        relationship="member_of",
        actor=actor,
    )

    event_payload: dict[str, Any] = {
        "member_name": member_ref.name,
        "member_hash": member_ref.model_hash,
    }
    if role is not None:
        event_payload["role"] = role
    return self.record(composite_ref, event="member_added", payload=event_payload, actor=actor)

remove_member

remove_member(
    composite: ModelRef | str,
    member: ModelRef | str,
    *,
    reason: str | None = None,
    actor: str,
) -> Snapshot

Remove a member from a composite.

Records a member_removed snapshot. The dependency link is NOT deleted (append-only event log). members() will exclude removed members by replaying member_added/member_removed events.

Source code in src/model_ledger/sdk/ledger.py
def remove_member(
    self,
    composite: ModelRef | str,
    member: ModelRef | str,
    *,
    reason: str | None = None,
    actor: str,
) -> Snapshot:
    """Remove a member from a composite.

    Only a ``member_removed`` event is recorded on the composite; the
    existing dependency link is left in place (append-only event log).
    members() excludes the member by replaying member_added/member_removed
    events.

    Args:
        composite: The composite model, or its name.
        member: The model leaving it, or its name.
        reason: Optional reason stored in the event payload.
        actor: Who made the change.

    Returns:
        The ``member_removed`` Snapshot returned by ``record()``.
    """
    composite_ref = self._resolve_model(composite)
    member_ref = self._resolve_model(member)

    event_payload: dict[str, Any] = {
        "member_name": member_ref.name,
        "member_hash": member_ref.model_hash,
    }
    if reason is not None:
        event_payload["reason"] = reason
    return self.record(composite_ref, event="member_removed", payload=event_payload, actor=actor)

record_observation

record_observation(
    composite: ModelRef | str,
    *,
    observation_id: str,
    observation: str,
    status: str,
    severity: str | None = None,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot

Record an observation against a composite.

Source code in src/model_ledger/sdk/ledger.py
def record_observation(
    self,
    composite: ModelRef | str,
    *,
    observation_id: str,
    observation: str,
    status: str,
    severity: str | None = None,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot:
    """Record an observation against a composite.

    Writes an ``observation_issued`` event whose payload carries
    ``observation_id``, ``observation``, ``status`` and, when given,
    ``severity``.

    Args:
        composite: The composite model, or its name.
        observation_id: Identifier used to pair this event with its resolution.
        observation: Text of the observation.
        status: Status recorded with the observation.
        severity: Optional severity.
        actor: Who raised the observation.
        metadata: Extra payload keys. They are added to the payload but never
            overwrite the explicit fields above.

    Returns:
        The Snapshot returned by ``record()``.
    """
    payload: dict[str, Any] = {
        "observation_id": observation_id,
        "observation": observation,
        "status": status,
    }
    if severity is not None:
        payload["severity"] = severity
    if metadata:
        # Supplementary only: open_observation_count() pairs issue/resolve
        # events by observation_id, so letting metadata overwrite it (or the
        # other explicit fields) would corrupt that bookkeeping.
        payload.update({k: v for k, v in metadata.items() if k not in payload})
    return self.record(composite, event="observation_issued", payload=payload, actor=actor)

resolve_observation

resolve_observation(
    composite: ModelRef | str,
    *,
    observation_id: str,
    resolution: str,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot

Record the resolution of an observation.

Source code in src/model_ledger/sdk/ledger.py
def resolve_observation(
    self,
    composite: ModelRef | str,
    *,
    observation_id: str,
    resolution: str,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot:
    """Record the resolution of an observation.

    Writes an ``observation_resolved`` event whose payload carries
    ``observation_id`` and ``resolution``.

    Args:
        composite: The composite model, or its name.
        observation_id: Identifier of the observation being resolved.
        resolution: Text describing the resolution.
        actor: Who resolved it.
        metadata: Extra payload keys. They are added to the payload but never
            overwrite the explicit fields above.

    Returns:
        The Snapshot returned by ``record()``.
    """
    payload: dict[str, Any] = {
        "observation_id": observation_id,
        "resolution": resolution,
    }
    if metadata:
        # Supplementary only: a metadata "observation_id" would otherwise
        # retarget the resolution at a different observation.
        payload.update({k: v for k, v in metadata.items() if k not in payload})
    return self.record(composite, event="observation_resolved", payload=payload, actor=actor)

record_validation

record_validation(
    composite: ModelRef | str,
    *,
    result: str,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot

Record a validation result against a composite.

Source code in src/model_ledger/sdk/ledger.py
def record_validation(
    self,
    composite: ModelRef | str,
    *,
    result: str,
    actor: str,
    metadata: dict[str, Any] | None = None,
) -> Snapshot:
    """Record a validation result against a composite.

    Writes a ``validated`` event whose payload carries ``result``.

    Args:
        composite: The composite model, or its name.
        result: The validation outcome.
        actor: Who performed the validation.
        metadata: Extra payload keys. They are added to the payload but never
            overwrite ``result``.

    Returns:
        The Snapshot returned by ``record()``.
    """
    payload: dict[str, Any] = {"result": result}
    if metadata:
        # Supplementary only: the explicit ``result`` argument always wins.
        payload.update({k: v for k, v in metadata.items() if k not in payload})
    return self.record(composite, event="validated", payload=payload, actor=actor)

open_observation_count staticmethod

open_observation_count(snapshots: list[Snapshot]) -> int

Return the number of observations that have been issued but not resolved.

Accepts any iterable of Snapshots. Only observation_issued and observation_resolved events that carry an observation_id payload key are considered; all other snapshots are ignored.

Source code in src/model_ledger/sdk/ledger.py
@staticmethod
def open_observation_count(snapshots: builtins.list[Snapshot]) -> int:
    """Return the number of observations that are currently open.

    Replays ``observation_issued`` / ``observation_resolved`` events in
    timestamp order. Issuing an id opens it, resolving it closes it, and a
    later re-issue of the same id opens it again. Only events carrying an
    ``observation_id`` payload key are considered; all other snapshots are
    ignored. Events with equal timestamps keep their input order.

    Args:
        snapshots: Any iterable of Snapshots.

    Returns:
        How many distinct observation ids are open after the replay.
    """
    open_ids: set[str] = set()
    for s in sorted(snapshots, key=lambda snap: snap.timestamp):
        obs_id = s.payload.get("observation_id")
        if not obs_id:
            continue
        if s.event_type == "observation_issued":
            open_ids.add(obs_id)
        elif s.event_type == "observation_resolved":
            open_ids.discard(obs_id)
    return len(open_ids)

composite_summary

composite_summary(
    model_types: list[str] | None = None,
) -> list[dict[str, Any]]

Flat inventory of all composites with derived fields.

By default, returns models whose model_type == "composite". Pass model_types=["composite", "ml_model", "heuristic"] (or any subset) to include other types the caller treats as composites.

Source code in src/model_ledger/sdk/ledger.py
def composite_summary(
    self,
    model_types: builtins.list[str] | None = None,
) -> builtins.list[dict[str, Any]]:
    """Flat inventory of all composites with derived fields.

    By default, returns models whose ``model_type == "composite"``. Pass
    ``model_types=["composite", "ml_model", "heuristic"]`` (or any subset)
    to include other types the caller treats as composites. A backend that
    implements ``composite_summary`` handles the whole request itself.

    Args:
        model_types: Model types to include. ``None`` or empty means
            ``["composite"]``.

    Returns:
        One dict per matching model with ``name``, ``owner``, ``tier``,
        ``status``, ``model_type``, ``member_count``, ``last_validated``
        (latest ``validated`` timestamp or ``None``),
        ``open_observation_count`` and ``metadata`` (``{}`` when unset).
    """
    types_list = model_types or ["composite"]
    if hasattr(self._backend, "composite_summary"):
        result: builtins.list[dict[str, Any]] = self._backend.composite_summary(
            model_types=types_list
        )
        return result

    target_types = set(types_list)
    all_models = self._backend.list_models()
    composites = [m for m in all_models if m.model_type in target_types]
    result = []
    for comp in composites:
        snaps = self._backend.list_snapshots(comp.model_hash)
        # Hand the history just fetched to members() instead of letting it
        # issue a second list_snapshots call for the same composite.
        member_count = len(self.members(comp, snapshots=snaps))
        validated_snaps = [s for s in snaps if s.event_type == "validated"]
        last_validated = max(s.timestamp for s in validated_snaps) if validated_snaps else None
        result.append(
            {
                "name": comp.name,
                "owner": comp.owner,
                "tier": comp.tier,
                "status": comp.status,
                "model_type": comp.model_type,
                "member_count": member_count,
                "last_validated": last_validated,
                "open_observation_count": self.open_observation_count(snaps),
                "metadata": comp.metadata or {},
            }
        )
    return result

Data models

The event-log primitives. A model is a ModelRef; every change is a Snapshot; a Tag is a mutable pointer.

ModelRef

Bases: BaseModel

Regulatory identity — the minimum a regulator needs.

Snapshot

Bases: BaseModel

Immutable, content-addressed observation of a model.

Tag

Bases: BaseModel

Mutable pointer from a name to a snapshot.

Graph

DataNode dataclass

DataNode(
    name: str,
    platform: str = "",
    inputs: list[DataPort] = list(),
    outputs: list[DataPort] = list(),
    metadata: dict[str, Any] = dict(),
)

DataPort

DataPort(identifier: str, **schema: str)

A connection point. Acts like a string, carries optional schema for precision.

Source code in src/model_ledger/graph/models.py
def __init__(self, identifier: str, **schema: str) -> None:
    """Initialise a port from its identifier plus optional schema keywords.

    Args:
        identifier: Name of the connection point. It is stored lower-cased
            (presumably so that port matching ignores case — confirm with callers).
        **schema: Arbitrary keyword schema details, kept as-is on ``self.schema``.
    """
    normalized_identifier = identifier.lower()
    self.identifier = normalized_identifier
    self.schema = schema

Connectors

Factory functions that emit DataNodes from external sources. See Connectors & discovery for usage.

sql_connector

sql_connector(
    *,
    name: str,
    connection: Any,
    query: str,
    name_column: str,
    name_prefix: str = "",
    input_columns: list[str] | None = None,
    output_columns: list[str] | None = None,
    sql_column: str | None = None,
    sql_preprocessor: Callable[[str], str]
    | str
    | None = "default",
    shared_table_patterns: list[str] | None = None,
    shared_table_fallback: dict[str, str] | None = None,
    cron_column: str | None = None,
    input_port: dict[str, str] | None = None,
    output_port: dict[str, str] | None = None,
    metadata_columns: dict[str, str] | None = None,
) -> _SQLConnector

Create a SourceConnector that discovers models from a SQL query.

Parameters:

Name Type Description Default
name str

Platform name for discovered DataNodes.

required
connection Any

Database connection with execute() method.

required
query str

SQL query to run.

required
name_column str

Column containing the model name.

required
name_prefix str

Optional prefix for model names (e.g., "queue:").

''
input_columns list[str] | None

Columns containing input table/port identifiers.

None
output_columns list[str] | None

Columns containing output table/port identifiers.

None
sql_column str | None

Column containing SQL to parse for input/output tables.

None
sql_preprocessor Callable[[str], str] | str | None

Function to clean SQL before parsing (e.g., strip template variables). Default: strip_template_vars from model_ledger.adapters.sql. Pass None to disable preprocessing.

'default'
shared_table_patterns list[str] | None

When sql_column is set, table names matching any of these substrings will get model_name discriminators from the parsed SQL. Default: ["scores", "alert"].

None
shared_table_fallback dict[str, str] | None

When a write table matches shared_table_patterns but no model_name is found in the SQL, derive model_name from a column value. Dict with keys: source_column (required), strip_prefix (optional regex). Example: {"source_column": "NAME", "strip_prefix": "etl_"}

None
cron_column str | None

Column containing a cron expression. When set, adds both the raw cron and an English translation to metadata.

None
input_port dict[str, str] | None

Config dict with keys: column, fallback (optional), kind (optional).

None
output_port dict[str, str] | None

Config dict with keys: column, fallback (optional), kind (optional).

None
metadata_columns dict[str, str] | None

Explicit {metadata_key: column_name} mapping. If omitted, all unmapped columns become metadata automatically.

None

Returns:

Type Description
_SQLConnector

A SourceConnector with a discover() method.

Source code in src/model_ledger/connectors/sql.py
def sql_connector(
    *,
    name: str,
    connection: Any,
    query: str,
    name_column: str,
    name_prefix: str = "",
    input_columns: list[str] | None = None,
    output_columns: list[str] | None = None,
    sql_column: str | None = None,
    sql_preprocessor: Callable[[str], str] | str | None = "default",
    shared_table_patterns: list[str] | None = None,
    shared_table_fallback: dict[str, str] | None = None,
    cron_column: str | None = None,
    input_port: dict[str, str] | None = None,
    output_port: dict[str, str] | None = None,
    metadata_columns: dict[str, str] | None = None,
) -> _SQLConnector:
    """Create a SourceConnector that discovers models from a SQL query.

    Args:
        name: Platform name for discovered DataNodes.
        connection: Database connection with execute() method.
        query: SQL query to run.
        name_column: Column containing the model name.
        name_prefix: Optional prefix for model names (e.g., "queue:").
        input_columns: Columns containing input table/port identifiers.
        output_columns: Columns containing output table/port identifiers.
        sql_column: Column containing SQL to parse for input/output tables.
        sql_preprocessor: Function to clean SQL before parsing (e.g., strip template
            variables). Default: strip_template_vars from model_ledger.adapters.sql.
            Pass None to disable preprocessing. The only accepted string value is
            the sentinel "default".
        shared_table_patterns: When sql_column is set, table names matching any of
            these substrings will get model_name discriminators from the parsed SQL.
            Default: ["scores", "alert"].
        shared_table_fallback: When a write table matches shared_table_patterns but
            no model_name is found in the SQL, derive model_name from a column value.
            Dict with keys: source_column (required), strip_prefix (optional regex).
            Example: {"source_column": "NAME", "strip_prefix": "etl_"}
        cron_column: Column containing a cron expression. When set, adds both
            the raw cron and an English translation to metadata.
        input_port: Config dict with keys: column, fallback (optional), kind (optional).
        output_port: Config dict with keys: column, fallback (optional), kind (optional).
        metadata_columns: Explicit {metadata_key: column_name} mapping.
            If omitted, all unmapped columns become metadata automatically.

    Returns:
        A SourceConnector with a discover() method.

    Raises:
        ValueError: If ``sql_preprocessor`` is a string other than "default".
            Previously such strings silently disabled preprocessing, which hid
            typos such as "defualt".
    """
    # Resolve the preprocessor: the "default" sentinel, a user callable, or disabled.
    resolved_preprocessor: Callable[[str], str] | None
    if isinstance(sql_preprocessor, str):
        if sql_preprocessor != "default":
            raise ValueError(
                "sql_preprocessor must be a callable, None, or 'default'; "
                f"got {sql_preprocessor!r}"
            )
        # Imported lazily so the adapter is only loaded when actually needed.
        from model_ledger.adapters.sql import strip_template_vars

        resolved_preprocessor = strip_template_vars
    elif callable(sql_preprocessor):
        resolved_preprocessor = sql_preprocessor
    else:
        # None (or any other non-callable value) disables preprocessing, as before.
        resolved_preprocessor = None

    return _SQLConnector(
        name=name,
        connection=connection,
        query=query,
        name_column=name_column,
        name_prefix=name_prefix,
        input_columns=input_columns or [],
        output_columns=output_columns or [],
        sql_column=sql_column,
        sql_preprocessor=resolved_preprocessor,
        # An explicit empty list is honoured; only None falls back to the defaults.
        shared_table_patterns=shared_table_patterns
        if shared_table_patterns is not None
        else ["scores", "alert"],
        shared_table_fallback=shared_table_fallback,
        cron_column=cron_column,
        input_port=input_port,
        output_port=output_port,
        metadata_columns=metadata_columns,
    )

rest_connector

rest_connector(
    *,
    name: str,
    url: str,
    items_path: str,
    name_field: str,
    headers: dict[str, str] | None = None,
    input_fields: list[str] | None = None,
    output_fields: list[str] | None = None,
    metadata_fields: dict[str, str] | None = None,
    pagination: dict[str, str] | None = None,
) -> _RESTConnector

Create a SourceConnector that discovers models from a REST API.

Parameters:

Name Type Description Default
name str

Platform name for discovered DataNodes.

required
url str

API endpoint URL.

required
items_path str

Dot-path to the array of items in JSON response.

required
name_field str

Field containing the model name.

required
headers dict[str, str] | None

HTTP headers (auth goes here).

None
input_fields list[str] | None

Dot-paths to input identifiers.

None
output_fields list[str] | None

Dot-paths to output identifiers.

None
metadata_fields dict[str, str] | None

Explicit {metadata_key: field_path} mapping. If omitted, all unmapped fields become metadata automatically.

None
pagination dict[str, str] | None

Config dict with keys: type (token/offset), token_field, param.

None

Returns:

Type Description
_RESTConnector

A SourceConnector with a discover() method.

Source code in src/model_ledger/connectors/rest.py
def rest_connector(
    *,
    name: str,
    url: str,
    items_path: str,
    name_field: str,
    headers: dict[str, str] | None = None,
    input_fields: list[str] | None = None,
    output_fields: list[str] | None = None,
    metadata_fields: dict[str, str] | None = None,
    pagination: dict[str, str] | None = None,
) -> _RESTConnector:
    """Build a SourceConnector that discovers models by calling a REST API.

    Args:
        name: Platform name attached to every discovered DataNode.
        url: Endpoint URL to query.
        items_path: Dot-path locating the array of items in the JSON response.
        name_field: Field holding each model's name.
        headers: HTTP headers to send (authentication belongs here).
        input_fields: Dot-paths to the input identifiers of each item.
        output_fields: Dot-paths to the output identifiers of each item.
        metadata_fields: Explicit {metadata_key: field_path} mapping. When
            omitted, every unmapped field becomes metadata automatically.
        pagination: Config dict with keys: type (token/offset), token_field, param.

    Returns:
        A SourceConnector exposing a discover() method.
    """
    # Falsy collections are replaced with fresh empty containers. The metadata
    # mapping and pagination config are forwarded exactly as given.
    normalized_headers = headers if headers else {}
    normalized_inputs = input_fields if input_fields else []
    normalized_outputs = output_fields if output_fields else []

    return _RESTConnector(
        name=name,
        url=url,
        items_path=items_path,
        name_field=name_field,
        headers=normalized_headers,
        input_fields=normalized_inputs,
        output_fields=normalized_outputs,
        metadata_fields=metadata_fields,
        pagination=pagination,
    )

github_connector

github_connector(
    *,
    name: str,
    repos: list[str],
    project_path: str,
    config_file: str,
    parser: Callable[[str, str], DataNode | None],
    token: str | None = None,
) -> _GitHubConnector

Create a SourceConnector that discovers models from GitHub repos.

For each repo, lists subdirectories under project_path, reads config_file from each, and passes the content to parser to produce DataNodes.

Parameters:

Name Type Description Default
name str

Platform name for discovered DataNodes.

required
repos list[str]

List of GitHub repos (org/repo format).

required
project_path str

Directory containing project subdirectories.

required
config_file str

Filename to read in each project directory.

required
parser Callable[[str, str], DataNode | None]

Function (project_name, file_content) -> DataNode | None.

required
token str | None

GitHub personal access token (optional).

None

Returns:

Type Description
_GitHubConnector

A SourceConnector with a discover() method.

Source code in src/model_ledger/connectors/github.py
def github_connector(
    *,
    name: str,
    repos: list[str],
    project_path: str,
    config_file: str,
    parser: Callable[[str, str], DataNode | None],
    token: str | None = None,
) -> _GitHubConnector:
    """Build a SourceConnector that discovers models stored in GitHub repos.

    Each repo's subdirectories under ``project_path`` are listed. ``config_file``
    is read from each one, and its content is handed to ``parser``, which turns
    it into a DataNode.

    Args:
        name: Platform name attached to every discovered DataNode.
        repos: GitHub repositories, each given as "org/repo".
        project_path: Directory whose subdirectories are the projects.
        config_file: File name read from every project directory.
        parser: Callable (project_name, file_content) -> DataNode | None.
        token: Optional GitHub personal access token.

    Returns:
        A SourceConnector exposing a discover() method.
    """
    # All arguments are forwarded unchanged; no normalisation happens here.
    connector_options = {
        "name": name,
        "repos": repos,
        "project_path": project_path,
        "config_file": config_file,
        "parser": parser,
        "token": token,
    }
    return _GitHubConnector(**connector_options)

Introspection

introspect

Model introspection — plugin-based metadata extraction.

ComponentInfo

Bases: BaseModel

Maps to a ComponentNode in the version's tree.

IntrospectionResult

Bases: BaseModel

Typed contract returned by all introspectors.

Introspector

Bases: Protocol

Extracts metadata from a model object or config.

get_registry

get_registry() -> IntrospectorRegistry

Get or create the global introspector registry.

Source code in src/model_ledger/introspect/registry.py
def get_registry() -> IntrospectorRegistry:
    """Return the module-level IntrospectorRegistry, creating it on first use.

    NOTE(review): lazy creation is not guarded by a lock.
    """
    global _registry
    if _registry is not None:
        return _registry
    _registry = IntrospectorRegistry()
    return _registry

register_introspector

register_introspector(introspector: Introspector) -> None

Convenience function to register an introspector globally.

Source code in src/model_ledger/introspect/registry.py
def register_introspector(introspector: Introspector) -> None:
    """Add ``introspector`` to the shared registry returned by get_registry()."""
    registry = get_registry()
    registry.register(introspector)

register_introspector

register_introspector(introspector: Introspector) -> None

Register a custom introspector plugin.

Manually registered introspectors take priority over entry-point-discovered ones.

Parameters:

Name Type Description Default
introspector Introspector

An object implementing the Introspector protocol (must have name, can_handle, and introspect attributes).

required
Example

>>> from model_ledger import register_introspector
>>> register_introspector(MyCustomIntrospector())

Source code in src/model_ledger/__init__.py
def register_introspector(introspector: Introspector) -> None:
    """Register a custom introspector plugin with the global registry.

    Manually registered introspectors take priority over entry-point-discovered ones.

    Args:
        introspector: Any object satisfying the Introspector protocol, i.e.
            exposing name, can_handle, and introspect attributes.

    Example:
        >>> from model_ledger import register_introspector
        >>> register_introspector(MyCustomIntrospector())
    """
    # Imported inside the function, presumably to avoid import cycles or
    # eager loading at package import time (TODO confirm).
    from model_ledger.introspect.registry import get_registry

    shared_registry = get_registry()
    shared_registry.register(introspector)