zoukankan      html  css  js  c++  java
  • ActiveMQ消息中间件

    最近学习到ActiveMQ,之前也没有用过相关或者类似的工具,因此特地写个文章进行相关的学习记录。

    相关参考博文:https://www.cnblogs.com/cyfonly/p/6380860.htmlhttps://blog.csdn.net/qq_26641781/article/details/80408987https://blog.csdn.net/qinweili751/article/details/80620104

    1.安装ActiveMQ

    (1)进入官网http://activemq.apache.org/,选择最新的版本下载

    (2)再选择对应的系统环境(我这里选择的是windows版本)

    (3)下载完成后将其解压(我这里将它存放在D盘根目录下),目录结构如下

    (4)进入bin/win64/目录,启动activemq.bat文件(注意:MQ与jdk版本必须要匹配。我这里下载的MQ是5.15版本,对应的jdk最低要求是1.8)。

    (5)启动完成后,输入浏览器地址http://localhost:8161/admin,会弹出用户名/密码输入框

    我们的账号密码是存放在ActiveMQ根目录的conf/jetty-realm.properties文件中。

    打开可以看到最下面已经有两个创建好了的用户了。如果我们需要添加自己的用户,或是修改它们的角色,都可以按照上面所写的格式"用户名:密码 [,角色]"来进行配置(角色被定义在~/conf/jetty.xml中)。

    这里我们使用默认帐号,admin/admin

    (6)至此,我们的ActiveMQ已经安装完成。

    2.使用ActiveMQ

    • ActiveMQ的使用一般分为以下几个步骤:
    • connectionFactory:创建连接工厂;
    • connection:从连接工厂中得到连接;
    • session:从连接中获得一个会话;
    • destination:从会话中获取一个destination。可以是Queue(P2P)或Topic(Pub/Sub)
    • Producer:根据session和destination创建服务生产者。
    •   Message:根据session创建消息。
    •   send:消息生产者将message发送给MQ
    • Consumer:根据session和destination创建服务消费者。
    •   receive:接收MQ中的消息。可以是同步接收,也可以是创建监听器异步接收。
    • 关闭资源。

    由于我这里是Springboot的项目,因此有部分步骤已经在自动配置中处理好了。

    (1)引入依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.15.4</version>
            </dependency>

    (2)添加相关配置

    #默认端口是61616,而不是我们访问网站的8161端口
    spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=false

    (3)添加配置类(PS:如果不配置该类,默认只会使用P2P,即设置Queue为destination。如果要使用Topic,则必须要配置下面的类)。

    @Configuration
    public class JmsConfig {
    
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setPubSubDomain(false);  //Queue是P2P,因此Pub/Sub设置为false。默认是false。
            factory.setConnectionFactory(connectionFactory);
            return factory;
        }
    
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setPubSubDomain(true);  //Topic是Pub/Sub,需要显示声明。
            factory.setConnectionFactory(connectionFactory);
            return factory;
        }
    
    }

    (4)编写服务消费者

    这里为了能区别P2P和Pub/Sub,创建了两个服务消费者。

    消费者1

    @Service
    public class ConsumerService {
    
        @JmsListener(destination = "springboot.queue.test", containerFactory = "jmsListenerContainerQueue")
        public void receiveQueue(String msg) {
            System.out.println(LocalDateTime.now().toString() + " consumer接收到Queue消息:" + msg);
        }
    
        @JmsListener(destination = "springboot.topic.test", containerFactory = "jmsListenerContainerTopic")
        public void receiveTopic(String msg) {
            System.out.println(LocalDateTime.now().toString() + " consumer接收到Topic消息:" + msg);
        }
    
    }

    消费者2

    @Service
    public class Consumer2Service {
    
        @JmsListener(destination = "springboot.queue.test", containerFactory = "jmsListenerContainerQueue")
        public void receiveQueue(String msg) {
            System.out.println(LocalDateTime.now().toString() + " consumer2接收Queue消息:" + msg);
        }
    
        @JmsListener(destination = "springboot.topic.test", containerFactory = "jmsListenerContainerTopic")
        public void receiveTopic(String msg) {
            System.out.println(LocalDateTime.now().toString() + " consumer2接收到Topic消息:" + msg);
        }
    }

    (5)编写服务生产者

    @Service
    public class ProducerService {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public void sendMessage(Destination destination, String msg) {
            System.out.println(LocalDateTime.now().toString() + " productor发送消息:" + msg);
            jmsTemplate.convertAndSend(destination, msg);
        }
    
    }

    (6)测试类

    先测试P2P下的消息:

        @Test
        public void testMQQueue() {
            Destination destination = new ActiveMQQueue("springboot.queue.test");
            for (int i = 0; i < 3; i++) {
                producerService.sendMessage(destination, "hellow world " + i);
            }
        }

    输出结果

    2019-05-22T17:29:39.324 productor发送消息:hellow world 0
    2019-05-22T17:29:39.373 consumer2接收Queue消息:hellow world 0
    2019-05-22T17:29:39.379 productor发送消息:hellow world 1
    2019-05-22T17:29:39.385 productor发送消息:hellow world 2
    2019-05-22T17:29:39.388 consumer接收到Queue消息:hellow world 1
    2019-05-22T17:29:39.391 consumer2接收Queue消息:hellow world 2

    可以看到生产者每发出一个消息,都只会有一个消费者对消息进行处理。并且这里采用的是轮询的方式,即这次是消费者1接收了消息,下次就是消费者2接收,再下次又是消费者1。以此类推。

    然后我们再测试下Pub/Sub的消息:

        @Test
        public void testMQTopic() {
            Destination destination = new ActiveMQTopic("springboot.topic.test");
            for (int i = 0; i < 2; i++) {
                producerService.sendMessage(destination, "hellow world " + i);
            }
        }

    输出结果:

    2019-05-22T17:35:58.535 productor发送消息:hellow world 0
    2019-05-22T17:35:58.576 productor发送消息:hellow world 1
    2019-05-22T17:35:58.581 consumer接收到Topic消息:hellow world 0
    2019-05-22T17:35:58.582 consumer接收到Topic消息:hellow world 1
    2019-05-22T17:35:58.582 consumer2接收到Topic消息:hellow world 0
    2019-05-22T17:35:58.584 consumer2接收到Topic消息:hellow world 1

    这里可以看到,每一个消息被发出来后,会被所有的服务消费者接收并处理。

  • 相关阅读:
    Session的使用与管理
    CSS控制文字,超出部分显示省略号
    MP4 文件前端获取视频封面
    prefetch预加载功能使用
    react-学习之路-react-router-解决问题记录
    如何将一个div盒子水平垂直都居中?
    window下查看端口号,并删除对应进程
    判断js 验证字符串里面有没有包含汉字:
    vue 现有的$变量 以及如何添加全局变量
    与人言
  • 原文地址:https://www.cnblogs.com/yxth/p/10907511.html
Copyright © 2011-2022 走看看