Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketListener;
Expand All @@ -45,6 +50,9 @@ public class WebSocketProxy implements WebSocketListener
ByteBuffer socketReadBuffer;
CompletionHandler<Integer, Void> socketReadHandler;
CompletionHandler<Integer, Void> socketWriteHandler;

private final Queue<ByteBuffer> writeQueue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean writeInProgress = new AtomicBoolean(false);


WebSocketProxy(InetSocketAddress mqttHost, Logger logger)
Expand Down Expand Up @@ -143,13 +151,63 @@ public void onWebSocketClose(int statusCode, String reason)


@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
{
if (mqttSocket != null && mqttSocket.isOpen())
mqttSocket.write(ByteBuffer.wrap(payload));
public void onWebSocketBinary(byte[] payload, int offset, int len) {
if (mqttSocket == null || !mqttSocket.isOpen())
return;

ByteBuffer buf = ByteBuffer.wrap(Arrays.copyOfRange(payload, offset, offset + len));

writeQueue.add(buf);
tryWrite();
}

private void tryWrite() {
if (!writeInProgress.compareAndSet(false, true))
return;

ByteBuffer first = writeQueue.poll();
if (first == null) {
writeInProgress.set(false);
return;
}

// Wrapper so the handler can mutate the current buffer
class State {
ByteBuffer current = first;
}

State state = new State();

mqttSocket.write(state.current, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
// Write the remaining bytes
if (state.current.hasRemaining()) {
mqttSocket.write(state.current, null, this);
return;
}

// Get next buffer if finished
ByteBuffer next = writeQueue.poll();
if (next == null) {
writeInProgress.set(false);
return;
}

state.current = next;
mqttSocket.write(state.current, null, this);
}

@Override
public void failed(Throwable exc, Void attachment) {
writeInProgress.set(false);
log.error("Error writing to MQTT TCP socket", exc);
}
});
}



@Override
public void onWebSocketText(String message)
{
Expand Down