Files
doris/be/src/agent/task_worker_pool.h
2023-06-16 11:25:11 +08:00

332 lines
10 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <butil/macros.h>
#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/Types_types.h>
#include <stdint.h>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include "common/status.h"
#include "olap/tablet.h"
#include "util/countdown_latch.h"
#include "util/metrics.h"
namespace doris {
class ExecEnv;
class ThreadPool;
class AgentUtils;
class DataDir;
class TFinishTaskRequest;
class TMasterInfo;
class TReportRequest;
class TTabletInfo;
class TaskWorkerPool {
public:
// You need to modify the content in TYPE_STRING at the same time,
enum TaskWorkerType {
CREATE_TABLE,
DROP_TABLE,
PUSH,
REALTIME_PUSH,
PUBLISH_VERSION,
// Deprecated
CLEAR_ALTER_TASK [[deprecated]],
CLEAR_TRANSACTION_TASK,
DELETE,
ALTER_TABLE,
// Deprecated
QUERY_SPLIT_KEY [[deprecated]],
CLONE,
STORAGE_MEDIUM_MIGRATE,
CHECK_CONSISTENCY,
REPORT_TASK,
REPORT_DISK_STATE,
REPORT_OLAP_TABLE,
UPLOAD,
DOWNLOAD,
MAKE_SNAPSHOT,
RELEASE_SNAPSHOT,
MOVE,
RECOVER_TABLET,
UPDATE_TABLET_META_INFO,
SUBMIT_TABLE_COMPACTION,
PUSH_COOLDOWN_CONF,
PUSH_STORAGE_POLICY,
ALTER_INVERTED_INDEX,
GC_BINLOG,
};
enum ReportType { TASK, DISK, TABLET };
enum class ThreadModel {
SINGLE_THREAD, // Only 1 thread allowed in the pool
MULTI_THREADS // 1 or more threads allowed in the pool
};
const std::string TYPE_STRING(TaskWorkerType type) {
switch (type) {
case CREATE_TABLE:
return "CREATE_TABLE";
case DROP_TABLE:
return "DROP_TABLE";
case PUSH:
return "PUSH";
case REALTIME_PUSH:
return "REALTIME_PUSH";
case PUBLISH_VERSION:
return "PUBLISH_VERSION";
case CLEAR_TRANSACTION_TASK:
return "CLEAR_TRANSACTION_TASK";
case DELETE:
return "DELETE";
case ALTER_TABLE:
return "ALTER_TABLE";
case CLONE:
return "CLONE";
case STORAGE_MEDIUM_MIGRATE:
return "STORAGE_MEDIUM_MIGRATE";
case CHECK_CONSISTENCY:
return "CHECK_CONSISTENCY";
case REPORT_TASK:
return "REPORT_TASK";
case REPORT_DISK_STATE:
return "REPORT_DISK_STATE";
case REPORT_OLAP_TABLE:
return "REPORT_OLAP_TABLE";
case UPLOAD:
return "UPLOAD";
case DOWNLOAD:
return "DOWNLOAD";
case MAKE_SNAPSHOT:
return "MAKE_SNAPSHOT";
case RELEASE_SNAPSHOT:
return "RELEASE_SNAPSHOT";
case MOVE:
return "MOVE";
case RECOVER_TABLET:
return "RECOVER_TABLET";
case UPDATE_TABLET_META_INFO:
return "UPDATE_TABLET_META_INFO";
case SUBMIT_TABLE_COMPACTION:
return "SUBMIT_TABLE_COMPACTION";
case PUSH_COOLDOWN_CONF:
return "PUSH_COOLDOWN_CONF";
case PUSH_STORAGE_POLICY:
return "PUSH_STORAGE_POLICY";
case ALTER_INVERTED_INDEX:
return "ALTER_INVERTED_INDEX";
case GC_BINLOG:
return "GC_BINLOG";
default:
return "Unknown";
}
}
const std::string TYPE_STRING(ReportType type) {
switch (type) {
case TASK:
return "TASK";
case DISK:
return "DISK";
case TABLET:
return "TABLET";
default:
return "Unknown";
}
}
TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env,
const TMasterInfo& master_info, ThreadModel thread_model);
virtual ~TaskWorkerPool();
// Start the task worker thread pool
virtual void start();
// Stop the task worker
virtual void stop();
// Submit task to task pool
//
// Input parameters:
// * task: the task need callback thread to do
virtual void submit_task(const TAgentTaskRequest& task);
// notify the worker. currently for task/disk/tablet report thread
void notify_thread();
protected:
bool _register_task_info(const TTaskType::type task_type, int64_t signature);
void _remove_task_info(const TTaskType::type task_type, int64_t signature);
void _finish_task(const TFinishTaskRequest& finish_task_request);
void _alter_inverted_index_worker_thread_callback();
void _check_consistency_worker_thread_callback();
void _report_task_worker_thread_callback();
void _report_disk_state_worker_thread_callback();
void _report_tablet_worker_thread_callback();
void _upload_worker_thread_callback();
void _download_worker_thread_callback();
void _make_snapshot_thread_callback();
void _release_snapshot_thread_callback();
void _move_dir_thread_callback();
void _update_tablet_meta_worker_thread_callback();
void _submit_table_compaction_worker_thread_callback();
void _push_cooldown_conf_worker_thread_callback();
void _push_storage_policy_worker_thread_callback();
void _gc_binlog_worker_thread_callback();
void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t signature,
const TTaskType::type task_type, TFinishTaskRequest* finish_task_request);
void _alter_inverted_index(const TAgentTaskRequest& alter_inverted_index_request,
int64_t signature, const TTaskType::type task_type,
TFinishTaskRequest* finish_task_request);
void _handle_report(const TReportRequest& request, ReportType type);
Status _get_tablet_info(const TTabletId tablet_id, const TSchemaHash schema_hash,
int64_t signature, TTabletInfo* tablet_info);
Status _move_dir(const TTabletId tablet_id, const std::string& src, int64_t job_id,
bool overwrite);
// random sleep 1~second seconds
void _random_sleep(int second);
protected:
std::string _name;
// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;
TBackend _backend;
std::unique_ptr<AgentUtils> _agent_utils;
ExecEnv* _env;
// Protect task queue
std::mutex _worker_thread_lock;
std::condition_variable _worker_thread_condition_variable;
CountDownLatch _stop_background_threads_latch;
bool _is_work;
ThreadModel _thread_model;
std::unique_ptr<ThreadPool> _thread_pool;
// Only meaningful when _thread_model is MULTI_THREADS
std::deque<TAgentTaskRequest> _tasks;
// Only meaningful when _thread_model is SINGLE_THREAD
std::atomic<bool> _is_doing_work;
std::shared_ptr<MetricEntity> _metric_entity;
UIntGauge* agent_task_queue_size;
// Always 1 when _thread_model is SINGLE_THREAD
uint32_t _worker_count;
TaskWorkerType _task_worker_type;
std::function<void()> _cb;
static std::atomic_ulong _s_report_version;
static std::mutex _s_task_signatures_lock;
static std::map<TTaskType::type, std::set<int64_t>> _s_task_signatures;
DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
}; // class TaskWorkerPool
class CreateTableTaskPool : public TaskWorkerPool {
public:
CreateTableTaskPool(ExecEnv* env, ThreadModel thread_model);
void _create_tablet_worker_thread_callback();
DISALLOW_COPY_AND_ASSIGN(CreateTableTaskPool);
};
class DropTableTaskPool : public TaskWorkerPool {
public:
DropTableTaskPool(ExecEnv* env, ThreadModel thread_model);
void _drop_tablet_worker_thread_callback();
DISALLOW_COPY_AND_ASSIGN(DropTableTaskPool);
};
class PushTaskPool : public TaskWorkerPool {
public:
enum class PushWokerType { LOAD_V2, DELETE };
PushTaskPool(ExecEnv* env, ThreadModel thread_model, PushWokerType type);
void _push_worker_thread_callback();
DISALLOW_COPY_AND_ASSIGN(PushTaskPool);
private:
PushWokerType _push_worker_type;
};
class PublishVersionTaskPool : public TaskWorkerPool {
public:
PublishVersionTaskPool(ExecEnv* env, ThreadModel thread_model);
void _publish_version_worker_thread_callback();
DISALLOW_COPY_AND_ASSIGN(PublishVersionTaskPool);
};
class ClearTransactionTaskPool : public TaskWorkerPool {
public:
ClearTransactionTaskPool(ExecEnv* env, ThreadModel thread_model);
void _clear_transaction_task_worker_thread_callback();
DISALLOW_COPY_AND_ASSIGN(ClearTransactionTaskPool);
};
class AlterTableTaskPool : public TaskWorkerPool {
public:
AlterTableTaskPool(ExecEnv* env, ThreadModel thread_model);
void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t signature,
const TTaskType::type task_type, TFinishTaskRequest* finish_task_request);
void _alter_tablet_worker_thread_callback();
DISALLOW_COPY_AND_ASSIGN(AlterTableTaskPool);
};
class CloneTaskPool : public TaskWorkerPool {
public:
CloneTaskPool(ExecEnv* env, ThreadModel thread_model);
void _clone_worker_thread_callback();
DISALLOW_COPY_AND_ASSIGN(CloneTaskPool);
};
class StorageMediumMigrateTaskPool : public TaskWorkerPool {
public:
StorageMediumMigrateTaskPool(ExecEnv* env, ThreadModel thread_model);
Status _check_migrate_request(const TStorageMediumMigrateReq& req, TabletSharedPtr& tablet,
DataDir** dest_store);
void _storage_medium_migrate_worker_thread_callback();
DISALLOW_COPY_AND_ASSIGN(StorageMediumMigrateTaskPool);
};
} // namespace doris