Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions pkg/p2p/streamtest/streamtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Recorder struct {
streamErr func(swarm.Address, string, string, string) error
pingErr func(ma.Multiaddr) (time.Duration, error)
protocolsWithPeers map[string]p2p.ProtocolSpec
messageLatency time.Duration
}

func WithProtocols(protocols ...p2p.ProtocolSpec) Option {
Expand Down Expand Up @@ -82,6 +83,12 @@ func WithPingErr(pingErr func(ma.Multiaddr) (time.Duration, error)) Option {
})
}

func WithMessageLatency(latency time.Duration) Option {
return optionFunc(func(r *Recorder) {
r.messageLatency = latency
})
}

func New(opts ...Option) *Recorder {
r := &Recorder{
records: make(map[string][]*Record),
Expand Down Expand Up @@ -115,8 +122,8 @@ func (r *Recorder) NewStream(ctx context.Context, addr swarm.Address, h p2p.Head
}
}

recordIn := newRecord()
recordOut := newRecord()
recordIn := newRecord(r.messageLatency)
recordOut := newRecord(r.messageLatency)
streamOut := newStream(recordIn, recordOut)
streamIn := newStream(recordOut, recordIn)

Expand Down Expand Up @@ -328,16 +335,20 @@ type record struct {
c int
lock sync.Mutex
dataSigC chan struct{}
latency time.Duration
closed bool
}

func newRecord() *record {
func newRecord(latency time.Duration) *record {
return &record{
dataSigC: make(chan struct{}, 16),
latency: latency,
}
}

func (r *record) Read(p []byte) (n int, err error) {
defer time.Sleep(r.latency)

for r.c == r.bytesSize() {
_, ok := <-r.dataSigC
if !ok {
Expand All @@ -356,6 +367,8 @@ func (r *record) Read(p []byte) (n int, err error) {
}

func (r *record) Write(p []byte) (int, error) {
defer time.Sleep(r.latency)

r.lock.Lock()
defer r.lock.Unlock()

Expand Down
45 changes: 45 additions & 0 deletions pkg/p2p/streamtest/streamtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"strings"
"testing"
"testing/synctest"
"time"

"github.com/ethersphere/bee/v2/pkg/p2p"
Expand Down Expand Up @@ -785,6 +786,50 @@ func TestRecorder_ping(t *testing.T) {
}
}

func TestRecorder_WithMessageLatency(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
latency := 200 * time.Millisecond
recorder := streamtest.New(
streamtest.WithMessageLatency(latency),
streamtest.WithProtocols(
newTestProtocol(func(_ context.Context, peer p2p.Peer, stream p2p.Stream) error {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if _, err := rw.ReadString('\n'); err != nil {
return err
}
if _, err := rw.WriteString("pong\n"); err != nil {
return err
}
return rw.Flush()
}),
),
)

stream, err := recorder.NewStream(context.Background(), swarm.ZeroAddress, nil, testProtocolName, testProtocolVersion, testStreamName)
if err != nil {
t.Fatal(err)
}
defer stream.Close()

start := time.Now()
if _, err := stream.Write([]byte("ping\n")); err != nil {
t.Fatal(err)
}
if duration := time.Since(start); duration < latency {
t.Errorf("write took %v, want >= %v", duration, latency)
}

start = time.Now()
rw := bufio.NewReader(stream)
if _, err := rw.ReadString('\n'); err != nil {
t.Fatal(err)
}
if duration := time.Since(start); duration < latency {
t.Errorf("read took %v, want >= %v", duration, latency)
}
})
}

const (
testProtocolName = "testing"
testProtocolVersion = "1.0.1"
Expand Down
151 changes: 72 additions & 79 deletions pkg/pingpong/pingpong_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,106 +8,99 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"testing"
"testing/synctest"
"time"

"github.com/ethersphere/bee/v2/pkg/swarm"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/p2p"
"github.com/ethersphere/bee/v2/pkg/p2p/protobuf"
"github.com/ethersphere/bee/v2/pkg/p2p/streamtest"
"github.com/ethersphere/bee/v2/pkg/pingpong"
"github.com/ethersphere/bee/v2/pkg/pingpong/pb"
)

func TestPing(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
logger := log.Noop

logger := log.Noop
// create a pingpong server that handles the incoming stream
server := pingpong.New(nil, logger, nil)

// create a pingpong server that handles the incoming stream
server := pingpong.New(nil, logger, nil)
messageLatency := 200 * time.Millisecond

// setup the stream recorder to record stream data
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
streamtest.WithMiddlewares(func(f p2p.HandlerFunc) p2p.HandlerFunc {
if runtime.GOOS == "windows" {
// windows has a bit lower time resolution
// so, slow down the handler with a middleware
// not to get 0s for rtt value
time.Sleep(100 * time.Millisecond)
}
return f
}),
)
// setup the stream recorder to record stream data
recorder := streamtest.New(
streamtest.WithProtocols(server.Protocol()),
streamtest.WithMessageLatency(messageLatency),
)

// create a pingpong client that will do pinging
client := pingpong.New(recorder, logger, nil)
// create a pingpong client that will do pinging
client := pingpong.New(recorder, logger, nil)

// ping
addr := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
greetings := []string{"hey", "there", "fella"}
rtt, err := client.Ping(context.Background(), addr, greetings...)
if err != nil {
t.Fatal(err)
}
// ping
addr := swarm.MustParseHexAddress("ca1e9f3938cc1425c6061b96ad9eb93e134dfe8734ad490164ef20af9d1cf59c")
greetings := []string{"hey", "there", "fella"}
rtt, err := client.Ping(context.Background(), addr, greetings...)
if err != nil {
t.Fatal(err)
}

// check that RTT is a sane value
if rtt <= 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test must evaluate that the rtt is not 0 as well, as zero value can also signal that the protocol does not work.

It could be now made that the protocol waits when responding to messages with the middleware for any os (to remove the goos check), that and to check if the rtt value is measured correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to check the rtt is not zero as well, but the tests are failing. I think as I already mentioned since time is virtual in synctest so the the rtt can be zero.
https://github.com/ethersphere/bee/actions/runs/19616730484/job/56172634154

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, we have to make the time to advance in order to measure rtt in the bubble, by blocking something. I thought thought that it is good to use the middleware that you removed that sleeps, but for any os. Test is working with that approach, but it actually does not make the test correct as it measures the middleware, and we want to measure messages reading and writing latencies. So, I have added the option to recorder to introduce the message latency so that we can measure rtt, now even exactly, which is much better than before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, good.
Here it might be better to compare rtt as 2 * len(greetings) * messageLatency ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Updated.

t.Errorf("invalid RTT value %v", rtt)
}
// check that RTT is the sum of all message latencies
if rtt != 6*messageLatency {
t.Errorf("invalid RTT value %v", rtt)
}

// get a record for this stream
records, err := recorder.Records(addr, "pingpong", "1.0.0", "pingpong")
if err != nil {
t.Fatal(err)
}
if l := len(records); l != 1 {
t.Fatalf("got %v records, want %v", l, 1)
}
record := records[0]
// get a record for this stream
records, err := recorder.Records(addr, "pingpong", "1.0.0", "pingpong")
if err != nil {
t.Fatal(err)
}
if l := len(records); l != 1 {
t.Fatalf("got %v records, want %v", l, 1)
}
record := records[0]

// validate received ping greetings from the client
wantGreetings := greetings
messages, err := protobuf.ReadMessages(
bytes.NewReader(record.In()),
func() protobuf.Message { return new(pb.Ping) },
)
if err != nil {
t.Fatal(err)
}
gotGreetings := make([]string, 0, len(messages))
for _, m := range messages {
gotGreetings = append(gotGreetings, m.(*pb.Ping).Greeting)
}
if fmt.Sprint(gotGreetings) != fmt.Sprint(wantGreetings) {
t.Errorf("got greetings %v, want %v", gotGreetings, wantGreetings)
}
// validate received ping greetings from the client
wantGreetings := greetings
messages, err := protobuf.ReadMessages(
bytes.NewReader(record.In()),
func() protobuf.Message { return new(pb.Ping) },
)
if err != nil {
t.Fatal(err)
}
gotGreetings := make([]string, 0, len(messages))
for _, m := range messages {
gotGreetings = append(gotGreetings, m.(*pb.Ping).Greeting)
}
if fmt.Sprint(gotGreetings) != fmt.Sprint(wantGreetings) {
t.Errorf("got greetings %v, want %v", gotGreetings, wantGreetings)
}

// validate sent pong responses by handler
wantResponses := make([]string, 0, len(greetings))
for _, g := range greetings {
wantResponses = append(wantResponses, "{"+g+"}")
}
messages, err = protobuf.ReadMessages(
bytes.NewReader(record.Out()),
func() protobuf.Message { return new(pb.Pong) },
)
if err != nil {
t.Fatal(err)
}
gotResponses := make([]string, 0, len(messages))
for _, m := range messages {
gotResponses = append(gotResponses, m.(*pb.Pong).Response)
}
if fmt.Sprint(gotResponses) != fmt.Sprint(wantResponses) {
t.Errorf("got responses %v, want %v", gotResponses, wantResponses)
}
// validate sent pong responses by handler
wantResponses := make([]string, 0, len(greetings))
for _, g := range greetings {
wantResponses = append(wantResponses, "{"+g+"}")
}
messages, err = protobuf.ReadMessages(
bytes.NewReader(record.Out()),
func() protobuf.Message { return new(pb.Pong) },
)
if err != nil {
t.Fatal(err)
}
gotResponses := make([]string, 0, len(messages))
for _, m := range messages {
gotResponses = append(gotResponses, m.(*pb.Pong).Response)
}
if fmt.Sprint(gotResponses) != fmt.Sprint(wantResponses) {
t.Errorf("got responses %v, want %v", gotResponses, wantResponses)
}

if err := record.Err(); err != nil {
t.Fatal(err)
}
if err := record.Err(); err != nil {
t.Fatal(err)
}
})
}
Loading