尚硅谷大数据技术之电信客服
3.2 数据采集/消费(存储)
欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架flume和kafka的定位是什么。我们在此需要将实时数据通过flume采集到kafka然后供给给hbase消费。
flume:cloudera公司研发
适合下游数据消费者不多的情况;
适合数据安全性要求不高的操作;
适合与Hadoop生态圈对接的操作。
kafka:linkedin公司研发
适合数据下游消费众多的情况;
适合数据安全性要求较高的操作(支持replication);
因此我们常用的一种模型是:
线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS
消费存储模块流程如图2所示:
图2 消费存储模块流程图
3.2.1 数据采集
思路:
- a) 配置kafka,启动zookeeper和kafka集群;
- b) 创建kafka主题;
- c) 启动kafka控制台消费者(此消费者只用于测试使用);
- d)配置flume,监控日志文件;
- e) 启动flume监控任务;
- f)运行日志生产脚本;
- g)观察测试。
1)启动zookeeper,kafka集群
$/opt/module/kafka/bin/kafka-server-start.sh /opt/module/kafka/config/server.properties |
2)创建kafka主题
$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --topic calllog --create --replication-factor 1 --partitions 3 |
检查一下是否创建主题成功:
$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --list |
3)启动kafka控制台消费者,等待flume信息的输入
$ /opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 -topic calllog --from-beginning |
4)配置flume(flume-kafka.conf)
# define a1.sources = r1 a1.sinks = k1 a1.channels = c1
# source a1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +0 /home/atguigu/call/calllog.csv a1.sources.r1.shell = /bin/bash -c
# sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic = calllog a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1
# channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
5)启动flume
$ /opt/module/flume/bin/flume-ng agent --conf /opt/module/flume/conf/ --name a1 --conf-file /home/atguigu/calllog/flume2kafka.conf |
6)运行生产日志的任务脚本,观察kafka控制台消费者是否成功显示产生的数据
$ sh /home/atguigu/calllog/productlog.sh |