Remove clear_alter_task (#2056)

Alter task has been refactored and clear_alter_task is not necessary.
This commit is contained in:
lichaoyong
2019-10-24 18:57:14 +08:00
committed by Mingyu Chen
parent 4848c94262
commit 0bcfddab92
11 changed files with 0 additions and 244 deletions

View File

@ -88,10 +88,6 @@ AgentServer::AgentServer(ExecEnv* exec_env,
TaskWorkerPool::TaskWorkerType::PUBLISH_VERSION,
_exec_env,
master_info);
_clear_alter_task_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::CLEAR_ALTER_TASK,
_exec_env,
master_info);
_clear_transaction_task_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::CLEAR_TRANSACTION_TASK,
exec_env,
@ -161,7 +157,6 @@ AgentServer::AgentServer(ExecEnv* exec_env,
_drop_tablet_workers->start();
_push_workers->start();
_publish_version_workers->start();
_clear_alter_task_workers->start();
_clear_transaction_task_workers->start();
_delete_workers->start();
_alter_tablet_workers->start();
@ -198,9 +193,6 @@ AgentServer::~AgentServer() {
if (_publish_version_workers != NULL) {
delete _publish_version_workers;
}
if (_clear_alter_task_workers != NULL) {
delete _clear_alter_task_workers;
}
if (_clear_transaction_task_workers != NULL) {
delete _clear_transaction_task_workers;
}
@ -313,13 +305,6 @@ void AgentServer::submit_tasks(
status_code = TStatusCode::ANALYSIS_ERROR;
}
break;
case TTaskType::CLEAR_ALTER_TASK:
if (task.__isset.clear_alter_task_req) {
_clear_alter_task_workers->submit_task(task);
} else {
status_code = TStatusCode::ANALYSIS_ERROR;
}
break;
case TTaskType::CLEAR_TRANSACTION_TASK:
if (task.__isset.clear_transaction_task_req) {
_clear_transaction_task_workers->submit_task(task);

View File

@ -97,7 +97,6 @@ private:
TaskWorkerPool* _drop_tablet_workers;
TaskWorkerPool* _push_workers;
TaskWorkerPool* _publish_version_workers;
TaskWorkerPool* _clear_alter_task_workers;
TaskWorkerPool* _clear_transaction_task_workers;
TaskWorkerPool* _delete_workers;
TaskWorkerPool* _alter_tablet_workers;

View File

@ -44,7 +44,6 @@
#include "olap/data_dir.h"
#include "olap/snapshot_manager.h"
#include "olap/task/engine_checksum_task.h"
#include "olap/task/engine_clear_alter_task.h"
#include "olap/task/engine_clone_task.h"
#include "olap/task/engine_alter_tablet_task.h"
#include "olap/task/engine_batch_load_task.h"
@ -135,10 +134,6 @@ void TaskWorkerPool::start() {
_worker_count = config::publish_version_worker_count;
_callback_function = _publish_version_worker_thread_callback;
break;
case TaskWorkerType::CLEAR_ALTER_TASK:
_worker_count = config::clear_alter_task_worker_count;
_callback_function = _clear_alter_task_worker_thread_callback;
break;
case TaskWorkerType::CLEAR_TRANSACTION_TASK:
_worker_count = config::clear_transaction_task_worker_count;
_callback_function = _clear_transaction_task_worker_thread_callback;
@ -869,57 +864,6 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) {
return (void*)0;
}
void* TaskWorkerPool::_clear_alter_task_worker_thread_callback(void* arg_this) {
TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this;
#ifndef BE_TEST
while (true) {
#endif
TAgentTaskRequest agent_task_req;
TClearAlterTaskRequest clear_alter_task_req;
{
lock_guard<Mutex> worker_thread_lock(worker_pool_this->_worker_thread_lock);
while (worker_pool_this->_tasks.empty()) {
worker_pool_this->_worker_thread_condition_lock.wait();
}
agent_task_req = worker_pool_this->_tasks.front();
clear_alter_task_req = agent_task_req.clear_alter_task_req;
worker_pool_this->_tasks.pop_front();
}
LOG(INFO) << "get clear alter task task, signature:" << agent_task_req.signature;
TStatusCode::type status_code = TStatusCode::OK;
vector<string> error_msgs;
TStatus task_status;
EngineClearAlterTask engine_task(clear_alter_task_req);
OLAPStatus clear_status = worker_pool_this->_env->storage_engine()->execute_task(&engine_task);
if (clear_status != OLAPStatus::OLAP_SUCCESS) {
OLAP_LOG_WARNING("clear alter task failed. [signature: %ld status=%d]",
agent_task_req.signature, clear_status);
error_msgs.push_back("clear alter task failed");
status_code = TStatusCode::RUNTIME_ERROR;
} else {
LOG(INFO) << "clear alter task success. signature:" << agent_task_req.signature;
}
task_status.__set_status_code(status_code);
task_status.__set_error_msgs(error_msgs);
TFinishTaskRequest finish_task_request;
finish_task_request.__set_task_status(task_status);
finish_task_request.__set_backend(worker_pool_this->_backend);
finish_task_request.__set_task_type(agent_task_req.task_type);
finish_task_request.__set_signature(agent_task_req.signature);
worker_pool_this->_finish_task(finish_task_request);
worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, "");
#ifndef BE_TEST
}
#endif
return (void*)0;
}
void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_this) {
TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this;

View File

@ -97,7 +97,6 @@ private:
static void* _drop_tablet_worker_thread_callback(void* arg_this);
static void* _push_worker_thread_callback(void* arg_this);
static void* _publish_version_worker_thread_callback(void* arg_this);
static void* _clear_alter_task_worker_thread_callback(void* arg_this);
static void* _clear_transaction_task_worker_thread_callback(void* arg_this);
static void* _alter_tablet_worker_thread_callback(void* arg_this);
static void* _clone_worker_thread_callback(void* arg_this);

View File

@ -64,8 +64,6 @@ namespace config {
CONF_Int32(push_worker_count_high_priority, "3");
// the count of thread to publish version
CONF_Int32(publish_version_worker_count, "2");
// the count of thread to clear alter task
CONF_Int32(clear_alter_task_worker_count, "1");
// the count of thread to clear transaction task
CONF_Int32(clear_transaction_task_worker_count, "1");
// the count of thread to delete

View File

@ -98,7 +98,6 @@ add_library(Olap STATIC
rowset/segment_v2/column_zone_map.cpp
task/engine_batch_load_task.cpp
task/engine_checksum_task.cpp
task/engine_clear_alter_task.cpp
task/engine_clone_task.cpp
task/engine_storage_migration_task.cpp
task/engine_publish_version_task.cpp

View File

@ -198,8 +198,6 @@ OLAPStatus StorageEngine::open() {
auto dirs = get_stores();
load_data_dirs(dirs);
// 取消未完成的SchemaChange任务
_tablet_manager->cancel_unfinished_schema_change();
_memtable_flush_executor = new MemTableFlushExecutor();
_memtable_flush_executor->init(dirs);

View File

@ -211,46 +211,6 @@ OLAPStatus TabletManager::_add_tablet_to_map(TTabletId tablet_id, SchemaHash sch
return res;
}
// this method is called when engine restarts so that not add any locks
void TabletManager::cancel_unfinished_schema_change() {
// Schema Change在引擎退出时schemachange信息还保存在在Header里,
// 引擎重启后,需清除schemachange信息,上层会重做
uint64_t canceled_num = 0;
LOG(INFO) << "begin to cancel unfinished schema change.";
for (const auto& tablet_instance : _tablet_map) {
for (TabletSharedPtr tablet : tablet_instance.second.table_arr) {
if (tablet == nullptr) {
LOG(WARNING) << "tablet does not exist. tablet_id=" << tablet_instance.first;
continue;
}
AlterTabletTaskSharedPtr alter_task = tablet->alter_task();
// if alter task's state == finished, could not do anything
if (alter_task == nullptr || alter_task->alter_state() == ALTER_FINISHED) {
continue;
}
OLAPStatus res = tablet->set_alter_state(ALTER_FAILED);
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to set alter state. res=" << res
<< ", base_tablet=" << tablet->full_name();
return;
}
res = tablet->save_meta();
if (res != OLAP_SUCCESS) {
LOG(FATAL) << "fail to save base tablet meta. res=" << res
<< ", base_tablet=" << tablet->full_name();
return;
}
LOG(INFO) << "cancel unfinished alter tablet task. base_tablet=" << tablet->full_name();
++canceled_num;
}
}
LOG(INFO) << "finish to cancel unfinished schema change! canceled_num=" << canceled_num;
}
bool TabletManager::check_tablet_id_exist(TTabletId tablet_id) {
ReadLock rlock(&_tablet_map_lock);
return _check_tablet_id_exist_unlock(tablet_id);

View File

@ -58,8 +58,6 @@ public:
_tablet_map.clear();
}
void cancel_unfinished_schema_change();
bool check_tablet_id_exist(TTabletId tablet_id);
void clear();

View File

@ -1,76 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/task/engine_clear_alter_task.h"
namespace doris {
EngineClearAlterTask::EngineClearAlterTask(const TClearAlterTaskRequest& request)
:_clear_alter_task_req(request) { }
OLAPStatus EngineClearAlterTask::execute() {
return _clear_alter_task(_clear_alter_task_req.tablet_id, _clear_alter_task_req.schema_hash);
}
OLAPStatus EngineClearAlterTask::_clear_alter_task(const TTabletId tablet_id,
const TSchemaHash schema_hash) {
LOG(INFO) << "begin to process clear alter task. tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash;
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
if (tablet == nullptr) {
LOG(WARNING) << "can't find tablet when process clear alter task."
<< " tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash;
return OLAP_SUCCESS;
}
// get schema change info
AlterTabletTaskSharedPtr alter_task = tablet->alter_task();
if (alter_task == nullptr) {
return OLAP_SUCCESS;
}
AlterTabletState alter_state = alter_task->alter_state();
TTabletId related_tablet_id = alter_task->related_tablet_id();
TSchemaHash related_schema_hash = alter_task->related_schema_hash();
if (alter_state == ALTER_PREPARED || alter_state == ALTER_RUNNING) {
LOG(WARNING) << "Alter task is not finished when processing clear alter task. "
<< "tablet=" << tablet->full_name();
return OLAP_ERR_PREVIOUS_SCHEMA_CHANGE_NOT_FINISHED;
}
// clear schema change info
OLAPStatus res = tablet->protected_delete_alter_task();
// clear related tablet's schema change info
TabletSharedPtr related_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(related_tablet_id, related_schema_hash);
if (related_tablet == nullptr) {
LOG(WARNING) << "related tablet not found when process clear alter task."
<< " tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
<< ", related_tablet_id=" << related_tablet_id
<< ", related_schema_hash=" << related_schema_hash;
} else {
res = related_tablet->protected_delete_alter_task();
}
LOG(INFO) << "finish to process clear alter task."
<< "tablet_id=" << related_tablet_id
<< ", schema_hash=" << related_schema_hash;
return res;
}
} // doris

View File

@ -1,48 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_CLEAR_ALTER_TASK_H
#define DORIS_BE_SRC_OLAP_TASK_ENGINE_CLEAR_ALTER_TASK_H
#include "gen_cpp/AgentService_types.h"
#include "olap/olap_define.h"
#include "olap/task/engine_task.h"
namespace doris {
// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
class EngineClearAlterTask : public EngineTask {
public:
virtual OLAPStatus execute();
public:
EngineClearAlterTask(const TClearAlterTaskRequest& request);
~EngineClearAlterTask() {}
private:
OLAPStatus _clear_alter_task(const TTabletId tablet_id,
const TSchemaHash schema_hash);
private:
const TClearAlterTaskRequest& _clear_alter_task_req;
}; // EngineTask
} // doris
#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_CLEAR_ALTER_TASK_H