From 37a221b9854847d602fb5a18a72ba7edaed18d95 Mon Sep 17 00:00:00 2001 From: Felipe Ripoll Date: Fri, 18 May 2018 15:01:05 -0600 Subject: [PATCH] [#23] Exponential backoff in place --- config/test.exs | 28 +++---- .../plugins/transfers/web_socket/primus.ex | 79 +++++-------------- test/poa_agent/transfers/primus_test.exs | 70 ++++++++++++++-- 3 files changed, 96 insertions(+), 81 deletions(-) diff --git a/config/test.exs b/config/test.exs index 064fa93..8c65e7c 100644 --- a/config/test.exs +++ b/config/test.exs @@ -7,30 +7,30 @@ config :ethereumex, config :poa_agent, :collectors, [ - {:eth_latest_block, POAAgent.Plugins.Collectors.Eth.LatestBlock, 500, :latest_block, [url: "http://localhost:8545"]}, - {:eth_stats, POAAgent.Plugins.Collectors.Eth.Stats, 5000, :eth_stats, [url: "http://localhost:8545"]}, - {:eth_pending, POAAgent.Plugins.Collectors.Eth.Pending, 500, :eth_pending, [url: "http://localhost:8545"]} + # {:eth_latest_block, POAAgent.Plugins.Collectors.Eth.LatestBlock, 500, :latest_block, [url: "http://localhost:8545"]}, + # {:eth_stats, POAAgent.Plugins.Collectors.Eth.Stats, 5000, :eth_stats, [url: "http://localhost:8545"]}, + # {:eth_pending, POAAgent.Plugins.Collectors.Eth.Pending, 500, :eth_pending, [url: "http://localhost:8545"]} ] # configuration for transfers. The format for each collector is {collector_process_id, module, args} config :poa_agent, :transfers, [ - {:node_integration, POAAgent.Plugins.Transfers.WebSocket.Primus, [ - address: "ws://localhost:3000/api", - identifier: "elixirNodeJSIntegration", - name: "Elixir-NodeJS-Integration", - secret: "Fr00b5", - contact: "mymail@mail.com" - ] - } + # {:node_integration, POAAgent.Plugins.Transfers.WebSocket.Primus, [ + # address: "ws://localhost:3000/api", + # identifier: "elixirNodeJSIntegration", + # name: "Elixir-NodeJS-Integration", + # secret: "Fr00b5", + # contact: "mymail@mail.com" + # ] + # } ] # configuration for mappings. This relates one collector with a list of transfers which the data will be sent config :poa_agent, :mappings, [ - {:eth_latest_block, [:node_integration]}, - {:eth_stats, [:node_integration]}, - {:eth_pending, [:node_integration]} + # {:eth_latest_block, [:node_integration]}, + # {:eth_stats, [:node_integration]}, + # {:eth_pending, [:node_integration]} ] diff --git a/lib/poa_agent/plugins/transfers/web_socket/primus.ex b/lib/poa_agent/plugins/transfers/web_socket/primus.ex index 44c9279..e69ef24 100644 --- a/lib/poa_agent/plugins/transfers/web_socket/primus.ex +++ b/lib/poa_agent/plugins/transfers/web_socket/primus.ex @@ -36,7 +36,6 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do false = Process.flag(:trap_exit, true) state = struct(Primus.State, configuration) set_connection_attempt_timer(0) - IO.puts("inside init_transfere :)") {:ok, state} end @@ -45,14 +44,14 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do {:ok, state} end - def data_received(label, data, %{client: client, context: context} = state) when is_list(data) do + def data_received(label, data, %{client: client} = state) when is_list(data) do require Logger Logger.info("Received data from the collector referenced by label: #{label}.") :ok = Enum.each(data, fn(message) -> event = message - |> Primus.encode(context) + |> Primus.encode(state) |> Jason.encode!() :ok = Primus.Client.send(client, event) end) @@ -66,46 +65,39 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do def handle_message(:attempt_to_connect, state) do address = Map.fetch!(state, :address) - case Primus.Client.start_link(address) do + case Primus.Client.start_link(address, state) do {:ok, client} -> - IO.puts("attempt to connect succeeded") set_up_and_send_hello(client, state) - IO.puts("between 73") ping_timer_ref = set_ping_timer() - |> Process.read_timer() - |> IO.inspect() - IO.puts("between 75") - x = %{state | connected?: true, client: client, current_backoff: 1, ping_timer_ref: ping_timer_ref} - IO.inspect(x) {:ok, %{state | connected?: true, client: client, current_backoff: 1, ping_timer_ref: ping_timer_ref}} {:error, reason} -> - IO.puts("attempt to connect failed") Logger.warn("Connection refused because: #{inspect reason}") - new_backoff = backoff(state.current_backoff, @backoff_ceiling) - set_connection_attempt_timer(new_backoff) - {:ok, %{state | connected?: false, current_backoff: new_backoff, client: nil}} + {:ok, %{state | connected?: false, client: nil}} end end - def handle_message(:ping, %{client: client, context: context} = state) do - IO.puts("got a ping message") + def handle_message(:ping, %{client: client} = state) do event = %{} - |> Map.put(:id, context.identifier) + |> Map.put(:id, state.identifier) |> Map.put(:clientTime, POAAgent.Utils.system_time()) |> POAAgent.Format.PrimusEmitter.wrap(event: "node-ping") |> Jason.encode!() :ok = Primus.Client.send(client, event) - set_ping_timer() + ping_timer_ref = set_ping_timer() - {:ok, state} + {:ok, %{state | ping_timer_ref: ping_timer_ref}} end def handle_message({:EXIT, _pid, _reason}, state) do - Process.cancel_timer(state.ping_timer_ref) - set_connection_attempt_timer(0) - {:ok, %{state | connected?: false}} + case state.ping_timer_ref do + nil -> :continue + _ -> Process.cancel_timer(state.ping_timer_ref) + end + new_backoff = backoff(state.current_backoff, @backoff_ceiling) + set_connection_attempt_timer(new_backoff) + {:ok, %{state | current_backoff: new_backoff + 1, connected?: false, client: nil}} end def terminate(_) do @@ -199,17 +191,15 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do end end - defp set_up_and_send_hello(client, context) do + defp set_up_and_send_hello(client, state) do event = information() - |> Primus.encode(context) + |> Primus.encode(state) |> Jason.encode!() :ok = Primus.Client.send(client, event) - IO.puts("set up and sent hello was done") end defmodule Client do @moduledoc false - @max_backoff 16 alias POAAgent.Entity.Ethereum.Block @@ -219,33 +209,6 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do WebSockex.send_frame(handle, {:text, message}) end - def attempt_connection(address) do - attempt_connection(address, :backoff.init(1, @max_backoff)) - end - - defp attempt_connection(address, attempt_state) do - if :backoff.get(attempt_state) === @max_backoff do - {:error, :too_many_failed_attempts} - else - :timer.sleep(:backoff.get(attempt_state) * 1000) - case start_link(address, nil) do - - {:ok, _client} = result -> - result - - {:error, reason} -> - Logger.warn("Could not connect over WebSocket: #{inspect reason}") - {_, attempt_state} = :backoff.fail(attempt_state) - attempt_connection(address, attempt_state) - - end - end - end - - def start_link(address) do - start_link(address, nil) - end - def start_link(address, state) do WebSockex.start_link(address, __MODULE__, state) end @@ -263,13 +226,10 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do end defp handle_primus_event(["node-pong", data], state) do - context = struct!(Primus.State, Application.get_env(:poa_agent, Primus)) - now = POAAgent.Utils.system_time() latency = Float.ceil((now - data["clientTime"]) / 2) - event = %{} - |> Map.put(:id, context.identifier) + |> Map.put(:id, state.identifier) |> Map.put(:latency, latency) |> POAAgent.Format.PrimusEmitter.wrap(event: "latency") |> Jason.encode!() @@ -277,7 +237,6 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do {:reply, {:text, event}, state} end defp handle_primus_event(["history", %{"max" => max, "min" => min}], state) do - context = struct!(Primus.State, Application.get_env(:poa_agent, Primus)) h = LatestBlock.history(min..max) @@ -286,7 +245,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do end event = %{} - |> Map.put(:id, context.identifier) + |> Map.put(:id, state.identifier) |> Map.put(:history, history) |> POAAgent.Format.PrimusEmitter.wrap(event: "history") |> Jason.encode!() diff --git a/test/poa_agent/transfers/primus_test.exs b/test/poa_agent/transfers/primus_test.exs index 54cf044..ec1181f 100644 --- a/test/poa_agent/transfers/primus_test.exs +++ b/test/poa_agent/transfers/primus_test.exs @@ -11,7 +11,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do import Mock test "sending the Hello & Ping messages" do - args = %{name: :primus_dashboard, args: :no_args} + args = %{name: :primus_dashboard, args: []} test_pid = self() with_mocks([ @@ -40,7 +40,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do end test "sending the stats message" do - args = %{name: :primus_dashboard, args: :no_args} + args = %{name: :primus_dashboard, args: []} test_pid = self() with_mock WebSockex, [ @@ -52,6 +52,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do ] do {:ok, _pid} = Primus.start_link(args) + Process.sleep(500) :ok = send_to_transfer(:primus_dashboard, :my_metrics, stats_message()) assert_receive "{\"emit\":[\"stats\"" <> _, 20_000 @@ -59,7 +60,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do end test "sending the last block message" do - args = %{name: :primus_dashboard, args: :no_args} + args = %{name: :primus_dashboard, args: []} test_pid = self() with_mock WebSockex, [ @@ -71,6 +72,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do ] do {:ok, _pid} = Primus.start_link(args) + Process.sleep(500) :ok = send_to_transfer(:primus_dashboard, :my_metrics, last_block_message()) assert_receive "{\"emit\":[\"block\"" <> _, 20_000 @@ -78,7 +80,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do end test "sending the history message" do - args = %{name: :primus_dashboard, args: :no_args} + args = %{name: :primus_dashboard, args: []} test_pid = self() with_mock WebSockex, [ @@ -90,6 +92,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do ] do {:ok, _pid} = Primus.start_link(args) + Process.sleep(500) :ok = send_to_transfer(:primus_dashboard, :my_metrics, history_message()) assert_receive "{\"emit\":[\"history\"" <> _, 20_000 @@ -97,7 +100,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do end test "sending the pending trx message" do - args = %{name: :primus_dashboard, args: :no_args} + args = %{name: :primus_dashboard, args: []} test_pid = self() with_mock WebSockex, [ @@ -109,16 +112,58 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do ] do {:ok, _pid} = Primus.start_link(args) + Process.sleep(500) :ok = send_to_transfer(:primus_dashboard, :my_metrics, pending_message()) assert_receive "{\"emit\":[\"pending\"" <> _, 20_000 end end + test "handle WS reconnections when connection breaks" do + args = %{name: :primus_dashboard, args: []} + test_pid = self() + + with_mock WebSockex, [ + send_frame: fn(to, {:text, message}) -> + send(to, message) + :ok + end, + start_link: fn(_, _, _) -> {:ok, test_pid} end + ] do + {:ok, primus_pid} = Primus.start_link(args) + + Process.sleep(500) + send(primus_pid, {:EXIT, :pid, :econnrefused}) + + assert_receive "{\"emit\":[\"hello\"" <> _, 20_000 + assert_receive "{\"emit\":[\"node-ping\"" <> _, 20_000 + end + end + + test "handle collector's messages when the Transfer is not connected to the Server" do + args = %{name: :primus_dashboard, args: []} + test_pid = self() + + with_mock WebSockex, [ + send_frame: fn(_, {:text, message}) -> + send(test_pid, message) + :ok + end, + start_link: fn(_, _, _) -> {:error, :econnrefused} end + ] do + {:ok, _pid} = Primus.start_link(args) + + Process.sleep(500) + :ok = send_to_transfer(:primus_dashboard, :my_metrics, last_block_message()) + + refute_receive "{\"emit\":[\"block\"" <> _, 20_000 + end + end + test "handle pong message from the dashboard" do message = "{\"emit\":[\"node-pong\",{\"clientTime\":1526597561638,\"serverTime\":1526597561638}]}" - {:reply, {:text, "{\"emit\":[\"latency\"" <> _}, :state} = Primus.Client.handle_frame({:text, message}, :state) + {:reply, {:text, "{\"emit\":[\"latency\"" <> _}, _} = Primus.Client.handle_frame({:text, message}, %{identifier: 1}) end test "handle history message from the dashboard" do @@ -127,7 +172,18 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do ] do message = "{\"emit\":[\"history\",{\"max\":5,\"min\":1}]}" - {:reply, {:text, "{\"emit\":[\"history\"" <> _}, :state} = Primus.Client.handle_frame({:text, message}, :state) + {:reply, {:text, "{\"emit\":[\"history\"" <> _}, _} = Primus.Client.handle_frame({:text, message}, %{identifier: 1}) + end + end + + test "handle {history, false} message from the dashboard" do + with_mock Ethereumex.HttpClient, [ + eth_block_number: fn() -> {:ok, "0x1"} end, + eth_get_block_by_number: fn(_, _) -> {:ok, ethereumex_block()} end + ] do + message = "{\"emit\":[\"history\",false]}" + + {:reply, {:text, "{\"emit\":[\"history\"" <> _}, _} = Primus.Client.handle_frame({:text, message}, %{identifier: 1}) end end