go使用rocketmq

时间:2024-10-10 08:57:17
  • //func TestMq(t *) {
  • //
  • // var err error
  • // MqProducer, err := (
  • // ("notify_test"),
  • // ([]string{"ip:port"}),
  • // (3),
  • // )
  • // if err != nil {
  • // panic(("init rocket mq producer err:%v", err))
  • // return
  • // }
  • //
  • // err = MqProducer.Start()
  • // if err != nil {
  • // panic(("producer mq start err:%v", err))
  • // return
  • // }
  • // ch := make(chan struct{}, 3)
  • // defer ()
  • //发送带tag的消息
  • // msg := ("test_notify_topic", []byte("111111111111"))
  • // //()
  • // 同步发送
  • // res, err := ((), msg)
  • //
  • // (res, err)
  • //
  • // msg = ("test_notify_topic_1", []byte("22222222222222"))
  • // //()
  • //
  • // res, err = ((), msg)
  • //
  • // (res, err)
  • // 发送延时消息
  • // msg = ("test_notify_topic_delay", []byte("delay msg"))
  • // //()
  • // (2)
  • // res, err = ((), msg)
  • //
  • // (res, err)
  • // 如果是同机器 同一组下的多个消费者,需要不同的实例化名字,不同机器的同一组不需要,默认会以机器
  • ip@port有关
  • // MqPushConsumer1, err := (
  • // //("mq1"),
  • // ("notify_test1"),
  • // ([]string{"192.168.11.8:9876"}),
  • // )
  • //
  • // MqPushConsumer2, err := (
  • // //("mq2"),
  • // ("notify_test2"),
  • // ([]string{"192.168.11.8:9876"}),
  • // )
  • //
  • // MqPushConsumer3, err := (
  • // //("mq2"),
  • // ("notify_test3"),
  • // ([]string{"192.168.11.8:9876"}),
  • // )
  • // if err != nil {
  • // panic(("init rocket mq push consumer err:%v", err))
  • // return
  • // }
  • // 取带该选择器的tag的消息,会过滤掉其他的
  • // selector := {
  • // Type: ,
  • // //Expression: ,
  • // }
  • // err = MqPushConsumer1.Subscribe("test_notify_topic", selector, func(ctx , msgs ...*) (, error) {
  • // for i := range msgs {
  • // ("11111111111111111", msgs[i])
  • // }
  • // ch <- struct{}{}
  • // return , nil
  • // })
  • // if err != nil {
  • // panic(("consumer subscribe succ notify tag err:%v", err))
  • // return
  • // }
  • //
  • // selector = {
  • // Type: ,
  • // //Expression: ,
  • // }
  • // err = MqPushConsumer2.Subscribe("test_notify_topic_1", selector, func(ctx , msgs ...*) (, error) {
  • // for i := range msgs {
  • // //("monitor notify produce suc msg, job id:%v, task id:%v, redo times:%v, msg:%v", x(), x(), , msgs[i])
  • // ("22222222222222", msgs[i])
  • // //("call back succ recv message", "msg", msgs[i])
  • // }
  • // ch <- struct{}{}
  • // return , nil
  • // })
  • // if err != nil {
  • // panic(("consumer subscribe fail notify tag err:%v", err))
  • // return
  • // }
  • //
  • // selector = {
  • // //Type: ,
  • // //Expression: ,
  • // }
  • // err = MqPushConsumer3.Subscribe("test_notify_topic_delay", selector, func(ctx , msgs ...*) (, error) {
  • // for i := range msgs {
  • // ("333333333333333", msgs[i])
  • // }
  • // ch <- struct{}{}
  • // return , nil
  • // })
  • // if err != nil {
  • // panic(("consumer subscribe fail notify tag err:%v", err))
  • // return
  • // }
  • //
  • // err = MqPushConsumer1.Start()
  • // err = MqPushConsumer2.Start()
  • // err = MqPushConsumer3.Start()
  • //
  • // if err != nil {
  • // panic(("mq push consumer err:%v", err))
  • // return
  • // }
  • //
  • // defer MqPushConsumer1.Shutdown()
  • // defer MqPushConsumer2.Shutdown()
  • // defer MqPushConsumer3.Shutdown()
  • //
  • // <-ch
  • // <-ch
  • // <-ch
  • //}
  • //
  • func callBackSucc(ctx , msgs ...*) (, error) {
  • for i := range msgs {
  • (msgs[i])
  • }
  • return , nil
  • }
  • func callBackFail(ctx , msgs ...*) (, error) {
  • for i := range msgs {
  • (msgs[i])
  • }
  • return , nil
  • }