[pipelineX](runtime filter) Support runtime filter (#24054)

This commit is contained in:
Gabriel
2023-09-08 10:17:22 +08:00
committed by GitHub
parent ac6028a731
commit cdb1b341c7
46 changed files with 161 additions and 85 deletions

View File

@ -113,6 +113,7 @@ bool ScanLocalState<Derived>::should_run_serial() const {
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
auto& p = _parent->cast<typename Derived::Parent>();
set_scan_ranges(info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@ -140,7 +141,14 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
_open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
_alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime");
return Status::OK();
}
template <typename Derived>
Status ScanLocalState<Derived>::open(RuntimeState* state) {
if (_opened) {
return Status::OK();
}
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_process_conjuncts());
@ -150,6 +158,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
}
_opened = true;
return status;
}
@ -1239,17 +1248,21 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
template <typename LocalStateType>
bool ScanOperatorX<LocalStateType>::can_read(RuntimeState* state) {
auto& local_state = state->get_local_state(id())->template cast<LocalStateType>();
if (local_state._eos || local_state._scanner_ctx->done()) {
// _eos: need eos
// _scanner_ctx->done(): need finish
// _scanner_ctx->no_schedule(): should schedule _scanner_ctx
if (!local_state._opened) {
return true;
} else {
if (local_state._scanner_ctx->get_num_running_scanners() == 0 &&
local_state._scanner_ctx->has_enough_space_in_blocks_queue()) {
local_state._scanner_ctx->reschedule_scanner_ctx();
if (local_state._eos || local_state._scanner_ctx->done()) {
// _eos: need eos
// _scanner_ctx->done(): need finish
// _scanner_ctx->no_schedule(): should schedule _scanner_ctx
return true;
} else {
if (local_state._scanner_ctx->get_num_running_scanners() == 0 &&
local_state._scanner_ctx->has_enough_space_in_blocks_queue()) {
local_state._scanner_ctx->reschedule_scanner_ctx();
}
return local_state.ready_to_read(); // there are some blocks to process
}
return local_state.ready_to_read(); // there are some blocks to process
}
}
@ -1321,6 +1334,14 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
return PipelineXLocalState<>::close(state);
}
template <typename LocalStateType>
bool ScanOperatorX<LocalStateType>::runtime_filters_are_ready_or_timeout(
RuntimeState* state) const {
return state->get_local_state(id())
->template cast<LocalStateType>()
.runtime_filters_are_ready_or_timeout();
}
template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {

View File

@ -60,7 +60,7 @@ public:
ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent),
vectorized::RuntimeFilterConsumer(parent->id(), parent->runtime_filter_descs(),
parent->row_descriptor(), parent->conjuncts()) {}
parent->row_descriptor(), _conjuncts) {}
virtual ~ScanLocalStateBase() = default;
virtual bool ready_to_read() = 0;
@ -128,6 +128,7 @@ class ScanLocalState : public ScanLocalStateBase {
virtual ~ScanLocalState() = default;
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
bool ready_to_read() override;
@ -337,12 +338,14 @@ protected:
RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
doris::Mutex _block_lock;
std::atomic<bool> _opened = false;
};
template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
// bool runtime_filters_are_ready_or_timeout() override;
bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const override;
Status try_close(RuntimeState* state) override;

View File

@ -211,7 +211,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state->set_tracer(std::move(tracer));
// TODO should be combine with plan_fragment_executor.prepare funciton
SCOPED_ATTACH_TASK(get_runtime_state());
SCOPED_ATTACH_TASK(_runtime_state.get());
_runtime_state->runtime_filter_mgr()->init();
_runtime_state->set_be_number(local_params.backend_num);

View File

@ -71,7 +71,9 @@ public:
TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
RuntimeState* get_runtime_state() { return _runtime_state.get(); }
virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
return _runtime_state.get();
}
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); }
@ -112,6 +114,9 @@ public:
_merge_controller_handler = handler;
}
virtual void add_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
void send_report(bool);
virtual void report_profile();

View File

@ -63,6 +63,7 @@ public:
}
virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
virtual Status open(RuntimeState* state) { return Status::OK(); }
virtual Status close(RuntimeState* state) = 0;
// If use projection, we should clear `_origin_block`.
@ -175,6 +176,13 @@ public:
return Status::OK();
}
bool runtime_filters_are_ready_or_timeout() override {
LOG(FATAL) << "should not reach here!";
return true;
}
virtual bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const { return true; }
virtual Status close(RuntimeState* state) override;
virtual bool can_read(RuntimeState* state) { return true; }
@ -302,6 +310,7 @@ public:
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}
virtual Status close(RuntimeState* state) override {
if (_closed) {
return Status::OK();

View File

@ -173,7 +173,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
SCOPED_ATTACH_TASK(get_runtime_state());
SCOPED_ATTACH_TASK(_runtime_state.get());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
}
@ -362,6 +362,13 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
}
}
{
std::lock_guard<std::mutex> l(_state_map_lock);
_instance_id_to_runtime_state.insert(
{UniqueId(_runtime_states[i]->fragment_instance_id()),
_runtime_states[i].get()});
}
}
_build_side_pipelines.clear();
_dag.clear();

View File

@ -75,6 +75,11 @@ public:
}
}
void add_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) override {
_merge_controller_handlers.emplace_back(handler);
}
// bool is_canceled() const { return _runtime_state->is_cancelled(); }
// Prepare global information including global states and the unique operator tree shared by all pipeline tasks.
@ -92,6 +97,15 @@ public:
void report_profile() override;
RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
std::lock_guard<std::mutex> l(_state_map_lock);
if (_instance_id_to_runtime_state.count(fragment_instance_id) > 0) {
return _instance_id_to_runtime_state[fragment_instance_id];
} else {
return _runtime_state.get();
}
}
private:
void _close_action() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
@ -121,6 +135,9 @@ private:
// Local runtime states for each pipeline task.
std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
// It is used to manage the lifecycle of RuntimeFilterMergeController
std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> _merge_controller_handlers;
// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
DataSinkOperatorXPtr _sink;
@ -135,6 +152,10 @@ private:
// ProbeSide first, and use `_pipelines_to_build` to store which pipeline the build operator
// is in, so we can build BuildSide once we complete probe side.
std::map<int, PipelinePtr> _build_side_pipelines;
std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
std::mutex _state_map_lock;
};
} // namespace pipeline
} // namespace doris

View File

@ -98,21 +98,27 @@ Status PipelineXTask::_open() {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_open_timer);
Status st = Status::OK();
for (auto& o : _operators) {
Dependency* dep = _upstream_dependency.find(o->id()) == _upstream_dependency.end()
? (Dependency*)nullptr
: _upstream_dependency.find(o->id())->second.get();
LocalStateInfo info {_scan_ranges, dep, _recvr};
Status cur_st = o->setup_local_state(_state, info);
if (!cur_st.ok()) {
st = cur_st;
if (!_init_local_state) {
Status st = Status::OK();
for (auto& o : _operators) {
Dependency* dep = _upstream_dependency.find(o->id()) == _upstream_dependency.end()
? (Dependency*)nullptr
: _upstream_dependency.find(o->id())->second.get();
LocalStateInfo info {_scan_ranges, dep, _recvr};
Status cur_st = o->setup_local_state(_state, info);
if (!cur_st.ok()) {
st = cur_st;
}
}
LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), _sender};
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
_dry_run = _sink->should_dry_run(_state);
RETURN_IF_ERROR(st);
_init_local_state = true;
}
for (auto& o : _operators) {
RETURN_IF_ERROR(_state->get_local_state(o->id())->open(_state));
}
LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), _sender};
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
_dry_run = _sink->should_dry_run(_state);
RETURN_IF_ERROR(st);
_opened = true;
return Status::OK();
}

View File

@ -79,7 +79,7 @@ public:
}
bool runtime_filters_are_ready_or_timeout() override {
return _source->runtime_filters_are_ready_or_timeout();
return _source->runtime_filters_are_ready_or_timeout(_state);
}
bool sink_can_write() override { return _sink->can_write(_state); }
@ -131,5 +131,7 @@ private:
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<vectorized::VDataStreamRecvr> _recvr;
bool _dry_run = false;
bool _init_local_state = false;
};
} // namespace doris::pipeline

View File

@ -711,12 +711,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
// std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
// _runtimefilter_controller.add_entity(params, local_params, &handler,
// context->get_runtime_state());
// context->set_merge_controller_handler(handler);
for (size_t i = 0; i < params.local_params.size(); i++) {
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
_runtimefilter_controller.add_entity(params, params.local_params[i], &handler,
context->get_runtime_state(UniqueId()));
context->set_merge_controller_handler(handler);
const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id;
{
std::lock_guard<std::mutex> lock(_lock);
@ -792,7 +791,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
_runtimefilter_controller.add_entity(params, local_params, &handler,
context->get_runtime_state());
context->get_runtime_state(UniqueId()));
context->set_merge_controller_handler(handler);
{
@ -1195,7 +1194,8 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
pip_context = iter->second;
DCHECK(pip_context != nullptr);
runtime_filter_mgr = pip_context->get_runtime_state()->runtime_filter_mgr();
runtime_filter_mgr =
pip_context->get_runtime_state(fragment_instance_id)->runtime_filter_mgr();
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_map.find(tfragment_instance_id);
@ -1237,8 +1237,9 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
pip_context = iter->second;
DCHECK(pip_context != nullptr);
runtime_filter_mgr =
pip_context->get_runtime_state()->get_query_ctx()->runtime_filter_mgr();
runtime_filter_mgr = pip_context->get_runtime_state(fragment_instance_id)
->get_query_ctx()
->runtime_filter_mgr();
pool = &pip_context->get_query_context()->obj_pool;
} else {
std::unique_lock<std::mutex> lock(_lock);

View File

@ -65,6 +65,7 @@ Status RuntimeFilterMgr::init() {
Status RuntimeFilterMgr::get_producer_filter(const int filter_id, IRuntimeFilter** target) {
int32_t key = filter_id;
std::lock_guard<std::mutex> l(_lock);
auto iter = _producer_map.find(key);
if (iter == _producer_map.end()) {
LOG(WARNING) << "unknown runtime filter: " << key << ", role: PRODUCER";
@ -77,6 +78,7 @@ Status RuntimeFilterMgr::get_producer_filter(const int filter_id, IRuntimeFilter
Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int node_id,
IRuntimeFilter** consumer_filter) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _consumer_map.find(filter_id);
if (iter == _consumer_map.cend()) {
LOG(WARNING) << "unknown runtime filter: " << filter_id << ", role: consumer";
@ -97,6 +99,7 @@ Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int node_
Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
std::vector<IRuntimeFilter*>& consumer_filters) {
int32_t key = filter_id;
std::lock_guard<std::mutex> l(_lock);
auto iter = _consumer_map.find(key);
if (iter == _consumer_map.end()) {
LOG(WARNING) << "unknown runtime filter: " << key << ", role: consumer";
@ -114,29 +117,26 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
std::lock_guard<std::mutex> l(_lock);
auto iter = _consumer_map.find(key);
if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && desc.has_remote_targets &&
desc.type == TRuntimeFilterType::BLOOM) {
// if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances
DCHECK(_query_ctx != nullptr);
{
std::lock_guard<std::mutex> l(_lock);
iter = _consumer_map.find(key);
if (iter != _consumer_map.end()) {
for (auto holder : iter->second) {
if (holder.node_id == node_id) {
return Status::OK();
}
iter = _consumer_map.find(key);
if (iter != _consumer_map.end()) {
for (auto holder : iter->second) {
if (holder.node_id == node_id) {
return Status::OK();
}
}
IRuntimeFilter* filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc,
&options, RuntimeFilterRole::CONSUMER, node_id,
&filter, build_bf_exactly));
_consumer_map[key].emplace_back(node_id, filter);
}
IRuntimeFilter* filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, &options,
RuntimeFilterRole::CONSUMER, node_id, &filter,
build_bf_exactly));
_consumer_map[key].emplace_back(node_id, filter);
} else {
DCHECK(_state != nullptr);
@ -162,6 +162,7 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc
bool build_bf_exactly) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
std::lock_guard<std::mutex> l(_lock);
auto iter = _producer_map.find(key);
DCHECK(_state != nullptr);

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ c_nation, s_nation, d_year,
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_nation, s_nation, d_year,
SUM(lo_revenue) AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ c_city, s_city, d_year, sum(lo_revenue)
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, sum(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ c_city, s_city, d_year, SUM(lo_revenue)
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, SUM(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ c_city, s_city, d_year, SUM(lo_revenue)
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, SUM(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ d_year, c_nation,
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, c_nation,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ d_year, s_nation, p_category,
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, s_nation, p_category,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey

View File

@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ d_year, s_city, p_brand,
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, s_city, p_brand,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey

View File

@ -1,5 +1,5 @@
-- tables: lineitem
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,

View File

@ -1,5 +1,5 @@
-- tables: part,supplier,partsupp,nation,region
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_acctbal,
s_name,
n_name,

View File

@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) AS revenue,
o_orderdate,

View File

@ -1,5 +1,5 @@
-- tables: orders,lineitem
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
o_orderpriority,
count(*) AS order_count
FROM orders

View File

@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem,supplier,nation,region
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
n_name,
sum(l_extendedprice * (1 - l_discount)) AS revenue
FROM

View File

@ -1,6 +1,6 @@
-- tables: lineitem
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ sum(l_extendedprice * l_discount) AS revenue
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice * l_discount) AS revenue
FROM
lineitem
WHERE

View File

@ -1,5 +1,5 @@
-- tables: supplier,lineitem,orders,customer,nation
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
supp_nation,
cust_nation,
l_year,

View File

@ -1,5 +1,5 @@
-- tables: part,supplier,lineitem,orders,customer,nation,region
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
o_year,
sum(CASE
WHEN nation = 'BRAZIL'

View File

@ -1,5 +1,5 @@
-- tables: part,supplier,lineitem,partsupp,orders,nation
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
nation,
o_year,
sum(amount) AS sum_profit

View File

@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem,nation
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_custkey,
c_name,
sum(l_extendedprice * (1 - l_discount)) AS revenue,

View File

@ -1,5 +1,5 @@
-- tables: partsupp,supplier,nation
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
ps_partkey,
sum(ps_supplycost * ps_availqty) AS value
FROM

View File

@ -1,5 +1,5 @@
-- tables: orders,lineitem
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_shipmode,
sum(CASE
WHEN o_orderpriority = '1-URGENT'

View File

@ -1,5 +1,5 @@
-- tables: customer
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_count,
count(*) AS custdist
FROM (

View File

@ -1,5 +1,5 @@
-- tables: lineitem,part
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ 100.00 * sum(CASE
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 100.00 * sum(CASE
WHEN p_type LIKE 'PROMO%'
THEN l_extendedprice * (1 - l_discount)
ELSE 0

View File

@ -1,4 +1,4 @@
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_suppkey,
s_name,
s_address,

View File

@ -1,5 +1,5 @@
-- tables: partsupp,part,supplier
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
p_brand,
p_type,
p_size,

View File

@ -1,5 +1,5 @@
-- tables: lineitem,part
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ sum(l_extendedprice) / 7.0 AS avg_yearly
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice) / 7.0 AS avg_yearly
FROM
lineitem,
part

View File

@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_name,
c_custkey,
o_orderkey,

View File

@ -1,5 +1,5 @@
-- tables: lineitem,part
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ sum(l_extendedprice * (1 - l_discount)) AS revenue
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice * (1 - l_discount)) AS revenue
FROM
lineitem,
part

View File

@ -1,5 +1,5 @@
-- tables: supplier,nation,partsupp,lineitem,part
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_name,
s_address
FROM

View File

@ -1,5 +1,5 @@
-- tables: supplier,lineitem,orders,nation
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_name,
count(*) AS numwait
FROM

View File

@ -1,5 +1,5 @@
-- tables: orders,customer
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
cntrycode,
count(*) AS numcust,
sum(c_acctbal) AS totacctbal