This commit is contained in:
Gordon
2022-06-08 17:19:37 +08:00
parent 0214419446
commit b46f106a91
@@ -74,6 +74,8 @@ func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSessi
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key)) log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
if len(msg.Value) != 0 { if len(msg.Value) != 0 {
pc.msgHandle[msg.Topic](msg, string(msg.Key), sess) pc.msgHandle[msg.Topic](msg, string(msg.Key), sess)
} else {
log.Error("", "msg get from kafka but is nil", msg.Key)
} }
sess.MarkMessage(msg, "") sess.MarkMessage(msg, "")
} }