Skip to content

Commit c9cc194

Browse files
Add Kerberos support to beatsauth (#47534) (#47615)
* Add Kerberos support on beatsauth (cherry picked from commit 44078c0) Co-authored-by: Khushi Jain <[email protected]>
1 parent be65bd6 commit c9cc194

File tree

5 files changed

+306
-9
lines changed

5 files changed

+306
-9
lines changed

x-pack/filebeat/docker-compose.yml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ services:
66
image: busybox
77
depends_on:
88
elasticsearch: { condition: service_healthy }
9+
elasticsearch_kerberos.elastic: { condition: service_healthy }
910
kibana: { condition: service_healthy }
1011
cometd: { condition: service_healthy }
1112

@@ -20,6 +21,35 @@ services:
2021
ports:
2122
- 9200:9200
2223

24+
elasticsearch_kerberos.elastic:
25+
build: ${ES_BEATS}/testing/environments/docker/elasticsearch_kerberos
26+
healthcheck:
27+
test: bash -c "/healthcheck.sh"
28+
retries: 1200
29+
interval: 5s
30+
start_period: 60s
31+
environment:
32+
- "TERM=linux"
33+
- "ES_JAVA_OPTS=-Xms512m -Xmx512m -Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"
34+
- "transport.host=127.0.0.1"
35+
- "http.host=0.0.0.0"
36+
- "xpack.license.self_generated.type=trial"
37+
- "xpack.security.enabled=true"
38+
- "indices.id_field_data.enabled=true"
39+
- "xpack.security.audit.enabled=true"
40+
- "xpack.security.authc.realms.kerberos.elastic.order=0"
41+
- "xpack.security.authc.realms.kerberos.elastic.keytab.path=/usr/share/elasticsearch/config/HTTP_localhost.keytab"
42+
- "xpack.security.authc.realms.kerberos.elastic.remove_realm_name=false"
43+
- "xpack.security.authc.realms.kerberos.elastic.krb.debug=true"
44+
volumes:
45+
# This is needed otherwise there won't be enough entropy to generate a new kerberos realm
46+
- /dev/urandom:/dev/random
47+
ports:
48+
- 1088:1088
49+
- 1749:1749
50+
- 9203:9200
51+
command: bash -c "/start.sh"
52+
2353
googlepubsub:
2454
image: docker.elastic.co/integrations-ci/beats-googlepubsub:emulator-${SDK_VERSION:-467.0.0-0}-1
2555
build:

x-pack/filebeat/tests/integration/otel_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package integration
99
import (
1010
"bytes"
1111
"context"
12+
"crypto/tls"
1213
"encoding/json"
1314
"fmt"
1415
"net/http"
@@ -37,6 +38,7 @@ import (
3738
"github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol"
3839
"github.com/elastic/elastic-agent-libs/mapstr"
3940
"github.com/elastic/elastic-agent-libs/testing/estools"
41+
"github.com/elastic/go-elasticsearch/v8"
4042
"github.com/elastic/mock-es/pkg/api"
4143
)
4244

@@ -1009,3 +1011,155 @@ service:
10091011
})
10101012
}
10111013
}
1014+
1015+
func TestFileBeatKerberos(t *testing.T) {
1016+
1017+
wantEvents := 1
1018+
krbURL := "http://localhost:9203" // this is kerberos client - we've hardcoded the URL here
1019+
tempFile := t.TempDir()
1020+
// ES client
1021+
esCfg := elasticsearch.Config{
1022+
Addresses: []string{krbURL},
1023+
Username: "admin",
1024+
Password: "testing",
1025+
Transport: &http.Transport{
1026+
TLSClientConfig: &tls.Config{
1027+
InsecureSkipVerify: true, //nolint:gosec // this is only for testing
1028+
},
1029+
},
1030+
}
1031+
1032+
es, err := elasticsearch.NewClient(esCfg)
1033+
require.NoError(t, err, "could not get elasticsearch client")
1034+
1035+
setupRoleMapping(t, es)
1036+
1037+
namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
1038+
filebeatIndex := "logs-filebeat.kerberos-" + namespace
1039+
1040+
otelConfig := struct {
1041+
Index string
1042+
InputFile string
1043+
PathHome string
1044+
Endpoint string
1045+
}{
1046+
Index: filebeatIndex,
1047+
InputFile: filepath.Join(tempFile, "log.log"),
1048+
PathHome: tempFile,
1049+
Endpoint: krbURL,
1050+
}
1051+
1052+
cfg := `receivers:
1053+
filebeatreceiver/filestream:
1054+
filebeat:
1055+
inputs:
1056+
- type: filestream
1057+
id: filestream-fbreceiver
1058+
enabled: true
1059+
paths:
1060+
- {{.InputFile}}
1061+
prospector.scanner.fingerprint.enabled: false
1062+
file_identity.native: ~
1063+
output:
1064+
otelconsumer:
1065+
queue.mem.flush.timeout: 0s
1066+
management.otel.enabled: true
1067+
path.home: {{.PathHome}}
1068+
extensions:
1069+
beatsauth:
1070+
kerberos:
1071+
auth_type: "password"
1072+
config_path: "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf"
1073+
username: "beats"
1074+
password: "testing"
1075+
realm: "elastic"
1076+
exporters:
1077+
debug:
1078+
use_internal_logger: false
1079+
verbosity: detailed
1080+
elasticsearch/log:
1081+
endpoints:
1082+
- {{.Endpoint}}
1083+
logs_index: {{.Index}}
1084+
mapping:
1085+
mode: bodymap
1086+
auth:
1087+
authenticator: beatsauth
1088+
service:
1089+
extensions:
1090+
- beatsauth
1091+
pipelines:
1092+
logs:
1093+
receivers:
1094+
- filebeatreceiver/filestream
1095+
exporters:
1096+
- elasticsearch/log
1097+
- debug
1098+
`
1099+
1100+
var configBuffer bytes.Buffer
1101+
require.NoError(t,
1102+
template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig))
1103+
configContents := configBuffer.Bytes()
1104+
t.Cleanup(func() {
1105+
if t.Failed() {
1106+
t.Logf("Config contents:\n%s", configContents)
1107+
}
1108+
})
1109+
1110+
writeEventsToLogFile(t, otelConfig.InputFile, wantEvents)
1111+
oteltestcol.New(t, string(configContents))
1112+
1113+
// wait for logs to be published
1114+
require.EventuallyWithT(t,
1115+
func(ct *assert.CollectT) {
1116+
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
1117+
defer findCancel()
1118+
1119+
otelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*")
1120+
assert.NoError(ct, err)
1121+
1122+
assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value)
1123+
},
1124+
2*time.Minute, 1*time.Second)
1125+
1126+
}
1127+
1128+
// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic
1129+
func setupRoleMapping(t *testing.T, client *elasticsearch.Client) {
1130+
1131+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1132+
defer cancel()
1133+
1134+
// prepare to query ES
1135+
roleMappingURL := "http://localhost:9203/_security/role_mapping/kerbrolemapping"
1136+
1137+
body := map[string]interface{}{
1138+
"roles": []string{"superuser"},
1139+
"enabled": true,
1140+
"rules": map[string]interface{}{
1141+
"field": map[string]interface{}{
1142+
"username": "beats@elastic",
1143+
},
1144+
},
1145+
}
1146+
1147+
jsonData, err := json.Marshal(body)
1148+
require.NoError(t, err, "could not marshal role mapping body to json")
1149+
1150+
// Build request
1151+
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
1152+
roleMappingURL,
1153+
bytes.NewReader(jsonData))
1154+
require.NoError(t, err, "could not create role mapping request")
1155+
1156+
// Set content type header
1157+
req.Header.Set("Content-Type", "application/json")
1158+
1159+
resp, err := client.Perform(req)
1160+
require.NoError(t, err, "could not perform role mapping request")
1161+
defer resp.Body.Close()
1162+
1163+
require.Equal(t, resp.StatusCode, http.StatusOK, "incorrect response code")
1164+
1165+
}

