WIP: investigate how to handle ["history", false] upon reconnect

This commit is contained in:
Joseph Yiasemides 2018-05-16 16:57:12 +02:00
parent 6732855c65
commit a538548d3d
4 changed files with 56 additions and 3 deletions

View File

@ -4,8 +4,31 @@ config :poa_agent, POAAgent.Plugins.Transfers.WebSocket.Primus,
address: "ws://localhost:3000/api",
identifier: "elixirNodeJSIntegration",
name: "Elixir-NodeJS-Integration",
secret: "",
secret: "Fr00b5",
contact: "mymail@mail.com"
config :ethereumex,
url: "http://localhost:8545"
url: "http://localhost:8545"
# configuration for collectors. The format for each collector is {collector_process_id, module, label, args}
config :poa_agent,
:collectors,
[
{:eth_latest_block, POAAgent.Plugins.Collectors.Eth.LatestBlock, 500, :latest_block, [url: "http://localhost:8545"]},
{:eth_stats, POAAgent.Plugins.Collectors.Eth.Stats, 5000, :eth_stats, [url: "http://localhost:8545"]},
{:eth_pending, POAAgent.Plugins.Collectors.Eth.Pending, 500, :eth_pending, [url: "http://localhost:8545"]}
]
# configuration for transfers. The format for each collector is {collector_process_id, module, args}
config :poa_agent,
:transfers,
[
{:node_integration, POAAgent.Plugins.Transfers.WebSocket.Primus, []}
]
# configuration for mappings. This relates one collector with a list of transfers which the data will be sent
config :poa_agent,
:mappings,
[
{:eth_latest_block, [:node_integration]},
{:eth_stats, [:node_integration]},
{:eth_pending, [:node_integration]}
]

View File

@ -25,6 +25,8 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
@ping_frequency 3_000
def init_transfer(_) do
false = Process.flag(:trap_exit, true)
context = struct!(Primus.State, Application.get_env(:poa_agent, Primus))
state = nil
address = Map.fetch!(context, :address)
@ -73,6 +75,21 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
{:ok, state}
end
def handle_message({:EXIT, client, {:remote, :closed}}, %{client: client, context: context} = state) do
:timer.sleep(8 * 1000)
address = Map.fetch!(context, :address)
{:ok, client} = Primus.Client.start_link(address, nil)
event = information()
|> Primus.encode(context)
|> Jason.encode!()
:ok = Primus.Client.send(client, event)
{:ok, %{state | client: client}}
end
def terminate(_) do
:ok
end
@ -207,11 +224,22 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
{:reply, {:text, event}, state}
end
defp handle_primus_event(["history", false], state) do
epoch = 20
{:ok, num} = Ethereumex.HttpClient.eth_block_number()
num = String.to_integer(POAAgent.Format.Literal.Hex.decimalize(num))
{:reply, {:text, event}, state} = handle_primus_event(["history", %{"max" => num, "min" => num - epoch}], state)
{:reply, {:text, event}, state}
end
defp handle_primus_event(data, state) do
require Logger
Logger.info("got an unexpected message: #{inspect data}")
{:ok, state}
end
# def terminate(_, _) do
# exit(:normal)
# end
end
end

View File

@ -42,7 +42,8 @@ defmodule POAAgent.MixProject do
# Transfer
{:websockex, "~> 0.4"},
{:jason, "~> 1.0"}
{:jason, "~> 1.0"},
{:backoff, "~> 1.1"}
]
end

View File

@ -1,4 +1,5 @@
%{
"backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm"},
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"},
"certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"},
"credo": {:hex, :credo, "0.9.2", "841d316612f568beb22ba310d816353dddf31c2d94aa488ae5a27bb53760d0bf", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},