1.将下面三个文件放入服务器中
2.先安装erlang,再安装socat,再安装rabbitmq
rpm -ivh esl-erlang_23.0.3-1~centos~8_amd64.rpmrpm -ivh socat-1.7.3.3-2.el8.x86_64.rpmrpm -ivh rabbitmq-server-3.8.6-1.el8.noarch.rpm
出现warning::Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEY警告。
解决办法:在rpm 语句后面加上 —force —nodeps就可以了。
3.启动web管理页面
rabbitmq-plugins enable rabbitmq_management
使用http://ip:15672登录,默认用户为guest,密码为guest
你也可以创建一个用户:
rabbitmqctl add_user admin 123456
然后使用账户登录管理页面:
如何在java程序中使用
接下来用java程序执行我们的hello,rabbitmq程序:
首先要导入rabbitmq的依赖:
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency>
然后在web管理页面创建一个虚拟主机:
然后创建一个用户与其绑定,权限也为admin权限:
绑定刚才创建的主机:
点击这个用户名:
然后点击刚才的虚拟主机,再点击set permission
可以看到,ems用户此时已经绑定ems虚拟主机
接下来,我们在java中创建一个生产者的测试类:
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import org.junit.jupiter.api.Test;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Provider {/*** @author: zhangyaomin* @Date: 2020/8/17 10:29* @Description: RabbitMQ第一种方式:直连,生产消息*/@Testpublic void testSendMessage() throws IOException, TimeoutException {//1.创建连接mq的连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置连接mq的主机及其信息//ipfactory.setHost("xx.xxx.xxx.xxx");//端口factory.setPort(5672);//虚拟主机名factory.setVirtualHost("/ems");//用户factory.setUsername("ems");//用户密码factory.setPassword("123456");}}
由于我们这次测试需要将通道与消息队列绑定,我们首先需要创建一个消息队列,也可以用代码创建:
//3.获取连接对象Connection connection = factory.newConnection();//4.获取连接通道对象Channel channel = connection.createChannel();//5.通道绑定消息队列/** 参数1:队列的名称,不存在则自动创建* 参数2:队列是否持久化* 参数3:是否独占队列(当前队列只允许当前连接可用)* 参数4:是否在消费完成自动删除* 参数5:额外参数*/channel.queueDeclare("hello",false,false,false,null);
然后我们创建发布消息:
//6.创建发布消息/** 参数1:交换机名称,此处是直连,所以没有交换机* 参数2:队列名称* 参数3:传递消息的额外设置* 参数4:消息内容*/channel.basicPublish("","hello",null,"hello,rabbitmq".getBytes());//7.关闭通道对象与连接对象channel.close();connection.close();
执行该类,然后查看管理页面:
可以看到我们成功的创建了一个名叫hello的队列,生产者的任务就完成了
接下来我们来模拟消费者:
package com.alex.blog.rabbitmq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Customer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接mq的连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置连接mq的主机及其信息//ipfactory.setHost("xx.xxx.xxx.xxx");//端口factory.setPort(5672);//虚拟主机名factory.setVirtualHost("/ems");//用户factory.setUsername("ems");//用户密码factory.setPassword("123456");//3.获取连接对象Connection connection = factory.newConnection();//4.获取连接通道对象Channel channel = connection.createChannel();//5.通道绑定消息队列/** 参数1:队列的名称,不存在则自动创建* 参数2:队列是否持久化* 参数3:是否独占队列(当前队列只允许当前连接可用)* 参数4:是否在消费完成自动删除* 参数5:额外参数*/channel.queueDeclare("hello",false,false,false,null);//6.消费消息/** 参数1:消费哪个队列里的消息* 参数2:开启消息自动确认机制* 参数3:消费消息时的回调接口*/channel.basicConsume("hello",true,new DefaultConsumer(channel){@Override//用来处理消息时的一个回调/** 参数1:* 参数2:* 参数3:* 参数4:消息队列中取出的消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息队列中的消息:"+new String(body));}});//7.关闭通道对象与连接对象,不关闭的话会一直监听队列//channel.close();//connection.close();}}
运行结果:
