From bf73ab69f274cc156ecc8208293c6ac07eb17971 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 31 Mar 2022 11:13:05 +0800 Subject: [PATCH] [Bug] Fix DCHECK failed in runtime filter and mutable block (#8720) Co-authored-by: lihaopeng --- be/src/exprs/runtime_filter.cpp | 3 ++- be/src/vec/sink/vdata_stream_sender.cpp | 13 ++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 7226eaa4d4..821b940e4a 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -452,7 +452,8 @@ public: DCHECK(state != nullptr); DCHECK(container != nullptr); DCHECK(_pool != nullptr); - DCHECK(prob_expr->root()->type().type == _column_return_type); + DCHECK(prob_expr->root()->type().type == _column_return_type || + (is_string_type(prob_expr->root()->type().type) && is_string_type(_column_return_type))); auto real_filter_type = get_real_type(); switch (real_filter_type) { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index d84ba2541d..da06fe7a45 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -425,9 +425,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { } _current_channel_idx = (_current_channel_idx + 1) % _channels.size(); } else if (_part_type == TPartitionType::HASH_PARTITIONED) { - int num_channels = _channels.size(); // will only copy schema // we don't want send temp columns + auto column_to_keep = block->columns(); int result_size = _partition_expr_ctxs.size(); int result[result_size]; @@ -451,12 +451,14 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { hash_vals[i] = siphashs[i].get64(); } - RETURN_IF_ERROR(channel_add_rows(_channels, num_channels, hash_vals, rows, block)); + Block::erase_useless_column(block, column_to_keep); + RETURN_IF_ERROR(channel_add_rows(_channels, _channels.size(), hash_vals, rows, block)); } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + // will only copy schema + // we don't want send temp columns + auto column_to_keep = block->columns(); // 1. calculate hash // 2. dispatch rows to channel - int num_channels = _channel_shared_ptrs.size(); - int result_size = _partition_expr_ctxs.size(); int result[result_size]; RETURN_IF_ERROR(get_partition_column_result(block, result)); @@ -484,8 +486,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) { } } + Block::erase_useless_column(block, column_to_keep); RETURN_IF_ERROR( - channel_add_rows(_channel_shared_ptrs, num_channels, hash_vals, rows, block)); + channel_add_rows(_channel_shared_ptrs, _channel_shared_ptrs.size(), hash_vals, rows, block)); } else { // Range partition // 1. calculate range