From b1b55080eb3b5d0f5256ec406ce559b998f5731c Mon Sep 17 00:00:00 2001 From: Aditya Dani Date: Fri, 14 Oct 2016 19:05:03 +0100 Subject: [PATCH] Backward Compatibility Support for Gossip. - Modify the StoreMap from map[string]interface{} to map[string][]byte - A node only decodes those entries in the store map whose keys it knows - Keys are registered with a gossip node using the RegisterKey api. - In the new format gossip data is prefixed with a header pattern to identify the type of gossip data. - Current gossip handles both kinds of data - with an without header. --- api.go | 4 + proto/gossip_delegates.go | 104 +++++++++++++------ proto/gossip_store.go | 108 +++++++++++++++++++- proto/gossip_store_test.go | 201 ++++++++++++++++++++++++++++++++++++- proto/gossip_test.go | 108 ++++++++++++++++++++ types/types.go | 54 ++++++++-- 6 files changed, 537 insertions(+), 42 deletions(-) diff --git a/api.go b/api.go index 6565926..b029200 100644 --- a/api.go +++ b/api.go @@ -52,6 +52,10 @@ type GossipStore interface { // Remove a node from the database RemoveNode(types.NodeId) error + + // RegisterKey registers a key and an empty object which it can + // use to decode data received on wire + RegisterKey(types.StoreKey, interface{}) error } type Gossiper interface { diff --git a/proto/gossip_delegates.go b/proto/gossip_delegates.go index c32abd8..78aa30e 100644 --- a/proto/gossip_delegates.go +++ b/proto/gossip_delegates.go @@ -2,7 +2,6 @@ package proto import ( "bytes" - "encoding/gob" "encoding/json" "fmt" "sync" @@ -31,6 +30,8 @@ type GossipDelegate struct { quorumTimeout time.Duration timeoutVersion uint64 timeoutVersionLock sync.Mutex + // userNewHeaderFormat only used for testing backward compatibility + useNewHeaderFormat bool } func (gd *GossipDelegate) InitGossipDelegate( @@ -42,6 +43,12 @@ func (gd *GossipDelegate) InitGossipDelegate( ) { gd.GenNumber = genNumber gd.nodeId = string(selfNodeId) + // ** Used only for testing backward compatibility + // TODO: Remove! + if gossipVersion == types.GOSSIP_TEST_VERSION { + gossipVersion = types.DEFAULT_GOSSIP_VERSION + gd.useNewHeaderFormat = true + } gd.stateEvent = make(chan types.StateEvent) // We start with a NOT_IN_QUORUM status gd.InitStore( @@ -68,24 +75,20 @@ func (gd *GossipDelegate) updateGossipTs() { gd.lastGossipTs = time.Now() } -func (gd *GossipDelegate) convertToBytes(obj interface{}) ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err := enc.Encode(obj) - if err != nil { - return []byte{}, err - } - return buf.Bytes(), nil +func (gd *GossipDelegate) decodeGossipPacket(buf []byte, msg interface{}) error { + // Strip off the header + payload := buf[types.GOSSIP_HEADER_LENGTH:] + return gd.convertFromBytes(payload, msg) } -func (gd *GossipDelegate) convertFromBytes(buf []byte, msg interface{}) error { - msgBuffer := bytes.NewBuffer(buf) - dec := gob.NewDecoder(msgBuffer) - err := dec.Decode(msg) +func (gd *GossipDelegate) encodeGossipPacket(obj interface{}) ([]byte, error) { + payload, err := gd.convertToBytes(obj) if err != nil { - return err + return nil, err } - return nil + header := make([]byte, types.GOSSIP_HEADER_LENGTH) + header[types.GH_VERSION_POS] = byte(types.GH_VERSION_1) + return append(header, payload...), nil } func (gd *GossipDelegate) gossipChecks(node *memberlist.Node) error { @@ -121,7 +124,6 @@ func (gd *GossipDelegate) gossipChecks(node *memberlist.Node) error { return err } - // NodeMeta is used to retrieve meta-data about the current node // when broadcasting an alive message. It's length is limited to // the given byte size. This metadata is available in the Node structure. @@ -163,21 +165,42 @@ func (gd *GossipDelegate) GetBroadcasts(overhead, limit int) [][]byte { // data can be sent here. See MergeRemoteState as well. The `join` // boolean indicates this is for a join instead of a push/pull. func (gd *GossipDelegate) LocalState(join bool) []byte { + var ( + err error + byteLocalState []byte + ) + gd.updateSelfTs() // We don't know which node we are talking to. gs := NewGossipSessionInfo("", types.GD_ME_TO_PEER) gs.Op = types.LocalPush - // We send our local state of nodeMap - // The receiver will decide which nodes to merge and which to ignore - localState := gd.GetLocalState() - byteLocalState, err := gd.convertToBytes(&localState) - if err != nil { - gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error()) - logrus.Infof(gs.Err) - byteLocalState = []byte{} + // This flag added only for testing backward compatibility + // FUTURE USE: We will always use the new header pattern + // Sending and Receiving NodeInfoMap over the wire will be deprecated + if gd.useNewHeaderFormat { + // We send our local state of nodeMap + // The receiver will decide which nodes to merge and which to ignore + localState := gd.GetGossipData() + byteLocalState, err = gd.encodeGossipPacket(&localState) + if err != nil { + gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error()) + logrus.Infof(gs.Err) + byteLocalState = []byte{} + } + } else { + // We send our local state of nodeMap + // The receiver will decide which nodes to merge and which to ignore + localState := gd.GetLocalState() + byteLocalState, err = gd.convertToBytes(&localState) + if err != nil { + gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error()) + logrus.Infof(gs.Err) + byteLocalState = []byte{} + } } + gs.Err = "" gd.updateGossipTs() gd.history.AddLatest(gs) @@ -190,6 +213,7 @@ func (gd *GossipDelegate) LocalState(join bool) []byte { // boolean indicates this is for a join instead of a push/pull. func (gd *GossipDelegate) MergeRemoteState(buf []byte, join bool) { var remoteState types.NodeInfoMap + var remoteGossipData types.GossipDataMap if join == true { // NotifyJoin will take care of this info return @@ -197,13 +221,23 @@ func (gd *GossipDelegate) MergeRemoteState(buf []byte, join bool) { gd.updateSelfTs() gs := NewGossipSessionInfo("", types.GD_PEER_TO_ME) - err := gd.convertFromBytes(buf, &remoteState) - if err != nil { - gs.Err = fmt.Sprintf("gossip: Error in unmarshalling peer's local data. Error : %v", err.Error()) - logrus.Infof(gs.Err) + version := gd.handleGossipHeader(buf) + // This flag added only for testing backward compatibility + // FUTURE USE: We will always use the new header pattern + // Sending and Receiving NodeInfoMap over the wire will be deprecated + if version == types.GH_VERSION_BASE { + err := gd.convertFromBytes(buf, &remoteState) + if err != nil { + gs.Err = fmt.Sprintf("gossip: Error in unmarshalling peer's local data. Error : %v", err.Error()) + } + gd.Update(remoteState) + } else { + err := gd.decodeGossipPacket(buf, &remoteGossipData) + if err != nil { + gs.Err = fmt.Sprintf("gossip: Error in unmarshalling peer's local data. Error : %v", err.Error()) + } + gd.UpdateGossipData(remoteGossipData) } - - gd.Update(remoteState) gs.Op = types.MergeRemote gs.Err = "" gd.updateGossipTs() @@ -361,3 +395,13 @@ func (gd *GossipDelegate) handleStateEvents() { gd.UpdateSelfStatus(gd.currentState.NodeStatus()) } } + +func (gd *GossipDelegate) handleGossipHeader(packet []byte) types.GossipHeaderVersion { + headerV1 := make([]byte, types.GOSSIP_HEADER_LENGTH) + headerV1[types.GH_VERSION_POS] = byte(types.GH_VERSION_1) + if bytes.HasPrefix(packet, headerV1) { + return types.GH_VERSION_1 + } else { + return types.GH_VERSION_BASE + } +} diff --git a/proto/gossip_store.go b/proto/gossip_store.go index 4d550a4..fd86fd9 100644 --- a/proto/gossip_store.go +++ b/proto/gossip_store.go @@ -1,11 +1,12 @@ package proto import ( + "bytes" + "encoding/gob" "fmt" + "github.com/libopenstorage/gossip/types" "sync" "time" - - "github.com/libopenstorage/gossip/types" ) const ( @@ -28,6 +29,9 @@ type GossipStoreImpl struct { clusterSize int // Ts at which we lost quorum lostQuorumTs time.Time + // Map of registered keys and their repective types + // We only decode those keys which are registered with us. + typeMap types.TypeMap } func NewGossipStore(id types.NodeId, version, clusterId string) *GossipStoreImpl { @@ -59,6 +63,7 @@ func (s *GossipStoreImpl) InitStore( clusterId string, ) { s.nodeMap = make(types.NodeInfoMap) + s.typeMap = make(types.TypeMap) s.id = id s.selfCorrect = true s.GossipVersion = version @@ -86,6 +91,7 @@ func (s *GossipStoreImpl) UpdateSelf(key types.StoreKey, val interface{}) { s.Lock() defer s.Unlock() + s.RegisterKey(key, val) nodeInfo, _ := s.nodeMap[s.id] nodeInfo.Value[key] = val nodeInfo.LastUpdateTs = time.Now() @@ -223,7 +229,7 @@ func (s *GossipStoreImpl) MetaInfo() types.NodeMetaInfo { LastUpdateTs: selfNodeInfo.LastUpdateTs, GenNumber: selfNodeInfo.GenNumber, GossipVersion: s.GossipVersion, - ClusterId: s.ClusterId, + ClusterId: s.ClusterId, } return nodeMetaInfo } @@ -238,6 +244,28 @@ func (s *GossipStoreImpl) GetLocalState() types.NodeInfoMap { return localCopy } +func (s *GossipStoreImpl) GetGossipData() types.GossipDataMap { + s.Lock() + defer s.Unlock() + localCopy := make(types.GossipDataMap) + for id, nodeInfo := range s.nodeMap { + gossipData := types.GossipData{ + Id: nodeInfo.Id, + GenNumber: nodeInfo.GenNumber, + LastUpdateTs: nodeInfo.LastUpdateTs, + WaitForGenUpdateTs: nodeInfo.WaitForGenUpdateTs, + Status: nodeInfo.Status, + } + gossipData.Value = make(map[string][]byte) + for key, val := range nodeInfo.Value { + bt, _ := s.convertToBytes(&val) + gossipData.Value[string(key)] = bt + } + localCopy[id] = gossipData + } + return localCopy +} + func (s *GossipStoreImpl) GetLocalNodeInfo(id types.NodeId) (types.NodeInfo, error) { s.Lock() defer s.Unlock() @@ -274,6 +302,60 @@ func (s *GossipStoreImpl) Update(diff types.NodeInfoMap) { } } +func (s *GossipStoreImpl) UpdateGossipData(diff types.GossipDataMap) { + s.Lock() + defer s.Unlock() + + for id, nodeGossipData := range diff { + if id == s.id { + continue + } + selfValue, ok := s.nodeMap[id] + if !ok { + // We got an update for a node which we do not have in our map + // Lets add it with an offline state + selfValue.Status = types.NODE_STATUS_DOWN + } + if !ok || !statusValid(selfValue.Status) || + selfValue.LastUpdateTs.Before(nodeGossipData.LastUpdateTs) { + // Our view of Status of a Node, should only be determined by memberlist. + // We should not update the Status field in our nodeInfo based on what other node's + // value is. + nodeGossipData.Status = selfValue.Status + nodeInfo := types.NodeInfo{ + Id: nodeGossipData.Id, + GenNumber: nodeGossipData.GenNumber, + LastUpdateTs: nodeGossipData.LastUpdateTs, + WaitForGenUpdateTs: nodeGossipData.WaitForGenUpdateTs, + Status: nodeGossipData.Status, + } + storeMap := make(types.StoreMap) + for key, value := range nodeGossipData.Value { + typeVal, ok := s.typeMap[types.StoreKey(key)] + if !ok { + // Unknown store key ignoring.. + continue + } + intf := typeVal + s.convertFromBytes(value, &intf) + storeMap[types.StoreKey(key)] = intf + } + nodeInfo.Value = storeMap + s.nodeMap[id] = nodeInfo + } + } +} + +func (s *GossipStoreImpl) RegisterKey(key types.StoreKey, value interface{}) error { + if _, ok := s.typeMap[key]; ok { + // Key already exists + return fmt.Errorf("Key already exists") + } + gob.Register(value) + s.typeMap[key] = value + return nil +} + func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) { removeNodeIds := []types.NodeId{} addNodeIds := []types.NodeId{} @@ -320,3 +402,23 @@ func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) { func (s *GossipStoreImpl) getClusterSize() int { return s.clusterSize } + +func (s *GossipStoreImpl) convertToBytes(obj interface{}) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(obj) + if err != nil { + return []byte{}, err + } + return buf.Bytes(), nil +} + +func (s *GossipStoreImpl) convertFromBytes(buf []byte, msg interface{}) error { + msgBuffer := bytes.NewBuffer(buf) + dec := gob.NewDecoder(msgBuffer) + err := dec.Decode(msg) + if err != nil { + return err + } + return nil +} diff --git a/proto/gossip_store_test.go b/proto/gossip_store_test.go index 56719fa..2bb65ee 100644 --- a/proto/gossip_store_test.go +++ b/proto/gossip_store_test.go @@ -1,6 +1,8 @@ package proto import ( + "bytes" + "encoding/gob" "fmt" "github.com/libopenstorage/gossip/types" "math/rand" @@ -188,6 +190,29 @@ func compareNodeInfo(n1 types.NodeInfo, n2 types.NodeInfo) bool { return eq } +func compareGossipData(n1 types.NodeInfo, n2 types.NodeInfo) bool { + eq := n1.Id == n2.Id && n2.LastUpdateTs == n1.LastUpdateTs // && + //n1.Status == n2.Status + eq = eq && (n1.Value == nil && n2.Value == nil || + n1.Value != nil && n2.Value != nil) + if eq && n1.Value != nil { + for key, value := range n1.Value { + value2, ok := n2.Value[key] + if value2 == nil { + continue + } + fmt.Printf("%v %v \n", value, value2.(string)) + if !ok { + eq = false + } + if value != value2.(**string) { + eq = false + } + } + } + return eq +} + func dumpNodeInfo(nodeInfoMap types.NodeInfoMap, s string, t *testing.T) { t.Log("\nDUMPING : ", s, " : LEN: ", len(nodeInfoMap)) for _, nodeInfo := range nodeInfoMap { @@ -218,7 +243,7 @@ func verifyNodeInfoMapEquality(store types.NodeInfoMap, diff types.NodeInfoMap, } } -func TestGossipStoreUpdateData(t *testing.T) { +func TestGossipStoreUpdate(t *testing.T) { printTestInfo() g := NewGossipStore(ID, types.DEFAULT_GOSSIP_VERSION, DEFAULT_CLUSTER_ID) @@ -303,6 +328,180 @@ func TestGossipStoreUpdateData(t *testing.T) { } } +func convertGossipDataToNodeInfo(gossipData types.GossipData) types.NodeInfo { + value := make(map[types.StoreKey]interface{}) + for k, v := range gossipData.Value { + // Assumes that in test we only use strings as values + //var ko KeyObj + var ko string + intf := interface{}(ko) + buf := bytes.NewBuffer(v) + dec := gob.NewDecoder(buf) + dec.Decode(&intf) + value[types.StoreKey(k)] = intf + } + return types.NodeInfo{ + Id: gossipData.Id, + Status: gossipData.Status, + LastUpdateTs: gossipData.LastUpdateTs, + Value: value, + } +} + +func fillUpGossipDataMap(data types.GossipDataMap, keyList []string, nodeLen int) { + for i := 0; i < nodeLen; i++ { + gd := types.GossipData{ + Id: types.NodeId(strconv.Itoa(i)), + Status: types.NODE_STATUS_UP, + LastUpdateTs: time.Now(), + } + storeMap := make(map[string][]byte) + gd.Value = storeMap + for i, key := range keyList { + ko := key + strconv.Itoa(i) + intf := interface{}(ko) + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + enc.Encode(&intf) + gd.Value[key] = buf.Bytes() + } + data[gd.Id] = gd + } +} + +func TestGossipStoreUpdateGossipData(t *testing.T) { + printTestInfo() + + g := NewGossipStore(ID, types.DEFAULT_GOSSIP_VERSION, DEFAULT_CLUSTER_ID) + time.Sleep(1 * time.Second) + // empty store and empty diff + diff := types.GossipDataMap{} + g.UpdateGossipData(diff) + if len(g.nodeMap) != 1 { + t.Error("Updating empty store with empty diff gave non-empty store: ", + g.nodeMap) + } + + // empty store and non-emtpy diff + gossipDataDiff := make(types.GossipDataMap) + nodeLen := 5 + keyList := []string{"key1", "key2", "key3", "key4", "key5"} + for i := 0; i < nodeLen; i++ { + k := keyList[i] + g.RegisterKey(types.StoreKey(keyList[i]), k) + } + fillUpGossipDataMap(gossipDataDiff, keyList, nodeLen) + + g.UpdateGossipData(gossipDataDiff) + // Update does not modify the statuses in the node info map. + // Its always memberlist who deals with statuses + // Fake that behavior + for id, _ := range diff { + nodeInfo, _ := g.nodeMap[id] + nodeInfo.Status = types.NODE_STATUS_UP + g.nodeMap[id] = nodeInfo + } + + diffNodeInfoMap := make(types.NodeInfoMap) + for id, gossipData := range gossipDataDiff { + diffNodeInfo := convertGossipDataToNodeInfo(gossipData) + diffNodeInfoMap[id] = diffNodeInfo + } + verifyNodeInfoMapEquality(types.NodeInfoMap(g.nodeMap), diffNodeInfoMap, t) + + for nodeId, nodeInfo := range g.nodeMap { + // id % 4 == 0 : node id is not existing + // id % 4 == 1 : store has old timestamp + // id % 4 == 2 : node id is invalid + // id % 4 == 3 : store has newer data + id, _ := strconv.Atoi(string(nodeId)) + switch { + case id%4 == 0: + delete(g.nodeMap, nodeId) + case id%4 == 1: + olderTime := nodeInfo.LastUpdateTs.UnixNano() - 1000 + nodeInfo.LastUpdateTs = time.Unix(0, olderTime) + case id%4 == 2: + if id > 10 { + nodeInfo.Status = types.NODE_STATUS_INVALID + } else { + nodeInfo.Status = types.NODE_STATUS_NEVER_GOSSIPED + } + case id%4 == 3: + n, _ := gossipDataDiff[nodeId] + olderTime := nodeInfo.LastUpdateTs.UnixNano() - 1000 + n.LastUpdateTs = time.Unix(0, olderTime) + gossipDataDiff[nodeId] = n + } + } + + g.UpdateGossipData(gossipDataDiff) + diffNodeInfoMap = make(types.NodeInfoMap) + for id, gossipData := range gossipDataDiff { + diffNodeInfo := convertGossipDataToNodeInfo(gossipData) + diffNodeInfoMap[id] = diffNodeInfo + } + + for nodeId, nodeInfo := range g.nodeMap { + // id % 4 == 0 : node id is not existing + // id % 4 == 1 : store has old timestamp + // id % 4 == 2 : node id is invalid + // id % 4 == 3 : store has newer data + id, _ := strconv.Atoi(string(nodeId)) + switch { + case id%4 != 3: + n, _ := diffNodeInfoMap[nodeId] + if !compareNodeInfo(n, nodeInfo) { + t.Error("Update failed, d: ", n, " o:", nodeInfo) + } + case id%4 == 3: + n, _ := diffNodeInfoMap[nodeId] + if compareNodeInfo(n, nodeInfo) { + t.Error("Wrongly Updated latest data d: ", n, " o: ", nodeInfo) + } + olderTime := n.LastUpdateTs.UnixNano() + 1000 + ts := time.Unix(0, olderTime) + if ts != nodeInfo.LastUpdateTs { + t.Error("Wrongly Updated latest data d: ", n, " o: ", nodeInfo) + } + } + } +} + +func TestGossipStoreIgnoreUnknownKeys(t *testing.T) { + printTestInfo() + + g := NewGossipStore(ID, types.DEFAULT_GOSSIP_VERSION, DEFAULT_CLUSTER_ID) + time.Sleep(1 * time.Second) + + // empty store and non-emtpy diff + gossipDataDiff := make(types.GossipDataMap) + nodeLen := 5 + keyList := []string{"key1", "key2", "key3", "key4", "key5"} + for i := 0; i < nodeLen-1; i++ { + // Register keys except key5 + str := "" + g.RegisterKey(types.StoreKey(keyList[i]), str) + } + fillUpGossipDataMap(gossipDataDiff, keyList, nodeLen) + + g.UpdateGossipData(gossipDataDiff) + for id, nodeInfo := range g.nodeMap { + gossipData, ok := gossipDataDiff[id] + if !ok { + t.Error("Expected nodeId to be present in nodeMap: ", id) + continue + } + if len(nodeInfo.Value) == len(gossipData.Value) { + t.Error("Expected different store maps in nodeMap") + } + _, ok = nodeInfo.Value["key5"] + if ok { + t.Error("Expected key5 to be ignored from nodeMap") + } + } + +} func TestGossipStoreGetStoreKeys(t *testing.T) { printTestInfo() diff --git a/proto/gossip_test.go b/proto/gossip_test.go index d25e901..12300c5 100644 --- a/proto/gossip_test.go +++ b/proto/gossip_test.go @@ -1021,3 +1021,111 @@ func TestGossiperNodesWithDifferentClusterId(t *testing.T) { } } + +func TestGossiperNodesWithDifferentStoreMaps(t *testing.T) { + printTestInfo() + + nodes := []string{ + "127.0.0.1:9175", + "127.0.0.2:9176", + "127.0.0.3:9177", + "127.0.0.4:9178", + "127.0.0.5:9179", + } + + rand.Seed(time.Now().UnixNano()) + gossipers := make(map[int]*GossiperImpl) + + // Start gossipers for all nodes + // Node 0, 1, 3 := Use old store map format + // Node 2, 4 := Use new store map format + for i, nodeId := range nodes { + id := types.NodeId(strconv.Itoa(i)) + var g *GossiperImpl + if i == 2 || i == 4 { + // Set a different clusterId + g, _ = NewGossiperImpl(nodeId, id, nodes, types.GOSSIP_TEST_VERSION) + } else { + g, _ = NewGossiperImpl(nodeId, id, nodes, types.DEFAULT_GOSSIP_VERSION) + } + gossipers[i] = g + } + + // key1 - Identified by both types of nodes + // key2 - Identified only by new nodes + type Key1 struct { + Id string + Value int + } + type Key2 struct { + Id int + Value string + } + for i, g := range gossipers { + if i == 2 || i == 4 { + k2 := Key2{ + Id: i, + Value: strconv.Itoa(i), + } + g.UpdateSelf(types.StoreKey("key2"), k2) + } + k1 := Key1{ + Id: strconv.Itoa(i), + Value: i, + } + g.UpdateSelf(types.StoreKey("key1"), k1) + } + + // Let the nodes gossip and populate their memberlists + time.Sleep(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodes))) + + for i, g := range gossipers { + + res := g.GetStoreKeyValue(types.StoreKey("key1")) + if len(res) != len(nodes) { + t.Error("For node ", i, " did not receive gossip update from all nodes.") + continue + } + for id, nodeValue := range res { + if id == g.NodeId() { + continue + } + val := nodeValue.Value.(Key1) + + if types.NodeId(val.Id) != id { + t.Error("For node ", i, " incorrect key1 value for node ", id) + } + idInt, _ := strconv.Atoi(string(id)) + if idInt != val.Value { + t.Error("For node ", i, " incorrect key1 value for node ", id) + } + } + + res2 := g.GetStoreKeyValue(types.StoreKey("key2")) + if i == 2 || i == 4 { + for id, nodeValue := range res2 { + if id == g.NodeId() { + continue + } + val := nodeValue.Value.(Key2) + if types.NodeId(val.Value) != id { + t.Error("For node ", i, " incorrect key2 value for node ", id) + } + idInt, _ := strconv.Atoi(string(id)) + if idInt != val.Id { + t.Error("For node ", i, " incorrect key1 value for node ", id) + } + + } + } else { + if len(res2) != 0 { + t.Error("For node ", i, " did not expect key2 in its store map") + } + } + + } + //time.Sleep(30 * time.Second) + //for _, g := range gossipers { + // g.Stop(types.DEFAULT_GOSSIP_INTERVAL * time.Duration(len(nodes)+1)) + //} +} diff --git a/types/types.go b/types/types.go index 528c379..6aced77 100644 --- a/types/types.go +++ b/types/types.go @@ -5,6 +5,8 @@ import ( "time" ) +// Type Definitions + type NodeId string type StoreKey string type NodeStatus uint8 @@ -12,6 +14,23 @@ type StateEvent uint8 type NodeInfoMap map[NodeId]NodeInfo type NodeValueMap map[NodeId]NodeValue type StoreMap map[StoreKey]interface{} +type GossipHeader [GOSSIP_HEADER_LENGTH]byte +type GossipDataMap map[NodeId]GossipData +type TypeMap map[StoreKey]interface{} + +// Constant Definitions + +const ( + DEFAULT_GOSSIP_INTERVAL time.Duration = 2 * time.Second + DEFAULT_PUSH_PULL_INTERVAL time.Duration = 2 * time.Second + DEFAULT_PROBE_INTERVAL time.Duration = 5 * time.Second + DEFAULT_PROBE_TIMEOUT time.Duration = 200 * time.Millisecond + DEFAULT_QUORUM_TIMEOUT time.Duration = 1 * time.Minute + DEFAULT_GOSSIP_VERSION string = "v1" + // Version used for backward compatibility testing + GOSSIP_TEST_VERSION string = "test" + GOSSIP_HEADER_LENGTH uint8 = 8 +) const ( NODE_STATUS_INVALID NodeStatus = iota @@ -31,6 +50,20 @@ const ( TIMEOUT ) +type GossipHeaderPosition uint8 + +const ( + GH_VERSION_POS GossipHeaderPosition = 0 + GH_MAX_POS GossipHeaderPosition = 7 +) + +type GossipHeaderVersion uint8 + +const ( + GH_VERSION_BASE GossipHeaderVersion = 1 << iota + GH_VERSION_1 +) + type GossipDirection uint8 const ( @@ -49,6 +82,8 @@ const ( NotifyLeave GossipOp = "Notify Leave" ) +// Struct definitions + type GossipSessionInfo struct { Node string Ts time.Time @@ -74,6 +109,16 @@ type NodeInfo struct { Value StoreMap } +// New structure format to pass over wire +type GossipData struct { + Id NodeId + GenNumber uint64 + LastUpdateTs time.Time + WaitForGenUpdateTs time.Time + Status NodeStatus + Value map[string][]byte +} + type NodeValue struct { Id NodeId GenNumber uint64 @@ -87,14 +132,7 @@ func (n NodeInfo) String() string { n.Id, n.LastUpdateTs, n.Status, n.Value) } -const ( - DEFAULT_GOSSIP_INTERVAL time.Duration = 2 * time.Second - DEFAULT_PUSH_PULL_INTERVAL time.Duration = 2 * time.Second - DEFAULT_PROBE_INTERVAL time.Duration = 5 * time.Second - DEFAULT_PROBE_TIMEOUT time.Duration = 200 * time.Millisecond - DEFAULT_QUORUM_TIMEOUT time.Duration = 1 * time.Minute - DEFAULT_GOSSIP_VERSION string = "v1" -) +// Interface definitions type GossipIntervals struct { // GossipInterval is the time interval within which the nodes gossip