diff --git a/.gitignore b/.gitignore index 66fd13c..398baf2 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +.idea diff --git a/fsm/cmd/state_machines/state_machines.go b/fsm/cmd/state_machines/state_machines.go new file mode 100644 index 0000000..3d5e3ed --- /dev/null +++ b/fsm/cmd/state_machines/state_machines.go @@ -0,0 +1,97 @@ +package main + +import ( + "fmt" + "github.com/looplab/fsm" +) + +func main() { + signatureProposalFSM := fsm.NewFSM( + "idle", + fsm.Events{ + {Name: "proposal_spotted", Src: []string{"idle"}, Dst: "validate_proposal"}, + {Name: "proposal_valid", Src: []string{"validate_proposal"}, Dst: "proposed"}, + {Name: "proposal_invalid", Src: []string{"validate_proposal"}, Dst: "idle"}, + {Name: "recieve_yay", Src: []string{"proposed"}, Dst: "process_yay"}, + {Name: "receive_nay", Src: []string{"proposed"}, Dst: "process_nay"}, + {Name: "send_nay", Src: []string{"proposed"}, Dst: "proposed"}, + {Name: "send_yay", Src: []string{"proposed"}, Dst: "proposed"}, + {Name: "enough_yays", Src: []string{"process_yay"}, Dst: "signing"}, + {Name: "enough_nays", Src: []string{"process_nay"}, Dst: "abort"}, + {Name: "not_enough_yays", Src: []string{"process_yay"}, Dst: "proposed"}, + {Name: "not_enough_nays", Src: []string{"process_nay"}, Dst: "proposed"}, + }, + fsm.Callbacks{}, + ) + fmt.Print(fsm.Visualize(signatureProposalFSM)) + + signatureConstructFSM := fsm.NewFSM( + "idle", + fsm.Events{ + {Name: "request_airgapped_sig", Src: []string{"signing"}, Dst: "signing"}, + {Name: "transmit_airgapped_sig", Src: []string{"signing"}, Dst: "signing"}, + {Name: "receive_sig", Src: []string{"signing"}, Dst: "process_sig"}, + {Name: "enough_signature_shares", Src: []string{"process_sig"}, Dst: "reconstruct_signature"}, + {Name: "not_enough_signature_shares", Src: []string{"process_sig"}, Dst: "signing"}, + {Name: "signature_reconstucted", Src: []string{"reconstruct_signature"}, Dst: "publish_signature"}, + {Name: "signature_published", Src: []string{"publish_signature"}, Dst: "fin"}, + {Name: "failed_to_reconstuct_signature", Src: []string{"reconstruct_signature"}, Dst: "signing"}, + }, + fsm.Callbacks{}, + ) + fmt.Print(fsm.Visualize(signatureConstructFSM)) + + DkgProposeFSM := fsm.NewFSM( + "idle", + fsm.Events{ + {Name: "proposal_spotted", Src: []string{"idle"}, Dst: "validate_proposal"}, + {Name: "proposal_valid", Src: []string{"validate_proposal"}, Dst: "proposed"}, + {Name: "proposal_invalid", Src: []string{"validate_proposal"}, Dst: "idle"}, + {Name: "recieve_yay", Src: []string{"proposed"}, Dst: "process_yay"}, + {Name: "receive_nay", Src: []string{"proposed"}, Dst: "abort"}, + {Name: "send_nay", Src: []string{"proposed"}, Dst: "proposed"}, + {Name: "send_yay", Src: []string{"proposed"}, Dst: "proposed"}, + {Name: "not_enough_yays", Src: []string{"process_yay"}, Dst: "proposed"}, + {Name: "all_yays", Src: []string{"process_yay"}, Dst: "dkg_commitments"}, + {Name: "timeout", Src: []string{"proposed"}, Dst: "abort"}, + }, + fsm.Callbacks{}, + ) + fmt.Print(fsm.Visualize(DkgProposeFSM)) + + DkgCommitFSM := fsm.NewFSM( + "dkg_commitments", + fsm.Events{ + {Name: "request_airgapped_commitment", Src: []string{"dkg_commitments"}, Dst: "dkg_commitments"}, + {Name: "transmit_airgapped_commitment", Src: []string{"dkg_commitments"}, Dst: "dkg_commitments"}, + {Name: "recieve_commitment", Src: []string{"dkg_commitments"}, Dst: "process_commitment"}, + {Name: "invalid_commitment", Src: []string{"process_commitment"}, Dst: "abort"}, + {Name: "all_commitments", Src: []string{"process_commitment"}, Dst: "dkg_deals"}, + {Name: "not_enough_commitments", Src: []string{"process_commitment"}, Dst: "dkg_commitments"}, + {Name: "timeout", Src: []string{"dkg_commitments"}, Dst: "abort"}, + }, + fsm.Callbacks{}, + ) + fmt.Print(fsm.Visualize(DkgCommitFSM)) + + DkgDealsFSM := fsm.NewFSM( + "dkg_deals", + fsm.Events{ + {Name: "pass_commitements_and_request_airgapped_deals", Src: []string{"dkg_deals"}, Dst: "dkg_deals"}, + {Name: "transmit_airgapped_deals", Src: []string{"dkg_deals"}, Dst: "dkg_deals"}, + {Name: "transmit_airgapped_error", Src: []string{"dkg_deals"}, Dst: "abort"}, + {Name: "recieve_deal", Src: []string{"dkg_deals"}, Dst: "process_deal"}, + {Name: "not_my_deal", Src: []string{"process_deal"}, Dst: "dkg_deals"}, + {Name: "invalid_deal", Src: []string{"process_deal"}, Dst: "abort"}, + {Name: "enough_deals", Src: []string{"process_deal"}, Dst: "dkg_construct_tss"}, + {Name: "not_enough_deals", Src: []string{"process_deal"}, Dst: "dkg_deals"}, + {Name: "pass_deals_and_request_airgapped_public_key", Src: []string{"dkg_construct_tss"}, Dst: "dkg_construct_tss"}, + {Name: "transmit_airgapped_public_key", Src: []string{"dkg_construct_tss"}, Dst: "fin"}, + {Name: "transmit_airgapped_error", Src: []string{"dkg_construct_tss"}, Dst: "abort"}, + {Name: "timeout", Src: []string{"dkg_deals"}, Dst: "abort"}, + }, + fsm.Callbacks{}, + ) + fmt.Print(fsm.Visualize(DkgDealsFSM)) + +} diff --git a/fsm/cmd/test/test.go b/fsm/cmd/test/test.go new file mode 100644 index 0000000..54f9f8d --- /dev/null +++ b/fsm/cmd/test/test.go @@ -0,0 +1,34 @@ +package main + +import ( + "github.com/p2p-org/dc4bc/fsm/state_machines" + "github.com/p2p-org/dc4bc/fsm/types/requests" + "log" +) + +func main() { + fsmMachine, err := state_machines.New([]byte{}) + log.Println(fsmMachine, err) + resp, dump, err := fsmMachine.Do( + "proposal_init", + "d8a928b2043db77e340b523547bf16cb4aa483f0645fe0a290ed1f20aab76257", + requests.ProposalParticipantsListRequest{ + { + "John Doe", + []byte("pubkey123123"), + }, + { + "Crypto Billy", + []byte("pubkey456456"), + }, + { + "Matt", + []byte("pubkey789789"), + }, + }, + ) + log.Println("Response", resp) + log.Println("Err", err) + log.Println("Dump", string(dump)) + +} diff --git a/fsm/config/config.go b/fsm/config/config.go new file mode 100644 index 0000000..6234e6a --- /dev/null +++ b/fsm/config/config.go @@ -0,0 +1,6 @@ +package config + +const ( + // TODO: Move to machine level configs? + ParticipantsMinCount = 3 +) diff --git a/fsm/fsm/fsm.go b/fsm/fsm/fsm.go new file mode 100644 index 0000000..6b178d9 --- /dev/null +++ b/fsm/fsm/fsm.go @@ -0,0 +1,333 @@ +package fsm + +import ( + "errors" + "fmt" + "strings" + "sync" +) + +// +// fsmInstance, err := fsm.New(scope) +// if err != nil { +// log.Println(err) +// return +// } +// +// fsmInstance.Do(event, args) +// + +// Temporary global finish state for deprecating operations +const ( + StateGlobalIdle = "__idle" + StateGlobalDone = "__done" +) + +// FSMResponse returns result for processing with client events +type FSMResponse struct { + // Returns machine execution result state + State string + // Must be cast, according to mapper event_name->response_type + Data interface{} +} + +type FSM struct { + name string + initialState string + currentState string + + // May be mapping must require pair source + event? + transitions map[trKey]*trEvent + + callbacks Callbacks + + initialEvent string + + // Finish states, for switch machine or fin, + // These states cannot be linked as SrcState in this machine + finStates map[string]bool + + // stateMu guards access to the currentState state. + stateMu sync.RWMutex + // eventMu guards access to State() and Transition(). + eventMu sync.Mutex +} + +// Transition key source + dst +type trKey struct { + source string + event string +} + +// Transition lightweight event description +type trEvent struct { + dstState string + isInternal bool +} + +type Event struct { + Name string + + SrcState []string + + // Dst state changes after callback + DstState string + + // Internal events, cannot be emitted from external call + IsInternal bool +} + +type Callback func(event string, args ...interface{}) (interface{}, error) + +type Callbacks map[string]Callback + +// TODO: Exports +func MustNewFSM(machineName, initialState string, events []Event, callbacks map[string]Callback) *FSM { + machineName = strings.TrimSpace(machineName) + initialState = strings.TrimSpace(initialState) + + if machineName == "" { + panic("machine name cannot be empty") + } + + if initialState == "" { + panic("initial state state cannot be empty") + } + + // to remove + if len(events) == 0 { + panic("cannot init fsm with empty events") + } + + f := &FSM{ + name: machineName, + currentState: initialState, + initialState: initialState, + transitions: make(map[trKey]*trEvent), + finStates: make(map[string]bool), + callbacks: make(map[string]Callback), + } + + allEvents := make(map[string]bool) + + // Required for find finStates + allSources := make(map[string]bool) + allStates := make(map[string]bool) + + // Validate events + for _, event := range events { + event.Name = strings.TrimSpace(event.Name) + event.DstState = strings.TrimSpace(event.DstState) + + if event.Name == "" { + panic("cannot init empty event") + } + + if event.DstState == "" { + panic("event dest cannot be empty, use StateGlobalDone for finish or external state") + } + + if _, ok := allEvents[event.Name]; ok { + panic(fmt.Sprintf("duplicate event \"%s\"", event.Name)) + } + + allEvents[event.Name] = true + allStates[event.DstState] = true + + trimmedSourcesCounter := 0 + + for _, sourceState := range event.SrcState { + sourceState := strings.TrimSpace(sourceState) + + if sourceState == "" { + continue + } + + tKey := trKey{ + sourceState, + event.Name, + } + + if sourceState == StateGlobalDone { + panic("StateGlobalDone cannot set as source state") + } + + if _, ok := f.transitions[tKey]; ok { + panic("duplicate dst for pair `source + event`") + } + + f.transitions[tKey] = &trEvent{event.DstState, event.IsInternal} + + // For using provider, event must use with IsGlobal = true + if sourceState == initialState { + if f.initialEvent != "" { + panic("machine entry event already exist") + } + f.initialEvent = event.Name + } + + allSources[sourceState] = true + trimmedSourcesCounter++ + } + + if trimmedSourcesCounter == 0 { + panic("event must have minimum one source available state") + } + } + + if len(allStates) < 2 { + panic("machine must contain at least two states") + } + + // Validate callbacks + for event, callback := range callbacks { + if event == "" { + panic("callback machineName cannot be empty") + } + + if _, ok := allEvents[event]; !ok { + panic("callback has no event") + } + + f.callbacks[event] = callback + } + + for state := range allStates { + if state == StateGlobalIdle { + continue + } + // Exit states cannot be a source in this machine + if _, exists := allSources[state]; !exists || state == StateGlobalDone { + f.finStates[state] = true + } + } + + if len(f.finStates) == 0 { + panic("cannot initialize machine without final states") + } + + return f +} + +func (f *FSM) Do(event string, args ...interface{}) (resp *FSMResponse, err error) { + f.eventMu.Lock() + defer f.eventMu.Unlock() + + trEvent, ok := f.transitions[trKey{f.currentState, event}] + if !ok { + return nil, errors.New("cannot execute event for this state") + } + if trEvent.isInternal { + return nil, errors.New("event is internal") + } + + resp = &FSMResponse{ + State: f.State(), + } + + if callback, ok := f.callbacks[event]; ok { + resp.Data, err = callback(event, args...) + // Do not try change state on error + if err != nil { + return resp, err + } + } + + err = f.setState(event) + return +} + +// State returns the currentState state of the FSM. +func (f *FSM) State() string { + f.stateMu.RLock() + defer f.stateMu.RUnlock() + return f.currentState +} + +// setState allows the user to move to the given state from currentState state. +// The call does not trigger any callbacks, if defined. +func (f *FSM) setState(event string) error { + f.stateMu.Lock() + defer f.stateMu.Unlock() + + trEvent, ok := f.transitions[trKey{f.currentState, event}] + if !ok { + return errors.New("cannot change state") + } + + f.currentState = trEvent.dstState + + return nil +} + +func (f *FSM) Name() string { + return f.name +} + +func (f *FSM) InitialState() string { + return f.initialState +} + +// Check entry event for available emitting as global entry event +func (f *FSM) GlobalInitialEvent() (event string) { + if initialEvent, exists := f.transitions[trKey{StateGlobalIdle, f.initialEvent}]; exists { + if !initialEvent.isInternal { + event = f.initialEvent + } + } + return +} + +func (f *FSM) EntryEvent() (event string) { + if entryEvent, exists := f.transitions[trKey{f.initialState, f.initialEvent}]; exists { + if !entryEvent.isInternal { + event = f.initialEvent + } + } + return +} + +func (f *FSM) EventsList() (events []string) { + var eventsMap = map[string]bool{} + if len(f.transitions) > 0 { + for trKey, trEvent := range f.transitions { + if !trEvent.isInternal { + eventsMap[trKey.event] = true + if _, exists := eventsMap[trKey.event]; !exists { + + events = append(events, trKey.event) + } + } + } + } + + if len(eventsMap) > 0 { + for event := range eventsMap { + events = append(events, event) + } + } + + return +} + +func (f *FSM) StatesSourcesList() (states []string) { + var allStates = map[string]bool{} + if len(f.transitions) > 0 { + for trKey, _ := range f.transitions { + allStates[trKey.source] = true + } + } + + if len(allStates) > 0 { + for state := range allStates { + states = append(states, state) + } + } + + return +} + +func (f *FSM) IsFinState(state string) bool { + _, exists := f.finStates[state] + return exists +} diff --git a/fsm/fsm/fsm_machines_data_test.go b/fsm/fsm/fsm_machines_data_test.go new file mode 100644 index 0000000..ea3821e --- /dev/null +++ b/fsm/fsm/fsm_machines_data_test.go @@ -0,0 +1,77 @@ +package fsm + +const ( + FSM1Name = "fsm1" + // Init process from global idle state + FSM1StateInit = StateGlobalIdle + // Set up data + FSM1StateStage1 = "state_fsm1_stage1" + // Process data + FSM1StateStage2 = "state_fsm1_stage2" + // Cancelled with internal event + FSM1StateCanceledByInternal = "state_fsm1_canceled1" + // Cancelled with external event + FSM1StateCanceled2 = "state_fsm1_canceled2" + // Out endpoint to switch + FSM1StateOutToFSM2 = "state_fsm1_out_to_fsm2" + FSM1StateOutToFSM3 = "state_fsm1_out_to_fsm3" + + // Events + EventFSM1Init = "event_fsm1_init" + EventFSM1Cancel = "event_fsm1_cancel" + EventFSM1Process = "event_fsm1_process" + + // Internal events + EventFSM1Internal = "event_internal_fsm1" + EventFSM1CancelByInternal = "event_internal_fsm1_cancel" + EventFSM1InternalOut2 = "event_internal_fsm1_out" +) + +var ( + testingEvents = []Event{ + // Init + {Name: EventFSM1Init, SrcState: []string{FSM1StateInit}, DstState: FSM1StateStage1}, + {Name: EventFSM1Internal, SrcState: []string{FSM1StateStage1}, DstState: FSM1StateStage2, IsInternal: true}, + + // Cancellation events + {Name: EventFSM1CancelByInternal, SrcState: []string{FSM1StateStage2}, DstState: FSM1StateCanceledByInternal, IsInternal: true}, + {Name: EventFSM1Cancel, SrcState: []string{FSM1StateStage2}, DstState: FSM1StateCanceled2}, + + // Out + {Name: EventFSM1Process, SrcState: []string{FSM1StateStage2}, DstState: FSM1StateOutToFSM2}, + {Name: EventFSM1InternalOut2, SrcState: []string{FSM1StateStage2}, DstState: FSM1StateOutToFSM3, IsInternal: true}, + } + + testingCallbacks = Callbacks{ + EventFSM1Init: actionSetUpData, + EventFSM1InternalOut2: actionEmitOut2, + EventFSM1Process: actionProcessData, + } +) + +type testMachineFSM struct { + *FSM +} + +/*func new() fsm_pool.IStateMachine { + machine := &testMachineFSM{} + machine.FSM = MustNewFSM( + FSM1Name, + FSM1StateInit, + testingEvents, + testingCallbacks, + ) + return machine +}*/ + +func actionSetUpData(event string, args ...interface{}) (response interface{}, err error) { + return +} + +func actionProcessData(event string, args ...interface{}) (response interface{}, err error) { + return +} + +func actionEmitOut2(event string, args ...interface{}) (response interface{}, err error) { + return +} diff --git a/fsm/fsm/fsm_machines_test.go b/fsm/fsm/fsm_machines_test.go new file mode 100644 index 0000000..fa71f90 --- /dev/null +++ b/fsm/fsm/fsm_machines_test.go @@ -0,0 +1,225 @@ +package fsm + +import ( + "log" + "testing" +) + +var testingFSM *FSM + +func init() { + testingFSM = MustNewFSM( + FSM1Name, + FSM1StateInit, + testingEvents, + testingCallbacks, + ) +} + +func compareRecoverStr(t *testing.T, r interface{}, assertion string) { + if r == nil { + return + } + msg, ok := r.(string) + if !ok { + t.Error("not asserted recover:", r) + } + if msg != assertion { + t.Error("not asserted recover:", msg) + } +} + +func compareArrays(src, dst []string) bool { + if len(src) != len(dst) { + return false + } + // create a map of string -> int + diff := make(map[string]int, len(src)) + for _, _x := range src { + // 0 value for int is 0, so just increment a counter for the string + diff[_x]++ + } + for _, _y := range dst { + // If the string _y is not in diff bail out early + if _, ok := diff[_y]; !ok { + return false + } + diff[_y] -= 1 + if diff[_y] == 0 { + delete(diff, _y) + } + } + if len(diff) == 0 { + return true + } + return false +} + +func TestMustNewFSM_Empty_Name_Panic(t *testing.T) { + defer func() { + compareRecoverStr(t, recover(), "machine name cannot be empty") + }() + testingFSM = MustNewFSM( + "", + "init_state", + []Event{}, + nil, + ) + + t.Errorf("did not panic on empty machine name") +} + +func TestMustNewFSM_Empty_Initial_State_Panic(t *testing.T) { + defer func() { + compareRecoverStr(t, recover(), "initial state state cannot be empty") + }() + + testingFSM = MustNewFSM( + "fsm", + "", + []Event{}, + nil, + ) + + t.Errorf("did not panic on empty initial") +} + +func TestMustNewFSM_Empty_Events_Panic(t *testing.T) { + defer func() { + compareRecoverStr(t, recover(), "cannot init fsm with empty events") + }() + + testingFSM = MustNewFSM( + "fsm", + "init_state", + []Event{}, + nil, + ) + + t.Errorf("did not panic on empty events list") +} + +func TestMustNewFSM_Event_Empty_Name_Panic(t *testing.T) { + defer func() { + compareRecoverStr(t, recover(), "cannot init empty event") + }() + + testingFSM = MustNewFSM( + "fsm", + "init_state", + []Event{ + {Name: "", SrcState: []string{"init_state"}, DstState: StateGlobalDone}, + }, + nil, + ) + + t.Errorf("did not panic on empty event name") +} + +func TestMustNewFSM_Event_Empty_Source_Panic(t *testing.T) { + defer func() { + compareRecoverStr(t, recover(), "event must have minimum one source available state") + }() + + testingFSM = MustNewFSM( + "fsm", + "init_state", + []Event{ + {Name: "event", SrcState: []string{}, DstState: StateGlobalDone}, + }, + nil, + ) + + t.Errorf("did not panic on empty event sources") +} + +func TestMustNewFSM_States_Min_Panic(t *testing.T) { + defer func() { + compareRecoverStr(t, recover(), "machine must contain at least two states") + }() + + testingFSM = MustNewFSM( + "fsm", + "init_state", + []Event{ + {Name: "event", SrcState: []string{"init_state"}, DstState: StateGlobalDone}, + }, + nil, + ) + + t.Errorf("did not panic on less than two states") +} + +func TestMustNewFSM_State_Entry_Conflict_Panic(t *testing.T) { + defer func() { + compareRecoverStr(t, recover(), "machine entry event already exist") + }() + + testingFSM = MustNewFSM( + "fsm", + "init_state", + []Event{ + {Name: "event1", SrcState: []string{"init_state"}, DstState: "state"}, + {Name: "event2", SrcState: []string{"init_state"}, DstState: "state"}, + }, + nil, + ) + + t.Errorf("did not panic on initialize with conflict in entry state") +} + +func TestMustNewFSM_State_Final_Not_Found_Panic(t *testing.T) { + defer func() { + compareRecoverStr(t, recover(), "cannot initialize machine without final states") + }() + + testingFSM = MustNewFSM( + "fsm", + "init_state", + []Event{ + {Name: "event1", SrcState: []string{"init_state"}, DstState: "state2"}, + {Name: "event2", SrcState: []string{"state2"}, DstState: "init_state"}, + }, + nil, + ) + + t.Errorf("did not panic on initialize without final state") +} + +func TestFSM_Name(t *testing.T) { + if testingFSM.Name() != FSM1Name { + t.Errorf("expected machine name \"%s\"", FSM1Name) + } +} + +func TestFSM_EntryEvent(t *testing.T) { + if testingFSM.InitialState() != FSM1StateInit { + t.Errorf("expected initial state \"%s\"", FSM1StateInit) + } +} + +func TestFSM_EventsList(t *testing.T) { + eventsList := []string{ + EventFSM1Init, + EventFSM1Cancel, + EventFSM1Process, + } + + if !compareArrays(testingFSM.EventsList(), eventsList) { + t.Error("expected public events", eventsList) + } + +} + +func TestFSM_StatesList(t *testing.T) { + log.Println(testingFSM.StatesSourcesList()) + statesList := []string{ + FSM1StateInit, + FSM1StateStage1, + FSM1StateStage2, + } + + if !compareArrays(testingFSM.StatesSourcesList(), statesList) { + t.Error("expected states", statesList) + } +} diff --git a/fsm/fsm_pool/fsm_pool.go b/fsm/fsm_pool/fsm_pool.go new file mode 100644 index 0000000..3b24ec5 --- /dev/null +++ b/fsm/fsm_pool/fsm_pool.go @@ -0,0 +1,150 @@ +package fsm_pool + +import ( + "errors" + "github.com/p2p-org/dc4bc/fsm/fsm" +) + +type IStateMachine interface { + // Returns machine state from scope dump + // For nil argument returns fsm with process initiation + // Get() IStateMachine + + Name() string + + InitialState() string + + // Process event + Do(event string, args ...interface{}) (*fsm.FSMResponse, error) + + GlobalInitialEvent() string + + EventsList() []string + + StatesSourcesList() []string + + IsFinState(state string) bool +} + +type FSMMapper map[string]IStateMachine + +type FSMRouteMapper map[string]string + +type FSMPoolProvider struct { + fsmInitialEvent string + // Pool mapper by names + mapper FSMMapper + events FSMRouteMapper + states FSMRouteMapper +} + +func Init(machines ...IStateMachine) *FSMPoolProvider { + if len(machines) == 0 { + panic("cannot initialize empty pool") + } + p := &FSMPoolProvider{ + mapper: make(FSMMapper), + events: make(FSMRouteMapper), + states: make(FSMRouteMapper), + } + + allInitStatesMap := make(map[string]string) + + // Fill up mapper + for _, machine := range machines { + + if machine == nil { + panic("machine not initialized, got nil") + } + + machineName := machine.Name() + + if machineName == "" { + panic("machine name cannot be empty") + } + + if _, exists := p.mapper[machineName]; exists { + panic("duplicate machine name") + } + + allInitStatesMap[machine.InitialState()] = machineName + + machineEvents := machine.EventsList() + for _, event := range machineEvents { + if _, exists := p.events[event]; exists { + panic("duplicate public event") + } + p.events[event] = machineName + } + + // Setup entry event for machines pool if available + if initialEvent := machine.GlobalInitialEvent(); initialEvent != "" { + if p.fsmInitialEvent != "" { + panic("duplicate entry event initialization") + } + + p.fsmInitialEvent = initialEvent + } + + p.mapper[machineName] = machine + } + + // Second iteration, all initial states filled up + // Fill up states with initial and exit states checking + for _, machine := range machines { + machineName := machine.Name() + machineStates := machine.StatesSourcesList() + for _, state := range machineStates { + if machine.IsFinState(state) { + // If state is initial for another machine, + if initMachineName, exists := allInitStatesMap[state]; exists { + p.states[allInitStatesMap[state]] = initMachineName + continue + } + } + if name, exists := p.states[state]; exists && name != machineName { + panic("duplicate state for machines") + } + + p.states[state] = machineName + } + } + + if p.fsmInitialEvent == "" { + panic("machines pool entry event not set") + } + return p +} + +func (p *FSMPoolProvider) EntryPointMachine() (IStateMachine, error) { + // StateGlobalIdle + // TODO: Short code + entryStateMachineName := p.events[p.fsmInitialEvent] + + machine, exists := p.mapper[entryStateMachineName] + + if !exists || machine == nil { + return nil, errors.New("cannot init machine with entry point") + } + return machine, nil +} + +func (p *FSMPoolProvider) MachineByEvent(event string) (IStateMachine, error) { + eventMachineName := p.events[event] + machine, exists := p.mapper[eventMachineName] + + if !exists || machine == nil { + return nil, errors.New("cannot init machine for event") + } + return machine, nil +} + +func (p *FSMPoolProvider) MachineByState(state string) (IStateMachine, error) { + eventMachineName := p.states[state] + machine, exists := p.mapper[eventMachineName] + + if !exists || machine == nil { + return nil, errors.New("cannot init machine for state") + } + return machine, nil +} diff --git a/fsm/state_machines/dkg_commit_fsm/init.go b/fsm/state_machines/dkg_commit_fsm/init.go new file mode 100644 index 0000000..025125c --- /dev/null +++ b/fsm/state_machines/dkg_commit_fsm/init.go @@ -0,0 +1 @@ +package dkg_commit_fsm diff --git a/fsm/state_machines/dkg_deals_fsm/init.go b/fsm/state_machines/dkg_deals_fsm/init.go new file mode 100644 index 0000000..73c6a3a --- /dev/null +++ b/fsm/state_machines/dkg_deals_fsm/init.go @@ -0,0 +1 @@ +package dkg_deals_fsm diff --git a/fsm/state_machines/dkg_proposal_fsm/init.go b/fsm/state_machines/dkg_proposal_fsm/init.go new file mode 100644 index 0000000..4f18c1e --- /dev/null +++ b/fsm/state_machines/dkg_proposal_fsm/init.go @@ -0,0 +1 @@ +package dkg_proposal_fsm diff --git a/fsm/state_machines/internal/provider.go b/fsm/state_machines/internal/provider.go new file mode 100644 index 0000000..0d74d02 --- /dev/null +++ b/fsm/state_machines/internal/provider.go @@ -0,0 +1,13 @@ +package internal + +type MachineStatePayload struct { + ProposalPayload ProposalConfirmationPrivateQuorum + SigningPayload map[string]interface{} +} + +// Using combine response for modify data with chain +// User value or pointer? How about memory state? +type MachineCombinedResponse struct { + Response interface{} + Payload *MachineStatePayload +} diff --git a/fsm/state_machines/internal/types.go b/fsm/state_machines/internal/types.go new file mode 100644 index 0000000..59dc581 --- /dev/null +++ b/fsm/state_machines/internal/types.go @@ -0,0 +1,17 @@ +package internal + +import "time" + +type ProposalParticipantPrivate struct { + // Public title for address, such as name, nickname, organization + Title string + PublicKey []byte + // For validation user confirmation: sign(InvitationSecret, PublicKey) => user + InvitationSecret string + ConfirmedAt *time.Time +} + +// Unique alias for map iteration - Public Key Fingerprint +// Excludes array merge and rotate operations + +type ProposalConfirmationPrivateQuorum map[string]ProposalParticipantPrivate diff --git a/fsm/state_machines/provider.go b/fsm/state_machines/provider.go new file mode 100644 index 0000000..c029694 --- /dev/null +++ b/fsm/state_machines/provider.go @@ -0,0 +1,98 @@ +package state_machines + +import ( + "encoding/json" + "errors" + "github.com/p2p-org/dc4bc/fsm/fsm" + "github.com/p2p-org/dc4bc/fsm/fsm_pool" + "github.com/p2p-org/dc4bc/fsm/state_machines/internal" + "github.com/p2p-org/dc4bc/fsm/state_machines/signature_construct_fsm" + "github.com/p2p-org/dc4bc/fsm/state_machines/signature_proposal_fsm" +) + +// Is machine state scope dump will be locked? +type FSMDump struct { + Id string + State string + Payload internal.MachineStatePayload +} + +type FSMInstance struct { + machine fsm_pool.IStateMachine + dump *FSMDump +} + +var ( + fsmPoolProvider *fsm_pool.FSMPoolProvider +) + +func init() { + fsmPoolProvider = fsm_pool.Init( + signature_proposal_fsm.New(), + signature_construct_fsm.New(), + ) +} + +func New(data []byte) (*FSMInstance, error) { + var err error + i := &FSMInstance{} + if len(data) == 0 { + i.InitDump() + i.machine, err = fsmPoolProvider.EntryPointMachine() + return i, err // Create machine + } + + err = i.dump.Unmarshal(data) + + if err != nil { + return nil, errors.New("cannot read machine dump") + } + + i.machine, err = fsmPoolProvider.MachineByState(i.dump.State) + return i, err +} + +func (i *FSMInstance) Do(event string, args ...interface{}) (*fsm.FSMResponse, []byte, error) { + // Provide payload as first argument ever + result, err := i.machine.Do(event, append([]interface{}{i.dump.Payload}, args...)...) + + // On route errors result will be nil + if result != nil { + + // Proxying combined response, separate payload and data + if result.Data != nil { + if r, ok := result.Data.(internal.MachineCombinedResponse); ok { + i.dump.Payload = *r.Payload + result.Data = r.Response + } else { + return nil, []byte{}, errors.New("cannot cast callback response") + } + } + + i.dump.State = result.State + } + dump, dumpErr := i.dump.Marshal() + if dumpErr != nil { + return result, []byte{}, err + } + + return result, dump, err +} + +func (i *FSMInstance) InitDump() { + if i.dump == nil { + i.dump = &FSMDump{ + State: fsm.StateGlobalIdle, + } + } +} + +// TODO: Add encryption +func (d *FSMDump) Marshal() ([]byte, error) { + return json.Marshal(d) +} + +// TODO: Add decryption +func (d *FSMDump) Unmarshal(data []byte) error { + return json.Unmarshal(data, d) +} diff --git a/fsm/state_machines/signature_construct_fsm/init.go b/fsm/state_machines/signature_construct_fsm/init.go new file mode 100644 index 0000000..5f80f06 --- /dev/null +++ b/fsm/state_machines/signature_construct_fsm/init.go @@ -0,0 +1,39 @@ +package signature_construct_fsm + +import ( + "github.com/p2p-org/dc4bc/fsm/fsm" + "github.com/p2p-org/dc4bc/fsm/fsm_pool" +) + +const ( + fsmName = "signature_construct_fsm" + + stateConstructorEntryPoint = "process_sig" + awaitConstructor = "validate_process_sig" // waiting participants + + eventInitSignatureConstructor = "process_sig_init" + eventInitSignatureFinishTmp = "process_sig_fin" +) + +type SignatureConstructFSM struct { + *fsm.FSM +} + +func New() fsm_pool.IStateMachine { + machine := &SignatureConstructFSM{} + + machine.FSM = fsm.MustNewFSM( + fsmName, + stateConstructorEntryPoint, + []fsm.Event{ + // {Name: "", SrcState: []string{""}, DstState: ""}, + + // Init + {Name: eventInitSignatureConstructor, SrcState: []string{stateConstructorEntryPoint}, DstState: awaitConstructor}, + {Name: eventInitSignatureFinishTmp, SrcState: []string{awaitConstructor}, DstState: "dkg_proposal_fsm"}, + }, + fsm.Callbacks{}, + ) + + return machine +} diff --git a/fsm/state_machines/signature_proposal_fsm/actions.go b/fsm/state_machines/signature_proposal_fsm/actions.go new file mode 100644 index 0000000..aba5f4b --- /dev/null +++ b/fsm/state_machines/signature_proposal_fsm/actions.go @@ -0,0 +1,98 @@ +package signature_proposal_fsm + +import ( + "errors" + "github.com/p2p-org/dc4bc/fsm/state_machines/internal" + "github.com/p2p-org/dc4bc/fsm/types/requests" + "github.com/p2p-org/dc4bc/fsm/types/responses" + "log" +) + +// init -> awaitingConfirmations +// args: payload, signing id, participants list +func (s *SignatureProposalFSM) actionInitProposal(event string, args ...interface{}) (response interface{}, err error) { + var payload internal.MachineStatePayload + // Init proposal + log.Println("I'm actionInitProposal") + + if len(args) < 3 { + err = errors.New("payload and signing id required and participants list required") + return + } + + if len(args) > 3 { + err = errors.New("too many arguments") + return + } + + payload, ok := args[0].(internal.MachineStatePayload) + + if !ok { + err = errors.New("cannot cast payload") + return + } + + signingId, ok := args[1].(string) + if !ok { + err = errors.New("cannot cast signing id, awaiting string value") + return + } + + if len(signingId) < signingIdLen { + err = errors.New("signing id to short ") + return + } + + request, ok := args[2].(requests.ProposalParticipantsListRequest) + + if !ok { + err = errors.New("cannot cast participants list") + return + } + + if err = request.Validate(); err != nil { + return + } + + payload.ProposalPayload = make(internal.ProposalConfirmationPrivateQuorum) + + for _, participant := range request { + participantId := createFingerprint(&participant.PublicKey) + secret, err := generateRandomString(32) + if err != nil { + return nil, errors.New("cannot generateRandomString") + } + payload.ProposalPayload[participantId] = internal.ProposalParticipantPrivate{ + Title: participant.Title, + PublicKey: participant.PublicKey, + InvitationSecret: secret, + ConfirmedAt: nil, + } + } + + /*s.state = &fsm_pool.FSMachine{ + Id: signingId, + State: stateAwaitProposalConfirmation, + } + s.state.Payload.ProposalPayload = &privateParticipantsList*/ + return internal.MachineCombinedResponse{ + Response: responses.ProposalParticipantInvitationsResponse{}, + Payload: &payload, + }, nil +} + +// +func (s *SignatureProposalFSM) actionConfirmProposalByParticipant(event string, args ...interface{}) (response interface{}, err error) { + log.Println("I'm actionConfirmProposalByParticipant") + return +} + +func (s *SignatureProposalFSM) actionDeclineProposalByParticipant(event string, args ...interface{}) (response interface{}, err error) { + log.Println("I'm actionDeclineProposalByParticipant") + return +} + +func (s *SignatureProposalFSM) actionValidateProposal(event string, args ...interface{}) (response interface{}, err error) { + log.Println("I'm actionValidateProposal") + return +} diff --git a/fsm/state_machines/signature_proposal_fsm/helpers.go b/fsm/state_machines/signature_proposal_fsm/helpers.go new file mode 100644 index 0000000..2da1020 --- /dev/null +++ b/fsm/state_machines/signature_proposal_fsm/helpers.go @@ -0,0 +1,58 @@ +package signature_proposal_fsm + +import ( + "crypto/sha256" + "encoding/base64" + "github.com/p2p-org/dc4bc/fsm/state_machines/internal" + "github.com/p2p-org/dc4bc/fsm/types/responses" + "math/rand" +) + +// Request and response mutators + +func ProposalParticipantsQuorumToResponse(list *internal.ProposalConfirmationPrivateQuorum) responses.ProposalParticipantInvitationsResponse { + var response responses.ProposalParticipantInvitationsResponse + for quorumId, parcipant := range *list { + response = append(response, &responses.ProposalParticipantInvitationEntryResponse{ + Title: parcipant.Title, + PubKeyFingerprint: quorumId, + // TODO: Add encryption + EncryptedInvitation: parcipant.InvitationSecret, + }) + } + return response +} + +// Common functions + +func createFingerprint(data *[]byte) string { + hash := sha256.Sum256(*data) + return base64.StdEncoding.EncodeToString(hash[:]) +} + +// https://blog.questionable.services/article/generating-secure-random-numbers-crypto-rand/ + +// GenerateRandomBytes returns securely generated random bytes. +// It will return an error if the system's secure random +// number generator fails to function correctly, in which +// case the caller should not continue. +func generateRandomBytes(n int) ([]byte, error) { + b := make([]byte, n) + _, err := rand.Read(b) + // Note that err == nil only if we read len(b) bytes. + if err != nil { + return nil, err + } + + return b, nil +} + +// GenerateRandomString returns a URL-safe, base64 encoded +// securely generated random string. +// It will return an error if the system's secure random +// number generator fails to function correctly, in which +// case the caller should not continue. +func generateRandomString(s int) (string, error) { + b, err := generateRandomBytes(s) + return base64.URLEncoding.EncodeToString(b), err +} diff --git a/fsm/state_machines/signature_proposal_fsm/init.go b/fsm/state_machines/signature_proposal_fsm/init.go new file mode 100644 index 0000000..4ab0d18 --- /dev/null +++ b/fsm/state_machines/signature_proposal_fsm/init.go @@ -0,0 +1,68 @@ +package signature_proposal_fsm + +import ( + "github.com/p2p-org/dc4bc/fsm/fsm" + "github.com/p2p-org/dc4bc/fsm/fsm_pool" +) + +const ( + fsmName = "signature_proposal_fsm" + signingIdLen = 32 + + stateAwaitProposalConfirmation = "validate_proposal" // waiting participants + + stateValidationCanceledByParticipant = "validation_canceled_by_participant" + stateValidationCanceledByTimeout = "validation_canceled_by_timeout" + + stateProposed = "proposed" + + eventInitProposal = "proposal_init" + eventConfirmProposal = "proposal_confirm_by_participant" + eventDeclineProposal = "proposal_decline_by_participant" + eventValidateProposal = "proposal_validate" + eventSetProposalValidated = "proposal_set_validated" + + eventSetValidationCanceledByTimeout = "proposal_canceled_timeout" + eventSwitchProposedToSigning = "switch_state_to_signing" +) + +type SignatureProposalFSM struct { + *fsm.FSM +} + +func New() fsm_pool.IStateMachine { + machine := &SignatureProposalFSM{} + + machine.FSM = fsm.MustNewFSM( + fsmName, + fsm.StateGlobalIdle, + []fsm.Event{ + // {Name: "", SrcState: []string{""}, DstState: ""}, + + // Init + {Name: eventInitProposal, SrcState: []string{fsm.StateGlobalIdle}, DstState: stateAwaitProposalConfirmation}, + + // Validate by participants + {Name: eventConfirmProposal, SrcState: []string{stateAwaitProposalConfirmation}, DstState: stateAwaitProposalConfirmation}, + // Is decline event should auto change state to default, or it process will initiated by client (external emit)? + // Now set for external emitting. + {Name: eventDeclineProposal, SrcState: []string{stateAwaitProposalConfirmation}, DstState: stateValidationCanceledByParticipant}, + + {Name: eventValidateProposal, SrcState: []string{stateAwaitProposalConfirmation}, DstState: stateAwaitProposalConfirmation}, + + // eventProposalValidate internal or from client? + // yay + // Exit point + {Name: eventSetProposalValidated, SrcState: []string{stateAwaitProposalConfirmation}, DstState: "process_sig", IsInternal: true}, + // nan + {Name: eventSetValidationCanceledByTimeout, SrcState: []string{stateAwaitProposalConfirmation}, DstState: stateValidationCanceledByTimeout, IsInternal: true}, + }, + fsm.Callbacks{ + eventInitProposal: machine.actionInitProposal, + eventConfirmProposal: machine.actionConfirmProposalByParticipant, + eventDeclineProposal: machine.actionDeclineProposalByParticipant, + eventValidateProposal: machine.actionValidateProposal, + }, + ) + return machine +} diff --git a/fsm/types/requests/signature_proposal.go b/fsm/types/requests/signature_proposal.go new file mode 100644 index 0000000..06f6b1c --- /dev/null +++ b/fsm/types/requests/signature_proposal.go @@ -0,0 +1,48 @@ +package requests + +import ( + "errors" + "github.com/p2p-org/dc4bc/fsm/config" +) + +// Requests + +type ProposalParticipantsListRequest []ProposalParticipantsEntryRequest + +type ProposalParticipantsEntryRequest struct { + // Public title for address, such as name, nickname, organization + Title string + PublicKey []byte +} + +func (r *ProposalParticipantsListRequest) Validate() error { + if len(*r) < config.ParticipantsMinCount { + return errors.New("too few participants") + } + + for _, participant := range *r { + if len(participant.Title) < 3 { + return errors.New("title too short") + } + + if len(participant.Title) > 150 { + return errors.New("title too long") + } + + if len(participant.PublicKey) < 10 { + return errors.New("pub key too short") + } + } + + return nil +} + +type ProposalParticipantConfirmationRequest struct { + // Public title for address, such as name, nickname, organization + ParticipantId string + EncryptedInvitation string +} + +func (r *ProposalParticipantConfirmationRequest) Validate() error { + return nil +} diff --git a/fsm/types/responses/signature_proposal.go b/fsm/types/responses/signature_proposal.go new file mode 100644 index 0000000..ff9acdc --- /dev/null +++ b/fsm/types/responses/signature_proposal.go @@ -0,0 +1,14 @@ +package responses + +// Responses + +type ProposalParticipantInvitationsResponse []*ProposalParticipantInvitationEntryResponse + +type ProposalParticipantInvitationEntryResponse struct { + // Public title for address, such as name, nickname, organization + Title string + // Key for link invitations to participants + PubKeyFingerprint string + // Encrypted with public key secret + EncryptedInvitation string +} diff --git a/go.mod b/go.mod index ec3a434..f4e2139 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,21 @@ module p2p.org/dc4bc go 1.13 require ( + github.com/google/uuid v1.1.1 + github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b + github.com/looplab/fsm v0.1.0 github.com/makiuchi-d/gozxing v0.0.0-20190830103442-eaff64b1ceb7 github.com/mattn/go-gtk v0.0.0-20191030024613-af2e013261f5 - github.com/mattn/go-pointer v0.0.0-20190911064623-a0a44394634f // indirect + github.com/p2p-org/dc4bc v0.0.0-00010101000000-000000000000 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e - github.com/stretchr/testify v1.6.1 // indirect - github.com/syndtr/goleveldb v1.0.0 + github.com/stretchr/testify v1.6.1 go.dedis.ch/kyber/v3 v3.0.9 gocv.io/x/gocv v0.23.0 golang.org/x/image v0.0.0-20200618115811-c13761719519 - golang.org/x/text v0.3.3 // indirect - golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect ) replace golang.org/x/crypto => github.com/tendermint/crypto v0.0.0-20180820045704-3764759f34a5 replace go.dedis.ch/kyber/v3 => github.com/corestario/kyber/v3 v3.0.0-20200218082721-8ed10c357c05 + +replace github.com/p2p-org/dc4bc => /home/tellme/PROJECTS/go/src/github.com/p2p-org/dc4bc diff --git a/storage/fileStorage.go b/storage/fileStorage.go new file mode 100644 index 0000000..293d7cb --- /dev/null +++ b/storage/fileStorage.go @@ -0,0 +1,115 @@ +package storage + +import ( + "bufio" + "encoding/json" + "fmt" + "github.com/google/uuid" + "github.com/juju/fslock" + "io" + "os" +) + +var _ Storage = (*FileStorage)(nil) + +type FileStorage struct { + lockFile *fslock.Lock + + dataFile *os.File +} + +const ( + defaultLockFile = "/tmp/dc4bc_storage_lock" +) + +func countLines(r io.Reader) uint64 { + var count uint64 + fileScanner := bufio.NewScanner(r) + + for fileScanner.Scan() { + count++ + } + + return count +} + +// InitFileStorage inits append-only file storage +// It takes two arguments: filename - path to a data file, lockFilename (optional) - path to a lock file +func InitFileStorage(filename string, lockFilename ...string) (Storage, error) { + var ( + fs FileStorage + err error + ) + if len(lockFilename) > 0 { + fs.lockFile = fslock.New(lockFilename[0]) + } else { + fs.lockFile = fslock.New(defaultLockFile) + } + + if fs.dataFile, err = os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil { + return nil, fmt.Errorf("failed to open a data file: %v", err) + } + return &fs, nil +} + +// Send sends a message to an append-only data file, returns a message with offset and id +func (fs *FileStorage) Send(m Message) (Message, error) { + var ( + data []byte + err error + ) + if err = fs.lockFile.Lock(); err != nil { + return m, fmt.Errorf("failed to lock a file: %v", err) + } + defer fs.lockFile.Unlock() + + m.ID = uuid.New().String() + + if _, err = fs.dataFile.Seek(0, 0); err != nil { // otherwise countLines will return zero + return m, fmt.Errorf("failed to seek a offset to the start of a data file: %v", err) + } + m.Offset = countLines(fs.dataFile) + + if data, err = json.Marshal(m); err != nil { + return m, fmt.Errorf("failed to marshal a message %v: %v", m, err) + } + + if _, err = fmt.Fprintln(fs.dataFile, string(data)); err != nil { + return m, fmt.Errorf("failed to write a message to a data file: %v", err) + } + return m, err +} + +// GetMessages returns a slice of messages from append-only data file with given offset +func (fs *FileStorage) GetMessages(offset int) ([]Message, error) { + var ( + msgs []Message + err error + row []byte + data Message + ) + if _, err = fs.dataFile.Seek(0, 0); err != nil { + return nil, fmt.Errorf("failed to seek a offset to the start of a data file: %v", err) + } + scanner := bufio.NewScanner(fs.dataFile) + for scanner.Scan() { + if offset > 0 { + offset-- + continue + } + + row = scanner.Bytes() + if err = json.Unmarshal(row, &data); err != nil { + return nil, fmt.Errorf("failed to unmarshal a message %s: %v", string(row), err) + } + msgs = append(msgs, data) + } + if scanner.Err() != nil { + return nil, fmt.Errorf("failed to read a data file: %v", err) + } + return msgs, nil +} + +func (fs *FileStorage) Close() error { + return fs.dataFile.Close() +} diff --git a/storage/fileStorage_test.go b/storage/fileStorage_test.go new file mode 100644 index 0000000..07dbf8d --- /dev/null +++ b/storage/fileStorage_test.go @@ -0,0 +1,47 @@ +package storage + +import ( + "math/rand" + "reflect" + "testing" + "time" +) + +func randomBytes(n int) []byte { + rand.Seed(time.Now().UnixNano()) + b := make([]byte, n) + if _, err := rand.Read(b); err != nil { + return nil + } + return b +} + +func TestFileStorage_GetMessages(t *testing.T) { + N := 10 + offset := 5 + fs, err := InitFileStorage("test") + if err != nil { + t.Error(err) + } + defer fs.Close() + msgs := make([]Message, 0, N) + for i := 0; i < N; i++ { + msg := Message{ + Data: randomBytes(10), + Signature: randomBytes(10), + } + msg, err = fs.Send(msg) + if err != nil { + t.Error(err) + } + msgs = append(msgs, msg) + } + offsetMsgs, err := fs.GetMessages(offset) + if err != nil { + t.Error(err) + } + expectedOffsetMsgs := msgs[offset:] + if !reflect.DeepEqual(offsetMsgs, expectedOffsetMsgs) { + t.Errorf("expected messages: %v, actual messages: %v", expectedOffsetMsgs, offsetMsgs) + } +} diff --git a/storage/types.go b/storage/types.go index f6c9949..65dc2b7 100644 --- a/storage/types.go +++ b/storage/types.go @@ -1,13 +1,14 @@ package storage type Message struct { - Offset uint64 Data []byte `json:"data"` Signature []byte `json:"signature"` + ID string `json:"id"` + Offset uint64 `json:"offset"` } type Storage interface { - Post(message Message) error - GetMessages(offset uint64) ([]Message, error) + Send(message Message) (Message, error) + GetMessages(offset int) ([]Message, error) Close() error }