[Load Parallel][3/3] Support parallel delta writer (#5369)
In the previous broker load, multiple OlapTableSinks would send data to the same LoadChannel, and because of the lock granularity problem, LoadChannel could only process these requests serially, which made it impossible to make full use of cluster resources. This CL modifies the related locks so that LoadChannel can process these requests in parallel. In the test, with a size of 20G, the load speed of 334 million rows of data in 3 nodes has been increased from 9min to 5min, and after enabling 2 concurrency, it can be increased to 3min. Also modify the profile of load job.
This commit is contained in:
@ -562,12 +562,18 @@ Status OlapTableSink::prepare(RuntimeState* state) {
|
||||
_output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
|
||||
_filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", TUnit::UNIT);
|
||||
_send_data_timer = ADD_TIMER(_profile, "SendDataTime");
|
||||
_wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", "SendDataTime");
|
||||
_convert_batch_timer = ADD_TIMER(_profile, "ConvertBatchTime");
|
||||
_validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
|
||||
_open_timer = ADD_TIMER(_profile, "OpenTime");
|
||||
_close_timer = ADD_TIMER(_profile, "CloseWaitTime");
|
||||
_non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime");
|
||||
_serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");
|
||||
_non_blocking_send_work_timer = ADD_CHILD_TIMER(_profile, "NonBlockingSendWorkTime", "NonBlockingSendTime");
|
||||
_serialize_batch_timer = ADD_CHILD_TIMER(_profile, "SerializeBatchTime", "NonBlockingSendWorkTime");
|
||||
_total_add_batch_exec_timer = ADD_TIMER(_profile, "TotalAddBatchExecTime");
|
||||
_max_add_batch_exec_timer = ADD_TIMER(_profile, "MaxAddBatchExecTime");
|
||||
_add_batch_number = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT);
|
||||
_num_node_channels = ADD_COUNTER(_profile, "NumberNodeChannels", TUnit::UNIT);
|
||||
_load_mem_limit = state->get_load_mem_limit();
|
||||
|
||||
// open all channels
|
||||
@ -697,18 +703,23 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
|
||||
// BE id -> add_batch method counter
|
||||
std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
|
||||
int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0, queue_push_lock_ns = 0,
|
||||
actual_consume_ns = 0;
|
||||
actual_consume_ns = 0, total_add_batch_exec_time_ns = 0,
|
||||
max_add_batch_exec_time_ns = 0,
|
||||
total_add_batch_num = 0, num_node_channels = 0;
|
||||
{
|
||||
SCOPED_TIMER(_close_timer);
|
||||
for (auto index_channel : _channels) {
|
||||
index_channel->for_each_node_channel([](NodeChannel* ch) { ch->mark_close(); });
|
||||
num_node_channels += index_channel->num_node_channels();
|
||||
}
|
||||
|
||||
for (auto index_channel : _channels) {
|
||||
int64_t add_batch_exec_time = 0;
|
||||
index_channel->for_each_node_channel([&status, &state, &node_add_batch_counter_map,
|
||||
&serialize_batch_ns, &mem_exceeded_block_ns,
|
||||
&queue_push_lock_ns,
|
||||
&actual_consume_ns](NodeChannel* ch) {
|
||||
&queue_push_lock_ns, &actual_consume_ns,
|
||||
&total_add_batch_exec_time_ns, &add_batch_exec_time,
|
||||
&total_add_batch_num](NodeChannel* ch) {
|
||||
auto s = ch->close_wait(state);
|
||||
if (!s.ok()) {
|
||||
// 'status' will store the last non-ok status of all channels
|
||||
@ -719,8 +730,13 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
|
||||
}
|
||||
ch->time_report(&node_add_batch_counter_map, &serialize_batch_ns,
|
||||
&mem_exceeded_block_ns, &queue_push_lock_ns,
|
||||
&actual_consume_ns);
|
||||
&actual_consume_ns, &total_add_batch_exec_time_ns,
|
||||
&add_batch_exec_time, &total_add_batch_num);
|
||||
});
|
||||
|
||||
if (add_batch_exec_time > max_add_batch_exec_time_ns) {
|
||||
max_add_batch_exec_time_ns = add_batch_exec_time;
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO need to be improved
|
||||
@ -732,9 +748,15 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
|
||||
COUNTER_SET(_output_rows_counter, _number_output_rows);
|
||||
COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
|
||||
COUNTER_SET(_send_data_timer, _send_data_ns);
|
||||
COUNTER_SET(_wait_mem_limit_timer, mem_exceeded_block_ns);
|
||||
COUNTER_SET(_convert_batch_timer, _convert_batch_ns);
|
||||
COUNTER_SET(_validate_data_timer, _validate_data_ns);
|
||||
COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
|
||||
COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
|
||||
COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
|
||||
COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
|
||||
COUNTER_SET(_add_batch_number, total_add_batch_num);
|
||||
COUNTER_SET(_num_node_channels, num_node_channels);
|
||||
// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
|
||||
int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +
|
||||
state->num_rows_load_unselected();
|
||||
@ -744,11 +766,10 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
|
||||
// print log of add batch time of all node, for tracing load performance easily
|
||||
std::stringstream ss;
|
||||
ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
|
||||
<< ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock time(ms)/num: ";
|
||||
<< ", txn_id=" << _txn_id << ", node add batch time(ms)/num: ";
|
||||
for (auto const& pair : node_add_batch_counter_map) {
|
||||
ss << "{" << pair.first << ":(" << (pair.second.add_batch_execution_time_us / 1000)
|
||||
<< ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) << ")("
|
||||
<< pair.second.add_batch_num << ")} ";
|
||||
<< ")(" << pair.second.add_batch_num << ")} ";
|
||||
}
|
||||
LOG(INFO) << ss.str();
|
||||
} else {
|
||||
|
||||
@ -80,7 +80,8 @@ struct AddBatchCounter {
|
||||
template <typename T>
|
||||
class ReusableClosure : public google::protobuf::Closure {
|
||||
public:
|
||||
ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
|
||||
ReusableClosure() : cid(INVALID_BTHREAD_ID) {
|
||||
}
|
||||
~ReusableClosure() {
|
||||
// shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
|
||||
join();
|
||||
@ -173,12 +174,17 @@ public:
|
||||
|
||||
void time_report(std::unordered_map<int64_t, AddBatchCounter>* add_batch_counter_map,
|
||||
int64_t* serialize_batch_ns, int64_t* mem_exceeded_block_ns,
|
||||
int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) {
|
||||
int64_t* queue_push_lock_ns, int64_t* actual_consume_ns,
|
||||
int64_t* total_add_batch_exec_time_ns, int64_t* add_batch_exec_time_ns,
|
||||
int64_t* total_add_batch_num) {
|
||||
(*add_batch_counter_map)[_node_id] += _add_batch_counter;
|
||||
*serialize_batch_ns += _serialize_batch_ns;
|
||||
*mem_exceeded_block_ns += _mem_exceeded_block_ns;
|
||||
*queue_push_lock_ns += _queue_push_lock_ns;
|
||||
*actual_consume_ns += _actual_consume_ns;
|
||||
*add_batch_exec_time_ns = (_add_batch_counter.add_batch_execution_time_us * 1000);
|
||||
*total_add_batch_exec_time_ns += *add_batch_exec_time_ns;
|
||||
*total_add_batch_num += _add_batch_counter.add_batch_num;
|
||||
}
|
||||
|
||||
int64_t node_id() const { return _node_id; }
|
||||
@ -237,10 +243,10 @@ private:
|
||||
std::vector<TTabletCommitInfo> _tablet_commit_infos;
|
||||
|
||||
AddBatchCounter _add_batch_counter;
|
||||
std::atomic<int64_t> _serialize_batch_ns;
|
||||
std::atomic<int64_t> _mem_exceeded_block_ns;
|
||||
std::atomic<int64_t> _queue_push_lock_ns;
|
||||
std::atomic<int64_t> _actual_consume_ns;
|
||||
std::atomic<int64_t> _serialize_batch_ns{0};
|
||||
std::atomic<int64_t> _mem_exceeded_block_ns{0};
|
||||
std::atomic<int64_t> _queue_push_lock_ns{0};
|
||||
std::atomic<int64_t> _actual_consume_ns{0};
|
||||
};
|
||||
|
||||
class IndexChannel {
|
||||
@ -262,6 +268,8 @@ public:
|
||||
void mark_as_failed(const NodeChannel* ch) { _failed_channels.insert(ch->node_id()); }
|
||||
bool has_intolerable_failure();
|
||||
|
||||
size_t num_node_channels() const { return _node_channels.size(); }
|
||||
|
||||
private:
|
||||
OlapTableSink* _parent;
|
||||
int64_t _index_id;
|
||||
@ -382,12 +390,18 @@ private:
|
||||
RuntimeProfile::Counter* _output_rows_counter = nullptr;
|
||||
RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
|
||||
RuntimeProfile::Counter* _send_data_timer = nullptr;
|
||||
RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
|
||||
RuntimeProfile::Counter* _convert_batch_timer = nullptr;
|
||||
RuntimeProfile::Counter* _validate_data_timer = nullptr;
|
||||
RuntimeProfile::Counter* _open_timer = nullptr;
|
||||
RuntimeProfile::Counter* _close_timer = nullptr;
|
||||
RuntimeProfile::Counter* _non_blocking_send_timer = nullptr;
|
||||
RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr;
|
||||
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
|
||||
RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr;
|
||||
RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr;
|
||||
RuntimeProfile::Counter* _add_batch_number = nullptr;
|
||||
RuntimeProfile::Counter* _num_node_channels = nullptr;
|
||||
|
||||
// load mem limit is for remote load channel
|
||||
int64_t _load_mem_limit = -1;
|
||||
|
||||
@ -176,10 +176,17 @@ OLAPStatus DeltaWriter::init() {
|
||||
}
|
||||
|
||||
OLAPStatus DeltaWriter::write(Tuple* tuple) {
|
||||
if (!_is_init) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
if (!_is_init && !_is_cancelled) {
|
||||
RETURN_NOT_OK(init());
|
||||
}
|
||||
|
||||
if (_is_cancelled) {
|
||||
// The writer may be cancelled at any time by other thread.
|
||||
// just return ERROR if writer is cancelled.
|
||||
return OLAP_ERR_ALREADY_CANCELLED;
|
||||
}
|
||||
|
||||
_mem_table->insert(tuple);
|
||||
|
||||
// if memtable is full, push it to the flush executor,
|
||||
@ -196,7 +203,20 @@ OLAPStatus DeltaWriter::_flush_memtable_async() {
|
||||
return _flush_token->submit(_mem_table);
|
||||
}
|
||||
|
||||
OLAPStatus DeltaWriter::flush_memtable_and_wait() {
|
||||
OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
if (!_is_init) {
|
||||
// This writer is not initialized before flushing. Do nothing
|
||||
// But we return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED,
|
||||
// Because this method maybe called when trying to reduce mem consumption,
|
||||
// and at that time, the writer may not be initialized yet and that is a normal case.
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
if (_is_cancelled) {
|
||||
return OLAP_ERR_ALREADY_CANCELLED;
|
||||
}
|
||||
|
||||
if (mem_consumption() == _mem_table->memory_usage()) {
|
||||
// equal means there is no memtable in flush queue, just flush this memtable
|
||||
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
|
||||
@ -208,7 +228,24 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
|
||||
DCHECK(mem_consumption() > _mem_table->memory_usage());
|
||||
// this means there should be at least one memtable in flush queue.
|
||||
}
|
||||
// wait all memtables in flush queue to be flushed.
|
||||
|
||||
if (need_wait) {
|
||||
// wait all memtables in flush queue to be flushed.
|
||||
RETURN_NOT_OK(_flush_token->wait());
|
||||
}
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus DeltaWriter::wait_flush() {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
if (!_is_init) {
|
||||
// return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED for same reason
|
||||
// as described in flush_memtable_and_wait()
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
if (_is_cancelled) {
|
||||
return OLAP_ERR_ALREADY_CANCELLED;
|
||||
}
|
||||
RETURN_NOT_OK(_flush_token->wait());
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
@ -220,7 +257,8 @@ void DeltaWriter::_reset_mem_table() {
|
||||
}
|
||||
|
||||
OLAPStatus DeltaWriter::close() {
|
||||
if (!_is_init) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
if (!_is_init && !_is_cancelled) {
|
||||
// if this delta writer is not initialized, but close() is called.
|
||||
// which means this tablet has no data loaded, but at least one tablet
|
||||
// in same partition has data loaded.
|
||||
@ -229,14 +267,24 @@ OLAPStatus DeltaWriter::close() {
|
||||
RETURN_NOT_OK(init());
|
||||
}
|
||||
|
||||
if (_is_cancelled) {
|
||||
return OLAP_ERR_ALREADY_CANCELLED;
|
||||
}
|
||||
|
||||
RETURN_NOT_OK(_flush_memtable_async());
|
||||
_mem_table.reset();
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
DCHECK(_is_init)
|
||||
<< "delta writer is supposed be to initialized before close_wait() being called";
|
||||
|
||||
if (_is_cancelled) {
|
||||
return OLAP_ERR_ALREADY_CANCELLED;
|
||||
}
|
||||
|
||||
// return error if previous flush failed
|
||||
RETURN_NOT_OK(_flush_token->wait());
|
||||
DCHECK_EQ(_mem_tracker->consumption(), 0);
|
||||
@ -295,7 +343,8 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
|
||||
}
|
||||
|
||||
OLAPStatus DeltaWriter::cancel() {
|
||||
if (!_is_init) {
|
||||
std::lock_guard<SpinLock> l(_lock);
|
||||
if (!_is_init || _is_cancelled) {
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
_mem_table.reset();
|
||||
@ -304,6 +353,7 @@ OLAPStatus DeltaWriter::cancel() {
|
||||
_flush_token->cancel();
|
||||
}
|
||||
DCHECK_EQ(_mem_tracker->consumption(), 0);
|
||||
_is_cancelled = true;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
#include "gen_cpp/internal_service.pb.h"
|
||||
#include "olap/rowset/rowset_writer.h"
|
||||
#include "olap/tablet.h"
|
||||
#include "util/spinlock.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -73,12 +74,17 @@ public:
|
||||
// submit current memtable to flush queue, and wait all memtables in flush queue
|
||||
// to be flushed.
|
||||
// This is currently for reducing mem consumption of this delta writer.
|
||||
OLAPStatus flush_memtable_and_wait();
|
||||
// If need_wait is true, it will wait for all memtable in flush queue to be flushed.
|
||||
// Otherwise, it will just put memtables to the flush queue and return.
|
||||
OLAPStatus flush_memtable_and_wait(bool need_wait);
|
||||
|
||||
int64_t partition_id() const;
|
||||
|
||||
int64_t mem_consumption() const;
|
||||
|
||||
// Wait all memtable in flush queue to be flushed
|
||||
OLAPStatus wait_flush();
|
||||
|
||||
private:
|
||||
DeltaWriter(WriteRequest* req, const std::shared_ptr<MemTracker>& parent,
|
||||
StorageEngine* storage_engine);
|
||||
@ -92,6 +98,7 @@ private:
|
||||
|
||||
private:
|
||||
bool _is_init = false;
|
||||
bool _is_cancelled = false;
|
||||
WriteRequest _req;
|
||||
TabletSharedPtr _tablet;
|
||||
RowsetSharedPtr _cur_rowset;
|
||||
@ -106,6 +113,8 @@ private:
|
||||
StorageEngine* _storage_engine;
|
||||
std::unique_ptr<FlushToken> _flush_token;
|
||||
std::shared_ptr<MemTracker> _mem_tracker;
|
||||
|
||||
SpinLock _lock;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -165,6 +165,8 @@ enum OLAPStatus {
|
||||
OLAP_ERR_TOO_MANY_TRANSACTIONS = -233,
|
||||
OLAP_ERR_INVALID_SNAPSHOT_VERSION = -234,
|
||||
OLAP_ERR_TOO_MANY_VERSION = -235,
|
||||
OLAP_ERR_NOT_INITIALIZED = -236,
|
||||
OLAP_ERR_ALREADY_CANCELLED = -237,
|
||||
|
||||
// CommandExecutor
|
||||
// [-300, -400)
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
|
||||
#include "util/doris_metrics.h"
|
||||
#include "util/spinlock.h"
|
||||
#include "util/stack_util.h"
|
||||
#include "util/uid_util.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -632,6 +632,7 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId tablet_id, SchemaHash schema
|
||||
bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path,
|
||||
TTabletId* tablet_id,
|
||||
TSchemaHash* schema_hash) {
|
||||
// the path like: /data/14/10080/964828783/
|
||||
static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)");
|
||||
// match tablet schema hash data path, for example, the path is /data/1/16791/29998
|
||||
// 1 is shard id , 16791 is tablet id, 29998 is schema hash
|
||||
@ -651,6 +652,7 @@ bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path,
|
||||
}
|
||||
|
||||
bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId* rowset_id) {
|
||||
// the path like: /data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat
|
||||
static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*");
|
||||
string id_str;
|
||||
bool ret = RE2::PartialMatch(path, re, &id_str);
|
||||
|
||||
@ -117,8 +117,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
|
||||
static void dummy_deleter(const CacheKey& key, void* value) {}
|
||||
|
||||
Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
|
||||
int64_t* wait_lock_time_ns) {
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
|
||||
UniqueId load_id(request.id());
|
||||
// 1. get load channel
|
||||
std::shared_ptr<LoadChannel> channel;
|
||||
|
||||
@ -51,8 +51,7 @@ public:
|
||||
Status open(const PTabletWriterOpenRequest& request);
|
||||
|
||||
Status add_batch(const PTabletWriterAddBatchRequest& request,
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
|
||||
int64_t* wait_lock_time_ns);
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec);
|
||||
|
||||
// cancel all tablet stream for 'load_id' load
|
||||
Status cancel(const PTabletWriterCancelRequest& request);
|
||||
|
||||
@ -77,23 +77,26 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) {
|
||||
|
||||
Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
|
||||
DCHECK(params.tablet_ids_size() == params.row_batch().num_rows());
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
if (_state != kOpened) {
|
||||
return _state == kFinished
|
||||
? _close_status
|
||||
: Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1",
|
||||
_key.to_string(), _state));
|
||||
}
|
||||
auto next_seq = _next_seqs[params.sender_id()];
|
||||
// check packet
|
||||
if (params.packet_seq() < next_seq) {
|
||||
LOG(INFO) << "packet has already recept before, expect_seq=" << next_seq
|
||||
<< ", recept_seq=" << params.packet_seq();
|
||||
return Status::OK();
|
||||
} else if (params.packet_seq() > next_seq) {
|
||||
LOG(WARNING) << "lost data packet, expect_seq=" << next_seq
|
||||
<< ", recept_seq=" << params.packet_seq();
|
||||
return Status::InternalError("lost data packet");
|
||||
int64_t cur_seq;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
if (_state != kOpened) {
|
||||
return _state == kFinished
|
||||
? _close_status
|
||||
: Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1",
|
||||
_key.to_string(), _state));
|
||||
}
|
||||
cur_seq = _next_seqs[params.sender_id()];
|
||||
// check packet
|
||||
if (params.packet_seq() < cur_seq) {
|
||||
LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
|
||||
<< ", recept_seq=" << params.packet_seq();
|
||||
return Status::OK();
|
||||
} else if (params.packet_seq() > cur_seq) {
|
||||
LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
|
||||
<< ", recept_seq=" << params.packet_seq();
|
||||
return Status::InternalError("lost data packet");
|
||||
}
|
||||
}
|
||||
|
||||
RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get());
|
||||
@ -115,7 +118,11 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
|
||||
return Status::InternalError(err_msg);
|
||||
}
|
||||
}
|
||||
_next_seqs[params.sender_id()]++;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
_next_seqs[params.sender_id()] = cur_seq + 1;
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -183,30 +190,21 @@ Status TabletsChannel::reduce_mem_usage() {
|
||||
// therefore it's possible for reduce_mem_usage() to be called right after close()
|
||||
return _close_status;
|
||||
}
|
||||
// find tablet writer with largest mem consumption
|
||||
int64_t max_consume = 0L;
|
||||
DeltaWriter* writer = nullptr;
|
||||
|
||||
// Flush all memtables
|
||||
for (auto& it : _tablet_writers) {
|
||||
if (it.second->mem_consumption() > max_consume) {
|
||||
max_consume = it.second->mem_consumption();
|
||||
writer = it.second;
|
||||
it.second->flush_memtable_and_wait(false);
|
||||
}
|
||||
|
||||
for (auto& it : _tablet_writers) {
|
||||
OLAPStatus st = it.second->wait_flush();
|
||||
if (st != OLAP_SUCCESS) {
|
||||
// flush failed, return error
|
||||
std::stringstream ss;
|
||||
ss << "failed to reduce mem consumption by flushing memtable. err: " << st;
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
}
|
||||
|
||||
if (writer == nullptr || max_consume == 0) {
|
||||
// barely not happend, just return OK
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
VLOG_NOTICE << "pick the delte writer to flush, with mem consumption: " << max_consume
|
||||
<< ", channel key: " << _key;
|
||||
OLAPStatus st = writer->flush_memtable_and_wait();
|
||||
if (st != OLAP_SUCCESS) {
|
||||
// flush failed, return error
|
||||
std::stringstream ss;
|
||||
ss << "failed to reduce mem consumption by flushing memtable. err: " << st;
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -97,11 +97,10 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
|
||||
_tablet_worker_pool.offer([request, response, done, this]() {
|
||||
brpc::ClosureGuard closure_guard(done);
|
||||
int64_t execution_time_ns = 0;
|
||||
int64_t wait_lock_time_ns = 0;
|
||||
{
|
||||
SCOPED_RAW_TIMER(&execution_time_ns);
|
||||
auto st = _exec_env->load_channel_mgr()->add_batch(
|
||||
*request, response->mutable_tablet_vec(), &wait_lock_time_ns);
|
||||
*request, response->mutable_tablet_vec());
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
|
||||
<< ", id=" << request->id() << ", index_id=" << request->index_id()
|
||||
@ -110,7 +109,6 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
|
||||
st.to_protobuf(response->mutable_status());
|
||||
}
|
||||
response->set_execution_time_us(execution_time_ns / 1000);
|
||||
response->set_wait_lock_time_us(wait_lock_time_ns / 1000);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -85,7 +85,11 @@ OLAPStatus DeltaWriter::cancel() {
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus DeltaWriter::flush_memtable_and_wait() {
|
||||
OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus DeltaWriter::wait_flush() {
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
@ -246,7 +250,7 @@ TEST_F(LoadChannelMgrTest, normal) {
|
||||
}
|
||||
row_batch.serialize(request.mutable_row_batch());
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
|
||||
auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
|
||||
auto st = mgr.add_batch(request, &tablet_vec);
|
||||
request.release_id();
|
||||
ASSERT_TRUE(st.ok());
|
||||
}
|
||||
@ -413,7 +417,7 @@ TEST_F(LoadChannelMgrTest, add_failed) {
|
||||
row_batch.serialize(request.mutable_row_batch());
|
||||
add_status = OLAP_ERR_TABLE_NOT_FOUND;
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
|
||||
auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
|
||||
auto st = mgr.add_batch(request, &tablet_vec);
|
||||
request.release_id();
|
||||
ASSERT_FALSE(st.ok());
|
||||
}
|
||||
@ -503,7 +507,7 @@ TEST_F(LoadChannelMgrTest, close_failed) {
|
||||
row_batch.serialize(request.mutable_row_batch());
|
||||
close_status = OLAP_ERR_TABLE_NOT_FOUND;
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
|
||||
auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
|
||||
auto st = mgr.add_batch(request, &tablet_vec);
|
||||
request.release_id();
|
||||
// even if delta close failed, the return status is still ok, but tablet_vec is empty
|
||||
ASSERT_TRUE(st.ok());
|
||||
@ -591,7 +595,7 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) {
|
||||
}
|
||||
row_batch.serialize(request.mutable_row_batch());
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
|
||||
auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
|
||||
auto st = mgr.add_batch(request, &tablet_vec);
|
||||
request.release_id();
|
||||
ASSERT_FALSE(st.ok());
|
||||
}
|
||||
@ -677,10 +681,10 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
|
||||
}
|
||||
row_batch.serialize(request.mutable_row_batch());
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec1;
|
||||
auto st = mgr.add_batch(request, &tablet_vec1, &wait_lock_time_ns);
|
||||
auto st = mgr.add_batch(request, &tablet_vec1);
|
||||
ASSERT_TRUE(st.ok());
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec2;
|
||||
st = mgr.add_batch(request, &tablet_vec2, &wait_lock_time_ns);
|
||||
st = mgr.add_batch(request, &tablet_vec2);
|
||||
request.release_id();
|
||||
ASSERT_TRUE(st.ok());
|
||||
}
|
||||
@ -693,7 +697,7 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
|
||||
request.set_eos(true);
|
||||
request.set_packet_seq(0);
|
||||
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
|
||||
auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
|
||||
auto st = mgr.add_batch(request, &tablet_vec);
|
||||
request.release_id();
|
||||
ASSERT_TRUE(st.ok());
|
||||
}
|
||||
|
||||
@ -1069,6 +1069,8 @@ public class Coordinator {
|
||||
List<List<TScanRangeParams>> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
|
||||
expectedInstanceNum);
|
||||
|
||||
LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size());
|
||||
|
||||
for (List<TScanRangeParams> scanRangeParams : perInstanceScanRanges) {
|
||||
FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);
|
||||
instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
|
||||
@ -1085,7 +1087,7 @@ public class Coordinator {
|
||||
throw new UserException("there is no scanNode Backend");
|
||||
}
|
||||
this.addressToBackendID.put(execHostport, backendIdRef.getRef());
|
||||
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport,
|
||||
FInstanceExecParam instanceParam = new FInstanceExecParam(null, execHostport,
|
||||
0, params);
|
||||
params.instanceExecParams.add(instanceParam);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user