-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpool-defs-internal.go
More file actions
121 lines (110 loc) · 3.29 KB
/
pool-defs-internal.go
File metadata and controls
121 lines (110 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package pants
import (
"time"
)
// DefaultChSize is the default channel size.
//
// TODO: this value is only temporary; the channel size definition still
// needs to be fine tuned.
const DefaultChSize = 100
// injectable is satisfied by any type that exposes an inject method
// accepting a single input of type I and reporting an error.
type injectable[I any] interface {
	inject(input I) error
}

// closable is satisfied by any type that exposes a terminate method
// taking no arguments and returning nothing.
type closable interface {
	terminate()
}
// injector is a function type taking an input of type I and returning an
// error. Its inject method lets a plain function be used wherever a value
// with an inject(I) error method is expected.
type injector[I any] func(input I) error

// inject invokes the underlying function with input and returns whatever
// error that function reports.
func (fn injector[I]) inject(input I) error {
	err := fn(input)

	return err
}
// terminator is a function type with no parameters and no results. Its
// terminate method lets a plain function be used wherever a value with a
// terminate() method is expected.
type terminator func()

// terminate invokes the underlying function exactly once per call.
func (fn terminator) terminate() {
	fn()
}
// outputInfo groups two Duplex pointers parameterised by the output type O:
// one over JobOutput[O] and one over CancelWorkSignal.
//
// NOTE(review): Duplex, JobOutput and CancelWorkSignal are declared
// elsewhere in the package; presumably each Duplex wraps a channel for the
// named payload - confirm against their definitions.
type outputInfo[O any] struct {
	// outputDupCh is the Duplex typed for JobOutput[O] values.
	outputDupCh *Duplex[JobOutput[O]]

	// cancelDupCh is the Duplex typed for CancelWorkSignal values.
	cancelDupCh *Duplex[CancelWorkSignal]
}
// outputInfoW groups an output stream, a cancel stream and a send timeout,
// parameterised by the output type O.
//
// NOTE(review): JobOutputStreamW and CancelStreamW are declared elsewhere
// in the package; the W suffix presumably marks the write side of each
// stream - confirm against their definitions.
type outputInfoW[O any] struct {
	// outputCh is the JobOutputStreamW for values of output type O.
	outputCh JobOutputStreamW[O]

	// cancelCh is the CancelStreamW held alongside the output stream.
	cancelCh CancelStreamW

	// timeoutOnSend is a duration; presumably the maximum time to wait
	// when sending on one of the streams above - verify against the sender.
	timeoutOnSend time.Duration
}
// Worker pool types:
//
// 🍺 ManifoldFuncPool (to be used by traverse):
// description: this is the most comprehensive pool type with return
// semantics. It is functional, meaning that the pool is defined by a
// predefined executive function.
// ants: PoolWithFunc
// post(ants): Invoke
// job(Param): Job(I)
// job-return: JobOutput(O), error
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: true
// observable: JobOutputStreamR(O)
// start: returns observable stream, completion stream
// pool-result: tbd (this is the result that represents the overall pool result.
// If pool shuts down as a result of premature error or ctrl-c abort, then this
// will be reflected in the pool's result).
//
// 🍺 ManifoldTaskPool:
// description: like ManifoldFuncPool but accepts task based jobs meaning each
// job can be any function as opposed to being a pre-defined function registered
// with the pool. Each job accepts an input I and emits an output O with an error.
// ants: Pool
// post(ants): Submit
// job(Param): Job(func(I) JobOutput(O), error)
// job-return: JobOutput(O), error
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: true
// observable: JobOutputStreamR(O)
// start: returns observable stream, completion stream
// pool-result: yes
//
// 🍺 FuncPoolE
// description: A simple functional pool with fire and return semantics. Client
// submits jobs with only an error return value.
// ants: PoolWithFunc
// post(ants): Invoke
// job(Param): Job(I)
// job-return: none; error only
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: yes
// observable: none
// start: returns completion stream
// pool-result: yes
//
// 🍺 FuncPool
// description: A simple functional pool with fire and forget semantics. Client
// submits jobs with no return value.
// ants: PoolWithFunc
// post(ants): Invoke
// job(Param): Job(I)
// job-return: none
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: no
// observable: none
// start: returns completion stream
// pool-result: yes
//
// 🍺 TaskPoolE
// description: accepts task based jobs. Each job accepts an input I and
// emits only an error return value.
// ants: Pool
// post(ants): Submit
// job(Param): Job(func(I) error)
// job-return: error
// job-input-stream(client-side): JobStreamW[I]
// job-input-stream(pool-side): JobStreamR[I]
// returns err: true
// observable: JobOutputStreamR(O)
// start: returns observable stream, completion stream
// pool-result: yes
//