在Java中使用消息队列_大数据培训

1 在gmall-service-util中导入依赖坐标


<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

 

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

 

2  producer端

大数据培训

 3  consumer

public static void main(String[] args) {
    ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.67.163:61616");
    try {
        Connection connection = connect.createConnection();
        connection.start();
        //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
       
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination testqueue = session.createQueue("TEST1");

        MessageConsumer consumer = session.createConsumer(testqueue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    try {
                        String text = ((TextMessage) message).getText();
                        System.out.println(text);

                        //session.rollback();
                   
} catch (JMSException e) {
                        // TODO Auto-generated catch block
                       
e.printStackTrace();
                    }
                }
            }
        });


    }catch (Exception e){
        e.printStackTrace();
    }
}

 

4 关于事务控制

producer提交时的事务

事务开启

只执行send并不会提交到队列中,只有当执行session.commit()时,消息才被真正的提交到队列中。

事务不开启

只要执行send,就进入到队列中。

consumer 接收时的事务

事务开启,签收必须写

Session.SESSION_TRANSACTED

 

收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。

事务不开启,签收方式选择

Session.AUTO_ACKNOWLEDGE

只要调用comsumer.receive方法 ,自动确认。

事务不开启,签收方式选择

Session.CLIENT_ACKNOWLEDGE

需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。

  这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。

事务不开启,签收方式选择

Session.DUPS_OK_ACKNOWLEDGE

在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。

 

 5 持久化与非持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置

持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

想要了解跟多关于Java培训课程内容欢迎关注尚硅谷Java培训,尚硅谷除了这些技术文章外还有免费的高质量Java培训课程视频供广大学员下载学习