[bugfix] fix cs_encoding simd pfor codec format depend on cpu arch

This commit is contained in:
saltonz
2024-08-06 06:15:51 +00:00
committed by ob-robot
parent 89000ccd6b
commit e8faf23073
22 changed files with 667 additions and 82 deletions

View File

@ -83,6 +83,12 @@ namespace common
class ObCodec
{
public:
enum PFoRPackingType : uint8_t
{
CPU_ARCH_DEPENDANT = 0, // before 4.3.3, cpu support AVX2 instruction set will generate different format
CPU_ARCH_INDEPENDANT_SCALAR = 1, // after 4.3.3, use unified scalar format
// CPU_ARCH_INDEPENDANT_SIMD = 2,
};
// get buf size which will be used to hold output data of current codec,
// in order to make enough buf, this value is estimated to be very large.
static OB_INLINE int64_t get_default_max_encoding_size(const int64_t orig_size)
@ -99,7 +105,7 @@ public:
}
ObCodec()
: uint_bytes_(0), check_out_buf_(true), allocator_(nullptr)
: uint_bytes_(0), check_out_buf_(true), pfor_packing_type_(CPU_ARCH_INDEPENDANT_SCALAR), allocator_(nullptr)
{}
virtual ~ObCodec() {}
@ -177,6 +183,11 @@ public:
virtual void set_uint_bytes(const uint8_t uint_bytes) { uint_bytes_ = uint_bytes; }
OB_INLINE uint32_t get_uint_bytes() const { return uint_bytes_; };
virtual void set_pfor_packing_type(const PFoRPackingType pfor_packing_type)
{
pfor_packing_type_ = pfor_packing_type;
}
OB_INLINE PFoRPackingType get_pfor_packing_type() { return pfor_packing_type_; }
virtual void set_allocator(common::ObIAllocator &allocator) { allocator_ = &allocator; }
OB_INLINE void disable_check_out_buf() { check_out_buf_ = false; }
@ -186,6 +197,7 @@ public:
protected:
uint8_t uint_bytes_; // orig uint bytes width, maybe 1, 2, 4, 8 bytes
bool check_out_buf_;
PFoRPackingType pfor_packing_type_; // pfor bit-packing format type
common::ObIAllocator *allocator_;
};

View File

