Cherry-picked from #44913 Co-authored-by: hui lai <laihui@selectdb.com> Co-authored-by: Dongyang Li <lidongyang@selectdb.com>
This commit is contained in:
committed by
GitHub
parent
0bcb99710b
commit
a8b5125a25
111
thirdparty/patches/librdkafka-1.9.2.patch
vendored
111
thirdparty/patches/librdkafka-1.9.2.patch
vendored
@ -67,7 +67,19 @@
|
||||
|
||||
--- src/rdkafka_broker.c
|
||||
+++ src/rdkafka_broker.c
|
||||
@@ -5461,7 +5461,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
|
||||
@@ -3288,6 +3288,11 @@ rd_kafka_broker_op_serve(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko) {
|
||||
: (topic_err
|
||||
? topic_err
|
||||
: RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION));
|
||||
+
|
||||
+ if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER) {
|
||||
+ rd_kafka_toppar_purge_internal_fetch_queue_maybe(
|
||||
+ rktp);
|
||||
+ }
|
||||
}
|
||||
|
||||
rd_kafka_toppar_unlock(rktp);
|
||||
@@ -5461,7 +5466,9 @@ static int rd_kafka_broker_thread_main(void *arg) {
|
||||
*/
|
||||
void rd_kafka_broker_destroy_final(rd_kafka_broker_t *rkb) {
|
||||
|
||||
@ -78,3 +90,100 @@
|
||||
rd_assert(TAILQ_EMPTY(&rkb->rkb_monitors));
|
||||
rd_assert(TAILQ_EMPTY(&rkb->rkb_outbufs.rkbq_bufs));
|
||||
rd_assert(TAILQ_EMPTY(&rkb->rkb_waitresps.rkbq_bufs));
|
||||
--- src/rdkafka_cgrp.c
|
||||
+++ src/rdkafka_cgrp.c
|
||||
@@ -2734,6 +2734,9 @@ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
|
||||
rd_kafka_toppar_lock(rktp);
|
||||
rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
|
||||
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;
|
||||
+
|
||||
+ rd_kafka_toppar_purge_internal_fetch_queue_maybe(rktp);
|
||||
+
|
||||
rd_kafka_toppar_unlock(rktp);
|
||||
|
||||
rd_list_remove(&rkcg->rkcg_toppars, rktp);
|
||||
--- src/rdkafka_partition.c
|
||||
+++ src/rdkafka_partition.c
|
||||
@@ -959,7 +959,71 @@ void rd_kafka_toppar_insert_msgq(rd_kafka_toppar_t *rktp,
|
||||
rd_kafka_toppar_unlock(rktp);
|
||||
}
|
||||
|
||||
+/**
|
||||
+ * @brief Purge internal fetch queue if toppar is stopped
|
||||
+ * (RD_KAFKA_TOPPAR_FETCH_STOPPED) and removed from the cluster
|
||||
+ * (RD_KAFKA_TOPPAR_F_REMOVE). Will be called from different places as it's
|
||||
+ * removed starting from a metadata response and stopped from a rebalance or a
|
||||
+ * consumer close.
|
||||
+ *
|
||||
+ * @remark Avoids circular dependencies in from `rktp_fetchq` ops to the same
|
||||
+ * toppar that stop destroying a consumer.
|
||||
+ *
|
||||
+ * @locks rd_kafka_toppar_lock() MUST be held
|
||||
+ */
|
||||
+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp) {
|
||||
+ rd_kafka_q_t *rkq;
|
||||
+ rkq = rktp->rktp_fetchq;
|
||||
+ mtx_lock(&rkq->rkq_lock);
|
||||
+ if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE &&
|
||||
+ !rktp->rktp_fetchq->rkq_fwdq) {
|
||||
+ rd_kafka_op_t *rko;
|
||||
+ int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
|
||||
+
|
||||
+ /* Partition is being removed from the cluster and it's stopped,
|
||||
+ * so rktp->rktp_fetchq->rkq_fwdq is NULL.
|
||||
+ * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
|
||||
+ * while holding lock, to avoid circular references */
|
||||
+ rko = TAILQ_FIRST(&rkq->rkq_q);
|
||||
+ while (rko) {
|
||||
+ if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
|
||||
+ rko->rko_type != RD_KAFKA_OP_FETCH) {
|
||||
+ rd_kafka_log(
|
||||
+ rktp->rktp_rkt->rkt_rk, LOG_WARNING,
|
||||
+ "PARTDEL",
|
||||
+ "Purging toppar fetch queue buffer op"
|
||||
+ "with unexpected type: %s",
|
||||
+ rd_kafka_op2str(rko->rko_type));
|
||||
+ }
|
||||
+
|
||||
+ if (rko->rko_type == RD_KAFKA_OP_BARRIER)
|
||||
+ barrier_cnt++;
|
||||
+ else if (rko->rko_type == RD_KAFKA_OP_FETCH)
|
||||
+ message_cnt++;
|
||||
+ else
|
||||
+ other_cnt++;
|
||||
|
||||
+ rko = TAILQ_NEXT(rko, rko_link);
|
||||
+ cnt++;
|
||||
+ }
|
||||
+
|
||||
+ if (cnt) {
|
||||
+ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
|
||||
+ "Purge toppar fetch queue buffer "
|
||||
+ "containing %d op(s) "
|
||||
+ "(%d barrier(s), %d message(s), %d other)"
|
||||
+ " to avoid "
|
||||
+ "circular references",
|
||||
+ cnt, barrier_cnt, message_cnt, other_cnt);
|
||||
+ rd_kafka_q_purge0(rktp->rktp_fetchq, rd_false);
|
||||
+ } else {
|
||||
+ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, CGRP, "PARTDEL",
|
||||
+ "Not purging toppar fetch queue buffer."
|
||||
+ " No ops present in the buffer.");
|
||||
+ }
|
||||
+ }
|
||||
+ mtx_unlock(&rkq->rkq_lock);
|
||||
+}
|
||||
|
||||
/**
|
||||
* Helper method for purging queues when removing a toppar.
|
||||
--- src/rdkafka_partition.h
|
||||
+++ src/rdkafka_partition.h
|
||||
@@ -541,6 +541,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp,
|
||||
int64_t query_offset,
|
||||
int backoff_ms);
|
||||
|
||||
+void rd_kafka_toppar_purge_internal_fetch_queue_maybe(rd_kafka_toppar_t *rktp);
|
||||
+
|
||||
int rd_kafka_toppar_purge_queues(rd_kafka_toppar_t *rktp,
|
||||
int purge_flags,
|
||||
rd_bool_t include_xmit_msgq);
|
||||
|
||||
Reference in New Issue
Block a user