基本指令
service rabbitmq-server start # 开机
service rabbitmq-server stop # 关机
service rabbitmq-server status # 运行状态
rabbitmqctl add_user {username} {pwd} # 创建用户
rabbitmqctl delete_user {username} # 删除用户
rabbitmqctl change_password {username} {newPwd} # 修改用户密码
rabbitmqctl set_user_tags {username} {tag} # 分配角色
rabbitmqctl list_users # 查看用户列表
rabbitmqctl set_permissions -p {vhost} {user} '.*' '.*' '.*' # 给用户赋权使其拥有指定vhost的配置写读权限
rabbitmqctl list_user_permissions {user} # 查看指定用户的权限
rabbitmqctl clear_permissions [-p vhostPath] {user} # 清除权限
交换机
种类:
- 直接(direct)
- 主题(topic)
- 标题(headers)
- 发布订阅(fanout)
工作模式
1.Hello World模式
获取信道
package cc.fireflyhut.rabbitmqstudy.client;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Factory for the RabbitMQ channels used by the study examples.
 *
 * <p>Every call opens a new connection and returns a new channel created on it.
 * The caller owns the returned channel (and the connection behind it).
 */
public class RabbitMqClient {

    /**
     * Opens a connection and returns a channel intended for publishing.
     *
     * @return a channel created on a freshly opened connection
     * @throws IOException if the connection or the channel cannot be created
     * @throws TimeoutException as declared by {@code ConnectionFactory#newConnection()}
     */
    public static Channel getProducerChannel() throws IOException, TimeoutException {
        return openChannel();
    }

    /**
     * Opens a connection and returns a channel intended for consuming.
     *
     * @return a channel created on a freshly opened connection
     * @throws IOException if the connection or the channel cannot be created
     * @throws TimeoutException as declared by {@code ConnectionFactory#newConnection()}
     */
    public static Channel getConsumerChannel() throws IOException, TimeoutException {
        return openChannel();
    }

    /**
     * Shared connection setup for both public factory methods, which previously
     * duplicated these lines verbatim.
     */
    private static Channel openChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // NOTE(review): host and credentials look like placeholders - replace with real broker settings.
        factory.setHost("ip");
        factory.setUsername("用户名");
        factory.setPassword("密码");
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}
生产者
package cc.fireflyhut.rabbitmqstudy.hellowworld.producer;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * "Hello World" producer: declares the {@code Study} queue and publishes a single message to it.
 */
public class Producer {

    /**
     * Publishes one {@code HelloWorld} message and then releases the connection.
     *
     * @param args unused
     * @throws IOException if declaring, publishing or closing fails
     * @throws TimeoutException if the broker connection cannot be established in time
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqClient.getProducerChannel();
        try {
            // Arguments after the queue name: durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare("Study", false, false, false, null);
            // Empty exchange name: the message is routed to the queue named by the routing key.
            channel.basicPublish("", "Study", null, "HelloWorld".getBytes(StandardCharsets.UTF_8));
        } finally {
            // Closing the connection also closes the channel; without it the client's
            // connection stays open and this one-shot program never terminates.
            channel.getConnection().close();
        }
    }
}
消费者
package cc.fireflyhut.rabbitmqstudy.hellowworld.consumer;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * "Hello World" consumer: prints every message delivered from the {@code Study} queue.
 */
public class Consumer {

    /**
     * Registers an auto-acknowledging consumer on the {@code Study} queue.
     *
     * @param args unused
     * @throws IOException if the consumer cannot be registered
     * @throws TimeoutException if the broker connection cannot be established in time
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqClient.getConsumerChannel();
        channel.basicConsume("Study", true, (consumerTag, message) -> {
            // The producer encodes the body as UTF-8, so decode with the same charset
            // rather than the platform default.
            System.out.println(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        }, (consumerTag) -> {
            System.out.println("消息取消");
        });
    }
}
2.Worker Queues模式(资源竞争)
生产者
package cc.fireflyhut.rabbitmqstudy.workerqueue;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
 * Work-queue producer: publishes the current timestamp to the {@code Study} queue every five seconds.
 */
public class WqProducer {

    /**
     * Declares the queue once and then publishes timestamps forever.
     *
     * @param args unused
     * @throws IOException if declaring or publishing fails
     * @throws TimeoutException if the broker connection cannot be established in time
     * @throws InterruptedException if the sleep between two messages is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqClient.getProducerChannel();
        // Declared once before the loop: the declaration never changes, so repeating it
        // for every message only added a broker round trip per iteration.
        channel.queueDeclare("Study", false, false, false, null);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        while (true) {
            channel.basicPublish("", "Study", null, sdf.format(new Date()).getBytes(StandardCharsets.UTF_8));
            Thread.sleep(5000);
        }
    }
}
消费者
package cc.fireflyhut.rabbitmqstudy.workerqueue;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Work-queue consumer: registers three competing workers on the {@code Study} queue.
 */
public class WorkQueue {