@ -50,6 +50,13 @@ class ObCompositeCodec : public ObCodec
codec2.set_uint_bytes(uint_bytes);
}
virtual void set_pfor_packing_type(const PFoRPackingType pfor_packing_type) override
{
ObCodec::set_pfor_packing_type(pfor_packing_type);
codec1.set_pfor_packing_type(pfor_packing_type);
codec2.set_pfor_packing_type(pfor_packing_type);
}
virtual int do_encode(const char *in,
const uint64_t in_len,
char *out,

View File

@ -42,6 +42,7 @@ OB_INLINE static void encode_one(const UIntT *ip, const uint32_t idx, UIntT &sta
template<typename UIntT>
/*OB_INLINE*/ static int encode(
const ObCodec::PFoRPackingType pfor_packing_type,
UIntT *&lp,
const char *in,
uint64_t in_len,
@ -64,7 +65,7 @@ template<typename UIntT>
encode_one(ip, 3, start, p);
}
if (OB_FAIL(ObSIMDFixedPFor::__encode_array<UIntT>(lp, BSIZE, out, out_buf_len, out_pos))) {
if (OB_FAIL(ObSIMDFixedPFor::__encode_array<UIntT>(pfor_packing_type, lp, BSIZE, out, out_buf_len, out_pos))) {
LIB_LOG(WARN, "fail to encode array", K(ret), K(out_buf_len), K(out_pos));
} else {
DO_PREFETCH(ip + 512);
@ -99,6 +100,7 @@ OB_INLINE static void deocde_one(UIntT *op, const uint32_t idx, UIntT &start, UI
template<typename UIntT>
OB_INLINE static void decode(
const ObCodec::PFoRPackingType pfor_packing_type,
UIntT *&lp,
const char *in,
const uint64_t in_len,
@ -122,7 +124,7 @@ OB_INLINE static void decode(
DO_PREFETCH(in + pos + 512);
tmp_out_pos = 0;
ObSIMDFixedPFor::__decode_array<UIntT>(in, in_len, pos, BSIZE, tmp_out, tmp_out_len, tmp_out_pos);
ObSIMDFixedPFor::__decode_array<UIntT>(pfor_packing_type, in, in_len, pos, BSIZE, tmp_out, tmp_out_len, tmp_out_pos);
p = lp;
for(; p != (lp + BSIZE); p+=4,op+=4) {
@ -164,25 +166,26 @@ public:
uint64_t &out_pos) override
{
int ret = OB_SUCCESS;
const ObCodec::PFoRPackingType pfor_ptype = get_pfor_packing_type();
switch (get_uint_bytes()) {
case 1 : {
uint8_t *t_lp = reinterpret_cast<uint8_t *>(lp);
ret = ObDeltaZigzagFixedPfor::encode<uint8_t>(t_lp, in, in_len, out, out_len, out_pos, 0);
ret = ObDeltaZigzagFixedPfor::encode<uint8_t>(pfor_ptype, t_lp, in, in_len, out, out_len, out_pos, 0);
break;
}
case 2 : {
uint16_t *t_lp = reinterpret_cast<uint16_t *>(lp);
ret = ObDeltaZigzagFixedPfor::encode<uint16_t>(t_lp, in, in_len, out, out_len, out_pos, 0);
ret = ObDeltaZigzagFixedPfor::encode<uint16_t>(pfor_ptype, t_lp, in, in_len, out, out_len, out_pos, 0);
break;
}
case 4 : {
uint32_t *t_lp = reinterpret_cast<uint32_t *>(lp);
ret = ObDeltaZigzagFixedPfor::encode<uint32_t>(t_lp, in, in_len, out, out_len, out_pos, 0);
ret = ObDeltaZigzagFixedPfor::encode<uint32_t>(pfor_ptype, t_lp, in, in_len, out, out_len, out_pos, 0);
break;
}
case 8 : {
uint64_t *t_lp = reinterpret_cast<uint64_t *>(lp);
ret = ObDeltaZigzagFixedPfor::encode<uint64_t>(t_lp, in, in_len, out, out_len, out_pos, 0);
ret = ObDeltaZigzagFixedPfor::encode<uint64_t>(pfor_ptype, t_lp, in, in_len, out, out_len, out_pos, 0);
break;
}
default : {
@ -205,25 +208,26 @@ public:
uint64_t &out_pos) override
{
int ret = OB_SUCCESS;
const ObCodec::PFoRPackingType pfor_ptype = get_pfor_packing_type();
switch (get_uint_bytes()) {
case 1 : {
uint8_t *t_lp = reinterpret_cast<uint8_t *>(lp);
ObDeltaZigzagFixedPfor::decode<uint8_t>(t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObDeltaZigzagFixedPfor::decode<uint8_t>(pfor_ptype, t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 2 : {
uint16_t *t_lp = reinterpret_cast<uint16_t *>(lp);
ObDeltaZigzagFixedPfor::decode<uint16_t>(t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObDeltaZigzagFixedPfor::decode<uint16_t>(pfor_ptype, t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 4 : {
uint32_t *t_lp = reinterpret_cast<uint32_t *>(lp);
ObDeltaZigzagFixedPfor::decode<uint32_t>(t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObDeltaZigzagFixedPfor::decode<uint32_t>(pfor_ptype, t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 8 : {
uint64_t *t_lp = reinterpret_cast<uint64_t *>(lp);
ObDeltaZigzagFixedPfor::decode<uint64_t>(t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObDeltaZigzagFixedPfor::decode<uint64_t>(pfor_ptype, t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
default : {

View File

@ -44,6 +44,7 @@ OB_INLINE static void encode_one(const UIntT *ip, const uint32_t idx, UIntT &sta
template<typename UIntT>
/*OB_INLINE*/ static int encode(
const ObCodec::PFoRPackingType pfor_packing_type,
UIntT *&lp,
const char *in,
uint64_t in_len,
@ -67,7 +68,7 @@ template<typename UIntT>
encode_one(ip, 3, start, pd, p);
}
if (OB_FAIL(ObSIMDFixedPFor::__encode_array<UIntT>(lp, BSIZE, out, out_buf_len, out_pos))) {
if (OB_FAIL(ObSIMDFixedPFor::__encode_array<UIntT>(pfor_packing_type, lp, BSIZE, out, out_buf_len, out_pos))) {
LIB_LOG(WARN, "fail to encode array", K(ret), K(out_buf_len), K(out_pos));
} else {
DO_PREFETCH(ip + 512);
@ -102,6 +103,7 @@ OB_INLINE static void deocde_one(UIntT *op, const uint32_t idx, UIntT &start, UI
template<typename UIntT>
OB_INLINE static void decode(
const ObCodec::PFoRPackingType pfor_packing_type,
UIntT *&lp,
const char *in,
const uint64_t in_len,
@ -126,7 +128,7 @@ OB_INLINE static void decode(
DO_PREFETCH(in + pos + 512);
tmp_out_pos = 0;
ObSIMDFixedPFor::__decode_array<UIntT>(in, in_len, pos, BSIZE, tmp_out, tmp_out_len, tmp_out_pos);
ObSIMDFixedPFor::__decode_array<UIntT>(pfor_packing_type, in, in_len, pos, BSIZE, tmp_out, tmp_out_len, tmp_out_pos);
p = lp;
for(; p != (lp + BSIZE); p+=4,op+=4) {
@ -168,25 +170,26 @@ public:
uint64_t &out_pos) override
{
int ret = OB_SUCCESS;
const PFoRPackingType pfor_ptype = get_pfor_packing_type();
switch (get_uint_bytes()) {
case 1 : {
uint8_t *t_lp = reinterpret_cast<uint8_t *>(lp);
ret = ObDoubleDeltaZigzagFixedPfor::encode<uint8_t>(t_lp, in, in_len, out, out_len, out_pos, 0);
ret = ObDoubleDeltaZigzagFixedPfor::encode<uint8_t>(pfor_ptype, t_lp, in, in_len, out, out_len, out_pos, 0);
break;
}
case 2 : {
uint16_t *t_lp = reinterpret_cast<uint16_t *>(lp);
ret = ObDoubleDeltaZigzagFixedPfor::encode<uint16_t>(t_lp, in, in_len, out, out_len, out_pos, 0);
ret = ObDoubleDeltaZigzagFixedPfor::encode<uint16_t>(pfor_ptype, t_lp, in, in_len, out, out_len, out_pos, 0);
break;
}
case 4 : {
uint32_t *t_lp = reinterpret_cast<uint32_t *>(lp);
ret = ObDoubleDeltaZigzagFixedPfor::encode<uint32_t>(t_lp, in, in_len, out, out_len, out_pos, 0);
ret = ObDoubleDeltaZigzagFixedPfor::encode<uint32_t>(pfor_ptype, t_lp, in, in_len, out, out_len, out_pos, 0);
break;
}
case 8 : {
uint64_t *t_lp = reinterpret_cast<uint64_t *>(lp);
ret = ObDoubleDeltaZigzagFixedPfor::encode<uint64_t>(t_lp, in, in_len, out, out_len, out_pos, 0);
ret = ObDoubleDeltaZigzagFixedPfor::encode<uint64_t>(pfor_ptype, t_lp, in, in_len, out, out_len, out_pos, 0);
break;
}
default : {
@ -210,25 +213,26 @@ public:
uint64_t &out_pos) override
{
int ret = OB_SUCCESS;
const PFoRPackingType pfor_ptype = get_pfor_packing_type();
switch (get_uint_bytes()) {
case 1 : {
uint8_t *t_lp = reinterpret_cast<uint8_t *>(lp);
ObDoubleDeltaZigzagFixedPfor::decode<uint8_t>(t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObDoubleDeltaZigzagFixedPfor::decode<uint8_t>(pfor_ptype, t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 2 : {
uint16_t *t_lp = reinterpret_cast<uint16_t *>(lp);
ObDoubleDeltaZigzagFixedPfor::decode<uint16_t>(t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObDoubleDeltaZigzagFixedPfor::decode<uint16_t>(pfor_ptype, t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 4 : {
uint32_t *t_lp = reinterpret_cast<uint32_t *>(lp);
ObDoubleDeltaZigzagFixedPfor::decode<uint32_t>(t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObDoubleDeltaZigzagFixedPfor::decode<uint32_t>(pfor_ptype, t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 8 : {
uint64_t *t_lp = reinterpret_cast<uint64_t *>(lp);
ObDoubleDeltaZigzagFixedPfor::decode<uint64_t>(t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObDoubleDeltaZigzagFixedPfor::decode<uint64_t>(pfor_ptype, t_lp, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
default : {

View File

@ -109,6 +109,7 @@ public:
template<typename UIntT>
static OB_INLINE void inner_do_encode(
const PFoRPackingType pfor_packing_type,
const UIntT *__restrict in,
uint32_t n,
char *out,
@ -125,7 +126,7 @@ public:
char *orig_out = out;
if (bx == 0) { // no exception
inner_bit_packing<UIntT>(in, out, b, out_buf_len);
inner_bit_packing<UIntT>(pfor_packing_type, in, out, b, out_buf_len);
} else {
uint64_t i = 0;
uint64_t xn = 0;// the count of exceptions
@ -162,7 +163,7 @@ public:
}
// packing data
inner_bit_packing<UIntT>(_in, out, b, out_buf_len);
inner_bit_packing<UIntT>(pfor_packing_type, _in, out, b, out_buf_len);
}
len = (uint32_t)(out - orig_out);
}
@ -189,7 +190,8 @@ public:
}
template<typename UIntT>
static OB_INLINE int __encode_array(const UIntT *in, const uint32_t length,
static OB_INLINE int __encode_array(const PFoRPackingType pfor_packing_type,
const UIntT *in, const uint32_t length,
char *out, uint64_t out_buf_len, uint64_t &out_pos)
{
int ret = OB_SUCCESS;
@ -208,7 +210,7 @@ public:
} else {
uint32_t len = 0;
uint64_t remain_out_buf_len = out_buf_len - (out - orig_out);
inner_do_encode(in, length, out, b, bx, remain_out_buf_len, len);
inner_do_encode(pfor_packing_type, in, length, out, b, bx, remain_out_buf_len, len);
out += len;
out_pos = out - orig_out;
}
@ -225,7 +227,8 @@ public:
const UIntT *t_in = reinterpret_cast<const UIntT *>(in);
for (int64_t i = 0; OB_SUCC(ret) && i < t_in_len; i += BlockSize) {
const UIntT *next_in = t_in + i;
if (OB_FAIL(__encode_array<UIntT>(next_in, BlockSize, out, out_len, out_pos))) {
if (OB_FAIL(__encode_array<UIntT>(
get_pfor_packing_type(), next_in, BlockSize, out, out_len, out_pos))) {
LIB_LOG(WARN, "fail to encode array", K(ret), K(BlockSize), K(out_len), K(out_pos));
}
}
@ -270,6 +273,7 @@ public:
template<typename UIntT>
static OB_INLINE void inner_do_decode(
const PFoRPackingType pfor_packing_type,
const char *_in,
const uint64_t length,
uint64_t &pos,
@ -284,7 +288,7 @@ public:
b = *in++; // bit width
if (0 == (b & 0x80)) { // no exception value, direct unpack
inner_bit_unpacking<UIntT>(in, _in, length, out, b);
inner_bit_unpacking<UIntT>(pfor_packing_type, in, _in, length, out, b);
} else {
b &= (0x80 - 1); // get normal bit width
bx = *in++; // get exception bit width
@ -307,7 +311,7 @@ public:
}
// unpacking data
inner_bit_unpacking<UIntT>(in, _in, length, out, b);
inner_bit_unpacking<UIntT>(pfor_packing_type, in, _in, length, out, b);
UIntT *out_arr = reinterpret_cast<UIntT *>(_out + out_pos);
// patch exception, TODO, oushen, optimize later
@ -332,6 +336,7 @@ public:
template<typename UIntT>
static OB_INLINE void __decode_array(
const PFoRPackingType pfor_packing_type,
const char *in,
const uint64_t length,
uint64_t &pos,
@ -341,7 +346,7 @@ public:
uint64_t &out_pos)
{
for (int64_t i = 0; i < uint_count; i += BlockSize) {
inner_do_decode<UIntT>(in, length, pos, out, out_buf_len, out_pos);
inner_do_decode<UIntT>(pfor_packing_type, in, length, pos, out, out_buf_len, out_pos);
}
}
@ -355,21 +360,22 @@ public:
uint64_t &out_pos) override
{
int ret = OB_SUCCESS;
const ObCodec::PFoRPackingType pfor_packing_type = get_pfor_packing_type();
switch (get_uint_bytes()) {
case 1 : {
__decode_array<uint8_t>(in, in_len, in_pos, uint_count, out, out_len, out_pos);
__decode_array<uint8_t>(pfor_packing_type, in, in_len, in_pos, uint_count, out, out_len, out_pos);
break;
}
case 2 : {
__decode_array<uint16_t>(in, in_len, in_pos, uint_count, out, out_len, out_pos);
__decode_array<uint16_t>(pfor_packing_type, in, in_len, in_pos, uint_count, out, out_len, out_pos);
break;
}
case 4 : {
__decode_array<uint32_t>(in, in_len, in_pos, uint_count, out, out_len, out_pos);
__decode_array<uint32_t>(pfor_packing_type, in, in_len, in_pos, uint_count, out, out_len, out_pos);
break;
}
case 8 : {
__decode_array<uint64_t>(in, in_len, in_pos, uint_count, out, out_len, out_pos);
__decode_array<uint64_t>(pfor_packing_type, in, in_len, in_pos, uint_count, out, out_len, out_pos);
break;
}
default : {
@ -385,11 +391,13 @@ public:
private:
template<typename UIntT>
static OB_INLINE void inner_bit_packing(
const PFoRPackingType pfor_packing_type,
const UIntT *__restrict in,
char *&out,
const uint32_t bit,
const uint64_t out_buf_len)
{
if (PFoRPackingType::CPU_ARCH_DEPENDANT == pfor_packing_type) {
if (4 == sizeof(UIntT) && common::is_arch_supported(ObTargetArch::AVX2)) {
// uint32_t simd packing
uSIMD_fastpackwithoutmask_128_32((uint32_t *)in, reinterpret_cast<__m128i *>(out), bit);
@ -400,6 +408,12 @@ private:
out += BlockSize * bit / 8;
} else {
// uint64_t, uint8_t use scalar packing
// use scalar bit-packing for cpu-arch independant format
uint64_t out_pos = 0;
scalar_bit_packing<UIntT>(in, BlockSize, bit, out, out_buf_len, out_pos);
out += out_pos;
}
} else if (PFoRPackingType::CPU_ARCH_INDEPENDANT_SCALAR == pfor_packing_type) {
uint64_t out_pos = 0;
scalar_bit_packing<UIntT>(in, BlockSize, bit, out, out_buf_len, out_pos);
out += out_pos;
@ -408,12 +422,14 @@ private:
template<typename UIntT>
static OB_INLINE void inner_bit_unpacking(
const PFoRPackingType pfor_packing_type,
const char *&in,
const char *_in,
const uint64_t length,
char *&out,
const uint32_t bit)
{
if (PFoRPackingType::CPU_ARCH_DEPENDANT == pfor_packing_type) {
if (4 == sizeof(UIntT) && common::is_arch_supported(ObTargetArch::AVX2)) {
uSIMD_fastunpack_128_32(reinterpret_cast<const __m128i *>(in), reinterpret_cast<uint32_t *>(out), bit);
in += BlockSize * bit / 8; // convert to byte;
@ -431,6 +447,16 @@ private:
tmp_out, tmp_out_buf_len, tmp_out_pos);
in = _in + in_pos;
}
} else if (PFoRPackingType::CPU_ARCH_INDEPENDANT_SCALAR == pfor_packing_type) {
uint64_t in_pos = in - _in;
uint64_t in_len = length;
uint64_t tmp_out_pos = 0;
uint64_t tmp_out_buf_len = BlockSize;
UIntT *tmp_out = (UIntT *)out;
scalar_bit_unpacking<UIntT>(_in, in_len, in_pos, BlockSize, bit,
tmp_out, tmp_out_buf_len, tmp_out_pos);
in = _in + in_pos;
}
out += BlockSize * sizeof(UIntT);
}
};

View File

@ -48,6 +48,13 @@ class ObTiredCodec : public ObCodec
codec2_.set_uint_bytes(uint_bytes);
}
virtual void set_pfor_packing_type(const PFoRPackingType pfor_packing_type) override
{
ObCodec::set_pfor_packing_type(pfor_packing_type);
codec1_.set_pfor_packing_type(pfor_packing_type);
codec2_.set_pfor_packing_type(pfor_packing_type);
}
virtual int do_encode(const char *in,
const uint64_t in_len,
char *out,

View File

@ -42,9 +42,9 @@ OB_INLINE static void do_encode_one_batch(const uint32_t idx, UIntT &start, cons
template<typename UIntT>
/*OB_INLINE*/ static int encode(
const ObCodec::PFoRPackingType pfor_ptype,
const char *in, uint64_t in_len,
char *out, const uint64_t out_buf_len, uint64_t &out_pos,
UIntT start)
char *out, const uint64_t out_buf_len, uint64_t &out_pos, UIntT start)
{
// performance critical, do not check param(outer has already checked)
int ret = OB_SUCCESS;
@ -78,7 +78,7 @@ template<typename UIntT>
} else {
*(out + out_pos) = b;
out_pos++;
if (OB_FAIL(ObSIMDFixedPFor::__encode_array<UIntT>(v, BSIZE, out, out_buf_len, out_pos))) {
if (OB_FAIL(ObSIMDFixedPFor::__encode_array<UIntT>(pfor_ptype, v, BSIZE, out, out_buf_len, out_pos))) {
LIB_LOG(WARN, "fail to encode array", K(ret), K(out_buf_len), K(out_pos));
} else {
DO_PREFETCH(ip + 512);
@ -131,6 +131,7 @@ OB_INLINE static void do_decode_one_batch(const uint32_t idx, UIntT &start, UInt
template<typename UIntT>
/*OB_INLINE*/ static void decode(
const ObCodec::PFoRPackingType pfor_ptype,
const char *in,
const uint64_t in_len,
uint64_t &pos,
@ -154,7 +155,7 @@ template<typename UIntT>
uint8_t b = *(in + pos);
pos++;
tmp_out_pos = 0;
ObSIMDFixedPFor::__decode_array<UIntT>(in, in_len, pos, BSIZE, tmp_out, tmp_out_len, tmp_out_pos);
ObSIMDFixedPFor::__decode_array<UIntT>(pfor_ptype, in, in_len, pos, BSIZE, tmp_out, tmp_out_len, tmp_out_pos);
for(tv = v; tv != &v[BSIZE]; tv+=4,op+=4) {
do_decode_one_batch<UIntT>(0, start, op, tv, b);
@ -199,21 +200,22 @@ public:
uint64_t &out_pos) override
{
int ret = OB_SUCCESS;
const PFoRPackingType pfor_ptype = get_pfor_packing_type();
switch (get_uint_bytes()) {
case 1 : {
ret = ObXorFixedPforInner::encode<uint8_t>(in, in_len, out, out_len, out_pos, 0);
ret = ObXorFixedPforInner::encode<uint8_t>(pfor_ptype, in, in_len, out, out_len, out_pos, 0);
break;
}
case 2 : {
ret = ObXorFixedPforInner::encode<uint16_t>(in, in_len, out, out_len, out_pos, 0);
ret = ObXorFixedPforInner::encode<uint16_t>(pfor_ptype, in, in_len, out, out_len, out_pos, 0);
break;
}
case 4 : {
ret = ObXorFixedPforInner::encode<uint32_t>(in, in_len, out, out_len, out_pos, 0);
ret = ObXorFixedPforInner::encode<uint32_t>(pfor_ptype, in, in_len, out, out_len, out_pos, 0);
break;
}
case 8 : {
ret = ObXorFixedPforInner::encode<uint64_t>(in, in_len, out, out_len, out_pos, 0);
ret = ObXorFixedPforInner::encode<uint64_t>(pfor_ptype, in, in_len, out, out_len, out_pos, 0);
break;
}
default : {
@ -236,21 +238,22 @@ public:
uint64_t &out_pos) override
{
int ret = OB_SUCCESS;
const PFoRPackingType pfor_ptype = get_pfor_packing_type();
switch (get_uint_bytes()) {
case 1 : {
ObXorFixedPforInner::decode<uint8_t>(in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObXorFixedPforInner::decode<uint8_t>(pfor_ptype, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 2 : {
ObXorFixedPforInner::decode<uint16_t>(in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObXorFixedPforInner::decode<uint16_t>(pfor_ptype, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 4 : {
ObXorFixedPforInner::decode<uint32_t>(in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObXorFixedPforInner::decode<uint32_t>(pfor_ptype, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
case 8 : {
ObXorFixedPforInner::decode<uint64_t>(in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
ObXorFixedPforInner::decode<uint64_t>(pfor_ptype, in, in_len, in_pos, uint_count, out, out_len, out_pos, 0);
break;
}
default : {

View File

@ -110,14 +110,16 @@ int ObDictColumnEncoder::build_ref_encoder_ctx_()
uint64_t range = 0;
if (is_force_raw_) {
if (OB_FAIL(ref_enc_ctx_.build_unsigned_stream_meta(
0, max_ref_, is_replace_null, null_replaced_value, true, range))) {
0, max_ref_, is_replace_null, null_replaced_value, true,
ctx_->encoding_ctx_->major_working_cluster_version_, range))) {
LOG_WARN("fail to build_unsigned_stream_meta", K(ret));
}
} else {
if (OB_FAIL(try_const_encoding_ref_())) {
LOG_WARN("fail to try_use_const_ref", K(ret));
} else if (OB_FAIL(ref_enc_ctx_.build_unsigned_stream_meta(0, ref_stream_max_value_,
is_replace_null, null_replaced_value, false, range))) {
is_replace_null, null_replaced_value, false,
ctx_->encoding_ctx_->major_working_cluster_version_, range))) {
LOG_WARN("fail to build_unsigned_stream_meta", K(ret));
}
}

View File

@ -80,7 +80,8 @@ int ObIntDictColumnEncoder::build_integer_dict_encoder_ctx_()
const int64_t int_min = static_cast<int64_t>(ctx_->integer_min_);
const int64_t int_max = static_cast<int64_t>(ctx_->integer_max_);
if (OB_FAIL(integer_dict_enc_ctx_.build_signed_stream_meta(int_min, int_max, is_replace_null,
null_replaced_value, precision_width_size_, is_force_raw_, dict_integer_range_))) {
null_replaced_value, precision_width_size_, is_force_raw_,
ctx_->encoding_ctx_->major_working_cluster_version_, dict_integer_range_))) {
LOG_WARN("fail to build_signed_stream_meta", K(ret));
}
} else if (ObUIntSC == store_class_) {
@ -88,7 +89,7 @@ int ObIntDictColumnEncoder::build_integer_dict_encoder_ctx_()
const uint64_t uint_max = static_cast<uint64_t>(ctx_->integer_max_);
if (OB_FAIL(integer_dict_enc_ctx_.build_unsigned_stream_meta(
uint_min, uint_max, is_replace_null, null_replaced_value,
is_force_raw_, dict_integer_range_))) {
is_force_raw_, ctx_->encoding_ctx_->major_working_cluster_version_, dict_integer_range_))) {
LOG_WARN("fail to build_unsigned_stream_meta", K(ret));
}
} else {

View File

@ -87,7 +87,8 @@ int ObIntegerColumnEncoder::do_init_()
LOG_WARN("not init", K(ret));
} else if (row_count_ == ctx_->null_cnt_) { // all datums is null
if (OB_FAIL(enc_ctx_.build_signed_stream_meta(
0, 0, true/*is_replace_null*/, 0, precision_width_size_, is_force_raw_, integer_range_))) {
0, 0, true/*is_replace_null*/, 0, precision_width_size_, is_force_raw_,
ctx_->encoding_ctx_->major_working_cluster_version_, integer_range_))) {
LOG_WARN("fail to build_signed_stream_meta", K(ret));
}
} else if (ObIntSC == store_class_ || ObDecimalIntSC == store_class_) {
@ -212,7 +213,8 @@ int ObIntegerColumnEncoder::build_signed_stream_meta_()
}
if (OB_FAIL(enc_ctx_.build_signed_stream_meta(new_int_min, new_int_max, is_replace_null,
null_replaced_value, precision_width_size_, is_force_raw_, integer_range_))) {
null_replaced_value, precision_width_size_, is_force_raw_,
ctx_->encoding_ctx_->major_working_cluster_version_, integer_range_))) {
LOG_WARN("fail to build_signed_stream_meta", K(ret));
}
}
@ -258,7 +260,7 @@ int ObIntegerColumnEncoder::build_unsigned_encoder_ctx_()
if (OB_FAIL(enc_ctx_.build_unsigned_stream_meta(
new_uint_min, new_uint_max, is_replace_null, null_replaced_value,
is_force_raw_, integer_range_))) {
is_force_raw_, ctx_->encoding_ctx_->major_working_cluster_version_, integer_range_))) {
LOG_WARN("fail to build_unsigned_stream_meta", K(ret));
}
}

View File

@ -139,6 +139,7 @@ private:
uint64_t in_pos = 0;
codec.set_uint_bytes(sizeof(T));
codec.set_pfor_packing_type(ctx.meta_.get_pfor_packing_type());
if (OB_FAIL(codec.decode(in, in_len, in_pos, uint_count, out, out_len, out_pos))) {
STORAGE_LOG(WARN, "fail to deocde array", K(in_len), K(uint_count), K(out_len), KR(ret));
}

View File

@ -88,6 +88,7 @@ private:
uint64_t out_pos = 0;
codec.set_uint_bytes(sizeof(T));
codec.set_pfor_packing_type(ctx_->meta_.get_pfor_packing_type());
const char *in = reinterpret_cast<const char *>(in_arr);
uint64_t in_len = arr_count * sizeof(T);

View File

@ -536,7 +536,8 @@ int ObMicroBlockCSEncoder::store_stream_offsets_(int64_t &stream_offsets_length)
int64_t need_store_size = 0;
ObIntegerStreamEncoderCtx enc_ctx;
uint32_t end_offset = stream_offsets_.at(stream_offsets_.count() - 1);
if (OB_FAIL(enc_ctx.build_offset_array_stream_meta(end_offset, false/*force raw*/))) {
if (OB_FAIL(enc_ctx.build_offset_array_stream_meta(
end_offset, false/*force raw*/, ctx_.major_working_cluster_version_))) {
LOG_WARN("fail to build_offset_array_stream_meta", K(ret));
} else if(OB_FAIL(enc_ctx.build_stream_encoder_info(
false/*has_null*/,

View File

@ -81,7 +81,9 @@ int ObStrDictColumnEncoder::build_string_dict_encoder_ctx_()
ctx_->encoding_ctx_->compressor_type_, is_force_raw_,
&ctx_->encoding_ctx_->cs_encoding_opt_,
ctx_->encoding_ctx_->previous_cs_encoding_.get_column_encoding(column_index_),
int_stream_idx, ctx_->allocator_))) {
int_stream_idx,
ctx_->encoding_ctx_->major_working_cluster_version_,
ctx_->allocator_))) {
LOG_WARN("fail to build_string_stream_encoder_info", K(ret), KPC_(ctx));
}
}

View File

@ -41,6 +41,8 @@ DEFINE_SERIALIZE(ObIntegerStreamMeta)
LOG_WARN("fail to encode null_replaced_value_", K(ret));
} else if (is_decimal_int() && OB_FAIL(serialization::encode_i8(buf, buf_len, pos, decimal_precision_width_))) {
LOG_WARN("fail to encode decimal_precision_width_", K(ret));
} else if (version_ > OB_INTEGER_STREAM_META_V1 && serialization::encode_i8(buf, buf_len, pos, pfor_packing_type_)) {
LOG_WARN("fail to encode pfor_packing_type_", K(ret));
}
return ret;
}
@ -63,6 +65,14 @@ DEFINE_DESERIALIZE(ObIntegerStreamMeta)
} else if (is_decimal_int() && OB_FAIL(serialization::decode_i8(buf, data_len, pos, (int8_t*)&decimal_precision_width_))) {
LOG_WARN("fail to decode decimal_precision_width_", K(ret));
}
if (OB_SUCC(ret) ) {
if (version_ == OB_INTEGER_STREAM_META_V1) {
pfor_packing_type_ = ObCodec::CPU_ARCH_DEPENDANT;
} else if (OB_FAIL(serialization::decode(buf, data_len, pos, pfor_packing_type_))) {
LOG_WARN("fail to decode pfor_packing_type_", K(ret));
}
}
return ret;
}
@ -82,13 +92,16 @@ DEFINE_GET_SERIALIZE_SIZE(ObIntegerStreamMeta)
if (is_decimal_int()) {
len += serialization::encoded_length_i8(decimal_precision_width_);
}
if (version_ > OB_INTEGER_STREAM_META_V1) {
len += serialization::encoded_length(pfor_packing_type_);
}
return len;
}
int ObIntegerStreamEncoderCtx::build_signed_stream_meta(
const int64_t min, const int64_t max, const bool is_replace_null,
const int64_t replace_value, const int64_t precision_width_size,
const bool force_raw, uint64_t &range)
const bool force_raw, const int64_t major_working_cluster_version, uint64_t &range)
{
int ret = OB_SUCCESS;
@ -108,6 +121,12 @@ int ObIntegerStreamEncoderCtx::build_signed_stream_meta(
meta_.set_precision_width_size(precision_width_size);
}
if (major_working_cluster_version >= DATA_VERSION_4_3_3_0) {
meta_.set_pfor_packing_type(ObCodec::CPU_ARCH_INDEPENDANT_SCALAR);
} else {
meta_.set_pfor_packing_type(ObCodec::CPU_ARCH_DEPENDANT);
}
if (min < 0) {
range = max - min;
meta_.set_base_value((uint64_t)min);
@ -128,7 +147,7 @@ int ObIntegerStreamEncoderCtx::build_signed_stream_meta(
int ObIntegerStreamEncoderCtx::build_unsigned_stream_meta(const uint64_t min,
const uint64_t max, const bool is_replace_null, const uint64_t replace_value,
const bool force_raw, uint64_t &range)
const bool force_raw, const int64_t major_working_cluster_version, uint64_t &range)
{
int ret = OB_SUCCESS;
@ -143,6 +162,11 @@ int ObIntegerStreamEncoderCtx::build_unsigned_stream_meta(const uint64_t min,
if (is_replace_null) {
meta_.set_null_replaced_value(replace_value);
}
if (major_working_cluster_version >= DATA_VERSION_4_3_3_0) {
meta_.set_pfor_packing_type(ObCodec::CPU_ARCH_INDEPENDANT_SCALAR);
} else {
meta_.set_pfor_packing_type(ObCodec::CPU_ARCH_DEPENDANT);
}
int64_t uint_max_byte_size = get_byte_packed_int_size(max);
// not use base when there is no negative value
if (OB_FAIL(meta_.set_uint_width_size(uint_max_byte_size))) {
@ -153,12 +177,18 @@ int ObIntegerStreamEncoderCtx::build_unsigned_stream_meta(const uint64_t min,
return ret;
}
int ObIntegerStreamEncoderCtx::build_offset_array_stream_meta(const uint64_t end_offset,
bool force_raw)
const bool force_raw,
const int64_t major_working_cluster_version)
{
int ret = OB_SUCCESS;
if (force_raw) {
meta_.set_raw_encoding();
}
if (major_working_cluster_version >= DATA_VERSION_4_3_3_0) {
meta_.set_pfor_packing_type(ObCodec::CPU_ARCH_INDEPENDANT_SCALAR);
} else {
meta_.set_pfor_packing_type(ObCodec::CPU_ARCH_DEPENDANT);
}
info_.is_monotonic_inc_ = true;
int64_t width_size = get_byte_packed_int_size(end_offset);
if (OB_FAIL(meta_.set_uint_width_size(width_size))) {
@ -276,6 +306,7 @@ int ObStringStreamEncoderCtx::build_string_stream_encoder_info(
const ObCSEncodingOpt *encoding_opt,
const ObPreviousColumnEncoding *previous_encoding,
const int32_t int_stream_idx,
const int64_t major_working_cluster_version,
ObIAllocator *allocator)
{
int ret = OB_SUCCESS;
@ -284,6 +315,7 @@ int ObStringStreamEncoderCtx::build_string_stream_encoder_info(
info_.encoding_opt_ = encoding_opt;
info_.previous_encoding_ = previous_encoding;
info_.int_stream_idx_ = int_stream_idx;
info_.major_working_cluster_version_ = major_working_cluster_version;
info_.allocator_ = allocator;
return ret;
}

View File

@ -15,6 +15,7 @@
#include "lib/utility/ob_print_utils.h"
#include "lib/compress/ob_compress_util.h"
#include "lib/codec/ob_codecs.h"
namespace oceanbase
{
@ -107,13 +108,16 @@ struct ObIntegerStream
struct ObIntegerStreamMeta
{
static constexpr uint8_t OB_INTEGER_STREAM_META_V1 = 0;
static constexpr uint8_t OB_INTEGER_STREAM_META_V2 = 1;
ObIntegerStreamMeta() { reset(); }
OB_INLINE void reset()
{
// all member must be reset, or will checksum error between replicas
memset(this, 0, sizeof(*this));
version_ = OB_INTEGER_STREAM_META_V2;
width_ = ObIntegerStream::UW_MAX_BYTE;
pfor_packing_type_ = ObCodec::CPU_ARCH_INDEPENDANT_SCALAR;
}
OB_INLINE bool is_valid() const
@ -135,6 +139,10 @@ struct ObIntegerStreamMeta
OB_INLINE bool is_raw_encoding() const { return ObIntegerStream::EncodingType::RAW == type_; }
OB_INLINE bool is_unspecified_encoding() const { return ObIntegerStream::EncodingType::MIN_TYPE == type_; }
OB_INLINE bool is_universal_compress_encoding() const { return ObIntegerStream::EncodingType::UNIVERSAL_COMPRESS == type_; }
OB_INLINE ObCodec::PFoRPackingType get_pfor_packing_type() const
{
return static_cast<ObCodec::PFoRPackingType>(pfor_packing_type_);
}
// set
OB_INLINE void set_use_base() { attr_ |= ObIntegerStream::Attribute::USE_BASE; }
@ -165,6 +173,10 @@ struct ObIntegerStreamMeta
OB_INLINE void set_2_byte_width() { width_ = ObIntegerStream::UintWidth::UW_2_BYTE; }
OB_INLINE void set_4_byte_width() { width_ = ObIntegerStream::UintWidth::UW_4_BYTE; }
OB_INLINE void set_8_byte_width() { width_ = ObIntegerStream::UintWidth::UW_8_BYTE; }
OB_INLINE void set_pfor_packing_type(const ObCodec::PFoRPackingType type)
{
pfor_packing_type_ = type;
}
OB_INLINE int set_uint_width_size(const uint32_t byte_size)
@ -257,6 +269,7 @@ struct ObIntegerStreamMeta
uint64_t base_value_;
uint64_t null_replaced_value_;
uint8_t decimal_precision_width_;
uint8_t pfor_packing_type_;
};
struct ObIntegerStreamEncoderInfo
@ -301,11 +314,14 @@ struct ObIntegerStreamEncoderCtx
int build_signed_stream_meta(const int64_t min, const int64_t max,
const bool is_replace_null, const int64_t replace_value,
const int64_t precision_width_size,
const bool force_raw, uint64_t &range);
const bool force_raw,
const int64_t major_working_cluster_version,
uint64_t &range);
int build_unsigned_stream_meta(const uint64_t min, const uint64_t max,
const bool is_replace_null, const uint64_t replace_value,
const bool force_raw, uint64_t &range);
int build_offset_array_stream_meta(const uint64_t end_offset, const bool force_raw);
const bool force_raw, const int64_t major_working_cluster_version, uint64_t &range);
int build_offset_array_stream_meta(const uint64_t end_offset, const bool force_raw,
const int64_t major_working_cluster_version);
int build_stream_encoder_info(const bool has_null,
bool is_monotonic_inc,
const ObCSEncodingOpt *encoding_opt,
@ -376,18 +392,20 @@ struct ObStringStreamEncoderInfo
encoding_opt_ = nullptr;
previous_encoding_ = nullptr;
int_stream_idx_ = -1;
major_working_cluster_version_ = 0;
allocator_ = nullptr;
}
TO_STRING_KV("compressor_type", all_compressor_name[compressor_type_],
K_(raw_encoding_str_offset), KP_(encoding_opt), KPC_(previous_encoding),
K_(int_stream_idx), KP_(allocator));
K_(int_stream_idx), K_(major_working_cluster_version), KP_(allocator));
common::ObCompressorType compressor_type_;
bool raw_encoding_str_offset_;
const ObCSEncodingOpt *encoding_opt_;
const ObPreviousColumnEncoding *previous_encoding_;
int32_t int_stream_idx_;
int64_t major_working_cluster_version_;
ObIAllocator *allocator_;
};
@ -409,6 +427,7 @@ struct ObStringStreamEncoderCtx
const ObCSEncodingOpt *encoding_opt,
const ObPreviousColumnEncoding *previous_encoding,
const int32_t int_stream_idx,
const int64_t major_working_cluster_version,
ObIAllocator *allocator);
TO_STRING_KV(K_(meta), K_(info));

View File

@ -127,7 +127,9 @@ int ObStringColumnEncoder::do_init_()
ctx_->encoding_ctx_->compressor_type_,
is_force_raw_, &ctx_->encoding_ctx_->cs_encoding_opt_,
ctx_->encoding_ctx_->previous_cs_encoding_.get_column_encoding(column_index_),
int_stream_idx, ctx_->allocator_))) {
int_stream_idx,
ctx_->encoding_ctx_->major_working_cluster_version_,
ctx_->allocator_))) {
LOG_WARN("fail to build_string_stream_encoder_info", K(ret));
}

View File

@ -167,7 +167,7 @@ int ObStringStreamEncoder::do_encode_offset_stream_(ObIArray<uint32_t> &stream_o
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "max string offset must equal to total_bytes_len_", K(ret), K(end_offset), KPC_(ctx));
} else if (OB_FAIL(int_ctx_.build_offset_array_stream_meta(
end_offset, ctx_->info_.raw_encoding_str_offset_))) {
end_offset, ctx_->info_.raw_encoding_str_offset_, ctx_->info_.major_working_cluster_version_))) {
STORAGE_LOG(WARN, "fail to build_offset_array_stream_meta", KR(ret));
} else if (OB_FAIL(int_ctx_.build_stream_encoder_info(
false/*has_null*/,

View File

@ -10,6 +10,7 @@ import logging
import getopt
import time
import re
import ctypes
if sys.version_info.major == 3:
def cmp(a, b):
@ -784,6 +785,157 @@ def set_query_timeout(query_cur, timeout):
sql = """set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000)
query_cur.exec_sql(sql)
# Run assembly in python with mmaped byte-code
class ASM:
def __init__(self, restype=None, argtypes=(), machine_code=[]):
self.restype = restype
self.argtypes = argtypes
self.machine_code = machine_code
self.prochandle = None
self.mm = None
self.func = None
self.address = None
self.size = 0
def compile(self):
machine_code = bytes.join(b'', self.machine_code)
self.size = ctypes.c_size_t(len(machine_code))
from mmap import mmap, MAP_PRIVATE, MAP_ANONYMOUS, PROT_WRITE, PROT_READ, PROT_EXEC
# Allocate a private and executable memory segment the size of the machine code
machine_code = bytes.join(b'', self.machine_code)
self.size = len(machine_code)
self.mm = mmap(-1, self.size, flags=MAP_PRIVATE | MAP_ANONYMOUS, prot=PROT_WRITE | PROT_READ | PROT_EXEC)
# Copy the machine code into the memory segment
self.mm.write(machine_code)
self.address = ctypes.addressof(ctypes.c_int.from_buffer(self.mm))
# Cast the memory segment into a function
functype = ctypes.CFUNCTYPE(self.restype, *self.argtypes)
self.func = functype(self.address)
def run(self):
# Call the machine code like a function
retval = self.func()
return retval
def free(self):
# Free the function memory segment
self.mm.close()
self.prochandle = None
self.mm = None
self.func = None
self.address = None
self.size = 0
def run_asm(*machine_code):
asm = ASM(ctypes.c_uint32, (), machine_code)
asm.compile()
retval = asm.run()
asm.free()
return retval
def is_bit_set(reg, bit):
mask = 1 << bit
is_set = reg & mask > 0
return is_set
def get_max_extension_support():
# Check for extension support
max_extension_support = run_asm(
b"\xB8\x00\x00\x00\x80" # mov ax,0x80000000
b"\x0f\xa2" # cpuid
b"\xC3" # ret
)
return max_extension_support
def arch_support_avx2():
bret = False
if (is_x86_arch() and get_max_extension_support() >= 7):
ebx = run_asm(
b"\x31\xC9", # xor ecx,ecx
b"\xB8\x07\x00\x00\x00" # mov eax,7
b"\x0f\xa2" # cpuid
b"\x89\xD8" # mov ax,bx
b"\xC3" # ret
)
bret = is_bit_set(ebx, 5)
return bret
def is_x86_arch():
import platform
arch_string_raw = platform.machine().lower()
bret = False
if re.match(r'^i\d86$|^x86$|^x86_32$|^i86pc$|^ia32$|^ia-32$|^bepc$', arch_string_raw):
# x86_32
bret = True
elif re.match(r'^x64$|^x86_64$|^x86_64t$|^i686-64$|^amd64$|^ia64$|^ia-64$', arch_string_raw):
# x86_64
bret=True
return bret
# 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式
# 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御
def check_cs_encoding_arch_dependency_compatiblity(query_cur):
can_upgrade = True
need_check_schema = False
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
(desc, results) = query_cur.exec_query(sql)
if len(results) != 1:
fail_list.append('min_observer_version is not sync')
elif len(results[0]) != 1:
fail_list.append('column cnt not match')
else:
min_cluster_version = get_version(results[0][0])
if min_cluster_version < get_version("4.3.3.0"):
if (arch_support_avx2()):
logging.info("current cpu support avx2 inst, no need to check cs_encoding format")
else:
get_data_version_sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
(desc, results) = query_cur.exec_query(sql)
if len(results) != 1:
fail_list.append('compatible is not sync')
elif len(results[0]) != 1:
fail_list.append('column cnt not match')
else:
data_version = get_version(results[0][0])
if (data_version < get_version("4.3.0.0")):
logging.info("no need to check cs encoding arch compatibility for data version before version 4.3.0")
else:
logging.info("cpu not support avx2 instruction set, check cs_encoding format in schema")
need_check_schema = True
else:
logging.info("no need to check cs encoding arch compatibility for cluster version after version 4.3.3")
if need_check_schema and can_upgrade:
ck_all_tbl_sql = """select count(1) from __all_virtual_table where row_store_type = 'cs_encoding_row_store'"""
(desc, results) = query_cur.exec_query(ck_all_tbl_sql)
if len(results) != 1:
fail_list.append("all table query row count not match");
elif len(results[0]) != 1:
fail_list.append("all table query column count not match")
elif results[0][0] != 0:
can_upgrade = False
fail_list.append("exist table with row_format cs_encoding_row_store for observer not support avx2 instruction set, table count = {0}".format(results[0][0]));
if need_check_schema and can_upgrade:
ck_all_cg_sql = """select count(distinct table_id) from __all_virtual_column_group where row_store_type = 3"""
(desc, results) = query_cur.exec_query(ck_all_cg_sql)
if len(results) != 1:
fail_list.append("all column group query row count not match");
elif len(results[0]) != 1:
fail_list.append("all column group query column count not match")
elif results[0][0] != 0:
can_upgrade = False
fail_list.append("exist column group with row_format cs_encoding_row_store for observer not support avx2 instruction set, table count = {0}".format(results[0][0]));
if can_upgrade:
logging.info("check upgrade for arch-dependant cs_encoding format success")
else:
logging.info("check upgrade for arch-dependant cs_encoding format failed")
# 开始升级前的检查
def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
try:
@ -823,6 +975,7 @@ def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
check_variable_binlog_row_image(query_cur)
check_oracle_standby_replication_exist(query_cur)
check_disk_space_for_mds_sstable_compat(query_cur)
check_cs_encoding_arch_dependency_compatiblity(query_cur)
# all check func should execute before check_fail_list
check_fail_list()
modify_server_permanent_offline_time(cur)

View File

@ -1663,6 +1663,7 @@
#import getopt
#import time
#import re
#import ctypes
#
#if sys.version_info.major == 3:
# def cmp(a, b):
@ -2437,6 +2438,157 @@
# sql = """set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000)
# query_cur.exec_sql(sql)
#
## Run assembly in python with mmaped byte-code
#class ASM:
# def __init__(self, restype=None, argtypes=(), machine_code=[]):
# self.restype = restype
# self.argtypes = argtypes
# self.machine_code = machine_code
# self.prochandle = None
# self.mm = None
# self.func = None
# self.address = None
# self.size = 0
#
# def compile(self):
# machine_code = bytes.join(b'', self.machine_code)
# self.size = ctypes.c_size_t(len(machine_code))
# from mmap import mmap, MAP_PRIVATE, MAP_ANONYMOUS, PROT_WRITE, PROT_READ, PROT_EXEC
#
# # Allocate a private and executable memory segment the size of the machine code
# machine_code = bytes.join(b'', self.machine_code)
# self.size = len(machine_code)
# self.mm = mmap(-1, self.size, flags=MAP_PRIVATE | MAP_ANONYMOUS, prot=PROT_WRITE | PROT_READ | PROT_EXEC)
#
# # Copy the machine code into the memory segment
# self.mm.write(machine_code)
# self.address = ctypes.addressof(ctypes.c_int.from_buffer(self.mm))
#
# # Cast the memory segment into a function
# functype = ctypes.CFUNCTYPE(self.restype, *self.argtypes)
# self.func = functype(self.address)
#
# def run(self):
# # Call the machine code like a function
# retval = self.func()
#
# return retval
#
# def free(self):
# # Free the function memory segment
# self.mm.close()
# self.prochandle = None
# self.mm = None
# self.func = None
# self.address = None
# self.size = 0
#
#def run_asm(*machine_code):
# asm = ASM(ctypes.c_uint32, (), machine_code)
# asm.compile()
# retval = asm.run()
# asm.free()
# return retval
#
#def is_bit_set(reg, bit):
# mask = 1 << bit
# is_set = reg & mask > 0
# return is_set
#
#def get_max_extension_support():
# # Check for extension support
# max_extension_support = run_asm(
# b"\xB8\x00\x00\x00\x80" # mov ax,0x80000000
# b"\x0f\xa2" # cpuid
# b"\xC3" # ret
# )
# return max_extension_support
#
#def arch_support_avx2():
# bret = False
# if (is_x86_arch() and get_max_extension_support() >= 7):
# ebx = run_asm(
# b"\x31\xC9", # xor ecx,ecx
# b"\xB8\x07\x00\x00\x00" # mov eax,7
# b"\x0f\xa2" # cpuid
# b"\x89\xD8" # mov ax,bx
# b"\xC3" # ret
# )
# bret = is_bit_set(ebx, 5)
# return bret
#
#def is_x86_arch():
# import platform
# arch_string_raw = platform.machine().lower()
# bret = False
# if re.match(r'^i\d86$|^x86$|^x86_32$|^i86pc$|^ia32$|^ia-32$|^bepc$', arch_string_raw):
# # x86_32
# bret = True
# elif re.match(r'^x64$|^x86_64$|^x86_64t$|^i686-64$|^amd64$|^ia64$|^ia-64$', arch_string_raw):
# # x86_64
# bret=True
# return bret
#
## 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式
## 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御
#def check_cs_encoding_arch_dependency_compatiblity(query_cur):
# can_upgrade = True
# need_check_schema = False
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
# (desc, results) = query_cur.exec_query(sql)
# if len(results) != 1:
# fail_list.append('min_observer_version is not sync')
# elif len(results[0]) != 1:
# fail_list.append('column cnt not match')
# else:
# min_cluster_version = get_version(results[0][0])
# if min_cluster_version < get_version("4.3.3.0"):
# if (arch_support_avx2()):
# logging.info("current cpu support avx2 inst, no need to check cs_encoding format")
# else:
# get_data_version_sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
# (desc, results) = query_cur.exec_query(sql)
# if len(results) != 1:
# fail_list.append('compatible is not sync')
# elif len(results[0]) != 1:
# fail_list.append('column cnt not match')
# else:
# data_version = get_version(results[0][0])
# if (data_version < get_version("4.3.0.0")):
# logging.info("no need to check cs encoding arch compatibility for data version before version 4.3.0")
# else:
# logging.info("cpu not support avx2 instruction set, check cs_encoding format in schema")
# need_check_schema = True
# else:
# logging.info("no need to check cs encoding arch compatibility for cluster version after version 4.3.3")
#
# if need_check_schema and can_upgrade:
# ck_all_tbl_sql = """select count(1) from __all_virtual_table where row_store_type = 'cs_encoding_row_store'"""
# (desc, results) = query_cur.exec_query(ck_all_tbl_sql)
# if len(results) != 1:
# fail_list.append("all table query row count not match");
# elif len(results[0]) != 1:
# fail_list.append("all table query column count not match")
# elif results[0][0] != 0:
# can_upgrade = False
# fail_list.append("exist table with row_format cs_encoding_row_store for observer not support avx2 instruction set, table count = {0}".format(results[0][0]));
#
# if need_check_schema and can_upgrade:
# ck_all_cg_sql = """select count(distinct table_id) from __all_virtual_column_group where row_store_type = 3"""
# (desc, results) = query_cur.exec_query(ck_all_cg_sql)
# if len(results) != 1:
# fail_list.append("all column group query row count not match");
# elif len(results[0]) != 1:
# fail_list.append("all column group query column count not match")
# elif results[0][0] != 0:
# can_upgrade = False
# fail_list.append("exist column group with row_format cs_encoding_row_store for observer not support avx2 instruction set, table count = {0}".format(results[0][0]));
#
# if can_upgrade:
# logging.info("check upgrade for arch-dependant cs_encoding format success")
# else:
# logging.info("check upgrade for arch-dependant cs_encoding format failed")
#
## 开始升级前的检查
#def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
# try:
@ -2476,6 +2628,7 @@
# check_variable_binlog_row_image(query_cur)
# check_oracle_standby_replication_exist(query_cur)
# check_disk_space_for_mds_sstable_compat(query_cur)
# check_cs_encoding_arch_dependency_compatiblity(query_cur)
# # all check func should execute before check_fail_list
# check_fail_list()
# modify_server_permanent_offline_time(cur)

View File

@ -1663,6 +1663,7 @@
#import getopt
#import time
#import re
#import ctypes
#
#if sys.version_info.major == 3:
# def cmp(a, b):
@ -2437,6 +2438,157 @@
# sql = """set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000)
# query_cur.exec_sql(sql)
#
## Run assembly in python with mmaped byte-code
#class ASM:
# def __init__(self, restype=None, argtypes=(), machine_code=[]):
# self.restype = restype
# self.argtypes = argtypes
# self.machine_code = machine_code
# self.prochandle = None
# self.mm = None
# self.func = None
# self.address = None
# self.size = 0
#
# def compile(self):
# machine_code = bytes.join(b'', self.machine_code)
# self.size = ctypes.c_size_t(len(machine_code))
# from mmap import mmap, MAP_PRIVATE, MAP_ANONYMOUS, PROT_WRITE, PROT_READ, PROT_EXEC
#
# # Allocate a private and executable memory segment the size of the machine code
# machine_code = bytes.join(b'', self.machine_code)
# self.size = len(machine_code)
# self.mm = mmap(-1, self.size, flags=MAP_PRIVATE | MAP_ANONYMOUS, prot=PROT_WRITE | PROT_READ | PROT_EXEC)
#
# # Copy the machine code into the memory segment
# self.mm.write(machine_code)
# self.address = ctypes.addressof(ctypes.c_int.from_buffer(self.mm))
#
# # Cast the memory segment into a function
# functype = ctypes.CFUNCTYPE(self.restype, *self.argtypes)
# self.func = functype(self.address)
#
# def run(self):
# # Call the machine code like a function
# retval = self.func()
#
# return retval
#
# def free(self):
# # Free the function memory segment
# self.mm.close()
# self.prochandle = None
# self.mm = None
# self.func = None
# self.address = None
# self.size = 0
#
#def run_asm(*machine_code):
# asm = ASM(ctypes.c_uint32, (), machine_code)
# asm.compile()
# retval = asm.run()
# asm.free()
# return retval
#
#def is_bit_set(reg, bit):
# mask = 1 << bit
# is_set = reg & mask > 0
# return is_set
#
#def get_max_extension_support():
# # Check for extension support
# max_extension_support = run_asm(
# b"\xB8\x00\x00\x00\x80" # mov ax,0x80000000
# b"\x0f\xa2" # cpuid
# b"\xC3" # ret
# )
# return max_extension_support
#
#def arch_support_avx2():
# bret = False
# if (is_x86_arch() and get_max_extension_support() >= 7):
# ebx = run_asm(
# b"\x31\xC9", # xor ecx,ecx
# b"\xB8\x07\x00\x00\x00" # mov eax,7
# b"\x0f\xa2" # cpuid
# b"\x89\xD8" # mov ax,bx
# b"\xC3" # ret
# )
# bret = is_bit_set(ebx, 5)
# return bret
#
#def is_x86_arch():
# import platform
# arch_string_raw = platform.machine().lower()
# bret = False
# if re.match(r'^i\d86$|^x86$|^x86_32$|^i86pc$|^ia32$|^ia-32$|^bepc$', arch_string_raw):
# # x86_32
# bret = True
# elif re.match(r'^x64$|^x86_64$|^x86_64t$|^i686-64$|^amd64$|^ia64$|^ia-64$', arch_string_raw):
# # x86_64
# bret=True
# return bret
#
## 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式
## 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御
#def check_cs_encoding_arch_dependency_compatiblity(query_cur):
# can_upgrade = True
# need_check_schema = False
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
# (desc, results) = query_cur.exec_query(sql)
# if len(results) != 1:
# fail_list.append('min_observer_version is not sync')
# elif len(results[0]) != 1:
# fail_list.append('column cnt not match')
# else:
# min_cluster_version = get_version(results[0][0])
# if min_cluster_version < get_version("4.3.3.0"):
# if (arch_support_avx2()):
# logging.info("current cpu support avx2 inst, no need to check cs_encoding format")
# else:
# get_data_version_sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
# (desc, results) = query_cur.exec_query(sql)
# if len(results) != 1:
# fail_list.append('compatible is not sync')
# elif len(results[0]) != 1:
# fail_list.append('column cnt not match')
# else:
# data_version = get_version(results[0][0])
# if (data_version < get_version("4.3.0.0")):
# logging.info("no need to check cs encoding arch compatibility for data version before version 4.3.0")
# else:
# logging.info("cpu not support avx2 instruction set, check cs_encoding format in schema")
# need_check_schema = True
# else:
# logging.info("no need to check cs encoding arch compatibility for cluster version after version 4.3.3")
#
# if need_check_schema and can_upgrade:
# ck_all_tbl_sql = """select count(1) from __all_virtual_table where row_store_type = 'cs_encoding_row_store'"""
# (desc, results) = query_cur.exec_query(ck_all_tbl_sql)
# if len(results) != 1:
# fail_list.append("all table query row count not match");
# elif len(results[0]) != 1:
# fail_list.append("all table query column count not match")
# elif results[0][0] != 0:
# can_upgrade = False
# fail_list.append("exist table with row_format cs_encoding_row_store for observer not support avx2 instruction set, table count = {0}".format(results[0][0]));
#
# if need_check_schema and can_upgrade:
# ck_all_cg_sql = """select count(distinct table_id) from __all_virtual_column_group where row_store_type = 3"""
# (desc, results) = query_cur.exec_query(ck_all_cg_sql)
# if len(results) != 1:
# fail_list.append("all column group query row count not match");
# elif len(results[0]) != 1:
# fail_list.append("all column group query column count not match")
# elif results[0][0] != 0:
# can_upgrade = False
# fail_list.append("exist column group with row_format cs_encoding_row_store for observer not support avx2 instruction set, table count = {0}".format(results[0][0]));
#
# if can_upgrade:
# logging.info("check upgrade for arch-dependant cs_encoding format success")
# else:
# logging.info("check upgrade for arch-dependant cs_encoding format failed")
#
## 开始升级前的检查
#def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
# try:
@ -2476,6 +2628,7 @@
# check_variable_binlog_row_image(query_cur)
# check_oracle_standby_replication_exist(query_cur)
# check_disk_space_for_mds_sstable_compat(query_cur)
# check_cs_encoding_arch_dependency_compatiblity(query_cur)
# # all check func should execute before check_fail_list
# check_fail_list()
# modify_server_permanent_offline_time(cur)

View File

@ -167,7 +167,7 @@ public:
total_len = datums->count() * fixed_len;
}
ctx.build_string_stream_meta(fixed_len, is_use_zero_len_as_null, total_len);
ctx.build_string_stream_encoder_info(type, false, &encoding_opt, nullptr, -1, &allocator_);
ctx.build_string_stream_encoder_info(type, false, &encoding_opt, nullptr, -1, DATA_VERSION_4_3_2_1, &allocator_);
int64_t bitmap_size = pad8(size);
char *bitmap = new char[bitmap_size];
memset(bitmap, 0, bitmap_size);