OUT了吧,Kafka能实现消息延时了_kafka消息长度限制

OUT了吧,Kafka能实现消息延时了_kafka消息长度限制

经验文章nimo972025-02-27 13:02:5710A+A-

摘要:本文讲述如何在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

本文分享自华为云社区《Kafka也能实现消息延时了?-云社区-华为云》,作者:HuaweiCloudDeveloper 。

1、背景

Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。

2、开发环境

3、云服务介绍

分布式消息服务Kafka版: 华为云分布式消息服务Kafka版是一款基于开源社区版Kafka提供的消息队列服务,向用户提供计算、存储和带宽资源独占式的Kafka专享实例。使用华为云分布式消息服务Kafka版,资源按需申请,按需配置Topic的分区与副本数量,即买即用,您将有更多精力专注于业务快速开发,不用考虑部署和运维。

4、方案设计

i、方案简述

此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。

ii、方案架构图

Kafka消息延时方案架构图

Kafka消息延时实现思路

  1. 生产者将生产消息存入topic_delay主题中进行存储。
  2. 将topic_delay主题中的所有消息拉取至ConcurrentLinkedQueue队列中。
  3. 取值判断是否满足延时要求。
  4. a. 如果满足延时要求,则将消息生产至topic_out主题中,并将queue队列中的值移除。
  5. b. 如果不满足延时要求,则等待自定义时间后重试判断。
  6. 消费者最终从topic_out主题中拉取消息进行消费。

三、方案时序图

Kafka消息延时方案时序图

5、代码参数指南

本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息, 如需使用官方测试的使用的生产消费代码相关配置介绍可以参考
https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。 如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。

Delay.java参数详情

  1. delay:自定义延时时间,单位ms。
  2. topic_delay变量:用于临时存储消息的topic名称。
  3. topic_out变量:用于消费者拉取消息消费的topic名称。
  4. 关于消费者和生产者配置可按需配置,可参考Kafka官方文档:https://kafka.apache.org/documentation/#producerconfigs

6、代码实现

实现代码可参考Kafka消息延时


package com.dms.delay;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;

import java.util.Arrays;

import java.util.Date;

import java.util.Properties;

import java.util.concurrent.ConcurrentLinkedQueue;

/**

* Hello world!

*

*/

public class Delay {

//缓存队列

public static ConcurrentLinkedQueue> link = new ConcurrentLinkedQueue();

//延迟时间(20秒),可根据需要设置延迟大小

public static long delay = 20000L;

/**

*入口

* @param args

*/

public static void main( String[] args )

{

//延时主题(用于控制延时缓冲)

String topic_delay = "topic_delay";

//输出主题(直接供消费者消费)

String topic_out = "topic_out";

/*

消费线程

*/

new Thread(new Runnable() {

@Override

public void run() {

//消费者配置。请根据需要自行设置Kafka配置

Properties props = new Properties();

props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");

props.setProperty("group.id", "test");

props.setProperty("enable.auto.commit", "true");

props.setProperty("auto.commit.interval.ms", "1000");

props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

//创建消费者

KafkaConsumer consumer = new KafkaConsumer<>(props);

//指定消费主题

consumer.subscribe(Arrays.asList(topic_delay));

while (true) {

//轮询消费

ConsumerRecords records = consumer.poll(Duration.ofMillis(10));

//遍历当前轮询批次拉取到的消息

for (ConsumerRecord record : records){

System.out.println(record);

//将消息添加到缓存队列

link.add(record);

}

}

}

}).start();

/*

生产线程

*/

new Thread(new Runnable() {

@Override

public void run() {

//生产者配置(请根据需求自行配置)

Properties props = new Properties();

props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092");

props.put("linger.ms", 1);

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//创建生产者

Producer producer = new KafkaProducer<>(props);

//持续从缓存队列中获取消息

while(true){

//如果缓存队列为空则放缓取值速度

if(link.isEmpty()){

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

continue;

}

//获取缓存队列栈顶消息

ConsumerRecord record = link.peek();

//获取该消息时间戳

long timestamp = record.timestamp();

Date now = new Date();

long nowTime = now.getTime();

if(timestamp+ Delay.delay (topic_out, "",value));

//从缓存队列中移除该消息

link.poll();

}else {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}).start();

}

}

7、结果反馈

点击下方,第一时间了解华为云新鲜技术~

华为云博客_大数据博客_AI博客_云计算博客_开发者中心-华为云

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7