Skip to content

Commit dd45eef

Browse files
authored
Fix ReadPendingException warning (#13201)
* fix ReadPendingException warning in HTTP and FCGI by making sure responseSuccess() is called only after HttpReceiverOver* could read the generated chunk; this can happen when the read/demand loop is not running in a thread that was called by the SerializedInvoker, which is the case when Response.Listener.onContentSource() spawns a thread to read the content. * add tests covering this case * Fix H3 by generating the missing EOF chunk when a trailers frame is encountered Signed-off-by: Ludovic Orban <[email protected]>
1 parent ab1c1d2 commit dd45eef

File tree

6 files changed

+160
-11
lines changed

6 files changed

+160
-11
lines changed

jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ public Content.Chunk read(boolean fillInterestIfNeeded)
126126
if (LOG.isDebugEnabled())
127127
LOG.debug("ParseAndFill needFillInterest {} in {}", needFillInterest, this);
128128
chunk = consumeChunk();
129+
if (state == State.COMPLETE)
130+
responseSuccess(getHttpExchange(), receiveNext);
129131
if (chunk != null)
130132
return chunk;
131133
if (needFillInterest && fillInterestIfNeeded)
@@ -339,12 +341,12 @@ private boolean parse(boolean notifyContentAvailable)
339341
boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101;
340342
boolean isTunnel = getHttpChannel().isTunnel(method, status);
341343

342-
Runnable task = isUpgrade || isTunnel ? null : this.receiveNext;
343-
responseSuccess(exchange, task);
344-
345344
// Connection upgrade, bail out.
346345
if (isUpgrade || isTunnel)
346+
{
347+
responseSuccess(exchange, null);
347348
return true;
349+
}
348350

349351
if (byteBuffer.hasRemaining())
350352
{
@@ -359,10 +361,14 @@ private boolean parse(boolean notifyContentAvailable)
359361
if (LOG.isDebugEnabled())
360362
LOG.debug("Discarding unexpected content after response {}: {} in {}", status, BufferUtil.toDetailString(byteBuffer), this);
361363
BufferUtil.clear(byteBuffer);
362-
return false;
363364
}
364365
}
365366

367+
// When notifyContentAvailable==false, this method is called from read(boolean),
368+
// and the call to responseSuccess() is performed by read().
369+
if (notifyContentAvailable)
370+
responseSuccess(exchange, receiveNext);
371+
366372
// Continue to read from the network.
367373
return false;
368374
}

jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -245,19 +245,30 @@ private boolean parse(ByteBuffer buffer, boolean notifyContentAvailable)
245245
}
246246
case COMPLETE ->
247247
{
248-
// For the complete event, handle==false, and cannot
249-
// differentiate between a complete event and a parse()
250-
// with zero or not enough bytes, so the state is reset
251-
// here to avoid calling responseSuccess() again.
252-
state = State.STATUS;
253-
channel.responseSuccess();
248+
// Do not call channel.responseSuccess() here to give HttpReceiverOverFCGI.read(boolean) a chance to read
249+
// the chunk field before channel.responseSuccess() resets it to null.
254250
}
255251
default -> throw new IllegalStateException("Invalid state " + state);
256252
}
257253

258254
return handle;
259255
}
260256

257+
boolean isComplete()
258+
{
259+
return state == State.COMPLETE;
260+
}
261+
262+
void complete()
263+
{
264+
// For the complete event, handle==false, and cannot
265+
// differentiate between a complete event and a parse()
266+
// with zero or not enough bytes, so the state is reset
267+
// here to avoid calling responseSuccess() again.
268+
state = State.STATUS;
269+
channel.responseSuccess();
270+
}
271+
261272
private void shutdown()
262273
{
263274
// Mark this receiver as shutdown, so that we can

jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ public Content.Chunk read(boolean fillInterestIfNeeded)
8888
HttpConnectionOverFCGI httpConnection = getHttpConnection();
8989
boolean needFillInterest = httpConnection.parseAndFill(false);
9090
chunk = consumeChunk();
91+
if (httpConnection.isComplete())
92+
httpConnection.complete();
9193
if (chunk != null)
9294
return chunk;
9395
if (needFillInterest && fillInterestIfNeeded)

jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Stream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ public void onTrailer(HeadersFrame frame)
362362
{
363363
notIdle();
364364
notifyTrailer(frame);
365+
dataRef.set(Data.EOF);
365366
updateClose(frame.isLast(), false);
366367
}
367368
}

jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ private void sendContent(MetaData.Request request, ByteBuffer content, boolean l
398398
{
399399
if (hasContent)
400400
{
401-
callback.completeWith(sendDataFrame(content, lastContent, false)
401+
callback.completeWith(sendDataFrame(content, false, false)
402402
.thenCompose(s -> sendTrailerFrame(trailers)));
403403
}
404404
else

jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientTest.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.eclipse.jetty.client.Origin;
4141
import org.eclipse.jetty.client.Response;
4242
import org.eclipse.jetty.client.Result;
43+
import org.eclipse.jetty.http.HttpFields;
4344
import org.eclipse.jetty.http.HttpHeader;
4445
import org.eclipse.jetty.http.HttpMethod;
4546
import org.eclipse.jetty.http.HttpStatus;
@@ -68,6 +69,7 @@
6869
import static org.hamcrest.MatcherAssert.assertThat;
6970
import static org.hamcrest.Matchers.containsString;
7071
import static org.hamcrest.Matchers.is;
72+
import static org.hamcrest.Matchers.nullValue;
7173
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
7274
import static org.junit.jupiter.api.Assertions.assertEquals;
7375
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -79,6 +81,92 @@
7981

8082
public class HttpClientTest extends AbstractTest
8183
{
84+
@ParameterizedTest
85+
@MethodSource("transports")
86+
public void testClientUseContentSourceInSpawnedThreadEmptyResponseContent(Transport transport) throws Exception
87+
{
88+
start(transport, new Handler.Abstract()
89+
{
90+
@Override
91+
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
92+
{
93+
response.write(true, BufferUtil.EMPTY_BUFFER, callback);
94+
return true;
95+
}
96+
});
97+
98+
var listener = new OnContentSourceListener();
99+
100+
client.newRequest(newURI(transport))
101+
.method("POST")
102+
.send(listener);
103+
104+
OnContentSourceListener.ClientResponseContent clientResponseContent = listener.clientResponseContent.get(5, TimeUnit.SECONDS);
105+
assertThat(clientResponseContent.body(), is(""));
106+
assertThat(clientResponseContent.status(), is(200));
107+
assertThat(clientResponseContent.trailers(), nullValue());
108+
}
109+
110+
@ParameterizedTest
111+
@MethodSource("transports")
112+
public void testClientUseContentSourceInSpawnedThreadWithResponseContent(Transport transport) throws Exception
113+
{
114+
start(transport, new Handler.Abstract()
115+
{
116+
@Override
117+
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
118+
{
119+
response.write(true, BufferUtil.toBuffer("some response content", StandardCharsets.UTF_8), callback);
120+
return true;
121+
}
122+
});
123+
124+
var listener = new OnContentSourceListener();
125+
126+
client.newRequest(newURI(transport))
127+
.method("POST")
128+
.send(listener);
129+
130+
OnContentSourceListener.ClientResponseContent clientResponseContent = listener.clientResponseContent.get(5, TimeUnit.SECONDS);
131+
assertThat(clientResponseContent.body(), is("some response content"));
132+
assertThat(clientResponseContent.status(), is(200));
133+
assertThat(clientResponseContent.trailers(), nullValue());
134+
}
135+
136+
@ParameterizedTest
137+
@MethodSource("transportsNoFCGI")
138+
public void testClientUseContentSourceInSpawnedThreadWithTrailer(Transport transport) throws Exception
139+
{
140+
start(transport, new Handler.Abstract()
141+
{
142+
@Override
143+
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws IOException
144+
{
145+
response.setTrailersSupplier(() -> HttpFields.build().add("X-Trailer-test", "foobar"));
146+
// start chunked mode
147+
try (Blocker.Callback blocker = Blocker.callback())
148+
{
149+
response.write(false, BufferUtil.EMPTY_BUFFER, blocker);
150+
blocker.block();
151+
}
152+
153+
response.write(true, BufferUtil.toBuffer("some response content", StandardCharsets.UTF_8), callback);
154+
return true;
155+
}
156+
});
157+
158+
var listener = new OnContentSourceListener();
159+
160+
client.newRequest(newURI(transport))
161+
.method("POST")
162+
.send(listener);
163+
164+
OnContentSourceListener.ClientResponseContent clientResponseContent = listener.clientResponseContent.get(5, TimeUnit.SECONDS);
165+
assertThat(clientResponseContent.body(), is("some response content"));
166+
assertThat(clientResponseContent.status(), is(200));
167+
assertThat(clientResponseContent.trailers().get("X-Trailer-test"), is("foobar"));
168+
}
169+
82170
@ParameterizedTest
83171
@MethodSource("transports")
84172
public void testRequestWithoutResponseContent(Transport transport) throws Exception
@@ -1155,4 +1243,45 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException
11551243
return latch.await(timeout, unit);
11561244
}
11571245
}
1246+
1247+
private static class OnContentSourceListener implements Response.Listener
1248+
{
1249+
record ClientResponseContent(int status, String body, HttpFields trailers)
1250+
{
1251+
}
1252+
1253+
final CompletableFuture<ClientResponseContent> clientResponseContent = new CompletableFuture<>();
1254+
final StringBuffer buffer = new StringBuffer();
1255+
1256+
@Override
1257+
public void onContentSource(Response response, Content.Source contentSource)
1258+
{
1259+
new Thread(() ->
1260+
{
1261+
Content.Chunk chunk = contentSource.read();
1262+
if (chunk == null)
1263+
{
1264+
contentSource.demand(() -> onContentSource(response, contentSource));
1265+
return;
1266+
}
1267+
1268+
buffer.append(BufferUtil.toString(chunk.getByteBuffer(), StandardCharsets.UTF_8));
1269+
chunk.release();
1270+
1271+
if (!chunk.isLast())
1272+
{
1273+
contentSource.demand(() -> onContentSource(response, contentSource));
1274+
}
1275+
else
1276+
{
1277+
Content.Chunk afterLastChunk = contentSource.read();
1278+
if (afterLastChunk != chunk)
1279+
clientResponseContent.completeExceptionally(new AssertionError("afterLastChunk != chunk"));
1280+
else
1281+
clientResponseContent.complete(new ClientResponseContent(response.getStatus(), buffer.toString(), response.getTrailers()));
1282+
}
1283+
}
1284+
).start();
1285+
}
1286+
}
11581287
}

0 commit comments

Comments
 (0)