安装与运行
nsq的镜像开启容器时并不是默认开启三个服务的,而是需要手动开启。
docker pull nsqio/nsq
docker run -itd --restart=on-failure:20 -p 4150:4150 -p 4151:4151 -p 4160:4160 -p 4161:4161 -p 4170:4170 -p 4171:4171 nsqio/nsq
docker exec -itd 容器id nsqlookupd
docker exec -itd 容器id nsqd --lookupd-tcp-address=127.0.0.1:4160
docker exec -itd 容器id nsqadmin --lookupd-http-address=127.0.0.1:4161
执行完毕后,在主机浏览器输入:localhost:4171, 可以看到nsq的topic后台。
PS
怀疑代码内部可能存在隐式端口未说明,目前docker方式是只能连nsqd,lookupd有点问题,不能给消费者寻找topic,暂时没找到原因,实体部署nsq如下:
- https://nsq.io/deployment/installing.html 安装指定版本
- 将安装文件里的bin路径,写入PATH
- 执行
nsq --version
得到有效信息,则为成功安装
基本上都是release免安版,不需要编译, 解压就好了.~
使用详解
https://nsq.io/overview/quick_start.html
go使用实例
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"runtime"
"sync"
"time"
)
//handler
type ConsumerT struct{}
var consumeMessageNumber int
var l sync.RWMutex
//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
l.Lock()
defer l.Unlock()
consumeMessageNumber ++
return nil
}
var producer *nsq.Producer
var consumers []*nsq.Consumer
var conf *nsq.Config
var nsqdAddrTCP = "localhost:4150" // tcp used to publish topics and messages
var nsqdAddrHTTP = "localhost:4151" // http publish topics,not exampled
var nsqAdminAddr = "localhost:4171" // backend ui
var nsqlookupdAddr = "localhost:4161" // help consumer find topics
func init() {
log.SetFlags(log.Llongfile | log.LstdFlags)
//init a producer
var er error
conf = nsq.NewConfig()
producer, er = nsq.NewProducer(nsqdAddrTCP, conf)
if er != nil {
log.Println(er.Error())
return
}
er = producer.Ping()
if er != nil {
log.Println(er.Error())
return
}
//
init consumer
conf.LookupdPollInterval = 5 * time.Second
}
func main() {
defer func(){
producer.Stop()
for i,_:=range consumers {
consumers[i].Stop()
}
}()
runtime.GOMAXPROCS(runtime.NumCPU())
// 创建100个消费者
consumers = make([]*nsq.Consumer, 100)
var e error
for i, _ := range consumers {
consumers[i], e = nsq.NewConsumer("go-nsq_testcase", "channel_1", conf)
if e != nil {
log.Println(e.Error())
return
}
consumers[i].SetLogger(nil, 0)
consumers[i].AddHandler(&ConsumerT{}) // 添加消费者接口
if e = consumers[i].ConnectToNSQLookupd(nsqlookupdAddr);e!=nil {
log.Println(e)
return
}
}
// 并发发布3000个消息
for i := 0; i < 3000; i++ {
go func(i int) {
er := producer.Publish("go-nsq_testcase", []byte(fmt.Sprintf("hello,everyone_%d", i)))
if er != nil {
log.Println(er.Error())
return
}
}(i)
}
time.Sleep(20 * time.Second)
fmt.Println(consumeMessageNumber)
select {}
}
解决消息多次投递,单服务如何确保单次
假设从nsq拿到订单,需要避免订单重复处理
type Unique struct{
M *sync.RWMutex
mp map[string]interface{}
}
func NewUnique() Unique{
return Unique{
M: &sync.RWMutex{},
mp: make(map[string]interface{})
}
}
// 已处理过,true
// 未处理过,false
func (u *Unique)HasDealed(uniqueID string)bool {
u.RLock()
defer u.RUnlock()
_, ok := u.mp[uniqueID]
return ok
}
// 未处理时,添加
// 处理过时,无视
func (u *Unique) UnDealedAndWrite(uniqueID string, order interface{}) (existed bool) {
u.Lock()
defer u.Unlock()
_, ok := u.mp[uniqueID]
if !ok {
u.mp[uniqueID] = order
return false
}
return true
}
func main() {
unique := NewUnique()
// get an order
order := GetFromNSQ()
if unique.UnDealeAndWrite(order.UniqueID, order) {
fmt.Println("order 已被消费,无须添加处理,可通知nsq停止推送")
// 停止nsq推送该条order
StopNSQTopic(topicName)
return
}
fmt.Println("order未被消费,已添加")
// 消费order
handle(order)
}
服务群如何保证消息被消费一次
在思考,可以用数据库id唯一保证,比如处理以前,先插入记录,如果插入成功,则未处理,否则处理过