State Backends Guide¶
Overview¶
State backends provide pluggable storage for execution state, supporting both file-based and database storage with atomic transactions and locking.
Available Backends¶
File Backend (Default)¶
- Path: target/run_state.json
- Use case: Single-node deployments, development
- Pros: Simple, no dependencies
- Cons: Not suitable for distributed execution
Database Backend¶
- Path: target/state.db (SQLite/Turso)
- Use case: Production, distributed execution
- Pros: Concurrent access, transactional integrity
- Cons: Requires database setup
Configuration¶
Set backend in seeknal_project.yml:
Or via environment variable:
File Backend¶
Default storage using JSON files:
{
"run_id": "run_20240110_120000",
"started_at": "2024-01-10T12:00:00Z",
"nodes": {
"my_node": {
"run_id": "run_20240110_120000",
"node_id": "my_node",
"status": "success",
"duration_ms": 1500,
"row_count": 1000,
"fingerprint": "abc123..."
}
}
}
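Because the file backend is plain JSON, it can be inspected with nothing more than the standard library. The following is a minimal sketch, assuming the layout shown above; `summarize_state` is a hypothetical helper written for illustration, not part of Seeknal.

```python
import json

# Sample document mirroring the structure shown above
SAMPLE_STATE = """
{
  "run_id": "run_20240110_120000",
  "started_at": "2024-01-10T12:00:00Z",
  "nodes": {
    "my_node": {
      "run_id": "run_20240110_120000",
      "node_id": "my_node",
      "status": "success",
      "duration_ms": 1500,
      "row_count": 1000,
      "fingerprint": "abc123..."
    }
  }
}
"""


def summarize_state(raw: str) -> dict:
    """Hypothetical helper: run id, per-node status, and total row count."""
    state = json.loads(raw)
    nodes = state["nodes"]
    return {
        "run_id": state["run_id"],
        "statuses": {node_id: node["status"] for node_id, node in nodes.items()},
        "total_rows": sum(node.get("row_count", 0) for node in nodes.values()),
    }


summary = summarize_state(SAMPLE_STATE)
print(summary)
```

In a real project the same function could be fed the contents of target/run_state.json instead of the inline sample.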
Database Backend¶
SQLite or Turso-based storage with tables:
- runs: Run metadata
- node_states: Node execution status
- completed_intervals: Interval tracking
- partitions: Partition tracking
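To make the table layout more concrete, here is a rough sketch of what such a schema could look like in SQLite. Only the four table names come from this guide; every column name below is an assumption modelled on the fields of the JSON state file, and the actual Seeknal schema may differ.

```python
import sqlite3

# Hypothetical schema: table names are from the list above,
# column names are assumptions based on the JSON state fields.
SCHEMA = """
CREATE TABLE runs (
    run_id     TEXT PRIMARY KEY,
    status     TEXT,
    started_at TEXT
);
CREATE TABLE node_states (
    run_id      TEXT REFERENCES runs(run_id),
    node_id     TEXT,
    status      TEXT,
    duration_ms INTEGER,
    row_count   INTEGER,
    fingerprint TEXT,
    PRIMARY KEY (run_id, node_id)
);
CREATE TABLE completed_intervals (
    node_id        TEXT,
    interval_start TEXT,
    interval_end   TEXT
);
CREATE TABLE partitions (
    node_id       TEXT,
    partition_key TEXT
);
"""

conn = sqlite3.connect(":memory:")  # throwaway database for the demo
conn.executescript(SCHEMA)
conn.execute(
    "INSERT INTO runs VALUES (?, ?, ?)",
    ("run_20240110_120000", "success", "2024-01-10T12:00:00Z"),
)
conn.execute(
    "INSERT INTO node_states VALUES (?, ?, ?, ?, ?, ?)",
    ("run_20240110_120000", "my_node", "success", 1500, 1000, "abc123..."),
)
conn.commit()

# List the tables and read one node state back
tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
node_row = conn.execute(
    "SELECT status, row_count FROM node_states WHERE node_id = ?", ("my_node",)
).fetchone()
print(tables, node_row)
```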
Using SQLite¶
Using Turso (Remote)¶
Migration¶
Migrate File to Database¶
# Preview migration (dry run is the default)
seeknal migrate-state --backend database
# Execute migration
seeknal migrate-state --backend database --no-dry-run
This preserves:
- ✅ All run history
- ✅ Node execution states
- ✅ Completed intervals
- ✅ Fingerprints
- ✅ Row counts
Rollback¶
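A backup is created automatically at run_state.json.bak.
To roll back, restore that backup over run_state.json (the command appears under Troubleshooting, Migration Failed).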
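For scripted rollbacks, the restore step can also be done from Python. This is a minimal sketch, assuming the backup sits next to the state file in the target directory; `restore_backup` is a hypothetical helper, not a Seeknal function.

```python
import shutil
import tempfile
from pathlib import Path


def restore_backup(target_dir: Path) -> Path:
    """Hypothetical helper: copy run_state.json.bak over run_state.json."""
    backup = target_dir / "run_state.json.bak"
    state = target_dir / "run_state.json"
    if not backup.exists():
        raise FileNotFoundError(f"no backup found at {backup}")
    shutil.copy2(backup, state)  # the .bak file is left in place
    return state


# Demonstrate against a throwaway directory rather than a real project
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp)
    (target / "run_state.json.bak").write_text('{"run_id": "run_1"}')
    (target / "run_state.json").write_text("corrupted")
    restored_text = restore_backup(target).read_text()

print(restored_text)
```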
API Usage¶
Using the Backend Protocol¶
from datetime import datetime, timedelta

from seeknal.state.backend import create_state_backend

# Create file backend
file_backend = create_state_backend("file", base_path="target")
file_backend.initialize()

# Create database backend
db_backend = create_state_backend("database", db_path="target/state.db")
db_backend.initialize()

# Use unified interface (the same calls work with either backend)
backend = db_backend
end = datetime.now()
start = end - timedelta(hours=1)  # example interval bounds, assumed to be datetimes
backend.create_run("run_1", status="running", started_at=datetime.now())
backend.set_node_state("run_1", "node1", status="success")
backend.add_completed_interval("run_1", "node1", start, end)
Transactions¶
# Atomic state updates
with backend.transaction() as tx:
    backend.set_node_state("run_1", "node1", status="success")
    backend.set_node_state("run_1", "node2", status="success")
    # Both succeed or both fail
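The "both succeed or both fail" guarantee can be illustrated with Python's built-in sqlite3 module, whose connection object commits or rolls back when used as a context manager. This is a sketch of the general technique only; the `node_states` columns and the `set_states` helper are assumptions, and Seeknal's own transaction implementation may work differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE node_states (run_id TEXT, node_id TEXT, status TEXT NOT NULL)"
)
conn.commit()


def set_states(conn, run_id, states):
    """Hypothetical helper: write several node states as one atomic unit."""
    with conn:  # commits on success, rolls back if an exception escapes
        for node_id, status in states:
            conn.execute(
                "INSERT INTO node_states VALUES (?, ?, ?)",
                (run_id, node_id, status),
            )


# Both rows are valid, so both are committed
set_states(conn, "run_1", [("node1", "success"), ("node2", "success")])

try:
    # The second row violates NOT NULL, so the first row is rolled back too
    set_states(conn, "run_2", [("node1", "success"), ("node2", None)])
except sqlite3.IntegrityError:
    pass

counts = dict(
    conn.execute("SELECT run_id, COUNT(*) FROM node_states GROUP BY run_id")
)
print(counts)
```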
Locking¶
# Acquire lock for concurrent execution
acquired = backend.acquire_lock("run_1", timeout_ms=5000)
if acquired:
    try:
        # Execute with lock held
        backend.set_node_state("run_1", "node1", status="running")
    finally:
        # Lock released automatically
        pass
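The acquire-with-timeout pattern above can be sketched with the standard library. The example below uses an in-process `threading.Lock` as a stand-in; it is a plain mutex rather than Seeknal's lock implementation, and `run_lock` is a hypothetical name. It shows the two behaviours that matter: a timed-out attempt reports failure instead of blocking forever, and the lock is released even if the guarded code raises.

```python
import threading
from contextlib import contextmanager

_locks = {}  # run_id -> threading.Lock (in-process only)
_registry_guard = threading.Lock()


@contextmanager
def run_lock(run_id, timeout_ms=5000):
    """Yield True if the lock for run_id was acquired within timeout_ms, else False."""
    with _registry_guard:
        lock = _locks.setdefault(run_id, threading.Lock())
    acquired = lock.acquire(timeout=timeout_ms / 1000)
    try:
        yield acquired
    finally:
        if acquired:
            lock.release()  # released even if the body raises


results = []
with run_lock("run_1") as outer:
    results.append(outer)  # lock is free, so it is acquired
    with run_lock("run_1", timeout_ms=50) as inner:
        results.append(inner)  # already held, so this attempt times out
with run_lock("run_1", timeout_ms=50) as again:
    results.append(again)  # released above, so it is acquired again

print(results)
```

Callers should always check the yielded flag before touching shared state, just as the snippet above checks `acquired`.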
Best Practices¶
1. Use Database for Production¶
2. Use File for Development¶
3. Backup Before Migration¶
# Always backup first
cp target/run_state.json target/run_state.json.pre-migration.bak
# Then migrate
seeknal migrate-state --backend database
4. Test Migration in Dry Run¶
# Preview first
seeknal migrate-state --backend database
# Check output, then execute
seeknal migrate-state --backend database --no-dry-run
Troubleshooting¶
Migration Failed¶
# Check backup
ls -la target/*.bak
# Restore backup
cp target/run_state.json.bak target/run_state.json
# Investigate error
seeknal migrate-state --backend database --dry-run
Database Locked¶
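# SQLite: Check for locks
# (PRAGMA lock_status only returns data on SQLite builds compiled with SQLITE_DEBUG)
sqlite3 target/state.db "PRAGMA lock_status"
# Find, then close, other processes using the database
lsof target/state.db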
Performance Issues¶
For large state files:
- Migrate to database - Better concurrent access
- Clean old runs - seeknal state cleanup --days 30
- Use partitions - Enable partition tracking
Advanced¶
Custom Backend¶
Implement the StateBackend protocol:
from seeknal.state.backend import StateBackend

class CustomBackend(StateBackend):
    def initialize(self):
        # Setup
        pass

    def create_run(self, run_id, **kwargs):
        # Create run
        pass

    # Implement all abstract methods...
Register and use:
from seeknal.state.backend import StateBackendFactory, create_state_backend

StateBackendFactory.register("custom", CustomBackend)
backend = create_state_backend("custom", option="value")
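To give a feel for what a custom backend has to do, here is a standalone in-memory sketch. It deliberately does not import seeknal so that it runs on its own; a real backend would subclass `StateBackend` as shown above. The method signatures are assumptions inferred from the usage examples in this guide, and only a subset of the protocol is covered.

```python
from contextlib import contextmanager
from copy import deepcopy


class InMemoryBackend:
    """Standalone sketch; a real backend would subclass seeknal's StateBackend."""

    def initialize(self):
        self.runs = {}
        self.node_states = {}

    def create_run(self, run_id, **kwargs):
        self.runs[run_id] = dict(kwargs)

    def get_run(self, run_id):
        return self.runs.get(run_id)

    def set_node_state(self, run_id, node_id, **kwargs):
        if run_id not in self.runs:
            raise KeyError(f"unknown run: {run_id}")
        self.node_states[(run_id, node_id)] = dict(kwargs)

    def get_node_state(self, run_id, node_id):
        return self.node_states.get((run_id, node_id))

    @contextmanager
    def transaction(self):
        # Snapshot state so a failure inside the block can be undone
        snapshot = deepcopy((self.runs, self.node_states))
        try:
            yield self
        except Exception:
            self.runs, self.node_states = snapshot
            raise


backend = InMemoryBackend()
backend.initialize()
backend.create_run("run_1", status="running")

try:
    with backend.transaction():
        backend.set_node_state("run_1", "node1", status="success")
        # This call fails, so the node1 update above is rolled back as well
        backend.set_node_state("missing_run", "node2", status="success")
except KeyError:
    pass

rolled_back = backend.get_node_state("run_1", "node1")
print(rolled_back)
```

Snapshot-and-restore is the simplest way to get all-or-nothing behaviour for in-memory state; a database-backed implementation would normally delegate to the database's own transactions instead.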
Reference¶
Backend Protocol Methods¶
- initialize() - Setup backend storage
- create_run() - Create new run record
- update_run() - Update run status
- get_run() - Retrieve run info
- set_node_state() - Update node execution state
- get_node_state() - Retrieve node state
- add_completed_interval() - Track processed interval
- transaction() - Begin transaction context
- acquire_lock() - Acquire optimistic lock