Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redpanda

import (
"fmt"
"net"
"strconv"

Expand Down Expand Up @@ -84,7 +85,7 @@ func defaultOptions() options {
var _ testcontainers.ContainerCustomizer = (Option)(nil)

// Option is an option for the Redpanda container.
type Option func(*options)
type Option func(*options) error

// Customize is a NOOP. It's defined to satisfy the testcontainers.ContainerCustomizer interface.
func (o Option) Customize(*testcontainers.GenericContainerRequest) error {
Expand All @@ -96,16 +97,18 @@ func (o Option) Customize(*testcontainers.GenericContainerRequest) error {
// that shall be created, so that you can use these to authenticate against
// Redpanda (either for the Kafka API or Schema Registry HTTP access).
func WithNewServiceAccount(username, password string) Option {
return func(o *options) {
return func(o *options) error {
o.ServiceAccounts[username] = password
return nil
}
}

// WithSuperusers defines the superusers added to the redpanda config.
// By default, there are no superusers.
func WithSuperusers(superusers ...string) Option {
return func(o *options) {
return func(o *options) error {
o.Superusers = superusers
return nil
}
}

Expand All @@ -114,31 +117,35 @@ func WithSuperusers(superusers ...string) Option {
// When setting an authentication method, make sure to add users
// as well as authorize them using the WithSuperusers() option.
func WithEnableSASL() Option {
return func(o *options) {
return func(o *options) error {
o.KafkaAuthenticationMethod = "sasl"
return nil
}
}

// WithEnableKafkaAuthorization enables authorization for connections on the Kafka API.
func WithEnableKafkaAuthorization() Option {
return func(o *options) {
return func(o *options) error {
o.KafkaEnableAuthorization = true
return nil
}
}

// WithEnableWasmTransform enables wasm transform.
// Should not be used with RP versions before 23.3
func WithEnableWasmTransform() Option {
return func(o *options) {
return func(o *options) error {
o.EnableWasmTransform = true
return nil
}
}

// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for
// Schema Registry.
func WithEnableSchemaRegistryHTTPBasicAuth() Option {
return func(o *options) {
return func(o *options) error {
o.SchemaRegistryAuthenticationMethod = "http_basic"
return nil
}
}

Expand All @@ -147,29 +154,33 @@ func WithEnableSchemaRegistryHTTPBasicAuth() Option {
func WithHTTPProxyAuthMethod(method HTTPProxyAuthMethod) Option {
switch method {
case HTTPProxyAuthMethodNone, HTTPProxyAuthMethodHTTPBasic, HTTPProxyAuthMethodOIDC:
return func(o *options) {
return func(o *options) error {
o.HTTPProxyAuthenticationMethod = method
return nil
}
default:
return func(o *options) {
return func(o *options) error {
// Invalid method, default to "none"
o.HTTPProxyAuthenticationMethod = HTTPProxyAuthMethodNone
return nil
}
}
}

// WithAutoCreateTopics enables topic auto creation.
func WithAutoCreateTopics() Option {
return func(o *options) {
return func(o *options) error {
o.AutoCreateTopics = true
return nil
}
}

func WithTLS(cert, key []byte) Option {
return func(o *options) {
return func(o *options) error {
o.EnableTLS = true
o.cert = cert
o.key = key
return nil
}
}

Expand All @@ -178,22 +189,23 @@ func WithTLS(cert, key []byte) Option {
// networks. At least one network must be attached to the container, if not an
// error will be thrown when starting the container.
func WithListener(lis string) Option {
host, port, err := net.SplitHostPort(lis)
if err != nil {
return func(_ *options) {}
}
return func(o *options) error {
host, port, err := net.SplitHostPort(lis)
if err != nil {
return fmt.Errorf("split host port: %w", err)
}

portInt, err := strconv.Atoi(port)
if err != nil {
return func(_ *options) {}
}
portInt, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("parse port: %w", err)
}

return func(o *options) {
o.Listeners = append(o.Listeners, listener{
Address: host,
Port: portInt,
AuthenticationMethod: o.KafkaAuthenticationMethod,
})
return nil
}
}

Expand All @@ -202,16 +214,18 @@ func WithListener(lis string) Option {
// config file, which is particularly useful for configs requiring a restart
// when otherwise applied to a running Redpanda instance.
func WithBootstrapConfig(cfg string, val any) Option {
return func(o *options) {
return func(o *options) error {
o.ExtraBootstrapConfig[cfg] = val
return nil
}
}

// WithAdminAPIAuthentication enables Admin API Authentication.
// It sets `admin_api_require_auth` configuration to true and configures a bootstrap user account.
// See https://docs.redpanda.com/current/deploy/deployment-option/self-hosted/manual/production/production-deployment/#bootstrap-a-user-account
func WithAdminAPIAuthentication() Option {
return func(o *options) {
return func(o *options) error {
o.enableAdminAPIAuthentication = true
return nil
}
}
155 changes: 78 additions & 77 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,65 +64,56 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize

// Run creates an instance of the Redpanda container type
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*Container, error) {
// 1. Create container request.
// Some (e.g. Image) may be overridden by providing an option argument to this function.
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: img,
ConfigModifier: func(c *container.Config) {
c.User = "root:root"
},
// Files: Will be added later after we've rendered our YAML templates.
ExposedPorts: []string{
defaultKafkaAPIPort,
defaultAdminAPIPort,
defaultSchemaRegistryPort,
defaultHTTPProxyPort,
},
Entrypoint: []string{entrypointFile},
Cmd: []string{
"redpanda",
"start",
"--mode=dev-container",
"--smp=1",
"--memory=1G",
},
WaitingFor: wait.ForAll(
// Wait for the ports to be mapped without accessing them,
// because container needs Redpanda configuration before Redpanda is started
// and the mapped ports are part of that configuration.
wait.ForMappedPort(defaultKafkaAPIPort),
wait.ForMappedPort(defaultAdminAPIPort),
wait.ForMappedPort(defaultSchemaRegistryPort),
wait.ForMappedPort(defaultHTTPProxyPort),
),
},
Started: true,
}

// 2. Gather all config options (defaults and then apply provided options)
// 1. Gather all config options (defaults and then apply provided options)
settings := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(&settings)
}
if err := opt.Customize(&req); err != nil {
return nil, err
if err := apply(&settings); err != nil {
return nil, fmt.Errorf("apply option: %w", err)
}
}
}

// 2.1. If the image is not at least v23.3, disable wasm transform
if !isAtLeastVersion(req.Image, "23.3") {
// 2. If the image is not at least v23.3, disable wasm transform
if !isAtLeastVersion(img, "23.3") {
settings.EnableWasmTransform = false
}

// 2.2. If enabled, bootstrap user account
// 3. Build module options
moduleOpts := []testcontainers.ContainerCustomizer{
testcontainers.WithConfigModifier(func(c *container.Config) {
c.User = "root:root"
}),
testcontainers.WithExposedPorts(
defaultKafkaAPIPort,
defaultAdminAPIPort,
defaultSchemaRegistryPort,
defaultHTTPProxyPort,
),
testcontainers.WithEntrypoint(entrypointFile),
testcontainers.WithCmd(
"redpanda",
"start",
"--mode=dev-container",
"--smp=1",
"--memory=1G",
),
testcontainers.WithWaitStrategy(
// Wait for the ports to be mapped without accessing them,
// because container needs Redpanda configuration before Redpanda is started
// and the mapped ports are part of that configuration.
wait.ForMappedPort(defaultKafkaAPIPort),
wait.ForMappedPort(defaultAdminAPIPort),
wait.ForMappedPort(defaultSchemaRegistryPort),
wait.ForMappedPort(defaultHTTPProxyPort),
),
}

// 4. If enabled, bootstrap user account
envVars := map[string]string{}
if settings.enableAdminAPIAuthentication {
// set the RP_BOOTSTRAP_USER env var
if req.Env == nil {
req.Env = map[string]string{}
}
req.Env["RP_BOOTSTRAP_USER"] = bootstrapAdminAPIUser + ":" + bootstrapAdminAPIPassword
envVars["RP_BOOTSTRAP_USER"] = bootstrapAdminAPIUser + ":" + bootstrapAdminAPIPassword

// add our internal bootstrap admin user to superusers
settings.Superusers = append(settings.Superusers, bootstrapAdminAPIUser)
Expand All @@ -135,13 +126,11 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
settings.ExtraBootstrapConfig["admin_api_require_auth"] = true
}

// 3. Register extra kafka listeners if provided, network aliases will be
// set
if err := registerListeners(settings, req); err != nil {
return nil, fmt.Errorf("register listeners: %w", err)
if len(envVars) > 0 {
moduleOpts = append(moduleOpts, testcontainers.WithEnv(envVars))
}

// Bootstrap config file contains cluster configurations which will only be considered
// 5. Bootstrap config file contains cluster configurations which will only be considered
// the very first time you start a cluster.
bootstrapConfig, err := renderBootstrapConfig(settings)
if err != nil {
Expand All @@ -153,22 +142,22 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
// We have to do this kind of two-step process, because we need to know the mapped
// port, so that we can use this in Redpanda's advertised listeners configuration for
// the Kafka API.
req.Files = append(req.Files,
testcontainers.ContainerFile{
files := []testcontainers.ContainerFile{
{
Reader: bytes.NewReader(entrypoint),
ContainerFilePath: entrypointFile,
FileMode: 700,
},
testcontainers.ContainerFile{
{
Reader: bytes.NewReader(bootstrapConfig),
ContainerFilePath: path.Join(redpandaDir, bootstrapConfigFile),
FileMode: 600,
},
)
}

// 4. Create certificate and key for TLS connections.
// 7. Create certificate and key for TLS connections.
if settings.EnableTLS {
req.Files = append(req.Files,
files = append(files,
testcontainers.ContainerFile{
Reader: bytes.NewReader(settings.cert),
ContainerFilePath: path.Join(redpandaDir, certFile),
Expand All @@ -182,16 +171,26 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
)
}

ctr, err := testcontainers.GenericContainer(ctx, req)
moduleOpts = append(moduleOpts, testcontainers.WithFiles(files...))

// 8. Append user-provided options
moduleOpts = append(moduleOpts, opts...)

// 9. Add listener network aliases as final step (must be after user options that add networks)
if len(settings.Listeners) > 0 {
moduleOpts = append(moduleOpts, withListeners(settings.Listeners))
}

ctr, err := testcontainers.Run(ctx, img, moduleOpts...)
var c *Container
if ctr != nil {
c = &Container{Container: ctr}
}
if err != nil {
return c, fmt.Errorf("generic container: %w", err)
return c, fmt.Errorf("run redpanda: %w", err)
}

// 5. Get mapped port for the Kafka API, so that we can render and then mount
// 9. Get mapped port for the Kafka API, so that we can render and then mount
// the Redpanda config with the advertised Kafka address.
hostIP, err := ctr.Host(ctx)
if err != nil {
Expand Down Expand Up @@ -334,27 +333,29 @@ func renderBootstrapConfig(settings options) ([]byte, error) {
return bootstrapConfig.Bytes(), nil
}

// registerListeners validates that the provided listeners are valid and set network aliases for the provided addresses.
// withListeners creates a CustomizeRequestOption that validates and sets network aliases for the provided listeners.
// The container must be attached to at least one network.
func registerListeners(settings options, req testcontainers.GenericContainerRequest) error {
if len(settings.Listeners) == 0 {
return nil
}

if len(req.Networks) == 0 {
return errors.New("container must be attached to at least one network")
}
func withListeners(listeners []listener) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) error {
if len(listeners) == 0 {
return nil
}

for _, listener := range settings.Listeners {
if listener.Port < 0 || listener.Port > math.MaxUint16 {
return fmt.Errorf("invalid port on listener %s:%d (must be between 0 and 65535)", listener.Address, listener.Port)
if len(req.Networks) == 0 {
return errors.New("container must be attached to at least one network")
}

for _, network := range req.Networks {
req.NetworkAliases[network] = append(req.NetworkAliases[network], listener.Address)
for _, listener := range listeners {
if listener.Port < 0 || listener.Port > math.MaxUint16 {
return fmt.Errorf("invalid port on listener %s:%d (must be between 0 and 65535)", listener.Address, listener.Port)
}

for _, network := range req.Networks {
req.NetworkAliases[network] = append(req.NetworkAliases[network], listener.Address)
}
}
return nil
}
return nil
}

// renderNodeConfig renders the redpanda.yaml node config and returns it as
Expand Down
Loading