Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/workerd/api/r2-api.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ struct R2BindingRequest {
uploadPart @10 :R2UploadPartRequest $Json.flatten();
completeMultipartUpload @11 :R2CompleteMultipartUploadRequest $Json.flatten();
abortMultipartUpload @12 :R2AbortMultipartUploadRequest $Json.flatten();
listParts @13 :R2ListPartsRequest $Json.flatten();
listMultipartUploads @14 :R2ListMultipartUploadsRequest $Json.flatten();
}
}

Expand Down Expand Up @@ -140,6 +142,21 @@ struct R2AbortMultipartUploadRequest {
uploadId @1 :Text;
}

struct R2ListPartsRequest {
object @0 :Text;
uploadId @1 :Text;
maxParts @2 :UInt32 = 0xffffffff;
partNumberMarker @3 :UInt32 = 0;
}

struct R2ListMultipartUploadsRequest {
limit @0 :UInt32 = 0xffffffff;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I am just being paranoid, but would a more sensible default here make more sense (i.e AWS S3 defaults for max values?). IIRC 1000

prefix @1 :Text;
cursor @2 :Text;
delimiter @3 :Text;
startAfter @4 :Text;
}

struct R2ListRequest {
limit @0 :UInt32 = 0xffffffff;

Expand Down Expand Up @@ -260,6 +277,35 @@ using R2CompleteMultipartUploadResponse = R2PutResponse;

struct R2AbortMultipartUploadResponse {}

struct R2ListPartsResponse {
uploadId @0 :Text;
object @1 :Text;
parts @2 :List(Part);
truncated @3 :Bool;
partNumberMarker @4 :UInt32;

struct Part {
partNumber @0 :UInt32;
etag @1 :Text;
size @2 :UInt64;
uploadedMillisecondsSinceEpoch @3 :UInt64 $Json.name("uploaded");
}
}

struct R2ListMultipartUploadsResponse {
uploads @0 :List(Upload);
truncated @1 :Bool;
cursor @2 :Text;
delimitedPrefixes @3 :List(Text);

struct Upload {
uploadId @0 :Text;
object @1 :Text;
initiatedMillisecondsSinceEpoch @2 :UInt64 $Json.name("initiated");
storageClass @3 :Text;
}
}

struct R2ListResponse {
objects @0 :List(R2HeadResponse);
truncated @1 :Bool;
Expand Down
109 changes: 109 additions & 0 deletions src/workerd/api/r2-bucket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,8 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(jsg::Lock& js,

KJ_IF_SOME(o, options) {
KJ_IF_SOME(l, o.limit) {
JSG_REQUIRE(l >= 1 && l <= 1000, RangeError,
"limit must be between 1 and 1000 (inclusive). Actual value was: ", l);
listBuilder.setLimit(l);
traceContext.userSpan.setTag("cloudflare.r2.request.limit"_kjc, static_cast<int64_t>(l));
}
Expand Down Expand Up @@ -1209,6 +1211,113 @@ jsg::Promise<R2Bucket::ListResult> R2Bucket::list(jsg::Lock& js,
});
}

jsg::Promise<R2Bucket::ListMultipartUploadsResult> R2Bucket::listMultipartUploads(jsg::Lock& js,
jsg::Optional<ListMultipartUploadsOptions> options,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();

auto traceSpan = context.makeTraceSpan("r2_listMultipartUploads"_kjc);
auto userSpan = context.makeUserTraceSpan("r2_listMultipartUploads"_kjc);
TraceContext traceContext(kj::mv(traceSpan), kj::mv(userSpan));
auto client = context.getHttpClient(clientIndex, true, kj::none, traceContext);

traceContext.userSpan.setTag("cloudflare.binding.type"_kjc, "r2"_kjc);
KJ_IF_SOME(b, this->bindingName()) {
traceContext.userSpan.setTag("cloudflare.binding.name"_kjc, b);
}
traceContext.userSpan.setTag("cloudflare.r2.operation"_kjc, "ListMultipartUploads"_kjc);
KJ_IF_SOME(b, this->bucketName()) {
traceContext.userSpan.setTag("cloudflare.r2.bucket"_kjc, b);
}

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
json.setHasMode(capnp::HasMode::NON_DEFAULT);
capnp::MallocMessageBuilder requestMessage;

auto requestBuilder = requestMessage.initRoot<R2BindingRequest>();
requestBuilder.setVersion(VERSION_PUBLIC_BETA);
auto listBuilder = requestBuilder.initPayload().initListMultipartUploads();

KJ_IF_SOME(o, options) {
KJ_IF_SOME(l, o.limit) {
listBuilder.setLimit(l);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ListMultipartUploads should replicate the same validation logic we have for ListObjects I believe to be fully compliant with S3.

JSG_REQUIRE(l >= 1 && l <= 1000, RangeError,....

traceContext.userSpan.setTag("cloudflare.r2.request.limit"_kjc, static_cast<int64_t>(l));
}
KJ_IF_SOME(p, o.prefix) {
listBuilder.setPrefix(p.value);
traceContext.userSpan.setTag("cloudflare.r2.request.prefix"_kjc, p.value.asPtr());
}
KJ_IF_SOME(c, o.cursor) {
listBuilder.setCursor(c.value);
traceContext.userSpan.setTag("cloudflare.r2.request.cursor"_kjc, c.value.asPtr());
}
KJ_IF_SOME(d, o.delimiter) {
listBuilder.setDelimiter(d.value);
traceContext.userSpan.setTag("cloudflare.r2.request.delimiter"_kjc, d.value.asPtr());
}
KJ_IF_SOME(s, o.startAfter) {
listBuilder.setStartAfter(s.value);
traceContext.userSpan.setTag("cloudflare.r2.request.start_after"_kjc, s.value.asPtr());
}
}

auto requestJson = json.encode(requestBuilder);

kj::StringPtr components[1];
auto path = fillR2Path(components, adminBucket);
CompatibilityFlags::Reader flags = {};
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path, jwt, flags);

return context.awaitIo(js, kj::mv(promise),
[&errorType, traceContext = kj::mv(traceContext)](
jsg::Lock& js, R2Result r2Result) mutable {
addR2ResponseSpanTags(traceContext, r2Result);
r2Result.throwIfError("listMultipartUploads", errorType);

ListMultipartUploadsResult result;
capnp::MallocMessageBuilder responseMessage;
capnp::JsonCodec json;
json.handleByAnnotation<R2ListMultipartUploadsResponse>();
auto responseBuilder = responseMessage.initRoot<R2ListMultipartUploadsResponse>();

json.decode(KJ_ASSERT_NONNULL(r2Result.metadataPayload), responseBuilder);

result.uploads = KJ_MAP(u, responseBuilder.getUploads()) {
MultipartUploadInfo info;
info.key = kj::str(u.getObject());
info.uploadId = kj::str(u.getUploadId());
if (u.getInitiatedMillisecondsSinceEpoch() != 0) {
info.initiated =
kj::UNIX_EPOCH + u.getInitiatedMillisecondsSinceEpoch() * kj::MILLISECONDS;
}
if (u.hasStorageClass()) {
info.storageClass = kj::str(u.getStorageClass());
}
return info;
};
result.truncated = responseBuilder.getTruncated();
if (responseBuilder.hasCursor()) {
result.cursor = kj::str(responseBuilder.getCursor());
traceContext.userSpan.setTag(
"cloudflare.r2.response.cursor"_kjc, KJ_ASSERT_NONNULL(result.cursor).asPtr());
}
if (responseBuilder.hasDelimitedPrefixes()) {
result.delimitedPrefixes =
KJ_MAP(e, responseBuilder.getDelimitedPrefixes()) { return kj::str(e); };
}

traceContext.userSpan.setTag("cloudflare.r2.response.returned_uploads"_kjc,
static_cast<int64_t>(result.uploads.size()));
traceContext.userSpan.setTag("cloudflare.r2.response.delimited_prefixes"_kjc,
static_cast<int64_t>(result.delimitedPrefixes.size()));
traceContext.userSpan.setTag("cloudflare.r2.response.truncated"_kjc, result.truncated);
return kj::mv(result);
});
});
}

