Merge pull request #8 from poanetwork/ferigis.6.plugins_system_poc

Plugin System PoC
This commit is contained in:
Joseph Yiasemides 2018-05-09 11:51:17 +02:00 committed by GitHub
commit 2014e85777
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 521 additions and 21 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
/_build /_build
/cover /cover
/deps /deps
/doc
erl_crash.dump erl_crash.dump
*.ez *.ez
*.beam *.beam

View File

@ -2,6 +2,14 @@
**TODO: Add description** **TODO: Add description**
## Documentation
In order to create the documentation
```
mix docs
```
## Run ## Run
POAAgent is an Elixir application, in order to run it first we need to fetch the dependencies and compile it. POAAgent is an Elixir application, in order to run it first we need to fetch the dependencies and compile it.

View File

@ -1 +1,23 @@
use Mix.Config use Mix.Config
# configuration for collectors. The format for each collector is {collector_process_id, module, label, args}
config :poa_agent,
:collectors,
[
# {:my_collector, POAAgent.Plugins.Collectors.MyCollector, 1000, :my_metrics, [host: "localhost", port: 1234]}
]
# configuration for transfers. The format for each collector is {collector_process_id, module, args}
config :poa_agent,
:transfers,
[
# {:my_transfer, POAAgent.Plugins.Transfers.MyTransfer, [ws_key: "mykey", other_stuff: "hello"]}
]
# configuration for mappings. This relates one collector with a list of transfers which the data will be sent
config :poa_agent,
:mappings,
[
# {:my_collector, [:my_transfer]}
]

View File

@ -1,9 +0,0 @@
defmodule POAAgent do
@moduledoc """
Documentation for PoaAgent.
"""
def hello do
:world
end
end

View File

@ -0,0 +1,21 @@
defmodule POAAgent.Application do
@moduledoc """
This module implements the Application behaviour
"""
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
supervisor(POAAgent.Plugins.Transfers.Supervisor, []),
supervisor(POAAgent.Plugins.Collectors.Supervisor, [])
]
opts = [strategy: :one_for_one, name: POAAgent.Supervisor]
Supervisor.start_link(children, opts)
end
end

View File

@ -0,0 +1,163 @@
defmodule POAAgent.Plugins.Collector do
@moduledoc """
Defines a Collector Plugin.
A Collector plugin will run in an independent process and will run the `collect/1`
function in a given `frequency`.
`POAAgent` app reads the Collectors configuration from the `config.exs` file when bootstrap and will create a
process per each one of them. That configuration is referenced by :collectors key.
config :poa_agent,
:collectors,
[
{name, module, frequency, label, args}
]
for example
config :poa_agent,
:collectors,
[
{:my_collector, POAAgent.Plugins.Collectors.MyCollector, 5000, :my_metrics, [host: "localhost", port: 1234]}
]
`name`, `module`, `frequency`, `label` and `args` must be defined in the configuration file.
- `name`: Name for the new created process. Must be unique
- `module`: Module which implements the Collector behaviour
- `frequency`: time in milliseconds after which the function `collect/1` will be called
- `label`: The data collected will be prefixed with this label. ie `{:eth_metrics, "data"}`
- `args`: Initial args which will be passed to the `init_collector/1` function
In order to work properly we have to define in the configuration file also the mapping between the Collector
and the Transfers related with it. A `Transfer` is a Plugin process which transfers the data to outside the agent node
(external Database, Dashboard server...).
config :poa_agent,
:mappings,
[
{collector_name, [transfer_name1, transfer_name2]}
]
for example
config :poa_agent,
:mappings,
[
{:my_collector, [:my_transfer]}
]
In order to implement your Collector Plugin you must implement 3 functions.
- `init_collector/1`: Called only once when the process starts
- `collect/1`: This function is called periodically after `frequency` milliseconds. It is responsible
of retrieving the metrics
- `terminate/1`: Called just before stopping the process
This is a simple example of custom Collector Plugin
defmodule POAAgent.Plugins.Collectors.MyCollector do
use POAAgent.Plugins.Collector
def init_collector(args) do
{:ok, :no_state}
end
def collect(:no_state) do
IO.puts "I am collecting data!"
{:ok, "data retrieved", :no_state}
end
def terminate(_reason, _state) do
:ok
end
end
"""
@doc """
A callback executed when the Collector Plugin starts.
The argument is retrieved from the configuration file when the Collector is defined
It must return `{:ok, state}`, that `state` will be keept as in `GenServer` and can be
retrieved in the `collect/1` function.
"""
@callback init_collector(args :: term()) ::
{:ok, any()}
@doc """
In this callback is where the metrics collection logic must be placed.
It must return `{:ok, data, state}`. `data` is the retrieved metrics.
"""
@callback collect(state :: any()) :: {:ok, data :: any(), state :: any()}
@doc """
This callback is called just before the Process goes down. This is a good place for closing connections.
"""
@callback terminate(state :: term()) :: term()
@doc false
defmacro __using__(_opt) do
quote do
@behaviour POAAgent.Plugins.Collector
@doc false
def start_link(%{name: name} = state) do
GenServer.start_link(__MODULE__, state, name: name)
end
@doc false
def init(state) do
{:ok, internal_state} = init_collector(state[:args])
set_collector_timer(state.frequency)
{:ok, Map.put(state, :internal_state, internal_state)}
end
@doc false
def handle_call(_msg, _from, state) do
{:noreply, state}
end
@doc false
def handle_info(:collect, state) do
{:ok, data, internal_state} = collect(state.internal_state)
transfer(data, state.label, state.transfers)
set_collector_timer(state.frequency)
{:noreply, %{state | internal_state: internal_state}}
end
def handle_info(_msg, state) do
{:noreply, state}
end
@doc false
def handle_cast(msg, state) do
{:noreply, state}
end
@doc false
def code_change(_old, state, _extra) do
{:ok, state}
end
@doc false
def terminate(_reason, state) do
terminate(state)
end
@doc false
defp transfer(data, label, transfers) do
Enum.each(transfers, &GenServer.cast(&1, %{label: label, data: data}))
:ok
end
@doc false
defp set_collector_timer(frequency) do
Process.send_after(self(), :collect, frequency)
end
end
end
end

