Springboot整合Kafka Stream实时统计数据

环境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka Stream介绍

Kafka在0.10版本推出了Stream API,整合提供了对存储在Kafka内的实时统数据进行流式处理和分析的能力。

流式计算一般被用来和批量计算做比较。计数据批量计算往往有一个固定的整合数据集作为输入并计算结果。而流式计算的实时统输入往往是“无界”的(Unbounded Data),持续输入的计数据,即永远拿不到全量数据去做计算;同时,整合计算结果也是实时统持续输出的,只能拿到某一个时刻的计数据结果,而不是整合最终的结果。

Kafka Streams是实时统一个客户端类库,用于处理和分析存储在Kafka中的计数据数据。它建立在流式处理的整合一些重要的概念之上:如何区分事件时间和处理时间、Windowing的实时统支持、简单高效的计数据管理和实时查询应用程序状态。

Kafka Streams的云南idc服务商门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异,可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。

Kafka Streams的一些特点:

被设计成一个简单的、轻量级的客户端类库,能够被集成到任何Java应用中 除了Kafka之外没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性 通过可容错的状态存储实现高效的状态操作(windowed joins and aggregations) 支持exactly-once语义 支持纪录级的处理,实现毫秒级的延迟 提供High-Level的Stream DSL和Low-Level的Processor API

Stream Processing Topology流处理拓扑

流是Kafka Streams提供的最重要的抽象:它表示一个无限的、不断更新的数据集。流是不可变数据记录的有序、可重放和容错序列,源码下载其中数据记录定义为键值对。 Stream Processing Application是使用了Kafka Streams库的应用程序。它通过processor topologies定义计算逻辑,其中每个processor topology都是多个stream processor(节点)通过stream组成的图。 A stream processor 是处理器拓扑中的节点;它表示一个处理步骤,通过每次从拓扑中的上游处理器接收一个输入记录,将其操作应用于该记录,来转换流中的数据,并且随后可以向其下游处理器生成一个或多个输出记录。

有两种特殊的processor:

Source Processor 源处理器是一种特殊类型的流处理器,它没有任何上游处理器。它通过使用来自一个或多个kafka topic的记录并将其转发到其下游处理器,从而从一个或多个kafka topic生成其拓扑的输入流。

Sink Processor 接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的kafka topic。亿华云

相关的核心概念查看如下链接

下面演示Kafka Stream 在Springboot中的应用

依赖

<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-web</artifactId>   </dependency> <dependency>   <groupId>org.springframework.kafka</groupId>   <artifactId>spring-kafka</artifactId> </dependency> <dependency>   <groupId>org.apache.kafka</groupId>   <artifactId>kafka-streams</artifactId> </dependency> 

配置

server:   port: 9090 spring:   application:     name: kafka-demo   kafka:     streams:       application-id: ${ spring.application.name}       properties:         spring.json.trusted.packages: *     bootstrap-servers:     - localhost:9092     - localhost:9093     - localhost:9094     producer:       acks: 1       retries: 10       key-serializer: org.apache.kafka.common.serialization.StringSerializer       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer       properties:         spring.json.trusted.packages: *     consumer:       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer       enable-auto-commit: false       group-id: ConsumerTest       auto-offset-reset: latest       properties:         session.timeout.ms: 12000         heartbeat.interval.ms: 3000         max.poll.records: 100         spring.json.trusted.packages: *     listener:       ack-mode: manual-immediate       type: batch       concurrency: 8     properties:       max.poll.interval.ms: 300000 

消息发送

@Service public class MessageSend {    @Resource   private KafkaTemplate<String, Message> kafkaTemplate ;   public void sendMessage2(Message message) {      kafkaTemplate.send(new ProducerRecord<String, Message>("test", message)).addCallback(result -> {        System.out.println("执行成功..." + Thread.currentThread().getName()) ;     }, ex -> {        System.out.println("执行失败") ;       ex.printStackTrace() ;     }) ;   } } 

消息监听

