Skip to content

Commit d954a1a

Browse files
committed
Use callback instead of channels
1 parent 91cd5a6 commit d954a1a

File tree

3 files changed

+127
-142
lines changed

3 files changed

+127
-142
lines changed

internal/update/apt/service.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,75 +73,73 @@ func (s *Service) ListUpgradablePackages(ctx context.Context, matcher func(updat
7373
// UpgradePackages upgrades the specified packages using the `apt-get upgrade` command.
7474
// It publishes events to subscribers during the upgrade process.
7575
// It returns an error if the upgrade is already in progress or if the upgrade command fails.
76-
func (s *Service) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) {
76+
func (s *Service) UpgradePackages(ctx context.Context, names []string, eventCB func(update.Event)) error {
7777
if !s.lock.TryLock() {
78-
return nil, update.ErrOperationAlreadyInProgress
78+
return update.ErrOperationAlreadyInProgress
7979
}
80-
eventsCh := make(chan update.Event, 100)
8180

8281
go func() {
8382
defer s.lock.Unlock()
84-
defer close(eventsCh)
8583

8684
// At the end of the upgrade, always try to restart the services (that need it).
8785
// This makes sure key services are restarted even if an error happens in the upgrade steps (for examples container images download).
8886
defer func() {
89-
eventsCh <- update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ...")
87+
eventCB(update.NewDataEvent(update.RestartEvent, "Upgrade completed. Restarting ..."))
9088

9189
err := restartServices(ctx)
9290
if err != nil {
93-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err))
91+
eventCB(update.NewErrorEvent(fmt.Errorf("error restarting services after upgrade: %w", err)))
9492
return
9593
}
9694
}()
9795

98-
eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
96+
eventCB(update.NewDataEvent(update.StartEvent, "Upgrade is starting"))
9997
stream := runUpgradeCommand(ctx, names)
10098
for line, err := range stream {
10199
if err != nil {
102-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err))
100+
eventCB(update.NewErrorEvent(fmt.Errorf("error running upgrade command: %w", err)))
103101
return
104102
}
105-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
103+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
106104
}
107105

108-
eventsCh <- update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")
106+
eventCB(update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting"))
109107
for line, err := range runAptCleanCommand(ctx) {
110108
if err != nil {
111-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err))
109+
eventCB(update.NewErrorEvent(fmt.Errorf("error running apt clean command: %w", err)))
112110
return
113111
}
114-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
112+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
115113
}
116114

117-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ....")
115+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images ...."))
118116
streamCleanup := cleanupDockerContainers(ctx)
119117
for line, err := range streamCleanup {
120118
if err != nil {
121119
// TODO: maybe we should retun an error or a better feedback to the user?
122120
// currently, we just log the error and continue considenring not blocking
123121
slog.Warn("Error stopping and destroying docker containers", "error", err)
124122
} else {
125-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
123+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
126124
}
127125
}
128126

129127
// TODO: Remove this workaround once docker image versions are no longer hardcoded in arduino-app-cli.
130128
// Tracking issue: https://github.com/arduino/arduino-app-cli/issues/600
131129
// Currently, we need to launch `arduino-app-cli system init` to pull the latest docker images because
132130
// the version of the docker images are hardcoded in the (new downloaded) version of the arduino-app-cli.
133-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")
131+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ..."))
134132
streamDocker := pullDockerImages(ctx)
135133
for line, err := range streamDocker {
136134
if err != nil {
137-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err))
135+
eventCB(update.NewErrorEvent(fmt.Errorf("error pulling docker images: %w", err)))
138136
return
139137
}
140-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
138+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
141139
}
142140
}()
143141

144-
return eventsCh, nil
142+
return nil
145143
}
146144

147145
// runDpkgConfigureCommand is need in case an upgrade was interrupted in the middle

internal/update/arduino/arduino.go

Lines changed: 94 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -134,129 +134,125 @@ func (a *ArduinoPlatformUpdater) ListUpgradablePackages(ctx context.Context, _ f
134134
}
135135

136136
// UpgradePackages implements ServiceUpdater.
137-
func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string) (<-chan update.Event, error) {
137+
func (a *ArduinoPlatformUpdater) UpgradePackages(ctx context.Context, names []string, eventCB func(update.Event)) error {
138138
if !a.lock.TryLock() {
139-
return nil, update.ErrOperationAlreadyInProgress
139+
return update.ErrOperationAlreadyInProgress
140140
}
141-
eventsCh := make(chan update.Event, 100)
142141

143142
downloadProgressCB := func(curr *rpc.DownloadProgress) {
144143
data := helpers.ArduinoCLIDownloadProgressToString(curr)
145144
slog.Debug("Download progress", slog.String("download_progress", data))
146-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
145+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, data))
147146
}
148147
taskProgressCB := func(msg *rpc.TaskProgress) {
149148
data := helpers.ArduinoCLITaskProgressToString(msg)
150149
slog.Debug("Task progress", slog.String("task_progress", data))
151-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, data)
150+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, data))
152151
}
153152

