Skip to content

Commit 86ed608

Browse files
authored
Merge pull request #309 from zilliztech/azure-fix
fix azure abort err
2 parents 7a2066c + decf390 commit 86ed608

File tree

1 file changed

+27
-6
lines changed

1 file changed

+27
-6
lines changed

core/storage/azure_object_storage.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
3030
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
3131
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
32+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
3233
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
3334
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
3435
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
@@ -200,19 +201,39 @@ func (aos *AzureObjectStorage) RemoveObject(ctx context.Context, bucketName, obj
200201
}
201202

202203
func (aos *AzureObjectStorage) CopyObject(ctx context.Context, fromBucketName, toBucketName, fromPath, toPath string) error {
204+
var blobCli *blockblob.Client
205+
var fromPathUrl string
203206
if aos.clients[fromBucketName].accessKeyID == aos.clients[toBucketName].accessKeyID {
204-
fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
205-
_, err := aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
206-
return err
207+
fromPathUrl = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath)
208+
blobCli = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath)
207209
} else {
208210
srcSAS, err := aos.getSAS(fromBucketName)
209211
if err != nil {
210212
return err
211213
}
212-
fromPathUrl := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
213-
_, err = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath).StartCopyFromURL(ctx, fromPathUrl, nil)
214-
return err
214+
fromPathUrl = fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", aos.clients[fromBucketName].accessKeyID, fromBucketName, fromPath, srcSAS.Encode())
215+
blobCli = aos.clients[toBucketName].client.NewContainerClient(toBucketName).NewBlockBlobClient(toPath)
216+
}
217+
218+
// we need to abort the previous copy operation before copy from url
219+
abortErr := func() error {
220+
blobProperties, err := blobCli.BlobClient().GetProperties(ctx, nil)
221+
if err != nil {
222+
return fmt.Errorf("storage: azure get properties %w", err)
223+
}
224+
if blobProperties.CopyID != nil {
225+
if _, err = blobCli.AbortCopyFromURL(ctx, *blobProperties.CopyID, nil); err != nil {
226+
return fmt.Errorf("storage: azure abort copy from url %w", err)
227+
}
228+
}
229+
return nil
230+
}()
231+
232+
if _, err := blobCli.CopyFromURL(ctx, fromPathUrl, nil); err != nil {
233+
return fmt.Errorf("storage: azure copy from url %w abort previous %w", err, abortErr)
215234
}
235+
236+
return nil
216237
}
217238

218239
func (aos *AzureObjectStorage) getSAS(bucket string) (*sas.QueryParameters, error) {

0 commit comments

Comments
 (0)