fix concurrent read and write on map

Signed-off-by: Connor1996 <zbk602423539@gmail.com>
This commit is contained in:
Connor1996 2020-07-27 15:27:24 +08:00
parent a7d2aa66d5
commit abaa565b18
6 changed files with 18 additions and 3 deletions

View File

@ -83,6 +83,7 @@ func NewDefaultConfig() *Config {
func NewTestConfig() *Config { func NewTestConfig() *Config {
return &Config{ return &Config{
LogLevel: "info",
Raft: true, Raft: true,
RaftBaseTickInterval: 50 * time.Millisecond, RaftBaseTickInterval: 50 * time.Millisecond,
RaftHeartbeatTicks: 2, RaftHeartbeatTicks: 2,

View File

@ -337,6 +337,8 @@ func (d *peerMsgHandler) checkSnapshot(msg *rspb.RaftMessage) (*snap.SnapKey, er
return &key, nil return &key, nil
} }
meta := d.ctx.storeMeta meta := d.ctx.storeMeta
meta.Lock()
defer meta.Unlock()
if !util.RegionEqual(meta.regions[d.regionId], d.Region()) { if !util.RegionEqual(meta.regions[d.regionId], d.Region()) {
if !d.isInitialized() { if !d.isInitialized() {
log.Infof("%s stale delegate detected, skip", d.Tag) log.Infof("%s stale delegate detected, skip", d.Tag)
@ -368,6 +370,8 @@ func (d *peerMsgHandler) destroyPeer() {
regionID := d.regionId regionID := d.regionId
// We can't destroy a peer which is applying snapshot. // We can't destroy a peer which is applying snapshot.
meta := d.ctx.storeMeta meta := d.ctx.storeMeta
meta.Lock()
defer meta.Unlock()
isInitialized := d.isInitialized() isInitialized := d.isInitialized()
if err := d.Destroy(d.ctx.engine, false); err != nil { if err := d.Destroy(d.ctx.engine, false); err != nil {
// If not panic here, the peer will be recreated in the next restart, // If not panic here, the peer will be recreated in the next restart,
@ -389,6 +393,8 @@ func (d *peerMsgHandler) destroyPeer() {
func (d *peerMsgHandler) findSiblingRegion() (result *metapb.Region) { func (d *peerMsgHandler) findSiblingRegion() (result *metapb.Region) {
meta := d.ctx.storeMeta meta := d.ctx.storeMeta
meta.RLock()
defer meta.RUnlock()
item := &regionItem{region: d.Region()} item := &regionItem{region: d.Region()}
meta.regionRanges.AscendGreaterOrEqual(item, func(i btree.Item) bool { meta.regionRanges.AscendGreaterOrEqual(item, func(i btree.Item) bool {
result = i.(*regionItem).region result = i.(*regionItem).region

View File

@ -36,6 +36,7 @@ func (r *regionItem) Less(other btree.Item) bool {
} }
type storeMeta struct { type storeMeta struct {
sync.RWMutex
/// region end key -> region ID /// region end key -> region ID
regionRanges *btree.BTree regionRanges *btree.BTree
/// region_id -> region /// region_id -> region

View File

@ -118,6 +118,8 @@ func (d *storeWorker) checkMsg(msg *rspb.RaftMessage) (bool, error) {
// Maybe split, but not registered yet. // Maybe split, but not registered yet.
if util.IsFirstVoteMessage(msg.Message) { if util.IsFirstVoteMessage(msg.Message) {
meta := d.ctx.storeMeta meta := d.ctx.storeMeta
meta.RLock()
defer meta.RUnlock()
// Last check on whether target peer is created, otherwise, the // Last check on whether target peer is created, otherwise, the
// vote message will never be comsumed. // vote message will never be comsumed.
if _, ok := meta.regions[regionID]; ok { if _, ok := meta.regions[regionID]; ok {
@ -194,6 +196,8 @@ func (d *storeWorker) maybeCreatePeer(regionID uint64, msg *rspb.RaftMessage) (b
// we may encounter a message with larger peer id, which means // we may encounter a message with larger peer id, which means
// current peer is stale, then we should remove current peer // current peer is stale, then we should remove current peer
meta := d.ctx.storeMeta meta := d.ctx.storeMeta
meta.Lock()
defer meta.Unlock()
if _, ok := meta.regions[regionID]; ok { if _, ok := meta.regions[regionID]; ok {
return true, nil return true, nil
} }
@ -229,7 +233,10 @@ func (d *storeWorker) maybeCreatePeer(regionID uint64, msg *rspb.RaftMessage) (b
func (d *storeWorker) storeHeartbeatScheduler() { func (d *storeWorker) storeHeartbeatScheduler() {
stats := new(schedulerpb.StoreStats) stats := new(schedulerpb.StoreStats)
stats.StoreId = d.ctx.store.Id stats.StoreId = d.ctx.store.Id
stats.RegionCount = uint32(len(d.ctx.storeMeta.regions)) meta := d.ctx.storeMeta
meta.RLock()
stats.RegionCount = uint32(len(meta.regions))
meta.RUnlock()
d.ctx.schedulerTaskSender <- &runner.SchedulerStoreHeartbeatTask{ d.ctx.schedulerTaskSender <- &runner.SchedulerStoreHeartbeatTask{
Stats: stats, Stats: stats,
Engine: d.ctx.engine.Kv, Engine: d.ctx.engine.Kv,

View File

@ -3,7 +3,6 @@ package test_raftstore
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"sync" "sync"
"github.com/google/btree" "github.com/google/btree"
@ -350,7 +349,6 @@ func (m *MockSchedulerClient) handleHeartbeatConfVersion(region *metapb.Region)
if searchRegionPeerLen-regionPeerLen != 1 { if searchRegionPeerLen-regionPeerLen != 1 {
panic("should only one conf change") panic("should only one conf change")
} }
fmt.Println(searchRegion, region)
if len(GetDiffPeers(searchRegion, region)) != 1 { if len(GetDiffPeers(searchRegion, region)) != 1 {
panic("should only one different peer") panic("should only one different peer")
} }

View File

@ -300,6 +300,8 @@ func GenericTest(t *testing.T, part string, nclients int, unreliable bool, crash
} }
if maxraftlog > 0 { if maxraftlog > 0 {
time.Sleep(1 * time.Second)
// Check maximum after the servers have processed all client // Check maximum after the servers have processed all client
// requests and had time to checkpoint. // requests and had time to checkpoint.
key := []byte("") key := []byte("")