feat: task subcommand (#105)
* feat: task subcommand Closes #19 * fix: testing and review comments * fix: pointer deref and error message
This commit is contained in:
parent
9747d05ae1
commit
63da8eccbd
@ -1961,6 +1961,7 @@ type ApiPostTasksIDRunsIDRetryRequest struct {
|
||||
taskID string
|
||||
runID string
|
||||
zapTraceSpan *string
|
||||
body *map[string]interface{}
|
||||
}
|
||||
|
||||
func (r ApiPostTasksIDRunsIDRetryRequest) TaskID(taskID string) ApiPostTasksIDRunsIDRetryRequest {
|
||||
@ -1987,6 +1988,14 @@ func (r ApiPostTasksIDRunsIDRetryRequest) GetZapTraceSpan() *string {
|
||||
return r.zapTraceSpan
|
||||
}
|
||||
|
||||
func (r ApiPostTasksIDRunsIDRetryRequest) Body(body map[string]interface{}) ApiPostTasksIDRunsIDRetryRequest {
|
||||
r.body = &body
|
||||
return r
|
||||
}
|
||||
func (r ApiPostTasksIDRunsIDRetryRequest) GetBody() *map[string]interface{} {
|
||||
return r.body
|
||||
}
|
||||
|
||||
func (r ApiPostTasksIDRunsIDRetryRequest) Execute() (Run, error) {
|
||||
return r.ApiService.PostTasksIDRunsIDRetryExecute(r)
|
||||
}
|
||||
@ -2035,7 +2044,7 @@ func (a *TasksApiService) PostTasksIDRunsIDRetryExecute(r ApiPostTasksIDRunsIDRe
|
||||
localVarFormParams := _neturl.Values{}
|
||||
|
||||
// to determine the Content-Type header
|
||||
localVarHTTPContentTypes := []string{}
|
||||
localVarHTTPContentTypes := []string{"application/json; charset=utf-8"}
|
||||
|
||||
// set Content-Type header
|
||||
localVarHTTPContentType := selectHeaderContentType(localVarHTTPContentTypes)
|
||||
@ -2054,6 +2063,8 @@ func (a *TasksApiService) PostTasksIDRunsIDRetryExecute(r ApiPostTasksIDRunsIDRe
|
||||
if r.zapTraceSpan != nil {
|
||||
localVarHeaderParams["Zap-Trace-Span"] = parameterToString(*r.zapTraceSpan, "")
|
||||
}
|
||||
// body params
|
||||
localVarPostBody = r.body
|
||||
req, err := a.client.prepareRequest(r.ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, localVarFileName, localVarFileBytes)
|
||||
if err != nil {
|
||||
return localVarReturnValue, err
|
||||
|
@ -1 +1 @@
|
||||
Subproject commit 9a74cd2d94a2ad82cdbe6af30348cd9de60c84a6
|
||||
Subproject commit c18cced83492e43c7c03e846d8eaf30bacf1ecdc
|
@ -21,6 +21,8 @@ type LogEvent struct {
|
||||
Time *time.Time `json:"time,omitempty"`
|
||||
// A description of the event that occurred.
|
||||
Message *string `json:"message,omitempty"`
|
||||
// the ID of the task that logged
|
||||
RunID *string `json:"runID,omitempty"`
|
||||
}
|
||||
|
||||
// NewLogEvent instantiates a new LogEvent object
|
||||
@ -104,6 +106,38 @@ func (o *LogEvent) SetMessage(v string) {
|
||||
o.Message = &v
|
||||
}
|
||||
|
||||
// GetRunID returns the RunID field value if set, zero value otherwise.
|
||||
func (o *LogEvent) GetRunID() string {
|
||||
if o == nil || o.RunID == nil {
|
||||
var ret string
|
||||
return ret
|
||||
}
|
||||
return *o.RunID
|
||||
}
|
||||
|
||||
// GetRunIDOk returns a tuple with the RunID field value if set, nil otherwise
|
||||
// and a boolean to check if the value has been set.
|
||||
func (o *LogEvent) GetRunIDOk() (*string, bool) {
|
||||
if o == nil || o.RunID == nil {
|
||||
return nil, false
|
||||
}
|
||||
return o.RunID, true
|
||||
}
|
||||
|
||||
// HasRunID returns a boolean if a field has been set.
|
||||
func (o *LogEvent) HasRunID() bool {
|
||||
if o != nil && o.RunID != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// SetRunID gets a reference to the given string and assigns it to the RunID field.
|
||||
func (o *LogEvent) SetRunID(v string) {
|
||||
o.RunID = &v
|
||||
}
|
||||
|
||||
func (o LogEvent) MarshalJSON() ([]byte, error) {
|
||||
toSerialize := map[string]interface{}{}
|
||||
if o.Time != nil {
|
||||
@ -112,6 +146,9 @@ func (o LogEvent) MarshalJSON() ([]byte, error) {
|
||||
if o.Message != nil {
|
||||
toSerialize["message"] = o.Message
|
||||
}
|
||||
if o.RunID != nil {
|
||||
toSerialize["runID"] = o.RunID
|
||||
}
|
||||
return json.Marshal(toSerialize)
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,7 @@ type Run struct {
|
||||
// Time used for run's \"now\" option, RFC3339.
|
||||
ScheduledFor *time.Time `json:"scheduledFor,omitempty"`
|
||||
// An array of logs associated with the run.
|
||||
Log *[]RunLog `json:"log,omitempty"`
|
||||
Log *[]LogEvent `json:"log,omitempty"`
|
||||
// Time run started executing, RFC3339Nano.
|
||||
StartedAt *time.Time `json:"startedAt,omitempty"`
|
||||
// Time run finished executing, RFC3339Nano.
|
||||
@ -179,9 +179,9 @@ func (o *Run) SetScheduledFor(v time.Time) {
|
||||
}
|
||||
|
||||
// GetLog returns the Log field value if set, zero value otherwise.
|
||||
func (o *Run) GetLog() []RunLog {
|
||||
func (o *Run) GetLog() []LogEvent {
|
||||
if o == nil || o.Log == nil {
|
||||
var ret []RunLog
|
||||
var ret []LogEvent
|
||||
return ret
|
||||
}
|
||||
return *o.Log
|
||||
@ -189,7 +189,7 @@ func (o *Run) GetLog() []RunLog {
|
||||
|
||||
// GetLogOk returns a tuple with the Log field value if set, nil otherwise
|
||||
// and a boolean to check if the value has been set.
|
||||
func (o *Run) GetLogOk() (*[]RunLog, bool) {
|
||||
func (o *Run) GetLogOk() (*[]LogEvent, bool) {
|
||||
if o == nil || o.Log == nil {
|
||||
return nil, false
|
||||
}
|
||||
@ -205,8 +205,8 @@ func (o *Run) HasLog() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// SetLog gets a reference to the given []RunLog and assigns it to the Log field.
|
||||
func (o *Run) SetLog(v []RunLog) {
|
||||
// SetLog gets a reference to the given []LogEvent and assigns it to the Log field.
|
||||
func (o *Run) SetLog(v []LogEvent) {
|
||||
o.Log = &v
|
||||
}
|
||||
|
||||
|
@ -1,185 +0,0 @@
|
||||
/*
|
||||
* Subset of Influx API covered by Influx CLI
|
||||
*
|
||||
* No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
|
||||
*
|
||||
* API version: 2.0.0
|
||||
*/
|
||||
|
||||
// Code generated by OpenAPI Generator (https://openapi-generator.tech); DO NOT EDIT.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// RunLog struct for RunLog
|
||||
type RunLog struct {
|
||||
RunID *string `json:"runID,omitempty"`
|
||||
Time *string `json:"time,omitempty"`
|
||||
Message *string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
// NewRunLog instantiates a new RunLog object
|
||||
// This constructor will assign default values to properties that have it defined,
|
||||
// and makes sure properties required by API are set, but the set of arguments
|
||||
// will change when the set of required properties is changed
|
||||
func NewRunLog() *RunLog {
|
||||
this := RunLog{}
|
||||
return &this
|
||||
}
|
||||
|
||||
// NewRunLogWithDefaults instantiates a new RunLog object
|
||||
// This constructor will only assign default values to properties that have it defined,
|
||||
// but it doesn't guarantee that properties required by API are set
|
||||
func NewRunLogWithDefaults() *RunLog {
|
||||
this := RunLog{}
|
||||
return &this
|
||||
}
|
||||
|
||||
// GetRunID returns the RunID field value if set, zero value otherwise.
|
||||
func (o *RunLog) GetRunID() string {
|
||||
if o == nil || o.RunID == nil {
|
||||
var ret string
|
||||
return ret
|
||||
}
|
||||
return *o.RunID
|
||||
}
|
||||
|
||||
// GetRunIDOk returns a tuple with the RunID field value if set, nil otherwise
|
||||
// and a boolean to check if the value has been set.
|
||||
func (o *RunLog) GetRunIDOk() (*string, bool) {
|
||||
if o == nil || o.RunID == nil {
|
||||
return nil, false
|
||||
}
|
||||
return o.RunID, true
|
||||
}
|
||||
|
||||
// HasRunID returns a boolean if a field has been set.
|
||||
func (o *RunLog) HasRunID() bool {
|
||||
if o != nil && o.RunID != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// SetRunID gets a reference to the given string and assigns it to the RunID field.
|
||||
func (o *RunLog) SetRunID(v string) {
|
||||
o.RunID = &v
|
||||
}
|
||||
|
||||
// GetTime returns the Time field value if set, zero value otherwise.
|
||||
func (o *RunLog) GetTime() string {
|
||||
if o == nil || o.Time == nil {
|
||||
var ret string
|
||||
return ret
|
||||
}
|
||||
return *o.Time
|
||||
}
|
||||
|
||||
// GetTimeOk returns a tuple with the Time field value if set, nil otherwise
|
||||
// and a boolean to check if the value has been set.
|
||||
func (o *RunLog) GetTimeOk() (*string, bool) {
|
||||
if o == nil || o.Time == nil {
|
||||
return nil, false
|
||||
}
|
||||
return o.Time, true
|
||||
}
|
||||
|
||||
// HasTime returns a boolean if a field has been set.
|
||||
func (o *RunLog) HasTime() bool {
|
||||
if o != nil && o.Time != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// SetTime gets a reference to the given string and assigns it to the Time field.
|
||||
func (o *RunLog) SetTime(v string) {
|
||||
o.Time = &v
|
||||
}
|
||||
|
||||
// GetMessage returns the Message field value if set, zero value otherwise.
|
||||
func (o *RunLog) GetMessage() string {
|
||||
if o == nil || o.Message == nil {
|
||||
var ret string
|
||||
return ret
|
||||
}
|
||||
return *o.Message
|
||||
}
|
||||
|
||||
// GetMessageOk returns a tuple with the Message field value if set, nil otherwise
|
||||
// and a boolean to check if the value has been set.
|
||||
func (o *RunLog) GetMessageOk() (*string, bool) {
|
||||
if o == nil || o.Message == nil {
|
||||
return nil, false
|
||||
}
|
||||
return o.Message, true
|
||||
}
|
||||
|
||||
// HasMessage returns a boolean if a field has been set.
|
||||
func (o *RunLog) HasMessage() bool {
|
||||
if o != nil && o.Message != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// SetMessage gets a reference to the given string and assigns it to the Message field.
|
||||
func (o *RunLog) SetMessage(v string) {
|
||||
o.Message = &v
|
||||
}
|
||||
|
||||
func (o RunLog) MarshalJSON() ([]byte, error) {
|
||||
toSerialize := map[string]interface{}{}
|
||||
if o.RunID != nil {
|
||||
toSerialize["runID"] = o.RunID
|
||||
}
|
||||
if o.Time != nil {
|
||||
toSerialize["time"] = o.Time
|
||||
}
|
||||
if o.Message != nil {
|
||||
toSerialize["message"] = o.Message
|
||||
}
|
||||
return json.Marshal(toSerialize)
|
||||
}
|
||||
|
||||
type NullableRunLog struct {
|
||||
value *RunLog
|
||||
isSet bool
|
||||
}
|
||||
|
||||
func (v NullableRunLog) Get() *RunLog {
|
||||
return v.value
|
||||
}
|
||||
|
||||
func (v *NullableRunLog) Set(val *RunLog) {
|
||||
v.value = val
|
||||
v.isSet = true
|
||||
}
|
||||
|
||||
func (v NullableRunLog) IsSet() bool {
|
||||
return v.isSet
|
||||
}
|
||||
|
||||
func (v *NullableRunLog) Unset() {
|
||||
v.value = nil
|
||||
v.isSet = false
|
||||
}
|
||||
|
||||
func NewNullableRunLog(val *RunLog) *NullableRunLog {
|
||||
return &NullableRunLog{value: val, isSet: true}
|
||||
}
|
||||
|
||||
func (v NullableRunLog) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(v.value)
|
||||
}
|
||||
|
||||
func (v *NullableRunLog) UnmarshalJSON(src []byte) error {
|
||||
v.isSet = true
|
||||
return json.Unmarshal(src, &v.value)
|
||||
}
|
@ -2,6 +2,9 @@ package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influx-cli/v2/api"
|
||||
"github.com/influxdata/influx-cli/v2/clients"
|
||||
@ -19,20 +22,49 @@ type CreateParams struct {
|
||||
FluxQuery string
|
||||
}
|
||||
|
||||
func (c Client) getOrg(params *clients.OrgParams) (string, error) {
|
||||
type NameOrID struct {
|
||||
Name string
|
||||
ID string
|
||||
}
|
||||
|
||||
func (n NameOrID) NameOrNil() *string {
|
||||
if n.Name == "" {
|
||||
return nil
|
||||
}
|
||||
return &n.Name
|
||||
}
|
||||
|
||||
func (n NameOrID) IDOrNil() *string {
|
||||
if n.ID == "" {
|
||||
return nil
|
||||
}
|
||||
return &n.ID
|
||||
}
|
||||
|
||||
func addOrg(n NameOrID, g api.ApiGetTasksRequest) api.ApiGetTasksRequest {
|
||||
if n.ID != "" {
|
||||
return g.OrgID(n.ID)
|
||||
}
|
||||
if n.Name != "" {
|
||||
return g.Org(n.Name)
|
||||
}
|
||||
return g
|
||||
}
|
||||
|
||||
func (c Client) getOrg(params *clients.OrgParams) (NameOrID, error) {
|
||||
if params.OrgID.Valid() {
|
||||
return params.OrgID.String(), nil
|
||||
return NameOrID{ID: params.OrgID.String()}, nil
|
||||
}
|
||||
if params.OrgName != "" {
|
||||
return params.OrgName, nil
|
||||
return NameOrID{Name: params.OrgName}, nil
|
||||
}
|
||||
if c.ActiveConfig.Org != "" {
|
||||
return c.ActiveConfig.Org, nil
|
||||
return NameOrID{Name: c.ActiveConfig.Org}, nil
|
||||
}
|
||||
if c.AllowEmptyOrg {
|
||||
return "", nil
|
||||
return NameOrID{}, nil
|
||||
}
|
||||
return "", clients.ErrMustSpecifyOrg
|
||||
return NameOrID{}, clients.ErrMustSpecifyOrg
|
||||
}
|
||||
|
||||
func (c Client) Create(ctx context.Context, params *CreateParams) error {
|
||||
@ -40,9 +72,214 @@ func (c Client) Create(ctx context.Context, params *CreateParams) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
task, err := c.PostTasks(ctx).TaskCreateRequest(api.TaskCreateRequest{
|
||||
Org: &org,
|
||||
Flux: params.FluxQuery,
|
||||
createRequest := api.TaskCreateRequest{
|
||||
Flux: params.FluxQuery,
|
||||
OrgID: org.IDOrNil(),
|
||||
Org: org.NameOrNil(),
|
||||
}
|
||||
task, err := c.PostTasks(ctx).TaskCreateRequest(createRequest).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.printTasks(taskPrintOpts{
|
||||
task: &task,
|
||||
})
|
||||
}
|
||||
|
||||
type FindParams struct {
|
||||
clients.OrgParams
|
||||
TaskID string
|
||||
UserID string
|
||||
Limit int
|
||||
}
|
||||
|
||||
func (c Client) Find(ctx context.Context, params *FindParams) error {
|
||||
if params.Limit < 1 {
|
||||
return fmt.Errorf("must specify a positive limit, not %d", params.Limit)
|
||||
}
|
||||
|
||||
var tasks []api.Task
|
||||
// If we get an id, just find the one task
|
||||
if params.TaskID != "" {
|
||||
task, err := c.GetTasksID(ctx, params.TaskID).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tasks = append(tasks, task)
|
||||
} else {
|
||||
org, err := c.getOrg(¶ms.OrgParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// filter on all tasks
|
||||
if params.Limit > math.MaxInt32 {
|
||||
return fmt.Errorf("limit too large %d > %d", params.Limit, math.MaxInt32)
|
||||
}
|
||||
getTask := c.GetTasks(ctx).Limit(int32(params.Limit))
|
||||
getTask = addOrg(org, getTask)
|
||||
if params.UserID != "" {
|
||||
getTask = getTask.User(params.UserID)
|
||||
}
|
||||
tasksResult, err := getTask.Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tasks = *tasksResult.Tasks
|
||||
}
|
||||
return c.printTasks(taskPrintOpts{
|
||||
tasks: tasks,
|
||||
})
|
||||
}
|
||||
|
||||
func (c Client) appendRuns(ctx context.Context, prev []api.Run, taskID string, filter RunFilter) ([]api.Run, error) {
|
||||
if filter.Limit < 1 {
|
||||
return nil, fmt.Errorf("must specify a positive run limit, not %d", filter.Limit)
|
||||
}
|
||||
if filter.Limit > math.MaxInt32 {
|
||||
return nil, fmt.Errorf("limit too large %d > %d", filter.Limit, math.MaxInt32)
|
||||
}
|
||||
getRuns := c.GetTasksIDRuns(ctx, taskID).Limit(int32(filter.Limit))
|
||||
if filter.After != "" {
|
||||
afterTime, err := time.Parse(time.RFC3339, filter.After)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
getRuns = getRuns.AfterTime(afterTime)
|
||||
}
|
||||
if filter.Before != "" {
|
||||
beforeTime, err := time.Parse(time.RFC3339, filter.Before)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
getRuns = getRuns.BeforeTime(beforeTime)
|
||||
}
|
||||
runs, err := getRuns.Execute()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, run := range *runs.Runs {
|
||||
if filter.Status == "" {
|
||||
prev = append(prev, run)
|
||||
} else if run.Status != nil && *run.Status == filter.Status {
|
||||
prev = append(prev, run)
|
||||
}
|
||||
}
|
||||
return prev, nil
|
||||
}
|
||||
|
||||
type RunFilter struct {
|
||||
After string
|
||||
Before string
|
||||
Limit int
|
||||
Status string
|
||||
}
|
||||
|
||||
type RetryFailedParams struct {
|
||||
clients.OrgParams
|
||||
TaskID string
|
||||
DryRun bool
|
||||
TaskLimit int
|
||||
RunFilter RunFilter
|
||||
}
|
||||
|
||||
func (c Client) retryRun(ctx context.Context, run api.Run, dryRun bool) error {
|
||||
// Note that this output does not respect json flag, in line with original influx cli
|
||||
// The server should fill in the empty id's so this shouldn't happen
|
||||
if run.Id == nil {
|
||||
_ = c.StdIO.Error("skipping empty run id from influxdb")
|
||||
return nil
|
||||
}
|
||||
if run.TaskID == nil {
|
||||
_ = c.StdIO.Error("skipping empty task id from influxdb")
|
||||
return nil
|
||||
}
|
||||
if dryRun {
|
||||
_, _ = fmt.Fprintf(c.StdIO, "Would retry for %s run for Task %s.\n", *run.Id, *run.TaskID)
|
||||
} else {
|
||||
newRun, err := c.PostTasksIDRunsIDRetry(ctx, *run.TaskID, *run.Id).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, _ = fmt.Fprintf(c.StdIO, "Retry for task %s's run %s queued as run %s.\n", *run.TaskID, *run.Id, *newRun.Id)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c Client) RetryFailed(ctx context.Context, params *RetryFailedParams) error {
|
||||
if params.TaskLimit < 1 {
|
||||
return fmt.Errorf("must specify a positive task limit, not %d", params.TaskLimit)
|
||||
}
|
||||
var taskIds []string
|
||||
if params.TaskID != "" {
|
||||
taskIds = []string{params.TaskID}
|
||||
} else {
|
||||
org, err := c.getOrg(¶ms.OrgParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if params.TaskLimit > math.MaxInt32 {
|
||||
return fmt.Errorf("limit too large %d > %d", params.TaskLimit, math.MaxInt32)
|
||||
}
|
||||
getTask := c.GetTasks(ctx).Limit(int32(params.TaskLimit))
|
||||
getTask = addOrg(org, getTask)
|
||||
tasks, err := getTask.Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
taskIds = make([]string, 0, len(*tasks.Tasks))
|
||||
for _, t := range *tasks.Tasks {
|
||||
taskIds = append(taskIds, t.Id)
|
||||
}
|
||||
}
|
||||
var failedRuns []api.Run
|
||||
for _, taskId := range taskIds {
|
||||
var err error
|
||||
runFilter := params.RunFilter
|
||||
runFilter.Status = "failed"
|
||||
failedRuns, err = c.appendRuns(ctx, failedRuns, taskId, runFilter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, run := range failedRuns {
|
||||
err := c.retryRun(ctx, run, params.DryRun)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if params.DryRun {
|
||||
_, _ = fmt.Fprintf(c.StdIO, `Dry run complete. Found %d tasks with a total of %d runs to be retried
|
||||
Rerun without '--dry-run' to execute
|
||||
`, len(taskIds), len(failedRuns))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type UpdateParams struct {
|
||||
FluxQuery string
|
||||
TaskID string
|
||||
Status string
|
||||
}
|
||||
|
||||
func (c Client) Update(ctx context.Context, params *UpdateParams) error {
|
||||
var flux *string
|
||||
if params.FluxQuery != "" {
|
||||
flux = ¶ms.FluxQuery
|
||||
}
|
||||
var status *api.TaskStatusType
|
||||
if params.Status != "" {
|
||||
var s api.TaskStatusType
|
||||
err := s.UnmarshalJSON([]byte(fmt.Sprintf("%q", params.Status)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
status = &s
|
||||
}
|
||||
task, err := c.PatchTasksID(ctx, params.TaskID).TaskUpdateRequest(api.TaskUpdateRequest{
|
||||
Status: status,
|
||||
Flux: flux,
|
||||
}).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -52,9 +289,34 @@ func (c Client) Create(ctx context.Context, params *CreateParams) error {
|
||||
})
|
||||
}
|
||||
|
||||
type DeleteParams struct {
|
||||
TaskID string
|
||||
}
|
||||
|
||||
func (c Client) Delete(ctx context.Context, params *DeleteParams) error {
|
||||
task, err := c.GetTasksID(ctx, params.TaskID).Execute()
|
||||
if err != nil {
|
||||
return fmt.Errorf("while finding task: %w", err)
|
||||
}
|
||||
err = c.DeleteTasksID(ctx, params.TaskID).Execute()
|
||||
if err != nil {
|
||||
return fmt.Errorf("while deleting: %w", err)
|
||||
}
|
||||
return c.printTasks(taskPrintOpts{
|
||||
task: &task,
|
||||
})
|
||||
}
|
||||
|
||||
type taskPrintOpts struct {
|
||||
task *api.Task
|
||||
tasks []*api.Task
|
||||
tasks []api.Task
|
||||
}
|
||||
|
||||
func derefOrEmpty(s *string) string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return *s
|
||||
}
|
||||
|
||||
func (c Client) printTasks(printOpts taskPrintOpts) error {
|
||||
@ -77,7 +339,7 @@ func (c Client) printTasks(printOpts taskPrintOpts) error {
|
||||
}
|
||||
|
||||
if printOpts.task != nil {
|
||||
printOpts.tasks = append(printOpts.tasks, printOpts.task)
|
||||
printOpts.tasks = append(printOpts.tasks, *printOpts.task)
|
||||
}
|
||||
|
||||
var rows []map[string]interface{}
|
||||
@ -86,13 +348,147 @@ func (c Client) printTasks(printOpts taskPrintOpts) error {
|
||||
"ID": t.Id,
|
||||
"Name": t.Name,
|
||||
"Organization ID": t.OrgID,
|
||||
"Organization": t.Org,
|
||||
"Status": t.Status,
|
||||
"Every": t.Every,
|
||||
"Cron": t.Cron,
|
||||
"Organization": derefOrEmpty(t.Org),
|
||||
"Status": derefOrEmpty((*string)(t.Status)),
|
||||
"Every": derefOrEmpty(t.Every),
|
||||
"Cron": derefOrEmpty(t.Cron),
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
|
||||
return c.PrintTable(headers, rows...)
|
||||
}
|
||||
|
||||
type LogFindParams struct {
|
||||
TaskID string
|
||||
RunID string
|
||||
}
|
||||
|
||||
func (c Client) FindLogs(ctx context.Context, params *LogFindParams) error {
|
||||
var logs api.Logs
|
||||
if params.RunID != "" {
|
||||
var err error
|
||||
logs, err = c.GetTasksIDRunsIDLogs(ctx, params.TaskID, params.RunID).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
var err error
|
||||
logs, err = c.GetTasksIDLogs(ctx, params.TaskID).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if logs.Events == nil {
|
||||
return c.printLogs(nil)
|
||||
}
|
||||
return c.printLogs(*logs.Events)
|
||||
}
|
||||
|
||||
func (c Client) printLogs(logs []api.LogEvent) error {
|
||||
if c.PrintAsJSON {
|
||||
var v interface{} = logs
|
||||
return c.PrintJSON(v)
|
||||
}
|
||||
|
||||
headers := []string{
|
||||
"RunID",
|
||||
"Time",
|
||||
"Message",
|
||||
}
|
||||
|
||||
var rows []map[string]interface{}
|
||||
for _, l := range logs {
|
||||
row := map[string]interface{}{
|
||||
"RunID": derefOrEmpty(l.RunID),
|
||||
"Time": l.Time,
|
||||
"Message": derefOrEmpty(l.Message),
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
|
||||
return c.PrintTable(headers, rows...)
|
||||
}
|
||||
|
||||
type RunFindParams struct {
|
||||
TaskID string
|
||||
RunID string
|
||||
Filter RunFilter
|
||||
}
|
||||
|
||||
func (c Client) FindRuns(ctx context.Context, params *RunFindParams) error {
|
||||
if params.Filter.Limit < 1 {
|
||||
return fmt.Errorf("must specify a positive run limit, not %d", params.Filter.Limit)
|
||||
}
|
||||
|
||||
runs := make([]api.Run, 0)
|
||||
if params.RunID != "" {
|
||||
run, err := c.GetTasksIDRunsID(ctx, params.TaskID, params.RunID).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
runs = append(runs, run)
|
||||
} else {
|
||||
var err error
|
||||
runs, err = c.appendRuns(ctx, runs, params.TaskID, params.Filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return c.printRuns(runs)
|
||||
}
|
||||
|
||||
func (c Client) printRuns(runs []api.Run) error {
|
||||
if c.PrintAsJSON {
|
||||
var v interface{} = runs
|
||||
return c.PrintJSON(v)
|
||||
}
|
||||
|
||||
headers := []string{
|
||||
"ID",
|
||||
"TaskID",
|
||||
"Status",
|
||||
"ScheduledFor",
|
||||
"StartedAt",
|
||||
"FinishedAt",
|
||||
"RequestedAt",
|
||||
}
|
||||
|
||||
derefAndFormat := func(t *time.Time, layout string) string {
|
||||
if t == nil {
|
||||
return ""
|
||||
}
|
||||
return t.Format(layout)
|
||||
}
|
||||
|
||||
var rows []map[string]interface{}
|
||||
for _, r := range runs {
|
||||
row := map[string]interface{}{
|
||||
"ID": derefOrEmpty(r.Id),
|
||||
"TaskID": derefOrEmpty(r.TaskID),
|
||||
"Status": derefOrEmpty(r.Status),
|
||||
"ScheduledFor": derefAndFormat(r.ScheduledFor, time.RFC3339),
|
||||
"StartedAt": derefAndFormat(r.StartedAt, time.RFC3339Nano),
|
||||
"FinishedAt": derefAndFormat(r.FinishedAt, time.RFC3339Nano),
|
||||
"RequestedAt": derefAndFormat(r.RequestedAt, time.RFC3339Nano),
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
|
||||
return c.PrintTable(headers, rows...)
|
||||
}
|
||||
|
||||
type RunRetryParams struct {
|
||||
TaskID string
|
||||
RunID string
|
||||
}
|
||||
|
||||
func (c Client) RetryRun(ctx context.Context, params *RunRetryParams) error {
|
||||
newRun, err := c.PostTasksIDRunsIDRetry(ctx, params.TaskID, params.RunID).Execute()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, _ = fmt.Fprintf(c.StdIO, "Retry for task %s's run %s queued as run %s.\n", params.TaskID, params.RunID, *newRun.Id)
|
||||
return nil
|
||||
}
|
||||
|
@ -7,18 +7,20 @@ import (
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
const TaskMaxPageSize = 500
|
||||
|
||||
func newTaskCommand() *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "task",
|
||||
Usage: "Task management commands",
|
||||
Subcommands: []*cli.Command{
|
||||
//newTaskLogCmd(),
|
||||
//newTaskRunCmd(),
|
||||
newTaskLogCmd(),
|
||||
newTaskRunCmd(),
|
||||
newTaskCreateCmd(),
|
||||
//newTaskDeleteCmd(),
|
||||
//newTaskFindCmd(),
|
||||
//newTaskUpdateCmd(),
|
||||
//newTaskRetryFailedCmd(),
|
||||
newTaskDeleteCmd(),
|
||||
newTaskFindCmd(),
|
||||
newTaskUpdateCmd(),
|
||||
newTaskRetryFailedCmd(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -34,7 +36,7 @@ func newTaskCreateCmd() *cli.Command {
|
||||
})
|
||||
return &cli.Command{
|
||||
Name: "create",
|
||||
Usage: "Create a task with a Flux script provided via the first argument or a file or stdin",
|
||||
Usage: "Create a task with a Flux script provided via the first argument or a file or stdin.",
|
||||
ArgsUsage: "[flux script or '-' for stdin]",
|
||||
Flags: flags,
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
@ -53,3 +55,311 @@ func newTaskCreateCmd() *cli.Command {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskFindCmd() *cli.Command {
|
||||
var params task.FindParams
|
||||
flags := append(commonFlags(), getOrgFlags(¶ms.OrgParams)...)
|
||||
flags = append(flags, []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "task ID",
|
||||
Aliases: []string{"i"},
|
||||
Destination: ¶ms.TaskID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "user-id",
|
||||
Usage: "task owner ID",
|
||||
Aliases: []string{"n"},
|
||||
Destination: ¶ms.UserID,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "limit",
|
||||
Usage: "the number of tasks to find",
|
||||
Destination: ¶ms.Limit,
|
||||
Value: TaskMaxPageSize,
|
||||
},
|
||||
}...)
|
||||
return &cli.Command{
|
||||
Name: "list",
|
||||
Usage: "List tasks",
|
||||
Aliases: []string{"find", "ls"},
|
||||
Flags: flags,
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
api := getAPI(ctx)
|
||||
client := task.Client{
|
||||
CLI: getCLI(ctx),
|
||||
TasksApi: api.TasksApi,
|
||||
}
|
||||
return client.Find(ctx.Context, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskRetryFailedCmd() *cli.Command {
|
||||
var params task.RetryFailedParams
|
||||
flags := append(commonFlags(), getOrgFlags(¶ms.OrgParams)...)
|
||||
flags = append(flags, []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "task ID",
|
||||
Aliases: []string{"i"},
|
||||
Destination: ¶ms.TaskID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "before",
|
||||
Usage: "runs before this time",
|
||||
Destination: ¶ms.RunFilter.Before,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "after",
|
||||
Usage: "runs after this time",
|
||||
Destination: ¶ms.RunFilter.After,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "dry-run",
|
||||
Usage: "print info about runs that would be retried",
|
||||
Destination: ¶ms.DryRun,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "task-limit",
|
||||
Usage: "max number of tasks to retry failed runs for",
|
||||
Destination: ¶ms.TaskLimit,
|
||||
Value: 100,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "run-limit",
|
||||
Usage: "max number of failed runs to retry per task",
|
||||
Destination: ¶ms.RunFilter.Limit,
|
||||
Value: 100,
|
||||
},
|
||||
}...)
|
||||
return &cli.Command{
|
||||
Name: "retry-failed",
|
||||
Usage: "Retry failed runs",
|
||||
Aliases: []string{"rtf"},
|
||||
Flags: flags,
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
api := getAPI(ctx)
|
||||
client := task.Client{
|
||||
CLI: getCLI(ctx),
|
||||
TasksApi: api.TasksApi,
|
||||
}
|
||||
return client.RetryFailed(ctx.Context, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskUpdateCmd() *cli.Command {
|
||||
var params task.UpdateParams
|
||||
flags := commonFlags()
|
||||
flags = append(flags, []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "task ID (required)",
|
||||
Aliases: []string{"i"},
|
||||
Destination: ¶ms.TaskID,
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "status",
|
||||
Usage: "update tasks status",
|
||||
Destination: ¶ms.Status,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "file",
|
||||
Usage: "Path to Flux script file",
|
||||
Aliases: []string{"f"},
|
||||
TakesFile: true,
|
||||
},
|
||||
}...)
|
||||
return &cli.Command{
|
||||
Name: "update",
|
||||
Usage: "Update task status or script. Provide a Flux script via the first argument or a file.",
|
||||
ArgsUsage: "[flux script or '-' for stdin]",
|
||||
Flags: flags,
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
api := getAPI(ctx)
|
||||
client := task.Client{
|
||||
CLI: getCLI(ctx),
|
||||
TasksApi: api.TasksApi,
|
||||
}
|
||||
var err error
|
||||
if ctx.String("file") != "" || ctx.NArg() != 0 {
|
||||
params.FluxQuery, err = clients.ReadQuery(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return client.Update(ctx.Context, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskDeleteCmd() *cli.Command {
|
||||
var params task.DeleteParams
|
||||
flags := commonFlags()
|
||||
flags = append(flags, []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "task ID (required)",
|
||||
Aliases: []string{"i"},
|
||||
Destination: ¶ms.TaskID,
|
||||
Required: true,
|
||||
},
|
||||
}...)
|
||||
return &cli.Command{
|
||||
Name: "delete",
|
||||
Usage: "Delete tasks",
|
||||
Flags: flags,
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
api := getAPI(ctx)
|
||||
client := task.Client{
|
||||
CLI: getCLI(ctx),
|
||||
TasksApi: api.TasksApi,
|
||||
}
|
||||
return client.Delete(ctx.Context, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskLogCmd() *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "log",
|
||||
Usage: "Log related commands",
|
||||
Subcommands: []*cli.Command{
|
||||
newTaskLogFindCmd(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskLogFindCmd() *cli.Command {
|
||||
var params task.LogFindParams
|
||||
flags := commonFlags()
|
||||
flags = append(flags, []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "task-id",
|
||||
Usage: "task id (required)",
|
||||
Destination: ¶ms.TaskID,
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "run-id",
|
||||
Usage: "run id",
|
||||
Destination: ¶ms.RunID,
|
||||
},
|
||||
}...)
|
||||
return &cli.Command{
|
||||
Name: "list",
|
||||
Usage: "List logs for a task",
|
||||
Aliases: []string{"find", "ls"},
|
||||
Flags: flags,
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
api := getAPI(ctx)
|
||||
client := task.Client{
|
||||
CLI: getCLI(ctx),
|
||||
TasksApi: api.TasksApi,
|
||||
}
|
||||
return client.FindLogs(ctx.Context, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskRunCmd() *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "run",
|
||||
Usage: "Run related commands",
|
||||
Subcommands: []*cli.Command{
|
||||
newTaskRunFindCmd(),
|
||||
newTaskRunRetryCmd(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskRunFindCmd() *cli.Command {
|
||||
var params task.RunFindParams
|
||||
flags := commonFlags()
|
||||
flags = append(flags, []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "task-id",
|
||||
Usage: "task ID (required)",
|
||||
Destination: ¶ms.TaskID,
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "run-id",
|
||||
Usage: "run id",
|
||||
Destination: ¶ms.RunID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "before",
|
||||
Usage: "runs before this time",
|
||||
Destination: ¶ms.Filter.Before,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "after",
|
||||
Usage: "runs after this time",
|
||||
Destination: ¶ms.Filter.After,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "limit",
|
||||
Usage: "limit the results",
|
||||
Destination: ¶ms.Filter.Limit,
|
||||
Value: 100,
|
||||
},
|
||||
}...)
|
||||
return &cli.Command{
|
||||
Name: "list",
|
||||
Usage: "List runs for a tasks",
|
||||
Aliases: []string{"find", "ls"},
|
||||
Flags: flags,
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
api := getAPI(ctx)
|
||||
client := task.Client{
|
||||
CLI: getCLI(ctx),
|
||||
TasksApi: api.TasksApi,
|
||||
}
|
||||
return client.FindRuns(ctx.Context, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newTaskRunRetryCmd() *cli.Command {
|
||||
var params task.RunRetryParams
|
||||
flags := commonFlags()
|
||||
flags = append(flags, []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "task-id",
|
||||
Usage: "task ID (required)",
|
||||
Aliases: []string{"i"},
|
||||
Destination: ¶ms.TaskID,
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "run-id",
|
||||
Usage: "run ID (required)",
|
||||
Aliases: []string{"r"},
|
||||
Destination: ¶ms.RunID,
|
||||
Required: true,
|
||||
},
|
||||
}...)
|
||||
return &cli.Command{
|
||||
Name: "retry",
|
||||
Usage: "Retry a run",
|
||||
Flags: flags,
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
api := getAPI(ctx)
|
||||
client := task.Client{
|
||||
CLI: getCLI(ctx),
|
||||
TasksApi: api.TasksApi,
|
||||
}
|
||||
return client.RetryRun(ctx.Context, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user