Skip to content

Commit a73649f

Browse files
authored
Merge pull request #1610 from memphisdev/bugfix-RND-413-messages-that-send-and-the-redis-connector-on-pause-while-it-is-the-single-consumer-dosent-wait-him-to-be-reconnect
from cloud
2 parents f750de2 + a8d4143 commit a73649f

File tree

2 files changed

+105
-0
lines changed

2 files changed

+105
-0
lines changed

db/db.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2824,6 +2824,26 @@ func DeleteProducerByNameAndStationID(name string, stationId int) (bool, error)
28242824
return true, nil
28252825
}
28262826

2827+
func DeleteConnectorProducerByNameAndStationID(name string, stationId int) (bool, error) {
2828+
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
2829+
defer cancelfunc()
2830+
conn, err := MetadataDbClient.Client.Acquire(ctx)
2831+
if err != nil {
2832+
return false, err
2833+
}
2834+
defer conn.Release()
2835+
query := `DELETE FROM producers WHERE name = $1 AND station_id = $2 AND type = 'connector' LIMIT 1`
2836+
stmt, err := conn.Conn().Prepare(ctx, "delete_producer_by_name_and_station_id", query)
2837+
if err != nil {
2838+
return false, err
2839+
}
2840+
_, err = conn.Conn().Query(ctx, stmt.Name, name, stationId)
2841+
if err != nil {
2842+
return false, err
2843+
}
2844+
return true, nil
2845+
}
2846+
28272847
func DeleteProducerByNameStationIDAndConnID(name string, stationId int, connId string) (bool, error) {
28282848
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
28292849
defer cancelfunc()
@@ -3223,6 +3243,34 @@ func DeleteConsumerByNameStationIDAndConnID(connectionId, name string, stationId
32233243
return true, consumers[0], nil
32243244
}
32253245

3246+
func DeleteConsumerByNameStationIDAndType(consumerType, name string, stationId int) (bool, models.Consumer, error) {
3247+
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
3248+
defer cancelfunc()
3249+
conn, err := MetadataDbClient.Client.Acquire(ctx)
3250+
if err != nil {
3251+
return false, models.Consumer{}, err
3252+
}
3253+
defer conn.Release()
3254+
query := ` DELETE FROM consumers WHERE ctid = ( SELECT ctid FROM consumers WHERE type = $1 AND name = $2 AND station_id = $3 LIMIT 1) RETURNING *`
3255+
deleteStmt, err := conn.Conn().Prepare(ctx, "delete_consumers", query)
3256+
if err != nil {
3257+
return false, models.Consumer{}, err
3258+
}
3259+
rows, err := conn.Conn().Query(ctx, deleteStmt.Name, consumerType, name, stationId)
3260+
if err != nil {
3261+
return false, models.Consumer{}, err
3262+
}
3263+
defer rows.Close()
3264+
consumers, err := pgx.CollectRows(rows, pgx.RowToStructByPos[models.Consumer])
3265+
if err != nil {
3266+
return false, models.Consumer{}, err
3267+
}
3268+
if len(consumers) == 0 {
3269+
return false, models.Consumer{}, err
3270+
}
3271+
return true, consumers[0], nil
3272+
}
3273+
32263274
func DeleteConsumerByNameAndStationId(name string, stationId int) (bool, models.Consumer, error) {
32273275
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
32283276
defer cancelfunc()

server/memphis_handlers_consumers.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,63 @@ func (s *Server) destroyCGFromNats(c *client, reply, userName, tenantName string
757757

758758
}
759759

760+
func (s *Server) destroyCGFromNatsInternal(username, tenantName string, stationName StationName, consumer models.Consumer, station models.Station) error {
761+
// ensure not part of an active consumer group
762+
count, err := db.CountActiveConsumersInCG(consumer.ConsumersGroup, station.ID)
763+
if err != nil {
764+
return err
765+
}
766+
767+
deleted := false
768+
if count == 0 { // no other members in this group
769+
err = s.RemoveConsumer(station.TenantName, stationName, consumer.ConsumersGroup, consumer.PartitionsList)
770+
if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) {
771+
return err
772+
}
773+
if err == nil {
774+
deleted = true
775+
}
776+
777+
err = db.RemovePoisonedCg(station.ID, consumer.ConsumersGroup)
778+
if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) {
779+
return err
780+
}
781+
}
782+
783+
name := strings.ToLower(consumer.Name)
784+
if deleted {
785+
_, user, err := memphis_cache.GetUser(username, consumer.TenantName, false)
786+
if err != nil && !IsNatsErr(err, JSConsumerNotFoundErr) && !IsNatsErr(err, JSStreamNotFoundErr) {
787+
return err
788+
}
789+
message := fmt.Sprintf("Consumer %v has been destroyed", name)
790+
serv.Noticef("[tenant: %v][user: %v]: %v", user.TenantName, user.Username, message)
791+
var auditLogs []interface{}
792+
newAuditLog := models.AuditLog{
793+
StationName: stationName.Ext(),
794+
Message: message,
795+
CreatedBy: user.ID,
796+
CreatedByUsername: user.Username,
797+
CreatedAt: time.Now(),
798+
TenantName: user.TenantName,
799+
}
800+
auditLogs = append(auditLogs, newAuditLog)
801+
err = CreateAuditLogs(auditLogs)
802+
if err != nil {
803+
serv.Errorf("[tenant: %v]destroyCGFromNats at CreateAuditLogs: Consumer %v at station %v: %v", user.TenantName, consumer.Name, station.Name, err.Error())
804+
}
805+
806+
shouldSendAnalytics, _ := shouldSendAnalytics()
807+
if shouldSendAnalytics {
808+
analyticsParams := make(map[string]interface{})
809+
analytics.SendEvent(user.TenantName, username, analyticsParams, "user-remove-consumer-sdk")
810+
}
811+
}
812+
813+
return nil
814+
815+
}
816+
760817
func comparePartitionsList(pList1, pList2 []int) bool {
761818
if len(pList1) != len(pList2) {
762819
return false

0 commit comments

Comments
 (0)