From 59e111705b04c87d17ce01feb469af7db314fee0 Mon Sep 17 00:00:00 2001 From: Daniel Saunders Date: Thu, 27 Jun 2024 16:18:56 -0400 Subject: [PATCH 1/5] Add simple cron scheduling --- scheduler/scheduler.go | 216 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 scheduler/scheduler.go diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go new file mode 100644 index 0000000..4f6018f --- /dev/null +++ b/scheduler/scheduler.go @@ -0,0 +1,216 @@ +package scheduler + +import ( + "context" + "fmt" + "log" + "regexp" + "strconv" + "strings" + "sync" + "time" +) + +const cronRegex = `^([0-5][0-9]|\*) ([0-1][0-9]|2[0-4]|\*) ([0-2][0-9]|3[0-1]|\*) ([0-9]|1[0-2]|\*) ([0-6]|\*)$` + +type schedule struct { + min int + hour int + day int + month int + weekday int +} + +type job struct { + cmd func() + name string + schedule schedule +} + +type Scheduler struct { + cv *sync.Cond + jobs []job + logger *log.Logger + mutex *sync.Mutex + numReady int + ready sync.Map +} + +func New(logger *log.Logger) *Scheduler { + mutex := &sync.Mutex{} + cv := sync.NewCond(mutex) + + return &Scheduler{ + cv: cv, + jobs: nil, + logger: logger, + mutex: mutex, + numReady: 0, + ready: sync.Map{}, + } +} + +func (s *Scheduler) Add(schedule string, cmd func()) (bool, error) { + return s.addJob(schedule, cmd, fmt.Sprintf("job-%d", time.Now().UTC().Unix())) +} + +func (s *Scheduler) AddWithName(schedule string, cmd func(), name string) (bool, error) { + return s.addJob(schedule, cmd, name) +} + +func (s *Scheduler) addJob(schedule string, cmd func(), name string) (bool, error) { + ok, err := validateSchedule(schedule) + if err != nil { + return false, err + } + + if !ok { + return false, nil + } + + job := job{ + cmd: cmd, + schedule: parseSchedule(schedule), + name: name, + } + + s.mutex.Lock() + s.ready.Store(s.numReady, true) + s.jobs = append(s.jobs, job) + s.numReady += 1 + s.mutex.Unlock() + + return true, nil +} + +func (s *Scheduler) Start(ctx context.Context) { + s.logger.Printf("Starting CRON scheduling with [%d] jobs.", len(s.jobs)) + go func() { + for { + select { + case <-ctx.Done(): + default: + s.mutex.Lock() + defer s.mutex.Unlock() + + for s.numReady == 0 { + s.cv.Wait() + } + + s.ready.Range(func(key, _ any) bool { + go func() { + job := s.jobs[key.(int)] + + duration := getDurationTillNextProc(job.schedule) + s.logger.Printf("Scheduling job [%s] | Time till next proc [%s]", job.name, duration) + + time.Sleep(duration) + job.cmd() + + s.mutex.Lock() + defer s.mutex.Unlock() + s.ready.Store(key, true) + s.numReady += 1 + s.cv.Signal() + }() + + s.ready.Delete(key) + s.numReady -= 1 + + return true + }) + } + } + }() +} + +func getDurationTillNextProc(s schedule) time.Duration { + currentDate := time.Now() + + var nextMin int + if s.min == -1 { + nextMin = currentDate.Minute() + 1 + } else { + nextMin = s.min + } + + var nextHour int + if s.hour == -1 { + nextHour = currentDate.Hour() + if nextMin < currentDate.Minute() { + nextHour += 1 + } + } else { + nextHour = s.hour + } + + var nextDay int + if s.day == -1 { + nextDay = currentDate.Day() + if nextHour < currentDate.Hour() { + nextDay += 1 + } + } else { + nextDay = s.day + } + + var nextMonth int + if s.month == -1 { + nextMonth = int(currentDate.Month()) + if nextDay < currentDate.Day() { + nextMonth += 1 + } + } else { + nextMonth = s.month + } + + var nextYear int = currentDate.Year() + if nextMonth < int(currentDate.Month()) { + nextYear += 1 + } + + nextDate := time.Date(nextYear, time.Month(nextMonth), nextDay, nextHour, nextMin, 0, 0, currentDate.Location()) + return nextDate.Sub(currentDate) +} + +func validateSchedule(schedule string) (bool, error) { + // For now just match basic numerics and wildcards + ok, err := regexp.MatchString(cronRegex, schedule) + if err != nil { + return false, err + } + + return ok, nil +} + +func parseSchedule(s string) schedule { + timings := strings.Split(s, " ") + + min := convCronTiming(timings[0], -1) + hour := convCronTiming(timings[1], -1) + day := convCronTiming(timings[2], -1) + month := convCronTiming(timings[3], -1) + weekday := convCronTiming(timings[4], -1) + + return schedule{ + min: min, + hour: hour, + day: day, + month: month, + weekday: weekday, + } +} + +func convCronTiming(timing string, defaultVal int) int { + if timing != "*" { + val, err := strconv.Atoi(timing) + + // Conversion should not fail because of regex matching string prior + if err != nil { + panic("Conversion of cron timing should not have failed") + } + return val + } + + return defaultVal +} From 9594b8ceb40a67420ce8bfa3443f3673e8ea5e21 Mon Sep 17 00:00:00 2001 From: Daniel Saunders Date: Thu, 27 Jun 2024 16:53:39 -0400 Subject: [PATCH 2/5] Fix: remove defer on lock to prevent deadlock --- scheduler/scheduler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 4f6018f..1d7c2da 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -91,8 +91,6 @@ func (s *Scheduler) Start(ctx context.Context) { case <-ctx.Done(): default: s.mutex.Lock() - defer s.mutex.Unlock() - for s.numReady == 0 { s.cv.Wait() } @@ -119,6 +117,7 @@ func (s *Scheduler) Start(ctx context.Context) { return true }) + s.mutex.Unlock() } } }() From 2c08ed29caa795445de885e3bd3c4a869182e443 Mon Sep 17 00:00:00 2001 From: Daniel Saunders Date: Thu, 27 Jun 2024 23:34:38 -0400 Subject: [PATCH 3/5] feat: Add more detailed regex validation and step capability --- scheduler/schedule.go | 148 +++++++++++++++++++++++++++++++++++++++++ scheduler/scheduler.go | 144 ++++++--------------------------------- 2 files changed, 168 insertions(+), 124 deletions(-) create mode 100644 scheduler/schedule.go diff --git a/scheduler/schedule.go b/scheduler/schedule.go new file mode 100644 index 0000000..55eb886 --- /dev/null +++ b/scheduler/schedule.go @@ -0,0 +1,148 @@ +package scheduler + +import ( + "regexp" + "strconv" + "strings" + "time" +) + +const cronRegex = `^(\*|[0-5]?\d)(\/\d+)? (\*|[01]?\d|2[0-3])(\/\d+)? (\*|0?[1-9]|[12]\d|3[01])(\/\d+)? (\*|0?[1-9]|1[0-2])(\/\d+)? (\*|[0-6])(\/\d+)?$` + +type timingType int + +const ( + concrete timingType = iota + step + wildcard +) + +const ( + minMax = 60 + hourMax = 24 + dayMax = 31 + monthMax = 12 + weekdayMax = 6 +) + +type timing struct { + typ timingType + val int +} + +type schedule struct { + min timing + hour timing + day timing + month timing + weekday timing +} + +func getDurationTillNextProc(s schedule) time.Duration { + currentDate := time.Now() + + var nextMin int + if s.min.typ == wildcard { + nextMin = currentDate.Minute() + 1 + } else if s.min.typ == step { + stepped := min(currentDate.Minute()+s.min.val, minMax) + nextMin = stepped - (stepped % s.min.val) + } else { + nextMin = s.min.val + } + + var nextHour int + if s.hour.typ == wildcard { + nextHour = currentDate.Hour() + if s.min.typ == concrete && nextMin < currentDate.Minute() { + nextHour += 1 + } + } else if s.hour.typ == step { + + } else { + nextHour = s.hour.val + } + + var nextDay int + if s.day.typ == wildcard { + nextDay = currentDate.Day() + if nextHour < currentDate.Hour() { + nextDay += 1 + } + } else { + nextDay = s.day.val + } + + var nextMonth int + if s.month.typ == wildcard { + nextMonth = int(currentDate.Month()) + if nextDay < currentDate.Day() { + nextMonth += 1 + } + } else { + nextMonth = s.month.val + } + + var nextYear int = currentDate.Year() + if nextMonth < int(currentDate.Month()) { + nextYear += 1 + } + + nextDate := time.Date(nextYear, time.Month(nextMonth), nextDay, nextHour, nextMin, 0, 0, currentDate.Location()) + return nextDate.Sub(currentDate) +} + +func validateSchedule(schedule string) (bool, error) { + ok, err := regexp.MatchString(cronRegex, schedule) + if err != nil { + return false, err + } + + return ok, nil +} + +func parseSchedule(s string) schedule { + timings := strings.Split(s, " ") + + min := convCronTiming(timings[0], 0, minMax) + hour := convCronTiming(timings[1], 0, hourMax) + day := convCronTiming(timings[2], 1, dayMax) + month := convCronTiming(timings[3], 1, monthMax) + weekday := convCronTiming(timings[4], 0, weekdayMax) + + return schedule{ + min: min, + hour: hour, + day: day, + month: month, + weekday: weekday, + } +} + +func convCronTiming(timeOption string, minVal, maxVal int) timing { + if timeOption == "*" { + return timing{ + typ: wildcard, + val: minVal, + } + } + + var typ timingType + if ok, _ := regexp.MatchString(`^\*\/\d+$`, timeOption); ok { + timeOption = timeOption[2:] + typ = step + } else { + typ = concrete + } + + val, err := strconv.Atoi(timeOption) + if err != nil { + panic("String to int conversion should not have failed for cron string") + } + + return timing{ + typ: typ, + val: max(min(val, maxVal), minVal), + } + +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 1d7c2da..c0d8be7 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -4,23 +4,10 @@ import ( "context" "fmt" "log" - "regexp" - "strconv" - "strings" "sync" "time" ) -const cronRegex = `^([0-5][0-9]|\*) ([0-1][0-9]|2[0-4]|\*) ([0-2][0-9]|3[0-1]|\*) ([0-9]|1[0-2]|\*) ([0-6]|\*)$` - -type schedule struct { - min int - hour int - day int - month int - weekday int -} - type job struct { cmd func() name string @@ -51,36 +38,11 @@ func New(logger *log.Logger) *Scheduler { } func (s *Scheduler) Add(schedule string, cmd func()) (bool, error) { - return s.addJob(schedule, cmd, fmt.Sprintf("job-%d", time.Now().UTC().Unix())) + return s.add(schedule, cmd, fmt.Sprintf("job-%d", time.Now().UTC().Unix())) } func (s *Scheduler) AddWithName(schedule string, cmd func(), name string) (bool, error) { - return s.addJob(schedule, cmd, name) -} - -func (s *Scheduler) addJob(schedule string, cmd func(), name string) (bool, error) { - ok, err := validateSchedule(schedule) - if err != nil { - return false, err - } - - if !ok { - return false, nil - } - - job := job{ - cmd: cmd, - schedule: parseSchedule(schedule), - name: name, - } - - s.mutex.Lock() - s.ready.Store(s.numReady, true) - s.jobs = append(s.jobs, job) - s.numReady += 1 - s.mutex.Unlock() - - return true, nil + return s.add(schedule, cmd, name) } func (s *Scheduler) Start(ctx context.Context) { @@ -96,6 +58,9 @@ func (s *Scheduler) Start(ctx context.Context) { } s.ready.Range(func(key, _ any) bool { + s.ready.Delete(key) + s.numReady -= 1 + go func() { job := s.jobs[key.(int)] @@ -112,9 +77,6 @@ func (s *Scheduler) Start(ctx context.Context) { s.cv.Signal() }() - s.ready.Delete(key) - s.numReady -= 1 - return true }) s.mutex.Unlock() @@ -123,93 +85,27 @@ func (s *Scheduler) Start(ctx context.Context) { }() } -func getDurationTillNextProc(s schedule) time.Duration { - currentDate := time.Now() - - var nextMin int - if s.min == -1 { - nextMin = currentDate.Minute() + 1 - } else { - nextMin = s.min - } - - var nextHour int - if s.hour == -1 { - nextHour = currentDate.Hour() - if nextMin < currentDate.Minute() { - nextHour += 1 - } - } else { - nextHour = s.hour - } - - var nextDay int - if s.day == -1 { - nextDay = currentDate.Day() - if nextHour < currentDate.Hour() { - nextDay += 1 - } - } else { - nextDay = s.day - } - - var nextMonth int - if s.month == -1 { - nextMonth = int(currentDate.Month()) - if nextDay < currentDate.Day() { - nextMonth += 1 - } - } else { - nextMonth = s.month - } - - var nextYear int = currentDate.Year() - if nextMonth < int(currentDate.Month()) { - nextYear += 1 - } - - nextDate := time.Date(nextYear, time.Month(nextMonth), nextDay, nextHour, nextMin, 0, 0, currentDate.Location()) - return nextDate.Sub(currentDate) -} - -func validateSchedule(schedule string) (bool, error) { - // For now just match basic numerics and wildcards - ok, err := regexp.MatchString(cronRegex, schedule) +func (s *Scheduler) add(schedule string, cmd func(), name string) (bool, error) { + ok, err := validateSchedule(schedule) if err != nil { return false, err } - return ok, nil -} - -func parseSchedule(s string) schedule { - timings := strings.Split(s, " ") - - min := convCronTiming(timings[0], -1) - hour := convCronTiming(timings[1], -1) - day := convCronTiming(timings[2], -1) - month := convCronTiming(timings[3], -1) - weekday := convCronTiming(timings[4], -1) - - return schedule{ - min: min, - hour: hour, - day: day, - month: month, - weekday: weekday, + if !ok { + return false, nil } -} - -func convCronTiming(timing string, defaultVal int) int { - if timing != "*" { - val, err := strconv.Atoi(timing) - // Conversion should not fail because of regex matching string prior - if err != nil { - panic("Conversion of cron timing should not have failed") - } - return val + job := job{ + cmd: cmd, + schedule: parseSchedule(schedule), + name: name, } - return defaultVal + s.mutex.Lock() + s.ready.Store(s.numReady, true) + s.jobs = append(s.jobs, job) + s.numReady += 1 + s.mutex.Unlock() + + return true, nil } From daf8e3bb0d0ea7357d2c17924d1ea0d4d2e25cb5 Mon Sep 17 00:00:00 2001 From: Daniel Saunders Date: Fri, 28 Jun 2024 14:34:15 -0400 Subject: [PATCH 4/5] feat: add step functionality to cron scheduling --- scheduler/schedule.go | 60 +++++++++++++++---------------------------- 1 file changed, 20 insertions(+), 40 deletions(-) diff --git a/scheduler/schedule.go b/scheduler/schedule.go index 55eb886..56701f8 100644 --- a/scheduler/schedule.go +++ b/scheduler/schedule.go @@ -41,55 +41,35 @@ type schedule struct { func getDurationTillNextProc(s schedule) time.Duration { currentDate := time.Now() - var nextMin int - if s.min.typ == wildcard { - nextMin = currentDate.Minute() + 1 - } else if s.min.typ == step { - stepped := min(currentDate.Minute()+s.min.val, minMax) - nextMin = stepped - (stepped % s.min.val) - } else { - nextMin = s.min.val - } - - var nextHour int - if s.hour.typ == wildcard { - nextHour = currentDate.Hour() - if s.min.typ == concrete && nextMin < currentDate.Minute() { - nextHour += 1 - } - } else if s.hour.typ == step { + nextMin := calcNextTime(s.min, currentDate.Minute(), minMax, 1) + nextHour := calcNextTime(s.hour, currentDate.Hour(), hourMax, 0) + nextDay := calcNextTime(s.day, currentDate.Day(), dayMax, 0) + nextMonth := calcNextTime(s.month, int(currentDate.Month()), monthMax, 0) - } else { - nextHour = s.hour.val + var nextYear int = currentDate.Year() + if nextMonth < int(currentDate.Month()) { + nextYear += 1 } - var nextDay int - if s.day.typ == wildcard { - nextDay = currentDate.Day() - if nextHour < currentDate.Hour() { - nextDay += 1 - } - } else { - nextDay = s.day.val + nextDate := time.Date(nextYear, time.Month(nextMonth), nextDay, nextHour, nextMin, 0, 0, currentDate.Location()) + return nextDate.Sub(currentDate) +} + +func calcNextTime(t timing, currentTime, maxVal, wildCardIncrement int) int { + if t.typ == wildcard { + return currentTime + wildCardIncrement } - var nextMonth int - if s.month.typ == wildcard { - nextMonth = int(currentDate.Month()) - if nextDay < currentDate.Day() { - nextMonth += 1 - } - } else { - nextMonth = s.month.val + if t.typ == step { + stepped := min(currentTime+t.val, maxVal) + return stepped - (stepped % min(t.val, maxVal)) } - var nextYear int = currentDate.Year() - if nextMonth < int(currentDate.Month()) { - nextYear += 1 + if t.val < currentTime { + return t.val + minMax } - nextDate := time.Date(nextYear, time.Month(nextMonth), nextDay, nextHour, nextMin, 0, 0, currentDate.Location()) - return nextDate.Sub(currentDate) + return t.val } func validateSchedule(schedule string) (bool, error) { From a48c875e74525542ed2d2853bb3b1456306cf375 Mon Sep 17 00:00:00 2001 From: Daniel Saunders Date: Sun, 14 Jul 2024 18:36:06 -0400 Subject: [PATCH 5/5] Handle step functionality --- scheduler/schedule.go | 70 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/scheduler/schedule.go b/scheduler/schedule.go index 56701f8..563359e 100644 --- a/scheduler/schedule.go +++ b/scheduler/schedule.go @@ -39,20 +39,69 @@ type schedule struct { } func getDurationTillNextProc(s schedule) time.Duration { - currentDate := time.Now() + currentTime := time.Now() + + nextMonth := calcNextTime(s.month, int(currentTime.Month()), monthMax, 0) + + if nextMonth > int(currentTime.Month()) { + nextDate := time.Date( + currentTime.Year(), + time.Month(nextMonth), + 0, + 0, + 0, + 0, + 0, + currentTime.Location(), + ) + return nextDate.Sub(currentTime) + } - nextMin := calcNextTime(s.min, currentDate.Minute(), minMax, 1) - nextHour := calcNextTime(s.hour, currentDate.Hour(), hourMax, 0) - nextDay := calcNextTime(s.day, currentDate.Day(), dayMax, 0) - nextMonth := calcNextTime(s.month, int(currentDate.Month()), monthMax, 0) + nextDay := calcNextTime(s.day, currentTime.Day(), dayMax, 0) + + if nextDay > currentTime.Day() { + nextDate := time.Date( + currentTime.Year(), + time.Month(nextMonth), + nextDay, + 0, + 0, + 0, + 0, + currentTime.Location(), + ) + return nextDate.Sub(currentTime) + } - var nextYear int = currentDate.Year() - if nextMonth < int(currentDate.Month()) { - nextYear += 1 + nextHour := calcNextTime(s.hour, currentTime.Hour(), hourMax, 0) + + if nextHour > currentTime.Hour() { + nextDate := time.Date( + currentTime.Year(), + time.Month(nextMonth), + nextDay, + nextHour, + 0, + 0, + 0, + currentTime.Location(), + ) + return nextDate.Sub(currentTime) } - nextDate := time.Date(nextYear, time.Month(nextMonth), nextDay, nextHour, nextMin, 0, 0, currentDate.Location()) - return nextDate.Sub(currentDate) + nextMinute := calcNextTime(s.min, currentTime.Minute(), minMax, 1) + + nextDate := time.Date( + currentTime.Year(), + time.Month(nextMonth), + nextDay, + nextHour, + nextMinute, + 0, + 0, + currentTime.Location(), + ) + return nextDate.Sub(currentTime) } func calcNextTime(t timing, currentTime, maxVal, wildCardIncrement int) int { @@ -124,5 +173,4 @@ func convCronTiming(timeOption string, minVal, maxVal int) timing { typ: typ, val: max(min(val, maxVal), minVal), } - }