|
24 | 24 | MuxedConnUnavailable, |
25 | 25 | ) |
26 | 26 | from libp2p.stream_muxer.yamux.yamux import ( |
| 27 | + FLAG_ACK, |
27 | 28 | FLAG_FIN, |
28 | 29 | FLAG_RST, |
29 | 30 | FLAG_SYN, |
30 | 31 | GO_AWAY_PROTOCOL_ERROR, |
| 32 | + TYPE_DATA, |
31 | 33 | TYPE_PING, |
32 | 34 | TYPE_WINDOW_UPDATE, |
33 | 35 | YAMUX_HEADER_FORMAT, |
@@ -624,3 +626,150 @@ async def accept_should_unblock(): |
624 | 626 | "accept_stream() should have raised MuxedConnUnavailable" |
625 | 627 | ) |
626 | 628 | logging.debug("test_yamux_accept_stream_unblocks_on_error complete") |
| 629 | + |
| 630 | + |
| 631 | +@pytest.mark.trio |
| 632 | +async def test_yamux_syn_with_data(yamux_pair): |
| 633 | + """Test that data sent with SYN frame is properly received and buffered.""" |
| 634 | + logging.debug("Starting test_yamux_syn_with_data") |
| 635 | + client_yamux, server_yamux = yamux_pair |
| 636 | + |
| 637 | + # Manually construct a SYN frame with accompanying data |
| 638 | + test_data = b"data with SYN frame" |
| 639 | + stream_id = 1 # Client stream ID (odd number) |
| 640 | + |
| 641 | + # Create SYN header with data length |
| 642 | + syn_header = struct.pack( |
| 643 | + YAMUX_HEADER_FORMAT, |
| 644 | + 0, # version |
| 645 | + TYPE_DATA, # type |
| 646 | + FLAG_SYN, # flags |
| 647 | + stream_id, |
| 648 | + len(test_data), # length of accompanying data |
| 649 | + ) |
| 650 | + |
| 651 | + # Send SYN with data directly |
| 652 | + await client_yamux.secured_conn.write(syn_header + test_data) |
| 653 | + logging.debug(f"Sent SYN with {len(test_data)} bytes of data") |
| 654 | + |
| 655 | + # Server should accept the stream and have data already buffered |
| 656 | + server_stream = await server_yamux.accept_stream() |
| 657 | + assert server_stream.stream_id == stream_id |
| 658 | + |
| 659 | + # Verify the data was buffered and is immediately available |
| 660 | + received = await server_stream.read(len(test_data)) |
| 661 | + assert received == test_data, "Data sent with SYN should be immediately available" |
| 662 | + logging.debug("test_yamux_syn_with_data complete") |
| 663 | + |
| 664 | + |
| 665 | +@pytest.mark.trio |
| 666 | +async def test_yamux_ack_with_data(yamux_pair): |
| 667 | + """Test that data sent with ACK frame is properly received and buffered.""" |
| 668 | + logging.debug("Starting test_yamux_ack_with_data") |
| 669 | + client_yamux, server_yamux = yamux_pair |
| 670 | + |
| 671 | + # Client opens a stream (sends SYN) |
| 672 | + client_stream = await client_yamux.open_stream() |
| 673 | + stream_id = client_stream.stream_id |
| 674 | + |
| 675 | + # Wait for server to receive SYN and respond with ACK |
| 676 | + await trio.sleep(0.1) |
| 677 | + |
| 678 | + # Now manually send data with an ACK flag from server to client |
| 679 | + test_data = b"data with ACK frame" |
| 680 | + ack_header = struct.pack( |
| 681 | + YAMUX_HEADER_FORMAT, |
| 682 | + 0, # version |
| 683 | + TYPE_DATA, # type |
| 684 | + FLAG_ACK, # flags (ACK flag set) |
| 685 | + stream_id, |
| 686 | + len(test_data), # length of accompanying data |
| 687 | + ) |
| 688 | + |
| 689 | + # Send ACK with data from server to client |
| 690 | + await server_yamux.secured_conn.write(ack_header + test_data) |
| 691 | + logging.debug(f"Sent ACK with {len(test_data)} bytes of data") |
| 692 | + |
| 693 | + # Wait for the data to be processed |
| 694 | + await trio.sleep(0.1) |
| 695 | + |
| 696 | + # Verify the data was buffered on the client side |
| 697 | + # Since the stream is already open, the data should be in the buffer |
| 698 | + async with client_yamux.streams_lock: |
| 699 | + assert stream_id in client_yamux.stream_buffers |
| 700 | + assert len(client_yamux.stream_buffers[stream_id]) >= len(test_data) |
| 701 | + buffered_data = bytes(client_yamux.stream_buffers[stream_id][: len(test_data)]) |
| 702 | + # Remove the data we just checked |
| 703 | + client_yamux.stream_buffers[stream_id] = client_yamux.stream_buffers[stream_id][ |
| 704 | + len(test_data) : |
| 705 | + ] |
| 706 | + |
| 707 | + assert buffered_data == test_data, "Data sent with ACK should be buffered" |
| 708 | + logging.debug("test_yamux_ack_with_data complete") |
| 709 | + |
| 710 | + |
| 711 | +@pytest.mark.trio |
| 712 | +async def test_yamux_syn_with_empty_data(yamux_pair): |
| 713 | + """Test that SYN frame with zero-length data is handled correctly.""" |
| 714 | + logging.debug("Starting test_yamux_syn_with_empty_data") |
| 715 | + client_yamux, server_yamux = yamux_pair |
| 716 | + |
| 717 | + # Manually construct a SYN frame with no data (length = 0) |
| 718 | + stream_id = 3 # Client stream ID (odd number) |
| 719 | + |
| 720 | + syn_header = struct.pack( |
| 721 | + YAMUX_HEADER_FORMAT, |
| 722 | + 0, # version |
| 723 | + TYPE_DATA, # type |
| 724 | + FLAG_SYN, # flags |
| 725 | + stream_id, |
| 726 | + 0, # length = 0, no accompanying data |
| 727 | + ) |
| 728 | + |
| 729 | + # Send SYN with no data |
| 730 | + await client_yamux.secured_conn.write(syn_header) |
| 731 | + logging.debug("Sent SYN with no data") |
| 732 | + |
| 733 | + # Server should accept the stream |
| 734 | + server_stream = await server_yamux.accept_stream() |
| 735 | + assert server_stream.stream_id == stream_id |
| 736 | + |
| 737 | + # Verify no data is in the buffer |
| 738 | + async with server_yamux.streams_lock: |
| 739 | + assert len(server_yamux.stream_buffers[stream_id]) == 0 |
| 740 | + |
| 741 | + logging.debug("test_yamux_syn_with_empty_data complete") |
| 742 | + |
| 743 | + |
| 744 | +@pytest.mark.trio |
| 745 | +async def test_yamux_syn_with_large_data(yamux_pair): |
| 746 | + """Test that large data sent with SYN frame is properly handled.""" |
| 747 | + logging.debug("Starting test_yamux_syn_with_large_data") |
| 748 | + client_yamux, server_yamux = yamux_pair |
| 749 | + |
| 750 | + # Create large test data (but within window size) |
| 751 | + test_data = b"X" * 1024 # 1KB of data |
| 752 | + stream_id = 5 # Client stream ID (odd number) |
| 753 | + |
| 754 | + syn_header = struct.pack( |
| 755 | + YAMUX_HEADER_FORMAT, |
| 756 | + 0, # version |
| 757 | + TYPE_DATA, # type |
| 758 | + FLAG_SYN, # flags |
| 759 | + stream_id, |
| 760 | + len(test_data), |
| 761 | + ) |
| 762 | + |
| 763 | + # Send SYN with large data |
| 764 | + await client_yamux.secured_conn.write(syn_header + test_data) |
| 765 | + logging.debug(f"Sent SYN with {len(test_data)} bytes of large data") |
| 766 | + |
| 767 | + # Server should accept the stream and have all data buffered |
| 768 | + server_stream = await server_yamux.accept_stream() |
| 769 | + assert server_stream.stream_id == stream_id |
| 770 | + |
| 771 | + # Verify all data was buffered correctly |
| 772 | + received = await server_stream.read(len(test_data)) |
| 773 | + assert received == test_data |
| 774 | + assert len(received) == 1024 |
| 775 | + logging.debug("test_yamux_syn_with_large_data complete") |
0 commit comments