-
Notifications
You must be signed in to change notification settings - Fork 0
/
func.go
137 lines (122 loc) · 3.21 KB
/
func.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package gosh
import (
"context"
"errors"
"io"
"os"
)
// A Func is a function which can be used to implement a go-native Commander/Pipelineable.
// If the function cannot start before calling any blocking (or other long-running) functions, it should return err.
// If any blocking calls would be called, instead you should still return a nil err, and send any potential errors from those blocking calls to the provided "done" channel.
// Functions are expected to close the done channel before returning, e.g. with a defer statement.
// Functions are expected to close stdout and stderr if not returning an initial error
type Func func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, done chan error) error
// FuncCmd is a wrapper for the state of a Func which implements Commander and Pipelineable. This can be used to, for example, insert native go-code in between two Pipelines
type FuncCmd struct {
Func Func
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
ctx context.Context
kill context.CancelFunc
done chan error
deferredBefore []func() error
deferredAfter []func() error
}
var (
_ = Pipelineable(&FuncCmd{})
)
// FromFunc produces a Commander/Pipelineable from a compliant function
func FromFunc(parentCtx context.Context, f Func) *FuncCmd {
devNull, err := os.Open(os.DevNull)
if err != nil {
// This shouldn't ever happen, right?
panic(err)
}
cmd := &FuncCmd{
Func: f,
Stdin: devNull,
Stdout: devNull,
Stderr: devNull,
}
cmd.ctx, cmd.kill = context.WithCancel(parentCtx)
return cmd
}
// Start implements Commander
func (f *FuncCmd) Start() error {
if f.done != nil {
return ErrAlreadyStarted
}
err := doDeferredBefore(f.deferredBefore)
if err != nil {
return err
}
f.done = make(chan error)
err = f.Func(f.ctx, f.Stdin, f.Stdout, f.Stderr, f.done)
if err != nil {
return err
}
return nil
}
// Wait implements Commander
func (f *FuncCmd) Wait() error {
err := <-f.done
if errors.Is(err, context.Canceled) {
return ErrKilled
}
return err
}
// Kill implements Commander
func (f *FuncCmd) Kill() error {
if f.done == nil {
return ErrNotStarted
}
f.kill()
return nil
}
// Run implements Commander
func (f *FuncCmd) Run() error {
if f.done != nil {
return ErrAlreadyStarted
}
var err error
err = f.Start()
if err != nil {
return err
}
err = f.Wait()
if err != nil {
return err
}
return nil
}
// DeferBefore implements Pipelineable
func (f *FuncCmd) DeferBefore(fn func() error) {
f.deferredBefore = append(f.deferredBefore, fn)
}
// DeferAfter implements Pipelineable
func (f *FuncCmd) DeferAfter(fn func() error) {
f.deferredAfter = append(f.deferredAfter, fn)
}
// SetStdin implements Pipelineable
func (f *FuncCmd) SetStdin(stdin io.Reader) error {
f.Stdin = stdin
return nil
}
// SetStdout implements Pipelineable
func (f *FuncCmd) SetStdout(stdout io.Writer) error {
f.Stdout = stdout
return nil
}
// SetStderr implements Pipelineable
func (f *FuncCmd) SetStderr(stderr io.Writer) error {
f.Stderr = stderr
return nil
}
// WithStreams applies a set of StreamSetters to this command
func (f *FuncCmd) WithStreams(fs ...StreamSetter) *FuncCmd {
for _, fn := range fs {
fn(f)
}
return f
}