Skip to content

tst

tst #9

name: Azure Queue Events
on:
push:
branches:
- '**' # This will run on all branches
env:
AZURE_QUEUE_STORAGE_ACCOUNT: ${{ secrets.AZURE_QUEUE_STORAGE_ACCOUNT }}
AZURE_STORAGE_QUEUE_NAME: events-queue-${{ github.run_id }}
jobs:
process-events:
runs-on: ubuntu-latest
environment: nemo-ci
permissions:
id-token: write
contents: read
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Azure Login
uses: azure/login@v2
with:
client-id: ${{ secrets.AZURE_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.12'
- name: Install Azure Storage SDK
run: |
python -m pip install --upgrade pip
pip install azure-storage-queue azure-identity
- name: Process Events
run: |
python - <<EOF
import os
import json
from azure.storage.queue import QueueServiceClient
from azure.identity import DefaultAzureCredential
# Get events list from environment variable
events_list = [{"test": n} for n in range(20)]
# Create queue service client using managed identity
account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net"
queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
# Use DefaultAzureCredential which will automatically use the managed identity
credential = DefaultAzureCredential()
queue_service = QueueServiceClient(account_url=account_url, credential=credential)
# Create queue if it doesn't exist
queue_client = queue_service.get_queue_client(queue_name)
queue_client.create_queue()
# Send each event to the queue
for event in events_list:
queue_client.send_message(json.dumps(event))
print(f"Successfully sent {len(events_list)} events to queue {queue_name}")
EOF
read-events:
needs: process-events
runs-on: ubuntu-latest
environment: nemo-ci
permissions:
id-token: write
contents: read
actions: write # Needed to cancel workflow runs
strategy:
matrix:
worker: [1, 2, 3]
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Azure Login
uses: azure/login@v2
with:
client-id: ${{ secrets.AZURE_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.12'
- name: Install Azure Storage SDK
run: |
python -m pip install --upgrade pip
pip install azure-storage-queue azure-identity requests
- name: Read Events
run: |
python - <<EOF
import os
import json
import time
import requests
from azure.storage.queue import QueueServiceClient
from azure.identity import DefaultAzureCredential
def cancel_other_workers():
# Get GitHub token from environment
token = os.environ.get('GITHUB_TOKEN')
if not token:
print("No GitHub token available to cancel other workers")
return
# Get current run ID and job ID
run_id = os.environ.get('GITHUB_RUN_ID')
current_job_id = os.environ.get('GITHUB_JOB')
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3+json'
}
# Get all jobs in the current workflow run
url = f"https://api.github.com/repos/{os.environ.get('GITHUB_REPOSITORY')}/actions/runs/{run_id}/jobs"
response = requests.get(url, headers=headers)
if response.status_code == 200:
jobs = response.json()['jobs']
for job in jobs:
# Only cancel other read-events jobs that are in progress
if (job['name'].startswith('read-events') and
job['name'] != current_job_id and
job['status'] in ['queued', 'in_progress']):
cancel_url = f"{url}/{job['id']}/cancel"
cancel_response = requests.post(cancel_url, headers=headers)
if cancel_response.status_code == 202:
print(f"Cancelled job {job['id']} ({job['name']})")
else:
print(f"Failed to cancel job {job['id']}: {cancel_response.text}")
# Create queue service client using managed identity
account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net"
queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
worker_id = os.environ.get('WORKER_ID')
# Use DefaultAzureCredential which will automatically use the managed identity
credential = DefaultAzureCredential()
queue_service = QueueServiceClient(account_url=account_url, credential=credential)
queue_client = queue_service.get_queue_client(queue_name)
# Wait for messages to be available (with timeout)
max_attempts = 30
attempt = 0
while attempt < max_attempts:
try:
# Try to peek at messages
messages = queue_client.peek_messages(max_messages=1)
if messages:
break
print(f"Worker {worker_id}: No messages yet, attempt {attempt + 1}/{max_attempts}")
time.sleep(2) # Wait 2 seconds between attempts
attempt += 1
except Exception as e:
print(f"Worker {worker_id}: Error peeking messages: {e}")
time.sleep(2)
attempt += 1
if attempt >= max_attempts:
print(f"Worker {worker_id}: No messages found in queue after waiting - exiting successfully")
cancel_other_workers()
exit(0)
# Receive and process messages
while True:
try:
messages = queue_client.receive_messages(messages_per_page=1)
if not messages:
print(f"Worker {worker_id}: No more messages in queue - exiting successfully")
cancel_other_workers()
exit(0)
for message in messages:
try:
content = json.loads(message.content)
print(f"Worker {worker_id}: Received message: {json.dumps(content, indent=2)}")
# Delete the message after processing
queue_client.delete_message(message.id, message.pop_receipt)
# Check if there are more messages
peek_messages = queue_client.peek_messages(max_messages=1)
if not peek_messages:
print(f"Worker {worker_id}: No more messages in queue - exiting successfully")
cancel_other_workers()
exit(0)
except json.JSONDecodeError as e:
print(f"Worker {worker_id}: Error decoding message: {e}")
print(f"Worker {worker_id}: Raw message content: {message.content}")
except Exception as e:
print(f"Worker {worker_id}: Error processing messages: {e}")
break
print(f"Worker {worker_id}: Finished processing all messages")
EOF
env:
WORKER_ID: ${{ matrix.worker }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
cleanup-queue:
needs: read-events
if: always() # Run even if read-events was cancelled
runs-on: ubuntu-latest
environment: nemo-ci
permissions:
id-token: write
contents: read
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Azure Login
uses: azure/login@v2
with:
client-id: ${{ secrets.AZURE_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.12'
- name: Install Azure Storage SDK
run: |
python -m pip install --upgrade pip
pip install azure-storage-queue azure-identity
- name: Delete Queue
run: |
python - <<EOF
import os
from azure.storage.queue import QueueServiceClient
from azure.identity import DefaultAzureCredential
# Create queue service client using managed identity
account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net"
queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
# Use DefaultAzureCredential which will automatically use the managed identity
credential = DefaultAzureCredential()
queue_service = QueueServiceClient(account_url=account_url, credential=credential)
queue_client = queue_service.get_queue_client(queue_name)
try:
# Delete the queue
queue_client.delete_queue()
print(f"Successfully deleted queue: {queue_name}")
except Exception as e:
print(f"Error deleting queue: {e}")
# Don't fail the job if queue deletion fails
exit(0)
EOF