Refactor tablet ID calculation in partition table
This commit is contained in:
parent
d266ea7a7a
commit
464493e92d
@ -250,40 +250,50 @@ int ObTableLSExecuteP::add_new_op(ObIArray<std::pair<uint64_t, ObTableTabletOp>>
|
||||
int ObTableLSExecuteP::partition_single_op(ObTableTabletOp& origin_tablet_op, ObIArray<std::pair<uint64_t, ObTableTabletOp>>& tablet_ops)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
uint64_t real_table_id = OB_INVALID;
|
||||
uint64_t real_tablet_id = OB_INVALID;
|
||||
int64_t now_ms = -ObHTableUtils::current_time_millis();
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < origin_tablet_op.count(); ++i) {
|
||||
ObTableSingleOp& single_op = origin_tablet_op.at(i);
|
||||
ObString tablegroup_name = arg_.ls_op_.get_table_name();
|
||||
ObObj qualifier;
|
||||
ObTableSingleOpEntity& single_entity = single_op.get_entities().at(0);
|
||||
if (OB_FAIL(single_entity.get_rowkey_value(ObHTableConstants::COL_IDX_Q, qualifier))) {
|
||||
LOG_WARN("fail to get qualifier value", K(ret));
|
||||
} else {
|
||||
ObSqlString real_table_name;
|
||||
ObString family = qualifier.get_string().split_on('.');
|
||||
if (OB_FAIL(real_table_name.append(tablegroup_name))) {
|
||||
LOG_WARN("fail to append tablegroup name", K(ret), K(tablegroup_name));
|
||||
} else if (OB_FAIL(real_table_name.append("$"))) {
|
||||
LOG_WARN("fail to append $", K(ret));
|
||||
} else if (OB_FAIL(real_table_name.append(family))) {
|
||||
LOG_WARN("fail to append family name", K(ret), K(family));
|
||||
} else if (OB_FAIL(modify_htable_quailfier_and_timestamp(&single_entity,
|
||||
qualifier.get_string().after('.'),
|
||||
now_ms,
|
||||
single_op.get_op_type()))) {
|
||||
LOG_WARN("fail to modify hbase entity", K(ret));
|
||||
} else if (OB_FAIL(get_table_id(real_table_name.string(), OB_INVALID, real_table_id))) {
|
||||
LOG_WARN("failed to get_table_id by table_name", K(ret), K(real_table_name), K(qualifier), K(family));
|
||||
} else if (OB_FAIL(get_tablet_id_by_rowkey(real_table_id, single_entity.get_rowkey(), real_tablet_id))) {
|
||||
LOG_WARN("failed to get_tablet_id_by_rowkey ", K(ret), K(real_table_id), K(single_entity.get_rowkey()));
|
||||
ObTabletID real_tablet_id;
|
||||
int64_t part_idx = OB_INVALID_INDEX;
|
||||
int64_t subpart_idx = OB_INVALID_INDEX;
|
||||
int64_t arg_table_id = arg_.ls_op_.get_table_id();
|
||||
ObTabletID arg_tablet_id = arg_.ls_op_.at(0).get_tablet_id();
|
||||
|
||||
if (OB_FAIL(get_idx_by_table_tablet_id(arg_table_id, arg_tablet_id, part_idx, subpart_idx))) {
|
||||
LOG_WARN("fail to get part idx", K(ret), K(arg_table_id), K(arg_tablet_id));
|
||||
} else {
|
||||
int64_t now_ms = -ObHTableUtils::current_time_millis();
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < origin_tablet_op.count(); ++i) {
|
||||
ObTableSingleOp& single_op = origin_tablet_op.at(i);
|
||||
ObString tablegroup_name = arg_.ls_op_.get_table_name();
|
||||
ObObj qualifier;
|
||||
ObTableSingleOpEntity& single_entity = single_op.get_entities().at(0);
|
||||
if (OB_FAIL(single_entity.get_rowkey_value(ObHTableConstants::COL_IDX_Q, qualifier))) {
|
||||
LOG_WARN("fail to get qualifier value", K(ret));
|
||||
} else {
|
||||
bool is_found = false;
|
||||
if (OB_FAIL(find_and_add_op(tablet_ops, real_table_id, single_op, is_found))) {
|
||||
LOG_WARN("fail to add op to single_op", K(ret));
|
||||
} else if (!is_found && OB_FAIL(add_new_op(tablet_ops, real_table_id, real_tablet_id, origin_tablet_op, single_op))) {
|
||||
LOG_WARN("fail to push new tablet op", K(ret));
|
||||
ObSqlString real_table_name;
|
||||
ObString family = qualifier.get_string().split_on('.');
|
||||
if (OB_FAIL(real_table_name.append(tablegroup_name))) {
|
||||
LOG_WARN("fail to append tablegroup name", K(ret), K(tablegroup_name));
|
||||
} else if (OB_FAIL(real_table_name.append("$"))) {
|
||||
LOG_WARN("fail to append $", K(ret));
|
||||
} else if (OB_FAIL(real_table_name.append(family))) {
|
||||
LOG_WARN("fail to append family name", K(ret), K(family));
|
||||
} else if (OB_FAIL(modify_htable_quailfier_and_timestamp(&single_entity,
|
||||
qualifier.get_string().after('.'),
|
||||
now_ms,
|
||||
single_op.get_op_type()))) {
|
||||
LOG_WARN("fail to modify hbase entity", K(ret));
|
||||
} else if (OB_FAIL(get_table_id(real_table_name.string(), OB_INVALID, real_table_id))) {
|
||||
LOG_WARN("failed to get_table_id by table_name", K(ret), K(real_table_name), K(qualifier), K(family));
|
||||
} else if (OB_FAIL(get_tablet_by_idx(real_table_id, part_idx, subpart_idx, real_tablet_id))) {
|
||||
LOG_WARN("failed to get_tablet_id_by_rowkey ", K(ret), K(real_table_id));
|
||||
} else {
|
||||
bool is_found = false;
|
||||
if (OB_FAIL(find_and_add_op(tablet_ops, real_table_id, single_op, is_found))) {
|
||||
LOG_WARN("fail to add op to single_op", K(ret));
|
||||
} else if (!is_found && OB_FAIL(add_new_op(tablet_ops, real_table_id, real_tablet_id.id(), origin_tablet_op, single_op))) {
|
||||
LOG_WARN("fail to push new tablet op", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -297,13 +307,18 @@ int ObTableLSExecuteP::construct_delete_family_op(const ObTableSingleOp &single_
|
||||
ObTableTabletOp &tablet_op) {
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObTableLSOp &ls_op = arg_.ls_op_;
|
||||
uint64_t real_tablet_id = OB_INVALID_ID;
|
||||
const ObRowkey& rowkey = single_op.get_entities().at(0).get_rowkey();
|
||||
if OB_FAIL(get_tablet_id_by_rowkey(mutation_info.table_id_, rowkey, real_tablet_id)) {
|
||||
ObTabletID real_tablet_id;
|
||||
int64_t part_idx;
|
||||
int64_t subpart_idx;
|
||||
int64_t arg_table_id = arg_.ls_op_.get_table_id();
|
||||
ObTabletID arg_tablet_id = arg_.ls_op_.at(0).get_tablet_id();
|
||||
|
||||
if (OB_FAIL(get_idx_by_table_tablet_id(arg_table_id, arg_tablet_id, part_idx, subpart_idx))) {
|
||||
LOG_WARN("fail to get part idx", K(ret), K(arg_table_id), K(arg_tablet_id));
|
||||
} else if (OB_FAIL(get_tablet_by_idx(mutation_info.table_id_, part_idx, subpart_idx, real_tablet_id))) {
|
||||
LOG_WARN("fail to get tablet id", K(ret));
|
||||
} else {
|
||||
tablet_op.set_tablet_id(real_tablet_id);
|
||||
tablet_op.set_tablet_id(real_tablet_id.id());
|
||||
tablet_op.set_dictionary(single_op.get_all_rowkey_names(), single_op.get_all_properties_names());
|
||||
ObTableSingleOp new_single_op;
|
||||
new_single_op.set_op_query(const_cast<ObTableSingleOpQuery*>(single_op.get_query()));
|
||||
@ -355,7 +370,9 @@ int ObTableLSExecuteP::init_multi_schema_info(const ObString& arg_tablegroup_nam
|
||||
schema_guard_))) {
|
||||
LOG_WARN("fail to init shcema_cache_guard", K(ret));
|
||||
} else {
|
||||
hbase_infos_.push_back(mutation_info);
|
||||
if (OB_FAIL(hbase_infos_.push_back(mutation_info))) {
|
||||
LOG_WARN("fail to push mutation info", K(ret), K(mutation_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -386,17 +403,31 @@ int ObTableLSExecuteP::try_process()
|
||||
LOG_USER_ERROR(OB_TABLE_NOT_EXIST, to_cstring(db), to_cstring(tablegroup_name));
|
||||
}
|
||||
LOG_WARN("fail to init schema info", K(ret), K(ls_op.get_table_name()));
|
||||
} else if (OB_FAIL(get_ls_id(ls_id, hbase_infos_.at(0)->simple_schema_))) {
|
||||
LOG_WARN("fail to get ls id", K(ret));
|
||||
} else if (OB_FAIL(check_table_has_global_index(exist_global_index, hbase_infos_.at(0)->schema_cache_guard_))) {
|
||||
LOG_WARN("fail to check global index", K(ret), K(ls_op.get_table_name()));
|
||||
} else if (need_all_prop) {
|
||||
ObSEArray<ObString, 8> all_prop_name;
|
||||
const ObIArray<ObTableColumnInfo *> &column_info_array = hbase_infos_.at(0)->schema_cache_guard_.get_column_info_array();
|
||||
if (OB_FAIL(ObTableApiUtil::expand_all_columns(column_info_array, all_prop_name))) {
|
||||
LOG_WARN("fail to expand all columns", K(ret));
|
||||
} else if (OB_FAIL(cb_result->assign_properties_names(all_prop_name))) {
|
||||
LOG_WARN("fail to assign property names to result", K(ret));
|
||||
} else if (hbase_infos_.empty()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("table info is empty", K(ret));
|
||||
} else {
|
||||
const share::schema::ObSimpleTableSchemaV2 *simple_schema = NULL;
|
||||
for (int i = 0; OB_ISNULL(simple_schema) && i < hbase_infos_.count(); ++i) {
|
||||
if (hbase_infos_.at(i)->table_id_ == ls_op.get_table_id()) {
|
||||
simple_schema = hbase_infos_.at(i)->simple_schema_;
|
||||
}
|
||||
}
|
||||
if (OB_ISNULL(simple_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get null simple_schema", K(ret), K(ls_op.get_table_id()));
|
||||
} else if (OB_FAIL(get_ls_id(ls_id, simple_schema))) {
|
||||
LOG_WARN("fail to get ls id", K(ret));
|
||||
} else if (OB_FAIL(check_table_has_global_index(exist_global_index, hbase_infos_.at(0)->schema_cache_guard_))) {
|
||||
LOG_WARN("fail to check global index", K(ret), K(ls_op.get_table_name()));
|
||||
} else if (need_all_prop) {
|
||||
ObSEArray<ObString, 8> all_prop_name;
|
||||
const ObIArray<ObTableColumnInfo *> &column_info_array = hbase_infos_.at(0)->schema_cache_guard_.get_column_info_array();
|
||||
if (OB_FAIL(ObTableApiUtil::expand_all_columns(column_info_array, all_prop_name))) {
|
||||
LOG_WARN("fail to expand all columns", K(ret));
|
||||
} else if (OB_FAIL(cb_result->assign_properties_names(all_prop_name))) {
|
||||
LOG_WARN("fail to assign property names to result", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -555,7 +555,7 @@ int ObTableQueryAsyncP::init_tb_ctx(ObTableCtx& ctx, ObTableSingleQueryInfo& que
|
||||
ObObjectID tmp_object_id = OB_INVALID_ID;
|
||||
ObObjectID tmp_first_level_part_id = OB_INVALID_ID;
|
||||
ObTabletID real_tablet_id;
|
||||
if (query_ctx.part_idx_ == OB_INVALID_ID && query_ctx.subpart_idx_ == OB_INVALID_ID) { // 非分区表
|
||||
if (query_ctx.part_idx_ == OB_INVALID_INDEX && query_ctx.subpart_idx_ == OB_INVALID_INDEX) { // 非分区表
|
||||
real_tablet_id = query_info.simple_schema_->get_tablet_id();
|
||||
} else if (OB_FAIL(query_info.simple_schema_->get_part_id_and_tablet_id_by_idx(query_ctx.part_idx_,
|
||||
query_ctx.subpart_idx_,
|
||||
|
@ -476,81 +476,52 @@ int ObTableApiProcessorBase::get_table_id(
|
||||
return ret;
|
||||
}
|
||||
|
||||
// transaction control
|
||||
int ObTableApiProcessorBase::get_tablet_by_rowkey(uint64_t table_id, const ObIArray<ObRowkey> &rowkeys,
|
||||
ObIArray<ObTabletID> &tablet_ids)
|
||||
{
|
||||
int ObTableApiProcessorBase::get_idx_by_table_tablet_id(uint64_t arg_table_id, ObTabletID arg_tablet_id,
|
||||
int64_t &part_idx, int64_t &subpart_idx) {
|
||||
int ret = OB_SUCCESS;
|
||||
const ObTableSchema *table_schema = NULL;
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
share::schema::ObSchemaGetterGuard schema_guard;
|
||||
SMART_VAR(sql::ObTableLocation, location_calc) {
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
const ObTableSchema *table_schema;
|
||||
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("failed to get schema guard", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("failed to get table schema", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (!table_schema->is_partitioned_table()) {
|
||||
tablet_ids.push_back(table_schema->get_tablet_id());
|
||||
} else {
|
||||
// trigger client to refresh table entry
|
||||
// maybe drop a non-partitioned table and create a
|
||||
// partitioned table with same name
|
||||
ret = OB_SCHEMA_ERROR;
|
||||
LOG_WARN("partitioned table should pass right tablet id from client", K(ret));
|
||||
}
|
||||
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("failed to get schema guard", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, arg_table_id, table_schema))) {
|
||||
LOG_WARN("failed to get table schema", K(ret), K(tenant_id), K(arg_table_id));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
} else if (!table_schema->is_partitioned_table()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(table_schema->get_part_idx_by_tablet(arg_tablet_id, part_idx, subpart_idx))) {
|
||||
LOG_WARN("fail to get part idx by tablet", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableApiProcessorBase::get_tablet_by_rowkey_partition_table(uint64_t table_id,
|
||||
const ObIArray<ObRowkey> &rowkeys,
|
||||
ObIArray<ObTabletID> &tablet_ids)
|
||||
int ObTableApiProcessorBase::get_tablet_by_idx(uint64_t table_id,
|
||||
int64_t part_idx,
|
||||
int64_t subpart_idx,
|
||||
ObTabletID &tablet_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<ObObjectID, 1> part_ids;
|
||||
share::schema::ObSchemaGetterGuard schema_guard;
|
||||
SMART_VAR(sql::ObTableLocation, location_calc) {
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
const ObTableSchema *table_schema;
|
||||
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("failed to get schema guard", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("failed to get table schema", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (!table_schema->is_partitioned_table()) {
|
||||
ret = OB_SCHEMA_ERROR;
|
||||
LOG_WARN("used for partoition table", K(ret));
|
||||
} else if (OB_FAIL(location_calc.calculate_partition_ids_by_rowkey(
|
||||
session(), schema_guard, table_id, rowkeys, tablet_ids, part_ids))) {
|
||||
LOG_WARN("failed to calc partition id", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableApiProcessorBase::get_tablet_id_by_rowkey(uint64_t table_id, const ObRowkey& rowkey, uint64_t& tablet_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<ObRowkey, 1> rowkey_array;
|
||||
rowkey_array.push_back(rowkey);
|
||||
ObSEArray<ObTabletID, 1> tablet_ids;
|
||||
if (OB_FAIL(get_tablet_by_rowkey(table_id, rowkey_array, tablet_ids))) {
|
||||
if (OB_SCHEMA_ERROR == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
if (OB_FAIL(get_tablet_by_rowkey_partition_table(table_id, rowkey_array, tablet_ids))) {
|
||||
LOG_WARN("fail to get tablet by rowkey in partition table", K(ret));
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("failed to get_tablet_by_rowkey", K(table_id), K(rowkey_array));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
tablet_id = tablet_ids.at(0).id();
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
const ObTableSchema *table_schema = NULL;
|
||||
ObObjectID tmp_object_id = OB_INVALID_ID;
|
||||
ObObjectID tmp_first_level_part_id = OB_INVALID_ID;
|
||||
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("failed to get schema guard", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("failed to get table schema", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_WARN("get table schema failed", K(ret), K(tenant_id), K(table_id));
|
||||
} else if (!table_schema->is_partitioned_table()) {
|
||||
tablet_id = table_schema->get_tablet_id();
|
||||
} else if (OB_FAIL(table_schema->get_part_id_and_tablet_id_by_idx(part_idx,
|
||||
subpart_idx,
|
||||
tmp_object_id,
|
||||
tmp_first_level_part_id,
|
||||
tablet_id))) {
|
||||
LOG_WARN("fail to get tablet by idx", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -128,12 +128,10 @@ public:
|
||||
|
||||
// for get
|
||||
inline transaction::ObTxDesc *get_trans_desc() { return trans_param_.trans_desc_; }
|
||||
int get_tablet_by_rowkey(uint64_t table_id, const ObIArray<ObRowkey> &rowkeys,
|
||||
ObIArray<ObTabletID> &tablet_ids);
|
||||
int get_tablet_by_rowkey_partition_table(uint64_t table_id,
|
||||
const ObIArray<ObRowkey> &rowkeys,
|
||||
ObIArray<ObTabletID> &tablet_ids);
|
||||
int get_tablet_id_by_rowkey(uint64_t table_id, const ObRowkey& rowkey, uint64_t& tablet_id);
|
||||
|
||||
int get_idx_by_table_tablet_id(uint64_t arg_table_id, ObTabletID arg_tablet_id,
|
||||
int64_t &part_idx, int64_t &subpart_idx);
|
||||
int get_tablet_by_idx(uint64_t table_id, int64_t part_idx, int64_t subpart_idx, ObTabletID &tablet_ids);
|
||||
inline transaction::ObTxReadSnapshot &get_tx_snapshot() { return trans_param_.tx_snapshot_; }
|
||||
inline bool had_do_response() const { return trans_param_.had_do_response_; }
|
||||
int get_table_id(const ObString &table_name, const uint64_t arg_table_id, uint64_t &real_table_id) const;
|
||||
@ -153,7 +151,10 @@ protected:
|
||||
virtual int init_schema_info(const ObString &arg_table_name);
|
||||
virtual int init_schema_info(uint64_t table_id);
|
||||
int check_table_has_global_index(bool &exists, table::ObKvSchemaCacheGuard& schema_cache_guard);
|
||||
int get_tablet_id(const share::schema::ObSimpleTableSchemaV2 * simple_table_schema, const ObTabletID &arg_tablet_id, const uint64_t table_id, ObTabletID &tablet_id);
|
||||
int get_tablet_id(const share::schema::ObSimpleTableSchemaV2 * simple_table_schema,
|
||||
const ObTabletID &arg_tablet_id,
|
||||
const uint64_t table_id,
|
||||
ObTabletID &tablet_id);
|
||||
protected:
|
||||
const ObGlobalContext &gctx_;
|
||||
ObTableService *table_service_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user