spark-streaming integration with kafka
Integrate spark-streaming with Kafka to do micro-batch processing of a message stream in Java.
Planned use: product search —> send (userId, productId) —> Kafka —> spark-streaming —> product recommendation algorithm —> Kafka —> update the recommended-products queue
1. Environment setup
win10, zookeeper 3.5.5, kafka 2.6.0, spark 2.4.7, java 1.8
zookeeper: coordination service, required before Kafka can run
kafka: message queue
For Docker-based installation, see —> 常用环境Docker运维
spark: compute engine
This walkthrough installs and debugs everything locally on win10; on Linux, either follow the Docker installation above or simply unpack the release archive.
2. Set up Kafka message sending
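A minimal producer sketch for the "send (userId, productId)" step, assuming the same broker address (192.168.1.1:9092) and topic (Test) that the streaming consumer in section 4 subscribes to; the class name and the sample ID values are placeholders.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SearchEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.1:9092"); // same broker as the consumer in section 4
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // key = userId, value = productId; topic "Test" matches the streaming consumer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("Test", "user-10001", "product-20002"));
            producer.flush();
        }
    }
}
```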
3. pom dependencies
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>2.4.7</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>2.4.7</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.7</version>
</dependency>
<!-- use the _2.12 artifact so the Scala version matches the other Spark dependencies -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>2.4.7</version>
</dependency>
```
4. Java code: run Spark locally and receive Kafka messages
- Start Spark locally on its own; this assumes the local Spark setup from step 1 is working.
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class Test {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .master("local")
                .getOrCreate();

        Dataset<String> file = spark.read().textFile("src/main/resources/test.txt");
        file.show();
    }
}
```
- spark-streaming receives the Kafka message stream
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class Test3 {

    public static void main(String[] args) throws InterruptedException {

        String brokers = "192.168.1.1:9092";
        String topic = "Test";

        // Local streaming context with a 5-second micro-batch interval
        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "jis");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList(topic);

        // Direct stream: each record arrives as a ConsumerRecord<key, value>
        final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );

        // Map each record to a (key, value) pair and print each micro-batch
        JavaPairDStream<String, String> pairDS = stream.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                        return new Tuple2<>(record.key(), record.value());
                    }
                });
        pairDS.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```
Once the job is running, the Spark UI is available at localhost:4040.
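The plan at the top also pushes results back to Kafka (recommendation algorithm —> Kafka —> update the recommended-products queue). Below is a hedged sketch of that write-back step for the (userId, productId) pairs; the class name RecommendSink, the method sendToKafka, and the output topic RecommendResult are all placeholders, not part of the original pipeline.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

import java.util.Properties;

public class RecommendSink {

    // Attach a Kafka sink to the (userId, productId) stream produced in Test3.
    // Call this on pairDS before jssc.start().
    public static void sendToKafka(JavaPairDStream<String, String> pairDS, String brokers, String outTopic) {
        pairDS.foreachRDD(rdd -> rdd.foreachPartition(records -> {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokers);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            // Create the producer inside foreachPartition so it runs on the executor
            // instead of being serialized from the driver.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                while (records.hasNext()) {
                    Tuple2<String, String> t = records.next();
                    producer.send(new ProducerRecord<>(outTopic, t._1(), t._2()));
                }
            }
        }));
    }
}
```

Usage: in Test3, call RecommendSink.sendToKafka(pairDS, brokers, "RecommendResult") before jssc.start(); one producer per partition per batch keeps the Kafka client off the driver and avoids serialization issues.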