Skip to content

Commit 07fbbfb

Browse files
committed
Use callback instead of channels
1 parent 91cd5a6 commit 07fbbfb

File tree

3 files changed

+35
-47
lines changed

3 files changed

+35
-47
lines changed

internal/update/apt/service.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,75 +73,73 @@ func (s *Service) ListUpgradablePackages(ctx context.Context, matcher func(updat
7373
// UpgradePackages upgrades the specified packages using the `apt-get upgrade` command.
7474
// It publishes events to subscribers during the upgrade process.
7575
// It returns an error if the upgrade is already in progress or if the upgrade command fails.
76-
func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) {
76+
func (s *Service) UpgradePackages(ctx context.Context, names []string, eventCB func(update.Event)) error {
7777
if !s.lock.TryLock() {
78-
return nil, update.ErrOperationAlreadyInProgress
78+
return update.ErrOperationAlreadyInProgress
7979
}
80-
eventsCh := make(chan update.Event, 100)
8180

8281
go func() {
8382
defer s.lock.Unlock()
84-
defer close(eventsCh)
8583

8684
// At the end of the upgrade, always try to restart the services (that need it).
8785
// This makes sure key services are restarted even if an error happens in the upgrade steps (for examples container images download).
8886
defer func() {
89-
eventsCh <- update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")
87+
eventCB(update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ..."))
9088

9189
err := restartServices(ctx)
9290
if err != nil {
93-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err))
91+
eventCB(update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err)))
9492
return
9593
}
9694
}()
9795

98-
eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
96+
eventCB(update.NewDataEvent(update.StartEvent, "Upgrade is starting"))
9997
stream := runUpgradeCommand(ctx, names)
10098
for line, err := range stream {
10199
if err != nil {
102-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err))
100+
eventCB(update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err)))
103101
return
104102
}
105-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
103+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
106104
}
107105

108-
eventsCh <- update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")
106+
eventCB(update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting"))
109107
for line, err := range runAptCleanCommand(ctx) {
110108
if err != nil {
111-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err))
109+
eventCB(update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err)))
112110
return
113111
}
114-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
112+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
115113
}
116114

117-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")
115+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ...."))
118116
streamCleanup := cleanupDockerContainers(ctx)
119117
for line, err := range streamCleanup {
120118
if err != nil {
121119
// TODO: maybe we should retun an error or a better feedback to the user?
122120
// currently, we just log the error and continue considenring not blocking
123121
slog.Warn("Error stopping and destroying docker containers", "error", err)
124122
} else {
125-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
123+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
126124
}
127125
}
128126

129127
// TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli.
130128
// Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600
131129
// Currently, we need to launch `arduino-app-cli system init` to pull the latest docker images because
132130
// the version of the docker images are hardcoded in the (new downloaded) version of the arduino-app-cli.
133-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")
131+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ..."))
134132
streamDocker := pullDockerImages(ctx)
135133
for line, err := range streamDocker {
136134
if err != nil {
137-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err))
135+
eventCB(update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err)))
138136
return
139137
}
140-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
138+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
141139
}
142140
}()
143141

144-
return eventsCh, nil
142+
return nil
145143
}
146144

147145
// runDpkgConfigureCommand is need in case an upgrade was interrupted in the middle

internal/update/arduino/arduino.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -134,40 +134,38 @@ func (a *ArduinoPlatformUpdater) ListUpgradablePackages(ctx context.Context, _ f
134134
}
135135

136136
// UpgradePackages implements ServiceUpdater.
137-
func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) {
137+
func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string, eventCB func(update.Event)) error {
138138
if !a.lock.TryLock() {
139-
return nil, update.ErrOperationAlreadyInProgress
139+
return update.ErrOperationAlreadyInProgress
140140
}
141-
eventsCh := make(chan update.Event, 100)
142141

143142
downloadProgressCB := func(curr *rpc.DownloadProgress) {
144143
data := helpers.ArduinoCLIDownloadProgressToString(curr)
145144
slog.Debug("Download progress", slog.String("download_progress", data))
146-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
145+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, data))
147146
}
148147
taskProgressCB := func(msg *rpc.TaskProgress) {
149148
data := helpers.ArduinoCLITaskProgressToString(msg)
150149
slog.Debug("Task progress", slog.String("task_progress", data))
151-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
150+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, data))
152151
}
153152

