diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f15507c..6fcab4b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,9 @@ BUG FIXES already in the validator set. * [consensus] Shut down WAL properly. +BUG FIXES: +- [abci] Fix #1891: pending requests no longer hang when the ABCI server dies. Previously, a crash in BeginBlock could leave Tendermint in a broken state. + ## 0.22.0 *July 2nd, 2018* diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index c3f88725..affea1a9 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -357,6 +357,13 @@ func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { } func (cli *socketClient) flushQueue() { + // mark all in-flight messages as resolved (they will get cli.Error()) + for req := cli.reqSent.Front(); req != nil; req = req.Next() { + reqres := req.Value.(*ReqRes) + reqres.Done() + } + + // mark all queued messages as resolved LOOP: for { select { diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index 5a9187fb..251a63f6 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -2,10 +2,17 @@ package abcicli_test import ( "errors" + "fmt" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/abci/client" + "github.com/tendermint/tendermint/abci/server" + "github.com/tendermint/tendermint/abci/types" + cmn "github.com/tendermint/tendermint/libs/common" ) func TestSocketClientStopForErrorDeadlock(t *testing.T) { @@ -26,3 +33,94 @@ func TestSocketClientStopForErrorDeadlock(t *testing.T) { t.Fatalf("Test took too long, potential deadlock still exists") } } + +func TestProperSyncCalls(t *testing.T) { + app := slowApp{} + + s, c := setupClientServer(t, app) + defer s.Stop() + defer c.Stop() + + resp := make(chan error, 1) + go func() { + // This is BeginBlockSync unrolled.... 
+ reqres := c.BeginBlockAsync(types.RequestBeginBlock{}) + c.FlushSync() + res := reqres.Response.GetBeginBlock() + require.NotNil(t, res) + resp <- c.Error() + }() + + select { + case <-time.After(time.Second): + require.Fail(t, "No response arrived") + case err, ok := <-resp: + require.True(t, ok, "Must not close channel") + assert.NoError(t, err, "This should return success") + } +} + +func TestHangingSyncCalls(t *testing.T) { + app := slowApp{} + + s, c := setupClientServer(t, app) + defer s.Stop() + defer c.Stop() + + resp := make(chan error, 1) + go func() { + // Start BeginBlock and flush it + reqres := c.BeginBlockAsync(types.RequestBeginBlock{}) + flush := c.FlushAsync() + // wait 20 ms for all events to travel socket, but + // no response yet from server + time.Sleep(20 * time.Millisecond) + // kill the server, so the connections break + s.Stop() + + // wait for the response from BeginBlock + fmt.Println("waiting for begin block") + reqres.Wait() + fmt.Println("waiting for flush") + flush.Wait() + fmt.Println("got all responses") + // res := reqres.Response.GetBeginBlock() + // require.NotNil(t, res) + resp <- c.Error() + }() + + select { + case <-time.After(time.Second): + require.Fail(t, "No response arrived") + case err, ok := <-resp: + require.True(t, ok, "Must not close channel") + assert.Error(t, err, "We should get EOF error") + } +} + +func setupClientServer(t *testing.T, app types.Application) ( + cmn.Service, abcicli.Client) { + // some port between 20k and 30k + port := 20000 + cmn.RandInt32()%10000 + addr := fmt.Sprintf("localhost:%d", port) + + s, err := server.NewServer(addr, "socket", app) + require.NoError(t, err) + err = s.Start() + require.NoError(t, err) + + c := abcicli.NewSocketClient(addr, true) + err = c.Start() + require.NoError(t, err) + + return s, c +} + +type slowApp struct { + types.BaseApplication +} + +func (slowApp) BeginBlock(req types.RequestBeginBlock) types.ResponseBeginBlock { + time.Sleep(200 * time.Millisecond) + 
return types.ResponseBeginBlock{} +}