[FIX](Map) fix map compaction error (#17795)
During compaction, map offsets arriving at the same OLAP convertor were always converted as a fresh range [0, size), but within one segment writer the offsets must continue across pages. For example, if the previous block produced map offsets [3, 6, 8, ..., 100] and the current block's in-memory offsets are [5, 10, 15, ..., 100], the shared convertor must remember the last written offset so the incoming offsets follow it: after conversion the current offsets become [105, 110, 115, ..., 200]. The column writer can then simply call append_data() to append pages with correct, continuous offset data.
This commit is contained in:
@ -893,13 +893,15 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
|
||||
|
||||
// offsets data
|
||||
auto& offsets = column_map->get_offsets();
|
||||
// make first offset
|
||||
auto offsets_col = ColumnArray::ColumnOffsets::create();
|
||||
|
||||
// Now map column offsets data layout in memory is [3, 6, 9], and in disk should be [0, 3, 6, 9]
|
||||
_offsets.clear();
|
||||
_offsets.reserve(offsets.size() + 1);
|
||||
_offsets.push_back(_row_pos); // _offsets start with current map offsets
|
||||
_offsets.insert_assume_reserved(offsets.begin(), offsets.end());
|
||||
_offsets.push_back(
|
||||
_base_row); // _offsets must start with current map offsets which coming blocks in continue pages
|
||||
for (auto it = offsets.begin(); it < offsets.end(); ++it) {
|
||||
_offsets.push_back(*it + _base_row);
|
||||
}
|
||||
|
||||
int64_t start_index = _row_pos - 1;
|
||||
int64_t end_index = _row_pos + _num_rows - 1;
|
||||
@ -923,6 +925,7 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
|
||||
_results[4] = _key_convertor->get_nullmap();
|
||||
_results[5] = _value_convertor->get_nullmap();
|
||||
|
||||
_base_row += size;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -413,6 +413,7 @@ private:
|
||||
OlapColumnDataConvertorBaseUPtr value_convertor)
|
||||
: _key_convertor(std::move(key_convertor)),
|
||||
_value_convertor(std::move(value_convertor)) {
|
||||
_base_row = 0;
|
||||
_results.resize(6); // size + offset + k_data + v_data + k_nullmap + v_nullmap
|
||||
}
|
||||
|
||||
@ -428,6 +429,7 @@ private:
|
||||
OlapColumnDataConvertorBaseUPtr _value_convertor;
|
||||
std::vector<const void*> _results;
|
||||
PaddedPODArray<UInt64> _offsets;
|
||||
UInt64 _base_row;
|
||||
}; //OlapColumnDataConvertorMap
|
||||
|
||||
private:
|
||||
|
||||
@ -612,12 +612,9 @@ public abstract class Type {
|
||||
return ScalarType.isImplicitlyCastable((ScalarType) t1, (ScalarType) t2, strict);
|
||||
}
|
||||
if (t1.isComplexType() || t2.isComplexType()) {
|
||||
if (t1.isArrayType() && t2.isArrayType()) {
|
||||
if ((t1.isArrayType() && t2.isArrayType()) || (t1.isMapType() && t2.isMapType())
|
||||
|| (t1.isStructType() && t2.isStructType())) {
|
||||
return t1.matchesType(t2);
|
||||
} else if (t1.isMapType() && t2.isMapType()) {
|
||||
return true;
|
||||
} else if (t1.isStructType() && t2.isStructType()) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
2
regression-test/data/load_p0/stream_load/map_2_rows.json
Normal file
2
regression-test/data/load_p0/stream_load/map_2_rows.json
Normal file
@ -0,0 +1,2 @@
|
||||
{"id": 2, "actor": {"login": "azure-sdk", "url": "https://api.github.com/users/azure-sdk", "avatar_url": "https://avatars.githubusercontent.com/u/53356347?", "time": "2022-12-22T00:00:00Z"}}
|
||||
{"id": 26, "actor": {"login": "joshburt", "url": "https://api.github.com/users/joshburt", "avatar_url": "https://avatars.githubusercontent.com/u/5835886?", "repo.name": "shapeandshare/anaconda.enterprise.k8s.sdk", "time": "2022-12-22T00:00:00Z"}}
|
||||
4063
regression-test/data/load_p0/stream_load/map_4093_rows.json
Normal file
4063
regression-test/data/load_p0/stream_load/map_4093_rows.json
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,4 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !select --
|
||||
20317
|
||||
|
||||
@ -0,0 +1,175 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import org.codehaus.groovy.runtime.IOGroovyMethods
|
||||
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
|
||||
suite("test_map_load_and_compaction", "p0") {
|
||||
// define a sql table
|
||||
def testTable = "tbl_test_map"
|
||||
def dataFile = "map_2_rows.json";
|
||||
def dataFile1 = "map_4093_rows.json"
|
||||
|
||||
sql "DROP TABLE IF EXISTS ${testTable}"
|
||||
sql "ADMIN SET FRONTEND CONFIG ('enable_map_type' = 'true')"
|
||||
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${testTable} (
|
||||
id BIGINT,
|
||||
actor Map<STRING, STRING>
|
||||
)
|
||||
DUPLICATE KEY(id)
|
||||
DISTRIBUTED BY HASH(id) BUCKETS 10
|
||||
PROPERTIES("replication_num" = "1", "disable_auto_compaction" = "true");
|
||||
"""
|
||||
def streamLoadJson = {assertLoadNum, fileName->
|
||||
// load the json data
|
||||
streamLoad {
|
||||
table testTable
|
||||
|
||||
// set http request header params
|
||||
set 'read_json_by_line', 'true'
|
||||
set 'format', 'json'
|
||||
file fileName // import json file
|
||||
time 10000 // limit inflight 10s
|
||||
|
||||
// if declared a check callback, the default check condition will ignore.
|
||||
// So you must check all condition
|
||||
check { result, exception, startTime, endTime ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertEquals("OK", json.Message)
|
||||
log.info("expect ", assertLoadNum)
|
||||
log.info("now: ", json.NumberTotalRows)
|
||||
assertTrue(assertLoadNum==json.NumberTotalRows)
|
||||
assertTrue(json.LoadBytes > 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def checkCompactionStatus = {compactionStatus, assertRowSetNum->
|
||||
String checkStatus = new String("curl -X GET "+compactionStatus)
|
||||
process = checkStatus.execute()
|
||||
code = process.waitFor()
|
||||
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
|
||||
out = process.getText()
|
||||
logger.info("Check compaction status: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
def compactStatusJson = parseJson(out.trim())
|
||||
assert compactStatusJson.rowsets instanceof List
|
||||
int rowsetsCount = 0
|
||||
for (String rowset in (List<String>) compactStatusJson.rowsets) {
|
||||
rowsetsCount += Integer.parseInt(rowset.split(" ")[1])
|
||||
}
|
||||
assertTrue(assertRowSetNum==rowsetsCount)
|
||||
}
|
||||
|
||||
|
||||
// try trigger compaction manually and then check backends is still alive
|
||||
try {
|
||||
// load the map data from json file
|
||||
streamLoadJson.call(2, dataFile)
|
||||
|
||||
for (int i = 0; i < 5; ++i) {
|
||||
streamLoadJson.call(4063, dataFile1)
|
||||
}
|
||||
|
||||
// check result
|
||||
qt_select "SELECT count(*) FROM ${testTable};"
|
||||
|
||||
// check here 2 rowsets
|
||||
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
|
||||
String[][] tablets = sql """ show tablets from ${testTable}; """
|
||||
String[] tablet = tablets[0]
|
||||
// check rowsets number
|
||||
String compactionStatus = tablet[17]
|
||||
checkCompactionStatus.call(compactionStatus, 6)
|
||||
|
||||
// trigger compaction
|
||||
String[][] backends = sql """ show backends; """
|
||||
assertTrue(backends.size() > 0)
|
||||
String backend_id;
|
||||
def backendId_to_backendIP = [:]
|
||||
def backendId_to_backendHttpPort = [:]
|
||||
for (String[] backend in backends) {
|
||||
backendId_to_backendIP.put(backend[0], backend[2])
|
||||
backendId_to_backendHttpPort.put(backend[0], backend[5])
|
||||
}
|
||||
String tablet_id = tablet[0]
|
||||
backend_id = tablet[2]
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("curl -X POST http://")
|
||||
sb.append(backendId_to_backendIP.get(backend_id))
|
||||
sb.append(":")
|
||||
sb.append(backendId_to_backendHttpPort.get(backend_id))
|
||||
sb.append("/api/compaction/run?tablet_id=")
|
||||
sb.append(tablet_id)
|
||||
sb.append("&compact_type=cumulative")
|
||||
|
||||
String command = sb.toString()
|
||||
process = command.execute()
|
||||
code = process.waitFor()
|
||||
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
|
||||
out = process.getText()
|
||||
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
def compactJson = parseJson(out.trim())
|
||||
assertEquals("success", compactJson.status.toLowerCase())
|
||||
|
||||
// wait compactions done
|
||||
do {
|
||||
Thread.sleep(1000)
|
||||
sb = new StringBuilder();
|
||||
sb.append("curl -X GET http://")
|
||||
sb.append(backendId_to_backendIP.get(backend_id))
|
||||
sb.append(":")
|
||||
sb.append(backendId_to_backendHttpPort.get(backend_id))
|
||||
sb.append("/api/compaction/run_status?tablet_id=")
|
||||
sb.append(tablet_id)
|
||||
|
||||
String cm = sb.toString()
|
||||
logger.info(cm)
|
||||
process = cm.execute()
|
||||
code = process.waitFor()
|
||||
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
|
||||
out = process.getText()
|
||||
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
def cs = parseJson(out.trim())
|
||||
assertEquals("success", cs.status.toLowerCase())
|
||||
running = cs.run_status
|
||||
} while (running)
|
||||
|
||||
checkCompactionStatus.call(compactionStatus, 1)
|
||||
|
||||
// finally check backend alive
|
||||
backends = sql """ show backends; """
|
||||
assertTrue(backends.size() > 0)
|
||||
for (String[] b : backends) {
|
||||
assertEquals("true", b[9])
|
||||
}
|
||||
} finally {
|
||||
try_sql("DROP TABLE IF EXISTS ${testTable}")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user