feat: single process tests

This commit is contained in:
Andrej Zavgorodnij 2020-08-10 00:37:53 +03:00
parent 268877c481
commit ae4d167195
4 changed files with 119 additions and 147 deletions

View File

@ -51,7 +51,7 @@ func (c *Client) SendMessage(message storage.Message) error {
return nil return nil
} }
func (c *Client) Poll() { func (c *Client) Poll() error {
tk := time.NewTicker(pollingPeriod) tk := time.NewTicker(pollingPeriod)
for { for {
select { select {
@ -63,7 +63,7 @@ func (c *Client) Poll() {
messages, err := c.storage.GetMessages(offset) messages, err := c.storage.GetMessages(offset)
if err != nil { if err != nil {
panic(err) return fmt.Errorf("failed to GetMessages: %w", err)
} }
for _, message := range messages { for _, message := range messages {
@ -75,21 +75,21 @@ func (c *Client) Poll() {
// I.e., if FSM returned an Operation for us. // I.e., if FSM returned an Operation for us.
if operation != nil { if operation != nil {
if err := c.state.PutOperation(operation); err != nil { if err := c.state.PutOperation(operation); err != nil {
panic(err) return fmt.Errorf("failed to PutOperation: %w", err)
} }
} }
if err := c.state.SaveOffset(message.Offset); err != nil { if err := c.state.SaveOffset(message.Offset); err != nil {
panic(err) return fmt.Errorf("failed to SaveOffset: %w", err)
} }
if err := c.state.SaveFSM(c.fsm); err != nil { if err := c.state.SaveFSM(c.fsm); err != nil {
panic(err) return fmt.Errorf("failed to SaveFSM: %w", err)
} }
} }
case <-c.ctx.Done(): case <-c.ctx.Done():
log.Println("Context closed, stop polling...") log.Println("Context closed, stop polling...")
return return nil
} }
} }
} }

View File

@ -27,14 +27,16 @@ func successResponse(w http.ResponseWriter, response []byte) {
} }
func (c *Client) StartHTTPServer(listenAddr string) error { func (c *Client) StartHTTPServer(listenAddr string) error {
http.HandleFunc("/sendMessage", c.sendMessageHandler) mux := http.NewServeMux()
http.HandleFunc("/getOperations", c.getOperationsHandler) mux.HandleFunc("/sendMessage", c.sendMessageHandler)
http.HandleFunc("/getOperationQRPath", c.getOperationQRPathHandler) mux.HandleFunc("/getOperations", c.getOperationsHandler)
http.HandleFunc("/readProcessedOperationFromCamera", c.readProcessedOperationFromCameraHandler) mux.HandleFunc("/getOperationQRPath", c.getOperationQRPathHandler)
mux.HandleFunc("/readProcessedOperationFromCamera", c.readProcessedOperationFromCameraHandler)
http.HandleFunc("/readProcessedOperation", c.readProcessedOperationFromBodyHandler) mux.HandleFunc("/readProcessedOperation", c.readProcessedOperationFromBodyHandler)
http.HandleFunc("/getOperationQR", c.getOperationQRToBodyHandler) mux.HandleFunc("/getOperationQR", c.getOperationQRToBodyHandler)
return http.ListenAndServe(listenAddr, nil)
return http.ListenAndServe(listenAddr, mux)
} }
func (c *Client) sendMessageHandler(w http.ResponseWriter, r *http.Request) { func (c *Client) sendMessageHandler(w http.ResponseWriter, r *http.Request) {

View File

@ -43,14 +43,22 @@ func NewLevelDBState(stateDbPath string) (State, error) {
stateDb: db, stateDb: db,
} }
if err := state.initKey(operationsKey, map[string]*Operation{}); err != nil { // Init state key for operations JSON.
if err := state.initJsonKey(operationsKey, map[string]*Operation{}); err != nil {
return nil, fmt.Errorf("failed to init %s storage: %w", operationsKey, err) return nil, fmt.Errorf("failed to init %s storage: %w", operationsKey, err)
} }
// Init state key for offset bytes.
bz := make([]byte, 8)
binary.LittleEndian.PutUint64(bz, 0)
if err := db.Put([]byte(offsetKey), bz, nil); err != nil {
return nil, fmt.Errorf("failed to init %s storage: %w", offsetKey, err)
}
return state, nil return state, nil
} }
func (s *LevelDBState) initKey(key string, data interface{}) error { func (s *LevelDBState) initJsonKey(key string, data interface{}) error {
if _, err := s.stateDb.Get([]byte(key), nil); err != nil { if _, err := s.stateDb.Get([]byte(key), nil); err != nil {
operationsBz, err := json.Marshal(data) operationsBz, err := json.Marshal(data)
if err != nil { if err != nil {

226
main.go
View File

@ -1,16 +1,16 @@
package main package main
import ( import (
"context"
"fmt" "fmt"
_ "image/jpeg" _ "image/jpeg"
"log" "log"
"sync" "sync"
dkg "go.dedis.ch/kyber/v3/share/dkg/pedersen" "github.com/depools/dc4bc/qr"
"github.com/depools/dc4bc/storage"
"go.dedis.ch/kyber/v3" "github.com/depools/dc4bc/client"
dkglib "github.com/depools/dc4bc/dkg"
_ "image/gif" _ "image/gif"
_ "image/png" _ "image/png"
@ -21,160 +21,122 @@ import (
) )
type Transport struct { type Transport struct {
nodes []*dkglib.DKG nodes []*client.Client
}
func (t *Transport) getNodeByParticipantID(id int) *dkglib.DKG {
for _, node := range t.nodes {
if node.ParticipantID == id {
return node
}
}
return nil
}
func (t *Transport) BroadcastPK(participant string, pk kyber.Point) {
for idx, node := range t.nodes {
if ok := node.StorePubKey(participant, pk); !ok {
log.Fatalf("Failed to store PK for participant %d", idx)
}
}
}
func (t *Transport) BroadcastCommits(participant string, commits []kyber.Point) {
for _, node := range t.nodes {
node.StoreCommits(participant, commits)
}
}
func (t *Transport) BroadcastDeals(participant string, deals map[int]*dkg.Deal) {
for index, deal := range deals {
dstNode := t.getNodeByParticipantID(index)
if dstNode == nil {
fmt.Printf("Node with index #%d not found\n", index)
continue
}
dstNode.StoreDeal(participant, deal)
}
}
func (t *Transport) BroadcastResponses(participant string, responses []*dkg.Response) {
for _, node := range t.nodes {
node.StoreResponses(participant, responses)
}
} }
func main() { func main() {
var threshold = 3
var transport = &Transport{} var transport = &Transport{}
var numNodes = 4 var numNodes = 4
for i := 0; i < numNodes; i++ { for nodeID := 0; nodeID < numNodes; nodeID++ {
transport.nodes = append(transport.nodes, dkglib.Init()) var ctx = context.Background()
} var state, err = client.NewLevelDBState(fmt.Sprintf("/tmp/dc4bc_node_%d_state", nodeID))
// Participants broadcast PKs.
runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
transport.BroadcastPK(participantID, participant.GetPubKey())
wg.Done()
})
// Participants init their DKGInstances.
runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
if err := participant.InitDKGInstance(threshold); err != nil {
log.Fatalf("Failed to InitDKGInstance: %v", err)
}
wg.Done()
})
// Participants broadcast their Commits.
runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
commits := participant.GetCommits()
transport.BroadcastCommits(participantID, commits)
wg.Done()
})
// Participants broadcast their deal.
runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
deals, err := participant.GetDeals()
if err != nil { if err != nil {
log.Fatalf("failed to getDeals for participant %s: %v", participantID, err) log.Fatalf("node %d failed to init state: %v\n", nodeID, err)
} }
transport.BroadcastDeals(participantID, deals)
wg.Done()
})
// Participants broadcast their responses. stg, err := storage.NewFileStorage("/tmp/dc4bc_storage")
runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
responses, err := participant.ProcessDeals()
if err != nil { if err != nil {
log.Fatalf("failed to ProcessDeals for participant %s: %v", participantID, err) log.Fatalf("node %d failed to init storage: %v\n", nodeID, err)
} }
transport.BroadcastResponses(participantID, responses)
wg.Done()
})
// Participants process their responses. clt, err := client.NewClient(
runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) { ctx,
if err := participant.ProcessResponses(); err != nil { nil,
log.Fatalf("failed to ProcessResponses for participant %s: %v", participantID, err) state,
} stg,
wg.Done() qr.NewCameraProcessor(),
}) )
if err != nil {
for idx, node := range transport.nodes { log.Fatalf("node %d failed to init client: %v\n", nodeID, err)
if err := node.Reconstruct(); err != nil {
fmt.Println("Node", idx, "is not ready:", err)
} else {
fmt.Println("Node", idx, "is ready")
} }
transport.nodes = append(transport.nodes, clt)
} }
}
func runStep(transport *Transport, cb func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup)) { for nodeID, node := range transport.nodes {
var wg = &sync.WaitGroup{} go func(nodeID int, node *client.Client) {
for idx, node := range transport.nodes { if err := node.StartHTTPServer(fmt.Sprintf("localhost:808%d", nodeID)); err != nil {
wg.Add(1) log.Fatalf("client %d http server failed: %v\n", nodeID, err)
n := node }
go cb(fmt.Sprintf("participant_%d", idx), n, wg) }(nodeID, node)
go func(nodeID int, node *client.Client) {
if err := node.Poll(); err != nil {
log.Fatalf("client %d poller failed: %v\n", nodeID, err)
}
}(nodeID, node)
log.Printf("client %d started...\n", nodeID)
} }
var wg = sync.WaitGroup{}
wg.Add(1)
wg.Wait() wg.Wait()
} }
//func runQRTest() { // // Participants broadcast PKs.
// clearTerminal() // runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// var data = "Hello, world!" // transport.BroadcastPK(participantID, participant.GetPubKey())
// wg.Done()
// })
// //
// log.Println("A QR code will be shown on your screen.") // // Participants init their DKGInstances.
// log.Println("Please take a photo of the QR code with your smartphone.") // runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// log.Println("When you close the image, you will have 5 seconds to" + // if err := participant.InitDKGInstance(threshold); err != nil {
// "scan the QR code with your laptop's camera.") // log.Fatalf("Failed to InitDKGInstance: %v", err)
// err := qr.ShowQR(data) // }
// if err != nil { // wg.Done()
// log.Fatalf("Failed to show QR code: %v", err) // })
// }
// //
// var scannedData string // // Participants broadcast their Commits.
// for { // runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// clearTerminal() // commits := participant.GetCommits()
// transport.BroadcastCommits(participantID, commits)
// wg.Done()
// })
//
// // Participants broadcast their deal.
// runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// deals, err := participant.GetDeals()
// if err != nil { // if err != nil {
// log.Printf("Failed to scan QR code: %v\n", err) // log.Fatalf("failed to getDeals for participant %s: %v", participantID, err)
// } // }
// transport.BroadcastDeals(participantID, deals)
// wg.Done()
// })
// //
// log.Println("Please center the photo of the QR-code in front" + // // Participants broadcast their responses.
// "of your web-camera...") // runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// responses, err := participant.ProcessDeals()
// if err != nil {
// log.Fatalf("failed to ProcessDeals for participant %s: %v", participantID, err)
// }
// transport.BroadcastResponses(participantID, responses)
// wg.Done()
// })
// //
// scannedData, err = qr.ReadQR() // // Participants process their responses.
// if err == nil { // runStep(transport, func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup) {
// break // if err := participant.ProcessResponses(); err != nil {
// log.Fatalf("failed to ProcessResponses for participant %s: %v", participantID, err)
// }
// wg.Done()
// })
//
// for idx, node := range transport.nodes {
// if err := node.Reconstruct(); err != nil {
// fmt.Println("Node", idx, "is not ready:", err)
// } else {
// fmt.Println("Node", idx, "is ready")
// } // }
// } // }
//
// clearTerminal()
// log.Printf("QR code successfully scanned; the data is: %s\n", scannedData)
//} //}
// //
//func clearTerminal() { //func runStep(transport *Transport, cb func(participantID string, participant *dkglib.DKG, wg *sync.WaitGroup)) {
// c := exec.Command("clear") // var wg = &sync.WaitGroup{}
// c.Stdout = os.Stdout // for idx, node := range transport.nodes {
// _ = c.Run() // wg.Add(1)
// n := node
// go cb(fmt.Sprintf("participant_%d", idx), n, wg)
// }
// wg.Wait()
//} //}