154-
go func() {
155-
defer a.lock.Unlock()
156-
defer close(eventsCh)
153+
defer a.lock.Unlock()
157154

158-
eventsCh <- update.NewDataEvent(update.StartEvent, "Upgrade is starting")
155+
eventCB(update.NewDataEvent(update.StartEvent, "Upgrade is starting"))
159156

160-
logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli
161-
srv := commands.NewArduinoCoreServer()
157+
logrus.SetLevel(logrus.ErrorLevel) // Reduce the log level of arduino-cli
158+
srv := commands.NewArduinoCoreServer()
162159

163-
if err := setConfig(ctx, srv); err != nil {
164-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error setting config: %w", err))
165-
return
160+
if err := setConfig(ctx, srv); err != nil {
161+
eventCB(update.NewErrorEvent(fmt.Errorf("error setting config: %w", err)))
162+
return nil
163+
}
164+
165+
var inst *rpc.Instance
166+
if resp, err := srv.Create(ctx, &rpc.CreateRequest{}); err != nil {
167+
eventCB(update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err)))
168+
return nil
169+
} else {
170+
inst = resp.GetInstance()
171+
}
172+
defer func() {
173+
_, err := srv.CleanDownloadCacheDirectory(ctx, &rpc.CleanDownloadCacheDirectoryRequest{})
174+
if err != nil {
175+
slog.Error("Error cleaning cache directory", slog.Any("error", err))
166176
}
177+
_, _ = srv.Destroy(ctx, &rpc.DestroyRequest{Instance: inst})
178+
}()
167179

168-
var inst *rpc.Instance
169-
if resp, err := srv.Create(ctx, &rpc.CreateRequest{}); err != nil {
170-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error creating arduino-cli instance: %w", err))
171-
return
172-
} else {
173-
inst = resp.GetInstance()
180+
{
181+
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB)
182+
if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil {
183+
eventCB(update.NewErrorEvent(fmt.Errorf("error updating index: %w", err)))
184+
return nil
174185
}
175-
defer func() {
176-
_, err := srv.CleanDownloadCacheDirectory(ctx, &rpc.CleanDownloadCacheDirectoryRequest{})
177-
if err != nil {
178-
slog.Error("Error cleaning cache directory", slog.Any("error", err))
179-
}
180-
_, _ = srv.Destroy(ctx, &rpc.DestroyRequest{Instance: inst})
181-
}()
182-
183-
{
184-
stream, _ := commands.UpdateIndexStreamResponseToCallbackFunction(ctx, downloadProgressCB)
185-
if err := srv.UpdateIndex(&rpc.UpdateIndexRequest{Instance: inst}, stream); err != nil {
186-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error updating index: %w", err))
187-
return
188-
}
189-
if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil {
190-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err))
191-
return
192-
}
186+
if err := srv.Init(&rpc.InitRequest{Instance: inst}, commands.InitStreamResponseToCallbackFunction(ctx, nil)); err != nil {
187+
eventCB(update.NewErrorEvent(fmt.Errorf("error initializing instance: %w", err)))
188+
return nil
193189
}
190+
}
194191

195-
stream, respCB := commands.PlatformUpgradeStreamResponseToCallbackFunction(
196-
ctx,
197-
downloadProgressCB,
198-
taskProgressCB,
199-
)
200-
if err := srv.PlatformUpgrade(
201-
&rpc.PlatformUpgradeRequest{
202-
Instance: inst,
203-
PlatformPackage: "arduino",
204-
Architecture: "zephyr",
205-
SkipPostInstall: false,
206-
SkipPreUninstall: false,
207-
},
208-
stream,
209-
); err != nil {
210-
var alreadyPresent *cmderrors.PlatformAlreadyAtTheLatestVersionError
211-
if errors.As(err, &alreadyPresent) {
212-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error())
213-
return
214-
}
215-
216-
var notFound *cmderrors.PlatformNotFoundError
217-
if !errors.As(err, &notFound) {
218-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err))
219-
return
220-
}
221-
// If the platform is not found, we will try to install it
222-
err := srv.PlatformInstall(
223-
&rpc.PlatformInstallRequest{
224-
Instance: inst,
225-
PlatformPackage: "arduino",
226-
Architecture: "zephyr",
227-
},
228-
commands.PlatformInstallStreamResponseToCallbackFunction(
229-
ctx,
230-
downloadProgressCB,
231-
taskProgressCB,
232-
),
233-
)
234-
if err != nil {
235-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err))
236-
return
237-
}
238-
} else if respCB().GetPlatform() == nil {
239-
eventsCh <- update.NewErrorEvent(fmt.Errorf("platform upgrade failed"))
240-
return
192+
stream, respCB := commands.PlatformUpgradeStreamResponseToCallbackFunction(
193+
ctx,
194+
downloadProgressCB,
195+
taskProgressCB,
196+
)
197+
if err := srv.PlatformUpgrade(
198+
&rpc.PlatformUpgradeRequest{
199+
Instance: inst,
200+
PlatformPackage: "arduino",
201+
Architecture: "zephyr",
202+
SkipPostInstall: false,
203+
SkipPreUninstall: false,
204+
},
205+
stream,
206+
); err != nil {
207+
var alreadyPresent *cmderrors.PlatformAlreadyAtTheLatestVersionError
208+
if errors.As(err, &alreadyPresent) {
209+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, alreadyPresent.Error()))
210+
return nil
241211
}
242212

