Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions avro/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ pub struct Writer<'a, W: Write> {
user_metadata: HashMap<String, Value>,
}

/// A buffer containing Avro serialized data ready to be written to a Writer
/// See [Writer::serialize_ser] and [Writer::extend_avro_serialized_buffer]
pub struct AvroSerializedBuffer {
buffer: Vec<u8>,
num_values: usize,
}
Comment on lines +68 to +71
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new public type AvroSerializedBuffer should be exported in avro/src/lib.rs in the pub use writer:: block (around line 911) to make it accessible to library consumers. Currently users cannot import this type even though it's returned by the public serialize_ser method.

Copilot uses AI. Check for mistakes.

impl<'a, W: Write> Writer<'a, W> {
/// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
/// to.
Expand Down Expand Up @@ -304,6 +311,61 @@ impl<'a, W: Write> Writer<'a, W> {
Ok(num_bytes)
}

/// Writes a previously serialized bundle of rows directly to the writer (see [serialize_ser](Self::serialize_ser)).
/// This will not flush any intermediate buffers - only write the provided buffer
/// directly to the underlying writer.
pub fn extend_avro_serialized_buffer(
&mut self,
avro_serialized_buffer: AvroSerializedBuffer,
) -> AvroResult<usize> {
let mut num_bytes = self.maybe_write_header()?;
let buffer = avro_serialized_buffer.buffer;
let stream_len = buffer.len();
let num_values = avro_serialized_buffer.num_values;

num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
+ self.append_raw(&stream_len.into(), &Schema::Long)?
+ self
.writer
.write(buffer.as_ref())
.map_err(Details::WriteBytes)?
+ self.append_marker()?;

self.writer.flush().map_err(Details::FlushWriter)?;

Ok(num_bytes)
}

/// Serialize an iterator of serde::Serialize objects into an AvroSerializedBuffer. This call
/// does not need a `mut` self - so it is safe to call from multiple threads to prepare data
/// for writing.
pub fn serialize_ser<I, T: Serialize>(&self, values: I) -> AvroResult<AvroSerializedBuffer>
where
I: IntoIterator<Item = T>,
{
let rs = match self.resolved_schema {
Some(ref rs) => rs,
None => &ResolvedSchema::try_from(self.schema)?,
};

let mut buffer = Vec::new();
let mut count = 0;
let mut serializer =
SchemaAwareWriteSerializer::new(&mut buffer, self.schema, rs.get_names(), None);

for value in values {
value.serialize(&mut serializer)?;
count += 1;
}

self.codec.compress(&mut buffer)?;

Ok(AvroSerializedBuffer {
buffer,
num_values: count,
})
}

/// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
/// validation on each value appended.
///
Expand Down