[OBCDC] Fix obcdc output wrong column value if column is 8 byte and encoded by datum
This commit is contained in:
@ -349,15 +349,15 @@ int MutatorRow::parse_columns_(
|
|||||||
|
|
||||||
// NOTE: Allow obj2str_helper and column_schema to be empty
|
// NOTE: Allow obj2str_helper and column_schema to be empty
|
||||||
if (OB_ISNULL(col_data) || OB_UNLIKELY(col_data_size <= 0)) {
|
if (OB_ISNULL(col_data) || OB_UNLIKELY(col_data_size <= 0)) {
|
||||||
LOG_ERROR("invalid argument", K(col_data_size), K(col_data));
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_ERROR("invalid argument", KR(ret), K(col_data_size), K(col_data));
|
||||||
}
|
}
|
||||||
// Validate cols values
|
// Validate cols values
|
||||||
else if (OB_UNLIKELY(cols.num_ > 0)) {
|
else if (OB_UNLIKELY(cols.num_ > 0)) {
|
||||||
LOG_ERROR("column value list is not reseted", K(cols));
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_ERROR("column value list is not reseted", KR(ret), K(cols));
|
||||||
} else if (OB_FAIL(row_reader.read_row(col_data, col_data_size, nullptr, datum_row))) {
|
} else if (OB_FAIL(row_reader.read_row(col_data, col_data_size, nullptr, datum_row))) {
|
||||||
TRANS_LOG(WARN, "Failed to read datum row", K(ret));
|
LOG_WARN("Failed to read datum row", KR(ret), K(tenant_id), K(table_id), K(is_parse_new_col));
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("prepare to handle datum_row", K(datum_row));
|
LOG_DEBUG("prepare to handle datum_row", K(datum_row));
|
||||||
// Iterate through all Cells using Cell Reader
|
// Iterate through all Cells using Cell Reader
|
||||||
@ -367,7 +367,10 @@ int MutatorRow::parse_columns_(
|
|||||||
ColumnSchemaInfo *column_schema_info = NULL;
|
ColumnSchemaInfo *column_schema_info = NULL;
|
||||||
blocksstable::ObStorageDatum &datum = datum_row.storage_datums_[column_stored_idx];
|
blocksstable::ObStorageDatum &datum = datum_row.storage_datums_[column_stored_idx];
|
||||||
|
|
||||||
if (datum.is_nop()) {
|
if (OB_FAIL(deep_copy_encoded_column_value_(datum))) {
|
||||||
|
LOG_ERROR("deep_copy_encoded_column_value_ failed", KR(ret),
|
||||||
|
K(tenant_id), K(table_id), K(column_stored_idx), K(datum), K(is_parse_new_col));
|
||||||
|
} else if (datum.is_nop()) {
|
||||||
LOG_DEBUG("ignore nop datum", K(column_stored_idx), K(datum));
|
LOG_DEBUG("ignore nop datum", K(column_stored_idx), K(datum));
|
||||||
} else if (OB_FAIL(get_column_info_(
|
} else if (OB_FAIL(get_column_info_(
|
||||||
tb_schema_info,
|
tb_schema_info,
|
||||||
@ -409,7 +412,7 @@ int MutatorRow::parse_columns_(
|
|||||||
LOG_ERROR("set_obj_propertie_ failed", K(column_id), K(column_stored_idx),
|
LOG_ERROR("set_obj_propertie_ failed", K(column_id), K(column_stored_idx),
|
||||||
KPC(column_schema_info), K(obj_meta), K(obj));
|
KPC(column_schema_info), K(obj_meta), K(obj));
|
||||||
} else if (OB_FAIL(datum.to_obj_enhance(obj, obj_meta))) {
|
} else if (OB_FAIL(datum.to_obj_enhance(obj, obj_meta))) {
|
||||||
LOG_ERROR("transfer datum to obj failed", K(ret), K(datum), K(obj_meta));
|
LOG_ERROR("transfer datum to obj failed", KR(ret), K(datum), K(obj_meta));
|
||||||
} else {
|
} else {
|
||||||
const bool is_lob_storage = obj_meta.is_lob_storage();
|
const bool is_lob_storage = obj_meta.is_lob_storage();
|
||||||
// Default is false
|
// Default is false
|
||||||
@ -763,7 +766,7 @@ int MutatorRow::parse_columns_(
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_ERROR("column value list is not reseted", KR(ret), K(cols));
|
LOG_ERROR("column value list is not reseted", KR(ret), K(cols));
|
||||||
} else if (OB_FAIL(row_reader.read_row(col_data, col_data_size, nullptr, datum_row))) {
|
} else if (OB_FAIL(row_reader.read_row(col_data, col_data_size, nullptr, datum_row))) {
|
||||||
LOG_ERROR("Failed to read datum row", K(ret));
|
LOG_ERROR("Failed to read datum row", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("parse_columns_", K(is_parse_new_col), K(datum_row));
|
LOG_DEBUG("parse_columns_", K(is_parse_new_col), K(datum_row));
|
||||||
|
|
||||||
@ -774,7 +777,10 @@ int MutatorRow::parse_columns_(
|
|||||||
blocksstable::ObStorageDatum &datum = datum_row.storage_datums_[i];
|
blocksstable::ObStorageDatum &datum = datum_row.storage_datums_[i];
|
||||||
column_id = col_des_array[i].col_id_;
|
column_id = col_des_array[i].col_id_;
|
||||||
|
|
||||||
if (datum.is_nop()) {
|
if (OB_FAIL(deep_copy_encoded_column_value_(datum))) {
|
||||||
|
LOG_ERROR("deep_copy_encoded_column_value_ failed", KR(ret), "column_stored_idx", i, K(datum), K(is_parse_new_col));
|
||||||
|
} else if (datum.is_nop()) {
|
||||||
|
LOG_DEBUG("ignore nop datum", "column_stored_idx", i, K(datum));
|
||||||
} else if (OB_INVALID_ID == column_id) {
|
} else if (OB_INVALID_ID == column_id) {
|
||||||
// Note: the column_id obtained here may be invalid
|
// Note: the column_id obtained here may be invalid
|
||||||
// For example a delete statement with only one cell and an invalid column_id in the cell
|
// For example a delete statement with only one cell and an invalid column_id in the cell
|
||||||
@ -787,7 +793,7 @@ int MutatorRow::parse_columns_(
|
|||||||
if (OB_FAIL(set_obj_propertie_(column_id, i, table_schema, obj_meta, obj))) {
|
if (OB_FAIL(set_obj_propertie_(column_id, i, table_schema, obj_meta, obj))) {
|
||||||
LOG_ERROR("set_obj_propertie_ failed", K(column_id), K(i), K(obj_meta), K(obj));
|
LOG_ERROR("set_obj_propertie_ failed", K(column_id), K(i), K(obj_meta), K(obj));
|
||||||
} else if (OB_FAIL(datum.to_obj_enhance(obj, obj_meta))) {
|
} else if (OB_FAIL(datum.to_obj_enhance(obj, obj_meta))) {
|
||||||
LOG_ERROR("transfer datum to obj failed", K(ret), K(datum), K(obj_meta));
|
LOG_ERROR("transfer datum to obj failed", KR(ret), K(datum), K(obj_meta));
|
||||||
} else {
|
} else {
|
||||||
OB_ASSERT(obj.has_lob_header() == false); // debug only
|
OB_ASSERT(obj.has_lob_header() == false); // debug only
|
||||||
if (OB_FAIL(add_column_(cols, column_id, &obj))) {
|
if (OB_FAIL(add_column_(cols, column_id, &obj))) {
|
||||||
@ -832,6 +838,26 @@ int MutatorRow::parse_rowkey_(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int MutatorRow::deep_copy_encoded_column_value_(blocksstable::ObStorageDatum &datum)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
if (datum.need_copy_for_encoding_column_with_flat_format(OBJ_DATUM_STRING)) {
|
||||||
|
// local buffer will free while allocator_ reset
|
||||||
|
char* local_buffer = static_cast<char*>(allocator_.alloc(sizeof(uint64_t)));
|
||||||
|
|
||||||
|
if (OB_ISNULL(local_buffer)) {
|
||||||
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
|
LOG_WARN("Failed to allocate local buffer to store local encoded column datum", KR(ret));
|
||||||
|
} else {
|
||||||
|
MEMCPY(local_buffer, datum.ptr_, sizeof(uint64_t));
|
||||||
|
datum.set_string(local_buffer, sizeof(uint64_t));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int MutatorRow::add_column_(
|
int MutatorRow::add_column_(
|
||||||
ColValueList &cols,
|
ColValueList &cols,
|
||||||
const uint64_t column_id,
|
const uint64_t column_id,
|
||||||
|
|||||||
@ -285,6 +285,7 @@ private:
|
|||||||
const TableSchemaInfo *tb_schema_info,
|
const TableSchemaInfo *tb_schema_info,
|
||||||
const ObTimeZoneInfoWrap *tz_info_wrap,
|
const ObTimeZoneInfoWrap *tz_info_wrap,
|
||||||
const bool enable_output_hidden_primary_key);
|
const bool enable_output_hidden_primary_key);
|
||||||
|
int deep_copy_encoded_column_value_(blocksstable::ObStorageDatum &datum);
|
||||||
// 1. get column_id and column_schema_info for user table;
|
// 1. get column_id and column_schema_info for user table;
|
||||||
// 2. get column_id for all_ddl_operation_table
|
// 2. get column_id for all_ddl_operation_table
|
||||||
int get_column_info_(
|
int get_column_info_(
|
||||||
|
|||||||
@ -582,7 +582,12 @@ OB_INLINE int ObStorageDatum::from_buf_enhance(const char *buf, const int64_t bu
|
|||||||
} else {
|
} else {
|
||||||
reuse();
|
reuse();
|
||||||
len_ = static_cast<uint32_t>(buf_len);
|
len_ = static_cast<uint32_t>(buf_len);
|
||||||
if (buf_len > 0) {
|
if (sizeof(uint64_t) == buf_len) {
|
||||||
|
// To maintain the same processing method as other micro-block formats,
|
||||||
|
// we perform a deep copy on columns with a length of 8 bytes in flat micro-block format.
|
||||||
|
// see ObClusterColumnReader::read_column_from_buf
|
||||||
|
MEMCPY(no_cv(ptr_), buf, sizeof(uint64_t));
|
||||||
|
} else if (buf_len > 0) {
|
||||||
ptr_ = buf;
|
ptr_ = buf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user