[opt](hudi) use spark bundle to read hudi data (#21260)

Use spark-bundle to read hudi data instead of using hive-bundle to read hudi data.

**Advantage** for using spark-bundle to read hudi data:
1. The performance of spark-bundle is more than twice that of hive-bundle
2. spark-bundle uses `UnsafeRow`, which reduces data copying and JVM GC time
3. spark-bundle support `Time Travel`, `Incremental Read`, and `Schema Change`, these functions can be quickly ported to Doris

**Disadvantage** for using spark-bundle to read hudi data:
1. More dependencies make hudi-dependency.jar very cumbersome (growing from 138 MB to 300 MB)
2. spark-bundle only provides an `RDD` interface, so it cannot be used directly
This commit is contained in:
Ashin Gau
2023-07-04 17:04:49 +08:00
committed by GitHub
parent 90dd8716ed
commit 9adbca685a
26 changed files with 1524 additions and 540 deletions

View File

@ -20,6 +20,7 @@ package org.apache.doris.common.jni;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValue;
import org.apache.doris.common.jni.vec.NativeColumnValue;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.common.jni.vec.VectorTable;
@ -45,7 +46,9 @@ public abstract class JniScanner {
protected abstract int getNext() throws IOException;
// parse table schema
protected abstract TableSchema parseTableSchema() throws UnsupportedOperationException;
// Optional hook for scanners that can describe their table schema.
// The default implementation throws; subclasses that support schema
// parsing override this method.
protected TableSchema parseTableSchema() throws UnsupportedOperationException {
    throw new UnsupportedOperationException();
}
protected void initTableInfo(ColumnType[] requiredTypes, String[] requiredFields, ScanPredicate[] predicates,
int batchSize) {
@ -55,6 +58,10 @@ public abstract class JniScanner {
this.batchSize = batchSize;
}
// Appends a directly-copyable (native) value to column {@code index} of the
// output table; delegates to VectorTable#appendNativeData.
protected void appendNativeData(int index, NativeColumnValue value) {
    vectorTable.appendNativeData(index, value);
}
// Appends a materialized value to column {@code index} of the output table;
// delegates to VectorTable#appendData.
protected void appendData(int index, ColumnValue value) {
    vectorTable.appendData(index, value);
}

View File

@ -21,7 +21,6 @@ package org.apache.doris.common.jni;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValue;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.log4j.Logger;
@ -111,6 +110,11 @@ public class MockJniScanner extends JniScanner {
return "row-" + i + "-column-" + j;
}
@Override
public byte[] getStringAsBytes() {
    // The mock column value does not support raw string bytes; callers are
    // expected to use getString() instead.
    throw new UnsupportedOperationException();
}
@Override
public LocalDate getDate() {
return LocalDate.now();
@ -196,9 +200,4 @@ public class MockJniScanner extends JniScanner {
readRows += rows;
return rows;
}
@Override
protected TableSchema parseTableSchema() throws UnsupportedOperationException {
return null;
}
}

View File

@ -55,6 +55,8 @@ public interface ColumnValue {
String getString();
byte[] getStringAsBytes();
LocalDate getDate();
LocalDateTime getDateTime();

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.jni.vec;
/**
 * A column value whose underlying bytes can be copied directly
 * (without materializing it into a Java object first).
 */
public interface NativeColumnValue {
    /**
     * Describes a raw memory region: a base object (or null for an off-heap
     * address) plus an offset, and an optional byte length. A length of -1
     * marks a fixed-size primitive whose size is implied by the column type.
     */
    public static class NativeValue {
        public final Object baseObject;
        public final long offset;
        public final int length;

        public NativeValue(Object baseObject, long offset) {
            // Fixed-size value: length is the sentinel -1.
            this(baseObject, offset, -1);
        }

        public NativeValue(Object baseObject, long offset, int length) {
            this.baseObject = baseObject;
            this.offset = offset;
            this.length = length;
        }
    }

    boolean isNull();

    /**
     * Return null if the type can't be copied directly
     */
    NativeValue getNativeValue(ColumnType.Type type);
}

View File

@ -183,6 +183,11 @@ public class ScanPredicate {
return toString();
}
@Override
public byte[] getStringAsBytes() {
    // Predicate values do not expose raw string bytes; use getString()
    // (via toString) instead.
    throw new UnsupportedOperationException();
}
@Override
public LocalDate getDate() {
return LocalDate.now();

View File

@ -21,6 +21,7 @@ package org.apache.doris.common.jni.vec;
import org.apache.doris.common.jni.utils.OffHeap;
import org.apache.doris.common.jni.utils.TypeNativeBytes;
import org.apache.doris.common.jni.vec.ColumnType.Type;
import org.apache.doris.common.jni.vec.NativeColumnValue.NativeValue;
import java.math.BigDecimal;
import java.math.BigInteger;
@ -550,6 +551,38 @@ public class VectorColumn {
}
}
/**
 * Appends a value by copying its raw bytes straight from the source memory
 * (heap object base + offset, or an off-heap address when baseObject is null)
 * into this column's off-heap buffers, avoiding per-value materialization.
 * Falls back to the materialized appendValue path when the value cannot be
 * copied natively.
 */
public void appendNativeValue(NativeColumnValue o) {
    ColumnType.Type typeValue = columnType.getType();
    if (o == null || o.isNull()) {
        appendNull(typeValue);
        return;
    }
    NativeValue nativeValue = o.getNativeValue(typeValue);
    if (nativeValue == null) {
        // can't get native value, fall back to materialized value
        appendValue((ColumnValue) o);
        return;
    }
    if (nativeValue.length == -1) {
        // java origin types
        // Fixed-size primitive: copy exactly one element of typeValue.size
        // bytes into slot `appendIndex` of this column's data buffer.
        long typeSize = typeValue.size;
        reserve(appendIndex + 1);
        OffHeap.copyMemory(nativeValue.baseObject, nativeValue.offset,
                null, data + typeSize * appendIndex, typeSize);
        appendIndex++;
    } else {
        // Variable-length value: append the raw bytes to the child byte
        // column, then record the new end position in this column's offsets
        // array (4 bytes per entry). NOTE(review): assumes childColumns[0]
        // is the byte payload column for variable-length types — matches the
        // usage here but confirm against the column layout.
        int byteLength = nativeValue.length;
        VectorColumn bytesColumn = childColumns[0];
        int startOffset = bytesColumn.appendIndex;
        bytesColumn.reserve(startOffset + byteLength);
        OffHeap.copyMemory(nativeValue.baseObject, nativeValue.offset,
                null, bytesColumn.data + startOffset, byteLength);
        bytesColumn.appendIndex += byteLength;
        OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + byteLength);
        appendIndex++;
    }
}
public void appendValue(ColumnValue o) {
ColumnType.Type typeValue = columnType.getType();
if (o == null || o.isNull()) {
@ -598,7 +631,7 @@ public class VectorColumn {
case VARCHAR:
case STRING:
if (o.canGetStringAsBytes()) {
appendBytesAndOffset(o.getBytes());
appendBytesAndOffset(o.getStringAsBytes());
} else {
appendStringAndOffset(o.getString());
}

View File

@ -68,6 +68,11 @@ public class VectorTable {
this.isRestoreTable = true;
}
// Appends a native (directly-copyable) value to the column at fieldId.
// Only valid on tables built for writing; a restored (read-back) table
// must not be appended to — guarded by the assert.
public void appendNativeData(int fieldId, NativeColumnValue o) {
    assert (!isRestoreTable);
    columns[fieldId].appendNativeValue(o);
}
public void appendData(int fieldId, ColumnValue o) {
assert (!isRestoreTable);
columns[fieldId].appendValue(o);