Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions x-pack/filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,35 @@ services:
ports:
- 9200:9200

elasticsearch_kerberos.elastic:
build: ${ES_BEATS}/testing/environments/docker/elasticsearch_kerberos
healthcheck:
test: bash -c "/healthcheck.sh"
retries: 1200
interval: 5s
start_period: 60s
environment:
- "TERM=linux"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m -Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"
- "transport.host=127.0.0.1"
- "http.host=0.0.0.0"
- "xpack.license.self_generated.type=trial"
- "xpack.security.enabled=true"
- "indices.id_field_data.enabled=true"
- "xpack.security.audit.enabled=true"
- "xpack.security.authc.realms.kerberos.elastic.order=0"
- "xpack.security.authc.realms.kerberos.elastic.keytab.path=/usr/share/elasticsearch/config/HTTP_localhost.keytab"
- "xpack.security.authc.realms.kerberos.elastic.remove_realm_name=false"
- "xpack.security.authc.realms.kerberos.elastic.krb.debug=true"
volumes:
# This is needed otherwise there won't be enough entropy to generate a new kerberos realm
- /dev/urandom:/dev/random
ports:
- 1088:1088
- 1749:1749
- 9203:9200
command: bash -c "/start.sh"

googlepubsub:
image: docker.elastic.co/integrations-ci/beats-googlepubsub:emulator-${SDK_VERSION:-467.0.0-0}-1
build:
Expand Down
176 changes: 176 additions & 0 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package integration
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -36,6 +37,7 @@ import (
"github.com/elastic/beats/v7/libbeat/tests/integration"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/testing/estools"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/mock-es/pkg/api"
)

Expand Down Expand Up @@ -996,3 +998,177 @@ http.port: {{.MonitoringPort}}
})
}
}

func TestFileBeatKerberos(t *testing.T) {

wantEvents := 1

// ES client
esCfg := elasticsearch.Config{
Addresses: []string{"http://localhost:9203"}, // this is kerberos client - we've hardcoded the URL here
Username: "admin",
Password: "testing",
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, //nolint:gosec // this is only for testing
},
},
}

es, err := elasticsearch.NewClient(esCfg)
if err != nil {
t.Fatalf("could not get elasticsearch client due to: %v", err)
}

err = setupRoleMapping(t, es)
if err != nil {
t.Fatal(err)
}

// start filebeat in otel mode
filebeatOTel := integration.NewBeat(
t,
"filebeat-otel",
"../../filebeat.test",
"otel",
)

namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
filebeatIndex := "logs-filebeat.kerberos-" + namespace

otelConfig := struct {
Index string
InputFile string
PathHome string
}{
Index: filebeatIndex,
InputFile: filepath.Join(filebeatOTel.TempDir(), "log.log"),
PathHome: filebeatOTel.TempDir(),
}

cfg := `receivers:
filebeatreceiver/filestream:
filebeat:
inputs:
- type: filestream
id: filestream-fbreceiver
enabled: true
paths:
- {{.InputFile}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
queue.mem.flush.timeout: 0s
management.otel.enabled: true
path.home: {{.PathHome}}
extensions:
beatsauth:
kerberos:
auth_type: "password"
config_path: "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf"
username: "beats"
password: "testing"
realm: "elastic"
exporters:
debug:
use_internal_logger: false
verbosity: detailed
elasticsearch/log:
endpoints:
- http://localhost:9203
logs_index: {{.Index}}
mapping:
mode: bodymap
auth:
authenticator: beatsauth
service:
extensions:
- beatsauth
pipelines:
logs:
receivers:
- filebeatreceiver/filestream
exporters:
- elasticsearch/log
- debug
`

var configBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig))
configContents := configBuffer.Bytes()
t.Cleanup(func() {
if t.Failed() {
t.Logf("Config contents:\n%s", configContents)
}
})

filebeatOTel.WriteConfigFile(string(configContents))
writeEventsToLogFile(t, otelConfig.InputFile, wantEvents)
filebeatOTel.Start()
defer filebeatOTel.Stop()

// wait for logs to be published
require.EventuallyWithT(t,
func(ct *assert.CollectT) {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

otelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*")
assert.NoError(ct, err)

assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value)
},
2*time.Minute, 1*time.Second)

}

// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic
func setupRoleMapping(t *testing.T, client *elasticsearch.Client) error {

// Create a context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// prepare to query ES
roleMappingURL := "http://localhost:9203/_security/role_mapping/kerbrolemapping"

body := map[string]interface{}{
"roles": []string{"superuser"},
"enabled": true,
"rules": map[string]interface{}{
"field": map[string]interface{}{
"username": "beats@elastic",
},
},
}

jsonData, err := json.Marshal(body)
if err != nil {
t.Fatalf("error marshalling json body:%v", err)
}

// Build request
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
roleMappingURL,
bytes.NewReader(jsonData))
if err != nil {
return fmt.Errorf("could not create http request to ES server: %w", err)
}

// Set content type header
req.Header.Set("Content-Type", "application/json")

resp, err := client.Perform(req)
if err != nil {
return fmt.Errorf("error performing request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("incorrect response code: %w", err)
}

return err
}
80 changes: 71 additions & 9 deletions x-pack/otel/extension/beatsauthextension/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package beatsauthextension

