有赞nsq使用体验

有赞 go-nsq 使用体验

之前有使用过 rabbitMQ,公司调研结果推荐尝试有赞推广的

go-nsq: https://github.com/youzan/go-nsq

同时简单的体验了一下,并在这里简单的记录一下几个有意思的地方

概念简介

topic :实际上是一个 消息队列实体, 生产者 发布消息 到topic。

channel : 是消息分发的通道,从属于topic。 消费者 从 channel 里获取消息。

nsqd: 承载 topic 和 channel 功能的 功能性节点。

nsqlookupd: 承载 注册发现、负载均衡自动调度的 管理性节点。

发布方面

发布消息有多种方式,大的分类有 有序和无序

具体还有 :

  1. 指定分区发布,
  2. 发布可追踪的消息,
  3. 发布如果失败后代重试发布的消息,
  4. 发布带扩展消息的消息,
  5. 一次发布多个消息,
  6. 带同步机制的发布。

消费方面

message 是消费中主要使用的结构体, 可以简单看下

type Message struct {
	// topic 中的唯一id, 用 [16]byte 存储, binary.BigEndian.Uint64(message.ID[:])) 来处理
	ID        MessageID  
	// 消息体
	Body      []byte
	//
	Timestamp int64
	// 重试次数 重试次数大于 consumer 配置的最大重试次数, 走failFunc
	Attempts  uint16
	
	NSQDAddress string
	Partition string
	Delegate MessageDelegate
	// 是否向 service 发起反馈(例如 Requeue 和 Finish)
	autoResponseDisabled int32
	responded            int32
    // 分片
	Offset               uint64
	RawSize              uint32
	ExtVer   uint8
	ExtBytes []byte
}

消费方法

AddConcurrentHandlerFuncs(HandlerFunc, FailHandlerFunc, concurrency)

工作原理实际就是启 concurrency 个协程去执行 handlerFuncLoop()

其中通过判断 message.Attempts 来决定走两种处理方式, 如果走到了 failedFunc 就不关注程序 error 状态

    if r.shouldFailMessage(message, failedFunc) { 
        message.Finish()
        continue
    }

    err := handlerFunc(message)

失败任务延时计算

会有一个默认超时时间的计数,可以配置 comsumer 时候配置,min:”0” max:”60m” default:”90s”`, 会用这个值和使用次数做乘积,同时可以配置一个最大延时时间,也就是转换成为一个延时消息

    if delay == -1 {
    // linear delay
    delay = c.config.DefaultRequeueDelay * time.Duration(m.Attempts)
    // bound the requeueDelay to configured max
    if delay > c.config.MaxRequeueDelay {
        delay = c.config.MaxRequeueDelay
    }