Fix direct load release tablet direct load mgr and tmp file after merge completed
This commit is contained in:
@ -656,6 +656,11 @@ int ObTableLoadMerger::handle_merge_thread_finish(int ret_code)
|
||||
if (OB_UNLIKELY(is_stop_ || has_error_)) {
|
||||
} else {
|
||||
LOG_INFO("LOAD MERGE COMPLETED");
|
||||
// release tmpfile
|
||||
// TODO(suzhi.yt) release all tables and merge tasks
|
||||
if (!store_ctx_->is_fast_heap_table_) {
|
||||
table_compact_ctx_.result_.release_all_table_data();
|
||||
}
|
||||
if (store_ctx_->ctx_->schema_.is_column_store_) {
|
||||
if (OB_FAIL(build_rescan_ctx())) {
|
||||
LOG_WARN("fail to build rescan ctx", KR(ret));
|
||||
|
||||
@ -97,6 +97,14 @@ int ObTableLoadTableCompactResult::add_table(ObIDirectLoadPartitionTable *table)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadTableCompactResult::release_all_table_data()
|
||||
{
|
||||
for (int64_t i = 0; i < all_table_array_.count(); ++i) {
|
||||
ObIDirectLoadPartitionTable *table = all_table_array_.at(i);
|
||||
table->release_data();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ObTableLoadTableCompactCtx
|
||||
*/
|
||||
|
||||
@ -39,6 +39,7 @@ public:
|
||||
void reset();
|
||||
int init();
|
||||
int add_table(storage::ObIDirectLoadPartitionTable *table);
|
||||
void release_all_table_data();
|
||||
public:
|
||||
typedef common::ObLinkHashMap<common::ObTabletID, ObTableLoadTableCompactTabletResult>
|
||||
TabletResultMap;
|
||||
|
||||
@ -103,6 +103,11 @@ int ObDirectLoadExternalTable::init(const ObDirectLoadExternalTableCreateParam &
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObDirectLoadExternalTable::release_data()
|
||||
{
|
||||
fragments_.reset();
|
||||
}
|
||||
|
||||
int ObDirectLoadExternalTable::copy(const ObDirectLoadExternalTable &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -59,6 +59,7 @@ public:
|
||||
const common::ObTabletID &get_tablet_id() const override { return meta_.tablet_id_; }
|
||||
int64_t get_row_count() const override { return meta_.row_count_; }
|
||||
bool is_valid() const override { return is_inited_; }
|
||||
void release_data() override;
|
||||
int copy(const ObDirectLoadExternalTable &other);
|
||||
const ObDirectLoadExternalTableMeta &get_meta() const { return meta_; }
|
||||
const ObDirectLoadExternalFragmentArray &get_fragments() const { return fragments_; }
|
||||
|
||||
@ -54,6 +54,7 @@ public:
|
||||
const common::ObTabletID &get_tablet_id() const override { return meta_.tablet_id_; }
|
||||
int64_t get_row_count() const override { return meta_.row_count_; }
|
||||
bool is_valid() const override { return is_inited_; }
|
||||
void release_data() override { /*do nothing*/ }
|
||||
const ObDirectLoadFastHeapTableMeta &get_meta() const { return meta_; }
|
||||
TO_STRING_KV(K_(meta));
|
||||
private:
|
||||
|
||||
@ -27,6 +27,7 @@ public:
|
||||
virtual const common::ObTabletID &get_tablet_id() const = 0;
|
||||
virtual int64_t get_row_count() const = 0;
|
||||
virtual bool is_valid() const = 0;
|
||||
virtual void release_data() = 0;
|
||||
TO_STRING_EMPTY();
|
||||
};
|
||||
|
||||
|
||||
@ -188,6 +188,7 @@ int ObDirectLoadInsertTabletContext::close()
|
||||
LOG_WARN("fail to close tablet direct load", KR(ret), K(param_.ls_id_),
|
||||
K(param_.tablet_id_));
|
||||
} else {
|
||||
handle_.reset();
|
||||
is_open_ = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -168,6 +168,12 @@ int ObDirectLoadMultipleHeapTable::init(const ObDirectLoadMultipleHeapTableCreat
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObDirectLoadMultipleHeapTable::release_data()
|
||||
{
|
||||
index_file_handle_.reset();
|
||||
data_fragments_.reset();
|
||||
}
|
||||
|
||||
int ObDirectLoadMultipleHeapTable::copy(const ObDirectLoadMultipleHeapTable &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -95,6 +95,7 @@ public:
|
||||
const common::ObTabletID &get_tablet_id() const override { return tablet_id_; }
|
||||
int64_t get_row_count() const override { return meta_.row_count_; }
|
||||
bool is_valid() const override { return is_inited_; }
|
||||
void release_data() override;
|
||||
int copy(const ObDirectLoadMultipleHeapTable &other);
|
||||
const ObDirectLoadMultipleHeapTableMeta &get_meta() const { return meta_; }
|
||||
const ObDirectLoadTmpFileHandle &get_index_file_handle() const { return index_file_handle_; }
|
||||
|
||||
@ -173,6 +173,11 @@ int ObDirectLoadMultipleSSTable::init(const ObDirectLoadMultipleSSTableCreatePar
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObDirectLoadMultipleSSTable::release_data()
|
||||
{
|
||||
fragments_.reset();
|
||||
}
|
||||
|
||||
int ObDirectLoadMultipleSSTable::copy(const ObDirectLoadMultipleSSTable &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -100,6 +100,7 @@ public:
|
||||
const common::ObTabletID &get_tablet_id() const override { return tablet_id_; }
|
||||
int64_t get_row_count() const override { return meta_.row_count_; }
|
||||
bool is_valid() const override { return is_inited_; }
|
||||
void release_data() override;
|
||||
int copy(const ObDirectLoadMultipleSSTable &other);
|
||||
bool is_empty() const { return 0 == meta_.row_count_; }
|
||||
const ObDirectLoadMultipleSSTableMeta &get_meta() const { return meta_; }
|
||||
|
||||
@ -159,6 +159,11 @@ int ObDirectLoadSSTable::init(ObDirectLoadSSTableCreateParam ¶m)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObDirectLoadSSTable::release_data()
|
||||
{
|
||||
fragments_.reset();
|
||||
}
|
||||
|
||||
int ObDirectLoadSSTable::copy(const ObDirectLoadSSTable &other)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -152,6 +152,7 @@ public:
|
||||
const common::ObTabletID &get_tablet_id() const override { return meta_.tablet_id_; }
|
||||
bool is_empty() const { return 0 == meta_.row_count_; }
|
||||
bool is_valid() const override { return is_inited_; }
|
||||
void release_data() override;
|
||||
int copy(const ObDirectLoadSSTable &other);
|
||||
const common::ObArray<ObDirectLoadSSTableFragment> &get_fragment_array() const
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user