From 9345dccd582455b32f8d77491dfa960c72b0ed02 Mon Sep 17 00:00:00 2001 From: fkuner <784819644@qq.com> Date: Wed, 22 May 2024 10:43:59 +0000 Subject: [PATCH] [OBCDC] fix obcdc stop when the disk of rocksdb is full --- .../src/ob_log_rocksdb_store_service.cpp | 56 ++++++++++++++++--- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp index 07e7a0b73..d8d08d802 100644 --- a/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp +++ b/src/logservice/libobcdc/src/ob_log_rocksdb_store_service.cpp @@ -26,6 +26,36 @@ #include "rocksdb/table_properties.h" #include "rocksdb/utilities/table_properties_collectors.h" +#define RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(stop_flag, sleep_ms, var, func, args...) \ + do {\ + if (OB_SUCC(ret)) \ + { \ + int64_t _retry_func_on_error_last_print_time = common::ObClockGenerator::getClock();\ + int64_t _retry_func_on_error_cur_print_time = 0;\ + const int64_t _PRINT_RETRY_FUNC_INTERVAL = 10 * _SEC_;\ + s = rocksdb::Status::IOError();\ + while (s.IsIOError() && ! (stop_flag)) \ + { \ + s = (var).func(args); \ + if (s.IsIOError()) { \ + ob_usleep(sleep_ms); \ + }\ + _retry_func_on_error_cur_print_time = common::ObClockGenerator::getClock();\ + if (_retry_func_on_error_cur_print_time - _retry_func_on_error_last_print_time >= _PRINT_RETRY_FUNC_INTERVAL) {\ + LOG_DBA_WARN(OB_IO_ERROR, \ + "msg", "put value into rocksdb failed", \ + "error", s.ToString().c_str(), \ + "last_print_time", _retry_func_on_error_last_print_time); \ + _retry_func_on_error_last_print_time = _retry_func_on_error_cur_print_time; \ + }\ + } \ + if ((stop_flag)) \ + { \ + ret = OB_IN_STOP_STATE; \ + } \ + } \ + } while (0) + namespace oceanbase { namespace libobcdc @@ -176,8 +206,8 @@ int RocksDbStoreService::put(const std::string &key, const ObSlice &value) ret = OB_IN_STOP_STATE; } else { // find column family handle for cf - rocksdb::Status s = m_db_->Put( - writer_options, + rocksdb::Status s; + RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), Put, writer_options, rocksdb::Slice(key.c_str(), key.size()), rocksdb::Slice(value.buf_, value.buf_len_)); @@ -203,7 +233,9 @@ int RocksDbStoreService::put(void *cf_handle, const std::string &key, const ObSl } else if (is_stopped()) { ret = OB_IN_STOP_STATE; } else { - rocksdb::Status s = m_db_->Put(writer_options, column_family_handle, rocksdb::Slice(key), + rocksdb::Status s; + RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), Put, writer_options, column_family_handle, + rocksdb::Slice(key), rocksdb::Slice(value.buf_, value.buf_len_)); if (!s.ok()) { @@ -231,10 +263,11 @@ int RocksDbStoreService::batch_write(void *cf_handle, rocksdb::WriteBatch batch; for (int64_t idx = 0; OB_SUCC(ret) && !is_stopped() && idx < keys.size(); ++idx) { - rocksdb::Status s = batch.Put( - column_family_handle, + rocksdb::Status s; + RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, batch, Put, column_family_handle, rocksdb::Slice(keys[idx]), rocksdb::Slice(values[idx].buf_, values[idx].buf_len_)); + if (!s.ok()) { ret = OB_IO_ERROR; _LOG_ERROR("RocksDbStoreService build batch failed, error %s", s.ToString().c_str()); @@ -313,7 +346,9 @@ int RocksDbStoreService::del(const std::string &key) // find column family handle for cf rocksdb::WriteOptions writer_options; writer_options.disableWAL = true; - rocksdb::Status s = m_db_->Delete(writer_options, key); + rocksdb::Status s; + RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), Delete, writer_options, key); + if (!s.ok()) { ret = OB_IO_ERROR; _LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str()); @@ -336,7 +371,9 @@ int RocksDbStoreService::del(void *cf_handle, const std::string &key) } else if (is_stopped()) { ret = OB_IN_STOP_STATE; } else { - rocksdb::Status s = m_db_->Delete(writer_options, column_family_handle, key); + rocksdb::Status s; + RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), Delete, writer_options, + column_family_handle, key); if (!s.ok()) { LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str()); @@ -361,8 +398,9 @@ int RocksDbStoreService::del_range(void *cf_handle, const std::string &begin_key } else { rocksdb::WriteOptions writer_options; writer_options.disableWAL = true; - rocksdb::Status s = m_db_->DeleteRange(writer_options, column_family_handle, - begin_key, end_key); + rocksdb::Status s; + RETRY_FUNC_ON_IO_ERROR_WITH_USLEEP_MS(is_stopped(), 1 * _SEC_, (*m_db_), DeleteRange, writer_options, + column_family_handle, begin_key, end_key); if (!s.ok()) { LOG_ERROR("DeleteRange %s from rocksdb failed, error %s", begin_key.c_str(), s.ToString().c_str());