Fix tmp file write info invalid
This commit is contained in:
@ -378,10 +378,10 @@ int ObTmpFileExtent::read(const ObTmpFileIOInfo &io_info, const int64_t offset,
|
|||||||
if (OB_UNLIKELY(!is_alloced_)) {
|
if (OB_UNLIKELY(!is_alloced_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
STORAGE_LOG(WARN, "ObTmpFileExtent has not been allocated", K(ret));
|
STORAGE_LOG(WARN, "ObTmpFileExtent has not been allocated", K(ret));
|
||||||
} else if (OB_UNLIKELY(offset < 0 || offset >= offset_ || size <= 0
|
} else if (OB_UNLIKELY(offset < 0 || offset >= get_offset() || size <= 0
|
||||||
|| offset + size > offset_) || OB_ISNULL(buf)) {
|
|| offset + size > get_offset()) || OB_ISNULL(buf)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
STORAGE_LOG(WARN, "invalid argument", K(ret), K(offset), K(offset_), K(size), K(buf));
|
STORAGE_LOG(WARN, "invalid argument", K(ret), K(offset), K(get_offset()), K(size), K(buf));
|
||||||
} else {
|
} else {
|
||||||
ObTmpBlockIOInfo info;
|
ObTmpBlockIOInfo info;
|
||||||
info.buf_ = buf;
|
info.buf_ = buf;
|
||||||
@ -411,25 +411,25 @@ int ObTmpFileExtent::write(const ObTmpFileIOInfo &io_info,int64_t &size, char *&
|
|||||||
} else if (OB_UNLIKELY(!is_alloced_)) {
|
} else if (OB_UNLIKELY(!is_alloced_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
STORAGE_LOG(WARN, "ObTmpFileExtent has not been allocated", K(ret));
|
STORAGE_LOG(WARN, "ObTmpFileExtent has not been allocated", K(ret));
|
||||||
} else if (offset_ == page_nums_ * ObTmpMacroBlock::get_default_page_size()) {
|
} else if (get_offset() == page_nums_ * ObTmpMacroBlock::get_default_page_size()) {
|
||||||
close();
|
close();
|
||||||
} else {
|
} else {
|
||||||
SpinWLockGuard guard(lock_);
|
SpinWLockGuard guard(lock_);
|
||||||
if (!is_closed()) {
|
if (!is_closed()) {
|
||||||
remain = page_nums_ * ObTmpMacroBlock::get_default_page_size() - offset_;
|
remain = page_nums_ * ObTmpMacroBlock::get_default_page_size() - get_offset();
|
||||||
write_size = std::min(remain, size);
|
write_size = std::min(remain, size);
|
||||||
ObTmpBlockIOInfo info;
|
ObTmpBlockIOInfo info;
|
||||||
info.block_id_ = block_id_;
|
info.block_id_ = block_id_;
|
||||||
info.buf_ = buf;
|
info.buf_ = buf;
|
||||||
info.io_desc_ = io_info.io_desc_;
|
info.io_desc_ = io_info.io_desc_;
|
||||||
info.offset_ = start_page_id_ * ObTmpMacroBlock::get_default_page_size() + offset_;
|
info.offset_ = start_page_id_ * ObTmpMacroBlock::get_default_page_size() + get_offset();
|
||||||
info.size_ = write_size;
|
info.size_ = write_size;
|
||||||
info.tenant_id_ = io_info.tenant_id_;
|
info.tenant_id_ = io_info.tenant_id_;
|
||||||
if (OB_FAIL(OB_TMP_FILE_STORE.write(owner_->get_tenant_id(), info))) {
|
if (OB_FAIL(OB_TMP_FILE_STORE.write(owner_->get_tenant_id(), info))) {
|
||||||
STORAGE_LOG(WARN, "fail to write the extent", K(ret));
|
STORAGE_LOG(WARN, "fail to write the extent", K(ret));
|
||||||
} else {
|
} else {
|
||||||
offset_ += write_size;
|
ATOMIC_FAA(&offset_, write_size);
|
||||||
g_offset_end_ = offset_ + g_offset_start_;
|
g_offset_end_ = get_offset() + g_offset_start_;
|
||||||
buf += write_size;
|
buf += write_size;
|
||||||
size -= write_size;
|
size -= write_size;
|
||||||
is_write = true;
|
is_write = true;
|
||||||
@ -451,7 +451,7 @@ void ObTmpFileExtent::reset()
|
|||||||
fd_ = -1;
|
fd_ = -1;
|
||||||
g_offset_start_ = 0;
|
g_offset_start_ = 0;
|
||||||
g_offset_end_ = 0;
|
g_offset_end_ = 0;
|
||||||
offset_ = 0;
|
ATOMIC_SET(&offset_, 0);
|
||||||
owner_ = NULL;
|
owner_ = NULL;
|
||||||
start_page_id_ = -1;
|
start_page_id_ = -1;
|
||||||
page_nums_ = 0;
|
page_nums_ = 0;
|
||||||
@ -496,11 +496,11 @@ bool ObTmpFileExtent::close(uint8_t &free_page_start_id, uint8_t &free_page_nums
|
|||||||
free_page_nums = 0;
|
free_page_nums = 0;
|
||||||
SpinWLockGuard guard(lock_);
|
SpinWLockGuard guard(lock_);
|
||||||
if (!is_closed_) {
|
if (!is_closed_) {
|
||||||
if (!force && 0 != page_nums_ && 0 == offset_) {
|
if (!force && 0 != page_nums_ && 0 == get_offset()) {
|
||||||
// Nothing to do. This extent is alloced just now, so it cannot be closed.
|
// Nothing to do. This extent is alloced just now, so it cannot be closed.
|
||||||
} else {
|
} else {
|
||||||
if (offset_ != page_nums_ * ObTmpMacroBlock::get_default_page_size()) {
|
if (get_offset() != page_nums_ * ObTmpMacroBlock::get_default_page_size()) {
|
||||||
uint8_t offset_page_id = common::upper_align(offset_, ObTmpMacroBlock::get_default_page_size())
|
uint8_t offset_page_id = common::upper_align(get_offset(), ObTmpMacroBlock::get_default_page_size())
|
||||||
/ ObTmpMacroBlock::get_default_page_size();
|
/ ObTmpMacroBlock::get_default_page_size();
|
||||||
free_page_nums = page_nums_ - offset_page_id;
|
free_page_nums = page_nums_ - offset_page_id;
|
||||||
free_page_start_id = start_page_id_ + offset_page_id;
|
free_page_start_id = start_page_id_ + offset_page_id;
|
||||||
|
|||||||
@ -173,7 +173,7 @@ public:
|
|||||||
OB_INLINE uint8_t get_page_nums() const { return page_nums_; }
|
OB_INLINE uint8_t get_page_nums() const { return page_nums_; }
|
||||||
OB_INLINE void set_block_id(const int64_t block_id) { block_id_ = block_id; }
|
OB_INLINE void set_block_id(const int64_t block_id) { block_id_ = block_id; }
|
||||||
OB_INLINE int64_t get_block_id() const { return block_id_; }
|
OB_INLINE int64_t get_block_id() const { return block_id_; }
|
||||||
OB_INLINE int32_t get_offset() const { return offset_; }
|
OB_INLINE int32_t get_offset() const { return ATOMIC_LOAD(&offset_); }
|
||||||
OB_INLINE ObTmpFile &get_owner() { return *owner_; }
|
OB_INLINE ObTmpFile &get_owner() { return *owner_; }
|
||||||
TO_STRING_KV(K_(is_alloced), K_(fd), K_(g_offset_start), K_(g_offset_end), KP_(owner),
|
TO_STRING_KV(K_(is_alloced), K_(fd), K_(g_offset_start), K_(g_offset_end), KP_(owner),
|
||||||
K_(start_page_id), K_(page_nums), K_(block_id), K_(offset), K_(is_closed));
|
K_(start_page_id), K_(page_nums), K_(block_id), K_(offset), K_(is_closed));
|
||||||
|
|||||||
@ -744,10 +744,12 @@ int ObTmpTenantMemBlockManager::try_sync(const int64_t block_id)
|
|||||||
} else {
|
} else {
|
||||||
STORAGE_LOG(DEBUG, "the tmp macro block has been washed", K(ret), K(block_id));
|
STORAGE_LOG(DEBUG, "the tmp macro block has been washed", K(ret), K(block_id));
|
||||||
}
|
}
|
||||||
} else if (t_mblk->is_washing()){
|
} else if (t_mblk->is_washing()) {
|
||||||
STORAGE_LOG(WARN, "the tmp macro block is washing", K(ret), K(block_id));
|
STORAGE_LOG(WARN, "the tmp macro block is washing", K(ret), K(block_id));
|
||||||
} else if (t_mblk->is_disked()){
|
} else if (t_mblk->is_disked()) {
|
||||||
STORAGE_LOG(WARN, "the tmp macro block has been disked", K(ret), K(block_id));
|
STORAGE_LOG(WARN, "the tmp macro block has been disked", K(ret), K(block_id));
|
||||||
|
} else if (0 == t_mblk->get_used_page_nums()) {
|
||||||
|
STORAGE_LOG(WARN, "the tmp macro block has not been written", K(ret), K(block_id));
|
||||||
} else {
|
} else {
|
||||||
t_mblk->set_washing_status(true);
|
t_mblk->set_washing_status(true);
|
||||||
common::ObIArray<ObTmpFileExtent* > &extents = t_mblk->get_extents();
|
common::ObIArray<ObTmpFileExtent* > &extents = t_mblk->get_extents();
|
||||||
@ -998,7 +1000,8 @@ int ObTmpTenantMemBlockManager::wash(const int64_t block_nums,
|
|||||||
int64_t cur_time = ObTimeUtility::fast_current_time();
|
int64_t cur_time = ObTimeUtility::fast_current_time();
|
||||||
for (iter = t_mblk_map_.begin(); OB_SUCC(ret) && iter != t_mblk_map_.end(); ++iter) {
|
for (iter = t_mblk_map_.begin(); OB_SUCC(ret) && iter != t_mblk_map_.end(); ++iter) {
|
||||||
ObTmpMacroBlock *m_blk = iter->second;
|
ObTmpMacroBlock *m_blk = iter->second;
|
||||||
if (OB_UNLIKELY(NULL != m_blk) && OB_UNLIKELY(m_blk->is_inited()) && OB_UNLIKELY(!m_blk->is_disked())) {
|
if (OB_UNLIKELY(NULL != m_blk) && OB_UNLIKELY(m_blk->is_inited()) && OB_UNLIKELY(!m_blk->is_disked()) &&
|
||||||
|
OB_UNLIKELY(0 != m_blk->get_used_page_nums())) {
|
||||||
BlockInfo info;
|
BlockInfo info;
|
||||||
info.block_id_ = m_blk->get_block_id();
|
info.block_id_ = m_blk->get_block_id();
|
||||||
info.wash_score_ = m_blk->get_wash_score(cur_time);
|
info.wash_score_ = m_blk->get_wash_score(cur_time);
|
||||||
|
|||||||
@ -584,6 +584,15 @@ int ObTmpMacroBlock::free(const int32_t start_page_id, const int32_t page_nums)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t ObTmpMacroBlock::get_used_page_nums() const
|
||||||
|
{
|
||||||
|
int64_t used_page_nums = 0;
|
||||||
|
for (int64_t i = using_extents_.count() - 1; i >= 0; --i) {
|
||||||
|
used_page_nums += std::ceil(using_extents_.at(i)->get_offset() / DEFAULT_PAGE_SIZE);
|
||||||
|
}
|
||||||
|
return used_page_nums;
|
||||||
|
}
|
||||||
|
|
||||||
void ObTmpMacroBlock::set_io_desc(const common::ObIOFlag &io_desc)
|
void ObTmpMacroBlock::set_io_desc(const common::ObIOFlag &io_desc)
|
||||||
{
|
{
|
||||||
io_desc_ = io_desc;
|
io_desc_ = io_desc;
|
||||||
|
|||||||
@ -120,10 +120,7 @@ public:
|
|||||||
OB_INLINE char *get_buffer() { return buffer_; }
|
OB_INLINE char *get_buffer() { return buffer_; }
|
||||||
OB_INLINE uint8_t get_max_cont_page_nums() const { return page_buddy_.get_max_cont_page_nums(); }
|
OB_INLINE uint8_t get_max_cont_page_nums() const { return page_buddy_.get_max_cont_page_nums(); }
|
||||||
OB_INLINE uint8_t get_free_page_nums() const { return free_page_nums_; }
|
OB_INLINE uint8_t get_free_page_nums() const { return free_page_nums_; }
|
||||||
OB_INLINE int64_t get_used_page_nums() const
|
int64_t get_used_page_nums() const;
|
||||||
{
|
|
||||||
return get_mblk_page_nums() - free_page_nums_;
|
|
||||||
}
|
|
||||||
int get_block_cache_handle(ObTmpBlockValueHandle &handle);
|
int get_block_cache_handle(ObTmpBlockValueHandle &handle);
|
||||||
int get_wash_io_info(ObTmpBlockIOInfo &info);
|
int get_wash_io_info(ObTmpBlockIOInfo &info);
|
||||||
void set_io_desc(const common::ObIOFlag &io_desc);
|
void set_io_desc(const common::ObIOFlag &io_desc);
|
||||||
@ -154,7 +151,7 @@ public:
|
|||||||
int close(bool &is_all_close, uint8_t &free_page_nums);
|
int close(bool &is_all_close, uint8_t &free_page_nums);
|
||||||
int give_back_buf_into_cache(bool is_wash = false);
|
int give_back_buf_into_cache(bool is_wash = false);
|
||||||
OB_INLINE double get_wash_score(int64_t cur_time) const {
|
OB_INLINE double get_wash_score(int64_t cur_time) const {
|
||||||
if (get_free_page_nums() == 0) {
|
if (ObTmpFilePageBuddy::MAX_PAGE_NUMS == get_used_page_nums()) {
|
||||||
return INT64_MAX;
|
return INT64_MAX;
|
||||||
}
|
}
|
||||||
return (double) get_used_page_nums() * (cur_time - get_alloc_time()) / (get_access_time() - get_alloc_time());
|
return (double) get_used_page_nums() * (cur_time - get_alloc_time()) / (get_access_time() - get_alloc_time());
|
||||||
|
|||||||
Reference in New Issue
Block a user