[fix](datastream sender) fix wrong result of broadcast join; fix wrong result of pipeline (#22942)
Fix bug of #22765 Close #22924
This commit is contained in:
@ -95,6 +95,8 @@ Status Channel::init(RuntimeState* state) {
|
||||
_fragment_instance_id, _dest_node_id);
|
||||
}
|
||||
|
||||
_serializer.set_is_local(_is_local);
|
||||
|
||||
// In bucket shuffle join will set fragment_instance_id (-1, -1)
|
||||
// to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
|
||||
// so the empty channel not need call function close_internal()
|
||||
@ -526,7 +528,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(block);
|
||||
status = channel->send_local_block(&cur_block);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_block(block_holder, eos);
|
||||
@ -544,11 +546,15 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
RETURN_IF_ERROR(_serializer.next_serialized_block(
|
||||
block, _cur_pb_block, _channels.size(), &serialized, false));
|
||||
if (serialized) {
|
||||
auto cur_block = _serializer.get_block()->to_block();
|
||||
if (!cur_block.empty()) {
|
||||
_serializer.serialize_block(&cur_block, _cur_pb_block, _channels.size());
|
||||
}
|
||||
Status status;
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(block);
|
||||
status = channel->send_local_block(&cur_block);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_block(_cur_pb_block, false);
|
||||
@ -556,6 +562,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
}
|
||||
cur_block.clear_column_data();
|
||||
_serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
|
||||
_roll_pb_block();
|
||||
}
|
||||
}
|
||||
@ -757,7 +765,7 @@ Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int num_receivers) {
|
||||
Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
|
||||
{
|
||||
SCOPED_TIMER(_parent->_serialize_batch_timer);
|
||||
dest->Clear();
|
||||
|
||||
@ -75,12 +75,14 @@ public:
|
||||
Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized,
|
||||
bool eos, const std::vector<int>* rows = nullptr);
|
||||
Status serialize_block(PBlock* dest, int num_receivers = 1);
|
||||
Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
|
||||
Status serialize_block(const Block* src, PBlock* dest, int num_receivers = 1);
|
||||
|
||||
MutableBlock* get_block() const { return _mutable_block.get(); }
|
||||
|
||||
void reset_block() { _mutable_block.reset(); }
|
||||
|
||||
void set_is_local(bool is_local) { _is_local = is_local; }
|
||||
|
||||
private:
|
||||
VDataStreamSender* _parent;
|
||||
std::unique_ptr<MutableBlock> _mutable_block;
|
||||
@ -501,10 +503,6 @@ public:
|
||||
return send_local_block(eos);
|
||||
}
|
||||
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
|
||||
if (eos) {
|
||||
_pblock = std::make_unique<PBlock>();
|
||||
RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1));
|
||||
}
|
||||
RETURN_IF_ERROR(send_block(_pblock.release(), eos));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user