x-pack/otel/extension/beatsauthextension/authenticator.go

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package beatsauthextension
66

77
import (
88
"context"
9+
"errors"
910
"fmt"
1011
"net/http"
1112
"time"
@@ -17,14 +18,23 @@ import (
1718
"go.opentelemetry.io/collector/extension/extensionauth"
1819
"google.golang.org/grpc/credentials"
1920

21+
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
22+
krbclient "github.com/elastic/gokrb5/v8/client"
23+
krbconfig "github.com/elastic/gokrb5/v8/config"
24+
"github.com/elastic/gokrb5/v8/keytab"
25+
"github.com/elastic/gokrb5/v8/spnego"
26+
2027
"github.com/elastic/elastic-agent-libs/config"
2128
"github.com/elastic/elastic-agent-libs/logp"
2229
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
2330
)
2431

25-
var _ extensionauth.HTTPClient = (*authenticator)(nil)
26-
var _ extensionauth.GRPCClient = (*authenticator)(nil)
27-
var _ extension.Extension = (*authenticator)(nil)
32+
var (
33+
_ extensionauth.HTTPClient = (*authenticator)(nil)
34+
_ extensionauth.GRPCClient = (*authenticator)(nil)
35+
_ extension.Extension = (*authenticator)(nil)
36+
ErrInvalidAuthType = errors.New("invalid authentication type")
37+
)
2838

2939
// roundTripperProvider is an interface that provides a RoundTripper
3040
type roundTripperProvider interface {
@@ -62,7 +72,7 @@ func (a *authenticator) Start(_ context.Context, host component.Host) error {
6272
}
6373

6474
var provider roundTripperProvider
65-
client, err := getHttpClient(a)
75+
prov, err := getHttpClient(a)
6676
if err != nil {
6777
componentstatus.ReportStatus(host, componentstatus.NewPermanentErrorEvent(err))
6878
err = fmt.Errorf("failed creating http client: %w", err)
@@ -73,7 +83,7 @@ func (a *authenticator) Start(_ context.Context, host component.Host) error {
7383
provider = &errorRoundTripperProvider{err: err}
7484
} else {
7585
componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusOK))
76-
provider = &httpClientProvider{client: client}
86+
provider = prov
7787
}
7888

7989
a.rtProvider = provider
@@ -110,24 +120,32 @@ func (a *authenticator) PerRPCCredentials() (credentials.PerRPCCredentials, erro
110120
return nil, nil
111121
}
112122

113-
func getHttpClient(a *authenticator) (*http.Client, error) {
123+
func getHttpClient(a *authenticator) (roundTripperProvider, error) {
114124
parsedCfg, err := config.NewConfigFrom(a.cfg.BeatAuthConfig)
115125
if err != nil {
116126
return nil, fmt.Errorf("failed creating config: %w", err)
117127
}
118128

119-
beatAuthConfig := httpcommon.HTTPTransportSettings{}
129+
beatAuthConfig := esAuthConfig{}
120130
err = parsedCfg.Unpack(&beatAuthConfig)
121131
if err != nil {
122132
return nil, fmt.Errorf("failed unpacking config: %w", err)
123133
}
124134

125-
client, err := beatAuthConfig.Client(a.getHTTPOptions(beatAuthConfig.IdleConnTimeout)...)
135+
client, err := beatAuthConfig.Transport.Client(a.getHTTPOptions(beatAuthConfig.Transport.IdleConnTimeout)...)
126136
if err != nil {
127137
return nil, fmt.Errorf("failed creating http client: %w", err)
128138
}
129139

130-
return client, nil
140+
if beatAuthConfig.Kerberos.IsEnabled() {
141+
p, err := NewKerberosClientProvider(beatAuthConfig.Kerberos, client)
142+
if err != nil {
143+
return nil, fmt.Errorf("error creating kerberos client provider: %w", err)
144+
}
145+
return p, nil
146+
}
147+
148+
return &httpClientProvider{client: client}, nil
131149
}
132150

133151
// httpClientProvider provides a RoundTripper from an http.Client
@@ -139,6 +157,51 @@ func (h *httpClientProvider) RoundTripper() http.RoundTripper {
139157
return h.client.Transport
140158
}
141159

160+
// kerberosClientProvider provides a kerberos enabled roundtripper
161+
type kerberosClientProvider struct {
162+
kerberosClient *krbclient.Client
163+
httpClient *http.Client
164+
}
165+
166+
func NewKerberosClientProvider(config *kerberos.Config, httpClient *http.Client) (*kerberosClientProvider, error) {
167+
var krbClient *krbclient.Client
168+
krbConf, err := krbconfig.Load(config.ConfigPath)
169+
if err != nil {
170+
return nil, fmt.Errorf("error creating Kerberos client: %w", err)
171+
}
172+
173+
switch config.AuthType {
174+
// case 1 is password auth
175+
case 1:
176+
krbClient = krbclient.NewWithPassword(config.Username, config.Realm, config.Password, krbConf)
177+
// case 2 is keytab auth
178+
case 2:
179+
kTab, err := keytab.Load(config.KeyTabPath)
180+
if err != nil {
181+
return nil, fmt.Errorf("cannot load keytab file %s: %w", config.KeyTabPath, err)
182+
}
183+
krbClient = krbclient.NewWithKeytab(config.Username, config.Realm, kTab, krbConf)
184+
default:
185+
return nil, ErrInvalidAuthType
186+
}
187+
188+
return &kerberosClientProvider{kerberosClient: krbClient, httpClient: httpClient}, nil
189+
}
190+
191+
func (k *kerberosClientProvider) RoundTripper() http.RoundTripper {
192+
return k
193+
}
194+
195+
func (k *kerberosClientProvider) RoundTrip(req *http.Request) (*http.Response, error) {
196+
// set appropriate headers on request
197+
err := spnego.SetSPNEGOHeader(k.kerberosClient, req, "")
198+
if err != nil {
199+
return nil, err
200+
}
201+
202+
return k.httpClient.Transport.RoundTrip(req)
203+
}
204+
142205
// errorRoundTripperProvider provides a RoundTripper that always returns an error
143206
type errorRoundTripperProvider struct {
144207
err error

0 commit comments

Comments
 (0)