import (
"context"
"errors"
"fmt"
"net/http"
"time"
Expand All @@ -17,14 +18,22 @@ import (
"go.opentelemetry.io/collector/extension/extensionauth"
"google.golang.org/grpc/credentials"

"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
krbclient "github.com/elastic/gokrb5/v8/client"
krbconfig "github.com/elastic/gokrb5/v8/config"
"github.com/elastic/gokrb5/v8/keytab"
"github.com/elastic/gokrb5/v8/spnego"
)

var _ extensionauth.HTTPClient = (*authenticator)(nil)
var _ extensionauth.GRPCClient = (*authenticator)(nil)
var _ extension.Extension = (*authenticator)(nil)
var (
_ extensionauth.HTTPClient = (*authenticator)(nil)
_ extensionauth.GRPCClient = (*authenticator)(nil)
_ extension.Extension = (*authenticator)(nil)
ErrInvalidAuthType = errors.New("invalid authentication type")
)

// roundTripperProvider is an interface that provides a RoundTripper
type roundTripperProvider interface {
Expand Down Expand Up @@ -62,7 +71,7 @@ func (a *authenticator) Start(_ context.Context, host component.Host) error {
}

var provider roundTripperProvider
client, err := getHttpClient(a)
prov, err := getHttpClient(a)
if err != nil {
componentstatus.ReportStatus(host, componentstatus.NewPermanentErrorEvent(err))
err = fmt.Errorf("failed creating http client: %w", err)
Expand All @@ -73,7 +82,7 @@ func (a *authenticator) Start(_ context.Context, host component.Host) error {
provider = &errorRoundTripperProvider{err: err}
} else {
componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusOK))
provider = &httpClientProvider{client: client}
provider = prov
}

a.rtProvider = provider
Expand Down Expand Up @@ -110,24 +119,32 @@ func (a *authenticator) PerRPCCredentials() (credentials.PerRPCCredentials, erro
return nil, nil
}

func getHttpClient(a *authenticator) (*http.Client, error) {
func getHttpClient(a *authenticator) (roundTripperProvider, error) {
parsedCfg, err := config.NewConfigFrom(a.cfg.BeatAuthConfig)
if err != nil {
return nil, fmt.Errorf("failed creating config: %w", err)
}

beatAuthConfig := httpcommon.HTTPTransportSettings{}
beatAuthConfig := esAuthConfig{}
err = parsedCfg.Unpack(&beatAuthConfig)
if err != nil {
return nil, fmt.Errorf("failed unpacking config: %w", err)
}

client, err := beatAuthConfig.Client(a.getHTTPOptions(beatAuthConfig.IdleConnTimeout)...)
client, err := beatAuthConfig.Transport.Client(a.getHTTPOptions(beatAuthConfig.Transport.IdleConnTimeout)...)
if err != nil {
return nil, fmt.Errorf("failed creating http client: %w", err)
}

return client, nil
if beatAuthConfig.Kerberos.IsEnabled() {
p, err := NewKerberosClientProvider(beatAuthConfig.Kerberos, client)
if err != nil {
return nil, fmt.Errorf("error creating kerberos client provider: %w", err)
}
return p, nil
}

return &httpClientProvider{client: client}, nil
}

// httpClientProvider provides a RoundTripper from an http.Client
Expand All @@ -139,6 +156,51 @@ func (h *httpClientProvider) RoundTripper() http.RoundTripper {
return h.client.Transport
}

// kerberosClientProvider provides a kerberos enabled roundtripper
type kerberosClientProvider struct {
kerberosClient *krbclient.Client
httpClient *http.Client
}

func NewKerberosClientProvider(config *kerberos.Config, httpClient *http.Client) (*kerberosClientProvider, error) {
var krbClient *krbclient.Client
krbConf, err := krbconfig.Load(config.ConfigPath)
if err != nil {
return nil, fmt.Errorf("error creating Kerberos client: %w", err)
}

switch config.AuthType {
// case 1 is password auth
case 1:
krbClient = krbclient.NewWithPassword(config.Username, config.Realm, config.Password, krbConf)
// case 2 is keytab auth
case 2:
kTab, err := keytab.Load(config.KeyTabPath)
if err != nil {
return nil, fmt.Errorf("cannot load keytab file %s: %w", config.KeyTabPath, err)
}
krbClient = krbclient.NewWithKeytab(config.Username, config.Realm, kTab, krbConf)
default:
return nil, ErrInvalidAuthType
}

return &kerberosClientProvider{kerberosClient: krbClient, httpClient: httpClient}, nil
}

func (k *kerberosClientProvider) RoundTripper() http.RoundTripper {
return k
}

func (k *kerberosClientProvider) RoundTrip(req *http.Request) (*http.Response, error) {
// set appropriate headers on request
err := spnego.SetSPNEGOHeader(k.kerberosClient, req, "")
if err != nil {
return nil, err
}

return k.httpClient.Transport.RoundTrip(req)
}

// errorRoundTripperProvider provides a RoundTripper that always returns an error
type errorRoundTripperProvider struct {
err error
Expand Down
7 changes: 7 additions & 0 deletions x-pack/otel/extension/beatsauthextension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package beatsauthextension

import (

Check failure on line 7 in x-pack/otel/extension/beatsauthextension/config.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

File is not properly formatted (goimports)
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"go.opentelemetry.io/collector/component"
)

Expand All @@ -13,6 +15,11 @@
ContinueOnError bool `mapstructure:"continue_on_error"`
}

type esAuthConfig struct {
Kerberos *kerberos.Config `config:"kerberos"`
Transport httpcommon.HTTPTransportSettings `config:",inline"`
}

func createDefaultConfig() component.Config {
return &Config{}
}
Loading