Kafka高级特性-重试队列
# 2.8 重试队列
kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实现消息重试的功能。
实现
创建新的kafka主题作为重试队列:
- 创建一个topic作为重试topic,用于接收等待重试的消息。
- 普通topic消费者设置待重试消息的下一个重试topic。
- 从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序
- 定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic
- 同一个消息重试次数过多则不再重试
代码实现
- 新建springboot项目
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.8.RELEASE</version>
<relativePath/>
<!-- lookup parent from repository -->
</parent>
<groupId>com.lagou.kafka.demo</groupId>
<artifactId>demo-retryqueue</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo-retryqueue</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 添加application.properties
# bootstrap.servers
spring.kafka.bootstrap-servers=node1:9092
# key序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消费组id:group.id
spring.kafka.consumer.group-id=retryGroup
# key反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# redis数据库编号
spring.redis.database=0
# redis主机地址
spring.redis.host=node1
# redis端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=0
# 连接超时时间(毫秒)
spring.redis.timeout=1000
# Kafka主题名称
spring.kafka.topics.test=tp_demo_retry_01
# 重试队列
spring.kafka.topics.retry=tp_demo_retry_02
- RetryqueueApplication.java
package com.lagou.kafka.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RetryqueueApplication
{
public static void main(String[] args)
{
SpringApplication.run(RetryqueueApplication.class, args);
}
}
- AppConfig.java
package com.lagou.kafka.demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
@Configuration
public class AppConfig
{
@Bean
public RedisTemplate < String, Object > redisTemplate(RedisConnectionFactory factory)
{
RedisTemplate < String, Object > template = new RedisTemplate < > ();
// 配置连接工厂
template.setConnectionFactory(factory);
return template;
}
}
- KafkaController.java
package com.lagou.kafka.demo.controller;
import com.lagou.kafka.demo.service.KafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
@RestController
public class KafkaController
{
@Autowired
private KafkaService kafkaService;
@Value("${spring.kafka.topics.test}")
private String topic;
@RequestMapping("/send/{message}")
public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException
{
ProducerRecord < String, String > record = new ProducerRecord < > (topic, message);
String result = kafkaService.sendMessage(record);
return result;
}
}
- KafkaService.java
package com.lagou.kafka.demo.service;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutionException;
@Service
public class KafkaService
{
@Autowired
private KafkaTemplate < String, String > kafkaTemplate;
public String sendMessage(ProducerRecord < String, String > record) throws ExecutionException, InterruptedException
{
SendResult < String, String > result = this.kafkaTemplate.send(record).get();
RecordMetadata metadata = result.getRecordMetadata();
String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
System.out.println("发送消息成功:" + returnResult);
return returnResult;
}
}
- ConsumerListener.java
package com.lagou.kafka.demo.listener;
import com.lagou.kafka.demo.service.KafkaRetryService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerListener
{
private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);
@Autowired
private KafkaRetryService kafkaRetryService;
private static int index = 0;
@KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
public void consume(ConsumerRecord < String, String > record)
{
try
{
// 业务处理
log.info("消费的消息:" + record);
index++;
if(index % 2 == 0)
{
throw new Exception("该重发了");
}
}
catch (Exception e)
{
log.error(e.getMessage());
// 消息重试
kafkaRetryService.consumerLater(record);
}
}
}
- KafkaRetryService.java
package com.lagou.kafka.demo.service;
import com.alibaba.fastjson.JSON;
import com.lagou.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;
@Service
public class KafkaRetryService
{
private static final Logger log = LoggerFactory.getLogger(KafkaRetryService.class);
/**
* 消息消费失败后下一次消费的延迟时间(秒)
* 第一次重试延迟10秒;第 二次延迟30秒,第三次延迟1分钟...
*/
private static final int[] RETRY_INTERVAL_SECONDS = {
10,
30,
1 * 60,
2 * 60,
5 * 60,
10 * 60,
30 * 60,
1 * 60 * 60,
2 * 60 * 60
};
/**
* 重试topic
*/
@Value("${spring.kafka.topics.retry}")
private String retryTopic;
@Autowired
private KafkaTemplate < String, String > kafkaTemplate;
public void consumerLater(ConsumerRecord < String, String > record)
{
// 获取消息的已重试次数
int retryTimes = getRetryTimes(record);
Date nextConsumerTime = getNextConsumerTime(retryTimes);
// 如果达到重试次数,则不再重试
if(nextConsumerTime == null)
{
return;
}
// 组织消息
RetryRecord retryRecord = new RetryRecord();
retryRecord.setNextTime(nextConsumerTime.getTime());
retryRecord.setTopic(record.topic());
retryRecord.setRetryTimes(retryTimes);
retryRecord.setKey(record.key());
retryRecord.setValue(record.value());
// 转换为字符串
String value = JSON.toJSONString(retryRecord);
// 发送到重试队列
kafkaTemplate.send(retryTopic, null, value);
}
/**
* 获取消息的已重试次数
*/
private int getRetryTimes(ConsumerRecord record)
{
int retryTimes = -1;
for(Header header: record.headers())
{
if(RetryRecord.KEY_RETRY_TIMES.equals(header.key()))
{
ByteBuffer buffer = ByteBuffer.wrap(header.value());
retryTimes = buffer.getInt();
}
}
retryTimes++;
return retryTimes;
}
/**
* 获取待重试消息的下一次消费时间
*/
private Date getNextConsumerTime(int retryTimes)
{
// 重试次数超过上限,不再重试
if(RETRY_INTERVAL_SECONDS.length < retryTimes)
{
return null;
}
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
return calendar.getTime();
}
}
- RetryListener.java
package com.lagou.kafka.demo.listener;
import com.alibaba.fastjson.JSON;
import com.lagou.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.UUID;
@Component
@EnableScheduling
public class RetryListener
{
private Logger log = LoggerFactory.getLogger(RetryListener.class);
private static final String RETRY_KEY_ZSET = "_retry_key";
private static final String RETRY_VALUE_MAP = "_retry_value";
@Autowired
private RedisTemplate < String, Object > redisTemplate;
@Autowired
private KafkaTemplate < String, String > kafkaTemplate;
@Value("${spring.kafka.topics.test}")
private String bizTopic;
@KafkaListener(topics = "${spring.kafka.topics.retry}")
// public void consume(List<ConsumerRecord<String, String>> list){
// for(ConsumerRecord<String, String> record : list){
public void consume(ConsumerRecord < String, String > record)
{
System.out.println("需要重试的消息:" + record);
RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);
/**
* 防止待重试消息太多撑爆redis,可以将待重试消息按下一次重试时间分开存储放到不同介质
* 例如下一次重试时间在半小时以后的消息储存到mysql,并定时从mysql读取即将重试的消息储储存到redis
*/
// 通过redis的zset进行时间排序
String key = UUID.randomUUID().toString();
redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
}
// }
/**
* 定时任务从redis读取到达重试时间的消息,发送到对应的topic
*/
// @Scheduled(cron="2 * * * * *")
@Scheduled(fixedDelay = 2000)
public void retryFromRedis()
{
log.warn("retryFromRedis----begin");
long currentTime = System.currentTimeMillis();
// 根据时间倒序获取
Set < ZSetOperations.TypedTuple < Object >> typedTuples = redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
// 移除取出的消息
redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
for(ZSetOperations.TypedTuple < Object > tuple: typedTuples)
{
String key = tuple.getValue().toString();
String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();
redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
ProducerRecord record = retryRecord.parse();
ProducerRecord recordReal = new ProducerRecord(bizTopic, record.partition(), record.timestamp(), record.key(), record.value(), record.headers());
kafkaTemplate.send(recordReal);
}
// todo 发生异常将发送失败的消息重新发送到redis
}
}
- RetryRecord.java
package com.lagou.kafka.demo.entity;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class RetryRecord
{
public static final String KEY_RETRY_TIMES = "retryTimes";
private String key;
private String value;
private Integer retryTimes;
private String topic;
private Long nextTime;
public RetryRecord()
{}
public String getKey()
{
return key;
}
public void setKey(String key)
{
this.key = key;
}
public String getValue()
{
return value;
}
public void setValue(String value)
{
this.value = value;
}
public Integer getRetryTimes()
{
return retryTimes;
}
public void setRetryTimes(Integer retryTimes)
{
this.retryTimes = retryTimes;
}
public String getTopic()
{
return topic;
}
public void setTopic(String topic)
{
this.topic = topic;
}
public Long getNextTime()
{
return nextTime;
}
public void setNextTime(Long nextTime)
{
this.nextTime = nextTime;
}
public ProducerRecord parse()
{
Integer partition = null;
Long timestamp = System.currentTimeMillis();
List < Header > headers = new ArrayList < > ();
ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
retryTimesBuffer.putInt(retryTimes);
retryTimesBuffer.flip();
headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));
ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers);
return sendRecord;
}
}
上次更新: 2023/08/10, 16:25:16