SpringBoot整合Kafka

SpringBoot整合Kafka, 实现消息解耦传输。

1. 环境搭建

win10, zookeeper3.5.5 , kafka2.6.0 , java1.8

  1. zookeeper 资源调度,使用kafka的前提

  2. kafka 消息队列

    docker安装详情参见 —> 常用环境Docker运维

2. pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

3. 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

server:
port: 8080

spring:
kafka:
bootstrap-servers: 192.168.1.1:9092
listener:
concurrency: 10
ack-mode: MANUAL_IMMEDIATE
poll-timeout: 1500
consumer:
group-id: kafka_cluster1
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties: {session.timeout.ms: 6000, auto.offset.reset: earliest}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
batch-size: 65536
buffer-memory: 524288

4.SpringBoot中接发kafka消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package io.kid1999.kafkatest.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
* @author kid1999
* @create 2021-02-07 15:59
* @description TODO
**/

@RestController
public class TestApi {

// kafka模板
@Resource
private KafkaTemplate<String,Object> kafkaTemplate;

// 发送消息 给 Topic 为 Test 的消息队列
@PostMapping(value="message/send")
public String send(@RequestParam String message) {
System.out.println(message);
kafkaTemplate.send("Test", message);
return "success";
}


// 监听来自 Topic 为 Test 的消息
@KafkaListener(topics = "Test")
public void listen (ConsumerRecord<?, ?> record){
System.out.printf("topic is %s, offset is %d, value is %s \n", record.topic(), record.offset(), record.value());
}



// 花式发送消息
public void testTemplateSend() {
//1 发送带有时间戳的消息
kafkaTemplate.send("Test", 0, System.currentTimeMillis(), String.valueOf(0), "send message with timestamp");
//2 使用ProducerRecord发送消息
ProducerRecord record = new ProducerRecord("Test", "use ProducerRecord to send message");
kafkaTemplate.send(record);

//3 使用Spring框架Message类发送消息
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "Test");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, String.valueOf(0));
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message);
}


}