大数据培训之JavaAPI案例消费者Consumer

发布时间:2020年04月08日作者:atguigu浏览次数:564

消费者Consumer(消费一次就关闭)

1)消费主题为topic2的数据(获取一次,就关闭会话

package com.atguigu;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Created by wss on 2019/4/16
 */
public class PulConsumer {

    private Client client;
    private Consumer consumer;

    public PulConsumer(String topic, String subscription) throws PulsarClientException {
        client = new Client();//创建pulsar客户端
        consumer = createConsumer(topic, subscription);//创建消费者
    }

    private Consumer createConsumer(String topic, String subscription) throws PulsarClientException {

        return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)
                .ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();

//使用独占方式消费数据
    }

    public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
        /***
         *
获取一次,就关闭会话
         */
        // Wait for a message
       
System.out.printf(“Start pulsar”);
        CompletableFuture<Message> msg = consumer.receiveAsync();

        // System.out.printf(“Message received: %s”, new String(msg.get().getData()));
       
String result = “topic is: ” + msg.get().getTopicName() + “,data is: ” + new String(msg.get().getData());

        // Acknowledge the message so that it can be deleted by the message broker
       
consumer.acknowledge(msg.get());
        consumer.close();
        client.Close();
        return result;
    }

    public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
        PulConsumer consumer = new PulConsumer(“topic2”, “my-sub”);
        String reString = consumer.getMessage();
        System.out.println(reString);

    }
}

 


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训 UI/UE设计培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦6层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)