    /**
     * Starts three threads, each of which registers one auto-acknowledging consumer
     * on the shared channel.
     *
     * @param args unused
     * @throws IOException if the channel cannot be created
     * @throws TimeoutException if the broker connection cannot be established in time
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqClient.getConsumerChannel();
        for (int i = 1; i <= 3; i++) {
            final int num = i;
            new Thread(() -> {
                try {
                    channel.basicConsume("Study", true, (consumerTag, message) -> {
                        // The producer sends UTF-8 bytes; decode explicitly instead of
                        // relying on the platform default charset.
                        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Worker-" + num + ":" + body);
                    }, (consumerTag) -> {
                        System.out.println("Worker-" + num + ":消息取消");
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
运行效果
3.Publish/Subscribe发布订阅(资源共享)
发布订阅模式是指生产者发送的一条消息会被交换机投递到所有与之绑定的队列中(fanout类型的交换机会忽略路由键),因此每个订阅者都能收到同一条消息。
订阅者(消费者)
package cc.fireflyhut.rabbitmqstudy.publishandsubscribe;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Publish/subscribe consumer side: declares the fanout exchange and attaches two subscribers,
 * each with its own temporary queue.
 */
public class Subscribe {
    /** Name of the fanout exchange shared with the publisher. */
    public static final String EXCHANGE_NAME = "study_fanout_exchange";
    /** Routing key used for both binding and publishing. */
    public static final String ROUTING_KEY = "study_fanout_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        createExchange();
    }

    /**
     * Opens a channel, declares the exchange and registers subscribers {@code S1} and {@code S2}.
     */
    private static void createExchange() throws IOException, TimeoutException {
        Channel ch = RabbitMqClient.getConsumerChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, "fanout");
        for (String subscriber : new String[] {"S1", "S2"}) {
            createSubscriber(ch, subscriber);
        }
    }

    /**
     * Creates a server-named temporary queue, binds it to the exchange and starts consuming from it.
     *
     * @param channel channel on which the queue is declared and consumed
     * @param name label printed with every received message
     */
    private static void createSubscriber(Channel channel, String name) throws IOException {
        String tempQueue = channel.queueDeclare().getQueue();
        channel.queueBind(tempQueue, EXCHANGE_NAME, ROUTING_KEY);
        // Auto-ack consumer that prints each delivery.
        channel.basicConsume(tempQueue, true, (consumerTag, message) -> {
            String text = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("订阅者" + name + "接收到消息:" + text);
        }, consumerTag -> System.out.println("取消"));
        System.out.println(name + "已就绪");
    }
}
发布者(生产者)
package cc.fireflyhut.rabbitmqstudy.publishandsubscribe;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import cc.fireflyhut.rabbitmqstudy.util.SleepUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
 * Publish/subscribe producer: sends the current timestamp to the fanout exchange in an endless loop.
 */
public class Publisher {

    public static void main(String[] args) throws IOException, TimeoutException {
        createPublisher();
    }

    /**
     * Publishes timestamps to {@link Subscribe#EXCHANGE_NAME}, pausing via {@code SleepUtil.sleep(2)}
     * between messages. The exchange is expected to exist already, so it is not declared here.
     */
    private static void createPublisher() throws IOException, TimeoutException {
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Channel channel = RabbitMqClient.getProducerChannel();
        for (;;) {
            byte[] payload = timestampFormat.format(new Date()).getBytes(StandardCharsets.UTF_8);
            channel.basicPublish(Subscribe.EXCHANGE_NAME, Subscribe.ROUTING_KEY, null, payload);
            SleepUtil.sleep(2);
        }
    }
}
测试,可以看到两个队列都可以接收到消息
4.Direct直接模式
直接模式是指一个routing key只绑定一个队列,保证消息只被一个队列接收。
消费者
package cc.fireflyhut.rabbitmqstudy.direct;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Direct-exchange consumer side: declares the exchange plus two queues, each bound with its own
 * routing key, and consumes from both.
 */
public class DirectConsumer {
    /** Exchange name. */
    public static final String EXCHANGE_NAME = "direct_exchange";
    /** Routing key bound to {@link #QUEUE_1}. */
    public static final String ROUTING_KEY_1 = "direct_routing_key_1";
    /** Routing key bound to {@link #QUEUE_2}. */
    public static final String ROUTING_KEY_2 = "direct_routing_key_2";
    /** First queue. */
    public static final String QUEUE_1 = "direct_queue_1";
    /** Second queue. */
    public static final String QUEUE_2 = "direct_queue_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        createExchange();
    }

    /**
     * Declares the direct exchange and sets up one consumer per queue.
     */
    public static void createExchange() throws IOException, TimeoutException {
        Channel ch = RabbitMqClient.getConsumerChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        createConsumer(ch, QUEUE_1, ROUTING_KEY_1);
        createConsumer(ch, QUEUE_2, ROUTING_KEY_2);
    }

    /**
     * Declares the queue, binds it to the exchange with the given routing key and starts consuming.
     */
    private static void createConsumer(Channel channel, String queueName, String routingKey) throws IOException {
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        doConsume(queueName, channel);
    }

