侧边栏壁纸
博主头像
coydone博主等级

记录学习,分享生活的个人站点

  • 累计撰写 306 篇文章
  • 累计创建 51 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

ActiveMQ消息持久化

coydone
2022-06-14 / 0 评论 / 0 点赞 / 570 阅读 / 6,781 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-05-01,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

理论简介

ActiveMQ的持久化机制包含JDBC、KahaDB、LevelDB。

在activemq.xml中查看默认的broker持久化机制。

详情页面:http://activemq.apache.org/persistence.html

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

AMQ和KahaDB

AMQ的优缺点

1、性能

AMQ的性能与与JDBC的持久化机制相比,由于是在文件中追加写入消息,所以性能比较高。并且创建了消息主键索引和缓存机制以提升性能。每个日志文件默认为32M,超出后会创建一个新文件。当消息消费完成后是进行删除还是归档操作,取决于配置。

2、缺点

AMQ会为每一个Destination创建一个索引,若创建了大小的消息队列,则磁盘占用会非常大,所以由于索引文件比较大,当Broker崩溃后,重建所以速度比较慢。

KahaDB的概述

KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短。

KahaDB基于文件系统,其次KahaDB支持事务。在ActiveMQ V5.4版本及后续版本KahaDB都是ActiveMQ的默认持久化存储方案。最后 Apache ActiveMQ官方表示它用来替换之前的AMQ Message Store存储方案。

KahaDB主要元素包括:一个内存Metadata Cache用来在内存中检索消息的存储位置、若干用于记录消息内容的Data log文件、一个在磁盘上检索消息存储位置的Metadata Store、还有一个用于在系统异常关闭后恢复Btree结构的redo文件。如下图所示(官网引用):

B-Tree图说明:

KahaDB主要特性

1、日志形式存储消息;

2、消息索引以B-Tree结构存储,可以快速更新;

3、完全支持JMS事务;

4、支持多种恢复机制;

消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。

消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

KahaDB的文档结构说明

activemq/data/kahadb的目录,以下是KahaDB在磁盘文件上的现实展示。注意,可能您查看自己测试实例中所运行的KahaDB,看到的效果和本文中给出的效果不完全一致。例如您的data log文件可能叫db-1.log,也有可能会多出一个db.free的文件,但是这些都不影响我们对文件结构的分析:

1、db.data:它是消息的索引文件。本质上是B-Tree的实现,使用B-Tree作为索引指向db-*.log里面存储的消息。

2、db.redo:主要用来进行消息恢复。

3、db-*.log:存储消息的内容。对于一个消息而言,不仅仅有消息本身的数据(message data),而且还有(Destinations、订阅关系、事务…)。

data log以日志形式存储消息,而且新的数据总是以APPEND的方式追加到日志文件末尾。因此,消息的存储是很快的。比如,对于持久化消息,Producer把消息发送给Broker,Broker先把消息存储到磁盘中(enableJournalDiskSyncs配置选项),然后再向Producer返回Acknowledge。Append方式在一定程度上减少了Broker向Producer返回Acknowledge的时间。

4、lock文件锁

KahaDB的属性

5.4之后的版本:

5.6版本之后有效的属性:

5.10版本:

LevelDB简介

LevelDB:从ActiveMQ 5.6版本之后,又推出了LevelDB的持久化引擎。LevelDB持久化性能高于KahaDB,虽然目前默认的持久化方式仍然是KahaDB,但是LevelDB是将来的趋势。并且,在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。LevelDB使用自定义的索引代替常用的BTree索引。

通过上图可以看出LevelDB主要由6部分组成:内存中的MemTable和ImmutableMemTable,还有硬盘上的log文件,manifest文件,current文件和SSTable文件,还有一些其他的辅助文件。

每写入一次数据,需要写入log文件,和MemTable,也就是说,只需要一次硬盘的顺序写入,和一个内存写入,如果系统崩溃,可以通过log文件恢复数据。每次写入会先写log文件,后写MemTable来保证不丢失数据。

当MemTable到达内存阀值,LevelDB会创建一个新的MemTable和log文件,而旧的MemTable会变成ImmutableMemTable,ImmutableMemTable的内容是只读的。然后系统会定时的异步的把ImmutableMemTable的数据写入新的SSTable文件。

SSTable文件和MemTable,ImmutableMemTable的数据结构相同,都是key,value的数据,按照key排序。

manifest文件用于记录每个SSTable的key的起始值和结束值,有点类似于B-tree索引。而manifest同样会生成新文件,旧的文件不再使用。current文件就是指定哪个manifest文件是现在正在使用的。

官网不推荐使用:http://activemq.apache.org/leveldb-store。

JDBC配置MySQL

JDBC持久化就是把消息的相关信息存入MySQL数据库里面。

配置方式:把连接MySQL数据库的Jar文件,放到ActiveMQ的lib目录下:mysql-connector-java-5.1.20-bin.jar

修改conf目录下的activemq.xml原来的kahadb的持久化数据的方式:

<!--
<persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
      <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

连接MySQL的配置(注意配置文件放置的位置,放在borker之外)

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

