From 180d8e5cbdaabcf00d1301c5d85605232b66057c Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 30 May 2019 21:23:37 +0800 Subject: [PATCH] Modify some thirdparties (#1228) 1. Change Kafka java client from 2.0.0 to 0.10.1.1. Because high version client may not support low server server. 2. Enable SSL in librdkafka --- be/CMakeLists.txt | 2 +- be/src/runtime/routine_load/routine_load_task_executor.h | 3 ++- fe/pom.xml | 2 +- .../apache/doris/load/routineload/KafkaRoutineLoadJob.java | 4 +--- .../doris/load/routineload/KafkaRoutineLoadJobTest.java | 3 +-- .../org/apache/doris/load/routineload/RoutineLoadJobTest.java | 3 +-- thirdparty/build-thirdparty.sh | 4 ++-- 7 files changed, 9 insertions(+), 12 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 52f9179321..1439fc4aec 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -440,6 +440,7 @@ set(DORIS_LINK_LIBS # Set thirdparty libraries set(DORIS_DEPENDENCIES + mysql ${WL_START_GROUP} rocksdb librdkafka_cpp @@ -455,7 +456,6 @@ set(DORIS_DEPENDENCIES pprof lz4 libevent - mysql curl ${LIBZ} ${LIBBZ2} diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index 63511f2560..bbecb13a05 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -49,7 +49,8 @@ public: } ~RoutineLoadTaskExecutor() { - + _thread_pool.shutdown(); + _thread_pool.join(); } // submit a routine load task diff --git a/fe/pom.xml b/fe/pom.xml index 211dd83cef..8d94da3e76 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -520,7 +520,7 @@ under the License. org.apache.kafka kafka-clients - 2.0.0 + 0.10.1.1 diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 2909d3913a..1b9320b973 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -48,7 +48,6 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -280,8 +279,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private List getAllKafkaPartitions() throws LoadException { List result = new ArrayList<>(); try { - List partitionList = consumer.partitionsFor(topic, - Duration.ofSeconds(FETCH_PARTITIONS_TIMEOUT_SECOND)); + List partitionList = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionList) { result.add(partitionInfo.partition()); } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 7735151cfa..b64ae82ad0 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -52,7 +52,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -294,7 +293,7 @@ public class KafkaRoutineLoadJobTest { result = tableId; table.getType(); result = Table.TableType.OLAP; - kafkaConsumer.partitionsFor(anyString, (Duration) any); + kafkaConsumer.partitionsFor(anyString); result = kafkaPartitionInfoList; } }; diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index c86ca50689..667e4ec017 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.common.PartitionInfo; import org.junit.Assert; import org.junit.Test; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -213,7 +212,7 @@ public class RoutineLoadJobTest { result = database; database.getTable(anyLong); result = table; - kafkaConsumer.partitionsFor(anyString, (Duration) any); + kafkaConsumer.partitionsFor(anyString); result = partitionInfoList; partitionInfo.partition(); result = 1; diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index a3f45bbd54..f1ece4cf45 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -173,7 +173,7 @@ build_openssl() { CFLAGS="-fPIC" \ LIBDIR="lib" \ ./Configure --prefix=$TP_INSTALL_DIR -zlib -shared linux-x86_64 - make -j$PARALLEL && make install + make && make install if [ -f $TP_INSTALL_DIR/lib64/libcrypto.a ]; then mkdir -p $TP_INSTALL_DIR/lib && \ cp $TP_INSTALL_DIR/lib64/libcrypto.a $TP_INSTALL_DIR/lib/libcrypto.a && \ @@ -520,7 +520,7 @@ build_librdkafka() { CPPFLAGS="-I${TP_INCLUDE_DIR}" \ LDFLAGS="-L${TP_LIB_DIR}" \ CFLAGS="-fPIC" \ - ./configure --prefix=$TP_INSTALL_DIR --enable-static --disable-ssl --disable-sasl + ./configure --prefix=$TP_INSTALL_DIR --enable-static --disable-sasl make -j$PARALLEL && make install }