Skip to content

Commit b1ef7d5

Browse files
committed
INTERNAL: Creating piped collection operation future.
1 parent 7cdb935 commit b1ef7d5

File tree

4 files changed

+166
-197
lines changed

4 files changed

+166
-197
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 19 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import net.spy.memcached.internal.CollectionGetBulkFuture;
127127
import net.spy.memcached.internal.OperationFuture;
128128
import net.spy.memcached.internal.SMGetFuture;
129+
import net.spy.memcached.internal.PipedCollectionFuture;
129130
import net.spy.memcached.ops.BTreeFindPositionOperation;
130131
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
131132
import net.spy.memcached.ops.BTreeGetBulkOperation;
@@ -948,15 +949,10 @@ public void gotStatus(Integer index, OperationStatus status) {
948949
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedUpdate(
949950
final String key, final List<CollectionPipedUpdate<T>> updateList) {
950951

951-
final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();
952-
953952
final CountDownLatch latch = new CountDownLatch(updateList.size());
954-
955-
final List<CollectionOperationStatus> mergedOperationStatus = Collections
956-
.synchronizedList(new ArrayList<CollectionOperationStatus>(updateList.size()));
957-
958-
final Map<Integer, CollectionOperationStatus> mergedResult =
959-
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
953+
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
954+
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout, updateList.size());
955+
Collection<Operation> ops = new ConcurrentLinkedQueue<Operation>();
960956

961957
for (int i = 0; i < updateList.size(); i++) {
962958
final CollectionPipedUpdate<T> update = updateList.get(i);
@@ -974,7 +970,7 @@ public void receivedStatus(OperationStatus status) {
974970
getLogger().warn("Unhandled state: " + status);
975971
cstatus = new CollectionOperationStatus(status);
976972
}
977-
mergedOperationStatus.add(cstatus);
973+
rv.addOperationStatus(cstatus);
978974
}
979975

980976
// complete
@@ -985,97 +981,19 @@ public void complete() {
985981
// got status
986982
public void gotStatus(Integer index, OperationStatus status) {
987983
if (status instanceof CollectionOperationStatus) {
988-
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
984+
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
989985
(CollectionOperationStatus) status);
990986
} else {
991-
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
987+
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
992988
new CollectionOperationStatus(status));
993989
}
994990
}
995991
});
996-
addOp(key, op);
997992
ops.add(op);
998993
}
999-
1000-
return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
1001-
latch, operationTimeout) {
1002-
1003-
@Override
1004-
public boolean cancel(boolean ign) {
1005-
boolean rv = false;
1006-
for (Operation op : ops) {
1007-
op.cancel("by application.");
1008-
rv |= op.getState() == OperationState.WRITE_QUEUED;
1009-
}
1010-
return rv;
1011-
}
1012-
1013-
@Override
1014-
public boolean isCancelled() {
1015-
for (Operation op : ops) {
1016-
if (op.isCancelled()) {
1017-
return true;
1018-
}
1019-
}
1020-
return false;
1021-
}
1022-
1023-
@Override
1024-
public Map<Integer, CollectionOperationStatus> get(long duration,
1025-
TimeUnit units)
1026-
throws InterruptedException, TimeoutException, ExecutionException {
1027-
1028-
if (!latch.await(duration, units)) {
1029-
Collection<Operation> timedoutOps = new HashSet<Operation>();
1030-
for (Operation op : ops) {
1031-
if (op.getState() != OperationState.COMPLETE) {
1032-
timedoutOps.add(op);
1033-
} else {
1034-
MemcachedConnection.opSucceeded(op);
1035-
}
1036-
}
1037-
if (timedoutOps.size() > 0) {
1038-
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
1039-
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
1040-
}
1041-
} else {
1042-
// continuous timeout counter will be reset only once in pipe
1043-
MemcachedConnection.opSucceeded(ops.iterator().next());
1044-
}
1045-
1046-
for (Operation op : ops) {
1047-
if (op != null && op.hasErrored()) {
1048-
throw new ExecutionException(op.getException());
1049-
}
1050-
1051-
if (op != null && op.isCancelled()) {
1052-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
1053-
}
1054-
}
1055-
1056-
return mergedResult;
1057-
}
1058-
1059-
@Override
1060-
public CollectionOperationStatus getOperationStatus() {
1061-
for (CollectionOperationStatus status : mergedOperationStatus) {
1062-
if (!status.isSuccess()) {
1063-
return status;
1064-
}
1065-
}
1066-
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
1067-
}
1068-
1069-
@Override
1070-
public boolean isDone() {
1071-
for (Operation op : ops) {
1072-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
1073-
return false;
1074-
}
1075-
}
1076-
return true;
1077-
}
1078-
};
994+
rv.setOps(ops);
995+
addOps(key, ops);
996+
return rv;
1079997
}
1080998

