
Connectors & discovery

A connector emits DataNodes from a source system. Add them to the ledger and call connect(): the ledger builds the cross-platform graph by matching one node's outputs to another node's inputs. Three factory connectors ship in core (SQL, REST, and GitHub); anything else is a small protocol implementation.
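To make port matching concrete, here is a minimal sketch of the idea, not model_ledger's actual implementation. The `Node` dataclass and `match_ports` function are hypothetical stand-ins, and the sketch assumes ports are matched by exact name.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for a DataNode: a name plus input/output port names."""
    name: str
    platform: str
    inputs: list[str] = field(default_factory=list)
    outputs: list[str] = field(default_factory=list)


def match_ports(nodes: list[Node]) -> list[tuple[str, str, str]]:
    """Return (producer, port, consumer) edges where an output name equals an input name."""
    # Index every output port by the nodes that produce it.
    producers: dict[str, list[str]] = {}
    for node in nodes:
        for port in node.outputs:
            producers.setdefault(port, []).append(node.name)

    # Link each input port to whichever nodes produce a port of the same name.
    edges = []
    for node in nodes:
        for port in node.inputs:
            for producer in producers.get(port, []):
                if producer != node.name:
                    edges.append((producer, port, node.name))
    return edges


nodes = [
    Node("nightly_features", platform="etl", inputs=["raw.events"], outputs=["features.daily"]),
    Node("churn_model", platform="mlflow", inputs=["features.daily"], outputs=["scores.churn"]),
    Node("churn_alerts", platform="alerting", inputs=["scores.churn"]),
]
edges = match_ports(nodes)
for producer, port, consumer in edges:
    print(f"{producer} --[{port}]--> {consumer}")
```

The three nodes come from three different platforms and share no IDs; the only thing that links them is that one node's output name appears as another node's input name.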

SQL databases

from model_ledger import Ledger, sql_connector

ledger = Ledger.from_sqlite("./inventory.db")

# Simple: read a registry table
models = sql_connector(
    name="model_registry",
    connection=my_db,
    query="SELECT name, owner, status FROM ml_models WHERE active = true",
    name_column="name",
)

# Advanced: auto-parse SQL to extract table dependencies
etl_jobs = sql_connector(
    name="etl_scheduler",
    connection=my_db,
    query="SELECT job_name, raw_sql, cron FROM scheduled_jobs",
    name_column="job_name",
    sql_column="raw_sql",   # FROM/JOIN → inputs, INSERT/CREATE → outputs
)

ledger.add(models.discover())
ledger.add(etl_jobs.discover())
ledger.connect()            # links ETL outputs to model inputs automatically
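The sql_column option above maps FROM/JOIN tables to inputs and INSERT/CREATE targets to outputs. The sketch below illustrates that mapping with plain regular expressions. It is an assumption about the general approach, not the library's parser: `table_dependencies` is a hypothetical helper, and a regex pass like this will miss CTEs, quoted identifiers, and other dialect-specific syntax.

```python
import re

# Tables named after FROM/JOIN are read; tables named after INSERT INTO / CREATE TABLE are written.
_READS = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", re.IGNORECASE)
_WRITES = re.compile(
    r"\b(?:INSERT\s+INTO|CREATE\s+(?:OR\s+REPLACE\s+)?TABLE(?:\s+IF\s+NOT\s+EXISTS)?)"
    r"\s+([A-Za-z_][\w.]*)",
    re.IGNORECASE,
)


def table_dependencies(sql: str) -> tuple[list[str], list[str]]:
    """Return (inputs, outputs) as sorted, lower-cased table names."""
    outputs = sorted({name.lower() for name in _WRITES.findall(sql)})
    # A table that the statement writes is not also reported as an input.
    inputs = sorted({name.lower() for name in _READS.findall(sql)} - set(outputs))
    return inputs, outputs


raw_sql = """
INSERT INTO features.daily
SELECT e.user_id, COUNT(*) AS events
FROM raw.events e
JOIN raw.users u ON u.id = e.user_id
GROUP BY e.user_id
"""
inputs, outputs = table_dependencies(raw_sql)
print(inputs)
print(outputs)
```

With dependencies extracted this way, a job that writes features.daily and a model that reads features.daily end up with a matching port, which is what lets connect() link them.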

REST APIs

Works with MLflow, SageMaker, Vertex AI, or any JSON API:

from model_ledger import rest_connector

ml_models = rest_connector(
    name="mlflow",
    url="https://mlflow.internal/api/2.0/mlflow/registered-models/search",
    headers={"Authorization": "Bearer ..."},
    items_path="registered_models",
    name_field="name",
)
ledger.add(ml_models.discover())

GitHub repos (pipelines-as-code)

Discover Airflow DAGs, dbt projects, or scoring pipelines from config files:

from model_ledger import github_connector

pipelines = github_connector(
    name="ml_pipelines",
    repos=["myorg/ml-scoring"],
    token="ghp_...",
    project_path="projects",
    config_file="deploy.yaml",
    parser=my_yaml_parser,   # (project_name, file_content) -> DataNode
)
ledger.add(pipelines.discover())

Custom connectors

Implement the SourceConnector protocol — a name and a discover() returning DataNodes — for anything the factories don't cover:

import boto3

from model_ledger import DataNode

class SageMakerConnector:
    name = "sagemaker"

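    def discover(self) -> list[DataNode]:
        # list_endpoints returns one page per call; paginate to see every endpoint
        paginator = boto3.client("sagemaker").get_paginator("list_endpoints")
        endpoints = [ep for page in paginator.paginate() for ep in page["Endpoints"]]
        return [
            DataNode(ep["EndpointName"], platform="sagemaker",
                     outputs=[ep["EndpointName"]],
                     metadata={"status": ep["EndpointStatus"]})
            for ep in endpoints
        ]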

ledger.add(SageMakerConnector().discover())
ledger.connect()
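Because the contract is described as a protocol rather than a base class, any object with the right shape should qualify. The sketch below shows that structural typing with the standard library's typing.Protocol. `SourceConnectorLike` and `Node` are hypothetical stand-ins written for this example; the real SourceConnector definition in model_ledger may differ.

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@dataclass
class Node:
    """Hypothetical stand-in for model_ledger's DataNode."""
    name: str
    platform: str
    outputs: list[str] = field(default_factory=list)


@runtime_checkable
class SourceConnectorLike(Protocol):
    """Assumed shape of the protocol: a name attribute and a discover() method."""
    name: str

    def discover(self) -> list[Node]: ...


class StaticConnector:
    """Satisfies the protocol structurally; it inherits from nothing."""
    name = "static"

    def __init__(self, names: list[str]) -> None:
        self._names = names

    def discover(self) -> list[Node]:
        return [Node(n, platform=self.name, outputs=[n]) for n in self._names]


connector = StaticConnector(["fraud_scorer", "churn_scorer"])
# isinstance only checks that the members exist, not their types.
assert isinstance(connector, SourceConnectorLike)
discovered = connector.discover()
print([node.name for node in discovered])
```

A small in-memory connector like this is also handy in tests, since it exercises the same discover() path without touching a real source system.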

Every connector is a growth event

Each new connector extends the discovery surface: a node in your warehouse links to a model in MLflow, which in turn links to a queue in your alerting system, and none of them need a shared ID scheme. That is how one graph spans every platform.

Recurring discovery

Run connectors on a schedule (cron, Airflow, Prefect) and have them write to a shared backend. add() is idempotent: it content-hashes nodes and skips unchanged ones. A last_seen timestamp is updated on every run, so you can detect models that have gone silent. See the recipe: Discover from a SQL registry.
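The idempotency and last_seen behaviour can be illustrated with a small in-memory sketch. Everything here is an assumption made for illustration: `InMemoryLedger`, its record layout, the `silent_since` helper, and the use of SHA-256 over sorted JSON are hypothetical and are not the library's storage format.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone


class InMemoryLedger:
    """Hypothetical stand-in that mimics an idempotent, content-hashed add()."""

    def __init__(self) -> None:
        self.records: dict[str, dict] = {}

    @staticmethod
    def _digest(node: dict) -> str:
        # sort_keys makes the hash independent of dict key order
        payload = json.dumps(node, sort_keys=True).encode("utf-8")
        return hashlib.sha256(payload).hexdigest()

    def add(self, nodes: list[dict], now: datetime) -> int:
        """Store new or changed nodes and return how many were written."""
        written = 0
        for node in nodes:
            digest = self._digest(node)
            record = self.records.get(node["name"])
            if record is None or record["digest"] != digest:
                record = {"digest": digest, "node": node}
                self.records[node["name"]] = record
                written += 1
            record["last_seen"] = now  # refreshed on every run, changed or not
        return written

    def silent_since(self, cutoff: datetime) -> list[str]:
        """Names of nodes that no run has reported at or after the cutoff."""
        return sorted(n for n, r in self.records.items() if r["last_seen"] < cutoff)


ledger = InMemoryLedger()
day1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
day2 = day1 + timedelta(days=1)

first = ledger.add([{"name": "churn", "owner": "ds"}, {"name": "fraud", "owner": "risk"}], now=day1)
second = ledger.add([{"name": "churn", "owner": "ds"}], now=day2)  # fraud did not show up
stale = ledger.silent_since(day2)
print(first, second, stale)
```

The second run writes nothing because the churn node is unchanged, yet its last_seen still advances. The fraud node keeps its older timestamp, which is the signal a scheduled job can use to flag it.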