[#18] implementing Ping-Pong and Latency messages
This commit is contained in:
parent
53ba6327ef
commit
12389f66aa
|
@ -33,6 +33,7 @@ defmodule POAAgent.Plugins.Transfer do
|
|||
|
||||
- `init_transfer/1`: Called only once when the process starts
|
||||
- `data_received/2`: This function is called every time a Collector sends metrics to the Transfer
|
||||
- `handle_message/1`: This is called when the transfer process receives an Erlang message
|
||||
- `terminate/1`: Called just before stopping the process
|
||||
|
||||
This is a simple example of custom Transfer Plugin
|
||||
|
@ -76,6 +77,13 @@ defmodule POAAgent.Plugins.Transfer do
|
|||
"""
|
||||
@callback data_received(label :: atom(), data :: any(), state :: any()) :: {:ok, any()}
|
||||
|
||||
@doc """
|
||||
In this callback is called when the Transfer process receives an erlang message.
|
||||
|
||||
It must return `{:ok, state}`.
|
||||
"""
|
||||
@callback handle_message(msg :: any(), state :: any()) :: {:ok, state :: any()}
|
||||
|
||||
@doc """
|
||||
This callback is called just before the Process goes down. This is a good place for closing connections.
|
||||
"""
|
||||
|
@ -103,8 +111,9 @@ defmodule POAAgent.Plugins.Transfer do
|
|||
end
|
||||
|
||||
@doc false
|
||||
def handle_info(_msg, state) do
|
||||
{:noreply, state}
|
||||
def handle_info(msg, state) do
|
||||
{:ok, internal_state} = handle_message(msg, state.internal_state)
|
||||
{:noreply, %{state | internal_state: internal_state}}
|
||||
end
|
||||
|
||||
@doc false
|
||||
|
|
|
@ -22,6 +22,8 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
]
|
||||
end
|
||||
|
||||
@ping_frequency 3_000
|
||||
|
||||
def init_transfer(_) do
|
||||
context = struct!(Primus.State, Application.get_env(:poa_agent, Primus))
|
||||
state = nil
|
||||
|
@ -33,6 +35,8 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
|> Jason.encode!()
|
||||
:ok = Primus.Client.send(client, event)
|
||||
|
||||
set_ping_timer()
|
||||
|
||||
{:ok, %{client: client, context: context}}
|
||||
end
|
||||
|
||||
|
@ -48,6 +52,21 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
{:ok, state}
|
||||
end
|
||||
|
||||
def handle_message(:ping, %{client: client, context: context} = state) do
|
||||
|
||||
event = %{}
|
||||
|> Map.put(:id, context.identifier)
|
||||
|> Map.put(:clientTime, POAAgent.Utils.system_time())
|
||||
|> POAAgent.Format.PrimusEmitter.wrap(event: "node-ping")
|
||||
|> Jason.encode!()
|
||||
|
||||
:ok = Primus.Client.send(client, event)
|
||||
|
||||
set_ping_timer()
|
||||
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
def terminate(_) do
|
||||
:ok
|
||||
end
|
||||
|
@ -109,6 +128,10 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
end
|
||||
end
|
||||
|
||||
defp set_ping_timer() do
|
||||
Process.send_after(self(), :ping, @ping_frequency)
|
||||
end
|
||||
|
||||
defmodule Client do
|
||||
@moduledoc false
|
||||
|
||||
|
@ -122,11 +145,37 @@ defmodule POAAgent.Plugins.Transfers.WebSocket.Primus do
|
|||
WebSockex.start_link(address, __MODULE__, state)
|
||||
end
|
||||
|
||||
def handle_frame({:text, event}, state) do
|
||||
event = Jason.decode!(event)
|
||||
|
||||
handle_primus_event(event["emit"], state)
|
||||
end
|
||||
def handle_frame({_type, _msg} = frame, state) do
|
||||
require Logger
|
||||
|
||||
Logger.info("got an unexpected frame: #{inspect frame}")
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
defp handle_primus_event(["node-pong", data], state) do
|
||||
context = struct!(Primus.State, Application.get_env(:poa_agent, Primus))
|
||||
|
||||
now = POAAgent.Utils.system_time()
|
||||
latency = Float.ceil((now - data["clientTime"]) / 2)
|
||||
|
||||
event = %{}
|
||||
|> Map.put(:id, context.identifier)
|
||||
|> Map.put(:latency, latency)
|
||||
|> POAAgent.Format.PrimusEmitter.wrap(event: "latency")
|
||||
|> Jason.encode!()
|
||||
|
||||
{:reply, {:text, event}, state}
|
||||
end
|
||||
defp handle_primus_event(data, state) do
|
||||
require Logger
|
||||
|
||||
Logger.info("got an unexpected message: #{inspect data}")
|
||||
{:ok, state}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
defmodule POAAgent.Utils do
|
||||
@moduledoc false
|
||||
|
||||
@doc false
|
||||
def system_time do
|
||||
{mega, seconds, ms} = :os.timestamp()
|
||||
(mega * 1_000_000 + seconds) * 1000 + :erlang.round(ms / 1000)
|
||||
end
|
||||
end
|
|
@ -39,6 +39,10 @@ defmodule POAAgent.PluginsTest do
|
|||
{:ok, :no_state}
|
||||
end
|
||||
|
||||
def handle_message(_, state) do
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
def terminate(_state) do
|
||||
:ok
|
||||
end
|
||||
|
@ -46,7 +50,6 @@ defmodule POAAgent.PluginsTest do
|
|||
|
||||
assert Transfer1.init(%{args: :args}) == {:ok, %{internal_state: :no_state, args: :args}}
|
||||
assert Transfer1.handle_call(:msg, :from, :state) == {:noreply, :state}
|
||||
assert Transfer1.handle_info(:msg, :state) == {:noreply, :state}
|
||||
assert Transfer1.handle_cast(:msg, :state) == {:noreply, :state}
|
||||
assert Transfer1.code_change(:old, :state, :extra) == {:ok, :state}
|
||||
assert Transfer1.terminate(:reason, :state) == :ok
|
||||
|
@ -84,6 +87,10 @@ defmodule POAAgent.PluginsTest do
|
|||
{:ok, test_pid}
|
||||
end
|
||||
|
||||
def handle_message(_, state) do
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
def terminate(_state) do
|
||||
:ok
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue