[Feature](binlog) Add binlog gc && Auth master_token (#20854)
Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
This commit is contained in:
@ -213,6 +213,10 @@ void TaskWorkerPool::start() {
|
||||
_worker_count = 1;
|
||||
_cb = std::bind<void>(&TaskWorkerPool::_push_cooldown_conf_worker_thread_callback, this);
|
||||
break;
|
||||
case TaskWorkerType::GC_BINLOG:
|
||||
_worker_count = 1;
|
||||
_cb = std::bind<void>(&TaskWorkerPool::_gc_binlog_worker_thread_callback, this);
|
||||
break;
|
||||
default:
|
||||
// pass
|
||||
break;
|
||||
@ -1705,6 +1709,41 @@ void AlterTableTaskPool::_alter_tablet(const TAgentTaskRequest& agent_task_req,
|
||||
finish_task_request->__set_task_status(status.to_thrift());
|
||||
}
|
||||
|
||||
void TaskWorkerPool::_gc_binlog_worker_thread_callback() {
|
||||
while (_is_work) {
|
||||
TAgentTaskRequest agent_task_req;
|
||||
{
|
||||
std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
|
||||
_worker_thread_condition_variable.wait(
|
||||
worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
|
||||
if (!_is_work) {
|
||||
return;
|
||||
}
|
||||
|
||||
agent_task_req = _tasks.front();
|
||||
_tasks.pop_front();
|
||||
}
|
||||
|
||||
std::unordered_map<int64_t, int64_t> gc_tablet_infos;
|
||||
if (!agent_task_req.__isset.gc_binlog_req) {
|
||||
LOG(WARNING) << "gc binlog task is not valid";
|
||||
return;
|
||||
}
|
||||
if (!agent_task_req.gc_binlog_req.__isset.tablet_gc_binlog_infos) {
|
||||
LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid";
|
||||
return;
|
||||
}
|
||||
|
||||
auto& tablet_gc_binlog_infos = agent_task_req.gc_binlog_req.tablet_gc_binlog_infos;
|
||||
for (auto& tablet_info : tablet_gc_binlog_infos) {
|
||||
// gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash);
|
||||
gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version);
|
||||
}
|
||||
|
||||
StorageEngine::instance()->gc_binlogs(gc_tablet_infos);
|
||||
}
|
||||
}
|
||||
|
||||
CloneTaskPool::CloneTaskPool(ExecEnv* env, ThreadModel thread_model)
|
||||
: TaskWorkerPool(TaskWorkerType::CLONE, env, *env->master_info(), thread_model) {
|
||||
_worker_count = config::clone_worker_count;
|
||||
|
||||
Reference in New Issue
Block a user