This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| name: Azure Queue Events | |
| on: | |
| push: | |
| branches: | |
| - '**' # This will run on all branches | |
| env: | |
| AZURE_QUEUE_STORAGE_ACCOUNT: ${{ secrets.AZURE_QUEUE_STORAGE_ACCOUNT }} | |
| AZURE_STORAGE_QUEUE_NAME: events-queue-${{ github.run_id }} | |
| jobs: | |
| process-events: | |
| runs-on: ubuntu-latest | |
| environment: nemo-ci | |
| permissions: | |
| id-token: write | |
| contents: read | |
| steps: | |
| - name: Checkout code | |
| uses: actions/checkout@v4 | |
| - name: Azure Login | |
| uses: azure/login@v2 | |
| with: | |
| client-id: ${{ secrets.AZURE_CLIENT_ID }} | |
| tenant-id: ${{ secrets.AZURE_TENANT_ID }} | |
| subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }} | |
| - name: Set up Python | |
| uses: actions/setup-python@v4 | |
| with: | |
| python-version: '3.12' | |
| - name: Install Azure Storage SDK | |
| run: | | |
| python -m pip install --upgrade pip | |
| pip install azure-storage-queue azure-identity | |
| - name: Process Events | |
| run: | | |
| python - <<EOF | |
| import os | |
| import json | |
| from azure.storage.queue import QueueServiceClient | |
| from azure.identity import DefaultAzureCredential | |
| # Get events list from environment variable | |
| events_list = [{"test": n} for n in range(20)] | |
| # Create queue service client using managed identity | |
| account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net" | |
| queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME') | |
| # Use DefaultAzureCredential which will automatically use the managed identity | |
| credential = DefaultAzureCredential() | |
| queue_service = QueueServiceClient(account_url=account_url, credential=credential) | |
| # Create queue if it doesn't exist | |
| queue_client = queue_service.get_queue_client(queue_name) | |
| queue_client.create_queue() | |
| # Send each event to the queue | |
| for event in events_list: | |
| queue_client.send_message(json.dumps(event)) | |
| print(f"Successfully sent {len(events_list)} events to queue {queue_name}") | |
| EOF | |
| read-events: | |
| needs: process-events | |
| runs-on: ubuntu-latest | |
| environment: nemo-ci | |
| permissions: | |
| id-token: write | |
| contents: read | |
| actions: write # Needed to cancel workflow runs | |
| strategy: | |
| matrix: | |
| worker: [1, 2, 3] | |
| steps: | |
| - name: Checkout code | |
| uses: actions/checkout@v4 | |
| - name: Azure Login | |
| uses: azure/login@v2 | |
| with: | |
| client-id: ${{ secrets.AZURE_CLIENT_ID }} | |
| tenant-id: ${{ secrets.AZURE_TENANT_ID }} | |
| subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }} | |
| - name: Set up Python | |
| uses: actions/setup-python@v4 | |
| with: | |
| python-version: '3.12' | |
| - name: Install Azure Storage SDK | |
| run: | | |
| python -m pip install --upgrade pip | |
| pip install azure-storage-queue azure-identity requests | |
| - name: Read Events | |
| run: | | |
| python - <<EOF | |
| import os | |
| import json | |
| import time | |
| import requests | |
| from azure.storage.queue import QueueServiceClient | |
| from azure.identity import DefaultAzureCredential | |
| def cancel_other_workers(): | |
| # Get GitHub token from environment | |
| token = os.environ.get('GITHUB_TOKEN') | |
| if not token: | |
| print("No GitHub token available to cancel other workers") | |
| return | |
| # Get current run ID and job ID | |
| run_id = os.environ.get('GITHUB_RUN_ID') | |
| current_job_id = os.environ.get('GITHUB_JOB') | |
| headers = { | |
| 'Authorization': f'token {token}', | |
| 'Accept': 'application/vnd.github.v3+json' | |
| } | |
| # Get all jobs in the current workflow run | |
| url = f"https://api.github.com/repos/{os.environ.get('GITHUB_REPOSITORY')}/actions/runs/{run_id}/jobs" | |
| response = requests.get(url, headers=headers) | |
| if response.status_code == 200: | |
| jobs = response.json()['jobs'] | |
| for job in jobs: | |
| # Only cancel other read-events jobs that are in progress | |
| if (job['name'].startswith('read-events') and | |
| job['name'] != current_job_id and | |
| job['status'] in ['queued', 'in_progress']): | |
| cancel_url = f"{url}/{job['id']}/cancel" | |
| cancel_response = requests.post(cancel_url, headers=headers) | |
| if cancel_response.status_code == 202: | |
| print(f"Cancelled job {job['id']} ({job['name']})") | |
| else: | |
| print(f"Failed to cancel job {job['id']}: {cancel_response.text}") | |
| # Create queue service client using managed identity | |
| account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net" | |
| queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME') | |
| worker_id = os.environ.get('WORKER_ID') | |
| # Use DefaultAzureCredential which will automatically use the managed identity | |
| credential = DefaultAzureCredential() | |
| queue_service = QueueServiceClient(account_url=account_url, credential=credential) | |
| queue_client = queue_service.get_queue_client(queue_name) | |
| # Wait for messages to be available (with timeout) | |
| max_attempts = 30 | |
| attempt = 0 | |
| while attempt < max_attempts: | |
| try: | |
| # Try to peek at messages | |
| messages = queue_client.peek_messages(max_messages=1) | |
| if messages: | |
| break | |
| print(f"Worker {worker_id}: No messages yet, attempt {attempt + 1}/{max_attempts}") | |
| time.sleep(2) # Wait 2 seconds between attempts | |
| attempt += 1 | |
| except Exception as e: | |
| print(f"Worker {worker_id}: Error peeking messages: {e}") | |
| time.sleep(2) | |
| attempt += 1 | |
| if attempt >= max_attempts: | |
| print(f"Worker {worker_id}: No messages found in queue after waiting - exiting successfully") | |
| cancel_other_workers() | |
| exit(0) | |
| # Receive and process messages | |
| while True: | |
| try: | |
| messages = queue_client.receive_messages(messages_per_page=1) | |
| if not messages: | |
| print(f"Worker {worker_id}: No more messages in queue - exiting successfully") | |
| cancel_other_workers() | |
| exit(0) | |
| for message in messages: | |
| try: | |
| content = json.loads(message.content) | |
| print(f"Worker {worker_id}: Received message: {json.dumps(content, indent=2)}") | |
| # Delete the message after processing | |
| queue_client.delete_message(message.id, message.pop_receipt) | |
| # Check if there are more messages | |
| peek_messages = queue_client.peek_messages(max_messages=1) | |
| if not peek_messages: | |
| print(f"Worker {worker_id}: No more messages in queue - exiting successfully") | |
| cancel_other_workers() | |
| exit(0) | |
| except json.JSONDecodeError as e: | |
| print(f"Worker {worker_id}: Error decoding message: {e}") | |
| print(f"Worker {worker_id}: Raw message content: {message.content}") | |
| except Exception as e: | |
| print(f"Worker {worker_id}: Error processing messages: {e}") | |
| break | |
| print(f"Worker {worker_id}: Finished processing all messages") | |
| EOF | |
| env: | |
| WORKER_ID: ${{ matrix.worker }} | |
| GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} | |
| cleanup-queue: | |
| needs: read-events | |
| if: always() # Run even if read-events was cancelled | |
| runs-on: ubuntu-latest | |
| environment: nemo-ci | |
| permissions: | |
| id-token: write | |
| contents: read | |
| steps: | |
| - name: Checkout code | |
| uses: actions/checkout@v4 | |
| - name: Azure Login | |
| uses: azure/login@v2 | |
| with: | |
| client-id: ${{ secrets.AZURE_CLIENT_ID }} | |
| tenant-id: ${{ secrets.AZURE_TENANT_ID }} | |
| subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }} | |
| - name: Set up Python | |
| uses: actions/setup-python@v4 | |
| with: | |
| python-version: '3.12' | |
| - name: Install Azure Storage SDK | |
| run: | | |
| python -m pip install --upgrade pip | |
| pip install azure-storage-queue azure-identity | |
| - name: Delete Queue | |
| run: | | |
| python - <<EOF | |
| import os | |
| from azure.storage.queue import QueueServiceClient | |
| from azure.identity import DefaultAzureCredential | |
| # Create queue service client using managed identity | |
| account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net" | |
| queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME') | |
| # Use DefaultAzureCredential which will automatically use the managed identity | |
| credential = DefaultAzureCredential() | |
| queue_service = QueueServiceClient(account_url=account_url, credential=credential) | |
| queue_client = queue_service.get_queue_client(queue_name) | |
| try: | |
| # Delete the queue | |
| queue_client.delete_queue() | |
| print(f"Successfully deleted queue: {queue_name}") | |
| except Exception as e: | |
| print(f"Error deleting queue: {e}") | |
| # Don't fail the job if queue deletion fails | |
| exit(0) | |
| EOF |