Spark Streaming integration with Kafka

Spark Streaming integrated with Kafka: micro-batch processing of a message stream, implemented in Java.

Planned pipeline: product search -> send (userId, productId) -> Kafka -> Spark Streaming -> product recommendation algorithm -> Kafka -> update the recommended-product queue.

1. Environment setup

Windows 10, ZooKeeper 3.5.5, Kafka 2.6.0, Spark 2.4.7, Java 1.8

  1. ZooKeeper: coordination service, a prerequisite for running Kafka

  2. Kafka: the message queue

    For Docker installation details, see the separate note 常用环境Docker运维 (Docker operations for common environments)

  3. Spark: the compute engine

    For this exercise Spark is installed and debugged locally on Windows 10; on Linux, install it via Docker or simply unpack the binary distribution.

2. Set up Kafka message sending
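Before wiring up Spark, the search service needs to publish (userId, productId) events to Kafka. Below is a minimal producer sketch, not part of the original setup: the broker address and the topic "Test" are assumed to match the streaming consumer further down, and the class name SearchEventProducer, the sample IDs, and the comma-joined value format are illustrative choices only.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SearchEventProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // broker address and topic are assumed to match the consumer code below
        props.put("bootstrap.servers", "192.168.1.1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String userId = "10001";     // hypothetical user ID
            String productId = "20042";  // hypothetical product ID
            // key = userId, value = "userId,productId"; any serialization scheme would work
            producer.send(new ProducerRecord<>("Test", userId, userId + "," + productId));
            producer.flush();
        }
    }
}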

3. pom dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.7</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>2.4.7</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.7</version>
</dependency>

<!-- the Scala suffix must match the other Spark artifacts (2.12, not 2.11) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>2.4.7</version>
</dependency>

4. Java code: run Spark locally and consume Kafka messages

  1. Start Spark locally on its own; this assumes the local Spark installation described above is working:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

/**
 * @author kid1999
 * @create 2021-02-07 10:11
 * @description local SparkSession smoke test
 **/
public class Test {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value") // arbitrary extra configuration
                .master("local")
                .getOrCreate();

        // read test.txt and print its contents
        Dataset<String> file = spark.read().textFile("src/main/resources/test.txt");
        file.show();
    }
}
  2. Spark Streaming consumes the Kafka message stream:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * @author kid1999
 * @create 2021-02-08 21:19
 * @description Spark Streaming consumer for a Kafka topic
 **/
public class Test3 {
    public static void main(String[] args) throws InterruptedException {

        String brokers = "192.168.1.1:9092";
        String topic = "Test";

        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
        // every 5 seconds the streaming job processes the data received during the last 5 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

        // Kafka consumer configuration for the direct API
        Map<String, Object> kafkaParams = new HashMap<>();
        // Kafka broker address(es)
        kafkaParams.put("bootstrap.servers", brokers);
        // deserializer for the record key (strings here)
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        // deserializer for the record value (strings here)
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        // consumer group ID, any value
        kafkaParams.put("group.id", "jis");
        // where to start when there is no committed offset: "latest" (newest) or "earliest" (oldest)
        kafkaParams.put("auto.offset.reset", "latest");
        // disable automatic offset commits so that Spark controls offset handling
        kafkaParams.put("enable.auto.commit", false);

        // the set of topics to subscribe to
        Collection<String> topics = Arrays.asList(topic);

        // build an input DStream for the given topic via the Kafka direct API;
        // each record carries the message key and value
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        JavaPairDStream<String, String> pairDS = stream.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                        return new Tuple2<>(record.key(), record.value());
                    }
                });

        // print each record's key and value
        pairDS.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
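To finish the pipeline sketched at the top (results flowing back to Kafka so the recommended-product queue can be updated), one common approach is to publish from inside foreachRDD/foreachPartition with a plain KafkaProducer. The helper below is a rough sketch under assumptions: the class name KafkaSink, the method writeToKafka, and the output topic are hypothetical, and the recommendation algorithm itself is left out.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

public class KafkaSink {

    // Publish each (key, value) pair of the DStream to an output topic.
    // A producer is created per partition because KafkaProducer is not serializable
    // and therefore cannot be captured by the closure shipped to executors.
    public static void writeToKafka(JavaPairDStream<String, String> pairDS,
                                    String brokers, String outputTopic) {
        pairDS.foreachRDD(rdd -> rdd.foreachPartition(records -> {
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", brokers);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                while (records.hasNext()) {
                    Tuple2<String, String> t = records.next();
                    producer.send(new ProducerRecord<>(outputTopic, t._1(), t._2()));
                }
            }
        }));
    }
}

A call such as KafkaSink.writeToKafka(pairDS, brokers, "Recommendations") could then be placed before jssc.start() in Test3; opening a fresh producer per partition and batch is not the most efficient option, but it keeps the sketch self-contained.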

On success, the Spark UI is available at localhost:4040.
