diff --git a/be/src/exprs/math_functions.cpp b/be/src/exprs/math_functions.cpp index 4bbce4dc56..6648a3727a 100644 --- a/be/src/exprs/math_functions.cpp +++ b/be/src/exprs/math_functions.cpp @@ -127,7 +127,7 @@ StringVal MathFunctions::decimal_to_base(FunctionContext* ctx, int64_t src_num, buf[buf_index] = '-'; ++result_len; } - StringVal result = StringVal::create_temp_string_val(ctx, result_len); + StringVal result = ctx->create_temp_string_val(result_len); memcpy(result.ptr, buf + max_digits - result_len, result_len); return result; } diff --git a/be/src/runtime/free_pool.hpp b/be/src/runtime/free_pool.hpp deleted file mode 100644 index 8cc5b0677d..0000000000 --- a/be/src/runtime/free_pool.hpp +++ /dev/null @@ -1,206 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/free-pool.hpp -// and modified by Doris - -#pragma once - -#include -#include - -#include - -#include "common/compiler_util.h" -#include "common/logging.h" -#include "runtime/mem_pool.h" -#include "util/bit_util.h" - -namespace doris { - -// Implementation of a free pool to recycle allocations. The pool is broken -// up into 64 lists, one for each power of 2. Each allocation is rounded up -// to the next power of 2. When the allocation is freed, it is added to the -// corresponding free list. -// Each allocation has an 8 byte header that immediately precedes the actual -// allocation. If the allocation is owned by the user, the header contains -// the ptr to the list that it should be added to on Free(). -// When the allocation is in the pool (i.e. available to be handed out), it -// contains the link to the next allocation. -// This has O(1) Allocate() and Free(). -// This is not thread safe. -// TODO(zxy): consider integrating this with MemPool. -// TODO: consider changing to something more granular than doubling. -class FreePool { -public: - // C'tor, initializes the FreePool to be empty. All allocations come from the - // 'mem_pool'. - FreePool(MemPool* mem_pool) : _mem_pool(mem_pool) { memset(&_lists, 0, sizeof(_lists)); } - - virtual ~FreePool() {} - - // Allocates a buffer of size. - uint8_t* allocate(int64_t size) { - // This is the typical malloc behavior. NULL is reserved for failures. - if (size == 0) { - return reinterpret_cast(0x1); - } - - // Do ceil(log_2(size)) - int free_list_idx = BitUtil::log2(size); - DCHECK_LT(free_list_idx, NUM_LISTS); - - FreeListNode* allocation = _lists[free_list_idx].next; - - if (allocation == NULL) { - // There wasn't an existing allocation of the right size, allocate a new one. - size = 1L << free_list_idx; - allocation = reinterpret_cast( - _mem_pool->allocate(size + sizeof(FreeListNode))); - } else { - // Remove this allocation from the list. - _lists[free_list_idx].next = allocation->next; - } - - DCHECK(allocation != NULL); - // Set the back node to point back to the list it came from so know where - // to add it on free(). - allocation->list = &_lists[free_list_idx]; - return reinterpret_cast(allocation) + sizeof(FreeListNode); - } - - // Allocates a buffer of size. - uint8_t* aligned_allocate(int alignment, int64_t size) { - // The alignment should be a power of 2. - DCHECK(alignment > 0 && ((alignment - 1) & alignment) == 0); - - // This is the typical malloc behavior. NULL is reserved for failures. - if (size == 0) { - return reinterpret_cast(alignment); - } - - int padding = sizeof(FreeListNode) >= alignment ? 0 : (alignment - sizeof(FreeListNode)); - size += padding; - - // Do ceil(log_2(size)) - int free_list_idx = BitUtil::log2(size); - DCHECK_LT(free_list_idx, NUM_LISTS); - - FreeListNode* allocation = _lists[free_list_idx].next; - - if (allocation == nullptr) { - // There wasn't an existing allocation of the right size, allocate a new one. - size = 1L << free_list_idx; - allocation = reinterpret_cast( - _mem_pool->allocate_aligned(size + sizeof(FreeListNode), alignment)); - } else { - // Remove this allocation from the list. - _lists[free_list_idx].next = allocation->next; - } - - DCHECK(allocation != nullptr); - // Set the back node to point back to the list it came from so know where - // to add it on free(). - allocation->list = &_lists[free_list_idx]; - return reinterpret_cast(allocation) + sizeof(FreeListNode) + padding; - } - - void free(uint8_t* ptr) { - if (ptr == NULL || reinterpret_cast(ptr) == 0x1) { - return; - } - - FreeListNode* node = reinterpret_cast(ptr - sizeof(FreeListNode)); - FreeListNode* list = node->list; -#ifndef NDEBUG - check_valid_allocation(list); -#endif - if (UNLIKELY(nullptr == list)) { - // free memory directly if the pointer to free list is null - LOG(ERROR) << "The free list was released, and this may cause memory leak."; - free(ptr); - } else { - // Add node to front of list. - node->next = list->next; - list->next = node; - } - } - - // Returns an allocation that is at least 'size'. If the current allocation - // backing 'ptr' is big enough, 'ptr' is returned. Otherwise a new one is - // made and the contents of ptr are copied into it. - uint8_t* reallocate(uint8_t* ptr, int64_t size) { - if (ptr == NULL || reinterpret_cast(ptr) == 0x1) { - return allocate(size); - } - - FreeListNode* node = reinterpret_cast(ptr - sizeof(FreeListNode)); - FreeListNode* list = node->list; -#ifndef NDEBUG - check_valid_allocation(list); -#endif - int bucket_idx = (list - &_lists[0]); - // This is the actual size of ptr. - int allocation_size = 1 << bucket_idx; - - // If it's already big enough, just return the ptr. - if (allocation_size >= size) { - return ptr; - } - - // Make a new one. Since allocate() already rounds up to powers of 2, this - // effectively doubles for the caller. - uint8_t* new_ptr = allocate(size); - memcpy(new_ptr, ptr, allocation_size); - free(ptr); - return new_ptr; - } - -private: - static const int NUM_LISTS = 64; - - struct FreeListNode { - // Union for clarity when manipulating the node. - union { - FreeListNode* next; // Used when it is in the free list - FreeListNode* list; // Used when it is being used by the caller. - }; - }; - - void check_valid_allocation(FreeListNode* computed_list_ptr) { - // On debug, check that list is valid. - bool found = false; - - for (int i = 0; i < NUM_LISTS && !found; ++i) { - if (computed_list_ptr == &_lists[i]) { - found = true; - } - } - - DCHECK(found); - } - - // MemPool to allocate from. Unowned. - MemPool* _mem_pool; - - // One list head for each allocation size indexed by the LOG_2 of the allocation size. - // While it doesn't make too much sense to use this for very small (e.g. 8 byte) - // allocations, it makes the indexing easy. - FreeListNode _lists[NUM_LISTS]; -}; - -} // namespace doris diff --git a/be/src/udf/udf.cpp b/be/src/udf/udf.cpp index 0d31c42ceb..f7b3c6f552 100644 --- a/be/src/udf/udf.cpp +++ b/be/src/udf/udf.cpp @@ -33,167 +33,36 @@ // Be careful what this includes since this needs to be linked into the UDF's // binary. For example, it would be unfortunate if they had a random dependency // on libhdfs. +#include "runtime/runtime_state.h" #include "udf/udf_internal.h" #include "util/debug_util.h" -#if DORIS_UDF_SDK_BUILD -// For the SDK build, we are building the .lib that the developers would use to -// write UDFs. They want to link against this to run their UDFs in a test environment. -// Pulling in free-pool is very undesirable since it pulls in many other libraries. -// Instead, we'll implement a dummy version that is not used. -// When they build their library to a .so, they'd use the version of FunctionContext -// in the main binary, which does include FreePool. -namespace doris { -class FreePool { -public: - FreePool(MemPool*) {} - - uint8_t* allocate(int byte_size) { return reinterpret_cast(malloc(byte_size)); } - uint8_t* aligned_allocate(int alignment, int byte_size) { - return reinterpret_cast(aligned_alloc(alignment, byte_size)); - } - - uint8_t* reallocate(uint8_t* ptr, int byte_size) { - return reinterpret_cast(realloc(ptr, byte_size)); - } - - void free(uint8_t* ptr) { ::free(ptr); } -}; - -class RuntimeState { -public: - void set_process_status(const std::string& error_msg) { assert(false); } - - bool log_error(const std::string& error) { - assert(false); - return false; - } - - const std::string& user() const { return _user; } - -private: - std::string _user = ""; -}; -} // namespace doris -#else -#include "runtime/free_pool.hpp" -#include "runtime/runtime_state.h" -#endif - namespace doris { -FunctionContextImpl::FunctionContextImpl(doris_udf::FunctionContext* parent) - : _varargs_buffer(nullptr), - _varargs_buffer_size(0), - _num_updates(0), - _num_removes(0), - _context(parent), - _pool(nullptr), - _state(nullptr), - _debug(false), +FunctionContextImpl::FunctionContextImpl() + : _state(nullptr), _version(doris_udf::FunctionContext::V2_0), _num_warnings(0), _thread_local_fn_state(nullptr), - _fragment_local_fn_state(nullptr), - _external_bytes_tracked(0), - _closed(false) {} - -void FunctionContextImpl::close() { - if (_closed) { - return; - } - - // Free local allocations first so we can detect leaks through any remaining allocations - // (local allocations cannot be leaked, at least not by the UDF) - free_local_allocations(); - - if (_external_bytes_tracked > 0) { - // This isn't ideal because the memory is still leaked, but don't track it so our - // accounting stays sane. - // TODO: we need to modify the memtrackers to allow leaked user-allocated memory. - _context->free(_external_bytes_tracked); - } - - free(_varargs_buffer); - _varargs_buffer = nullptr; - - _closed = true; -} - -uint8_t* FunctionContextImpl::allocate_local(int64_t byte_size) { - uint8_t* buffer = _pool->allocate(byte_size); - _local_allocations.push_back(buffer); - return buffer; -} - -void FunctionContextImpl::free_local_allocations() { - for (int i = 0; i < _local_allocations.size(); ++i) { - _pool->free(_local_allocations[i]); - } - - _local_allocations.clear(); -} + _fragment_local_fn_state(nullptr) {} void FunctionContextImpl::set_constant_cols( - const std::vector& constant_cols) { + const std::vector>& constant_cols) { _constant_cols = constant_cols; } -bool FunctionContextImpl::check_allocations_empty() { - if (_allocations.empty() && _external_bytes_tracked == 0) { - return true; - } - - // TODO: fix this - //if (_debug) _context->set_error("Leaked allocations."); - return false; -} - -bool FunctionContextImpl::check_local_allocations_empty() { - if (_local_allocations.empty()) { - return true; - } - - // TODO: fix this - //if (_debug) _context->set_error("Leaked local allocations."); - return false; -} - doris_udf::FunctionContext* FunctionContextImpl::create_context( - RuntimeState* state, MemPool* pool, const doris_udf::FunctionContext::TypeDesc& return_type, - const std::vector& arg_types, int varargs_buffer_size, - bool debug) { - doris_udf::FunctionContext::TypeDesc invalid_type; - invalid_type.type = doris_udf::FunctionContext::INVALID_TYPE; - invalid_type.precision = 0; - invalid_type.scale = 0; - return FunctionContextImpl::create_context(state, pool, invalid_type, return_type, arg_types, - varargs_buffer_size, debug); -} - -doris_udf::FunctionContext* FunctionContextImpl::create_context( - RuntimeState* state, MemPool* pool, - const doris_udf::FunctionContext::TypeDesc& intermediate_type, - const doris_udf::FunctionContext::TypeDesc& return_type, - const std::vector& arg_types, int varargs_buffer_size, - bool debug) { + RuntimeState* state, const doris_udf::FunctionContext::TypeDesc& return_type, + const std::vector& arg_types) { auto* ctx = new doris_udf::FunctionContext(); ctx->_impl->_state = state; - ctx->_impl->_pool = new FreePool(pool); - ctx->_impl->_intermediate_type = intermediate_type; ctx->_impl->_return_type = return_type; ctx->_impl->_arg_types = arg_types; - ctx->_impl->_varargs_buffer = reinterpret_cast(malloc(varargs_buffer_size)); - ctx->_impl->_varargs_buffer_size = varargs_buffer_size; - ctx->_impl->_debug = debug; - VLOG_ROW << "Created FunctionContext: " << ctx << " with pool " << ctx->_impl->_pool; return ctx; } -FunctionContext* FunctionContextImpl::clone(MemPool* pool) { - doris_udf::FunctionContext* new_context = - create_context(_state, pool, _intermediate_type, _return_type, _arg_types, - _varargs_buffer_size, _debug); +FunctionContext* FunctionContextImpl::clone() { + doris_udf::FunctionContext* new_context = create_context(_state, _return_type, _arg_types); new_context->_impl->_constant_cols = _constant_cols; new_context->_impl->_fragment_local_fn_state = _fragment_local_fn_state; return new_context; @@ -204,113 +73,22 @@ FunctionContext* FunctionContextImpl::clone(MemPool* pool) { namespace doris_udf { static const int MAX_WARNINGS = 1000; -FunctionContext::FunctionContext() : _impl(new doris::FunctionContextImpl(this)) {} - -FunctionContext::~FunctionContext() { - // TODO: this needs to free local allocations but there's a mem issue - // in the uda harness now. - _impl->check_local_allocations_empty(); - _impl->check_allocations_empty(); - delete _impl->_pool; - delete _impl; +FunctionContext::FunctionContext() { + _impl = std::make_unique(); } FunctionContext::DorisVersion FunctionContext::version() const { return _impl->_version; } -const char* FunctionContext::user() const { - if (_impl->_state == nullptr) { - return nullptr; - } - - return _impl->_state->user().c_str(); -} - FunctionContext::UniqueId FunctionContext::query_id() const { UniqueId id; -#if DORIS_UDF_SDK_BUILD - id.hi = id.lo = 0; -#else id.hi = _impl->_state->query_id().hi; id.lo = _impl->_state->query_id().lo; -#endif return id; } -bool FunctionContext::has_error() const { - return !_impl->_error_msg.empty(); -} - -const char* FunctionContext::error_msg() const { - if (has_error()) { - return _impl->_error_msg.c_str(); - } - - return nullptr; -} - -uint8_t* FunctionContext::allocate(int byte_size) { - uint8_t* buffer = _impl->_pool->allocate(byte_size); - _impl->_allocations[buffer] = byte_size; - - if (_impl->_debug) { - memset(buffer, 0xff, byte_size); - } - - return buffer; -} - -uint8_t* FunctionContext::aligned_allocate(int alignment, int byte_size) { - uint8_t* buffer = _impl->_pool->aligned_allocate(alignment, byte_size); - _impl->_allocations[buffer] = byte_size; - - if (_impl->_debug) { - memset(buffer, 0xff, byte_size); - } - - return buffer; -} - -uint8_t* FunctionContext::reallocate(uint8_t* ptr, int byte_size) { - _impl->_allocations.erase(ptr); - ptr = _impl->_pool->reallocate(ptr, byte_size); - _impl->_allocations[ptr] = byte_size; - return ptr; -} - -void FunctionContext::free(uint8_t* buffer) { - if (buffer == nullptr) { - return; - } - - if (_impl->_debug) { - std::map::iterator it = _impl->_allocations.find(buffer); - - if (it != _impl->_allocations.end()) { - // fill in garbage value into the buffer to increase the chance of detecting misuse - memset(buffer, 0xff, it->second); - _impl->_allocations.erase(it); - _impl->_pool->free(buffer); - } else { - set_error("FunctionContext::free() on buffer that is not freed or was not allocated."); - } - } else { - _impl->_allocations.erase(buffer); - _impl->_pool->free(buffer); - } -} - -void FunctionContext::track_allocation(int64_t bytes) { - _impl->_external_bytes_tracked += bytes; -} - -void FunctionContext::free(int64_t bytes) { - _impl->_external_bytes_tracked -= bytes; -} - void FunctionContext::set_function_state(FunctionStateScope scope, std::shared_ptr ptr) { - assert(!_impl->_closed); switch (scope) { case THREAD_LOCAL: _impl->_thread_local_fn_state = std::move(ptr); @@ -337,10 +115,6 @@ void FunctionContext::set_error(const char* error_msg) { } } -void FunctionContext::clear_error_msg() { - _impl->_error_msg.clear(); -} - bool FunctionContext::add_warning(const char* warning_msg) { if (_impl->_num_warnings++ >= MAX_WARNINGS) { return false; @@ -357,76 +131,6 @@ bool FunctionContext::add_warning(const char* warning_msg) { } } -StringVal::StringVal(FunctionContext* context, int64_t len) - : len(len), ptr(context->impl()->allocate_local(len)) {} - -bool StringVal::resize(FunctionContext* ctx, int64_t new_len) { - if (new_len <= len) { - len = new_len; - return true; - } - if (UNLIKELY(new_len > StringVal::MAX_LENGTH)) { - len = 0; - is_null = true; - return false; - } - auto* new_ptr = ctx->impl()->allocate_local(new_len); - if (new_ptr != nullptr) { - memcpy(new_ptr, ptr, len); - ptr = new_ptr; - len = new_len; - return true; - } - return false; -} - -StringVal StringVal::copy_from(FunctionContext* ctx, const uint8_t* buf, int64_t len) { - StringVal result(ctx, len); - if (!result.is_null) { - memcpy(result.ptr, buf, len); - } - return result; -} - -StringVal StringVal::create_temp_string_val(FunctionContext* ctx, int64_t len) { - ctx->impl()->string_result().resize(len); - return StringVal((uint8_t*)ctx->impl()->string_result().c_str(), len); -} - -void StringVal::append(FunctionContext* ctx, const uint8_t* buf, int64_t buf_len) { - if (UNLIKELY(len + buf_len > StringVal::MAX_LENGTH)) { - ctx->set_error( - "Concatenated string length larger than allowed limit of " - "1 GB character data."); - ctx->free(ptr); - ptr = nullptr; - len = 0; - is_null = true; - } else { - ptr = ctx->reallocate(ptr, len + buf_len); - memcpy(ptr + len, buf, buf_len); - len += buf_len; - } -} - -void StringVal::append(FunctionContext* ctx, const uint8_t* buf, int64_t buf_len, - const uint8_t* buf2, int64_t buf2_len) { - if (UNLIKELY(len + buf_len + buf2_len > StringVal::MAX_LENGTH)) { - ctx->set_error( - "Concatenated string length larger than allowed limit of " - "1 GB character data."); - ctx->free(ptr); - ptr = nullptr; - len = 0; - is_null = true; - } else { - ptr = ctx->reallocate(ptr, len + buf_len + buf2_len); - memcpy(ptr + len, buf, buf_len); - memcpy(ptr + len + buf_len, buf2, buf2_len); - len += buf_len + buf2_len; - } -} - const FunctionContext::TypeDesc* FunctionContext::get_arg_type(int arg_idx) const { if (arg_idx < 0 || arg_idx >= _impl->_arg_types.size()) { return nullptr; @@ -445,7 +149,7 @@ doris::ColumnPtrWrapper* FunctionContext::get_constant_col(int i) const { if (i < 0 || i >= _impl->_constant_cols.size()) { return nullptr; } - return _impl->_constant_cols[i]; + return _impl->_constant_cols[i].get(); } int FunctionContext::get_num_args() const { @@ -457,7 +161,6 @@ const FunctionContext::TypeDesc& FunctionContext::get_return_type() const { } void* FunctionContext::get_function_state(FunctionStateScope scope) const { - // assert(!_impl->_closed); switch (scope) { case THREAD_LOCAL: return _impl->_thread_local_fn_state.get(); @@ -468,6 +171,12 @@ void* FunctionContext::get_function_state(FunctionStateScope scope) const { return nullptr; } } + +StringVal FunctionContext::create_temp_string_val(int64_t len) { + this->impl()->string_result().resize(len); + return StringVal((uint8_t*)this->impl()->string_result().c_str(), len); +} + std::ostream& operator<<(std::ostream& os, const StringVal& string_val) { return os << string_val.to_string(); } diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h index 4b3c2115ac..9dbcd99a55 100644 --- a/be/src/udf/udf.h +++ b/be/src/udf/udf.h @@ -139,10 +139,6 @@ public: // Returns the version of Doris that's currently running. DorisVersion version() const; - // Returns the user that is running the query. Returns nullptr if it is not - // available. - const char* user() const; - // Returns the query_id for the current query. UniqueId query_id() const; @@ -152,53 +148,12 @@ public: // ensure the function return value is null. void set_error(const char* error_msg); - // when you reused this FunctionContext, you maybe need clear the error status and message. - void clear_error_msg(); - // Adds a warning that is returned to the user. This can include things like // overflow or other recoverable error conditions. // Warnings are capped at a maximum number. Returns true if the warning was // added and false if it was ignored due to the cap. bool add_warning(const char* warning_msg); - // Returns true if there's been an error set. - bool has_error() const; - - // Returns the current error message. Returns nullptr if there is no error. - const char* error_msg() const; - - // Allocates memory for UDAs. All UDA allocations should use this if possible instead of - // malloc/new. The UDA is responsible for calling Free() on all buffers returned - // by Allocate(). - // If this Allocate causes the memory limit to be exceeded, the error will be set - // in this object causing the query to fail. - uint8_t* allocate(int byte_size); - - // Allocate and align memory for UDAs. All UDA allocations should use this if possible instead of - // malloc/new. The UDA is responsible for calling Free() on all buffers returned - // by Allocate(). - // If this Allocate causes the memory limit to be exceeded, the error will be set - // in this object causing the query to fail. - uint8_t* aligned_allocate(int alignment, int byte_size); - - // Reallocates 'ptr' to the new byte_size. If the currently underlying allocation - // is big enough, the original ptr will be returned. If the allocation needs to - // grow, a new allocation is made that is at least 'byte_size' and the contents - // of 'ptr' will be copied into it. - // This should be used for buffers that constantly get appended to. - uint8_t* reallocate(uint8_t* ptr, int byte_size); - - // Frees a buffer returned from Allocate() or Reallocate() - void free(uint8_t* buffer); - - // For allocations that cannot use the Allocate() API provided by this - // object, TrackAllocation()/Free() can be used to just keep count of the - // byte sizes. For each call to TrackAllocation(), the UDF/UDA must call - // the corresponding Free(). - void track_allocation(int64_t byte_size); - - void free(int64_t byte_size); - // TODO: Do we need to add arbitrary key/value metadata. This would be plumbed // through the query. E.g. "select UDA(col, 'sample=true') from tbl". // const char* GetMetadata(const char*) const; @@ -209,7 +164,7 @@ public: // Returns the underlying opaque implementation object. The UDF/UDA should not // use this. This is used internally. - doris::FunctionContextImpl* impl() { return _impl; } + doris::FunctionContextImpl* impl() { return _impl.get(); } /// Methods for maintaining state across UDF/UDA function calls. SetFunctionState() can /// be used to store a pointer that can then be retrieved via GetFunctionState(). If @@ -224,10 +179,6 @@ public: // return type of the UDA (e.g., the type returned by the finalize function). const TypeDesc& get_return_type() const; - // Returns the intermediate type for UDAs, i.e., the one returned by - // update and merge functions. Returns INVALID_TYPE for UDFs. - const TypeDesc& get_intermediate_type() const; - // Returns the number of arguments to this function (not including the FunctionContext* // argument). int get_num_args() const; @@ -246,7 +197,10 @@ public: // Init() or Close() functions. doris::ColumnPtrWrapper* get_constant_col(int arg_idx) const; - ~FunctionContext(); + // Creates a StringVal, which memory is available when this function context is used next time + StringVal create_temp_string_val(int64_t len); + + ~FunctionContext() = default; private: friend class doris::FunctionContextImpl; @@ -258,7 +212,7 @@ private: FunctionContext& operator=(const FunctionContext& other); - doris::FunctionContextImpl* _impl; // Owned by this object. + std::unique_ptr _impl; // Owned by this object. }; //---------------------------------------------------------------------------- @@ -416,16 +370,6 @@ struct StringVal : public AnyVal { return sv; } - // Creates a StringVal, allocating a new buffer with 'len'. This should - // be used to return StringVal objects in UDF/UDAs that need to allocate new - // string memory. - StringVal(FunctionContext* context, int64_t len); - - // Creates a StringVal, which memory is available when this function context is used next time - static StringVal create_temp_string_val(FunctionContext* ctx, int64_t len); - - bool resize(FunctionContext* context, int64_t len); - bool operator==(const StringVal& other) const { if (is_null != other.is_null) { return false; @@ -444,19 +388,6 @@ struct StringVal : public AnyVal { bool operator!=(const StringVal& other) const { return !(*this == other); } - /// Will create a new StringVal with the given dimension and copy the data from the - /// parameters. In case of an error will return a nullptr string and set an error on the - /// function context. - static StringVal copy_from(FunctionContext* ctx, const uint8_t* buf, int64_t len); - - /// Append the passed buffer to this StringVal. Reallocate memory to fit the buffer. If - /// the memory allocation becomes too large, will set an error on FunctionContext and - /// return a nullptr string. - void append(FunctionContext* ctx, const uint8_t* buf, int64_t len); - - void append(FunctionContext* ctx, const uint8_t* buf, int64_t len, const uint8_t* buf2, - int64_t buf2_len); - std::string to_string() const { return std::string((char*)ptr, len); } }; diff --git a/be/src/udf/udf_internal.h b/be/src/udf/udf_internal.h index 91676cc0ea..afc2e3ca03 100644 --- a/be/src/udf/udf_internal.h +++ b/be/src/udf/udf_internal.h @@ -31,8 +31,6 @@ namespace doris { -class FreePool; -class MemPool; class RuntimeState; struct ColumnPtrWrapper; @@ -41,63 +39,22 @@ struct ColumnPtrWrapper; // Note: The actual user code does not include this file. class FunctionContextImpl { public: - /// Create a FunctionContext for a UDF. Caller is responsible for deleting it. - static doris_udf::FunctionContext* create_context( - RuntimeState* state, MemPool* pool, - const doris_udf::FunctionContext::TypeDesc& return_type, - const std::vector& arg_types, - int varargs_buffer_size, bool debug); - /// Create a FunctionContext for a UDA. Identical to the UDF version except for the /// intermediate type. Caller is responsible for deleting it. static doris_udf::FunctionContext* create_context( - RuntimeState* state, MemPool* pool, - const doris_udf::FunctionContext::TypeDesc& intermediate_type, - const doris_udf::FunctionContext::TypeDesc& return_type, - const std::vector& arg_types, - int varargs_buffer_size, bool debug); + RuntimeState* state, const doris_udf::FunctionContext::TypeDesc& return_type, + const std::vector& arg_types); ~FunctionContextImpl() {} - FunctionContextImpl(doris_udf::FunctionContext* parent); - - void close(); + FunctionContextImpl(); /// Returns a new FunctionContext with the same constant args, fragment-local state, and /// debug flag as this FunctionContext. The caller is responsible for calling delete on /// it. - doris_udf::FunctionContext* clone(MemPool* pool); + doris_udf::FunctionContext* clone(); - void set_constant_cols(const std::vector& cols); - - uint8_t* varargs_buffer() { return _varargs_buffer; } - - bool closed() const { return _closed; } - - int64_t num_updates() const { return _num_updates; } - int64_t num_removes() const { return _num_removes; } - void set_num_updates(int64_t n) { _num_updates = n; } - void set_num_removes(int64_t n) { _num_removes = n; } - void increment_num_updates(int64_t n) { _num_updates += n; } - void increment_num_updates() { _num_updates += 1; } - void increment_num_removes(int64_t n) { _num_removes += n; } - void increment_num_removes() { _num_removes += 1; } - - // Allocates a buffer of 'byte_size' with "local" memory management. These - // allocations are not freed one by one but freed as a pool by FreeLocalAllocations() - // This is used where the lifetime of the allocation is clear. - // For UDFs, the allocations can be freed at the row level. - // TODO: free them at the batch level and save some copies? - uint8_t* allocate_local(int64_t byte_size); - - // Frees all allocations returned by AllocateLocal(). - void free_local_allocations(); - - // Returns true if there are no outstanding allocations. - bool check_allocations_empty(); - - // Returns true if there are no outstanding local allocations. - bool check_local_allocations_empty(); + void set_constant_cols(const std::vector>& cols); RuntimeState* state() { return _state; } @@ -114,31 +71,10 @@ public: private: friend class doris_udf::FunctionContext; - /// Preallocated buffer for storing varargs (if the function has any). Allocated and - /// owned by this object, but populated by an Expr function. - // - /// This is the first field in the class so it's easy to access in codegen'd functions. - /// Don't move it or add fields above unless you know what you're doing. - uint8_t* _varargs_buffer; - int _varargs_buffer_size; - - // The number of calls to Update()/Remove(). - int64_t _num_updates; - int64_t _num_removes; - - // Parent context object. Not owned - doris_udf::FunctionContext* _context; - - // Pool to service allocations from. - FreePool* _pool; - // We use the query's runtime state to report errors and warnings. nullptr for test // contexts. RuntimeState* _state; - // If true, indicates this is a debug context which will do additional validation. - bool _debug; - doris_udf::FunctionContext::DorisVersion _version; // Empty if there's no error @@ -147,39 +83,20 @@ private: // The number of warnings reported. int64_t _num_warnings; - // Allocations made and still owned by the user function. - std::map _allocations; - std::vector _local_allocations; - /// The function state accessed via FunctionContext::Get/SetFunctionState() std::shared_ptr _thread_local_fn_state; std::shared_ptr _fragment_local_fn_state; - // The number of bytes allocated externally by the user function. In some cases, - // it is too inconvenient to use the Allocate()/Free() APIs in the FunctionContext, - // particularly for existing codebases (e.g. they use std::vector). Instead, they'll - // have to track those allocations manually. - int64_t _external_bytes_tracked; - - // Type descriptor for the intermediate type of a UDA. Set to INVALID_TYPE for UDFs. - doris_udf::FunctionContext::TypeDesc _intermediate_type; - // Type descriptor for the return type of the function. doris_udf::FunctionContext::TypeDesc _return_type; // Type descriptors for each argument of the function. std::vector _arg_types; - // Contains an AnyVal* for each argument of the function. If the AnyVal* is nullptr, - // indicates that the corresponding argument is non-constant. Otherwise contains the - // value of the argument. - std::vector _constant_cols; + std::vector> _constant_cols; bool _check_overflow_for_decimal = false; - // Indicates whether this context has been closed. Used for verification/debugging. - bool _closed; - std::string _string_result; }; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 2978d806db..ae91bca981 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -317,7 +317,7 @@ Status NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_c return Status::OK(); } else { DCHECK(children[1 - i]->type().is_string_type()); - ColumnPtrWrapper* const_col_wrapper = nullptr; + std::shared_ptr const_col_wrapper; RETURN_IF_ERROR(children[1 - i]->get_const_col(expr_ctx, &const_col_wrapper)); if (const ColumnConst* const_column = check_and_get_column(const_col_wrapper->column_ptr)) { diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 1e8ae42c11..d10cf98987 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -691,7 +691,7 @@ bool VScanNode::_is_predicate_acting_on_slot( Status VScanNode::_eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, PushDownType* pdt) { char* constant_val = nullptr; if (vexpr->is_constant()) { - ColumnPtrWrapper* const_col_wrapper = nullptr; + std::shared_ptr const_col_wrapper; RETURN_IF_ERROR(vexpr->get_const_col(expr_ctx, &const_col_wrapper)); if (const ColumnConst* const_column = check_and_get_column(const_col_wrapper->column_ptr)) { @@ -1288,7 +1288,7 @@ Status VScanNode::_should_push_down_binary_predicate( pdt = PushDownType::UNACCEPTABLE; return Status::OK(); } else { - ColumnPtrWrapper* const_col_wrapper = nullptr; + std::shared_ptr const_col_wrapper; RETURN_IF_ERROR(children[1 - i]->get_const_col(expr_ctx, &const_col_wrapper)); if (const ColumnConst* const_column = check_and_get_column(const_col_wrapper->column_ptr)) { diff --git a/be/src/vec/exprs/varray_literal.cpp b/be/src/vec/exprs/varray_literal.cpp index 1989441b5b..e986e927a4 100644 --- a/be/src/vec/exprs/varray_literal.cpp +++ b/be/src/vec/exprs/varray_literal.cpp @@ -28,7 +28,7 @@ Status VArrayLiteral::prepare(RuntimeState* state, const RowDescriptor& row_desc Field array = is_null ? Field() : Array(); for (const auto child : _children) { Field item; - ColumnPtrWrapper* const_col_wrapper = nullptr; + std::shared_ptr const_col_wrapper; RETURN_IF_ERROR(child->get_const_col(context, &const_col_wrapper)); const_col_wrapper->column_ptr->get(0, item); array.get().push_back(item); diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 4e71afbf31..a72792fb3e 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -446,14 +446,14 @@ bool VExpr::is_constant() const { return true; } -Status VExpr::get_const_col(VExprContext* context, ColumnPtrWrapper** output) { - *output = nullptr; +Status VExpr::get_const_col(VExprContext* context, + std::shared_ptr* column_wrapper) { if (!is_constant()) { return Status::OK(); } if (_constant_col != nullptr) { - *output = _constant_col.get(); + *column_wrapper = _constant_col; return Status::OK(); } @@ -466,7 +466,7 @@ Status VExpr::get_const_col(VExprContext* context, ColumnPtrWrapper** output) { DCHECK(result != -1); const auto& column = block.get_by_position(result).column; _constant_col = std::make_shared(column); - *output = _constant_col.get(); + *column_wrapper = _constant_col; return Status::OK(); } @@ -477,7 +477,7 @@ void VExpr::register_function_context(doris::RuntimeState* state, VExprContext* arg_types.push_back(VExpr::column_type_to_type_desc(_children[i]->type())); } - _fn_context_index = context->register_func(state, return_type, arg_types, 0); + _fn_context_index = context->register_func(state, return_type, arg_types); } Status VExpr::init_function_context(VExprContext* context, @@ -485,19 +485,19 @@ Status VExpr::init_function_context(VExprContext* context, const FunctionBasePtr& function) const { FunctionContext* fn_ctx = context->fn_context(_fn_context_index); if (scope == FunctionContext::FRAGMENT_LOCAL) { - std::vector constant_cols; + std::vector> constant_cols; for (auto c : _children) { - ColumnPtrWrapper* const_col_wrapper = nullptr; - RETURN_IF_ERROR(c->get_const_col(context, &const_col_wrapper)); - constant_cols.push_back(const_col_wrapper); + std::shared_ptr const_col; + RETURN_IF_ERROR(c->get_const_col(context, &const_col)); + constant_cols.push_back(const_col); } fn_ctx->impl()->set_constant_cols(constant_cols); } if (scope == FunctionContext::FRAGMENT_LOCAL) { - RETURN_IF_ERROR(function->prepare(fn_ctx, FunctionContext::FRAGMENT_LOCAL)); + RETURN_IF_ERROR(function->open(fn_ctx, FunctionContext::FRAGMENT_LOCAL)); } - RETURN_IF_ERROR(function->prepare(fn_ctx, FunctionContext::THREAD_LOCAL)); + RETURN_IF_ERROR(function->open(fn_ctx, FunctionContext::THREAD_LOCAL)); return Status::OK(); } diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index d20c2e59ba..1eb527c600 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -149,7 +149,7 @@ public: /// the output. Returns nullptr if the argument is not constant. The returned ColumnPtr is /// owned by this expr. This should only be called after Open() has been called on this /// expr. - Status get_const_col(VExprContext* context, ColumnPtrWrapper** output); + Status get_const_col(VExprContext* context, std::shared_ptr* column_wrapper); int fn_context_index() const { return _fn_context_index; } diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 9033245202..2103e63391 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -48,7 +48,6 @@ doris::Status VExprContext::execute(doris::vectorized::Block* block, int* result doris::Status VExprContext::prepare(doris::RuntimeState* state, const doris::RowDescriptor& row_desc) { _prepared = true; - _pool.reset(new MemPool()); return _root->prepare(state, row_desc, this); } @@ -70,14 +69,6 @@ void VExprContext::close(doris::RuntimeState* state) { FunctionContext::FunctionStateScope scope = _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; _root->close(state, this, scope); - - for (int i = 0; i < _fn_contexts.size(); ++i) { - _fn_contexts[i]->impl()->close(); - } - // _pool can be NULL if Prepare() was never called - if (_pool != nullptr) { - _pool->free_all(); - } _closed = true; } @@ -87,9 +78,8 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) { DCHECK(*new_ctx == nullptr); *new_ctx = state->obj_pool()->add(new VExprContext(_root)); - (*new_ctx)->_pool.reset(new MemPool()); for (auto& _fn_context : _fn_contexts) { - (*new_ctx)->_fn_contexts.push_back(_fn_context->impl()->clone((*new_ctx)->_pool.get())); + (*new_ctx)->_fn_contexts.push_back(_fn_context->impl()->clone()); } (*new_ctx)->_is_clone = true; @@ -101,15 +91,13 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) { void VExprContext::clone_fn_contexts(VExprContext* other) { for (auto& _fn_context : _fn_contexts) { - other->_fn_contexts.push_back(_fn_context->impl()->clone(other->_pool.get())); + other->_fn_contexts.push_back(_fn_context->impl()->clone()); } } int VExprContext::register_func(RuntimeState* state, const FunctionContext::TypeDesc& return_type, - const std::vector& arg_types, - int varargs_buffer_size) { - _fn_contexts.push_back(FunctionContextImpl::create_context( - state, _pool.get(), return_type, arg_types, varargs_buffer_size, false)); + const std::vector& arg_types) { + _fn_contexts.push_back(FunctionContextImpl::create_context(state, return_type, arg_types)); _fn_contexts.back()->impl()->set_check_overflow_for_decimal( state->check_overflow_for_decimal()); return _fn_contexts.size() - 1; diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 82ff9d11b8..b9a65a0d00 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -42,8 +42,7 @@ public: /// Prepare() and save the returned index. 'varargs_buffer_size', if specified, is the /// size of the varargs buffer in the created FunctionContext (see udf-internal.h). int register_func(RuntimeState* state, const FunctionContext::TypeDesc& return_type, - const std::vector& arg_types, - int varargs_buffer_size); + const std::vector& arg_types); /// Retrieves a registered FunctionContext. 'i' is the index returned by the call to /// register_func(). This should only be called by VExprs. @@ -95,9 +94,6 @@ private: /// and owned by this VExprContext. std::vector _fn_contexts; - /// Pool backing fn_contexts_. - std::unique_ptr _pool; - int _last_result_column_id; bool _stale; diff --git a/be/src/vec/exprs/vmap_literal.cpp b/be/src/vec/exprs/vmap_literal.cpp index 60415dda69..1304569e49 100644 --- a/be/src/vec/exprs/vmap_literal.cpp +++ b/be/src/vec/exprs/vmap_literal.cpp @@ -32,9 +32,9 @@ Status VMapLiteral::prepare(RuntimeState* state, const RowDescriptor& row_desc, // each child is slot with key1, value1, key2, value2... for (int idx = 0; idx < _children.size() && idx + 1 < _children.size(); idx += 2) { Field kf, vf; - ColumnPtrWrapper* const_key_col_wrapper = nullptr; - ColumnPtrWrapper* const_val_col_wrapper = nullptr; + std::shared_ptr const_key_col_wrapper; RETURN_IF_ERROR(_children[idx]->get_const_col(context, &const_key_col_wrapper)); + std::shared_ptr const_val_col_wrapper; RETURN_IF_ERROR(_children[idx + 1]->get_const_col(context, &const_val_col_wrapper)); const_key_col_wrapper->column_ptr->get(0, kf); const_val_col_wrapper->column_ptr->get(0, vf); diff --git a/be/src/vec/exprs/vstruct_literal.cpp b/be/src/vec/exprs/vstruct_literal.cpp index 9ae647b481..3ae034f64d 100644 --- a/be/src/vec/exprs/vstruct_literal.cpp +++ b/be/src/vec/exprs/vstruct_literal.cpp @@ -25,7 +25,7 @@ Status VStructLiteral::prepare(RuntimeState* state, const RowDescriptor& row_des Field struct_field = Tuple(); for (const auto child : _children) { Field item; - ColumnPtrWrapper* const_col_wrapper = nullptr; + std::shared_ptr const_col_wrapper; RETURN_IF_ERROR(child->get_const_col(context, &const_col_wrapper)); const_col_wrapper->column_ptr->get(0, item); struct_field.get().push_back(item); diff --git a/be/src/vec/functions/function.h b/be/src/vec/functions/function.h index 2a770011b4..9117599edb 100644 --- a/be/src/vec/functions/function.h +++ b/be/src/vec/functions/function.h @@ -136,7 +136,7 @@ public: /// Override this when function need to store state in the `FunctionContext`, or do some /// preparation work according to information from `FunctionContext`. - virtual Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + virtual Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { return Status::OK(); } @@ -441,7 +441,7 @@ public: __builtin_unreachable(); } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { return Status::OK(); } @@ -518,8 +518,8 @@ public: return std::make_shared(function); } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { - return function->prepare(context, scope); + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + return function->open(context, scope); } Status close(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { diff --git a/be/src/vec/functions/function_convert_tz.h b/be/src/vec/functions/function_convert_tz.h index caf704cde9..54c3d253ee 100644 --- a/be/src/vec/functions/function_convert_tz.h +++ b/be/src/vec/functions/function_convert_tz.h @@ -136,7 +136,7 @@ public: bool use_default_implementation_for_constants() const override { return true; } bool use_default_implementation_for_nulls() const override { return false; } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { if (scope != FunctionContext::THREAD_LOCAL) { return Status::OK(); } diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp index 9da2f38dd5..2c8e9a91fa 100644 --- a/be/src/vec/functions/function_java_udf.cpp +++ b/be/src/vec/functions/function_java_udf.cpp @@ -49,8 +49,7 @@ JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argumen const DataTypePtr& return_type) : fn_(fn), _argument_types(argument_types), _return_type(return_type) {} -Status JavaFunctionCall::prepare(FunctionContext* context, - FunctionContext::FunctionStateScope scope) { +Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); if (env == nullptr) { diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h index ad3335b384..9eb16c9e1c 100644 --- a/be/src/vec/functions/function_java_udf.h +++ b/be/src/vec/functions/function_java_udf.h @@ -51,7 +51,7 @@ public: return nullptr; } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, size_t input_rows_count, bool dry_run = false) override; diff --git a/be/src/vec/functions/function_jsonb.cpp b/be/src/vec/functions/function_jsonb.cpp index 02f352fb57..8260eb4e39 100644 --- a/be/src/vec/functions/function_jsonb.cpp +++ b/be/src/vec/functions/function_jsonb.cpp @@ -122,7 +122,7 @@ public: bool use_default_implementation_for_constants() const override { return true; } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { if constexpr (parse_error_handle_mode == JsonbParseErrorMode::RETURN_VALUE) { if (context->is_col_constant(1)) { const auto default_value_col = context->get_constant_col(1)->column_ptr; diff --git a/be/src/vec/functions/function_regexp.cpp b/be/src/vec/functions/function_regexp.cpp index 7ff07b9f09..87941a9c11 100644 --- a/be/src/vec/functions/function_regexp.cpp +++ b/be/src/vec/functions/function_regexp.cpp @@ -275,7 +275,7 @@ public: return make_nullable(std::make_shared()); } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { if (scope == FunctionContext::THREAD_LOCAL) { if (context->is_col_constant(1)) { DCHECK(!context->get_function_state(scope)); diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp index 89131f4b94..8495b5493b 100644 --- a/be/src/vec/functions/function_rpc.cpp +++ b/be/src/vec/functions/function_rpc.cpp @@ -527,7 +527,7 @@ FunctionRPC::FunctionRPC(const TFunction& fn, const DataTypes& argument_types, const DataTypePtr& return_type) : _argument_types(argument_types), _return_type(return_type), _tfn(fn) {} -Status FunctionRPC::prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { +Status FunctionRPC::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { _fn = std::make_unique(_tfn); if (!_fn->available()) { diff --git a/be/src/vec/functions/function_rpc.h b/be/src/vec/functions/function_rpc.h index 583b37deae..116c213b44 100644 --- a/be/src/vec/functions/function_rpc.h +++ b/be/src/vec/functions/function_rpc.h @@ -82,7 +82,7 @@ public: return nullptr; } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; Status execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments, size_t result, size_t input_rows_count, bool dry_run = false) override; diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h index ba8ea7a811..f9f624880c 100644 --- a/be/src/vec/functions/function_string.h +++ b/be/src/vec/functions/function_string.h @@ -2025,7 +2025,7 @@ static StringVal do_money_format(FunctionContext* context, const T int_value, char local[N]; char* p = SimpleItoaWithCommas(int_value, local, sizeof(local)); int32_t string_val_len = local + sizeof(local) - p + 3; - StringVal result = StringVal::create_temp_string_val(context, string_val_len); + StringVal result = context->create_temp_string_val(string_val_len); memcpy(result.ptr, p, string_val_len - 3); *(result.ptr + string_val_len - 3) = '.'; *(result.ptr + string_val_len - 2) = '0' + (frac_value / 10); @@ -2037,7 +2037,7 @@ static StringVal do_money_format(FunctionContext* context, const T int_value, static StringVal do_money_format(FunctionContext* context, const string& value) { bool is_positive = (value[0] != '-'); int32_t result_len = value.size() + (value.size() - (is_positive ? 4 : 5)) / 3; - StringVal result = StringVal::create_temp_string_val(context, result_len); + StringVal result = context->create_temp_string_val(result_len); if (!is_positive) { *result.ptr = '-'; } @@ -2490,7 +2490,7 @@ public: bool use_default_implementation_for_constants() const override { return true; } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { if (scope != FunctionContext::THREAD_LOCAL) { return Status::OK(); } diff --git a/be/src/vec/functions/functions_geo.cpp b/be/src/vec/functions/functions_geo.cpp index 21e68537de..d7fc9d0af4 100644 --- a/be/src/vec/functions/functions_geo.cpp +++ b/be/src/vec/functions/functions_geo.cpp @@ -251,7 +251,7 @@ struct StCircle { return Status::OK(); } - static Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + static Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { return Status::OK(); } @@ -299,14 +299,14 @@ struct StContains { return Status::OK(); } - static Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + static Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { return Status::OK(); } static Status close(FunctionContext* context, FunctionContext::FunctionStateScope scope) { return Status::OK(); } -}; +}; // namespace doris::vectorized struct StGeometryFromText { static constexpr auto NAME = "st_geometryfromtext"; @@ -376,7 +376,7 @@ struct StGeoFromText { return Status::OK(); } - static Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { + static Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { return Status::OK(); } diff --git a/be/src/vec/functions/functions_geo.h b/be/src/vec/functions/functions_geo.h index c276b0c2be..30266840c3 100644 --- a/be/src/vec/functions/functions_geo.h +++ b/be/src/vec/functions/functions_geo.h @@ -65,9 +65,9 @@ public: } } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { if constexpr (Impl::NEED_CONTEXT) { - return Impl::prepare(context, scope); + return Impl::open(context, scope); } else { return Status::OK(); } diff --git a/be/src/vec/functions/in.h b/be/src/vec/functions/in.h index eb1c01c343..9a8b096513 100644 --- a/be/src/vec/functions/in.h +++ b/be/src/vec/functions/in.h @@ -62,7 +62,7 @@ public: bool use_default_implementation_for_nulls() const override { return false; } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { if (scope == FunctionContext::THREAD_LOCAL) { return Status::OK(); } diff --git a/be/src/vec/functions/like.cpp b/be/src/vec/functions/like.cpp index 84e8600008..25993bbb74 100644 --- a/be/src/vec/functions/like.cpp +++ b/be/src/vec/functions/like.cpp @@ -531,7 +531,7 @@ void FunctionLike::remove_escape_character(std::string* search_string) { } } -Status FunctionLike::prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) { +Status FunctionLike::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { if (scope != FunctionContext::THREAD_LOCAL) { return Status::OK(); } @@ -590,8 +590,7 @@ Status FunctionLike::prepare(FunctionContext* context, FunctionContext::Function return Status::OK(); } -Status FunctionRegexp::prepare(FunctionContext* context, - FunctionContext::FunctionStateScope scope) { +Status FunctionRegexp::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) { if (scope != FunctionContext::THREAD_LOCAL) { return Status::OK(); } diff --git a/be/src/vec/functions/like.h b/be/src/vec/functions/like.h index 40235faafa..fab8527e29 100644 --- a/be/src/vec/functions/like.h +++ b/be/src/vec/functions/like.h @@ -224,7 +224,7 @@ public: String get_name() const override { return name; } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; friend struct LikeSearchState; @@ -254,7 +254,7 @@ public: String get_name() const override { return name; } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override; }; } // namespace doris::vectorized diff --git a/be/src/vec/functions/random.cpp b/be/src/vec/functions/random.cpp index 976b4340a8..64f8920fb6 100644 --- a/be/src/vec/functions/random.cpp +++ b/be/src/vec/functions/random.cpp @@ -40,7 +40,7 @@ public: return std::make_shared(); } - Status prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { + Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override { std::shared_ptr generator(new std::mt19937_64()); context->set_function_state(scope, generator); if (scope == FunctionContext::THREAD_LOCAL) { diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index fa9836aaec..4e4de45271 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -155,7 +155,6 @@ set(RUNTIME_TEST_FILES runtime/memory/chunk_allocator_test.cpp runtime/memory/system_allocator_test.cpp runtime/cache/partition_cache_test.cpp - runtime/free_pool_test.cpp #runtime/array_test.cpp ) set(TESTUTIL_TEST_FILES diff --git a/be/test/runtime/free_pool_test.cpp b/be/test/runtime/free_pool_test.cpp deleted file mode 100644 index 5dc89be45b..0000000000 --- a/be/test/runtime/free_pool_test.cpp +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/free_pool.hpp" - -#include - -#include - -namespace doris { - -class FreePoolTest : public testing::Test { -public: - FreePoolTest() {} - -protected: - virtual void SetUp() override { - memPool.reset(new MemPool()); - freePool.reset(new FreePool(memPool.get())); - } - virtual void TearDown() override { - memPool.reset(nullptr); - freePool.reset(nullptr); - } - -protected: - std::unique_ptr memPool; - std::unique_ptr freePool; -}; - -inline uint16_t getFreeListNodeCount(FreePool::FreeListNode* list) { - auto count = 0; - auto current = list; - while (nullptr != current) { - count++; - current = current->next; - } - return count; -} - -TEST_F(FreePoolTest, free) { - const int64_t size = 16; - auto ptr = freePool->allocate(size); - - auto node = reinterpret_cast(ptr - sizeof(FreePool::FreeListNode)); - auto list = node->list; - auto beforeCount = getFreeListNodeCount(list); - freePool->free(ptr); - auto afterCount = getFreeListNodeCount(list); - - ASSERT_EQ(beforeCount + 1, afterCount); -} -} // namespace doris diff --git a/be/test/testutil/function_utils.cpp b/be/test/testutil/function_utils.cpp index eb2c50fa80..87ff68ad70 100644 --- a/be/test/testutil/function_utils.cpp +++ b/be/test/testutil/function_utils.cpp @@ -33,9 +33,7 @@ FunctionUtils::FunctionUtils() { _state = new RuntimeState(globals); doris_udf::FunctionContext::TypeDesc return_type; std::vector arg_types; - _memory_pool = new MemPool(); - _fn_ctx = FunctionContextImpl::create_context(_state, _memory_pool, return_type, arg_types, 0, - false); + _fn_ctx = FunctionContextImpl::create_context(_state, return_type, arg_types); } FunctionUtils::FunctionUtils(const doris_udf::FunctionContext::TypeDesc& return_type, @@ -46,15 +44,11 @@ FunctionUtils::FunctionUtils(const doris_udf::FunctionContext::TypeDesc& return_ globals.__set_timestamp_ms(1565026737805); globals.__set_time_zone("Asia/Shanghai"); _state = new RuntimeState(globals); - _memory_pool = new MemPool(); - _fn_ctx = FunctionContextImpl::create_context(_state, _memory_pool, return_type, arg_types, - varargs_buffer_size, false); + _fn_ctx = FunctionContextImpl::create_context(_state, return_type, arg_types); } FunctionUtils::~FunctionUtils() { - _fn_ctx->impl()->close(); delete _fn_ctx; - delete _memory_pool; if (_state) { delete _state; } diff --git a/be/test/testutil/function_utils.h b/be/test/testutil/function_utils.h index c13a532060..d22489a7e6 100644 --- a/be/test/testutil/function_utils.h +++ b/be/test/testutil/function_utils.h @@ -38,7 +38,6 @@ public: private: RuntimeState* _state = nullptr; - MemPool* _memory_pool = nullptr; doris_udf::FunctionContext* _fn_ctx = nullptr; }; diff --git a/be/test/udf/udf_test.cpp b/be/test/udf/udf_test.cpp index 6b4d886b0f..bf5d7e1fbc 100644 --- a/be/test/udf/udf_test.cpp +++ b/be/test/udf/udf_test.cpp @@ -121,17 +121,11 @@ IntVal num_var_args(FunctionContext*, const BigIntVal& dummy, int n, const IntVa IntVal validat_udf(FunctionContext* context) { EXPECT_EQ(context->version(), FunctionContext::V2_0); - EXPECT_FALSE(context->has_error()); - EXPECT_TRUE(context->error_msg() == nullptr); return IntVal::null(); } IntVal validate_fail(FunctionContext* context) { - EXPECT_FALSE(context->has_error()); - EXPECT_TRUE(context->error_msg() == nullptr); context->set_error("Fail"); - EXPECT_TRUE(context->has_error()); - EXPECT_TRUE(strcmp(context->error_msg(), "Fail") == 0); return IntVal::null(); } diff --git a/be/test/vec/function/function_test_util.h b/be/test/vec/function/function_test_util.h index 10ed7c032f..9cb7a2c5bd 100644 --- a/be/test/vec/function/function_test_util.h +++ b/be/test/vec/function/function_test_util.h @@ -208,7 +208,7 @@ Status check_function(const std::string& func_name, const InputTypeSet& input_ty ColumnNumbers arguments; std::vector arg_types; std::vector> constant_col_ptrs; - std::vector constant_cols; + std::vector> constant_cols; for (size_t i = 0; i < descs.size(); ++i) { auto& desc = descs[i]; arguments.push_back(i); @@ -216,7 +216,7 @@ Status check_function(const std::string& func_name, const InputTypeSet& input_ty if (desc.is_const) { constant_col_ptrs.push_back( std::make_shared(block.get_by_position(i).column)); - constant_cols.push_back(constant_col_ptrs.back().get()); + constant_cols.push_back(constant_col_ptrs.back()); } else { constant_cols.push_back(nullptr); } @@ -250,8 +250,8 @@ Status check_function(const std::string& func_name, const InputTypeSet& input_ty FunctionUtils fn_utils(fn_ctx_return, arg_types, 0); auto* fn_ctx = fn_utils.get_fn_ctx(); fn_ctx->impl()->set_constant_cols(constant_cols); - func->prepare(fn_ctx, FunctionContext::FRAGMENT_LOCAL); - func->prepare(fn_ctx, FunctionContext::THREAD_LOCAL); + func->open(fn_ctx, FunctionContext::FRAGMENT_LOCAL); + func->open(fn_ctx, FunctionContext::THREAD_LOCAL); block.insert({nullptr, return_type, "result"});