JMS入门案例
标签:ActiveMQ

JMS入门案例

1. JMS开发的基本步骤

  1. 创建一个JMS connection factory
  2. 通过connection factory来创建JMS connection
  3. 启动JMS connection
  4. 通过connection创建JMS session
  5. 创建JMS destination
  6. 创建JMS producer,或者创建JMS message,并设置destination
  7. 创建JMS consumer,或者是注册一个JMS message listener
  8. 发送或者接受JMS message(s)
  9. 关闭所有的JMS资源(connection,session,producer,consumer等)

QueueSender.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueSender {
    public static void main(String[] args) throws JMSException, InterruptedException {
//        1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://10.13.67.107:61616");
//        2.通过连接工厂创建连接
        Connection connection=connectionFactory.createConnection();
//        3.启动连接
        connection.start();
//        4.通过连接创建session
        Session session=connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);
//        5.通过session创建一个queue,并设置Destination
        Destination destination=session.createQueue("my-queue");
//        6.创建一个producer
        MessageProducer producer=session.createProducer(destination);

        for (int i = 0; i < 5; i++) {
//            7.创建一个Map类型的消息
            MapMessage message=session.createMapMessage();

            message.setStringProperty("key"+i,"value"+i);

            message.setString("message"+i,"message"+i);
//            8. 发送消息
            producer.send(message);
        }
//        9.session(会话)确认
        session.commit();
        session.close();
        connection.close();
    }
}

QueueReceiver.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueReceiver {
    public static void main(String[] args) throws JMSException {
//        1.接收者的创建和发送者前面是一样的
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://10.13.67.107:61616");

        Connection connection=connectionFactory.createConnection();
        connection.start();

        Session session=connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);

        Destination destination=session.createQueue("my-queue");
//        2.创建接收者
        MessageConsumer consumer=session.createConsumer(destination);
        
        int i=0;
        while (i<5){
//            3.接收消息
            MapMessage message= (MapMessage) consumer.receive();
//            4.从消息中提取内容
            System.out.println("message:"+message.getString("message"+i));
            System.out.println("key:"+message.getStringProperty("key"+i));
            i++;
//            5.消息确认
            message.acknowledge();
        }
        session.close();
        connection.close();
    }
}

这样当我们运行发送者后再运行接收者将会受到消息,输出结果为:

message:message0
key:value0
message:message1
key:value1
message:message2
key:value2
message:message3
key:value3
message:message4
key:value4

2. 非持久的Topic消息

对于非持久的Topic消息的发送,基本跟前面发送队列消息是一样的,只是把创建Destination的地方,由创建队列变成了创建Topic。

Destination destination = session.createTopic("MyTopic);

对于非持久的Topic消息接收

NoPersistenceSender.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class NoPersistenceSender {
    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://10.13.67.107:61616");

        Connection connection=connectionFactory.createConnection();
        connection.start();

        Session session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

//        创建Topic
        Destination destination=session.createTopic("my-topic");

        MessageProducer producer=session.createProducer(destination);

        for (int i = 0; i < 5; i++) {
          TextMessage textMessage=session.createTextMessage("msg"+i);
          producer.send(textMessage);
        }
        session.commit();
        session.close();
        connection.close();
    }
}

NoPersistenceReceiver.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class NoPersistenceReceiver {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://10.13.67.107:61616");

        Connection connection=connectionFactory.createConnection();
        connection.start();

        Session session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//        创建Topic
        Destination destination=session.createTopic("my-topic");

        MessageConsumer consumer=session.createConsumer(destination);

//        循环接收
        Message message=consumer.receive();
        while (message!=null){
            TextMessage textMessage= (TextMessage) message;
            System.out.println(textMessage.getText());
            message=consumer.receive(1000L);
        }
        session.commit();
        session.close();
        connection.close();

    }
}

由于是非持久的消息发送,如果我们先运行发送者发送了消息,在运行接收者的话,接收者将不会收到任何消息,消息将丢失了。

3. 持久化的Topic消息

对于持久化Topic消息的发送

  1. 要用持久化订阅,发送消息者要用DeliveryMode.PERSISENT模式发现,在连接之前设定
  2. 一定要设置完成后,再执行connection.start()

PersistenceSender.java

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class PersistenceSender {
    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://10.13.67.107:61616");

        Connection connection=connectionFactory.createConnection();

        Session session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

        Destination destination=session.createTopic("my-topic2");

        MessageProducer producer=session.createProducer(destination);
//        设置持久化模式
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//        再启动连接
        connection.start();

        for (int i = 0; i < 5; i++) {
          TextMessage textMessage=session.createTextMessage("msg2"+i);
          producer.send(textMessage);
        }
        session.commit();
        session.close();
        connection.close();
    }
}

对于持久化Topic消息的接收

  1. 需要在连接上设置消费者id,用来辨识消费者
  2. 需要创建TopicSubscriber来订阅
  3. 要设置好了过后再执行connection.start()
  4. 一定要先运行一次,等于向消息服务中间件注册这个接收者,然后再运行客户端发送消息,这个时候无论接收者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class PersistenceReceiver {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://10.13.67.107:61616");

        Connection connection=connectionFactory.createConnection();

//        1.设置一个id
        connection.setClientID("cc1");

        Session session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
//        2.创建Topic
        Topic destination=session.createTopic("my-topic2");
//        3.创建持久化的订阅者
        TopicSubscriber consumer=session.createDurableSubscriber(destination,"consumer");

        connection.start();

//        循环接收
        Message message=consumer.receive();
        while (message!=null){
            TextMessage textMessage= (TextMessage) message;
            System.out.println(textMessage.getText());
            message=consumer.receive(1000L);
        }
        session.commit();
        session.close();
        connection.close();

    }
}

4. 总结

持久化消息:

这是ActiveMQ的默认传递模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存緒。如果消息服务由于某祌原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的幵销,但却增加了可靠性。

非持久化消息

保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模 式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。有两神方法指定传送模式:

1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式;如:

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

2.使用send方法为每一条消息设置传送模式

  • 6 min read

CONTRIBUTORS


  • 6 min read