// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "runtime/stream_load/stream_load_pipe.h" #include #include #include "util/monotime.h" namespace doris { class StreamLoadPipeTest : public testing::Test { public: StreamLoadPipeTest() {} virtual ~StreamLoadPipeTest() {} void SetUp() override {} }; TEST_F(StreamLoadPipeTest, append_buffer) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { int k = 0; for (int i = 0; i < 2; ++i) { auto byte_buf = ByteBuffer::allocate(64); char buf[64]; for (int j = 0; j < 64; ++j) { buf[j] = '0' + (k++ % 10); } byte_buf->put_bytes(buf, 64); byte_buf->flip(); pipe.append(byte_buf); } pipe.finish(); }; std::thread t1(appender); char buf[256]; size_t buf_len = 256; bool eof = false; auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(128, buf_len); ASSERT_FALSE(eof); for (int i = 0; i < 128; ++i) { ASSERT_EQ('0' + (i % 10), buf[i]); } st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(0, buf_len); ASSERT_TRUE(eof); t1.join(); } TEST_F(StreamLoadPipeTest, append_bytes) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { for (int i = 0; i < 128; ++i) { char buf = '0' + (i % 10); pipe.append(&buf, 1); } pipe.finish(); }; std::thread t1(appender); char buf[256]; size_t buf_len = 256; bool eof = false; auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(128, buf_len); ASSERT_FALSE(eof); for (int i = 0; i < 128; ++i) { ASSERT_EQ('0' + (i % 10), buf[i]); } st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(0, buf_len); ASSERT_TRUE(eof); t1.join(); } TEST_F(StreamLoadPipeTest, append_bytes2) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { for (int i = 0; i < 128; ++i) { char buf = '0' + (i % 10); pipe.append(&buf, 1); } pipe.finish(); }; std::thread t1(appender); char buf[128]; size_t buf_len = 62; bool eof = false; auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(62, buf_len); ASSERT_FALSE(eof); for (int i = 0; i < 62; ++i) { ASSERT_EQ('0' + (i % 10), buf[i]); } for (int i = 62; i < 128; ++i) { char ch; buf_len = 1; auto st = pipe.read((uint8_t*)&ch, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(1, buf_len); ASSERT_FALSE(eof); ASSERT_EQ('0' + (i % 10), ch); } st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(0, buf_len); ASSERT_TRUE(eof); t1.join(); } TEST_F(StreamLoadPipeTest, append_mix) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { // 10 int k = 0; for (int i = 0; i < 10; ++i) { char buf = '0' + (k++ % 10); pipe.append(&buf, 1); } // 60 { auto byte_buf = ByteBuffer::allocate(60); char buf[60]; for (int j = 0; j < 60; ++j) { buf[j] = '0' + (k++ % 10); } byte_buf->put_bytes(buf, 60); byte_buf->flip(); pipe.append(byte_buf); } // 8 for (int i = 0; i < 8; ++i) { char buf = '0' + (k++ % 10); pipe.append(&buf, 1); } // 50 { auto byte_buf = ByteBuffer::allocate(50); char buf[50]; for (int j = 0; j < 50; ++j) { buf[j] = '0' + (k++ % 10); } byte_buf->put_bytes(buf, 50); byte_buf->flip(); pipe.append(byte_buf); } pipe.finish(); }; std::thread t1(appender); char buf[128]; size_t buf_len = 128; bool eof = false; auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(128, buf_len); ASSERT_FALSE(eof); for (int i = 0; i < 128; ++i) { ASSERT_EQ('0' + (i % 10), buf[i]); } st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_TRUE(st.ok()); ASSERT_EQ(0, buf_len); ASSERT_TRUE(eof); t1.join(); } TEST_F(StreamLoadPipeTest, cancel) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { int k = 0; for (int i = 0; i < 10; ++i) { char buf = '0' + (k++ % 10); pipe.append(&buf, 1); } SleepFor(MonoDelta::FromMilliseconds(100)); pipe.cancel(); }; std::thread t1(appender); char buf[128]; size_t buf_len = 128; bool eof = false; auto st = pipe.read((uint8_t*)buf, &buf_len, &eof); ASSERT_FALSE(st.ok()); t1.join(); } TEST_F(StreamLoadPipeTest, close) { StreamLoadPipe pipe(66, 64); auto appender = [&pipe] { int k = 0; { auto byte_buf = ByteBuffer::allocate(64); char buf[64]; for (int j = 0; j < 64; ++j) { buf[j] = '0' + (k++ % 10); } byte_buf->put_bytes(buf, 64); byte_buf->flip(); pipe.append(byte_buf); } { auto byte_buf = ByteBuffer::allocate(64); char buf[64]; for (int j = 0; j < 64; ++j) { buf[j] = '0' + (k++ % 10); } byte_buf->put_bytes(buf, 64); byte_buf->flip(); auto st = pipe.append(byte_buf); ASSERT_FALSE(st.ok()); } }; std::thread t1(appender); SleepFor(MonoDelta::FromMilliseconds(10)); pipe.close(); t1.join(); } } // namespace doris int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }