[BUG] fix after append cb core

This commit is contained in:
Handora 2023-04-24 07:11:32 +00:00 committed by ob-robot
parent 4b4abc7be1
commit 7beeb7dd31
7 changed files with 80 additions and 101 deletions

View File

@ -94,9 +94,9 @@ static int easy_decode_uint64(char *buf, const int64_t data_len, int64_t *pos, u
static int easy_decode_uint16(char *buf, const int64_t data_len, int64_t *pos, uint16_t *val)
{
int ret = (NULL != buf && data_len - *pos >= 2) ? EASY_OK : EASY_ERROR;
if (ret == EASY_OK) {
*val = (uint16_t)(((*(buf + (*pos)++)) & 0xff) << 8);
if (ret == EASY_OK) {
*val = (uint16_t)(((*(buf + (*pos)++)) & 0xff) << 8);
*val = (uint16_t)(*val | (*(buf + (*pos)++) & 0xff));
}
@ -134,9 +134,9 @@ static int easy_encode_negotiation_msg(easy_negotiation_msg_t *ne_msg, char *buf
buf[pos++] = ne_msg->msg_body.io_thread_index;
*encode_len = pos;
return ret;
}
return ret;
}
static int easy_decode_negotiation_msg(easy_negotiation_msg_t *ne_msg, char *recv_buf, int64_t recv_buf_len, int64_t *decode_len)
{
int ret = EASY_OK;

View File

@ -56,7 +56,7 @@ public:
void set_scn(const share::SCN scn);
share::SCN get_scn() const;
int before_append_cb(const bool is_replay);
int after_append_cb(const bool is_replay, const int ret_code);
void after_append_cb(const bool is_replay);
// interface for redo log generator
bool need_fill_redo() const { return need_fill_redo_; }
bool need_submit_log() const { return need_submit_log_; }
@ -67,7 +67,7 @@ public:
int log_sync_fail_cb();
// interface should be implement by subclasses
virtual int before_append(const bool is_replay) { return common::OB_SUCCESS; }
virtual int after_append(const bool is_replay, const int ret_code) { return common::OB_SUCCESS; }
virtual void after_append(const bool is_replay) {}
virtual int log_submitted() { return common::OB_SUCCESS; }
virtual int undo_log_submitted() { return common::OB_SUCCESS; }
virtual int log_sync(const share::SCN scn)
@ -87,7 +87,7 @@ public:
ObITransCallback *get_prev() const { return ATOMIC_LOAD(&prev_); }
void set_next(ObITransCallback *node) { ATOMIC_STORE(&next_, node); }
void set_prev(ObITransCallback *node) { ATOMIC_STORE(&prev_, node); }
int append(ObITransCallback *node);
void append(ObITransCallback *node);
public:
// trans_commit is called when txn commit. And you need to let the data know

View File

@ -89,18 +89,17 @@ SCN ObITransCallback::get_scn() const
int ObITransCallback::before_append_cb(const bool is_replay)
{
need_fill_redo_ = !is_replay;
need_submit_log_ = !is_replay;
return before_append(is_replay);
int ret = before_append(is_replay);
if (OB_SUCC(ret)) {
need_fill_redo_ = !is_replay;
need_submit_log_ = !is_replay;
}
return ret;
}
int ObITransCallback::after_append_cb(const bool is_replay, const int ret_code)
void ObITransCallback::after_append_cb(const bool is_replay)
{
if (OB_SUCCESS != ret_code) {
need_fill_redo_ = true;
need_submit_log_ = true;
}
return after_append(is_replay, ret_code);
(void)after_append(is_replay);
}
int ObITransCallback::log_submitted_cb()
@ -151,18 +150,13 @@ int ObITransCallback::log_sync_fail_cb()
return ret;
}
int ObITransCallback::append(ObITransCallback *node)
// All safety check is in before append
void ObITransCallback::append(ObITransCallback *node)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(node)) {
ret = OB_INVALID_ARGUMENT;
} else {
node->set_prev(this);
node->set_next(this->get_next());
this->get_next()->set_prev(node);
this->set_next(node);
}
return ret;
node->set_prev(this);
node->set_next(this->get_next());
this->get_next()->set_prev(node);
this->set_next(node);
}
int ObITransCallback::remove()
@ -241,56 +235,47 @@ int ObTransCallbackMgr::append(ObITransCallback *node)
const int64_t slot = tid % MAX_CALLBACK_LIST_COUNT;
int64_t stat = ATOMIC_LOAD(&parallel_stat_);
if (OB_FAIL(before_append(node))) {
TRANS_LOG(ERROR, "before_append failed", K(ret), K(node));
} else {
if (PARALLEL_STMT == stat) {
(void)before_append(node);
if (PARALLEL_STMT == stat) {
if (NULL == callback_lists_) {
WRLockGuard guard(rwlock_);
if (NULL == callback_lists_) {
WRLockGuard guard(rwlock_);
if (NULL == callback_lists_) {
ObTxCallbackList *tmp_callback_lists = NULL;
if (NULL == (tmp_callback_lists = (ObTxCallbackList *)cb_allocator_.alloc(
sizeof(ObTxCallbackList) * MAX_CALLBACK_LIST_COUNT))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "alloc cb lists fail", K(ret));
} else {
for (int i = 0; i < MAX_CALLBACK_LIST_COUNT; ++i) {
UNUSED(new(tmp_callback_lists + i) ObTxCallbackList(*this));
}
callback_lists_ = tmp_callback_lists;
}
}
}
if (OB_SUCC(ret)) {
if (NULL == callback_lists_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "callback lists is not inited", K(ret));
ObTxCallbackList *tmp_callback_lists = NULL;
if (NULL == (tmp_callback_lists = (ObTxCallbackList *)cb_allocator_.alloc(
sizeof(ObTxCallbackList) * MAX_CALLBACK_LIST_COUNT))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "alloc cb lists fail", K(ret));
} else {
ret = callback_lists_[slot].append_callback(node);
add_slave_list_append_cnt();
for (int i = 0; i < MAX_CALLBACK_LIST_COUNT; ++i) {
UNUSED(new(tmp_callback_lists + i) ObTxCallbackList(*this));
}
callback_lists_ = tmp_callback_lists;
}
}
} else {
ret = callback_list_.append_callback(node);
add_main_list_append_cnt();
}
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(after_append(node, ret))) {
TRANS_LOG(ERROR, "after_append failed", K(tmp_ret), K(node));
if (OB_SUCC(ret)) {
ret = tmp_ret;
if (OB_SUCC(ret)) {
if (NULL == callback_lists_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "callback lists is not inited", K(ret));
} else {
ret = callback_lists_[slot].append_callback(node, for_replay_);
add_slave_list_append_cnt();
}
}
} else {
ret = callback_list_.append_callback(node, for_replay_);
add_main_list_append_cnt();
}
after_append(node, ret);
return ret;
}
int ObTransCallbackMgr::before_append(ObITransCallback *node)
void ObTransCallbackMgr::before_append(ObITransCallback *node)
{
int ret = OB_SUCCESS;
int64_t size = node->get_data_size();
int64_t new_size = inc_pending_log_size(size);
@ -298,17 +283,10 @@ int ObTransCallbackMgr::before_append(ObITransCallback *node)
if (for_replay_) {
inc_flushed_log_size(size);
}
if (OB_FAIL(node->before_append_cb(for_replay_))) {
TRANS_LOG(ERROR, "before_append_cb failed", K(ret), K(node));
}
return ret;
}
int ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code)
void ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code)
{
int ret = OB_SUCCESS;
if (OB_SUCCESS != ret_code) {
int64_t size = node->get_data_size();
inc_pending_log_size(-1 * size);
@ -316,12 +294,6 @@ int ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code)
inc_flushed_log_size(-1 * size);
}
}
if (OB_FAIL(node->after_append_cb(for_replay_, ret_code))) {
TRANS_LOG(ERROR, "after_before_append_cb failed", K(ret), K(node));
}
return ret;
}
int ObTransCallbackMgr::rollback_to(const int64_t to_seq_no,
@ -652,21 +624,9 @@ int ObMvccRowCallback::before_append(const bool is_replay)
return ret;
}
int ObMvccRowCallback::after_append(const bool is_replay, const int ret_code)
void ObMvccRowCallback::after_append(const bool is_replay)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(memtable_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "memtable is NULL", K(ret));
} else if (OB_SUCCESS != ret_code) {
if (!is_replay) {
dec_unsubmitted_cnt_();
dec_unsynced_cnt_();
}
}
return ret;
// do nothing
}
int ObMvccRowCallback::log_submitted()

