188 lines
4.1 KiB
Go

package bucket_schema
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influx-cli/v2/clients"
)
// Client embeds the buckets API, the bucket-schemas API and the shared CLI
// type, so their methods and fields are promoted onto Client.
//
// The methods in this file reach GetBuckets, GetMeasurementSchemas, PrintJSON
// and PrintTable, and read ActiveConfig and PrintAsJSON, through these
// embedded members.
type Client struct {
api.BucketsApi
api.BucketSchemasApi
clients.CLI
}
// orgBucketID pairs an organization ID with a bucket ID.
//
// resolveOrgBucketIds returns it, filled from the first bucket in the lookup
// response, and resolveMeasurement takes it to scope a measurement-schema
// lookup.
type orgBucketID struct {
// OrgID is the ID of the organization that owns the bucket.
OrgID string
// BucketID is the ID of the bucket.
BucketID string
}
// resolveMeasurement looks up the measurement schema called name in the bucket
// and organization identified by ids, and returns the ID of the first schema
// in the response.
//
// It returns an error when the request fails or when the response contains no
// measurement schemas.
func (c Client) resolveMeasurement(ctx context.Context, ids orgBucketID, name string) (string, error) {
	req := c.GetMeasurementSchemas(ctx, ids.BucketID).OrgID(ids.OrgID).Name(name)

	list, err := req.Execute()
	if err != nil {
		return "", fmt.Errorf("failed to find measurement schema: %w", err)
	}

	schemas := list.MeasurementSchemas
	if len(schemas) < 1 {
		return "", fmt.Errorf("measurement schema %q not found", name)
	}
	return schemas[0].Id, nil
}
// resolveOrgBucketIds looks up a bucket from params and returns the org ID and
// bucket ID of the first bucket in the response.
//
// The bucket is selected by params.BucketID when set, otherwise by
// params.BucketName. The organization filter is params.OrgID when set, then
// params.OrgName, and finally the org of the active CLI config.
//
// It returns an error when no bucket or no organization is specified, when the
// request fails, or when the response contains no buckets.
//
// TODO: move to params.go.
func (c Client) resolveOrgBucketIds(ctx context.Context, params clients.OrgBucketParams) (*orgBucketID, error) {
	bucketGiven := params.BucketName != "" || params.BucketID != ""
	if !bucketGiven {
		return nil, errors.New("bucket missing: specify bucket ID or bucket name")
	}
	orgGiven := params.OrgID != "" || params.OrgName != "" || c.ActiveConfig.Org != ""
	if !orgGiven {
		return nil, errors.New("org missing: specify org ID or org name")
	}

	req := c.GetBuckets(ctx)

	// label is whichever bucket identifier was used; it only appears in
	// error messages.
	var label string
	switch {
	case params.BucketID != "":
		req = req.Id(params.BucketID)
		label = params.BucketID
	default:
		req = req.Name(params.BucketName)
		label = params.BucketName
	}

	switch {
	case params.OrgID != "":
		req = req.OrgID(params.OrgID)
	case params.OrgName != "":
		req = req.Org(params.OrgName)
	default:
		req = req.Org(c.ActiveConfig.Org)
	}

	resp, err := req.Execute()
	if err != nil {
		return nil, fmt.Errorf("failed to find bucket %q: %w", label, err)
	}

	found := resp.GetBuckets()
	if len(found) == 0 {
		return nil, fmt.Errorf("bucket %q not found", label)
	}

	first := &found[0]
	return &orgBucketID{
		OrgID:    first.GetOrgID(),
		BucketID: first.GetId(),
	}, nil
}
// readColumns decodes measurement schema columns from a file or from stdin.
//
// When path is empty the columns are read from stdin and the source is named
// "stdin"; otherwise the whole file at path is read into memory and the source
// is named after the path. The source name is handed to f.DecoderFn to obtain
// the decoder function, which is then run over the input.
//
// It returns an error when the file cannot be read, when f.DecoderFn fails, or
// when the decoder itself fails.
func (c Client) readColumns(stdin io.Reader, f ColumnsFormat, path string) ([]api.MeasurementSchemaColumn, error) {
	// Start from stdin; a non-empty path replaces both the reader and its name.
	src, label := stdin, "stdin"
	if path != "" {
		contents, err := os.ReadFile(path)
		if err != nil {
			return nil, fmt.Errorf("unable to read file %q: %w", path, err)
		}
		src, label = bytes.NewReader(contents), path
	}

	decode, err := f.DecoderFn(label)
	if err != nil {
		return nil, err
	}
	return decode(src)
}
// Column header names for the measurement schema table.
//
// printMeasurements passes them as the table headers, and the row builders
// (makeMeasurementRows, makeExtendedMeasurementRows) use the same values as
// the keys of each row map.
const (
IDHdr = "ID"
MeasurementNameHdr = "Measurement Name"
ColumnNameHdr = "Column Name"
ColumnTypeHdr = "Column Type"
ColumnDataTypeHdr = "Column Data Type"
BucketIDHdr = "Bucket ID"
)
// printMeasurements writes the measurement schemas in m through the CLI's
// printers.
//
// Nothing is printed when m is empty. When c.PrintAsJSON is set, m is passed
// to PrintJSON as is. Otherwise a table is printed: by default one row per
// measurement (ID, name, bucket ID), and when extended is true one row per
// column of each measurement, with the column name, type and data type added.
// bucketID is the value shown in the "Bucket ID" column of every row.
func (c Client) printMeasurements(bucketID string, m []api.MeasurementSchema, extended bool) error {
	switch {
	case len(m) == 0:
		return nil
	case c.PrintAsJSON:
		return c.PrintJSON(m)
	}

	// Start with the compact layout and swap in the extended one on request.
	headers := []string{IDHdr, MeasurementNameHdr, BucketIDHdr}
	var toRows measurementRowFn = makeMeasurementRows
	if extended {
		headers = []string{
			IDHdr,
			MeasurementNameHdr,
			ColumnNameHdr,
			ColumnTypeHdr,
			ColumnDataTypeHdr,
			BucketIDHdr,
		}
		toRows = makeExtendedMeasurementRows
	}

	var rows []map[string]interface{}
	for idx := range m {
		schema := &m[idx]
		rows = append(rows, toRows(bucketID, schema)...)
	}
	return c.PrintTable(headers, rows...)
}
// measurementRowFn turns one measurement schema into the table rows that
// represent it. Each row is a map keyed by column header name, and bucketID is
// the value to place under the bucket ID header. printMeasurements picks an
// implementation based on whether extended output was requested.
type measurementRowFn func(bucketID string, m *api.MeasurementSchema) []map[string]interface{}
// makeMeasurementRows builds the compact table representation of m: a single
// row holding the measurement's ID, its name and the given bucketID.
func makeMeasurementRows(bucketID string, m *api.MeasurementSchema) []map[string]interface{} {
	row := map[string]interface{}{
		IDHdr:              m.Id,
		MeasurementNameHdr: m.Name,
		BucketIDHdr:        bucketID,
	}
	return []map[string]interface{}{row}
}
// makeExtendedMeasurementRows builds the extended table representation of m:
// one row per entry in m.Columns. Each row repeats the measurement's ID, its
// name and the given bucketID, and adds the column's name, its type and the
// value returned by its GetDataType method. A measurement without columns
// yields an empty, non-nil slice.
func makeExtendedMeasurementRows(bucketID string, m *api.MeasurementSchema) []map[string]interface{} {
	cols := m.Columns
	out := make([]map[string]interface{}, len(cols))
	for idx := range cols {
		// Index into the slice rather than copying the element.
		out[idx] = map[string]interface{}{
			IDHdr:              m.Id,
			MeasurementNameHdr: m.Name,
			ColumnNameHdr:      cols[idx].Name,
			ColumnTypeHdr:      cols[idx].Type,
			ColumnDataTypeHdr:  cols[idx].GetDataType(),
			BucketIDHdr:        bucketID,
		}
	}
	return out
}