[fix](revert) Revert "data stream sender stop sending data to receiver if it returns eos early (#19847)" (#20040)

* Revert "[fix](sink) fix END_OF_FILE error for pipeline caused by VDataStreamSender eof (#20007)"

This reverts commit 2ec1d282c5e27b25d37baf91cacde082cca4ec31.

* Revert "[fix](revert) data stream sender stop sending data to receiver if it returns eos early (#19847)"

This reverts commit c73003359567067ea7d44e4a06c1670c9ec37902.
This commit is contained in:
TengJianPing
2023-05-25 16:50:17 +08:00
committed by GitHub
parent 694b8b6cd3
commit 3598518e59
7 changed files with 44 additions and 158 deletions

View File

@ -152,7 +152,6 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
finst_id.set_lo(fragment_instance_id.lo);
_instance_to_finst_id[low_id] = finst_id;
_instance_to_sending_by_pipeline[low_id] = true;
_instance_to_receiver_eof[low_id] = false;
}
Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
@ -182,9 +181,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
if (_is_receiver_eof(ins_id.lo)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
request.block_holder->ref();
{
@ -234,9 +230,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
_closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result) {
Status s = Status(result.status());
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
} else if (!s.ok()) {
if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
} else if (eos) {
@ -279,9 +273,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
_closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result) {
Status s = Status(result.status());
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
} else if (!s.ok()) {
if (!s.ok()) {
_failed(id,
fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
} else if (eos) {
@ -331,16 +323,6 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
_context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, err);
_ended(id);
}
void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
}
bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
return _instance_to_receiver_eof[id];
}
};
} // namespace doris::pipeline

View File

@ -85,7 +85,6 @@ private:
phmap::flat_hash_map<InstanceLoId, PTransmitDataParams*> _instance_to_request;
phmap::flat_hash_map<InstanceLoId, PUniqueId> _instance_to_finst_id;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
std::atomic<bool> _is_finishing;
PUniqueId _query_id;
@ -101,8 +100,6 @@ private:
void _construct_request(InstanceLoId id);
inline void _ended(InstanceLoId id);
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
};
} // namespace pipeline

View File

@ -286,12 +286,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
if (in_block->rows() > 0) {
auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED);
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
if (st.template is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
}
return st;
return _sink->send(state, in_block, source_state == SourceState::FINISHED);
}
return Status::OK();
}

View File

@ -102,13 +102,9 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
// As a consequence, find_recvr() may return an innocuous NULL if a thread
// calling deregister_recvr() beat the thread calling find_recvr()
// in acquiring _lock.
//
// e.g. for broadcast join build side, only one instance will build the hash table,
// all other instances don't need build side data and will close the data stream receiver.
//
// TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
// errors from receiver-initiated teardowns.
return Status::EndOfFile("data stream receiver closed");
return Status::OK();
}
// request can only be used before calling recvr's add_batch or when request

View File

