[improvement](binlog)Support inverted index format v2 in CCR (#33415)
This commit is contained in:
@ -573,24 +573,41 @@ Status BetaRowset::add_to_binlog() {
|
||||
}
|
||||
linked_success_files.push_back(binlog_file);
|
||||
|
||||
for (const auto& index : _schema->indexes()) {
|
||||
if (index.index_type() != IndexType::INVERTED) {
|
||||
continue;
|
||||
if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
|
||||
for (const auto& index : _schema->indexes()) {
|
||||
if (index.index_type() != IndexType::INVERTED) {
|
||||
continue;
|
||||
}
|
||||
auto index_id = index.index_id();
|
||||
auto index_file = InvertedIndexDescriptor::get_index_file_name(
|
||||
seg_file, index_id, index.get_index_suffix());
|
||||
auto binlog_index_file = (std::filesystem::path(binlog_dir) /
|
||||
std::filesystem::path(index_file).filename())
|
||||
.string();
|
||||
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
|
||||
if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
|
||||
status = Status::Error<OS_ERROR>(
|
||||
"fail to create hard link. from={}, to={}, errno={}", index_file,
|
||||
binlog_index_file, Errno::no());
|
||||
return status;
|
||||
}
|
||||
linked_success_files.push_back(binlog_index_file);
|
||||
}
|
||||
auto index_id = index.index_id();
|
||||
auto index_file = InvertedIndexDescriptor::get_index_file_name(
|
||||
seg_file, index_id, index.get_index_suffix());
|
||||
auto binlog_index_file = (std::filesystem::path(binlog_dir) /
|
||||
std::filesystem::path(index_file).filename())
|
||||
.string();
|
||||
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
|
||||
if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
|
||||
status = Status::Error<OS_ERROR>(
|
||||
"fail to create hard link. from={}, to={}, errno={}", index_file,
|
||||
binlog_index_file, Errno::no());
|
||||
return status;
|
||||
} else {
|
||||
if (_schema->has_inverted_index()) {
|
||||
auto index_file = InvertedIndexDescriptor::get_index_file_name(seg_file);
|
||||
auto binlog_index_file = (std::filesystem::path(binlog_dir) /
|
||||
std::filesystem::path(index_file).filename())
|
||||
.string();
|
||||
VLOG_DEBUG << "link " << index_file << " to " << binlog_index_file;
|
||||
if (!local_fs->link_file(index_file, binlog_index_file).ok()) {
|
||||
status = Status::Error<OS_ERROR>(
|
||||
"fail to create hard link. from={}, to={}, errno={}", index_file,
|
||||
binlog_index_file, Errno::no());
|
||||
return status;
|
||||
}
|
||||
linked_success_files.push_back(binlog_index_file);
|
||||
}
|
||||
linked_success_files.push_back(binlog_index_file);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -687,26 +687,47 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
|
||||
}
|
||||
linked_success_files.push_back(snapshot_segment_file_path);
|
||||
|
||||
for (const auto& index : tablet_schema.indexes()) {
|
||||
if (index.index_type() != IndexType::INVERTED) {
|
||||
continue;
|
||||
if (tablet_schema.get_inverted_index_storage_format() ==
|
||||
InvertedIndexStorageFormatPB::V1) {
|
||||
for (const auto& index : tablet_schema.indexes()) {
|
||||
if (index.index_type() != IndexType::INVERTED) {
|
||||
continue;
|
||||
}
|
||||
auto index_id = index.index_id();
|
||||
auto index_file = ref_tablet->get_segment_index_filepath(
|
||||
rowset_id, segment_index, index_id);
|
||||
auto snapshot_segment_index_file_path =
|
||||
fmt::format("{}/{}_{}_{}.binlog-index", schema_full_path, rowset_id,
|
||||
segment_index, index_id);
|
||||
VLOG_DEBUG << "link " << index_file << " to "
|
||||
<< snapshot_segment_index_file_path;
|
||||
res = io::global_local_filesystem()->link_file(
|
||||
index_file, snapshot_segment_index_file_path);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "fail to link binlog index file. [src=" << index_file
|
||||
<< ", dest=" << snapshot_segment_index_file_path << "]";
|
||||
break;
|
||||
}
|
||||
linked_success_files.push_back(snapshot_segment_index_file_path);
|
||||
}
|
||||
auto index_id = index.index_id();
|
||||
auto index_file = ref_tablet->get_segment_index_filepath(
|
||||
rowset_id, segment_index, index_id);
|
||||
auto snapshot_segment_index_file_path =
|
||||
fmt::format("{}/{}_{}_{}.binlog-index", schema_full_path, rowset_id,
|
||||
segment_index, index_id);
|
||||
VLOG_DEBUG << "link " << index_file << " to "
|
||||
<< snapshot_segment_index_file_path;
|
||||
res = io::global_local_filesystem()->link_file(
|
||||
index_file, snapshot_segment_index_file_path);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "fail to link binlog index file. [src=" << index_file
|
||||
<< ", dest=" << snapshot_segment_index_file_path << "]";
|
||||
break;
|
||||
} else {
|
||||
if (tablet_schema.has_inverted_index()) {
|
||||
auto index_file =
|
||||
InvertedIndexDescriptor::get_index_file_name(segment_file_path);
|
||||
auto snapshot_segment_index_file_path =
|
||||
fmt::format("{}/{}_{}.binlog-index", schema_full_path, rowset_id,
|
||||
segment_index);
|
||||
VLOG_DEBUG << "link " << index_file << " to "
|
||||
<< snapshot_segment_index_file_path;
|
||||
res = io::global_local_filesystem()->link_file(
|
||||
index_file, snapshot_segment_index_file_path);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "fail to link binlog index file. [src=" << index_file
|
||||
<< ", dest=" << snapshot_segment_index_file_path << "]";
|
||||
break;
|
||||
}
|
||||
linked_success_files.push_back(snapshot_segment_index_file_path);
|
||||
}
|
||||
linked_success_files.push_back(snapshot_segment_index_file_path);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -3573,14 +3573,25 @@ std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t seg
|
||||
std::string Tablet::get_segment_index_filepath(std::string_view rowset_id,
|
||||
std::string_view segment_index,
|
||||
std::string_view index_id) const {
|
||||
// TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
|
||||
return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, index_id);
|
||||
auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format();
|
||||
if (format == doris::InvertedIndexStorageFormatPB::V1) {
|
||||
return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index,
|
||||
index_id);
|
||||
} else {
|
||||
return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index);
|
||||
}
|
||||
}
|
||||
|
||||
std::string Tablet::get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
|
||||
int64_t index_id) const {
|
||||
// TODO(qiye): support inverted index file format v2, when https://github.com/apache/doris/pull/30145 is merged
|
||||
return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index, index_id);
|
||||
auto format = _tablet_meta->tablet_schema()->get_inverted_index_storage_format();
|
||||
if (format == doris::InvertedIndexStorageFormatPB::V1) {
|
||||
return fmt::format("{}/_binlog/{}_{}_{}.idx", _tablet_path, rowset_id, segment_index,
|
||||
index_id);
|
||||
} else {
|
||||
DCHECK(index_id == -1);
|
||||
return fmt::format("{}/_binlog/{}_{}.idx", _tablet_path, rowset_id, segment_index);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::string> Tablet::get_binlog_filepath(std::string_view binlog_version) const {
|
||||
|
||||
@ -305,41 +305,81 @@ void _ingest_binlog(IngestBinlogArg* arg) {
|
||||
std::vector<uint64_t> segment_index_file_sizes;
|
||||
std::vector<std::string> segment_index_file_names;
|
||||
auto tablet_schema = rowset_meta->tablet_schema();
|
||||
for (const auto& index : tablet_schema->indexes()) {
|
||||
if (index.index_type() != IndexType::INVERTED) {
|
||||
continue;
|
||||
}
|
||||
auto index_id = index.index_id();
|
||||
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
|
||||
auto get_segment_index_file_size_url = fmt::format(
|
||||
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
|
||||
"}",
|
||||
binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
|
||||
remote_rowset_id, segment_index, index_id);
|
||||
uint64_t segment_index_file_size;
|
||||
auto get_segment_index_file_size_cb = [&get_segment_index_file_size_url,
|
||||
&segment_index_file_size](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
|
||||
client->set_timeout_ms(kMaxTimeoutMs);
|
||||
RETURN_IF_ERROR(client->head());
|
||||
return client->get_content_length(&segment_index_file_size);
|
||||
};
|
||||
auto index_file = InvertedIndexDescriptor::inverted_index_file_path(
|
||||
local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index, index_id,
|
||||
index.get_index_suffix());
|
||||
segment_index_file_names.push_back(index_file);
|
||||
|
||||
status = HttpClient::execute_with_retry(max_retry, 1, get_segment_index_file_size_cb);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "failed to get segment file size from "
|
||||
<< get_segment_index_file_size_url
|
||||
<< ", status=" << status.to_string();
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
if (tablet_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
|
||||
for (const auto& index : tablet_schema->indexes()) {
|
||||
if (index.index_type() != IndexType::INVERTED) {
|
||||
continue;
|
||||
}
|
||||
auto index_id = index.index_id();
|
||||
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
|
||||
auto get_segment_index_file_size_url = fmt::format(
|
||||
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
|
||||
"}",
|
||||
binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
|
||||
remote_rowset_id, segment_index, index_id);
|
||||
uint64_t segment_index_file_size;
|
||||
auto get_segment_index_file_size_cb =
|
||||
[&get_segment_index_file_size_url,
|
||||
&segment_index_file_size](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
|
||||
client->set_timeout_ms(kMaxTimeoutMs);
|
||||
RETURN_IF_ERROR(client->head());
|
||||
return client->get_content_length(&segment_index_file_size);
|
||||
};
|
||||
auto index_file = InvertedIndexDescriptor::inverted_index_file_path(
|
||||
local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index,
|
||||
index_id, index.get_index_suffix());
|
||||
segment_index_file_names.push_back(index_file);
|
||||
|
||||
segment_index_file_sizes.push_back(segment_index_file_size);
|
||||
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
|
||||
status = HttpClient::execute_with_retry(max_retry, 1,
|
||||
get_segment_index_file_size_cb);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "failed to get segment file size from "
|
||||
<< get_segment_index_file_size_url
|
||||
<< ", status=" << status.to_string();
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
}
|
||||
|
||||
segment_index_file_sizes.push_back(segment_index_file_size);
|
||||
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
|
||||
if (tablet_schema->has_inverted_index()) {
|
||||
auto get_segment_index_file_size_url = fmt::format(
|
||||
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
|
||||
"}",
|
||||
binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
|
||||
remote_rowset_id, segment_index, -1);
|
||||
uint64_t segment_index_file_size;
|
||||
auto get_segment_index_file_size_cb =
|
||||
[&get_segment_index_file_size_url,
|
||||
&segment_index_file_size](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
|
||||
client->set_timeout_ms(kMaxTimeoutMs);
|
||||
RETURN_IF_ERROR(client->head());
|
||||
return client->get_content_length(&segment_index_file_size);
|
||||
};
|
||||
auto local_segment_path = BetaRowset::segment_file_path(
|
||||
local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index);
|
||||
auto index_file = InvertedIndexDescriptor::get_index_file_name(local_segment_path);
|
||||
segment_index_file_names.push_back(index_file);
|
||||
|
||||
status = HttpClient::execute_with_retry(max_retry, 1,
|
||||
get_segment_index_file_size_cb);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "failed to get segment file size from "
|
||||
<< get_segment_index_file_size_url
|
||||
<< ", status=" << status.to_string();
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
}
|
||||
|
||||
segment_index_file_sizes.push_back(segment_index_file_size);
|
||||
segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -427,7 +427,9 @@ class Syncer {
|
||||
|
||||
Boolean checkRestoreFinish() {
|
||||
String checkSQL = "SHOW RESTORE FROM TEST_" + context.db
|
||||
List<Object> row = suite.sql(checkSQL)[0]
|
||||
int size = suite.sql(checkSQL).size()
|
||||
logger.info("Now size is ${size}")
|
||||
List<Object> row = suite.sql(checkSQL)[size-1]
|
||||
logger.info("Now row is ${row}")
|
||||
|
||||
return (row[4] as String) == "FINISHED"
|
||||
@ -645,9 +647,9 @@ class Syncer {
|
||||
|
||||
// step 2: get partitionIds
|
||||
metaMap.values().forEach {
|
||||
baseSql += "/" + it.id.toString() + "/partitions"
|
||||
def partitionSql = baseSql + "/" + it.id.toString() + "/partitions"
|
||||
Map<Long, Long> partitionInfo = Maps.newHashMap()
|
||||
sqlInfo = sendSql.call(baseSql, toSrc)
|
||||
sqlInfo = sendSql.call(partitionSql, toSrc)
|
||||
for (List<Object> row : sqlInfo) {
|
||||
partitionInfo.put(row[0] as Long, row[2] as Long)
|
||||
}
|
||||
@ -660,7 +662,7 @@ class Syncer {
|
||||
for (Entry<Long, Long> info : partitionInfo) {
|
||||
|
||||
// step 3.1: get partition/indexId
|
||||
String partitionSQl = baseSql + "/" + info.key.toString()
|
||||
String partitionSQl = partitionSql + "/" + info.key.toString()
|
||||
sqlInfo = sendSql.call(partitionSQl, toSrc)
|
||||
if (sqlInfo.isEmpty()) {
|
||||
logger.error("Target cluster partition-${info.key} indexId fault.")
|
||||
|
||||
@ -0,0 +1,217 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_binlog_config_change_index") {
|
||||
|
||||
def syncer = getSyncer()
|
||||
if (!syncer.checkEnableFeatureBinlog()) {
|
||||
logger.info("fe enable_feature_binlog is false, skip case test_binlog_config_change_index")
|
||||
return
|
||||
}
|
||||
def insert_data = { tableName ->
|
||||
[
|
||||
"""INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
|
||||
]
|
||||
}
|
||||
|
||||
def sqls = { tableName ->
|
||||
[
|
||||
""" select * from ${tableName} order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where score < 100 order by id, name, hobbies, score """
|
||||
]
|
||||
}
|
||||
|
||||
def run_sql = { tableName ->
|
||||
sqls(tableName).each { sqlStatement ->
|
||||
def target_res = target_sql sqlStatement
|
||||
def res = sql sqlStatement
|
||||
assertEquals(res, target_res)
|
||||
}
|
||||
}
|
||||
|
||||
def create_table_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES ( "replication_num" = "1");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def run_test = {create_table, tableName ->
|
||||
sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
sql create_table
|
||||
sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
|
||||
|
||||
target_sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
target_sql create_table
|
||||
|
||||
assertTrue(syncer.getTargetMeta("${tableName}"))
|
||||
|
||||
// test 1: target cluster follow source cluster
|
||||
logger.info("=== Test 1: Target cluster follow source cluster case ===")
|
||||
|
||||
insert_data(tableName).each { sqlStatement ->
|
||||
sql sqlStatement
|
||||
assertTrue(syncer.getBinlog("${tableName}"))
|
||||
assertTrue(syncer.beginTxn("${tableName}"))
|
||||
assertTrue(syncer.getBackendClients())
|
||||
assertTrue(syncer.ingestBinlog())
|
||||
assertTrue(syncer.commitTxn())
|
||||
assertTrue(syncer.checkTargetVersion())
|
||||
syncer.closeBackendClients()
|
||||
}
|
||||
|
||||
target_sql " sync "
|
||||
def res = target_sql """SELECT * FROM ${tableName}"""
|
||||
if (tableName.contains("mow")) {
|
||||
assertEquals(res.size(), insert_data(tableName).size() / 2 as Integer)
|
||||
} else {
|
||||
assertEquals(res.size(), insert_data(tableName).size())
|
||||
}
|
||||
run_sql(tableName)
|
||||
}
|
||||
|
||||
|
||||
// inverted index format v1
|
||||
logger.info("=== Test 1: Inverted index format v1 case ===")
|
||||
def tableName = "tbl_binlog_config_change_index_v1"
|
||||
run_test.call(create_table_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2
|
||||
logger.info("=== Test 2: Inverted index format v2 case ===")
|
||||
tableName = "tbl_binlog_config_change_index_v2"
|
||||
run_test.call(create_table_v2(tableName), tableName)
|
||||
|
||||
// inverted index format v1 with mow
|
||||
logger.info("=== Test 3: Inverted index format v1 with mow case ===")
|
||||
tableName = "tbl_binlog_config_change_index_mow_v1"
|
||||
run_test.call(create_table_mow_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2 with mow
|
||||
logger.info("=== Test 4: Inverted index format v2 with mow case ===")
|
||||
tableName = "tbl_binlog_config_change_index_mow_v2"
|
||||
run_test.call(create_table_mow_v2(tableName), tableName)
|
||||
|
||||
// TODO: bugfix
|
||||
// test 2: source cluster disable and re-enable binlog
|
||||
// target_sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
// target_sql """
|
||||
// CREATE TABLE if NOT EXISTS ${tableName}
|
||||
// (
|
||||
// `test` INT,
|
||||
// `id` INT
|
||||
// )
|
||||
// ENGINE=OLAP
|
||||
// UNIQUE KEY(`test`, `id`)
|
||||
// DISTRIBUTED BY HASH(id) BUCKETS 1
|
||||
// PROPERTIES (
|
||||
// "replication_allocation" = "tag.location.default: 1"
|
||||
// )
|
||||
// """
|
||||
// sql """ALTER TABLE ${tableName} set ("binlog.enable" = "false")"""
|
||||
// sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
|
||||
|
||||
// syncer.context.seq = -1
|
||||
|
||||
// assertTrue(syncer.getBinlog("${tableName}"))
|
||||
// assertTrue(syncer.beginTxn("${tableName}"))
|
||||
// assertTrue(syncer.ingestBinlog())
|
||||
// assertTrue(syncer.commitTxn())
|
||||
// assertTrue(syncer.checkTargetVersion())
|
||||
|
||||
// res = target_sql """SELECT * FROM ${tableName} WHERE test=${test_num}"""
|
||||
// assertTrue(res.size() == insert_num)
|
||||
|
||||
}
|
||||
@ -0,0 +1,239 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_get_binlog_case_index") {
|
||||
def syncer = getSyncer()
|
||||
if (!syncer.checkEnableFeatureBinlog()) {
|
||||
logger.info("fe enable_feature_binlog is false, skip case test_get_binlog_case_index")
|
||||
return
|
||||
}
|
||||
|
||||
def insert_data = { tableName ->
|
||||
[
|
||||
"""INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
|
||||
]
|
||||
}
|
||||
|
||||
def sqls = { tableName ->
|
||||
[
|
||||
""" select * from ${tableName} order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where score < 100 order by id, name, hobbies, score """
|
||||
]
|
||||
}
|
||||
|
||||
def run_sql = { tableName ->
|
||||
sqls(tableName).each { sqlStatement ->
|
||||
def target_res = target_sql sqlStatement
|
||||
def res = sql sqlStatement
|
||||
assertEquals(res, target_res)
|
||||
}
|
||||
}
|
||||
|
||||
def create_table_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES ( "replication_num" = "1");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def run_test = { create_table, tableName ->
|
||||
long seq = -1
|
||||
sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
sql create_table
|
||||
sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
|
||||
sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """
|
||||
assertTrue(syncer.getBinlog("${tableName}"))
|
||||
long firstSeq = syncer.context.seq
|
||||
|
||||
logger.info("=== Test 1: normal case ===")
|
||||
insert_data(tableName).each { sqlStatement ->
|
||||
sql sqlStatement
|
||||
assertTrue(syncer.getBinlog("${tableName}"))
|
||||
}
|
||||
|
||||
long endSeq = syncer.context.seq
|
||||
|
||||
logger.info("=== Test 2: Abnormal seq case ===")
|
||||
logger.info("=== Test 2.1: too old seq case ===")
|
||||
syncer.context.seq = -1
|
||||
assertTrue(syncer.context.seq == -1)
|
||||
assertTrue(syncer.getBinlog("${tableName}"))
|
||||
assertTrue(syncer.context.seq == firstSeq)
|
||||
|
||||
|
||||
logger.info("=== Test 2.2: too new seq case ===")
|
||||
syncer.context.seq = endSeq + 100
|
||||
assertTrue((syncer.getBinlog("${tableName}")) == false)
|
||||
|
||||
|
||||
logger.info("=== Test 2.3: not find table case ===")
|
||||
assertTrue(syncer.getBinlog("this_is_an_invalid_tbl") == false)
|
||||
|
||||
|
||||
logger.info("=== Test 2.4: seq between first and end case ===")
|
||||
long midSeq = (firstSeq + endSeq) / 2
|
||||
syncer.context.seq = midSeq
|
||||
assertTrue(syncer.getBinlog("${tableName}"))
|
||||
long test5Seq = syncer.context.seq
|
||||
assertTrue(firstSeq <= test5Seq && test5Seq <= endSeq)
|
||||
|
||||
logger.info("=== Test 3: Get binlog with different priv user case ===")
|
||||
logger.info("=== Test 3.1: read only user get binlog case ===")
|
||||
// TODO: bugfix
|
||||
// syncer.context.seq = -1
|
||||
// readOnlyUser = "read_only_user"
|
||||
// sql """DROP USER IF EXISTS ${readOnlyUser}"""
|
||||
// sql """CREATE USER ${readOnlyUser} IDENTIFIED BY '123456'"""
|
||||
// sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${readOnlyUser}"""
|
||||
// sql """GRANT SELECT_PRIV ON TEST_${context.dbName}.${tableName} TO ${readOnlyUser}"""
|
||||
// syncer.context.user = "${readOnlyUser}"
|
||||
// syncer.context.passwd = "123456"
|
||||
// assertTrue(syncer.getBinlog("${tableName}"))
|
||||
|
||||
}
|
||||
|
||||
// inverted index format v1
|
||||
logger.info("=== Test 1: Inverted index format v1 case ===")
|
||||
def tableName = "tbl_get_binlog_case_index_v1"
|
||||
run_test.call(create_table_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2
|
||||
logger.info("=== Test 2: Inverted index format v2 case ===")
|
||||
tableName = "tbl_get_binlog_case_index_v2"
|
||||
run_test.call(create_table_v2(tableName), tableName)
|
||||
|
||||
// inverted index format v1 with mow
|
||||
logger.info("=== Test 3: Inverted index format v1 with mow case ===")
|
||||
tableName = "tbl_get_binlog_case_index_mow_v1"
|
||||
run_test.call(create_table_mow_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2 with mow
|
||||
logger.info("=== Test 4: Inverted index format v2 with mow case ===")
|
||||
tableName = "tbl_get_binlog_case_index_mow_v2"
|
||||
run_test.call(create_table_mow_v2(tableName), tableName)
|
||||
|
||||
logger.info("=== Test 3: no priv user get binlog case ===")
|
||||
syncer.context.seq = -1
|
||||
def noPrivUser = "no_priv_user2"
|
||||
def emptyTable = "tbl_empty_test"
|
||||
sql "DROP TABLE IF EXISTS ${emptyTable}"
|
||||
sql """
|
||||
CREATE TABLE ${emptyTable} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES ( "replication_num" = "1");
|
||||
"""
|
||||
sql """CREATE USER IF NOT EXISTS ${noPrivUser} IDENTIFIED BY '123456'"""
|
||||
sql """GRANT ALL ON ${context.config.defaultDb}.* TO ${noPrivUser}"""
|
||||
sql """GRANT ALL ON TEST_${context.dbName}.${emptyTable} TO ${noPrivUser}"""
|
||||
syncer.context.user = "${noPrivUser}"
|
||||
syncer.context.passwd = "123456"
|
||||
assertTrue((syncer.getBinlog("${tableName}")) == false)
|
||||
|
||||
|
||||
logger.info("=== Test 3.3: Non-existent user set in syncer get binlog case ===")
|
||||
syncer.context.user = "this_is_an_invalid_user"
|
||||
syncer.context.passwd = "this_is_an_invalid_user"
|
||||
assertTrue(syncer.getBinlog("${tableName}", false) == false)
|
||||
}
|
||||
@ -0,0 +1,223 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_ingest_binlog_index") {
|
||||
|
||||
def syncer = getSyncer()
|
||||
if (!syncer.checkEnableFeatureBinlog()) {
|
||||
logger.info("fe enable_feature_binlog is false, skip case test_ingest_binlog_index")
|
||||
return
|
||||
}
|
||||
|
||||
def insert_data = { tableName ->
|
||||
[
|
||||
"""INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
|
||||
]
|
||||
}
|
||||
|
||||
def sqls = { tableName ->
|
||||
[
|
||||
""" select * from ${tableName} order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where score < 100 order by id, name, hobbies, score """
|
||||
]
|
||||
}
|
||||
|
||||
def run_sql = { tableName ->
|
||||
sqls(tableName).each { sqlStatement ->
|
||||
def target_res = target_sql sqlStatement
|
||||
def res = sql sqlStatement
|
||||
assertEquals(res, target_res)
|
||||
}
|
||||
}
|
||||
|
||||
def create_table_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES ( "replication_num" = "1");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def run_test = { create_table, tableName ->
|
||||
sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
sql create_table
|
||||
sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
|
||||
|
||||
target_sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
target_sql create_table
|
||||
assertTrue(syncer.getTargetMeta("${tableName}"))
|
||||
|
||||
logger.info("=== Test 1: Common ingest binlog case ===")
|
||||
insert_data.call(tableName).each { sqlStatement ->
|
||||
sql sqlStatement
|
||||
assertTrue(syncer.getBinlog("${tableName}"))
|
||||
assertTrue(syncer.beginTxn("${tableName}"))
|
||||
assertTrue(syncer.getBackendClients())
|
||||
assertTrue(syncer.ingestBinlog())
|
||||
assertTrue(syncer.commitTxn())
|
||||
assertTrue(syncer.checkTargetVersion())
|
||||
syncer.closeBackendClients()
|
||||
}
|
||||
|
||||
target_sql " sync "
|
||||
res = target_sql """SELECT * FROM ${tableName}"""
|
||||
if (tableName.contains("mow")) {
|
||||
assertEquals(res.size(), insert_data(tableName).size() / 2 as Integer)
|
||||
} else {
|
||||
assertEquals(res.size(), insert_data(tableName).size())
|
||||
}
|
||||
run_sql.call(tableName)
|
||||
|
||||
logger.info("=== Test 2: Wrong IngestBinlogRequest case ===")
|
||||
sql """INSERT INTO ${tableName} VALUES (4, "bason", "bason hate pear", 99);"""
|
||||
assertTrue(syncer.getBinlog("${tableName}"))
|
||||
assertTrue(syncer.beginTxn("${tableName}"))
|
||||
assertTrue(syncer.getBackendClients())
|
||||
|
||||
|
||||
logger.info("=== Test 2.1: Wrong txnId case ===")
|
||||
// TODO: bugfix
|
||||
// def originTxnId = syncer.context.txnId
|
||||
// syncer.context.txnId = -1
|
||||
// assertTrue(syncer.ingestBinlog() == false)
|
||||
// syncer.context.txnId = originTxnId
|
||||
|
||||
|
||||
logger.info("=== Test 2.2: Wrong binlog version case ===")
|
||||
// -1 means use the number of syncer.context
|
||||
// Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = -1)
|
||||
// use fakeVersion = 1, 1 is doris be talet first version, so no binlog, only http error
|
||||
assertTrue(syncer.ingestBinlog(-1, 1) == false)
|
||||
|
||||
|
||||
logger.info("=== Test 2.3: Wrong partitionId case ===")
|
||||
// TODO: bugfix
|
||||
// assertTrue(syncer.ingestBinlog(1, -1) == false)
|
||||
|
||||
|
||||
logger.info("=== Test 2.4: Right case ===")
|
||||
assertTrue(syncer.ingestBinlog())
|
||||
assertTrue(syncer.commitTxn())
|
||||
assertTrue(syncer.checkTargetVersion())
|
||||
target_sql " sync "
|
||||
res = target_sql """SELECT * FROM ${tableName} WHERE id=4"""
|
||||
assertEquals(res.size(), 1)
|
||||
|
||||
|
||||
// End Test 2
|
||||
syncer.closeBackendClients()
|
||||
}
|
||||
|
||||
// inverted index format v1
|
||||
logger.info("=== Test 1: Inverted index format v1 case ===")
|
||||
def tableName = "tbl_ingest_binlog_index_v1"
|
||||
run_test.call(create_table_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2
|
||||
logger.info("=== Test 2: Inverted index format v2 case ===")
|
||||
tableName = "tbl_ingest_binlog_index_v2"
|
||||
run_test.call(create_table_v2(tableName), tableName)
|
||||
|
||||
// inverted index format v1 with mow
|
||||
logger.info("=== Test 3: Inverted index format v1 with mow case ===")
|
||||
tableName = "tbl_ingest_binlog_index_mow_v1"
|
||||
run_test.call(create_table_mow_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2 with mow
|
||||
logger.info("=== Test 4: Inverted index format v2 with mow case ===")
|
||||
tableName = "tbl_ingest_binlog_index_mow_v2"
|
||||
run_test.call(create_table_mow_v2(tableName), tableName)
|
||||
|
||||
}
|
||||
@ -0,0 +1,180 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_multi_buckets_index") {
|
||||
|
||||
def syncer = getSyncer()
|
||||
if (!syncer.checkEnableFeatureBinlog()) {
|
||||
logger.info("fe enable_feature_binlog is false, skip case test_multi_buckets_index")
|
||||
return
|
||||
}
|
||||
def insert_data = { tableName ->
|
||||
[
|
||||
"""INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
|
||||
]
|
||||
}
|
||||
|
||||
def sqls = { tableName ->
|
||||
[
|
||||
""" select * from ${tableName} order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where score < 100 order by id, name, hobbies, score """
|
||||
]
|
||||
}
|
||||
|
||||
def run_sql = { tableName ->
|
||||
sqls(tableName).each { sqlStatement ->
|
||||
def target_res = target_sql sqlStatement
|
||||
def res = sql sqlStatement
|
||||
assertEquals(res, target_res)
|
||||
}
|
||||
}
|
||||
|
||||
def create_table_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 3
|
||||
PROPERTIES ( "replication_num" = "1");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 3
|
||||
PROPERTIES ( "replication_num" = "1", "inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 3
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 3
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def run_test = { create_table, tableName ->
|
||||
sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
sql create_table
|
||||
sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""
|
||||
|
||||
target_sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
target_sql create_table
|
||||
assertTrue(syncer.getTargetMeta("${tableName}"))
|
||||
|
||||
insert_data(tableName).each { sqlStatement ->
|
||||
sql sqlStatement
|
||||
assertTrue(syncer.getBinlog("${tableName}"))
|
||||
assertTrue(syncer.beginTxn("${tableName}"))
|
||||
assertTrue(syncer.getBackendClients())
|
||||
assertTrue(syncer.ingestBinlog())
|
||||
assertTrue(syncer.commitTxn())
|
||||
assertTrue(syncer.checkTargetVersion())
|
||||
syncer.closeBackendClients()
|
||||
}
|
||||
|
||||
target_sql " sync "
|
||||
def res = target_sql """SELECT * FROM ${tableName}"""
|
||||
if (tableName.contains("mow")) {
|
||||
assertEquals(res.size(), insert_data(tableName).size() / 2 as Integer)
|
||||
} else {
|
||||
assertEquals(res.size(), insert_data(tableName).size())
|
||||
}
|
||||
run_sql(tableName)
|
||||
}
|
||||
|
||||
// inverted index format v1
|
||||
logger.info("=== Test 1: Inverted index format v1 case ===")
|
||||
def tableName = "tbl_multi_buckets_index_v1"
|
||||
run_test.call(create_table_v1(tableName), tableName)
|
||||
// inverted index format v2
|
||||
logger.info("=== Test 2: Inverted index format v2 case ===")
|
||||
tableName = "tbl_multi_buckets_index_v2"
|
||||
run_test.call(create_table_v2(tableName), tableName)
|
||||
|
||||
// inverted index format v1 with mow
|
||||
logger.info("=== Test 3: Inverted index format v1 with mow case ===")
|
||||
tableName = "tbl_multi_buckets_index_mow_v1"
|
||||
run_test.call(create_table_mow_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2 with mow
|
||||
logger.info("=== Test 4: Inverted index format v2 with mow case ===")
|
||||
tableName = "tbl_multi_buckets_index_mow_v2"
|
||||
run_test.call(create_table_mow_v2(tableName), tableName)
|
||||
}
|
||||
@ -0,0 +1,196 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_backup_restore_index") {
|
||||
|
||||
def syncer = getSyncer()
|
||||
if (!syncer.checkEnableFeatureBinlog()) {
|
||||
logger.info("fe enable_feature_binlog is false, skip case test_backup_restore")
|
||||
return
|
||||
}
|
||||
|
||||
def insert_data = { tableName ->
|
||||
[
|
||||
"""INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (2, "bason", "bason hate pear", 99);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "andy", "andy love apple", 100);""",
|
||||
"""INSERT INTO ${tableName} VALUES (3, "bason", "bason hate pear", 99);"""
|
||||
]
|
||||
}
|
||||
|
||||
def sqls = { tableName ->
|
||||
[
|
||||
""" select * from ${tableName} order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where name match "andy" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where hobbies match "pear" order by id, name, hobbies, score """,
|
||||
""" select * from ${tableName} where score < 100 order by id, name, hobbies, score """
|
||||
]
|
||||
}
|
||||
|
||||
def run_sql = { tableName ->
|
||||
sqls(tableName).each { sqlStatement ->
|
||||
def target_res = target_sql sqlStatement
|
||||
def res = sql sqlStatement
|
||||
assertEquals(res, target_res)
|
||||
}
|
||||
}
|
||||
|
||||
def create_table_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true"
|
||||
);
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"inverted_index_storage_format" = "V2"
|
||||
);
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v1 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true");
|
||||
"""
|
||||
}
|
||||
|
||||
def create_table_mow_v2 = { tableName ->
|
||||
"""
|
||||
CREATE TABLE ${tableName} (
|
||||
`id` int(11) NULL,
|
||||
`name` varchar(255) NULL,
|
||||
`hobbies` text NULL,
|
||||
`score` int(11) NULL,
|
||||
index index_name (name) using inverted,
|
||||
index index_hobbies (hobbies) using inverted properties("parser"="english"),
|
||||
index index_score (score) using inverted
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1",
|
||||
"binlog.enable" = "true",
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"inverted_index_storage_format" = "V2");
|
||||
"""
|
||||
}
|
||||
|
||||
def run_test = { create_table, tableName ->
|
||||
sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
sql create_table
|
||||
|
||||
logger.info("=== Test 1: Common backup and restore ===")
|
||||
def snapshotName = "snapshot_test_" + tableName
|
||||
insert_data(tableName).each { sqlStatement ->
|
||||
sql sqlStatement
|
||||
}
|
||||
sql " sync "
|
||||
def res = sql "SELECT * FROM ${tableName}"
|
||||
if (tableName.contains("mow")) {
|
||||
assertEquals(res.size(), insert_data(tableName).size() / 2 as Integer)
|
||||
} else {
|
||||
assertEquals(res.size(), insert_data(tableName).size())
|
||||
}
|
||||
|
||||
sql """
|
||||
BACKUP SNAPSHOT ${context.dbName}.${snapshotName}
|
||||
TO `__keep_on_local__`
|
||||
ON (${tableName})
|
||||
PROPERTIES ("type" = "full")
|
||||
"""
|
||||
syncer.waitSnapshotFinish()
|
||||
assertTrue(syncer.getSnapshot("${snapshotName}", "${tableName}"))
|
||||
assertTrue(syncer.restoreSnapshot(true))
|
||||
syncer.waitTargetRestoreFinish()
|
||||
target_sql " sync "
|
||||
res = target_sql "SELECT * FROM ${tableName}"
|
||||
if (tableName.contains("mow")) {
|
||||
assertEquals(res.size(), insert_data(tableName).size() / 2 as Integer)
|
||||
} else {
|
||||
assertEquals(res.size(), insert_data(tableName).size())
|
||||
}
|
||||
run_sql(tableName)
|
||||
}
|
||||
|
||||
// inverted index format v1
|
||||
logger.info("=== Test 1: Inverted index format v1 case ===")
|
||||
def tableName = "tbl_backup_restore_index_v1"
|
||||
run_test.call(create_table_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2
|
||||
logger.info("=== Test 2: Inverted index format v2 case ===")
|
||||
tableName = "tbl_backup_restore_index_v2"
|
||||
run_test.call(create_table_v2(tableName), tableName)
|
||||
|
||||
// inverted index format v1 with mow
|
||||
logger.info("=== Test 3: Inverted index format v1 with mow case ===")
|
||||
tableName = "tbl_backup_restore_index_mow_v1"
|
||||
run_test.call(create_table_mow_v1(tableName), tableName)
|
||||
|
||||
// inverted index format v2 with mow
|
||||
logger.info("=== Test 4: Inverted index format v2 with mow case ===")
|
||||
tableName = "tbl_backup_restore_index_mow_v2"
|
||||
run_test.call(create_table_mow_v2(tableName), tableName)
|
||||
}
|
||||
Reference in New Issue
Block a user