12 changes: 6 additions & 6 deletions bazel/repository_locations.bzl
@@ -1439,13 +1439,13 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (source)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "3.8.0",
sha256 = "8761a0c22738201d3049f11f78c8e6c0f201203ba799157e498ef7eb04c259f3",
version = "3.9.1",
sha256 = "c15b82940cfb9f67fce909d8600dc8bcfc42d2795da2c26c149d03a627f85234",
strip_prefix = "kafka-{version}/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/{version}.zip"],
use_category = ["dataplane_ext"],
extensions = ["envoy.filters.network.kafka_broker", "envoy.filters.network.kafka_mesh"],
release_date = "2024-07-23",
release_date = "2025-05-12",
cpe = "cpe:2.3:a:apache:kafka:*",
license = "Apache-2.0",
license_url = "https://github.com/apache/kafka/blob/{version}/LICENSE",
@@ -1469,11 +1469,11 @@ REPOSITORY_LOCATIONS_SPEC = dict(
project_name = "Kafka (server binary)",
project_desc = "Open-source distributed event streaming platform",
project_url = "https://kafka.apache.org",
version = "3.8.0",
sha256 = "e0297cc6fdb09ef9d9905751b25d2b629c17528f8629b60561eeff87ce29099c",
version = "3.9.1",
sha256 = "dd4399816e678946cab76e3bd1686103555e69bc8f2ab8686cda71aa15bc31a3",
strip_prefix = "kafka_2.13-{version}",
urls = ["https://downloads.apache.org/kafka/{version}/kafka_2.13-{version}.tgz"],
release_date = "2024-07-23",
release_date = "2025-05-19",
use_category = ["test_only"],
),
proxy_wasm_cpp_sdk = dict(
6 changes: 3 additions & 3 deletions contrib/kafka/filters/network/source/protocol/generator.py
@@ -233,9 +233,9 @@ def parse_complex_type(self, type_name, field_spec, versions):
             fields.append(child)
 
         # Some structures share the same name, use request/response as prefix.
-        if cpp_name in ['Cursor', 'DirectoryData', 'EntityData', 'EntryData', 'PartitionData',
-                        'PartitionSnapshot', 'SnapshotId', 'TopicData', 'TopicPartitions',
-                        'TopicSnapshot']:
+        if cpp_name in ['Cursor', 'DirectoryData', 'EntityData', 'EntryData', 'Listener',
+                        'PartitionData', 'PartitionSnapshot', 'SnapshotId', 'StateBatch',
+                        'TopicData', 'TopicPartitions', 'TopicSnapshot']:
             cpp_name = self.type.capitalize() + type_name
 
         # Some of the types repeat multiple times (e.g. AlterableConfig).
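Reviewer note: per the change above, `self.type.capitalize() + type_name` prefixes the duplicated name with the message direction, so a shared name such as `Listener` becomes `RequestListener` or `ResponseListener` in the generated C++. A hypothetical sketch of the collision this avoids (the member fields are invented for illustration, not taken from the Kafka protocol definitions):

```cpp
#include <cstdint>
#include <string>

// If both a request and a response schema declared a structure named
// `Listener`, the generated headers would contain two conflicting
// definitions in the same namespace:
//
//   struct Listener { ... }; // generated for some request
//   struct Listener { ... }; // generated for some response -- redefinition!
//
// Prefixing the duplicated name with the capitalized message direction
// keeps both definitions distinct:
struct RequestListener {
  std::string name_;
};

struct ResponseListener {
  std::string name_;
  int32_t port_;
};
```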
82 changes: 82 additions & 0 deletions contrib/kafka/filters/network/source/serialization.h
@@ -1229,6 +1229,49 @@ template <> inline uint32_t EncodingContext::computeSize(const Uuid&) const {
   return 2 * sizeof(uint64_t);
 }
 
+// Specializations for primitive types that don't have compact encoding
+// These must be declared before the generic template
+
+/**
+ * Template overload for int8_t.
+ * This data type is not compacted, so we just point to non-compact implementation.
+ */
+template <> inline uint32_t EncodingContext::computeCompactSize(const int8_t& arg) const {
+  return computeSize(arg);
+}
+
+/**
+ * Template overload for int16_t.
+ * This data type is not compacted, so we just point to non-compact implementation.
+ */
+template <> inline uint32_t EncodingContext::computeCompactSize(const int16_t& arg) const {
+  return computeSize(arg);
+}
+
+/**
+ * Template overload for uint16_t.
+ * This data type is not compacted, so we just point to non-compact implementation.
+ */
+template <> inline uint32_t EncodingContext::computeCompactSize(const uint16_t& arg) const {
+  return computeSize(arg);
+}
+
+/**
+ * Template overload for bool.
+ * This data type is not compacted, so we just point to non-compact implementation.
+ */
+template <> inline uint32_t EncodingContext::computeCompactSize(const bool& arg) const {
+  return computeSize(arg);
+}
+
+/**
+ * Template overload for double.
+ * This data type is not compacted, so we just point to non-compact implementation.
+ */
+template <> inline uint32_t EncodingContext::computeCompactSize(const double& arg) const {
+  return computeSize(arg);
+}
+
 /**
  * For non-primitive types, call `computeCompactSize` on them, to delegate the work to the entity
  * itself. The entity may use the information in context to decide which fields are included etc.
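A note on why these specializations can simply forward: in Kafka's flexible-version wire format, the compact encodings differ from the regular ones only for variable-length data (strings, byte arrays, and collections, whose length prefixes become unsigned varints), while fixed-width primitives are laid out identically either way. A standalone sketch under that assumption (not Envoy's `EncodingContext`; helper names invented):

```cpp
#include <cstdint>

// Fixed-width primitive: occupies the same number of bytes in compact and
// non-compact form, so a compact-size helper can simply forward.
uint32_t sizeOfInt16() { return sizeof(int16_t); } // 2 bytes either way

// Unsigned varint, as used by compact length prefixes: one output byte per
// 7 bits of payload, so the size depends on the value.
uint32_t sizeOfVarUInt32(uint32_t value) {
  uint32_t bytes = 1;
  while (value >= 0x80) {
    value >>= 7;
    ++bytes;
  }
  return bytes;
}

// A compact array of n int16 elements is prefixed with varint(n + 1), so its
// total size would be sizeOfVarUInt32(n + 1) + n * sizeOfInt16().
```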
@@ -1534,6 +1577,45 @@ inline uint32_t EncodingContext::encodeCompact(const int64_t& arg, Buffer::Instance& dst) {
   return encode(arg, dst);
 }
 
+/**
+ * int8_t is not encoded in compact fashion, so we just delegate to normal implementation.
+ */
+template <>
+inline uint32_t EncodingContext::encodeCompact(const int8_t& arg, Buffer::Instance& dst) {
+  return encode(arg, dst);
+}
+
+/**
+ * int16_t is not encoded in compact fashion, so we just delegate to normal implementation.
+ */
+template <>
+inline uint32_t EncodingContext::encodeCompact(const int16_t& arg, Buffer::Instance& dst) {
+  return encode(arg, dst);
+}
+
+/**
+ * uint16_t is not encoded in compact fashion, so we just delegate to normal implementation.
+ */
+template <>
+inline uint32_t EncodingContext::encodeCompact(const uint16_t& arg, Buffer::Instance& dst) {
+  return encode(arg, dst);
+}
+
+/**
+ * bool is not encoded in compact fashion, so we just delegate to normal implementation.
+ */
+template <> inline uint32_t EncodingContext::encodeCompact(const bool& arg, Buffer::Instance& dst) {
+  return encode(arg, dst);
+}
+
+/**
+ * double is not encoded in compact fashion, so we just delegate to normal implementation.
+ */
+template <>
+inline uint32_t EncodingContext::encodeCompact(const double& arg, Buffer::Instance& dst) {
+  return encode(arg, dst);
+}
+
 /**
  * Template overload for variable-length uint32_t (VAR_UINT).
  * Encode the value in 7-bit chunks + marker if field is the last one.
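For contrast with the fixed-width delegations above, the VAR_UINT overload that follows does have a genuinely different compact form. A minimal standalone sketch of the 7-bit-chunk scheme (assumed semantics, not the Envoy implementation): the high bit of each output byte is a continuation marker, set while more chunks follow and clear on the last one.

```cpp
#include <cstdint>
#include <vector>

// Kafka-style unsigned varint: emit the value in 7-bit chunks, low chunk
// first; the high bit of each byte signals that another chunk follows.
uint32_t encodeVarUInt32(uint32_t value, std::vector<uint8_t>& dst) {
  uint32_t written = 1;
  while (value >= 0x80) {
    dst.push_back(static_cast<uint8_t>(value) | 0x80); // payload + continuation bit
    value >>= 7;
    ++written;
  }
  dst.push_back(static_cast<uint8_t>(value)); // final chunk, high bit clear
  return written;
}

// Example: 300 encodes as {0xAC, 0x02} -- two bytes instead of four. This is
// why uint32_t gets a real compact overload while the fixed-width types above
// simply forward to encode().
```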