[hermes] Pass Wormhole arguments from command line or env. vars (#769)

* Format p2p.go

* Pass Wormhole arguments from command line or env. vars

* Remove forget calls and let memory be freed (also remove confusing comment)

* Use proper types on command line arguments
This commit is contained in:
Thomaz Leite 2023-04-19 17:04:30 -03:00 committed by GitHub
parent 9fea461174
commit 04b1a21dfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 186 additions and 123 deletions

View File

@ -24,13 +24,22 @@ pub enum Options {
#[structopt(long)]
id_secp256k1: Option<PathBuf>,
/// Multiaddress for a Wormhole bootstrap peer.
#[structopt(long)]
wormhole_peer: Option<String>,
/// Network ID for Wormhole
#[structopt(long, env = "WORMHOLE_NETWORK_ID")]
wh_network_id: String,
/// Multiaddress to bind Wormhole P2P to.
#[structopt(long)]
wormhole_addr: Option<Multiaddr>,
/// Multiaddresses for Wormhole bootstrap peers (separated by comma).
#[structopt(long, use_delimiter = true, env = "WORMHOLE_BOOTSTRAP_ADDRS")]
wh_bootstrap_addrs: Vec<Multiaddr>,
/// Multiaddresses to bind Wormhole P2P to (separated by comma)
#[structopt(
long,
use_delimiter = true,
default_value = "/ip4/0.0.0.0/udp/30910/quic,/ip6/::/udp/30910/quic",
env = "WORMHOLE_LISTEN_ADDRS"
)]
wh_listen_addrs: Vec<Multiaddr>,
/// The address to bind the RPC server to.
#[structopt(long, default_value = "127.0.0.1:33999")]

View File

