Files
doris/be/test/agent/task_worker_pool_test.cpp
morningman cc74efb3c5 merge to ddb65b69f9c788e359e191889cb31f15279c41ec (#224)
1. Apache HDFS broker support HDFS HA and Hadoop kerberos authentication.
2. New Backup and Restore function. Use Fs Broker to backup your data to HDFS or restore them from HDFS.
3. Table-Level Privileges. Grant fine-grained privileges on table-level to specified user.
4. A lot of bugs fixed.
5. Performance improvement.
2018-08-24 17:12:26 +08:00

1645 lines
74 KiB
C++

// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <fstream>
#include <iostream>
#include <memory>
#include <sstream>
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "agent/file_downloader.h"
#include "agent/mock_file_downloader.h"
#include "agent/mock_pusher.h"
#include "agent/mock_utils.h"
#include "agent/mock_task_worker_pool.h"
#include "agent/task_worker_pool.h"
#include "agent/utils.h"
#include "olap/mock_command_executor.h"
#include "util/logging.h"
#include "runtime/exec_env.h"
using ::testing::_;
using ::testing::Return;
using ::testing::SetArgPointee;
using std::string;
using std::deque;
namespace palo {
MockFileDownloader::MockFileDownloader(const FileDownloaderParam& param):FileDownloader(param) {
}
MockPusher::MockPusher(const TPushReq& push_req) : Pusher(push_req) {
};
MockAgentServerClient::MockAgentServerClient(const TBackend backend)
: AgentServerClient(backend) {
}
MockMasterServerClient::MockMasterServerClient(
const TMasterInfo& master_info,
FrontendServiceClientCache* client_cache) : MasterServerClient(master_info, client_cache) {
}
TEST(TaskWorkerPoolTest, TestStart) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool_create_table(
TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
&env,
master_info);
task_worker_pool_create_table.start();
EXPECT_EQ(task_worker_pool_create_table._worker_count, config::create_table_worker_count);
TaskWorkerPool task_worker_pool_drop_table(
TaskWorkerPool::TaskWorkerType::DROP_TABLE,
&env,
master_info);
task_worker_pool_drop_table.start();
EXPECT_EQ(task_worker_pool_create_table._worker_count, config::drop_table_worker_count);
TaskWorkerPool task_worker_pool_push(
TaskWorkerPool::TaskWorkerType::PUSH,
&env,
master_info);
task_worker_pool_push.start();
EXPECT_EQ(task_worker_pool_push._worker_count, config::push_worker_count_normal_priority
+ config::push_worker_count_high_priority);
TaskWorkerPool task_worker_pool_alter_table(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
task_worker_pool_alter_table.start();
EXPECT_EQ(task_worker_pool_alter_table._worker_count, config::alter_table_worker_count);
TaskWorkerPool task_worker_pool_clone(
TaskWorkerPool::TaskWorkerType::CLONE,
&env,
master_info);
task_worker_pool_clone.start();
EXPECT_EQ(task_worker_pool_clone._worker_count, config::clone_worker_count);
TaskWorkerPool task_worker_pool_cancel_delete_data(
TaskWorkerPool::TaskWorkerType::CANCEL_DELETE_DATA,
&env,
master_info);
task_worker_pool_cancel_delete_data.start();
EXPECT_EQ(
task_worker_pool_cancel_delete_data._worker_count,
config::cancel_delete_data_worker_count);
TaskWorkerPool task_worker_pool_report_task(
TaskWorkerPool::TaskWorkerType::REPORT_TASK,
&env,
master_info);
task_worker_pool_report_task.start();
EXPECT_EQ(task_worker_pool_report_task._worker_count, REPORT_TASK_WORKER_COUNT);
TaskWorkerPool task_worker_pool_report_disk_state(
TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
&env,
master_info);
task_worker_pool_report_disk_state.start();
EXPECT_EQ(task_worker_pool_report_disk_state._worker_count, REPORT_DISK_STATE_WORKER_COUNT);
TaskWorkerPool task_worker_pool_report_olap_table(
TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
&env,
master_info);
task_worker_pool_report_olap_table.start();
EXPECT_EQ(task_worker_pool_report_olap_table._worker_count, REPORT_OLAP_TABLE_WORKER_COUNT);
TaskWorkerPool task_worker_pool_upload(
TaskWorkerPool::TaskWorkerType::UPLOAD,
&env,
master_info);
task_worker_pool_upload.start();
EXPECT_EQ(task_worker_pool_upload._worker_count, config::upload_worker_count);
TaskWorkerPool task_worker_pool_make_snapshot(
TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
&env,
master_info);
task_worker_pool_make_snapshot.start();
EXPECT_EQ(task_worker_pool_make_snapshot._worker_count, config::make_snapshot_worker_count);
TaskWorkerPool task_worker_pool_release_snapshot(
TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
&env,
master_info);
task_worker_pool_release_snapshot.start();
EXPECT_EQ(task_worker_pool_release_snapshot._worker_count,
config::release_snapshot_worker_count);
}
TEST(TaskWorkerPoolTest, TestSubmitTask) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
// Record signature success
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::ROLLUP;
agent_task_request.signature = 123456;
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._tasks.size());
// Record same signature
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._s_task_signatures[agent_task_request.task_type].clear();
}
TEST(TaskWorkerPoolTest, TestRecordTaskInfo) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
TTaskType::type task_type = TTaskType::ROLLUP;
// Record signature success
bool ret = task_worker_pool._record_task_info(task_type, 123456, "root");
EXPECT_TRUE(ret);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[task_type].size());
// Record same signature
ret = task_worker_pool._record_task_info(task_type, 123456, "root");
EXPECT_FALSE(ret);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[task_type].size());
// Record different signature
ret = task_worker_pool._record_task_info(task_type, 123457, "");
EXPECT_TRUE(ret);
EXPECT_EQ(2, task_worker_pool._s_task_signatures[task_type].size());
TMasterInfo master_info2;
TaskWorkerPool task_worker_pool2(
TaskWorkerPool::TaskWorkerType::PUSH,
&env,
master_info2);
TTaskType::type task_type2 = TTaskType::PUSH;
// Record push task info
ret = task_worker_pool._record_task_info(task_type2, 223456, "root");
EXPECT_TRUE(ret);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[task_type2].size());
EXPECT_EQ(1, task_worker_pool._s_total_task_user_count[task_type2]["root"]);
EXPECT_EQ(1, task_worker_pool._s_total_task_count[task_type2]);
// Record same signature push task
ret = task_worker_pool._record_task_info(task_type2, 223456, "user");
EXPECT_FALSE(ret);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[task_type2].size());
EXPECT_EQ(1, task_worker_pool._s_total_task_user_count[task_type2]["root"]);
EXPECT_EQ(1, task_worker_pool._s_total_task_count[task_type2]);
// Record diff signature same user
ret = task_worker_pool._record_task_info(task_type2, 223457, "root");
EXPECT_TRUE(ret);
EXPECT_EQ(2, task_worker_pool._s_task_signatures[task_type2].size());
EXPECT_EQ(2, task_worker_pool._s_total_task_user_count[task_type2]["root"]);
EXPECT_EQ(2, task_worker_pool._s_total_task_count[task_type2]);
// Record diff signature diff user
ret = task_worker_pool._record_task_info(task_type2, 223458, "user");
EXPECT_TRUE(ret);
EXPECT_EQ(3, task_worker_pool._s_task_signatures[task_type2].size());
EXPECT_EQ(1, task_worker_pool._s_total_task_user_count[task_type2]["user"]);
EXPECT_EQ(3, task_worker_pool._s_total_task_count[task_type2]);
}
TEST(TaskWorkerPoolTest, TestRemoveTaskInfo) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
TTaskType::type task_type = TTaskType::ROLLUP;
EXPECT_EQ(2, task_worker_pool._s_task_signatures[task_type].size());
task_worker_pool._remove_task_info(task_type, 123456, "root");
EXPECT_EQ(1, task_worker_pool._s_task_signatures[task_type].size());
task_worker_pool._remove_task_info(task_type, 123457, "root");
EXPECT_EQ(0, task_worker_pool._s_task_signatures[task_type].size());
TTaskType::type task_type_push = TTaskType::PUSH;
task_worker_pool._s_running_task_user_count[task_type_push]["root"] = 2;
task_worker_pool._s_running_task_user_count[task_type_push]["user"] = 1;
EXPECT_EQ(3, task_worker_pool._s_task_signatures[task_type_push].size());
EXPECT_EQ(2, task_worker_pool._s_total_task_user_count[task_type_push]["root"]);
EXPECT_EQ(1, task_worker_pool._s_total_task_user_count[task_type_push]["user"]);
EXPECT_EQ(3, task_worker_pool._s_total_task_count[task_type_push]);
task_worker_pool._remove_task_info(task_type_push, 223456, "root");
EXPECT_EQ(2, task_worker_pool._s_task_signatures[task_type_push].size());
EXPECT_EQ(1, task_worker_pool._s_total_task_user_count[task_type_push]["root"]);
EXPECT_EQ(2, task_worker_pool._s_total_task_count[task_type_push]);
EXPECT_EQ(1, task_worker_pool._s_running_task_user_count[task_type_push]["root"]);
task_worker_pool._remove_task_info(task_type_push, 223457, "root");
EXPECT_EQ(1, task_worker_pool._s_task_signatures[task_type_push].size());
EXPECT_EQ(0, task_worker_pool._s_total_task_user_count[task_type_push]["root"]);
EXPECT_EQ(1, task_worker_pool._s_total_task_count[task_type_push]);
EXPECT_EQ(0, task_worker_pool._s_running_task_user_count[task_type_push]["root"]);
task_worker_pool._remove_task_info(task_type_push, 223458, "user");
EXPECT_EQ(0, task_worker_pool._s_task_signatures[task_type_push].size());
EXPECT_EQ(0, task_worker_pool._s_total_task_user_count[task_type_push]["user"]);
EXPECT_EQ(0, task_worker_pool._s_total_task_count[task_type_push]);
EXPECT_EQ(0, task_worker_pool._s_running_task_user_count[task_type_push]["user"]);
}
TEST(TaskWorkerPoolTest, TestGetNextTask) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::PUSH,
&env,
master_info);
// Add 1 task
int32_t thread_count = 3;
deque<TAgentTaskRequest> tasks;
TAgentTaskRequest task1;
task1.resource_info.user = "root";
task1.__isset.resource_info = true;
task1.task_type = TTaskType::PUSH;
tasks.push_back(task1);
task_worker_pool._s_total_task_user_count[TTaskType::PUSH]["root"] = 1;
task_worker_pool._s_total_task_count[TTaskType::PUSH] = 1;
task_worker_pool._s_running_task_user_count[TTaskType::PUSH]["root"] = 0;
uint32_t ret = task_worker_pool._get_next_task_index(thread_count, tasks, TPriority::NORMAL);
EXPECT_EQ(0, ret);
tasks.erase(tasks.begin() + 0);
// Add 3 task
TAgentTaskRequest task2;
task2.resource_info.user = "root";
task2.__isset.resource_info = true;
task2.task_type = TTaskType::PUSH;
tasks.push_back(task2);
TAgentTaskRequest task3;
task3.resource_info.user = "root";
task3.__isset.resource_info = true;
task3.task_type = TTaskType::PUSH;
tasks.push_back(task3);
TAgentTaskRequest task4;
task4.resource_info.user = "user1";
task4.__isset.resource_info = true;
task4.task_type = TTaskType::PUSH;
tasks.push_back(task4);
task_worker_pool._s_total_task_user_count[TTaskType::PUSH]["root"] = 3;
task_worker_pool._s_total_task_user_count[TTaskType::PUSH]["user1"] = 1;
task_worker_pool._s_total_task_count[TTaskType::PUSH] = 4;
task_worker_pool._s_running_task_user_count[TTaskType::PUSH]["root"] = 1;
ret = task_worker_pool._get_next_task_index(thread_count, tasks, TPriority::NORMAL);
EXPECT_EQ(0, ret);
tasks.erase(tasks.begin() + 0);
// Go on
task_worker_pool._s_running_task_user_count[TTaskType::PUSH]["root"] = 2;
ret = task_worker_pool._get_next_task_index(thread_count, tasks, TPriority::NORMAL);
EXPECT_EQ(1, ret);
tasks.erase(tasks.begin() + 1);
// Add 2 task, 1 root task finished
TAgentTaskRequest task5;
task5.resource_info.user = "user1";
task5.__isset.resource_info = true;
task5.task_type = TTaskType::PUSH;
tasks.push_back(task5);
TAgentTaskRequest task6;
task6.resource_info.user = "user2";
task6.__isset.resource_info = true;
task6.task_type = TTaskType::PUSH;
tasks.push_back(task6);
task_worker_pool._s_total_task_user_count[TTaskType::PUSH]["root"] = 2;
task_worker_pool._s_total_task_user_count[TTaskType::PUSH]["user1"] = 2;
task_worker_pool._s_total_task_user_count[TTaskType::PUSH]["user2"] = 1;
task_worker_pool._s_total_task_count[TTaskType::PUSH] = 5;
task_worker_pool._s_running_task_user_count[TTaskType::PUSH]["root"] = 1;
task_worker_pool._s_running_task_user_count[TTaskType::PUSH]["user1"] = 1;
ret = task_worker_pool._get_next_task_index(thread_count, tasks, TPriority::NORMAL);
EXPECT_EQ(2, ret);
tasks.erase(tasks.begin() + 2);
// User2 task finished, no one task was fit, choose first one
task_worker_pool._s_total_task_user_count[TTaskType::PUSH]["user2"] = 0;
task_worker_pool._s_total_task_count[TTaskType::PUSH] = 4;
ret = task_worker_pool._get_next_task_index(thread_count, tasks, TPriority::NORMAL);
EXPECT_EQ(0, ret);
}
TEST(TaskWorkerPoolTest, TestFinishTask) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// Finish task failed
TFinishTaskRequest finish_task_request;
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(TASK_FINISH_MAX_RETRY)
.WillRepeatedly(Return(PALO_ERROR));
task_worker_pool._finish_task(finish_task_request);
// Finish task success
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool._finish_task(finish_task_request);
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestCreateTable) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::CREATE;
agent_task_request.signature = 123456;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// Create table failed
EXPECT_CALL(mock_command_executor, create_table(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._create_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Create table success
EXPECT_CALL(mock_command_executor, create_table(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._create_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestDropTableTask) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::DROP;
agent_task_request.signature = 123456;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::DROP_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// Drop table failed
EXPECT_CALL(mock_command_executor, drop_table(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._drop_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Drop table success
EXPECT_CALL(mock_command_executor, drop_table(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._drop_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestSchemaChange) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::SCHEMA_CHANGE;
agent_task_request.signature = 123456;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// New tablet size ok, last schema change status is failed
// Delete failed alter table tablet file failed
TCreateTabletReq create_tablet_req1;
agent_task_request.alter_tablet_req.base_tablet_id = 12345;
agent_task_request.alter_tablet_req.base_schema_hash = 56789;
agent_task_request.alter_tablet_req.__set_new_tablet_req(create_tablet_req1);
EXPECT_CALL(mock_command_executor, show_alter_table_status(
agent_task_request.alter_tablet_req.base_tablet_id,
agent_task_request.alter_tablet_req.base_schema_hash))
.Times(1)
.WillOnce(Return(ALTER_TABLE_FAILED));
EXPECT_CALL(mock_command_executor, drop_table(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_command_executor, schema_change(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._alter_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// New tablet size ok, last schema change status is failed
// Delete failed alter table tablet file success
// Do schema change failed
EXPECT_CALL(mock_command_executor, show_alter_table_status(
agent_task_request.alter_tablet_req.base_tablet_id,
agent_task_request.alter_tablet_req.base_schema_hash))
.Times(1)
.WillOnce(Return(ALTER_TABLE_FAILED));
EXPECT_CALL(mock_command_executor, drop_table(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_command_executor, schema_change(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._alter_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// New tablet size ok, last schema change status is failed
// Delete failed alter table tablet file success
// Do schema change success, check status failed
EXPECT_CALL(mock_command_executor, show_alter_table_status(
agent_task_request.alter_tablet_req.base_tablet_id,
agent_task_request.alter_tablet_req.base_schema_hash))
.Times(1)
.WillOnce(Return(ALTER_TABLE_FAILED));
EXPECT_CALL(mock_command_executor, drop_table(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_command_executor, schema_change(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._alter_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// New tablet size ok, last schema change status is ok
// Do schema change success, check status running then success
EXPECT_CALL(mock_command_executor, show_alter_table_status(
agent_task_request.alter_tablet_req.base_tablet_id,
agent_task_request.alter_tablet_req.base_schema_hash))
.Times(1)
.WillOnce(Return(ALTER_TABLE_DONE));
EXPECT_CALL(mock_command_executor, drop_table(_))
.Times(0);
EXPECT_CALL(mock_command_executor, schema_change(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._alter_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestRollup) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::ROLLUP;
agent_task_request.signature = 123456;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// New tablet size ok, last rollup status is ok
// Do rollup success, check status running then success
TCreateTabletReq create_tablet_req1;
agent_task_request.alter_tablet_req.base_tablet_id = 12345;
agent_task_request.alter_tablet_req.base_schema_hash = 56789;
agent_task_request.alter_tablet_req.__set_new_tablet_req(create_tablet_req1);
EXPECT_CALL(mock_command_executor, show_alter_table_status(
agent_task_request.alter_tablet_req.base_tablet_id,
agent_task_request.alter_tablet_req.base_schema_hash))
.Times(1)
.WillOnce(Return(ALTER_TABLE_DONE));
EXPECT_CALL(mock_command_executor, drop_table(_))
.Times(0);
EXPECT_CALL(mock_command_executor, create_rollup_table(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._alter_table_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestPush) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::PUSH;
agent_task_request.signature = 123456;
agent_task_request.__set_priority(TPriority::HIGH);
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::PUSH,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
TPushReq push_req;
MockPusher mock_pusher(push_req);
Pusher* original_pusher = task_worker_pool._pusher;
task_worker_pool._pusher = &mock_pusher;
// Push type load, push init failed
agent_task_request.push_req.push_type = TPushType::LOAD;
EXPECT_CALL(mock_pusher, init())
.Times(1)
.WillOnce(Return(PALO_ERROR));
EXPECT_CALL(mock_pusher, process(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._push_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Push type load, push init success, push failed
EXPECT_CALL(mock_pusher, init())
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
EXPECT_CALL(mock_pusher, process(_))
.Times(PUSH_MAX_RETRY)
.WillRepeatedly(Return(PALO_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._push_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Push type load, push init success, push success
EXPECT_CALL(mock_pusher, init())
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
EXPECT_CALL(mock_pusher, process(_))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._push_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Push type delete, delete failed
agent_task_request.push_req.push_type = TPushType::DELETE;
EXPECT_CALL(mock_command_executor, delete_data(_, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._push_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Push type delete, delete success
EXPECT_CALL(mock_command_executor, delete_data(_, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._push_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
task_worker_pool._pusher = original_pusher;
}
TEST(TaskWorkerPoolTest, TestClone) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::CLONE;
agent_task_request.signature = 123456;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::CLONE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
TBackend backend;
MockAgentServerClient mock_agent_server_client(backend);
AgentServerClient* original_agent_server_client;
original_agent_server_client = task_worker_pool._agent_client;
task_worker_pool._agent_client = &mock_agent_server_client;
FileDownloader::FileDownloaderParam param;
MockFileDownloader mock_file_downloader(param);
FileDownloader* original_file_downloader_ptr;
original_file_downloader_ptr = task_worker_pool._file_downloader_ptr;
task_worker_pool._file_downloader_ptr = &mock_file_downloader;
MockAgentUtils mock_agent_utils;
AgentUtils* original_agent_utils;
original_agent_utils = task_worker_pool._agent_utils;
task_worker_pool._agent_utils = &mock_agent_utils;
// Tablet has exist, get tablet info failed
agent_task_request.clone_req.tablet_id = 123;
agent_task_request.clone_req.schema_hash = 456;
std::shared_ptr<OLAPTable> olap_table_ok(new OLAPTable(NULL));
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_ok));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(0);
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path failed, do not get tablet info
std::shared_ptr<OLAPTable> olap_table_null(NULL);
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path success, make snapshot failed
TCloneReq clone_req;
TBackend backend1;
TBackend backend2;
TBackend backend3;
clone_req.src_backends.push_back(backend1);
clone_req.src_backends.push_back(backend2);
clone_req.src_backends.push_back(backend3);
TAgentResult agent_result;
agent_result.status.status_code = TStatusCode::INTERNAL_ERROR;
TAgentResult agent_result2;
agent_result2.status.status_code = TStatusCode::OK;
agent_result2.__isset.snapshot_path = false;
agent_task_request.__set_clone_req(clone_req);
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
TSnapshotRequest snapshot_request;
snapshot_request.__set_tablet_id(agent_task_request.clone_req.tablet_id);
snapshot_request.__set_schema_hash(agent_task_request.clone_req.schema_hash);
EXPECT_CALL(mock_agent_server_client, make_snapshot(snapshot_request, _))
.Times(clone_req.src_backends.size())
.WillOnce(DoAll(SetArgPointee<1>(agent_result), Return(PALO_SUCCESS)))
.WillOnce(DoAll(SetArgPointee<1>(agent_result2), Return(PALO_SUCCESS)))
.WillOnce(DoAll(SetArgPointee<1>(agent_result), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_agent_server_client, release_snapshot(_, _))
.Times(0);
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path success, make snapshot success
// List remote dir failed
clone_req.tablet_id = 5;
clone_req.schema_hash = 6;
agent_task_request.__set_clone_req(clone_req);
agent_result2.__set_snapshot_path("path");
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(
DoAll(SetArgPointee<1>("./test_data"),
Return(OLAPStatus::OLAP_SUCCESS)));
snapshot_request.__set_tablet_id(agent_task_request.clone_req.tablet_id);
snapshot_request.__set_schema_hash(agent_task_request.clone_req.schema_hash);
EXPECT_CALL(mock_agent_server_client, make_snapshot(snapshot_request, _))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result2), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, list_file_dir(_))
.Times(clone_req.src_backends.size() * DOWNLOAD_FILE_MAX_RETRY)
.WillRepeatedly(Return(PALO_ERROR));
EXPECT_CALL(mock_agent_server_client, release_snapshot(_, _))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path success, make snapshot success
// List remote dir success, get remote file length failed
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(
DoAll(SetArgPointee<1>("./test_data"),
Return(OLAPStatus::OLAP_SUCCESS)));
snapshot_request.__set_tablet_id(agent_task_request.clone_req.tablet_id);
snapshot_request.__set_schema_hash(agent_task_request.clone_req.schema_hash);
EXPECT_CALL(mock_agent_server_client, make_snapshot(snapshot_request, _))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result2), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, list_file_dir(_))
.Times(clone_req.src_backends.size())
.WillRepeatedly(
DoAll(SetArgPointee<0>("1.hdr\n1.idx\n1.dat"), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, get_length(_))
.Times(clone_req.src_backends.size() * DOWNLOAD_FILE_MAX_RETRY)
.WillRepeatedly(Return(PALO_ERROR));
EXPECT_CALL(mock_agent_server_client, release_snapshot(_, _))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path success, make snapshot success
// List remote dir success, get remote file length success
// Download file failed
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(
DoAll(SetArgPointee<1>("./test_data"),
Return(OLAPStatus::OLAP_SUCCESS)));
snapshot_request.__set_tablet_id(agent_task_request.clone_req.tablet_id);
snapshot_request.__set_schema_hash(agent_task_request.clone_req.schema_hash);
EXPECT_CALL(mock_agent_server_client, make_snapshot(snapshot_request, _))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result2), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, list_file_dir(_))
.Times(clone_req.src_backends.size())
.WillRepeatedly(
DoAll(SetArgPointee<0>("1.hdr\n1.idx\n1.dat"), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, get_length(_))
.Times(clone_req.src_backends.size())
.WillRepeatedly(Return(PALO_SUCCESS));
EXPECT_CALL(mock_file_downloader, download_file())
.Times(clone_req.src_backends.size() * DOWNLOAD_FILE_MAX_RETRY)
.WillRepeatedly(Return(PALO_ERROR));
EXPECT_CALL(mock_agent_server_client, release_snapshot(_, _))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path success, make snapshot success
// List remote dir success, get remote file length success
// Download file success, but file size is wrong
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(
DoAll(SetArgPointee<1>("./test_data"),
Return(OLAPStatus::OLAP_SUCCESS)));
snapshot_request.__set_tablet_id(agent_task_request.clone_req.tablet_id);
snapshot_request.__set_schema_hash(agent_task_request.clone_req.schema_hash);
EXPECT_CALL(mock_agent_server_client, make_snapshot(snapshot_request, _))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result2), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, list_file_dir(_))
.Times(clone_req.src_backends.size())
.WillRepeatedly(
DoAll(SetArgPointee<0>("1.hdr\n1.idx\n1.dat"), Return(PALO_SUCCESS)));
uint64_t file_size = 5;
EXPECT_CALL(mock_file_downloader, get_length(_))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<0>(file_size), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, download_file())
.Times(clone_req.src_backends.size() * DOWNLOAD_FILE_MAX_RETRY)
.WillRepeatedly(Return(PALO_SUCCESS));
EXPECT_CALL(mock_agent_server_client, release_snapshot(_, _))
.Times(clone_req.src_backends.size())
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path success, make snapshot success
// List remote dir success, get remote file length success
// Download file success, load header failed
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(
DoAll(SetArgPointee<1>("./test_data"),
Return(OLAPStatus::OLAP_SUCCESS)));
snapshot_request.__set_tablet_id(agent_task_request.clone_req.tablet_id);
snapshot_request.__set_schema_hash(agent_task_request.clone_req.schema_hash);
EXPECT_CALL(mock_agent_server_client, make_snapshot(snapshot_request, _))
.Times(1)
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result2), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, list_file_dir(_))
.Times(1)
.WillRepeatedly(
DoAll(SetArgPointee<0>("1.hdr\n1.idx\n1.dat"), Return(PALO_SUCCESS)));
file_size = 4;
EXPECT_CALL(mock_file_downloader, get_length(_))
.Times(3)
.WillRepeatedly(DoAll(SetArgPointee<0>(file_size), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, download_file())
.Times(3)
.WillRepeatedly(Return(PALO_SUCCESS));
EXPECT_CALL(mock_command_executor, load_header(_, _, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_agent_server_client, release_snapshot(_, _))
.Times(1)
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(0);
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path success, make snapshot success
// List remote dir success, get remote file length success
// Download file success, load header success
// Release snapshot failed, get tablet info failed
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(
DoAll(SetArgPointee<1>("./test_data"),
Return(OLAPStatus::OLAP_SUCCESS)));
snapshot_request.__set_tablet_id(agent_task_request.clone_req.tablet_id);
snapshot_request.__set_schema_hash(agent_task_request.clone_req.schema_hash);
EXPECT_CALL(mock_agent_server_client, make_snapshot(snapshot_request, _))
.Times(1)
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result2), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, list_file_dir(_))
.Times(1)
.WillRepeatedly(
DoAll(SetArgPointee<0>("1.hdr\n1.idx\n1.dat"), Return(PALO_SUCCESS)));
file_size = 4;
EXPECT_CALL(mock_file_downloader, get_length(_))
.Times(3)
.WillRepeatedly(DoAll(SetArgPointee<0>(file_size), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, download_file())
.Times(3)
.WillRepeatedly(Return(PALO_SUCCESS));
EXPECT_CALL(mock_command_executor, load_header(_, _, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_agent_server_client, release_snapshot(_, _))
.Times(1)
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result), Return(PALO_ERROR)));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Tablet not exist, obtain root path success, make snapshot success
// List remote dir success, get remote file length success
// Download file success, load header success
// Release snapshot success, get tablet info success
EXPECT_CALL(mock_command_executor, get_table(
agent_task_request.clone_req.tablet_id,
agent_task_request.clone_req.schema_hash))
.Times(1)
.WillOnce(Return(olap_table_null));
EXPECT_CALL(mock_command_executor, obtain_shard_path(_, _))
.Times(1)
.WillOnce(
DoAll(SetArgPointee<1>("./test_data"),
Return(OLAPStatus::OLAP_SUCCESS)));
snapshot_request.__set_tablet_id(agent_task_request.clone_req.tablet_id);
snapshot_request.__set_schema_hash(agent_task_request.clone_req.schema_hash);
EXPECT_CALL(mock_agent_server_client, make_snapshot(snapshot_request, _))
.Times(1)
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result2), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, list_file_dir(_))
.Times(1)
.WillRepeatedly(
DoAll(SetArgPointee<0>("1.hdr\n1.idx\n1.dat"), Return(PALO_SUCCESS)));
file_size = 4;
EXPECT_CALL(mock_file_downloader, get_length(_))
.Times(3)
.WillRepeatedly(DoAll(SetArgPointee<0>(file_size), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_file_downloader, download_file())
.Times(3)
.WillRepeatedly(Return(PALO_SUCCESS));
EXPECT_CALL(mock_command_executor, load_header(_, _, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_agent_server_client, release_snapshot(_, _))
.Times(1)
.WillRepeatedly(DoAll(SetArgPointee<1>(agent_result), Return(PALO_SUCCESS)));
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._clone_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
task_worker_pool._agent_client = original_agent_server_client;
task_worker_pool._agent_utils = original_agent_utils;
task_worker_pool._file_downloader_ptr = original_file_downloader_ptr;
}
TEST(TaskWorkerPoolTest, TestCancelDeleteData) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::CANCEL_DELETE;
agent_task_request.signature = 123456;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::CANCEL_DELETE_DATA,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// Cancel delete failed
EXPECT_CALL(mock_command_executor, cancel_delete(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._cancel_delete_data_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// Cancel delete success
EXPECT_CALL(mock_command_executor, cancel_delete(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
task_worker_pool._cancel_delete_data_worker_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestReportTask) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::SCHEMA_CHANGE;
agent_task_request.signature = 123456;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// Report failed
EXPECT_CALL(mock_master_server_client, report(_, _))
.Times(1)
.WillOnce(Return(PALO_ERROR));
task_worker_pool._report_task_worker_thread_callback(&task_worker_pool);
// Report success
EXPECT_CALL(mock_master_server_client, report(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool._report_task_worker_thread_callback(&task_worker_pool);
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestReportDiskState) {
TMasterInfo master_info;
ExecEnv env;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::SCHEMA_CHANGE;
agent_task_request.signature = 123456;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// Get root path failed, report failed
#if 0
EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, report(_, _))
.Times(0);
task_worker_pool._report_disk_state_worker_thread_callback(&task_worker_pool);
#endif
// Get root path success, report failed
EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, report(_, _))
.Times(1)
.WillOnce(Return(PALO_ERROR));
task_worker_pool._report_disk_state_worker_thread_callback(&task_worker_pool);
// Get root path success, report success
EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, report(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool._report_disk_state_worker_thread_callback(&task_worker_pool);
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestReportOlapTable) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
// Get tablet info failed, report failed
EXPECT_CALL(mock_command_executor, report_all_tablets_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, report(_, _))
.Times(0);
task_worker_pool._report_olap_table_worker_thread_callback(&task_worker_pool);
// Get tablet info success, report failed
EXPECT_CALL(mock_command_executor, report_all_tablets_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, report(_, _))
.Times(1)
.WillOnce(Return(PALO_ERROR));
task_worker_pool._report_olap_table_worker_thread_callback(&task_worker_pool);
// Get tablet info success, report success
EXPECT_CALL(mock_command_executor, report_all_tablets_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, report(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool._report_olap_table_worker_thread_callback(&task_worker_pool);
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestMakeSnapshot) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::MAKE_SNAPSHOT,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::MAKE_SNAPSHOT;
agent_task_request.signature = 123456;
// make snapshot failed
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
EXPECT_CALL(mock_command_executor, make_snapshot(_, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool._make_snapshot_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// make snapshot success
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
EXPECT_CALL(mock_command_executor, make_snapshot(_, _))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool._make_snapshot_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestReleaseSnapshot) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::RELEASE_SNAPSHOT,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
FrontendServiceClientCache* client_cache = new FrontendServiceClientCache();
MockMasterServerClient mock_master_server_client(master_info, client_cache);
MasterServerClient* original_master_server_client;
original_master_server_client = task_worker_pool._master_client;
task_worker_pool._master_client = &mock_master_server_client;
TAgentTaskRequest agent_task_request;
agent_task_request.task_type = TTaskType::RELEASE_SNAPSHOT;
agent_task_request.signature = 123456;
// make snapshot failed
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
EXPECT_CALL(mock_command_executor, release_snapshot(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool._release_snapshot_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(0, task_worker_pool._tasks.size());
// make snapshot success
task_worker_pool.submit_task(agent_task_request);
EXPECT_EQ(1, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
EXPECT_EQ(1, task_worker_pool._tasks.size());
EXPECT_CALL(mock_command_executor, release_snapshot(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
EXPECT_CALL(mock_master_server_client, finish_task(_, _))
.Times(1)
.WillOnce(Return(PALO_SUCCESS));
task_worker_pool._release_snapshot_thread_callback(&task_worker_pool);
EXPECT_EQ(0, task_worker_pool._s_task_signatures[agent_task_request.task_type].size());
task_worker_pool._command_executor = original_command_executor;
task_worker_pool._master_client = original_master_server_client;
}
TEST(TaskWorkerPoolTest, TestShowAlterTableStatus) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
// Get tablet info failed
TTabletInfo tablet_info;
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
AgentStatus status = task_worker_pool._get_tablet_info(1, 2, 123456, &tablet_info);
EXPECT_EQ(PALO_ERROR, status);
// Get tablet info success
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
status = task_worker_pool._get_tablet_info(1, 2, 123456, &tablet_info);
EXPECT_EQ(PALO_SUCCESS, status);
task_worker_pool._command_executor = original_command_executor;
}
TEST(TaskWorkerPoolTest, TestDropTable) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
TTabletId tablet_id = 123;
TSchemaHash schema_hash = 456;
EXPECT_CALL(mock_command_executor, show_alter_table_status(tablet_id, schema_hash))
.Times(1)
.WillOnce(Return(ALTER_TABLE_RUNNING));
AlterTableStatus status = task_worker_pool._show_alter_table_status(tablet_id, schema_hash);
EXPECT_EQ(ALTER_TABLE_RUNNING, status);
task_worker_pool._command_executor = original_command_executor;
}
TEST(TaskWorkerPoolTest, TestGetTabletInfo) {
TMasterInfo master_info;
ExecEnv env;
TaskWorkerPool task_worker_pool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
&env,
master_info);
MockCommandExecutor mock_command_executor;
CommandExecutor* original_command_executor;
original_command_executor = task_worker_pool._command_executor;
task_worker_pool._command_executor = &mock_command_executor;
// Report tablet info failed
TTabletInfo tablet_info;
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
AgentStatus status = task_worker_pool._get_tablet_info(1, 2, 123456, &tablet_info);
EXPECT_EQ(PALO_ERROR, status);
// Report tablet info success
EXPECT_CALL(mock_command_executor, report_tablet_info(_))
.Times(1)
.WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
status = task_worker_pool._get_tablet_info(1, 2, 123456, &tablet_info);
EXPECT_EQ(PALO_SUCCESS, status);
task_worker_pool._command_executor = original_command_executor;
}
}
int main(int argc, char** argv) {
std::string conffile = std::string(getenv("PALO_HOME")) + "/conf/be.conf";
if (!palo::config::init(conffile.c_str(), false)) {
fprintf(stderr, "error read config file. \n");
return -1;
}
palo::init_glog("be-test");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}