-
Notifications
You must be signed in to change notification settings - Fork 479
R2-3560: Implement ListParts and ListMultipartUploads for R2 Binding
#5596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1088,6 +1088,8 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(jsg::Lock& js, | |
|
|
||
| KJ_IF_SOME(o, options) { | ||
| KJ_IF_SOME(l, o.limit) { | ||
| JSG_REQUIRE(l >= 1 && l <= 1000, RangeError, | ||
| "limit must be between 1 and 1000 (inclusive). Actual value was: ", l); | ||
| listBuilder.setLimit(l); | ||
| traceContext.userSpan.setTag("cloudflare.r2.request.limit"_kjc, static_cast<int64_t>(l)); | ||
| } | ||
|
|
@@ -1209,6 +1211,113 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(jsg::Lock& js, | |
| }); | ||
| } | ||
|
|
||
| jsg::Promise<R2Bucket::ListMultipartUploadsResult> R2Bucket::listMultipartUploads(jsg::Lock& js, | ||
| jsg::Optional<ListMultipartUploadsOptions> options, | ||
| const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) { | ||
| return js.evalNow([&] { | ||
| auto& context = IoContext::current(); | ||
|
|
||
| auto traceSpan = context.makeTraceSpan("r2_listMultipartUploads"_kjc); | ||
| auto userSpan = context.makeUserTraceSpan("r2_listMultipartUploads"_kjc); | ||
| TraceContext traceContext(kj::mv(traceSpan), kj::mv(userSpan)); | ||
| auto client = context.getHttpClient(clientIndex, true, kj::none, traceContext); | ||
|
|
||
| traceContext.userSpan.setTag("cloudflare.binding.type"_kjc, "r2"_kjc); | ||
| KJ_IF_SOME(b, this->bindingName()) { | ||
| traceContext.userSpan.setTag("cloudflare.binding.name"_kjc, b); | ||
| } | ||
| traceContext.userSpan.setTag("cloudflare.r2.operation"_kjc, "ListMultipartUploads"_kjc); | ||
| KJ_IF_SOME(b, this->bucketName()) { | ||
| traceContext.userSpan.setTag("cloudflare.r2.bucket"_kjc, b); | ||
| } | ||
|
|
||
| capnp::JsonCodec json; | ||
| json.handleByAnnotation<R2BindingRequest>(); | ||
| json.setHasMode(capnp::HasMode::NON_DEFAULT); | ||
| capnp::MallocMessageBuilder requestMessage; | ||
|
|
||
| auto requestBuilder = requestMessage.initRoot<R2BindingRequest>(); | ||
| requestBuilder.setVersion(VERSION_PUBLIC_BETA); | ||
| auto listBuilder = requestBuilder.initPayload().initListMultipartUploads(); | ||
|
|
||
| KJ_IF_SOME(o, options) { | ||
| KJ_IF_SOME(l, o.limit) { | ||
| listBuilder.setLimit(l); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ListMultipartUploads should replicate the same validation logic we have for ListObjects I believe to be fully compliant with S3. |
||
| traceContext.userSpan.setTag("cloudflare.r2.request.limit"_kjc, static_cast<int64_t>(l)); | ||
| } | ||
| KJ_IF_SOME(p, o.prefix) { | ||
| listBuilder.setPrefix(p.value); | ||
| traceContext.userSpan.setTag("cloudflare.r2.request.prefix"_kjc, p.value.asPtr()); | ||
| } | ||
| KJ_IF_SOME(c, o.cursor) { | ||
| listBuilder.setCursor(c.value); | ||
| traceContext.userSpan.setTag("cloudflare.r2.request.cursor"_kjc, c.value.asPtr()); | ||
| } | ||
| KJ_IF_SOME(d, o.delimiter) { | ||
| listBuilder.setDelimiter(d.value); | ||
| traceContext.userSpan.setTag("cloudflare.r2.request.delimiter"_kjc, d.value.asPtr()); | ||
| } | ||
| KJ_IF_SOME(s, o.startAfter) { | ||
| listBuilder.setStartAfter(s.value); | ||
| traceContext.userSpan.setTag("cloudflare.r2.request.start_after"_kjc, s.value.asPtr()); | ||
| } | ||
| } | ||
|
|
||
| auto requestJson = json.encode(requestBuilder); | ||
|
|
||
| kj::StringPtr components[1]; | ||
| auto path = fillR2Path(components, adminBucket); | ||
| CompatibilityFlags::Reader flags = {}; | ||
| auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path, jwt, flags); | ||
|
|
||
| return context.awaitIo(js, kj::mv(promise), | ||
| [&errorType, traceContext = kj::mv(traceContext)]( | ||
| jsg::Lock& js, R2Result r2Result) mutable { | ||
| addR2ResponseSpanTags(traceContext, r2Result); | ||
| r2Result.throwIfError("listMultipartUploads", errorType); | ||
|
|
||
| ListMultipartUploadsResult result; | ||
| capnp::MallocMessageBuilder responseMessage; | ||
| capnp::JsonCodec json; | ||
| json.handleByAnnotation<R2ListMultipartUploadsResponse>(); | ||
| auto responseBuilder = responseMessage.initRoot<R2ListMultipartUploadsResponse>(); | ||
|
|
||
| json.decode(KJ_ASSERT_NONNULL(r2Result.metadataPayload), responseBuilder); | ||
|
|
||
| result.uploads = KJ_MAP(u, responseBuilder.getUploads()) { | ||
| MultipartUploadInfo info; | ||
| info.key = kj::str(u.getObject()); | ||
| info.uploadId = kj::str(u.getUploadId()); | ||
| if (u.getInitiatedMillisecondsSinceEpoch() != 0) { | ||
| info.initiated = | ||
| kj::UNIX_EPOCH + u.getInitiatedMillisecondsSinceEpoch() * kj::MILLISECONDS; | ||
| } | ||
| if (u.hasStorageClass()) { | ||
| info.storageClass = kj::str(u.getStorageClass()); | ||
| } | ||
| return info; | ||
| }; | ||
| result.truncated = responseBuilder.getTruncated(); | ||
| if (responseBuilder.hasCursor()) { | ||
| result.cursor = kj::str(responseBuilder.getCursor()); | ||
| traceContext.userSpan.setTag( | ||
| "cloudflare.r2.response.cursor"_kjc, KJ_ASSERT_NONNULL(result.cursor).asPtr()); | ||
| } | ||
| if (responseBuilder.hasDelimitedPrefixes()) { | ||
| result.delimitedPrefixes = | ||
| KJ_MAP(e, responseBuilder.getDelimitedPrefixes()) { return kj::str(e); }; | ||
| } | ||
|
|
||
| traceContext.userSpan.setTag("cloudflare.r2.response.returned_uploads"_kjc, | ||
| static_cast<int64_t>(result.uploads.size())); | ||
| traceContext.userSpan.setTag("cloudflare.r2.response.delimited_prefixes"_kjc, | ||
| static_cast<int64_t>(result.delimitedPrefixes.size())); | ||
| traceContext.userSpan.setTag("cloudflare.r2.response.truncated"_kjc, result.truncated); | ||
| return kj::mv(result); | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| namespace { | ||
| kj::Array<R2Bucket::Etag> parseConditionalEtagHeader(kj::StringPtr condHeader, | ||
| kj::Vector<R2Bucket::Etag> etagAccumulator = kj::Vector<R2Bucket::Etag>(), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -266,4 +266,97 @@ jsg::Promise<void> R2MultipartUpload::abort( | |
| }); | ||
| }); | ||
| } | ||
|
|
||
| jsg::Promise<R2MultipartUpload::ListPartsResult> R2MultipartUpload::listParts(jsg::Lock& js, | ||
| jsg::Optional<ListPartsOptions> options, | ||
| const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) { | ||
| return js.evalNow([&] { | ||
| auto& context = IoContext::current(); | ||
|
|
||
| auto traceSpan = context.makeTraceSpan("r2_listParts"_kjc); | ||
| auto userSpan = context.makeUserTraceSpan("r2_listParts"_kjc); | ||
| TraceContext traceContext(kj::mv(traceSpan), kj::mv(userSpan)); | ||
| auto client = context.getHttpClient(this->bucket->clientIndex, true, kj::none, traceContext); | ||
|
|
||
| traceContext.userSpan.setTag("cloudflare.binding.type"_kjc, "r2"_kjc); | ||
| KJ_IF_SOME(b, this->bucket->bindingName()) { | ||
| traceContext.userSpan.setTag("cloudflare.binding.name"_kjc, b); | ||
| } | ||
| traceContext.userSpan.setTag("cloudflare.r2.operation"_kjc, "ListParts"_kjc); | ||
| KJ_IF_SOME(b, this->bucket->bucketName()) { | ||
| traceContext.userSpan.setTag("cloudflare.r2.bucket"_kjc, b); | ||
| } | ||
| traceContext.userSpan.setTag("cloudflare.r2.request.upload_id"_kjc, uploadId.asPtr()); | ||
| traceContext.userSpan.setTag("cloudflare.r2.request.key"_kjc, key.asPtr()); | ||
|
|
||
| capnp::JsonCodec json; | ||
| json.handleByAnnotation<R2BindingRequest>(); | ||
| json.setHasMode(capnp::HasMode::NON_DEFAULT); | ||
| capnp::MallocMessageBuilder requestMessage; | ||
|
|
||
| auto requestBuilder = requestMessage.initRoot<R2BindingRequest>(); | ||
| requestBuilder.setVersion(VERSION_PUBLIC_BETA); | ||
| auto listPartsBuilder = requestBuilder.initPayload().initListParts(); | ||
|
|
||
| listPartsBuilder.setObject(key); | ||
| listPartsBuilder.setUploadId(uploadId); | ||
|
|
||
| KJ_IF_SOME(o, options) { | ||
| KJ_IF_SOME(m, o.maxParts) { | ||
| JSG_REQUIRE(m >= 1 && m <= 1000, RangeError, | ||
| "maxParts must be between 1 and 1000 (inclusive). Actual value was: ", m); | ||
| listPartsBuilder.setMaxParts(m); | ||
| traceContext.userSpan.setTag( | ||
| "cloudflare.r2.request.max_parts"_kjc, static_cast<int64_t>(m)); | ||
| } | ||
| KJ_IF_SOME(p, o.partNumberMarker) { | ||
| JSG_REQUIRE( | ||
| p >= 0, RangeError, "partNumberMarker must be non-negative. Actual value was: ", p); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is subtle and I learned this recently myself, but the part numbers are 1-indexed so this conditional is bugged since it needs to be strictly greater than. |
||
| listPartsBuilder.setPartNumberMarker(p); | ||
| traceContext.userSpan.setTag( | ||
| "cloudflare.r2.request.part_number_marker"_kjc, static_cast<int64_t>(p)); | ||
| } | ||
| } | ||
|
|
||
| auto requestJson = json.encode(requestBuilder); | ||
|
|
||
| kj::StringPtr components[1]; | ||
| auto path = fillR2Path(components, this->bucket->adminBucket); | ||
| CompatibilityFlags::Reader flags = {}; | ||
| auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path, kj::none, flags); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I'm missing something, but wouldn't this fail if you are explicitly passing in |
||
|
|
||
| return context.awaitIo(js, kj::mv(promise), | ||
| [&errorType, traceContext = kj::mv(traceContext)]( | ||
| jsg::Lock& js, R2Result r2Result) mutable { | ||
| addR2ResponseSpanTags(traceContext, r2Result); | ||
| r2Result.throwIfError("listParts", errorType); | ||
|
|
||
| ListPartsResult result; | ||
| capnp::MallocMessageBuilder responseMessage; | ||
| capnp::JsonCodec json; | ||
| json.handleByAnnotation<R2ListPartsResponse>(); | ||
| auto responseBuilder = responseMessage.initRoot<R2ListPartsResponse>(); | ||
|
|
||
| json.decode(KJ_ASSERT_NONNULL(r2Result.metadataPayload), responseBuilder); | ||
|
|
||
| result.parts = KJ_MAP(p, responseBuilder.getParts()) { | ||
| return UploadedPartInfo{ | ||
| .partNumber = static_cast<int>(p.getPartNumber()), | ||
| .etag = kj::str(p.getEtag()), | ||
| .size = static_cast<double>(p.getSize()), | ||
| .uploaded = kj::UNIX_EPOCH + p.getUploadedMillisecondsSinceEpoch() * kj::MILLISECONDS, | ||
| }; | ||
| }; | ||
| result.truncated = responseBuilder.getTruncated(); | ||
| if (result.truncated) { | ||
| result.partNumberMarker = static_cast<int>(responseBuilder.getPartNumberMarker()); | ||
| } | ||
|
|
||
| traceContext.userSpan.setTag( | ||
| "cloudflare.r2.response.returned_parts"_kjc, static_cast<int64_t>(result.parts.size())); | ||
| traceContext.userSpan.setTag("cloudflare.r2.response.truncated"_kjc, result.truncated); | ||
| return kj::mv(result); | ||
| }); | ||
| }); | ||
| } | ||
| } // namespace workerd::api::public_beta | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I am just being paranoid, but would a more sensible default here make more sense (i.e AWS S3 defaults for max values?). IIRC 1000