// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
	"bytes"
	"context"
	"encoding/json"
	goerrors "errors"
	"hash/fnv"
	"io"
	"path"
	"reflect"
	"slices"
	"sort"
	"strconv"

	"github.com/docker/go-units"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/ingestor/engineapi"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/log"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/hack"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/mathutil"
	"go.uber.org/zap/zapcore"
)

const (
	metaName = "meta.json"
)

// seekPropsOffsets reads the statistic files to find the largest offset of the
// corresponding sorted data file such that the key at that offset is less than
// or equal to the given start keys. The returned offsets can be used to seek
// the data file reader, read, parse and skip a few smaller keys, and then
// locate the needed data.
//
// Caller can specify multiple ascending keys and seekPropsOffsets will return
// the offsets list per file for each key.
func seekPropsOffsets(
	ctx context.Context,
	starts []kv.Key,
	paths []string,
	exStorage storeapi.Storage,
) (_ [][]uint64, err error) {
	logger := logutil.Logger(ctx)
	task := log.BeginTask(logger, "seek props offsets")
	defer func() {
		task.End(zapcore.ErrorLevel, err)
	}()

	offsetsPerFile := make([][]uint64, len(paths))
	for i := range offsetsPerFile {
		offsetsPerFile[i] = make([]uint64, len(starts))
	}

	eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
	for i := range paths {
		eg.Go(func() error {
			r, err2 := newStatsReader(egCtx, exStorage, paths[i], 250*1024)
			if err2 != nil {
				if goerrors.Is(err2, io.EOF) {
					return nil
				}
				return errors.Trace(err2)
			}
			defer r.Close()

			keyIdx := 0
			curKey := starts[keyIdx]

			p, err3 := r.nextProp()
			for {
				if err3 != nil {
					if goerrors.Is(err3, io.EOF) {
						// fill the rest of the offsets with the last offset
						currOffset := offsetsPerFile[i][keyIdx]
						for keyIdx++; keyIdx < len(starts); keyIdx++ {
							offsetsPerFile[i][keyIdx] = currOffset
						}
						return nil
					}
					return errors.Trace(err3)
				}
				propKey := kv.Key(p.firstKey)
				for propKey.Cmp(curKey) > 0 {
					keyIdx++
					if keyIdx >= len(starts) {
						return nil
					}
					offsetsPerFile[i][keyIdx] = offsetsPerFile[i][keyIdx-1]
					curKey = starts[keyIdx]
				}
				offsetsPerFile[i][keyIdx] = p.offset
				p, err3 = r.nextProp()
			}
		})
	}

	if err = eg.Wait(); err != nil {
		return nil, err
	}

	// TODO(lance6716): change the caller so we don't need to transpose the result
	offsetsPerKey := make([][]uint64, len(starts))
	for i := range starts {
		offsetsPerKey[i] = make([]uint64, len(paths))
		for j := range paths {
			offsetsPerKey[i][j] = offsetsPerFile[j][i]
		}
	}
	return offsetsPerKey, nil
}

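// A minimal usage sketch for seekPropsOffsets (keys and stat-file paths are
// hypothetical, only to show how the result is indexed): the first dimension of
// the returned slice is the start key, the second is the stat/data file.
//
//	starts := []kv.Key{kv.Key("a"), kv.Key("m")}
//	statFiles := []string{"p00000000/30001/7/some-uuid_stat/0", "p00110000/30001/7/some-uuid_stat/1"}
//	offsets, err := seekPropsOffsets(ctx, starts, statFiles, store)
//	if err != nil {
//		return err
//	}
//	// offsets[1][0] is the offset in the data file paired with statFiles[0]
//	// from which a reader can start scanning for key "m".
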
// GetAllFileNames returns all files under the same non-partitioned dir.
//   - intermediate KV/stat files are stored in a partitioned way to mitigate
//     limitations on cloud storage, see randPartitionedPrefix for how we
//     partition the files.
//   - meta files are stored directly under the non-partitioned dir.
//
// For example, if nonPartitionedDir is '30001', the files returned might be
//   - 30001/6/meta.json
//   - 30001/7/meta.json
//   - 30001/plan/ingest/1/meta.json
//   - 30001/plan/merge-sort/1/meta.json
//   - p00110000/30001/7/617527bf-e25d-4312-8784-4a4576eb0195_stat/one-file
//   - p00000000/30001/7/617527bf-e25d-4312-8784-4a4576eb0195/one-file
func GetAllFileNames(
	ctx context.Context,
	store storeapi.Storage,
	nonPartitionedDir string,
) ([]string, error) {
	var data []string

	err := store.WalkDir(ctx,
		&storeapi.WalkOption{},
		func(path string, size int64) error {
			// extract the first dir
			bs := hack.Slice(path)
			firstIdx := bytes.IndexByte(bs, '/')
			if firstIdx == -1 {
				return nil
			}

			firstDir := bs[:firstIdx]
			if string(firstDir) == nonPartitionedDir {
				data = append(data, path)
				return nil
			}

			if !isValidPartition(firstDir) {
				return nil
			}
			secondIdx := bytes.IndexByte(bs[firstIdx+1:], '/')
			if secondIdx == -1 {
				return nil
			}
			secondDir := path[firstIdx+1 : firstIdx+1+secondIdx]

			if secondDir == nonPartitionedDir {
				data = append(data, path)
			}
			return nil
		})
	if err != nil {
		return nil, err
	}
	// in case the external storage does not guarantee the order of walk
	sort.Strings(data)
	return data, nil
}

// CleanUpFiles deletes all data and stat files under the same non-partitioned
// dir. See randPartitionedPrefix for how we partition the files.
func CleanUpFiles(ctx context.Context, store storeapi.Storage, nonPartitionedDir string) error {
	failpoint.Inject("skipCleanUpFiles", func() {
		failpoint.Return(nil)
	})
	names, err := GetAllFileNames(ctx, store, nonPartitionedDir)
	if err != nil {
		return err
	}
	return store.DeleteFiles(ctx, names)
}

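// A small usage sketch tying the two helpers above together (the "30001"
// directory name is the hypothetical one from the GetAllFileNames example):
//
//	names, err := GetAllFileNames(ctx, store, "30001")
//	if err != nil {
//		return err
//	}
//	// ... inspect or read the listed files ...
//	// once the sorted data is no longer needed, drop the whole directory:
//	if err := CleanUpFiles(ctx, store, "30001"); err != nil {
//		return err
//	}
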
// MockExternalEngine generates an external engine with the given keys and values.
func MockExternalEngine(
	storage storeapi.Storage,
	keys [][]byte,
	values [][]byte,
) (dataFiles []string, statsFiles []string, err error) {
	var summary *WriterSummary
	writer := NewWriterBuilder().
		SetMemorySizeLimit(10*(lengthBytes*2+10)).
		SetBlockSize(10*(lengthBytes*2+10)).
		SetPropSizeDistance(32).
		SetPropKeysDistance(4).
		SetOnCloseFunc(func(s *WriterSummary) { summary = s }).
		Build(storage, "/mock-test", "0")
	ctx := context.Background()
	for i := range keys {
		err := writer.WriteRow(ctx, keys[i], values[i], nil)
		if err != nil {
			return nil, nil, err
		}
	}
	err = writer.Close(ctx)
	if err != nil {
		return nil, nil, err
	}
	for _, ms := range summary.MultipleFilesStats {
		for _, f := range ms.Filenames {
			dataFiles = append(dataFiles, f[0])
			statsFiles = append(statsFiles, f[1])
		}
	}
	return
}

// EndpointTp is the type of an Endpoint, i.e. how its Key bounds an interval.
type EndpointTp int

const (
	// ExclusiveEnd represents "..., Endpoint.Key)".
	ExclusiveEnd EndpointTp = iota
	// InclusiveStart represents "[Endpoint.Key, ...".
	InclusiveStart
	// InclusiveEnd represents "..., Endpoint.Key]".
	InclusiveEnd
)

// Endpoint represents an endpoint of an interval which can be used by GetMaxOverlapping.
type Endpoint struct {
	Key    []byte
	Tp     EndpointTp
	Weight int64 // all endpoint types use positive weights
}

// GetMaxOverlapping returns the maximum overlapping weight treating given
// `points` as endpoints of intervals. `points` are not required to be sorted,
// and will be sorted in-place in this function.
func GetMaxOverlapping(points []Endpoint) int64 {
	slices.SortFunc(points, func(i, j Endpoint) int {
		if cmp := bytes.Compare(i.Key, j.Key); cmp != 0 {
			return cmp
		}
		return int(i.Tp) - int(j.Tp)
	})
	var maxWeight int64
	var curWeight int64
	for _, p := range points {
		switch p.Tp {
		case InclusiveStart:
			curWeight += p.Weight
		case ExclusiveEnd, InclusiveEnd:
			curWeight -= p.Weight
		}
		if curWeight > maxWeight {
			maxWeight = curWeight
		}
	}
	return maxWeight
}

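// A worked example for GetMaxOverlapping: two half-open ranges ["a", "c") and
// ["b", "d"), each with weight 1, overlap on ["b", "c"), so the maximum
// overlapping weight is 2.
//
//	points := []Endpoint{
//		{Key: []byte("a"), Tp: InclusiveStart, Weight: 1},
//		{Key: []byte("c"), Tp: ExclusiveEnd, Weight: 1},
//		{Key: []byte("b"), Tp: InclusiveStart, Weight: 1},
//		{Key: []byte("d"), Tp: ExclusiveEnd, Weight: 1},
//	}
//	overlap := GetMaxOverlapping(points) // 2
//
// Because ExclusiveEnd sorts before InclusiveStart for equal keys, adjacent
// ranges such as ["a", "b") and ["b", "c") are not counted as overlapping.
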
// SortedKVMeta is the meta of sorted kv.
type SortedKVMeta struct {
	StartKey           []byte                 `json:"start-key"`
	EndKey             []byte                 `json:"end-key"` // exclusive
	TotalKVSize        uint64                 `json:"total-kv-size"`
	TotalKVCnt         uint64                 `json:"total-kv-cnt"`
	MultipleFilesStats []MultipleFilesStat    `json:"multiple-files-stats"`
	ConflictInfo       engineapi.ConflictInfo `json:"conflict-info"`
}

// NewSortedKVMeta creates a SortedKVMeta from a WriterSummary. If the summary
// is empty, it returns a pointer to a zero SortedKVMeta.
func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta {
	if summary == nil || (len(summary.Min) == 0 && len(summary.Max) == 0) {
		return &SortedKVMeta{}
	}
	return &SortedKVMeta{
		StartKey:           summary.Min.Clone(),
		EndKey:             summary.Max.Clone().Next(),
		TotalKVSize:        summary.TotalSize,
		TotalKVCnt:         summary.TotalCnt,
		MultipleFilesStats: summary.MultipleFilesStats,
		ConflictInfo:       summary.ConflictInfo,
	}
}

// Merge merges the other SortedKVMeta into this one.
func (m *SortedKVMeta) Merge(other *SortedKVMeta) {
	if len(other.StartKey) == 0 && len(other.EndKey) == 0 {
		return
	}
	if len(m.StartKey) == 0 && len(m.EndKey) == 0 {
		*m = *other
		return
	}

	m.StartKey = BytesMin(m.StartKey, other.StartKey)
	m.EndKey = BytesMax(m.EndKey, other.EndKey)
	m.TotalKVSize += other.TotalKVSize
	m.TotalKVCnt += other.TotalKVCnt

	m.MultipleFilesStats = append(m.MultipleFilesStats, other.MultipleFilesStats...)
	m.ConflictInfo.Merge(&other.ConflictInfo)
}

// MergeSummary merges the WriterSummary into this SortedKVMeta.
func (m *SortedKVMeta) MergeSummary(summary *WriterSummary) {
	m.Merge(NewSortedKVMeta(summary))
}

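// A sketch of how the meta types above are typically combined: each writer
// produces a WriterSummary, and the per-writer metas are folded into one
// SortedKVMeta covering the union of key ranges (writerSummaries here is a
// hypothetical slice of summaries captured via SetOnCloseFunc, as in
// MockExternalEngine):
//
//	total := &SortedKVMeta{}
//	for _, s := range writerSummaries {
//		total.MergeSummary(s)
//	}
//	// total.StartKey / total.EndKey now span all writers' data (EndKey exclusive),
//	// and total.GetDataFiles() / total.GetStatFiles() list every produced file.
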
// GetDataFiles returns all data files in the meta.
func (m *SortedKVMeta) GetDataFiles() []string {
	var ret []string
	for _, stat := range m.MultipleFilesStats {
		for _, files := range stat.Filenames {
			ret = append(ret, files[0])
		}
	}
	return ret
}

// GetStatFiles returns all stat files in the meta.
func (m *SortedKVMeta) GetStatFiles() []string {
	var ret []string
	for _, stat := range m.MultipleFilesStats {
		for _, files := range stat.Filenames {
			ret = append(ret, files[1])
		}
	}
	return ret
}

// BytesMin returns the smaller of byte slices a and b.
func BytesMin(a, b []byte) []byte {
	if bytes.Compare(a, b) < 0 {
		return a
	}
	return b
}

// BytesMax returns the larger of byte slices a and b.
func BytesMax(a, b []byte) []byte {
	if bytes.Compare(a, b) > 0 {
		return a
	}
	return b
}

func getSpeed(n uint64, dur float64, isBytes bool) string {
	if dur == 0 {
		return "-"
	}
	if isBytes {
		return units.BytesSize(float64(n) / dur)
	}
	return strconv.FormatFloat(float64(n)/dur, 'f', 4, 64)
}

// marshalWithOverride marshals the provided struct with the ability to override
// fields' JSON tags: every exported field for which hideCond returns true is
// re-tagged with `json:"-"` so it is omitted from the output.
func marshalWithOverride(src any, hideCond func(f reflect.StructField) bool) ([]byte, error) {
	v := reflect.ValueOf(src)
	if v.Kind() == reflect.Ptr {
		if v.IsNil() {
			return json.Marshal(src)
		}
		v = v.Elem()
	}
	if v.Kind() != reflect.Struct {
		return json.Marshal(src)
	}
	t := v.Type()
	var fields []reflect.StructField
	for i := range t.NumField() {
		f := t.Field(i)
		if !f.IsExported() {
			continue
		}
		newTag := f.Tag
		if hideCond(f) {
			newTag = `json:"-"`
		}
		fields = append(fields, reflect.StructField{
			Name:      f.Name,
			Type:      f.Type,
			Tag:       newTag,
			Offset:    f.Offset,
			Anonymous: f.Anonymous,
		})
	}
	newType := reflect.StructOf(fields)
	newVal := reflect.New(newType).Elem()
	j := 0
	for i := range t.NumField() {
		f := t.Field(i)
		if !f.IsExported() {
			continue
		}
		newVal.Field(j).Set(v.Field(i))
		j++
	}
	return json.Marshal(newVal.Interface())
}

// marshalInternalFields marshals all fields except those with the external:"true" tag.
func marshalInternalFields(src any) ([]byte, error) {
	return marshalWithOverride(src, func(f reflect.StructField) bool {
		return f.Tag.Get("external") == "true"
	})
}

// marshalExternalFields marshals only the fields with the external:"true" tag.
func marshalExternalFields(src any) ([]byte, error) {
	return marshalWithOverride(src, func(f reflect.StructField) bool {
		return f.Tag.Get("external") != "true"
	})
}

// BaseExternalMeta is the base meta of external meta.
type BaseExternalMeta struct {
	// ExternalPath is the path to the external storage where the external meta is stored.
	ExternalPath string
}

// Marshal serializes the provided alias to JSON.
// Usage: If ExternalPath is set, only the internal fields (those without the
// external:"true" tag) are marshaled; otherwise the alias is marshaled directly.
func (m BaseExternalMeta) Marshal(alias any) ([]byte, error) {
	if m.ExternalPath == "" {
		return json.Marshal(alias)
	}
	return marshalInternalFields(alias)
}

// WriteJSONToExternalStorage writes the serialized external meta JSON to external storage.
// Usage: Store external meta after appropriate modifications.
func (m BaseExternalMeta) WriteJSONToExternalStorage(ctx context.Context, store storeapi.Storage, a any) error {
	if m.ExternalPath == "" {
		return nil
	}
	data, err := marshalExternalFields(a)
	if err != nil {
		return errors.Trace(err)
	}
	return store.WriteFile(ctx, m.ExternalPath, data)
}

// ReadJSONFromExternalStorage reads and unmarshals JSON from external storage into the provided alias.
// Usage: Retrieve external meta for further processing.
func (m BaseExternalMeta) ReadJSONFromExternalStorage(ctx context.Context, store storeapi.Storage, a any) error {
	if m.ExternalPath == "" {
		return nil
	}
	data, err := store.ReadFile(ctx, m.ExternalPath)
	if err != nil {
		return errors.Trace(err)
	}
	return json.Unmarshal(data, a)
}

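// A sketch of the split-meta pattern implemented above, using a hypothetical
// meta type: fields tagged external:"true" are dropped from the JSON produced
// by Marshal and are instead written to / read from ExternalPath.
//
//	type someSubtaskMeta struct {
//		BaseExternalMeta
//		Checksum uint64       `json:"checksum"`
//		Sorted   SortedKVMeta `json:"sorted-kv-meta" external:"true"`
//	}
//
//	meta := &someSubtaskMeta{}
//	meta.ExternalPath = SubtaskMetaPath(taskID, subtaskID)
//	compact, err := meta.Marshal(meta) // contains ExternalPath and Checksum, not Sorted
//	// offload the large part to external storage:
//	err = meta.WriteJSONToExternalStorage(ctx, store, meta) // writes only Sorted
//	// ... later, possibly on another node, load it back:
//	err = meta.ReadJSONFromExternalStorage(ctx, store, meta)
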
// PlanMetaPath returns the path of the plan meta file.
func PlanMetaPath(taskID int64, step string, idx int) string {
	return path.Join(strconv.FormatInt(taskID, 10), "plan", step, strconv.Itoa(idx), metaName)
}

// SubtaskMetaPath returns the path of the subtask meta file.
func SubtaskMetaPath(taskID int64, subtaskID int64) string {
	return path.Join(strconv.FormatInt(taskID, 10), strconv.FormatInt(subtaskID, 10), metaName)
}

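// These two helpers produce the meta.json locations shown in the
// GetAllFileNames example above, e.g.:
//
//	PlanMetaPath(30001, "merge-sort", 1) // "30001/plan/merge-sort/1/meta.json"
//	SubtaskMetaPath(30001, 7)            // "30001/7/meta.json"
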
// removeDuplicates removes all duplicates inside the sorted slice in place,
// i.e. input elements will be changed.
func removeDuplicates[E any](in []E, keyGetter func(*E) []byte, recordRemoved bool) ([]E, []E, int) {
	return doRemoveDuplicates(in, keyGetter, 0, recordRemoved)
}

// removeDuplicatesMoreThanTwo removes duplicates inside the sorted slice in
// place if the duplicate count is more than 2, keeping only the first two
// duplicates. It also returns the total number of duplicates as the third
// return value.
func removeDuplicatesMoreThanTwo[E any](in []E, keyGetter func(*E) []byte) (out []E, removed []E, totalDup int) {
	return doRemoveDuplicates(in, keyGetter, 2, true)
}

// doRemoveDuplicates removes duplicates inside the sorted slice 'in'. If
// keptDupCnt=2, we keep the first 2 duplicates; if keptDupCnt=0, we remove all
// duplicates. Removed duplicates are returned in 'removed' if
// recordRemoved=true. We also return the total number of duplicates, whether
// removed or not, as the third return value.
func doRemoveDuplicates[E any](
	in []E,
	keyGetter func(*E) []byte,
	keptDupCnt int,
	recordRemoved bool,
) (out []E, removed []E, totalDup int) {
	intest.Assert(keptDupCnt == 0 || keptDupCnt == 2, "keptDupCnt must be 0 or 2")
	if len(in) <= 1 {
		return in, []E{}, 0
	}
	pivotIdx, fillIdx := 0, 0
	pivot := keyGetter(&in[pivotIdx])
	if recordRemoved {
		removed = make([]E, 0, 2)
	}
	for idx := 1; idx <= len(in); idx++ {
		var key []byte
		if idx < len(in) {
			key = keyGetter(&in[idx])
			if bytes.Compare(pivot, key) == 0 {
				continue
			}
		}
		dupCount := idx - pivotIdx
		if dupCount >= 2 {
			totalDup += dupCount
			// keep the first keptDupCnt duplicates, and remove the rest
			for startIdx := pivotIdx; startIdx < pivotIdx+keptDupCnt; startIdx++ {
				if startIdx != fillIdx {
					in[fillIdx] = in[startIdx]
				}
				fillIdx++
			}
			if recordRemoved {
				removed = append(removed, in[pivotIdx+keptDupCnt:idx]...)
			}
		} else {
			if pivotIdx != fillIdx {
				in[fillIdx] = in[pivotIdx]
			}
			fillIdx++
		}
		pivotIdx = idx
		pivot = key
	}
	return in[:fillIdx], removed, totalDup
}

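// A worked example for the duplicate-removal helpers above, using plain byte
// slices as elements (a key getter extracts the comparison key, so any element
// type works):
//
//	getKey := func(e *[]byte) []byte { return *e }
//
//	in1 := [][]byte{[]byte("a"), []byte("b"), []byte("b"), []byte("b"), []byte("c")}
//	out, removed, total := removeDuplicates(in1, getKey, true)
//	// out = [a c], removed = [b b b], total = 3
//
//	in2 := [][]byte{[]byte("a"), []byte("b"), []byte("b"), []byte("b"), []byte("c")}
//	out, removed, total = removeDuplicatesMoreThanTwo(in2, getKey)
//	// out = [a b b c], removed = [b], total = 3 (only the third "b" is removed)
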
// DivideMergeSortDataFiles divides the data files into multiple groups for
// merge sort. Each group will be assigned to a node for sorting.
// The number of files in each group is limited to MaxMergeSortFileCountStep.
func DivideMergeSortDataFiles(dataFiles []string, nodeCnt int, mergeConc int) ([][]string, error) {
	if nodeCnt == 0 {
		return nil, errors.Errorf("unsupported zero node count")
	}
	if len(dataFiles) == 0 {
		return [][]string{}, nil
	}
	adjustedMergeSortFileCountStep := GetAdjustedMergeSortFileCountStep(mergeConc)
	dataFilesCnt := len(dataFiles)
	result := make([][]string, 0, nodeCnt)
	batches := len(dataFiles) / adjustedMergeSortFileCountStep
	rounds := batches / nodeCnt
	for range rounds * nodeCnt {
		result = append(result, dataFiles[:adjustedMergeSortFileCountStep])
		dataFiles = dataFiles[adjustedMergeSortFileCountStep:]
	}
	remainder := dataFilesCnt - (nodeCnt * rounds * adjustedMergeSortFileCountStep)
	if remainder == 0 {
		return result, nil
	}
	// adjust the node count for the remainder files to avoid having too many target files.
	adjustNodeCnt := nodeCnt
	maxTargetFilesPerSubtask := max(MergeSortMaxSubtaskTargetFiles, mergeConc)
	for (rounds*nodeCnt*maxTargetFilesPerSubtask)+(adjustNodeCnt*maxTargetFilesPerSubtask) > int(GetAdjustedMergeSortOverlapThreshold(mergeConc)) {
		adjustNodeCnt--
		if adjustNodeCnt == 0 {
			return nil, errors.Errorf("unexpected zero node count, dataFiles=%d, nodeCnt=%d", dataFilesCnt, nodeCnt)
		}
	}
	minimalFileCount := 32 // Each subtask should merge at least 32 files.
	adjustNodeCnt = max(min(remainder/minimalFileCount, adjustNodeCnt), 1)
	sizes := mathutil.Divide2Batches(remainder, adjustNodeCnt)
	for _, s := range sizes {
		result = append(result, dataFiles[:s])
		dataFiles = dataFiles[s:]
	}
	return result, nil
}

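// An illustrative walkthrough of the grouping above with hypothetical numbers
// (the real step and threshold come from GetAdjustedMergeSortFileCountStep and
// GetAdjustedMergeSortOverlapThreshold): suppose the adjusted step is 4,
// nodeCnt is 2 and there are 10 data files. Then batches = 10/4 = 2 and
// rounds = 2/2 = 1, so two full groups of 4 files are emitted first. The
// remaining 2 files are fewer than minimalFileCount, so they are assigned to a
// single node as one final group, giving group sizes [4, 4, 2] (assuming the
// overlap-threshold loop does not shrink the node count further).
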
func getHash(s string) int64 {
	h := fnv.New64a()
	// this hash function never returns an error
	_, _ = h.Write([]byte(s))
	return int64(h.Sum64())
}