[#23] Exponential backoff in place

This commit is contained in:
Felipe Ripoll 2018-05-18 15:01:05 -06:00
parent 2fcf728560
commit 37a221b985
3 changed files with 96 additions and 81 deletions

View File

@ -7,30 +7,30 @@ config :ethereumex,
config :poa_agent,
:collectors,
[
{:eth_latest_block, POAAgent.Plugins.Collectors.Eth.LatestBlock, 500, :latest_block, [url: "http://localhost:8545"]},
{:eth_stats, POAAgent.Plugins.Collectors.Eth.Stats, 5000, :eth_stats, [url: "http://localhost:8545"]},
{:eth_pending, POAAgent.Plugins.Collectors.Eth.Pending, 500, :eth_pending, [url: "http://localhost:8545"]}
# {:eth_latest_block, POAAgent.Plugins.Collectors.Eth.LatestBlock, 500, :latest_block, [url: "http://localhost:8545"]},
# {:eth_stats, POAAgent.Plugins.Collectors.Eth.Stats, 5000, :eth_stats, [url: "http://localhost:8545"]},
# {:eth_pending, POAAgent.Plugins.Collectors.Eth.Pending, 500, :eth_pending, [url: "http://localhost:8545"]}
]
# configuration for transfers. The format for each collector is {collector_process_id, module, args}
config :poa_agent,
:transfers,
[
{:node_integration, POAAgent.Plugins.Transfers.WebSocket.Primus, [
address: "ws://localhost:3000/api",
identifier: "elixirNodeJSIntegration",
name: "Elixir-NodeJS-Integration",
secret: "Fr00b5",
contact: "mymail@mail.com"
]
}
# {:node_integration, POAAgent.Plugins.Transfers.WebSocket.Primus, [
# address: "ws://localhost:3000/api",
# identifier: "elixirNodeJSIntegration",
# name: "Elixir-NodeJS-Integration",
# secret: "Fr00b5",
# contact: "mymail@mail.com"
# ]
# }
]
# configuration for mappings. This relates one collector with a list of transfers which the data will be sent
config :poa_agent,
:mappings,
[
{:eth_latest_block, [:node_integration]},
{:eth_stats, [:node_integration]},
{:eth_pending, [:node_integration]}
# {:eth_latest_block, [:node_integration]},
# {:eth_stats, [:node_integration]},
# {:eth_pending, [:node_integration]}
]

View File

@ -36,7 +36,6 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
false = Process.flag(:trap_exit, true)
state = struct(Primus.State, configuration)
set_connection_attempt_timer(0)
IO.puts("inside init_transfere :)")
{:ok, state}
end
@ -45,14 +44,14 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
{:ok, state}
end
def data_received(label, data, %{client: client, context: context} = state) when is_list(data) do
def data_received(label, data, %{client: client} = state) when is_list(data) do
require Logger
Logger.info("Received data from the collector referenced by label: #{label}.")
:ok = Enum.each(data, fn(message) ->
event =
message
|> Primus.encode(context)
|> Primus.encode(state)
|> Jason.encode!()
:ok = Primus.Client.send(client, event)
end)
@ -66,46 +65,39 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
def handle_message(:attempt_to_connect, state) do
address = Map.fetch!(state, :address)
case Primus.Client.start_link(address) do
case Primus.Client.start_link(address, state) do
{:ok, client} ->
IO.puts("attempt to connect succeeded")
set_up_and_send_hello(client, state)
IO.puts("between 73")
ping_timer_ref = set_ping_timer()
|> Process.read_timer()
|> IO.inspect()
IO.puts("between 75")
x = %{state | connected?: true, client: client, current_backoff: 1, ping_timer_ref: ping_timer_ref}
IO.inspect(x)
{:ok, %{state | connected?: true, client: client, current_backoff: 1, ping_timer_ref: ping_timer_ref}}
{:error, reason} ->
IO.puts("attempt to connect failed")
Logger.warn("Connection refused because: #{inspect reason}")
new_backoff = backoff(state.current_backoff, @backoff_ceiling)
set_connection_attempt_timer(new_backoff)
{:ok, %{state | connected?: false, current_backoff: new_backoff, client: nil}}
{:ok, %{state | connected?: false, client: nil}}
end
end
def handle_message(:ping, %{client: client, context: context} = state) do
IO.puts("got a ping message")
def handle_message(:ping, %{client: client} = state) do
event = %{}
|> Map.put(:id, context.identifier)
|> Map.put(:id, state.identifier)
|> Map.put(:clientTime, POAAgent.Utils.system_time())
|> POAAgent.Format.PrimusEmitter.wrap(event: "node-ping")
|> Jason.encode!()
:ok = Primus.Client.send(client, event)
set_ping_timer()
ping_timer_ref = set_ping_timer()
{:ok, state}
{:ok, %{state | ping_timer_ref: ping_timer_ref}}
end
def handle_message({:EXIT, _pid, _reason}, state) do
Process.cancel_timer(state.ping_timer_ref)
set_connection_attempt_timer(0)
{:ok, %{state | connected?: false}}
case state.ping_timer_ref do
nil -> :continue
_ -> Process.cancel_timer(state.ping_timer_ref)
end
new_backoff = backoff(state.current_backoff, @backoff_ceiling)
set_connection_attempt_timer(new_backoff)
{:ok, %{state | current_backoff: new_backoff + 1, connected?: false, client: nil}}
end
def terminate(_) do
@ -199,17 +191,15 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
end
end
defp set_up_and_send_hello(client, context) do
defp set_up_and_send_hello(client, state) do
event = information()
|> Primus.encode(context)
|> Primus.encode(state)
|> Jason.encode!()
:ok = Primus.Client.send(client, event)
IO.puts("set up and sent hello was done")
end
defmodule Client do
@moduledoc false
@max_backoff 16
alias POAAgent.Entity.Ethereum.Block
@ -219,33 +209,6 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
WebSockex.send_frame(handle, {:text, message})
end
def attempt_connection(address) do
attempt_connection(address, :backoff.init(1, @max_backoff))
end
defp attempt_connection(address, attempt_state) do
if :backoff.get(attempt_state) === @max_backoff do
{:error, :too_many_failed_attempts}
else
:timer.sleep(:backoff.get(attempt_state) * 1000)
case start_link(address, nil) do
{:ok, _client} = result ->
result
{:error, reason} ->
Logger.warn("Could not connect over WebSocket: #{inspect reason}")
{_, attempt_state} = :backoff.fail(attempt_state)
attempt_connection(address, attempt_state)
end
end
end
def start_link(address) do
start_link(address, nil)
end
def start_link(address, state) do
WebSockex.start_link(address, __MODULE__, state)
end
@ -263,13 +226,10 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
end
defp handle_primus_event(["node-pong", data], state) do
context = struct!(Primus.State, Application.get_env(:poa_agent, Primus))
now = POAAgent.Utils.system_time()
latency = Float.ceil((now - data["clientTime"]) / 2)
event = %{}
|> Map.put(:id, context.identifier)
|> Map.put(:id, state.identifier)
|> Map.put(:latency, latency)
|> POAAgent.Format.PrimusEmitter.wrap(event: "latency")
|> Jason.encode!()
@ -277,7 +237,6 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
{:reply, {:text, event}, state}
end
defp handle_primus_event(["history", %{"max" => max, "min" => min}], state) do
context = struct!(Primus.State, Application.get_env(:poa_agent, Primus))
h = LatestBlock.history(min..max)
@ -286,7 +245,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
end
event = %{}
|> Map.put(:id, context.identifier)
|> Map.put(:id, state.identifier)
|> Map.put(:history, history)
|> POAAgent.Format.PrimusEmitter.wrap(event: "history")
|> Jason.encode!()

View File

@ -11,7 +11,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
import Mock
test "sending the Hello & Ping messages" do
args = %{name: :primus_dashboard, args: :no_args}
args = %{name: :primus_dashboard, args: []}
test_pid = self()
with_mocks([
@ -40,7 +40,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
end
test "sending the stats message" do
args = %{name: :primus_dashboard, args: :no_args}
args = %{name: :primus_dashboard, args: []}
test_pid = self()
with_mock WebSockex, [
@ -52,6 +52,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
] do
{:ok, _pid} = Primus.start_link(args)
Process.sleep(500)
:ok = send_to_transfer(:primus_dashboard, :my_metrics, stats_message())
assert_receive "{\"emit\":[\"stats\"" <> _, 20_000
@ -59,7 +60,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
end
test "sending the last block message" do
args = %{name: :primus_dashboard, args: :no_args}
args = %{name: :primus_dashboard, args: []}
test_pid = self()
with_mock WebSockex, [
@ -71,6 +72,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
] do
{:ok, _pid} = Primus.start_link(args)
Process.sleep(500)
:ok = send_to_transfer(:primus_dashboard, :my_metrics, last_block_message())
assert_receive "{\"emit\":[\"block\"" <> _, 20_000
@ -78,7 +80,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
end
test "sending the history message" do
args = %{name: :primus_dashboard, args: :no_args}
args = %{name: :primus_dashboard, args: []}
test_pid = self()
with_mock WebSockex, [
@ -90,6 +92,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
] do
{:ok, _pid} = Primus.start_link(args)
Process.sleep(500)
:ok = send_to_transfer(:primus_dashboard, :my_metrics, history_message())
assert_receive "{\"emit\":[\"history\"" <> _, 20_000
@ -97,7 +100,7 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
end
test "sending the pending trx message" do
args = %{name: :primus_dashboard, args: :no_args}
args = %{name: :primus_dashboard, args: []}
test_pid = self()
with_mock WebSockex, [
@ -109,16 +112,58 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
] do
{:ok, _pid} = Primus.start_link(args)
Process.sleep(500)
:ok = send_to_transfer(:primus_dashboard, :my_metrics, pending_message())
assert_receive "{\"emit\":[\"pending\"" <> _, 20_000
end
end
test "handle WS reconnections when connection breaks" do
args = %{name: :primus_dashboard, args: []}
test_pid = self()
with_mock WebSockex, [
send_frame: fn(to, {:text, message}) ->
send(to, message)
:ok
end,
start_link: fn(_, _, _) -> {:ok, test_pid} end
] do
{:ok, primus_pid} = Primus.start_link(args)
Process.sleep(500)
send(primus_pid, {:EXIT, :pid, :econnrefused})
assert_receive "{\"emit\":[\"hello\"" <> _, 20_000
assert_receive "{\"emit\":[\"node-ping\"" <> _, 20_000
end
end
test "handle collector's messages when the Transfer is not connected to the Server" do
args = %{name: :primus_dashboard, args: []}
test_pid = self()
with_mock WebSockex, [
send_frame: fn(_, {:text, message}) ->
send(test_pid, message)
:ok
end,
start_link: fn(_, _, _) -> {:error, :econnrefused} end
] do
{:ok, _pid} = Primus.start_link(args)
Process.sleep(500)
:ok = send_to_transfer(:primus_dashboard, :my_metrics, last_block_message())
refute_receive "{\"emit\":[\"block\"" <> _, 20_000
end
end
test "handle pong message from the dashboard" do
message = "{\"emit\":[\"node-pong\",{\"clientTime\":1526597561638,\"serverTime\":1526597561638}]}"
{:reply, {:text, "{\"emit\":[\"latency\"" <> _}, :state} = Primus.Client.handle_frame({:text, message}, :state)
{:reply, {:text, "{\"emit\":[\"latency\"" <> _}, _} = Primus.Client.handle_frame({:text, message}, %{identifier: 1})
end
test "handle history message from the dashboard" do
@ -127,7 +172,18 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
] do
message = "{\"emit\":[\"history\",{\"max\":5,\"min\":1}]}"
{:reply, {:text, "{\"emit\":[\"history\"" <> _}, :state} = Primus.Client.handle_frame({:text, message}, :state)
{:reply, {:text, "{\"emit\":[\"history\"" <> _}, _} = Primus.Client.handle_frame({:text, message}, %{identifier: 1})
end
end
test "handle {history, false} message from the dashboard" do
with_mock Ethereumex.HttpClient, [
eth_block_number: fn() -> {:ok, "0x1"} end,
eth_get_block_by_number: fn(_, _) -> {:ok, ethereumex_block()} end
] do
message = "{\"emit\":[\"history\",false]}"
{:reply, {:text, "{\"emit\":[\"history\"" <> _}, _} = Primus.Client.handle_frame({:text, message}, %{identifier: 1})
end
end