diff --git a/deps/easy/src/io/easy_negotiation.c b/deps/easy/src/io/easy_negotiation.c index 59ab193fa..5a2f18321 100644 --- a/deps/easy/src/io/easy_negotiation.c +++ b/deps/easy/src/io/easy_negotiation.c @@ -94,9 +94,9 @@ static int easy_decode_uint64(char *buf, const int64_t data_len, int64_t *pos, u static int easy_decode_uint16(char *buf, const int64_t data_len, int64_t *pos, uint16_t *val) { int ret = (NULL != buf && data_len - *pos >= 2) ? EASY_OK : EASY_ERROR; - - if (ret == EASY_OK) { - *val = (uint16_t)(((*(buf + (*pos)++)) & 0xff) << 8); + + if (ret == EASY_OK) { + *val = (uint16_t)(((*(buf + (*pos)++)) & 0xff) << 8); *val = (uint16_t)(*val | (*(buf + (*pos)++) & 0xff)); } @@ -134,9 +134,9 @@ static int easy_encode_negotiation_msg(easy_negotiation_msg_t *ne_msg, char *buf buf[pos++] = ne_msg->msg_body.io_thread_index; *encode_len = pos; - return ret; -} - + return ret; +} + static int easy_decode_negotiation_msg(easy_negotiation_msg_t *ne_msg, char *recv_buf, int64_t recv_buf_len, int64_t *decode_len) { int ret = EASY_OK; diff --git a/src/storage/memtable/mvcc/ob_mvcc.h b/src/storage/memtable/mvcc/ob_mvcc.h index c7b45777c..7649beac1 100644 --- a/src/storage/memtable/mvcc/ob_mvcc.h +++ b/src/storage/memtable/mvcc/ob_mvcc.h @@ -56,7 +56,7 @@ public: void set_scn(const share::SCN scn); share::SCN get_scn() const; int before_append_cb(const bool is_replay); - int after_append_cb(const bool is_replay, const int ret_code); + void after_append_cb(const bool is_replay); // interface for redo log generator bool need_fill_redo() const { return need_fill_redo_; } bool need_submit_log() const { return need_submit_log_; } @@ -67,7 +67,7 @@ public: int log_sync_fail_cb(); // interface should be implement by subclasses virtual int before_append(const bool is_replay) { return common::OB_SUCCESS; } - virtual int after_append(const bool is_replay, const int ret_code) { return common::OB_SUCCESS; } + virtual void after_append(const bool is_replay) {} virtual int log_submitted() { return common::OB_SUCCESS; } virtual int undo_log_submitted() { return common::OB_SUCCESS; } virtual int log_sync(const share::SCN scn) @@ -87,7 +87,7 @@ public: ObITransCallback *get_prev() const { return ATOMIC_LOAD(&prev_); } void set_next(ObITransCallback *node) { ATOMIC_STORE(&next_, node); } void set_prev(ObITransCallback *node) { ATOMIC_STORE(&prev_, node); } - int append(ObITransCallback *node); + void append(ObITransCallback *node); public: // trans_commit is called when txn commit. And you need to let the data know diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index 88135cbe6..7ff55486b 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -89,18 +89,17 @@ SCN ObITransCallback::get_scn() const int ObITransCallback::before_append_cb(const bool is_replay) { - need_fill_redo_ = !is_replay; - need_submit_log_ = !is_replay; - return before_append(is_replay); + int ret = before_append(is_replay); + if (OB_SUCC(ret)) { + need_fill_redo_ = !is_replay; + need_submit_log_ = !is_replay; + } + return ret; } -int ObITransCallback::after_append_cb(const bool is_replay, const int ret_code) +void ObITransCallback::after_append_cb(const bool is_replay) { - if (OB_SUCCESS != ret_code) { - need_fill_redo_ = true; - need_submit_log_ = true; - } - return after_append(is_replay, ret_code); + (void)after_append(is_replay); } int ObITransCallback::log_submitted_cb() @@ -151,18 +150,13 @@ int ObITransCallback::log_sync_fail_cb() return ret; } -int ObITransCallback::append(ObITransCallback *node) +// All safety check is in before append +void ObITransCallback::append(ObITransCallback *node) { - int ret = OB_SUCCESS; - if (OB_ISNULL(node)) { - ret = OB_INVALID_ARGUMENT; - } else { - node->set_prev(this); - node->set_next(this->get_next()); - this->get_next()->set_prev(node); - this->set_next(node); - } - return ret; + node->set_prev(this); + node->set_next(this->get_next()); + this->get_next()->set_prev(node); + this->set_next(node); } int ObITransCallback::remove() @@ -241,56 +235,47 @@ int ObTransCallbackMgr::append(ObITransCallback *node) const int64_t slot = tid % MAX_CALLBACK_LIST_COUNT; int64_t stat = ATOMIC_LOAD(¶llel_stat_); - if (OB_FAIL(before_append(node))) { - TRANS_LOG(ERROR, "before_append failed", K(ret), K(node)); - } else { - if (PARALLEL_STMT == stat) { + (void)before_append(node); + + if (PARALLEL_STMT == stat) { + if (NULL == callback_lists_) { + WRLockGuard guard(rwlock_); if (NULL == callback_lists_) { - WRLockGuard guard(rwlock_); - if (NULL == callback_lists_) { - ObTxCallbackList *tmp_callback_lists = NULL; - if (NULL == (tmp_callback_lists = (ObTxCallbackList *)cb_allocator_.alloc( - sizeof(ObTxCallbackList) * MAX_CALLBACK_LIST_COUNT))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - TRANS_LOG(WARN, "alloc cb lists fail", K(ret)); - } else { - for (int i = 0; i < MAX_CALLBACK_LIST_COUNT; ++i) { - UNUSED(new(tmp_callback_lists + i) ObTxCallbackList(*this)); - } - callback_lists_ = tmp_callback_lists; - } - } - } - - if (OB_SUCC(ret)) { - if (NULL == callback_lists_) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "callback lists is not inited", K(ret)); + ObTxCallbackList *tmp_callback_lists = NULL; + if (NULL == (tmp_callback_lists = (ObTxCallbackList *)cb_allocator_.alloc( + sizeof(ObTxCallbackList) * MAX_CALLBACK_LIST_COUNT))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + TRANS_LOG(WARN, "alloc cb lists fail", K(ret)); } else { - ret = callback_lists_[slot].append_callback(node); - add_slave_list_append_cnt(); + for (int i = 0; i < MAX_CALLBACK_LIST_COUNT; ++i) { + UNUSED(new(tmp_callback_lists + i) ObTxCallbackList(*this)); + } + callback_lists_ = tmp_callback_lists; } } - } else { - ret = callback_list_.append_callback(node); - add_main_list_append_cnt(); } - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(after_append(node, ret))) { - TRANS_LOG(ERROR, "after_append failed", K(tmp_ret), K(node)); - if (OB_SUCC(ret)) { - ret = tmp_ret; + if (OB_SUCC(ret)) { + if (NULL == callback_lists_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "callback lists is not inited", K(ret)); + } else { + ret = callback_lists_[slot].append_callback(node, for_replay_); + add_slave_list_append_cnt(); } } + } else { + ret = callback_list_.append_callback(node, for_replay_); + add_main_list_append_cnt(); } + after_append(node, ret); + return ret; } -int ObTransCallbackMgr::before_append(ObITransCallback *node) +void ObTransCallbackMgr::before_append(ObITransCallback *node) { - int ret = OB_SUCCESS; int64_t size = node->get_data_size(); int64_t new_size = inc_pending_log_size(size); @@ -298,17 +283,10 @@ int ObTransCallbackMgr::before_append(ObITransCallback *node) if (for_replay_) { inc_flushed_log_size(size); } - if (OB_FAIL(node->before_append_cb(for_replay_))) { - TRANS_LOG(ERROR, "before_append_cb failed", K(ret), K(node)); - } - - return ret; } -int ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code) +void ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code) { - int ret = OB_SUCCESS; - if (OB_SUCCESS != ret_code) { int64_t size = node->get_data_size(); inc_pending_log_size(-1 * size); @@ -316,12 +294,6 @@ int ObTransCallbackMgr::after_append(ObITransCallback *node, const int ret_code) inc_flushed_log_size(-1 * size); } } - - if (OB_FAIL(node->after_append_cb(for_replay_, ret_code))) { - TRANS_LOG(ERROR, "after_before_append_cb failed", K(ret), K(node)); - } - - return ret; } int ObTransCallbackMgr::rollback_to(const int64_t to_seq_no, @@ -652,21 +624,9 @@ int ObMvccRowCallback::before_append(const bool is_replay) return ret; } -int ObMvccRowCallback::after_append(const bool is_replay, const int ret_code) +void ObMvccRowCallback::after_append(const bool is_replay) { - int ret = OB_SUCCESS; - - if (OB_ISNULL(memtable_)) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "memtable is NULL", K(ret)); - } else if (OB_SUCCESS != ret_code) { - if (!is_replay) { - dec_unsubmitted_cnt_(); - dec_unsynced_cnt_(); - } - } - - return ret; + // do nothing } int ObMvccRowCallback::log_submitted() diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index 1ff9f0471..3fda9791a 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -199,8 +199,8 @@ public: void reset(); ObIMvccCtx &get_ctx() { return host_; } int append(ObITransCallback *node); - int before_append(ObITransCallback *node); - int after_append(ObITransCallback *node, const int ret_code); + void before_append(ObITransCallback *node); + void after_append(ObITransCallback *node, const int ret_code); void trans_start(); void calc_checksum_all(); void print_callbacks(); @@ -405,7 +405,7 @@ public: int64_t to_string(char *buf, const int64_t buf_len) const; bool log_synced() const override { return share::SCN::max_scn() != scn_; } virtual int before_append(const bool is_replay) override; - virtual int after_append(const bool is_replay, const int ret_code) override; + virtual void after_append(const bool is_replay) override; virtual int log_submitted() override; virtual int undo_log_submitted() override; int64_t get_data_size() diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index 54c25fe79..001929568 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -47,13 +47,30 @@ void ObTxCallbackList::reset() length_ = 0; } -int ObTxCallbackList::append_callback(ObITransCallback *callback) +// the semantic of the append_callback is atomic which means the cb is removed +// and no side effect is taken effects if some unexpected failure has happened. +int ObTxCallbackList::append_callback(ObITransCallback *callback, + const bool for_replay) { int ret = OB_SUCCESS; + // It is important that we should put the before_append_cb and after_append_cb + // into the latch guard otherwise the callback may already paxosed and released + // before callback it. SpinLockGuard lock(latch_); - if (OB_SUCC(get_tail()->append(callback))) { - length_ ++; + if (OB_ISNULL(callback)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "before_append_cb failed", K(ret), KPC(callback)); + } else if (OB_FAIL(callback->before_append_cb(for_replay))) { + TRANS_LOG(WARN, "before_append_cb failed", K(ret), KPC(callback)); + } else { + (void)get_tail()->append(callback); + length_++; + + // Once callback is appended into callback lists, we can not handle the + // error after it. So it should never report the error later. What's more, + // after_append also should never return the error. + (void)callback->after_append_cb(for_replay); } return ret; diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.h b/src/storage/memtable/mvcc/ob_tx_callback_list.h index c4d4c6f0f..d759540b1 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.h @@ -43,7 +43,7 @@ public: void reset(); // append_callback will append your callback into the callback list - int append_callback(ObITransCallback *callback); + int append_callback(ObITransCallback *callback, const bool for_replay); // concat_callbacks will append all callbacks in other into itself and reset // other. And it will return the concat number during concat_callbacks. diff --git a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp index a9fb5e75f..5b5f288a2 100644 --- a/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp +++ b/unittest/storage/memtable/mvcc/test_mvcc_callback.cpp @@ -159,7 +159,9 @@ public: need_fill_redo, scn); EXPECT_NE(NULL, (long)cb); - EXPECT_EQ(OB_SUCCESS, callback_list_.append_callback(cb)); + EXPECT_EQ(OB_SUCCESS, callback_list_.append_callback(cb, false/*for_replay*/)); + cb->need_submit_log_ = need_submit_log; + cb->need_fill_redo_ = need_fill_redo; return cb; }