
Commit b4b45e6

improve kafka record validation

add kafka key validation
add webui support for avro

1 parent: 069eb72

File tree: 14 files changed (+169, -79 lines)

js/kafka/kafka.go

Lines changed: 8 additions & 1 deletion
@@ -71,8 +71,15 @@ func (m *Module) Produce(v goja.Value) interface{} {
 }
 
 func (m *Module) ProduceAsync(v goja.Value) interface{} {
-	p, resolve, _ := m.rt.NewPromise()
+	p, resolve, reject := m.rt.NewPromise()
 	go func() {
+		defer func() {
+			r := recover()
+			if r != nil {
+				reject(r)
+			}
+		}()
+
 		result := m.Produce(v)
 		m.loop.Run(func(vm *goja.Runtime) {
 			resolve(result)
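
The change wires the promise's previously discarded reject callback to a deferred recover, so a panic inside the producing goroutine rejects the promise instead of crashing the process. A minimal self-contained sketch of that pattern; doWork, resolve and reject are hypothetical stand-ins, while the real code gets its callbacks from goja's NewPromise:

package main

import "fmt"

// produceAsync runs doWork in a goroutine and reports the outcome
// through resolve/reject, converting panics into rejections.
func produceAsync(doWork func() (string, error), resolve func(string), reject func(interface{})) {
	go func() {
		defer func() {
			// recover returns non-nil only if the goroutine panicked.
			if r := recover(); r != nil {
				reject(r)
			}
		}()
		result, err := doWork()
		if err != nil {
			reject(err)
			return
		}
		resolve(result)
	}()
}

func main() {
	done := make(chan struct{})
	produceAsync(
		func() (string, error) { panic("broker unavailable") },
		func(s string) { fmt.Println("resolved:", s); close(done) },
		func(reason interface{}) { fmt.Println("rejected:", reason); close(done) },
	)
	<-done
}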

kafka/pagebuffer.go

Lines changed: 5 additions & 1 deletion
@@ -38,10 +38,14 @@ func NewBytes(b []byte) Bytes {
 }
 
 func BytesToString(bytes Bytes) string {
+	return string(Read(bytes))
+}
+
+func Read(bytes Bytes) []byte {
 	bytes.Seek(0, io.SeekStart)
 	b := make([]byte, bytes.Len())
 	bytes.Read(b)
-	return string(b)
+	return b
 }
 
 func (b *bytesReader) Close() error { return nil }
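
The refactor extracts a reusable Read helper that rewinds the buffer and copies its full contents, so callers can get the raw bytes without going through a string. A minimal sketch of the resulting API, assuming Bytes is a seekable reader with a known length; the real interface lives in mokapi's kafka package:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// Bytes approximates the interface the helpers need.
type Bytes interface {
	io.ReadSeeker
	Len() int
}

// Read rewinds the buffer and copies its full contents,
// so the same Bytes value can be consumed more than once.
func Read(b Bytes) []byte {
	b.Seek(0, io.SeekStart)
	out := make([]byte, b.Len())
	b.Read(out)
	return out
}

// BytesToString is now a thin wrapper over Read.
func BytesToString(b Bytes) string {
	return string(Read(b))
}

func main() {
	r := bytes.NewReader([]byte("hello"))
	fmt.Println(BytesToString(r)) // hello
	fmt.Println(Read(r))          // [104 101 108 108 111]
}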
Lines changed: 19 additions & 6 deletions
@@ -1,11 +1,12 @@
 package store
 
 import (
+	"encoding/json"
 	"mokapi/kafka"
 	"mokapi/runtime/events"
 )
 
-type LogRecord func(record *kafka.Record, partition int, traits events.Traits)
+type LogRecord func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, traits events.Traits)
 
 type KafkaLog struct {
 	Offset int64 `json:"offset"`
@@ -15,16 +16,28 @@ type KafkaLog struct {
 	Headers map[string]string `json:"headers"`
 }
 
-func NewKafkaLog(record *kafka.Record, partition int) *KafkaLog {
+func NewKafkaLog(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64) *KafkaLog {
 	log := &KafkaLog{
-		Offset:    record.Offset,
-		Key:       kafka.BytesToString(record.Key),
-		Message:   kafka.BytesToString(record.Value),
+		Offset:    offset,
+		Key:       toString(key),
+		Message:   toString(payload),
 		Partition: partition,
 		Headers:   make(map[string]string),
 	}
-	for _, h := range record.Headers {
+	for _, h := range headers {
 		log.Headers[h.Key] = string(h.Value)
 	}
 	return log
 }
+
+func toString(v interface{}) string {
+	switch val := v.(type) {
+	case string:
+		return val
+	case kafka.Bytes:
+		return kafka.BytesToString(val)
+	default:
+		b, _ := json.Marshal(val)
+		return string(b)
+	}
+}
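
The new toString lets the log accept decoded values rather than only raw bytes: strings pass through, kafka.Bytes is drained, and anything else, such as a decoded Avro record, is JSON-encoded for display in the web UI. A minimal sketch of that fallback behavior, with the kafka.Bytes case omitted for brevity:

package main

import (
	"encoding/json"
	"fmt"
)

func toString(v interface{}) string {
	switch val := v.(type) {
	case string:
		return val
	default:
		// Structured payloads (e.g. decoded Avro/JSON values) are
		// marshaled back to JSON so the UI can render them.
		b, _ := json.Marshal(val)
		return string(b)
	}
}

func main() {
	fmt.Println(toString("plain key"))                          // plain key
	fmt.Println(toString(map[string]any{"id": 1, "name": "a"})) // {"id":1,"name":"a"}
}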

providers/asyncapi3/kafka/store/partition.go

Lines changed: 34 additions & 26 deletions
@@ -105,45 +105,53 @@ func (p *Partition) Write(batch kafka.RecordBatch, options ...WriteOptions) (bas
 		opt(&args)
 	}
 
-	if p.validator != nil {
-		for _, r := range batch.Records {
-			err := p.validator.Validate(r)
-			if err != nil {
-				records = append(records, produce.RecordError{BatchIndex: int32(r.Offset), BatchIndexErrorMessage: err.Error()})
-			}
-		}
-		if len(records) > 0 && p.Topic.channel.Bindings.Kafka.ValueSchemaValidation && !args.SkipValidation {
-			return p.Tail, records, fmt.Errorf("validation error")
-		}
-	}
-
 	p.m.Lock()
 	defer p.m.Unlock()
 
+	var writeFuncs []func()
+
 	now := time.Now()
 	baseOffset = p.Tail
 	for _, r := range batch.Records {
-		r.Offset = p.Tail
-		p.trigger(r)
-		switch {
-		case len(p.Segments) == 0:
-			p.Segments[p.ActiveSegment] = newSegment(p.Tail)
+
+		key, payload, err := p.validator.Validate(r)
+		if err != nil {
+			records = append(records, produce.RecordError{BatchIndex: int32(r.Offset), BatchIndexErrorMessage: err.Error()})
 		}
-		segment, ok := p.Segments[p.ActiveSegment]
-		if !ok {
-			segment = p.addSegment()
+
+		if len(records) > 0 && p.Topic.channel.Bindings.Kafka.ValueSchemaValidation && !args.SkipValidation {
+			return p.Tail, records, fmt.Errorf("validation error")
 		}
 
 		if r.Time.IsZero() {
 			r.Time = now
 		}
-		segment.Log = append(segment.Log, r)
-		segment.Tail++
-		segment.LastWritten = now
-		segment.Size += r.Size()
-		p.Tail++
 
-		p.logger(r, p.Index, events.NewTraits().With("partition", strconv.Itoa(p.Index)))
+		writeFuncs = append(writeFuncs, func() {
+			r.Offset = p.Tail
+			p.trigger(r)
+
+			if len(p.Segments) == 0 {
+				p.Segments[p.ActiveSegment] = newSegment(p.Tail)
+			}
+
+			segment, ok := p.Segments[p.ActiveSegment]
+			if !ok {
+				segment = p.addSegment()
+			}
+
+			segment.Log = append(segment.Log, r)
+			segment.Tail++
+			segment.LastWritten = now
+			segment.Size += r.Size()
+			p.Tail++
+
+			p.logger(key, payload, r.Headers, p.Index, r.Offset, events.NewTraits().With("partition", strconv.Itoa(p.Index)))
+		})
+	}
+
+	for _, writeFunc := range writeFuncs {
+		writeFunc()
 	}
 
 	return
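
The rewritten Write validates each record first and captures the actual side effects (offset assignment, segment append, logging) as closures that run only after the whole batch has passed validation, so a failing record no longer leaves a partially applied batch. A minimal sketch of that validate-then-apply pattern; writeBatch, validate and the "committed" output are hypothetical stand-ins for the store's logic:

package main

import (
	"errors"
	"fmt"
)

// writeBatch validates every record up front, buffering the commit
// step as a closure; the closures run only if no record failed.
func writeBatch(records []string, validate func(string) error) error {
	var apply []func()
	for _, r := range records {
		if err := validate(r); err != nil {
			return fmt.Errorf("validation error: %w", err)
		}
		r := r // capture the loop variable for the closure (pre-Go 1.22 semantics)
		apply = append(apply, func() { fmt.Println("committed:", r) })
	}
	// No record failed: apply all buffered writes.
	for _, f := range apply {
		f()
	}
	return nil
}

func main() {
	ok := func(s string) error { return nil }
	bad := func(s string) error {
		if s == "x" {
			return errors.New("invalid message")
		}
		return nil
	}
	fmt.Println(writeBatch([]string{"a", "b"}, ok))  // commits both records
	fmt.Println(writeBatch([]string{"a", "x"}, bad)) // commits nothing
}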

providers/asyncapi3/kafka/store/partition_test.go

Lines changed: 18 additions & 11 deletions
@@ -16,7 +16,8 @@ func TestPartition(t *testing.T) {
 	p := newPartition(
 		0,
 		map[int]*Broker{1: {Id: 1}},
-		func(record *kafka.Record, partition int, traits events.Traits) {}, func(record *kafka.Record) {}, &Topic{})
+		func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, traits events.Traits) {
+		}, func(record *kafka.Record) {}, &Topic{})
 
 	require.Equal(t, 0, p.Index)
 	require.Equal(t, int64(0), p.StartOffset())
@@ -26,12 +27,12 @@ func TestPartition(t *testing.T) {
 }
 
 func TestPartition_Write(t *testing.T) {
-	var log []*kafka.Record
+	var log []int64
 	p := newPartition(
 		0,
 		map[int]*Broker{1: {Id: 1}},
-		func(record *kafka.Record, partition int, traits events.Traits) {
-			log = append(log, record)
+		func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, traits events.Traits) {
+			log = append(log, offset)
 		}, func(record *kafka.Record) {}, &Topic{})
 
 	offset, records, err := p.Write(kafka.RecordBatch{
@@ -74,7 +75,8 @@ func TestPartition_Read_Empty(t *testing.T) {
 	p := newPartition(
 		0,
 		map[int]*Broker{1: {Id: 1}},
-		func(_ *kafka.Record, partition int, _ events.Traits) {}, func(record *kafka.Record) {}, &Topic{})
+		func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, _ events.Traits) {
+		}, func(record *kafka.Record) {}, &Topic{})
 	b, errCode := p.Read(0, 1)
 	require.Equal(t, kafka.None, errCode)
 	require.Equal(t, 0, len(b.Records))
@@ -84,7 +86,8 @@ func TestPartition_Read(t *testing.T) {
 	p := newPartition(
 		0,
 		map[int]*Broker{1: {Id: 1}},
-		func(_ *kafka.Record, partition int, _ events.Traits) {}, func(record *kafka.Record) {}, &Topic{})
+		func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, _ events.Traits) {
+		}, func(record *kafka.Record) {}, &Topic{})
 	offset, records, err := p.Write(kafka.RecordBatch{
 		Records: []*kafka.Record{
 			{
@@ -108,7 +111,8 @@ func TestPartition_Read_OutOfOffset_Empty(t *testing.T) {
 	p := newPartition(
 		0,
 		map[int]*Broker{1: {Id: 1}},
-		func(_ *kafka.Record, partition int, _ events.Traits) {}, func(record *kafka.Record) {}, &Topic{})
+		func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, _ events.Traits) {
+		}, func(record *kafka.Record) {}, &Topic{})
 	b, errCode := p.Read(10, 1)
 	require.Equal(t, kafka.None, errCode)
 	require.Equal(t, 0, len(b.Records))
@@ -118,7 +122,8 @@ func TestPartition_Read_OutOfOffset(t *testing.T) {
 	p := newPartition(
 		0,
 		map[int]*Broker{1: {Id: 1}},
-		func(_ *kafka.Record, partition int, _ events.Traits) {}, func(record *kafka.Record) {}, &Topic{})
+		func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, _ events.Traits) {
+		}, func(record *kafka.Record) {}, &Topic{})
 	_, _, _ = p.Write(kafka.RecordBatch{
 		Records: []*kafka.Record{
 			{
@@ -139,7 +144,8 @@ func TestPartition_Write_Value_Validator(t *testing.T) {
 	p := newPartition(
 		0,
 		map[int]*Broker{1: {Id: 1}},
-		func(_ *kafka.Record, partition int, _ events.Traits) {}, func(record *kafka.Record) {}, &Topic{channel: &asyncapi3.Channel{Bindings: asyncapi3.ChannelBindings{
+		func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, _ events.Traits) {
+		}, func(record *kafka.Record) {}, &Topic{channel: &asyncapi3.Channel{Bindings: asyncapi3.ChannelBindings{
 			Kafka: asyncapi3.TopicBindings{ValueSchemaValidation: true},
 		}}})
 	p.validator = &validator{
@@ -167,7 +173,7 @@ func TestPartition_Write_Value_Validator(t *testing.T) {
 	require.EqualError(t, err, "validation error")
 	require.Len(t, recordsWithError, 1)
 	require.Equal(t, int32(0), recordsWithError[0].BatchIndex)
-	require.Equal(t, "found 1 error:\ninvalid type, expected string but got number\nschema path #/type", recordsWithError[0].BatchIndexErrorMessage)
+	require.Equal(t, "invalid message: found 1 error:\ninvalid type, expected string but got number\nschema path #/type", recordsWithError[0].BatchIndexErrorMessage)
 	require.Equal(t, int64(0), offset)
 	require.Equal(t, int64(0), p.Offset())
 	require.Equal(t, int64(0), p.StartOffset())
@@ -201,7 +207,8 @@ func TestPartition_Write_Value_Validator(t *testing.T) {
 
 func TestPatition_Retention(t *testing.T) {
 	p := newPartition(0, map[int]*Broker{1: {Id: 1}},
-		func(_ *kafka.Record, partition int, _ events.Traits) {},
+		func(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, _ events.Traits) {
+		},
		func(record *kafka.Record) {}, &Topic{})
 	require.Equal(t, int64(0), p.Head)
 	offset, records, err := p.Write(kafka.RecordBatch{

providers/asyncapi3/kafka/store/store.go

Lines changed: 2 additions & 2 deletions
@@ -278,8 +278,8 @@ func (s *Store) getBrokerByHost(addr string) *Broker {
 	return nil
 }
 
-func (s *Store) log(record *kafka.Record, partition int, traits events.Traits) {
-	events.Push(NewKafkaLog(record, partition), traits.WithNamespace("kafka").WithName(s.cluster))
+func (s *Store) log(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, traits events.Traits) {
+	events.Push(NewKafkaLog(key, payload, headers, partition, offset), traits.WithNamespace("kafka").WithName(s.cluster))
 }
 
 func (s *Store) trigger(record *kafka.Record) {

providers/asyncapi3/kafka/store/topic.go

Lines changed: 2 additions & 2 deletions
@@ -68,8 +68,8 @@ func (t *Topic) update(config *asyncapi3.Channel, s *Store) {
 		t.Partitions = t.Partitions[:numPartitions]
 	}
 
-func (t *Topic) log(record *kafka.Record, partition int, traits events.Traits) {
-	t.logger(record, partition, traits.With("topic", t.Name))
+func (t *Topic) log(key, payload interface{}, headers []kafka.RecordHeader, partition int, offset int64, traits events.Traits) {
+	t.logger(key, payload, headers, partition, offset, traits.With("topic", t.Name))
 }
 
 func (t *Topic) Store() *Store {
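
Partition, Topic and Store each decorate the traits with their own context (partition index, topic name, namespace and cluster) before delegating to the next logger, which is why the signature change above ripples through all three layers. A minimal sketch of that decoration chain, with Traits simplified to a string map and the names "orders"/"kafka" chosen purely for illustration:

package main

import "fmt"

// Traits is a simplified stand-in for mokapi's events.Traits.
type Traits map[string]string

// With returns a copy of the traits extended by one key/value pair.
func (t Traits) With(k, v string) Traits {
	out := Traits{}
	for key, val := range t {
		out[key] = val
	}
	out[k] = v
	return out
}

func main() {
	// Store layer: the final sink that pushes the event.
	store := func(key, payload interface{}, traits Traits) {
		fmt.Printf("push key=%v payload=%v traits=%v\n", key, payload, traits.With("namespace", "kafka"))
	}
	// Topic layer: tags the topic name, then delegates.
	topic := func(key, payload interface{}, traits Traits) {
		store(key, payload, traits.With("topic", "orders"))
	}
	// Partition layer: tags the partition index, then delegates.
	partition := func(key, payload interface{}, traits Traits) {
		topic(key, payload, traits.With("partition", "0"))
	}
	partition("key-1", `{"id":1}`, Traits{})
}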
