Flows & Pipelines
This guide demonstrates how to create and manage data processing pipelines in Seeknal. Seeknal supports three pipeline approaches:
- YAML Pipelines (recommended): declarative YAML definitions in the seeknal/ directory
- Python Decorator Pipelines: @source, @transform, and @materialize decorators in .py files
- Legacy Spark Flows: programmatic Flow objects built from SparkEngineTask or DuckDBTask
YAML Pipelines
YAML pipelines are the primary way to define data transformations. They live in your project's seeknal/ directory and are executed via the CLI.
Project Structure
```text
my_project/
├── seeknal.yml               # Project configuration
├── profiles.yml              # Connection profiles
└── seeknal/
    ├── sources/
    │   └── raw_orders.yml    # Data source definitions
    └── transforms/
        └── order_summary.yml # Transformation definitions
```
Defining a Source
```yaml
# seeknal/sources/raw_orders.yml
name: raw_orders
source: csv
table: data/orders.csv
columns:
  order_id: VARCHAR
  customer_id: VARCHAR
  amount: DOUBLE
  order_date: DATE
```
Defining a Source from PostgreSQL
```yaml
# seeknal/sources/pg_customers.yml
name: pg_customers
source: postgresql
connection: local_pg
table: public.customers
```
Defining a Transform
```yaml
# seeknal/transforms/order_summary.yml
name: order_summary
inputs:
  - ref: source.raw_orders
sql: |
  SELECT
    customer_id,
    CAST(COUNT(*) AS BIGINT) AS order_count,
    CAST(SUM(amount) AS DOUBLE) AS total_spend,
    MIN(order_date) AS first_order,
    MAX(order_date) AS last_order
  FROM input_0
  GROUP BY customer_id
```
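The sketch below shows what the order_summary SQL computes by running the same query against a few made-up orders. It uses Python's built-in sqlite3 as a stand-in for DuckDB. The rows are registered under the table name input_0 to mimic the positional alias. The sample data and the run_order_summary helper are illustrative only.

```python
import sqlite3

# Hypothetical sample rows standing in for data/orders.csv
ORDERS = [
    ("o1", "c1", 10.0, "2024-01-05"),
    ("o2", "c1", 25.5, "2024-02-10"),
    ("o3", "c2", 7.25, "2024-01-20"),
]

# The SQL from order_summary.yml, unchanged
ORDER_SUMMARY_SQL = """
SELECT
    customer_id,
    CAST(COUNT(*) AS BIGINT) AS order_count,
    CAST(SUM(amount) AS DOUBLE) AS total_spend,
    MIN(order_date) AS first_order,
    MAX(order_date) AS last_order
FROM input_0
GROUP BY customer_id
"""


def run_order_summary(rows):
    """Load rows as table input_0 in an in-memory SQLite DB and run the transform SQL."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE input_0 (order_id TEXT, customer_id TEXT, amount REAL, order_date TEXT)"
    )
    conn.executemany("INSERT INTO input_0 VALUES (?, ?, ?, ?)", rows)
    # ORDER BY is appended only to make the output order deterministic
    result = conn.execute(ORDER_SUMMARY_SQL + " ORDER BY customer_id").fetchall()
    conn.close()
    return result


for row in run_order_summary(ORDERS):
    print(row)
```

SQLite maps the BIGINT and DOUBLE casts to its INTEGER and REAL storage classes. This run therefore checks the query logic only, not the result types that DuckDB produces.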
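Multi-Input Transform
When a transform lists several inputs, the SQL sees them positionally. input_0 is the first entry under inputs (source.raw_orders below) and input_1 is the second (source.pg_customers).
```yaml
# seeknal/transforms/customer_orders.yml
name: customer_orders
inputs:
  - ref: source.raw_orders
  - ref: source.pg_customers
sql: |
  SELECT
    c.customer_id,
    c.name,
    CAST(COUNT(o.order_id) AS BIGINT) AS order_count,
    CAST(SUM(o.amount) AS DOUBLE) AS total_spend
  FROM input_1 c
  LEFT JOIN input_0 o ON c.customer_id = o.customer_id
  GROUP BY c.customer_id, c.name
```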
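The positional mapping can be checked locally with a small sketch. It uses Python's sqlite3 as a stand-in for DuckDB and invented rows. The first input is registered as input_0 and the second as input_1. A customer with no orders shows how the LEFT JOIN behaves. run_customer_orders is a hypothetical helper.

```python
import sqlite3

# The SQL from customer_orders.yml, unchanged
CUSTOMER_ORDERS_SQL = """
SELECT
    c.customer_id,
    c.name,
    CAST(COUNT(o.order_id) AS BIGINT) AS order_count,
    CAST(SUM(o.amount) AS DOUBLE) AS total_spend
FROM input_1 c
LEFT JOIN input_0 o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name
"""


def run_customer_orders(orders, customers):
    """Register the inputs positionally (input_0, input_1) and run the join."""
    conn = sqlite3.connect(":memory:")
    # inputs[0] = source.raw_orders -> input_0
    conn.execute("CREATE TABLE input_0 (order_id TEXT, customer_id TEXT, amount REAL)")
    conn.executemany("INSERT INTO input_0 VALUES (?, ?, ?)", orders)
    # inputs[1] = source.pg_customers -> input_1
    conn.execute("CREATE TABLE input_1 (customer_id TEXT, name TEXT)")
    conn.executemany("INSERT INTO input_1 VALUES (?, ?)", customers)
    rows = conn.execute(CUSTOMER_ORDERS_SQL + " ORDER BY c.customer_id").fetchall()
    conn.close()
    return rows


sample_orders = [("o1", "c1", 10.0), ("o2", "c1", 5.0)]
sample_customers = [("c1", "Ana"), ("c2", "Ben")]
for row in run_customer_orders(sample_orders, sample_customers):
    print(row)
```

Customers without orders keep their row, with order_count 0 and a NULL total_spend. Wrap the sum in COALESCE(..., 0) if downstream consumers expect a number.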
Transform with Named Refs
Instead of the positional input_0 and input_1 aliases, you can reference inputs by name with ref():
```yaml
# seeknal/transforms/enriched_orders.yml
name: enriched_orders
inputs:
  - ref: source.raw_orders
  - ref: source.pg_customers
sql: |
  SELECT
    o.*,
    c.name AS customer_name
  FROM ref('source.raw_orders') o
  JOIN ref('source.pg_customers') c ON o.customer_id = c.customer_id
```
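This sketch assumes each named ref points at one of the entries listed under inputs. Under that assumption, the two styles are interchangeable, and the hypothetical resolve_named_refs helper below makes the mapping explicit by rewriting ref('...') calls into input_N aliases. It illustrates the equivalence only and is not Seeknal's actual resolver.

```python
import re
from typing import List

# Matches ref('name') or ref("name") and captures the node name
REF_PATTERN = re.compile(r"""ref\(\s*['"]([^'"]+)['"]\s*\)""")


def resolve_named_refs(sql: str, inputs: List[str]) -> str:
    """Rewrite ref('name') calls into the positional input_N alias for that input.

    Hypothetical helper illustrating the equivalence of the two styles;
    it is not Seeknal's resolver.
    """
    positions = {name: i for i, name in enumerate(inputs)}

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in positions:
            raise KeyError(f"{name!r} is not listed under inputs")
        return f"input_{positions[name]}"

    return REF_PATTERN.sub(replace, sql)


inputs = ["source.raw_orders", "source.pg_customers"]
sql = (
    "SELECT o.*, c.name AS customer_name "
    "FROM ref('source.raw_orders') o "
    "JOIN ref('source.pg_customers') c ON o.customer_id = c.customer_id"
)
print(resolve_named_refs(sql, inputs))
```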
Adding Materialization
Write pipeline outputs to external storage targets:
```yaml
# seeknal/transforms/order_summary.yml
name: order_summary
inputs:
  - ref: source.raw_orders
sql: |
  SELECT
    customer_id,
    CAST(COUNT(*) AS BIGINT) AS order_count,
    CAST(SUM(amount) AS DOUBLE) AS total_spend
  FROM input_0
  GROUP BY customer_id

# Single target materialization
materialization:
  type: postgresql
  connection: local_pg
  table: analytics.order_summary
  mode: full
```
Multi-Target Materialization
To write to multiple targets from a single transform, use the plural materializations key with a list of targets:
```yaml
# seeknal/transforms/order_summary.yml
name: order_summary
inputs:
  - ref: source.raw_orders
sql: |
  SELECT
    customer_id,
    CAST(COUNT(*) AS BIGINT) AS order_count,
    CAST(SUM(amount) AS DOUBLE) AS total_spend
  FROM input_0
  GROUP BY customer_id

# Multi-target: write to both PostgreSQL and Iceberg
materializations:
  - type: postgresql
    connection: local_pg
    table: analytics.order_summary
    mode: upsert_by_key
    unique_keys: [customer_id]
  - type: iceberg
    table: atlas.analytics.order_summary
```
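The mode setting controls how rows reach the target. The sketch below reads the two mode names in their usual sense: full overwrites the target with the new result, and upsert_by_key updates rows whose unique_keys match and inserts the rest. Seeknal's exact semantics are not confirmed here. Under those assumptions, the difference fits in plain Python lists of dicts. write_full and write_upsert_by_key are hypothetical helpers.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def write_full(target: List[Row], new_rows: List[Row]) -> List[Row]:
    """Assumed mode: full -> the target is replaced by the new result."""
    return [dict(r) for r in new_rows]


def write_upsert_by_key(target: List[Row], new_rows: List[Row], unique_keys: List[str]) -> List[Row]:
    """Assumed mode: upsert_by_key -> update rows whose key matches, insert the rest."""
    def key(row: Row) -> tuple:
        return tuple(row[k] for k in unique_keys)

    merged = {key(r): dict(r) for r in target}  # existing rows, in original order
    for r in new_rows:
        merged[key(r)] = dict(r)  # overwrite on key match, append otherwise
    return list(merged.values())


existing = [{"customer_id": "c1", "order_count": 2}, {"customer_id": "c2", "order_count": 1}]
incoming = [{"customer_id": "c2", "order_count": 3}, {"customer_id": "c3", "order_count": 1}]
print(write_full(existing, incoming))
print(write_upsert_by_key(existing, incoming, ["customer_id"]))
```

The unique_keys list is what lets an upsert tell an update from an insert. Without a key, every incoming row would look new.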
Running YAML Pipelines
```bash
# Preview what will be executed
seeknal dry-run

# Execute the pipeline
seeknal apply

# Execute in parallel
seeknal apply --parallel --max-workers 8

# Query results in the REPL
seeknal repl
```
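The execution plan follows from the inputs declared on each node. A node can run only after its upstream refs, and nodes with no dependency on each other are the ones --parallel can presumably run side by side. The minimal sketch below groups this guide's example nodes into levels with the standard-library graphlib module. The "transform." node prefix is an assumption, and this illustrates the idea rather than Seeknal's scheduler.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical DAG for the nodes in this guide: node -> upstream refs.
# The "transform." prefix for transform nodes is an assumption.
DAG: Dict[str, Set[str]] = {
    "source.raw_orders": set(),
    "source.pg_customers": set(),
    "transform.order_summary": {"source.raw_orders"},
    "transform.customer_orders": {"source.raw_orders", "source.pg_customers"},
    "transform.enriched_orders": {"source.raw_orders", "source.pg_customers"},
}


def execution_levels(dag: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into levels; nodes within a level do not depend on each other."""
    sorter = TopologicalSorter(dag)
    sorter.prepare()
    levels: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every node whose upstream nodes are done
        levels.append(ready)
        sorter.done(*ready)
    return levels


for depth, level in enumerate(execution_levels(DAG)):
    print(depth, level)
```

In this example the two sources form the first level and the three transforms form the second. A worker limit such as --max-workers 8 would then cap how many nodes of one level run at once.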
YAML Pipeline Tips
- Use CAST(COUNT(*) AS BIGINT) and CAST(SUM(...) AS DOUBLE) for Iceberg targets. DuckDB's SUM over integer columns returns HUGEINT, which Iceberg does not support.
- Use named refs such as ref('source.X') for readability; positional input_0 also works.
- Use seeknal dry-run to preview the execution plan before applying.
Python Decorator Pipelines
For pipelines that need Python logic beyond SQL, use decorators in .py files under seeknal/pipelines/.
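One way to picture the decorator approach is as a registry of named nodes, where a transform receives a context whose ref() returns an upstream node's output. The stdlib-only sketch below models that pattern under several stated assumptions:

- REGISTRY, Context, and run are hypothetical names.
- Source data is injected directly instead of being read from a CSV file.
- A list comprehension stands in for ctx.duckdb.sql.
- Nodes run in registration order rather than in a computed dependency order.

It mirrors the raw_users/active_users example in Basic Pipeline but is not Seeknal's implementation.

```python
from typing import Any, Callable, Dict, List

# Hypothetical registry of pipeline nodes keyed by "kind.name"
REGISTRY: Dict[str, Dict[str, Any]] = {}


def source(name: str, **options: Any) -> Callable:
    """Register a source node; its options describe where the data lives."""
    def register(fn: Callable) -> Callable:
        REGISTRY[f"source.{name}"] = {"kind": "source", "fn": fn, "options": options}
        return fn
    return register


def transform(name: str, inputs: List[str]) -> Callable:
    """Register a transform node along with the upstream refs it reads."""
    def register(fn: Callable) -> Callable:
        REGISTRY[f"transform.{name}"] = {"kind": "transform", "fn": fn, "inputs": inputs}
        return fn
    return register


class Context:
    """Minimal stand-in for ctx: ref() returns an already computed node output."""

    def __init__(self, outputs: Dict[str, Any]):
        self._outputs = outputs

    def ref(self, node: str) -> Any:
        return self._outputs[node]


@source(name="raw_users", source="csv", table="data/users.csv")
def raw_users():
    pass


@transform(name="active_users", inputs=["source.raw_users"])
def active_users(ctx: Context):
    users = ctx.ref("source.raw_users")
    return [u for u in users if u["status"] == "active"]


def run(source_data: Dict[str, Any]) -> Dict[str, Any]:
    """Run transforms in registration order, passing each the outputs so far."""
    outputs = dict(source_data)
    for node, spec in REGISTRY.items():
        if spec["kind"] == "transform":
            outputs[node] = spec["fn"](Context(outputs))
    return outputs


outputs = run({"source.raw_users": [{"id": 1, "status": "active"}, {"id": 2, "status": "inactive"}]})
print(outputs["transform.active_users"])
```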
Basic Pipeline
```python
# seeknal/pipelines/customer_pipeline.py
# /// script
# dependencies = ["pandas", "duckdb"]
# ///
from seeknal.pipeline.decorators import source, transform


@source(name="raw_users", source="csv", table="data/users.csv")
def raw_users():
    pass


@transform(name="active_users", inputs=["source.raw_users"])
def active_users(ctx):
    df = ctx.ref("source.raw_users")
    return ctx.duckdb.sql("""
        SELECT * FROM df WHERE status = 'active'
    """).df()
```
Pipeline with Materialization
```python
# seeknal/pipelines/analytics_pipeline.py
# /// script
# dependencies = ["pandas", "duckdb"]
# ///
from seeknal.pipeline.decorators import source, transform, materialize


@source(name="orders", source="csv", table="data/orders.csv")
def orders():
    pass


# Multi-target materialization with stacked decorators
@transform(name="order_metrics", inputs=["source.orders"])
@materialize(
    type="postgresql",
    connection="local_pg",
    table="analytics.order_metrics",
    mode="upsert_by_key",
    unique_keys=["customer_id"],
)
@materialize(
    type="iceberg",
    table="atlas.analytics.order_metrics",
)
def order_metrics(ctx):
    df = ctx.ref("source.orders")
    return ctx.duckdb.sql("""
        SELECT
            customer_id,
            CAST(COUNT(*) AS BIGINT) AS order_count,
            CAST(SUM(amount) AS DOUBLE) AS total_spend
        FROM df
        GROUP BY customer_id
    """).df()
```
Pipeline with PostgreSQL Source
```python
# seeknal/pipelines/pg_pipeline.py
# /// script
# dependencies = ["pandas", "duckdb"]
# ///
from seeknal.pipeline.decorators import source, transform


@source(
    name="pg_orders",
    source="postgres",
    connection="local_pg",
    query="SELECT * FROM orders WHERE status = 'active'",
)
def pg_orders():
    pass


@transform(name="order_analysis", inputs=["source.pg_orders"])
def order_analysis(ctx):
    df = ctx.ref("source.pg_orders")
    return ctx.duckdb.sql("""
        SELECT
            region,
            CAST(COUNT(*) AS BIGINT) AS order_count,
            CAST(AVG(amount) AS DOUBLE) AS avg_amount
        FROM df
        GROUP BY region
    """).df()
```
Feature Group Decorator
```python
from seeknal.pipeline.decorators import source, feature_group
from seeknal.pipeline.materialization import Materialization, OfflineConfig


@source(name="user_activity", source="parquet", table="data/activity.parquet")
def user_activity():
    pass


@feature_group(
    name="user_engagement",
    entity="user",
    inputs=["source.user_activity"],
    materialization=Materialization(offline=OfflineConfig(format="parquet")),
)
def user_engagement(ctx):
    df = ctx.ref("source.user_activity")
    return ctx.duckdb.sql("""
        SELECT
            user_id,
            CAST(COUNT(*) AS BIGINT) AS session_count,
            CAST(AVG(duration) AS DOUBLE) AS avg_duration
        FROM df
        GROUP BY user_id
    """).df()
```
Python Pipeline Tips
- Add PEP 723 # /// script metadata listing the dependencies at the top of the file
- Use ctx.ref("source.X") to reference upstream nodes
- Use ctx.duckdb.sql(...) for DuckDB SQL queries
- Stack multiple @materialize decorators for multi-target output
- Assign ctx.ref() results to a local variable before using them in DuckDB SQL. DuckDB's replacement scans resolve a name such as df in a FROM clause by looking it up as a Python variable in the calling scope.
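When decorators are stacked, Python applies them from the bottom up. A decorator that records materialization targets must account for this to preserve the top-to-bottom order written in the source. The sketch below uses a hypothetical materialize stand-in, not Seeknal's, that attaches a materializations list to the function. It says nothing about the order in which Seeknal actually writes to targets.

```python
from typing import Any, Callable, Dict, List


def materialize(**target: Any) -> Callable:
    """Hypothetical stand-in: record a materialization target on the function."""
    def attach(fn: Callable) -> Callable:
        existing: List[Dict[str, Any]] = getattr(fn, "materializations", [])
        # Decorators run bottom-up, so prepend to keep the top-to-bottom source order
        fn.materializations = [target] + existing
        return fn
    return attach


@materialize(type="postgresql", connection="local_pg", table="analytics.order_metrics",
             mode="upsert_by_key", unique_keys=["customer_id"])
@materialize(type="iceberg", table="atlas.analytics.order_metrics")
def order_metrics(ctx):
    ...


print([t["type"] for t in order_metrics.materializations])
```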
Legacy: Spark Flows
The Flow API provides programmatic pipeline creation using SparkEngineTask or DuckDBTask. This approach is suited for Spark-based distributed processing.
Note: For new projects, prefer YAML pipelines or Python decorator pipelines. The Flow API is maintained for backward compatibility with existing Spark-based projects.
Prerequisites
- Apache Spark or PySpark installed
- SparkSession available
Basic Flow with SparkEngineTask
```python
from seeknal.flow import Flow, FlowInput, FlowOutput, FlowInputEnum, FlowOutputEnum
from seeknal.tasks.sparkengine import SparkEngineTask

# Define input from Hive table
flow_input = FlowInput(
    value="my_database.source_table",
    kind=FlowInputEnum.HIVE_TABLE,
)

# Create transformation task with SQL
transform_task = (
    SparkEngineTask(name="filter_and_transform")
    .add_sql("SELECT user_id, event_type, amount FROM __THIS__ WHERE amount > 0")
    .add_new_column("amount * 1.1", "adjusted_amount")
    .add_filter_by_expr("event_type = 'purchase'")
)

# Create flow
spark_flow = Flow(
    name="spark_transformation_flow",
    input=flow_input,
    tasks=[transform_task],
    output=FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME),
    description="Flow with Spark transformations",
)

result = spark_flow.run()
result.show()
```
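The following sketch assumes that __THIS__ refers to the current intermediate result: the flow input for the first step, then the output of each previous step. Under that assumption, the three steps of filter_and_transform can be reproduced in SQLite by binding __THIS__ to the previous step's temporary view. The SQL translations of add_new_column and add_filter_by_expr, the run_steps helper, and the event rows are all hypothetical. SparkEngineTask does not necessarily work this way internally.

```python
import sqlite3

# Hypothetical SQL equivalents of the three SparkEngineTask steps above
STEPS = [
    "SELECT user_id, event_type, amount FROM __THIS__ WHERE amount > 0",  # add_sql
    "SELECT *, amount * 1.1 AS adjusted_amount FROM __THIS__",            # add_new_column
    "SELECT * FROM __THIS__ WHERE event_type = 'purchase'",               # add_filter_by_expr
]


def run_steps(conn: sqlite3.Connection, source_table: str, steps: list) -> list:
    """Run each step with __THIS__ bound to the previous step's result."""
    current = source_table
    for i, sql in enumerate(steps):
        view = f"step_{i}"
        conn.execute(f"CREATE TEMP VIEW {view} AS {sql.replace('__THIS__', current)}")
        current = view
    return conn.execute(f"SELECT * FROM {current} ORDER BY user_id").fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id INTEGER, event_type TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [(1, "purchase", 10.0), (2, "refund", 5.0), (3, "purchase", 0.0), (4, "purchase", 20.0)],
)
result = run_steps(conn, "events", STEPS)
for row in result:
    print(row)
```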
Flow with DuckDBTask
```python
from seeknal.flow import Flow, FlowInput, FlowOutput, FlowInputEnum, FlowOutputEnum
from seeknal.tasks.duckdb import DuckDBTask

# DuckDB transformation
duckdb_task = (
    DuckDBTask()
    .add_sql("""
        SELECT
            user_id,
            COUNT(*) AS event_count,
            SUM(amount) AS total_amount
        FROM __THIS__
        GROUP BY user_id
    """)
)

duckdb_flow = Flow(
    name="duckdb_aggregation_flow",
    input=FlowInput(value="/data/events.parquet", kind=FlowInputEnum.PARQUET),
    tasks=[duckdb_task],
    output=FlowOutput(kind=FlowOutputEnum.ARROW_DATAFRAME),
)

result = duckdb_flow.run()
print(result.to_pandas())
```
Chaining Multiple Tasks
```python
from seeknal.flow import Flow, FlowInput, FlowOutput, FlowInputEnum, FlowOutputEnum
from seeknal.tasks.sparkengine import SparkEngineTask

# Chain multiple tasks: each receives the output of the previous one
cleaning_task = (
    SparkEngineTask(name="data_cleaning")
    .add_filter_by_expr("user_id IS NOT NULL")
    .add_sql("SELECT DISTINCT * FROM __THIS__")
)

feature_task = (
    SparkEngineTask(name="feature_engineering")
    .add_new_column("DATEDIFF(current_date(), signup_date)", "days_since_signup")
    .add_new_column("total_purchases / order_count", "avg_order_value")
)

aggregation_task = (
    SparkEngineTask(name="aggregation")
    .add_sql("""
        SELECT segment, COUNT(*) AS user_count, AVG(avg_order_value) AS segment_avg
        FROM __THIS__
        GROUP BY segment
    """)
)

multi_task_flow = Flow(
    name="multi_stage_pipeline",
    input=FlowInput(value="analytics.user_data", kind=FlowInputEnum.HIVE_TABLE),
    tasks=[cleaning_task, feature_task, aggregation_task],
    output=FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME),
)

result = multi_task_flow.run()
result.show()
```
Saving and Loading Flows
```python
from seeknal.flow import Flow

# my_flow is any Flow built as above, e.g. spark_flow from the first example
my_flow = spark_flow

# Save to backend (idempotent)
my_flow = my_flow.get_or_create()
print(f"Flow ID: {my_flow.flow_id}")

# List all flows
Flow.list()

# Export as YAML for version control
flow_yaml = my_flow.as_yaml()
```
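Because get_or_create is idempotent, calling it repeatedly should leave a single saved flow rather than a growing set of duplicates. The hypothetical InMemoryFlowStore below illustrates that get-or-create pattern. Keying flows by name and assigning integer IDs are assumptions made for illustration; Seeknal's backend may identify flows differently.

```python
import itertools
from typing import Dict, List


class InMemoryFlowStore:
    """Hypothetical backend illustrating get-or-create semantics keyed by flow name."""

    def __init__(self) -> None:
        self._flows: Dict[str, int] = {}
        self._ids = itertools.count(1)

    def get_or_create(self, name: str) -> int:
        # Return the existing ID if the name is known; otherwise assign a new one
        if name not in self._flows:
            self._flows[name] = next(self._ids)
        return self._flows[name]

    def list(self) -> List[str]:
        return sorted(self._flows)


store = InMemoryFlowStore()
first = store.get_or_create("spark_transformation_flow")
second = store.get_or_create("spark_transformation_flow")
print(first, second, store.list())
```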
Task Execution Order
Tasks are executed sequentially in the order they appear in the list. Each task receives the output of the previous task as input.
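Functionally, this sequential hand-off is a left fold over the task list. The sketch below uses hypothetical plain-Python functions in place of cleaning_task, feature_task, and aggregation_task from the chaining example, with made-up rows. It shows why order matters: the aggregation step expects the avg_order_value column that the feature step adds.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]
Task = Callable[[List[Row]], List[Row]]


def run_tasks(rows: List[Row], tasks: List[Task]) -> List[Row]:
    """Apply tasks left to right: each task receives the previous task's output."""
    return reduce(lambda data, task: task(data), tasks, rows)


# Hypothetical stand-ins for cleaning_task, feature_task, and aggregation_task
def drop_null_users(rows: List[Row]) -> List[Row]:
    return [r for r in rows if r["user_id"] is not None]


def add_avg_order_value(rows: List[Row]) -> List[Row]:
    return [{**r, "avg_order_value": r["total_purchases"] / r["order_count"]} for r in rows]


def aggregate_by_segment(rows: List[Row]) -> List[Row]:
    groups: Dict[str, List[float]] = {}
    for r in rows:
        groups.setdefault(r["segment"], []).append(r["avg_order_value"])
    return [
        {"segment": s, "user_count": len(v), "segment_avg": sum(v) / len(v)}
        for s, v in sorted(groups.items())
    ]


USERS = [
    {"user_id": 1, "segment": "a", "total_purchases": 100.0, "order_count": 4},
    {"user_id": None, "segment": "a", "total_purchases": 50.0, "order_count": 1},
    {"user_id": 2, "segment": "b", "total_purchases": 30.0, "order_count": 3},
]
print(run_tasks(USERS, [drop_null_users, add_avg_order_value, aggregate_by_segment]))
```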
Best Practices
Choosing a Pipeline Approach
- YAML pipelines: Best for SQL-centric transforms, team collaboration, and reproducibility
- Python decorator pipelines: Best when transforms need Python logic beyond SQL
- Spark Flows: Best for distributed processing of large datasets (>100M rows)
Performance
- Use seeknal apply --parallel for parallel execution of independent nodes
- Use DuckDB-based approaches for datasets under 100M rows
- Use virtual environments (seeknal plan dev) to test changes safely
- Chain filters early in the pipeline to reduce data volume
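Iceberg Compatibility
When writing to Iceberg tables, always cast aggregation results:
- CAST(COUNT(*) AS BIGINT) instead of bare COUNT(*)
- CAST(SUM(...) AS DOUBLE) instead of bare SUM(...)
DuckDB returns HUGEINT, a 128-bit integer type that Iceberg does not support, for SUM over integer columns. COUNT(*) already returns BIGINT, so its cast is not strictly required, but it keeps the output type explicit.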
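HUGEINT exists because a sum of 64-bit values can exceed the BIGINT range. Casting the sum to DOUBLE keeps any such value in range, but the result is exact only for integers up to 2**53 in magnitude. The plain-Python sketch below checks both properties. fits_bigint and survives_double_cast are illustrative helpers, not part of Seeknal or DuckDB.

```python
# Signed 64-bit (BIGINT) bounds; DuckDB's HUGEINT is a signed 128-bit integer
BIGINT_MIN = -(2**63)
BIGINT_MAX = 2**63 - 1
HUGEINT_MAX = 2**127 - 1


def fits_bigint(value: int) -> bool:
    """True if the integer fits in a signed 64-bit column."""
    return BIGINT_MIN <= value <= BIGINT_MAX


def survives_double_cast(value: int) -> bool:
    """True if converting the integer to a 64-bit float loses no information."""
    return int(float(value)) == value


big_sum = BIGINT_MAX + BIGINT_MAX  # e.g. summing two maximal BIGINT values
print(fits_bigint(big_sum), big_sum <= HUGEINT_MAX)
print(survives_double_cast(2**53), survives_double_cast(2**53 + 1))
```

For typical aggregates such as order counts and spend totals, the DOUBLE cast is lossless. Only sums beyond 2**53 (about 9.0e15) start to round.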
Next Steps
- Feature Store - Store computed features (FeatureStore Example)
- DAG Tutorial - Dependency tracking and incremental builds (DAG Tutorial)
- Virtual Environments - Test changes safely (Virtual Environments)
- CLI Reference - Full command reference (CLI Reference)