Skip to content

Commit 688a914

Browse files
Add Kerberos support to beatsauth (#47534) (#47617)
* Add Kerberos support on beatsauth (cherry picked from commit 44078c0) Co-authored-by: Khushi Jain <[email protected]>
1 parent 8234bf5 commit 688a914

File tree

5 files changed

+306
-9
lines changed

5 files changed

+306
-9
lines changed

x-pack/filebeat/docker-compose.yml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ services:
55
image: busybox
66
depends_on:
77
elasticsearch: { condition: service_healthy }
8+
elasticsearch_kerberos.elastic: { condition: service_healthy }
89
kibana: { condition: service_healthy }
910
cometd: { condition: service_healthy }
1011

@@ -19,6 +20,35 @@ services:
1920
ports:
2021
- 9200:9200
2122

23+
elasticsearch_kerberos.elastic:
24+
build: ${ES_BEATS}/testing/environments/docker/elasticsearch_kerberos
25+
healthcheck:
26+
test: bash -c "/healthcheck.sh"
27+
retries: 1200
28+
interval: 5s
29+
start_period: 60s
30+
environment:
31+
- "TERM=linux"
32+
- "ES_JAVA_OPTS=-Xms512m -Xmx512m -Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"
33+
- "transport.host=127.0.0.1"
34+
- "http.host=0.0.0.0"
35+
- "xpack.license.self_generated.type=trial"
36+
- "xpack.security.enabled=true"
37+
- "indices.id_field_data.enabled=true"
38+
- "xpack.security.audit.enabled=true"
39+
- "xpack.security.authc.realms.kerberos.elastic.order=0"
40+
- "xpack.security.authc.realms.kerberos.elastic.keytab.path=/usr/share/elasticsearch/config/HTTP_localhost.keytab"
41+
- "xpack.security.authc.realms.kerberos.elastic.remove_realm_name=false"
42+
- "xpack.security.authc.realms.kerberos.elastic.krb.debug=true"
43+
volumes:
44+
# This is needed otherwise there won't be enough entropy to generate a new kerberos realm
45+
- /dev/urandom:/dev/random
46+
ports:
47+
- 1088:1088
48+
- 1749:1749
49+
- 9203:9200
50+
command: bash -c "/start.sh"
51+
2252
googlepubsub:
2353
image: docker.elastic.co/integrations-ci/beats-googlepubsub:emulator-${SDK_VERSION:-467.0.0-0}-1
2454
build:

x-pack/filebeat/tests/integration/otel_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package integration
99
import (
1010
"bytes"
1111
"context"
12+
"crypto/tls"
1213
"encoding/json"
1314
"fmt"
1415
"net/http"
@@ -37,6 +38,7 @@ import (
3738
"github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol"
3839
"github.com/elastic/elastic-agent-libs/mapstr"
3940
"github.com/elastic/elastic-agent-libs/testing/estools"
41+
"github.com/elastic/go-elasticsearch/v8"
4042
"github.com/elastic/mock-es/pkg/api"
4143
)
4244

@@ -1120,3 +1122,155 @@ service:
11201122
})
11211123
}
11221124
}
1125+
1126+
func TestFileBeatKerberos(t *testing.T) {
1127+
1128+
wantEvents := 1
1129+
krbURL := "http://localhost:9203" // this is kerberos client - we've hardcoded the URL here
1130+
tempFile := t.TempDir()
1131+
// ES client
1132+
esCfg := elasticsearch.Config{
1133+
Addresses: []string{krbURL},
1134+
Username: "admin",
1135+
Password: "testing",
1136+
Transport: &http.Transport{
1137+
TLSClientConfig: &tls.Config{
1138+
InsecureSkipVerify: true, //nolint:gosec // this is only for testing
1139+
},
1140+
},
1141+
}
1142+
1143+
es, err := elasticsearch.NewClient(esCfg)
1144+
require.NoError(t, err, "could not get elasticsearch client")
1145+
1146+
setupRoleMapping(t, es)
1147+
1148+
namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
1149+
filebeatIndex := "logs-filebeat.kerberos-" + namespace
1150+
1151+
otelConfig := struct {
1152+
Index string
1153+
InputFile string
1154+
PathHome string
1155+
Endpoint string
1156+
}{
1157+
Index: filebeatIndex,
1158+
InputFile: filepath.Join(tempFile, "log.log"),
1159+
PathHome: tempFile,
1160+
Endpoint: krbURL,
1161+
}
1162+
1163+
cfg := `receivers:
1164+
filebeatreceiver/filestream:
1165+
filebeat:
1166+
inputs:
1167+
- type: filestream
1168+
id: filestream-fbreceiver
1169+
enabled: true
1170+
paths:
1171+
- {{.InputFile}}
1172+
prospector.scanner.fingerprint.enabled: false
1173+
file_identity.native: ~
1174+
output:
1175+
otelconsumer:
1176+
queue.mem.flush.timeout: 0s
1177+
management.otel.enabled: true
1178+
path.home: {{.PathHome}}
1179+
extensions:
1180+
beatsauth:
1181+
kerberos:
1182+
auth_type: "password"
1183+
config_path: "../../../../libbeat/outputs/elasticsearch/testdata/krb5.conf"
1184+
username: "beats"
1185+
password: "testing"
1186+
realm: "elastic"
1187+
exporters:
1188+
debug:
1189+
use_internal_logger: false
1190+
verbosity: detailed
1191+
elasticsearch/log:
1192+
endpoints:
1193+
- {{.Endpoint}}
1194+
logs_index: {{.Index}}
1195+
mapping:
1196+
mode: bodymap
1197+
auth:
1198+
authenticator: beatsauth
1199+
service:
1200+
extensions:
1201+
- beatsauth
1202+
pipelines:
1203+
logs:
1204+
receivers:
1205+
- filebeatreceiver/filestream
1206+
exporters:
1207+
- elasticsearch/log
1208+
- debug
1209+
`
1210+
1211+
var configBuffer bytes.Buffer
1212+
require.NoError(t,
1213+
template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig))
1214+
configContents := configBuffer.Bytes()
1215+
t.Cleanup(func() {
1216+
if t.Failed() {
1217+
t.Logf("Config contents:\n%s", configContents)
1218+
}
1219+
})
1220+
1221+
writeEventsToLogFile(t, otelConfig.InputFile, wantEvents)
1222+
oteltestcol.New(t, string(configContents))
1223+
1224+
// wait for logs to be published
1225+
require.EventuallyWithT(t,
1226+
func(ct *assert.CollectT) {
1227+
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
1228+
defer findCancel()
1229+
1230+
otelDocs, err := estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*")
1231+
assert.NoError(ct, err)
1232+
1233+
assert.GreaterOrEqual(ct, otelDocs.Hits.Total.Value, wantEvents, "expected at least %d events, got %d", wantEvents, otelDocs.Hits.Total.Value)
1234+
},
1235+
2*time.Minute, 1*time.Second)
1236+
1237+
}
1238+
1239+
// setupRoleMapping sets up role mapping for the Kerberos user beats@elastic
1240+
func setupRoleMapping(t *testing.T, client *elasticsearch.Client) {
1241+
1242+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1243+
defer cancel()
1244+
1245+
// prepare to query ES
1246+
roleMappingURL := "http://localhost:9203/_security/role_mapping/kerbrolemapping"
1247+
1248+
body := map[string]interface{}{
1249+
"roles": []string{"superuser"},
1250+
"enabled": true,
1251+
"rules": map[string]interface{}{
1252+
"field": map[string]interface{}{
1253+
"username": "beats@elastic",
1254+
},
1255+
},
1256+
}
1257+
1258+
jsonData, err := json.Marshal(body)
1259+
require.NoError(t, err, "could not marshal role mapping body to json")
1260+
1261+
// Build request
1262+
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
1263+
roleMappingURL,
1264+
bytes.NewReader(jsonData))
1265+
require.NoError(t, err, "could not create role mapping request")
1266+
1267+
// Set content type header
1268+
req.Header.Set("Content-Type", "application/json")
1269+
1270+
resp, err := client.Perform(req)
1271+
require.NoError(t, err, "could not perform role mapping request")
1272+
defer resp.Body.Close()
1273+
1274+
require.Equal(t, resp.StatusCode, http.StatusOK, "incorrect response code")
1275+
1276+
}

