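# NOTE(review): the lines below look like web-page residue from a copy/paste.
# They are kept as comments so the file parses as YAML; delete once confirmed.
# Skip to content
#
# test
#
# test #3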

name: Azure Queue Events

# Trigger: any push to any branch.
on:
  push:
    branches:
      - '**'  # This will run on all branches

# Workflow-level environment, visible to every step of every job.
env:
  AZURE_STORAGE_ACCOUNT: ${{ secrets.AZURE_STORAGE_ACCOUNT }}
  # One queue per workflow run. github.run_id does not change when a run is
  # re-run, so a re-run attempt targets the same queue name.
  AZURE_STORAGE_QUEUE_NAME: events-queue-${{ github.run_id }}
jobs:
  # Logs in to Azure, then sends a list of JSON events to the Azure Storage
  # queue named by AZURE_STORAGE_QUEUE_NAME, creating the queue if needed.
  process-events:
    runs-on: ubuntu-latest
    environment: nemo-ci
    permissions:
      # id-token: write is needed so the Azure Login step can request an OIDC
      # token (no client secret is passed to azure/login below).
      id-token: write
      contents: read
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Azure Login
        uses: azure/login@v2
        with:
          client-id: ${{ secrets.AZURE_CLIENT_ID }}
          tenant-id: ${{ secrets.AZURE_TENANT_ID }}
          subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.12'

      - name: Install Azure Storage SDK
        run: |
          python -m pip install --upgrade pip
          pip install azure-storage-queue azure-identity

      - name: Process Events
        # The heredoc delimiter is quoted ('EOF') so the shell passes the
        # script to Python verbatim, without expanding $ or backticks.
        run: |
          python - <<'EOF'
          import json
          import os
          import sys

          from azure.core.exceptions import ResourceExistsError
          from azure.identity import DefaultAzureCredential
          from azure.storage.queue import QueueServiceClient

          # Events to send, as a JSON array. Falls back to a single test event
          # when EVENTS_LIST is unset (no step visible here sets it).
          events_list = json.loads(os.environ.get('EVENTS_LIST', '[{"test": "test"}]'))

          # Fail early with a clear message instead of building a malformed URL
          # when the storage account secret or queue name is missing/empty.
          account = os.environ.get('AZURE_STORAGE_ACCOUNT')
          queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
          if not account or not queue_name:
              sys.exit("AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_QUEUE_NAME must be set")
          account_url = f"https://{account}.queue.core.windows.net"

          # DefaultAzureCredential tries several credential sources in turn.
          # NOTE(review): on this runner it presumably resolves to the Azure CLI
          # session created by the "Azure Login" step - confirm.
          credential = DefaultAzureCredential()
          queue_service = QueueServiceClient(account_url=account_url, credential=credential)

          # Create the queue if it doesn't exist. create_queue() raises
          # ResourceExistsError for an existing queue (e.g. on a re-run, which
          # keeps the same run_id and therefore the same queue name).
          queue_client = queue_service.get_queue_client(queue_name)
          try:
              queue_client.create_queue()
          except ResourceExistsError:
              pass

          # Send each event to the queue as its own JSON-encoded message.
          for event in events_list:
              queue_client.send_message(json.dumps(event))

          print(f"Successfully sent {len(events_list)} events to queue {queue_name}")
          EOF