Seeknal 2.0 DAG Tutorial
This tutorial demonstrates the DAG (Directed Acyclic Graph) infrastructure in Seeknal 2.0. You'll learn how to:
- Define reusable components in common config YAML files
- Use dependency declaration functions in Python
- Generate and inspect the manifest
- Detect changes for incremental builds
Prerequisites
- Python 3.11+
- Seeknal installed (pip install seeknal)
Project Structure
Create a new project with this structure:
my_project/
├── seeknal.yml                  # Project configuration
├── profiles.yml                 # Connection profiles (optional)
├── seeknal/
│   ├── common/
│   │   └── sources.yml          # Reusable sources, transforms, rules
│   ├── sources/
│   │   └── *.yml                # Source definitions
│   ├── transforms/
│   │   └── *.yml                # Transform definitions
│   └── pipelines/
│       ├── user_features.py     # Python pipeline: user features
│       └── transaction_features.py
└── target/
    └── manifest.json            # Generated by seeknal
Step 1: Define Reusable Components
Common config files define reusable data sources, transformations, business rules, and aggregations that can be referenced across your project. Place them in seeknal/common/.
# seeknal/common/sources.yml

# Data Sources - Define where your data comes from
sources:
  - id: traffic_day
    description: "Daily traffic events data"
    source: hive
    table: analytics.traffic_day
    params:
      partition_column: event_date
  - id: user_profiles
    description: "User profile information"
    source: hive
    table: core.user_profiles
  - id: transactions
    description: "Transaction history"
    source: hive
    table: finance.transactions
    params:
      partition_column: transaction_date

# Reusable Transformations
transformations:
  - id: clean_nulls
    description: "Replace null values with defaults"
    className: com.seeknal.transforms.CleanNulls
    params:
      strategy: default
  - id: normalize_timestamps
    description: "Normalize all timestamps to UTC"
    className: com.seeknal.transforms.NormalizeTimestamps
    params:
      target_timezone: UTC

# Business Rules - Reusable filtering logic
rules:
  - id: active_users
    description: "Filter for active users only"
    rule:
      condition: "status = 'active' AND last_login > DATE_SUB(CURRENT_DATE, 30)"
  - id: valid_transactions
    description: "Filter for valid, non-refunded transactions"
    rule:
      condition: "status = 'completed' AND refunded = false"
  - id: high_value_customers
    description: "Customers with total spend > $1000"
    rule:
      condition: "total_lifetime_spend > 1000"

# Reusable Aggregations
aggregations:
  - id: daily_sum
    description: "Sum values by day"
    group_by: [entity_id, event_date]
    agg_functions:
      - column: amount
        function: sum
        alias: daily_total
  - id: rolling_7d_avg
    description: "7-day rolling average"
    window:
      partition_by: entity_id
      order_by: event_date
      frame: "ROWS BETWEEN 6 PRECEDING AND CURRENT ROW"
    agg_functions:
      - column: daily_total
        function: avg
        alias: rolling_7d_avg
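To make the link between these config entries and the prefixed node names used later in this tutorial (for example source.traffic_day or rule.active_users) more concrete, here is a minimal sketch of how a parsed common config could be indexed by node ID. It is not Seeknal's parser: the SECTION_PREFIXES mapping and the index_common_config helper are hypothetical names, and the config dict is a trimmed stand-in for the parsed YAML.

```python
# Hypothetical sketch, not Seeknal's actual parser.
# Maps each top-level section of a common config to a node-type prefix.
SECTION_PREFIXES = {
    "sources": "source",
    "transformations": "transform",
    "rules": "rule",
    "aggregations": "aggregation",
}


def index_common_config(config: dict) -> dict:
    """Return {node_id: entry} for every entry in a parsed common config."""
    nodes = {}
    for section, prefix in SECTION_PREFIXES.items():
        for entry in config.get(section, []):
            node_id = f"{prefix}.{entry['id']}"
            if node_id in nodes:
                raise ValueError(f"Duplicate id: {node_id}")
            nodes[node_id] = entry
    return nodes


# A trimmed stand-in for the parsed contents of sources.yml
config = {
    "sources": [
        {"id": "traffic_day", "source": "hive", "table": "analytics.traffic_day"},
        {"id": "transactions", "source": "hive", "table": "finance.transactions"},
    ],
    "rules": [
        {"id": "active_users", "rule": {"condition": "status = 'active'"}},
    ],
}

nodes = index_common_config(config)
print(sorted(nodes))
```

Rejecting duplicate IDs at indexing time is a design choice of this sketch; the tutorial does not say how Seeknal treats them.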
Step 2: Create Feature Groups with Dependencies
Create Python files that declare dependencies using Seeknal's DAG functions.
seeknal/pipelines/user_features.py

"""User engagement feature group."""

from seeknal.dag.functions import source, ref, use_transform, use_rule
from seeknal.featurestore.duckdbengine.feature_group import (
    FeatureGroupDuckDB,
    Materialization,
)
from seeknal.entity import Entity


def create_user_engagement_features():
    """
    Create user engagement features.

    This feature group depends on:
    - source.traffic_day (from common config)
    - source.user_profiles (from common config)
    - transform.clean_nulls (from common config)
    - rule.active_users (from common config)
    """
    # Declare dependency on traffic_day source from common config
    traffic = source("traffic_day")

    # Declare dependency on user_profiles source
    profiles = source("user_profiles")

    # Create the feature group
    fg = FeatureGroupDuckDB(
        name="user_engagement",
        entity=Entity(name="user", join_keys=["user_id"]),
        materialization=Materialization(
            event_time_col="event_timestamp",
            ttl_days=365,
        ),
        description="User engagement metrics derived from traffic data",
        owner="data-team",
        tags=["engagement", "user-behavior"],
    )

    # Apply reusable transformation from common config
    df = traffic.load()
    df = use_transform("clean_nulls", df)

    # Apply business rule to filter active users
    active_rule = use_rule("active_users")
    df = df.filter(active_rule)

    # Calculate features
    features_df = df.groupby("user_id").agg({
        "page_views": "sum",
        "session_duration": "mean",
        "events": "count",
    })

    fg.set_dataframe(features_df).set_features()
    return fg


if __name__ == "__main__":
    fg = create_user_engagement_features()
    print(f"Created feature group: {fg.name}")
seeknal/pipelines/transaction_features.py

"""Transaction feature group with dependencies."""

from seeknal.dag.functions import source, ref, use_transform, use_rule, use_aggregation
from seeknal.featurestore.duckdbengine.feature_group import (
    FeatureGroupDuckDB,
    Materialization,
)
from seeknal.entity import Entity


def create_transaction_features():
    """
    Create transaction-based features.

    Dependencies:
    - source.transactions
    - rule.valid_transactions
    - aggregation.daily_sum
    - ref.user_engagement (another feature group)
    """
    # Declare source dependency
    txn_data = source("transactions")

    # Reference another feature group (creates DAG edge)
    user_features = ref("user_engagement")

    fg = FeatureGroupDuckDB(
        name="transaction_features",
        entity=Entity(name="user", join_keys=["user_id"]),
        materialization=Materialization(
            event_time_col="transaction_timestamp",
            ttl_days=180,
        ),
        description="Transaction-based features for ML models",
        owner="ml-team",
        tags=["transactions", "financial"],
    )

    # Load transaction data
    df = txn_data.load()

    # Apply valid transactions rule
    valid_txn_rule = use_rule("valid_transactions")
    df = df.filter(valid_txn_rule)

    # Apply daily aggregation from common config
    df = use_aggregation("daily_sum", df)

    # Join with user engagement features
    df = df.join(user_features, on="user_id")

    fg.set_dataframe(df).set_features()
    return fg
Step 3: Generate the Manifest
Run seeknal parse to analyze your project and generate the manifest:
# Parse current directory
seeknal parse
# Parse specific path
seeknal parse --path /path/to/my_project
# Output in JSON format (default is summary)
seeknal parse --format json
# Skip diff detection (first run or full rebuild)
seeknal parse --no-diff
Example Output
Parsing project: my_project
Found 3 sources, 2 transforms, 3 rules, 2 aggregations
Building dependency graph...
Discovered 5 nodes:
  - source.traffic_day
  - source.user_profiles
  - source.transactions
  - feature_group.user_engagement
  - feature_group.transaction_features
Discovered 4 edges:
  - source.traffic_day -> feature_group.user_engagement
  - source.user_profiles -> feature_group.user_engagement
  - source.transactions -> feature_group.transaction_features
  - feature_group.user_engagement -> feature_group.transaction_features
Manifest saved to: target/manifest.json
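Because the graph is acyclic, the discovered edges imply a valid build order. As a rough illustration (not a description of Seeknal's scheduler), the four edges above can be ordered with the standard library's graphlib module, which also reports cycles. The edges list is copied from the example output; everything else is an assumption of this sketch.

```python
from graphlib import CycleError, TopologicalSorter

# Edges copied from the example output: (upstream, downstream)
edges = [
    ("source.traffic_day", "feature_group.user_engagement"),
    ("source.user_profiles", "feature_group.user_engagement"),
    ("source.transactions", "feature_group.transaction_features"),
    ("feature_group.user_engagement", "feature_group.transaction_features"),
]

# TopologicalSorter expects {node: set_of_predecessors}
predecessors = {}
for upstream, downstream in edges:
    predecessors.setdefault(downstream, set()).add(upstream)
    predecessors.setdefault(upstream, set())

try:
    build_order = list(TopologicalSorter(predecessors).static_order())
except CycleError as exc:
    # exc.args[1] holds the nodes that form the cycle
    raise SystemExit(f"Dependency cycle: {exc.args[1]}")

print(build_order)
```

Any order it prints places every source before the feature group that reads it, and user_engagement before transaction_features.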
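Step 4: Understand the Manifest Structure
The generated target/manifest.json contains the complete DAG. The example below is abridged: it omits the source.user_profiles node and its edge into feature_group.user_engagement.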
{
  "metadata": {
    "project": "my_project",
    "seeknal_version": "2.1.0",
    "generated_at": "2026-02-01T10:30:00"
  },
  "nodes": {
    "source.traffic_day": {
      "id": "source.traffic_day",
      "name": "traffic_day",
      "type": "source",
      "description": "Daily traffic events data",
      "config": {
        "source": "hive",
        "table": "analytics.traffic_day",
        "params": {"partition_column": "event_date"}
      }
    },
    "source.transactions": {
      "id": "source.transactions",
      "name": "transactions",
      "type": "source",
      "description": "Transaction history",
      "config": {
        "source": "hive",
        "table": "finance.transactions"
      }
    },
    "feature_group.user_engagement": {
      "id": "feature_group.user_engagement",
      "name": "user_engagement",
      "type": "feature_group",
      "description": "User engagement metrics derived from traffic data",
      "owner": "data-team",
      "tags": ["engagement", "user-behavior"],
      "config": {}
    },
    "feature_group.transaction_features": {
      "id": "feature_group.transaction_features",
      "name": "transaction_features",
      "type": "feature_group",
      "description": "Transaction-based features for ML models",
      "owner": "ml-team",
      "tags": ["transactions", "financial"],
      "config": {}
    }
  },
  "edges": [
    {"from": "source.traffic_day", "to": "feature_group.user_engagement", "type": "dependency"},
    {"from": "source.transactions", "to": "feature_group.transaction_features", "type": "dependency"},
    {"from": "feature_group.user_engagement", "to": "feature_group.transaction_features", "type": "dependency"}
  ]
}
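Since the manifest is plain JSON, it can be inspected with nothing but the standard library. The sketch below assumes only the "nodes" and "edges" keys shown above; the upstream_of helper is a hypothetical name, and the inline manifest_text is a shortened copy of the example so that the snippet runs on its own.

```python
import json

# Shortened copy of the manifest above, inlined so the snippet is runnable.
manifest_text = """
{
  "nodes": {
    "source.traffic_day": {"type": "source"},
    "source.transactions": {"type": "source"},
    "feature_group.user_engagement": {"type": "feature_group"},
    "feature_group.transaction_features": {"type": "feature_group"}
  },
  "edges": [
    {"from": "source.traffic_day", "to": "feature_group.user_engagement", "type": "dependency"},
    {"from": "source.transactions", "to": "feature_group.transaction_features", "type": "dependency"},
    {"from": "feature_group.user_engagement", "to": "feature_group.transaction_features", "type": "dependency"}
  ]
}
"""
manifest = json.loads(manifest_text)


def upstream_of(manifest: dict, node_id: str) -> list:
    """Return the IDs of nodes that node_id depends on directly."""
    return sorted(e["from"] for e in manifest["edges"] if e["to"] == node_id)


# Group node IDs by their declared type
by_type = {}
for node_id, node in manifest["nodes"].items():
    by_type.setdefault(node["type"], []).append(node_id)

print(by_type)
print(upstream_of(manifest, "feature_group.transaction_features"))
```

Against a real project, the same helper would work on the result of json.load() applied to an open handle on target/manifest.json.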
Step 5: Detect Changes for Incremental Builds
When you modify your project, Seeknal compares the new manifest with the previous one to determine what needs to be rebuilt.
Make a Change
Edit seeknal/common/sources.yml to add a new rule:
rules:
  # ... existing rules ...
  - id: premium_users
    description: "Users with premium subscription"
    rule:
      condition: "subscription_tier = 'premium'"
Re-parse to Detect Changes
seeknal parse
Example Diff Output
Parsing project: my_project
Changes detected:
1 node(s) added
0 node(s) removed
0 node(s) modified
Added nodes:
- rule.premium_users
Nodes requiring rebuild: 0
Manifest saved to: target/manifest.json
Modify an Existing Component
If you modify source.traffic_day, Seeknal detects the change and identifies downstream impacts:
sources:
  - id: traffic_day
    description: "Daily traffic events data (updated)"  # Changed!
    source: hive
    table: analytics.traffic_day_v2  # Changed table!

Changes detected:
0 node(s) added
0 node(s) removed
1 node(s) modified
Modified nodes:
- source.traffic_day (changed: description, config)
Nodes requiring rebuild: 2
- source.traffic_day (directly modified)
- feature_group.user_engagement (downstream dependent)
Manifest saved to: target/manifest.json
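The rebuild list comes from walking the graph downstream of each modified node. The sample output lists only the direct dependent of source.traffic_day, even though feature_group.transaction_features depends on feature_group.user_engagement in turn; this tutorial does not state whether Seeknal's traversal is transitive. The sketch below therefore supports both modes. The downstream_of function is a hypothetical helper, not part of Seeknal.

```python
from collections import deque

# Edges from the Step 3 example output: (upstream, downstream)
edges = [
    ("source.traffic_day", "feature_group.user_engagement"),
    ("source.user_profiles", "feature_group.user_engagement"),
    ("source.transactions", "feature_group.transaction_features"),
    ("feature_group.user_engagement", "feature_group.transaction_features"),
]


def downstream_of(edges, start, transitive=True):
    """Return nodes that depend on start, directly or (optionally) indirectly."""
    children = {}
    for upstream, downstream in edges:
        children.setdefault(upstream, []).append(downstream)

    seen = set()
    queue = deque(children.get(start, []))
    while queue:
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        if transitive:
            # Breadth-first walk into the dependents of this dependent
            queue.extend(children.get(node, []))
    return sorted(seen)


direct = downstream_of(edges, "source.traffic_day", transitive=False)
everything = downstream_of(edges, "source.traffic_day")
print(direct)
print(everything)
```

With transitive=False the result matches the sample output; with the default it also includes feature_group.transaction_features.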
You can also use seeknal diff to see changes between the current and previous manifest:
# Show all changes
seeknal diff
# Show only modified nodes
seeknal diff --type modified
# Show summary statistics
seeknal diff --stat
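The added, removed, and modified categories can be understood as set operations over the node IDs of two manifests, plus a field-by-field comparison for nodes present in both. The following is a minimal sketch under that assumption, not Seeknal's ManifestDiff implementation; diff_nodes is a hypothetical helper and the node dictionaries are trimmed versions of the examples in this step.

```python
def diff_nodes(old_nodes: dict, new_nodes: dict) -> dict:
    """Compare two {node_id: node} mappings and classify the differences."""
    added = sorted(new_nodes.keys() - old_nodes.keys())
    removed = sorted(old_nodes.keys() - new_nodes.keys())

    modified = {}
    for node_id in sorted(old_nodes.keys() & new_nodes.keys()):
        old, new = old_nodes[node_id], new_nodes[node_id]
        # Collect the top-level fields whose values differ
        changed = sorted(
            key for key in old.keys() | new.keys() if old.get(key) != new.get(key)
        )
        if changed:
            modified[node_id] = changed

    return {"added": added, "removed": removed, "modified": modified}


old_nodes = {
    "source.traffic_day": {
        "description": "Daily traffic events data",
        "config": {"source": "hive", "table": "analytics.traffic_day"},
    },
}
new_nodes = {
    "source.traffic_day": {
        "description": "Daily traffic events data (updated)",
        "config": {"source": "hive", "table": "analytics.traffic_day_v2"},
    },
    "rule.premium_users": {
        "description": "Users with premium subscription",
        "config": {"condition": "subscription_tier = 'premium'"},
    },
}

result = diff_nodes(old_nodes, new_nodes)
print(result)
```

Comparing only top-level fields is why a change to the table name is reported as a change to config rather than to table.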
Step 6: Programmatic API Usage
You can also use the DAG infrastructure programmatically:
from seeknal.dag.manifest import Manifest
from seeknal.dag.parser import ProjectParser
from seeknal.dag.diff import ManifestDiff

# Parse a project
parser = ProjectParser(
    project_name="my_project",
    project_path="/path/to/my_project",
)
new_manifest = parser.parse()

# Validate the manifest
errors = parser.validate()
if errors:
    for error in errors:
        print(f"Error: {error}")

# Load previous manifest for comparison
try:
    old_manifest = Manifest.load("target/manifest.json")
except FileNotFoundError:
    old_manifest = None

# Detect changes
if old_manifest:
    diff = ManifestDiff.compare(old_manifest, new_manifest)
    if diff.has_changes():
        print(f"Changes: {diff.summary()}")
        # Get nodes that need rebuilding
        to_rebuild = diff.get_nodes_to_rebuild(new_manifest)
        print(f"Nodes to rebuild: {to_rebuild}")
    else:
        print("No changes detected")

# Save the new manifest
new_manifest.save("target/manifest.json")
Node Types Reference
| Type | Prefix | Description |
|---|---|---|
| SOURCE | source. | External data sources (CSV, Parquet, PostgreSQL, Hive, etc.) |
| TRANSFORM | transform. | Reusable transformations |
| RULE | rule. | Business rules for filtering |
| AGGREGATION | aggregation. | Reusable aggregation definitions |
| FEATURE_GROUP | feature_group. | Feature group nodes |
| MODEL | model. | ML model nodes |
| EXPOSURE | exposure. | Data exposure points |
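Every node ID in the manifest is a type prefix from this table followed by the node's name. As an illustration of that convention, the sketch below splits an ID into its two parts. The NodeType enum and split_node_id helper are hypothetical and only mirror the table; they are not Seeknal's own classes.

```python
from enum import Enum


class NodeType(Enum):
    """Hypothetical enum mirroring the prefixes in the table above."""
    SOURCE = "source"
    TRANSFORM = "transform"
    RULE = "rule"
    AGGREGATION = "aggregation"
    FEATURE_GROUP = "feature_group"
    MODEL = "model"
    EXPOSURE = "exposure"


def split_node_id(node_id: str) -> tuple:
    """Split 'prefix.name' into (NodeType, name)."""
    prefix, sep, name = node_id.partition(".")
    if not sep or not name:
        raise ValueError(f"Expected '<type>.<name>', got {node_id!r}")
    # NodeType(prefix) raises ValueError for an unknown prefix
    return NodeType(prefix), name


node_type, name = split_node_id("feature_group.user_engagement")
print(node_type.name, name)
```

Splitting on the first dot only means the name part may itself contain dots without being misread as a prefix.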
Dependency Functions Reference
| Function | Purpose | Example |
|---|---|---|
| source(id) | Reference a data source from common config | source("traffic_day") |
| source(type, table) | Inline source declaration | source("hive", "db.table") |
| ref(name) | Reference another node | ref("user_engagement") |
| use_transform(id, df) | Apply a transformation | use_transform("clean_nulls", df) |
| use_rule(id) | Get a business rule | use_rule("active_users") |
| use_aggregation(id, df) | Apply an aggregation | use_aggregation("daily_sum", df) |
All functions are available from seeknal.dag.functions or directly from seeknal.dag.
Best Practices
- Organize by Domain: Group related sources, transforms, and rules together in seeknal/common/
- Use Descriptive IDs: Choose clear, meaningful identifiers that explain the purpose
- Document Dependencies: Add descriptions to help others understand the DAG
- Incremental Builds: Use diff detection to only rebuild what changed
- Validate Regularly: Run seeknal parse in CI/CD to catch issues early
- Use Virtual Environments: Test changes safely with seeknal plan dev before production
Next Steps
- Explore the FeatureStore Example for materialization
- Learn about Flows & Pipelines for YAML and Python decorator workflows
- See Virtual Environments for safe testing
- Review the CLI Reference for all available commands