Skip to content

Commit 069eb72

Browse files
committed
fix update topic config: partitions and validation
1 parent d7f699a commit 069eb72

File tree

11 files changed

+103
-23
lines changed

11 files changed

+103
-23
lines changed

engine/kafka_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ func TestKafkaClient_Produce(t *testing.T) {
285285
asyncapi3test.WithPayload(schematest.New("string")),
286286
asyncapi3test.WithKey(schematest.New("string")),
287287
),
288-
asyncapi3test.WithTopicBinding(asyncapi3.TopicBindings{ValueSchemaValidation: true}),
289288
),
290289
asyncapi3test.WithChannel("bar",
291290
asyncapi3test.WithKafkaChannelBinding(asyncapi3.TopicBindings{Partitions: 10}),

providers/asyncapi3/asyncapi3test/channel.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ type ChannelOptions func(c *asyncapi3.Channel)
99

1010
func NewChannel(opts ...ChannelOptions) *asyncapi3.Channel {
1111
ch := &asyncapi3.Channel{}
12+
// default enable validation
13+
ch.Bindings.Kafka.ValueSchemaValidation = true
14+
ch.Bindings.Kafka.Partitions = 1
1215
for _, opt := range opts {
1316
opt(ch)
1417
}

providers/asyncapi3/asyncapi3test/config.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ type ConfigOptions func(c *asyncapi3.Config)
88

99
func NewConfig(opts ...ConfigOptions) *asyncapi3.Config {
1010
c := &asyncapi3.Config{
11-
Version: "2.0.0",
12-
Info: asyncapi3.Info{Name: "test", Version: "1.0"},
13-
Servers: map[string]*asyncapi3.ServerRef{}}
11+
Version: "2.0.0",
12+
Info: asyncapi3.Info{Name: "test", Version: "1.0"},
13+
Servers: map[string]*asyncapi3.ServerRef{},
14+
DefaultContentType: "application/json",
15+
}
1416
for _, opt := range opts {
1517
opt(c)
1618
}

providers/asyncapi3/channel.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func (r *ChannelRef) parse(config *dynamic.Config, reader dynamic.Reader) error
7777
func (c *Channel) UnmarshalYAML(node *yaml.Node) error {
7878
// set default
7979
c.Bindings.Kafka.ValueSchemaValidation = true
80+
c.Bindings.Kafka.Partitions = 1
8081

8182
type alias Channel
8283
a := alias(*c)
@@ -91,6 +92,7 @@ func (c *Channel) UnmarshalYAML(node *yaml.Node) error {
9192
func (c *Channel) UnmarshalJSON(b []byte) error {
9293
// set default
9394
c.Bindings.Kafka.ValueSchemaValidation = true
95+
c.Bindings.Kafka.Partitions = 1
9496

9597
type alias Channel
9698
a := alias(*c)

providers/asyncapi3/kafka/store/produce.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (s *Store) produce(rw kafka.ResponseWriter, req *kafka.Request) error {
4141
} else {
4242
p := topic.Partition(int(rp.Index))
4343
if p == nil {
44-
resPartition.ErrorCode = kafka.UnknownServerError
44+
resPartition.ErrorCode = kafka.UnknownTopicOrPartition
4545
resPartition.ErrorMessage = fmt.Sprintf("unknown partition %v", rp.Index)
4646
log.Errorf("kafka Produce: %v", resPartition.ErrorMessage)
4747
} else {

providers/asyncapi3/kafka/store/produce_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@ func TestProduce(t *testing.T) {
183183
asyncapi3test.WithMessage("foo",
184184
asyncapi3test.WithContentType("application/json"),
185185
asyncapi3test.WithPayload(schematest.New("integer"))),
186-
asyncapi3test.WithTopicBinding(asyncapi3.TopicBindings{ValueSchemaValidation: true}),
187186
),
188187
))
189188

providers/asyncapi3/kafka/store/store.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,8 @@ func (s *Store) Update(c *asyncapi3.Config) {
156156
if ch.Value == nil {
157157
continue
158158
}
159-
k := ch.Value.Bindings.Kafka
160159
if t, ok := s.topics[n]; ok {
161-
for _, p := range t.Partitions[k.Partitions:] {
162-
p.delete()
163-
}
160+
t.update(ch.Value, s)
164161
} else {
165162
if _, err := s.addTopic(n, ch.Value, getOperations(ch.Value, c)); err != nil {
166163
log.Errorf("unable to add topic '%v' to broker '%v': %v", n, s.cluster, err)
@@ -219,7 +216,7 @@ func (s *Store) addTopic(name string, channel *asyncapi3.Channel, ops []*asyncap
219216
if _, ok := s.topics[name]; ok {
220217
return nil, fmt.Errorf("topic %v already exists", name)
221218
}
222-
t := newTopic(name, channel, ops, s.brokers, s.log, s.trigger, s)
219+
t := newTopic(name, channel, ops, s)
223220
s.topics[name] = t
224221
return t, nil
225222
}

providers/asyncapi3/kafka/store/store_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package store_test
33
import (
44
"github.com/stretchr/testify/require"
55
"mokapi/engine/enginetest"
6+
"mokapi/kafka"
67
"mokapi/providers/asyncapi3"
78
"mokapi/providers/asyncapi3/asyncapi3test"
89
"mokapi/providers/asyncapi3/kafka/store"
10+
"mokapi/schema/json/schematest"
911
"testing"
1012
)
1113

@@ -76,6 +78,60 @@ func TestStore(t *testing.T) {
7678
require.Error(t, err, "topic foo already exists")
7779
},
7880
},
81+
{
82+
"update topic add partition",
83+
func(t *testing.T) {
84+
s := store.New(asyncapi3test.NewConfig(asyncapi3test.WithChannel("foo", asyncapi3test.WithKafkaChannelBinding(asyncapi3.TopicBindings{Partitions: 1}))), enginetest.NewEngine())
85+
defer s.Close()
86+
87+
s.Update(asyncapi3test.NewConfig(asyncapi3test.WithChannel("foo", asyncapi3test.WithKafkaChannelBinding(asyncapi3.TopicBindings{Partitions: 2}))))
88+
89+
require.Len(t, s.Topic("foo").Partitions, 2)
90+
},
91+
},
92+
{
93+
"update topic remove partition",
94+
func(t *testing.T) {
95+
s := store.New(asyncapi3test.NewConfig(asyncapi3test.WithChannel("foo", asyncapi3test.WithKafkaChannelBinding(asyncapi3.TopicBindings{Partitions: 2}))), enginetest.NewEngine())
96+
defer s.Close()
97+
98+
s.Update(asyncapi3test.NewConfig(asyncapi3test.WithChannel("foo", asyncapi3test.WithKafkaChannelBinding(asyncapi3.TopicBindings{Partitions: 1}))))
99+
100+
require.Len(t, s.Topic("foo").Partitions, 1)
101+
},
102+
},
103+
{
104+
"update topic change schema",
105+
func(t *testing.T) {
106+
s := store.New(asyncapi3test.NewConfig(
107+
asyncapi3test.WithChannel("foo",
108+
asyncapi3test.WithMessage("foo",
109+
asyncapi3test.WithPayload(schematest.New("integer")),
110+
asyncapi3test.WithContentType("application/json"),
111+
),
112+
)),
113+
enginetest.NewEngine())
114+
defer s.Close()
115+
116+
s.Update(asyncapi3test.NewConfig(
117+
asyncapi3test.WithChannel("foo",
118+
asyncapi3test.WithMessage("foo",
119+
asyncapi3test.WithPayload(schematest.New("string")),
120+
asyncapi3test.WithContentType("application/json"),
121+
),
122+
),
123+
))
124+
125+
_, _, err := s.Topic("foo").Partitions[0].Write(
126+
kafka.RecordBatch{Records: []*kafka.Record{
127+
{
128+
Value: kafka.NewBytes([]byte("123")),
129+
},
130+
}},
131+
)
132+
require.EqualError(t, err, "validation error")
133+
},
134+
},
79135
}
80136

81137
t.Parallel()

providers/asyncapi3/kafka/store/topic.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,39 @@ func (t *Topic) delete() {
3535
}
3636
}
3737

38-
func newTopic(name string, channel *asyncapi3.Channel, ops []*asyncapi3.Operation, brokers Brokers, logger LogRecord, trigger Trigger, s *Store) *Topic {
39-
t := &Topic{Name: name, logger: logger, s: s, channel: channel, operations: ops}
38+
func newTopic(name string, channel *asyncapi3.Channel, ops []*asyncapi3.Operation, s *Store) *Topic {
39+
t := &Topic{Name: name, logger: s.log, s: s, channel: channel, operations: ops}
4040

4141
numPartitions := channel.Bindings.Kafka.Partitions
42-
if numPartitions == 0 {
43-
numPartitions = 1
44-
}
4542
for i := 0; i < numPartitions; i++ {
46-
part := newPartition(i, brokers, t.log, trigger, t)
43+
part := newPartition(i, s.brokers, t.log, s.trigger, t)
4744
part.validator = newValidator(channel)
4845
t.Partitions = append(t.Partitions, part)
4946
}
5047

5148
return t
5249
}
5350

51+
func (t *Topic) update(config *asyncapi3.Channel, s *Store) {
52+
numPartitions := config.Bindings.Kafka.Partitions
53+
54+
for i, p := range t.Partitions {
55+
if i >= numPartitions {
56+
p.delete()
57+
} else {
58+
p.validator = newValidator(config)
59+
}
60+
}
61+
62+
for i := len(t.Partitions); i < numPartitions; i++ {
63+
part := newPartition(i, s.brokers, t.log, s.trigger, t)
64+
part.validator = newValidator(config)
65+
t.Partitions = append(t.Partitions, part)
66+
}
67+
68+
t.Partitions = t.Partitions[:numPartitions]
69+
}
70+
5471
func (t *Topic) log(record *kafka.Record, partition int, traits events.Traits) {
5572
t.logger(record, partition, traits.With("topic", t.Name))
5673
}

providers/asyncapi3/kafka/store/validation.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,15 @@ type recordValidator interface {
2121

2222
func newValidator(c *asyncapi3.Channel) *validator {
2323
v := &validator{}
24-
v.update(c)
25-
return v
26-
}
27-
28-
func (v *validator) update(c *asyncapi3.Channel) {
29-
v.validators = nil
3024

3125
for id, msg := range c.Messages {
3226
if msg.Value == nil || msg.Value.Payload == nil {
3327
continue
3428
}
3529
v.validators = append(v.validators, newMessageValidator(id, msg.Value))
3630
}
31+
32+
return v
3733
}
3834

3935
func (v *validator) Validate(record *kafka.Record) error {

0 commit comments

Comments
 (0)