From 17cf4ab2c1333ef75c0631910ae87ee70a280b74 Mon Sep 17 00:00:00 2001 From: HowardQin Date: Sun, 7 Jan 2024 19:50:16 +0800 Subject: [PATCH] [case](regression) streamload publish timeout (#29457) Co-authored-by: qinhao --- .../stream_load/test_publish_timeout.out | 4 + .../stream_load/test_publish_timeout.groovy | 111 ++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 regression-test/data/load_p0/stream_load/test_publish_timeout.out create mode 100644 regression-test/suites/load_p0/stream_load/test_publish_timeout.groovy diff --git a/regression-test/data/load_p0/stream_load/test_publish_timeout.out b/regression-test/data/load_p0/stream_load/test_publish_timeout.out new file mode 100644 index 0000000000..1db8882f88 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_publish_timeout.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +2500 + diff --git a/regression-test/suites/load_p0/stream_load/test_publish_timeout.groovy b/regression-test/suites/load_p0/stream_load/test_publish_timeout.groovy new file mode 100644 index 0000000000..c585382961 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_publish_timeout.groovy @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_publish_timeout", "nonConcurrent") { + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def get_be_param = { paramName -> + // assuming paramName on all BEs have save value + String backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == paramName) { + return ((List) ele)[2] + } + } + } + + def set_be_param = { paramName, paramValue -> + // for eache BE node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + String saved_txn_commit_rpc_timeout_ms = get_be_param.call("txn_commit_rpc_timeout_ms") + + log.info("Old txn_commit_rpc_timeout_ms value is : ${saved_txn_commit_rpc_timeout_ms}".toString()) + + // Setting txn_commit_rpc_timeout_ms =< 0 will trigger publish timeout + set_be_param.call("txn_commit_rpc_timeout_ms", "0") + + log.info("New txn_commit_rpc_timeout_ms value is : 0".toString()) + + def testTable = "all_types" + try { + sql "DROP TABLE IF EXISTS ${testTable}" + sql """ + CREATE TABLE IF NOT EXISTS ${testTable} ( + `k1` int(11) NULL, + `k2` tinyint(4) NULL, + `k3` smallint(6) NULL, + `k4` bigint(20) NULL, + `k5` largeint(40) NULL, + `k6` float NULL, + `k7` double NULL, + `k8` decimal(9, 0) NULL, + `k9` char(10) NULL, + `k10` varchar(1024) NULL, + `k11` text NULL, + `k12` date NULL, + `k13` datetime NULL + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + streamLoad { + table "${testTable}" + // set http request header params + set 'label', "test_publish_timeout_" + UUID.randomUUID().toString() + set 'format', 'csv' + set 'column_separator', ',' + file 'all_types.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertTrue(json.Message.contains("PUBLISH_TIMEOUT")) + assertEquals(2500, json.NumberTotalRows) + assertEquals(2500, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + } + } + + qt_sql "SELECT COUNT(*) FROM ${testTable}" + } finally { + sql "DROP TABLE IF EXISTS ${testTable}" + } + set_be_param.call("txn_commit_rpc_timeout_ms", saved_txn_commit_rpc_timeout_ms) + log.info("Set txn_commit_rpc_timeout_ms value to : ${saved_txn_commit_rpc_timeout_ms}".toString()) +}