diff --git a/backend/BSR_INTEGRATION.md b/backend/BSR_INTEGRATION.md new file mode 100644 index 0000000000..2aaa52041a --- /dev/null +++ b/backend/BSR_INTEGRATION.md @@ -0,0 +1,190 @@ +# Buf Schema Registry (BSR) Integration + +This document describes the integration of Buf Schema Registry (BSR) support in Redpanda Console. + +## Overview + +Redpanda Console now supports deserializing Protobuf messages that use Buf Schema Registry (BSR) for schema management. Unlike Confluent Schema Registry, clients of BSR (including Bufstream) stores message type and commit information in Kafka record headers rather than embedding schema IDs in the message payload. + +## Wire Format + +BSR Kafka clients use a different wire format compared to Confluent Schema Registry: + +- **Confluent Schema Registry**: `[magic_byte][schema_id][index_array][protobuf_payload]` +- **BSR Kafka clients**: Plain protobuf payload with metadata in headers + +### Record Headers + +BSR Kafka clieints stores schema information in Kafka record headers: + +- **`buf.registry.value.schema.message`**: Fully qualified message name (e.g., `com.example.EmailUpdated`) +- **`buf.registry.value.schema.commit`**: BSR commit ID for the schema version + +## Configuration + +To enable BSR support, add the following configuration to your `config.yaml`: + +```yaml +bsr: + enabled: true + url: "https://buf.build" # or your BSR instance URL + token: "your-bsr-auth-token" +``` + +### Configuration Options + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `enabled` | boolean | Yes | Enable BSR integration | +| `url` | string | Yes | BSR API endpoint URL (e.g., `https://buf.build` or `https://demo.buf.dev`) | +| `token` | string | Yes | Authentication token for BSR API | + +### Environment Variables + +You can also configure BSR using environment variables: + +```bash +export CONFIG_BSR_ENABLED=true +export CONFIG_BSR_URL=https://buf.build +export CONFIG_BSR_TOKEN=your-bsr-auth-token +``` + +## How It Works + +### Deserialization Flow + +1. **Header Extraction**: When a message is consumed, the BSR serde checks for BSR headers in the Kafka record +2. **Descriptor Lookup**: Using the message name and commit from headers, the BSR client fetches the protobuf descriptor from BSR +3. **Binary Decoding**: The raw protobuf payload is unmarshaled using the fetched descriptor +4. **JSON Conversion**: The protobuf message is converted to JSON for display in the console + +### Caching + +The BSR client implements intelligent caching to minimize API calls: + +- **Positive Cache**: Successfully fetched descriptors are cached for 1 hour +- **Negative Cache**: Failed lookups are cached for 1 minute to avoid repeated failures +- Cache keys are based on `(message_name, commit)` pairs + +### API Integration + +The integration uses the Buf Connect API: + +- **Endpoint**: `/buf.registry.module.v1.FileDescriptorSetService/GetFileDescriptorSet` +- **Protocol**: gRPC via Connect +- **Authentication**: Bearer token in `Authorization` header + +## Example Usage + +### Producing Messages with BSR Headers + +When producing messages, ensure you include the BSR headers: + +```go +import ( + "github.com/twmb/franz-go/pkg/kgo" +) + +record := &kgo.Record{ + Topic: "my-topic", + Value: protoBytes, // raw protobuf payload + Headers: []kgo.RecordHeader{ + { + Key: "buf.registry.value.schema.message", + Value: []byte("com.example.v1.EmailUpdated"), + }, + { + Key: "buf.registry.value.schema.commit", + Value: []byte("abc123def456"), // BSR commit ID + }, + }, +} +``` + +### Viewing Messages in Console + +Once configured, messages with BSR headers will automatically be deserialized and displayed as JSON in the Redpanda Console message viewer. + +## Implementation Details + +### Key Components + +1. **`pkg/config/bsr.go`**: BSR configuration structure +2. **`pkg/bsr/client.go`**: BSR API client with caching +3. **`pkg/serde/protobuf_bsr.go`**: BSR deserialization logic +4. **`pkg/serde/service.go`**: Integration with serde service +5. **`pkg/console/service.go`**: Console service initialization + +### Serde Priority + +BSR serde is registered in the deserialization chain after: +- Null +- JSON +- JSON Schema +- XML +- Avro +- Plain Protobuf (with topic mappings) +- Protobuf Schema Registry + +This ensures BSR is only attempted when other formats don't match. + +## Comparison with Confluent Schema Registry + +| Feature | Confluent Schema Registry | BSR | +|---------|---------------------------|-----| +| Schema ID Location | Embedded in payload | Headers | +| Wire Format | Custom (magic byte + schema ID) | Plain protobuf | +| Message Type Info | Index array in payload | Fully qualified name in header | +| Version Info | Schema ID (integer) | Commit ID (string) | +| Compatibility | Kafka ecosystem standard | Buf ecosystem | + +## Troubleshooting + +### Messages Not Deserializing + +1. **Check Headers**: Ensure messages have both required BSR headers + ``` + buf.registry.value.schema.message + buf.registry.value.schema.commit + ``` + +2. **Verify Configuration**: Confirm BSR is enabled and credentials are correct + ```bash + # Test BSR API access + curl -H "Authorization: Bearer $TOKEN" \ + "https://buf.build/buf.registry.module.v1.FileDescriptorSetService/GetFileDescriptorSet" + ``` + +3. **Check Logs**: Look for BSR-related errors in console logs + ``` + level=error msg="failed to get message descriptor from BSR" + ``` + +### Common Errors + +- **"BSR client is not configured"**: BSR is not enabled in config +- **"header not found"**: Message is missing required BSR headers +- **"failed to call BSR API"**: Network or authentication issue +- **"failed to find message descriptor"**: Message type not found in commit + +## Limitations + +- Only supports value deserialization (not keys) - BSR typically only encodes values +- Requires network access to BSR API +- Serialization support is basic (no automatic header injection) + +## Future Enhancements + +Potential improvements for future releases: + +1. Support for BSR key encoding +2. Automatic header injection during message production +3. Support for BSR modules (not just commit IDs) +4. Configurable cache TTLs +5. Metrics for BSR API calls and cache hit rates + +## References + +- [Buf Schema Registry Documentation](https://buf.build/docs/bsr) +- [bsr-kafka-serde-go](https://github.com/bufbuild/bsr-kafka-serde-go) +- [Buf Connect Protocol](https://connectrpc.com/) diff --git a/backend/go.mod b/backend/go.mod index 3acf3a6441..b391dc5e03 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -74,6 +74,8 @@ require ( ) require ( + buf.build/gen/go/bufbuild/registry/connectrpc/go v1.19.1-20251027152159-f1066ce064ca.2 // indirect + buf.build/gen/go/bufbuild/registry/protocolbuffers/go v1.36.10-20251027152159-f1066ce064ca.1 // indirect cel.dev/expr v0.24.0 // indirect cuelang.org/go v0.14.1 // indirect dario.cat/mergo v1.0.2 // indirect diff --git a/backend/go.sum b/backend/go.sum index 01d486430b..46486a7e15 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -2,6 +2,10 @@ buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-2025091214101 buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.9-20250912141014-52f32327d4b0.1/go.mod h1:aY3zbkNan5F+cGm9lITDP6oxJIwu0dn9KjJuJjWaHkg= buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1 h1:31on4W/yPcV4nZHL4+UCiCvLPsMqe/vJcNg8Rci0scc= buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.10-20250912141014-52f32327d4b0.1/go.mod h1:fUl8CEN/6ZAMk6bP8ahBJPUJw7rbp+j4x+wCcYi2IG4= +buf.build/gen/go/bufbuild/registry/connectrpc/go v1.19.1-20251027152159-f1066ce064ca.2 h1:Dbh4Edwy5qHlz1/boPAQ7T5Q7ZDMgEuQlEbXa94+JEo= +buf.build/gen/go/bufbuild/registry/connectrpc/go v1.19.1-20251027152159-f1066ce064ca.2/go.mod h1:SqqTA3aiYVDkpDINxgbxDT6QBjkVjdqUXtbiz6DiWIg= +buf.build/gen/go/bufbuild/registry/protocolbuffers/go v1.36.10-20251027152159-f1066ce064ca.1 h1:5tUFlRgcC+N2JJtjwlwyb2J4bBk/bJYLXk50zlewtzk= +buf.build/gen/go/bufbuild/registry/protocolbuffers/go v1.36.10-20251027152159-f1066ce064ca.1/go.mod h1:AaYXXeRvnOc151wEuupAmn58Mh9bccKce2kk3QKMIrQ= buf.build/gen/go/connectrpc/eliza/connectrpc/go v1.15.0-20230913231627-233fca715f49.1 h1:F1ZMhxV6+MRxWtc1aT+vH9+yZLB4KrhlhQmtuDxqTYQ= buf.build/gen/go/connectrpc/eliza/connectrpc/go v1.15.0-20230913231627-233fca715f49.1/go.mod h1:OZPBPnAuuFcUf5WHYm5pIXkUhIy7Pp6dzV4W2Zbc2/c= buf.build/gen/go/connectrpc/eliza/protocolbuffers/go v1.33.0-20230913231627-233fca715f49.1 h1:bHffCjg+jKMaDnUeYjBJXHAlH659fX4N1YExnWl5wFU= diff --git a/backend/pkg/api/connect/service/console/mapper.go b/backend/pkg/api/connect/service/console/mapper.go index 35d9995872..218e131c13 100644 --- a/backend/pkg/api/connect/service/console/mapper.go +++ b/backend/pkg/api/connect/service/console/mapper.go @@ -50,6 +50,8 @@ func rpcPublishMessagePayloadOptionsToSerializeInput(po *v1alpha.PublishMessageP encoding = serde.PayloadEncodingBinary case v1alpha.PayloadEncoding_PAYLOAD_ENCODING_CBOR: encoding = serde.PayloadEncodingCbor + case v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF_BSR: + encoding = serde.PayloadEncodingProtobufBSR } input := &serde.RecordPayloadInput{ @@ -121,6 +123,8 @@ func toProtoEncoding(serdeEncoding serde.PayloadEncoding) v1alpha.PayloadEncodin encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_UNSPECIFIED case serde.PayloadEncodingCbor: encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_CBOR + case serde.PayloadEncodingProtobufBSR: + encoding = v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF_BSR } return encoding @@ -162,6 +166,8 @@ func fromProtoEncoding(protoEncoding v1alpha.PayloadEncoding) serde.PayloadEncod encoding = serde.PayloadEncodingUnspecified case v1alpha.PayloadEncoding_PAYLOAD_ENCODING_CBOR: encoding = serde.PayloadEncodingCbor + case v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF_BSR: + encoding = serde.PayloadEncodingProtobufBSR } return encoding diff --git a/backend/pkg/api/connect/service/console/mapper_test.go b/backend/pkg/api/connect/service/console/mapper_test.go new file mode 100644 index 0000000000..15b160403a --- /dev/null +++ b/backend/pkg/api/connect/service/console/mapper_test.go @@ -0,0 +1,175 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package console + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + v1alpha "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/console/v1alpha1" + "github.com/redpanda-data/console/backend/pkg/serde" +) + +func TestToProtoEncoding(t *testing.T) { + tests := []struct { + name string + serdeEncoding serde.PayloadEncoding + expectedProto v1alpha.PayloadEncoding + }{ + { + name: "ProtobufBSR encoding", + serdeEncoding: serde.PayloadEncodingProtobufBSR, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF_BSR, + }, + { + name: "Protobuf encoding", + serdeEncoding: serde.PayloadEncodingProtobuf, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF, + }, + { + name: "ProtobufSchema encoding", + serdeEncoding: serde.PayloadEncodingProtobufSchema, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF_SCHEMA, + }, + { + name: "JSON encoding", + serdeEncoding: serde.PayloadEncodingJSON, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_JSON, + }, + { + name: "JSONSchema encoding", + serdeEncoding: serde.PayloadEncodingJSONSchema, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_JSON_SCHEMA, + }, + { + name: "Avro encoding", + serdeEncoding: serde.PayloadEncodingAvro, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_AVRO, + }, + { + name: "Binary encoding", + serdeEncoding: serde.PayloadEncodingBinary, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_BINARY, + }, + { + name: "Cbor encoding", + serdeEncoding: serde.PayloadEncodingCbor, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_CBOR, + }, + { + name: "Null encoding", + serdeEncoding: serde.PayloadEncodingNull, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_NULL, + }, + { + name: "Unspecified encoding", + serdeEncoding: serde.PayloadEncodingUnspecified, + expectedProto: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_UNSPECIFIED, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := toProtoEncoding(tt.serdeEncoding) + assert.Equal(t, tt.expectedProto, result) + }) + } +} + +func TestFromProtoEncoding(t *testing.T) { + tests := []struct { + name string + protoEncoding v1alpha.PayloadEncoding + expectedSerde serde.PayloadEncoding + }{ + { + name: "ProtobufBSR encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF_BSR, + expectedSerde: serde.PayloadEncodingProtobufBSR, + }, + { + name: "Protobuf encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF, + expectedSerde: serde.PayloadEncodingProtobuf, + }, + { + name: "ProtobufSchema encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF_SCHEMA, + expectedSerde: serde.PayloadEncodingProtobufSchema, + }, + { + name: "JSON encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_JSON, + expectedSerde: serde.PayloadEncodingJSON, + }, + { + name: "JSONSchema encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_JSON_SCHEMA, + expectedSerde: serde.PayloadEncodingJSONSchema, + }, + { + name: "Avro encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_AVRO, + expectedSerde: serde.PayloadEncodingAvro, + }, + { + name: "Binary encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_BINARY, + expectedSerde: serde.PayloadEncodingBinary, + }, + { + name: "Cbor encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_CBOR, + expectedSerde: serde.PayloadEncodingCbor, + }, + { + name: "Null encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_NULL, + expectedSerde: serde.PayloadEncodingNull, + }, + { + name: "Unspecified encoding", + protoEncoding: v1alpha.PayloadEncoding_PAYLOAD_ENCODING_UNSPECIFIED, + expectedSerde: serde.PayloadEncodingUnspecified, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := fromProtoEncoding(tt.protoEncoding) + assert.Equal(t, tt.expectedSerde, result) + }) + } +} + +func TestEncodingRoundTrip(t *testing.T) { + // Test that converting from serde -> proto -> serde yields the same result + encodings := []serde.PayloadEncoding{ + serde.PayloadEncodingProtobufBSR, + serde.PayloadEncodingProtobuf, + serde.PayloadEncodingProtobufSchema, + serde.PayloadEncodingJSON, + serde.PayloadEncodingJSONSchema, + serde.PayloadEncodingAvro, + serde.PayloadEncodingBinary, + serde.PayloadEncodingCbor, + serde.PayloadEncodingNull, + serde.PayloadEncodingUnspecified, + } + + for _, encoding := range encodings { + t.Run(string(encoding), func(t *testing.T) { + proto := toProtoEncoding(encoding) + result := fromProtoEncoding(proto) + assert.Equal(t, encoding, result, "Round trip conversion should preserve encoding") + }) + } +} diff --git a/backend/pkg/bsr/client.go b/backend/pkg/bsr/client.go new file mode 100644 index 0000000000..a390119d38 --- /dev/null +++ b/backend/pkg/bsr/client.go @@ -0,0 +1,270 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package bsr + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net/http" + "sync" + "time" + + modulev1connect "buf.build/gen/go/bufbuild/registry/connectrpc/go/buf/registry/module/v1/modulev1connect" + modulev1 "buf.build/gen/go/bufbuild/registry/protocolbuffers/go/buf/registry/module/v1" + "connectrpc.com/connect" + "github.com/bufbuild/protocompile/linker" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" + + "github.com/redpanda-data/console/backend/pkg/config" +) + +// Client is a wrapper around the Buf Schema Registry API for fetching protobuf descriptors. +type Client struct { + cfg config.BSR + logger *slog.Logger + + // Connect RPC client for FileDescriptorSetService + fileDescriptorSetClient modulev1connect.FileDescriptorSetServiceClient + + // Cache for file descriptor sets to avoid repeated API calls + cache map[cacheKey]*cacheEntry + cacheMutex sync.RWMutex +} + +type cacheKey struct { + messageName string + commit string +} + +type cacheEntry struct { + files linker.Files + messageDesc protoreflect.MessageDescriptor + cachedAt time.Time + err error + negativeExpiry time.Time // for error caching +} + +const ( + // Cache entries for 1 hour + cacheTTL = 1 * time.Hour + // Negative cache (errors) for 1 minute + negativeCacheTTL = 1 * time.Minute +) + +// bearerTokenInterceptor creates a Connect interceptor that adds Authorization header +func bearerTokenInterceptor(token string) connect.UnaryInterceptorFunc { + return func(next connect.UnaryFunc) connect.UnaryFunc { + return func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { + if token != "" { + req.Header().Set("Authorization", "Bearer "+token) + } + return next(ctx, req) + } + } +} + +// NewClient creates a new BSR client. +func NewClient(cfg config.BSR, logger *slog.Logger) (*Client, error) { + if !cfg.Enabled { + return nil, errors.New("BSR is not enabled") + } + + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid BSR config: %w", err) + } + + // Create HTTP client with timeout + httpClient := &http.Client{ + Timeout: 30 * time.Second, + } + + // Create Connect client with bearer token interceptor + fileDescriptorSetClient := modulev1connect.NewFileDescriptorSetServiceClient( + httpClient, + cfg.URL, // Should be full URL like https://bufbuild.internal + connect.WithInterceptors(bearerTokenInterceptor(cfg.Token)), + ) + + return &Client{ + cfg: cfg, + logger: logger, + fileDescriptorSetClient: fileDescriptorSetClient, + cache: make(map[cacheKey]*cacheEntry), + }, nil +} + +// GetMessageDescriptor fetches a message descriptor from BSR by message name and commit. +func (c *Client) GetMessageDescriptor(ctx context.Context, messageName, commit string) (protoreflect.MessageDescriptor, error) { + if messageName == "" { + return nil, errors.New("message name is empty") + } + if commit == "" { + return nil, errors.New("commit is empty") + } + + key := cacheKey{messageName: messageName, commit: commit} + + // Check cache first + c.cacheMutex.RLock() + entry, exists := c.cache[key] + c.cacheMutex.RUnlock() + + if exists { + if entry.err != nil { + // Return cached error if within negative cache window + if time.Now().Before(entry.negativeExpiry) { + return nil, entry.err + } + } else if time.Since(entry.cachedAt) < cacheTTL { + // Return cached success + return entry.messageDesc, nil + } + } + + // Fetch from BSR + _, messageDesc, err := c.fetchFromBSR(ctx, messageName, commit) + + // Update cache + c.cacheMutex.Lock() + if err != nil { + c.cache[key] = &cacheEntry{ + err: err, + negativeExpiry: time.Now().Add(negativeCacheTTL), + } + } else { + // Update or create cache entry with both files and descriptor + if entry, exists := c.cache[key]; exists { + entry.messageDesc = messageDesc + entry.cachedAt = time.Now() + } else { + c.cache[key] = &cacheEntry{ + messageDesc: messageDesc, + cachedAt: time.Now(), + } + } + } + c.cacheMutex.Unlock() + + return messageDesc, err +} + +// GetFileDescriptorSet fetches the complete file descriptor set from BSR. +// This is used for the protojson resolver when marshaling to JSON. +// Returns linker.Files which has an AsResolver() method for protojson. +func (c *Client) GetFileDescriptorSet(ctx context.Context, messageName, commit string) (linker.Files, error) { + if messageName == "" { + return nil, errors.New("message name is empty") + } + if commit == "" { + return nil, errors.New("commit is empty") + } + + key := cacheKey{messageName: messageName, commit: commit} + + // Check cache first + c.cacheMutex.RLock() + entry, exists := c.cache[key] + c.cacheMutex.RUnlock() + + if exists { + if entry.err != nil { + if time.Now().Before(entry.negativeExpiry) { + return nil, entry.err + } + } else if time.Since(entry.cachedAt) < cacheTTL { + return entry.files, nil + } + } + + // Fetch from BSR (this will also populate the cache) + files, _, err := c.fetchFromBSR(ctx, messageName, commit) + return files, err +} + +// fetchFromBSR fetches the file descriptor set from BSR via the Connect API. +func (c *Client) fetchFromBSR(ctx context.Context, messageName, commit string) (linker.Files, protoreflect.MessageDescriptor, error) { + // Build request - ResourceRef with Id value to specify commit + req := connect.NewRequest(&modulev1.GetFileDescriptorSetRequest{ + ResourceRef: &modulev1.ResourceRef{ + Value: &modulev1.ResourceRef_Id{ + Id: commit, + }, + }, + IncludeTypes: []string{messageName}, + }) + + // Call BSR API using the generated client (interceptor handles auth) + resp, err := c.fileDescriptorSetClient.GetFileDescriptorSet(ctx, req) + if err != nil { + return nil, nil, fmt.Errorf("failed to call BSR API: %w", err) + } + + if resp.Msg == nil || resp.Msg.FileDescriptorSet == nil { + return nil, nil, errors.New("BSR returned empty response") + } + + // Convert FileDescriptorSet to protoregistry.Files + protoFiles, err := protodesc.NewFiles(resp.Msg.FileDescriptorSet) + if err != nil { + return nil, nil, fmt.Errorf("failed to create proto files from descriptor set: %w", err) + } + + // Convert protoregistry.Files to linker.Files + var linkerFiles linker.Files + protoFiles.RangeFiles(func(fd protoreflect.FileDescriptor) bool { + file, err := linker.NewFileRecursive(fd) + if err != nil { + c.logger.Error("failed to create linker file", "file", fd.Path(), "error", err) + return true // continue + } + linkerFiles = append(linkerFiles, file) + return true + }) + + if len(linkerFiles) == 0 { + return nil, nil, errors.New("no valid files in descriptor set") + } + + // Find the message descriptor by fully qualified name + messageDesc, err := findMessageDescriptor(protoFiles, messageName) + if err != nil { + return nil, nil, fmt.Errorf("failed to find message descriptor for %q: %w", messageName, err) + } + + // Cache both files and message descriptor + c.cacheMutex.Lock() + c.cache[cacheKey{messageName: messageName, commit: commit}] = &cacheEntry{ + files: linkerFiles, + messageDesc: messageDesc, + cachedAt: time.Now(), + } + c.cacheMutex.Unlock() + + return linkerFiles, messageDesc, nil +} + +// findMessageDescriptor searches for a message descriptor by fully qualified name. +func findMessageDescriptor(files *protoregistry.Files, fullName string) (protoreflect.MessageDescriptor, error) { + desc, err := files.FindDescriptorByName(protoreflect.FullName(fullName)) + if err != nil { + return nil, err + } + + messageDesc, ok := desc.(protoreflect.MessageDescriptor) + if !ok { + return nil, fmt.Errorf("descriptor for %q is not a message descriptor", fullName) + } + + return messageDesc, nil +} diff --git a/backend/pkg/config/bsr.go b/backend/pkg/config/bsr.go new file mode 100644 index 0000000000..7f4e08e25c --- /dev/null +++ b/backend/pkg/config/bsr.go @@ -0,0 +1,47 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package config + +import ( + "errors" + "fmt" + "net/url" +) + +// BSR Config for using Buf Schema Registry (BSR) +type BSR struct { + Enabled bool `yaml:"enabled"` + URL string `yaml:"url"` + + // Token is the authentication token for the BSR + Token string `yaml:"token"` +} + +// Validate the BSR configurations. +func (c *BSR) Validate() error { + if !c.Enabled { + return nil + } + + if c.URL == "" { + return errors.New("BSR is enabled but no URL is configured") + } + + _, err := url.Parse(c.URL) + if err != nil { + return fmt.Errorf("failed to parse BSR url %q: %w", c.URL, err) + } + + if c.Token == "" { + return errors.New("BSR is enabled but no authentication token is configured") + } + + return nil +} diff --git a/backend/pkg/config/config.go b/backend/pkg/config/config.go index 8f1f06e586..ac8e664d27 100644 --- a/backend/pkg/config/config.go +++ b/backend/pkg/config/config.go @@ -42,6 +42,7 @@ type Config struct { Kafka Kafka `yaml:"kafka"` Serde Serde `yaml:"serde"` SchemaRegistry Schema `yaml:"schemaRegistry"` + BSR BSR `yaml:"bsr"` Logger Logging `yaml:"logger"` Analytics Analytics `yaml:"analytics"` } @@ -96,6 +97,11 @@ func (c *Config) Validate() error { return err } + err = c.BSR.Validate() + if err != nil { + return fmt.Errorf("failed to validate BSR config: %w", err) + } + err = c.Analytics.Validate() if err != nil { return fmt.Errorf("failed to validate Analytics config: %w", err) diff --git a/backend/pkg/console/service.go b/backend/pkg/console/service.go index 5de7e9c0a0..093e8433dd 100644 --- a/backend/pkg/console/service.go +++ b/backend/pkg/console/service.go @@ -21,6 +21,7 @@ import ( "github.com/twmb/franz-go/pkg/kversion" "github.com/redpanda-data/console/backend/pkg/backoff" + "github.com/redpanda-data/console/backend/pkg/bsr" "github.com/redpanda-data/console/backend/pkg/config" "github.com/redpanda-data/console/backend/pkg/connect" kafkafactory "github.com/redpanda-data/console/backend/pkg/factory/kafka" @@ -104,7 +105,16 @@ func NewService( return nil, fmt.Errorf("failed to create schema client: %w", err) } } - serdeSvc, err := serde.NewService(protoSvc, msgPackSvc, cachedSchemaClient, cfg.Serde.Cbor) + + var bsrClient *bsr.Client + if cfg.BSR.Enabled { + bsrClient, err = bsr.NewClient(cfg.BSR, loggerpkg.Named(logger, "bsr_client")) + if err != nil { + return nil, fmt.Errorf("failed to create BSR client: %w", err) + } + } + + serdeSvc, err := serde.NewService(protoSvc, msgPackSvc, cachedSchemaClient, bsrClient, cfg.Serde.Cbor) if err != nil { return nil, fmt.Errorf("failed creating serde service: %w", err) } diff --git a/backend/pkg/protogen/redpanda/api/console/v1alpha1/common.pb.go b/backend/pkg/protogen/redpanda/api/console/v1alpha1/common.pb.go index b2ed0a69cd..b5174d6299 100644 --- a/backend/pkg/protogen/redpanda/api/console/v1alpha1/common.pb.go +++ b/backend/pkg/protogen/redpanda/api/console/v1alpha1/common.pb.go @@ -98,6 +98,7 @@ const ( PayloadEncoding_PAYLOAD_ENCODING_UINT PayloadEncoding = 13 PayloadEncoding_PAYLOAD_ENCODING_CONSUMER_OFFSETS PayloadEncoding = 14 PayloadEncoding_PAYLOAD_ENCODING_CBOR PayloadEncoding = 15 + PayloadEncoding_PAYLOAD_ENCODING_PROTOBUF_BSR PayloadEncoding = 16 ) // Enum value maps for PayloadEncoding. @@ -119,6 +120,7 @@ var ( 13: "PAYLOAD_ENCODING_UINT", 14: "PAYLOAD_ENCODING_CONSUMER_OFFSETS", 15: "PAYLOAD_ENCODING_CBOR", + 16: "PAYLOAD_ENCODING_PROTOBUF_BSR", } PayloadEncoding_value = map[string]int32{ "PAYLOAD_ENCODING_UNSPECIFIED": 0, @@ -137,6 +139,7 @@ var ( "PAYLOAD_ENCODING_UINT": 13, "PAYLOAD_ENCODING_CONSUMER_OFFSETS": 14, "PAYLOAD_ENCODING_CBOR": 15, + "PAYLOAD_ENCODING_PROTOBUF_BSR": 16, } ) diff --git a/backend/pkg/serde/protobuf_bsr.go b/backend/pkg/serde/protobuf_bsr.go new file mode 100644 index 0000000000..de8f3fc37a --- /dev/null +++ b/backend/pkg/serde/protobuf_bsr.go @@ -0,0 +1,148 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package serde + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/twmb/franz-go/pkg/kgo" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/dynamicpb" +) + +var _ Serde = (*ProtobufBSRSerde)(nil) + +const ( + // BSR header keys for message type and commit + headerBSRMessageKey = "buf.registry.value.schema.message" + headerBSRCommitKey = "buf.registry.value.schema.commit" +) + +// ProtobufBSRSerde represents the serde for dealing with Protobuf messages +// stored with Buf Schema Registry (BSR) metadata in record headers. +type ProtobufBSRSerde struct { + bsrClient BSRClient +} + +// Name returns the name of the serde payload encoding. +func (ProtobufBSRSerde) Name() PayloadEncoding { + return PayloadEncodingProtobufBSR +} + +// DeserializePayload deserializes the kafka record to our internal record payload representation. +// BSR uses a different wire format than Confluent Schema Registry: +// - No magic byte or schema ID in the payload +// - Message type and commit are stored in record headers +// - Payload is just the raw protobuf binary +func (d ProtobufBSRSerde) DeserializePayload(ctx context.Context, record *kgo.Record, payloadType PayloadType) (*RecordPayload, error) { + if d.bsrClient == nil { + return &RecordPayload{}, errors.New("BSR client is not configured") + } + + // Extract payload (key or value) + payload := payloadFromRecord(record, payloadType) + if len(payload) == 0 { + return &RecordPayload{}, errors.New("payload is empty") + } + + // Extract BSR headers + // Note: We use "value" headers even for key payloads, as BSR typically only encodes values + // If BSR also supports key encoding, you may need to conditionally use "key" headers + messageName, err := extractHeader(record, headerBSRMessageKey) + if err != nil { + return &RecordPayload{}, fmt.Errorf("failed to extract message name from header: %w", err) + } + + commit, err := extractHeader(record, headerBSRCommitKey) + if err != nil { + return &RecordPayload{}, fmt.Errorf("failed to extract commit from header: %w", err) + } + + // Fetch message descriptor from BSR + messageDescriptor, err := d.bsrClient.GetMessageDescriptor(ctx, messageName, commit) + if err != nil { + return &RecordPayload{}, fmt.Errorf("failed to get message descriptor from BSR: %w", err) + } + + // Create dynamic protobuf message and unmarshal + protoMessage := dynamicpb.NewMessage(messageDescriptor) + err = proto.Unmarshal(payload, protoMessage) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal protobuf encoded record: %w", err) + } + + // Get file descriptor set for resolver + files, err := d.bsrClient.GetFileDescriptorSet(ctx, messageName, commit) + if err != nil { + return &RecordPayload{}, fmt.Errorf("failed to get file descriptor set from BSR: %w", err) + } + + // Marshal proto message into JSON + o := protojson.MarshalOptions{ + UseProtoNames: false, // use lowerCamelCase + EmitUnpopulated: true, + EmitDefaultValues: true, + Resolver: files.AsResolver(), // linker.Files.AsResolver() returns protojson.Resolver + } + jsonBytes, err := o.Marshal(protoMessage) + if err != nil { + return nil, fmt.Errorf("unable to marshal protobuf message as JSON: %w", err) + } + + var native any + err = json.Unmarshal(jsonBytes, &native) + if err != nil { + return &RecordPayload{}, fmt.Errorf("failed to deserialize JSON payload: %w", err) + } + + result := &RecordPayload{ + DeserializedPayload: native, + NormalizedPayload: jsonBytes, + Encoding: PayloadEncodingProtobufBSR, + PayloadSizeBytes: len(payload), + // BSR doesn't use schema IDs like Confluent, so we leave SchemaID as nil + SchemaID: nil, + } + + return result, nil +} + +// SerializeObject serializes data into binary format ready for writing to Kafka as a record. +// For BSR, this would only handle the protobuf payload itself, not the headers. +// Headers must be set separately by the caller. +func (ProtobufBSRSerde) SerializeObject(_ context.Context, obj any, _ PayloadType, _ ...SerdeOpt) ([]byte, error) { + switch v := obj.(type) { + case proto.Message: + b, err := proto.Marshal(v) + if err != nil { + return nil, fmt.Errorf("failed to serialize protobuf payload: %w", err) + } + return b, nil + default: + return nil, fmt.Errorf("unsupported object type for BSR serialization: %T", obj) + } +} + +// extractHeader extracts a header value from a Kafka record by key. +func extractHeader(record *kgo.Record, key string) (string, error) { + for _, header := range record.Headers { + if header.Key == key { + if len(header.Value) == 0 { + return "", fmt.Errorf("header %q is empty", key) + } + return string(header.Value), nil + } + } + return "", fmt.Errorf("header %q not found", key) +} diff --git a/backend/pkg/serde/protobuf_bsr_test.go b/backend/pkg/serde/protobuf_bsr_test.go new file mode 100644 index 0000000000..eff35d02a7 --- /dev/null +++ b/backend/pkg/serde/protobuf_bsr_test.go @@ -0,0 +1,359 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package serde + +import ( + "context" + "testing" + "time" + + "github.com/bufbuild/protocompile/linker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/twmb/franz-go/pkg/kgo" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/known/timestamppb" + + shopv1 "github.com/redpanda-data/console/backend/pkg/serde/testdata/proto/gen/shop/v1" +) + +// mockBSRClient implements the BSRClient interface for testing +type mockBSRClient struct { + messageDescriptor protoreflect.MessageDescriptor + files linker.Files + err error +} + +func (m *mockBSRClient) GetMessageDescriptor(_ context.Context, _, _ string) (protoreflect.MessageDescriptor, error) { + if m.err != nil { + return nil, m.err + } + return m.messageDescriptor, nil +} + +func (m *mockBSRClient) GetFileDescriptorSet(_ context.Context, _, _ string) (linker.Files, error) { + if m.err != nil { + return linker.Files{}, m.err + } + return m.files, nil +} + +func TestProtobufBSRSerde_Name(t *testing.T) { + serde := ProtobufBSRSerde{} + assert.Equal(t, PayloadEncodingProtobufBSR, serde.Name()) +} + +func TestProtobufBSRSerde_DeserializePayload(t *testing.T) { + // Create test message + orderCreatedAt := time.Date(2023, time.June, 10, 13, 0, 0, 0, time.UTC) + msg := shopv1.Order{ + Id: "test-order-123", + CreatedAt: timestamppb.New(orderCreatedAt), + } + + msgData, err := proto.Marshal(&msg) + require.NoError(t, err) + + // Get message descriptor and files for mock + msgDesc := msg.ProtoReflect().Descriptor() + fileDesc := msgDesc.ParentFile() + + // Create a linker.Files from the file descriptor + linkerFile, err := linker.NewFileRecursive(fileDesc) + require.NoError(t, err) + files := linker.Files{linkerFile} + + tests := []struct { + name string + record *kgo.Record + payloadType PayloadType + bsrClient BSRClient + wantErr bool + wantErrMsg string + validateRes func(t *testing.T, res *RecordPayload) + }{ + { + name: "successful deserialization", + record: &kgo.Record{ + Value: msgData, + Headers: []kgo.RecordHeader{ + {Key: "buf.registry.value.schema.message", Value: []byte("shop.v1.Order")}, + {Key: "buf.registry.value.schema.commit", Value: []byte("abc123")}, + }, + }, + payloadType: PayloadTypeValue, + bsrClient: &mockBSRClient{ + messageDescriptor: msgDesc, + files: files, + }, + wantErr: false, + validateRes: func(t *testing.T, res *RecordPayload) { + assert.Equal(t, PayloadEncodingProtobufBSR, res.Encoding) + assert.NotNil(t, res.DeserializedPayload) + assert.NotEmpty(t, res.NormalizedPayload) + assert.Nil(t, res.SchemaID) + assert.Equal(t, len(msgData), res.PayloadSizeBytes) + + // Validate the deserialized content + payload, ok := res.DeserializedPayload.(map[string]any) + require.True(t, ok, "expected payload to be a map") + assert.Equal(t, "test-order-123", payload["id"]) + }, + }, + { + name: "nil bsr client", + record: &kgo.Record{ + Value: msgData, + }, + payloadType: PayloadTypeValue, + bsrClient: nil, + wantErr: true, + wantErrMsg: "BSR client is not configured", + }, + { + name: "empty payload", + record: &kgo.Record{ + Value: []byte{}, + }, + payloadType: PayloadTypeValue, + bsrClient: &mockBSRClient{ + messageDescriptor: msgDesc, + files: files, + }, + wantErr: true, + wantErrMsg: "payload is empty", + }, + { + name: "missing message header", + record: &kgo.Record{ + Value: msgData, + Headers: []kgo.RecordHeader{ + {Key: "buf.registry.value.schema.commit", Value: []byte("abc123")}, + }, + }, + payloadType: PayloadTypeValue, + bsrClient: &mockBSRClient{ + messageDescriptor: msgDesc, + files: files, + }, + wantErr: true, + wantErrMsg: "failed to extract message name from header", + }, + { + name: "missing commit header", + record: &kgo.Record{ + Value: msgData, + Headers: []kgo.RecordHeader{ + {Key: "buf.registry.value.schema.message", Value: []byte("shop.v1.Order")}, + }, + }, + payloadType: PayloadTypeValue, + bsrClient: &mockBSRClient{ + messageDescriptor: msgDesc, + files: files, + }, + wantErr: true, + wantErrMsg: "failed to extract commit from header", + }, + { + name: "empty message header value", + record: &kgo.Record{ + Value: msgData, + Headers: []kgo.RecordHeader{ + {Key: "buf.registry.value.schema.message", Value: []byte{}}, + {Key: "buf.registry.value.schema.commit", Value: []byte("abc123")}, + }, + }, + payloadType: PayloadTypeValue, + bsrClient: &mockBSRClient{ + messageDescriptor: msgDesc, + files: files, + }, + wantErr: true, + wantErrMsg: "header \"buf.registry.value.schema.message\" is empty", + }, + { + name: "invalid protobuf data", + record: &kgo.Record{ + Value: []byte("not valid protobuf"), + Headers: []kgo.RecordHeader{ + {Key: "buf.registry.value.schema.message", Value: []byte("shop.v1.Order")}, + {Key: "buf.registry.value.schema.commit", Value: []byte("abc123")}, + }, + }, + payloadType: PayloadTypeValue, + bsrClient: &mockBSRClient{ + messageDescriptor: msgDesc, + files: files, + }, + wantErr: true, + wantErrMsg: "unable to unmarshal protobuf encoded record", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + serde := ProtobufBSRSerde{ + bsrClient: tt.bsrClient, + } + + res, err := serde.DeserializePayload(context.Background(), tt.record, tt.payloadType) + + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErrMsg) + return + } + + require.NoError(t, err) + if tt.validateRes != nil { + tt.validateRes(t, res) + } + }) + } +} + +func TestProtobufBSRSerde_SerializeObject(t *testing.T) { + orderCreatedAt := time.Date(2023, time.June, 10, 13, 0, 0, 0, time.UTC) + msg := &shopv1.Order{ + Id: "test-order-456", + CreatedAt: timestamppb.New(orderCreatedAt), + } + + tests := []struct { + name string + obj any + payloadType PayloadType + wantErr bool + wantErrMsg string + validate func(t *testing.T, data []byte) + }{ + { + name: "successful serialization", + obj: msg, + payloadType: PayloadTypeValue, + wantErr: false, + validate: func(t *testing.T, data []byte) { + // Verify we can unmarshal it back + var decoded shopv1.Order + err := proto.Unmarshal(data, &decoded) + require.NoError(t, err) + assert.Equal(t, "test-order-456", decoded.Id) + }, + }, + { + name: "unsupported object type", + obj: "not a proto message", + payloadType: PayloadTypeValue, + wantErr: true, + wantErrMsg: "unsupported object type for BSR serialization", + }, + { + name: "nil object", + obj: nil, + payloadType: PayloadTypeValue, + wantErr: true, + wantErrMsg: "unsupported object type for BSR serialization", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + serde := ProtobufBSRSerde{} + + data, err := serde.SerializeObject(context.Background(), tt.obj, tt.payloadType) + + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErrMsg) + return + } + + require.NoError(t, err) + if tt.validate != nil { + tt.validate(t, data) + } + }) + } +} + +func TestExtractHeader(t *testing.T) { + tests := []struct { + name string + record *kgo.Record + key string + wantValue string + wantErr bool + wantErrMsg string + }{ + { + name: "header exists", + record: &kgo.Record{ + Headers: []kgo.RecordHeader{ + {Key: "test-key", Value: []byte("test-value")}, + }, + }, + key: "test-key", + wantValue: "test-value", + wantErr: false, + }, + { + name: "header not found", + record: &kgo.Record{ + Headers: []kgo.RecordHeader{ + {Key: "other-key", Value: []byte("other-value")}, + }, + }, + key: "test-key", + wantErr: true, + wantErrMsg: "header \"test-key\" not found", + }, + { + name: "empty header value", + record: &kgo.Record{ + Headers: []kgo.RecordHeader{ + {Key: "test-key", Value: []byte{}}, + }, + }, + key: "test-key", + wantErr: true, + wantErrMsg: "header \"test-key\" is empty", + }, + { + name: "multiple headers, find correct one", + record: &kgo.Record{ + Headers: []kgo.RecordHeader{ + {Key: "header1", Value: []byte("value1")}, + {Key: "header2", Value: []byte("value2")}, + {Key: "header3", Value: []byte("value3")}, + }, + }, + key: "header2", + wantValue: "value2", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + value, err := extractHeader(tt.record, tt.key) + + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErrMsg) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.wantValue, value) + }) + } +} diff --git a/backend/pkg/serde/service.go b/backend/pkg/serde/service.go index 4745bc67e3..0090dc7972 100644 --- a/backend/pkg/serde/service.go +++ b/backend/pkg/serde/service.go @@ -13,7 +13,9 @@ import ( "context" "fmt" + "github.com/bufbuild/protocompile/linker" "github.com/twmb/franz-go/pkg/kgo" + "google.golang.org/protobuf/reflect/protoreflect" "github.com/redpanda-data/console/backend/pkg/config" "github.com/redpanda-data/console/backend/pkg/msgpack" @@ -21,6 +23,12 @@ import ( "github.com/redpanda-data/console/backend/pkg/schema" ) +// BSRClient is the interface for the BSR client that fetches protobuf descriptors. +type BSRClient interface { + GetMessageDescriptor(ctx context.Context, messageName, commit string) (protoreflect.MessageDescriptor, error) + GetFileDescriptorSet(ctx context.Context, messageName, commit string) (linker.Files, error) +} + // Service is the struct that holds all dependencies that are required to deserialize // a record. type Service struct { @@ -32,25 +40,43 @@ func NewService( protoSvc *proto.Service, msgPackSvc *msgpack.Service, cachedSchemaClient schema.Client, + bsrClient BSRClient, cborConfig config.Cbor, ) (*Service, error) { - return &Service{ - SerDes: []Serde{ - NullSerde{}, - JSONSerde{}, - JSONSchemaSerde{schemaClient: cachedSchemaClient}, - XMLSerde{}, - AvroSerde{schemaClient: cachedSchemaClient}, + serdes := []Serde{ + NullSerde{}, + JSONSerde{}, + JSONSchemaSerde{schemaClient: cachedSchemaClient}, + XMLSerde{}, + AvroSerde{schemaClient: cachedSchemaClient}, + } + + // Add BSR serde if client is configured - try before other protobuf serdes + // since BSR messages have specific headers that identify them + if bsrClient != nil { + serdes = append(serdes, ProtobufBSRSerde{bsrClient: bsrClient}, ProtobufSerde{ProtoSvc: protoSvc}, ProtobufSchemaSerde{schemaClient: cachedSchemaClient}, - MsgPackSerde{MsgPackService: msgPackSvc}, - SmileSerde{}, - CborSerde{Config: cborConfig}, - UTF8Serde{}, - TextSerde{}, - UintSerde{}, - BinarySerde{}, - }, + ) + } else { + serdes = append(serdes, + ProtobufSerde{ProtoSvc: protoSvc}, + ProtobufSchemaSerde{schemaClient: cachedSchemaClient}, + ) + } + + serdes = append(serdes, + MsgPackSerde{MsgPackService: msgPackSvc}, + SmileSerde{}, + CborSerde{Config: cborConfig}, + UTF8Serde{}, + TextSerde{}, + UintSerde{}, + BinarySerde{}, + ) + + return &Service{ + SerDes: serdes, }, nil } diff --git a/backend/pkg/serde/service_integration_test.go b/backend/pkg/serde/service_integration_test.go index 42c4b2cef8..829cc97f13 100644 --- a/backend/pkg/serde/service_integration_test.go +++ b/backend/pkg/serde/service_integration_test.go @@ -172,7 +172,7 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { cborConfig := config.Cbor{} - serdeSvc, err := NewService(protoSvc, mspPackSvc, cachedSchemaClient, cborConfig) + serdeSvc, err := NewService(protoSvc, mspPackSvc, cachedSchemaClient, nil, cborConfig) require.NoError(err) t.Run("plain JSON", func(t *testing.T) { @@ -560,7 +560,7 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { require.NoError(err) require.NoError(protoSvc.Start()) - serdeSvc, err := NewService(protoSvc, mspPackSvc, cachedSchemaClient, cborConfig) + serdeSvc, err := NewService(protoSvc, mspPackSvc, cachedSchemaClient, nil, cborConfig) require.NoError(err) orderCreatedAt := time.Date(2023, time.June, 10, 13, 0, 0, 0, time.UTC) @@ -721,7 +721,7 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { require.NoError(err) require.NoError(testProtoSvc.Start()) - serdeSvc, err := NewService(testProtoSvc, mspPackSvc, cachedSchemaClient, cborConfig) + serdeSvc, err := NewService(testProtoSvc, mspPackSvc, cachedSchemaClient, nil, cborConfig) require.NoError(err) orderCreatedAt := time.Date(2023, time.July, 15, 10, 0, 0, 0, time.UTC) @@ -2178,7 +2178,7 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { cachedSchemaClient2, err := schemacache.NewCachedClient(schemaClientFactory2, cacheNamespaceFn) require.NoError(err) - serdeSvc2, err := NewService(protoSvc2, mspPackSvc, cachedSchemaClient2, cborConfig) + serdeSvc2, err := NewService(protoSvc2, mspPackSvc, cachedSchemaClient2, nil, cborConfig) require.NoError(err) for _, cr := range records { @@ -2588,7 +2588,7 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { cborConfig := config.Cbor{} - serdeSvc, err := NewService(protoSvc, mspPackSvc, cachedSchemaClient, cborConfig) + serdeSvc, err := NewService(protoSvc, mspPackSvc, cachedSchemaClient, nil, cborConfig) require.NoError(err) var serde sr.Serde @@ -2826,7 +2826,7 @@ func (s *SerdeIntegrationTestSuite) TestDeserializeRecord() { } // cachedSchemaClient remains nil here when schema registry disabled - disabledSerdeSvc, err := NewService(protoSvc, mspPackSvc, cachedSchemaClient, cborConfig) + disabledSerdeSvc, err := NewService(protoSvc, mspPackSvc, cachedSchemaClient, nil, cborConfig) require.NoError(err) // Step 3: Consume the Avro message and attempt deserialization diff --git a/backend/pkg/serde/types.go b/backend/pkg/serde/types.go index 4822fa39cc..9576ccac8c 100644 --- a/backend/pkg/serde/types.go +++ b/backend/pkg/serde/types.go @@ -23,6 +23,8 @@ const ( PayloadEncodingProtobuf PayloadEncoding = "protobuf" // PayloadEncodingProtobufSchema is the enum of protobuf encoded types using schema registry. PayloadEncodingProtobufSchema PayloadEncoding = "protobufSchema" + // PayloadEncodingProtobufBSR is the enum of protobuf encoded types using Buf Schema Registry. + PayloadEncodingProtobufBSR PayloadEncoding = "protobufBSR" // PayloadEncodingJSON is the enum of JSON encoded types. PayloadEncodingJSON PayloadEncoding = "json" // PayloadEncodingJSONSchema is the enum of JSON encoded types using schema registry. diff --git a/frontend/src/components/pages/topics/Tab.Messages/message-display/message-key-preview.tsx b/frontend/src/components/pages/topics/Tab.Messages/message-display/message-key-preview.tsx index a56f0edb08..5090606ae9 100644 --- a/frontend/src/components/pages/topics/Tab.Messages/message-display/message-key-preview.tsx +++ b/frontend/src/components/pages/topics/Tab.Messages/message-display/message-key-preview.tsx @@ -125,7 +125,7 @@ export const MessageKeyPreview = observer( {text} - {key.encoding.toUpperCase()} - {prettyBytes(key.size)} + {key.encoding?.toUpperCase() || 'UNKNOWN'} - {prettyBytes(key.size)} ); diff --git a/frontend/src/components/pages/topics/Tab.Messages/message-display/message-preview.tsx b/frontend/src/components/pages/topics/Tab.Messages/message-display/message-preview.tsx index b48903372d..6d3fce4075 100644 --- a/frontend/src/components/pages/topics/Tab.Messages/message-display/message-preview.tsx +++ b/frontend/src/components/pages/topics/Tab.Messages/message-display/message-preview.tsx @@ -106,7 +106,7 @@ export const MessagePreview = observer( - {value.encoding.toUpperCase()} - {prettyBytes(value.size)} + {value.encoding?.toUpperCase() || 'UNKNOWN'} - {prettyBytes(value.size)} ); diff --git a/frontend/src/protogen/redpanda/api/console/v1alpha1/common_pb.ts b/frontend/src/protogen/redpanda/api/console/v1alpha1/common_pb.ts index 909202719c..2324ae9172 100644 --- a/frontend/src/protogen/redpanda/api/console/v1alpha1/common_pb.ts +++ b/frontend/src/protogen/redpanda/api/console/v1alpha1/common_pb.ts @@ -186,6 +186,11 @@ export enum PayloadEncoding { * @generated from enum value: PAYLOAD_ENCODING_CBOR = 15; */ CBOR = 15, + + /** + * @generated from enum value: PAYLOAD_ENCODING_PROTOBUF_BSR = 16; + */ + PROTOBUF_BSR = 16, } /** diff --git a/frontend/src/state/backend-api.ts b/frontend/src/state/backend-api.ts index 3bce57b61a..55ca1f1e4e 100644 --- a/frontend/src/state/backend-api.ts +++ b/frontend/src/state/backend-api.ts @@ -2888,9 +2888,18 @@ export function createMessageSearch() { case PayloadEncoding.JSON: m.key.encoding = 'json'; break; + case PayloadEncoding.JSON_SCHEMA: + m.key.encoding = 'jsonSchema'; + break; case PayloadEncoding.PROTOBUF: m.key.encoding = 'protobuf'; break; + case PayloadEncoding.PROTOBUF_SCHEMA: + m.key.encoding = 'protobufSchema'; + break; + case PayloadEncoding.PROTOBUF_BSR: + m.key.encoding = 'protobufBSR'; + break; case PayloadEncoding.MESSAGE_PACK: m.key.encoding = 'msgpack'; break; @@ -2965,9 +2974,18 @@ export function createMessageSearch() { case PayloadEncoding.JSON: m.value.encoding = 'json'; break; + case PayloadEncoding.JSON_SCHEMA: + m.value.encoding = 'jsonSchema'; + break; case PayloadEncoding.PROTOBUF: m.value.encoding = 'protobuf'; break; + case PayloadEncoding.PROTOBUF_SCHEMA: + m.value.encoding = 'protobufSchema'; + break; + case PayloadEncoding.PROTOBUF_BSR: + m.value.encoding = 'protobufBSR'; + break; case PayloadEncoding.MESSAGE_PACK: m.value.encoding = 'msgpack'; break; diff --git a/frontend/src/state/rest-interfaces.ts b/frontend/src/state/rest-interfaces.ts index 4a7d10f284..6c0d5bbb8b 100644 --- a/frontend/src/state/rest-interfaces.ts +++ b/frontend/src/state/rest-interfaces.ts @@ -171,7 +171,10 @@ export type MessageDataType = | 'null' | 'avro' | 'protobuf' + | 'protobufSchema' + | 'protobufBSR' | 'json' + | 'jsonSchema' | 'xml' | 'text' | 'utf8WithControlChars' diff --git a/proto/redpanda/api/console/v1alpha1/common.proto b/proto/redpanda/api/console/v1alpha1/common.proto index 61810d7340..a74c920206 100644 --- a/proto/redpanda/api/console/v1alpha1/common.proto +++ b/proto/redpanda/api/console/v1alpha1/common.proto @@ -34,6 +34,7 @@ enum PayloadEncoding { PAYLOAD_ENCODING_UINT = 13; PAYLOAD_ENCODING_CONSUMER_OFFSETS = 14; PAYLOAD_ENCODING_CBOR = 15; + PAYLOAD_ENCODING_PROTOBUF_BSR = 16; } message TroubleshootReport {