Skip to content

Commit 8ebe7c2

Browse files
committed
INTERNAL: Creating piped collection operation future.
1 parent 2e1f5fa commit 8ebe7c2

File tree

2 files changed

+129
-180
lines changed

2 files changed

+129
-180
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 13 additions & 180 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import net.spy.memcached.internal.OperationFuture;
127127
import net.spy.memcached.internal.BulkOperationFuture;
128128
import net.spy.memcached.internal.SMGetFuture;
129+
import net.spy.memcached.internal.PipedOperationFuture;
129130
import net.spy.memcached.ops.BTreeFindPositionOperation;
130131
import net.spy.memcached.ops.BTreeFindPositionWithGetOperation;
131132
import net.spy.memcached.ops.BTreeGetBulkOperation;
@@ -948,15 +949,9 @@ public void gotStatus(Integer index, OperationStatus status) {
948949
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedUpdate(
949950
final String key, final List<CollectionPipedUpdate<T>> updateList) {
950951

951-
final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();
952952

953953
final CountDownLatch latch = new CountDownLatch(updateList.size());
954-
955-
final List<OperationStatus> mergedOperationStatus = Collections
956-
.synchronizedList(new ArrayList<OperationStatus>(1));
957-
958-
final Map<Integer, CollectionOperationStatus> mergedResult =
959-
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
954+
final PipedOperationFuture rv = new PipedOperationFuture(latch, operationTimeout);
960955

961956
for (int i = 0; i < updateList.size(); i++) {
962957
final CollectionPipedUpdate<T> update = updateList.get(i);
@@ -974,7 +969,7 @@ public void receivedStatus(OperationStatus status) {
974969
getLogger().warn("Unhandled state: " + status);
975970
cstatus = new CollectionOperationStatus(status);
976971
}
977-
mergedOperationStatus.add(cstatus);
972+
rv.addCollectionOpStatus(cstatus);
978973
}
979974

980975
// complete
@@ -985,97 +980,18 @@ public void complete() {
985980
// got status
986981
public void gotStatus(Integer index, OperationStatus status) {
987982
if (status instanceof CollectionOperationStatus) {
988-
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
983+
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
989984
(CollectionOperationStatus) status);
990985
} else {
991-
mergedResult.put(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
986+
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
992987
new CollectionOperationStatus(status));
993988
}
994989
}
995990
});
996991
addOp(key, op);
997-
ops.add(op);
992+
rv.addOperation(op);
998993
}
999-
1000-
return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
1001-
latch, operationTimeout) {
1002-
1003-
@Override
1004-
public boolean cancel(boolean ign) {
1005-
boolean rv = false;
1006-
for (Operation op : ops) {
1007-
op.cancel("by application.");
1008-
rv |= op.getState() == OperationState.WRITE_QUEUED;
1009-
}
1010-
return rv;
1011-
}
1012-
1013-
@Override
1014-
public boolean isCancelled() {
1015-
for (Operation op : ops) {
1016-
if (op.isCancelled()) {
1017-
return true;
1018-
}
1019-
}
1020-
return false;
1021-
}
1022-
1023-
@Override
1024-
public Map<Integer, CollectionOperationStatus> get(long duration,
1025-
TimeUnit units)
1026-
throws InterruptedException, TimeoutException, ExecutionException {
1027-
1028-
if (!latch.await(duration, units)) {
1029-
Collection<Operation> timedoutOps = new HashSet<Operation>();
1030-
for (Operation op : ops) {
1031-
if (op.getState() != OperationState.COMPLETE) {
1032-
timedoutOps.add(op);
1033-
} else {
1034-
MemcachedConnection.opSucceeded(op);
1035-
}
1036-
}
1037-
if (timedoutOps.size() > 0) {
1038-
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
1039-
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
1040-
}
1041-
} else {
1042-
// continuous timeout counter will be reset only once in pipe
1043-
MemcachedConnection.opSucceeded(ops.iterator().next());
1044-
}
1045-
1046-
for (Operation op : ops) {
1047-
if (op != null && op.hasErrored()) {
1048-
throw new ExecutionException(op.getException());
1049-
}
1050-
1051-
if (op != null && op.isCancelled()) {
1052-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
1053-
}
1054-
}
1055-
1056-
return mergedResult;
1057-
}
1058-
1059-
@Override
1060-
public CollectionOperationStatus getOperationStatus() {
1061-
for (OperationStatus status : mergedOperationStatus) {
1062-
if (!status.isSuccess()) {
1063-
return new CollectionOperationStatus(status);
1064-
}
1065-
}
1066-
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
1067-
}
1068-
1069-
@Override
1070-
public boolean isDone() {
1071-
for (Operation op : ops) {
1072-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
1073-
return false;
1074-
}
1075-
}
1076-
return true;
1077-
}
1078-
};
994+
return rv;
1079995
}
1080996

