博客
关于我
Day177.高级特性 -ActiveMQ
阅读量:330 次
发布时间:2019-03-04

本文共 9584 字,大约阅读时间需要 31 分钟。

ActiveMQ

高级特性

1 异步投递

(1) 异步投递是什么

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PGrxiS2N-1611823661224)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128120034951.png)]

自我理解:异步投递,是指生产者和broker之间发送消息的异步。不是指生产者和消费者之间异步。

特征

① 不能有效保证消息的发送成功

② 如果出现slowConsumer,可能会给Broker带来消息积压的可能

官网介绍:http://activemq.apache.org/async-sends


说明:对于一个Slow Consumer,使用同步发送消息可能出成Producer堵塞等情况,慢消费者适合使用异步发送。

(这句话我认为有误)

总结

① 异步发送可以让生产者发的更快。

② 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。如果异步投递还需要保证消息是否成功发送,并采用了回调的方式,发送者的效率提高不多,这种就有些鸡肋。


(2) 代码实现

官网上3中代码实现:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uW0NhZ0q-1611823661229)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128123647416.png)]

public class Jms_TX_Producer {       // 方式1。3种方式任选一种    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626?jms.useAsyncSend=true";//设置异步投递    private static final String ACTIVEMQ_QUEUE_NAME = "Async";    public static void main(String[] args) throws JMSException {           ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 方式2        activeMQConnectionFactory.setUseAsyncSend(true);//设置异步投递        Connection connection = activeMQConnectionFactory.createConnection();        // 方式3        ((ActiveMQConnection)connection).setUseAsyncSend(true);//设置异步投递        connection.start();        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);        MessageProducer producer = session.createProducer(queue);        try {               for (int i = 0; i < 3; i++) {                   TextMessage textMessage = session.createTextMessage("tx msg--" + i);                producer.send(textMessage);            }            System.out.println("消息发送完成");        } catch (Exception e) {               e.printStackTrace();        } finally {               producer.close();            session.close();            connection.close();        }    }}

(3) 异步发送如何确认发送成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-laWe2cvb-1611823661232)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130608981.png)]

public class JmsProduce_asynsc {       public static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";    public static final String QUEUE_NAME = "asynsc01";    public static void main(String[] args) throws JMSException {           ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        mqConnectionFactory.setUseAsyncSend(true);//设置异步投递        Connection conn = mqConnectionFactory.createConnection();        conn.start();        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        Queue queue = session.createQueue(QUEUE_NAME);        ActiveMQMessageProducer mqMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);        mqMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);        TextMessage textMessage = null;        for (int i = 1; i <= 3; i++) {               textMessage = session.createTextMessage("msg--asynsc01-----" + i);            textMessage.setJMSMessageID(UUID.randomUUID().toString().substring(0,3)+"---orderAchang");            String msgID = textMessage.getJMSMessageID();            //new AsyncCallback()设置异步投送的回调函数            mqMessageProducer.send(textMessage, new AsyncCallback() {                   @Override                public void onSuccess() {                       //发送成功的情况                    System.out.println(msgID+"has benn ok send");                }                @Override                public void onException(JMSException exception) {                       //发送失败的情况                    System.out.println(msgID+"fail to send to mq");                }            });        }        mqMessageProducer.close();        session.close();        conn.close();        System.out.println("========消息发布到MQ完成===========");    }}

控制台观察发送消息的信息:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-erwdgyTo-1611823661237)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130728377.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bLg7keUm-1611823661240)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130748684.png)]


2 延迟投递和定时投递

(1) 介绍

官网文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2DhhtSLd-1611823661242)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130810198.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gZrk5nBo-1611823661244)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130818179.png)]

(2) 修改配置文件并重启

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zObSp0kl-1611823661245)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130838486.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Xw9r9B10-1611823661246)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128131710357.png)]

    

之后重启activemq


(3) 代码实现

生产者代码:

public class JmsProduce_delayAndschedule {       public static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";    public static final String QUEUE_NAME = "asynsc01";    public static void main(String[] args) throws JMSException {           ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        mqConnectionFactory.setUseAsyncSend(true);//设置异步投递        Connection conn = mqConnectionFactory.createConnection();        conn.start();        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);        Queue queue = session.createQueue(QUEUE_NAME);        ActiveMQMessageProducer mqMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);==================================================================================              long delay = 3 * 1000; //延迟3秒        long period = 4 * 1000; //四秒钟投递一次        int repeat = 5; //投递次数        TextMessage textMessage = null;        for (int i = 1; i <= 3; i++) {               textMessage = session.createTextMessage("msg--delay--正文内容-----" + i);            //消息属性设置            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);            // 此处的意思:该条消息,等待3秒,之后每4秒发送一次,重复发送5次。=====================================================================================            mqMessageProducer.send(textMessage);        }        mqMessageProducer.close();        session.close();        conn.close();        System.out.println("========消息发布到MQ完成===========");    }}

消费者代码:与之前一样


3 消息消费的重试机制

(1) 是什么

官网文档:http://activemq.apache.org/redelivery-policy

是什么: 消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。

(2) 具体哪些情况会引发消息重发

① Client用了transactions且再session中调用了rollback

② Client用了transactions且再调用commit之前关闭或者没有commit

③ Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

(3) 请说说消息重发时间间隔和重发次数

间隔:1

次数:6

每秒发6次

(4) 有毒消息Poison ACK

一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。

(5) 属性说明

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zK66uNep-1611823661247)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128134219876.png)]

