NSQ介绍

NSQ是Go语言编写的一个开源的实时分布式的内存消息队列,性能十分优异。

有以下优势:

  • 1、NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证。
  • 2、NSQ支持横向拓展,没有任何集中式代理。
  • 3、NSQ易于配置和部署,并内置了管理界面。

NSQ的使用场景

异步处理

参照下图利用消息队列把业务流程中的非关键流程异步化,从而显著降低业务请求响应时间。

nsq

应用解耦

通过使用消息队列将不同的业务逻辑解耦,降低系统间的耦合,提高系统的健壮性。后续有其他业务要使用订单数据可直接订阅消息队列,提高系统的灵活性。

nsq2

流量削峰

类似于秒杀等场景下,某一时间可能会产生大量的请求,使用消息队列能够为后端处理请求提供一定的缓冲区,保证后端服务的稳定性。

nsq3

安装运行NSQ

1、下载nsq docker镜像

1
docker pull nsqio/nsq

2、运行docker容器nsqlookupd

nslookup是维护所有nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic下的nsqd提供了运行时的自动发现服务。他不维持持久状态,也不需要与任何其它nsqlookupd实例协调以满足查询。因此根据你系统的冗余要求尽可能多地部署nsqlookupd节点。他们消耗的资源很少可以与其他服务共存。建议为每个数据中心运行至少3个集群。

1
docker run -d --name nsqlookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd

3、获取nsqlookupd容器ip

1
docker inspect -f '{{.NetworkSettings.IPAddress}}' nsqlookupd

4、运行docker容器nsqd

nsqd是一个守护进程,他接收、排队并向客户端发送消息。

broadcast-address配置广播地址。

在搭配nsqlookupd的情况下还需要指定nsqlookupd的地址。

如果部署了多个nsqlokuod节点的集群,可以指定多个nsqlookupd的地址。

1
docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=172.17.0.6 --lookupd-tcp-address=172.17.0.6:4160

5、运行docker容器nsqadmin

一个实时监控集群状态、执行各种管理任务的Web管理平台。

需要制定nsqlookupd地址。

1
docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin  --lookupd-http-address=172.17.0.6:4161

image-20200830134921461

第一个-p为tcp端口,第二个-p为http端口,用的时候需要注意

NSQ架构

NSQ工作模式

nsq4

Topic和Channel

每个nsqd实例旨在一次处理多个数据流。这些数据流称为topics,一个topic具有1个或多个channels。每个channel都会收到topic所有消息的副本,实际上下游服务是通过对应的channel来消费topic消息。

topicchannel不是预先配置的。topic在首次使用时创建,方法是将其发布到指定的topic,或者订阅指定topic上的channelchannel是通过订阅指定的channel在第一次使用时创建。

topicchanel都互相独立地缓冲数据,防止缓慢的消费者导致其他channel的积压(同样适用于topic级别)。

channel可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随即客户端。

nsq5

总而言之,消息是从topic -> channel(每个channel接收该topic的所有消息副本)多播的,但是从channel —> consumer均匀分布(每个消费者接收该channel的一部分消息)。

NSQ接收和发送消息的流程

nsq6

NSQ特性

  • 1、消息默认不持久化,可以配置成持久化模式。nsq采用的方式是内存+硬盘的模式,当内存到达一定的程度时就会将数据持久化到硬盘。

    • 如果将--mem-queue-size设置为0,所有的消息将会储存到磁盘。
    • 服务器重启时也会将当时在内存中的消息持久化。
  • 每条消息至少传第一次。

  • 消息不保证有序。

Go操作NSQ

安装

1
go get -u github.com/nsqio/go-nsq

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"bufio"
"fmt"
"github.com/nsqio/go-nsq"
"os"
"strings"
)

//nsq_consumer

//生产者
var producer *nsq.Producer

//topic_key
var topic_key = "topic_demo"

//初始化生产者
func initProducer(address string) (err error) {
config := nsq.NewConfig()
producer, err = nsq.NewProducer(address, config)
if err != nil {
fmt.Printf("create producer failed, err: %#v\n", err)
return err
}
return nil
}

func main() {
address := "47.106.172.60:4150"
err := initProducer(address)
if err != nil {
fmt.Printf("init producer failed, err: %#v\n", err)
return
}

//从标准输入读取
reader := bufio.NewReader(os.Stdin)
for {
fmt.Println("please input msg:")

data, err := reader.ReadString('\n')
if err != nil {
fmt.Printf("read from stdin failed, err: %#v\n", err)
continue
}
data = strings.TrimSpace(data)
//输入Q退出
if strings.ToUpper(data) == "Q" {
break
}
//向topic_demo publish数据
err = producer.Publish(topic_key, []byte(data))
if err != nil {
fmt.Printf("publish msg to nsq failed, err: %#v\n", err)
continue
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main

import (
"fmt"
"github.com/nsqio/go-nsq"
"os"
"os/signal"
"syscall"
"time"
)

var topic_key = "topic_demo"

var channel = "second"

//nsq consumer demo

//MyHandler 是一个消费者类型
type MyHandler struct {
Title string
}

//HandleMessage是需要实现处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message)(err error){
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
return
}

//初始化消费者
func initConsumer(topic string, channel string, address string) (err error){
config := nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c, err := nsq.NewConsumer(topic, channel, config)
if err != nil {
fmt.Printf("create consumer failed, err: %#v\n", err)
return
}
consumer := &MyHandler{
Title: "测试",
}
c.AddHandler(consumer)

//直连nsqd
//if err := c.ConnectToNSQD(address); err != nil{
// return err
//}
//通过lookupd查询
if err := c.ConnectToNSQLookupd(address);err != nil {
return err
}
return nil
}

func main() {
address := "47.106.172.60:4161"
err := initConsumer(topic_key, channel, address)
if err != nil {
fmt.Printf("init consumer failed, err: %#v\v", err)
return
}
//定义一个信号的通道
c := make(chan os.Signal)
//转发键盘中断信号到signals
signal.Notify(c, syscall.SIGINT)
//阻塞
<-c
}