# Message Processor Implementation

## Overview

The message processor consumes Kafka messages delivered through Dapr input bindings. It supports both per-message and batch processing modes, with optional Temporal workflow integration.

**Key Design Decision**: The processing mode is determined automatically by `batch_size`:
- `batch_size = 1` → Per-message mode (immediate processing)
- `batch_size > 1` → Batch mode (accumulate and process)

This eliminates the need for a separate `process_mode` parameter and makes the API more intuitive.
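
As a minimal sketch of that rule (hypothetical code, not the SDK's actual implementation), the mode can be derived from `batch_size` alone; the `is_batch_mode` name mirrors the field exposed by the stats endpoint below:

```python
# Hypothetical illustration of how the mode follows from batch_size;
# the real MessageProcessorConfig may derive it differently.
from dataclasses import dataclass


@dataclass
class ModeExample:
    batch_size: int = 1

    @property
    def is_batch_mode(self) -> bool:
        # batch_size = 1 -> per-message, batch_size > 1 -> batch
        return self.batch_size > 1


assert ModeExample(batch_size=1).is_batch_mode is False   # per-message
assert ModeExample(batch_size=50).is_batch_mode is True   # batch
```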

## Architecture

### Core Components

1. **MessageProcessorConfig** (`application_sdk/server/messaging.py`)
   - Configuration for message processor behavior
   - Specifies processing mode, batch settings, and workflow integration

2. **MessageProcessor** (`application_sdk/server/messaging.py`)
   - Handles message processing logic
   - Supports per-message and batch modes
   - Optional workflow triggering
   - Built-in metrics and error handling

3. **APIServer Integration** (`application_sdk/server/fastapi/__init__.py`)
   - `register_message_processor()` - Register a processor for a Kafka binding
   - `start_message_processors()` - Start all registered processors
   - `stop_message_processors()` - Stop all registered processors

## Processing Modes

The processing mode is determined automatically by the `batch_size` parameter:

### Per-Message Mode (batch_size = 1)
```python
config = MessageProcessorConfig(
    binding_name="kafka-input",
    batch_size=1
)
```
- Processes each message immediately upon arrival
- No batching or delays
- Suitable for low-latency requirements
- `batch_timeout` is ignored in this mode

### Batch Mode (batch_size > 1)
```python
config = MessageProcessorConfig(
    binding_name="kafka-input",
    batch_size=50,
    batch_timeout=5.0
)
```
- Accumulates messages up to `batch_size`
- Processes the batch when the size threshold or the timeout is reached
- More efficient for high-throughput scenarios
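
A rough sketch of the accumulate-and-flush behavior described above (hypothetical internals under the stated assumptions; the SDK's actual buffering may differ):

```python
# Hypothetical sketch of batch-mode flushing: a buffer is flushed when it
# reaches batch_size, or when batch_timeout elapses since the last flush.
import time
from typing import List


class BatchBuffer:
    def __init__(self, batch_size: int, batch_timeout: float):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.messages: List[dict] = []
        self.last_flush = time.monotonic()

    def should_flush(self) -> bool:
        size_reached = len(self.messages) >= self.batch_size
        timed_out = (time.monotonic() - self.last_flush) >= self.batch_timeout
        # Never flush an empty buffer on timeout alone.
        return size_reached or (timed_out and bool(self.messages))
```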

## Usage Examples

### Example 1: Basic Per-Message Processing (batch_size = 1)

```python
import logging
from typing import List

from application_sdk.server.messaging import MessageProcessorConfig

logger = logging.getLogger(__name__)

# Register the processor in your APIServer setup
config = MessageProcessorConfig(
    binding_name="kafka-input",
    batch_size=1  # Per-message mode
)

async def process_messages(messages: List[dict]):
    for msg in messages:
        logger.info(f"Processing: {msg}")
        # Your custom processing logic here

server.register_message_processor(config, process_callback=process_messages)
```

### Example 2: Batch Processing with Custom Callback

```python
config = MessageProcessorConfig(
    binding_name="kafka-input",
    batch_size=100,  # Batch mode
    batch_timeout=10.0
)

async def process_batch(messages: List[dict]):
    # Process the entire batch at once
    logger.info(f"Processing batch of {len(messages)} messages")
    # Bulk insert to database, etc.
    await bulk_insert_to_db(messages)

server.register_message_processor(config, process_callback=process_batch)
```

### Example 3: Workflow Integration (Per-Message)

```python
from app.workflows import MessageProcessorWorkflow

config = MessageProcessorConfig(
    binding_name="kafka-input",
    batch_size=1,  # Per-message: each message triggers a workflow
    trigger_workflow=True,
    workflow_class=MessageProcessorWorkflow
)

server.register_message_processor(config)
```
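
The workflow class itself lives in your application code. A minimal sketch of what `MessageProcessorWorkflow` could look like with the Temporal Python SDK (the run-method signature taking a single `dict` is an assumption; match it to how the SDK passes messages):

```python
from temporalio import workflow


@workflow.defn
class MessageProcessorWorkflow:
    @workflow.run
    async def run(self, message: dict) -> None:
        # Keep workflow code deterministic; delegate real work
        # (DB writes, HTTP calls) to Temporal activities.
        workflow.logger.info("Received message: %s", message)
```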

### Example 4: Workflow Integration (Batch)

```python
config = MessageProcessorConfig(
    binding_name="kafka-input",
    batch_size=50,  # Batch mode
    batch_timeout=5.0,
    trigger_workflow=True,
    workflow_class=BatchProcessorWorkflow
)

# Each batch triggers a single workflow with all messages
server.register_message_processor(config)
```

## Dapr Integration

### Binding Configuration

Create a Dapr Kafka input binding in `components/kafka-input-binding.yaml`:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-input  # Must match config.binding_name
spec:
  type: bindings.kafka
  version: v1
  metadata:
    - name: brokers
      value: "localhost:9092"
    - name: topics
      value: "events-topic"
    - name: consumerGroup
      value: "my-consumer-group"
    - name: initialOffset
      value: "newest"
    - name: direction
      value: "input"
    - name: authType
      value: "none"
```

### Endpoint Convention

- Dapr calls `POST /{binding-name}` for each message
- Example: a binding named "kafka-input" creates the endpoint `POST /kafka-input`
- Statistics are available at `GET /messages/v1/stats/{binding-name}`
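
To exercise the endpoint locally without Dapr, you can POST a message to it yourself. This sketch uses only the standard library; the event body shape is an assumed example, so use whatever your producers actually emit:

```python
import json
import urllib.request

# Simulate Dapr delivering one Kafka message to the binding endpoint.
payload = json.dumps({"event_type": "test", "data": {"id": 1}}).encode()
req = urllib.request.Request(
    "http://localhost:3000/kafka-input",  # POST /{binding-name}
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode())
```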

## Monitoring & Metrics

### Built-in Metrics

The message processor automatically records:

- `kafka_messages_processed_total` - Total messages processed (counter)
  - Labels: `status` (success/error), `mode` (per_message/batch)
- `kafka_binding_requests_total` - Total binding requests (counter)
  - Labels: `status` (success/error), `binding` (binding name)
- `kafka_binding_duration_seconds` - Request duration (histogram)
  - Labels: `binding` (binding name)
- `kafka_batch_size` - Batch size distribution (histogram)

### Statistics Endpoint

Get processor stats:
```bash
curl http://localhost:3000/messages/v1/stats/kafka-input
```

Response:
```json
{
  "is_running": true,
  "current_batch_size": 15,
  "batch_size_threshold": 50,
  "batch_timeout": 5.0,
  "is_batch_mode": true,
  "total_processed": 1250,
  "total_errors": 3,
  "time_since_last_process": 2.5
}
```
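
The stats endpoint is handy for lightweight health checks. A small polling sketch using only the standard library (the "buffer not draining" heuristic is an assumption for illustration, not documented SDK behavior):

```python
import json
import urllib.request


def check_processor(binding: str, base_url: str = "http://localhost:3000") -> None:
    # Fetch the documented stats payload for one binding.
    with urllib.request.urlopen(f"{base_url}/messages/v1/stats/{binding}") as resp:
        stats = json.load(resp)
    if not stats["is_running"]:
        print(f"{binding}: processor is not running!")
    # Heuristic: a buffer stuck at its threshold may indicate a stalled flush.
    if stats["current_batch_size"] >= stats["batch_size_threshold"]:
        print(f"{binding}: batch buffer is full and not draining")
    print(f"{binding}: processed={stats['total_processed']} errors={stats['total_errors']}")


check_processor("kafka-input")
```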

## Error Handling

### Automatic Error Handling

- All exceptions are logged with full stack traces
- Error metrics are recorded automatically
- Failed messages are counted in `total_errors`
- Batch processing continues even if one message fails

### Custom Error Handling

```python
# process_message, send_to_dlq, and retry_message are placeholders
# for your application's own processing, dead-letter, and retry logic.
async def process_with_error_handling(messages: List[dict]):
    for msg in messages:
        try:
            await process_message(msg)
        except ValidationError as e:
            logger.error(f"Validation failed: {e}")
            # Send to dead letter queue
            await send_to_dlq(msg, str(e))
        except Exception as e:
            logger.error(f"Processing failed: {e}")
            # Retry logic
            await retry_message(msg)

config = MessageProcessorConfig(
    binding_name="kafka-input",
    batch_size=1  # Per-message mode
)
server.register_message_processor(config, process_callback=process_with_error_handling)
```

## Lifecycle Management

### Starting Processors

```python
# Start all registered message processors
await server.start_message_processors()
```

### Stopping Processors

```python
# Stop all processors and process any remaining buffered messages
await server.stop_message_processors()
```

### Integration with the Application Lifecycle

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await server.start_message_processors()
    yield
    # Shutdown
    await server.stop_message_processors()

app = FastAPI(lifespan=lifespan)
```

## Performance Considerations

### Per-Message Mode (batch_size = 1)
- **Pros**: Low latency, immediate processing
- **Cons**: Higher overhead per message
- **Best for**: Real-time processing, low message volume

### Batch Mode (batch_size > 1)
- **Pros**: Better throughput, more efficient bulk operations
- **Cons**: Higher latency (up to `batch_timeout`)
- **Best for**: High volume, analytics, bulk database operations

### Tuning Parameters

```python
# Per-message: instant processing
batch_size=1  # batch_timeout ignored

# Low latency with small batches
batch_size=10
batch_timeout=1.0

# Balanced
batch_size=100
batch_timeout=5.0

# High throughput, can tolerate latency
batch_size=1000
batch_timeout=30.0
```

## Testing

### Unit Testing

```python
import pytest

from application_sdk.server.messaging import MessageProcessor, MessageProcessorConfig

@pytest.mark.asyncio
async def test_per_message_processing():
    config = MessageProcessorConfig(
        binding_name="test-binding",
        batch_size=1  # Per-message mode
    )

    processed = []
    async def callback(messages):
        processed.extend(messages)

    processor = MessageProcessor(config, process_callback=callback)

    message = {"event_type": "test", "data": {"id": 1}}
    result = await processor.add_message(message)

    assert result["status"] == "processed"
    assert len(processed) == 1
    assert processor.total_processed == 1
```

### Integration Testing

```python
import asyncio

@pytest.mark.asyncio
async def test_batch_processing():
    config = MessageProcessorConfig(
        binding_name="test-binding",
        batch_size=3,  # Batch mode
        batch_timeout=1.0
    )

    batches = []
    async def callback(messages):
        batches.append(messages)

    processor = MessageProcessor(config, process_callback=callback)
    await processor.start()

    # Add messages until the batch-size threshold is hit
    for i in range(3):
        await processor.add_message({"id": i})

    # Give the processor a moment to flush the batch
    await asyncio.sleep(0.1)

    assert len(batches) == 1
    assert len(batches[0]) == 3

    await processor.stop()
```

## Complete Application Example

See `/atlan-sample-apps/quickstart/simple-message-processor/` for a complete example application that uses the message processor with Temporal workflows.

## Troubleshooting

### Messages not being processed

1. Check that Dapr is running: `dapr list`
2. Verify that the binding configuration matches `binding_name`
3. Check that Kafka is reachable and the topic exists
4. Review application logs for errors

### Batch not processing at the expected time

1. Verify the `batch_timeout` configuration
2. Check that at least one message was received
3. Ensure the processors were started: `await server.start_message_processors()`

### High memory usage

1. Reduce `batch_size` for batch mode
2. Ensure messages are being processed (check for stuck processors)
3. Monitor `current_batch_size` via the stats endpoint

## References

- [Dapr Kafka Binding](https://docs.dapr.io/reference/components-reference/supported-bindings/kafka/)
- [Temporal Workflows](https://docs.temporal.io/)
- [Application SDK Documentation](../docs/)