Fix core that dtl free dtl buffer from rpc wrongly
This commit is contained in:
@ -96,7 +96,6 @@ int ObDtlLocalChannel::send_shared_message(ObDtlLinkedBuffer*& buf)
|
|||||||
KP(buf));
|
KP(buf));
|
||||||
bool is_eof = buf->is_eof();
|
bool is_eof = buf->is_eof();
|
||||||
if (OB_FAIL(DTL.get_dfc_server().cache(/*buf->tenant_id(), */ peer_id_, buf, true))) {
|
if (OB_FAIL(DTL.get_dfc_server().cache(/*buf->tenant_id(), */ peer_id_, buf, true))) {
|
||||||
ret = tmp_ret;
|
|
||||||
LOG_WARN("get DTL channel fail", KP(peer_id_), "peer", get_peer(), K(ret), K(tmp_ret));
|
LOG_WARN("get DTL channel fail", KP(peer_id_), "peer", get_peer(), K(ret), K(tmp_ret));
|
||||||
} else {
|
} else {
|
||||||
// return block after cache first msg
|
// return block after cache first msg
|
||||||
|
|||||||
@ -38,7 +38,9 @@ int ObDtlLocalFirstBufferCache::ObDtlBufferClean::operator()(sql::dtl::ObDtlCach
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObDtlLinkedBuffer* linked_buffer = buffer_info->buffer();
|
ObDtlLinkedBuffer* linked_buffer = buffer_info->buffer();
|
||||||
buffer_info->set_buffer(nullptr);
|
buffer_info->set_buffer(nullptr);
|
||||||
tenant_mem_mgr_->free(linked_buffer);
|
if (nullptr != linked_buffer) {
|
||||||
|
tenant_mem_mgr_->free(linked_buffer);
|
||||||
|
}
|
||||||
buffer_info->reset();
|
buffer_info->reset();
|
||||||
buffer_info_mgr_->free_buffer_info(buffer_info);
|
buffer_info_mgr_->free_buffer_info(buffer_info);
|
||||||
return ret;
|
return ret;
|
||||||
@ -113,23 +115,27 @@ int ObDtlLocalFirstBufferCache::cache_buffer(ObDtlCacheBufferInfo*& buffer)
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_NOT_NULL(buffer)) {
|
if (OB_NOT_NULL(buffer)) {
|
||||||
uint64_t chan_id = buffer->chid();
|
uint64_t chan_id = buffer->chid();
|
||||||
if (OB_FAIL(buffer_map_.set_refactored(chan_id, buffer))) {
|
#ifdef ERRSIM
|
||||||
|
// -17 enable failed set refactored
|
||||||
|
ret = E(EventTable::EN_DTL_ONE_ROW_ONE_BUFFER) ret;
|
||||||
|
#endif
|
||||||
|
int64_t tmp_ret = ret;
|
||||||
|
if (TP_ENABLE_FAILED_SET_HT == ret) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("failed to set refactor", K(ret), K(tmp_ret));
|
||||||
|
} else if (OB_FAIL(buffer_map_.set_refactored(chan_id, buffer))) {
|
||||||
LOG_WARN("failed to insert buffer map", K(ret));
|
LOG_WARN("failed to insert buffer map", K(ret));
|
||||||
} else if (OB_FAIL(set_first_buffer(chan_id))) {
|
|
||||||
int tmp_ret = OB_SUCCESS;
|
|
||||||
ObDtlCacheBufferInfo* tmp_buffer = nullptr;
|
|
||||||
if (tmp_ret != buffer_map_.erase_refactored(chan_id, tmp_buffer)) {
|
|
||||||
LOG_WARN("failed to insert buffer map", K(ret), K(tmp_ret));
|
|
||||||
} else if (nullptr == tmp_buffer) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("buffer is null", K(ret), K(tmp_ret));
|
|
||||||
} else {
|
|
||||||
buffer = tmp_buffer;
|
|
||||||
}
|
|
||||||
LOG_WARN("failed to set first buffer", K(ret), K(chan_id), KP(chan_id));
|
|
||||||
} else {
|
} else {
|
||||||
inc_first_buffer_cnt();
|
buffer = nullptr;
|
||||||
LOG_DEBUG("trace cache buffer", KP(chan_id), KP(this), K(dfo_key_));
|
if (TP_ENABLE_FAILED_SET_FIRST_BUFFER == tmp_ret) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("failed to set first buffer", K(ret), K(tmp_ret));
|
||||||
|
} else if (OB_FAIL(set_first_buffer(chan_id))) {
|
||||||
|
LOG_WARN("failed to set first buffer", K(ret), K(chan_id), KP(chan_id));
|
||||||
|
} else {
|
||||||
|
inc_first_buffer_cnt();
|
||||||
|
LOG_DEBUG("trace cache buffer", KP(chan_id), KP(this), K(dfo_key_));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -421,7 +427,18 @@ int ObDtlLocalFirstBufferCacheManager::cache_buffer(int64_t chid, ObDtlLinkedBuf
|
|||||||
buf_info->set_buffer(data_buffer);
|
buf_info->set_buffer(data_buffer);
|
||||||
data_buffer = nullptr;
|
data_buffer = nullptr;
|
||||||
} else {
|
} else {
|
||||||
ObDtlLinkedBuffer* buffer = tenant_mem_mgr_->alloc(chid, data_buffer->size());
|
ObDtlLinkedBuffer *buffer = nullptr;
|
||||||
|
#ifdef ERRSIM
|
||||||
|
// -16 enable failed to alloc memory
|
||||||
|
ret = E(EventTable::EN_DTL_ONE_ROW_ONE_BUFFER) ret;
|
||||||
|
#endif
|
||||||
|
if (OB_SUCC(ret) || TP_ENABLE_FAILED_ALLOC_MEM != ret) {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
buffer = tenant_mem_mgr_->alloc(chid, data_buffer->size());
|
||||||
|
} else {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
buffer = nullptr;
|
||||||
|
}
|
||||||
if (nullptr != buffer) {
|
if (nullptr != buffer) {
|
||||||
ObDtlLinkedBuffer::assign(*data_buffer, buffer);
|
ObDtlLinkedBuffer::assign(*data_buffer, buffer);
|
||||||
buf_info->set_buffer(buffer);
|
buf_info->set_buffer(buffer);
|
||||||
@ -442,13 +459,10 @@ int ObDtlLocalFirstBufferCacheManager::cache_buffer(int64_t chid, ObDtlLinkedBuf
|
|||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
if (nullptr != buf_info) {
|
if (nullptr != buf_info) {
|
||||||
if (nullptr == data_buffer) {
|
ObDtlLinkedBuffer *buffer = buf_info->buffer();
|
||||||
// if cache buffer failed, should still send it. Can't just free it
|
buf_info->set_buffer(nullptr);
|
||||||
data_buffer = buf_info->buffer();
|
if (nullptr != buffer) {
|
||||||
buf_info->set_buffer(nullptr);
|
tenant_mem_mgr_->free(buffer);
|
||||||
}
|
|
||||||
if (!attach && OB_NOT_NULL(data_buffer)) {
|
|
||||||
tenant_mem_mgr_->free(data_buffer);
|
|
||||||
}
|
}
|
||||||
if (OB_SUCCESS != (tmp_ret = buffer_info_mgr_.free_buffer_info(buf_info))) {
|
if (OB_SUCCESS != (tmp_ret = buffer_info_mgr_.free_buffer_info(buf_info))) {
|
||||||
LOG_WARN("failed to free buffer info", K(ret), K(tmp_ret));
|
LOG_WARN("failed to free buffer info", K(ret), K(tmp_ret));
|
||||||
|
|||||||
@ -566,6 +566,8 @@ private:
|
|||||||
static const int64_t CONCURRENT_CNT = 2048;
|
static const int64_t CONCURRENT_CNT = 2048;
|
||||||
static const int64_t CHANNEL_HASH_BUCKET_NUM = 64 * 1024;
|
static const int64_t CHANNEL_HASH_BUCKET_NUM = 64 * 1024;
|
||||||
static const int64_t MAX_BITSET_CNT = 1024 * 1024; // 1024 * 1024
|
static const int64_t MAX_BITSET_CNT = 1024 * 1024; // 1024 * 1024
|
||||||
|
static const int64_t TP_ENABLE_FAILED_SET_HT = -17;
|
||||||
|
static const int64_t TP_ENABLE_FAILED_SET_FIRST_BUFFER = -18;
|
||||||
int64_t pins_;
|
int64_t pins_;
|
||||||
ObDtlDfoKey dfo_key_;
|
ObDtlDfoKey dfo_key_;
|
||||||
ObDtlFirstBufferHashTable<uint64_t, ObDtlCacheBufferInfo> buffer_map_;
|
ObDtlFirstBufferHashTable<uint64_t, ObDtlCacheBufferInfo> buffer_map_;
|
||||||
@ -603,6 +605,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
static const int64_t CONCURRENT_CNT = 1024;
|
static const int64_t CONCURRENT_CNT = 1024;
|
||||||
static const int64_t BUCKET_NUM = 64 * 1024;
|
static const int64_t BUCKET_NUM = 64 * 1024;
|
||||||
|
static const int64_t TP_ENABLE_FAILED_ALLOC_MEM = -16;
|
||||||
uint64_t tenant_id_;
|
uint64_t tenant_id_;
|
||||||
common::ObFIFOAllocator allocator_;
|
common::ObFIFOAllocator allocator_;
|
||||||
ObDtlTenantMemManager* tenant_mem_mgr_;
|
ObDtlTenantMemManager* tenant_mem_mgr_;
|
||||||
|
|||||||
@ -188,7 +188,7 @@ int ObDtlRpcChannel::feedup(ObDtlLinkedBuffer*& buffer)
|
|||||||
LOG_WARN("control channel can't drain msg", K(ret));
|
LOG_WARN("control channel can't drain msg", K(ret));
|
||||||
}
|
}
|
||||||
} else if (OB_ISNULL(linked_buffer = alloc_buf(buffer->size()))) {
|
} else if (OB_ISNULL(linked_buffer = alloc_buf(buffer->size()))) {
|
||||||
ret = OB_REACH_MEMORY_LIMIT;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("failed to allocate buffer", K(ret));
|
LOG_WARN("failed to allocate buffer", K(ret));
|
||||||
} else {
|
} else {
|
||||||
LOG_TRACE("DTL feedup a new msg to msg loop", K(buffer->size()), KP(id_), K(peer_));
|
LOG_TRACE("DTL feedup a new msg to msg loop", K(buffer->size()), KP(id_), K(peer_));
|
||||||
|
|||||||
@ -63,7 +63,6 @@ int ObDtlSendMessageP::process_msg(ObDtlRpcDataResponse& response, ObDtlSendArgs
|
|||||||
} else if (arg.buffer_.is_data_msg() && 1 == arg.buffer_.seq_no()) {
|
} else if (arg.buffer_.is_data_msg() && 1 == arg.buffer_.seq_no()) {
|
||||||
ObDtlLinkedBuffer* buf = &arg.buffer_;
|
ObDtlLinkedBuffer* buf = &arg.buffer_;
|
||||||
if (OB_FAIL(DTL.get_dfc_server().cache(arg.buffer_.tenant_id(), arg.chid_, buf))) {
|
if (OB_FAIL(DTL.get_dfc_server().cache(arg.buffer_.tenant_id(), arg.chid_, buf))) {
|
||||||
ret = tmp_ret;
|
|
||||||
LOG_WARN("get DTL channel fail",
|
LOG_WARN("get DTL channel fail",
|
||||||
KP(arg.chid_),
|
KP(arg.chid_),
|
||||||
K(ret),
|
K(ret),
|
||||||
|
|||||||
@ -136,7 +136,7 @@ ObDtlLinkedBuffer* ObDtlTenantMemManager::alloc(int64_t chid, int64_t size)
|
|||||||
++n_times;
|
++n_times;
|
||||||
buf = mem_mgr->alloc(chid, size);
|
buf = mem_mgr->alloc(chid, size);
|
||||||
if (nullptr == buf) {
|
if (nullptr == buf) {
|
||||||
ret = OB_REACH_MEMORY_LIMIT;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("failed to allocate dtl buffer memory", K(ret));
|
LOG_WARN("failed to allocate dtl buffer memory", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user