diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp index 40c1532057..f56fd080d8 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.cpp @@ -349,15 +349,15 @@ int MutatorRow::parse_columns_( // NOTE: Allow obj2str_helper and column_schema to be empty if (OB_ISNULL(col_data) || OB_UNLIKELY(col_data_size <= 0)) { - LOG_ERROR("invalid argument", K(col_data_size), K(col_data)); ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid argument", KR(ret), K(col_data_size), K(col_data)); } // Validate cols values else if (OB_UNLIKELY(cols.num_ > 0)) { - LOG_ERROR("column value list is not reseted", K(cols)); ret = OB_INVALID_ARGUMENT; + LOG_ERROR("column value list is not reseted", KR(ret), K(cols)); } else if (OB_FAIL(row_reader.read_row(col_data, col_data_size, nullptr, datum_row))) { - TRANS_LOG(WARN, "Failed to read datum row", K(ret)); + LOG_WARN("Failed to read datum row", KR(ret), K(tenant_id), K(table_id), K(is_parse_new_col)); } else { LOG_DEBUG("prepare to handle datum_row", K(datum_row)); // Iterate through all Cells using Cell Reader @@ -367,7 +367,10 @@ int MutatorRow::parse_columns_( ColumnSchemaInfo *column_schema_info = NULL; blocksstable::ObStorageDatum &datum = datum_row.storage_datums_[column_stored_idx]; - if (datum.is_nop()) { + if (OB_FAIL(deep_copy_encoded_column_value_(datum))) { + LOG_ERROR("deep_copy_encoded_column_value_ failed", KR(ret), + K(tenant_id), K(table_id), K(column_stored_idx), K(datum), K(is_parse_new_col)); + } else if (datum.is_nop()) { LOG_DEBUG("ignore nop datum", K(column_stored_idx), K(datum)); } else if (OB_FAIL(get_column_info_( tb_schema_info, @@ -409,7 +412,7 @@ int MutatorRow::parse_columns_( LOG_ERROR("set_obj_propertie_ failed", K(column_id), K(column_stored_idx), KPC(column_schema_info), K(obj_meta), K(obj)); } else if (OB_FAIL(datum.to_obj_enhance(obj, obj_meta))) { - LOG_ERROR("transfer datum to obj failed", K(ret), K(datum), K(obj_meta)); + LOG_ERROR("transfer datum to obj failed", KR(ret), K(datum), K(obj_meta)); } else { const bool is_lob_storage = obj_meta.is_lob_storage(); // Default is false @@ -763,7 +766,7 @@ int MutatorRow::parse_columns_( ret = OB_INVALID_ARGUMENT; LOG_ERROR("column value list is not reseted", KR(ret), K(cols)); } else if (OB_FAIL(row_reader.read_row(col_data, col_data_size, nullptr, datum_row))) { - LOG_ERROR("Failed to read datum row", K(ret)); + LOG_ERROR("Failed to read datum row", KR(ret)); } else { LOG_DEBUG("parse_columns_", K(is_parse_new_col), K(datum_row)); @@ -774,7 +777,10 @@ int MutatorRow::parse_columns_( blocksstable::ObStorageDatum &datum = datum_row.storage_datums_[i]; column_id = col_des_array[i].col_id_; - if (datum.is_nop()) { + if (OB_FAIL(deep_copy_encoded_column_value_(datum))) { + LOG_ERROR("deep_copy_encoded_column_value_ failed", KR(ret), "column_stored_idx", i, K(datum), K(is_parse_new_col)); + } else if (datum.is_nop()) { + LOG_DEBUG("ignore nop datum", "column_stored_idx", i, K(datum)); } else if (OB_INVALID_ID == column_id) { // Note: the column_id obtained here may be invalid // For example a delete statement with only one cell and an invalid column_id in the cell @@ -787,7 +793,7 @@ int MutatorRow::parse_columns_( if (OB_FAIL(set_obj_propertie_(column_id, i, table_schema, obj_meta, obj))) { LOG_ERROR("set_obj_propertie_ failed", K(column_id), K(i), K(obj_meta), K(obj)); } else if (OB_FAIL(datum.to_obj_enhance(obj, obj_meta))) { - LOG_ERROR("transfer datum to obj failed", K(ret), K(datum), K(obj_meta)); + LOG_ERROR("transfer datum to obj failed", KR(ret), K(datum), K(obj_meta)); } else { OB_ASSERT(obj.has_lob_header() == false); // debug only if (OB_FAIL(add_column_(cols, column_id, &obj))) { @@ -832,6 +838,26 @@ int MutatorRow::parse_rowkey_( return ret; } +int MutatorRow::deep_copy_encoded_column_value_(blocksstable::ObStorageDatum &datum) +{ + int ret = OB_SUCCESS; + + if (datum.need_copy_for_encoding_column_with_flat_format(OBJ_DATUM_STRING)) { + // local buffer will free while allocator_ reset + char* local_buffer = static_cast(allocator_.alloc(sizeof(uint64_t))); + + if (OB_ISNULL(local_buffer)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("Failed to allocate local buffer to store local encoded column datum", KR(ret)); + } else { + MEMCPY(local_buffer, datum.ptr_, sizeof(uint64_t)); + datum.set_string(local_buffer, sizeof(uint64_t)); + } + } + + return ret; +} + int MutatorRow::add_column_( ColValueList &cols, const uint64_t column_id, diff --git a/src/logservice/libobcdc/src/ob_log_part_trans_task.h b/src/logservice/libobcdc/src/ob_log_part_trans_task.h index 641e999010..75f5846e67 100644 --- a/src/logservice/libobcdc/src/ob_log_part_trans_task.h +++ b/src/logservice/libobcdc/src/ob_log_part_trans_task.h @@ -285,6 +285,7 @@ private: const TableSchemaInfo *tb_schema_info, const ObTimeZoneInfoWrap *tz_info_wrap, const bool enable_output_hidden_primary_key); + int deep_copy_encoded_column_value_(blocksstable::ObStorageDatum &datum); // 1. get column_id and column_schema_info for user table; // 2. get column_id for all_ddl_operation_table int get_column_info_( diff --git a/src/storage/blocksstable/ob_datum_row.h b/src/storage/blocksstable/ob_datum_row.h index 03eb54819e..327e3695f6 100644 --- a/src/storage/blocksstable/ob_datum_row.h +++ b/src/storage/blocksstable/ob_datum_row.h @@ -582,7 +582,12 @@ OB_INLINE int ObStorageDatum::from_buf_enhance(const char *buf, const int64_t bu } else { reuse(); len_ = static_cast(buf_len); - if (buf_len > 0) { + if (sizeof(uint64_t) == buf_len) { + // To maintain the same processing method as other micro-block formats, + // we perform a deep copy on columns with a length of 8 bytes in flat micro-block format. + // see ObClusterColumnReader::read_column_from_buf + MEMCPY(no_cv(ptr_), buf, sizeof(uint64_t)); + } else if (buf_len > 0) { ptr_ = buf; } }