Remove OLAP_LOG_INFO log format. Use LOG(INFO) instead (#372)
This commit is contained in:
@ -54,10 +54,10 @@ void HeartbeatServer::heartbeat(
|
||||
|
||||
//print heartbeat in every minute
|
||||
LOG_EVERY_N(INFO, 12) << "get heartbeat from FE."
|
||||
<< "host:" << master_info.network_address.hostname << ", "
|
||||
<< "port:" << master_info.network_address.port << ", "
|
||||
<< "cluster id:" << master_info.cluster_id << ", "
|
||||
<< "counter:" << google::COUNTER;
|
||||
<< "host:" << master_info.network_address.hostname
|
||||
<< ", port:" << master_info.network_address.port
|
||||
<< ", cluster id:" << master_info.cluster_id
|
||||
<< ", counter:" << google::COUNTER;
|
||||
|
||||
// do heartbeat
|
||||
Status st = _heartbeat(master_info);
|
||||
@ -88,7 +88,7 @@ Status HeartbeatServer::_heartbeat(
|
||||
|
||||
// Check cluster id
|
||||
if (_master_info->cluster_id == -1) {
|
||||
OLAP_LOG_INFO("get first heartbeat. update cluster id");
|
||||
LOG(INFO) << "get first heartbeat. update cluster id";
|
||||
// write and update cluster id
|
||||
auto st = _olap_engine->set_cluster_id(master_info.cluster_id);
|
||||
if (!st.ok()) {
|
||||
|
||||
@ -130,7 +130,7 @@ AgentStatus Pusher::_get_tmp_file_dir(const string& root_path, string* download_
|
||||
}
|
||||
|
||||
AgentStatus Pusher::_download_file() {
|
||||
OLAP_LOG_INFO("begin download file. tablet=%d", _push_req.tablet_id);
|
||||
LOG(INFO) << "begin download file. tablet_id=" << _push_req.tablet_id;
|
||||
time_t start = time(NULL);
|
||||
AgentStatus status = DORIS_SUCCESS;
|
||||
|
||||
@ -146,15 +146,15 @@ AgentStatus Pusher::_download_file() {
|
||||
rate = (double) _push_req.http_file_size / cost / 1024;
|
||||
}
|
||||
if (status == DORIS_SUCCESS) {
|
||||
OLAP_LOG_INFO("down load file success. local_file=%s, remote_file=%s, "
|
||||
"tablet=%d, cost=%ld, file size: %ld B, download rate: %f KB/s",
|
||||
_downloader_param.local_file_path.c_str(),
|
||||
_downloader_param.remote_file_path.c_str(),
|
||||
_push_req.tablet_id, cost, _push_req.http_file_size, rate);
|
||||
LOG(INFO) << "down load file success. local_file=" << _downloader_param.local_file_path
|
||||
<< ", remote_file=" << _downloader_param.remote_file_path
|
||||
<< ", tablet_id" << _push_req.tablet_id
|
||||
<< ", cost=" << cost << ", file_size" << _push_req.http_file_size
|
||||
<< ", download rage:" << rate << "KB/s";
|
||||
} else {
|
||||
LOG(WARNING) << "down load file failed. remote_file=" << _downloader_param.remote_file_path
|
||||
<< " tablet=" << _push_req.tablet_id
|
||||
<< " cost=" << cost << " file size: " << _push_req.http_file_size << " B";
|
||||
<< ", tablet=" << _push_req.tablet_id
|
||||
<< ", cost=" << cost << " file size: " << _push_req.http_file_size << " B";
|
||||
}
|
||||
|
||||
// todo check data length and mv name tmp
|
||||
@ -260,7 +260,7 @@ AgentStatus Pusher::process(vector<TTabletInfo>* tablet_infos) {
|
||||
time_t push_begin = time(NULL);
|
||||
OLAPStatus push_status = _engine->push(_push_req, tablet_infos);
|
||||
time_t push_finish = time(NULL);
|
||||
OLAP_LOG_INFO("Push finish, cost time: %ld", push_finish - push_begin);
|
||||
LOG(INFO) << "Push finish, cost time: " << (push_finish - push_begin);
|
||||
if (push_status == OLAPStatus::OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) {
|
||||
status = DORIS_PUSH_HAD_LOADED;
|
||||
} else if (push_status != OLAPStatus::OLAP_SUCCESS) {
|
||||
|
||||
@ -240,15 +240,15 @@ bool TaskWorkerPool::_record_task_info(
|
||||
std::string task_name;
|
||||
EnumToString(TTaskType, task_type, task_name);
|
||||
if (signature_set.count(signature) > 0) {
|
||||
LOG(INFO) << "type: " << task_name << ", "
|
||||
<< "signature: " << signature << ", has been inserted."
|
||||
<< "queue size: " << signature_set.size();
|
||||
LOG(INFO) << "type: " << task_name
|
||||
<< ", signature: " << signature << ", has been inserted."
|
||||
<< ", queue size: " << signature_set.size();
|
||||
ret = false;
|
||||
} else {
|
||||
signature_set.insert(signature);
|
||||
LOG(INFO) << "type: " << task_name << ", "
|
||||
<< "signature: " << signature << ", has been inserted."
|
||||
<< "queue size: " << signature_set.size();
|
||||
LOG(INFO) << "type: " << task_name
|
||||
<< ", signature: " << signature << ", has been inserted."
|
||||
<< ", queue size: " << signature_set.size();
|
||||
if (task_type == TTaskType::PUSH) {
|
||||
_s_total_task_user_count[task_type][user] += 1;
|
||||
_s_total_task_count[task_type] += 1;
|
||||
@ -278,9 +278,9 @@ void TaskWorkerPool::_remove_task_info(
|
||||
|
||||
std::string task_name;
|
||||
EnumToString(TTaskType, task_type, task_name);
|
||||
LOG(INFO) << "type: " << task_name << ", "
|
||||
<< "signature: " << signature << ", has been erased."
|
||||
<< "queue size: " << signature_set.size();
|
||||
LOG(INFO) << "type: " << task_name
|
||||
<< ", signature: " << signature << ", has been erased."
|
||||
<< ", queue size: " << signature_set.size();
|
||||
}
|
||||
|
||||
void TaskWorkerPool::_spawn_callback_worker_thread(CALLBACK_FUNCTION callback_func) {
|
||||
@ -320,7 +320,7 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request)
|
||||
AgentStatus client_status = _master_client->finish_task(finish_task_request, &result);
|
||||
|
||||
if (client_status == DORIS_SUCCESS) {
|
||||
OLAP_LOG_INFO("finish task success.result: %d", result.status.status_code);
|
||||
LOG(INFO) << "finish task success. result:" << result.status.status_code;
|
||||
break;
|
||||
} else {
|
||||
DorisMetrics::finish_task_requests_failed.increment(1);
|
||||
@ -371,15 +371,13 @@ uint32_t TaskWorkerPool::_get_next_task_index(
|
||||
thread_count;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("get next task. signature: %ld, user: %s, "
|
||||
"total_task_user_count: %ud, total_task_count: %ud, "
|
||||
"running_task_user_count: %ud, thread_count: %d, "
|
||||
"user_total_rate: %f, user_running_rate: %f",
|
||||
task.signature, user.c_str(),
|
||||
_s_total_task_user_count[task.task_type][user],
|
||||
_s_total_task_count[task.task_type],
|
||||
_s_running_task_user_count[task.task_type][user] + 1,
|
||||
thread_count, user_total_rate, user_running_rate);
|
||||
LOG(INFO) << "get next task. signature:" << task.signature
|
||||
<< ", user:" << user
|
||||
<< ", total_task_user_count:" << _s_total_task_user_count[task.task_type][user]
|
||||
<< ", total_task_count:" << _s_total_task_count[task.task_type]
|
||||
<< ", running_task_user_count:" << _s_running_task_user_count[task.task_type][user] + 1
|
||||
<< ", thread_count:" << thread_count << ", user_total_rate" << user_total_rate
|
||||
<< ", user_running_rate:" << user_running_rate;
|
||||
if (_s_running_task_user_count[task.task_type][user] == 0
|
||||
|| user_running_rate <= user_total_rate) {
|
||||
index = i;
|
||||
@ -529,7 +527,7 @@ void* TaskWorkerPool::_alter_table_worker_thread_callback(void* arg_this) {
|
||||
// Try to register to cgroups_mgr
|
||||
CgroupsMgr::apply_system_cgroup();
|
||||
int64_t signatrue = agent_task_req.signature;
|
||||
OLAP_LOG_INFO("get alter table task, signature: %ld", agent_task_req.signature);
|
||||
LOG(INFO) << "get alter table task, signature: " << agent_task_req.signature;
|
||||
|
||||
TFinishTaskRequest finish_task_request;
|
||||
TTaskType::type task_type = agent_task_req.task_type;
|
||||
@ -574,8 +572,8 @@ void TaskWorkerPool::_alter_table(
|
||||
default:
|
||||
std::string task_name;
|
||||
EnumToString(TTaskType, task_type, task_name);
|
||||
LOG(WARNING) << "schema change type invalid. type: " << task_name << ", "
|
||||
<< "signature: " << signature;
|
||||
LOG(WARNING) << "schema change type invalid. type: " << task_name
|
||||
<< ", signature: " << signature;
|
||||
status = DORIS_TASK_REQUEST_ERROR;
|
||||
break;
|
||||
}
|
||||
@ -591,8 +589,8 @@ void TaskWorkerPool::_alter_table(
|
||||
AlterTableStatus alter_table_status = _show_alter_table_status(
|
||||
base_tablet_id,
|
||||
base_schema_hash);
|
||||
OLAP_LOG_INFO("get alter table status: %d first, signature: %ld",
|
||||
alter_table_status, signature);
|
||||
LOG(INFO) << "get alter table status:" << alter_table_status
|
||||
<< ", signature:" << signature;
|
||||
|
||||
// Delete failed alter table tablet file
|
||||
if (alter_table_status == ALTER_TABLE_FAILED) {
|
||||
@ -865,7 +863,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) {
|
||||
publish_version_req = agent_task_req.publish_version_req;
|
||||
worker_pool_this->_tasks.pop_front();
|
||||
}
|
||||
OLAP_LOG_INFO("get publish version task, signature: %ld", agent_task_req.signature);
|
||||
LOG(INFO)<< "get publish version task, signature:" << agent_task_req.signature;
|
||||
|
||||
TStatusCode::type status_code = TStatusCode::OK;
|
||||
vector<string> error_msgs;
|
||||
@ -896,7 +894,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) {
|
||||
error_msgs.push_back("publish version failed");
|
||||
finish_task_request.__set_error_tablet_ids(error_tablet_ids);
|
||||
} else {
|
||||
OLAP_LOG_INFO("publish_version success. signature: %ld", agent_task_req.signature);
|
||||
LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature;
|
||||
}
|
||||
|
||||
task_status.__set_status_code(status_code);
|
||||
@ -933,7 +931,7 @@ void* TaskWorkerPool::_clear_alter_task_worker_thread_callback(void* arg_this) {
|
||||
clear_alter_task_req = agent_task_req.clear_alter_task_req;
|
||||
worker_pool_this->_tasks.pop_front();
|
||||
}
|
||||
OLAP_LOG_INFO("get clear alter task task, signature: %ld", agent_task_req.signature);
|
||||
LOG(INFO) << "get clear alter task task, signature:" << agent_task_req.signature;
|
||||
|
||||
TStatusCode::type status_code = TStatusCode::OK;
|
||||
vector<string> error_msgs;
|
||||
@ -947,7 +945,7 @@ void* TaskWorkerPool::_clear_alter_task_worker_thread_callback(void* arg_this) {
|
||||
error_msgs.push_back("clear alter task failed");
|
||||
status_code = TStatusCode::RUNTIME_ERROR;
|
||||
} else {
|
||||
OLAP_LOG_INFO("clear alter task success. signature: %ld", agent_task_req.signature);
|
||||
LOG(INFO) << "clear alter task success. signature:" << agent_task_req.signature;
|
||||
}
|
||||
|
||||
task_status.__set_status_code(status_code);
|
||||
@ -985,8 +983,8 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t
|
||||
clear_transaction_task_req = agent_task_req.clear_transaction_task_req;
|
||||
worker_pool_this->_tasks.pop_front();
|
||||
}
|
||||
OLAP_LOG_INFO("get clear transaction task task, signature: %ld, transaction_id: %ld",
|
||||
agent_task_req.signature, clear_transaction_task_req.transaction_id);
|
||||
LOG(INFO) << "get clear transaction task task, signature:" << agent_task_req.signature
|
||||
<< ", transaction_id:" << clear_transaction_task_req.transaction_id;
|
||||
|
||||
TStatusCode::type status_code = TStatusCode::OK;
|
||||
vector<string> error_msgs;
|
||||
@ -994,8 +992,8 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t
|
||||
|
||||
worker_pool_this->_env->olap_engine()->clear_transaction_task(
|
||||
clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id);
|
||||
OLAP_LOG_INFO("finish to clear transaction task. signature: %ld, transaction_id: %ld",
|
||||
agent_task_req.signature, clear_transaction_task_req.transaction_id);
|
||||
LOG(INFO) << "finish to clear transaction task. signature:" << agent_task_req.signature
|
||||
<< ", transaction_id:" << clear_transaction_task_req.transaction_id;
|
||||
|
||||
task_status.__set_status_code(status_code);
|
||||
task_status.__set_error_msgs(error_msgs);
|
||||
@ -1038,7 +1036,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
|
||||
DorisMetrics::clone_requests_total.increment(1);
|
||||
// Try to register to cgroups_mgr
|
||||
CgroupsMgr::apply_system_cgroup();
|
||||
OLAP_LOG_INFO("get clone task. signature: %ld", agent_task_req.signature);
|
||||
LOG(INFO) << "get clone task. signature:" << agent_task_req.signature;
|
||||
|
||||
vector<string> error_msgs;
|
||||
string src_file_path;
|
||||
@ -1048,10 +1046,11 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
|
||||
worker_pool_this->_env->olap_engine()->get_table(
|
||||
clone_req.tablet_id, clone_req.schema_hash);
|
||||
if (tablet.get() != NULL) {
|
||||
OLAP_LOG_INFO("clone tablet exist yet, begin to incremental clone. "
|
||||
"signature: %ld, tablet_id: %ld, schema_hash: %ld, "
|
||||
"committed_version: %d", agent_task_req.signature,
|
||||
clone_req.tablet_id, clone_req.schema_hash, clone_req.committed_version);
|
||||
LOG(INFO) << "clone tablet exist yet, begin to incremental clone. "
|
||||
<< "signature:" << agent_task_req.signature
|
||||
<< ", tablet_id:" << clone_req.tablet_id
|
||||
<< ", schema_hash:" << clone_req.schema_hash
|
||||
<< ", committed_version:" << clone_req.committed_version;
|
||||
|
||||
// try to incremental clone
|
||||
vector<Version> missing_versions;
|
||||
@ -1200,15 +1199,13 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
|
||||
// we need to check if this cloned table's version is what we expect.
|
||||
// if not, maybe this is a stale remaining table which is waiting for drop.
|
||||
// we drop it.
|
||||
OLAP_LOG_INFO("begin to drop the stale table. "
|
||||
"tablet id: %ld, schema hash: %ld, signature: %ld "
|
||||
"version: %ld, version_hash %ld "
|
||||
"expected version: %ld, version_hash: %ld",
|
||||
clone_req.tablet_id, clone_req.schema_hash,
|
||||
agent_task_req.signature,
|
||||
tablet_info.version, tablet_info.version_hash,
|
||||
clone_req.committed_version, clone_req.committed_version_hash);
|
||||
|
||||
LOG(INFO) << "begin to drop the stale table. tablet_id:" << clone_req.tablet_id
|
||||
<< ", schema_hash:" << clone_req.schema_hash
|
||||
<< ", signature:" << agent_task_req.signature
|
||||
<< ", version:" << tablet_info.version
|
||||
<< ", version_hash:" << tablet_info.version_hash
|
||||
<< ", expected_version: " << clone_req.committed_version
|
||||
<< ", version_hash:" << clone_req.committed_version_hash;
|
||||
TDropTabletReq drop_req;
|
||||
drop_req.tablet_id = clone_req.tablet_id;
|
||||
drop_req.schema_hash = clone_req.schema_hash;
|
||||
@ -1221,12 +1218,11 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
|
||||
|
||||
status = DORIS_ERROR;
|
||||
} else {
|
||||
OLAP_LOG_INFO("clone get tablet info success. "
|
||||
"tablet id: %ld, schema hash: %ld, signature: %ld "
|
||||
"version: %ld, version_hash %ld",
|
||||
clone_req.tablet_id, clone_req.schema_hash,
|
||||
agent_task_req.signature,
|
||||
tablet_info.version, tablet_info.version_hash);
|
||||
LOG(INFO) << "clone get tablet info success. tablet_id:" << clone_req.tablet_id
|
||||
<< ", schema_hash:" << clone_req.schema_hash
|
||||
<< ", signature:" << agent_task_req.signature
|
||||
<< ", version:" << tablet_info.version
|
||||
<< ", version_hash:" << tablet_info.version_hash;
|
||||
tablet_infos.push_back(tablet_info);
|
||||
}
|
||||
}
|
||||
@ -1246,8 +1242,8 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
|
||||
agent_task_req.signature);
|
||||
error_msgs.push_back("clone failed.");
|
||||
} else {
|
||||
OLAP_LOG_INFO("clone success, set tablet infos. signature: %ld",
|
||||
agent_task_req.signature);
|
||||
LOG(INFO) << "clone success, set tablet infos."
|
||||
<< "signature:" << agent_task_req.signature;
|
||||
finish_task_request.__set_finish_tablet_infos(tablet_infos);
|
||||
}
|
||||
task_status.__set_status_code(status_code);
|
||||
@ -1634,8 +1630,8 @@ void* TaskWorkerPool::_storage_medium_migrate_worker_thread_callback(void* arg_t
|
||||
res, agent_task_req.signature);
|
||||
status_code = TStatusCode::RUNTIME_ERROR;
|
||||
} else {
|
||||
OLAP_LOG_INFO("storage media migrate success. status: %d, signature: %ld",
|
||||
res, agent_task_req.signature);
|
||||
LOG(INFO) << "storage media migrate success. status:" << res << ","
|
||||
<< ", signature:" << agent_task_req.signature;
|
||||
}
|
||||
|
||||
task_status.__set_status_code(status_code);
|
||||
@ -1674,8 +1670,7 @@ void* TaskWorkerPool::_cancel_delete_data_worker_thread_callback(void* arg_this)
|
||||
worker_pool_this->_tasks.pop_front();
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("get cancel delete data task. signature: %ld",
|
||||
agent_task_req.signature);
|
||||
LOG(INFO) << "get cancel delete data task. signature:" << agent_task_req.signature;
|
||||
TStatusCode::type status_code = TStatusCode::OK;
|
||||
vector<string> error_msgs;
|
||||
TStatus task_status;
|
||||
@ -1688,8 +1683,8 @@ void* TaskWorkerPool::_cancel_delete_data_worker_thread_callback(void* arg_this)
|
||||
cancel_delete_data_status, agent_task_req.signature);
|
||||
status_code = TStatusCode::RUNTIME_ERROR;
|
||||
} else {
|
||||
OLAP_LOG_INFO("cancel delete data success. statusta: %d, signature: %ld",
|
||||
cancel_delete_data_status, agent_task_req.signature);
|
||||
LOG(INFO) << "cancel delete data success. status:" << cancel_delete_data_status
|
||||
<< ", signature:" << agent_task_req.signature;
|
||||
}
|
||||
|
||||
task_status.__set_status_code(status_code);
|
||||
@ -1748,8 +1743,9 @@ void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this)
|
||||
res, agent_task_req.signature);
|
||||
status_code = TStatusCode::RUNTIME_ERROR;
|
||||
} else {
|
||||
OLAP_LOG_INFO("check consistency success. status: %d, signature: %ld. checksum: %ud",
|
||||
res, agent_task_req.signature, checksum);
|
||||
LOG(INFO) << "check consistency success. status:" << res
|
||||
<< ", signature:" << agent_task_req.signature
|
||||
<< ", checksum:" << checksum;
|
||||
}
|
||||
|
||||
task_status.__set_status_code(status_code);
|
||||
@ -1793,8 +1789,8 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) {
|
||||
|
||||
if (status != DORIS_SUCCESS) {
|
||||
DorisMetrics::report_task_requests_failed.increment(1);
|
||||
LOG(WARNING) << "finish report task failed. status:" << status << ", "
|
||||
<< "master host:" << worker_pool_this->_master_info.network_address.hostname << ", "
|
||||
LOG(WARNING) << "finish report task failed. status:" << status
|
||||
<< ", master host:" << worker_pool_this->_master_info.network_address.hostname
|
||||
<< "port:" << worker_pool_this->_master_info.network_address.port;
|
||||
}
|
||||
|
||||
@ -1818,7 +1814,7 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
|
||||
if (worker_pool_this->_master_info.network_address.port == 0) {
|
||||
// port == 0 means not received heartbeat yet
|
||||
// sleep a short time and try again
|
||||
OLAP_LOG_INFO("waiting to receive first heartbeat from frontend");
|
||||
LOG(INFO) << "waiting to receive first heartbeat from frontend";
|
||||
sleep(config::sleep_one_second);
|
||||
continue;
|
||||
}
|
||||
@ -1845,9 +1841,9 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
|
||||
|
||||
if (status != DORIS_SUCCESS) {
|
||||
DorisMetrics::report_disk_requests_failed.increment(1);
|
||||
LOG(WARNING) << "finish report disk state failed. status:" << status << ", "
|
||||
<< "master host:" << worker_pool_this->_master_info.network_address.hostname << ", "
|
||||
<< "port:" << worker_pool_this->_master_info.network_address.port;
|
||||
LOG(WARNING) << "finish report disk state failed. status:" << status
|
||||
<< ", master host:" << worker_pool_this->_master_info.network_address.hostname
|
||||
<< ", port:" << worker_pool_this->_master_info.network_address.port;
|
||||
}
|
||||
|
||||
#ifndef BE_TEST
|
||||
@ -1874,7 +1870,7 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
|
||||
if (worker_pool_this->_master_info.network_address.port == 0) {
|
||||
// port == 0 means not received heartbeat yet
|
||||
// sleep a short time and try again
|
||||
OLAP_LOG_INFO("waiting to receive first heartbeat from frontend");
|
||||
LOG(INFO) << "waiting to receive first heartbeat from frontend";
|
||||
sleep(config::sleep_one_second);
|
||||
continue;
|
||||
}
|
||||
@ -1902,9 +1898,9 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
|
||||
|
||||
if (status != DORIS_SUCCESS) {
|
||||
DorisMetrics::report_all_tablets_requests_failed.increment(1);
|
||||
LOG(WARNING) << "finish report olap table state failed. status:" << status << ", "
|
||||
<< "master host:" << worker_pool_this->_master_info.network_address.hostname << ", "
|
||||
<< "port:" << worker_pool_this->_master_info.network_address.port;
|
||||
LOG(WARNING) << "finish report olap table state failed. status:" << status
|
||||
<< ", master host:" << worker_pool_this->_master_info.network_address.hostname
|
||||
<< ", port:" << worker_pool_this->_master_info.network_address.port;
|
||||
}
|
||||
|
||||
#ifndef BE_TEST
|
||||
@ -1936,8 +1932,8 @@ void* TaskWorkerPool::_upload_worker_thread_callback(void* arg_this) {
|
||||
worker_pool_this->_tasks.pop_front();
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("get upload task, signature: %ld, job id: %d",
|
||||
agent_task_req.signature, upload_request.job_id);
|
||||
LOG(INFO) << "get upload task, signature:" << agent_task_req.signature
|
||||
<< ", job id:" << upload_request.job_id;
|
||||
|
||||
std::map<int64_t, std::vector<std::string>> tablet_files;
|
||||
SnapshotLoader* loader = worker_pool_this->_env->snapshot_loader();
|
||||
@ -1972,8 +1968,8 @@ void* TaskWorkerPool::_upload_worker_thread_callback(void* arg_this) {
|
||||
worker_pool_this->_finish_task(finish_task_request);
|
||||
worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, "");
|
||||
|
||||
OLAP_LOG_INFO("finished upload task, signature: %ld, job id: %ld",
|
||||
agent_task_req.signature, upload_request.job_id);
|
||||
LOG(INFO) << "finished upload task, signature: " << agent_task_req.signature
|
||||
<< ", job id:" << upload_request.job_id;
|
||||
#ifndef BE_TEST
|
||||
}
|
||||
#endif
|
||||
@ -2000,8 +1996,8 @@ void* TaskWorkerPool::_download_worker_thread_callback(void* arg_this) {
|
||||
}
|
||||
// Try to register to cgroups_mgr
|
||||
CgroupsMgr::apply_system_cgroup();
|
||||
OLAP_LOG_INFO("get download task, signature: %ld, job id: %ld",
|
||||
agent_task_req.signature, download_request.job_id);
|
||||
LOG(INFO) << "get download task, signature: " << agent_task_req.signature
|
||||
<< ", job id:" << download_request.job_id;
|
||||
|
||||
TStatusCode::type status_code = TStatusCode::OK;
|
||||
std::vector<string> error_msgs;
|
||||
@ -2038,8 +2034,8 @@ void* TaskWorkerPool::_download_worker_thread_callback(void* arg_this) {
|
||||
worker_pool_this->_finish_task(finish_task_request);
|
||||
worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, "");
|
||||
|
||||
OLAP_LOG_INFO("finished download task, signature: %ld, job id: %d",
|
||||
agent_task_req.signature, download_request.job_id);
|
||||
LOG(INFO) << "finished download task, signature: " << agent_task_req.signature
|
||||
<< ", job id:" << download_request.job_id;
|
||||
#ifndef BE_TEST
|
||||
}
|
||||
#endif
|
||||
@ -2066,7 +2062,7 @@ void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) {
|
||||
}
|
||||
// Try to register to cgroups_mgr
|
||||
CgroupsMgr::apply_system_cgroup();
|
||||
OLAP_LOG_INFO("get snapshot task, signature: %ld", agent_task_req.signature);
|
||||
LOG(INFO) << "get snapshot task, signature:" << agent_task_req.signature;
|
||||
|
||||
TStatusCode::type status_code = TStatusCode::OK;
|
||||
vector<string> error_msgs;
|
||||
@ -2086,12 +2082,11 @@ void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) {
|
||||
error_msgs.push_back("make_snapshot failed. status: " +
|
||||
boost::lexical_cast<string>(make_snapshot_status));
|
||||
} else {
|
||||
OLAP_LOG_INFO("make_snapshot success. tablet_id: %ld, schema_hash: %ld, version: %d,"
|
||||
"version_hash: %ld, snapshot_path: %s",
|
||||
snapshot_request.tablet_id, snapshot_request.schema_hash,
|
||||
snapshot_request.version, snapshot_request.version_hash,
|
||||
snapshot_path.c_str());
|
||||
|
||||
LOG(INFO) << "make_snapshot success. tablet_id:" << snapshot_request.tablet_id
|
||||
<< ", schema_hash:" << snapshot_request.schema_hash
|
||||
<< ", version:" << snapshot_request.version
|
||||
<< ", version_hash:" << snapshot_request.version_hash
|
||||
<< ", snapshot_path:" << snapshot_path;
|
||||
if (snapshot_request.__isset.list_files) {
|
||||
// list and save all snapshot files
|
||||
// snapshot_path like: data/snapshot/20180417205230.1
|
||||
@ -2152,7 +2147,7 @@ void* TaskWorkerPool::_release_snapshot_thread_callback(void* arg_this) {
|
||||
}
|
||||
// Try to register to cgroups_mgr
|
||||
CgroupsMgr::apply_system_cgroup();
|
||||
OLAP_LOG_INFO("get release snapshot task, signature: %ld", agent_task_req.signature);
|
||||
LOG(INFO) << "get release snapshot task, signature:" << agent_task_req.signature;
|
||||
|
||||
TStatusCode::type status_code = TStatusCode::OK;
|
||||
vector<string> error_msgs;
|
||||
@ -2244,8 +2239,8 @@ void* TaskWorkerPool::_move_dir_thread_callback(void* arg_this) {
|
||||
}
|
||||
// Try to register to cgroups_mgr
|
||||
CgroupsMgr::apply_system_cgroup();
|
||||
OLAP_LOG_INFO("get move dir task, signature: %ld, job id: %ld",
|
||||
agent_task_req.signature, move_dir_req.job_id);
|
||||
LOG(INFO) << "get move dir task, signature:" << agent_task_req.signature
|
||||
<< ", job id:" << move_dir_req.job_id;
|
||||
|
||||
TStatusCode::type status_code = TStatusCode::OK;
|
||||
vector<string> error_msgs;
|
||||
@ -2266,9 +2261,10 @@ void* TaskWorkerPool::_move_dir_thread_callback(void* arg_this) {
|
||||
move_dir_req.src.c_str(), move_dir_req.tablet_id, agent_task_req.signature,
|
||||
move_dir_req.job_id);
|
||||
} else {
|
||||
OLAP_LOG_INFO("finished to move dir: %s, tablet id: %ld, signature: %ld, job id: %ld",
|
||||
move_dir_req.src.c_str(), move_dir_req.tablet_id, agent_task_req.signature,
|
||||
move_dir_req.job_id);
|
||||
LOG(INFO) << "finished to move dir:" << move_dir_req.src
|
||||
<< ", tablet_id:" << move_dir_req.tablet_id
|
||||
<< ", signature:" << agent_task_req.signature
|
||||
<< ", job id:" << move_dir_req.job_id;
|
||||
}
|
||||
|
||||
task_status.__set_status_code(status_code);
|
||||
@ -2300,8 +2296,8 @@ AgentStatus TaskWorkerPool::_move_dir(
|
||||
OLAPTablePtr tablet = _env->olap_engine()->get_table(
|
||||
tablet_id, schema_hash);
|
||||
if (tablet.get() == NULL) {
|
||||
OLAP_LOG_INFO("failed to get tablet: %ld, schema hash: %d",
|
||||
tablet_id, schema_hash);
|
||||
LOG(INFO) << "failed to get tablet. tablet_id:" << tablet_id
|
||||
<< ", schema hash:" << schema_hash;
|
||||
error_msgs->push_back("failed to get tablet");
|
||||
return DORIS_TASK_REQUEST_ERROR;
|
||||
}
|
||||
@ -2346,20 +2342,20 @@ void* TaskWorkerPool::_recover_tablet_thread_callback(void* arg_this) {
|
||||
TStatus task_status;
|
||||
|
||||
LOG(INFO) << "begin to recover tablet."
|
||||
<< "table:" << recover_tablet_req.tablet_id << "." << recover_tablet_req.schema_hash << ", "
|
||||
<< "version:" << recover_tablet_req.version << "-" << recover_tablet_req.version_hash;
|
||||
<< ", tablet_id:" << recover_tablet_req.tablet_id << "." << recover_tablet_req.schema_hash
|
||||
<< ", version:" << recover_tablet_req.version << "-" << recover_tablet_req.version_hash;
|
||||
OLAPStatus status = worker_pool_this->_env->olap_engine()->recover_tablet_until_specfic_version(recover_tablet_req);
|
||||
if (status != OLAP_SUCCESS) {
|
||||
status_code = TStatusCode::RUNTIME_ERROR;
|
||||
LOG(WARNING) << "failed to recover tablet."
|
||||
<< "signature:" << agent_task_req.signature << ", "
|
||||
<< "table:" << recover_tablet_req.tablet_id << "." << recover_tablet_req.schema_hash << ", "
|
||||
<< "version:" << recover_tablet_req.version << "-" << recover_tablet_req.version_hash;
|
||||
<< "signature:" << agent_task_req.signature
|
||||
<< ", table:" << recover_tablet_req.tablet_id << "." << recover_tablet_req.schema_hash
|
||||
<< ", version:" << recover_tablet_req.version << "-" << recover_tablet_req.version_hash;
|
||||
} else {
|
||||
LOG(WARNING) << "succeed to recover tablet."
|
||||
<< "signature:" << agent_task_req.signature << ", "
|
||||
<< "table:" << recover_tablet_req.tablet_id << "." << recover_tablet_req.schema_hash << ", "
|
||||
<< "version:" << recover_tablet_req.version << "-" << recover_tablet_req.version_hash;
|
||||
<< "signature:" << agent_task_req.signature
|
||||
<< ", table:" << recover_tablet_req.tablet_id << "." << recover_tablet_req.schema_hash
|
||||
<< ", version:" << recover_tablet_req.version << "-" << recover_tablet_req.version_hash;
|
||||
}
|
||||
|
||||
task_status.__set_status_code(status_code);
|
||||
|
||||
@ -83,10 +83,9 @@ OLAPStatus BaseCompaction::init(OLAPTablePtr table, bool is_manual_trigger) {
|
||||
}
|
||||
|
||||
OLAPStatus BaseCompaction::run() {
|
||||
OLAP_LOG_INFO("start base compaction. [table=%s; old_base_version=%d; new_base_version=%d]",
|
||||
_table->full_name().c_str(),
|
||||
_old_base_version.second,
|
||||
_new_base_version.second);
|
||||
LOG(INFO) << "start base compaction. tablet=" << _table->full_name()
|
||||
<< ", old_base_version=" << _old_base_version.second
|
||||
<< ", new_base_version=" << _new_base_version.second;
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
OlapStopWatch stage_watch;
|
||||
@ -151,8 +150,8 @@ OLAPStatus BaseCompaction::run() {
|
||||
vector<SegmentGroup*> unused_olap_indices;
|
||||
res = _update_header(row_count, &unused_olap_indices);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to update header. table=" << _table->full_name() << ", "
|
||||
<< "version=" << _new_base_version.first << "-" << _new_base_version.second;
|
||||
LOG(WARNING) << "fail to update header. table=" << _table->full_name()
|
||||
<< ", version=" << _new_base_version.first << "-" << _new_base_version.second;
|
||||
_garbage_collection();
|
||||
return res;
|
||||
}
|
||||
@ -230,18 +229,18 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,
|
||||
// 只有1个base文件和1个delta文件
|
||||
if (base_compaction_layer_point == -1) {
|
||||
VLOG(3) << "can't do base compaction: no cumulative files."
|
||||
<< "table=" << _table->full_name() << ", "
|
||||
<< "base_version=0-" << _old_base_version.second << ", "
|
||||
<< "cumulative_layer_point=" << cumulative_layer_point + 1;
|
||||
<< "table=" << _table->full_name()
|
||||
<< ", base_version=0-" << _old_base_version.second
|
||||
<< ", cumulative_layer_point=" << cumulative_layer_point + 1;
|
||||
return false;
|
||||
}
|
||||
|
||||
// 只有1个cumulative文件
|
||||
if (base_compaction_layer_point == _old_base_version.second) {
|
||||
VLOG(3) << "can't do base compaction: only one cumulative file."
|
||||
<< "table=" << _table->full_name() << ", "
|
||||
<< "base_version=0-" << _old_base_version.second << ", "
|
||||
<< "cumulative_layer_point=" << cumulative_layer_point + 1;
|
||||
<< "table=" << _table->full_name()
|
||||
<< ", base_version=0-" << _old_base_version.second
|
||||
<< ", cumulative_layer_point=" << cumulative_layer_point + 1;
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -249,8 +248,8 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,
|
||||
if (OLAP_SUCCESS != _table->select_versions_to_span(_new_base_version,
|
||||
candidate_versions)) {
|
||||
LOG(WARNING) << "fail to select shortest version path."
|
||||
<< "start=" << _new_base_version.first << ", "
|
||||
<< "end=" << _new_base_version.second;
|
||||
<< "start=" << _new_base_version.first
|
||||
<< ", end=" << _new_base_version.second;
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -283,9 +282,9 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,
|
||||
= config::base_compaction_num_cumulative_deltas;
|
||||
// candidate_versions中包含base文件,所以这里减1
|
||||
if (candidate_versions->size() - 1 >= base_compaction_num_cumulative_deltas) {
|
||||
LOG(INFO) << "satisfy the base compaction policy. table="<< _table->full_name() << ", "
|
||||
<< "num_cumulative_deltas=" << candidate_versions->size() - 1 << ", "
|
||||
<< "base_compaction_num_cumulative_deltas=" << base_compaction_num_cumulative_deltas;
|
||||
LOG(INFO) << "satisfy the base compaction policy. table="<< _table->full_name()
|
||||
<< ", num_cumulative_deltas=" << candidate_versions->size() - 1
|
||||
<< ", base_compaction_num_cumulative_deltas=" << base_compaction_num_cumulative_deltas;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -293,10 +292,10 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,
|
||||
const double base_cumulative_delta_ratio = config::base_cumulative_delta_ratio;
|
||||
double cumulative_base_ratio = static_cast<double>(cumulative_total_size) / base_size;
|
||||
if (cumulative_base_ratio > base_cumulative_delta_ratio) {
|
||||
LOG(INFO) << "satisfy the base compaction policy. table=" << _table->full_name() << ", "
|
||||
<< "cumualtive_total_size=" << cumulative_total_size << ", "
|
||||
<< "base_size=" << base_size << ", "
|
||||
<< "cumulative_base_ratio=" << cumulative_base_ratio << ", "
|
||||
LOG(INFO) << "satisfy the base compaction policy. table=" << _table->full_name()
|
||||
<< "cumualtive_total_size=" << cumulative_total_size
|
||||
<< "base_size=" << base_size
|
||||
<< "cumulative_base_ratio=" << cumulative_base_ratio
|
||||
<< "policy_ratio=" << base_cumulative_delta_ratio;
|
||||
return true;
|
||||
}
|
||||
@ -305,16 +304,16 @@ bool BaseCompaction::_check_whether_satisfy_policy(bool is_manual_trigger,
|
||||
const uint32_t interval_since_last_operation = config::base_compaction_interval_seconds_since_last_operation;
|
||||
int64_t interval_since_last_be = time(NULL) - base_creation_time;
|
||||
if (interval_since_last_be > interval_since_last_operation) {
|
||||
LOG(INFO) << "satisfy the base compaction policy. table=" << _table->full_name() << ", "
|
||||
<< "interval_since_last_be=" << interval_since_last_be << ", "
|
||||
<< "policy_interval=" << interval_since_last_operation;
|
||||
LOG(INFO) << "satisfy the base compaction policy. table=" << _table->full_name()
|
||||
<< ", interval_since_last_be=" << interval_since_last_be
|
||||
<< ", policy_interval=" << interval_since_last_operation;
|
||||
return true;
|
||||
}
|
||||
|
||||
VLOG(3) << "don't satisfy the base compaction policy. table=" << _table->full_name() << ", "
|
||||
<< "cumulative_files_number=" << candidate_versions->size() - 1 << ", "
|
||||
<< "cumulative_base_ratio=" << cumulative_base_ratio << ", "
|
||||
<< "interval_since_last_be=" << interval_since_last_be;
|
||||
VLOG(3) << "don't satisfy the base compaction policy. table=" << _table->full_name()
|
||||
<< ", cumulative_files_number=" << candidate_versions->size() - 1
|
||||
<< ", cumulative_base_ratio=" << cumulative_base_ratio
|
||||
<< ", interval_since_last_be=" << interval_since_last_be;
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -332,10 +331,8 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
|
||||
return OLAP_ERR_MALLOC_ERROR;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("start merge new base. [table='%s' version=%d]",
|
||||
_table->full_name().c_str(),
|
||||
_new_base_version.second);
|
||||
|
||||
LOG(INFO) << "start merge new base. tablet=" << _table->full_name()
|
||||
<< ", version=" << _new_base_version.second;
|
||||
// 2. 执行base compaction的merge
|
||||
// 注意:无论是行列存,还是列存,在执行merge时都使用Merger类,不能使用MassiveMerger。
|
||||
// 原因:MassiveMerger中的base文件不是通过Reader读取的,所以会导致删除条件失效,
|
||||
@ -403,22 +400,22 @@ OLAPStatus BaseCompaction::_do_base_compaction(VersionHash new_base_version_hash
|
||||
if (row_nums_check) {
|
||||
if (source_rows != new_base->num_rows() + merged_rows + filted_rows) {
|
||||
LOG(WARNING) << "fail to check row num!"
|
||||
<< "source_rows=" << source_rows << ", "
|
||||
<< "merged_rows=" << merged_rows << ", "
|
||||
<< "filted_rows=" << filted_rows << ", "
|
||||
<< "new_index_rows=" << new_base->num_rows();
|
||||
<< "source_rows=" << source_rows
|
||||
<< ", merged_rows=" << merged_rows
|
||||
<< ", filted_rows=" << filted_rows
|
||||
<< ", new_index_rows=" << new_base->num_rows();
|
||||
return OLAP_ERR_CHECK_LINES_ERROR;
|
||||
}
|
||||
} else {
|
||||
LOG(INFO) << "all row nums."
|
||||
<< "source_rows=" << source_rows << ", "
|
||||
<< "merged_rows=" << merged_rows << ", "
|
||||
<< "filted_rows=" << filted_rows << ", "
|
||||
<< "new_index_rows=" << new_base->num_rows();
|
||||
<< "source_rows=" << source_rows
|
||||
<< ", merged_rows=" << merged_rows
|
||||
<< ", filted_rows=" << filted_rows
|
||||
<< ", new_index_rows=" << new_base->num_rows();
|
||||
}
|
||||
|
||||
LOG(INFO) << "succeed to do base compaction. table=" << _table->full_name() << ", "
|
||||
<< "base_version=" << _new_base_version.first << "-" << _new_base_version.second;
|
||||
LOG(INFO) << "succeed to do base compaction. table=" << _table->full_name()
|
||||
<< ", base_version=" << _new_base_version.first << "-" << _new_base_version.second;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
@ -441,7 +438,7 @@ OLAPStatus BaseCompaction::_update_header(uint64_t row_count, vector<SegmentGrou
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("BE remove delete conditions. [removed_version=%d]", _new_base_version.second);
|
||||
LOG(INFO) << "BE remove delete conditions. removed_version=" << _new_base_version.second;
|
||||
|
||||
// Base Compaction完成之后,需要删除header中版本号小于等于新base文件版本号的删除条件
|
||||
DeleteConditionHandler cond_handler;
|
||||
|
||||
@ -71,7 +71,7 @@ OLAPStatus CumulativeCompaction::init(OLAPTablePtr table) {
|
||||
_release_header_lock();
|
||||
if (res != OLAP_SUCCESS) {
|
||||
_table->release_cumulative_lock();
|
||||
OLAP_LOG_INFO("no suitable delta versions. don't do cumulative compaction now.");
|
||||
LOG(INFO) << "no suitable delta versions. don't do cumulative compaction now.";
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -96,10 +96,9 @@ OLAPStatus CumulativeCompaction::run() {
|
||||
}
|
||||
|
||||
// 0. 准备工作
|
||||
OLAP_LOG_INFO("start cumulative compaction [table=%s; cumulative_version=%d-%d]",
|
||||
_table->full_name().c_str(),
|
||||
_cumulative_version.first,
|
||||
_cumulative_version.second);
|
||||
LOG(INFO) << "start cumulative compaction. tablet=" << _table->full_name()
|
||||
<< ", cumulative_version=" << _cumulative_version.first << "-"
|
||||
<< _cumulative_version.second;
|
||||
OlapStopWatch watch;
|
||||
|
||||
// 1. 计算新的cumulative文件的version hash
|
||||
@ -183,7 +182,7 @@ OLAPStatus CumulativeCompaction::_calculate_need_merged_versions() {
|
||||
Versions delta_versions;
|
||||
res = _get_delta_versions(&delta_versions);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
OLAP_LOG_INFO("failed to get delta versions.");
|
||||
LOG(INFO) << "failed to get delta versions.";
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -301,7 +300,7 @@ OLAPStatus CumulativeCompaction::_get_delta_versions(Versions* delta_versions) {
|
||||
}
|
||||
|
||||
if (delta_versions->size() == 0) {
|
||||
OLAP_LOG_INFO("no delta versions.[cumulative_point=%d]", _old_cumulative_layer_point);
|
||||
LOG(INFO) << "no delta versions. cumulative_point=" << _old_cumulative_layer_point;
|
||||
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
|
||||
}
|
||||
|
||||
@ -408,9 +407,10 @@ OLAPStatus CumulativeCompaction::_do_cumulative_compaction() {
|
||||
return OLAP_ERR_CHECK_LINES_ERROR;
|
||||
}
|
||||
} else {
|
||||
OLAP_LOG_INFO("all row nums. "
|
||||
"[source_rows=%lu merged_rows=%lu filted_rows=%lu new_index_rows=%lu]",
|
||||
source_rows, merged_rows, filted_rows, _new_segment_group->num_rows());
|
||||
LOG(INFO) << "all row nums. source_rows=" << source_rows
|
||||
<< ", merged_rows=" << merged_rows
|
||||
<< ", filted_rows=" << filted_rows
|
||||
<< ", new_index_rows=" << _new_segment_group->num_rows();
|
||||
}
|
||||
|
||||
// 3. add new cumulative file into table
|
||||
@ -452,10 +452,9 @@ OLAPStatus CumulativeCompaction::_do_cumulative_compaction() {
|
||||
// 6. delete delta files which have been merged into new cumulative file
|
||||
_delete_unused_delta_files(&unused_indices);
|
||||
|
||||
OLAP_LOG_INFO("succeed to do cumulative compaction. [table=%s; cumulative_version=%d-%d]",
|
||||
_table->full_name().c_str(),
|
||||
_cumulative_version.first,
|
||||
_cumulative_version.second);
|
||||
LOG(INFO) << "succeed to do cumulative compaction. tablet=" << _table->full_name()
|
||||
<< ", cumulative_version=" << _cumulative_version.first << "-"
|
||||
<< _cumulative_version.second;
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
@ -84,8 +84,8 @@ OLAPStatus DeleteConditionHandler::store_cond(
|
||||
for (const TCondition& condition : conditions) {
|
||||
string condition_str = construct_sub_conditions(condition);
|
||||
del_cond->add_sub_conditions(condition_str);
|
||||
OLAP_LOG_INFO("store one sub-delete condition. [condition='%s']",
|
||||
condition_str.c_str());
|
||||
LOG(INFO) << "store one sub-delete condition."
|
||||
<< "condition=" << condition_str;
|
||||
}
|
||||
|
||||
return OLAP_SUCCESS;
|
||||
@ -141,8 +141,8 @@ OLAPStatus DeleteConditionHandler::delete_cond(OLAPTablePtr table,
|
||||
del_cond_str += sub_conditions.Get(i) + ";";
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("delete one condition. [version=%d condition=%s]",
|
||||
temp.version(), del_cond_str.c_str());
|
||||
LOG(INFO) << "delete one condition. version=" << temp.version()
|
||||
<< ", condition=" << del_cond_str;
|
||||
|
||||
// 移除过滤条件
|
||||
// 因为pb没有提供直接删除数组特定元素的方法,所以用下面的删除方式;这种方式会改变存在
|
||||
@ -158,8 +158,7 @@ OLAPStatus DeleteConditionHandler::delete_cond(OLAPTablePtr table,
|
||||
}
|
||||
|
||||
OLAPStatus DeleteConditionHandler::log_conds(OLAPTablePtr table) {
|
||||
OLAP_LOG_INFO("display all delete condition. [full_name=%s]",
|
||||
table->full_name().c_str());
|
||||
LOG(INFO) << "display all delete condition. tablet=" << table->full_name();
|
||||
table->obtain_header_rdlock();
|
||||
const del_cond_array& delete_conditions = table->delete_data_conditions();
|
||||
|
||||
@ -173,8 +172,8 @@ OLAPStatus DeleteConditionHandler::log_conds(OLAPTablePtr table) {
|
||||
del_cond_str += sub_conditions.Get(i) + ";";
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("condition item: [version=%d condition=%s]",
|
||||
temp.version(), del_cond_str.c_str());
|
||||
LOG(INFO) << "condition item: version=" << temp.version()
|
||||
<< ", condition=" << del_cond_str;
|
||||
}
|
||||
|
||||
table->release_header_lock();
|
||||
|
||||
@ -362,7 +362,7 @@ void ShardedLRUCache::prune() {
|
||||
for (int s = 0; s < kNumShards; s++) {
|
||||
num_prune += _shards[s].prune();
|
||||
}
|
||||
OLAP_LOG_INFO("prune file descriptor: %d", num_prune);
|
||||
LOG(INFO) << "prune file descriptor:" << num_prune;
|
||||
}
|
||||
|
||||
size_t ShardedLRUCache::get_memory_usage() {
|
||||
|
||||
@ -932,9 +932,9 @@ OLAPStatus OLAPEngine::add_transaction(
|
||||
for (PUniqueId& pid : load_info->second) {
|
||||
if (pid.hi() == load_id.hi() && pid.lo() == load_id.lo()) {
|
||||
LOG(WARNING) << "find transaction exists when add to engine."
|
||||
<< "partition_id: " << key.first << ", "
|
||||
<< "transaction_id: " << key.second << ", "
|
||||
<< "table: " << tablet_info.to_string();
|
||||
<< "partition_id: " << key.first
|
||||
<< ", transaction_id: " << key.second
|
||||
<< ", table: " << tablet_info.to_string();
|
||||
return OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST;
|
||||
}
|
||||
}
|
||||
@ -943,9 +943,9 @@ OLAPStatus OLAPEngine::add_transaction(
|
||||
|
||||
_transaction_tablet_map[key][tablet_info].push_back(load_id);
|
||||
VLOG(3) << "add transaction to engine successfully."
|
||||
<< "partition_id: " << key.first << ", "
|
||||
<< "transaction_id: " << key.second << ", "
|
||||
<< "table: " << tablet_info.to_string();
|
||||
<< "partition_id: " << key.first
|
||||
<< ", transaction_id: " << key.second
|
||||
<< ", table: " << tablet_info.to_string();
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
@ -960,9 +960,9 @@ void OLAPEngine::delete_transaction(
|
||||
auto it = _transaction_tablet_map.find(key);
|
||||
if (it != _transaction_tablet_map.end()) {
|
||||
VLOG(3) << "delete transaction to engine successfully."
|
||||
<< "partition_id: " << key.first << ", "
|
||||
<< "transaction_id: " << key.second << ", "
|
||||
<< "table: " << tablet_info.to_string();
|
||||
<< ",partition_id: " << key.first
|
||||
<< ", transaction_id: " << key.second
|
||||
<< ", table: " << tablet_info.to_string();
|
||||
it->second.erase(tablet_info);
|
||||
if (it->second.empty()) {
|
||||
_transaction_tablet_map.erase(it);
|
||||
@ -992,9 +992,9 @@ void OLAPEngine::get_transactions_by_tablet(OLAPTablePtr tablet, int64_t* partit
|
||||
*partition_id = it.first.first;
|
||||
transaction_ids->insert(it.first.second);
|
||||
VLOG(3) << "find transaction on tablet."
|
||||
<< "partition_id: " << it.first.first << ", "
|
||||
<< "transaction_id: " << it.first.second << ", "
|
||||
<< "table: " << tablet_info.to_string();
|
||||
<< "partition_id: " << it.first.first
|
||||
<< ", transaction_id: " << it.first.second
|
||||
<< ", table: " << tablet_info.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1075,12 +1075,12 @@ OLAPStatus OLAPEngine::publish_version(const TPublishVersionRequest& publish_ver
|
||||
|
||||
// if publish successfully, delete transaction from engine
|
||||
} else if (publish_status == OLAP_SUCCESS) {
|
||||
LOG(INFO) << "publish version successfully on tablet. [table=" << tablet->full_name()
|
||||
<< " transaction_id=" << transaction_id << " version=" << version.first << "]";
|
||||
LOG(INFO) << "publish version successfully on tablet. tablet=" << tablet->full_name()
|
||||
<< ", transaction_id=" << transaction_id << ", version=" << version.first;
|
||||
_transaction_tablet_map_lock.wrlock();
|
||||
auto it2 = _transaction_tablet_map.find(key);
|
||||
if (it2 != _transaction_tablet_map.end()) {
|
||||
VLOG(3) << "delete transaction from engine. table=" << tablet->full_name() << ", "
|
||||
VLOG(3) << "delete transaction from engine. table=" << tablet->full_name()
|
||||
<< "transaction_id: " << transaction_id;
|
||||
it2->second.erase(tablet_info);
|
||||
if (it2->second.empty()) {
|
||||
@ -1100,15 +1100,15 @@ OLAPStatus OLAPEngine::publish_version(const TPublishVersionRequest& publish_ver
|
||||
}
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("finish to publish version on transaction. "
|
||||
"[transaction_id=%ld, error_tablet_size=%d]",
|
||||
transaction_id, error_tablet_ids->size());
|
||||
LOG(INFO) << "finish to publish version on transaction."
|
||||
<< "transaction_id=" << transaction_id
|
||||
<< ", error_tablet_size=" << error_tablet_ids->size();
|
||||
return res;
|
||||
}
|
||||
|
||||
void OLAPEngine::clear_transaction_task(const TTransactionId transaction_id,
|
||||
const vector<TPartitionId> partition_ids) {
|
||||
OLAP_LOG_INFO("begin to clear transaction task. [transaction_id=%ld]", transaction_id);
|
||||
LOG(INFO) << "begin to clear transaction task. transaction_id=" << transaction_id;
|
||||
|
||||
// each partition
|
||||
for (const TPartitionId& partition_id : partition_ids) {
|
||||
@ -1135,13 +1135,13 @@ void OLAPEngine::clear_transaction_task(const TTransactionId transaction_id,
|
||||
}
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("finish to clear transaction task. [transaction_id=%ld]", transaction_id);
|
||||
LOG(INFO) << "finish to clear transaction task. transaction_id=" << transaction_id;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::clone_incremental_data(OLAPTablePtr tablet, OLAPHeader& clone_header,
|
||||
int64_t committed_version) {
|
||||
OLAP_LOG_INFO("begin to incremental clone. [table=%s committed_version=%ld]",
|
||||
tablet->full_name().c_str(), committed_version);
|
||||
LOG(INFO) << "begin to incremental clone. tablet=" << tablet->full_name()
|
||||
<< ", committed_version=" << committed_version;
|
||||
|
||||
// calculate missing version again
|
||||
vector<Version> missing_versions;
|
||||
@ -1193,8 +1193,8 @@ OLAPStatus OLAPEngine::clone_incremental_data(OLAPTablePtr tablet, OLAPHeader& c
|
||||
const PDelta* clone_src_version = clone_header.get_incremental_version(version);
|
||||
if (clone_src_version == NULL) {
|
||||
LOG(WARNING) << "missing version not found in clone src."
|
||||
<< "clone_header_file=" << clone_header.file_name() << ", "
|
||||
<< "missing_version=" << version.first << "-" << version.second;
|
||||
<< "clone_header_file=" << clone_header.file_name()
|
||||
<< ", missing_version=" << version.first << "-" << version.second;
|
||||
return OLAP_ERR_VERSION_NOT_EXIST;
|
||||
}
|
||||
|
||||
@ -1219,15 +1219,15 @@ OLAPStatus OLAPEngine::clone_full_data(OLAPTablePtr tablet, OLAPHeader& clone_he
|
||||
tablet->get_delta(i)->end_version());
|
||||
VersionHash local_version_hash = tablet->get_delta(i)->version_hash();
|
||||
LOG(INFO) << "check local delta when full clone."
|
||||
<< "table=" << tablet->full_name() << ", "
|
||||
<< "local_version=" << local_version.first << "-" << local_version.second;
|
||||
<< "table=" << tablet->full_name()
|
||||
<< ", local_version=" << local_version.first << "-" << local_version.second;
|
||||
|
||||
// if local version cross src latest, clone failed
|
||||
if (local_version.first <= clone_latest_version.second
|
||||
&& local_version.second > clone_latest_version.second) {
|
||||
LOG(WARNING) << "stop to full clone, version cross src latest."
|
||||
<< "table=" << tablet->full_name() << ", "
|
||||
<< "local_version=" << local_version.first << "-" << local_version.second;
|
||||
<< "table=" << tablet->full_name()
|
||||
<< ", local_version=" << local_version.first << "-" << local_version.second;
|
||||
return OLAP_ERR_TABLE_VERSION_DUPLICATE_ERROR;
|
||||
|
||||
} else if (local_version.second <= clone_latest_version.second) {
|
||||
@ -1242,14 +1242,14 @@ OLAPStatus OLAPEngine::clone_full_data(OLAPTablePtr tablet, OLAPHeader& clone_he
|
||||
&& clone_header.get_delta(j)->version_hash() == local_version_hash) {
|
||||
existed_in_src = true;
|
||||
LOG(INFO) << "Delta has already existed in local header, no need to clone."
|
||||
<< "table=" << tablet->full_name() << ", "
|
||||
<< "version='" << local_version.first<< "-" << local_version.second << ", "
|
||||
<< "version_hash=" << local_version_hash;
|
||||
<< "table=" << tablet->full_name()
|
||||
<< ", version='" << local_version.first<< "-" << local_version.second
|
||||
<< ", version_hash=" << local_version_hash;
|
||||
|
||||
OLAPStatus delete_res = clone_header.delete_version(local_version);
|
||||
if (delete_res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "failed to delete existed version from clone src when full clone. "
|
||||
<< "clone_header_file=" << clone_header.file_name() << ", "
|
||||
<< "clone_header_file=" << clone_header.file_name()
|
||||
<< "version=" << local_version.first << "-" << local_version.second;
|
||||
return delete_res;
|
||||
}
|
||||
@ -1264,8 +1264,8 @@ OLAPStatus OLAPEngine::clone_full_data(OLAPTablePtr tablet, OLAPHeader& clone_he
|
||||
versions_to_delete.push_back(local_version);
|
||||
LOG(INFO) << "Delete delta not included by the clone header, should delete it from local header."
|
||||
<< "table=" << tablet->full_name() << ","
|
||||
<< "version=" << local_version.first<< "-" << local_version.second << ", "
|
||||
<< "version_hash=" << local_version_hash;
|
||||
<< ", version=" << local_version.first<< "-" << local_version.second
|
||||
<< ", version_hash=" << local_version_hash;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1274,9 +1274,9 @@ OLAPStatus OLAPEngine::clone_full_data(OLAPTablePtr tablet, OLAPHeader& clone_he
|
||||
clone_deltas.push_back(clone_header.get_delta(i));
|
||||
LOG(INFO) << "Delta to clone."
|
||||
<< "table=" << tablet->full_name() << ","
|
||||
<< "version=" << clone_header.get_delta(i)->start_version() << "-"
|
||||
<< clone_header.get_delta(i)->end_version() << ", "
|
||||
<< "version_hash=" << clone_header.get_delta(i)->version_hash();
|
||||
<< ", version=" << clone_header.get_delta(i)->start_version() << "-"
|
||||
<< clone_header.get_delta(i)->end_version()
|
||||
<< ", version_hash=" << clone_header.get_delta(i)->version_hash();
|
||||
}
|
||||
|
||||
// clone_data to tablet
|
||||
@ -1373,7 +1373,7 @@ OLAPStatus OLAPEngine::drop_table(
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("finish to drop tablet. [res=%d]", res);
|
||||
LOG(INFO) << "finish to drop tablet. res=" << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -1653,9 +1653,9 @@ void OLAPEngine::_build_tablet_info(OLAPTablePtr olap_table, TTabletInfo* tablet
|
||||
|
||||
OLAPStatus OLAPEngine::report_tablet_info(TTabletInfo* tablet_info) {
|
||||
DorisMetrics::report_tablet_requests_total.increment(1);
|
||||
OLAP_LOG_INFO("begin to process report tablet info. "
|
||||
"[table=%ld schema_hash=%d]",
|
||||
tablet_info->tablet_id, tablet_info->schema_hash);
|
||||
LOG(INFO) << "begin to process report tablet info."
|
||||
<< "tablet_id=" << tablet_info->tablet_id
|
||||
<< ", schema_hash=" << tablet_info->schema_hash;
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
|
||||
@ -1668,12 +1668,12 @@ OLAPStatus OLAPEngine::report_tablet_info(TTabletInfo* tablet_info) {
|
||||
}
|
||||
|
||||
_build_tablet_info(olap_table, tablet_info);
|
||||
OLAP_LOG_INFO("success to process report tablet info.");
|
||||
LOG(INFO) << "success to process report tablet info.";
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::report_all_tablets_info(std::map<TTabletId, TTablet>* tablets_info) {
|
||||
OLAP_LOG_INFO("begin to process report all tablets info.");
|
||||
LOG(INFO) << "begin to process report all tablets info.";
|
||||
DorisMetrics::report_all_tablets_requests_total.increment(1);
|
||||
|
||||
if (tablets_info == NULL) {
|
||||
@ -1867,7 +1867,7 @@ void OLAPEngine::get_cache_status(rapidjson::Document* document) const {
|
||||
|
||||
OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
OLAP_LOG_INFO("start trash and snapshot sweep.");
|
||||
LOG(INFO) << "start trash and snapshot sweep.";
|
||||
|
||||
const uint32_t snapshot_expire = config::snapshot_expire_time_sec;
|
||||
const uint32_t trash_expire = config::trash_file_expire_time_sec;
|
||||
@ -2149,7 +2149,7 @@ void OLAPEngine::_cancel_unfinished_schema_change() {
|
||||
// Schema Change在引擎退出时schemachange信息还保存在在Header里,
|
||||
// 引擎重启后,需清除schemachange信息,上层会重做
|
||||
uint64_t canceled_num = 0;
|
||||
OLAP_LOG_INFO("begin to cancel unfinished schema change.");
|
||||
LOG(INFO) << "begin to cancel unfinished schema change.";
|
||||
|
||||
SchemaChangeHandler schema_change_handler;
|
||||
TTabletId tablet_id;
|
||||
@ -2189,7 +2189,7 @@ void OLAPEngine::_cancel_unfinished_schema_change() {
|
||||
}
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("finish to cancel unfinished schema change! [canceled_num=%lu]", canceled_num);
|
||||
LOG(INFO) << "finish to cancel unfinished schema change! canceled_num=" << canceled_num;
|
||||
}
|
||||
|
||||
void OLAPEngine::start_delete_unused_index() {
|
||||
@ -2274,8 +2274,8 @@ OLAPStatus OLAPEngine::create_table(const TCreateTabletReq& request) {
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
bool is_table_added = false;
|
||||
|
||||
OLAP_LOG_INFO("begin to process create table. [tablet=%ld, schema_hash=%d]",
|
||||
request.tablet_id, request.tablet_schema.schema_hash);
|
||||
LOG(INFO) << "begin to process create table. tablet=" << request.tablet_id
|
||||
<< ", schema_hash=" << request.tablet_schema.schema_hash;
|
||||
|
||||
DorisMetrics::create_tablet_requests_total.increment(1);
|
||||
|
||||
@ -2286,7 +2286,7 @@ OLAPStatus OLAPEngine::create_table(const TCreateTabletReq& request) {
|
||||
OLAPTablePtr table = get_table(
|
||||
request.tablet_id, request.tablet_schema.schema_hash);
|
||||
if (table.get() != NULL) {
|
||||
OLAP_LOG_INFO("create table success for table already exist.");
|
||||
LOG(INFO) << "create table success for table already exist.";
|
||||
return OLAP_SUCCESS;
|
||||
} else {
|
||||
OLAP_LOG_WARNING("table with different schema hash already exists.");
|
||||
@ -2359,13 +2359,13 @@ OLAPStatus OLAPEngine::create_table(const TCreateTabletReq& request) {
|
||||
}
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("finish to process create table. [res=%d]", res);
|
||||
LOG(INFO) << "finish to process create table. res=" << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::schema_change(const TAlterTabletReq& request) {
|
||||
OLAP_LOG_INFO("begin to schema change. [base_table=%ld new_table=%ld]",
|
||||
request.base_tablet_id, request.new_tablet_req.tablet_id);
|
||||
LOG(INFO) << "begin to schema change. old_tablet_id=" << request.base_tablet_id
|
||||
<< ", new_tablet_id=" << request.new_tablet_req.tablet_id;
|
||||
|
||||
DorisMetrics::schema_change_requests_total.increment(1);
|
||||
|
||||
@ -2382,16 +2382,16 @@ OLAPStatus OLAPEngine::schema_change(const TAlterTabletReq& request) {
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("success to submit schema change. "
|
||||
"[base_table=%ld new_table=%ld]",
|
||||
request.base_tablet_id, request.new_tablet_req.tablet_id);
|
||||
LOG(INFO) << "success to submit schema change."
|
||||
<< "old_tablet_id=" << request.base_tablet_id
|
||||
<< ", new_tablet_id=" << request.new_tablet_req.tablet_id;
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::create_rollup_table(const TAlterTabletReq& request) {
|
||||
OLAP_LOG_INFO("begin to create rollup table. "
|
||||
"[base_table=%ld new_table=%ld]",
|
||||
request.base_tablet_id, request.new_tablet_req.tablet_id);
|
||||
LOG(INFO) << "begin to create rollup table. "
|
||||
<< "old_tablet_id=" << request.base_tablet_id
|
||||
<< ", new_tablet_id=" << request.new_tablet_req.tablet_id;
|
||||
|
||||
DorisMetrics::create_rollup_requests_total.increment(1);
|
||||
|
||||
@ -2408,18 +2408,18 @@ OLAPStatus OLAPEngine::create_rollup_table(const TAlterTabletReq& request) {
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("success to create rollup table. "
|
||||
"[base_table=%ld new_table=%ld] [res=%d]",
|
||||
request.base_tablet_id, request.new_tablet_req.tablet_id, res);
|
||||
LOG(INFO) << "success to create rollup table. res=" << res
|
||||
<< ", old_tablet_id=" << request.base_tablet_id
|
||||
<< ", new_tablet_id=" << request.new_tablet_req.tablet_id;
|
||||
return res;
|
||||
}
|
||||
|
||||
AlterTableStatus OLAPEngine::show_alter_table_status(
|
||||
TTabletId tablet_id,
|
||||
TSchemaHash schema_hash) {
|
||||
OLAP_LOG_INFO("begin to process show alter table status. "
|
||||
"[table=%ld schema_hash=%d]",
|
||||
tablet_id, schema_hash);
|
||||
LOG(INFO) << "begin to process show alter table status."
|
||||
<< "tablet_id" << tablet_id
|
||||
<< ", schema_hash" << schema_hash;
|
||||
|
||||
AlterTableStatus status = ALTER_TABLE_FINISHED;
|
||||
|
||||
@ -2441,9 +2441,10 @@ OLAPStatus OLAPEngine::compute_checksum(
|
||||
TVersion version,
|
||||
TVersionHash version_hash,
|
||||
uint32_t* checksum) {
|
||||
OLAP_LOG_INFO("begin to process compute checksum. "
|
||||
"[tablet_id=%ld schema_hash=%d version=%ld]",
|
||||
tablet_id, schema_hash, version);
|
||||
LOG(INFO) << "begin to process compute checksum."
|
||||
<< "tablet_id=" << tablet_id
|
||||
<< ", schema_hash=" << schema_hash
|
||||
<< ", version=" << version;
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
|
||||
if (checksum == NULL) {
|
||||
@ -2520,14 +2521,15 @@ OLAPStatus OLAPEngine::compute_checksum(
|
||||
row_checksum = row.hash_code(row_checksum);
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("success to finish compute checksum. [checksum=%u]", row_checksum);
|
||||
LOG(INFO) << "success to finish compute checksum. checksum=" << row_checksum;
|
||||
*checksum = row_checksum;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::cancel_delete(const TCancelDeleteDataReq& request) {
|
||||
OLAP_LOG_INFO("begin to process cancel delete. [table=%ld version=%ld]",
|
||||
request.tablet_id, request.version);
|
||||
LOG(INFO) << "begin to process cancel delete."
|
||||
<< "tablet=" << request.tablet_id
|
||||
<< ", version=" << request.version;
|
||||
|
||||
DorisMetrics::cancel_delete_requests_total.increment(1);
|
||||
|
||||
@ -2568,15 +2570,14 @@ OLAPStatus OLAPEngine::cancel_delete(const TCancelDeleteDataReq& request) {
|
||||
cond_handler.log_conds(table);
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("finish to process cancel delete. [res=%d]", res);
|
||||
LOG(INFO) << "finish to process cancel delete. res=" << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::delete_data(
|
||||
const TPushReq& request,
|
||||
vector<TTabletInfo>* tablet_info_vec) {
|
||||
OLAP_LOG_INFO("begin to process delete data. [request='%s']",
|
||||
ThriftDebugString(request).c_str());
|
||||
LOG(INFO) << "begin to process delete data. request=" << ThriftDebugString(request);
|
||||
DorisMetrics::delete_requests_total.increment(1);
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
@ -2610,7 +2611,7 @@ OLAPStatus OLAPEngine::delete_data(
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("finish to process delete data. [res=%d]", res);
|
||||
LOG(INFO) << "finish to process delete data. res=" << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -2638,13 +2639,13 @@ string OLAPEngine::get_info_before_incremental_clone(OLAPTablePtr tablet,
|
||||
// TODO: Used in upgraded. If old Doris version, version can be converted.
|
||||
Version version(least_complete_version->start_version(), least_complete_version->end_version());
|
||||
missing_versions->push_back(version);
|
||||
LOG(INFO) << "least complete version for incremental clone. table=" << tablet->full_name() << ", "
|
||||
<< "least_complete_version=" << least_complete_version->end_version();
|
||||
LOG(INFO) << "least complete version for incremental clone. table=" << tablet->full_name()
|
||||
<< ", least_complete_version=" << least_complete_version->end_version();
|
||||
}
|
||||
|
||||
tablet->release_header_lock();
|
||||
LOG(INFO) << "finish to calculate missing versions when clone. [table=" << tablet->full_name()
|
||||
<< " committed_version=" << committed_version << " missing_versions_size=" << missing_versions->size() << "]";
|
||||
<< ", committed_version=" << committed_version << " missing_versions_size=" << missing_versions->size() << "]";
|
||||
|
||||
// get download path
|
||||
return tablet->tablet_path() + CLONE_PREFIX;
|
||||
@ -2703,7 +2704,7 @@ OLAPStatus OLAPEngine::finish_clone(OLAPTablePtr tablet, const string& clone_dir
|
||||
|
||||
string from = clone_dir + "/" + clone_file;
|
||||
string to = tablet_dir + "/" + clone_file;
|
||||
LOG(INFO) << "src file:" << from << ", " << "dest file:" << to;
|
||||
LOG(INFO) << "src file:" << from << "dest file:" << to;
|
||||
if (link(from.c_str(), to.c_str()) != 0) {
|
||||
OLAP_LOG_WARNING("fail to create hard link when clone. [from=%s to=%s]",
|
||||
from.c_str(), to.c_str());
|
||||
@ -2744,15 +2745,15 @@ OLAPStatus OLAPEngine::finish_clone(OLAPTablePtr tablet, const string& clone_dir
|
||||
// clear clone dir
|
||||
boost::filesystem::path clone_dir_path(clone_dir);
|
||||
boost::filesystem::remove_all(clone_dir_path);
|
||||
OLAP_LOG_INFO("finish to clone data, clear downloaded data. "
|
||||
"[table=%s clone_dir=%s clone_res=%d]",
|
||||
tablet->full_name().c_str(), clone_dir.c_str(), res);
|
||||
LOG(INFO) << "finish to clone data, clear downloaded data. res=" << res
|
||||
<< ", tablet=" << tablet->full_name()
|
||||
<< ", clone_dir=" << clone_dir;
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::obtain_shard_path(
|
||||
TStorageMedium::type storage_medium, std::string* shard_path, OlapStore** store) {
|
||||
OLAP_LOG_INFO("begin to process obtain root path. [storage_medium=%d]", storage_medium);
|
||||
LOG(INFO) << "begin to process obtain root path. storage_medium=" << storage_medium;
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
|
||||
if (shard_path == NULL) {
|
||||
@ -2778,17 +2779,16 @@ OLAPStatus OLAPEngine::obtain_shard_path(
|
||||
*shard_path = root_path_stream.str();
|
||||
*store = stores[0];
|
||||
|
||||
OLAP_LOG_INFO("success to process obtain root path. [path='%s']",
|
||||
shard_path->c_str());
|
||||
LOG(INFO) << "success to process obtain root path. path=" << shard_path;
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::load_header(
|
||||
const string& shard_path,
|
||||
const TCloneReq& request) {
|
||||
OLAP_LOG_INFO("begin to process load headers. "
|
||||
"[tablet_id=%ld schema_hash=%d]",
|
||||
request.tablet_id, request.schema_hash);
|
||||
LOG(INFO) << "begin to process load headers."
|
||||
<< "tablet_id=" << request.tablet_id
|
||||
<< ", schema_hash=" << request.schema_hash;
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
|
||||
OlapStore* store = nullptr;
|
||||
@ -2821,7 +2821,7 @@ OLAPStatus OLAPEngine::load_header(
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("success to process load headers.");
|
||||
LOG(INFO) << "success to process load headers.";
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -2830,8 +2830,8 @@ OLAPStatus OLAPEngine::load_header(
|
||||
const string& shard_path,
|
||||
TTabletId tablet_id,
|
||||
TSchemaHash schema_hash) {
|
||||
OLAP_LOG_INFO("begin to process load headers. [tablet_id=%ld schema_hash=%d]",
|
||||
tablet_id, schema_hash);
|
||||
LOG(INFO) << "begin to process load headers. tablet_id=" << tablet_id
|
||||
<< "schema_hash=" << schema_hash;
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
|
||||
stringstream schema_hash_path_stream;
|
||||
@ -2847,14 +2847,14 @@ OLAPStatus OLAPEngine::load_header(
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("success to process load headers.");
|
||||
LOG(INFO) << "success to process load headers.";
|
||||
return res;
|
||||
}
|
||||
|
||||
OLAPStatus OLAPEngine::clear_alter_task(const TTabletId tablet_id,
|
||||
const TSchemaHash schema_hash) {
|
||||
OLAP_LOG_INFO("begin to process clear alter task. [tablet_id=%ld schema_hash=%d]",
|
||||
tablet_id, schema_hash);
|
||||
LOG(INFO) << "begin to process clear alter task. tablet_id=" << tablet_id
|
||||
<< ", schema_hash=" << schema_hash;
|
||||
OLAPTablePtr tablet = get_table(tablet_id, schema_hash);
|
||||
if (tablet.get() == NULL) {
|
||||
OLAP_LOG_WARNING("can't find tablet when process clear alter task. ",
|
||||
@ -2911,8 +2911,9 @@ OLAPStatus OLAPEngine::clear_alter_task(const TTabletId tablet_id,
|
||||
related_table->release_header_lock();
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("finish to process clear alter task. [tablet_id=%ld schema_hash=%d]",
|
||||
related_tablet_id, related_schema_hash);
|
||||
LOG(INFO) << "finish to process clear alter task."
|
||||
<< "tablet_id=" << related_tablet_id
|
||||
<< ", schema_hash=" << related_schema_hash;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
@ -2920,8 +2921,8 @@ OLAPStatus OLAPEngine::push(
|
||||
const TPushReq& request,
|
||||
vector<TTabletInfo>* tablet_info_vec) {
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
OLAP_LOG_INFO("begin to process push. [tablet_id=%ld version=%ld]",
|
||||
request.tablet_id, request.version);
|
||||
LOG(INFO) << "begin to process push. tablet_id=" << request.tablet_id
|
||||
<< ", version=" << request.version;
|
||||
|
||||
if (tablet_info_vec == NULL) {
|
||||
OLAP_LOG_WARNING("invalid output parameter which is null pointer.");
|
||||
|
||||
@ -230,7 +230,7 @@ OLAPStatus OLAPHeader::add_version(Version version, VersionHash version_hash,
|
||||
for (const PSegmentGroup& segment_group : delta(i).segment_group()) {
|
||||
if (segment_group.segment_group_id() == segment_group_id) {
|
||||
LOG(WARNING) << "the version is existed."
|
||||
<< "version=" << version.first << ", "
|
||||
<< "version=" << version.first << "-"
|
||||
<< version.second;
|
||||
return OLAP_ERR_HEADER_ADD_VERSION;
|
||||
}
|
||||
@ -309,8 +309,8 @@ OLAPStatus OLAPHeader::add_pending_version(
|
||||
del_cond->set_version(0);
|
||||
for (const string& condition : *delete_conditions) {
|
||||
del_cond->add_sub_conditions(condition);
|
||||
OLAP_LOG_INFO("store one sub-delete condition. [condition='%s' transaction_id=%ld]",
|
||||
condition.c_str(), transaction_id);
|
||||
LOG(INFO) << "store one sub-delete condition. condition=" << condition
|
||||
<< ", transaction_id=" << transaction_id;
|
||||
}
|
||||
}
|
||||
|
||||
@ -336,8 +336,8 @@ OLAPStatus OLAPHeader::add_pending_segment_group(
|
||||
const PPendingSegmentGroup& pending_segment_group = delta.pending_segment_group(j);
|
||||
if (pending_segment_group.pending_segment_group_id() == pending_segment_group_id) {
|
||||
LOG(WARNING) << "pending segment_group already exists in header."
|
||||
<< "transaction_id:" << transaction_id << ", "
|
||||
<< "pending_segment_group_id: " << pending_segment_group_id;
|
||||
<< "transaction_id:" << transaction_id
|
||||
<< ", pending_segment_group_id: " << pending_segment_group_id;
|
||||
return OLAP_ERR_HEADER_ADD_PENDING_DELTA;
|
||||
}
|
||||
}
|
||||
@ -457,7 +457,7 @@ void OLAPHeader::add_delete_condition(const DeleteConditionMessage& delete_condi
|
||||
for (const string& condition : delete_condition.sub_conditions()) {
|
||||
del_cond->add_sub_conditions(condition);
|
||||
}
|
||||
OLAP_LOG_INFO("add delete condition. [version=%d]", version);
|
||||
LOG(INFO) << "add delete condition. version=" << version;
|
||||
}
|
||||
|
||||
const PPendingDelta* OLAPHeader::get_pending_delta(int64_t transaction_id) const {
|
||||
|
||||
@ -142,8 +142,9 @@ void* OLAPEngine::_garbage_sweeper_thread_callback(void* arg) {
|
||||
max_interval, min_interval);
|
||||
min_interval = 1;
|
||||
max_interval = max_interval >= min_interval ? max_interval : min_interval;
|
||||
OLAP_LOG_INFO("force reset garbage sweep interval. [max=%d min=%d].",
|
||||
max_interval, min_interval);
|
||||
LOG(INFO) << "force reset garbage sweep interval."
|
||||
<< "max_interval" << max_interval
|
||||
<< ", min_interval" << min_interval;
|
||||
}
|
||||
|
||||
const double pi = 4 * std::atan(1);
|
||||
@ -224,7 +225,7 @@ void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg) {
|
||||
LOG(INFO) << "try to start cumulative compaction process!";
|
||||
uint32_t interval = config::cumulative_compaction_check_interval_seconds;
|
||||
if (interval <= 0) {
|
||||
LOG(WARNING) << "cumulative compaction check interval config is illegal:" << interval << ", "
|
||||
LOG(WARNING) << "cumulative compaction check interval config is illegal:" << interval
|
||||
<< "will be forced set to one";
|
||||
interval = 1;
|
||||
}
|
||||
|
||||
@ -216,8 +216,8 @@ OLAPStatus OLAPEngine::_link_index_and_data_files(
|
||||
res = _create_hard_link(ref_table_data_path, data_path);
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(WARNING) << "fail to create hard link."
|
||||
<< "tablet_path_prefix=" << tablet_path_prefix << ", "
|
||||
<< "from_path=" << ref_table_data_path << ", to_path=" << data_path;
|
||||
<< "tablet_path_prefix=" << tablet_path_prefix
|
||||
<< ", from_path=" << ref_table_data_path << ", to_path=" << data_path;
|
||||
return res;
|
||||
}
|
||||
}
|
||||
@ -247,8 +247,8 @@ OLAPStatus OLAPEngine::_copy_index_and_data_files(
|
||||
Status res = FileUtils::copy_file(ref_table_index_path, index_path);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "fail to copy index file."
|
||||
<< "dest=" << index_path << ", "
|
||||
<< "src=" << ref_table_index_path;
|
||||
<< "dest=" << index_path
|
||||
<< ", src=" << ref_table_index_path;
|
||||
return OLAP_ERR_COPY_FILE_ERROR;
|
||||
}
|
||||
|
||||
@ -259,8 +259,8 @@ OLAPStatus OLAPEngine::_copy_index_and_data_files(
|
||||
res = FileUtils::copy_file(ref_table_data_path, data_path);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "fail to copy data file."
|
||||
<< "dest=" << index_path << ", "
|
||||
<< "src=" << ref_table_index_path;
|
||||
<< "dest=" << index_path
|
||||
<< ", src=" << ref_table_index_path;
|
||||
return OLAP_ERR_COPY_FILE_ERROR;
|
||||
}
|
||||
}
|
||||
@ -438,8 +438,9 @@ OLAPStatus OLAPEngine::_create_incremental_snapshot_files(
|
||||
const OLAPTablePtr& ref_olap_table,
|
||||
const TSnapshotRequest& request,
|
||||
string* snapshot_path) {
|
||||
OLAP_LOG_INFO("begin to create incremental snapshot files. [table=%ld schema_hash=%d]",
|
||||
request.tablet_id, request.schema_hash);
|
||||
LOG(INFO) << "begin to create incremental snapshot files."
|
||||
<< "tablet=" << request.tablet_id
|
||||
<< ", schema_hash=" << request.schema_hash;
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
|
||||
if (snapshot_path == nullptr) {
|
||||
@ -626,9 +627,9 @@ OLAPStatus OLAPEngine::_create_hard_link(const string& from_path, const string&
|
||||
OLAPStatus OLAPEngine::storage_medium_migrate(
|
||||
TTabletId tablet_id, TSchemaHash schema_hash,
|
||||
TStorageMedium::type storage_medium) {
|
||||
OLAP_LOG_INFO("begin to process storage media migrate. "
|
||||
"[tablet_id=%ld schema_hash=%d dest_storage_medium=%d]",
|
||||
tablet_id, schema_hash, storage_medium);
|
||||
LOG(INFO) << "begin to process storage media migrate. "
|
||||
<< "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
|
||||
<< ", dest_storage_medium=" << storage_medium;
|
||||
DorisMetrics::storage_migrate_requests_total.increment(1);
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
@ -642,15 +643,15 @@ OLAPStatus OLAPEngine::storage_medium_migrate(
|
||||
// judge case when no need to migrate
|
||||
uint32_t count = available_storage_medium_type_count();
|
||||
if (count <= 1) {
|
||||
OLAP_LOG_INFO("available storage medium type count is less than 1, "
|
||||
"no need to migrate. [count=%u]", count);
|
||||
LOG(INFO) << "available storage medium type count is less than 1, "
|
||||
<< "no need to migrate. count=" << count;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
TStorageMedium::type src_storage_medium = tablet->store()->storage_medium();
|
||||
if (src_storage_medium == storage_medium) {
|
||||
OLAP_LOG_INFO("tablet is already on specified storage medium. "
|
||||
"[storage_medium='%d']", storage_medium);
|
||||
LOG(INFO) << "tablet is already on specified storage medium. "
|
||||
<< "storage_medium=" << storage_medium;
|
||||
return OLAP_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
@ -49,8 +49,8 @@ OLAPStatus PushHandler::process(
|
||||
const TPushReq& request,
|
||||
PushType push_type,
|
||||
vector<TTabletInfo>* tablet_info_vec) {
|
||||
OLAP_LOG_INFO("begin to push data. [table='%s' version=%ld]",
|
||||
olap_table->full_name().c_str(), request.version);
|
||||
LOG(INFO) << "begin to push data. tablet=" << olap_table->full_name()
|
||||
<< ", version=" << request.version;
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
_request = request;
|
||||
@ -107,12 +107,11 @@ OLAPStatus PushHandler::process(
|
||||
_release_header_lock();
|
||||
|
||||
if (!is_schema_changing) {
|
||||
OLAP_LOG_INFO("schema change info is cleared after base table get related tablet, "
|
||||
"maybe new tablet reach at the same time and load firstly. ",
|
||||
"[base_table='%s' table='%s' version=%ld]",
|
||||
olap_table->full_name().c_str(),
|
||||
related_olap_table->full_name().c_str(),
|
||||
_request.version);
|
||||
LOG(INFO) << "schema change info is cleared after base table get related tablet, "
|
||||
<< "maybe new tablet reach at the same time and load firstly. "
|
||||
<< ", old_tablet=" << olap_table->full_name()
|
||||
<< ", new_tablet=" << related_olap_table->full_name()
|
||||
<< ", version=" << _request.version;
|
||||
} else if (related_olap_table->creation_time() > olap_table->creation_time()) {
|
||||
// If current table is old table, append it to table_infoes
|
||||
table_infoes.push_back(TableVars());
|
||||
@ -126,11 +125,10 @@ OLAPStatus PushHandler::process(
|
||||
goto EXIT;
|
||||
}
|
||||
|
||||
OLAP_LOG_INFO("data of new table is generated, stop convert from base table. "
|
||||
"[base_table='%s' table='%s' version=%ld]",
|
||||
related_olap_table->full_name().c_str(),
|
||||
olap_table->full_name().c_str(),
|
||||
_request.version);
|
||||
LOG(INFO) << "data of new table is generated, stop convert from base table. "
|
||||
<< "old_tablet=" << olap_table->full_name()
|
||||
<< ", new_tablet=" << related_olap_table->full_name()
|
||||
<< ", version=" << _request.version;
|
||||
is_new_tablet_effective = true;
|
||||
}
|
||||
}
|
||||
@ -212,8 +210,8 @@ OLAPStatus PushHandler::process(
|
||||
|
||||
res = table_var.olap_table->save_header();
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "fail to save header. res=" << res << ", "
|
||||
<< "table=" << table_var.olap_table->full_name();
|
||||
LOG(FATAL) << "fail to save header. res=" << res
|
||||
<< ", table=" << table_var.olap_table->full_name();
|
||||
goto EXIT;
|
||||
}
|
||||
}
|
||||
@ -294,7 +292,7 @@ EXIT:
|
||||
}
|
||||
_olap_table_arr.clear();
|
||||
|
||||
OLAP_LOG_INFO("finish to process push. [res=%d]", res);
|
||||
LOG(INFO) << "finish to process push. res=" << res;
|
||||
|
||||
return res;
|
||||
}
|
||||
@ -304,8 +302,8 @@ OLAPStatus PushHandler::process_realtime_push(
|
||||
const TPushReq& request,
|
||||
PushType push_type,
|
||||
vector<TTabletInfo>* tablet_info_vec) {
|
||||
OLAP_LOG_INFO("begin to realtime push. [table=%s transaction_id=%ld]",
|
||||
olap_table->full_name().c_str(), request.transaction_id);
|
||||
LOG(INFO) << "begin to realtime push. tablet=" << olap_table->full_name()
|
||||
<< ", transaction_id=" << request.transaction_id;
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
_request = request;
|
||||
@ -350,10 +348,11 @@ OLAPStatus PushHandler::process_realtime_push(
|
||||
olap_table->release_header_lock();
|
||||
|
||||
if (is_schema_changing) {
|
||||
OLAP_LOG_INFO("find schema_change status when realtime push. "
|
||||
"[table=%s related_tablet_id=%ld related_schema_hash=%d transaction_id=%ld]",
|
||||
olap_table->full_name().c_str(),
|
||||
related_tablet_id, related_schema_hash, request.transaction_id);
|
||||
LOG(INFO) << "find schema_change status when realtime push. "
|
||||
<< "tablet=" << olap_table->full_name()
|
||||
<< ", related_tablet_id=" << related_tablet_id
|
||||
<< ", related_schema_hash=" << related_schema_hash
|
||||
<< ", transaction_id=" << request.transaction_id;
|
||||
OLAPTablePtr related_olap_table = OLAPEngine::get_instance()->get_table(
|
||||
related_tablet_id, related_schema_hash);
|
||||
|
||||
@ -427,9 +426,9 @@ OLAPStatus PushHandler::process_realtime_push(
|
||||
}
|
||||
}
|
||||
table_var.olap_table->release_header_lock();
|
||||
OLAP_LOG_INFO("success to check delete condition when realtime push. "
|
||||
"[table=%s transaction_id=%ld]",
|
||||
table_var.olap_table->full_name().c_str(), request.transaction_id);
|
||||
LOG(INFO) << "success to check delete condition when realtime push. "
|
||||
<< "tablet=" << table_var.olap_table->full_name()
|
||||
<< ", transaction_id=" << request.transaction_id;
|
||||
}
|
||||
}
|
||||
|
||||
@ -480,9 +479,10 @@ EXIT:
|
||||
if (tablet_info_vec != NULL) {
|
||||
_get_tablet_infos(table_infoes, tablet_info_vec);
|
||||
}
|
||||
OLAP_LOG_INFO("process realtime push successfully. "
|
||||
"[table=%s partition_id=%ld transaction_id=%ld]",
|
||||
olap_table->full_name().c_str(), request.partition_id, request.transaction_id);
|
||||
LOG(INFO) << "process realtime push successfully. "
|
||||
<< "tablet=" << olap_table->full_name()
|
||||
<< ", partition_id=" << request.partition_id
|
||||
<< ", transaction_id=" << request.transaction_id;
|
||||
} else {
|
||||
|
||||
// error happens, clear
|
||||
@ -744,16 +744,14 @@ OLAPStatus PushHandler::_validate_request(
|
||||
}
|
||||
|
||||
if (is_new_tablet_effective) {
|
||||
OLAP_LOG_INFO("maybe a alter tablet has already created from base tablet. "
|
||||
"[table='%s' version=%d]",
|
||||
olap_table_for_raw->full_name().c_str(),
|
||||
_request.version);
|
||||
LOG(INFO) << "maybe a alter tablet has already created from base tablet. "
|
||||
<< "tablet=" << olap_table_for_raw->full_name()
|
||||
<< ", version=" << _request.version;
|
||||
if (push_type == PUSH_FOR_DELETE
|
||||
&& _request.version == latest_delta->start_version()
|
||||
&& _request.version_hash == latest_delta->version_hash()) {
|
||||
OLAP_LOG_INFO("base tablet has already convert delete version for new tablet. "
|
||||
"[version=%ld version_hash=%lu]",
|
||||
_request.version, _request.version_hash);
|
||||
LOG(INFO) << "base tablet has already convert delete version for new tablet. "
|
||||
<< "version=" << _request.version << ", version_hash=" << _request.version_hash;
|
||||
return OLAP_ERR_PUSH_VERSION_ALREADY_EXIST;
|
||||
}
|
||||
} else {
|
||||
@ -865,8 +863,8 @@ OLAPStatus PushHandler::_update_header(
|
||||
// Note we don't return fail here.
|
||||
res = olap_table->save_header();
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "fail to save header. res=" << res << ", "
|
||||
<< "table=" << olap_table->full_name();
|
||||
LOG(FATAL) << "fail to save header. res=" << res
|
||||
<< ", tablet=" << olap_table->full_name();
|
||||
}
|
||||
|
||||
return res;
|
||||
@ -900,8 +898,8 @@ OLAPStatus PushHandler::_clear_alter_table_info(
|
||||
|
||||
res = tablet->save_header();
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "fail to save header. res=" << res << ", "
|
||||
<< "table=" << tablet->full_name();
|
||||
LOG(FATAL) << "fail to save header. res=" << res
|
||||
<< ", table=" << tablet->full_name();
|
||||
break;
|
||||
}
|
||||
|
||||
@ -920,7 +918,7 @@ OLAPStatus PushHandler::_clear_alter_table_info(
|
||||
|
||||
res = related_tablet->save_header();
|
||||
if (res != OLAP_SUCCESS) {
|
||||
LOG(FATAL) << "fail to save header. res=" << res << ", "
|
||||
LOG(FATAL) << "fail to save header. res=" << res
|
||||
<< "table=" << related_tablet->full_name();
|
||||
break;
|
||||
}
|
||||
|
||||
@ -798,7 +798,7 @@ bool SchemaChangeDirectly::process(ColumnData* olap_data, SegmentGroup* new_segm
|
||||
return true;
|
||||
}
|
||||
|
||||
VLOG(3) << "init writer. table=" << _olap_table->full_name() << ", "
|
||||
VLOG(3) << "init writer. table=" << _olap_table->full_name()
|
||||
<< "block_row_size=" << _olap_table->num_rows_per_row_block();
|
||||
bool result = true;
|
||||
RowBlock* new_row_block = NULL;
|
||||
@ -884,10 +884,10 @@ bool SchemaChangeDirectly::process(ColumnData* olap_data, SegmentGroup* new_segm
|
||||
result = false;
|
||||
}
|
||||
} else {
|
||||
OLAP_LOG_INFO("all row nums. "
|
||||
"[source_rows=%lu merged_rows=%lu filted_rows=%lu new_index_rows=%lu]",
|
||||
olap_data->segment_group()->num_rows(),
|
||||
merged_rows(), filted_rows(), new_segment_group->num_rows());
|
||||
LOG(INFO) << "all row nums. source_rows=" << olap_data->segment_group()->num_rows()
|
||||
<< ", merged_rows=" << merged_rows()
|
||||
<< ", filted_rows=" << filted_rows()
|
||||
<< ", new_index_rows=" << new_segment_group->num_rows();
|
||||
}
|
||||
|
||||
DIRECTLY_PROCESS_ERR:
|
||||
@ -1095,10 +1095,10 @@ bool SchemaChangeWithSorting::process(ColumnData* olap_data, SegmentGroup* new_s
|
||||
result = false;
|
||||
}
|
||||
} else {
|
||||
OLAP_LOG_INFO("all row nums. "
|
||||
"[source_rows=%lu merged_rows=%lu filted_rows=%lu new_index_rows=%lu]",
|
||||
olap_data->segment_group()->num_rows(),
|
||||
merged_rows(), filted_rows(), new_segment_group->num_rows());
|
||||
LOG(INFO) << "all row nums. source_rows=" << olap_data->segment_group()->num_rows()
|
||||
<< ", merged_rows=" << merged_rows()
|
||||
<< ", filted_rows=" << filted_rows()
|
||||
<< ", new_index_rows=" << new_segment_group->num_rows();
|
||||
}
|
||||
|
||||
SORTING_PROCESS_ERR:
|
||||
@ -1301,8 +1301,8 @@ OLAPStatus SchemaChangeHandler::_check_and_clear_schema_change_info(
|
||||
|
||||
if (tablet_id == request.new_tablet_req.tablet_id
|
||||
&& schema_hash == request.new_tablet_req.tablet_schema.schema_hash) {
|
||||
OLAP_LOG_INFO("schema change task for specified tablet has already finished. "
|
||||
"tablet_id=%ld schema_hash=%d", tablet_id, schema_hash);
|
||||
LOG(INFO) << "schema change task for specified tablet has already finished. "
|
||||
<< "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash;
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -1359,7 +1359,7 @@ OLAPStatus SchemaChangeHandler::process_alter_table(
|
||||
AlterTabletType type,
|
||||
const TAlterTabletReq& request) {
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
OLAP_LOG_INFO("begin to validate alter tablet request.");
|
||||
LOG(INFO) << "begin to validate alter tablet request.";
|
||||
|
||||
// 1. Lock schema_change_lock util schema change info is stored in table header
|
||||
if (!OLAPEngine::get_instance()->try_schema_change_lock(request.base_tablet_id)) {
|
||||
@ -1409,9 +1409,8 @@ OLAPStatus SchemaChangeHandler::_do_alter_table(
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
OLAPTablePtr new_olap_table;
|
||||
string base_root_path = ref_olap_table->storage_root_path_name();
|
||||
OLAP_LOG_INFO("begin to do alter tablet job. new table[%d]",
|
||||
request.new_tablet_req.tablet_id);
|
||||
|
||||
LOG(INFO) << "begin to do alter tablet job. new_table_id=" << request.new_tablet_req.tablet_id;
|
||||
// 1. Create new table and register into OLAPEngine
|
||||
res = _create_new_olap_table(ref_olap_table,
|
||||
request.new_tablet_req,
|
||||
@ -1749,10 +1748,9 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
|
||||
}
|
||||
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
OLAP_LOG_INFO("begin to convert delta version for schema changing. "
|
||||
"[src_tablet='%s' dest_tablet='%s']",
|
||||
src_olap_table->full_name().c_str(),
|
||||
dest_olap_table->full_name().c_str());
|
||||
LOG(INFO) << "begin to convert delta version for schema changing. "
|
||||
<< "old_tablet=" << src_olap_table->full_name()
|
||||
<< ", dest_tablet=" << dest_olap_table->full_name();
|
||||
|
||||
// a. 解析Alter请求,转换成内部的表示形式
|
||||
// 不使用DELETE_DATA命令指定的删除条件
|
||||
@ -1775,17 +1773,17 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
|
||||
SchemaChange* sc_procedure = NULL;
|
||||
if (true == sc_sorting) {
|
||||
size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change;
|
||||
OLAP_LOG_INFO("doing schema change with sorting.");
|
||||
LOG(INFO) << "doing schema change with sorting.";
|
||||
sc_procedure = new(nothrow) SchemaChangeWithSorting(
|
||||
dest_olap_table,
|
||||
rb_changer,
|
||||
memory_limitation * 1024 * 1024 * 1024);
|
||||
} else if (true == sc_directly) {
|
||||
OLAP_LOG_INFO("doing schema change directly.");
|
||||
LOG(INFO) << "doing schema change directly.";
|
||||
sc_procedure = new(nothrow) SchemaChangeDirectly(
|
||||
dest_olap_table, rb_changer);
|
||||
} else {
|
||||
OLAP_LOG_INFO("doing linked schema change.");
|
||||
LOG(INFO) << "doing linked schema change.";
|
||||
sc_procedure = new(nothrow) LinkedSchemaChange(
|
||||
src_olap_table,
|
||||
dest_olap_table);
|
||||
@ -1958,10 +1956,9 @@ OLAPStatus SchemaChangeHandler::_save_schema_change_info(
|
||||
// @static
|
||||
OLAPStatus SchemaChangeHandler::_alter_table(SchemaChangeParams* sc_params) {
|
||||
OLAPStatus res = OLAP_SUCCESS;
|
||||
OLAP_LOG_INFO("begin to process alter table job. "
|
||||
"[ref_olap_table='%s' new_olap_table='%s']",
|
||||
sc_params->ref_olap_table->full_name().c_str(),
|
||||
sc_params->new_olap_table->full_name().c_str());
|
||||
LOG(INFO) << "begin to process alter table job. "
|
||||
<< "old_olap_table=" << sc_params->ref_olap_table->full_name()
|
||||
<< ", new_olap_table=" << sc_params->new_olap_table->full_name();
|
||||
|
||||
// find end version
|
||||
int32_t end_version = -1;
|
||||
@ -1995,17 +1992,17 @@ OLAPStatus SchemaChangeHandler::_alter_table(SchemaChangeParams* sc_params) {
|
||||
// b. 生成历史数据转换器
|
||||
if (true == sc_sorting) {
|
||||
size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change;
|
||||
OLAP_LOG_INFO("doing schema change with sorting.");
|
||||
LOG(INFO) << "doing schema change with sorting.";
|
||||
sc_procedure = new(nothrow) SchemaChangeWithSorting(
|
||||
sc_params->new_olap_table,
|
||||
rb_changer,
|
||||
memory_limitation * 1024 * 1024 * 1024);
|
||||
} else if (true == sc_directly) {
|
||||
OLAP_LOG_INFO("doing schema change directly.");
|
||||
LOG(INFO) << "doing schema change directly.";
|
||||
sc_procedure = new(nothrow) SchemaChangeDirectly(
|
||||
sc_params->new_olap_table, rb_changer);
|
||||
} else {
|
||||
OLAP_LOG_INFO("doing linked schema change.");
|
||||
LOG(INFO) << "doing linked schema change.";
|
||||
sc_procedure = new(nothrow) LinkedSchemaChange(
|
||||
sc_params->ref_olap_table,
|
||||
sc_params->new_olap_table);
|
||||
@ -2211,7 +2208,7 @@ PROCESS_ALTER_EXIT:
|
||||
sc_params->ref_olap_table->release_data_sources(&(sc_params->ref_olap_data_arr));
|
||||
SAFE_DELETE(sc_procedure);
|
||||
|
||||
OLAP_LOG_INFO("finish to process alter table job. [res=%d]", res);
|
||||
LOG(INFO) << "finish to process alter table job. res=" << res;
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
@ -517,7 +517,6 @@ bool valid_datetime(const std::string& value_str);
|
||||
#define OLAP_LOG_DEBUG(fmt, arg...) OLAP_VLOG_WRITE(3, fmt, ##arg)
|
||||
#define OLAP_LOG_TRACE(fmt, arg...) OLAP_VLOG_WRITE(20, fmt, ##arg)
|
||||
|
||||
#define OLAP_LOG_INFO(fmt, arg...) OLAP_LOG_WRITE(INFO, fmt, ##arg)
|
||||
#define OLAP_LOG_WARNING(fmt, arg...) OLAP_LOG_WRITE(WARNING, fmt, ##arg)
|
||||
#define OLAP_LOG_FATAL(fmt, arg...) OLAP_LOG_WRITE(ERROR, fmt, ##arg)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user