View File

@ -0,0 +1,23 @@
defmodule POAAgent.Plugins.Collectors.Supervisor do
@moduledoc false
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
import Supervisor.Spec
# create the children from the config file
collectors = Application.get_env(:poa_agent, :collectors)
mappings = Application.get_env(:poa_agent, :mappings)
children = for {name, module, frequency, label, args} <- collectors do
worker(module, [%{name: name, transfers: mappings[name], frequency: frequency, label: label, args: args}])
end
opts = [strategy: :one_for_one]
supervise(children, opts)
end
end

View File

@ -0,0 +1,132 @@
defmodule POAAgent.Plugins.Transfer do
@moduledoc """
Defines a Transfer Plugin.
A Transfer plugin receives data from Collectors. It uses the Collector's `label` in order to
differenciate from multiple Collectors.
`POAAgent` app reads the Transfers configuration from the `config.exs` file when bootstrap and will create a
process per each one of them. That configuration is referenced by :transfers key.
config :poa_agent,
:transfers,
[
{name, module, args}
]
for example
config :poa_agent,
:transfers,
[
{:my_transfer, POAAgent.Plugins.Transfers.MyTransfer, [ws_key: "mykey", other_stuff: "hello"]}
]
`name`, `module` and `args` must be defined in the configuration file.
- `name`: Name for the new created process. Must be unique
- `module`: Module which implements the Transfer behaviour
- `args`: Initial args which will be passed to the `init_transfer/1` function
In order to implement your Transfer Plugin you must implement 3 functions.
- `init_transfer/1`: Called only once when the process starts
- `data_received/2`: This function is called every time a Collector sends metrics to the Transfer
- `terminate/1`: Called just before stopping the process
This is a simple example of custom Transfer Plugin
defmodule POAAgent.Plugins.Transfers.MyTransfer do
use POAAgent.Plugins.Transfer
def init_transfer(args) do
{:ok, :no_state}
end
def data_received(label, data, state) do
IO.puts "Received data from the collector referenced by label"
{:ok, :no_state}
end
def terminate(_reason, _state) do
:ok
end
end
"""
@doc """
A callback executed when the Transfer Plugin starts.
The argument is retrieved from the configuration file when the Transfer is defined
It must return `{:ok, state}`, that `state` will be keept as in `GenServer` and can be
retrieved in the `data_received/2` function.
"""
@callback init_transfer(args :: term()) ::
{:ok, any()}
@doc """
In this callback is called when a Collector sends data to this Transfer.
The data received is in `{label, data}` format where `label` identifies the Collector and the
data is the real data received.
It must return `{:ok, state}`.
"""
@callback data_received(label :: atom(), data :: any(), state :: any()) :: {:ok, any()}
@doc """
This callback is called just before the Process goes down. This is a good place for closing connections.
"""
@callback terminate(state :: term()) :: term()
@doc false
defmacro __using__(_opt) do
quote do
@behaviour POAAgent.Plugins.Transfer
@doc false
def start_link(%{name: name} = state) do
GenServer.start_link(__MODULE__, state, name: name)
end
@doc false
def init(state) do
{:ok, internal_state} = init_transfer(state[:args])
{:ok, Map.put(state, :internal_state, internal_state)}
end
@doc false
def handle_call(_msg, _from, state) do
{:noreply, state}
end
@doc false
def handle_info(_msg, state) do
{:noreply, state}
end
@doc false
def handle_cast(%{label: label, data: data}, state) do
{:ok, internal_state} = data_received(label, data, state.internal_state)
{:noreply, %{state | internal_state: internal_state}}
end
def handle_cast(msg, state) do
{:noreply, state}
end
@doc false
def code_change(_old, state, _extra) do
{:ok, state}
end
@doc false
def terminate(_reason, state) do
terminate(state)
end
end
end
end

View File