    /**
     * Registers an auto-ack consumer that prints every message received on the queue.
     */
    private static void doConsume(String queueName, Channel channel) throws IOException {
        channel.basicConsume(queueName, true, (consumerTag, message) -> {
            String text = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("消费者队列" + queueName + "接收到消息:" + text);
        }, consumerTag -> System.out.println("取消"));
        System.out.println(queueName + "队列消费者已就绪");
    }
}
生产者
package cc.fireflyhut.rabbitmqstudy.direct;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import cc.fireflyhut.rabbitmqstudy.util.SleepUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
 * Direct-exchange producer: repeatedly publishes a timestamped message under one of the two
 * routing keys, chosen at random.
 */
public class DirectProducer {

    public static void main(String[] args) throws IOException, TimeoutException {
        createProducer();
    }

    /**
     * Endless publish loop. The exchange is expected to exist already, so it is not declared here.
     */
    private static void createProducer() throws IOException, TimeoutException {
        Random rng = new Random();
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Channel channel = RabbitMqClient.getProducerChannel();
        for (;;) {
            // Coin flip between the two routing keys.
            String routingKey = rng.nextInt(2) == 0
                    ? DirectConsumer.ROUTING_KEY_1
                    : DirectConsumer.ROUTING_KEY_2;
            System.out.println(routingKey + "推送消息");
            String text = routingKey + "-" + timestampFormat.format(new Date());
            channel.basicPublish(DirectConsumer.EXCHANGE_NAME, routingKey,
                    null, text.getBytes(StandardCharsets.UTF_8));
            SleepUtil.sleep(2);
        }
    }
}
运行效果
可以看到1队列只会消费key1的消息,2队列只会消费key2的消息。
5.Topic主题模式
最牛逼的模式,想发给谁就发给谁。队列通过带有通配符的routing key(binding key)绑定到交换机,发送消息时指定的routing key只要能匹配某个绑定的routing key,消息就会被投递到对应的队列中。其中`*`匹配恰好一个单词,`#`匹配零个或多个单词。
消费者
package cc.fireflyhut.rabbitmqstudy.topic;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Topic-exchange consumer side: declares the exchange and three queues bound with wildcard
 * routing keys, then consumes from each of them.
 */
public class TopicConsumer {
    public static final String EXCHANGE_NAME = "topic_exchange";
    public static final String FRONT_ROUTING_KEY = "front.*.*";
    public static final String MIDDLE_ROUTING_KEY = "*.middle.*";
    public static final String REAR_ROUTING_KEY = "*.*.rear";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel ch = RabbitMqClient.getConsumerChannel();
        // Declare the exchange as a topic exchange.
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // One queue per binding pattern.
        createConsumer(ch, "topic_front", FRONT_ROUTING_KEY);
        createConsumer(ch, "topic_middle", MIDDLE_ROUTING_KEY);
        createConsumer(ch, "topic_rear", REAR_ROUTING_KEY);
    }

    /**
     * Declares the queue, binds it to the exchange with the given pattern and starts consuming.
     */
    private static void createConsumer(Channel channel, String queueName, String routingKey) throws IOException {
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        doConsume(queueName, channel);
    }

    /**
     * Registers an auto-ack consumer that prints every message received on the queue.
     */
    private static void doConsume(String queueName, Channel channel) throws IOException {
        channel.basicConsume(queueName, true, (consumerTag, message) -> {
            String text = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("消费者队列" + queueName + "接收到消息:" + text);
        }, consumerTag -> System.out.println("取消"));
        System.out.println(queueName + "队列消费者已就绪");
    }
}
生产者
package cc.fireflyhut.rabbitmqstudy.topic;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import cc.fireflyhut.rabbitmqstudy.util.SleepUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Topic-exchange producer: publishes a fixed set of test messages, each under its own routing key.
 */
public class TopicProducer {
    /** Test data: routing key mapped to the message body (the body states which queues should match). */
    public static final Map<String, String> testMsg = new HashMap<>();

    static {
        testMsg.put("front.waibibabo.haihaihai", "只有front队列接收");
        testMsg.put("wuxidixi.middle.waibiwaibi", "只有middle队列接收");
        testMsg.put("xuanbu.geshi.rear", "只有rear队列接受");
        testMsg.put("front.middle.woshigeshabi", "只有front和middle队列接收");
        testMsg.put("woxinmaideche.middle.rear", "只有middle和rear队列接收");
        testMsg.put("front.nizenmekaideche.rear", "只有front和rear队列接收");
        testMsg.put("front.middle.rear", "front、middle和rear队列都会接收");
        testMsg.put("front.tuanzhang.nijiushigejb", "只有front队列接收");
        testMsg.put("koupizi.middle.guamazi", "只有middle队列接收");
        testMsg.put("zhuifengzi.fshazi.rear", "只有rear队列接受");
        testMsg.put("front.middle.tuituitui", "只有front和middle队列接收");
        testMsg.put("aduiduidui.middle.rear", "只有middle和rear队列接收");
        testMsg.put("front.tingwoshuoxiexieni.rear", "只有front和rear队列接收");
        testMsg.put("front.tingwoshuoxiexieni.rear.guer", "没人要,会被丢弃");
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        createProducer();
    }

