cmd/swarm, swarm: add stream peer servers limit

This commit is contained in:
Janos Guljas 2018-09-24 17:40:22 +02:00
parent 1f45ba9bb1
commit 9e99a0c2b9
10 changed files with 258 additions and 60 deletions

View File

@ -59,27 +59,28 @@ var (
//constants for environment variables
const (
SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR"
SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT"
SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR"
SWARM_ENV_PORT = "SWARM_PORT"
SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID"
SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE"
SWARM_ENV_SWAP_API = "SWARM_SWAP_API"
SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE"
SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY"
SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE"
SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK"
SWARM_ENV_ENS_API = "SWARM_ENS_API"
SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR"
SWARM_ENV_CORS = "SWARM_CORS"
SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES"
SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE"
SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH"
SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY"
SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY"
SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD"
GETH_ENV_DATADIR = "GETH_DATADIR"
SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR"
SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT"
SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR"
SWARM_ENV_PORT = "SWARM_PORT"
SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID"
SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE"
SWARM_ENV_SWAP_API = "SWARM_SWAP_API"
SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE"
SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY"
SWARM_ENV_MAX_STREAM_PEER_SERVERS = "SWARM_ENV_MAX_STREAM_PEER_SERVERS"
SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE"
SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK"
SWARM_ENV_ENS_API = "SWARM_ENS_API"
SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR"
SWARM_ENV_CORS = "SWARM_CORS"
SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES"
SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE"
SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH"
SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY"
SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY"
SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD"
GETH_ENV_DATADIR = "GETH_DATADIR"
)
// These settings ensure that TOML keys use the same names as Go struct fields.
@ -207,6 +208,9 @@ func cmdLineOverride(currentConfig *bzzapi.Config, ctx *cli.Context) *bzzapi.Con
currentConfig.SyncUpdateDelay = d
}
// any value including 0 is acceptable
currentConfig.MaxStreamPeerServers = ctx.GlobalInt(SwarmMaxStreamPeerServersFlag.Name)
if ctx.GlobalIsSet(SwarmLightNodeEnabled.Name) {
currentConfig.LightNodeEnabled = true
}
@ -304,6 +308,12 @@ func envVarsOverride(currentConfig *bzzapi.Config) (config *bzzapi.Config) {
}
}
if max := os.Getenv(SWARM_ENV_MAX_STREAM_PEER_SERVERS); max != "" {
if m, err := strconv.Atoi(max); err == nil {
currentConfig.MaxStreamPeerServers = m
}
}
if lne := os.Getenv(SWARM_ENV_LIGHT_NODE_ENABLE); lne != "" {
if lightnode, err := strconv.ParseBool(lne); err != nil {
currentConfig.LightNodeEnabled = lightnode

View File

@ -116,6 +116,12 @@ var (
Usage: "Duration for sync subscriptions update after no new peers are added (default 15s)",
EnvVar: SWARM_ENV_SYNC_UPDATE_DELAY,
}
SwarmMaxStreamPeerServersFlag = cli.IntFlag{
Name: "max-stream-peer-servers",
Usage: "Limit of Stream peer servers, 0 denotes unlimited",
EnvVar: SWARM_ENV_MAX_STREAM_PEER_SERVERS,
Value: 10000, // A very large default value is possible as stream servers have very small memory footprint
}
SwarmLightNodeEnabled = cli.BoolFlag{
Name: "lightnode",
Usage: "Enable Swarm LightNode (default false)",
@ -542,6 +548,7 @@ pv(1) tool to get a progress bar:
SwarmSwapAPIFlag,
SwarmSyncDisabledFlag,
SwarmSyncUpdateDelay,
SwarmMaxStreamPeerServersFlag,
SwarmLightNodeEnabled,
SwarmDeliverySkipCheckFlag,
SwarmListenAddrFlag,

View File

@ -50,26 +50,27 @@ type Config struct {
Swap *swap.LocalProfile
Pss *pss.PssParams
//*network.SyncParams
Contract common.Address
EnsRoot common.Address
EnsAPIs []string
Path string
ListenAddr string
Port string
PublicKey string
BzzKey string
NodeID string
NetworkID uint64
SwapEnabled bool
SyncEnabled bool
SyncingSkipCheck bool
DeliverySkipCheck bool
LightNodeEnabled bool
SyncUpdateDelay time.Duration
SwapAPI string
Cors string
BzzAccount string
privateKey *ecdsa.PrivateKey
Contract common.Address
EnsRoot common.Address
EnsAPIs []string
Path string
ListenAddr string
Port string
PublicKey string
BzzKey string
NodeID string
NetworkID uint64
SwapEnabled bool
SyncEnabled bool
SyncingSkipCheck bool
DeliverySkipCheck bool
MaxStreamPeerServers int
LightNodeEnabled bool
SyncUpdateDelay time.Duration
SwapAPI string
Cors string
BzzAccount string
privateKey *ecdsa.PrivateKey
}
//create a default config with all parameters to set to defaults

View File

@ -84,7 +84,7 @@ func createGlobalStore() (string, *mockdb.GlobalStore, error) {
return globalStoreDir, globalStore, nil
}
func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
to := network.NewKademlia(addr.OAddr, network.NewKadParams())
@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
delivery := NewDelivery(to, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), nil)
streamer := NewRegistry(addr, delivery, netStore, state.NewInmemoryStore(), registryOptions)
teardown := func() {
streamer.Close()
removeDataDir()

View File

@ -39,7 +39,7 @@ import (
)
func TestStreamerRetrieveRequest(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -75,7 +75,7 @@ func TestStreamerRetrieveRequest(t *testing.T) {
}
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -127,7 +127,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// upstream request server receives a retrieve Request and responds with
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t)
tester, streamer, localStore, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -221,7 +221,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t)
tester, streamer, localStore, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)

View File

@ -84,11 +84,13 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
defer func() {
if err != nil {
if e := p.Send(context.TODO(), SubscribeErrorMsg{
// The error will be sent as a subscribe error message
// and will not be returned as it will prevent any new message
// exchange between peers over p2p. Instead, error will be returned
// only if there is one from sending subscribe error message.
err = p.Send(context.TODO(), SubscribeErrorMsg{
Error: err.Error(),
}); e != nil {
log.Error("send stream subscribe error message", "err", err)
}
})
}
}()

View File

@ -18,6 +18,7 @@ package stream
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -46,6 +47,10 @@ func (e *notFoundError) Error() string {
return fmt.Sprintf("%s not found for stream %q", e.t, e.s)
}
// ErrMaxPeerServers will be returned if peer server limit is reached.
// It will be sent in the SubscribeErrorMsg.
var ErrMaxPeerServers = errors.New("max peer servers")
// Peer is the Peer extension for the streaming protocol
type Peer struct {
*protocols.Peer
@ -204,6 +209,11 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
if p.servers[s] != nil {
return nil, fmt.Errorf("server %s already registered", s)
}
if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers {
return nil, ErrMaxPeerServers
}
os := &server{
Server: o,
stream: s,
@ -346,6 +356,7 @@ func (p *Peer) removeClient(s Stream) error {
return newNotFoundError("client", s)
}
client.close()
delete(p.clients, s)
return nil
}

View File

@ -60,6 +60,7 @@ type Registry struct {
delivery *Delivery
intervalsStore state.Store
doRetrieve bool
maxPeerServers int
}
// RegistryOptions holds optional values for NewRegistry constructor.
@ -68,6 +69,7 @@ type RegistryOptions struct {
DoSync bool
DoRetrieve bool
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
// NewRegistry is Streamer constructor
@ -87,6 +89,7 @@ func NewRegistry(addr *network.BzzAddr, delivery *Delivery, syncChunkStore stora
delivery: delivery,
intervalsStore: intervalsStore,
doRetrieve: options.DoRetrieve,
maxPeerServers: options.MaxPeerServers,
}
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer

View File

@ -19,6 +19,7 @@ package stream
import (
"bytes"
"context"
"strconv"
"testing"
"time"
@ -27,7 +28,7 @@ import (
)
func TestStreamerSubscribe(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -41,7 +42,7 @@ func TestStreamerSubscribe(t *testing.T) {
}
func TestStreamerRequestSubscription(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -125,7 +126,7 @@ func (self *testServer) Close() {
}
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -218,7 +219,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -285,7 +286,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -351,7 +352,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
}
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -395,7 +396,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -460,7 +461,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -559,7 +560,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
}
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -685,3 +686,165 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
t.Fatal(err)
}
}
// TestMaxPeerServersWithUnsubscribe creates a registry with a limited
// number of stream servers, and performs a test with subscriptions and
// unsubscriptions, checking if unsubscriptions will remove streams,
// leaving place for new streams.
func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
MaxPeerServers: maxPeerServers,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t), nil
})
peerID := tester.IDs[0]
for i := 0; i < maxPeerServers+10; i++ {
stream := NewStream("foo", strconv.Itoa(i), true)
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Triggers: []p2ptest.Trigger{
{
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
Priority: Top,
},
Peer: peerID,
},
},
Expects: []p2ptest.Expect{
{
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 1,
},
Peer: peerID,
},
},
})
if err != nil {
t.Fatal(err)
}
err = tester.TestExchanges(p2ptest.Exchange{
Label: "unsubscribe message",
Triggers: []p2ptest.Trigger{
{
Code: 0,
Msg: &UnsubscribeMsg{
Stream: stream,
},
Peer: peerID,
},
},
})
if err != nil {
t.Fatal(err)
}
}
}
// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited
// number of stream servers, and performs subscriptions to detect sunscriptions
// error message exchange.
func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
var maxPeerServers = 6
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
MaxPeerServers: maxPeerServers,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t), nil
})
peerID := tester.IDs[0]
for i := 0; i < maxPeerServers+10; i++ {
stream := NewStream("foo", strconv.Itoa(i), true)
if i >= maxPeerServers {
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Triggers: []p2ptest.Trigger{
{
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
Priority: Top,
},
Peer: peerID,
},
},
Expects: []p2ptest.Expect{
{
Code: 7,
Msg: &SubscribeErrorMsg{
Error: ErrMaxPeerServers.Error(),
},
Peer: peerID,
},
},
})
if err != nil {
t.Fatal(err)
}
continue
}
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Triggers: []p2ptest.Trigger{
{
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
Priority: Top,
},
Peer: peerID,
},
},
Expects: []p2ptest.Expect{
{
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 1,
},
Peer: peerID,
},
},
})
if err != nil {
t.Fatal(err)
}
}
}

View File

@ -186,6 +186,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
DoSync: config.SyncEnabled,
DoRetrieve: true,
SyncUpdateDelay: config.SyncUpdateDelay,
MaxPeerServers: config.MaxStreamPeerServers,
})
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage