Mixed YAML and Python Pipelines Tutorial¶
Learn how to combine YAML and Python pipeline nodes in a single Seeknal project for maximum flexibility.
Note: This tutorial shows the complete file contents for learning purposes. In production, you can use the draft workflow to generate templates:
seeknal draft <type> <name>for YAML files, orseeknal draft <type> <name> --pythonfor Python decorator templates. See the Workflow Tutorial for the complete draft→dry-run→apply pattern.
Overview¶
Seeknal supports mixed pipelines where you can:
- Reference YAML nodes from Python nodes via ctx.ref()
- Reference Python nodes from YAML nodes via inputs
- Use YAML for simple operations and Python for complex transformations
- Get the best of both worlds
When to Use Mixed Pipelines¶
| Use YAML For | Use Python For |
|---|---|
| Simple data sources | Complex business logic |
| Basic SQL transforms | Machine learning models |
| Static configurations | Custom algorithms |
| DBT-style workflows | Data science pipelines |
| Version-controlled schemas | External API integrations |
Example Project: Sales Analytics¶
We'll build a sales analytics pipeline that: 1. YAML Source: Load sales data (simple) 2. Python Transform: Enrich with external data (complex) 3. YAML Transform: Aggregate by region (simple SQL) 4. Python ML Forecast: Predict sales using scikit-learn RandomForest 5. YAML Exposure: Export results (static)
Step 1: Initialize Project¶
Step 2: Generate Sample Data¶
Create a sample sales dataset using Python:
# Create data directory
mkdir -p data
# Generate sample sales data
python3 << 'EOF'
import pandas as pd
import random
from datetime import datetime, timedelta
# Set random seed for reproducibility
random.seed(42)
# Generate 1000 sample transactions
regions = ['USA', 'UK', 'Germany', 'France', 'Japan']
product_ids = [f'ELE-{i:04d}' for i in range(100)] + \
[f'CLO-{i:04d}' for i in range(100)] + \
[f'FOO-{i:04d}' for i in range(100)]
data = []
for i in range(1000):
data.append({
'transaction_id': f'TXN-{i:06d}',
'date': (datetime(2024, 1, 1) + timedelta(days=random.randint(0, 90))).strftime('%Y-%m-%d'),
'product_id': random.choice(product_ids),
'quantity': random.randint(1, 10),
'unit_price': round(random.uniform(10, 500), 2),
'region': random.choice(regions)
})
df = pd.DataFrame(data)
df.to_csv('data/sales.csv', index=False)
print(f"✓ Generated {len(df)} sample sales transactions")
print(f" Columns: {list(df.columns)}")
print(f" File: data/sales.csv")
EOF
Expected output:
✓ Generated 1000 sample sales transactions
Columns: ['transaction_id', 'date', 'product_id', 'quantity', 'unit_price', 'region']
File: data/sales.csv
Step 3: Create YAML Source¶
Create seeknal/sources/raw_sales.yml:
name: raw_sales
kind: source
source: csv
table: data/sales.csv
description: Raw sales transactions
tags:
- sales
- raw
columns:
transaction_id: int
date: date
product_id: str
quantity: int
unit_price: float
region: str
Alternative: Using Draft Workflow
Instead of creating the file manually, use the draft command:
# Generate a YAML source template seeknal draft source raw_sales --description "Raw sales transactions" # Edit the generated template # Edit draft_source_raw_sales.yml # Preview what will happen seeknal dry-run draft_source_raw_sales.yml # Apply to create the actual source seeknal apply draft_source_raw_sales.yml
Step 4: Create Python Transform (References YAML)¶
Create seeknal/pipelines/enrich_sales.py:
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "pandas",
# "pyarrow",
# "duckdb",
# "requests",
# ]
# ///
"""
Transform: Enrich sales data with product information from external API.
This demonstrates Python nodes referencing YAML nodes.
"""
from seeknal.pipeline import transform
@transform(
name="enriched_sales",
description="Add product category and margin to sales data",
)
def enriched_sales(ctx):
"""Enrich sales data with product information."""
# Reference YAML source
sales = ctx.ref("source.raw_sales")
return ctx.duckdb.sql("""
SELECT
s.*,
-- Add product categories (would come from API or lookup)
CASE
WHEN s.product_id LIKE 'ELE%' THEN 'Electronics'
WHEN s.product_id LIKE 'CLO%' THEN 'Clothing'
WHEN s.product_id LIKE 'FOO%' THEN 'Food'
ELSE 'Other'
END as product_category,
-- Calculate margin (would come from cost data)
(s.unit_price * 0.7) as unit_cost,
(s.unit_price - (s.unit_price * 0.7)) as margin
FROM sales s
""").df()
Key Point: ctx.ref("source.raw_sales") references the YAML source node!
Alternative: Using Draft Workflow for Python
You can also generate Python decorator templates:
# Generate a Python transform template seeknal draft transform enriched_sales --python --description "Enrich sales data" # Edit the generated template # Edit draft_transform_enriched_sales.py # Preview and apply seeknal dry-run draft_transform_enriched_sales.py seeknal apply draft_transform_enriched_sales.py
Step 5: Create YAML Transform (References Python)¶
Create seeknal/transforms/regional_totals.yml:
name: regional_totals
kind: transform
description: Aggregate sales by region
inputs:
- ref: transform.enriched_sales
transform: |
SELECT
region,
product_category,
COUNT(*) as transaction_count,
SUM(quantity) as total_quantity,
SUM(margin) as total_margin
FROM ref('transform.enriched_sales')
GROUP BY region, product_category
ORDER BY total_margin DESC
Key Point: ref: transform.enriched_sales references the Python node!
Alternative: Using Draft Workflow
# Generate a YAML transform template seeknal draft transform regional_totals --description "Aggregate sales by region" # Edit the generated template # Edit draft_transform_regional_totals.yml # Preview and apply seeknal dry-run draft_transform_regional_totals.yml seeknal apply draft_transform_regional_totals.yml
Step 6: Create ML Forecast Node¶
Create seeknal/pipelines/sales_forecast.py:
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "pandas",
# "pyarrow",
# "duckdb",
# "scikit-learn",
# "numpy",
# ]
# ///
"""
Transform: ML-based sales forecast using scikit-learn.
This demonstrates Python nodes with ML models.
Uses RandomForestRegressor to predict future margins based on
historical performance metrics.
"""
from seeknal.pipeline import transform
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
@transform(
name="sales_forecast",
description="Forecast next period sales using ML regression",
)
def sales_forecast(ctx):
"""Generate ML-based sales forecast."""
# Reference YAML transform output
aggregated = ctx.ref("transform.regional_totals")
# Convert to DataFrame
if not isinstance(aggregated, pd.DataFrame):
aggregated = aggregated.df()
# Prepare features for ML
df = aggregated.copy()
# Feature engineering
df['margin_per_unit'] = df.apply(
lambda row: row['total_margin'] / row['total_quantity'] if row['total_quantity'] > 0 else 0,
axis=1
)
df['avg_transaction_size'] = df.apply(
lambda row: row['total_quantity'] / row['transaction_count'] if row['transaction_count'] > 0 else 0,
axis=1
)
# Encode categorical variables
df['region_encoded'] = df['region'].astype('category').cat.codes
df['category_encoded'] = df['product_category'].astype('category').cat.codes
# Prepare feature matrix
feature_cols = ['total_quantity', 'transaction_count', 'margin_per_unit',
'avg_transaction_size', 'region_encoded', 'category_encoded']
X = df[feature_cols].values
y = df['total_margin'].values
# Train RandomForest model
model = RandomForestRegressor(
n_estimators=100,
max_depth=10,
random_state=42,
n_jobs=-1
)
model.fit(X, y)
# Make predictions with growth factor (simulating future period)
X_forecast = X.copy()
# Use numpy slicing for column assignment
X_forecast[:, 0:1] = X[:, 0:1] * 1.10 # Increase total_quantity by 10%
X_forecast[:, 1:2] = X[:, 1:2] * 1.08 # Increase transaction_count by 8%
forecast_margin = model.predict(X_forecast)
# Add predictions to dataframe
df['forecast_margin'] = forecast_margin
df['projected_growth'] = df['forecast_margin'] - df['total_margin']
df['growth_percentage'] = (df['projected_growth'] / df['total_margin'] * 100).round(2)
# Determine trend based on ML prediction
df['trend'] = pd.cut(
df['growth_percentage'],
bins=[-np.inf, -2, 2, np.inf],
labels=['DOWN', 'STABLE', 'UP']
)
# Calculate confidence score (R²)
df['forecast_confidence'] = model.score(X, y)
# Select output columns
result = df[[
'region', 'product_category', 'total_margin', 'forecast_margin',
'projected_growth', 'growth_percentage', 'trend', 'forecast_confidence'
]].rename(columns={'total_margin': 'current_margin'})
return result.sort_values('forecast_margin', ascending=False)
Key ML Features: - RandomForestRegressor with 100 trees for robust predictions - Feature engineering: margin_per_unit, avg_transaction_size - Categorical encoding: region and product_category - Forecast simulation: 10% quantity growth, 8% transaction growth - Confidence score: R² score indicates model reliability
Alternative: Using Draft Workflow for Python ML
# Generate a Python transform template seeknal draft transform sales_forecast --python --description "ML sales forecast" # The template will include the decorator structure # Edit draft_transform_sales_forecast.py to add ML logic # Preview and apply seeknal dry-run draft_transform_sales_forecast.py seeknal apply draft_transform_sales_forecast.py
Step 7: Enable Iceberg Materialization (Optional)¶
Both YAML and Python transforms support Iceberg materialization!
When enabled, transform outputs are automatically written to Iceberg tables via: - Lakekeeper: Iceberg REST Catalog (catalog management) - MinIO: S3-compatible object storage (data storage)
This gives you: - Long-term storage in your data lakehouse - Cross-query with other tools (Spark, Trino, etc.) - Data versioning and time travel - Production data serving
Environment Configuration¶
Seeknal automatically loads .env files from your project directory!
Create a .env file in your project root with your Lakekeeper/MinIO credentials:
# .env file in project root
# Lakekeeper (Iceberg REST Catalog)
LAKEKEEPER_URI=http://172.19.0.9:8181
LAKEKEEPER_WAREHOUSE_ID=c008ea5c-fb89-11f0-aa64-c32ca2f52144
# MinIO (S3-compatible Object Storage)
AWS_ENDPOINT_URL=http://172.19.0.9:9000
AWS_ACCESS_KEY_ID=minioadmin
AWS_SECRET_ACCESS_KEY=minioadmin_change_in_production
AWS_REGION=us-east-1
# Keycloak (OAuth2 Authentication)
KEYCLOAK_TOKEN_URL=http://172.19.0.9:8080/realms/atlas/protocol/openid-connect/token
KEYCLOAK_CLIENT_ID=duckdb
KEYCLOAK_CLIENT_SECRET=duckdb-secret-change-in-production
How it works:
- When you run any
seeknalcommand, it automatically searches for.envfiles: - Current directory
- Parent directories (up to 3 levels up)
-
Falls back to default
.envloading behavior -
No need to manually source the file - just create it and run commands!
Verify environment variables are loaded:
# Check if credentials are loaded
seeknal run --help # This will trigger .env loading
# Or check a specific variable
echo $LAKEKEEPER_URI
Materialization for Python Transforms¶
seeknal/pipelines/sales_forecast.py (already created in Step 6):
The Python transform code remains the same. Materialization is configured separately via YAML.
Create the materialization config:
# Create materialization config
cat > seeknal/transforms/sales_forecast.yml << 'EOF'
name: sales_forecast
kind: transform
file: seeknal/pipelines/sales_forecast.py
description: ML-based sales forecast with Iceberg materialization
materialization:
enabled: true
table: "warehouse.prod.sales_forecast"
mode: overwrite
create_table: true
EOF
Configuration options:
| Option | Type | Description |
|---|---|---|
enabled |
boolean | Enable/disable materialization |
table |
string | Fully qualified table name (database.schema.table) |
mode |
string | append or overwrite |
create_table |
boolean | Auto-create table if it doesn't exist |
How Python Materialization Works¶
When materialization.enabled: true:
┌─────────────────────────────────────────────────────────────┐
│ Python Transform Execution │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Python Function Executes │
│ sales_forecast(ctx) → DataFrame │
│ │ │
│ ▼ │
│ 2. Intermediate Storage │
│ target/intermediate/transform_sales_forecast.parquet │
│ │ │
│ ▼ │
│ 3. DuckDB View Created │
│ CREATE VIEW transform.sales_forecast AS │
│ SELECT * FROM read_parquet('...parquet') │
│ │ │
│ ▼ │
│ 4. Iceberg Materialization │
│ ├─ Get OAuth2 token from Keycloak │
│ ├─ Attach to Lakekeeper catalog │
│ ├─ CREATE TABLE atlas.prod.sales_forecast AS │
│ │ SELECT * FROM transform.sales_forecast │
│ └─ Data written to MinIO S3 as Iceberg │
│ │ │
│ ▼ │
│ 5. Verification │
│ ✓ Table exists in Lakekeeper catalog │
│ ✓ Data stored in MinIO S3 │
│ ✓ Queryable from any tool │
│ │
└─────────────────────────────────────────────────────────────┘
Materialization Modes¶
| Mode | Behavior | SQL | Use Case |
|---|---|---|---|
overwrite |
DROP TABLE + CREATE TABLE |
Replaces all data | Full refresh, daily snapshots |
append |
INSERT INTO |
Adds to existing table | Incremental updates, time-series |
Example - Incremental forecasts:
materialization:
enabled: true
table: "warehouse.prod.sales_forecast_history"
mode: append # Keep historical forecasts
Example - Latest snapshot:
materialization:
enabled: true
table: "warehouse.prod.sales_forecast_latest"
mode: overwrite # Only keep latest forecast
Querying Materialized Data¶
After materialization, query from any tool:
From DuckDB (same connection):
import duckdb
# Query the materialized Iceberg table
df = duckdb.sql("""
SELECT region, product_category, forecast_margin, trend
FROM atlas.prod.sales_forecast
WHERE trend = 'UP'
ORDER BY forecast_margin DESC
""").df()
From a fresh DuckDB connection:
from seeknal.workflow.materialization.yaml_integration import IcebergMaterializationHelper
with IcebergMaterializationHelper.get_duckdb_connection() as con:
df = con.execute("""
SELECT * FROM atlas.prod.sales_forecast
ORDER BY forecast_margin DESC
""").df()
From Trino/Presto:
-- Cross-query from Trino
SELECT region, AVG(forecast_confidence) as avg_confidence
FROM warehouse.prod.sales_forecast
GROUP BY region;
From Spark:
// Read Iceberg table in Spark
val df = spark.read.format("iceberg").load("warehouse.prod.sales_forecast")
df.show()
Step 8: Create YAML Exposure¶
Create seeknal/exposures/manager_dashboard.yml:
name: manager_dashboard
kind: exposure
type: file
description: Export for regional management dashboard
depends_on:
- ref: transform.sales_forecast
params:
path: exports/manager_dashboard.csv
format: csv
Note: Exposures export data as-is from the input ref. They don't support SQL filtering. If you need to filter or transform data before export, create a separate transform node first.
Step 9: Build and Run Pipeline¶
Expected output:
Execution Plan:
============================================================
1. RUN raw_sales [sales, raw]
2. RUN enriched_sales
3. RUN regional_totals
4. RUN sales_forecast (ML: RandomForestRegressor)
5. RUN manager_dashboard
Note: Mix of YAML (1, 3, 5) and Python (2, 4) nodes
Step 4 uses scikit-learn's RandomForestRegressor for ML-based forecasting.
Visualization: Mixed Pipeline DAG¶
┌─────────────────────────────────────────────────────────────┐
│ Mixed Pipeline DAG │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ctx.ref() ┌──────────┐ │
│ │ raw_sales │ ───────────────────────▶ │enriched │ │
│ │ (YAML) │ │ sales │ │
│ └──────────────┘ └──────────┘ │
│ │ │ │
│ │ ▼ │
│ ┌──────────────┐ ┌──────────┐ │
│ │ regional │ ◀─────────────────────────│ sales │ │
│ │ totals │ inputs: │ forecast │ │
│ │ (YAML) │ transform. │(Python) │ │
│ └──────────────┘ enriched_sales └──────────┘ │
│ │ [ML Forecast: │ │
│ │ RandomForest] ▼ │
│ ▼ ┌──────────┐ │
│ ┌──────────────┐ │ dashboard│ │
│ │ manager │ ◀─────────────────────────│ export │ │
│ │ dashboard │ inputs: │ (YAML) │ │
│ │ (YAML) │ transform.sales_ └──────────┘ │
│ └──────────────┘ forecast
│ │
└─────────────────────────────────────────────────────────────┘
Complete Example: E-Commerce Pipeline¶
Let's build a more realistic e-commerce analytics pipeline with mixed nodes.
Project Structure¶
sales_analytics/
├── seeknal/
│ ├── sources/
│ │ ├── orders.yml # YAML: Load orders
│ │ └── products.yml # YAML: Product catalog
│ ├── transforms/
│ │ ├── daily_revenue.yml # YAML: Simple aggregation
│ │ └── cohort_analysis.yml # YAML: Cohort metrics
│ ├── pipelines/
│ │ ├── customer_segments.py # Python: ML clustering
│ │ ├── recommendation_engine.py # Python: Collaborative filtering
│ │ └── inventory_optimization.py # Python: Optimization algorithm
│ ├── feature_groups/
│ │ ├── customer_features.yml # YAML: Simple features
│ │ └── product_affinity.yml # YAML: Product affinity
│ └── exposures/
│ └── marketing_campaign.yml # YAML: Export for marketing
└── data/
├── orders.csv
└── products.csv
1. YAML Sources¶
seeknal/sources/orders.yml:
name: raw_orders
kind: source
source: csv
table: data/orders.csv
description: Raw order data
tags: [orders, raw]
seeknal/sources/products.yml:
name: product_catalog
kind: source
source: csv
table: data/products.csv
description: Product catalog
tags: [products, catalog]
2. Python Transform: Complex Join (References YAML Sources)¶
seeknal/pipelines/customer_ltv.py:
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "pandas",
# "pyarrow",
# "duckdb",
# ]
# ///
"""
Transform: Calculate customer lifetime value.
Complex business logic using data from multiple YAML sources.
"""
from seeknal.pipeline import transform
@transform(
name="customer_ltv",
description="Calculate customer lifetime value (LTV)",
)
def customer_ltv(ctx):
"""Calculate LTV from orders and products."""
# Reference multiple YAML sources
orders = ctx.ref("source.raw_orders")
products = ctx.ref("source.product_catalog")
# Register for SQL access
ctx.duckdb.register("orders", orders)
ctx.duckdb.register("products", products)
return ctx.duckdb.sql("""
WITH order_totals AS (
SELECT
o.customer_id,
SUM(o.quantity * p.unit_price) as total_spend,
COUNT(DISTINCT o.order_id) as order_count,
MIN(o.order_date) as first_order,
DATEDIFF('day', CURRENT_DATE, MIN(o.order_date)) as days_since_first_order
FROM orders o
LEFT JOIN products p ON o.product_id = p.product_id
GROUP BY o.customer_id
),
customer_metrics AS (
SELECT
customer_id,
total_spend,
order_count,
first_order,
-- Calculate average order value
total_spend / NULLIF(order_count, 0) as avg_order_value,
days_since_first_order,
-- Simple LTV prediction (3x AOV * order_count)
(total_spend / NULLIF(order_count, 0)) * 3 * order_count as predicted_ltv
FROM order_totals
)
SELECT
c.*,
NTILE(10) OVER (ORDER BY predicted_ltv DESC) as ltv_decile,
CASE
WHEN predicted_ltv > 1000 THEN 'High Value'
WHEN predicted_ltv > 500 THEN 'Medium Value'
ELSE 'Low Value'
END as value_segment
FROM customer_metrics c
ORDER BY predicted_ltv DESC
""").df()
3. YAML Transform: Simple Aggregation (References Python Node)¶
seeknal/transforms/segment_summary.yml:
name: segment_summary
kind: transform
description: Summarize customer segments
inputs:
- ref: transform.customer_ltv
transform: |
SELECT
value_segment,
ltv_decile,
COUNT(*) as customer_count,
AVG(avg_order_value) as segment_aov,
SUM(predicted_ltv) as total_ltv,
AVG(days_since_first_order) as avg_recency
FROM ref('transform.customer_ltv')
GROUP BY value_segment, ltv_decile
ORDER BY total_ltv DESC
4. Run Mixed Pipeline¶
Execution flow:
1. YAML: Load orders (source)
2. YAML: Load products (source)
3. Python: Calculate LTV (uses both YAML sources)
4. YAML: Summarize segments (uses Python output)
Advanced: Dynamic Imports¶
Python nodes can also import YAML-defined schemas and metadata:
seeknal/pipelines/dynamic_validation.py:
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "pandas",
# "duckdb",
# ]
# ///
"""
Transform: Dynamic validation using YAML schema.
Demonstrates reading YAML metadata from Python nodes.
"""
from seeknal.pipeline import transform
import yaml
from pathlib import Path
@transform(
name="validated_sales",
description="Validate sales data against YAML schema",
)
def validated_sales(ctx):
"""Validate data using schema defined in YAML."""
# Get data
sales = ctx.ref("source.raw_sales")
# Load YAML schema (if defined)
schema_path = Path("seeknal/sources/sales_data.yml")
if schema_path.exists():
with open(schema_path) as f:
schema = yaml.safe_load(f)
# Access schema metadata
expected_columns = schema.get("columns", {})
print(f"Validating against schema: {schema.get('name')}")
# Apply validation logic
return ctx.duckdb.sql("""
SELECT
*,
-- Add data quality flags
CASE
WHEN quantity <= 0 THEN 'Invalid Quantity'
WHEN unit_price <= 0 THEN 'Invalid Price'
WHEN product_id IS NULL THEN 'Missing Product'
ELSE 'Valid'
END as validation_status
FROM sales
""").df()
Best Practices for Mixed Pipelines¶
1. Exposure Node Format¶
Exposures don't support SQL queries - they export data as-is. Use the correct format:
# File export
kind: exposure
type: file
depends_on:
- ref: transform.my_data
params:
path: exports/data.csv
format: csv # csv, parquet, json, jsonl
# API POST
kind: exposure
type: api
depends_on:
- ref: transform.results
params:
url: https://api.example.com/data
method: POST
# Database write
kind: exposure
type: database
depends_on:
- ref: transform.processed
params:
table: production.results
Note: Use depends_on (not inputs) for exposure nodes.
If you need SQL filtering, create a transform node before the exposure:
# Transform: Filter and format data
kind: transform
name: manager_dashboard_filtered
inputs:
- ref: transform.sales_forecast
transform: |
SELECT
region,
product_category,
forecast_margin,
trend,
RANK() OVER (ORDER BY forecast_margin DESC) as margin_rank
FROM ref('transform.sales_forecast')
WHERE trend IN ('UP', 'STABLE')
# Exposure: Export the filtered data
kind: exposure
type: file
depends_on:
- ref: transform.manager_dashboard_filtered
params:
path: exports/manager_dashboard.csv
format: csv
2. Clear Naming Conventions¶
# Good: Clear indication of what type of node it is
@source(name="user_events") # Could be YAML or Python
@transform(name="enrich_users") # Complex = Python
@feature_group(name="user_features") # Could be either
# Use prefixes for clarity in Python files
def load_user_data(ctx): # Clear it's a Python function
2. Documentation in Mixed Projects¶
# Always document if YAML nodes reference Python
name: regional_sales
kind: transform
description: Aggregate by region (uses Python enrichment)
inputs:
- ref: transform.enrich_sales # Note: This is a Python node!
3. Dependency Management¶
# Python nodes need explicit dependencies
# dependencies = [
# "pandas", # For ctx.ref() results
# "duckdb", # For SQL queries
# "scikit-learn", # For ML models
# ]
4. Testing Strategies¶
# Test Python nodes in isolation
seeknal run --nodes customer_ltv
# Test specific types
seeknal run --type source
seeknal run --type transform
# Dry-run validation
seeknal dry-run seeknal/transforms/segment_summary.yml
Common Patterns¶
Pattern 1: YAML Sources → Python Enrichment¶
# pipelines/enrich_events.py
@transform(name="enriched_events")
def enriched_events(ctx):
events = ctx.ref("source.raw_events")
# Complex Python logic...
return enriched_df
Pattern 2: Python Compute → YAML Aggregation¶
# pipelines/compute_metrics.py
@transform(name="computed_metrics")
def computed_metrics(ctx):
df = ctx.ref("source.raw_data")
# Complex calculations...
return metrics_df
# transforms/aggregate_metrics.yml
name: daily_metrics
kind: transform
inputs:
- ref: transform.computed_metrics
transform: |
SELECT DATE(event_time) as date, SUM(value) as total
FROM ref('transform.computed_metrics')
GROUP BY DATE(event_time)
Pattern 3: Bidirectional References¶
# pipelines/customer_360.py
@transform(name="customer_360")
def customer_360(ctx):
# Reference both YAML and Python nodes
orders = ctx.ref("source.raw_orders") # YAML source
ltv = ctx.ref("transform.customer_ltv") # Python transform
segments = ctx.ref("transform.segments") # YAML transform
# Complex join logic...
return customer_360_df
Troubleshooting Mixed Pipelines¶
Issue: Node Not Found¶
Solution: Ensure the YAML file is in the correct directory and valid:
# Check if YAML is discovered
seeknal run --show-plan
# Validate YAML syntax
python -c "import yaml; yaml.safe_load(open('seeknal/sources/my_source.yml'))"
Issue: Type Mismatch¶
Solution: Ensure consistent return types. Python nodes must return pandas DataFrames:
# Always return DataFrame
@transform(name="my_transform")
def my_transform(ctx):
df = ctx.ref("source.raw_data")
result = some_complex_logic(df)
return pd.DataFrame(result) # Ensure DataFrame type
Issue: Circular Dependencies¶
Solution: Break the cycle with intermediate nodes:
Summary¶
Mixed pipelines give you flexibility:
| Aspect | YAML | Python |
|---|---|---|
| Learning curve | Simple | Moderate |
| Data sources | Excellent | Good |
| SQL transforms | Built-in | Via DuckDB |
| Complex logic | Limited | Unlimited |
| ML/AI | No | Yes (scikit-learn, etc.) |
| External APIs | No | Yes (requests, etc.) |
| Version control | Git-friendly | Git-friendly |
Use both for maximum productivity! - Use YAML for what it's good at (sources, simple SQL) - Use Python for everything else (complex transforms, ML, APIs)