    /**
     * Publishes every entry of {@link #testMsg} to the topic exchange, logging each one and
     * pausing via {@code SleepUtil.sleep(2)} afterwards.
     */
    private static void createProducer() throws IOException, TimeoutException {
        Channel channel = RabbitMqClient.getProducerChannel();
        for (Map.Entry<String, String> sample : testMsg.entrySet()) {
            String routingKey = sample.getKey();
            String message = sample.getValue();
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            channel.basicPublish(TopicConsumer.EXCHANGE_NAME, routingKey, null, payload);
            System.out.println("已发送,key:" + routingKey + ",消息:" + message);
            SleepUtil.sleep(2);
        }
    }
}
运行效果
手动应答
为了不丢消息,建议手动应答
package cc.fireflyhut.rabbitmqstudy.workerqueue;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Work-queue consumer with manual acknowledgement: three workers compete for messages on the
 * {@code Study} queue and acknowledge each message after printing it.
 */
public class WorkQueue {

    /**
     * Starts three threads, each of which registers one manually-acknowledging consumer
     * on the shared channel.
     *
     * @param args unused
     * @throws IOException if the channel cannot be created
     * @throws TimeoutException if the broker connection cannot be established in time
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqClient.getConsumerChannel();
        for (int i = 1; i <= 3; i++) {
            final int num = i;
            new Thread(() -> {
                try {
                    // Arguments: queue name, autoAck (false = manual ack), delivery handler, cancel handler.
                    channel.basicConsume("Study", false, (consumerTag, message) -> {
                        // The producer sends UTF-8 bytes; decode explicitly instead of
                        // relying on the platform default charset.
                        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Worker-" + num + ":" + body);
                        // Manual ack of this delivery only; the second argument (multiple)
                        // would acknowledge several deliveries at once and is left off.
                        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                    }, (consumerTag) -> {
                        System.out.println("Worker-" + num + ":消息取消");
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
持久化
为了确保消息的可靠性,需要将队列和消息进行持久化,持久化后即使RabbitMQ挂了消息和队列也不会丢失。
- 队列持久化:关机或者宕机后队列还在
注意:当一个队列已经存在,尝试再次用不同参数定义队列时会报错。报错信息如下
Exception in thread "main" java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:968)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
at cc.fireflyhut.rabbitmqstudy.workerqueue.WqProducer.main(WqProducer.java:22)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'Study' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'Study' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:646)
at java.lang.Thread.run(Thread.java:745)
- 消息持久化:关机或者宕机后消息还在
package cc.fireflyhut.rabbitmqstudy.workerqueue;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
 * Work-queue producer with persistence: declares a durable {@code Study} queue and publishes
 * persistent timestamp messages every five seconds.
 */
public class WqProducer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqClient.getProducerChannel();
        // Queue persistence: the second argument (durable) is set to true.
        channel.queueDeclare("Study", true, false, false, null);
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        for (;;) {
            byte[] payload = timestampFormat.format(new Date()).getBytes(StandardCharsets.UTF_8);
            // Message persistence: the third argument (props) is MessageProperties.PERSISTENT_TEXT_PLAIN.
            channel.basicPublish("", "Study", MessageProperties.PERSISTENT_TEXT_PLAIN, payload);
            Thread.sleep(5000);
        }
    }
}
发布确认
为了不丢失消息,生产者发布消息时建议开启发布确认。
单个发布确认
package cc.fireflyhut.rabbitmqstudy.workerqueue;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
 * Publisher-confirm demo (one confirm per message): publishes 1000 persistent messages to the
 * durable {@code Study} queue and waits for the broker's confirm after each one.
 */
public class WqProducer {

    /**
     * Publishes the messages, reports the confirm result of each and prints the total elapsed time.
     *
     * @param args unused
     * @throws IOException if declaring, publishing or closing fails
     * @throws TimeoutException if the broker connection cannot be established in time
     * @throws InterruptedException if waiting for a confirm is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqClient.getProducerChannel();
        try {
            // Durable queue.
            channel.queueDeclare("Study", true, false, false, null);
            // Enable publisher confirms on this channel.
            channel.confirmSelect();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            long start = System.currentTimeMillis();
            for (int i = 1; i <= 1000; i++) {
                channel.basicPublish("", "Study", MessageProperties.PERSISTENT_TEXT_PLAIN,
                        (i + "-" + sdf.format(new Date())).getBytes(StandardCharsets.UTF_8));
                // Block until the broker has confirmed this message.
                if (channel.waitForConfirms()) {
                    System.out.println(i + ":已确认发布");
                } else {
                    // A negative confirm used to be ignored silently; surface it.
                    System.err.println(i + ":发布未被确认");
                }
            }
            System.out.println("单个确认耗时:" + (System.currentTimeMillis() - start));
        } finally {
            // Closing the connection also closes the channel and lets the program exit.
            channel.getConnection().close();
        }
    }
}
可以看到单个确认发布1000条消息耗时是比较长的(我的服务器可能吞吐量非常小会有一些影响)
批量发布确认
package cc.fireflyhut.rabbitmqstudy.workerqueue;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
 * Publisher-confirm demo (batched): publishes 1000 persistent messages to the durable
 * {@code Study} queue and waits for confirms once per batch of 100.
 */
public class WqProducer {

