[#22] [REST Receiver] sending inactive when a client is no sending pings

This commit is contained in:
Felipe Ripoll 2018-07-06 08:34:06 -06:00
parent 7cfd4cc9af
commit b48a80bab1
16 changed files with 315 additions and 21 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

View File

@ -1,5 +1,8 @@
defmodule POABackend.CustomHandler do
alias POABackend.Protocol.Message
alias POABackend.Metric
@moduledoc """
A Custom Handler is responsible of handling data sent from Agents (i.e. REST over HTTP, WebSockets...) "speaking" the POA Protocol.
@ -38,6 +41,13 @@ defmodule POABackend.CustomHandler do
is the id for that handler, the second one is the Elixir module which implements the CustomHandler behaviour and the third one is a list for arguments
which will be passed to the `child_spec/1` function as a parameter
### Helpful functions
This module also define some helpful functions:
- send_to_receivers/1: This function will publish the incomming message to the appropiate metric type (Data Type). A Custom Handler must call it when it wants to dispatch a message.
- publish_inactive/1: Will publish an inactive message to all the metrics in the system. A Custom Handler must call it when detects if a client is disconnected or/and inactive
"""
@doc """
@ -53,12 +63,23 @@ defmodule POABackend.CustomHandler do
end
@doc """
This function dispatches the given Message to the appropiate receivers based on the Data Type.
This function dispatches the given Message to the appropiate receivers based on the Data Type (ie :ethereum_metric).
The mapping between Data Types and Receivers is done in the config file.
_Note_ the message must be a [POABackend.Protocol.Message](POABackend.Protocol.Message.html) struct
"""
@spec send_to_receivers(Message.t) :: :ok
def send_to_receivers(%Message{} = message) do
POABackend.Metric.add(message.data_type, message)
Metric.add(message.data_type, message)
end
@doc """
Publish an inactive message to all the metrics defined in the config file.
A Custom Handler must call this explicity when detecting if a client is inactive for a period of time
"""
def publish_inactive(agent_id) do
Metric.broadcast({:inactive, agent_id})
end
end

View File

@ -4,7 +4,24 @@ defmodule POABackend.CustomHandler.REST do
@moduledoc """
This module implements the REST Custom Handler over HTTP/1.1.
It defines the endpoints needed to use the POA Protocol.
# Plugin Architecture
This plugin involves many processes. When the `POABackend.CustomHandler.Supervisor` calls the
`child_spec/2` function it will create its own supervision tree under that supervisor
![REST plugin Architecture](./REST_architecture.png)
- `POABackend.CustomHandler.REST.Supervisor` is the main supervisor. It is in charge of supervise its three
children.
- The `Ranch/Cowboy` branch is managed by Ranch and Cowboy apps. They are in charge of expose the REST endpoints on top
of http.
- The Registry is an Elixir Registry in charge of track/untrack Activity Monitor Servers, created by the next child
- `POABackend.CustomHandler.REST.Monitor.Supervisor` is a Supervisor with `:simple_one_for_one` strategy. It will start
dynamically `GenServer`'s implemented by `POABackend.CustomHandler.REST.Monitor.Server` module.
# REST endpoints
This Pluting also defines the endpoints needed to use the POA Protocol.
## _hello_ endpoint
@ -290,10 +307,10 @@ defmodule POABackend.CustomHandler.REST do
alias POABackend.Protocol.DataType
alias POABackend.Protocol.Message
plug REST.AcceptPlug, "application/json"
plug REST.Plugs.Accept, "application/json"
plug Plug.Parsers, parsers: [:json], json_decoder: Poison
plug REST.RequiredFieldsPlug, ~w(id secret)
plug REST.AuthorizationPlug
plug REST.Plugs.RequiredFields, ~w(id secret)
plug REST.Plugs.Authorization
plug :match
plug :dispatch
@ -304,13 +321,15 @@ defmodule POABackend.CustomHandler.REST do
end
post "/ping" do
:ok = REST.ping_monitor(conn.params["id"])
conn
|> put_resp_content_type("application/json")
|> send_success_resp()
end
post "/latency" do
conn = REST.RequiredFieldsPlug.call(conn, ~w(latency))
conn = REST.Plugs.RequiredFields.call(conn, ~w(latency))
with false <- conn.halted,
true <- is_float(conn.params["latency"])
@ -329,7 +348,7 @@ defmodule POABackend.CustomHandler.REST do
end
post "/data" do
conn = REST.RequiredFieldsPlug.call(conn, ~w(type data))
conn = REST.Plugs.RequiredFields.call(conn, ~w(type data))
with false <- conn.halted,
true <- is_map(conn.params["data"]),
@ -379,10 +398,35 @@ defmodule POABackend.CustomHandler.REST do
end
end
@doc """
This function will initialize an Activity Monitor Server for a given Agent ID if it doesn't
exist already. If it exist this function will send a ping message to the Monitor Server in order to
restart the timeout countdown.
The Activity Monitor Server is a `GenServer` which will be initialized under the
`POABackend.CustomHandler.REST.Monitor.Supervisor` supervisor.
"""
def ping_monitor(agent_id) when is_binary(agent_id) do
case Registry.lookup(:rest_activity_monitor_registry, agent_id) do
[{pid, _}] ->
GenServer.cast(pid, :ping)
[] ->
{:ok, _} = start_monitor(agent_id)
end
:ok
end
@doc false
def start_monitor(agent_id) when is_binary(agent_id) do
Supervisor.start_child(POABackend.CustomHandler.REST.Monitor.Supervisor, [agent_id])
end
# Custom Handler Callbacks
@doc false
def child_spec(options) do
Plug.Adapters.Cowboy.child_spec(scheme: options[:scheme], plug: POABackend.CustomHandler.REST.Router, options: [port: options[:port]])
POABackend.CustomHandler.REST.Supervisor.child_spec(options)
end
end

View File

@ -0,0 +1,48 @@
defmodule POABackend.CustomHandler.REST.Monitor.Server do
@moduledoc false
use GenServer
@timeout 7000
@registry :rest_activity_monitor_registry
def start_link(args) do
agent_id = via_tuple(args)
GenServer.start_link(__MODULE__, args, name: agent_id)
end
def child_spec do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
type: :worker,
restart: :temporary,
shutdown: 500
}
end
def init(args) do
{:ok, %{agent_id: args}, @timeout}
end
def handle_cast(:ping, state) do
{:noreply, state, @timeout}
end
def handle_info(:timeout, state) do
{:stop, :normal, state}
end
def terminate(_, state) do
set_inactive(state.agent_id)
Registry.unregister(@registry, state.agent_id)
end
defp via_tuple(agent_id) do
{:via, Registry, {@registry, agent_id}}
end
defp set_inactive(agent_id) do
POABackend.CustomHandler.publish_inactive(agent_id)
end
end

