SpringBoot整合Kafka
SpringBoot整合Kafka, 实现消息解耦传输。
1. 环境搭建
win10, zookeeper 3.5.5, kafka 2.6.0, java 1.8
zookeeper 资源调度,使用kafka的前提
kafka 消息队列
docker安装详情参见 → 常用环境Docker运维
2. pom依赖
1 2 3 4 5 6 7 8 9 10 11 12
<!-- Web MVC: exposes the REST endpoint used to trigger sends -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Kafka Streams client library -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<!-- Spring for Apache Kafka: KafkaTemplate / @KafkaListener support -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
|
3. 配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.1.1:9092
    listener:
      # Number of concurrent listener containers (should not exceed partition count).
      concurrency: 10
      # MANUAL_IMMEDIATE: listener code must call Acknowledgment.acknowledge()
      # itself; pairs with enable-auto-commit: false below.
      ack-mode: MANUAL_IMMEDIATE
      poll-timeout: 1500
    consumer:
      group-id: kafka_cluster1
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 6000
        auto.offset.reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 65536
      buffer-memory: 524288
|
4. SpringBoot中收发Kafka消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| package io.kid1999.kafkatest.api;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;
@RestController
public class TestApi {

    /** Template for publishing String-keyed, Object-valued messages. */
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Publishes the posted {@code message} parameter to topic "Test".
     *
     * @param message payload taken from the request parameter
     * @return the literal string "success" (fire-and-forget; send result is not awaited)
     */
    @PostMapping(value = "message/send")
    public String send(@RequestParam String message) {
        System.out.println(message);
        kafkaTemplate.send("Test", message);
        return "success";
    }

    /**
     * Consumes records from topic "Test" and prints topic/offset/value.
     *
     * NOTE(review): the accompanying config sets ack-mode MANUAL_IMMEDIATE with
     * enable-auto-commit=false, which requires the listener to accept an
     * {@code Acknowledgment} parameter and call {@code acknowledge()}; as written,
     * offsets are never committed — confirm the intended ack strategy.
     */
    @KafkaListener(topics = "Test")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("topic is %s, offset is %d, value is %s \n",
                record.topic(), record.offset(), record.value());
    }

    /** Demonstrates three alternative KafkaTemplate send styles. */
    public void testTemplateSend() {
        // 1) Explicit (topic, partition, timestamp, key, value) overload.
        kafkaTemplate.send("Test", 0, System.currentTimeMillis(), String.valueOf(0),
                "send message with timestamp");

        // 2) ProducerRecord overload (typed; the original used raw types).
        ProducerRecord<String, Object> record =
                new ProducerRecord<>("Test", "use ProducerRecord to send message");
        kafkaTemplate.send(record);

        // 3) Spring Message overload: routing is carried in the headers.
        Map<String, Object> headers = new HashMap<>();
        headers.put(KafkaHeaders.TOPIC, "Test");
        headers.put(KafkaHeaders.PARTITION_ID, 0);
        headers.put(KafkaHeaders.MESSAGE_KEY, String.valueOf(0));
        GenericMessage<String> message =
                new GenericMessage<>("use Message to send message", new MessageHeaders(headers));
        kafkaTemplate.send(message);
    }
}
|