Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 19 additions & 38 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type App struct {
sysLog *syslog.Writer
config *config.Config
dcs dcs.DCS
appDCS IAppDCS
cluster *mysql.Cluster
filelock *flock.Flock
t *Timings
Expand Down Expand Up @@ -148,6 +149,7 @@ func (app *App) connectDCS() error {
if err != nil {
return fmt.Errorf("failed to connect to zkDCS: %w", err)
}
app.appDCS = NewAppDCS(app.dcs, app.config, app.logger)
return nil
}

Expand Down Expand Up @@ -212,7 +214,7 @@ func (app *App) healthChecker(ctx context.Context) {
hc := app.getLocalNodeState()
logFile, maxLogPos = hc.UpdateBinlogStatus(logFile, maxLogPos)
app.logger.Info().Msgf("healthcheck: %v", hc)
err := app.dcs.SetEphemeral(dcs.JoinPath(pathHealthPrefix, app.config.Hostname), hc)
err := app.SetHealthState(app.config.Hostname, hc)
if err != nil {
app.logger.Error().Err(err).Msg("healthcheck: failed to set status to dcs")
}
Expand Down Expand Up @@ -609,7 +611,7 @@ func (app *App) stateManager() appState {
// Signal to dcs that we have set light maintenance mode
if !maintenance.MaintAcquired() {
maintenance.MySyncPaused = true
err := app.dcs.Set(pathMaintenance, maintenance)
err := app.SetMaintenance(maintenance)

if err != nil {
app.logger.Error().Err(err).Msg("failed to enter light maintenance mode")
Expand All @@ -634,7 +636,7 @@ func (app *App) stateManager() appState {

// check if switchover required or in progress
switchover := new(Switchover)
if err := app.dcs.Get(pathCurrentSwitch, switchover); err == nil {
if err := app.GetCurrentSwitchover(switchover); err == nil {
// failover via DCS is suppressed during light maintenance (only manual switchover is allowed)
if lightMaintenance && switchover.MasterTransition == FailoverTransition {
app.logger.Info().Msgf("failover suppressed by light maintenance mode")
Expand Down Expand Up @@ -664,7 +666,7 @@ func (app *App) stateManager() appState {
return stateManager
}
err = app.performSwitchover(clusterState, activeNodes, switchover, master)
if errors.Is(app.dcs.Get(pathCurrentSwitch, new(Switchover)), dcs.ErrNotFound) {
if errors.Is(app.GetCurrentSwitchover(new(Switchover)), dcs.ErrNotFound) {
app.logger.Error().Msgf("switchover was aborted")
} else {
if err != nil {
Expand Down Expand Up @@ -920,15 +922,8 @@ func (app *App) approveFailover(clusterState, clusterStateDcs map[string]*nodest
return err
}

var lastSwitchover Switchover
err = app.dcs.Get(pathLastSwitch, &lastSwitchover)
if !errors.Is(err, dcs.ErrNotFound) {
if err != nil {
return err
}
if lastSwitchover.Result == nil {
return fmt.Errorf("another switchover in progress. this should never happen")
}
lastSwitchover := app.appDCS.GetLastSwitchover()
if lastSwitchover.Result != nil {
timeAfterLastSwitchover := time.Since(lastSwitchover.Result.FinishedAt)
if timeAfterLastSwitchover < app.config.FailoverCooldown && lastSwitchover.Cause == CauseAuto {
return fmt.Errorf("not enough time from last failover %s (cooldown %s)", lastSwitchover.Result.FinishedAt, app.config.FailoverCooldown)
Expand Down Expand Up @@ -1151,7 +1146,7 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*node
app.disableSemiSyncIfNonNeeded(node, state)
}
// then update DCS
err = app.dcs.Set(pathActiveNodes, activeNodes)
err = app.SetActiveNodes(activeNodes)
if err != nil {
app.logger.Error().Err(err).Msg("update active nodes: failed to update active nodes in dcs")
return err
Expand Down Expand Up @@ -1226,7 +1221,7 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*node
}

// then update DCS
err = app.dcs.Set(pathActiveNodes, activeNodes)
err = app.SetActiveNodes(activeNodes)
if err != nil {
app.logger.Error().Err(err).Msg("update active nodes: failed to update active nodes in dcs")
return err
Expand Down Expand Up @@ -1647,7 +1642,7 @@ func (app *App) performSwitchover(clusterState map[string]*nodestate.NodeState,
}

// set new master in dcs
err = app.dcs.Set(pathMasterNode, newMaster)
_, err = app.SetMasterHost(newMaster)
if err != nil || app.emulateError("promote_set_to_dcs") {
return fmt.Errorf("failed to set new master to dcs: %w", err)
}
Expand Down Expand Up @@ -2213,13 +2208,13 @@ func (app *App) enterMaintenance(maintenance *Maintenance, master string) error
if err != nil {
return err
}
err = app.dcs.Delete(pathActiveNodes)
err = app.DeleteActiveNodes()
if err != nil {
return err
}
}
maintenance.MySyncPaused = true
return app.dcs.Set(pathMaintenance, maintenance)
return app.SetMaintenance(maintenance)
}

func (app *App) leaveMaintenance() error {
Expand Down Expand Up @@ -2252,7 +2247,7 @@ func (app *App) leaveMaintenance() error {
if len(activeNodes) == 0 {
return ErrNoActiveNodes
}
return app.dcs.Delete(pathMaintenance)
return app.DeleteMaintenance()
}

func (app *App) performChangeMaster(host, master string) error {
Expand Down Expand Up @@ -2459,7 +2454,7 @@ func (app *App) getClusterStateFromDcs() (map[string]*nodestate.NodeState, error
hosts := app.cluster.AllNodeHosts()
getter := func(host string) (*nodestate.NodeState, error) {
nodeState := new(nodestate.NodeState)
err := app.dcs.Get(dcs.JoinPath(pathHealthPrefix, host), nodeState)
err := app.GetHealthState(host, nodeState)
if err != nil && !errors.Is(err, dcs.ErrNotFound) {
return nil, err
}
Expand All @@ -2481,7 +2476,7 @@ func (app *App) waitForCatchUp(node *mysql.Node, gtidset gtids.GTIDSet, timeout
return true, nil
}
switchover := new(Switchover)
if errors.Is(app.dcs.Get(pathCurrentSwitch, switchover), dcs.ErrNotFound) {
if errors.Is(app.GetCurrentSwitchover(switchover), dcs.ErrNotFound) {
return false, nil
}
if app.CheckAsyncSwitchAllowed(node, switchover) {
Expand Down Expand Up @@ -2527,24 +2522,11 @@ func (app *App) stopReplicationOnMaster(masterNode *mysql.Node) error {
}

func (app *App) fetchCascadeNodeConfigurations() (map[string]mysql.CascadeNodeConfiguration, error) {
cascadeTopology := make(map[string]mysql.CascadeNodeConfiguration)

hosts, err := app.dcs.GetChildren(dcs.PathCascadeNodesPrefix)
cascadeTopology, err := app.FetchCascadeNodeConfigurations()
if err != nil {
app.logger.Warn().Msgf("repair: Failed to fetch CascadeNodeConfigurations")
return cascadeTopology, err
}
for _, host := range hosts {
var cnc mysql.CascadeNodeConfiguration
err := app.dcs.Get(dcs.JoinPath(dcs.PathCascadeNodesPrefix, host), &cnc)
if err != nil {
app.logger.Warn().Msgf("repair: Failed to fetch CascadeNodeConfiguration for %s", host)
return cascadeTopology, err
}
cascadeTopology[host] = cnc
}

return cascadeTopology, nil
return cascadeTopology, err
}

func (app *App) getNodePositions(activeNodes []string) ([]nodePosition, error) {
Expand Down Expand Up @@ -2585,8 +2567,7 @@ func (app *App) getNodePositions(activeNodes []string) ([]nodePosition, error) {
}
}

var nc mysql.NodeConfiguration
err = app.dcs.Get(dcs.JoinPath(pathHANodes, host), &nc)
nc, err := app.GetNodeConfiguration(host)
if err != nil {
if !errors.Is(err, dcs.ErrNotFound) && !errors.Is(err, dcs.ErrMalformed) {
return fmt.Errorf("failed to get priority for host %s: %w", host, err)
Expand Down
Loading
Loading