Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions exporter/dorisexporter/exporter_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,22 @@ func TestPushLogData(t *testing.T) {
_ = exporter.shutdown(ctx)
}()

mux := http.NewServeMux()
mux.HandleFunc("/api/otel/otel_logs/_stream_load", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"Status":"Success"}`))
})

server := &http.Server{
ReadTimeout: 3 * time.Second,
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
}

// Run the server.
serverErr := make(chan error, 1)
go func() {
http.HandleFunc("/api/otel/otel_logs/_stream_load", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"Status":"Success"}`))
})
err = server.ListenAndServe()
assert.Equal(t, http.ErrServerClosed, err)
serverErr <- server.ListenAndServe()
}()

err0 := errors.New("Not Started")
Expand All @@ -67,6 +71,8 @@ func TestPushLogData(t *testing.T) {
require.NoError(t, err0)

_ = server.Shutdown(ctx)
err = <-serverErr
assert.True(t, err == nil || errors.Is(err, http.ErrServerClosed), "unexpected server error: %v", err)
}

func simpleLogs(count int) plog.Logs {
Expand All @@ -78,7 +84,7 @@ func simpleLogs(count int) plog.Logs {
sl.Scope().SetVersion("1.0.0")
sl.Scope().Attributes().PutStr("lib", "doris")
timestamp := time.Now()
for i := 0; i < count; i++ {
for i := range count {
r := sl.LogRecords().AppendEmpty()
r.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
r.SetObservedTimestamp(pcommon.NewTimestampFromTime(timestamp))
Expand Down
36 changes: 20 additions & 16 deletions exporter/dorisexporter/exporter_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ func TestPushMetricData(t *testing.T) {
config := createDefaultConfig().(*Config)
config.Endpoint = fmt.Sprintf("http://127.0.0.1:%d", port)
config.CreateSchema = false

err = config.Validate()
require.NoError(t, err)
require.NoError(t, config.Validate())

exporter := newMetricsExporter(zap.NewNop(), config, componenttest.NewNopTelemetrySettings())

Expand All @@ -43,22 +41,26 @@ func TestPushMetricData(t *testing.T) {
_ = exporter.shutdown(ctx)
}()

mux := http.NewServeMux()
metrics := []string{"gauge", "sum", "histogram", "exponential_histogram", "summary"}
for _, metric := range metrics {
url := fmt.Sprintf("/api/otel/otel_metrics_%s/_stream_load", metric)
mux.HandleFunc(url, func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"Status":"Success"}`))
})
}

server := &http.Server{
ReadTimeout: 3 * time.Second,
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
}

// Run server
serverErr := make(chan error, 1)
go func() {
metrics := []string{"gauge", "sum", "histogram", "exponential_histogram", "summary"}
for _, metric := range metrics {
url := fmt.Sprintf("/api/otel/otel_metrics_%s/_stream_load", metric)
http.HandleFunc(url, func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"Status":"Success"}`))
})
}
err = server.ListenAndServe()
assert.Equal(t, http.ErrServerClosed, err)
serverErr <- server.ListenAndServe()
}()

err0 := errors.New("Not Started")
Expand All @@ -75,6 +77,8 @@ func TestPushMetricData(t *testing.T) {
require.NoError(t, err0)

_ = server.Shutdown(ctx)
err = <-serverErr
assert.True(t, err == nil || errors.Is(err, http.ErrServerClosed), "unexpected server error: %v", err)
}

func simpleMetrics(count int, typeSet map[pmetric.MetricType]struct{}) pmetric.Metrics {
Expand All @@ -88,7 +92,7 @@ func simpleMetrics(count int, typeSet map[pmetric.MetricType]struct{}) pmetric.M
sm.Scope().SetName("Scope name 1")
sm.Scope().SetVersion("Scope version 1")
timestamp := time.Now()
for i := 0; i < count; i++ {
for i := range count {
// gauge
if _, ok := typeSet[pmetric.MetricTypeGauge]; ok {
m := sm.Metrics().AppendEmpty()
Expand Down Expand Up @@ -217,7 +221,7 @@ func simpleMetrics(count int, typeSet map[pmetric.MetricType]struct{}) pmetric.M
sm.Scope().SetDroppedAttributesCount(20)
sm.Scope().SetName("Scope name 2")
sm.Scope().SetVersion("Scope version 2")
for i := 0; i < count; i++ {
for i := range count {
// gauge
if _, ok := typeSet[pmetric.MetricTypeGauge]; ok {
m := sm.Metrics().AppendEmpty()
Expand Down Expand Up @@ -334,7 +338,7 @@ func simpleMetrics(count int, typeSet map[pmetric.MetricType]struct{}) pmetric.M
sm.Scope().SetDroppedAttributesCount(20)
sm.Scope().SetName("Scope name 3")
sm.Scope().SetVersion("Scope version 3")
for i := 0; i < count; i++ {
for i := range count {
// gauge
if _, ok := typeSet[pmetric.MetricTypeGauge]; ok {
m := sm.Metrics().AppendEmpty()
Expand Down
24 changes: 14 additions & 10 deletions exporter/dorisexporter/exporter_traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ func TestPushTraceData(t *testing.T) {
config := createDefaultConfig().(*Config)
config.Endpoint = fmt.Sprintf("http://127.0.0.1:%d", port)
config.CreateSchema = false

err = config.Validate()
require.NoError(t, err)
require.NoError(t, config.Validate())

exporter := newTracesExporter(zap.NewNop(), config, componenttest.NewNopTelemetrySettings())

Expand All @@ -44,18 +42,22 @@ func TestPushTraceData(t *testing.T) {
_ = exporter.shutdown(ctx)
}()

mux := http.NewServeMux()
mux.HandleFunc("/api/otel/otel_traces/_stream_load", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"Status":"Success"}`))
})

server := &http.Server{
ReadTimeout: 3 * time.Second,
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
}

// Start server
serverErr := make(chan error, 1)
go func() {
http.HandleFunc("/api/otel/otel_traces/_stream_load", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"Status":"Success"}`))
})
err = server.ListenAndServe()
assert.Equal(t, http.ErrServerClosed, err)
serverErr <- server.ListenAndServe()
}()

err0 := errors.New("Not Started")
Expand All @@ -66,6 +68,8 @@ func TestPushTraceData(t *testing.T) {
require.NoError(t, err0)

_ = server.Shutdown(ctx)
err = <-serverErr
assert.True(t, err == nil || errors.Is(err, http.ErrServerClosed), "unexpected server error: %v", err)
}

func simpleTraces(count int) ptrace.Traces {
Expand All @@ -81,7 +85,7 @@ func simpleTraces(count int) ptrace.Traces {
ss.Scope().SetDroppedAttributesCount(20)
ss.Scope().Attributes().PutStr("lib", "doris")
timestamp := time.Now()
for i := 0; i < count; i++ {
for i := range count {
s := ss.Spans().AppendEmpty()
s.SetTraceID([16]byte{1, 2, 3, byte(i)})
s.SetSpanID([8]byte{1, 2, 3, byte(i)})
Expand Down