此时,重新启动MQ,就会发现db数据库中多了三张表:activemq_acks,activemq_lock,activemq_msgs,OK,说明activemq已经持久化成功了。

三张表的说明

activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

主要的数据库字段如下:

  • container:消息的destination。

  • sub_dest:如果是使用static集群,这个字段会有集群其他系统的信息。

  • client_id:每个订阅者都必须有一个唯一的客户端id用以区分。

  • sub_name:订阅者名称。

  • selector:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作 。

  • last_acked_id:记录消费过的消息的id。

activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。

activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。

主要的数据库字段如下:

  • id:自增的数据库主键。

  • container:消息的destination。

  • msgid_prod:消息发送者客户端的主键。

  • msg_seq:是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid。

  • expiration:消息的过期时间,存储的是从1970-01-01到现在的毫秒数。

  • msg:消息本体的java序列化对象的二进制数据。

  • priority:优先级,从0-9,数值越大优先级越高。

  • activemq_acks:用于存储订阅关系。如果是持久化topic,订阅者和服务器的订阅关系在这个表保存。

运行代码验证:一定要开启持久化producer.setDeliveryMode(DeliveryMode.PERSISTENT);

验证结果

如果是点对点:在没有消费的情况下会把消息保存到activemq_msgs表中,只要有任意一个消费者已经消费过了,这些消息会立马被清除。

如果是主题,一般是先启动消费者订阅再生产的情况下会把消息保存到activemq_acks中。

可能遇到的坑

1、数据库jar包

记得需要使用到的相关jar文件放置到Activemq安装路径下的lib目录,连接池的包也要放

2、createTableOnStartup属性

在jdbcPersistenceAdapter标签中设置了createTablesOnStartup属性为true时在第一次启动activeMQ时,ActiveMQ服务节点会自动创建表。启动完成后可以去掉这个属性,或者更改为false。

3、下划线的问题

如果出现java.lang.IIlegalStateException:BeanFactory not initalized or already closed,这是因为操作系统的机器名中有"_"符号,请更改机器名并重启后可以解决。

关于主题和队列

队列会默认进行存储,如果主题没有持久的订阅者,那么主题消息不会被持久化 如果有,即使没有上线,也会持久化。

JDBC With Journal

概述

为了在ActiveMQ V4.x中实现持久消息传递的高性能,我们强烈建议您使用我们的高性能日志 - 默认情况下已启用。这很像一个数据库消息(以及transcation提交/回滚和消息确认)以尽可能快的速度写入日志,然后每隔一段时间我们将日志检查到长期持久性存储(在本例中为JDBC)。

它在使用队列时很常见,例如消息在发布后很快消耗掉。因此,您可以发布10,000条消息,并且只有一些未完成的消息 - 因此,当我们检查JDBC数据库时,我们通常只有少量消息可以实际写入JDBC。即使我们必须将所有消息写入JDBC,我们仍然可以通过日志获得性能提升,因为我们可以使用大型事务批处理将消息插入JDBC数据库以提高JDBC端的性能。

JDBC With Journal方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。

JDBC With Journal方式,发送出来的消息会在内存中告诉缓存,接收端若在没有接收情况下7~10分钟后再写入数据库,这样接收端就不用等到数据库操作完了之后再接收消息。

JDBC Store和JDBC Message Store with ActiveMQ Journal的区别:

1、Jdbc with journal的性能优于jdbc。

2、Jdbc用于master/slave模式的数据库分享。

3、Jdbc with journal不能用于master/slave模式。

4、一般情况下,推荐使用jdbc with journal。

配置方式

1、打开activemq.xml配置文件(在apache-activemq/conf文件夹下),将原来使用的kahaDB消息持久化机制注释掉,添加以下代码:

<persistenceFactory>
     <journalPersistenceAdapterFactory
          journalLogFiles="4"
          journalLogFileSize="32768"
          useJournal="true"
          useQuickJournal="true"
          dataSource="#mysql-ds"
          dataDirectory="activemq-data"/>
</persistenceFactory>

2、也可以配置dbcp的数据连接池,但是必须引入dbcp的相关jar包。

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/db?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

ActiveMQ持久化机制小结

1、持久化消息的作用:MQ所在的服务器down了消息不会丢失的机制。

2、持久化机制演化过程

从最初的AMQ Message Store方案到ActiveMQ V4版本中推出的High performance journal(高性能事务支持)附件,并且同步推出数据库的存储方案。ActiveMQ5.3版本中又推出来的KahaDB的支持(5.4版本后为ActiveMQ默认持久化方案),后来又开始支持LeaveDB,到现在。V5.9+版本提供了标准的zookeeper+LeavelDB集群化方案。

3、ActiveMQ的消息持久化机制:AMQ、KahaDB、JDBC、LevelDB、Replicated LevelDB Store 集群模式。

4、其它

  • 在发送者把消息发送出去后,消息中心首先把消息存储到本地数据文件、内存数据库者或者远程数据库等,然后试图把消息发送给接收者。

  • 发送成功则把消息从存储中删除失败则继续尝试。消息中心启动后首先要检查指定存储位置,如果有未发送成功的消息,就需要把消息发送出去。

0

评论区