@ -135,11 +135,8 @@ Status Channel::send_local_block(bool eos) {
if (eos) {
_local_recvr->remove_sender(_parent->_sender_id, _be_number);
}
return Status::OK();
} else {
_mutable_block.reset();
return receiver_status_;
}
return Status::OK();
}
Status Channel::send_local_block(Block* block) {
@ -149,10 +146,8 @@ Status Channel::send_local_block(Block* block) {
COUNTER_UPDATE(_parent->_local_sent_rows, block->rows());
COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
_local_recvr->add_block(block, _parent->_sender_id, false);
return Status::OK();
} else {
return receiver_status_;
}
return Status::OK();
}
Status Channel::send_block(PBlock* block, bool eos) {
@ -241,9 +236,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
Status Channel::close_wait(RuntimeState* state) {
if (_need_close) {
Status st = _wait_last_brpc();
if (st.is<ErrorCode::END_OF_FILE>()) {
st = Status::OK();
} else if (!st.ok()) {
if (!st.ok()) {
state->log_error(st.to_string());
}
_need_close = false;
@ -259,33 +252,18 @@ Status Channel::close_internal() {
}
VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
<< " dest_node=" << _dest_node_id
<< " #rows= " << ((_mutable_block == nullptr) ? 0 : _mutable_block->rows())
<< " receiver status: " << receiver_status_;
if (receiver_status_.is<ErrorCode::END_OF_FILE>()) {
_mutable_block.reset();
return Status::OK();
}
Status status;
<< " #rows= " << ((_mutable_block == nullptr) ? 0 : _mutable_block->rows());
if (_mutable_block != nullptr && _mutable_block->rows() > 0) {
status = send_current_block(true);
RETURN_IF_ERROR(send_current_block(true));
} else {
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
status = send_block((PBlock*)nullptr, true);
RETURN_IF_ERROR(send_block((PBlock*)nullptr, true));
}
// Don't wait for the last packet to finish, left it to close_wait.
if (status.is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
} else {
return status;
}
return Status::OK();
}
Status Channel::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
_closed = true;
Status st = close_internal();
if (!st.ok()) {
state->log_error(st.to_string());
@ -516,33 +494,15 @@ Status VDataStreamSender::open(RuntimeState* state) {
return Status::OK();
}
template <typename ChannelPtrType>
void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st) {
channel->set_receiver_eof(st);
channel->close(state);
}
Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
SCOPED_TIMER(_profile->total_time_counter());
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
break;
}
return Status::EndOfFile("all data stream channels EOF");
}
if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
// 1. serialize depends on it is not local exchange
// 2. send block
// 3. rollover block
if (_only_local_exchange) {
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
status = channel->send_local_block(block);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
RETURN_IF_ERROR(channel->send_local_block(block));
}
} else if (_enable_pipeline_exec) {
BroadcastPBlockHolder* block_holder = nullptr;
@ -553,16 +513,12 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
serialize_block(block, block_holder->get_block(), _channels.size()));
}
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
status = channel->send_local_block(block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
if (channel->is_local()) {
RETURN_IF_ERROR(channel->send_local_block(block));
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(channel->send_block(block_holder, eos));
}
}
} else {
@ -571,16 +527,12 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, _channels.size()));
}
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
status = channel->send_local_block(block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_block(_cur_pb_block, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
if (channel->is_local()) {
RETURN_IF_ERROR(channel->send_local_block(block));
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(channel->send_block(_cur_pb_block, eos));
}
}
// rollover
@ -589,18 +541,14 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
} else if (_part_type == TPartitionType::RANDOM) {
// 1. select channel
Channel* current_channel = _channels[_current_channel_idx];
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block()));
auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
current_channel->ch_roll_pb_block();
}
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
RETURN_IF_ERROR(current_channel->send_local_block(block));
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block()));
RETURN_IF_ERROR(current_channel->send_block(current_channel->ch_cur_pb_block(), eos));
current_channel->ch_roll_pb_block();
}
_current_channel_idx = (_current_channel_idx + 1) % _channels.size();
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
@ -652,7 +600,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
Block::erase_useless_column(block, column_to_keep);
}
RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, hashes, rows, block));
RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes, rows, block));
} else {
for (int j = 0; j < result_size; ++j) {
block->get_by_position(result[j]).column->update_crcs_with_value(
@ -667,8 +615,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Block::erase_useless_column(block, column_to_keep);
}
RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, element_size, hashes,
rows, block));
RETURN_IF_ERROR(
channel_add_rows(_channel_shared_ptrs, element_size, hashes, rows, block));
}
} else {
// Range partition

View File

@ -28,7 +28,6 @@
#include <stdint.h>
#include <atomic>
#include <cstddef>
#include <memory>
#include <ostream>
#include <string>
@ -158,11 +157,8 @@ protected:
}
template <typename Channels>
Status channel_add_rows(RuntimeState* state, Channels& channels, int num_channels,
const uint64_t* channel_ids, int rows, Block* block);
template <typename ChannelPtrType>
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);
Status channel_add_rows(Channels& channels, int num_channels, const uint64_t* channel_ids,
int rows, Block* block);
struct hash_128 {
uint64_t high;
@ -253,7 +249,6 @@ public:
_num_data_bytes_sent(0),
_packet_seq(0),
_need_close(false),
_closed(false),
_brpc_dest_addr(brpc_dest),
_is_transfer_chain(is_transfer_chain),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
@ -329,18 +324,8 @@ public:
_local_recvr->sender_queue_empty(_parent->_sender_id);
}
bool is_receiver_eof() const { return receiver_status_.is<ErrorCode::END_OF_FILE>(); }
void set_receiver_eof(Status st) { receiver_status_ = st; }
protected:
bool _recvr_is_valid() {
if (_local_recvr && !_local_recvr->is_closed()) {
return true;
}
receiver_status_ = Status::EndOfFile("local data stream receiver closed");
return false;
}
bool _recvr_is_valid() { return _local_recvr && !_local_recvr->is_closed(); }
Status _wait_last_brpc() {
SCOPED_TIMER(_parent->_brpc_wait_timer);
@ -350,7 +335,6 @@ protected:
auto cntl = &_closure->cntl;
auto call_id = _closure->cntl.call_id();
brpc::Join(call_id);
receiver_status_ = _closure->result.status();
if (cntl->Failed()) {
std::string err = fmt::format(
"failed to send brpc batch, error={}, error_text={}, client: {}, "
@ -360,7 +344,7 @@ protected:
LOG(WARNING) << err;
return Status::RpcError(err);
}
return receiver_status_;
return Status::OK();
}
// Serialize _batch into _thrift_batch and send via send_batch().
@ -382,7 +366,6 @@ protected:
std::unique_ptr<MutableBlock> _mutable_block;
bool _need_close;
bool _closed;
int _be_number;
TNetworkAddress _brpc_dest_addr;
@ -393,7 +376,6 @@ protected:
PTransmitDataParams _brpc_request;
std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
RefCountClosure<PTransmitDataResult>* _closure = nullptr;
Status receiver_status_;
int32_t _brpc_timeout_ms = 500;
// whether the dest can be treated as query statistics transfer chain.
bool _is_transfer_chain;
@ -411,30 +393,19 @@ protected:
PBlock _ch_pb_block2;
};
#define HANDLE_CHANNEL_STATUS(state, channel, status) \
do { \
if (status.is<ErrorCode::END_OF_FILE>()) { \
_handle_eof_channel(state, channel, status); \
} else { \
RETURN_IF_ERROR(status); \
} \
} while (0)
template <typename Channels>
Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channels,
int num_channels, const uint64_t* __restrict channel_ids,
int rows, Block* block) {
Status VDataStreamSender::channel_add_rows(Channels& channels, int num_channels,
const uint64_t* __restrict channel_ids, int rows,
Block* block) {
std::vector<int> channel2rows[num_channels];
for (int i = 0; i < rows; i++) {
channel2rows[channel_ids[i]].emplace_back(i);
}
Status status;
for (int i = 0; i < num_channels; ++i) {
if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
status = channels[i]->add_rows(block, channel2rows[i]);
HANDLE_CHANNEL_STATUS(state, channels[i], status);
if (!channel2rows[i].empty()) {
RETURN_IF_ERROR(channels[i]->add_rows(block, channel2rows[i]));
}
}

View File

@ -178,10 +178,7 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
state->fragment_instance_id());
} else {
if (final_status.ok()) {
auto st = _stream_sender->send(state, _output_block.get(), true);
if (!st.template is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(st);
}
RETURN_IF_ERROR(_stream_sender->send(state, _output_block.get(), true));
}
RETURN_IF_ERROR(_stream_sender->close(state, final_status));
_output_block->clear();