Core API Reference

This page documents the core Seeknal classes that form the foundation of the library. These classes handle project management, entity definitions, data flows, and execution context.

Overview

The core module provides the essential building blocks for working with Seeknal:

| Class   | Purpose                                           |
|---------|---------------------------------------------------|
| Project | Manage Seeknal projects and their lifecycle       |
| Entity  | Define entities with join keys for feature stores |
| Flow    | Create and manage data transformation pipelines   |
| Context | Execution context and session management          |

Project

The Project class is the top-level container for organizing Seeknal resources. Projects provide namespace isolation and resource management.

Classes

Project(name: str, description: str = '') dataclass

A class used to define and manage projects in Seeknal.

Projects serve as the top-level organizational unit for grouping related entities, feature views, and data sources. Each project has a unique name and optional description.

PARAMETER DESCRIPTION
name

The name of the project. Will be converted to snake_case.

TYPE: str

description

A description of the project. Defaults to empty string.

TYPE: str DEFAULT: ''

ATTRIBUTE DESCRIPTION
project_id

The unique identifier assigned after saving to the database. Only available after calling get_or_create().

TYPE: int

Example

project = Project(name="my_project", description="My feature store project")
project = project.get_or_create()
print(project.project_id)
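The docs note that name is converted to snake_case. A minimal sketch of such a normalization (illustrative only; seeknal's actual conversion rule may differ):

```python
import re

def to_snake_case(name: str) -> str:
    """Illustrative snake_case normalization: split CamelCase
    boundaries, replace separators, then lower-case."""
    # Insert an underscore before a capital that follows a lowercase/digit
    s = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name)
    # Collapse spaces and dashes into underscores
    s = re.sub(r"[\s\-]+", "_", s)
    return s.lower()

print(to_snake_case("My Project"))  # my_project
print(to_snake_case("MyProject"))   # my_project
```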

Functions

get_or_create()

Get an existing project by name or create a new one if it doesn't exist.

This method checks if a project with the current name exists in the database. If found, it loads the existing project's data into this instance. If not found, it creates a new project with the current attributes. The project_id is set on the global context for subsequent operations.

RETURNS DESCRIPTION
Project

The current instance with project_id populated.

Example

project = Project(name="my_project", description="Test project")
project = project.get_or_create()
print(project.project_id)

Source code in src/seeknal/project.py
def get_or_create(self):
    """
    Get an existing project by name or create a new one if it doesn't exist.

    This method checks if a project with the current name exists in the database.
    If found, it loads the existing project's data into this instance.
    If not found, it creates a new project with the current attributes.
    The project_id is set on the global context for subsequent operations.

    Returns:
        Project: The current instance with project_id populated.

    Example:
        >>> project = Project(name="my_project", description="Test project")
        >>> project = project.get_or_create()
        >>> print(project.project_id)
    """
    req = ProjectRequest(body=self.__dict__)
    project = req.select_by_name(self.name)
    if project is None:
        self.project_id = req.save()
    else:
        self.__dict__.update(project.__dict__)
        self.project_id = project.id
    context.project_id = self.project_id
    return self
update(name=None, description=None)

Update the project's name and/or description.

Updates the project in the database with the provided values. If a parameter is None, the existing value is preserved. The project must be loaded via get_or_create() before calling this method.

PARAMETER DESCRIPTION
name

New name for the project. If None, keeps current name.

TYPE: str DEFAULT: None

description

New description for the project. If None, keeps current description.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Project

The current instance with updated attributes.

RAISES DESCRIPTION
ValueError

If the project has not been loaded via get_or_create().

ProjectNotFoundError

If the project no longer exists in the database.

Example

project = Project(name="my_project").get_or_create()
project = project.update(description="Updated description")

Source code in src/seeknal/project.py
def update(self, name=None, description=None):
    """
    Update the project's name and/or description.

    Updates the project in the database with the provided values.
    If a parameter is None, the existing value is preserved.
    The project must be loaded via get_or_create() before calling this method.

    Args:
        name (str, optional): New name for the project. If None, keeps current name.
        description (str, optional): New description for the project. If None,
            keeps current description.

    Returns:
        Project: The current instance with updated attributes.

    Raises:
        ValueError: If the project has not been loaded via get_or_create().
        ProjectNotFoundError: If the project no longer exists in the database.

    Example:
        >>> project = Project(name="my_project").get_or_create()
        >>> project = project.update(description="Updated description")
    """
    if self.project_id is None:
        raise ValueError("Project not loaded. Call get_or_create() first.")
    project = ProjectRequest.select_by_id(self.project_id)
    if project is None:
        raise ProjectNotFoundError("Project doesn't exist.")
    if name is None:
        name = project.name
    if description is None:
        description = project.description
    req = ProjectRequest(
        body={
            "name": name,
            "description": description,
        }
    )
    req.save()
    self.name = name
    self.description = description
    return self
list() staticmethod

List all projects in a tabular format.

Retrieves all projects from the database and displays them in a formatted table using GitHub-style markdown format. The table includes the project name, description, creation time, and last update time.

Note

This is a static method that outputs directly to the console. It does not return any value.

Example

Project.list()
| name       | description  | created_at          | updated_at          |
|------------|--------------|---------------------|---------------------|
| my_project | Test project | 2024-01-15 10:30:00 | 2024-01-15 10:30:00 |

Source code in src/seeknal/project.py
@staticmethod
def list():
    """
    List all projects in a tabular format.

    Retrieves all projects from the database and displays them in a
    formatted table using GitHub-style markdown format. The table includes
    the project name, description, creation time, and last update time.

    Note:
        This is a static method that outputs directly to the console.
        It does not return any value.

    Example:
        >>> Project.list()
        | name       | description   | created_at          | updated_at          |
        |------------|---------------|---------------------|---------------------|
        | my_project | Test project  | 2024-01-15 10:30:00 | 2024-01-15 10:30:00 |
    """
    req = ProjectRequest()
    projects = req.select_all()
    projects = list(
        map(
            lambda x: {
                "name": x.name,
                "description": x.description,
                "created_at": pendulum.instance(x.created_at).format(
                    "YYYY-MM-DD HH:mm:ss"
                ),
                "updated_at": pendulum.instance(x.updated_at).format(
                    "YYYY-MM-DD HH:mm:ss"
                ),
            },
            projects,
        )
    )
    tabular = tabulate(projects, headers="keys", tablefmt="github")
    typer.echo(tabular)
get_by_id(id) staticmethod

Retrieve a project by its unique identifier.

Fetches a project from the database using its ID and returns a new Project instance with the loaded data.

PARAMETER DESCRIPTION
id

The unique identifier of the project to retrieve.

TYPE: int

RETURNS DESCRIPTION
Project

A new Project instance with the loaded data.

RAISES DESCRIPTION
ProjectNotFoundError

If no project exists with the given ID.

Example

project = Project.get_by_id(1)
print(project.name)

Source code in src/seeknal/project.py
@staticmethod
def get_by_id(id):
    """
    Retrieve a project by its unique identifier.

    Fetches a project from the database using its ID and returns a new
    Project instance with the loaded data.

    Args:
        id (int): The unique identifier of the project to retrieve.

    Returns:
        Project: A new Project instance with the loaded data.

    Raises:
        ProjectNotFoundError: If no project exists with the given ID.

    Example:
        >>> project = Project.get_by_id(1)
        >>> print(project.name)
    """
    project = ProjectRequest.select_by_id(id)
    if project is None:
        raise ProjectNotFoundError("Project doesn't exist.")
    return Project(
        name=project.name, description=project.description, common=project.common
    )


Entity

The Entity class defines entities with join keys that serve as the primary identifiers for feature lookups in the feature store.

Classes

Entity(name: str, join_keys: Optional[List[str]] = None, pii_keys: Optional[List[str]] = None, description: Optional[str] = None) dataclass

Represents an entity in the feature store.

An entity defines a domain object (e.g., user, product, transaction) that features are associated with. Entities have join keys that uniquely identify instances and can optionally specify PII (Personally Identifiable Information) keys for data privacy compliance.

ATTRIBUTE DESCRIPTION
name

The entity name. Will be converted to snake_case automatically.

TYPE: str

join_keys

List of column names that uniquely identify entity instances. These keys are used for joining features during retrieval.

TYPE: Optional[List[str]]

pii_keys

Optional list of column names containing personally identifiable information. Used for data privacy and compliance purposes.

TYPE: Optional[List[str]]

description

Optional human-readable description of the entity.

TYPE: Optional[str]

Example

entity = Entity(
    name="customer",
    join_keys=["customer_id"],
    pii_keys=["email", "phone"],
    description="Customer entity for retail features"
)
entity.get_or_create()

Functions

get_or_create()

Retrieve an existing entity or create a new one.

This method checks if an entity with the same name already exists in the feature store. If found, it loads the existing entity's properties into this instance. If not found, it creates a new entity with the current instance's properties.

After calling this method, the entity will have an 'entity_id' attribute set, which is required for operations like update().

RETURNS DESCRIPTION
Entity

The current instance with entity_id set and properties synchronized with the persisted entity.

Example

entity = Entity(name="user", join_keys=["user_id"])
entity = entity.get_or_create()
print(entity.entity_id)  # Now has an ID

Source code in src/seeknal/entity.py
def get_or_create(self):
    """Retrieve an existing entity or create a new one.

    This method checks if an entity with the same name already exists in the
    feature store. If found, it loads the existing entity's properties into
    this instance. If not found, it creates a new entity with the current
    instance's properties.

    After calling this method, the entity will have an 'entity_id' attribute
    set, which is required for operations like update().

    Returns:
        Entity: The current instance with entity_id set and properties
            synchronized with the persisted entity.

    Example:
        >>> entity = Entity(name="user", join_keys=["user_id"])
        >>> entity = entity.get_or_create()
        >>> print(entity.entity_id)  # Now has an ID
    """
    req = EntityRequest(body=vars(self))
    entity = req.select_by_name(self.name)
    if entity is None:
        self.entity_id = req.save()
    else:
        self.entity_id = entity.id
        self.name = entity.name
        self.join_keys = entity.join_keys.split(",")
        if entity.pii_keys is not None:
            self.pii_keys = entity.pii_keys.split(",")
        else:
            self.pii_keys = None
        self.description = entity.description
    return self
list() staticmethod

List all registered entities in the feature store.

Retrieves all entities from the feature store and displays them in a formatted table. The table includes entity name, join keys, PII keys, and description.

This is a static method that can be called without instantiating an Entity object.

Example

Entity.list()
| name     | join_keys   | pii_keys | description          |
|----------|-------------|----------|----------------------|
| customer | customer_id | email    | Customer entity      |
| product  | product_id  | None     | Product catalog item |

Source code in src/seeknal/entity.py
@staticmethod
def list():
    """List all registered entities in the feature store.

    Retrieves all entities from the feature store and displays them in a
    formatted table. The table includes entity name, join keys, PII keys,
    and description.

    This is a static method that can be called without instantiating an
    Entity object.

    Example:
        >>> Entity.list()
        | name     | join_keys   | pii_keys | description          |
        |----------|-------------|----------|----------------------|
        | customer | customer_id | email    | Customer entity      |
        | product  | product_id  | None     | Product catalog item |
    """
    entities = EntityRequest.select_all()
    if entities:
        entities = [
            {
                "name": entity.name,
                "join_keys": entity.join_keys,
                "pii_keys": entity.pii_keys,
                "description": entity.description,
            }
            for entity in entities
        ]
        typer.echo(tabulate(entities, headers="keys", tablefmt="github"))
    else:
        typer.echo("No entities found.")
update(name=None, description=None, pii_keys=None)

Update the entity's properties in the feature store.

Updates the entity with new values for name, description, or PII keys. Only the provided parameters will be updated; others retain their current values. The entity must have been previously saved via get_or_create() before calling this method.

Note

Join keys cannot be updated after entity creation as they define the entity's identity.

PARAMETER DESCRIPTION
name

Optional new name for the entity. Will be converted to snake_case.

DEFAULT: None

description

Optional new description for the entity.

DEFAULT: None

pii_keys

Optional new list of PII key column names.

DEFAULT: None

RAISES DESCRIPTION
EntityNotSavedError

If the entity has not been saved via get_or_create() first.

EntityNotFoundError

If the entity no longer exists in the feature store.

Example

entity = Entity(name="user", join_keys=["user_id"])
entity.get_or_create()
entity.update(description="Updated user entity")

Source code in src/seeknal/entity.py
@require_saved
def update(self, name=None, description=None, pii_keys=None):
    """Update the entity's properties in the feature store.

    Updates the entity with new values for name, description, or PII keys.
    Only the provided parameters will be updated; others retain their
    current values. The entity must have been previously saved via
    get_or_create() before calling this method.

    Note:
        Join keys cannot be updated after entity creation as they define
        the entity's identity.

    Args:
        name: Optional new name for the entity. Will be converted to
            snake_case.
        description: Optional new description for the entity.
        pii_keys: Optional new list of PII key column names.

    Raises:
        EntityNotSavedError: If the entity has not been saved via
            get_or_create() first.
        EntityNotFoundError: If the entity no longer exists in the
            feature store.

    Example:
        >>> entity = Entity(name="user", join_keys=["user_id"])
        >>> entity.get_or_create()
        >>> entity.update(description="Updated user entity")
    """
    entity = EntityRequest.select_by_id(self.entity_id)
    if entity is None:
        raise EntityNotFoundError("Entity not found.")
    if name is None:
        name = entity.name
    if description is None:
        description = entity.description
    if pii_keys is None:
        pii_keys = entity.pii_keys
    req = EntityRequest(
        body={
            "name": name,
            "description": description,
            "pii_keys": pii_keys,
            "join_keys": entity.join_keys.split(","),
        }
    )
    req.save()
    self.name = name
    self.description = description
    self.pii_keys = pii_keys
set_key_values(*args)

Set specific values for the entity's join keys.

Maps positional arguments to the entity's join keys in order, storing them in the key_values attribute. This is useful for point lookups when retrieving features for a specific entity instance.

PARAMETER DESCRIPTION
*args

Values for each join key, in the same order as defined in join_keys. The number of arguments must match the number of join keys.

DEFAULT: ()

RETURNS DESCRIPTION
Entity

The current instance with key_values set.

Example

entity = Entity(name="order", join_keys=["user_id", "order_id"])
entity.get_or_create()
entity.set_key_values("user123", "order456")
print(entity.key_values)
# {'user_id': 'user123', 'order_id': 'order456'}

Source code in src/seeknal/entity.py
def set_key_values(self, *args):
    """Set specific values for the entity's join keys.

    Maps positional arguments to the entity's join keys in order, storing
    them in the key_values attribute. This is useful for point lookups
    when retrieving features for a specific entity instance.

    Args:
        *args: Values for each join key, in the same order as defined
            in join_keys. The number of arguments must match the number
            of join keys.

    Returns:
        Entity: The current instance with key_values set.

    Example:
        >>> entity = Entity(name="order", join_keys=["user_id", "order_id"])
        >>> entity.get_or_create()
        >>> entity.set_key_values("user123", "order456")
        >>> print(entity.key_values)
        {'user_id': 'user123', 'order_id': 'order456'}
    """
    key_values = {}
    for idx, i in enumerate(self.join_keys):
        key_values[i] = args[idx]

    self.key_values = key_values
    return self
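The positional-to-key mapping in set_key_values is essentially a zip over join_keys. A standalone sketch of the same logic, with an explicit length check added (an assumption for illustration; the documented source indexes args directly):

```python
def map_key_values(join_keys, *args):
    """Map positional values onto join keys, in declaration order."""
    if len(args) != len(join_keys):
        # Guard added for the sketch: mismatched counts fail loudly
        raise ValueError(f"expected {len(join_keys)} values, got {len(args)}")
    return dict(zip(join_keys, args))

print(map_key_values(["user_id", "order_id"], "user123", "order456"))
# {'user_id': 'user123', 'order_id': 'order456'}
```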

Functions

require_saved(func)

Decorator that ensures an entity has been saved before method execution.

This decorator checks if the entity instance has an 'entity_id' attribute, which indicates it has been persisted via get_or_create(). If not, it raises an EntityNotSavedError.

PARAMETER DESCRIPTION
func

The method to wrap.

RETURNS DESCRIPTION

A wrapper function that validates the entity is saved before calling

the original method.

RAISES DESCRIPTION
EntityNotSavedError

If the entity has not been saved or loaded.

Source code in src/seeknal/entity.py
def require_saved(func):
    """Decorator that ensures an entity has been saved before method execution.

    This decorator checks if the entity instance has an 'entity_id' attribute,
    which indicates it has been persisted via get_or_create(). If not, it raises
    an EntityNotSavedError.

    Args:
        func: The method to wrap.

    Returns:
        A wrapper function that validates the entity is saved before calling
        the original method.

    Raises:
        EntityNotSavedError: If the entity has not been saved or loaded.
    """
    def wrapper(self, *args, **kwargs):
        if "entity_id" not in vars(self):
            raise EntityNotSavedError("Entity not loaded or saved")
        return func(self, *args, **kwargs)

    return wrapper
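The guard pattern above can be reproduced standalone. This sketch uses toy stand-in classes (not seeknal's) and, unlike the listing above, propagates the wrapped method's return value so chained calls keep working:

```python
import functools

class NotSavedError(RuntimeError):
    """Raised when a persisted-only operation runs on an unsaved object."""

def require_saved(func):
    """Allow the call only once the instance carries an entity_id."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if "entity_id" not in vars(self):
            raise NotSavedError("Entity not loaded or saved")
        return func(self, *args, **kwargs)
    return wrapper

class ToyEntity:
    def __init__(self, name):
        self.name = name  # entity_id is only set after get_or_create()

    def get_or_create(self):
        self.entity_id = 1  # pretend the backend assigned an id
        return self

    @require_saved
    def update(self, description):
        return f"updated {self.name}: {description}"

e = ToyEntity("user")
try:
    e.update("too early")        # raises NotSavedError
except NotSavedError:
    pass
print(e.get_or_create().update("ok"))  # updated user: ok
```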

Flow

The Flow class enables the creation and execution of data transformation pipelines. Flows connect inputs, tasks, and outputs to build complete data processing workflows.

Classes

FlowOutputEnum

Bases: str, Enum

Enumeration of supported flow output types.

Defines the possible output formats for Flow execution results.

ATTRIBUTE DESCRIPTION
SPARK_DATAFRAME

Output as a PySpark DataFrame.

ARROW_DATAFRAME

Output as a PyArrow Table.

PANDAS_DATAFRAME

Output as a Pandas DataFrame.

HIVE_TABLE

Write output to a Hive table.

PARQUET

Write output to Parquet files.

LOADER

Use a custom loader for output.

FEATURE_GROUP

Output to a feature group.

FEATURE_SERVING

Output for feature serving.

FlowInputEnum

Bases: str, Enum

Enumeration of supported flow input types.

Defines the possible input sources for Flow data ingestion.

ATTRIBUTE DESCRIPTION
HIVE_TABLE

Read input from a Hive table.

PARQUET

Read input from Parquet files.

FEATURE_GROUP

Read input from a feature group.

EXTRACTOR

Use a custom extractor for input.

SOURCE

Read input from a defined Source.
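Because FlowInputEnum subclasses both str and Enum, its members compare equal to their string values, which is what lets from_dict rebuild the enum from a serialized string. A toy illustration of the pattern (the member values here are hypothetical, not seeknal's actual ones):

```python
from enum import Enum

class InputKind(str, Enum):
    HIVE_TABLE = "hive_table"  # hypothetical value
    PARQUET = "parquet"        # hypothetical value

kind = InputKind.PARQUET
serialized = kind.value            # plain string, YAML/JSON friendly
restored = InputKind(serialized)   # round-trip back to the enum member
assert restored is InputKind.PARQUET
assert kind == "parquet"           # str mixin: compares to raw strings
```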

FlowInput(value: Optional[Union[str, dict, Extractor]] = None, kind: FlowInputEnum = FlowInputEnum.HIVE_TABLE) dataclass

Configuration for flow input data source.

Defines how data is loaded into a Flow for processing. Supports multiple input types including Hive tables, Parquet files, extractors, and sources.

ATTRIBUTE DESCRIPTION
value

The input specification. Can be a table name (str), path (str), configuration (dict), or Extractor instance depending on the kind.

TYPE: Optional[Union[str, dict, BaseExtractorPySpark]]

kind

The type of input source (default: HIVE_TABLE).

TYPE: FlowInputEnum

Example

# Read from a Hive table
flow_input = FlowInput(value="my_database.my_table", kind=FlowInputEnum.HIVE_TABLE)

# Read from Parquet files
flow_input = FlowInput(value="/path/to/data.parquet", kind=FlowInputEnum.PARQUET)

FlowOutput(value: Optional[Any] = None, kind: Optional[FlowOutputEnum] = None) dataclass

Configuration for flow output destination.

Defines how flow results are returned or persisted. Supports multiple output formats including DataFrames, Hive tables, and Parquet files.

ATTRIBUTE DESCRIPTION
value

The output destination. For file-based outputs, this is the path or table name. For DataFrame outputs, this is typically None.

TYPE: Optional[Any]

kind

The type of output format (default: None, returns data as-is).

TYPE: Optional[FlowOutputEnum]

Example

# Return as a Spark DataFrame
output = FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME)

# Write to a Hive table
output = FlowOutput(value="my_db.output_table", kind=FlowOutputEnum.HIVE_TABLE)

Flow(name: str, input: Optional[FlowInput] = None, input_date_col: Optional[dict] = None, tasks: Optional[List[Task]] = None, output: Optional[FlowOutput] = None, description: str = '') dataclass

A data processing pipeline that chains inputs, tasks, and outputs.

Flow is the core abstraction for defining data pipelines in seeknal. It connects a data source (input), a series of transformation tasks, and an output destination. Flows can be saved to and loaded from the seeknal backend for reuse and scheduling.

ATTRIBUTE DESCRIPTION
name

Unique identifier for the flow (automatically converted to snake_case).

TYPE: str

input

Configuration for the input data source.

TYPE: Optional[FlowInput]

input_date_col

Optional date column configuration for filtering input data. Contains 'dateCol' (column name) and 'datePattern' (date format).

TYPE: Optional[dict]

tasks

Optional list of Task instances to execute in sequence.

TYPE: Optional[List[Task]]

output

Configuration for the output destination.

TYPE: Optional[FlowOutput]

description

Human-readable description of the flow's purpose.

TYPE: str

Example

from seeknal.flow import Flow, FlowInput, FlowOutput, FlowInputEnum, FlowOutputEnum
from seeknal.tasks.sparkengine import SparkEngineTask

# Create a simple flow
flow = Flow(
    name="my_etl_flow",
    input=FlowInput(value="source_table", kind=FlowInputEnum.HIVE_TABLE),
    tasks=[SparkEngineTask()],
    output=FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME),
    description="ETL flow for processing source data"
)

# Run the flow
result = flow.run(start_date="2024-01-01", end_date="2024-01-31")

Functions

require_saved(func)

Decorator that ensures the flow has been saved before method execution.

PARAMETER DESCRIPTION
func

The method to wrap.

RETURNS DESCRIPTION

Wrapped function that checks for flow_id before execution.

RAISES DESCRIPTION
ValueError

If the flow has not been saved (no flow_id).

Source code in src/seeknal/flow.py
def require_saved(func):
    """Decorator that ensures the flow has been saved before method execution.

    Args:
        func: The method to wrap.

    Returns:
        Wrapped function that checks for flow_id before execution.

    Raises:
        ValueError: If the flow has not been saved (no flow_id).
    """

    def wrapper(self, *args, **kwargs):
        if "flow_id" not in vars(self):
            raise ValueError("flow not loaded or saved")
        return func(self, *args, **kwargs)

    return wrapper
set_input_date_col(date_col: str, date_pattern: str = 'yyyyMMdd')

Configure the date column for input data filtering.

Sets up date-based filtering on the input data, allowing the flow to process data within specific date ranges.

PARAMETER DESCRIPTION
date_col

Name of the column containing date values.

TYPE: str

date_pattern

Date format pattern (default: "yyyyMMdd").

TYPE: str DEFAULT: 'yyyyMMdd'

RETURNS DESCRIPTION

Self for method chaining.

Example

flow.set_input_date_col("event_date", "yyyy-MM-dd")

Source code in src/seeknal/flow.py
def set_input_date_col(self, date_col: str, date_pattern: str = "yyyyMMdd"):
    """Configure the date column for input data filtering.

    Sets up date-based filtering on the input data, allowing the flow
    to process data within specific date ranges.

    Args:
        date_col: Name of the column containing date values.
        date_pattern: Date format pattern (default: "yyyyMMdd").

    Returns:
        Self for method chaining.

    Example:
        >>> flow.set_input_date_col("event_date", "yyyy-MM-dd")
    """
    self.input_date_col = {
        "dateCol": date_col,
        "datePattern": date_pattern,
    }
    return self
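The default "yyyyMMdd" pattern follows Spark/Java datetime tokens, not Python's strftime. When checking sample date values locally, a small (hypothetical, illustration-only) translator for the most common tokens can help:

```python
from datetime import datetime

# Hypothetical helper: translate a few common Spark/Java date tokens
# to Python strftime directives. Covers only yyyy, MM, dd.
_TOKEN_MAP = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}

def spark_pattern_to_strftime(pattern: str) -> str:
    for token, directive in _TOKEN_MAP.items():
        pattern = pattern.replace(token, directive)
    return pattern

fmt = spark_pattern_to_strftime("yyyyMMdd")       # %Y%m%d
print(datetime.strptime("20240115", fmt).date())  # 2024-01-15
```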
run(params=None, filters=None, date=None, start_date=None, end_date=None)

Execute the flow pipeline.

Runs the complete flow: loads input data, applies filters, executes all tasks in sequence, and returns the output in the configured format.

PARAMETER DESCRIPTION
params

Optional dictionary of parameters to pass to tasks.

DEFAULT: None

filters

Optional filters to apply to the input data.

DEFAULT: None

date

Optional single date for filtering (mutually exclusive with start_date/end_date).

DEFAULT: None

start_date

Optional start date for date range filtering.

DEFAULT: None

end_date

Optional end date for date range filtering.

DEFAULT: None

RETURNS DESCRIPTION

The processed data in the format specified by the output configuration.

Example
# Run with date range
result = flow.run(start_date="2024-01-01", end_date="2024-01-31")

# Run with parameters
result = flow.run(params={"threshold": 0.5})

Source code in src/seeknal/flow.py
def run(self, params=None, filters=None, date=None, start_date=None, end_date=None):
    """Execute the flow pipeline.

    Runs the complete flow: loads input data, applies filters, executes
    all tasks in sequence, and returns the output in the configured format.

    Args:
        params: Optional dictionary of parameters to pass to tasks.
        filters: Optional filters to apply to the input data.
        date: Optional single date for filtering (mutually exclusive with
            start_date/end_date).
        start_date: Optional start date for date range filtering.
        end_date: Optional end date for date range filtering.

    Returns:
        The processed data in the format specified by the output configuration.

    Example:
        >>> # Run with date range
        >>> result = flow.run(start_date="2024-01-01", end_date="2024-01-31")
        >>> # Run with parameters
        >>> result = flow.run(params={"threshold": 0.5})
    """
    spark = SparkSession.builder.getOrCreate() if self._requires_spark() else None
    # taking care the input
    # load data and applying filters
    flow_input = self.input(spark)
    if isinstance(flow_input, DataFrame):
        pre_filter = (
            SparkEngineTask()
            .add_input(dataframe=flow_input)
            .add_common_yaml(Common().as_yaml())
        )
        if self.input_date_col is not None:
            pre_filter.input["params"] = {
                "dateCol": self.input_date_col["dateCol"],
                "datePattern": self.input_date_col["datePattern"],
            }
        flow_input = pre_filter.transform(
            spark,
            chain=True,
            materialize=False,
            params=params,
            filters=filters,
            date=date,
            start_date=start_date,
            end_date=end_date,
        )
    elif isinstance(flow_input, pa.Table):
        # todo: implement filters for arrow dataframe
        pass
    else:
        pass

    if self.tasks is None:
        return self.output(flow_input, spark)
    else:
        first_task = self.tasks[0]
        if first_task.is_spark_job:
            if not isinstance(flow_input, DataFrame):
                flow_input = spark.createDataFrame(flow_input)
        first_task.add_input(dataframe=flow_input).add_common_yaml(
            Common().as_yaml()
        )
        temp_data = first_task.transform(
            spark, chain=True, materialize=False, params=params
        )
        if len(self.tasks) > 1:
            for task in self.tasks[1:]:
                if task.is_spark_job:
                    if not isinstance(temp_data, DataFrame):
                        temp_data = spark.createDataFrame(temp_data)
                else:
                    if isinstance(temp_data, DataFrame):
                        _temp_data = temp_data.toPandas()
                        temp_data = pa.Table.from_pandas(_temp_data)
                new_task = task.add_input(dataframe=temp_data).add_common_yaml(
                    Common().as_yaml()
                )
                temp_data = new_task.transform(
                    spark, chain=True, materialize=False, params=params
                )
        return self.output(temp_data, spark)
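Stripped of the Spark/Arrow conversions, the task loop in run() reduces to feeding each task the previous task's output. A toy sketch of that chaining shape (stand-in callables, not seeknal tasks):

```python
def run_chain(data, tasks):
    """Apply each task to the previous task's output, in order."""
    for task in tasks:
        data = task(data)
    return data

double = lambda xs: [x * 2 for x in xs]
inc = lambda xs: [x + 1 for x in xs]
print(run_chain([1, 2, 3], [double, inc]))  # [3, 5, 7]
```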
as_dict()

Convert the flow to a dictionary representation.

Serializes the flow configuration to a dictionary suitable for storage or transmission. Removes Spark context references.

RETURNS DESCRIPTION

Dictionary containing the flow's configuration including

name, input, output, tasks, and description.

Source code in src/seeknal/flow.py
def as_dict(self):
    """Convert the flow to a dictionary representation.

    Serializes the flow configuration to a dictionary suitable for
    storage or transmission. Removes Spark context references.

    Returns:
        Dictionary containing the flow's configuration including
        name, input, output, tasks, and description.
    """
    # removing any reference to spark context
    if self.tasks is not None:
        for task in self.tasks:
            task.input = None
    flow_dict = asdict(self)
    if "input" in flow_dict:
        if flow_dict["input"] is not None:
            if flow_dict["input"]["kind"] is not None:
                flow_dict["input"]["kind"] = flow_dict["input"]["kind"].value
    if "output" in flow_dict:
        if flow_dict["output"] is not None:
            if flow_dict["output"]["kind"] is not None:
                flow_dict["output"]["kind"] = flow_dict["output"]["kind"].value
    if "tasks" in flow_dict:
        if flow_dict["tasks"] is not None:
            for index, task in enumerate(flow_dict["tasks"]):
                task["class_name"] = (
                    self.tasks[index].__module__
                    + "."
                    + type(self.tasks[index]).__name__
                )
    return flow_dict
as_yaml()

Convert the flow to a YAML string representation.

RETURNS DESCRIPTION

YAML-formatted string of the flow configuration.

Source code in src/seeknal/flow.py
def as_yaml(self):
    """Convert the flow to a YAML string representation.

    Returns:
        YAML-formatted string of the flow configuration.
    """
    flow_dict = self.as_dict()
    return yaml.dump(flow_dict)
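The key step in `as_dict()` is replacing enum members (`FlowInputEnum` / `FlowOutputEnum`) with their raw `.value` so the result is plain data that can be dumped to YAML or JSON. A minimal, self-contained sketch of that pattern, using hypothetical stand-in classes rather than the real `FlowInput`:

```python
from dataclasses import dataclass, asdict
from enum import Enum

# Hypothetical stand-ins for FlowInputEnum / FlowInput, for illustration only.
class KindEnum(Enum):
    HIVE_TABLE = "hive_table"
    FILE = "file"

@dataclass
class MiniInput:
    kind: KindEnum
    table: str

def mini_as_dict(inp: MiniInput) -> dict:
    # Same pattern as Flow.as_dict(): swap the Enum member for its raw value
    # so the dictionary contains only plain, serializable data.
    d = asdict(inp)
    if d["kind"] is not None:
        d["kind"] = d["kind"].value
    return d

d = mini_as_dict(MiniInput(kind=KindEnum.HIVE_TABLE, table="events"))
```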
from_dict(flow_dict: dict) staticmethod

Create a Flow instance from a dictionary.

Deserializes a flow configuration dictionary back into a Flow object. Reconstructs input, output, and task configurations.

PARAMETER DESCRIPTION
flow_dict

Dictionary containing flow configuration with keys like 'name', 'input', 'output', 'tasks'.

TYPE: dict

RETURNS DESCRIPTION

Flow instance with the configured settings.

RAISES DESCRIPTION
ValueError

If a task in the dictionary is missing 'class_name'.

Example

flow_config = {"name": "my_flow", "input": {...}, "output": {...}}
flow = Flow.from_dict(flow_config)

Source code in src/seeknal/flow.py
@staticmethod
def from_dict(flow_dict: dict):
    """Create a Flow instance from a dictionary.

    Deserializes a flow configuration dictionary back into a Flow object.
    Reconstructs input, output, and task configurations.

    Args:
        flow_dict: Dictionary containing flow configuration with keys
            like 'name', 'input', 'output', 'tasks'.

    Returns:
        Flow instance with the configured settings.

    Raises:
        ValueError: If a task in the dictionary is missing 'class_name'.

    Example:
        >>> flow_config = {"name": "my_flow", "input": {...}, "output": {...}}
        >>> flow = Flow.from_dict(flow_config)
    """
    if "input" in flow_dict:
        flow_input = FlowInput(**flow_dict["input"])
        flow_input_enum = FlowInputEnum(flow_input.kind)
        flow_input.kind = flow_input_enum
    else:
        flow_input = FlowInput()

    if "output" in flow_dict:
        flow_output = FlowOutput(**flow_dict["output"])
        if flow_output.kind is not None:
            flow_output_enum = FlowOutputEnum(flow_output.kind)
            flow_output.kind = flow_output_enum
    else:
        flow_output = FlowOutput()
    arr_tasks = []
    if "tasks" in flow_dict:
        if flow_dict["tasks"] is not None:
            for i in flow_dict["tasks"]:
                if not "class_name" in i:
                    raise ValueError("Cannot identify task class")
                class_array = i["class_name"].split(".")
                class_name = class_array[-1]
                module_name = ".".join(class_array[0:-1])
                ParamClass = getattr(
                    importlib.import_module(module_name), class_name
                )
                i.pop("class_name")
                task = ParamClass(**i)
                arr_tasks.append(task)
    if arr_tasks == []:
        arr_tasks = None

    flow = Flow(
        input=flow_input,
        tasks=arr_tasks,
        output=flow_output,
        name=flow_dict["name"],
    )
    return flow
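`from_dict()` rebuilds each task by importing its class from the stored `class_name` string. The lookup itself is plain `importlib`; a self-contained sketch of that step, using a stdlib class in place of a real task class:

```python
import importlib

def resolve_class(class_name: str):
    # Same lookup from_dict() performs: split "pkg.module.ClassName" into a
    # module path and a class name, import the module, fetch the attribute.
    parts = class_name.split(".")
    module_name, cls_name = ".".join(parts[:-1]), parts[-1]
    return getattr(importlib.import_module(module_name), cls_name)

# Resolving a stdlib class the same way from_dict() resolves a task class.
OrderedDict = resolve_class("collections.OrderedDict")
```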
get_or_create()

Save or retrieve the flow from the backend.

If a flow with the same name exists in the current project, loads its configuration. Otherwise, saves this flow as a new entry.

RETURNS DESCRIPTION

Self with flow_id populated.

Note

Requires an active workspace and project context.

Source code in src/seeknal/flow.py
@require_workspace
@require_project
def get_or_create(self):
    """Save or retrieve the flow from the backend.

    If a flow with the same name exists in the current project, loads
    its configuration. Otherwise, saves this flow as a new entry.

    Returns:
        Self with flow_id populated.

    Note:
        Requires an active workspace and project context.
    """
    req = FlowRequest(
        body={
            "spec": self.as_dict(),
            "name": self.name,
            "description": self.description,
        }
    )
    flow = req.select_by_name(self.name)
    if flow is None:
        self.flow_id = req.save()
    else:
        logger.warning("Using an existing Flow.")
        my_flow = Flow.from_dict(json.loads(flow.spec))
        self.__dict__.update(my_flow.__dict__)
        self.flow_id = flow.id
    return self
list() staticmethod

List all flows in the current project.

Displays a formatted table of flows including name, description, specification, and timestamps.

Note

Requires an active workspace and project context. Outputs directly to the console using typer.echo.

Source code in src/seeknal/flow.py
@require_workspace
@staticmethod
def list():
    """List all flows in the current project.

    Displays a formatted table of flows including name, description,
    specification, and timestamps.

    Note:
        Requires an active workspace and project context.
        Outputs directly to the console using typer.echo.
    """
    check_project_id()
    flows = FlowRequest.select_by_project_id(context.project_id)
    if flows:
        flows = [
            {
                "name": flow.name,
                "description": flow.description,
                "flow_spec": yaml.dump(json.loads(flow.spec)),
                "created_at": pendulum.instance(flow.created_at).format(
                    "YYYY-MM-DD HH:mm:ss"
                ),
                "updated_at": pendulum.instance(flow.updated_at).format(
                    "YYYY-MM-DD HH:mm:ss"
                ),
            }
            for flow in flows
        ]

        typer.echo(tabulate(flows, headers="keys", tablefmt="github"))
    else:
        typer.echo("No flow found")
update(name: Optional[str] = None, input: Optional[FlowInput] = None, tasks: Optional[List[Task]] = None, output: Optional[FlowOutput] = None, description: str = '')

Update the flow configuration in the backend.

Updates the saved flow with new configuration values. Any parameter not provided will retain its existing value.

PARAMETER DESCRIPTION
name

New name for the flow.

TYPE: Optional[str] DEFAULT: None

input

New input configuration.

TYPE: Optional[FlowInput] DEFAULT: None

tasks

New list of tasks.

TYPE: Optional[List[Task]] DEFAULT: None

output

New output configuration.

TYPE: Optional[FlowOutput] DEFAULT: None

description

New description.

TYPE: str DEFAULT: ''

RAISES DESCRIPTION
ValueError

If the flow has not been saved yet or not found.

Note

Requires the flow to be saved first via get_or_create().

Source code in src/seeknal/flow.py
@require_workspace
@require_saved
@require_project
def update(
    self,
    name: Optional[str] = None,
    input: Optional[FlowInput] = None,
    tasks: Optional[List[Task]] = None,
    output: Optional[FlowOutput] = None,
    description: str = "",
):
    """Update the flow configuration in the backend.

    Updates the saved flow with new configuration values. Any parameter
    not provided will retain its existing value.

    Args:
        name: New name for the flow.
        input: New input configuration.
        tasks: New list of tasks.
        output: New output configuration.
        description: New description.

    Raises:
        ValueError: If the flow has not been saved yet or not found.

    Note:
        Requires the flow to be saved first via get_or_create().
    """
    if self.flow_id is None:
        raise ValueError("Flow not saved yet")
    flow = FlowRequest.select_by_id(self.flow_id)
    if flow is None:
        raise ValueError("Flow not found")
    my_flow = Flow.from_dict(json.loads(flow.spec))
    if name is None:
        name = flow.name
    if input is None:
        input = my_flow.input
    if output is None:
        output = my_flow.output
    if tasks is None:
        tasks = my_flow.tasks
    if not description:
        description = flow.description
    new_flow = Flow(
        name=name, input=input, tasks=tasks, output=output, description=description
    )
    req = FlowRequest(
        body={"spec": new_flow.as_dict(), "name": name, "description": description}
    )
    req.save()
    self.__dict__.update(new_flow.__dict__)
delete()

Delete the flow from the backend.

Removes the saved flow from the seeknal backend permanently.

RETURNS DESCRIPTION

Result of the delete operation.

RAISES DESCRIPTION
ValueError

If the flow has not been saved yet.

Note

Requires the flow to be saved first via get_or_create().

Source code in src/seeknal/flow.py
@require_workspace
@require_saved
@require_project
def delete(self):
    """Delete the flow from the backend.

    Removes the saved flow from the seeknal backend permanently.

    Returns:
        Result of the delete operation.

    Raises:
        ValueError: If the flow has not been saved yet.

    Note:
        Requires the flow to be saved first via get_or_create().
    """
    if self.flow_id is None:
        raise ValueError("Invalid. Make sure to load the flow with get_or_create()")
    return FlowRequest.delete_by_id(self.flow_id)

Functions

run_flow(flow_name: Optional[str] = None, flow: Optional[Flow] = None, params=None, filters=None, date=None, start_date=None, end_date=None, name='run_flow')

Execute a flow by name or instance.

Convenience function to run a flow either by providing its name (loads from backend) or a Flow instance directly.

PARAMETER DESCRIPTION
flow_name

Name of a saved flow to load and run.

TYPE: Optional[str] DEFAULT: None

flow

Flow instance to run directly.

TYPE: Optional[Flow] DEFAULT: None

params

Optional dictionary of parameters to pass to tasks.

DEFAULT: None

filters

Optional filters to apply to the input data.

DEFAULT: None

date

Optional single date for filtering.

DEFAULT: None

start_date

Optional start date for date range filtering.

DEFAULT: None

end_date

Optional end date for date range filtering.

DEFAULT: None

name

Internal name for the operation (default: "run_flow").

DEFAULT: 'run_flow'

RETURNS DESCRIPTION

The processed data from the flow execution.

Example

Run by flow name

result = run_flow(flow_name="my_saved_flow", start_date="2024-01-01")

Run by instance

result = run_flow(flow=my_flow_instance, params={"key": "value"})

Source code in src/seeknal/flow.py
def run_flow(
    flow_name: Optional[str] = None,
    flow: Optional[Flow] = None,
    params=None,
    filters=None,
    date=None,
    start_date=None,
    end_date=None,
    name="run_flow",
):
    """Execute a flow by name or instance.

    Convenience function to run a flow either by providing its name
    (loads from backend) or a Flow instance directly.

    Args:
        flow_name: Name of a saved flow to load and run.
        flow: Flow instance to run directly.
        params: Optional dictionary of parameters to pass to tasks.
        filters: Optional filters to apply to the input data.
        date: Optional single date for filtering.
        start_date: Optional start date for date range filtering.
        end_date: Optional end date for date range filtering.
        name: Internal name for the operation (default: "run_flow").

    Returns:
        The processed data from the flow execution.

    Example:
        >>> # Run by flow name
        >>> result = run_flow(flow_name="my_saved_flow", start_date="2024-01-01")
        >>> # Run by instance
        >>> result = run_flow(flow=my_flow_instance, params={"key": "value"})
    """

    def run_flow_by_instance(flow: Flow):
        return flow.run(params, filters, date, start_date, end_date)

    @require_project
    def run_flow_by_name(name: str):
        flow = Flow(name=name).get_or_create()
        return run_flow_by_instance(flow)

    if flow_name is not None:
        return run_flow_by_name(flow_name)
    elif flow is not None:
        return run_flow_by_instance(flow)

Context

The Context class manages the execution context and session state for Seeknal operations.

Classes

Context(*args: Any, **kwargs: Any)

Bases: DotDict, local

A thread-safe context store for seeknal data.

The Context is a DotDict subclass, and can be instantiated the same way.

PARAMETER DESCRIPTION
- *args

arguments to provide to the DotDict constructor (e.g., an initial dictionary)

TYPE: Any

- **kwargs

any key / value pairs to initialize this context with

TYPE: Any

Source code in src/seeknal/context.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    init = {}

    # Initialize with config context
    init.update(config.get("context", {}))
    # Overwrite with explicit args
    init.update(dict(*args, **kwargs))
    # Merge in config (with explicit args overwriting)
    init["config"] = merge_dicts(config, init.get("config", {}))
    super().__init__(init)
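The constructor's precedence is: configured context defaults first, then explicit arguments, with the later `update()` call winning on conflicts. A plain-dict sketch of that ordering (names here are illustrative):

```python
def build_context(config_context: dict, *args, **kwargs) -> dict:
    # Mirrors the ordering in Context.__init__: start from the configured
    # defaults, then let explicit arguments overwrite them.
    init = {}
    init.update(config_context)
    init.update(dict(*args, **kwargs))
    return init

# The explicit project_id=42 overrides the configured default of 1.
ctx = build_context({"project_id": 1, "workspace": "dev"}, project_id=42)
```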

Functions

get(key: str, default: Any = None) -> Any

This method is defined for MyPy, which otherwise tries to type the inherited .get() method incorrectly.

PARAMETER DESCRIPTION
- key

the key to retrieve

TYPE: str

- default

a default value to return if the key is not found

TYPE: Any

RETURNS DESCRIPTION
Any
  • Any: the value of the key, or the default value if the key is not found
Source code in src/seeknal/configuration.py
def get(self, key: str, default: Any = None) -> Any:
    """
    This method is defined for MyPy, which otherwise tries to type
    the inherited `.get()` method incorrectly.

    Args:
        - key (str): the key to retrieve
        - default (Any): a default value to return if the key is not found

    Returns:
        - Any: the value of the key, or the default value if the key is not found
    """
    return super().get(key, default)
copy() -> DotDict

Creates and returns a shallow copy of the current DotDict

Source code in src/seeknal/configuration.py
def copy(self) -> "DotDict":
    """Creates and returns a shallow copy of the current DotDict"""
    return type(self)(self.__dict__.copy())
to_dict() -> dict

Converts current DotDict (and any DotDicts contained within) to an appropriate nested dictionary.

Source code in src/seeknal/configuration.py
def to_dict(self) -> dict:
    """
    Converts current `DotDict` (and any `DotDict`s contained within)
    to an appropriate nested dictionary.
    """
    # mypy cast
    return cast(dict, as_nested_dict(self, dct_class=dict))

Functions

configure_logging(testing: bool = False) -> logging.Logger

Creates a "seeknal" root logger with a StreamHandler that has level and formatting set from seeknal.config.

PARAMETER DESCRIPTION
- testing

a boolean specifying whether this configuration is for testing purposes only; this helps us isolate any global state during testing by configuring a "seeknal-test-logger" instead of the standard "seeknal" logger

TYPE: bool

RETURNS DESCRIPTION
Logger
  • logging.Logger: a configured logging object
Source code in src/seeknal/context.py
def configure_logging(testing: bool = False) -> logging.Logger:
    """
    Creates a "seeknal" root logger with a `StreamHandler` that has level and formatting
    set from `seeknal.config`.

    Args:
        - testing (bool, optional): a boolean specifying whether this configuration
            is for testing purposes only; this helps us isolate any global state during testing
            by configuring a "seeknal-test-logger" instead of the standard "seeknal" logger

    Returns:
        - logging.Logger: a configured logging object
    """
    name = "seeknal-test-logger" if testing else "seeknal"

    return _create_logger(name)

get_logger(name: str = None) -> logging.Logger

Returns a logger.

PARAMETER DESCRIPTION
- name

if None, the root seeknal logger is returned. If provided, a child logger named "{name}" is returned. The child logger inherits the root logger's settings.

TYPE: str

RETURNS DESCRIPTION
Logger
  • logging.Logger: a configured logging object with the appropriate name
Source code in src/seeknal/context.py
def get_logger(name: str = None) -> logging.Logger:
    """
    Returns a logger.

    Args:
        - name (str): if `None`, the root seeknal logger is returned. If provided, a child
            logger named `"{name}"` is returned. The child logger inherits
            the root logger's settings.

    Returns:
        - logging.Logger: a configured logging object with the appropriate name
    """

    if name is None:
        return seeknal_logger
    else:
        return seeknal_logger.getChild(name)

Configuration

The Configuration module handles configuration management, including loading and validating configuration files.

Classes

DotDict(init_dict: Optional[DictLike] = None, **kwargs: Any)

Bases: MutableMapping

A dict that also supports attribute ("dot") access. Think of this as an extension to the standard python dict object. Note: while any hashable object can be added to a DotDict, only valid Python identifiers can be accessed with the dot syntax; this excludes strings which begin in numbers, special characters, or double underscores.

PARAMETER DESCRIPTION
- init_dict

dictionary to initialize the DotDict

TYPE: dict

- **kwargs

key / value pairs with which to initialize the DotDict

TYPE: optional

Example
dotdict = DotDict({'a': 34}, b=56, c=set())
dotdict.a # 34
dotdict['b'] # 56
dotdict.c # set()
Source code in src/seeknal/configuration.py
def __init__(self, init_dict: Optional[DictLike] = None, **kwargs: Any):
    # a DotDict could have a key that shadows `update`
    if init_dict:
        super().update(init_dict)
    super().update(kwargs)

Functions

get(key: str, default: Any = None) -> Any

This method is defined for MyPy, which otherwise tries to type the inherited .get() method incorrectly.

PARAMETER DESCRIPTION
- key

the key to retrieve

TYPE: str

- default

a default value to return if the key is not found

TYPE: Any

RETURNS DESCRIPTION
Any
  • Any: the value of the key, or the default value if the key is not found
Source code in src/seeknal/configuration.py
def get(self, key: str, default: Any = None) -> Any:
    """
    This method is defined for MyPy, which otherwise tries to type
    the inherited `.get()` method incorrectly.

    Args:
        - key (str): the key to retrieve
        - default (Any): a default value to return if the key is not found

    Returns:
        - Any: the value of the key, or the default value if the key is not found
    """
    return super().get(key, default)
copy() -> DotDict

Creates and returns a shallow copy of the current DotDict

Source code in src/seeknal/configuration.py
def copy(self) -> "DotDict":
    """Creates and returns a shallow copy of the current DotDict"""
    return type(self)(self.__dict__.copy())
to_dict() -> dict

Converts current DotDict (and any DotDicts contained within) to an appropriate nested dictionary.

Source code in src/seeknal/configuration.py
def to_dict(self) -> dict:
    """
    Converts current `DotDict` (and any `DotDict`s contained within)
    to an appropriate nested dictionary.
    """
    # mypy cast
    return cast(dict, as_nested_dict(self, dct_class=dict))

Config

Bases: Box

A config is a Box subclass

Functions

copy() -> Config

Create a recursive copy of the config. Each level of the Config is a new Config object, so modifying keys won't affect the original Config object. However, values are not deep-copied, and mutations can affect the original.

Source code in src/seeknal/configuration.py
def copy(self) -> "Config":
    """
    Create a recursive copy of the config. Each level of the Config is a new Config object, so
    modifying keys won't affect the original Config object. However, values are not
    deep-copied, and mutations can affect the original.
    """
    new_config = Config()
    for key, value in self.items():
        if isinstance(value, Config):
            value = value.copy()
        new_config[key] = value
    return new_config

Functions

merge_dicts(d1: DictLike, d2: DictLike) -> DictLike

Updates d1 from d2 by replacing each (k, v1) pair in d1 with the corresponding (k, v2) pair in d2.

If the value of each pair is itself a dict, then the value is updated recursively.

PARAMETER DESCRIPTION
- d1

A dictionary to be replaced

TYPE: MutableMapping

- d2

A dictionary used for replacement

TYPE: MutableMapping

RETURNS DESCRIPTION
DictLike
  • A MutableMapping with the two dictionary contents merged
Source code in src/seeknal/configuration.py
def merge_dicts(d1: DictLike, d2: DictLike) -> DictLike:
    """
    Updates `d1` from `d2` by replacing each `(k, v1)` pair in `d1` with the
    corresponding `(k, v2)` pair in `d2`.

    If the value of each pair is itself a dict, then the value is updated
    recursively.

    Args:
        - d1 (MutableMapping): A dictionary to be replaced
        - d2 (MutableMapping): A dictionary used for replacement

    Returns:
        - A `MutableMapping` with the two dictionary contents merged
    """

    new_dict = d1.copy()

    for k, v in d2.items():
        if isinstance(new_dict.get(k), MutableMapping) and isinstance(
            v, MutableMapping
        ):
            new_dict[k] = merge_dicts(new_dict[k], d2[k])
        else:
            new_dict[k] = d2[k]
    return new_dict
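Since `merge_dicts` is pure Python, its recursive behavior can be exercised directly; the body below restates the function as shown above:

```python
from collections.abc import MutableMapping

def merge_dicts(d1, d2):
    # Recursive merge, as in seeknal.configuration: values from d2 win,
    # except that nested mappings are merged rather than replaced wholesale.
    new_dict = d1.copy()
    for k, v in d2.items():
        if isinstance(new_dict.get(k), MutableMapping) and isinstance(v, MutableMapping):
            new_dict[k] = merge_dicts(new_dict[k], v)
        else:
            new_dict[k] = v
    return new_dict

# "a" is merged key-by-key; "b" is untouched; "c" is added.
merged = merge_dicts({"a": {"x": 1, "y": 2}, "b": 3}, {"a": {"y": 20}, "c": 4})
```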

as_nested_dict(obj: Union[DictLike, Iterable[DictLike]], dct_class: type = DotDict) -> Union[DictLike, Iterable[DictLike]]

Given an obj formatted as a dictionary, transforms it (and any nested dictionaries) into the provided dct_class

PARAMETER DESCRIPTION
- obj

An object that is formatted as a dict

TYPE: Any

- dct_class

the dict class to use (defaults to DotDict)

TYPE: type

RETURNS DESCRIPTION
Union[DictLike, Iterable[DictLike]]
  • A dict_class representation of the object passed in
Source code in src/seeknal/configuration.py
def as_nested_dict(
    obj: Union[DictLike, Iterable[DictLike]], dct_class: type = DotDict
) -> Union[DictLike, Iterable[DictLike]]:
    """
    Given an obj formatted as a dictionary, transforms it (and any nested dictionaries)
    into the provided dct_class

    Args:
        - obj (Any): An object that is formatted as a `dict`
        - dct_class (type): the `dict` class to use (defaults to DotDict)

    Returns:
        - A `dict_class` representation of the object passed in
    """
    if isinstance(obj, (list, tuple, set)):
        return type(obj)([as_nested_dict(d, dct_class) for d in obj])

    # calling as_nested_dict on `Box` objects pulls out their "private" keys due to our recursion
    # into `__dict__` if it exists. We can special-case Box and just convert it to dict this way,
    # which automatically handles recursion.
    elif isinstance(obj, Box):
        return dict(obj)
    elif isinstance(obj, (dict, DotDict)):
        # DotDicts could have keys that shadow `update` and `items`, so we
        # take care to avoid accessing those keys here
        return dct_class(
            {
                k: as_nested_dict(v, dct_class)
                for k, v in getattr(obj, "__dict__", obj).items()
            }
        )
    return obj
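A simplified, self-contained sketch of the recursion (the `Box` special case is omitted, and `DotDict` here is a minimal stand-in for seeknal's class):

```python
class DotDict(dict):
    # Minimal stand-in for seeknal's DotDict, for illustration only:
    # a dict whose keys are also reachable as attributes.
    __getattr__ = dict.__getitem__

def as_nested_dict(obj, dct_class=DotDict):
    # Simplified version of the recursion above: containers are rebuilt with
    # their elements converted, plain dicts are rebuilt as dct_class.
    if isinstance(obj, (list, tuple, set)):
        return type(obj)(as_nested_dict(d, dct_class) for d in obj)
    if isinstance(obj, dict):
        return dct_class({k: as_nested_dict(v, dct_class) for k, v in obj.items()})
    return obj

nested = as_nested_dict({"a": {"b": 1}, "c": [{"d": 2}]})
```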

flatdict_to_dict(dct: dict, dct_class: Optional[Type[D]] = None) -> D

Converts a flattened dictionary back to a nested dictionary.

PARAMETER DESCRIPTION
- dct

The dictionary to be nested. Each key should be a CompoundKey, as generated by dict_to_flatdict()

TYPE: dict

- dct_class

the type of the result; defaults to dict

TYPE: type

RETURNS DESCRIPTION
D
  • D: An instance of dct_class used to represent a nested dictionary, bound as a MutableMapping or dict
Source code in src/seeknal/configuration.py
def flatdict_to_dict(dct: dict, dct_class: Optional[Type[D]] = None) -> D:
    """Converts a flattened dictionary back to a nested dictionary.

    Args:
        - dct (dict): The dictionary to be nested. Each key should be a
            `CompoundKey`, as generated by `dict_to_flatdict()`
        - dct_class (type, optional): the type of the result; defaults to `dict`

    Returns:
        - D: An instance of `dct_class` used to represent a nested dictionary, bound
            as a MutableMapping or dict
    """
    result = cast(D, (dct_class or dict)())
    for k, v in dct.items():
        if isinstance(k, CompoundKey):
            current_dict = result
            for ki in k[:-1]:
                current_dict = current_dict.setdefault(  # type: ignore
                    ki, (dct_class or dict)()
                )
            current_dict[k[-1]] = v
        else:
            result[k] = v

    return result
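Assuming `CompoundKey` is a tuple subclass — which is how the source constructs it (`CompoundKey(parent + (k,))`) — the nesting behavior can be sketched self-contained:

```python
class CompoundKey(tuple):
    # Assumed shape of seeknal's CompoundKey: a plain tuple subclass,
    # matching how the source code constructs it.
    pass

def flatdict_to_dict(dct, dct_class=None):
    # Same walk as the source: each CompoundKey is a chain of keys, and
    # intermediate dicts are created on demand with setdefault.
    result = (dct_class or dict)()
    for k, v in dct.items():
        if isinstance(k, CompoundKey):
            current = result
            for ki in k[:-1]:
                current = current.setdefault(ki, (dct_class or dict)())
            current[k[-1]] = v
        else:
            result[k] = v
    return result

flat = {CompoundKey(("db", "host")): "localhost", CompoundKey(("db", "port")): 5432}
nested = flatdict_to_dict(flat)
```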

string_to_type(val: str) -> Union[bool, int, float, str]

Helper function for transforming string env var values into typed values.

Maps
  • "true" (any capitalization) to True
  • "false" (any capitalization) to False
  • any other valid literal Python syntax interpretable by ast.literal_eval
PARAMETER DESCRIPTION
- val

the string value of an environment variable

TYPE: str

RETURNS DESCRIPTION
Union[bool, int, float, str]

Union[bool, int, float, str, dict, list, None, tuple]: the type-cast env var value

Source code in src/seeknal/configuration.py
def string_to_type(val: str) -> Union[bool, int, float, str]:
    """
    Helper function for transforming string env var values into typed values.

    Maps:
        - "true" (any capitalization) to `True`
        - "false" (any capitalization) to `False`
        - any other valid literal Python syntax interpretable by ast.literal_eval

    Arguments:
        - val (str): the string value of an environment variable

    Returns:
        Union[bool, int, float, str, dict, list, None, tuple]: the type-cast env var value
    """

    # bool
    if val.upper() == "TRUE":
        return True
    elif val.upper() == "FALSE":
        return False

    # dicts, ints, floats, or any other literal Python syntax
    try:
        val_as_obj = literal_eval(val)
        return val_as_obj
    except Exception:
        pass

    # return string value
    return val
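The casting rules can be exercised directly; the body below restates the function as shown above:

```python
from ast import literal_eval

def string_to_type(val):
    # Same casting rules as the source: booleans first, then any literal
    # Python syntax, falling back to the raw string.
    if val.upper() == "TRUE":
        return True
    if val.upper() == "FALSE":
        return False
    try:
        return literal_eval(val)
    except Exception:
        return val
```

For example, `string_to_type("42")` yields an int, `string_to_type("[1, 2]")` a list, and anything that is not valid literal syntax comes back unchanged as a string.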

interpolate_env_vars(env_var: str) -> Optional[Union[bool, int, float, str]]

Expands (potentially nested) env vars by repeatedly applying expandvars and expanduser until interpolation stops having any effect.

Source code in src/seeknal/configuration.py
def interpolate_env_vars(env_var: str) -> Optional[Union[bool, int, float, str]]:
    """
    Expands (potentially nested) env vars by repeatedly applying
    `expandvars` and `expanduser` until interpolation stops having
    any effect.
    """
    if not env_var or not isinstance(env_var, str):
        return env_var

    counter = 0

    while counter < 10:
        interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
        if interpolated == env_var:
            # if a change was made, apply string-to-type casts; otherwise leave alone
            # this is because we don't want to override TOML type-casting if this function
            # is applied to a non-interpolated value
            if counter > 1:
                interpolated = string_to_type(interpolated)  # type: ignore
            return interpolated
        else:
            env_var = interpolated
        counter += 1

    return None
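The expand-until-stable loop can be demonstrated with two hypothetical environment variables (the final `string_to_type` cast is omitted for brevity):

```python
import os

# Hypothetical variable names, set up purely for illustration: OUTER expands
# to a reference to INNER, which expands to the final value.
os.environ["INNER_VAR_DEMO"] = "final-value"
os.environ["OUTER_VAR_DEMO"] = "$INNER_VAR_DEMO"

value = "$OUTER_VAR_DEMO"
for _ in range(10):
    # Same fixed-point loop as interpolate_env_vars: re-expand until the
    # string stops changing (with an iteration cap to avoid cycles).
    expanded = os.path.expanduser(os.path.expandvars(value))
    if expanded == value:
        break
    value = expanded
```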

create_user_config(dest_path: str, source_path: str = '') -> None

Copies the default configuration to a user-customizable file at dest_path

Source code in src/seeknal/configuration.py
def create_user_config(dest_path: str, source_path: str = "") -> None:
    """
    Copies the default configuration to a user-customizable file at `dest_path`
    """
    dest_path = cast(str, interpolate_env_vars(dest_path))
    if os.path.isfile(dest_path):
        raise ValueError("File already exists: {}".format(dest_path))
    os.makedirs(os.path.dirname(dest_path), exist_ok=True)

    with open(dest_path, "w") as dest:
        with open(source_path, "r") as source:
            dest.write(source.read())

dict_to_flatdict(dct: DictLike, parent: Optional[CompoundKey] = None) -> dict

Converts a (nested) dictionary to a flattened representation.

Each key of the flat dict will be a CompoundKey tuple containing the "chain of keys" for the corresponding value.

PARAMETER DESCRIPTION
- dct

The dictionary to flatten

TYPE: dict

- parent

Defaults to None. The parent key

TYPE: CompoundKey

RETURNS DESCRIPTION
dict
  • dict: A flattened dict
Source code in src/seeknal/configuration.py
def dict_to_flatdict(dct: DictLike, parent: Optional[CompoundKey] = None) -> dict:
    """Converts a (nested) dictionary to a flattened representation.

    Each key of the flat dict will be a CompoundKey tuple containing the "chain of keys"
    for the corresponding value.

    Args:
        - dct (dict): The dictionary to flatten
        - parent (CompoundKey, optional): Defaults to `None`. The parent key
        (you shouldn't need to set this)

    Returns:
        - dict: A flattened dict
    """

    items = []  # type: list
    parent = parent or CompoundKey()
    for k, v in dct.items():
        k_parent = CompoundKey(parent + (k,))
        if isinstance(v, dict):
            items.extend(dict_to_flatdict(v, parent=k_parent).items())
        else:
            items.append((k_parent, v))
    return dict(items)
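Assuming `CompoundKey` is a tuple subclass, as the source suggests, the flattening can be run in isolation:

```python
class CompoundKey(tuple):
    pass  # assumed tuple subclass, matching how the source constructs it

def dict_to_flatdict(dct, parent=None):
    # Same recursion as the source: each leaf value is keyed by the full
    # chain of keys leading to it, packed into a CompoundKey tuple.
    items = []
    parent = parent or CompoundKey()
    for k, v in dct.items():
        k_parent = CompoundKey(parent + (k,))
        if isinstance(v, dict):
            items.extend(dict_to_flatdict(v, parent=k_parent).items())
        else:
            items.append((k_parent, v))
    return dict(items)

flat = dict_to_flatdict({"db": {"host": "localhost", "port": 5432}})
```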

process_task_defaults(config: Config) -> Config

Converts task defaults from basic types to Python objects like timedeltas

PARAMETER DESCRIPTION
- config

the configuration to modify

TYPE: Config

Source code in src/seeknal/configuration.py
def process_task_defaults(config: Config) -> Config:
    """
    Converts task defaults from basic types to Python objects like timedeltas

    Args:
        - config (Config): the configuration to modify
    """
    # make sure defaults exists
    defaults = config.setdefault("tasks", {}).setdefault("defaults", {})

    # max_retries defaults to 0 if not set, False, or None
    if not defaults.setdefault("max_retries", 0):
        defaults.max_retries = 0
    defaults.max_retries = defaults.get("max_retries", 0) or 0

    # retry_delay defaults to None if not set - also check for False because TOML has no NULL
    if defaults.setdefault("retry_delay", False) is False:
        defaults.retry_delay = None
    elif isinstance(defaults.retry_delay, int):
        defaults.retry_delay = datetime.timedelta(seconds=defaults.retry_delay)

    # timeout defaults to None if not set - also check for False because TOML has no NULL
    if defaults.setdefault("timeout", False) is False:
        defaults.timeout = None

    return config
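The `retry_delay` handling — `False` stands in for null because TOML has none — can be sketched on a plain dict instead of a `Config`:

```python
import datetime

def normalize_retry_delay(defaults: dict) -> dict:
    # Plain-dict sketch of the retry_delay rule above: False/absent becomes
    # None, an integer becomes a timedelta measured in seconds.
    value = defaults.get("retry_delay", False)
    if value is False:
        defaults["retry_delay"] = None
    elif isinstance(value, int):
        defaults["retry_delay"] = datetime.timedelta(seconds=value)
    return defaults
```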

to_environment_variables(config: Config, include: Optional[Iterable[str]] = None, prefix: str = '') -> dict

Convert a configuration object to environment variables

Values will be cast to strings using 'str'

PARAMETER DESCRIPTION
- config

The configuration object to parse

- include

An optional set of keys to include. Each key to include should be formatted as 'section.key' or 'section.section.key'

- prefix

The prefix for the environment variables. Defaults to "".

RETURNS DESCRIPTION
dict
  • A dictionary mapping key to values e.g. __SECTION__KEY: VALUE
Source code in src/seeknal/configuration.py
def to_environment_variables(
    config: Config, include: Optional[Iterable[str]] = None, prefix: str = ""
) -> dict:
    """
    Convert a configuration object to environment variables

    Values will be cast to strings using 'str'

    Args:
        - config: The configuration object to parse
        - include: An optional set of keys to include. Each key to include should be
            formatted as 'section.key' or 'section.section.key'
        - prefix: The prefix for the environment variables. Defaults to "".

    Returns:
        - A dictionary mapping key to values e.g.
            __SECTION__KEY: VALUE
    """
    # Convert to a flat dict for construction without recursion
    flat_config = dict_to_flatdict(config)

    # Generate env vars as "PREFIX__SECTION__KEY"
    return {
        "__".join([prefix] + list(key)).upper(): str(value)
        for key, value in flat_config.items()
        # Only include the specified keys
        if not include or ".".join(key) in include
    }
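The key-naming scheme can be sketched in isolation, taking an already-flattened config (tuple keys, as `dict_to_flatdict` produces) and a hypothetical `SEEKNAL` prefix:

```python
def env_var_names(flat_config: dict, prefix: str = "SEEKNAL") -> dict:
    # Same naming rule as above: join the prefix and the key chain with
    # double underscores, uppercase the result, stringify the value.
    return {
        "__".join([prefix] + list(key)).upper(): str(value)
        for key, value in flat_config.items()
    }

env = env_var_names({("db", "port"): 5432})
```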

validate_config(config: Config) -> None

Validates that the configuration file is valid: keys must not shadow Config methods.

Note that this is performed when the config is first loaded, but not after.

Source code in src/seeknal/configuration.py
def validate_config(config: Config) -> None:
    """
    Validates that the configuration file is valid.
        - keys do not shadow Config methods

    Note that this is performed when the config is first loaded, but not after.
    """

    def check_valid_keys(config: Config) -> None:
        """
        Recursively check that keys do not shadow methods of the Config object
        """
        invalid_keys = dir(Config)
        for k, v in config.items():
            if k in invalid_keys:
                raise ValueError('Invalid config key: "{}"'.format(k))
            if isinstance(v, Config):
                check_valid_keys(v)

    check_valid_keys(config)
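The shadowing check can be demonstrated in isolation. The sketch below substitutes `dir(dict)` for `dir(Config)` (the `Config` class is not shown on this page), so a key such as `"items"` that collides with a built-in attribute name is rejected:

```python
# Sketch of the recursive key check; dir(dict) stands in for dir(Config).
def check_valid_keys(config: dict) -> None:
    invalid_keys = dir(dict)
    for k, v in config.items():
        if k in invalid_keys:
            raise ValueError('Invalid config key: "{}"'.format(k))
        if isinstance(v, dict):
            check_valid_keys(v)

check_valid_keys({"server": {"port": 8080}})   # valid: no exception
try:
    check_valid_keys({"items": 1})             # "items" shadows dict.items
except ValueError as e:
    print(e)
```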

load_toml(path: str) -> dict

Loads a config dictionary from TOML

Source code in src/seeknal/configuration.py
def load_toml(path: str) -> dict:
    """
    Loads a config dictionary from TOML
    """
    return {
        key: value
        for key, value in toml.load(cast(str, interpolate_env_vars(path))).items()
    }

interpolate_config(config: dict, env_var_prefix: Optional[str] = None) -> Config

Processes a config dictionary, such as the one loaded from load_toml.

Source code in src/seeknal/configuration.py
def interpolate_config(config: dict, env_var_prefix: Optional[str] = None) -> Config:
    """
    Processes a config dictionary, such as the one loaded from `load_toml`.
    """

    # toml supports nested dicts, so we work with a flattened representation to do any
    # requested interpolation
    flat_config = dict_to_flatdict(config)

    # --------------------- Interpolate env vars -----------------------
    # check if any env var sets a configuration value with the format:
    #     [ENV_VAR_PREFIX]__[Section]__[Optional Sub-Sections...]__[Key] = Value
    # and if it does, add it to the config file.

    if env_var_prefix:

        for env_var, env_var_value in os.environ.items():
            if env_var.startswith(env_var_prefix + "__"):

                # strip the prefix off the env var
                env_var_option = env_var[len(env_var_prefix + "__") :]

                # make sure the resulting env var has at least one delimited section and key
                if "__" not in env_var_option:
                    continue

                # place the env var in the flat config as a compound key
                if env_var_option.upper().startswith("CONTEXT__SECRETS"):
                    # Lowercase `context__secrets` but retain case of the secret keys
                    formatted_option = env_var_option.replace(
                        "CONTEXT__SECRETS", "context__secrets"
                    ).split("__")
                    config_option = CompoundKey(formatted_option)
                else:
                    config_option = CompoundKey(env_var_option.lower().split("__"))

                flat_config[config_option] = string_to_type(
                    cast(str, interpolate_env_vars(env_var_value))
                )

    # interpolate any env vars referenced
    for k, v in list(flat_config.items()):
        val = interpolate_env_vars(v)
        if isinstance(val, str):
            val = string_to_type(val)
        flat_config[k] = val

    # --------------------- Interpolate other config keys -----------------
    # TOML doesn't support references to other keys... but we do!
    # This has the potential to lead to nasty recursions, so we check at most 10 times.
    # we use a set called "keys_to_check" to track only the ones of interest, so we aren't
    # checking every key every time.

    keys_to_check = set(flat_config.keys())

    for _ in range(10):

        # iterate over every key and value to check if the value uses interpolation
        for k in list(keys_to_check):

            # if the value isn't a string, it can't be a reference, so we exit
            if not isinstance(flat_config[k], str):
                keys_to_check.remove(k)
                continue

            # see if the ${...} syntax was used in the value and exit if it wasn't
            match = INTERPOLATION_REGEX.search(flat_config[k])
            if not match:
                keys_to_check.remove(k)
                continue

            # the matched_string includes "${}"; the matched_key is just the inner value
            matched_string = match.group(0)
            matched_key = match.group(1)

            # get the referenced key from the config value
            ref_key = CompoundKey(matched_key.split("."))
            # get the value corresponding to the referenced key
            ref_value = flat_config.get(ref_key, "")

            # if the match was the entire value, replace it with the interpolated value
            if flat_config[k] == matched_string:
                flat_config[k] = ref_value
            # if it was a partial match, then drop the interpolated value into the string
            else:
                flat_config[k] = flat_config[k].replace(
                    matched_string, str(ref_value), 1
                )

    return cast(Config, flatdict_to_dict(flat_config, dct_class=Config))
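The `${...}` cross-key interpolation can be reduced to a small self-contained resolver. The regex below is an assumption standing in for the module's `INTERPOLATION_REGEX` (which is not shown on this page), and plain dotted-string keys stand in for `CompoundKey`:

```python
import re

# Hypothetical stand-in for seeknal's INTERPOLATION_REGEX.
PATTERN = re.compile(r"\$\{([^}]+)\}")

def resolve(flat):
    """Resolve ${key} references in a flat config, with a 10-pass cap
    mirroring interpolate_config's guard against recursive references."""
    for _ in range(10):
        changed = False
        for k, v in list(flat.items()):
            if not isinstance(v, str):
                continue
            m = PATTERN.search(v)
            if not m:
                continue
            ref = flat.get(m.group(1), "")
            # Full match: replace with the referenced value (keeps its type);
            # partial match: splice the stringified value into the text.
            flat[k] = ref if v == m.group(0) else v.replace(m.group(0), str(ref), 1)
            changed = True
        if not changed:
            break
    return flat

cfg = resolve({"home": "/opt/app", "data": "${home}/data", "port": 8080})
# cfg["data"] == "/opt/app/data"
```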

load_configuration(path: str = '', user_config_path: Optional[str] = None, backend_config_path: Optional[str] = None, env_var_prefix: Optional[str] = None) -> Config

Loads a configuration.

PARAMETER DESCRIPTION
- path

DEPRECATED - no longer used

TYPE: str

- user_config_path

an optional path to a user config file. If a user config is provided, it will be used to update the main config prior to interpolation

TYPE: str

- backend_config_path

an optional path to a backend config file

TYPE: str

- env_var_prefix

any env vars matching this prefix will be used to create configuration values

TYPE: str

RETURNS DESCRIPTION
Config
  • The loaded, validated configuration object
Source code in src/seeknal/configuration.py
def load_configuration(
    path: str = "",
    user_config_path: Optional[str] = None,
    backend_config_path: Optional[str] = None,
    env_var_prefix: Optional[str] = None,
) -> Config:
    """
    Loads a configuration.

    Args:
        - path (str): DEPRECATED - no longer used
        - user_config_path (str): an optional path to a user config file. If a user config
            is provided, it will be used to update the main config prior to interpolation
        - backend_config_path (str): an optional path to a backend config file
        - env_var_prefix (str): any env vars matching this prefix will be used to create
            configuration values

    Returns:
        - Config
    """
    # Start with default config
    config_dict = DEFAULT_CONFIG_DICT.copy()

    # update with user config
    if user_config_path:
        user_config_path = cast(str, interpolate_env_vars(user_config_path))
        if os.path.exists(user_config_path):
            user_config = load_toml(user_config_path)
            config_dict.update(user_config)

    # update with backend config
    if backend_config_path:
        backend_config_path = cast(str, interpolate_env_vars(backend_config_path))
        if os.path.exists(backend_config_path):
            backend_config = load_toml(backend_config_path)
            config_dict.update(backend_config)

    config_dict = interpolate_config(config_dict, env_var_prefix=env_var_prefix)
    config = Config(config_dict)
    validate_config(config)
    process_task_defaults(config)

    return config
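The layering order in `load_configuration` can be sketched with plain dicts: defaults first, then the user config, then the backend config, each applied with `dict.update` so later layers win. Note that `update` is a shallow merge, so a later layer replaces an entire top-level section rather than merging into it:

```python
# Sketch of config layering precedence (shallow merge, later layers win).
defaults = {"logging": {"level": "INFO"}, "backend": "local"}
user = {"backend": "spark"}                       # overrides a default
backend = {"spark": {"master": "local[*]"}}       # adds a new section

config = defaults.copy()
config.update(user)
config.update(backend)
# config["backend"] == "spark"
# config["logging"]["level"] == "INFO"   (untouched default)
```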

load_default_config() -> Config

Load the default configuration.

RETURNS DESCRIPTION
Config
  • Config: A configuration object with default settings
Source code in src/seeknal/configuration.py
def load_default_config() -> Config:
    """
    Load the default configuration.

    Returns:
        - Config: A configuration object with default settings
    """
    return load_configuration(
        user_config_path=USER_CONFIG,
        backend_config_path=BACKEND_CONFIG,
        env_var_prefix=ENV_VAR_PREFIX
    )

Validation Utilities

Seeknal provides comprehensive validation utilities for SQL identifiers, table names, column names, and file paths to prevent injection attacks and ensure data integrity.

SQL Identifier and Path Validation Module.

This module provides functions to validate SQL identifiers (table names, column names, database names) and file paths to prevent SQL injection attacks, DoS through long names, and schema confusion.

Functions

validate_sql_identifier(identifier: str, identifier_type: str = 'identifier', max_length: int = SQL_IDENTIFIER_MAX_LENGTH) -> str

Validate a SQL identifier (table name, column name, database name).

SQL identifiers must:

- Start with a letter (a-z, A-Z) or underscore (_)
- Contain only alphanumeric characters (a-z, A-Z, 0-9) and underscores (_)
- Be no longer than max_length characters (default 128)
- Not be empty

PARAMETER DESCRIPTION
identifier

The SQL identifier to validate.

TYPE: str

identifier_type

Type of identifier for error messages (e.g., "table name").

TYPE: str DEFAULT: 'identifier'

max_length

Maximum allowed length for the identifier.

TYPE: int DEFAULT: SQL_IDENTIFIER_MAX_LENGTH

RETURNS DESCRIPTION
str

The validated identifier (unchanged if valid).

RAISES DESCRIPTION
InvalidIdentifierError

If the identifier is invalid.

Source code in src/seeknal/validation.py
def validate_sql_identifier(
    identifier: str,
    identifier_type: str = "identifier",
    max_length: int = SQL_IDENTIFIER_MAX_LENGTH,
) -> str:
    """
    Validate a SQL identifier (table name, column name, database name).

    SQL identifiers must:
    - Start with a letter (a-z, A-Z) or underscore (_)
    - Contain only alphanumeric characters (a-z, A-Z, 0-9) and underscores (_)
    - Be no longer than max_length characters (default 128)
    - Not be empty

    Args:
        identifier: The SQL identifier to validate.
        identifier_type: Type of identifier for error messages (e.g., "table name").
        max_length: Maximum allowed length for the identifier.

    Returns:
        The validated identifier (unchanged if valid).

    Raises:
        InvalidIdentifierError: If the identifier is invalid.
    """
    if not identifier:
        raise InvalidIdentifierError(
            f"Invalid {identifier_type}: cannot be empty"
        )

    if not isinstance(identifier, str):
        raise InvalidIdentifierError(
            f"Invalid {identifier_type}: must be a string, got {type(identifier).__name__}"
        )

    if len(identifier) > max_length:
        raise InvalidIdentifierError(
            f"Invalid {identifier_type}: '{identifier[:50]}...' exceeds maximum length of {max_length} characters"
        )

    if not SQL_IDENTIFIER_PATTERN.match(identifier):
        raise InvalidIdentifierError(
            f"Invalid {identifier_type}: '{identifier}' contains invalid characters. "
            f"Must start with a letter or underscore and contain only alphanumeric characters and underscores."
        )

    return identifier
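The rules above imply a simple anchored regex. The module-level `SQL_IDENTIFIER_PATTERN` is not shown on this page, but a pattern consistent with the documented rules looks like this:

```python
import re

# Pattern implied by the documented rules (assumed, not the module's literal).
SQL_IDENTIFIER_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

# Accepted: starts with a letter or underscore, alphanumerics/underscores only.
for good in ("users", "_tmp", "col_1"):
    assert SQL_IDENTIFIER_PATTERN.match(good)

# Rejected: leading digit, injection payload, hyphen, empty string.
for bad in ("1col", "users; DROP TABLE x", "a-b", ""):
    assert not SQL_IDENTIFIER_PATTERN.match(bad)
```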

validate_table_name(table_name: str) -> str

Validate a SQL table name.

Table names must follow SQL identifier rules:

- Start with a letter or underscore
- Contain only alphanumeric characters and underscores
- Be no longer than 128 characters

PARAMETER DESCRIPTION
table_name

The table name to validate.

TYPE: str

RETURNS DESCRIPTION
str

The validated table name (unchanged if valid).

RAISES DESCRIPTION
InvalidIdentifierError

If the table name is invalid.

Source code in src/seeknal/validation.py
def validate_table_name(table_name: str) -> str:
    """
    Validate a SQL table name.

    Table names must follow SQL identifier rules:
    - Start with a letter or underscore
    - Contain only alphanumeric characters and underscores
    - Be no longer than 128 characters

    Args:
        table_name: The table name to validate.

    Returns:
        The validated table name (unchanged if valid).

    Raises:
        InvalidIdentifierError: If the table name is invalid.
    """
    return validate_sql_identifier(table_name, identifier_type="table name")

validate_column_name(column_name: str) -> str

Validate a SQL column name.

Column names must follow SQL identifier rules:

- Start with a letter or underscore
- Contain only alphanumeric characters and underscores
- Be no longer than 128 characters

PARAMETER DESCRIPTION
column_name

The column name to validate.

TYPE: str

RETURNS DESCRIPTION
str

The validated column name (unchanged if valid).

RAISES DESCRIPTION
InvalidIdentifierError

If the column name is invalid.

Source code in src/seeknal/validation.py
def validate_column_name(column_name: str) -> str:
    """
    Validate a SQL column name.

    Column names must follow SQL identifier rules:
    - Start with a letter or underscore
    - Contain only alphanumeric characters and underscores
    - Be no longer than 128 characters

    Args:
        column_name: The column name to validate.

    Returns:
        The validated column name (unchanged if valid).

    Raises:
        InvalidIdentifierError: If the column name is invalid.
    """
    return validate_sql_identifier(column_name, identifier_type="column name")

validate_database_name(database_name: str) -> str

Validate a SQL database name.

Database names must follow SQL identifier rules:

- Start with a letter or underscore
- Contain only alphanumeric characters and underscores
- Be no longer than 128 characters

PARAMETER DESCRIPTION
database_name

The database name to validate.

TYPE: str

RETURNS DESCRIPTION
str

The validated database name (unchanged if valid).

RAISES DESCRIPTION
InvalidIdentifierError

If the database name is invalid.

Source code in src/seeknal/validation.py
def validate_database_name(database_name: str) -> str:
    """
    Validate a SQL database name.

    Database names must follow SQL identifier rules:
    - Start with a letter or underscore
    - Contain only alphanumeric characters and underscores
    - Be no longer than 128 characters

    Args:
        database_name: The database name to validate.

    Returns:
        The validated database name (unchanged if valid).

    Raises:
        InvalidIdentifierError: If the database name is invalid.
    """
    return validate_sql_identifier(database_name, identifier_type="database name")

validate_file_path(file_path: str, max_length: int = FILE_PATH_MAX_LENGTH, forbidden_chars: Optional[List[str]] = None) -> str

Validate a file path to ensure it doesn't contain SQL injection characters.

File paths are validated to:

- Not be empty
- Not exceed max_length characters (default 4096)
- Not contain forbidden characters that could be used for SQL injection

PARAMETER DESCRIPTION
file_path

The file path to validate.

TYPE: str

max_length

Maximum allowed length for the path.

TYPE: int DEFAULT: FILE_PATH_MAX_LENGTH

forbidden_chars

List of forbidden character sequences. Defaults to common SQL injection characters: ', ", ;, --, /*, */

TYPE: Optional[List[str]] DEFAULT: None

RETURNS DESCRIPTION
str

The validated file path (unchanged if valid).

RAISES DESCRIPTION
InvalidPathError

If the file path is invalid or contains forbidden characters.

Source code in src/seeknal/validation.py
def validate_file_path(
    file_path: str,
    max_length: int = FILE_PATH_MAX_LENGTH,
    forbidden_chars: Optional[List[str]] = None,
) -> str:
    """
    Validate a file path to ensure it doesn't contain SQL injection characters.

    File paths are validated to:
    - Not be empty
    - Not exceed max_length characters (default 4096)
    - Not contain forbidden characters that could be used for SQL injection

    Args:
        file_path: The file path to validate.
        max_length: Maximum allowed length for the path.
        forbidden_chars: List of forbidden character sequences. Defaults to
            common SQL injection characters: ', ", ;, --, /*, */

    Returns:
        The validated file path (unchanged if valid).

    Raises:
        InvalidPathError: If the file path is invalid or contains forbidden characters.
    """
    if forbidden_chars is None:
        forbidden_chars = FILE_PATH_FORBIDDEN_CHARS

    if not file_path:
        raise InvalidPathError("Invalid file path: cannot be empty")

    if not isinstance(file_path, str):
        raise InvalidPathError(
            f"Invalid file path: must be a string, got {type(file_path).__name__}"
        )

    if len(file_path) > max_length:
        raise InvalidPathError(
            f"Invalid file path: '{file_path[:50]}...' exceeds maximum length of {max_length} characters"
        )

    for char in forbidden_chars:
        if char in file_path:
            raise InvalidPathError(
                f"Invalid file path: contains forbidden character sequence '{char}'"
            )

    return file_path
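A self-contained sketch of the path check, using the default forbidden sequences named above (`FILE_PATH_FORBIDDEN_CHARS` itself is not shown on this page, so the list here is an assumption):

```python
# Assumed default forbidden sequences, per the parameter description above.
FORBIDDEN = ["'", '"', ";", "--", "/*", "*/"]

def check_path(path: str) -> str:
    """Reject empty paths and paths containing SQL-injection sequences."""
    if not path:
        raise ValueError("Invalid file path: cannot be empty")
    for seq in FORBIDDEN:
        if seq in path:
            raise ValueError(
                f"Invalid file path: contains forbidden character sequence '{seq}'"
            )
    return path

check_path("/data/events/2024.parquet")      # returned unchanged
try:
    check_path("/data/x'; DROP TABLE t;--")  # quote triggers rejection
except ValueError as e:
    print(e)
```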

validate_sql_value(value: str, value_type: str = 'value', forbidden_patterns: Optional[List[str]] = None) -> str

Validate a value used in SQL WHERE clauses to prevent SQL injection.

This function checks for common SQL injection patterns in values that will be used in WHERE clauses or other SQL contexts.

PARAMETER DESCRIPTION
value

The value to validate.

TYPE: str

value_type

Type of value for error messages (e.g., "filter value").

TYPE: str DEFAULT: 'value'

forbidden_patterns

List of forbidden patterns. Defaults to common SQL injection patterns like UNION, DROP, DELETE, etc.

TYPE: Optional[List[str]] DEFAULT: None

RETURNS DESCRIPTION
str

The validated value (unchanged if valid).

RAISES DESCRIPTION
InvalidIdentifierError

If the value contains forbidden patterns.

Source code in src/seeknal/validation.py
def validate_sql_value(
    value: str,
    value_type: str = "value",
    forbidden_patterns: Optional[List[str]] = None,
) -> str:
    """
    Validate a value used in SQL WHERE clauses to prevent SQL injection.

    This function checks for common SQL injection patterns in values
    that will be used in WHERE clauses or other SQL contexts.

    Args:
        value: The value to validate.
        value_type: Type of value for error messages (e.g., "filter value").
        forbidden_patterns: List of forbidden patterns. Defaults to common
            SQL injection patterns like UNION, DROP, DELETE, etc.

    Returns:
        The validated value (unchanged if valid).

    Raises:
        InvalidIdentifierError: If the value contains forbidden patterns.
    """
    if forbidden_patterns is None:
        forbidden_patterns = SQL_VALUE_FORBIDDEN_PATTERNS

    if not isinstance(value, str):
        # Non-string values are returned as-is (they'll be converted later)
        return value

    if not value:
        # Empty strings are allowed for values
        return value

    # Check for forbidden patterns (case-insensitive)
    value_upper = value.upper()
    for pattern in forbidden_patterns:
        if pattern.upper() in value_upper:
            raise InvalidIdentifierError(
                f"Invalid {value_type}: contains forbidden pattern '{pattern}'"
            )

    return value
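The value check can be sketched the same way. `SQL_VALUE_FORBIDDEN_PATTERNS` is not shown on this page, so the pattern list below is an assumption based on the examples given (UNION, DROP, DELETE, etc.); note the documented pass-through behaviour for non-strings and empty strings:

```python
# Assumed forbidden-pattern list; the real default lives in the module.
PATTERNS = ["UNION", "DROP", "DELETE", "--", ";"]

def check_value(value, patterns=PATTERNS):
    if not isinstance(value, str):
        return value          # non-strings pass through (converted later)
    if not value:
        return value          # empty strings are allowed
    upper = value.upper()     # case-insensitive pattern match
    for p in patterns:
        if p.upper() in upper:
            raise ValueError(f"Invalid value: contains forbidden pattern '{p}'")
    return value

check_value("ACTIVE")   # ordinary filter value: returned unchanged
check_value(42)         # non-string: returned as-is
```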

validate_column_names(column_names: List[str]) -> List[str]

Validate a list of SQL column names.

PARAMETER DESCRIPTION
column_names

List of column names to validate.

TYPE: List[str]

RETURNS DESCRIPTION
List[str]

The validated list of column names (unchanged if all valid).

RAISES DESCRIPTION
InvalidIdentifierError

If any column name is invalid.

Source code in src/seeknal/validation.py
def validate_column_names(column_names: List[str]) -> List[str]:
    """
    Validate a list of SQL column names.

    Args:
        column_names: List of column names to validate.

    Returns:
        The validated list of column names (unchanged if all valid).

    Raises:
        InvalidIdentifierError: If any column name is invalid.
    """
    if not column_names:
        raise InvalidIdentifierError("Invalid column names: list cannot be empty")

    for column_name in column_names:
        validate_column_name(column_name)

    return column_names

validate_schema_name(schema_name: str) -> str

Validate a SQL schema name.

Schema names must follow SQL identifier rules:

- Start with a letter or underscore
- Contain only alphanumeric characters and underscores
- Be no longer than 128 characters

PARAMETER DESCRIPTION
schema_name

The schema name to validate.

TYPE: str

RETURNS DESCRIPTION
str

The validated schema name (unchanged if valid).

RAISES DESCRIPTION
InvalidIdentifierError

If the schema name is invalid.

Source code in src/seeknal/validation.py
def validate_schema_name(schema_name: str) -> str:
    """
    Validate a SQL schema name.

    Schema names must follow SQL identifier rules:
    - Start with a letter or underscore
    - Contain only alphanumeric characters and underscores
    - Be no longer than 128 characters

    Args:
        schema_name: The schema name to validate.

    Returns:
        The validated schema name (unchanged if valid).

    Raises:
        InvalidIdentifierError: If the schema name is invalid.
    """
    return validate_sql_identifier(schema_name, identifier_type="schema name")

Exceptions

Core exception classes for error handling throughout the Seeknal library.

Classes