From a13bc9d7a1bc96fab93ace40045c0f0fea4da836 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 5 Feb 2016 13:45:36 +0200 Subject: [PATCH 1/4] cmd, common, node, rpc: move HTTP RPC into node, drop singletone aspect --- cmd/geth/main.go | 5 -- cmd/gethrpctest/main.go | 37 ++------ cmd/utils/flags.go | 48 +++++----- common/defaults.go | 48 ++++++++++ common/path.go | 22 ----- node/api.go | 46 ++++------ node/config.go | 37 +++++++- node/node.go | 194 +++++++++++++++++++++++++++++----------- rpc/http.go | 61 ++----------- rpc/utils.go | 3 +- 10 files changed, 280 insertions(+), 221 deletions(-) create mode 100644 common/defaults.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index fa456a7ac..a321181a1 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -503,11 +503,6 @@ func startNode(ctx *cli.Context, stack *node.Node) { } } // Start auxiliary services if enabled - if ctx.GlobalBool(utils.RPCEnabledFlag.Name) { - if err := utils.StartRPC(stack, ctx); err != nil { - utils.Fatalf("Failed to start RPC: %v", err) - } - } if ctx.GlobalBool(utils.WSEnabledFlag.Name) { if err := utils.StartWS(stack, ctx); err != nil { utils.Fatalf("Failed to start WS: %v", err) diff --git a/cmd/gethrpctest/main.go b/cmd/gethrpctest/main.go index becd09f5a..b0907f8c5 100644 --- a/cmd/gethrpctest/main.go +++ b/cmd/gethrpctest/main.go @@ -18,7 +18,6 @@ package main import ( - "errors" "flag" "io/ioutil" "log" @@ -26,10 +25,10 @@ import ( "os/signal" "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/tests" @@ -84,12 +83,6 @@ func main() { } log.Println("Initial test suite passed...") - // Start the RPC interface and wait until terminated - if err := StartRPC(stack); err != nil { - log.Fatalf("Failed to start RPC interface: %v", err) - } - log.Println("RPC Interface started, accepting requests...") - quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt) <-quit @@ -99,7 +92,13 @@ func main() { // keystore path and initial pre-state. func MakeSystemNode(keydir string, privkey string, test *tests.BlockTest) (*node.Node, error) { // Create a networkless protocol stack - stack, err := node.New(&node.Config{IpcPath: node.DefaultIpcEndpoint(), NoDiscovery: true}) + stack, err := node.New(&node.Config{ + IpcPath: node.DefaultIpcEndpoint(), + HttpHost: common.DefaultHttpHost, + HttpPort: common.DefaultHttpPort, + HttpModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, + NoDiscovery: true, + }) if err != nil { return nil, err } @@ -164,23 +163,3 @@ func RunTest(stack *node.Node, test *tests.BlockTest) error { } return nil } - -// StartRPC initializes an RPC interface to the given protocol stack. -func StartRPC(stack *node.Node) error { - /* - web3 := NewPublicWeb3API(stack) - server.RegisterName("web3", web3) - net := NewPublicNetAPI(stack.Server(), ethereum.NetVersion()) - server.RegisterName("net", net) - */ - - for _, api := range stack.APIs() { - if adminApi, ok := api.Service.(*node.PrivateAdminAPI); ok { - _, err := adminApi.StartRPC("127.0.0.1", 8545, "", "admin,db,eth,debug,miner,net,shh,txpool,personal,web3") - return err - } - } - - glog.V(logger.Error).Infof("Unable to start RPC-HTTP interface, could not find admin API") - return errors.New("Unable to start RPC-HTTP interface") -} diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 5d56ba7d0..40ea29d78 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -233,12 +233,12 @@ var ( RPCListenAddrFlag = cli.StringFlag{ Name: "rpcaddr", Usage: "HTTP-RPC server listening interface", - Value: "127.0.0.1", + Value: common.DefaultHttpHost, } RPCPortFlag = cli.IntFlag{ Name: "rpcport", Usage: "HTTP-RPC server listening port", - Value: 8545, + Value: common.DefaultHttpPort, } RPCCORSDomainFlag = cli.StringFlag{ Name: "rpccorsdomain", @@ -262,7 +262,7 @@ var ( IPCPathFlag = DirectoryFlag{ Name: "ipcpath", Usage: "Filename for IPC socket/pipe within the datadir (explicit paths escape it)", - Value: DirectoryString{common.DefaultIpcSocket()}, + Value: DirectoryString{common.DefaultIpcSocket}, } WSEnabledFlag = cli.BoolFlag{ Name: "ws", @@ -271,12 +271,12 @@ var ( WSListenAddrFlag = cli.StringFlag{ Name: "wsaddr", Usage: "WS-RPC server listening interface", - Value: "127.0.0.1", + Value: common.DefaultWsHost, } WSPortFlag = cli.IntFlag{ Name: "wsport", Usage: "WS-RPC server listening port", - Value: 8546, + Value: common.DefaultWsPort, } WSApiFlag = cli.StringFlag{ Name: "wsapi", @@ -284,7 +284,7 @@ var ( Value: rpc.DefaultHttpRpcApis, } WSAllowedDomainsFlag = cli.StringFlag{ - Name: "wsdomains", + Name: "wscors", Usage: "Domains from which to accept websockets requests", Value: "", } @@ -482,6 +482,15 @@ func MakeNAT(ctx *cli.Context) nat.Interface { return natif } +// MakeHttpRpcHost creates the HTTP RPC listener interface string from the set +// command line flags, returning empty if the HTTP endpoint is disabled. +func MakeHttpRpcHost(ctx *cli.Context) string { + if !ctx.GlobalBool(RPCEnabledFlag.Name) { + return "" + } + return ctx.GlobalString(RPCListenAddrFlag.Name) +} + // MakeGenesisBlock loads up a genesis block from an input file specified in the // command line, or returns the empty string if none set. func MakeGenesisBlock(ctx *cli.Context) string { @@ -591,7 +600,6 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node. // Configure the node's service container stackConf := &node.Config{ DataDir: MustMakeDataDir(ctx), - IpcPath: MakeIpcPath(ctx), PrivateKey: MakeNodeKey(ctx), Name: MakeNodeName(name, version, ctx), NoDiscovery: ctx.GlobalBool(NoDiscoverFlag.Name), @@ -600,6 +608,11 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node. NAT: MakeNAT(ctx), MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name), MaxPendingPeers: ctx.GlobalInt(MaxPendingPeersFlag.Name), + IpcPath: MakeIpcPath(ctx), + HttpHost: MakeHttpRpcHost(ctx), + HttpPort: ctx.GlobalInt(RPCPortFlag.Name), + HttpCors: ctx.GlobalString(RPCCORSDomainFlag.Name), + HttpModules: strings.Split(ctx.GlobalString(RPCApiFlag.Name), ","), } // Configure the Ethereum service accman := MakeAccountManager(ctx) @@ -744,27 +757,6 @@ func MakeChain(ctx *cli.Context) (chain *core.BlockChain, chainDb ethdb.Database return chain, chainDb } -// StartRPC starts a HTTP JSON-RPC API server. -func StartRPC(stack *node.Node, ctx *cli.Context) error { - for _, api := range stack.APIs() { - if adminApi, ok := api.Service.(*node.PrivateAdminAPI); ok { - address := ctx.GlobalString(RPCListenAddrFlag.Name) - port := ctx.GlobalInt(RPCPortFlag.Name) - cors := ctx.GlobalString(RPCCORSDomainFlag.Name) - apiStr := "" - if ctx.GlobalIsSet(RPCApiFlag.Name) { - apiStr = ctx.GlobalString(RPCApiFlag.Name) - } - - _, err := adminApi.StartRPC(address, port, cors, apiStr) - return err - } - } - - glog.V(logger.Error).Infof("Unable to start RPC-HTTP interface, could not find admin API") - return errors.New("Unable to start RPC-HTTP interface") -} - // StartWS starts a websocket JSON-RPC API server. func StartWS(stack *node.Node, ctx *cli.Context) error { for _, api := range stack.APIs() { diff --git a/common/defaults.go b/common/defaults.go new file mode 100644 index 000000000..c5a88d7a3 --- /dev/null +++ b/common/defaults.go @@ -0,0 +1,48 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package common + +import ( + "path/filepath" + "runtime" +) + +const ( + DefaultIpcSocket = "geth.ipc" // Default (relative) name of the IPC RPC socket + DefaultHttpHost = "localhost" // Default host interface for the HTTP RPC server + DefaultHttpPort = 8545 // Default TCP port for the HTTP RPC server + DefaultWsHost = "localhost" // Default host interface for the websocket RPC server + DefaultWsPort = 8546 // Default TCP port for the websocket RPC server +) + +// DefaultDataDir is the default data directory to use for the databases and other +// persistence requirements. +func DefaultDataDir() string { + // Try to place the data folder in the user's home dir + home := HomeDir() + if home != "" { + if runtime.GOOS == "darwin" { + return filepath.Join(home, "Library", "Ethereum") + } else if runtime.GOOS == "windows" { + return filepath.Join(home, "AppData", "Roaming", "Ethereum") + } else { + return filepath.Join(home, ".ethereum") + } + } + // As we cannot guess a stable location, return empty and handle later + return "" +} diff --git a/common/path.go b/common/path.go index 38c213a12..75a8c1a3e 100644 --- a/common/path.go +++ b/common/path.go @@ -72,25 +72,3 @@ func HomeDir() string { } return "" } - -func DefaultDataDir() string { - // Try to place the data folder in the user's home dir - home := HomeDir() - if home != "" { - if runtime.GOOS == "darwin" { - return filepath.Join(home, "Library", "Ethereum") - } else if runtime.GOOS == "windows" { - return filepath.Join(home, "AppData", "Roaming", "Ethereum") - } else { - return filepath.Join(home, ".ethereum") - } - } - // As we cannot guess a stable location, return empty and handle later - return "" -} - -// DefaultIpcSocket returns the relative name of the default IPC socket. The path -// resolution is done by a node with other contextual infos. -func DefaultIpcSocket() string { - return "geth.ipc" -} diff --git a/node/api.go b/node/api.go index bc1795407..1b185c6f1 100644 --- a/node/api.go +++ b/node/api.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rpc" "github.com/rcrowley/go-metrics" - "gopkg.in/fatih/set.v0" ) @@ -61,42 +60,29 @@ func (api *PrivateAdminAPI) AddPeer(url string) (bool, error) { } // StartRPC starts the HTTP RPC API server. -func (api *PrivateAdminAPI) StartRPC(address string, port int, cors string, apis string) (bool, error) { - var offeredAPIs []rpc.API - if len(apis) > 0 { - namespaces := set.New() - for _, a := range strings.Split(apis, ",") { - namespaces.Add(strings.TrimSpace(a)) - } - for _, api := range api.node.APIs() { - if namespaces.Has(api.Namespace) { - offeredAPIs = append(offeredAPIs, api) - } - } - } else { // use by default all public API's - for _, api := range api.node.APIs() { - if api.Public { - offeredAPIs = append(offeredAPIs, api) - } - } - } +func (api *PrivateAdminAPI) StartRPC(host string, port int, cors string, apis string) (bool, error) { + api.node.lock.Lock() + defer api.node.lock.Unlock() - if address == "" { - address = "127.0.0.1" + if api.node.httpHandler != nil { + return false, fmt.Errorf("HTTP RPC already running on %s", api.node.httpEndpoint) } - if port == 0 { - port = 8545 + if err := api.node.startHTTP(fmt.Sprintf("%s:%d", host, port), api.node.rpcAPIs, strings.Split(apis, ","), cors); err != nil { + return false, err } - - corsDomains := strings.Split(cors, " ") - err := rpc.StartHTTP(address, port, corsDomains, offeredAPIs) - return err == nil, err + return true, nil } // StopRPC terminates an already running HTTP RPC API endpoint. func (api *PrivateAdminAPI) StopRPC() (bool, error) { - err := rpc.StopHTTP() - return err == nil, err + api.node.lock.Lock() + defer api.node.lock.Unlock() + + if api.node.httpHandler == nil { + return false, fmt.Errorf("HTTP RPC not running") + } + api.node.stopHTTP() + return true, nil } // StartWS starts the websocket RPC API server. diff --git a/node/config.go b/node/config.go index d3eb1c78b..94c6e2e56 100644 --- a/node/config.go +++ b/node/config.go @@ -19,6 +19,7 @@ package node import ( "crypto/ecdsa" "encoding/json" + "fmt" "io/ioutil" "net" "os" @@ -97,6 +98,25 @@ type Config struct { // handshake phase, counted separately for inbound and outbound connections. // Zero defaults to preset values. MaxPendingPeers int + + // HttpHost is the host interface on which to start the HTTP RPC server. If this + // field is empty, no HTTP API endpoint will be started. + HttpHost string + + // HttpPort is the TCP port number on which to start the HTTP RPC server. The + // default zero value is/ valid and will pick a port number randomly (useful + // for ephemeral nodes). + HttpPort int + + // HttpCors is the Cross-Origin Resource Sharing header to send to requesting + // clients. Please be aware that CORS is a browser enforced security, it's fully + // useless for custom HTTP clients. + HttpCors string + + // HttpModules is a list of API modules to expose via the HTTP RPC interface. + // If the module list is empty, all RPC API endpoints designated public will be + // exposed. + HttpModules []string } // IpcEndpoint resolves an IPC endpoint based on a configured value, taking into @@ -126,10 +146,25 @@ func (c *Config) IpcEndpoint() string { // DefaultIpcEndpoint returns the IPC path used by default. func DefaultIpcEndpoint() string { - config := &Config{DataDir: common.DefaultDataDir(), IpcPath: common.DefaultIpcSocket()} + config := &Config{DataDir: common.DefaultDataDir(), IpcPath: common.DefaultIpcSocket} return config.IpcEndpoint() } +// HttpEndpoint resolves an HTTP endpoint based on the configured host interface +// and port parameters. +func (c *Config) HttpEndpoint() string { + if c.HttpHost == "" { + return "" + } + return fmt.Sprintf("%s:%d", c.HttpHost, c.HttpPort) +} + +// DefaultHttpEndpoint returns the HTTP endpoint used by default. +func DefaultHttpEndpoint() string { + config := &Config{HttpHost: common.DefaultHttpHost, HttpPort: common.DefaultHttpPort} + return config.HttpEndpoint() +} + // NodeKey retrieves the currently configured private key of the node, checking // first any manually set key, falling back to the one found in the configured // data folder. If no key can be found, a new one is generated. diff --git a/node/node.go b/node/node.go index e3fc03360..44c88d378 100644 --- a/node/node.go +++ b/node/node.go @@ -55,10 +55,17 @@ type Node struct { serviceFuncs []ServiceConstructor // Service constructors (in dependency order) services map[reflect.Type]Service // Currently running services + rpcAPIs []rpc.API // List of APIs currently provided by the node ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled) ipcListener net.Listener // IPC RPC listener socket to serve API requests ipcHandler *rpc.Server // IPC RPC request handler to process the API requests + httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled) + httpWhitelist []string // HTTP RPC modules to allow through this endpoint + httpCors string // HTTP RPC Cross-Origin Resource Sharing header + httpListener net.Listener // HTTP RPC listener socket to server API requests + httpHandler *rpc.Server // HTTP RPC request handler to process the API requests + stop chan struct{} // Channel to wait for termination notifications lock sync.RWMutex } @@ -93,9 +100,12 @@ func New(conf *Config) (*Node, error) { MaxPeers: conf.MaxPeers, MaxPendingPeers: conf.MaxPendingPeers, }, - serviceFuncs: []ServiceConstructor{}, - ipcEndpoint: conf.IpcEndpoint(), - eventmux: new(event.TypeMux), + serviceFuncs: []ServiceConstructor{}, + ipcEndpoint: conf.IpcEndpoint(), + httpEndpoint: conf.HttpEndpoint(), + httpWhitelist: conf.HttpModules, + httpCors: conf.HttpCors, + eventmux: new(event.TypeMux), }, nil } @@ -188,58 +198,146 @@ func (n *Node) Start() error { return nil } -// startRPC initializes and starts the IPC RPC endpoints. +// startRPC is a helper method to start all the various RPC endpoint during node +// startup. It's not meant to be called at any time afterwards as it makes certain +// assumptions about the state of the node. func (n *Node) startRPC(services map[reflect.Type]Service) error { - // Gather and register all the APIs exposed by the services + // Gather all the possible APIs to surface apis := n.apis() for _, service := range services { apis = append(apis, service.APIs()...) } - ipcHandler := rpc.NewServer() - for _, api := range apis { - if err := ipcHandler.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - glog.V(logger.Debug).Infof("Register %T under namespace '%s'", api.Service, api.Namespace) + // Start the various API endpoints, terminating all in case of errors + if err := n.startIPC(apis); err != nil { + return err } - // All APIs registered, start the IPC and HTTP listeners - var ( - ipcListener net.Listener - err error - ) - if n.ipcEndpoint != "" { - if ipcListener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil { - return err - } - go func() { - glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint) - defer glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint) + if err := n.startHTTP(n.httpEndpoint, apis, n.httpWhitelist, n.httpCors); err != nil { + n.stopIPC() + return err + } + // All API endpoints started successfully + n.rpcAPIs = apis + return nil +} - for { - conn, err := ipcListener.Accept() - if err != nil { - // Terminate if the listener was closed - n.lock.RLock() - closed := n.ipcListener == nil - n.lock.RUnlock() - if closed { - return - } - // Not closed, just some error; report and continue - glog.V(logger.Error).Infof("IPC accept failed: %v", err) - continue - } - go ipcHandler.ServeCodec(rpc.NewJSONCodec(conn)) - } - }() +// startIPC initializes and starts the IPC RPC endpoint. +func (n *Node) startIPC(apis []rpc.API) error { + // Short circuit if the IPC endpoint isn't being exposed + if n.ipcEndpoint == "" { + return nil } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + glog.V(logger.Debug).Infof("IPC registered %T under '%s'", api.Service, api.Namespace) + } + // All APIs registered, start the IPC listener + var ( + listener net.Listener + err error + ) + if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil { + return err + } + go func() { + glog.V(logger.Info).Infof("IPC endpoint opened: %s", n.ipcEndpoint) + + for { + conn, err := listener.Accept() + if err != nil { + // Terminate if the listener was closed + n.lock.RLock() + closed := n.ipcListener == nil + n.lock.RUnlock() + if closed { + return + } + // Not closed, just some error; report and continue + glog.V(logger.Error).Infof("IPC accept failed: %v", err) + continue + } + go handler.ServeCodec(rpc.NewJSONCodec(conn)) + } + }() // All listeners booted successfully - n.ipcListener = ipcListener - n.ipcHandler = ipcHandler + n.ipcListener = listener + n.ipcHandler = handler return nil } +// stopIPC terminates the IPC RPC endpoint. +func (n *Node) stopIPC() { + if n.ipcListener != nil { + n.ipcListener.Close() + n.ipcListener = nil + + glog.V(logger.Info).Infof("IPC endpoint closed: %s", n.ipcEndpoint) + } + if n.ipcHandler != nil { + n.ipcHandler.Stop() + n.ipcHandler = nil + } +} + +// startHTTP initializes and starts the HTTP RPC endpoint. +func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors string) error { + // Short circuit if the IPC endpoint isn't being exposed + if endpoint == "" { + return nil + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + glog.V(logger.Debug).Infof("HTTP registered %T under '%s'", api.Service, api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return err + } + go rpc.NewHTTPServer(cors, handler).Serve(listener) + glog.V(logger.Info).Infof("HTTP endpoint opened: http://%s", endpoint) + + // All listeners booted successfully + n.httpEndpoint = endpoint + n.httpListener = listener + n.httpHandler = handler + n.httpCors = cors + + return nil +} + +// stopHTTP terminates the HTTP RPC endpoint. +func (n *Node) stopHTTP() { + if n.httpListener != nil { + n.httpListener.Close() + n.httpListener = nil + + glog.V(logger.Info).Infof("HTTP endpoint closed: http://%s", n.httpEndpoint) + } + if n.httpHandler != nil { + n.httpHandler.Stop() + n.httpHandler = nil + } +} + // Stop terminates a running node along with all it's services. In the node was // not started, an error is returned. func (n *Node) Stop() error { @@ -251,14 +349,10 @@ func (n *Node) Stop() error { return ErrNodeStopped } // Otherwise terminate the API, all services and the P2P server too - if n.ipcListener != nil { - n.ipcListener.Close() - n.ipcListener = nil - } - if n.ipcHandler != nil { - n.ipcHandler.Stop() - n.ipcHandler = nil - } + n.stopIPC() + n.stopHTTP() + n.rpcAPIs = nil + failure := &StopError{ Services: make(map[reflect.Type]error), } diff --git a/rpc/http.go b/rpc/http.go index c5eb41af1..e58a88c08 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -20,7 +20,6 @@ import ( "bufio" "bytes" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -29,7 +28,6 @@ import ( "net/url" "strconv" "strings" - "sync" "time" "github.com/ethereum/go-ethereum/logger" @@ -41,12 +39,6 @@ const ( httpReadDeadLine = 60 * time.Second // wait max httpReadDeadeline for next request ) -var ( - httpServerMu sync.Mutex // prevent concurrent access to the httpListener and httpServer - httpListener net.Listener // listener for the http server - httpRPCServer *Server // the node can only start 1 HTTP RPC server instance -) - // httpMessageStream is the glue between a HTTP connection which is message based // and the RPC codecs that expect json requests to be read from a stream. It will // parse HTTP messages and offer the bodies of these requests as a stream through @@ -249,53 +241,14 @@ func (h *httpConnHijacker) ServeHTTP(w http.ResponseWriter, req *http.Request) { go h.rpcServer.ServeCodec(codec) } -// StartHTTP will start the JSONRPC HTTP RPC interface when its not yet running. -func StartHTTP(address string, port int, corsdomains []string, apis []API) error { - httpServerMu.Lock() - defer httpServerMu.Unlock() - - if httpRPCServer != nil { - return fmt.Errorf("HTTP RPC interface already started on %s", httpListener.Addr()) +// NewHTTPServer creates a new HTTP RPC server around an API provider. +func NewHTTPServer(cors string, handler *Server) *http.Server { + return &http.Server{ + Handler: &httpConnHijacker{ + corsdomains: strings.Split(cors, ","), + rpcServer: handler, + }, } - - rpcServer := NewServer() - - for _, api := range apis { - if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - } - - listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) - if err != nil { - return err - } - - httpServer := http.Server{Handler: &httpConnHijacker{corsdomains, rpcServer}} - go httpServer.Serve(listener) - - httpListener = listener - httpRPCServer = rpcServer - - return nil -} - -// StopHTTP will stop the running HTTP interface. If it is not running an error will be returned. -func StopHTTP() error { - httpServerMu.Lock() - defer httpServerMu.Unlock() - - if httpRPCServer == nil { - return errors.New("HTTP RPC interface not started") - } - - httpListener.Close() - httpRPCServer.Stop() - - httpRPCServer = nil - httpListener = nil - - return nil } // httpClient connects to a geth RPC server over HTTP. diff --git a/rpc/utils.go b/rpc/utils.go index 39acf8196..fa114284d 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -20,13 +20,12 @@ import ( "crypto/rand" "encoding/hex" "errors" + "fmt" "math/big" "reflect" "unicode" "unicode/utf8" - "fmt" - "golang.org/x/net/context" ) From 7486904b92449c5955bb682f4ff98752906912b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 5 Feb 2016 15:08:48 +0200 Subject: [PATCH 2/4] cmd, node, rpc: move websockets into node, break singleton --- cmd/geth/js_test.go | 52 +++++++++--------- cmd/geth/main.go | 21 ++++--- cmd/geth/usage.go | 2 +- cmd/gethrpctest/main.go | 3 + cmd/utils/client.go | 118 ---------------------------------------- cmd/utils/flags.go | 40 +++++--------- node/api.go | 49 ++++++----------- node/config.go | 34 ++++++++++++ node/node.go | 85 ++++++++++++++++++++++++----- rpc/ipc_windows.go | 6 ++ rpc/websocket.go | 76 ++++---------------------- 11 files changed, 194 insertions(+), 292 deletions(-) diff --git a/cmd/geth/js_test.go b/cmd/geth/js_test.go index 19583c5ef..58800ed44 100644 --- a/cmd/geth/js_test.go +++ b/cmd/geth/js_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" "math/big" + "math/rand" "os" "path/filepath" "regexp" @@ -29,6 +30,7 @@ import ( "time" "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/compiler" "github.com/ethereum/go-ethereum/common/httpclient" @@ -37,22 +39,21 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/cmd/utils" ) const ( testSolcPath = "" - solcVersion = "0.9.23" + solcVersion = "0.9.23" - testKey = "e6fab74a43941f82d89cb7faa408e227cdad3153c4720e540e855c19b15e6674" + testKey = "e6fab74a43941f82d89cb7faa408e227cdad3153c4720e540e855c19b15e6674" testAddress = "0x8605cdbbdb6d264aa742e77020dcbc58fcdce182" testBalance = "10000000000000000000" -// of empty string + // of empty string testHash = "0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470" ) var ( - versionRE = regexp.MustCompile(strconv.Quote(`"compilerVersion":"` + solcVersion + `"`)) + versionRE = regexp.MustCompile(strconv.Quote(`"compilerVersion":"` + solcVersion + `"`)) testNodeKey = crypto.ToECDSA(common.Hex2Bytes("4b50fa71f5c3eeb8fdc452224b2395af2fcc3d125e06c32c82e048c0559db03f")) testGenesis = `{"` + testAddress[2:] + `": {"balance": "` + testBalance + `"}}` ) @@ -95,7 +96,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod t.Fatal(err) } // Create a networkless protocol stack - stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true}) + stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true, IpcPath: fmt.Sprintf("geth-test-%d.ipc", rand.Int63())}) if err != nil { t.Fatalf("failed to create node: %v", err) } @@ -141,8 +142,10 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod stack.Service(ðereum) assetPath := filepath.Join(os.Getenv("GOPATH"), "src", "github.com", "ethereum", "go-ethereum", "cmd", "mist", "assets", "ext") - //client := comms.NewInProcClient(codec.JSON) - client := utils.NewInProcRPCClient(stack) + client, err := utils.NewRemoteRPCClientFromString("ipc:" + stack.IpcEndpoint()) + if err != nil { + t.Fatalf("failed to attach to node: %v", err) + } tf := &testjethre{client: ethereum.HTTPClient()} repl := newJSRE(stack, assetPath, "", client, false) tf.jsre = repl @@ -152,9 +155,6 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod func TestNodeInfo(t *testing.T) { t.Skip("broken after p2p update") tmp, repl, ethereum := testJEthRE(t) - if err := ethereum.Start(); err != nil { - t.Fatalf("error starting ethereum: %v", err) - } defer ethereum.Stop() defer os.RemoveAll(tmp) @@ -167,8 +167,8 @@ func TestAccounts(t *testing.T) { defer node.Stop() defer os.RemoveAll(tmp) - checkEvalJSON(t, repl, `eth.accounts`, `["` + testAddress + `"]`) - checkEvalJSON(t, repl, `eth.coinbase`, `"` + testAddress + `"`) + checkEvalJSON(t, repl, `eth.accounts`, `["`+testAddress+`"]`) + checkEvalJSON(t, repl, `eth.coinbase`, `"`+testAddress+`"`) val, err := repl.re.Run(`jeth.newAccount("password")`) if err != nil { t.Errorf("expected no error, got %v", err) @@ -178,7 +178,7 @@ func TestAccounts(t *testing.T) { t.Errorf("address not hex: %q", addr) } - checkEvalJSON(t, repl, `eth.accounts`, `["` + testAddress + `","` + addr + `"]`) + checkEvalJSON(t, repl, `eth.accounts`, `["`+testAddress+`","`+addr+`"]`) } @@ -206,13 +206,13 @@ func TestBlockChain(t *testing.T) { node.Service(ðereum) ethereum.BlockChain().Reset() - checkEvalJSON(t, repl, `admin.exportChain(` + tmpfileq + `)`, `true`) + checkEvalJSON(t, repl, `admin.exportChain(`+tmpfileq+`)`, `true`) if _, err := os.Stat(tmpfile); err != nil { t.Fatal(err) } // check import, verify that dumpBlock gives the same result. - checkEvalJSON(t, repl, `admin.importChain(` + tmpfileq + `)`, `true`) + checkEvalJSON(t, repl, `admin.importChain(`+tmpfileq+`)`, `true`) checkEvalJSON(t, repl, `debug.dumpBlock(eth.blockNumber)`, beforeExport) } @@ -240,7 +240,7 @@ func TestCheckTestAccountBalance(t *testing.T) { defer os.RemoveAll(tmp) repl.re.Run(`primary = "` + testAddress + `"`) - checkEvalJSON(t, repl, `eth.getBalance(primary)`, `"` + testBalance + `"`) + checkEvalJSON(t, repl, `eth.getBalance(primary)`, `"`+testBalance+`"`) } func TestSignature(t *testing.T) { @@ -301,11 +301,11 @@ func TestContract(t *testing.T) { */ source := `contract test {\n` + - " /// @notice Will multiply `a` by 7." + `\n` + - ` function multiply(uint a) returns(uint d) {\n` + - ` return a * 7;\n` + - ` }\n` + - `}\n` + " /// @notice Will multiply `a` by 7." + `\n` + + ` function multiply(uint a) returns(uint d) {\n` + + ` return a * 7;\n` + + ` }\n` + + `}\n` if checkEvalJSON(t, repl, `admin.stopNatSpec()`, `true`) != nil { return @@ -315,10 +315,10 @@ func TestContract(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - if checkEvalJSON(t, repl, `primary = eth.accounts[0]`, `"` + testAddress + `"`) != nil { + if checkEvalJSON(t, repl, `primary = eth.accounts[0]`, `"`+testAddress+`"`) != nil { return } - if checkEvalJSON(t, repl, `source = "` + source + `"`, `"` + source + `"`) != nil { + if checkEvalJSON(t, repl, `source = "`+source+`"`, `"`+source+`"`) != nil { return } @@ -396,7 +396,7 @@ multiply7 = Multiply7.at(contractaddress); var contentHash = `"0x86d2b7cf1e72e9a7a3f8d96601f0151742a2f780f1526414304fbe413dc7f9bd"` if sol != nil && solcVersion != sol.Version() { - modContractInfo := versionRE.ReplaceAll(contractInfo, []byte(`"compilerVersion":"` + sol.Version() + `"`)) + modContractInfo := versionRE.ReplaceAll(contractInfo, []byte(`"compilerVersion":"`+sol.Version()+`"`)) fmt.Printf("modified contractinfo:\n%s\n", modContractInfo) contentHash = `"` + common.ToHex(crypto.Sha3([]byte(modContractInfo))) + `"` } @@ -481,7 +481,7 @@ func processTxs(repl *testjethre, t *testing.T, expTxc int) bool { repl.wait <- height select { case <-timer.C: - // if times out make sure the xeth loop does not block + // if times out make sure the xeth loop does not block go func() { select { case repl.wait <- nil: diff --git a/cmd/geth/main.go b/cmd/geth/main.go index a321181a1..61f0632db 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -312,7 +312,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso utils.WSListenAddrFlag, utils.WSPortFlag, utils.WSApiFlag, - utils.WSAllowedDomainsFlag, + utils.WSCORSDomainFlag, utils.IPCDisabledFlag, utils.IPCApiFlag, utils.IPCPathFlag, @@ -399,7 +399,7 @@ func attach(ctx *cli.Context) { // attach to a running geth instance client, err := utils.NewRemoteRPCClient(ctx) if err != nil { - utils.Fatalf("Unable to attach to geth - %v", err) + utils.Fatalf("Unable to attach to geth: %v", err) } repl := newLightweightJSRE( @@ -425,8 +425,10 @@ func console(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node, and either execute script or become interactive - client := utils.NewInProcRPCClient(node) - + client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IpcEndpoint()) + if err != nil { + utils.Fatalf("Failed to attach to the inproc geth: %v", err) + } repl := newJSRE(node, ctx.GlobalString(utils.JSpathFlag.Name), ctx.GlobalString(utils.RPCCORSDomainFlag.Name), @@ -449,8 +451,10 @@ func execScripts(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node and execute the given scripts - client := utils.NewInProcRPCClient(node) - + client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IpcEndpoint()) + if err != nil { + utils.Fatalf("Failed to attach to the inproc geth: %v", err) + } repl := newJSRE(node, ctx.GlobalString(utils.JSpathFlag.Name), ctx.GlobalString(utils.RPCCORSDomainFlag.Name), @@ -503,11 +507,6 @@ func startNode(ctx *cli.Context, stack *node.Node) { } } // Start auxiliary services if enabled - if ctx.GlobalBool(utils.WSEnabledFlag.Name) { - if err := utils.StartWS(stack, ctx); err != nil { - utils.Fatalf("Failed to start WS: %v", err) - } - } if ctx.GlobalBool(utils.MiningEnabledFlag.Name) { if err := ethereum.StartMining(ctx.GlobalInt(utils.MinerThreadsFlag.Name), ctx.GlobalString(utils.MiningGPUFlag.Name)); err != nil { utils.Fatalf("Failed to start mining: %v", err) diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 051c51878..e20c67bf8 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -93,7 +93,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.WSListenAddrFlag, utils.WSPortFlag, utils.WSApiFlag, - utils.WSAllowedDomainsFlag, + utils.WSCORSDomainFlag, utils.IPCDisabledFlag, utils.IPCApiFlag, utils.IPCPathFlag, diff --git a/cmd/gethrpctest/main.go b/cmd/gethrpctest/main.go index b0907f8c5..8b54fa2c1 100644 --- a/cmd/gethrpctest/main.go +++ b/cmd/gethrpctest/main.go @@ -97,6 +97,9 @@ func MakeSystemNode(keydir string, privkey string, test *tests.BlockTest) (*node HttpHost: common.DefaultHttpHost, HttpPort: common.DefaultHttpPort, HttpModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, + WsHost: common.DefaultWsHost, + WsPort: common.DefaultWsPort, + WsModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, NoDiscovery: true, }) if err != nil { diff --git a/cmd/utils/client.go b/cmd/utils/client.go index 40ebcd729..8595cd90b 100644 --- a/cmd/utils/client.go +++ b/cmd/utils/client.go @@ -17,132 +17,14 @@ package utils import ( - "encoding/json" "fmt" - "strings" "github.com/codegangsta/cli" - "github.com/ethereum/go-ethereum/eth" - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/rpc" ) -// NewInProcRPCClient will start a new RPC server for the given node and returns a client to interact with it. -func NewInProcRPCClient(stack *node.Node) *inProcClient { - server := rpc.NewServer() - - offered := stack.APIs() - for _, api := range offered { - server.RegisterName(api.Namespace, api.Service) - } - - web3 := node.NewPublicWeb3API(stack) - server.RegisterName("web3", web3) - - var ethereum *eth.Ethereum - if err := stack.Service(ðereum); err == nil { - net := eth.NewPublicNetAPI(stack.Server(), ethereum.NetVersion()) - server.RegisterName("net", net) - } else { - glog.V(logger.Warn).Infof("%v\n", err) - } - - buf := &buf{ - requests: make(chan []byte), - responses: make(chan []byte), - } - client := &inProcClient{ - server: server, - buf: buf, - } - - go func() { - server.ServeCodec(rpc.NewJSONCodec(client.buf)) - }() - - return client -} - -// buf represents the connection between the RPC server and console -type buf struct { - readBuf []byte // store remaining request bytes after a partial read - requests chan []byte // list with raw serialized requests - responses chan []byte // list with raw serialized responses -} - -// will read the next request in json format -func (b *buf) Read(p []byte) (int, error) { - // last read didn't read entire request, return remaining bytes - if len(b.readBuf) > 0 { - n := copy(p, b.readBuf) - if n < len(b.readBuf) { - b.readBuf = b.readBuf[:n] - } else { - b.readBuf = b.readBuf[:0] - } - return n, nil - } - - // read next request - req := <-b.requests - n := copy(p, req) - if n < len(req) { - // buf too small, store remaining chunk for next read - b.readBuf = req[n:] - } - - return n, nil -} - -// Write send the given buffer to the backend -func (b *buf) Write(p []byte) (n int, err error) { - b.responses <- p - return len(p), nil -} - -// Close cleans up obtained resources. -func (b *buf) Close() error { - close(b.requests) - close(b.responses) - - return nil -} - -// inProcClient starts a RPC server and uses buf to communicate with it. -type inProcClient struct { - server *rpc.Server - buf *buf -} - -// Close will stop the RPC server -func (c *inProcClient) Close() { - c.server.Stop() -} - -// Send a msg to the endpoint -func (c *inProcClient) Send(msg interface{}) error { - d, err := json.Marshal(msg) - if err != nil { - return err - } - c.buf.requests <- d - return nil -} - -// Recv reads a message and tries to parse it into the given msg -func (c *inProcClient) Recv(msg interface{}) error { - data := <-c.buf.responses - return json.Unmarshal(data, &msg) -} - -// Returns the collection of modules the RPC server offers. -func (c *inProcClient) SupportedModules() (map[string]string, error) { - return rpc.SupportedModules(c) -} - // NewRemoteRPCClient returns a RPC client which connects to a running geth instance. // Depending on the given context this can either be a IPC or a HTTP client. func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 40ea29d78..28c692689 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -18,7 +18,6 @@ package utils import ( "crypto/ecdsa" - "errors" "fmt" "io/ioutil" "math" @@ -283,8 +282,8 @@ var ( Usage: "API's offered over the WS-RPC interface", Value: rpc.DefaultHttpRpcApis, } - WSAllowedDomainsFlag = cli.StringFlag{ - Name: "wscors", + WSCORSDomainFlag = cli.StringFlag{ + Name: "wscorsdomain", Usage: "Domains from which to accept websockets requests", Value: "", } @@ -491,6 +490,15 @@ func MakeHttpRpcHost(ctx *cli.Context) string { return ctx.GlobalString(RPCListenAddrFlag.Name) } +// MakeWsRpcHost creates the WebSocket RPC listener interface string from the set +// command line flags, returning empty if the HTTP endpoint is disabled. +func MakeWsRpcHost(ctx *cli.Context) string { + if !ctx.GlobalBool(WSEnabledFlag.Name) { + return "" + } + return ctx.GlobalString(WSListenAddrFlag.Name) +} + // MakeGenesisBlock loads up a genesis block from an input file specified in the // command line, or returns the empty string if none set. func MakeGenesisBlock(ctx *cli.Context) string { @@ -613,6 +621,10 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node. HttpPort: ctx.GlobalInt(RPCPortFlag.Name), HttpCors: ctx.GlobalString(RPCCORSDomainFlag.Name), HttpModules: strings.Split(ctx.GlobalString(RPCApiFlag.Name), ","), + WsHost: MakeWsRpcHost(ctx), + WsPort: ctx.GlobalInt(WSPortFlag.Name), + WsCors: ctx.GlobalString(WSCORSDomainFlag.Name), + WsModules: strings.Split(ctx.GlobalString(WSApiFlag.Name), ","), } // Configure the Ethereum service accman := MakeAccountManager(ctx) @@ -753,27 +765,5 @@ func MakeChain(ctx *cli.Context) (chain *core.BlockChain, chainDb ethdb.Database if err != nil { Fatalf("Could not start chainmanager: %v", err) } - return chain, chainDb } - -// StartWS starts a websocket JSON-RPC API server. -func StartWS(stack *node.Node, ctx *cli.Context) error { - for _, api := range stack.APIs() { - if adminApi, ok := api.Service.(*node.PrivateAdminAPI); ok { - address := ctx.GlobalString(WSListenAddrFlag.Name) - port := ctx.GlobalInt(WSAllowedDomainsFlag.Name) - allowedDomains := ctx.GlobalString(WSAllowedDomainsFlag.Name) - apiStr := "" - if ctx.GlobalIsSet(WSApiFlag.Name) { - apiStr = ctx.GlobalString(WSApiFlag.Name) - } - - _, err := adminApi.StartWS(address, port, allowedDomains, apiStr) - return err - } - } - - glog.V(logger.Error).Infof("Unable to start RPC-WS interface, could not find admin API") - return errors.New("Unable to start RPC-WS interface") -} diff --git a/node/api.go b/node/api.go index 1b185c6f1..879b33816 100644 --- a/node/api.go +++ b/node/api.go @@ -25,9 +25,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" - "github.com/ethereum/go-ethereum/rpc" "github.com/rcrowley/go-metrics" - "gopkg.in/fatih/set.v0" ) // PrivateAdminAPI is the collection of administrative API methods exposed only @@ -86,44 +84,29 @@ func (api *PrivateAdminAPI) StopRPC() (bool, error) { } // StartWS starts the websocket RPC API server. -func (api *PrivateAdminAPI) StartWS(address string, port int, cors string, apis string) (bool, error) { - var offeredAPIs []rpc.API - if len(apis) > 0 { - namespaces := set.New() - for _, a := range strings.Split(apis, ",") { - namespaces.Add(strings.TrimSpace(a)) - } - for _, api := range api.node.APIs() { - if namespaces.Has(api.Namespace) { - offeredAPIs = append(offeredAPIs, api) - } - } - } else { - // use by default all public API's - for _, api := range api.node.APIs() { - if api.Public { - offeredAPIs = append(offeredAPIs, api) - } - } - } +func (api *PrivateAdminAPI) StartWS(host string, port int, cors string, apis string) (bool, error) { + api.node.lock.Lock() + defer api.node.lock.Unlock() - if address == "" { - address = "127.0.0.1" + if api.node.wsHandler != nil { + return false, fmt.Errorf("WebSocker RPC already running on %s", api.node.wsEndpoint) } - if port == 0 { - port = 8546 + if err := api.node.startWS(fmt.Sprintf("%s:%d", host, port), api.node.rpcAPIs, strings.Split(apis, ","), cors); err != nil { + return false, err } - - corsDomains := strings.Split(cors, " ") - - err := rpc.StartWS(address, port, corsDomains, offeredAPIs) - return err == nil, err + return true, nil } // StopRPC terminates an already running websocket RPC API endpoint. func (api *PrivateAdminAPI) StopWS() (bool, error) { - err := rpc.StopWS() - return err == nil, err + api.node.lock.Lock() + defer api.node.lock.Unlock() + + if api.node.wsHandler == nil { + return false, fmt.Errorf("WebSocket RPC not running") + } + api.node.stopWS() + return true, nil } // PublicAdminAPI is the collection of administrative API methods exposed over diff --git a/node/config.go b/node/config.go index 94c6e2e56..f8252b63a 100644 --- a/node/config.go +++ b/node/config.go @@ -117,6 +117,25 @@ type Config struct { // If the module list is empty, all RPC API endpoints designated public will be // exposed. HttpModules []string + + // WsHost is the host interface on which to start the websocket RPC server. If + // this field is empty, no websocket API endpoint will be started. + WsHost string + + // WsPort is the TCP port number on which to start the websocket RPC server. The + // default zero value is/ valid and will pick a port number randomly (useful for + // ephemeral nodes). + WsPort int + + // WsCors is the Cross-Origin Resource Sharing header to send to requesting clients. + // Please be aware that CORS is a browser enforced security, it's fully useless + // for custom websocket clients. + WsCors string + + // WsModules is a list of API modules to expose via the websocket RPC interface. + // If the module list is empty, all RPC API endpoints designated public will be + // exposed. + WsModules []string } // IpcEndpoint resolves an IPC endpoint based on a configured value, taking into @@ -165,6 +184,21 @@ func DefaultHttpEndpoint() string { return config.HttpEndpoint() } +// WsEndpoint resolves an websocket endpoint based on the configured host interface +// and port parameters. +func (c *Config) WsEndpoint() string { + if c.WsHost == "" { + return "" + } + return fmt.Sprintf("%s:%d", c.WsHost, c.WsPort) +} + +// DefaultWsEndpoint returns the websocket endpoint used by default. +func DefaultWsEndpoint() string { + config := &Config{WsHost: common.DefaultWsHost, WsPort: common.DefaultWsPort} + return config.WsEndpoint() +} + // NodeKey retrieves the currently configured private key of the node, checking // first any manually set key, falling back to the one found in the configured // data folder. If no key can be found, a new one is generated. diff --git a/node/node.go b/node/node.go index 44c88d378..804748b6b 100644 --- a/node/node.go +++ b/node/node.go @@ -66,6 +66,12 @@ type Node struct { httpListener net.Listener // HTTP RPC listener socket to server API requests httpHandler *rpc.Server // HTTP RPC request handler to process the API requests + wsEndpoint string // Websocket endpoint (interface + port) to listen at (empty = websocket disabled) + wsWhitelist []string // Websocket RPC modules to allow through this endpoint + wsCors string // Websocket RPC Cross-Origin Resource Sharing header + wsListener net.Listener // Websocket RPC listener socket to server API requests + wsHandler *rpc.Server // Websocket RPC request handler to process the API requests + stop chan struct{} // Channel to wait for termination notifications lock sync.RWMutex } @@ -105,6 +111,9 @@ func New(conf *Config) (*Node, error) { httpEndpoint: conf.HttpEndpoint(), httpWhitelist: conf.HttpModules, httpCors: conf.HttpCors, + wsEndpoint: conf.WsEndpoint(), + wsWhitelist: conf.WsModules, + wsCors: conf.WsCors, eventmux: new(event.TypeMux), }, nil } @@ -215,6 +224,11 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { n.stopIPC() return err } + if err := n.startWS(n.wsEndpoint, apis, n.wsWhitelist, n.wsCors); err != nil { + n.stopHTTP() + n.stopIPC() + return err + } // All API endpoints started successfully n.rpcAPIs = apis return nil @@ -285,7 +299,7 @@ func (n *Node) stopIPC() { // startHTTP initializes and starts the HTTP RPC endpoint. func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors string) error { - // Short circuit if the IPC endpoint isn't being exposed + // Short circuit if the HTTP endpoint isn't being exposed if endpoint == "" { return nil } @@ -338,6 +352,61 @@ func (n *Node) stopHTTP() { } } +// startWS initializes and starts the websocket RPC endpoint. +func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, cors string) error { + // Short circuit if the WS endpoint isn't being exposed + if endpoint == "" { + return nil + } + // Generate the whitelist based on the allowed modules + whitelist := make(map[string]bool) + for _, module := range modules { + whitelist[module] = true + } + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + glog.V(logger.Debug).Infof("WebSocket registered %T under '%s'", api.Service, api.Namespace) + } + } + // All APIs registered, start the HTTP listener + var ( + listener net.Listener + err error + ) + if listener, err = net.Listen("tcp", endpoint); err != nil { + return err + } + go rpc.NewWSServer(cors, handler).Serve(listener) + glog.V(logger.Info).Infof("WebSocket endpoint opened: ws://%s", endpoint) + + // All listeners booted successfully + n.wsEndpoint = endpoint + n.wsListener = listener + n.wsHandler = handler + n.wsCors = cors + + return nil +} + +// stopWS terminates the websocket RPC endpoint. +func (n *Node) stopWS() { + if n.wsListener != nil { + n.wsListener.Close() + n.wsListener = nil + + glog.V(logger.Info).Infof("WebSocket endpoint closed: ws://%s", n.wsEndpoint) + } + if n.wsHandler != nil { + n.wsHandler.Stop() + n.wsHandler = nil + } +} + // Stop terminates a running node along with all it's services. In the node was // not started, an error is returned. func (n *Node) Stop() error { @@ -349,8 +418,9 @@ func (n *Node) Stop() error { return ErrNodeStopped } // Otherwise terminate the API, all services and the P2P server too - n.stopIPC() + n.stopWS() n.stopHTTP() + n.stopIPC() n.rpcAPIs = nil failure := &StopError{ @@ -471,14 +541,3 @@ func (n *Node) apis() []rpc.API { }, } } - -// APIs returns the collection of RPC descriptor this node offers. This method -// is just a quick placeholder passthrough for the RPC update, which in the next -// step will be fully integrated into the node itself. -func (n *Node) APIs() []rpc.API { - apis := n.apis() - for _, api := range n.services { - apis = append(apis, api.APIs()...) - } - return apis -} diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go index 1d4672ad2..09b01974e 100644 --- a/rpc/ipc_windows.go +++ b/rpc/ipc_windows.go @@ -239,6 +239,9 @@ func Dial(address string) (*PipeConn, error) { for { conn, err := dial(address, nmpwait_wait_forever) if err == nil { + // Ugly hack working around some async connectivity issues + time.Sleep(100 * time.Millisecond) + return conn, nil } if isPipeNotReady(err) { @@ -360,6 +363,9 @@ func Listen(address string) (*PipeListener, error) { if err != nil { return nil, err } + // Ugly hack working around some async connectivity issues + time.Sleep(100 * time.Millisecond) + return &PipeListener{ addr: PipeAddr(address), handle: handle, diff --git a/rpc/websocket.go b/rpc/websocket.go index b5bcbf4f6..548847602 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -17,13 +17,11 @@ package rpc import ( - "errors" "fmt" - "net" "net/http" - "sync" - "os" + "strings" + "sync" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -31,12 +29,6 @@ import ( "gopkg.in/fatih/set.v0" ) -var ( - wsServerMu sync.Mutex - wsRPCServer *Server - wsListener net.Listener -) - // wsReaderWriterCloser reads and write payloads from and to a websocket connection. type wsReaderWriterCloser struct { c *websocket.Conn @@ -57,14 +49,6 @@ func (rw *wsReaderWriterCloser) Close() error { return rw.c.Close() } -// wsHandler accepts a websocket connection and handles incoming RPC requests. -// Will return when the websocket connection is closed, either by the client or -// server. -func wsHandler(conn *websocket.Conn) { - rwc := &wsReaderWriterCloser{conn} - wsRPCServer.ServeCodec(NewJSONCodec(rwc)) -} - // wsHandshakeValidator returns a handler that verifies the origin during the // websocket upgrade process. When a '*' is specified as an allowed origins all // connections are accepted. @@ -103,54 +87,16 @@ func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http return f } -// StartWS will start a websocket RPC server on the given address and port. -func StartWS(address string, port int, corsdomains []string, apis []API) error { - wsServerMu.Lock() - defer wsServerMu.Unlock() - - if wsRPCServer != nil { - return fmt.Errorf("WS RPC interface already started on %s", wsListener.Addr()) +// NewWSServer creates a new websocket RPC server around an API provider. +func NewWSServer(cors string, handler *Server) *http.Server { + return &http.Server{ + Handler: websocket.Server{ + Handshake: wsHandshakeValidator(strings.Split(cors, ",")), + Handler: func(conn *websocket.Conn) { + handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn})) + }, + }, } - - rpcServer := NewServer() - for _, api := range apis { - if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil { - return err - } - } - - listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) - if err != nil { - return err - } - - wsServer := websocket.Server{Handshake: wsHandshakeValidator(corsdomains), Handler: wsHandler} - wsHTTPServer := http.Server{Handler: wsServer} - - go wsHTTPServer.Serve(listener) - - wsListener = listener - wsRPCServer = rpcServer - - return nil -} - -// StopWS stops the running websocket RPC server. -func StopWS() error { - wsServerMu.Lock() - defer wsServerMu.Unlock() - - if wsRPCServer == nil { - return errors.New("HTTP RPC interface not started") - } - - wsListener.Close() - wsRPCServer.Stop() - - wsRPCServer = nil - wsListener = nil - - return nil } // wsClient represents a RPC client that communicates over websockets with a From 900e124beea551ded290f61e7bf85ff6b2e4a29e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Feb 2016 13:24:42 +0200 Subject: [PATCH 3/4] cmd, common, node, rpc: rework naming convention to canonical one --- cmd/geth/js_test.go | 4 +- cmd/geth/main.go | 6 +-- cmd/geth/monitorcmd.go | 2 +- cmd/geth/usage.go | 2 +- cmd/gethrpctest/main.go | 14 +++--- cmd/utils/client.go | 2 +- cmd/utils/flags.go | 52 ++++++++++----------- common/defaults.go | 10 ++-- node/config.go | 100 ++++++++++++++++++++-------------------- node/config_test.go | 6 +-- node/node.go | 24 +++++----- node/node_test.go | 4 +- rpc/server.go | 4 +- 13 files changed, 115 insertions(+), 115 deletions(-) diff --git a/cmd/geth/js_test.go b/cmd/geth/js_test.go index 58800ed44..ed6e5b319 100644 --- a/cmd/geth/js_test.go +++ b/cmd/geth/js_test.go @@ -96,7 +96,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod t.Fatal(err) } // Create a networkless protocol stack - stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true, IpcPath: fmt.Sprintf("geth-test-%d.ipc", rand.Int63())}) + stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true, IPCPath: fmt.Sprintf("geth-test-%d.ipc", rand.Int63())}) if err != nil { t.Fatalf("failed to create node: %v", err) } @@ -142,7 +142,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod stack.Service(ðereum) assetPath := filepath.Join(os.Getenv("GOPATH"), "src", "github.com", "ethereum", "go-ethereum", "cmd", "mist", "assets", "ext") - client, err := utils.NewRemoteRPCClientFromString("ipc:" + stack.IpcEndpoint()) + client, err := utils.NewRemoteRPCClientFromString("ipc:" + stack.IPCEndpoint()) if err != nil { t.Fatalf("failed to attach to node: %v", err) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 61f0632db..5c07be3f6 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -312,7 +312,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso utils.WSListenAddrFlag, utils.WSPortFlag, utils.WSApiFlag, - utils.WSCORSDomainFlag, + utils.WSAllowedDomainsFlag, utils.IPCDisabledFlag, utils.IPCApiFlag, utils.IPCPathFlag, @@ -425,7 +425,7 @@ func console(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node, and either execute script or become interactive - client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IpcEndpoint()) + client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IPCEndpoint()) if err != nil { utils.Fatalf("Failed to attach to the inproc geth: %v", err) } @@ -451,7 +451,7 @@ func execScripts(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node and execute the given scripts - client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IpcEndpoint()) + client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IPCEndpoint()) if err != nil { utils.Fatalf("Failed to attach to the inproc geth: %v", err) } diff --git a/cmd/geth/monitorcmd.go b/cmd/geth/monitorcmd.go index 4d56f2289..ce3a20e8a 100644 --- a/cmd/geth/monitorcmd.go +++ b/cmd/geth/monitorcmd.go @@ -36,7 +36,7 @@ import ( var ( monitorCommandAttachFlag = cli.StringFlag{ Name: "attach", - Value: "ipc:" + node.DefaultIpcEndpoint(), + Value: "ipc:" + node.DefaultIPCEndpoint(), Usage: "API endpoint to attach to", } monitorCommandRowsFlag = cli.IntFlag{ diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index e20c67bf8..051c51878 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -93,7 +93,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.WSListenAddrFlag, utils.WSPortFlag, utils.WSApiFlag, - utils.WSCORSDomainFlag, + utils.WSAllowedDomainsFlag, utils.IPCDisabledFlag, utils.IPCApiFlag, utils.IPCPathFlag, diff --git a/cmd/gethrpctest/main.go b/cmd/gethrpctest/main.go index 8b54fa2c1..38016fb35 100644 --- a/cmd/gethrpctest/main.go +++ b/cmd/gethrpctest/main.go @@ -93,13 +93,13 @@ func main() { func MakeSystemNode(keydir string, privkey string, test *tests.BlockTest) (*node.Node, error) { // Create a networkless protocol stack stack, err := node.New(&node.Config{ - IpcPath: node.DefaultIpcEndpoint(), - HttpHost: common.DefaultHttpHost, - HttpPort: common.DefaultHttpPort, - HttpModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, - WsHost: common.DefaultWsHost, - WsPort: common.DefaultWsPort, - WsModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, + IPCPath: node.DefaultIPCEndpoint(), + HTTPHost: common.DefaultHTTPHost, + HTTPPort: common.DefaultHTTPPort, + HTTPModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, + WSHost: common.DefaultWSHost, + WSPort: common.DefaultWSPort, + WSModules: []string{"admin", "db", "eth", "debug", "miner", "net", "shh", "txpool", "personal", "web3"}, NoDiscovery: true, }) if err != nil { diff --git a/cmd/utils/client.go b/cmd/utils/client.go index 8595cd90b..1144af6f6 100644 --- a/cmd/utils/client.go +++ b/cmd/utils/client.go @@ -33,7 +33,7 @@ func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) { return NewRemoteRPCClientFromString(endpoint) } // use IPC by default - return rpc.NewIPCClient(node.DefaultIpcEndpoint()) + return rpc.NewIPCClient(node.DefaultIPCEndpoint()) } // NewRemoteRPCClientFromString returns a RPC client which connects to the given diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 28c692689..2fc1816af 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -232,12 +232,12 @@ var ( RPCListenAddrFlag = cli.StringFlag{ Name: "rpcaddr", Usage: "HTTP-RPC server listening interface", - Value: common.DefaultHttpHost, + Value: common.DefaultHTTPHost, } RPCPortFlag = cli.IntFlag{ Name: "rpcport", Usage: "HTTP-RPC server listening port", - Value: common.DefaultHttpPort, + Value: common.DefaultHTTPPort, } RPCCORSDomainFlag = cli.StringFlag{ Name: "rpccorsdomain", @@ -247,7 +247,7 @@ var ( RPCApiFlag = cli.StringFlag{ Name: "rpcapi", Usage: "API's offered over the HTTP-RPC interface", - Value: rpc.DefaultHttpRpcApis, + Value: rpc.DefaultHTTPApis, } IPCDisabledFlag = cli.BoolFlag{ Name: "ipcdisable", @@ -256,12 +256,12 @@ var ( IPCApiFlag = cli.StringFlag{ Name: "ipcapi", Usage: "API's offered over the IPC-RPC interface", - Value: rpc.DefaultIpcApis, + Value: rpc.DefaultIPCApis, } IPCPathFlag = DirectoryFlag{ Name: "ipcpath", Usage: "Filename for IPC socket/pipe within the datadir (explicit paths escape it)", - Value: DirectoryString{common.DefaultIpcSocket}, + Value: DirectoryString{common.DefaultIPCSocket}, } WSEnabledFlag = cli.BoolFlag{ Name: "ws", @@ -270,21 +270,21 @@ var ( WSListenAddrFlag = cli.StringFlag{ Name: "wsaddr", Usage: "WS-RPC server listening interface", - Value: common.DefaultWsHost, + Value: common.DefaultWSHost, } WSPortFlag = cli.IntFlag{ Name: "wsport", Usage: "WS-RPC server listening port", - Value: common.DefaultWsPort, + Value: common.DefaultWSPort, } WSApiFlag = cli.StringFlag{ Name: "wsapi", Usage: "API's offered over the WS-RPC interface", - Value: rpc.DefaultHttpRpcApis, + Value: rpc.DefaultHTTPApis, } - WSCORSDomainFlag = cli.StringFlag{ - Name: "wscorsdomain", - Usage: "Domains from which to accept websockets requests", + WSAllowedDomainsFlag = cli.StringFlag{ + Name: "wsdomains", + Usage: "Domains from which to accept websockets requests (can be spoofed)", Value: "", } ExecFlag = cli.StringFlag{ @@ -393,9 +393,9 @@ func MustMakeDataDir(ctx *cli.Context) string { return "" } -// MakeIpcPath creates an IPC path configuration from the set command line flags, +// MakeIPCPath creates an IPC path configuration from the set command line flags, // returning an empty string if IPC was explicitly disabled, or the set path. -func MakeIpcPath(ctx *cli.Context) string { +func MakeIPCPath(ctx *cli.Context) string { if ctx.GlobalBool(IPCDisabledFlag.Name) { return "" } @@ -481,18 +481,18 @@ func MakeNAT(ctx *cli.Context) nat.Interface { return natif } -// MakeHttpRpcHost creates the HTTP RPC listener interface string from the set +// MakeHTTPRpcHost creates the HTTP RPC listener interface string from the set // command line flags, returning empty if the HTTP endpoint is disabled. -func MakeHttpRpcHost(ctx *cli.Context) string { +func MakeHTTPRpcHost(ctx *cli.Context) string { if !ctx.GlobalBool(RPCEnabledFlag.Name) { return "" } return ctx.GlobalString(RPCListenAddrFlag.Name) } -// MakeWsRpcHost creates the WebSocket RPC listener interface string from the set +// MakeWSRpcHost creates the WebSocket RPC listener interface string from the set // command line flags, returning empty if the HTTP endpoint is disabled. -func MakeWsRpcHost(ctx *cli.Context) string { +func MakeWSRpcHost(ctx *cli.Context) string { if !ctx.GlobalBool(WSEnabledFlag.Name) { return "" } @@ -616,15 +616,15 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node. NAT: MakeNAT(ctx), MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name), MaxPendingPeers: ctx.GlobalInt(MaxPendingPeersFlag.Name), - IpcPath: MakeIpcPath(ctx), - HttpHost: MakeHttpRpcHost(ctx), - HttpPort: ctx.GlobalInt(RPCPortFlag.Name), - HttpCors: ctx.GlobalString(RPCCORSDomainFlag.Name), - HttpModules: strings.Split(ctx.GlobalString(RPCApiFlag.Name), ","), - WsHost: MakeWsRpcHost(ctx), - WsPort: ctx.GlobalInt(WSPortFlag.Name), - WsCors: ctx.GlobalString(WSCORSDomainFlag.Name), - WsModules: strings.Split(ctx.GlobalString(WSApiFlag.Name), ","), + IPCPath: MakeIPCPath(ctx), + HTTPHost: MakeHTTPRpcHost(ctx), + HTTPPort: ctx.GlobalInt(RPCPortFlag.Name), + HTTPCors: ctx.GlobalString(RPCCORSDomainFlag.Name), + HTTPModules: strings.Split(ctx.GlobalString(RPCApiFlag.Name), ","), + WSHost: MakeWSRpcHost(ctx), + WSPort: ctx.GlobalInt(WSPortFlag.Name), + WSDomains: ctx.GlobalString(WSAllowedDomainsFlag.Name), + WSModules: strings.Split(ctx.GlobalString(WSApiFlag.Name), ","), } // Configure the Ethereum service accman := MakeAccountManager(ctx) diff --git a/common/defaults.go b/common/defaults.go index c5a88d7a3..8a136fa80 100644 --- a/common/defaults.go +++ b/common/defaults.go @@ -22,11 +22,11 @@ import ( ) const ( - DefaultIpcSocket = "geth.ipc" // Default (relative) name of the IPC RPC socket - DefaultHttpHost = "localhost" // Default host interface for the HTTP RPC server - DefaultHttpPort = 8545 // Default TCP port for the HTTP RPC server - DefaultWsHost = "localhost" // Default host interface for the websocket RPC server - DefaultWsPort = 8546 // Default TCP port for the websocket RPC server + DefaultIPCSocket = "geth.ipc" // Default (relative) name of the IPC RPC socket + DefaultHTTPHost = "localhost" // Default host interface for the HTTP RPC server + DefaultHTTPPort = 8545 // Default TCP port for the HTTP RPC server + DefaultWSHost = "localhost" // Default host interface for the websocket RPC server + DefaultWSPort = 8546 // Default TCP port for the websocket RPC server ) // DefaultDataDir is the default data directory to use for the databases and other diff --git a/node/config.go b/node/config.go index f8252b63a..301ec636e 100644 --- a/node/config.go +++ b/node/config.go @@ -53,11 +53,11 @@ type Config struct { // in memory. DataDir string - // IpcPath is the requested location to place the IPC endpoint. If the path is + // IPCPath is the requested location to place the IPC endpoint. If the path is // a simple file name, it is placed inside the data directory (or on the root // pipe path on Windows), whereas if it's a resolvable path name (absolute or // relative), then that specific path is enforced. An empty path disables IPC. - IpcPath string + IPCPath string // This field should be a valid secp256k1 private key that will be used for both // remote peer identification as well as network traffic encryption. If no key @@ -99,104 +99,104 @@ type Config struct { // Zero defaults to preset values. MaxPendingPeers int - // HttpHost is the host interface on which to start the HTTP RPC server. If this + // HTTPHost is the host interface on which to start the HTTP RPC server. If this // field is empty, no HTTP API endpoint will be started. - HttpHost string + HTTPHost string - // HttpPort is the TCP port number on which to start the HTTP RPC server. The + // HTTPPort is the TCP port number on which to start the HTTP RPC server. The // default zero value is/ valid and will pick a port number randomly (useful // for ephemeral nodes). - HttpPort int + HTTPPort int - // HttpCors is the Cross-Origin Resource Sharing header to send to requesting + // HTTPCors is the Cross-Origin Resource Sharing header to send to requesting // clients. Please be aware that CORS is a browser enforced security, it's fully // useless for custom HTTP clients. - HttpCors string + HTTPCors string - // HttpModules is a list of API modules to expose via the HTTP RPC interface. + // HTTPModules is a list of API modules to expose via the HTTP RPC interface. // If the module list is empty, all RPC API endpoints designated public will be // exposed. - HttpModules []string + HTTPModules []string - // WsHost is the host interface on which to start the websocket RPC server. If + // WSHost is the host interface on which to start the websocket RPC server. If // this field is empty, no websocket API endpoint will be started. - WsHost string + WSHost string - // WsPort is the TCP port number on which to start the websocket RPC server. The + // WSPort is the TCP port number on which to start the websocket RPC server. The // default zero value is/ valid and will pick a port number randomly (useful for // ephemeral nodes). - WsPort int + WSPort int - // WsCors is the Cross-Origin Resource Sharing header to send to requesting clients. - // Please be aware that CORS is a browser enforced security, it's fully useless - // for custom websocket clients. - WsCors string + // WSDomains is the list of domain to accept websocket requests from. Please be + // aware that the server can only act upon the HTTP request the client sends and + // cannot verify the validity of the request header. + WSDomains string - // WsModules is a list of API modules to expose via the websocket RPC interface. + // WSModules is a list of API modules to expose via the websocket RPC interface. // If the module list is empty, all RPC API endpoints designated public will be // exposed. - WsModules []string + WSModules []string } -// IpcEndpoint resolves an IPC endpoint based on a configured value, taking into +// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into // account the set data folders as well as the designated platform we're currently // running on. -func (c *Config) IpcEndpoint() string { +func (c *Config) IPCEndpoint() string { // Short circuit if IPC has not been enabled - if c.IpcPath == "" { + if c.IPCPath == "" { return "" } // On windows we can only use plain top-level pipes if runtime.GOOS == "windows" { - if strings.HasPrefix(c.IpcPath, `\\.\pipe\`) { - return c.IpcPath + if strings.HasPrefix(c.IPCPath, `\\.\pipe\`) { + return c.IPCPath } - return `\\.\pipe\` + c.IpcPath + return `\\.\pipe\` + c.IPCPath } // Resolve names into the data directory full paths otherwise - if filepath.Base(c.IpcPath) == c.IpcPath { + if filepath.Base(c.IPCPath) == c.IPCPath { if c.DataDir == "" { - return filepath.Join(os.TempDir(), c.IpcPath) + return filepath.Join(os.TempDir(), c.IPCPath) } - return filepath.Join(c.DataDir, c.IpcPath) + return filepath.Join(c.DataDir, c.IPCPath) } - return c.IpcPath + return c.IPCPath } -// DefaultIpcEndpoint returns the IPC path used by default. -func DefaultIpcEndpoint() string { - config := &Config{DataDir: common.DefaultDataDir(), IpcPath: common.DefaultIpcSocket} - return config.IpcEndpoint() +// DefaultIPCEndpoint returns the IPC path used by default. +func DefaultIPCEndpoint() string { + config := &Config{DataDir: common.DefaultDataDir(), IPCPath: common.DefaultIPCSocket} + return config.IPCEndpoint() } -// HttpEndpoint resolves an HTTP endpoint based on the configured host interface +// HTTPEndpoint resolves an HTTP endpoint based on the configured host interface // and port parameters. -func (c *Config) HttpEndpoint() string { - if c.HttpHost == "" { +func (c *Config) HTTPEndpoint() string { + if c.HTTPHost == "" { return "" } - return fmt.Sprintf("%s:%d", c.HttpHost, c.HttpPort) + return fmt.Sprintf("%s:%d", c.HTTPHost, c.HTTPPort) } -// DefaultHttpEndpoint returns the HTTP endpoint used by default. -func DefaultHttpEndpoint() string { - config := &Config{HttpHost: common.DefaultHttpHost, HttpPort: common.DefaultHttpPort} - return config.HttpEndpoint() +// DefaultHTTPEndpoint returns the HTTP endpoint used by default. +func DefaultHTTPEndpoint() string { + config := &Config{HTTPHost: common.DefaultHTTPHost, HTTPPort: common.DefaultHTTPPort} + return config.HTTPEndpoint() } -// WsEndpoint resolves an websocket endpoint based on the configured host interface +// WSEndpoint resolves an websocket endpoint based on the configured host interface // and port parameters. -func (c *Config) WsEndpoint() string { - if c.WsHost == "" { +func (c *Config) WSEndpoint() string { + if c.WSHost == "" { return "" } - return fmt.Sprintf("%s:%d", c.WsHost, c.WsPort) + return fmt.Sprintf("%s:%d", c.WSHost, c.WSPort) } -// DefaultWsEndpoint returns the websocket endpoint used by default. -func DefaultWsEndpoint() string { - config := &Config{WsHost: common.DefaultWsHost, WsPort: common.DefaultWsPort} - return config.WsEndpoint() +// DefaultWSEndpoint returns the websocket endpoint used by default. +func DefaultWSEndpoint() string { + config := &Config{WSHost: common.DefaultWSHost, WSPort: common.DefaultWSPort} + return config.WSEndpoint() } // NodeKey retrieves the currently configured private key of the node, checking diff --git a/node/config_test.go b/node/config_test.go index efb864ce4..45a54d184 100644 --- a/node/config_test.go +++ b/node/config_test.go @@ -63,10 +63,10 @@ func TestDatadirCreation(t *testing.T) { // Tests that IPC paths are correctly resolved to valid endpoints of different // platforms. -func TestIpcPathResolution(t *testing.T) { +func TestIPCPathResolution(t *testing.T) { var tests = []struct { DataDir string - IpcPath string + IPCPath string Windows bool Endpoint string }{ @@ -85,7 +85,7 @@ func TestIpcPathResolution(t *testing.T) { for i, test := range tests { // Only run when platform/test match if (runtime.GOOS == "windows") == test.Windows { - if endpoint := (&Config{DataDir: test.DataDir, IpcPath: test.IpcPath}).IpcEndpoint(); endpoint != test.Endpoint { + if endpoint := (&Config{DataDir: test.DataDir, IPCPath: test.IPCPath}).IPCEndpoint(); endpoint != test.Endpoint { t.Errorf("test %d: IPC endpoint mismatch: have %s, want %s", i, endpoint, test.Endpoint) } } diff --git a/node/node.go b/node/node.go index 804748b6b..6d9290034 100644 --- a/node/node.go +++ b/node/node.go @@ -68,7 +68,7 @@ type Node struct { wsEndpoint string // Websocket endpoint (interface + port) to listen at (empty = websocket disabled) wsWhitelist []string // Websocket RPC modules to allow through this endpoint - wsCors string // Websocket RPC Cross-Origin Resource Sharing header + wsDomains string // Websocket RPC allowed origin domains wsListener net.Listener // Websocket RPC listener socket to server API requests wsHandler *rpc.Server // Websocket RPC request handler to process the API requests @@ -107,13 +107,13 @@ func New(conf *Config) (*Node, error) { MaxPendingPeers: conf.MaxPendingPeers, }, serviceFuncs: []ServiceConstructor{}, - ipcEndpoint: conf.IpcEndpoint(), - httpEndpoint: conf.HttpEndpoint(), - httpWhitelist: conf.HttpModules, - httpCors: conf.HttpCors, - wsEndpoint: conf.WsEndpoint(), - wsWhitelist: conf.WsModules, - wsCors: conf.WsCors, + ipcEndpoint: conf.IPCEndpoint(), + httpEndpoint: conf.HTTPEndpoint(), + httpWhitelist: conf.HTTPModules, + httpCors: conf.HTTPCors, + wsEndpoint: conf.WSEndpoint(), + wsWhitelist: conf.WSModules, + wsDomains: conf.WSDomains, eventmux: new(event.TypeMux), }, nil } @@ -224,7 +224,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { n.stopIPC() return err } - if err := n.startWS(n.wsEndpoint, apis, n.wsWhitelist, n.wsCors); err != nil { + if err := n.startWS(n.wsEndpoint, apis, n.wsWhitelist, n.wsDomains); err != nil { n.stopHTTP() n.stopIPC() return err @@ -388,7 +388,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, cors s n.wsEndpoint = endpoint n.wsListener = listener n.wsHandler = handler - n.wsCors = cors + n.wsDomains = cors return nil } @@ -501,8 +501,8 @@ func (n *Node) DataDir() string { return n.datadir } -// IpcEndpoint retrieves the current IPC endpoint used by the protocol stack. -func (n *Node) IpcEndpoint() string { +// IPCEndpoint retrieves the current IPC endpoint used by the protocol stack. +func (n *Node) IPCEndpoint() string { return n.ipcEndpoint } diff --git a/node/node_test.go b/node/node_test.go index 53dcbcf74..38bfe27e2 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -37,7 +37,7 @@ var ( func testNodeConfig() *Config { return &Config{ - IpcPath: fmt.Sprintf("test-%d.ipc", rand.Int63()), + IPCPath: fmt.Sprintf("test-%d.ipc", rand.Int63()), PrivateKey: testNodeKey, Name: "test node", } @@ -541,7 +541,7 @@ func TestAPIGather(t *testing.T) { defer stack.Stop() // Connect to the RPC server and verify the various registered endpoints - ipcClient, err := rpc.NewIPCClient(stack.IpcEndpoint()) + ipcClient, err := rpc.NewIPCClient(stack.IPCEndpoint()) if err != nil { t.Fatalf("failed to connect to the IPC API server: %v", err) } diff --git a/rpc/server.go b/rpc/server.go index 5b88d843a..f42ee2d37 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -33,8 +33,8 @@ import ( const ( stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped - DefaultIpcApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" - DefaultHttpRpcApis = "eth,net,web3" + DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" + DefaultHTTPApis = "eth,net,web3" ) // NewServer will create a new server instance with no registered handlers. From df75dbfd6804923b1c8a8388b67523072d59f155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 9 Feb 2016 14:10:40 +0200 Subject: [PATCH 4/4] cmd, node, rpc: readd inproc RPC client, expose via node --- cmd/geth/js_test.go | 6 +-- cmd/geth/main.go | 4 +- cmd/utils/client.go | 1 - node/api.go | 2 +- node/node.go | 55 +++++++++++++++++++++- node/node_test.go | 12 ++--- rpc/http.go | 2 +- rpc/inproc.go | 111 ++++++++++++++++++++++++++++++++++++++++++++ rpc/ipc.go | 2 +- rpc/ipc_windows.go | 6 --- rpc/websocket.go | 2 +- 11 files changed, 178 insertions(+), 25 deletions(-) create mode 100644 rpc/inproc.go diff --git a/cmd/geth/js_test.go b/cmd/geth/js_test.go index ed6e5b319..4330b484c 100644 --- a/cmd/geth/js_test.go +++ b/cmd/geth/js_test.go @@ -20,7 +20,6 @@ import ( "fmt" "io/ioutil" "math/big" - "math/rand" "os" "path/filepath" "regexp" @@ -30,7 +29,6 @@ import ( "time" "github.com/ethereum/go-ethereum/accounts" - "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/compiler" "github.com/ethereum/go-ethereum/common/httpclient" @@ -96,7 +94,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod t.Fatal(err) } // Create a networkless protocol stack - stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true, IPCPath: fmt.Sprintf("geth-test-%d.ipc", rand.Int63())}) + stack, err := node.New(&node.Config{PrivateKey: testNodeKey, Name: "test", NoDiscovery: true}) if err != nil { t.Fatalf("failed to create node: %v", err) } @@ -142,7 +140,7 @@ func testREPL(t *testing.T, config func(*eth.Config)) (string, *testjethre, *nod stack.Service(ðereum) assetPath := filepath.Join(os.Getenv("GOPATH"), "src", "github.com", "ethereum", "go-ethereum", "cmd", "mist", "assets", "ext") - client, err := utils.NewRemoteRPCClientFromString("ipc:" + stack.IPCEndpoint()) + client, err := stack.Attach() if err != nil { t.Fatalf("failed to attach to node: %v", err) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 5c07be3f6..8594d18c5 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -425,7 +425,7 @@ func console(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node, and either execute script or become interactive - client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IPCEndpoint()) + client, err := node.Attach() if err != nil { utils.Fatalf("Failed to attach to the inproc geth: %v", err) } @@ -451,7 +451,7 @@ func execScripts(ctx *cli.Context) { startNode(ctx, node) // Attach to the newly started node and execute the given scripts - client, err := utils.NewRemoteRPCClientFromString("ipc:" + node.IPCEndpoint()) + client, err := node.Attach() if err != nil { utils.Fatalf("Failed to attach to the inproc geth: %v", err) } diff --git a/cmd/utils/client.go b/cmd/utils/client.go index 1144af6f6..3913d007b 100644 --- a/cmd/utils/client.go +++ b/cmd/utils/client.go @@ -51,6 +51,5 @@ func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) { if strings.HasPrefix(endpoint, "ws:") { return rpc.NewWSClient(endpoint) } - return nil, fmt.Errorf("invalid endpoint") } diff --git a/node/api.go b/node/api.go index 879b33816..48cbd0150 100644 --- a/node/api.go +++ b/node/api.go @@ -89,7 +89,7 @@ func (api *PrivateAdminAPI) StartWS(host string, port int, cors string, apis str defer api.node.lock.Unlock() if api.node.wsHandler != nil { - return false, fmt.Errorf("WebSocker RPC already running on %s", api.node.wsEndpoint) + return false, fmt.Errorf("WebSocket RPC already running on %s", api.node.wsEndpoint) } if err := api.node.startWS(fmt.Sprintf("%s:%d", host, port), api.node.rpcAPIs, strings.Split(apis, ","), cors); err != nil { return false, err diff --git a/node/node.go b/node/node.go index 6d9290034..7d3a10874 100644 --- a/node/node.go +++ b/node/node.go @@ -55,7 +55,9 @@ type Node struct { serviceFuncs []ServiceConstructor // Service constructors (in dependency order) services map[reflect.Type]Service // Currently running services - rpcAPIs []rpc.API // List of APIs currently provided by the node + rpcAPIs []rpc.API // List of APIs currently provided by the node + inprocHandler *rpc.Server // In-process RPC request handler to process the API requests + ipcEndpoint string // IPC endpoint to listen at (empty = IPC disabled) ipcListener net.Listener // IPC RPC listener socket to serve API requests ipcHandler *rpc.Server // IPC RPC request handler to process the API requests @@ -217,16 +219,22 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { apis = append(apis, service.APIs()...) } // Start the various API endpoints, terminating all in case of errors + if err := n.startInProc(apis); err != nil { + return err + } if err := n.startIPC(apis); err != nil { + n.stopInProc() return err } if err := n.startHTTP(n.httpEndpoint, apis, n.httpWhitelist, n.httpCors); err != nil { n.stopIPC() + n.stopInProc() return err } if err := n.startWS(n.wsEndpoint, apis, n.wsWhitelist, n.wsDomains); err != nil { n.stopHTTP() n.stopIPC() + n.stopInProc() return err } // All API endpoints started successfully @@ -234,6 +242,28 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error { return nil } +// startInProc initializes an in-process RPC endpoint. +func (n *Node) startInProc(apis []rpc.API) error { + // Register all the APIs exposed by the services + handler := rpc.NewServer() + for _, api := range apis { + if err := handler.RegisterName(api.Namespace, api.Service); err != nil { + return err + } + glog.V(logger.Debug).Infof("InProc registered %T under '%s'", api.Service, api.Namespace) + } + n.inprocHandler = handler + return nil +} + +// stopInProc terminates the in-process RPC endpoint. +func (n *Node) stopInProc() { + if n.inprocHandler != nil { + n.inprocHandler.Stop() + n.inprocHandler = nil + } +} + // startIPC initializes and starts the IPC RPC endpoint. func (n *Node) startIPC(apis []rpc.API) error { // Short circuit if the IPC endpoint isn't being exposed @@ -468,6 +498,19 @@ func (n *Node) Restart() error { return nil } +// Attach creates an RPC client attached to an in-process API handler. +func (n *Node) Attach() (rpc.Client, error) { + n.lock.RLock() + defer n.lock.RUnlock() + + // Short circuit if the node's not running + if n.server == nil { + return nil, ErrNodeStopped + } + // Otherwise attach to the API and return + return rpc.NewInProcRPCClient(n.inprocHandler), nil +} + // Server retrieves the currently running P2P network layer. This method is meant // only to inspect fields of the currently running server, life cycle management // should be left to this Node entity. @@ -506,6 +549,16 @@ func (n *Node) IPCEndpoint() string { return n.ipcEndpoint } +// HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack. +func (n *Node) HTTPEndpoint() string { + return n.httpEndpoint +} + +// WSEndpoint retrieves the current WS endpoint used by the protocol stack. +func (n *Node) WSEndpoint() string { + return n.wsEndpoint +} + // EventMux retrieves the event multiplexer used by all the network services in // the current protocol stack. func (n *Node) EventMux() *event.TypeMux { diff --git a/node/node_test.go b/node/node_test.go index 38bfe27e2..532115d3c 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -18,9 +18,7 @@ package node import ( "errors" - "fmt" "io/ioutil" - "math/rand" "os" "reflect" "testing" @@ -37,7 +35,6 @@ var ( func testNodeConfig() *Config { return &Config{ - IPCPath: fmt.Sprintf("test-%d.ipc", rand.Int63()), PrivateKey: testNodeKey, Name: "test node", } @@ -541,10 +538,11 @@ func TestAPIGather(t *testing.T) { defer stack.Stop() // Connect to the RPC server and verify the various registered endpoints - ipcClient, err := rpc.NewIPCClient(stack.IPCEndpoint()) + client, err := stack.Attach() if err != nil { - t.Fatalf("failed to connect to the IPC API server: %v", err) + t.Fatalf("failed to connect to the inproc API server: %v", err) } + defer client.Close() tests := []struct { Method string @@ -556,11 +554,11 @@ func TestAPIGather(t *testing.T) { {"multi.v2.nested_theOneMethod", "multi.v2.nested"}, } for i, test := range tests { - if err := ipcClient.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil { + if err := client.Send(rpc.JSONRequest{Id: new(int64), Version: "2.0", Method: test.Method}); err != nil { t.Fatalf("test %d: failed to send API request: %v", i, err) } reply := new(rpc.JSONSuccessResponse) - if err := ipcClient.Recv(reply); err != nil { + if err := client.Recv(reply); err != nil { t.Fatalf("test %d: failed to read API reply: %v", i, err) } select { diff --git a/rpc/http.go b/rpc/http.go index e58a88c08..d9053b003 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -259,7 +259,7 @@ type httpClient struct { // NewHTTPClient create a new RPC clients that connection to a geth RPC server // over HTTP. -func NewHTTPClient(endpoint string) (*httpClient, error) { +func NewHTTPClient(endpoint string) (Client, error) { url, err := url.Parse(endpoint) if err != nil { return nil, err diff --git a/rpc/inproc.go b/rpc/inproc.go new file mode 100644 index 000000000..e138ba2c3 --- /dev/null +++ b/rpc/inproc.go @@ -0,0 +1,111 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import "encoding/json" + +// NewInProcRPCClient creates an in-process buffer stream attachment to a given +// RPC server. +func NewInProcRPCClient(handler *Server) Client { + buffer := &inprocBuffer{ + requests: make(chan []byte, 16), + responses: make(chan []byte, 16), + } + client := &inProcClient{ + server: handler, + buffer: buffer, + } + go handler.ServeCodec(NewJSONCodec(client.buffer)) + return client +} + +// inProcClient is an in-process buffer stream attached to an RPC server. +type inProcClient struct { + server *Server + buffer *inprocBuffer +} + +// Close tears down the request channel of the in-proc client. +func (c *inProcClient) Close() { + c.buffer.Close() +} + +// Send marshals a message into a json format and injects in into the client +// request channel. +func (c *inProcClient) Send(msg interface{}) error { + d, err := json.Marshal(msg) + if err != nil { + return err + } + c.buffer.requests <- d + return nil +} + +// Recv reads a message from the response channel and tries to parse it into the +// given msg interface. +func (c *inProcClient) Recv(msg interface{}) error { + data := <-c.buffer.responses + return json.Unmarshal(data, &msg) +} + +// Returns the collection of modules the RPC server offers. +func (c *inProcClient) SupportedModules() (map[string]string, error) { + return SupportedModules(c) +} + +// inprocBuffer represents the connection between the RPC server and console +type inprocBuffer struct { + readBuf []byte // store remaining request bytes after a partial read + requests chan []byte // list with raw serialized requests + responses chan []byte // list with raw serialized responses +} + +// Read will read the next request in json format. +func (b *inprocBuffer) Read(p []byte) (int, error) { + // last read didn't read entire request, return remaining bytes + if len(b.readBuf) > 0 { + n := copy(p, b.readBuf) + if n < len(b.readBuf) { + b.readBuf = b.readBuf[:n] + } else { + b.readBuf = b.readBuf[:0] + } + return n, nil + } + // read next request + req := <-b.requests + n := copy(p, req) + if n < len(req) { + // inprocBuffer too small, store remaining chunk for next read + b.readBuf = req[n:] + } + return n, nil +} + +// Write sends the given buffer to the backend. +func (b *inprocBuffer) Write(p []byte) (n int, err error) { + b.responses <- p + return len(p), nil +} + +// Close cleans up obtained resources. +func (b *inprocBuffer) Close() error { + close(b.requests) + close(b.responses) + + return nil +} diff --git a/rpc/ipc.go b/rpc/ipc.go index b87bfcbd7..05d8909ca 100644 --- a/rpc/ipc.go +++ b/rpc/ipc.go @@ -38,7 +38,7 @@ type ipcClient struct { // NewIPCClient create a new IPC client that will connect on the given endpoint. Messages are JSON encoded and encoded. // On Unix it assumes the endpoint is the full path to a unix socket, and Windows the endpoint is an identifier for a // named pipe. -func NewIPCClient(endpoint string) (*ipcClient, error) { +func NewIPCClient(endpoint string) (Client, error) { conn, err := newIPCConnection(endpoint) if err != nil { return nil, err diff --git a/rpc/ipc_windows.go b/rpc/ipc_windows.go index 09b01974e..1d4672ad2 100644 --- a/rpc/ipc_windows.go +++ b/rpc/ipc_windows.go @@ -239,9 +239,6 @@ func Dial(address string) (*PipeConn, error) { for { conn, err := dial(address, nmpwait_wait_forever) if err == nil { - // Ugly hack working around some async connectivity issues - time.Sleep(100 * time.Millisecond) - return conn, nil } if isPipeNotReady(err) { @@ -363,9 +360,6 @@ func Listen(address string) (*PipeListener, error) { if err != nil { return nil, err } - // Ugly hack working around some async connectivity issues - time.Sleep(100 * time.Millisecond) - return &PipeListener{ addr: PipeAddr(address), handle: handle, diff --git a/rpc/websocket.go b/rpc/websocket.go index 548847602..92615494e 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -109,7 +109,7 @@ type wsClient struct { // NewWSClientj creates a new RPC client that communicates with a RPC server // that is listening on the given endpoint using JSON encoding. -func NewWSClient(endpoint string) (*wsClient, error) { +func NewWSClient(endpoint string) (Client, error) { return &wsClient{endpoint: endpoint}, nil }