DataNode & the graph
The core insight: a model, a rule, an ETL job, and an alert queue all have the same
shape. Each consumes some data and produces other data. So they are all one type,
DataNode, and the dependency graph falls out of matching what one node produces to
what another consumes.
A node is what it reads and writes
```python
from model_ledger import DataNode

DataNode(
    name="fraud_scorer",
    platform="ml",
    inputs=["customer_features"],   # what it consumes
    outputs=["risk_scores"],        # what it produces
    metadata={"framework": "xgboost", "owner": "risk-team"},
)
```
inputs and outputs are ports — the names of the data flowing in and out. A
plain string becomes a DataPort automatically.
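The string-to-port coercion can be pictured with a minimal plain-Python sketch. Port and Node below are hypothetical stand-ins written only for this illustration, not model_ledger's DataPort and DataNode. The one behavior carried over from the text is that a bare string gets wrapped into a port object.

```python
from dataclasses import dataclass, field
from typing import Optional

# Hypothetical stand-ins for illustration only; not model_ledger's real classes.
@dataclass(frozen=True)
class Port:
    name: str
    model_name: Optional[str] = None  # optional schema field

@dataclass
class Node:
    name: str
    platform: str = ""
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)

    def __post_init__(self):
        # Wrap any bare string in a Port; leave existing Port objects untouched.
        self.inputs = [p if isinstance(p, Port) else Port(p) for p in self.inputs]
        self.outputs = [p if isinstance(p, Port) else Port(p) for p in self.outputs]

node = Node("fraud_scorer", platform="ml", inputs=["customer_features"], outputs=["risk_scores"])
print(node.inputs)  # [Port(name='customer_features', model_name=None)]
```

Normalizing at construction time means every later step, such as edge matching, only ever deals with one port type.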
The graph builds itself
You never draw edges. You call connect(), and wherever one node's output port name
matches another node's input port name, that pair becomes a dependency:
```python
from model_ledger import Ledger, DataNode

ledger = Ledger()
ledger.add([
    DataNode("segmentation", platform="etl", outputs=["customer_segments"]),
    DataNode("fraud_scorer", platform="ml", inputs=["customer_segments"], outputs=["risk_scores"]),
    DataNode("fraud_alerts", platform="alerting", inputs=["risk_scores"]),
])
ledger.connect()

ledger.trace("fraud_alerts")        # ['segmentation', 'fraud_scorer', 'fraud_alerts']
ledger.upstream("fraud_alerts")     # everything that feeds it
ledger.downstream("segmentation")   # everything that depends on it
```
```mermaid
graph LR
  A["segmentation"] -->|customer_segments| B["fraud_scorer"] -->|risk_scores| C["fraud_alerts"]
  classDef n fill:#efe8da,stroke:#7a1a1a,color:#1c1a17;
  class A,B,C n;
```
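The matching rule behind connect() can be modeled in a few lines of standard-library Python. This is a hypothetical sketch, not model_ledger's implementation. Nodes are plain (inputs, outputs) tuples of string port names, and connect, upstream, downstream and trace are illustrative functions that only mimic the calls above. The sketch also assumes trace returns producers before consumers, as the example output suggests.

```python
from collections import defaultdict
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical toy inventory: node name -> (input port names, output port names).
nodes = {
    "segmentation": ([], ["customer_segments"]),
    "fraud_scorer": (["customer_segments"], ["risk_scores"]),
    "fraud_alerts": (["risk_scores"], []),
}

def connect(nodes):
    """Return {producer: {consumers}} for every output name that matches an input name."""
    producers = defaultdict(set)  # port name -> nodes that write it
    for name, (_, outputs) in nodes.items():
        for port in outputs:
            producers[port].add(name)
    edges = defaultdict(set)
    for name, (inputs, _) in nodes.items():
        for port in inputs:
            for producer in producers.get(port, ()):
                edges[producer].add(name)
    return edges

def downstream(edges, start):
    """Every node reachable from start by following edges forward."""
    seen, stack = set(), [start]
    while stack:
        for nxt in edges.get(stack.pop(), ()):
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen

def upstream(edges, start):
    """Every node that feeds start, found by walking the reversed edges."""
    reverse = defaultdict(set)
    for src, dsts in edges.items():
        for dst in dsts:
            reverse[dst].add(src)
    return downstream(reverse, start)

def trace(edges, target):
    """target plus its ancestors, ordered so producers come before consumers."""
    keep = upstream(edges, target) | {target}
    # TopologicalSorter expects {node: set of predecessors}.
    preds = {n: {src for src, dsts in edges.items() if n in dsts and src in keep} for n in keep}
    return list(TopologicalSorter(preds).static_order())

edges = connect(nodes)
print(trace(edges, "fraud_alerts"))               # ['segmentation', 'fraud_scorer', 'fraud_alerts']
print(sorted(upstream(edges, "fraud_alerts")))    # ['fraud_scorer', 'segmentation']
print(sorted(downstream(edges, "segmentation")))  # ['fraud_alerts', 'fraud_scorer']
```

Edges are derived from port names alone. Adding a node therefore never requires editing existing ones: re-running the match picks up the new links.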
This is why discovery scales: a connector only has to emit DataNodes with their ports,
and the cross-platform graph assembles itself. An ETL job in your warehouse links to
a model in MLflow, which links to a queue in your alerting system, without any shared ID scheme.
DataPort precision
When two models legitimately write a table with the same name, a bare port name would
collide. DataPort carries optional schema fields to disambiguate, and an edge forms only
when the schema matches as well:
```python
from model_ledger import DataNode, DataPort, Ledger

ledger = Ledger()
ledger.add([
    DataNode("check_rules", outputs=[DataPort("alerts", model_name="checks")]),
    DataNode("card_rules", outputs=[DataPort("alerts", model_name="cards")]),
    DataNode("check_queue", inputs=[DataPort("alerts", model_name="checks")]),
])
ledger.connect()
# check_queue connects to check_rules only: model_name must match.
```
Port matching is case-insensitive, and schema values support % wildcards.
From node to governed model
A DataNode gives you structure. To give a node an identity and a history (owner,
risk tier, purpose, and an audit trail), you register() it as a ModelRef and
record() events against it. Discovery and registration are two views of the same
inventory: the graph (what connects to what) and the ledger (what each thing is and
how it changed).
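The split between the graph and the ledger can be illustrated with a toy append-only record. ToyModelRef, register and record below are hypothetical names written for this sketch, not model_ledger's ModelRef, register() or record(). The owner, risk tier and purpose values are made up for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

# Hypothetical toy model of "identity + history"; not model_ledger's ModelRef API.
@dataclass
class ToyModelRef:
    name: str
    owner: str
    risk_tier: str
    purpose: str
    events: list = field(default_factory=list)  # append-only audit trail

def register(node_name: str, **identity) -> ToyModelRef:
    """Attach identity fields to a node name and log the registration itself."""
    ref = ToyModelRef(node_name, **identity)
    record(ref, "registered")
    return ref

def record(ref: ToyModelRef, event: str, **details) -> None:
    """Append a timestamped event; earlier entries are never edited."""
    ref.events.append({"at": datetime.now(timezone.utc), "event": event, **details})

ref = register("fraud_scorer", owner="risk-team", risk_tier="high", purpose="fraud scoring")
record(ref, "validated", reviewer="model-risk")
print([e["event"] for e in ref.events])  # ['registered', 'validated']
```

The node name is the only thing this toy record shares with the graph. Identity and history live beside the structure rather than inside it.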