x-pack/otel/extension/beatsauthextension/authenticator.go

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package beatsauthextension
66

77
import (
88
"context"
9+
"errors"
910
"fmt"
1011
"net/http"
1112
"time"
@@ -17,14 +18,23 @@ import (
1718
"go.opentelemetry.io/collector/extension/extensionauth"
1819
"google.golang.org/grpc/credentials"
1920

21+
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
22+
krbclient "github.com/elastic/gokrb5/v8/client"
23+
krbconfig "github.com/elastic/gokrb5/v8/config"
24+
"github.com/elastic/gokrb5/v8/keytab"
25+
"github.com/elastic/gokrb5/v8/spnego"
26+
2027
"github.com/elastic/elastic-agent-libs/config"
2128
"github.com/elastic/elastic-agent-libs/logp"
2229
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
2330
)
2431

25-
var _ extensionauth.HTTPClient = (*authenticator)(nil)
26-
var _ extensionauth.GRPCClient = (*authenticator)(nil)
27-
var _ extension.Extension = (*authenticator)(nil)
32+
var (
33+
_ extensionauth.HTTPClient = (*authenticator)(nil)
34+
_ extensionauth.GRPCClient = (*authenticator)(nil)
35+
_ extension.Extension = (*authenticator)(nil)
36+
ErrInvalidAuthType = errors.New("invalid authentication type")
37+
)
2838

2939
// roundTripperProvider is an interface that provides a RoundTripper
3040
type roundTripperProvider interface {
@@ -62,7 +72,7 @@ func (a *authenticator) Start(_ context.Context, host component.Host) error {
6272
}
6373

6474
var provider roundTripperProvider
65-
client, err := getHttpClient(a)
75+
prov, err := getHttpClient(a)
6676
if err != nil {
6777
componentstatus.ReportStatus(host, componentstatus.NewPermanentErrorEvent(err))
6878
err = fmt.Errorf("failed creating http client: %w", err)
@@ -73,7 +83,7 @@ func (a *authenticator) Start(_ context.Context, host component.Host) error {
7383
provider = &errorRoundTripperProvider{err: err}
7484
} else {
7585
componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusOK))
76-
provider = &httpClientProvider{client: client}
86+
provider = prov
7787
}
7888

7989
a.rtProvider = provider
@@ -110,24 +120,32 @@ func (a *authenticator) PerRPCCredentials() (credentials.PerRPCCredentials, erro
110120
return nil, nil
111121
}
112122

113-
func getHttpClient(a *authenticator) (*http.Client, error) {
123+
func getHttpClient(a *authenticator) (roundTripperProvider, error) {
114124
parsedCfg, err := config.NewConfigFrom(a.cfg.BeatAuthConfig)
115125
if err != nil {
116126
return nil, fmt.Errorf("failed creating config: %w", err)
117127
}
118128

119-
beatAuthConfig := httpcommon.HTTPTransportSettings{}
129+
beatAuthConfig := esAuthConfig{}
120130
err = parsedCfg.Unpack(&beatAuthConfig)
121131
if err != nil {
122132
return nil, fmt.Errorf("failed unpacking config: %w", err)
123133
}
124134

125-
client, err := beatAuthConfig.Client(a.getHTTPOptions(beatAuthConfig.IdleConnTimeout)...)
135+
client, err := beatAuthConfig.Transport.Client(a.getHTTPOptions(beatAuthConfig.Transport.IdleConnTimeout)...)
126136
if err != nil {
127137
return nil, fmt.Errorf("failed creating http client: %w", err)
128138
}
129139

130-
return client, nil
140+
if beatAuthConfig.Kerberos.IsEnabled() {
141+
p, err := NewKerberosClientProvider(beatAuthConfig.Kerberos, client)
142+
if err != nil {
143+
return nil, fmt.Errorf("error creating kerberos client provider: %w", err)
144+
}
145+
return p, nil
146+
}
147+
148+
return &httpClientProvider{client: client}, nil
131149
}
132150

133151
// httpClientProvider provides a RoundTripper from an http.Client
@@ -139,6 +157,51 @@ func (h *httpClientProvider) RoundTripper() http.RoundTripper {
139157
return h.client.Transport
140158
}
141159

160+
// kerberosClientProvider provides a kerberos enabled roundtripper
161+
type kerberosClientProvider struct {
162+
kerberosClient *krbclient.Client
163+
httpClient *http.Client
164+
}
165+
166+
func NewKerberosClientProvider(config *kerberos.Config, httpClient *http.Client) (*kerberosClientProvider, error) {
167+
var krbClient *krbclient.Client
168+
krbConf, err := krbconfig.Load(config.ConfigPath)
169+
if err != nil {
170+
return nil, fmt.Errorf("error creating Kerberos client: %w", err)
171+
}
172+
173+
switch config.AuthType {
174+
// case 1 is password auth
175+
case 1:
176+
krbClient = krbclient.NewWithPassword(config.Username, config.Realm, config.Password, krbConf)
177+
// case 2 is keytab auth
178+
case 2:
179+
kTab, err := keytab.Load(config.KeyTabPath)
180+
if err != nil {
181+
return nil, fmt.Errorf("cannot load keytab file %s: %w", config.KeyTabPath, err)
182+
}
183+
krbClient = krbclient.NewWithKeytab(config.Username, config.Realm, kTab, krbConf)
184+
default:
185+
return nil, ErrInvalidAuthType
186+
}
187+
188+
return &kerberosClientProvider{kerberosClient: krbClient, httpClient: httpClient}, nil
189+
}
190+
191+
func (k *kerberosClientProvider) RoundTripper() http.RoundTripper {
192+
return k
193+
}
194+
195+
func (k *kerberosClientProvider) RoundTrip(req *http.Request) (*http.Response, error) {
196+
// set appropriate headers on request
197+
err := spnego.SetSPNEGOHeader(k.kerberosClient, req, "")
198+
if err != nil {
199+
return nil, err
200+
}
201+
202+
return k.httpClient.Transport.RoundTrip(req)
203+
}
204+
142205
// errorRoundTripperProvider provides a RoundTripper that always returns an error
143206
type errorRoundTripperProvider struct {
144207
err error

0 commit comments

Comments
 (0)