swarm/storage/mru: Renamed all identifiers to Feeds

This commit is contained in:
Javier Peletier 2018-09-29 01:00:28 +02:00
parent bd1f7ebda2
commit f1e86ad9cf
24 changed files with 236 additions and 236 deletions

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>. // along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
// Command resource allows the user to create and update signed mutable resource updates // Command resource allows the user to create and update signed Swarm Feeds
package main package main
import ( import (
@ -66,7 +66,7 @@ func resourceCreate(ctx *cli.Context) {
) )
newResourceRequest := mru.NewFirstRequest(getTopic(ctx)) newResourceRequest := mru.NewFirstRequest(getTopic(ctx))
newResourceRequest.View.User = resourceGetUser(ctx) newResourceRequest.Feed.User = resourceGetUser(ctx)
manifestAddress, err := client.CreateResource(newResourceRequest) manifestAddress, err := client.CreateResource(newResourceRequest)
if err != nil { if err != nil {

View File

@ -101,7 +101,7 @@ func TestCLIResourceUpdate(t *testing.T) {
} }
// View configures whose updates we will be looking up. // View configures whose updates we will be looking up.
view := mru.View{ view := mru.Feed{
Topic: topic, Topic: topic,
User: address, User: address,
} }
@ -146,8 +146,8 @@ func TestCLIResourceUpdate(t *testing.T) {
} }
// make sure the retrieved view is the same // make sure the retrieved view is the same
if request.View != view { if request.Feed != view {
t.Fatalf("Expected view to be: %s, got %s", view, request.View) t.Fatalf("Expected view to be: %s, got %s", view, request.Feed)
} }
// test publishing a manifest // test publishing a manifest

View File

@ -956,14 +956,14 @@ func (a *API) BuildDirectoryTree(ctx context.Context, mhash string, nameresolver
return addr, manifestEntryMap, nil return addr, manifestEntryMap, nil
} }
// ResourceLookup finds mutable resource updates at specific periods and versions // ResourceLookup finds Swarm Feeds at specific periods and versions
func (a *API) ResourceLookup(ctx context.Context, query *mru.Query) ([]byte, error) { func (a *API) ResourceLookup(ctx context.Context, query *mru.Query) ([]byte, error) {
_, err := a.resource.Lookup(ctx, query) _, err := a.resource.Lookup(ctx, query)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var data []byte var data []byte
_, data, err = a.resource.GetContent(&query.View) _, data, err = a.resource.GetContent(&query.Feed)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -971,7 +971,7 @@ func (a *API) ResourceLookup(ctx context.Context, query *mru.Query) ([]byte, err
} }
// ResourceNewRequest creates a Request object to update a specific mutable resource // ResourceNewRequest creates a Request object to update a specific mutable resource
func (a *API) ResourceNewRequest(ctx context.Context, view *mru.View) (*mru.Request, error) { func (a *API) ResourceNewRequest(ctx context.Context, view *mru.Feed) (*mru.Request, error) {
return a.resource.NewRequest(ctx, view) return a.resource.NewRequest(ctx, view)
} }
@ -993,7 +993,7 @@ var ErrCannotLoadResourceManifest = errors.New("Cannot load resource manifest")
var ErrNotAResourceManifest = errors.New("Not a resource manifest") var ErrNotAResourceManifest = errors.New("Not a resource manifest")
// ResolveResourceManifest retrieves the Mutable Resource manifest for the given address, and returns the Resource's view ID. // ResolveResourceManifest retrieves the Mutable Resource manifest for the given address, and returns the Resource's view ID.
func (a *API) ResolveResourceManifest(ctx context.Context, addr storage.Address) (*mru.View, error) { func (a *API) ResolveResourceManifest(ctx context.Context, addr storage.Address) (*mru.Feed, error) {
trie, err := loadManifest(ctx, a.fileStore, addr, nil, NOOPDecrypt) trie, err := loadManifest(ctx, a.fileStore, addr, nil, NOOPDecrypt)
if err != nil { if err != nil {
return nil, ErrCannotLoadResourceManifest return nil, ErrCannotLoadResourceManifest
@ -1016,8 +1016,8 @@ var ErrCannotResolveResourceView = errors.New("Cannot resolve resource view")
// ResolveResourceView attempts to extract View information out of the manifest, if provided // ResolveResourceView attempts to extract View information out of the manifest, if provided
// If not, it attempts to extract the View out of a set of key-value pairs // If not, it attempts to extract the View out of a set of key-value pairs
func (a *API) ResolveResourceView(ctx context.Context, uri *URI, values mru.Values) (*mru.View, error) { func (a *API) ResolveResourceView(ctx context.Context, uri *URI, values mru.Values) (*mru.Feed, error) {
var view *mru.View var view *mru.Feed
var err error var err error
if uri.Addr != "" { if uri.Addr != "" {
// resolve the content key. // resolve the content key.
@ -1036,7 +1036,7 @@ func (a *API) ResolveResourceView(ctx context.Context, uri *URI, values mru.Valu
} }
log.Debug("handle.get.resource: resolved", "manifestkey", manifestAddr, "view", view.Hex()) log.Debug("handle.get.resource: resolved", "manifestkey", manifestAddr, "view", view.Hex())
} else { } else {
var v mru.View var v mru.Feed
if err := v.FromValues(values); err != nil { if err := v.FromValues(values); err != nil {
return nil, ErrCannotResolveResourceView return nil, ErrCannotResolveResourceView

View File

@ -508,7 +508,7 @@ func TestClientCreateUpdateResource(t *testing.T) {
// now try retrieving resource without a manifest // now try retrieving resource without a manifest
view := &mru.View{ view := &mru.Feed{
Topic: topic, Topic: topic,
User: signer.Address(), User: signer.Address(),
} }

View File

@ -517,7 +517,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *http.Request) {
} }
var updateRequest mru.Request var updateRequest mru.Request
updateRequest.View = *view updateRequest.Feed = *view
query := r.URL.Query() query := r.URL.Query()
if err := updateRequest.FromValues(query, body); err != nil { // decodes request from query parameters if err := updateRequest.FromValues(query, body); err != nil { // decodes request from query parameters
@ -544,7 +544,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *http.Request) {
// we create a manifest so we can retrieve the resource with bzz:// later // we create a manifest so we can retrieve the resource with bzz:// later
// this manifest has a special "resource type" manifest, and saves the // this manifest has a special "resource type" manifest, and saves the
// resource view ID used to retrieve the resource later // resource view ID used to retrieve the resource later
m, err := s.api.NewResourceManifest(r.Context(), &updateRequest.View) m, err := s.api.NewResourceManifest(r.Context(), &updateRequest.Feed)
if err != nil { if err != nil {
RespondError(w, r, fmt.Sprintf("failed to create resource manifest: %v", err), http.StatusInternalServerError) RespondError(w, r, fmt.Sprintf("failed to create resource manifest: %v", err), http.StatusInternalServerError)
return return
@ -563,7 +563,7 @@ func (s *Server) HandlePostResource(w http.ResponseWriter, r *http.Request) {
} }
} }
// Retrieve mutable resource updates: // Retrieve Swarm Feeds:
// bzz-resource://<id> - get latest update // bzz-resource://<id> - get latest update
// bzz-resource://<id>/?period=n - get latest update on period n // bzz-resource://<id>/?period=n - get latest update on period n
// bzz-resource://<id>/?period=n&version=m - get update version m of period n // bzz-resource://<id>/?period=n&version=m - get update version m of period n
@ -606,7 +606,7 @@ func (s *Server) HandleGetResource(w http.ResponseWriter, r *http.Request) {
return return
} }
lookupParams := &mru.Query{View: *view} lookupParams := &mru.Query{Feed: *view}
if err = lookupParams.FromValues(r.URL.Query()); err != nil { // parse period, version if err = lookupParams.FromValues(r.URL.Query()); err != nil { // parse period, version
RespondError(w, r, fmt.Sprintf("invalid mutable resource request:%s", err), http.StatusBadRequest) RespondError(w, r, fmt.Sprintf("invalid mutable resource request:%s", err), http.StatusBadRequest)
return return

View File

@ -206,7 +206,7 @@ func TestBzzResourceMultihash(t *testing.T) {
} }
} }
// Test resource updates using the raw update methods // Test Swarm Feeds using the raw update methods
func TestBzzResource(t *testing.T) { func TestBzzResource(t *testing.T) {
srv := testutil.NewTestSwarmServer(t, serverFunc, nil) srv := testutil.NewTestSwarmServer(t, serverFunc, nil)
signer, _ := newTestSigner() signer, _ := newTestSigner()
@ -406,7 +406,7 @@ func TestBzzResource(t *testing.T) {
// test manifest-less queries // test manifest-less queries
log.Info("get first update in update1Timestamp via direct query") log.Info("get first update in update1Timestamp via direct query")
query := mru.NewQuery(&updateRequest.View, update1Timestamp, lookup.NoClue) query := mru.NewQuery(&updateRequest.Feed, update1Timestamp, lookup.NoClue)
urlq, err := url.Parse(fmt.Sprintf("%s/bzz-resource:/", srv.URL)) urlq, err := url.Parse(fmt.Sprintf("%s/bzz-resource:/", srv.URL))
if err != nil { if err != nil {

View File

@ -56,7 +56,7 @@ type ManifestEntry struct {
ModTime time.Time `json:"mod_time,omitempty"` ModTime time.Time `json:"mod_time,omitempty"`
Status int `json:"status,omitempty"` Status int `json:"status,omitempty"`
Access *AccessEntry `json:"access,omitempty"` Access *AccessEntry `json:"access,omitempty"`
ResourceView *mru.View `json:"resourceView,omitempty"` ResourceView *mru.Feed `json:"resourceView,omitempty"`
} }
// ManifestList represents the result of listing files in a manifest // ManifestList represents the result of listing files in a manifest
@ -82,7 +82,7 @@ func (a *API) NewManifest(ctx context.Context, toEncrypt bool) (storage.Address,
// Manifest hack for supporting Mutable Resource Updates from the bzz: scheme // Manifest hack for supporting Mutable Resource Updates from the bzz: scheme
// see swarm/api/api.go:API.Get() for more information // see swarm/api/api.go:API.Get() for more information
func (a *API) NewResourceManifest(ctx context.Context, view *mru.View) (storage.Address, error) { func (a *API) NewResourceManifest(ctx context.Context, view *mru.Feed) (storage.Address, error) {
var manifest Manifest var manifest Manifest
entry := ManifestEntry{ entry := ManifestEntry{
ResourceView: view, ResourceView: view,

View File

@ -37,7 +37,7 @@ Using the streamer logic, various stream types are easy to implement:
* live session syncing * live session syncing
* historical syncing * historical syncing
* simple retrieve requests and deliveries * simple retrieve requests and deliveries
* mutable resource updates streams * Swarm Feeds streams
* receipting for finger pointing * receipting for finger pointing
## Syncing ## Syncing

View File

@ -26,23 +26,23 @@ import (
const ( const (
hasherCount = 8 hasherCount = 8
resourceHashAlgorithm = storage.SHA3Hash feedsHashAlgorithm = storage.SHA3Hash
defaultRetrieveTimeout = 100 * time.Millisecond defaultRetrieveTimeout = 100 * time.Millisecond
) )
// cacheEntry caches resource data and the metadata of its root chunk. // cacheEntry caches resource data and the metadata of its root chunk.
type cacheEntry struct { type cacheEntry struct {
ResourceUpdate Update
*bytes.Reader *bytes.Reader
lastKey storage.Address lastKey storage.Address
} }
// implements storage.LazySectionReader // implements storage.LazySectionReader
func (r *cacheEntry) Size(ctx context.Context, _ chan bool) (int64, error) { func (r *cacheEntry) Size(ctx context.Context, _ chan bool) (int64, error) {
return int64(len(r.ResourceUpdate.data)), nil return int64(len(r.Update.data)), nil
} }
//returns the resource's topic //returns the resource's topic
func (r *cacheEntry) Topic() Topic { func (r *cacheEntry) Topic() Topic {
return r.View.Topic return r.Feed.Topic
} }

View File

@ -1,5 +1,5 @@
/* /*
Package mru defines Mutable resource updates. Package feeds defines Swarm Feeds.
A Mutable Resource is an entity which allows updates to a resource A Mutable Resource is an entity which allows updates to a resource
without resorting to ENS on each update. without resorting to ENS on each update.

View File

@ -34,8 +34,8 @@ import (
type Handler struct { type Handler struct {
chunkStore *storage.NetStore chunkStore *storage.NetStore
HashSize int HashSize int
resources map[uint64]*cacheEntry cache map[uint64]*cacheEntry
resourceLock sync.RWMutex cacheLock sync.RWMutex
storeTimeout time.Duration storeTimeout time.Duration
queryMaxPeriods uint32 queryMaxPeriods uint32
} }
@ -52,26 +52,26 @@ var hashPool sync.Pool
func init() { func init() {
hashPool = sync.Pool{ hashPool = sync.Pool{
New: func() interface{} { New: func() interface{} {
return storage.MakeHashFunc(resourceHashAlgorithm)() return storage.MakeHashFunc(feedsHashAlgorithm)()
}, },
} }
} }
// NewHandler creates a new Mutable Resource API // NewHandler creates a new Mutable Resource API
func NewHandler(params *HandlerParams) *Handler { func NewHandler(params *HandlerParams) *Handler {
rh := &Handler{ fh := &Handler{
resources: make(map[uint64]*cacheEntry), cache: make(map[uint64]*cacheEntry),
} }
for i := 0; i < hasherCount; i++ { for i := 0; i < hasherCount; i++ {
hashfunc := storage.MakeHashFunc(resourceHashAlgorithm)() hashfunc := storage.MakeHashFunc(feedsHashAlgorithm)()
if rh.HashSize == 0 { if fh.HashSize == 0 {
rh.HashSize = hashfunc.Size() fh.HashSize = hashfunc.Size()
} }
hashPool.Put(hashfunc) hashPool.Put(hashfunc)
} }
return rh return fh
} }
// SetStore sets the store backend for the Mutable Resource API // SetStore sets the store backend for the Mutable Resource API
@ -95,7 +95,7 @@ func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool {
// First, deserialize the chunk // First, deserialize the chunk
var r Request var r Request
if err := r.fromChunk(chunkAddr, data); err != nil { if err := r.fromChunk(chunkAddr, data); err != nil {
log.Debug("Invalid resource chunk", "addr", chunkAddr.Hex(), "err", err.Error()) log.Debug("Invalid feed update chunk", "addr", chunkAddr.Hex(), "err", err.Error())
return false return false
} }
@ -103,7 +103,7 @@ func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool {
// If it fails, it means either the signature is not valid, data is corrupted // If it fails, it means either the signature is not valid, data is corrupted
// or someone is trying to update someone else's resource. // or someone is trying to update someone else's resource.
if err := r.Verify(); err != nil { if err := r.Verify(); err != nil {
log.Debug("Invalid signature", "err", err) log.Debug("Invalid feed update signature", "err", err)
return false return false
} }
@ -111,32 +111,32 @@ func (h *Handler) Validate(chunkAddr storage.Address, data []byte) bool {
} }
// GetContent retrieves the data payload of the last synced update of the Mutable Resource // GetContent retrieves the data payload of the last synced update of the Mutable Resource
func (h *Handler) GetContent(view *View) (storage.Address, []byte, error) { func (h *Handler) GetContent(feed *Feed) (storage.Address, []byte, error) {
if view == nil { if feed == nil {
return nil, nil, NewError(ErrInvalidValue, "view is nil") return nil, nil, NewError(ErrInvalidValue, "view is nil")
} }
rsrc := h.get(view) feedUpdate := h.get(feed)
if rsrc == nil { if feedUpdate == nil {
return nil, nil, NewError(ErrNotFound, "resource does not exist") return nil, nil, NewError(ErrNotFound, "resource does not exist")
} }
return rsrc.lastKey, rsrc.data, nil return feedUpdate.lastKey, feedUpdate.data, nil
} }
// NewRequest prepares a Request structure with all the necessary information to // NewRequest prepares a Request structure with all the necessary information to
// just add the desired data and sign it. // just add the desired data and sign it.
// The resulting structure can then be signed and passed to Handler.Update to be verified and sent // The resulting structure can then be signed and passed to Handler.Update to be verified and sent
func (h *Handler) NewRequest(ctx context.Context, view *View) (request *Request, err error) { func (h *Handler) NewRequest(ctx context.Context, feed *Feed) (request *Request, err error) {
if view == nil { if feed == nil {
return nil, NewError(ErrInvalidValue, "view cannot be nil") return nil, NewError(ErrInvalidValue, "feed cannot be nil")
} }
now := TimestampProvider.Now().Time now := TimestampProvider.Now().Time
request = new(Request) request = new(Request)
request.Header.Version = ProtocolVersion request.Header.Version = ProtocolVersion
query := NewQueryLatest(view, lookup.NoClue) query := NewQueryLatest(feed, lookup.NoClue)
rsrc, err := h.Lookup(ctx, query) feedUpdate, err := h.Lookup(ctx, query)
if err != nil { if err != nil {
if err.(*Error).code != ErrNotFound { if err.(*Error).code != ErrNotFound {
return nil, err return nil, err
@ -145,11 +145,11 @@ func (h *Handler) NewRequest(ctx context.Context, view *View) (request *Request,
// or that the resource really does not have updates // or that the resource really does not have updates
} }
request.View = *view request.Feed = *feed
// if we already have an update, then find next epoch // if we already have an update, then find next epoch
if rsrc != nil { if feedUpdate != nil {
request.Epoch = lookup.GetNextEpoch(rsrc.Epoch, now) request.Epoch = lookup.GetNextEpoch(feedUpdate.Epoch, now)
} else { } else {
request.Epoch = lookup.GetFirstEpoch(now) request.Epoch = lookup.GetFirstEpoch(now)
} }
@ -172,7 +172,7 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
} }
if query.Hint == lookup.NoClue { // try to use our cache if query.Hint == lookup.NoClue { // try to use our cache
entry := h.get(&query.View) entry := h.get(&query.Feed)
if entry != nil && entry.Epoch.Time <= timeLimit { // avoid bad hints if entry != nil && entry.Epoch.Time <= timeLimit { // avoid bad hints
query.Hint = entry.Epoch query.Hint = entry.Epoch
} }
@ -183,19 +183,19 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups") return nil, NewError(ErrInit, "Call Handler.SetStore() before performing lookups")
} }
var ul ID var id ID
ul.View = query.View id.Feed = query.Feed
var readCount int var readCount int
// Invoke the lookup engine. // Invoke the lookup engine.
// The callback will be called every time the lookup algorithm needs to guess // The callback will be called every time the lookup algorithm needs to guess
requestPtr, err := lookup.Lookup(timeLimit, query.Hint, func(epoch lookup.Epoch, now uint64) (interface{}, error) { requestPtr, err := lookup.Lookup(timeLimit, query.Hint, func(epoch lookup.Epoch, now uint64) (interface{}, error) {
readCount++ readCount++
ul.Epoch = epoch id.Epoch = epoch
ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout) ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
defer cancel() defer cancel()
chunk, err := h.chunkStore.Get(ctx, ul.Addr()) chunk, err := h.chunkStore.Get(ctx, id.Addr())
if err != nil { // TODO: check for catastrophic errors other than chunk not found if err != nil { // TODO: check for catastrophic errors other than chunk not found
return nil, nil return nil, nil
} }
@ -227,19 +227,19 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
func (h *Handler) updateCache(request *Request) (*cacheEntry, error) { func (h *Handler) updateCache(request *Request) (*cacheEntry, error) {
updateAddr := request.Addr() updateAddr := request.Addr()
log.Trace("resource cache update", "topic", request.Topic.Hex(), "updatekey", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level) log.Trace("feed cache update", "topic", request.Topic.Hex(), "updateaddr", updateAddr, "epoch time", request.Epoch.Time, "epoch level", request.Epoch.Level)
rsrc := h.get(&request.View) feedUpdate := h.get(&request.Feed)
if rsrc == nil { if feedUpdate == nil {
rsrc = &cacheEntry{} feedUpdate = &cacheEntry{}
h.set(&request.View, rsrc) h.set(&request.Feed, feedUpdate)
} }
// update our rsrcs entry map // update our rsrcs entry map
rsrc.lastKey = updateAddr feedUpdate.lastKey = updateAddr
rsrc.ResourceUpdate = request.ResourceUpdate feedUpdate.Update = request.Update
rsrc.Reader = bytes.NewReader(rsrc.data) feedUpdate.Reader = bytes.NewReader(feedUpdate.data)
return rsrc, nil return feedUpdate, nil
} }
// Update adds an actual data update // Update adds an actual data update
@ -255,8 +255,8 @@ func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Ad
return nil, NewError(ErrInit, "Call Handler.SetStore() before updating") return nil, NewError(ErrInit, "Call Handler.SetStore() before updating")
} }
rsrc := h.get(&r.View) feedUpdate := h.get(&r.Feed)
if rsrc != nil && rsrc.Epoch.Equals(r.Epoch) { // This is the only cheap check we can do for sure if feedUpdate != nil && feedUpdate.Epoch.Equals(r.Epoch) { // This is the only cheap check we can do for sure
return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist") return nil, NewError(ErrInvalidValue, "A former update in this epoch is already known to exist")
} }
@ -267,32 +267,32 @@ func (h *Handler) Update(ctx context.Context, r *Request) (updateAddr storage.Ad
// send the chunk // send the chunk
h.chunkStore.Put(ctx, chunk) h.chunkStore.Put(ctx, chunk)
log.Trace("resource update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data()) log.Trace("feed update", "updateAddr", r.idAddr, "epoch time", r.Epoch.Time, "epoch level", r.Epoch.Level, "data", chunk.Data())
// update our resources map cache entry if the new update is older than the one we have, if we have it. // update our resources map cache entry if the new update is older than the one we have, if we have it.
if rsrc != nil && r.Epoch.After(rsrc.Epoch) { if feedUpdate != nil && r.Epoch.After(feedUpdate.Epoch) {
rsrc.Epoch = r.Epoch feedUpdate.Epoch = r.Epoch
rsrc.data = make([]byte, len(r.data)) feedUpdate.data = make([]byte, len(r.data))
rsrc.lastKey = r.idAddr feedUpdate.lastKey = r.idAddr
copy(rsrc.data, r.data) copy(feedUpdate.data, r.data)
rsrc.Reader = bytes.NewReader(rsrc.data) feedUpdate.Reader = bytes.NewReader(feedUpdate.data)
} }
return r.idAddr, nil return r.idAddr, nil
} }
// Retrieves the resource cache value for the given nameHash // Retrieves the resource cache value for the given nameHash
func (h *Handler) get(view *View) *cacheEntry { func (h *Handler) get(view *Feed) *cacheEntry {
mapKey := view.mapKey() mapKey := view.mapKey()
h.resourceLock.RLock() h.cacheLock.RLock()
defer h.resourceLock.RUnlock() defer h.cacheLock.RUnlock()
rsrc := h.resources[mapKey] feedUpdate := h.cache[mapKey]
return rsrc return feedUpdate
} }
// Sets the resource cache value for the given View // Sets the resource cache value for the given View
func (h *Handler) set(view *View, rsrc *cacheEntry) { func (h *Handler) set(view *Feed, feedUpdate *cacheEntry) {
mapKey := view.mapKey() mapKey := view.mapKey()
h.resourceLock.Lock() h.cacheLock.Lock()
defer h.resourceLock.Unlock() defer h.cacheLock.Unlock()
h.resources[mapKey] = rsrc h.cache[mapKey] = feedUpdate
} }

View File

@ -40,7 +40,7 @@ var (
Time: uint64(4200), Time: uint64(4200),
} }
cleanF func() cleanF func()
resourceName = "føø.bar" subtopicName = "føø.bar"
hashfunc = storage.MakeHashFunc(storage.DefaultHash) hashfunc = storage.MakeHashFunc(storage.DefaultHash)
) )
@ -73,7 +73,7 @@ func (f *fakeTimeProvider) Now() Timestamp {
} }
// make updates and retrieve them based on periods and versions // make updates and retrieve them based on periods and versions
func TestResourceHandler(t *testing.T) { func TestFeedsHandler(t *testing.T) {
// make fake timeProvider // make fake timeProvider
clock := &fakeTimeProvider{ clock := &fakeTimeProvider{
@ -83,7 +83,7 @@ func TestResourceHandler(t *testing.T) {
// signer containing private key // signer containing private key
signer := newAliceSigner() signer := newAliceSigner()
rh, datadir, teardownTest, err := setupTest(clock, signer) feedsHandler, datadir, teardownTest, err := setupTest(clock, signer)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -93,8 +93,8 @@ func TestResourceHandler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
topic, _ := NewTopic("Mess with mru code and see what ghost catches you", nil) topic, _ := NewTopic("Mess with Swarm Feeds code and see what ghost catches you", nil)
view := View{ view := Feed{
Topic: topic, Topic: topic,
User: signer.Address(), User: signer.Address(),
} }
@ -108,13 +108,13 @@ func TestResourceHandler(t *testing.T) {
} }
request := NewFirstRequest(view.Topic) // this timestamps the update at t = 4200 (start time) request := NewFirstRequest(view.Topic) // this timestamps the update at t = 4200 (start time)
resourcekey := make(map[string]storage.Address) chunkAddress := make(map[string]storage.Address)
data := []byte(updates[0]) data := []byte(updates[0])
request.SetData(data) request.SetData(data)
if err := request.Sign(signer); err != nil { if err := request.Sign(signer); err != nil {
t.Fatal(err) t.Fatal(err)
} }
resourcekey[updates[0]], err = rh.Update(ctx, request) chunkAddress[updates[0]], err = feedsHandler.Update(ctx, request)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -122,7 +122,7 @@ func TestResourceHandler(t *testing.T) {
// move the clock ahead 21 seconds // move the clock ahead 21 seconds
clock.FastForward(21) // t=4221 clock.FastForward(21) // t=4221
request, err = rh.NewRequest(ctx, &request.View) // this timestamps the update at t = 4221 request, err = feedsHandler.NewRequest(ctx, &request.Feed) // this timestamps the update at t = 4221
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -136,14 +136,14 @@ func TestResourceHandler(t *testing.T) {
if err := request.Sign(signer); err != nil { if err := request.Sign(signer); err != nil {
t.Fatal(err) t.Fatal(err)
} }
resourcekey[updates[1]], err = rh.Update(ctx, request) chunkAddress[updates[1]], err = feedsHandler.Update(ctx, request)
if err == nil { if err == nil {
t.Fatal("Expected update to fail since an update in this epoch already exists") t.Fatal("Expected update to fail since an update in this epoch already exists")
} }
// move the clock ahead 21 seconds // move the clock ahead 21 seconds
clock.FastForward(21) // t=4242 clock.FastForward(21) // t=4242
request, err = rh.NewRequest(ctx, &request.View) request, err = feedsHandler.NewRequest(ctx, &request.Feed)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -151,14 +151,14 @@ func TestResourceHandler(t *testing.T) {
if err := request.Sign(signer); err != nil { if err := request.Sign(signer); err != nil {
t.Fatal(err) t.Fatal(err)
} }
resourcekey[updates[1]], err = rh.Update(ctx, request) chunkAddress[updates[1]], err = feedsHandler.Update(ctx, request)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// move the clock ahead 42 seconds // move the clock ahead 42 seconds
clock.FastForward(42) // t=4284 clock.FastForward(42) // t=4284
request, err = rh.NewRequest(ctx, &request.View) request, err = feedsHandler.NewRequest(ctx, &request.Feed)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -167,14 +167,14 @@ func TestResourceHandler(t *testing.T) {
if err := request.Sign(signer); err != nil { if err := request.Sign(signer); err != nil {
t.Fatal(err) t.Fatal(err)
} }
resourcekey[updates[2]], err = rh.Update(ctx, request) chunkAddress[updates[2]], err = feedsHandler.Update(ctx, request)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// move the clock ahead 1 second // move the clock ahead 1 second
clock.FastForward(1) // t=4285 clock.FastForward(1) // t=4285
request, err = rh.NewRequest(ctx, &request.View) request, err = feedsHandler.NewRequest(ctx, &request.Feed)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -187,25 +187,25 @@ func TestResourceHandler(t *testing.T) {
if err := request.Sign(signer); err != nil { if err := request.Sign(signer); err != nil {
t.Fatal(err) t.Fatal(err)
} }
resourcekey[updates[3]], err = rh.Update(ctx, request) chunkAddress[updates[3]], err = feedsHandler.Update(ctx, request)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
rh.Close() feedsHandler.Close()
// check we can retrieve the updates after close // check we can retrieve the updates after close
clock.FastForward(2000) // t=6285 clock.FastForward(2000) // t=6285
rhparams := &HandlerParams{} feedParams := &HandlerParams{}
rh2, err := NewTestHandler(datadir, rhparams) feedsHandler2, err := NewTestHandler(datadir, feedParams)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
rsrc2, err := rh2.Lookup(ctx, NewQueryLatest(&request.View, lookup.NoClue)) rsrc2, err := feedsHandler2.Lookup(ctx, NewQueryLatest(&request.Feed, lookup.NoClue))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -223,7 +223,7 @@ func TestResourceHandler(t *testing.T) {
log.Debug("Latest lookup", "epoch base time", rsrc2.Base(), "epoch level", rsrc2.Level, "data", rsrc2.data) log.Debug("Latest lookup", "epoch base time", rsrc2.Base(), "epoch level", rsrc2.Level, "data", rsrc2.data)
// specific point in time // specific point in time
rsrc, err := rh2.Lookup(ctx, NewQuery(&request.View, 4284, lookup.NoClue)) rsrc, err := feedsHandler2.Lookup(ctx, NewQuery(&request.Feed, 4284, lookup.NoClue))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -234,7 +234,7 @@ func TestResourceHandler(t *testing.T) {
log.Debug("Historical lookup", "epoch base time", rsrc2.Base(), "epoch level", rsrc2.Level, "data", rsrc2.data) log.Debug("Historical lookup", "epoch base time", rsrc2.Base(), "epoch level", rsrc2.Level, "data", rsrc2.data)
// beyond the first should yield an error // beyond the first should yield an error
rsrc, err = rh2.Lookup(ctx, NewQuery(&request.View, startTime.Time-1, lookup.NoClue)) rsrc, err = feedsHandler2.Lookup(ctx, NewQuery(&request.Feed, startTime.Time-1, lookup.NoClue))
if err == nil { if err == nil {
t.Fatalf("expected previous to fail, returned epoch %s data %v", rsrc.Epoch.String(), rsrc.data) t.Fatalf("expected previous to fail, returned epoch %s data %v", rsrc.Epoch.String(), rsrc.data)
} }
@ -270,7 +270,7 @@ func TestSparseUpdates(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
topic, _ := NewTopic("Very slow updates", nil) topic, _ := NewTopic("Very slow updates", nil)
view := View{ view := Feed{
Topic: topic, Topic: topic,
User: signer.Address(), User: signer.Address(),
} }
@ -349,12 +349,12 @@ func TestValidator(t *testing.T) {
defer teardownTest() defer teardownTest()
// create new resource // create new resource
topic, _ := NewTopic(resourceName, nil) topic, _ := NewTopic(subtopicName, nil)
view := View{ feed := Feed{
Topic: topic, Topic: topic,
User: signer.Address(), User: signer.Address(),
} }
mr := NewFirstRequest(view.Topic) mr := NewFirstRequest(feed.Topic)
// chunk with address // chunk with address
data := []byte("foo") data := []byte("foo")
@ -410,9 +410,9 @@ func TestValidatorInStore(t *testing.T) {
} }
// set up resource handler and add is as a validator to the localstore // set up resource handler and add is as a validator to the localstore
rhParams := &HandlerParams{} fhParams := &HandlerParams{}
rh := NewHandler(rhParams) fh := NewHandler(fhParams)
store.Validators = append(store.Validators, rh) store.Validators = append(store.Validators, fh)
// create content addressed chunks, one good, one faulty // create content addressed chunks, one good, one faulty
chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2) chunks := storage.GenerateRandomChunks(chunk.DefaultSize, 2)
@ -420,7 +420,7 @@ func TestValidatorInStore(t *testing.T) {
badChunk := storage.NewChunk(chunks[1].Address(), goodChunk.Data()) badChunk := storage.NewChunk(chunks[1].Address(), goodChunk.Data())
topic, _ := NewTopic("xyzzy", nil) topic, _ := NewTopic("xyzzy", nil)
view := View{ feed := Feed{
Topic: topic, Topic: topic,
User: signer.Address(), User: signer.Address(),
} }
@ -430,7 +430,7 @@ func TestValidatorInStore(t *testing.T) {
Epoch: lookup.Epoch{Time: 42, Epoch: lookup.Epoch{Time: 42,
Level: 1, Level: 1,
}, },
View: view, Feed: feed,
} }
updateAddr := id.Addr() updateAddr := id.Addr()
@ -438,7 +438,7 @@ func TestValidatorInStore(t *testing.T) {
r := new(Request) r := new(Request)
r.idAddr = updateAddr r.idAddr = updateAddr
r.ResourceUpdate.ID = id r.Update.ID = id
r.data = data r.data = data
r.Sign(signer) r.Sign(signer)
@ -451,20 +451,20 @@ func TestValidatorInStore(t *testing.T) {
// put the chunks in the store and check their error status // put the chunks in the store and check their error status
err = store.Put(context.Background(), goodChunk) err = store.Put(context.Background(), goodChunk)
if err == nil { if err == nil {
t.Fatal("expected error on good content address chunk with resource validator only, but got nil") t.Fatal("expected error on good content address chunk with feed update validator only, but got nil")
} }
err = store.Put(context.Background(), badChunk) err = store.Put(context.Background(), badChunk)
if err == nil { if err == nil {
t.Fatal("expected error on bad content address chunk with resource validator only, but got nil") t.Fatal("expected error on bad content address chunk with feed update validator only, but got nil")
} }
err = store.Put(context.Background(), uglyChunk) err = store.Put(context.Background(), uglyChunk)
if err != nil { if err != nil {
t.Fatalf("expected no error on resource update chunk with resource validator only, but got: %s", err) t.Fatalf("expected no error on feed update chunk with feed update validator only, but got: %s", err)
} }
} }
// create rpc and resourcehandler // create rpc and resourcehandler
func setupTest(timeProvider timestampProvider, signer Signer) (rh *TestHandler, datadir string, teardown func(), err error) { func setupTest(timeProvider timestampProvider, signer Signer) (fh *TestHandler, datadir string, teardown func(), err error) {
var fsClean func() var fsClean func()
var rpcClean func() var rpcClean func()
@ -478,7 +478,7 @@ func setupTest(timeProvider timestampProvider, signer Signer) (rh *TestHandler,
} }
// temp datadir // temp datadir
datadir, err = ioutil.TempDir("", "rh") datadir, err = ioutil.TempDir("", "fh")
if err != nil { if err != nil {
return nil, "", nil, err return nil, "", nil, err
} }
@ -487,9 +487,9 @@ func setupTest(timeProvider timestampProvider, signer Signer) (rh *TestHandler,
} }
TimestampProvider = timeProvider TimestampProvider = timeProvider
rhparams := &HandlerParams{} fhParams := &HandlerParams{}
rh, err = NewTestHandler(datadir, rhparams) fh, err = NewTestHandler(datadir, fhParams)
return rh, datadir, cleanF, err return fh, datadir, cleanF, err
} }
func newAliceSigner() *GenericSigner { func newAliceSigner() *GenericSigner {

View File

@ -29,21 +29,21 @@ import (
// ID uniquely identifies an update on the network. // ID uniquely identifies an update on the network.
type ID struct { type ID struct {
View `json:"view"` Feed `json:"view"`
lookup.Epoch `json:"epoch"` lookup.Epoch `json:"epoch"`
} }
// ID layout: // ID layout:
// View viewLength bytes // Feed feedLength bytes
// Epoch EpochLength // Epoch EpochLength
const idLength = viewLength + lookup.EpochLength const idLength = feedLength + lookup.EpochLength
// Addr calculates the resource update chunk address corresponding to this ID // Addr calculates the resource update chunk address corresponding to this ID
func (u *ID) Addr() (updateAddr storage.Address) { func (u *ID) Addr() (updateAddr storage.Address) {
serializedData := make([]byte, idLength) serializedData := make([]byte, idLength)
var cursor int var cursor int
u.View.binaryPut(serializedData[cursor : cursor+viewLength]) u.Feed.binaryPut(serializedData[cursor : cursor+feedLength])
cursor += viewLength cursor += feedLength
eid := u.Epoch.ID() eid := u.Epoch.ID()
copy(serializedData[cursor:cursor+lookup.EpochLength], eid[:]) copy(serializedData[cursor:cursor+lookup.EpochLength], eid[:])
@ -61,10 +61,10 @@ func (u *ID) binaryPut(serializedData []byte) error {
return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize ID. Expected %d, got %d", idLength, len(serializedData)) return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize ID. Expected %d, got %d", idLength, len(serializedData))
} }
var cursor int var cursor int
if err := u.View.binaryPut(serializedData[cursor : cursor+viewLength]); err != nil { if err := u.Feed.binaryPut(serializedData[cursor : cursor+feedLength]); err != nil {
return err return err
} }
cursor += viewLength cursor += feedLength
epochBytes, err := u.Epoch.MarshalBinary() epochBytes, err := u.Epoch.MarshalBinary()
if err != nil { if err != nil {
@ -88,10 +88,10 @@ func (u *ID) binaryGet(serializedData []byte) error {
} }
var cursor int var cursor int
if err := u.View.binaryGet(serializedData[cursor : cursor+viewLength]); err != nil { if err := u.Feed.binaryGet(serializedData[cursor : cursor+feedLength]); err != nil {
return err return err
} }
cursor += viewLength cursor += feedLength
if err := u.Epoch.UnmarshalBinary(serializedData[cursor : cursor+lookup.EpochLength]); err != nil { if err := u.Epoch.UnmarshalBinary(serializedData[cursor : cursor+lookup.EpochLength]); err != nil {
return err return err
@ -108,8 +108,8 @@ func (u *ID) FromValues(values Values) error {
u.Epoch.Level = uint8(level) u.Epoch.Level = uint8(level)
u.Epoch.Time, _ = strconv.ParseUint(values.Get("time"), 10, 64) u.Epoch.Time, _ = strconv.ParseUint(values.Get("time"), 10, 64)
if u.View.User == (common.Address{}) { if u.Feed.User == (common.Address{}) {
return u.View.FromValues(values) return u.Feed.FromValues(values)
} }
return nil return nil
} }
@ -119,5 +119,5 @@ func (u *ID) FromValues(values Values) error {
func (u *ID) AppendValues(values Values) { func (u *ID) AppendValues(values Values) {
values.Set("level", fmt.Sprintf("%d", u.Epoch.Level)) values.Set("level", fmt.Sprintf("%d", u.Epoch.Level))
values.Set("time", fmt.Sprintf("%d", u.Epoch.Time)) values.Set("time", fmt.Sprintf("%d", u.Epoch.Time))
u.View.AppendValues(values) u.Feed.AppendValues(values)
} }

View File

@ -8,14 +8,14 @@ import (
func getTestID() *ID { func getTestID() *ID {
return &ID{ return &ID{
View: *getTestView(), Feed: *getTestFeed(),
Epoch: lookup.GetFirstEpoch(1000), Epoch: lookup.GetFirstEpoch(1000),
} }
} }
func TestIDAddr(t *testing.T) { func TestIDAddr(t *testing.T) {
ul := getTestID() id := getTestID()
updateAddr := ul.Addr() updateAddr := id.Addr()
compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x8b24583ec293e085f4c78aaee66d1bc5abfb8b4233304d14a349afa57af2a783") compareByteSliceToExpectedHex(t, "updateAddr", updateAddr, "0x8b24583ec293e085f4c78aaee66d1bc5abfb8b4233304d14a349afa57af2a783")
} }

View File

@ -27,7 +27,7 @@ import (
// Query is used to specify constraints when performing an update lookup // Query is used to specify constraints when performing an update lookup
// TimeLimit indicates an upper bound for the search. Set to 0 for "now" // TimeLimit indicates an upper bound for the search. Set to 0 for "now"
type Query struct { type Query struct {
View Feed
Hint lookup.Epoch Hint lookup.Epoch
TimeLimit uint64 TimeLimit uint64
} }
@ -41,8 +41,8 @@ func (q *Query) FromValues(values Values) error {
level, _ := strconv.ParseUint(values.Get("hint.level"), 10, 32) level, _ := strconv.ParseUint(values.Get("hint.level"), 10, 32)
q.Hint.Level = uint8(level) q.Hint.Level = uint8(level)
q.Hint.Time, _ = strconv.ParseUint(values.Get("hint.time"), 10, 64) q.Hint.Time, _ = strconv.ParseUint(values.Get("hint.time"), 10, 64)
if q.View.User == (common.Address{}) { if q.Feed.User == (common.Address{}) {
return q.View.FromValues(values) return q.Feed.FromValues(values)
} }
return nil return nil
} }
@ -59,20 +59,20 @@ func (q *Query) AppendValues(values Values) {
if q.Hint.Time != 0 { if q.Hint.Time != 0 {
values.Set("hint.time", fmt.Sprintf("%d", q.Hint.Time)) values.Set("hint.time", fmt.Sprintf("%d", q.Hint.Time))
} }
q.View.AppendValues(values) q.Feed.AppendValues(values)
} }
// NewQuery constructs an Query structure to find updates on or before `time` // NewQuery constructs an Query structure to find updates on or before `time`
// if time == 0, the latest update will be looked up // if time == 0, the latest update will be looked up
func NewQuery(view *View, time uint64, hint lookup.Epoch) *Query { func NewQuery(feed *Feed, time uint64, hint lookup.Epoch) *Query {
return &Query{ return &Query{
TimeLimit: time, TimeLimit: time,
View: *view, Feed: *feed,
Hint: hint, Hint: hint,
} }
} }
// NewQueryLatest generates lookup parameters that look for the latest version of a resource // NewQueryLatest generates lookup parameters that look for the latest version of a resource
func NewQueryLatest(view *View, hint lookup.Epoch) *Query { func NewQueryLatest(feed *Feed, hint lookup.Epoch) *Query {
return NewQuery(view, 0, hint) return NewQuery(feed, 0, hint)
} }

View File

@ -21,11 +21,11 @@ import (
) )
func getTestQuery() *Query { func getTestQuery() *Query {
ul := getTestID() id := getTestID()
return &Query{ return &Query{
TimeLimit: 5000, TimeLimit: 5000,
View: ul.View, Feed: id.Feed,
Hint: ul.Epoch, Hint: id.Epoch,
} }
} }

View File

@ -29,10 +29,10 @@ import (
// Request represents an update and/or resource create message // Request represents an update and/or resource create message
type Request struct { type Request struct {
ResourceUpdate // actual content that will be put on the chunk, less signature Update // actual content that will be put on the chunk, less signature
Signature *Signature Signature *Signature
idAddr storage.Address // cached chunk address for the update (not serialized, for internal use) idAddr storage.Address // cached chunk address for the update (not serialized, for internal use)
binaryData []byte // cached serialized data (does not get serialized again!, for efficiency/internal use) binaryData []byte // cached serialized data (does not get serialized again!, for efficiency/internal use)
} }
// updateRequestJSON represents a JSON-serialized UpdateRequest // updateRequestJSON represents a JSON-serialized UpdateRequest
@ -44,11 +44,11 @@ type updateRequestJSON struct {
} }
// Request layout // Request layout
// resourceUpdate bytes // Update bytes
// SignatureLength bytes // SignatureLength bytes
const minimumSignedUpdateLength = minimumUpdateDataLength + signatureLength const minimumSignedUpdateLength = minimumUpdateDataLength + signatureLength
// NewFirstRequest returns a ready to sign request to publish a first update // NewFirstRequest returns a ready to sign request to publish a first feed update
func NewFirstRequest(topic Topic) *Request { func NewFirstRequest(topic Topic) *Request {
request := new(Request) request := new(Request)
@ -56,7 +56,7 @@ func NewFirstRequest(topic Topic) *Request {
// get the current time // get the current time
now := TimestampProvider.Now().Time now := TimestampProvider.Now().Time
request.Epoch = lookup.GetFirstEpoch(now) request.Epoch = lookup.GetFirstEpoch(now)
request.View.Topic = topic request.Feed.Topic = topic
request.Header.Version = ProtocolVersion request.Header.Version = ProtocolVersion
return request return request
@ -88,7 +88,7 @@ func (r *Request) Verify() (err error) {
} }
// get the address of the signer (which also checks that it's a valid signature) // get the address of the signer (which also checks that it's a valid signature)
r.View.User, err = getUserAddr(digest, *r.Signature) r.Feed.User, err = getUserAddr(digest, *r.Signature)
if err != nil { if err != nil {
return err return err
} }
@ -105,7 +105,7 @@ func (r *Request) Verify() (err error) {
// Sign executes the signature to validate the resource // Sign executes the signature to validate the resource
func (r *Request) Sign(signer Signer) error { func (r *Request) Sign(signer Signer) error {
r.View.User = signer.Address() r.Feed.User = signer.Address()
r.binaryData = nil //invalidate serialized data r.binaryData = nil //invalidate serialized data
digest, err := r.GetDigest() // computes digest and serializes into .binaryData digest, err := r.GetDigest() // computes digest and serializes into .binaryData
if err != nil { if err != nil {
@ -139,10 +139,10 @@ func (r *Request) GetDigest() (result common.Hash, err error) {
hasher := hashPool.Get().(hash.Hash) hasher := hashPool.Get().(hash.Hash)
defer hashPool.Put(hasher) defer hashPool.Put(hasher)
hasher.Reset() hasher.Reset()
dataLength := r.ResourceUpdate.binaryLength() dataLength := r.Update.binaryLength()
if r.binaryData == nil { if r.binaryData == nil {
r.binaryData = make([]byte, dataLength+signatureLength) r.binaryData = make([]byte, dataLength+signatureLength)
if err := r.ResourceUpdate.binaryPut(r.binaryData[:dataLength]); err != nil { if err := r.Update.binaryPut(r.binaryData[:dataLength]); err != nil {
return result, err return result, err
} }
} }
@ -161,10 +161,10 @@ func (r *Request) toChunk() (storage.Chunk, error) {
return nil, NewError(ErrInvalidSignature, "toChunk called without a valid signature or payload data. Call .Sign() first.") return nil, NewError(ErrInvalidSignature, "toChunk called without a valid signature or payload data. Call .Sign() first.")
} }
resourceUpdateLength := r.ResourceUpdate.binaryLength() updateLength := r.Update.binaryLength()
// signature is the last item in the chunk data // signature is the last item in the chunk data
copy(r.binaryData[resourceUpdateLength:], r.Signature[:]) copy(r.binaryData[updateLength:], r.Signature[:])
chunk := storage.NewChunk(r.idAddr, r.binaryData) chunk := storage.NewChunk(r.idAddr, r.binaryData)
return chunk, nil return chunk, nil
@ -175,13 +175,13 @@ func (r *Request) fromChunk(updateAddr storage.Address, chunkdata []byte) error
// for update chunk layout see Request definition // for update chunk layout see Request definition
//deserialize the resource update portion //deserialize the resource update portion
if err := r.ResourceUpdate.binaryGet(chunkdata[:len(chunkdata)-signatureLength]); err != nil { if err := r.Update.binaryGet(chunkdata[:len(chunkdata)-signatureLength]); err != nil {
return err return err
} }
// Extract the signature // Extract the signature
var signature *Signature var signature *Signature
cursor := r.ResourceUpdate.binaryLength() cursor := r.Update.binaryLength()
sigdata := chunkdata[cursor : cursor+signatureLength] sigdata := chunkdata[cursor : cursor+signatureLength]
if len(sigdata) > 0 { if len(sigdata) > 0 {
signature = &Signature{} signature = &Signature{}
@ -209,7 +209,7 @@ func (r *Request) FromValues(values Values, data []byte) error {
r.Signature = new(Signature) r.Signature = new(Signature)
copy(r.Signature[:], signatureBytes) copy(r.Signature[:], signatureBytes)
} }
err = r.ResourceUpdate.FromValues(values, data) err = r.Update.FromValues(values, data)
if err != nil { if err != nil {
return err return err
} }
@ -223,7 +223,7 @@ func (r *Request) AppendValues(values Values) []byte {
if r.Signature != nil { if r.Signature != nil {
values.Set("signature", hexutil.Encode(r.Signature[:])) values.Set("signature", hexutil.Encode(r.Signature[:]))
} }
return r.ResourceUpdate.AppendValues(values) return r.Update.AppendValues(values)
} }
// fromJSON takes an update request JSON and populates an UpdateRequest // fromJSON takes an update request JSON and populates an UpdateRequest

View File

@ -53,25 +53,25 @@ func TestEncodingDecodingUpdateRequests(t *testing.T) {
charlie := newCharlieSigner() //Charlie charlie := newCharlieSigner() //Charlie
bob := newBobSigner() //Bob bob := newBobSigner() //Bob
// Create a resource to our good guy Charlie's name // Create a feed to our good guy Charlie's name
topic, _ := NewTopic("a good resource name", nil) topic, _ := NewTopic("a good topic name", nil)
createRequest := NewFirstRequest(topic) firstRequest := NewFirstRequest(topic)
createRequest.User = charlie.Address() firstRequest.User = charlie.Address()
// We now encode the create message to simulate we send it over the wire // We now encode the create message to simulate we send it over the wire
messageRawData, err := createRequest.MarshalJSON() messageRawData, err := firstRequest.MarshalJSON()
if err != nil { if err != nil {
t.Fatalf("Error encoding create resource request: %s", err) t.Fatalf("Error encoding first feed update request: %s", err)
} }
// ... the message arrives and is decoded... // ... the message arrives and is decoded...
var recoveredCreateRequest Request var recoveredFirstRequest Request
if err := recoveredCreateRequest.UnmarshalJSON(messageRawData); err != nil { if err := recoveredFirstRequest.UnmarshalJSON(messageRawData); err != nil {
t.Fatalf("Error decoding create resource request: %s", err) t.Fatalf("Error decoding first feed update request: %s", err)
} }
// ... but verification should fail because it is not signed! // ... but verification should fail because it is not signed!
if err := recoveredCreateRequest.Verify(); err == nil { if err := recoveredFirstRequest.Verify(); err == nil {
t.Fatal("Expected Verify to fail since the message is not signed") t.Fatal("Expected Verify to fail since the message is not signed")
} }
@ -85,13 +85,13 @@ func TestEncodingDecodingUpdateRequests(t *testing.T) {
//Put together an unsigned update request that we will serialize to send it to the signer. //Put together an unsigned update request that we will serialize to send it to the signer.
data := []byte("This hour's update: Swarm 99.0 has been released!") data := []byte("This hour's update: Swarm 99.0 has been released!")
request := &Request{ request := &Request{
ResourceUpdate: ResourceUpdate{ Update: Update{
ID: ID{ ID: ID{
Epoch: lookup.Epoch{ Epoch: lookup.Epoch{
Time: 1000, Time: 1000,
Level: 1, Level: 1,
}, },
View: createRequest.ResourceUpdate.View, Feed: firstRequest.Update.Feed,
}, },
data: data, data: data,
}, },
@ -191,7 +191,7 @@ func TestEncodingDecodingUpdateRequests(t *testing.T) {
func getTestRequest() *Request { func getTestRequest() *Request {
return &Request{ return &Request{
ResourceUpdate: *getTestResourceUpdate(), Update: *getTestFeedUpdate(),
} }
} }
@ -258,7 +258,7 @@ func TestReverse(t *testing.T) {
defer teardownTest() defer teardownTest()
topic, _ := NewTopic("Cervantes quotes", nil) topic, _ := NewTopic("Cervantes quotes", nil)
view := View{ view := Feed{
Topic: topic, Topic: topic,
User: signer.Address(), User: signer.Address(),
} }
@ -266,7 +266,7 @@ func TestReverse(t *testing.T) {
data := []byte("Donde una puerta se cierra, otra se abre") data := []byte("Donde una puerta se cierra, otra se abre")
request := new(Request) request := new(Request)
request.View = view request.Feed = view
request.Epoch = epoch request.Epoch = epoch
request.data = data request.data = data
@ -291,15 +291,15 @@ func TestReverse(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
recoveredaddress, err := getUserAddr(checkdigest, *checkUpdate.Signature) recoveredAddr, err := getUserAddr(checkdigest, *checkUpdate.Signature)
if err != nil { if err != nil {
t.Fatalf("Retrieve address from signature fail: %v", err) t.Fatalf("Retrieve address from signature fail: %v", err)
} }
originaladdress := crypto.PubkeyToAddress(signer.PrivKey.PublicKey) originalAddr := crypto.PubkeyToAddress(signer.PrivKey.PublicKey)
// check that the metadata retrieved from the chunk matches what we gave it // check that the metadata retrieved from the chunk matches what we gave it
if recoveredaddress != originaladdress { if recoveredAddr != originalAddr {
t.Fatalf("addresses dont match: %x != %x", originaladdress, recoveredaddress) t.Fatalf("addresses dont match: %x != %x", originalAddr, recoveredAddr)
} }
if !bytes.Equal(key[:], chunk.Address()[:]) { if !bytes.Equal(key[:], chunk.Address()[:]) {

View File

@ -27,7 +27,7 @@ import (
) )
const ( const (
testDbDirName = "mru" testDbDirName = "feeds"
) )
type TestHandler struct { type TestHandler struct {
@ -52,20 +52,20 @@ func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetF
// NewTestHandler creates Handler object to be used for testing purposes. // NewTestHandler creates Handler object to be used for testing purposes.
func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) { func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) {
path := filepath.Join(datadir, testDbDirName) path := filepath.Join(datadir, testDbDirName)
rh := NewHandler(params) fh := NewHandler(params)
localstoreparams := storage.NewDefaultLocalStoreParams() localstoreparams := storage.NewDefaultLocalStoreParams()
localstoreparams.Init(path) localstoreparams.Init(path)
localStore, err := storage.NewLocalStore(localstoreparams, nil) localStore, err := storage.NewLocalStore(localstoreparams, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("localstore create fail, path %s: %v", path, err) return nil, fmt.Errorf("localstore create fail, path %s: %v", path, err)
} }
localStore.Validators = append(localStore.Validators, storage.NewContentAddressValidator(storage.MakeHashFunc(resourceHashAlgorithm))) localStore.Validators = append(localStore.Validators, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)))
localStore.Validators = append(localStore.Validators, rh) localStore.Validators = append(localStore.Validators, fh)
netStore, err := storage.NewNetStore(localStore, nil) netStore, err := storage.NewNetStore(localStore, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
netStore.NewNetFetcherFunc = newFakeNetFetcher netStore.NewNetFetcherFunc = newFakeNetFetcher
rh.SetStore(netStore) fh.SetStore(netStore)
return &TestHandler{rh}, nil return &TestHandler{fh}, nil
} }

View File

@ -29,7 +29,7 @@ import (
// TopicLength establishes the max length of a topic string // TopicLength establishes the max length of a topic string
const TopicLength = storage.AddressLength const TopicLength = storage.AddressLength
// Topic represents what a resource talks about // Topic represents what a feed is about
type Topic [TopicLength]byte type Topic [TopicLength]byte
// ErrTopicTooLong is returned when creating a topic with a name/related content too long // ErrTopicTooLong is returned when creating a topic with a name/related content too long

View File

@ -34,8 +34,8 @@ type Header struct {
Padding [headerLength - 1]uint8 // reserved for future use Padding [headerLength - 1]uint8 // reserved for future use
} }
// ResourceUpdate encapsulates the information sent as part of a resource update // Update encapsulates the information sent as part of a feed update
type ResourceUpdate struct { type Update struct {
Header Header // Header Header //
ID // Resource update identifying information ID // Resource update identifying information
data []byte // actual data payload data []byte // actual data payload
@ -44,15 +44,15 @@ type ResourceUpdate struct {
const minimumUpdateDataLength = idLength + headerLength + 1 const minimumUpdateDataLength = idLength + headerLength + 1
const maxUpdateDataLength = chunk.DefaultSize - signatureLength - idLength - headerLength const maxUpdateDataLength = chunk.DefaultSize - signatureLength - idLength - headerLength
// binaryPut serializes the resource update information into the given slice // binaryPut serializes the feed update information into the given slice
func (r *ResourceUpdate) binaryPut(serializedData []byte) error { func (r *Update) binaryPut(serializedData []byte) error {
datalength := len(r.data) datalength := len(r.data)
if datalength == 0 { if datalength == 0 {
return NewError(ErrInvalidValue, "cannot update a resource with no data") return NewError(ErrInvalidValue, "a feed update must contain data")
} }
if datalength > maxUpdateDataLength { if datalength > maxUpdateDataLength {
return NewErrorf(ErrInvalidValue, "data is too big (length=%d). Max length=%d", datalength, maxUpdateDataLength) return NewErrorf(ErrInvalidValue, "feed update data is too big (length=%d). Max length=%d", datalength, maxUpdateDataLength)
} }
if len(serializedData) != r.binaryLength() { if len(serializedData) != r.binaryLength() {
@ -79,12 +79,12 @@ func (r *ResourceUpdate) binaryPut(serializedData []byte) error {
} }
// binaryLength returns the expected number of bytes this structure will take to encode // binaryLength returns the expected number of bytes this structure will take to encode
func (r *ResourceUpdate) binaryLength() int { func (r *Update) binaryLength() int {
return idLength + headerLength + len(r.data) return idLength + headerLength + len(r.data)
} }
// binaryGet populates this instance from the information contained in the passed byte slice // binaryGet populates this instance from the information contained in the passed byte slice
func (r *ResourceUpdate) binaryGet(serializedData []byte) error { func (r *Update) binaryGet(serializedData []byte) error {
if len(serializedData) < minimumUpdateDataLength { if len(serializedData) < minimumUpdateDataLength {
return NewErrorf(ErrNothingToReturn, "chunk less than %d bytes cannot be a resource update chunk", minimumUpdateDataLength) return NewErrorf(ErrNothingToReturn, "chunk less than %d bytes cannot be a resource update chunk", minimumUpdateDataLength)
} }
@ -116,7 +116,7 @@ func (r *ResourceUpdate) binaryGet(serializedData []byte) error {
// FromValues deserializes this instance from a string key-value store // FromValues deserializes this instance from a string key-value store
// useful to parse query strings // useful to parse query strings
func (r *ResourceUpdate) FromValues(values Values, data []byte) error { func (r *Update) FromValues(values Values, data []byte) error {
r.data = data r.data = data
version, _ := strconv.ParseUint(values.Get("protocolVersion"), 10, 32) version, _ := strconv.ParseUint(values.Get("protocolVersion"), 10, 32)
r.Header.Version = uint8(version) r.Header.Version = uint8(version)
@ -125,7 +125,7 @@ func (r *ResourceUpdate) FromValues(values Values, data []byte) error {
// AppendValues serializes this structure into the provided string key-value store // AppendValues serializes this structure into the provided string key-value store
// useful to build query strings // useful to build query strings
func (r *ResourceUpdate) AppendValues(values Values) []byte { func (r *Update) AppendValues(values Values) []byte {
r.ID.AppendValues(values) r.ID.AppendValues(values)
values.Set("protocolVersion", fmt.Sprintf("%d", r.Header.Version)) values.Set("protocolVersion", fmt.Sprintf("%d", r.Header.Version))
return r.data return r.data

View File

@ -20,31 +20,31 @@ import (
"testing" "testing"
) )
func getTestResourceUpdate() *ResourceUpdate { func getTestFeedUpdate() *Update {
return &ResourceUpdate{ return &Update{
ID: *getTestID(), ID: *getTestID(),
data: []byte("El que lee mucho y anda mucho, ve mucho y sabe mucho"), data: []byte("El que lee mucho y anda mucho, ve mucho y sabe mucho"),
} }
} }
func TestResourceUpdateSerializer(t *testing.T) { func TestUpdateSerializer(t *testing.T) {
testBinarySerializerRecovery(t, getTestResourceUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f") testBinarySerializerRecovery(t, getTestFeedUpdate(), "0x0000000000000000776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781ce803000000000019456c20717565206c6565206d7563686f207920616e6461206d7563686f2c207665206d7563686f20792073616265206d7563686f")
} }
func TestResourceUpdateLengthCheck(t *testing.T) { func TestUpdateLengthCheck(t *testing.T) {
testBinarySerializerLengthCheck(t, getTestResourceUpdate()) testBinarySerializerLengthCheck(t, getTestFeedUpdate())
// Test fail if update is too big // Test fail if update is too big
update := getTestResourceUpdate() update := getTestFeedUpdate()
update.data = make([]byte, maxUpdateDataLength+100) update.data = make([]byte, maxUpdateDataLength+100)
serialized := make([]byte, update.binaryLength()) serialized := make([]byte, update.binaryLength())
if err := update.binaryPut(serialized); err == nil { if err := update.binaryPut(serialized); err == nil {
t.Fatal("Expected resourceUpdate.binaryPut to fail since update is too big") t.Fatal("Expected update.binaryPut to fail since update is too big")
} }
// test fail if data is empty or nil // test fail if data is empty or nil
update.data = nil update.data = nil
serialized = make([]byte, update.binaryLength()) serialized = make([]byte, update.binaryLength())
if err := update.binaryPut(serialized); err == nil { if err := update.binaryPut(serialized); err == nil {
t.Fatal("Expected resourceUpdate.binaryPut to fail since data is empty") t.Fatal("Expected update.binaryPut to fail since data is empty")
} }
} }

View File

@ -25,8 +25,8 @@ import (
"github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage"
) )
// View represents a particular user's view of a resource // Feed represents a particular user's view of a resource
type View struct { type Feed struct {
Topic Topic `json:"topic"` Topic Topic `json:"topic"`
User common.Address `json:"user"` User common.Address `json:"user"`
} }
@ -34,11 +34,11 @@ type View struct {
// View layout: // View layout:
// TopicLength bytes // TopicLength bytes
// userAddr common.AddressLength bytes // userAddr common.AddressLength bytes
const viewLength = TopicLength + common.AddressLength const feedLength = TopicLength + common.AddressLength
// mapKey calculates a unique id for this view for the cache map in `Handler` // mapKey calculates a unique id for this feed. Used by the cache map in `Handler`
func (u *View) mapKey() uint64 { func (u *Feed) mapKey() uint64 {
serializedData := make([]byte, viewLength) serializedData := make([]byte, feedLength)
u.binaryPut(serializedData) u.binaryPut(serializedData)
hasher := hashPool.Get().(hash.Hash) hasher := hashPool.Get().(hash.Hash)
defer hashPool.Put(hasher) defer hashPool.Put(hasher)
@ -48,10 +48,10 @@ func (u *View) mapKey() uint64 {
return *(*uint64)(unsafe.Pointer(&hash[0])) return *(*uint64)(unsafe.Pointer(&hash[0]))
} }
// binaryPut serializes this View instance into the provided slice // binaryPut serializes this Feed instance into the provided slice
func (u *View) binaryPut(serializedData []byte) error { func (u *Feed) binaryPut(serializedData []byte) error {
if len(serializedData) != viewLength { if len(serializedData) != feedLength {
return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize View. Expected %d, got %d", viewLength, len(serializedData)) return NewErrorf(ErrInvalidValue, "Incorrect slice size to serialize View. Expected %d, got %d", feedLength, len(serializedData))
} }
var cursor int var cursor int
copy(serializedData[cursor:cursor+TopicLength], u.Topic[:TopicLength]) copy(serializedData[cursor:cursor+TopicLength], u.Topic[:TopicLength])
@ -64,14 +64,14 @@ func (u *View) binaryPut(serializedData []byte) error {
} }
// binaryLength returns the expected size of this structure when serialized // binaryLength returns the expected size of this structure when serialized
func (u *View) binaryLength() int { func (u *Feed) binaryLength() int {
return viewLength return feedLength
} }
// binaryGet restores the current instance from the information contained in the passed slice // binaryGet restores the current instance from the information contained in the passed slice
func (u *View) binaryGet(serializedData []byte) error { func (u *Feed) binaryGet(serializedData []byte) error {
if len(serializedData) != viewLength { if len(serializedData) != feedLength {
return NewErrorf(ErrInvalidValue, "Incorrect slice size to read View. Expected %d, got %d", viewLength, len(serializedData)) return NewErrorf(ErrInvalidValue, "Incorrect slice size to read Feed. Expected %d, got %d", feedLength, len(serializedData))
} }
var cursor int var cursor int
@ -84,16 +84,16 @@ func (u *View) binaryGet(serializedData []byte) error {
return nil return nil
} }
// Hex serializes the View to a hex string // Hex serializes the Feed to a hex string
func (u *View) Hex() string { func (u *Feed) Hex() string {
serializedData := make([]byte, viewLength) serializedData := make([]byte, feedLength)
u.binaryPut(serializedData) u.binaryPut(serializedData)
return hexutil.Encode(serializedData) return hexutil.Encode(serializedData)
} }
// FromValues deserializes this instance from a string key-value store // FromValues deserializes this instance from a string key-value store
// useful to parse query strings // useful to parse query strings
func (u *View) FromValues(values Values) (err error) { func (u *Feed) FromValues(values Values) (err error) {
topic := values.Get("topic") topic := values.Get("topic")
if topic != "" { if topic != "" {
if err := u.Topic.FromHex(values.Get("topic")); err != nil { if err := u.Topic.FromHex(values.Get("topic")); err != nil {
@ -119,7 +119,7 @@ func (u *View) FromValues(values Values) (err error) {
// AppendValues serializes this structure into the provided string key-value store // AppendValues serializes this structure into the provided string key-value store
// useful to build query strings // useful to build query strings
func (u *View) AppendValues(values Values) { func (u *Feed) AppendValues(values Values) {
values.Set("topic", u.Topic.Hex()) values.Set("topic", u.Topic.Hex())
values.Set("user", u.User.Hex()) values.Set("user", u.User.Hex())
} }

View File

@ -19,18 +19,18 @@ import (
"testing" "testing"
) )
func getTestView() *View { func getTestFeed() *Feed {
topic, _ := NewTopic("world news report, every hour", nil) topic, _ := NewTopic("world news report, every hour", nil)
return &View{ return &Feed{
Topic: topic, Topic: topic,
User: newCharlieSigner().Address(), User: newCharlieSigner().Address(),
} }
} }
func TestViewSerializerDeserializer(t *testing.T) { func TestFeedSerializerDeserializer(t *testing.T) {
testBinarySerializerRecovery(t, getTestView(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781c") testBinarySerializerRecovery(t, getTestFeed(), "0x776f726c64206e657773207265706f72742c20657665727920686f7572000000876a8936a7cd0b79ef0735ad0896c1afe278781c")
} }
func TestMetadataSerializerLengthCheck(t *testing.T) { func TestFeedSerializerLengthCheck(t *testing.T) {
testBinarySerializerLengthCheck(t, getTestView()) testBinarySerializerLengthCheck(t, getTestFeed())
} }