[Feature](group commit) Support table property "group commit data bytes" (#29484)
This commit is contained in:
@ -500,6 +500,8 @@ public class Alter {
|
||||
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
|
||||
|| properties
|
||||
.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
|
||||
|| properties
|
||||
.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
|
||||
|| properties
|
||||
.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
|
||||
|| properties
|
||||
|
||||
@ -2213,6 +2213,7 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
|
||||
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION)
|
||||
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
|
||||
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
|
||||
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) {
|
||||
LOG.info("Properties already up-to-date");
|
||||
return;
|
||||
|
||||
@ -267,6 +267,21 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
|
||||
}
|
||||
this.needTableStable = false;
|
||||
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
|
||||
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)) {
|
||||
long groupCommitDataBytes;
|
||||
String groupCommitDataBytesStr = properties.get(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES);
|
||||
try {
|
||||
groupCommitDataBytes = Long.parseLong(groupCommitDataBytesStr);
|
||||
if (groupCommitDataBytes < 0) {
|
||||
throw new AnalysisException("group_commit_data_bytes can not be less than 0:"
|
||||
+ groupCommitDataBytesStr);
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
throw new AnalysisException("Invalid group_commit_data_bytes format: "
|
||||
+ groupCommitDataBytesStr);
|
||||
}
|
||||
this.needTableStable = false;
|
||||
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
|
||||
} else {
|
||||
throw new AnalysisException("Unknown table property: " + properties.keySet());
|
||||
}
|
||||
|
||||
@ -3411,6 +3411,10 @@ public class Env {
|
||||
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS).append("\" = \"");
|
||||
sb.append(olapTable.getGroupCommitIntervalMs()).append("\"");
|
||||
|
||||
// group commit data bytes
|
||||
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES).append("\" = \"");
|
||||
sb.append(olapTable.getGroupCommitDataBytes()).append("\"");
|
||||
|
||||
// enable duplicate without keys by default
|
||||
if (olapTable.isDuplicateWithoutKey()) {
|
||||
sb.append(",\n\"")
|
||||
|
||||
@ -1114,6 +1114,14 @@ public class OlapTable extends Table {
|
||||
return getOrCreatTableProperty().getGroupCommitIntervalMs();
|
||||
}
|
||||
|
||||
public void setGroupCommitDataBytes(int groupCommitInterValMs) {
|
||||
getOrCreatTableProperty().setGroupCommitDataBytes(groupCommitInterValMs);
|
||||
}
|
||||
|
||||
public int getGroupCommitDataBytes() {
|
||||
return getOrCreatTableProperty().getGroupCommitDataBytes();
|
||||
}
|
||||
|
||||
public Boolean hasSequenceCol() {
|
||||
return getSequenceCol() != null;
|
||||
}
|
||||
|
||||
@ -523,6 +523,16 @@ public class TableProperty implements Writable {
|
||||
Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE)));
|
||||
}
|
||||
|
||||
public void setGroupCommitDataBytes(int groupCommitDataBytes) {
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES, Integer.toString(groupCommitDataBytes));
|
||||
}
|
||||
|
||||
public int getGroupCommitDataBytes() {
|
||||
return Integer.parseInt(properties.getOrDefault(
|
||||
PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES,
|
||||
Integer.toString(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE)));
|
||||
}
|
||||
|
||||
public void buildReplicaAllocation() {
|
||||
try {
|
||||
// Must copy the properties because "analyzeReplicaAllocation" will remove the property
|
||||
|
||||
@ -180,6 +180,10 @@ public class PropertyAnalyzer {
|
||||
public static final int PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE
|
||||
= Config.group_commit_interval_ms_default_value;
|
||||
|
||||
public static final String PROPERTIES_GROUP_COMMIT_DATA_BYTES = "group_commit_data_bytes";
|
||||
public static final int PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE
|
||||
= Config.group_commit_data_bytes_default_value;
|
||||
|
||||
// compaction policy
|
||||
public static final String SIZE_BASED_COMPACTION_POLICY = "size_based";
|
||||
public static final String TIME_SERIES_COMPACTION_POLICY = "time_series";
|
||||
@ -1213,6 +1217,22 @@ public class PropertyAnalyzer {
|
||||
return groupCommitIntervalMs;
|
||||
}
|
||||
|
||||
public static int analyzeGroupCommitDateBytes(Map<String, String> properties) throws AnalysisException {
|
||||
int groupCommitDataBytes = PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE;
|
||||
if (properties != null && properties.containsKey(PROPERTIES_GROUP_COMMIT_DATA_BYTES)) {
|
||||
String groupIntervalCommitDataBytesStr = properties.get(PROPERTIES_GROUP_COMMIT_DATA_BYTES);
|
||||
try {
|
||||
groupCommitDataBytes = Integer.parseInt(groupIntervalCommitDataBytesStr);
|
||||
} catch (Exception e) {
|
||||
throw new AnalysisException("parse group_commit_interval_ms format error");
|
||||
}
|
||||
|
||||
properties.remove(PROPERTIES_GROUP_COMMIT_DATA_BYTES);
|
||||
}
|
||||
|
||||
return groupCommitDataBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the type property of the catalog props.
|
||||
*/
|
||||
|
||||
@ -2428,7 +2428,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
|
||||
// analyse group commit interval ms
|
||||
int groupCommitIntervalMs = 0;
|
||||
int groupCommitIntervalMs;
|
||||
try {
|
||||
groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties);
|
||||
olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs);
|
||||
@ -2436,6 +2436,14 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
|
||||
int groupCommitDataBytes;
|
||||
try {
|
||||
groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDateBytes(properties);
|
||||
olapTable.setGroupCommitDataBytes(groupCommitDataBytes);
|
||||
} catch (Exception e) {
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
|
||||
olapTable.initSchemaColumnUniqueId();
|
||||
olapTable.initAutoIncrentGenerator(db.getId());
|
||||
olapTable.rebuildFullSchema();
|
||||
|
||||
@ -1983,8 +1983,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
result.setTableId(parsedStmt.getTargetTable().getId());
|
||||
result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
|
||||
result.setGroupCommitIntervalMs(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitIntervalMs());
|
||||
// TODO get from table property
|
||||
result.setGroupCommitDataBytes(134217728L);
|
||||
result.setGroupCommitDataBytes(((OlapTable) parsedStmt.getTargetTable()).getGroupCommitDataBytes());
|
||||
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
|
||||
} catch (UserException e) {
|
||||
LOG.warn("exec sql error", e);
|
||||
|
||||
Reference in New Issue
Block a user