|
43 | 43 | import java.util.concurrent.TimeoutException; |
44 | 44 | import java.util.concurrent.atomic.AtomicBoolean; |
45 | 45 | import java.util.concurrent.atomic.AtomicInteger; |
46 | | -import java.util.concurrent.atomic.AtomicReference; |
47 | 46 | import java.util.concurrent.locks.ReentrantLock; |
48 | 47 | import java.util.jar.JarFile; |
49 | 48 | import java.util.jar.Manifest; |
|
128 | 127 | import net.spy.memcached.internal.SMGetFuture; |
129 | 128 | import net.spy.memcached.internal.PipedCollectionFuture; |
130 | 129 | import net.spy.memcached.internal.CollectionGetFuture; |
| 130 | +import net.spy.memcached.internal.BroadcastFuture; |
131 | 131 | import net.spy.memcached.ops.BTreeFindPositionOperation; |
132 | 132 | import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; |
133 | 133 | import net.spy.memcached.ops.BTreeGetBulkOperation; |
@@ -1952,96 +1952,29 @@ public OperationFuture<Boolean> flush(final String prefix) { |
1952 | 1952 |
|
1953 | 1953 | @Override |
1954 | 1954 | public OperationFuture<Boolean> flush(final String prefix, final int delay) { |
1955 | | - final AtomicReference<Boolean> flushResult = new AtomicReference<Boolean>(true); |
1956 | | - final ConcurrentLinkedQueue<Operation> ops = new ConcurrentLinkedQueue<Operation>(); |
1957 | | - |
1958 | | - final CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() { |
1959 | | - public Operation newOp(final MemcachedNode n, |
1960 | | - final CountDownLatch latch) { |
1961 | | - Operation op = opFact.flush(prefix, delay, false, |
1962 | | - new OperationCallback() { |
1963 | | - public void receivedStatus(OperationStatus s) { |
1964 | | - if (!s.isSuccess()) { |
1965 | | - flushResult.set(false); |
1966 | | - } |
1967 | | - } |
1968 | | - |
1969 | | - public void complete() { |
1970 | | - latch.countDown(); |
1971 | | - } |
1972 | | - }); |
1973 | | - ops.add(op); |
1974 | | - return op; |
1975 | | - } |
1976 | | - }); |
1977 | | - |
1978 | | - return new OperationFuture<Boolean>(blatch, flushResult, |
1979 | | - operationTimeout) { |
1980 | | - @Override |
1981 | | - public boolean cancel(boolean ign) { |
1982 | | - boolean rv = false; |
1983 | | - for (Operation op : ops) { |
1984 | | - rv |= op.cancel("by application."); |
1985 | | - } |
1986 | | - return rv; |
1987 | | - } |
| 1955 | + Collection<MemcachedNode> nodes = getAllNodes(); |
| 1956 | + final BroadcastFuture<Boolean> rv |
| 1957 | + = new BroadcastFuture<Boolean>(operationTimeout, Boolean.TRUE, nodes.size()); |
1988 | 1958 |
|
1989 | | - @Override |
1990 | | - public boolean isCancelled() { |
1991 | | - for (Operation op : ops) { |
1992 | | - if (op.isCancelled()) { |
1993 | | - return true; |
1994 | | - } |
1995 | | - } |
1996 | | - return false; |
1997 | | - } |
1998 | | - |
1999 | | - @Override |
2000 | | - public Boolean get(long duration, TimeUnit units) |
2001 | | - throws InterruptedException, TimeoutException, ExecutionException { |
2002 | | - |
2003 | | - if (!blatch.await(duration, units)) { |
2004 | | - // whenever timeout occurs, continuous timeout counter will increase by 1. |
2005 | | - Collection<Operation> timedoutOps = new HashSet<Operation>(); |
2006 | | - for (Operation op : ops) { |
2007 | | - if (op.getState() != OperationState.COMPLETE) { |
2008 | | - MemcachedConnection.opTimedOut(op); |
2009 | | - timedoutOps.add(op); |
2010 | | - } else { |
2011 | | - MemcachedConnection.opSucceeded(op); |
2012 | | - } |
2013 | | - } |
2014 | | - if (timedoutOps.size() > 0) { |
2015 | | - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); |
2016 | | - } |
2017 | | - } else { |
2018 | | - // continuous timeout counter will be reset |
2019 | | - MemcachedConnection.opsSucceeded(ops); |
2020 | | - } |
2021 | | - |
2022 | | - for (Operation op : ops) { |
2023 | | - if (op != null && op.hasErrored()) { |
2024 | | - throw new ExecutionException(op.getException()); |
2025 | | - } |
2026 | | - |
2027 | | - if (op != null && op.isCancelled()) { |
2028 | | - throw new ExecutionException(new RuntimeException(op.getCancelCause())); |
| 1959 | + checkState(); |
| 1960 | + for (MemcachedNode node : nodes) { |
| 1961 | + Operation op = opFact.flush(prefix, delay, false, new OperationCallback() { |
| 1962 | + @Override |
| 1963 | + public void receivedStatus(OperationStatus status) { |
| 1964 | + if (!status.isSuccess()) { |
| 1965 | + rv.set(Boolean.FALSE, status); |
2029 | 1966 | } |
2030 | 1967 | } |
2031 | 1968 |
|
2032 | | - return flushResult.get(); |
2033 | | - } |
2034 | | - |
2035 | | - @Override |
2036 | | - public boolean isDone() { |
2037 | | - for (Operation op : ops) { |
2038 | | - if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) { |
2039 | | - return false; |
2040 | | - } |
| 1969 | + @Override |
| 1970 | + public void complete() { |
| 1971 | + rv.complete(); |
2041 | 1972 | } |
2042 | | - return true; |
2043 | | - } |
2044 | | - }; |
| 1973 | + }); |
| 1974 | + rv.addOp(op); |
| 1975 | + getMemcachedConnection().addOperation(node, op); |
| 1976 | + } |
| 1977 | + return rv; |
2045 | 1978 | } |
2046 | 1979 |
|
2047 | 1980 | @Override |
|
0 commit comments