Perform a bound exponential backoff when connecting to Node server
This commit is contained in:
parent
6a90f6da85
commit
a80587a28f
|
@ -11,6 +11,8 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
alias POAAgent.Entity.Host.Information
|
||||
alias POAAgent.Plugins.Collectors.Eth.LatestBlock
|
||||
|
||||
require Logger
|
||||
|
||||
defmodule State do
|
||||
@moduledoc false
|
||||
|
||||
|
@ -29,9 +31,8 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
false = Process.flag(:trap_exit, true)
|
||||
|
||||
context = struct!(Primus.State, Application.get_env(:poa_agent, Primus))
|
||||
state = nil
|
||||
address = Map.fetch!(context, :address)
|
||||
{:ok, client} = Primus.Client.start_link(address, state)
|
||||
{:ok, client} = Primus.Client.attempt_connection(address)
|
||||
|
||||
event = information()
|
||||
|> Primus.encode(context)
|
||||
|
@ -76,11 +77,9 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
{:ok, state}
|
||||
end
|
||||
|
||||
def handle_message({:EXIT, client, {:remote, :closed}}, %{client: client, context: context} = state) do
|
||||
:timer.sleep(8 * 1000)
|
||||
|
||||
def handle_message({:EXIT, pid, reason}, %{client: client, context: context} = state) do
|
||||
address = Map.fetch!(context, :address)
|
||||
{:ok, client} = Primus.Client.start_link(address, nil)
|
||||
{:ok, client} = Primus.Client.attempt_connection(address)
|
||||
|
||||
event = information()
|
||||
|> Primus.encode(context)
|
||||
|
@ -171,6 +170,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
|
||||
defmodule Client do
|
||||
@moduledoc false
|
||||
@max_backoff 16
|
||||
|
||||
alias POAAgent.Entity.Ethereum.Block
|
||||
|
||||
|
@ -180,6 +180,29 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
WebSockex.send_frame(handle, {:text, message})
|
||||
end
|
||||
|
||||
def attempt_connection(address) do
|
||||
attempt_connection(address, :backoff.init(1, @max_backoff))
|
||||
end
|
||||
|
||||
defp attempt_connection(address, attempt_state) do
|
||||
if :backoff.get(attempt_state) === @max_backoff do
|
||||
{:error, :too_many_failed_attempts}
|
||||
else
|
||||
:timer.sleep(:backoff.get(attempt_state) * 1000)
|
||||
case start_link(address, nil) do
|
||||
|
||||
{:ok, client} = result ->
|
||||
result
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.warn("Could not connect over WebSocket: #{inspect reason}")
|
||||
{_, attempt_state} = :backoff.fail(attempt_state)
|
||||
attempt_connection(address, attempt_state)
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def start_link(address, state) do
|
||||
WebSockex.start_link(address, __MODULE__, state)
|
||||
end
|
||||
|
@ -241,9 +264,5 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
Logger.info("got an unexpected message: #{inspect data}")
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
# def terminate(_, _) do
|
||||
# exit(:normal)
|
||||
# end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue