[fix](dbt) dbt incremental append (#20513)

This commit is contained in:
catpineapple
2023-06-09 01:41:33 +08:00
committed by GitHub
parent 234be0c517
commit e1184bf4dc
6 changed files with 76 additions and 43 deletions

View File

@ -22,4 +22,4 @@
# this 'version' must be set !!!
# otherwise the adapters will not be found after the 'dbt init xxx' command
version = "1.3.0"
version = "0.2.1"

View File

@ -19,7 +19,7 @@
# under the License.
name: dbt_doris
version: 1.3.0
version: 0.2.1
config-version: 2
macro-paths: ["macros"]

View File

@ -81,7 +81,7 @@
{%- endmacro %}
{% macro doris__properties() -%}
{% set properties = config.get('properties', validator=validation.any[dict]) or {"replication_num":"1"} %}
{% set properties = config.get('properties', validator=validation.any[dict]) %}
{% if properties is not none %}
PROPERTIES (
{% for key, value in properties.items() %}

View File

@ -56,7 +56,12 @@
show create table {{ target_relation }}
{%- endmacro %}
{% macro is_unique_model( table_create_obj ) %}
{% macro is_unique_model( target_relation ) %}
{% set build_show_create = show_create( target_relation, statement_name='table_model') %}
{% call statement('table_model' , fetch_result=True) %}
{{ build_show_create }}
{% endcall %}
{%- set table_create_obj = load_result('table_model') -%}
{% set create_table = table_create_obj['data'][0][1]%}
{{ return('\nUNIQUE KEY(' in create_table and '\nDUPLICATE KEY(' not in create_table and '\nAGGREGATE KEY(' not in create_table) }}
{%- endmacro %}

View File

@ -17,56 +17,71 @@
{% materialization incremental, adapter='doris' %}
{% set unique_key = config.get('unique_key', validator=validation.any[list]) %}
{%- set inserts_only = config.get('inserts_only') -%}
{% set strategy = dbt_doris_validate_get_incremental_strategy(config) %}
{% set full_refresh_mode = (should_full_refresh()) %}
{% set target_relation = this.incorporate(type='table') %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{% set to_drop = [] %}
{#-- append or no unique key --#}
{% if unique_key is none or inserts_only %}
{% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=none) %}
{% elif existing_relation is none %}
{% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or should_full_refresh() %}
{#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
{% do adapter.drop_relation(backup_relation) %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
{% do to_drop.append(backup_relation) %}
{% if unique_key is none or strategy == 'append' %}
{#-- create table first --#}
{% if existing_relation is none %}
{% set build_sql = doris__create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{#-- backup data before drop old table #}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
{% do adapter.drop_relation(backup_relation) %} {#-- likes 'drop table if exists ... ' --#}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% set build_sql = doris__create_table_as(False, target_relation, sql) %}
{% do to_drop.append(backup_relation) %}
{#-- append data --#}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=none) %}
{% endif %}
{#-- insert overwrite --#}
{% elif strategy == 'insert_overwrite' %}
{#-- create table first --#}
{% if existing_relation is none %}
{% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
{#-- insert data refresh --#}
{% elif existing_relation.is_view or full_refresh_mode %}
{#-- backup data before drop old table #}
{% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %}
{% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %}
{% do adapter.drop_relation(backup_relation) %} {#-- likes 'drop table if exists ... ' --#}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %}
{% do to_drop.append(backup_relation) %}
{#-- append data --#}
{% else %}
{#-- check doris unique table --#}
{% if not is_unique_model(target_relation) %}
{% do exceptions.raise_compiler_error("doris table:"~ target_relation ~ ", model must be 'UNIQUE'" ) %}
{% endif %}
{#-- create temp duplicate table for this incremental task --#}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do to_drop.append(tmp_relation) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=unique_key) %}
{% endif %}
{% else %}
{% set build_show_create = show_create( target_relation, statement_name="table_model") %}
{% call statement('table_model' , fetch_result=True) %}
{{ build_show_create }}
{% endcall %}
{%- set table_create_obj = load_result('table_model') -%}
{% if not is_unique_model(table_create_obj) %}
{% do exceptions.raise_compiler_error("doris table:"~ target_relation ~ ", model must be 'UNIQUE'" ) %}
{% endif %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do to_drop.append(tmp_relation) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=unique_key) %}
{#-- never --#}
{% endif %}
{% call statement("main") %}
{{ build_sql }}
{% endcall %}
{% do persist_docs(target_relation, model) %}
{#-- {% do persist_docs(target_relation, model) %} #}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{% do adapter.commit() %}
{% for rel in to_drop %}
@ -74,5 +89,17 @@
{% endfor %}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}
{% macro dbt_doris_validate_get_incremental_strategy(config) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get('incremental_strategy') or 'insert_overwrite' -%}
{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'append', 'insert_overwrite'
{%- endset %}
{% if strategy not in ['append', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}
{% do return (strategy) %}
{% endmacro %}

View File

@ -22,7 +22,7 @@ from setuptools import find_namespace_packages, setup
package_name = "dbt-doris"
# make sure this always matches dbt/adapters/{adapter}/__version__.py
package_version = "1.3.0"
package_version = "0.2.1"
dbt_core_version = "1.3.0"
description = """The doris adapter plugin for dbt """
@ -39,6 +39,7 @@ setup(
install_requires=[
"dbt-core~={}".format(dbt_core_version),
"mysql-connector-python>=8.0.0,<8.1",
"urllib3~=1.0",
],
python_requires=">=3.8,<=3.10",
python_requires=">=3.7.2",
)