(6) 代码验证

生产者。发送3条数据。代码省略…

消费者。开启事务,却没有commit。重启消费者,前6次都能收到消息,到第7次,不会再收到消息。代码:

public class Jms_TX_Consumer {       private static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";    public static void main(String[] args) throws JMSException, IOException {           ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        Connection connection = activeMQConnectionFactory.createConnection();        connection.start();        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);        MessageConsumer messageConsumer = session.createConsumer(queue);        messageConsumer.setMessageListener(new MessageListener() {               public void onMessage(Message message) {                   if (message instanceof TextMessage) {                       TextMessage textMessage = (TextMessage) message;                    try {                           System.out.println("***消费者接收到的消息:   " + textMessage.getText());                        //session.commit();                    }catch (Exception e){                           e.printStackTrace();                    }                }            }        });        //关闭资源        System.in.read();        messageConsumer.close();        session.close();        connection.close();    }}

activemq管理后台。多了一个名为ActiveMQ.DLQ队列,里面多了3条消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Lq7ABpSw-1611823661249)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141557656.png)]


(7) 代码修改默认参数

修改重试次数为3。更多的设置请参考官网文档。

消费者代码

public class Jms_TX_Consumer {       private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";    public static void main(String[] args) throws JMSException, IOException {           ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);=====================================================================        // 修改默认参数,设置消息消费重试3次        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();        redeliveryPolicy.setMaximumRedeliveries(3);        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);=====================================================================        Connection connection = activeMQConnectionFactory.createConnection();        connection.start();        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);        MessageConsumer messageConsumer = session.createConsumer(queue);        messageConsumer.setMessageListener(new MessageListener() {               public void onMessage(Message message) {                   if (message instanceof TextMessage) {                       TextMessage textMessage = (TextMessage) message;                    try {                           System.out.println("***消费者接收到的消息:   " + textMessage.getText());                        //session.commit();                    }catch (Exception e){                           e.printStackTrace();                    }                }            }        });        System.in.read();        messageConsumer.close();        session.close();        connection.close();    }}

(8)整合spring

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oq9I26E9-1611823661250)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141729105.png)]


4 死信队列

承接上个标题的内容。

(1) 是什么

官网文档: http://activemq.apache.org/redelivery-policy

死信队列:异常消息规避处理的集合,主要处理失败的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V0OEQ7G8-1611823661252)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141753364.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aQrLSo6a-1611823661253)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141812924.png)]


(1) 死信队列的配置(一般采用默认)

1. sharedDeadLetterStrategy

不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IJAP3gFZ-1611823661254)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141825368.png)]

2. individualDeadLetterStrategy

可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。

在这里插入图片描述

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6WJemec9-1611823661257)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141848717.png)]

3. 自动删除过期消息

过期消息是值生产者指定的过期时间,超过这个时间的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mUzIMXbj-1611823661258)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141900244.png)]

4. 存放非持久消息到死信队列中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vrZ7pkCQ-1611823661259)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141912124.png)]


5 消息不被重复消费,幂等性

如何保证消息不被重复消费呢?幕等性问题你谈谈

也就是变向问:如何解决消息的重复消费问题?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8dHYFAu7-1611823661260)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128144728668.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vj6Ha33S-1611823661261)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128144749293.png)]

幂等性如何解决,根据messageid去查这个消息是否被消费了

感谢尚硅谷

转载地址:http://nioq.baihongyu.com/

你可能感兴趣的文章
MySQL - 4种基本索引、聚簇索引和非聚索引、索引失效情况、SQL 优化
查看>>
MySQL - ERROR 1406
查看>>
mysql - 视图
查看>>
MySQL - 解读MySQL事务与锁机制
查看>>
MTTR、MTBF、MTTF的大白话理解
查看>>
mt_rand
查看>>
mysql -存储过程
查看>>
mysql /*! 50100 ... */ 条件编译
查看>>
mudbox卸载/完美解决安装失败/如何彻底卸载清除干净mudbox各种残留注册表和文件的方法...
查看>>
mysql 1264_关于mysql 出现 1264 Out of range value for column 错误的解决办法
查看>>
mysql 1593_Linux高可用(HA)之MySQL主从复制中出现1593错误码的低级错误
查看>>
mysql 5.6 修改端口_mysql5.6.24怎么修改端口号
查看>>
MySQL 8.0 恢复孤立文件每表ibd文件
查看>>
MySQL 8.0开始Group by不再排序
查看>>
mysql ansi nulls_SET ANSI_NULLS ON SET QUOTED_IDENTIFIER ON 什么意思
查看>>
multi swiper bug solution
查看>>
MySQL Binlog 日志监听与 Spring 集成实战
查看>>
MySQL binlog三种模式
查看>>
multi-angle cosine and sines
查看>>
Mysql Can't connect to MySQL server
查看>>