diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 9c879918..799d2043 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -63,6 +63,19 @@ pub struct Writer<'a, W: Write> {
     user_metadata: HashMap<String, Value>,
 }
 
+/// A buffer of Avro-serialized (and already codec-compressed) values, ready to be appended
+/// to a [`Writer`].
+///
+/// Produced by [`Writer::serialize_ser`] and consumed by
+/// [`Writer::extend_avro_serialized_buffer`].
+#[derive(Debug)]
+pub struct AvroSerializedBuffer {
+    /// Serialized values, after the writer's codec has been applied.
+    buffer: Vec<u8>,
+    /// Number of values encoded in `buffer`.
+    num_values: usize,
+}
+
 impl<'a, W: Write> Writer<'a, W> {
     /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
     /// to.
@@ -304,6 +317,74 @@ impl<'a, W: Write> Writer<'a, W> {
         Ok(num_bytes)
     }
 
+    /// Writes a previously serialized bundle of values directly to the writer
+    /// (see [`serialize_ser`](Self::serialize_ser)).
+    ///
+    /// This will not flush any intermediate buffers - only the provided buffer is written
+    /// directly to the underlying writer, which is then flushed.
+    ///
+    /// Returns the sum of the byte counts reported by the individual writes: header (if
+    /// any), value count, payload length, payload and marker.
+    ///
+    /// # Errors
+    ///
+    /// Fails if any of those writes fails, or if the underlying writer cannot be flushed.
+    /// Unlike a plain `write`, the payload is written in full or an error is returned.
+    pub fn extend_avro_serialized_buffer(
+        &mut self,
+        avro_serialized_buffer: AvroSerializedBuffer,
+    ) -> AvroResult<usize> {
+        let AvroSerializedBuffer { buffer, num_values } = avro_serialized_buffer;
+        let stream_len = buffer.len();
+
+        let mut num_bytes = self.maybe_write_header()?;
+        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?;
+        num_bytes += self.append_raw(&stream_len.into(), &Schema::Long)?;
+
+        // `Write::write` may accept only part of the slice. The payload length was already
+        // emitted above, so a short write would corrupt the file: use `write_all` instead.
+        self.writer.write_all(&buffer).map_err(Details::WriteBytes)?;
+        num_bytes += stream_len;
+
+        num_bytes += self.append_marker()?;
+        self.writer.flush().map_err(Details::FlushWriter)?;
+
+        Ok(num_bytes)
+    }
+
+    /// Serializes an iterator of `serde::Serialize` values into an [`AvroSerializedBuffer`].
+    ///
+    /// This call does not need a `mut` self, so buffers can be prepared without exclusive
+    /// access to the writer (for example from multiple threads, if the writer is shared
+    /// between them) before being handed to
+    /// [`extend_avro_serialized_buffer`](Self::extend_avro_serialized_buffer).
+    pub fn serialize_ser<I, T: Serialize>(&self, values: I) -> AvroResult<AvroSerializedBuffer>
+    where
+        I: IntoIterator<Item = T>,
+    {
+        // Use the writer's stored resolved schema when present; otherwise resolve one from
+        // the writer schema just for this call.
+        let rs = match self.resolved_schema {
+            Some(ref rs) => rs,
+            None => &ResolvedSchema::try_from(self.schema)?,
+        };
+
+        let mut buffer = Vec::new();
+        let mut num_values = 0;
+        let mut serializer =
+            SchemaAwareWriteSerializer::new(&mut buffer, self.schema, rs.get_names(), None);
+
+        for value in values {
+            value.serialize(&mut serializer)?;
+            num_values += 1;
+        }
+
+        // Apply the codec now: `extend_avro_serialized_buffer` writes these bytes as-is.
+        self.codec.compress(&mut buffer)?;
+
+        Ok(AvroSerializedBuffer { buffer, num_values })
+    }
+
     /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
     /// validation on each value appended.
     ///