Merge pull request #17 from poanetwork/ferigis.16.DynamoDB_receiver

[#16] adding DynamoDB Receiver for ethereum blocks
This commit is contained in:
Joseph Yiasemides 2018-06-25 11:05:37 +02:00 committed by GitHub
commit eb799a4f5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 292 additions and 16 deletions

View File

@ -58,10 +58,7 @@ defmodule POABackend.CustomHandler do
The mapping between Data Types and Receivers is done in the config file.
"""
@spec send_to_receivers(Message.t) :: :ok
def send_to_receivers(%Message{} = _message) do
require Logger
# this will be implemented when working with the receivers
Logger.warn("To be implemented")
:ok
def send_to_receivers(%Message{} = message) do
POABackend.Metric.add(message.data_type, message)
end
end

View File

@ -287,6 +287,8 @@ defmodule POABackend.CustomHandler.REST do
@moduledoc false
alias POABackend.CustomHandler.REST
alias POABackend.Protocol.DataType
alias POABackend.Protocol.Message
plug REST.AcceptPlug, "application/json"
plug Plug.Parsers, parsers: [:json], json_decoder: Poison
@ -330,8 +332,17 @@ defmodule POABackend.CustomHandler.REST do
conn = REST.RequiredFieldsPlug.call(conn, ~w(type data))
with false <- conn.halted,
true <- is_map(conn.params["data"])
true <- is_map(conn.params["data"]),
true <- DataType.valid?(conn.params["type"])
do
type = String.to_existing_atom(conn.params["type"])
# sending the data to the receivers
conn.params["id"]
|> Message.new(type, :data, conn.params["data"])
|> POABackend.CustomHandler.send_to_receivers()
conn
|> put_resp_content_type("application/json")
|> send_success_resp()

View File

@ -11,4 +11,17 @@ defmodule POABackend.Protocol.DataType do
"""
@type t :: :ethereum_metric
@doc false
def valid?(type) when is_binary(type) do
type
|> String.to_existing_atom()
|> valid?()
end
def valid?(type) when is_atom(type) do
valid_types = Application.get_env(:poa_backend, :metrics)
Enum.member?(valid_types, type)
end
end

View File

@ -1,5 +1,7 @@
defmodule POABackend.Protocol.Message do
alias __MODULE__
alias POABackend.CustomProtocol.DataType
alias POABackend.CustomProtocol.MessageType
@moduledoc """
The message received from the Agent (inspired in [`Plug.Conn`](https://hexdocs.pm/plug/Plug.Conn.html)).
@ -10,22 +12,18 @@ defmodule POABackend.Protocol.Message do
## Message Fields
* `agent_id` - The Agent Id which sent the message to the backend.
* `receivers` - The list of the receivers which are going to receive this message. This list is retrieved from the config file and is mapped to the `data_type`
* `data_type` - The kind of data the message is carring. For now only `ethereum_metric` type is defined.
* `message_type` - This is the message type according to the custom protocol. Only `hello`, `data` and `latency` are defined
* `assigns` - Shared user data as a map
* `peer` - The actual TCP peer that connected, example: `{{127, 0, 0, 1}, 12345}`.
* `data` - The message payloda. It is a map
"""
defstruct [
agent_id: nil,
receivers: [],
data_type: nil,
message_type: nil,
assigns: %{},
peer: nil,
data: nil
]
@ -36,11 +34,9 @@ defmodule POABackend.Protocol.Message do
"""
@type t :: %__MODULE__{
agent_id: String.t() | nil,
receivers: [atom()],
data_type: POABackend.CustomProtocol.DataType.t() | nil,
message_type: POABackend.CustomProtocol.MessageType.t() | nil,
data_type: DataType.t() | nil,
message_type: MessageType.t() | nil,
assigns: %{atom() => any()},
peer: {:inet.ip_address(), :inet.port_number()} | nil,
data: Map.t() | nil
}
@ -52,6 +48,20 @@ defmodule POABackend.Protocol.Message do
%Message{}
end
@doc """
Returns a new Message Struct initialized.
The params in order are: agent_id, data_type, message_type and data
"""
@spec new(String.t, DataType.t, MessageType.t, Map.t) :: t
def new(agent_id, data_type, message_type, data) do
%Message{
agent_id: agent_id,
data_type: data_type,
message_type: message_type,
data: data
}
end
@doc """
Assigns a value to a key in the connection.

View File

@ -0,0 +1,127 @@
defmodule POABackend.Receivers.DynamoDB do
@moduledoc """
This is a Receiver Plugin which stores the received Ethereum Blocks in DynamoDB
This Receiver needs some data to be put in the config file (_receivers_ section), for example:
{:dynamodb_receiver, POABackend.Receivers.DynamoDB, [
scheme: "http://",
host: "localhost",
port: 8000,
access_key_id: "myaccesskeyid",
secret_access_key: "mysecretaccesskey",
region: "us-east-1"
]}
* scheme: the scheme type
* host: host name or url
* port: the TCP port where the DynamoDB instance is listening
* access_key_id: the AWS access key
* secret_access_key: the AWS secret access key
* region: the AWS region
__All fields are mandatory__
"""
use POABackend.Receiver
alias __MODULE__
alias POABackend.Protocol.Message
@pool_name :blocks_dynamodb_worker_pool
defmodule Worker do
@moduledoc false
use GenServer
alias ExAws.Dynamo
def init(:args) do
{:ok, %{}}
end
def handle_cast(metric, state) do
{_, block} = Map.pop(metric.data["body"], "transactions")
{_, block} = Map.pop(block, "uncles")
msg_time = DateTime.utc_now |> DateTime.to_iso8601
message = %{
msg_type: "block",
msg_time: msg_time,
payload: %{block: block}
}
Dynamo.put_item("netstat_prod", message)
|> ExAws.request!
{:noreply, state}
end
end
def init_receiver(opts) do
:ok = config(opts)
{:ok, _pid} = :wpool.start_pool(@pool_name, worker_pool_config(:args))
{:ok, %{}}
end
def metrics_received(metrics, _from, state) do
for metric <- metrics do
send_metric(metric)
end
{:ok, state}
end
def terminate(_) do
:ok
end
defp worker_pool_config(args) do
[
overrun_warning: :infinity,
overrun_handler: {:error_logger, :warning_report},
workers: 50,
worker: {DynamoDB.Worker, args}
]
end
# filtering only block messages
defp send_metric(%Message{data: %{"type" => "block"}} = metric) do
:wpool.cast(@pool_name, metric)
end
defp send_metric(_) do
:ok
end
@doc false
defp config(opts) do
# ExAws
{_, access_key_id} = List.keyfind(opts, :access_key_id, 0)
{_, secret_access_key} = List.keyfind(opts, :secret_access_key, 0)
Application.put_env(:ex_aws, :access_key_id, access_key_id)
Application.put_env(:ex_aws, :secret_access_key, secret_access_key)
# ExAwsDynamo
{_, scheme} = List.keyfind(opts, :scheme, 0)
{_, host} = List.keyfind(opts, :host, 0)
{_, port} = List.keyfind(opts, :port, 0)
{_, region} = List.keyfind(opts, :region, 0)
config = [scheme: scheme, host: host, port: port, region: region]
Application.put_env(:ex_aws, :dynamodb, config)
:ok
end
end

View File

@ -19,7 +19,7 @@ defmodule POABackend.MixProject do
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger, :cowboy, :plug, :poison],
extra_applications: [:logger, :cowboy, :plug, :poison, :worker_pool],
mod: {POABackend.Application, []}
]
end
@ -31,12 +31,16 @@ defmodule POABackend.MixProject do
{:plug, "~> 1.0"},
{:poison, "~> 3.1"},
{:gen_stage, "~> 0.14"},
{:worker_pool, "~> 3.1"},
{:ex_aws_dynamo, "~> 2.0"},
{:hackney, "~> 1.12"},
# Tests
{:credo, "~> 0.9", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 0.5", only: [:dev], runtime: false},
{:excoveralls, "~> 0.8", only: [:test, :dev], runtime: false},
{:httpoison, "~> 1.0", only: [:test], runtime: true},
{:mock, "~> 0.3", only: [:test], runtime: false},
# Docs
{:ex_doc, "~> 0.18", only: :dev, runtime: false}
@ -60,7 +64,8 @@ defmodule POABackend.MixProject do
POABackend.CustomHandler.REST
],
"Receivers": [
POABackend.Receiver
POABackend.Receiver,
POABackend.Receivers.DynamoDB
]
]
]

