[master] fix pending_log_size and flushed_log_size in __all_virtual_trans_stat
This commit is contained in:
parent
4bc30b8a98
commit
b529ebb084
@ -419,9 +419,10 @@ int ObTransCallbackMgr::append(ObITransCallback *node)
|
||||
void ObTransCallbackMgr::before_append(ObITransCallback *node)
|
||||
{
|
||||
int64_t size = node->get_data_size();
|
||||
int64_t new_size = inc_pending_log_size(size);
|
||||
if (for_replay_) {
|
||||
inc_flushed_log_size(size);
|
||||
} else {
|
||||
inc_pending_log_size(size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -429,9 +430,10 @@ void ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code
|
||||
{
|
||||
if (OB_SUCCESS != ret_code) {
|
||||
int64_t size = node->get_data_size();
|
||||
inc_pending_log_size(-1 * size);
|
||||
if (for_replay_) {
|
||||
inc_flushed_log_size(-1 * size);
|
||||
} else {
|
||||
inc_pending_log_size(-1 * size);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1204,21 +1206,21 @@ int64_t ObTransCallbackMgr::inc_pending_log_size(const int64_t size)
|
||||
int64_t old_size = ATOMIC_FAA(&pending_log_size_, size);
|
||||
new_size = ATOMIC_LOAD(&pending_log_size_);
|
||||
if (old_size < 0 || new_size < 0) {
|
||||
ObIMemtableCtx *mt_ctx = NULL;
|
||||
transaction::ObTransCtx *trans_ctx = NULL;
|
||||
if (NULL == (mt_ctx = static_cast<ObIMemtableCtx *>(&host_))) {
|
||||
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mt_ctx is null", K(size), K(old_size), K(new_size), K(host_));
|
||||
} else if (NULL == (trans_ctx = mt_ctx->get_trans_ctx())) {
|
||||
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "trans ctx get failed", K(size), K(old_size), K(new_size), K(*mt_ctx));
|
||||
} else {
|
||||
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "increase remaining data size less than 0!",
|
||||
K(size), K(old_size), K(new_size), K(*trans_ctx));
|
||||
}
|
||||
ObIMemtableCtx *mt_ctx = static_cast<ObIMemtableCtx *>(&host_);
|
||||
transaction::ObTransCtx *trans_ctx = mt_ctx ? mt_ctx->get_trans_ctx() : NULL;
|
||||
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "increase remaining data size less than 0!",
|
||||
K(size), K(old_size), K(new_size), KPC(mt_ctx), KPC(trans_ctx));
|
||||
}
|
||||
}
|
||||
return new_size;
|
||||
}
|
||||
|
||||
void ObTransCallbackMgr::inc_flushed_log_size(const int64_t size) {
|
||||
if (!is_parallel_logging_()) {
|
||||
ATOMIC_FAA(&flushed_log_size_, size);
|
||||
}
|
||||
}
|
||||
|
||||
int ObTransCallbackMgr::get_memtable_key_arr(ObMemtableKeyArray &memtable_key_arr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -2184,6 +2186,34 @@ bool ObTransCallbackMgr::is_logging_blocked(bool &has_pending_log) const
|
||||
return all_blocked;
|
||||
}
|
||||
|
||||
int64_t ObTransCallbackMgr::get_pending_log_size() const
|
||||
{
|
||||
if (!is_parallel_logging_()) {
|
||||
return ATOMIC_LOAD(&pending_log_size_);
|
||||
} else {
|
||||
int64_t size = 0;
|
||||
int ret = OB_SUCCESS;
|
||||
CALLBACK_LISTS_FOREACH_CONST(idx, list) {
|
||||
size += list->get_pending_log_size();
|
||||
}
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ObTransCallbackMgr::get_flushed_log_size() const
|
||||
{
|
||||
if (!is_parallel_logging_()) {
|
||||
return ATOMIC_LOAD(&flushed_log_size_);
|
||||
} else {
|
||||
int64_t size = 0;
|
||||
int ret = OB_SUCCESS;
|
||||
CALLBACK_LISTS_FOREACH_CONST(idx, list) {
|
||||
size += list->get_logged_data_size();
|
||||
}
|
||||
return size;
|
||||
}
|
||||
}
|
||||
|
||||
}; // end namespace mvcc
|
||||
}; // end namespace oceanbase
|
||||
|
||||
|
@ -272,15 +272,11 @@ public:
|
||||
int clean_unlog_callbacks(int64_t &removed_cnt, common::ObFunction<void()> &before_remove);
|
||||
// when not inc, return -1
|
||||
int64_t inc_pending_log_size(const int64_t size);
|
||||
void inc_flushed_log_size(const int64_t size) {
|
||||
if (!serial_final_scn_.is_valid()) {
|
||||
UNUSED(ATOMIC_FAA(&flushed_log_size_, size));
|
||||
}
|
||||
}
|
||||
void inc_flushed_log_size(const int64_t size);
|
||||
void clear_pending_log_size() { ATOMIC_STORE(&pending_log_size_, 0); }
|
||||
int64_t get_pending_log_size() const { return ATOMIC_LOAD(&pending_log_size_); }
|
||||
int64_t get_pending_log_size() const;
|
||||
bool pending_log_size_too_large(const transaction::ObTxSEQ &write_seq_no, const int64_t limit);
|
||||
int64_t get_flushed_log_size() const { return ATOMIC_LOAD(&flushed_log_size_); }
|
||||
int64_t get_flushed_log_size() const;
|
||||
int get_log_guard(const transaction::ObTxSEQ &write_seq,
|
||||
ObCallbackListLogGuard &log_guard,
|
||||
int &cb_list_idx);
|
||||
|
@ -127,12 +127,14 @@ int ObTxCallbackList::append_callback(ObITransCallback *callback,
|
||||
}
|
||||
++appended_;
|
||||
ATOMIC_INC(&length_);
|
||||
data_size_ += callback->get_data_size();
|
||||
int64_t data_size = callback->get_data_size();
|
||||
data_size_ += data_size;
|
||||
if (repos_lc) {
|
||||
log_cursor_ = get_tail();
|
||||
}
|
||||
if (for_replay) {
|
||||
++logged_;
|
||||
logged_data_size_ += data_size;
|
||||
++synced_;
|
||||
}
|
||||
// Once callback is appended into callback lists, we can not handle the
|
||||
|
@ -199,6 +199,12 @@ public:
|
||||
bool has_pending_log() const {
|
||||
return ATOMIC_LOAD(&data_size_) - ATOMIC_LOAD(&logged_data_size_) > 0;
|
||||
}
|
||||
int64_t get_pending_log_size() const {
|
||||
return data_size_ - logged_data_size_;
|
||||
}
|
||||
int64_t get_logged_data_size() const {
|
||||
return logged_data_size_;
|
||||
}
|
||||
DECLARE_TO_STRING;
|
||||
private:
|
||||
const int16_t id_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user