@@ -4,7 +4,6 @@ package main
44
55import (
66 "encoding/json"
7- "errors"
87 "fmt"
98 "io"
109
@@ -77,7 +76,7 @@ func main() {
7776 log .Debugf ("Received a message (corr-id: %s, message: %s)" , delivered .CorrelationId , delivered .Body )
7877 err := schema .ValidateJSON (fmt .Sprintf ("%s/ingestion-accession.json" , conf .Broker .SchemasPath ), delivered .Body )
7978 if err != nil {
80- log .Errorf ("validation of incoming message (ingestion-accession) failed, reason: %v " , err )
79+ log .Errorf ("validation of incoming message (ingestion-accession) failed, correlation-id: %s, reason: %v " , delivered . CorrelationId , err )
8180 if err := delivered .Ack (false ); err != nil {
8281 log .Errorf ("Failed acking canceled work, reason: %v" , err )
8382 }
@@ -90,17 +89,17 @@ func main() {
9089 // If the file has been canceled by the uploader, don't spend time working on it.
9190 status , err := db .GetFileStatus (delivered .CorrelationId )
9291 if err != nil {
93- log .Errorf ("failed to get file status, reason: %v" , err )
92+ log .Errorf ("failed to get file status, correlation-id: %s, reason: %v" , delivered . CorrelationId , err )
9493 if err := delivered .Nack (false , true ); err != nil {
95- log .Errorf ("failed to Nack message, reason: (%v) " , err )
94+ log .Errorf ("failed to Nack message, reason: %v " , err )
9695 }
9796
9897 continue
9998 }
10099
101100 switch status {
102101 case "disabled" :
103- log .Infof ("file with correlation ID : %s is disabled, stopping work" , delivered .CorrelationId )
102+ log .Infof ("file with correlation-id : %s is disabled, aborting work" , delivered .CorrelationId )
104103 if err := delivered .Ack (false ); err != nil {
105104 log .Errorf ("Failed acking canceled work, reason: %v" , err )
106105 }
@@ -110,14 +109,14 @@ func main() {
110109 case "verified" :
111110 case "enabled" :
112111 case "ready" :
113- log .Infof ("File with correlation ID %s is already marked as ready." , delivered .CorrelationId )
112+ log .Infof ("File with correlation-id: %s is already marked as ready." , delivered .CorrelationId )
114113 if err := delivered .Ack (false ); err != nil {
115114 log .Errorf ("Failed acking message, reason: %v" , err )
116115 }
117116
118117 continue
119118 default :
120- log .Warnf ("file with correlation ID : %s is not verified yet, stopping work" , delivered .CorrelationId )
119+ log .Warnf ("file with correlation-id : %s is not verified yet, aborting work" , delivered .CorrelationId )
121120 if err := delivered .Nack (false , true ); err != nil {
122121 log .Errorf ("Failed acking canceled work, reason: %v" , err )
123122 }
@@ -127,9 +126,9 @@ func main() {
127126
128127 fileID , err := db .GetFileID (delivered .CorrelationId )
129128 if err != nil {
130- log .Errorf ("failed to get ID for file, reason: %v" , err )
129+ log .Errorf ("failed to get file-id for file with correlation-id: %s , reason: %v" , delivered . CorrelationId , err )
131130 if err := delivered .Nack (false , true ); err != nil {
132- log .Errorf ("failed to Nack message, reason: (%v) " , err )
131+ log .Errorf ("failed to Nack message, reason: %v " , err )
133132 }
134133
135134 continue
@@ -144,24 +143,24 @@ func main() {
144143 completeMsg , _ := json .Marshal (& c )
145144 err = schema .ValidateJSON (fmt .Sprintf ("%s/ingestion-completion.json" , conf .Broker .SchemasPath ), completeMsg )
146145 if err != nil {
147- log .Errorf ("Validation of outgoing message failed, reason: (%v)" , err )
146+ log .Errorf ("Validation of outgoing message ingestion-completion failed, reason: (%v). Message body: %s \n " , err , string ( completeMsg ) )
148147
149148 continue
150149 }
151150
152151 accessionIDExists , err := db .CheckAccessionIDExists (message .AccessionID , fileID )
153152 if err != nil {
154- log .Errorf ("CheckAccessionIdExists failed, reason: %v " , err )
153+ log .Errorf ("CheckAccessionIdExists failed, file-id: %s, reason: %v " , fileID , err )
155154 if err := delivered .Nack (false , true ); err != nil {
156- log .Errorf ("failed to Nack message, reason: (%v) " , err )
155+ log .Errorf ("failed to Nack message, reason: %v " , err )
157156 }
158157
159158 continue
160159 }
161160
162161 switch accessionIDExists {
163162 case "duplicate" :
164- log .Debugf ( "Seems accession ID already exists (corr -id: %s, accessionid : %s" , delivered . CorrelationId , message .AccessionID )
163+ log .Errorf ( " accession ID already exists in the system, file -id: %s, accession-id : %s\n " , fileID , message .AccessionID )
165164 // Send the message to an error queue so it can be analyzed.
166165 fileError := broker.InfoError {
167166 Error : "There is a conflict regarding the file accessionID" ,
@@ -172,32 +171,32 @@ func main() {
172171
173172 // Send the message to an error queue so it can be analyzed.
174173 if e := mq .SendMessage (delivered .CorrelationId , conf .Broker .Exchange , "error" , body ); e != nil {
175- log .Errorf ("failed to publish message, reason: (%v) " , err )
174+ log .Errorf ("failed to publish message, reason: %v " , err )
176175 }
177176
178177 if err := delivered .Ack (false ); err != nil {
179- log .Errorf ("failed to Ack message, reason: (%v) " , err )
178+ log .Errorf ("failed to Ack message, reason: %v " , err )
180179 }
181180
182181 continue
183182 case "same" :
184- log .Infoln ("file already has a stable ID, marking it as ready" )
183+ log .Infof ("file already has a stable ID, marking it as ready, file-id: %s" , fileID )
185184 default :
186185 if conf .Backup .Type != "" && conf .Archive .Type != "" {
187186 if err = backupFile (delivered ); err != nil {
188- log .Errorf ("Failed to backup file with corrID : %v , reason: %v" , delivered . CorrelationId , err )
187+ log .Errorf ("failed to backup file, file-id : %s , reason: %v" , fileID , err )
189188 if err := delivered .Nack (false , true ); err != nil {
190- log .Errorf ("failed to Nack message, reason: (%v) " , err )
189+ log .Errorf ("failed to Nack message, reason: %v " , err )
191190 }
192191
193192 continue
194193 }
195194 }
196195
197196 if err := db .SetAccessionID (message .AccessionID , fileID ); err != nil {
198- log .Errorf ("Failed to set accessionID for file with corrID : %v , reason: %v" , delivered . CorrelationId , err )
197+ log .Errorf ("failed to set accessionID for file, file-id : %s , reason: %v" , fileID , err )
199198 if err := delivered .Nack (false , true ); err != nil {
200- log .Errorf ("failed to Nack message, reason: (%v) " , err )
199+ log .Errorf ("failed to Nack message, reason: %v " , err )
201200 }
202201
203202 continue
@@ -206,25 +205,25 @@ func main() {
206205
207206 // Mark file as "ready"
208207 if err := db .UpdateFileEventLog (fileID , "ready" , delivered .CorrelationId , "finalize" , "{}" , string (delivered .Body )); err != nil {
209- log .Errorf ("set status ready failed, reason: (%v)" , err )
208+ log .Errorf ("set status ready failed, file-id: %s, reason: %v" , fileID , err )
210209 if err := delivered .Nack (false , true ); err != nil {
211- log .Errorf ("failed to Nack message, reason: (%v) " , err )
210+ log .Errorf ("failed to Nack message, reason: %v " , err )
212211 }
213212
214213 continue
215214 }
216215
217216 if err := mq .SendMessage (delivered .CorrelationId , conf .Broker .Exchange , conf .Broker .RoutingKey , completeMsg ); err != nil {
218- log .Errorf ("failed to publish message, reason: (%v) " , err )
217+ log .Errorf ("failed to publish message, reason: %v " , err )
219218 if err := delivered .Nack (false , true ); err != nil {
220- log .Errorf ("failed to Nack message, reason: (%v) " , err )
219+ log .Errorf ("failed to Nack message, reason: %v " , err )
221220 }
222221
223222 continue
224223 }
225224
226225 if err := delivered .Ack (false ); err != nil {
227- log .Errorf ("failed to Ack message, reason: (%v) " , err )
226+ log .Errorf ("failed to Ack message, reason: %v " , err )
228227 }
229228 }
230229 }()
@@ -234,12 +233,12 @@ func main() {
234233
235234func backupFile (delivered amqp.Delivery ) error {
236235 log .Debug ("Backup initiated" )
237- fileUUID , err := db .GetFileID (delivered .CorrelationId )
236+ fileID , err := db .GetFileID (delivered .CorrelationId )
238237 if err != nil {
239238 return fmt .Errorf ("failed to get ID for file, reason: %s" , err .Error ())
240239 }
241240
242- filePath , fileSize , err := db .GetArchived (fileUUID )
241+ filePath , fileSize , err := db .GetArchived (fileID )
243242 if err != nil {
244243 return fmt .Errorf ("failed to get file archive information, reason: %v" , err )
245244 }
@@ -251,7 +250,7 @@ func backupFile(delivered amqp.Delivery) error {
251250 }
252251
253252 if diskFileSize != int64 (fileSize ) {
254- return errors . New ( " file size in archive does not match database for archive file" )
253+ return fmt . Errorf ( "archive file size does not match registered file size, (disk size: %d, db size: %d)" , diskFileSize , fileSize )
255254 }
256255
257256 file , err := archive .NewFileReader (filePath )
@@ -273,7 +272,7 @@ func backupFile(delivered amqp.Delivery) error {
273272 }
274273
275274 // Mark file as "backed up"
276- if err := db .UpdateFileEventLog (fileUUID , "backed up" , delivered .CorrelationId , "finalize" , "{}" , string (delivered .Body )); err != nil {
275+ if err := db .UpdateFileEventLog (fileID , "backed up" , delivered .CorrelationId , "finalize" , "{}" , string (delivered .Body )); err != nil {
277276 return fmt .Errorf ("UpdateFileEventLog failed, reason: (%v)" , err )
278277 }
279278
0 commit comments