1081999
/**
@@ -3903,15 +3821,10 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
39033821
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedInsert(
39043822
final String key, final List<CollectionPipedInsert<T>> insertList) {
39053823

3906-
final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();
3907-
39083824
final CountDownLatch latch = new CountDownLatch(insertList.size());
3909-
3910-
final List<CollectionOperationStatus> mergedOperationStatus = Collections
3911-
.synchronizedList(new ArrayList<CollectionOperationStatus>(insertList.size()));
3912-
3913-
final Map<Integer, CollectionOperationStatus> mergedResult =
3914-
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
3825+
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
3826+
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout, insertList.size());
3827+
Collection<Operation> ops = new ConcurrentLinkedQueue<Operation>();
39153828

39163829
for (int i = 0; i < insertList.size(); i++) {
39173830
final CollectionPipedInsert<T> insert = insertList.get(i);
@@ -3929,7 +3842,7 @@ public void receivedStatus(OperationStatus status) {
39293842
getLogger().warn("Unhandled state: " + status);
39303843
cstatus = new CollectionOperationStatus(status);
39313844
}
3932-
mergedOperationStatus.add(cstatus);
3845+
rv.addOperationStatus(cstatus);
39333846
}
39343847

39353848
// complete
@@ -3940,96 +3853,19 @@ public void complete() {
39403853
// got status
39413854
public void gotStatus(Integer index, OperationStatus status) {
39423855
if (status instanceof CollectionOperationStatus) {
3943-
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3856+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
39443857
(CollectionOperationStatus) status);
39453858
} else {
3946-
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3859+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
39473860
new CollectionOperationStatus(status));
39483861
}
39493862
}
39503863
});
3951-
addOp(key, op);
39523864
ops.add(op);
39533865
}
3954-
3955-
return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
3956-
latch, operationTimeout) {
3957-
3958-
@Override
3959-
public boolean cancel(boolean ign) {
3960-
boolean rv = false;
3961-
for (Operation op : ops) {
3962-
op.cancel("by application.");
3963-
rv |= op.getState() == OperationState.WRITE_QUEUED;
3964-
}
3965-
return rv;
3966-
}
3967-
3968-
@Override
3969-
public boolean isCancelled() {
3970-
for (Operation op : ops) {
3971-
if (op.isCancelled()) {
3972-
return true;
3973-
}
3974-
}
3975-
return false;
3976-
}
3977-
3978-
@Override
3979-
public Map<Integer, CollectionOperationStatus> get(long duration,
3980-
TimeUnit units)
3981-
throws InterruptedException, TimeoutException, ExecutionException {
3982-
3983-
if (!latch.await(duration, units)) {
3984-
Collection<Operation> timedoutOps = new HashSet<Operation>();
3985-
for (Operation op : ops) {
3986-
if (op.getState() != OperationState.COMPLETE) {
3987-
timedoutOps.add(op);
3988-
} else {
3989-
MemcachedConnection.opSucceeded(op);
3990-
}
3991-
}
3992-
if (timedoutOps.size() > 0) {
3993-
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
3994-
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
3995-
}
3996-
} else {
3997-
// continuous timeout counter will be reset only once in pipe
3998-
MemcachedConnection.opSucceeded(ops.iterator().next());
3999-
}
4000-
for (Operation op : ops) {
4001-
if (op != null && op.hasErrored()) {
4002-
throw new ExecutionException(op.getException());
4003-
}
4004-
4005-
if (op != null && op.isCancelled()) {
4006-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
4007-
}
4008-
}
4009-
4010-
return mergedResult;
4011-
}
4012-
4013-
@Override
4014-
public CollectionOperationStatus getOperationStatus() {
4015-
for (CollectionOperationStatus status : mergedOperationStatus) {
4016-
if (!status.isSuccess()) {
4017-
return status;
4018-
}
4019-
}
4020-
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
4021-
}
4022-
4023-
@Override
4024-
public boolean isDone() {
4025-
for (Operation op : ops) {
4026-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
4027-
return false;
4028-
}
4029-
}
4030-
return true;
4031-
}
4032-
};
3866+
rv.setOps(ops);
3867+
addOps(key, ops);
3868+
return rv;
40333869
}
40343870

40353871
@Override

src/main/java/net/spy/memcached/MemcachedClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,12 @@ protected Operation addOp(final MemcachedNode node, final Operation op) {
366366
return op;
367367
}
368368

369+
protected void addOps(final String key, final Collection<Operation> ops) {
370+
validateKey(key);
371+
checkState();
372+
conn.addOperations(key, ops);
373+
}
374+
369375
protected void addOpMap(final Map<String, Operation> opMap) {
370376
checkState();
371377
for (Map.Entry<String, Operation> me : opMap.entrySet()) {

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,14 +1464,7 @@ public void insertOperation(final MemcachedNode node, final Operation o) {
14641464
}
14651465

14661466
public void addOperation(final MemcachedNode node, final Operation o) {
1467-
if (node == null) {
1468-
o.cancel("no node");
1469-
return;
1470-
}
1471-
if ((!node.isActive() && !node.isFirstConnecting()) &&
1472-
failureMode == FailureMode.Cancel) {
1473-
o.setHandlingNode(node);
1474-
o.cancel("inactive node");
1467+
if (checkNodeState(node, o)) {
14751468
return;
14761469
}
14771470
node.addOpToInputQ(o);
@@ -1481,17 +1474,29 @@ public void addOperation(final MemcachedNode node, final Operation o) {
14811474
getLogger().debug("Added %s to %s", o, node);
14821475
}
14831476

1484-
public void addOperations(final Map<MemcachedNode, Operation> ops) {
1485-
for (Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) {
1486-
final MemcachedNode node = me.getKey();
1487-
Operation o = me.getValue();
1488-
node.addOpToInputQ(o);
1489-
addedQueue.offer(node);
1477+
public void addOperations(String key, Collection<Operation> ops) {
1478+
MemcachedNode findNode = findNodeByKey(key);
1479+
for (Operation op : ops) {
1480+
addOperation(findNode, op);
14901481
}
14911482
Selector s = selector.wakeup();
14921483
assert s == selector : "Wakeup returned the wrong selector.";
14931484
}
14941485

1486+
private boolean checkNodeState(MemcachedNode findNode, Operation op) {
1487+
if (findNode == null) {
1488+
op.cancel("no node");
1489+
return true;
1490+
}
1491+
if ((!findNode.isActive() && !findNode.isFirstConnecting()) &&
1492+
failureMode == FailureMode.Cancel) {
1493+
op.setHandlingNode(findNode);
1494+
op.cancel("inactive node");
1495+
return true;
1496+
}
1497+
return false;
1498+
}
1499+
14951500
/**
14961501
* Broadcast an operation to all nodes.
14971502
*/

0 commit comments

Comments
 (0)