diff --git a/.github/workflows/test-lang-rust-ci.yml b/.github/workflows/test-lang-rust-ci.yml index 6c84abe4..5119ea20 100644 --- a/.github/workflows/test-lang-rust-ci.yml +++ b/.github/workflows/test-lang-rust-ci.yml @@ -42,7 +42,7 @@ jobs: - 'stable' - 'beta' - 'nightly' - - '1.86.0' # MSRV + - '1.88.0' # MSRV runner: - name: ubuntu-24.04 target: x86_64-unknown-linux-gnu diff --git a/.github/workflows/test-lang-rust-clippy.yml b/.github/workflows/test-lang-rust-clippy.yml index 75162293..dc8de558 100644 --- a/.github/workflows/test-lang-rust-clippy.yml +++ b/.github/workflows/test-lang-rust-clippy.yml @@ -40,7 +40,7 @@ jobs: matrix: rust: - 'stable' - - '1.86.0' # MSRV + - '1.88.0' # MSRV steps: - uses: actions/checkout@v6 - uses: dtolnay/rust-toolchain@stable diff --git a/Cargo.toml b/Cargo.toml index f1755532..dea33afd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ version = "0.22.0" license = "Apache-2.0" repository = "https://github.com/apache/avro-rs" edition = "2024" -rust-version = "1.86.0" +rust-version = "1.88.0" keywords = ["avro", "data", "serialization"] categories = ["encoding"] documentation = "https://docs.rs/apache-avro" diff --git a/avro/README.md b/avro/README.md index d9d7922f..fb0282c7 100644 --- a/avro/README.md +++ b/avro/README.md @@ -115,7 +115,7 @@ versions. If you have troubles upgrading, check the release notes. ## Minimum supported Rust version -1.86.0 +1.88.0 ## Defining a schema diff --git a/avro/src/error.rs b/avro/src/error.rs index bdb20552..8f610e04 100644 --- a/avro/src/error.rs +++ b/avro/src/error.rs @@ -611,15 +611,28 @@ pub enum CompatibilityError { #[error("Incompatible schemata! Field '{0}' in reader schema must have a default value")] MissingDefaultValue(String), - #[error("Incompatible schemata! Reader's symbols must contain all writer's symbols")] + #[error("Incompatible schemata! Reader's symbols contain none of the writer's symbols")] MissingSymbols, #[error("Incompatible schemata! All elements in union must match for both schemas")] MissingUnionElements, - #[error("Incompatible schemata! Name and size don't match for fixed")] + #[error("Incompatible schemata! At least one element in the union must match the schema")] + SchemaMismatchAllUnionElements, + + #[error("Incompatible schemata! Size doesn't match for fixed")] FixedMismatch, + #[error( + "Incompatible schemata! Decimal precision and/or scale don't match, reader: ({r_precision},{r_scale}), writer: ({w_precision},{w_scale})" + )] + DecimalMismatch { + r_precision: usize, + r_scale: usize, + w_precision: usize, + w_scale: usize, + }, + #[error( "Incompatible schemata! The name must be the same for both schemas. Writer's name {writer_name} and reader's name {reader_name}" )] diff --git a/avro/src/lib.rs b/avro/src/lib.rs index f75c5a3a..5c744ff6 100644 --- a/avro/src/lib.rs +++ b/avro/src/lib.rs @@ -104,7 +104,7 @@ //! //! # Minimum supported Rust version //! -//! 1.86.0 +//! 1.88.0 //! //! # Defining a schema //! diff --git a/avro/src/schema.rs b/avro/src/schema.rs index a7ea4935..528a4454 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -1853,10 +1853,10 @@ impl Parser { ) -> AvroResult { let fields_opt = complex.get("fields"); - if fields_opt.is_none() { - if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) { - return Ok(seen.clone()); - } + if fields_opt.is_none() + && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) + { + return Ok(seen.clone()); } let fully_qualified_name = Name::parse(complex, enclosing_namespace)?; @@ -1932,10 +1932,10 @@ impl Parser { ) -> AvroResult { let symbols_opt = complex.get("symbols"); - if symbols_opt.is_none() { - if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) { - return Ok(seen.clone()); - } + if symbols_opt.is_none() + && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) + { + return Ok(seen.clone()); } let fully_qualified_name = Name::parse(complex, enclosing_namespace)?; @@ -2075,10 +2075,10 @@ impl Parser { enclosing_namespace: &Namespace, ) -> AvroResult { let size_opt = complex.get("size"); - if size_opt.is_none() { - if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) { - return Ok(seen.clone()); - } + if size_opt.is_none() + && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) + { + return Ok(seen.clone()); } let doc = complex.get("doc").and_then(|v| match &v { @@ -2449,11 +2449,11 @@ fn pcf_map(schema: &Map, defined_names: &mut HashSet) -> } // Fully qualify the name, if it isn't already ([FULLNAMES] rule). - if k == "name" { - if let Some(ref n) = name { - fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n)))); - continue; - } + if k == "name" + && let Some(ref n) = name + { + fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n)))); + continue; } // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule). diff --git a/avro/src/schema_compatibility.rs b/avro/src/schema_compatibility.rs index 2b21eeef..bb0f262b 100644 --- a/avro/src/schema_compatibility.rs +++ b/avro/src/schema_compatibility.rs @@ -16,271 +16,365 @@ // under the License. //! Logic for checking schema compatibility -use crate::schema::UuidSchema; use crate::{ error::CompatibilityError, - schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind}, + schema::{ + ArraySchema, DecimalSchema, EnumSchema, FixedSchema, InnerDecimalSchema, MapSchema, + RecordSchema, Schema, UuidSchema, + }, }; use std::{ - collections::{HashSet, hash_map::DefaultHasher}, + collections::{HashMap, hash_map::DefaultHasher}, hash::Hasher, + iter::once, + ops::BitAndAssign, ptr, }; -fn match_ref_schemas( - writers_schema: &Schema, - readers_schema: &Schema, -) -> Result<(), CompatibilityError> { - match (readers_schema, writers_schema) { - (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => { - if r_name == w_name { - Ok(()) - } else { - Err(CompatibilityError::NameMismatch { - writer_name: w_name.fullname(None), - reader_name: r_name.fullname(None), - }) - } +pub struct SchemaCompatibility; + +/// How compatible are two schemas. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum Compatibility { + /// Full compatibility, resolving will always work. + Full, + /// Partial compatibility, resolving may error. + /// + /// This can happen if an enum doesn't have all fields, or unions don't entirely overlap. + Partial, +} + +impl BitAndAssign for Compatibility { + /// Combine two compatibilities. + /// + /// # Truth table + /// | | Full | Partial | + /// | ------- | ------- | ------- | + /// | Full | Full | Partial | + /// | Partial | Partial | Partial | + fn bitand_assign(&mut self, rhs: Self) { + match (*self, rhs) { + (Self::Full, Self::Full) => *self = Self::Full, + _ => *self = Self::Partial, } - _ => Err(CompatibilityError::WrongType { - writer_schema_type: format!("{writers_schema:#?}"), - reader_schema_type: format!("{readers_schema:#?}"), - }), } } -pub struct SchemaCompatibility; - struct Checker { - recursion: HashSet<(u64, u64)>, + recursion: HashMap<(u64, u64), Compatibility>, } impl Checker { /// Create a new checker, with recursion set to an empty set. pub(crate) fn new() -> Self { Self { - recursion: HashSet::new(), + recursion: HashMap::new(), } } - pub(crate) fn can_read( + /// Check if the reader schema can be resolved from the writer schema. + pub(crate) fn full_match_schemas( &mut self, writers_schema: &Schema, readers_schema: &Schema, - ) -> Result<(), CompatibilityError> { - self.full_match_schemas(writers_schema, readers_schema) + ) -> Result { + // Hash both reader and writer based on their pointer value. This is a fast way to see if + // we get the exact same schemas multiple times (because of recursive types) + let key = ( + Self::pointer_hash(writers_schema), + Self::pointer_hash(readers_schema), + ); + + // If we already saw this pairing, return the previous value + if let Some(c) = self.recursion.get(&key).copied() { + Ok(c) + } else { + let c = self.inner_full_match_schemas(writers_schema, readers_schema)?; + // Insert the new value + self.recursion.insert(key, c); + Ok(c) + } } - pub(crate) fn full_match_schemas( + /// Hash a schema based only on its pointer value. + fn pointer_hash(schema: &Schema) -> u64 { + let mut hasher = DefaultHasher::new(); + ptr::hash(schema, &mut hasher); + hasher.finish() + } + + /// The actual implementation of [`full_match_schemas`] but without the recursion protection. + /// + /// This function should never be called directly as it can recurse infinitely on recursive types. + #[rustfmt::skip] + fn inner_full_match_schemas( &mut self, writers_schema: &Schema, readers_schema: &Schema, - ) -> Result<(), CompatibilityError> { - if self.recursion_in_progress(writers_schema, readers_schema) { - return Ok(()); - } - - SchemaCompatibility::match_schemas(writers_schema, readers_schema)?; - - let w_type = SchemaKind::from(writers_schema); - let r_type = SchemaKind::from(readers_schema); - - if w_type != SchemaKind::Union - && (r_type.is_primitive() - || r_type == SchemaKind::Fixed - || r_type == SchemaKind::Uuid - || r_type == SchemaKind::Date - || r_type == SchemaKind::TimeMillis - || r_type == SchemaKind::TimeMicros - || r_type == SchemaKind::TimestampMillis - || r_type == SchemaKind::TimestampMicros - || r_type == SchemaKind::TimestampNanos - || r_type == SchemaKind::LocalTimestampMillis - || r_type == SchemaKind::LocalTimestampMicros - || r_type == SchemaKind::LocalTimestampNanos) + ) -> Result { + // Compare unqualified names if the schemas have them + if let Some(w_name) = writers_schema.name() + && let Some(r_name) = readers_schema.name() + && w_name.name != r_name.name { - return Ok(()); + return Err(CompatibilityError::NameMismatch { + writer_name: w_name.name.clone(), + reader_name: r_name.name.clone(), + }); } - match r_type { - SchemaKind::Ref => match_ref_schemas(writers_schema, readers_schema), - SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema), - SchemaKind::Map => { - if let Schema::Map(w_m) = writers_schema { - match readers_schema { - Schema::Map(r_m) => self.full_match_schemas(&w_m.types, &r_m.types), - _ => Err(CompatibilityError::WrongType { - writer_schema_type: format!("{writers_schema:#?}"), - reader_schema_type: format!("{readers_schema:#?}"), - }), - } + // Logical types are downgraded to their actual type + match (writers_schema, readers_schema) { + (Schema::Ref { name: w_name }, Schema::Ref { name: r_name }) => { + if r_name == w_name { + Ok(Compatibility::Full) } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: vec![SchemaKind::Record], + Err(CompatibilityError::NameMismatch { + writer_name: w_name.fullname(None), + reader_name: r_name.fullname(None), }) } } - SchemaKind::Array => { - if let Schema::Array(w_a) = writers_schema { - match readers_schema { - Schema::Array(r_a) => self.full_match_schemas(&w_a.items, &r_a.items), - _ => Err(CompatibilityError::WrongType { - writer_schema_type: format!("{writers_schema:#?}"), - reader_schema_type: format!("{readers_schema:#?}"), - }), - } + (Schema::Union(writer), Schema::Union(reader)) => { + let mut any = false; + let mut all = true; + for writer in &writer.schemas { + // Try to find a reader variant that is fully compatible with this writer variant. + // In case that does not exist, we keep track of any partial compatibility we find. + let mut local_any = false; + all &= reader.schemas.iter().any(|reader| { + match self.full_match_schemas(writer, reader) { + Ok(Compatibility::Full) => { + local_any = true; + true + } + Ok(Compatibility::Partial) => { + local_any = true; + false + } + Err(_) => false, + } + }); + // Save any match we found + any |= local_any; + } + if all { + // All writer variants are fully compatible with reader variants + Ok(Compatibility::Full) + } else if any { + // At least one writer variant is partially or fully compatible with a reader variant + Ok(Compatibility::Partial) } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: vec![SchemaKind::Array], - }) + Err(CompatibilityError::MissingUnionElements) } } - SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema), - SchemaKind::Enum => { - // reader's symbols must contain all writer's symbols - if let Schema::Enum(EnumSchema { - symbols: w_symbols, .. - }) = writers_schema - { - if let Schema::Enum(EnumSchema { - symbols: r_symbols, .. - }) = readers_schema - { - if w_symbols.iter().all(|e| r_symbols.contains(e)) { - return Ok(()); + (Schema::Union(writer), _) => { + // Check if all writer variants are fully compatible with the reader schema. + // We keep track of if we see any (partial) compatibility. + let mut any = false; + let mut all = true; + for writer in &writer.schemas { + match self.full_match_schemas(writer, readers_schema) { + Ok(Compatibility::Full) => any = true, + Ok(Compatibility::Partial) => { + any = true; + all = false; + } + Err(_) => { + all = false; } } } - Err(CompatibilityError::MissingSymbols) + if all { + // All writer variants are fully compatible with the reader schema + Ok(Compatibility::Full) + } else if any { + // At least one writer variant is partially compatible with the reader schema + Ok(Compatibility::Partial) + } else { + Err(CompatibilityError::SchemaMismatchAllUnionElements) + } } - _ => { - if w_type == SchemaKind::Union { - if let Schema::Union(r) = writers_schema { - if r.schemas.len() == 1 { - return self.full_match_schemas(&r.schemas[0], readers_schema); + (_, Schema::Union(reader)) => { + // Try to find a fully compatible reader variant for the writer schema. + // In case that does not exist, we keep track of any partial compatibility. + let mut partial = false; + if reader.schemas.iter().any(|reader| { + match self.full_match_schemas(writers_schema, reader) { + Ok(Compatibility::Full) => true, + Ok(Compatibility::Partial) => { + partial = true; + false } + Err(_) => false, } + }) { + // At least one reader variant is fully compatible with the writer schema + Ok(Compatibility::Full) + } else if partial { + // At least one reader variant is partially compatible with the writer schema + Ok(Compatibility::Partial) + } else { + Err(CompatibilityError::SchemaMismatchAllUnionElements) } - Err(CompatibilityError::Inconclusive(String::from( - "writers_schema", - ))) } - } - } - - fn match_record_schemas( - &mut self, - writers_schema: &Schema, - readers_schema: &Schema, - ) -> Result<(), CompatibilityError> { - let w_type = SchemaKind::from(writers_schema); - - if w_type == SchemaKind::Union { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: vec![SchemaKind::Record], - }); - } - - if let Schema::Record(RecordSchema { - fields: w_fields, - lookup: w_lookup, - .. - }) = writers_schema - { - if let Schema::Record(RecordSchema { - fields: r_fields, .. - }) = readers_schema - { - for field in r_fields.iter() { - // get all field names in a vector (field.name + aliases) - let mut fields_names = vec![&field.name]; - if let Some(ref aliases) = field.aliases { - for alias in aliases { - fields_names.push(alias); - } - } - - // Check whether any of the possible fields names are in the writer schema. - // If the field was found, then it must have the exact same name with the writer field, - // otherwise we would have a false positive with the writers aliases - let position = fields_names.iter().find_map(|field_name| { - if let Some(pos) = w_lookup.get(*field_name) { - if &w_fields[*pos].name == *field_name { - return Some(pos); - } - } - None + (Schema::Null, Schema::Null) => Ok(Compatibility::Full), + (Schema::Boolean, Schema::Boolean) => Ok(Compatibility::Full), + // int promotes to long, float and double + ( + Schema::Int | Schema::Date | Schema::TimeMillis, + Schema::Int | Schema::Long | Schema::Float | Schema::Double | Schema::Date + | Schema::TimeMillis | Schema::TimeMicros | Schema::TimestampMillis + | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos, + ) => Ok(Compatibility::Full), + // long promotes to float and double + ( + Schema::Long | Schema::TimeMicros | Schema::TimestampMillis + | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos, + Schema::Long | Schema::Float | Schema::Double | Schema::TimeMicros + | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos + | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) => Ok(Compatibility::Full), + // float promotes to double + (Schema::Float, Schema::Float | Schema::Double) => Ok(Compatibility::Full), + (Schema::Double, Schema::Double) => Ok(Compatibility::Full), + // bytes and strings are interchangeable + ( + Schema::Bytes | Schema::String | Schema::BigDecimal + | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes) + | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }), + Schema::Bytes | Schema::String | Schema::BigDecimal + | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes) + | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }), + ) => Ok(Compatibility::Full), + // This should also check the unqualified name but we don't store that for duration + (Schema::Duration, Schema::Duration) => Ok(Compatibility::Full), + (Schema::Fixed(FixedSchema { size: 12, .. }), Schema::Duration) => { + Ok(Compatibility::Full) + } + (Schema::Duration, Schema::Fixed(FixedSchema { size: 12, .. })) => { + Ok(Compatibility::Full) + } + ( + Schema::Decimal(DecimalSchema { precision: w_precision, scale: w_scale, .. }), + Schema::Decimal(DecimalSchema { precision: r_precision, scale: r_scale, .. }), + ) => { + // precision and scale must match + if r_precision == w_precision && r_scale == w_scale { + Ok(Compatibility::Full) + } else { + Err(CompatibilityError::DecimalMismatch { + r_precision: *r_precision, + r_scale: *r_scale, + w_precision: *w_precision, + w_scale: *w_scale + }) + } + } + (Schema::Uuid(_), Schema::Uuid(_)) => Ok(Compatibility::Full), + ( + Schema::Fixed(w_fixed) | Schema::Uuid(UuidSchema::Fixed(w_fixed)) + | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(w_fixed), .. }), + Schema::Fixed(r_fixed) | Schema::Uuid(UuidSchema::Fixed(r_fixed)) + | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(r_fixed), .. }), + ) => { + // Size must match + if r_fixed.size == w_fixed.size { + Ok(Compatibility::Full) + } else { + Err(CompatibilityError::FixedMismatch) + } + } + ( + Schema::Array(ArraySchema { items: w_items, .. }), + Schema::Array(ArraySchema { items: r_items, .. }), + ) => { + // array schemas must match + self.full_match_schemas(w_items, r_items) + } + ( + Schema::Map(MapSchema { types: w_types, .. }), + Schema::Map(MapSchema { types: r_types, .. }), + ) => { + // type schemas must match + self.full_match_schemas(w_types, r_types) + } + ( + Schema::Enum(EnumSchema { symbols: w_symbols, .. }), + Schema::Enum(EnumSchema { symbols: r_symbols, default: r_default, .. }), + ) => { + // Reader must have a default or all symbols in the writer must also be in the reader + if r_default.is_some() { + // No need to iter over all the fields if there is a default + Ok(Compatibility::Full) + } else { + let mut any = false; + let mut all = true; + w_symbols.iter().for_each(|s| { + let res = r_symbols.contains(s); + any |= res; + all &= res; }); - - match position { - Some(pos) => { - if let Err(err) = - self.full_match_schemas(&w_fields[*pos].schema, &field.schema) - { + if all { + // All symbols match + Ok(Compatibility::Full) + } else if any { + // Only some symbols match + Ok(Compatibility::Partial) + } else { + // No symbols match + Err(CompatibilityError::MissingSymbols) + } + } + } + ( + Schema::Record(RecordSchema { fields: w_fields, .. }), + Schema::Record(RecordSchema { fields: r_fields, .. }), + ) => { + let mut compatibility = Compatibility::Full; + for r_field in r_fields { + // Can't use RecordField.lookup as aliases are also inserted into there and we + // are not allowed to match on writer aliases. + // Search using field name and *after* that aliases. + if let Some(w_field) = once(&r_field.name) + .chain(r_field.aliases.as_deref().unwrap_or(&[]).iter()) + .find_map(|ra| w_fields.iter().find(|wf| &wf.name == ra)) + { + // Check that the schemas are compatible + match self.full_match_schemas(&w_field.schema, &r_field.schema) { + Ok(c) => compatibility &= c, + Err(err) => { return Err(CompatibilityError::FieldTypeMismatch( - field.name.clone(), + r_field.name.clone(), Box::new(err), )); } } - _ => { - if field.default.is_none() { - return Err(CompatibilityError::MissingDefaultValue( - field.name.clone(), - )); - } - } + } else if r_field.default.is_none() { + // No default and no matching field in the writer + return Err(CompatibilityError::MissingDefaultValue( + r_field.name.clone(), + )); } } + Ok(compatibility) } + (_, _) => Err(CompatibilityError::WrongType { + writer_schema_type: format!("{writers_schema:#?}"), + reader_schema_type: format!("{readers_schema:#?}"), + }), } - Ok(()) } - fn match_union_schemas( + pub(crate) fn can_read( &mut self, writers_schema: &Schema, readers_schema: &Schema, - ) -> Result<(), CompatibilityError> { - if let Schema::Union(u) = writers_schema { - if u.schemas - .iter() - .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok()) - { - return Ok(()); - } else { - return Err(CompatibilityError::MissingUnionElements); - } - } else if let Schema::Union(u) = readers_schema { - // This check is needed because the writer_schema can be not union - // but the type can be contain in the union of the reader schema - // e.g. writer_schema is string and reader_schema is [string, int] - if u.schemas - .iter() - .any(|schema| self.full_match_schemas(writers_schema, schema).is_ok()) - { - return Ok(()); - } - } - Err(CompatibilityError::MissingUnionElements) - } - - fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool { - let mut hasher = DefaultHasher::new(); - ptr::hash(writers_schema, &mut hasher); - let w_hash = hasher.finish(); - - hasher = DefaultHasher::new(); - ptr::hash(readers_schema, &mut hasher); - let r_hash = hasher.finish(); - - let key = (w_hash, r_hash); - // This is a shortcut to add if not exists *and* return false. It will return true - // if it was able to insert. - !self.recursion.insert(key) + ) -> Result { + self.full_match_schemas(writers_schema, readers_schema) } } @@ -290,7 +384,7 @@ impl SchemaCompatibility { pub fn can_read( writers_schema: &Schema, readers_schema: &Schema, - ) -> Result<(), CompatibilityError> { + ) -> Result { let mut c = Checker::new(); c.can_read(writers_schema, readers_schema) } @@ -300,308 +394,21 @@ impl SchemaCompatibility { pub fn mutual_read( writers_schema: &Schema, readers_schema: &Schema, - ) -> Result<(), CompatibilityError> { - SchemaCompatibility::can_read(writers_schema, readers_schema)?; - SchemaCompatibility::can_read(readers_schema, writers_schema) - } - - /// `match_schemas` performs a basic check that a datum written with the - /// writers_schema could be read using the readers_schema. This check only includes - /// matching the types, including schema promotion, and matching the full name for - /// named types. Aliases for named types are not supported here, and the rust - /// implementation of Avro in general does not include support for aliases (I think). - pub(crate) fn match_schemas( - writers_schema: &Schema, - readers_schema: &Schema, - ) -> Result<(), CompatibilityError> { - fn check_reader_type_multi( - reader_type: SchemaKind, - allowed_reader_types: Vec, - writer_type: SchemaKind, - ) -> Result<(), CompatibilityError> { - if allowed_reader_types.contains(&reader_type) { - Ok(()) - } else { - let mut allowed_types: Vec = vec![writer_type]; - allowed_types.extend_from_slice(allowed_reader_types.as_slice()); - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: allowed_types, - }) - } - } - - fn check_reader_type( - reader_type: SchemaKind, - allowed_reader_type: SchemaKind, - writer_type: SchemaKind, - ) -> Result<(), CompatibilityError> { - if reader_type == allowed_reader_type { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: vec![writer_type, allowed_reader_type], - }) - } - } - - fn check_writer_type( - writers_schema: &Schema, - allowed_schema: &Schema, - expected_schema_types: Vec, - ) -> Result<(), CompatibilityError> { - if *allowed_schema == *writers_schema { - Ok(()) - } else { - Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: expected_schema_types, - }) - } - } - - let w_type = SchemaKind::from(writers_schema); - let r_type = SchemaKind::from(readers_schema); - - if w_type == SchemaKind::Union || r_type == SchemaKind::Union { - return Ok(()); - } - - if w_type == r_type { - if r_type.is_primitive() { - return Ok(()); - } - - match r_type { - SchemaKind::Record | SchemaKind::Enum => { - let msg = format!("A {readers_schema} type must always has a name"); - let writers_name = writers_schema.name().expect(&msg); - let readers_name = readers_schema.name().expect(&msg); - - if writers_name.name == readers_name.name { - return Ok(()); - } - - return Err(CompatibilityError::NameMismatch { - writer_name: writers_name.name.clone(), - reader_name: readers_name.name.clone(), - }); - } - SchemaKind::Fixed => { - if let Schema::Fixed(FixedSchema { - name: w_name, - aliases: _, - doc: _w_doc, - size: w_size, - default: _w_default, - attributes: _, - }) = writers_schema - { - if let Schema::Fixed(FixedSchema { - name: r_name, - aliases: _, - doc: _r_doc, - size: r_size, - default: _r_default, - attributes: _, - }) = readers_schema - { - return (w_name.name == r_name.name && w_size == r_size) - .then_some(()) - .ok_or(CompatibilityError::FixedMismatch); - } - } - } - SchemaKind::Map => { - if let Schema::Map(w_m) = writers_schema { - if let Schema::Map(r_m) = readers_schema { - return SchemaCompatibility::match_schemas(&w_m.types, &r_m.types); - } - } - } - SchemaKind::Array => { - if let Schema::Array(w_a) = writers_schema { - if let Schema::Array(r_a) = readers_schema { - return SchemaCompatibility::match_schemas(&w_a.items, &r_a.items); - } - } - } - SchemaKind::Uuid => match readers_schema { - Schema::Uuid(UuidSchema::Bytes) => { - return check_writer_type( - writers_schema, - readers_schema, - vec![r_type, SchemaKind::Bytes], - ); - } - Schema::Uuid(UuidSchema::String) => { - return check_writer_type( - writers_schema, - readers_schema, - vec![r_type, SchemaKind::String], - ); - } - Schema::Uuid(UuidSchema::Fixed(FixedSchema { - name: r_name, - size: r_size, - .. - })) => match writers_schema { - Schema::Uuid(UuidSchema::Fixed(FixedSchema { - name: w_name, - size: w_size, - .. - })) - | Schema::Fixed(FixedSchema { - name: w_name, - size: w_size, - .. - }) => { - return (w_name.name == r_name.name && w_size == r_size) - .then_some(()) - .ok_or(CompatibilityError::FixedMismatch); - } - _ => { - return Err(CompatibilityError::TypeExpected { - schema_type: String::from("writers_schema"), - expected_type: vec![SchemaKind::Uuid, SchemaKind::Fixed], - }); - } - }, - Schema::Null - | Schema::Boolean - | Schema::Int - | Schema::Long - | Schema::Float - | Schema::Double - | Schema::Bytes - | Schema::String - | Schema::Array(_) - | Schema::Map(_) - | Schema::Union(_) - | Schema::Record(_) - | Schema::Enum(_) - | Schema::Fixed(_) - | Schema::Decimal(_) - | Schema::BigDecimal - | Schema::Date - | Schema::TimeMillis - | Schema::TimeMicros - | Schema::TimestampMillis - | Schema::TimestampMicros - | Schema::TimestampNanos - | Schema::LocalTimestampMillis - | Schema::LocalTimestampMicros - | Schema::LocalTimestampNanos - | Schema::Duration - | Schema::Ref { .. } => { - unreachable!("SchemaKind::Uuid can only be a Schema::Uuid") - } - }, - SchemaKind::Date | SchemaKind::TimeMillis => { - return check_writer_type( - writers_schema, - readers_schema, - vec![r_type, SchemaKind::Int], - ); - } - SchemaKind::TimeMicros - | SchemaKind::TimestampNanos - | SchemaKind::TimestampMillis - | SchemaKind::TimestampMicros - | SchemaKind::LocalTimestampMillis - | SchemaKind::LocalTimestampMicros - | SchemaKind::LocalTimestampNanos => { - return check_writer_type( - writers_schema, - readers_schema, - vec![r_type, SchemaKind::Long], - ); - } - SchemaKind::Duration => { - return Ok(()); - } - SchemaKind::Ref => return match_ref_schemas(writers_schema, readers_schema), - _ => { - return Err(CompatibilityError::Inconclusive(String::from( - "readers_schema", - ))); - } - }; - } - - // Here are the checks for primitive types - match w_type { - SchemaKind::Int => check_reader_type_multi( - r_type, - vec![ - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::Date, - SchemaKind::TimeMillis, - ], - w_type, - ), - SchemaKind::Long => check_reader_type_multi( - r_type, - vec![ - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::TimeMicros, - SchemaKind::TimestampMillis, - SchemaKind::TimestampMicros, - SchemaKind::TimestampNanos, - SchemaKind::LocalTimestampMillis, - SchemaKind::LocalTimestampMicros, - SchemaKind::LocalTimestampNanos, - ], - w_type, - ), - SchemaKind::Float => { - check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type) - } - SchemaKind::String => { - check_reader_type_multi(r_type, vec![SchemaKind::Bytes, SchemaKind::Uuid], w_type) - } - SchemaKind::Bytes => { - check_reader_type_multi(r_type, vec![SchemaKind::String, SchemaKind::Uuid], w_type) - } - SchemaKind::Uuid => check_reader_type_multi( - r_type, - vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Fixed], - w_type, - ), - SchemaKind::Fixed => check_reader_type_multi( - r_type, - vec![SchemaKind::Duration, SchemaKind::Uuid], - w_type, - ), - SchemaKind::Date | SchemaKind::TimeMillis => { - check_reader_type(r_type, SchemaKind::Int, w_type) - } - SchemaKind::TimeMicros - | SchemaKind::TimestampMicros - | SchemaKind::TimestampMillis - | SchemaKind::TimestampNanos - | SchemaKind::LocalTimestampMillis - | SchemaKind::LocalTimestampMicros - | SchemaKind::LocalTimestampNanos => { - check_reader_type(r_type, SchemaKind::Long, w_type) - } - _ => Err(CompatibilityError::Inconclusive(String::from( - "writers_schema", - ))), - } + ) -> Result { + let mut c = SchemaCompatibility::can_read(writers_schema, readers_schema)?; + c &= SchemaCompatibility::can_read(readers_schema, writers_schema)?; + Ok(c) } } #[cfg(test)] mod tests { + use std::collections::BTreeMap; + use super::*; - use crate::schema::{Name, UuidSchema}; use crate::{ - Codec, Reader, Writer, + Codec, Decimal, Reader, Writer, + schema::{Name, UuidSchema}, types::{Record, Value}, }; use apache_avro_test_helper::TestResult; @@ -749,9 +556,9 @@ mod tests { #[test] fn test_broken() { assert_eq!( - CompatibilityError::MissingUnionElements, - SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema()) - .unwrap_err() + Compatibility::Partial, + SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema()).unwrap(), + "Only compatible if writer writes an int" ) } @@ -847,12 +654,12 @@ mod tests { r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "time-millis"}"# )] - // time-millis type + // time-micros type #[case( r#"{"type": "long"}"#, r#"{"type": "long", "logicalType": "time-micros"}"# )] - // timetimestamp-nanos type + // timestamp-nanos type #[case( r#"{"type": "long"}"#, r#"{"type": "long", "logicalType": "timestamp-nanos"}"# @@ -894,7 +701,7 @@ mod tests { let writer_schema = Schema::parse_str(writer_schema_str).unwrap(); let reader_schema = Schema::parse_str(reader_schema_str).unwrap(); - assert!(SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).is_ok()); + assert!(SchemaCompatibility::can_read(&writer_schema, &reader_schema).is_ok()); } #[rstest] @@ -920,152 +727,73 @@ mod tests { #[case( r#"{"type":"map", "values": "long"}"#, r#"{"type":"map", "values": "int"}"#, - CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::TimeMicros, - SchemaKind::TimestampMillis, - SchemaKind::TimestampMicros, - SchemaKind::TimestampNanos, - SchemaKind::LocalTimestampMillis, - SchemaKind::LocalTimestampMicros, - SchemaKind::LocalTimestampNanos, - ]} + CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() } )] // Array type test #[case( r#"{"type": "array", "items": "long"}"#, r#"{"type": "array", "items": "int"}"#, - CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::TimeMicros, - SchemaKind::TimestampMillis, - SchemaKind::TimestampMicros, - SchemaKind::TimestampNanos, - SchemaKind::LocalTimestampMillis, - SchemaKind::LocalTimestampMicros, - SchemaKind::LocalTimestampNanos, - ]} + CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() } )] // Date type test #[case( r#"{"type": "string"}"#, r#"{"type": "int", "logicalType": "date"}"#, - CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::String, - SchemaKind::Bytes, - SchemaKind::Uuid, - ]} + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "Date".to_string() } )] // time-millis type #[case( r#"{"type": "string"}"#, r#"{"type": "int", "logicalType": "time-millis"}"#, - CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::String, - SchemaKind::Bytes, - SchemaKind::Uuid, - ]} + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMillis".to_string() } )] // time-millis type #[case( - r#"{"type": "int"}"#, + r#"{"type": "string"}"#, r#"{"type": "long", "logicalType": "time-micros"}"#, - CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::Int, - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::Date, - SchemaKind::TimeMillis - ]} + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMicros".to_string() } + )] + // timestamp-nanos type + #[case( + r#"{"type": "string"}"#, + r#"{"type": "long", "logicalType": "timestamp-nanos"}"#, + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampNanos".to_string() } )] - // timestamp-nanos type. This test should fail because it is not supported on schema parse_complex - // #[case( - // r#"{"type": "string"}"#, - // r#"{"type": "long", "logicalType": "timestamp-nanos"}"#, - // CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - // SchemaKind::Int, - // SchemaKind::Long, - // SchemaKind::Float, - // SchemaKind::Double, - // SchemaKind::Date, - // SchemaKind::TimeMillis - // ]} - // )] // timestamp-millis type #[case( - r#"{"type": "int"}"#, + r#"{"type": "string"}"#, r#"{"type": "long", "logicalType": "timestamp-millis"}"#, - CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::Int, - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::Date, - SchemaKind::TimeMillis - ]} + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMillis".to_string() } )] // timestamp-micros type #[case( - r#"{"type": "int"}"#, + r#"{"type": "string"}"#, r#"{"type": "long", "logicalType": "timestamp-micros"}"#, - CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::Int, - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::Date, - SchemaKind::TimeMillis - ]} + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMicros".to_string() } )] // local-timestamp-millis type #[case( - r#"{"type": "int"}"#, + r#"{"type": "string"}"#, r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#, - CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::Int, - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::Date, - SchemaKind::TimeMillis - ]} + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMillis".to_string() } )] // local-timestamp-micros type #[case( - r#"{"type": "int"}"#, + r#"{"type": "string"}"#, r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#, - CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - SchemaKind::Int, - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::Date, - SchemaKind::TimeMillis - ]} + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMicros".to_string() } )] - // local-timestamp-nanos type. This test should fail because it is not supported on schema parse_complex - // #[case( - // r#"{"type": "int"}"#, - // r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#, - // CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![ - // SchemaKind::Int, - // SchemaKind::Long, - // SchemaKind::Float, - // SchemaKind::Double, - // SchemaKind::Date, - // SchemaKind::TimeMillis - // ]} - // )] - // When comparing different types we always get Inconclusive + // local-timestamp-nanos type + #[case( + r#"{"type": "string"}"#, + r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#, + CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampNanos".to_string() } + )] + // Names are checked first, so this should not be a WrongType #[case( r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#, r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#, - CompatibilityError::Inconclusive(String::from("writers_schema")) + CompatibilityError::NameMismatch { writer_name: "record_b".to_string(), reader_name: "EmployeeId".to_string() } )] fn test_avro_3950_match_schemas_error( #[case] writer_schema_str: &str, @@ -1077,17 +805,14 @@ mod tests { assert_eq!( expected_error, - SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).unwrap_err() + SchemaCompatibility::can_read(&writer_schema, &reader_schema).unwrap_err() ) } #[test] fn test_compatible_reader_writer_pairs() { let uuid_fixed = FixedSchema { - name: Name { - name: String::new(), - namespace: None, - }, + name: Name::new("uuid_fixed").unwrap(), aliases: None, doc: None, size: 16, @@ -1295,11 +1020,14 @@ mod tests { let valid_reader = string_array_schema(); let invalid_reader = string_map_schema(); - assert!(SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).is_ok()); assert_eq!( - CompatibilityError::Inconclusive(String::from("writers_schema")), - SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader).unwrap_err() + Compatibility::Full, + SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).unwrap() ); + assert!(matches!( + SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader), + Err(CompatibilityError::WrongType { .. }), + )); } #[test] @@ -1307,16 +1035,9 @@ mod tests { let valid_reader = Schema::String; assert!(SchemaCompatibility::can_read(&Schema::String, &valid_reader).is_ok()); assert_eq!( - CompatibilityError::TypeExpected { - schema_type: String::from("readers_schema"), - expected_type: vec![ - SchemaKind::Int, - SchemaKind::Long, - SchemaKind::Float, - SchemaKind::Double, - SchemaKind::Date, - SchemaKind::TimeMillis - ], + CompatibilityError::WrongType { + writer_schema_type: "Int".to_string(), + reader_schema_type: "String".to_string() }, SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err() ); @@ -1329,10 +1050,13 @@ mod tests { let union_reader = union_schema(vec![Schema::String]); assert_eq!( - CompatibilityError::MissingUnionElements, - SchemaCompatibility::can_read(&union_writer, &union_reader).unwrap_err() + Compatibility::Partial, + SchemaCompatibility::can_read(&union_writer, &union_reader).unwrap() + ); + assert_eq!( + Compatibility::Full, + SchemaCompatibility::can_read(&union_reader, &union_writer).unwrap() ); - assert!(SchemaCompatibility::can_read(&union_reader, &union_writer).is_ok()); } #[test] @@ -1356,9 +1080,9 @@ mod tests { assert_eq!( CompatibilityError::FieldTypeMismatch( "field1".to_owned(), - Box::new(CompatibilityError::TypeExpected { - schema_type: "readers_schema".to_owned(), - expected_type: vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Uuid] + Box::new(CompatibilityError::WrongType { + writer_schema_type: "String".to_string(), + reader_schema_type: "Int".to_string() }) ), SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err() @@ -1377,10 +1101,13 @@ mod tests { let enum_schema2 = Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?; assert_eq!( - CompatibilityError::MissingSymbols, - SchemaCompatibility::can_read(&enum_schema2, &enum_schema1).unwrap_err() + Compatibility::Partial, + SchemaCompatibility::can_read(&enum_schema2, &enum_schema1)? + ); + assert_eq!( + Compatibility::Full, + SchemaCompatibility::can_read(&enum_schema1, &enum_schema2)? ); - assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2).is_ok()); Ok(()) } @@ -1453,7 +1180,7 @@ mod tests { // short name match, but no structure match let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]); assert_eq!( - CompatibilityError::MissingUnionElements, + CompatibilityError::SchemaMismatchAllUnionElements, SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err() ); } @@ -1468,7 +1195,7 @@ mod tests { point_3d_schema(), ]); assert_eq!( - CompatibilityError::MissingUnionElements, + CompatibilityError::SchemaMismatchAllUnionElements, SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err() ); } @@ -1483,7 +1210,7 @@ mod tests { point_2d_schema(), ]); assert_eq!( - CompatibilityError::MissingUnionElements, + CompatibilityError::SchemaMismatchAllUnionElements, SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err() ); } @@ -1498,7 +1225,7 @@ mod tests { point_3d_schema(), ]); assert_eq!( - CompatibilityError::MissingUnionElements, + CompatibilityError::SchemaMismatchAllUnionElements, SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err() ); } @@ -1706,7 +1433,10 @@ mod tests { }"#, )?; - assert!(SchemaCompatibility::can_read(&schema_v2, &schema_v1).is_ok()); + assert_eq!( + Compatibility::Full, + SchemaCompatibility::can_read(&schema_v2, &schema_v1)? + ); assert_eq!( CompatibilityError::MissingDefaultValue(String::from("time")), SchemaCompatibility::can_read(&schema_v1, &schema_v2).unwrap_err() @@ -1815,7 +1545,6 @@ mod tests { ] }"#, )?, - "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema", ), ( Schema::parse_str( @@ -1836,17 +1565,17 @@ mod tests { ] }"#, )?, - "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema", ), ]; - for (schema_1, schema_2, error) in schemas { - assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok()); + for (schema_1, schema_2) in schemas { + assert_eq!( + Compatibility::Full, + SchemaCompatibility::can_read(&schema_1, &schema_2).unwrap() + ); assert_eq!( - error, - SchemaCompatibility::can_read(&schema_2, &schema_1) - .unwrap_err() - .to_string() + Compatibility::Partial, + SchemaCompatibility::can_read(&schema_2, &schema_1).unwrap() ); } @@ -1887,4 +1616,65 @@ mod tests { Ok(()) } + + #[test] + fn duration_and_fixed_of_different_size() -> TestResult { + let schema_strs = vec![ + r#"{ + "type": "fixed", + "name": "Fixed25", + "size": 25 + } + "#, + r#"{ + "type": "fixed", + "logicalType": "duration", + "name": "Duration", + "size": 12 + } + "#, + ]; + + let schemas = Schema::parse_list(schema_strs).unwrap(); + assert!(SchemaCompatibility::can_read(&schemas[0], &schemas[1]).is_err()); + assert!(SchemaCompatibility::can_read(&schemas[1], &schemas[0]).is_err()); + SchemaCompatibility::can_read(&schemas[1], &schemas[1])?; + SchemaCompatibility::can_read(&schemas[0], &schemas[0])?; + + Ok(()) + } + + #[test] + fn avro_rs_342_decimal_fixed_and_bytes() -> TestResult { + let bytes = Schema::Decimal(DecimalSchema { + precision: 20, + scale: 0, + inner: InnerDecimalSchema::Bytes, + }); + let fixed = Schema::Decimal(DecimalSchema { + precision: 20, + scale: 0, + inner: InnerDecimalSchema::Fixed(FixedSchema { + name: Name::new("DecimalFixed")?, + aliases: None, + doc: None, + size: 20, + default: None, + attributes: BTreeMap::default(), + }), + }); + + assert_eq!( + Compatibility::Full, + SchemaCompatibility::mutual_read(&bytes, &fixed)? + ); + + let value = Value::Decimal(Decimal::from(vec![1; 10])); + let fixed_value = value.clone().resolve(&fixed)?; + let bytes_value = value.resolve(&bytes)?; + + assert_eq!(fixed_value, bytes_value); + + Ok(()) + } } diff --git a/avro/src/types.rs b/avro/src/types.rs index 8cca5b9d..764f9b50 100644 --- a/avro/src/types.rs +++ b/avro/src/types.rs @@ -448,9 +448,28 @@ impl Value { (&Value::Double(_), &Schema::Double) => None, (&Value::Bytes(_), &Schema::Bytes) => None, (&Value::Bytes(_), &Schema::Decimal { .. }) => None, - (&Value::Bytes(_), &Schema::Uuid(UuidSchema::Bytes)) => None, + (Value::Bytes(bytes), &Schema::Uuid(UuidSchema::Bytes)) => { + if bytes.len() != 16 { + Some(format!( + "The value's size ({}) is not the right length for a fixed UUID (16)", + bytes.len() + )) + } else { + None + } + } (&Value::String(_), &Schema::String) => None, - (&Value::String(_), &Schema::Uuid(UuidSchema::String)) => None, + (Value::String(string), &Schema::Uuid(UuidSchema::String)) => { + // Non-hyphenated is 32 characters, hyphenated is longer + if string.len() < 32 { + Some(format!( + "The value's size ({}) is not the right length for a string UUID (>=32)", + string.len() + )) + } else { + None + } + } (&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) => { if n != size { Some(format!( @@ -1179,10 +1198,11 @@ impl Value { fn try_u8(self) -> AvroResult { let int = self.resolve(&Schema::Int)?; - if let Value::Int(n) = int { - if n >= 0 && n <= i32::from(u8::MAX) { - return Ok(n as u8); - } + if let Value::Int(n) = int + && n >= 0 + && n <= i32::from(u8::MAX) + { + return Ok(n as u8); } Err(Details::GetU8(int).into()) diff --git a/avro/tests/shared.rs b/avro/tests/shared.rs index f23c837a..3862780b 100644 --- a/avro/tests/shared.rs +++ b/avro/tests/shared.rs @@ -43,17 +43,16 @@ fn test_schema() -> TestResult { Err(e) => core::panic!("Can't get file {}", e), }; log::debug!("{:?}", entry.file_name()); - if let Ok(ft) = entry.file_type() { - if ft.is_dir() { - let sub_folder = - ROOT_DIRECTORY.to_owned() + "/" + entry.file_name().to_str().unwrap(); - - let dir_result = test_folder(sub_folder.as_str()); - if let Err(ed) = dir_result { - result = match result { - Ok(()) => Err(ed), - Err(e) => Err(e.merge(&ed)), - } + if let Ok(ft) = entry.file_type() + && ft.is_dir() + { + let sub_folder = ROOT_DIRECTORY.to_owned() + "/" + entry.file_name().to_str().unwrap(); + + let dir_result = test_folder(sub_folder.as_str()); + if let Err(ed) = dir_result { + result = match result { + Ok(()) => Err(ed), + Err(e) => Err(e.merge(&ed)), } } }