Sources¶
Sources define where your data comes from and how to ingest it into Seeknal.
Overview¶
Sources are the starting point for any Seeknal pipeline. They connect to your data systems and bring data into the Seeknal ecosystem.
Source Types¶
File Sources¶
Read from files in various formats:
CSV Files:
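A minimal CSV source follows the same layout as the other examples on this page (the name and file path here are illustrative):

```yaml
name: sales_csv
kind: source
source: csv
table: data/sales.csv
```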
Parquet Files:
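A Parquet source uses the same fields, swapping the source type (name and path are illustrative):

```yaml
name: sales_parquet
kind: source
source: parquet
table: data/sales.parquet
```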
JSON Files:
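A JSON source can be defined the same way; `source: json` here is assumed by analogy with the `jsonl` example later on this page (name and path are illustrative):

```yaml
name: events_json
kind: source
source: json
table: data/events.json
```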
Database Sources¶
Connect to databases for batch or incremental ingestion:
PostgreSQL:
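A PostgreSQL source references a named connection rather than embedding credentials, matching the incremental example later on this page (the connection name and table are illustrative):

```yaml
name: orders_pg
kind: source
source: postgresql
connection_ref: my_db
table: app.orders
```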
MySQL:
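A MySQL source follows the same pattern; `source: mysql` and the connection name are assumptions, shown by analogy with the PostgreSQL examples on this page:

```yaml
name: orders_mysql
kind: source
source: mysql
connection_ref: my_mysql_db
table: app.orders
```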
Iceberg Sources¶
Read from existing Iceberg tables via Lakekeeper REST catalog:
```yaml
name: orders
kind: source
source: iceberg
table: atlas.my_namespace.orders
params:
  catalog_uri: http://lakekeeper:8181
  warehouse: seeknal-warehouse
```
The `table` value must use the 3-part format `catalog.namespace.table`. Connection details:

- `catalog_uri`: Lakekeeper URL (or set the `LAKEKEEPER_URL` environment variable)
- `warehouse`: Warehouse name (default: `seeknal-warehouse`, or set the `LAKEKEEPER_WAREHOUSE` environment variable)

S3 and OAuth2 credentials are read from environment variables:

- `AWS_ENDPOINT_URL`, `AWS_REGION`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`
- `KEYCLOAK_TOKEN_URL`, `KEYCLOAK_CLIENT_ID`, `KEYCLOAK_CLIENT_SECRET`
JSON / JSONL Sources¶
Ingest data from JSON or JSONL files:
```yaml
name: json_ingestion
kind: source
source: jsonl
table: "data/events.jsonl"
columns:
  event_id: "Unique event identifier"
  event_type: "Type of event"
  payload: "Event payload data"
```
Source Configuration¶
Common Options¶
| Option | Type | Description | Default |
|---|---|---|---|
| `name` | string | Unique identifier for the source | Required |
| `kind` | string | Component kind (always `source` for sources) | Required |
| `source` | string | Source type (`csv`, `parquet`, `postgresql`, etc.) | Required |
| `table` | string | Table or file path | Required |
| `description` | string | Human-readable description | Optional |
| `tags` | list | Organizational tags | Optional |
Column Definition¶
Explicitly define columns for type safety:
```yaml
name: typed_sales
kind: source
source: csv
table: data/sales.csv
columns:
  transaction_id: int
  date: date
  customer_id: string
  amount: float
```
Python Sources¶
Define sources using Python decorators:
```python
from seeknal.workflow.decorators import source

@source(
    name="python_source",
    output="raw_data"
)
def get_data():
    import pandas as pd

    return pd.read_csv("data/sales.csv")
```
Incremental Sources¶
Configure sources for incremental updates:
```yaml
name: incremental_orders
kind: source
source: postgresql
connection_ref: my_db
table: app.orders
incremental: true
incremental_key: updated_at
```
Iceberg Materialization¶
Persist source data as Iceberg tables in S3/MinIO via Lakekeeper catalog:
```yaml
name: customers
kind: source
source: csv
table: "customers.csv"
schema:
  - name: customer_id
    data_type: integer
  - name: name
    data_type: string
  - name: region
    data_type: string
materialization:
  enabled: true
  mode: overwrite  # overwrite or append
  table: atlas.production.customers  # 3-part name: catalog.namespace.table
```
| Field | Description |
|---|---|
| `enabled` | Set `true` to write output to Iceberg |
| `mode` | `overwrite` (full refresh) or `append` (accumulate) |
| `table` | Fully qualified name: `catalog.namespace.table` |
See Iceberg Materialization for full setup guide.
Best Practices¶
- Use explicit column definitions for type safety
- Tag sources for organization (e.g., `raw`, `staging`, `production`)
- Use connection references instead of hardcoded credentials
- Configure incremental sources for large datasets
- Add descriptions for documentation
- Use `overwrite` mode for dimension/reference data sources
- Use `append` mode for event/transaction data that accumulates
Related Topics¶
- Transforms - Process source data
- Incremental Processing - Advanced source patterns
- Connections - Configure database connections
- Iceberg Materialization - Persist data to Iceberg tables
Next: Learn about Transforms or return to Building Blocks