diff --git a/CHANGELOG.md b/CHANGELOG.md index aa2ac50..9f5f961 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ 1.1.0: + - add 'advanced' scheduler, designed to be more robust with higher parallel job load - fetch wallet accounts from Dirk in parallel - fetch process-concurrency configuration value from most specific point in hierarchy - add metrics to track strategy operation results diff --git a/docs/configuration.md b/docs/configuration.md index 099d988..68f43ed 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -32,6 +32,12 @@ graffiti: static: value: My graffiti +# scheduler handles the scheduling of Vouch's operations. +scheduler: + # style can be 'basic' (default) or 'advanced'. The advanced scheduler should be more robust, however it is + # newer and as such more likely to encounter issues. + style: basic + # submitter submits data to beacon nodes. If not present the nodes in beacon-node-address above will be used. submitter: # style can currently only be 'all' diff --git a/go.mod b/go.mod index 9052219..09780f8 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/wealdtech/go-eth2-wallet-store-scratch v1.6.2 github.com/wealdtech/go-eth2-wallet-types/v2 v2.8.4 github.com/wealdtech/go-majordomo v1.0.1 + go.uber.org/atomic v1.7.0 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.38.0 gotest.tools v2.2.0+incompatible diff --git a/main.go b/main.go index 8b61d22..a41d588 100644 --- a/main.go +++ b/main.go @@ -48,6 +48,8 @@ import ( "github.com/attestantio/vouch/services/metrics" nullmetrics "github.com/attestantio/vouch/services/metrics/null" prometheusmetrics "github.com/attestantio/vouch/services/metrics/prometheus" + "github.com/attestantio/vouch/services/scheduler" + advancedscheduler "github.com/attestantio/vouch/services/scheduler/advanced" basicscheduler "github.com/attestantio/vouch/services/scheduler/basic" "github.com/attestantio/vouch/services/signer" standardsigner "github.com/attestantio/vouch/services/signer/standard" @@ -273,13 +275,10 @@ func startServices(ctx context.Context, majordomo majordomo.Service) error { return errors.Wrap(err, "failed to start chain time service") } - log.Trace().Msg("Starting scheduler") - scheduler, err := basicscheduler.New(ctx, - basicscheduler.WithLogLevel(logLevel(viper.GetString("scheduler.log-level"))), - basicscheduler.WithMonitor(monitor.(metrics.SchedulerMonitor)), - ) + log.Trace().Msg("Selecting scheduler") + scheduler, err := selectScheduler(ctx, monitor) if err != nil { - return errors.Wrap(err, "failed to start scheduler service") + return errors.Wrap(err, "failed to select scheduler") } log.Trace().Msg("Starting validators manager") @@ -533,6 +532,29 @@ func startMonitor(ctx context.Context) (metrics.Service, error) { return monitor, nil } +func selectScheduler(ctx context.Context, monitor metrics.Service) (scheduler.Service, error) { + var scheduler scheduler.Service + var err error + switch viper.GetString("scheduler.style") { + case "advanced": + log.Info().Msg("Starting advanced scheduler") + scheduler, err = advancedscheduler.New(ctx, + advancedscheduler.WithLogLevel(logLevel(viper.GetString("scheduler.log-level"))), + advancedscheduler.WithMonitor(monitor.(metrics.SchedulerMonitor)), + ) + default: + log.Info().Msg("Starting basic scheduler") + scheduler, err = basicscheduler.New(ctx, + basicscheduler.WithLogLevel(logLevel(viper.GetString("scheduler.log-level"))), + basicscheduler.WithMonitor(monitor.(metrics.SchedulerMonitor)), + ) + } + if err != nil { + return nil, errors.Wrap(err, "failed to start scheduler service") + } + return scheduler, nil +} + func startGraffitiProvider(ctx context.Context, majordomo majordomo.Service) (graffitiprovider.Service, error) { switch { case viper.Get("graffiti.dynamic") != nil: diff --git a/services/scheduler/advanced/parameters.go b/services/scheduler/advanced/parameters.go new file mode 100644 index 0000000..b39982f --- /dev/null +++ b/services/scheduler/advanced/parameters.go @@ -0,0 +1,68 @@ +// Copyright © 2021 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package advanced + +import ( + "github.com/attestantio/vouch/services/metrics" + nullmetrics "github.com/attestantio/vouch/services/metrics/null" + "github.com/rs/zerolog" +) + +type parameters struct { + logLevel zerolog.Level + monitor metrics.SchedulerMonitor +} + +// Parameter is the interface for service parameters. +type Parameter interface { + apply(*parameters) +} + +type parameterFunc func(*parameters) + +func (f parameterFunc) apply(p *parameters) { + f(p) +} + +// WithLogLevel sets the log level for the module. +func WithLogLevel(logLevel zerolog.Level) Parameter { + return parameterFunc(func(p *parameters) { + p.logLevel = logLevel + }) +} + +// WithMonitor sets the monitor for this module. +func WithMonitor(monitor metrics.SchedulerMonitor) Parameter { + return parameterFunc(func(p *parameters) { + p.monitor = monitor + }) +} + +// parseAndCheckParameters parses and checks parameters to ensure that mandatory parameters are present and correct. +func parseAndCheckParameters(params ...Parameter) (*parameters, error) { + parameters := parameters{ + logLevel: zerolog.GlobalLevel(), + } + for _, p := range params { + if params != nil { + p.apply(¶meters) + } + } + + if parameters.monitor == nil { + parameters.monitor = &nullmetrics.Service{} + } + + return ¶meters, nil +} diff --git a/services/scheduler/advanced/service.go b/services/scheduler/advanced/service.go new file mode 100644 index 0000000..f4adf53 --- /dev/null +++ b/services/scheduler/advanced/service.go @@ -0,0 +1,346 @@ +// Copyright © 2021 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package advanced + +import ( + "context" + "strings" + "time" + + "github.com/attestantio/vouch/services/metrics" + "github.com/attestantio/vouch/services/scheduler" + "github.com/pkg/errors" + "github.com/rs/zerolog" + zerologger "github.com/rs/zerolog/log" + "github.com/sasha-s/go-deadlock" + "go.uber.org/atomic" +) + +// module-wide log. +var log zerolog.Logger + +// job contains control points for a job. +type job struct { + // stateLock is required for active or finalised. + stateLock deadlock.Mutex + active atomic.Bool + finalised atomic.Bool + cancelCh chan struct{} + runCh chan struct{} +} + +// Service is a scheduler service. It uses additional per-job information to manage +// the state of each job, in an attempt to ensure additional robustness in the face +// of high concurrent load. +type Service struct { + monitor metrics.SchedulerMonitor + jobs map[string]*job + jobsMutex deadlock.RWMutex +} + +// New creates a new scheduling service. +func New(ctx context.Context, params ...Parameter) (*Service, error) { + parameters, err := parseAndCheckParameters(params...) + if err != nil { + return nil, errors.Wrap(err, "problem with parameters") + } + + // Set logging. + log = zerologger.With().Str("service", "scheduler").Str("impl", "advanced").Logger() + if parameters.logLevel != log.GetLevel() { + log = log.Level(parameters.logLevel) + } + + return &Service{ + jobs: make(map[string]*job), + monitor: parameters.monitor, + }, nil +} + +// ScheduleJob schedules a one-off job for a given time. +// Note that if the parent context is cancelled the job wil not run. +func (s *Service) ScheduleJob(ctx context.Context, name string, runtime time.Time, jobFunc scheduler.JobFunc, data interface{}) error { + if name == "" { + return scheduler.ErrNoJobName + } + if jobFunc == nil { + return scheduler.ErrNoJobFunc + } + + s.jobsMutex.Lock() + _, exists := s.jobs[name] + if exists { + s.jobsMutex.Unlock() + return scheduler.ErrJobAlreadyExists + } + + job := &job{ + cancelCh: make(chan struct{}), + runCh: make(chan struct{}), + } + s.jobs[name] = job + s.jobsMutex.Unlock() + s.monitor.JobScheduled() + + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Scheduled job") + go func() { + select { + case <-ctx.Done(): + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Parent context done; job not running") + s.jobsMutex.Lock() + delete(s.jobs, name) + s.jobsMutex.Unlock() + finaliseJob(job) + s.monitor.JobCancelled() + case <-job.cancelCh: + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Cancel triggered; job not running") + finaliseJob(job) + s.monitor.JobCancelled() + case <-job.runCh: + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Run triggered; job running") + s.monitor.JobStartedOnSignal() + jobFunc(ctx, data) + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Job complete") + finaliseJob(job) + job.active.Store(false) + case <-time.After(time.Until(runtime)): + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Timer triggered; job running") + job.stateLock.Lock() + job.active.Store(true) + job.stateLock.Unlock() + s.monitor.JobStartedOnTimer() + jobFunc(ctx, data) + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Job complete") + job.active.Store(false) + finaliseJob(job) + } + }() + + return nil +} + +// SchedulePeriodicJob schedules a job to run in a loop. +// The loop starts by calling runtimeFunc, which sets the time for the first run. +// Once the time as specified by runtimeFunc is met, jobFunc is called. +// Once jobFunc returns, go back to the beginning of the loop. +func (s *Service) SchedulePeriodicJob(ctx context.Context, name string, runtimeFunc scheduler.RuntimeFunc, runtimeData interface{}, jobFunc scheduler.JobFunc, jobData interface{}) error { + if name == "" { + return scheduler.ErrNoJobName + } + if runtimeFunc == nil { + return scheduler.ErrNoRuntimeFunc + } + if jobFunc == nil { + return scheduler.ErrNoJobFunc + } + + s.jobsMutex.Lock() + _, exists := s.jobs[name] + if exists { + s.jobsMutex.Unlock() + return scheduler.ErrJobAlreadyExists + } + + job := &job{ + cancelCh: make(chan struct{}), + runCh: make(chan struct{}), + } + s.jobs[name] = job + s.jobsMutex.Unlock() + s.monitor.JobScheduled() + + go func() { + for { + runtime, err := runtimeFunc(ctx, runtimeData) + if err == scheduler.ErrNoMoreInstances { + log.Trace().Str("job", name).Msg("No more instances; period job stopping") + s.jobsMutex.Lock() + delete(s.jobs, name) + s.jobsMutex.Unlock() + finaliseJob(job) + s.monitor.JobCancelled() + return + } + if err != nil { + log.Error().Str("job", name).Err(err).Msg("Failed to obtain runtime; periodic job stopping") + s.jobsMutex.Lock() + delete(s.jobs, name) + s.jobsMutex.Unlock() + finaliseJob(job) + s.monitor.JobCancelled() + return + } + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Scheduled job") + select { + case <-ctx.Done(): + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Parent context done; job not running") + s.jobsMutex.Lock() + delete(s.jobs, name) + s.jobsMutex.Unlock() + finaliseJob(job) + s.monitor.JobCancelled() + return + case <-job.cancelCh: + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Cancel triggered; job not running") + finaliseJob(job) + s.monitor.JobCancelled() + return + case <-job.runCh: + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Run triggered; job running") + s.monitor.JobStartedOnSignal() + jobFunc(ctx, jobData) + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Job complete") + job.active.Store(false) + case <-time.After(time.Until(runtime)): + job.active.Store(true) + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Timer triggered; job running") + s.monitor.JobStartedOnTimer() + jobFunc(ctx, jobData) + log.Trace().Str("job", name).Time("scheduled", runtime).Msg("Job complete") + job.active.Store(false) + } + } + }() + + return nil +} + +// RunJob runs a named job immediately. +// If the job does not exist it will return an appropriate error. +func (s *Service) RunJob(ctx context.Context, name string) error { + s.jobsMutex.Lock() + job, exists := s.jobs[name] + s.jobsMutex.Unlock() + + if !exists { + return scheduler.ErrNoSuchJob + } + + return s.runJob(ctx, job) +} + +// RunJobIfExists runs a job if it exists. +// This does not return an error if the job does not exist or is otherwise unable to run. +func (s *Service) RunJobIfExists(ctx context.Context, name string) { + s.jobsMutex.Lock() + job, exists := s.jobs[name] + s.jobsMutex.Unlock() + + if !exists { + return + } + //nolint + s.runJob(ctx, job) + +} + +// JobExists returns true if a job exists. +func (s *Service) JobExists(ctx context.Context, name string) bool { + s.jobsMutex.RLock() + _, exists := s.jobs[name] + s.jobsMutex.RUnlock() + return exists +} + +// ListJobs returns the names of all jobs. +func (s *Service) ListJobs(ctx context.Context) []string { + s.jobsMutex.RLock() + names := make([]string, 0, len(s.jobs)) + for name := range s.jobs { + names = append(names, name) + } + s.jobsMutex.RUnlock() + + return names +} + +// CancelJob removes a named job. +// If the job does not exist it will return an appropriate error. +func (s *Service) CancelJob(ctx context.Context, name string) error { + s.jobsMutex.Lock() + job, exists := s.jobs[name] + if !exists { + s.jobsMutex.Unlock() + return scheduler.ErrNoSuchJob + } + delete(s.jobs, name) + s.jobsMutex.Unlock() + + job.stateLock.Lock() + if job.finalised.Load() { + // Already marked to be cancelled. + job.stateLock.Unlock() + return nil + } + job.finalised.Store(true) + job.cancelCh <- struct{}{} + job.stateLock.Unlock() + + return nil +} + +// CancelJobIfExists cancels a job that may or may not exist. +// If this is a period job then all future instances are cancelled. +func (s *Service) CancelJobIfExists(ctx context.Context, name string) { + //nolint + s.CancelJob(ctx, name) +} + +// CancelJobs cancels all jobs with the given prefix. +// If the prefix matches a period job then all future instances are cancelled. +func (s *Service) CancelJobs(ctx context.Context, prefix string) { + names := make([]string, 0) + s.jobsMutex.Lock() + for name := range s.jobs { + if strings.HasPrefix(name, prefix) { + names = append(names, name) + } + } + s.jobsMutex.Unlock() + + for _, name := range names { + // It is possible that the job has been removed whist we were iterating, so use the non-erroring version of cancel. + s.CancelJobIfExists(ctx, name) + } +} + +// finaliseJob tidies up a job that is no longer in use. +func finaliseJob(job *job) { + job.stateLock.Lock() + job.finalised.Store(true) + + // Close the channels for the job to ensure that nothing is hanging on sending a message. + close(job.cancelCh) + close(job.runCh) + + job.stateLock.Unlock() +} + +// runJob runs the given job. +func (s *Service) runJob(ctx context.Context, job *job) error { + job.stateLock.Lock() + if job.active.Load() { + job.stateLock.Unlock() + return scheduler.ErrJobRunning + } + if job.finalised.Load() { + job.stateLock.Unlock() + return scheduler.ErrJobFinalised + } + job.active.Store(true) + job.runCh <- struct{}{} + job.stateLock.Unlock() + + return nil +} diff --git a/services/scheduler/advanced/service_test.go b/services/scheduler/advanced/service_test.go new file mode 100644 index 0000000..8042afb --- /dev/null +++ b/services/scheduler/advanced/service_test.go @@ -0,0 +1,584 @@ +// Copyright © 2021 Attestant Limited. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package advanced_test + +import ( + "context" + "errors" + "fmt" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + nullmetrics "github.com/attestantio/vouch/services/metrics/null" + "github.com/attestantio/vouch/services/scheduler" + "github.com/attestantio/vouch/services/scheduler/advanced" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + options []advanced.Parameter + err string + }{ + { + name: "Good", + }, + { + name: "GoodLogLevel", + options: []advanced.Parameter{ + advanced.WithLogLevel(zerolog.Disabled), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s, err := advanced.New(ctx, test.options...) + if test.err != "" { + require.EqualError(t, err, test.err) + } else { + require.NoError(t, err) + assert.NotNil(t, s) + } + }) + } +} + +func TestJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + require.NoError(t, s.ScheduleJob(ctx, "Test job", time.Now().Add(20*time.Millisecond), runFunc, nil)) + require.Equal(t, 0, run) + time.Sleep(time.Duration(50) * time.Millisecond) + assert.Equal(t, 1, run) +} + +func TestJobExists(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + require.NoError(t, s.ScheduleJob(ctx, "Test job", time.Now().Add(10*time.Second), runFunc, nil)) + + require.True(t, s.JobExists(ctx, "Test job")) + require.False(t, s.JobExists(ctx, "Unknown job")) + + require.NoError(t, s.CancelJob(ctx, "Test job")) +} + +func TestCancelJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + require.NoError(t, s.ScheduleJob(ctx, "Test job", time.Now().Add(100*time.Millisecond), runFunc, nil)) + require.Equal(t, 0, run) + require.NoError(t, s.CancelJob(ctx, "Test job")) + time.Sleep(time.Duration(110) * time.Millisecond) + assert.Equal(t, 0, run) +} + +func TestCancelUnknownJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + assert.EqualError(t, s.CancelJob(ctx, "Unknown job"), scheduler.ErrNoSuchJob.Error()) +} + +func TestCancelJobs(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + require.NoError(t, s.ScheduleJob(ctx, "Test job 1", time.Now().Add(100*time.Millisecond), runFunc, nil)) + require.NoError(t, s.ScheduleJob(ctx, "Test job 2", time.Now().Add(100*time.Millisecond), runFunc, nil)) + require.NoError(t, s.ScheduleJob(ctx, "No cancel job", time.Now().Add(100*time.Millisecond), runFunc, nil)) + require.Equal(t, 0, run) + s.CancelJobs(ctx, "Test job") + time.Sleep(time.Duration(110) * time.Millisecond) + assert.Equal(t, 1, run) +} + +func TestCancelJobIfExists(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + require.NoError(t, s.ScheduleJob(ctx, "Test job", time.Now().Add(100*time.Millisecond), runFunc, nil)) + require.Equal(t, 0, run) + s.CancelJobIfExists(ctx, "Test job") + time.Sleep(time.Duration(110) * time.Millisecond) + assert.Equal(t, 0, run) + + s.CancelJobIfExists(ctx, "Unknown job") +} + +func TestCancelParentContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + require.NoError(t, s.ScheduleJob(ctx, "Test job", time.Now().Add(100*time.Millisecond), runFunc, nil)) + require.Equal(t, 0, run) + cancel() + time.Sleep(time.Duration(110) * time.Millisecond) + assert.Equal(t, 0, run) +} + +func TestRunJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + require.NoError(t, s.ScheduleJob(ctx, "Test job", time.Now().Add(time.Second), runFunc, nil)) + require.Equal(t, 0, run) + require.NoError(t, s.RunJob(ctx, "Test job")) + time.Sleep(time.Duration(100) * time.Millisecond) + assert.Equal(t, 1, run) +} + +func TestRunJobIfExists(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + require.NoError(t, s.ScheduleJob(ctx, "Test job", time.Now().Add(time.Second), runFunc, nil)) + require.Equal(t, 0, run) + s.RunJobIfExists(ctx, "Unknown job") + require.Equal(t, 0, run) + s.RunJobIfExists(ctx, "Test job") + time.Sleep(time.Duration(100) * time.Millisecond) + assert.Equal(t, 1, run) +} + +func TestRunUnknownJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + assert.EqualError(t, s.RunJob(ctx, "Unknown job"), scheduler.ErrNoSuchJob.Error()) +} + +func TestPeriodicJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + return time.Now().Add(100 * time.Millisecond), nil + } + + require.NoError(t, s.SchedulePeriodicJob(ctx, "Test periodic job", runtimeFunc, nil, runFunc, nil)) + require.Equal(t, 0, run) + time.Sleep(time.Duration(110) * time.Millisecond) + assert.Equal(t, 1, run) + time.Sleep(time.Duration(110) * time.Millisecond) + assert.Equal(t, 2, run) + require.NoError(t, s.RunJob(ctx, "Test periodic job")) + time.Sleep(time.Duration(10) * time.Millisecond) + assert.Equal(t, 3, run) + + require.NoError(t, s.CancelJob(ctx, "Test periodic job")) +} + +func TestCancelPeriodicJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + return time.Now().Add(100 * time.Millisecond), nil + } + + require.NoError(t, s.SchedulePeriodicJob(ctx, "Test periodic job", runtimeFunc, nil, runFunc, nil)) + require.Equal(t, 0, run) + require.NoError(t, s.CancelJob(ctx, "Test periodic job")) + time.Sleep(time.Duration(110) * time.Millisecond) + assert.Equal(t, 0, run) +} + +func TestCancelPeriodicParentContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + return time.Now().Add(100 * time.Millisecond), nil + } + + require.NoError(t, s.SchedulePeriodicJob(ctx, "Test job", runtimeFunc, nil, runFunc, nil)) + require.Equal(t, 0, run) + cancel() + time.Sleep(time.Duration(110) * time.Millisecond) + assert.Equal(t, 0, run) +} + +func TestLimitedPeriodicJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + if run == 3 { + return time.Now(), scheduler.ErrNoMoreInstances + } + return time.Now().Add(10 * time.Millisecond), nil + } + + require.NoError(t, s.SchedulePeriodicJob(ctx, "Test job", runtimeFunc, nil, runFunc, nil)) + require.Equal(t, 0, run) + time.Sleep(time.Duration(50) * time.Millisecond) + assert.Equal(t, 3, run) +} + +func TestBadPeriodicJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + if run == 3 { + return time.Now(), errors.New("Bad") + } + return time.Now().Add(10 * time.Millisecond), nil + } + + require.NoError(t, s.SchedulePeriodicJob(ctx, "Test job", runtimeFunc, nil, runFunc, nil)) + require.Equal(t, 0, run) + time.Sleep(time.Duration(50) * time.Millisecond) + assert.Equal(t, 3, run) +} + +func TestDuplicateJobName(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + return time.Now().Add(100 * time.Millisecond), nil + } + + require.NoError(t, s.ScheduleJob(ctx, "Test duplicate job", time.Now().Add(time.Second), runFunc, nil)) + require.EqualError(t, s.ScheduleJob(ctx, "Test duplicate job", time.Now().Add(time.Second), runFunc, nil), scheduler.ErrJobAlreadyExists.Error()) + + require.NoError(t, s.SchedulePeriodicJob(ctx, "Test duplicate periodic job", runtimeFunc, nil, runFunc, nil)) + require.EqualError(t, s.SchedulePeriodicJob(ctx, "Test duplicate periodic job", runtimeFunc, nil, runFunc, nil), scheduler.ErrJobAlreadyExists.Error()) + require.NoError(t, s.CancelJob(ctx, "Test duplicate periodic job")) +} + +func TestBadJobs(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + return time.Now().Add(100 * time.Millisecond), nil + } + + require.EqualError(t, s.ScheduleJob(ctx, "", time.Now(), runFunc, nil), scheduler.ErrNoJobName.Error()) + require.EqualError(t, s.ScheduleJob(ctx, "Test bad job", time.Now(), nil, nil), scheduler.ErrNoJobFunc.Error()) + + require.EqualError(t, s.SchedulePeriodicJob(ctx, "", runtimeFunc, nil, runFunc, nil), scheduler.ErrNoJobName.Error()) + require.EqualError(t, s.SchedulePeriodicJob(ctx, "Test bad period job", nil, nil, runFunc, nil), scheduler.ErrNoRuntimeFunc.Error()) + require.EqualError(t, s.SchedulePeriodicJob(ctx, "Test bad period job", runtimeFunc, nil, nil, nil), scheduler.ErrNoJobFunc.Error()) +} + +func TestManyJobs(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := uint32(0) + runFunc := func(ctx context.Context, data interface{}) { + atomic.AddUint32(&run, 1) + } + + runTime := time.Now().Add(200 * time.Millisecond) + + jobs := 2048 + for i := 0; i < jobs; i++ { + require.NoError(t, s.ScheduleJob(ctx, fmt.Sprintf("Job instance %d", i), runTime, runFunc, nil)) + } + + // Kick off some jobs early. + for i := 0; i < jobs/32; i++ { + // #nosec G404 + randomJob := rand.Intn(jobs) + // Don't check for error as we could try to kick off the same job multiple times, which would cause an error. + //nolint + s.RunJob(ctx, fmt.Sprintf("Job instance %d", randomJob)) + } + + // Sleep to let the others run normally. + time.Sleep(400 * time.Millisecond) + + require.Equal(t, uint32(jobs), run) +} + +func TestListJobs(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + jobs := s.ListJobs(ctx) + require.Len(t, jobs, 0) + + require.NoError(t, s.ScheduleJob(ctx, "Test job 1", time.Now().Add(time.Second), runFunc, nil)) + + jobs = s.ListJobs(ctx) + require.Len(t, jobs, 1) + require.Contains(t, jobs, "Test job 1") + + require.NoError(t, s.ScheduleJob(ctx, "Test job 2", time.Now().Add(time.Second), runFunc, nil)) + + jobs = s.ListJobs(ctx) + require.Len(t, jobs, 2) + require.Contains(t, jobs, "Test job 1") + require.Contains(t, jobs, "Test job 2") + + require.NoError(t, s.CancelJob(ctx, "Test job 1")) + + jobs = s.ListJobs(ctx) + require.Len(t, jobs, 1) + require.Contains(t, jobs, "Test job 2") +} + +func TestLongRunningPeriodicJob(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + // Job takes 100 ms. + run := uint32(0) + jobFunc := func(ctx context.Context, data interface{}) { + time.Sleep(100 * time.Millisecond) + atomic.AddUint32(&run, 1) + } + + // Job runs every 50 ms. + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + return time.Now().Add(50 * time.Millisecond), nil + } + + // Schedule the job. + require.NoError(t, s.SchedulePeriodicJob(ctx, "Test long running periodic job", runtimeFunc, nil, jobFunc, nil)) + + // Sleep for 400 ms. Expect two runs (50+100+50+100+50). + time.Sleep(400 * time.Millisecond) + assert.Equal(t, uint32(2), run) + + require.NoError(t, s.CancelJob(ctx, "Test long running periodic job")) +} + +func TestOverlappingJobs(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + // Job takes 200ms. + run := uint32(0) + jobFunc := func(ctx context.Context, data interface{}) { + time.Sleep(200 * time.Millisecond) + atomic.AddUint32(&run, 1) + } + + now := time.Now() + require.NoError(t, s.ScheduleJob(ctx, "Test job 1", now.Add(100*time.Millisecond), jobFunc, nil)) + require.NoError(t, s.ScheduleJob(ctx, "Test job 2", now.Add(200*time.Millisecond), jobFunc, nil)) + + // Sleep to let jobs complete. + time.Sleep(500 * time.Millisecond) + + // Ensure both jobs have completed. + require.Equal(t, uint32(2), run) +} + +func TestMulti(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + // Create a job for the future. + run := uint32(0) + jobFunc := func(ctx context.Context, data interface{}) { + atomic.AddUint32(&run, 1) + } + require.NoError(t, s.ScheduleJob(ctx, "Test job", time.Now().Add(10*time.Second), jobFunc, nil)) + + // Create a number of runners that will try to start the job simultaneously. + var runWG sync.WaitGroup + var setupWG sync.WaitGroup + starter := make(chan interface{}) + for i := 0; i < 32; i++ { + setupWG.Add(1) + runWG.Add(1) + go func() { + setupWG.Done() + <-starter + //nolint + s.RunJob(ctx, "Test job") + runWG.Done() + }() + } + // Wait for setup to complete. + setupWG.Wait() + // Start the jobs by closing the channel. + close(starter) + + // Wait for run to complete + runWG.Wait() + + // Ensure the job has only completed once. + require.Equal(t, uint32(1), run) +} + +func TestCancelWhilstRunning(t *testing.T) { + ctx := context.Background() + s, err := advanced.New(ctx, advanced.WithLogLevel(zerolog.Disabled), advanced.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + time.Sleep(50 * time.Millisecond) + run++ + } + + runtimeFunc := func(ctx context.Context, data interface{}) (time.Time, error) { + return time.Now().Add(50 * time.Millisecond), nil + } + + require.NoError(t, s.SchedulePeriodicJob(ctx, "Test periodic job", runtimeFunc, nil, runFunc, nil)) + require.Equal(t, 0, run) + time.Sleep(time.Duration(60) * time.Millisecond) + require.Equal(t, 0, run) + require.NoError(t, s.CancelJob(ctx, "Test periodic job")) + time.Sleep(time.Duration(40) * time.Millisecond) + assert.Equal(t, 1, run) + time.Sleep(time.Duration(120) * time.Millisecond) + assert.Equal(t, 1, run) +} diff --git a/services/scheduler/service.go b/services/scheduler/service.go index 9dc5e3d..e7a9049 100644 --- a/services/scheduler/service.go +++ b/services/scheduler/service.go @@ -34,6 +34,12 @@ var ErrNoSuchJob = errors.New("no such job") // ErrJobAlreadyExists is returned when the scheduler is asked to create a job that already exists. var ErrJobAlreadyExists = errors.New("job already exists") +// ErrJobRunning is returned when the scheduler is asked to interact with a job that is running. +var ErrJobRunning = errors.New("job running") + +// ErrJobFinalised is returned when the scheduler is asked to interact with a job that is finalised. +var ErrJobFinalised = errors.New("job finalised") + // ErrNoJobName is returned when an attempt is made to to control a job without a name. var ErrNoJobName = errors.New("no job name")