diff --git a/kv/config/config.go b/kv/config/config.go index 572c3869..5e2b81a4 100644 --- a/kv/config/config.go +++ b/kv/config/config.go @@ -65,6 +65,7 @@ func NewDefaultConfig() *Config { SchedulerAddr: "127.0.0.1:2379", StoreAddr: "127.0.0.1:20160", LogLevel: "info", + Raft: true, RaftBaseTickInterval: 1 * time.Second, RaftHeartbeatTicks: 2, RaftElectionTimeoutTicks: 10, @@ -83,6 +84,7 @@ func NewDefaultConfig() *Config { func NewTestConfig() *Config { return &Config{ LogLevel: "info", + Raft: true, RaftBaseTickInterval: 10 * time.Millisecond, RaftHeartbeatTicks: 2, RaftElectionTimeoutTicks: 10, diff --git a/kv/main.go b/kv/main.go index 36aca497..2a07c68f 100644 --- a/kv/main.go +++ b/kv/main.go @@ -24,6 +24,8 @@ import ( var ( schedulerAddr = flag.String("scheduler", "", "scheduler address") storeAddr = flag.String("addr", "", "store address") + dbPath = flag.String("path", "", "directory path of db") + logLevel = flag.String("loglevel", "", "the level of log") ) func main() { @@ -35,9 +37,16 @@ func main() { if *storeAddr != "" { conf.StoreAddr = *storeAddr } + if *dbPath != "" { + conf.DBPath = *dbPath + } + if *logLevel != "" { + conf.LogLevel = *logLevel + } + log.SetLevelByString(conf.LogLevel) log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds | log.Lshortfile) - log.Infof("conf %v", conf) + log.Infof("Server started with conf %+v", conf) var storage storage.Storage if conf.Raft { diff --git a/kv/raftstore/peer.go b/kv/raftstore/peer.go index 4767a110..9534caf6 100644 --- a/kv/raftstore/peer.go +++ b/kv/raftstore/peer.go @@ -365,7 +365,7 @@ func (p *peer) sendRaftMessage(msg eraftpb.Message, trans Transport) error { if toPeer == nil { return fmt.Errorf("failed to lookup recipient peer %v in region %v", msg.To, p.regionId) } - log.Debugf("%v, send raft msg %v from %v to %v", p.Tag, msg.MsgType, fromPeer.Id, toPeer.Id) + log.Debugf("%v, send raft msg %v from %v to %v", p.Tag, msg.MsgType, fromPeer, toPeer) sendMsg.FromPeer = &fromPeer sendMsg.ToPeer = toPeer diff --git a/kv/storage/raft_storage/raft_client.go b/kv/storage/raft_storage/raft_client.go index 424e49dc..748bbd0a 100644 --- a/kv/storage/raft_storage/raft_client.go +++ b/kv/storage/raft_storage/raft_client.go @@ -54,30 +54,27 @@ func (c *raftConn) Send(msg *raft_serverpb.RaftMessage) error { return c.stream.Send(msg) } -type connKey struct { - addr string - index int -} - type RaftClient struct { config *config.Config sync.RWMutex - conn *raftConn + conns map[string]*raftConn addrs map[uint64]string } func newRaftClient(config *config.Config) *RaftClient { return &RaftClient{ config: config, + conns: make(map[string]*raftConn), addrs: make(map[uint64]string), } } func (c *RaftClient) getConn(addr string, regionID uint64) (*raftConn, error) { c.RLock() - if c.conn != nil { + conn, ok := c.conns[addr] + if ok { c.RUnlock() - return c.conn, nil + return conn, nil } c.RUnlock() newConn, err := newRaftConn(addr, c.config) @@ -86,11 +83,11 @@ func (c *RaftClient) getConn(addr string, regionID uint64) (*raftConn, error) { } c.Lock() defer c.Unlock() - if c.conn != nil { + if conn, ok := c.conns[addr]; ok { newConn.Stop() - return c.conn, nil + return conn, nil } - c.conn = newConn + c.conns[addr] = newConn return newConn, nil } @@ -108,7 +105,7 @@ func (c *RaftClient) Send(storeID uint64, addr string, msg *raft_serverpb.RaftMe c.Lock() defer c.Unlock() conn.Stop() - c.conn = nil + delete(c.conns, addr) if oldAddr, ok := c.addrs[storeID]; ok && oldAddr == addr { delete(c.addrs, storeID) } diff --git a/kv/storage/raft_storage/raft_server.go b/kv/storage/raft_storage/raft_server.go index c6ab547e..52daa395 100644 --- a/kv/storage/raft_storage/raft_server.go +++ b/kv/storage/raft_storage/raft_server.go @@ -189,7 +189,7 @@ func (rs *RaftStorage) Start() error { resolveRunner := newResolverRunner(schedulerClient) rs.resolveWorker.Start(resolveRunner) - rs.snapManager = snap.NewSnapManager(cfg.DBPath + "snap") + rs.snapManager = snap.NewSnapManager(filepath.Join(cfg.DBPath, "snap")) rs.snapWorker = worker.NewWorker("snap-worker", &rs.wg) snapSender := rs.snapWorker.Sender() snapRunner := newSnapRunner(rs.snapManager, rs.config, rs.raftRouter) diff --git a/kv/test_raftstore/cluster.go b/kv/test_raftstore/cluster.go index 10bb7d36..f02dfa7f 100644 --- a/kv/test_raftstore/cluster.go +++ b/kv/test_raftstore/cluster.go @@ -219,7 +219,7 @@ func (c *Cluster) CallCommandOnLeader(request *raft_cmdpb.RaftCmdRequest, timeou request.Header.Peer = leader resp, txn := c.CallCommand(request, 1*time.Second) if resp == nil { - log.Warnf("can't call command %s on leader %d of region %d", request.String(), leader.GetId(), regionID) + log.Debugf("can't call command %s on leader %d of region %d", request.String(), leader.GetId(), regionID) newLeader := c.LeaderOfRegion(regionID) if leader == newLeader { region, _, err := c.schedulerClient.GetRegionByID(context.TODO(), regionID) diff --git a/scheduler/server/cluster_test.go b/scheduler/server/cluster_test.go index e65b23f8..607053e0 100644 --- a/scheduler/server/cluster_test.go +++ b/scheduler/server/cluster_test.go @@ -710,73 +710,6 @@ func (s *baseCluster) getRaftCluster(c *C) *RaftCluster { return cluster } -func (s *baseCluster) getClusterConfig(c *C, clusterID uint64) *metapb.Cluster { - req := &schedulerpb.GetClusterConfigRequest{ - Header: testutil.NewRequestHeader(clusterID), - } - - resp, err := s.grpcSchedulerClient.GetClusterConfig(context.Background(), req) - c.Assert(err, IsNil) - c.Assert(resp.GetCluster(), NotNil) - - return resp.GetCluster() -} - -func (s *testClusterSuite) TestGetPutConfig(c *C) { - var err error - var cleanup func() - s.svr, cleanup, err = NewTestServer(c) - defer cleanup() - c.Assert(err, IsNil) - mustWaitLeader(c, []*Server{s.svr}) - s.grpcSchedulerClient = testutil.MustNewGrpcClient(c, s.svr.GetAddr()) - clusterID := s.svr.clusterID - - storeAddr := "127.0.0.1:0" - bootstrapRequest := s.newBootstrapRequest(c, s.svr.clusterID, storeAddr) - _, err = s.svr.bootstrapCluster(bootstrapRequest) - c.Assert(err, IsNil) - - store := bootstrapRequest.Store - peer := s.newPeer(c, store.GetId(), 0) - region := s.newRegion(c, 0, []byte{}, []byte{}, []*metapb.Peer{peer}, nil) - err = s.svr.cluster.processRegionHeartbeat(core.NewRegionInfo(region, nil)) - c.Assert(err, IsNil) - // Get region. - region = s.getRegion(c, clusterID, []byte("abc")) - c.Assert(region.GetPeers(), HasLen, 1) - peer = region.GetPeers()[0] - - // Get region by id. - regionByID := s.getRegionByID(c, clusterID, region.GetId()) - c.Assert(region, DeepEquals, regionByID) - - // Get store. - storeID := peer.GetStoreId() - store = s.getStore(c, clusterID, storeID) - - // Update store. - store.Address = "127.0.0.1:1" - s.testPutStore(c, clusterID, store) - - // Remove store. - s.testRemoveStore(c, clusterID, store) - - // Update cluster config. - req := &schedulerpb.PutClusterConfigRequest{ - Header: testutil.NewRequestHeader(clusterID), - Cluster: &metapb.Cluster{ - Id: clusterID, - MaxPeerCount: 5, - }, - } - resp, err := s.grpcSchedulerClient.PutClusterConfig(context.Background(), req) - c.Assert(err, IsNil) - c.Assert(resp, NotNil) - meta := s.getClusterConfig(c, clusterID) - c.Assert(meta.GetMaxPeerCount(), Equals, uint32(5)) -} - func putStore(c *C, grpcSchedulerClient schedulerpb.SchedulerClient, clusterID uint64, store *metapb.Store) (*schedulerpb.PutStoreResponse, error) { req := &schedulerpb.PutStoreRequest{ Header: testutil.NewRequestHeader(clusterID),