diff --git a/kv/config/config.go b/kv/config/config.go
index ee58a768..0b65aed0 100644
--- a/kv/config/config.go
+++ b/kv/config/config.go
@@ -83,6 +83,7 @@ func NewDefaultConfig() *Config {
 
 func NewTestConfig() *Config {
 	return &Config{
+		LogLevel:             "info",
 		Raft:                 true,
 		RaftBaseTickInterval: 50 * time.Millisecond,
 		RaftHeartbeatTicks:   2,
diff --git a/kv/raftstore/peer_msg_handler.go b/kv/raftstore/peer_msg_handler.go
index b2dfd625..370ae959 100644
--- a/kv/raftstore/peer_msg_handler.go
+++ b/kv/raftstore/peer_msg_handler.go
@@ -337,6 +337,8 @@ func (d *peerMsgHandler) checkSnapshot(msg *rspb.RaftMessage) (*snap.SnapKey, er
 		return &key, nil
 	}
 	meta := d.ctx.storeMeta
+	meta.Lock()
+	defer meta.Unlock()
 	if !util.RegionEqual(meta.regions[d.regionId], d.Region()) {
 		if !d.isInitialized() {
 			log.Infof("%s stale delegate detected, skip", d.Tag)
@@ -368,6 +370,8 @@ func (d *peerMsgHandler) destroyPeer() {
 	regionID := d.regionId
 	// We can't destroy a peer which is applying snapshot.
 	meta := d.ctx.storeMeta
+	meta.Lock()
+	defer meta.Unlock()
 	isInitialized := d.isInitialized()
 	if err := d.Destroy(d.ctx.engine, false); err != nil {
 		// If not panic here, the peer will be recreated in the next restart,
@@ -389,6 +393,8 @@ func (d *peerMsgHandler) destroyPeer() {
 
 func (d *peerMsgHandler) findSiblingRegion() (result *metapb.Region) {
 	meta := d.ctx.storeMeta
+	meta.RLock()
+	defer meta.RUnlock()
 	item := &regionItem{region: d.Region()}
 	meta.regionRanges.AscendGreaterOrEqual(item, func(i btree.Item) bool {
 		result = i.(*regionItem).region
diff --git a/kv/raftstore/raftstore.go b/kv/raftstore/raftstore.go
index d71fcdf3..351cd7ca 100644
--- a/kv/raftstore/raftstore.go
+++ b/kv/raftstore/raftstore.go
@@ -36,6 +36,7 @@ func (r *regionItem) Less(other btree.Item) bool {
 }
 
 type storeMeta struct {
+	sync.RWMutex
 	/// region end key -> region ID
 	regionRanges *btree.BTree
 	/// region_id -> region
diff --git a/kv/raftstore/store_worker.go b/kv/raftstore/store_worker.go
index be4574e0..936aa34f 100644
--- a/kv/raftstore/store_worker.go
+++ b/kv/raftstore/store_worker.go
@@ -118,6 +118,8 @@ func (d *storeWorker) checkMsg(msg *rspb.RaftMessage) (bool, error) {
 		// Maybe split, but not registered yet.
 		if util.IsFirstVoteMessage(msg.Message) {
 			meta := d.ctx.storeMeta
+			meta.RLock()
+			defer meta.RUnlock()
 			// Last check on whether target peer is created, otherwise, the
 			// vote message will never be comsumed.
 			if _, ok := meta.regions[regionID]; ok {
@@ -194,6 +196,8 @@ func (d *storeWorker) maybeCreatePeer(regionID uint64, msg *rspb.RaftMessage) (b
 	// we may encounter a message with larger peer id, which means
 	// current peer is stale, then we should remove current peer
 	meta := d.ctx.storeMeta
+	meta.Lock()
+	defer meta.Unlock()
 	if _, ok := meta.regions[regionID]; ok {
 		return true, nil
 	}
@@ -229,7 +233,10 @@ func (d *storeWorker) maybeCreatePeer(regionID uint64, msg *rspb.RaftMessage) (b
 func (d *storeWorker) storeHeartbeatScheduler() {
 	stats := new(schedulerpb.StoreStats)
 	stats.StoreId = d.ctx.store.Id
-	stats.RegionCount = uint32(len(d.ctx.storeMeta.regions))
+	meta := d.ctx.storeMeta
+	meta.RLock()
+	stats.RegionCount = uint32(len(meta.regions))
+	meta.RUnlock()
 	d.ctx.schedulerTaskSender <- &runner.SchedulerStoreHeartbeatTask{
 		Stats:  stats,
 		Engine: d.ctx.engine.Kv,
diff --git a/kv/test_raftstore/scheduler.go b/kv/test_raftstore/scheduler.go
index 2983e181..0d4413c8 100644
--- a/kv/test_raftstore/scheduler.go
+++ b/kv/test_raftstore/scheduler.go
@@ -3,7 +3,6 @@ package test_raftstore
 import (
 	"bytes"
 	"context"
-	"fmt"
 	"sync"
 
 	"github.com/google/btree"
@@ -350,7 +349,6 @@ func (m *MockSchedulerClient) handleHeartbeatConfVersion(region *metapb.Region)
 	if searchRegionPeerLen-regionPeerLen != 1 {
 		panic("should only one conf change")
 	}
-	fmt.Println(searchRegion, region)
 	if len(GetDiffPeers(searchRegion, region)) != 1 {
 		panic("should only one different peer")
 	}
diff --git a/kv/test_raftstore/test_test.go b/kv/test_raftstore/test_test.go
index 3a5d5bfc..9b696cca 100644
--- a/kv/test_raftstore/test_test.go
+++ b/kv/test_raftstore/test_test.go
@@ -300,6 +300,8 @@ func GenericTest(t *testing.T, part string, nclients int, unreliable bool, crash
 	}
 
 	if maxraftlog > 0 {
+		time.Sleep(1 * time.Second)
+
 		// Check maximum after the servers have processed all client
 		// requests and had time to checkpoint.
 		key := []byte("")
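For context, below is a minimal, self-contained sketch (not the TinyKV code itself) of the locking pattern this patch applies: sync.RWMutex is embedded in storeMeta so callers lock the struct value directly, with write paths (checkSnapshot, destroyPeer, maybeCreatePeer) taking the exclusive Lock and read-only paths (checkMsg, findSiblingRegion, storeHeartbeatScheduler) taking RLock. The field and helper names in the sketch are illustrative, not from the repository.

package main

import (
	"fmt"
	"sync"
)

// storeMeta mirrors the pattern from the patch: the mutex is embedded, so
// callers lock the struct value itself before touching its shared fields.
// The map below is a simplified stand-in for regions/regionRanges.
type storeMeta struct {
	sync.RWMutex
	regions map[uint64]string // region_id -> region (simplified)
}

// regionCount takes the shared lock, matching meta.RLock()/RUnlock()
// on the read-only paths in the patch.
func (m *storeMeta) regionCount() int {
	m.RLock()
	defer m.RUnlock()
	return len(m.regions)
}

// setRegion takes the exclusive lock, matching meta.Lock()/Unlock()
// on the mutating paths in the patch.
func (m *storeMeta) setRegion(id uint64, r string) {
	m.Lock()
	defer m.Unlock()
	m.regions[id] = r
}

func main() {
	meta := &storeMeta{regions: make(map[uint64]string)}
	var wg sync.WaitGroup
	// Concurrent writers and readers are now safe under the race detector.
	for i := uint64(0); i < 8; i++ {
		wg.Add(1)
		go func(id uint64) {
			defer wg.Done()
			meta.setRegion(id, "region")
			_ = meta.regionCount()
		}(i)
	}
	wg.Wait()
	fmt.Println("regions:", meta.regionCount())
}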