Getting Started with DuckDB in Seeknal¶
Last Updated: 2026-01-07
Engine: DuckDB (Pure Python, No JVM Required)
Table of Contents¶
- Introduction
- Why DuckDB?
- Installation
- Quick Start
- Core Concepts
- DuckDBTask API
- Transformers
- Aggregators
- Feature Store with DuckDB
- Flow Pipelines
- Migration from Spark
Introduction¶
Seeknal's DuckDB engine provides a lightweight, pure-Python alternative to Spark for data processing and feature engineering. It uses PyArrow Tables as its primary data structure and executes transformations using DuckDB SQL.
Key Benefits¶
- No JVM Required: Pure Python implementation, faster startup
- Better Performance: up to 10-20x faster on small to medium datasets (see benchmarks below)
- Lower Memory Footprint: Efficient columnar processing
- Easy Debugging: Pure Python stack traces
- API Parity: Same API as SparkEngineTask
When to Use DuckDB¶
✅ Use DuckDB when:
- Dataset < 100M rows
- Single-machine deployment
- Development/testing environment
- Rapid prototyping
- Cost-sensitive deployment
- Team prefers pure Python

❌ Use Spark when:
- Dataset > 100M rows
- Distributed processing required
- Existing Spark infrastructure
- Need Delta Lake features
- Production-scale workloads
Why DuckDB?¶
Performance Comparison¶
| Dataset Size | Spark Time | DuckDB Time | Speedup |
|---|---|---|---|
| 1K rows | ~2s | ~0.1s | 20x |
| 100K rows | ~5s | ~0.5s | 10x |
| 10M rows | ~30s | ~10s | 3x |
| 100M rows | ~2min | ~3min | 0.67x |
Architecture Comparison¶
| Aspect | Spark | DuckDB |
|---|---|---|
| Language | Scala/Java backend | Pure Python |
| Startup | JVM startup (slow) | Instant |
| Memory | High overhead | Low overhead |
| Data Format | PySpark DataFrame | PyArrow Table |
| SQL Engine | Spark SQL | DuckDB SQL |
| Distribution | Cluster | Single-node |
Installation¶
Prerequisites¶
- Python 3.11+
- No JVM or Spark installation required!
Install Dependencies¶
# Install Seeknal with DuckDB support
pip install seeknal[duckdb]
# Or install manually
pip install seeknal duckdb pyarrow pandas pydantic
Verify Installation¶
import duckdb
import pyarrow as pa
from seeknal.tasks.duckdb import DuckDBTask
print(f"DuckDB version: {duckdb.__version__}")
print("✓ DuckDB is ready!")
Quick Start¶
Basic Data Processing¶
import pandas as pd
import pyarrow as pa
from seeknal.tasks.duckdb import DuckDBTask
# 1. Create sample data
df = pd.DataFrame({
"user_id": ["A", "B", "C", "D", "E"],
"amount": [100, 200, 150, 250, 180],
"status": ["active", "inactive", "active", "active", "inactive"]
})
# 2. Convert to PyArrow Table
arrow_table = pa.Table.from_pandas(df)
# 3. Create DuckDBTask and process data
result = (
DuckDBTask(name="process_data")
.add_input(dataframe=arrow_table)
.add_filter_by_expr("status = 'active' AND amount > 100")
.add_new_column("amount * 1.1", "adjusted_amount")
.select_columns(["user_id", "amount", "adjusted_amount"])
.transform()
)
# 4. Result is a PyArrow Table
print(f"Processed {len(result)} rows")
print(result.to_pandas())
Output: two rows remain (users C and D, the active rows with amount > 100), each with an added adjusted_amount column equal to amount * 1.1.
Core Concepts¶
PyArrow Tables¶
Seeknal's DuckDB engine uses PyArrow Tables as the primary data structure:
import pyarrow as pa
import pandas as pd
# Convert pandas to PyArrow
df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
arrow_table = pa.Table.from_pandas(df)
# Convert back to pandas
df_back = arrow_table.to_pandas()
# PyArrow is zero-copy compatible with DuckDB
# No serialization overhead!
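As a quick illustration of this zero-copy integration (plain DuckDB here, not the Seeknal API), DuckDB's Python client can query a local PyArrow table directly by variable name:
import duckdb
import pyarrow as pa
tbl = pa.table({"x": [1, 2, 3]})
# DuckDB scans the in-memory Arrow table in place; no copy or serialization step
total = duckdb.sql("SELECT SUM(x) AS total FROM tbl").arrow()
print(total)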
DuckDBTask¶
The DuckDBTask class represents a data transformation pipeline:
from seeknal.tasks.duckdb import DuckDBTask
task = DuckDBTask(name="my_task")
# Add input data
task.add_input(dataframe=arrow_table)
# OR
task.add_input(path="/path/to/data.parquet")
# OR
task.add_input(sql="SELECT * FROM table")
# Add transformations
task.add_sql("SELECT * FROM __THIS__ WHERE amount > 100")
task.add_new_column("amount * 1.1", "adjusted")
task.add_filter_by_expr("status = 'active'")
# Execute pipeline
result = task.transform()
The __THIS__ Placeholder¶
DuckDB uses __THIS__ as a placeholder for the current dataset in SQL statements:
# These are equivalent:
task.add_sql("SELECT * FROM __THIS__ WHERE amount > 100")
# Gets replaced with:
# SELECT * FROM _input_table WHERE amount > 100
DuckDBTask API¶
Input Methods¶
from seeknal.tasks.duckdb import DuckDBTask
import pyarrow as pa
task = DuckDBTask(name="example")
# Method 1: PyArrow Table (recommended)
arrow_table = pa.Table.from_pandas(df)
task.add_input(dataframe=arrow_table)
# Method 2: File path (Parquet, CSV, etc.)
task.add_input(path="/data/my_data.parquet")
# Method 3: Raw SQL query
task.add_input(sql="SELECT * FROM source_table WHERE date > '2024-01-01'")
Transformation Methods¶
from seeknal.tasks.duckdb import DuckDBTask
task = DuckDBTask(name="transform")
# 1. SQL transformations
task.add_sql("SELECT * FROM __THIS__ WHERE amount > 100")
# 2. Add computed columns
task.add_new_column("amount * 1.1", "adjusted_amount")
task.add_new_column("CASE WHEN status = 'active' THEN 1 ELSE 0 END", "is_active")
# 3. Filter rows
task.add_filter_by_expr("status = 'active'")
task.add_filter_by_expr("amount > 100 AND category = 'food'")
# 4. Select specific columns
task.select_columns(["user_id", "amount", "date"])
# 5. Drop columns
task.drop_columns(["temp_column", "debug_info"])
# Execute
result = task.transform()
Using with Context Manager¶
with DuckDBTask(name="example") as task:
result = (task
.add_input(dataframe=arrow_table)
.add_filter_by_expr("amount > 100")
.transform()
)
Return Types¶
# Return as PyArrow Table (default)
result = task.transform() # Returns PyArrow Table
# Return as pandas DataFrame
result = task.transform(params={"return_as_pandas": True}) # Returns pd.DataFrame
# Return self for chaining
result = task.transform(materialize=True) # Returns DuckDBTask
Transformers¶
Transformers apply individual transformation steps to the dataset: renaming, casting, or adding columns, filtering rows, joining tables, and computing window functions.
Simple Transformers¶
1. SQL Transformer¶
Execute arbitrary SQL:
from seeknal.tasks.duckdb.transformers import SQL
transformer = SQL(
statement="SELECT user_id, amount * 1.1 AS adjusted FROM __THIS__ WHERE amount > 100"
)
2. ColumnRenamed¶
Rename a column:
from seeknal.tasks.duckdb.transformers import ColumnRenamed
transformer = ColumnRenamed(
inputCol="old_name",
outputCol="new_name"
)
3. AddColumnByExpr¶
Add computed column:
from seeknal.tasks.duckdb.transformers import AddColumnByExpr
transformer = AddColumnByExpr(
expression="amount * 1.1",
outputCol="adjusted_amount"
)
4. FilterByExpr¶
Filter rows:
from seeknal.tasks.duckdb.transformers import FilterByExpr
transformer = FilterByExpr(
expression="status = 'active' AND amount > 100"
)
5. SelectColumns / DropCols¶
from seeknal.tasks.duckdb.transformers import SelectColumns, DropCols
# Keep only these columns
select = SelectColumns(inputCols=["user_id", "amount", "date"])
# Drop these columns
drop = DropCols(inputCols=["temp_col", "debug_info"])
Medium Transformers¶
6. JoinTablesByExpr¶
Join multiple tables:
from seeknal.tasks.duckdb.transformers import JoinTablesByExpr, TableJoinDef, JoinType
transformer = JoinTablesByExpr(
select_stm="a.*, b.value2",
alias="a",
tables=[
TableJoinDef(
table="other_table",
alias="b",
joinType=JoinType.LEFT,
joinExpression="a.id = b.id"
)
]
)
7. CastColumn¶
Change data type:
from seeknal.tasks.duckdb.transformers import CastColumn
transformer = CastColumn(
inputCol="amount",
outputCol="amount_int",
dataType="INTEGER"
)
Complex Transformers¶
8. AddWindowFunction¶
Add window function column:
from seeknal.tasks.duckdb.transformers import AddWindowFunction, WindowFunction
# LAG function
lag = AddWindowFunction(
inputCol="amount",
offset=1,
windowFunction=WindowFunction.LAG,
partitionCols=["user_id"],
orderCols=["date"],
outputCol="prev_amount"
)
# ROW_NUMBER
row_num = AddWindowFunction(
inputCol="amount",
windowFunction=WindowFunction.ROW_NUMBER,
partitionCols=["user_id"],
orderCols=["date"],
outputCol="row_num"
)
# Running total
running_sum = AddWindowFunction(
inputCol="amount",
windowFunction=WindowFunction.SUM,
partitionCols=["user_id"],
orderCols=["date"],
outputCol="running_total"
)
Using Transformers with DuckDBTask¶
from seeknal.tasks.duckdb import DuckDBTask
from seeknal.tasks.duckdb.transformers import AddWindowFunction, WindowFunction
task = DuckDBTask(name="window_example")
# Using convenience method
task.add_new_column("amount * 1.1", "adjusted")
# Using add_stage with transformer
lag = AddWindowFunction(
inputCol="amount",
offset=1,
windowFunction=WindowFunction.LAG,
partitionCols=["user_id"],
orderCols=["date"],
outputCol="prev_amount"
)
task.add_stage(transformer=lag)
result = task.transform()
Aggregators¶
Aggregators group data and compute summary statistics.
Simple Aggregators¶
1. FunctionAggregator¶
Standard SQL functions:
from seeknal.tasks.duckdb.aggregators import DuckDBAggregator, FunctionAggregator
aggregator = DuckDBAggregator(
group_by_cols=["user_id", "category"],
aggregators=[
FunctionAggregator(
inputCol="amount",
outputCol="total_amount",
accumulatorFunction="sum"
),
FunctionAggregator(
inputCol="amount",
outputCol="avg_amount",
accumulatorFunction="avg"
),
FunctionAggregator(
inputCol="amount",
outputCol="count_rows",
accumulatorFunction="count"
),
FunctionAggregator(
inputCol="amount",
outputCol="min_amount",
accumulatorFunction="min"
),
FunctionAggregator(
inputCol="amount",
outputCol="max_amount",
accumulatorFunction="max"
),
]
)
2. ExpressionAggregator¶
Custom SQL expressions:
from seeknal.tasks.duckdb.aggregators import ExpressionAggregator
aggregator = DuckDBAggregator(
group_by_cols=["user_id"],
aggregators=[
ExpressionAggregator(
expression="SUM(CASE WHEN status = 'active' THEN amount ELSE 0 END)",
outputCol="active_sum"
),
ExpressionAggregator(
expression="COUNT(DISTINCT category)",
outputCol="category_count"
)
]
)
3. DayTypeAggregator¶
Weekday/weekend filtering:
from seeknal.tasks.duckdb.aggregators import DayTypeAggregator
aggregator = DuckDBAggregator(
group_by_cols=["user_id"],
aggregators=[
DayTypeAggregator(
inputCol="transactions",
outputCol="weekend_sum",
accumulatorFunction="sum",
weekdayCol="is_weekend" # Boolean column
)
]
)
Complex Aggregators¶
4. LastNDaysAggregator¶
Time-based windowing:
from seeknal.tasks.duckdb.aggregators import LastNDaysAggregator, FunctionAggregator
aggregator = LastNDaysAggregator(
group_by_cols=["user_id"],
window=7, # 7-day window
date_col="date_id",
date_pattern="yyyyMMdd", # or "yyyy-MM-dd"
aggregators=[
FunctionAggregator(
inputCol="amount",
outputCol="amount_sum_7d",
accumulatorFunction="sum"
),
FunctionAggregator(
inputCol="transactions",
outputCol="transaction_count_7d",
accumulatorFunction="count"
)
]
)
Using Aggregators with DuckDBTask¶
from seeknal.tasks.duckdb import DuckDBTask
from seeknal.tasks.duckdb.aggregators import DuckDBAggregator, FunctionAggregator
# Create aggregator
agg = DuckDBAggregator(
group_by_cols=["user_id"],
aggregators=[
FunctionAggregator(
inputCol="amount",
outputCol="total",
accumulatorFunction="sum"
)
]
)
# Use with DuckDBTask
result = (
DuckDBTask(name="aggregate")
.add_input(dataframe=arrow_table)
.add_filter_by_expr("amount > 0")
.add_stage(aggregator=agg) # Add aggregator as a stage
.transform()
)
5. SecondOrderAggregator¶
Advanced feature engineering with multiple aggregation rules and time-based filtering.
What is Second Order Aggregation?
Second Order Aggregation allows you to create complex features by applying multiple aggregation rules with different time windows and conditions. It's particularly useful for:
- Customer Behavior Analysis: Compare spending in different time periods
- Risk Scoring: Calculate ratios of recent to historical activity
- Churn Prediction: Track changes in behavior patterns over time
- Credit Scoring: Compare recent vs. historical transaction patterns
Supported Rule Types:
- basic - Simple aggregation over all history
- basic_days - Aggregation within a time window (days since application date)
- ratio - Ratio of two aggregations over different time windows
- since - Aggregation filtered by custom condition
Basic Usage¶
from seeknal.tasks.duckdb.aggregators.second_order_aggregator import (
SecondOrderAggregator,
AggregationSpec
)
import pandas as pd
import pyarrow as pa
# Prepare transaction data with dates
df = pd.DataFrame({
"user_id": ["A", "A", "A", "A", "B", "B", "B", "B"],
"application_date": ["2024-01-15", "2024-01-15", "2024-01-15", "2024-01-15",
"2024-01-15", "2024-01-15", "2024-01-15", "2024-01-15"],
"transaction_date": ["2024-01-01", "2024-01-05", "2024-01-10", "2024-01-14",
"2024-01-02", "2024-01-06", "2024-01-11", "2024-01-13"],
"amount": [100, 150, 200, 250, 80, 120, 90, 110],
"category": ["food", "travel", "food", "entertainment",
"food", "travel", "food", "entertainment"]
})
arrow_table = pa.Table.from_pandas(df)
# Create aggregator
agg = SecondOrderAggregator(
idCol="user_id",
featureDateCol="transaction_date",
featureDateFormat="yyyy-MM-dd",
applicationDateCol="application_date",
applicationDateFormat="yyyy-MM-dd"
)
# Define aggregation rules
rules = [
# Rule 1: Basic aggregation - total amount over all history
AggregationSpec(
name="basic",
features="amount",
aggregations="sum"
),
# Rule 2: Time-windowed aggregation - amount in last 1-7 days
AggregationSpec(
name="basic_days",
features="amount",
aggregations="sum",
dayLimitLower1="1",
dayLimitUpper1="7"
),
# Rule 3: Ratio - compare recent (1-7 days) vs. historical (8-30 days)
AggregationSpec(
name="ratio",
features="amount",
aggregations="sum",
dayLimitLower1="1",
dayLimitUpper1="7",
dayLimitLower2="8",
dayLimitUpper2="30"
),
# Rule 4: Conditional aggregation - count since condition is met
AggregationSpec(
name="since",
features="amount",
aggregations="count",
filterCondition="amount > 100"
)
]
# Set rules and transform
agg.setRules(rules)
# Register table and execute
import duckdb
conn = duckdb.connect()
conn.register("transactions", arrow_table)
result = agg.transform("transactions")
# Get result as pandas DataFrame
features_df = result.df()
print(features_df)
Output:
user_id amount_SUM amount_SUM_1_7 amount_SUM1_7_amount_SUM8_30 SINCE_COUNT_amount_GEO_D
0 A 700 450 1.8 3
1 B 400 200 1.0 0
Explanation:
- amount_SUM: Total amount for each user (all history)
- amount_SUM_1_7: Sum of amounts in days 1-7 before application date
- amount_SUM1_7_amount_SUM8_30: Ratio of recent (1-7 days) to historical (8-30 days) spending
- SINCE_COUNT_amount_GEO_D: Count of transactions where amount > 100
Multiple Features and Aggregations¶
# Define complex rules with multiple features and aggregations
rules = [
# Total spending and count for multiple categories
AggregationSpec(
name="basic_days",
features="amount, transaction_count",
aggregations="sum, count",
dayLimitLower1="1",
dayLimitUpper1="30"
),
# Ratio of spending in two time windows
AggregationSpec(
name="ratio",
features="amount",
aggregations="sum, mean",
dayLimitLower1="1",
dayLimitUpper1="7",
dayLimitLower2="8",
dayLimitUpper2="30"
)
]
agg.setRules(rules)
result = agg.transform("transactions")
features_df = result.df()
# Output columns:
# - amount_SUM_1_30
# - amount_COUNT_1_30
# - transaction_count_SUM_1_30
# - transaction_count_COUNT_1_30
# - amount_SUM1_7_amount_SUM8_30
# - amount_MEAN1_7_amount_MEAN8_30
Real-World Example: Credit Scoring Features¶
# Transaction data for credit scoring
df = pd.DataFrame({
"customer_id": ["C001", "C001", "C001", "C001", "C001",
"C002", "C002", "C002", "C002", "C002"],
"application_date": ["2024-01-15"] * 10,
"transaction_date": ["2024-01-01", "2024-01-05", "2024-01-10", "2024-01-12", "2024-01-14",
"2024-01-02", "2024-01-06", "2024-01-09", "2024-01-11", "2024-01-13"],
"transaction_amount": [5000, 3000, 2000, 4000, 6000,
1000, 1500, 1200, 1800, 2000],
"is_large_transaction": [1, 0, 0, 1, 1, 0, 0, 0, 0, 0]
})
arrow_table = pa.Table.from_pandas(df)
# Create credit scoring features
agg = SecondOrderAggregator(
idCol="customer_id",
featureDateCol="transaction_date",
featureDateFormat="yyyy-MM-dd",
applicationDateCol="application_date",
applicationDateFormat="yyyy-MM-dd"
)
# Credit scoring rules
credit_rules = [
# Recent activity (last 7 days)
AggregationSpec(
name="basic_days",
features="transaction_amount",
aggregations="sum, count, mean",
dayLimitLower1="1",
dayLimitUpper1="7"
),
# Historical activity (8-30 days)
AggregationSpec(
name="basic_days",
features="transaction_amount",
aggregations="sum, count",
dayLimitLower1="8",
dayLimitUpper1="30"
),
# Ratio: Recent vs Historical (behavior change indicator)
AggregationSpec(
name="ratio",
features="transaction_amount",
aggregations="sum",
dayLimitLower1="1",
dayLimitUpper1="7",
dayLimitLower2="8",
dayLimitUpper2="30"
),
# Large transaction count (risk indicator)
AggregationSpec(
name="since",
features="is_large_transaction",
aggregations="sum",
filterCondition="is_large_transaction == 1"
)
]
agg.setRules(credit_rules)
conn = duckdb.connect()
conn.register("credit_transactions", arrow_table)
result = agg.transform("credit_transactions")
credit_features = result.df()
print(credit_features)
Output:
customer_id transaction_amount_SUM_1_7 transaction_amount_COUNT_1_7 ... SINCE_SUM_is_large_transaction_GEO_D
0 C001 15000 3 ... 2
1 C002 5000 2 ... 0
Feature Interpretation:
- High recent transaction sum + high ratio = Active customer (good for credit limit increase)
- Low ratio (recent/historical) = Declining activity (potential churn risk)
- High large transaction count = High-value customer (or potential fraud risk)
Using Builder Pattern (Fluent API)¶
The improved developer experience uses a fluent builder pattern with .feature() as the starting point:
from seeknal.tasks.duckdb.aggregators.second_order_aggregator import FeatureBuilder
# Use builder pattern for complex rule definitions
agg = (
SecondOrderAggregator(
idCol="user_id",
featureDateCol="transaction_date",
applicationDateCol="application_date"
)
.builder()
.feature("amount")
.basic(["sum", "count"])
.rolling(days=[(1, 7), (8, 30)], aggs=["sum", "mean"])
.ratio(numerator=(1, 7), denominator=(8, 30), aggs=["sum"])
.since(condition="amount > 100", aggs=["count"])
.build()
)
result = agg.transform("transactions")
Builder Pattern Methods:
- .feature(name) - Set the feature column to aggregate
- .basic(aggs) - Add basic aggregations (all history)
- .rolling(days, aggs) - Add time-windowed aggregations
- .ratio(numerator, denominator, aggs) - Add ratio of two time windows
- .since(condition, aggs) - Add conditional aggregations
- .build() - Build and return the aggregator
Complete Example with Builder:
from seeknal.tasks.duckdb.aggregators.second_order_aggregator import (
SecondOrderAggregator,
FeatureBuilder
)
agg = (
SecondOrderAggregator(
idCol="customer_id",
featureDateCol="transaction_date",
applicationDateCol="application_date"
)
.builder()
# Amount feature aggregations
.feature("amount")
.basic(["sum", "count", "mean"]) # All history
.rolling(days=[(1, 7), (8, 30)], aggs=["sum"]) # Recent vs historical
.ratio(
numerator=(1, 7), # Recent: 1-7 days
denominator=(8, 30), # Historical: 8-30 days
aggs=["sum"]
)
.since(condition="amount > 1000", aggs=["count"]) # Large transactions
# Transaction count aggregations
.feature("transaction_count")
.basic(["count"])
.rolling(days=[(1, 7)], aggs=["sum"])
.build()
)
# Validate and execute
errors = agg.validate("transactions")
if errors:
print("Errors:", errors)
else:
result = agg.transform("transactions")
features_df = result.df()
Why Builder Pattern is Better:
- ✅ Type-safe: Methods check for valid inputs
- ✅ Readable: Clear structure with indentation
- ✅ Maintainable: Easy to add/remove rules
- ✅ Feature-grouped: All rules for a feature are together
- ✅ Less repetitive: Don't repeat feature name in each rule
Comparison:
# OLD WAY (repetitive)
rules = [
AggregationSpec("basic", "amount", "sum"),
AggregationSpec("basic", "amount", "count"),
AggregationSpec("basic_days", "amount", "sum", "", 1, 7),
AggregationSpec("basic_days", "amount", "count", "", 1, 7),
AggregationSpec("ratio", "amount", "sum", "", 1, 7, 8, 30),
]
# NEW WAY (builder pattern)
agg = (
    agg.builder()
    .feature("amount")
    .basic(["sum", "count"])
    .rolling(days=[(1, 7)], aggs=["sum", "count"])
    .ratio(numerator=(1, 7), denominator=(8, 30), aggs=["sum"])
    .build()
)
Best Practices for Second Order Aggregation¶
- Choose Appropriate Time Windows:
  - Short-term (1-7 days): Recent behavior patterns
  - Medium-term (8-30 days): Medium-term trends
  - Long-term (31-90 days): Historical baseline
- Use Ratios for Trend Analysis:
# Behavior change detection
AggregationSpec(
    name="ratio",
    features="amount",
    aggregations="sum",
    dayLimitLower1="1",   # Recent: 1-7 days
    dayLimitUpper1="7",
    dayLimitLower2="8",   # Historical: 8-30 days
    dayLimitUpper2="30"
)
# Ratio > 1.0: Increasing activity (good)
# Ratio < 1.0: Decreasing activity (churn risk)
- Combine Multiple Rules:
rules = [
    # Volume metrics
    AggregationSpec("basic_days", "amount", "sum", "", 1, 30),
    # Frequency metrics
    AggregationSpec("basic_days", "transaction_id", "count", "", 1, 30),
    # Trend metrics
    AggregationSpec("ratio", "amount", "sum", "", 1, 7, 8, 30),
    # Quality metrics
    AggregationSpec("since", "large_amount", "count", "large_amount == 1")
]
- Validate Input Data (see the sketch below):
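A minimal validation sketch, reusing the validate() call from the builder example above; the "transactions" table name is assumed to already be registered on the DuckDB connection:
# Check the configured rules against the input table before executing
errors = agg.validate("transactions")
if errors:
    print("Validation errors:", errors)
else:
    result = agg.transform("transactions")
    features_df = result.df()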
Feature Store with DuckDB¶
Seeknal's Feature Store provides feature versioning, materialization, and serving. The DuckDB implementation uses Parquet files for storage.
Basic Feature Group¶
from seeknal.featurestore.duckdbengine.feature_group import (
FeatureGroupDuckDB,
Materialization,
Entity
)
from seeknal.project import Project
import pandas as pd
# Create project
project = Project(name="my_project")
project.get_or_create()
# Define entity
entity = Entity(
name="user",
join_keys=["user_id"],
description="User entity"
)
# Define feature group
fg = FeatureGroupDuckDB(
name="user_features",
entity=entity,
materialization=Materialization(
event_time_col="timestamp",
offline_store_path="/features/user_features",
online_store_path="/features/user_features_online"
),
project="my_project"
)
# Set features
df = pd.DataFrame({
"user_id": ["A", "B", "C"],
"timestamp": pd.date_range("2024-01-01", periods=3),
"total_amount": [1000, 2000, 1500],
"transaction_count": [10, 20, 15],
"avg_amount": [100.0, 100.0, 100.0]
})
fg.set_dataframe(df).set_features()
# Write feature group
fg.write(feature_start_time=pd.Timestamp("2024-01-01"))
Historical Features (Point-in-Time Joins)¶
Use FeatureFrame.pit_join() in transforms for point-in-time joins:
from seeknal.pipeline import transform
@transform(name="training_data")
def training_data(ctx):
# Get spine with entity keys and prediction dates
spine = ctx.ref("source.application_labels") # user_id, application_date, label
# PIT join: features as of each application_date
features_df = ctx.ref("feature_group.user_features").pit_join(
spine=spine,
date_col="application_date",
keep_cols=["label"],
)
return features_df
Online Features¶
Use ctx.features() for real-time feature lookup:
from seeknal.pipeline import transform
@transform(name="predictions")
def predictions(ctx):
# Get latest features from entity store
features = ctx.features("user", [
"user_features.total_amount",
"user_features.transaction_count",
"user_features.avg_amount",
])
# features is a DataFrame with latest values for all users
print(features.head())
return features
Feature Group Versioning¶
# List versions
versions = fg.list_versions()
print(f"Found {len(versions)} versions")
# Get specific version
fg_v1 = fg.get_version(version=1)
# Compare versions
diff = fg.compare_versions(from_version=1, to_version=2)
# Materialize specific version
fg.write(
feature_start_time=pd.Timestamp("2024-01-01"),
version=1 # Materialize version 1
)
Flow Pipelines¶
The Flow class orchestrates multi-stage data pipelines with support for mixed Spark and DuckDB tasks.
Pure DuckDB Pipeline¶
from seeknal.flow import Flow, FlowInput, FlowOutput, FlowInputEnum, FlowOutputEnum
from seeknal.tasks.duckdb import DuckDBTask
import pyarrow as pa
# Create input data
df = pd.DataFrame({
"user_id": ["A", "B", "C", "D", "E"],
"amount": [100, 200, 150, 250, 180],
"category": ["food", "travel", "food", "entertainment", "travel"]
})
arrow_table = pa.Table.from_pandas(df)
# Create Flow
flow = Flow(
name="duckdb_pipeline",
input=FlowInput(
kind=FlowInputEnum.SOURCE, # Use SOURCE for PyArrow Tables
value=arrow_table
),
tasks=[
DuckDBTask(name="filter")
.add_filter_by_expr("amount > 100"),
DuckDBTask(name="transform")
.add_new_column("amount * 0.9", "discounted_amount"),
DuckDBTask(name="select")
.select_columns(["user_id", "amount", "discounted_amount"])
],
output=FlowOutput(kind=FlowOutputEnum.ARROW_DATAFRAME)
)
# Save flow
flow.get_or_create()
# Run pipeline
result = flow.run()
print(f"Result: {len(result)} rows")
Pipeline with Aggregations¶
from seeknal.tasks.duckdb.aggregators import DuckDBAggregator, FunctionAggregator
# Create aggregator
agg = DuckDBAggregator(
group_by_cols=["category"],
aggregators=[
FunctionAggregator(
inputCol="amount",
outputCol="total_amount",
accumulatorFunction="sum"
),
FunctionAggregator(
inputCol="amount",
outputCol="avg_amount",
accumulatorFunction="avg"
)
]
)
# Create flow with aggregation
flow = Flow(
name="aggregation_pipeline",
input=FlowInput(kind=FlowInputEnum.SOURCE, value=arrow_table),
tasks=[
DuckDBTask(name="aggregate").add_stage(aggregator=agg)
],
output=FlowOutput(kind=FlowOutputEnum.ARROW_DATAFRAME)
)
result = flow.run()
Mixed Spark/DuckDB Pipeline¶
from seeknal.tasks.sparkengine import SparkEngineTask
# Flow automatically detects if Spark is needed
flow = Flow(
name="mixed_pipeline",
input=FlowInput(
kind=FlowInputEnum.HIVE_TABLE,
value="source_table"
),
tasks=[
# Spark task for initial processing
SparkEngineTask()
.add_sql("SELECT * FROM __THIS__ WHERE date = '2024-01-07'"),
# DuckDB task for feature engineering
DuckDBTask(name="feature_eng")
.add_sql("SELECT user_id, amount * 1.1 AS adjusted FROM __THIS__")
],
output=FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME)
)
# Flow will create SparkSession only because SparkEngineTask is present
result = flow.run()
Flow with Feature Groups¶
from seeknal.featurestore.duckdbengine.feature_group import (
FeatureGroupDuckDB,
Materialization
)
# Create feature group
fg = FeatureGroupDuckDB(
name="user_features",
entity=Entity(name="user", join_keys=["user_id"]),
materialization=Materialization(
event_time_col="timestamp",
offline_store_path="/features/user"
),
project="my_project"
)
# Use feature group in flow
flow = Flow(
name="feature_pipeline",
input=FlowInput(
kind=FlowInputEnum.FEATURE_GROUP,
value=fg
),
tasks=[
DuckDBTask(name="add_features")
.add_sql("SELECT user_id, feature1, feature2 FROM __THIS__")
],
output=FlowOutput()
)
result = flow.run()
Migration from Spark¶
Step-by-Step Migration¶
Step 1: Change Imports¶
Before (Spark):
from seeknal.tasks.sparkengine import SparkEngineTask
from seeknal.tasks.sparkengine import transformers as T
from seeknal.tasks.sparkengine import aggregators as G
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
After (DuckDB):
from seeknal.tasks.duckdb import DuckDBTask
from seeknal.tasks.duckdb import transformers as T
from seeknal.tasks.duckdb import aggregators as G
import pyarrow as pa
# No SparkSession needed!
Step 2: Update Data Types¶
Before:
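# PySpark DataFrame (as in the complete migration example below)
df = spark.read.parquet("/data/my_data.parquet")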
After:
# PyArrow Table
import pandas as pd
df = pd.read_parquet("/data/my_data.parquet")
arrow_table = pa.Table.from_pandas(df)
Step 3: Update Task Initialization¶
Before:
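# Spark task (see the complete example below)
task = SparkEngineTask()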
After:
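# DuckDB task takes a name
task = DuckDBTask(name="my_task")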
Step 4: Update Output Handling¶
Before:
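# Spark: display the DataFrame
result.show()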
After:
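# DuckDB: result is a PyArrow Table
print(result.to_pandas())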
Complete Migration Example¶
Before (Spark):
from pyspark.sql import SparkSession
from seeknal.tasks.sparkengine import SparkEngineTask
from seeknal.tasks.sparkengine.aggregators import Aggregator, FunctionAggregator
spark = SparkSession.builder.getOrCreate()
# Read data
df = spark.read.parquet("/data/transactions.parquet")
# Create task
task = SparkEngineTask()
result = (
task
.add_input(dataframe=df)
.add_sql("SELECT * FROM __THIS__ WHERE amount > 100")
.add_stage(
aggregator=Aggregator(
group_by_cols=["user_id"],
aggregators=[
FunctionAggregator(
inputCol="amount",
outputCol="total",
accumulatorFunction="sum"
)
]
)
)
.transform()
)
# Show result
result.show()
After (DuckDB):
import pandas as pd
import pyarrow as pa
from seeknal.tasks.duckdb import DuckDBTask
from seeknal.tasks.duckdb.aggregators import DuckDBAggregator, FunctionAggregator
# Read data
df = pd.read_parquet("/data/transactions.parquet")
arrow_table = pa.Table.from_pandas(df)
# Create task
task = DuckDBTask(name="aggregate")
result = (
task
.add_input(dataframe=arrow_table)
.add_sql("SELECT * FROM __THIS__ WHERE amount > 100")
.add_stage(
aggregator=DuckDBAggregator(
group_by_cols=["user_id"],
aggregators=[
FunctionAggregator(
inputCol="amount",
outputCol="total",
accumulatorFunction="sum"
)
]
)
)
.transform()
)
# Show result
print(result.to_pandas())
Changes Summary:
1. ✓ Changed imports (SparkEngineTask → DuckDBTask)
2. ✓ Changed data format (PySpark DataFrame → PyArrow Table)
3. ✓ Changed aggregator class (Aggregator → DuckDBAggregator)
4. ✓ Changed output handling (.show() → .to_pandas())
API Compatibility Matrix¶
| Feature | Spark | DuckDB | Notes |
|---|---|---|---|
| SQL transformations | ✓ | ✓ | Same SQL syntax |
| Column operations | ✓ | ✓ | Same API |
| Filtering | ✓ | ✓ | Same API |
| Aggregations | ✓ | ✓ | Same API |
| Joins | ✓ | ✓ | Same API |
| Window functions | ✓ | ✓ | Same API |
| Feature Store | ✓ | ✓ | Same API |
Advanced Examples¶
Example 1: Customer Churn Features¶
from seeknal.tasks.duckdb import DuckDBTask
from seeknal.tasks.duckdb.aggregators import DuckDBAggregator, FunctionAggregator, ExpressionAggregator
from seeknal.tasks.duckdb.transformers import AddWindowFunction, WindowFunction
import pandas as pd
import pyarrow as pa
# Sample transaction data
df = pd.DataFrame({
"customer_id": ["A", "A", "A", "B", "B", "B"],
"transaction_date": ["2024-01-01", "2024-01-05", "2024-01-10",
"2024-01-02", "2024-01-06", "2024-01-11"],
"amount": [100, 150, 200, 80, 120, 90],
"category": ["food", "travel", "food", "food", "travel", "food"]
})
arrow_table = pa.Table.from_pandas(df)
# Pipeline: Calculate churn features
# Step 1: Add LAG features (previous transaction amount)
lag = AddWindowFunction(
inputCol="amount",
offset=1,
windowFunction=WindowFunction.LAG,
partitionCols=["customer_id"],
orderCols=["transaction_date"],
outputCol="prev_amount"
)
# Step 2: Add previous transaction date and days since last transaction
prev_date = AddWindowFunction(
    inputCol="transaction_date",
    offset=1,
    windowFunction=WindowFunction.LAG,
    partitionCols=["customer_id"],
    orderCols=["transaction_date"],
    outputCol="prev_transaction_date"
)
task = (
    DuckDBTask(name="churn_pipeline")
    .add_input(dataframe=arrow_table)
    .add_stage(transformer=lag)
    .add_stage(transformer=prev_date)
    .add_new_column(
        "DATEDIFF('day', CAST(prev_transaction_date AS DATE), CAST(transaction_date AS DATE))",
        "days_since_last"
    )
)
# Step 3: Aggregate per customer
agg = DuckDBAggregator(
group_by_cols=["customer_id"],
aggregators=[
FunctionAggregator(inputCol="amount", outputCol="total_amount", accumulatorFunction="sum"),
FunctionAggregator(inputCol="amount", outputCol="avg_amount", accumulatorFunction="avg"),
FunctionAggregator(inputCol="amount", outputCol="transaction_count", accumulatorFunction="count"),
ExpressionAggregator(
expression="MAX(days_since_last)",
outputCol="recency_days"
)
]
)
result = task.add_stage(aggregator=agg).transform()
print(result.to_pandas())
Example 2: Time-Series Features¶
from seeknal.tasks.duckdb.aggregators import LastNDaysAggregator, FunctionAggregator
# Sensor data with timestamps
df = pd.DataFrame({
"sensor_id": ["S1", "S1", "S1", "S1", "S2", "S2", "S2"],
"timestamp": pd.date_range("2024-01-01", periods=7),
"temperature": [20.5, 21.0, 20.8, 21.2, 19.5, 19.8, 20.1],
"humidity": [65, 68, 67, 70, 62, 64, 63]
})
# Format date as string for LastNDaysAggregator
df["date_id"] = df["timestamp"].dt.strftime("%Y%m%d")
arrow_table = pa.Table.from_pandas(df)
# 7-day rolling aggregation
agg = LastNDaysAggregator(
group_by_cols=["sensor_id"],
window=7,
date_col="date_id",
date_pattern="yyyyMMdd",
aggregators=[
FunctionAggregator(
inputCol="temperature",
outputCol="avg_temp_7d",
accumulatorFunction="avg"
),
FunctionAggregator(
inputCol="humidity",
outputCol="avg_humidity_7d",
accumulatorFunction="avg"
)
]
)
result = (
DuckDBTask(name="sensor_features")
.add_input(dataframe=arrow_table)
.add_stage(aggregator=agg)
.transform()
)
print(result.to_pandas())
Example 3: Feature Engineering Pipeline¶
# Complete pipeline: Clean → Transform → Aggregate → Select
import numpy as np
df = pd.DataFrame({
"user_id": ["A", "A", "B", "B", "C", "C"] * 10,
"transaction_amount": np.random.randint(10, 500, 60),
"category": np.random.choice(["food", "travel", "entertainment"], 60),
"date": pd.date_range("2024-01-01", periods=60)
})
arrow_table = pa.Table.from_pandas(df)
# Multi-stage pipeline
result = (
DuckDBTask(name="feature_engineering")
.add_input(dataframe=arrow_table)
# Stage 1: Clean data
.add_filter_by_expr("transaction_amount > 0")
# Stage 2: Feature engineering
.add_new_column("transaction_amount * 0.9", "after_discount")
.add_new_column(
"CASE WHEN category = 'food' THEN 1 WHEN category = 'travel' THEN 2 ELSE 3 END",
"category_code"
)
# Stage 3: Aggregation
.add_stage(
aggregator=DuckDBAggregator(
group_by_cols=["user_id", "category_code"],
aggregators=[
FunctionAggregator(
inputCol="transaction_amount",
outputCol="total_spent",
accumulatorFunction="sum"
),
FunctionAggregator(
inputCol="after_discount",
outputCol="total_discounted",
accumulatorFunction="sum"
),
FunctionAggregator(
inputCol="transaction_amount",
outputCol="transaction_count",
accumulatorFunction="count"
)
]
)
)
# Stage 4: Final selection
.select_columns(["user_id", "category_code", "total_spent", "transaction_count"])
.transform()
)
print(f"Final result: {len(result)} aggregated rows")
Performance Tips¶
1. Use Parquet Files¶
# Parquet is faster than CSV for columnar operations
task.add_input(path="/data/my_data.parquet") # ✓ Fast
task.add_input(path="/data/my_data.csv") # ✗ Slower
2. Filter Early¶
# Good: Filter first, then transform
task.add_filter_by_expr("amount > 100")
task.add_new_column("amount * 1.1", "adjusted")
# Bad: Transform all data, then filter
task.add_new_column("amount * 1.1", "adjusted")
task.add_filter_by_expr("amount > 100")
3. Use Aggregations Instead of Window Functions When Possible¶
# Good: Simple aggregation
agg = DuckDBAggregator(
group_by_cols=["user_id"],
aggregators=[
FunctionAggregator(inputCol="amount", outputCol="total", accumulatorFunction="sum")
]
)
# Less efficient: Window function when you just need aggregation
win = AddWindowFunction(
inputCol="amount",
windowFunction=WindowFunction.SUM,
partitionCols=["user_id"],
outputCol="total"
)
4. Batch Operations¶
# Good: Single SQL statement with multiple operations
task.add_sql("""
SELECT
user_id,
amount * 1.1 AS adjusted,
CASE WHEN amount > 100 THEN 'high' ELSE 'low' END AS tier
FROM __THIS__
WHERE amount > 50
""")
# Less efficient: Multiple stages
task.add_filter_by_expr("amount > 50")
task.add_new_column("amount * 1.1", "adjusted")
task.add_new_column("CASE WHEN amount > 100 THEN 'high' ELSE 'low' END", "tier")
Troubleshooting¶
Issue 1: "Table does not exist"¶
Problem: a join references a table name that has not been registered with the task's DuckDB connection, so DuckDB reports that the table does not exist.
Solution:
# Register table in DuckDB connection first
task = DuckDBTask(name="example")
task.conn.register("my_table", arrow_table)
# Now use it in joins
from seeknal.tasks.duckdb.transformers import JoinTablesByExpr, TableJoinDef, JoinType
join = JoinTablesByExpr(
select_stm="a.*, b.value2",
alias="a",
tables=[
TableJoinDef(
table="my_table", # Now this works
alias="b",
joinType=JoinType.LEFT,
joinExpression="a.id = b.id"
)
]
)
Issue 2: "Invalid date format"¶
Problem: LastNDaysAggregator cannot parse the date column because the configured date_pattern does not match how the dates are actually stored.
Solution:
# Use correct date pattern
agg = LastNDaysAggregator(
date_col="date_id",
date_pattern="yyyyMMdd", # For "20240107" format
# OR
date_pattern="yyyy-MM-dd", # For "2024-01-07" format
...
)
Issue 3: PyArrow vs Pandas¶
Problem: a pandas DataFrame is passed to add_input(), which expects a PyArrow Table.
Solution:
# Always convert pandas to PyArrow first
import pyarrow as pa
# Wrong
task.add_input(dataframe=df) # pandas DataFrame
# Correct
arrow_table = pa.Table.from_pandas(df)
task.add_input(dataframe=arrow_table) # PyArrow Table
Best Practices¶
1. Always Name Your Tasks¶
# Good
task = DuckDBTask(name="aggregate_user_data")
# Less helpful for debugging
task = DuckDBTask()
2. Use Type Hints¶
from typing import Union
import pyarrow as pa
import pandas as pd
def process_data(
data: Union[pa.Table, pd.DataFrame],
filters: str
) -> pa.Table:
"""Process data with DuckDB."""
if isinstance(data, pd.DataFrame):
data = pa.Table.from_pandas(data)
return (
DuckDBTask(name="process")
.add_input(dataframe=data)
.add_filter_by_expr(filters)
.transform()
)
3. Chain Methods¶
# Good: Method chaining
result = (task
.add_input(dataframe=arrow_table)
.add_filter_by_expr("amount > 100")
.add_new_column("amount * 1.1", "adjusted")
.transform()
)
# Less readable: Separate statements
task.add_input(dataframe=arrow_table)
task.add_filter_by_expr("amount > 100")
task.add_new_column("amount * 1.1", "adjusted")
result = task.transform()
4. Validate Inputs¶
from seeknal.validation import validate_column_name
# Validate column names before using them
validate_column_name("user_id") # OK
validate_column_name("user id") # Raises error (space not allowed)
Next Steps¶
Last Updated: 2026-01-07
Version: 1.0.0
Status: Production Ready ✅