导航菜单
路很长,又很短
博主信息
昵   称:Cocodroid ->关于我
Q     Q:2531075716
博文数:356
阅读量:1663808
访问量:207489
至今:
×
云标签 标签球>>
云标签 - Su的技术博客
Tags : kafka,解决方案发表时间: 2021-10-27 13:57:44


kafka shutdown停止很慢问题


在数据量大的时候,consumer一次抓取数据的数据很多,进入到业务处理的数据可能有很多,

假设一次poll有1万条数据进入业务程序,而且业务程序是和poll绑定在一起线程同步执行的,假设平均每条数据,执行业务程序花费100ms,

那么poll一次的数据,至少要执行 1w*0.1s = 1000s = 16.67分钟。

所以,在数据量大的时候,停止一个线程(需要先等待业务程序处理完数据),可能要十几分钟。

 

shutdown问题解决方案

1、改成异步处理数据,consumer取出来的数据,放到BlockQueue中,由异步线程去处理,当异步线程处理不过来时,阻塞consumer,调用consumer.pause()方法avoid group management rebalance,代码如下(来源于Spring-Kafka):

1

2

3

4

5

6

this.consumer.pause(this.assignedPartitions.toArray(new TopicPartition[this.assignedPartitions.size()]));

public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

    this.assignedPartitions = partitions;

}


2、如果是同步执行数据处理,考虑提高业务程序 处理数据的速度。


3、同步处理数据,但是改成手动提交offset,当shutdown的时候,poll的数据不需要全部处理,只需要记录处理的位置即可。代码示例如下:

1

2

3

4

5

6

7

8

9

list data = consumer.poll();

for(record: data) {

    if(shutdown) { 

        break;

    }

   deal(record);

   saveTopicOffset(record);

}

submitDealtDataOffset();



另外,

Kafka停不掉shutdown关闭不了问题

原因是卡在了consumer.close()方法里面,它会提交offset信息,如果网络中断或者kafka服务器有问题导致提交不了offset,则consumer.close方法会一直卡住(不停的循环尝试提交offset,永不中断)。


参见:Kafka poll一直等待的bug:

https://issues.apache.org/jira/browse/KAFKA-4189?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC


https://issues.apache.org/jira/browse/KAFKA-3172?jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20consumer%20ORDER%20BY%20priority%20DESC



解决方法:目前还没有好的办法,只能将offset的自动提交改成手动提交offset。但是,我写了一个程序可以在调用consumer.close后将线程强行杀死,作为临时解决方案。



...阅读原文
上一篇 : Mesos 架构
推荐文章