Merge pull request #15 from poanetwork/ferigis.14.receivers_mechanism

[#14] designing receiver's mechanism
This commit is contained in:
Joseph Yiasemides 2018-06-20 15:23:06 +02:00 committed by GitHub
commit e216bfe45c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 311 additions and 1 deletions

View File

@ -8,4 +8,26 @@ config :poa_backend,
:custom_handlers,
[
{:rest_custom_handler, POABackend.CustomHandler.REST, [scheme: :http, port: 4002]}
]
# configuration of the Receivers we want to start. The format is {id, module, args}
config :poa_backend,
:receivers,
[
# {:dashboard_receiver, POABackend.Receivers.Dashboard, [args: "myargs"]}
]
# here we define the type of metrics we accept. We will create a GenStage Producer per each type
config :poa_backend,
:metrics,
[
:ethereum_metrics
]
# here we have to define the relationship between receivers and metric types. The format is {receiver_id, [metric_type]}.
# one receiver can subscribe to multiple metric types
config :poa_backend,
:subscriptions,
[
# {:dashboard_receiver, [:ethereum_metrics]}
]

8
coveralls.json Normal file
View File

@ -0,0 +1,8 @@
{
"skip_files": [
"lib/poa_backend/receiver.ex",
"lib/poa_backend/protocol.ex",
"lib/poa_backend/protocol/data_type.ex",
"lib/poa_backend/protocol/message_type.ex"
]
}

View File

@ -7,7 +7,9 @@ defmodule POABackend.Application do
import Supervisor.Spec
children = [
supervisor(POABackend.CustomHandler.Supervisor, [])
supervisor(POABackend.CustomHandler.Supervisor, []),
supervisor(POABackend.Metrics.Supervisor, []),
supervisor(POABackend.Receivers.Supervisor, [])
]
opts = [strategy: :one_for_one, name: POABackend.Supervisor]

28
lib/poa_backend/metric.ex Normal file
View File

@ -0,0 +1,28 @@
defmodule POABackend.Metric do
@moduledoc false
use GenStage
def start_link(name) do
GenStage.start_link(__MODULE__, :ok, name: name)
end
def init(:ok) do
{:producer, [], dispatcher: GenStage.BroadcastDispatcher}
end
def handle_info(_, state), do: {:noreply, [], state}
# public endpoint for events adding
def add(name, event), do: GenServer.cast(name, {:add, event})
# just push events to consumers on adding
def handle_cast({:add, events}, state) when is_list(events) do
{:noreply, events, state}
end
def handle_cast({:add, events}, state), do: {:noreply, [events], state}
# ignore any demand
def handle_demand(_demand, state), do: {:noreply, [], state}
end

View File

@ -0,0 +1,22 @@
defmodule POABackend.Metrics.Supervisor do
@moduledoc false
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
import Supervisor.Spec
# create the children from the config file
metrics = Application.get_env(:poa_backend, :metrics)
children = for metric <- metrics do
worker(POABackend.Metric, [metric])
end
opts = [strategy: :one_for_one]
Supervisor.init(children, opts)
end
end

139
lib/poa_backend/receiver.ex Normal file
View File

@ -0,0 +1,139 @@
defmodule POABackend.Receiver do
@moduledoc """
Defines a Receiver Plugin.
A Receiver plugin will run in an independent process and will run the `metrics_received/3`
function every time it receives a metric from the agents.
`POABackend` app reads the Receivers configuration from the `config.exs` file when bootstrap and will create a
process per each one of them. That configuration is referenced by :receivers key.
config :poa_backend,
:receivers,
[
{name, module, args}
]
for example
config :poa_backend,
:receivers,
[
{:my_receiver, POABackend.Receivers.MyReceiver, [host: "localhost", port: 1234]}
]
`name`, `module` and `args` must be defined in the configuration file.
- `name`: Name for the new created process. Must be unique
- `module`: Module which implements the Receiver behaviour
- `args`: Initial args which will be passed to the `init_receiver/1` function
The Receiver's mechanism is built on top of [GenStage](https://hexdocs.pm/gen_stage/GenStage.html). Receivers are Consumers (sinks) and they must
be subscribed to one or more Producers. The Producers are the Metric types (i.e. `ethereum_metrics`) and are defined in the config file too:
config :poa_backend,
:metrics,
[
:ethereum_metrics
]
In order to work properly we have to define in the configuration file the relation between the Receiver
and the Metric types it wants to receive.
config :poa_backend,
:subscriptions,
[
{receiver_name, [metric_type1, metric_type2]}
]
for example
config :poa_backend,
:subscriptions,
[
# {:my_receiver, [:ethereum_metrics]}
]
## Implementing A Receiver Plugin
In order to implement your Receiver Plugin you must implement 3 functions.
- `init_receiver/1`: Called only once when the process starts
- `metrics_received/3`: This function is called eveytime the Producer (metric type) receives a message.
- `terminate/1`: Called just before stopping the process
This is a simple example of custom Receiver Plugin
defmodule POABackend.Receivers.MyReceiver do
use POABackend.Receiver
def init_receiver(_args) do
{:ok, :no_state}
end
def metrics_received(metrics, from, state) do
for metric <- metrics do
IO.puts "metric received"
end
{:ok, state}
end
def terminate(_state) do
:ok
end
end
"""
@doc """
A callback executed when the Receiver Plugin starts.
The argument is retrieved from the configuration file when the Receiver is defined
It must return `{:ok, state}`, that `state` will be keept as in `GenServer` and can be
retrieved in the `metrics_received/3` function.
"""
@callback init_receiver(args :: term()) :: {:ok, state :: any()}
@doc """
This callback will be called every time a message to the subscribed metric type arrives. It must
return the tuple `{:ok, state}`
"""
@callback metrics_received(metrics :: [term()], from :: pid(), state :: any()) :: {:ok, state :: any()}
@doc """
This callback is called just before the Process goes down. This is a good place for closing connections.
"""
@callback terminate(state :: term()) :: term()
defmacro __using__(_opt) do
quote do
use GenStage
@behaviour POABackend.Receiver
def start_link(%{name: name} = state) do
GenStage.start_link(__MODULE__, state, name: name)
end
def init(state) do
{:ok, internal_state} = init_receiver(state.args)
{:consumer, Map.put(state, :internal_state, internal_state), subscribe_to: state.subscribe_to}
end
def handle_events(events, from, state) do
{:ok, internal_state} = metrics_received(events, from, state.internal_state)
{:noreply, [], %{state | internal_state: internal_state}}
end
def terminate(_reason, state) do
terminate(state)
end
end
end
end

View File

@ -0,0 +1,22 @@
defmodule POABackend.Receivers.Supervisor do
@moduledoc false
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
import Supervisor.Spec
receivers = Application.get_env(:poa_backend, :receivers)
subscriptions = Application.get_env(:poa_backend, :subscriptions)
children = for {name, module, args} <- receivers do
worker(module, [%{name: name, subscribe_to: subscriptions[name], args: args}])
end
opts = [strategy: :one_for_one]
Supervisor.init(children, opts)
end
end

View File

@ -30,6 +30,7 @@ defmodule POABackend.MixProject do
{:cowboy, "~> 1.0.0"},
{:plug, "~> 1.0"},
{:poison, "~> 3.1"},
{:gen_stage, "~> 0.14"},
# Tests
{:credo, "~> 0.9", only: [:dev, :test], runtime: false},
@ -57,6 +58,9 @@ defmodule POABackend.MixProject do
"Custom Handler": [
POABackend.CustomHandler,
POABackend.CustomHandler.REST
],
"Receivers": [
POABackend.Receiver
]
]
]

