[#36] resend metrics after reconnect with Server

This commit is contained in:
Felipe Ripoll 2018-05-20 12:25:03 -06:00
parent 37a221b985
commit cb7a9750d0
4 changed files with 92 additions and 54 deletions

View File

@ -5,13 +5,4 @@ defmodule POAAgent.Format.PrimusEmitter do
%{emit: [e | [data]]}
end
def write(id: i, event: e, name: n, data: d) do
value = %{"id" => i, n => d}
%{emit: [e, value]}
end
def write(id: i, event: e, name: n, data: d, secret: s) do
value = %{"id" => i, n => d, "secret" => s}
%{emit: [e, value]}
end
end

View File

@ -72,11 +72,16 @@ defmodule POAAgent.Plugins.Collectors.Eth.LatestBlock do
@doc false
def history(range) do
history = for i <- range do
block_number = "0x" <> Integer.to_string(i, 16)
{:ok, block} = Ethereumex.HttpClient.eth_get_block_by_number(block_number, :false)
Block.format_block(block)
end
history =
try do
for i <- range do
block_number = "0x" <> Integer.to_string(i, 16)
{:ok, block} = Ethereumex.HttpClient.eth_get_block_by_number(block_number, :false)
Block.format_block(block)
end
catch
_, _ -> []
end
%POAAgent.Entity.Ethereum.History{
history: Enum.reverse(history)

View File

@ -25,7 +25,9 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
connected?: false,
current_backoff: 1,
client: nil,
ping_timer_ref: nil
ping_timer_ref: nil,
last_block: nil,
last_metrics: %{}
]
end
@ -39,24 +41,40 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
{:ok, state}
end
def data_received(_, _, %{connected?: false} = state) do
IO.puts("for some reason we seem to be connected")
{:ok, state}
def data_received(label, data, %{connected?: false} = state) when is_list(data) do
last_metrics = Enum.reduce(data, state.last_metrics, fn(message, metrics) ->
Map.put(metrics, label, message)
end)
{:ok, %{state | last_metrics: last_metrics}}
end
def data_received(label, [%Ethereum.Block{} = block], %{client: client} = state) do
require Logger
Logger.info("Received data from the collector referenced by label: #{label}.")
# we have to check if is the first block sent, if that is the case we must send the
# history too. That case makes sense when the agent starts and the Ethereum node is down
case state.last_metrics[label] do
nil -> send_block_and_history(block, client, state)
_ -> send_metric(block, client, state)
end
{:ok, %{state | last_metrics: Map.put(state.last_metrics, label, block)}}
end
def data_received(label, data, %{client: client} = state) when is_list(data) do
require Logger
Logger.info("Received data from the collector referenced by label: #{label}.")
:ok = Enum.each(data, fn(message) ->
event =
message
|> Primus.encode(state)
|> Jason.encode!()
:ok = Primus.Client.send(client, event)
last_metrics = Enum.reduce(data, state.last_metrics, fn(message, metrics) ->
:ok = send_metric(message, client, state)
Map.put(metrics, label, message)
end)
{:ok, state}
{:ok, %{state | last_metrics: last_metrics}}
end
def data_received(label, data, state) do
@ -69,6 +87,9 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
{:ok, client} ->
set_up_and_send_hello(client, state)
ping_timer_ref = set_ping_timer()
:ok = send_last_metrics(client, state)
{:ok, %{state | connected?: true, client: client, current_backoff: 1, ping_timer_ref: ping_timer_ref}}
{:error, reason} ->
Logger.warn("Connection refused because: #{inspect reason}")
@ -104,7 +125,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
:ok
end
def encode(%Host.Information{} = x, %Primus.State{identifier: i, secret: s}) do
defp encode(%Host.Information{} = x, %Primus.State{identifier: i, secret: s}) do
x = Entity.NameConvention.from_elixir_to_node(x)
%{}
@ -114,7 +135,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|> POAAgent.Format.PrimusEmitter.wrap(event: "hello")
end
def encode(%Ethereum.Block{} = x, %Primus.State{identifier: i}) do
defp encode(%Ethereum.Block{} = x, %Primus.State{identifier: i}) do
x = Entity.NameConvention.from_elixir_to_node(x)
%{}
@ -123,7 +144,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|> POAAgent.Format.PrimusEmitter.wrap(event: "block")
end
def encode(%Ethereum.Statistics{} = x, %Primus.State{identifier: i}) do
defp encode(%Ethereum.Statistics{} = x, %Primus.State{identifier: i}) do
x = Entity.NameConvention.from_elixir_to_node(x)
%{}
@ -132,7 +153,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|> POAAgent.Format.PrimusEmitter.wrap(event: "stats")
end
def encode(%POAAgent.Entity.Ethereum.History{} = x, %Primus.State{identifier: i}) do
defp encode(%POAAgent.Entity.Ethereum.History{} = x, %Primus.State{identifier: i}) do
history = for i <- x.history do
Entity.NameConvention.from_elixir_to_node(i)
end
@ -143,7 +164,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|> POAAgent.Format.PrimusEmitter.wrap(event: "history")
end
def encode(%POAAgent.Entity.Ethereum.Pending{} = x, %Primus.State{identifier: i}) do
defp encode(%POAAgent.Entity.Ethereum.Pending{} = x, %Primus.State{identifier: i}) do
x = Entity.NameConvention.from_elixir_to_node(x)
%{}
@ -152,7 +173,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|> POAAgent.Format.PrimusEmitter.wrap(event: "pending")
end
def information() do
defp information() do
config = Application.get_env(:poa_agent, __MODULE__)
with {:ok, coinbase} <- Ethereumex.HttpClient.eth_coinbase(),
@ -170,7 +191,12 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
net: net
}
else
_error -> Information.new()
_error ->
%Information{
Information.new() |
name: config[:name],
contact: config[:contact]
}
end
end
@ -193,16 +219,43 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
defp set_up_and_send_hello(client, state) do
event = information()
|> Primus.encode(state)
|> encode(state)
|> Jason.encode!()
:ok = Primus.Client.send(client, event)
end
defp send_last_metrics(client, state) do
state.last_metrics
|> Enum.each(fn
{_label, %Ethereum.Block{} = block} ->
:ok = send_block_and_history(block, client, state)
{_label, message} ->
:ok = send_metric(message, client, state)
end)
end
defp send_block_and_history(block, client, state) do
range = LatestBlock.history_range(block, 0)
history = LatestBlock.history(range)
:ok = send_metric(block, client, state)
:ok = send_metric(history, client, state)
:ok
end
defp send_metric(metric, client, state) do
event =
metric
|> encode(state)
|> Jason.encode!()
Primus.Client.send(client, event)
end
defmodule Client do
@moduledoc false
alias POAAgent.Entity.Ethereum.Block
use WebSockex
def send(handle, message) do
@ -252,14 +305,7 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
{:reply, {:text, event}, state}
end
defp handle_primus_event(["history", false], state) do
{:ok, block_number} = Ethereumex.HttpClient.eth_block_number()
{:ok, block} = Ethereumex.HttpClient.eth_get_block_by_number(block_number, :false)
block = Block.format_block(block)
min..max = LatestBlock.history_range(block, 0)
{:reply, {:text, event}, state} = handle_primus_event(["history", %{"max" => max, "min" => min}], state)
{:reply, {:text, event}, state}
end
defp handle_primus_event(data, state) do
require Logger

View File

@ -76,6 +76,13 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
:ok = send_to_transfer(:primus_dashboard, :my_metrics, last_block_message())
assert_receive "{\"emit\":[\"block\"" <> _, 20_000
assert_receive "{\"emit\":[\"history\"" <> _, 20_000
# send a second block won't send the history
:ok = send_to_transfer(:primus_dashboard, :my_metrics, last_block_message())
assert_receive "{\"emit\":[\"block\"" <> _, 20_000
refute_receive "{\"emit\":[\"history\"" <> _, 20_000
end
end
@ -176,17 +183,6 @@ defmodule POAAgent.Plugins.Collectors.Eth.PrimusTest do
end
end
test "handle {history, false} message from the dashboard" do
with_mock Ethereumex.HttpClient, [
eth_block_number: fn() -> {:ok, "0x1"} end,
eth_get_block_by_number: fn(_, _) -> {:ok, ethereumex_block()} end
] do
message = "{\"emit\":[\"history\",false]}"
{:reply, {:text, "{\"emit\":[\"history\"" <> _}, _} = Primus.Client.handle_frame({:text, message}, %{identifier: 1})
end
end
test "handle unexpected messages from the dashboard" do
message = "{\"emit\":[\"unexpected\",{\"clientTime\":1526597561638,\"serverTime\":1526597561638}]}"