Seeknal 2.0 DAG Tutorial

This tutorial demonstrates the DAG (Directed Acyclic Graph) infrastructure in Seeknal 2.0. You'll learn how to:

  1. Define reusable components in common config YAML files
  2. Use dependency declaration functions in Python
  3. Generate and inspect the manifest
  4. Detect changes for incremental builds

Prerequisites

  • Python 3.11+
  • Seeknal installed (pip install seeknal)

Project Structure

Create a new project with this structure:

my_project/
├── seeknal.yml                 # Project configuration
├── profiles.yml                # Connection profiles (optional)
├── seeknal/
│   ├── common/
│   │   └── sources.yml         # Reusable sources, transforms, rules
│   ├── sources/
│   │   └── *.yml               # Source definitions
│   ├── transforms/
│   │   └── *.yml               # Transform definitions
│   └── pipelines/
│       ├── user_features.py    # Python pipeline: user features
│       └── transaction_features.py
└── target/
    └── manifest.json           # Generated by seeknal

Step 1: Define Reusable Components

Common config files define reusable data sources, transformations, business rules, and aggregations that can be referenced across your project. Place them in seeknal/common/.

# seeknal/common/sources.yml

# Data Sources - Define where your data comes from
sources:
  - id: traffic_day
    description: "Daily traffic events data"
    source: hive
    table: analytics.traffic_day
    params:
      partition_column: event_date

  - id: user_profiles
    description: "User profile information"
    source: hive
    table: core.user_profiles

  - id: transactions
    description: "Transaction history"
    source: hive
    table: finance.transactions
    params:
      partition_column: transaction_date

# Reusable Transformations
transformations:
  - id: clean_nulls
    description: "Replace null values with defaults"
    className: com.seeknal.transforms.CleanNulls
    params:
      strategy: default

  - id: normalize_timestamps
    description: "Normalize all timestamps to UTC"
    className: com.seeknal.transforms.NormalizeTimestamps
    params:
      target_timezone: UTC

# Business Rules - Reusable filtering logic
rules:
  - id: active_users
    description: "Filter for active users only"
    rule:
      condition: "status = 'active' AND last_login > DATE_SUB(CURRENT_DATE, 30)"

  - id: valid_transactions
    description: "Filter for valid, non-refunded transactions"
    rule:
      condition: "status = 'completed' AND refunded = false"

  - id: high_value_customers
    description: "Customers with total spend > $1000"
    rule:
      condition: "total_lifetime_spend > 1000"

# Reusable Aggregations
aggregations:
  - id: daily_sum
    description: "Sum values by day"
    group_by: [entity_id, event_date]
    agg_functions:
      - column: amount
        function: sum
        alias: daily_total

  - id: rolling_7d_avg
    description: "7-day rolling average"
    window:
      partition_by: entity_id
      order_by: event_date
      frame: "ROWS BETWEEN 6 PRECEDING AND CURRENT ROW"
    agg_functions:
      - column: daily_total
        function: avg
        alias: rolling_7d_avg
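
The manifest shown later refers to these entries by prefixed IDs such as source.traffic_day and rule.active_users. The sketch below illustrates how a section/id pair could map to such an ID. It is a minimal illustration, not Seeknal's loader: the dict stands in for the parsed YAML, and SECTION_PREFIX and index_common_config are hypothetical names.

```python
# Hypothetical sketch: map common-config entries to prefixed node IDs.
# The dict stands in for the parsed contents of seeknal/common/sources.yml.
common_config = {
    "sources": [{"id": "traffic_day"}, {"id": "user_profiles"}, {"id": "transactions"}],
    "transformations": [{"id": "clean_nulls"}, {"id": "normalize_timestamps"}],
    "rules": [
        {"id": "active_users"},
        {"id": "valid_transactions"},
        {"id": "high_value_customers"},
    ],
    "aggregations": [{"id": "daily_sum"}, {"id": "rolling_7d_avg"}],
}

# Section name -> node ID prefix (prefixes as used in the tutorial's node IDs).
SECTION_PREFIX = {
    "sources": "source",
    "transformations": "transform",
    "rules": "rule",
    "aggregations": "aggregation",
}


def index_common_config(config: dict) -> dict:
    """Return {prefixed_id: entry}, rejecting duplicate IDs within a section."""
    index = {}
    for section, prefix in SECTION_PREFIX.items():
        for entry in config.get(section, []):
            node_id = f"{prefix}.{entry['id']}"
            if node_id in index:
                raise ValueError(f"duplicate id: {node_id}")
            index[node_id] = entry
    return index


index = index_common_config(common_config)
print(sorted(index))
```

Rejecting duplicates up front is a design choice of this sketch: two entries with the same ID in one section would otherwise silently shadow each other.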

Step 2: Create Feature Groups with Dependencies

Create Python files that declare dependencies using Seeknal's DAG functions.

seeknal/pipelines/user_features.py

"""User engagement feature group."""
from seeknal.dag.functions import source, use_transform, use_rule
from seeknal.featurestore.duckdbengine.feature_group import (
    FeatureGroupDuckDB,
    Materialization,
)
from seeknal.entity import Entity


def create_user_engagement_features():
    """
    Create user engagement features.

    This feature group depends on:
    - source.traffic_day (from common config)
    - source.user_profiles (from common config)
    - transform.clean_nulls (from common config)
    - rule.active_users (from common config)
    """
    # Declare dependency on traffic_day source from common config
    traffic = source("traffic_day")

    # Declare dependency on user_profiles source
    profiles = source("user_profiles")

    # Create the feature group
    fg = FeatureGroupDuckDB(
        name="user_engagement",
        entity=Entity(name="user", join_keys=["user_id"]),
        materialization=Materialization(
            event_time_col="event_timestamp",
            ttl_days=365,
        ),
        description="User engagement metrics derived from traffic data",
        owner="data-team",
        tags=["engagement", "user-behavior"],
    )

    # Apply reusable transformation from common config
    df = traffic.load()
    df = use_transform("clean_nulls", df)

    # Apply business rule to filter active users
    active_rule = use_rule("active_users")
    df = df.filter(active_rule)

    # Calculate features
    features_df = df.groupby("user_id").agg({
        "page_views": "sum",
        "session_duration": "mean",
        "events": "count",
    })

    fg.set_dataframe(features_df).set_features()
    return fg


if __name__ == "__main__":
    fg = create_user_engagement_features()
    print(f"Created feature group: {fg.name}")

seeknal/pipelines/transaction_features.py

"""Transaction feature group with dependencies."""
from seeknal.dag.functions import source, ref, use_rule, use_aggregation
from seeknal.featurestore.duckdbengine.feature_group import (
    FeatureGroupDuckDB,
    Materialization,
)
from seeknal.entity import Entity


def create_transaction_features():
    """
    Create transaction-based features.

    Dependencies:
    - source.transactions
    - rule.valid_transactions
    - aggregation.daily_sum
    - ref.user_engagement (another feature group)
    """
    # Declare source dependency
    txn_data = source("transactions")

    # Reference another feature group (creates DAG edge)
    user_features = ref("user_engagement")

    fg = FeatureGroupDuckDB(
        name="transaction_features",
        entity=Entity(name="user", join_keys=["user_id"]),
        materialization=Materialization(
            event_time_col="transaction_timestamp",
            ttl_days=180,
        ),
        description="Transaction-based features for ML models",
        owner="ml-team",
        tags=["transactions", "financial"],
    )

    # Load transaction data
    df = txn_data.load()

    # Apply valid transactions rule
    valid_txn_rule = use_rule("valid_transactions")
    df = df.filter(valid_txn_rule)

    # Apply daily aggregation from common config
    df = use_aggregation("daily_sum", df)

    # Join with user engagement features
    df = df.join(user_features, on="user_id")

    fg.set_dataframe(df).set_features()
    return fg

Step 3: Generate the Manifest

Run seeknal parse to analyze your project and generate the manifest:

# Parse current directory
seeknal parse

# Parse specific path
seeknal parse --path /path/to/my_project

# Output in JSON format (default is summary)
seeknal parse --format json

# Skip diff detection (first run or full rebuild)
seeknal parse --no-diff

Example Output

Parsing project: my_project
Found 3 sources, 2 transforms, 3 rules, 2 aggregations

Building dependency graph...
Discovered 5 nodes:
  - source.traffic_day
  - source.user_profiles
  - source.transactions
  - feature_group.user_engagement
  - feature_group.transaction_features

Discovered 4 edges:
  - source.traffic_day -> feature_group.user_engagement
  - source.user_profiles -> feature_group.user_engagement
  - source.transactions -> feature_group.transaction_features
  - feature_group.user_engagement -> feature_group.transaction_features

Manifest saved to: target/manifest.json
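
The edge list above is enough to derive a valid build order. As a minimal sketch, the standard-library graphlib module can sort the same edges topologically; this only illustrates the ordering constraint and does not claim to reproduce how Seeknal schedules work.

```python
from graphlib import TopologicalSorter

# Edges copied from the example output above: (upstream, downstream).
edges = [
    ("source.traffic_day", "feature_group.user_engagement"),
    ("source.user_profiles", "feature_group.user_engagement"),
    ("source.transactions", "feature_group.transaction_features"),
    ("feature_group.user_engagement", "feature_group.transaction_features"),
]

# TopologicalSorter expects a mapping of node -> set of predecessors.
predecessors: dict[str, set[str]] = {}
for upstream, downstream in edges:
    predecessors.setdefault(upstream, set())
    predecessors.setdefault(downstream, set()).add(upstream)

# Every node appears after all of the nodes it depends on.
build_order = list(TopologicalSorter(predecessors).static_order())
print(build_order)
```

The exact order among independent sources can vary, but feature_group.user_engagement always precedes feature_group.transaction_features.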

Step 4: Understand the Manifest Structure

The generated target/manifest.json contains the complete DAG:

{
  "metadata": {
    "project": "my_project",
    "seeknal_version": "2.1.0",
    "generated_at": "2026-02-01T10:30:00"
  },
  "nodes": {
    "source.traffic_day": {
      "id": "source.traffic_day",
      "name": "traffic_day",
      "type": "source",
      "description": "Daily traffic events data",
      "config": {
        "source": "hive",
        "table": "analytics.traffic_day",
        "params": {"partition_column": "event_date"}
      }
    },
    "source.user_profiles": {
      "id": "source.user_profiles",
      "name": "user_profiles",
      "type": "source",
      "description": "User profile information",
      "config": {
        "source": "hive",
        "table": "core.user_profiles"
      }
    },
    "source.transactions": {
      "id": "source.transactions",
      "name": "transactions",
      "type": "source",
      "description": "Transaction history",
      "config": {
        "source": "hive",
        "table": "finance.transactions",
        "params": {"partition_column": "transaction_date"}
      }
    },
    "feature_group.user_engagement": {
      "id": "feature_group.user_engagement",
      "name": "user_engagement",
      "type": "feature_group",
      "description": "User engagement metrics derived from traffic data",
      "owner": "data-team",
      "tags": ["engagement", "user-behavior"],
      "config": {}
    },
    "feature_group.transaction_features": {
      "id": "feature_group.transaction_features",
      "name": "transaction_features",
      "type": "feature_group",
      "description": "Transaction-based features for ML models",
      "owner": "ml-team",
      "tags": ["transactions", "financial"],
      "config": {}
    }
  },
  "edges": [
    {"from": "source.traffic_day", "to": "feature_group.user_engagement", "type": "dependency"},
    {"from": "source.user_profiles", "to": "feature_group.user_engagement", "type": "dependency"},
    {"from": "source.transactions", "to": "feature_group.transaction_features", "type": "dependency"},
    {"from": "feature_group.user_engagement", "to": "feature_group.transaction_features", "type": "dependency"}
  ]
  ]
}
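
Because the manifest is plain JSON, it can be inspected with the standard library alone. The sketch below works on a trimmed manifest with the same "nodes" and "edges" layout as the example; upstream_of and dangling_edges are hypothetical helpers written for this tutorial, not Seeknal functions.

```python
import json

# A trimmed manifest using the same "nodes"/"edges" layout as the example.
manifest_text = """
{
  "nodes": {
    "source.traffic_day": {"type": "source"},
    "source.transactions": {"type": "source"},
    "feature_group.user_engagement": {"type": "feature_group"},
    "feature_group.transaction_features": {"type": "feature_group"}
  },
  "edges": [
    {"from": "source.traffic_day", "to": "feature_group.user_engagement", "type": "dependency"},
    {"from": "source.transactions", "to": "feature_group.transaction_features", "type": "dependency"},
    {"from": "feature_group.user_engagement", "to": "feature_group.transaction_features", "type": "dependency"}
  ]
}
"""

manifest = json.loads(manifest_text)


def upstream_of(manifest: dict, node_id: str) -> list[str]:
    """Return the direct upstream dependencies of node_id, in edge order."""
    return [e["from"] for e in manifest["edges"] if e["to"] == node_id]


def dangling_edges(manifest: dict) -> list[dict]:
    """Return edges whose endpoints are missing from the node table."""
    nodes = manifest["nodes"]
    return [
        e for e in manifest["edges"]
        if e["from"] not in nodes or e["to"] not in nodes
    ]


print(upstream_of(manifest, "feature_group.transaction_features"))
print(dangling_edges(manifest))
```

To run the same helpers against a real project, replace manifest_text with the contents of target/manifest.json.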

Step 5: Detect Changes for Incremental Builds

When you modify your project, Seeknal compares the new manifest with the previous one to determine what needs to be rebuilt.

Make a Change

Edit seeknal/common/sources.yml to add a new rule:

rules:
  # ... existing rules ...

  - id: premium_users
    description: "Users with premium subscription"
    rule:
      condition: "subscription_tier = 'premium'"

Re-parse to Detect Changes

seeknal parse

Example Diff Output

Parsing project: my_project

Changes detected:
  1 node(s) added
  0 node(s) removed
  0 node(s) modified

Added nodes:
  - rule.premium_users

Nodes requiring rebuild: 0

Manifest saved to: target/manifest.json
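
The added/removed/modified counts can be understood as set operations over node IDs plus an equality check on node contents. The following is a minimal sketch of that classification under the assumption that two nodes count as "modified" when their dicts differ; diff_nodes is a hypothetical helper, and the node contents are abbreviated for illustration.

```python
def diff_nodes(old_nodes: dict, new_nodes: dict) -> dict:
    """Classify node IDs as added, removed, or modified between two manifests."""
    old_ids, new_ids = set(old_nodes), set(new_nodes)
    return {
        "added": sorted(new_ids - old_ids),
        "removed": sorted(old_ids - new_ids),
        "modified": sorted(
            node_id
            for node_id in old_ids & new_ids
            if old_nodes[node_id] != new_nodes[node_id]
        ),
    }


# Abbreviated node tables standing in for the "nodes" section of two manifests.
old_nodes = {
    "rule.active_users": {"description": "Filter for active users only"},
    "source.traffic_day": {"table": "analytics.traffic_day"},
}
new_nodes = {
    **old_nodes,
    "rule.premium_users": {"description": "Users with premium subscription"},
}

changes = diff_nodes(old_nodes, new_nodes)
print(changes)
```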

Modify an Existing Component

If you modify source.traffic_day, Seeknal detects the change and identifies downstream impacts:

sources:
  - id: traffic_day
    description: "Daily traffic events data (updated)"  # Changed!
    source: hive
    table: analytics.traffic_day_v2  # Changed table!

seeknal parse

Changes detected:
  0 node(s) added
  0 node(s) removed
  1 node(s) modified

Modified nodes:
  - source.traffic_day (changed: description, config)

Nodes requiring rebuild: 3
  - source.traffic_day (directly modified)
  - feature_group.user_engagement (downstream dependent)
  - feature_group.transaction_features (downstream dependent)

Manifest saved to: target/manifest.json
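
Finding downstream impacts amounts to a graph traversal that starts at the modified nodes and follows edges forward. The sketch below uses a breadth-first walk over the tutorial's edges. Following edges transitively is an assumption of this sketch (a tool could equally stop at direct dependents), and downstream_of is a hypothetical helper.

```python
from collections import deque

# Edges from the tutorial project: (upstream, downstream).
edges = [
    ("source.traffic_day", "feature_group.user_engagement"),
    ("source.user_profiles", "feature_group.user_engagement"),
    ("source.transactions", "feature_group.transaction_features"),
    ("feature_group.user_engagement", "feature_group.transaction_features"),
]


def downstream_of(edges: list[tuple[str, str]], changed: list[str]) -> set[str]:
    """Return the changed nodes plus everything reachable from them."""
    children: dict[str, list[str]] = {}
    for upstream, downstream in edges:
        children.setdefault(upstream, []).append(downstream)

    seen = set(changed)
    queue = deque(changed)
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:  # visit each node once
                seen.add(child)
                queue.append(child)
    return seen


affected = downstream_of(edges, ["source.traffic_day"])
print(sorted(affected))
```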

You can also use seeknal diff to see changes between the current and previous manifest:

# Show all changes
seeknal diff

# Show only modified nodes
seeknal diff --type modified

# Show summary statistics
seeknal diff --stat

Step 6: Programmatic API Usage

You can also use the DAG infrastructure programmatically:

from seeknal.dag.manifest import Manifest
from seeknal.dag.parser import ProjectParser
from seeknal.dag.diff import ManifestDiff

# Parse a project
parser = ProjectParser(
    project_name="my_project",
    project_path="/path/to/my_project",
)
new_manifest = parser.parse()

# Validate the manifest
errors = parser.validate()
if errors:
    for error in errors:
        print(f"Error: {error}")

# Load previous manifest for comparison
try:
    old_manifest = Manifest.load("target/manifest.json")
except FileNotFoundError:
    old_manifest = None

# Detect changes
if old_manifest:
    diff = ManifestDiff.compare(old_manifest, new_manifest)

    if diff.has_changes():
        print(f"Changes: {diff.summary()}")

        # Get nodes that need rebuilding
        to_rebuild = diff.get_nodes_to_rebuild(new_manifest)
        print(f"Nodes to rebuild: {to_rebuild}")
    else:
        print("No changes detected")

# Save the new manifest
new_manifest.save("target/manifest.json")

Node Types Reference

Type            Prefix           Description
SOURCE          source.          External data sources (CSV, Parquet, PostgreSQL, Hive, etc.)
TRANSFORM       transform.       Reusable transformations
RULE            rule.            Business rules for filtering
AGGREGATION     aggregation.     Reusable aggregation definitions
FEATURE_GROUP   feature_group.   Feature group nodes
MODEL           model.           ML model nodes
EXPOSURE        exposure.        Data exposure points
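
The type-to-prefix mapping in this table can be captured in a small enum, which makes it easy to build and parse node IDs consistently. This is a sketch for illustration: NodeKind, make_node_id, and split_node_id are hypothetical names and are not claimed to match Seeknal's own classes.

```python
from enum import Enum


class NodeKind(Enum):
    """Node types from the table above; each value is the ID prefix without the dot."""
    SOURCE = "source"
    TRANSFORM = "transform"
    RULE = "rule"
    AGGREGATION = "aggregation"
    FEATURE_GROUP = "feature_group"
    MODEL = "model"
    EXPOSURE = "exposure"


def make_node_id(kind: NodeKind, name: str) -> str:
    """Build a prefixed node ID such as 'source.traffic_day'."""
    return f"{kind.value}.{name}"


def split_node_id(node_id: str) -> tuple[NodeKind, str]:
    """Split a prefixed node ID into its kind and name."""
    prefix, _, name = node_id.partition(".")
    if not name:
        raise ValueError(f"not a prefixed node id: {node_id!r}")
    # NodeKind(prefix) raises ValueError for an unknown prefix.
    return NodeKind(prefix), name


print(make_node_id(NodeKind.SOURCE, "traffic_day"))
print(split_node_id("feature_group.user_engagement"))
```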

Dependency Functions Reference

Function                  Purpose                                      Example
source(id)                Reference a data source from common config   source("traffic_day")
source(type, table)       Inline source declaration                    source("hive", "db.table")
ref(name)                 Reference another node                       ref("user_engagement")
use_transform(id, df)     Apply a transformation                       use_transform("clean_nulls", df)
use_rule(id)              Get a business rule                          use_rule("active_users")
use_aggregation(id, df)   Apply an aggregation                         use_aggregation("daily_sum", df)

All functions are available from seeknal.dag.functions or directly from seeknal.dag:

from seeknal.dag import source, ref, use_transform, use_rule, use_aggregation

Best Practices

  1. Organize by Domain: Group related sources, transforms, and rules together in seeknal/common/

  2. Use Descriptive IDs: Choose clear, meaningful identifiers that explain the purpose

  3. Document Dependencies: Add descriptions to help others understand the DAG

  4. Incremental Builds: Use diff detection to only rebuild what changed

  5. Validate Regularly: Run seeknal parse in CI/CD to catch issues early

  6. Use Virtual Environments: Test changes safely with seeknal plan dev before production

Next Steps