[branch-2.1](memory) Fix CCR task repeat attach task DCHECK failed3 #33366
This commit is contained in:
@ -631,6 +631,7 @@ void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskReques
|
||||
auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
|
||||
if (tablet_ptr != nullptr) {
|
||||
EngineIndexChangeTask engine_task(alter_inverted_index_rq);
|
||||
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
|
||||
status = engine_task.execute();
|
||||
} else {
|
||||
status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
|
||||
@ -857,6 +858,7 @@ void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest&
|
||||
EngineChecksumTask engine_task(check_consistency_req.tablet_id,
|
||||
check_consistency_req.schema_hash, check_consistency_req.version,
|
||||
&checksum);
|
||||
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
|
||||
Status status = engine_task.execute();
|
||||
if (!status.ok()) {
|
||||
LOG_WARNING("failed to check consistency")
|
||||
@ -1438,6 +1440,7 @@ void push_callback(const TAgentTaskRequest& req) {
|
||||
std::vector<TTabletInfo> tablet_infos;
|
||||
|
||||
EngineBatchLoadTask engine_task(const_cast<TPushReq&>(push_req), &tablet_infos);
|
||||
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
|
||||
auto status = engine_task.execute();
|
||||
|
||||
// Return result to fe
|
||||
@ -1695,6 +1698,7 @@ void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
|
||||
|
||||
std::vector<TTabletInfo> tablet_infos;
|
||||
EngineCloneTask engine_task(clone_req, master_info, req.signature, &tablet_infos);
|
||||
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
|
||||
auto status = engine_task.execute();
|
||||
// Return result to fe
|
||||
TFinishTaskRequest finish_task_request;
|
||||
|
||||
@ -23,7 +23,6 @@
|
||||
#include "olap/task/engine_task.h"
|
||||
|
||||
namespace doris {
|
||||
class MemTrackerLimiter;
|
||||
class TAlterInvertedIndexReq;
|
||||
class TAlterTabletReqV2;
|
||||
|
||||
@ -38,8 +37,6 @@ public:
|
||||
|
||||
private:
|
||||
const TAlterTabletReqV2& _alter_tablet_req;
|
||||
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // EngineTask
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -72,7 +72,6 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet
|
||||
EngineBatchLoadTask::~EngineBatchLoadTask() {}
|
||||
|
||||
Status EngineBatchLoadTask::execute() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker);
|
||||
Status status;
|
||||
if (_push_req.push_type == TPushType::LOAD_V2) {
|
||||
RETURN_IF_ERROR(_init());
|
||||
|
||||
@ -28,7 +28,6 @@
|
||||
#include "olap/task/engine_task.h"
|
||||
|
||||
namespace doris {
|
||||
class MemTrackerLimiter;
|
||||
class TPushReq;
|
||||
class TTabletInfo;
|
||||
|
||||
@ -71,7 +70,6 @@ private:
|
||||
std::vector<TTabletInfo>* _tablet_infos;
|
||||
std::string _remote_file_path;
|
||||
std::string _local_file_path;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // class EngineBatchLoadTask
|
||||
} // namespace doris
|
||||
#endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H
|
||||
|
||||
@ -50,7 +50,6 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h
|
||||
}
|
||||
|
||||
Status EngineChecksumTask::execute() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker);
|
||||
return _compute_checksum();
|
||||
} // execute
|
||||
|
||||
|
||||
@ -27,7 +27,6 @@
|
||||
#include "olap/task/engine_task.h"
|
||||
|
||||
namespace doris {
|
||||
class MemTrackerLimiter;
|
||||
|
||||
// base class for storage engine
|
||||
// add "Engine" as task prefix to prevent duplicate name with agent task
|
||||
@ -49,7 +48,6 @@ private:
|
||||
TSchemaHash _schema_hash;
|
||||
TVersion _version;
|
||||
uint32_t* _checksum;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // EngineTask
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -155,7 +155,6 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
|
||||
|
||||
Status EngineCloneTask::execute() {
|
||||
// register the tablet to avoid it is deleted by gc thread during clone process
|
||||
SCOPED_ATTACH_TASK(_mem_tracker);
|
||||
if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id)) {
|
||||
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
|
||||
}
|
||||
|
||||
@ -32,7 +32,6 @@
|
||||
|
||||
namespace doris {
|
||||
class DataDir;
|
||||
class MemTrackerLimiter;
|
||||
class TCloneReq;
|
||||
class TMasterInfo;
|
||||
class TTabletInfo;
|
||||
@ -95,7 +94,6 @@ private:
|
||||
const TMasterInfo& _master_info;
|
||||
int64_t _copy_size;
|
||||
int64_t _copy_time_ms;
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
std::vector<PendingRowsetGuard> _pending_rs_guards;
|
||||
}; // EngineTask
|
||||
|
||||
|
||||
@ -17,8 +17,7 @@
|
||||
|
||||
#include "olap/task/engine_index_change_task.h"
|
||||
|
||||
#include "runtime/memory/mem_tracker.h"
|
||||
#include "runtime/thread_context.h"
|
||||
#include "runtime/memory/mem_tracker_limiter.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -33,7 +32,6 @@ EngineIndexChangeTask::EngineIndexChangeTask(
|
||||
}
|
||||
|
||||
Status EngineIndexChangeTask::execute() {
|
||||
SCOPED_ATTACH_TASK(_mem_tracker);
|
||||
DorisMetrics::instance()->alter_inverted_index_requests_total->increment(1);
|
||||
uint64_t start = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
|
||||
@ -35,8 +35,6 @@ public:
|
||||
|
||||
private:
|
||||
const TAlterInvertedIndexReq& _alter_inverted_index_req;
|
||||
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
}; // EngineTask
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -27,12 +27,17 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
class MemTrackerLimiter;
|
||||
|
||||
// base class for storage engine
|
||||
// add "Engine" as task prefix to prevent duplicate name with agent task
|
||||
class EngineTask {
|
||||
public:
|
||||
virtual ~EngineTask() = default;
|
||||
virtual Status execute() = 0;
|
||||
std::shared_ptr<MemTrackerLimiter> mem_tracker() const { return _mem_tracker; }
|
||||
|
||||
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
|
||||
};
|
||||
|
||||
} // end namespace doris
|
||||
|
||||
@ -141,6 +141,7 @@ public:
|
||||
DCHECK(mem_tracker);
|
||||
// Orphan is thread default tracker.
|
||||
DCHECK(thread_mem_tracker()->label() == "Orphan")
|
||||
<< ", thread mem tracker label: " << thread_mem_tracker()->label()
|
||||
<< ", attach mem tracker label: " << mem_tracker->label();
|
||||
#endif
|
||||
_task_id = task_id;
|
||||
|
||||
Reference in New Issue
Block a user