[CP] [FEAT MERGE] 424 PL stable features

This commit is contained in:
0xacc 2024-08-15 07:50:55 +00:00 committed by ob-robot
parent 6ebe5187be
commit 0759f42e44
35 changed files with 2324 additions and 2249 deletions

1
.gitignore vendored
View File

@ -171,6 +171,7 @@ src/pl/parser/_gen_parser.error
src/pl/parser/_gen_pl_parser.output
src/pl/parser/pl_parser_mysql_mode.output
src/pl/parser/pl_parser_oracle_mode.output
src/share/inner_table/sys_package/system_package.cpp
############# close_modules #############
close_modules/oracle_pl/pl/parser/*.output

View File

@ -206,6 +206,7 @@ public:
virtual void set_is_load_data_exec(bool v) { UNUSED(v); }
virtual void set_force_remote_exec(bool v) { UNUSED(v); }
virtual void set_use_external_session(bool v) { UNUSED(v); }
virtual void set_ob_enable_pl_cache(bool v) { UNUSED(v); }
virtual int64_t get_cluster_id() const { return common::OB_INVALID_ID; }
void set_session_init_status(bool status) { is_inited_ = status;}
virtual void set_user_timeout(int64_t user_timeout) { UNUSED(user_timeout); }

View File

@ -205,6 +205,7 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const ObString sql,
conn->set_is_load_data_exec(param->is_load_data_exec_);
conn->set_use_external_session(param->use_external_session_);
conn->set_group_id(param->consumer_group_id_);
conn->set_ob_enable_pl_cache(param->enable_pl_cache_);
if (param->is_load_data_exec_) {
is_user_sql = true;
}

View File

@ -91,8 +91,8 @@ struct ObSessionParam final
{
public:
ObSessionParam()
: sql_mode_(nullptr), tz_info_wrap_(nullptr), ddl_info_(), is_load_data_exec_(false), use_external_session_(false), consumer_group_id_(0), nls_formats_{}
{}
: sql_mode_(nullptr), tz_info_wrap_(nullptr), ddl_info_(), is_load_data_exec_(false),
use_external_session_(false), consumer_group_id_(0), nls_formats_{}, enable_pl_cache_(true) {}
~ObSessionParam() = default;
public:
int64_t *sql_mode_;
@ -102,6 +102,7 @@ public:
bool use_external_session_; // need init remote inner sql conn with sess getting from sess mgr
int64_t consumer_group_id_;
common::ObString nls_formats_[common::ObNLSFormatEnum::NLS_MAX];
bool enable_pl_cache_;
};
// thread safe sql proxy

View File

@ -20,7 +20,6 @@ message(STATUS "This is SOURCE dir " ${PROJECT_SOURCE_DIR})
include_directories(${LLVM_INCLUDE_DIRS})
add_definitions(${LLVM_DEFINITIONS})
add_definitions(-Wno-deprecated)
add_definitions(-D_GLIBCXX_USE_CXX11_ABI=0 -g -O2 -frtti)
# Find the libraries that correspond to the LLVM components

View File

@ -365,7 +365,7 @@ public:
void dump_module();
void dump_debuginfo();
int verify_module();
uint64_t get_function_address(const common::ObString &name);
int get_function_address(const common::ObString &name, uint64_t &addr);
static void add_symbol(const common::ObString &name, void *value);
ObDIRawData get_debug_info() const;

View File

@ -23,22 +23,20 @@ namespace core
using namespace llvm;
int JitContext::InitializeModule(ObOrcJit &jit)
int JitContext::InitializeModule(const ObDataLayout &DL)
{
int ret = OB_SUCCESS;
TheJIT = &jit;
if (nullptr == (TheModule = std::make_unique<Module>("PL/SQL", TheJIT->getContext()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for LLVM Module", K(ret));
} else if (nullptr == (Builder = std::make_unique<IRBuilder<>>(TheJIT->getContext()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for LLVM Builder", K(ret));
if (OB_FAIL(ob_jit_make_unique(TheContext))) {
LOG_WARN("failed to make jit context", K(ret));
} else if (OB_FAIL(ob_jit_make_unique(TheModule, "PL/SQL", *TheContext))) {
LOG_WARN("failed to make jit module", K(ret));
} else if (OB_FAIL(ob_jit_make_unique(Builder, *TheContext))) {
LOG_WARN("failed to make ir builder", K(ret));
} else if (OB_FAIL(ob_jit_make_unique(TheFPM, TheModule.get()))) {
LOG_WARN("failed to make FPM", K(ret));
} else {
TheContext = &TheJIT->getContext();
TheModule->setDataLayout(TheJIT->getDataLayout());
Builder = std::make_unique<IRBuilder<>>(*TheContext);
TheFPM = std::make_unique<legacy::FunctionPassManager>(TheModule.get());
TheModule->setDataLayout(DL);
TheFPM->add(createInstructionCombiningPass());
TheFPM->add(createReassociatePass());
TheFPM->add(createGVNPass());
@ -49,12 +47,20 @@ int JitContext::InitializeModule(ObOrcJit &jit)
return ret;
}
void JitContext::compile()
int JitContext::compile(ObOrcJit &jit)
{
if (!Compile && nullptr != TheJIT) {
TheJIT->addModule(std::move(TheModule));
int ret = OB_SUCCESS;
if (Compile) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("already compiled", K(ret), K(lbt()));
} else if (OB_FAIL(jit.addModule(std::move(TheModule), std::move(TheContext)))) {
LOG_WARN("failed to add module to jit engine", K(ret));
} else {
Compile = true;
}
return ret;
}
int JitContext::optimize()

View File

@ -31,26 +31,27 @@ struct JitContext
{
public:
explicit JitContext()
: Compile(false), TheContext(nullptr), TheJIT(nullptr)
{
}
: Compile(false),
TheContext(nullptr),
Builder(nullptr),
TheModule(nullptr),
TheFPM(nullptr)
{ }
int InitializeModule(ObOrcJit &jit);
void compile();
int InitializeModule(const ObDataLayout &DL);
int compile(ObOrcJit &jit);
int optimize();
ObLLVMContext& get_context() { return *TheContext; }
IRBuilder<>& get_builder() { return *Builder; }
Module& get_module() { return *TheModule; }
ObOrcJit* get_jit() { return TheJIT; }
public:
bool Compile;
ObLLVMContext *TheContext;
std::unique_ptr<ObLLVMContext> TheContext;
std::unique_ptr<IRBuilder<>> Builder;
std::unique_ptr<Module> TheModule;
ObOrcJit *TheJIT;
std::unique_ptr<legacy::FunctionPassManager> TheFPM;
};

View File

@ -82,15 +82,8 @@ public:
// set memory protection state.
static int protect_mapped_memory(const ObJitMemoryBlock &block, int64_t p_flags);
static bool aarch64_addr_safe(void *addr, int64_t size);
};
bool ObJitMemory::aarch64_addr_safe(void *addr, int64_t size)
{
return reinterpret_cast<int64_t>(addr) >> 32 == (reinterpret_cast<int64_t>(addr)+size) >> 32;
}
ObJitMemoryBlock ObJitMemory::allocate_mapped_memory(int64_t num_bytes,
int64_t p_flags)
{
@ -107,25 +100,11 @@ ObJitMemoryBlock ObJitMemory::allocate_mapped_memory(int64_t num_bytes,
do {
addr = ::mmap(reinterpret_cast<void*>(start), page_size*num_pages,
p_flags, mm_flags, fd, 0);
if (MAP_FAILED == addr
#if defined(__aarch64__)
|| !aarch64_addr_safe(addr, page_size*num_pages)
#endif
) {
if (MAP_FAILED == addr) {
if (REACH_TIME_INTERVAL(10000000)) { //间隔10s打印日志
LOG_ERROR_RET(common::OB_ALLOCATE_MEMORY_FAILED, "allocate jit memory failed", K(addr), K(num_bytes), K(page_size), K(num_pages));
}
::usleep(100000); //100ms
#if defined(__aarch64__)
if (MAP_FAILED != addr) {
if (0 != ::munmap(addr, page_size*num_pages)) {
LOG_WARN_RET(OB_ERR_SYS, "jit block munmap failed", K(addr), K(page_size*num_pages));
}
start = reinterpret_cast<int64_t>(addr) + UINT32_MAX - page_size*num_pages; //先向上移4G,再移动此次分配的大小,以保证此次分配的地址高16位一致
LOG_INFO("aarch64 memory allocated not safe, try again", K(addr), K(start), K(page_size), K(num_pages));
addr = MAP_FAILED;
}
#endif
} else {
LOG_DEBUG("allocate mapped memory success!",
K(addr), K(start),
@ -370,6 +349,7 @@ void ObJitAllocator::reserve(const JitMemType mem_type, int64_t sz, int64_t alig
}
}
// Returns true if an error occurred, false otherwise.
bool ObJitAllocator::finalize()
{
int ret = OB_SUCCESS;
@ -384,7 +364,7 @@ bool ObJitAllocator::finalize()
LOG_WARN("fail to finalize code memory", K(ret));
}
return OB_SUCC(ret);
return OB_FAIL(ret);
}
void ObJitAllocator::free() {

View File

@ -35,7 +35,6 @@
#include "llvm/IR/DataLayout.h"
#include "llvm/IR/Mangler.h"
#include "llvm/IR/Verifier.h"
#include "llvm/Support/DynamicLibrary.h"
#include "llvm/Support/SourceMgr.h"
#include "llvm/Support/raw_ostream.h"
#include "llvm/AsmParser/Parser.h"
@ -56,94 +55,139 @@ namespace jit
namespace core
{
#ifndef ORC2
DenseMap<StringRef, JITTargetAddress> ObJitGlobalSymbolGenerator::symbol_table;
ObOrcJit::ObOrcJit(common::ObIAllocator &Allocator)
: DebugBuf(nullptr),
DebugLen(0),
JITAllocator(),
NotifyLoaded(Allocator, DebugBuf, DebugLen, SoObject),
TheContext(),
ObResolver(createLegacyLookupResolver(
ObES,
[this](StringRef Name) { return findMangledSymbol(std::string(Name)); },
[](Error Err) { cantFail(std::move(Err), "lookupFlags failed"); })),
ObTM(EngineBuilder().selectTarget()),
ObDL(ObTM->createDataLayout()),
ObObjectLayer(AcknowledgeORCv1Deprecation,
ObES,
[this](ObVModuleKey) {
return ObObjLayerT::Resources{
std::make_shared<ObJitMemoryManager>(JITAllocator), ObResolver}; },
NotifyLoaded),
ObCompileLayer(AcknowledgeORCv1Deprecation, ObObjectLayer, SimpleCompiler(*ObTM))
ObEngineBuilder(),
ObJitEngine()
{ }
ObVModuleKey ObOrcJit::addModule(std::unique_ptr<Module> M)
int ObOrcJit::init()
{
auto Key = ObES.allocateVModule();
cantFail(ObCompileLayer.addModule(Key, std::move(M)));
ObModuleKeys.push_back(Key);
return Key;
int ret = OB_SUCCESS;
ObEngineBuilder.setObjectLinkingLayerCreator(
[this](ExecutionSession &ES, const Triple &TT) {
auto ObjLinkingLayer =
std::make_unique<RTDyldObjectLinkingLayer>(
ES,
[&]() {
return std::make_unique<ObJitMemoryManager>(JITAllocator);
});
#ifndef NDEBUG
ObjLinkingLayer->registerJITEventListener(
*JITEventListener::createGDBRegistrationListener());
#endif // NDEBUG
ObjLinkingLayer->registerJITEventListener(NotifyLoaded);
return ObjLinkingLayer;
});
auto tm_builder_wrapper = JITTargetMachineBuilder::detectHost();
if (!tm_builder_wrapper) {
Error err = tm_builder_wrapper.takeError();
std::string msg = toString(std::move(err));
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get target machine", K(msg.c_str()));
} else {
ObEngineBuilder.setJITTargetMachineBuilder(*tm_builder_wrapper);
}
return ret;
}
ObJITSymbol ObOrcJit::lookup(std::string Name)
int ObOrcJit::addModule(std::unique_ptr<Module> M, std::unique_ptr<ObLLVMContext> TheContext)
{
return findMangledSymbol(mangle(Name));
int ret = OB_SUCCESS;
if (OB_FAIL(create_jit_engine())) {
LOG_WARN("failed to create jit engine", K(ret));
} else if (OB_ISNULL(ObJitEngine)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL jit engine", K(ret), K(lbt()));
} else {
Error err = ObJitEngine->addIRModule(ThreadSafeModule{std::move(M), std::move(TheContext)});
if (err) {
std::string msg = toString(std::move(err));
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to add module to jit engine",
K(ret), K(msg.c_str()));
}
}
return ret;
}
uint64_t ObOrcJit::get_function_address(const std::string Name)
int ObOrcJit::lookup(const std::string &name, ObJITSymbol &symbol)
{
return static_cast<uint64_t>(cantFail(lookup(Name).getAddress()));
int ret = OB_SUCCESS;
if (OB_ISNULL(ObJitEngine)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL jit engine", K(ret), K(lbt()));
} else {
auto value = ObJitEngine->lookup(name);
if (!value) {
Error err = value.takeError();
if (err.isA<SymbolsNotFound>()) {
ret = OB_ENTRY_NOT_EXIST;
} else {
ret = OB_ERR_UNEXPECTED;
}
std::string msg = toString(std::move(err));
LOG_WARN("failed to lookup symbol in jit engine",
K(ret),
"name", name.c_str(),
"msg", msg.c_str());
} else {
symbol = *value;
}
}
return ret;
}
#else
static ExitOnError ExitOnErr;
ObOrcJit::ObOrcJit(ObIAllocator &Allocator, JITTargetMachineBuilder JTMB, ObDataLayout ObDL)
: DebugBuf(nullptr),
DebugLen(0),
JITAllocator(),
NotifyLoaded(Allocator, DebugBuf, DebugLen),
ObObjectLayer(ObES,
[this]() { return std::make_unique<ObJitMemoryManager>(JITAllocator); }),
ObCompileLayer(ObES,
ObObjectLayer,
std::make_unique<ConcurrentIRCompiler>(std::move(JTMB))),
ObDL(std::move(ObDL)),
Mangle(ObES, this->ObDL),
Ctx(std::make_unique<ObLLVMContext>()),
MainJD(ObES.createBareJITDylib("<main>"))
int ObOrcJit::get_function_address(const std::string &name, uint64_t &addr)
{
/*
MainJD.define(absoluteSymbols({
{ Mangle("eh_personality"), pointerToJITTargetAddress(&ObPLEH::eh_personality) }
}));
*/
MainJD.addGenerator(
cantFail(DynamicLibrarySearchGenerator::GetForCurrentProcess(
ObDL.getGlobalPrefix())));
int ret = OB_SUCCESS;
ObJITSymbol sym = nullptr;
if (OB_FAIL(lookup(name, sym))) {
LOG_WARN("failed to lookup symbol addr", K(name.c_str()));
} else {
auto value = sym.getAddress();
if (!value) {
Error err = value.takeError();
std::string msg = toString(std::move(err));
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get symbol address",
K(ret),
"name", name.c_str(),
"msg", msg.c_str());
} else {
addr = static_cast<uint64_t>(*value);
}
}
return ret;
}
Error ObOrcJit::addModule(std::unique_ptr<Module> M)
{
return ObCompileLayer.add(MainJD, ThreadSafeModule(std::move(M), Ctx));
}
Expected<JITEvaluatedSymbol> ObOrcJit::lookup(StringRef Name)
{
return ObES.lookup({&MainJD}, Mangle(Name.str()));
}
uint64_t ObOrcJit::get_function_address(const std::string Name)
{
std::cerr << "get_function_address : " << Name << std::endl;
auto Sym = ExitOnErr(lookup(Name));
std::cerr << "get_function_address finish : " << Name << std::endl;
return static_cast<uint64_t>(Sym.getAddress());
}
#endif
void ObNotifyLoaded::operator()(
void ObNotifyLoaded::notifyObjectLoaded(
ObVModuleKey Key,
const object::ObjectFile &Obj,
const RuntimeDyld::LoadedObjectInfo &Info)
@ -169,13 +213,29 @@ void ObNotifyLoaded::operator()(
// }
}
void ObOrcJit::add_compiled_object(size_t length, const char *ptr)
int ObOrcJit::add_compiled_object(size_t length, const char *ptr)
{
ObVModuleKey Key = ObES.allocateVModule();
int ret = OB_SUCCESS;
cantFail(ObObjectLayer.addObject(
Key, MemoryBuffer::getMemBuffer(StringRef(ptr, length), "", false)));
ObModuleKeys.push_back(Key);
if (OB_FAIL(create_jit_engine())) {
LOG_WARN("failed to create jit engine", K(ret));
} else if (OB_ISNULL(ObJitEngine)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL jit engine", K(ret), K(lbt()));
} else {
Error err =ObJitEngine->addObjectFile(
MemoryBuffer::getMemBuffer(StringRef(ptr, length), "", false));
if (err) {
std::string msg = toString(std::move(err));
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to add compile result to jit engine",
K(ret), K(msg.c_str()), K(length), K(ptr));
}
}
return ret;
}
int ObOrcJit::set_optimize_level(ObPLOptLevel level)
@ -187,14 +247,53 @@ int ObOrcJit::set_optimize_level(ObPLOptLevel level)
LOG_WARN("unexpected PLSQL_OPTIMIZE_LEVEL", K(ret), K(level), K(lbt()));
}
if (OB_SUCC(ret) && OB_ISNULL(ObTM)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL TM", K(ret), K(ObTM.get()), K(lbt()));
if (OB_SUCC(ret) && level == ObPLOptLevel::O0) {
auto &tm_builder = ObEngineBuilder.getJITTargetMachineBuilder();
if (!tm_builder.hasValue()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL JITTargetMachineBuilder", K(ret), K(lbt()));
} else {
auto &builder = *tm_builder;
builder.setCodeGenOptLevel(CodeGenOpt::Level::None);
builder.getOptions().EnableFastISel = true;
}
}
if (OB_SUCC(ret) && level == ObPLOptLevel::O0) {
ObTM->setOptLevel(CodeGenOpt::Level::None);
ObTM->setFastISel(true);
return ret;
}
int ObOrcJit::create_jit_engine()
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(ObJitEngine)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NOT NULL jit engine", K(ret), K(lbt()));
} else {
std::unique_ptr<ObJitGlobalSymbolGenerator> symbol_generator = nullptr;
auto engine_wrapper = ObEngineBuilder.create();
if (!engine_wrapper) {
Error err = engine_wrapper.takeError();
std::string msg = toString(std::move(err));
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to build LLVM JIT engine", K(msg.c_str()));
} else {
ObJitEngine = std::move(*engine_wrapper);
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_ISNULL(ObJitEngine)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL jit engine", K(ret));
} else if (OB_FAIL(ob_jit_make_unique(symbol_generator))) {
LOG_WARN("failed to make ObJitGlobalSymbolGenerator unique_ptr", K(ret));
} else {
ObJitEngine->getMainJITDylib().addGenerator(std::move(symbol_generator));
}
}
return ret;