@ -52,8 +52,9 @@ async fn init(_update_channel: Receiver<AccountUpdate>) -> Result<()> {
config::Options::Run {
id: _,
id_secp256k1: _,
wormhole_addr: _,
wormhole_peer: _,
wh_network_id,
wh_bootstrap_addrs,
wh_listen_addrs,
rpc_addr,
p2p_addr,
p2p_peer: _,
@ -62,7 +63,13 @@ async fn init(_update_channel: Receiver<AccountUpdate>) -> Result<()> {
// Spawn the P2P layer.
log::info!("Starting P2P server on {}", p2p_addr);
network::p2p::spawn(handle_message).await?;
network::p2p::spawn(
handle_message,
wh_network_id.to_string(),
wh_bootstrap_addrs,
wh_listen_addrs,
)
.await?;
// Spawn the RPC server.
log::info!("Starting RPC server on {}", rpc_addr);

View File

@ -8,6 +8,7 @@
package main
// #include <stdlib.h>
// #include <string.h>
//
// // A structure containing Wormhole VAA observations. This must match on both
// // the Go and Rust side.
@ -27,6 +28,7 @@ import "C"
import (
"context"
"fmt"
"strings"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
@ -45,123 +47,120 @@ import (
)
//export RegisterObservationCallback
func RegisterObservationCallback(f C.callback_t) {
go func() {
ctx := context.Background()
func RegisterObservationCallback(f C.callback_t, network_id, bootstrap_addrs, listen_addrs *C.char) {
networkID := C.GoString(network_id)
bootstrapAddrs := strings.Split(C.GoString(bootstrap_addrs), ",")
listenAddrs := strings.Split(C.GoString(listen_addrs), ",")
// Setup base network configuration.
networkID := "/wormhole/mainnet/2"
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
bootstrapPeers := []string{
"/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7",
}
go func() {
ctx := context.Background()
// Setup libp2p Connection Manager.
mgr, err := connmgr.NewConnManager(
100,
400,
connmgr.WithGracePeriod(0),
)
// Setup base network configuration.
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
if err != nil {
err := fmt.Errorf("Failed to create connection manager: %w", err)
fmt.Println(err)
return
}
// Setup libp2p Connection Manager.
mgr, err := connmgr.NewConnManager(
100,
400,
connmgr.WithGracePeriod(0),
)
// Setup libp2p Reactor.
h, err := libp2p.New(
libp2p.Identity(priv),
libp2p.ListenAddrStrings(
"/ip4/0.0.0.0/udp/30910/quic",
"/ip6/::/udp/30910/quic",
),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.ConnectionManager(mgr),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
bootstrappers := make([]peer.AddrInfo, 0)
for _, addr := range bootstrapPeers {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
continue
}
if err != nil {
err := fmt.Errorf("Failed to create connection manager: %w", err)
fmt.Println(err)
return
}
pi, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil || pi.ID == h.ID() {
continue
}
// Setup libp2p Reactor.
h, err := libp2p.New(
libp2p.Identity(priv),
libp2p.ListenAddrStrings(listenAddrs...),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.ConnectionManager(mgr),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
bootstrappers := make([]peer.AddrInfo, 0)
for _, addr := range bootstrapAddrs {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
continue
}
bootstrappers = append(bootstrappers, *pi)
}
idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
dht.ProtocolPrefix(protocol.ID("/"+networkID)),
dht.BootstrapPeers(bootstrappers...),
)
return idht, err
}),
)
pi, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil || pi.ID == h.ID() {
continue
}
if err != nil {
err := fmt.Errorf("Failed to create libp2p host: %w", err)
fmt.Println(err)
return
}
bootstrappers = append(bootstrappers, *pi)
}
idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
dht.ProtocolPrefix(protocol.ID("/"+networkID)),
dht.BootstrapPeers(bootstrappers...),
)
return idht, err
}),
)
topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
err := fmt.Errorf("Failed to create Pubsub: %w", err)
fmt.Println(err)
return
}
if err != nil {
err := fmt.Errorf("Failed to create libp2p host: %w", err)
fmt.Println(err)
return
}
th, err := ps.Join(topic)
if err != nil {
err := fmt.Errorf("Failed to join topic: %w", err)
fmt.Println(err)
return
}
topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
err := fmt.Errorf("Failed to create Pubsub: %w", err)
fmt.Println(err)
return
}
sub, err := th.Subscribe()
if err != nil {
err := fmt.Errorf("Failed to subscribe topic: %w", err)
fmt.Println(err)
return
}
th, err := ps.Join(topic)
if err != nil {
err := fmt.Errorf("Failed to join topic: %w", err)
fmt.Println(err)
return
}
for {
for {
select {
case <-ctx.Done():
return
default:
envelope, err := sub.Next(ctx)
if err != nil {
err := fmt.Errorf("Failed to receive Pubsub message: %w", err)
fmt.Println(err)
return
}
sub, err := th.Subscribe()
if err != nil {
err := fmt.Errorf("Failed to subscribe topic: %w", err)
fmt.Println(err)
return
}
// Definition for GossipMessage is generated by Protobuf, see `p2p.proto`.
var msg GossipMessage
err = proto.Unmarshal(envelope.Data, &msg)
for {
for {
select {
case <-ctx.Done():
return
default:
envelope, err := sub.Next(ctx)
if err != nil {
err := fmt.Errorf("Failed to receive Pubsub message: %w", err)
fmt.Println(err)
return
}
switch msg.Message.(type) {
case *GossipMessage_SignedObservation:
case *GossipMessage_SignedVaaWithQuorum:
vaaBytes := msg.GetSignedVaaWithQuorum().GetVaa()
cBytes := C.CBytes(vaaBytes)
defer C.free(cBytes)
C.invoke(f, C.observation_t{
vaa: (*C.char)(cBytes),
vaa_len: C.size_t(len(vaaBytes)),
})
}
}
}
}
}()
// Definition for GossipMessage is generated by Protobuf, see `p2p.proto`.
var msg GossipMessage
err = proto.Unmarshal(envelope.Data, &msg)
switch msg.Message.(type) {
case *GossipMessage_SignedObservation:
case *GossipMessage_SignedVaaWithQuorum:
vaaBytes := msg.GetSignedVaaWithQuorum().GetVaa()
cBytes := C.CBytes(vaaBytes)
defer C.free(cBytes)
C.invoke(f, C.observation_t{
vaa: (*C.char)(cBytes),
vaa_len: C.size_t(len(vaaBytes)),
})
}
}
}
}
}()
}
func main() {

View File

@ -11,17 +11,29 @@
use {
anyhow::Result,
std::sync::{
mpsc::{
Receiver,
Sender,
libp2p::Multiaddr,
std::{
ffi::{
c_char,
CString,
},
sync::{
mpsc::{
Receiver,
Sender,
},
Mutex,
},
Mutex,
},
};
extern "C" {
fn RegisterObservationCallback(cb: extern "C" fn(o: ObservationC));
fn RegisterObservationCallback(
cb: extern "C" fn(o: ObservationC),
network_id: *const c_char,
bootstrap_addrs: *const c_char,
listen_addrs: *const c_char,
);
}
// An `Observation` C type passed back to us from Go.
@ -64,22 +76,58 @@ extern "C" fn proxy(o: ObservationC) {
/// TODO: handle_message should be capable of handling more than just Observations. But we don't
/// have our own P2P network, we pass it in to keep the code structure and read directly from the
/// OBSERVATIONS channel in the RPC for now.
pub fn bootstrap<H>(_handle_message: H) -> Result<()>
pub fn bootstrap<H>(
_handle_message: H,
network_id: String,
wh_bootstrap_addrs: Vec<Multiaddr>,
wh_listen_addrs: Vec<Multiaddr>,
) -> Result<()>
where
H: Fn(Observation) -> Result<()> + 'static,
{
let network_id_cstr = CString::new(network_id)?;
let wh_bootstrap_addrs_cstr = CString::new(
wh_bootstrap_addrs
.iter()
.map(|a| a.to_string())
.collect::<Vec<_>>()
.join(","),
)?;
let wh_listen_addrs_cstr = CString::new(
wh_listen_addrs
.iter()
.map(|a| a.to_string())
.collect::<Vec<_>>()
.join(","),
)?;
// Launch the Go LibP2P Reactor.
unsafe {
RegisterObservationCallback(proxy as extern "C" fn(o: ObservationC));
RegisterObservationCallback(
proxy as extern "C" fn(observation: ObservationC),
network_id_cstr.as_ptr(),
wh_bootstrap_addrs_cstr.as_ptr(),
wh_listen_addrs_cstr.as_ptr(),
);
}
Ok(())
}
// Spawn's the P2P layer as a separate thread via Go.
pub async fn spawn<H>(handle_message: H) -> Result<()>
pub async fn spawn<H>(
handle_message: H,
network_id: String,
wh_bootstrap_addrs: Vec<Multiaddr>,
wh_listen_addrs: Vec<Multiaddr>,
) -> Result<()>
where
H: Fn(Observation) -> Result<()> + Send + 'static,
{
bootstrap(handle_message)?;
bootstrap(
handle_message,
network_id,
wh_bootstrap_addrs,
wh_listen_addrs,
)?;
Ok(())
}