Skip to content

Commit febf089

Browse files
committed
ENHANCE: Change asyncSetPipedExist method logic.
1 parent 5dde87b commit febf089

File tree

2 files changed

+104
-75
lines changed

2 files changed

+104
-75
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 74 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,15 @@
119119
import net.spy.memcached.compat.log.Logger;
120120
import net.spy.memcached.compat.log.LoggerFactory;
121121
import net.spy.memcached.internal.BTreeStoreAndGetFuture;
122+
import net.spy.memcached.internal.BroadcastFuture;
122123
import net.spy.memcached.internal.BulkOperationFuture;
123124
import net.spy.memcached.internal.CheckedOperationTimeoutException;
124125
import net.spy.memcached.internal.CollectionFuture;
125126
import net.spy.memcached.internal.CollectionGetBulkFuture;
127+
import net.spy.memcached.internal.CollectionGetFuture;
126128
import net.spy.memcached.internal.OperationFuture;
127-
import net.spy.memcached.internal.SMGetFuture;
128129
import net.spy.memcached.internal.PipedCollectionFuture;
129-
import net.spy.memcached.internal.CollectionGetFuture;
130-
import net.spy.memcached.internal.BroadcastFuture;
130+
import net.spy.memcached.internal.SMGetFuture;
131131
import net.spy.memcached.ops.BTreeFindPositionOperation;
132132
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
133133
import net.spy.memcached.ops.BTreeGetBulkOperation;
@@ -3498,99 +3498,98 @@ public CollectionFuture<Integer> asyncBopGetItemCount(String key,
34983498
}
34993499

