Tasks API Reference¶
This page documents the Tasks module, which provides the core infrastructure for building and executing data processing pipelines in Seeknal. Tasks are the fundamental units of work for data transformations.
Overview¶
The Tasks module provides essential classes for building data processing pipelines:
| Class | Purpose |
|---|---|
| `Task` | Abstract base class defining the task interface |
| `SparkEngineTask` | Spark-based task for large-scale data processing |
| `DuckDBTask` | Lightweight task using DuckDB for local processing |
| `Stage` | Configuration for a single pipeline stage |
Task¶
The Task class is the abstract base class that defines the interface for all data processing tasks. It provides a common structure for inputs, transformation stages, and outputs.
Bases: ABC
Abstract base class for data processing tasks.
Task defines the interface for all data processing tasks in seeknal. It provides a common structure for defining inputs, transformation stages, and outputs. Subclasses implement specific task types (e.g., SparkTask, DuckDBTask) with their respective execution engines.
A task follows a pipeline pattern where data flows through:

1. Input configuration (`add_input`)
2. Transformation stages (`transform`)
3. Output configuration
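The input → stages → output flow can be sketched with a minimal pure-Python class (a hypothetical `MiniTask`, not part of Seeknal) to show how each method returns the task itself so configuration accumulates through chaining:

```python
class MiniTask:
    """Toy illustration of the Task pipeline pattern (not the real API)."""

    def __init__(self):
        self.input_rows = None
        self.stages = []          # callables applied in order

    def add_input(self, rows):
        self.input_rows = rows
        return self               # return self to allow chaining

    def add_stage(self, fn):
        self.stages.append(fn)
        return self

    def transform(self):
        data = self.input_rows
        for stage in self.stages:  # each stage consumes the previous output
            data = stage(data)
        return data


result = (
    MiniTask()
    .add_input([1, 2, 3, 4])
    .add_stage(lambda rows: [r * 10 for r in rows])
    .add_stage(lambda rows: [r for r in rows if r > 10])
    .transform()
)
# result == [20, 30, 40]
```

The real subclasses swap the list-of-callables for Spark or DuckDB execution, but the chaining shape is the same.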
| ATTRIBUTE | DESCRIPTION |
|---|---|
| `is_spark_job` | Whether this task runs as a Spark job. If True, the task requires a SparkSession for execution. |
| `kind` | The type identifier for this task (e.g., 'spark', 'duckdb'). Used for serialization and task routing. |
| `name` | Optional name for the task, used for identification and logging. |
| `description` | Optional description explaining the task's purpose. |
| `common` | Optional path to a common YAML configuration file containing shared definitions like table schemas or transformations. |
| `input` | Optional dictionary containing input configuration, including data sources like Hive tables or DataFrames. |
| `stages` | Optional list of transformation stage dictionaries, each defining a step in the data processing pipeline. |
| `output` | Optional dictionary containing output configuration, specifying where and how to write the processed data. |
| `date` | Optional date string for time-based filtering or partitioning of data during processing. |
Example

Subclasses implement specific task types:

```python
@dataclass
class SparkTask(Task):
    is_spark_job: bool = True
    kind: str = "spark"

    def add_input(self, hive_table=None, dataframe=None):
        # Implementation for Spark inputs
        ...

    def transform(self, spark, chain=True, **kwargs):
        # Spark transformation logic
        ...
```
Functions¶
add_input(hive_table: Optional[str] = None, dataframe: Optional[Union[DataFrame, Table]] = None)
abstractmethod
¶
Add an input data source to the task.
Configures the task with input data from either a Hive table or an in-memory DataFrame. This method supports method chaining to allow fluent task configuration.
| PARAMETER | DESCRIPTION |
|---|---|
| `hive_table` | The fully qualified Hive table name (`database.table`) to read data from. Mutually exclusive with `dataframe`. |
| `dataframe` | A PySpark DataFrame or PyArrow Table containing the input data. Mutually exclusive with `hive_table`. |

| RETURNS | DESCRIPTION |
|---|---|
| `Task` | The current task instance for method chaining. |
Example

Configure input from a Hive table:

```python
task.add_input(hive_table="mydb.customers")
```

Configure input from a DataFrame:

```python
df = spark.read.parquet("/data/customers")
task.add_input(dataframe=df)
```
Source code in src/seeknal/tasks/base.py
transform(spark: Optional[SparkSession], chain: bool = True, materialize: bool = False, params=None, filters=None, date=None, start_date=None, end_date=None)
abstractmethod
¶
Execute the task's transformation stages on the input data.
Processes the input data through all configured transformation stages and returns the resulting DataFrame. The transformation can optionally be filtered by date ranges.
| PARAMETER | DESCRIPTION |
|---|---|
| `spark` | SparkSession instance for executing Spark operations. Required for Spark-based tasks; may be None for other task types. |
| `chain` | If True, stages are executed sequentially, with each stage receiving the output of the previous stage. If False, each stage operates on the original input independently. |
| `materialize` | If True, intermediate results are materialized to improve performance for complex transformations with multiple downstream operations. |
| `params` | Optional dictionary of parameters to pass to transformation stages, allowing dynamic configuration of transformations. |
| `filters` | Optional dictionary of filter conditions to apply to the data during transformation. |
| `date` | Optional date string for single-date filtering. |
| `start_date` | Optional start date for date range filtering. Used with `end_date` to define a date range. |
| `end_date` | Optional end date for date range filtering. Used with `start_date` to define a date range. |

| RETURNS | DESCRIPTION |
|---|---|
| `DataFrame` | The transformed data as a PySpark DataFrame or equivalent data structure, depending on the task type. |
Example

Execute transformation with date filtering:

```python
result = task.transform(
    spark=spark,
    start_date="2024-01-01",
    end_date="2024-01-31",
)
```
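The distinction between `chain=True` and `chain=False` can be illustrated with plain Python functions standing in for stages (a conceptual sketch, not Seeknal code):

```python
def run_stages(data, stages, chain=True):
    """chain=True: each stage feeds the next.
    chain=False: each stage runs against the original input independently."""
    if chain:
        for stage in stages:
            data = stage(data)
        return data
    # independent mode: one result per stage, all computed from the input
    return [stage(data) for stage in stages]


stages = [
    lambda xs: [x + 1 for x in xs],
    lambda xs: [x * 2 for x in xs],
]

chained = run_stages([1, 2], stages, chain=True)       # ((x + 1) * 2)
independent = run_stages([1, 2], stages, chain=False)  # [x + 1] and [x * 2]
```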
Source code in src/seeknal/tasks/base.py
add_common_yaml(common_yaml: str)
abstractmethod
¶
Add a common YAML configuration file to the task.
Loads shared configuration definitions from a YAML file, such as table schemas, column mappings, or reusable transformation templates. This enables configuration reuse across multiple tasks.
| PARAMETER | DESCRIPTION |
|---|---|
| `common_yaml` | The filesystem path to the common YAML configuration file containing shared definitions. |

| RETURNS | DESCRIPTION |
|---|---|
| `Task` | The current task instance for method chaining. |
Example

Load common definitions:

```python
task.add_common_yaml("/config/common_schemas.yaml")
```
Source code in src/seeknal/tasks/base.py
SparkEngineTask¶
The SparkEngineTask class provides a fluent interface for building Spark-based data transformation pipelines with support for extractors, transformers, aggregators, and loaders.
PySpark-based data pipeline task.
This replaces the Scala-based SparkEngine with pure PySpark.
| PARAMETER | DESCRIPTION |
|---|---|
| `spark` | SparkSession used to execute the pipeline |
Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
Functions¶
add_input(path: str, format: str = 'parquet', **options) -> SparkEngineTask
¶
Add input source.
| PARAMETER | DESCRIPTION |
|---|---|
| `path` | Input path |
| `format` | File format |
| `**options` | Additional options |

| RETURNS | DESCRIPTION |
|---|---|
| `SparkEngineTask` | Self for chaining |
Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
add_stage(stage_id: str, transformer_class: str, params: Dict[str, Any]) -> SparkEngineTask
¶
Add transformation stage.
| PARAMETER | DESCRIPTION |
|---|---|
| `stage_id` | Unique stage identifier |
| `transformer_class` | Transformer class name |
| `params` | Transformer parameters |

| RETURNS | DESCRIPTION |
|---|---|
| `SparkEngineTask` | Self for chaining |
Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
add_output(path: str, format: str = 'parquet', **options) -> SparkEngineTask
¶
Add output destination.
| PARAMETER | DESCRIPTION |
|---|---|
| `path` | Output path |
| `format` | File format |
| `**options` | Additional options |

| RETURNS | DESCRIPTION |
|---|---|
| `SparkEngineTask` | Self for chaining |
Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
transform() -> DataFrame
¶
Execute transformation pipeline.
| RETURNS | DESCRIPTION |
|---|---|
| `DataFrame` | Transformed DataFrame |
Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
evaluate() -> None
¶
Execute pipeline and write output.
Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
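Chained together, the methods above describe an extract-transform-load pipeline that `evaluate` replays end to end. A stdlib-only sketch of the same builder shape (hypothetical `PipelineBuilder`, hypothetical stage names, no Spark required) shows what the fluent API accumulates before execution:

```python
class PipelineBuilder:
    """Records steps the way SparkEngineTask's fluent API does (toy version)."""

    def __init__(self):
        self.steps = []

    def add_input(self, path, format="parquet", **options):
        self.steps.append(("input", path, format, options))
        return self

    def add_stage(self, stage_id, transformer_class, params):
        self.steps.append(("stage", stage_id, transformer_class, params))
        return self

    def add_output(self, path, format="parquet", **options):
        self.steps.append(("output", path, format, options))
        return self


plan = (
    PipelineBuilder()
    .add_input("/data/in", format="csv", header=True)
    .add_stage("s1", "FilterTransformer", {"expr": "amount > 0"})
    .add_output("/data/out")
)
# plan.steps now holds the ordered pipeline description
```

In the real class, `transform()` executes the recorded input and stages and returns a DataFrame, while `evaluate()` additionally writes to the recorded output.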
Stage¶
The Stage class represents a single step in a SparkEngine transformation pipeline.
Pipeline stage.
| PARAMETER | DESCRIPTION |
|---|---|
| `stage_id` | Unique stage identifier |
| `transformer_class` | Transformer class name |
| `params` | Transformer parameters |
Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
Functions¶
get_transformer()
¶
Get transformer instance.
| RETURNS | DESCRIPTION |
|---|---|
| | Transformer instance |
Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
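`get_transformer` turns the `transformer_class` string into a live instance. A hedged stdlib sketch of that kind of resolution (hypothetical `resolve_class` helper; the real method may also consult a registry), demonstrated here with a standard-library class in place of a transformer:

```python
import importlib


def resolve_class(dotted_path):
    """Resolve 'package.module.ClassName' to the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Stand-in target: a real Stage would name a transformer class instead.
OrderedDict = resolve_class("collections.OrderedDict")
instance = OrderedDict()
```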
DuckDBTask¶
The DuckDBTask class provides a lightweight task implementation using DuckDB for smaller datasets and local development.
Bases: Task
DuckDB-based data transformation task.
Provides the same API as SparkEngineTask but uses DuckDB for execution.
Key differences from SparkEngineTask:

- Uses PyArrow Tables instead of PySpark DataFrames
- Executes pure Python + SQL, no JVM
- Better for single-node, small-to-medium datasets (<100M rows)
Example

```python
from seeknal.tasks.duckdb import DuckDBTask

task = DuckDBTask(name="process_data")
result = (
    task.add_input(path="data.parquet")
    .add_sql("SELECT * FROM THIS WHERE amount > 100")
    .add_new_column("amount * 1.1", "adjusted")
    .transform()
)
```
Functions¶
add_input(dataframe: Optional[Table] = None, path: Optional[str] = None, sql: Optional[str] = None)
¶
Add input data source.
Supports PyArrow Tables, file paths, or raw SQL.
| PARAMETER | DESCRIPTION |
|---|---|
| `dataframe` | PyArrow Table with input data |
| `path` | File path (Parquet, CSV, etc.) |
| `sql` | SQL query to generate input data |

| RETURNS | DESCRIPTION |
|---|---|
| `DuckDBTask` | self for method chaining |
Source code in src/seeknal/tasks/duckdb/duckdb.py
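Assuming the three sources are mutually exclusive (the base-class contract for `hive_table`/`dataframe` suggests this, though the DuckDB docstring does not state it outright), the dispatch can be sketched with a hypothetical `pick_input` helper:

```python
def pick_input(dataframe=None, path=None, sql=None):
    """Return which input source was given; exactly one is required.
    Toy stand-in for DuckDBTask.add_input's source selection, not Seeknal code."""
    given = {
        name: value
        for name, value in
        {"dataframe": dataframe, "path": path, "sql": sql}.items()
        if value is not None
    }
    if len(given) != 1:
        raise ValueError("provide exactly one of dataframe, path, sql")
    return next(iter(given))


assert pick_input(path="data.parquet") == "path"
```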
add_common_yaml(common_yaml: str)
¶
add_sql(statement: str)
¶
Add SQL transformation stage.
| PARAMETER | DESCRIPTION |
|---|---|
| `statement` | SQL statement to execute |

| RETURNS | DESCRIPTION |
|---|---|
| `DuckDBTask` | self for method chaining |
Example

```python
task.add_sql("SELECT * FROM THIS WHERE amount > 100")
```
Source code in src/seeknal/tasks/duckdb/duckdb.py
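An `add_sql` stage runs its statement against the current intermediate relation (referred to as THIS in the examples). The mechanics can be sketched with sqlite3 as a stand-in for DuckDB: load the current rows into a table named THIS, run the statement, and treat the result as the new current relation.

```python
import sqlite3


def apply_sql_stage(rows, statement):
    """rows: list of (id, amount) tuples; statement may reference table THIS.
    Illustrative only; the real task keeps the relation inside DuckDB."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE THIS (id INTEGER, amount REAL)")
    con.executemany("INSERT INTO THIS VALUES (?, ?)", rows)
    result = con.execute(statement).fetchall()
    con.close()
    return result


rows = [(1, 50.0), (2, 150.0), (3, 300.0)]
filtered = apply_sql_stage(rows, "SELECT * FROM THIS WHERE amount > 100")
# filtered == [(2, 150.0), (3, 300.0)]
```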
add_new_column(expression: str, output_col: str)
¶
Add computed column.
| PARAMETER | DESCRIPTION |
|---|---|
| `expression` | SQL expression to compute the column |
| `output_col` | Name of the new column |

| RETURNS | DESCRIPTION |
|---|---|
| `DuckDBTask` | self for method chaining |
Example

```python
task.add_new_column("amount * 1.1", "adjusted_amount")
```
Source code in src/seeknal/tasks/duckdb/duckdb.py
add_filter_by_expr(expression: str)
¶
Filter rows by expression.
| PARAMETER | DESCRIPTION |
|---|---|
| `expression` | SQL boolean expression |

| RETURNS | DESCRIPTION |
|---|---|
| `DuckDBTask` | self for method chaining |
Example

```python
task.add_filter_by_expr("status = 'active' AND amount > 0")
```
Source code in src/seeknal/tasks/duckdb/duckdb.py
select_columns(columns: List[str])
¶
Select specific columns.
| PARAMETER | DESCRIPTION |
|---|---|
| `columns` | List of column names to keep |

| RETURNS | DESCRIPTION |
|---|---|
| `DuckDBTask` | self for method chaining |
Example

```python
task.select_columns(["user_id", "name", "email"])
```
Source code in src/seeknal/tasks/duckdb/duckdb.py
drop_columns(columns: List[str])
¶
Drop columns.
| PARAMETER | DESCRIPTION |
|---|---|
| `columns` | List of column names to drop |

| RETURNS | DESCRIPTION |
|---|---|
| `DuckDBTask` | self for method chaining |
Example

```python
task.drop_columns(["temp_col", "debug_info"])
```
Source code in src/seeknal/tasks/duckdb/duckdb.py
add_stage(transformer: Optional[DuckDBTransformer] = None, aggregator: Optional[DuckDBAggregator] = None, id: Optional[str] = None, class_name: Optional[str] = None, params: Optional[dict] = None)
¶
Add transformation stage to pipeline.
Mirrors SparkEngineTask.add_stage() API.
| PARAMETER | DESCRIPTION |
|---|---|
| `transformer` | DuckDBTransformer instance |
| `aggregator` | DuckDBAggregator instance |
| `id` | Reference to a predefined transformation |
| `class_name` | Fully qualified class name |
| `params` | Parameters for `class_name` |

| RETURNS | DESCRIPTION |
|---|---|
| `DuckDBTask` | self for method chaining |
Source code in src/seeknal/tasks/duckdb/duckdb.py
transform(spark=None, chain: bool = True, materialize: bool = False, params=None, filters=None, date=None, start_date=None, end_date=None) -> Union[Table, DuckDBTask]
¶
Execute the transformation pipeline.
| PARAMETER | DESCRIPTION |
|---|---|
| `spark` | Ignored (for API compatibility with SparkEngineTask) |
| `chain` | If True, chain stages sequentially |
| `materialize` | If True, return self for further operations |
| `params` | Optional parameters for transformations. `return_as_pandas`: bool, return a pandas DataFrame instead of a PyArrow Table |
| `filters` | Optional filters (not yet implemented) |
| `date` | Optional single date filter (not yet implemented) |
| `start_date` | Optional start date filter (not yet implemented) |
| `end_date` | Optional end date filter (not yet implemented) |

| RETURNS | DESCRIPTION |
|---|---|
| `Union[Table, DuckDBTask]` | PyArrow Table if `materialize=False`; DuckDBTask if `materialize=True`; pandas DataFrame if `params["return_as_pandas"]=True` |
Source code in src/seeknal/tasks/duckdb/duckdb.py
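The three documented return modes can be captured in a small sketch (a hypothetical `ToyTask`; the real method returns a PyArrow Table or pandas DataFrame rather than the plain Python stand-ins used here):

```python
class ToyTask:
    """Sketch of the documented return contract of DuckDBTask.transform."""

    def __init__(self, table):
        self._table = table

    def transform(self, materialize=False, params=None):
        params = params or {}
        if materialize:
            return self            # keep chaining on the task itself
        if params.get("return_as_pandas"):
            # stand-in: the real code converts the PyArrow Table to pandas
            return {"as_pandas": self._table}
        return self._table         # default: the resulting Table


t = ToyTask([1, 2, 3])
assert t.transform() == [1, 2, 3]
assert t.transform(materialize=True) is t
```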
SecondOrderAggregator¶
Perform second-order aggregations (aggregations on aggregated data).
Perform second-order aggregations with time windows using PySpark.
This class enables feature engineering by creating second-order features from transaction-level or event-level data. It supports various aggregation strategies that can be combined to generate a rich feature set.
Supported Aggregation Types

- `basic`: Simple aggregation over the entire history per ID.
  - Format: `AggregationSpec("basic", "feature_col", "agg_func")`
  - Example: `AggregationSpec("basic", "amount", "sum")` -> Sum of amount for each user.
- `basic_days`: Aggregation over a specific time window defined by days between.
  - Format: `AggregationSpec("basic_days", "feature_col", "agg_func", "", lower, upper)`
  - Example: `AggregationSpec("basic_days", "amount", "mean", "", 1, 30)` -> Mean amount in the last 1-30 days.
- `ratio`: Ratio of two aggregations over different time windows.
  - Format: `AggregationSpec("ratio", "feature_col", "agg_func", "", lower1, upper1, lower2, upper2)`
  - Example: `AggregationSpec("ratio", "amount", "sum", "", 1, 30, 31, 60)` -> (Sum 1-30 days) / (Sum 31-60 days).
- `since`: Aggregation filtered by a custom condition.
  - Format: `AggregationSpec("since", "feature_col", "agg_func", "condition")`
  - Example: `AggregationSpec("since", "flag", "count", "flag == 1")` -> Count of flag=1 events.
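The four strategies can be checked against a tiny in-memory dataset. The sketch below recomputes each aggregation type with plain Python; `days_ago` is a stand-in for the day difference the real aggregator derives from the feature date and the application date:

```python
# Events for one user: (days_ago, amount, flag)
events = [(5, 100.0, 1), (20, 50.0, 0), (45, 200.0, 1)]

# basic: sum of amount over the entire history
basic_sum = sum(amount for _, amount, _ in events)            # 350.0

# basic_days: mean of amount in the last 1-30 days
window = [a for d, a, _ in events if 1 <= d <= 30]
basic_days_mean = sum(window) / len(window)                   # 75.0

# ratio: (sum over 1-30 days) / (sum over 31-60 days)
sum_1_30 = sum(a for d, a, _ in events if 1 <= d <= 30)       # 150.0
sum_31_60 = sum(a for d, a, _ in events if 31 <= d <= 60)     # 200.0
ratio = sum_1_30 / sum_31_60                                  # 0.75

# since: count of events where flag == 1
since_count = sum(1 for _, _, flag in events if flag == 1)    # 2
```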
| PARAMETER | DESCRIPTION |
|---|---|
| `spark` | SparkSession for executing queries |
| `idCol` | Column name representing the entity ID (e.g., 'user_id', 'msisdn') |
| `featureDateCol` | Column name representing the date of the event/transaction |
| `featureDateFormat` | Format of the feature date column (default: "yyyy-MM-dd") |
| `applicationDateCol` | Column name representing the reference date (e.g., application date) |
| `applicationDateFormat` | Format of the application date column (default: "yyyy-MM-dd") |
The transform method applies the aggregation rules to a table and returns a DataFrame.
Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
Functions¶
setRules(rules: List[AggregationSpec]) -> SecondOrderAggregator
¶
Set aggregation rules.
| PARAMETER | DESCRIPTION |
|---|---|
| `rules` | List of AggregationSpec objects |

| RETURNS | DESCRIPTION |
|---|---|
| `SecondOrderAggregator` | self for method chaining |
Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
transform(table_name: str) -> DataFrame
¶
Transform the input table using the defined aggregation rules.
| PARAMETER | DESCRIPTION |
|---|---|
| `table_name` | Name of the table or view to aggregate |

| RETURNS | DESCRIPTION |
|---|---|
| `DataFrame` | DataFrame with aggregated features |
Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
validate(table_name: str) -> List[str]
¶
Validate that the input table has the required columns.
| PARAMETER | DESCRIPTION |
|---|---|
| `table_name` | Name of the table to validate |

| RETURNS | DESCRIPTION |
|---|---|
| `List[str]` | List of error messages (empty if validation passes) |
Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
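The error-list contract (empty list means valid) can be sketched with a hypothetical `validate_columns` helper; the actual required columns come from the configured `idCol`, `featureDateCol`, and `applicationDateCol`, so the names below are illustrative:

```python
def validate_columns(columns,
                     required=("user_id", "feature_date", "application_date")):
    """Return one error message per missing required column.
    An empty list means the table passes validation."""
    return [f"missing required column: {c}" for c in required if c not in columns]


assert validate_columns(["user_id", "feature_date", "application_date"]) == []
errors = validate_columns(["user_id"])  # two columns missing -> two messages
```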
builder() -> FeatureBuilder
¶
Returns a FeatureBuilder instance for fluent API.
| RETURNS | DESCRIPTION |
|---|---|
| `FeatureBuilder` | FeatureBuilder for chaining aggregation rules |