@KafkaListener(topics = { "test"}) public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {    for (ConsumerRecord<String, Message> record : records) {      System.out.println(this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ;   }   try {      TimeUnit.SECONDS.sleep(0) ;   } catch (InterruptedException e) {      e.printStackTrace();   }   ack.acknowledge() ; } @KafkaListener(topics = { "demo"}) public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {    for (ConsumerRecord<String, Message> record : records) {      System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread" + Thread.currentThread().getName() + ", key: " + record.key() + ", 接收到消息:" + record.value() + ", patition: " + record.partition() + ", offset: " + record.offset()) ;   }   ack.acknowledge() ; } 

Kafka Stream处理

消息转换并转发其它Topic

@Bean public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {    KStream<Object, Object> stream = streamsBuilder.stream("test");   stream.map((key, value) -> {      System.out.println("原始消息内容:" + new String((byte[]) value, Charset.forName("UTF-8"))) ;     return new KeyValue<>(key, "{ \"title\": \"123123\", \"message\": \"重新定义内容\"}".getBytes(Charset.forName("UTF-8"))) ;   }).to("demo") ;   return stream; } 

执行结果:

Stream对象处理

@Bean public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;   descri.addTrustedPackages("*") ;   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));   stream.map((key, value) -> {      value.setTitle("XXXXXXX") ;     return new KeyValue<>(key, value) ;   }).to("demo", Produced.with(Serdes.String(), jsonSerde)) ;   return stream; } 

执行结果:

分组处理

@Bean public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;   descri.addTrustedPackages("*") ;   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));   stream.selectKey(new KeyValueMapper<String, Message, String>() {      @Override     public String apply(String key, Message value) {        return value.getOrgCode() ;     }   })   .groupByKey(Grouped.with(Serdes.String(), jsonSerde))   .count()   .toStream().print(Printed.toSysOut());   return stream; } 

执行结果:

聚合

@Bean public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;   descri.addTrustedPackages("*") ;   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));   stream.selectKey(new KeyValueMapper<String, Message, String>() {      @Override     public String apply(String key, Message value) {        return value.getOrgCode() ;     }   })   .groupByKey(Grouped.with(Serdes.String(), jsonSerde))   .aggregate(() -> 0L, (key, value ,aggValue) -> {      System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ;     return aggValue + 1 ;   }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long()))   .toStream().print(Printed.toSysOut());   return stream; } 

执行结果:

Filter过滤数据

@Bean public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;   descri.addTrustedPackages("*") ;   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));   stream.selectKey(new KeyValueMapper<String, Message, String>() {      @Override     public String apply(String key, Message value) {        return value.getOrgCode() ;     }   })   .groupByKey(Grouped.with(Serdes.String(), jsonSerde))   .aggregate(() -> 0L, (key, value ,aggValue) -> {      System.out.println("key = " + key + ", value = " + value + ", agg = " + aggValue) ;     return aggValue + 1 ;   }, Materialized.<String, Long, KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long()))   .toStream()   .filter((key, value) -> !"2".equals(key))   .print(Printed.toSysOut());   return stream; } 

执行结果:

过滤Key不等于"2"

分支多流处理

@Bean public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;   descri.addTrustedPackages("*") ;   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));   // 分支,多流处理   KStream<String, Message>[] arrStream = stream.branch(     (key, value) -> "男".equals(value.getSex()),      (key, value) -> "女".equals(value.getSex()));   Stream.of(arrStream).forEach(as -> {      as.foreach((key, message) -> {        System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message) ;     });   });   return stream; } 

执行结果:

多字段分组

不能使用多个selectKey,后面的会覆盖前面的

@Bean public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) {    JsonSerde<Message> jsonSerde = new JsonSerde<>() ;   JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer() ;   descri.addTrustedPackages("*") ;   KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));   stream   .selectKey(new KeyValueMapper<String, Message, String>() {      @Override     public String apply(String key, Message value) {        System.out.println(Thread.currentThread().getName()) ;       return value.getTime() + " | " + value.getOrgCode() ;     }   })   .groupByKey(Grouped.with(Serdes.String(), jsonSerde))   .count()   .toStream().print(Printed.toSysOut());   return stream; } 

执行结果:

数据库
上一篇:Precision 7920的澎湃动力 让《暗黑破坏神:不朽》的角色栩栩如生
下一篇:新华三绿洲平台多维度入选2022爱分析DataOps厂商全景报告