126126import net .spy .memcached .internal .OperationFuture ;
127127import net .spy .memcached .internal .BulkOperationFuture ;
128128import net .spy .memcached .internal .SMGetFuture ;
129+ import net .spy .memcached .internal .PipedCollectionFuture ;
129130import net .spy .memcached .ops .BTreeFindPositionOperation ;
130131import net .spy .memcached .ops .BTreeFindPositionWithGetOperation ;
131132import net .spy .memcached .ops .BTreeGetBulkOperation ;
@@ -948,15 +949,9 @@ public void gotStatus(Integer index, OperationStatus status) {
948949 <T > CollectionFuture <Map <Integer , CollectionOperationStatus >> asyncCollectionPipedUpdate (
949950 final String key , final List <CollectionPipedUpdate <T >> updateList ) {
950951
951- final ConcurrentLinkedQueue <Operation > ops = new ConcurrentLinkedQueue <Operation >();
952952
953953 final CountDownLatch latch = new CountDownLatch (updateList .size ());
954-
955- final List <OperationStatus > mergedOperationStatus = Collections
956- .synchronizedList (new ArrayList <OperationStatus >(1 ));
957-
958- final Map <Integer , CollectionOperationStatus > mergedResult =
959- new ConcurrentHashMap <Integer , CollectionOperationStatus >();
954+ final PipedCollectionFuture rv = new PipedCollectionFuture (latch , operationTimeout , updateList .size ());
960955
961956 for (int i = 0 ; i < updateList .size (); i ++) {
962957 final CollectionPipedUpdate <T > update = updateList .get (i );
@@ -974,7 +969,7 @@ public void receivedStatus(OperationStatus status) {
974969 getLogger ().warn ("Unhandled state: " + status );
975970 cstatus = new CollectionOperationStatus (status );
976971 }
977- mergedOperationStatus . add (cstatus );
972+ rv . addOperationStatus (cstatus );
978973 }
979974
980975 // complete
@@ -985,97 +980,18 @@ public void complete() {
985980 // got status
986981 public void gotStatus (Integer index , OperationStatus status ) {
987982 if (status instanceof CollectionOperationStatus ) {
988- mergedResult . put (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
983+ rv . addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
989984 (CollectionOperationStatus ) status );
990985 } else {
991- mergedResult . put (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
986+ rv . addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
992987 new CollectionOperationStatus (status ));
993988 }
994989 }
995990 });
996991 addOp (key , op );
997- ops . add (op );
992+ rv . addOperation (op );
998993 }
999-
1000- return new CollectionFuture <Map <Integer , CollectionOperationStatus >>(
1001- latch , operationTimeout ) {
1002-
1003- @ Override
1004- public boolean cancel (boolean ign ) {
1005- boolean rv = false ;
1006- for (Operation op : ops ) {
1007- op .cancel ("by application." );
1008- rv |= op .getState () == OperationState .WRITE_QUEUED ;
1009- }
1010- return rv ;
1011- }
1012-
1013- @ Override
1014- public boolean isCancelled () {
1015- for (Operation op : ops ) {
1016- if (op .isCancelled ()) {
1017- return true ;
1018- }
1019- }
1020- return false ;
1021- }
1022-
1023- @ Override
1024- public Map <Integer , CollectionOperationStatus > get (long duration ,
1025- TimeUnit units )
1026- throws InterruptedException , TimeoutException , ExecutionException {
1027-
1028- if (!latch .await (duration , units )) {
1029- Collection <Operation > timedoutOps = new HashSet <Operation >();
1030- for (Operation op : ops ) {
1031- if (op .getState () != OperationState .COMPLETE ) {
1032- timedoutOps .add (op );
1033- } else {
1034- MemcachedConnection .opSucceeded (op );
1035- }
1036- }
1037- if (timedoutOps .size () > 0 ) {
1038- MemcachedConnection .opTimedOut (timedoutOps .iterator ().next ());
1039- throw new CheckedOperationTimeoutException (duration , units , timedoutOps );
1040- }
1041- } else {
1042- // continuous timeout counter will be reset only once in pipe
1043- MemcachedConnection .opSucceeded (ops .iterator ().next ());
1044- }
1045-
1046- for (Operation op : ops ) {
1047- if (op != null && op .hasErrored ()) {
1048- throw new ExecutionException (op .getException ());
1049- }
1050-
1051- if (op != null && op .isCancelled ()) {
1052- throw new ExecutionException (new RuntimeException (op .getCancelCause ()));
1053- }
1054- }
1055-
1056- return mergedResult ;
1057- }
1058-
1059- @ Override
1060- public CollectionOperationStatus getOperationStatus () {
1061- for (OperationStatus status : mergedOperationStatus ) {
1062- if (!status .isSuccess ()) {
1063- return new CollectionOperationStatus (status );
1064- }
1065- }
1066- return new CollectionOperationStatus (true , "END" , CollectionResponse .END );
1067- }
1068-
1069- @ Override
1070- public boolean isDone () {
1071- for (Operation op : ops ) {
1072- if (!(op .getState () == OperationState .COMPLETE || op .isCancelled ())) {
1073- return false ;
1074- }
1075- }
1076- return true ;
1077- }
1078- };
994+ return rv ;
1079995 }
1080996
1081997 /**
@@ -3936,15 +3852,9 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
39363852 <T > CollectionFuture <Map <Integer , CollectionOperationStatus >> asyncCollectionPipedInsert (
39373853 final String key , final List <CollectionPipedInsert <T >> insertList ) {
39383854
3939- final ConcurrentLinkedQueue <Operation > ops = new ConcurrentLinkedQueue <Operation >();
39403855
39413856 final CountDownLatch latch = new CountDownLatch (insertList .size ());
3942-
3943- final List <OperationStatus > mergedOperationStatus = Collections
3944- .synchronizedList (new ArrayList <OperationStatus >(1 ));
3945-
3946- final Map <Integer , CollectionOperationStatus > mergedResult =
3947- new ConcurrentHashMap <Integer , CollectionOperationStatus >();
3857+ final PipedCollectionFuture rv = new PipedCollectionFuture (latch , operationTimeout , insertList .size ());
39483858
39493859 for (int i = 0 ; i < insertList .size (); i ++) {
39503860 final CollectionPipedInsert <T > insert = insertList .get (i );
@@ -3962,7 +3872,7 @@ public void receivedStatus(OperationStatus status) {
39623872 getLogger ().warn ("Unhandled state: " + status );
39633873 cstatus = new CollectionOperationStatus (status );
39643874 }
3965- mergedOperationStatus . add (cstatus );
3875+ rv . addOperationStatus (cstatus );
39663876 }
39673877
39683878 // complete
@@ -3973,96 +3883,19 @@ public void complete() {
39733883 // got status
39743884 public void gotStatus (Integer index , OperationStatus status ) {
39753885 if (status instanceof CollectionOperationStatus ) {
3976- mergedResult . put (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3886+ rv . addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
39773887 (CollectionOperationStatus ) status );
39783888 } else {
3979- mergedResult . put (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3889+ rv . addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
39803890 new CollectionOperationStatus (status ));
39813891 }
39823892 }
39833893 });
39843894 addOp (key , op );
3985- ops . add (op );
3895+ rv . addOperation (op );
39863896 }
39873897
3988- return new CollectionFuture <Map <Integer , CollectionOperationStatus >>(
3989- latch , operationTimeout ) {
3990-
3991- @ Override
3992- public boolean cancel (boolean ign ) {
3993- boolean rv = false ;
3994- for (Operation op : ops ) {
3995- op .cancel ("by application." );
3996- rv |= op .getState () == OperationState .WRITE_QUEUED ;
3997- }
3998- return rv ;
3999- }
4000-
4001- @ Override
4002- public boolean isCancelled () {
4003- for (Operation op : ops ) {
4004- if (op .isCancelled ()) {
4005- return true ;
4006- }
4007- }
4008- return false ;
4009- }
4010-
4011- @ Override
4012- public Map <Integer , CollectionOperationStatus > get (long duration ,
4013- TimeUnit units )
4014- throws InterruptedException , TimeoutException , ExecutionException {
4015-
4016- if (!latch .await (duration , units )) {
4017- Collection <Operation > timedoutOps = new HashSet <Operation >();
4018- for (Operation op : ops ) {
4019- if (op .getState () != OperationState .COMPLETE ) {
4020- timedoutOps .add (op );
4021- } else {
4022- MemcachedConnection .opSucceeded (op );
4023- }
4024- }
4025- if (timedoutOps .size () > 0 ) {
4026- MemcachedConnection .opTimedOut (timedoutOps .iterator ().next ());
4027- throw new CheckedOperationTimeoutException (duration , units , timedoutOps );
4028- }
4029- } else {
4030- // continuous timeout counter will be reset only once in pipe
4031- MemcachedConnection .opSucceeded (ops .iterator ().next ());
4032- }
4033- for (Operation op : ops ) {
4034- if (op != null && op .hasErrored ()) {
4035- throw new ExecutionException (op .getException ());
4036- }
4037-
4038- if (op != null && op .isCancelled ()) {
4039- throw new ExecutionException (new RuntimeException (op .getCancelCause ()));
4040- }
4041- }
4042-
4043- return mergedResult ;
4044- }
4045-
4046- @ Override
4047- public CollectionOperationStatus getOperationStatus () {
4048- for (OperationStatus status : mergedOperationStatus ) {
4049- if (!status .isSuccess ()) {
4050- return new CollectionOperationStatus (status );
4051- }
4052- }
4053- return new CollectionOperationStatus (true , "END" , CollectionResponse .END );
4054- }
4055-
4056- @ Override
4057- public boolean isDone () {
4058- for (Operation op : ops ) {
4059- if (!(op .getState () == OperationState .COMPLETE || op .isCancelled ())) {
4060- return false ;
4061- }
4062- }
4063- return true ;
4064- }
4065- };
3898+ return rv ;
40663899 }
40673900
40683901 @ Override
0 commit comments