ActiveMQ结合Spring开发
标签:ActiveMQ

ActiveMQ结合Spring开发

1. ActiveMQ和Spring的Queue开发

引入Spring和ActiveMQ的相关依赖

<dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.3.18.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>4.3.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.3.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.3.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>4.3.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>4.3.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>4.3.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.18.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>

配置JmsTemplate

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
    <context:component-scan base-package="com.liuyao"/>
    <aop:aspectj-autoproxy proxy-target-class="true"/>
    
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://10.13.67.107:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destination"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-queue"/>
    </bean>
    
</beans>

创建发送者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.TextMessage;

@Service
public class QueueSender {

    @Autowired
    private JmsTemplate jmsTemplate=null;

    public static void main(String[] args) {
        ApplicationContext ctx=new ClassPathXmlApplicationContext("ApplicationContext.xml");
        QueueSender queueSender= (QueueSender) ctx.getBean("queueSender");
        queueSender.jmsTemplate.send(s->{
            TextMessage msg=s.createTextMessage("Spring msg");
            return msg;
        });
    }
}

创建接收者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

@Service
public class QueueReceiver {
    @Autowired
    private JmsTemplate jmsTemplate=null;

    public static void main(String[] args) {
        ApplicationContext ctx=new ClassPathXmlApplicationContext("ApplicationContext.xml");
        QueueReceiver queueReceiver= (QueueReceiver) ctx.getBean("queueReceiver");
        String msg= (String) queueReceiver.jmsTemplate.receiveAndConvert();
        System.out.println(msg);
    }
}

2. ActiveMQ和Spring的Topic开发

2.1 如果topic的话,首先需要修改spring的配置

先添加topic的配置,当然,也需要修改jmsTemplate配置里面的default Destination,如果不想修改这个配置,那么直接把Destination注入程序,在程序里面选择发送的Destination也可以:

<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="spring-topic"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory"/>
    <!--修改默认的Destination-->
    <property name="defaultDestination" ref="destinationTopic"/> 
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    </property>
</bean>

发送者和接收者和队列一样的,不需要做任何修改。但是需要先运行接收者,然后再运行发送者,就可以看到消息了。

2.2 如果想要在Spring中配置消费者的话,就不需要再启动接收的客户端了

在上面的配置再加上下面的配置:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsFactory"/>
    <property name="destination" ref="destinationTopic"/>
    <property name="messageListener" ref="messageListener"/>
</bean>
<bean id="messageListener" class="com.liuyao.spring.MyMessageListener"/>

新建一个MyMessageListener

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage= (TextMessage) message;
        try {
            System.out.println("receive txt msg:  "+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

这样测试的时候,只需要启动消息发送者就可以了,就能看到消息了。

3. 总结

  1. Spring消息发送的核心架构是JMSTemplate,隔离了像打开、关闭session和producer的繁琐操作,因此应用人员仅仅需要关注实际的业务逻辑。但是JMSTemplate损害了ActiveMQ的PooledConnectionFactory对session和消息producer的缓存机制带来的性能提升。
  2. 新的Spring里面,可以设置 org.springframework.jms.connection.CachingConnectionFactory的sessionCacheSize,或者干脆使用ActiveMQ的PooledConnectionFactory
  3. 不建议使用JMSTemplate的receive()调用,因为在JMSTemplate上所有调用都是同步的,这意味着调用线程需要被阻塞,直到方法返回,这对性能影响很大
  4. 请使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量

4. 连接到ActiveMQ

ActiveMQ通过Connector来实现连接通讯的功能。包括client-to-broker、broker-to-broker。ActiveMQ允许客户端使用多种协议来连接。我们可以通过配置conf/activemq.xml来实现。

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、HTTP(S)、VM

Transmission Control Protocol (TCP)

  1. 这是默认的Broker配置,TCP的Client监听端口是61616。

  2. 在网络传输数据前,必须要序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。默认情况下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互。

  3. TCP连接的URI形式:tcp://hostname:port?key=value&key=value,加粗部分是必须的

  4. TCP传输的优点:

    (1)TCP协议传输可靠性高,稳定性强

    (2)高效性:字节流方式传递,效率很高

    (3)有效性、可用性:应用广泛,支持任何平台

  5. 所有关于Transport协议的可配置参数,可以参见:

http://activemq.apache.org/configuring-version-5-transports.html

5. ActiveMQ的消息存储持久化

ActiveMQ不仅支持persistent和non-persistent两种方式,还支持消息的恢复(recovery)方式

5.1 PTP

通过一个FIFO的Queue

5.2 PUB/SUB

对于持久化订阅主题,每一个消费者将获得一个消息的复制

5.3 有效的消息存储

ActiveMQ提供了一个插件式的消息存储,类似于消息的多点传播,主要实现有如下几种:

  1. AMQ消息存储:基于文件的存储方式,是以前的默认消息存储
  2. KahaDB消息存储:提供了容量的提升和恢复能力,是现在的默认存储方式
  3. JDBC消息存储:消息基于JDBC存储的
  4. Memory消息存储:基于内存的消息存储

5.4.1 KahaDB

KahaDB Message Store概述

KahaDB是ActiveMQ目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。

KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在KahaDB中,数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。

KahaDB基本配置例子

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

可用的属性有:

  1. director: KahaDB存放的路径,默认值activemq-data
  2. indexWriteBatchSize:批量写入磁盘的索引page数量,默认值1000
  3. indexCacheSize:内存中缓存索引page的数量,默认值10000
  4. enablelndexWriteAsync:是否异步写出索引,默认false
  5. journalMaxFileLength:设置每个消息data log的大小,默认是32MB
  6. enableJournalDiskSyncs:设置是否保证每个没有事务的内容,被同步写入磁盘,JMS持久化的时候需要,默认为true
  7. cleanuplnterval:在检查到不再使用的消息后,在具体删除消息前的时间,默认30000
  8. checkpo intin ter val : checkpoint 的间隔时间,默认 5000
  9. ignoreMissingJournalfiles:是否忽略丢失的消息日志文件,默认false
  10. checkForCorruptJournalFiles:在启动的时候,将会验证消息文件是否损坏,默认false
  11. checksumJournalFiles:是否为每个消息日志文件提供checksum,默认false
  12. archiveDataLogs:是否移动文件到特定的路径,而不是删除它们,默认false
  13. directoryArchive:定义消息己经被消费过后,移动data log到的路径,默认null
  14. databaseLockedWaitDelay:获得数据库锁的等待时间(used by shared master/slave),默认 10000
  15. maxAsyncJobs:设置最大的可以存储的异步消息队列,默认值10000,可以和concurrent Message Producers设置成一样的值
  16. concurrentstoreAndDispatchTransactions:是否分发消息到客户端,同时事务存储消息,默认 true
  17. concurrentstoreAndDispatchTopics:是否分发Topic消息到客户端,同时进行存储,默认true
  18. concurrents toreAndDi spa tchQueues :是否分发queue消息到客户端,同时进行存储,默认true
  • 10 min read

CONTRIBUTORS


  • 10 min read