126126import net .spy .memcached .internal .CollectionGetBulkFuture ;
127127import net .spy .memcached .internal .OperationFuture ;
128128import net .spy .memcached .internal .SMGetFuture ;
129+ import net .spy .memcached .internal .PipedCollectionFuture ;
129130import net .spy .memcached .ops .BTreeFindPositionOperation ;
130131import net .spy .memcached .ops .BTreeFindPositionWithGetOperation ;
131132import net .spy .memcached .ops .BTreeGetBulkOperation ;
@@ -948,15 +949,9 @@ public void gotStatus(Integer index, OperationStatus status) {
948949 <T > CollectionFuture <Map <Integer , CollectionOperationStatus >> asyncCollectionPipedUpdate (
949950 final String key , final List <CollectionPipedUpdate <T >> updateList ) {
950951
951- final ConcurrentLinkedQueue <Operation > ops = new ConcurrentLinkedQueue <Operation >();
952-
953952 final CountDownLatch latch = new CountDownLatch (updateList .size ());
954-
955- final List <CollectionOperationStatus > mergedOperationStatus = Collections
956- .synchronizedList (new ArrayList <CollectionOperationStatus >(updateList .size ()));
957-
958- final Map <Integer , CollectionOperationStatus > mergedResult =
959- new ConcurrentHashMap <Integer , CollectionOperationStatus >();
953+ final PipedCollectionFuture <Integer , CollectionOperationStatus > rv =
954+ new PipedCollectionFuture <Integer , CollectionOperationStatus >(latch , operationTimeout , updateList .size ());
960955
961956 for (int i = 0 ; i < updateList .size (); i ++) {
962957 final CollectionPipedUpdate <T > update = updateList .get (i );
@@ -974,7 +969,7 @@ public void receivedStatus(OperationStatus status) {
974969 getLogger ().warn ("Unhandled state: " + status );
975970 cstatus = new CollectionOperationStatus (status );
976971 }
977- mergedOperationStatus . add (cstatus );
972+ rv . addOperationStatus (cstatus );
978973 }
979974
980975 // complete
@@ -985,97 +980,18 @@ public void complete() {
985980 // got status
986981 public void gotStatus (Integer index , OperationStatus status ) {
987982 if (status instanceof CollectionOperationStatus ) {
988- mergedResult . put (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
983+ rv . addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
989984 (CollectionOperationStatus ) status );
990985 } else {
991- mergedResult . put (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
986+ rv . addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
992987 new CollectionOperationStatus (status ));
993988 }
994989 }
995990 });
996991 addOp (key , op );
997- ops . add (op );
992+ rv . addOperation (op );
998993 }
999-
1000- return new CollectionFuture <Map <Integer , CollectionOperationStatus >>(
1001- latch , operationTimeout ) {
1002-
1003- @ Override
1004- public boolean cancel (boolean ign ) {
1005- boolean rv = false ;
1006- for (Operation op : ops ) {
1007- op .cancel ("by application." );
1008- rv |= op .getState () == OperationState .WRITE_QUEUED ;
1009- }
1010- return rv ;
1011- }
1012-
1013- @ Override
1014- public boolean isCancelled () {
1015- for (Operation op : ops ) {
1016- if (op .isCancelled ()) {
1017- return true ;
1018- }
1019- }
1020- return false ;
1021- }
1022-
1023- @ Override
1024- public Map <Integer , CollectionOperationStatus > get (long duration ,
1025- TimeUnit units )
1026- throws InterruptedException , TimeoutException , ExecutionException {
1027-
1028- if (!latch .await (duration , units )) {
1029- Collection <Operation > timedoutOps = new HashSet <Operation >();
1030- for (Operation op : ops ) {
1031- if (op .getState () != OperationState .COMPLETE ) {
1032- timedoutOps .add (op );
1033- } else {
1034- MemcachedConnection .opSucceeded (op );
1035- }
1036- }
1037- if (timedoutOps .size () > 0 ) {
1038- MemcachedConnection .opTimedOut (timedoutOps .iterator ().next ());
1039- throw new CheckedOperationTimeoutException (duration , units , timedoutOps );
1040- }
1041- } else {
1042- // continuous timeout counter will be reset only once in pipe
1043- MemcachedConnection .opSucceeded (ops .iterator ().next ());
1044- }
1045-
1046- for (Operation op : ops ) {
1047- if (op != null && op .hasErrored ()) {
1048- throw new ExecutionException (op .getException ());
1049- }
1050-
1051- if (op != null && op .isCancelled ()) {
1052- throw new ExecutionException (new RuntimeException (op .getCancelCause ()));
1053- }
1054- }
1055-
1056- return mergedResult ;
1057- }
1058-
1059- @ Override
1060- public CollectionOperationStatus getOperationStatus () {
1061- for (CollectionOperationStatus status : mergedOperationStatus ) {
1062- if (!status .isSuccess ()) {
1063- return status ;
1064- }
1065- }
1066- return new CollectionOperationStatus (true , "END" , CollectionResponse .END );
1067- }
1068-
1069- @ Override
1070- public boolean isDone () {
1071- for (Operation op : ops ) {
1072- if (!(op .getState () == OperationState .COMPLETE || op .isCancelled ())) {
1073- return false ;
1074- }
1075- }
1076- return true ;
1077- }
1078- };
994+ return rv ;
1079995 }
1080996
1081997 /**
@@ -3903,15 +3819,9 @@ public SMGetFuture<List<SMGetElement<Object>>> asyncBopSortMergeGet(
39033819 <T > CollectionFuture <Map <Integer , CollectionOperationStatus >> asyncCollectionPipedInsert (
39043820 final String key , final List <CollectionPipedInsert <T >> insertList ) {
39053821
3906- final ConcurrentLinkedQueue <Operation > ops = new ConcurrentLinkedQueue <Operation >();
3907-
39083822 final CountDownLatch latch = new CountDownLatch (insertList .size ());
3909-
3910- final List <CollectionOperationStatus > mergedOperationStatus = Collections
3911- .synchronizedList (new ArrayList <CollectionOperationStatus >(insertList .size ()));
3912-
3913- final Map <Integer , CollectionOperationStatus > mergedResult =
3914- new ConcurrentHashMap <Integer , CollectionOperationStatus >();
3823+ final PipedCollectionFuture <Integer , CollectionOperationStatus > rv =
3824+ new PipedCollectionFuture <Integer , CollectionOperationStatus >(latch , operationTimeout , insertList .size ());
39153825
39163826 for (int i = 0 ; i < insertList .size (); i ++) {
39173827 final CollectionPipedInsert <T > insert = insertList .get (i );
@@ -3929,7 +3839,7 @@ public void receivedStatus(OperationStatus status) {
39293839 getLogger ().warn ("Unhandled state: " + status );
39303840 cstatus = new CollectionOperationStatus (status );
39313841 }
3932- mergedOperationStatus . add (cstatus );
3842+ rv . addOperationStatus (cstatus );
39333843 }
39343844
39353845 // complete
@@ -3940,96 +3850,18 @@ public void complete() {
39403850 // got status
39413851 public void gotStatus (Integer index , OperationStatus status ) {
39423852 if (status instanceof CollectionOperationStatus ) {
3943- mergedResult . put (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3853+ rv . addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
39443854 (CollectionOperationStatus ) status );
39453855 } else {
3946- mergedResult . put (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3856+ rv . addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
39473857 new CollectionOperationStatus (status ));
39483858 }
39493859 }
39503860 });
39513861 addOp (key , op );
3952- ops . add (op );
3862+ rv . addOperation (op );
39533863 }
3954-
3955- return new CollectionFuture <Map <Integer , CollectionOperationStatus >>(
3956- latch , operationTimeout ) {
3957-
3958- @ Override
3959- public boolean cancel (boolean ign ) {
3960- boolean rv = false ;
3961- for (Operation op : ops ) {
3962- op .cancel ("by application." );
3963- rv |= op .getState () == OperationState .WRITE_QUEUED ;
3964- }
3965- return rv ;
3966- }
3967-
3968- @ Override
3969- public boolean isCancelled () {
3970- for (Operation op : ops ) {
3971- if (op .isCancelled ()) {
3972- return true ;
3973- }
3974- }
3975- return false ;
3976- }
3977-
3978- @ Override
3979- public Map <Integer , CollectionOperationStatus > get (long duration ,
3980- TimeUnit units )
3981- throws InterruptedException , TimeoutException , ExecutionException {
3982-
3983- if (!latch .await (duration , units )) {
3984- Collection <Operation > timedoutOps = new HashSet <Operation >();
3985- for (Operation op : ops ) {
3986- if (op .getState () != OperationState .COMPLETE ) {
3987- timedoutOps .add (op );
3988- } else {
3989- MemcachedConnection .opSucceeded (op );
3990- }
3991- }
3992- if (timedoutOps .size () > 0 ) {
3993- MemcachedConnection .opTimedOut (timedoutOps .iterator ().next ());
3994- throw new CheckedOperationTimeoutException (duration , units , timedoutOps );
3995- }
3996- } else {
3997- // continuous timeout counter will be reset only once in pipe
3998- MemcachedConnection .opSucceeded (ops .iterator ().next ());
3999- }
4000- for (Operation op : ops ) {
4001- if (op != null && op .hasErrored ()) {
4002- throw new ExecutionException (op .getException ());
4003- }
4004-
4005- if (op != null && op .isCancelled ()) {
4006- throw new ExecutionException (new RuntimeException (op .getCancelCause ()));
4007- }
4008- }
4009-
4010- return mergedResult ;
4011- }
4012-
4013- @ Override
4014- public CollectionOperationStatus getOperationStatus () {
4015- for (CollectionOperationStatus status : mergedOperationStatus ) {
4016- if (!status .isSuccess ()) {
4017- return status ;
4018- }
4019- }
4020- return new CollectionOperationStatus (true , "END" , CollectionResponse .END );
4021- }
4022-
4023- @ Override
4024- public boolean isDone () {
4025- for (Operation op : ops ) {
4026- if (!(op .getState () == OperationState .COMPLETE || op .isCancelled ())) {
4027- return false ;
4028- }
4029- }
4030- return true ;
4031- }
4032- };
3864+ return rv ;
40333865 }
40343866
40353867 @ Override
0 commit comments