diff --git a/client/client.go b/client/client.go index 739dc11..40c09e6 100644 --- a/client/client.go +++ b/client/client.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "github.com/depools/dc4bc/airgapped" "github.com/depools/dc4bc/client/types" "github.com/depools/dc4bc/fsm/types/requests" "github.com/google/uuid" @@ -35,7 +34,7 @@ const ( type Client struct { sync.Mutex - logger *logger + Logger *logger userName string address string pubKey ed25519.PublicKey @@ -44,7 +43,6 @@ type Client struct { storage storage.Storage keyStore KeyStore qrProcessor qr.Processor - Airgapped *airgapped.AirgappedMachine } func NewClient( @@ -54,7 +52,6 @@ func NewClient( storage storage.Storage, keyStore KeyStore, qrProcessor qr.Processor, - airgappedMachine *airgapped.AirgappedMachine, ) (*Client, error) { keyPair, err := keyStore.LoadKeys(userName, "") if err != nil { @@ -63,7 +60,7 @@ func NewClient( return &Client{ ctx: ctx, - logger: newLogger(userName), + Logger: newLogger(userName), userName: userName, address: keyPair.GetAddr(), pubKey: keyPair.Pub, @@ -71,7 +68,6 @@ func NewClient( storage: storage, keyStore: keyStore, qrProcessor: qrProcessor, - Airgapped: airgappedMachine, }, nil } @@ -100,38 +96,15 @@ func (c *Client) Poll() error { for _, message := range messages { if message.RecipientAddr == "" || message.RecipientAddr == c.GetAddr() { - c.logger.Log("Handling message with offset %d, type %s", message.Offset, message.Event) + c.Logger.Log("Handling message with offset %d, type %s", message.Offset, message.Event) if err := c.ProcessMessage(message); err != nil { - c.logger.Log("Failed to process message: %v", err) + c.Logger.Log("Failed to process message: %v", err) } else { - c.logger.Log("Successfully processed message with offset %d, type %s", + c.Logger.Log("Successfully processed message with offset %d, type %s", message.Offset, message.Event) } } } - - operations, err := c.GetOperations() - if err != nil { - c.logger.Log("Failed to get operations: %v", err) - } - - c.logger.Log("Got %d Operations from pool", len(operations)) - for _, operation := range operations { - c.logger.Log("Handling operation %s in airgapped", operation.Type) - processedOperation, err := c.Airgapped.HandleOperation(*operation) - if err != nil { - c.logger.Log("Failed to handle operation: %v", err) - } - - c.logger.Log("Got %d Processed Operations from Airgapped", len(operations)) - c.logger.Log("Operation %s handled in airgapped, result event is %s", - operation.Event, processedOperation.Event) - if err = c.handleProcessedOperation(processedOperation); err != nil { - c.logger.Log("Failed to handle processed operation: %v", err) - } else { - c.logger.Log("Successfully handled processed operation %s", processedOperation.Event) - } - } case <-c.ctx.Done(): log.Println("Context closed, stop polling...") return nil @@ -169,7 +142,7 @@ func (c *Client) ProcessMessage(message storage.Message) error { return fmt.Errorf("failed to Do operation in FSM: %w", err) } - c.logger.Log("message %s done successfully from %s", message.Event, message.SenderAddr) + c.Logger.Log("message %s done successfully from %s", message.Event, message.SenderAddr) if resp.State == spf.StateSignatureProposalCollected { fsmInstance, err = state_machines.FromDump(fsmDump) @@ -223,7 +196,7 @@ func (c *Client) ProcessMessage(message storage.Message) error { } } default: - c.logger.Log("State %s does not require an operation", resp.State) + c.Logger.Log("State %s does not require an operation", resp.State) } if operation != nil { @@ -291,10 +264,10 @@ func (c *Client) ReadProcessedOperation() error { return fmt.Errorf("failed to unmarshal processed operation") } - return c.handleProcessedOperation(operation) + return c.HandleProcessedOperation(operation) } -func (c *Client) handleProcessedOperation(operation types.Operation) error { +func (c *Client) HandleProcessedOperation(operation types.Operation) error { storedOperation, err := c.state.GetOperationByID(operation.ID) if err != nil { return fmt.Errorf("failed to find matching operation: %w", err) diff --git a/client/client_test.go b/client/client_test.go index 1f0a162..8296b96 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -49,7 +49,6 @@ func TestClient_ProcessMessage(t *testing.T) { stg, keyStore, qrProcessor, - nil, ) req.NoError(err) @@ -132,7 +131,6 @@ func TestClient_GetOperationsList(t *testing.T) { stg, keyStore, qrProcessor, - nil, ) req.NoError(err) @@ -180,7 +178,6 @@ func TestClient_GetOperationQRPath(t *testing.T) { stg, keyStore, qrProcessor, - nil, ) req.NoError(err) @@ -232,7 +229,6 @@ func TestClient_ReadProcessedOperation(t *testing.T) { stg, keyStore, qrProcessor, - nil, ) req.NoError(err) diff --git a/client/http_server.go b/client/http_server.go index d74b7d3..014d876 100644 --- a/client/http_server.go +++ b/client/http_server.go @@ -1,9 +1,15 @@ package client import ( + "crypto/md5" + "encoding/hex" "encoding/json" "fmt" "github.com/depools/dc4bc/client/types" + "github.com/depools/dc4bc/fsm/fsm" + spf "github.com/depools/dc4bc/fsm/state_machines/signature_proposal_fsm" + sif "github.com/depools/dc4bc/fsm/state_machines/signing_proposal_fsm" + "github.com/google/uuid" "image" "io/ioutil" "log" @@ -14,8 +20,8 @@ import ( ) type Response struct { - ErrorMessage string `json:"error_message,omitempty"` - Result interface{} `json:"result"` + ErrorMessage string `json:"error_message,omitempty"` + Result interface{} `json:"result"` } func rawResponse(w http.ResponseWriter, response []byte) { @@ -61,6 +67,10 @@ func (c *Client) StartHTTPServer(listenAddr string) error { mux.HandleFunc("/readProcessedOperation", c.readProcessedOperationFromBodyHandler) mux.HandleFunc("/getOperationQR", c.getOperationQRToBodyHandler) + mux.HandleFunc("/startDKG", c.startDKGHandler) + mux.HandleFunc("/proposeSignMessage", c.proposeSignDataHandler) + + c.Logger.Log("Starting HTTP server on address: %s", listenAddr) return http.ListenAndServe(listenAddr, mux) } @@ -74,6 +84,7 @@ func (c *Client) sendMessageHandler(w http.ResponseWriter, r *http.Request) { errorResponse(w, http.StatusBadRequest, fmt.Sprintf("failed to read request body: %v", err)) return } + defer r.Body.Close() var msg storage.Message if err = json.Unmarshal(reqBytes, &msg); err != nil { @@ -193,10 +204,80 @@ func (c *Client) readProcessedOperationFromBodyHandler(w http.ResponseWriter, r fmt.Sprintf("failed to unmarshal processed operation: %v", err)) return } - if err := c.handleProcessedOperation(operation); err != nil { + if err := c.HandleProcessedOperation(operation); err != nil { errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to handle an operation: %v", err)) return } successResponse(w, "ok") -} \ No newline at end of file +} + +func (c *Client) startDKGHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorResponse(w, http.StatusBadRequest, "Wrong HTTP method") + return + } + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to read body: %v", err)) + return + } + defer r.Body.Close() + + dkgRoundID := md5.Sum(reqBody) + message, err := c.buildMessage(hex.EncodeToString(dkgRoundID[:]), spf.EventInitProposal, reqBody) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to build message: %v", err)) + return + } + if err = c.SendMessage(*message); err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to send message: %v", err)) + return + } + successResponse(w, "ok") +} + +func (c *Client) proposeSignDataHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorResponse(w, http.StatusBadRequest, "Wrong HTTP method") + return + } + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to read body: %v", err)) + return + } + defer r.Body.Close() + + var req map[string][]byte + if err = json.Unmarshal(reqBody, &req); err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to umarshal request: %v", err)) + return + } + + message, err := c.buildMessage(hex.EncodeToString(req["dkgID"]), sif.EventSigningStart, req["data"]) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to build message: %v", err)) + } + if err = c.SendMessage(*message); err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to send message: %v", err)) + return + } + successResponse(w, "ok") +} + +func (c *Client) buildMessage(dkgRoundID string, event fsm.Event, data []byte) (*storage.Message, error) { + message := storage.Message{ + ID: uuid.New().String(), + DkgRoundID: dkgRoundID, + Event: string(event), + Data: data, + SenderAddr: c.GetAddr(), + } + signature, err := c.signMessage(message.Bytes()) + if err != nil { + return nil, fmt.Errorf("failed to sign message: %w", err) + } + message.Signature = signature + return &message, nil +} diff --git a/main.go b/main.go index 857894b..142c19a 100644 --- a/main.go +++ b/main.go @@ -1,41 +1,68 @@ package main import ( + "bytes" "context" - "crypto/ed25519" "crypto/md5" - "encoding/hex" "encoding/json" "fmt" _ "image/jpeg" "log" + "net/http" "sync" "time" "github.com/depools/dc4bc/airgapped" "github.com/depools/dc4bc/client" - spf "github.com/depools/dc4bc/fsm/state_machines/signature_proposal_fsm" - sif "github.com/depools/dc4bc/fsm/state_machines/signing_proposal_fsm" "github.com/depools/dc4bc/fsm/types/requests" "github.com/depools/dc4bc/qr" "github.com/depools/dc4bc/storage" - "github.com/google/uuid" ) -type Transport struct { - nodes []*client.Client -} - type node struct { client *client.Client keyPair *client.KeyPair + air *airgapped.AirgappedMachine +} + +func (n *node) run() { + for { + operations, err := n.client.GetOperations() + if err != nil { + n.client.Logger.Log("Failed to get operations: %v", err) + } + + if len(operations) == 0 { + time.Sleep(1 * time.Second) + continue + } + + n.client.Logger.Log("Got %d Operations from pool", len(operations)) + for _, operation := range operations { + n.client.Logger.Log("Handling operation %s in airgapped", operation.Type) + processedOperation, err := n.air.HandleOperation(*operation) + if err != nil { + n.client.Logger.Log("Failed to handle operation: %v", err) + } + + n.client.Logger.Log("Got %d Processed Operations from Airgapped", len(operations)) + n.client.Logger.Log("Operation %s handled in airgapped, result event is %s", + operation.Event, processedOperation.Event) + if err = n.client.HandleProcessedOperation(processedOperation); err != nil { + n.client.Logger.Log("Failed to handle processed operation: %v", err) + } else { + n.client.Logger.Log("Successfully handled processed operation %s", processedOperation.Event) + } + } + } } func main() { var numNodes = 4 var threshold = 3 var storagePath = "/tmp/dc4bc_storage" - var nodes = make([]*node, 4) + var nodes = make([]*node, numNodes) + startingPort := 8080 for nodeID := 0; nodeID < numNodes; nodeID++ { var ctx = context.Background() var userName = fmt.Sprintf("node_%d", nodeID) @@ -71,39 +98,42 @@ func main() { stg, keyStore, qr.NewCameraProcessor(), - airgappedMachine, ) if err != nil { log.Fatalf("node %d failed to init client: %v\n", nodeID, err) } - clt.Airgapped.SetAddress(clt.GetAddr()) + airgappedMachine.SetAddress(clt.GetAddr()) nodes[nodeID] = &node{ client: clt, keyPair: keyPair, + air: airgappedMachine, } } // Each node starts to Poll(). - for nodeID, node := range nodes { + for nodeID, n := range nodes { + go func(nodeID int, node *node, port int) { + if err := node.client.StartHTTPServer(fmt.Sprintf(":%d", port)); err != nil { + log.Fatalf("failed to start HTTP server for nodeID #%d: %v\n", nodeID, err) + } + }(nodeID, n, startingPort) + startingPort++ + go nodes[nodeID].run() + go func(nodeID int, node *client.Client) { if err := node.Poll(); err != nil { log.Fatalf("client %d poller failed: %v\n", nodeID, err) } - }(nodeID, node.client) + }(nodeID, n.client) log.Printf("client %d started...\n", nodeID) } - stg, err := storage.NewFileStorage(storagePath) - if err != nil { - log.Fatalf("main namespace failed to init storage: %v\n", err) - } - // Node1 tells other participants to start DKG. var participants []*requests.SignatureProposalParticipantsEntry for _, node := range nodes { - dkgPubKey, err := node.client.Airgapped.GetPubKey().MarshalBinary() + dkgPubKey, err := node.air.GetPubKey().MarshalBinary() if err != nil { log.Fatalln("failed to get DKG pubKey:", err.Error()) } @@ -123,45 +153,35 @@ func main() { log.Fatalf("failed to marshal SignatureProposalParticipantsListRequest: %v\n", err) } - dkgRoundID := md5.Sum(messageDataBz) - message := storage.Message{ - ID: uuid.New().String(), - DkgRoundID: hex.EncodeToString(dkgRoundID[:]), - Event: string(spf.EventInitProposal), - Data: messageDataBz, - SenderAddr: nodes[0].client.GetAddr(), - } - - message.Signature = ed25519.Sign(nodes[0].keyPair.Priv, message.Bytes()) - if _, err := stg.Send(message); err != nil { - log.Fatalf("Failed to send %+v to storage: %v\n", message, err) + if _, err := http.Post(fmt.Sprintf("http://localhost:%d/startDKG", startingPort-1), + "application/json", bytes.NewReader(messageDataBz)); err != nil { + log.Fatalf("failed to send HTTP request to start DKG: %v\n", err) } // i haven't a better idea to test signing without big changes in the client code time.Sleep(10 * time.Second) log.Println("Propose message to sign") + dkgRoundID := md5.Sum(messageDataBz) messageDataSign := requests.SigningProposalStartRequest{ - ParticipantId: 0, + ParticipantId: len(nodes) - 1, SrcPayload: []byte("message to sign"), CreatedAt: time.Now(), } - messageDataBz, err = json.Marshal(messageDataSign) + messageDataSignBz, err := json.Marshal(messageDataSign) + if err != nil { + log.Fatalf("failed to marshal SigningProposalStartRequest: %v\n", err) + } + + messageDataBz, err = json.Marshal(map[string][]byte{"data": messageDataSignBz, + "dkgID": dkgRoundID[:]}) if err != nil { log.Fatalf("failed to marshal SignatureProposalParticipantsListRequest: %v\n", err) } - message = storage.Message{ - ID: uuid.New().String(), - DkgRoundID: hex.EncodeToString(dkgRoundID[:]), - Event: string(sif.EventSigningStart), - Data: messageDataBz, - SenderAddr: nodes[0].client.GetAddr(), - } - - message.Signature = ed25519.Sign(nodes[0].keyPair.Priv, message.Bytes()) - if _, err := stg.Send(message); err != nil { - log.Fatalf("Failed to send %+v to storage: %v\n", message, err) + if _, err := http.Post(fmt.Sprintf("http://localhost:%d/proposeSignMessage", startingPort-1), + "application/json", bytes.NewReader(messageDataBz)); err != nil { + log.Fatalf("failed to send HTTP request to sign message: %v\n", err) } var wg = sync.WaitGroup{}