[feature](jni) support complex types in jni framework (#24810)

Support complex types in jni framework, and successfully run end-to-end on hudi.
### How to Use
Other scanners only need to implement three interfaces in `ColumnValue`:
```
// Get array elements and append into values
void unpackArray(List<ColumnValue> values);

// Get map key array&value array, and append into keys&values
void unpackMap(List<ColumnValue> keys, List<ColumnValue> values);

// Get the struct fields specified by `structFieldIndex`, and append into values
void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values);
```
Developers can take `HudiColumnValue` as an example.
This commit is contained in:
Ashin Gau
2023-09-27 14:47:41 +08:00
committed by GitHub
parent a1ab8f96a1
commit 26818de9c8
10 changed files with 575 additions and 168 deletions

View File

@ -42,6 +42,10 @@ public class MockJniScanner extends JniScanner {
private int i;
private int j;
public MockColumnValue(int i, int j) {
set(i, j);
}
public MockColumnValue() {
}
@ -132,17 +136,40 @@ public class MockJniScanner extends JniScanner {
@Override
public void unpackArray(List<ColumnValue> values) {
for (int m = 1; m < i; ++m) {
if (m % 3 == 0) {
values.add(null);
} else {
values.add(new MockColumnValue(i, j + m));
}
}
}
@Override
public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
for (int m = 0; m < i; ++m) {
values.add(new MockColumnValue(i + m, j));
}
for (int m = 0; m < i; ++m) {
if (m % 3 == 0) {
values.add(null);
} else {
values.add(new MockColumnValue(i, j + m));
}
}
}
@Override
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
structFieldIndex.clear();
structFieldIndex.add(0);
structFieldIndex.add(1);
if ((i + j) % 4 == 0) {
values.add(null);
} else {
values.add(new MockColumnValue(i, j));
}
values.add(new MockColumnValue(i, j + 3));
}
}

View File

