talent-plan-tinykv/kv/storage/raft_storage/raft_server.go

223 lines
5.9 KiB
Go
Raw Normal View History

package raft_storage
import (
"context"
"os"
"path/filepath"
"strings"
"sync"
"github.com/pingcap-incubator/tinykv/kv/config"
"github.com/pingcap-incubator/tinykv/kv/raftstore"
"github.com/pingcap-incubator/tinykv/kv/raftstore/message"
"github.com/pingcap-incubator/tinykv/kv/raftstore/scheduler_client"
"github.com/pingcap-incubator/tinykv/kv/raftstore/snap"
"github.com/pingcap-incubator/tinykv/kv/storage"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
"github.com/pingcap-incubator/tinykv/kv/util/worker"
"github.com/pingcap-incubator/tinykv/proto/pkg/errorpb"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"github.com/pingcap-incubator/tinykv/proto/pkg/raft_cmdpb"
"github.com/pingcap-incubator/tinykv/proto/pkg/tinykvpb"
"github.com/pingcap/errors"
)
// RaftStorage is an implementation of `Storage` (see tikv/server.go) backed by a Raft node. It is part of a Raft network.
// By using Raft, reads and writes are consistent with other nodes in the TinyKV instance.
type RaftStorage struct {
engines *engine_util.Engines
config *config.Config
node *raftstore.Node
snapManager *snap.SnapManager
raftRouter *raftstore.RaftstoreRouter
raftSystem *raftstore.Raftstore
resolveWorker *worker.Worker
snapWorker *worker.Worker
wg sync.WaitGroup
}
type RegionError struct {
RequestErr *errorpb.Error
}
func (re *RegionError) Error() string {
return re.RequestErr.String()
}
func (rs *RaftStorage) checkResponse(resp *raft_cmdpb.RaftCmdResponse, reqCount int) error {
if resp.Header.Error != nil {
return &RegionError{RequestErr: resp.Header.Error}
}
if len(resp.Responses) != reqCount {
return errors.Errorf("responses count %d is not equal to requests count %d",
len(resp.Responses), reqCount)
}
return nil
}
// NewRaftStorage creates a new storage engine backed by a raftstore.
func NewRaftStorage(conf *config.Config) *RaftStorage {
dbPath := conf.DBPath
kvPath := filepath.Join(dbPath, "kv")
raftPath := filepath.Join(dbPath, "raft")
snapPath := filepath.Join(dbPath, "snap")
os.MkdirAll(kvPath, os.ModePerm)
os.MkdirAll(raftPath, os.ModePerm)
os.Mkdir(snapPath, os.ModePerm)
raftDB := engine_util.CreateDB("raft", conf)
kvDB := engine_util.CreateDB("kv", conf)
engines := engine_util.NewEngines(kvDB, raftDB, kvPath, raftPath)
return &RaftStorage{engines: engines, config: conf}
}
func (rs *RaftStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {
var reqs []*raft_cmdpb.Request
for _, m := range batch {
switch m.Data.(type) {
case storage.Put:
put := m.Data.(storage.Put)
reqs = append(reqs, &raft_cmdpb.Request{
CmdType: raft_cmdpb.CmdType_Put,
Put: &raft_cmdpb.PutRequest{
Cf: put.Cf,
Key: put.Key,
Value: put.Value,
}})
case storage.Delete:
delete := m.Data.(storage.Delete)
reqs = append(reqs, &raft_cmdpb.Request{
CmdType: raft_cmdpb.CmdType_Delete,
Delete: &raft_cmdpb.DeleteRequest{
Cf: delete.Cf,
Key: delete.Key,
}})
}
}
header := &raft_cmdpb.RaftRequestHeader{
RegionId: ctx.RegionId,
Peer: ctx.Peer,
RegionEpoch: ctx.RegionEpoch,
Term: ctx.Term,
}
request := &raft_cmdpb.RaftCmdRequest{
Header: header,
Requests: reqs,
}
cb := message.NewCallback()
if err := rs.raftRouter.SendRaftCommand(request, cb); err != nil {
return err
}
return rs.checkResponse(cb.WaitResp(), len(reqs))
}
func (rs *RaftStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
header := &raft_cmdpb.RaftRequestHeader{
RegionId: ctx.RegionId,
Peer: ctx.Peer,
RegionEpoch: ctx.RegionEpoch,
Term: ctx.Term,
}
request := &raft_cmdpb.RaftCmdRequest{
Header: header,
Requests: []*raft_cmdpb.Request{{
CmdType: raft_cmdpb.CmdType_Snap,
Snap: &raft_cmdpb.SnapRequest{},
}},
}
cb := message.NewCallback()
if err := rs.raftRouter.SendRaftCommand(request, cb); err != nil {
return nil, err
}
resp := cb.WaitResp()
if err := rs.checkResponse(resp, 1); err != nil {
if cb.Txn != nil {
cb.Txn.Discard()
}
return nil, err
}
if cb.Txn == nil {
panic("can not found region snap")
}
if len(resp.Responses) != 1 {
panic("wrong response count for snap cmd")
}
return NewRegionReader(cb.Txn, *resp.Responses[0].GetSnap().Region), nil
}
func (rs *RaftStorage) Raft(stream tinykvpb.TinyKv_RaftServer) error {
for {
msg, err := stream.Recv()
if err != nil {
return err
}
rs.raftRouter.SendRaftMessage(msg)
}
}
func (rs *RaftStorage) Snapshot(stream tinykvpb.TinyKv_SnapshotServer) error {
var err error
done := make(chan struct{})
rs.snapWorker.Sender() <- &recvSnapTask{
stream: stream,
callback: func(e error) {
err = e
close(done)
},
}
<-done
return err
}
func (rs *RaftStorage) Start() error {
cfg := rs.config
schedulerClient, err := scheduler_client.NewClient(strings.Split(cfg.SchedulerAddr, ","), "")
if err != nil {
return err
}
rs.raftRouter, rs.raftSystem = raftstore.CreateRaftstore(cfg)
rs.resolveWorker = worker.NewWorker("resolver", &rs.wg)
resolveSender := rs.resolveWorker.Sender()
resolveRunner := newResolverRunner(schedulerClient)
rs.resolveWorker.Start(resolveRunner)
rs.snapManager = snap.NewSnapManager(filepath.Join(cfg.DBPath, "snap"))
rs.snapWorker = worker.NewWorker("snap-worker", &rs.wg)
snapSender := rs.snapWorker.Sender()
snapRunner := newSnapRunner(rs.snapManager, rs.config, rs.raftRouter)
rs.snapWorker.Start(snapRunner)
raftClient := newRaftClient(cfg)
trans := NewServerTransport(raftClient, snapSender, rs.raftRouter, resolveSender)
rs.node = raftstore.NewNode(rs.raftSystem, rs.config, schedulerClient)
err = rs.node.Start(context.TODO(), rs.engines, trans, rs.snapManager)
if err != nil {
return err
}
return nil
}
func (rs *RaftStorage) Stop() error {
rs.snapWorker.Stop()
rs.node.Stop()
rs.resolveWorker.Stop()
rs.wg.Wait()
if err := rs.engines.Raft.Close(); err != nil {
return err
}
if err := rs.engines.Kv.Close(); err != nil {
return err
}
return nil
}