[FEAT MERGE] oblogminer[patch from 4.2.3]
Co-authored-by: qiuyg3 <qiuyg3@chinaunicom.cn>
This commit is contained in:
186
unittest/logminer/ob_log_miner_test_utils.cpp
Normal file
186
unittest/logminer/ob_log_miner_test_utils.cpp
Normal file
@ -0,0 +1,186 @@
|
||||
/**
|
||||
* Copyright (c) 2023 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "ob_log_binlog_record.h"
|
||||
#include "ob_log_miner_br.h"
|
||||
#include "ob_log_miner_test_utils.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace oblogminer
|
||||
{
|
||||
|
||||
ObLogMinerBR *v_build_logminer_br(binlogBuf *new_bufs,
|
||||
binlogBuf *old_bufs,
|
||||
RecordType type,
|
||||
lib::Worker::CompatMode compat_mode,
|
||||
const char *db_name,
|
||||
const char *table_name,
|
||||
const char *encoding,
|
||||
const int arg_count, va_list *ap)
|
||||
{
|
||||
ObLogMinerBR *br = new ObLogMinerBR;
|
||||
EXPECT_NE(nullptr, br);
|
||||
libobcdc::ObLogBR *cdc_br = new libobcdc::ObLogBR;
|
||||
cdc_br->init_data(EINSERT, 0, 1, 0, ObString(), ObString(), ObString(),
|
||||
100, 100);
|
||||
ICDCRecord *rec = DRCMessageFactory::createBinlogRecord();
|
||||
rec->setUserData(cdc_br);
|
||||
EXPECT_NE(nullptr, rec);
|
||||
ITableMeta *tab_meta = DRCMessageFactory::createTableMeta();
|
||||
EXPECT_NE(nullptr, tab_meta);
|
||||
IDBMeta *db_meta = DRCMessageFactory::createDBMeta();
|
||||
EXPECT_NE(nullptr, db_meta);
|
||||
rec->setNewColumn(new_bufs, arg_count/3);
|
||||
rec->setOldColumn(old_bufs, arg_count/3);
|
||||
rec->setRecordType(type);
|
||||
rec->setDbname(db_name);
|
||||
rec->setTbname(table_name);
|
||||
db_meta->setName(db_name);
|
||||
tab_meta->setPKs("");
|
||||
tab_meta->setPkinfo("");
|
||||
tab_meta->setUKs("");
|
||||
tab_meta->setUkinfo("");
|
||||
tab_meta->setName(table_name);
|
||||
tab_meta->setDBMeta(db_meta);
|
||||
IColMeta *col_meta = nullptr;
|
||||
const char *next = nullptr;
|
||||
int64_t arg_idx = 0;
|
||||
for (int i = 0; i < arg_count; i++) {
|
||||
const char *col_arg = nullptr;
|
||||
int len = 0;
|
||||
int data_type = 0;
|
||||
if (3 > i%4) {
|
||||
col_arg = va_arg(*ap, const char *);
|
||||
len = col_arg == nullptr ? 0 : strlen(col_arg);
|
||||
} else {
|
||||
data_type = va_arg(*ap, int);
|
||||
}
|
||||
if (0 == i%4) { // column name
|
||||
col_meta = DRCMessageFactory::createColMeta();
|
||||
EXPECT_NE(col_meta, nullptr);
|
||||
col_meta->setName(col_arg);
|
||||
col_meta->setEncoding(encoding);
|
||||
tab_meta->append(col_arg, col_meta);
|
||||
} else if (1 == i%4 && EDELETE != type) { // new value, EDELETE hasn't new value
|
||||
rec->putNew(col_arg, len);
|
||||
} else if (2 == i%4 && EINSERT != type) { // old value, EINSERT hasn't old value
|
||||
rec->putOld(col_arg, len);
|
||||
} else if (3 == i%4) { // column data type
|
||||
col_meta->setType(data_type);
|
||||
}
|
||||
}
|
||||
rec->setTableMeta(tab_meta);
|
||||
br->init(nullptr, rec, compat_mode, 0, 1, 0);
|
||||
return br;
|
||||
}
|
||||
|
||||
ObLogMinerBR *build_logminer_br(binlogBuf *new_bufs,
|
||||
binlogBuf *old_bufs,
|
||||
RecordType type,
|
||||
lib::Worker::CompatMode compat_mode,
|
||||
const char *db_name,
|
||||
const char *table_name,
|
||||
const char *encoding,
|
||||
const int arg_count, ...)
|
||||
{
|
||||
va_list ap;
|
||||
ObLogMinerBR *br = nullptr;
|
||||
va_start(ap, arg_count);
|
||||
br = v_build_logminer_br(new_bufs, old_bufs, type,
|
||||
compat_mode, db_name, table_name, encoding, arg_count, &ap);
|
||||
va_end(ap);
|
||||
return br;
|
||||
}
|
||||
|
||||
ObLogMinerBR *build_logminer_br(binlogBuf *new_bufs,
|
||||
binlogBuf *old_bufs,
|
||||
RecordType type,
|
||||
lib::Worker::CompatMode compat_mode,
|
||||
const char *db_name,
|
||||
const char *table_name,
|
||||
const int arg_count, ...)
|
||||
{
|
||||
va_list ap;
|
||||
ObLogMinerBR *br = nullptr;
|
||||
va_start(ap, arg_count);
|
||||
br = v_build_logminer_br(new_bufs, old_bufs, type,
|
||||
compat_mode, db_name, table_name, "utf8mb4", arg_count, &ap);
|
||||
va_end(ap);
|
||||
return br;
|
||||
}
|
||||
|
||||
void destroy_miner_br(ObLogMinerBR *&br) {
|
||||
ICDCRecord *cdc_br = br->get_br();
|
||||
IDBMeta *db_meta = cdc_br->getDBMeta();
|
||||
ITableMeta *tbl_meta = cdc_br->getTableMeta();
|
||||
delete static_cast<libobcdc::ObLogBR*>(cdc_br->getUserData());
|
||||
DRCMessageFactory::destroy(db_meta);
|
||||
DRCMessageFactory::destroy(tbl_meta);
|
||||
DRCMessageFactory::destroy(cdc_br);
|
||||
delete br;
|
||||
br = nullptr;
|
||||
}
|
||||
|
||||
ObLogMinerRecord *build_logminer_record(ObIAllocator &alloc,
|
||||
lib::Worker::CompatMode compat_mode,
|
||||
uint64_t tenant_id,
|
||||
int64_t orig_cluster_id,
|
||||
const char *tenant_name,
|
||||
const char *database_name,
|
||||
const char *tbl_name,
|
||||
int64_t trans_id,
|
||||
const char* const * pks,
|
||||
const int64_t pk_cnt,
|
||||
const char* const * uks,
|
||||
const int64_t uk_cnt,
|
||||
const char* row_unique_id,
|
||||
RecordType record_type,
|
||||
int64_t commit_ts_us,
|
||||
const char * redo_stmt,
|
||||
const char * undo_stmt)
|
||||
{
|
||||
void *ptr = ob_malloc(sizeof(ObLogMinerRecord), "testminer_rec");
|
||||
ObLogMinerRecord *rec = new (ptr) (ObLogMinerRecord);
|
||||
rec->set_allocator(&alloc);
|
||||
|
||||
rec->compat_mode_ = compat_mode;
|
||||
rec->tenant_id_ = tenant_id;
|
||||
rec->orig_cluster_id_ = orig_cluster_id;
|
||||
rec->tenant_name_.assign(tenant_name);
|
||||
rec->database_name_.assign(database_name);
|
||||
rec->table_name_.assign(tbl_name);
|
||||
rec->trans_id_ = trans_id;
|
||||
for (int i = 0; i < pk_cnt; i++) {
|
||||
rec->primary_keys_.push_back(pks[i]);
|
||||
}
|
||||
for (int i = 0; i < uk_cnt; i++) {
|
||||
rec->unique_keys_.push_back(uks[i]);
|
||||
}
|
||||
rec->row_unique_id_ = row_unique_id;
|
||||
rec->record_type_ = record_type;
|
||||
rec->commit_scn_.convert_from_ts(commit_ts_us);
|
||||
rec->redo_stmt_.append(redo_stmt);
|
||||
rec->undo_stmt_.append(undo_stmt);
|
||||
|
||||
return rec;
|
||||
}
|
||||
|
||||
void destroy_miner_record(ObLogMinerRecord *&rec)
|
||||
{
|
||||
rec->redo_stmt_.reset();
|
||||
rec->undo_stmt_.reset();
|
||||
ob_free(rec);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user