[CP] [OBCDC] Support heap table for OB4.x mark part_key or dep_col of part_key as primary_key

This commit is contained in:
SanmuWangZJU 2023-04-24 08:11:42 +00:00 committed by ob-robot
parent a58de12030
commit 229db2bb7b
16 changed files with 374 additions and 120 deletions

View File

@ -106,9 +106,9 @@ static int easy_decode_uint16(char *buf, const int64_t data_len, int64_t *pos, u
static int easy_encode_negotiation_msg(easy_negotiation_msg_t *ne_msg, char *buf, int buf_len, int64_t *encode_len)
{
int ret = EASY_OK;
int64_t pos = 0;
if (NULL == ne_msg || NULL == buf || NULL == encode_len) {
int64_t pos = 0;
if (NULL == ne_msg || NULL == buf || NULL == encode_len) {
easy_error_log("easy_encode_negotiation_msg, invalid param!");
return EASY_ERROR;
}
@ -122,9 +122,9 @@ static int easy_encode_negotiation_msg(easy_negotiation_msg_t *ne_msg, char *buf
ret = easy_encode_uint16(buf, buf_len, &pos, ne_msg->msg_header.msg_body_len);
if (ret != EASY_OK) {
easy_error_log("send negotiation msg, encode msg body len failed!");
return ret;
}
return ret;
}
ret = easy_encode_uint64(buf, buf_len, &pos, ne_msg->msg_body.eio_magic);
if (ret != EASY_OK) {
easy_error_log("send negotiation msg, encode eio magic failed!");

View File

@ -172,34 +172,34 @@ int ObDataDictIterator::next_dict_header(ObDictMetaHeader &header)
}
template<class DICT_ENTRY>
int ObDataDictIterator::next_dict_entry(DICT_ENTRY &dict_entry)
int ObDataDictIterator::next_dict_entry(const ObDictMetaHeader &header, DICT_ENTRY &dict_entry)
{
int ret = OB_SUCCESS;
if (dict_pos_ > 0) {
// deserialize from dict_buf_
int64_t deserialize_pos = 0;
if (OB_FAIL(dict_entry.deserialize(dict_buf_, dict_pos_, deserialize_pos))) {
if (OB_FAIL(dict_entry.deserialize(header, dict_buf_, dict_pos_, deserialize_pos))) {
DDLOG(WARN, "deserialize DICT_ENTRY from dict_buf failed", KR(ret),
K_(dict_pos), K(deserialize_pos));
K(header), K_(dict_pos), K(deserialize_pos));
}
} else if (palf_pos_ > 0) {
// deserialize from dict_buf_
if (OB_FAIL(dict_entry.deserialize(palf_buf_, palf_buf_len_, palf_pos_))) {
if (OB_FAIL(dict_entry.deserialize(header, palf_buf_, palf_buf_len_, palf_pos_))) {
DDLOG(WARN, "deserialize DICT_ENTRY from palf_buf failed", KR(ret),
K_(palf_buf_len), K_(palf_pos));
K(header), K_(palf_buf_len), K_(palf_pos));
}
} else {
ret = OB_ERR_UNEXPECTED;
DDLOG(WARN, "expect any of dict_pos/palf_pos is valid", KR(ret), K_(palf_pos), K_(dict_pos));
DDLOG(WARN, "expect any of dict_pos/palf_pos is valid", KR(ret), K(header), K_(palf_pos), K_(dict_pos));
}
return ret;
}
template int ObDataDictIterator::next_dict_entry(ObDictTenantMeta &dict_entry);
template int ObDataDictIterator::next_dict_entry(ObDictDatabaseMeta &dict_entry);
template int ObDataDictIterator::next_dict_entry(ObDictTableMeta &dict_entry);
template int ObDataDictIterator::next_dict_entry(const ObDictMetaHeader &header, ObDictTenantMeta &dict_entry);
template int ObDataDictIterator::next_dict_entry(const ObDictMetaHeader &header, ObDictDatabaseMeta &dict_entry);
template int ObDataDictIterator::next_dict_entry(const ObDictMetaHeader &header, ObDictTableMeta &dict_entry);
int ObDataDictIterator::append_log_buf_with_base_header_(const char *buf, const int64_t buf_len)
{

View File

@ -34,7 +34,7 @@ public:
int append_log_buf(const char *buf, const int64_t buf_len, const int64_t pos); // without log_base_header
int next_dict_header(ObDictMetaHeader &meta_header);
template<class DICT_ENTRY>
int next_dict_entry(DICT_ENTRY &dict_entry);
int next_dict_entry(const ObDictMetaHeader &header, DICT_ENTRY &dict_entry);
private:
OB_INLINE void release_palf_buf_()
{

View File

@ -326,7 +326,7 @@ int ObDataDictStorage::parse_dict_metas(
} else {
new (meta) ObDictTenantMeta(&allocator);
if (OB_FAIL(iterator.next_dict_entry(*meta))) {
if (OB_FAIL(iterator.next_dict_entry(header, *meta))) {
DDLOG(WARN, "next_dict_entry for tenant_meta failed", KR(ret), K(header));
} else if (OB_FAIL(tenant_metas.push_back(meta))) {
DDLOG(WARN, "push_back tenant_meta failed", KR(ret), K(header), KPC(meta));
@ -340,7 +340,7 @@ int ObDataDictStorage::parse_dict_metas(
} else {
new (meta) ObDictDatabaseMeta(&allocator);
if (OB_FAIL(iterator.next_dict_entry(*meta))) {
if (OB_FAIL(iterator.next_dict_entry(header, *meta))) {
DDLOG(WARN, "next_dict_entry for database_meta failed", KR(ret), K(header));
} else if (OB_FAIL(database_metas.push_back(meta))) {
DDLOG(WARN, "push_back database_meta failed", KR(ret), K(header), KPC(meta));
@ -354,7 +354,7 @@ int ObDataDictStorage::parse_dict_metas(
} else {
new (meta) ObDictTableMeta(&allocator);
if (OB_FAIL(iterator.next_dict_entry(*meta))) {
if (OB_FAIL(iterator.next_dict_entry(header, *meta))) {
DDLOG(WARN, "next_dict_entry for table_meta failed", KR(ret), K(header));
} else if (OB_FAIL(table_metas.push_back(meta))) {
DDLOG(WARN, "push_back table_meta failed", KR(ret), K(header), KPC(meta));

View File

@ -20,6 +20,9 @@
#include "share/schema/ob_table_schema.h"
#include "share/schema/ob_table_param.h"
#define DEFINE_DESERIALIZE_DATA_DICT(TypeName) \
int TypeName::deserialize(const ObDictMetaHeader &header, const char* buf, const int64_t data_len, int64_t& pos)
#define PRECHECK_SERIALIZE \
int ret = OB_SUCCESS; \
if (OB_ISNULL(buf) \
@ -83,7 +86,7 @@
} \
} while(0)
#define DESERIALIZE_ARRAY_WITH_ARGS(ARRAY_TYPE, array_ptr, array_size, allocator, args... ) \
#define DESERIALIZE_DICT_ARRAY_WITH_ARGS(ARRAY_TYPE, array_ptr, array_size, allocator, header, args... ) \
do { \
if (OB_SUCC(ret)) { \
if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &array_size))) { \
@ -98,7 +101,7 @@
} else { \
for (int i = 0; OB_SUCC(ret) && i < array_size; i++) { \
new (array_ptr + i) ARRAY_TYPE(args); \
if (OB_FAIL(array_ptr[i].deserialize(buf, data_len, pos))) { \
if (OB_FAIL(array_ptr[i].deserialize(header, buf, data_len, pos))) { \
DDLOG(WARN, #ARRAY_TYPE " deserialize fail", KR(ret), K(array_size), K(i)); \
} \
} \
@ -283,7 +286,7 @@ DEFINE_SERIALIZE(ObDictTenantMeta)
return ret;
}
DEFINE_DESERIALIZE(ObDictTenantMeta)
DEFINE_DESERIALIZE_DATA_DICT(ObDictTenantMeta)
{
PRECHECK_DESERIALIZE;
@ -463,7 +466,7 @@ DEFINE_SERIALIZE(ObDictDatabaseMeta)
return ret;
}
DEFINE_DESERIALIZE(ObDictDatabaseMeta)
DEFINE_DESERIALIZE_DATA_DICT(ObDictDatabaseMeta)
{
PRECHECK_DESERIALIZE;
@ -584,6 +587,7 @@ void ObDictColumnMeta::reset()
orig_default_value_.reset();
cur_default_value_.reset();
extended_type_info_.reset();
column_ref_ids_.reset();
}
DEFINE_EQUAL(ObDictColumnMeta)
@ -604,6 +608,7 @@ DEFINE_EQUAL(ObDictColumnMeta)
cur_default_value_
);
IS_OBARRAY_EQUAL(extended_type_info_);
IS_OBARRAY_EQUAL(column_ref_ids_);
return is_equal;
}
@ -626,13 +631,14 @@ DEFINE_SERIALIZE(ObDictColumnMeta)
collation_type_,
orig_default_value_,
cur_default_value_,
extended_type_info_);
extended_type_info_,
column_ref_ids_);
}
return ret;
}
DEFINE_DESERIALIZE(ObDictColumnMeta)
DEFINE_DESERIALIZE_DATA_DICT(ObDictColumnMeta)
{
PRECHECK_DESERIALIZE;
if (OB_FAIL(ret)) {
@ -663,6 +669,13 @@ DEFINE_DESERIALIZE(ObDictColumnMeta)
DDLOG(WARN, "deep copy orig_default_value failed", KR(ret), K(tmp_orig_default_val));
} else if (OB_FAIL(deep_copy_default_val_(tmp_cur_default_val, cur_default_value_))) {
DDLOG(WARN, "deep copy cur_default_value failed", KR(ret), K(tmp_cur_default_val));
} else {
if (header.get_version() > 1) {
// column_ref_ids_ is serialized when versin >= 2
if (OB_FAIL(common::serialization::decode(buf, data_len, pos, column_ref_ids_))) {
DDLOG(WARN, "deserialize column_ref_ids_ failed", KR(ret), K(data_len), K(pos));
}
}
}
}
@ -685,7 +698,8 @@ DEFINE_GET_SERIALIZE_SIZE(ObDictColumnMeta)
collation_type_,
orig_default_value_,
cur_default_value_,
extended_type_info_);
extended_type_info_,
column_ref_ids_);
return len;
}
@ -718,6 +732,8 @@ int ObDictColumnMeta::assign_(COLUMN_META &column_schema)
DDLOG(WARN, "copy_cur_default_value failed", KR(ret), K(column_schema), KPC(this));
} else if (OB_FAIL(deep_copy_str_array(column_schema.get_extended_type_info(), extended_type_info_, *allocator_))) {
DDLOG(WARN, "assign extended_type_info failed", KR(ret), K(column_schema), KPC(this));
} else if (OB_FAIL(column_schema.get_cascaded_column_ids(column_ref_ids_))) {
DDLOG(WARN, "get_cascaded_column_ids failed", KR(ret), K(column_schema));
} else {
column_id_ = column_schema.get_column_id();
rowkey_position_ = column_schema.get_rowkey_position();
@ -758,6 +774,18 @@ int ObDictColumnMeta::assign(const ObDictColumnMeta &src_column_meta)
return ret;
}
int ObDictColumnMeta::get_cascaded_column_ids(ObIArray<uint64_t> &column_ids) const
{
int ret = OB_SUCCESS;
column_ids.reset();
if (OB_FAIL(column_ids.assign(column_ref_ids_))) {
DDLOG(WARN, "assign cascaded_columns failed", KR(ret), KPC(this), K_(column_ref_ids));
}
return ret;
}
int ObDictColumnMeta::deep_copy_default_val_(const ObObj &src_default_val, ObObj &dest_default_val)
{
int ret = OB_SUCCESS;
@ -889,7 +917,7 @@ DEFINE_SERIALIZE(ObDictTableMeta)
return ret;
}
DEFINE_DESERIALIZE(ObDictTableMeta)
DEFINE_DESERIALIZE_DATA_DICT(ObDictTableMeta)
{
PRECHECK_DESERIALIZE;
@ -920,7 +948,7 @@ DEFINE_DESERIALIZE(ObDictTableMeta)
} else if (OB_FAIL(deep_copy_str(tmp_table_name, table_name_, *allocator_))) {
DDLOG(WARN, "deep_copy_str for table_name failed", KR(ret), K(tmp_table_name));
} else {
DESERIALIZE_ARRAY_WITH_ARGS(ObDictColumnMeta, col_metas_, column_count_, allocator_, allocator_);
DESERIALIZE_DICT_ARRAY_WITH_ARGS(ObDictColumnMeta, col_metas_, column_count_, allocator_, header, allocator_);
DESERIALIZE_ARRAY(ObRowkeyColumn, rowkey_cols_, rowkey_column_count_, allocator_);
DESERIALIZE_ARRAY(ObIndexColumn, index_cols_, index_column_count_, allocator_);
}

View File

@ -31,6 +31,11 @@
#include "ob_data_dict_utils.h"
#define NEED_SERIALIZE_AND_DESERIALIZE_DICT \
int serialize(char* buf, const int64_t buf_len, int64_t& pos) const; \
int deserialize(const ObDictMetaHeader &header, const char* buf, const int64_t data_len, int64_t& pos); \
int64_t get_serialize_size(void) const
namespace oceanbase
{
namespace common
@ -74,7 +79,10 @@ public:
ObDictMetaHeader();
ObDictMetaHeader(const ObDictMetaType &meta_type);
virtual ~ObDictMetaHeader() { reset(); }
const static int64_t DEFAULT_VERSION = 1;
public:
// NOTICE: update DEFAULT_VERSION if modify serialized fields in DictxxxMeta
// update to 2 in 4.1 bp1: add column_ref_ids_ in ObDictColumnMeta
const static int64_t DEFAULT_VERSION = 2;
public:
OB_INLINE bool is_valid() const
{
@ -86,6 +94,7 @@ public:
void reset();
bool operator==(const ObDictMetaHeader &other) const;
public:
OB_INLINE int64_t get_version() const { return version_; }
OB_INLINE const ObDictMetaType &get_dict_meta_type() const { return meta_type_; }
OB_INLINE void set_snapshot_scn(const share::SCN &snapshot_scn) { snapshot_scn_ = snapshot_scn; }
OB_INLINE const share::SCN &get_snapshot_scn() const { return snapshot_scn_; }
@ -100,7 +109,7 @@ public:
K_(storage_type),
K_(dict_serialized_length));
private:
int16_t version_; // current version is 1. update if needed.
int16_t version_;
share::SCN snapshot_scn_;
ObDictMetaType meta_type_;
ObDictMetaStorageType storage_type_;
@ -109,7 +118,6 @@ private:
class ObDictTenantMeta
{
OB_UNIS_VERSION(1);
public:
// allocator should keep memory for meta until meta is not in use anymore
explicit ObDictTenantMeta(ObIAllocator *allocator);
@ -153,6 +161,7 @@ public:
OB_INLINE bool is_in_recyclebin() const { return in_recyclebin_; }
OB_INLINE const share::ObLSArray &get_ls_array() const { return ls_arr_; }
NEED_SERIALIZE_AND_DESERIALIZE_DICT;
TO_STRING_KV(
K_(tenant_id),
K_(schema_version),
@ -182,7 +191,6 @@ private:
class ObDictDatabaseMeta
{
OB_UNIS_VERSION(1);
public:
ObDictDatabaseMeta(ObIAllocator *allocator);
virtual ~ObDictDatabaseMeta() { reset(); }
@ -212,6 +220,7 @@ public:
OB_INLINE bool is_in_recyclebin() const { return in_recyclebin_; }
OB_INLINE common::ObNameCaseMode get_name_case_mode() const { return name_case_mode_; }
NEED_SERIALIZE_AND_DESERIALIZE_DICT;
TO_STRING_KV(
K_(tenant_id),
K_(database_id),
@ -243,7 +252,6 @@ private:
class ObDictColumnMeta
{
OB_UNIS_VERSION(1);
public:
ObDictColumnMeta(ObIAllocator *allocator);
virtual ~ObDictColumnMeta() { reset(); }
@ -288,7 +296,9 @@ public:
OB_INLINE bool is_generated_column() const { return is_virtual_generated_column() || is_stored_generated_column(); }
OB_INLINE bool is_shadow_column() const { return column_id_ > common::OB_MIN_SHADOW_COLUMN_ID; }
OB_INLINE bool has_generated_column_deps() const { return column_flags_ & GENERATED_DEPS_CASCADE_FLAG; }
int get_cascaded_column_ids(ObIArray<uint64_t> &column_ids) const;
NEED_SERIALIZE_AND_DESERIALIZE_DICT;
TO_STRING_KV(
K_(column_id),
K_(column_name),
@ -335,11 +345,11 @@ private:
common::ObObj orig_default_value_;//first default value, used for alter table add column; collation must be same with the column
common::ObObj cur_default_value_; //collation must be same with the column
common::ObSEArray<common::ObString, 8> extended_type_info_;//used for enum and set
common::ObSEArray<uint64_t, 2> column_ref_ids_;
};
class ObDictTableMeta
{
OB_UNIS_VERSION(1);
public:
ObDictTableMeta(ObIAllocator *allocator);
virtual ~ObDictTableMeta() { reset(); }
@ -437,6 +447,7 @@ public:
int get_column_meta(const uint64_t column_id, const ObDictColumnMeta *&column_meta) const;
const ObDictColumnMeta *get_column_schema(const uint64_t column_id) const;
public:
NEED_SERIALIZE_AND_DESERIALIZE_DICT;
TO_STRING_KV(
K_(tenant_id),
K_(database_id),

View File

@ -124,7 +124,8 @@ int64_t IObCDCPartTransResolver::MissingLogInfo::get_total_misslog_cnt() const
int IObCDCPartTransResolver::MissingLogInfo::sort_and_unique_missing_log_lsn()
{
return sort_and_unique_lsn_arr(miss_redo_or_state_lsn_arr_);
auto fn = [](palf::LSN &lsn1, palf::LSN &lsn2) { return lsn1 < lsn2; };
return sort_and_unique_array(miss_redo_or_state_lsn_arr_, fn);
}
// *************** ObCDCPartTransResolver public functions ***************** //

View File

@ -141,17 +141,17 @@ int ObLogMetaDataBaselineLoader::read(
LOG_ERROR("data_dict_iterator append_log failed", KR(ret), K(tenant_id));
} else {
bool is_done = false;
datadict::ObDictMetaHeader meta_heaer;
datadict::ObDictMetaHeader meta_header;
while (OB_SUCC(ret) && ! is_done) {
meta_heaer.reset();
meta_header.reset();
if (OB_FAIL(data_dict_iterator.next_dict_header(meta_heaer))) {
if (OB_FAIL(data_dict_iterator.next_dict_header(meta_header))) {
if (OB_ITER_END != ret) {
LOG_ERROR("data_dict_iterator next_dict_heaer failed", KR(ret), K(tenant_id));
}
} else {
const datadict::ObDictMetaType &meta_type = meta_heaer.get_dict_meta_type();
const datadict::ObDictMetaType &meta_type = meta_header.get_dict_meta_type();
switch (meta_type) {
case datadict::ObDictMetaType::TABLE_META:
@ -160,7 +160,7 @@ int ObLogMetaDataBaselineLoader::read(
if (OB_FAIL(dict_tenant_info->alloc_dict_table_meta(table_meta))) {
LOG_ERROR("alloc_dict_table_meta failed", K(ret), K(tenant_id));
} else if (OB_FAIL(data_dict_iterator.next_dict_entry(*table_meta))) {
} else if (OB_FAIL(data_dict_iterator.next_dict_entry(meta_header, *table_meta))) {
LOG_ERROR("data_dict_iterator next_dict_entry for table_meta failed", K(ret), K(tenant_id),
K(table_meta));
} else if (OB_FAIL(dict_tenant_info->insert_dict_table_meta(table_meta))) {
@ -177,7 +177,7 @@ int ObLogMetaDataBaselineLoader::read(
if (OB_FAIL(dict_tenant_info->alloc_dict_db_meta(db_meta))) {
LOG_ERROR("alloc_dict_db_meta failed", K(ret), K(tenant_id));
} else if (OB_FAIL(data_dict_iterator.next_dict_entry(*db_meta))) {
} else if (OB_FAIL(data_dict_iterator.next_dict_entry(meta_header, *db_meta))) {
LOG_ERROR("data_dict_iterator next_dict_entry for db_meta failed", K(ret), K(tenant_id),
K(db_meta));
} else if (OB_FAIL(dict_tenant_info->insert_dict_db_meta(db_meta))) {
@ -192,7 +192,7 @@ int ObLogMetaDataBaselineLoader::read(
{
datadict::ObDictTenantMeta &tenant_meta = dict_tenant_info->get_dict_tenant_meta();
if (OB_FAIL(data_dict_iterator.next_dict_entry(tenant_meta))) {
if (OB_FAIL(data_dict_iterator.next_dict_entry(meta_header, tenant_meta))) {
LOG_ERROR("data_dict_iterator next_dict_entry for tenant_meta failed", K(ret), K(tenant_id),
K(tenant_meta));
} else {

View File

@ -1045,89 +1045,79 @@ int ObLogMetaManager::set_column_meta_(
template<class TABLE_SCHEMA>
int ObLogMetaManager::set_primary_keys_(ITableMeta *table_meta,
const TABLE_SCHEMA *schema,
const TABLE_SCHEMA *table_schema,
const TableSchemaInfo &tb_schema_info)
{
int ret = OB_SUCCESS;
int64_t valid_pk_num = 0;
const int64_t rowkey_column_num = schema->get_rowkey_column_num();
ObLogAdaptString pks(ObModIds::OB_LOG_TEMP_MEMORY);
ObLogAdaptString pk_info(ObModIds::OB_LOG_TEMP_MEMORY);
if (OB_ISNULL(table_meta) || OB_ISNULL(schema)) {
LOG_ERROR("invalid argument", K(table_meta), K(schema));
if (OB_ISNULL(table_meta) || OB_ISNULL(table_schema)) {
LOG_ERROR("invalid argument", K(table_meta), K(table_schema));
ret = OB_INVALID_ARGUMENT;
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_column_num; i++) {
int64_t column_index = -1;
ColumnSchemaInfo *column_schema_info = NULL;
if (! table_schema->is_heap_table()) {
const int64_t rowkey_column_num = table_schema->get_rowkey_column_num();
if (OB_FAIL(tb_schema_info.get_column_schema_info_for_rowkey(i, column_schema_info))) {
LOG_ERROR("get_column_schema_info", KR(ret), "table_id", schema->get_table_id(),
"table_name", schema->get_table_name(),
K_(enable_output_hidden_primary_key), K(column_schema_info));
} else if (! column_schema_info->is_usr_column()) {
// filter non user column
META_STAT_INFO("ignore non user-required column for set_row_keys_", KPC(column_schema_info),
"table_name", schema->get_table_name(),
"table_id", schema->get_table_id(),
"rowkey_index", i,
K(rowkey_column_num));
} else if (! column_schema_info->is_rowkey()) { // not rowkey
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("not a heap table and have no-rowkey in TableSchema::rowley_info_", K(ret),
K(column_schema_info));
} else {
const auto *column_table_schema = schema->get_column_schema(column_schema_info->get_column_id());
if (OB_ISNULL(column_table_schema)) {
for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_column_num; i++) {
int64_t column_index = -1;
ColumnSchemaInfo *column_schema_info = NULL;
if (OB_FAIL(tb_schema_info.get_column_schema_info_for_rowkey(i, column_schema_info))) {
LOG_ERROR("get_column_schema_info", KR(ret), "table_id", table_schema->get_table_id(),
"table_name", table_schema->get_table_name(),
K_(enable_output_hidden_primary_key), K(column_schema_info));
} else if (! column_schema_info->is_rowkey()) { // not rowkey
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("get_column_schema_by_id_internal fail", KR(ret), KPC(column_schema_info), KPC(schema), KPC(column_table_schema));
} else {
column_index = column_schema_info->get_usr_column_idx();
LOG_ERROR("not a heap table and have no-rowkey in TableSchema::rowley_info_", K(ret),
K(column_schema_info));
} else if (OB_FAIL(fill_primary_key_info_(
*table_schema,
*column_schema_info,
pks,
pk_info,
valid_pk_num))) {
LOG_ERROR("fill_primary_key_info_ failed", KR(ret), K(pks), K(pk_info), K(valid_pk_num), K(column_schema_info));
}
} // for
} else {
ObArray<uint64_t> logic_pks;
auto fn = [](uint64_t &a, uint64_t &b) { return a < b; };
if (OB_UNLIKELY(column_index < 0 || column_index >= OB_MAX_COLUMN_NUMBER)) {
LOG_ERROR("column_index is invalid", K(column_index),
"table_id", schema->get_table_id(),
"table_name", schema->get_table_name(),
"column_id", column_schema_info->get_column_id(),
"column_name", column_table_schema->get_column_name());
if (OB_FAIL(get_logic_primary_keys_for_heap_table_(*table_schema, logic_pks))) {
LOG_ERROR("get_logic_primary_keys_for_heap_table_ failed", KR(ret), KPC(table_schema), K(logic_pks));
} else if (OB_FAIL(sort_and_unique_array(logic_pks, fn))) {
LOG_ERROR("sort and unique logic_pks failed", KR(ret), K(logic_pks), K(table_schema));
} else {
ARRAY_FOREACH_N(logic_pks, column_id_idx, pk_count) {
const uint64_t column_id = logic_pks.at(column_id_idx);
ColumnSchemaInfo *column_schema_info = NULL;
if (OB_UNLIKELY(OB_HIDDEN_PK_INCREMENT_COLUMN_ID < column_id && OB_APP_MIN_COLUMN_ID > column_id)) {
ret = OB_ERR_UNEXPECTED;
} else {
ret = pks.append(column_table_schema->get_column_name());
if (OB_SUCCESS == ret) {
if (i < (rowkey_column_num - 1)) {
ret = pks.append(",");
}
}
if (OB_SUCCESS == ret) {
if (0 == valid_pk_num) {
ret = pk_info.append("(");
} else {
ret = pk_info.append(",");
}
}
if (OB_SUCCESS == ret) {
ret = pk_info.append_int64(column_index);
}
if (OB_SUCCESS == ret) {
valid_pk_num++;
} else {
LOG_ERROR("pks or pk_info append fail", KR(ret), K(pks), K(pk_info), K(column_index));
}
LOG_ERROR("invalid column_id", KR(ret), K(column_id_idx), K(column_id), K(logic_pks), KPC(table_schema));
} else if (OB_FAIL(tb_schema_info.get_column_schema_info_of_column_id(column_id, column_schema_info))) {
LOG_ERROR("get_column_schema_info_of_column_id failed", KR(ret), K(column_id), KPC(table_schema));
} else if (OB_FAIL(fill_primary_key_info_(
*table_schema,
*column_schema_info,
pks,
pk_info,
valid_pk_num))) {
LOG_ERROR("fill_primary_key_info_ failed",
KR(ret), K(pks), K(pk_info), K(valid_pk_num), K(column_id_idx), K(column_id), K(logic_pks), K(column_schema_info));
}
}
}
} // for
}
if (OB_SUCC(ret)) {
table_meta->setHasPK((valid_pk_num > 0));
const bool has_pk = (valid_pk_num > 0);
table_meta->setHasPK((has_pk));
// 只有在存在pk的情况下,才设置主键信息
if (valid_pk_num > 0) {
// only set primary_key_info if primary key is exist
if (has_pk) {
if (OB_FAIL(pk_info.append(")"))) {
LOG_ERROR("pk_info append fail", KR(ret), K(pk_info));
} else {
@ -1139,7 +1129,7 @@ int ObLogMetaManager::set_primary_keys_(ITableMeta *table_meta,
} else if (OB_FAIL(pks.cstr(pks_str))) {
LOG_ERROR("get pks str fail", KR(ret), K(pks));
}
// 要求cstr是有效的
// require cstr is valid
else if (OB_ISNULL(pk_info_str) || OB_ISNULL(pks_str)) {
LOG_ERROR("pk_info_str or pks_str is invalid", K(pk_info_str), K(pks_str), K(pk_info),
K(pks), K(valid_pk_num));
@ -1151,8 +1141,8 @@ int ObLogMetaManager::set_primary_keys_(ITableMeta *table_meta,
}
}
META_STAT_INFO("set_primary_keys", KR(ret), "table_name", schema->get_table_name(),
"table_id", schema->get_table_id(),
META_STAT_INFO("set_primary_keys", KR(ret), "table_name", table_schema->get_table_name(),
"table_id", table_schema->get_table_id(),
"has_pk", table_meta->hasPK(), "pk_info", table_meta->getPkinfo(),
"pks", table_meta->getPKs());
}
@ -1160,6 +1150,125 @@ int ObLogMetaManager::set_primary_keys_(ITableMeta *table_meta,
return ret;
}
template<class TABLE_SCHEMA>
int ObLogMetaManager::get_logic_primary_keys_for_heap_table_(
const TABLE_SCHEMA &table_schema,
ObIArray<uint64_t> &pk_list)
{
int ret = OB_SUCCESS;
const bool enable_output_hidden_primary_key = (1 == TCONF.enable_output_hidden_primary_key);
pk_list.reset();
if (table_schema.is_heap_table() && enable_output_hidden_primary_key) {
ObArray<ObColDesc> col_ids;
if (OB_FAIL(table_schema.get_column_ids(col_ids))) {
LOG_ERROR("get all column info failed", KR(ret), K(table_schema));
} else {
ARRAY_FOREACH_N(col_ids, col_idx, col_cnt) {
bool chosen = false;
const ObColDesc &col_desc = col_ids.at(col_idx);
const uint64_t column_id = col_desc.col_id_;
if (OB_HIDDEN_PK_INCREMENT_COLUMN_ID == column_id) {
chosen = true;
} else {
auto *column_schema = table_schema.get_column_schema(column_id);
if (OB_ISNULL(column_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid column_schema", KR(ret), K(column_id), K(col_desc), K(table_schema));
} else if (column_schema->is_tbl_part_key_column()) {
// if not virtual: set as logic pk
// otherwise set its dep columns as logic pk
if (column_schema->is_virtual_generated_column()) {
ObArray<uint64_t> deped_cols;
if (OB_FAIL(column_schema->get_cascaded_column_ids(deped_cols))) {
LOG_ERROR("get_cascaded_column_ids from column_schema failed", KR(ret), K(column_schema));
} else {
ARRAY_FOREACH_N(deped_cols, dep_col_idx, dep_col_cnt) {
const uint64_t deped_col_id = deped_cols.at(dep_col_idx);
if (OB_UNLIKELY(OB_HIDDEN_PK_INCREMENT_COLUMN_ID < deped_col_id && OB_APP_MIN_COLUMN_ID > deped_col_id)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid deped column", KR(ret), K(column_id), K(deped_col_id));
} else if (OB_FAIL(pk_list.push_back(deped_col_id))) {
LOG_ERROR("push_back column_id into pk_list failed", KR(ret), K(column_id));
}
}
}
} else {
chosen = true;
}
}
}
if (OB_SUCC(ret) && chosen && OB_FAIL(pk_list.push_back(column_id))) {
LOG_ERROR("push_back column_id into pk_list failed", KR(ret), K(column_id));
}
} // for
}
LOG_INFO("get_logic_primary_keys_for_heap_table_", KR(ret),
"tenant_id", table_schema.get_tenant_id(),
"table_id", table_schema.get_table_id(),
"table_name", table_schema.get_table_name(),
K(pk_list));
}
return ret;
}
template<class TABLE_SCHEMA>
int ObLogMetaManager::fill_primary_key_info_(
const TABLE_SCHEMA &table_schema,
const ColumnSchemaInfo &column_schema_info,
ObLogAdaptString &pks,
ObLogAdaptString &pk_info,
int64_t &valid_pk_num)
{
int ret = OB_SUCCESS;
int64_t column_index = column_schema_info.get_usr_column_idx();
const auto *column_table_schema = table_schema.get_column_schema(column_schema_info.get_column_id());
if (OB_ISNULL(column_table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("get_column_schema_by_id_internal fail", KR(ret), K(column_schema_info), K(table_schema), KPC(column_table_schema));
} else if (OB_UNLIKELY(! column_schema_info.is_usr_column())) {
// filter non user column
META_STAT_INFO("ignore non user-required column for set_row_keys_",
"tenant_id", table_schema.get_tenant_id(),
"table_name", table_schema.get_table_name(),
"table_id", table_schema.get_table_id(),
K(column_schema_info));
} else if (OB_UNLIKELY(column_index < 0 || column_index >= OB_MAX_COLUMN_NUMBER)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("column_index is invalid", KR(ret),
K(column_index),
"table_id", table_schema.get_table_id(),
"table_name", table_schema.get_table_name(),
"column_id", column_schema_info.get_column_id(),
"column_name", column_table_schema->get_column_name());
} else if (valid_pk_num > 0 && OB_FAIL(pks.append(","))) {
LOG_ERROR("append pks delimeter failed", KR(ret), K(valid_pk_num), K(pks));
} else if (pks.append(column_table_schema->get_column_name())) {
LOG_ERROR("append column_name into pks failed", KR(ret), K(pks), KPC(column_table_schema));
} else {
if (OB_SUCC(ret)) {
if (0 == valid_pk_num) {
ret = pk_info.append("(");
} else {
ret = pk_info.append(",");
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(pk_info.append_int64(column_index))) {
LOG_ERROR("append column_index into pk_info failed", KR(ret), K(column_index), K(pk_info), KPC(column_table_schema));
} else {
valid_pk_num++;
}
}
return ret;
}
int ObLogMetaManager::set_unique_keys_from_unique_index_table_(
const share::schema::ObTableSchema *table_schema,
const share::schema::ObTableSchema *index_table_schema,

View File

@ -47,6 +47,7 @@ namespace libobcdc
class ObLogSchemaGuard;
class IObLogSchemaGetter;
class TableSchemaInfo;
class ColumnSchemaInfo;
class ObDictTenantInfo;
class ObDictTenantInfoGuard;
@ -346,6 +347,17 @@ private:
ITableMeta *table_meta,
const TABLE_SCHEMA*schema,
const TableSchemaInfo &tb_schema_info);
template<class TABLE_SCHEMA>
int get_logic_primary_keys_for_heap_table_(
const TABLE_SCHEMA &table_schema,
ObIArray<uint64_t> &pk_list);
template<class TABLE_SCHEMA>
int fill_primary_key_info_(
const TABLE_SCHEMA &table_schema,
const ColumnSchemaInfo &column_schema_info,
ObLogAdaptString &pks,
ObLogAdaptString &pk_info,
int64_t &valid_pk_num);
template<class SCHEMA_GUARD, class TABLE_SCHEMA>
int set_unique_keys_(
ITableMeta *table_meta,

View File

@ -112,7 +112,8 @@ int SortedLogEntryInfo::is_all_log_entry_fetched(bool &is_all_redo_fetched)
{
int ret = OB_SUCCESS;
if (OB_FAIL(sort_and_unique_lsn_arr(recorded_lsn_arr_))) {
auto fn = [](palf::LSN &lsn1, palf::LSN &lsn2) { return lsn1 < lsn2; };
if (OB_FAIL(sort_and_unique_array(recorded_lsn_arr_, fn))) {
LOG_ERROR("sort_and_unique_recorded_lsn_arr failed", KR(ret), KPC(this));
} else {
is_all_redo_fetched = fetched_log_entry_arr_.count() == recorded_lsn_arr_.count();

View File

@ -561,6 +561,49 @@ struct CDCLSNComparator
// NOT THREAD_SAFE
int sort_and_unique_lsn_arr(ObLogLSNArray &lsn_arr);
// sort arr and remove duplicate item in arr
// 1. Item in array should impl copy-assign
// 2. comparator should compare Item in array, and should obey rule of std::sort
template<class ARRAY, class Comparator>
int sort_and_unique_array(ARRAY &arr, Comparator &comparator)
{
int ret = OB_SUCCESS;
ObArray<int64_t> duplicated_item_idx_arr;
if (arr.count() > 1) {
// sort lsn_arr
std::sort(arr.begin(), arr.end(), comparator);
auto prev = arr.at(0);
// get duplicate misslog lsn idx
for(int64_t idx = 1; OB_SUCC(ret) && idx < arr.count(); idx++) {
auto &cur = arr.at(idx);
if (prev == cur) {
if (OB_FAIL(duplicated_item_idx_arr.push_back(idx))) {
OBLOG_LOG(WARN, "push_back_duplicate_item_arr fail", KR(ret), K(cur), K(prev), K(idx));
}
}
if (OB_SUCC(ret)) {
prev = cur;
}
}
// remove duplicate misslog lsn
for(int64_t idx = duplicated_item_idx_arr.count() - 1; OB_SUCC(ret) && idx >= 0; idx--) {
int64_t duplicate_item_idx = duplicated_item_idx_arr[idx];
if (OB_UNLIKELY(0 > duplicate_item_idx || duplicate_item_idx > arr.count())) {
ret = OB_INVALID_ARGUMENT;
OBLOG_LOG(WARN, "invalid duplicate_cur_lsn_idx", KR(ret), K(arr), K(duplicated_item_idx_arr), K(idx), K(duplicate_item_idx));
} else if (OB_FAIL(arr.remove(duplicate_item_idx))) {
OBLOG_LOG(WARN, "remove_duplicate_item failed", KR(ret), K(arr), K(duplicate_item_idx));
} else {
}
}
OBLOG_LOG(DEBUG, "sort_and_unique_array", KR(ret), K(duplicated_item_idx_arr), K(arr));
}
return ret;
}
typedef int32_t offset_t;
// write specified buf to specified file.

View File

@ -505,7 +505,7 @@ int ObAdminParserLogEntry::parse_data_dict_log_()
switch (header.get_dict_meta_type()) {
case datadict::ObDictMetaType::TENANT_META: {
datadict::ObDictTenantMeta tenant_meta(&allocator);
if (OB_FAIL(dict_iterator.next_dict_entry(tenant_meta))) {
if (OB_FAIL(dict_iterator.next_dict_entry(header, tenant_meta))) {
LOG_ERROR("get next_dict_entry failed", KR(ret), K(header), K(tenant_meta));
} else {
str_arg_.writer_ptr_->dump_key("TenantMeta");
@ -515,7 +515,7 @@ int ObAdminParserLogEntry::parse_data_dict_log_()
}
case datadict::ObDictMetaType::DATABASE_META: {
datadict::ObDictDatabaseMeta db_meta(&allocator);
if (OB_FAIL(dict_iterator.next_dict_entry(db_meta))) {
if (OB_FAIL(dict_iterator.next_dict_entry(header, db_meta))) {
LOG_ERROR("get next_dict_entry failed", KR(ret), K(header), K(db_meta));
} else {
str_arg_.writer_ptr_->dump_key("DatabaseMeta");
@ -525,7 +525,7 @@ int ObAdminParserLogEntry::parse_data_dict_log_()
}
case datadict::ObDictMetaType::TABLE_META: {
datadict::ObDictTableMeta table_meta(&allocator);
if (OB_FAIL(dict_iterator.next_dict_entry(table_meta))) {
if (OB_FAIL(dict_iterator.next_dict_entry(header, table_meta))) {
LOG_ERROR("get next_dict_entry failed", KR(ret), K(header), K(table_meta));
} else {
str_arg_.writer_ptr_->dump_key("TableMeta");

View File

@ -139,7 +139,7 @@ TEST(ObDataDictStorage, test_storage_in_palf)
} else {
DDLOG(ERROR, "next_dict_header failed", KR(ret), K(header));
}
} else if (OB_FAIL(iterator.next_dict_entry(tb_meta_after))) {
} else if (OB_FAIL(iterator.next_dict_entry(header, tb_meta_after))) {
DDLOG(ERROR, "next_dict_entry failed", KR(ret), K(header));
} else {
EXPECT_TRUE(*tb_meta == tb_meta_after);
@ -202,7 +202,7 @@ TEST(ObDataDictStorage, test_storage_in_dict)
} else {
DDLOG(ERROR, "next_dict_header failed", KR(ret), K(header));
}
} else if (OB_FAIL(iterator.next_dict_entry(tb_meta_after))) {
} else if (OB_FAIL(iterator.next_dict_entry(header, tb_meta_after))) {
DDLOG(ERROR, "next_dict_entry failed", KR(ret), K(header));
} else {
EXPECT_TRUE(*tb_meta == tb_meta_after);

View File

@ -62,6 +62,7 @@ TEST(ObDictTenantMeta, test_raw)
ObArenaAllocator allocator;
ObArenaAllocator allocator_for_deserialize;
ObDictTenantMeta tenant_meta(&allocator);
ObDictMetaHeader header(ObDictMetaType::TENANT_META);
tenant_meta.tenant_id_ = 1001;
tenant_meta.schema_version_=100003124341;
tenant_meta.tenant_name_ = "md_tenant";
@ -82,7 +83,7 @@ TEST(ObDictTenantMeta, test_raw)
ObDictTenantMeta tenant_meta_after(&allocator_for_deserialize);
int64_t deserialize_pos = 0;
EXPECT_EQ(OB_SUCCESS, tenant_meta_after.deserialize(buf, serialize_size, deserialize_pos));
EXPECT_EQ(OB_SUCCESS, tenant_meta_after.deserialize(header, buf, serialize_size, deserialize_pos));
DDLOG(INFO, "deserialized_meta", K(deserialize_pos), K(tenant_meta_after));
ob_free(buf);
@ -101,6 +102,7 @@ TEST(ObDictDatabaseMeta, test_raw)
ObArenaAllocator allocator;
ObArenaAllocator allocator_for_deserialize;
ObDictDatabaseMeta db_meta(&allocator);
ObDictMetaHeader header(ObDictMetaType::DATABASE_META);
db_meta.database_id_ = 1003030501;
db_meta.tenant_id_ = 23412;
db_meta.schema_version_ = 790134621334;
@ -119,7 +121,7 @@ TEST(ObDictDatabaseMeta, test_raw)
ObDictDatabaseMeta db_meta_after(&allocator_for_deserialize);
int64_t deserialize_pos = 0;
EXPECT_EQ(OB_SUCCESS, db_meta_after.deserialize(buf, serialize_size, deserialize_pos));
EXPECT_EQ(OB_SUCCESS, db_meta_after.deserialize(header, buf, serialize_size, deserialize_pos));
DDLOG(INFO, "deserialized_meta", K(deserialize_pos), K(db_meta_after));
ob_free(buf);
@ -143,6 +145,7 @@ TEST(ObDictColumnMeta, test_raw)
ObArenaAllocator allocator_for_deserialize;
DictTableMetaBuilder meta_builder;
ObDictColumnMeta col_meta(&allocator);
ObDictMetaHeader header(ObDictMetaType::TABLE_META);
meta_builder.build_column_meta(&col_meta);
DDLOG(INFO, "build_column_meta", K(col_meta));
@ -154,7 +157,7 @@ TEST(ObDictColumnMeta, test_raw)
ObDictColumnMeta col_meta_after(&allocator_for_deserialize);
int64_t deserialize_pos = 0;
EXPECT_EQ(OB_SUCCESS, col_meta_after.deserialize(buf, serialize_size, deserialize_pos));
EXPECT_EQ(OB_SUCCESS, col_meta_after.deserialize(header, buf, serialize_size, deserialize_pos));
DDLOG(INFO, "deserialized_meta", K(deserialize_pos), K(col_meta_after));
ob_free(buf);
@ -220,6 +223,7 @@ TEST(ObDictTableMeta, test_raw)
ObArenaAllocator allocator_for_deserialize;
DictTableMetaBuilder meta_builder;
ObDictTableMeta *tb_meta;
ObDictMetaHeader header(ObDictMetaType::TABLE_META);
const int64_t col_count = 4000;
const int64_t rowkey_count = 57;
const int64_t index_column_count = 1235;
@ -237,7 +241,7 @@ TEST(ObDictTableMeta, test_raw)
ObDictTableMeta tb_meta_after(&allocator_for_deserialize);
int64_t deserialize_pos = 0;
EXPECT_EQ(OB_SUCCESS, tb_meta_after.deserialize(buf, serialize_size, deserialize_pos));
EXPECT_EQ(OB_SUCCESS, tb_meta_after.deserialize(header, buf, serialize_size, deserialize_pos));
DDLOG(INFO, "deserialized_meta", K(deserialize_pos), K(tb_meta_after));
ob_free(buf);

View File

@ -318,12 +318,57 @@ TEST(utils, cstring_to_num)
EXPECT_EQ(OB_INVALID_ARGUMENT, ret);
}
TEST(utils, unique_arr_uint64)
{
auto fn = [](uint64_t &a, uint64_t &b) { return a < b; };
ObArray<uint64_t> arr;
EXPECT_EQ(OB_SUCCESS, arr.push_back(0));
EXPECT_EQ(OB_SUCCESS, arr.push_back(2));
EXPECT_EQ(OB_SUCCESS, arr.push_back(1));
EXPECT_EQ(OB_SUCCESS, arr.push_back(3));
EXPECT_EQ(OB_SUCCESS, arr.push_back(1));
EXPECT_EQ(OB_SUCCESS, arr.push_back(2));
EXPECT_EQ(OB_SUCCESS, arr.push_back(3));
EXPECT_EQ(OB_SUCCESS, arr.push_back(2));
EXPECT_EQ(OB_SUCCESS, sort_and_unique_array(arr, fn));
EXPECT_EQ(4, arr.count());
EXPECT_EQ(0, arr.at(0));
EXPECT_EQ(1, arr.at(1));
EXPECT_EQ(2, arr.at(2));
EXPECT_EQ(3, arr.at(3));
}
TEST(utils, unique_arr_lsn)
{
auto fn = [](palf::LSN &a, palf::LSN &b) { return a < b; };
ObSEArray<palf::LSN, 8> arr;
EXPECT_EQ(OB_SUCCESS, arr.push_back(palf::LSN(0)));
EXPECT_EQ(OB_SUCCESS, arr.push_back(palf::LSN(2)));
EXPECT_EQ(OB_SUCCESS, arr.push_back(palf::LSN(1)));
EXPECT_EQ(OB_SUCCESS, arr.push_back(palf::LSN(3)));
EXPECT_EQ(OB_SUCCESS, arr.push_back(palf::LSN(1)));
EXPECT_EQ(OB_SUCCESS, arr.push_back(palf::LSN(2)));
EXPECT_EQ(OB_SUCCESS, arr.push_back(palf::LSN(3)));
EXPECT_EQ(OB_SUCCESS, arr.push_back(palf::LSN(2)));
EXPECT_EQ(OB_SUCCESS, sort_and_unique_array(arr, fn));
EXPECT_EQ(4, arr.count());
EXPECT_EQ(palf::LSN(0), arr.at(0));
EXPECT_EQ(palf::LSN(1), arr.at(1));
EXPECT_EQ(palf::LSN(2), arr.at(2));
EXPECT_EQ(palf::LSN(3), arr.at(3));
}
}
}
int main(int argc, char **argv)
{
// ObLogger::get_logger().set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG");
system("rm -f test_log_utils.log");
ObLogger &logger = ObLogger::get_logger();
bool not_output_obcdc_log = true;
logger.set_file_name("test_log_utils.log", not_output_obcdc_log, false);
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
logger.set_mod_log_levels("ALL.*:DEBUG");
logger.set_enable_async_log(false);
testing::InitGoogleTest(&argc,argv);
// testing::FLAGS_gtest_filter = "DO_NOT_RUN";
return RUN_ALL_TESTS();