    /**
     * Publishes the messages in batches, reports each batch's confirm result and prints the
     * total elapsed time.
     *
     * @param args unused
     * @throws IOException if declaring, publishing or closing fails
     * @throws TimeoutException if the broker connection cannot be established in time
     * @throws InterruptedException if waiting for confirms is interrupted
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        final int total = 1000;
        final int batchSize = 100;
        Channel channel = RabbitMqClient.getProducerChannel();
        try {
            // Durable queue.
            channel.queueDeclare("Study", true, false, false, null);
            // Enable publisher confirms on this channel.
            channel.confirmSelect();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            long start = System.currentTimeMillis();
            int batchStart = 1;
            for (int i = 1; i <= total; i++) {
                channel.basicPublish("", "Study", MessageProperties.PERSISTENT_TEXT_PLAIN,
                        (i + "-" + sdf.format(new Date())).getBytes(StandardCharsets.UTF_8));
                // Confirm at every batch boundary, and also after the last message so that a
                // trailing partial batch is never left unconfirmed.
                if (i % batchSize == 0 || i == total) {
                    if (channel.waitForConfirms()) {
                        System.out.println(batchStart + "~" + i + ":已确认发布");
                    } else {
                        // A negative confirm used to be ignored silently; surface it.
                        System.err.println(batchStart + "~" + i + ":存在未确认的消息");
                    }
                    batchStart = i + 1;
                }
            }
            System.out.println("批量确认耗时:" + (System.currentTimeMillis() - start));
        } finally {
            // Closing the connection also closes the channel and lets the program exit.
            channel.getConnection().close();
        }
    }
}
可以看到这里的时间明显减少
异步发布确认
package cc.fireflyhut.rabbitmqstudy.workerqueue;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
 * Publisher-confirm demo (asynchronous): publishes 1000 persistent messages while a confirm
 * listener prints the confirms as they arrive.
 */
public class WqProducer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqClient.getProducerChannel();
        // Durable queue.
        channel.queueDeclare("Study", true, false, false, null);
        // Enable publisher confirms on this channel.
        channel.confirmSelect();
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // First callback handles positive confirms, second one negative confirms.
        channel.addConfirmListener(
                (deliveryTag, multiple) ->
                        System.out.println("已确认:" + deliveryTag + ";是否批量:" + multiple),
                (deliveryTag, multiple) ->
                        System.out.println("未确认:" + deliveryTag + ";是否批量:" + multiple));
        long begin = System.currentTimeMillis();
        int seq = 1;
        while (seq <= 1000) {
            String text = seq + "-" + timestampFormat.format(new Date());
            channel.basicPublish("", "Study", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    text.getBytes(StandardCharsets.UTF_8));
            seq++;
        }
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("发布消息耗时:" + elapsed);
    }
}
发布消息和发布确认两个事情分头进行,发布消息时间就更加节省
未确认处理
package cc.fireflyhut.rabbitmqstudy.workerqueue;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
/**
 * Publisher-confirm demo with tracking of unconfirmed messages: every published message is cached
 * under its publish sequence number; positive confirms remove entries from the cache, negative
 * confirms print the affected entries.
 */
public class WqProducer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // Outstanding messages, keyed by publish sequence number.
        final ConcurrentSkipListMap<Long, String> msgCache = new ConcurrentSkipListMap<>();
        Channel channel = RabbitMqClient.getProducerChannel();
        // Durable queue.
        channel.queueDeclare("Study", true, false, false, null);
        // Enable publisher confirms on this channel.
        channel.confirmSelect();
        channel.addConfirmListener((deliveryTag, multiple) -> {
            if (!multiple) {
                // Single confirm: drop just this entry.
                msgCache.remove(deliveryTag);
                return;
            }
            System.out.println("已批量确认确认:" + deliveryTag);
            // Batch confirm: drop every entry up to and including deliveryTag.
            ConcurrentNavigableMap<Long, String> confirmed = msgCache.headMap(deliveryTag, true);
            confirmed.clear();
        }, (deliveryTag, multiple) -> {
            if (!multiple) {
                String pending = msgCache.get(deliveryTag);
                System.out.println("未确认的消息,tag = " + deliveryTag + ",消息内容:" + pending);
                return;
            }
            // Batch nack: report every cached entry up to and including deliveryTag.
            ConcurrentNavigableMap<Long, String> rejected = msgCache.headMap(deliveryTag, true);
            for (Map.Entry<Long, String> item : rejected.entrySet()) {
                Long tag = item.getKey();
                String pending = item.getValue();
                System.out.println("未确认的消息,tag = " + tag + ",消息内容:" + pending);
            }
        });
        long begin = System.currentTimeMillis();
        for (int seq = 1; seq <= 10; seq++) {
            String msg = seq + "-" + timestampFormat.format(new Date());
            // Cache before publishing, under the sequence number the next publish will use.
            long publishSeqNo = channel.getNextPublishSeqNo();
            msgCache.put(publishSeqNo, msg);
            channel.basicPublish("", "Study",
                    MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes(StandardCharsets.UTF_8));
        }
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("发布消息耗时:" + elapsed);
    }
}
死信队列
死信的来源
- 消息TTL过期
- 队列满了
- 消息被拒绝(reject或者nack,同时requeue = false)
TTL过期死信
发布消息时将TTL设置成10s,然后模拟正常消费者挂了,观察正常队列和死信队列的消息数量。
死信消费者
package cc.fireflyhut.rabbitmqstudy.deadletter;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Dead-letter side of the demo: declares the dead-letter exchange and queue, binds them and
 * consumes whatever arrives.
 */