154153
go func() {
155154
defer a.lock.Unlock()
156-
defer close(eventsCh)
157155

158-
eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
156+
eventCB(update.NewDataEvent(update.StartEvent, "Upgrade is starting"))
159157

160158
logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli
161159
srv := commands.NewArduinoCoreServer()
162160

163161
if err := setConfig(ctx, srv); err != nil {
164-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error setting config: %w", err))
162+
eventCB(update.NewErrorEvent(fmt.Errorf("error setting config: %w", err)))
165163
return
166164
}
167165

168166
var inst *rpc.Instance
169167
if resp, err := srv.Create(ctx, &rpc.CreateRequest{}); err != nil {
170-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err))
168+
eventCB(update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err)))
171169
return
172170
} else {
173171
inst = resp.GetInstance()
@@ -183,11 +181,11 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
183181
{
184182
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB)
185183
if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil {
186-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error updating index: %w", err))
184+
eventCB(update.NewErrorEvent(fmt.Errorf("error updating index: %w", err)))
187185
return
188186
}
189187
if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil {
190-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err))
188+
eventCB(update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err)))
191189
return
192190
}
193191
}
@@ -209,13 +207,13 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
209207
); err != nil {
210208
var alreadyPresent *cmderrors.PlatformAlreadyAtTheLatestVersionError
211209
if errors.As(err, &alreadyPresent) {
212-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error())
210+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error()))
213211
return
214212
}
215213

216214
var notFound *cmderrors.PlatformNotFoundError
217215
if !errors.As(err, &notFound) {
218-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err))
216+
eventCB(update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err)))
219217
return
220218
}
221219
// If the platform is not found, we will try to install it
@@ -232,16 +230,16 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
232230
),
233231
)
234232
if err != nil {
235-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err))
233+
eventCB(update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err)))
236234
return
237235
}
238236
} else if respCB().GetPlatform() == nil {
239-
eventsCh <- update.NewErrorEvent(fmt.Errorf("platform upgrade failed"))
237+
eventCB(update.NewErrorEvent(fmt.Errorf("platform upgrade failed")))
240238
return
241239
}
242240

243241
cbw := orchestrator.NewCallbackWriter(func(line string) {
244-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
242+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
245243
})
246244

247245
err := srv.BurnBootloader(
@@ -253,10 +251,10 @@ func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []st
253251
commands.BurnBootloaderToServerStreams(ctx, cbw, cbw),
254252
)
255253
if err != nil {
256-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err))
254+
eventCB(update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err)))
257255
return
258256
}
259257
}()
260258

261-
return eventsCh, nil
259+
return nil
262260
}

internal/update/update.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type UpgradablePackage struct {
4646

4747
type ServiceUpdater interface {
4848
ListUpgradablePackages(ctx context.Context, matcher func(UpgradablePackage) bool) ([]UpgradablePackage, error)
49-
UpgradePackages(ctx context.Context, names []string) (<-chan Event, error)
49+
UpgradePackages(ctx context.Context, names []string, eventCB func(Event)) error
5050
}
5151

5252
type Manager struct {
@@ -136,23 +136,15 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage)
136136
// update of the cores we will end up with inconsistent state, or
137137
// we need to re run the upgrade because the orchestrator interrupted
138138
// in the middle the upgrade of the cores.
139-
arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform)
140-
if err != nil {
139+
if err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform, m.broadcast); err != nil {
141140
m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade Arduino packages: %w", err)))
142141
return
143142
}
144-
for e := range arduinoEvents {
145-
m.broadcast(e)
146-
}
147143

148-
aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs)
149-
if err != nil {
144+
if err := m.debUpdateService.UpgradePackages(ctx, debPkgs, m.broadcast); err != nil {
150145
m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade APT packages: %w", err)))
151146
return
152147
}
153-
for e := range aptEvents {
154-
m.broadcast(e)
155-
}
156148

157149
m.broadcast(NewDataEvent(DoneEvent, "Update completed"))
158150
}()

0 commit comments

Comments
 (0)