11from __future__ import annotations
22
3+ import signal
34import logging
45import sys
56from concurrent .futures import Future
67from concurrent .futures import ThreadPoolExecutor
78from concurrent .futures import as_completed
89from enum import Enum
910from pathlib import Path
10- from time import monotonic
11+ from time import monotonic , sleep
1112from typing import Dict
1213from typing import NamedTuple
1314from typing import Optional
3940DOWNLOAD_CHUNK_SIZE = 1024 * 1024 * 16
4041DOWNLOAD_URL = "/files/download"
4142
43+ MAX_UPLOAD_RETRIES = 3
4244S3_MAX_RETRIES = 60 # same as frontend
4345S3_READ_TIMEOUT = 60 * 5 # 5 minutes
4446
@@ -66,8 +68,8 @@ def _cancel_file_upload(
6668 client : AuthenticatedClient , file_id : UUID , mission_id : UUID
6769) -> None :
6870 data = {
69- "uuid " : [str (file_id )],
70- "missionUUID " : str (mission_id ),
71+ "uuids " : [str (file_id )],
72+ "missionUuid " : str (mission_id ),
7173 }
7274 resp = client .post (UPLOAD_CANCEL , json = data )
7375 resp .raise_for_status ()
@@ -151,6 +153,37 @@ class UploadState(Enum):
151153 CANCELED = 3
152154
153155
def _get_upload_credentials_with_retry(
    client: AuthenticatedClient,
    pbar,
    filename,
    mission_id: UUID,
    max_attempts: int = 5,
):
    """
    Request per-file upload credentials, retrying with exponential backoff.

    Calls ``_get_upload_creditials`` up to ``max_attempts`` times and returns
    the first result that is not None. After a failed attempt it sleeps for
    2, 4, 8, ... seconds before trying again; no sleep follows the final
    attempt.

    Args:
        client: The client object used for retrieving credentials.
        pbar: Progress bar of the surrounding upload. Currently unused; it is
            kept in the signature so existing callers keep working.
        filename: The internal filename.
        mission_id: The mission ID.
        max_attempts: Maximum number of attempts, must be at least 1.

    Returns:
        The upload credentials, or None if every attempt returned None.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1.
    """
    if max_attempts < 1:
        # Zero attempts would return None without ever asking the server,
        # which callers cannot tell apart from a real "no credentials" answer.
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    for attempt in range(1, max_attempts + 1):
        creds = _get_upload_creditials(
            client, internal_filename=filename, mission_id=mission_id
        )
        if creds is not None:
            return creds

        # Back off only when another attempt will follow.
        if attempt < max_attempts:
            sleep(2**attempt)  # exponential backoff: 2, 4, 8, 16, ...

    return None
185+
186+
154187# TODO: I don't want to handle errors at this level
155188def upload_file (
156189 client : AuthenticatedClient ,
@@ -161,40 +194,54 @@ def upload_file(
161194 verbose : bool = False ,
162195 s3_endpoint : Optional [str ] = None ,
163196) -> Tuple [UploadState , int ]:
164- """\
197+ """
165198 returns UploadState and bytes uploaded (0 if not uploaded)
199+ Retries up to 3 times on failure.
166200 """
167201 if s3_endpoint is None :
168202 s3_endpoint = get_config ().endpoint .s3
169203
170204 total_size = path .stat ().st_size
171- with tqdm (
172- total = total_size ,
173- unit = "B" ,
174- unit_scale = True ,
175- desc = f"uploading { path } ..." ,
176- leave = False ,
177- disable = not verbose ,
178- ) as pbar :
179- # get per file upload credentials
180- creds = _get_upload_creditials (
181- client , internal_filename = filename , mission_id = mission_id
182- )
183- if creds is None :
184- return UploadState .EXISTS , 0
205+ for attempt in range (MAX_UPLOAD_RETRIES ):
206+ with tqdm (
207+ total = total_size ,
208+ unit = "B" ,
209+ unit_scale = True ,
210+ desc = f"uploading { path } ..." ,
211+ leave = False ,
212+ disable = not verbose ,
213+ ) as pbar :
214+
215+ # get per file upload credentials
216+ creds = _get_upload_credentials_with_retry (
217+ client , pbar , filename , mission_id , max_attempts = 5 if attempt > 0 else 1
218+ )
219+
220+ if creds is None :
221+ return UploadState .EXISTS , 0
185222
186- try :
187- _s3_upload (path , endpoint = s3_endpoint , credentials = creds , pbar = pbar )
188- except Exception as e :
189- logger .error (format_traceback (e ))
190223 try :
191- _cancel_file_upload (client , creds .file_id , mission_id )
192- except Exception as cancel_e :
193- logger .error (f"Failed to cancel upload for { creds .file_id } : { cancel_e } " )
194- raise e from e
195- else :
196- _confirm_file_upload (client , creds .file_id , b64_md5 (path ))
197- return UploadState .UPLOADED , total_size
224+ _s3_upload (path , endpoint = s3_endpoint , credentials = creds , pbar = pbar )
225+ except Exception as e :
226+ logger .error (format_traceback (e ))
227+ try :
228+ _cancel_file_upload (client , creds .file_id , mission_id )
229+ except Exception as cancel_e :
230+ logger .error (
231+ f"Failed to cancel upload for { creds .file_id } : { cancel_e } "
232+ )
233+
234+ if attempt < 2 : # Retry if not the last attempt
235+ pbar .update (0 )
236+ logger .error (f"Retrying upload for { attempt + 1 } " )
237+ continue
238+ else :
239+ logger .error (f"Cancelling upload for { attempt } " )
240+ raise e from e
241+
242+ else :
243+ _confirm_file_upload (client , creds .file_id , b64_md5 (path ))
244+ return UploadState .UPLOADED , total_size
198245
199246
200247def _get_file_download (client : AuthenticatedClient , id : UUID ) -> str :
@@ -420,6 +467,9 @@ def upload_files(
420467 ) as pbar :
421468 start = monotonic ()
422469 futures : Dict [Future [Tuple [UploadState , int ]], Path ] = {}
470+
471+ skipped_files = 0
472+ failed_files = 0
423473 with ThreadPoolExecutor (max_workers = n_workers ) as executor :
424474 for name , path in files .items ():
425475 if not path .is_file ():
@@ -441,6 +491,16 @@ def upload_files(
441491
442492 total_uploaded_bytes = 0
443493 for future in as_completed (futures ):
494+
495+ if future .exception ():
496+ failed_files += 1
497+
498+ if (
499+ future .exception () is None
500+ and future .result ()[0 ] == UploadState .EXISTS
501+ ):
502+ skipped_files += 1
503+
444504 path = futures [future ]
445505 uploaded_bytes = _upload_handler (future , path , verbose = verbose )
446506 total_uploaded_bytes += uploaded_bytes
@@ -455,6 +515,16 @@ def upload_files(
455515 console .print (f"Total uploaded: { format_bytes (total_uploaded_bytes )} " )
456516 console .print (f"Average speed: { format_bytes (avg_speed_bps , speed = True )} " )
457517
518+ if failed_files > 0 :
519+ console .print (
520+ f"\n Uploaded { len (files ) - failed_files - skipped_files } files, { skipped_files } skipped, { failed_files } uploads failed" ,
521+ style = "red" ,
522+ )
523+ else :
524+ console .print (
525+ f"\n Uploaded { len (files ) - skipped_files } files, { skipped_files } skipped"
526+ )
527+
458528
459529def download_files (
460530 client : AuthenticatedClient ,
0 commit comments