View File

@ -0,0 +1,29 @@
defmodule POABackend.CustomHandler.REST.Monitor.Supervisor do
@moduledoc false
alias POABackend.CustomHandler.REST
def start_link do
Supervisor.start_link(__MODULE__, :noargs, name: __MODULE__)
end
def child_spec do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
type: :supervisor,
restart: :permanent,
shutdown: 500
}
end
def init(:noargs) do
children = [
REST.Monitor.Server.child_spec()
]
opts = [strategy: :simple_one_for_one]
Supervisor.init(children, opts)
end
end

View File

@ -1,4 +1,4 @@
defmodule POABackend.CustomHandler.REST.AcceptPlug do
defmodule POABackend.CustomHandler.REST.Plugs.Accept do
@moduledoc false
@behaviour Plug

View File

@ -1,4 +1,4 @@
defmodule POABackend.CustomHandler.REST.AuthorizationPlug do
defmodule POABackend.CustomHandler.REST.Plugs.Authorization do
@moduledoc false
@behaviour Plug

View File

@ -1,4 +1,4 @@
defmodule POABackend.CustomHandler.REST.RequiredFieldsPlug do
defmodule POABackend.CustomHandler.REST.Plugs.RequiredFields do
@moduledoc false
@behaviour Plug

View File

@ -0,0 +1,33 @@
defmodule POABackend.CustomHandler.REST.Supervisor do
@moduledoc false
alias POABackend.CustomHandler.REST
def start_link(rest_options) do
Supervisor.start_link(__MODULE__, rest_options, name: __MODULE__)
end
def child_spec(rest_options) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [rest_options]},
type: :supervisor,
restart: :permanent,
shutdown: 500
}
end
def init(rest_options) do
import Supervisor.Spec
children = [
Plug.Adapters.Cowboy.child_spec(scheme: rest_options[:scheme], plug: REST.Router, options: [port: rest_options[:port]]),
supervisor(Registry, [:unique, :rest_activity_monitor_registry]),
REST.Monitor.Supervisor.child_spec()
]
opts = [strategy: :one_for_one]
Supervisor.init(children, opts)
end
end

