Catena is a distributed catalogue coordination system that retrieves offerings from DLT-Booth, distributes data across catalogue nodes using consistent hashing, and provides federated SPARQL querying. As a core component of the decentralised marketplace infrastructure, it orchestrates the discovery, retrieval, validation, storage, and serving of offerings provided by participants. It interacts with DLT-Booth to fetch offerings from the underlying Distributed Ledger Technology (DLT) and manages a catalogue of offerings that can operate in either centralised or decentralised mode. The coordinator also serves consumer search queries, acting as a mediator between participants and the global or distributed catalogue.
Key responsibilities of Catena include:
- Polling the associated DLT for updated offerings.
- Fetching offering metadata and full descriptions.
- Validating and storing offerings.
- Maintaining a local or distributed catalogue.
- Serving consumer queries.
- Monitoring node health and redistributing data as needed.
The coordinator consists of several key components organized in a clean, modular structure:
- Main Coordinator (`main.py`): Orchestrates all background workers and the API server
- API Layer (`api/`): Flask-based REST API with SPARQL federation
- DLT Communication (`utils/dlt_comm/`): Handles communication with the DLT-Booth API
- Monitoring (`utils/node_monitor/`): Node health monitoring and failover handling
- Workers (`utils/workers/`): Background workers for offerings processing
- Hash Ring (`utils/hash_ring/`): Consistent hashing for data distribution
- Redis (`utils/redis/`): Redis client with graceful fallback to in-memory storage
```
├── api
│   ├── __init__.py
│   └── offerings_retrieval.py
├── config.py
├── docker
│   ├── docker-compose.yml
│   ├── Dockerfile
│   └── start.sh
├── examples
│   ├── catalogue_list.json
│   └── example.env
├── LICENSE
├── main.py
├── README.md
├── requirements.txt
└── utils
    ├── __init__.py
    ├── dlt_comm
    │   ├── get_nodes.py
    │   ├── __init__.py
    │   └── offering_processor.py
    ├── hash_ring
    │   ├── consistent_hash.py
    │   └── __init__.py
    ├── node_monitor
    │   ├── health_checker.py
    │   └── __init__.py
    └── workers
        ├── data_processor.py
        ├── __init__.py
        └── worker_pool.py
```
- Offerings Retrieval: The coordinator fetches addresses from `DLT_BASE_URL/offerings` at configurable intervals
- Data Distribution: Each offering is fetched and distributed to catalogue nodes using consistent hashing
- Redundancy: Data is replicated across multiple nodes (configurable via `REDUNDANCY_REPLICAS`)
- Node Monitoring: Continuous health checks detect node failures and trigger data redistribution
- Federated Queries: SPARQL queries are distributed across all active catalogue nodes
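The distribution and redundancy steps above can be sketched as a consistent hash ring with virtual nodes. The real implementation lives in `utils/hash_ring/consistent_hash.py`; the class and parameter names below are illustrative, not the project's actual API:

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    """Map a string to a point on the ring via MD5 (stable across runs)."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)


class HashRing:
    """Minimal consistent hash ring with virtual nodes and replica lookup."""

    def __init__(self, nodes, virtual_nodes=150):
        self._ring = {}          # ring position -> physical node
        self._sorted_keys = []   # sorted ring positions for bisect
        self._virtual_nodes = virtual_nodes
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        # Each physical node occupies many ring positions for even spread.
        for i in range(self._virtual_nodes):
            pos = _hash(f"{node}#{i}")
            self._ring[pos] = node
            bisect.insort(self._sorted_keys, pos)

    def get_replicas(self, key: str, replica_count: int = 2):
        """Return up to `replica_count` distinct nodes responsible for `key`."""
        if not self._ring:
            return []
        start = bisect.bisect(self._sorted_keys, _hash(key)) % len(self._sorted_keys)
        replicas = []
        for offset in range(len(self._sorted_keys)):
            pos = self._sorted_keys[(start + offset) % len(self._sorted_keys)]
            node = self._ring[pos]
            if node not in replicas:
                replicas.append(node)
                if len(replicas) == replica_count:
                    break
        return replicas
```

Because only keys adjacent to a failed node's ring positions move, redistribution after a node failure touches a small fraction of the catalogue rather than rehashing everything.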
| Variable | Description | Default / Example |
|---|---|---|
| `SUBPROCESS_HEALTH_CHECK_INTERVAL` | Interval (seconds) for checking subprocess health. | `5` |
| `WORKER_POOL_SIZE` | Number of worker threads/processes in the pool. | `10` |
| `OPERATOR_PROVIDED` | Coordinator mode toggle (0 = disabled, 1 = enabled). | `0` |
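As a sketch of how `WORKER_POOL_SIZE` might size the worker pool, the snippet below fans tasks out across a thread pool. The `process_offering` task is a hypothetical placeholder, not the coordinator's actual worker code:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# WORKER_POOL_SIZE is read from the environment (default 10, as in the table).
POOL_SIZE = int(os.environ.get("WORKER_POOL_SIZE", "10"))


def process_offering(offering_id: str) -> str:
    """Placeholder for the real offering-processing task (hypothetical)."""
    return f"processed {offering_id}"


def run_pool(offering_ids):
    """Fan offering IDs out across the worker pool and collect results in order."""
    with ThreadPoolExecutor(max_workers=POOL_SIZE) as pool:
        return list(pool.map(process_offering, offering_ids))
```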
| Variable | Description | Default / Example |
|---|---|---|
| `HOST_ADDRESS` | Address for the Flask API to bind to. | `0.0.0.0` |
| `HOST_PORT` | Port for the Flask API to listen on. | `3030` |
| Variable | Description | Default / Example |
|---|---|---|
| `GC_URL` | Base URL for the Global Catalogue. | `http://global-catalogue` |
| `GC_PORT` | Port for the Global Catalogue. | `3030` |
| Variable | Description | Default / Example |
|---|---|---|
| `DLT_BASE_URL` | Base URL for the DLT-Booth API. | `http://dlt-booth:8085/api` |
| `DLT_RUST_LOG` | Log level for DLT (debug, info, error). | `debug` |
| `DLT_RUST_BACKTRACE` | Enable backtrace on errors (0 = off, 1 = on). | `1` |
| `DLT_HOST_ADDRESS` | DLT-Booth HTTP server bind address. | `0.0.0.0` |
| `DLT_HOST_PORT` | DLT-Booth HTTP server port. | `8085` |
| `DLT_NODE_URL` | DLT node endpoint URL. | `https://example.com/node` |
| `DLT_FAUCET_API_ENDPOINT` | Faucet API endpoint. | `https://example.com/faucet/` |
| `DLT_RPC_PROVIDER` | RPC provider endpoint. | `https://example.com/rpc` |
| `DLT_CHAIN_ID` | Chain ID for the DLT network. | `1000` |
| `DLT_ISSUER_URL` | Issuer service endpoint. | `https://example.com/issuer` |
| Variable | Description | Default / Example |
|---|---|---|
| `REDIS_HOST` | Host address of the Redis instance. | `catalogue-coordinator-redis` |
| `REDIS_PORT` | Redis port. | `6379` |
| `REDIS_DB` | Redis database index. | `0` |
| Variable | Description | Default / Example |
|---|---|---|
| `OFFERING_DESC_TIMEOUT` | Timeout (seconds) for fetching an offering description. | `60` |
| `OFFERING_FETCH_INTERVAL` | Interval (seconds) between offering fetch cycles. | `60` |
| `OFFERING_REPLICA_COUNT` | Number of replicas per offering. | `2` |
| Variable | Description | Default / Example |
|---|---|---|
| `NODE_HEALTH_CHECK_INTERVAL` | Interval (seconds) for node health checks. | `30` |
| `NODE_GRACE_PERIOD` | Grace period (seconds) before marking a node unhealthy. | `60` |
| `NODE_TIMEOUT` | Timeout (seconds) for a node response. | `10` |
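The interplay of health checks and `NODE_GRACE_PERIOD` can be sketched as follows: a node is only declared unhealthy once no check has succeeded within the grace period. The class name and injectable clock are illustrative, not the actual `utils/node_monitor` API:

```python
import time


class NodeHealthTracker:
    """Track last-success timestamps per node and apply a grace period
    before declaring a node unhealthy (mirrors NODE_GRACE_PERIOD semantics)."""

    def __init__(self, grace_period: float = 60.0, clock=time.monotonic):
        self._grace_period = grace_period
        self._clock = clock      # injectable for deterministic testing
        self._last_ok = {}       # node -> timestamp of last successful check

    def record_success(self, node: str) -> None:
        self._last_ok[node] = self._clock()

    def is_healthy(self, node: str) -> bool:
        last = self._last_ok.get(node)
        if last is None:
            return False  # never seen a successful check
        return (self._clock() - last) <= self._grace_period
```

When `is_healthy` flips to `False`, the coordinator would trigger redistribution of that node's offerings to the remaining replicas.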
| Variable | Description | Default / Example |
|---|---|---|
| `HASH_RING_VIRTUAL_NODES` | Number of virtual nodes in the consistent hash ring. | `150` |
| Variable | Description | Default / Example |
|---|---|---|
| `DLT_KEY_STORAGE_STRONGHOLD_SNAPSHOT_PATH` | Path to the key storage snapshot file. | `./key_storage.stronghold` |
| `DLT_KEY_STORAGE_STRONGHOLD_PASSWORD` | Password for encrypting the key storage snapshot. | `some_hopefully_secure_password` |
| `DLT_KEY_STORAGE_MNEMONIC` | Mnemonic used to generate the key storage. | `your mnemonic here` |
| Variable | Description | Default / Example |
|---|---|---|
| `DLT_WALLET_STRONGHOLD_SNAPSHOT_PATH` | Path to the wallet storage snapshot file. | `./wallet.stronghold` |
| `DLT_WALLET_STRONGHOLD_PASSWORD` | Password for encrypting the wallet snapshot. | `some_hopefully_secure_password` |
| Variable | Description | Default / Example |
|---|---|---|
| `DLT_BOOTH_DB_USER` | Username for the DLT-Booth database connection. | `postgres` |
| `DLT_BOOTH_DB_PASSWORD` | Password for the DLT-Booth database connection. | `dlt_booth` |
Create a `.env` file (use `./examples/example.env` as a reference). Alternatively, you can create a custom `.env` file with the help of the environment variable descriptions above.
- Start with Docker (includes Redis):

  ```shell
  cd docker
  bash start.sh
  ```

- Stop services:

  ```shell
  cd docker
  docker-compose down
  ```

- View logs:

  ```shell
  cd docker
  docker-compose logs -f
  ```
```shell
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python3 main.py
```

- `GET /health` - Service health status
- `POST /offerings` - Retrieve offerings by ID (JSON body: `{"offerings_id": "..."}`)
- `POST /sparql` - Execute federated SPARQL queries across all catalogue nodes
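A minimal consumer-side sketch of the `/offerings` endpoint, assuming the API is reachable at `localhost:3030` (the default `HOST_PORT`); the helper names are illustrative and `fetch_offering` requires a running coordinator:

```python
import json
import urllib.request

# Assumed base URL, derived from the HOST_ADDRESS/HOST_PORT defaults above.
BASE_URL = "http://localhost:3030"


def build_offerings_request(offerings_id: str):
    """Build the POST /offerings request as (url, headers, JSON body)."""
    body = json.dumps({"offerings_id": offerings_id}).encode("utf-8")
    return (
        f"{BASE_URL}/offerings",
        {"Content-Type": "application/json"},
        body,
    )


def fetch_offering(offerings_id: str):
    """Send the request to a live coordinator and decode the JSON response."""
    url, headers, body = build_offerings_request(offerings_id)
    req = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```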
- Retry Logic: Uses tenacity for robust retry mechanisms with exponential backoff
- Caching: Implements TTL-based caching for API responses
- Failover: Automatic data redistribution when nodes go down
- Consistent Hashing: Uses consistent hashing for stable data distribution
- Graceful Degradation: Falls back to in-memory storage if Redis is unavailable
- Configurable Redundancy: Supports multiple data replicas for high availability
- Docker Support: Containerized deployment with Redis as separate service
- Health Monitoring: Built-in health checks for Docker orchestration
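The graceful-degradation behaviour can be sketched as a thin key-value wrapper that drops to an in-memory dict when the Redis client fails; the class is illustrative, not the project's actual `utils/redis` client:

```python
class FallbackStore:
    """Key-value wrapper that degrades to an in-memory dict when the
    Redis client is unavailable or raises a connection error."""

    def __init__(self, redis_client=None):
        self._redis = redis_client   # e.g. a redis.Redis instance, or None
        self._memory = {}

    def set(self, key: str, value: str) -> None:
        if self._redis is not None:
            try:
                self._redis.set(key, value)
                return
            except Exception:
                self._redis = None  # fall back to in-memory from now on
        self._memory[key] = value

    def get(self, key: str):
        if self._redis is not None:
            try:
                return self._redis.get(key)
            except Exception:
                self._redis = None
        return self._memory.get(key)
```

The trade-off is that in-memory state is lost on restart and not shared between processes, so the fallback preserves availability rather than durability.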
The coordinator has two mode toggles, set via the variables `OPERATOR_PROVIDED` and `CENTRALISED`:

- `OPERATOR_PROVIDED: 0` - Enables automatic node retrieval mode, where catalogue nodes are inferred and retrieved from the DLT offerings
- `OPERATOR_PROVIDED: 1` - Defaults to using known catalogues, reading catalogue nodes from `./catalogue_list.json` (example file under the `./examples` directory)
- `CENTRALISED: 0` - Enables decentralised mode, so offerings are spread across multiple nodes
- `CENTRALISED: 1` - Uses the Global Catalogue to store all offering descriptions
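The two toggles could be resolved from the environment roughly as follows; this is a sketch, and the default of `"0"` for `CENTRALISED` is an assumption (only `OPERATOR_PROVIDED` has a documented default):

```python
import os


def resolve_mode(env=None):
    """Resolve the coordinator's operating mode from OPERATOR_PROVIDED
    and CENTRALISED; both assumed to default to '0' when unset."""
    if env is None:
        env = os.environ
    operator_provided = env.get("OPERATOR_PROVIDED", "0") == "1"
    centralised = env.get("CENTRALISED", "0") == "1"
    return {
        # Known catalogues from ./catalogue_list.json vs. nodes inferred from DLT offerings
        "node_source": "catalogue_list" if operator_provided else "dlt",
        # Single Global Catalogue vs. offerings spread across nodes
        "storage": "global_catalogue" if centralised else "distributed",
    }
```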
- Federated Query Support: Add support for all subqueries, not just `SELECT`
- Add `/profile` call step: Add an additional call step to the provider to fetch catalogue endpoints
- Clarify federated SPARQL query: Clarify whether consumers directly call the GC
- Add tests: Add test cases for all submodules
This software is licensed under the European Union Public License 1.2 (EUPL-1.2).
This software has been developed by the University of Surrey under the SEDIMARK (SEcure Decentralised Intelligent Data MARKetplace) project. SEDIMARK is funded by the European Union under the Horizon Europe framework programme [grant no. 101070074]. This project is also partly funded by UK Research and Innovation (UKRI) under the UK government’s Horizon Europe funding guarantee [grant no. 10043699].