View File

@ -6,6 +6,8 @@
"credo": {:hex, :credo, "0.9.3", "76fa3e9e497ab282e0cf64b98a624aa11da702854c52c82db1bf24e54ab7c97a", [], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.5", "4d21980d5d2862a2e13ec3c49ad9ad783ffc7ca5769cf6ff891a4553fbaae761", [], [], "hexpm"},
"ex_aws": {:hex, :ex_aws, "2.0.2", "8df2f96f58624a399abe5a0ce26db648ee848aca6393b9c65c939ece9ac07bfa", [], [{:configparser_ex, "~> 2.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "1.6.3 or 1.6.5 or 1.7.1 or 1.8.6 or ~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8", [hex: :jsx, repo: "hexpm", optional: true]}, {:poison, ">= 1.2.0", [hex: :poison, repo: "hexpm", optional: true]}, {:sweet_xml, "~> 0.6", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:xml_builder, "~> 0.1.0", [hex: :xml_builder, repo: "hexpm", optional: true]}], "hexpm"},
"ex_aws_dynamo": {:hex, :ex_aws_dynamo, "2.0.0", "07b1117bbd1b1d04e2598190834c69c271db1d357cc21b82240d1a0b17194165", [], [{:ex_aws, "~> 2.0.0", [hex: :ex_aws, repo: "hexpm", optional: false]}], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.3", "f4b0e4a2ec6f333dccf761838a4b253d75e11f714b85ae271c9ae361367897b7", [], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.8.2", "b941a08a1842d7aa629e0bbc969186a4cefdd035bad9fe15d43aaaaaeb8fae36", [], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"},
@ -14,13 +16,16 @@
"httpoison": {:hex, :httpoison, "1.1.1", "96ed7ab79f78a31081bb523eefec205fd2900a02cda6dbc2300e7a1226219566", [], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "5.1.1", "cbc3b2fa1645113267cc59c760bafa64b2ea0334635ef06dbac8801e42f7279c", [], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [], [], "hexpm"},
"meck": {:hex, :meck, "0.8.9", "64c5c0bd8bcca3a180b44196265c8ed7594e16bcc845d0698ec6b4e577f48188", [], [], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [], [], "hexpm"},
"mime": {:hex, :mime, "1.3.0", "5e8d45a39e95c650900d03f897fbf99ae04f60ab1daa4a34c7a20a5151b7a5fe", [], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [], [], "hexpm"},
"mock": {:hex, :mock, "0.3.1", "994f00150f79a0ea50dc9d86134cd9ebd0d177ad60bd04d1e46336cdfdb98ff9", [], [{:meck, "~> 0.8.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.2.0", "2adfa4daf80c14dc36f522cf190eb5c4ee3e28008fc6394397c16f62a26258c2", [], [], "hexpm"},
"plug": {:hex, :plug, "1.5.1", "1ff35bdecfb616f1a2b1c935ab5e4c47303f866cb929d2a76f0541e553a58165", [], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.3", [hex: :cowboy, repo: "hexpm", optional: true]}, {:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [], [], "hexpm"},
"ranch": {:hex, :ranch, "1.5.0", "f04166f456790fee2ac1aa05a02745cc75783c2bfb26d39faf6aefc9a3d3a58a", [], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [], [], "hexpm"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [], [], "hexpm"},
"worker_pool": {:hex, :worker_pool, "3.1.0", "c908627e04057cf29940ad0e79b89ab161db520eebc76942efd08a187babf93a", [], [], "hexpm"},
}

View File

@ -0,0 +1,108 @@
defmodule Receivers.DynamoDBTest do
use ExUnit.Case
alias POABackend.Receivers.DynamoDB
alias POABackend.Protocol.Message
import Mock
test "testing DynamoDB" do
caller = self()
with_mock ExAws, [
request!: fn(%ExAws.Operation.JSON{data: %{"Item" => stored_block}}) ->
send(caller, stored_block)
:ok
end
] do
state = %{name: :dynamodb_receiver, args: args(), subscribe_to: [:ethereum_metrics]}
{:ok, _} = DynamoDB.start_link(state)
POABackend.Metric.add(:ethereum_metrics, valid_message())
assert_receive stored_block, 20_000
{_, stored_block} = Map.pop(stored_block, "msg_time")
assert stored_block == stored_block()
end
end
defp args do
[
scheme: "http://",
host: "localhost",
port: 8000,
access_key_id: "myaccesskey",
secret_access_key: "mysecretaccesskey",
region: "us-east-1"
]
end
defp valid_message do
%Message{
agent_id: "elixirNodeJSIntegration",
assigns: %{},
data: %{"body" =>
%{"author" => "0x6e50b3d7a292380b3080022015b941f912ed62e9",
"difficulty" => "340282366920938463463374607431768211454",
"extra_data" => "0xd583010a008650617269747986312e32342e31826c69",
"gas_limit" => 8_000_000,
"gas_used" => 0,
"hash" => "0x495054f5be069321c8bd394884ab0925e53dcd4734978e618106f05276ccfbe0",
"miner" => "0x6e50b3d7a292380b3080022015b941f912ed62e9",
"number" => 644_964,
"parent_hash" => "0xd9197468f891bdee90aee83d15eb274dfb42de294af2d7fc78d652033e4ef27f",
"receipts_root" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"seal_fields" => ["0x84123c0799", "0xb84174457964f4d98af28faec6f9c1f4eec928fc527d4cd884ef36031a064aafdd494ed5963c0e756166e88fda4491cfc99984a5924230fa3ace4e135e1578a1cb9000"],
"sha3_uncles" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"signature" => "74457964f4d98af28faec6f9c1f4eec928fc527d4cd884ef36031a064aafdd494ed5963c0e756166e88fda4491cfc99984a5924230fa3ace4e135e1578a1cb9000",
"size" => 579,
"state_root" => "0x4b6d81e4060fdbc88197440d0ce1a2d0ec0f730698b7226a72ca256073508aee",
"step" => "305923993",
"timestamp" => 1_529_619_965,
"total_difficulty" => "219469876498796155149191940307622952427069699",
"transactions" => [],
"transactions_root" =>"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"uncles" => []},
"type" => "block"},
data_type: :ethereum_metrics,
message_type: :data
}
end
defp stored_block do
%{
"msg_type" => %{"S" => "block"},
"payload" => %{"M" => %{
"block" => %{
"M" => %{
"author" => %{"S" => "0x6e50b3d7a292380b3080022015b941f912ed62e9"},
"difficulty" => %{"S" => "340282366920938463463374607431768211454"},
"extra_data" => %{"S" => "0xd583010a008650617269747986312e32342e31826c69"},
"gas_limit" => %{"N" => "8000000"},
"gas_used" => %{"N" => "0"},
"hash" => %{"S" => "0x495054f5be069321c8bd394884ab0925e53dcd4734978e618106f05276ccfbe0"},
"miner" => %{"S" => "0x6e50b3d7a292380b3080022015b941f912ed62e9"},
"number" => %{"N" => "644964"},
"parent_hash" => %{"S" => "0xd9197468f891bdee90aee83d15eb274dfb42de294af2d7fc78d652033e4ef27f"},
"receipts_root" => %{"S" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"},
"seal_fields" => %{"L" => [%{"S" => "0x84123c0799"}, %{"S" => "0xb84174457964f4d98af28faec6f9c1f4eec928fc527d4cd884ef36031a064aafdd494ed5963c0e756166e88fda4491cfc99984a5924230fa3ace4e135e1578a1cb9000"}]},
"sha3_uncles" => %{"S" => "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"},
"signature" => %{"S" => "74457964f4d98af28faec6f9c1f4eec928fc527d4cd884ef36031a064aafdd494ed5963c0e756166e88fda4491cfc99984a5924230fa3ace4e135e1578a1cb9000"},
"size" => %{"N" => "579"},
"state_root" => %{"S" => "0x4b6d81e4060fdbc88197440d0ce1a2d0ec0f730698b7226a72ca256073508aee"},
"step" => %{"S" => "305923993"},
"timestamp" => %{"N" => "1529619965"},
"total_difficulty" => %{"S" => "219469876498796155149191940307622952427069699"},
"transactions_root" => %{"S" => "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"}
}
}
}
}
}
end
end