156 lines
4.9 KiB
Go
156 lines
4.9 KiB
Go
// Copyright 2020 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package objstore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
|
|
"github.com/pingcap/errors"
|
|
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
|
"github.com/pingcap/tidb/pkg/objstore/objectio"
|
|
"github.com/pingcap/tidb/pkg/objstore/storeapi"
|
|
)
|
|
|
|
// HDFSStorage represents HDFS storage.
|
|
type HDFSStorage struct {
|
|
remote string
|
|
}
|
|
|
|
// NewHDFSStorage creates a new HDFS storage.
|
|
func NewHDFSStorage(remote string) *HDFSStorage {
|
|
return &HDFSStorage{
|
|
remote: remote,
|
|
}
|
|
}
|
|
|
|
func getHdfsBin() (string, error) {
|
|
hadoopHome, ok := os.LookupEnv("HADOOP_HOME")
|
|
if !ok {
|
|
return "", errors.Annotatef(berrors.ErrEnvNotSpecified, "please specify environment variable HADOOP_HOME")
|
|
}
|
|
return filepath.Join(hadoopHome, "bin/hdfs"), nil
|
|
}
|
|
|
|
func getLinuxUser() (string, bool) {
|
|
return os.LookupEnv("HADOOP_LINUX_USER")
|
|
}
|
|
|
|
func dfsCommand(args ...string) (*exec.Cmd, error) {
|
|
bin, err := getHdfsBin()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cmd := []string{}
|
|
user, ok := getLinuxUser()
|
|
if ok {
|
|
cmd = append(cmd, "sudo", "-u", user)
|
|
}
|
|
cmd = append(cmd, bin, "dfs")
|
|
cmd = append(cmd, args...)
|
|
//nolint:gosec
|
|
return exec.Command(cmd[0], cmd[1:]...), nil
|
|
}
|
|
|
|
// WriteFile writes a complete file to storage, similar to os.WriteFile
|
|
func (s *HDFSStorage) WriteFile(_ context.Context, name string, data []byte) error {
|
|
filePath := fmt.Sprintf("%s/%s", s.remote, name)
|
|
cmd, err := dfsCommand("-put", "-", filePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
buf := bytes.Buffer{}
|
|
buf.Write(data)
|
|
cmd.Stdin = &buf
|
|
|
|
out, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return errors.Annotate(err, string(out))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReadFile reads a complete file from storage, similar to os.ReadFile
|
|
func (*HDFSStorage) ReadFile(_ context.Context, _ string) ([]byte, error) {
|
|
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
|
|
}
|
|
|
|
// FileExists return true if file exists
|
|
func (s *HDFSStorage) FileExists(_ context.Context, name string) (bool, error) {
|
|
filePath := fmt.Sprintf("%s/%s", s.remote, name)
|
|
cmd, err := dfsCommand("-ls", filePath)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
out, err := cmd.CombinedOutput()
|
|
if _, ok := err.(*exec.ExitError); ok {
|
|
// Successfully exit with non-zero value
|
|
return false, nil
|
|
}
|
|
if err != nil {
|
|
return false, errors.Annotate(err, string(out))
|
|
}
|
|
// Successfully exit with zero value
|
|
return true, nil
|
|
}
|
|
|
|
// DeleteFile delete the file in storage
|
|
func (*HDFSStorage) DeleteFile(_ context.Context, _ string) error {
|
|
return errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
|
|
}
|
|
|
|
// DeleteFiles deletes files in storage
|
|
func (*HDFSStorage) DeleteFiles(_ context.Context, _ []string) error {
|
|
return errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
|
|
}
|
|
|
|
// Open a Reader by file path. path is relative path to storage base path
|
|
func (*HDFSStorage) Open(_ context.Context, _ string, _ *storeapi.ReaderOption) (objectio.Reader, error) {
|
|
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
|
|
}
|
|
|
|
// WalkDir traverse all the files in a dir.
|
|
//
|
|
// fn is the function called for each regular file visited by WalkDir.
|
|
// The argument `path` is the file path that can be used in `Open`
|
|
// function; the argument `size` is the size in byte of the file determined
|
|
// by path.
|
|
func (*HDFSStorage) WalkDir(_ context.Context, _ *storeapi.WalkOption, _ func(path string, size int64) error) error {
|
|
return errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
|
|
}
|
|
|
|
// URI returns the base path as a URI
|
|
func (s *HDFSStorage) URI() string {
|
|
return s.remote
|
|
}
|
|
|
|
// Create opens a file writer by path. path is relative path to storage base path
|
|
func (*HDFSStorage) Create(_ context.Context, _ string, _ *storeapi.WriterOption) (objectio.Writer, error) {
|
|
return nil, errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
|
|
}
|
|
|
|
// Rename a file name from oldFileName to newFileName.
|
|
func (*HDFSStorage) Rename(_ context.Context, _, _ string) error {
|
|
return errors.Annotatef(berrors.ErrUnsupportedOperation, "currently HDFS backend only support rawkv backup")
|
|
}
|
|
|
|
// Close implements Storage interface.
|
|
func (*HDFSStorage) Close() {}
|