View File

@ -20,11 +20,11 @@
#include "llvm/ExecutionEngine/ObjectCache.h"
#include "llvm/IR/DataLayout.h"
#include "llvm/DebugInfo/DWARF/DWARFContext.h"
#include "llvm/ExecutionEngine/Orc/LLJIT.h"
#include "llvm/ExecutionEngine/Orc/ExecutionUtils.h"
#include "llvm/ExecutionEngine/JITEventListener.h"
#include "llvm/IR/Mangler.h"
#include <map>
#include <string>
#include "core/ob_jit_memory_manager.h"
@ -40,6 +40,33 @@ enum class ObPLOptLevel : int
O2 = 2,
O3 = 3
};
template<typename T, typename ...Args>
static inline int ob_jit_make_unique(std::unique_ptr<T> &ptr, Args&&... args) {
int ret = OB_SUCCESS;
std::unique_ptr<T> result = nullptr;
try {
result = std::make_unique<T>(std::forward<Args>(args)...);
} catch (const std::bad_alloc &e) {
ret = OB_ALLOCATE_MEMORY_FAILED;
SERVER_LOG(WARN, "failed to allocate memory", K(ret), K(e.what()));
} catch (...) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "unexpected exception in std::make_unique", K(ret), K(lbt()));
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_ISNULL(result)) {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "unexpected NULL ptr of std::make_unque", K(ret), K(lbt()));
} else {
ptr = std::move(result);
}
return ret;
}
namespace core {
using namespace llvm;
@ -51,8 +78,10 @@ typedef ::llvm::TargetMachine ObTargetMachine;
typedef ::llvm::DataLayout ObDataLayout;
typedef ::llvm::orc::VModuleKey ObVModuleKey;
typedef ::llvm::JITSymbol ObJITSymbol;
typedef ::llvm::orc::JITDylib::DefinitionGenerator ObJitDefinitionGenerator;
typedef ::llvm::JITEventListener ObJitEventListener;
class ObNotifyLoaded
class ObNotifyLoaded: public ObJitEventListener
{
public:
explicit ObNotifyLoaded(
@ -60,9 +89,9 @@ public:
: Allocator(Allocator), DebugBuf(DebugBuf), DebugLen(DebugLen), SoObject(SoObject) {}
virtual ~ObNotifyLoaded() {}
void operator()(ObVModuleKey Key,
void notifyObjectLoaded(ObVModuleKey Key,
const object::ObjectFile &Obj,
const RuntimeDyld::LoadedObjectInfo &Info);
const RuntimeDyld::LoadedObjectInfo &Info) override;
private:
common::ObIAllocator &Allocator;
char* &DebugBuf;
@ -70,128 +99,95 @@ private:
ObString &SoObject;
};
class ObJitGlobalSymbolGenerator: public ObJitDefinitionGenerator {
public:
Error tryToGenerate(orc::LookupKind K,
orc::JITDylib &JD,
orc::JITDylibLookupFlags JDLookupFlags,
const orc::SymbolLookupSet &LookupSet) override
{
for (const auto &sym : LookupSet) {
auto res = symbol_table.find(*sym.first);
if (res != symbol_table.end()) {
Error err = JD.define(orc::absoluteSymbols(
{{sym.first, JITEvaluatedSymbol(res->second, {})}}));
if (err) {
StringRef name = *sym.first;
std::string msg = toString(std::move(err));
SERVER_LOG_RET(WARN, OB_ERR_UNEXPECTED,
"failed to define SPI interface symbol",
"name", ObString(name.size(), name.data()),
"msg", msg.c_str(),
K(lbt()));
return err;
}
}
}
return Error::success();
}
static void add_symbol(StringRef name, void *addr) {
symbol_table[name] = pointerToJITTargetAddress(addr);
}
private:
static DenseMap<StringRef, JITTargetAddress> symbol_table;
};
#ifndef ORC2
class ObOrcJit
{
public:
using ObObjLayerT = llvm::orc::LegacyRTDyldObjectLinkingLayer;
using ObCompileLayerT = llvm::orc::LegacyIRCompileLayer<ObObjLayerT, llvm::orc::SimpleCompiler>;
using ObLLJITBuilder = llvm::orc::LLJITBuilder;
using ObJitEngineT = llvm::orc::LLJIT;
explicit ObOrcJit(common::ObIAllocator &Allocator);
virtual ~ObOrcJit() {};
ObVModuleKey addModule(std::unique_ptr<Module> M);
ObJITSymbol lookup(const std::string Name);
uint64_t get_function_address(const std::string Name);
int addModule(std::unique_ptr<Module> M, std::unique_ptr<ObLLVMContext> TheContext);
int get_function_address(const std::string &name, uint64_t &addr);
ObLLVMContext &getContext() { return TheContext; }
const ObDataLayout &getDataLayout() const { return ObDL; }
char* get_debug_info_data() { return DebugBuf; }
int64_t get_debug_info_size() { return DebugLen; }
void add_compiled_object(size_t length, const char *ptr);
int add_compiled_object(size_t length, const char *ptr);
const ObString& get_compiled_object() const { return SoObject; }
int set_optimize_level(ObPLOptLevel level);
private:
std::string mangle(const std::string &Name)
{
std::string MangledName; {
raw_string_ostream MangledNameStream(MangledName);
Mangler::getNameWithPrefix(MangledNameStream, Name, ObDL);
}
return MangledName;
}
int init();
ObJITSymbol findMangledSymbol(const std::string &Name)
{
const bool ExportedSymbolsOnly = true;
for (auto H : make_range(ObModuleKeys.rbegin(), ObModuleKeys.rend())) {
if (auto Sym = ObCompileLayer.findSymbolIn(H, Name, ExportedSymbolsOnly)) {
return Sym;
}
}
if (auto SymAddr = RTDyldMemoryManager::getSymbolAddressInProcess(Name)) {
return ObJITSymbol(SymAddr, JITSymbolFlags::Exported);
}
return nullptr;
}
const ObDataLayout &get_DL() const { return ObDL; }
private:
int lookup(const std::string &name, ObJITSymbol &symbol);
int create_jit_engine();
static ObJitGlobalSymbolGenerator symbol_generator;
char *DebugBuf;
int64_t DebugLen;
ObJitAllocator JITAllocator;
ObNotifyLoaded NotifyLoaded;
ObLLVMContext TheContext;
ObExecutionSession ObES;
std::shared_ptr<ObSymbolResolver> ObResolver;
std::unique_ptr<ObTargetMachine> ObTM;
const ObDataLayout ObDL;
ObObjLayerT ObObjectLayer;
ObCompileLayerT ObCompileLayer;
std::vector<ObVModuleKey> ObModuleKeys;
ObString SoObject;
ObLLJITBuilder ObEngineBuilder;
std::unique_ptr<ObJitEngineT> ObJitEngine;
};
#else
class ObOrcJit
{
public:
explicit ObOrcJit(
common::ObIAllocator &Allocator, llvm::orc::JITTargetMachineBuilder JTMB, ObDataLayout ObDL);
virtual ~ObOrcJit() {};
Error addModule(std::unique_ptr<Module> M);
Expected<JITEvaluatedSymbol> lookup(StringRef Name);
uint64_t get_function_address(const std::string Name);
static ObOrcJit* create(ObIAllocator &allocator)
{
auto JTMB = llvm::orc::JITTargetMachineBuilder::detectHost();
if (!JTMB)
return nullptr;
auto ObDL = JTMB->getDefaultDataLayoutForTarget();
if (!ObDL)
return nullptr;
return OB_NEWx(ObOrcJit, (&allocator), allocator, std::move(*JTMB), std::move(*ObDL));
}
ObLLVMContext &getContext() { return *Ctx.getContext(); }
const ObDataLayout &getDataLayout() const { return ObDL; }
char* get_debug_info_data() { return DebugBuf; }
int64_t get_debug_info_size() { return DebugLen; }
private:
char *DebugBuf;
int64_t DebugLen;
ObJitAllocator JITAllocator;
ObNotifyLoaded NotifyLoaded;
llvm::orc::ObExecutionSession ObES;
llvm::orc::RTDyldObjectLinkingLayer ObObjectLayer;
llvm::orc::IRCompileLayer ObCompileLayer;
llvm::ObDataLayout ObDL;
llvm::orc::MangleAndInterner Mangle;
llvm::orc::ThreadSafeContext Ctx;
llvm::orc::JITDylib &MainJD;
};
#endif
} // core
} // jit
} // oceanbase

View File

