[improve](move-memtable) limit task num in load stream flush token (#28748)
This commit is contained in:
@ -776,6 +776,8 @@ DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
|
||||
DEFINE_Int32(load_stream_messages_in_batch, "128");
|
||||
// brpc streaming StreamWait seconds on EAGAIN
|
||||
DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
|
||||
// max tasks per flush token in load stream
|
||||
DEFINE_Int32(load_stream_flush_token_max_tasks, "2");
|
||||
|
||||
// max send batch parallelism for OlapTableSink
|
||||
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
|
||||
|
||||
@ -837,6 +837,8 @@ DECLARE_Int64(load_stream_max_buf_size);
|
||||
DECLARE_Int32(load_stream_messages_in_batch);
|
||||
// brpc streaming StreamWait seconds on EAGAIN
|
||||
DECLARE_Int32(load_stream_eagain_wait_seconds);
|
||||
// max tasks per flush token in load stream
|
||||
DECLARE_Int32(load_stream_flush_token_max_tasks);
|
||||
|
||||
// max send batch parallelism for OlapTableSink
|
||||
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include "runtime/load_stream.h"
|
||||
|
||||
#include <brpc/stream.h>
|
||||
#include <bthread/bthread.h>
|
||||
#include <bthread/condition_variable.h>
|
||||
#include <bthread/mutex.h>
|
||||
#include <olap/rowset/rowset_factory.h>
|
||||
@ -136,7 +137,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
|
||||
LOG(INFO) << "write data failed " << *this;
|
||||
}
|
||||
};
|
||||
return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(flush_func);
|
||||
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
|
||||
while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) {
|
||||
bthread_usleep(10 * 1000); // 10ms
|
||||
}
|
||||
return flush_token->submit_func(flush_func);
|
||||
}
|
||||
|
||||
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
|
||||
@ -170,7 +175,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
|
||||
LOG(INFO) << "add segment failed " << *this;
|
||||
}
|
||||
};
|
||||
return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(add_segment_func);
|
||||
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
|
||||
while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) {
|
||||
bthread_usleep(10 * 1000); // 10ms
|
||||
}
|
||||
return flush_token->submit_func(add_segment_func);
|
||||
}
|
||||
|
||||
Status TabletStream::close() {
|
||||
|
||||
Reference in New Issue
Block a user