35003500
@Override
3501-
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key,
3502-
List<Object> values) {
3503-
SetPipedExist<Object> exist = new SetPipedExist<Object>(key, values, collectionTranscoder);
3504-
return asyncSetPipedExist(key, exist);
3501+
public CollectionFuture<Map<Object, Boolean>> asyncSopPipedExistBulk(String key, List<Object> values) {
3502+
return asyncSopPipedExistBulk(key, values, collectionTranscoder);
35053503
}
35063504

35073505
@Override
35083506
public <T> CollectionFuture<Map<T, Boolean>> asyncSopPipedExistBulk(String key,
35093507
List<T> values,
35103508
Transcoder<T> tc) {
3511-
SetPipedExist<T> exist = new SetPipedExist<T>(key, values, tc);
3512-
return asyncSetPipedExist(key, exist);
3509+
if (values.size() == 0) {
3510+
throw new IllegalArgumentException(
3511+
"The number of piped operations must be larger than 0.");
3512+
}
3513+
3514+
List<SetPipedExist<T>> existList = new ArrayList<SetPipedExist<T>>();
3515+
if (values.size() <= SetPipedExist.MAX_PIPED_ITEM_COUNT) {
3516+
existList.add(new SetPipedExist<T>(key, values, tc));
3517+
} else {
3518+
PartitionedList<T> partitionedList = new PartitionedList<T>(values, SetPipedExist.MAX_PIPED_ITEM_COUNT);
3519+
for (List<T> partition : partitionedList) {
3520+
existList.add(new SetPipedExist<T>(key, partition, tc));
3521+
}
3522+
}
3523+
return asyncSetPipedExist(key, existList);
35133524
}
35143525

35153526
/**
35163527
* Generic pipelined existence operation for set items. Public methods call this method.
35173528
*
35183529
* @param key collection item's key
3519-
* @param exist operation parameters (element values)
3530+
* @param existList list of operation parameters (element values)
35203531
* @return future holding the map of elements and their existence results
35213532
*/
35223533
<T> CollectionFuture<Map<T, Boolean>> asyncSetPipedExist(
3523-
final String key, final SetPipedExist<T> exist) {
3524-
3525-
if (exist.getItemCount() == 0) {
3526-
throw new IllegalArgumentException(
3527-
"The number of piped operations must be larger than 0.");
3528-
}
3529-
if (exist.getItemCount() > CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
3530-
throw new IllegalArgumentException(
3531-
"The number of piped operations must not exceed a maximum of "
3532-
+ CollectionPipedInsert.MAX_PIPED_ITEM_COUNT + ".");
3533-
}
3534-
3535-
final CountDownLatch latch = new CountDownLatch(1);
3536-
final CollectionFuture<Map<T, Boolean>> rv = new CollectionFuture<Map<T, Boolean>>(
3537-
latch, operationTimeout);
3538-
3539-
Operation op = opFact.collectionPipedExist(key, exist,
3540-
new CollectionPipedExistOperation.Callback() {
3541-
3542-
private final Map<T, Boolean> result = new HashMap<T, Boolean>();
3543-
private boolean hasAnError = false;
3544-
3545-
public void receivedStatus(OperationStatus status) {
3546-
if (hasAnError) {
3547-
return;
3548-
}
3549-
3550-
CollectionOperationStatus cstatus;
3551-
if (status instanceof CollectionOperationStatus) {
3552-
cstatus = (CollectionOperationStatus) status;
3553-
} else {
3534+
final String key, final List<SetPipedExist<T>> existList) {
3535+
final CountDownLatch latch = new CountDownLatch(existList.size());
3536+
3537+
final PipedCollectionFuture<T, Boolean> rv
3538+
= new PipedCollectionFuture<T, Boolean>(latch, operationTimeout);
3539+
3540+
for (final SetPipedExist<T> exist : existList) {
3541+
Operation op = opFact.collectionPipedExist(key, exist, new CollectionPipedExistOperation.Callback() {
3542+
private CollectionOperationStatus failedStatus = null;
3543+
private boolean isSameStatus = true;
3544+
public void gotStatus(Integer index, OperationStatus status) {
3545+
CollectionOperationStatus cstatus;
3546+
if (status instanceof CollectionOperationStatus) {
3547+
cstatus = (CollectionOperationStatus) status;
3548+
} else {
3549+
getLogger().warn("Unhandled state: " + status);
3550+
cstatus = new CollectionOperationStatus(status);
3551+
}
3552+
switch (cstatus.getResponse()) {
3553+
case EXIST:
3554+
case NOT_EXIST:
3555+
rv.addEachResult(exist.getValues().get(index), (CollectionResponse.EXIST.equals(cstatus.getResponse())));
3556+
break;
3557+
case UNREADABLE:
3558+
case TYPE_MISMATCH:
3559+
case NOT_FOUND:
3560+
if (failedStatus == null) {
3561+
failedStatus = cstatus;
3562+
} else if (!failedStatus.equals(cstatus)) {
3563+
isSameStatus = false;
3564+
}
3565+
break;
3566+
default:
35543567
getLogger().warn("Unhandled state: " + status);
3555-
cstatus = new CollectionOperationStatus(status);
3556-
}
3557-
rv.set(result, cstatus);
35583568
}
3569+
}
35593570

3560-
public void complete() {
3561-
latch.countDown();
3571+
public void receivedStatus(OperationStatus status) {
3572+
CollectionOperationStatus cstatus;
3573+
if (status instanceof CollectionOperationStatus) {
3574+
cstatus = (CollectionOperationStatus) status;
3575+
} else {
3576+
getLogger().warn("Unhandled state: " + status);
3577+
cstatus = new CollectionOperationStatus(status);
35623578
}
3563-
3564-
public void gotStatus(Integer index, OperationStatus status) {
3565-
CollectionOperationStatus cstatus;
3566-
if (status instanceof CollectionOperationStatus) {
3567-
cstatus = (CollectionOperationStatus) status;
3568-
} else {
3569-
cstatus = new CollectionOperationStatus(status);
3570-
}
3571-
3572-
switch (cstatus.getResponse()) {
3573-
case EXIST:
3574-
case NOT_EXIST:
3575-
result.put(exist.getValues().get(index),
3576-
(CollectionResponse.EXIST.equals(cstatus
3577-
.getResponse())));
3578-
break;
3579-
case UNREADABLE:
3580-
case TYPE_MISMATCH:
3581-
case NOT_FOUND:
3582-
hasAnError = true;
3583-
rv.set(new HashMap<T, Boolean>(0),
3584-
(CollectionOperationStatus) status);
3585-
break;
3586-
default:
3587-
getLogger().warn("Unhandled state: " + status);
3588-
}
3579+
if (failedStatus != null && isSameStatus) {
3580+
rv.addOperationStatus(failedStatus);
3581+
} else {
3582+
rv.addOperationStatus(cstatus);
35893583
}
3590-
});
3584+
}
35913585

3592-
rv.setOperation(op);
3593-
addOp(key, op);
3586+
public void complete() {
3587+
latch.countDown();
3588+
}
3589+
});
3590+
rv.addOperation(op);
3591+
addOp(key, op);
3592+
}
35943593
return rv;
35953594
}
35963595

src/test/manual/net/spy/memcached/collection/set/SopPipedExistTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,36 @@ public void testMaxPipedExist() {
166166
Assert.fail(e.getMessage());
167167
}
168168
}
169+
public void testMaxOverPipedExist() {
170+
int OVER_COUNT = 1000;
171+
172+
try {
173+
List<Object> findValues = new ArrayList<Object>();
174+
175+
// insert items
176+
for (int i = 0; i < OVER_COUNT; i++) {
177+
findValues.add("VALUE" + i);
178+
179+
Assert.assertTrue(mc.asyncSopInsert(KEY, "VALUE" + i, new CollectionAttributes()).get());
180+
}
181+
182+
// exist bulk
183+
CollectionFuture<Map<Object, Boolean>> future = mc
184+
.asyncSopPipedExistBulk(KEY, findValues);
185+
186+
Map<Object, Boolean> map = future.get();
187+
188+
Assert.assertTrue(future.getOperationStatus().isSuccess());
189+
190+
for (int i = 0; i < OVER_COUNT; i++) {
191+
Assert.assertTrue(map.get("VALUE" + i));
192+
}
193+
194+
} catch (Exception e) {
195+
e.printStackTrace();
196+
Assert.fail(e.getMessage());
197+
}
198+
}
169199

170200
public void testPipedExistNotExistsKey() {
171201
try {

0 commit comments

Comments
 (0)