
FeatureStore API Reference

This page documents the FeatureStore module, which provides the core feature store functionality for managing and serving ML features for both offline (batch) and online (real-time) use cases.

Overview

The FeatureStore module provides essential classes for feature engineering and serving:

Class                Purpose
FeatureGroup         Define and manage groups of features with customizable materialization
FeatureLookup        Specify feature lookups from feature groups
HistoricalFeatures   Retrieve historical feature data with point-in-time correctness
OnlineFeatures       Serve features in real-time for model inference
OfflineStore         Configure offline storage backends
OnlineStore          Configure online storage backends
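
The offline/online split above follows the usual feature-store pattern: HistoricalFeatures performs point-in-time joins so that training rows only see feature values that were known at the event's time. A minimal, library-free sketch of that lookup rule (all names here are illustrative, not part of the seeknal API):

```python
from bisect import bisect_right

def point_in_time_lookup(feature_rows, entity_id, query_time):
    """Return the latest feature value for entity_id with
    event_time <= query_time, so no future data leaks into training."""
    # feature_rows: {entity_id: list of (event_time, value), sorted by event_time}
    rows = feature_rows.get(entity_id, [])
    # bisect on event_time keeps each lookup O(log n)
    idx = bisect_right(rows, (query_time, float("inf")))
    return rows[idx - 1][1] if idx > 0 else None

history = {"user_1": [("2024-01-01", 10), ("2024-01-05", 42)]}
point_in_time_lookup(history, "user_1", "2024-01-03")  # → 10 (value as of Jan 3)
point_in_time_lookup(history, "user_1", "2024-01-06")  # → 42
```

HistoricalFeatures applies this rule per entity and per query timestamp when assembling training sets.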

FeatureGroup

The FeatureGroup class is the primary abstraction for defining and managing groups of related features. It handles feature materialization to both offline and online stores.

Bases: FeatureStore

A feature group representing a set of features created from a data source.

A FeatureGroup is a logical grouping of related features that share the same entity and are typically computed from the same data source (Flow or DataFrame). It supports both offline and online materialization for feature storage and serving.

ATTRIBUTE DESCRIPTION
name

The unique name of the feature group within the project.

TYPE: str

materialization

Configuration for how features are stored and served, including offline/online storage settings and TTL configurations.

TYPE: Materialization

source

The data source for computing features. Can be a Flow pipeline or a Spark DataFrame. If a Flow is provided, its output will be set to SPARK_DATAFRAME.

TYPE: Optional[Union[Flow, DataFrame]]

description

Optional human-readable description of the feature group.

TYPE: Optional[str]

features

List of Feature objects to register. If None, all columns except join_keys and event_time will be registered as features.

TYPE: Optional[List[Feature]]

tag

Optional list of tags for categorizing and filtering feature groups.

TYPE: Optional[List[str]]

validation_config

Configuration for data validation including validators to run and validation mode (FAIL or WARN). When set, enables declarative validation that can be used with the validate() method.

TYPE: Optional[ValidationConfig]

feature_group_id

Unique identifier assigned by the system after creation.

TYPE: Optional[str]

offline_watermarks

List of timestamps indicating when offline data was materialized. Used for tracking data freshness.

TYPE: List[str]

online_watermarks

List of timestamps indicating when online data was materialized. Used for tracking data freshness.

TYPE: List[str]

version

Schema version number for the feature group.

TYPE: Optional[int]

created_at

Timestamp when the feature group was created.

TYPE: Optional[str]

updated_at

Timestamp when the feature group was last updated.

TYPE: Optional[str]

avro_schema

Avro schema definition for the feature data structure.

TYPE: Optional[dict]

Example

from seeknal.featurestore import FeatureGroup, Materialization
from seeknal.entity import Entity

# Create a feature group from a flow
fg = FeatureGroup(
    name="user_features",
    materialization=Materialization(event_time_col="event_date"),
)
fg.entity = Entity(name="user", join_keys=["user_id"])
fg.set_flow(my_flow).set_features().get_or_create()

Note

A SparkContext must be active before creating a FeatureGroup instance. Initialize your Project and Workspace first if you encounter SparkContext errors.

Functions

set_flow(flow: Flow)

Set the data flow pipeline as the source for this feature group.

Configures a Flow as the data source for computing features. The flow's output will be automatically set to SPARK_DATAFRAME if not already configured. If the flow hasn't been persisted, it will be created.

PARAMETER DESCRIPTION
flow

The Flow pipeline to use for feature computation.

TYPE: Flow

RETURNS DESCRIPTION
FeatureGroup

The current instance for method chaining.

Example

fg = FeatureGroup(name="user_features")
fg.set_flow(my_transformation_flow)

Source code in src/seeknal/featurestore/feature_group.py
def set_flow(self, flow: Flow):
    """Set the data flow pipeline as the source for this feature group.

    Configures a Flow as the data source for computing features. The flow's
    output will be automatically set to SPARK_DATAFRAME if not already
    configured. If the flow hasn't been persisted, it will be created.

    Args:
        flow: The Flow pipeline to use for feature computation.

    Returns:
        FeatureGroup: The current instance for method chaining.

    Example:
        >>> fg = FeatureGroup(name="user_features")
        >>> fg.set_flow(my_transformation_flow)
    """
    if flow.output is None or flow.output.kind != FlowOutputEnum.SPARK_DATAFRAME:
        flow.output = FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME)
    if "flow_id" not in vars(flow):
        flow.get_or_create()
    self.source = flow
    return self

set_dataframe(dataframe: DataFrame)

Set a Spark DataFrame as the source for this feature group.

Configures a pre-computed Spark DataFrame as the data source for features. This is useful when features have already been computed or when using ad-hoc data that doesn't require a Flow pipeline.

PARAMETER DESCRIPTION
dataframe

The Spark DataFrame containing the feature data.

TYPE: DataFrame

RETURNS DESCRIPTION
FeatureGroup

The current instance for method chaining.

Example

fg = FeatureGroup(name="user_features")
fg.set_dataframe(my_spark_df)

Source code in src/seeknal/featurestore/feature_group.py
def set_dataframe(self, dataframe: DataFrame):
    """Set a Spark DataFrame as the source for this feature group.

    Configures a pre-computed Spark DataFrame as the data source for
    features. This is useful when features have already been computed
    or when using ad-hoc data that doesn't require a Flow pipeline.

    Args:
        dataframe: The Spark DataFrame containing the feature data.

    Returns:
        FeatureGroup: The current instance for method chaining.

    Example:
        >>> fg = FeatureGroup(name="user_features")
        >>> fg.set_dataframe(my_spark_df)
    """
    self.source = dataframe
    return self

set_validation_config(config: ValidationConfig)

Set validation configuration for this feature group.

PARAMETER DESCRIPTION
config

Validation configuration containing validators to run and validation mode (FAIL or WARN).

TYPE: ValidationConfig

RETURNS DESCRIPTION
self

Returns the FeatureGroup instance for method chaining.

Example

from seeknal.feature_validation.models import ValidationConfig, ValidatorConfig

config = ValidationConfig(
    mode=ValidationMode.WARN,
    validators=[
        ValidatorConfig(validator_type="null", columns=["user_id"]),
        ValidatorConfig(validator_type="range", columns=["age"],
                        params={"min_val": 0, "max_val": 120}),
    ],
)
feature_group.set_validation_config(config)

Source code in src/seeknal/featurestore/feature_group.py
def set_validation_config(self, config: ValidationConfig):
    """
    Set validation configuration for this feature group.

    Args:
        config (ValidationConfig): Validation configuration containing
            validators to run and validation mode (FAIL or WARN).

    Returns:
        self: Returns the FeatureGroup instance for method chaining.

    Example:
        >>> from seeknal.feature_validation.models import ValidationConfig, ValidatorConfig
        >>>
        >>> config = ValidationConfig(
        ...     mode=ValidationMode.WARN,
        ...     validators=[
        ...         ValidatorConfig(validator_type="null", columns=["user_id"]),
        ...         ValidatorConfig(validator_type="range", columns=["age"],
        ...                         params={"min_val": 0, "max_val": 120})
        ...     ]
        ... )
        >>> feature_group.set_validation_config(config)
    """
    self.validation_config = config
    return self

validate(validators: List[BaseValidator], mode: Union[str, ValidationMode] = ValidationMode.FAIL, reference_date: Optional[str] = None) -> ValidationSummary

Validate the feature group data using the provided validators.

This method runs a list of validators against the feature group's data and returns a summary of the validation results. The validation can be configured to either warn on failures (continue execution) or fail immediately on the first validation failure.

PARAMETER DESCRIPTION
validators

List of validators to run against the feature group data. Each validator should be an instance of a class that inherits from BaseValidator (e.g., NullValidator, RangeValidator, UniquenessValidator, FreshnessValidator, or CustomValidator).

TYPE: List[BaseValidator]

mode

Validation execution mode. ValidationMode.FAIL or "fail" raises an exception on the first failure; ValidationMode.WARN or "warn" logs failures but continues execution. Defaults to ValidationMode.FAIL.

TYPE: Union[str, ValidationMode] DEFAULT: FAIL

reference_date

Reference date for running the source Flow. Only used when source is a Flow. Defaults to None.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
ValidationSummary

A summary containing all validation results, pass/fail status, and counts.

TYPE: ValidationSummary

RAISES DESCRIPTION
ValueError

If source is not set (use @require_set_source decorator).

ValidationException

If mode is FAIL and any validator fails.

Example

from seeknal.feature_validation.validators import NullValidator, RangeValidator
from seeknal.feature_validation.models import ValidationMode

# Create validators
validators = [
    NullValidator(columns=["user_id", "email"]),
    RangeValidator(column="age", min_val=0, max_val=120),
]

# Run validation in warn mode (continues on failures)
summary = feature_group.validate(validators, mode=ValidationMode.WARN)
print(f"Passed: {summary.passed}, Failed: {summary.failed_count}")

# Run validation in fail mode (stops on first failure)
try:
    summary = feature_group.validate(validators, mode="fail")
except ValidationException as e:
    print(f"Validation failed: {e.message}")
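
The FAIL/WARN semantics above reduce to a small control-flow choice; here is a library-free sketch (the ValidationException class and runner below are stand-ins for illustration, not the seeknal implementation):

```python
class ValidationException(Exception):
    pass

def run_validators(validators, rows, mode="fail"):
    """Run each (name, check) pair; in 'fail' mode raise on the first
    failure, in 'warn' mode record failures and keep going."""
    failures = []
    for name, check in validators:
        if not check(rows):
            if mode == "fail":
                raise ValidationException(f"{name} failed")
            failures.append(name)  # warn mode: record and continue
    return {"passed": len(validators) - len(failures), "failed": failures}

rows = [{"user_id": 1, "age": 30}, {"user_id": None, "age": 200}]
validators = [
    ("null_user_id", lambda rs: all(r["user_id"] is not None for r in rs)),
    ("age_range", lambda rs: all(0 <= r["age"] <= 120 for r in rs)),
]
summary = run_validators(validators, rows, mode="warn")
# summary["failed"] == ["null_user_id", "age_range"]
```

In "warn" mode every validator runs and you get a complete summary; in "fail" mode execution stops at the first broken invariant, which is the safer default for materialization pipelines.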

Source code in src/seeknal/featurestore/feature_group.py
@require_set_source
def validate(
    self,
    validators: List[BaseValidator],
    mode: Union[str, ValidationMode] = ValidationMode.FAIL,
    reference_date: Optional[str] = None,
) -> ValidationSummary:
    """
    Validate the feature group data using the provided validators.

    This method runs a list of validators against the feature group's data
    and returns a summary of the validation results. The validation can be
    configured to either warn on failures (continue execution) or fail
    immediately on the first validation failure.

    Args:
        validators (List[BaseValidator]): List of validators to run against
            the feature group data. Each validator should be an instance of
            a class that inherits from BaseValidator (e.g., NullValidator,
            RangeValidator, UniquenessValidator, FreshnessValidator, or
            CustomValidator).
        mode (Union[str, ValidationMode], optional): Validation execution mode.
            - ValidationMode.FAIL or "fail": Raise exception on first failure.
            - ValidationMode.WARN or "warn": Log failures but continue execution.
            Defaults to ValidationMode.FAIL.
        reference_date (Optional[str], optional): Reference date for running
            the source Flow. Only used when source is a Flow. Defaults to None.

    Returns:
        ValidationSummary: A summary containing all validation results,
            pass/fail status, and counts.

    Raises:
        ValueError: If source is not set (use @require_set_source decorator).
        ValidationException: If mode is FAIL and any validator fails.

    Example:
        >>> from seeknal.feature_validation.validators import NullValidator, RangeValidator
        >>> from seeknal.feature_validation.models import ValidationMode
        >>>
        >>> # Create validators
        >>> validators = [
        ...     NullValidator(columns=["user_id", "email"]),
        ...     RangeValidator(column="age", min_val=0, max_val=120)
        ... ]
        >>>
        >>> # Run validation in warn mode (continues on failures)
        >>> summary = feature_group.validate(validators, mode=ValidationMode.WARN)
        >>> print(f"Passed: {summary.passed}, Failed: {summary.failed_count}")
        >>>
        >>> # Run validation in fail mode (stops on first failure)
        >>> try:
        ...     summary = feature_group.validate(validators, mode="fail")
        ... except ValidationException as e:
        ...     print(f"Validation failed: {e.message}")
    """
    # Convert string mode to ValidationMode enum if necessary
    if isinstance(mode, str):
        mode = ValidationMode(mode.lower())

    # Get the DataFrame from source
    if isinstance(self.source, Flow):
        df = self.source.run(date=reference_date)
    elif isinstance(self.source, DataFrame):
        df = self.source
    else:
        raise ValueError("Source must be a Flow or DataFrame")

    # Create validation runner and execute validators
    runner = ValidationRunner(
        validators=validators,
        mode=mode,
        feature_group_name=self.name,
    )

    # Run validation and return summary
    summary = runner.run(df)

    logger.info(
        f"Feature group '{self.name}' validation complete: "
        f"{summary.passed_count}/{summary.total_validators} validators passed"
    )

    return summary

set_features(features: Optional[List[Feature]] = None, reference_date: Optional[str] = None)

Set the features to use for this feature group. If features is None, all columns except join_keys and event_time are registered as features.

PARAMETER DESCRIPTION
features

Features to register. If None, features are derived automatically from the transformation result. You may also supply only a feature name and description; the data type is then fetched automatically from the transformation result. Defaults to None.

TYPE: Optional[List[Feature]] DEFAULT: None

reference_date

Date to use as a reference when deriving features from the transformation. Defaults to None.

TYPE: Optional[str] DEFAULT: None

RAISES DESCRIPTION
ValueError

If specified features are not found in the transformation result.

RETURNS DESCRIPTION

The current instance with its features populated, for method chaining.
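
The default behavior, taking every column except the entity join keys and the event-time column, amounts to a simple set difference. A hypothetical sketch of that derivation (function name and data are illustrative):

```python
def derive_feature_columns(columns, join_keys, event_time_col=None):
    """Columns that become features when none are specified explicitly:
    everything except the entity join keys and the event-time column."""
    reserved = set(join_keys)
    if event_time_col is not None:
        reserved.add(event_time_col)
    # preserve the source column order
    return [c for c in columns if c not in reserved]

derive_feature_columns(
    ["user_id", "event_date", "age", "country"],
    join_keys=["user_id"],
    event_time_col="event_date",
)  # → ["age", "country"]
```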

Source code in src/seeknal/featurestore/feature_group.py
@require_set_source
@require_set_entity
def set_features(
    self,
    features: Optional[List[Feature]] = None,
    reference_date: Optional[str] = None,
):
    """
    Set features to be used for this feature group. If features set as None,
    then it will use all columns except join_keys and event_time as features

    Args:
        features (Optional[List[Feature]], optional): Specify features. If this None,
            then automatically get features from transformation result. In addition,
            user may tell the feature name and description, then the detail about datatype
            automatically fetch from transformation result. Defaults to None.
        reference_date (Optional[str], optional): Specify date can be used as reference for
            get features from the transformation. Defaults to None.
        validate_with_source (bool, optional): If set as true, it won't validate with
            transformation result. Defaults to True.

    Raises:
        ValueError: If specify features not found from the transformation result

    Returns:
        Populate features of the feature group
    """
    if self.source is None:
        raise ValueError("Source is not set")

    reserved_cols = []
    _features = None
    for z in self.entity.join_keys:
        reserved_cols.append(z)

    if self.materialization.event_time_col is not None:
        reserved_cols.append(self.materialization.event_time_col)
    # if features not known yet, then need to load the data first
    # for getting list of features
    if features is None:
        # logger.info("Using all columns except entity join_key and event_time columns.")
        if isinstance(self.source, Flow):
            res = self.source.run(date=reference_date).drop(*reserved_cols)
        elif isinstance(self.source, DataFrame):
            res = self.source.drop(*reserved_cols)
        else:
            raise ValueError("Source only accepts Flow or DataFrame.")
        _avro_schema = json.loads(
            self._jvm_gateway.za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils.toAvroSchema(
                res._jdf
            ).toString()
        )

        if _avro_schema is None:
        raise ValueError(
            "Features not found in the source. Please make sure features are available from the source."
        )
        _features = self._parse_avro_schema(schema=_avro_schema)
    else:
        feature_names = [f.name for f in features]
        metadata = {}
        if self.features is None:
            for k in features:
                metadata[k.name] = {
                    "description": k.description,
                    **Feature(name=k.name).model_dump(exclude={"description"}),
                }
        else:
            for k in features:
                for f in self.features:
                    if k.name == f.name:
                        metadata[k.name] = f.dict()

        selections = feature_names + reserved_cols
        if isinstance(self.source, Flow):
            res = (
                self.source.run(date=reference_date)
                .select(*selections)
                .drop(*reserved_cols)
            )
        elif isinstance(self.source, DataFrame):
            res = self.source.select(*selections).drop(*reserved_cols)
        else:
            raise ValueError("Source only accepts Flow or DataFrame.")
        _avro_schema = json.loads(
            self._jvm_gateway.za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils.toAvroSchema(
                res._jdf
            ).toString()
        )
        if _avro_schema is None:
            raise ValueError(
                "Cannot parse schema from the source. Please make sure features are available from source."
            )
        _features = self._parse_avro_schema(schema=_avro_schema)
        for k in _features:
            k.description = metadata[k.name]["description"]
            k.feature_id = metadata[k.name]["feature_id"]
            k.created_at = metadata[k.name]["created_at"]
            k.updated_at = metadata[k.name]["updated_at"]

    self.features = _features
    self.avro_schema = _avro_schema

    return self

get_or_create(version=None)

Retrieve an existing feature group, or create a new one from the configured source, entity, and features.

PARAMETER DESCRIPTION
version

Optional version of the feature group to load. If a version is provided, that specific version is loaded; if not, the latest version is used.

DEFAULT: None

RETURNS DESCRIPTION

FeatureGroup: The current instance with its attributes updated from the loaded or newly created feature group.

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_project
def get_or_create(self, version=None):
    """
    The `get_or_create` function retrieves an existing feature group or creates a new one based on the
    provided parameters.

    Args:
      version: The `version` parameter is an optional argument that specifies the version of the feature
    group to retrieve. If a version is provided, the code will load the feature group with that specific
    version. If no version is provided, the code will load the latest version of the feature group.

    Returns:
      The method `get_or_create` returns the instance of the class `self` after performing some
    operations and updating its attributes.
    """

    materialization_params = self.materialization.model_dump(exclude_none=False)
    materialization_params["offline_materialization"]["store"] = (
        asdict(self.materialization.offline_materialization.store)
        if self.materialization.offline_materialization.store is not None
        else None
    )
    materialization_params["online_materialization"]["store"] = (
        asdict(self.materialization.online_materialization.store)
        if self.materialization.online_materialization.store is not None
        else None
    )

    body = {
        "name": self.name,
        "project_id": context.project_id,
        "description": ("" if self.description is None else self.description),
        "offline": self.materialization.offline,
        "online": self.materialization.online,
        "materialization_params": materialization_params,
    }

    feature_group = FeatureGroupRequest.select_by_name(self.name)
    if feature_group is None:
        if self.source is None:
            raise ValueError("source is not set")
        if self.entity is None:
            raise ValueError("Entity is not set")
        if self.features is None:
            raise ValueError("Features are not set")

        if isinstance(self.source, Flow):
            body["flow_id"] = self.source.flow_id
        else:
            body["flow_id"] = None

        req = FeatureGroupRequest(
            body={
                **body,
                "entity_id": self.entity.entity_id,
                "features": [f.to_dict() for f in self.features],
                "avro_schema": self.avro_schema,
            }
        )

        (
            self.feature_group_id,
            features,
            version_obj,
            offline_store_id,
            online_store_id,
        ) = req.save()
        self.version = version_obj.version
        for i in features:
            for j in self.features:
                if i["metadata"]["name"] == j.name:
                    j.feature_id = i["feature_id"]
                    j.created_at = pendulum.instance(i["created_at"]).format(
                        "YYYY-MM-DD HH:mm:ss"
                    )
                    j.updated_at = pendulum.instance(i["updated_at"]).format(
                        "YYYY-MM-DD HH:mm:ss"
                    )
                    break
        self.offline_store_id = offline_store_id
        self.online_store_id = online_store_id

    else:
        logger.warning("Feature group already exists. Loading the feature group.")
        if version is None:
            versions = FeatureGroupRequest.select_version_by_feature_group_id(
                feature_group.id
            )
            self.version = versions[0].version
            version_id = versions[0].id
            self.avro_schema = json.loads(versions[0].avro_schema)
        else:
            version_obj = (
                FeatureGroupRequest.select_by_feature_group_id_and_version(
                    feature_group.id, version
                )
            )
            if version_obj is None:
                raise ValueError(f"Version {version} not found.")
            self.version = version
            version_id = version_obj.id
            self.avro_schema = json.loads(version_obj.avro_schema)

        offline_watermarks = FeatureGroupRequest.select_watermarks_by_version_id(
            feature_group.id, version_id
        )
        if offline_watermarks is not None:
            self.offline_watermarks = list(
                map(
                    lambda x: pendulum.instance(x.date).format(
                        "YYYY-MM-DD HH:mm:SS"
                    ),
                    offline_watermarks,
                )
            )
        if feature_group.online_watermark is not None:
            self.online_watermarks = feature_group.online_watermark.split(",")

        if feature_group.flow_id is not None:
            flow = FlowRequest.select_by_id(feature_group.flow_id)
            self.source = Flow(name=flow.name).get_or_create()
        else:
            self.source = None
        entity = EntityRequest.select_by_id(feature_group.entity_id)
        self.entity = Entity(name=entity.name).get_or_create()
        self.feature_group_id = feature_group.id
        self.id = feature_group.id

        features = FeatureRequest.select_by_feature_group_id_and_version(
            feature_group.id, self.version
        )
        self.features = []
        for i in features:
            self.features.append(
                Feature(
                    name=i.name,
                    feature_id=str(i.id),
                    description=i.description,
                    data_type=i.datatype,
                    online_data_type=i.online_datatype,
                    created_at=pendulum.instance(i.created_at).format(
                        "YYYY-MM-DD HH:mm:ss"
                    ),
                    updated_at=pendulum.instance(i.updated_at).format(
                        "YYYY-MM-DD HH:mm:ss"
                    ),
                )
            )

        # handle materialization
        for key, value in json.loads(feature_group.materialization_params).items():
            if key == "offline_materialization":
                for k, v in value.items():
                    setattr(self.materialization.offline_materialization, k, v)
            elif key == "online_materialization":
                for k, v in value.items():
                    if k == "store":
                        if v is not None:
                            self.materialization.online_materialization.store = (
                                OnlineStore(
                                    kind=OnlineStoreEnum(v["kind"]), value=v["value"]
                                )
                            )
                        else:
                            self.materialization.online_materialization.store = None
                    else:
                        setattr(self.materialization.online_materialization, k, v)
            else:
                setattr(self.materialization, key, value)
        self.offline_store_id = feature_group.offline_store
        self.online_store_id = feature_group.online_store

    # handling load offline store object
    _offline_store = FeatureGroupRequest.get_offline_store_by_id(
        self.offline_store_id
    )
    self.materialization.offline_materialization.store = OfflineStore(
        kind=OfflineStoreEnum(_offline_store.kind), name=_offline_store.name
    )
    if _offline_store.params == "null":
        self.materialization.offline_materialization.store.value = None
    else:
        value_params = json.loads(_offline_store.params)
        if _offline_store.kind == "file":
            self.materialization.offline_materialization.store.value = (
                FeatureStoreFileOutput(
                    path=value_params["path"],
                    kind=FileKindEnum(value_params["kind"]),
                )
            )
        elif _offline_store.kind == "hive_table":
            self.materialization.offline_materialization.store.value = (
                FeatureStoreHiveTableOutput(database=value_params["database"])
            )
        else:
            self.materialization.offline_materialization.store.value = value_params

    return self

update_materialization(offline: Optional[bool] = None, online: Optional[bool] = None, offline_materialization: Optional[OfflineMaterialization] = None, online_materialization: Optional[OnlineMaterialization] = None)

Update the materialization settings for this feature group.

Modifies the materialization configuration and persists the changes to the feature store backend. This allows changing storage settings, TTL values, and enabling/disabling offline or online storage.

PARAMETER DESCRIPTION
offline

Enable or disable offline storage. If None, keeps current setting.

TYPE: Optional[bool] DEFAULT: None

online

Enable or disable online storage. If None, keeps current setting.

TYPE: Optional[bool] DEFAULT: None

offline_materialization

New offline materialization configuration. If None, keeps current setting.

TYPE: Optional[OfflineMaterialization] DEFAULT: None

online_materialization

New online materialization configuration. If None, keeps current setting.

TYPE: Optional[OnlineMaterialization] DEFAULT: None

RETURNS DESCRIPTION
FeatureGroup

The current instance for method chaining.

Example

fg.update_materialization(
    online=True,
    online_materialization=OnlineMaterialization(ttl=2880),
)
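
The TTL in the example above is expressed in minutes (2880 minutes is two days). A hedged sketch of how an online store might use such a TTL to decide whether a materialized value is still servable (the function below is illustrative, not the seeknal implementation):

```python
from datetime import datetime, timedelta

def is_fresh(materialized_at, ttl_minutes, now=None):
    """A value is servable while now - materialized_at <= ttl."""
    now = now or datetime.utcnow()
    return now - materialized_at <= timedelta(minutes=ttl_minutes)

written = datetime(2024, 1, 1, 0, 0)
is_fresh(written, ttl_minutes=2880, now=datetime(2024, 1, 2, 12, 0))  # → True (36h < 48h)
is_fresh(written, ttl_minutes=2880, now=datetime(2024, 1, 3, 1, 0))   # → False (49h > 48h)
```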

Source code in src/seeknal/featurestore/feature_group.py
def update_materialization(
    self,
    offline: Optional[bool] = None,
    online: Optional[bool] = None,
    offline_materialization: Optional[OfflineMaterialization] = None,
    online_materialization: Optional[OnlineMaterialization] = None,
):
    """Update the materialization settings for this feature group.

    Modifies the materialization configuration and persists the changes
    to the feature store backend. This allows changing storage settings,
    TTL values, and enabling/disabling offline or online storage.

    Args:
        offline: Enable or disable offline storage. If None, keeps current setting.
        online: Enable or disable online storage. If None, keeps current setting.
        offline_materialization: New offline materialization configuration.
            If None, keeps current setting.
        online_materialization: New online materialization configuration.
            If None, keeps current setting.

    Returns:
        FeatureGroup: The current instance for method chaining.

    Example:
        >>> fg.update_materialization(
        ...     online=True,
        ...     online_materialization=OnlineMaterialization(ttl=2880)
        ... )
    """
    if offline is not None:
        self.materialization.offline = offline
    if online is not None:
        self.materialization.online = online
    if offline_materialization is not None:
        self.materialization.offline_materialization = offline_materialization
    if online_materialization is not None:
        self.materialization.online_materialization = online_materialization

    materialization_params = self.materialization.model_dump(exclude_none=False)
    materialization_params["offline_materialization"]["store"] = (
        asdict(self.materialization.offline_materialization.store)
        if self.materialization.offline_materialization.store is not None
        else None
    )
    materialization_params["online_materialization"]["store"] = (
        asdict(self.materialization.online_materialization.store)
        if self.materialization.online_materialization.store is not None
        else None
    )

    body = {
        "offline": self.materialization.offline,
        "online": self.materialization.online,
        "materialization_params": materialization_params,
        "feature_group_id": self.feature_group_id,
    }

    req = FeatureGroupRequest(
        body={
            **body,
        }
    )

    req.update_materialization()

    return self

list_versions()

List all versions of this feature group.

Returns a list of dictionaries containing version metadata including:

- version: The version number
- avro_schema: The Avro schema for this version (as dict)
- created_at: When the version was created
- updated_at: When the version was last updated
- feature_count: Number of features in this version

RETURNS DESCRIPTION

List[dict]: A list of version metadata dictionaries, ordered by version number descending (latest first). Returns an empty list if the feature group has not been saved or has no versions.

Example

fg = FeatureGroup(name="user_features").get_or_create()
versions = fg.list_versions()
for v in versions:
    print(f"Version {v['version']}: {v['feature_count']} features")
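
The "latest first" ordering is what lets get_or_create() default to the first entry when no version is requested. A minimal sketch of that selection rule, using hypothetical data:

```python
def pick_version(versions, requested=None):
    """versions: list of dicts ordered by version number descending.
    Return the requested version, or the latest when none is given."""
    if requested is None:
        return versions[0] if versions else None
    for v in versions:
        if v["version"] == requested:
            return v
    raise ValueError(f"Version {requested} not found.")

versions = [{"version": 3, "feature_count": 12}, {"version": 2, "feature_count": 10}]
pick_version(versions)["version"]               # → 3 (latest)
pick_version(versions, requested=2)["feature_count"]  # → 10
```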

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_project
def list_versions(self):
    """
    List all versions of this feature group.

    Returns a list of dictionaries containing version metadata including:
    - version: The version number
    - avro_schema: The Avro schema for this version (as dict)
    - created_at: When the version was created
    - updated_at: When the version was last updated
    - feature_count: Number of features in this version

    Returns:
        List[dict]: A list of version metadata dictionaries, ordered by version
            number descending (latest first). Returns an empty list if the
            feature group has not been saved or has no versions.

    Example:
        >>> fg = FeatureGroup(name="user_features").get_or_create()
        >>> versions = fg.list_versions()
        >>> for v in versions:
        ...     print(f"Version {v['version']}: {v['feature_count']} features")
    """
    if not hasattr(self, 'feature_group_id') or self.feature_group_id is None:
        # Feature group not saved yet, return empty list
        return []

    versions = FeatureGroupRequest.select_version_by_feature_group_id(
        self.feature_group_id
    )

    if versions is None:
        return []

    result = []
    for v in versions:
        # Parse avro_schema from JSON string
        try:
            avro_schema = json.loads(v.avro_schema) if v.avro_schema else None
        except (json.JSONDecodeError, TypeError):
            avro_schema = None

        # Get feature count for this version
        features = FeatureRequest.select_by_feature_group_id_and_version(
            self.feature_group_id, v.version
        )
        feature_count = len(features) if features else 0

        result.append({
            "version": v.version,
            "avro_schema": avro_schema,
            "created_at": pendulum.instance(v.created_at).format("YYYY-MM-DD HH:mm:ss") if v.created_at else None,
            "updated_at": pendulum.instance(v.updated_at).format("YYYY-MM-DD HH:mm:ss") if v.updated_at else None,
            "feature_count": feature_count,
        })

    return result

get_version(version: int) -> Optional[dict]

Get metadata for a specific version of this feature group.

PARAMETER DESCRIPTION
version

The version number to retrieve.

TYPE: int

RETURNS DESCRIPTION
Optional[dict]

Optional[dict]: A dictionary containing version metadata if found, None if the version doesn't exist. The dictionary includes:

- version: The version number
- avro_schema: The Avro schema for this version (as dict)
- created_at: When the version was created
- updated_at: When the version was last updated
- feature_count: Number of features in this version

Example

>>> fg = FeatureGroup(name="user_features").get_or_create()
>>> v1 = fg.get_version(1)
>>> if v1:
...     print(f"Version 1 has {v1['feature_count']} features")

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_project
def get_version(self, version: int) -> Optional[dict]:
    """
    Get metadata for a specific version of this feature group.

    Args:
        version (int): The version number to retrieve.

    Returns:
        Optional[dict]: A dictionary containing version metadata if found,
            None if the version doesn't exist. The dictionary includes:
            - version: The version number
            - avro_schema: The Avro schema for this version (as dict)
            - created_at: When the version was created
            - updated_at: When the version was last updated
            - feature_count: Number of features in this version

    Example:
        >>> fg = FeatureGroup(name="user_features").get_or_create()
        >>> v1 = fg.get_version(1)
        >>> if v1:
        ...     print(f"Version 1 has {v1['feature_count']} features")
    """
    if not hasattr(self, 'feature_group_id') or self.feature_group_id is None:
        # Feature group not saved yet, return None
        return None

    version_obj = FeatureGroupRequest.select_by_feature_group_id_and_version(
        self.feature_group_id, version
    )

    if version_obj is None:
        return None

    # Parse avro_schema from JSON string
    try:
        avro_schema = json.loads(version_obj.avro_schema) if version_obj.avro_schema else None
    except (json.JSONDecodeError, TypeError):
        avro_schema = None

    # Get feature count for this version
    features = FeatureRequest.select_by_feature_group_id_and_version(
        self.feature_group_id, version
    )
    feature_count = len(features) if features else 0

    return {
        "version": version_obj.version,
        "avro_schema": avro_schema,
        "created_at": pendulum.instance(version_obj.created_at).format("YYYY-MM-DD HH:mm:ss") if version_obj.created_at else None,
        "updated_at": pendulum.instance(version_obj.updated_at).format("YYYY-MM-DD HH:mm:ss") if version_obj.updated_at else None,
        "feature_count": feature_count,
    }

compare_versions(from_version: int, to_version: int) -> Optional[dict]

Compare schemas between two versions of this feature group.

Identifies added, removed, and modified features between the two versions by comparing their Avro schemas.

PARAMETER DESCRIPTION
from_version

The base version number to compare from.

TYPE: int

to_version

The target version number to compare to.

TYPE: int

RETURNS DESCRIPTION
Optional[dict]

Optional[dict]: A dictionary containing the comparison result, or None if the feature group has not been saved. The dictionary includes:

- from_version: The base version number
- to_version: The target version number
- added: List of field names added in to_version
- removed: List of field names removed in to_version
- modified: List of dicts with field name and type changes

RAISES DESCRIPTION
ValueError

If from_version equals to_version, or if either version is not found for this feature group.

Example

>>> fg = FeatureGroup(name="user_features").get_or_create()
>>> diff = fg.compare_versions(1, 2)
>>> if diff:
...     print(f"Added features: {diff['added']}")
...     print(f"Removed features: {diff['removed']}")
...     print(f"Modified features: {diff['modified']}")

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_project
def compare_versions(self, from_version: int, to_version: int) -> Optional[dict]:
    """
    Compare schemas between two versions of this feature group.

    Identifies added, removed, and modified features between the two versions
    by comparing their Avro schemas.

    Args:
        from_version (int): The base version number to compare from.
        to_version (int): The target version number to compare to.

    Returns:
        Optional[dict]: A dictionary containing the comparison result, or
            None if the feature group has not been saved. The dictionary
            includes:
            - from_version: The base version number
            - to_version: The target version number
            - added: List of field names added in to_version
            - removed: List of field names removed in to_version
            - modified: List of dicts with field name and type changes

    Raises:
        ValueError: If from_version equals to_version, or if either version
            is not found for this feature group.

    Example:
        >>> fg = FeatureGroup(name="user_features").get_or_create()
        >>> diff = fg.compare_versions(1, 2)
        >>> if diff:
        ...     print(f"Added features: {diff['added']}")
        ...     print(f"Removed features: {diff['removed']}")
        ...     print(f"Modified features: {diff['modified']}")
    """
    if from_version == to_version:
        raise ValueError("from_version and to_version must be different")

    if not hasattr(self, 'feature_group_id') or self.feature_group_id is None:
        # Feature group not saved yet, return None
        return None

    # Fetch both versions
    from_version_obj = FeatureGroupRequest.select_by_feature_group_id_and_version(
        self.feature_group_id, from_version
    )
    to_version_obj = FeatureGroupRequest.select_by_feature_group_id_and_version(
        self.feature_group_id, to_version
    )

    # Check if both versions exist
    if from_version_obj is None:
        raise ValueError(f"Version {from_version} not found for this feature group")
    if to_version_obj is None:
        raise ValueError(f"Version {to_version} not found for this feature group")

    # Get avro_schema JSON strings
    from_schema_json = from_version_obj.avro_schema if from_version_obj.avro_schema else "{}"
    to_schema_json = to_version_obj.avro_schema if to_version_obj.avro_schema else "{}"

    # Compare schemas using FeatureGroupRequest.compare_schemas()
    schema_diff = FeatureGroupRequest.compare_schemas(from_schema_json, to_schema_json)

    return {
        "from_version": from_version,
        "to_version": to_version,
        "added": schema_diff.get("added", []),
        "removed": schema_diff.get("removed", []),
        "modified": schema_diff.get("modified", []),
    }
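The comparison above delegates the schema diff to FeatureGroupRequest.compare_schemas(). As an illustration of what such an Avro record diff computes (a standalone sketch, not the library's actual implementation), compare the field lists of two schema versions:

```python
import json

def diff_avro_schemas(from_json: str, to_json: str) -> dict:
    """Compare two Avro record schemas field-by-field."""
    from_fields = {f["name"]: f["type"] for f in json.loads(from_json).get("fields", [])}
    to_fields = {f["name"]: f["type"] for f in json.loads(to_json).get("fields", [])}
    return {
        "added": sorted(set(to_fields) - set(from_fields)),
        "removed": sorted(set(from_fields) - set(to_fields)),
        "modified": [
            {"name": n, "from": from_fields[n], "to": to_fields[n]}
            for n in sorted(set(from_fields) & set(to_fields))
            if from_fields[n] != to_fields[n]
        ],
    }

v1 = '{"type": "record", "name": "fg", "fields": [{"name": "age", "type": "int"}, {"name": "city", "type": "string"}]}'
v2 = '{"type": "record", "name": "fg", "fields": [{"name": "age", "type": "long"}, {"name": "income", "type": "double"}]}'
print(diff_avro_schemas(v1, v2))
```

This mirrors the added/removed/modified shape of the returned dictionary; the real implementation may also handle nested and union types.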

delete()

Delete this feature group and its associated data.

Removes the feature group from the feature store backend along with any data stored in the offline store. This operation is irreversible.

RETURNS DESCRIPTION
FeatureGroupRequest

The request object used to perform the deletion.

RAISES DESCRIPTION
ValueError

If the feature group has not been saved (no feature_group_id).

Note

This requires an active workspace and project context. The feature group must have been previously saved using get_or_create().

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_saved
@require_project
def delete(self):
    """Delete this feature group and its associated data.

    Removes the feature group from the feature store backend along with
    any data stored in the offline store. This operation is irreversible.

    Returns:
        FeatureGroupRequest: The request object used to perform the deletion.

    Raises:
        ValueError: If the feature group has not been saved (no feature_group_id).

    Note:
        This requires an active workspace and project context. The feature
        group must have been previously saved using get_or_create().
    """
    offline_store = self.materialization.offline_materialization.store
    offline_store.delete(
        name=self.name, project=context.project_id, entity=self.entity.entity_id
    )

    return FeatureGroupRequest.delete_by_id(self.feature_group_id)

write(feature_start_time: Optional[datetime] = None, feature_end_time: Optional[datetime] = None, output_date_pattern: str = 'yyyyMMdd')

Writes the feature group data to the configured stores (offline and, if enabled, online), using the specified feature start and end times and output date pattern.

PARAMETER DESCRIPTION
feature_start_time

The start time for the feature data. If None, the current date is used.

TYPE: Optional[datetime] DEFAULT: None

feature_end_time

The end time for the feature data. If None, all available data is used.

TYPE: Optional[datetime] DEFAULT: None

output_date_pattern

The output date pattern for the feature data.

TYPE: str DEFAULT: 'yyyyMMdd'

RETURNS DESCRIPTION

None
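The output_date_pattern uses Java/Spark-style tokens such as 'yyyyMMdd'. As a rough sketch of how these tokens map onto Python's strftime directives (an illustrative helper covering only the common tokens, not part of this API):

```python
from datetime import datetime

# Minimal Java-style pattern -> strftime translation for the common tokens.
_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d"}

def format_java_pattern(dt: datetime, pattern: str) -> str:
    for java, py in _TOKENS.items():
        pattern = pattern.replace(java, py)
    return dt.strftime(pattern)

print(format_java_pattern(datetime(2024, 3, 7), "yyyyMMdd"))    # -> 20240307
print(format_java_pattern(datetime(2024, 3, 7), "yyyy-MM-dd"))  # -> 2024-03-07
```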

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_saved
@require_project
def write(
    self,
    feature_start_time: Optional[datetime] = None,
    feature_end_time: Optional[datetime] = None,
    output_date_pattern: str = "yyyyMMdd",
):
    """
    Writes the feature group data to the configured stores (offline and, if
    enabled, online), using the specified feature start and end times and
    output date pattern.

    Args:
        feature_start_time (Optional[datetime]): The start time for the feature data.
            If None, the current date is used.
        feature_end_time (Optional[datetime]): The end time for the feature data.
            If None, all available data is used.
        output_date_pattern (str): The output date pattern for the feature data.

    Returns:
        None
    """

    date_now = pendulum.now(tz="UTC").format("YYYY-MM-DD")
    if isinstance(self.source, Flow):
        fg_flow = deepcopy(self.source)
        if self.materialization.event_time_col is None:
            add_date_column = SparkEngineTask().add_stage(
                class_name="tech.mta.seeknal.transformers.AddColumnByExpr",
                params={"expression": f"'{date_now}'", "outputCol": "__date__"},
            )
            if fg_flow.tasks is not None:
                fg_flow.tasks.append(add_date_column)
            else:
                fg_flow.tasks = [add_date_column]
    elif isinstance(self.source, DataFrame):
        if self.materialization.event_time_col is None:
            fg_flow = self.source.withColumn("__date__", F.lit(date_now))
        else:
            fg_flow = self.source
    else:
        raise ValueError("Source only accepts Flow or DataFrame.")

    date_pattern = "yyyy-MM-dd"
    if (
        self.materialization.event_time_col is not None
        and self.materialization.date_pattern is not None
    ):
        date_pattern = self.materialization.date_pattern

    spark = SparkSession.builder.getOrCreate()
    if self.materialization.event_time_col is not None:
        event_time_col = self.materialization.event_time_col
        if isinstance(fg_flow, Flow):
            fg_flow = fg_flow.set_input_date_col(
                date_col=event_time_col, date_pattern=date_pattern
            )
    else:
        event_time_col = "__date__"
    if isinstance(fg_flow, Flow):
        flow_res = fg_flow.run(
            start_date=feature_start_time, end_date=feature_end_time
        )
    elif isinstance(fg_flow, DataFrame):
        # Check the combined range first so that both bounds apply when given.
        if feature_start_time is not None and feature_end_time is not None:
            flow_res = fg_flow.filter(
                (
                    F.col(event_time_col)
                    >= pendulum.instance(feature_start_time).format(
                        date_pattern.upper()
                    )
                )
                & (
                    F.col(event_time_col)
                    <= pendulum.instance(feature_end_time).format(
                        date_pattern.upper()
                    )
                )
            )
        elif feature_start_time is not None:
            flow_res = fg_flow.filter(
                F.col(event_time_col)
                >= pendulum.instance(feature_start_time).format(
                    date_pattern.upper()
                )
            )
        elif feature_end_time is not None:
            flow_res = fg_flow.filter(
                F.col(event_time_col)
                <= pendulum.instance(feature_end_time).format(date_pattern.upper())
            )
        else:
            flow_res = fg_flow
    flow_res = quinn.snake_case_col_names(flow_res)
    # generate pk given entity join_keys
    flow_res = mack.with_md5_cols(
        flow_res, self.entity.join_keys + [event_time_col], "__pk__"
    )

    # getting the date from the flow result
    if self.materialization.event_time_col is not None:
        _date = (
            SparkEngineTask()
            .add_input(dataframe=flow_res)
            .set_date_col(date_col=event_time_col)
            .get_date_available()
        )
        _date = [
            pendulum.parse(i).format(output_date_pattern.upper()) for i in _date
        ]
        date_available = [
            datetime.fromisoformat(pendulum.parse(i).to_datetime_string())
            for i in _date
        ]
    else:
        date_available = [
            datetime.fromisoformat(pendulum.parse(date_now).to_datetime_string())
        ]

    if self.materialization.offline:
        logger.info("writing to offline-store")

        arr_size = len(self.entity.join_keys) + 1
        arr = spark.sparkContext._gateway.new_array(
            self._jvm_gateway.java.lang.String, arr_size
        )
        for idx, i in enumerate(self.entity.join_keys):
            arr[idx] = i
        arr[arr_size - 1] = "__pk__"

        project_name = ProjectRequest.select_by_id(context.project_id).name
        fs_serialize = (
            self._jvm_gateway.tech.mta.seeknal.connector.serde.FeatureStoreSerDe()
            .setEventTimeCol(event_time_col)
            .setDatePattern(date_pattern)
            .setEntity(self.entity.name)
            .setProject(project_name)
            .setFeatureGroup(self.name)
            .setKeyCols(arr)
            .setSerialize(True)
        )

        res = fs_serialize.transform(flow_res._jdf)
        res_df = DataFrame(res, self._jvm_gateway._wrapped)

        # add watermarks
        version_obj = FeatureGroupRequest.select_by_feature_group_id_and_version(
            self.feature_group_id, self.version
        )
        if version_obj is None:
            raise ValueError(f"Version {self.version} not found.")
        req = FeatureGroupRequest(
            body={
                "feature_group_id": self.feature_group_id,
                "feature_group_version_id": version_obj.id,
            }
        )

        req.add_offline_watermarks(date_available)
        offline_watermarks = [
            w.date
            for w in FeatureGroupRequest.select_watermarks_by_version_id(
                self.feature_group_id, version_obj.id
            )
        ]

        # writing to offline-store
        offline_store = self.materialization.offline_materialization.store
        _start_date = (
            date_now
            if feature_start_time is None
            else pendulum.instance(feature_start_time).format(date_pattern.upper())
        )
        _end_date = (
            "none"
            if feature_end_time is None
            else pendulum.instance(feature_end_time).format(date_pattern.upper())
        )
        offline_mat_result = offline_store(
            result=res_df,
            name=self.name,
            project=context.project_id,
            entity=self.entity.entity_id,
            mode=self.materialization.offline_materialization.mode,
            start_date=_start_date,
            end_date=_end_date,
            version=self.version,
            latest_watermark=max(offline_watermarks),
            ttl=self.materialization.offline_materialization.ttl,
        )

        # Capture Iceberg snapshot metadata
        if offline_mat_result and offline_mat_result.get("storage_type") == "iceberg":
            req.add_iceberg_watermark(
                snapshot_id=offline_mat_result.get("snapshot_id"),
                table=offline_mat_result.get("table"),
                namespace=offline_mat_result.get("namespace"),
                row_count=offline_mat_result.get("num_rows"),
            )
            logger.info(
                f"Iceberg snapshot created: {offline_mat_result.get('snapshot_id')[:8]} "
                f"for table {offline_mat_result.get('namespace')}.{offline_mat_result.get('table')}"
            )

    if self.materialization.online:
        logger.info("Writing to online-store.")
        if self.materialization.offline:
            hist = HistoricalFeatures(lookups=[FeatureLookup(source=self)])
            hist.using_latest().serve(
                target=self.materialization.online_materialization.store,
                ttl=timedelta(days=self.materialization.online_materialization.ttl),
            )
        else:
            flow_res = flow_res.withColumn(
                "event_time", F.to_timestamp(F.col(event_time_col), date_pattern)
            ).drop(event_time_col)
            if self.materialization.online_materialization.ttl is not None:
                _timedelta = timedelta(
                    minutes=self.materialization.online_materialization.ttl
                )
            else:
                _timedelta = None
            OnlineFeatures(
                lookup_key=self.entity,
                lookups=[FeatureLookup(source=self)],
                ttl=_timedelta,
                online_store=self.materialization.online_materialization.store,
                dataframe=flow_res,
            )
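Internally, write() derives a synthetic primary key (__pk__) by hashing the entity join keys together with the event time column (via mack.with_md5_cols). A plain-Python sketch of the same idea (the exact concatenation used by mack may differ; this only illustrates the concept):

```python
import hashlib

def md5_pk(row: dict, key_cols: list) -> str:
    """Concatenate the key columns and hash them into a deterministic row key."""
    concat = "||".join(str(row[c]) for c in key_cols)
    return hashlib.md5(concat.encode("utf-8")).hexdigest()

row = {"user_id": "u123", "__date__": "2024-03-07", "txn_count": 5}
pk = md5_pk(row, ["user_id", "__date__"])
print(pk)  # deterministic 32-character hex digest
```

Because the hash covers the join keys plus the event time, the same entity observed on different dates gets distinct primary keys.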

FeatureLookup

The FeatureLookup class specifies how to look up features from a feature group, with optional feature selection and exclusion.

A class that represents a feature lookup operation in a feature store.

ATTRIBUTE DESCRIPTION
source

The feature store to perform the lookup on.

TYPE: FeatureStore

features

A list of feature names to include in the lookup. If None, all features in the store will be included.

TYPE: Optional[List[str]]

exclude_features

A list of feature names to exclude from the lookup. If None, no features will be excluded.

TYPE: Optional[List[str]]
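The include/exclude semantics described above can be sketched as follows (an illustrative helper, not the library's implementation):

```python
def resolve_features(all_features, features=None, exclude_features=None):
    """Mirror FeatureLookup's selection rules: start from `features`
    (or all features when None), then drop anything in `exclude_features`."""
    selected = list(all_features) if features is None else list(features)
    excluded = set(exclude_features or [])
    return [f for f in selected if f not in excluded]

cols = ["age", "income", "city", "signup_date"]
print(resolve_features(cols))                                    # all four features
print(resolve_features(cols, features=["age", "income"]))        # -> ['age', 'income']
print(resolve_features(cols, exclude_features=["signup_date"]))  # drops signup_date
```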


Materialization

The Materialization class configures how features are materialized to offline and online stores.

Bases: BaseModel

Materialization options

ATTRIBUTE DESCRIPTION
event_time_col

Specifies which column contains the event time. Defaults to None.

TYPE: str

date_pattern

Date pattern used in event_time_col. Defaults to "yyyy-MM-dd".

TYPE: str

offline

Whether the feature group should be stored in the offline store. Defaults to True.

TYPE: bool

online

Whether the feature group should be stored in the online store. Defaults to False.

TYPE: bool

serving_ttl_days

Look-back window for features in the online store. This parameter determines how long features live in the online store; the unit is days. Shorter TTLs improve performance and reduce computation. Defaults to 1. For example, a TTL of 1 means only one day of data is available in the online store.

TYPE: int

force_update_online

Force an update of the data in the online store, without checking whether the data being materialized is newer than the data already stored there. Defaults to False.

TYPE: bool

online_write_mode

Write mode when materializing to the online store. Defaults to "Append".

TYPE: OnlineWriteModeEnum

schema_version

Determines which schema version to use for the feature group. Defaults to None.

TYPE: List[dict]
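The serving_ttl_days setting effectively acts as a look-back filter on event time. The assumed rule (a sketch of the semantics described above, not the library's code) is:

```python
from datetime import datetime, timedelta

def within_ttl(event_time: datetime, now: datetime, serving_ttl_days: int) -> bool:
    """A row is servable while its event time is within the TTL window."""
    return now - event_time <= timedelta(days=serving_ttl_days)

now = datetime(2024, 3, 7, 12, 0)
print(within_ttl(datetime(2024, 3, 7, 0, 0), now, 1))  # True: within one day
print(within_ttl(datetime(2024, 3, 5, 0, 0), now, 1))  # False: too old to serve
```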


HistoricalFeatures

The HistoricalFeatures class retrieves historical feature data with point-in-time correctness for training ML models.

A class for retrieving historical features from a feature store.

ATTRIBUTE DESCRIPTION
lookups

A list of FeatureLookup objects representing the features to retrieve.

TYPE: List[FeatureLookup]

Functions

using_spine(spine: pd.DataFrame, date_col: Optional[str] = None, offset: int = 0, length: Optional[int] = None, keep_cols: Optional[List[str]] = None)

Adds a spine DataFrame to the feature store serving pipeline.

PARAMETER DESCRIPTION
spine

The spine DataFrame to add to the pipeline.

TYPE: DataFrame

date_col

The name of the column containing the date to use for point-in-time joins. If not provided, point-in-time joins will not be performed.

TYPE: str DEFAULT: None

offset

Number of days to use as a reference point for the join. For example, offset=3 with how='past' means that feature dates equal to (and older than) three days before the application date will be joined. Defaults to 0.

TYPE: int DEFAULT: 0

length

When how is not 'point in time', limits the period of feature dates to join. Defaults to no limit.

TYPE: int DEFAULT: None

keep_cols

A list of column names to keep from the spine DataFrame. If not provided, no columns will be kept.

TYPE: List[str] DEFAULT: None

Source code in src/seeknal/featurestore/feature_group.py
def using_spine(
    self,
    spine: pd.DataFrame,
    date_col: Optional[str] = None,
    offset: int = 0,
    length: Optional[int] = None,
    keep_cols: Optional[List[str]] = None,
):
    """
    Adds a spine DataFrame to the feature store serving pipeline.

    Args:
        spine (pd.DataFrame): The spine DataFrame to add to the pipeline.
        date_col (str, optional): The name of the column containing the date to use for point-in-time joins.
            If not provided, point-in-time joins will not be performed.
        offset (int, optional): Number of days to use as a reference point for the join.
            For example, offset=3 with how='past' means that feature dates equal to (and
            older than) three days before the application date will be joined. Defaults to 0.
        length (int, optional): When how is not 'point in time', limits the period of feature dates to join. Defaults to no limit.
        keep_cols (List[str], optional): A list of column names to keep from the spine DataFrame.
            If not provided, no columns will be kept.

    """
    spine_columns = list(spine.keys())
    for i in self.lookup_key.join_keys:
        if i not in spine_columns:
            raise ValueError("Spine DataFrame must contain all join keys")
    spine_df = self.spark.createDataFrame(spine)

    # Validate join_keys before using them in SQL expressions
    for join_key in self.lookup_key.join_keys:
        validate_column_name(join_key)

    if date_col is not None:
        point_in_time = PointInTime(
            spine=spine_df,
            offset=offset,
            length=length,
            feature_date_format="yyyy-MM-dd HH:mm:SS",
            app_date=date_col,
            app_date_format="yyyy-MM-dd",
            col_id=self.lookup_key.join_keys[0],
            spine_col_id=self.lookup_key.join_keys[0],
            keep_cols=keep_cols,
        )
        self.flow.add_stage(transformer=point_in_time)
    else:
        selector = ["a.*"]
        if keep_cols is not None:
            selector += keep_cols
        tables = [
            TableJoinDef(
                table=spine_df,
                joinType=JoinType.INNER,
                alias="b",
                joinExpression="a.{} = b.{}".format(
                    self.lookup_key.join_keys[0], self.lookup_key.join_keys[0]
                ),
            )
        ]
        join = JoinTablesByExpr(tables=tables, select_stm=",".join(selector))
        self.flow.add_stage(transformer=join)
    return self
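The point-in-time join used when date_col is given can be sketched in plain Python (simplified: a single join key, date objects, no length limit — the real implementation runs in Spark via the PointInTime transformer):

```python
from datetime import date, timedelta

def point_in_time_join(spine, features, key, offset_days=0):
    """For each spine row, pick the latest feature row for the same key
    whose event date is on or before (app_date - offset_days)."""
    out = []
    for s in spine:
        cutoff = s["app_date"] - timedelta(days=offset_days)
        candidates = [f for f in features
                      if f[key] == s[key] and f["event_date"] <= cutoff]
        best = max(candidates, key=lambda f: f["event_date"], default=None)
        out.append({**s, "features": best})
    return out

spine = [{"user_id": "u1", "app_date": date(2024, 3, 7)}]
features = [
    {"user_id": "u1", "event_date": date(2024, 3, 1), "txn_count": 2},
    {"user_id": "u1", "event_date": date(2024, 3, 6), "txn_count": 5},
]
joined = point_in_time_join(spine, features, key="user_id", offset_days=0)
print(joined[0]["features"]["txn_count"])  # -> 5 (latest row on or before app_date)
```

With offset_days=3 the cutoff moves back to 2024-03-04, so only the 2024-03-01 row qualifies — this is what prevents feature leakage from the future.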

to_dataframe(feature_start_time: Optional[datetime] = None, feature_end_time: Optional[datetime] = None) -> DataFrame

Returns a pandas DataFrame containing the transformed feature data within the specified time range.

PARAMETER DESCRIPTION
feature_start_time

The start time of the time range to filter the feature data.

TYPE: Optional[datetime] DEFAULT: None

feature_end_time

The end time of the time range to filter the feature data.

TYPE: Optional[datetime] DEFAULT: None

Source code in src/seeknal/featurestore/feature_group.py
def to_dataframe(
    self,
    feature_start_time: Optional[datetime] = None,
    feature_end_time: Optional[datetime] = None,
) -> DataFrame:
    """
    Returns a pandas DataFrame containing the transformed feature data within the specified time range.

    Args:
        feature_start_time (Optional[datetime]): The start time of the time range to filter the feature data.
        feature_end_time (Optional[datetime]): The end time of the time range to filter the feature data.
    """
    df = self.flow.transform(spark=self.spark)
    return self._filter_by_start_end_time(df, feature_start_time, feature_end_time)

OnlineFeatures

The OnlineFeatures class serves features in real-time for model inference with low-latency access patterns.
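At its core, an online store is a low-latency key-value lookup keyed by the entity's join keys, with an optional TTL. A toy in-memory sketch of that access pattern (illustrative only, not the actual backend):

```python
import time
from typing import Optional

class ToyOnlineStore:
    """Dict-backed key-value store with per-write TTL, keyed by join keys."""

    def __init__(self):
        self._rows = {}

    def put(self, key: tuple, features: dict, ttl_seconds: Optional[float] = None):
        expiry = time.monotonic() + ttl_seconds if ttl_seconds else None
        self._rows[key] = (features, expiry)

    def get(self, key: tuple):
        row = self._rows.get(key)
        if row is None:
            return None
        features, expiry = row
        if expiry is not None and time.monotonic() > expiry:
            del self._rows[key]  # lazily expire stale rows on read
            return None
        return features

store = ToyOnlineStore()
store.put(("u1",), {"txn_count": 5}, ttl_seconds=60)
print(store.get(("u1",)))  # -> {'txn_count': 5}
print(store.get(("u2",)))  # -> None (unknown entity)
```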


GetLatestTimeStrategy

The GetLatestTimeStrategy class defines strategies for retrieving the latest feature values based on timestamp.

Bases: Enum


OfflineStore

The OfflineStore class configures offline storage backends for batch feature storage. Supports Hive tables, Delta files, and Iceberg tables.

Configuration for offline feature store storage.

ATTRIBUTE DESCRIPTION
value

The storage configuration (path for FILE, database for HIVE_TABLE). A security warning will be logged if a file path is in an insecure location (e.g., /tmp).

TYPE: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput, IcebergStoreOutput]]

kind

The storage type (FILE, HIVE_TABLE, or ICEBERG).

TYPE: OfflineStoreEnum

name

Optional name for this offline store configuration.

TYPE: Optional[str]

Functions

get_or_create()

Retrieve an existing offline store or create a new one.

If an offline store with the specified name exists, it is retrieved and its configuration is loaded. Otherwise, a new offline store is created with the current configuration.

RETURNS DESCRIPTION
OfflineStore

The current instance with id populated.

Note

The store name defaults to "default" if not specified.

Source code in src/seeknal/featurestore/featurestore.py
def get_or_create(self):
    """Retrieve an existing offline store or create a new one.

    If an offline store with the specified name exists, it is retrieved and
    its configuration is loaded. Otherwise, a new offline store is created
    with the current configuration.

    Returns:
        OfflineStore: The current instance with id populated.

    Note:
        The store name defaults to "default" if not specified.
    """
    if self.name is None:
        name = "default"
    else:
        name = self.name
    offline_store = FeatureGroupRequest.get_offline_store_by_name(self.kind, name)
    if offline_store is None:
        offline_store = FeatureGroupRequest.save_offline_store(
            self.kind, self.value, name
        )
        self.id = offline_store.id
    else:
        self.id = offline_store.id
        self.kind = offline_store.kind
        value_params = json.loads(offline_store.params)
        if self.kind == OfflineStoreEnum.HIVE_TABLE:
            if self.value is not None:
                self.value = FeatureStoreHiveTableOutput(**value_params)
        elif self.kind == OfflineStoreEnum.FILE:
            if self.value is not None:
                self.value = FeatureStoreFileOutput(
                    path=value_params["path"],
                    kind=FileKindEnum(value_params["kind"]),
                )
        elif self.kind == OfflineStoreEnum.ICEBERG:
            if self.value is not None:
                self.value = IcebergStoreOutput(**value_params)
        else:
            self.value = value_params
    return self

list() staticmethod

List all registered offline stores.

Displays a formatted table of all offline stores with their names, kinds, and configuration values. If no stores are found, displays an appropriate message.

RETURNS DESCRIPTION
None

Output is printed to the console.

Source code in src/seeknal/featurestore/featurestore.py
@staticmethod
def list():
    """List all registered offline stores.

    Displays a formatted table of all offline stores with their names,
    kinds, and configuration values. If no stores are found, displays
    an appropriate message.

    Returns:
        None: Output is printed to the console.
    """
    offline_stores = FeatureGroupRequest.get_offline_stores()
    if offline_stores:
        offline_stores = [
            {
                "name": offline_store.name,
                "kind": offline_store.kind,
                "value": offline_store.params,
            }
            for offline_store in offline_stores
        ]
        typer.echo(tabulate(offline_stores, headers="keys", tablefmt="github"))
    else:
        typer.echo("No offline stores found.")

delete(spark: Optional[SparkSession] = None, *args, **kwargs) -> bool

Delete storage for a feature group from the offline store.

For FILE type: Deletes the directory containing the delta table. For HIVE_TABLE type: Drops the Hive table using Spark SQL.

PARAMETER DESCRIPTION
spark

SparkSession instance (required for HIVE_TABLE, optional for FILE).

TYPE: Optional[SparkSession] DEFAULT: None

**kwargs

Must include 'project' and 'entity' to construct the table name.

DEFAULT: {}

RETURNS DESCRIPTION
bool

True if deletion was successful or resource didn't exist.

TYPE: bool

RAISES DESCRIPTION
ValueError

If required kwargs (project, entity) are missing.

Source code in src/seeknal/featurestore/featurestore.py
def delete(
    self,
    spark: Optional[SparkSession] = None,
    *args,
    **kwargs,
) -> bool:
    """Delete storage for a feature group from the offline store.

    For FILE type: Deletes the directory containing the delta table.
    For HIVE_TABLE type: Drops the Hive table using Spark SQL.

    Args:
        spark: SparkSession instance (required for HIVE_TABLE, optional for FILE).
        **kwargs: Must include 'project' and 'entity' to construct the table name.

    Returns:
        bool: True if deletion was successful or resource didn't exist.

    Raises:
        ValueError: If required kwargs (project, entity) are missing.
    """
    project = kwargs.get("project")
    entity = kwargs.get("entity")

    if project is None or entity is None:
        raise ValueError("Both 'project' and 'entity' are required for deletion")

    # Validate project and entity parameters
    validate_table_name(project)
    validate_table_name(entity)
    table_name = "fg_{}__{}".format(project, entity)

    match self.kind:
        case OfflineStoreEnum.FILE:
            if self.value is None:
                base_path = CONFIG_BASE_URL
                path = os.path.join(base_path, "data", table_name)
            else:
                if isinstance(self.value, FeatureStoreFileOutput):
                    base_path = self.value.path
                elif isinstance(self.value, dict):
                    base_path = self.value.get("path", CONFIG_BASE_URL)
                else:
                    base_path = self.value
                path = os.path.join(base_path, table_name)

            if os.path.exists(path):
                shutil.rmtree(path)
                logger.info(f"Deleted offline store files at: {path}")
            else:
                logger.info(f"Offline store path does not exist: {path}")
            return True

        case OfflineStoreEnum.HIVE_TABLE:
            if spark is None:
                spark = SparkSession.builder.getOrCreate()

            if self.value is None:
                database = "seeknal"
            elif isinstance(self.value, FeatureStoreHiveTableOutput):
                database = self.value.database
            elif isinstance(self.value, dict):
                database = self.value.get("database", "seeknal")
            else:
                database = "seeknal"

            # Validate database name before use in SQL
            validate_database_name(database)

            full_table_name = "{}.{}".format(database, table_name)
            if spark.catalog.tableExists(full_table_name):
                spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
                logger.info(f"Dropped Hive table: {full_table_name}")
            else:
                logger.info(f"Hive table does not exist: {full_table_name}")
            return True

        case OfflineStoreEnum.ICEBERG:
            # Iceberg table deletion
            if self.value is None:
                logger.warning("Iceberg configuration required for deletion")
                return False

            # Get configuration
            if isinstance(self.value, IcebergStoreOutput):
                iceberg_config = self.value
            elif isinstance(self.value, dict):
                iceberg_config = IcebergStoreOutput(**self.value)
            else:
                logger.warning(f"Invalid Iceberg configuration type: {type(self.value)}")
                return False

            # Import Iceberg operations
            from seeknal.workflow.materialization.operations import DuckDBIcebergExtension
            from seeknal.workflow.materialization.profile_loader import ProfileLoader
            from seeknal.workflow.materialization.config import MaterializationConfig, ConfigurationError

            # Load profile for catalog configuration
            profile_loader = ProfileLoader()
            try:
                profile_config = profile_loader.load_profile()
            except (ConfigurationError, Exception) as e:
                logger.warning(f"Could not load materialization profile: {e}. Using defaults.")
                profile_config = MaterializationConfig()

            # Get catalog configuration
            catalog_uri = profile_config.catalog.uri if profile_config.catalog.uri else ""
            warehouse_path = (
                iceberg_config.warehouse or
                profile_config.catalog.warehouse
            )
            bearer_token = profile_config.catalog.bearer_token

            # Validate required configuration
            if not catalog_uri or not warehouse_path:
                logger.warning("Catalog URI and warehouse path required for Iceberg deletion")
                return False

            # Validate table name
            validate_table_name(iceberg_config.table)
            validate_table_name(iceberg_config.namespace)

            # Create DuckDB connection and drop table
            import duckdb
            con = duckdb.connect(":memory:")

            try:
                # Load Iceberg extension
                DuckDBIcebergExtension.load_extension(con)

                # Setup REST catalog
                catalog_name = "seeknal_catalog"
                DuckDBIcebergExtension.create_rest_catalog(
                    con=con,
                    catalog_name=catalog_name,
                    uri=catalog_uri,
                    warehouse_path=warehouse_path,
                    bearer_token=bearer_token,
                )

                # Create table reference and drop
                table_ref = f"{catalog_name}.{iceberg_config.namespace}.{iceberg_config.table}"
                con.execute(f"DROP TABLE IF EXISTS {table_ref}")
                logger.info(f"Dropped Iceberg table: {table_ref}")
                return True

            except Exception as e:
                logger.error(f"Failed to drop Iceberg table: {e}")
                return False
            finally:
                con.close()

        case _:
            logger.warning(f"Unknown offline store kind: {self.kind}")
            return False
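The `project` and `entity` kwargs determine the physical table name used by every branch above. A minimal sketch of that mapping (`build_table_name` is a hypothetical helper mirroring the logic in `delete()`, not part of the seeknal API):

```python
# Hypothetical helper mirroring the kwargs validation and naming convention
# in OfflineStore.delete(); not part of the real API.
def build_table_name(**kwargs) -> str:
    project = kwargs.get("project")
    entity = kwargs.get("entity")
    if project is None or entity is None:
        raise ValueError("Both 'project' and 'entity' are required for deletion")
    # Matches the fg_<project>__<entity> convention used by the offline store
    return "fg_{}__{}".format(project, entity)
```

Passing `project="demo"` and `entity="user"` therefore targets the table `fg_demo__user`.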

OnlineStore

The OnlineStore class configures online storage backends for real-time feature serving.

Configuration for online feature store.

ATTRIBUTE DESCRIPTION
value

The storage configuration (file path or hive table). A security warning will be logged if a file path is in an insecure location (e.g., /tmp).

TYPE: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput]]

kind

Type of online store (FILE or HIVE_TABLE).

TYPE: OnlineStoreEnum

name

Optional name for the store.

TYPE: Optional[str]

Functions

delete(*args, **kwargs)

Delete feature data from the online store.

Removes the directory containing the feature data for the specified project and feature name.

PARAMETER DESCRIPTION
*args

Additional positional arguments (unused).

DEFAULT: ()

**kwargs

Keyword arguments including:
  - project (str): Project name for file naming.
  - name (str): Feature name for file naming.

DEFAULT: {}

RETURNS DESCRIPTION
bool

True if the deletion was successful or if the path did not exist.

Source code in src/seeknal/featurestore/featurestore.py
def delete(self, *args, **kwargs):
    """Delete feature data from the online store.

    Removes the directory containing the feature data for the specified
    project and feature name.

    Args:
        *args: Additional positional arguments (unused).
        **kwargs: Keyword arguments including:
            - project (str): Project name for file naming.
            - name (str): Feature name for file naming.

    Returns:
        bool: True if the deletion was successful or if the path
            did not exist.
    """
    match self.kind:
        case OnlineStoreEnum.FILE:
            name = kwargs.get("name")
            file_name_complete = "fs_{}__{}".format(kwargs["project"], name)

            if self.value == "null" or self.value is None:
                base_path = CONFIG_BASE_URL
                path = os.path.join(base_path, "data", file_name_complete)
            else:
                base_path = self.value["path"]
                path = os.path.join(base_path, file_name_complete)
            if os.path.exists(path):
                shutil.rmtree(path)
            return True

Feature

The Feature class represents an individual feature definition with its name, offline and online data types, and optional metadata such as a description and timestamps.

Bases: BaseModel

Define a Feature.

ATTRIBUTE DESCRIPTION
name

Feature name.

TYPE: str

feature_id

Feature ID (assigned by seeknal).

TYPE: Optional[str]

description

Feature description.

TYPE: Optional[str]

data_type

Data type for the feature.

TYPE: Optional[str]

online_data_type

Data type when stored in online-store.

TYPE: Optional[str]

created_at

Creation timestamp.

TYPE: Optional[str]

updated_at

Last update timestamp.

TYPE: Optional[str]

Functions

to_dict()

Convert the feature definition to a dictionary representation.

Creates a dictionary suitable for API requests with metadata and data type information.

RETURNS DESCRIPTION
dict

Dictionary with structure:
  - metadata: dict containing 'name' and optionally 'description'
  - datatype: The feature's data type
  - onlineDatatype: The feature's online store data type

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the feature definition to a dictionary representation.

    Creates a dictionary suitable for API requests with metadata and
    data type information.

    Returns:
        dict: Dictionary with structure:
            - metadata: dict containing 'name' and optionally 'description'
            - datatype: The feature's data type
            - onlineDatatype: The feature's online store data type
    """
    _dict = {
        "metadata": {"name": self.name},
        "datatype": self.data_type,
        "onlineDatatype": self.online_data_type,
    }
    if self.description is not None:
        _dict["metadata"]["description"] = self.description
    return _dict
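The serialization above can be sketched with a minimal stand-in (the real Feature is a pydantic model with additional fields such as `feature_id` and timestamps):

```python
from dataclasses import dataclass
from typing import Optional

# Minimal stand-in mirroring the to_dict() logic shown above;
# not the real seeknal Feature class.
@dataclass
class FeatureSketch:
    name: str
    description: Optional[str] = None
    data_type: Optional[str] = None
    online_data_type: Optional[str] = None

    def to_dict(self) -> dict:
        d = {
            "metadata": {"name": self.name},
            "datatype": self.data_type,
            "onlineDatatype": self.online_data_type,
        }
        # description is only included when set
        if self.description is not None:
            d["metadata"]["description"] = self.description
        return d

payload = FeatureSketch(
    name="total_spend", data_type="double", online_data_type="double"
).to_dict()
```

Note that a missing description is omitted from `metadata` entirely rather than serialized as null.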

FillNull

The FillNull class defines how null values should be handled for features.

Bases: BaseModel

Configuration for filling null values in columns.

ATTRIBUTE DESCRIPTION
value

Value to use for filling nulls.

TYPE: str

dataType

Data type for the value (e.g., 'double', 'string').

TYPE: str

columns

Optional list of columns to fill. If None, applies to all columns.

TYPE: Optional[List[str]]

Functions

to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary containing non-None values from the model.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary containing non-None values from the model.
    """
    return {k: v for k, v in self.model_dump().items() if v is not None}
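The effect of the None-filtering comprehension can be shown with a plain dict standing in for the pydantic `model_dump()` output:

```python
# Sketch of FillNull.to_dict(): drop every field whose value is None.
# A plain dict stands in for the pydantic model_dump() result.
def fill_null_to_dict(model_fields: dict) -> dict:
    return {k: v for k, v in model_fields.items() if v is not None}

result = fill_null_to_dict({"value": "0.0", "dataType": "double", "columns": None})
```

When `columns` is unset, it is simply absent from the resulting dictionary.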

Storage Enums

Enumerations for storage backend configuration.

OfflineStoreEnum

Bases: str, Enum

Enumeration of supported offline storage types.

ATTRIBUTE DESCRIPTION
HIVE_TABLE

Store features as a Hive table in a database.

FILE

Store features as files on the filesystem (e.g., Delta format).

ICEBERG

Store features in Apache Iceberg tables with ACID transactions, time travel, and cloud storage compatibility.

OnlineStoreEnum

Bases: str, Enum

Enumeration of supported online storage types.

ATTRIBUTE DESCRIPTION
HIVE_TABLE

Store features as a Hive table for online serving.

FILE

Store features as Parquet files for online serving.

FileKindEnum

Bases: str, Enum

Enumeration of supported file formats for feature storage.

ATTRIBUTE DESCRIPTION
DELTA

Delta Lake format, providing ACID transactions and versioning.


Output Configurations

Classes for configuring feature store output destinations.

FeatureStoreHiveTableOutput

Configuration for Hive table-based feature store output.

ATTRIBUTE DESCRIPTION
database

The Hive database name where features will be stored.

TYPE: str

Functions

to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary with 'database' key.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary with 'database' key.
    """
    return {"database": self.database}

FeatureStoreFileOutput

Configuration for file-based feature store output.

ATTRIBUTE DESCRIPTION
path

The filesystem path for storing feature data. A security warning will be logged if this path is in an insecure location (e.g., /tmp).

TYPE: str

kind

The file format to use (default: DELTA).

TYPE: FileKindEnum

Functions

to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary with 'path' and 'kind' keys.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary with 'path' and 'kind' keys.
    """
    return {"path": self.path, "kind": self.kind.value}

Module Reference

Complete module reference for the featurestore package.

Feature store module for managing and serving ML features.

This module provides the core feature store functionality for Seeknal, enabling storage, management, and serving of machine learning features for both offline (batch) and online (real-time) use cases.

Key Components
  • FeatureGroup: Define and manage groups of features with customizable materialization options for offline and online storage.
  • FeatureLookup: Specify feature lookups from feature groups with optional feature selection and exclusion.
  • HistoricalFeatures: Retrieve historical feature data with point-in-time correctness for training ML models.
  • OnlineFeatures: Serve features in real-time for model inference with low-latency access patterns.
  • OfflineStore: Configure offline storage backends (Hive tables, Delta files, or Iceberg tables).
  • OnlineStore: Configure online storage backends for real-time serving.
Storage Backends
  • Hive Tables: Managed table storage with SQL access.
  • Delta Files: File-based storage with ACID transactions and time travel.
  • Iceberg Tables: Open table format with ACID transactions, time travel, and cloud storage compatibility.
Typical Usage
from seeknal.featurestore import FeatureGroup, FeatureLookup
from seeknal.featurestore import HistoricalFeatures, OnlineFeatures
from seeknal.featurestore import Materialization  # used below; import path may vary

# Define a feature group
feature_group = FeatureGroup(
    name="user_features",
    entity=user_entity,
    materialization=Materialization(
        offline=True,
        online=True,
    ),
)

# Create and materialize features
feature_group.set_flow(my_flow).set_features().get_or_create()
feature_group.write()

# Retrieve historical features for training
historical = HistoricalFeatures(
    lookups=[FeatureLookup(source=feature_group)]
)
training_df = historical.using_spine(spine_df).to_dataframe()

# Serve features online for inference
online = OnlineFeatures(
    lookup_key=user_entity,
    lookups=[FeatureLookup(source=feature_group)],
)
features = online.get_features(keys=[{"user_id": "123"}])
See Also

seeknal.entity: Entity definitions for feature store join keys.
seeknal.flow: Data flow definitions for feature transformations.
seeknal.tasks: Task definitions for data processing pipelines.

Classes

Modules

duckdbengine

DuckDB-based Feature Store implementation.

This module provides a feature store implementation using DuckDB instead of Spark, enabling in-process feature engineering and serving without distributed infrastructure.

Classes

OfflineStoreDuckDB(value: Optional[Union[str, FeatureStoreFileOutput]] = None, kind: OfflineStoreEnum = OfflineStoreEnum.PARQUET, name: Optional[str] = None, connection: Optional[duckdb.DuckDBPyConnection] = None) dataclass

DuckDB-based offline feature store.

Stores features in DuckDB tables or Parquet files, with ACID-like guarantees provided through atomic file operations and metadata tracking.

Functions
write(df: pd.DataFrame, project: str, entity: str, name: str, mode: str = 'overwrite', start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, **kwargs) -> None

Write features to offline store.

PARAMETER DESCRIPTION
df

Features to write

TYPE: DataFrame

project

Project name

TYPE: str

entity

Entity name

TYPE: str

name

Feature group name

TYPE: str

mode

Write mode - 'overwrite', 'append', or 'merge'

TYPE: str DEFAULT: 'overwrite'

start_date

Start date for this batch

TYPE: Optional[datetime] DEFAULT: None

end_date

End date for this batch

TYPE: Optional[datetime] DEFAULT: None

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def write(
    self,
    df: pd.DataFrame,
    project: str,
    entity: str,
    name: str,
    mode: str = "overwrite",
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    **kwargs
) -> None:
    """Write features to offline store.

    Args:
        df: Features to write
        project: Project name
        entity: Entity name
        name: Feature group name
        mode: Write mode - 'overwrite', 'append', or 'merge'
        start_date: Start date for this batch
        end_date: End date for this batch
    """
    if df is None or len(df) == 0:
        logger.warning("Empty DataFrame, skipping write")
        return

    table_dir = self._get_table_path(project, entity, name)
    metadata = self._load_metadata(table_dir)

    # Generate version ID
    version = datetime.now().strftime("%Y%m%d_%H%M%S")

    if mode == "overwrite":
        self._write_overwrite(df, table_dir, metadata, version, start_date, end_date)
    elif mode == "append":
        self._write_append(df, table_dir, metadata, version, start_date, end_date)
    elif mode == "merge":
        self._write_merge(df, table_dir, metadata, version, start_date, end_date)
    else:
        raise ValueError(f"Unknown mode: {mode}")

    # Update metadata
    metadata["versions"].append({
        "version": version,
        "start_date": str(start_date) if start_date else None,
        "end_date": str(end_date) if end_date else None,
        "mode": mode,
        "rows": len(df),
        "timestamp": datetime.now().isoformat()
    })

    # Track watermarks
    if start_date:
        watermark = str(start_date)
        if watermark not in metadata["watermarks"]:
            metadata["watermarks"].append(watermark)
    if end_date and end_date != "none":
        watermark = str(end_date)
        if watermark not in metadata["watermarks"]:
            metadata["watermarks"].append(watermark)

    metadata["watermarks"] = sorted(set(metadata["watermarks"]))

    # Save schema
    metadata["schema"] = {col: str(dtype) for col, dtype in df.dtypes.items()}

    self._save_metadata(table_dir, metadata)
    logger.info(f"Wrote {len(df)} rows to {table_dir} (mode={mode}, version={version})")
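The per-write bookkeeping above (version records plus a sorted, de-duplicated watermark list) can be sketched in isolation; `record_write` is a hypothetical helper following the field names in the source:

```python
from datetime import datetime

# Hypothetical helper mirroring the metadata bookkeeping in write():
# append a version record, then keep watermarks sorted and de-duplicated.
def record_write(metadata: dict, version: str, mode: str, rows: int,
                 start_date=None, end_date=None) -> dict:
    metadata.setdefault("versions", []).append({
        "version": version,
        "start_date": str(start_date) if start_date else None,
        "end_date": str(end_date) if end_date else None,
        "mode": mode,
        "rows": rows,
        "timestamp": datetime.now().isoformat(),
    })
    watermarks = metadata.setdefault("watermarks", [])
    for d in (start_date, end_date):
        if d and str(d) not in watermarks:
            watermarks.append(str(d))
    metadata["watermarks"] = sorted(set(watermarks))
    return metadata

meta = record_write({}, version="20240101_000000", mode="append", rows=10,
                    start_date="2024-01-01", end_date="2024-01-02")
```

Each write appends one version entry, so the metadata file doubles as a simple audit log of materialization runs.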
read(project: str, entity: str, name: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, **kwargs) -> pd.DataFrame

Read features from offline store.

PARAMETER DESCRIPTION
project

Project name

TYPE: str

entity

Entity name

TYPE: str

name

Feature group name

TYPE: str

start_date

Filter start date

TYPE: Optional[datetime] DEFAULT: None

end_date

Filter end date

TYPE: Optional[datetime] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with features

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def read(
    self,
    project: str,
    entity: str,
    name: str,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    **kwargs
) -> pd.DataFrame:
    """Read features from offline store.

    Args:
        project: Project name
        entity: Entity name
        name: Feature group name
        start_date: Filter start date
        end_date: Filter end date

    Returns:
        DataFrame with features
    """
    table_dir = self._get_table_path(project, entity, name)
    parquet_path = os.path.join(table_dir, "data.parquet")

    if not os.path.exists(parquet_path):
        raise FileNotFoundError(f"Feature group not found: {name}")

    # Read parquet file
    df = pd.read_parquet(parquet_path)

    # Apply date filters if provided
    if start_date and 'event_time' in df.columns:
        df = df[df['event_time'] >= start_date]

    if end_date and end_date != "none" and 'event_time' in df.columns:
        df = df[df['event_time'] <= end_date]

    logger.info(f"Read {len(df)} rows from {table_dir}")
    return df
get_watermarks(project: str, entity: str, name: str) -> list

Get watermarks for a feature group.

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def get_watermarks(self, project: str, entity: str, name: str) -> list:
    """Get watermarks for a feature group."""
    table_dir = self._get_table_path(project, entity, name)
    metadata = self._load_metadata(table_dir)
    return metadata.get("watermarks", [])
delete(project: str, entity: str, name: str) -> bool

Delete a feature group from the offline store.

Removes all data files and metadata associated with the feature group.

PARAMETER DESCRIPTION
project

Project name

TYPE: str

entity

Entity name

TYPE: str

name

Feature group name

TYPE: str

RETURNS DESCRIPTION
bool

True if deletion was successful, False otherwise

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def delete(self, project: str, entity: str, name: str) -> bool:
    """Delete a feature group from the offline store.

    Removes all data files and metadata associated with the feature group.

    Args:
        project: Project name
        entity: Entity name
        name: Feature group name

    Returns:
        True if deletion was successful, False otherwise
    """
    if isinstance(self.value, dict):
        base_path = self.value.get("path", "/tmp/feature_store")
    else:
        base_path = "/tmp/feature_store"

    # Build path to feature group directory
    table_dir = os.path.join(base_path, project, entity, name)

    if os.path.exists(table_dir):
        shutil.rmtree(table_dir)
        logger.info(f"Deleted offline feature group at {table_dir}")
        return True
    else:
        logger.warning(f"Offline feature group directory not found: {table_dir}")
        return False
OnlineStoreDuckDB(value: Optional[Union[str, FeatureStoreFileOutput]] = None, kind: OnlineStoreEnum = OnlineStoreEnum.DUCKDB_TABLE, name: Optional[str] = None, connection: Optional[duckdb.DuckDBPyConnection] = None) dataclass

DuckDB-based online feature store.

Stores features for low-latency serving using DuckDB tables.

Functions
write(df: pd.DataFrame, table_name: str, **kwargs) -> None

Write features to online store.

PARAMETER DESCRIPTION
df

Features to write

TYPE: DataFrame

table_name

Name of the online table

TYPE: str

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def write(
    self,
    df: pd.DataFrame,
    table_name: str,
    **kwargs
) -> None:
    """Write features to online store.

    Args:
        df: Features to write
        table_name: Name of the online table
    """
    conn = self._get_connection()

    # Drop table if exists and recreate
    conn.execute(f"DROP TABLE IF EXISTS {table_name}")
    conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM df")

    logger.info(f"Wrote {len(df)} rows to online table {table_name}")
read(table_name: str, keys: Optional[list] = None, **kwargs) -> pd.DataFrame

Read features from online store.

PARAMETER DESCRIPTION
table_name

Name of the online table

TYPE: str

keys

Filter by specific keys (list of dicts with key-value pairs)

TYPE: Optional[list] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with features

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def read(
    self,
    table_name: str,
    keys: Optional[list] = None,
    **kwargs
) -> pd.DataFrame:
    """Read features from online store.

    Args:
        table_name: Name of the online table
        keys: Filter by specific keys (list of dicts with key-value pairs)

    Returns:
        DataFrame with features
    """
    conn = self._get_connection()

    if keys:
        # Build WHERE clause from keys
        conditions = []
        for key_dict in keys:
            key_conditions = [f"{k} = '{v}'" for k, v in key_dict.items()]
            conditions.append("(" + " AND ".join(key_conditions) + ")")

        where_clause = " OR ".join(conditions)
        query = f"SELECT * FROM {table_name} WHERE {where_clause}"
    else:
        query = f"SELECT * FROM {table_name}"

    df = conn.execute(query).df()
    logger.info(f"Read {len(df)} rows from online table {table_name}")
    return df
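The key-filtering branch builds its WHERE clause by string interpolation, which can be sketched on its own. Note this assumes trusted key values, since they are inserted directly into SQL; parameterized queries would be the safer choice for untrusted input:

```python
# Sketch of the WHERE-clause construction in read(): each key dict becomes
# an AND-joined group, and groups are OR-joined together.
def build_where(keys: list) -> str:
    conditions = []
    for key_dict in keys:
        key_conditions = [f"{k} = '{v}'" for k, v in key_dict.items()]
        conditions.append("(" + " AND ".join(key_conditions) + ")")
    return " OR ".join(conditions)

clause = build_where([{"user_id": "123"}, {"user_id": "456", "region": "eu"}])
```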
delete(name: str, project: str, entity: Optional[str] = None) -> bool

Delete an online table from the store.

Removes the DuckDB table and any associated parquet files.

PARAMETER DESCRIPTION
name

Table name

TYPE: str

project

Project name

TYPE: str

entity

Entity name (optional, used for file-based storage path)

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
bool

True if deletion was successful, False otherwise

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def delete(self, name: str, project: str, entity: Optional[str] = None) -> bool:
    """Delete an online table from the store.

    Removes the DuckDB table and any associated parquet files.

    Args:
        name: Table name
        project: Project name
        entity: Entity name (optional, used for file-based storage path)

    Returns:
        True if deletion was successful, False otherwise
    """
    # Try to drop the DuckDB table if it exists
    try:
        conn = self._get_connection()
        conn.execute(f"DROP TABLE IF EXISTS {name}")
        logger.info(f"Dropped online table '{name}' from DuckDB")
    except Exception as e:
        logger.warning(f"Could not drop DuckDB table '{name}': {e}")

    # Also clean up any file-based storage
    if isinstance(self.value, dict):
        base_path = self.value.get("path", "/tmp/feature_store_online")
    else:
        base_path = "/tmp/feature_store_online"

    # Build path to table directory
    if entity:
        table_dir = os.path.join(base_path, project, entity, name)
    else:
        # Fallback pattern for online tables without entity
        table_dir = os.path.join(base_path, project, name)

    if os.path.exists(table_dir):
        shutil.rmtree(table_dir)
        logger.info(f"Deleted online table files at {table_dir}")
        return True

    logger.info(f"Online table '{name}' deleted successfully")
    return True
FeatureStoreFileOutput(path: str) dataclass

File-based feature store output configuration.

FeatureGroupDuckDB(name: str, entity: Entity, materialization: Materialization = Materialization(), dataframe: Optional[pd.DataFrame] = None, description: Optional[str] = None, features: Optional[List[str]] = None, project: str = 'default', offline_watermarks: List[str] = list(), online_watermarks: List[str] = list(), version: Optional[int] = None) dataclass

DuckDB-based Feature Group.

A feature group is a collection of related features computed from source data.

Functions
set_dataframe(dataframe: pd.DataFrame) -> FeatureGroupDuckDB

Set the source dataframe for this feature group.

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def set_dataframe(self, dataframe: pd.DataFrame) -> 'FeatureGroupDuckDB':
    """Set the source dataframe for this feature group."""
    self.dataframe = dataframe
    return self
set_features(features: Optional[List[str]] = None) -> FeatureGroupDuckDB

Set which features to include.

If features is None, all columns except entity keys and event_time are used.

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def set_features(self, features: Optional[List[str]] = None) -> 'FeatureGroupDuckDB':
    """Set which features to include.

    If features is None, all columns except entity keys and event_time are used.
    """
    if features is None and self.dataframe is not None:
        # Auto-detect features
        reserved_cols = self.entity.join_keys.copy()
        if self.materialization.event_time_col:
            reserved_cols.append(self.materialization.event_time_col)

        self.features = [col for col in self.dataframe.columns if col not in reserved_cols]
    else:
        self.features = features

    return self
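The auto-detection rule above is simple: every column that is not a join key or the event-time column is treated as a feature. A standalone sketch:

```python
# Sketch of the feature auto-detection in set_features(): columns minus
# entity join keys and the event-time column.
def detect_features(columns, join_keys, event_time_col=None):
    reserved = list(join_keys)
    if event_time_col:
        reserved.append(event_time_col)
    return [c for c in columns if c not in reserved]

features = detect_features(
    columns=["user_id", "event_time", "total_spend", "txn_count"],
    join_keys=["user_id"],
    event_time_col="event_time",
)
```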
write(feature_start_time: Optional[datetime] = None, feature_end_time: Optional[datetime] = None, mode: str = 'overwrite') -> None

Materialize features to offline/online stores.

PARAMETER DESCRIPTION
feature_start_time

Start time for this batch

TYPE: Optional[datetime] DEFAULT: None

feature_end_time

End time for this batch

TYPE: Optional[datetime] DEFAULT: None

mode

Write mode - 'overwrite', 'append', or 'merge'

TYPE: str DEFAULT: 'overwrite'

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def write(
    self,
    feature_start_time: Optional[datetime] = None,
    feature_end_time: Optional[datetime] = None,
    mode: str = "overwrite"
) -> None:
    """Materialize features to offline/online stores.

    Args:
        feature_start_time: Start time for this batch
        feature_end_time: End time for this batch
        mode: Write mode - 'overwrite', 'append', or 'merge'
    """
    if self.dataframe is None:
        raise ValueError("Dataframe not set. Use set_dataframe() first.")

    # Prepare DataFrame for write
    df = self.dataframe.copy()

    # Add 'name' column for tracking feature group
    df['name'] = self.name

    # Ensure event_time column exists
    if self.materialization.event_time_col and self.materialization.event_time_col in df.columns:
        # Rename to standard 'event_time' column
        if self.materialization.event_time_col != 'event_time':
            df = df.rename(columns={self.materialization.event_time_col: 'event_time'})

    # Write to offline store
    if self.materialization.offline:
        self.materialization.offline_store.write(
            df=df,
            project=self.project,
            entity=self.entity.name,
            name=self.name,
            mode=mode,
            start_date=feature_start_time,
            end_date=feature_end_time,
        )

        # Update watermarks
        watermarks = self.materialization.offline_store.get_watermarks(
            project=self.project,
            entity=self.entity.name,
            name=self.name
        )
        self.offline_watermarks = watermarks

    logger.info(f"Wrote feature group '{self.name}' with {len(df)} rows")
get_or_create() -> FeatureGroupDuckDB

Get or create this feature group (idempotent operation).

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def get_or_create(self) -> 'FeatureGroupDuckDB':
    """Get or create this feature group (idempotent operation)."""
    # For simplicity, just load watermarks if they exist
    try:
        watermarks = self.materialization.offline_store.get_watermarks(
            project=self.project,
            entity=self.entity.name,
            name=self.name
        )
        self.offline_watermarks = watermarks
    except FileNotFoundError:
        # Feature group doesn't exist yet
        pass

    return self
delete() -> bool

Delete this feature group.

Removes all data files from the offline store.

RETURNS DESCRIPTION
bool

True if deletion was successful

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def delete(self) -> bool:
    """Delete this feature group.

    Removes all data files from the offline store.

    Returns:
        True if deletion was successful
    """
    # Delete from offline store
    result = self.materialization.offline_store.delete(
        project=self.project,
        entity=self.entity.name,
        name=self.name
    )
    logger.info(f"Deleted feature group '{self.name}'")
    return result
HistoricalFeaturesDuckDB(lookups: List[FeatureLookup], fill_nulls: Optional[List[FillNull]] = None, spine: Optional[pd.DataFrame] = None, date_col: Optional[str] = None, keep_cols: Optional[List[str]] = None, latest_strategy: Optional[GetLatestTimeStrategy] = None) dataclass

Point-in-time historical feature retrieval using DuckDB.

Performs point-in-time correct joins to get features as they existed at specific points in time.

Functions
to_dataframe(feature_start_time: Optional[datetime] = None, feature_end_time: Optional[datetime] = None) -> pd.DataFrame

Retrieve historical features as a DataFrame.

PARAMETER DESCRIPTION
feature_start_time

Start time for features

TYPE: Optional[datetime] DEFAULT: None

feature_end_time

End time for features

TYPE: Optional[datetime] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with features

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def to_dataframe(
    self,
    feature_start_time: Optional[datetime] = None,
    feature_end_time: Optional[datetime] = None
) -> pd.DataFrame:
    """Retrieve historical features as a DataFrame.

    Args:
        feature_start_time: Start time for features
        feature_end_time: End time for features

    Returns:
        DataFrame with features
    """
    if not self.lookups:
        raise ValueError("No feature lookups specified")

    # Read all feature groups
    dfs = []
    for lookup in self.lookups:
        fg = lookup.source
        df = fg.materialization.offline_store.read(
            project=fg.project,
            entity=fg.entity.name,
            name=fg.name,
            start_date=feature_start_time,
            end_date=feature_end_time
        )
        dfs.append(df)

    # Merge all feature groups
    if len(dfs) == 1:
        result = dfs[0]
    else:
        # Join on entity keys + event_time
        result = dfs[0]
        for df in dfs[1:]:
            join_keys = self.lookups[0].source.entity.join_keys + ['event_time']
            result = result.merge(df, on=join_keys, how='outer')

    return result
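When several lookups are supplied, the frames are outer-joined on the entity join keys plus `event_time`, so a row survives if it appears in any feature group and missing features become `NaN`. A minimal pandas sketch of that merge (the `user_id`, `clicks`, and `spend` names are illustrative, not part of the API):

```python
import pandas as pd

# Two hypothetical feature-group frames sharing the entity key "user_id"
fg_a = pd.DataFrame({
    "user_id": [1, 2],
    "event_time": pd.to_datetime(["2024-01-01", "2024-01-01"]),
    "clicks": [10, 20],
})
fg_b = pd.DataFrame({
    "user_id": [1, 3],
    "event_time": pd.to_datetime(["2024-01-01", "2024-01-01"]),
    "spend": [5.0, 7.5],
})

# Outer join keeps rows present in either group; absent features become NaN
join_keys = ["user_id", "event_time"]
result = fg_a.merge(fg_b, on=join_keys, how="outer")
```

Here users 2 and 3 each appear in only one group, so the result has three rows with one `NaN` in each feature column.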
using_spine(spine: pd.DataFrame, date_col: str, keep_cols: Optional[List[str]] = None) -> HistoricalFeaturesDuckDB

Use a spine (entity-date pairs) for point-in-time feature retrieval.

PARAMETER DESCRIPTION
spine

DataFrame with entity keys and application dates

TYPE: DataFrame

date_col

Name of the date column in spine

TYPE: str

keep_cols

Columns from spine to keep in the result

TYPE: Optional[List[str]] DEFAULT: None

RETURNS DESCRIPTION
HistoricalFeaturesDuckDB

Self for chaining

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def using_spine(
    self,
    spine: pd.DataFrame,
    date_col: str,
    keep_cols: Optional[List[str]] = None
) -> 'HistoricalFeaturesDuckDB':
    """Use a spine (entity-date pairs) for point-in-time feature retrieval.

    Args:
        spine: DataFrame with entity keys and application dates
        date_col: Name of the date column in spine
        keep_cols: Columns from spine to keep in the result

    Returns:
        Self for chaining
    """
    self.spine = spine
    self.date_col = date_col
    self.keep_cols = keep_cols or []
    return self
using_latest(fetch_strategy: GetLatestTimeStrategy = GetLatestTimeStrategy.REQUIRE_ALL) -> HistoricalFeaturesDuckDB

Get the latest available features.

PARAMETER DESCRIPTION
fetch_strategy

Strategy for handling missing features

TYPE: GetLatestTimeStrategy DEFAULT: REQUIRE_ALL

RETURNS DESCRIPTION
HistoricalFeaturesDuckDB

Self for chaining

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def using_latest(
    self,
    fetch_strategy: GetLatestTimeStrategy = GetLatestTimeStrategy.REQUIRE_ALL
) -> 'HistoricalFeaturesDuckDB':
    """Get the latest available features.

    Args:
        fetch_strategy: Strategy for handling missing features

    Returns:
        Self for chaining
    """
    self.latest_strategy = fetch_strategy
    return self
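`using_latest` only records the strategy; the eventual selection amounts to keeping each entity's most recent row. That reduction can be sketched in plain pandas (column names are illustrative):

```python
import pandas as pd

df = pd.DataFrame({
    "user_id":    [1, 1, 2],
    "event_time": pd.to_datetime(["2024-01-01", "2024-02-01", "2024-01-15"]),
    "clicks":     [10, 12, 20],
})

# Keep the most recent row per entity key
latest = (
    df.sort_values("event_time")
      .drop_duplicates(subset=["user_id"], keep="last")
      .reset_index(drop=True)
)
```

User 1 has two rows, so only the 2024-02-01 observation survives.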
to_dataframe_with_spine() -> pd.DataFrame

Retrieve features using point-in-time join with spine.

For each row in the spine, gets features as they existed at or before the application date.

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def to_dataframe_with_spine(self) -> pd.DataFrame:
    """Retrieve features using point-in-time join with spine.

    For each row in the spine, gets features as they existed at or before
    the application date.
    """
    if self.spine is None:
        raise ValueError("Spine not set. Use using_spine() first.")

    conn = duckdb.connect()

    # Convert spine date column to datetime
    spine = self.spine.copy()
    if self.date_col in spine.columns:
        spine[self.date_col] = pd.to_datetime(spine[self.date_col])

    # Register spine
    conn.register("spine", spine)

    # Process each feature group
    result_dfs = []
    for lookup in self.lookups:
        fg = lookup.source

        # Read feature group
        fg_df = fg.materialization.offline_store.read(
            project=fg.project,
            entity=fg.entity.name,
            name=fg.name
        )

        # Ensure event_time is datetime
        if 'event_time' in fg_df.columns:
            fg_df['event_time'] = pd.to_datetime(fg_df['event_time'])

        # Register feature group
        conn.register("features", fg_df)

        # Build point-in-time join query
        entity_keys = fg.entity.join_keys
        join_conditions = " AND ".join([f"spine.{k} = features.{k}" for k in entity_keys])

        # Point-in-time join: get features where event_time <= app_date
        query = f"""
        SELECT
            spine.*,
            features.* EXCLUDE ({', '.join(entity_keys)}),
            ROW_NUMBER() OVER (
                PARTITION BY {', '.join([f'spine.{k}' for k in entity_keys])}, spine.{self.date_col}
                ORDER BY features.event_time DESC
            ) as rn
        FROM spine
        LEFT JOIN features
            ON {join_conditions}
            AND features.event_time <= spine.{self.date_col}
        """

        # Get most recent features
        result_df = conn.execute(f"""
            SELECT * EXCLUDE (rn)
            FROM ({query})
            WHERE rn = 1 OR rn IS NULL
        """).df()

        result_dfs.append(result_df)

    # Merge all feature groups
    if len(result_dfs) == 1:
        result = result_dfs[0]
    else:
        result = result_dfs[0]
        merge_keys = self.lookups[0].source.entity.join_keys + [self.date_col]
        for df in result_dfs[1:]:
            result = result.merge(df, on=merge_keys, how='outer')

    # Keep only requested columns from spine
    if self.keep_cols:
        keep_cols_set = set(self.keep_cols + self.lookups[0].source.entity.join_keys + [self.date_col])
        # Also keep feature columns
        feature_cols = [col for col in result.columns if col not in spine.columns or col in keep_cols_set]
        result = result[[col for col in result.columns if col in keep_cols_set or col in feature_cols]]

    conn.close()
    return result
serve(name: Optional[str] = None, target: Optional[OnlineStoreDuckDB] = None, ttl: Optional[timedelta] = None) -> OnlineFeaturesDuckDB

Materialize features to online store for serving.

PARAMETER DESCRIPTION
name

Name for the online table

TYPE: Optional[str] DEFAULT: None

target

Target online store

TYPE: Optional[OnlineStoreDuckDB] DEFAULT: None

ttl

Time-to-live for features

TYPE: Optional[timedelta] DEFAULT: None

RETURNS DESCRIPTION
OnlineFeaturesDuckDB

OnlineFeaturesDuckDB instance for serving

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def serve(
    self,
    name: Optional[str] = None,
    target: Optional[OnlineStoreDuckDB] = None,
    ttl: Optional[timedelta] = None
) -> 'OnlineFeaturesDuckDB':
    """Materialize features to online store for serving.

    Args:
        name: Name for the online table
        target: Target online store
        ttl: Time-to-live for features

    Returns:
        OnlineFeaturesDuckDB instance for serving
    """
    # Get latest features
    df = self.to_dataframe()

    # Generate table name if not provided
    if name is None:
        name = hashlib.md5(str(datetime.now().timestamp()).encode()).hexdigest()

    # Use target store or default
    if target is None:
        target = OnlineStoreDuckDB()

    # Write to online store
    target.write(df=df, table_name=name)

    # Create OnlineFeaturesDuckDB instance
    return OnlineFeaturesDuckDB(
        name=name,
        lookup_key=self.lookups[0].source.entity,
        online_store=target
    )
OnlineFeaturesDuckDB(name: str, lookup_key: Entity, online_store: Optional[OnlineStoreDuckDB] = None, lookups: Optional[List[FeatureLookup]] = None, project: str = 'default', id: Optional[str] = None) dataclass

Online feature serving using DuckDB.

Provides low-latency feature lookups for real-time predictions. Also known as OnlineTableDuckDB in some contexts.

Functions
get_features(keys: List[Union[Entity, Dict[str, Any]]]) -> pd.DataFrame

Get features for specific entity keys.

PARAMETER DESCRIPTION
keys

List of entity instances or key dictionaries

TYPE: List[Union[Entity, Dict[str, Any]]]

RETURNS DESCRIPTION
DataFrame

DataFrame with features for the requested keys

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def get_features(self, keys: List[Union[Entity, Dict[str, Any]]]) -> pd.DataFrame:
    """Get features for specific entity keys.

    Args:
        keys: List of entity instances or key dictionaries

    Returns:
        DataFrame with features for the requested keys
    """
    # Convert Entity instances to dicts
    key_dicts = []
    for key in keys:
        if isinstance(key, Entity):
            # Extract key values from Entity
            key_dict = key.key_values if hasattr(key, 'key_values') else {}
        elif isinstance(key, dict):
            key_dict = key
        else:
            raise ValueError(f"Invalid key type: {type(key)}")

        key_dicts.append(key_dict)

    # Read from online store
    df = self.online_store.read(table_name=self.name, keys=key_dicts)

    return df
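Each requested key is normalized to a plain dict before the store lookup. Selecting the rows of a table that match a list of key dicts can be sketched with an inner merge (the table and key names here are illustrative):

```python
import pandas as pd

table = pd.DataFrame({
    "user_id": [1, 2, 3],
    "score":   [0.1, 0.5, 0.9],
})

# Normalized lookup keys, as produced from Entity instances or dicts
keys = [{"user_id": 1}, {"user_id": 3}]

# Inner merge against the key frame keeps only matching rows
key_df = pd.DataFrame(keys)
hits = table.merge(key_df, on=list(key_df.columns), how="inner")
```

This generalizes naturally to composite keys: each dict simply carries one value per join-key column.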
delete() -> bool

Delete this online table.

Removes all data files from the online store and cleans up metadata from the database.

RETURNS DESCRIPTION
bool

True if deletion was successful

RAISES DESCRIPTION
Exception

If file deletion fails (metadata cleanup still attempted)

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def delete(self) -> bool:
    """Delete this online table.

    Removes all data files from the online store and cleans up metadata
    from the database.

    Returns:
        True if deletion was successful

    Raises:
        Exception: If file deletion fails (metadata cleanup still attempted)
    """
    deletion_success = True

    # Step 1: Delete data files from online store
    try:
        entity_name = self.lookup_key.name if self.lookup_key else None
        self.online_store.delete(
            name=self.name,
            project=self.project,
            entity=entity_name
        )
        logger.info(f"Deleted online table data files for '{self.name}'")
    except Exception as e:
        logger.error(f"Failed to delete online table data files for '{self.name}': {e}")
        deletion_success = False

    # Step 2: Clean up metadata from database
    try:
        if self.id is not None:
            OnlineTableRequest.delete_by_id(self.id)
            logger.info(f"Deleted online table metadata for '{self.name}' (id={self.id})")
    except Exception as e:
        logger.error(f"Failed to delete online table metadata for '{self.name}': {e}")
        deletion_success = False

    if deletion_success:
        logger.info(f"Successfully deleted online table '{self.name}'")
    else:
        logger.warning(f"Partial deletion of online table '{self.name}' - some cleanup may have failed")

    return deletion_success
FeatureLookup(source: FeatureGroupDuckDB, features: Optional[List[str]] = None) dataclass

Defines which features to retrieve from a feature group.

FillNull(value: str, dataType: str) dataclass

Configuration for filling null values.

GetLatestTimeStrategy

Bases: str, Enum

Strategy for getting latest features.

Materialization(event_time_col: Optional[str] = None, offline: bool = True, online: bool = False, offline_store: Optional[OfflineStoreDuckDB] = None, online_store: Optional[OnlineStoreDuckDB] = None) dataclass

Materialization configuration for a feature group.

Modules

feature_group

DuckDB-based Feature Group implementation.

Provides feature group management, historical feature retrieval, and online serving using DuckDB instead of Spark.

Classes
GetLatestTimeStrategy

Bases: str, Enum

Strategy for getting latest features.

FillNull(value: str, dataType: str) dataclass

Configuration for filling null values.

FeatureLookup(source: FeatureGroupDuckDB, features: Optional[List[str]] = None) dataclass

Defines which features to retrieve from a feature group.

Materialization(event_time_col: Optional[str] = None, offline: bool = True, online: bool = False, offline_store: Optional[OfflineStoreDuckDB] = None, online_store: Optional[OnlineStoreDuckDB] = None) dataclass

Materialization configuration for a feature group.

FeatureGroupDuckDB(name: str, entity: Entity, materialization: Materialization = Materialization(), dataframe: Optional[pd.DataFrame] = None, description: Optional[str] = None, features: Optional[List[str]] = None, project: str = 'default', offline_watermarks: List[str] = list(), online_watermarks: List[str] = list(), version: Optional[int] = None) dataclass

DuckDB-based Feature Group.

A feature group is a collection of related features computed from source data.

Functions
set_dataframe(dataframe: pd.DataFrame) -> FeatureGroupDuckDB

Set the source dataframe for this feature group.

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def set_dataframe(self, dataframe: pd.DataFrame) -> 'FeatureGroupDuckDB':
    """Set the source dataframe for this feature group."""
    self.dataframe = dataframe
    return self
set_features(features: Optional[List[str]] = None) -> FeatureGroupDuckDB

Set which features to include.

If features is None, all columns except entity keys and event_time are used.

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def set_features(self, features: Optional[List[str]] = None) -> 'FeatureGroupDuckDB':
    """Set which features to include.

    If features is None, all columns except entity keys and event_time are used.
    """
    if features is None and self.dataframe is not None:
        # Auto-detect features
        reserved_cols = self.entity.join_keys.copy()
        if self.materialization.event_time_col:
            reserved_cols.append(self.materialization.event_time_col)

        self.features = [col for col in self.dataframe.columns if col not in reserved_cols]
    else:
        self.features = features

    return self
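Auto-detection is just set subtraction: every column that is not an entity join key or the event-time column becomes a feature. A minimal sketch with illustrative column names:

```python
import pandas as pd

df = pd.DataFrame(columns=["user_id", "event_time", "clicks", "spend"])

join_keys = ["user_id"]        # illustrative entity join keys
event_time_col = "event_time"

# Reserved columns are excluded; everything else is registered as a feature
reserved = join_keys + [event_time_col]
features = [c for c in df.columns if c not in reserved]
```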
write(feature_start_time: Optional[datetime] = None, feature_end_time: Optional[datetime] = None, mode: str = 'overwrite') -> None

Materialize features to offline/online stores.

PARAMETER DESCRIPTION
feature_start_time

Start time for this batch

TYPE: Optional[datetime] DEFAULT: None

feature_end_time

End time for this batch

TYPE: Optional[datetime] DEFAULT: None

mode

Write mode - 'overwrite', 'append', or 'merge'

TYPE: str DEFAULT: 'overwrite'

Source code in src/seeknal/featurestore/duckdbengine/feature_group.py
def write(
    self,
    feature_start_time: Optional[datetime] = None,
    feature_end_time: Optional[datetime] = None,
    mode: str = "overwrite"
) -> None:
    """Materialize features to offline/online stores.

    Args:
        feature_start_time: Start time for this batch
        feature_end_time: End time for this batch
        mode: Write mode - 'overwrite', 'append', or 'merge'
    """
    if self.dataframe is None:
        raise ValueError("Dataframe not set. Use set_dataframe() first.")

    # Prepare DataFrame for write
    df = self.dataframe.copy()

    # Add 'name' column for tracking feature group
    df['name'] = self.name

    # Ensure event_time column exists
    if self.materialization.event_time_col and self.materialization.event_time_col in df.columns:
        # Rename to standard 'event_time' column
        if self.materialization.event_time_col != 'event_time':
            df = df.rename(columns={self.materialization.event_time_col: 'event_time'})

    # Write to offline store
    if self.materialization.offline:
        self.materialization.offline_store.write(
            df=df,
            project=self.project,
            entity=self.entity.name,
            name=self.name,
            mode=mode,
            start_date=feature_start_time,
            end_date=feature_end_time,
        )

        # Update watermarks
        watermarks = self.materialization.offline_store.get_watermarks(
            project=self.project,
            entity=self.entity.name,
            name=self.name
        )
        self.offline_watermarks = watermarks

    logger.info(f"Wrote feature group '{self.name}' with {len(df)} rows")
featurestore

DuckDB-based offline and online feature stores.

This module replaces Spark/Delta Lake with DuckDB for feature storage and retrieval.

Classes
OfflineStoreEnum

Bases: str, Enum

Offline store backend types.

OnlineStoreEnum

Bases: str, Enum

Online store backend types.

FeatureStoreFileOutput(path: str) dataclass

File-based feature store output configuration.

OfflineStoreDuckDB(value: Optional[Union[str, FeatureStoreFileOutput]] = None, kind: OfflineStoreEnum = OfflineStoreEnum.PARQUET, name: Optional[str] = None, connection: Optional[duckdb.DuckDBPyConnection] = None) dataclass

DuckDB-based offline feature store.

Stores features in DuckDB tables or Parquet files with metadata tracking. Provides ACID-like guarantees through atomic file operations and metadata.

Functions
write(df: pd.DataFrame, project: str, entity: str, name: str, mode: str = 'overwrite', start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, **kwargs) -> None

Write features to offline store.

PARAMETER DESCRIPTION
df

Features to write

TYPE: DataFrame

project

Project name

TYPE: str

entity

Entity name

TYPE: str

name

Feature group name

TYPE: str

mode

Write mode - 'overwrite', 'append', or 'merge'

TYPE: str DEFAULT: 'overwrite'

start_date

Start date for this batch

TYPE: Optional[datetime] DEFAULT: None

end_date

End date for this batch

TYPE: Optional[datetime] DEFAULT: None

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def write(
    self,
    df: pd.DataFrame,
    project: str,
    entity: str,
    name: str,
    mode: str = "overwrite",
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    **kwargs
) -> None:
    """Write features to offline store.

    Args:
        df: Features to write
        project: Project name
        entity: Entity name
        name: Feature group name
        mode: Write mode - 'overwrite', 'append', or 'merge'
        start_date: Start date for this batch
        end_date: End date for this batch
    """
    if df is None or len(df) == 0:
        logger.warning("Empty DataFrame, skipping write")
        return

    table_dir = self._get_table_path(project, entity, name)
    metadata = self._load_metadata(table_dir)

    # Generate version ID
    version = datetime.now().strftime("%Y%m%d_%H%M%S")

    if mode == "overwrite":
        self._write_overwrite(df, table_dir, metadata, version, start_date, end_date)
    elif mode == "append":
        self._write_append(df, table_dir, metadata, version, start_date, end_date)
    elif mode == "merge":
        self._write_merge(df, table_dir, metadata, version, start_date, end_date)
    else:
        raise ValueError(f"Unknown mode: {mode}")

    # Update metadata
    metadata["versions"].append({
        "version": version,
        "start_date": str(start_date) if start_date else None,
        "end_date": str(end_date) if end_date else None,
        "mode": mode,
        "rows": len(df),
        "timestamp": datetime.now().isoformat()
    })

    # Track watermarks
    if start_date:
        watermark = str(start_date)
        if watermark not in metadata["watermarks"]:
            metadata["watermarks"].append(watermark)
    if end_date and end_date != "none":
        watermark = str(end_date)
        if watermark not in metadata["watermarks"]:
            metadata["watermarks"].append(watermark)

    metadata["watermarks"] = sorted(set(metadata["watermarks"]))

    # Save schema
    metadata["schema"] = {col: str(dtype) for col, dtype in df.dtypes.items()}

    self._save_metadata(table_dir, metadata)
    logger.info(f"Wrote {len(df)} rows to {table_dir} (mode={mode}, version={version})")
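The version and watermark bookkeeping that `write` performs can be sketched in isolation. This is a simplified stand-in for illustration, not the seeknal implementation; the `update_metadata` helper name is hypothetical:

```python
from datetime import datetime

def update_metadata(metadata, mode, rows, start_date=None, end_date=None):
    """Mimic the version/watermark tracking done after each write."""
    # Version IDs are timestamps, so they sort chronologically.
    version = datetime.now().strftime("%Y%m%d_%H%M%S")
    metadata.setdefault("versions", []).append({
        "version": version,
        "start_date": str(start_date) if start_date else None,
        "end_date": str(end_date) if end_date else None,
        "mode": mode,
        "rows": rows,
    })
    # Watermarks are deduplicated and kept sorted.
    watermarks = metadata.setdefault("watermarks", [])
    for d in (start_date, end_date):
        if d and str(d) not in watermarks:
            watermarks.append(str(d))
    metadata["watermarks"] = sorted(set(watermarks))
    return metadata

meta = update_metadata({}, "append", 100,
                       start_date=datetime(2024, 1, 2),
                       end_date=datetime(2024, 1, 3))
```

Because watermarks are stored as sorted strings, the earliest and latest materialized dates are always the first and last entries.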
read(project: str, entity: str, name: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, **kwargs) -> pd.DataFrame

Read features from offline store.

PARAMETER DESCRIPTION
project

Project name

TYPE: str

entity

Entity name

TYPE: str

name

Feature group name

TYPE: str

start_date

Filter start date

TYPE: Optional[datetime] DEFAULT: None

end_date

Filter end date

TYPE: Optional[datetime] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with features

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def read(
    self,
    project: str,
    entity: str,
    name: str,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    **kwargs
) -> pd.DataFrame:
    """Read features from offline store.

    Args:
        project: Project name
        entity: Entity name
        name: Feature group name
        start_date: Filter start date
        end_date: Filter end date

    Returns:
        DataFrame with features
    """
    table_dir = self._get_table_path(project, entity, name)
    parquet_path = os.path.join(table_dir, "data.parquet")

    if not os.path.exists(parquet_path):
        raise FileNotFoundError(f"Feature group not found: {name}")

    # Read parquet file
    df = pd.read_parquet(parquet_path)

    # Apply date filters if provided
    if start_date and 'event_time' in df.columns:
        df = df[df['event_time'] >= start_date]

    if end_date and end_date != "none" and 'event_time' in df.columns:
        df = df[df['event_time'] <= end_date]

    logger.info(f"Read {len(df)} rows from {table_dir}")
    return df
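The date filtering applied by `read` uses inclusive bounds on the `event_time` column. A minimal sketch with hypothetical data (assuming pandas is available):

```python
from datetime import datetime
import pandas as pd

# Hypothetical feature rows; read() applies the same inclusive bounds.
df = pd.DataFrame({
    "user_id": [1, 2, 3],
    "event_time": pd.to_datetime(["2024-01-01", "2024-01-05", "2024-01-10"]),
    "score": [0.1, 0.5, 0.9],
})

start, end = datetime(2024, 1, 2), datetime(2024, 1, 9)
# Rows outside [start, end] are dropped; both endpoints are kept.
filtered = df[(df["event_time"] >= start) & (df["event_time"] <= end)]
```

Note that if the feature group has no `event_time` column, the date arguments are silently ignored and the full table is returned.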
get_watermarks(project: str, entity: str, name: str) -> list

Get watermarks for a feature group.

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def get_watermarks(self, project: str, entity: str, name: str) -> list:
    """Get watermarks for a feature group."""
    table_dir = self._get_table_path(project, entity, name)
    metadata = self._load_metadata(table_dir)
    return metadata.get("watermarks", [])
delete(project: str, entity: str, name: str) -> bool

Delete a feature group from the offline store.

Removes all data files and metadata associated with the feature group.

PARAMETER DESCRIPTION
project

Project name

TYPE: str

entity

Entity name

TYPE: str

name

Feature group name

TYPE: str

RETURNS DESCRIPTION
bool

True if deletion was successful, False otherwise

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def delete(self, project: str, entity: str, name: str) -> bool:
    """Delete a feature group from the offline store.

    Removes all data files and metadata associated with the feature group.

    Args:
        project: Project name
        entity: Entity name
        name: Feature group name

    Returns:
        True if deletion was successful, False otherwise
    """
    if isinstance(self.value, dict):
        base_path = self.value.get("path", "/tmp/feature_store")
    else:
        base_path = "/tmp/feature_store"

    # Build path to feature group directory
    table_dir = os.path.join(base_path, project, entity, name)

    if os.path.exists(table_dir):
        shutil.rmtree(table_dir)
        logger.info(f"Deleted offline feature group at {table_dir}")
        return True
    else:
        logger.warning(f"Offline feature group directory not found: {table_dir}")
        return False
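`delete` removes the feature group's entire directory tree under `<base_path>/<project>/<entity>/<name>`. A minimal sketch of that behavior using a temporary directory (illustrative only):

```python
import os
import shutil
import tempfile

# Stand-in for the offline store base path.
base = tempfile.mkdtemp()
table_dir = os.path.join(base, "my_project", "user", "user_features")
os.makedirs(table_dir)

existed_before = os.path.exists(table_dir)
# What delete() does when the directory exists: remove the whole tree.
shutil.rmtree(table_dir)
exists_after = os.path.exists(table_dir)
```

If the directory does not exist, `delete` logs a warning and returns False instead of raising.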
OnlineStoreDuckDB(value: Optional[Union[str, FeatureStoreFileOutput]] = None, kind: OnlineStoreEnum = OnlineStoreEnum.DUCKDB_TABLE, name: Optional[str] = None, connection: Optional[duckdb.DuckDBPyConnection] = None) dataclass

DuckDB-based online feature store.

Stores features for low-latency serving using DuckDB tables.

Functions
write(df: pd.DataFrame, table_name: str, **kwargs) -> None

Write features to online store.

PARAMETER DESCRIPTION
df

Features to write

TYPE: DataFrame

table_name

Name of the online table

TYPE: str

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def write(
    self,
    df: pd.DataFrame,
    table_name: str,
    **kwargs
) -> None:
    """Write features to online store.

    Args:
        df: Features to write
        table_name: Name of the online table
    """
    conn = self._get_connection()

    # Drop table if exists and recreate
    conn.execute(f"DROP TABLE IF EXISTS {table_name}")
    conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM df")

    logger.info(f"Wrote {len(df)} rows to online table {table_name}")
read(table_name: str, keys: Optional[list] = None, **kwargs) -> pd.DataFrame

Read features from online store.

PARAMETER DESCRIPTION
table_name

Name of the online table

TYPE: str

keys

Filter by specific keys (list of dicts with key-value pairs)

TYPE: Optional[list] DEFAULT: None

RETURNS DESCRIPTION
DataFrame

DataFrame with features

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def read(
    self,
    table_name: str,
    keys: Optional[list] = None,
    **kwargs
) -> pd.DataFrame:
    """Read features from online store.

    Args:
        table_name: Name of the online table
        keys: Filter by specific keys (list of dicts with key-value pairs)

    Returns:
        DataFrame with features
    """
    conn = self._get_connection()

    if keys:
        # Build WHERE clause from keys
        conditions = []
        for key_dict in keys:
            key_conditions = [f"{k} = '{v}'" for k, v in key_dict.items()]
            conditions.append("(" + " AND ".join(key_conditions) + ")")

        where_clause = " OR ".join(conditions)
        query = f"SELECT * FROM {table_name} WHERE {where_clause}"
    else:
        query = f"SELECT * FROM {table_name}"

    df = conn.execute(query).df()
    logger.info(f"Read {len(df)} rows from online table {table_name}")
    return df
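The WHERE clause that `read` builds from the `keys` argument is an OR of per-dict AND conditions. A standalone sketch of that string construction (the `build_where` helper name is hypothetical):

```python
def build_where(keys):
    """Build the OR-of-ANDs predicate used for key lookups."""
    conditions = []
    for key_dict in keys:
        # Each dict becomes one AND-group of equality checks.
        key_conditions = [f"{k} = '{v}'" for k, v in key_dict.items()]
        conditions.append("(" + " AND ".join(key_conditions) + ")")
    # Any matching key set selects the row.
    return " OR ".join(conditions)

clause = build_where([{"user_id": "u1"}, {"user_id": "u2", "region": "EU"}])
```

Since key values are interpolated directly into SQL, keys should come from trusted input rather than raw user-supplied strings.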
delete(name: str, project: str, entity: Optional[str] = None) -> bool

Delete an online table from the store.

Removes the DuckDB table and any associated parquet files.

PARAMETER DESCRIPTION
name

Table name

TYPE: str

project

Project name

TYPE: str

entity

Entity name (optional, used for file-based storage path)

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
bool

True if deletion was successful, False otherwise

Source code in src/seeknal/featurestore/duckdbengine/featurestore.py
def delete(self, name: str, project: str, entity: Optional[str] = None) -> bool:
    """Delete an online table from the store.

    Removes the DuckDB table and any associated parquet files.

    Args:
        name: Table name
        project: Project name
        entity: Entity name (optional, used for file-based storage path)

    Returns:
        True if deletion was successful, False otherwise
    """
    # Try to drop the DuckDB table if it exists
    try:
        conn = self._get_connection()
        conn.execute(f"DROP TABLE IF EXISTS {name}")
        logger.info(f"Dropped online table '{name}' from DuckDB")
    except Exception as e:
        logger.warning(f"Could not drop DuckDB table '{name}': {e}")

    # Also clean up any file-based storage
    if isinstance(self.value, dict):
        base_path = self.value.get("path", "/tmp/feature_store_online")
    else:
        base_path = "/tmp/feature_store_online"

    # Build path to table directory
    if entity:
        table_dir = os.path.join(base_path, project, entity, name)
    else:
        # Fallback pattern for online tables without entity
        table_dir = os.path.join(base_path, project, name)

    if os.path.exists(table_dir):
        shutil.rmtree(table_dir)
        logger.info(f"Deleted online table files at {table_dir}")
        return True

    logger.info(f"Online table '{name}' deleted successfully")
    return True

feature_group

Classes

OfflineStoreEnum

Bases: str, Enum

Enumeration of supported offline storage types.

ATTRIBUTE DESCRIPTION
HIVE_TABLE

Store features as a Hive table in a database.

FILE

Store features as files on the filesystem (e.g., Delta format).

ICEBERG

Store features in Apache Iceberg tables with ACID transactions, time travel, and cloud storage compatibility.

OnlineStoreEnum

Bases: str, Enum

Enumeration of supported online storage types.

ATTRIBUTE DESCRIPTION
HIVE_TABLE

Store features as a Hive table for online serving.

FILE

Store features as Parquet files for online serving.

FileKindEnum

Bases: str, Enum

Enumeration of supported file formats for feature storage.

ATTRIBUTE DESCRIPTION
DELTA

Delta Lake format, providing ACID transactions and versioning.

FeatureStoreFileOutput(path: str, kind: FileKindEnum = FileKindEnum.DELTA) dataclass

Configuration for file-based feature store output.

ATTRIBUTE DESCRIPTION
path

The filesystem path for storing feature data. A security warning will be logged if this path is in an insecure location (e.g., /tmp).

TYPE: str

kind

The file format to use (default: DELTA).

TYPE: FileKindEnum

Functions
to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary with 'path' and 'kind' keys.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary with 'path' and 'kind' keys.
    """
    return {"path": self.path, "kind": self.kind.value}
FeatureStoreHiveTableOutput(database: str) dataclass

Configuration for Hive table-based feature store output.

ATTRIBUTE DESCRIPTION
database

The Hive database name where features will be stored.

TYPE: str

Functions
to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary with 'database' key.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary with 'database' key.
    """
    return {"database": self.database}
IcebergStoreOutput(table: str, catalog: str = 'lakekeeper', namespace: str = 'default', warehouse: Optional[str] = None, mode: str = 'append') dataclass

Iceberg storage configuration for feature group materialization.

This configuration enables storing features in Apache Iceberg tables with ACID transactions, time travel, and cloud storage compatibility.

PARAMETER DESCRIPTION
table

Table name within namespace

TYPE: str

catalog

Catalog name from profiles.yml (default: "lakekeeper")

TYPE: str DEFAULT: 'lakekeeper'

namespace

Iceberg namespace/database (default: "default")

TYPE: str DEFAULT: 'default'

warehouse

Optional warehouse path override (s3://, gs://, azure://)

TYPE: Optional[str] DEFAULT: None

mode

Write mode - "append" or "overwrite" (default: "append")

TYPE: str DEFAULT: 'append'

Functions
to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary with all Iceberg configuration keys.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary with all Iceberg configuration keys.
    """
    return {
        "catalog": self.catalog,
        "warehouse": self.warehouse,
        "namespace": self.namespace,
        "table": self.table,
        "mode": self.mode,
    }
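The dictionary shape produced by `to_dict()` can be seen with a minimal stand-in dataclass mirroring the documented fields and defaults (not the seeknal class itself):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class IcebergConfig:
    """Hypothetical mirror of IcebergStoreOutput's documented fields."""
    table: str
    catalog: str = "lakekeeper"
    namespace: str = "default"
    warehouse: Optional[str] = None
    mode: str = "append"

    def to_dict(self):
        return {
            "catalog": self.catalog,
            "warehouse": self.warehouse,
            "namespace": self.namespace,
            "table": self.table,
            "mode": self.mode,
        }

# Only the table name is required; everything else has a default.
cfg = IcebergConfig(table="user_features").to_dict()
```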
OfflineStore(value: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput, IcebergStoreOutput]] = None, kind: OfflineStoreEnum = OfflineStoreEnum.HIVE_TABLE, name: Optional[str] = None) dataclass

Configuration for offline feature store storage.

ATTRIBUTE DESCRIPTION
value

The storage configuration (path for FILE, database for HIVE_TABLE). A security warning will be logged if a file path is in an insecure location (e.g., /tmp).

TYPE: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput, IcebergStoreOutput]]

kind

The storage type (FILE or HIVE_TABLE).

TYPE: OfflineStoreEnum

name

Optional name for this offline store configuration.

TYPE: Optional[str]

Functions
get_or_create()

Retrieve an existing offline store or create a new one.

If an offline store with the specified name exists, it is retrieved and its configuration is loaded. Otherwise, a new offline store is created with the current configuration.

RETURNS DESCRIPTION
OfflineStore

The current instance with id populated.

Note

The store name defaults to "default" if not specified.

Source code in src/seeknal/featurestore/featurestore.py
def get_or_create(self):
    """Retrieve an existing offline store or create a new one.

    If an offline store with the specified name exists, it is retrieved and
    its configuration is loaded. Otherwise, a new offline store is created
    with the current configuration.

    Returns:
        OfflineStore: The current instance with id populated.

    Note:
        The store name defaults to "default" if not specified.
    """
    if self.name is None:
        name = "default"
    else:
        name = self.name
    offline_store = FeatureGroupRequest.get_offline_store_by_name(self.kind, name)
    if offline_store is None:
        offline_store = FeatureGroupRequest.save_offline_store(
            self.kind, self.value, name
        )
        self.id = offline_store.id
    else:
        self.id = offline_store.id
        self.kind = offline_store.kind
        value_params = json.loads(offline_store.params)
        if self.kind == OfflineStoreEnum.HIVE_TABLE:
            if self.value is not None:
                self.value = FeatureStoreHiveTableOutput(**value_params)
        elif self.kind == OfflineStoreEnum.FILE:
            if self.value is not None:
                self.value = FeatureStoreFileOutput(
                    path=value_params["path"],
                    kind=FileKindEnum(value_params["kind"]),
                )
        elif self.kind == OfflineStoreEnum.ICEBERG:
            if self.value is not None:
                self.value = IcebergStoreOutput(**value_params)
        else:
            self.value = value_params
    return self
list() staticmethod

List all registered offline stores.

Displays a formatted table of all offline stores with their names, kinds, and configuration values. If no stores are found, displays an appropriate message.

RETURNS DESCRIPTION
None

Output is printed to the console.

Source code in src/seeknal/featurestore/featurestore.py
@staticmethod
def list():
    """List all registered offline stores.

    Displays a formatted table of all offline stores with their names,
    kinds, and configuration values. If no stores are found, displays
    an appropriate message.

    Returns:
        None: Output is printed to the console.
    """
    offline_stores = FeatureGroupRequest.get_offline_stores()
    if offline_stores:
        offline_stores = [
            {
                "name": offline_store.name,
                "kind": offline_store.kind,
                "value": offline_store.params,
            }
            for offline_store in offline_stores
        ]
        typer.echo(tabulate(offline_stores, headers="keys", tablefmt="github"))
    else:
        typer.echo("No offline stores found.")
delete(spark: Optional[SparkSession] = None, *args, **kwargs) -> bool

Delete storage for a feature group from the offline store.

For FILE type: Deletes the directory containing the delta table. For HIVE_TABLE type: Drops the Hive table using Spark SQL.

PARAMETER DESCRIPTION
spark

SparkSession instance (required for HIVE_TABLE, optional for FILE).

TYPE: Optional[SparkSession] DEFAULT: None

**kwargs

Must include 'project' and 'entity' to construct the table name.

DEFAULT: {}

RETURNS DESCRIPTION
bool

True if deletion was successful or resource didn't exist.

TYPE: bool

RAISES DESCRIPTION
ValueError

If required kwargs (project, entity) are missing.

Source code in src/seeknal/featurestore/featurestore.py
def delete(
    self,
    spark: Optional[SparkSession] = None,
    *args,
    **kwargs,
) -> bool:
    """Delete storage for a feature group from the offline store.

    For FILE type: Deletes the directory containing the delta table.
    For HIVE_TABLE type: Drops the Hive table using Spark SQL.

    Args:
        spark: SparkSession instance (required for HIVE_TABLE, optional for FILE).
        **kwargs: Must include 'project' and 'entity' to construct the table name.

    Returns:
        bool: True if deletion was successful or resource didn't exist.

    Raises:
        ValueError: If required kwargs (project, entity) are missing.
    """
    project = kwargs.get("project")
    entity = kwargs.get("entity")

    if project is None or entity is None:
        raise ValueError("Both 'project' and 'entity' are required for deletion")

    # Validate project and entity parameters
    validate_table_name(project)
    validate_table_name(entity)
    table_name = "fg_{}__{}".format(project, entity)

    match self.kind:
        case OfflineStoreEnum.FILE:
            if self.value is None:
                base_path = CONFIG_BASE_URL
                path = os.path.join(base_path, "data", table_name)
            else:
                if isinstance(self.value, FeatureStoreFileOutput):
                    base_path = self.value.path
                elif isinstance(self.value, dict):
                    base_path = self.value.get("path", CONFIG_BASE_URL)
                else:
                    base_path = self.value
                path = os.path.join(base_path, table_name)

            if os.path.exists(path):
                shutil.rmtree(path)
                logger.info(f"Deleted offline store files at: {path}")
            else:
                logger.info(f"Offline store path does not exist: {path}")
            return True

        case OfflineStoreEnum.HIVE_TABLE:
            if spark is None:
                spark = SparkSession.builder.getOrCreate()

            if self.value is None:
                database = "seeknal"
            elif isinstance(self.value, FeatureStoreHiveTableOutput):
                database = self.value.database
            elif isinstance(self.value, dict):
                database = self.value.get("database", "seeknal")
            else:
                database = "seeknal"

            # Validate database name before use in SQL
            validate_database_name(database)

            full_table_name = "{}.{}".format(database, table_name)
            if spark.catalog.tableExists(full_table_name):
                spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
                logger.info(f"Dropped Hive table: {full_table_name}")
            else:
                logger.info(f"Hive table does not exist: {full_table_name}")
            return True

        case OfflineStoreEnum.ICEBERG:
            # Iceberg table deletion
            if self.value is None:
                logger.warning("Iceberg configuration required for deletion")
                return False

            # Get configuration
            if isinstance(self.value, IcebergStoreOutput):
                iceberg_config = self.value
            elif isinstance(self.value, dict):
                iceberg_config = IcebergStoreOutput(**self.value)
            else:
                logger.warning(f"Invalid Iceberg configuration type: {type(self.value)}")
                return False

            # Import Iceberg operations
            from seeknal.workflow.materialization.operations import DuckDBIcebergExtension
            from seeknal.workflow.materialization.profile_loader import ProfileLoader
            from seeknal.workflow.materialization.config import MaterializationConfig, ConfigurationError

            # Load profile for catalog configuration
            profile_loader = ProfileLoader()
            try:
                profile_config = profile_loader.load_profile()
            except (ConfigurationError, Exception) as e:
                logger.warning(f"Could not load materialization profile: {e}. Using defaults.")
                profile_config = MaterializationConfig()

            # Get catalog configuration
            catalog_uri = profile_config.catalog.uri if profile_config.catalog.uri else ""
            warehouse_path = (
                iceberg_config.warehouse or
                profile_config.catalog.warehouse
            )
            bearer_token = profile_config.catalog.bearer_token

            # Validate required configuration
            if not catalog_uri or not warehouse_path:
                logger.warning("Catalog URI and warehouse path required for Iceberg deletion")
                return False

            # Validate table name
            validate_table_name(iceberg_config.table)
            validate_table_name(iceberg_config.namespace)

            # Create DuckDB connection and drop table
            import duckdb
            con = duckdb.connect(":memory:")

            try:
                # Load Iceberg extension
                DuckDBIcebergExtension.load_extension(con)

                # Setup REST catalog
                catalog_name = "seeknal_catalog"
                DuckDBIcebergExtension.create_rest_catalog(
                    con=con,
                    catalog_name=catalog_name,
                    uri=catalog_uri,
                    warehouse_path=warehouse_path,
                    bearer_token=bearer_token,
                )

                # Create table reference and drop
                table_ref = f"{catalog_name}.{iceberg_config.namespace}.{iceberg_config.table}"
                con.execute(f"DROP TABLE IF EXISTS {table_ref}")
                logger.info(f"Dropped Iceberg table: {table_ref}")
                return True

            except Exception as e:
                logger.error(f"Failed to drop Iceberg table: {e}")
                return False
            finally:
                con.close()

        case _:
            logger.warning(f"Unknown offline store kind: {self.kind}")
            return False
OnlineStore(value: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput]] = None, kind: OnlineStoreEnum = OnlineStoreEnum.FILE, name: Optional[str] = None) dataclass

Configuration for online feature store.

ATTRIBUTE DESCRIPTION
value

The storage configuration (file path or hive table). A security warning will be logged if a file path is in an insecure location (e.g., /tmp).

TYPE: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput]]

kind

Type of online store (FILE or HIVE_TABLE).

TYPE: OnlineStoreEnum

name

Optional name for the store.

TYPE: Optional[str]

Functions
delete(*args, **kwargs)

Delete feature data from the online store.

Removes the directory containing the feature data for the specified project and feature name.

PARAMETER DESCRIPTION
*args

Additional positional arguments (unused).

DEFAULT: ()

**kwargs

Keyword arguments including:
- project (str): Project name for file naming.
- name (str): Feature name for file naming.

DEFAULT: {}

RETURNS DESCRIPTION
bool

True if the deletion was successful or if the path did not exist.

Source code in src/seeknal/featurestore/featurestore.py
def delete(self, *args, **kwargs):
    """Delete feature data from the online store.

    Removes the directory containing the feature data for the specified
    project and feature name.

    Args:
        *args: Additional positional arguments (unused).
        **kwargs: Keyword arguments including:
            - project (str): Project name for file naming.
            - name (str): Feature name for file naming.

    Returns:
        bool: True if the deletion was successful or if the path
            did not exist.
    """
    match self.kind:
        case OnlineStoreEnum.FILE:
            name = kwargs.get("name")
            file_name_complete = "fs_{}__{}".format(kwargs["project"], name)

            if self.value == "null" or self.value is None:
                base_path = CONFIG_BASE_URL
                path = os.path.join(base_path, "data", file_name_complete)
            else:
                base_path = self.value["path"]
                path = os.path.join(base_path, file_name_complete)
            if os.path.exists(path):
                shutil.rmtree(path)
            return True
OfflineMaterialization

Bases: BaseModel

Configuration for offline materialization.

ATTRIBUTE DESCRIPTION
store

The offline store configuration.

TYPE: Optional[OfflineStore]

mode

Write mode ('overwrite', 'append', 'merge').

TYPE: str

ttl

Time-to-live in days for data retention.

TYPE: Optional[int]

OnlineMaterialization

Bases: BaseModel

Configuration for online materialization.

ATTRIBUTE DESCRIPTION
store

The online store configuration.

TYPE: Optional[OnlineStore]

ttl

Time-to-live in minutes for online store data.

TYPE: Optional[int]

Materialization

Bases: BaseModel

Materialization options.

ATTRIBUTE DESCRIPTION
event_time_col

The column that contains the event time. Defaults to None.

TYPE: str

date_pattern

The date pattern used in event_time_col. Defaults to "yyyy-MM-dd".

TYPE: str

offline

Whether the feature group should be stored in the offline store. Defaults to True.

TYPE: bool

online

Whether the feature group should be stored in the online store. Defaults to False.

TYPE: bool

serving_ttl_days

Look-back window, in days, for features in the online store. This parameter determines how long features live in the online store; shorter TTLs improve performance and reduce computation. Defaults to 1. For example, a TTL of 1 means only one day of data is available in the online store.

TYPE: int

force_update_online

Force an update of the data in the online store, skipping the check that the data being materialized is newer than the data already stored there. Defaults to False.

TYPE: bool

online_write_mode

Write mode used when materializing to the online store. Defaults to "Append".

TYPE: OnlineWriteModeEnum

schema_version

Determines which schema version to use for the feature group. Defaults to None.

TYPE: List[dict]
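The serving_ttl_days semantics can be sketched as a look-back cutoff: a row stays in the online store only while its event time is within the TTL window. This is a simplified illustration, not the seeknal implementation:

```python
from datetime import datetime, timedelta

def within_ttl(event_time: datetime, now: datetime,
               serving_ttl_days: int = 1) -> bool:
    """Keep a row only if it is newer than now minus the TTL window."""
    return event_time >= now - timedelta(days=serving_ttl_days)

now = datetime(2024, 1, 10, 12, 0)
fresh = within_ttl(datetime(2024, 1, 10, 0, 0), now)  # inside the 1-day window
stale = within_ttl(datetime(2024, 1, 8, 0, 0), now)   # older than 1 day
```

A shorter TTL means fewer rows to scan at serving time, which is why smaller windows improve performance.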

FeatureGroup(name: str, entity: Optional[Entity] = None, id: Optional[str] = None) dataclass

Bases: FeatureStore

A feature group representing a set of features created from a data source.

A FeatureGroup is a logical grouping of related features that share the same entity and are typically computed from the same data source (Flow or DataFrame). It supports both offline and online materialization for feature storage and serving.

ATTRIBUTE DESCRIPTION
name

The unique name of the feature group within the project.

TYPE: str

materialization

Configuration for how features are stored and served, including offline/online storage settings and TTL configurations.

TYPE: Materialization

source

The data source for computing features. Can be a Flow pipeline or a Spark DataFrame. If Flow, output will be set to SPARK_DATAFRAME.

TYPE: Optional[Union[Flow, DataFrame]]

description

Optional human-readable description of the feature group.

TYPE: Optional[str]

features

List of Feature objects to register. If None, all columns except join_keys and event_time will be registered as features.

TYPE: Optional[List[Feature]]

tag

Optional list of tags for categorizing and filtering feature groups.

TYPE: Optional[List[str]]

validation_config

Configuration for data validation including validators to run and validation mode (FAIL or WARN). When set, enables declarative validation that can be used with the validate() method.

TYPE: Optional[ValidationConfig]

feature_group_id

Unique identifier assigned by the system after creation.

TYPE: Optional[str]

offline_watermarks

List of timestamps indicating when offline data was materialized. Used for tracking data freshness.

TYPE: List[str]

online_watermarks

List of timestamps indicating when online data was materialized. Used for tracking data freshness.

TYPE: List[str]

version

Schema version number for the feature group.

TYPE: Optional[int]

created_at

Timestamp when the feature group was created.

TYPE: Optional[str]

updated_at

Timestamp when the feature group was last updated.

TYPE: Optional[str]

avro_schema

Avro schema definition for the feature data structure.

TYPE: Optional[dict]

Example

from seeknal.featurestore import FeatureGroup, Materialization
from seeknal.entity import Entity

Create a feature group from a flow

fg = FeatureGroup(
    name="user_features",
    materialization=Materialization(event_time_col="event_date"),
)
fg.entity = Entity(name="user", join_keys=["user_id"])
fg.set_flow(my_flow).set_features().get_or_create()

Note

A SparkContext must be active before creating a FeatureGroup instance. Initialize your Project and Workspace first if you encounter SparkContext errors.

Functions
set_flow(flow: Flow)

Set the data flow pipeline as the source for this feature group.

Configures a Flow as the data source for computing features. The flow's output will be automatically set to SPARK_DATAFRAME if not already configured. If the flow hasn't been persisted, it will be created.

PARAMETER DESCRIPTION
flow

The Flow pipeline to use for feature computation.

TYPE: Flow

RETURNS DESCRIPTION
FeatureGroup

The current instance for method chaining.

Example

fg = FeatureGroup(name="user_features")
fg.set_flow(my_transformation_flow)

Source code in src/seeknal/featurestore/feature_group.py
def set_flow(self, flow: Flow):
    """Set the data flow pipeline as the source for this feature group.

    Configures a Flow as the data source for computing features. The flow's
    output will be automatically set to SPARK_DATAFRAME if not already
    configured. If the flow hasn't been persisted, it will be created.

    Args:
        flow: The Flow pipeline to use for feature computation.

    Returns:
        FeatureGroup: The current instance for method chaining.

    Example:
        >>> fg = FeatureGroup(name="user_features")
        >>> fg.set_flow(my_transformation_flow)
    """
    if flow.output is None or flow.output.kind != FlowOutputEnum.SPARK_DATAFRAME:
        flow.output = FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME)
    if "flow_id" not in vars(flow):
        flow.get_or_create()
    self.source = flow
    return self
set_dataframe(dataframe: DataFrame)

Set a Spark DataFrame as the source for this feature group.

Configures a pre-computed Spark DataFrame as the data source for features. This is useful when features have already been computed or when using ad-hoc data that doesn't require a Flow pipeline.

PARAMETER DESCRIPTION
dataframe

The Spark DataFrame containing the feature data.

TYPE: DataFrame

RETURNS DESCRIPTION
FeatureGroup

The current instance for method chaining.

Example

>>> fg = FeatureGroup(name="user_features")
>>> fg.set_dataframe(my_spark_df)

Source code in src/seeknal/featurestore/feature_group.py
def set_dataframe(self, dataframe: DataFrame):
    """Set a Spark DataFrame as the source for this feature group.

    Configures a pre-computed Spark DataFrame as the data source for
    features. This is useful when features have already been computed
    or when using ad-hoc data that doesn't require a Flow pipeline.

    Args:
        dataframe: The Spark DataFrame containing the feature data.

    Returns:
        FeatureGroup: The current instance for method chaining.

    Example:
        >>> fg = FeatureGroup(name="user_features")
        >>> fg.set_dataframe(my_spark_df)
    """
    self.source = dataframe
    return self
set_validation_config(config: ValidationConfig)

Set validation configuration for this feature group.

PARAMETER DESCRIPTION
config

Validation configuration containing validators to run and validation mode (FAIL or WARN).

TYPE: ValidationConfig

RETURNS DESCRIPTION
self

Returns the FeatureGroup instance for method chaining.

Example

>>> from seeknal.feature_validation.models import ValidationConfig, ValidatorConfig, ValidationMode

>>> config = ValidationConfig(
...     mode=ValidationMode.WARN,
...     validators=[
...         ValidatorConfig(validator_type="null", columns=["user_id"]),
...         ValidatorConfig(validator_type="range", columns=["age"],
...                         params={"min_val": 0, "max_val": 120})
...     ]
... )
>>> feature_group.set_validation_config(config)

Source code in src/seeknal/featurestore/feature_group.py
def set_validation_config(self, config: ValidationConfig):
    """
    Set validation configuration for this feature group.

    Args:
        config (ValidationConfig): Validation configuration containing
            validators to run and validation mode (FAIL or WARN).

    Returns:
        self: Returns the FeatureGroup instance for method chaining.

    Example:
        >>> from seeknal.feature_validation.models import ValidationConfig, ValidatorConfig, ValidationMode
        >>>
        >>> config = ValidationConfig(
        ...     mode=ValidationMode.WARN,
        ...     validators=[
        ...         ValidatorConfig(validator_type="null", columns=["user_id"]),
        ...         ValidatorConfig(validator_type="range", columns=["age"],
        ...                         params={"min_val": 0, "max_val": 120})
        ...     ]
        ... )
        >>> feature_group.set_validation_config(config)
    """
    self.validation_config = config
    return self
validate(validators: List[BaseValidator], mode: Union[str, ValidationMode] = ValidationMode.FAIL, reference_date: Optional[str] = None) -> ValidationSummary

Validate the feature group data using the provided validators.

This method runs a list of validators against the feature group's data and returns a summary of the validation results. The validation can be configured to either warn on failures (continue execution) or fail immediately on the first validation failure.

PARAMETER DESCRIPTION
validators

List of validators to run against the feature group data. Each validator should be an instance of a class that inherits from BaseValidator (e.g., NullValidator, RangeValidator, UniquenessValidator, FreshnessValidator, or CustomValidator).

TYPE: List[BaseValidator]

mode

Validation execution mode:
- ValidationMode.FAIL or "fail": raise an exception on the first failure.
- ValidationMode.WARN or "warn": log failures but continue execution.
Defaults to ValidationMode.FAIL.

TYPE: Union[str, ValidationMode] DEFAULT: FAIL

reference_date

Reference date for running the source Flow. Only used when source is a Flow. Defaults to None.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
ValidationSummary

A summary containing all validation results, pass/fail status, and counts.

TYPE: ValidationSummary

RAISES DESCRIPTION
ValueError

If source is not set (use @require_set_source decorator).

ValidationException

If mode is FAIL and any validator fails.

Example

>>> from seeknal.feature_validation.validators import NullValidator, RangeValidator
>>> from seeknal.feature_validation.models import ValidationMode

Create validators

>>> validators = [
...     NullValidator(columns=["user_id", "email"]),
...     RangeValidator(column="age", min_val=0, max_val=120)
... ]

Run validation in warn mode (continues on failures)

>>> summary = feature_group.validate(validators, mode=ValidationMode.WARN)
>>> print(f"Passed: {summary.passed}, Failed: {summary.failed_count}")

Run validation in fail mode (stops on first failure)

>>> try:
...     summary = feature_group.validate(validators, mode="fail")
... except ValidationException as e:
...     print(f"Validation failed: {e.message}")

Source code in src/seeknal/featurestore/feature_group.py
@require_set_source
def validate(
    self,
    validators: List[BaseValidator],
    mode: Union[str, ValidationMode] = ValidationMode.FAIL,
    reference_date: Optional[str] = None,
) -> ValidationSummary:
    """
    Validate the feature group data using the provided validators.

    This method runs a list of validators against the feature group's data
    and returns a summary of the validation results. The validation can be
    configured to either warn on failures (continue execution) or fail
    immediately on the first validation failure.

    Args:
        validators (List[BaseValidator]): List of validators to run against
            the feature group data. Each validator should be an instance of
            a class that inherits from BaseValidator (e.g., NullValidator,
            RangeValidator, UniquenessValidator, FreshnessValidator, or
            CustomValidator).
        mode (Union[str, ValidationMode], optional): Validation execution mode.
            - ValidationMode.FAIL or "fail": Raise exception on first failure.
            - ValidationMode.WARN or "warn": Log failures but continue execution.
            Defaults to ValidationMode.FAIL.
        reference_date (Optional[str], optional): Reference date for running
            the source Flow. Only used when source is a Flow. Defaults to None.

    Returns:
        ValidationSummary: A summary containing all validation results,
            pass/fail status, and counts.

    Raises:
        ValueError: If source is not set (use @require_set_source decorator).
        ValidationException: If mode is FAIL and any validator fails.

    Example:
        >>> from seeknal.feature_validation.validators import NullValidator, RangeValidator
        >>> from seeknal.feature_validation.models import ValidationMode
        >>>
        >>> # Create validators
        >>> validators = [
        ...     NullValidator(columns=["user_id", "email"]),
        ...     RangeValidator(column="age", min_val=0, max_val=120)
        ... ]
        >>>
        >>> # Run validation in warn mode (continues on failures)
        >>> summary = feature_group.validate(validators, mode=ValidationMode.WARN)
        >>> print(f"Passed: {summary.passed}, Failed: {summary.failed_count}")
        >>>
        >>> # Run validation in fail mode (stops on first failure)
        >>> try:
        ...     summary = feature_group.validate(validators, mode="fail")
        ... except ValidationException as e:
        ...     print(f"Validation failed: {e.message}")
    """
    # Convert string mode to ValidationMode enum if necessary
    if isinstance(mode, str):
        mode = ValidationMode(mode.lower())

    # Get the DataFrame from source
    if isinstance(self.source, Flow):
        df = self.source.run(date=reference_date)
    elif isinstance(self.source, DataFrame):
        df = self.source
    else:
        raise ValueError("Source must be a Flow or DataFrame")

    # Create validation runner and execute validators
    runner = ValidationRunner(
        validators=validators,
        mode=mode,
        feature_group_name=self.name,
    )

    # Run validation and return summary
    summary = runner.run(df)

    logger.info(
        f"Feature group '{self.name}' validation complete: "
        f"{summary.passed_count}/{summary.total_validators} validators passed"
    )

    return summary
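In FAIL mode, validation stops at the first failing validator; in WARN mode, every result is collected and execution continues. A minimal, self-contained sketch of these semantics (the `Mode`, `Result`, and `run_validators` names below are illustrative stand-ins, not seeknal's actual classes):

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List

class Mode(Enum):
    FAIL = "fail"
    WARN = "warn"

@dataclass
class Result:
    name: str
    passed: bool

def run_validators(rows: List[dict],
                   checks: List[Callable[[List[dict]], Result]],
                   mode: Mode) -> List[Result]:
    results = []
    for check in checks:
        result = check(rows)
        results.append(result)
        if not result.passed and mode is Mode.FAIL:
            # FAIL mode: raise on the first failing validator
            raise ValueError(f"validation failed: {result.name}")
    return results  # WARN mode: all results collected, failures included

rows = [{"user_id": 1, "age": 30}, {"user_id": None, "age": 250}]
not_null = lambda rs: Result("null:user_id",
                             all(r["user_id"] is not None for r in rs))
in_range = lambda rs: Result("range:age",
                             all(0 <= r["age"] <= 120 for r in rs))

summary = run_validators(rows, [not_null, in_range], Mode.WARN)
print(sum(r.passed for r in summary), "of", len(summary), "passed")  # 0 of 2 passed
```

Running the same checks with `Mode.FAIL` would instead raise on the null check, mirroring the ValidationException behavior described above.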
set_features(features: Optional[List[Feature]] = None, reference_date: Optional[str] = None)

Set the features to be used for this feature group. If features is None, all columns except the entity join keys and the event-time column are registered as features.

PARAMETER DESCRIPTION
features

The features to register. If None, features are derived automatically from the transformation result. You may also supply only a feature name and description; the data type is then fetched from the transformation result. Defaults to None.

TYPE: Optional[List[Feature]] DEFAULT: None

reference_date

Reference date used when running the transformation to derive features. Defaults to None.

TYPE: Optional[str] DEFAULT: None

RAISES DESCRIPTION
ValueError

If a specified feature is not found in the transformation result.

RETURNS DESCRIPTION

FeatureGroup: The current instance with its features populated.

Source code in src/seeknal/featurestore/feature_group.py
@require_set_source
@require_set_entity
def set_features(
    self,
    features: Optional[List[Feature]] = None,
    reference_date: Optional[str] = None,
):
    """
    Set features to be used for this feature group. If features set as None,
    then it will use all columns except join_keys and event_time as features

    Args:
        features (Optional[List[Feature]], optional): Specify features. If this None,
            then automatically get features from transformation result. In addition,
            user may tell the feature name and description, then the detail about datatype
            automatically fetch from transformation result. Defaults to None.
        reference_date (Optional[str], optional): Specify date can be used as reference for
            get features from the transformation. Defaults to None.
        validate_with_source (bool, optional): If set as true, it won't validate with
            transformation result. Defaults to True.

    Raises:
        ValueError: If specify features not found from the transformation result

    Returns:
        Populate features of the feature group
    """
    if self.source is None:
        raise ValueError("Source is not set")

    reserved_cols = []
    _features = None
    for z in self.entity.join_keys:
        reserved_cols.append(z)

    if self.materialization.event_time_col is not None:
        reserved_cols.append(self.materialization.event_time_col)
    # if features not known yet, then need to load the data first
    # for getting list of features
    if features is None:
        # logger.info("Using all columns except entity join_key and event_time columns.")
        if isinstance(self.source, Flow):
            res = self.source.run(date=reference_date).drop(*reserved_cols)
        elif isinstance(self.source, DataFrame):
            res = self.source.drop(*reserved_cols)
        else:
            raise ValueError("Source only accepts Flow or DataFrame.")
        _avro_schema = json.loads(
            self._jvm_gateway.za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils.toAvroSchema(
                res._jdf
            ).toString()
        )

        if _avro_schema is None:
            raise ValueError(
                f"Feature not found in the source. Please make sure features are available from source."
            )
        _features = self._parse_avro_schema(schema=_avro_schema)
    else:
        feature_names = [f.name for f in features]
        metadata = {}
        if self.features is None:
            for k in features:
                metadata[k.name] = {
                    "description": k.description,
                    **Feature(name=k.name).model_dump(exclude={"description"}),
                }
        else:
            for k in features:
                for f in self.features:
                    if k.name == f.name:
                        metadata[k.name] = f.model_dump()

        selections = feature_names + reserved_cols
        if isinstance(self.source, Flow):
            res = (
                self.source.run(date=reference_date)
                .select(*selections)
                .drop(*reserved_cols)
            )
        elif isinstance(self.source, DataFrame):
            res = self.source.select(*selections).drop(*reserved_cols)
        else:
            raise ValueError("Source only accepts Flow or DataFrame.")
        _avro_schema = json.loads(
            self._jvm_gateway.za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils.toAvroSchema(
                res._jdf
            ).toString()
        )
        if _avro_schema is None:
            raise ValueError(
                "Cannot parse schema from the source. Please make sure features are available from source."
            )
        _features = self._parse_avro_schema(schema=_avro_schema)
        for k in _features:
            k.description = metadata[k.name]["description"]
            k.feature_id = metadata[k.name]["feature_id"]
            k.created_at = metadata[k.name]["created_at"]
            k.updated_at = metadata[k.name]["updated_at"]

    self.features = _features
    self.avro_schema = _avro_schema

    return self
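The selection logic above — drop the entity join keys and the event-time column, then read the remaining fields from the Avro schema — can be sketched standalone. The schema below is hypothetical; in seeknal it is derived from the Spark DataFrame via ABRiS:

```python
# Standalone sketch of the column-selection logic behind set_features():
# reserved columns (join keys + event-time column) are excluded, and the
# remaining Avro record fields become the feature list.
avro_schema = {
    "type": "record",
    "name": "user_features",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "event_date", "type": {"type": "int", "logicalType": "date"}},
        {"name": "txn_count_7d", "type": "long"},
        {"name": "avg_spend_30d", "type": "double"},
    ],
}

join_keys = ["user_id"]          # from the entity
event_time_col = "event_date"    # from the materialization config
reserved_cols = set(join_keys) | {event_time_col}

features = [
    (f["name"], f["type"])
    for f in avro_schema["fields"]
    if f["name"] not in reserved_cols
]
print(features)  # [('txn_count_7d', 'long'), ('avg_spend_30d', 'double')]
```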
get_or_create(version=None)

The get_or_create function retrieves an existing feature group or creates a new one based on the provided parameters.

PARAMETER DESCRIPTION
version

Optional version of the feature group to retrieve. If a version is provided, that specific version is loaded; otherwise the latest version of the feature group is loaded.

DEFAULT: None

RETURNS DESCRIPTION

The current FeatureGroup instance with its attributes updated.

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_project
def get_or_create(self, version=None):
    """
    The `get_or_create` function retrieves an existing feature group or creates a new one based on the
    provided parameters.

    Args:
      version: The `version` parameter is an optional argument that specifies the version of the feature
    group to retrieve. If a version is provided, the code will load the feature group with that specific
    version. If no version is provided, the code will load the latest version of the feature group.

    Returns:
      The method `get_or_create` returns the instance of the class `self` after performing some
    operations and updating its attributes.
    """

    materialization_params = self.materialization.model_dump(exclude_none=False)
    materialization_params["offline_materialization"]["store"] = (
        asdict(self.materialization.offline_materialization.store)
        if self.materialization.offline_materialization.store is not None
        else None
    )
    materialization_params["online_materialization"]["store"] = (
        asdict(self.materialization.online_materialization.store)
        if self.materialization.online_materialization.store is not None
        else None
    )

    body = {
        "name": self.name,
        "project_id": context.project_id,
        "description": ("" if self.description is None else self.description),
        "offline": self.materialization.offline,
        "online": self.materialization.online,
        "materialization_params": materialization_params,
    }

    feature_group = FeatureGroupRequest.select_by_name(self.name)
    if feature_group is None:
        if self.source is None:
            raise ValueError("source is not set")
        if self.entity is None:
            raise ValueError("Entity is not set")
        if self.features is None:
            raise ValueError("Features are not set")

        if isinstance(self.source, Flow):
            body["flow_id"] = self.source.flow_id
        else:
            body["flow_id"] = None

        req = FeatureGroupRequest(
            body={
                **body,
                "entity_id": self.entity.entity_id,
                "features": [f.to_dict() for f in self.features],
                "avro_schema": self.avro_schema,
            }
        )

        (
            self.feature_group_id,
            features,
            version_obj,
            offline_store_id,
            online_store_id,
        ) = req.save()
        self.version = version_obj.version
        for i in features:
            for j in self.features:
                if i["metadata"]["name"] == j.name:
                    j.feature_id = i["feature_id"]
                    j.created_at = pendulum.instance(i["created_at"]).format(
                        "YYYY-MM-DD HH:mm:ss"
                    )
                    j.updated_at = pendulum.instance(i["updated_at"]).format(
                        "YYYY-MM-DD HH:mm:ss"
                    )
                    break
        self.offline_store_id = offline_store_id
        self.online_store_id = online_store_id

    else:
        logger.warning("Feature group already exists. Loading the feature group.")
        if version is None:
            versions = FeatureGroupRequest.select_version_by_feature_group_id(
                feature_group.id
            )
            self.version = versions[0].version
            version_id = versions[0].id
            self.avro_schema = json.loads(versions[0].avro_schema)
        else:
            version_obj = (
                FeatureGroupRequest.select_by_feature_group_id_and_version(
                    feature_group.id, version
                )
            )
            if version_obj is None:
                raise ValueError(f"Version {version} not found.")
            self.version = version
            version_id = version_obj.id
            self.avro_schema = json.loads(version_obj.avro_schema)

        offline_watermarks = FeatureGroupRequest.select_watermarks_by_version_id(
            feature_group.id, version_id
        )
        if offline_watermarks is not None:
            self.offline_watermarks = list(
                map(
                    lambda x: pendulum.instance(x.date).format(
                        "YYYY-MM-DD HH:mm:SS"
                    ),
                    offline_watermarks,
                )
            )
        if feature_group.online_watermark is not None:
            self.online_watermarks = feature_group.online_watermark.split(",")

        if feature_group.flow_id is not None:
            flow = FlowRequest.select_by_id(feature_group.flow_id)
            self.source = Flow(name=flow.name).get_or_create()
        else:
            self.source = None
        entity = EntityRequest.select_by_id(feature_group.entity_id)
        self.entity = Entity(name=entity.name).get_or_create()
        self.feature_group_id = feature_group.id
        self.id = feature_group.id

        features = FeatureRequest.select_by_feature_group_id_and_version(
            feature_group.id, self.version
        )
        self.features = []
        for i in features:
            self.features.append(
                Feature(
                    name=i.name,
                    feature_id=str(i.id),
                    description=i.description,
                    data_type=i.datatype,
                    online_data_type=i.online_datatype,
                    created_at=pendulum.instance(i.created_at).format(
                        "YYYY-MM-DD HH:mm:ss"
                    ),
                    updated_at=pendulum.instance(i.updated_at).format(
                        "YYYY-MM-DD HH:mm:ss"
                    ),
                )
            )

        # handle materialization
        for key, value in json.loads(feature_group.materialization_params).items():
            if key == "offline_materialization":
                for k, v in value.items():
                    setattr(self.materialization.offline_materialization, k, v)
            elif key == "online_materialization":
                for k, v in value.items():
                    if k == "store":
                        if v is not None:
                            self.materialization.online_materialization.store = (
                                OnlineStore(
                                    kind=OnlineStoreEnum(v["kind"]), value=v["value"]
                                )
                            )
                        else:
                            self.materialization.online_materialization.store = None
                    else:
                        setattr(self.materialization.online_materialization, k, v)
            else:
                setattr(self.materialization, key, value)
        self.offline_store_id = feature_group.offline_store
        self.online_store_id = feature_group.online_store

    # handling load offline store object
    _offline_store = FeatureGroupRequest.get_offline_store_by_id(
        self.offline_store_id
    )
    self.materialization.offline_materialization.store = OfflineStore(
        kind=OfflineStoreEnum(_offline_store.kind), name=_offline_store.name
    )
    if _offline_store.params == "null":
        self.materialization.offline_materialization.store.value = None
    else:
        value_params = json.loads(_offline_store.params)
        if _offline_store.kind == "file":
            self.materialization.offline_materialization.store.value = (
                FeatureStoreFileOutput(
                    path=value_params["path"],
                    kind=FileKindEnum(value_params["kind"]),
                )
            )
        elif _offline_store.kind == "hive_table":
            self.materialization.offline_materialization.store.value = (
                FeatureStoreHiveTableOutput(database=value_params["database"])
            )
        else:
            self.materialization.offline_materialization.store.value = value_params

    return self
update_materialization(offline: Optional[bool] = None, online: Optional[bool] = None, offline_materialization: Optional[OfflineMaterialization] = None, online_materialization: Optional[OnlineMaterialization] = None)

Update the materialization settings for this feature group.

Modifies the materialization configuration and persists the changes to the feature store backend. This allows changing storage settings, TTL values, and enabling/disabling offline or online storage.

PARAMETER DESCRIPTION
offline

Enable or disable offline storage. If None, keeps current setting.

TYPE: Optional[bool] DEFAULT: None

online

Enable or disable online storage. If None, keeps current setting.

TYPE: Optional[bool] DEFAULT: None

offline_materialization

New offline materialization configuration. If None, keeps current setting.

TYPE: Optional[OfflineMaterialization] DEFAULT: None

online_materialization

New online materialization configuration. If None, keeps current setting.

TYPE: Optional[OnlineMaterialization] DEFAULT: None

RETURNS DESCRIPTION
FeatureGroup

The current instance for method chaining.

Example

>>> fg.update_materialization(
...     online=True,
...     online_materialization=OnlineMaterialization(ttl=2880)
... )

Source code in src/seeknal/featurestore/feature_group.py
def update_materialization(
    self,
    offline: Optional[bool] = None,
    online: Optional[bool] = None,
    offline_materialization: Optional[OfflineMaterialization] = None,
    online_materialization: Optional[OnlineMaterialization] = None,
):
    """Update the materialization settings for this feature group.

    Modifies the materialization configuration and persists the changes
    to the feature store backend. This allows changing storage settings,
    TTL values, and enabling/disabling offline or online storage.

    Args:
        offline: Enable or disable offline storage. If None, keeps current setting.
        online: Enable or disable online storage. If None, keeps current setting.
        offline_materialization: New offline materialization configuration.
            If None, keeps current setting.
        online_materialization: New online materialization configuration.
            If None, keeps current setting.

    Returns:
        FeatureGroup: The current instance for method chaining.

    Example:
        >>> fg.update_materialization(
        ...     online=True,
        ...     online_materialization=OnlineMaterialization(ttl=2880)
        ... )
    """
    if offline is not None:
        self.materialization.offline = offline
    if online is not None:
        self.materialization.online = online
    if offline_materialization is not None:
        self.materialization.offline_materialization = offline_materialization
    if online_materialization is not None:
        self.materialization.online_materialization = online_materialization

    materialization_params = self.materialization.model_dump(exclude_none=False)
    materialization_params["offline_materialization"]["store"] = (
        asdict(self.materialization.offline_materialization.store)
        if self.materialization.offline_materialization.store is not None
        else None
    )
    materialization_params["online_materialization"]["store"] = (
        asdict(self.materialization.online_materialization.store)
        if self.materialization.online_materialization.store is not None
        else None
    )

    body = {
        "offline": self.materialization.offline,
        "online": self.materialization.online,
        "materialization_params": materialization_params,
        "feature_group_id": self.feature_group_id,
    }

    req = FeatureGroupRequest(
        body={
            **body,
        }
    )

    req.update_materialization()

    return self
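The method follows a partial-update pattern: only non-None arguments overwrite the stored settings. A minimal sketch of that pattern (the field names here are illustrative, not seeknal's actual Materialization model):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Settings:
    offline: bool = True
    online: bool = False
    online_ttl: Optional[int] = None

def update(settings: Settings, **changes) -> Settings:
    for field, value in changes.items():
        if value is not None:  # None means "keep the current setting"
            setattr(settings, field, value)
    return settings

s = Settings()
update(s, online=True, online_ttl=2880, offline=None)
print(s)  # Settings(offline=True, online=True, online_ttl=2880)
```

Note that `offline=None` leaves the existing value untouched, which is why all parameters of `update_materialization` default to None.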
list_versions()

List all versions of this feature group.

Returns a list of dictionaries containing version metadata including:
- version: The version number
- avro_schema: The Avro schema for this version (as dict)
- created_at: When the version was created
- updated_at: When the version was last updated
- feature_count: Number of features in this version

RETURNS DESCRIPTION

List[dict]: A list of version metadata dictionaries, ordered by version number descending (latest first). Returns an empty list if the feature group has not been saved or has no versions.

Example

>>> fg = FeatureGroup(name="user_features").get_or_create()
>>> versions = fg.list_versions()
>>> for v in versions:
...     print(f"Version {v['version']}: {v['feature_count']} features")

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_project
def list_versions(self):
    """
    List all versions of this feature group.

    Returns a list of dictionaries containing version metadata including:
    - version: The version number
    - avro_schema: The Avro schema for this version (as dict)
    - created_at: When the version was created
    - updated_at: When the version was last updated
    - feature_count: Number of features in this version

    Returns:
        List[dict]: A list of version metadata dictionaries, ordered by version
            number descending (latest first). Returns an empty list if the
            feature group has not been saved or has no versions.

    Example:
        >>> fg = FeatureGroup(name="user_features").get_or_create()
        >>> versions = fg.list_versions()
        >>> for v in versions:
        ...     print(f"Version {v['version']}: {v['feature_count']} features")
    """
    if not hasattr(self, 'feature_group_id') or self.feature_group_id is None:
        # Feature group not saved yet, return empty list
        return []

    versions = FeatureGroupRequest.select_version_by_feature_group_id(
        self.feature_group_id
    )

    if versions is None:
        return []

    result = []
    for v in versions:
        # Parse avro_schema from JSON string
        try:
            avro_schema = json.loads(v.avro_schema) if v.avro_schema else None
        except (json.JSONDecodeError, TypeError):
            avro_schema = None

        # Get feature count for this version
        features = FeatureRequest.select_by_feature_group_id_and_version(
            self.feature_group_id, v.version
        )
        feature_count = len(features) if features else 0

        result.append({
            "version": v.version,
            "avro_schema": avro_schema,
            "created_at": pendulum.instance(v.created_at).format("YYYY-MM-DD HH:mm:ss") if v.created_at else None,
            "updated_at": pendulum.instance(v.updated_at).format("YYYY-MM-DD HH:mm:ss") if v.updated_at else None,
            "feature_count": feature_count,
        })

    return result
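Because the entries are ordered latest-first, the newest version is simply the first element. A short sketch of consuming this output (the data below is illustrative):

```python
# Consuming list_versions()-style output: entries are ordered by version
# descending, so versions[0] is the newest.
versions = [
    {"version": 3, "feature_count": 12, "created_at": "2024-05-01 09:00:00"},
    {"version": 2, "feature_count": 10, "created_at": "2024-03-10 14:30:00"},
    {"version": 1, "feature_count": 8,  "created_at": "2024-01-02 08:15:00"},
]

latest = versions[0] if versions else None  # guard against the empty-list case
print(latest["version"] if latest else "no versions")  # 3

for v in versions:
    print(f"v{v['version']}: {v['feature_count']} features ({v['created_at']})")
```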
get_version(version: int) -> Optional[dict]

Get metadata for a specific version of this feature group.

PARAMETER DESCRIPTION
version

The version number to retrieve.

TYPE: int

RETURNS DESCRIPTION
Optional[dict]

Optional[dict]: A dictionary containing version metadata if found, None if the version doesn't exist. The dictionary includes:
- version: The version number
- avro_schema: The Avro schema for this version (as dict)
- created_at: When the version was created
- updated_at: When the version was last updated
- feature_count: Number of features in this version

Example

>>> fg = FeatureGroup(name="user_features").get_or_create()
>>> v1 = fg.get_version(1)
>>> if v1:
...     print(f"Version 1 has {v1['feature_count']} features")

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_project
def get_version(self, version: int) -> Optional[dict]:
    """
    Get metadata for a specific version of this feature group.

    Args:
        version (int): The version number to retrieve.

    Returns:
        Optional[dict]: A dictionary containing version metadata if found,
            None if the version doesn't exist. The dictionary includes:
            - version: The version number
            - avro_schema: The Avro schema for this version (as dict)
            - created_at: When the version was created
            - updated_at: When the version was last updated
            - feature_count: Number of features in this version

    Example:
        >>> fg = FeatureGroup(name="user_features").get_or_create()
        >>> v1 = fg.get_version(1)
        >>> if v1:
        ...     print(f"Version 1 has {v1['feature_count']} features")
    """
    if not hasattr(self, 'feature_group_id') or self.feature_group_id is None:
        # Feature group not saved yet, return None
        return None

    version_obj = FeatureGroupRequest.select_by_feature_group_id_and_version(
        self.feature_group_id, version
    )

    if version_obj is None:
        return None

    # Parse avro_schema from JSON string
    try:
        avro_schema = json.loads(version_obj.avro_schema) if version_obj.avro_schema else None
    except (json.JSONDecodeError, TypeError):
        avro_schema = None

    # Get feature count for this version
    features = FeatureRequest.select_by_feature_group_id_and_version(
        self.feature_group_id, version
    )
    feature_count = len(features) if features else 0

    return {
        "version": version_obj.version,
        "avro_schema": avro_schema,
        "created_at": pendulum.instance(version_obj.created_at).format("YYYY-MM-DD HH:mm:ss") if version_obj.created_at else None,
        "updated_at": pendulum.instance(version_obj.updated_at).format("YYYY-MM-DD HH:mm:ss") if version_obj.updated_at else None,
        "feature_count": feature_count,
    }
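The try/except around `json.loads` above is the load-bearing part: a corrupt or empty `avro_schema` column degrades to `None` instead of failing the whole lookup. The same pattern isolated as a helper (`parse_avro_schema` is an illustrative name, not part of the seeknal API):

```python
import json
from typing import Optional


def parse_avro_schema(raw: Optional[str]) -> Optional[dict]:
    """Parse an Avro schema stored as a JSON string; degrade to None on bad input."""
    try:
        return json.loads(raw) if raw else None
    except (json.JSONDecodeError, TypeError):
        return None


# Well-formed schema strings parse to dicts; malformed or missing input yields None.
print(parse_avro_schema('{"type": "record", "name": "user_features", "fields": []}'))
print(parse_avro_schema("not-json"))
print(parse_avro_schema(None))
```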
compare_versions(from_version: int, to_version: int) -> Optional[dict]

Compare schemas between two versions of this feature group.

Identifies added, removed, and modified features between the two versions by comparing their Avro schemas.

PARAMETER DESCRIPTION
from_version

The base version number to compare from.

TYPE: int

to_version

The target version number to compare to.

TYPE: int

RETURNS DESCRIPTION
Optional[dict]

Optional[dict]: A dictionary containing the comparison result, or None if the feature group has not been saved. The dictionary includes:

- from_version: The base version number
- to_version: The target version number
- added: List of field names added in to_version
- removed: List of field names removed in to_version
- modified: List of dicts with field name and type changes

RAISES DESCRIPTION
ValueError

If from_version equals to_version, or if either version does not exist for this feature group.

Example

>>> fg = FeatureGroup(name="user_features").get_or_create()
>>> diff = fg.compare_versions(1, 2)
>>> if diff:
...     print(f"Added features: {diff['added']}")
...     print(f"Removed features: {diff['removed']}")
...     print(f"Modified features: {diff['modified']}")

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_project
def compare_versions(self, from_version: int, to_version: int) -> Optional[dict]:
    """
    Compare schemas between two versions of this feature group.

    Identifies added, removed, and modified features between the two versions
    by comparing their Avro schemas.

    Args:
        from_version (int): The base version number to compare from.
        to_version (int): The target version number to compare to.

    Returns:
        Optional[dict]: A dictionary containing the comparison result, or
            None if the feature group has not been saved. The dictionary
            includes:
            - from_version: The base version number
            - to_version: The target version number
            - added: List of field names added in to_version
            - removed: List of field names removed in to_version
            - modified: List of dicts with field name and type changes

    Raises:
        ValueError: If from_version equals to_version, or if either version
            does not exist for this feature group.
    Example:
        >>> fg = FeatureGroup(name="user_features").get_or_create()
        >>> diff = fg.compare_versions(1, 2)
        >>> if diff:
        ...     print(f"Added features: {diff['added']}")
        ...     print(f"Removed features: {diff['removed']}")
        ...     print(f"Modified features: {diff['modified']}")
    """
    if from_version == to_version:
        raise ValueError("from_version and to_version must be different")

    if not hasattr(self, 'feature_group_id') or self.feature_group_id is None:
        # Feature group not saved yet, return None
        return None

    # Fetch both versions
    from_version_obj = FeatureGroupRequest.select_by_feature_group_id_and_version(
        self.feature_group_id, from_version
    )
    to_version_obj = FeatureGroupRequest.select_by_feature_group_id_and_version(
        self.feature_group_id, to_version
    )

    # Check if both versions exist
    if from_version_obj is None:
        raise ValueError(f"Version {from_version} not found for this feature group")
    if to_version_obj is None:
        raise ValueError(f"Version {to_version} not found for this feature group")

    # Get avro_schema JSON strings
    from_schema_json = from_version_obj.avro_schema if from_version_obj.avro_schema else "{}"
    to_schema_json = to_version_obj.avro_schema if to_version_obj.avro_schema else "{}"

    # Compare schemas using FeatureGroupRequest.compare_schemas()
    schema_diff = FeatureGroupRequest.compare_schemas(from_schema_json, to_schema_json)

    return {
        "from_version": from_version,
        "to_version": to_version,
        "added": schema_diff.get("added", []),
        "removed": schema_diff.get("removed", []),
        "modified": schema_diff.get("modified", []),
    }
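The heavy lifting here is delegated to `FeatureGroupRequest.compare_schemas()`, which is not shown. A plausible implementation of that diff over two Avro record schemas (a hedged sketch, not the actual seeknal code) compares the `fields` lists by name and type:

```python
import json


def diff_avro_schemas(from_schema_json: str, to_schema_json: str) -> dict:
    """Compare two Avro record schemas, reporting added/removed/modified fields."""
    from_fields = {f["name"]: f["type"] for f in json.loads(from_schema_json).get("fields", [])}
    to_fields = {f["name"]: f["type"] for f in json.loads(to_schema_json).get("fields", [])}
    return {
        "added": sorted(set(to_fields) - set(from_fields)),
        "removed": sorted(set(from_fields) - set(to_fields)),
        "modified": [
            {"name": n, "from_type": from_fields[n], "to_type": to_fields[n]}
            for n in sorted(set(from_fields) & set(to_fields))
            if from_fields[n] != to_fields[n]
        ],
    }


v1 = '{"type": "record", "fields": [{"name": "age", "type": "int"}, {"name": "city", "type": "string"}]}'
v2 = '{"type": "record", "fields": [{"name": "age", "type": "long"}, {"name": "score", "type": "double"}]}'
# age changed type, city was dropped, score was added
print(diff_avro_schemas(v1, v2))
```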
delete()

Delete this feature group and its associated data.

Removes the feature group from the feature store backend along with any data stored in the offline store. This operation is irreversible.

RETURNS DESCRIPTION
FeatureGroupRequest

The request object used to perform the deletion.

RAISES DESCRIPTION
ValueError

If the feature group has not been saved (no feature_group_id).

Note

This requires an active workspace and project context. The feature group must have been previously saved using get_or_create().

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_saved
@require_project
def delete(self):
    """Delete this feature group and its associated data.

    Removes the feature group from the feature store backend along with
    any data stored in the offline store. This operation is irreversible.

    Returns:
        FeatureGroupRequest: The request object used to perform the deletion.

    Raises:
        ValueError: If the feature group has not been saved (no feature_group_id).

    Note:
        This requires an active workspace and project context. The feature
        group must have been previously saved using get_or_create().
    """
    offline_store = self.materialization.offline_materialization.store
    offline_store.delete(
        name=self.name, project=context.project_id, entity=self.entity.entity_id
    )

    return FeatureGroupRequest.delete_by_id(self.feature_group_id)
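The `@require_saved` guard above is what turns a call on an unsaved feature group into a `ValueError` before any deletion happens. A hedged sketch of how such a decorator can be built (the actual seeknal decorators may differ):

```python
import functools


def require_saved(method):
    """Fail fast when the object has no persisted feature_group_id yet."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if getattr(self, "feature_group_id", None) is None:
            raise ValueError(
                f"{type(self).__name__}.{method.__name__}() requires a saved "
                "feature group; call get_or_create() first."
            )
        return method(self, *args, **kwargs)
    return wrapper


class Demo:
    feature_group_id = None

    @require_saved
    def delete(self):
        return "deleted"


d = Demo()
try:
    d.delete()
except ValueError as exc:
    print(exc)          # guard fires: no feature_group_id yet
d.feature_group_id = 42
print(d.delete())       # guard passes once the id is set
```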
write(feature_start_time: Optional[datetime] = None, feature_end_time: Optional[datetime] = None, output_date_pattern: str = 'yyyyMMdd')

Writes the feature group data to the offline store (and to the online store when online materialization is enabled), using the specified feature start and end times and output date pattern.

PARAMETER DESCRIPTION
feature_start_time

The start time for the feature data. If None, the current date is used.

TYPE: Optional[datetime] DEFAULT: None

feature_end_time

The end time for the feature data. If None, all available data is used.

TYPE: Optional[datetime] DEFAULT: None

output_date_pattern

The output date pattern for the feature data.

TYPE: str DEFAULT: 'yyyyMMdd'

RETURNS DESCRIPTION

None

Source code in src/seeknal/featurestore/feature_group.py
@require_workspace
@require_saved
@require_project
def write(
    self,
    feature_start_time: Optional[datetime] = None,
    feature_end_time: Optional[datetime] = None,
    output_date_pattern: str = "yyyyMMdd",
):
    """
    Writes the feature group data to the offline store (and to the online
    store when online materialization is enabled), using the specified
    feature start and end times and output date pattern.

    Args:
        feature_start_time (Optional[datetime]): The start time for the feature data.
            If None, the current date is used.
        feature_end_time (Optional[datetime]): The end time for the feature data.
            If None, all available data is used.
        output_date_pattern (str): The output date pattern for the feature data.

    Returns:
        None
    """

    date_now = pendulum.now(tz="UTC").format("YYYY-MM-DD")
    if isinstance(self.source, Flow):
        fg_flow = deepcopy(self.source)
        if self.materialization.event_time_col is None:
            add_date_column = SparkEngineTask().add_stage(
                class_name="tech.mta.seeknal.transformers.AddColumnByExpr",
                params={"expression": f"'{date_now}'", "outputCol": "__date__"},
            )
            if fg_flow.tasks is not None:
                fg_flow.tasks.append(add_date_column)
            else:
                fg_flow.tasks = [add_date_column]
    elif isinstance(self.source, DataFrame):
        if self.materialization.event_time_col is None:
            fg_flow = self.source.withColumn("__date__", F.lit(date_now))
        else:
            fg_flow = self.source
    else:
        raise ValueError("Source only accepts Flow or DataFrame.")

    date_pattern = "yyyy-MM-dd"
    if (
        self.materialization.event_time_col is not None
        and self.materialization.date_pattern is not None
    ):
        date_pattern = self.materialization.date_pattern

    spark = SparkSession.builder.getOrCreate()
    if self.materialization.event_time_col is not None:
        event_time_col = self.materialization.event_time_col
        if isinstance(fg_flow, Flow):
            fg_flow = fg_flow.set_input_date_col(
                date_col=event_time_col, date_pattern=date_pattern
            )
    else:
        event_time_col = "__date__"
    if isinstance(fg_flow, Flow):
        flow_res = fg_flow.run(
            start_date=feature_start_time, end_date=feature_end_time
        )
    elif isinstance(fg_flow, DataFrame):
        # Check the combined range first; otherwise the single-bound branches
        # would shadow it and feature_end_time would be silently ignored.
        if feature_start_time is not None and feature_end_time is not None:
            flow_res = fg_flow.filter(
                (
                    F.col(event_time_col)
                    >= pendulum.instance(feature_start_time).format(
                        date_pattern.upper()
                    )
                )
                & (
                    F.col(event_time_col)
                    <= pendulum.instance(feature_end_time).format(
                        date_pattern.upper()
                    )
                )
            )
        elif feature_start_time is not None:
            flow_res = fg_flow.filter(
                F.col(event_time_col)
                >= pendulum.instance(feature_start_time).format(
                    date_pattern.upper()
                )
            )
        elif feature_end_time is not None:
            flow_res = fg_flow.filter(
                F.col(event_time_col)
                <= pendulum.instance(feature_end_time).format(date_pattern.upper())
            )
        else:
            flow_res = fg_flow
    flow_res = quinn.snake_case_col_names(flow_res)
    # generate pk given entity join_keys
    flow_res = mack.with_md5_cols(
        flow_res, self.entity.join_keys + [event_time_col], "__pk__"
    )

    # getting the date from the flow result
    if self.materialization.event_time_col is not None:
        _date = (
            SparkEngineTask()
            .add_input(dataframe=flow_res)
            .set_date_col(date_col=event_time_col)
            .get_date_available()
        )
        _date = [
            pendulum.parse(i).format(output_date_pattern.upper()) for i in _date
        ]
        date_available = [
            datetime.fromisoformat(pendulum.parse(i).to_datetime_string())
            for i in _date
        ]
    else:
        date_available = [
            datetime.fromisoformat(pendulum.parse(date_now).to_datetime_string())
        ]

    if self.materialization.offline:
        logger.info("writing to offline-store")

        arr_size = len(self.entity.join_keys) + 1
        arr = spark.sparkContext._gateway.new_array(
            self._jvm_gateway.java.lang.String, arr_size
        )
        for idx, i in enumerate(self.entity.join_keys):
            arr[idx] = i
        arr[arr_size - 1] = "__pk__"

        project_name = ProjectRequest.select_by_id(context.project_id).name
        fs_serialize = (
            self._jvm_gateway.tech.mta.seeknal.connector.serde.FeatureStoreSerDe()
            .setEventTimeCol(event_time_col)
            .setDatePattern(date_pattern)
            .setEntity(self.entity.name)
            .setProject(project_name)
            .setFeatureGroup(self.name)
            .setKeyCols(arr)
            .setSerialize(True)
        )

        res = fs_serialize.transform(flow_res._jdf)
        res_df = DataFrame(res, self._jvm_gateway._wrapped)

        # add watermarks
        version_obj = FeatureGroupRequest.select_by_feature_group_id_and_version(
            self.feature_group_id, self.version
        )
        if version_obj is None:
            raise ValueError(f"Version {self.version} not found.")
        req = FeatureGroupRequest(
            body={
                "feature_group_id": self.feature_group_id,
                "feature_group_version_id": version_obj.id,
            }
        )

        req.add_offline_watermarks(date_available)
        offline_watermarks = [
            w.date
            for w in FeatureGroupRequest.select_watermarks_by_version_id(
                self.feature_group_id, version_obj.id
            )
        ]

        # writing to offline-store
        offline_store = self.materialization.offline_materialization.store
        _start_date = (
            date_now
            if feature_start_time is None
            else pendulum.instance(feature_start_time).format(date_pattern.upper())
        )
        _end_date = (
            "none"
            if feature_end_time is None
            else pendulum.instance(feature_end_time).format(date_pattern.upper())
        )
        offline_mat_result = offline_store(
            result=res_df,
            name=self.name,
            project=context.project_id,
            entity=self.entity.entity_id,
            mode=self.materialization.offline_materialization.mode,
            start_date=_start_date,
            end_date=_end_date,
            version=self.version,
            latest_watermark=max(offline_watermarks),
            ttl=self.materialization.offline_materialization.ttl,
        )

        # Capture Iceberg snapshot metadata
        if offline_mat_result and offline_mat_result.get("storage_type") == "iceberg":
            req.add_iceberg_watermark(
                snapshot_id=offline_mat_result.get("snapshot_id"),
                table=offline_mat_result.get("table"),
                namespace=offline_mat_result.get("namespace"),
                row_count=offline_mat_result.get("num_rows"),
            )
            logger.info(
                f"Iceberg snapshot created: {offline_mat_result.get('snapshot_id')[:8]} "
                f"for table {offline_mat_result.get('namespace')}.{offline_mat_result.get('table')}"
            )

    if self.materialization.online:
        logger.info("Writing to online-store.")
        if self.materialization.offline:
            hist = HistoricalFeatures(lookups=[FeatureLookup(source=self)])
            hist.using_latest().serve(
                target=self.materialization.online_materialization.store,
                ttl=timedelta(days=self.materialization.online_materialization.ttl),
            )
        else:
            flow_res = flow_res.withColumn(
                "event_time", F.to_timestamp(F.col(event_time_col), date_pattern)
            ).drop(event_time_col)
            if self.materialization.online_materialization.ttl is not None:
                _timedelta = timedelta(
                    minutes=self.materialization.online_materialization.ttl
                )
            else:
                _timedelta = None
            OnlineFeatures(
                lookup_key=self.entity,
                lookups=[FeatureLookup(source=self)],
                ttl=_timedelta,
                online_store=self.materialization.online_materialization.store,
                dataframe=flow_res,
            )
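The watermark bookkeeping at the end of `write()` reduces to collecting the distinct event dates present in the output and tracking the maximum as the latest offline watermark. A minimal stand-alone sketch using only `datetime` (no Spark or pendulum; `latest_watermark` is an illustrative name):

```python
from datetime import datetime


def latest_watermark(event_dates, fmt="%Y-%m-%d"):
    """Return the newest event date seen, used as the offline-store watermark."""
    parsed = {datetime.strptime(d, fmt) for d in event_dates}  # dedupe + parse
    return max(parsed)


# Duplicate and out-of-order dates are fine; only the maximum matters.
dates = ["2024-01-03", "2024-01-01", "2024-01-03", "2024-01-02"]
print(latest_watermark(dates))
```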
FeatureLookup dataclass

A class that represents a feature lookup operation in a feature store.

ATTRIBUTE DESCRIPTION
source

The feature store to perform the lookup on.

TYPE: FeatureStore

features

A list of feature names to include in the lookup. If None, all features in the store will be included.

TYPE: Optional[List[str]]

exclude_features

A list of feature names to exclude from the lookup. If None, no features will be excluded.

TYPE: Optional[List[str]]

HistoricalFeatures dataclass

A class for retrieving historical features from a feature store.

ATTRIBUTE DESCRIPTION
lookups

A list of FeatureLookup objects representing the features to retrieve.

TYPE: List[FeatureLookup]

Functions
using_spine(spine: pd.DataFrame, date_col: Optional[str] = None, offset: int = 0, length: Optional[int] = None, keep_cols: Optional[List[str]] = None)

Adds a spine DataFrame to the feature store serving pipeline.

PARAMETER DESCRIPTION
spine

The spine DataFrame to add to the pipeline.

TYPE: DataFrame

date_col

The name of the column containing the date to use for point-in-time joins. If not provided, point-in-time joins will not be performed.

TYPE: str DEFAULT: None

offset

Number of days to shift the reference point for the join. For example, offset=3 with how='past' joins feature dates equal to (and older than) three days before the application date. Defaults to 0.

TYPE: int DEFAULT: 0

length

When how is not 'point in time', limits the period of feature dates to join. Defaults to no limit.

TYPE: int DEFAULT: None

keep_cols

A list of column names to keep from the spine DataFrame. If not provided, no spine columns are kept.

TYPE: List[str] DEFAULT: None

Source code in src/seeknal/featurestore/feature_group.py
def using_spine(
    self,
    spine: pd.DataFrame,
    date_col: Optional[str] = None,
    offset: int = 0,
    length: Optional[int] = None,
    keep_cols: Optional[List[str]] = None,
):
    """
    Adds a spine DataFrame to the feature store serving pipeline.

    Args:
        spine (pd.DataFrame): The spine DataFrame to add to the pipeline.
        date_col (str, optional): The name of the column containing the date to use for point-in-time joins.
            If not provided, point-in-time joins will not be performed.
        offset (int, optional): Number of days to shift the reference point for the join.
            E.g. offset=3 with how='past' joins feature dates equal to (and older than)
            three days before the application date. Defaults to 0.
        length (int, optional): When how is not 'point in time', limits the period of
            feature dates to join. Defaults to no limit.
        keep_cols (List[str], optional): A list of column names to keep from the spine
            DataFrame. If not provided, no spine columns are kept.

    """
    spine_columns = list(spine.keys())
    for i in self.lookup_key.join_keys:
        if i not in spine_columns:
            raise ValueError("Spine DataFrame must contain all join keys")
    spine_df = self.spark.createDataFrame(spine)

    # Validate join_keys before using them in SQL expressions
    for join_key in self.lookup_key.join_keys:
        validate_column_name(join_key)

    if date_col is not None:
        point_in_time = PointInTime(
            spine=spine_df,
            offset=offset,
            length=length,
            feature_date_format="yyyy-MM-dd HH:mm:ss",
            app_date=date_col,
            app_date_format="yyyy-MM-dd",
            col_id=self.lookup_key.join_keys[0],
            spine_col_id=self.lookup_key.join_keys[0],
            keep_cols=keep_cols,
        )
        self.flow.add_stage(transformer=point_in_time)
    else:
        selector = ["a.*"]
        if keep_cols is not None:
            selector += keep_cols
        tables = [
            TableJoinDef(
                table=spine_df,
                joinType=JoinType.INNER,
                alias="b",
                joinExpression="a.{} = b.{}".format(
                    self.lookup_key.join_keys[0], self.lookup_key.join_keys[0]
                ),
            )
        ]
        join = JoinTablesByExpr(tables=tables, select_stm=",".join(selector))
        self.flow.add_stage(transformer=join)
    return self
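The point-in-time semantics applied by the `PointInTime` stage (join each spine row to the newest feature row at or before the application date, shifted back by `offset` days) can be sketched without Spark; all names below are illustrative:

```python
from datetime import date, timedelta


def point_in_time_join(spine, features, offset_days=0):
    """For each (id, app_date) spine row, pick the latest feature value whose
    event_date <= app_date - offset_days for the same id (None if no match)."""
    out = []
    for entity_id, app_date in spine:
        cutoff = app_date - timedelta(days=offset_days)
        candidates = [
            (event_date, value)
            for fid, event_date, value in features
            if fid == entity_id and event_date <= cutoff
        ]
        # max() on (date, value) tuples picks the newest eligible feature row
        out.append((entity_id, app_date, max(candidates)[1] if candidates else None))
    return out


features = [
    ("u1", date(2024, 1, 1), 10),
    ("u1", date(2024, 1, 5), 20),
    ("u2", date(2024, 1, 2), 7),
]
spine = [("u1", date(2024, 1, 6)), ("u2", date(2024, 1, 1))]
result = point_in_time_join(spine, features, offset_days=1)
print(result)
```

Note how u2 gets `None`: its only feature row postdates the cutoff, so no future data leaks into the join.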
to_dataframe(feature_start_time: Optional[datetime] = None, feature_end_time: Optional[datetime] = None) -> DataFrame

Returns a Spark DataFrame containing the transformed feature data within the specified time range.

PARAMETER DESCRIPTION
feature_start_time

The start time of the time range to filter the feature data.

TYPE: Optional[datetime] DEFAULT: None

feature_end_time

The end time of the time range to filter the feature data.

TYPE: Optional[datetime] DEFAULT: None

Source code in src/seeknal/featurestore/feature_group.py
def to_dataframe(
    self,
    feature_start_time: Optional[datetime] = None,
    feature_end_time: Optional[datetime] = None,
) -> DataFrame:
    """
    Returns a Spark DataFrame containing the transformed feature data within the specified time range.

    Args:
        feature_start_time (Optional[datetime]): The start time of the time range to filter the feature data.
        feature_end_time (Optional[datetime]): The end time of the time range to filter the feature data.
    """
    df = self.flow.transform(spark=self.spark)
    return self._filter_by_start_end_time(df, feature_start_time, feature_end_time)
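`_filter_by_start_end_time` is not shown here, but its contract follows the `write()` semantics above: keep rows whose event time falls within the inclusive window, leaving a missing bound open. A plain-Python sketch under that assumption:

```python
from datetime import datetime
from typing import Optional


def filter_by_time_range(rows, start: Optional[datetime] = None, end: Optional[datetime] = None):
    """Keep (event_time, payload) rows inside the inclusive [start, end] window;
    a missing bound leaves that side of the window open."""
    return [
        (ts, payload)
        for ts, payload in rows
        if (start is None or ts >= start) and (end is None or ts <= end)
    ]


rows = [
    (datetime(2024, 1, 1), "a"),
    (datetime(2024, 1, 15), "b"),
    (datetime(2024, 2, 1), "c"),
]
print(filter_by_time_range(rows, start=datetime(2024, 1, 10)))  # drops "a"
print(filter_by_time_range(rows, end=datetime(2024, 1, 31)))    # drops "c"
```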

Functions

validate_database_name(database_name: str) -> str

Validate a SQL database name.

Database names must follow SQL identifier rules: - Start with a letter or underscore - Contain only alphanumeric characters and underscores - Be no longer than 128 characters

PARAMETER DESCRIPTION
database_name

The database name to validate.

TYPE: str

RETURNS DESCRIPTION
str

The validated database name (unchanged if valid).

RAISES DESCRIPTION
InvalidIdentifierError

If the database name is invalid.

Source code in src/seeknal/validation.py
def validate_database_name(database_name: str) -> str:
    """
    Validate a SQL database name.

    Database names must follow SQL identifier rules:
    - Start with a letter or underscore
    - Contain only alphanumeric characters and underscores
    - Be no longer than 128 characters

    Args:
        database_name: The database name to validate.

    Returns:
        The validated database name (unchanged if valid).

    Raises:
        InvalidIdentifierError: If the database name is invalid.
    """
    return validate_sql_identifier(database_name, identifier_type="database name")
validate_table_name(table_name: str) -> str

Validate a SQL table name.

Table names must follow SQL identifier rules: - Start with a letter or underscore - Contain only alphanumeric characters and underscores - Be no longer than 128 characters

PARAMETER DESCRIPTION
table_name

The table name to validate.

TYPE: str

RETURNS DESCRIPTION
str

The validated table name (unchanged if valid).

RAISES DESCRIPTION
InvalidIdentifierError

If the table name is invalid.

Source code in src/seeknal/validation.py
def validate_table_name(table_name: str) -> str:
    """
    Validate a SQL table name.

    Table names must follow SQL identifier rules:
    - Start with a letter or underscore
    - Contain only alphanumeric characters and underscores
    - Be no longer than 128 characters

    Args:
        table_name: The table name to validate.

    Returns:
        The validated table name (unchanged if valid).

    Raises:
        InvalidIdentifierError: If the table name is invalid.
    """
    return validate_sql_identifier(table_name, identifier_type="table name")
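Both helpers delegate to `validate_sql_identifier`. A minimal version enforcing the stated rules (leading letter or underscore, alphanumerics and underscores only, at most 128 characters) might look like the following; the real seeknal implementation may differ:

```python
import re

# One anchored regex covers the first two rules; length is checked separately.
_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


class InvalidIdentifierError(ValueError):
    """Raised when a name is not a safe SQL identifier."""


def validate_sql_identifier(identifier: str, identifier_type: str = "identifier") -> str:
    """Return the identifier unchanged if valid, else raise InvalidIdentifierError."""
    if not identifier or len(identifier) > 128 or not _IDENTIFIER_RE.match(identifier):
        raise InvalidIdentifierError(f"Invalid {identifier_type}: {identifier!r}")
    return identifier


print(validate_sql_identifier("feature_store_db", "database name"))
```

Validating identifiers up front, rather than quoting them, is what lets these names be interpolated into SQL later without injection risk.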
warn_if_insecure_path(path: str, context: Optional[str] = None, logger: Optional[logging.Logger] = None) -> Tuple[bool, Optional[str]]

Log a warning if the path is in an insecure location.

This function checks if the provided path is in a world-writable or otherwise insecure location, and logs a warning with a secure alternative recommendation.

PARAMETER DESCRIPTION
path

The filesystem path to check.

TYPE: str

context

Optional context string describing what the path is used for (e.g., "offline store", "feature store").

TYPE: Optional[str] DEFAULT: None

logger

Optional logger instance. If not provided, uses the module logger.

TYPE: Optional[Logger] DEFAULT: None

RETURNS DESCRIPTION
bool

A tuple of (is_insecure, secure_alternative) where:

Optional[str]
  • is_insecure: True if the path is insecure
Tuple[bool, Optional[str]]
  • secure_alternative: A recommended secure path if insecure, None otherwise

Examples:

>>> is_insecure, alt = warn_if_insecure_path("/tmp/data", "offline store")
>>> is_insecure
True
>>> alt  # Will be something like '/home/user/.seeknal/data'
Source code in src/seeknal/utils/path_security.py
def warn_if_insecure_path(
    path: str,
    context: Optional[str] = None,
    logger: Optional[logging.Logger] = None,
) -> Tuple[bool, Optional[str]]:
    """
    Log a warning if the path is in an insecure location.

    This function checks if the provided path is in a world-writable or
    otherwise insecure location, and logs a warning with a secure alternative
    recommendation.

    Args:
        path: The filesystem path to check.
        context: Optional context string describing what the path is used for
            (e.g., "offline store", "feature store").
        logger: Optional logger instance. If not provided, uses the module logger.

    Returns:
        A tuple of (is_insecure, secure_alternative) where:
        - is_insecure: True if the path is insecure
        - secure_alternative: A recommended secure path if insecure, None otherwise

    Examples:
        >>> is_insecure, alt = warn_if_insecure_path("/tmp/data", "offline store")
        >>> is_insecure
        True
        >>> alt  # Will be something like '/home/user/.seeknal/data'
    """
    if not path:
        return False, None

    if logger is None:
        logger = logging.getLogger("seeknal")

    if is_insecure_path(path):
        secure_alternative = get_secure_path_recommendation(path)
        context_str = f" for {context}" if context else ""

        warning_message = (
            f"Security Warning: Using insecure path '{path}'{context_str}. "
            f"The /tmp directory and other world-writable locations are shared "
            f"among all system users and may expose sensitive data. "
            f"Consider using a secure alternative like '{secure_alternative}' "
            f"or set the SEEKNAL_BASE_CONFIG_PATH environment variable."
        )
        logger.warning(warning_message)

        return True, secure_alternative

    return False, None
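The underlying `is_insecure_path` check is not shown above. A reasonable approximation flags paths under well-known world-writable roots such as `/tmp` and `/var/tmp` (the exact rules in seeknal may differ; this is a sketch under that assumption):

```python
from pathlib import PurePosixPath

INSECURE_PREFIXES = ("/tmp", "/var/tmp", "/dev/shm")


def is_probably_insecure(path: str) -> bool:
    """Heuristic: treat paths at or under world-writable shared roots as insecure."""
    p = PurePosixPath(path)
    return any(
        p == PurePosixPath(root) or PurePosixPath(root) in p.parents
        for root in INSECURE_PREFIXES
    )


print(is_probably_insecure("/tmp/data"))                 # shared root: flagged
print(is_probably_insecure("/home/user/.seeknal/data"))  # user-owned: not flagged
```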

featurestore

Classes

OfflineStoreEnum

Bases: str, Enum

Enumeration of supported offline storage types.

ATTRIBUTE DESCRIPTION
HIVE_TABLE

Store features as a Hive table in a database.

FILE

Store features as files on the filesystem (e.g., Delta format).

ICEBERG

Store features in Apache Iceberg tables with ACID transactions, time travel, and cloud storage compatibility.

OnlineStoreEnum

Bases: str, Enum

Enumeration of supported online storage types.

ATTRIBUTE DESCRIPTION
HIVE_TABLE

Store features as a Hive table for online serving.

FILE

Store features as Parquet files for online serving.

FileKindEnum

Bases: str, Enum

Enumeration of supported file formats for feature storage.

ATTRIBUTE DESCRIPTION
DELTA

Delta Lake format, providing ACID transactions and versioning.

FeatureStoreFileOutput(path: str, kind: FileKindEnum = FileKindEnum.DELTA) dataclass

Configuration for file-based feature store output.

ATTRIBUTE DESCRIPTION
path

The filesystem path for storing feature data. A security warning will be logged if this path is in an insecure location (e.g., /tmp).

TYPE: str

kind

The file format to use (default: DELTA).

TYPE: FileKindEnum

Functions
to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary with 'path' and 'kind' keys.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary with 'path' and 'kind' keys.
    """
    return {"path": self.path, "kind": self.kind.value}
FeatureStoreHiveTableOutput(database: str) dataclass

Configuration for Hive table-based feature store output.

ATTRIBUTE DESCRIPTION
database

The Hive database name where features will be stored.

TYPE: str

Functions
to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary with 'database' key.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary with 'database' key.
    """
    return {"database": self.database}
IcebergStoreOutput(table: str, catalog: str = 'lakekeeper', namespace: str = 'default', warehouse: Optional[str] = None, mode: str = 'append') dataclass

Iceberg storage configuration for feature group materialization.

This configuration enables storing features in Apache Iceberg tables with ACID transactions, time travel, and cloud storage compatibility.

PARAMETER DESCRIPTION
table

Table name within namespace

TYPE: str

catalog

Catalog name from profiles.yml (default: "lakekeeper")

TYPE: str DEFAULT: 'lakekeeper'

namespace

Iceberg namespace/database (default: "default")

TYPE: str DEFAULT: 'default'

warehouse

Optional warehouse path override (s3://, gs://, azure://)

TYPE: Optional[str] DEFAULT: None

mode

Write mode - "append" or "overwrite" (default: "append")

TYPE: str DEFAULT: 'append'

Functions
to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary with all Iceberg configuration keys.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary with all Iceberg configuration keys.
    """
    return {
        "catalog": self.catalog,
        "warehouse": self.warehouse,
        "namespace": self.namespace,
        "table": self.table,
        "mode": self.mode,
    }
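All three output configurations serialize to plain dicts via `to_dict()`, which makes them easy to persist as store params and reconstruct later. A stand-alone replica of the Iceberg variant illustrates the round trip (field names mirror the docs above; this is not the seeknal class itself):

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class IcebergStoreOutputSketch:
    table: str
    catalog: str = "lakekeeper"
    namespace: str = "default"
    warehouse: Optional[str] = None
    mode: str = "append"

    def to_dict(self):
        return {
            "catalog": self.catalog,
            "warehouse": self.warehouse,
            "namespace": self.namespace,
            "table": self.table,
            "mode": self.mode,
        }


cfg = IcebergStoreOutputSketch(table="user_features", namespace="ml")
params = json.dumps(cfg.to_dict())  # what would be stored as store params
print(params)
restored = IcebergStoreOutputSketch(**json.loads(params))  # round trip
print(restored == cfg)
```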
OfflineStore(value: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput, IcebergStoreOutput]] = None, kind: OfflineStoreEnum = OfflineStoreEnum.HIVE_TABLE, name: Optional[str] = None) dataclass

Configuration for offline feature store storage.

ATTRIBUTE DESCRIPTION
value

The storage configuration (path for FILE, database for HIVE_TABLE, table/catalog settings for ICEBERG). A security warning will be logged if a file path is in an insecure location (e.g., /tmp).

TYPE: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput, IcebergStoreOutput]]

kind

The storage type (FILE, HIVE_TABLE, or ICEBERG).

TYPE: OfflineStoreEnum

name

Optional name for this offline store configuration.

TYPE: Optional[str]

Functions
get_or_create()

Retrieve an existing offline store or create a new one.

If an offline store with the specified name exists, it is retrieved and its configuration is loaded. Otherwise, a new offline store is created with the current configuration.

RETURNS DESCRIPTION
OfflineStore

The current instance with id populated.

Note

The store name defaults to "default" if not specified.

Source code in src/seeknal/featurestore/featurestore.py
def get_or_create(self):
    """Retrieve an existing offline store or create a new one.

    If an offline store with the specified name exists, it is retrieved and
    its configuration is loaded. Otherwise, a new offline store is created
    with the current configuration.

    Returns:
        OfflineStore: The current instance with id populated.

    Note:
        The store name defaults to "default" if not specified.
    """
    if self.name is None:
        name = "default"
    else:
        name = self.name
    offline_store = FeatureGroupRequest.get_offline_store_by_name(self.kind, name)
    if offline_store is None:
        offline_store = FeatureGroupRequest.save_offline_store(
            self.kind, self.value, name
        )
        self.id = offline_store.id
    else:
        self.id = offline_store.id
        self.kind = offline_store.kind
        value_params = json.loads(offline_store.params)
        if self.kind == OfflineStoreEnum.HIVE_TABLE:
            if self.value is not None:
                self.value = FeatureStoreHiveTableOutput(**value_params)
        elif self.kind == OfflineStoreEnum.FILE:
            if self.value is not None:
                self.value = FeatureStoreFileOutput(
                    path=value_params["path"],
                    kind=FileKindEnum(value_params["kind"]),
                )
        elif self.kind == OfflineStoreEnum.ICEBERG:
            if self.value is not None:
                self.value = IcebergStoreOutput(**value_params)
        else:
            self.value = value_params
    return self
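The get-or-create pattern above can be sketched in isolation. This is a minimal, illustrative stand-in: `OfflineStoreSketch` and the in-memory `_REGISTRY` are not part of seeknal; they only mirror the name-defaulting and id-population behavior documented above.

```python
# Minimal sketch of the get-or-create pattern used by OfflineStore.
# _REGISTRY stands in for the metadata store behind FeatureGroupRequest.
from dataclasses import dataclass
from typing import Optional

_REGISTRY: dict[str, int] = {}          # name -> id (illustrative only)
_next_id = iter(range(1, 1_000))

@dataclass
class OfflineStoreSketch:
    name: Optional[str] = None
    id: Optional[int] = None

    def get_or_create(self) -> "OfflineStoreSketch":
        # The store name defaults to "default" if not specified.
        name = self.name if self.name is not None else "default"
        if name not in _REGISTRY:
            _REGISTRY[name] = next(_next_id)   # create
        self.id = _REGISTRY[name]              # retrieve
        return self

a = OfflineStoreSketch().get_or_create()           # creates "default"
b = OfflineStoreSketch().get_or_create()           # retrieves the same store
c = OfflineStoreSketch(name="prod").get_or_create()
print(a.id, b.id, c.id)  # a and b share an id; c gets its own
```

Calling `get_or_create()` twice with the same name is idempotent: the second call loads the stored configuration instead of creating a duplicate.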
list() staticmethod

List all registered offline stores.

Displays a formatted table of all offline stores with their names, kinds, and configuration values. If no stores are found, displays an appropriate message.

RETURNS DESCRIPTION
None

Output is printed to the console.

Source code in src/seeknal/featurestore/featurestore.py
@staticmethod
def list():
    """List all registered offline stores.

    Displays a formatted table of all offline stores with their names,
    kinds, and configuration values. If no stores are found, displays
    an appropriate message.

    Returns:
        None: Output is printed to the console.
    """
    offline_stores = FeatureGroupRequest.get_offline_stores()
    if offline_stores:
        offline_stores = [
            {
                "name": offline_store.name,
                "kind": offline_store.kind,
                "value": offline_store.params,
            }
            for offline_store in offline_stores
        ]
        typer.echo(tabulate(offline_stores, headers="keys", tablefmt="github"))
    else:
        typer.echo("No offline stores found.")
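For reference, the output resembles a GitHub-flavored table. The snippet below approximates it in plain Python (the real method uses `tabulate(..., tablefmt="github")`); the row values are hypothetical examples, not real registry contents.

```python
# Illustrative only: approximates what OfflineStore.list() prints.
rows = [
    {"name": "default", "kind": "FILE", "value": '{"path": "/data/offline"}'},
    {"name": "prod", "kind": "HIVE_TABLE", "value": '{"database": "seeknal"}'},
]

headers = list(rows[0])
widths = [max(len(h), *(len(str(r[h])) for r in rows)) for h in headers]
lines = [
    "| " + " | ".join(h.ljust(w) for h, w in zip(headers, widths)) + " |",
    "|" + "|".join("-" * (w + 2) for w in widths) + "|",
]
for r in rows:
    lines.append(
        "| " + " | ".join(str(r[h]).ljust(w) for h, w in zip(headers, widths)) + " |"
    )
print("\n".join(lines))
```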
delete(spark: Optional[SparkSession] = None, *args, **kwargs) -> bool

Delete storage for a feature group from the offline store.

For FILE type: Deletes the directory containing the Delta table. For HIVE_TABLE type: Drops the Hive table using Spark SQL. For ICEBERG type: Drops the Iceberg table through the DuckDB Iceberg extension.

PARAMETER DESCRIPTION
spark

SparkSession instance used for HIVE_TABLE deletion; if omitted, one is obtained via SparkSession.builder.getOrCreate(). Unused for FILE.

TYPE: Optional[SparkSession] DEFAULT: None

**kwargs

Must include 'project' and 'entity' to construct the table name.

DEFAULT: {}

RETURNS DESCRIPTION
bool

True if deletion was successful or the resource didn't exist; False if the store kind is unknown or the Iceberg configuration is missing or invalid.

TYPE: bool

RAISES DESCRIPTION
ValueError

If required kwargs (project, entity) are missing.

Source code in src/seeknal/featurestore/featurestore.py
def delete(
    self,
    spark: Optional[SparkSession] = None,
    *args,
    **kwargs,
) -> bool:
    """Delete storage for a feature group from the offline store.

    For FILE type: Deletes the directory containing the delta table.
    For HIVE_TABLE type: Drops the Hive table using Spark SQL.

    Args:
        spark: SparkSession instance (required for HIVE_TABLE, optional for FILE).
        **kwargs: Must include 'project' and 'entity' to construct the table name.

    Returns:
        bool: True if deletion was successful or resource didn't exist.

    Raises:
        ValueError: If required kwargs (project, entity) are missing.
    """
    project = kwargs.get("project")
    entity = kwargs.get("entity")

    if project is None or entity is None:
        raise ValueError("Both 'project' and 'entity' are required for deletion")

    # Validate project and entity parameters
    validate_table_name(project)
    validate_table_name(entity)
    table_name = "fg_{}__{}".format(project, entity)

    match self.kind:
        case OfflineStoreEnum.FILE:
            if self.value is None:
                base_path = CONFIG_BASE_URL
                path = os.path.join(base_path, "data", table_name)
            else:
                if isinstance(self.value, FeatureStoreFileOutput):
                    base_path = self.value.path
                elif isinstance(self.value, dict):
                    base_path = self.value.get("path", CONFIG_BASE_URL)
                else:
                    base_path = self.value
                path = os.path.join(base_path, table_name)

            if os.path.exists(path):
                shutil.rmtree(path)
                logger.info(f"Deleted offline store files at: {path}")
            else:
                logger.info(f"Offline store path does not exist: {path}")
            return True

        case OfflineStoreEnum.HIVE_TABLE:
            if spark is None:
                spark = SparkSession.builder.getOrCreate()

            if self.value is None:
                database = "seeknal"
            elif isinstance(self.value, FeatureStoreHiveTableOutput):
                database = self.value.database
            elif isinstance(self.value, dict):
                database = self.value.get("database", "seeknal")
            else:
                database = "seeknal"

            # Validate database name before use in SQL
            validate_database_name(database)

            full_table_name = "{}.{}".format(database, table_name)
            if spark.catalog.tableExists(full_table_name):
                spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
                logger.info(f"Dropped Hive table: {full_table_name}")
            else:
                logger.info(f"Hive table does not exist: {full_table_name}")
            return True

        case OfflineStoreEnum.ICEBERG:
            # Iceberg table deletion
            if self.value is None:
                logger.warning("Iceberg configuration required for deletion")
                return False

            # Get configuration
            if isinstance(self.value, IcebergStoreOutput):
                iceberg_config = self.value
            elif isinstance(self.value, dict):
                iceberg_config = IcebergStoreOutput(**self.value)
            else:
                logger.warning(f"Invalid Iceberg configuration type: {type(self.value)}")
                return False

            # Import Iceberg operations
            from seeknal.workflow.materialization.operations import DuckDBIcebergExtension
            from seeknal.workflow.materialization.profile_loader import ProfileLoader
            from seeknal.workflow.materialization.config import MaterializationConfig, ConfigurationError

            # Load profile for catalog configuration
            profile_loader = ProfileLoader()
            try:
                profile_config = profile_loader.load_profile()
            except (ConfigurationError, Exception) as e:
                logger.warning(f"Could not load materialization profile: {e}. Using defaults.")
                profile_config = MaterializationConfig()

            # Get catalog configuration
            catalog_uri = profile_config.catalog.uri if profile_config.catalog.uri else ""
            warehouse_path = (
                iceberg_config.warehouse or
                profile_config.catalog.warehouse
            )
            bearer_token = profile_config.catalog.bearer_token

            # Validate required configuration
            if not catalog_uri or not warehouse_path:
                logger.warning("Catalog URI and warehouse path required for Iceberg deletion")
                return False

            # Validate table name
            validate_table_name(iceberg_config.table)
            validate_table_name(iceberg_config.namespace)

            # Create DuckDB connection and drop table
            import duckdb
            con = duckdb.connect(":memory:")

            try:
                # Load Iceberg extension
                DuckDBIcebergExtension.load_extension(con)

                # Setup REST catalog
                catalog_name = "seeknal_catalog"
                DuckDBIcebergExtension.create_rest_catalog(
                    con=con,
                    catalog_name=catalog_name,
                    uri=catalog_uri,
                    warehouse_path=warehouse_path,
                    bearer_token=bearer_token,
                )

                # Create table reference and drop
                table_ref = f"{catalog_name}.{iceberg_config.namespace}.{iceberg_config.table}"
                con.execute(f"DROP TABLE IF EXISTS {table_ref}")
                logger.info(f"Dropped Iceberg table: {table_ref}")
                return True

            except Exception as e:
                logger.error(f"Failed to drop Iceberg table: {e}")
                return False
            finally:
                con.close()

        case _:
            logger.warning(f"Unknown offline store kind: {self.kind}")
            return False
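The FILE branch's bookkeeping is easy to reproduce: the table name is `fg_<project>__<entity>`, and with no explicit store value the data lives under `<CONFIG_BASE_URL>/data/`. The sketch below uses a temporary directory as a stand-in for the configured base path; the project and entity names are illustrative.

```python
# Sketch of the FILE branch of OfflineStore.delete().
import os
import shutil
import tempfile

CONFIG_BASE_URL = tempfile.mkdtemp()  # stand-in for the configured base dir

project, entity = "demo", "customer"
table_name = f"fg_{project}__{entity}"
path = os.path.join(CONFIG_BASE_URL, "data", table_name)

os.makedirs(path, exist_ok=True)   # pretend a Delta table was materialized here
shutil.rmtree(path)                # what delete() does for FILE stores
print(table_name, os.path.exists(path))
```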
OnlineStore(value: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput]] = None, kind: OnlineStoreEnum = OnlineStoreEnum.FILE, name: Optional[str] = None) dataclass

Configuration for online feature store.

ATTRIBUTE DESCRIPTION
value

The storage configuration (file path or hive table). A security warning will be logged if a file path is in an insecure location (e.g., /tmp).

TYPE: Optional[Union[str, FeatureStoreFileOutput, FeatureStoreHiveTableOutput]]

kind

Type of online store (FILE or HIVE_TABLE).

TYPE: OnlineStoreEnum

name

Optional name for the store.

TYPE: Optional[str]

Functions
delete(*args, **kwargs)

Delete feature data from the online store.

Removes the directory containing the feature data for the specified project and feature name.

PARAMETER DESCRIPTION
*args

Additional positional arguments (unused).

DEFAULT: ()

**kwargs

Keyword arguments including:
- project (str): Project name for file naming.
- name (str): Feature name for file naming.

DEFAULT: {}

RETURNS DESCRIPTION
bool

True if the deletion was successful or if the path did not exist.

Source code in src/seeknal/featurestore/featurestore.py
def delete(self, *args, **kwargs):
    """Delete feature data from the online store.

    Removes the directory containing the feature data for the specified
    project and feature name.

    Args:
        *args: Additional positional arguments (unused).
        **kwargs: Keyword arguments including:
            - project (str): Project name for file naming.
            - name (str): Feature name for file naming.

    Returns:
        bool: True if the deletion was successful or if the path
            did not exist.
    """
    match self.kind:
        case OnlineStoreEnum.FILE:
            name = kwargs.get("name")
            file_name_complete = "fs_{}__{}".format(kwargs["project"], name)

            if self.value == "null" or self.value is None:
                base_path = CONFIG_BASE_URL
                path = os.path.join(base_path, "data", file_name_complete)
            else:
                if isinstance(self.value, FeatureStoreFileOutput):
                    base_path = self.value.path
                elif isinstance(self.value, dict):
                    base_path = self.value.get("path", CONFIG_BASE_URL)
                else:
                    base_path = self.value
                path = os.path.join(base_path, file_name_complete)
            if os.path.exists(path):
                shutil.rmtree(path)
            return True
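The online-store path follows the same convention with an `fs_` prefix: `fs_<project>__<name>`, falling back to `<CONFIG_BASE_URL>/data/` when no store value is configured. A minimal sketch of the resolution logic (the base directory and feature names are hypothetical):

```python
# Illustrative: how OnlineStore.delete() resolves the directory to remove.
import os

CONFIG_BASE_URL = "/srv/seeknal"  # hypothetical base dir

def resolve_path(value, project, name):
    file_name = f"fs_{project}__{name}"
    if value is None or value == "null":
        return os.path.join(CONFIG_BASE_URL, "data", file_name)
    return os.path.join(value["path"], file_name)

print(resolve_path(None, "demo", "churn_features"))
print(resolve_path({"path": "/mnt/online"}, "demo", "churn_features"))
```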
FillNull

Bases: BaseModel

Configuration for filling null values in columns.

ATTRIBUTE DESCRIPTION
value

Value to use for filling nulls.

TYPE: str

dataType

Data type for the value (e.g., 'double', 'string').

TYPE: str

columns

Optional list of columns to fill. If None, applies to all columns.

TYPE: Optional[List[str]]

Functions
to_dict()

Convert the configuration to a dictionary representation.

RETURNS DESCRIPTION
dict

Dictionary containing non-None values from the model.

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the configuration to a dictionary representation.

    Returns:
        dict: Dictionary containing non-None values from the model.
    """
    return {k: v for k, v in self.model_dump().items() if v is not None}
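Because `to_dict()` drops None-valued fields, an unset `columns` is simply omitted from the output. A runnable stand-in (the real `FillNull` is a pydantic `BaseModel`; `FillNullSketch` below is a plain dataclass used only so the example is self-contained):

```python
# Stand-in for FillNull: to_dict() drops fields whose value is None.
from dataclasses import dataclass, asdict
from typing import List, Optional

@dataclass
class FillNullSketch:
    value: str
    dataType: str
    columns: Optional[List[str]] = None

    def to_dict(self):
        return {k: v for k, v in asdict(self).items() if v is not None}

print(FillNullSketch(value="0.0", dataType="double").to_dict())
# -> {'value': '0.0', 'dataType': 'double'}   (columns omitted)
print(FillNullSketch(value="n/a", dataType="string", columns=["city"]).to_dict())
```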
OfflineMaterialization

Bases: BaseModel

Configuration for offline materialization.

ATTRIBUTE DESCRIPTION
store

The offline store configuration.

TYPE: Optional[OfflineStore]

mode

Write mode ('overwrite', 'append', 'merge').

TYPE: str

ttl

Time-to-live in days for data retention.

TYPE: Optional[int]

OnlineMaterialization

Bases: BaseModel

Configuration for online materialization.

ATTRIBUTE DESCRIPTION
store

The online store configuration.

TYPE: Optional[OnlineStore]

ttl

Time-to-live in minutes for online store data.

TYPE: Optional[int]
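For the online store the TTL is in minutes: a serving row is considered stale once more than `ttl` minutes have elapsed since it was written. An illustrative expiry check with fixed timestamps (how seeknal enforces expiry is not shown here):

```python
# Hedged sketch: a minute-based TTL for online feature rows.
from datetime import datetime, timedelta

ttl_minutes = 15
written_at = datetime(2024, 6, 30, 12, 0)
now = datetime(2024, 6, 30, 12, 20)

expired = now - written_at > timedelta(minutes=ttl_minutes)
print(expired)  # True: 20 minutes elapsed, TTL is 15
```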

Feature

Bases: BaseModel

Define a Feature.

ATTRIBUTE DESCRIPTION
name

Feature name.

TYPE: str

feature_id

Feature ID (assigned by seeknal).

TYPE: Optional[str]

description

Feature description.

TYPE: Optional[str]

data_type

Data type for the feature.

TYPE: Optional[str]

online_data_type

Data type when stored in online-store.

TYPE: Optional[str]

created_at

Creation timestamp.

TYPE: Optional[str]

updated_at

Last update timestamp.

TYPE: Optional[str]

Functions
to_dict()

Convert the feature definition to a dictionary representation.

Creates a dictionary suitable for API requests with metadata and data type information.

RETURNS DESCRIPTION
dict

Dictionary with structure:
- metadata: dict containing 'name' and optionally 'description'
- datatype: The feature's data type
- onlineDatatype: The feature's online store data type

Source code in src/seeknal/featurestore/featurestore.py
def to_dict(self):
    """Convert the feature definition to a dictionary representation.

    Creates a dictionary suitable for API requests with metadata and
    data type information.

    Returns:
        dict: Dictionary with structure:
            - metadata: dict containing 'name' and optionally 'description'
            - datatype: The feature's data type
            - onlineDatatype: The feature's online store data type
    """
    _dict = {
        "metadata": {"name": self.name},
        "datatype": self.data_type,
        "onlineDatatype": self.online_data_type,
    }
    if self.description is not None:
        _dict["metadata"]["description"] = self.description
    return _dict
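The resulting payload shape is easy to see with a self-contained copy of the logic. `FeatureSketch` below is a plain dataclass standing in for the pydantic `Feature` model; the feature name and description are made up for illustration.

```python
# Runnable copy of Feature.to_dict()'s payload shape.
from dataclasses import dataclass
from typing import Optional

@dataclass
class FeatureSketch:
    name: str
    description: Optional[str] = None
    data_type: Optional[str] = None
    online_data_type: Optional[str] = None

    def to_dict(self):
        _dict = {
            "metadata": {"name": self.name},
            "datatype": self.data_type,
            "onlineDatatype": self.online_data_type,
        }
        if self.description is not None:
            _dict["metadata"]["description"] = self.description
        return _dict

f = FeatureSketch(name="avg_txn_7d", description="7-day average", data_type="double")
print(f.to_dict())
```

Note that `description` is nested under `metadata` only when set, while `datatype` and `onlineDatatype` are always present (possibly as None).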
FeatureStore(name: str, entity: Optional[Entity] = None, id: Optional[str] = None) dataclass

Bases: ABC

Abstract base class for feature stores.

ATTRIBUTE DESCRIPTION
name

Feature store name.

TYPE: str

entity

Associated entity.

TYPE: Optional[Entity]

id

Feature store ID.

TYPE: Optional[str]

Functions
get_or_create() abstractmethod

Retrieve an existing feature store or create a new one.

This abstract method must be implemented by subclasses to handle the creation or retrieval of feature store instances.

RETURNS DESCRIPTION
FeatureStore

The feature store instance.

Source code in src/seeknal/featurestore/featurestore.py
@abstractmethod
def get_or_create(self):
    """Retrieve an existing feature store or create a new one.

    This abstract method must be implemented by subclasses to handle
    the creation or retrieval of feature store instances.

    Returns:
        FeatureStore: The feature store instance.
    """
    pass
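Subclassing the ABC looks like this in outline. `InMemoryStore` and its id scheme are purely illustrative, not part of seeknal; the point is only that a concrete store must implement `get_or_create()`.

```python
# Minimal sketch of implementing the FeatureStore ABC.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional

@dataclass
class FeatureStoreBase(ABC):
    name: str
    id: Optional[str] = None

    @abstractmethod
    def get_or_create(self):
        ...

@dataclass
class InMemoryStore(FeatureStoreBase):
    def get_or_create(self):
        if self.id is None:
            self.id = f"store-{self.name}"   # pretend registration
        return self

store = InMemoryStore(name="default").get_or_create()
print(store.id)  # store-default
```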

Functions