ZeroMQ之push/pull模式

时间:2024-03-31 15:07:25

PUHS: 用于发送消息,定义一个zeromqsocket实例,用于send msg

PULL: 用于接收消息,recv msg

APIhttp://api.zeromq.org/

 

push/pull模式如下图:(图源自http://zguide.zeromq.org/page:all)
ZeroMQ之push/pull模式
 

用于服务器与客户端消息通信。

下面是服务器给客户端发送消息的代码,服务器PUSH(绑定一个端口号),客户端PULL(连接服务器)

 

   服务器端代码  

package pushpull;

import org.zeromq.ZMQ;

public class Push {
    public static void main(String[] args) throws InterruptedException {
	// TODO Auto-generated method stub
	ZMQ.Context context = ZMQ.context(1);
	ZMQ.Socket sender = context.socket(ZMQ.PUSH);
	sender.bind("tcp://*:5557");
	int i = 0;
	while (true) {
	    Thread.currentThread().sleep(2000);
	    i++;
	    sender.send(("msg" + i).getBytes(), 0);
	}
    }
}

 

 

 

  客户端A的代码 

package pushpull;

import org.zeromq.ZMQ;

public class PullA {
    public static void main(String[] args) {
	// TODO Auto-generated method stub
	ZMQ.Context context = ZMQ.context(1);
	ZMQ.Socket receiver = context.socket(ZMQ.PULL);
	receiver.connect("tcp://localhost:5557");
	while (true) {
	  //  String msg = new String(receiver.recv(0));
	    System.out.println(new String(receiver.recv(0)));
	}

    }
}

  

  客户端B的代码

 

 

package pushpull;

import org.zeromq.ZMQ;

public class PullB {
    public static void main(String[] args) {
	// TODO Auto-generated method stub
	ZMQ.Context context = ZMQ.context(1);
	ZMQ.Socket receiver = context.socket(ZMQ.PULL);
	receiver.connect("tcp://localhost:5557");
	while (true) {
	    System.out.println(new String(receiver.recv(0)));
	}
    }
}

 

  客户端A接受的消息是  

msg5 
msg7 
msg9 
msg11 
...

   客户端B接收的消息是 

msg1
msg2 
msg3 
msg4 
msg6 
msg8 
msg10
...

 

可以看出push/pull模式是单向的,很适合消费者能力不足的情况,可以提供多个消费者。一条消息如果被A消费,B将不会再消费这条消息。

如果客户端给服务器发送消息,则服务器PULL(绑定一个端口号),客户端PUSH(连接服务器)