public class DeadLetterConsumer {
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
    public static final String DEAD_LETTER_ROUTING_KEY = "dead_letter_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        createDeadLetterExchange(DEAD_LETTER_EXCHANGE, DEAD_LETTER_QUEUE, DEAD_LETTER_ROUTING_KEY);
    }

    /**
     * Declares a direct exchange and a queue, binds them with the routing key and starts consuming.
     */
    private static void createDeadLetterExchange(String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException {
        Channel ch = RabbitMqClient.getConsumerChannel();
        ch.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
        ch.queueDeclare(queueName, false, false, false, null);
        ch.queueBind(queueName, exchangeName, routingKey);
        doDeadLetterConsume(ch, queueName);
    }

    /**
     * Registers an auto-ack consumer that prints every dead-lettered message.
     */
    private static void doDeadLetterConsume(Channel channel, String queueName) throws IOException {
        channel.basicConsume(queueName, true, (consumerTag, message) -> {
            String text = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("死信消费者已接收:" + text);
        }, consumerTag -> System.out.println("已取消"));
        System.out.println("死信队列消费者已就绪");
    }
}
正常消费者
package cc.fireflyhut.rabbitmqstudy.deadletter;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Regular side of the dead-letter demo: declares the normal exchange and a queue whose
 * arguments point at the dead-letter exchange, then consumes from that queue.
 */
public class NormalConsumer {
    public static final String NORMAL_LETTER_EXCHANGE = "normal_letter_exchange";
    public static final String NORMAL_LETTER_QUEUE = "normal_letter_queue";
    public static final String NORMAL_LETTER_ROUTING_KEY = "normal_letter_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        createExchange(NORMAL_LETTER_EXCHANGE, NORMAL_LETTER_QUEUE, NORMAL_LETTER_ROUTING_KEY);
    }

    /**
     * Declares a direct exchange and the queue (with dead-letter arguments), binds them and
     * starts consuming.
     */
    private static void createExchange(String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException {
        Channel ch = RabbitMqClient.getConsumerChannel();
        ch.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
        Map<String, Object> queueArgs = buildQueueProperties();
        ch.queueDeclare(queueName, false, false, false, queueArgs);
        ch.queueBind(queueName, exchangeName, routingKey);
        doConsume(ch, queueName);
    }

    /**
     * Builds the queue arguments: the dead-letter exchange and the dead-letter routing key.
     * A queue-wide message TTL ("x-message-ttl") is deliberately not set here because the
     * publisher sets the expiration per message; further arguments may be added later.
     */
    private static Map<String, Object> buildQueueProperties() {
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", DeadLetterConsumer.DEAD_LETTER_EXCHANGE);
        queueArgs.put("x-dead-letter-routing-key", DeadLetterConsumer.DEAD_LETTER_ROUTING_KEY);
        return queueArgs;
    }

    /**
     * Registers an auto-ack consumer that prints every message received on the queue.
     */
    private static void doConsume(Channel channel, String queueName) throws IOException {
        channel.basicConsume(queueName, true, (consumerTag, message) -> {
            String text = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("正常消费者已接收:" + text);
        }, consumerTag -> System.out.println("已取消"));
        System.out.println("正常队列消费者已就绪");
    }
}
生产者
package cc.fireflyhut.rabbitmqstudy.deadletter;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class DeadLetterProducer {
public static int PUBLISH_NUM = 10;
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqClient.getProducerChannel();
// 循环发布消息
for (int i = 1; i <= PUBLISH_NUM; i++) {
String msg = "测试死信" + i;
channel.basicPublish(NormalConsumer.NORMAL_LETTER_EXCHANGE, NormalConsumer.NORMAL_LETTER_ROUTING_KEY,
getTtlProp("10000"), msg.getBytes(StandardCharsets.UTF_8));
System.out.println("已发布:" + msg);
}
}
// 获取过期时间的属性
private static AMQP.BasicProperties getTtlProp(String millisecond) {
// 获取过期时间属性
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().expiration(millisecond).build();
return prop;
}
}
运行效果:
先启动死信消费者和正常消费者,完成交换机和队列的声明与绑定后将两个消费者关闭;然后只启动生产者发布消息,观察队列中的消息数量。
发送消息到正常队列
在十秒内观察到消息在正常队列滞留
十秒后观察到消息被转发至死信队列并在其中滞留
开启死信队列消费线程,观察到死信队列消费者将其消费掉了
队列满死信
修改正常队列消费者
package cc.fireflyhut.rabbitmqstudy.deadletter;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Regular side of the "queue full" dead-letter demo: the normal queue is declared with
 * dead-letter arguments and a maximum length of 6.
 */
