消息持久化顧名思義就是把發佈的消息進行落地,這樣activeMQ宕機也不會導致那些未被消費的消息給弄消失,這樣保證消息都能被消費。

activeMQ消息持久化有JDBC,AMQ,KahaDB和LevelDB等這幾種,默認採用的是KahaDB,本篇講的是如何採用jdbc的新式把消息存入數據庫的例子;

activeMQ默認採用的KahaDB,消息會落地到安裝目錄下的\data\kahadb這個目錄下

activemq卡在重連_bc

下面講講如何用jdbc進行消息持久化到數據庫

1.修改activemq.xml配置

這裏修改的activemq.xml文件是activemq安裝目錄下的conf/activemq.xml文件,與我們的項目無關

activemq卡在重連_activemq卡在重連_02


修改內容:

1.1註釋掉broker下的默認持久化方式:

<persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

1.2在之下添加如下內容:

<persistenceAdapter>  
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>  
        </persistenceAdapter>


1.3在broker外添加如下內容:

<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>  
        <property name="username" value="root"/>  
        <property name="password" value=""/>  
        <property name="maxActive" value="200"/>  
        <property name="poolPreparedStatements" value="true"/>  
    </bean>

注意:這裏用到了連接池以及mysql所以需要的包有:commons-dbcp-1.4.jar;commons-pool-1.5.4.jar;mysql-connector-java-5.1.22.jar這三個包

我的數據庫名activemq

2.下載及拷貝需要的jar

把1.3所説需要的jar拷貝到安裝目錄下的/lib目錄下即可
3.重啓activemq

啓動之後activemq庫裏面會生成3張表

activemq卡在重連_數據庫_03


1)activemq_acks:用於存儲訂閲關係。如果是持久化Topic,訂閲者和服務器的訂閲關係在這個表保存,主要數據庫字段如下:
container:消息的destination
sub_dest:如果是使用static集羣,這個字段會有集羣其他系統的信息
client_id:每個訂閲者都必須有一個唯一的客户端id用以區分
sub_name:訂閲者名稱
selector:選擇器,可以選擇只消費滿足條件的消息。條件可以用自定義屬性實現,可支持多屬性and和or操作
last_acked_id:記錄消費過的消息的id


2)activemq_lock:在集羣環境中才有用,只有一個Broker可以獲得消息,稱為Master Broker,其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。這個表用於記錄哪個Broker是當前的Master Broker。

3)activemq_msgs:用於存儲消息,Queue和Topic都存儲在這個表中。主要的數據庫字段如下:
id:自增的數據庫主鍵
container:消息的destination
msgid_prod:消息發送者客户端的主鍵
msg_seq:是發送消息的順序,msgid_prod+msg_seq可以組成jms的messageid
expiration:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數
msg:消息本體的java序列化對象的二進制數據
priority:優先級,從0-9,數值越大優先級越高
activemq_acks用於存儲訂閲關係。如果是持久化topic,訂閲者和服務器的訂閲關係在這個表保存。

注意:消息持久化的所有修改都與項目無關,都是直接操作安裝目錄下的文件

4.啓動我們之前做的activeMQ項目

4.1訪問http://localhost:8080/activeMQ/activeMQ/main,發佈queue消息

activemq卡在重連_bc_04

4.2查看數據表activemq_msgs,會生成一條數據

activemq卡在重連_數據庫_05

4.3如果我們發佈的是topic消息的話,查看activemq_acks表會有訂閲者的關係信息:

activemq卡在重連_bc_06

activemq_msgs表信息:

activemq卡在重連_bc_07

4.4消費消息,訪問http://localhost:8080/activeMQ/activeMQ/receive

activemq_msgs生成的一條數據將消失

5.ok這樣也就算完成消息的持久化了