client: ResultsCh chan json.RawMessage, ErrorsCh

This commit is contained in:
Ethan Buchman 2016-01-13 21:21:16 -05:00
parent aff561d8c3
commit 91c734d02e
1 changed files with 15 additions and 6 deletions

View File

@ -2,6 +2,7 @@ package rpcclient
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"time" "time"
@ -12,6 +13,7 @@ import (
const ( const (
wsResultsChannelCapacity = 10 wsResultsChannelCapacity = 10
wsErrorsChannelCapacity = 1
wsWriteTimeoutSeconds = 10 wsWriteTimeoutSeconds = 10
) )
@ -19,7 +21,8 @@ type WSClient struct {
QuitService QuitService
Address string Address string
*websocket.Conn *websocket.Conn
ResultsCh chan rpctypes.Result // closes upon WSClient.Stop() ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
ErrorsCh chan error // closes upon WSClient.Stop()
} }
// create a new connection // create a new connection
@ -27,7 +30,8 @@ func NewWSClient(addr string) *WSClient {
wsClient := &WSClient{ wsClient := &WSClient{
Address: addr, Address: addr,
Conn: nil, Conn: nil,
ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity), ResultsCh: make(chan json.RawMessage, wsResultsChannelCapacity),
ErrorsCh: make(chan error, wsErrorsChannelCapacity),
} }
wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient)
return wsClient return wsClient
@ -67,7 +71,7 @@ func (wsc *WSClient) dial() error {
func (wsc *WSClient) OnStop() { func (wsc *WSClient) OnStop() {
wsc.QuitService.OnStop() wsc.QuitService.OnStop()
// ResultsCh is closed in receiveEventsRoutine. // ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
} }
func (wsc *WSClient) receiveEventsRoutine() { func (wsc *WSClient) receiveEventsRoutine() {
@ -82,15 +86,20 @@ func (wsc *WSClient) receiveEventsRoutine() {
err := json.Unmarshal(data, &response) err := json.Unmarshal(data, &response)
if err != nil { if err != nil {
log.Info("WSClient failed to parse message", "error", err, "data", string(data)) log.Info("WSClient failed to parse message", "error", err, "data", string(data))
wsc.Stop() wsc.ErrorsCh <- err
break continue
} }
wsc.ResultsCh <- response.Result if response.Error != "" {
wsc.ErrorsCh <- fmt.Errorf(err.Error())
continue
}
wsc.ResultsCh <- *response.Result
} }
} }
// Cleanup // Cleanup
close(wsc.ResultsCh) close(wsc.ResultsCh)
close(wsc.ErrorsCh)
} }
// subscribe to an event // subscribe to an event