public class NormalConsumer {
    public static final String NORMAL_LETTER_EXCHANGE = "normal_letter_exchange";
    public static final String NORMAL_LETTER_QUEUE = "normal_letter_queue";
    public static final String NORMAL_LETTER_ROUTING_KEY = "normal_letter_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        createExchange(NORMAL_LETTER_EXCHANGE, NORMAL_LETTER_QUEUE, NORMAL_LETTER_ROUTING_KEY);
    }

    /**
     * Declares a direct exchange and the queue (with its arguments), binds them and starts consuming.
     */
    private static void createExchange(String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException {
        Channel ch = RabbitMqClient.getConsumerChannel();
        ch.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
        Map<String, Object> queueArgs = buildQueueProperties();
        ch.queueDeclare(queueName, false, false, false, queueArgs);
        ch.queueBind(queueName, exchangeName, routingKey);
        doConsume(ch, queueName);
    }

    /**
     * Builds the queue arguments: dead-letter exchange, dead-letter routing key and maximum
     * queue length. A queue-wide message TTL ("x-message-ttl") is not set here (the publisher
     * can set an expiration per message); further arguments may be added later.
     */
    private static Map<String, Object> buildQueueProperties() {
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", DeadLetterConsumer.DEAD_LETTER_EXCHANGE);
        queueArgs.put("x-dead-letter-routing-key", DeadLetterConsumer.DEAD_LETTER_ROUTING_KEY);
        // Maximum queue length.
        queueArgs.put("x-max-length", 6);
        return queueArgs;
    }

    /**
     * Registers an auto-ack consumer that prints every message received on the queue.
     */
    private static void doConsume(Channel channel, String queueName) throws IOException {
        channel.basicConsume(queueName, true, (consumerTag, message) -> {
            String text = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("正常消费者已接收:" + text);
        }, consumerTag -> System.out.println("已取消"));
        System.out.println("正常队列消费者已就绪");
    }
}
修改生产者
package cc.fireflyhut.rabbitmqstudy.deadletter;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class DeadLetterProducer {
public static int PUBLISH_NUM = 10;
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqClient.getProducerChannel();
// 循环发布消息
for (int i = 1; i <= PUBLISH_NUM; i++) {
String msg = "测试死信" + i;
/// 注释掉过期时间方式
// channel.basicPublish(NormalConsumer.NORMAL_LETTER_EXCHANGE, NormalConsumer.NORMAL_LETTER_ROUTING_KEY,
// getTtlProp("10000"), msg.getBytes(StandardCharsets.UTF_8));
// 发送无属性消息
channel.basicPublish(NormalConsumer.NORMAL_LETTER_EXCHANGE, NormalConsumer.NORMAL_LETTER_ROUTING_KEY,
null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("已发布:" + msg);
}
}
// 获取过期时间的属性
private static AMQP.BasicProperties getTtlProp(String millisecond) {
// 获取过期时间属性
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().expiration(millisecond).build();
return prop;
}
}
启动程序前,删除已有队列。启动死信消费者和正常消费者创建队列后将其关闭。只启动生产者观察队列状态。
发现多出的4条进入死信队列
启动正常队列和死信队列观察消费情况
发现1~4条被死信队列消费,5~10条被正常队列消费
拒绝死信
只修改正常消费者,使其拒绝收到的消息
package cc.fireflyhut.rabbitmqstudy.deadletter;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Regular side of the "rejected message" dead-letter demo: the consumer rejects every message
 * without requeueing it.
 */