View File

@ -199,8 +199,8 @@ public:
void reset();
ObIMvccCtx &get_ctx() { return host_; }
int append(ObITransCallback *node);
int before_append(ObITransCallback *node);
int after_append(ObITransCallback *node, const int ret_code);
void before_append(ObITransCallback *node);
void after_append(ObITransCallback *node, const int ret_code);
void trans_start();
void calc_checksum_all();
void print_callbacks();
@ -405,7 +405,7 @@ public:
int64_t to_string(char *buf, const int64_t buf_len) const;
bool log_synced() const override { return share::SCN::max_scn() != scn_; }
virtual int before_append(const bool is_replay) override;
virtual int after_append(const bool is_replay, const int ret_code) override;
virtual void after_append(const bool is_replay) override;
virtual int log_submitted() override;
virtual int undo_log_submitted() override;
int64_t get_data_size()

View File

@ -47,13 +47,30 @@ void ObTxCallbackList::reset()
length_ = 0;
}
int ObTxCallbackList::append_callback(ObITransCallback *callback)
// the semantic of the append_callback is atomic which means the cb is removed
// and no side effect is taken effects if some unexpected failure has happened.
int ObTxCallbackList::append_callback(ObITransCallback *callback,
const bool for_replay)
{
int ret = OB_SUCCESS;
// It is important that we should put the before_append_cb and after_append_cb
// into the latch guard otherwise the callback may already paxosed and released
// before callback it.
SpinLockGuard lock(latch_);
if (OB_SUCC(get_tail()->append(callback))) {
length_ ++;
if (OB_ISNULL(callback)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "before_append_cb failed", K(ret), KPC(callback));
} else if (OB_FAIL(callback->before_append_cb(for_replay))) {
TRANS_LOG(WARN, "before_append_cb failed", K(ret), KPC(callback));
} else {
(void)get_tail()->append(callback);
length_++;
// Once callback is appended into callback lists, we can not handle the
// error after it. So it should never report the error later. What's more,
// after_append also should never return the error.
(void)callback->after_append_cb(for_replay);
}
return ret;

View File

@ -43,7 +43,7 @@ public:
void reset();
// append_callback will append your callback into the callback list
int append_callback(ObITransCallback *callback);
int append_callback(ObITransCallback *callback, const bool for_replay);
// concat_callbacks will append all callbacks in other into itself and reset
// other. And it will return the concat number during concat_callbacks.

View File

@ -159,7 +159,9 @@ public:
need_fill_redo,
scn);
EXPECT_NE(NULL, (long)cb);
EXPECT_EQ(OB_SUCCESS, callback_list_.append_callback(cb));
EXPECT_EQ(OB_SUCCESS, callback_list_.append_callback(cb, false/*for_replay*/));
cb->need_submit_log_ = need_submit_log;
cb->need_fill_redo_ = need_fill_redo;
return cb;
}