ddl defult use compression algorithm from table schema in temp file store
This commit is contained in:
parent
38973ea874
commit
815b69bd02
@ -257,7 +257,7 @@ public:
|
||||
}
|
||||
// compress type
|
||||
else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(
|
||||
table_schema->get_compressor_type(), task_param.get_parallel(), compressor_type))) {
|
||||
table_schema, task_param.get_parallel(), compressor_type))) {
|
||||
LOG_WARN("fail to get tmp store compressor type", KR(ret));
|
||||
}
|
||||
// opt stat gather
|
||||
|
@ -2564,12 +2564,29 @@ bool ObDDLUtil::reach_time_interval(const int64_t i, volatile int64_t &last_time
|
||||
return bret;
|
||||
}
|
||||
|
||||
int ObDDLUtil::get_temp_store_compress_type(const share::schema::ObTableSchema *table_schema,
|
||||
const int64_t parallel,
|
||||
ObCompressorType &compr_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), KP(table_schema));
|
||||
} else {
|
||||
ObCompressorType schema_compr_type = table_schema->get_compressor_type();
|
||||
if (NONE_COMPRESSOR == schema_compr_type && table_schema->get_row_store_type() != FLAT_ROW_STORE) { // encoding without compress
|
||||
schema_compr_type = ZSTD_COMPRESSOR;
|
||||
}
|
||||
ret = get_temp_store_compress_type(schema_compr_type, parallel, compr_type);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLUtil::get_temp_store_compress_type(const ObCompressorType schema_compr_type,
|
||||
const int64_t parallel,
|
||||
ObCompressorType &compr_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t COMPRESS_PARALLELISM_THRESHOLD = 8;
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
||||
compr_type = NONE_COMPRESSOR;
|
||||
if (OB_UNLIKELY(!tenant_config.is_valid())) {
|
||||
@ -2583,7 +2600,8 @@ int ObDDLUtil::get_temp_store_compress_type(const ObCompressorType schema_compr_
|
||||
} else if (0 == tenant_config->_ob_ddl_temp_file_compress_func.get_value_string().case_compare("LZ4")) {
|
||||
compr_type = LZ4_COMPRESSOR;
|
||||
} else if (0 == tenant_config->_ob_ddl_temp_file_compress_func.get_value_string().case_compare("AUTO")) {
|
||||
if (parallel >= COMPRESS_PARALLELISM_THRESHOLD) {
|
||||
UNUSED(parallel);
|
||||
if (schema_compr_type > INVALID_COMPRESSOR && schema_compr_type < MAX_COMPRESSOR) {
|
||||
compr_type = schema_compr_type;
|
||||
} else {
|
||||
compr_type = NONE_COMPRESSOR;
|
||||
@ -2593,7 +2611,7 @@ int ObDDLUtil::get_temp_store_compress_type(const ObCompressorType schema_compr_
|
||||
LOG_WARN("the temp store format config is unexpected", K(ret), K(tenant_config->_ob_ddl_temp_file_compress_func.get_value_string()));
|
||||
}
|
||||
}
|
||||
LOG_INFO("get compressor type", K(ret), K(compr_type));
|
||||
LOG_INFO("get compressor type", K(ret), K(compr_type), K(schema_compr_type));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -788,6 +788,9 @@ public:
|
||||
static int get_temp_store_compress_type(const ObCompressorType schema_compr_type,
|
||||
const int64_t parallel,
|
||||
ObCompressorType &compr_type);
|
||||
static int get_temp_store_compress_type(const share::schema::ObTableSchema *table_schema,
|
||||
const int64_t parallel,
|
||||
ObCompressorType &compr_type);
|
||||
static inline bool is_verifying_checksum_error_needed(share::ObDDLType type)
|
||||
{
|
||||
bool res = false;
|
||||
|
@ -1912,7 +1912,7 @@ DEF_BOOL(_enable_skip_index, OB_TENANT_PARAMETER, "True",
|
||||
DEF_STR_WITH_CHECKER(_ob_ddl_temp_file_compress_func, OB_TENANT_PARAMETER, "AUTO",
|
||||
common::ObConfigTempStoreFormatChecker,
|
||||
"specific compression in ObTempBlockStore."\
|
||||
"AUTO: use dop to determine compression;"\
|
||||
"AUTO: use compression algorithm from table schema;"\
|
||||
"ZSTD: use ZSTD compression algorithm;"\
|
||||
"LZ4: use LZ4 compression algorithm;"\
|
||||
"NONE: do not use compression.",
|
||||
|
@ -2307,15 +2307,15 @@ int ObStaticEngineCG::generate_spec(ObLogSort &op, ObSortSpec &spec, const bool
|
||||
int ObStaticEngineCG::fill_compress_type(ObLogSort &op, ObCompressorType &compr_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
compr_type = NONE_COMPRESSOR;
|
||||
int64_t tenant_id = op.get_plan()->get_optimizer_context().get_session_info()->get_effective_tenant_id();
|
||||
// for normal sort we use default compress type. for online ddl, we use the compress type in source table
|
||||
ObLogicalOperator *child_op = op.get_child(0);
|
||||
ObCompressorType tmp_compr_type = NONE_COMPRESSOR;
|
||||
const share::schema::ObTableSchema *table_schema = nullptr;
|
||||
while(OB_SUCC(ret) && OB_NOT_NULL(child_op) && child_op->get_type() != log_op_def::LOG_TABLE_SCAN ) {
|
||||
child_op = child_op->get_child(0);
|
||||
if (OB_NOT_NULL(child_op) && child_op->get_type() == log_op_def::LOG_TABLE_SCAN ) {
|
||||
share::schema::ObSchemaGetterGuard *schema_guard = nullptr;
|
||||
const share::schema::ObTableSchema *table_schema = nullptr;
|
||||
uint64_t table_id = static_cast<ObLogTableScan*>(child_op)->get_ref_table_id();
|
||||
if (OB_ISNULL(schema_guard = opt_ctx_->get_schema_guard())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -2325,13 +2325,11 @@ int ObStaticEngineCG::fill_compress_type(ObLogSort &op, ObCompressorType &compr_
|
||||
} else if (OB_ISNULL(table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_WARN("can't find table schema", K(ret), K(table_id));
|
||||
} else {
|
||||
tmp_compr_type = table_schema->get_compressor_type();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(tmp_compr_type,
|
||||
if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_schema,
|
||||
op.get_parallel(),
|
||||
compr_type))) {
|
||||
LOG_WARN("fail to get compress type", K(ret));
|
||||
|
@ -2480,7 +2480,7 @@ int ObLoadDataDirectImpl::init_execute_param()
|
||||
// compressor_type_
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(
|
||||
table_schema->get_compressor_type(), execute_param_.parallel_, execute_param_.compressor_type_))) {
|
||||
table_schema, execute_param_.parallel_, execute_param_.compressor_type_))) {
|
||||
LOG_WARN("fail to get tmp store compressor type", KR(ret));
|
||||
}
|
||||
}
|
||||
|
@ -98,7 +98,7 @@ int ObTableDirectInsertCtx::init(
|
||||
LOG_WARN("failed to get direct load level", KR(ret), K(phy_plan), K(table_id));
|
||||
} else if (OB_FAIL(ObTableLoadSchema::get_table_schema(*schema_guard, tenant_id, table_id, table_schema))) {
|
||||
LOG_WARN("fail to get table schema", KR(ret));
|
||||
} else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_schema->get_compressor_type(),
|
||||
} else if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(table_schema,
|
||||
parallel,
|
||||
compressor_type))) {
|
||||
LOG_WARN("fail to get tmp store compressor type", KR(ret));
|
||||
|
Loading…
x
Reference in New Issue
Block a user