[Bug] Fix DCHECK failed in runtime filter and mutable block (#8720)
Co-authored-by: lihaopeng <lihaopeng@baidu.com>
This commit is contained in:
@ -452,7 +452,8 @@ public:
|
||||
DCHECK(state != nullptr);
|
||||
DCHECK(container != nullptr);
|
||||
DCHECK(_pool != nullptr);
|
||||
DCHECK(prob_expr->root()->type().type == _column_return_type);
|
||||
DCHECK(prob_expr->root()->type().type == _column_return_type ||
|
||||
(is_string_type(prob_expr->root()->type().type) && is_string_type(_column_return_type)));
|
||||
|
||||
auto real_filter_type = get_real_type();
|
||||
switch (real_filter_type) {
|
||||
|
||||
@ -425,9 +425,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) {
|
||||
}
|
||||
_current_channel_idx = (_current_channel_idx + 1) % _channels.size();
|
||||
} else if (_part_type == TPartitionType::HASH_PARTITIONED) {
|
||||
int num_channels = _channels.size();
|
||||
// will only copy schema
|
||||
// we don't want send temp columns
|
||||
auto column_to_keep = block->columns();
|
||||
|
||||
int result_size = _partition_expr_ctxs.size();
|
||||
int result[result_size];
|
||||
@ -451,12 +451,14 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) {
|
||||
hash_vals[i] = siphashs[i].get64();
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(channel_add_rows(_channels, num_channels, hash_vals, rows, block));
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
RETURN_IF_ERROR(channel_add_rows(_channels, _channels.size(), hash_vals, rows, block));
|
||||
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
|
||||
// will only copy schema
|
||||
// we don't want send temp columns
|
||||
auto column_to_keep = block->columns();
|
||||
// 1. calculate hash
|
||||
// 2. dispatch rows to channel
|
||||
int num_channels = _channel_shared_ptrs.size();
|
||||
|
||||
int result_size = _partition_expr_ctxs.size();
|
||||
int result[result_size];
|
||||
RETURN_IF_ERROR(get_partition_column_result(block, result));
|
||||
@ -484,8 +486,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) {
|
||||
}
|
||||
}
|
||||
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
RETURN_IF_ERROR(
|
||||
channel_add_rows(_channel_shared_ptrs, num_channels, hash_vals, rows, block));
|
||||
channel_add_rows(_channel_shared_ptrs, _channel_shared_ptrs.size(), hash_vals, rows, block));
|
||||
} else {
|
||||
// Range partition
|
||||
// 1. calculate range
|
||||
|
||||
Reference in New Issue
Block a user