[OBCDC] fix obcdc stop when the disk of rocksdb is full
This commit is contained in:
parent
8bd0e4f049
commit
9345dccd58
@ -26,6 +26,36 @@
|
||||
#include "rocksdb/table_properties.h"
|
||||
#include "rocksdb/utilities/table_properties_collectors.h"
|
||||
|
||||
#define RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(stop_flag, sleep_ms, var, func, args...) \
|
||||
do {\
|
||||
if (OB_SUCC(ret)) \
|
||||
{ \
|
||||
int64_t _retry_func_on_error_last_print_time = common::ObClockGenerator::getClock();\
|
||||
int64_t _retry_func_on_error_cur_print_time = 0;\
|
||||
const int64_t _PRINT_RETRY_FUNC_INTERVAL = 10 * _SEC_;\
|
||||
s = rocksdb::Status::IOError();\
|
||||
while (s.IsIOError() && ! (stop_flag)) \
|
||||
{ \
|
||||
s = (var).func(args); \
|
||||
if (s.IsIOError()) { \
|
||||
ob_usleep(sleep_ms); \
|
||||
}\
|
||||
_retry_func_on_error_cur_print_time = common::ObClockGenerator::getClock();\
|
||||
if (_retry_func_on_error_cur_print_time - _retry_func_on_error_last_print_time >= _PRINT_RETRY_FUNC_INTERVAL) {\
|
||||
LOG_DBA_WARN(OB_IO_ERROR, \
|
||||
"msg", "put value into rocksdb failed", \
|
||||
"error", s.ToString().c_str(), \
|
||||
"last_print_time", _retry_func_on_error_last_print_time); \
|
||||
_retry_func_on_error_last_print_time = _retry_func_on_error_cur_print_time; \
|
||||
}\
|
||||
} \
|
||||
if ((stop_flag)) \
|
||||
{ \
|
||||
ret = OB_IN_STOP_STATE; \
|
||||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace libobcdc
|
||||
@ -176,8 +206,8 @@ int RocksDbStoreService::put(const std::string &key, const ObSlice &value)
|
||||
ret = OB_IN_STOP_STATE;
|
||||
} else {
|
||||
// find column family handle for cf
|
||||
rocksdb::Status s = m_db_->Put(
|
||||
writer_options,
|
||||
rocksdb::Status s;
|
||||
RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), Put, writer_options,
|
||||
rocksdb::Slice(key.c_str(), key.size()),
|
||||
rocksdb::Slice(value.buf_, value.buf_len_));
|
||||
|
||||
@ -203,7 +233,9 @@ int RocksDbStoreService::put(void *cf_handle, const std::string &key, const ObSl
|
||||
} else if (is_stopped()) {
|
||||
ret = OB_IN_STOP_STATE;
|
||||
} else {
|
||||
rocksdb::Status s = m_db_->Put(writer_options, column_family_handle, rocksdb::Slice(key),
|
||||
rocksdb::Status s;
|
||||
RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), Put, writer_options, column_family_handle,
|
||||
rocksdb::Slice(key),
|
||||
rocksdb::Slice(value.buf_, value.buf_len_));
|
||||
|
||||
if (!s.ok()) {
|
||||
@ -231,10 +263,11 @@ int RocksDbStoreService::batch_write(void *cf_handle,
|
||||
rocksdb::WriteBatch batch;
|
||||
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && !is_stopped() && idx < keys.size(); ++idx) {
|
||||
rocksdb::Status s = batch.Put(
|
||||
column_family_handle,
|
||||
rocksdb::Status s;
|
||||
RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, batch, Put, column_family_handle,
|
||||
rocksdb::Slice(keys[idx]),
|
||||
rocksdb::Slice(values[idx].buf_, values[idx].buf_len_));
|
||||
|
||||
if (!s.ok()) {
|
||||
ret = OB_IO_ERROR;
|
||||
_LOG_ERROR("RocksDbStoreService build batch failed, error %s", s.ToString().c_str());
|
||||
@ -313,7 +346,9 @@ int RocksDbStoreService::del(const std::string &key)
|
||||
// find column family handle for cf
|
||||
rocksdb::WriteOptions writer_options;
|
||||
writer_options.disableWAL = true;
|
||||
rocksdb::Status s = m_db_->Delete(writer_options, key);
|
||||
rocksdb::Status s;
|
||||
RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), Delete, writer_options, key);
|
||||
|
||||
if (!s.ok()) {
|
||||
ret = OB_IO_ERROR;
|
||||
_LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str());
|
||||
@ -336,7 +371,9 @@ int RocksDbStoreService::del(void *cf_handle, const std::string &key)
|
||||
} else if (is_stopped()) {
|
||||
ret = OB_IN_STOP_STATE;
|
||||
} else {
|
||||
rocksdb::Status s = m_db_->Delete(writer_options, column_family_handle, key);
|
||||
rocksdb::Status s;
|
||||
RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), Delete, writer_options,
|
||||
column_family_handle, key);
|
||||
|
||||
if (!s.ok()) {
|
||||
LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str());
|
||||
@ -361,8 +398,9 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key
|
||||
} else {
|
||||
rocksdb::WriteOptions writer_options;
|
||||
writer_options.disableWAL = true;
|
||||
rocksdb::Status s = m_db_->DeleteRange(writer_options, column_family_handle,
|
||||
begin_key, end_key);
|
||||
rocksdb::Status s;
|
||||
RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), DeleteRange, writer_options,
|
||||
column_family_handle, begin_key, end_key);
|
||||
|
||||
if (!s.ok()) {
|
||||
LOG_ERROR("DeleteRange %s from rocksdb failed, error %s", begin_key.c_str(), s.ToString().c_str());
|
||||
|
Loading…
x
Reference in New Issue
Block a user