尚硅谷大数据技术之Kafka第4章 Kafka API实战

第4章 Kafka API实战

4.1 环境准备

1)启动zk和kafka集群,在kafka集群中打开一个消费者

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--zookeeper hadoop102:2181 --topic first

2)导入pom依赖

<dependencies>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->

    <dependency>

        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka-clients</artifactId>

        <version>0.11.0.0</version>

    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->

    <dependency>

        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka_2.12</artifactId>

        <version>0.11.0.0</version>

    </dependency>

</dependencies>

4.2 Kafka生产者Java API

4.2.1 创建生产过时的API)

package com.atguigu.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class OldProducer {

@SuppressWarnings("deprecation")

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("metadata.broker.list", "hadoop102:9092");

properties.put("request.required.acks", "1");

properties.put("serializer.class", "kafka.serializer.StringEncoder");

Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));

KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");

producer.send(message );

}

}

4.2.2 创建生产者(新API

package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服务端的主机名和端口号

props.put("bootstrap.servers", "hadoop103:9092");

// 等待所有副本节点的应答

props.put("acks", "all");

// 消息发送最大尝试次数

props.put("retries", 0);

// 一批消息处理大小

props.put("batch.size", 16384);

// 请求延时

props.put("linger.ms", 1);

// 发送缓存区内存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 50; i++) {

producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));

}

producer.close();

}

}

4.2.3 创建生产者回调函数(新API

package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服务端的主机名和端口号

props.put("bootstrap.servers", "hadoop103:9092");

// 等待所有副本节点的应答

props.put("acks", "all");

// 消息发送最大尝试次数

props.put("retries", 0);

// 一批消息处理大小

props.put("batch.size", 16384);

// 增加服务端请求延时

props.put("linger.ms", 1);

// 发送缓存区内存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

for (int i = 0; i < 50; i++) {

kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

@Override

public void onCompletion(RecordMetadata metadata, Exception exception) {

if (metadata != null) {

System.err.println(metadata.partition() + "---" + metadata.offset());

}

}

});

}

kafkaProducer.close();

}

}

4.2.4 自定义分区生产者

0)需求:将所有数据存储到topic的第0号分区上

1)定义一个类实现Partitioner接口,重写里面的方法(过时API)

package com.atguigu.kafka;

import java.util.Map;

import kafka.producer.Partitioner;

public class CustomPartitioner implements Partitioner {

public CustomPartitioner() {

super();

}

@Override

public int partition(Object key, int numPartitions) {

// 控制分区

return 0;

}

}

2)自定义分区(新API)

package com.atguigu.kafka;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;

import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

@Override

public void configure(Map<String, ?> configs) {

}

@Override

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 控制分区

return 0;

}

@Override

public void close() {

}

}

3)在代码中调用

package com.atguigu.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

public static void main(String[] args) {

Properties props = new Properties();

// Kafka服务端的主机名和端口号

props.put("bootstrap.servers", "hadoop103:9092");

// 等待所有副本节点的应答

props.put("acks", "all");

// 消息发送最大尝试次数

props.put("retries", 0);

// 一批消息处理大小

props.put("batch.size", 16384);

// 增加服务端请求延时

props.put("linger.ms", 1);

// 发送缓存区内存大小

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 自定义分区

props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");

Producer<String, String> producer = new KafkaProducer<>(props);

producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"));

producer.close();

}

}

4)测试

(1)在hadoop102上监控/opt/module/kafka/logs/目录下first主题3个分区的log日志动态变化情况

[atguigu@hadoop102 first-0]$ tail -f 00000000000000000000.log

[atguigu@hadoop102 first-1]$ tail -f 00000000000000000000.log

[atguigu@hadoop102 first-2]$ tail -f 00000000000000000000.log

(2)发现数据都存储到指定的分区了。