侧边栏壁纸
博主头像
coydone博主等级

记录学习,分享生活的个人站点

  • 累计撰写 306 篇文章
  • 累计创建 51 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

队列和主题模式

coydone
2022-06-12 / 0 评论 / 0 点赞 / 256 阅读 / 7,963 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-01,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

点对点(队列Queue)模式

生产者

1、创建普通的Maven项目加入ActiveMQ的依赖。

<!--activemq需要的jar包  不是使用最新版本的因为可以有BUG -->
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.12</version>
    </dependency>
    <!--下面是log4j等通用配置 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
</dependencies>

2、创建生产者发送消息。

步骤:
1、创建ConnectionFactory对象,需要指定服务端ip及端口号
2、使用ConnectionFactory对象创建一个Connection对象
3、开启连接,调用Connection对象的start方法
4、使用Connection对象创建一个Session对象
5、使用Session对象创建一个Destination对象[topic、queue],此处创建一个Queue对象
6、使用Session对象创建一个Producer对象
7、创建一个Message对象,创建一个TextMessage对象
8、使用Producer对象发送消息
9、关闭资源
package com.coydone.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TestMessageProducer {
    public static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {

        //1、创建ConnectionFactory对象,需要指定服务端ip及端口号
        ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);

        //2、使用ConnectionFactory对象创建一个Connection对象
        Connection connection = factory.createConnection();

        //3、开启连接,调用Connection对象的start方法
        connection.start();

        //4、使用Connection对象创建一个Session对象
        //参数1:是否开启事务。true:开启事务,第二个参数忽略。
        //参数2:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5、使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象
        Queue queue = session.createQueue(QUEUE_NAME);

        //6、使用Session对象创建一个Producer对象
        MessageProducer producer = session.createProducer(queue);

        //7、创建一个Message对象,创建一个TextMessage对象
        TextMessage textMessage = session.createTextMessage("我是一个消息----helo");

        //8、使用Producer对象发送消息
        producer.send(textMessage);

        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送成功");
    }
}

3、在浏览器中的activemq中的Queues中可以看到有一个name为queue-test的消息。

消费者

消费者有两种消费方法:

1、同步消费:通过调用消费者的receive方法从目的地中显式提取消息,receive方法可以一直阻塞到消息到达。

2、异步消费:客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。

同步消费

package com.coydone.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 同步消费者
 */
public class TestMessageSyncConsumer {
    public static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {
        //1、创建一个连接工厂
        ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
        //2、创建一个连接
        Connection connection = factory.createConnection();
        //3、打开连接
        connection.start();
        //4、创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);
        //6、创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        //7、接收消息
        //receive的说明  默认没参数是一直等待
        //receive(long time)   time代表阻塞时长,时间一过就不等了
        TextMessage message = (TextMessage) consumer.receive(2000); //因为前后发送的是TextMesssage
        //8、输出消息
        System.out.println(message.getText());
        
        /*
        //循环消费
        while(true){
        	TextMessage message = (TextMessage) consumer.receive();
        	System.out.println(message.getText());
        }
        */
        
        //9、关闭资源
        consumer.close();
        session.close();
        connection.close();

        System.out.println("消息消费成功");
    }
}

异步消费

package com.coydone.test;

import lombok.SneakyThrows;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 异步消费
 */
public class TestMessageASyncConsumer {
    public static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws Exception {
        //1、创建一个连接工厂
        ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
        //2、创建一个连接
        Connection connection = factory.createConnection();
        //3、打开连接
        connection.start();
        //4、创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、创建目的地
        Queue queue = session.createQueue(QUEUE_NAME);
        //6、创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        //7、接收消息
        consumer.setMessageListener(new MessageListener() {
            @SneakyThrows
            @Override
            public void onMessage(Message message) {
                if(message instanceof  TextMessage){
                    TextMessage textMessage= (TextMessage) message;
                    System.out.println("接收到消息:"+textMessage.getText());
                }
            }
        });

        System.out.println("消息监听中:-----");
        System.in.read();
        
        //9、关闭资源
        consumer.close();
        session.close();
        connection.close();

        System.out.println("消息消费成功");
    }
}

一个消费的监听器,当有消息到达时会回调里面的onMessage的方法,在测试的时候不能让应用程序结束,所以在关闭资源前加System.in.read();

总结:

1、每一个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。

2、消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息,好比我们发短息,发送者发送后不见得接收者会立即接收,如关机。

3、消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。

发布/订阅(Topic)模式

先启动消费者再启动生产者,要不然发送的废消息。例如你不关注订阅号就接收不到消息。

生产者

public class TestMessageTopicProducer {
    private static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);

        MessageProducer producer = session.createProducer(topic);

		//TextMessage message = session.createTextMessage("我是一个主题消息");
        ActiveMQTextMessage textMessage=new ActiveMQTextMessage();
        textMessage.setText("我是一个new出来的主题消息");
		//使用Producer对象发送消息
        producer.send(textMessage);
        System.out.println("主题消费发送成功");
        
        producer.close();
        session.close();
        connection.close();
    }
}

消费者

public class TestMessageTopicConsumer {
    private static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageConsumer consumer = session.createConsumer(topic);
		//监听消息
        consumer.setMessageListener(new MessageListener() {
            @SneakyThrows
            @Override
            public void onMessage(Message message) {
                ActiveMQTextMessage textMessage= (ActiveMQTextMessage) message;
                System.out.println("消费者接收到消息:"+textMessage.getText());
            }
        });
        System.out.println("主题消费者启动成功");
        System.in.read();
        
        consumer.close();
        session.close();
        connection.close();
    }
}

点对点与发布订阅模式比较

比较项目 Topic模式队列 Queue模式队列
工作模式 “订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃;如果有多个订阅者,那么这些订阅者都会收到消息。 “负载均衡”模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack消息。
有无状态 无状态 Queue数据默认会在mq服务器上以文件形式保存,比如ActiveMQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。
传递完整性 如果没有订阅者,消息会被丢弃 消息不会丢弃
处理效率 由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低,当然不同消息协议的具体性能也是有差异的

发布/订阅模式(持久)

默认情况下,消费者是一个活动的非持久的订阅者。当我上线之后,只能接收我上线之后的消息。

设置持久订阅者:

public class TestMessageTopicDurableConsumer {
    private static final String BROKER_URL = "tcp://127.0.0.1:61616";
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory=new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = factory.createConnection();
        //设置订阅者的名字
        connection.setClientID("zhangsan");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);

        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "---");
        subscriber.setMessageListener(new MessageListener() {
            @SneakyThrows
            @Override
            public void onMessage(Message message) {
                ActiveMQTextMessage textMessage= (ActiveMQTextMessage) message;
                System.out.println("消费者接收到消息:"+textMessage.getText());
            }
        });
        System.out.println("主题消费者启动成功");
        System.in.read();
        subscriber.close();
        session.close();
        connection.close();

    }
}

0

评论区