diff --git a/lib/msgpack_dart.dart b/lib/msgpack_dart.dart index b316734..e0f43ce 100644 --- a/lib/msgpack_dart.dart +++ b/lib/msgpack_dart.dart @@ -1,12 +1,17 @@ library msgpack_dart; +import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; +import 'package:async/async.dart'; + part 'src/common.dart'; part 'src/data_writer.dart'; part 'src/deserializer.dart'; part 'src/serializer.dart'; +part 'src/stream_serializer.dart'; +part 'src/stream_deserializer.dart'; Uint8List serialize( dynamic value, { diff --git a/lib/src/common.dart b/lib/src/common.dart index 36eac90..a25321d 100644 --- a/lib/src/common.dart +++ b/lib/src/common.dart @@ -8,3 +8,14 @@ class FormatError implements Exception { return "FormatError: $message"; } } + +/// The upstream stream closed unexpectedly while in the midst of decoding a message. +/// Thrown from [StreamDeserializer] +class UpstreamClosedError implements Exception { + const UpstreamClosedError(); + final String message = 'Upstream closed unexpectedly'; + + String toString() { + return "UpstreamClosedError: $message"; + } +} diff --git a/lib/src/stream_deserializer.dart b/lib/src/stream_deserializer.dart new file mode 100644 index 0000000..90b2531 --- /dev/null +++ b/lib/src/stream_deserializer.dart @@ -0,0 +1,250 @@ +part of msgpack_dart; + +/// A [StreamTransformer] that deserializes [Uint8List]s into Dart objects, +/// using the MessagePack format. +class StreamDeserializer extends StreamTransformerBase, dynamic> { + final ExtDecoder? _extDecoder; + + /// If false, decoded binary data buffers will reference underlying input + /// buffer and thus may change when the content of input buffer changes. + /// + /// If true, decoded buffers are copies and the underlying input buffer is + /// free to change after decoding. + final bool copyBinaryData; + final Encoding codec; + + StreamDeserializer({ + ExtDecoder? extDecoder, + this.copyBinaryData = false, + this.codec = const Utf8Codec(), + }) : _extDecoder = extDecoder; + + @override + Stream bind(Stream> stream) async* { + final bytesChunkReader = ChunkedStreamReader(stream); + + var uByte = await bytesChunkReader.readBytes(1); + while (uByte.isNotEmpty) { + final u = uByte[0]; + yield await _decode(u, bytesChunkReader); + uByte = await bytesChunkReader.readBytes(1); + } + } + + Future _decode( + int u, + ChunkedStreamReader bytesChunkReader, + ) async { + if (u <= 127) { + return u; + } else if ((u & 0xE0) == 0xE0) { + // negative small integer + return u - 256; + } else if ((u & 0xE0) == 0xA0) { + return await _readString(bytesChunkReader, u & 0x1F); + } else if ((u & 0xF0) == 0x90) { + return await _readArray(bytesChunkReader, u & 0xF); + } else if ((u & 0xF0) == 0x80) { + return await _readMap(bytesChunkReader, u & 0xF); + } + switch (u) { + case 0xc0: + return null; + case 0xc2: + return false; + case 0xc3: + return true; + case 0xcc: + return await _readUInt8(bytesChunkReader); + case 0xcd: + return await _readUInt16(bytesChunkReader); + case 0xce: + return await _readUInt32(bytesChunkReader); + case 0xcf: + return await _readUInt64(bytesChunkReader); + case 0xd0: + return await _readInt8(bytesChunkReader); + case 0xd1: + return await _readInt16(bytesChunkReader); + case 0xd2: + return await _readInt32(bytesChunkReader); + case 0xd3: + return await _readInt64(bytesChunkReader); + case 0xca: + return await _readFloat(bytesChunkReader); + case 0xcb: + return await _readDouble(bytesChunkReader); + case 0xd9: + return await _readString( + bytesChunkReader, await _readUInt8(bytesChunkReader)); + case 0xda: + return await _readString( + bytesChunkReader, await _readUInt16(bytesChunkReader)); + case 0xdb: + return await _readString( + bytesChunkReader, await _readUInt32(bytesChunkReader)); + case 0xc4: + return await _readBuffer( + bytesChunkReader, await _readUInt8(bytesChunkReader)); + case 0xc5: + return await _readBuffer( + bytesChunkReader, await _readUInt16(bytesChunkReader)); + case 0xc6: + return await _readBuffer( + bytesChunkReader, await _readUInt32(bytesChunkReader)); + case 0xdc: + return await _readArray( + bytesChunkReader, await _readUInt16(bytesChunkReader)); + case 0xdd: + return await _readArray( + bytesChunkReader, await _readUInt32(bytesChunkReader)); + case 0xde: + return await _readMap( + bytesChunkReader, await _readUInt16(bytesChunkReader)); + case 0xdf: + return await _readMap( + bytesChunkReader, await _readUInt32(bytesChunkReader)); + case 0xd4: + return await _readExt(bytesChunkReader, 1); + case 0xd5: + return await _readExt(bytesChunkReader, 2); + case 0xd6: + return await _readExt(bytesChunkReader, 4); + case 0xd7: + return await _readExt(bytesChunkReader, 8); + case 0xd8: + return await _readExt(bytesChunkReader, 16); + case 0xc7: + return await _readExt( + bytesChunkReader, await _readUInt8(bytesChunkReader)); + case 0xc8: + return await _readExt( + bytesChunkReader, await _readUInt16(bytesChunkReader)); + case 0xc9: + return await _readExt( + bytesChunkReader, await _readUInt32(bytesChunkReader)); + default: + throw FormatError("Invalid MessagePack format"); + } + } + + Future _readInt8(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 1); + return ByteData.sublistView(bytes).getInt8(0); + } + + Future _readUInt8(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 1); + return ByteData.sublistView(bytes).getUint8(0); + } + + Future _readUInt16(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 2); + return ByteData.sublistView(bytes).getUint16(0); + } + + Future _readInt16(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 2); + return ByteData.sublistView(bytes).getInt16(0); + } + + Future _readUInt32(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 4); + return ByteData.sublistView(bytes).getUint32(0); + } + + Future _readInt32(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 4); + return ByteData.sublistView(bytes).getInt32(0); + } + + Future _readUInt64(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 8); + return ByteData.sublistView(bytes).getUint64(0); + } + + Future _readInt64(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 8); + return ByteData.sublistView(bytes).getInt64(0); + } + + Future _readFloat(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 4); + return ByteData.sublistView(bytes).getFloat32(0); + } + + Future _readDouble(ChunkedStreamReader bytesChunkReader) async { + final bytes = await _expectBytes(bytesChunkReader, 8); + return ByteData.sublistView(bytes).getFloat64(0); + } + + Future _readBuffer( + ChunkedStreamReader bytesChunkReader, + int length, + ) async { + final bytes = await _expectBytes(bytesChunkReader, length); + return copyBinaryData ? Uint8List.fromList(bytes) : bytes; + } + + Future _readString( + ChunkedStreamReader bytesChunkReader, + int length, + ) async { + final list = await _readBuffer(bytesChunkReader, length); + final len = list.length; + for (int i = 0; i < len; ++i) { + if (list[i] > 127) { + return codec.decode(list); + } + } + return String.fromCharCodes(list); + } + + Future _readArray( + ChunkedStreamReader bytesChunkReader, + int length, + ) async { + final res = List.filled(length, null, growable: false); + for (int i = 0; i < length; ++i) { + final uByte = await _expectBytes(bytesChunkReader, 1); + res[i] = await _decode(uByte[0], bytesChunkReader); + } + return res; + } + + Future _readMap( + ChunkedStreamReader bytesChunkReader, + int length, + ) async { + final res = Map(); + while (length > 0) { + final uByteKey = await _expectBytes(bytesChunkReader, 1); + final key = await _decode(uByteKey[0], bytesChunkReader); + final uByteValue = await _expectBytes(bytesChunkReader, 1); + final value = await _decode(uByteValue[0], bytesChunkReader); + res[key] = value; + --length; + } + return res; + } + + Future _readExt( + ChunkedStreamReader bytesChunkReader, + int length, + ) async { + final extType = await _readUInt8(bytesChunkReader); + final data = await _readBuffer(bytesChunkReader, length); + return _extDecoder?.decodeObject(extType, data); + } + + Future _expectBytes( + ChunkedStreamReader bytesChunkReader, + int length, + ) async { + final bytes = await bytesChunkReader.readBytes(length); + if (bytes.length != length) { + throw const UpstreamClosedError(); + } + return bytes; + } +} diff --git a/lib/src/stream_serializer.dart b/lib/src/stream_serializer.dart new file mode 100644 index 0000000..e456f7b --- /dev/null +++ b/lib/src/stream_serializer.dart @@ -0,0 +1,32 @@ +part of msgpack_dart; + +/// A [StreamTransformer] that serializes objects in the stream into [Uint8List]s +/// using MessagePack specification. +/// +/// Internally, it uses the [Serializer] class to handles the serialization. +/// This class is useful as an abstraction layer for the [Serializer] class for streams. +/// Alternatively, you can use the [Serializer] class directly. +class StreamSerializer extends StreamTransformerBase { + final Serializer serializer; + + StreamSerializer.withSerializer(this.serializer); + + factory StreamSerializer({ + DataWriter? dataWriter, + ExtEncoder? extEncoder, + }) => + StreamSerializer.withSerializer( + Serializer( + dataWriter: dataWriter, + extEncoder: extEncoder, + ), + ); + + @override + Stream bind(Stream stream) async* { + await for (final value in stream) { + serializer.encode(value); + yield serializer.takeBytes(); + } + } +} diff --git a/pubspec.yaml b/pubspec.yaml index 2dc2642..ac6db2e 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -8,3 +8,5 @@ environment: dev_dependencies: test: ^1.3.0 +dependencies: + async: ^2.10.0 diff --git a/test/msgpack_dart_stream_test.dart b/test/msgpack_dart_stream_test.dart new file mode 100644 index 0000000..2a5ed0d --- /dev/null +++ b/test/msgpack_dart_stream_test.dart @@ -0,0 +1,1256 @@ +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import "package:msgpack_dart/msgpack_dart.dart"; +import 'package:test/test.dart'; + +// +// Tests taken from msgpack2 (https://github.com/butlermatt/msgpack2) +// + +var isString = predicate((e) => e is String, 'is a String'); +var isInt = predicate((e) => e is int, 'is an int'); +var isMap = predicate((e) => e is Map, 'is a Map'); +var isList = predicate((e) => e is List, 'is a List'); + +void main() { + test("Test Stream-pack null", packNull); + + group("Test Stream-pack Boolean", () { + test("Stream-pack boolean false", packFalse); + test("Stream-pack boolean true", packTrue); + }); + + group("Test Stream-pack Ints", () { + test("Stream-pack Positive FixInt", packPositiveFixInt); + test("Stream-pack Negative FixInt", packFixedNegative); + test("Stream-pack Uint8", packUint8); + test("Stream-pack Uint16", packUint16); + test("Stream-pack Uint32", packUint32); + test("Stream-pack Uint64", packUint64); + test("Stream-pack Int8", packInt8); + test("Stream-pack Int16", packInt16); + test("Stream-pack Int32", packInt32); + test("Stream-pack Int64", packInt64); + }); + + group("Test Stream-pack Floats", () { + test("Stream-pack Float32", packFloat32); + test("Stream-pack Float64 (double)", packDouble); + }); + + test("Stream-pack 5-character string", packString5); + test("Stream-pack 22-character string", packString22); + test("Stream-pack 256-character string", packString256); + test("Stream-pack string array", packStringArray); + test("Stream-pack int-to-string map", packIntToStringMap); + + group("Test Stream-pack Binary", () { + test("Stream-pack Bin8", packBin8); + test("Stream-pack Bin16", packBin16); + test("Stream-pack Bin32", packBin32); + test("Stream-pack ByteData", packByteData); + }); + + group("Test Stream-pack multiple objects", () { + test("Stream-pack multiple nulls", packMultipleNulls); + test("Stream-pack multiple booleans", packMultipleBooleans); + test("Stream-pack multiple integers", packMultipleIntegers); + test("Stream-pack multiple floats", packMultipleFloats); + test("Stream-pack multiple strings", packMultipleStrings); + test("Stream-pack multiple binaries", packMultipleBinaries); + test("Stream-pack multiple arrays", packMultipleArrays); + test("Stream-pack multiple maps", packMultipleMaps); + test("Stream-pack multiple dynamics", packMultipleDynamics); + }); + + group("Test Stream-unpack stream behavior", () { + test("Stream-unpack should stop on end of stream", unpackStreamEnd); + test("Stream-unpack should throw on abrupt end of stream", + unpackStreamAbruptEndThrows); + }); + + test("Test Stream-unpack Null", unpackNull); + + group("Test Stream-unpack boolean", () { + test("Stream-unpack boolean false", unpackFalse); + test("Stream-unpack boolean true", unpackTrue); + }); + + group("Test Stream-unpack Ints", () { + test("Stream-unpack Positive FixInt", unpackPositiveFixInt); + test("Stream-unpack Negative FixInt", unpackNegativeFixInt); + test("Stream-unpack Uint8", unpackUint8); + test("Stream-unpack Uint16", unpackUint16); + test("Stream-unpack Uint32", unpackUint32); + test("Stream-unpack Uint64", unpackUint64); + test("Stream-unpack Int8", unpackInt8); + test("Stream-unpack Int16", unpackInt16); + test("Stream-unpack Int32", unpackInt32); + test("Stream-unpack Int64", unpackInt64); + }); + + group("Test Stream-unpack Floats", () { + test("Stream-unpack Float32", unpackFloat32); + test("Stream-unpack Float64 (double)", unpackDouble); + }); + + test("Stream-unpack 5-character string", unpackString5); + test("Stream-unpack 22-character string", unpackString22); + test("Stream-unpack 256-character string", unpackString256); + test("Stream-unpack string array", unpackStringArray); + test("Stream-unpack int-to-string map", unpackIntToStringMap); + + group("Test Stream-unpack Large Array and Map", () { + test("Stream-unpack Large Array", largeArray); + test("Stream-unpack Very Large Array", veryLargeArray); + test("Stream-unpack Large Map", largeMap); + test("Stream-unpack Very Large Map", veryLargeMap); + }); + + group("Test Stream-unpack multiple objects", () { + test("Stream-unpack multiple nulls", unpackMultipleNulls); + test("Stream-unpack multiple booleans", unpackMultipleBooleans); + test("Stream-unpack multiple integers", unpackMultipleIntegers); + test("Stream-unpack multiple floats", unpackMultipleFloats); + test("Stream-unpack multiple strings", unpackMultipleStrings); + test("Stream-unpack multiple binaries", unpackMultipleBinaries); + test("Stream-unpack multiple arrays", unpackMultipleArrays); + test("Stream-unpack multiple maps", unpackMultipleMaps); + test("Stream-unpack multiple dynamics", unpackMultipleDynamics); + }); + + group("Test Stream-unpack from chunks", () { + test("Stream-unpack multiple dynamics from chunks", + unpackMultipleDynamicsFromChunks); + }); +} + +Future streamPack(Iterable objects) async { + final stream = Stream.fromIterable(objects).transform(StreamSerializer()); + final bytes = await collectBytes(stream); + return bytes; +} + +Future largeArray() async { + final list = []; + for (int i = 0; i < 16; ++i) { + list.add("Item $i"); + } + + final serialized = await streamPack([list]); + List deserialized = deserialize(Uint8List.fromList(serialized)); + expect(deserialized, list); +} + +Future veryLargeArray() async { + final list = []; + for (int i = 0; i < 65536; ++i) { + list.add("Item $i"); + } + + final serialized = await streamPack([list]); + List deserialized = deserialize(serialized); + expect(deserialized, list); +} + +Future largeMap() async { + final map = Map(); + for (int i = 0; i < 16; ++i) { + map[i] = "Item $i"; + } + final serialized = await streamPack([map]); + Map deserialized = deserialize(serialized); + expect(deserialized, map); +} + +Future veryLargeMap() async { + final map = Map(); + for (int i = 0; i < 65536; ++i) { + map[i] = "Item $i"; + } + final serialized = await streamPack([map]); + Map deserialized = deserialize(serialized); + expect(deserialized, map); +} + +Future packNull() async { + List encoded = await streamPack([null]); + expect(encoded, orderedEquals([0xc0])); +} + +Future packFalse() async { + List encoded = await streamPack([false]); + expect(encoded, orderedEquals([0xc2])); +} + +Future packTrue() async { + List encoded = await streamPack([true]); + expect(encoded, orderedEquals([0xc3])); +} + +Future packPositiveFixInt() async { + List encoded = await streamPack([1]); + expect(encoded, orderedEquals([1])); +} + +Future packFixedNegative() async { + List encoded = await streamPack([-16]); + expect(encoded, orderedEquals([240])); +} + +Future packUint8() async { + List encoded = await streamPack([128]); + expect(encoded, orderedEquals([204, 128])); +} + +Future packUint16() async { + List encoded = await streamPack([32768]); + expect(encoded, orderedEquals([205, 128, 0])); +} + +Future packUint32() async { + List encoded = await streamPack([2147483648]); + expect(encoded, orderedEquals([206, 128, 0, 0, 0])); +} + +Future packUint64() async { + List encoded = await streamPack([9223372036854775807]); + expect(encoded, orderedEquals([207, 127, 255, 255, 255, 255, 255, 255, 255])); +} + +Future packInt8() async { + List encoded = await streamPack([-128]); + expect(encoded, orderedEquals([208, 128])); +} + +Future packInt16() async { + List encoded = await streamPack([-32768]); + expect(encoded, orderedEquals([209, 128, 0])); +} + +Future packInt32() async { + List encoded = await streamPack([-2147483648]); + expect(encoded, orderedEquals([210, 128, 0, 0, 0])); +} + +Future packInt64() async { + List encoded = await streamPack([-9223372036854775808]); + expect(encoded, orderedEquals([211, 128, 0, 0, 0, 0, 0, 0, 0])); +} + +Future packFloat32() async { + List encoded = await streamPack([Float(3.14)]); + expect(encoded, orderedEquals([202, 64, 72, 245, 195])); +} + +Future packDouble() async { + List encoded = await streamPack([3.14]); + expect(encoded, + orderedEquals([0xcb, 0x40, 0x09, 0x1e, 0xb8, 0x51, 0xeb, 0x85, 0x1f])); +} + +Future packString5() async { + List encoded = await streamPack(["hello"]); + expect(encoded, orderedEquals([165, 104, 101, 108, 108, 111])); +} + +Future packString22() async { + List encoded = await streamPack(["hello there, everyone!"]); + expect( + encoded, + orderedEquals([ + 182, + 104, + 101, + 108, + 108, + 111, + 32, + 116, + 104, + 101, + 114, + 101, + 44, + 32, + 101, + 118, + 101, + 114, + 121, + 111, + 110, + 101, + 33 + ])); +} + +Future packString256() async { + List encoded = await streamPack([ + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + ]); + expect(encoded, hasLength(259)); + expect(encoded.sublist(0, 3), orderedEquals([218, 1, 0])); + expect(encoded.sublist(3, 259), everyElement(65)); +} + +Future packBin8() async { + var data = Uint8List.fromList(List.filled(32, 65)); + List encoded = await streamPack([data]); + expect(encoded.length, equals(34)); + expect(encoded.getRange(0, 2), orderedEquals([0xc4, 32])); + expect(encoded.getRange(2, encoded.length), orderedEquals(data)); +} + +Future packBin16() async { + var data = Uint8List.fromList(List.filled(256, 65)); + List encoded = await streamPack([data]); + expect(encoded.length, equals(256 + 3)); + expect(encoded.getRange(0, 3), orderedEquals([0xc5, 1, 0])); + expect(encoded.getRange(3, encoded.length), orderedEquals(data)); +} + +Future packBin32() async { + var data = Uint8List.fromList(List.filled(65536, 65)); + List encoded = await streamPack([data]); + expect(encoded.length, equals(65536 + 5)); + expect(encoded.getRange(0, 5), orderedEquals([0xc6, 0, 1, 0, 0])); + expect(encoded.getRange(5, encoded.length), orderedEquals(data)); +} + +Future packByteData() async { + var data = ByteData.view(Uint8List.fromList(List.filled(32, 65)).buffer); + List encoded = await streamPack([data]); + expect(encoded.length, equals(34)); + expect(encoded.getRange(0, 2), orderedEquals([0xc4, 32])); + expect(encoded.getRange(2, encoded.length), + orderedEquals(data.buffer.asUint8List())); +} + +Future packStringArray() async { + List encoded = await streamPack([ + ["one", "two", "three"] + ]); + expect( + encoded, + orderedEquals([ + 147, + 163, + 111, + 110, + 101, + 163, + 116, + 119, + 111, + 165, + 116, + 104, + 114, + 101, + 101 + ])); +} + +Future packIntToStringMap() async { + List encoded = await streamPack([ + {1: "one", 2: "two"} + ]); + expect(encoded, + orderedEquals([130, 1, 163, 111, 110, 101, 2, 163, 116, 119, 111])); +} + +Future packMultipleNulls() async { + List encoded = await streamPack([null, null, null]); + expect(encoded, orderedEquals([0xc0, 0xc0, 0xc0])); +} + +Future packMultipleBooleans() async { + List encoded = await streamPack([ + false, + true, + ]); + expect( + encoded, + orderedEquals([ + ...[0xc2], + ...[0xc3], + ]), + ); +} + +Future packMultipleIntegers() async { + List encoded = await streamPack([ + 128, + 32768, + 2147483648, + 9223372036854775807, + -128, + -32768, + -2147483648, + -9223372036854775808, + ]); + expect( + encoded, + orderedEquals([ + ...[204, 128], + ...[205, 128, 0], + ...[206, 128, 0, 0, 0], + ...[207, 127, 255, 255, 255, 255, 255, 255, 255], + ...[208, 128], + ...[209, 128, 0], + ...[210, 128, 0, 0, 0], + ...[211, 128, 0, 0, 0, 0, 0, 0, 0], + ]), + ); +} + +Future packMultipleFloats() async { + List encoded = await streamPack([ + Float(3.14), + 3.14, + ]); + expect( + encoded, + orderedEquals([ + ...[202, 64, 72, 245, 195], + ...[0xcb, 0x40, 0x09, 0x1e, 0xb8, 0x51, 0xeb, 0x85, 0x1f], + ]), + ); +} + +Future packMultipleStrings() async { + List encoded = await streamPack([ + "hello", + "hello there, everyone!", + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + ]); + const string1Serialized = [165, 104, 101, 108, 108, 111]; + const string2Serialized = [ + 182, + 104, + 101, + 108, + 108, + 111, + 32, + 116, + 104, + 101, + 114, + 101, + 44, + 32, + 101, + 118, + 101, + 114, + 121, + 111, + 110, + 101, + 33 + ]; + expect( + encoded, + hasLength(string1Serialized.length + string2Serialized.length + 259), + ); + expect( + encoded.getRange(0, string1Serialized.length), + orderedEquals(string1Serialized), + ); + expect( + encoded.getRange(string1Serialized.length, + string1Serialized.length + string2Serialized.length), + orderedEquals(string2Serialized), + ); + final string3Serialized = encoded.sublist( + string1Serialized.length + string2Serialized.length, + ); + expect(string3Serialized.getRange(0, 3), orderedEquals([218, 1, 0])); + expect(string3Serialized.getRange(3, 259), everyElement(65)); +} + +Future packMultipleBinaries() async { + final data1 = Uint8List.fromList(List.filled(32, 65)); + final data2 = Uint8List.fromList(List.filled(256, 65)); + final data3 = Uint8List.fromList(List.filled(65536, 65)); + final data4 = ByteData.view(Uint8List.fromList(List.filled(32, 65)).buffer); + List encoded = await streamPack([data1, data2, data3, data4]); + expect(encoded.length, equals(32 + 2 + 256 + 3 + 65536 + 5 + 32 + 2)); + expect(encoded.getRange(0, 2), orderedEquals([0xc4, 32])); + expect(encoded.getRange(2, 2 + data1.length), orderedEquals(data1)); + expect(encoded.getRange(32 + 2, 32 + 2 + 3), orderedEquals([0xc5, 1, 0])); + expect( + encoded.getRange(32 + 2 + 3, 32 + 2 + 3 + data2.length), + orderedEquals(data2), + ); + expect( + encoded.getRange(32 + 2 + 256 + 3, 32 + 2 + 256 + 3 + 5), + orderedEquals([0xc6, 0, 1, 0, 0]), + ); + expect( + encoded.getRange(32 + 2 + 256 + 3 + 5, 32 + 2 + 256 + 3 + 5 + data3.length), + orderedEquals(data3), + ); + expect( + encoded.getRange( + 32 + 2 + 256 + 3 + 5 + 65536, + 32 + 2 + 256 + 3 + 5 + 65536 + 2, + ), + orderedEquals([0xc4, 32]), + ); + expect( + encoded.getRange(32 + 2 + 256 + 3 + 65536 + 5 + 2, + 32 + 2 + 256 + 3 + 65536 + 5 + 2 + data4.lengthInBytes), + orderedEquals(data4.buffer.asUint8List()), + ); +} + +Future packMultipleArrays() async { + final data = List.generate(3, (_) => ["one", "two", "three"]); + List encoded = await streamPack(data); + final expectedSegmentBytes = [ + 147, + 163, + 111, + 110, + 101, + 163, + 116, + 119, + 111, + 165, + 116, + 104, + 114, + 101, + 101 + ]; + Iterable repeatedBytes = Iterable.empty(); + for (int i = 0; i < data.length; i++) { + repeatedBytes = repeatedBytes.followedBy(expectedSegmentBytes); + } + expect(encoded, orderedEquals(repeatedBytes)); +} + +Future packMultipleMaps() async { + final data = List.generate(3, (_) => {1: "one", 2: "two"}); + List encoded = await streamPack(data); + final expectedSegmentBytes = [ + 130, + 1, + 163, + 111, + 110, + 101, + 2, + 163, + 116, + 119, + 111 + ]; + Iterable repeatedBytes = Iterable.empty(); + for (int i = 0; i < data.length; i++) { + repeatedBytes = repeatedBytes.followedBy(expectedSegmentBytes); + } + expect(encoded, orderedEquals(repeatedBytes)); +} + +Future packMultipleDynamics() async { + List encoded = await streamPack([ + null, + true, + 9223372036854775807, + -9223372036854775808, + Float(3.14), + 3.14, + "hello there, everyone!", + Uint8List.fromList(List.filled(65536, 65)), + ByteData.view(Uint8List.fromList(List.filled(32, 65)).buffer), + ["one", "two", "three"], + {1: "one", 2: "two"}, + ]); + expect( + encoded, + orderedEquals([ + ...[0xc0], + ...[0xc3], + ...[207, 127, 255, 255, 255, 255, 255, 255, 255], + ...[211, 128, 0, 0, 0, 0, 0, 0, 0], + ...[202, 64, 72, 245, 195], + ...[0xcb, 0x40, 0x09, 0x1e, 0xb8, 0x51, 0xeb, 0x85, 0x1f], + ...[ + 182, + 104, + 101, + 108, + 108, + 111, + 32, + 116, + 104, + 101, + 114, + 101, + 44, + 32, + 101, + 118, + 101, + 114, + 121, + 111, + 110, + 101, + 33 + ], + ...[0xc6, 0, 1, 0, 0, ...List.filled(65536, 65)], + ...[0xc4, 32, ...List.filled(32, 65)], + ...[ + 147, + 163, + 111, + 110, + 101, + 163, + 116, + 119, + 111, + 165, + 116, + 104, + 114, + 101, + 101 + ], + ...[130, 1, 163, 111, 110, 101, 2, 163, 116, 119, 111], + ]), + ); +} + +// Test unpacking +Future> streamUnpack(Iterable dataChunks) async { + final stream = + Stream.fromIterable(dataChunks).transform(StreamDeserializer().cast()); + final result = await stream.toList(); + return result; +} + +Future unpackStreamEnd() async { + final toEmit = Uint8List.fromList([0xc3]); + final streamController = StreamController(); + final stream = streamController.stream.transform(StreamDeserializer().cast()); + streamController.add(toEmit); + final closeFuture = streamController.close(); + await expectLater( + stream, + emitsInOrder([ + emits(true), + emitsDone, + ]), + ); + await expectLater(closeFuture, completes); +} + +Future unpackStreamAbruptEndThrows() async { + final toEmit = Uint8List.fromList([ + 165, + 104, + 101, + 108, + // 108, + // 111, + ]); // String of hello + final streamController = StreamController(); + final stream = streamController.stream.transform(StreamDeserializer().cast()); + streamController.add(toEmit); + final closeFuture = streamController.close(); + await expectLater( + stream, + emitsInOrder([ + neverEmits(equals("hello")), + emitsError(isA()), + emitsDone, + ]), + ); + await expectLater(closeFuture, completes); +} + +Future unpackNull() async { + Uint8List data = new Uint8List.fromList([0xc0]); + var value = (await streamUnpack([data]))[0]; + expect(value, isNull); +} + +Future unpackFalse() async { + Uint8List data = Uint8List.fromList([0xc2]); + var value = (await streamUnpack([data]))[0]; + expect(value, isFalse); +} + +Future unpackTrue() async { + Uint8List data = Uint8List.fromList([0xc3]); + var value = (await streamUnpack([data]))[0]; + expect(value, isTrue); +} + +Future unpackString5() async { + Uint8List data = new Uint8List.fromList([165, 104, 101, 108, 108, 111]); + var value = (await streamUnpack([data]))[0]; + expect(value, isString); + expect(value, equals("hello")); +} + +Future unpackString22() async { + Uint8List data = new Uint8List.fromList([ + 182, + 104, + 101, + 108, + 108, + 111, + 32, + 116, + 104, + 101, + 114, + 101, + 44, + 32, + 101, + 118, + 101, + 114, + 121, + 111, + 110, + 101, + 33 + ]); + var value = (await streamUnpack([data]))[0]; + expect(value, isString); + expect(value, equals("hello there, everyone!")); +} + +Future unpackPositiveFixInt() async { + Uint8List data = Uint8List.fromList([1]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(1)); +} + +Future unpackNegativeFixInt() async { + Uint8List data = Uint8List.fromList([240]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(-16)); +} + +Future unpackUint8() async { + Uint8List data = Uint8List.fromList([204, 128]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(128)); +} + +Future unpackUint16() async { + Uint8List data = Uint8List.fromList([205, 128, 0]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(32768)); +} + +Future unpackUint32() async { + Uint8List data = Uint8List.fromList([206, 128, 0, 0, 0]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(2147483648)); +} + +Future unpackUint64() async { + // Dart 2 doesn't support true Uint64 without using BigInt + Uint8List data = + Uint8List.fromList([207, 127, 255, 255, 255, 255, 255, 255, 255]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(9223372036854775807)); +} + +Future unpackInt8() async { + Uint8List data = Uint8List.fromList([208, 128]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(-128)); +} + +Future unpackInt16() async { + Uint8List data = Uint8List.fromList([209, 128, 0]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(-32768)); + data = Uint8List.fromList([0xd1, 0x04, 0xd2]); + print(deserialize(data)); +} + +Future unpackInt32() async { + Uint8List data = Uint8List.fromList([210, 128, 0, 0, 0]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(-2147483648)); +} + +Future unpackInt64() async { + Uint8List data = Uint8List.fromList([211, 128, 0, 0, 0, 0, 0, 0, 0]); + var value = (await streamUnpack([data]))[0]; + expect(value, isInt); + expect(value, equals(-9223372036854775808)); +} + +Future unpackFloat32() async { + Uint8List data = Uint8List.fromList([202, 64, 72, 245, 195]); + var value = (await streamUnpack([data]))[0]; + expect((value as double).toStringAsPrecision(3), equals('3.14')); +} + +Future unpackDouble() async { + Uint8List data = Uint8List.fromList( + [0xcb, 0x40, 0x09, 0x1e, 0xb8, 0x51, 0xeb, 0x85, 0x1f]); + var value = (await streamUnpack([data]))[0]; + expect(value, equals(3.14)); +} + +Future unpackString256() async { + Uint8List data = + new Uint8List.fromList([218, 1, 0]..addAll(new List.filled(256, 65))); + var value = (await streamUnpack([data]))[0]; + expect(value, isString); + expect( + value, + equals( + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")); +} + +Future unpackStringArray() async { + Uint8List data = new Uint8List.fromList([ + 147, + 163, + 111, + 110, + 101, + 163, + 116, + 119, + 111, + 165, + 116, + 104, + 114, + 101, + 101 + ]); + var value = (await streamUnpack([data]))[0]; + expect(value, isList); + expect(value, orderedEquals(["one", "two", "three"])); +} + +Future unpackIntToStringMap() async { + Uint8List data = new Uint8List.fromList( + [130, 1, 163, 111, 110, 101, 2, 163, 116, 119, 111]); + var value = (await streamUnpack([data]))[0]; + expect(value, isMap); + expect(value[1], equals("one")); + expect(value[2], equals("two")); +} + +Future unpackSmallDateTime() async { + var data = [0xd7, 0xff, 0, 0, 0, 0, 0, 0, 0, 0]; + var value = (await streamUnpack([Uint8List.fromList(data)]))[0]; + expect(value, equals(DateTime.fromMillisecondsSinceEpoch(0))); + data = [0xd7, 0xff, 47, 175, 8, 0, 91, 124, 180, 16]; + value = (await streamUnpack([Uint8List.fromList(data)]))[0]; + expect((value as DateTime).toUtc(), + equals(DateTime.utc(2018, 8, 22, 0, 56, 56, 200))); +} + +Future unpackPastDate() async { + var data = [ + 0xc7, + 12, + 0xff, + 29, + 205, + 101, + 0, + 255, + 255, + 255, + 255, + 184, + 204, + 121, + 158 + ]; + + var value = (await streamUnpack([Uint8List.fromList(data)]))[0] as DateTime; + expect(value.toUtc(), equals(DateTime.utc(1932, 2, 24, 1, 53, 45, 500))); + + data = [ + 199, + 12, + 255, + 0, + 0, + 0, + 0, + 255, + 255, + 255, + 255, + 255, + 255, + 248, + 248 + ]; + value = (await streamUnpack([Uint8List.fromList(data)]))[0]; + expect(value.toUtc(), equals(DateTime.utc(1969, 12, 31, 23, 30))); +} + +Future unpackMultipleNulls() async { + Uint8List data = new Uint8List.fromList([0xc0, 0xc0, 0xc0]); + var value = await streamUnpack([data]); + expect(value, orderedEquals([null, null, null])); +} + +Future unpackMultipleBooleans() async { + Uint8List data = new Uint8List.fromList([0xc2, 0xc3]); + var value = await streamUnpack([data]); + expect(value, orderedEquals([false, true])); +} + +Future unpackMultipleIntegers() async { + Uint8List data = new Uint8List.fromList([ + ...[1], + ...[240], + ...[204, 128], + ...[205, 128, 0], + ...[206, 128, 0, 0, 0], + ...[207, 127, 255, 255, 255, 255, 255, 255, 255], + ...[208, 128], + ...[209, 128, 0], + ...[210, 128, 0, 0, 0], + ...[211, 128, 0, 0, 0, 0, 0, 0, 0], + ]); + var value = await streamUnpack([data]); + expect( + value, + orderedEquals([ + 1, + -16, + 128, + 32768, + 2147483648, + 9223372036854775807, + -128, + -32768, + -2147483648, + -9223372036854775808, + ]), + ); +} + +Future unpackMultipleFloats() async { + Uint8List data = new Uint8List.fromList([ + ...[202, 64, 72, 245, 195], + ...[0xcb, 0x40, 0x09, 0x1e, 0xb8, 0x51, 0xeb, 0x85, 0x1f], + ]); + var value = await streamUnpack([data]); + expect(value, hasLength(2)); + expect((value[0] as double).toStringAsPrecision(3), equals('3.14')); + expect(value[1], equals(3.14)); +} + +Future unpackMultipleStrings() async { + Uint8List data = new Uint8List.fromList([ + ...[165, 104, 101, 108, 108, 111], + ...[ + 182, + 104, + 101, + 108, + 108, + 111, + 32, + 116, + 104, + 101, + 114, + 101, + 44, + 32, + 101, + 118, + 101, + 114, + 121, + 111, + 110, + 101, + 33 + ], + ...Uint8List.fromList([218, 1, 0]..addAll(new List.filled(256, 65))) + ]); + var value = await streamUnpack([data]); + expect( + value, + orderedEquals([ + "hello", + "hello there, everyone!", + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + ]), + ); +} + +Future unpackMultipleBinaries() async { + Uint8List data = new Uint8List.fromList([ + ...[0xc4, 32, ...List.filled(32, 65)], + ...[0xc5, 1, 0, ...List.filled(256, 65)], + ...[0xc6, 0, 1, 0, 0, ...List.filled(65536, 65)], + ]); + var value = await streamUnpack([data]); + expect(value, hasLength(3)); + expect(value[0], orderedEquals(Uint8List.fromList(List.filled(32, 65)))); + expect(value[1], orderedEquals(Uint8List.fromList(List.filled(256, 65)))); + expect(value[2], orderedEquals(Uint8List.fromList(List.filled(65536, 65)))); +} + +Future unpackMultipleArrays() async { + final segmentBytes = [ + 147, + 163, + 111, + 110, + 101, + 163, + 116, + 119, + 111, + 165, + 116, + 104, + 114, + 101, + 101 + ]; + const segmentCount = 3; + Iterable data = Iterable.empty(); + for (var i = 0; i < segmentCount; i++) { + data = data.followedBy(segmentBytes); + } + var value = await streamUnpack([Uint8List.fromList(data.toList())]); + expect(value, hasLength(segmentCount)); + expect(value[0], orderedEquals(["one", "two", "three"])); + expect(value[1], orderedEquals(["one", "two", "three"])); + expect(value[2], orderedEquals(["one", "two", "three"])); +} + +Future unpackMultipleMaps() async { + final segmentBytes = [130, 1, 163, 111, 110, 101, 2, 163, 116, 119, 111]; + const segmentCount = 3; + Iterable data = Iterable.empty(); + for (var i = 0; i < segmentCount; i++) { + data = data.followedBy(segmentBytes); + } + var value = await streamUnpack([Uint8List.fromList(data.toList())]); + expect(value, hasLength(segmentCount)); + for (var i = 0; i < segmentCount; i++) { + expect(value[i], isMap); + expect(value[i], hasLength(2)); + expect(value[i][1], equals("one")); + expect(value[i][2], equals("two")); + } +} + +Future unpackMultipleDynamics() async { + final data = Uint8List.fromList([ + ...[0xc0], + ...[0xc2], + ...[0xc3], + ...[207, 127, 255, 255, 255, 255, 255, 255, 255], + ...[211, 128, 0, 0, 0, 0, 0, 0, 0], + ...[202, 64, 72, 245, 195], + ...[0xcb, 0x40, 0x09, 0x1e, 0xb8, 0x51, 0xeb, 0x85, 0x1f], + ...[165, 104, 101, 108, 108, 111], + ...[ + 182, + 104, + 101, + 108, + 108, + 111, + 32, + 116, + 104, + 101, + 114, + 101, + 44, + 32, + 101, + 118, + 101, + 114, + 121, + 111, + 110, + 101, + 33 + ], + ...[0xc4, 32, ...List.filled(32, 65)], + ...[ + 147, + 163, + 111, + 110, + 101, + 163, + 116, + 119, + 111, + 165, + 116, + 104, + 114, + 101, + 101 + ], + ...[130, 1, 163, 111, 110, 101, 2, 163, 116, 119, 111], + ]); + final value = await streamUnpack([data]); + final expectedValues = [ + null, + false, + true, + 9223372036854775807, + -9223372036854775808, + 3.14, // f32 + 3.14, // f64 + "hello", + "hello there, everyone!", + Uint8List.fromList(List.filled(32, 65)), + ["one", "two", "three"], + {1: "one", 2: "two"}, + ]; + expect(value, hasLength(expectedValues.length)); + expect(value[0], equals(expectedValues[0])); + expect(value[1], equals(expectedValues[1])); + expect(value[2], equals(expectedValues[2])); + expect(value[3], equals(expectedValues[3])); + expect(value[4], equals(expectedValues[4])); + expect( + (value[5] as double).toStringAsPrecision(3), + equals(expectedValues[5].toString()), + ); + expect(value[6], equals(expectedValues[6])); + expect(value[7], equals(expectedValues[7])); + expect(value[8], equals(expectedValues[8])); + expect(value[9], orderedEquals(expectedValues[9] as Uint8List)); + expect(value[10], orderedEquals(expectedValues[10] as List)); + expect(value[11], equals(expectedValues[11])); +} + +Future unpackMultipleDynamicsFromChunks() async { + final data = Uint8List.fromList([ + ...[0xc0], + ...[0xc2], + ...[0xc3], + ...[207, 127, 255, 255, 255, 255, 255, 255, 255], + ...[211, 128, 0, 0, 0, 0, 0, 0, 0], + ...[202, 64, 72, 245, 195], + ...[0xcb, 0x40, 0x09, 0x1e, 0xb8, 0x51, 0xeb, 0x85, 0x1f], + ...[165, 104, 101, 108, 108, 111], + ...[ + 182, + 104, + 101, + 108, + 108, + 111, + 32, + 116, + 104, + 101, + 114, + 101, + 44, + 32, + 101, + 118, + 101, + 114, + 121, + 111, + 110, + 101, + 33 + ], + ...[0xc4, 32, ...List.filled(32, 65)], + ...[ + 147, + 163, + 111, + 110, + 101, + 163, + 116, + 119, + 111, + 165, + 116, + 104, + 114, + 101, + 101 + ], + ...[130, 1, 163, 111, 110, 101, 2, 163, 116, 119, 111], + ]); + final chunkedData = []; + const chunkSize = 5; + for (var i = 0; i < data.length; i += chunkSize) { + final endAt = i + chunkSize; + chunkedData.add(data.sublist(i, endAt > data.length ? data.length : endAt)); + } + final value = await streamUnpack(chunkedData); + final expectedValues = [ + null, + false, + true, + 9223372036854775807, + -9223372036854775808, + 3.14, // f32 + 3.14, // f64 + "hello", + "hello there, everyone!", + Uint8List.fromList(List.filled(32, 65)), + ["one", "two", "three"], + {1: "one", 2: "two"}, + ]; + expect(value, hasLength(expectedValues.length)); + expect(value[0], equals(expectedValues[0])); + expect(value[1], equals(expectedValues[1])); + expect(value[2], equals(expectedValues[2])); + expect(value[3], equals(expectedValues[3])); + expect(value[4], equals(expectedValues[4])); + expect( + (value[5] as double).toStringAsPrecision(3), + equals(expectedValues[5].toString()), + ); + expect(value[6], equals(expectedValues[6])); + expect(value[7], equals(expectedValues[7])); + expect(value[8], equals(expectedValues[8])); + expect(value[9], orderedEquals(expectedValues[9] as Uint8List)); + expect(value[10], orderedEquals(expectedValues[10] as List)); + expect(value[11], equals(expectedValues[11])); +}