Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/event/sseevent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"strconv"
"sync"
"time"

"github.com/opensvc/om3/v3/core/event"
Expand Down Expand Up @@ -42,6 +43,8 @@ type (
// timeout specifies the maximum duration to wait before Read returns
// nil if no events are available.
timeout time.Duration

mutex sync.RWMutex
}

Writer struct {
Expand Down Expand Up @@ -106,9 +109,12 @@ func (r *ReadCloser) Buffer(buf []byte, max int) {

// Read returns *Event read from EventReader r
func (r *ReadCloser) Read() (*event.Event, error) {
r.mutex.RLock()
if r.closed {
r.mutex.RUnlock()
return nil, ErrClosed
}
r.mutex.RUnlock()
if !r.parseStarted {
go r.parse()
r.parseStarted = true
Expand Down Expand Up @@ -147,6 +153,8 @@ func (r *ReadCloser) Read() (*event.Event, error) {

// Close ask wrapped io.readCloser for Close
func (r *ReadCloser) Close() error {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.closed {
return ErrClosed
}
Expand Down
5 changes: 5 additions & 0 deletions daemon/daemonapi/get_daemon_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (a *DaemonAPI) GetClusterStatus(ctx echo.Context, params api.GetClusterStat

status := a.Daemondata.ClusterData()

if params.Selector != nil || params.Namespace != nil {
// Deep copy to avoid modifying the original
status = status.DeepCopy()
}

// Explicit object selector filtering
if params.Selector != nil {
status = status.WithSelector(*params.Selector)
Expand Down
8 changes: 4 additions & 4 deletions daemon/imon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
"time"

"github.com/google/uuid"
"github.com/opensvc/om3/v3/daemon/daemonctx"
"golang.org/x/time/rate"

"github.com/opensvc/om3/v3/daemon/daemonctx"

"github.com/opensvc/om3/v3/core/instance"
"github.com/opensvc/om3/v3/core/naming"
"github.com/opensvc/om3/v3/core/node"
Expand Down Expand Up @@ -535,10 +536,9 @@ func (t *Manager) update() {
}

t.state.UpdatedAt = time.Now()
newValue := *t.state.DeepCopy()

instance.MonitorData.Set(t.path, t.localhost, newValue.DeepCopy())
t.publisher.Pub(&msgbus.InstanceMonitorUpdated{Path: t.path, Node: t.localhost, Value: newValue}, t.pubLabels...)
instance.MonitorData.Set(t.path, t.localhost, t.state.DeepCopy())
t.publisher.Pub(&msgbus.InstanceMonitorUpdated{Path: t.path, Node: t.localhost, Value: *t.state.DeepCopy()}, t.pubLabels...)
}

func (t *Manager) transitionTo(newState instance.MonitorState) {
Expand Down
10 changes: 7 additions & 3 deletions daemon/imon/main_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,10 @@ func (t *Manager) onSetInstanceMonitor(c *msgbus.SetInstanceMonitor) {
err := fmt.Errorf("%w: daemon: imon: %s: a %s orchestration is already in progress with id %s", instance.ErrInvalidGlobalExpect, *c.Value.GlobalExpect, t.state.GlobalExpect, t.state.OrchestrationID)
return err
}
// Clone global expect options to prevent data race when we have to update its
// value when we guess destinations
var globalExpectOptions any = c.Value.GlobalExpectOptions

switch *c.Value.GlobalExpect {
case instance.MonitorGlobalExpectPlacedAt:
options, ok := c.Value.GlobalExpectOptions.(instance.MonitorGlobalExpectOptionsPlacedAt)
Expand All @@ -524,7 +528,7 @@ func (t *Manager) onSetInstanceMonitor(c *msgbus.SetInstanceMonitor) {
return err
}
options.Destination = []string{dst}
c.Value.GlobalExpectOptions = options
globalExpectOptions = options
} else if options.Live && len(options.Destination) > 1 {
err := fmt.Errorf("%w: daemon: imon: %s: live migration is not possible with multiple destinations", instance.ErrInvalidGlobalExpect, *c.Value.GlobalExpect)
globalExpectRefused()
Expand All @@ -547,7 +551,7 @@ func (t *Manager) onSetInstanceMonitor(c *msgbus.SetInstanceMonitor) {
t.log.Infof("set instance monitor: change destination nodes from %s to %s", want, can)
}
options.Destination = []string{can}
c.Value.GlobalExpectOptions = options
globalExpectOptions = options
}
case instance.MonitorGlobalExpectStarted:
if v, reason := t.isStartable(); !v {
Expand Down Expand Up @@ -581,7 +585,7 @@ func (t *Manager) onSetInstanceMonitor(c *msgbus.SetInstanceMonitor) {
}
t.change = true
t.state.GlobalExpect = *c.Value.GlobalExpect
t.state.GlobalExpectOptions = c.Value.GlobalExpectOptions
t.state.GlobalExpectOptions = globalExpectOptions
// update GlobalExpectUpdated now
// This will allow remote nodes to pickup most recent value
t.state.GlobalExpectUpdatedAt = time.Now()
Expand Down
2 changes: 1 addition & 1 deletion daemon/msgbus/instance_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (data *ClusterData) onInstanceMonitorDeleted(m *InstanceMonitorDeleted) {
// onInstanceMonitorUpdated updates .cluster.node.<node>.instance.<path>.monitor
func (data *ClusterData) onInstanceMonitorUpdated(m *InstanceMonitorUpdated) {
s := m.Path.String()
value := m.Value.DeepCopy()
value := &m.Value
if cnode, ok := data.Cluster.Node[m.Node]; ok {
if cnode.Instance == nil {
cnode.Instance = make(map[string]instance.Instance)
Expand Down
8 changes: 3 additions & 5 deletions daemon/msgbus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"time"

"github.com/google/uuid"

"github.com/opensvc/om3/v3/util/plog"

"github.com/opensvc/om3/v3/core/cluster"
Expand Down Expand Up @@ -279,9 +280,6 @@ func KindToT(kind string) (any, error) {
// EventToMessage converts event.Event message as pubsub.Messager
func EventToMessage(ev event.Event) (pubsub.Messager, error) {
var c pubsub.Messager
if ev.Data == nil {
return c, nil
}
i, err := KindToT(ev.Kind)
if err != nil {
return c, errors.New("can't decode " + ev.Kind)
Expand All @@ -303,13 +301,13 @@ type (

AuditStart struct {
pubsub.Msg `yaml:",inline"`
Q chan plog.LogMessage `json:"q" yaml:"q"`
Q chan plog.LogMessage `json:"-" yaml:"-"`
Subsystems []string `json:"subsystems" yaml:"subsystems"`
}

AuditStop struct {
pubsub.Msg `yaml:",inline"`
Q chan plog.LogMessage `json:"q" yaml:"q"`
Q chan plog.LogMessage `json:"-" yaml:"-"`
Subsystems []string `json:"subsystems" yaml:"subsystems"`
}

Expand Down
Loading