简述
ZeroMQ(ØMQ)也叫ZMQ,是一个github开源项目,他为可伸缩的分布式或并发应用程序设计的高性能异步消息库。它提供一个消息队列,但是与面向消息的中间件不同,ZeroMQ的运行不需要专门的消息代理(message broker)。该库设计成常见的套接字(Socket)风格的API。
消息模型
请求响应模式(Request-Reply)
将一组客户端连接到一组服务器。这是一种远程过程调用和任务分发模式。
public class ReqZeroMQ {public static void main(String[] args) {ZMQ.Context context = ZMQ.context(1);// Socket to talk to serverSystem.out.println("Connecting to hello world server…");ZMQ.Socket requester = context.socket(SocketType.REQ);requester.connect("tcp://localhost:5555");for (int requestNbr = 0; requestNbr != 100; requestNbr++) {String request = "Hello";System.out.println("Sending Hello " + requestNbr);requester.send(request.getBytes(), 0);byte[] reply = requester.recv(0);System.out.println("Received " + new String(reply) + " " + requestNbr);}requester.close();context.term();}}
public class RepZeroMQ {public static void main(String[] args) {new Thread(() -> {ZMQ.Context context = ZMQ.context(1);// Socket to talk to clientsZMQ.Socket responder = context.socket(SocketType.REP);responder.bind("tcp://*:5555");while (!Thread.currentThread().isInterrupted()) {// Wait for next request from the clientbyte[] request = responder.recv(0);System.out.println("Received Hello");// Do some 'work'try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// Send reply back to clientString reply = "World";responder.send(reply.getBytes(), 0);}responder.close();context.term();}).start();}}
发布/订阅模式(Publish-Subscribe)
将一组发布者连接到一组订阅者。这是一种数据分发模式。
public class PubZeroMQ {public static void main(String[] args) {// Prepare our context and publisherZMQ.Context context = ZMQ.context(1);ZMQ.Socket publisher = context.socket(SocketType.PUB);publisher.bind("tcp://*:5556");// Initialize random number generatorRandom srandom = new Random(System.currentTimeMillis());while (!Thread.currentThread().isInterrupted()) {// Get values that will fool the bossint zipcode, temperature, relhumidity;zipcode = 10000 + srandom.nextInt(10000);temperature = srandom.nextInt(215) - 80 + 1;relhumidity = srandom.nextInt(50) + 10 + 1;// Send message to all subscribersString update = String.format("%05d %d %d", zipcode, temperature, relhumidity);publisher.send(update, 0);}publisher.close();context.term();}}
public class SubZeroMQ {public static void main(String[] args) {new Thread(() -> {ZMQ.Context context = ZMQ.context(1);// Socket to talk to serverSystem.out.println("Collecting updates from weather server");ZMQ.Socket subscriber = context.socket(SocketType.SUB);subscriber.connect("tcp://localhost:5556");// Subscribe to zipcode, default is NYC, 10001String filter = (args.length > 0) ? args[0] : "10001 ";subscriber.subscribe(filter.getBytes());// Process 100 updatesint update_nbr;long total_temp = 0;for (update_nbr = 0; update_nbr < 100; update_nbr++) {// Use trim to remove the tailing '0' characterString string = subscriber.recvStr(0).trim();StringTokenizer sscanf = new StringTokenizer(string, " ");int temperature = Integer.valueOf(sscanf.nextToken());total_temp += temperature;}System.out.println("Average temperature for zipcode '"+ filter + "' was " + (int) (total_temp / update_nbr));System.out.println("...");try {System.in.read();} catch (IOException e) {e.printStackTrace();}subscriber.close();context.term();}).start();new Thread(() -> {ZMQ.Context context = ZMQ.context(1);// Socket to talk to serverSystem.out.println("Collecting updates from weather server");ZMQ.Socket subscriber = context.socket(ZMQ.SUB);subscriber.connect("tcp://localhost:5556");// Subscribe to zipcode, default is NYC, 10001String filter = (args.length > 0) ? args[0] : "10002";subscriber.subscribe(filter.getBytes());// Process 100 updatesint update_nbr;long total_temp = 0;for (update_nbr = 0; update_nbr < 100; update_nbr++) {// Use trim to remove the tailing '0' characterString string = subscriber.recvStr(0).trim();StringTokenizer sscanf = new StringTokenizer(string, " ");int temperature = Integer.valueOf(sscanf.nextToken());total_temp += temperature;}System.out.println("Average temperature for zipcode '"+ filter + "' was " + (int) (total_temp / update_nbr));System.out.println("...");try {System.in.read();} catch (IOException e) {e.printStackTrace();}subscriber.close();context.term();}).start();}}
管道模式(Push-Pull)
以Push/Pull模式连接节点,可以有多个步骤,可以有循环。这是一种并行的任务分发和收集模式。
public class PushZeroMQ {public static void main(String[] args) {new Thread(() -> {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(SocketType.PUSH);socket.connect("tcp://127.0.0.1:5555");socket.setSendTimeOut(1000);int i = 0;while (true) {byte[] data = String.valueOf(i++).getBytes();boolean send = socket.send(data);System.out.println(socket.getTCPKeepAlive() + " send data " + data.length + " result " + send);}}).start();}}
public class PullZeroMQ {public static void main(String[] args) {new Thread(() -> {ZMQ.Context context = ZMQ.context(1);ZMQ.Socket socket = context.socket(SocketType.PULL);socket.bind("tcp://127.0.0.1:5555");socket.setReceiveTimeOut(1000);while (true) {String recv = socket.recvStr();System.out.println(socket.getTCPKeepAlive() + " recv data " + "result " + recv);}}).start();}}
排他对模式
在一个排他对中连接两个套接字(Socket)。(这是一种高级的为某种用例而设计的低级别模式)
参考文献
消息队列库——ZeroMQ
RabbitMq、ActiveMq、ZeroMq、kafka之间的比较,资料汇总
ØMQ维基百科

