66 - ' **' # This will run on all branches
77
88env :
9- AZURE_STORAGE_ACCOUNT : ${{ secrets.AZURE_STORAGE_ACCOUNT }}
9+ AZURE_QUEUE_STORAGE_ACCOUNT : ${{ secrets.AZURE_QUEUE_STORAGE_ACCOUNT }}
1010 AZURE_STORAGE_QUEUE_NAME : events-queue-${{ github.run_id }}
1111
1212jobs :
4949 events_list = json.loads(os.environ.get('EVENTS_LIST', '[{"test": "test"}]'))
5050
5151 # Create queue service client using managed identity
52- account_url = f"https://{os.environ.get('AZURE_STORAGE_ACCOUNT ')}.queue.core.windows.net"
52+ account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT ')}.queue.core.windows.net"
5353 queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
5454
5555 # Use DefaultAzureCredential which will automatically use the managed identity
@@ -66,3 +66,154 @@ jobs:
6666
6767 print(f"Successfully sent {len(events_list)} events to queue {queue_name}")
6868 EOF
69+
70+ read-events :
71+ needs : process-events
72+ runs-on : ubuntu-latest
73+ environment : nemo-ci
74+ permissions :
75+ id-token : write
76+ contents : read
77+ steps :
78+ - name : Checkout code
79+ uses : actions/checkout@v4
80+
81+ - name : Azure Login
82+ uses : azure/login@v2
83+ with :
84+ client-id : ${{ secrets.AZURE_CLIENT_ID }}
85+ tenant-id : ${{ secrets.AZURE_TENANT_ID }}
86+ subscription-id : ${{ secrets.AZURE_SUBSCRIPTION_ID }}
87+
88+ - name : Set up Python
89+ uses : actions/setup-python@v4
90+ with :
91+ python-version : ' 3.12'
92+
93+ - name : Install Azure Storage SDK
94+ run : |
95+ python -m pip install --upgrade pip
96+ pip install azure-storage-queue azure-identity
97+
98+ - name : Read Events
99+ run : |
100+ python - <<EOF
101+ import os
102+ import json
103+ import time
104+ from azure.storage.queue import QueueServiceClient
105+ from azure.identity import DefaultAzureCredential
106+
107+ # Create queue service client using managed identity
108+ account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net"
109+ queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
110+
111+ # Use DefaultAzureCredential which will automatically use the managed identity
112+ credential = DefaultAzureCredential()
113+ queue_service = QueueServiceClient(account_url=account_url, credential=credential)
114+ queue_client = queue_service.get_queue_client(queue_name)
115+
116+ # Wait for messages to be available (with timeout)
117+ max_attempts = 30
118+ attempt = 0
119+ while attempt < max_attempts:
120+ try:
121+ # Try to peek at messages
122+ messages = queue_client.peek_messages(max_messages=1)
123+ if messages:
124+ break
125+ print(f"No messages yet, attempt {attempt + 1}/{max_attempts}")
126+ time.sleep(2) # Wait 2 seconds between attempts
127+ attempt += 1
128+ except Exception as e:
129+ print(f"Error peeking messages: {e}")
130+ time.sleep(2)
131+ attempt += 1
132+
133+ if attempt >= max_attempts:
134+ print("No messages found in queue after waiting - exiting successfully")
135+ exit(0) # Exit with success code
136+
137+ # Receive and process messages
138+ while True:
139+ try:
140+ messages = queue_client.receive_messages(messages_per_page=1)
141+ if not messages:
142+ print("No more messages in queue - exiting successfully")
143+ exit(0)
144+
145+ for message in messages:
146+ try:
147+ content = json.loads(message.content)
148+ print(f"Received message: {json.dumps(content, indent=2)}")
149+ # Delete the message after processing
150+ queue_client.delete_message(message.id, message.pop_receipt)
151+
152+ # Check if there are more messages
153+ peek_messages = queue_client.peek_messages(max_messages=1)
154+ if not peek_messages:
155+ print("No more messages in queue - exiting successfully")
156+ exit(0)
157+ except json.JSONDecodeError as e:
158+ print(f"Error decoding message: {e}")
159+ print(f"Raw message content: {message.content}")
160+ except Exception as e:
161+ print(f"Error processing messages: {e}")
162+ break
163+
164+ print("Finished processing all messages")
165+ EOF
166+
167+ cleanup-queue :
168+ needs : read-events
169+ runs-on : ubuntu-latest
170+ environment : nemo-ci
171+ permissions :
172+ id-token : write
173+ contents : read
174+ steps :
175+ - name : Checkout code
176+ uses : actions/checkout@v4
177+
178+ - name : Azure Login
179+ uses : azure/login@v2
180+ with :
181+ client-id : ${{ secrets.AZURE_CLIENT_ID }}
182+ tenant-id : ${{ secrets.AZURE_TENANT_ID }}
183+ subscription-id : ${{ secrets.AZURE_SUBSCRIPTION_ID }}
184+
185+ - name : Set up Python
186+ uses : actions/setup-python@v4
187+ with :
188+ python-version : ' 3.12'
189+
190+ - name : Install Azure Storage SDK
191+ run : |
192+ python -m pip install --upgrade pip
193+ pip install azure-storage-queue azure-identity
194+
195+ - name : Delete Queue
196+ run : |
197+ python - <<EOF
198+ import os
199+ from azure.storage.queue import QueueServiceClient
200+ from azure.identity import DefaultAzureCredential
201+
202+ # Create queue service client using managed identity
203+ account_url = f"https://{os.environ.get('AZURE_QUEUE_STORAGE_ACCOUNT')}.queue.core.windows.net"
204+ queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
205+
206+ # Use DefaultAzureCredential which will automatically use the managed identity
207+ credential = DefaultAzureCredential()
208+ queue_service = QueueServiceClient(account_url=account_url, credential=credential)
209+ queue_client = queue_service.get_queue_client(queue_name)
210+
211+ try:
212+ # Delete the queue
213+ queue_client.delete_queue()
214+ print(f"Successfully deleted queue: {queue_name}")
215+ except Exception as e:
216+ print(f"Error deleting queue: {e}")
217+ # Don't fail the job if queue deletion fails
218+ exit(0)
219+ EOF
0 commit comments