Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redpanda

import (
"fmt"
"net"
"strconv"

Expand Down Expand Up @@ -84,7 +85,7 @@ func defaultOptions() options {
var _ testcontainers.ContainerCustomizer = (Option)(nil)

// Option is an option for the Redpanda container.
type Option func(*options)
type Option func(*options) error

// Customize is a NOOP. It's defined to satisfy the testcontainers.ContainerCustomizer interface.
func (o Option) Customize(*testcontainers.GenericContainerRequest) error {
Expand All @@ -96,16 +97,18 @@ func (o Option) Customize(*testcontainers.GenericContainerRequest) error {
// that shall be created, so that you can use these to authenticate against
// Redpanda (either for the Kafka API or Schema Registry HTTP access).
func WithNewServiceAccount(username, password string) Option {
return func(o *options) {
return func(o *options) error {
o.ServiceAccounts[username] = password
return nil
}
}

// WithSuperusers defines the superusers added to the redpanda config.
// By default, there are no superusers.
func WithSuperusers(superusers ...string) Option {
return func(o *options) {
return func(o *options) error {
o.Superusers = superusers
return nil
}
}

Expand All @@ -114,31 +117,35 @@ func WithSuperusers(superusers ...string) Option {
// When setting an authentication method, make sure to add users
// as well as authorize them using the WithSuperusers() option.
func WithEnableSASL() Option {
return func(o *options) {
return func(o *options) error {
o.KafkaAuthenticationMethod = "sasl"
return nil
}
}

// WithEnableKafkaAuthorization enables authorization for connections on the Kafka API.
func WithEnableKafkaAuthorization() Option {
return func(o *options) {
return func(o *options) error {
o.KafkaEnableAuthorization = true
return nil
}
}

// WithEnableWasmTransform enables wasm transform.
// Should not be used with RP versions before 23.3
func WithEnableWasmTransform() Option {
return func(o *options) {
return func(o *options) error {
o.EnableWasmTransform = true
return nil
}
}

// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for
// Schema Registry.
func WithEnableSchemaRegistryHTTPBasicAuth() Option {
return func(o *options) {
return func(o *options) error {
o.SchemaRegistryAuthenticationMethod = "http_basic"
return nil
}
}

Expand All @@ -147,29 +154,33 @@ func WithEnableSchemaRegistryHTTPBasicAuth() Option {
func WithHTTPProxyAuthMethod(method HTTPProxyAuthMethod) Option {
switch method {
case HTTPProxyAuthMethodNone, HTTPProxyAuthMethodHTTPBasic, HTTPProxyAuthMethodOIDC:
return func(o *options) {
return func(o *options) error {
o.HTTPProxyAuthenticationMethod = method
return nil
}
default:
return func(o *options) {
return func(o *options) error {
// Invalid method, default to "none"
o.HTTPProxyAuthenticationMethod = HTTPProxyAuthMethodNone
return nil
}
}
}

// WithAutoCreateTopics enables topic auto creation.
func WithAutoCreateTopics() Option {
return func(o *options) {
return func(o *options) error {
o.AutoCreateTopics = true
return nil
}
}

func WithTLS(cert, key []byte) Option {
return func(o *options) {
return func(o *options) error {
o.EnableTLS = true
o.cert = cert
o.key = key
return nil
}
}

Expand All @@ -178,22 +189,23 @@ func WithTLS(cert, key []byte) Option {
// networks. At least one network must be attached to the container, if not an
// error will be thrown when starting the container.
func WithListener(lis string) Option {
host, port, err := net.SplitHostPort(lis)
if err != nil {
return func(_ *options) {}
}
return func(o *options) error {
host, port, err := net.SplitHostPort(lis)
if err != nil {
return fmt.Errorf("split host port: %w", err)
}

portInt, err := strconv.Atoi(port)
if err != nil {
return func(_ *options) {}
}
portInt, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("parse port: %w", err)
}

return func(o *options) {
o.Listeners = append(o.Listeners, listener{
Address: host,
Port: portInt,
AuthenticationMethod: o.KafkaAuthenticationMethod,
})
return nil
}
}

Expand All @@ -202,16 +214,18 @@ func WithListener(lis string) Option {
// config file, which is particularly useful for configs requiring a restart
// when otherwise applied to a running Redpanda instance.
func WithBootstrapConfig(cfg string, val any) Option {
return func(o *options) {
return func(o *options) error {
o.ExtraBootstrapConfig[cfg] = val
return nil
}
}

// WithAdminAPIAuthentication enables Admin API Authentication.
// It sets `admin_api_require_auth` configuration to true and configures a bootstrap user account.
// See https://docs.redpanda.com/current/deploy/deployment-option/self-hosted/manual/production/production-deployment/#bootstrap-a-user-account
func WithAdminAPIAuthentication() Option {
return func(o *options) {
return func(o *options) error {
o.enableAdminAPIAuthentication = true
return nil
}
}
Loading
Loading