243-
cbw := orchestrator.NewCallbackWriter(func(line string) {
244-
eventsCh <- update.NewDataEvent(update.UpgradeLineEvent, line)
245-
})
246-
247-
err := srv.BurnBootloader(
248-
&rpc.BurnBootloaderRequest{
249-
Instance: inst,
250-
Fqbn: "arduino:zephyr:unoq",
251-
Programmer: "jlink",
213+
var notFound *cmderrors.PlatformNotFoundError
214+
if !errors.As(err, &notFound) {
215+
eventCB(update.NewErrorEvent(fmt.Errorf("error upgrading platform: %w", err)))
216+
return nil
217+
}
218+
// If the platform is not found, we will try to install it
219+
err := srv.PlatformInstall(
220+
&rpc.PlatformInstallRequest{
221+
Instance: inst,
222+
PlatformPackage: "arduino",
223+
Architecture: "zephyr",
252224
},
253-
commands.BurnBootloaderToServerStreams(ctx, cbw, cbw),
225+
commands.PlatformInstallStreamResponseToCallbackFunction(
226+
ctx,
227+
downloadProgressCB,
228+
taskProgressCB,
229+
),
254230
)
255231
if err != nil {
256-
eventsCh <- update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err))
257-
return
232+
eventCB(update.NewErrorEvent(fmt.Errorf("error installing platform: %w", err)))
233+
return nil
258234
}
259-
}()
235+
} else if respCB().GetPlatform() == nil {
236+
eventCB(update.NewErrorEvent(fmt.Errorf("platform upgrade failed")))
237+
return nil
238+
}
239+
240+
cbw := orchestrator.NewCallbackWriter(func(line string) {
241+
eventCB(update.NewDataEvent(update.UpgradeLineEvent, line))
242+
})
260243

261-
return eventsCh, nil
244+
err := srv.BurnBootloader(
245+
&rpc.BurnBootloaderRequest{
246+
Instance: inst,
247+
Fqbn: "arduino:zephyr:unoq",
248+
Programmer: "jlink",
249+
},
250+
commands.BurnBootloaderToServerStreams(ctx, cbw, cbw),
251+
)
252+
if err != nil {
253+
eventCB(update.NewErrorEvent(fmt.Errorf("error burning bootloader: %w", err)))
254+
return nil
255+
}
256+
257+
return nil
262258
}

internal/update/update.go

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type UpgradablePackage struct {
4646

4747
type ServiceUpdater interface {
4848
ListUpgradablePackages(ctx context.Context, matcher func(UpgradablePackage) bool) ([]UpgradablePackage, error)
49-
UpgradePackages(ctx context.Context, names []string) (<-chan Event, error)
49+
UpgradePackages(ctx context.Context, names []string, eventCB func(Event)) error
5050
}
5151

5252
type Manager struct {
@@ -129,33 +129,24 @@ func (m *Manager) UpgradePackages(ctx context.Context, pkgs []UpgradablePackage)
129129
}
130130
}
131131

132-
go func() {
133-
defer m.lock.Unlock()
134-
// We are launching on purpose the update sequentially. The reason is that
135-
// the deb pkgs restart the orchestrator, and if we run in parallel the
136-
// update of the cores we will end up with inconsistent state, or
137-
// we need to re run the upgrade because the orchestrator interrupted
138-
// in the middle the upgrade of the cores.
139-
arduinoEvents, err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform)
140-
if err != nil {
141-
m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade Arduino packages: %w", err)))
142-
return
143-
}
144-
for e := range arduinoEvents {
145-
m.broadcast(e)
146-
}
132+
defer m.lock.Unlock()
147133

148-
aptEvents, err := m.debUpdateService.UpgradePackages(ctx, debPkgs)
149-
if err != nil {
150-
m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade APT packages: %w", err)))
151-
return
152-
}
153-
for e := range aptEvents {
154-
m.broadcast(e)
155-
}
134+
// We are launching on purpose the update sequentially. The reason is that
135+
// the deb pkgs restart the orchestrator, and if we run in parallel the
136+
// update of the cores we will end up with inconsistent state, or
137+
// we need to re run the upgrade because the orchestrator interrupted
138+
// in the middle the upgrade of the cores.
139+
if err := m.arduinoPlatformUpdateService.UpgradePackages(ctx, arduinoPlatform, m.broadcast); err != nil {
140+
m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade Arduino packages: %w", err)))
141+
return nil
142+
}
143+
144+
if err := m.debUpdateService.UpgradePackages(ctx, debPkgs, m.broadcast); err != nil {
145+
m.broadcast(NewErrorEvent(fmt.Errorf("failed to upgrade APT packages: %w", err)))
146+
return nil
147+
}
156148

157-
m.broadcast(NewDataEvent(DoneEvent, "Update completed"))
158-
}()
149+
m.broadcast(NewDataEvent(DoneEvent, "Update completed"))
159150
return nil
160151
}
161152

0 commit comments

Comments
 (0)