@@ -213,33 +213,33 @@ class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settin
213213 for {
214214 exists <- checkObjectExists(previousState.previousKey)
215215 } yield
216- // The backupToStorageTerminateSink gets called in response to finding in progress multipart uploads. If an S3 object exists
217- // the same key that means that in fact the upload has already been completed so in this case lets not do anything
218- if (exists) {
219- logger.debug(
220- s " Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating "
221- )
222- Sink .ignore
223- } else {
224- logger.info(
225- s " Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}"
226- )
227- val sink = S3
228- .resumeMultipartUploadWithHeaders(
229- s3Config.dataBucket,
230- previousState.previousKey,
231- previousState.stateDetails.state.uploadId,
232- previousState.stateDetails.state.parts,
233- s3Headers = s3Headers,
234- chunkingParallelism = 1
216+ // The backupToStorageTerminateSink gets called in response to finding in progress multipart uploads. If an S3 object exists
217+ // the same key that means that in fact the upload has already been completed so in this case lets not do anything
218+ if (exists) {
219+ logger.debug(
220+ s " Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating "
235221 )
236-
237- val base =
238- sink.mapMaterializedValue(future => future.map(result => Some (result))(ExecutionContext .parasitic))
239-
240- maybeS3Settings
241- .fold(base)(s3Settings => base.withAttributes(S3Attributes .settings(s3Settings)))
242- }
222+ Sink .ignore
223+ } else {
224+ logger.info(
225+ s " Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}"
226+ )
227+ val sink = S3
228+ .resumeMultipartUploadWithHeaders(
229+ s3Config.dataBucket,
230+ previousState.previousKey,
231+ previousState.stateDetails.state.uploadId,
232+ previousState.stateDetails.state.parts,
233+ s3Headers = s3Headers,
234+ chunkingParallelism = 1
235+ )
236+
237+ val base =
238+ sink.mapMaterializedValue(future => future.map(result => Some (result))(ExecutionContext .parasitic))
239+
240+ maybeS3Settings
241+ .fold(base)(s3Settings => base.withAttributes(S3Attributes .settings(s3Settings)))
242+ }
243243
244244 }
245245 }
0 commit comments