大数据培训项目实时系统联调

发布时间:2021年12月22日作者:atguigu浏览次数:177

实时系统联调

我们的系统实时推荐的数据流向是:业务系统 -> 日志 -> flume 日志采集 -> kafka streaming数据清洗和预处理 -> spark streaming 流式计算。在我们完成实时推荐服务的代码后,应该与其它工具进行联调测试,确保系统正常运行。

<p style=’display:none;’>大数据培训</p>

1 启动实时系统的基本组件

启动实时推荐系统StreamingRecommender以及mongodb、redis

2 启动zookeeper

bin/zkServer.sh start

3 启动kafka

bin/kafka-server-start.sh -daemon ./config/server.properties

4 构建Kafka Streaming程序

在recommender下新建module,KafkaStreaming,主要用来做日志数据的预处理,过滤出需要的内容。pom.xml文件需要引入依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>

<build>
    <finalName>kafkastream</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.atguigu.kafkastream.Application</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在src/main/java下新建java类com.atguigu.kafkastreaming.Application

public class Application {
    public static void main(String[] args){

        String brokers = “localhost:9092”;
        String zookeepers = “localhost:2181”;

        // 定义输入和输出的topic
       
String from = “log”;
        String to = “recommender”;

        // 定义kafka streaming的配置
       
Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, “logFilter”);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);

        StreamsConfig config = new StreamsConfig(settings);

        // 拓扑建构器
       
TopologyBuilder builder = new TopologyBuilder();

        // 定义流处理的拓扑结构
       
builder.addSource(“SOURCE”, from)
                .addProcessor(“PROCESS”, () -> new LogProcessor(), “SOURCE”)
                .addSink(“SINK”, to, “PROCESS”);

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}

这个程序会将topic为“log”的信息流获取来做处理,并以“recommender”为新的topic转发出去。

流处理程序 LogProcess.java

public class LogProcessor implements Processor<byte[],byte[]> {
    private ProcessorContext context;

    public void init(ProcessorContext context) {
        this.context = context;
    }

    public void process(byte[] dummy, byte[] line) {
        String input = new String(line);
        // 根据前缀过滤日志信息,提取后面的内容
       
if(input.contains(“MOVIE_RATING_PREFIX:”)){
            System.out.println(“movie rating coming!!!!” + input);
            input = input.split(“MOVIE_RATING_PREFIX:”)[1].trim();
            context.forward(“logProcessor”.getBytes(), input.getBytes());
        }
    }
    public void punctuate(long timestamp) {
    }
    public void close() {
    }
}

完成代码后,启动Application。

5 配置并启动flume

在flume的conf目录下新建log-kafka.properties,对flume连接kafka做配置:

agent.sources = exectail

agent.channels = memoryChannel

agent.sinks = kafkasink

 

# For each one of the sources, the type is defined

agent.sources.exectail.type = exec

# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录

agent.sources.exectail.command = tail –f

/mnt/d/Projects/BigData/MovieRecommenderSystem/businessServer/src/main/log/agent.log

agent.sources.exectail.interceptors=i1

agent.sources.exectail.interceptors.i1.type=regex_filter

# 定义日志过滤前缀的正则

agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+

# The channel can be defined as follows.

agent.sources.exectail.channels = memoryChannel

 

# Each sink’s type must be defined

agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink

agent.sinks.kafkasink.kafka.topic = log

agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092

agent.sinks.kafkasink.kafka.producer.acks = 1

agent.sinks.kafkasink.kafka.flumeBatchSize = 20

 

#Specify the channel the sink should use

agent.sinks.kafkasink.channel = memoryChannel

 

# Each channel’s type is defined.

agent.channels.memoryChannel.type = memory

 

# Other config values specific to each type of channel(sink or source)

# can be defined as well

# In this case, it specifies the capacity of the memory channel

agent.channels.memoryChannel.capacity = 10000

配置好后,启动flume:

./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console

6 启动业务系统后台

将业务代码加入系统中。注意在src/main/resources/ 下的 log4j.properties中,log4j.appender.file.File的值应该替换为自己的日志目录,与flume中的配置应该相同。

启动业务系统后台,访问localhost:8088/index.html;点击某个电影进行评分,查看实时推荐列表是否会发生变化。

想要了解跟多关于大数据培训课程内容欢迎关注尚硅谷大数据培训,尚硅谷除了这些技术文章外还有免费的高质量大数据培训课程视频供广大学员下载学习。


上一篇:
下一篇:
相关课程

java培训 大数据培训 前端培训 UI/UE设计培训

关于尚硅谷
教育理念
名师团队
学员心声
资源下载
视频下载
资料下载
工具下载
加入我们
招聘岗位
岗位介绍
招贤纳师
联系我们
全国统一咨询电话:010-56253825
地址:北京市昌平区宏福科技园2号楼3层(北京校区)

深圳市宝安区西部硅谷大厦B座C区一层(深圳校区)

上海市松江区谷阳北路166号大江商厦6层(上海校区)

武汉市东湖高新开发区东湖网谷(武汉校区)

西安市雁塔区和发智能大厦B座3层(西安校区)