* Fix bug(#2017): oblogminer incorrectly labeled LOB type NULL values from real logs as potentially inaccurate. Update oblogminer unit tests to adapt to the fixed bug.

* Enhancement(#2017): oblogminer incorrectly labeled LOB type NULL values from real logs as potentially inaccurate. Update oblogminer unit tests to adapt to the fixed bug.

* Enhancement(#2017): oblogminer incorrectly labeled  NULL values from real logs as potentially inaccurate. Update oblogminer unit tests to adapt to the fixed bug.

* Enhancement(#2017): oblogminer incorrectly labeled  NULL values from real logs as potentially inaccurate. Update oblogminer unit tests to adapt to the fixed bug.

* Enhancement(#2017): oblogminer incorrectly labeled  NULL values from real logs as potentially inaccurate. Update oblogminer unit tests to adapt to the fixed bug.

* Enhancement(#2017): oblogminer incorrectly labeled  NULL values from real logs as potentially inaccurate. Update oblogminer unit tests to adapt to the fixed bug.

* Enhancement(#2017): oblogminer incorrectly labeled  NULL values from real logs as potentially inaccurate. Update oblogminer unit tests to adapt to the fixed bug.Check all the format
This commit is contained in:
2549141519 2024-08-20 19:37:05 +08:00 committed by GitHub
parent c45c83e89d
commit 5e90f2342a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 248 additions and 75 deletions

View File

@ -441,24 +441,25 @@ int ObLogMinerRecord::build_dml_stmt_(ICDCRecord &cdc_rec)
binlogBuf *new_cols = cdc_rec.newCols(new_col_cnt);
binlogBuf *old_cols = cdc_rec.oldCols(old_col_cnt);
ITableMeta *tbl_meta = cdc_rec.getTableMeta();
// When updating or deleting records with lob type,
// the null value of the lob type column may be incorrect
// due to the limitations of obcdc.
bool has_lob_null = false;
// xmltype and sdo_geometry type don't support compare operation.
// When updating or deleting records with value null,
// the null value of the column may be incorrect
// due to the limitations of obcdc and the minimal mode.
// Currently, it only indicates that obcdc has mistakenly identified a NULL value.
bool has_unreliable_null = false;
// xmltype and sdo_geometry type don't support compare operation.
bool has_unsupport_type_compare = false;
if (OB_SUCC(ret)) {
switch(record_type_) {
// Insert records with lob type is accurate. obcdc will output all value of lob type.
// Insert records with null value is accurate. obcdc will output all value accurately.
case EINSERT: {
if (OB_FAIL(build_insert_stmt_(redo_stmt_, new_cols, new_col_cnt, tbl_meta))) {
LOG_ERROR("build insert redo stmt failed", KPC(this));
} else {
if (OB_FAIL(build_delete_stmt_(undo_stmt_, new_cols, new_col_cnt,
tbl_meta, has_lob_null, has_unsupport_type_compare))) {
tbl_meta, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build insert undo stmt failed", KPC(this));
} else {
// ignore has_lob_null
// ignore has_unreliable_null
if (has_unsupport_type_compare) {
APPEND_STMT(undo_stmt_, "/* POTENTIALLY INACCURATE */");
}
@ -467,25 +468,25 @@ int ObLogMinerRecord::build_dml_stmt_(ICDCRecord &cdc_rec)
break;
}
// Update records with lob type maybe inaccurate,
// Update records with null value maybe inaccurate,
// if NULL value appears in the pre/post mirror, the NULL may be incorrect.
case EUPDATE: {
if (OB_FAIL(build_update_stmt_(redo_stmt_, new_cols, new_col_cnt, old_cols,
old_col_cnt, tbl_meta, has_lob_null, has_unsupport_type_compare))) {
old_col_cnt, tbl_meta, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build update redo stmt failed", KPC(this));
} else {
if (has_lob_null || has_unsupport_type_compare) {
if (has_unreliable_null || has_unsupport_type_compare) {
APPEND_STMT(redo_stmt_, "/* POTENTIALLY INACCURATE */");
has_lob_null = false;
has_unreliable_null = false;
has_unsupport_type_compare = false;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(build_update_stmt_(undo_stmt_, old_cols, old_col_cnt, new_cols,
new_col_cnt, tbl_meta, has_lob_null, has_unsupport_type_compare))) {
new_col_cnt, tbl_meta, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build update undo stmt failed", KPC(this));
} else {
if (has_lob_null || has_unsupport_type_compare) {
if (has_unreliable_null || has_unsupport_type_compare) {
APPEND_STMT(undo_stmt_, "/* POTENTIALLY INACCURATE */");
}
}
@ -493,25 +494,25 @@ int ObLogMinerRecord::build_dml_stmt_(ICDCRecord &cdc_rec)
break;
}
// Delete records with lob type maybe inaccurate,
// Delete records with null value maybe inaccurate,
// if NULL value appears in the pre mirror, the NULL may be incorrect.
case EDELETE: {
if (OB_FAIL(build_delete_stmt_(redo_stmt_, old_cols, old_col_cnt,
tbl_meta, has_lob_null, has_unsupport_type_compare))) {
tbl_meta, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build delete redo stmt failed", KPC(this));
} else {
if (has_lob_null || has_unsupport_type_compare) {
if (has_unreliable_null || has_unsupport_type_compare) {
APPEND_STMT(redo_stmt_, "/* POTENTIALLY INACCURATE */");
has_lob_null = false;
has_unreliable_null = false;
has_unsupport_type_compare = false;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(build_insert_stmt_(undo_stmt_, old_cols,
old_col_cnt, tbl_meta, has_lob_null))) {
old_col_cnt, tbl_meta, has_unreliable_null))) {
LOG_ERROR("build delete undo stmt failed", KPC(this));
} else {
if (has_lob_null) {
if (has_unreliable_null) {
APPEND_STMT(undo_stmt_, "/* POTENTIALLY INACCURATE */");
}
}
@ -536,9 +537,9 @@ int ObLogMinerRecord::build_insert_stmt_(ObStringBuffer &stmt,
ITableMeta *tbl_meta)
{
int ret = OB_SUCCESS;
// ignore has_lob_null
bool has_lob_null = false;
if (OB_FAIL(build_insert_stmt_(stmt, new_cols, new_col_cnt, tbl_meta, has_lob_null))) {
// ignore has_unreliable_null
bool has_unreliable_null = false;
if (OB_FAIL(build_insert_stmt_(stmt, new_cols, new_col_cnt, tbl_meta, has_unreliable_null))) {
LOG_ERROR("build insert stmt failed", KPC(this));
}
return ret;
@ -547,7 +548,7 @@ int ObLogMinerRecord::build_insert_stmt_(ObStringBuffer &stmt,
binlogBuf *new_cols,
const unsigned int new_col_cnt,
ITableMeta *tbl_meta,
bool &has_lob_null)
bool &has_unreliable_null)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -594,8 +595,8 @@ int ObLogMinerRecord::build_insert_stmt_(ObStringBuffer &stmt,
"col_idx", i);
}
if (OB_SUCC(ret)) {
if (is_lob_type_(col_meta) && nullptr == new_cols[i].buf) {
has_lob_null = true;
if (nullptr == new_cols[i].buf && new_cols[i].m_origin == VALUE_ORIGIN::PADDING) {
has_unreliable_null = true;
}
}
}
@ -615,8 +616,8 @@ int ObLogMinerRecord::build_update_stmt_(ObStringBuffer &stmt,
binlogBuf *old_cols,
const unsigned int old_col_cnt,
ITableMeta *tbl_meta,
bool &has_lob_null,
bool &has_unsupport_type_compare)
bool &has_unreliable_null,
bool &has_unsupport_type_compare)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -653,15 +654,16 @@ int ObLogMinerRecord::build_update_stmt_(ObStringBuffer &stmt,
"col_idx", i);
}
if (OB_SUCC(ret)) {
if (is_lob_type_(col_meta) && nullptr == new_cols[i].buf) {
has_lob_null = true;
if ((nullptr == new_cols[i].buf && new_cols[i].m_origin == VALUE_ORIGIN::PADDING)
|| (nullptr != old_cols[i].buf && old_cols[i].m_origin == VALUE_ORIGIN::PADDING)) {
has_unreliable_null = true;
}
}
}
APPEND_STMT(stmt, " WHERE ");
if (OB_SUCC(ret) && OB_FAIL(build_where_conds_(stmt, old_cols, old_col_cnt,
tbl_meta, has_lob_null, has_unsupport_type_compare))) {
tbl_meta, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build where conds failed",);
}
if (lib::Worker::CompatMode::MYSQL == compat_mode_) {
@ -686,8 +688,8 @@ int ObLogMinerRecord::build_delete_stmt_(ObStringBuffer &stmt,
binlogBuf *old_cols,
const unsigned int old_col_cnt,
ITableMeta *tbl_meta,
bool &has_lob_null,
bool &has_unsupport_type_compare)
bool &has_unreliable_null,
bool &has_unsupport_type_compare)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -705,7 +707,7 @@ int ObLogMinerRecord::build_delete_stmt_(ObStringBuffer &stmt,
APPEND_STMT(stmt, " WHERE ");
if (OB_SUCC(ret) && OB_FAIL(build_where_conds_(stmt, old_cols, old_col_cnt,
tbl_meta, has_lob_null, has_unsupport_type_compare))) {
tbl_meta, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build where conds failed",);
}
if (lib::Worker::CompatMode::MYSQL == compat_mode_) {
@ -861,21 +863,21 @@ int ObLogMinerRecord::build_column_value_(ObStringBuffer &stmt,
}
int ObLogMinerRecord::build_where_conds_(ObStringBuffer &stmt,
binlogBuf *cols,
const unsigned int col_cnt,
ITableMeta *tbl_meta,
bool &has_lob_null,
bool &has_unsupport_type_compare)
binlogBuf *cols,
const unsigned int col_cnt,
ITableMeta *tbl_meta,
bool &has_unreliable_null,
bool &has_unsupport_type_compare)
{
int ret = OB_SUCCESS;
if (!unique_keys_.empty()) {
if (OB_FAIL(build_key_conds_(stmt, cols, col_cnt, tbl_meta,
unique_keys_, has_lob_null, has_unsupport_type_compare))) {
unique_keys_, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build unique keys failed", K(stmt), K(unique_keys_));
}
} else if (!primary_keys_.empty()) {
if (OB_FAIL(build_key_conds_(stmt, cols, col_cnt, tbl_meta,
primary_keys_, has_lob_null, has_unsupport_type_compare))) {
primary_keys_, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build primary keys failed", K(stmt), K(primary_keys_));
}
} else {
@ -885,7 +887,7 @@ int ObLogMinerRecord::build_where_conds_(ObStringBuffer &stmt,
APPEND_STMT(stmt, " AND ");
}
if (OB_SUCC(ret)) {
if (OB_FAIL(build_cond_(stmt, cols, i, tbl_meta, col_meta, has_lob_null, has_unsupport_type_compare))) {
if (OB_FAIL(build_cond_(stmt, cols, i, tbl_meta, col_meta, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build cond failed", "table_name", tbl_meta->getName());
}
}
@ -895,12 +897,12 @@ int ObLogMinerRecord::build_where_conds_(ObStringBuffer &stmt,
}
int ObLogMinerRecord::build_key_conds_(ObStringBuffer &stmt,
binlogBuf *cols,
const unsigned int col_cnt,
ITableMeta *tbl_meta,
const KeyArray &key,
bool &has_lob_null,
bool &has_unsupport_type_compare)
binlogBuf *cols,
const unsigned int col_cnt,
ITableMeta *tbl_meta,
const KeyArray &key,
bool &has_unreliable_null,
bool &has_unsupport_type_compare)
{
int ret = OB_SUCCESS;
for (int i = 0; OB_SUCC(ret) && i < key.count(); i++) {
@ -915,7 +917,7 @@ int ObLogMinerRecord::build_key_conds_(ObStringBuffer &stmt,
}
if (OB_SUCC(ret)) {
if (OB_FAIL(build_cond_(stmt, cols, col_idx, tbl_meta, col_meta,
has_lob_null, has_unsupport_type_compare))) {
has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build cond failed", "table_name", tbl_meta->getName());
}
}
@ -925,12 +927,12 @@ int ObLogMinerRecord::build_key_conds_(ObStringBuffer &stmt,
}
int ObLogMinerRecord::build_cond_(ObStringBuffer &stmt,
binlogBuf *cols,
const unsigned int col_idx,
ITableMeta *tbl_meta,
IColMeta *col_meta,
bool &has_lob_null,
bool &has_unsupport_type_compare)
binlogBuf *cols,
const unsigned int col_idx,
ITableMeta *tbl_meta,
IColMeta *col_meta,
bool &has_unreliable_null,
bool &has_unsupport_type_compare)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(col_meta)) {
@ -940,7 +942,7 @@ int ObLogMinerRecord::build_cond_(ObStringBuffer &stmt,
if (is_lob_type_(col_meta) && nullptr != cols[col_idx].buf) {
// build lob type compare condition, excluding null value condition
if (OB_FAIL(build_lob_cond_(stmt, cols, col_idx, tbl_meta,
col_meta, has_lob_null, has_unsupport_type_compare))) {
col_meta, has_unreliable_null, has_unsupport_type_compare))) {
LOG_ERROR("build lob condition failed", "table_name", tbl_meta->getName());
}
} else {
@ -949,8 +951,8 @@ int ObLogMinerRecord::build_cond_(ObStringBuffer &stmt,
}
}
if (OB_SUCC(ret)) {
if (is_lob_type_(col_meta) && nullptr == cols[col_idx].buf) {
has_lob_null = true;
if (nullptr == cols[col_idx].buf && cols[col_idx].m_origin == VALUE_ORIGIN::PADDING) {
has_unreliable_null = true;
}
}
}
@ -958,12 +960,12 @@ int ObLogMinerRecord::build_cond_(ObStringBuffer &stmt,
}
int ObLogMinerRecord::build_lob_cond_(ObStringBuffer &stmt,
binlogBuf *cols,
const unsigned int col_idx,
ITableMeta *tbl_meta,
IColMeta *col_meta,
bool &has_lob_null,
bool &has_unsupport_type_compare)
binlogBuf *cols,
const unsigned int col_idx,
ITableMeta *tbl_meta,
IColMeta *col_meta,
bool &has_unreliable_null,
bool &has_unsupport_type_compare)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_lob_type_(col_meta) || nullptr == cols[col_idx].buf)) {
@ -1020,10 +1022,10 @@ int ObLogMinerRecord::build_lob_cond_(ObStringBuffer &stmt,
}
int ObLogMinerRecord::build_func_cond_(ObStringBuffer &stmt,
binlogBuf *cols,
const unsigned int col_idx,
binlogBuf *cols,
const unsigned int col_idx,
ITableMeta *tbl_meta,
IColMeta *col_meta,
IColMeta *col_meta,
const char *func_name)
{
int ret = OB_SUCCESS;
@ -1042,10 +1044,10 @@ int ObLogMinerRecord::build_func_cond_(ObStringBuffer &stmt,
}
int ObLogMinerRecord::build_normal_cond_(ObStringBuffer &stmt,
binlogBuf *cols,
const unsigned int col_idx,
ITableMeta *tbl_meta,
IColMeta *col_meta)
binlogBuf *cols,
const unsigned int col_idx,
ITableMeta *tbl_meta,
IColMeta *col_meta)
{
int ret = OB_SUCCESS;
APPEND_ESCAPE_CHAR(stmt);

View File

@ -65,6 +65,35 @@ TEST(test_ob_log_miner_record, InitObLogMinerRecord)
destroy_miner_br(br);
record.destroy();
br = build_logminer_br(new_buf, old_buf, EUPDATE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", 8, "id", "1", "2",
static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING),
"name", nullptr, nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING));
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("UPDATE `db2`.`tbl2` SET `id`='1', `name`=NULL WHERE `id`='2' AND `name` IS NULL LIMIT 1;", record.redo_stmt_.ptr());
EXPECT_STREQ("UPDATE `db2`.`tbl2` SET `id`='2', `name`=NULL WHERE `id`='1' AND `name` IS NULL LIMIT 1;", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
br = build_logminer_br(new_buf, old_buf, EUPDATE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", 8, "id", "1", "2",
static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING),
"name", nullptr, nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING));
old_buf[1].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("UPDATE `db2`.`tbl2` SET `id`='1', `name`=NULL WHERE `id`='2' AND `name` IS NULL LIMIT 1;/* POTENTIALLY INACCURATE */", record.redo_stmt_.ptr());
EXPECT_STREQ("UPDATE `db2`.`tbl2` SET `id`='2', `name`=NULL WHERE `id`='1' AND `name` IS NULL LIMIT 1;/* POTENTIALLY INACCURATE */", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
br = build_logminer_br(new_buf, old_buf, EDELETE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", 8, "id", nullptr , "2",
static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING),
@ -79,6 +108,35 @@ TEST(test_ob_log_miner_record, InitObLogMinerRecord)
destroy_miner_br(br);
record.destroy();
br = build_logminer_br(new_buf, old_buf, EDELETE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", 8, "id", nullptr , "2",
static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING),
"name", nullptr, nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING));
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `id`='2' AND `name` IS NULL LIMIT 1;", record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`id`, `name`) VALUES ('2', NULL);", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
br = build_logminer_br(new_buf, old_buf, EDELETE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", 8, "id", nullptr , "2",
static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING),
"name", nullptr, nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING));
old_buf[1].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `id`='2' AND `name` IS NULL LIMIT 1;/* POTENTIALLY INACCURATE */", record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`id`, `name`) VALUES ('2', NULL);/* POTENTIALLY INACCURATE */", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
br = build_logminer_br(new_buf, old_buf, EDDL, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "", 4, "ddl_stmt_str", "CREATE TABLE T1(ID INT PRIMARY KEY);" , nullptr,
static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_VAR_STRING));
@ -442,6 +500,8 @@ TEST(test_ob_log_miner_record, LobTypeInMySqlMode)
"tenant2.db2", "tbl2", "utf8mb4", 8,
"col1", "{\"key\": \"new\"}", nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_JSON),
"col2", "POINT(0 1)", "POINT(2 3)", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY));
new_buf[0].m_origin = VALUE_ORIGIN::PADDING;
old_buf[0].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
@ -459,6 +519,8 @@ TEST(test_ob_log_miner_record, LobTypeInMySqlMode)
"col2", "POINT(0 1)", "POINT(2 3)", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY),
"col3", "1", nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_LONG));
br->get_br()->getTableMeta()->setUKs("col3");
new_buf[0].m_origin = VALUE_ORIGIN::PADDING;
old_buf[0].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
@ -469,6 +531,110 @@ TEST(test_ob_log_miner_record, LobTypeInMySqlMode)
destroy_miner_br(br);
record.destroy();
// delete without key
br = build_logminer_br(new_buf, old_buf, EDELETE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", "utf8mb4", 8,
"col1", nullptr, "{\"key\": \"old\"}", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_JSON),
"col2", nullptr, "POINT(2 3)", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY));
new_buf[0].m_origin = VALUE_ORIGIN::PADDING;
old_buf[0].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `col1`=cast('{\"key\": \"old\"}'as json) AND ST_Equals(`col2`, ST_GeomFromText('POINT(2 3)')) LIMIT 1;", record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`col1`, `col2`) VALUES ('{\"key\": \"old\"}', ST_GeomFromText('POINT(2 3)'));", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
// delete with key
br = build_logminer_br(new_buf, old_buf, EDELETE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", "utf8mb4", 12,
"col1", nullptr, "{\"key\": \"old\"}", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_JSON),
"col2", nullptr, "POINT(2 3)", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY),
"col3", nullptr, "2", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_LONG));
br->get_br()->getTableMeta()->setUKs("col3");
new_buf[1].m_origin = VALUE_ORIGIN::PADDING;
old_buf[1].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `col3`=2 LIMIT 1;", record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`col1`, `col2`, `col3`) VALUES ('{\"key\": \"old\"}', ST_GeomFromText('POINT(2 3)'), 2);", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
// null value delete without key
br = build_logminer_br(new_buf, old_buf, EDELETE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", "utf8mb4", 8,
"col1", nullptr, nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_JSON),
"col2", nullptr, "POINT(2 3)", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY));
new_buf[0].m_origin = VALUE_ORIGIN::PADDING;
old_buf[0].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `col1` IS NULL AND ST_Equals(`col2`, ST_GeomFromText('POINT(2 3)')) LIMIT 1;/* POTENTIALLY INACCURATE */",record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`col1`, `col2`) VALUES (NULL, ST_GeomFromText('POINT(2 3)'));/* POTENTIALLY INACCURATE */", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
// null value delete with key
br = build_logminer_br(new_buf, old_buf, EDELETE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", "utf8mb4", 12,
"col1", nullptr, nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_JSON),
"col2", nullptr, "POINT(2 3)", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY),
"col3", nullptr, "1", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_LONG));
new_buf[0].m_origin = VALUE_ORIGIN::PADDING;
old_buf[0].m_origin = VALUE_ORIGIN::PADDING;
br->get_br()->getTableMeta()->setUKs("col3");
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `col3`=1 LIMIT 1;", record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`col1`, `col2`, `col3`) VALUES (NULL, ST_GeomFromText('POINT(2 3)'), 1);/* POTENTIALLY INACCURATE */", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
// null value update without key
br = build_logminer_br(new_buf, old_buf, EUPDATE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", "utf8mb4", 8,
"col1", "{\"key\": \"new\"}", nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_JSON),
"col2", "POINT(0 1)", "POINT(2 3)", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY));
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("UPDATE `db2`.`tbl2` SET `col1`='{\"key\": \"new\"}', `col2`=ST_GeomFromText('POINT(0 1)') WHERE `col1` IS NULL AND ST_Equals(`col2`, ST_GeomFromText('POINT(2 3)')) LIMIT 1;",record.redo_stmt_.ptr());
EXPECT_STREQ("UPDATE `db2`.`tbl2` SET `col1`=NULL, `col2`=ST_GeomFromText('POINT(2 3)') WHERE `col1`=cast('{\"key\": \"new\"}'as json) AND ST_Equals(`col2`, ST_GeomFromText('POINT(0 1)')) LIMIT 1;", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
// null value update with key
br = build_logminer_br(new_buf, old_buf, EUPDATE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", "utf8mb4", 12,
"col1", "{\"key\": \"new\"}", nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_JSON),
"col2", "POINT(0 1)", "POINT(2 3)", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY),
"col3", "1", nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_LONG));
br->get_br()->getTableMeta()->setUKs("col3");
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("UPDATE `db2`.`tbl2` SET `col1`='{\"key\": \"new\"}', `col2`=ST_GeomFromText('POINT(0 1)'), `col3`=1 WHERE `col3` IS NULL LIMIT 1;", record.redo_stmt_.ptr());
EXPECT_STREQ("UPDATE `db2`.`tbl2` SET `col1`=NULL, `col2`=ST_GeomFromText('POINT(2 3)'), `col3`=NULL WHERE `col3`=1 LIMIT 1;", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
// delete without key
br = build_logminer_br(new_buf, old_buf, EDELETE, lib::Worker::CompatMode::MYSQL,
"tenant2.db2", "tbl2", "utf8mb4", 8,
@ -511,8 +677,8 @@ TEST(test_ob_log_miner_record, LobTypeInMySqlMode)
EXPECT_STREQ("db2", record.database_name_.ptr());
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `col1` IS NULL AND ST_Equals(`col2`, ST_GeomFromText('POINT(2 3)')) LIMIT 1;/* POTENTIALLY INACCURATE */",record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`col1`, `col2`) VALUES (NULL, ST_GeomFromText('POINT(2 3)'));/* POTENTIALLY INACCURATE */", record.undo_stmt_.ptr());
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `col1` IS NULL AND ST_Equals(`col2`, ST_GeomFromText('POINT(2 3)')) LIMIT 1;",record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`col1`, `col2`) VALUES (NULL, ST_GeomFromText('POINT(2 3)'));", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
@ -529,7 +695,7 @@ TEST(test_ob_log_miner_record, LobTypeInMySqlMode)
EXPECT_STREQ("tbl2", record.table_name_.ptr());
EXPECT_EQ(OB_SUCCESS, record.build_stmts(*br));
EXPECT_STREQ("DELETE FROM `db2`.`tbl2` WHERE `col3`=1 LIMIT 1;", record.redo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`col1`, `col2`, `col3`) VALUES (NULL, ST_GeomFromText('POINT(2 3)'), 1);/* POTENTIALLY INACCURATE */", record.undo_stmt_.ptr());
EXPECT_STREQ("INSERT INTO `db2`.`tbl2` (`col1`, `col2`, `col3`) VALUES (NULL, ST_GeomFromText('POINT(2 3)'), 1);", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
}
@ -673,6 +839,8 @@ TEST(test_ob_log_miner_record, LobTypeInOracleMode)
"col2", "SRID=NULL;POINT(0 1)", nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY),
"col3", nullptr, nullptr, drcmsg_field_types::DRCMSG_TYPE_ORA_XML,
"col4", "AABB1122", "1122", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_ORA_CLOB));
new_buf[1].m_origin = VALUE_ORIGIN::PADDING;
old_buf[1].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
@ -749,6 +917,8 @@ TEST(test_ob_log_miner_record, LobTypeInOracleMode)
"col2", nullptr, nullptr, static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_GEOMETRY),
"col3", nullptr, "<a>abc</a>", drcmsg_field_types::DRCMSG_TYPE_ORA_XML,
"col4", nullptr, "AABB1122", static_cast<int>(obmysql::EMySQLFieldType::MYSQL_TYPE_ORA_CLOB));
new_buf[1].m_origin = VALUE_ORIGIN::PADDING;
old_buf[1].m_origin = VALUE_ORIGIN::PADDING;
EXPECT_EQ(OB_SUCCESS, record.init(*br));
EXPECT_STREQ("tenant2", record.tenant_name_.ptr());
EXPECT_STREQ("db2", record.database_name_.ptr());
@ -777,6 +947,7 @@ TEST(test_ob_log_miner_record, LobTypeInOracleMode)
EXPECT_STREQ("INSERT INTO \"db2\".\"tbl2\" (\"col1\", \"col2\", \"col3\", \"col4\", \"col5\") VALUES ('{\"key\": \"value\"}', SDO_GEOMETRY('POINT(0 1)', NULL), '<a>abc</a>', 'AABB1122', 1);", record.undo_stmt_.ptr());
destroy_miner_br(br);
record.destroy();
}
}
@ -792,4 +963,4 @@ int main(int argc, char **argv)
logger.set_enable_async_log(false);
testing::InitGoogleTest(&argc,argv);
return RUN_ALL_TESTS();
}
}