Skip to content

Commit e734b4a

Browse files
committed
hashmail: block until stream is freed
Fix flaky tests. Reproducer: go test -run TestHashMailServerReturnStream -count=20 TestHashMailServerReturnStream fails because the test cancels a read stream and immediately dials RecvStream again expecting the same stream to be handed out once the server returns it. The hashmail server implemented RequestReadStream/RequestWriteStream with a non-blocking channel poll and returned "read/write stream occupied" as soon as the mailbox was busy. That raced with the deferred ReturnStream call and the reconnect often happened before the stream got pushed back, so clients received the occupancy error instead of the context cancellation they triggered. Teach RequestReadStream/RequestWriteStream to wait for the stream to become available (or the caller's context / server shutdown) with a bounded timeout. If the wait expires we still return the "... stream occupied" error, so callers that legitimately pile up can see that signal. The new streamAcquireTimeout constant documents the policy, and the blocking select removes the race, so reconnect attempts now either succeed or surface the original context error.
1 parent bf020ea commit e734b4a

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

hashmail_server.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ const (
3939
// reads for it to be considered for pruning. Otherwise, memory will grow
4040
// unbounded.
4141
streamTTL = 24 * time.Hour
42+
43+
// streamAcquireTimeout determines how long we wait for a read/write
44+
// stream to become available before reporting it as occupied. Context
45+
// cancellation is still honoured immediately, so callers can shorten
46+
// the wait.
47+
streamAcquireTimeout = 250 * time.Millisecond
4248
)
4349

4450
// streamID is the identifier of a stream.
@@ -317,7 +323,14 @@ func (s *stream) RequestReadStream(ctx context.Context) (*readStream, error) {
317323
case r := <-s.readStreamChan:
318324
s.status.streamTaken(true)
319325
return r, nil
320-
default:
326+
327+
case <-s.quit:
328+
return nil, fmt.Errorf("stream shutting down")
329+
330+
case <-ctx.Done():
331+
return nil, ctx.Err()
332+
333+
case <-time.After(streamAcquireTimeout):
321334
return nil, fmt.Errorf("read stream occupied")
322335
}
323336
}
@@ -332,7 +345,14 @@ func (s *stream) RequestWriteStream(ctx context.Context) (*writeStream, error) {
332345
case w := <-s.writeStreamChan:
333346
s.status.streamTaken(false)
334347
return w, nil
335-
default:
348+
349+
case <-s.quit:
350+
return nil, fmt.Errorf("stream shutting down")
351+
352+
case <-ctx.Done():
353+
return nil, ctx.Err()
354+
355+
case <-time.After(streamAcquireTimeout):
336356
return nil, fmt.Errorf("write stream occupied")
337357
}
338358
}

hashmail_server_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,10 @@ func recvFromStream(client hashmailrpc.HashMailClient) error {
512512
}()
513513

514514
select {
515-
case <-time.After(time.Second):
515+
// Wait a little longer than the server's stream-acquire timeout so we
516+
// only trip this path when the server truly failed to hand over the
517+
// stream (instead of beating it to the punch).
518+
case <-time.After(2 * streamAcquireTimeout):
516519
return fmt.Errorf("timed out waiting to receive from receive " +
517520
"stream")
518521

0 commit comments

Comments
 (0)