-
Notifications
You must be signed in to change notification settings - Fork 49
ENHANCE: Change asyncSetPipedExist method logic. #628
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3119,99 +3119,100 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key, | |
| } | ||
|
|
||
| @Override | ||
| public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, | ||
| List<Object> values) { | ||
| SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder); | ||
| return asyncSetPipedExist(key, exist); | ||
| public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) { | ||
| return asyncSopPipedExistBulk(key, values, collectionTranscoder); | ||
| } | ||
|
|
||
| @Override | ||
| public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key, | ||
| List<T> values, | ||
| Transcoder<T> tc) { | ||
| SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc); | ||
| return asyncSetPipedExist(key, exist); | ||
| if (values.size() == 0) { | ||
| throw new IllegalArgumentException( | ||
| "The number of piped operations must be larger than 0."); | ||
| } | ||
|
|
||
| List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>(); | ||
| if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) { | ||
| existList.add(new SetPipedExist<T>(key, values, tc)); | ||
| } else { | ||
| PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT); | ||
| for (List<T> partition : partitionedList) { | ||
| existList.add(new SetPipedExist<T>(key, partition, tc)); | ||
| } | ||
| } | ||
brido4125 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return asyncSetPipedExist(key, existList); | ||
| } | ||
|
|
||
| /** | ||
| * Generic pipelined existence operation for set items. Public methods call this method. | ||
| * | ||
| * @param key collection item's key | ||
| * @param exist operation parameters (element values) | ||
| * @param existList list of operation parameters (element values) | ||
| * @return future holding the map of elements and their existence results | ||
| */ | ||
| <T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist( | ||
| final String key, final SetPipedExist<T> exist) { | ||
|
|
||
| if (exist.getItemCount() == 0) { | ||
| throw new IllegalArgumentException( | ||
| "The number of piped operations must be larger than 0."); | ||
| } | ||
| if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) { | ||
| throw new IllegalArgumentException( | ||
| "The number of piped operations must not exceed a maximum of " | ||
| + CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + "."); | ||
| } | ||
|
|
||
| final CountDownLatch latch = new CountDownLatch(1); | ||
| final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>( | ||
| latch, operationTimeout); | ||
|
|
||
| Operation op = opFact.collectionPipedExist(key, exist, | ||
| new CollectionPipedExistOperation.Callback() { | ||
|
|
||
| private final Map<T, Boolean> result = new HashMap<T, Boolean>(); | ||
| private boolean hasAnError = false; | ||
|
|
||
| public void receivedStatus(OperationStatus status) { | ||
| if (hasAnError) { | ||
brido4125 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return; | ||
| } | ||
|
|
||
| CollectionOperationStatus cstatus; | ||
| if (status instanceof CollectionOperationStatus) { | ||
| cstatus = (CollectionOperationStatus) status; | ||
| } else { | ||
| final String key, final List<SetPipedExist<T>> existList) { | ||
| final CountDownLatch latch = new CountDownLatch(existList.size()); | ||
| final PipedCollectionFuture<T, Boolean> rv | ||
| = new PipedCollectionFuture<T, Boolean>(latch, operationTimeout); | ||
|
|
||
| for (final SetPipedExist<T> exist : existList) { | ||
| Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() { | ||
| private CollectionOperationStatus failedStatus = null; | ||
| private int failStatusCount = 0; | ||
| public void gotStatus(Integer index, OperationStatus status) { | ||
| CollectionOperationStatus cstatus; | ||
| if (status instanceof CollectionOperationStatus) { | ||
| cstatus = (CollectionOperationStatus) status; | ||
| } else { | ||
| getLogger().warn("Unhandled state: " + status); | ||
| cstatus = new CollectionOperationStatus(status); | ||
| } | ||
| switch (cstatus.getResponse()) { | ||
| case EXIST: | ||
| rv.addEachResult(exist.getValues().get(index), true); | ||
| break; | ||
| case NOT_EXIST: | ||
| rv.addEachResult(exist.getValues().get(index), false); | ||
| break; | ||
brido4125 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| case UNREADABLE: | ||
| case TYPE_MISMATCH: | ||
| case NOT_FOUND: | ||
| if (failedStatus == null) { | ||
| failedStatus = cstatus; | ||
| failStatusCount++; | ||
| } else if (failedStatus.equals(cstatus)) { | ||
| failStatusCount++; | ||
| } | ||
| break; | ||
| default: | ||
brido4125 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| getLogger().warn("Unhandled state: " + status); | ||
| cstatus = new CollectionOperationStatus(status); | ||
| } | ||
| rv.set(result, cstatus); | ||
| } | ||
| } | ||
|
|
||
| public void complete() { | ||
| latch.countDown(); | ||
| public void receivedStatus(OperationStatus status) { | ||
| CollectionOperationStatus cstatus; | ||
| if (status instanceof CollectionOperationStatus) { | ||
| cstatus = (CollectionOperationStatus) status; | ||
| } else { | ||
| getLogger().warn("Unhandled state: " + status); | ||
| cstatus = new CollectionOperationStatus(status); | ||
| } | ||
|
|
||
| public void gotStatus(Integer index, OperationStatus status) { | ||
| CollectionOperationStatus cstatus; | ||
| if (status instanceof CollectionOperationStatus) { | ||
| cstatus = (CollectionOperationStatus) status; | ||
| } else { | ||
| cstatus = new CollectionOperationStatus(status); | ||
| } | ||
|
|
||
| switch (cstatus.getResponse()) { | ||
| case EXIST: | ||
| case NOT_EXIST: | ||
| result.put(exist.getValues().get(index), | ||
| (CollectionResponse.EXIST.equals(cstatus | ||
| .getResponse()))); | ||
| break; | ||
| case UNREADABLE: | ||
| case TYPE_MISMATCH: | ||
| case NOT_FOUND: | ||
| hasAnError = true; | ||
| rv.set(new HashMap<T, Boolean>(0), | ||
| (CollectionOperationStatus) status); | ||
| break; | ||
| default: | ||
| getLogger().warn("Unhandled state: " + status); | ||
| } | ||
| if (failedStatus != null && exist.getItemCount() == failStatusCount) { | ||
| rv.setOperationStatus(failedStatus); | ||
| } else { | ||
| rv.setOperationStatus(cstatus); | ||
| } | ||
| }); | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 아래 경우에 setOperationStatus 부분이 문제가 될 것 같습니다.
이 경우, 최종 OperationStatus는 NOT_FOUND가 됩니다.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. #628 (comment) 반대의 상황에서도 다만, PR 내의
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 바로 위의 코멘트 (https://github.com/naver/arcus-java-client/pull/628/files#r1530238965) 에 보인 예시에서는
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
최종 @jhpark816 기존 동작으로 수행해야 하위 호환성 문제가 없을 것 같네요.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 현재 구현은 count를 통해서 해당 pipe Op의 모든 연산들이 동일한 원인인 경우, 만약 연산 중 일부는 성공 / 일부는 실패일 경우는 하지만
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @brido4125 N개 pipe 연산에서 i번째 연산이 실패하면 그 뒤에 있는 모든 연산도 함께 실패 처리하는 것이 나을 것 같습니다. @uhm0311 @oliviarla
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
그러면 N개 pipe 연산을 보낼 때 1개를 보내고 응답을 받아서 실패하지 않았으면 다시 다음걸 보내는 방식으로 해야 합니다.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
N개 pipe 연산을 보낼 때, 중간에 에러 발생 등으로 pipeline이 중단되는 경우가 아니면 마지막 N번째 연산을 보내기 전까지는 응답이 돌아오지 않을텐데요.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 500개의 연산을 1개의 pipe 연산으로 지칭한 것입니다.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
exists 파이프만 1개 연산 지원이고, 다른 pipe는 초기 커밋부터 n개를 허용하였기에 그렇다고 보긴 힘들것 같네요 |
||
|
|
||
| rv.setOperation(op); | ||
| addOp(key, op); | ||
| public void complete() { | ||
| latch.countDown(); | ||
| } | ||
| }); | ||
| rv.addOperation(op); | ||
| addOp(key, op); | ||
| } | ||
| return rv; | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.