View File

@ -8,6 +8,14 @@ defmodule POABackend.Metric do
GenStage.start_link(__MODULE__, :ok, name: name)
end
def broadcast(message) do
metrics = Application.get_env(:poa_backend, :metrics)
for metric <- metrics do
add(metric, message)
end
end
def init(:ok) do
{:producer, [], dispatcher: GenStage.BroadcastDispatcher}
end

View File

@ -62,6 +62,7 @@ defmodule POABackend.Receiver do
- `init_receiver/1`: Called only once when the process starts
- `metrics_received/3`: This function is called eveytime the Producer (metric type) receives a message.
- `handle_message/1`: This is called when the Receiver process receives an Erlang message
- `handle_inactive/2`: This function is called when one client has been disconnected or is not active for a period of time.
- `terminate/1`: Called just before stopping the process
This is a simple example of custom Receiver Plugin
@ -84,6 +85,10 @@ defmodule POABackend.Receiver do
{:ok, state}
end
def handle_inactive(agent_id, state) do
{:ok, state}
end
def terminate(_state) do
:ok
end
@ -114,6 +119,15 @@ defmodule POABackend.Receiver do
"""
@callback handle_message(msg :: any(), state :: any()) :: {:ok, state :: any()}
@doc """
This function is called when a Custom Handler detects a client is inactive.
The Custom Handler must to call explicity to `POABackend.CustomHandler.publish_inactive/1` and it will publish the
`inactive` message to all the metrics in the system (defined in the config file).
"""
@callback handle_inactive(agent_id :: binary(), state :: any()) :: {:ok, state :: any()}
@doc """
This callback is called just before the Process goes down. This is a good place for closing connections.
"""
@ -138,6 +152,13 @@ defmodule POABackend.Receiver do
end
@doc false
def handle_events([inactive: agent_id], _from, state) do
{:ok, internal_state} = handle_inactive(agent_id, state.internal_state)
{:noreply, [], %{state | internal_state: internal_state}}
end
def handle_events(events, from, state) do
{:ok, internal_state} = metrics_received(events, from, state.internal_state)

View File

@ -24,6 +24,7 @@ defmodule POABackend.Receivers.Dashboard do
use POABackend.Receiver
alias __MODULE__
alias POABackend.Protocol.Message
defmodule SocketHandler do
@ -111,6 +112,24 @@ defmodule POABackend.Receivers.Dashboard do
{:ok, %{state | clients: List.delete(clients, client)}}
end
def handle_inactive(agent_id, %{clients: clients} = state) do
data = %{"type" => "statistics",
"body" =>
%{"active" => false,
"gasPrice" => nil,
"hashrate" => 0,
"mining" => false,
"peers" => 0,
"syncing" => nil,
"uptime" => nil
}}
agent_id
|> Message.new(:ethereum_metrics, :data, data)
|> dispatch_metric(clients)
{:ok, state}
end
def terminate(_) do
:ok
end
@ -137,7 +156,7 @@ defmodule POABackend.Receivers.Dashboard do
]
end
defp dispatch_metric(metrics, clients) do
defp dispatch_metric(metrics, clients) when is_list(metrics) do
for client <- clients do
for metric <- metrics do
send(client, metric)
@ -147,4 +166,8 @@ defmodule POABackend.Receivers.Dashboard do
:ok
end
defp dispatch_metric(metric, clients) do
dispatch_metric([metric], clients)
end
end

View File

@ -78,6 +78,8 @@ defmodule POABackend.Receivers.DynamoDB do
{:ok, state}
end
def handle_inactive(_, state), do: {:ok, state}
def handle_message(_message, state), do: {:ok, state}
def terminate(_) do

View File

@ -79,5 +79,6 @@ defmodule POABackend.MixProject do
defp picture(_) do
File.cp("./assets/backend_architecture.png", "./doc/backend_architecture.png")
File.cp("./assets/REST_architecture.png", "./doc/REST_architecture.png")
end
end

View File

@ -1,6 +1,8 @@
defmodule CustomHandler.RESTTest do
use ExUnit.Case
alias POABackend.CustomHandler.REST.Monitor
@base_url "localhost:4002"
# ----------------------------------------
@ -43,11 +45,7 @@ defmodule CustomHandler.RESTTest do
# ----------------------------------------
test "testing the REST /ping endpoint" do
url = @base_url <> "/ping"
{:ok, data} = Poison.encode(%{id: "agentID", secret: "mysecret", data: %{hello: "world"}})
headers = [{"Content-Type", "application/json"}]
{200, %{"result" => "success"}} = post(url, data, headers)
{200, %{"result" => "success"}} = ping("agentID")
end
test "testing the REST /ping endpoint without content-type" do
@ -73,6 +71,56 @@ defmodule CustomHandler.RESTTest do
{401, :nobody} = post(url, data, headers)
end
test "POST /ping send inactive after stopping sending pings" do
# first creating a Receiver in order to catch the inactive message
defmodule Receiver1 do
use POABackend.Receiver
def init_receiver(state) do
{:ok, state}
end
def metrics_received(_metrics, _from, state) do
{:ok, state}
end
def handle_message(_message, state) do
{:ok, state}
end
def handle_inactive(_, state) do
send(state[:test_pid], :inactive_received)
{:ok, state}
end
def terminate(_state) do
:ok
end
end
state = %{name: :receiver, args: [test_pid: self()], subscribe_to: [:ethereum_metrics]}
{:ok, _} = Receiver1.start_link(state)
%{active: active_monitors} = Supervisor.count_children(Monitor.Supervisor)
agent_id = "NewAgentID"
{200, %{"result" => "success"}} = ping(agent_id)
active_monitors = active_monitors + 1
%{active: ^active_monitors} = Supervisor.count_children(Monitor.Supervisor)
{200, %{"result" => "success"}} = ping(agent_id)
%{active: ^active_monitors} = Supervisor.count_children(Monitor.Supervisor)
assert_receive :inactive_received, 20_000
end
# ----------------------------------------
# /latency Endpoint Tests
# ----------------------------------------
@ -227,9 +275,9 @@ defmodule CustomHandler.RESTTest do
original_data = :original_data
assert(original_data == REST.AcceptPlug.init(original_data))
assert(original_data == REST.AuthorizationPlug.init(original_data))
assert(original_data == REST.RequiredFieldsPlug.init(original_data))
assert(original_data == REST.Plugs.Accept.init(original_data))
assert(original_data == REST.Plugs.Authorization.init(original_data))
assert(original_data == REST.Plugs.RequiredFields.init(original_data))
end
# ----------------------------------------
@ -250,4 +298,12 @@ defmodule CustomHandler.RESTTest do
{response.status_code, body}
end
defp ping(agent_id) do
url = @base_url <> "/ping"
{:ok, data} = Poison.encode(%{id: agent_id, secret: "mysecret", data: %{hello: "world"}})
headers = [{"Content-Type", "application/json"}]
post(url, data, headers)
end
end

View File

@ -17,6 +17,10 @@ defmodule Receivers.ReceiversTest do
{:ok, state}
end
def handle_inactive(_, state) do
{:ok, state}
end
def terminate(_state) do
:ok
end
@ -47,6 +51,10 @@ defmodule Receivers.ReceiversTest do
{:ok, state}
end
def handle_inactive(_, state) do
{:ok, state}
end
def terminate(_state) do
:ok
end