Merge pull request #51 from poanetwork/dzol.49.update-dashboard-regularly
Poll for Ethereum node status regularly and send information to server
This commit is contained in:
commit
527bd15a18
|
@ -26,6 +26,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
current_backoff: 1,
|
||||
client: nil,
|
||||
ping_timer_ref: nil,
|
||||
hello_timer_ref: nil,
|
||||
last_block: nil,
|
||||
last_metrics: %{}
|
||||
]
|
||||
|
@ -33,6 +34,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
|
||||
@ping_frequency 3_000
|
||||
@backoff_ceiling 32
|
||||
@hello_frequency 60
|
||||
|
||||
def init_transfer(configuration) do
|
||||
false = Process.flag(:trap_exit, true)
|
||||
|
@ -86,11 +88,12 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
case Primus.Client.start_link(address, state) do
|
||||
{:ok, client} ->
|
||||
set_up_and_send_hello(client, state)
|
||||
hello_timer_ref = set_hello_timer(seconds: @hello_frequency)
|
||||
ping_timer_ref = set_ping_timer()
|
||||
|
||||
:ok = send_last_metrics(client, state)
|
||||
|
||||
{:ok, %{state | connected?: true, client: client, current_backoff: 1, ping_timer_ref: ping_timer_ref}}
|
||||
{:ok, %{state | connected?: true, client: client, current_backoff: 1, ping_timer_ref: ping_timer_ref, hello_timer_ref: hello_timer_ref}}
|
||||
{:error, reason} ->
|
||||
Logger.warn("Connection refused because: #{inspect reason}")
|
||||
{:ok, %{state | connected?: false, client: nil}}
|
||||
|
@ -111,14 +114,24 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
{:ok, %{state | ping_timer_ref: ping_timer_ref}}
|
||||
end
|
||||
|
||||
def handle_message(:sample_and_send_hello, state) do
|
||||
set_up_and_send_hello(state.client, state)
|
||||
hello_timer_ref = set_hello_timer(seconds: @hello_frequency)
|
||||
{:ok, %{state | hello_timer_ref: hello_timer_ref}}
|
||||
end
|
||||
|
||||
def handle_message({:EXIT, _pid, _reason}, state) do
|
||||
case state.ping_timer_ref do
|
||||
nil -> :continue
|
||||
_ -> Process.cancel_timer(state.ping_timer_ref)
|
||||
end
|
||||
case state.hello_timer_ref do
|
||||
nil -> :continue
|
||||
_ -> Process.cancel_timer(state.hello_timer_ref)
|
||||
end
|
||||
new_backoff = backoff(state.current_backoff, @backoff_ceiling)
|
||||
set_connection_attempt_timer(new_backoff)
|
||||
{:ok, %{state | current_backoff: new_backoff + 1, connected?: false, client: nil}}
|
||||
{:ok, %{state | current_backoff: new_backoff + 1, connected?: false, client: nil, ping_timer_ref: nil, hello_timer_ref: nil}}
|
||||
end
|
||||
|
||||
def terminate(_) do
|
||||
|
@ -206,6 +219,10 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
Process.send_after(self(), :attempt_to_connect, backoff_time * 1000)
|
||||
end
|
||||
|
||||
defp set_hello_timer(seconds: s) do
|
||||
Process.send_after(self(), :sample_and_send_hello, s * 1000)
|
||||
end
|
||||
|
||||
defp backoff(backoff, ceiling) do
|
||||
case (:math.pow(2, backoff) - 1) do
|
||||
result when result > ceiling ->
|
||||
|
|
|
@ -39,6 +39,68 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
|
|||
end
|
||||
end
|
||||
|
||||
@tag timeout: 500 + 60_500
|
||||
test "sending the information (hello) messages more than once" do
|
||||
args = %{name: :primus_dashboard, args: []}
|
||||
test_pid = self()
|
||||
|
||||
with_mocks([
|
||||
{WebSockex,
|
||||
[],
|
||||
[
|
||||
send_frame: fn(to, {:text, message}) ->
|
||||
send(to, message)
|
||||
:ok
|
||||
end,
|
||||
start_link: fn(_, _, _) -> {:ok, test_pid} end
|
||||
]},
|
||||
{Ethereumex.HttpClient,
|
||||
[],
|
||||
[
|
||||
eth_coinbase: fn() -> {:ok, "0x0"} end,
|
||||
eth_protocol_version: fn() -> {:ok, "15"} end,
|
||||
web3_client_version: fn() -> {:ok, "0x0"} end,
|
||||
net_version: fn() -> {:ok, "0x0"} end
|
||||
]}]) do
|
||||
{:ok, _pid} = Primus.start_link(args)
|
||||
|
||||
delta = 500
|
||||
assert_receive "{\"emit\":[\"hello\"" <> _, delta
|
||||
assert_receive "{\"emit\":[\"hello\"" <> _, 60_000 + delta
|
||||
end
|
||||
end
|
||||
|
||||
@tag timeout: 500 + 60_500
|
||||
test "information messages change to reflect node changes" do
|
||||
args = %{name: :primus_dashboard, args: []}
|
||||
test_pid = self()
|
||||
|
||||
with_mocks([
|
||||
{WebSockex,
|
||||
[],
|
||||
[
|
||||
send_frame: fn(to, {:text, message}) ->
|
||||
send(to, message)
|
||||
:ok
|
||||
end,
|
||||
start_link: fn(_, _, _) -> {:ok, test_pid} end
|
||||
]},
|
||||
{Ethereumex.HttpClient,
|
||||
[],
|
||||
[
|
||||
eth_coinbase: fn() -> {:ok, "0x0"} end,
|
||||
eth_protocol_version: fn() -> {:ok, "15"} end,
|
||||
web3_client_version: fn() -> {:ok, "0x0" <> Float.to_string(:rand.uniform())} end,
|
||||
net_version: fn() -> {:ok, "0x0"} end
|
||||
]}]) do
|
||||
{:ok, _pid} = Primus.start_link(args)
|
||||
|
||||
assert_receive x = "{\"emit\":[\"hello\"" <> _, :infinity
|
||||
assert_receive y = "{\"emit\":[\"hello\"" <> _, :infinity
|
||||
assert x != y
|
||||
end
|
||||
end
|
||||
|
||||
test "sending the stats message" do
|
||||
args = %{name: :primus_dashboard, args: []}
|
||||
test_pid = self()
|
||||
|
|
Loading…
Reference in New Issue