kafka同一个消费者组重复消费问题

作者:高景洋 日期:2021-05-28 16:00:02 浏览次数:447

背景

业务中出现kafka重复消费问题,导致同一条数据重复处理多次,并推给下游业务方,增加了下游业务方的数据处理压力。


产生重复消费的原因

1、产生重复消费的原因,是kafka自身问题的可能性很少

2、通常原因为 正在运行中的程序,被kill掉,导致offset未提交成功。该情况下,程序下次拉起来时,会接着上次未提交成功的offset继续消费,导致部分数据重复消费。

3、我们的原因(坑)

     消费代码如下


for msg in consumer:

    XXXXXXXxXXXXXXX
    XXXXXXXXXXXXXXX
    if data_source_id == JobDataSourceType.KafkaCollectResult.value and period_minutes < cycle_period_minutes: # 如果是下载失败的数据且当前时间与上次下载完成时间的间隔小于任务间隔,则sleep间隔时间
    break


     a) 在消费数据的同时,因业务逻辑需要,增加了 sleep 1小时的操作

     b) kfk提交offset 的默认值为 enable_auto_commit=True, auto_commit_interval_ms = 5000



         翻译一下:默认自动推送,自动推送时间为5秒

     c) 程序会在符合一定条件的时候,break 出消费循环,但此时如果未达到自动提交5s的时间的话,则会有offset 提交不上的情况,下将程序再拉起消费时,会有部分offset 消费重复


处理方法

在break 前 ,先调用 consumer.close() 方法,手动关闭consumer 。

close() 源码如下:

def close(self, autocommit=True):
      ...




本文永久性链接:
<a href="http://r4.com.cn/art191.aspx">kafka同一个消费者组重复消费问题</a>
apple手机报价 石材厂家 石材