From 7ba66c58909101e3af18646fcd5e751784f9fa97 Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Sat, 11 May 2024 11:01:18 +0800 Subject: [PATCH] [branch-2.1](routine-load) do not schedule task when there is no data (#34654) --- be/src/runtime/routine_load/data_consumer.cpp | 45 ++++ be/src/runtime/routine_load/data_consumer.h | 3 + .../routine_load_task_executor.cpp | 23 ++ .../routine_load/routine_load_task_executor.h | 4 + be/src/service/internal_service.cpp | 18 +- .../doris/datasource/kafka/KafkaUtil.java | 76 ++++++ .../load/routineload/KafkaRoutineLoadJob.java | 37 ++- .../routineload/KafkaRoutineLoadJobTest.java | 15 ++ .../load/routineload/RoutineLoadJobTest.java | 15 ++ gensrc/proto/internal_service.proto | 2 + .../test_routine_load_schedule.out | 23 ++ .../routine_load/data/test_schedule.csv | 20 ++ .../test_routine_load_schedule.groovy | 222 ++++++++++++++++++ 13 files changed, 489 insertions(+), 14 deletions(-) create mode 100644 regression-test/data/load_p0/routine_load/test_routine_load_schedule.out create mode 100644 regression-test/suites/load_p0/routine_load/data/test_schedule.csv create mode 100644 regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index ccf5fb4cb2..9c40e85e28 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -415,6 +415,51 @@ Status KafkaDataConsumer::get_latest_offsets_for_partitions( return Status::OK(); } +Status KafkaDataConsumer::get_real_offsets_for_partitions( + const std::vector& offset_flags, std::vector* offsets, + int timeout) { + MonotonicStopWatch watch; + watch.start(); + for (const auto& entry : offset_flags) { + PIntegerPair pair; + if (UNLIKELY(entry.val() >= 0)) { + pair.set_key(entry.key()); + pair.set_val(entry.val()); + offsets->push_back(std::move(pair)); 
+ continue; + } + + int64_t low = 0; + int64_t high = 0; + auto timeout_ms = timeout - static_cast(watch.elapsed_time() / 1000 / 1000); + if (UNLIKELY(timeout_ms <= 0)) { + return Status::InternalError("get kafka real offsets for partitions timeout"); + } + + RdKafka::ErrorCode err = + _k_consumer->query_watermark_offsets(_topic, entry.key(), &low, &high, timeout_ms); + if (UNLIKELY(err != RdKafka::ERR_NO_ERROR)) { + std::stringstream ss; + ss << "failed to get real offset for partition: " << entry.key() + << ", err: " << RdKafka::err2str(err); + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } + + pair.set_key(entry.key()); + if (entry.val() == -1) { + // OFFSET_END_VAL = -1 + pair.set_val(high); + } else if (entry.val() == -2) { + // OFFSET_BEGINNING_VAL = -2 + pair.set_val(low); + } + offsets->push_back(std::move(pair)); + } + + return Status::OK(); +} + Status KafkaDataConsumer::cancel(std::shared_ptr ctx) { std::unique_lock l(_lock); if (!_init) { diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index f6c1046778..7ae30b5172 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -155,6 +155,9 @@ public: // get latest offsets for partitions Status get_latest_offsets_for_partitions(const std::vector& partition_ids, std::vector* offsets, int timeout); + // get real offsets for partitions whose value is an OFFSET_BEGINNING/OFFSET_END flag + Status get_real_offsets_for_partitions(const std::vector& offset_flags, + std::vector* offsets, int timeout); private: std::string _brokers; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 7044155254..0534531aed 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -174,6 +174,29 @@ Status RoutineLoadTaskExecutor::get_kafka_latest_offsets_for_partitions( return st; } +Status 
RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions( + const PKafkaMetaProxyRequest& request, std::vector* partition_offsets, + int timeout) { + CHECK(request.has_kafka_info()); + + // This context is meaningless, just for unifying the interface + std::shared_ptr ctx = std::make_shared(_exec_env); + RETURN_IF_ERROR(_prepare_ctx(request, ctx)); + + std::shared_ptr consumer; + RETURN_IF_ERROR(_data_consumer_pool.get_consumer(ctx, &consumer)); + + Status st = + std::static_pointer_cast(consumer)->get_real_offsets_for_partitions( + std::vector(request.offset_flags().begin(), + request.offset_flags().end()), + partition_offsets, timeout); + if (st.ok()) { + _data_consumer_pool.return_consumer(consumer); + } + return st; +} + Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { std::unique_lock l(_lock); if (_task_map.find(task.id) != _task_map.end()) { diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index e75c8eb881..8f8a6f2d65 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -67,6 +67,10 @@ public: std::vector* partition_offsets, int timeout); + + Status get_kafka_real_offsets_for_partitions(const PKafkaMetaProxyRequest& request, + std::vector* partition_offsets, + int timeout); + private: // execute the task void exec_task(std::shared_ptr ctx, DataConsumerPool* pool, diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 30b948f1d2..6abc972634 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1176,7 +1176,23 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, int timeout_ms = request->has_timeout_secs() ? 
request->timeout_secs() * 1000 : 5 * 1000; if (request->has_kafka_meta_request()) { const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request(); - if (!kafka_request.partition_id_for_latest_offsets().empty()) { + if (!kafka_request.offset_flags().empty()) { + std::vector partition_offsets; + Status st = _exec_env->routine_load_task_executor() + ->get_kafka_real_offsets_for_partitions( + request->kafka_meta_request(), &partition_offsets, + timeout_ms); + if (st.ok()) { + PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets(); + for (const auto& entry : partition_offsets) { + PIntegerPair* res = part_offsets->add_offset_times(); + res->set_key(entry.key()); + res->set_val(entry.val()); + } + } + st.to_protobuf(response->mutable_status()); + return; + } else if (!kafka_request.partition_id_for_latest_offsets().empty()) { // get latest offsets for specified partition ids std::vector partition_offsets; Status st = _exec_env->routine_load_task_executor() diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java index d2184229bb..00169f4c8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java @@ -31,6 +31,7 @@ import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -217,4 +218,79 @@ public class KafkaUtil { "Failed to get latest offsets of kafka topic: " + topic + ". 
error: " + e.getMessage()); } } + + public static List> getRealOffsets(String brokerList, String topic, + Map convertedCustomProperties, + List> offsets) + throws LoadException { + // filter values greater than 0 as these offsets is real offset + // only update offset like OFFSET_BEGINNING or OFFSET_END + List> offsetFlags = new ArrayList<>(); + List> realOffsets = new ArrayList<>(); + for (Pair pair : offsets) { + if (pair.second < 0) { + offsetFlags.add(pair); + } else { + realOffsets.add(pair); + } + } + if (offsetFlags.size() == 0) { + LOG.info("do not need update and directly return offsets for partitions {} in topic: {}", offsets, topic); + return offsets; + } + + TNetworkAddress address = null; + try { + List backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + if (backendIds.isEmpty()) { + throw new LoadException("Failed to get real offsets. No alive backends"); + } + Collections.shuffle(backendIds); + Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); + address = new TNetworkAddress(be.getHost(), be.getBrpcPort()); + + // create request + InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder = + InternalService.PKafkaMetaProxyRequest.newBuilder() + .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder() + .setBrokers(brokerList) + .setTopic(topic) + .addAllProperties( + convertedCustomProperties.entrySet().stream().map( + e -> InternalService.PStringPair.newBuilder() + .setKey(e.getKey()) + .setVal(e.getValue()) + .build() + ).collect(Collectors.toList()) + ) + ); + for (Pair pair : offsetFlags) { + metaRequestBuilder.addOffsetFlags(InternalService.PIntegerPair.newBuilder().setKey(pair.first) + .setVal(pair.second).build()); + } + InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest( + metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build(); + + // get info + Future future = BackendServiceProxy.getInstance().getInfo(address, request); + 
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new UserException("failed to get real offsets: " + result.getStatus().getErrorMsgsList()); + } else { + List pairs = result.getPartitionOffsets().getOffsetTimesList(); + List> partitionOffsets = Lists.newArrayList(); + for (InternalService.PIntegerPair pair : pairs) { + partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal())); + } + realOffsets.addAll(partitionOffsets); + LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic); + return realOffsets; + } + } catch (Exception e) { + LOG.warn("failed to get real offsets.", e); + throw new LoadException( + "Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage()); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index c00f16b7d8..43ae98d8f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -512,17 +512,20 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { for (Integer kafkaPartition : newPartitions) { partitionOffsets.add(Pair.of(kafkaPartition, beginOffset)); } - if (isOffsetForTimes()) { - try { + try { + if (isOffsetForTimes()) { partitionOffsets = KafkaUtil.getOffsetsForTimes(this.brokerList, this.topic, convertedCustomProperties, partitionOffsets); - } catch (LoadException e) { - LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("partition:timestamp", Joiner.on(",").join(partitionOffsets)) - .add("error_msg", "Job failed to fetch current offsets from times with error " + e.getMessage()) - .build(), e); - throw new UserException(e); + } 
else { + partitionOffsets = KafkaUtil.getRealOffsets(this.brokerList, + this.topic, convertedCustomProperties, partitionOffsets); } + } catch (LoadException e) { + LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("partition:", Joiner.on(",").join(partitionOffsets)) + .add("error_msg", "Job failed to fetch current offsets with error " + e.getMessage()) + .build(), e); + throw new UserException(e); } return partitionOffsets; } @@ -552,6 +555,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(), kafkaDataSourceProperties.getTopic(), convertedCustomProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets()); + } else { + kafkaPartitionOffsets = KafkaUtil.getRealOffsets(kafkaDataSourceProperties.getBrokerList(), + kafkaDataSourceProperties.getTopic(), + convertedCustomProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets()); } for (Pair partitionOffset : kafkaPartitionOffsets) { @@ -638,9 +645,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException { Map jobProperties = stmt.getAnalyzedJobProperties(); KafkaDataSourceProperties dataSourceProperties = (KafkaDataSourceProperties) stmt.getDataSourceProperties(); - if (null != dataSourceProperties && dataSourceProperties.isOffsetsForTimes()) { + if (null != dataSourceProperties) { // if the partition offset is set by timestamp, convert it to real offset - convertTimestampToOffset(dataSourceProperties); + convertOffset(dataSourceProperties); } writeLock(); @@ -659,13 +666,17 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { } } - private void convertTimestampToOffset(KafkaDataSourceProperties dataSourceProperties) throws UserException { + private void convertOffset(KafkaDataSourceProperties dataSourceProperties) throws UserException { List> partitionOffsets = 
dataSourceProperties.getKafkaPartitionOffsets(); if (partitionOffsets.isEmpty()) { return; } - List> newOffsets = KafkaUtil.getOffsetsForTimes(brokerList, topic, - convertedCustomProperties, partitionOffsets); + List> newOffsets; + if (dataSourceProperties.isOffsetsForTimes()) { + newOffsets = KafkaUtil.getOffsetsForTimes(brokerList, topic, convertedCustomProperties, partitionOffsets); + } else { + newOffsets = KafkaUtil.getRealOffsets(brokerList, topic, convertedCustomProperties, partitionOffsets); + } dataSourceProperties.setKafkaPartitionOffsets(newOffsets); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index ba3c9ee626..d188838ad3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; @@ -287,6 +288,20 @@ public class KafkaRoutineLoadJobTest { } }; + new MockUp() { + @Mock + public List> getRealOffsets(String brokerList, String topic, + Map convertedCustomProperties, + List> offsetFlags) + throws LoadException { + List> pairList = new ArrayList<>(); + pairList.add(Pair.of(1, 0L)); + pairList.add(Pair.of(2, 0L)); + pairList.add(Pair.of(3, 0L)); + return pairList; + } + }; + KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName()); Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId()); diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index 8d90395745..d9494374c0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -24,6 +24,8 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; import org.apache.doris.common.InternalErrorCode; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; @@ -46,6 +48,7 @@ import org.apache.kafka.common.PartitionInfo; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -274,6 +277,18 @@ public class RoutineLoadJobTest { } }; + new MockUp() { + @Mock + public List> getRealOffsets(String brokerList, String topic, + Map convertedCustomProperties, + List> offsetFlags) + throws LoadException { + List> pairList = new ArrayList<>(); + pairList.add(Pair.of(1, 0L)); + return pairList; + } + }; + RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING); Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8e0608fd73..12b1b6b1ed 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -391,6 +391,8 @@ message PKafkaMetaProxyRequest { repeated PIntegerPair offset_times = 3; // optional for getting latest offsets of partitons repeated int32 partition_id_for_latest_offsets = 4; + // optional for getting real offset for end/beginning flag 
+ repeated PIntegerPair offset_flags = 5; }; message PProxyRequest { diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out b/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out new file mode 100644 index 0000000000..58613e1bdf --- /dev/null +++ b/regression-test/data/load_p0/routine_load/test_routine_load_schedule.out @@ -0,0 +1,23 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +8 2023-08-14 true 109 -31573 -1362465190 3990845741226497177 2732763251146840270 -25698.553 1.312831962567818E9 99999999.9 99999999.9 2023-03-07T14:13:19 2022-10-18 2023-07-16T05:03:13 D PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme {"animal":"lion","weight":200,"habitat":["savannah","grassland"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +20 2023-08-17 false -5 18158 784479801 1485484354598941738 -6632681928222776815 9708.431 -3.30432620706069E8 -99999999.9 99999999.9 2022-09-15T21:40:55 2023-02-23 2023-08-13T21:31:54 O X 2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn {"name":"Sarah","age":30,"city":"London","isMarried":false} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +21 2023-08-18 false 63 -27847 -35409596 8638201997392767650 4919963231735304178 -23382.541 -1.803403621426313E9 -22009767.0 99999999.9 2023-03-31T10:56:14 2023-01-20 2023-02-18T13:37:52 N T PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi {"city":"Sydney","population":5312000,"area":2058.7} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +31 2023-08-27 false 17 -18849 1728109133 3266501886640700374 
527195452623418935 -24062.328 -1.514348021262435E9 -99999999.9 -99999999.9 2022-10-07T03:24:23 2022-09-25 \N 0 8 yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4 {"team":"Manchester United","players":["Ronaldo","Rooney","Giggs"],"coach":"Ole Gunnar Solskjaer"} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +41 2023-08-27 true -104 22750 \N 8527773271030840740 5554497317268279215 -5296.828 -1.71564688801304E9 -99999999.9 99999999.9 2022-12-02T17:56:44 2022-10-12 2023-02-19T07:02:54 V \N E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V {"food":"Sushi","price":10,"restaurant":"Sushi King"} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +49 2023-08-08 false \N 16275 -2144851675 -2303421957908954634 -46526938720058765 -13141.143 -6.866322332302E8 99999999.9 -99999999.9 2022-09-01T00:16:01 2023-03-25 2022-09-07T14:59:03 s yvuILR2iNxfe8RRml {"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +50 2023-08-06 true 109 -6330 1479023892 -8630800697573159428 -1645095773540208759 17880.96 -1.453844792013949E9 -99999999.9 -99999999.9 2022-09-22T02:03:21 2023-05-14 2023-03-25T02:18:34 m JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl {"city":"Tokyo","temperature":20.5,"humidity":75} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +50 2023-08-24 true 15 14403 \N -6418906115745394180 9205303779366462513 -4331.549 -6.15112179557648E8 99999999.9 -99999999.9 2022-12-29T02:27:20 2023-06-01 2023-08-12T04:50:04 a 
eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM {"band":"The Beatles","members":["John Lennon","Paul McCartney","George Harrison","Ringo Starr"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +57 2023-08-19 true 2 -25462 -74112029 6458082754318544493 -7910671781690629051 -15205.859 -3.06870797484914E8 99999999.9 -99999999.9 2023-07-10T18:39:10 2023-02-12 2023-01-27T07:26:06 y Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ {"name":"John","age":25,"city":"New York"} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +58 2023-08-22 \N 0 -18231 1832867360 6997858407575297145 2480714305422728023 -5450.489 1.475901032138386E9 -99999999.9 -99999999.9 2023-02-02T05:13:24 2022-09-18 2023-04-23T10:51:15 k LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja {"restaurant":"Pizza Hut","menu":["pizza","pasta","salad"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +60 2023-08-27 false -52 -2338 -757056972 1047567408607120856 6541476642780646552 6614.0894 -1.204448798517855E9 99999999.9 99999999.9 2022-12-29T14:47:30 2022-09-24 2023-08-01T12:41:59 O F RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU {"game":"Chess","players":2,"time":"1 hour"} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +62 2023-08-21 false 81 20302 -200761532 6365479976421007608 \N -29916.533 1.709141750828478E9 99999999.9 -99999999.9 2023-05-04T01:14:51 2022-09-17 2022-12-04T19:30:09 d v BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD {"username":"user123","password":"pass123","email":"user123@example.com"} 
true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +65 2023-08-09 false 94 31514 814994517 -297697460695940343 734910652450318597 -13061.892 6.2750847041706E7 -9808654.0 \N 2023-08-14T22:01:27 2023-05-19 2022-11-13T13:44:28 V aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf {"language":"Python","version":3.9,"frameworks":["Django","Flask"]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +66 2023-08-15 true -91 28378 609923317 4872185586197131212 1207709464099378591 \N -1.863683325985123E9 -99999999.9 -99999999.9 2022-09-24T10:39:23 2022-09-24 2022-10-16T18:36:43 Y z AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6 {"book":{"title":"The Great Gatsby","author":"F. Scott Fitzgerald"},"year":1925} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +68 2023-08-23 true -73 20117 1737338128 795638676048937749 -5551546237562433901 -30627.04 6.8589475684545E7 99999999.9 99999999.9 2022-12-28T20:26:51 2022-10-04 2023-07-30T00:20:06 y keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM {"country":"Brazil","continent":"South America","population":211049527} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +80 2023-08-18 false -18 -8971 679027874 6535956962935330265 3960889045799757165 -13219.76 1.187161924505394E9 -99999999.9 -99999999.9 2023-03-11T07:40 2022-11-29 2023-01-14T07:24:07 \N D 3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve 
{"car":"BMW","model":"X5","year":2020,"color":"black"} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +81 2023-08-23 false 106 11492 -667795397 4480250461471356146 -5346660566234294101 9082.75 3.85167225902608E8 -99999999.9 99999999.9 2023-03-20T03:33:16 2022-11-24 2023-02-16T18:29:41 G 9 Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag {"flower":"rose","color":"red","fragrance":true} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +85 2023-08-11 true -7 24304 -2043877415 -2024144417867729183 \N 5363.0244 -5.78615669042831E8 -99999999.9 -99999999.9 2023-07-15T01:07:41 2023-08-13 2023-01-20T11:57:48 i WQ9dh9ajPu0y {"country":"France","capital":"Paris","population":67081000} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +90 2023-08-27 true 22 16456 -1476824962 -3279894870153540825 8990195191470116763 26651.906 2.06860148942546E8 -99999999.9 -99999999.9 2022-10-07T03:11:03 2023-03-18 2023-04-15T00:38:33 T L QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD {"company":"Apple","products":[{"name":"iPhone","price":1000},{"name":"MacBook","price":1500}]} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N +91 2023-08-27 true 90 2465 702240964 6373830997821598984 305860046137409400 15991.356 1.599972327386147E9 -99999999.9 \N 2023-04-26T19:31:10 2023-07-21 \N 2 B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK {"fruit":"apple","color":"red","qty":5,"price":2.5} true 1 2 3 4 5 6.0 7.0 888888888 999999999 2023-08-24 2023-08-24T12:00 2023-08-24 2023-08-24T12:00 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 我能吞下玻璃而不伤身体 \N + diff --git 
a/regression-test/suites/load_p0/routine_load/data/test_schedule.csv b/regression-test/suites/load_p0/routine_load/data/test_schedule.csv new file mode 100644 index 0000000000..b58285ed57 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/data/test_schedule.csv @@ -0,0 +1,20 @@ +57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10 18:39:10|2023-02-12|2023-01-27 07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city": "New York"} +49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]} +66|2023-08-15|TRUE|-91|28378|609923317|4872185586197131212|1207709464099378591|\N|-1863683325.985123|-783792012.0|-708986976.0|2022-09-24 10:39:23|2022-09-24|2022-10-16 18:36:43|Y|z|AI1BSPQdKiHJiQH1kguyLSWsDXkC7zwy7PwgWnyGSaa9tBKRex8vHBdxg2QSKZKL2mV2lHz7iI1PnsTd4MXDcIKhqiHyPuQPt2tEtgt0UgF6|{"book": {"title": "The Great Gatsby", "author": "F. 
Scott Fitzgerald"}, "year": 1925} +91|2023-08-27|TRUE|90|2465|702240964|6373830997821598984|305860046137409400|15991.356445|1599972327.386147|-165530947.0|\N|2023-04-26 19:31:10|2023-07-21|\N|2||B7YKYBYT8w0YC926bZ8Yz1VzyiWw2NWDAiTlEoPVyz9AXGti2Npg1FxWqWk4hEaALw0ZBSuiAIPj41lq36g5QRpPmAjNPK|{"fruit": "apple", "color": "red", "qty": 5, "price": 2.5} +80|2023-08-18|FALSE|-18|-8971|679027874|6535956962935330265|3960889045799757165|-13219.759766|1187161924.505394|-526615878.0|-947410627.0|2023-03-11 07:40:00|2022-11-29|2023-01-14 07:24:07|\N|D|3Nhx6xX1qdwaq7lxwLRSKMtJFbC03swWv12mpySSVysH3igGZTiGPuKMsYW7HAkf6CWc7c0nzqDsjuH3FYVMNCWRmfxMrmY8rykQCC4Ve|{"car": "BMW", "model": "X5", "year": 2020, "color": "black"} +85|2023-08-11|TRUE|-7|24304|-2043877415|-2024144417867729183|\N|5363.024414|-578615669.042831|-378574346.0|-810302932.0|2023-07-15 01:07:41|2023-08-13|2023-01-20 11:57:48|i||WQ9dh9ajPu0y|{"country": "France", "capital": "Paris", "population": 67081000} +31|2023-08-27|FALSE|17|-18849|1728109133|3266501886640700374|527195452623418935|-24062.328125|-1514348021.262435|-322205854.0|-278237157.0|2022-10-07 03:24:23|2022-09-25|\N|0|8|yKMiAntORoRa8svnMfcxlOPwwND1m5s2fdS26Xu6cfs6HK5SAibqIp9h8sZcpjHy4|{"team": "Manchester United", "players": ["Ronaldo", "Rooney", "Giggs"], "coach": "Ole Gunnar Solskjaer"} +20|2023-08-17|FALSE|-5|18158|784479801|1485484354598941738|-6632681928222776815|9708.430664|-330432620.706069|-816424174.0|571112646.0|2022-09-15 21:40:55|2023-02-23|2023-08-13 21:31:54|O|X|2pYmX2vAhfEEHZZYPsgAmda1G7otnwx5TmUC879FPhDeIjvWI79ksBZpfFG2gp7jhCSbpZiecKGklB5SvG8tm31i5SUqe1xrWgLt4HSq7lMJWp75tx2kxD7pRIOpn|{"name": "Sarah", "age": 30, "city": "London", "isMarried": false} +90|2023-08-27|TRUE|22|16456|-1476824962|-3279894870153540825|8990195191470116763|26651.906250|206860148.942546|-580959198.0|-210329147.0|2022-10-07 03:11:03|2023-03-18|2023-04-15 
00:38:33|T|L|QW0GQ3GoMtHgxPQOWGfVaveynahNpsNs09siMFA1OtO6QEDBQTdivmGyq7bFzejAqwbbVQQpREAmeLjcFSXLnQuou2KbwYD|{"company": "Apple", "products": [{"name": "iPhone", "price": 1000}, {"name": "MacBook", "price": 1500}]} +8|2023-08-14|TRUE|109|-31573|-1362465190|3990845741226497177|2732763251146840270|-25698.552734|1312831962.567818|771983879.0|173937916.0|2023-03-07 14:13:19|2022-10-18|2023-07-16 05:03:13|D||PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme|{"animal": "lion", "weight": 200, "habitat": ["savannah", "grassland"]} +65|2023-08-09|FALSE|94|31514|814994517|-297697460695940343|734910652450318597|-13061.891602|62750847.041706|-9808654.0|\N|2023-08-14 22:01:27|2023-05-19|2022-11-13 13:44:28|V||aGeMsI24O12chGlP5ak0AHghAz7bu5MargJBStHnt0yMnChH0JnfYhsfH1u59XIHkJKMsHYktBqORkGlovu8V47E74KeFpaqxn5yLyXfDbhhzUKf|{"language": "Python", "version": 3.9, "frameworks": ["Django", "Flask"]} +62|2023-08-21|FALSE|81|20302|-200761532|6365479976421007608|\N|-29916.533203|1709141750.828478|549873536.0|-119205359.0|2023-05-04 01:14:51|2022-09-17|2022-12-04 19:30:09|d|v|BKWy9dTNg1aZW7ancEJAmEDOPK5TwFsNSHbI78emu9gymeIlx5NoLmyii0QAqdzRvSQPZKiqKkwInGCTIBnK1yYkK7zD|{"username": "user123", "password": "pass123", "email": "user123@example.com"} +50|2023-08-06|TRUE|109|-6330|1479023892|-8630800697573159428|-1645095773540208759|17880.960938|-1453844792.013949|-158871820.0|-862940384.0|2022-09-22 02:03:21|2023-05-14|2023-03-25 02:18:34|m||JKnIgXvGVidGiWl9YRSi3mFI7wHKt1sBpWSadKF8VX3LAuElm4sdc9gtxREaUr57oikSYlU8We8h1MWqQlYNiJObl|{"city": "Tokyo", "temperature": 20.5, "humidity": 75} +58|2023-08-22|\N|0|-18231|1832867360|6997858407575297145|2480714305422728023|-5450.488770|1475901032.138386|-893480655.0|-607891858.0|2023-02-02 05:13:24|2022-09-18|2023-04-23 10:51:15|k||LdFXF7Kmfzgmnn2R6zLsXdmi3A2cLBLq4G4WDVNDhxvH7dYH8Kga2WA47uSIxp6NSrwPSdw0ssB1TS8RFJTDJAB0Uba3e05NL2Aiw0ja|{"restaurant": "Pizza Hut", "menu": ["pizza", "pasta", "salad"]} 
+60|2023-08-27|FALSE|-52|-2338|-757056972|1047567408607120856|6541476642780646552|6614.089355|-1204448798.517855|236657733.0|731515433.0|2022-12-29 14:47:30|2022-09-24|2023-08-01 12:41:59|O|F|RM4F1Ke7lkcnuxF2nK0j9VBW3MDcgyHR4pseBjtFnqS6GUkVFuzF6u3Cp9Nv7ab0O6UYrpP4DhU|{"game": "Chess", "players": 2, "time": "1 hour"} +68|2023-08-23|TRUE|-73|20117|1737338128|795638676048937749|-5551546237562433901|-30627.039062|68589475.684545|585022347.0|513722420.0|2022-12-28 20:26:51|2022-10-04|2023-07-30 00:20:06|y||keZ3JlWWpdnPBejf0cuiCQCVBBTd5gjvO08NVdcAFewqL7nRT4N9lnvSU6pWmletA5VbPQCeQapJdcnQCHfZUDCf4ulCnczyqr7SGrbGRT0XYcd7iktKM|{"country": "Brazil", "continent": "South America", "population": 211049527} +50|2023-08-24|TRUE|15|14403|\N|-6418906115745394180|9205303779366462513|-4331.548828|-615112179.557648|367305015.0|-551652958.0|2022-12-29 02:27:20|2023-06-01|2023-08-12 04:50:04|a||eCl38sztIvBQvGvGKyYZmyMXy9vIJx197iu3JwP9doJGcrYUl9Uova0rz4iCCgrjlAiZU18Fs9YtCq830nhM|{"band": "The Beatles", "members": ["John Lennon", "Paul McCartney", "George Harrison", "Ringo Starr"]} +81|2023-08-23|FALSE|106|11492|-667795397|4480250461471356146|-5346660566234294101|9082.750000|385167225.902608|-717553011.0|649146853.0|2023-03-20 03:33:16|2022-11-24|2023-02-16 18:29:41|G|9|Lk3eNVQNjucbekD1rZmUlGPiXS5JvcWr2LQzRU8GSGIbSag|{"flower": "rose", "color": "red", "fragrance": true} +41|2023-08-27|TRUE|-104|22750|\N|8527773271030840740|5554497317268279215|-5296.828125|-1715646888.013040|-306075962.0|897769189.0|2022-12-02 17:56:44|2022-10-12|2023-02-19 07:02:54|V|\N|E9GzQdTwX1ITUQz27IVznAs6Ca4WwprKk6Odjs6SH75D2F1089QiY3HQ52LXRD1V6xAWjhLE2hWgW3EdHuAOnUDVrb5V|{"food": "Sushi", "price": 10, "restaurant": "Sushi King"} +21|2023-08-18|FALSE|63|-27847|-35409596|8638201997392767650|4919963231735304178|-23382.541016|-1803403621.426313|-22009767.0|661750756.0|2023-03-31 10:56:14|2023-01-20|2023-02-18 13:37:52|N|T|PSiFwUEx3eVFNtjlnQ70YkgZNvKrGmQ2DN5K9yYHiSdFWeEDB1UpL3Frt8z1kEAIWRDWqXZuyi|{"city": "Sydney", 
"population": 5312000, "area": 2058.7} \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy new file mode 100644 index 0000000000..c8044ad140 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + + +suite("test_routine_load_schedule","p0") { + def kafkaCsvTpoics = [ + "test_schedule", + ] + + String enabled = context.config.otherConfigs.get("enableKafkaTest") + String kafka_port = context.config.otherConfigs.get("kafka_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def kafka_broker = "${externalEnvIp}:${kafka_port}" + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // define kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // Create kafka producer + def producer = new KafkaProducer<>(props) + + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + } + + sleep(10000) + + def jobName = "testScheduleJob" + def tableName = "test_routine_load_schedule" + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} + ( + k00 INT NOT NULL, + k01 DATE NOT NULL, + k02 BOOLEAN NULL, + k03 TINYINT NULL, + k04 SMALLINT NULL, + k05 INT NULL, + k06 BIGINT NULL, + k07 LARGEINT NULL, + k08 FLOAT NULL, + k09 DOUBLE NULL, + k10 DECIMAL(9,1) NULL, + k11 DECIMALV3(9,1) NULL, + k12 DATETIME NULL, + k13 DATEV2 NULL, + k14 DATETIMEV2 NULL, + k15 CHAR NULL, + k16 VARCHAR NULL, + k17 STRING NULL, + 
k18 JSON NULL, + kd01 BOOLEAN NOT NULL DEFAULT "TRUE", + kd02 TINYINT NOT NULL DEFAULT "1", + kd03 SMALLINT NOT NULL DEFAULT "2", + kd04 INT NOT NULL DEFAULT "3", + kd05 BIGINT NOT NULL DEFAULT "4", + kd06 LARGEINT NOT NULL DEFAULT "5", + kd07 FLOAT NOT NULL DEFAULT "6.0", + kd08 DOUBLE NOT NULL DEFAULT "7.0", + kd09 DECIMAL NOT NULL DEFAULT "888888888", + kd10 DECIMALV3 NOT NULL DEFAULT "999999999", + kd11 DATE NOT NULL DEFAULT "2023-08-24", + kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00", + kd13 DATEV2 NOT NULL DEFAULT "2023-08-24", + kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00", + kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd18 JSON NULL, + + INDEX idx_inverted_k104 (`k05`) USING INVERTED, + INDEX idx_inverted_k110 (`k11`) USING INVERTED, + INDEX idx_inverted_k113 (`k13`) USING INVERTED, + INDEX idx_inverted_k114 (`k14`) USING INVERTED, + INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"), + INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + + INDEX idx_bitmap_k104 (`k02`) USING BITMAP, + INDEX idx_bitmap_k110 (`kd01`) USING BITMAP + + ) + DUPLICATE KEY(k00) + PARTITION BY RANGE(k01) + ( + PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')), + PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')), + PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01')) + ) + DISTRIBUTED BY HASH(k00) BUCKETS 32 + PROPERTIES ( + "bloom_filter_columns"="k05", + "replication_num" = "1" + ); + """ + sql "sync" + + sql """ + CREATE ROUTINE LOAD ${jobName} on ${tableName} + COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18), + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + 
"max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "test_schedule", + "property.kafka_default_offsets" = "OFFSET_END" + ); + """ + sql "sync" + + def count = 0 + while (true) { + sleep(1000) + def res = sql "show routine load for ${jobName}" + def state = res[0][8].toString() + if (state != "RUNNING") { + count++ + if (count > 300) { + assertEquals(1, 2) + } + continue; + } + break; + } + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // define kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // Create kafka producer + def producer = new KafkaProducer<>(props) + + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + } + + sleep(5000) + + count = 0 + while (true) { + sleep(1000) + def res = sql "show routine load for ${jobName}" + def state = res[0][8].toString() + if (state != "RUNNING") { + count++ + if (count > 60) { + assertEquals(1, 2) + } + continue; + } + log.info("reason of state changed: ${res[0][11].toString()}".toString()) + break; + } + while (true) { + sleep(1000) + def res = sql "show routine load for ${jobName}" + log.info("routine load statistic: ${res[0][14].toString()}".toString()) + log.info("progress: ${res[0][15].toString()}".toString()) + log.info("lag: ${res[0][16].toString()}".toString()) + def json = parseJson(res[0][14]) + if 
(json.loadedRows.toString() != "20") { + count++ + if (count > 60) { + assertEquals(1, 2) + } + continue; + } + break; + } + qt_sql "select * from ${tableName} order by k00, k01" + } finally { + sql "stop routine load for ${jobName}" + sql "DROP TABLE IF EXISTS ${tableName}" + } + } +} \ No newline at end of file