1081997
/**
@@ -3936,15 +3852,9 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
39363852
<T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPipedInsert(
39373853
final String key, final List<CollectionPipedInsert<T>> insertList) {
39383854

3939-
final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();
39403855

39413856
final CountDownLatch latch = new CountDownLatch(insertList.size());
3942-
3943-
final List<OperationStatus> mergedOperationStatus = Collections
3944-
.synchronizedList(new ArrayList<OperationStatus>(1));
3945-
3946-
final Map<Integer, CollectionOperationStatus> mergedResult =
3947-
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
3857+
final PipedOperationFuture rv = new PipedOperationFuture(latch, operationTimeout);
39483858

39493859
for (int i = 0; i < insertList.size(); i++) {
39503860
final CollectionPipedInsert<T> insert = insertList.get(i);
@@ -3962,7 +3872,7 @@ public void receivedStatus(OperationStatus status) {
39623872
getLogger().warn("Unhandled state: " + status);
39633873
cstatus = new CollectionOperationStatus(status);
39643874
}
3965-
mergedOperationStatus.add(cstatus);
3875+
rv.addCollectionOpStatus(cstatus);
39663876
}
39673877

39683878
// complete
@@ -3973,96 +3883,19 @@ public void complete() {
39733883
// got status
39743884
public void gotStatus(Integer index, OperationStatus status) {
39753885
if (status instanceof CollectionOperationStatus) {
3976-
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3886+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
39773887
(CollectionOperationStatus) status);
39783888
} else {
3979-
mergedResult.put(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3889+
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
39803890
new CollectionOperationStatus(status));
39813891
}
39823892
}
39833893
});
39843894
addOp(key, op);
3985-
ops.add(op);
3895+
rv.addOperation(op);
39863896
}
39873897

3988-
return new CollectionFuture<Map<Integer, CollectionOperationStatus>>(
3989-
latch, operationTimeout) {
3990-
3991-
@Override
3992-
public boolean cancel(boolean ign) {
3993-
boolean rv = false;
3994-
for (Operation op : ops) {
3995-
op.cancel("by application.");
3996-
rv |= op.getState() == OperationState.WRITE_QUEUED;
3997-
}
3998-
return rv;
3999-
}
4000-
4001-
@Override
4002-
public boolean isCancelled() {
4003-
for (Operation op : ops) {
4004-
if (op.isCancelled()) {
4005-
return true;
4006-
}
4007-
}
4008-
return false;
4009-
}
4010-
4011-
@Override
4012-
public Map<Integer, CollectionOperationStatus> get(long duration,
4013-
TimeUnit units)
4014-
throws InterruptedException, TimeoutException, ExecutionException {
4015-
4016-
if (!latch.await(duration, units)) {
4017-
Collection<Operation> timedoutOps = new HashSet<Operation>();
4018-
for (Operation op : ops) {
4019-
if (op.getState() != OperationState.COMPLETE) {
4020-
timedoutOps.add(op);
4021-
} else {
4022-
MemcachedConnection.opSucceeded(op);
4023-
}
4024-
}
4025-
if (timedoutOps.size() > 0) {
4026-
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
4027-
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
4028-
}
4029-
} else {
4030-
// continuous timeout counter will be reset only once in pipe
4031-
MemcachedConnection.opSucceeded(ops.iterator().next());
4032-
}
4033-
for (Operation op : ops) {
4034-
if (op != null && op.hasErrored()) {
4035-
throw new ExecutionException(op.getException());
4036-
}
4037-
4038-
if (op != null && op.isCancelled()) {
4039-
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
4040-
}
4041-
}
4042-
4043-
return mergedResult;
4044-
}
4045-
4046-
@Override
4047-
public CollectionOperationStatus getOperationStatus() {
4048-
for (OperationStatus status : mergedOperationStatus) {
4049-
if (!status.isSuccess()) {
4050-
return new CollectionOperationStatus(status);
4051-
}
4052-
}
4053-
return new CollectionOperationStatus(true, "END", CollectionResponse.END);
4054-
}
4055-
4056-
@Override
4057-
public boolean isDone() {
4058-
for (Operation op : ops) {
4059-
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
4060-
return false;
4061-
}
4062-
}
4063-
return true;
4064-
}
4065-
};
3898+
return rv;
40663899
}
40673900

40683901
@Override
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package net.spy.memcached.internal;
2+
3+
import net.spy.memcached.MemcachedConnection;
4+
import net.spy.memcached.ops.CollectionOperationStatus;
5+
import net.spy.memcached.ops.Operation;
6+
import net.spy.memcached.ops.OperationState;
7+
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.Collections;
11+
import java.util.ArrayList;
12+
import java.util.Collection;
13+
import java.util.HashSet;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.ConcurrentLinkedQueue;
16+
import java.util.concurrent.ConcurrentHashMap;
17+
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.TimeoutException;
19+
import java.util.concurrent.ExecutionException;
20+
21+
public class PipedOperationFuture
22+
extends CollectionFuture<Map<Integer, CollectionOperationStatus>> {
23+
private final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>();
24+
private final List<CollectionOperationStatus> mergedOperationStatus = Collections
25+
.synchronizedList(new ArrayList<CollectionOperationStatus>(1));
26+
27+
private final Map<Integer, CollectionOperationStatus> mergedResult =
28+
new ConcurrentHashMap<Integer, CollectionOperationStatus>();
29+
30+
public PipedOperationFuture(CountDownLatch l, long opTimeout) {
31+
super(l, opTimeout);
32+
}
33+
34+
@Override
35+
public boolean cancel(boolean ign) {
36+
boolean rv = false;
37+
for (Operation op : ops) {
38+
op.cancel("by application.");
39+
rv |= op.getState() == OperationState.WRITE_QUEUED;
40+
}
41+
return rv;
42+
}
43+
44+
@Override
45+
public boolean isCancelled() {
46+
for (Operation op : ops) {
47+
if (op.isCancelled()) {
48+
return true;
49+
}
50+
}
51+
return false;
52+
}
53+
54+
@Override
55+
public boolean isDone() {
56+
for (Operation op : ops) {
57+
if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
58+
return false;
59+
}
60+
}
61+
return true;
62+
}
63+
64+
@Override
65+
public Map<Integer, CollectionOperationStatus> get(long duration,
66+
TimeUnit units)
67+
throws InterruptedException, TimeoutException, ExecutionException {
68+
69+
if (!latch.await(duration, units)) {
70+
Collection<Operation> timedoutOps = new HashSet<Operation>();
71+
for (Operation op : ops) {
72+
if (op.getState() != OperationState.COMPLETE) {
73+
timedoutOps.add(op);
74+
} else {
75+
MemcachedConnection.opSucceeded(op);
76+
}
77+
}
78+
if (timedoutOps.size() > 0) {
79+
MemcachedConnection.opTimedOut(timedoutOps.iterator().next());
80+
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
81+
}
82+
} else {
83+
// continuous timeout counter will be reset only once in pipe
84+
MemcachedConnection.opSucceeded(ops.iterator().next());
85+
}
86+
87+
for (Operation op : ops) {
88+
if (op != null && op.hasErrored()) {
89+
throw new ExecutionException(op.getException());
90+
}
91+
92+
if (op != null && op.isCancelled()) {
93+
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
94+
}
95+
}
96+
97+
return mergedResult;
98+
}
99+
100+
@Override
101+
public CollectionOperationStatus getOperationStatus() {
102+
return mergedOperationStatus.get(0);
103+
}
104+
105+
public void addCollectionOpStatus(CollectionOperationStatus status) {
106+
mergedOperationStatus.add(status);
107+
}
108+
109+
public void addEachResult(int index, CollectionOperationStatus status) {
110+
mergedResult.put(index, status);
111+
}
112+
113+
public void addOperation(Operation op) {
114+
ops.add(op);
115+
}
116+
}

0 commit comments

Comments
 (0)