@ -20,7 +20,6 @@
#include "llvm/IR/GlobalVariable.h"
#include "llvm/ExecutionEngine/ExecutionEngine.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/DynamicLibrary.h"
#include "share/ob_define.h"
#include "objit/ob_llvm_helper.h"
@ -514,18 +513,24 @@ int ObLLVMHelper::init()
} else if (nullptr == (jc_ = OB_NEWx(core::JitContext, (&allocator_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for jit context", K(ret));
#ifndef ORC2
} else if (nullptr == (jit_ = OB_NEWx(core::ObOrcJit, (&allocator_), allocator_))) {
#else
} else if (nullptr == (jit_ = core::ObOrcJit::create(allocator_))) {
#endif
jc_->~JitContext();
allocator_.free(jc_);
jc_ = nullptr;
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for jit", K(ret));
} else if (OB_FAIL(jc_->InitializeModule(*jit_))) {
} else if (OB_FAIL(jit_->init())) {
jit_->~ObOrcJit();
allocator_.free(jit_);
jit_ = nullptr;
jc_->~JitContext();
allocator_.free(jc_);
jc_ = nullptr;
LOG_WARN("failed to init jit engine", K(ret));
} else if (OB_FAIL(jc_->InitializeModule(jit_->get_DL()))) {
jit_->~ObOrcJit();
allocator_.free(jit_);
jit_ = nullptr;
@ -560,9 +565,9 @@ int ObLLVMHelper::initialize()
llvm::InitializeNativeTargetAsmPrinter();
llvm::InitializeNativeTargetAsmParser();
llvm::sys::DynamicLibrary::LoadLibraryPermanently(nullptr);
#if !defined(__aarch64__)
/*Do not juse use !defined(__aarch64__) here*/
//#if !defined(__aarch64__)
#if defined(__x86_64__)
// initialize LLVM X86 unfold table
llvm::lookupUnfoldTable(0);
#endif
@ -590,6 +595,7 @@ int ObLLVMHelper::init_llvm() {
ObLLVMFunctionType ft;
ObLLVMBasicBlock block;
ObLLVMValue magic;
uint64_t addr;
OZ (helper.get_llvm_type(ObIntType, int64_type));
OZ (arg_types.push_back(int64_type));
@ -601,7 +607,7 @@ int ObLLVMHelper::init_llvm() {
OZ (helper.create_ret(magic));
OZ (helper.compile_module(jit::ObPLOptLevel::O2));
OX (helper.get_function_address(init_func_name));
OZ (helper.get_function_address(init_func_name, addr));
return ret;
}
@ -620,7 +626,7 @@ int ObLLVMHelper::compile_module(jit::ObPLOptLevel optimization)
dump_module();
}
OB_LLVM_MALLOC_GUARD(GET_PL_MOD_STRING(pl::OB_PL_JIT));
jc_->compile();
jc_->compile(*jit_);
}
return ret;
@ -670,15 +676,23 @@ int ObLLVMHelper::verify_module()
return ret;
}
uint64_t ObLLVMHelper::get_function_address(const ObString &name)
int ObLLVMHelper::get_function_address(const ObString &name, uint64_t &addr)
{
OB_LLVM_MALLOC_GUARD(GET_PL_MOD_STRING(pl::OB_PL_JIT));
return jc_->TheJIT->get_function_address(std::string(name.ptr(), name.length()));
int ret = OB_SUCCESS;
CK (OB_NOT_NULL(jit_));
if (OB_SUCC(ret)) {
OB_LLVM_MALLOC_GUARD(GET_PL_MOD_STRING(pl::OB_PL_JIT));
OZ (jit_->get_function_address(std::string(name.ptr(), name.length()), addr));
}
return ret;
}
void ObLLVMHelper::add_symbol(const ObString &name, void *value)
{
llvm::sys::DynamicLibrary::AddSymbol(make_string_ref(name), value);
core::ObJitGlobalSymbolGenerator::add_symbol(make_string_ref(name), value);
}
ObDIRawData ObLLVMHelper::get_debug_info() const
@ -2248,7 +2262,7 @@ int ObLLVMHelper::add_compiled_object(size_t length, const char *ptr)
CK (OB_NOT_NULL(jit_));
CK (OB_NOT_NULL(ptr));
CK (OB_LIKELY(length > 0));
OX (jit_->add_compiled_object(length, ptr));
OZ (jit_->add_compiled_object(length, ptr));
return ret;
}

View File

@ -177,21 +177,26 @@ int ObQueryDriver::response_query_result(ObResultSet &result,
can_retry = true;
bool is_first_row = true;
const ObNewRow *result_row = NULL;
bool has_top_limit = result.get_has_top_limit();
bool is_cac_found_rows = result.is_calc_found_rows();
int64_t limit_count = OB_INVALID_COUNT == fetch_limit ? INT64_MAX : fetch_limit;
int64_t row_num = 0;
ObSqlCtx *sql_ctx = result.get_exec_context().get_sql_ctx();
if (!has_top_limit && OB_INVALID_COUNT == fetch_limit) {
limit_count = INT64_MAX;
if (!lib::is_oracle_mode()) {
if (OB_FAIL(session_.get_sql_select_limit(limit_count))) {
LOG_WARN("fail tp get sql select limit", K(ret));
}
}
}
bool is_packed = result.get_physical_plan() ? result.get_physical_plan()->is_packed() : false;
MYSQL_PROTOCOL_TYPE protocol_type = is_ps_protocol ? MYSQL_PROTOCOL_TYPE::BINARY : MYSQL_PROTOCOL_TYPE::TEXT;
int64_t limit_count = INT64_MAX;
if (OB_FAIL(ret)) {
} else if (OB_INVALID_COUNT != fetch_limit) {
limit_count = fetch_limit;
} else if (lib::is_mysql_mode()) {
if (!result.get_has_top_limit() && OB_FAIL(session_.get_sql_select_limit(limit_count))) {
LOG_WARN("failed to get sytem variable sql_select_limit", K(ret));
}
} else { // lib::is_oracle_mode()
if (OB_FAIL(session_.get_oracle_sql_select_limit(limit_count))) {
LOG_WARN("failed to get sytem variable _oracle_sql_select_limit", K(ret));
}
}
const common::ColumnsFieldIArray *fields = NULL;
if (OB_SUCC(ret)) {
fields = result.get_field_columns();

View File

@ -483,11 +483,24 @@ public:
{
// sql which in pl will local retry first. see ObInnerSQLConnection::process_retry.
// sql which not in pl use the same strategy to avoid never getting the lock.
if (v.force_local_retry_ || (v.local_retry_times_ <= 1 && !v.result_.is_pl_stmt(v.result_.get_stmt_type()))) {
v.retry_type_ = RETRY_TYPE_LOCAL;
if (v.is_from_pl_) {
if (v.local_retry_times_ <= 1 ||
!v.session_.get_pl_can_retry() ||
ObSQLUtils::is_in_autonomous_block(v.session_.get_cur_exec_ctx())) {
v.no_more_test_ = true;
v.retry_type_ = RETRY_TYPE_LOCAL;
} else {
v.no_more_test_ = true;
v.retry_type_ = RETRY_TYPE_NONE;
v.client_ret_ = v.err_;
}
} else {
const ObMultiStmtItem &multi_stmr_item = v.ctx_.multi_stmt_item_;
try_packet_retry(v);
if (v.force_local_retry_ || (v.local_retry_times_ <= 1 && !v.result_.is_pl_stmt(v.result_.get_stmt_type()))) {
v.retry_type_ = RETRY_TYPE_LOCAL;
} else {
const ObMultiStmtItem &multi_stmr_item = v.ctx_.multi_stmt_item_;
try_packet_retry(v);
}
}
}
};

View File

@ -1177,7 +1177,12 @@ int ObMPStmtExecute::execute_response(ObSQLSessionInfo &session,
if (OB_SUCC(ret)) {
ObPLExecCtx pl_ctx(cursor->get_allocator(), &result.get_exec_context(), NULL/*params*/,
NULL/*result*/, &ret, NULL/*func*/, true);
if (OB_FAIL(ObSPIService::dbms_dynamic_open(&pl_ctx, *cursor))) {
int64_t orc_max_ret_rows = INT64_MAX;
if (lib::is_oracle_mode()
&& OB_FAIL(session.get_oracle_sql_select_limit(orc_max_ret_rows))) {
LOG_WARN("failed to get sytem variable _oracle_sql_select_limit", K(ret));
} else if (OB_FAIL(ObSPIService::dbms_dynamic_open(
&pl_ctx, *cursor, false, orc_max_ret_rows))) {
LOG_WARN("open cursor fail. ", K(ret), K(stmt_id_));
if (!THIS_WORKER.need_retry()) {
int cli_ret = OB_SUCCESS;

View File

@ -478,12 +478,16 @@ int ObMPStmtPrexecute::execute_response(ObSQLSessionInfo &session,
ObPLExecCtx pl_ctx(cursor->get_allocator(), &result.get_exec_context(), NULL/*params*/,
NULL/*result*/, &ret, NULL/*func*/, true);
get_ctx().cur_sql_ = sql_;
if (
int64_t orc_max_ret_rows = INT64_MAX;
if (lib::is_oracle_mode()
&& OB_FAIL(session.get_oracle_sql_select_limit(orc_max_ret_rows))) {
LOG_WARN("failed to get sytem variable _oracle_sql_select_limit", K(ret));
} else if (
#ifdef ERRSIM
OB_FAIL(common::EventTable::COM_STMT_PREXECUTE_PS_CURSOR_OPEN_ERROR) ||
#endif
OB_FAIL(ObSPIService::dbms_dynamic_open(&pl_ctx, *cursor))
) {
OB_FAIL(ObSPIService::dbms_dynamic_open(
&pl_ctx, *cursor, false, orc_max_ret_rows))) {
LOG_WARN("cursor open faild.", K(cursor->get_id()));
// select do not support arraybinding
if (!THIS_WORKER.need_retry()) {

View File

@ -326,6 +326,11 @@ void ObInnerSQLConnection::set_is_load_data_exec(bool v)
get_session().set_load_data_exec_session(v);
}
void ObInnerSQLConnection::set_ob_enable_pl_cache(bool v)
{
get_session().set_local_ob_enable_pl_cache(v);
}
int ObInnerSQLConnection::init_session_info(
sql::ObSQLSessionInfo *session,
const bool is_extern_session,

View File

@ -189,6 +189,7 @@ public:
virtual void set_is_load_data_exec(bool v);
virtual void set_force_remote_exec(bool v) { force_remote_execute_ = v; }
virtual void set_use_external_session(bool v) { use_external_session_ = v; }
virtual void set_ob_enable_pl_cache(bool v) override;
bool is_nested_conn();
virtual void set_user_timeout(int64_t timeout) { user_timeout_ = timeout; }
virtual int64_t get_user_timeout() const { return user_timeout_; }

View File

@ -41,10 +41,40 @@ ob_set_subtarget(ob_pl pl_cache
pl_cache/ob_pl_cache.cpp
pl_cache/ob_pl_cache_mgr.cpp
pl_cache/ob_pl_cache_object.cpp
diagnosis/ob_pl_sql_audit_guard.cpp
)
set(SYS_PACK_SQL_SOURCE_DIR ${PROJECT_SOURCE_DIR}/src/share/inner_table/sys_package)
set(gen_sys_pack_file ${SYS_PACK_SQL_SOURCE_DIR}/system_package.cpp)
message(STATUS "generating ${gen_sys_pack_file} ...")
file (GLOB embed_sys_pack_files "${SYS_PACK_SQL_SOURCE_DIR}/*.sql")
list (LENGTH embed_sys_pack_files embed_sys_pack_cnt)
file (WRITE ${gen_sys_pack_file}
"// This file is generated by CMake, do not edit it!\n\n"
"#include <cstdint>\n"
"#include <utility>\n\n"
"namespace oceanbase {\n"
"namespace pl {\n\n"
"extern const int64_t syspack_source_count = ${embed_sys_pack_cnt};\n"
"extern const std::pair<const char * const, const char* const> syspack_source_contents[] = {\n")
foreach(embed_file ${embed_sys_pack_files})
get_filename_component(file_name ${embed_file} NAME)
file(SIZE ${embed_file} embed_file_size)
file(READ ${embed_file} filedata)
file(APPEND ${gen_sys_pack_file}
"{\n"
"\"${file_name}\", // size = ${embed_file_size} bytes\n"
"R\"sys_pack_del\(${filedata}\)sys_pack_del\"\n"
"},\n")
endforeach()
file (APPEND ${gen_sys_pack_file}
"};\n\n"
"} // namespace pl\n"
"} // namespace oceanbase\n")
message(STATUS "generate ${gen_sys_pack_file} done")
ob_set_subtarget(ob_pl sys_package
sys_package/ob_dbms_external_table.cpp
${gen_sys_pack_file}
sys_package/ob_dbms_stats.cpp
sys_package/ob_dbms_scheduler_mysql.cpp
sys_package/ob_dbms_application.cpp
@ -59,6 +89,7 @@ ob_set_subtarget(ob_pl sys_package
sys_package/ob_pl_dbms_resource_manager.cpp
sys_package/ob_pl_dbms_trusted_certificate_manager.cpp
sys_package/ob_dbms_limit_calculator_mysql.cpp
sys_package/ob_dbms_external_table.cpp
)
ob_set_subtarget(ob_pl dblink

View File

@ -0,0 +1,119 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX PL
#include "ob_pl_sql_audit_guard.h"
#include "sql/ob_spi.h"
namespace oceanbase
{
namespace pl
{
ObPLSqlAuditGuard::ObPLSqlAuditGuard(
sql::ObExecContext &exec_ctx,
ObSQLSessionInfo &session_info,
ObSPIResultSet &spi_result,
ObPLSqlAuditRecord &record,
int &ret,
ObString ps_sql,
observer::ObQueryRetryCtrl &retry_ctrl,
ObPLSPITraceIdGuard &traceid_guard,
stmt::StmtType stmt_type)
: exec_ctx_(exec_ctx),
session_info_(session_info),
spi_result_(spi_result),
record_(record),
ret_(ret),
ps_sql_(ps_sql),
retry_ctrl_(retry_ctrl),
traceid_guard_(traceid_guard),
stmt_type_(stmt_type)
{
enable_perf_event_ = lib::is_diagnose_info_enabled();
enable_sql_audit_ = GCONF.enable_sql_audit && session_info_.get_local_ob_enable_sql_audit();
// enable_sql_stat_ = session_info.is_sqlstat_enabled();
max_wait_guard_ = new (memory1) ObMaxWaitGuard(enable_perf_event_ ? &max_wait_desc_ : NULL);
total_wait_guard_ = new (memory2) ObTotalWaitGuard(enable_perf_event_ ? &total_wait_desc_ : NULL);
if (enable_perf_event_) {
record_.exec_record_.record_start();
}
// if (enable_sql_stat_ && OB_NOT_NULL(exec_ctx_.get_sql_ctx())) {
// record_.sqlstat_record_.record_sqlstat_start_value();
// record_.sqlstat_record_.set_is_in_retry(session_info_.get_is_in_retry());
// session_info_.sql_sess_record_sql_stat_start_value(record_.sqlstat_record_);
// }
// 监控项统计开始
record_.time_record_.set_send_timestamp(ObTimeUtility::current_time());
}
ObPLSqlAuditGuard::~ObPLSqlAuditGuard()
{
int &ret = ret_;
record_.time_record_.set_exec_end_timestamp(ObTimeUtility::current_time());
if (enable_perf_event_) {
record_.exec_record_.record_end();
}
// if (enable_sql_stat_ && OB_NOT_NULL(exec_ctx_.get_sql_ctx()) && OB_NOT_NULL(spi_result_.get_result_set())) {
// record_.sqlstat_record_.record_sqlstat_end_value();
// record_.sqlstat_record_.move_to_sqlstat_cache(
// session_info_, exec_ctx_.get_sql_ctx()->cur_sql_, spi_result_.get_result_set()->get_physical_plan());
// }
max_wait_guard_->~ObMaxWaitGuard();
total_wait_guard_->~ObTotalWaitGuard();
LOG_TRACE("Start PL/Sql Audit Record"/*, KPC(this)*/ );
if (OB_NOT_NULL(spi_result_.get_result_set())) {
if (spi_result_.get_result_set()->is_inited()) {
if (ObStmt::is_execute_stmt(stmt_type_)) {
ps_sql_ = session_info_.get_current_query_string();
}
int64_t try_cnt = session_info_.get_raw_audit_record().try_cnt_;
ObExecRecord record_bak = session_info_.get_raw_audit_record().exec_record_;
session_info_.get_raw_audit_record().try_cnt_ = retry_ctrl_.get_retry_times();
session_info_.get_raw_audit_record().pl_trace_id_.set(traceid_guard_.origin_trace_id_);
observer::ObInnerSQLConnection::process_record(*(spi_result_.get_result_set()),
spi_result_.get_sql_ctx(),
session_info_,
record_.time_record_,
ret_,
session_info_.get_current_execution_id(),
OB_INVALID_ID, //FIXME@hr351303
max_wait_desc_,
total_wait_desc_,
record_.exec_record_,
record_.exec_timestamp_,
true,
ps_sql_,
true,
spi_result_.get_exec_params_str_ptr());
session_info_.get_raw_audit_record().exec_record_ = record_bak;
session_info_.get_raw_audit_record().try_cnt_ = try_cnt;
session_info_.get_raw_audit_record().pl_trace_id_.reset();
} else {
LOG_DEBUG("result set is not inited, do not process record", K(ret_), K(ps_sql_));
}
} else {
if (OB_SUCCESS == ret_) {
ret_ = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, result_set is null", K(ret_), K(ps_sql_));
} else {
LOG_WARN("result_set is null", K(ret_), K(ps_sql_));
}
}
}
} // namespace pl
} // namespace oceanbase

View File

@ -0,0 +1,132 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SRC_PL_SQL_AUDIT_GUARD_OB_PL_H_
#define OCEANBASE_SRC_PL_SQL_AUDIT_GUARD_OB_PL_H_
#include "sql/monitor/ob_exec_stat.h"
#include "observer/ob_inner_sql_connection.h"
#include "sql/resolver/ob_stmt_type.h"
namespace oceanbase
{
namespace observer
{
class ObQueryRetryCtrl;
}
namespace sql
{
struct ObPLSPITraceIdGuard;
}
namespace pl
{
class ObPLTimeRecord : public observer::ObITimeRecord
{
public:
ObPLTimeRecord()
: send_timestamp_(0),
receive_timestamp_(0),
enqueue_timestamp_(0),
run_timestamp_(0),
process_timestamp_(0),
single_process_timestamp_(0),
exec_start_timestamp_(0),
exec_end_timestamp_(0) {}
virtual ~ObPLTimeRecord() {}
void set_send_timestamp(int64_t send_timestamp) { send_timestamp_ = send_timestamp; }
void set_receive_timestamp(int64_t receive_timestamp) { receive_timestamp_ = receive_timestamp; }
void set_enqueue_timestamp(int64_t enqueue_timestamp) { enqueue_timestamp_ = enqueue_timestamp; }
void set_run_timestamp(int64_t run_timestamp) { run_timestamp_ = run_timestamp; }
void set_process_timestamp(int64_t process_timestamp) { process_timestamp_ = process_timestamp; }
void set_single_process_timestamp(int64_t single_process_timestamp) { single_process_timestamp_ = single_process_timestamp; }
void set_exec_start_timestamp(int64_t exec_start_timestamp) { exec_start_timestamp_ = exec_start_timestamp; }
void set_exec_end_timestamp(int64_t exec_end_timestamp) { exec_end_timestamp_ = exec_end_timestamp; }
int64_t get_send_timestamp() const { return send_timestamp_; }
int64_t get_receive_timestamp() const { return get_send_timestamp(); }
int64_t get_enqueue_timestamp() const { return get_send_timestamp(); }
int64_t get_run_timestamp() const { return get_send_timestamp(); }
int64_t get_process_timestamp() const { return get_send_timestamp(); }
int64_t get_single_process_timestamp() const { return get_send_timestamp(); }
int64_t get_exec_start_timestamp() const { return get_send_timestamp(); }
int64_t get_exec_end_timestamp() const { return exec_end_timestamp_; }
public:
int64_t send_timestamp_;
int64_t receive_timestamp_;
int64_t enqueue_timestamp_;
int64_t run_timestamp_;
int64_t process_timestamp_;
int64_t single_process_timestamp_;
int64_t exec_start_timestamp_;
int64_t exec_end_timestamp_;
};
class ObPLSqlAuditRecord
{
public:
ObPLSqlAuditRecord(sql::ExecType exec_type_) {
exec_timestamp_.exec_type_ = exec_type_;
}
public:
sql::ObExecRecord exec_record_;
// ObExecutingSqlStatRecord sqlstat_record_;
sql::ObExecTimestamp exec_timestamp_;
ObPLTimeRecord time_record_;
};
class ObPLSqlAuditGuard
{
public:
ObPLSqlAuditGuard(sql::ObExecContext &exec_ctx,
sql::ObSQLSessionInfo &session_info,
sql::ObSPIResultSet &spi_result,
ObPLSqlAuditRecord &record,
int &ret,
ObString ps_sql,
observer::ObQueryRetryCtrl &retry_ctrl,
sql::ObPLSPITraceIdGuard &traceid_guard,
sql::stmt::StmtType stmt_type);
~ObPLSqlAuditGuard();
private:
bool enable_perf_event_;
bool enable_sql_audit_;
bool enable_sql_stat_;
sql::ObExecContext &exec_ctx_;
sql::ObSQLSessionInfo &session_info_;
ObWaitEventDesc max_wait_desc_;
ObWaitEventStat total_wait_desc_;
ObMaxWaitGuard *max_wait_guard_;
ObTotalWaitGuard *total_wait_guard_;
char memory1[sizeof(ObMaxWaitGuard)];
char memory2[sizeof(ObTotalWaitGuard)];
sql::ObSPIResultSet &spi_result_;
ObPLSqlAuditRecord &record_;
int &ret_;
ObString ps_sql_;
observer::ObQueryRetryCtrl &retry_ctrl_;
sql::ObPLSPITraceIdGuard &traceid_guard_;
sql::stmt::StmtType stmt_type_;
};
}
}
#endif /*OCEANBASE_SRC_PL_SQL_AUDIT_GUARD_OB_PL_H_*/

View File

@ -4762,8 +4762,13 @@ int ObPL::check_session_alive(const ObBasicSessionInfo &session) {
int ObPLFunction::gen_action_from_precompiled(const ObString &name, size_t length,
const char *ptr) {
int ret = OB_SUCCESS;
uint64_t addr = 0;
OZ (helper_.add_compiled_object(length, ptr));
OX (set_action(helper_.get_function_address(name)));
OZ (helper_.get_function_address(name, addr));
OX (set_action(addr));
return ret;
}

View File

@ -8094,15 +8094,29 @@ int ObPLCodeGenerator::final_expression(ObPLCompileUnit &pl_func)
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < ast_.get_obj_access_exprs().count(); ++i) {
ObRawExpr *expr = ast_.get_obj_access_expr(i);
ObObjAccessRawExpr *obj_access_expr = nullptr;
uint64_t addr = 0;
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("obj_access_expr is null");
} else if (!expr->is_obj_access_expr()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("not a obj access", K(*expr), K(ret));
} else if (OB_ISNULL(obj_access_expr = static_cast<ObObjAccessRawExpr*>(expr))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL obj_access_expr", K(ret), K(*expr));
} else if (OB_FAIL(helper_.get_function_address(obj_access_expr->get_func_name(), addr))) {
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_INFO("failed to find obj_access_expr symbol in JIT engine, will ignore this error",
K(ret), K(obj_access_expr->get_func_name()));
ret = OB_SUCCESS;
obj_access_expr->set_get_attr_func_addr(0);
} else {
LOG_WARN("failed to compile obj_access_expr", K(ret), K(obj_access_expr->get_func_name()), K(addr));
}
} else {
ObObjAccessRawExpr* obj_access_expr = static_cast<ObObjAccessRawExpr*>(expr);
obj_access_expr->set_get_attr_func_addr(helper_.get_function_address(obj_access_expr->get_func_name()));
obj_access_expr->set_get_attr_func_addr(addr);
}
}
@ -8618,6 +8632,8 @@ int ObPLCodeGenerator::generate_normal(ObPLFunction &pl_func)
}
if (OB_SUCC(ret)) {
uint64_t addr = 0;
if (OB_FAIL(final_expression(pl_func))) {
LOG_WARN("generate obj access expr failed", K(ret));
} else if (OB_FAIL(pl_func.set_variables(get_ast().get_symbol_table()))) {
@ -8626,11 +8642,13 @@ int ObPLCodeGenerator::generate_normal(ObPLFunction &pl_func)
LOG_WARN("failed to set ref objects", K(get_ast().get_dependency_table()), K(ret));
} else if (OB_FAIL(pl_func.set_types(get_ast().get_user_type_table()))) {
LOG_WARN("failed to set types", K(ret));
} else if (OB_FAIL(helper_.get_function_address(get_ast().get_name(), addr))) {
LOG_WARN("failed to compile pl routine", K(ret), K(get_ast().get_name()), K(addr));
} else {
pl_func.add_members(get_ast().get_flag());
pl_func.set_pipelined(get_ast().get_pipelined());
pl_func.set_action(helper_.get_function_address(get_ast().get_name()));
pl_func.set_can_cached(get_ast().get_can_cached());
pl_func.set_action(addr);
pl_func.set_can_cached(get_ast().get_can_cached());
pl_func.set_is_all_sql_stmt(get_ast().get_is_all_sql_stmt());
pl_func.set_has_parallel_affect_factor(get_ast().has_parallel_affect_factor());
}

View File

@ -39,14 +39,152 @@ using namespace sql;
namespace pl
{
int ObPLPackageManager::read_package_sql(FILE* file, char* buf, int64_t buf_len, bool &eof)
// Usage: ObCharStream *s -> s.open() -> s.next().is_eos() ... -> s.close()
class ObCharStream
{
public:
ObCharStream(const char *name) : eos_flag_(false), name_(name) {}
virtual ~ObCharStream() {}
ObCharStream(const ObCharStream &) = delete;
const ObCharStream &operator=(const ObCharStream &) = delete;
const char *get_name() { return name_; }
virtual int open() = 0;
virtual const ObCharStream &next(char &c) = 0;
virtual bool is_eos() const { return eos_flag_; }
virtual int close() = 0;
VIRTUAL_TO_STRING_KV(K_(eos_flag), K_(name));
protected:
static const char EOS = '\0';
bool eos_flag_;
private:
const char *const name_;
};
// read package sql from array embeded in the oberver binary file
class ObCStringStream final : public ObCharStream
{
public:
ObCStringStream(const char *package_name, const char *data)
: ObCharStream(package_name), data_(data), cursor_(nullptr) {}
~ObCStringStream() {}
int open() override {
int ret = OB_SUCCESS;
if (OB_ISNULL(data_)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("package sql data is null", K(ret), K(data_));
} else if (0 == strlen(data_)) {
LOG_INFO("package sql file is empty or not exists", K(ret));
} else {
cursor_ = data_;
}
return ret;
}
const ObCharStream &next(char &c) override {
if (OB_NOT_NULL(cursor_) && !eos_flag_) {
c = *cursor_++;
eos_flag_ = (c == EOS);
} else {
c = EOS;
}
return *this;
}
int close() override {
int ret = OB_SUCCESS;
if (OB_NOT_NULL(cursor_)) {
if (EOS != *(cursor_ - 1)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("C string stream has not been completely consumed", K(ret),
K(cursor_ - data_), K(data_), K(cursor_));
} else {
cursor_ = nullptr;
eos_flag_ = false;
}
}
return ret;
}
INHERIT_TO_STRING_KV("ObCharStream", ObCharStream,
"C string stream bytes comsumed", cursor_ - data_);
private:
const char *const data_;
const char *cursor_;
};
// read package sql from file under admin/
class ObFileStream final : public ObCharStream
{
public:
ObFileStream(const char *package_name, const char *file_path)
: ObCharStream(package_name), file_path_(file_path), file_(nullptr) {}
~ObFileStream() {
// make sure file_ closed
if (file_) {
fclose(file_);
file_ = nullptr;
}
}
int open() override {
int ret = OB_SUCCESS;
if (OB_ISNULL(file_path_) || 0 != access(file_path_, F_OK)) {
ret = OB_FILE_NOT_EXIST;
LOG_WARN("package sql file not exists", K(ret), K(file_path_));
} else if (OB_ISNULL(file_ = fopen(file_path_, "rb"))) {
ret = OB_IO_ERROR;
LOG_WARN("package sql file open failed", K(ret), K(file_path_));
}
return ret;
}
const ObCharStream &next(char &c) override {
int ch;
if (OB_NOT_NULL(file_) && !eos_flag_) {
if (EOF == (ch = fgetc(file_))) {
c = EOS;
eos_flag_ = true;
} else {
c = static_cast<char>(ch);
}
} else {
c = EOS;
}
return *this;
}
int close() override {
int ret = OB_SUCCESS;
if (OB_NOT_NULL(file_)) {
if (0 == feof(file_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("file content has not been completely consumed", K(ret), K(errno));
} else if (0 != fclose(file_)) {
ret = OB_IO_ERROR;
LOG_WARN("close file failed", K(ret), K(file_path_), K(file_));
} else {
file_ = nullptr;
eos_flag_ = false;
}
}
return ret;
}
INHERIT_TO_STRING_KV("ObCharStream", ObCharStream, K_(file_path),
"file stream bytes comsumed", file_ ? ftell(file_) : -1);
private:
const char *const file_path_;
FILE *file_;
};
int ObPLPackageManager::read_package_sql(ObCharStream &stream, char* buf, int64_t buf_len, bool &eos)
{
int ret = OB_SUCCESS;
enum {S_LINE_START, S_NORMAL, S_COMMENT, S_TERMINATE} state = S_LINE_START;
if (OB_ISNULL(file)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("package sql file is null", K(ret));
} else if (OB_ISNULL(buf)) {
if (OB_ISNULL(buf)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("sql buffer is null", K(ret));
} else if (buf_len <= 0) {
@ -56,13 +194,13 @@ int ObPLPackageManager::read_package_sql(FILE* file, char* buf, int64_t buf_len,
char *p = buf;
char *p_start = p;
char *p_end = p + buf_len - 1;
int c;
char c;
// clear buffer
*p = '\0';
*p_end = '\0';
while (OB_SUCC(ret) && state != S_TERMINATE) {
if (EOF == (c = fgetc(file))) {
ret = OB_IO_ERROR;
if (stream.next(c).is_eos()) {
ret = OB_ITER_END;
} else {
if (p >= p_end) {
ret = OB_INVALID_ARGUMENT;
@ -77,18 +215,17 @@ int ObPLPackageManager::read_package_sql(FILE* file, char* buf, int64_t buf_len,
} else if ('#' == c) {
state = S_COMMENT;
} else if ('-' == c) {
c = fgetc(file);
if ('-' == c) {
if (stream.next(c).is_eos()) {
ret = OB_ITER_END;
} else if ('-' == c) {
state = S_COMMENT;
} else if (EOF == c) {
ret = OB_IO_ERROR;
} else {
*p++ = '-';
*p++ = static_cast<char>(c);
*p++ = c;
state = S_NORMAL;
}
} else {
*p++ = static_cast<char>(c);
*p++ = c;
state = S_NORMAL;
}
}
@ -107,11 +244,11 @@ int ObPLPackageManager::read_package_sql(FILE* file, char* buf, int64_t buf_len,
*(p - 1) = '\0';
state = S_TERMINATE;
} else {
*p++ = static_cast<char>(c);
*p++ = c;
state = S_NORMAL;
}
} else {
*p++ = static_cast<char>(c);
*p++ = c;
if ('\n' == c) {
state = S_LINE_START;
} else {
@ -130,19 +267,20 @@ int ObPLPackageManager::read_package_sql(FILE* file, char* buf, int64_t buf_len,
}
}
if (OB_FAIL(ret)) {
if (feof(file)) {
eof = true;
if (stream.is_eos()) {
eos = true;
ret = OB_SUCCESS;
} else {
LOG_WARN("read package file error", K(errno), K(ret));
LOG_WARN("read package file error", K(ret), K(stream));
}
}
}
return ret;
}
int ObPLPackageManager::read_and_exec_package_sql(
ObMySQLProxy &sql_proxy, const char* package_full_path, ObCompatibilityMode compa_mode)
int ObPLPackageManager::read_and_exec_package_sql(ObMySQLProxy &sql_proxy,
ObCharStream &stream,
ObCompatibilityMode compa_mode)
{
int ret = OB_SUCCESS;
if (!sql_proxy.is_inited() || !sql_proxy.is_active()) {
@ -150,13 +288,9 @@ int ObPLPackageManager::read_and_exec_package_sql(
LOG_WARN("sql_proxy not inited or not active", "sql_proxy inited",
sql_proxy.is_inited(), "sql_proxy active", sql_proxy.is_active(), K(ret));
} else {
FILE* file = NULL;
int64_t affected_rows = 0;
if (access(package_full_path, F_OK) != 0) {
LOG_INFO("package sql file not exists", K(package_full_path), K(ret));
} else if (OB_ISNULL(file = fopen(package_full_path, "rb"))) {
ret = OB_IO_ERROR;
LOG_WARN("package sql file open failed", K(package_full_path), K(ret));
if (OB_FAIL(stream.open())) {
LOG_WARN("failed to open package file data stream", K(ret), K(stream));
} else {
// system tenant will run with mysql compatibility mode
// but we need to create system packages with oralce compatibility
@ -165,35 +299,35 @@ int ObPLPackageManager::read_and_exec_package_sql(
bool create_external_table = false;
ObSessionParam param;
ObSessionParam *param_ptr = nullptr;
char *last_slash = strrchr(const_cast<char*>(package_full_path), '/');
const char *pacakge_filename = (last_slash != NULL) ? last_slash + 1 : package_full_path;
int64_t sql_mode = SMO_STRICT_ALL_TABLES | SMO_NO_ZERO_IN_DATE | SMO_NO_AUTO_CREATE_USER;
// allow affected_rows > 0 when exec sql in external_table_alert_log.sql
if (strcmp(pacakge_filename, "external_table_alert_log.sql") == 0) {
if (strcmp(stream.get_name(), "external_table_alert_log") == 0) {
create_external_table = true;
param.sql_mode_ = &sql_mode;
param_ptr = &param;
}
// do not cache the compilation results of system packages into the PL cache when loading system packages.
param.enable_pl_cache_ = false;
SMART_VAR(char[OB_MAX_SQL_LENGTH], sql_buf) {
while (OB_SUCC(ret) && !eof) {
if (OB_FAIL(read_package_sql(file, sql_buf, OB_MAX_SQL_LENGTH, eof))) {
LOG_WARN("fail to read package sql file", K(ret));
if (FAILEDx(read_package_sql(stream, sql_buf, OB_MAX_SQL_LENGTH, eof))) {
LOG_WARN("fail to read package sql data", K(ret));
} else if (strlen(sql_buf) != 0
&& OB_FAIL(sql_proxy.write(OB_SYS_TENANT_ID,
&& OB_FAIL(sql_proxy.write(OB_SYS_TENANT_ID,
sql_buf,
affected_rows,
static_cast<int64_t>(compa_mode),
param_ptr))) {
&param))) {
LOG_WARN("fail to exec package sql", K(sql_buf), K(ret));
} else if (affected_rows != 0 && !create_external_table) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("affected_rows expected to be zero", K(affected_rows), K(ret));
LOG_WARN("affected_rows expected to be zero", K(ret), K(affected_rows), K(stream.get_name()));
} else {
OZ (ObSPIService::force_refresh_schema(OB_SYS_TENANT_ID));
}
LOG_INFO("package source data consumed", K(ret), K(stream));
}
}
fclose(file);
if (create_external_table && OB_SUCC(ret)) {
uint64_t data_version = 0;
common::ObString alter_table_sql("alter external table sys_external_tbs.__all_external_alert_log_info auto_refresh immediate");
@ -216,29 +350,80 @@ int ObPLPackageManager::read_and_exec_package_sql(
return ret;
}
int ObPLPackageManager::load_sys_package(
ObMySQLProxy &sql_proxy, const char *package_spec_name, const char *package_body_name, ObCompatibilityMode compa_mode)
// import source file content array declarations, which is defined in system_package.cpp generated by CMake.
extern const int64_t syspack_source_count;
extern const std::pair<const char * const, const char* const> syspack_source_contents[];
int ObPLPackageManager::get_syspack_source_file_content(const char *file_name, const char *&content)
{
int ret = OB_SUCCESS;
const int64_t begin_time = ObTimeUtility::current_time();
LOG_INFO("load sys package begin",
"package name", package_spec_name, "package body name", package_body_name);
if (OB_FAIL(read_and_exec_package_sql(sql_proxy, package_spec_name, compa_mode))) {
LOG_WARN("fail to read and exec package header sql", K(package_spec_name), K(ret));
} else if (OB_FAIL(read_and_exec_package_sql(sql_proxy, package_body_name, compa_mode))) {
LOG_WARN("fail to read and exec package body sql", K(package_body_name), K(ret));
OX (content = nullptr);
if (OB_ISNULL(file_name)) {
// return nullptr as `content`
LOG_INFO("file name c string is null", K(file_name));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < syspack_source_count; i++) {
if (0 == ObString(file_name).case_compare(syspack_source_contents[i].first)) {
content = syspack_source_contents[i].second;
break;
}
}
// `content` not found, report error
OV (OB_NOT_NULL(content), OB_ERR_UNEXPECTED, "system source file not found", ret, file_name);
}
const int64_t now = ObTimeUtility::current_time();
LOG_INFO("load sys package finish", "total_time_used", now - begin_time);
return ret;
}
static const char* sys_package_dir = "admin";
static ObSysPackageFile oracle_sys_package_file_table[] = {
int ObPLPackageManager::load_sys_package(ObMySQLProxy &sql_proxy,
const ObSysPackageFile &pack_file_info,
ObCompatibilityMode compa_mode,
bool from_file)
{
int ret = OB_SUCCESS;
const char *package_name = pack_file_info.package_name;
const char *spec_file = pack_file_info.package_spec_file_name;
const char *body_file = pack_file_info.package_body_file_name;
const int64_t begin_time = ObTimeUtility::current_time();
LOG_INFO("load sys package", K(package_name), K(spec_file), K(body_file), K(begin_time));
if (from_file) {
const char *sys_package_dir = "admin";
char spec_file_path[MAX_PATH_SIZE] = {0};
char body_file_path[MAX_PATH_SIZE] = {0};
if (OB_SUCC(ret) && OB_NOT_NULL(spec_file)) {
OZ (databuff_printf(spec_file_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, spec_file));
ObFileStream spec_stream{package_name, spec_file_path};
OZ (read_and_exec_package_sql(sql_proxy, spec_stream, compa_mode), spec_stream);
}
if (OB_SUCC(ret) && OB_NOT_NULL(body_file)) {
OZ (databuff_printf(body_file_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, body_file));
ObFileStream body_stream{package_name, body_file_path};
OZ (read_and_exec_package_sql(sql_proxy, body_stream, compa_mode), body_stream);
}
} else {
const char *spec_content = nullptr;
const char *body_content = nullptr;
OZ (get_syspack_source_file_content(spec_file, spec_content));
if (OB_SUCC(ret) && OB_NOT_NULL(spec_content)) {
ObCStringStream spec_stream{package_name, spec_content};
OZ (read_and_exec_package_sql(sql_proxy, spec_stream, compa_mode), spec_stream);
}
OZ (get_syspack_source_file_content(body_file, body_content));
if (OB_SUCC(ret) && OB_NOT_NULL(body_content)) {
ObCStringStream body_stream{package_name, body_content};
OZ (read_and_exec_package_sql(sql_proxy, body_stream, compa_mode), body_stream);
}
}
const int64_t now = ObTimeUtility::current_time();
LOG_INFO("load sys package finish", K(ret), K(package_name), "total_time_used", now - begin_time);
return ret;
}
static const ObSysPackageFile oracle_syspack_file_list[] = {
#ifdef OB_BUILD_ORACLE_PL
{"dbms_standard", "dbms_standard.sql", "dbms_standard_body.sql"},
{"dbms_standard", "dbms_standard.sql", nullptr},
{"dbms_output", "dbms_output.sql", "dbms_output_body.sql"},
{"dbms_metadata", "dbms_metadata.sql", "dbms_metadata_body.sql"},
{"dbms_spm", "dbms_spm.sql", "dbms_spm_body.sql"},
@ -251,7 +436,7 @@ static ObSysPackageFile oracle_sys_package_file_table[] = {
{"sa_sysdba", "sa_sysdba.sql", "sa_sysdba_body.sql"},
{"sa_user_admin", "sa_user_admin.sql", "sa_user_admin_body.sql"},
{"utl_i18n", "utl_i18n.sql", "utl_i18n_body.sql"},
{"dbms_crypto", "dbms_crypto.sql","dbms_crypto_body.sql"},
{"dbms_crypto", "dbms_crypto.sql", "dbms_crypto_body.sql"},
{"dbms_random", "dbms_random.sql", "dbms_random_body.sql"},
{"dbms_debug", "dbms_debug.sql", "dbms_debug_body.sql"},
{"utl_inaddr", "utl_inaddr.sql", "utl_inaddr_body.sql"},
@ -263,16 +448,15 @@ static ObSysPackageFile oracle_sys_package_file_table[] = {
{"dbms_xa", "dbms_xa.sql", "dbms_xa_body.sql"},
{"dbms_resource_manager", "dbms_resource_manager.sql", "dbms_resource_manager_body.sql"},
{"dbms_utility", "dbms_utility.sql", "dbms_utility_body.sql"},
{"odciconst", "odciconst.sql", "odciconst_body.sql"},
{"odciconst", "odciconst.sql", nullptr},
{"dbms_stats", "dbms_stats.sql", "dbms_stats_body.sql"},
{"dbms_any", "dbms_any.sql", "dbms_any_body.sql"},
{"xml_type", "xml_type.sql", "xml_type_body.sql"},
{"dbms_crypto", "dbms_crypto.sql", "dbms_crypto_body.sql"},
{"dbms_ijob", "dbms_ijob.sql", "dbms_ijob_body.sql"},
{"dbms_job", "dbms_job.sql", "dbms_job_body.sql"},
{"dbms_ischeduler", "dbms_ischeduler.sql", "dbms_ischeduler_body.sql"},
{"dbms_scheduler", "dbms_scheduler.sql", "dbms_scheduler_body.sql"},
{"catodci", "catodci.sql", "catodci_body.sql"},
{"catodci", "catodci.sql", nullptr},
{"dbms_describe", "dbms_describe.sql", "dbms_describe_body.sql"},
{"utl_file", "utl_file.sql", "utl_file_body.sql"},
{"dbms_plan_cache", "dbms_plancache.sql", "dbms_plancache_body.sql"},
@ -292,7 +476,7 @@ static ObSysPackageFile oracle_sys_package_file_table[] = {
{"dbms_mview", "dbms_mview.sql", "dbms_mview_body.sql"},
{"dbms_mview_stats", "dbms_mview_stats.sql", "dbms_mview_stats_body.sql"},
{"json_array_t", "json_array_type.sql", "json_array_type_body.sql"},
{"xmlsequence", "xml_sequence_type.sql", "xml_sequence_type_body.sql"},
{"xmlsequence", "xml_sequence_type.sql", nullptr},
{"utl_recomp", "utl_recomp.sql", "utl_recomp_body.sql"},
{"sdo_geometry", "sdo_geometry.sql", "sdo_geometry_body.sql"},
{"sdo_geom", "sdo_geom.sql", "sdo_geom_body.sql"},
@ -300,8 +484,7 @@ static ObSysPackageFile oracle_sys_package_file_table[] = {
{"dbms_profiler", "dbms_profiler.sql", "dbms_profiler_body.sql"},
#endif
};
static ObSysPackageFile mysql_sys_package_file_table[] = {
static const ObSysPackageFile mysql_syspack_file_list[] = {
{"dbms_stats", "dbms_stats_mysql.sql", "dbms_stats_body_mysql.sql"},
{"dbms_scheduler", "dbms_scheduler_mysql.sql", "dbms_scheduler_mysql_body.sql"},
{"dbms_ischeduler", "dbms_ischeduler_mysql.sql", "dbms_ischeduler_mysql_body.sql"},
@ -320,119 +503,109 @@ static ObSysPackageFile mysql_sys_package_file_table[] = {
{"dbms_trusted_certificate_manager", "dbms_trusted_certificate_manager_mysql.sql", "dbms_trusted_certificate_manager_body_mysql.sql"},
{"dbms_ob_limit_calculator", "dbms_ob_limit_calculator_mysql.sql", "dbms_ob_limit_calculator_body_mysql.sql"},
{"dbms_external_table", "dbms_external_table_mysql.sql", "dbms_external_table_body_mysql.sql"},
{"external_table_alert_log", "external_table_alert_log.sql", "none"}
{"external_table_alert_log", "external_table_alert_log.sql", nullptr}
};
int ObPLPackageManager::load_sys_package(ObMySQLProxy &sql_proxy, ObString &package_name, ObCompatibilityMode compa_mode)
// for now! we only have one special system package "__DBMS_UPGRADE"
static const ObSysPackageFile oracle_special_syspack_file_list[] = {
{"__dbms_upgrade", "__dbms_upgrade.sql", "__dbms_upgrade_body.sql"},
};
static const ObSysPackageFile mysql_special_syspack_file_list[] = {
{"__dbms_upgrade", "__dbms_upgrade_mysql.sql", "__dbms_upgrade_body_mysql.sql"},
};
int ObPLPackageManager::load_sys_package(ObMySQLProxy &sql_proxy,
ObString &package_name,
ObCompatibilityMode compa_mode,
bool from_file)
{
int ret = OB_SUCCESS;
char package_spec_full_path[MAX_PATH_SIZE] = {};
char package_body_full_path[MAX_PATH_SIZE] = {};
bool dir_exists = false;
bool package_exists = false;
if (OB_FAIL(FileDirectoryUtils::is_exists(sys_package_dir, dir_exists))) {
LOG_WARN("check sys package dir whether exist failed", K(ret), K(sys_package_dir));
} else if (!dir_exists) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("sys package dir not exist", K(ret), K(sys_package_dir));
}
const ObSysPackageFile *pack_file_info = nullptr;
#define SEARCH_SYSPACK_FILE_BY_NAME(syspack_file_list) \
do { \
if (pack_file_info == nullptr) { \
int sys_package_count = ARRAYSIZEOF(syspack_file_list); \
for (int64_t i = 0; OB_SUCC(ret) && i < sys_package_count; ++i) { \
if (0 == package_name.case_compare(ObString(syspack_file_list[i].package_name))) { \
pack_file_info = &syspack_file_list[i]; \
break; \
} \
} \
} \
} while (0)
if (ObCompatibilityMode::ORACLE_MODE == compa_mode) {
int sys_package_count = ARRAYSIZEOF(oracle_sys_package_file_table);
for (int64_t i = 0; OB_SUCC(ret) && i < sys_package_count; ++i) {
if (0 == package_name.case_compare(ObString(oracle_sys_package_file_table[i].package_name))) {
const char *package_spec_name = oracle_sys_package_file_table[i].package_spec_file_name;
const char *package_body_name = oracle_sys_package_file_table[i].package_body_file_name;
OZ (databuff_printf(
package_spec_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, package_spec_name));
OZ (databuff_printf(
package_body_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, package_body_name));
OX (package_exists = true);
break;
}
}
SEARCH_SYSPACK_FILE_BY_NAME(oracle_syspack_file_list);
SEARCH_SYSPACK_FILE_BY_NAME(oracle_special_syspack_file_list);
} else if (ObCompatibilityMode::MYSQL_MODE == compa_mode) {
int sys_package_count = ARRAYSIZEOF(mysql_sys_package_file_table);
for (int64_t i = 0; OB_SUCC(ret) && i < sys_package_count; ++i) {
if (0 == package_name.case_compare(ObString(mysql_sys_package_file_table[i].package_name))) {
const char *package_spec_name = mysql_sys_package_file_table[i].package_spec_file_name;
const char *package_body_name = mysql_sys_package_file_table[i].package_body_file_name;
OZ (databuff_printf(
package_spec_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, package_spec_name));
OZ (databuff_printf(
package_body_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, package_body_name));
OX (package_exists = true);
break;
}
}
SEARCH_SYSPACK_FILE_BY_NAME(mysql_syspack_file_list);
SEARCH_SYSPACK_FILE_BY_NAME(mysql_special_syspack_file_list);
}
if (OB_SUCC(ret) && !package_exists) {
#undef SEARCH_SYSPACK_FILE_BY_NAME
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(pack_file_info)) {
ret = OB_ERR_PACKAGE_DOSE_NOT_EXIST;
LOG_WARN("package not exists", K(ret), K(package_name));
LOG_WARN("package not exists", K(ret), K(package_name), K(compa_mode));
LOG_USER_ERROR(OB_ERR_PACKAGE_DOSE_NOT_EXIST,
"PACKAGE",
ObString("oceanbase").length(), ObString("oceanbase").ptr(),
package_name.length(), package_name.ptr());
} else {
OZ (load_sys_package(sql_proxy, *pack_file_info, compa_mode, from_file));
}
OZ (load_sys_package(sql_proxy, package_spec_full_path, package_body_full_path, compa_mode));
return ret;
}
int ObPLPackageManager::load_all_common_sys_package(ObMySQLProxy &sql_proxy,
const ObSysPackageFile *package_file,
int sys_package_count,
ObCompatibilityMode compa_mode)
int ObPLPackageManager::load_sys_package_list(ObMySQLProxy &sql_proxy,
const ObSysPackageFile *sys_package_list,
int sys_package_count,
ObCompatibilityMode compa_mode,
bool from_file)
{
int ret = OB_SUCCESS;
char package_spec_full_path[MAX_PATH_SIZE] = {};
char package_body_full_path[MAX_PATH_SIZE] = {};
CK (OB_NOT_NULL(package_file));
LOG_INFO("load all sys package begin", "sys package total count", sys_package_count);
CK (OB_NOT_NULL(sys_package_list));
LOG_INFO("load sys package list begin", "sys package total count", sys_package_count);
for (int i = 0; OB_SUCC(ret) && i < sys_package_count; ++i) {
const char *package_spec_name = package_file[i].package_spec_file_name;
const char *package_body_name = package_file[i].package_body_file_name;
OZ (databuff_printf(
package_spec_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, package_spec_name));
OZ (databuff_printf(
package_body_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, package_body_name));
if (OB_SUCC(ret)) {
LOG_INFO("load sys package begin", K(package_spec_name));
if (OB_FAIL(load_sys_package(sql_proxy, package_spec_full_path, package_body_full_path, compa_mode))) {
LOG_WARN("load sys package failed",
K(package_spec_full_path), K(package_body_full_path), K(compa_mode), K(ret));
} else {
LOG_INFO("load sys package success", K(ret), K(package_spec_name));
}
}
OZ (load_sys_package(sql_proxy, sys_package_list[i], compa_mode, from_file));
}
if (OB_FAIL(ret)) {
LOG_WARN("load sys package list failed", K(ret), K(compa_mode));
} else {
LOG_INFO("load sys package list success", K(ret), K(compa_mode));
}
return ret;
}
int ObPLPackageManager::load_all_common_sys_package(ObMySQLProxy &sql_proxy, ObCompatibilityMode need_compa_mode)
{
int ObPLPackageManager::load_all_common_sys_package(
ObMySQLProxy &sql_proxy, ObCompatibilityMode compa_mode, bool from_file) {
int ret = OB_SUCCESS;
bool exist = false;
if (OB_FAIL(FileDirectoryUtils::is_exists(sys_package_dir, exist))) {
LOG_WARN("check sys package dir whether exist failed", K(sys_package_dir), K(ret));
} else if (!exist) {
LOG_INFO("sys package dir not exist", K(sys_package_dir));
} else {
if (need_compa_mode == ObCompatibilityMode::OCEANBASE_MODE) {
OZ (load_all_common_sys_package(sql_proxy, oracle_sys_package_file_table,
ARRAYSIZEOF(oracle_sys_package_file_table), ObCompatibilityMode::ORACLE_MODE));
OZ (load_all_common_sys_package(sql_proxy, mysql_sys_package_file_table,
ARRAYSIZEOF(mysql_sys_package_file_table), ObCompatibilityMode::MYSQL_MODE));
} else if (need_compa_mode == ObCompatibilityMode::ORACLE_MODE) {
OZ (load_all_common_sys_package(sql_proxy, oracle_sys_package_file_table,
ARRAYSIZEOF(oracle_sys_package_file_table), ObCompatibilityMode::ORACLE_MODE));
} else if (need_compa_mode == ObCompatibilityMode::MYSQL_MODE) {
OZ (load_all_common_sys_package(sql_proxy, mysql_sys_package_file_table,
ARRAYSIZEOF(mysql_sys_package_file_table), ObCompatibilityMode::MYSQL_MODE));
}
if (compa_mode == ObCompatibilityMode::OCEANBASE_MODE) {
OZ (load_sys_package_list(sql_proxy, oracle_syspack_file_list,
ARRAYSIZEOF(oracle_syspack_file_list),
ObCompatibilityMode::ORACLE_MODE,
from_file));
OZ (load_sys_package_list(sql_proxy, mysql_syspack_file_list,
ARRAYSIZEOF(mysql_syspack_file_list),
ObCompatibilityMode::MYSQL_MODE,
from_file));
} else if (compa_mode == ObCompatibilityMode::ORACLE_MODE) {
OZ (load_sys_package_list(sql_proxy, oracle_syspack_file_list,
ARRAYSIZEOF(oracle_syspack_file_list),
ObCompatibilityMode::ORACLE_MODE,
from_file));
} else if (compa_mode == ObCompatibilityMode::MYSQL_MODE) {
OZ (load_sys_package_list(sql_proxy, mysql_syspack_file_list,
ARRAYSIZEOF(mysql_syspack_file_list),
ObCompatibilityMode::MYSQL_MODE,
from_file));
}
if (OB_SUCC(ret)) {
LOG_INFO("load all common sys package success!");
LOG_INFO("load all common sys package success!", K(ret), K(from_file));
} else {
LOG_INFO("load all common sys package failed!");
LOG_WARN("load all common sys package failed!", K(ret), K(from_file));
}
return ret;
}
@ -440,39 +613,23 @@ int ObPLPackageManager::load_all_common_sys_package(ObMySQLProxy &sql_proxy, ObC
int ObPLPackageManager::load_all_special_sys_package(ObMySQLProxy &sql_proxy)
{
int ret = OB_SUCCESS;
bool exist = false;
if (OB_FAIL(FileDirectoryUtils::is_exists(sys_package_dir, exist))) {
LOG_WARN("check sys package dir whether exist failed", K(sys_package_dir), K(ret));
} else if (!exist) {
LOG_INFO("sys package dir not exist", K(sys_package_dir));
} else {
// for now! we only have one special system package "__DBMS_UPGRADE"
char package_spec_full_path[MAX_PATH_SIZE] = {};
char package_body_full_path[MAX_PATH_SIZE] = {};
#ifdef OB_BUILD_ORACLE_PL
OZ (databuff_printf(
package_spec_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, "__dbms_upgrade.sql"));
OZ (databuff_printf(
package_body_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, "__dbms_upgrade_body.sql"));
OZ (load_sys_package(sql_proxy, package_spec_full_path, package_body_full_path, ObCompatibilityMode::ORACLE_MODE));
OZ (load_sys_package_list(sql_proxy, oracle_special_syspack_file_list,
ARRAYSIZEOF(oracle_special_syspack_file_list),
ObCompatibilityMode::ORACLE_MODE,
false /* from_file */));
#endif
memset(package_spec_full_path, 0, sizeof(package_spec_full_path));
memset(package_body_full_path, 0, sizeof(package_body_full_path));
OZ (databuff_printf(
package_spec_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, "__dbms_upgrade_mysql.sql"));
OZ (databuff_printf(
package_body_full_path, MAX_PATH_SIZE, "%s/%s", sys_package_dir, "__dbms_upgrade_body_mysql.sql"));
OZ (load_sys_package(sql_proxy, package_spec_full_path, package_body_full_path, ObCompatibilityMode::MYSQL_MODE));
}
OZ (load_sys_package_list(sql_proxy, mysql_special_syspack_file_list,
ARRAYSIZEOF(mysql_special_syspack_file_list),
ObCompatibilityMode::MYSQL_MODE,
false /* from_file */));
return ret;
}
int ObPLPackageManager::load_all_sys_package(ObMySQLProxy &sql_proxy)
{
int ret = OB_SUCCESS;
OZ (load_all_common_sys_package(sql_proxy, ObCompatibilityMode::OCEANBASE_MODE));
OZ (load_all_common_sys_package(sql_proxy, ObCompatibilityMode::OCEANBASE_MODE, false /* from_file */));
OZ (load_all_special_sys_package(sql_proxy));
if (OB_SUCC(ret)) {
LOG_INFO("load all sys package success!", K(ret));

View File

@ -63,11 +63,13 @@ class ObPLCursor;
class ObPLCacheCtx;
struct ObSysPackageFile {
const char *package_name;
const char *package_spec_file_name;
const char *package_body_file_name;
const char *const package_name;
const char *const package_spec_file_name;
const char *const package_body_file_name;
};
class ObCharStream;
class ObPLPackageManager
{
public:
@ -139,28 +141,24 @@ public:
uint64_t package_id,
ObPLPackageState *&package_state,
bool for_static_member = false);
static int read_package_sql(FILE *file, char* buf, int64_t buf_len, bool &eof);
static int read_and_exec_package_sql(
common::ObMySQLProxy &sql_proxy, const char* package_full_path, ObCompatibilityMode compa_mode);
static int load_sys_package(
common::ObMySQLProxy &sql_proxy, const char *package_spec_name, const char *package_body_name, ObCompatibilityMode compa_mode);
static int load_sys_package(common::ObMySQLProxy &sql_proxy, common::ObString &package_name, ObCompatibilityMode compa_mode);
static int load_all_common_sys_package(common::ObMySQLProxy &sql_proxy, ObCompatibilityMode compa_mode);
static int load_sys_package(common::ObMySQLProxy &sql_proxy,
common::ObString &package_name,
ObCompatibilityMode compa_mode,
bool from_file);
static int load_all_common_sys_package(common::ObMySQLProxy &sql_proxy,
const ObSysPackageFile *package_file,
int sys_package_count,
ObCompatibilityMode compa_mode);
static int load_all_special_sys_package(common::ObMySQLProxy &sql_proxy);
ObCompatibilityMode compa_mode,
bool from_file);
static int load_all_sys_package(common::ObMySQLProxy &sql_proxy);
static int add_package_to_plan_cache(const ObPLResolveCtx &resolve_ctx, ObPLPackage *package);
static int get_package_from_plan_cache(const ObPLResolveCtx &resolve_ctx,
uint64_t package_id,
ObPLPackage *&package);
static int get_package_schema_info(
share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t package_id,
const share::schema::ObPackageInfo *&package_spec_info,
const share::schema::ObPackageInfo *&package_body_info);
static int get_package_schema_info(share::schema::ObSchemaGetterGuard &schema_guard,
uint64_t package_id,
const share::schema::ObPackageInfo *&package_spec_info,
const share::schema::ObPackageInfo *&package_body_info);
static int destory_package_state(sql::ObSQLSessionInfo &session_info, uint64_t package_id);
int check_version(const ObPLResolveCtx &resolve_ctx, uint64_t package_id,
const ObPackageStateVersion &state_version, bool &match);
@ -174,6 +172,22 @@ public:
private:
DISALLOW_COPY_AND_ASSIGN(ObPLPackageManager);
static int read_package_sql(ObCharStream &stream, char* buf, int64_t buf_len, bool &eos);
static int read_and_exec_package_sql(common::ObMySQLProxy &sql_proxy,
ObCharStream &stream,
ObCompatibilityMode compa_mode);
static int get_syspack_source_file_content(const char *file_name, const char *&content);
static int load_sys_package(ObMySQLProxy &sql_proxy,
const ObSysPackageFile &pack_file_info,
ObCompatibilityMode compa_mode,
bool from_file);
static int load_sys_package_list(common::ObMySQLProxy &sql_proxy,
const ObSysPackageFile *sys_package_list,
int sys_package_count,
ObCompatibilityMode compa_mode,
bool from_file);
static int load_all_special_sys_package(common::ObMySQLProxy &sql_proxy);
int get_cached_package_spec(const ObPLResolveCtx &resolve_ctx, uint64_t package_id,
ObPLPackage *&package_spec);

View File

@ -2389,10 +2389,13 @@ int ObPLCursorInfo::prepare_spi_result(ObPLExecCtx *ctx, ObSPIResultSet *&spi_re
}
OX (spi_cursor_ = get_allocator()->alloc(sizeof(ObSPIResultSet)));
OV (OB_NOT_NULL(spi_cursor_), OB_ALLOCATE_MEMORY_FAILED);
} else {
CK (OB_NOT_NULL(static_cast<ObSPIResultSet*>(spi_cursor_)));
OX ((static_cast<ObSPIResultSet*>(spi_cursor_))->~ObSPIResultSet());
}
OX (spi_result = new (spi_cursor_) ObSPIResultSet());
OZ (spi_result->init(*ctx->exec_ctx_->get_my_session()));
OX (last_stream_cursor_ = true);
OZ (spi_result->init(*ctx->exec_ctx_->get_my_session()));
return ret;
}
@ -2413,6 +2416,14 @@ int ObPLCursorInfo::prepare_spi_cursor(ObSPICursor *&spi_cursor,
: sizeof(ObSPICursor);
OX (spi_cursor_ = spi_allocator->alloc(alloc_size));
OV (OB_NOT_NULL(spi_cursor_), OB_ALLOCATE_MEMORY_FAILED);
} else {
if (last_stream_cursor_) {
CK (OB_NOT_NULL(static_cast<ObSPIResultSet*>(spi_cursor_)));
OX (static_cast<ObSPIResultSet*>(spi_cursor_)->~ObSPIResultSet());
} else {
CK (OB_NOT_NULL(static_cast<ObSPICursor*>(spi_cursor_)));
OX (static_cast<ObSPICursor*>(spi_cursor_)->~ObSPICursor());
}
}
OX (spi_cursor = new (spi_cursor_) ObSPICursor(*spi_allocator, session_info));
OX (last_stream_cursor_ = false);

View File

@ -28,15 +28,33 @@ int ObDBMSUpgrade::upgrade_single(
{
int ret = OB_SUCCESS;
ObString package_name;
bool load_from_file = true;
ObCompatibilityMode mode = lib::is_oracle_mode() ? ObCompatibilityMode::ORACLE_MODE
: ObCompatibilityMode::MYSQL_MODE;
UNUSED(result);
CK (OB_NOT_NULL(ctx.get_sql_proxy()));
CK (OB_LIKELY(1 == params.count()));
OV (params.at(0).is_varchar(), OB_INVALID_ARGUMENT);
OZ (params.at(0).get_string(package_name));
OV (!package_name.empty(), OB_INVALID_ARGUMENT);
OZ (ObPLPackageManager::load_sys_package(*ctx.get_sql_proxy(), package_name, mode));
// OBServer 4.2.4 has added new parameters on the __DBMS_UPGRADE
// interface to control whether to load the system package source code from
// a file or embeded c string. To maintain compatibility during upgarding,
// it is necessary to distinguish the old and new versions of the interface.
// However, the system package does not have version control, so it depends
// on the number of parameters to judge the old and new versions.
if (OB_FAIL(ret)) {
} else if (1 == params.count()) {
OV (params.at(0).is_varchar(), OB_INVALID_ARGUMENT);
OZ (params.at(0).get_string(package_name));
OV (!package_name.empty(), OB_INVALID_ARGUMENT);
} else if (2 == params.count()) {
OV (params.at(0).is_varchar(), OB_INVALID_ARGUMENT);
OZ (params.at(0).get_string(package_name));
OV (!package_name.empty(), OB_INVALID_ARGUMENT);
OV (params.at(1).is_tinyint(), OB_INVALID_ARGUMENT);
OZ (params.at(1).get_bool(load_from_file));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("__DBMS_UPGRADE.UPGRADE_SINGLE require one or two arguments", K(ret), K(params));
}
OZ (ObPLPackageManager::load_sys_package(*ctx.get_sql_proxy(), package_name, mode, load_from_file));
return ret;
}
@ -44,12 +62,22 @@ int ObDBMSUpgrade::upgrade_all(
sql::ObExecContext &ctx, sql::ParamStore &params, common::ObObj &result)
{
int ret = OB_SUCCESS;
bool load_from_file = true;
ObCompatibilityMode mode = lib::is_oracle_mode() ? ObCompatibilityMode::ORACLE_MODE
: ObCompatibilityMode::MYSQL_MODE;
UNUSED(result);
CK (OB_NOT_NULL(ctx.get_sql_proxy()));
CK (OB_LIKELY(0 == params.count()));
OZ (ObPLPackageManager::load_all_common_sys_package(*ctx.get_sql_proxy(), mode));
if (OB_FAIL(ret)) {
} else if (0 == params.count()) {
// do nothing
} else if (1 == params.count()) {
OV (params.at(0).is_tinyint(), OB_INVALID_ARGUMENT);
OZ (params.at(0).get_bool(load_from_file));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("__DBMS_UPGRADE.UPGRADE_ALL require none or one arguments", K(ret), K(params));
}
OZ (ObPLPackageManager::load_all_common_sys_package(*ctx.get_sql_proxy(), mode, load_from_file));
return ret;
}

View File

@ -915,17 +915,18 @@ int ObUpgradeExecutor::upgrade_mysql_system_package_job_()
int64_t timeout = GCONF._ob_ddl_timeout;
const char *create_package_sql =
"CREATE OR REPLACE PACKAGE __DBMS_UPGRADE \
PROCEDURE UPGRADE(package_name VARCHAR(1024)); \
PROCEDURE UPGRADE_ALL(); \
PROCEDURE UPGRADE(package_name VARCHAR(1024), \
load_from_file BOOLEAN DEFAULT TRUE); \
PROCEDURE UPGRADE_ALL(load_from_file BOOLEAN DEFAULT TRUE); \
END;";
const char *create_package_body_sql =
"CREATE OR REPLACE PACKAGE BODY __DBMS_UPGRADE \
PROCEDURE UPGRADE(package_name VARCHAR(1024)); \
PROCEDURE UPGRADE(package_name VARCHAR(1024), load_from_file BOOLEAN); \
PRAGMA INTERFACE(c, UPGRADE_SINGLE); \
PROCEDURE UPGRADE_ALL(); \
PROCEDURE UPGRADE_ALL(load_from_file BOOLEAN); \
PRAGMA INTERFACE(c, UPGRADE_ALL); \
END;";
const char *upgrade_sql = "CALL __DBMS_UPGRADE.UPGRADE_ALL();";
const char *upgrade_sql = "CALL __DBMS_UPGRADE.UPGRADE_ALL(FALSE);";
ObTimeoutCtx ctx;
int64_t affected_rows = 0;
if (OB_FAIL(check_inner_stat_())) {
@ -974,17 +975,18 @@ int ObUpgradeExecutor::upgrade_oracle_system_package_job_()
int64_t timeout = GCONF._ob_ddl_timeout;
const char *create_package_sql =
"CREATE OR REPLACE PACKAGE \"__DBMS_UPGRADE\" IS \
PROCEDURE UPGRADE(package_name VARCHAR2); \
PROCEDURE UPGRADE_ALL; \
PROCEDURE UPGRADE(package_name VARCHAR2, \
load_from_file BOOLEAN DEFAULT TRUE); \
PROCEDURE UPGRADE_ALL(load_from_file BOOLEAN DEFAULT TRUE); \
END;";
const char *create_package_body_sql =
"CREATE OR REPLACE PACKAGE BODY \"__DBMS_UPGRADE\" IS \
PROCEDURE UPGRADE(package_name VARCHAR2); \
PROCEDURE UPGRADE(package_name VARCHAR2, load_from_file BOOLEAN); \
PRAGMA INTERFACE(c, UPGRADE_SINGLE); \
PROCEDURE UPGRADE_ALL; \
PROCEDURE UPGRADE_ALL(load_from_file BOOLEAN); \
PRAGMA INTERFACE(c, UPGRADE_ALL); \
END;";
const char *upgrade_sql = "CALL \"__DBMS_UPGRADE\".UPGRADE_ALL();";
const char *upgrade_sql = "BEGIN \"__DBMS_UPGRADE\".UPGRADE_ALL(FALSE); END;";
ObTimeoutCtx ctx;
int64_t affected_rows = 0;
if (OB_FAIL(check_inner_stat_())) {
@ -1017,9 +1019,9 @@ int ObUpgradeExecutor::upgrade_oracle_system_package_job_()
OB_SYS_TENANT_ID, upgrade_sql,
affected_rows, static_cast<int64_t>(mode)))) {
LOG_WARN("fail to execute sql", KR(ret), "sql", upgrade_sql);
} else if (0 != affected_rows) {
} else if (1 != affected_rows) { // default value of oracle anonymous block affected_rows is 1
ret = OB_ERR_UNEXPECTED;
LOG_WARN("affected_rows expected to be zero", KR(ret), K(affected_rows));
LOG_WARN("affected_rows expected to be 1", KR(ret), K(affected_rows));
}
FLOG_INFO("[UPGRADE] finish run upgrade oracle system package job",
KR(ret), "cost", ObTimeUtility::current_time() - start_ts);

View File

@ -3,9 +3,9 @@
CREATE OR REPLACE PACKAGE BODY __DBMS_UPGRADE
PROCEDURE UPGRADE(package_name VARCHAR(1024));
PROCEDURE UPGRADE(package_name VARCHAR(1024), load_from_file BOOLEAN);
PRAGMA INTERFACE(c, UPGRADE_SINGLE);
PROCEDURE UPGRADE_ALL();
PROCEDURE UPGRADE_ALL(load_from_file BOOLEAN);
PRAGMA INTERFACE(c, UPGRADE_ALL);
END;
//

View File

@ -3,7 +3,8 @@
CREATE OR REPLACE PACKAGE __DBMS_UPGRADE
PROCEDURE UPGRADE(package_name VARCHAR(1024));
PROCEDURE UPGRADE_ALL();
PROCEDURE UPGRADE(package_name VARCHAR(1024),
load_from_file BOOLEAN DEFAULT TRUE);
PROCEDURE UPGRADE_ALL(load_from_file BOOLEAN DEFAULT TRUE);
END;
//

View File

@ -546,6 +546,15 @@ int ObRARowStore::switch_idx_block(bool finish_add /* = false */)
if (!is_inited()) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (finish_add && NULL == idx_blk_) {
if (OB_FAIL(build_idx_block())) {
LOG_WARN("build index block failed",
K(ret), K(finish_add), KPC(idx_blk_));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (NULL == idx_blk_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("index block should not be null");

File diff suppressed because it is too large Load Diff

View File

@ -35,6 +35,7 @@ using common::ObPsStmtId;
namespace pl
{
class ObDbmsCursorInfo;
class ObPLSqlCodeInfo;
}
namespace sql
@ -58,6 +59,40 @@ struct ObPLSPITraceIdGuard
int &ret_;
};
class ObSPIRetryCtrlGuard
{
public:
ObSPIRetryCtrlGuard(
observer::ObQueryRetryCtrl &retry_ctrl,
ObSPIResultSet &spi_result,
ObSQLSessionInfo &session_info,
int &ret,
bool for_fetch = false);
~ObSPIRetryCtrlGuard();
void test();
private:
observer::ObQueryRetryCtrl &retry_ctrl_;
ObSPIResultSet &spi_result_;
ObSQLSessionInfo &session_info_;
pl::ObPLSqlCodeInfo save_sqlcode_info_;
int &ret_;
bool init_;
};
class ObSPIExecEnvGuard
{
public:
ObSPIExecEnvGuard(ObSQLSessionInfo &session_info, ObSPIResultSet &spi_result);
~ObSPIExecEnvGuard();
private:
ObSQLSessionInfo &session_info_;
ObSPIResultSet &spi_result_;
int64_t query_start_time_bk_;
pl::ObPLSqlCodeInfo *sqlcode_info_bk_;
};
struct ObSPICursor
{
ObSPICursor(ObIAllocator &allocator, sql::ObSQLSessionInfo* session_info) :
@ -201,7 +236,7 @@ public:
sql::ObSqlCtx &get_sql_ctx() { return sql_ctx_; }
ObResultSet *get_result_set() { return result_set_; }
ObSPIOutParams &get_out_params() { return out_params_; }
ObIAllocator &get_allocaor() { return allocator_; }
ObIAllocator &get_allocator() { return allocator_; }
int destruct_exec_params(ObSQLSessionInfo &session);
private:
enum EndStmtType
@ -512,6 +547,55 @@ public:
bool is_returning = false,
bool is_type_record = false);
static int check_dynamic_sql_legal(pl::ObPLExecCtx *ctx,
ObSqlString &sql_str,
stmt::StmtType stmt_type,
int64_t into_count,
int64_t inner_into_count,
common::ObObjParam **params,
int64_t param_count,
const int64_t *params_mode,
bool is_returning,
bool for_update,
int64_t &exec_param_cnt,
common::ObIArray<ObObjParam*> &out_using_params);
static int prepare_dynamic_sql_params(pl::ObPLExecCtx *ctx,
ObSPIResultSet &spi_result,
common::ObIAllocator &allocator,
int64_t exec_param_cnt,
ObObjParam **params,
ParamStore *&exec_params);
static int inner_open(pl::ObPLExecCtx *ctx,
ObIAllocator &param_allocator, //用于拷贝执行期参数
const ObString &sql,
const ObString &ps_sql,
int64_t type,
void *params,
int64_t param_count,
const ObSqlExpression **into_exprs,
int64_t into_count,
ObSPIResultSet &spi_result,
ObSPIOutParams &out_params,
bool is_forall,
bool is_dynamic_sql,
bool is_dbms_sql);
static int prepare_static_sql_params(pl::ObPLExecCtx *ctx,
ObIAllocator &param_allocator,
const ObString &sql,
const ObString &ps_sql,
int64_t type,
const ObSqlExpression **params,
int64_t param_count,
const ObSqlExpression **into_exprs,
int64_t into_count,
ObSPIResultSet &spi_result,
ObSPIOutParams &out_params,
bool is_forall,
ParamStore *&curr_params);
static int spi_get_subprogram_cursor_info(pl::ObPLExecCtx *ctx,
uint64_t package_id,
uint64_t routine_id,
@ -581,7 +665,8 @@ public:
const ObString &ps_sql,
int64_t stmt_type,
bool for_update,
bool has_hidden_rowid);
bool has_hidden_rowid,
int64_t orc_max_ret_rows = INT64_MAX);
static int spi_dynamic_open(pl::ObPLExecCtx *ctx,
const int64_t sql_idx,
const int64_t *sql_param_exprs_idx,
@ -591,7 +676,8 @@ public:
int64_t cursor_index);
static int dbms_dynamic_open(pl::ObPLExecCtx *ctx,
pl::ObDbmsCursorInfo &cursor,
bool is_dbms_sql = false);
bool is_dbms_sql = false,
int64_t orc_max_ret_rows = INT64_MAX);
static int dbms_cursor_fetch(pl::ObPLExecCtx *ctx,
pl::ObDbmsCursorInfo &cursor,
bool is_server_cursor = false);
@ -807,10 +893,14 @@ public:
int64_t stmt_type,
ParamStore &exec_params,
ObSPIResultSet &spi_result,
ObSPIOutParams &out_params);
ObSPIOutParams &out_params,
bool is_dynamic_sql = false);
static void adjust_pl_status_for_xa(sql::ObExecContext &ctx, int &result);
static int fill_cursor(ObResultSet &result_set, ObSPICursor *cursor, int64_t new_query_start_time);
static int fill_cursor(ObResultSet &result_set,
ObSPICursor *cursor,
int64_t new_query_start_time,
int64_t orc_max_ret_rows = INT64_MAX);
static int spi_opaque_assign_null(int64_t opaque_ptr);
@ -897,7 +987,7 @@ private:
const char *sql,
const char *ps_sql,
int64_t type,
const ObSqlExpression **param_exprs,
void *params,
int64_t param_count,
const ObSqlExpression **into_exprs,
int64_t into_count,
@ -908,7 +998,10 @@ private:
int64_t is_bulk,
bool is_forall = false,
bool is_type_record = false,
bool for_update = false);
bool for_update = false,
bool is_dynamic_sql = false,
ObIArray<ObObjParam*> *using_out_params = nullptr,
bool is_dbms_sql = false);
static int dbms_cursor_execute(pl::ObPLExecCtx *ctx,
const ObString ps_sql,
@ -940,22 +1033,8 @@ private:
ObSPIOutParams &out_params,
bool is_forall = false);
static int inner_open(pl::ObPLExecCtx *ctx,
ObIAllocator &param_allocator, //用于拷贝执行期参数
const char* sql,
const char* ps_sql,
int64_t type,
const ObSqlExpression **param_exprs,
int64_t param_count,
const ObSqlExpression **into_exprs,
int64_t into_count,
ObSPIResultSet &spi_result,
ObSPIOutParams &out_params,
observer::ObQueryRetryCtrl *retry_ctrl = nullptr,
bool is_forall = false);
static int inner_fetch(pl::ObPLExecCtx *ctx,
observer::ObQueryRetryCtrl &retry_ctrl,
bool &can_retry,
ObSPIResultSet &spi_result,
const ObSqlExpression **into_exprs,
int64_t into_count,
@ -1136,9 +1215,11 @@ private:
static int calc_dynamic_sqlstr(
pl::ObPLExecCtx *ctx, const ObSqlExpression *sql, ObSqlString &sqlstr);
static int dynamic_out_params(
common::ObIAllocator &allocator,
ObResultSet *result, common::ObObjParam **params, int64_t param_count);
static int dynamic_out_params(common::ObIAllocator &allocator,
ObResultSet *result,
void *params,
int64_t param_count,
bool is_dbms_sql);
static int cursor_close_impl(pl::ObPLExecCtx *ctx,
pl::ObPLCursorInfo *cursor,
@ -1177,7 +1258,35 @@ private:
const ObSqlExpression **actual_param_exprs,
int64_t cursor_param_count);
static bool is_sql_type_into_pl(ObObj &dest_addr, ObIArray<ObObj> &obj_array);
private:
static int streaming_cursor_open(pl::ObPLExecCtx *ctx,
pl::ObPLCursorInfo &cursor,
ObSQLSessionInfo &session_info,
const ObString &sql,
const ObString &ps_sql,
int64_t type,
void *params,
int64_t sql_param_count,
bool is_server_cursor,
bool is_for_update,
bool has_hidden_rowid,
bool is_dbms_cursor = false);
static int unstreaming_cursor_open(pl::ObPLExecCtx *ctx,
pl::ObPLCursorInfo &cursor,
ObSQLSessionInfo &session_info,
const ObString &sql,
const ObString &ps_sql,
int64_t type,
void *params,
int64_t sql_param_count,
bool is_server_cursor,
bool for_update,
bool has_hidden_rowid,
bool is_dbms_cursor = false,
int64_t orc_max_ret_rows = INT64_MAX);
static int store_params_string(pl::ObPLExecCtx *ctx, ObSPIResultSet &spi_result, ParamStore *exec_params);
static int setup_cursor_snapshot_verify_(pl::ObPLCursorInfo *cursor, ObSPIResultSet *spi_result);
};

View File

@ -2404,6 +2404,12 @@ OB_INLINE int ObBasicSessionInfo::process_session_variable(ObSysVarClassType var
}
break;
}
case SYS_VAR__ORACLE_SQL_SELECT_LIMIT: {
int64_t int_val = 0;
OZ (val.get_int(int_val), val);
OX (sys_vars_cache_.set_oracle_sql_select_limit(int_val));
break;
}
case SYS_VAR_AUTO_INCREMENT_OFFSET: {
uint64_t uint_val = 0;
OZ (val.get_uint64(uint_val), val);
@ -2969,6 +2975,12 @@ int ObBasicSessionInfo::fill_sys_vars_cache_base_value(
OX (sys_vars_cache.set_base_sql_select_limit(int_val));
break;
}
case SYS_VAR__ORACLE_SQL_SELECT_LIMIT: {
int64_t int_val = 0;
OZ (val.get_int(int_val), val);
OX (sys_vars_cache.set_base_oracle_sql_select_limit(int_val));
break;
}
case SYS_VAR_AUTO_INCREMENT_OFFSET: {
uint64_t uint_val = 0;
OZ (val.get_uint64(uint_val), val);

View File

@ -480,6 +480,7 @@ public:
uint64_t get_local_auto_increment_increment() const;
uint64_t get_local_auto_increment_offset() const;
uint64_t get_local_last_insert_id() const;
void set_local_ob_enable_pl_cache(bool v) { sys_vars_cache_.set_ob_enable_pl_cache(v); }
bool get_local_ob_enable_pl_cache() const;
bool get_local_ob_enable_plan_cache() const;
bool get_local_ob_enable_sql_audit() const;
@ -651,6 +652,11 @@ public:
sql_select_limit = sys_vars_cache_.get_sql_select_limit();
return common::OB_SUCCESS;
}
int get_oracle_sql_select_limit(int64_t &oracle_sql_select_limit) const
{
oracle_sql_select_limit = sys_vars_cache_.get_oracle_sql_select_limit();
return common::OB_SUCCESS;
}
// session保留compatible mode,主要用于传递mode,方便后续进行guard切换,如inner sql connection等
// 其他需要用mode地方请尽量使用线程上的is_oracle|mysql_mode
// 同时可以使用check_compatibility_mode来检查线程与session上的mode是否一致
@ -1667,6 +1673,7 @@ public:
sql_throttle_current_priority_(100),
ob_last_schema_version_(0),
sql_select_limit_(0),
oracle_sql_select_limit_(0),
auto_increment_offset_(0),
last_insert_id_(0),
binlog_row_image_(2),
@ -1729,6 +1736,7 @@ public:
sql_throttle_current_priority_ = 100;
ob_last_schema_version_ = 0;
sql_select_limit_ = 0;
oracle_sql_select_limit_ = 0;
auto_increment_offset_ = 0;
last_insert_id_ = 0;
binlog_row_image_ = 2;
@ -1789,6 +1797,7 @@ public:
sql_throttle_current_priority_ == other.sql_throttle_current_priority_ &&
ob_last_schema_version_ == other.ob_last_schema_version_ &&
sql_select_limit_ == other.sql_select_limit_ &&
oracle_sql_select_limit_ == other.oracle_sql_select_limit_ &&
auto_increment_offset_ == other.auto_increment_offset_ &&
last_insert_id_ == other.last_insert_id_ &&
binlog_row_image_ == other.binlog_row_image_ &&
@ -1945,7 +1954,7 @@ public:
K(ob_org_cluster_id_), K(ob_query_timeout_), K(ob_trx_timeout_), K(collation_connection_),
K(sql_mode_), K(nls_formats_[0]), K(nls_formats_[1]), K(nls_formats_[2]),
K(ob_trx_idle_timeout_), K(ob_trx_lock_timeout_), K(nls_collation_), K(nls_nation_collation_),
K_(sql_throttle_current_priority), K_(ob_last_schema_version), K_(sql_select_limit),
K_(sql_throttle_current_priority), K_(ob_last_schema_version), K_(sql_select_limit), K_(oracle_sql_select_limit),
K_(optimizer_use_sql_plan_baselines), K_(optimizer_capture_sql_plan_baselines),
K_(is_result_accurate), K_(character_set_results),
K_(character_set_connection), K_(ob_pl_block_timeout), K_(ob_plsql_ccflags),
@ -1958,6 +1967,7 @@ public:
int64_t sql_throttle_current_priority_;
int64_t ob_last_schema_version_;
int64_t sql_select_limit_;
int64_t oracle_sql_select_limit_;
uint64_t auto_increment_offset_;
uint64_t last_insert_id_;
int64_t binlog_row_image_;
@ -2085,6 +2095,7 @@ private:
DEF_SYS_VAR_CACHE_FUNCS(int64_t, sql_throttle_current_priority);
DEF_SYS_VAR_CACHE_FUNCS(int64_t, ob_last_schema_version);
DEF_SYS_VAR_CACHE_FUNCS(int64_t, sql_select_limit);
DEF_SYS_VAR_CACHE_FUNCS(int64_t, oracle_sql_select_limit);
DEF_SYS_VAR_CACHE_FUNCS(uint64_t, auto_increment_offset);
DEF_SYS_VAR_CACHE_FUNCS(uint64_t, last_insert_id);
DEF_SYS_VAR_CACHE_FUNCS(int64_t, binlog_row_image);
@ -2160,6 +2171,7 @@ private:
bool inc_sql_throttle_current_priority_:1;
bool inc_ob_last_schema_version_:1;
bool inc_sql_select_limit_:1;
bool inc_oracle_sql_select_limit_:1;
bool inc_auto_increment_offset_:1;
bool inc_last_insert_id_:1;
bool inc_binlog_row_image_:1;