@ -28,6 +28,8 @@ import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
@ -46,6 +48,9 @@ public class VectorColumn {
// For String / Array / Map.
private long offsets;
// String's offset is int32, while Array&Map's offset is int64
// todo: how to solve the overflow of offsets when taking Array&Map's offset as int32
private boolean isComplexType = false;
// Number of elements in vector column
private int capacity;
// Upper limit for the maximum capacity for this column.
@ -59,6 +64,10 @@ public class VectorColumn {
// For nested column type: String / Array/ Map / Struct
private VectorColumn[] childColumns;
// For struct, only support to read all fields in struct now
// todo: support pruned struct fields
private List<Integer> structFieldIndex;
public VectorColumn(ColumnType columnType, int capacity) {
this.columnType = columnType;
this.capacity = 0;
@ -68,11 +77,18 @@ public class VectorColumn {
this.numNulls = 0;
this.appendIndex = 0;
if (columnType.isComplexType()) {
isComplexType = true;
List<ColumnType> children = columnType.getChildTypes();
childColumns = new VectorColumn[children.size()];
for (int i = 0; i < children.size(); ++i) {
childColumns[i] = new VectorColumn(children.get(i), capacity);
}
if (columnType.isStruct()) {
structFieldIndex = new ArrayList<>();
for (int i = 0; i < children.size(); ++i) {
structFieldIndex.add(i);
}
}
} else if (columnType.isStringType()) {
childColumns = new VectorColumn[1];
childColumns[0] = new VectorColumn(new ColumnType("#stringBytes", Type.BYTE),
@ -91,6 +107,13 @@ public class VectorColumn {
this.offsets = 0;
this.numNulls = 0;
this.appendIndex = capacity;
if (columnType.isStruct()) {
List<ColumnType> children = columnType.getChildTypes();
structFieldIndex = new ArrayList<>();
for (int i = 0; i < children.size(); ++i) {
structFieldIndex.add(i);
}
}
}
// restore block column
@ -114,8 +137,20 @@ public class VectorColumn {
this.appendIndex = numRows;
if (columnType.isComplexType()) {
// todo: support complex type
throw new RuntimeException("Unhandled type: " + columnType);
isComplexType = true;
int childRows = numRows;
if (!columnType.isStruct()) {
this.offsets = OffHeap.getLong(null, address);
address += 8;
childRows = getArrayEndOffset(numRows - 1);
}
this.data = 0;
List<ColumnType> children = columnType.getChildTypes();
childColumns = new VectorColumn[children.size()];
for (int i = 0; i < children.size(); ++i) {
childColumns[i] = new VectorColumn(children.get(i), childRows, address);
address += children.get(i).metaSize() * 8L;
}
} else if (columnType.isStringType()) {
this.offsets = OffHeap.getLong(null, address);
address += 8;
@ -130,6 +165,19 @@ public class VectorColumn {
}
}
private int getArrayEndOffset(int rowId) {
if (rowId >= 0 && rowId < appendIndex) {
if (isComplexType) {
// maybe overflowed
return (int) OffHeap.getLong(null, offsets + 8L * rowId);
} else {
return OffHeap.getInt(null, offsets + 4L * rowId);
}
} else {
return 0;
}
}
public long nullMapAddress() {
return nullMap;
}
@ -200,21 +248,21 @@ public class VectorColumn {
}
private void reserveCapacity(int newCapacity) {
long offsetLength = isComplexType ? 8L : 4L;
long oldCapacity = capacity;
long oldOffsetSize = capacity * 4L;
long newOffsetSize = newCapacity * 4L;
long oldOffsetSize = capacity * offsetLength;
long newOffsetSize = newCapacity * offsetLength;
long typeSize = columnType.getTypeSize();
if (columnType.isUnsupported()) {
// do nothing
return;
} else if (typeSize != -1) {
this.data = OffHeap.reallocateMemory(data, oldCapacity * typeSize, newCapacity * typeSize);
} else if (columnType.isStringType()) {
} else if (columnType.isStringType() || columnType.isArray() || columnType.isMap()) {
this.offsets = OffHeap.reallocateMemory(offsets, oldOffsetSize, newOffsetSize);
} else {
} else if (!columnType.isStruct()) {
throw new RuntimeException("Unhandled type: " + columnType);
}
// todo: support complex type
if (!"#stringBytes".equals(columnType.getName())) {
this.nullMap = OffHeap.reallocateMemory(nullMap, oldCapacity, newCapacity);
OffHeap.setMemory(nullMap + oldCapacity, (byte) 0, newCapacity - oldCapacity);
@ -292,6 +340,12 @@ public class VectorColumn {
case STRING:
case BINARY:
return appendBytesAndOffset(new byte[0]);
case ARRAY:
return appendArray(Collections.emptyList());
case MAP:
return appendMap(Collections.emptyList(), Collections.emptyList());
case STRUCT:
return appendStruct(structFieldIndex, null);
default:
throw new RuntimeException("Unknown type value: " + typeValue);
}
@ -530,6 +584,45 @@ public class VectorColumn {
return new String(bytes, StandardCharsets.UTF_8);
}
public int appendArray(List<ColumnValue> values) {
int length = values.size();
int startOffset = childColumns[0].appendIndex;
for (ColumnValue v : values) {
childColumns[0].appendValue(v);
}
reserve(appendIndex + 1);
OffHeap.putLong(null, offsets + 8L * appendIndex, startOffset + length);
return appendIndex++;
}
public int appendMap(List<ColumnValue> keys, List<ColumnValue> values) {
int length = keys.size();
int startOffset = childColumns[0].appendIndex;
for (ColumnValue k : keys) {
childColumns[0].appendValue(k);
}
for (ColumnValue v : values) {
childColumns[1].appendValue(v);
}
reserve(appendIndex + 1);
OffHeap.putInt(null, offsets + 8L * appendIndex, startOffset + length);
return appendIndex++;
}
public int appendStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
if (values == null) {
for (int i : structFieldIndex) {
childColumns[i].appendValue(null);
}
} else {
for (int i = 0; i < structFieldIndex.size(); ++i) {
childColumns[structFieldIndex.get(i)].appendValue(values.get(i));
}
}
reserve(appendIndex + 1);
return appendIndex++;
}
public void updateMeta(VectorColumn meta) {
if (columnType.isUnsupported()) {
meta.appendLong(0);
@ -558,19 +651,7 @@ public class VectorColumn {
return;
}
NativeValue nativeValue = o.getNativeValue(typeValue);
if (nativeValue == null) {
// can't get native value, fall back to materialized value
appendValue((ColumnValue) o);
return;
}
if (nativeValue.length == -1) {
// java origin types
long typeSize = typeValue.size;
reserve(appendIndex + 1);
OffHeap.copyMemory(nativeValue.baseObject, nativeValue.offset,
null, data + typeSize * appendIndex, typeSize);
appendIndex++;
} else {
if (nativeValue != null && columnType.isStringType()) {
int byteLength = nativeValue.length;
VectorColumn bytesColumn = childColumns[0];
int startOffset = bytesColumn.appendIndex;
@ -580,6 +661,9 @@ public class VectorColumn {
bytesColumn.appendIndex += byteLength;
OffHeap.putInt(null, offsets + 4L * appendIndex, startOffset + byteLength);
appendIndex++;
} else {
// can't get native value, fall back to materialized value
appendValue((ColumnValue) o);
}
}
@ -639,6 +723,25 @@ public class VectorColumn {
case BINARY:
appendBytesAndOffset(o.getBytes());
break;
case ARRAY: {
List<ColumnValue> values = new ArrayList<>();
o.unpackArray(values);
appendArray(values);
break;
}
case MAP: {
List<ColumnValue> keys = new ArrayList<>();
List<ColumnValue> values = new ArrayList<>();
o.unpackMap(keys, values);
appendMap(keys, values);
break;
}
case STRUCT: {
List<ColumnValue> values = new ArrayList<>();
o.unpackStruct(structFieldIndex, values);
appendStruct(structFieldIndex, values);
break;
}
default:
throw new RuntimeException("Unknown type value: " + typeValue);
}
@ -695,6 +798,67 @@ public class VectorColumn {
case BINARY:
sb.append(getStringWithOffset(i));
break;
case ARRAY: {
int begin = getArrayEndOffset(i - 1);
int end = getArrayEndOffset(i);
sb.append("[");
for (int rowId = begin; rowId < end; rowId++) {
if (rowId != begin) {
sb.append(",");
}
childColumns[0].dump(sb, rowId);
}
sb.append("]");
break;
}
case MAP: {
int begin = getArrayEndOffset(i - 1);
int end = getArrayEndOffset(i);
sb.append("{");
VectorColumn key = childColumns[0];
VectorColumn value = childColumns[1];
for (int rowId = begin; rowId < end; rowId++) {
if (rowId != begin) {
sb.append(",");
}
if (key.columnType.isStringType()) {
sb.append("\"");
}
key.dump(sb, rowId);
if (key.columnType.isStringType()) {
sb.append("\"");
}
sb.append(":");
if (value.columnType.isStringType()) {
sb.append("\"");
}
value.dump(sb, rowId);
if (value.columnType.isStringType()) {
sb.append("\"");
}
}
sb.append("}");
break;
}
case STRUCT: {
sb.append("{");
for (int fieldIndex = 0; fieldIndex < childColumns.length; ++fieldIndex) {
VectorColumn child = childColumns[fieldIndex];
if (fieldIndex != 0) {
sb.append(",");
}
sb.append("\"").append(child.columnType.getName()).append("\":");
if (child.columnType.isStringType()) {
sb.append("\"");
}
child.dump(sb, i);
if (child.columnType.isStringType()) {
sb.append("\"");
}
}
sb.append("}");
break;
}
default:
throw new RuntimeException("Unknown type value: " + typeValue);
}

View File

@ -58,7 +58,7 @@ public class VectorTable {
this.numRows = (int) OffHeap.getLong(null, address);
address += 8;
int metaSize = 1; // number of rows
int metaSize = 1; // stores the number of rows + other columns meta data
for (int i = 0; i < types.length; i++) {
columns[i] = new VectorColumn(types[i], numRows, address);
metaSize += types[i].metaSize();

View File

@ -35,9 +35,10 @@ public class JniScannerTest {
{
put("mock_rows", "128");
put("required_fields", "boolean,tinyint,smallint,int,bigint,largeint,float,double,"
+ "date,timestamp,char,varchar,string,decimalv2,decimal64");
+ "date,timestamp,char,varchar,string,decimalv2,decimal64,array,map,struct");
put("columns_types", "boolean#tinyint#smallint#int#bigint#largeint#float#double#"
+ "date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)");
+ "date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)#"
+ "array<array<string>>#map<string,array<int>>#struct<col1:timestamp(6),col2:array<char(10)>>");
}
});
scanner.open();
@ -50,7 +51,7 @@ public class JniScannerTest {
VectorTable restoreTable = new VectorTable(scanner.getTable().getColumnTypes(),
scanner.getTable().getFields(), metaAddress);
System.out.println(restoreTable.dump((int) rows));
System.out.println(restoreTable.dump((int) rows).substring(0, 128));
// Restored table is release by the origin table.
}
scanner.resetTable();