有赞nsq使用体验
有赞 go-nsq 使用体验
之前有使用过 rabbitMQ,公司调研结果推荐尝试有赞推广的
go-nsq: https://github.com/youzan/go-nsq
同时简单的体验了一下,并在这里简单的记录一下几个有意思的地方
概念简介
topic :实际上是一个 消息队列实体, 生产者 发布消息 到topic。
channel : 是消息分发的通道,从属于topic。 消费者 从 channel 里获取消息。
nsqd: 承载 topic 和 channel 功能的 功能性节点。
nsqlookupd: 承载 注册发现、负载均衡自动调度的 管理性节点。
发布方面
发布消息有多种方式,大的分类有 有序和无序
具体还有 :
- 指定分区发布,
- 发布可追踪的消息,
- 发布如果失败后代重试发布的消息,
- 发布带扩展消息的消息,
- 一次发布多个消息,
- 带同步机制的发布。
消费方面
message 是消费中主要使用的结构体, 可以简单看下
type Message struct {
// topic 中的唯一id, 用 [16]byte 存储, binary.BigEndian.Uint64(message.ID[:])) 来处理
ID MessageID
// 消息体
Body []byte
//
Timestamp int64
// 重试次数 重试次数大于 consumer 配置的最大重试次数, 走failFunc
Attempts uint16
NSQDAddress string
Partition string
Delegate MessageDelegate
// 是否向 service 发起反馈(例如 Requeue 和 Finish)
autoResponseDisabled int32
responded int32
// 分片
Offset uint64
RawSize uint32
ExtVer uint8
ExtBytes []byte
}
消费方法
AddConcurrentHandlerFuncs(HandlerFunc, FailHandlerFunc, concurrency)
工作原理实际就是启 concurrency 个协程去执行 handlerFuncLoop()
其中通过判断 message.Attempts 来决定走两种处理方式, 如果走到了 failedFunc 就不关注程序 error 状态
if r.shouldFailMessage(message, failedFunc) {
message.Finish()
continue
}
err := handlerFunc(message)
失败任务延时计算
会有一个默认超时时间的计数,可以配置 comsumer 时候配置,min:”0” max:”60m” default:”90s”`, 会用这个值和使用次数做乘积,同时可以配置一个最大延时时间,也就是转换成为一个延时消息
if delay == -1 {
// linear delay
delay = c.config.DefaultRequeueDelay * time.Duration(m.Attempts)
// bound the requeueDelay to configured max
if delay > c.config.MaxRequeueDelay {
delay = c.config.MaxRequeueDelay
}