namespace {
kj::Array<R2Bucket::Etag> parseConditionalEtagHeader(kj::StringPtr condHeader,
kj::Vector<R2Bucket::Etag> etagAccumulator = kj::Vector<R2Bucket::Etag>(),
Expand Down
46 changes: 46 additions & 0 deletions src/workerd/api/r2-bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,48 @@ class R2Bucket: public jsg::Object {
// from `R2BucketListOptions` to `R2ListOptions`.
};

struct ListMultipartUploadsOptions {
jsg::Optional<int> limit;
jsg::Optional<jsg::NonCoercible<kj::String>> prefix;
jsg::Optional<jsg::NonCoercible<kj::String>> cursor;
jsg::Optional<jsg::NonCoercible<kj::String>> delimiter;
jsg::Optional<jsg::NonCoercible<kj::String>> startAfter;

JSG_STRUCT(limit, prefix, cursor, delimiter, startAfter);
JSG_STRUCT_TS_OVERRIDE(R2ListMultipartUploadsOptions);
};

struct MultipartUploadInfo {
kj::String key;
kj::String uploadId;
jsg::Optional<kj::Date> initiated;
jsg::Optional<kj::String> storageClass;

JSG_STRUCT(key, uploadId, initiated, storageClass);
JSG_STRUCT_TS_OVERRIDE(R2MultipartUploadListing {
key: string;
uploadId: string;
initiated?: Date;
storageClass?: string;
});
};

struct ListMultipartUploadsResult {
kj::Array<MultipartUploadInfo> uploads;
bool truncated;
jsg::Optional<kj::String> cursor;
kj::Array<kj::String> delimitedPrefixes;

JSG_STRUCT(uploads, truncated, cursor, delimitedPrefixes);
JSG_STRUCT_TS_OVERRIDE(type R2MultipartUploads = {
uploads: R2MultipartUploadListing[];
delimitedPrefixes: string[];
} & (
| { truncated: true; cursor: string }
| { truncated: false }
));
};

jsg::Promise<kj::Maybe<jsg::Ref<HeadResult>>> head(jsg::Lock& js,
kj::String key,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType,
Expand Down Expand Up @@ -478,6 +520,9 @@ class R2Bucket: public jsg::Object {
jsg::Optional<ListOptions> options,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType,
CompatibilityFlags::Reader flags);
jsg::Promise<ListMultipartUploadsResult> listMultipartUploads(jsg::Lock& js,
jsg::Optional<ListMultipartUploadsOptions> options,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType);

JSG_RESOURCE_TYPE(R2Bucket, CompatibilityFlags::Reader flags) {
JSG_METHOD(head);
Expand All @@ -487,6 +532,7 @@ class R2Bucket: public jsg::Object {
JSG_METHOD(resumeMultipartUpload);
JSG_METHOD_NAMED(delete, delete_);
JSG_METHOD(list);
JSG_METHOD(listMultipartUploads);

JSG_TS_ROOT();
JSG_TS_OVERRIDE({
Expand Down
28 changes: 28 additions & 0 deletions src/workerd/api/r2-instrumentation-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ export const test = {
'cloudflare.r2.response.etag': 'partEtag',
closed: true,
},
{
name: 'r2_listParts',
'cloudflare.binding.type': 'r2',
'cloudflare.binding.name': 'BUCKET',
'cloudflare.r2.operation': 'ListParts',
'cloudflare.r2.bucket': 'r2-test',
'cloudflare.r2.request.key': 'basicKey',
'cloudflare.r2.request.upload_id': 'multipartId',
'cloudflare.r2.response.success': true,
'cloudflare.r2.response.returned_parts': 2n,
'cloudflare.r2.response.truncated': false,
closed: true,
},
{
name: 'r2_abortMultipartUpload',
'cloudflare.binding.type': 'r2',
Expand Down Expand Up @@ -135,6 +148,21 @@ export const test = {
'cloudflare.r2.response.custom_metadata': true,
closed: true,
},
{
name: 'r2_listMultipartUploads',
'cloudflare.binding.type': 'r2',
'cloudflare.binding.name': 'BUCKET',
'cloudflare.r2.operation': 'ListMultipartUploads',
'cloudflare.r2.bucket': 'r2-test',
'cloudflare.r2.request.prefix': 'test',
'cloudflare.r2.request.delimiter': '/',
'cloudflare.r2.response.success': true,
'cloudflare.r2.response.returned_uploads': 2n,
'cloudflare.r2.response.delimited_prefixes': 2n,
'cloudflare.r2.response.truncated': true,
'cloudflare.r2.response.cursor': 'nextCursor',
closed: true,
},
{
name: 'r2_list',
'cloudflare.binding.type': 'r2',
Expand Down
93 changes: 93 additions & 0 deletions src/workerd/api/r2-multipart.c++
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,97 @@ jsg::Promise<void> R2MultipartUpload::abort(
});
});
}

jsg::Promise<R2MultipartUpload::ListPartsResult> R2MultipartUpload::listParts(jsg::Lock& js,
jsg::Optional<ListPartsOptions> options,
const jsg::TypeHandler<jsg::Ref<R2Error>>& errorType) {
return js.evalNow([&] {
auto& context = IoContext::current();

auto traceSpan = context.makeTraceSpan("r2_listParts"_kjc);
auto userSpan = context.makeUserTraceSpan("r2_listParts"_kjc);
TraceContext traceContext(kj::mv(traceSpan), kj::mv(userSpan));
auto client = context.getHttpClient(this->bucket->clientIndex, true, kj::none, traceContext);

traceContext.userSpan.setTag("cloudflare.binding.type"_kjc, "r2"_kjc);
KJ_IF_SOME(b, this->bucket->bindingName()) {
traceContext.userSpan.setTag("cloudflare.binding.name"_kjc, b);
}
traceContext.userSpan.setTag("cloudflare.r2.operation"_kjc, "ListParts"_kjc);
KJ_IF_SOME(b, this->bucket->bucketName()) {
traceContext.userSpan.setTag("cloudflare.r2.bucket"_kjc, b);
}
traceContext.userSpan.setTag("cloudflare.r2.request.upload_id"_kjc, uploadId.asPtr());
traceContext.userSpan.setTag("cloudflare.r2.request.key"_kjc, key.asPtr());

capnp::JsonCodec json;
json.handleByAnnotation<R2BindingRequest>();
json.setHasMode(capnp::HasMode::NON_DEFAULT);
capnp::MallocMessageBuilder requestMessage;

auto requestBuilder = requestMessage.initRoot<R2BindingRequest>();
requestBuilder.setVersion(VERSION_PUBLIC_BETA);
auto listPartsBuilder = requestBuilder.initPayload().initListParts();

listPartsBuilder.setObject(key);
listPartsBuilder.setUploadId(uploadId);

KJ_IF_SOME(o, options) {
KJ_IF_SOME(m, o.maxParts) {
JSG_REQUIRE(m >= 1 && m <= 1000, RangeError,
"maxParts must be between 1 and 1000 (inclusive). Actual value was: ", m);
listPartsBuilder.setMaxParts(m);
traceContext.userSpan.setTag(
"cloudflare.r2.request.max_parts"_kjc, static_cast<int64_t>(m));
}
KJ_IF_SOME(p, o.partNumberMarker) {
JSG_REQUIRE(
p >= 0, RangeError, "partNumberMarker must be non-negative. Actual value was: ", p);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is subtle and I learned this recently myself, but the part numbers are 1-indexed so this conditional is bugged since it needs to be strictly greater than.

listPartsBuilder.setPartNumberMarker(p);
traceContext.userSpan.setTag(
"cloudflare.r2.request.part_number_marker"_kjc, static_cast<int64_t>(p));
}
}

auto requestJson = json.encode(requestBuilder);

kj::StringPtr components[1];
auto path = fillR2Path(components, this->bucket->adminBucket);
CompatibilityFlags::Reader flags = {};
auto promise = doR2HTTPGetRequest(kj::mv(client), kj::mv(requestJson), path, kj::none, flags);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but wouldn't this fail if you are explicitly passing in kj::none instead of the JWT?


return context.awaitIo(js, kj::mv(promise),
[&errorType, traceContext = kj::mv(traceContext)](
jsg::Lock& js, R2Result r2Result) mutable {
addR2ResponseSpanTags(traceContext, r2Result);
r2Result.throwIfError("listParts", errorType);

ListPartsResult result;
capnp::MallocMessageBuilder responseMessage;
capnp::JsonCodec json;
json.handleByAnnotation<R2ListPartsResponse>();
auto responseBuilder = responseMessage.initRoot<R2ListPartsResponse>();

json.decode(KJ_ASSERT_NONNULL(r2Result.metadataPayload), responseBuilder);

result.parts = KJ_MAP(p, responseBuilder.getParts()) {
return UploadedPartInfo{
.partNumber = static_cast<int>(p.getPartNumber()),
.etag = kj::str(p.getEtag()),
.size = static_cast<double>(p.getSize()),
.uploaded = kj::UNIX_EPOCH + p.getUploadedMillisecondsSinceEpoch() * kj::MILLISECONDS,
};
};
result.truncated = responseBuilder.getTruncated();
if (result.truncated) {
result.partNumberMarker = static_cast<int>(responseBuilder.getPartNumberMarker());
}

traceContext.userSpan.setTag(
"cloudflare.r2.response.returned_parts"_kjc, static_cast<int64_t>(result.parts.size()));
traceContext.userSpan.setTag("cloudflare.r2.response.truncated"_kjc, result.truncated);
return kj::mv(result);
});
});
}
} // namespace workerd::api::public_beta
Loading
Loading