public class NormalConsumer {
    public static final String NORMAL_LETTER_EXCHANGE = "normal_letter_exchange";
    public static final String NORMAL_LETTER_QUEUE = "normal_letter_queue";
    public static final String NORMAL_LETTER_ROUTING_KEY = "normal_letter_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        createExchange(NORMAL_LETTER_EXCHANGE, NORMAL_LETTER_QUEUE, NORMAL_LETTER_ROUTING_KEY);
    }

    /**
     * Declares a direct exchange and the queue (with dead-letter arguments), binds them and
     * starts consuming.
     */
    private static void createExchange(String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException {
        Channel ch = RabbitMqClient.getConsumerChannel();
        ch.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
        Map<String, Object> queueArgs = buildQueueProperties();
        ch.queueDeclare(queueName, false, false, false, queueArgs);
        ch.queueBind(queueName, exchangeName, routingKey);
        doConsume(ch, queueName);
    }

    /**
     * Builds the queue arguments: the dead-letter exchange and the dead-letter routing key.
     * The maximum length ("x-max-length") is left out for the rejection demo, and a queue-wide
     * message TTL ("x-message-ttl") is not set either; further arguments may be added later.
     */
    private static Map<String, Object> buildQueueProperties() {
        Map<String, Object> queueArgs = new HashMap<>();
        queueArgs.put("x-dead-letter-exchange", DeadLetterConsumer.DEAD_LETTER_EXCHANGE);
        queueArgs.put("x-dead-letter-routing-key", DeadLetterConsumer.DEAD_LETTER_ROUTING_KEY);
        return queueArgs;
    }

    /**
     * Registers a consumer with autoAck disabled that prints each message and rejects it
     * with requeue set to false.
     */
    private static void doConsume(Channel channel, String queueName) throws IOException {
        channel.basicConsume(queueName, false, (consumerTag, message) -> {
            String text = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("正常消费者已拒绝:" + text);
            long deliveryTag = message.getEnvelope().getDeliveryTag();
            // Reject without requeueing.
            channel.basicReject(deliveryTag, false);
        }, consumerTag -> System.out.println("已取消"));
        System.out.println("正常队列消费者已就绪");
    }
}
启动程序前,删除已有队列。先启动正常消费者和死信消费者,然后再启动生产者。
发现被正常消费者拒绝的消息已被死信队列全盘接收。
延迟队列
下载安装插件
# Stop RabbitMQ
service rabbitmq-server stop
# Change to the plugins directory (3.x.x presumably stands for the installed version)
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.x.x/plugins
# Upload the plugin file into this directory (manual step; no command is given here)
# Enable the plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Start RabbitMQ
service rabbitmq-server start
代码演示
消费者
package cc.fireflyhut.rabbitmqstudy.xdelayed;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import cc.fireflyhut.rabbitmqstudy.util.TimeUtil;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
 * Consumer for the delayed-message demo.
 *
 * <p>Declares an exchange of type {@code x-delayed-message}, a queue bound to it, and
 * prints each received message together with the time reported by {@code TimeUtil.now()}.
 */
public class DelayedConsumer {

    public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
    public static final String DELAYED_QUEUE_NAME = "delayed_queue";
    public static final String DELAYED_KEY_NAME = "delayed_key";

    /**
     * Sets up exchange, queue and binding, then starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if the channel cannot be obtained in time
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel consumerChannel = RabbitMqClient.getConsumerChannel();
        Map<String, Object> exchangeArguments = delayedExchangeArguments();
        declareAndBind(consumerChannel, exchangeArguments);
        listen(consumerChannel);
    }

    /**
     * Builds the exchange arguments: {@code x-delayed-type} set to {@code direct}.
     */
    private static Map<String, Object> delayedExchangeArguments() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return arguments;
    }

    /**
     * Declares the delayed exchange and the queue, then binds them with the routing key.
     */
    private static void declareAndBind(Channel consumerChannel,
                                       Map<String, Object> exchangeArguments) throws IOException {
        // Exchange of type x-delayed-message
        consumerChannel.exchangeDeclare(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, exchangeArguments);
        // Queue
        consumerChannel.queueDeclare(DELAYED_QUEUE_NAME, true, false, false, null);
        // Binding between the two via the routing key
        consumerChannel.queueBind(DELAYED_QUEUE_NAME, DELAYED_EXCHANGE_NAME, DELAYED_KEY_NAME);
    }

    /**
     * Subscribes to the delayed queue with automatic acknowledgement and prints each message.
     */
    private static void listen(Channel consumerChannel) throws IOException {
        consumerChannel.basicConsume(DELAYED_QUEUE_NAME, true, (tag, delivery) -> {
            String receivedAt = String.valueOf(TimeUtil.now());
            String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("消费者队列" + DELAYED_QUEUE_NAME + "于" + receivedAt + "接收到消息:" + body);
        }, tag -> System.out.println("取消"));
        System.out.println(DELAYED_QUEUE_NAME + "队列消费者已就绪");
    }
}
生产者
package cc.fireflyhut.rabbitmqstudy.xdelayed;
import cc.fireflyhut.rabbitmqstudy.client.RabbitMqClient;
import cc.fireflyhut.rabbitmqstudy.util.TimeUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class DelayedProducer {
public static void main(String[] args) throws IOException, TimeoutException {
createProducer();
}
private static void createProducer() throws IOException, TimeoutException {
Channel channel = RabbitMqClient.getProducerChannel();
for (int i = 1; i <= 10; i++) {
String message = "消息体:" + i + ", " + TimeUtil.now() + "时发送, 延迟" + (i * 1000) + "毫秒";
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(getDelayedMap(i * 1000)).build();
channel.basicPublish(DelayedConsumer.DELAYED_EXCHANGE_NAME, DelayedConsumer.DELAYED_KEY_NAME,
properties, message.getBytes(StandardCharsets.UTF_8));
System.out.println("已发送,消息:" + message);
}
}
public static Map<String, Object> getDelayedMap(Integer millisecond) {
Map<String, Object> prop = new HashMap<>();
prop.put("x-delay", millisecond);
return prop;
}
}
运行演示
生产者
消费者