Add ListJobs for scheduler.

This commit is contained in:
Jim McDonald 2020-10-04 12:58:46 +01:00
parent eb5c919344
commit a3086234a0
No known key found for this signature in database
GPG Key ID: 89CEB61B2AD2A5E7
5 changed files with 84 additions and 7 deletions

View File

@ -1,4 +1,5 @@
Development
- add internal ability to list names of all active scheduler jobs
- ensure duplicated attestations are only counted as 1 in block proposal score
- ensure genesis attesters are scheduled appropriately
- do not continue if attempt to acquire a semaphore fails

View File

@ -15,7 +15,7 @@ package basic
import (
"github.com/attestantio/vouch/services/metrics"
"github.com/pkg/errors"
nullmetrics "github.com/attestantio/vouch/services/metrics/null"
"github.com/rs/zerolog"
)
@ -61,7 +61,7 @@ func parseAndCheckParameters(params ...Parameter) (*parameters, error) {
}
if parameters.monitor == nil {
return nil, errors.New("no monitor specified")
parameters.monitor = &nullmetrics.Service{}
}
return &parameters, nil

View File

@ -102,7 +102,7 @@ func (s *Service) ScheduleJob(ctx context.Context, name string, runtime time.Tim
case <-cancelCh:
log.Trace().Str("job", name).Str("scheduled", fmt.Sprintf("%v", runtime)).Msg("Cancel triggered; job not running")
s.mutex.Lock()
s.removeJob(ctx, name)
// The job will have already been removed.
s.mutex.Unlock()
s.monitor.JobCancelled()
case <-runCh:
@ -195,7 +195,7 @@ func (s *Service) SchedulePeriodicJob(ctx context.Context, name string, runtimeF
case <-cancelCh:
log.Trace().Str("job", name).Str("scheduled", fmt.Sprintf("%v", runtime)).Msg("Cancel triggered; job not running")
s.mutex.Lock()
s.removeJob(ctx, name)
// The job will have already been removed.
s.mutex.Unlock()
s.monitor.JobCancelled()
return
@ -245,6 +245,18 @@ func (s *Service) JobExists(ctx context.Context, name string) bool {
return exists
}
// ListJobs returns the names of all jobs.
func (s *Service) ListJobs(ctx context.Context) []string {
s.mutex.RLock()
defer s.mutex.RUnlock()
names := make([]string, 0, len(s.jobs))
for name := range s.jobs {
names = append(names, name)
}
return names
}
// RunJobIfExists runs a job if it exists.
// This does not return an error if the job does not exist.
// If the job does not exist it will return an appropriate error.
@ -270,6 +282,8 @@ func (s *Service) CancelJob(ctx context.Context, name string) error {
}
job.cancelCh <- struct{}{}
s.removeJob(ctx, name)
return nil
}
@ -281,6 +295,7 @@ func (s *Service) CancelJobs(ctx context.Context, prefix string) error {
for name, job := range s.jobs {
if strings.HasPrefix(name, prefix) {
job.cancelCh <- struct{}{}
s.removeJob(ctx, name)
}
}

View File

@ -33,9 +33,33 @@ import (
func TestNew(t *testing.T) {
ctx := context.Background()
s, err := basic.New(ctx, basic.WithLogLevel(zerolog.Disabled), basic.WithMonitor(&nullmetrics.Service{}))
require.NoError(t, err)
assert.NotNil(t, s)
tests := []struct {
name string
options []basic.Parameter
err string
}{
{
name: "Good",
},
{
name: "GoodLogLevel",
options: []basic.Parameter{
basic.WithLogLevel(zerolog.Disabled),
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s, err := basic.New(ctx, test.options...)
if test.err != "" {
require.EqualError(t, err, test.err)
} else {
require.NoError(t, err)
assert.NotNil(t, s)
}
})
}
}
func TestJob(t *testing.T) {
@ -381,3 +405,37 @@ func TestManyJobs(t *testing.T) {
require.Equal(t, uint32(jobs), run)
}
func TestListJobs(t *testing.T) {
ctx := context.Background()
s, err := basic.New(ctx, basic.WithLogLevel(zerolog.Disabled), basic.WithMonitor(&nullmetrics.Service{}))
require.NoError(t, err)
require.NotNil(t, s)
run := 0
runFunc := func(ctx context.Context, data interface{}) {
run++
}
jobs := s.ListJobs(ctx)
require.Len(t, jobs, 0)
require.NoError(t, s.ScheduleJob(ctx, "Test job 1", time.Now().Add(time.Second), runFunc, nil))
jobs = s.ListJobs(ctx)
require.Len(t, jobs, 1)
require.Contains(t, jobs, "Test job 1")
require.NoError(t, s.ScheduleJob(ctx, "Test job 2", time.Now().Add(time.Second), runFunc, nil))
jobs = s.ListJobs(ctx)
require.Len(t, jobs, 2)
require.Contains(t, jobs, "Test job 1")
require.Contains(t, jobs, "Test job 2")
require.NoError(t, s.CancelJob(ctx, "Test job 1"))
jobs = s.ListJobs(ctx)
require.Len(t, jobs, 1)
require.Contains(t, jobs, "Test job 2")
}

View File

@ -73,4 +73,7 @@ type Service interface {
// This does not return an error if the job does not exist.
// If this is a period job then the next instance will be scheduled.
RunJobIfExists(ctx context.Context, name string) error
// ListJobs returns the names of all jobs.
ListJobs(ctx context.Context) []string
}