dc4bc/client/client.go

432 lines
13 KiB
Go

package client
import (
"context"
"crypto/ed25519"
"encoding/json"
"errors"
"fmt"
"log"
"path/filepath"
"sync"
"time"
"github.com/lidofinance/dc4bc/fsm/types/responses"
sipf "github.com/lidofinance/dc4bc/fsm/state_machines/signing_proposal_fsm"
"github.com/google/uuid"
"github.com/lidofinance/dc4bc/client/types"
"github.com/lidofinance/dc4bc/fsm/types/requests"
spf "github.com/lidofinance/dc4bc/fsm/state_machines/signature_proposal_fsm"
"github.com/lidofinance/dc4bc/fsm/state_machines"
"github.com/lidofinance/dc4bc/fsm/fsm"
dpf "github.com/lidofinance/dc4bc/fsm/state_machines/dkg_proposal_fsm"
"github.com/lidofinance/dc4bc/qr"
"github.com/lidofinance/dc4bc/storage"
)
const (
pollingPeriod = time.Second
QrCodesDir = "/tmp"
)
type Client interface {
Poll() error
GetLogger() *logger
GetPubKey() ed25519.PublicKey
GetUsername() string
SendMessage(message storage.Message) error
ProcessMessage(message storage.Message) error
GetOperations() (map[string]*types.Operation, error)
GetOperationQRPath(operationID string) (string, error)
StartHTTPServer(listenAddr string) error
}
type BaseClient struct {
sync.Mutex
Logger *logger
userName string
pubKey ed25519.PublicKey
ctx context.Context
state State
storage storage.Storage
keyStore KeyStore
qrProcessor qr.Processor
}
func NewClient(
ctx context.Context,
userName string,
state State,
storage storage.Storage,
keyStore KeyStore,
qrProcessor qr.Processor,
) (Client, error) {
keyPair, err := keyStore.LoadKeys(userName, "")
if err != nil {
return nil, fmt.Errorf("failed to LoadKeys: %w", err)
}
return &BaseClient{
ctx: ctx,
Logger: newLogger(userName),
userName: userName,
pubKey: keyPair.Pub,
state: state,
storage: storage,
keyStore: keyStore,
qrProcessor: qrProcessor,
}, nil
}
func (c *BaseClient) GetLogger() *logger {
return c.Logger
}
func (c *BaseClient) GetUsername() string {
return c.userName
}
func (c *BaseClient) GetPubKey() ed25519.PublicKey {
return c.pubKey
}
// Poll is a main client loop, which gets new messages from an append-only log and processes them
func (c *BaseClient) Poll() error {
tk := time.NewTicker(pollingPeriod)
for {
select {
case <-tk.C:
offset, err := c.state.LoadOffset()
if err != nil {
panic(err)
}
messages, err := c.storage.GetMessages(offset)
if err != nil {
return fmt.Errorf("failed to GetMessages: %w", err)
}
for _, message := range messages {
if message.RecipientAddr == "" || message.RecipientAddr == c.GetUsername() {
c.Logger.Log("Handling message with offset %d, type %s", message.Offset, message.Event)
if err := c.ProcessMessage(message); err != nil {
c.Logger.Log("Failed to process message with offset %d: %v", message.Offset, err)
} else {
c.Logger.Log("Successfully processed message with offset %d, type %s",
message.Offset, message.Event)
}
}
}
case <-c.ctx.Done():
log.Println("Context closed, stop polling...")
return nil
}
}
}
func (c *BaseClient) SendMessage(message storage.Message) error {
if _, err := c.storage.Send(message); err != nil {
return fmt.Errorf("failed to post message: %w", err)
}
return nil
}
// processSignature saves a broadcasted reconstructed signature to a LevelDB
func (c *BaseClient) processSignature(message storage.Message) error {
var (
signature types.ReconstructedSignature
err error
)
if err = json.Unmarshal(message.Data, &signature); err != nil {
return fmt.Errorf("failed to unmarshal reconstructed signature: %w", err)
}
signature.Username = message.SenderAddr
signature.DKGRoundID = message.DkgRoundID
return c.state.SaveSignature(signature)
}
func (c *BaseClient) ProcessMessage(message storage.Message) error {
// save broadcasted reconstructed signature
if fsm.Event(message.Event) == types.SignatureReconstructed {
if err := c.processSignature(message); err != nil {
return fmt.Errorf("failed to process signature: %w", err)
}
if err := c.state.SaveOffset(message.Offset + 1); err != nil {
return fmt.Errorf("failed to SaveOffset: %w", err)
}
return nil
}
// save signing data to the same storage as we save signatures
// This allows easy to view signing data by CLI-command
if fsm.Event(message.Event) == sipf.EventSigningStart {
if err := c.processSignature(message); err != nil {
return fmt.Errorf("failed to process signature: %w", err)
}
}
fsmInstance, err := c.getFSMInstance(message.DkgRoundID)
if err != nil {
return fmt.Errorf("failed to getFSMInstance: %w", err)
}
// we can't verify a message at this moment, cause we don't have public keys of participantss
if fsm.Event(message.Event) != spf.EventInitProposal {
if err := c.verifyMessage(fsmInstance, message); err != nil {
return fmt.Errorf("failed to verifyMessage %+v: %w", message, err)
}
}
fsmReq, err := types.FSMRequestFromMessage(message)
if err != nil {
return fmt.Errorf("failed to get FSMRequestFromMessage: %v", err)
}
resp, fsmDump, err := fsmInstance.Do(fsm.Event(message.Event), fsmReq)
if err != nil {
return fmt.Errorf("failed to Do operation in FSM: %w", err)
}
c.Logger.Log("message %s done successfully from %s", message.Event, message.SenderAddr)
// switch FSM state by hand due to implementation specifics
if resp.State == spf.StateSignatureProposalCollected {
fsmInstance, err = state_machines.FromDump(fsmDump)
if err != nil {
return fmt.Errorf("failed get state_machines from dump: %w", err)
}
resp, fsmDump, err = fsmInstance.Do(dpf.EventDKGInitProcess, requests.DefaultRequest{
CreatedAt: time.Now(),
})
if err != nil {
return fmt.Errorf("failed to Do operation in FSM: %w", err)
}
}
if resp.State == dpf.StateDkgMasterKeyCollected {
fsmInstance, err = state_machines.FromDump(fsmDump)
if err != nil {
return fmt.Errorf("failed get state_machines from dump: %w", err)
}
resp, fsmDump, err = fsmInstance.Do(sipf.EventSigningInit, requests.DefaultRequest{
CreatedAt: time.Now(),
})
if err != nil {
return fmt.Errorf("failed to Do operation in FSM: %w", err)
}
}
var operation *types.Operation
switch resp.State {
// if the new state is waiting for RPC to airgapped machine
case
spf.StateAwaitParticipantsConfirmations,
dpf.StateDkgCommitsAwaitConfirmations,
dpf.StateDkgDealsAwaitConfirmations,
dpf.StateDkgResponsesAwaitConfirmations,
dpf.StateDkgMasterKeyAwaitConfirmations,
sipf.StateSigningAwaitPartialSigns,
sipf.StateSigningPartialSignsCollected,
sipf.StateSigningAwaitConfirmations:
if resp.Data != nil {
// if we are initiator of signing, then we don't need to confirm our participation
if data, ok := resp.Data.(responses.SigningProposalParticipantInvitationsResponse); ok {
initiator, err := fsmInstance.SigningQuorumGetParticipant(data.InitiatorId)
if err != nil {
return fmt.Errorf("failed to get SigningQuorumParticipant: %w", err)
}
if initiator.Username == c.GetUsername() {
break
}
}
bz, err := json.Marshal(resp.Data)
if err != nil {
return fmt.Errorf("failed to marshal FSM response: %w", err)
}
operation = &types.Operation{
ID: uuid.New().String(),
Type: types.OperationType(resp.State),
Payload: bz,
DKGIdentifier: message.DkgRoundID,
CreatedAt: time.Now(),
}
}
default:
c.Logger.Log("State %s does not require an operation", resp.State)
}
// switch FSM state by hand due to implementation specifics
if resp.State == sipf.StateSigningPartialSignsCollected {
fsmInstance, err = state_machines.FromDump(fsmDump)
if err != nil {
return fmt.Errorf("failed get state_machines from dump: %w", err)
}
resp, fsmDump, err = fsmInstance.Do(sipf.EventSigningRestart, requests.DefaultRequest{
CreatedAt: time.Now(),
})
if err != nil {
return fmt.Errorf("failed to Do operation in FSM: %w", err)
}
}
if operation != nil {
if err := c.state.PutOperation(operation); err != nil {
return fmt.Errorf("failed to PutOperation: %w", err)
}
}
if err := c.state.SaveOffset(message.Offset + 1); err != nil {
return fmt.Errorf("failed to SaveOffset: %w", err)
}
if err := c.state.SaveFSM(message.DkgRoundID, fsmDump); err != nil {
return fmt.Errorf("failed to SaveFSM: %w", err)
}
return nil
}
func (c *BaseClient) GetOperations() (map[string]*types.Operation, error) {
return c.state.GetOperations()
}
//GetSignatures returns all signatures for the given DKG round that were reconstructed on the airgapped machine and
// broadcasted by users
func (c *BaseClient) GetSignatures(dkgID string) (map[string][]types.ReconstructedSignature, error) {
return c.state.GetSignatures(dkgID)
}
//GetSignatureByDataHash returns a list of reconstructed signatures of the signed data broadcasted by users
func (c *BaseClient) GetSignatureByID(dkgID, sigID string) ([]types.ReconstructedSignature, error) {
return c.state.GetSignatureByID(dkgID, sigID)
}
// getOperationJSON returns a specific JSON-encoded operation
func (c *BaseClient) getOperationJSON(operationID string) ([]byte, error) {
operation, err := c.state.GetOperationByID(operationID)
if err != nil {
return nil, fmt.Errorf("failed to get operation: %w", err)
}
operationJSON, err := json.Marshal(operation)
if err != nil {
return nil, fmt.Errorf("failed to marshal operation: %w", err)
}
return operationJSON, nil
}
// GetOperationQRPath returns a path to the image with the QR generated
// for the specified operation. It is supposed that the user will open
// this file herself.
func (c *BaseClient) GetOperationQRPath(operationID string) (string, error) {
operationJSON, err := c.getOperationJSON(operationID)
if err != nil {
return "", fmt.Errorf("failed to get operation in JSON: %w", err)
}
operationQRPath := filepath.Join(QrCodesDir, fmt.Sprintf("dc4bc_qr_%s", operationID))
qrPath := fmt.Sprintf("%s.gif", operationQRPath)
if err = c.qrProcessor.WriteQR(qrPath, operationJSON); err != nil {
return "", err
}
return qrPath, nil
}
// handleProcessedOperation handles an operation which was processed by the airgapped machine
// It checks that the operation exists in an operation pool, signs the operation, sends it to an append-only log and
// deletes it from the pool.
func (c *BaseClient) handleProcessedOperation(operation types.Operation) error {
storedOperation, err := c.state.GetOperationByID(operation.ID)
if err != nil {
return fmt.Errorf("failed to find matching operation: %w", err)
}
if err := storedOperation.Check(&operation); err != nil {
return fmt.Errorf("processed operation does not match stored operation: %w", err)
}
for _, message := range operation.ResultMsgs {
message.SenderAddr = c.GetUsername()
sig, err := c.signMessage(message.Bytes())
if err != nil {
return fmt.Errorf("failed to sign a message: %w", err)
}
message.Signature = sig
if _, err := c.storage.Send(message); err != nil {
return fmt.Errorf("failed to post message: %w", err)
}
}
if err := c.state.DeleteOperation(operation.ID); err != nil {
return fmt.Errorf("failed to DeleteOperation: %w", err)
}
return nil
}
// getFSMInstance returns a FSM for a necessary DKG round.
func (c *BaseClient) getFSMInstance(dkgRoundID string) (*state_machines.FSMInstance, error) {
var err error
fsmInstance, ok, err := c.state.LoadFSM(dkgRoundID)
if err != nil {
return nil, fmt.Errorf("failed to LoadFSM: %w", err)
}
if !ok {
fsmInstance, err = state_machines.Create(dkgRoundID)
if err != nil {
return nil, fmt.Errorf("failed to create FSM instance: %w", err)
}
bz, err := fsmInstance.Dump()
if err != nil {
return nil, fmt.Errorf("failed to Dump FSM instance: %w", err)
}
if err := c.state.SaveFSM(dkgRoundID, bz); err != nil {
return nil, fmt.Errorf("failed to SaveFSM: %w", err)
}
}
return fsmInstance, nil
}
func (c *BaseClient) signMessage(message []byte) ([]byte, error) {
keyPair, err := c.keyStore.LoadKeys(c.userName, "")
if err != nil {
return nil, fmt.Errorf("failed to LoadKeys: %w", err)
}
return ed25519.Sign(keyPair.Priv, message), nil
}
func (c *BaseClient) verifyMessage(fsmInstance *state_machines.FSMInstance, message storage.Message) error {
senderPubKey, err := fsmInstance.GetPubKeyByUsername(message.SenderAddr)
if err != nil {
return fmt.Errorf("failed to GetPubKeyByUsername: %w", err)
}
if !ed25519.Verify(senderPubKey, message.Bytes(), message.Signature) {
return errors.New("signature is corrupt")
}
return nil
}
func (c *BaseClient) GetFSMDump(dkgID string) (*state_machines.FSMDump, error) {
fsmInstance, err := c.getFSMInstance(dkgID)
if err != nil {
return nil, fmt.Errorf("failed to get FSM instance for DKG round ID %s: %w", dkgID, err)
}
return fsmInstance.FSMDump(), nil
}