Fix direct load thread local variables destruct
This commit is contained in:
parent
b87473dac2
commit
d7d52691d0
@ -7,11 +7,12 @@
|
|||||||
#include "observer/table_load/ob_table_load_task_scheduler.h"
|
#include "observer/table_load/ob_table_load_task_scheduler.h"
|
||||||
#include "common/ob_timeout_ctx.h"
|
#include "common/ob_timeout_ctx.h"
|
||||||
#include "lib/stat/ob_session_stat.h"
|
#include "lib/stat/ob_session_stat.h"
|
||||||
|
#include "observer/omt/ob_tenant.h"
|
||||||
#include "observer/table_load/ob_table_load_stat.h"
|
#include "observer/table_load/ob_table_load_stat.h"
|
||||||
#include "observer/table_load/ob_table_load_task.h"
|
#include "observer/table_load/ob_table_load_task.h"
|
||||||
#include "share/ob_share_util.h"
|
#include "share/ob_share_util.h"
|
||||||
#include "share/rc/ob_tenant_base.h"
|
#include "share/rc/ob_tenant_base.h"
|
||||||
#include "observer/omt/ob_tenant.h"
|
#include "storage/direct_load/ob_direct_load_table_builder_allocator.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -19,6 +20,7 @@ namespace observer
|
|||||||
{
|
{
|
||||||
using namespace common;
|
using namespace common;
|
||||||
using namespace share;
|
using namespace share;
|
||||||
|
using namespace storage;
|
||||||
|
|
||||||
void ObTableLoadTaskThreadPoolScheduler::MyThreadPool::run1()
|
void ObTableLoadTaskThreadPoolScheduler::MyThreadPool::run1()
|
||||||
{
|
{
|
||||||
@ -237,6 +239,9 @@ void ObTableLoadTaskThreadPoolScheduler::run(uint64_t thread_idx)
|
|||||||
state_ = STATE_STOPPING;
|
state_ = STATE_STOPPING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// clear thread local variables
|
||||||
|
get_table_builder_allocator()->reset();
|
||||||
|
|
||||||
LOG_INFO("table load task thread stopped", KP(this), "pid", get_tid_cache(), K(thread_idx));
|
LOG_INFO("table load task thread stopped", KP(this), "pid", get_tid_cache(), K(thread_idx));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -445,6 +445,7 @@ void ObDDLKV::reset()
|
|||||||
{
|
{
|
||||||
FLOG_INFO("ddl kv reset", KP(this), K(*this));
|
FLOG_INFO("ddl kv reset", KP(this), K(*this));
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
|
ObSSTable::reset();
|
||||||
ls_id_.reset();
|
ls_id_.reset();
|
||||||
tablet_id_.reset();
|
tablet_id_.reset();
|
||||||
ddl_start_scn_ = SCN::min_scn();
|
ddl_start_scn_ = SCN::min_scn();
|
||||||
@ -469,7 +470,6 @@ void ObDDLKV::reset()
|
|||||||
sstable_index_builder_ = nullptr;
|
sstable_index_builder_ = nullptr;
|
||||||
}
|
}
|
||||||
block_meta_tree_.destroy();
|
block_meta_tree_.destroy();
|
||||||
ObSSTable::reset();
|
|
||||||
arena_allocator_.reset();
|
arena_allocator_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,15 +13,63 @@ namespace oceanbase
|
|||||||
namespace storage
|
namespace storage
|
||||||
{
|
{
|
||||||
|
|
||||||
class ObDirectLoadTableBuilderAllocator
|
class ObDirectLoadTableBuilderAllocator final
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObDirectLoadTableBuilderAllocator();
|
ObDirectLoadTableBuilderAllocator()
|
||||||
~ObDirectLoadTableBuilderAllocator();
|
{
|
||||||
|
tid_ = get_tid_cache();
|
||||||
|
}
|
||||||
|
~ObDirectLoadTableBuilderAllocator()
|
||||||
|
{
|
||||||
|
assert_in_own_thread();
|
||||||
|
OB_ASSERT(using_list_.is_empty());
|
||||||
|
}
|
||||||
|
void reset()
|
||||||
|
{
|
||||||
|
assert_in_own_thread();
|
||||||
|
Item *item = nullptr;
|
||||||
|
DLIST_REMOVE_ALL_NORET(item, using_list_)
|
||||||
|
{
|
||||||
|
ObIDirectLoadPartitionTableBuilder *table_builder =
|
||||||
|
(ObIDirectLoadPartitionTableBuilder *)item->buf_;
|
||||||
|
table_builder->~ObIDirectLoadPartitionTableBuilder();
|
||||||
|
ob_free(item);
|
||||||
|
}
|
||||||
|
OB_ASSERT(using_list_.is_empty());
|
||||||
|
}
|
||||||
template <typename T, typename... Args>
|
template <typename T, typename... Args>
|
||||||
T *alloc(Args &&... args);
|
T *alloc(Args &&... args)
|
||||||
void free(ObIDirectLoadPartitionTableBuilder *table_builder);
|
{
|
||||||
void assert_in_own_thread();
|
assert_in_own_thread();
|
||||||
|
T *t = nullptr;
|
||||||
|
void *buf = nullptr;
|
||||||
|
ObMemAttr attr;
|
||||||
|
attr.label_ = "TLD_TB_Alloc";
|
||||||
|
attr.tenant_id_ = MTL_ID();
|
||||||
|
if (OB_NOT_NULL(buf = ob_malloc(sizeof(Item) + sizeof(T), attr))) {
|
||||||
|
Item *item = new (buf) Item;
|
||||||
|
t = new (item->buf_) T(args...);
|
||||||
|
using_list_.add_last(item);
|
||||||
|
}
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
void free(ObIDirectLoadPartitionTableBuilder *table_builder)
|
||||||
|
{
|
||||||
|
assert_in_own_thread();
|
||||||
|
if (OB_NOT_NULL(table_builder)) {
|
||||||
|
table_builder->~ObIDirectLoadPartitionTableBuilder();
|
||||||
|
Item *item = (Item *)table_builder - 1;
|
||||||
|
using_list_.remove(item);
|
||||||
|
item->~Item();
|
||||||
|
ob_free(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
OB_INLINE void assert_in_own_thread()
|
||||||
|
{
|
||||||
|
const int64_t tid = get_tid_cache();
|
||||||
|
OB_ASSERT(tid == tid_);
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct Item : public common::ObDLinkBase<Item>
|
struct Item : public common::ObDLinkBase<Item>
|
||||||
@ -34,60 +82,6 @@ private:
|
|||||||
ObDList<Item> using_list_;
|
ObDList<Item> using_list_;
|
||||||
};
|
};
|
||||||
|
|
||||||
ObDirectLoadTableBuilderAllocator::ObDirectLoadTableBuilderAllocator()
|
|
||||||
{
|
|
||||||
tid_ = get_tid_cache();
|
|
||||||
}
|
|
||||||
|
|
||||||
ObDirectLoadTableBuilderAllocator::~ObDirectLoadTableBuilderAllocator()
|
|
||||||
{
|
|
||||||
assert_in_own_thread();
|
|
||||||
Item *item = nullptr;
|
|
||||||
DLIST_REMOVE_ALL_NORET(item, using_list_)
|
|
||||||
{
|
|
||||||
ObIDirectLoadPartitionTableBuilder *table_builder =
|
|
||||||
(ObIDirectLoadPartitionTableBuilder *)item->buf_;
|
|
||||||
table_builder->~ObIDirectLoadPartitionTableBuilder();
|
|
||||||
ob_free(item);
|
|
||||||
}
|
|
||||||
OB_ASSERT(using_list_.is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename T, typename... Args>
|
|
||||||
T *ObDirectLoadTableBuilderAllocator::alloc(Args &&... args)
|
|
||||||
{
|
|
||||||
assert_in_own_thread();
|
|
||||||
T *t = nullptr;
|
|
||||||
void *buf = nullptr;
|
|
||||||
ObMemAttr attr;
|
|
||||||
attr.label_ = "TLD_TB_Alloc";
|
|
||||||
attr.tenant_id_ = MTL_ID();
|
|
||||||
if (OB_NOT_NULL(buf = ob_malloc(sizeof(Item) + sizeof(T), attr))) {
|
|
||||||
Item *item = new (buf) Item;
|
|
||||||
t = new (item->buf_) T(args...);
|
|
||||||
using_list_.add_last(item);
|
|
||||||
}
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ObDirectLoadTableBuilderAllocator::free(ObIDirectLoadPartitionTableBuilder *table_builder)
|
|
||||||
{
|
|
||||||
assert_in_own_thread();
|
|
||||||
if (OB_NOT_NULL(table_builder)) {
|
|
||||||
table_builder->~ObIDirectLoadPartitionTableBuilder();
|
|
||||||
Item *item = (Item *)table_builder - 1;
|
|
||||||
using_list_.remove(item);
|
|
||||||
item->~Item();
|
|
||||||
ob_free(item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void ObDirectLoadTableBuilderAllocator::assert_in_own_thread()
|
|
||||||
{
|
|
||||||
const int64_t tid = get_tid_cache();
|
|
||||||
OB_ASSERT(tid == tid_);
|
|
||||||
}
|
|
||||||
|
|
||||||
OB_INLINE ObDirectLoadTableBuilderAllocator *get_table_builder_allocator()
|
OB_INLINE ObDirectLoadTableBuilderAllocator *get_table_builder_allocator()
|
||||||
{
|
{
|
||||||
RLOCAL_INLINE(ObDirectLoadTableBuilderAllocator, allcator);
|
RLOCAL_INLINE(ObDirectLoadTableBuilderAllocator, allcator);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user