add open/close to debora, barak bug fix

This commit is contained in:
Jae Kwon 2015-05-21 17:13:23 -07:00
parent c297f41a9f
commit 5029d53474
5 changed files with 118 additions and 31 deletions

View File

@ -104,7 +104,7 @@ func (brk *Barak) ListProcesses() []*pcm.Process {
brk.mtx.Lock() brk.mtx.Lock()
defer brk.mtx.Unlock() defer brk.mtx.Unlock()
processes := []*pcm.Process{} processes := []*pcm.Process{}
for _, process := range processes { for _, process := range brk.processes {
processes = append(processes, process) processes = append(processes, process)
} }
return processes return processes
@ -128,9 +128,9 @@ func (brk *Barak) AddProcess(label string, process *pcm.Process) error {
} }
func (brk *Barak) StopProcess(label string, kill bool) error { func (brk *Barak) StopProcess(label string, kill bool) error {
barak.mtx.Lock() brk.mtx.Lock()
proc := barak.processes[label] proc := brk.processes[label]
barak.mtx.Unlock() brk.mtx.Unlock()
if proc == nil { if proc == nil {
return fmt.Errorf("Process does not exist: %v", label) return fmt.Errorf("Process does not exist: %v", label)
@ -152,7 +152,7 @@ func (brk *Barak) ListListeners() []net.Listener {
return brk.listeners return brk.listeners
} }
func (brk *Barak) OpenListener(addr string) net.Listener { func (brk *Barak) OpenListener(addr string) (net.Listener, error) {
brk.mtx.Lock() brk.mtx.Lock()
defer brk.mtx.Unlock() defer brk.mtx.Unlock()
// Start rpc server. // Start rpc server.
@ -161,9 +161,12 @@ func (brk *Barak) OpenListener(addr string) net.Listener {
mux.HandleFunc("/register", RegisterHandler) mux.HandleFunc("/register", RegisterHandler)
// TODO: mux.HandleFunc("/upload", UploadFile) // TODO: mux.HandleFunc("/upload", UploadFile)
rpcserver.RegisterRPCFuncs(mux, Routes) rpcserver.RegisterRPCFuncs(mux, Routes)
listener := rpcserver.StartHTTPServer(addr, mux) listener, err := rpcserver.StartHTTPServer(addr, mux)
if err != nil {
return nil, err
}
brk.listeners = append(brk.listeners, listener) brk.listeners = append(brk.listeners, listener)
return listener return listener, nil
} }
func (brk *Barak) CloseListener(addr string) { func (brk *Barak) CloseListener(addr string) {

View File

@ -33,7 +33,7 @@ func init() {
} }
// Global instance // Global instance
var barak *Barak var barak_ *Barak
// Parse command-line options // Parse command-line options
func parseFlags() (optionsFile string) { func parseFlags() (optionsFile string) {
@ -53,9 +53,9 @@ func main() {
options := ReadBarakOptions(optionsFile) options := ReadBarakOptions(optionsFile)
// Init barak // Init barak
barak = NewBarakFromOptions(options) barak_ = NewBarakFromOptions(options)
barak.StartRegisterRoutine() barak_.StartRegisterRoutine()
barak.WritePidFile() // This should be last, before TrapSignal(). barak_.WritePidFile() // This should be last, before TrapSignal().
TrapSignal(func() { TrapSignal(func() {
fmt.Println("Barak shutting down") fmt.Println("Barak shutting down")
}) })
@ -65,11 +65,11 @@ func main() {
// RPC functions // RPC functions
func Status() (*ResponseStatus, error) { func Status() (*ResponseStatus, error) {
barak.mtx.Lock() barak_.mtx.Lock()
pid := barak.pid pid := barak_.pid
nonce := barak.nonce nonce := barak_.nonce
validators := barak.validators validators := barak_.validators
barak.mtx.Unlock() barak_.mtx.Unlock()
return &ResponseStatus{ return &ResponseStatus{
Pid: pid, Pid: pid,
@ -115,7 +115,7 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) {
commandJSONStr := authCommand.CommandJSONStr commandJSONStr := authCommand.CommandJSONStr
signatures := authCommand.Signatures signatures := authCommand.Signatures
// Validate commandJSONStr // Validate commandJSONStr
if !validate([]byte(commandJSONStr), barak.ListValidators(), signatures) { if !validate([]byte(commandJSONStr), barak_.ListValidators(), signatures) {
fmt.Printf("Failed validation attempt") fmt.Printf("Failed validation attempt")
return nil, errors.New("Validation error") return nil, errors.New("Validation error")
} }
@ -127,7 +127,7 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) {
return nil, errors.New("Command parse error") return nil, errors.New("Command parse error")
} }
// Prevent replays // Prevent replays
barak.CheckIncrNonce(command.Nonce) barak_.CheckIncrNonce(command.Nonce)
return command.Command, nil return command.Command, nil
} }
@ -137,22 +137,22 @@ func parseValidateCommand(authCommand AuthCommand) (Command, error) {
func StartProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseStartProcess, error) { func StartProcess(wait bool, label string, execPath string, args []string, input string) (*ResponseStartProcess, error) {
// First, see if there already is a process labeled 'label' // First, see if there already is a process labeled 'label'
existing := barak.GetProcess(label) existing := barak_.GetProcess(label)
if existing != nil && existing.EndTime.IsZero() { if existing != nil && existing.EndTime.IsZero() {
return nil, fmt.Errorf("Process already exists: %v", label) return nil, fmt.Errorf("Process already exists: %v", label)
} }
// Otherwise, create one. // Otherwise, create one.
err := EnsureDir(barak.RootDir() + "/outputs") err := EnsureDir(barak_.RootDir() + "/outputs")
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to create outputs dir: %v", err) return nil, fmt.Errorf("Failed to create outputs dir: %v", err)
} }
outPath := Fmt("%v/outputs/%v_%v.out", barak.RootDir(), label, time.Now().Format("2006_01_02_15_04_05_MST")) outPath := Fmt("%v/outputs/%v_%v.out", barak_.RootDir(), label, time.Now().Format("2006_01_02_15_04_05_MST"))
proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath) proc, err := pcm.Create(pcm.ProcessModeDaemon, label, execPath, args, input, outPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
barak.AddProcess(label, proc) barak_.AddProcess(label, proc)
if wait { if wait {
<-proc.WaitCh <-proc.WaitCh
@ -178,26 +178,29 @@ func StartProcess(wait bool, label string, execPath string, args []string, input
} }
func StopProcess(label string, kill bool) (*ResponseStopProcess, error) { func StopProcess(label string, kill bool) (*ResponseStopProcess, error) {
err := barak.StopProcess(label, kill) err := barak_.StopProcess(label, kill)
return &ResponseStopProcess{}, err return &ResponseStopProcess{}, err
} }
func ListProcesses() (*ResponseListProcesses, error) { func ListProcesses() (*ResponseListProcesses, error) {
procs := barak.ListProcesses() procs := barak_.ListProcesses()
return &ResponseListProcesses{ return &ResponseListProcesses{
Processes: procs, Processes: procs,
}, nil }, nil
} }
func OpenListener(addr string) (*ResponseOpenListener, error) { func OpenListener(addr string) (*ResponseOpenListener, error) {
listener := barak.OpenListener(addr) listener, err := barak_.OpenListener(addr)
if err != nil {
return nil, err
}
return &ResponseOpenListener{ return &ResponseOpenListener{
Addr: listener.Addr().String(), Addr: listener.Addr().String(),
}, nil }, nil
} }
func CloseListener(addr string) (*ResponseCloseListener, error) { func CloseListener(addr string) (*ResponseCloseListener, error) {
barak.CloseListener(addr) barak_.CloseListener(addr)
return &ResponseCloseListener{}, nil return &ResponseCloseListener{}, nil
} }
@ -206,7 +209,7 @@ func CloseListener(addr string) (*ResponseCloseListener, error) {
// Another barak instance registering its external // Another barak instance registering its external
// address to a remote barak. // address to a remote barak.
func RegisterHandler(w http.ResponseWriter, req *http.Request) { func RegisterHandler(w http.ResponseWriter, req *http.Request) {
registry, err := os.OpenFile(barak.RootDir()+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) registry, err := os.OpenFile(barak_.RootDir()+"/registry.log", os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil { if err != nil {
http.Error(w, "Could not open registry file. Please contact the administrator", 500) http.Error(w, "Could not open registry file. Please contact the administrator", 500)
return return
@ -238,7 +241,7 @@ func ServeFileHandler(w http.ResponseWriter, req *http.Request) {
// local paths must be explicitly local, e.g. "./xyz" // local paths must be explicitly local, e.g. "./xyz"
} else if path[0] != '/' { } else if path[0] != '/' {
// If not an absolute path, then is label // If not an absolute path, then is label
proc := barak.GetProcess(path) proc := barak_.GetProcess(path)
if proc == nil { if proc == nil {
http.Error(w, Fmt("Unknown process label: %v", path), 400) http.Error(w, Fmt("Unknown process label: %v", path), 400)
return return

View File

@ -18,6 +18,7 @@ import (
// When multiple are involved, the workflow is different. // When multiple are involved, the workflow is different.
// (First the command(s) are signed by all validators, // (First the command(s) are signed by all validators,
// and then it is broadcast). // and then it is broadcast).
// TODO: Implement a reasonable workflow with multiple validators.
func StartProcess(privKey acm.PrivKey, remote string, command btypes.CommandStartProcess) (response btypes.ResponseStartProcess, err error) { func StartProcess(privKey acm.PrivKey, remote string, command btypes.CommandStartProcess) (response btypes.ResponseStartProcess, err error) {
nonce, err := GetNonce(remote) nonce, err := GetNonce(remote)
@ -49,6 +50,26 @@ func ListProcesses(privKey acm.PrivKey, remote string, command btypes.CommandLis
return response, err return response, err
} }
func OpenListener(privKey acm.PrivKey, remote string, command btypes.CommandOpenListener) (response btypes.ResponseOpenListener, err error) {
nonce, err := GetNonce(remote)
if err != nil {
return response, err
}
commandBytes, signature := SignCommand(privKey, nonce+1, command)
_, err = RunAuthCommand(remote, commandBytes, []acm.Signature{signature}, &response)
return response, err
}
func CloseListener(privKey acm.PrivKey, remote string, command btypes.CommandCloseListener) (response btypes.ResponseCloseListener, err error) {
nonce, err := GetNonce(remote)
if err != nil {
return response, err
}
commandBytes, signature := SignCommand(privKey, nonce+1, command)
_, err = RunAuthCommand(remote, commandBytes, []acm.Signature{signature}, &response)
return response, err
}
func DownloadFile(privKey acm.PrivKey, remote string, command btypes.CommandServeFile, outPath string) (n int64, err error) { func DownloadFile(privKey acm.PrivKey, remote string, command btypes.CommandServeFile, outPath string) (n int64, err error) {
// Create authCommandJSONBytes // Create authCommandJSONBytes
nonce, err := GetNonce(remote) nonce, err := GetNonce(remote)

View File

@ -104,6 +104,16 @@ func main() {
Usage: "list processes", Usage: "list processes",
Action: cliListProcesses, Action: cliListProcesses,
}, },
cli.Command{
Name: "open",
Usage: "open listener",
Action: cliOpenListener,
},
cli.Command{
Name: "close",
Usage: "close listener",
Action: cliCloseListener,
},
cli.Command{ cli.Command{
Name: "download", Name: "download",
Usage: "download file <remote-path> <local-path-prefix>", Usage: "download file <remote-path> <local-path-prefix>",
@ -244,6 +254,56 @@ func cliListProcesses(c *cli.Context) {
wg.Wait() wg.Wait()
} }
func cliOpenListener(c *cli.Context) {
args := c.Args()
if len(args) < 1 {
Exit("Must specify <listenAddr e.g. 0.0.0.0:46660>")
}
listenAddr := args[0]
command := btypes.CommandOpenListener{
Addr: listenAddr,
}
wg := sync.WaitGroup{}
for _, remote := range Config.Remotes {
wg.Add(1)
go func(remote string) {
defer wg.Done()
response, err := OpenListener(Config.PrivKey, remote, command)
if err != nil {
fmt.Printf("%v failure. %v\n", remote, err)
} else {
fmt.Printf("%v opened %v.\n", remote, response.Addr)
}
}(remote)
}
wg.Wait()
}
func cliCloseListener(c *cli.Context) {
args := c.Args()
if len(args) == 0 {
Exit("Must specify listenAddr to stop")
}
listenAddr := args[0]
command := btypes.CommandCloseListener{
Addr: listenAddr,
}
wg := sync.WaitGroup{}
for _, remote := range Config.Remotes {
wg.Add(1)
go func(remote string) {
defer wg.Done()
response, err := CloseListener(Config.PrivKey, remote, command)
if err != nil {
fmt.Printf("%v failure. %v\n", remote, err)
} else {
fmt.Printf("%v success. %v\n", remote, response)
}
}(remote)
}
wg.Wait()
}
func cliDownloadFile(c *cli.Context) { func cliDownloadFile(c *cli.Context) {
args := c.Args() args := c.Args()
if len(args) != 2 { if len(args) != 2 {

View File

@ -16,11 +16,11 @@ import (
. "github.com/tendermint/tendermint/rpc/types" . "github.com/tendermint/tendermint/rpc/types"
) )
func StartHTTPServer(listenAddr string, handler http.Handler) net.Listener { func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) {
log.Info(Fmt("Starting RPC HTTP server on %v", listenAddr)) log.Info(Fmt("Starting RPC HTTP server on %v", listenAddr))
listener, err := net.Listen("tcp", listenAddr) listener, err := net.Listen("tcp", listenAddr)
if err != nil { if err != nil {
Exit(Fmt("Failed to listen to %v", listenAddr)) return nil, fmt.Errorf("Failed to listen to %v", listenAddr)
} }
go func() { go func() {
res := http.Serve( res := http.Serve(
@ -29,7 +29,7 @@ func StartHTTPServer(listenAddr string, handler http.Handler) net.Listener {
) )
log.Crit("RPC HTTP server stopped", "result", res) log.Crit("RPC HTTP server stopped", "result", res)
}() }()
return listener return listener, nil
} }
func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) { func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) {