点对点(队列Queue)模式
生产者
1、创建普通的Maven项目加入ActiveMQ的依赖。
<!--activemq需要的jar包 不是使用最新版本的因为可以有BUG -->
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.12</version>
</dependency>
<!--下面是log4j等通用配置 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
</dependencies>
2、创建生产者发送消息。
步骤:
1、创建ConnectionFactory对象,需要指定服务端ip及端口号
2、使用ConnectionFactory对象创建一个Connection对象
3、开启连接,调用Connection对象的start方法
4、使用Connection对象创建一个Session对象
5、使用Session对象创建一个Destination对象[topic、queue],此处创建一个Queue对象
6、使用Session对象创建一个Producer对象
7、创建一个Message对象,创建一个TextMessage对象
8、使用Producer对象发送消息
9、关闭资源
package com.coydone.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TestMessageProducer {
public static final String BROKER_URL = "tcp://127.0.0.1:61616";
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException {
//1、创建ConnectionFactory对象,需要指定服务端ip及端口号
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
//2、使用ConnectionFactory对象创建一个Connection对象
Connection connection = factory.createConnection();
//3、开启连接,调用Connection对象的start方法
connection.start();
//4、使用Connection对象创建一个Session对象
//参数1:是否开启事务。true:开启事务,第二个参数忽略。
//参数2:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象
Queue queue = session.createQueue(QUEUE_NAME);
//6、使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(queue);
//7、创建一个Message对象,创建一个TextMessage对象
TextMessage textMessage = session.createTextMessage("我是一个消息----helo");
//8、使用Producer对象发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
System.out.println("消息发送成功");
}
}
3、在浏览器中的activemq中的Queues中可以看到有一个name为queue-test的消息。
消费者
消费者有两种消费方法:
1、同步消费:通过调用消费者的receive方法从目的地中显式提取消息,receive方法可以一直阻塞到消息到达。
2、异步消费:客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。
同步消费
package com.coydone.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 同步消费者
*/
public class TestMessageSyncConsumer {
public static final String BROKER_URL = "tcp://127.0.0.1:61616";
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws JMSException {
//1、创建一个连接工厂
ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
//2、创建一个连接
Connection connection = factory.createConnection();
//3、打开连接
connection.start();
//4、创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、创建目的地
Queue queue = session.createQueue(QUEUE_NAME);
//6、创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//7、接收消息
//receive的说明 默认没参数是一直等待
//receive(long time) time代表阻塞时长,时间一过就不等了
TextMessage message = (TextMessage) consumer.receive(2000); //因为前后发送的是TextMesssage
//8、输出消息
System.out.println(message.getText());
/*
//循环消费
while(true){
TextMessage message = (TextMessage) consumer.receive();
System.out.println(message.getText());
}
*/
//9、关闭资源
consumer.close();
session.close();
connection.close();
System.out.println("消息消费成功");
}
}
异步消费
package com.coydone.test;
import lombok.SneakyThrows;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 异步消费
*/
public class TestMessageASyncConsumer {
public static final String BROKER_URL = "tcp://127.0.0.1:61616";
private static final String QUEUE_NAME = "queue-test";
public static void main(String[] args) throws Exception {
//1、创建一个连接工厂
ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
//2、创建一个连接
Connection connection = factory.createConnection();
//3、打开连接
connection.start();
//4、创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、创建目的地
Queue queue = session.createQueue(QUEUE_NAME);
//6、创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//7、接收消息
consumer.setMessageListener(new MessageListener() {
@SneakyThrows
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage= (TextMessage) message;
System.out.println("接收到消息:"+textMessage.getText());
}
}
});
System.out.println("消息监听中:-----");
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
System.out.println("消息消费成功");
}
}
一个消费的监听器,当有消息到达时会回调里面的onMessage的方法,在测试的时候不能让应用程序结束,所以在关闭资源前加System.in.read();
。
总结:
1、每一个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。
2、消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息,好比我们发短息,发送者发送后不见得接收者会立即接收,如关机。
3、消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
发布/订阅(Topic)模式
先启动消费者再启动生产者,要不然发送的废消息。例如你不关注订阅号就接收不到消息。
生产者
public class TestMessageTopicProducer {
private static final String BROKER_URL = "tcp://127.0.0.1:61616";
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws Exception {
ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
//TextMessage message = session.createTextMessage("我是一个主题消息");
ActiveMQTextMessage textMessage=new ActiveMQTextMessage();
textMessage.setText("我是一个new出来的主题消息");
//使用Producer对象发送消息
producer.send(textMessage);
System.out.println("主题消费发送成功");
producer.close();
session.close();
connection.close();
}
}
消费者
public class TestMessageTopicConsumer {
private static final String BROKER_URL = "tcp://127.0.0.1:61616";
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws Exception {
ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
//监听消息
consumer.setMessageListener(new MessageListener() {
@SneakyThrows
@Override
public void onMessage(Message message) {
ActiveMQTextMessage textMessage= (ActiveMQTextMessage) message;
System.out.println("消费者接收到消息:"+textMessage.getText());
}
});
System.out.println("主题消费者启动成功");
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
点对点与发布订阅模式比较
比较项目 | Topic模式队列 | Queue模式队列 |
---|---|---|
工作模式 | “订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃;如果有多个订阅者,那么这些订阅者都会收到消息。 | “负载均衡”模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack消息。 |
有无状态 | 无状态 | Queue数据默认会在mq服务器上以文件形式保存,比如ActiveMQ一般保存在$AMQ_HOME\data\kr-store\data 下面。也可以配置成DB存储。 |
传递完整性 | 如果没有订阅者,消息会被丢弃 | 消息不会丢弃 |
处理效率 | 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 | 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低,当然不同消息协议的具体性能也是有差异的 |
发布/订阅模式(持久)
默认情况下,消费者是一个活动的非持久的订阅者。当我上线之后,只能接收我上线之后的消息。
设置持久订阅者:
public class TestMessageTopicDurableConsumer {
private static final String BROKER_URL = "tcp://127.0.0.1:61616";
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws Exception {
ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = factory.createConnection();
//设置订阅者的名字
connection.setClientID("zhangsan");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "---");
subscriber.setMessageListener(new MessageListener() {
@SneakyThrows
@Override
public void onMessage(Message message) {
ActiveMQTextMessage textMessage= (ActiveMQTextMessage) message;
System.out.println("消费者接收到消息:"+textMessage.getText());
}
});
System.out.println("主题消费者启动成功");
System.in.read();
subscriber.close();
session.close();
connection.close();
}
}
评论区