diff --git a/cmd/barak/barak.go b/cmd/barak/barak.go index 8627cf75..7e71ab66 100644 --- a/cmd/barak/barak.go +++ b/cmd/barak/barak.go @@ -104,7 +104,7 @@ func (brk *Barak) ListProcesses() []*pcm.Process { brk.mtx.Lock() defer brk.mtx.Unlock() processes := []*pcm.Process{} - for _, process := range processes { + for _, process := range brk.processes { processes = append(processes, process) } return processes @@ -128,9 +128,9 @@ func (brk *Barak) AddProcess(label string, process *pcm.Process) error { } func (brk *Barak) StopProcess(label string, kill bool) error { - barak.mtx.Lock() - proc := barak.processes[label] - barak.mtx.Unlock() + brk.mtx.Lock() + proc := brk.processes[label] + brk.mtx.Unlock() if proc == nil { return fmt.Errorf("Process does not exist: %v", label) @@ -152,7 +152,7 @@ func (brk *Barak) ListListeners() []net.Listener { return brk.listeners } -func (brk *Barak) OpenListener(addr string) net.Listener { +func (brk *Barak) OpenListener(addr string) (net.Listener, error) { brk.mtx.Lock() defer brk.mtx.Unlock() // Start rpc server. @@ -161,9 +161,12 @@ func (brk *Barak) OpenListener(addr string) net.Listener { mux.HandleFunc("/register", RegisterHandler) // TODO: mux.HandleFunc("/upload", UploadFile) rpcserver.RegisterRPCFuncs(mux, Routes) - listener := rpcserver.StartHTTPServer(addr, mux) + listener, err := rpcserver.StartHTTPServer(addr, mux) + if err != nil { + return nil, err + } brk.listeners = append(brk.listeners, listener) - return listener + return listener, nil } func (brk *Barak) CloseListener(addr string) { diff --git a/cmd/barak/main.go b/cmd/barak/main.go index 1c6fe55a..9c08fbac 100644 --- a/cmd/barak/main.go +++ b/cmd/barak/main.go @@ -33,7 +33,7 @@ func init() { } // Global instance -var barak *Barak +var barak_ *Barak // Parse command-line options func parseFlags() (optionsFile string) { @@ -53,9 +53,9 @@ func main() { options := ReadBarakOptions(optionsFile) // Init barak - barak = NewBarakFromOptions(options) - barak.StartRegisterRoutine() - barak.WritePidFile() // This should be last, before TrapSignal(). + barak_ = NewBarakFromOptions(options) + barak_.StartRegisterRoutine() + barak_.WritePidFile() // This should be last, before TrapSignal(). TrapSignal(func() { fmt.Println("Barak shutting down") }) @@ -65,11 +65,11 @@ func main() { // RPC functions func Status() (*ResponseStatus, error) { - barak.mtx.Lock() - pid := barak.pid - nonce := barak.nonce - validators := barak.validators - barak.mtx.Unlock() + barak_.mtx.Lock() + pid := barak_.pid + nonce := barak_.nonce + validators := barak_.validators + barak_.mtx.Unlock() return &ResponseStatus{ Pid: pid, @@ -115,7 +115,7 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) { commandJSONStr := authCommand.CommandJSONStr signatures := authCommand.Signatures // Validate commandJSONStr - if !validate([]byte(commandJSONStr), barak.ListValidators(), signatures) { + if !validate([]byte(commandJSONStr), barak_.ListValidators(), signatures) { fmt.Printf("Failed validation attempt") return nil, errors.New("Validation error") } @@ -127,7 +127,7 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) { return nil, errors.New("Command parse error") } // Prevent replays - barak.CheckIncrNonce(command.Nonce) + barak_.CheckIncrNonce(command.Nonce) return command.Command, nil } @@ -137,22 +137,22 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) { func StartProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseStartProcess, error) { // First, see if there already is a process labeled 'label' - existing := barak.GetProcess(label) + existing := barak_.GetProcess(label) if existing != nil && existing.EndTime.IsZero() { return nil, fmt.Errorf("Process already exists: %v", label) } // Otherwise, create one. - err := EnsureDir(barak.RootDir() + "/outputs") + err := EnsureDir(barak_.RootDir() + "/outputs") if err != nil { return nil, fmt.Errorf("Failed to create outputs dir: %v", err) } - outPath := Fmt("%v/outputs/%v_%v.out", barak.RootDir(), label, time.Now().Format("2006_01_02_15_04_05_MST")) + outPath := Fmt("%v/outputs/%v_%v.out", barak_.RootDir(), label, time.Now().Format("2006_01_02_15_04_05_MST")) proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath) if err != nil { return nil, err } - barak.AddProcess(label, proc) + barak_.AddProcess(label, proc) if wait { <-proc.WaitCh @@ -178,26 +178,29 @@ func StartProcess(wait bool, label string, execPath string, args []string, input } func StopProcess(label string, kill bool) (*ResponseStopProcess, error) { - err := barak.StopProcess(label, kill) + err := barak_.StopProcess(label, kill) return &ResponseStopProcess{}, err } func ListProcesses() (*ResponseListProcesses, error) { - procs := barak.ListProcesses() + procs := barak_.ListProcesses() return &ResponseListProcesses{ Processes: procs, }, nil } func OpenListener(addr string) (*ResponseOpenListener, error) { - listener := barak.OpenListener(addr) + listener, err := barak_.OpenListener(addr) + if err != nil { + return nil, err + } return &ResponseOpenListener{ Addr: listener.Addr().String(), }, nil } func CloseListener(addr string) (*ResponseCloseListener, error) { - barak.CloseListener(addr) + barak_.CloseListener(addr) return &ResponseCloseListener{}, nil } @@ -206,7 +209,7 @@ func CloseListener(addr string) (*ResponseCloseListener, error) { // Another barak instance registering its external // address to a remote barak. func RegisterHandler(w http.ResponseWriter, req *http.Request) { - registry, err := os.OpenFile(barak.RootDir()+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) + registry, err := os.OpenFile(barak_.RootDir()+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) if err != nil { http.Error(w, "Could not open registry file. Please contact the administrator", 500) return @@ -238,7 +241,7 @@ func ServeFileHandler(w http.ResponseWriter, req *http.Request) { // local paths must be explicitly local, e.g. "./xyz" } else if path[0] != '/' { // If not an absolute path, then is label - proc := barak.GetProcess(path) + proc := barak_.GetProcess(path) if proc == nil { http.Error(w, Fmt("Unknown process label: %v", path), 400) return diff --git a/cmd/debora/commands.go b/cmd/debora/commands.go index b5a28988..316e48ee 100644 --- a/cmd/debora/commands.go +++ b/cmd/debora/commands.go @@ -18,6 +18,7 @@ import ( // When multiple are involved, the workflow is different. // (First the command(s) are signed by all validators, // and then it is broadcast). +// TODO: Implement a reasonable workflow with multiple validators. func StartProcess(privKey acm.PrivKey, remote string, command btypes.CommandStartProcess) (response btypes.ResponseStartProcess, err error) { nonce, err := GetNonce(remote) @@ -49,6 +50,26 @@ func ListProcesses(privKey acm.PrivKey, remote string, command btypes.CommandLis return response, err } +func OpenListener(privKey acm.PrivKey, remote string, command btypes.CommandOpenListener) (response btypes.ResponseOpenListener, err error) { + nonce, err := GetNonce(remote) + if err != nil { + return response, err + } + commandBytes, signature := SignCommand(privKey, nonce+1, command) + _, err = RunAuthCommand(remote, commandBytes, []acm.Signature{signature}, &response) + return response, err +} + +func CloseListener(privKey acm.PrivKey, remote string, command btypes.CommandCloseListener) (response btypes.ResponseCloseListener, err error) { + nonce, err := GetNonce(remote) + if err != nil { + return response, err + } + commandBytes, signature := SignCommand(privKey, nonce+1, command) + _, err = RunAuthCommand(remote, commandBytes, []acm.Signature{signature}, &response) + return response, err +} + func DownloadFile(privKey acm.PrivKey, remote string, command btypes.CommandServeFile, outPath string) (n int64, err error) { // Create authCommandJSONBytes nonce, err := GetNonce(remote) diff --git a/cmd/debora/main.go b/cmd/debora/main.go index 5743f190..e0ba3459 100644 --- a/cmd/debora/main.go +++ b/cmd/debora/main.go @@ -104,6 +104,16 @@ func main() { Usage: "list processes", Action: cliListProcesses, }, + cli.Command{ + Name: "open", + Usage: "open listener", + Action: cliOpenListener, + }, + cli.Command{ + Name: "close", + Usage: "close listener", + Action: cliCloseListener, + }, cli.Command{ Name: "download", Usage: "download file ", @@ -244,6 +254,56 @@ func cliListProcesses(c *cli.Context) { wg.Wait() } +func cliOpenListener(c *cli.Context) { + args := c.Args() + if len(args) < 1 { + Exit("Must specify ") + } + listenAddr := args[0] + command := btypes.CommandOpenListener{ + Addr: listenAddr, + } + wg := sync.WaitGroup{} + for _, remote := range Config.Remotes { + wg.Add(1) + go func(remote string) { + defer wg.Done() + response, err := OpenListener(Config.PrivKey, remote, command) + if err != nil { + fmt.Printf("%v failure. %v\n", remote, err) + } else { + fmt.Printf("%v opened %v.\n", remote, response.Addr) + } + }(remote) + } + wg.Wait() +} + +func cliCloseListener(c *cli.Context) { + args := c.Args() + if len(args) == 0 { + Exit("Must specify listenAddr to stop") + } + listenAddr := args[0] + command := btypes.CommandCloseListener{ + Addr: listenAddr, + } + wg := sync.WaitGroup{} + for _, remote := range Config.Remotes { + wg.Add(1) + go func(remote string) { + defer wg.Done() + response, err := CloseListener(Config.PrivKey, remote, command) + if err != nil { + fmt.Printf("%v failure. %v\n", remote, err) + } else { + fmt.Printf("%v success. %v\n", remote, response) + } + }(remote) + } + wg.Wait() +} + func cliDownloadFile(c *cli.Context) { args := c.Args() if len(args) != 2 { diff --git a/rpc/server/http_server.go b/rpc/server/http_server.go index cf599c1e..d2e4cc4b 100644 --- a/rpc/server/http_server.go +++ b/rpc/server/http_server.go @@ -16,11 +16,11 @@ import ( . "github.com/tendermint/tendermint/rpc/types" ) -func StartHTTPServer(listenAddr string, handler http.Handler) net.Listener { +func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) { log.Info(Fmt("Starting RPC HTTP server on %v", listenAddr)) listener, err := net.Listen("tcp", listenAddr) if err != nil { - Exit(Fmt("Failed to listen to %v", listenAddr)) + return nil, fmt.Errorf("Failed to listen to %v", listenAddr) } go func() { res := http.Serve( @@ -29,7 +29,7 @@ func StartHTTPServer(listenAddr string, handler http.Handler) net.Listener { ) log.Crit("RPC HTTP server stopped", "result", res) }() - return listener + return listener, nil } func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) {