diff --git a/client/client.go b/client/client.go index 739dc11..e16696d 100644 --- a/client/client.go +++ b/client/client.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "github.com/depools/dc4bc/airgapped" "github.com/depools/dc4bc/client/types" "github.com/depools/dc4bc/fsm/types/requests" "github.com/google/uuid" @@ -33,9 +32,22 @@ const ( QrCodesDir = "/tmp" ) +type Poller interface { + GetAddr() string + GetPubKey() ed25519.PublicKey + Poll() error + SendMessage(message storage.Message) error + ProcessMessage(message storage.Message) error + GetOperations() (map[string]*types.Operation, error) + GetOperationQRPath(operationID string) (string, error) + ReadProcessedOperation() error + StartHTTPServer(listenAddr string) error + GetLogger() *logger +} + type Client struct { sync.Mutex - logger *logger + Logger *logger userName string address string pubKey ed25519.PublicKey @@ -44,7 +56,6 @@ type Client struct { storage storage.Storage keyStore KeyStore qrProcessor qr.Processor - Airgapped *airgapped.AirgappedMachine } func NewClient( @@ -54,8 +65,7 @@ func NewClient( storage storage.Storage, keyStore KeyStore, qrProcessor qr.Processor, - airgappedMachine *airgapped.AirgappedMachine, -) (*Client, error) { +) (Poller, error) { keyPair, err := keyStore.LoadKeys(userName, "") if err != nil { return nil, fmt.Errorf("failed to LoadKeys: %w", err) @@ -63,7 +73,7 @@ func NewClient( return &Client{ ctx: ctx, - logger: newLogger(userName), + Logger: newLogger(userName), userName: userName, address: keyPair.GetAddr(), pubKey: keyPair.Pub, @@ -71,10 +81,13 @@ func NewClient( storage: storage, keyStore: keyStore, qrProcessor: qrProcessor, - Airgapped: airgappedMachine, }, nil } +func (c *Client) GetLogger() *logger { + return c.Logger +} + func (c *Client) GetAddr() string { return c.address } @@ -100,38 +113,15 @@ func (c *Client) Poll() error { for _, message := range messages { if message.RecipientAddr == "" || message.RecipientAddr == c.GetAddr() { - c.logger.Log("Handling message with offset %d, type %s", message.Offset, message.Event) + c.Logger.Log("Handling message with offset %d, type %s", message.Offset, message.Event) if err := c.ProcessMessage(message); err != nil { - c.logger.Log("Failed to process message: %v", err) + c.Logger.Log("Failed to process message: %v", err) } else { - c.logger.Log("Successfully processed message with offset %d, type %s", + c.Logger.Log("Successfully processed message with offset %d, type %s", message.Offset, message.Event) } } } - - operations, err := c.GetOperations() - if err != nil { - c.logger.Log("Failed to get operations: %v", err) - } - - c.logger.Log("Got %d Operations from pool", len(operations)) - for _, operation := range operations { - c.logger.Log("Handling operation %s in airgapped", operation.Type) - processedOperation, err := c.Airgapped.HandleOperation(*operation) - if err != nil { - c.logger.Log("Failed to handle operation: %v", err) - } - - c.logger.Log("Got %d Processed Operations from Airgapped", len(operations)) - c.logger.Log("Operation %s handled in airgapped, result event is %s", - operation.Event, processedOperation.Event) - if err = c.handleProcessedOperation(processedOperation); err != nil { - c.logger.Log("Failed to handle processed operation: %v", err) - } else { - c.logger.Log("Successfully handled processed operation %s", processedOperation.Event) - } - } case <-c.ctx.Done(): log.Println("Context closed, stop polling...") return nil @@ -169,7 +159,7 @@ func (c *Client) ProcessMessage(message storage.Message) error { return fmt.Errorf("failed to Do operation in FSM: %w", err) } - c.logger.Log("message %s done successfully from %s", message.Event, message.SenderAddr) + c.Logger.Log("message %s done successfully from %s", message.Event, message.SenderAddr) if resp.State == spf.StateSignatureProposalCollected { fsmInstance, err = state_machines.FromDump(fsmDump) @@ -223,7 +213,7 @@ func (c *Client) ProcessMessage(message storage.Message) error { } } default: - c.logger.Log("State %s does not require an operation", resp.State) + c.Logger.Log("State %s does not require an operation", resp.State) } if operation != nil { diff --git a/client/client_test.go b/client/client_test.go index 1f0a162..8296b96 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -49,7 +49,6 @@ func TestClient_ProcessMessage(t *testing.T) { stg, keyStore, qrProcessor, - nil, ) req.NoError(err) @@ -132,7 +131,6 @@ func TestClient_GetOperationsList(t *testing.T) { stg, keyStore, qrProcessor, - nil, ) req.NoError(err) @@ -180,7 +178,6 @@ func TestClient_GetOperationQRPath(t *testing.T) { stg, keyStore, qrProcessor, - nil, ) req.NoError(err) @@ -232,7 +229,6 @@ func TestClient_ReadProcessedOperation(t *testing.T) { stg, keyStore, qrProcessor, - nil, ) req.NoError(err) diff --git a/client/http_server.go b/client/http_server.go index f6782ea..3175cf7 100644 --- a/client/http_server.go +++ b/client/http_server.go @@ -1,9 +1,15 @@ package client import ( + "crypto/md5" + "encoding/hex" "encoding/json" "fmt" "github.com/depools/dc4bc/client/types" + "github.com/depools/dc4bc/fsm/fsm" + spf "github.com/depools/dc4bc/fsm/state_machines/signature_proposal_fsm" + sif "github.com/depools/dc4bc/fsm/state_machines/signing_proposal_fsm" + "github.com/google/uuid" "image" "io/ioutil" "log" @@ -13,16 +19,40 @@ import ( "github.com/depools/dc4bc/storage" ) -func errorResponse(w http.ResponseWriter, statusCode int, err string) { - log.Println(err) - w.WriteHeader(statusCode) - if _, err := w.Write([]byte(err)); err != nil { +type Response struct { + ErrorMessage string `json:"error_message,omitempty"` + Result interface{} `json:"result"` +} + +func rawResponse(w http.ResponseWriter, response []byte) { + if _, err := w.Write(response); err != nil { panic(fmt.Sprintf("failed to write response: %v", err)) } } -func successResponse(w http.ResponseWriter, response []byte) { - if _, err := w.Write(response); err != nil { +func errorResponse(w http.ResponseWriter, statusCode int, error string) { + w.WriteHeader(statusCode) + w.Header().Set("Content-Type", "application/json") + resp := Response{ErrorMessage: error} + respBz, err := json.Marshal(resp) + if err != nil { + log.Printf("Failed to marshal response: %v\n", err) + return + } + if _, err := w.Write(respBz); err != nil { + panic(fmt.Sprintf("failed to write response: %v", err)) + } +} + +func successResponse(w http.ResponseWriter, response interface{}) { + w.Header().Set("Content-Type", "application/json") + resp := Response{Result: response} + respBz, err := json.Marshal(resp) + if err != nil { + log.Printf("Failed to marshal response: %v\n", err) + return + } + if _, err := w.Write(respBz); err != nil { panic(fmt.Sprintf("failed to write response: %v", err)) } } @@ -36,7 +66,12 @@ func (c *Client) StartHTTPServer(listenAddr string) error { mux.HandleFunc("/readProcessedOperation", c.readProcessedOperationFromBodyHandler) mux.HandleFunc("/getOperationQR", c.getOperationQRToBodyHandler) + mux.HandleFunc("/handleProcessedOperationJSON", c.handleJSONOperationHandler) + mux.HandleFunc("/startDKG", c.startDKGHandler) + mux.HandleFunc("/proposeSignMessage", c.proposeSignDataHandler) + + c.Logger.Log("Starting HTTP server on address: %s", listenAddr) return http.ListenAndServe(listenAddr, mux) } @@ -50,6 +85,7 @@ func (c *Client) sendMessageHandler(w http.ResponseWriter, r *http.Request) { errorResponse(w, http.StatusBadRequest, fmt.Sprintf("failed to read request body: %v", err)) return } + defer r.Body.Close() var msg storage.Message if err = json.Unmarshal(reqBytes, &msg); err != nil { @@ -62,7 +98,7 @@ func (c *Client) sendMessageHandler(w http.ResponseWriter, r *http.Request) { return } - successResponse(w, []byte("ok")) + successResponse(w, "ok") } func (c *Client) getOperationsHandler(w http.ResponseWriter, r *http.Request) { @@ -77,13 +113,7 @@ func (c *Client) getOperationsHandler(w http.ResponseWriter, r *http.Request) { return } - response, err := json.Marshal(operations) - if err != nil { - errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to marshal operations: %v", err)) - return - } - - successResponse(w, response) + successResponse(w, operations) } func (c *Client) getOperationQRPathHandler(w http.ResponseWriter, r *http.Request) { @@ -99,7 +129,7 @@ func (c *Client) getOperationQRPathHandler(w http.ResponseWriter, r *http.Reques return } - successResponse(w, []byte(qrPath)) + successResponse(w, qrPath) } func (c *Client) getOperationQRToBodyHandler(w http.ResponseWriter, r *http.Request) { @@ -123,7 +153,7 @@ func (c *Client) getOperationQRToBodyHandler(w http.ResponseWriter, r *http.Requ w.Header().Set("Content-Type", "image/jpeg") w.Header().Set("Content-Length", fmt.Sprintf("%d", len(encodedData))) - successResponse(w, encodedData) + rawResponse(w, encodedData) } func (c *Client) readProcessedOperationFromCameraHandler(w http.ResponseWriter, r *http.Request) { @@ -138,7 +168,7 @@ func (c *Client) readProcessedOperationFromCameraHandler(w http.ResponseWriter, return } - successResponse(w, []byte("ok")) + successResponse(w, "ok") } func (c *Client) readProcessedOperationFromBodyHandler(w http.ResponseWriter, r *http.Request) { @@ -179,4 +209,102 @@ func (c *Client) readProcessedOperationFromBodyHandler(w http.ResponseWriter, r errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to handle an operation: %v", err)) return } + + successResponse(w, "ok") +} + +func (c *Client) startDKGHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorResponse(w, http.StatusBadRequest, "Wrong HTTP method") + return + } + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to read body: %v", err)) + return + } + defer r.Body.Close() + + dkgRoundID := md5.Sum(reqBody) + message, err := c.buildMessage(hex.EncodeToString(dkgRoundID[:]), spf.EventInitProposal, reqBody) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to build message: %v", err)) + return + } + if err = c.SendMessage(*message); err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to send message: %v", err)) + return + } + successResponse(w, "ok") +} + +func (c *Client) proposeSignDataHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorResponse(w, http.StatusBadRequest, "Wrong HTTP method") + return + } + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to read body: %v", err)) + return + } + defer r.Body.Close() + + var req map[string][]byte + if err = json.Unmarshal(reqBody, &req); err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to umarshal request: %v", err)) + return + } + + message, err := c.buildMessage(hex.EncodeToString(req["dkgID"]), sif.EventSigningStart, req["data"]) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to build message: %v", err)) + } + if err = c.SendMessage(*message); err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to send message: %v", err)) + return + } + successResponse(w, "ok") +} + +func (c *Client) handleJSONOperationHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + errorResponse(w, http.StatusBadRequest, "Wrong HTTP method") + return + } + reqBody, err := ioutil.ReadAll(r.Body) + if err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to read body: %v", err)) + return + } + defer r.Body.Close() + + var req types.Operation + if err = json.Unmarshal(reqBody, &req); err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to umarshal request: %v", err)) + return + } + + if err = c.handleProcessedOperation(req); err != nil { + errorResponse(w, http.StatusInternalServerError, fmt.Sprintf("failed to handle processed operation: %v", err)) + return + } + + successResponse(w, "ok") +} + +func (c *Client) buildMessage(dkgRoundID string, event fsm.Event, data []byte) (*storage.Message, error) { + message := storage.Message{ + ID: uuid.New().String(), + DkgRoundID: dkgRoundID, + Event: string(event), + Data: data, + SenderAddr: c.GetAddr(), + } + signature, err := c.signMessage(message.Bytes()) + if err != nil { + return nil, fmt.Errorf("failed to sign message: %w", err) + } + message.Signature = signature + return &message, nil } diff --git a/main.go b/main.go index 857894b..ccf83bc 100644 --- a/main.go +++ b/main.go @@ -1,41 +1,122 @@ package main import ( + "bytes" "context" - "crypto/ed25519" "crypto/md5" - "encoding/hex" "encoding/json" "fmt" + "github.com/depools/dc4bc/client/types" _ "image/jpeg" + "io/ioutil" "log" + "net/http" "sync" "time" "github.com/depools/dc4bc/airgapped" "github.com/depools/dc4bc/client" - spf "github.com/depools/dc4bc/fsm/state_machines/signature_proposal_fsm" - sif "github.com/depools/dc4bc/fsm/state_machines/signing_proposal_fsm" "github.com/depools/dc4bc/fsm/types/requests" "github.com/depools/dc4bc/qr" "github.com/depools/dc4bc/storage" - "github.com/google/uuid" ) -type Transport struct { - nodes []*client.Client +type node struct { + client client.Poller + keyPair *client.KeyPair + air *airgapped.AirgappedMachine + listenAddr string } -type node struct { - client *client.Client - keyPair *client.KeyPair +type OperationsResponse struct { + Result map[string]*types.Operation `json:"result"` +} + +func getOperations(url string) (*OperationsResponse, error) { + resp, err := http.Get(url) + if err != nil { + return nil, fmt.Errorf("failed to get operations for node %w", err) + } + defer resp.Body.Close() + responseBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Fatalf("failed to read body %v", err) + } + + var response OperationsResponse + if err = json.Unmarshal(responseBody, &response); err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %v", err) + } + return &response, nil +} + +func handleProcessedOperation(url string, operation types.Operation) error { + operationBz, err := json.Marshal(operation) + if err != nil { + return fmt.Errorf("failed to marshal operation: %w", err) + } + resp, err := http.Post(url, "application/json", bytes.NewReader(operationBz)) + if err != nil { + return fmt.Errorf("failed to handle processed operation %w", err) + } + defer resp.Body.Close() + responseBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Fatalf("failed to read body %v", err) + } + + var response client.Response + if err = json.Unmarshal(responseBody, &response); err != nil { + return fmt.Errorf("failed to unmarshal response: %w", err) + } + if response.ErrorMessage != "" { + return fmt.Errorf("failed to handle processed operation: %s", response.ErrorMessage) + } + return nil +} + +func (n *node) run() { + for { + operationsResponse, err := getOperations(fmt.Sprintf("http://%s/getOperations", n.listenAddr)) + if err != nil { + log.Fatalf("failed to get operations: %v", err) + } + + operations := operationsResponse.Result + if len(operations) == 0 { + time.Sleep(1 * time.Second) + continue + } + + n.client.GetLogger().Log("Got %d Operations from pool", len(operations)) + for _, operation := range operations { + n.client.GetLogger().Log("Handling operation %s in airgapped", operation.Type) + processedOperation, err := n.air.HandleOperation(*operation) + if err != nil { + n.client.GetLogger().Log("Failed to handle operation: %v", err) + } + + n.client.GetLogger().Log("Got %d Processed Operations from Airgapped", len(operations)) + n.client.GetLogger().Log("Operation %s handled in airgapped, result event is %s", + operation.Event, processedOperation.Event) + + if err = handleProcessedOperation(fmt.Sprintf("http://%s/handleProcessedOperationJSON", n.listenAddr), + processedOperation); err != nil { + n.client.GetLogger().Log("Failed to handle processed operation: %v", err) + } else { + n.client.GetLogger().Log("Successfully handled processed operation %s", processedOperation.Event) + } + + } + } } func main() { var numNodes = 4 var threshold = 3 var storagePath = "/tmp/dc4bc_storage" - var nodes = make([]*node, 4) + var nodes = make([]*node, numNodes) + startingPort := 8080 for nodeID := 0; nodeID < numNodes; nodeID++ { var ctx = context.Background() var userName = fmt.Sprintf("node_%d", nodeID) @@ -71,39 +152,43 @@ func main() { stg, keyStore, qr.NewCameraProcessor(), - airgappedMachine, ) if err != nil { log.Fatalf("node %d failed to init client: %v\n", nodeID, err) } - clt.Airgapped.SetAddress(clt.GetAddr()) + airgappedMachine.SetAddress(clt.GetAddr()) nodes[nodeID] = &node{ - client: clt, - keyPair: keyPair, + client: clt, + keyPair: keyPair, + air: airgappedMachine, + listenAddr: fmt.Sprintf("localhost:%d", startingPort), } + startingPort++ } // Each node starts to Poll(). - for nodeID, node := range nodes { - go func(nodeID int, node *client.Client) { + for nodeID, n := range nodes { + go func(nodeID int, node *node) { + if err := node.client.StartHTTPServer(node.listenAddr); err != nil { + log.Fatalf("failed to start HTTP server for nodeID #%d: %v\n", nodeID, err) + } + }(nodeID, n) + go nodes[nodeID].run() + + go func(nodeID int, node client.Poller) { if err := node.Poll(); err != nil { log.Fatalf("client %d poller failed: %v\n", nodeID, err) } - }(nodeID, node.client) + }(nodeID, n.client) log.Printf("client %d started...\n", nodeID) } - stg, err := storage.NewFileStorage(storagePath) - if err != nil { - log.Fatalf("main namespace failed to init storage: %v\n", err) - } - // Node1 tells other participants to start DKG. var participants []*requests.SignatureProposalParticipantsEntry for _, node := range nodes { - dkgPubKey, err := node.client.Airgapped.GetPubKey().MarshalBinary() + dkgPubKey, err := node.air.GetPubKey().MarshalBinary() if err != nil { log.Fatalln("failed to get DKG pubKey:", err.Error()) } @@ -123,45 +208,35 @@ func main() { log.Fatalf("failed to marshal SignatureProposalParticipantsListRequest: %v\n", err) } - dkgRoundID := md5.Sum(messageDataBz) - message := storage.Message{ - ID: uuid.New().String(), - DkgRoundID: hex.EncodeToString(dkgRoundID[:]), - Event: string(spf.EventInitProposal), - Data: messageDataBz, - SenderAddr: nodes[0].client.GetAddr(), - } - - message.Signature = ed25519.Sign(nodes[0].keyPair.Priv, message.Bytes()) - if _, err := stg.Send(message); err != nil { - log.Fatalf("Failed to send %+v to storage: %v\n", message, err) + if _, err := http.Post(fmt.Sprintf("http://localhost:%d/startDKG", startingPort-1), + "application/json", bytes.NewReader(messageDataBz)); err != nil { + log.Fatalf("failed to send HTTP request to start DKG: %v\n", err) } // i haven't a better idea to test signing without big changes in the client code time.Sleep(10 * time.Second) log.Println("Propose message to sign") + dkgRoundID := md5.Sum(messageDataBz) messageDataSign := requests.SigningProposalStartRequest{ - ParticipantId: 0, + ParticipantId: len(nodes) - 1, SrcPayload: []byte("message to sign"), CreatedAt: time.Now(), } - messageDataBz, err = json.Marshal(messageDataSign) + messageDataSignBz, err := json.Marshal(messageDataSign) + if err != nil { + log.Fatalf("failed to marshal SigningProposalStartRequest: %v\n", err) + } + + messageDataBz, err = json.Marshal(map[string][]byte{"data": messageDataSignBz, + "dkgID": dkgRoundID[:]}) if err != nil { log.Fatalf("failed to marshal SignatureProposalParticipantsListRequest: %v\n", err) } - message = storage.Message{ - ID: uuid.New().String(), - DkgRoundID: hex.EncodeToString(dkgRoundID[:]), - Event: string(sif.EventSigningStart), - Data: messageDataBz, - SenderAddr: nodes[0].client.GetAddr(), - } - - message.Signature = ed25519.Sign(nodes[0].keyPair.Priv, message.Bytes()) - if _, err := stg.Send(message); err != nil { - log.Fatalf("Failed to send %+v to storage: %v\n", message, err) + if _, err := http.Post(fmt.Sprintf("http://localhost:%d/proposeSignMessage", startingPort-1), + "application/json", bytes.NewReader(messageDataBz)); err != nil { + log.Fatalf("failed to send HTTP request to sign message: %v\n", err) } var wg = sync.WaitGroup{}