View File

@ -9,6 +9,7 @@
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.8.2", "b941a08a1842d7aa629e0bbc969186a4cefdd035bad9fe15d43aaaaaeb8fae36", [], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"},
"gen_stage": {:hex, :gen_stage, "0.14.0", "65ae78509f85b59d360690ce3378d5096c3130a0694bab95b0c4ae66f3008fad", [], [], "hexpm"},
"hackney": {:hex, :hackney, "1.12.1", "8bf2d0e11e722e533903fe126e14d6e7e94d9b7983ced595b75f532e04b7fdc7", [], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"httpoison": {:hex, :httpoison, "1.1.1", "96ed7ab79f78a31081bb523eefec205fd2900a02cda6dbc2300e7a1226219566", [], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "5.1.1", "cbc3b2fa1645113267cc59c760bafa64b2ea0334635ef06dbac8801e42f7279c", [], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},

View File

@ -0,0 +1,62 @@
defmodule Receivers.ReceiversTest do
use ExUnit.Case
test "__using__ Receiver" do
defmodule Receiver1 do
use POABackend.Receiver
def init_receiver(_args) do
{:ok, :no_state}
end
def metrics_received(_metric, _from, state) do
{:ok, state}
end
def terminate(_state) do
:ok
end
end
state = %{name: :receiver, args: [], subscribe_to: []}
assert Receiver1.init(state) == {:consumer, %{internal_state: :no_state, name: :receiver, args: [], subscribe_to: []}, [subscribe_to: []]}
end
test "integration between metric and receiver" do
defmodule Receiver2 do
use POABackend.Receiver
def init_receiver(state) do
{:ok, state}
end
def metrics_received(metrics, _from, state) do
for metric <- metrics do
send(state[:test_pid], {:metric_received, metric})
end
{:ok, state}
end
def terminate(_state) do
:ok
end
end
state = %{name: :receiver, args: [test_pid: self()], subscribe_to: [:ethereum_metrics]}
{:ok, _} = Receiver2.start_link(state)
POABackend.Metric.add(:ethereum_metrics, [:message1, :message2])
POABackend.Metric.add(:ethereum_metrics, :message3)
metrics_pid = Process.whereis(:ethereum_metrics)
send(metrics_pid, :nothing_happens)
assert_receive {:metric_received, :message1}, 20_000
assert_receive {:metric_received, :message2}, 20_000
assert_receive {:metric_received, :message3}, 20_000
end
end