Python Pipelines Guide¶
Seeknal's Python pipeline API provides decorators for defining pipeline nodes as Python functions. Use Python pipelines when you need custom logic, ML model training, or integration with Python libraries.
Overview¶
Python decorators let you define data pipeline nodes directly in Python code with full access to the Python ecosystem.
from seeknal.pipeline import source, transform, feature_group, PipelineContext
from seeknal.pipeline.decorators import materialize, second_order_aggregation
When to Use Python Pipelines:
- Complex business logic requiring custom algorithms
- Machine learning model training and inference
- External API integrations (REST, GraphQL, etc.)
- Advanced data transformations beyond SQL
- Custom validation and data quality checks
- Integration with Python libraries (scikit-learn, requests, etc.)
When to Use YAML Pipelines:
- Simple data sources (CSV, Parquet, databases)
- Basic SQL transformations
- Static configurations
- DBT-style declarative workflows
See Mixed YAML + Python Tutorial for combining both approaches.
Decorator Reference¶
@source¶
Define data ingestion from files or databases.
Parameters:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | — | Unique source name |
| source | str | No | "csv" | Source type: csv, parquet, json, postgres, etc. |
| table | str | No | "" | Table name or file path |
| columns | dict | No | None | Column schema definitions ({"col": "type"}) |
| query | str | No | None | SQL query for database sources (pushdown query) |
| connection | str | No | None | Connection profile name (for database sources) |
| tags | list[str] | No | None | Tags for organization |
| materialization | dict \| MaterializationConfig | No | None | Iceberg materialization config |
| **params | Any | No | — | Additional source-specific parameters |
Function Signature:
def your_source_function(ctx: PipelineContext | None = None) -> pd.DataFrame:
"""Load data from external source."""
pass
Examples:
Simple CSV source:
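A minimal sketch, assuming a local CSV at data/users.csv (the path, source name, and function name are illustrative):
from seeknal.pipeline import source

@source(name="raw_users", source="csv", table="data/users.csv")
def raw_users():
    pass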
Database source:
@source(
name="events",
source="postgres",
table="public.events",
columns={"event_id": "int", "user_id": "int", "event_time": "timestamp"},
tags=["production", "analytics"],
)
def events():
pass
Custom source with Python logic:
import pandas as pd
@source(name="api_users", source="csv", table="data/users.csv")
def api_users(ctx=None):
"""Load users from external API."""
import requests
# Fetch from API
response = requests.get("https://api.example.com/users")
data = response.json()
# Convert to DataFrame
return pd.DataFrame(data)
Source with Iceberg materialization:
from seeknal.pipeline.materialization_config import MaterializationConfig
@source(
name="raw_events",
source="postgres",
table="public.events",
materialization=MaterializationConfig(
enabled=True,
table="warehouse.raw.events",
mode="append",
)
)
def raw_events():
pass
@transform¶
Define data transformation logic using SQL or Python.
Parameters:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | No | function name | Transform name |
| sql | str | No | None | Optional SQL transform (alternative to function body) |
| inputs | list[str] | No | None | List of upstream node IDs (["source.users"]) |
| tags | list[str] | No | None | Tags for organization |
| materialization | dict \| MaterializationConfig | No | None | Iceberg materialization config |
| **params | Any | No | — | Additional transform-specific parameters |
Function Signature:
def your_transform_function(ctx: PipelineContext) -> pd.DataFrame:
"""Transform data from upstream nodes."""
pass
Examples:
Basic SQL transform:
@transform(name="clean_users")
def clean_users(ctx):
df = ctx.ref("source.raw_users")
return ctx.duckdb.sql("""
SELECT * FROM df
WHERE active = true
AND email IS NOT NULL
""").df()
Transform with explicit inputs:
@transform(
name="enriched_sales",
inputs=["source.sales", "source.products"],
tags=["enrichment"],
)
def enriched_sales(ctx):
sales = ctx.ref("source.sales")
products = ctx.ref("source.products")
return ctx.duckdb.sql("""
SELECT
s.*,
p.category,
p.unit_cost,
s.unit_price - p.unit_cost AS margin
FROM sales s
LEFT JOIN products p ON s.product_id = p.product_id
""").df()
Complex Python logic:
import pandas as pd
import numpy as np
@transform(name="rfm_scores")
def rfm_scores(ctx):
"""Calculate RFM scores for customer segmentation."""
df = ctx.ref("transform.clean_transactions")
# Convert to pandas
if not isinstance(df, pd.DataFrame):
df = df.df()
# Calculate RFM metrics
current_date = pd.Timestamp.now()
rfm = df.groupby("customer_id").agg({
"order_date": lambda x: (current_date - x.max()).days, # Recency
"order_id": "nunique", # Frequency
"total_amount": "sum", # Monetary
}).rename(columns={
"order_date": "recency",
"order_id": "frequency",
"total_amount": "monetary",
})
# Calculate scores (1-5 scale)
rfm["r_score"] = pd.qcut(rfm["recency"], 5, labels=[5, 4, 3, 2, 1])
rfm["f_score"] = pd.qcut(rfm["frequency"].rank(method="first"), 5, labels=[1, 2, 3, 4, 5])
rfm["m_score"] = pd.qcut(rfm["monetary"], 5, labels=[1, 2, 3, 4, 5])
return rfm.reset_index()
Machine learning model:
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
@transform(
name="sales_forecast",
inputs=["transform.regional_sales"],
tags=["ml", "forecast"],
)
def sales_forecast(ctx):
"""Forecast sales using RandomForest."""
df = ctx.ref("transform.regional_sales")
if not isinstance(df, pd.DataFrame):
df = df.df()
# Feature engineering
features = ["total_quantity", "transaction_count", "avg_price"]
X = df[features].values
y = df["total_margin"].values
# Train model
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X, y)
# Predict with 10% growth assumption
X_forecast = X * 1.10
df["forecast_margin"] = model.predict(X_forecast)
df["projected_growth"] = df["forecast_margin"] - df["total_margin"]
return df
Transform with Iceberg materialization:
from seeknal.pipeline.materialization_config import MaterializationConfig
@transform(
name="daily_metrics",
materialization=MaterializationConfig(
enabled=True,
table="warehouse.analytics.daily_metrics",
mode="overwrite",
)
)
def daily_metrics(ctx):
df = ctx.ref("transform.clean_events")
return ctx.duckdb.sql("""
SELECT
DATE(event_time) as date,
event_type,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM df
GROUP BY DATE(event_time), event_type
""").df()
@feature_group¶
Define feature groups for ML models with offline/online materialization.
Parameters:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | No | function name | Feature group name |
| entity | str | No | None | Entity name for joins ("user", "product") |
| features | dict | No | None | Feature schema definitions |
| inputs | list[str] | No | None | List of upstream node IDs |
| materialization | Materialization \| MaterializationConfig \| dict | No | None | Offline/online store config or Iceberg config |
| tags | list[str] | No | None | Tags for organization |
| **params | Any | No | — | Additional parameters |
Function Signature:
def your_feature_group_function(ctx: PipelineContext) -> pd.DataFrame:
"""Compute features for the entity."""
pass
Materialization Types:
- Offline/Online Store (using Materialization)
- Iceberg Table (using MaterializationConfig)
Examples:
Simple feature group:
@feature_group(
name="user_features",
entity="user",
)
def user_features(ctx):
df = ctx.ref("transform.clean_users")
return ctx.duckdb.sql("""
SELECT
user_id,
COUNT(DISTINCT order_id) as total_orders,
SUM(order_total) as lifetime_value,
AVG(order_total) as avg_order_value,
MAX(order_date) as last_order_date
FROM df
GROUP BY user_id
""").df()
Feature group with offline store:
from seeknal.pipeline.materialization import Materialization, OfflineConfig
@feature_group(
name="customer_rfm_features",
entity="customer",
materialization=Materialization(
offline=OfflineConfig(
format="parquet",
partition_by=["year", "month"]
)
),
tags=["rfm", "segmentation"],
)
def customer_rfm_features(ctx):
"""Calculate RFM features per customer."""
df = ctx.ref("transform.clean_transactions")
return ctx.duckdb.sql("""
WITH customer_metrics AS (
SELECT
customer_id,
DATEDIFF('day', MAX(order_date), CURRENT_DATE) as recency_days,
COUNT(DISTINCT order_id) as frequency,
SUM(total_amount) as monetary_value
FROM df
GROUP BY customer_id
)
SELECT
*,
-- RFM Scores (1-5 scale)
NTILE(5) OVER (ORDER BY recency_days DESC) as recency_score,
NTILE(5) OVER (ORDER BY frequency ASC) as frequency_score,
NTILE(5) OVER (ORDER BY monetary_value ASC) as monetary_score
FROM customer_metrics
""").df()
Feature group with Iceberg materialization:
from seeknal.pipeline.materialization_config import MaterializationConfig
@feature_group(
name="product_affinity",
entity="product",
materialization=MaterializationConfig(
enabled=True,
table="warehouse.features.product_affinity",
mode="append",
),
)
def product_affinity(ctx):
"""Calculate product co-purchase affinity."""
df = ctx.ref("transform.clean_orders")
# Complex Python logic for affinity calculation
import pandas as pd
from itertools import combinations
# Group by order
orders = df.groupby("order_id")["product_id"].apply(list)
# Calculate co-occurrence
pairs = []
for products in orders:
for p1, p2 in combinations(products, 2):
pairs.append({"product_a": p1, "product_b": p2})
affinity = pd.DataFrame(pairs).groupby(["product_a", "product_b"]).size().reset_index(name="co_purchase_count")
return affinity
@materialize (Stackable)¶
Attach materialization targets to any node. Stack multiple @materialize decorators to write to multiple targets simultaneously.
Parameters:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| type | str | No | "iceberg" | Target type: iceberg or postgresql |
| connection | str | No | None | Named connection from profiles.yml (required for PostgreSQL) |
| table | str | No | "" | Target table name (typically required in practice) |
| mode | str | No | "full" | Write mode: full, append, overwrite, incremental_by_time, upsert_by_key |
| time_column | str | No | None | Time column for incremental mode |
| lookback | str | No | None | Lookback window (e.g., "7d") |
| unique_keys | list[str] | No | None | Columns for upsert matching |
| **kwargs | Any | No | — | Additional target-specific parameters |
Examples:
Single target (Iceberg):
@materialize(type="iceberg", table="atlas.warehouse.orders", mode="append")
@transform(name="enriched_orders", inputs=["source.orders"])
def enriched_orders(ctx):
orders = ctx.ref("source.orders")
return ctx.duckdb.sql("SELECT * FROM orders").df()
Multi-target (PostgreSQL + Iceberg):
@materialize(type="postgresql", connection="local_pg",
table="analytics.orders", mode="upsert_by_key",
unique_keys=["order_id"])
@materialize(type="iceberg", table="atlas.warehouse.orders", mode="append")
@transform(name="enriched_orders", inputs=["source.orders"])
def enriched_orders(ctx):
orders = ctx.ref("source.orders")
return ctx.duckdb.sql("SELECT * FROM orders").df()
PostgreSQL incremental:
@materialize(type="postgresql", connection="analytics_db",
table="events.daily_metrics", mode="incremental_by_time",
time_column="event_date", lookback="7d")
@transform(name="daily_metrics", inputs=["source.events"])
def daily_metrics(ctx):
events = ctx.ref("source.events")
return ctx.duckdb.sql(
"SELECT event_date, COUNT(*) as count FROM events GROUP BY event_date"
).df()
Note: Decorators are applied bottom-up. Place @materialize above the node decorator (@transform, @source, @feature_group).
@second_order_aggregation¶
Define aggregations on already-aggregated data for multi-level feature engineering.
Parameters:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | Yes | — | Second-order aggregation name |
| source | str | Yes | — | Upstream aggregation reference ("aggregation.user_daily") |
| id_col | str | Yes | — | Entity ID column for grouping ("region_id") |
| feature_date_col | str | Yes | — | Date column for time-based operations |
| application_date_col | str | No | None | Reference date column for window calculations |
| description | str | No | None | Human-readable description |
| owner | str | No | None | Team/person responsible |
| tags | list[str] | No | None | Tags for organization |
| materialization | dict \| Any | No | None | Iceberg materialization config |
| **params | Any | No | — | Additional parameters |
Function Signature:
def your_aggregation_function(ctx: PipelineContext, df: pd.DataFrame) -> pd.DataFrame:
"""Aggregate pre-aggregated data."""
pass
Use Cases:
- User-level features → Store-level features → Region-level features
- Daily metrics → Weekly metrics → Monthly metrics
- Transaction features → Merchant features → Category features
Examples:
Basic second-order aggregation:
@second_order_aggregation(
name="region_metrics",
source="aggregation.store_daily_metrics",
id_col="region_id",
feature_date_col="date",
)
def region_metrics(ctx, df: pd.DataFrame) -> pd.DataFrame:
"""Aggregate store metrics to region level."""
return df.groupby(["region_id", "date"]).agg({
"total_sales": "sum",
"avg_transaction_value": "mean",
"unique_customers": "sum",
}).reset_index()
Advanced aggregation with statistics:
import pandas as pd
import numpy as np
@second_order_aggregation(
name="category_patterns",
source="aggregation.product_metrics",
id_col="category_id",
feature_date_col="metric_date",
description="Category-level product performance patterns",
tags=["analytics", "product"],
)
def category_patterns(ctx, df: pd.DataFrame) -> pd.DataFrame:
"""Calculate category-level patterns from product metrics."""
result = df.groupby("category_id").agg({
"total_revenue": ["sum", "mean", "std"],
"units_sold": ["sum", "mean"],
"unique_customers": "sum",
"avg_rating": "mean",
})
# Flatten column names
result.columns = ["_".join(col).strip() for col in result.columns.values]
result = result.reset_index()
# Calculate derived metrics
result["revenue_volatility"] = result["total_revenue_std"] / result["total_revenue_mean"]
result["avg_units_per_product"] = result["units_sold_sum"] / len(df)
return result
With Iceberg materialization:
from seeknal.pipeline.materialization_config import MaterializationConfig
@second_order_aggregation(
name="regional_user_metrics",
source="aggregation.user_daily_features",
id_col="region_id",
feature_date_col="date",
application_date_col="report_date",
materialization=MaterializationConfig(
enabled=True,
table="warehouse.analytics.regional_metrics",
mode="append",
),
)
def regional_user_metrics(ctx, df: pd.DataFrame) -> pd.DataFrame:
"""Aggregate user features to regional level."""
return ctx.duckdb.sql("""
SELECT
region_id,
date,
COUNT(DISTINCT user_id) as active_users,
SUM(total_spend_30d) as region_total_spend,
AVG(total_spend_30d) as avg_spend_per_user,
STDDEV(total_spend_30d) as spend_std_dev,
SUM(transaction_count) as region_transaction_count
FROM df
GROUP BY region_id, date
""").df()
Pipeline Context API¶
The PipelineContext object (ctx) is passed to all decorated functions (sources receive it as an optional argument that may be None) and provides access to:
ctx.ref(node_reference)¶
Reference upstream node outputs.
Signature:
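The exact signature is not reproduced here; a plausible shape, inferred from the usage throughout this guide (treat the annotations as an assumption), is:
ctx.ref(node_id: str) -> pd.DataFrame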
Parameters:
- node_id (str): Node identifier in format "kind.name" (e.g., "source.users", "transform.clean_data")
Returns:
- pd.DataFrame: DataFrame from the referenced node
Examples:
# Reference a source
users = ctx.ref("source.raw_users")
# Reference a transform
clean_data = ctx.ref("transform.clean_transactions")
# Reference a feature group
features = ctx.ref("feature_group.customer_rfm")
# Reference YAML nodes
yaml_data = ctx.ref("source.sales_data") # From sales_data.yml
How it works:
1. First checks in-memory cache
2. If not found, loads from target/intermediate/<node_id>.parquet
3. Caches for future references
ctx.duckdb¶
Access the DuckDB connection for SQL queries.
Signature:
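Inferred shape (an assumption based on the examples below):
ctx.duckdb  # DuckDB connection object exposing .sql(), .register(), and .execute()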
Returns:
- DuckDB connection object
Examples:
Simple SQL query:
@transform(name="filter_active")
def filter_active(ctx):
df = ctx.ref("source.users")
return ctx.duckdb.sql("SELECT * FROM df WHERE active = true").df()
Register multiple dataframes:
@transform(name="join_tables")
def join_tables(ctx):
orders = ctx.ref("source.orders")
products = ctx.ref("source.products")
# Register dataframes for SQL access
ctx.duckdb.register("orders", orders)
ctx.duckdb.register("products", products)
return ctx.duckdb.sql("""
SELECT
o.order_id,
o.customer_id,
p.product_name,
p.category,
o.quantity * p.unit_price as total_amount
FROM orders o
LEFT JOIN products p ON o.product_id = p.product_id
""").df()
Use DuckDB functions:
@transform(name="advanced_analytics")
def advanced_analytics(ctx):
df = ctx.ref("source.transactions")
return ctx.duckdb.sql("""
SELECT
customer_id,
-- Window functions
SUM(amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
) as rolling_30d_sum,
-- Date functions
DATE_TRUNC('week', order_date) as week,
-- Statistical functions (windowed so it can sit beside non-aggregated columns)
MEDIAN(amount) OVER (PARTITION BY customer_id) as median_amount,
-- String functions
REGEXP_EXTRACT(email, '.*@(.*)$', 1) as email_domain
FROM df
""").df()
Execute statements:
@transform(name="with_temp_tables")
def with_temp_tables(ctx):
df = ctx.ref("source.sales")
# Create temporary table
ctx.duckdb.execute("""
CREATE TEMP TABLE daily_totals AS
SELECT DATE(order_date) as date, SUM(amount) as total
FROM df
GROUP BY DATE(order_date)
""")
# Query the temp table
return ctx.duckdb.sql("SELECT * FROM daily_totals").df()
ctx.config¶
Access profile configuration from profiles.yml.
Signature:
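Inferred shape (an assumption based on the example below):
ctx.config  # dict-like profile settings, e.g. ctx.config.get("database", {})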
Contains:
- Database credentials
- Environment-specific settings
- Custom configuration
Example:
@source(name="db_users")
def db_users(ctx):
# Access database config
db_config = ctx.config.get("database", {})
connection_string = db_config.get("connection_string")
# Use credentials
import pandas as pd
return pd.read_sql("SELECT * FROM users", connection_string)
ctx.project_path¶
Path to the project root directory.
Signature:
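Inferred shape (an assumption based on the example below):
ctx.project_path  # pathlib.Path pointing at the project root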
Example:
@source(name="local_file")
def local_file(ctx):
import pandas as pd
file_path = ctx.project_path / "data" / "users.csv"
return pd.read_csv(file_path)
ctx.target_dir¶
Path to the target directory for outputs.
Signature:
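Inferred shape (an assumption based on the example below):
ctx.target_dir  # pathlib.Path pointing at the target/ output directory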
Example:
@transform(name="export_metrics")
def export_metrics(ctx):
df = ctx.ref("transform.daily_metrics")
# Write to target directory
output_path = ctx.target_dir / "exports" / "metrics.csv"
output_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(output_path, index=False)
return df
Running Python Pipelines¶
Auto-discovery¶
Seeknal automatically discovers Python pipeline files in the seeknal/ directory:
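For example, a file like the following (path, node names, and logic are illustrative) is picked up automatically and added to the DAG on the next seeknal run:
# seeknal/transforms/clean_users.py
from seeknal.pipeline import transform

@transform(name="clean_users", inputs=["source.raw_users"])
def clean_users(ctx):
    df = ctx.ref("source.raw_users")
    return ctx.duckdb.sql("SELECT * FROM df WHERE active = true").df()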
Discovery rules:
- Searches seeknal/ and subdirectories
- Looks for Python files with @source, @transform, @feature_group, @second_order_aggregation decorators
- Builds a DAG from all discovered nodes
- Executes in topological order
CLI Options¶
# Show execution plan without running
seeknal run --show-plan
# Run specific nodes
seeknal run --nodes transform.clean_users,feature_group.user_features
# Run with parallel execution
seeknal run --parallel --max-workers 4
# Dry run (validate without executing)
seeknal run --dry-run
# Filter by tags
seeknal run --tags production,analytics
# Run specific node types
seeknal run --type transform
Execution Flow¶
┌─────────────────────────────────────────────────────────────┐
│ Python Pipeline Execution Flow │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Discovery │
│ └─ Scan seeknal/ directory for .py files │
│ └─ Import files and extract @decorated functions │
│ └─ Build registry of nodes │
│ │
│ 2. DAG Construction │
│ └─ Analyze inputs/dependencies │
│ └─ Create dependency graph │
│ └─ Topological sort │
│ │
│ 3. Execution (for each node) │
│ ├─ Create PipelineContext │
│ ├─ Execute function: result = func(ctx) │
│ ├─ Store output: target/intermediate/<node>.parquet │
│ ├─ Register in DuckDB: CREATE VIEW <node> AS ... │
│ └─ Apply materialization (if configured) │
│ │
│ 4. Cleanup │
│ └─ Close DuckDB connections │
│ └─ Clear context │
│ │
└─────────────────────────────────────────────────────────────┘
Project Structure¶
Organize Python pipelines using a directory structure that matches node types:
my-project/
├── seeknal_project.yml # Project configuration
├── profiles.yml # Database credentials (gitignored)
├── .gitignore
├── seeknal/
│ ├── sources/
│ │ └── users.py # @source definitions
│ ├── transforms/
│ │ ├── clean_users.py # @transform definitions
│ │ └── enrich_sales.py
│ ├── feature_groups/
│ │ └── user_features.py # @feature_group definitions
│ ├── aggregations/
│ │ └── daily_metrics.py # @second_order_aggregation definitions
│ └── pipelines/ # Mixed node types
│ └── customer_ltv.py
├── data/ # Input data files
│ ├── users.csv
│ └── transactions.csv
└── target/ # Outputs (gitignored)
├── intermediate/ # Node outputs
│ ├── source_raw_users.parquet
│ └── transform_clean_users.parquet
└── exports/ # Final exports
Recommended conventions:
- sources/: Data ingestion nodes
- transforms/: Data transformation nodes
- feature_groups/: ML feature definitions
- aggregations/: Second-order aggregations
- pipelines/: Mixed or complex multi-step pipelines
PEP 723 Dependency Management¶
Each Python pipeline file declares its dependencies inline using PEP 723 script metadata:
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "pandas>=2.0",
# "pyarrow>=14.0",
# "duckdb>=0.9.0",
# "scikit-learn>=1.3.0",
# ]
# ///
from seeknal.pipeline import transform
@transform(name="my_transform")
def my_transform(ctx):
# Your code here
pass
Benefits:
- ✅ No global requirements.txt - Each file is self-contained
- ✅ Isolated dependencies - No version conflicts between nodes
- ✅ Automatic environment management - uv creates virtual environments automatically
- ✅ Reproducible builds - Version pinning ensures consistency
How it works:
1. Seeknal uses uv run to execute each Python file
2. uv reads the PEP 723 header
3. Creates an isolated virtual environment
4. Installs specified dependencies
5. Executes the script
6. Caches environment for future runs
Common dependencies:
# Base (required for most pipelines)
"pandas>=2.0"
"pyarrow>=14.0"
"duckdb>=0.9.0"
# Machine learning
"scikit-learn>=1.3.0"
"numpy>=1.24"
# External APIs
"requests>=2.31"
# Data validation
"pydantic>=2.0"
Mixed YAML + Python Pipelines¶
Python nodes can reference YAML nodes and vice versa.
Python referencing YAML:
# Python file: seeknal/pipelines/enrich_sales.py
from seeknal.pipeline import transform
@transform(name="enriched_sales")
def enriched_sales(ctx):
# Reference YAML source
sales = ctx.ref("source.raw_sales") # From raw_sales.yml
return ctx.duckdb.sql("SELECT * FROM sales WHERE amount > 0").df()
YAML referencing Python:
# YAML file: seeknal/transforms/regional_totals.yml
name: regional_totals
kind: transform
inputs:
- ref: transform.enriched_sales # From enriched_sales.py
transform: |
SELECT region, SUM(amount) as total_sales
FROM ref('transform.enriched_sales')
GROUP BY region
See Mixed YAML + Python Tutorial for complete examples.
Common Patterns¶
Pattern 1: RFM Analysis¶
Calculate Recency, Frequency, Monetary features for customer segmentation.
from seeknal.pipeline import transform
@transform(name="customer_rfm")
def customer_rfm(ctx):
df = ctx.ref("transform.clean_transactions")
return ctx.duckdb.sql("""
WITH customer_metrics AS (
SELECT
customer_id,
DATEDIFF('day', MAX(order_date), CURRENT_DATE) as recency_days,
COUNT(DISTINCT order_id) as frequency,
SUM(total_amount) as monetary_value
FROM df
GROUP BY customer_id
)
SELECT
*,
NTILE(5) OVER (ORDER BY recency_days DESC) as recency_score,
NTILE(5) OVER (ORDER BY frequency ASC) as frequency_score,
NTILE(5) OVER (ORDER BY monetary_value ASC) as monetary_score
FROM customer_metrics
""").df()
Pattern 2: ML Model Training¶
Train a scikit-learn model as part of the pipeline.
from seeknal.pipeline import transform
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pandas as pd
@transform(name="churn_model")
def churn_model(ctx):
"""Train churn prediction model."""
df = ctx.ref("feature_group.customer_features")
if not isinstance(df, pd.DataFrame):
df = df.df()
# Prepare features
features = ["recency_days", "frequency", "monetary_value", "avg_order_value"]
X = df[features]
y = df["churned"]
# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Evaluate
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
# Return predictions
df["churn_probability"] = model.predict_proba(X)[:, 1]
df["predicted_churn"] = model.predict(X)
return df
Pattern 3: External API Integration¶
Fetch data from external APIs.
from seeknal.pipeline import source
import pandas as pd
import requests
@source(name="weather_data")
def weather_data(ctx=None):
"""Fetch weather data from external API."""
api_key = ctx.config.get("weather_api_key") if ctx else None
response = requests.get(
"https://api.weather.com/v3/wx/forecast/daily/5day",
params={"apiKey": api_key, "format": "json"},
timeout=30,
)
response.raise_for_status()
data = response.json()
return pd.DataFrame(data["forecasts"])
Pattern 4: Data Quality Validation¶
Implement custom validation logic.
from seeknal.pipeline import transform
import pandas as pd
@transform(name="validated_orders")
def validated_orders(ctx):
"""Validate orders and flag issues."""
df = ctx.ref("source.raw_orders")
if not isinstance(df, pd.DataFrame):
df = df.df()
# Validation rules
df["is_valid"] = True
df["validation_errors"] = ""
# Rule 1: Amount must be positive
invalid_amount = df["amount"] <= 0
df.loc[invalid_amount, "is_valid"] = False
df.loc[invalid_amount, "validation_errors"] += "Invalid amount; "
# Rule 2: Customer ID must exist
missing_customer = df["customer_id"].isna()
df.loc[missing_customer, "is_valid"] = False
df.loc[missing_customer, "validation_errors"] += "Missing customer; "
# Rule 3: Date must not be in future
future_date = df["order_date"] > pd.Timestamp.now()
df.loc[future_date, "is_valid"] = False
df.loc[future_date, "validation_errors"] += "Future date; "
# Log validation summary
total_records = len(df)
invalid_records = (~df["is_valid"]).sum()
print(f"Validation: {invalid_records}/{total_records} invalid records")
return df
Pattern 5: Export to Multiple Formats¶
Export pipeline results to multiple destinations.
from seeknal.pipeline import transform
import pandas as pd
@transform(name="export_results")
def export_results(ctx):
"""Export customer segments to multiple formats."""
df = ctx.ref("transform.customer_segments")
if not isinstance(df, pd.DataFrame):
df = df.df()
# Export to CSV
csv_path = ctx.target_dir / "exports" / "segments.csv"
csv_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(csv_path, index=False)
# Export to Parquet
parquet_path = ctx.target_dir / "exports" / "segments.parquet"
df.to_parquet(parquet_path, index=False)
# Export to JSON
json_path = ctx.target_dir / "exports" / "segments.json"
df.to_json(json_path, orient="records", indent=2)
# Export summary statistics
summary = df.groupby("segment").agg({
"customer_id": "count",
"lifetime_value": ["mean", "sum"]
})
summary_path = ctx.target_dir / "exports" / "segment_summary.csv"
summary.to_csv(summary_path)
return df
Troubleshooting¶
Issue: Import Errors¶
Error: the pipeline file fails at import time with ModuleNotFoundError because a package it imports is not declared in its dependencies.
Solution: Add to PEP 723 dependencies:
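For example, if scikit-learn is the package that fails to import (the package name is illustrative), declare it in the script header at the top of the file:
# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "pandas>=2.0",
#     "scikit-learn>=1.3.0",
# ]
# ///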
Issue: Node Not Found¶
Error: ctx.ref() cannot resolve the requested node ID, so the upstream output is reported as not found.
Solutions:
1. Check node ID format: "kind.name" (e.g., "source.users")
2. Ensure upstream node executed first
3. Verify node is registered: seeknal run --show-plan
Issue: DuckDB Registration¶
Error: a SQL query fails because DuckDB does not recognize the table name it references.
Solution: Register DataFrames explicitly:
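Inside the node function, register each DataFrame under the name the SQL refers to (node and table names here are illustrative):
orders = ctx.ref("source.orders")
ctx.duckdb.register("orders", orders)
return ctx.duckdb.sql("SELECT * FROM orders").df()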
Or use the DataFrame directly:
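DuckDB can also resolve a local Python variable by name, so the table name in the query simply has to match the variable:
df = ctx.ref("source.orders")
return ctx.duckdb.sql("SELECT * FROM df").df()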
Issue: Type Mismatches¶
Error: a downstream step fails because the node did not return a pandas DataFrame.
Solution: Ensure functions return pandas DataFrames:
@transform(name="my_transform")
def my_transform(ctx):
# Always return DataFrame
result = {"col1": [1, 2], "col2": [3, 4]}
return pd.DataFrame(result)
Issue: Missing Context¶
Error: the function body tries to use ctx, but ctx was never declared as a parameter.
Solution: Ensure transform/feature_group functions accept ctx parameter:
# Correct
@transform(name="my_transform")
def my_transform(ctx): # ✅ ctx parameter
df = ctx.ref("source.data")
return df
# Incorrect
@transform(name="my_transform")
def my_transform(): # ❌ Missing ctx
# Cannot call ctx.ref() here
pass
Best Practices¶
1. Use Type Hints¶
from seeknal.pipeline import transform, PipelineContext
import pandas as pd
@transform(name="typed_transform")
def typed_transform(ctx: PipelineContext) -> pd.DataFrame:
"""Transform with explicit types."""
df: pd.DataFrame = ctx.ref("source.users")
return df
2. Add Docstrings¶
@transform(name="clean_users")
def clean_users(ctx):
"""Clean user data by removing invalid records.
Removes:
- Null emails
- Inactive accounts
- Test users
Returns:
DataFrame with cleaned user records
"""
df = ctx.ref("source.raw_users")
return ctx.duckdb.sql("""
SELECT * FROM df
WHERE email IS NOT NULL
AND active = true
AND email NOT LIKE '%@test.com'
""").df()
3. Use Explicit Inputs¶
# Good: Explicit dependencies
@transform(
name="join_data",
inputs=["source.users", "source.orders"],
)
def join_data(ctx):
users = ctx.ref("source.users")
orders = ctx.ref("source.orders")
# ...
4. Handle Errors Gracefully¶
@transform(name="safe_transform")
def safe_transform(ctx):
try:
df = ctx.ref("source.data")
except ValueError as e:
print(f"Warning: Could not load data: {e}")
return pd.DataFrame() # Return empty DataFrame
# Process data
return df
5. Log Progress¶
@transform(name="large_transform")
def large_transform(ctx):
df = ctx.ref("source.large_dataset")
print(f"Processing {len(df)} records...")
result = ctx.duckdb.sql("SELECT * FROM df WHERE active").df()
print(f"Filtered to {len(result)} active records")
return result
See Also¶
- Python Pipelines Tutorial - Step-by-step tutorial
- Mixed YAML + Python Tutorial - Combining YAML and Python
- YAML Pipeline Tutorial - YAML pipeline basics
- CLI Reference - Command-line interface documentation
- Iceberg Materialization - Data lakehouse integration