diff --git a/cmd/barak/barak.go b/cmd/barak/barak.go new file mode 100644 index 00000000..8627cf75 --- /dev/null +++ b/cmd/barak/barak.go @@ -0,0 +1,247 @@ +package main + +import ( + "errors" + "fmt" + "io/ioutil" + "net" + "net/http" + "os" + "sync" + "time" + + "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/cmd/barak/types" + . "github.com/tendermint/tendermint/common" + pcm "github.com/tendermint/tendermint/process" + "github.com/tendermint/tendermint/rpc/server" +) + +type BarakOptions struct { + Validators []Validator + ListenAddress string + StartNonce uint64 + Registries []string +} + +// Read options from a file, or stdin if optionsFile is "" +func ReadBarakOptions(optFile string) *BarakOptions { + var optBytes []byte + var err error + if optFile != "" { + optBytes, err = ioutil.ReadFile(optFile) + } else { + optBytes, err = ioutil.ReadAll(os.Stdin) + } + if err != nil { + panic(Fmt("Error reading input: %v", err)) + } + opt := binary.ReadJSON(&BarakOptions{}, optBytes, &err).(*BarakOptions) + if err != nil { + panic(Fmt("Error parsing input: %v", err)) + } + return opt +} + +func ensureRootDir() (rootDir string) { + rootDir = os.Getenv("BRKROOT") + if rootDir == "" { + rootDir = os.Getenv("HOME") + "/.barak" + } + err := EnsureDir(rootDir) + if err != nil { + panic(Fmt("Error creating barak rootDir: %v", err)) + } + return +} + +func NewBarakFromOptions(opt *BarakOptions) *Barak { + rootDir := ensureRootDir() + barak := NewBarak(rootDir, opt.StartNonce, opt.Validators) + for _, registry := range opt.Registries { + barak.AddRegistry(registry) + } + barak.OpenListener(opt.ListenAddress) + + // Debug. + fmt.Printf("Options: %v\n", opt) + fmt.Printf("Barak: %v\n", barak) + return barak +} + +//-------------------------------------------------------------------------------- + +type Barak struct { + mtx sync.Mutex + pid int + nonce uint64 + processes map[string]*pcm.Process + validators []Validator + listeners []net.Listener + rootDir string + registries []string +} + +func NewBarak(rootDir string, nonce uint64, validators []Validator) *Barak { + return &Barak{ + pid: os.Getpid(), + nonce: nonce, + processes: make(map[string]*pcm.Process), + validators: validators, + listeners: nil, + rootDir: rootDir, + registries: nil, + } +} + +func (brk *Barak) RootDir() string { + brk.mtx.Lock() + defer brk.mtx.Unlock() + return brk.rootDir +} + +func (brk *Barak) ListProcesses() []*pcm.Process { + brk.mtx.Lock() + defer brk.mtx.Unlock() + processes := []*pcm.Process{} + for _, process := range processes { + processes = append(processes, process) + } + return processes +} + +func (brk *Barak) GetProcess(label string) *pcm.Process { + brk.mtx.Lock() + defer brk.mtx.Unlock() + return brk.processes[label] +} + +func (brk *Barak) AddProcess(label string, process *pcm.Process) error { + brk.mtx.Lock() + defer brk.mtx.Unlock() + existing := brk.processes[label] + if existing != nil && existing.EndTime.IsZero() { + return fmt.Errorf("Process already exists: %v", label) + } + brk.processes[label] = process + return nil +} + +func (brk *Barak) StopProcess(label string, kill bool) error { + barak.mtx.Lock() + proc := barak.processes[label] + barak.mtx.Unlock() + + if proc == nil { + return fmt.Errorf("Process does not exist: %v", label) + } + + err := pcm.Stop(proc, kill) + return err +} + +func (brk *Barak) ListValidators() []Validator { + brk.mtx.Lock() + defer brk.mtx.Unlock() + return brk.validators +} + +func (brk *Barak) ListListeners() []net.Listener { + brk.mtx.Lock() + defer brk.mtx.Unlock() + return brk.listeners +} + +func (brk *Barak) OpenListener(addr string) net.Listener { + brk.mtx.Lock() + defer brk.mtx.Unlock() + // Start rpc server. + mux := http.NewServeMux() + mux.HandleFunc("/download", ServeFileHandler) + mux.HandleFunc("/register", RegisterHandler) + // TODO: mux.HandleFunc("/upload", UploadFile) + rpcserver.RegisterRPCFuncs(mux, Routes) + listener := rpcserver.StartHTTPServer(addr, mux) + brk.listeners = append(brk.listeners, listener) + return listener +} + +func (brk *Barak) CloseListener(addr string) { + brk.mtx.Lock() + defer brk.mtx.Unlock() + filtered := []net.Listener{} + for _, listener := range brk.listeners { + if listener.Addr().String() == addr { + continue + } + filtered = append(filtered, listener) + } + brk.listeners = filtered +} + +func (brk *Barak) GetRegistries() []string { + brk.mtx.Lock() + defer brk.mtx.Unlock() + return brk.registries +} + +func (brk *Barak) AddRegistry(registry string) { + brk.mtx.Lock() + defer brk.mtx.Unlock() + brk.registries = append(brk.registries, registry) +} + +func (brk *Barak) RemoveRegistry(registry string) { + brk.mtx.Lock() + defer brk.mtx.Unlock() + filtered := []string{} + for _, reg := range brk.registries { + if registry == reg { + continue + } + filtered = append(filtered, reg) + } + brk.registries = filtered +} + +func (brk *Barak) StartRegisterRoutine() { + // Register this barak with central listener + go func() { + // Workaround around issues when registries register on themselves upon startup. + time.Sleep(3 * time.Second) + for { + // Every hour, register with the registries. + for _, registry := range brk.registries { + resp, err := http.Get(registry + "/register") + if err != nil { + fmt.Printf("Error registering to registry %v:\n %v\n", registry, err) + } else if resp.StatusCode != 200 { + body, _ := ioutil.ReadAll(resp.Body) + fmt.Printf("Error registering to registry %v:\n %v\n", registry, string(body)) + } else { + body, _ := ioutil.ReadAll(resp.Body) + fmt.Printf("Successfully registered with registry %v\n %v\n", registry, string(body)) + } + } + time.Sleep(1 * time.Hour) + } + }() +} + +// Write pid to file. +func (brk *Barak) WritePidFile() { + err := WriteFileAtomic(brk.rootDir+"/pidfile", []byte(Fmt("%v", brk.pid))) + if err != nil { + panic(Fmt("Error writing pidfile: %v", err)) + } +} + +func (brk *Barak) CheckIncrNonce(newNonce uint64) error { + brk.mtx.Lock() + defer brk.mtx.Unlock() + if brk.nonce+1 != newNonce { + return errors.New("Replay error") + } + brk.nonce += 1 + return nil +} diff --git a/cmd/barak/main.go b/cmd/barak/main.go index 254888ea..1c6fe55a 100644 --- a/cmd/barak/main.go +++ b/cmd/barak/main.go @@ -9,11 +9,9 @@ import ( "flag" "fmt" "io" - "io/ioutil" "net/http" "os" "reflect" - "sync" "time" "github.com/tendermint/tendermint/binary" @@ -24,36 +22,24 @@ import ( "github.com/tendermint/tendermint/rpc/server" ) -var Routes = map[string]*rpcserver.RPCFunc{ - "status": rpcserver.NewRPCFunc(Status, []string{}), - "run": rpcserver.NewRPCFunc(Run, []string{"auth_command"}), - // NOTE: also, two special non-JSONRPC routes called "download" and "upload" -} +var Routes map[string]*rpcserver.RPCFunc -type Options struct { - Validators []Validator - ListenAddress string - StartNonce uint64 - Registries []string +func init() { + Routes = map[string]*rpcserver.RPCFunc{ + "status": rpcserver.NewRPCFunc(Status, []string{}), + "run": rpcserver.NewRPCFunc(Run, []string{"auth_command"}), + // NOTE: also, two special non-JSONRPC routes called "download" and "upload" + } } // Global instance -var barak = struct { - mtx sync.Mutex - pid int - nonce uint64 - processes map[string]*pcm.Process - validators []Validator - rootDir string - registries []string -}{ - mtx: sync.Mutex{}, - pid: os.Getpid(), - nonce: 0, - processes: make(map[string]*pcm.Process), - validators: nil, - rootDir: "", - registries: nil, +var barak *Barak + +// Parse command-line options +func parseFlags() (optionsFile string) { + flag.StringVar(&optionsFile, "options-file", "", "Read options from file instead of stdin") + flag.Parse() + return } func main() { @@ -62,71 +48,14 @@ func main() { // Apply bare tendermint/* configuration. cfg.ApplyConfig(cfg.MapConfig(map[string]interface{}{"log_level": "info"})) - // read flags to change options file. - var optionsBytes []byte - var optionsFile string - var err error - flag.StringVar(&optionsFile, "options-file", "", "Read options from file instead of stdin") - flag.Parse() - if optionsFile != "" { - optionsBytes, err = ioutil.ReadFile(optionsFile) - } else { - optionsBytes, err = ioutil.ReadAll(os.Stdin) - } - if err != nil { - panic(Fmt("Error reading input: %v", err)) - } - options := binary.ReadJSON(&Options{}, optionsBytes, &err).(*Options) - if err != nil { - panic(Fmt("Error parsing input: %v", err)) - } - barak.nonce = options.StartNonce - barak.validators = options.Validators - barak.rootDir = os.Getenv("BRKROOT") - if barak.rootDir == "" { - barak.rootDir = os.Getenv("HOME") + "/.barak" - } - err = EnsureDir(barak.rootDir) - if err != nil { - panic(Fmt("Error creating barak rootDir: %v", err)) - } - barak.registries = options.Registries - - // Debug. - fmt.Printf("Options: %v\n", options) - fmt.Printf("Barak: %v\n", barak) - - // Start rpc server. - mux := http.NewServeMux() - mux.HandleFunc("/download", ServeFile) - mux.HandleFunc("/register", Register) - // TODO: mux.HandleFunc("/upload", UploadFile) - rpcserver.RegisterRPCFuncs(mux, Routes) - rpcserver.StartHTTPServer(options.ListenAddress, mux) - - // Register this barak with central listener - for _, registry := range barak.registries { - go func(registry string) { - for { - resp, err := http.Get(registry + "/register") - if err != nil { - fmt.Printf("Error registering to registry %v:\n %v\n", registry, err) - time.Sleep(1 * time.Hour) - continue - } - body, _ := ioutil.ReadAll(resp.Body) - fmt.Printf("Successfully registered with registry %v\n %v\n", registry, string(body)) - return - } - }(registry) - } - - // Write pid to file. This should be the last thing before TrapSignal. - err = WriteFileAtomic(barak.rootDir+"/pidfile", []byte(Fmt("%v", barak.pid))) - if err != nil { - panic(Fmt("Error writing pidfile: %v", err)) - } + // Read options + optionsFile := parseFlags() + options := ReadBarakOptions(optionsFile) + // Init barak + barak = NewBarakFromOptions(options) + barak.StartRegisterRoutine() + barak.WritePidFile() // This should be last, before TrapSignal(). TrapSignal(func() { fmt.Println("Barak shutting down") }) @@ -157,12 +86,16 @@ func Run(authCommand AuthCommand) (interface{}, error) { log.Info(Fmt("Run() received command %v:\n%v", reflect.TypeOf(command), command)) // Issue command switch c := command.(type) { - case CommandRunProcess: - return RunProcess(c.Wait, c.Label, c.ExecPath, c.Args, c.Input) + case CommandStartProcess: + return StartProcess(c.Wait, c.Label, c.ExecPath, c.Args, c.Input) case CommandStopProcess: return StopProcess(c.Label, c.Kill) case CommandListProcesses: return ListProcesses() + case CommandOpenListener: + return OpenListener(c.Addr) + case CommandCloseListener: + return CloseListener(c.Addr) default: return nil, errors.New("Invalid endpoint for command") } @@ -182,7 +115,7 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) { commandJSONStr := authCommand.CommandJSONStr signatures := authCommand.Signatures // Validate commandJSONStr - if !validate([]byte(commandJSONStr), barak.validators, signatures) { + if !validate([]byte(commandJSONStr), barak.ListValidators(), signatures) { fmt.Printf("Failed validation attempt") return nil, errors.New("Validation error") } @@ -194,11 +127,7 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) { return nil, errors.New("Command parse error") } // Prevent replays - if barak.nonce+1 != command.Nonce { - return nil, errors.New("Replay error") - } else { - barak.nonce += 1 - } + barak.CheckIncrNonce(command.Nonce) return command.Command, nil } @@ -206,48 +135,42 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) { // RPC base commands // WARNING Not validated, do not export to routes. -func RunProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseRunProcess, error) { - barak.mtx.Lock() - +func StartProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseStartProcess, error) { // First, see if there already is a process labeled 'label' - existing := barak.processes[label] + existing := barak.GetProcess(label) if existing != nil && existing.EndTime.IsZero() { - barak.mtx.Unlock() return nil, fmt.Errorf("Process already exists: %v", label) } // Otherwise, create one. - err := EnsureDir(barak.rootDir + "/outputs") + err := EnsureDir(barak.RootDir() + "/outputs") if err != nil { return nil, fmt.Errorf("Failed to create outputs dir: %v", err) } - outPath := Fmt("%v/outputs/%v_%v.out", barak.rootDir, label, time.Now().Format("2006_01_02_15_04_05_MST")) + outPath := Fmt("%v/outputs/%v_%v.out", barak.RootDir(), label, time.Now().Format("2006_01_02_15_04_05_MST")) proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath) - if err == nil { - barak.processes[label] = proc - } - barak.mtx.Unlock() if err != nil { return nil, err } + barak.AddProcess(label, proc) if wait { <-proc.WaitCh output := pcm.ReadOutput(proc) fmt.Println("Read output", output) if proc.ExitState == nil { - return &ResponseRunProcess{ + return &ResponseStartProcess{ Success: true, Output: output, }, nil } else { - return &ResponseRunProcess{ + return &ResponseStartProcess{ Success: proc.ExitState.Success(), // Would be always false? Output: output, }, nil } } else { - return &ResponseRunProcess{ + return &ResponseStartProcess{ Success: true, Output: "", }, nil @@ -255,36 +178,35 @@ func RunProcess(wait bool, label string, execPath string, args []string, input s } func StopProcess(label string, kill bool) (*ResponseStopProcess, error) { - barak.mtx.Lock() - proc := barak.processes[label] - barak.mtx.Unlock() - - if proc == nil { - return nil, fmt.Errorf("Process does not exist: %v", label) - } - - err := pcm.Stop(proc, kill) + err := barak.StopProcess(label, kill) return &ResponseStopProcess{}, err } func ListProcesses() (*ResponseListProcesses, error) { - var procs = []*pcm.Process{} - barak.mtx.Lock() - fmt.Println("Processes: %v", barak.processes) - for _, proc := range barak.processes { - procs = append(procs, proc) - } - barak.mtx.Unlock() - + procs := barak.ListProcesses() return &ResponseListProcesses{ Processes: procs, }, nil } +func OpenListener(addr string) (*ResponseOpenListener, error) { + listener := barak.OpenListener(addr) + return &ResponseOpenListener{ + Addr: listener.Addr().String(), + }, nil +} + +func CloseListener(addr string) (*ResponseCloseListener, error) { + barak.CloseListener(addr) + return &ResponseCloseListener{}, nil +} + +//-------------------------------------------------------------------------------- + // Another barak instance registering its external // address to a remote barak. -func Register(w http.ResponseWriter, req *http.Request) { - registry, err := os.OpenFile(barak.rootDir+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) +func RegisterHandler(w http.ResponseWriter, req *http.Request) { + registry, err := os.OpenFile(barak.RootDir()+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) if err != nil { http.Error(w, "Could not open registry file. Please contact the administrator", 500) return @@ -297,7 +219,7 @@ func Register(w http.ResponseWriter, req *http.Request) { w.Write([]byte("Noted!")) } -func ServeFile(w http.ResponseWriter, req *http.Request) { +func ServeFileHandler(w http.ResponseWriter, req *http.Request) { authCommandStr := req.FormValue("auth_command") command, err := parseValidateCommandStr(authCommandStr) if err != nil { @@ -316,7 +238,11 @@ func ServeFile(w http.ResponseWriter, req *http.Request) { // local paths must be explicitly local, e.g. "./xyz" } else if path[0] != '/' { // If not an absolute path, then is label - proc := barak.processes[path] + proc := barak.GetProcess(path) + if proc == nil { + http.Error(w, Fmt("Unknown process label: %v", path), 400) + return + } path = proc.OutputPath } file, err := os.Open(path) diff --git a/cmd/barak/types/command.go b/cmd/barak/types/command.go index 077ec24e..92df4691 100644 --- a/cmd/barak/types/command.go +++ b/cmd/barak/types/command.go @@ -18,22 +18,26 @@ type NoncedCommand struct { type Command interface{} const ( - commandTypeRunProcess = 0x01 + commandTypeStartProcess = 0x01 commandTypeStopProcess = 0x02 commandTypeListProcesses = 0x03 commandTypeServeFile = 0x04 + commandTypeOpenListener = 0x05 + commandTypeCloseListener = 0x06 ) // for binary.readReflect var _ = binary.RegisterInterface( struct{ Command }{}, - binary.ConcreteType{CommandRunProcess{}, commandTypeRunProcess}, + binary.ConcreteType{CommandStartProcess{}, commandTypeStartProcess}, binary.ConcreteType{CommandStopProcess{}, commandTypeStopProcess}, binary.ConcreteType{CommandListProcesses{}, commandTypeListProcesses}, binary.ConcreteType{CommandServeFile{}, commandTypeServeFile}, + binary.ConcreteType{CommandOpenListener{}, commandTypeOpenListener}, + binary.ConcreteType{CommandCloseListener{}, commandTypeCloseListener}, ) -type CommandRunProcess struct { +type CommandStartProcess struct { Wait bool Label string ExecPath string @@ -51,3 +55,11 @@ type CommandListProcesses struct{} type CommandServeFile struct { Path string } + +type CommandOpenListener struct { + Addr string +} + +type CommandCloseListener struct { + Addr string +} diff --git a/cmd/barak/types/responses.go b/cmd/barak/types/responses.go index e80fc1ac..85fc77bc 100644 --- a/cmd/barak/types/responses.go +++ b/cmd/barak/types/responses.go @@ -13,7 +13,7 @@ type ResponseStatus struct { type ResponseRegister struct { } -type ResponseRunProcess struct { +type ResponseStartProcess struct { Success bool Output string } @@ -24,3 +24,10 @@ type ResponseStopProcess struct { type ResponseListProcesses struct { Processes []*pcm.Process } + +type ResponseOpenListener struct { + Addr string +} + +type ResponseCloseListener struct { +} diff --git a/cmd/debora/commands.go b/cmd/debora/commands.go index 7956cb6e..b5a28988 100644 --- a/cmd/debora/commands.go +++ b/cmd/debora/commands.go @@ -19,7 +19,7 @@ import ( // (First the command(s) are signed by all validators, // and then it is broadcast). -func RunProcess(privKey acm.PrivKey, remote string, command btypes.CommandRunProcess) (response btypes.ResponseRunProcess, err error) { +func StartProcess(privKey acm.PrivKey, remote string, command btypes.CommandStartProcess) (response btypes.ResponseStartProcess, err error) { nonce, err := GetNonce(remote) if err != nil { return response, err diff --git a/cmd/debora/main.go b/cmd/debora/main.go index 26239133..5743f190 100644 --- a/cmd/debora/main.go +++ b/cmd/debora/main.go @@ -87,7 +87,7 @@ func main() { cli.Command{ Name: "run", Usage: "run process", - Action: cliRunProcess, + Action: cliStartProcess, Flags: []cli.Flag{ labelFlag, bgFlag, @@ -145,14 +145,14 @@ func cliGetStatus(c *cli.Context) { wg.Wait() } -func cliRunProcess(c *cli.Context) { +func cliStartProcess(c *cli.Context) { args := c.Args() if len(args) < 1 { Exit("Must specify ") } execPath := args[0] args = args[1:] - command := btypes.CommandRunProcess{ + command := btypes.CommandStartProcess{ Wait: !c.Bool("bg"), Label: c.String("label"), ExecPath: execPath, @@ -164,7 +164,7 @@ func cliRunProcess(c *cli.Context) { wg.Add(1) go func(remote string) { defer wg.Done() - response, err := RunProcess(Config.PrivKey, remote, command) + response, err := StartProcess(Config.PrivKey, remote, command) if err != nil { fmt.Printf("%v failure. %v\n", remote, err) } else { diff --git a/rpc/server/http_server.go b/rpc/server/http_server.go index b8043bc3..cf599c1e 100644 --- a/rpc/server/http_server.go +++ b/rpc/server/http_server.go @@ -16,19 +16,20 @@ import ( . "github.com/tendermint/tendermint/rpc/types" ) -func StartHTTPServer(listenAddr string, handler http.Handler) { +func StartHTTPServer(listenAddr string, handler http.Handler) net.Listener { log.Info(Fmt("Starting RPC HTTP server on %v", listenAddr)) + listener, err := net.Listen("tcp", listenAddr) + if err != nil { + Exit(Fmt("Failed to listen to %v", listenAddr)) + } go func() { - listener, err := net.Listen("tcp", listenAddr) - if err != nil { - Exit(Fmt("Failed to listen to %v", listenAddr)) - } res := http.Serve( listener, RecoverAndLogHandler(handler), ) log.Crit("RPC HTTP server stopped", "result", res) }() + return listener } func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) {