window下 ActiveMQ安装
ActiveMQ部署其实很简单,和所有Java一样,要跑java程序就必须先安装JDK并配置好环境变量,这个很简单。
然后解压下载的apache-activemq-5.10-20140603.133406-78-bin.zip压缩包到一个目录,得到解压后的目录结构如下图:
进入bin目录,发现有win32和win64两个文件夹,这2个文件夹分别对应windows32位和windows64位操作系统的启动脚本。

我的实验环境是windowsXP,就进入win32目录,会看到如下目录结构。

其中activemq.bat便是启动脚本,双击启动。

ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。
控制台介绍

Number Of Consumers 消费者
这个是消费者端的消费者数量
Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量
这个要分两种情况理解
在queues里它和进入队列的总数量相等(因为一个消息只会被成功消费一次),如果暂时不等是因为消费者还没来得及消费。
在 topics里 它因为多消费者从而导致数量会比入队列数高。
简单的理解上面的意思就是
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1.
在来一条消息时,等待消费的消息是1,进入队列的消息就是2.
没有消费者时 Pending Messages 和 入队列数量一样
有消费者消费的时候 Pedding会减少 出队列会增加
到最后 就是 入队列和出队列的数量一样多
以此类推,进入队列的消息和出队列的消息是池子,等待消费的消息是水流。
实现点对点通讯模式
使用ActiveMQ完成点对点(p2p)通讯模式
引入pom文件依赖
<dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version></dependency></dependencies>
生产者
public class Producter {public static void main(String[] args) throws JMSException {// ConnectionFactory :连接工厂,JMS 用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");// JMS 客户端到JMS Provider 的连接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一个发送或接收消息的线程Session session = connection.createSession(Boolean.falst, Session.AUTO_ACKNOWLEDGE);// Destination :消息的目的地;消息发送给谁.// 获取session注意参数值my-queue是Query的名字Destination destination = session.createQueue("my-queue");// MessageProducer:消息生产者MessageProducer producer = session.createProducer(destination);// 设置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 发送一条消息for (int i = 1; i <= 5; i++) {sendMsg(session, producer, i);}session.commit();connection.close();}/*** 在指定的会话上,通过指定的消息生产者发出一条消息** @param session* 消息会话* @param producer* 消息生产者*/public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {// 创建一条文本消息TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);// 通过消息生产者发出消息producer.send(message);}}
消费者
public class JmsReceiver {public static void main(String[] args) throws JMSException {// ConnectionFactory :连接工厂,JMS 用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");// JMS 客户端到JMS Provider 的连接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一个发送或接收消息的线程Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// Destination :消息的目的地;消息发送给谁.// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置Destination destination = session.createQueue("my-queue");// 消费者,消息接收者MessageConsumer consumer = session.createConsumer(destination);while (true) {TextMessage message = (TextMessage) consumer.receive();if (null != message) {System.out.println("收到消息:" + message.getText());session.commit();} elsebreak;}session.close();connection.close();}}
JMS消息可靠机制
ActiveMQ消息签收机制:
客戶端成功接收一条消息的标志是一条消息被签收,成功应答。
消息的签收情形分两种:
1、带事务的session
如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
2、不带事务的session
不带事务的session的签收方式,取决于session的配置。
Activemq支持一下三种模式:
Session.AUTO_ACKNOWLEDGE 消息自动签收
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头 的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。
发布订阅
生产者
public class TOPSend {private static String BROKERURL = "tcp://127.0.0.1:61616";private static String TOPIC = "my-topic";public static void main(String[] args) throws JMSException {start();}static public void start() throws JMSException {System.out.println("生产者已经启动....");// 创建ActiveMQConnectionFactory 会话工厂ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);Connection connection = activeMQConnectionFactory.createConnection();// 启动JMS 连接connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);MessageProducer producer = session.createProducer(null);producer.setDeliveryMode(DeliveryMode.PERSISTENT);send(producer, session);System.out.println("发送成功!");connection.close();}static public void send(MessageProducer producer, Session session) throws JMSException {for (int i = 1; i <= 5; i++) {System.out.println("我是消息" + i);TextMessage textMessage = session.createTextMessage("我是消息" + i);Destination destination = session.createTopic(TOPIC);producer.send(destination, textMessage);}}}
消费者:
public class TopReceiver {private static String BROKERURL = "tcp://127.0.0.1:61616";private static String TOPIC = "my-topic";public static void main(String[] args) throws JMSException {start();}static public void start() throws JMSException {System.out.println("消费点启动...");// 创建ActiveMQConnectionFactory 会话工厂ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);Connection connection = activeMQConnectionFactory.createConnection();// 启动JMS 连接connection.start();// 不开消息启事物,消息主要发送消费者,则表示消息已经签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建一个队列Topic topic = session.createTopic(TOPIC);MessageConsumer consumer = session.createConsumer(topic);// consumer.setMessageListener(new MsgListener());while (true) {TextMessage textMessage = (TextMessage) consumer.receive();if (textMessage != null) {System.out.println("接受到消息:" + textMessage.getText());// textMessage.acknowledge();// 手动签收// session.commit();} else {break;}}connection.close();}}
