This commit is contained in:
Andrej Zavgorodnij 2020-08-19 19:04:41 +03:00
parent b5780402e3
commit 33eeba1854
4 changed files with 130 additions and 104 deletions

View File

@ -12,11 +12,12 @@ import (
"time" "time"
"github.com/depools/dc4bc/fsm/state_machines/signature_proposal_fsm" "github.com/depools/dc4bc/fsm/state_machines/signature_proposal_fsm"
spf "github.com/depools/dc4bc/fsm/state_machines/signature_proposal_fsm"
"github.com/depools/dc4bc/fsm/state_machines" "github.com/depools/dc4bc/fsm/state_machines"
"github.com/depools/dc4bc/fsm/fsm" "github.com/depools/dc4bc/fsm/fsm"
dkgFSM "github.com/depools/dc4bc/fsm/state_machines/dkg_proposal_fsm" dpf "github.com/depools/dc4bc/fsm/state_machines/dkg_proposal_fsm"
"github.com/depools/dc4bc/qr" "github.com/depools/dc4bc/qr"
"github.com/depools/dc4bc/storage" "github.com/depools/dc4bc/storage"
) )
@ -30,6 +31,7 @@ type Client struct {
sync.Mutex sync.Mutex
userName string userName string
address string address string
pubKey ed25519.PublicKey
ctx context.Context ctx context.Context
state State state State
storage storage.Storage storage storage.Storage
@ -54,6 +56,7 @@ func NewClient(
ctx: ctx, ctx: ctx,
userName: userName, userName: userName,
address: keyPair.GetAddr(), address: keyPair.GetAddr(),
pubKey: keyPair.Pub,
state: state, state: state,
storage: storage, storage: storage,
keyStore: keyStore, keyStore: keyStore,
@ -61,12 +64,12 @@ func NewClient(
}, nil }, nil
} }
func (c *Client) SendMessage(message storage.Message) error { func (c *Client) GetAddr() string {
if _, err := c.storage.Send(message); err != nil { return c.address
return fmt.Errorf("failed to post message: %w", err) }
}
return nil func (c *Client) GetPubKey() ed25519.PublicKey {
return c.pubKey
} }
func (c *Client) Poll() error { func (c *Client) Poll() error {
@ -96,6 +99,14 @@ func (c *Client) Poll() error {
} }
} }
func (c *Client) SendMessage(message storage.Message) error {
if _, err := c.storage.Send(message); err != nil {
return fmt.Errorf("failed to post message: %w", err)
}
return nil
}
func (c *Client) ProcessMessage(message storage.Message) error { func (c *Client) ProcessMessage(message storage.Message) error {
fsmInstance, err := c.getFSMInstance(message.DkgRoundID) fsmInstance, err := c.getFSMInstance(message.DkgRoundID)
if err != nil { if err != nil {
@ -122,9 +133,10 @@ func (c *Client) ProcessMessage(message storage.Message) error {
switch resp.State { switch resp.State {
// if the new state is waiting for RPC to airgapped machine // if the new state is waiting for RPC to airgapped machine
case case
dkgFSM.StateDkgCommitsAwaitConfirmations, spf.StateAwaitParticipantsConfirmations,
dkgFSM.StateDkgDealsAwaitConfirmations, dpf.StateDkgCommitsAwaitConfirmations,
dkgFSM.StateDkgResponsesAwaitConfirmations: dpf.StateDkgDealsAwaitConfirmations,
dpf.StateDkgResponsesAwaitConfirmations:
bz, err := json.Marshal(resp.Data) bz, err := json.Marshal(resp.Data)
if err != nil { if err != nil {
return fmt.Errorf("failed to marshal FSM response: %w", err) return fmt.Errorf("failed to marshal FSM response: %w", err)

View File

@ -31,6 +31,7 @@ func TestClient_ProcessMessage(t *testing.T) {
) )
defer ctrl.Finish() defer ctrl.Finish()
userName := "user_name"
dkgRoundID := "dkg_round_id" dkgRoundID := "dkg_round_id"
state := clientMocks.NewMockState(ctrl) state := clientMocks.NewMockState(ctrl)
keyStore := clientMocks.NewMockKeyStore(ctrl) keyStore := clientMocks.NewMockKeyStore(ctrl)
@ -38,14 +39,11 @@ func TestClient_ProcessMessage(t *testing.T) {
qrProcessor := qrMocks.NewMockProcessor(ctrl) qrProcessor := qrMocks.NewMockProcessor(ctrl)
testClientKeyPair := client.NewKeyPair() testClientKeyPair := client.NewKeyPair()
keyStore.EXPECT().LoadKeys("test_client", "").Times(1).Return(testClientKeyPair, nil) keyStore.EXPECT().LoadKeys(userName, "").Times(1).Return(testClientKeyPair, nil)
fsm, err := state_machines.Create(dkgRoundID)
state.EXPECT().LoadFSM(dkgRoundID).Times(1).Return(fsm, true, nil)
clt, err := client.NewClient( clt, err := client.NewClient(
ctx, ctx,
"test_client", userName,
state, state,
stg, stg,
keyStore, keyStore,
@ -53,13 +51,16 @@ func TestClient_ProcessMessage(t *testing.T) {
) )
req.NoError(err) req.NoError(err)
t.Run("test_process_init_dkg", func(t *testing.T) { t.Run("test_process_dkg_init", func(t *testing.T) {
senderUserName := "sender_username" fsm, err := state_machines.Create(dkgRoundID)
state.EXPECT().LoadFSM(dkgRoundID).Times(1).Return(fsm, true, nil)
senderKeyPair := client.NewKeyPair() senderKeyPair := client.NewKeyPair()
senderAddr := senderKeyPair.GetAddr()
messageData := requests.SignatureProposalParticipantsListRequest{ messageData := requests.SignatureProposalParticipantsListRequest{
Participants: []*requests.SignatureProposalParticipantsEntry{ Participants: []*requests.SignatureProposalParticipantsEntry{
{ {
Addr: senderUserName, Addr: senderAddr,
PubKey: senderKeyPair.Pub, PubKey: senderKeyPair.Pub,
DkgPubKey: make([]byte, 128), DkgPubKey: make([]byte, 128),
}, },
@ -91,12 +92,13 @@ func TestClient_ProcessMessage(t *testing.T) {
Offset: 1, Offset: 1,
Event: string(spf.EventInitProposal), Event: string(spf.EventInitProposal),
Data: messageDataBz, Data: messageDataBz,
SenderAddr: senderUserName, SenderAddr: senderAddr,
} }
message.Signature = ed25519.Sign(senderKeyPair.Priv, message.Bytes()) message.Signature = ed25519.Sign(senderKeyPair.Priv, message.Bytes())
state.EXPECT().SaveOffset(uint64(1)).Times(1).Return(nil) state.EXPECT().SaveOffset(uint64(1)).Times(1).Return(nil)
state.EXPECT().SaveFSM(gomock.Any(), gomock.Any()).Times(1).Return(nil) state.EXPECT().SaveFSM(gomock.Any(), gomock.Any()).Times(1).Return(nil)
state.EXPECT().PutOperation(gomock.Any()).Times(1).Return(nil)
err = clt.ProcessMessage(message) err = clt.ProcessMessage(message)
req.NoError(err) req.NoError(err)

View File

@ -14,6 +14,7 @@ const (
) )
type KeyStore interface { type KeyStore interface {
PutKeys(username string, keyPair *KeyPair) error
LoadKeys(userName, password string) (*KeyPair, error) LoadKeys(userName, password string) (*KeyPair, error)
} }
@ -34,9 +35,7 @@ func NewLevelDBKeyStore(username, keystorePath string) (KeyStore, error) {
} }
if _, err := keystore.keystoreDb.Get([]byte(secretsKey), nil); err != nil { if _, err := keystore.keystoreDb.Get([]byte(secretsKey), nil); err != nil {
if err := keystore.initJsonKey(secretsKey, map[string]*KeyPair{ if err := keystore.initJsonKey(secretsKey, map[string]*KeyPair{}); err != nil {
username: NewKeyPair(),
}); err != nil {
return nil, fmt.Errorf("failed to init %s storage: %w", operationsKey, err) return nil, fmt.Errorf("failed to init %s storage: %w", operationsKey, err)
} }
} }
@ -44,6 +43,32 @@ func NewLevelDBKeyStore(username, keystorePath string) (KeyStore, error) {
return keystore, nil return keystore, nil
} }
func (s *LevelDBKeyStore) PutKeys(username string, keyPair *KeyPair) error {
bz, err := s.keystoreDb.Get([]byte(secretsKey), nil)
if err != nil {
return fmt.Errorf("failed to read keystore: %w", err)
}
var keyPairs = map[string]*KeyPair{}
if err := json.Unmarshal(bz, &keyPairs); err != nil {
return fmt.Errorf("failed to unmarshak key pairs: %w", err)
}
keyPairs[username] = keyPair
keyPairsBz, err := json.Marshal(keyPairs)
if err != nil {
return fmt.Errorf("failed to marshal key pair: %w", err)
}
err = s.keystoreDb.Put([]byte(secretsKey), keyPairsBz, nil)
if err != nil {
return fmt.Errorf("failed to put key pairs: %w", err)
}
return nil
}
func (s *LevelDBKeyStore) LoadKeys(userName, password string) (*KeyPair, error) { func (s *LevelDBKeyStore) LoadKeys(userName, password string) (*KeyPair, error) {
bz, err := s.keystoreDb.Get([]byte(secretsKey), nil) bz, err := s.keystoreDb.Get([]byte(secretsKey), nil)
if err != nil { if err != nil {
@ -65,11 +90,11 @@ func (s *LevelDBKeyStore) LoadKeys(userName, password string) (*KeyPair, error)
func (s *LevelDBKeyStore) initJsonKey(key string, data interface{}) error { func (s *LevelDBKeyStore) initJsonKey(key string, data interface{}) error {
if _, err := s.keystoreDb.Get([]byte(key), nil); err != nil { if _, err := s.keystoreDb.Get([]byte(key), nil); err != nil {
operationsBz, err := json.Marshal(data) dataBz, err := json.Marshal(data)
if err != nil { if err != nil {
return fmt.Errorf("failed to marshal storage structure: %w", err) return fmt.Errorf("failed to marshal storage structure: %w", err)
} }
err = s.keystoreDb.Put([]byte(key), operationsBz, nil) err = s.keystoreDb.Put([]byte(key), dataBz, nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to init state: %w", err) return fmt.Errorf("failed to init state: %w", err)
} }
@ -84,7 +109,6 @@ type KeyPair struct {
} }
func NewKeyPair() *KeyPair { func NewKeyPair() *KeyPair {
// TODO: implement proper generation.
pub, priv, _ := ed25519.GenerateKey(nil) pub, priv, _ := ed25519.GenerateKey(nil)
return &KeyPair{ return &KeyPair{
Pub: pub, Pub: pub,

148
main.go
View File

@ -2,10 +2,18 @@ package main
import ( import (
"context" "context"
"crypto/ed25519"
"crypto/md5"
"encoding/json"
"fmt" "fmt"
_ "image/jpeg" _ "image/jpeg"
"log" "log"
"sync" "sync"
"time"
spf "github.com/depools/dc4bc/fsm/state_machines/signature_proposal_fsm"
"github.com/depools/dc4bc/fsm/types/requests"
"github.com/google/uuid"
"github.com/depools/dc4bc/qr" "github.com/depools/dc4bc/qr"
"github.com/depools/dc4bc/storage" "github.com/depools/dc4bc/storage"
@ -24,9 +32,16 @@ type Transport struct {
nodes []*client.Client nodes []*client.Client
} }
type node struct {
client *client.Client
keyPair *client.KeyPair
}
func main() { func main() {
var transport = &Transport{}
var numNodes = 4 var numNodes = 4
var threshold = 3
var storagePath = "/tmp/dc4bc_storage"
var nodes = make([]*node, 4)
for nodeID := 0; nodeID < numNodes; nodeID++ { for nodeID := 0; nodeID < numNodes; nodeID++ {
var ctx = context.Background() var ctx = context.Background()
var userName = fmt.Sprintf("node_%d", nodeID) var userName = fmt.Sprintf("node_%d", nodeID)
@ -35,7 +50,7 @@ func main() {
log.Fatalf("node %d failed to init state: %v\n", nodeID, err) log.Fatalf("node %d failed to init state: %v\n", nodeID, err)
} }
stg, err := storage.NewFileStorage("/tmp/dc4bc_storage") stg, err := storage.NewFileStorage(storagePath)
if err != nil { if err != nil {
log.Fatalf("node %d failed to init storage: %v\n", nodeID, err) log.Fatalf("node %d failed to init storage: %v\n", nodeID, err)
} }
@ -45,10 +60,14 @@ func main() {
log.Fatalf("Failed to init key store: %v", err) log.Fatalf("Failed to init key store: %v", err)
} }
keyPair := client.NewKeyPair()
if err := keyStore.PutKeys(userName, keyPair); err != nil {
log.Fatalf("Failed to PutKeys: %v\n", err)
}
clt, err := client.NewClient( clt, err := client.NewClient(
ctx, ctx,
userName, userName,
nil,
state, state,
stg, stg,
keyStore, keyStore,
@ -57,94 +76,63 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("node %d failed to init client: %v\n", nodeID, err) log.Fatalf("node %d failed to init client: %v\n", nodeID, err)
} }
transport.nodes = append(transport.nodes, clt)
nodes[nodeID] = &node{
client: clt,
keyPair: keyPair,
}
} }
for nodeID, node := range transport.nodes { // Each node starts to Poll().
go func(nodeID int, node *client.Client) { for nodeID, node := range nodes {
if err := node.StartHTTPServer(fmt.Sprintf("localhost:808%d", nodeID)); err != nil {
log.Fatalf("client %d http server failed: %v\n", nodeID, err)
}
}(nodeID, node)
go func(nodeID int, node *client.Client) { go func(nodeID int, node *client.Client) {
if err := node.Poll(); err != nil { if err := node.Poll(); err != nil {
log.Fatalf("client %d poller failed: %v\n", nodeID, err) log.Fatalf("client %d poller failed: %v\n", nodeID, err)
} }
}(nodeID, node) }(nodeID, node.client)
log.Printf("client %d started...\n", nodeID) log.Printf("client %d started...\n", nodeID)
} }
stg, err := storage.NewFileStorage(storagePath)
if err != nil {
log.Fatalf("main namespace failed to init storage: %v\n", err)
}
// Node1 tells other participants to start DKG.
var participants []*requests.SignatureProposalParticipantsEntry
for _, node := range nodes {
participants = append(participants, &requests.SignatureProposalParticipantsEntry{
Addr: node.client.GetAddr(),
PubKey: node.client.GetPubKey(),
DkgPubKey: make([]byte, 128), // TODO: Use a real one.
})
}
messageData := requests.SignatureProposalParticipantsListRequest{
Participants: participants,
SigningThreshold: threshold,
CreatedAt: time.Now(),
}
messageDataBz, err := json.Marshal(messageData)
if err != nil {
log.Fatalf("failed to marshal SignatureProposalParticipantsListRequest: %v\n", err)
}
dkgRoundID := md5.Sum(messageDataBz)
message := storage.Message{
ID: uuid.New().String(),
DkgRoundID: string(dkgRoundID[:]),
Event: string(spf.EventInitProposal),
Data: messageDataBz,
SenderAddr: nodes[0].client.GetAddr(),
}
message.Signature = ed25519.Sign(nodes[0].keyPair.Priv, message.Bytes())
if _, err := stg.Send(message); err != nil {
log.Fatalf("Failed to send %+v to storage: %v\n", message, err)
}
var wg = sync.WaitGroup{} var wg = sync.WaitGroup{}
wg.Add(1) wg.Add(1)
wg.Wait() wg.Wait()
} }
// // Participants broadcast PKs.
// runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// transport.BroadcastPK(participantID, participant.GetPubKey())
// wg.Done()
// })
//
// // Participants init their DKGInstances.
// runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// if err := participant.InitDKGInstance(threshold); err != nil {
// log.Fatalf("Failed to InitDKGInstance: %v", err)
// }
// wg.Done()
// })
//
// // Participants broadcast their Commits.
// runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// commits := participant.GetCommits()
// transport.BroadcastCommits(participantID, commits)
// wg.Done()
// })
//
// // Participants broadcast their deal.
// runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// deals, err := participant.GetDeals()
// if err != nil {
// log.Fatalf("failed to getDeals for participant %s: %v", participantID, err)
// }
// transport.BroadcastDeals(participantID, deals)
// wg.Done()
// })
//
// // Participants broadcast their responses.
// runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// responses, err := participant.ProcessDeals()
// if err != nil {
// log.Fatalf("failed to ProcessDeals for participant %s: %v", participantID, err)
// }
// transport.BroadcastResponses(participantID, responses)
// wg.Done()
// })
//
// // Participants process their responses.
// runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// if err := participant.ProcessResponses(); err != nil {
// log.Fatalf("failed to ProcessResponses for participant %s: %v", participantID, err)
// }
// wg.Done()
// })
//
// for idx, node := range transport.nodes {
// if err := node.Reconstruct(); err != nil {
// fmt.Println("Node", idx, "is not ready:", err)
// } else {
// fmt.Println("Node", idx, "is ready")
// }
// }
//}
//
//func runStep(transport *Transport, cb func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup)) {
// var wg = &sync.WaitGroup{}
// for idx, node := range transport.nodes {
// wg.Add(1)
// n := node
// go cb(fmt.Sprintf("participant_%d", idx), n, wg)
// }
// wg.Wait()
//}