@ -0,0 +1,22 @@
defmodule POAAgent.Plugins.Transfers.Supervisor do
@moduledoc false
def start_link do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
import Supervisor.Spec
# create the children from the config file
transfers = Application.get_env(:poa_agent, :transfers)
children = for {name, module, args} <- transfers do
worker(module, [%{name: name, args: args}])
end
opts = [strategy: :one_for_one]
supervise(children, opts)
end
end

30
mix.exs
View File

@ -1,26 +1,48 @@
defmodule POAAgent.MixProject do defmodule POAAgent.MixProject do
use Mix.Project use Mix.Project
@version "0.1.0"
def project do def project do
[ [
app: :poa_agent, app: :poa_agent,
version: "0.1.0", version: @version,
elixir: "~> 1.6", elixir: "~> 1.6",
start_permanent: Mix.env() == :prod, start_permanent: Mix.env() == :prod,
deps: deps() deps: deps(),
docs: docs()
] ]
end end
def application do def application do
[ [
extra_applications: [:logger] extra_applications: [:logger],
mod: {POAAgent.Application, []}
] ]
end end
defp deps do defp deps do
[ [
{:credo, "~> 0.9", only: [:dev, :test], runtime: false}, {:credo, "~> 0.9", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 0.5", only: [:dev], runtime: false} {:dialyxir, "~> 0.5", only: [:dev], runtime: false},
# Docs
{:ex_doc, "~> 0.18", only: :dev, runtime: false}
] ]
end end
defp docs do
[
source_ref: "v#{@version}",
main: "POAAgent.Application",
source_url: "https://github.com/poanetwork/poa-netstats-agent",
groups_for_modules: [
"Plugins": [
POAAgent.Plugins.Collector,
POAAgent.Plugins.Transfer,
]
]
]
end
end end

View File

@ -2,5 +2,7 @@
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [], [], "hexpm"}, "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [], [], "hexpm"},
"credo": {:hex, :credo, "0.9.2", "841d316612f568beb22ba310d816353dddf31c2d94aa488ae5a27bb53760d0bf", [], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, "credo": {:hex, :credo, "0.9.2", "841d316612f568beb22ba310d816353dddf31c2d94aa488ae5a27bb53760d0bf", [], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [], [], "hexpm"}, "dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [], [], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [], [], "hexpm"},
} }

101
test/plugins_test.exs Normal file
View File

@ -0,0 +1,101 @@
defmodule POAAgent.PluginsTest do
use ExUnit.Case
test "__using__ Collector" do
defmodule Collector1 do
use POAAgent.Plugins.Collector
def init_collector(_args) do
{:ok, :no_state}
end
def collect(:no_state) do
{:ok, "data retrieved", :no_state}
end
def terminate(_state) do
:ok
end
end
assert Collector1.init(%{frequency: 5_000}) == {:ok, %{internal_state: :no_state, frequency: 5_000}}
assert Collector1.handle_call(:msg, :from, :state) == {:noreply, :state}
assert Collector1.handle_info(:msg, :state) == {:noreply, :state}
assert Collector1.handle_cast(:msg, :state) == {:noreply, :state}
assert Collector1.code_change(:old, :state, :extra) == {:ok, :state}
assert Collector1.terminate(:reason, :state) == :ok
end
test "__using__ Transfer" do
defmodule Transfer1 do
use POAAgent.Plugins.Transfer
def init_transfer(_args) do
{:ok, :no_state}
end
def data_received(_label, _data, _state) do
{:ok, :no_state}
end
def terminate(_state) do
:ok
end
end
assert Transfer1.init(%{args: :args}) == {:ok, %{internal_state: :no_state, args: :args}}
assert Transfer1.handle_call(:msg, :from, :state) == {:noreply, :state}
assert Transfer1.handle_info(:msg, :state) == {:noreply, :state}
assert Transfer1.handle_cast(:msg, :state) == {:noreply, :state}
assert Transfer1.code_change(:old, :state, :extra) == {:ok, :state}
assert Transfer1.terminate(:reason, :state) == :ok
end
test "Collector - Transfer integration" do
defmodule Collector2 do
use POAAgent.Plugins.Collector
def init_collector(test_pid) do
{:ok, test_pid}
end
def collect(test_pid) do
data = "data retrieved"
send test_pid, {:sent, self(), data}
{:ok, data, test_pid}
end
def terminate(_state) do
:ok
end
end
defmodule Transfer2 do
use POAAgent.Plugins.Transfer
def init_transfer(test_pid) do
{:ok, test_pid}
end
def data_received(label, data, test_pid) do
send test_pid, {:received, self(), label, data}
{:ok, test_pid}
end
def terminate(_state) do
:ok
end
end
transfer1 = :transfer2
{:ok, tpid} = Transfer2.start_link(%{name: transfer1, args: self()})
{:ok, cpid} = Collector2.start_link(%{name: :collector2, transfers: [transfer1], label: :label, args: self(), frequency: 2_000})
assert_receive {:sent, ^cpid, "data retrieved"}, 20_000
assert_receive {:received, ^tpid, :label, "data retrieved"}, 20_000
end
end

View File

@ -1,8 +0,0 @@
defmodule POAAgentTest do
use ExUnit.Case
doctest POAAgent
test "greets the world" do
assert POAAgent.hello() == :world
end
end