本文共 9584 字,大约阅读时间需要 31 分钟。
(1) 异步投递是什么
自我理解:异步投递,是指生产者和broker之间发送消息的异步。不是指生产者和消费者之间异步。
特征:
① 不能有效保证消息的发送成功
② 如果出现slowConsumer,可能会给Broker带来
消息积压
的可能
官网介绍:http://activemq.apache.org/async-sends
说明:对于一个Slow Consumer,使用同步发送消息可能出成Producer堵塞等情况,慢消费者适合使用异步发送。
(这句话我认为有误)
总结:
① 异步发送可以让生产者发的更快。
② 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。如果异步投递还需要保证消息是否成功发送,并采用了回调的方式,发送者的效率提高不多,这种就有些鸡肋。
(2) 代码实现
官网上3中代码实现:
public class Jms_TX_Producer { // 方式1。3种方式任选一种 private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626?jms.useAsyncSend=true";//设置异步投递 private static final String ACTIVEMQ_QUEUE_NAME = "Async"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 方式2 activeMQConnectionFactory.setUseAsyncSend(true);//设置异步投递 Connection connection = activeMQConnectionFactory.createConnection(); // 方式3 ((ActiveMQConnection)connection).setUseAsyncSend(true);//设置异步投递 connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageProducer producer = session.createProducer(queue); try { for (int i = 0; i < 3; i++) { TextMessage textMessage = session.createTextMessage("tx msg--" + i); producer.send(textMessage); } System.out.println("消息发送完成"); } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); session.close(); connection.close(); } }}
(3) 异步发送如何确认发送成功
public class JmsProduce_asynsc { public static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616"; public static final String QUEUE_NAME = "asynsc01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); mqConnectionFactory.setUseAsyncSend(true);//设置异步投递 Connection conn = mqConnectionFactory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); ActiveMQMessageProducer mqMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue); mqMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); TextMessage textMessage = null; for (int i = 1; i <= 3; i++) { textMessage = session.createTextMessage("msg--asynsc01-----" + i); textMessage.setJMSMessageID(UUID.randomUUID().toString().substring(0,3)+"---orderAchang"); String msgID = textMessage.getJMSMessageID(); //new AsyncCallback()设置异步投送的回调函数 mqMessageProducer.send(textMessage, new AsyncCallback() { @Override public void onSuccess() { //发送成功的情况 System.out.println(msgID+"has benn ok send"); } @Override public void onException(JMSException exception) { //发送失败的情况 System.out.println(msgID+"fail to send to mq"); } }); } mqMessageProducer.close(); session.close(); conn.close(); System.out.println("========消息发布到MQ完成==========="); }}
控制台观察发送消息的信息:
(1) 介绍
官网文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html
(2) 修改配置文件并重启
之后重启activemq
(3) 代码实现
生产者代码:
public class JmsProduce_delayAndschedule { public static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616"; public static final String QUEUE_NAME = "asynsc01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); mqConnectionFactory.setUseAsyncSend(true);//设置异步投递 Connection conn = mqConnectionFactory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); ActiveMQMessageProducer mqMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);================================================================================== long delay = 3 * 1000; //延迟3秒 long period = 4 * 1000; //四秒钟投递一次 int repeat = 5; //投递次数 TextMessage textMessage = null; for (int i = 1; i <= 3; i++) { textMessage = session.createTextMessage("msg--delay--正文内容-----" + i); //消息属性设置 textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay); textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period); textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat); // 此处的意思:该条消息,等待3秒,之后每4秒发送一次,重复发送5次。===================================================================================== mqMessageProducer.send(textMessage); } mqMessageProducer.close(); session.close(); conn.close(); System.out.println("========消息发布到MQ完成==========="); }}
消费者代码:与之前一样
(1) 是什么
官网文档:http://activemq.apache.org/redelivery-policy
是什么: 消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。
(2) 具体哪些情况会引发消息重发
① Client用了transactions且再session中调用了rollback
② Client用了transactions且再调用commit之前关闭或者没有commit
③ Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover
(3) 请说说消息重发时间间隔和重发次数
间隔:1
次数:6
每秒发6次
(4) 有毒消息Poison ACK
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列
)。
(5) 属性说明
(6) 代码验证
生产者。发送3条数据。代码省略…
消费者。开启事务,却没有commit。重启消费者,前6次都能收到消息,到第7次,不会再收到消息。代码:
public class Jms_TX_Consumer { private static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616"; private static final String ACTIVEMQ_QUEUE_NAME = "dead01"; public static void main(String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("***消费者接收到的消息: " + textMessage.getText()); //session.commit(); }catch (Exception e){ e.printStackTrace(); } } } }); //关闭资源 System.in.read(); messageConsumer.close(); session.close(); connection.close(); }}
activemq管理后台。多了一个名为ActiveMQ.DLQ队列,里面多了3条消息。
(7) 代码修改默认参数
修改重试次数为3。更多的设置请参考官网文档。
消费者代码:
public class Jms_TX_Consumer { private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626"; private static final String ACTIVEMQ_QUEUE_NAME = "dead01"; public static void main(String[] args) throws JMSException, IOException { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);===================================================================== // 修改默认参数,设置消息消费重试3次 RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setMaximumRedeliveries(3); activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);===================================================================== Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME); MessageConsumer messageConsumer = session.createConsumer(queue); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("***消费者接收到的消息: " + textMessage.getText()); //session.commit(); }catch (Exception e){ e.printStackTrace(); } } } }); System.in.read(); messageConsumer.close(); session.close(); connection.close(); }}
(8)整合spring
承接上个标题的内容。
(1) 是什么
官网文档: http://activemq.apache.org/redelivery-policy
死信队列:异常消息规避处理的集合,主要处理失败的消息。
(1) 死信队列的配置(一般采用默认)
1. sharedDeadLetterStrategy
不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。
2. individualDeadLetterStrategy
可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。
3. 自动删除过期消息
过期消息是值生产者指定的过期时间,超过这个时间的消息。
4. 存放非持久消息到死信队列中
如何保证消息不被重复消费呢?幕等性问题你谈谈
也就是变向问:如何解决消息的重复消费问题?
幂等性如何解决,根据messageid去查这个消息是否被消费了
感谢尚硅谷
转载地址:http://nioq.baihongyu.com/