diff --git a/x-pack/filebeat/docker-compose.yml b/x-pack/filebeat/docker-compose.yml index 490e181f4498..74ee4e8c6602 100644 --- a/x-pack/filebeat/docker-compose.yml +++ b/x-pack/filebeat/docker-compose.yml @@ -5,6 +5,7 @@ services: image: busybox depends_on: elasticsearch: { condition: service_healthy } + elasticsearch_kerberos.elastic: { condition: service_healthy } kibana: { condition: service_healthy } cometd: { condition: service_healthy } @@ -19,6 +20,35 @@ services: ports: - 9200:9200 + elasticsearch_kerberos.elastic: + build: ${ES_BEATS}/testing/environments/docker/elasticsearch_kerberos + healthcheck: + test: bash -c "/healthcheck.sh" + retries: 1200 + interval: 5s + start_period: 60s + environment: + - "TERM=linux" + - "ES_JAVA_OPTS=-Xms512m -Xmx512m -Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true" + - "transport.host=127.0.0.1" + - "http.host=0.0.0.0" + - "xpack.license.self_generated.type=trial" + - "xpack.security.enabled=true" + - "indices.id_field_data.enabled=true" + - "xpack.security.audit.enabled=true" + - "xpack.security.authc.realms.kerberos.elastic.order=0" + - "xpack.security.authc.realms.kerberos.elastic.keytab.path=/usr/share/elasticsearch/config/HTTP_localhost.keytab" + - "xpack.security.authc.realms.kerberos.elastic.remove_realm_name=false" + - "xpack.security.authc.realms.kerberos.elastic.krb.debug=true" + volumes: + # This is needed otherwise there won't be enough entropy to generate a new kerberos realm + - /dev/urandom:/dev/random + ports: + - 1088:1088 + - 1749:1749 + - 9203:9200 + command: bash -c "/start.sh" + googlepubsub: image: docker.elastic.co/integrations-ci/beats-googlepubsub:emulator-${SDK_VERSION:-467.0.0-0}-1 build: diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 36f2e84b3950..d0b8794bfb0d 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -9,6 +9,7 @@ package integration import ( "bytes" "context" + "crypto/tls" "encoding/json" "fmt" "net/http" @@ -37,6 +38,7 @@ import ( "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" + "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/mock-es/pkg/api" ) @@ -1120,3 +1122,155 @@ service: }) } } + +func TestFileBeatKerberos(t *testing.T) { + + wantEvents := 1 + krbURL := "http://localhost:9203" // this is kerberos client - we've hardcoded the URL here + tempFile := t.TempDir() + // ES client + esCfg := elasticsearch.Config{ + Addresses: []string{krbURL}, + Username: "admin", + Password: "testing", + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec // this is only for testing + }, + }, + } + + es, err := elasticsearch.NewClient(esCfg) + require.NoError(t, err, "could not get elasticsearch client") + + setupRoleMapping(t, es) + + namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "") + filebeatIndex := "logs-filebeat.kerberos-" + namespace + + otelConfig := struct { + Index string + InputFile string + PathHome string + Endpoint string + }{ + Index: filebeatIndex, + InputFile: filepath.Join(tempFile, "log.log"), + PathHome: tempFile, + Endpoint: krbURL, + } + + cfg := `receivers: + filebeatreceiver/filestream: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputFile}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + output: + otelconsumer: + queue.mem.flush.timeout: 0s + management.otel.enabled: true + path.home: {{.PathHome}} +extensions: + beatsauth: + kerberos: + auth_type: "password" + config_path: "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf" + username: "beats" + password: "testing" + realm: "elastic" +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - {{.Endpoint}} + logs_index: {{.Index}} + mapping: + mode: bodymap + auth: + authenticator: beatsauth +service: + extensions: + - beatsauth + pipelines: + logs: + receivers: + - filebeatreceiver/filestream + exporters: + - elasticsearch/log + - debug +` + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig)) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Config contents:\n%s", configContents) + } + }) + + writeEventsToLogFile(t, otelConfig.InputFile, wantEvents) + oteltestcol.New(t, string(configContents)) + + // wait for logs to be published + require.EventuallyWithT(t, + func(ct *assert.CollectT) { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + otelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*") + assert.NoError(ct, err) + + assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value) + }, + 2*time.Minute, 1*time.Second) + +} + +// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic +func setupRoleMapping(t *testing.T, client *elasticsearch.Client) { + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // prepare to query ES + roleMappingURL := "http://localhost:9203/_security/role_mapping/kerbrolemapping" + + body := map[string]interface{}{ + "roles": []string{"superuser"}, + "enabled": true, + "rules": map[string]interface{}{ + "field": map[string]interface{}{ + "username": "beats@elastic", + }, + }, + } + + jsonData, err := json.Marshal(body) + require.NoError(t, err, "could not marshal role mapping body to json") + + // Build request + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + roleMappingURL, + bytes.NewReader(jsonData)) + require.NoError(t, err, "could not create role mapping request") + + // Set content type header + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Perform(req) + require.NoError(t, err, "could not perform role mapping request") + defer resp.Body.Close() + + require.Equal(t, resp.StatusCode, http.StatusOK, "incorrect response code") + +} diff --git a/x-pack/otel/extension/beatsauthextension/authenticator.go b/x-pack/otel/extension/beatsauthextension/authenticator.go index 5c88fbaca07d..bc1226842345 100644 --- a/x-pack/otel/extension/beatsauthextension/authenticator.go +++ b/x-pack/otel/extension/beatsauthextension/authenticator.go @@ -6,6 +6,7 @@ package beatsauthextension import ( "context" + "errors" "fmt" "net/http" "time" @@ -17,14 +18,23 @@ import ( "go.opentelemetry.io/collector/extension/extensionauth" "google.golang.org/grpc/credentials" + "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" + krbclient "github.com/elastic/gokrb5/v8/client" + krbconfig "github.com/elastic/gokrb5/v8/config" + "github.com/elastic/gokrb5/v8/keytab" + "github.com/elastic/gokrb5/v8/spnego" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) -var _ extensionauth.HTTPClient = (*authenticator)(nil) -var _ extensionauth.GRPCClient = (*authenticator)(nil) -var _ extension.Extension = (*authenticator)(nil) +var ( + _ extensionauth.HTTPClient = (*authenticator)(nil) + _ extensionauth.GRPCClient = (*authenticator)(nil) + _ extension.Extension = (*authenticator)(nil) + ErrInvalidAuthType = errors.New("invalid authentication type") +) // roundTripperProvider is an interface that provides a RoundTripper type roundTripperProvider interface { @@ -62,7 +72,7 @@ func (a *authenticator) Start(_ context.Context, host component.Host) error { } var provider roundTripperProvider - client, err := getHttpClient(a) + prov, err := getHttpClient(a) if err != nil { componentstatus.ReportStatus(host, componentstatus.NewPermanentErrorEvent(err)) err = fmt.Errorf("failed creating http client: %w", err) @@ -73,7 +83,7 @@ func (a *authenticator) Start(_ context.Context, host component.Host) error { provider = &errorRoundTripperProvider{err: err} } else { componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusOK)) - provider = &httpClientProvider{client: client} + provider = prov } a.rtProvider = provider @@ -110,24 +120,32 @@ func (a *authenticator) PerRPCCredentials() (credentials.PerRPCCredentials, erro return nil, nil } -func getHttpClient(a *authenticator) (*http.Client, error) { +func getHttpClient(a *authenticator) (roundTripperProvider, error) { parsedCfg, err := config.NewConfigFrom(a.cfg.BeatAuthConfig) if err != nil { return nil, fmt.Errorf("failed creating config: %w", err) } - beatAuthConfig := httpcommon.HTTPTransportSettings{} + beatAuthConfig := esAuthConfig{} err = parsedCfg.Unpack(&beatAuthConfig) if err != nil { return nil, fmt.Errorf("failed unpacking config: %w", err) } - client, err := beatAuthConfig.Client(a.getHTTPOptions(beatAuthConfig.IdleConnTimeout)...) + client, err := beatAuthConfig.Transport.Client(a.getHTTPOptions(beatAuthConfig.Transport.IdleConnTimeout)...) if err != nil { return nil, fmt.Errorf("failed creating http client: %w", err) } - return client, nil + if beatAuthConfig.Kerberos.IsEnabled() { + p, err := NewKerberosClientProvider(beatAuthConfig.Kerberos, client) + if err != nil { + return nil, fmt.Errorf("error creating kerberos client provider: %w", err) + } + return p, nil + } + + return &httpClientProvider{client: client}, nil } // httpClientProvider provides a RoundTripper from an http.Client @@ -139,6 +157,51 @@ func (h *httpClientProvider) RoundTripper() http.RoundTripper { return h.client.Transport } +// kerberosClientProvider provides a kerberos enabled roundtripper +type kerberosClientProvider struct { + kerberosClient *krbclient.Client + httpClient *http.Client +} + +func NewKerberosClientProvider(config *kerberos.Config, httpClient *http.Client) (*kerberosClientProvider, error) { + var krbClient *krbclient.Client + krbConf, err := krbconfig.Load(config.ConfigPath) + if err != nil { + return nil, fmt.Errorf("error creating Kerberos client: %w", err) + } + + switch config.AuthType { + // case 1 is password auth + case 1: + krbClient = krbclient.NewWithPassword(config.Username, config.Realm, config.Password, krbConf) + // case 2 is keytab auth + case 2: + kTab, err := keytab.Load(config.KeyTabPath) + if err != nil { + return nil, fmt.Errorf("cannot load keytab file %s: %w", config.KeyTabPath, err) + } + krbClient = krbclient.NewWithKeytab(config.Username, config.Realm, kTab, krbConf) + default: + return nil, ErrInvalidAuthType + } + + return &kerberosClientProvider{kerberosClient: krbClient, httpClient: httpClient}, nil +} + +func (k *kerberosClientProvider) RoundTripper() http.RoundTripper { + return k +} + +func (k *kerberosClientProvider) RoundTrip(req *http.Request) (*http.Response, error) { + // set appropriate headers on request + err := spnego.SetSPNEGOHeader(k.kerberosClient, req, "") + if err != nil { + return nil, err + } + + return k.httpClient.Transport.RoundTrip(req) +} + // errorRoundTripperProvider provides a RoundTripper that always returns an error type errorRoundTripperProvider struct { err error diff --git a/x-pack/otel/extension/beatsauthextension/authenticator_test.go b/x-pack/otel/extension/beatsauthextension/authenticator_test.go index 7bd5932b1c08..2793a670b4e7 100644 --- a/x-pack/otel/extension/beatsauthextension/authenticator_test.go +++ b/x-pack/otel/extension/beatsauthextension/authenticator_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configoptional" "go.uber.org/goleak" + "go.uber.org/zap/zaptest" "github.com/elastic/elastic-agent-libs/transport/tlscommontest" ) @@ -136,11 +137,49 @@ func TestAuthenticator(t *testing.T) { skipStart: true, testRoundTripperPreStart: true, }, + { + name: "invalid kerberos auth type - continueOnError true", + setupConfig: func(t *testing.T) *Config { + return &Config{ + BeatAuthConfig: map[string]any{ + "kerberos": map[string]any{ + "auth_type": "invalid_auth_type", + }, + }, + ContinueOnError: true, + } + }, + expectStartError: false, + expectStatus: componentstatus.StatusPermanentError, + expectHTTPClientType: "errorRoundTripperProvider", + testRoundTripError: true, + }, + { + name: "valid kerberos config", + setupConfig: func(t *testing.T) *Config { + return &Config{ + BeatAuthConfig: map[string]any{ + "kerberos": map[string]any{ + "auth_type": "password", + "config_path": "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf", + "username": "user", + "password": "pass", + "realm": "elastic", + }, + }, + ContinueOnError: true, + } + }, + expectStartError: false, + expectStatus: componentstatus.StatusOK, + expectHTTPClientType: "kerberosClientProvider", + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { settings := componenttest.NewNopTelemetrySettings() + settings.Logger = zaptest.NewLogger(t) cfg := tc.setupConfig(t) auth, err := newAuthenticator(cfg, settings) @@ -188,6 +227,9 @@ func TestAuthenticator(t *testing.T) { case "errorRoundTripperProvider": _, ok := (auth.rtProvider).(*errorRoundTripperProvider) require.True(t, ok, "Provider should be an errorRoundTripperProvider") + case "kerberosClientProvider": + _, ok := (auth.rtProvider).(*kerberosClientProvider) + require.True(t, ok, "Provider should be a kerberosClientProvider") } rt, err := auth.RoundTripper(nil) diff --git a/x-pack/otel/extension/beatsauthextension/config.go b/x-pack/otel/extension/beatsauthextension/config.go index e12d2ad8332f..646ee96dd3df 100644 --- a/x-pack/otel/extension/beatsauthextension/config.go +++ b/x-pack/otel/extension/beatsauthextension/config.go @@ -6,6 +6,9 @@ package beatsauthextension import ( "go.opentelemetry.io/collector/component" + + "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) type Config struct { @@ -13,6 +16,11 @@ type Config struct { ContinueOnError bool `mapstructure:"continue_on_error"` } +type esAuthConfig struct { + Kerberos *kerberos.Config `config:"kerberos"` + Transport httpcommon.HTTPTransportSettings `config:",inline"` +} + func createDefaultConfig() component.Config { return &Config{} }