Tasks API Reference

This page documents the Tasks module, which provides the core infrastructure for building and executing data processing pipelines in Seeknal. Tasks are the fundamental units of work for data transformations.

Overview

The Tasks module provides essential classes for building data processing pipelines:

Class            Purpose
---------------  --------------------------------------------------
Task             Abstract base class defining the task interface
SparkEngineTask  Spark-based task for large-scale data processing
DuckDBTask       Lightweight task using DuckDB for local processing
Stage            Configuration for a single pipeline stage

Task

The Task class is the abstract base class that defines the interface for all data processing tasks. It provides a common structure for inputs, transformation stages, and outputs.

Bases: ABC

Abstract base class for data processing tasks.

Task defines the interface for all data processing tasks in seeknal. It provides a common structure for defining inputs, transformation stages, and outputs. Subclasses implement specific task types (e.g., SparkTask, DuckDBTask) with their respective execution engines.

A task follows a pipeline pattern where data flows through:

1. Input configuration (add_input)
2. Transformation stages (transform)
3. Output configuration
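As a minimal illustration of this pattern, the following is a pure-Python sketch (not the seeknal API itself; `MiniTask` and its stage functions are hypothetical) of how a fluent task chains input, stages, and execution:

```python
# Minimal pure-Python sketch of the input -> stages -> output pattern;
# the real Task subclasses wire these steps to Spark or DuckDB.
class MiniTask:
    def __init__(self):
        self.input_data = None
        self.stages = []

    def add_input(self, rows):
        self.input_data = rows
        return self  # return self for method chaining, as in Task.add_input

    def add_stage(self, fn):
        self.stages.append(fn)
        return self

    def transform(self, chain=True):
        if chain:
            # chained mode: each stage receives the previous stage's output
            data = self.input_data
            for fn in self.stages:
                data = fn(data)
            return data
        # chain=False: each stage operates on the original input independently
        return [fn(self.input_data) for fn in self.stages]


result = (
    MiniTask()
    .add_input([1, 2, 3])                          # 1. input configuration
    .add_stage(lambda rows: [r * 2 for r in rows])  # 2. transformation stages
    .add_stage(lambda rows: [r + 1 for r in rows])
    .transform()                                    # 3. execution
)
# result == [3, 5, 7]
```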

ATTRIBUTE DESCRIPTION
is_spark_job

Whether this task runs as a Spark job. If True, the task requires a SparkSession for execution.

TYPE: bool

kind

The type identifier for this task (e.g., 'spark', 'duckdb'). Used for serialization and task routing.

TYPE: str

name

Optional name for the task, used for identification and logging.

TYPE: Optional[str]

description

Optional description explaining the task's purpose.

TYPE: Optional[str]

common

Optional path to a common YAML configuration file containing shared definitions like table schemas or transformations.

TYPE: Optional[str]

input

Optional dictionary containing input configuration, including data sources like Hive tables or DataFrames.

TYPE: Optional[dict]

stages

Optional list of transformation stage dictionaries, each defining a step in the data processing pipeline.

TYPE: Optional[List[dict]]

output

Optional dictionary containing output configuration, specifying where and how to write the processed data.

TYPE: Optional[dict]

date

Optional date string for time-based filtering or partitioning of data during processing.

TYPE: Optional[str]

Example

Subclasses implement specific task types::

@dataclass
class SparkTask(Task):
    is_spark_job: bool = True
    kind: str = "spark"

    def add_input(self, hive_table=None, dataframe=None):
        # Implementation for Spark inputs
        ...

    def transform(self, spark, chain=True, ...):
        # Spark transformation logic
        ...

Functions

add_input(hive_table: Optional[str] = None, dataframe: Optional[Union[DataFrame, Table]] = None) abstractmethod

Add an input data source to the task.

Configures the task with input data from either a Hive table or an in-memory DataFrame. This method supports method chaining to allow fluent task configuration.

PARAMETER DESCRIPTION
hive_table

The fully qualified Hive table name (database.table) to read data from. Mutually exclusive with dataframe.

TYPE: Optional[str] DEFAULT: None

dataframe

A PySpark DataFrame or PyArrow Table containing the input data. Mutually exclusive with hive_table.

TYPE: Optional[Union[DataFrame, Table]] DEFAULT: None

RETURNS DESCRIPTION
Task

The current task instance for method chaining.

Example

Configure input from a Hive table::

task.add_input(hive_table="mydb.customers")

Configure input from a DataFrame::

df = spark.read.parquet("/data/customers")
task.add_input(dataframe=df)
Source code in src/seeknal/tasks/base.py
@abstractmethod
def add_input(
    self,
    hive_table: Optional[str] = None,
    dataframe: Optional[Union[DataFrame, Table]] = None,
):
    """Add an input data source to the task.

    Configures the task with input data from either a Hive table or
    an in-memory DataFrame. This method supports method chaining to
    allow fluent task configuration.

    Args:
        hive_table: The fully qualified Hive table name (database.table)
            to read data from. Mutually exclusive with dataframe.
        dataframe: A PySpark DataFrame or PyArrow Table containing
            the input data. Mutually exclusive with hive_table.

    Returns:
        Task: The current task instance for method chaining.

    Example:
        Configure input from a Hive table::

            task.add_input(hive_table="mydb.customers")

        Configure input from a DataFrame::

            df = spark.read.parquet("/data/customers")
            task.add_input(dataframe=df)
    """
    return self
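Because add_input is abstract here, the mutual-exclusion contract is enforced by subclasses. A sketch of that validation (a hypothetical helper illustrating the contract, not actual seeknal code) could look like:

```python
# Hypothetical sketch: enforce that exactly one of hive_table / dataframe
# is supplied, mirroring the "mutually exclusive" contract documented above.
def resolve_input(hive_table=None, dataframe=None):
    if (hive_table is None) == (dataframe is None):
        raise ValueError("Provide exactly one of hive_table or dataframe")
    if hive_table is not None:
        return {"hive_table": hive_table}
    return {"dataframe": dataframe}


config = resolve_input(hive_table="mydb.customers")
# config == {"hive_table": "mydb.customers"}
```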

transform(spark: Optional[SparkSession], chain: bool = True, materialize: bool = False, params=None, filters=None, date=None, start_date=None, end_date=None) abstractmethod

Execute the task's transformation stages on the input data.

Processes the input data through all configured transformation stages and returns the resulting DataFrame. The transformation can optionally be filtered by date ranges.

PARAMETER DESCRIPTION
spark

SparkSession instance for executing Spark operations. Required for Spark-based tasks, may be None for other task types.

TYPE: Optional[SparkSession]

chain

If True, stages are executed sequentially with each stage receiving the output of the previous stage. If False, each stage operates on the original input independently.

TYPE: bool DEFAULT: True

materialize

If True, intermediate results are materialized to improve performance for complex transformations with multiple downstream operations.

TYPE: bool DEFAULT: False

params

Optional dictionary of parameters to pass to transformation stages, allowing dynamic configuration of transformations.

DEFAULT: None

filters

Optional dictionary of filter conditions to apply to the data during transformation.

DEFAULT: None

date

Optional date string for single-date filtering.

DEFAULT: None

start_date

Optional start date for date range filtering. Used with end_date to define a date range.

DEFAULT: None

end_date

Optional end date for date range filtering. Used with start_date to define a date range.

DEFAULT: None

RETURNS DESCRIPTION
DataFrame

The transformed data as a PySpark DataFrame or equivalent data structure depending on the task type.

Example

Execute transformation with date filtering::

result = task.transform(
    spark=spark,
    start_date="2024-01-01",
    end_date="2024-01-31"
)
Source code in src/seeknal/tasks/base.py
@abstractmethod
def transform(
    self,
    spark: Optional[SparkSession],
    chain: bool = True,
    materialize: bool = False,
    params=None,
    filters=None,
    date=None,
    start_date=None,
    end_date=None,
):
    """Execute the task's transformation stages on the input data.

    Processes the input data through all configured transformation stages
    and returns the resulting DataFrame. The transformation can optionally
    be filtered by date ranges.

    Args:
        spark: SparkSession instance for executing Spark operations.
            Required for Spark-based tasks, may be None for other task types.
        chain: If True, stages are executed sequentially with each stage
            receiving the output of the previous stage. If False, each stage
            operates on the original input independently.
        materialize: If True, intermediate results are materialized to
            improve performance for complex transformations with multiple
            downstream operations.
        params: Optional dictionary of parameters to pass to transformation
            stages, allowing dynamic configuration of transformations.
        filters: Optional dictionary of filter conditions to apply to
            the data during transformation.
        date: Optional date string for single-date filtering.
        start_date: Optional start date for date range filtering.
            Used with end_date to define a date range.
        end_date: Optional end date for date range filtering.
            Used with start_date to define a date range.

    Returns:
        DataFrame: The transformed data as a PySpark DataFrame or
            equivalent data structure depending on the task type.

    Example:
        Execute transformation with date filtering::

            result = task.transform(
                spark=spark,
                start_date="2024-01-01",
                end_date="2024-01-31"
            )
    """
    pass
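The start_date/end_date parameters imply an inclusive date-range filter. A pure-Python sketch of that behavior (an assumption about the contract, not the actual implementation) is:

```python
# Sketch (assumed semantics): keep rows whose date falls inside the
# inclusive [start_date, end_date] range, as transform()'s filtering implies.
from datetime import date


def in_range(d, start_date=None, end_date=None):
    if start_date and d < date.fromisoformat(start_date):
        return False
    if end_date and d > date.fromisoformat(end_date):
        return False
    return True


rows = [date(2024, 1, 15), date(2024, 2, 2)]
kept = [d for d in rows if in_range(d, "2024-01-01", "2024-01-31")]
# kept == [date(2024, 1, 15)]
```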

add_common_yaml(common_yaml: str) abstractmethod

Add a common YAML configuration file to the task.

Loads shared configuration definitions from a YAML file, such as table schemas, column mappings, or reusable transformation templates. This enables configuration reuse across multiple tasks.

PARAMETER DESCRIPTION
common_yaml

The filesystem path to the common YAML configuration file containing shared definitions.

TYPE: str

RETURNS DESCRIPTION
Task

The current task instance for method chaining.

Example

Load common definitions::

task.add_common_yaml("/config/common_schemas.yaml")
Source code in src/seeknal/tasks/base.py
@abstractmethod
def add_common_yaml(self, common_yaml: str):
    """Add a common YAML configuration file to the task.

    Loads shared configuration definitions from a YAML file, such as
    table schemas, column mappings, or reusable transformation templates.
    This enables configuration reuse across multiple tasks.

    Args:
        common_yaml: The filesystem path to the common YAML configuration
            file containing shared definitions.

    Returns:
        Task: The current task instance for method chaining.

    Example:
        Load common definitions::

            task.add_common_yaml("/config/common_schemas.yaml")
    """
    return self

SparkEngineTask

The SparkEngineTask class provides a fluent interface for building Spark-based data transformation pipelines with support for extractors, transformers, aggregators, and loaders.

PySpark-based data pipeline task.

This replaces the Scala-based SparkEngine with pure PySpark.

PARAMETER DESCRIPTION
spark

SparkSession

TYPE: SparkSession

Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
def __init__(self, spark: SparkSession):
    self.spark = spark
    self.input_config: Optional[Dict[str, Any]] = None
    self.stages: List[Stage] = []
    self.output_config: Optional[Dict[str, Any]] = None

Functions

add_input(path: str, format: str = 'parquet', **options) -> SparkEngineTask

Add input source.

PARAMETER DESCRIPTION
path

Input path

TYPE: str

format

File format

TYPE: str DEFAULT: 'parquet'

**options

Additional options

DEFAULT: {}

RETURNS DESCRIPTION
SparkEngineTask

Self for chaining

Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
def add_input(self, path: str, format: str = "parquet", **options) -> "SparkEngineTask":
    """Add input source.

    Args:
        path: Input path
        format: File format
        **options: Additional options

    Returns:
        Self for chaining
    """
    self.input_config = {"path": path, "format": format, "options": options}
    return self

add_stage(stage_id: str, transformer_class: str, params: Dict[str, Any]) -> SparkEngineTask

Add transformation stage.

PARAMETER DESCRIPTION
stage_id

Unique stage identifier

TYPE: str

transformer_class

Transformer class name

TYPE: str

params

Transformer parameters

TYPE: Dict[str, Any]

RETURNS DESCRIPTION
SparkEngineTask

Self for chaining

Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
def add_stage(
    self,
    stage_id: str,
    transformer_class: str,
    params: Dict[str, Any]
) -> "SparkEngineTask":
    """Add transformation stage.

    Args:
        stage_id: Unique stage identifier
        transformer_class: Transformer class name
        params: Transformer parameters

    Returns:
        Self for chaining
    """
    stage = Stage(stage_id, transformer_class, params)
    self.stages.append(stage)
    return self

add_output(path: str, format: str = 'parquet', **options) -> SparkEngineTask

Add output destination.

PARAMETER DESCRIPTION
path

Output path

TYPE: str

format

File format

TYPE: str DEFAULT: 'parquet'

**options

Additional options

DEFAULT: {}

RETURNS DESCRIPTION
SparkEngineTask

Self for chaining

Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
def add_output(self, path: str, format: str = "parquet", **options) -> "SparkEngineTask":
    """Add output destination.

    Args:
        path: Output path
        format: File format
        **options: Additional options

    Returns:
        Self for chaining
    """
    self.output_config = {"path": path, "format": format, "options": options}
    return self

transform() -> DataFrame

Execute transformation pipeline.

RETURNS DESCRIPTION
DataFrame

Transformed DataFrame

Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
def transform(self) -> DataFrame:
    """Execute transformation pipeline.

    Returns:
        Transformed DataFrame
    """
    # Load input
    if not self.input_config:
        raise ValueError("No input configured")
    extractor = FileSource(
        spark=self.spark,
        path=self.input_config["path"],
        format=self.input_config.get("format", "parquet"),
        options=self.input_config.get("options", {})
    )
    df = extractor.extract()

    # Apply stages
    for stage in self.stages:
        transformer = stage.get_transformer()
        df = transformer.transform(df)

    return df

evaluate() -> None

Execute pipeline and write output.

Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
def evaluate(self) -> None:
    """Execute pipeline and write output."""
    result = self.transform()

    if self.output_config:
        if self.output_config.get("format") == "parquet":
            loader = ParquetWriter(
                spark=self.spark,
                path=self.output_config["path"],
                mode=self.output_config.get("mode", "overwrite")
            )
            loader.load(result)
        else:
            raise ValueError(f"Unsupported output format: {self.output_config.get('format')}")

Stage

The Stage class represents a single step in a SparkEngine transformation pipeline.

Pipeline stage.

PARAMETER DESCRIPTION
stage_id

Unique stage identifier

TYPE: str

transformer_class

Transformer class name

TYPE: str

params

Transformer parameters

TYPE: Dict[str, Any]

Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
def __init__(self, stage_id: str, transformer_class: str, params: Dict[str, Any]):
    self.stage_id = stage_id
    self.transformer_class = transformer_class
    self.params = params

Functions

get_transformer()

Get transformer instance.

RETURNS DESCRIPTION

Transformer instance

Source code in src/seeknal/tasks/sparkengine/py_impl/spark_engine_task.py
def get_transformer(self):
    """Get transformer instance.

    Returns:
        Transformer instance
    """
    # Map class names to classes
    transformer_map = {
        "FilterByExpr": FilterByExpr,
        "AddColumnByExpr": AddColumnByExpr,
        "ColumnRenamed": ColumnRenamed,
        "JoinById": JoinById,
        "JoinByExpr": JoinByExpr,
        "SQL": SQL,
        "AddEntropy": AddEntropy,
        "AddLatLongDistance": AddLatLongDistance,
    }

    cls = transformer_map.get(self.transformer_class)
    if cls is None:
        raise ValueError(f"Unknown transformer: {self.transformer_class}")

    return cls(**self.params)
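The name-to-class lookup above is a common registry-dispatch pattern. A self-contained sketch with toy transformers (not the real seeknal classes) shows the idea:

```python
# Toy registry dispatch mirroring Stage.get_transformer: map a class-name
# string to a class, instantiate it with params, fail loudly on unknown names.
class Upper:
    def transform(self, s):
        return s.upper()


class Reverse:
    def transform(self, s):
        return s[::-1]


transformer_map = {"Upper": Upper, "Reverse": Reverse}


def get_transformer(class_name, **params):
    cls = transformer_map.get(class_name)
    if cls is None:
        raise ValueError(f"Unknown transformer: {class_name}")
    return cls(**params)


out = get_transformer("Upper").transform("spark")
# out == "SPARK"
```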

DuckDBTask

The DuckDBTask class provides a lightweight task implementation using DuckDB for smaller datasets and local development.

Bases: Task

DuckDB-based data transformation task.

Provides the same API as SparkEngineTask but uses DuckDB for execution.

Key Differences from SparkEngineTask:

- Uses PyArrow Tables instead of PySpark DataFrames
- Executes pure Python + SQL, no JVM
- Better for single-node, small-to-medium datasets (<100M rows)

Example

from seeknal.tasks.duckdb import DuckDBTask

task = DuckDBTask(name="process_data")
result = task.add_input(path="data.parquet") \
    .add_sql("SELECT * FROM __THIS__ WHERE amount > 100") \
    .add_new_column("amount * 1.1", "adjusted") \
    .transform()

Functions

add_input(dataframe: Optional[Table] = None, path: Optional[str] = None, sql: Optional[str] = None)

Add input data source.

Supports PyArrow Tables, file paths, or raw SQL.

PARAMETER DESCRIPTION
dataframe

PyArrow Table with input data

TYPE: Optional[Table] DEFAULT: None

path

File path (Parquet, CSV, etc.)

TYPE: Optional[str] DEFAULT: None

sql

SQL query to generate input data

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
DuckDBTask

self for method chaining

Source code in src/seeknal/tasks/duckdb/duckdb.py
def add_input(
    self,
    dataframe: Optional[Table] = None,
    path: Optional[str] = None,
    sql: Optional[str] = None,
):
    """Add input data source.

    Supports PyArrow Tables, file paths, or raw SQL.

    Args:
        dataframe: PyArrow Table with input data
        path: File path (Parquet, CSV, etc.)
        sql: SQL query to generate input data

    Returns:
        DuckDBTask: self for method chaining
    """
    if dataframe is not None:
        self.input = {"dataframe": dataframe}
    elif path is not None:
        validate_file_path(path)
        self.input = {"path": path}
    elif sql is not None:
        self.input = {"sql": sql}
    else:
        raise ValueError("Must provide dataframe, path, or sql")
    return self

add_common_yaml(common_yaml: str)

Add common YAML configuration (placeholder for compatibility).

Source code in src/seeknal/tasks/duckdb/duckdb.py
def add_common_yaml(self, common_yaml: str):
    """Add common YAML configuration (placeholder for compatibility)."""
    return self

add_sql(statement: str)

Add SQL transformation stage.

PARAMETER DESCRIPTION
statement

SQL statement to execute

TYPE: str

RETURNS DESCRIPTION
DuckDBTask

self for method chaining

Example

task.add_sql("SELECT * FROM __THIS__ WHERE amount > 100")

Source code in src/seeknal/tasks/duckdb/duckdb.py
def add_sql(self, statement: str):
    """Add SQL transformation stage.

    Args:
        statement: SQL statement to execute

    Returns:
        DuckDBTask: self for method chaining

    Example:
        >>> task.add_sql("SELECT * FROM __THIS__ WHERE amount > 100")
    """
    from .transformers import SQL

    sql_obj = SQL(statement=statement)
    self.stages.append(
        {
            "type": "transformer",
            "class_name": sql_obj.class_name,
            "params": {
                "statement": statement,
                "kind": sql_obj.kind,
                "class_name": sql_obj.class_name,
                "description": sql_obj.description,
            },
        }
    )
    return self

add_new_column(expression: str, output_col: str)

Add computed column.

PARAMETER DESCRIPTION
expression

SQL expression to compute the column

TYPE: str

output_col

Name of the new column

TYPE: str

RETURNS DESCRIPTION
DuckDBTask

self for method chaining

Example

task.add_new_column("amount * 1.1", "adjusted_amount")

Source code in src/seeknal/tasks/duckdb/duckdb.py
def add_new_column(self, expression: str, output_col: str):
    """Add computed column.

    Args:
        expression: SQL expression to compute the column
        output_col: Name of the new column

    Returns:
        DuckDBTask: self for method chaining

    Example:
        >>> task.add_new_column("amount * 1.1", "adjusted_amount")
    """
    from .transformers import AddColumnByExpr

    transformer = AddColumnByExpr(expression=expression, outputCol=output_col)
    self.stages.append(
        {
            "type": "transformer",
            "class_name": transformer.class_name,
            "params": {
                "expression": expression,
                "outputCol": output_col,
                "kind": transformer.kind,
                "class_name": transformer.class_name,
                "description": transformer.description,
            },
        }
    )
    return self

add_filter_by_expr(expression: str)

Filter rows by expression.

PARAMETER DESCRIPTION
expression

SQL boolean expression

TYPE: str

RETURNS DESCRIPTION
DuckDBTask

self for method chaining

Example

task.add_filter_by_expr("status = 'active' AND amount > 0")

Source code in src/seeknal/tasks/duckdb/duckdb.py
def add_filter_by_expr(self, expression: str):
    """Filter rows by expression.

    Args:
        expression: SQL boolean expression

    Returns:
        DuckDBTask: self for method chaining

    Example:
        >>> task.add_filter_by_expr("status = 'active' AND amount > 0")
    """
    from .transformers import FilterByExpr

    transformer = FilterByExpr(expression=expression)
    self.stages.append(
        {
            "type": "transformer",
            "class_name": transformer.class_name,
            "params": {
                "expression": expression,
                "kind": transformer.kind,
                "class_name": transformer.class_name,
                "description": transformer.description,
            },
        }
    )
    return self

select_columns(columns: List[str])

Select specific columns.

PARAMETER DESCRIPTION
columns

List of column names to keep

TYPE: List[str]

RETURNS DESCRIPTION
DuckDBTask

self for method chaining

Example

task.select_columns(["user_id", "name", "email"])

Source code in src/seeknal/tasks/duckdb/duckdb.py
def select_columns(self, columns: List[str]):
    """Select specific columns.

    Args:
        columns: List of column names to keep

    Returns:
        DuckDBTask: self for method chaining

    Example:
        >>> task.select_columns(["user_id", "name", "email"])
    """
    from .transformers import SelectColumns

    transformer = SelectColumns(inputCols=columns)
    self.stages.append(
        {
            "type": "transformer",
            "class_name": transformer.class_name,
            "params": {
                "inputCols": columns,
                "kind": transformer.kind,
                "class_name": transformer.class_name,
                "description": transformer.description,
            },
        }
    )
    return self

drop_columns(columns: List[str])

Drop columns.

PARAMETER DESCRIPTION
columns

List of column names to drop

TYPE: List[str]

RETURNS DESCRIPTION
DuckDBTask

self for method chaining

Example

task.drop_columns(["temp_col", "debug_info"])

Source code in src/seeknal/tasks/duckdb/duckdb.py
def drop_columns(self, columns: List[str]):
    """Drop columns.

    Args:
        columns: List of column names to drop

    Returns:
        DuckDBTask: self for method chaining

    Example:
        >>> task.drop_columns(["temp_col", "debug_info"])
    """
    from .transformers import DropCols

    transformer = DropCols(inputCols=columns)
    self.stages.append(
        {
            "type": "transformer",
            "class_name": transformer.class_name,
            "params": {
                "inputCols": columns,
                "kind": transformer.kind,
                "class_name": transformer.class_name,
                "description": transformer.description,
            },
        }
    )
    return self

add_stage(transformer: Optional[DuckDBTransformer] = None, aggregator: Optional[DuckDBAggregator] = None, id: Optional[str] = None, class_name: Optional[str] = None, params: Optional[dict] = None)

Add transformation stage to pipeline.

Mirrors SparkEngineTask.add_stage() API.

PARAMETER DESCRIPTION
transformer

DuckDBTransformer instance

TYPE: Optional[DuckDBTransformer] DEFAULT: None

aggregator

DuckDBAggregator instance

TYPE: Optional[DuckDBAggregator] DEFAULT: None

id

Reference to predefined transformation

TYPE: Optional[str] DEFAULT: None

class_name

Fully qualified class name

TYPE: Optional[str] DEFAULT: None

params

Parameters for class_name

TYPE: Optional[dict] DEFAULT: None

RETURNS DESCRIPTION
DuckDBTask

self for method chaining

Source code in src/seeknal/tasks/duckdb/duckdb.py
def add_stage(
    self,
    transformer: Optional["DuckDBTransformer"] = None,
    aggregator: Optional["DuckDBAggregator"] = None,
    id: Optional[str] = None,
    class_name: Optional[str] = None,
    params: Optional[dict] = None,
):
    """Add transformation stage to pipeline.

    Mirrors SparkEngineTask.add_stage() API.

    Args:
        transformer: DuckDBTransformer instance
        aggregator: DuckDBAggregator instance
        id: Reference to predefined transformation
        class_name: Fully qualified class name
        params: Parameters for class_name

    Returns:
        DuckDBTask: self for method chaining
    """
    if transformer is not None:
        self.stages.append(
            {
                "type": "transformer",
                "class_name": transformer.class_name,
                "params": transformer.model_dump(),
            }
        )
    elif aggregator is not None:
        # Convert aggregator to stages (pre + agg + post)
        self.stages.extend(self._aggregator_to_stages(aggregator))
    elif id is not None:
        self.stages.append({"type": "id", "id": id})
    elif class_name is not None:
        if params is None:
            raise ValueError("params must be defined with class_name")
        self.stages.append(
            {"type": "transformer", "class_name": class_name, "params": params}
        )

    return self

transform(spark=None, chain: bool = True, materialize: bool = False, params=None, filters=None, date=None, start_date=None, end_date=None) -> Union[Table, DuckDBTask]

Execute the transformation pipeline.

PARAMETER DESCRIPTION
spark

Ignored (for API compatibility with SparkEngineTask)

DEFAULT: None

chain

If True, chain stages sequentially

TYPE: bool DEFAULT: True

materialize

If True, return self for further operations

TYPE: bool DEFAULT: False

params

Optional parameters for transformations - return_as_pandas: bool, return pandas DataFrame instead of PyArrow

DEFAULT: None

filters

Optional filters (not yet implemented)

DEFAULT: None

date

Optional single date filter (not yet implemented)

DEFAULT: None

start_date

Optional start date filter (not yet implemented)

DEFAULT: None

end_date

Optional end date filter (not yet implemented)

DEFAULT: None

RETURNS DESCRIPTION
Union[Table, DuckDBTask]

PyArrow Table if materialize=False, DuckDBTask if materialize=True, or a pandas DataFrame if params['return_as_pandas']=True.

Source code in src/seeknal/tasks/duckdb/duckdb.py
def transform(
    self,
    spark=None,
    chain: bool = True,
    materialize: bool = False,
    params=None,
    filters=None,
    date=None,
    start_date=None,
    end_date=None,
) -> Union[Table, "DuckDBTask"]:
    """Execute the transformation pipeline.

    Args:
        spark: Ignored (for API compatibility with SparkEngineTask)
        chain: If True, chain stages sequentially
        materialize: If True, return self for further operations
        params: Optional parameters for transformations
            - return_as_pandas: bool, return pandas DataFrame instead of PyArrow
        filters: Optional filters (not yet implemented)
        date: Optional single date filter (not yet implemented)
        start_date: Optional start date filter (not yet implemented)
        end_date: Optional end date filter (not yet implemented)

    Returns:
        PyArrow Table if materialize=False
        DuckDBTask if materialize=True
        Pandas DataFrame if params['return_as_pandas']=True
    """
    # 1. Load input data
    current_rel = self._load_input()

    # 2. Execute stages sequentially using CTEs
    for i, stage in enumerate(self.stages):
        transformer = self._instantiate_stage(stage)

        # Special handling for DropCols which needs all columns
        from .transformers import DropCols
        if isinstance(transformer, DropCols):
            # Get current column names
            if i == 0:
                # First stage - get columns from input
                all_cols = list(self.conn.table("_input_table").arrow().read_all().column_names)
            else:
                # Subsequent stages - get columns from previous result
                all_cols = list(self.conn.table(f"_result_{i}").arrow().read_all().column_names)

            sql = transformer.to_sql(all_cols)
        else:
            sql = transformer.to_sql()

        # Replace __THIS__ with the actual relation or subquery
        if i == 0:
            # First stage, use _input_table directly
            if "__THIS__" in sql:
                sql = sql.replace("__THIS__", "_input_table")
        else:
            # Subsequent stages, wrap previous in CTE
            if "__THIS__" in sql:
                sql = sql.replace("__THIS__", f"_result_{i}")

        # Execute SQL
        current_rel = self.conn.sql(sql)

        # Register result for next stage
        result_name = f"_result_{i+1}"
        # Force execution and get Arrow table (use read_all())
        arrow_reader = current_rel.arrow()
        arrow_result = arrow_reader.read_all()
        self.conn.register(result_name, arrow_result)

    # 3. Return result
    if materialize:
        self._materialize = True
        return self
    else:
        # Get final result as Arrow table
        arrow_reader = current_rel.arrow()
        final_result = arrow_reader.read_all()

        # Check if user wants pandas DataFrame
        if params is not None and params.get("return_as_pandas") is True:
            return final_result.to_pandas()

        return final_result
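Each stage's SQL references a `__THIS__` placeholder that the loop above rewrites to the previous stage's registered result (`_input_table` for the first stage, `_result_{i}` afterwards). The substitution can be sketched in pure Python:

```python
# Sketch of the __THIS__ placeholder rewriting performed per stage:
# stage 0 reads from _input_table, stage i reads from _result_i.
stages = [
    "SELECT * FROM __THIS__ WHERE amount > 100",
    "SELECT *, amount * 1.1 AS adjusted FROM __THIS__",
]

resolved = []
for i, sql in enumerate(stages):
    source = "_input_table" if i == 0 else f"_result_{i}"
    resolved.append(sql.replace("__THIS__", source))

# resolved[0] reads from _input_table; resolved[1] reads from _result_1
```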

SecondOrderAggregator

Perform second-order aggregations (aggregations on aggregated data).

Perform second-order aggregations with time windows using PySpark.

This class enables feature engineering by creating second-order features from transaction-level or event-level data. It supports various aggregation strategies that can be combined to generate a rich feature set.

Supported Aggregation Types

  1. basic: Simple aggregation over the entire history per ID.
     Format: AggregationSpec("basic", "feature_col", "agg_func")
     Example: AggregationSpec("basic", "amount", "sum") -> Sum of amount for each user.

  2. basic_days: Aggregation over a specific time window defined by days_between.
     Format: AggregationSpec("basic_days", "feature_col", "agg_func", "", lower, upper)
     Example: AggregationSpec("basic_days", "amount", "mean", "", 1, 30) -> Mean amount in the last 1-30 days.

  3. ratio: Ratio of two aggregations over different time windows.
     Format: AggregationSpec("ratio", "feature_col", "agg_func", "", lower1, upper1, lower2, upper2)
     Example: AggregationSpec("ratio", "amount", "sum", "", 1, 30, 31, 60) -> (Sum 1-30 days) / (Sum 31-60 days).

  4. since: Aggregation filtered by a custom condition.
     Format: AggregationSpec("since", "feature_col", "agg_func", "condition")
     Example: AggregationSpec("since", "flag", "count", "flag == 1") -> Count of flag=1 events.
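To make the "ratio" type concrete, here is a pure-Python sketch of the windowed-sum ratio it computes over days_between (toy data and a hypothetical helper, not the Spark implementation):

```python
# Toy illustration of the "ratio" aggregation semantics:
# (sum of amount in days 1-30) / (sum of amount in days 31-60) per ID.
events = [
    {"id": "u1", "days_between": 5,  "amount": 10.0},
    {"id": "u1", "days_between": 20, "amount": 30.0},
    {"id": "u1", "days_between": 45, "amount": 20.0},
]


def window_sum(rows, lower, upper):
    # inclusive window on days_between, as in basic_days
    return sum(r["amount"] for r in rows if lower <= r["days_between"] <= upper)


recent = window_sum(events, 1, 30)   # 10.0 + 30.0 = 40.0
older = window_sum(events, 31, 60)   # 20.0
ratio = recent / older if older else None
# ratio == 2.0
```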
PARAMETER DESCRIPTION
spark

SparkSession for executing queries

TYPE: SparkSession

idCol

Column name representing the entity ID (e.g., 'user_id', 'msisdn')

TYPE: str

featureDateCol

Column name representing the date of the event/transaction

TYPE: str DEFAULT: 'day'

featureDateFormat

Format of the feature date column (default: "yyyy-MM-dd")

TYPE: str DEFAULT: 'yyyy-MM-dd'

applicationDateCol

Column name representing the reference date (e.g., application date)

TYPE: str DEFAULT: 'application_date'

applicationDateFormat

Format of the application date column (default: "yyyy-MM-dd")

TYPE: str DEFAULT: 'yyyy-MM-dd'

The transform method applies the aggregation rules to a table and returns a DataFrame.

Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
def __init__(
    self,
    spark: SparkSession,
    idCol: str,
    featureDateCol: str = "day",
    featureDateFormat: str = "yyyy-MM-dd",
    applicationDateCol: str = "application_date",
    applicationDateFormat: str = "yyyy-MM-dd",
):
    self.spark = spark
    self.idCol = idCol
    self.featureDateCol = featureDateCol
    self.featureDateFormat = featureDateFormat
    self.applicationDateCol = applicationDateCol
    self.applicationDateFormat = applicationDateFormat
    self.rules = []

Functions

setRules(rules: List[AggregationSpec]) -> SecondOrderAggregator

Set aggregation rules.

PARAMETER DESCRIPTION
rules

List of AggregationSpec objects

TYPE: List[AggregationSpec]

RETURNS DESCRIPTION
SecondOrderAggregator

self for method chaining

Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
def setRules(self, rules: List[AggregationSpec]) -> "SecondOrderAggregator":
    """Set aggregation rules.

    Args:
        rules: List of AggregationSpec objects

    Returns:
        self for method chaining
    """
    self.rules = rules
    return self

transform(table_name: str) -> DataFrame

Transform the input table using the defined aggregation rules.

PARAMETER DESCRIPTION
table_name

Name of the table or view to aggregate

TYPE: str

RETURNS DESCRIPTION
DataFrame

DataFrame with aggregated features

Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
def transform(self, table_name: str) -> DataFrame:
    """Transform the input table using the defined aggregation rules.

    Args:
        table_name: Name of the table or view to aggregate

    Returns:
        DataFrame with aggregated features
    """
    if not self.rules:
        raise ValueError("No rules defined for aggregation.")

    # Load the table and add days_between column
    df = self.spark.table(table_name)

    # Add days_between column: application_date - feature_date
    # Positive = historical (feature before application), negative = future
    df = df.withColumn(
        "_days_between",
        F.datediff(
            F.to_date(F.col(self.applicationDateCol), self.applicationDateFormat),
            F.to_date(F.col(self.featureDateCol), self.featureDateFormat),
        ),
    )

    # Build aggregations
    all_aggs = []
    group_cols = [self.idCol]

    for rule in self.rules:
        features = [f.strip() for f in rule.features.split(",")]
        aggs = [a.strip() for a in rule.aggregations.split(",")]
        conds = [c.strip() for c in rule.filterCondition.split(",")] if rule.filterCondition else [None] * len(features)

        if rule.name == "basic":
            all_aggs.extend(self._basic_aggregations(features, aggs))
        elif rule.name == "basic_days":
            all_aggs.extend(
                self._basic_days_aggregations(
                    features, aggs, int(rule.dayLimitLower1), int(rule.dayLimitUpper1), conds
                )
            )
        elif rule.name == "since":
            all_aggs.extend(self._since_aggregations(features, aggs, conds))
        elif rule.name == "ratio":
            all_aggs.extend(
                self._ratio_aggregations(
                    features,
                    aggs,
                    int(rule.dayLimitLower1),
                    int(rule.dayLimitUpper1),
                    int(rule.dayLimitLower2),
                    int(rule.dayLimitUpper2),
                    conds,
                )
            )

    # Perform aggregation
    result = df.groupBy(*group_cols).agg(*all_aggs)

    # Flatten column names (Spark agg creates nested column names)
    for col in result.columns:
        if col != self.idCol:
            # Clean up column name from aggregation nesting
            new_name = col.replace("(", "_").replace(")", "").replace(",", "")
            result = result.withColumnRenamed(col, new_name)

    return result
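The renaming loop at the end of transform turns Spark-generated aggregate names such as sum(amount) into flat identifiers. The same string cleanup in isolation (hypothetical helper name, same replacements as the loop above):

```python
def flatten_agg_name(col: str) -> str:
    # Mirrors the cleanup in transform(): the opening parenthesis becomes
    # an underscore; closing parentheses and commas are dropped.
    return col.replace("(", "_").replace(")", "").replace(",", "")

print(flatten_agg_name("sum(amount)"))  # sum_amount
print(flatten_agg_name("count(flag)")) # count_flag
```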

validate(table_name: str) -> List[str]

Validate that the input table has the required columns.

PARAMETER DESCRIPTION
table_name

Name of the table to validate

TYPE: str

RETURNS DESCRIPTION
List[str]

List of error messages (empty if validation passes)

Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
def validate(self, table_name: str) -> List[str]:
    """Validate that the input table has the required columns.

    Args:
        table_name: Name of the table to validate

    Returns:
        List of error messages (empty if validation passes)
    """
    errors = []
    try:
        df = self.spark.table(table_name)
        columns = set(df.columns)
    except Exception as e:
        return [f"Could not access table '{table_name}': {str(e)}"]

    # Check required columns
    if self.idCol not in columns:
        errors.append(f"Missing ID column: '{self.idCol}'")

    if self.featureDateCol not in columns:
        errors.append(f"Missing feature date column: '{self.featureDateCol}'")

    if self.applicationDateCol not in columns:
        errors.append(f"Missing application date column: '{self.applicationDateCol}'")

    # Check feature columns from rules
    used_features = set()
    for rule in self.rules:
        for f in rule.features.split(","):
            used_features.add(f.strip())

    missing = used_features - columns
    if missing:
        errors.append(f"Missing feature columns: {missing}")

    return errors
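The column checks in validate reduce to membership tests and one set difference. A minimal sketch of that logic against a plain set of column names (hypothetical helper, not part of the library):

```python
from typing import List, Set

def check_columns(columns: Set[str], id_col: str, feature_date_col: str,
                  application_date_col: str, used_features: Set[str]) -> List[str]:
    # Same checks as validate(): three required columns, then any
    # feature columns referenced by the rules.
    errors = []
    for label, col in [("ID", id_col),
                       ("feature date", feature_date_col),
                       ("application date", application_date_col)]:
        if col not in columns:
            errors.append(f"Missing {label} column: '{col}'")
    missing = used_features - columns
    if missing:
        errors.append(f"Missing feature columns: {missing}")
    return errors

print(check_columns({"user_id", "day", "amount"},
                    "user_id", "day", "application_date", {"amount"}))
# ["Missing application date column: 'application_date'"]
```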

builder() -> FeatureBuilder

Returns a FeatureBuilder instance for fluent API.

RETURNS DESCRIPTION
FeatureBuilder

FeatureBuilder for chaining aggregation rules

Source code in src/seeknal/tasks/sparkengine/py_impl/aggregators/second_order_aggregator.py
def builder(self) -> "FeatureBuilder":
    """Returns a FeatureBuilder instance for fluent API.

    Returns:
        FeatureBuilder for chaining aggregation rules
    """
    return FeatureBuilder(self)