// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "exec/s3_writer.h" #include #include #include #include #include "common/logging.h" #include "util/s3_uri.h" #include "util/s3_util.h" namespace doris { #ifndef CHECK_S3_CLIENT #define CHECK_S3_CLIENT(client) \ if (!client) { \ return Status::InternalError("init aws s3 client error."); \ } #endif S3Writer::S3Writer(const std::map& properties, const std::string& path, int64_t start_offset) : _properties(properties), _path(path), _uri(path), _client(ClientFactory::instance().create(_properties)), _temp_file(std::make_shared( std::ios_base::binary | std::ios_base::trunc | std::ios_base::in | std::ios_base::out)) { DCHECK(_client) << "init aws s3 client error."; } S3Writer::~S3Writer() { close(); } Status S3Writer::open() { CHECK_S3_CLIENT(_client); if (!_uri.parse()) { return Status::InvalidArgument("s3 uri is invalid: " + _path); } Aws::S3::Model::HeadObjectRequest request; request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key()); Aws::S3::Model::HeadObjectOutcome response = _client->HeadObject(request); if (response.IsSuccess()) { return Status::AlreadyExist(_path + " already exists."); } else if (response.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { return Status::OK(); } else { std::stringstream out; out << "Error: [" << response.GetError().GetExceptionName() << ":" << response.GetError().GetMessage(); return Status::InternalError(out.str()); } } Status S3Writer::write(const uint8_t* buf, size_t buf_len, size_t* written_len) { if (buf_len == 0) { *written_len = 0; return Status::OK(); } if (!_temp_file) { return Status::BufferAllocFailed("The internal temporary file is not writable."); } _temp_file->write(reinterpret_cast(buf), buf_len); if (!_temp_file->good()) { return Status::BufferAllocFailed("Could not append to the internal temporary file."); } *written_len = buf_len; return Status::OK(); } Status S3Writer::close() { if (_temp_file) { RETURN_IF_ERROR(_sync()); _temp_file.reset(); } return Status::OK(); } Status S3Writer::_sync() { if (!_temp_file) { return Status::BufferAllocFailed("The internal temporary file is not writable."); } CHECK_S3_CLIENT(_client); Aws::S3::Model::PutObjectRequest request; request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key()); long offset = _temp_file->tellp(); _temp_file->seekg(0); request.SetBody(_temp_file); request.SetContentLength(offset); auto response = _client->PutObject(request); _temp_file->clear(); _temp_file->seekp(offset); if (response.IsSuccess()) { return Status::OK(); } else { std::stringstream out; out << "Error: [" << response.GetError().GetExceptionName() << ":" << response.GetError().GetMessage(); return Status::InternalError(out.str()); } } } // end namespace doris