理论简介
ActiveMQ的持久化机制包含JDBC、KahaDB、LevelDB。
在activemq.xml中查看默认的broker持久化机制。
详情页面:http://activemq.apache.org/persistence.html 。
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
AMQ和KahaDB
AMQ的优缺点
1、性能
AMQ的性能与与JDBC的持久化机制相比,由于是在文件中追加写入消息,所以性能比较高。并且创建了消息主键索引和缓存机制以提升性能。每个日志文件默认为32M,超出后会创建一个新文件。当消息消费完成后是进行删除还是归档操作,取决于配置。
2、缺点
AMQ会为每一个Destination创建一个索引,若创建了大小的消息队列,则磁盘占用会非常大,所以由于索引文件比较大,当Broker崩溃后,重建所以速度比较慢。
KahaDB的概述
KahaDB是基于文件的本地数据库储存形式,虽然没有AMQ的速度快,但是它具有强扩展性,恢复的时间比AMQ短。
KahaDB基于文件系统,其次KahaDB支持事务。在ActiveMQ V5.4版本及后续版本KahaDB都是ActiveMQ的默认持久化存储方案。最后 Apache ActiveMQ官方表示它用来替换之前的AMQ Message Store存储方案。
KahaDB主要元素包括:一个内存Metadata Cache用来在内存中检索消息的存储位置、若干用于记录消息内容的Data log文件、一个在磁盘上检索消息存储位置的Metadata Store、还有一个用于在系统异常关闭后恢复Btree结构的redo文件。如下图所示(官网引用):
B-Tree图说明:
KahaDB主要特性
1、日志形式存储消息;
2、消息索引以B-Tree结构存储,可以快速更新;
3、完全支持JMS事务;
4、支持多种恢复机制;
消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。
KahaDB的文档结构说明
在activemq/data/kahadb
的目录,以下是KahaDB在磁盘文件上的现实展示。注意,可能您查看自己测试实例中所运行的KahaDB,看到的效果和本文中给出的效果不完全一致。例如您的data log文件可能叫db-1.log,也有可能会多出一个db.free的文件,但是这些都不影响我们对文件结构的分析:
1、db.data:它是消息的索引文件。本质上是B-Tree的实现,使用B-Tree作为索引指向db-*.log里面存储的消息。
2、db.redo:主要用来进行消息恢复。
3、db-*.log:存储消息的内容。对于一个消息而言,不仅仅有消息本身的数据(message data),而且还有(Destinations、订阅关系、事务…)。
data log以日志形式存储消息,而且新的数据总是以APPEND的方式追加到日志文件末尾。因此,消息的存储是很快的。比如,对于持久化消息,Producer把消息发送给Broker,Broker先把消息存储到磁盘中(enableJournalDiskSyncs配置选项),然后再向Producer返回Acknowledge。Append方式在一定程度上减少了Broker向Producer返回Acknowledge的时间。
4、lock文件锁
KahaDB的属性
5.4之后的版本:
5.6版本之后有效的属性:
5.10版本:
LevelDB简介
LevelDB:从ActiveMQ 5.6版本之后,又推出了LevelDB的持久化引擎。LevelDB持久化性能高于KahaDB,虽然目前默认的持久化方式仍然是KahaDB,但是LevelDB是将来的趋势。并且,在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。LevelDB使用自定义的索引代替常用的BTree索引。
通过上图可以看出LevelDB主要由6部分组成:内存中的MemTable和ImmutableMemTable,还有硬盘上的log文件,manifest文件,current文件和SSTable文件,还有一些其他的辅助文件。
每写入一次数据,需要写入log文件,和MemTable,也就是说,只需要一次硬盘的顺序写入,和一个内存写入,如果系统崩溃,可以通过log文件恢复数据。每次写入会先写log文件,后写MemTable来保证不丢失数据。
当MemTable到达内存阀值,LevelDB会创建一个新的MemTable和log文件,而旧的MemTable会变成ImmutableMemTable,ImmutableMemTable的内容是只读的。然后系统会定时的异步的把ImmutableMemTable的数据写入新的SSTable文件。
SSTable文件和MemTable,ImmutableMemTable的数据结构相同,都是key,value的数据,按照key排序。
manifest文件用于记录每个SSTable的key的起始值和结束值,有点类似于B-tree索引。而manifest同样会生成新文件,旧的文件不再使用。current文件就是指定哪个manifest文件是现在正在使用的。
官网不推荐使用:http://activemq.apache.org/leveldb-store。
JDBC配置MySQL
JDBC持久化就是把消息的相关信息存入MySQL数据库里面。
配置方式:把连接MySQL数据库的Jar文件,放到ActiveMQ的lib目录下:mysql-connector-java-5.1.20-bin.jar
。
修改conf目录下的activemq.xml原来的kahadb的持久化数据的方式:
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
连接MySQL的配置(注意配置文件放置的位置,放在borker之外)
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
此时,重新启动MQ,就会发现db数据库中多了三张表:activemq_acks,activemq_lock,activemq_msgs,OK,说明activemq已经持久化成功了。
三张表的说明
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
主要的数据库字段如下:
-
container:消息的destination。
-
sub_dest:如果是使用static集群,这个字段会有集群其他系统的信息。
-
client_id:每个订阅者都必须有一个唯一的客户端id用以区分。
-
sub_name:订阅者名称。
-
selector:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作 。
-
last_acked_id:记录消费过的消息的id。
activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。
主要的数据库字段如下:
-
id:自增的数据库主键。
-
container:消息的destination。
-
msgid_prod:消息发送者客户端的主键。
-
msg_seq:是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid。
-
expiration:消息的过期时间,存储的是从1970-01-01到现在的毫秒数。
-
msg:消息本体的java序列化对象的二进制数据。
-
priority:优先级,从0-9,数值越大优先级越高。
-
activemq_acks:用于存储订阅关系。如果是持久化topic,订阅者和服务器的订阅关系在这个表保存。
运行代码验证:一定要开启持久化producer.setDeliveryMode(DeliveryMode.PERSISTENT);
。
验证结果
如果是点对点:在没有消费的情况下会把消息保存到activemq_msgs表中,只要有任意一个消费者已经消费过了,这些消息会立马被清除。
如果是主题,一般是先启动消费者订阅再生产的情况下会把消息保存到activemq_acks中。
可能遇到的坑
1、数据库jar包
记得需要使用到的相关jar文件放置到Activemq安装路径下的lib目录,连接池的包也要放
2、createTableOnStartup属性
在jdbcPersistenceAdapter标签中设置了createTablesOnStartup属性为true时在第一次启动activeMQ时,ActiveMQ服务节点会自动创建表。启动完成后可以去掉这个属性,或者更改为false。
3、下划线的问题
如果出现java.lang.IIlegalStateException:BeanFactory not initalized or already closed
,这是因为操作系统的机器名中有"_"符号,请更改机器名并重启后可以解决。
关于主题和队列
队列会默认进行存储,如果主题没有持久的订阅者,那么主题消息不会被持久化 如果有,即使没有上线,也会持久化。
JDBC With Journal
概述
为了在ActiveMQ V4.x中实现持久消息传递的高性能,我们强烈建议您使用我们的高性能日志 - 默认情况下已启用。这很像一个数据库消息(以及transcation提交/回滚和消息确认)以尽可能快的速度写入日志,然后每隔一段时间我们将日志检查到长期持久性存储(在本例中为JDBC)。
它在使用队列时很常见,例如消息在发布后很快消耗掉。因此,您可以发布10,000条消息,并且只有一些未完成的消息 - 因此,当我们检查JDBC数据库时,我们通常只有少量消息可以实际写入JDBC。即使我们必须将所有消息写入JDBC,我们仍然可以通过日志获得性能提升,因为我们可以使用大型事务批处理将消息插入JDBC数据库以提高JDBC端的性能。
JDBC With Journal方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。
JDBC With Journal方式,发送出来的消息会在内存中告诉缓存,接收端若在没有接收情况下7~10分钟后再写入数据库,这样接收端就不用等到数据库操作完了之后再接收消息。
JDBC Store和JDBC Message Store with ActiveMQ Journal的区别:
1、Jdbc with journal的性能优于jdbc。
2、Jdbc用于master/slave模式的数据库分享。
3、Jdbc with journal不能用于master/slave模式。
4、一般情况下,推荐使用jdbc with journal。
配置方式
1、打开activemq.xml配置文件(在apache-activemq/conf文件夹下),将原来使用的kahaDB消息持久化机制注释掉,添加以下代码:
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data"/>
</persistenceFactory>
2、也可以配置dbcp的数据连接池,但是必须引入dbcp的相关jar包。
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/db?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
ActiveMQ持久化机制小结
1、持久化消息的作用:MQ所在的服务器down了消息不会丢失的机制。
2、持久化机制演化过程
从最初的AMQ Message Store方案到ActiveMQ V4版本中推出的High performance journal(高性能事务支持)附件,并且同步推出数据库的存储方案。ActiveMQ5.3版本中又推出来的KahaDB的支持(5.4版本后为ActiveMQ默认持久化方案),后来又开始支持LeaveDB,到现在。V5.9+版本提供了标准的zookeeper+LeavelDB集群化方案。
3、ActiveMQ的消息持久化机制:AMQ、KahaDB、JDBC、LevelDB、Replicated LevelDB Store 集群模式。
4、其它
-
在发送者把消息发送出去后,消息中心首先把消息存储到本地数据文件、内存数据库者或者远程数据库等,然后试图把消息发送给接收者。
-
发送成功则把消息从存储中删除失败则继续尝试。消息中心启动后首先要检查指定存储位置,如果有未发送成功的消息,就需要把消息发送出去。
评论区