type EventBus interface {
Publisher
Subscriber
}// Closer 定义了可以关闭资源的接口
// 某些 EventBus 实现(如 RabbitMQ)需要清理内部资源(如连接池)
// 可以通过类型断言使用此接口: if closer, ok := bus.(eventbus.Closer); ok { closer.Close() }
type Closer interface {
Close() error
}type Publisher interface {
// Publish 同步发布消息
Publish(ctx context.Context, topic string, v interface{}) error
// PublishAsync 异步发布消息
PublishAsync(ctx context.Context, topic string, v interface{}) error
}type Subscriber interface {
// Subscribe 同步订阅消息
Subscribe(ctx context.Context, topic string, h SubscribeHandler) error
// SubscribeAsync 异步订阅消息
SubscribeAsync(ctx context.Context, topic string, h SubscribeHandler) error
}type SubscribeHandler func(ctx context.Context, msg *Message) errortype Message struct {
Header MessageHeader `json:"header"` // 消息头
Value interface{} `json:"value"` // 消息内容
}type MessageHeader map[string]string// NewGroupIDContext 创建带消费组ID的上下文
func NewGroupIDContext(parent context.Context, groupID string) context.Context
// FromGroupIDContext 从上下文中获取消费组ID
func FromGroupIDContext(ctx context.Context) (groupID string, ok bool)// SetMessageHeader 设置消息头的函数类型
type SetMessageHeader func(ctx context.Context) MessageHeader
// NewSetMessageHeaderContext 创建带消息头设置函数的上下文
func NewSetMessageHeaderContext(ctx context.Context, f SetMessageHeader) context.Context
// FromSetMessageHeaderContext 从上下文中获取消息头设置函数
func FromSetMessageHeaderContext(ctx context.Context) (f SetMessageHeader, ok bool)// NewRedis 创建Redis Streams事件总线
func NewRedis(conn *redis.Client, options ...*RedisOptions) (EventBus, error)
type RedisOptions struct {
ReadCount int64 // 每次读取的消息数量
ReadBlock time.Duration // 阻塞读取超时时间
Serialize Serializer // 序列化器
Logger Logger // 日志器
}// NewRedisQueue 创建Redis Queue事件总线
func NewRedisQueue(conn *redis.Client, options ...*RedisQueueOptions) (EventBus, error)
type RedisQueueOptions struct {
PollInterval time.Duration // 轮询间隔
Serialize Serializer // 序列化器
Logger Logger // 日志器
}Redis Queue 额外方法:
// RedistributeMessages 重新分发消息(用于消费组负载均衡)
func (bus *redisQueueEventBus) RedistributeMessages(ctx context.Context, fromTopic, toTopic string, count int64) error
// GetQueueLength 获取队列长度
func (bus *redisQueueEventBus) GetQueueLength(ctx context.Context, queueName string) (int64, error)
// PeekQueue 查看队列中的消息而不移除
func (bus *redisQueueEventBus) PeekQueue(ctx context.Context, queueName string, start, stop int64) ([]string, error)// NewRabbitMQ 创建RabbitMQ事件总线
func NewRabbitMQ(conn *amqp.Connection, options ...*RabbitMQOptions) (EventBus, error)
type RabbitMQOptions struct {
Serialize Serializer // 序列化器
Logger Logger // 日志器
}type Serializer interface {
Unmarshal(data []byte, msg interface{}) error
Marshal(msg interface{}) ([]byte, error)
ContentType() string
}内置实现:
type JSONSerialize struct{}type Logger interface {
Debugf(ctx context.Context, format string, args ...interface{})
Debugln(ctx context.Context, args ...interface{})
Infof(ctx context.Context, format string, args ...interface{})
Infoln(ctx context.Context, args ...interface{})
Warnf(ctx context.Context, format string, args ...interface{})
Warnln(ctx context.Context, args ...interface{})
Warningf(ctx context.Context, format string, args ...interface{})
Warningln(ctx context.Context, args ...interface{})
Errorf(ctx context.Context, format string, args ...interface{})
Errorln(ctx context.Context, args ...interface{})
}内置实现:
type StdLogger struct{} // 标准库日志
type LogrusLogger struct{} // Logrus日志
type ZapLogger struct{} // Zap日志
type ZlogLogger struct{} // Nilorg Zlog日志package main
import (
"context"
"log"
"time"
"github.com/go-redis/redis/v8"
"github.com/nilorg/eventbus"
)
func main() {
// 1. 创建Redis客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 0,
})
defer client.Close()
// 2. 创建事件总线
bus, err := eventbus.NewRedisQueue(client, &eventbus.RedisQueueOptions{
PollInterval: time.Second,
Serialize: &eventbus.JSONSerialize{},
Logger: &eventbus.StdLogger{},
})
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
// 3. 订阅消息
go func() {
// 创建消费组上下文
groupCtx := eventbus.NewGroupIDContext(ctx, "worker_group")
err := bus.Subscribe(groupCtx, "tasks", func(ctx context.Context, msg *eventbus.Message) error {
log.Printf("处理任务: %+v", msg.Value)
return nil
})
if err != nil {
log.Printf("订阅失败: %v", err)
}
}()
// 4. 发布消息
for i := 0; i < 10; i++ {
// 创建带消息头的上下文
headerCtx := eventbus.NewSetMessageHeaderContext(ctx, func(ctx context.Context) eventbus.MessageHeader {
return eventbus.MessageHeader{
"source": "task_scheduler",
"timestamp": time.Now().Format(time.RFC3339),
"task_id": fmt.Sprintf("task_%d", i),
}
})
// 创建消费组上下文
groupCtx := eventbus.NewGroupIDContext(headerCtx, "worker_group")
task := map[string]interface{}{
"id": i,
"type": "data_processing",
"data": fmt.Sprintf("data_%d", i),
}
err := bus.PublishAsync(groupCtx, "tasks", task)
if err != nil {
log.Printf("发布任务失败: %v", err)
} else {
log.Printf("发布任务 %d", i)
}
time.Sleep(500 * time.Millisecond)
}
// 等待处理完成
time.Sleep(5 * time.Second)
}// 创建事件总线
bus, err := eventbus.NewRabbitMQ(conn)
if err != nil {
log.Fatal(err)
}
// 使用 defer 确保资源清理
defer func() {
if closer, ok := bus.(eventbus.Closer); ok {
if err := closer.Close(); err != nil {
log.Printf("关闭事件总线失败: %v", err)
}
}
}()
// 使用事件总线...// 订阅时的错误处理
err := bus.Subscribe(ctx, "events", func(ctx context.Context, msg *eventbus.Message) error {
// 业务处理
if err := processMessage(msg); err != nil {
// 返回错误,消息处理失败
return fmt.Errorf("处理消息失败: %w", err)
}
// 返回nil表示处理成功
return nil
})// 1. 使用异步模式
bus.PublishAsync(ctx, topic, data)
bus.SubscribeAsync(ctx, topic, handler)
// 2. 批量发布
for _, data := range batch {
bus.PublishAsync(ctx, topic, data)
}
// 3. 合理配置参数
options := &eventbus.RedisQueueOptions{
PollInterval: 100 * time.Millisecond, // 降低延迟
}创建基于NATS Core的事件总线,适用于高性能、低延迟的实时通信场景。
func NewNATS(conn *nats.Conn, options ...*NATSOptions) (bus EventBus, err error)参数:
conn: NATS连接实例options: 可选配置参数
示例:
// 连接NATS服务器
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
return err
}
// 创建事件总线
bus, err := eventbus.NewNATS(nc)
if err != nil {
return err
}
// 发布消息
err = bus.Publish(ctx, "user.created", userData)
// 订阅消息
err = bus.SubscribeAsync(ctx, "user.*", func(ctx context.Context, msg *eventbus.Message) error {
// 处理消息
return nil
})创建基于NATS JetStream的事件总线,支持消息持久化、确认机制,适用于可靠性要求高的场景。
func NewNATSJetStream(conn *nats.Conn, options ...*NATSJetStreamOptions) (bus EventBus, err error)参数:
conn: NATS连接实例(需要支持JetStream)options: JetStream配置参数
示例:
// 连接NATS服务器
nc, err := nats.Connect("nats://localhost:4222")
if err != nil {
return err
}
// 配置JetStream选项
options := eventbus.DefaultNATSJetStreamOptions
options.StreamName = "ORDERS"
options.MaxMsgs = 10000
options.MaxAge = 24 * time.Hour
// 创建JetStream事件总线
bus, err := eventbus.NewNATSJetStream(nc, &options)
if err != nil {
return err
}
// 发布消息(会被持久化)
err = bus.Publish(ctx, "order.created", orderData)
// 队列组订阅(负载均衡)
groupCtx := eventbus.NewGroupIDContext(ctx, "order-processors")
err = bus.SubscribeAsync(groupCtx, "order.*", func(ctx context.Context, msg *eventbus.Message) error {
// 处理订单事件
return nil
})NATS Core配置选项。
type NATSOptions struct {
Serialize Serializer // 序列化器,默认JSON
Logger Logger // 日志器
MaxRetries int // 发布重试次数,默认3
RetryInterval time.Duration // 重试间隔,默认2秒
BackoffMultiplier float64 // 退避倍数,默认2.0
MaxBackoff time.Duration // 最大退避时间,默认1分钟
MessageMaxRetries int // 消息处理重试次数,默认3
SkipBadMessages bool // 跳过无法序列化的消息,默认true
DeadLetterSubject string // 死信队列主题
QueueGroup string // 默认队列组名
}NATS JetStream配置选项,继承NATSOptions的所有配置。
type NATSJetStreamOptions struct {
// 基础选项(继承自NATSOptions)
Serialize Serializer
Logger Logger
MaxRetries int
RetryInterval time.Duration
BackoffMultiplier float64
MaxBackoff time.Duration
MessageMaxRetries int
SkipBadMessages bool
DeadLetterSubject string
QueueGroup string
// JetStream特有配置
StreamName string // 流名称,默认"EVENTBUS"
MaxMsgs int64 // 流最大消息数,默认1000000
MaxAge time.Duration // 消息最大保存时间,默认24小时
DuplicateWindow time.Duration // 去重窗口,默认2分钟
Replicas int // 副本数,默认1
AckWait time.Duration // 确认等待时间,默认30秒
MaxDeliver int // 最大投递次数,默认3
}NATS实现支持队列组功能,实现消费者负载均衡:
// 创建队列组上下文
groupCtx := eventbus.NewGroupIDContext(ctx, "my-service-group")
// 同一队列组的多个消费者会进行负载均衡
err = bus.SubscribeAsync(groupCtx, "events.*", handler1) // 消费者1
err = bus.SubscribeAsync(groupCtx, "events.*", handler2) // 消费者2
// 不同队列组的消费者会收到所有消息
otherGroupCtx := eventbus.NewGroupIDContext(ctx, "other-service-group")
err = bus.SubscribeAsync(otherGroupCtx, "events.*", handler3) // 独立消费者NATS支持主题通配符:
// 精确匹配
bus.Subscribe(ctx, "user.created", handler)
// 单级通配符 *
bus.Subscribe(ctx, "user.*", handler) // 匹配 user.created, user.updated 等
// 多级通配符 >
bus.Subscribe(ctx, "events.>", handler) // 匹配 events.user.created, events.order.paid 等// 设置消息头
headerCtx := eventbus.NewSetMessageHeaderContext(ctx, func(ctx context.Context) eventbus.MessageHeader {
return eventbus.MessageHeader{
"message-id": generateUniqueID(),
"source": "user-service",
"timestamp": fmt.Sprintf("%d", time.Now().Unix()),
}
})
// 发布带头信息的消息
err = bus.Publish(headerCtx, "user.created", userData)NATS实现提供多层次的错误处理:
- 发布重试:发布失败时自动重试
- 消息处理重试:消息处理失败时重试
- 死信队列:重试失败的消息发送到死信队列
options := eventbus.NATSOptions{
MaxRetries: 5, // 发布重试5次
RetryInterval: time.Second, // 初始重试间隔1秒
BackoffMultiplier: 1.5, // 每次重试间隔增加50%
MaxBackoff: time.Minute, // 最大重试间隔1分钟
MessageMaxRetries: 3, // 消息处理重试3次
DeadLetterSubject: "failed.messages", // 死信队列
}JetStream提供的额外特性:
消息持久化:
// 消息会被持久化到磁盘,服务重启后仍可消费
err = bus.Publish(ctx, "important.event", data)消息确认:
// 消息处理成功后自动确认,失败时会重新投递
err = bus.SubscribeAsync(ctx, "events.*", func(ctx context.Context, msg *eventbus.Message) error {
if err := processMessage(msg); err != nil {
return err // 返回错误,消息会重新投递
}
return nil // 返回nil,消息被确认
})去重机制:
// 使用消息ID实现去重
headerCtx := eventbus.NewSetMessageHeaderContext(ctx, func(ctx context.Context) eventbus.MessageHeader {
return eventbus.MessageHeader{
"message-id": "unique-id-123", // 相同ID的消息会被去重
}
})// 1. 使用异步模式提高吞吐量
bus.PublishAsync(ctx, topic, data)
bus.SubscribeAsync(ctx, topic, handler)
// 2. 合理配置JetStream参数
options := &eventbus.NATSJetStreamOptions{
MaxMsgs: 100000, // 根据业务调整流大小
MaxAge: time.Hour, // 消息保留时间
AckWait: 10 * time.Second, // 确认等待时间
MaxDeliver: 5, // 最大投递次数
}
// 3. 使用队列组实现水平扩展
groupCtx := eventbus.NewGroupIDContext(ctx, "worker-group")
for i := 0; i < workerCount; i++ {
bus.SubscribeAsync(groupCtx, "work.tasks", processTask)
}