diff --git a/CHANGELOG.md b/CHANGELOG.md index 12181ea..60645a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ Development + - add internal ability to list names of all active scheduler jobs - ensure duplicated attestations are only counted as 1 in block proposal score - ensure genesis attesters are scheduled appropriately - do not continue if attempt to acquire a semaphore fails diff --git a/services/scheduler/basic/parameters.go b/services/scheduler/basic/parameters.go index ae2b7b1..0daad1b 100644 --- a/services/scheduler/basic/parameters.go +++ b/services/scheduler/basic/parameters.go @@ -15,7 +15,7 @@ package basic import ( "github.com/attestantio/vouch/services/metrics" - "github.com/pkg/errors" + nullmetrics "github.com/attestantio/vouch/services/metrics/null" "github.com/rs/zerolog" ) @@ -61,7 +61,7 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) { } if parameters.monitor == nil { - return nil, errors.New("no monitor specified") + parameters.monitor = &nullmetrics.Service{} } return ¶meters, nil diff --git a/services/scheduler/basic/service.go b/services/scheduler/basic/service.go index 97fb69c..4d6d4e1 100644 --- a/services/scheduler/basic/service.go +++ b/services/scheduler/basic/service.go @@ -102,7 +102,7 @@ func (s *Service) ScheduleJob(ctx context.Context, name string, runtime time.Tim case <-cancelCh: log.Trace().Str("job", name).Str("scheduled", fmt.Sprintf("%v", runtime)).Msg("Cancel triggered; job not running") s.mutex.Lock() - s.removeJob(ctx, name) + // The job will have already been removed. s.mutex.Unlock() s.monitor.JobCancelled() case <-runCh: @@ -195,7 +195,7 @@ func (s *Service) SchedulePeriodicJob(ctx context.Context, name string, runtimeF case <-cancelCh: log.Trace().Str("job", name).Str("scheduled", fmt.Sprintf("%v", runtime)).Msg("Cancel triggered; job not running") s.mutex.Lock() - s.removeJob(ctx, name) + // The job will have already been removed. s.mutex.Unlock() s.monitor.JobCancelled() return @@ -245,6 +245,18 @@ func (s *Service) JobExists(ctx context.Context, name string) bool { return exists } +// ListJobs returns the names of all jobs. +func (s *Service) ListJobs(ctx context.Context) []string { + s.mutex.RLock() + defer s.mutex.RUnlock() + names := make([]string, 0, len(s.jobs)) + for name := range s.jobs { + names = append(names, name) + } + + return names +} + // RunJobIfExists runs a job if it exists. // This does not return an error if the job does not exist. // If the job does not exist it will return an appropriate error. @@ -270,6 +282,8 @@ func (s *Service) CancelJob(ctx context.Context, name string) error { } job.cancelCh <- struct{}{} + s.removeJob(ctx, name) + return nil } @@ -281,6 +295,7 @@ func (s *Service) CancelJobs(ctx context.Context, prefix string) error { for name, job := range s.jobs { if strings.HasPrefix(name, prefix) { job.cancelCh <- struct{}{} + s.removeJob(ctx, name) } } diff --git a/services/scheduler/basic/service_test.go b/services/scheduler/basic/service_test.go index 15a7886..921296d 100644 --- a/services/scheduler/basic/service_test.go +++ b/services/scheduler/basic/service_test.go @@ -33,9 +33,33 @@ import ( func TestNew(t *testing.T) { ctx := context.Background() - s, err := basic.New(ctx, basic.WithLogLevel(zerolog.Disabled), basic.WithMonitor(&nullmetrics.Service{})) - require.NoError(t, err) - assert.NotNil(t, s) + tests := []struct { + name string + options []basic.Parameter + err string + }{ + { + name: "Good", + }, + { + name: "GoodLogLevel", + options: []basic.Parameter{ + basic.WithLogLevel(zerolog.Disabled), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s, err := basic.New(ctx, test.options...) + if test.err != "" { + require.EqualError(t, err, test.err) + } else { + require.NoError(t, err) + assert.NotNil(t, s) + } + }) + } } func TestJob(t *testing.T) { @@ -381,3 +405,37 @@ func TestManyJobs(t *testing.T) { require.Equal(t, uint32(jobs), run) } + +func TestListJobs(t *testing.T) { + ctx := context.Background() + s, err := basic.New(ctx, basic.WithLogLevel(zerolog.Disabled), basic.WithMonitor(&nullmetrics.Service{})) + require.NoError(t, err) + require.NotNil(t, s) + + run := 0 + runFunc := func(ctx context.Context, data interface{}) { + run++ + } + + jobs := s.ListJobs(ctx) + require.Len(t, jobs, 0) + + require.NoError(t, s.ScheduleJob(ctx, "Test job 1", time.Now().Add(time.Second), runFunc, nil)) + + jobs = s.ListJobs(ctx) + require.Len(t, jobs, 1) + require.Contains(t, jobs, "Test job 1") + + require.NoError(t, s.ScheduleJob(ctx, "Test job 2", time.Now().Add(time.Second), runFunc, nil)) + + jobs = s.ListJobs(ctx) + require.Len(t, jobs, 2) + require.Contains(t, jobs, "Test job 1") + require.Contains(t, jobs, "Test job 2") + + require.NoError(t, s.CancelJob(ctx, "Test job 1")) + + jobs = s.ListJobs(ctx) + require.Len(t, jobs, 1) + require.Contains(t, jobs, "Test job 2") +} diff --git a/services/scheduler/service.go b/services/scheduler/service.go index 5e2be58..00aa882 100644 --- a/services/scheduler/service.go +++ b/services/scheduler/service.go @@ -73,4 +73,7 @@ type Service interface { // This does not return an error if the job does not exist. // If this is a period job then the next instance will be scheduled. RunJobIfExists(ctx context.Context, name string) error + + // ListJobs returns the names of all jobs. + ListJobs(ctx context.Context) []string }