diff --git a/sphe/candle_generator/PLUGIN_SUMMARY.md b/sphe/candle_generator/PLUGIN_SUMMARY.md
new file mode 100644
index 0000000..c457b27
--- /dev/null
+++ b/sphe/candle_generator/PLUGIN_SUMMARY.md
@@ -0,0 +1,208 @@
+# Candle Generator Plugin - Complete Overview
+
+## Plugin Purpose
+
+This InfluxDB 3 plugin is specifically designed to generate OHLCV (Open, High, Low, Close, Volume) candles from `InfluxProchainDataPoint` data structures. It automatically processes **all supported timeframes** (30s, 60s, 120s, 300s, 600s, 900s) in a single trigger, eliminating the need for multiple plugin instances. It can be used for both scheduled batch processing and on-demand HTTP requests.
+
+## File Structure
+
+```
+sphe/candle_generator/
+├── candle_generator.py             # Main plugin implementation
+├── candle_config_scheduler.toml    # TOML configuration template
+├── README.md                       # Comprehensive documentation
+├── setup_example.sh                # Setup script with examples
+├── test_plugin.py                  # Test script for validation
+└── PLUGIN_SUMMARY.md               # This overview file
+```
+
+## How It Works
+
+### 1. Data Source: InfluxProchainDataPoint
+
+The plugin reads from measurements containing data with this structure:
+```rust
+pub struct InfluxProchainDataPoint {
+    pub time: DateTime,
+    pub price: f64,
+    pub quote_volume: f64,
+    pub base_volume: f64,
+    pub key: String,          // tag
+    pub object_type: String,  // tag
+    pub mint: String,         // tag
+    pub is_buy: bool,         // tag
+    pub payer: String,        // tag
+    pub trx: String,          // tag
+    pub platform: String,     // tag
+}
+```
+
+### 2. Candle Generation Process
+
+1. **Data Source**: Reads from the specified source measurement/table (default: `raw_trades`)
+2. **Automatic Window Calculation**: Uses a 1-hour data window (automatically calculated based on the 30-second trigger interval)
+3. **Multi-Timeframe Processing**: Automatically processes **all supported timeframes** in a single execution
+4. **Data Aggregation**: Groups data points by each timeframe (30s, 60s, 120s, 300s, 600s, 900s) from the source table
+5. **OHLCV Calculation**:
+   - **Open**: First price in the timeframe
+   - **High**: Highest price in the timeframe
+   - **Low**: Lowest price in the timeframe
+   - **Close**: Last price in the timeframe
+   - **Volume**: Sum of quote_volume in the timeframe
+6. **Essential Tag Preservation**: Preserves only essential tags (mint, platform, timeframe)
+7. **Output**: Writes all candles to a single measurement, with the timeframe recorded as a tag
+
+### 3. Generated Candle Structure
+
+Each candle contains:
+- **Fields**: `open`, `high`, `low`, `close`, `volume`
+- **Tags**: Essential tags only (`mint`, `platform`, `timeframe`)
+- **Measurement**: All candles are saved to a single measurement (e.g., `candles`) with timeframe as a tag
+
+## Supported Timeframes
+
+- **30s**: 30-second candles
+- **60s**: 1-minute candles
+- **120s**: 2-minute candles
+- **300s**: 5-minute candles
+- **600s**: 10-minute candles
+- **900s**: 15-minute candles
+
+## Usage Modes
+
+### Scheduled Mode
+Automatically generates candles for **all timeframes** on configurable intervals:
+```bash
+# Generate all timeframes (30s, 60s, 120s, 300s, 600s, 900s) every 30 seconds
+influxdb3 create trigger \
+  --database prochain_db \
+  --plugin-filename candle_generator.py \
+  --trigger-spec "every:30s" \
+  --trigger-arguments 'target_measurement=candles,target_database=candles_db,source_measurement=raw_trades' \
+  prochain_candles_all_timeframes
+```
+
+### HTTP Mode
+On-demand candle generation via HTTP API (supports single or all timeframes):
+```bash
+# Create HTTP trigger for all timeframes
+influxdb3 create trigger \
+  --database prochain_db \
+  --plugin-filename candle_generator.py \
+  --trigger-spec "http" \
+  --trigger-arguments 'target_measurement=candles,target_database=candles_db,source_measurement=raw_trades' \
+  prochain_candles_http
+
+# Or specify a single timeframe for HTTP requests
+influxdb3 create trigger \
+  --database prochain_db \
+  --plugin-filename candle_generator.py \
+  --trigger-spec "http" \
+  --trigger-arguments 'target_measurement=candles,timeframe=30s,target_database=candles_db,source_measurement=raw_trades' \
+  prochain_candles_http_single
+```
+
+## Key Features
+
+### 1. Multi-Timeframe Processing
+- **Single Trigger, All Timeframes**: One plugin instance processes all 6 timeframes (30s, 60s, 120s, 300s, 600s, 900s)
+- **Efficient Resource Usage**: Eliminates the need for 6 separate plugin instances
+- **Consistent Data**: All timeframes are processed from the same data window, ensuring consistency
+- **Flexible HTTP Mode**: HTTP requests can target all timeframes or specify a single timeframe
+
+### 2. Automatic Window Calculation
+- **Smart Window Sizing**: Automatically uses a 1-hour data window based on the 30-second trigger interval
+- **Simplified Configuration**: No need to specify window size - it's automatically optimized
+- **Optimal Coverage**: 1-hour window ensures sufficient data for all timeframes (up to 15-minute candles)
+- **Consistent Processing**: Every execution processes the same amount of historical data
+
+### 3. Single Measurement Architecture
+- **Unified Storage**: All candles are stored in a single measurement regardless of timeframe
+- **Timeframe Tagging**: Each candle includes a `timeframe` tag (30s, 60s, 120s, 300s, 600s, 900s)
+- **Simplified Queries**: Easy to query all timeframes or filter by specific timeframe
+- **Efficient Storage**: Reduces measurement proliferation and simplifies data management
+
+### 4. Flexible Configuration
+- TOML configuration file support
+- Command-line argument support
+- Environment variable support
+
+### 5. Robust Error Handling
+- Configurable retry logic
+- Detailed logging with task IDs
+- Graceful error recovery
+
+### 6. Performance Optimizations
+- Efficient SQL queries for aggregation using the `date_bin` function
+- Batch processing capabilities
+- Memory-conscious data handling
+
+### 7. Filtering Options
+- Mint address filtering
+- Time range filtering
+- Tag-based filtering
+
+## Example Data Flow
+
+```
+Raw InfluxProchainDataPoint Data:
+┌─────────────────┬─────────┬─────────────┬─────────┬─────────┬─────────┐
+│ time            │ price   │ quote_vol   │ mint    │ is_buy  │ platform│
+├─────────────────┼─────────┼─────────────┼─────────┼─────────┼─────────┤
+│ 12:00:00        │ 100.0   │ 1000.0      │ TOKEN1  │ true    │ pumpfun │
+│ 12:00:15        │ 101.0   │ 1500.0      │ TOKEN1  │ false   │ pumpfun │
+│ 12:00:25        │ 99.5    │ 800.0       │ TOKEN1  │ true    │ pumpfun │
+│ 12:00:45        │ 102.0   │ 2000.0      │ TOKEN1  │ true    │ pumpfun │
+└─────────────────┴─────────┴─────────────┴─────────┴─────────┴─────────┘
+
+Generated 30s Candle (saved to `candles` measurement with timeframe tag):
+┌─────────────────┬──────┬──────┬──────┬───────┬─────────┬─────────┬───────────┐
+│ time            │ open │ high │ low  │ close │ volume  │ mint    │ timeframe │
+├─────────────────┼──────┼──────┼──────┼───────┼─────────┼─────────┼───────────┤
+│ 12:00:00        │ 100.0│ 101.0│ 99.5 │ 99.5  │ 3300.0  │ TOKEN1  │ 30s       │
+└─────────────────┴──────┴──────┴──────┴───────┴─────────┴─────────┴───────────┘
+```
+
+## Integration with Your System
+
+### 1. Prerequisites
+- InfluxDB 3 with Processing Engine enabled
+- Python 3.8+ (for plugin execution)
+- `influxdb_client_3` package (used by the test script)
+
+### 2. Setup Steps
+1. Copy plugin files to your plugin directory
+2. Start InfluxDB 3 with `--plugin-dir` flag
+3. Create database and measurements
+4. Set up triggers using provided scripts
+5. Monitor candle generation
+
+### 3. Monitoring
+- Check plugin logs for execution status
+- Query generated candles for validation
+- Monitor performance metrics
+
+## Customization
+
+The plugin can be customized for different data structures by modifying:
+- `build_multi_timeframe_candle_query()` function for different field names
+- `write_candle_data()` function for different output formats
+- `parse_timeframe()` function for additional timeframes
+
+## Troubleshooting
+
+Common issues and solutions:
+1. **No candles generated**: Check that source data exists and timeframes match
+2. **Memory issues**: Reduce the window size or increase processing intervals
+3. **Permission errors**: Verify database and measurement permissions
+4. **Invalid timeframes**: Ensure the timeframe is one of the supported values
+
+## Performance Considerations
+
+- **Window Size**: Balance between data coverage and memory usage
+- **Timeframe**: Shorter timeframes = more candles = more processing
+- **Filtering**: Use mint filters to reduce data volume
+- **Scheduling**: Adjust trigger frequencies based on data volume
+
+This plugin provides a complete solution for generating trading candles from Prochain data, enabling real-time and historical analysis of market movements.
diff --git a/sphe/candle_generator/README.md b/sphe/candle_generator/README.md
new file mode 100644
index 0000000..1edcde9
--- /dev/null
+++ b/sphe/candle_generator/README.md
@@ -0,0 +1,266 @@
+# Candle Generator Plugin
+
+⚡ scheduled, http 🏷️ candles, OHLCV, trading, aggregation 🔧 InfluxDB 3 Core, InfluxDB 3 Enterprise
+
+## Description
+
+The Candle Generator Plugin enables automatic generation of OHLCV (Open, High, Low, Close, Volume) candles from InfluxProchainDataPoint data in InfluxDB 3. This plugin is specifically designed to work with Prochain trading data and supports all six timeframes (30s, 60s, 120s, 300s, 600s, 900s). It can process both scheduled batch operations and on-demand HTTP requests to generate candlestick data for trading analysis.
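+
+As a quick illustration of the aggregation the plugin performs, here is a minimal pure-Python sketch. It is illustrative only: the real plugin does this in SQL with `date_bin()` and window functions, and the trade-tuple layout here is an assumption of the sketch.
+
+```python
+from collections import defaultdict
+
+def bucket_trades(trades, timeframe_s):
+    """Group (epoch_seconds, price, quote_volume) trades into OHLCV candles."""
+    buckets = defaultdict(list)
+    for ts, price, qvol in sorted(trades):
+        # Floor each timestamp to the start of its bucket, like date_bin()
+        buckets[ts - ts % timeframe_s].append((price, qvol))
+    return {
+        start: {
+            "open": rows[0][0],                 # first price in the bucket
+            "high": max(p for p, _ in rows),
+            "low": min(p for p, _ in rows),
+            "close": rows[-1][0],               # last price in the bucket
+            "volume": sum(v for _, v in rows),  # sum of quote_volume
+        }
+        for start, rows in buckets.items()
+    }
+
+# Four trades, 30-second buckets: the first bucket closes at 99.5 with volume 3300.0
+print(bucket_trades([(0, 100.0, 1000.0), (15, 101.0, 1500.0), (25, 99.5, 800.0), (45, 102.0, 2000.0)], 30))
+```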
+
+## Features
+
+- **Multiple Timeframes**: Support for 30-second, 1-minute, 2-minute, 5-minute, 10-minute, and 15-minute candles
+- **OHLCV Generation**: Complete candlestick data with open, high, low, close, and volume
+- **Single Measurement Output**: All candles are saved to one measurement, with the timeframe recorded as a tag
+- **Essential Tags Only**: Preserves only essential tags (mint, platform, timeframe) for cleaner data
+- **Mint Filtering**: Optional filtering by specific mint addresses
+- **Scheduled Processing**: Automatic candle generation on configurable intervals
+- **HTTP API**: On-demand candle generation via HTTP requests
+- **Retry Logic**: Robust error handling with configurable retry attempts
+- **TOML Configuration**: Support for configuration files
+
+## Candle Structure
+
+Each generated candle contains the following fields:
+
+### OHLCV Fields
+- `open`: Opening price for the timeframe
+- `high`: Highest price during the timeframe
+- `low`: Lowest price during the timeframe
+- `close`: Closing price for the timeframe
+- `volume`: Total quote volume during the timeframe
+
+### Tags (Essential Only)
+- `mint`: Token mint address
+- `platform`: Platform identifier
+- `timeframe`: Candle timeframe (e.g., `30s`, `60s`, `300s`)
+
+### Measurement Naming
+All candles are written to the single measurement named by `target_measurement` (e.g., `candles`). Query a specific timeframe by filtering on the `timeframe` tag, for example `WHERE timeframe = '30s'`.
+
+## Configuration
+
+Plugin parameters may be specified as key-value pairs in the `--trigger-arguments` flag (CLI) or in the `trigger_arguments` field (API) when creating a trigger. The plugin also supports TOML configuration files.
+
+### Plugin metadata
+
+This plugin includes a JSON metadata schema in its docstring that defines supported trigger types and configuration parameters. This metadata enables the [InfluxDB 3 Explorer](https://docs.influxdata.com/influxdb3/explorer/) UI to display and configure the plugin.
+
+### Required parameters
+
+| Parameter            | Type   | Default  | Description                             |
+|----------------------|--------|----------|-----------------------------------------|
+| `target_measurement` | string | required | Destination measurement for candle data |
+| `target_database`    | string | required | Database for storing candle data        |
+
+The data window is calculated automatically (1 hour per run), so no `window` parameter is needed; if a `window` or `timeframe` argument is passed to a scheduled trigger, it is ignored and all supported timeframes are generated.
+
+### Optional parameters
+
+| Parameter            | Type    | Default      | Description                                                                 |
+|----------------------|---------|--------------|------------------------------------------------------------------------------|
+| `source_measurement` | string  | "raw_trades" | Source measurement containing InfluxProchainDataPoint data                    |
+| `timeframe`          | string  | all          | Candle timeframe for HTTP requests. Supported: '30s', '60s', '120s', '300s', '600s', '900s'. Omit to generate all timeframes |
+| `offset`             | string  | "0s"         | Time offset to apply to the window. Format: `<number><unit>` (e.g., "10s", "1h") |
+| `mint_filter`        | string  | none         | Specific mint address to filter for candle generation                          |
+| `max_retries`        | integer | 5            | Maximum number of retries for write operations                                 |
+
+### TOML configuration
+
+| Parameter          | Type   | Default | Description                                                                        |
+|--------------------|--------|---------|------------------------------------------------------------------------------------|
+| `config_file_path` | string | none    | TOML config file path relative to `PLUGIN_DIR` (required for TOML configuration)    |
+
+*To use a TOML configuration file, set the `PLUGIN_DIR` environment variable and specify the `config_file_path` in the trigger arguments.*
+
+#### Example TOML configuration
+
+[candle_config_scheduler.toml](candle_config_scheduler.toml)
+
+## Installation steps
+
+1. Start InfluxDB 3 with the Processing Engine enabled (`--plugin-dir /path/to/plugins`):
+
+   ```bash
+   influxdb3 serve \
+     --node-id node0 \
+     --object-store file \
+     --data-dir ~/.influxdb3 \
+     --plugin-dir ~/.plugins
+   ```
+
+2. No additional Python packages are required for this plugin.
+
+## Trigger setup
+
+### Scheduled candle generation
+
+Run candle generation periodically on historical data. Every scheduled run generates **all** supported timeframes into the target measurement; the trigger schedule only controls how often that happens:
+
+```bash
+# Generate all timeframes every 30 seconds
+influxdb3 create trigger \
+  --database prochain_db \
+  --plugin-filename candle_generator.py \
+  --trigger-spec "every:30s" \
+  --trigger-arguments 'source_measurement=prochain_data,target_measurement=candles,target_database=prochain_db' \
+  prochain_candles_all_timeframes
+```
+
+### Using TOML configuration
+
+```bash
+influxdb3 create trigger \
+  --database prochain_db \
+  --plugin-filename candle_generator.py \
+  --trigger-spec "every:1h" \
+  --trigger-arguments config_file_path=candle_config_scheduler.toml \
+  prochain_candles_toml
+```
+
+### HTTP candle generation
+
+Generate candles on-demand via HTTP requests:
+
+```bash
+# Create HTTP trigger
+influxdb3 create trigger \
+  --database prochain_db \
+  --plugin-filename candle_generator.py \
+  --trigger-spec "http" \
+  --trigger-arguments 'source_measurement=prochain_data,target_measurement=candles,target_database=prochain_db' \
+  prochain_candles_http
+```
+
+#### HTTP API Usage
+
+**Endpoint**: `POST /api/v2/triggers/{trigger_id}/execute`
+
+**Request Body**:
+```json
+{
+  "start_time": "2024-01-01T00:00:00Z",
+  "end_time": "2024-01-01T01:00:00Z",
+  "timeframe": "30s",
+  "mint_filter": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"
+}
+```
+
+**Response**:
+```json
+{
+  "status": "success",
+  "task_id": "a1b2c3d4",
+  "candles_generated": 120,
+  "timeframes_processed": 1,
+  "timeframe_results": {"30s": 120},
+  "target_measurement":
"candles_30s", + "time_range": { + "start": "2024-01-01T00:00:00Z", + "end": "2024-01-01T01:00:00Z" + } +} +``` + +## Data Flow + +1. **Source Data**: InfluxProchainDataPoint records with price, volume, and metadata +2. **Aggregation**: Data is grouped by timeframe and aggregated into OHLCV candles +3. **Tag Preservation**: All original tags are preserved in the candle records +4. **Output**: Candle data written to target measurement with OHLCV fields + +## Example Queries + +### View generated candles +```sql +SELECT * FROM "candles_30s" +WHERE time >= NOW() - INTERVAL '1 hour' +ORDER BY time +``` + +### Compare different timeframes +```sql +SELECT + time, + '30s' as timeframe, + close +FROM "candles_30s" +WHERE time >= NOW() - INTERVAL '1 day' + +UNION ALL + +SELECT + time, + '60s' as timeframe, + close +FROM "candles_60s" +WHERE time >= NOW() - INTERVAL '1 day' + +UNION ALL + +SELECT + time, + '300s' as timeframe, + close +FROM "candles_300s" +WHERE time >= NOW() - INTERVAL '1 day' + +ORDER BY time +``` + +### Filter by specific mint +```sql +SELECT time, open, high, low, close, volume +FROM "candles_30s" +WHERE time >= NOW() - INTERVAL '1 hour' +AND mint = 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v' +ORDER BY time +``` + +## Troubleshooting + +### Common Issues + +1. **No candles generated**: Check that source measurement contains data in the specified time range +2. **Invalid timeframe**: Ensure timeframe is one of: '30s', '60s', '300s' +3. **Permission errors**: Verify database permissions for read/write operations +4. **Memory issues**: Reduce window size for large datasets + +### Logs + +The plugin provides detailed logging with task IDs for tracking: +``` +[a1b2c3d4] Starting scheduled candle generation at 2024-01-01 12:00:00 +[a1b2c3d4] Processing window: 2024-01-01 11:00:00 to 2024-01-01 12:00:00 +[a1b2c3d4] Timeframe: 30s +[a1b2c3d4] Source: prochain_data -> Target: candles_30s +[a1b2c3d4] Generated 120 candles +[a1b2c3d4] Successfully wrote 120 candle records to candles_30s +[a1b2c3d4] Candle generation completed successfully +``` + +## Performance Considerations + +- **Window Size**: Larger windows process more data but use more memory +- **Timeframe**: Shorter timeframes generate more candles but require more processing +- **Mint Filtering**: Filtering by mint can significantly reduce processing time +- **Batch Processing**: Consider using multiple triggers with different schedules for optimal performance + +## Contributing + +This plugin is designed to work with InfluxProchainDataPoint structures. If you need to modify it for different data structures, update the `build_candle_query` function and field mappings accordingly. 
diff --git a/sphe/candle_generator/candle_config_scheduler.toml b/sphe/candle_generator/candle_config_scheduler.toml
new file mode 100644
index 0000000..4adc612
--- /dev/null
+++ b/sphe/candle_generator/candle_config_scheduler.toml
@@ -0,0 +1,84 @@
+# Candle Generator Plugin Scheduler Configuration Template
+# Copy this file to your PLUGIN_DIR and reference it with `--trigger-arguments config_file_path=candle_config_scheduler.toml`
+
+########## Required Parameters ##########
+# Name of the source measurement containing InfluxProchainDataPoint data
+# Specify the measurement name (string) from your source InfluxDB database
+source_measurement = "prochain_data"  # e.g., "prochain_data", "trades", "market_data"
+
+# Name of the target measurement to write candle data
+# Specify the target measurement name (string)
+target_measurement = "prochain_candles"  # e.g., "candles", "prochain_candles"
+
+# Target database for writing candle data (required by the plugin)
+target_database = "candles_db"  # e.g., "analytics_db", "candles_data"
+
+########## Optional Parameters ##########
+# Candle timeframe for HTTP requests; scheduled runs generate all supported timeframes
+# Supported timeframes: "30s", "60s", "120s", "300s", "600s", "900s"
+#timeframe = "30s"
+
+# Time offset to shift the window
+# Format: <number><unit>, where unit is s (seconds), min (minutes), h (hours), d (days), w (weeks)
+#offset = "10s"  # e.g., "10s", "1h"
+
+# Specific mint address to filter for candle generation
+# Leave commented or empty to process all mints
+#mint_filter = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"  # e.g., specific token mint address
+
+# Maximum number of retries for write operations
+# Integer value ≥ 0; default is 5
+#max_retries = 5  # e.g., 3, 10
+
+# Note: the data window is calculated automatically (1 hour per run), so no
+# `window` setting is needed; a `window` value, if present, is ignored.
+
+###### Example: Create Trigger Using This Config ######
+# influxdb3 create trigger \
+#   --database your_database_name \
+#   --plugin-filename candle_generator.py \
+#   --trigger-spec "every:30s" \
+#   --trigger-arguments config_file_path=candle_config_scheduler.toml \
+#   prochain_candles_all_timeframes
diff --git 
a/sphe/candle_generator/candle_generator.py b/sphe/candle_generator/candle_generator.py new file mode 100644 index 0000000..076b558 --- /dev/null +++ b/sphe/candle_generator/candle_generator.py @@ -0,0 +1,618 @@ +""" +{ + "plugin_type": ["scheduled", "http"], + "scheduled_args_config": [ + + { + "name": "target_measurement", + "example": "prochain_candles", + "description": "Name of the target measurement to write candle data.", + "required": true + }, + { + "name": "timeframe", + "example": "30s", + "description": "Candle timeframe. Supported: '30s', '60s', '120s', '300s', '600s', '900s'. Note: All timeframes are processed automatically.", + "required": false + }, + + { + "name": "offset", + "example": "10s", + "description": "Time offset to apply to the window (e.g., '10s', '1h'). Units: 's', 'min', 'h', 'd', 'w'.", + "required": false + }, + + { + "name": "max_retries", + "example": "5", + "description": "Maximum number of retries for write operations.", + "required": false + }, + { + "name": "target_database", + "example": "candles_db", + "description": "Target database for writing candle data.", + "required": true + }, + { + "name": "source_measurement", + "example": "raw_trades", + "description": "Source measurement/table to read trade data from.", + "required": false + }, + { + "name": "config_file_path", + "example": "config.toml", + "description": "Path to config file to override args. Format: 'config.toml'.", + "required": false + } + ] +} +""" + +import json +import os +import re +import time +import tomllib +import uuid +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +def get_all_timeframes() -> List[timedelta]: + """ + Get all supported timeframes as timedelta objects. + + Returns: + List of timedelta objects for all supported timeframes + """ + supported_timeframes = {'30s', '60s', '120s', '300s', '600s', '900s'} + timeframes = [] + + for timeframe_str in supported_timeframes: + if timeframe_str.endswith('s'): + seconds = int(timeframe_str[:-1]) + timeframes.append(timedelta(seconds=seconds)) + + return timeframes + + +def parse_timeframe(timeframe: str) -> timedelta: + """ + Parse timeframe string into timedelta. + + Args: + timeframe: Timeframe string (e.g., '30s', '60s', '300s') + + Returns: + timedelta representing the timeframe + + Raises: + ValueError: If timeframe is not supported + """ + supported_timeframes = {'30s', '60s', '120s', '300s', '600s', '900s'} + if timeframe not in supported_timeframes: + raise ValueError(f"Unsupported timeframe: {timeframe}. Supported: {supported_timeframes}") + + # Parse timeframe + if timeframe.endswith('s'): + seconds = int(timeframe[:-1]) + return timedelta(seconds=seconds) + else: + raise ValueError(f"Invalid timeframe format: {timeframe}") + + +def calculate_window() -> timedelta: + """ + Calculate the time window based on the plugin trigger interval. + Since the plugin runs every 30 seconds, we use a 1-hour window + to ensure we capture enough data for all timeframes. + + Returns: + timedelta representing the window (1 hour) + """ + return timedelta(hours=1) + + +def parse_offset(args: dict, task_id: str) -> timedelta: + """ + Parse offset string into timedelta. 
+ + Args: + args: Arguments dictionary + task_id: Task ID for logging + + Returns: + timedelta representing the offset (default: 0) + """ + offset_str = args.get('offset', '0s') + + # Parse offset string (e.g., "10s", "1h") + match = re.match(r'^(\d+)(s|min|h|d|w)$', offset_str) + if not match: + raise ValueError(f"[{task_id}] Invalid offset format: {offset_str}") + + value, unit = match.groups() + value = int(value) + + if unit == 's': + return timedelta(seconds=value) + elif unit == 'min': + return timedelta(minutes=value) + elif unit == 'h': + return timedelta(hours=value) + elif unit == 'd': + return timedelta(days=value) + elif unit == 'w': + return timedelta(weeks=value) + else: + raise ValueError(f"[{task_id}] Unsupported offset unit: {unit}") + + +def build_multi_timeframe_candle_query( + timeframes: List[timedelta], + start_time: datetime, + end_time: datetime, + source_measurement: str = "raw_trades" +) -> str: + """ + Build SQL query for multi-timeframe candle generation from source measurement table. + + Args: + timeframes: List of candle timeframes + start_time: Start time for query + end_time: End time for query + source_measurement: Source measurement/table name + + Returns: + SQL query string + """ + # Format timestamps for SQL + start_time_str = start_time.strftime('%Y-%m-%d %H:%M:%S') + end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S') + + # Build UNION ALL queries for each timeframe + timeframe_queries = [] + + for timeframe in timeframes: + timeframe_seconds = int(timeframe.total_seconds()) + + timeframe_query = f""" + SELECT + date_bin(INTERVAL '{timeframe_seconds} seconds', time, TIMESTAMP '1970-01-01 00:00:00') AS bucket_time, + mint, + platform, + price, + quote_volume, + time as original_time, + {timeframe_seconds} as timeframe_seconds + FROM "{source_measurement}" + WHERE time >= '{start_time_str}' + AND time < '{end_time_str}' + """ + timeframe_queries.append(timeframe_query) + + # Combine all timeframe queries + combined_query = f""" + WITH all_timeframes AS ( + {' UNION ALL '.join(timeframe_queries)} + ), + ranked_data AS ( + SELECT + bucket_time, + mint, + platform, + price, + quote_volume, + timeframe_seconds, + ROW_NUMBER() OVER (PARTITION BY bucket_time, mint, platform, timeframe_seconds ORDER BY original_time ASC) as rn_asc, + ROW_NUMBER() OVER (PARTITION BY bucket_time, mint, platform, timeframe_seconds ORDER BY original_time DESC) as rn_desc + FROM all_timeframes + ), + candle_data AS ( + SELECT + bucket_time, + mint, + platform, + timeframe_seconds, + MAX(CASE WHEN rn_asc = 1 THEN price END) AS open, + MAX(price) AS high, + MIN(price) AS low, + MAX(CASE WHEN rn_desc = 1 THEN price END) AS close, + SUM(quote_volume) AS volume + FROM ranked_data + GROUP BY bucket_time, mint, platform, timeframe_seconds + ORDER BY bucket_time, timeframe_seconds + ) + SELECT * FROM candle_data + """ + + return combined_query + + +def write_candle_data( + influxdb3_local, + data: List[Dict], + max_retries: int, + target_measurement: str, + target_database: str, + task_id: str, +): + """ + Write candle data to InfluxDB. 
+
+    Args:
+        influxdb3_local: InfluxDB client
+        data: List of candle data dictionaries (each with timeframe field)
+        max_retries: Maximum number of retries
+        target_measurement: Target measurement name
+        target_database: Target database name
+        task_id: Task ID for logging
+    """
+    if not data:
+        influxdb3_local.info(f"[{task_id}] No candle data to write")
+        return 0
+
+    influxdb3_local.info(f"[{task_id}] Writing {len(data)} candle records to {target_measurement}")
+
+    # Use target database (now mandatory)
+    database = target_database
+
+    # Transform data to InfluxDB line protocol format using LineBuilder
+    builders: list = []
+    for candle in data:
+        # Get timeframe from candle data
+        timeframe_seconds = candle.get('timeframe_seconds')
+        if timeframe_seconds is None:
+            influxdb3_local.info(f"[{task_id}] Warning: candle missing timeframe_seconds, skipping")
+            continue
+
+        # Create tags including timeframe - only non-empty values are written below
+        tags = {}
+        tags['mint'] = str(candle.get('mint', '')).strip()
+        tags['platform'] = str(candle.get('platform', '')).strip()
+        tags['timeframe'] = f"{timeframe_seconds}s"
+
+        # Create fields - all OHLCV fields are mandatory, defaulting to zero
+        fields = {}
+        for name in ('open', 'high', 'low', 'close', 'volume'):
+            value = candle.get(name)
+            fields[name] = float(value) if value is not None else 0.0
+
+        # Set timestamp
+        timestamp = candle.get('_time', datetime.now(timezone.utc))
+        if isinstance(timestamp, str):
+            timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
+        elif isinstance(timestamp, int):
+            # Convert nanoseconds to datetime
+            timestamp = datetime.fromtimestamp(timestamp / 1e9, tz=timezone.utc)
+
+        # Create LineBuilder object
+        line = LineBuilder(target_measurement)
+
+        # Set timestamp
+        line.time_ns(int(timestamp.timestamp() * 1e9))
+
+        # Add tags - skip empty values, which are invalid in line protocol
+        for key, value in tags.items():
+            if value:
+                line.tag(key, value)
+
+        # Add fields - all OHLCV fields are mandatory
+        for key, value in fields.items():
+            line.float64_field(key, value)
+
+        builders.append(line)
+
+    # Write data with retries
+    for attempt in range(max_retries + 1):
+        try:
+            for row in builders:
+                influxdb3_local.write_to_db(database, row)
+            influxdb3_local.info(f"[{task_id}] Successfully wrote {len(builders)} candles to {target_measurement}")
+            break
+        except Exception as e:
+            if attempt == max_retries:
+                influxdb3_local.info(f"[{task_id}] Failed to write candle data to {target_measurement} after {max_retries} retries: {e}")
+                raise
+            else:
+                influxdb3_local.info(f"[{task_id}] Write attempt {attempt + 1} failed, retrying: {e}")
+                time.sleep(2 ** attempt)  # Exponential backoff
+
+    return len(builders)
+
+
+def process_scheduled_call(
+    influxdb3_local, call_time: datetime, args: Optional[Dict] = None
+):
+    """
+    Process scheduled candle generation call.
+ + Args: + influxdb3_local: InfluxDB client + call_time: Current call time + args: Arguments dictionary + """ + task_id = str(uuid.uuid4())[:8] + influxdb3_local.info(f"[{task_id}] Starting scheduled candle generation at {call_time}") + + try: + # Load config from file if specified + if args and args.get('config_file_path'): + config_path = Path(args['config_file_path']) + if not config_path.is_absolute(): + plugin_dir = os.environ.get('PLUGIN_DIR', '.') + config_path = Path(plugin_dir) / config_path + + with open(config_path, 'rb') as f: + config = tomllib.load(f) + args = {**args, **config} + + # Parse arguments + target_measurement = args.get('target_measurement') + if not target_measurement: + raise ValueError(f"[{task_id}] target_measurement is required") + + # Get all timeframes to process + timeframes = get_all_timeframes() + window = calculate_window() + offset = parse_offset(args, task_id) + max_retries = int(args.get('max_retries', 5)) + target_database = args.get('target_database') + if not target_database: + raise ValueError(f"[{task_id}] target_database is required") + source_measurement = args.get('source_measurement', 'raw_trades') + + # Calculate time window + end_time = call_time - offset + start_time = end_time - window + + influxdb3_local.info(f"[{task_id}] Processing window: {start_time} to {end_time}") + influxdb3_local.info(f"[{task_id}] Source: {source_measurement}") + influxdb3_local.info(f"[{task_id}] Target: {target_measurement}") + influxdb3_local.info(f"[{task_id}] Processing {len(timeframes)} timeframes: {[f'{int(tf.total_seconds())}s' for tf in timeframes]}") + + # Build and execute single query for all timeframes + influxdb3_local.info(f"[{task_id}] Building multi-timeframe query for {len(timeframes)} timeframes...") + query = build_multi_timeframe_candle_query( + timeframes=timeframes, + start_time=start_time, + end_time=end_time, + source_measurement=source_measurement + ) + + influxdb3_local.info(f"[{task_id}] Executing multi-timeframe query...") + result = influxdb3_local.query(query) + + # Process results from single query + all_candles = [] + timeframe_counts = {} + + for row in result: + candle = { + '_time': row['bucket_time'], + 'open': row['open'], + 'high': row['high'], + 'low': row['low'], + 'close': row['close'], + 'volume': row['volume'], + 'mint': row['mint'], + 'platform': row['platform'], + 'timeframe_seconds': row['timeframe_seconds'] + } + all_candles.append(candle) + + # Count candles per timeframe + timeframe_str = f"{row['timeframe_seconds']}s" + timeframe_counts[timeframe_str] = timeframe_counts.get(timeframe_str, 0) + 1 + + # Log results per timeframe + for timeframe_str, count in timeframe_counts.items(): + influxdb3_local.info(f"[{task_id}] Generated {count} candles for {timeframe_str}") + + # Write all candles to database in a single call + if all_candles: + total_candles_generated = write_candle_data( + influxdb3_local=influxdb3_local, + data=all_candles, + max_retries=max_retries, + target_measurement=target_measurement, + target_database=target_database, + task_id=task_id + ) + else: + total_candles_generated = 0 + + influxdb3_local.info(f"[{task_id}] Total candles generated across all timeframes: {total_candles_generated}") + + influxdb3_local.info(f"[{task_id}] Candle generation completed successfully") + + except Exception as e: + influxdb3_local.info(f"[{task_id}] Error in scheduled candle generation: {e}") + raise + + +def process_request( + influxdb3_local, query_parameters, request_headers, request_body, args=None +): + 
""" + Process HTTP request for candle generation. + + Args: + influxdb3_local: InfluxDB client + query_parameters: Query parameters from request + request_headers: Request headers + request_body: Request body + args: Arguments dictionary + """ + task_id = str(uuid.uuid4())[:8] + influxdb3_local.info(f"[{task_id}] Processing HTTP candle generation request") + + try: + Parse request data + if request_body: + data = json.loads(request_body) + else: + data = {} + + Merge query parameters and request body + request_args = {**query_parameters, **data} + + Load config from file if specified + if request_args.get('config_file_path'): + config_path = Path(request_args['config_file_path']) + if not config_path.is_absolute(): + plugin_dir = os.environ.get('PLUGIN_DIR', '.') + config_path = Path(plugin_dir) / config_path + + with open(config_path, 'rb') as f: + config = tomllib.load(f) + request_args = {**request_args, **config} + + Parse arguments + target_measurement = request_args.get('target_measurement') + if not target_measurement: + raise ValueError(f"[{task_id}] target_measurement is required") + + Check if specific timeframe is requested, otherwise use all timeframes + timeframe_str = request_args.get('timeframe') + if timeframe_str: + Single timeframe requested + timeframes = [parse_timeframe(timeframe_str)] + influxdb3_local.info(f"[{task_id}] Single timeframe requested: {timeframe_str}") + else: + All timeframes + timeframes = get_all_timeframes() + influxdb3_local.info(f"[{task_id}] All timeframes requested") + + max_retries = int(request_args.get('max_retries', 5)) + target_database = request_args.get('target_database') + if not target_database: + raise ValueError(f"[{task_id}] target_database is required") + source_measurement = request_args.get('source_measurement', 'raw_trades') + + Parse time range from request + start_time_str = request_args.get('start_time') + end_time_str = request_args.get('end_time') + + if not start_time_str or not end_time_str: + raise ValueError(f"[{task_id}] start_time and end_time are required for HTTP requests") + + start_time = datetime.fromisoformat(start_time_str.replace('Z', '+00:00')) + end_time = datetime.fromisoformat(end_time_str.replace('Z', '+00:00')) + + influxdb3_local.info(f"[{task_id}] Processing time range: {start_time} to {end_time}") + influxdb3_local.info(f"[{task_id}] Source: {source_measurement}") + influxdb3_local.info(f"[{task_id}] Target: {target_measurement}") + influxdb3_local.info(f"[{task_id}] Processing {len(timeframes)} timeframes: {[f'{int(tf.total_seconds())}s' for tf in timeframes]}") + + Build and execute single query for all timeframes + influxdb3_local.info(f"[{task_id}] Building multi-timeframe query for {len(timeframes)} timeframes...") + query = build_multi_timeframe_candle_query( + timeframes=timeframes, + start_time=start_time, + end_time=end_time, + source_measurement=source_measurement + ) + + influxdb3_local.info(f"[{task_id}] Executing multi-timeframe query...") + result = influxdb3_local.query(query) + + Process results from single query + all_candles = [] + timeframe_results = {} + + for row in result: + candle = { + '_time': row['bucket_time'], + 'open': row['open'], + 'high': row['high'], + 'low': row['low'], + 'close': row['close'], + 'volume': row['volume'], + 'mint': row['mint'], + 'platform': row['platform'], + 'timeframe_seconds': row['timeframe_seconds'] + } + all_candles.append(candle) + + Count candles per timeframe + timeframe_str = f"{row['timeframe_seconds']}s" + timeframe_results[timeframe_str] = 
timeframe_results.get(timeframe_str, 0) + 1
+
+        # Log results per timeframe
+        for timeframe_str, count in timeframe_results.items():
+            influxdb3_local.info(f"[{task_id}] Generated {count} candles for {timeframe_str}")
+
+        # Write all candles to the database in a single call
+        if all_candles:
+            total_candles_generated = write_candle_data(
+                influxdb3_local=influxdb3_local,
+                data=all_candles,
+                max_retries=max_retries,
+                target_measurement=target_measurement,
+                target_database=target_database,
+                task_id=task_id
+            )
+        else:
+            total_candles_generated = 0
+
+        influxdb3_local.info(f"[{task_id}] Total candles generated across all timeframes: {total_candles_generated}")
+
+        # Return success response
+        response = {
+            'status': 'success',
+            'task_id': task_id,
+            'candles_generated': total_candles_generated,
+            'timeframes_processed': len(timeframes),
+            'timeframe_results': timeframe_results,
+            'target_measurement': target_measurement,
+            'time_range': {
+                'start': start_time.isoformat(),
+                'end': end_time.isoformat()
+            }
+        }
+
+        return json.dumps(response), 200, {'Content-Type': 'application/json'}
+
+    except Exception as e:
+        influxdb3_local.info(f"[{task_id}] Error in HTTP candle generation: {e}")
+        error_response = {
+            'status': 'error',
+            'task_id': task_id,
+            'error': str(e)
+        }
+
+        return json.dumps(error_response), 500, {'Content-Type': 'application/json'}
diff --git a/sphe/candle_generator/setup_example.sh b/sphe/candle_generator/setup_example.sh
new file mode 100644
index 0000000..8b113e1
--- /dev/null
+++ b/sphe/candle_generator/setup_example.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# Candle Generator Plugin Setup Example
+# This script demonstrates how to set up the candle generator plugin for InfluxDB 3
+
+echo "Setting up Candle Generator Plugin for InfluxDB 3..."
+
+# Set your database name
+DATABASE_NAME="prochain_db"
+PLUGIN_DIR="$(pwd)"
+
+echo "Using database: $DATABASE_NAME"
+echo "Plugin directory: $PLUGIN_DIR"
+
+# Create database if it doesn't exist
+echo "Creating database if it doesn't exist..."
+influxdb3 database create --name "$DATABASE_NAME" 2>/dev/null || echo "Database already exists"
+
+# Set up the triggers (every scheduled run generates all supported timeframes)
+
+echo "Setting up scheduled candle trigger (all timeframes)..."
+influxdb3 create trigger \
+  --database "$DATABASE_NAME" \
+  --plugin-filename "$PLUGIN_DIR/candle_generator.py" \
+  --trigger-spec "every:30s" \
+  --trigger-arguments "source_measurement=prochain_data,target_measurement=candles,target_database=$DATABASE_NAME" \
+  prochain_candles_all_timeframes
+
+echo "Setting up HTTP trigger for on-demand candle generation..."
+influxdb3 create trigger \
+  --database "$DATABASE_NAME" \
+  --plugin-filename "$PLUGIN_DIR/candle_generator.py" \
+  --trigger-spec "http" \
+  --trigger-arguments "source_measurement=prochain_data,target_measurement=candles,target_database=$DATABASE_NAME" \
+  prochain_candles_http
+
+echo ""
+echo "Setup complete! Created triggers:"
+echo "- prochain_candles_all_timeframes (all timeframes: 30s, 60s, 120s, 300s, 600s, 900s; runs every 30 seconds)"
+echo "- prochain_candles_http (HTTP endpoint for on-demand generation)"
+echo ""
+echo "To list all triggers:"
+echo "influxdb3 trigger list --database $DATABASE_NAME"
+echo ""
+echo "To test HTTP generation, use:"
+echo "curl -X POST http://localhost:8086/api/v2/triggers/{trigger_id}/execute \\"
+echo "  -H 'Content-Type: application/json' \\"
+echo "  -d '{\"start_time\": \"2024-01-01T00:00:00Z\", \"end_time\": \"2024-01-01T01:00:00Z\"}'"
diff --git a/sphe/candle_generator/test_plugin.py b/sphe/candle_generator/test_plugin.py
new file mode 100644
index 0000000..df5d04d
--- /dev/null
+++ b/sphe/candle_generator/test_plugin.py
@@ -0,0 +1,246 @@
+#!/usr/bin/env python3
+"""
+Test script for the Candle Generator Plugin
+This script creates sample raw_trades data and tests the candle generation functionality.
+"""
+
+from datetime import datetime, timedelta, timezone
+from influxdb_client_3 import InfluxDBClient3
+
+def create_sample_data(client, database_name):
+    """Create sample raw_trades data for testing."""
+
+    # Sample data points over the last hour
+    end_time = datetime.now(timezone.utc)
+    start_time = end_time - timedelta(hours=1)
+
+    # Generate sample data points every 10 seconds
+    current_time = start_time
+    data_points = []
+
+    base_price = 100.0
+    price_variation = 5.0
+
+    # Sample mints for testing
+    mints = [
+        "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
+        "So11111111111111111111111111111111111111112",
+        "mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So"
+    ]
+
+    platforms = ["raydium", "orca", "serum"]
+
+    while current_time <= end_time:
+        # Simulate price movement
+        price = base_price + (price_variation * (current_time.minute % 10) / 10.0)
+        volume = 1000.0 + (current_time.second * 10)
+
+        # Alternate between different mints and platforms
+        mint = mints[current_time.second % len(mints)]
+        platform = platforms[current_time.second % len(platforms)]
+
+        data_point = {
+            "measurement": "raw_trades",
+            "time": current_time,
+            "tags": {
+                "mint": mint,
+                "platform": platform
+            },
+            "fields": {
+                "price": price,
+                "quote_volume": volume
+            }
+        }
+        data_points.append(data_point)
+        current_time += timedelta(seconds=10)
+
+    # Write data to InfluxDB
+    print(f"Writing {len(data_points)} sample raw_trades data points...")
+    client.write(data_points, database=database_name)
+    print("Sample data written successfully!")
+
+    return start_time, end_time
+
+def test_candle_generation(client, database_name, target_measurement):
+    """Test candle generation for all timeframes."""
+
+    print(f"\nTesting candle generation for {target_measurement}...")
+
+    # Query to check if candles were generated
+    query = f"""
+    SELECT COUNT(*) as candle_count,
+           timeframe
+    FROM "{target_measurement}"
+    WHERE time >= NOW() - INTERVAL '1 hour'
+    GROUP BY timeframe
+    ORDER BY timeframe
+    """
+
+    try:
+        # query() returns a pyarrow Table; convert it to a list of row dicts
+        result = client.query(query).to_pylist()
+        total_candles = 0
+        timeframes_found = []
+
+        print("\nCandle generation results:")
+        print("-" * 50)
+
+        for row in result:
+            timeframe = row['timeframe']
+            count = row['candle_count']
+            total_candles += count
+            timeframes_found.append(timeframe)
+            print(f"  {timeframe}: {count} candles")
+
+        print("-" * 50)
+        print(f"Total candles: {total_candles}")
+        print(f"Timeframes found: {len(timeframes_found)}")
+
+        if total_candles > 0:
+            print("✅ Candle generation test PASSED")
+            print(f"✅ Single measurement working: {target_measurement}")
+            print(f"✅ Timeframe tags working: {timeframes_found}")
+            return True
+        else:
+            print("❌ Candle generation test FAILED - No candles found")
+            return False
+
+    except Exception as e:
+        print(f"❌ Candle generation test FAILED - Error: {e}")
+        return False
+
+def test_candle_data_quality(client, database_name, target_measurement):
+    """Test the quality and structure of generated candles."""
+
+    print(f"\nTesting candle data quality for {target_measurement}...")
+
+    # Query to check candle structure
+    query = f"""
+    SELECT open, high, low, close, volume, mint, platform, timeframe
+    FROM "{target_measurement}"
+    WHERE time >= NOW() - INTERVAL '1 hour'
+    LIMIT 5
+    """
+
+    try:
+        # Convert the pyarrow result to a list of row dicts
+        candles = client.query(query).to_pylist()
+
+        if not candles:
+            print("❌ No candles found for quality testing")
+            return False
+
+        print(f"\nSample candle data (first {len(candles)} candles):")
+        print("-" * 80)
+
+        for i, candle in enumerate(candles, 1):
+            print(f"Candle {i}:")
+            print(f"  OHLCV: O={candle['open']:.4f}, H={candle['high']:.4f}, L={candle['low']:.4f}, C={candle['close']:.4f}, V={candle['volume']:.2f}")
+            print(f"  Tags: mint={candle['mint']}, platform={candle['platform']}, timeframe={candle['timeframe']}")
+            print()
+
+        # Validate OHLCV logic
+        valid_candles = 0
+        for candle in candles:
+            high = candle['high']
+            low = candle['low']
+            open_price = candle['open']
+            close_price = candle['close']
+
+            # Basic OHLCV validation
+            if (high >= low and
+                    high >= open_price and
+                    high >= close_price and
+                    low <= open_price and
+                    low <= close_price and
+                    candle['volume'] >= 0):
+                valid_candles += 1
+
+        print(f"✅ Valid candles: {valid_candles}/{len(candles)}")
+
+        if valid_candles == len(candles):
+            print("✅ Candle data quality test PASSED")
+            return True
+        else:
+            print("❌ Candle data quality test FAILED - Invalid OHLCV data")
+            return False
+
+    except Exception as e:
+        print(f"❌ Candle data quality test FAILED - Error: {e}")
+        return False
+
+def main():
+    """Main test function."""
+
+    # Configuration
+    database_name = "test_prochain_db"
+    target_measurement = "candles"  # Single measurement with timeframe tags
+
+    # Connect to InfluxDB (adjust host/token for your InfluxDB 3 instance)
+    try:
+        client = InfluxDBClient3(
+            host="http://localhost:8181",
+            token="my-token",  # replace with your token (or any value if auth is disabled)
+            database=database_name
+        )
+        print("Connected to InfluxDB successfully!")
+    except Exception as e:
+        print(f"Failed to connect to InfluxDB: {e}")
+        print("Make sure InfluxDB 3 is running and accessible")
+        return
+
+    # The target database is created automatically on the first write
+    print(f"Using database: {database_name}")
+
+    # Create sample data
+    start_time, end_time = create_sample_data(client, database_name)
+
+    print("\n" + "="*60)
+    print("TESTING CANDLE GENERATION")
+    print("="*60)
+
+    # Note: In a real scenario, you would need to trigger the plugin
+    print("\nNote: This test assumes the candle generator plugin has been executed manually.")
+    print("To test the plugin, you need to:")
+    print("1. Set up the plugin triggers using the setup script")
+    print("2. Wait for scheduled execution or trigger manually")
+    print("3. 
Run this test script to verify results") + + # Test candle generation + generation_success = test_candle_generation(client, database_name, target_measurement) + + # Test candle data quality + quality_success = test_candle_data_quality(client, database_name, target_measurement) + + print("\n" + "="*60) + print("TEST SUMMARY") + print("="*60) + print(f"Candle Generation: {'✅ PASSED' if generation_success else '❌ FAILED'}") + print(f"Data Quality: {'✅ PASSED' if quality_success else '❌ FAILED'}") + + if generation_success and quality_success: + print("\n🎉 All tests PASSED!") + else: + print("\n⚠️ Some tests FAILED. Check the plugin configuration and execution.") + + print("\n" + "="*60) + print("TEST COMPLETE") + print("="*60) + print("\nTo manually test the plugin, you can:") + print("1. Use the HTTP trigger to generate candles on-demand") + print("2. Check the generated candles with queries like:") + print(f" SELECT * FROM \"{target_measurement}\" WHERE time >= NOW() - INTERVAL '1 hour'") + print(f" SELECT * FROM \"{target_measurement}\" WHERE timeframe = '30s' LIMIT 10") + print("\nSample data time range:") + print(f" Start: {start_time}") + print(f" End: {end_time}") + +if __name__ == "__main__": + main()
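+
+# --- Optional standalone cross-check (illustrative sketch) -------------------
+# Recomputes a single candle from raw (price, quote_volume) pairs so the
+# plugin's SQL aggregation can be verified by hand. This helper and its tuple
+# layout are assumptions of this sketch; nothing above depends on it.
+def reference_candle(trades):
+    """trades: list of (price, quote_volume) tuples in time order."""
+    prices = [price for price, _ in trades]
+    return {
+        "open": prices[0],
+        "high": max(prices),
+        "low": min(prices),
+        "close": prices[-1],
+        "volume": sum(qvol for _, qvol in trades),
+    }
+
+# Example: the three trades in the 12:00:00 bucket from PLUGIN_SUMMARY.md
+# yield open=100.0, high=101.0, low=99.5, close=99.5, volume=3300.0:
+# reference_candle([(100.0, 1000.0), (101.0, 1500.0), (99.5, 800.0)])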