talent-plan-tinykv/kv/raftstore/snap/snap.go
2020-05-28 13:04:18 +08:00

787 lines
18 KiB
Go

package snap
import (
"fmt"
"hash"
"hash/crc32"
"io"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/Connor1996/badger"
"github.com/Connor1996/badger/table"
"github.com/pingcap-incubator/tinykv/kv/util"
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
"github.com/pingcap-incubator/tinykv/log"
"github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
"github.com/pingcap-incubator/tinykv/proto/pkg/metapb"
rspb "github.com/pingcap-incubator/tinykv/proto/pkg/raft_serverpb"
"github.com/pingcap/errors"
)
// SnapStateType identifies one of the snapshot states declared below.
type SnapStateType int

// Snapshot states. SnapState_Relax is the zero value; the other two follow
// it in declaration order.
const (
	SnapState_Relax SnapStateType = iota
	SnapState_Generating
	SnapState_Applying
)
// SnapState couples a snapshot state with a channel of snapshots.
type SnapState struct {
// StateType is one of the SnapState_* constants.
StateType SnapStateType
// Receiver carries *eraftpb.Snapshot values.
// NOTE(review): presumably a generated snapshot is delivered on this channel
// while StateType is SnapState_Generating — confirm against the code that
// uses this type.
Receiver chan *eraftpb.Snapshot
}
// File-name building blocks and deletion-retry settings for snapshot files.
const (
snapGenPrefix = "gen" // Name prefix for the self-generated snapshot file.
snapRevPrefix = "rev" // Name prefix for the received snapshot file.
// Suffixes appended to snapshot file names: ".sst" for column-family data
// files, ".tmp" for files that are still being written, ".meta" for the
// snapshot meta file.
sstFileSuffix = ".sst"
tmpFileSuffix = ".tmp"
cloneFileSuffix = ".clone"
metaFileSuffix = ".meta"
// deleteRetryMaxTime is the number of deletion attempts made by
// retryDeleteSnapshot; deleteRetryDuration is the pause it takes after a
// failed attempt.
deleteRetryMaxTime = 6
deleteRetryDuration = 500 * time.Millisecond
)
// ApplySnapAbortError is a string-backed error type; the error message is the
// string value itself.
type ApplySnapAbortError string

// Error implements the error interface by returning the underlying string.
func (abortErr ApplySnapAbortError) Error() string {
	msg := string(abortErr)
	return msg
}

// errAbort is an ApplySnapAbortError carrying the message "abort".
var errAbort = ApplySnapAbortError("abort")
// SnapKeyWithSending pairs a SnapKey with a boolean sending flag.
// NOTE(review): IsSending presumably distinguishes self-generated ("gen")
// snapshots from received ("rev") ones, mirroring the isSending argument of
// NewSnap — confirm against the code that builds these values.
type SnapKeyWithSending struct {
SnapKey SnapKey
IsSending bool
}
// SnapKey identifies a snapshot by region ID, term and index.
type SnapKey struct {
	RegionID uint64
	Term     uint64
	Index    uint64
}

// String renders the key as "<regionID>_<term>_<index>" using decimal
// numbers. NewSnap embeds this text in the snapshot file names.
func (key SnapKey) String() string {
	region := fmt.Sprint(key.RegionID)
	term := fmt.Sprint(key.Term)
	index := fmt.Sprint(key.Index)
	return region + "_" + term + "_" + index
}
// SnapKeyFromRegionSnap builds a SnapKey from the given region ID and the
// term and index stored in snap.Metadata.
func SnapKeyFromRegionSnap(regionID uint64, snap *eraftpb.Snapshot) SnapKey {
	var key SnapKey
	key.RegionID = regionID
	key.Term = snap.Metadata.Term
	key.Index = snap.Metadata.Index
	return key
}
// SnapKeyFromSnap derives the SnapKey of a raft snapshot.
//
// snap.Data is decoded as rspb.RaftSnapshotData to obtain the region ID; the
// term and index are taken from snap.Metadata.
//
// It returns an error when the data cannot be unmarshalled, or when the
// decoded data carries no region or the snapshot carries no metadata. The
// latter two cases previously caused a nil-pointer panic.
func SnapKeyFromSnap(snap *eraftpb.Snapshot) (SnapKey, error) {
	data := new(rspb.RaftSnapshotData)
	if err := data.Unmarshal(snap.Data); err != nil {
		return SnapKey{}, err
	}
	// A well-formed snapshot always has both; reject anything else instead of
	// dereferencing a nil pointer below.
	if data.Region == nil {
		return SnapKey{}, errors.Errorf("snapshot data has no region")
	}
	if snap.Metadata == nil {
		return SnapKey{}, errors.Errorf("snapshot has no metadata")
	}
	return SnapKeyFromRegionSnap(data.Region.Id, snap), nil
}
// SnapStatistics reports figures about a built snapshot. Snap.Build fills it
// in after a successful build.
type SnapStatistics struct {
// Size is the snapshot's total size as returned by TotalSize.
Size uint64
// KVCount is the key count reported by the snapshot builder.
KVCount int
}
// ApplyOptions bundles the arguments passed to Snapshot.Apply.
type ApplyOptions struct {
	// DB is the badger database the snapshot files are ingested into.
	DB *badger.DB
	// Region is the region associated with the snapshot being applied.
	Region *metapb.Region
}

// NewApplyOptions returns a pointer to an ApplyOptions holding db and region.
func NewApplyOptions(db *badger.DB, region *metapb.Region) *ApplyOptions {
	opts := new(ApplyOptions)
	opts.DB = db
	opts.Region = region
	return opts
}
// `Snapshot` is an interface for snapshot.
// It's used in these scenarios:
// 1. build local snapshot
// 2. read local snapshot and then replicate it to remote raftstores
// 3. receive snapshot from remote raftstore and write it to local storage
// 4. apply snapshot
// 5. snapshot gc
type Snapshot interface {
// Read streams the snapshot content out (scenario 2).
io.Reader
// Write accepts snapshot content received from elsewhere (scenario 3).
io.Writer
// Build creates the snapshot from dbSnap for region, filling snapData and
// stat with the result; deleter is used to remove a corrupted existing
// snapshot before rebuilding.
Build(dbSnap *badger.Txn, region *metapb.Region, snapData *rspb.RaftSnapshotData, stat *SnapStatistics, deleter SnapshotDeleter) error
// Path returns a path string describing the snapshot's files.
Path() string
// Exists reports whether the snapshot's files are present.
Exists() bool
// Delete removes the snapshot's files.
Delete()
// Meta returns file information for the snapshot's meta file.
Meta() (os.FileInfo, error)
// TotalSize returns the sum of the sizes of the snapshot's data files.
TotalSize() uint64
// Save finalizes a snapshot that was written through Write.
Save() error
// Apply ingests the snapshot into the database given in option.
Apply(option ApplyOptions) error
}
// `SnapshotDeleter` is an interface for deleting snapshots.
// It's used to ensure that snapshot deletion happens under the protection of a lock,
// to avoid races with concurrent reads/writes.
type SnapshotDeleter interface {
// DeleteSnapshot returns true if it successfully deletes the specified snapshot.
// NOTE(review): the meaning of checkEntry is defined by the implementation,
// which is not visible here; retryDeleteSnapshot always passes true.
DeleteSnapshot(key SnapKey, snapshot Snapshot, checkEntry bool) bool
}
// retryDeleteSnapshot asks deleter to delete the snapshot identified by key,
// trying up to deleteRetryMaxTime times and pausing deleteRetryDuration
// between attempts.
//
// It returns true as soon as one attempt succeeds and false once every
// attempt has failed. No pause is taken after the final failed attempt, so a
// definitive failure is reported without an extra, useless delay.
func retryDeleteSnapshot(deleter SnapshotDeleter, key SnapKey, snap Snapshot) bool {
	for attempt := 1; attempt <= deleteRetryMaxTime; attempt++ {
		if deleter.DeleteSnapshot(key, snap, true) {
			return true
		}
		// Sleeping only makes sense when another attempt will follow.
		if attempt < deleteRetryMaxTime {
			time.Sleep(deleteRetryDuration)
		}
	}
	return false
}
// genSnapshotMeta converts cfFiles into an rspb.SnapshotMeta with one
// SnapshotCFFile entry (name, size, checksum) per input file, in input order.
//
// It returns an error if any file's CF name is not one of engine_util.CFs.
func genSnapshotMeta(cfFiles []*CFFile) (*rspb.SnapshotMeta, error) {
	// isKnownCF reports whether name is one of the supported column families.
	isKnownCF := func(name string) bool {
		for i := range engine_util.CFs {
			if engine_util.CFs[i] == name {
				return true
			}
		}
		return false
	}

	meta := &rspb.SnapshotMeta{
		CfFiles: make([]*rspb.SnapshotCFFile, 0, len(engine_util.CFs)),
	}
	for _, f := range cfFiles {
		if !isKnownCF(f.CF) {
			return nil, errors.Errorf("failed to encode invalid snapshot CF %s", f.CF)
		}
		meta.CfFiles = append(meta.CfFiles, &rspb.SnapshotCFFile{
			Cf:       f.CF,
			Size_:    f.Size,
			Checksum: f.Checksum,
		})
	}
	return meta, nil
}
// checkFileSize returns an error if the size of the file at path cannot be
// determined or differs from expectedSize.
func checkFileSize(path string, expectedSize uint64) error {
	actual, err := util.GetFileSize(path)
	switch {
	case err != nil:
		return err
	case actual != expectedSize:
		return errors.Errorf("invalid size %d for snapshot cf file %s, expected %d", actual, path, expectedSize)
	}
	return nil
}
// checkFileChecksum returns an error if the CRC32 of the file at path cannot
// be computed or differs from expectedChecksum.
func checkFileChecksum(path string, expectedChecksum uint32) error {
	actual, err := util.CalcCRC32(path)
	if err != nil {
		return err
	}
	if actual == expectedChecksum {
		return nil
	}
	return errors.Errorf("invalid checksum %d for snapshot cf file %s, expected %d",
		actual, path, expectedChecksum)
}
// checkFileSizeAndChecksum verifies the size of the file at path first and,
// only if that matches, its checksum. The first failure is returned.
func checkFileSizeAndChecksum(path string, expectedSize uint64, expectedChecksum uint32) error {
	if sizeErr := checkFileSize(path, expectedSize); sizeErr != nil {
		return sizeErr
	}
	return checkFileChecksum(path, expectedChecksum)
}
// CFFile describes the snapshot data file of a single column family.
type CFFile struct {
// CF is the column family name.
CF string
// Path is the final location of the file; TmpPath is Path plus the ".tmp"
// suffix and is where the file lives while it is being written.
Path string
TmpPath string
// SstWriter is the table builder created by initForBuilding; it is finished
// and closed in saveCFFiles.
SstWriter *table.Builder
// File is the open handle: opened read-only on Path for sending, or
// write-only on TmpPath for receiving.
File *os.File
// KVCount gates SstWriter.Finish in saveCFFiles (only called when > 0).
// NOTE(review): presumably the number of key/value pairs written by the
// snapshot builder — confirm where it is incremented.
KVCount int
// Size is the size of the file in bytes, taken from the snapshot meta or
// measured after building; zero means the column family has no file.
Size uint64
// WrittenSize counts the bytes written to File through Snap.Write.
WrittenSize uint64
// Checksum is the CRC32 of the file content.
Checksum uint32
// WriteDigest accumulates the CRC32 (IEEE) of the bytes passed to Snap.Write
// so that Save can compare it with Checksum.
WriteDigest hash.Hash32
}
// MetaFile describes the snapshot's meta file, which stores the marshalled
// rspb.SnapshotMeta.
type MetaFile struct {
// Meta is the snapshot meta, either loaded from Path or about to be saved.
Meta *rspb.SnapshotMeta
// Path is the final location of the meta file.
Path string
// File is the open handle on TmpPath used while writing the snapshot.
File *os.File
// for writing snapshot
TmpPath string
}
// Compile-time check that *Snap implements Snapshot.
var _ Snapshot = new(Snap)
// Snap is the file-backed Snapshot implementation: one data file per column
// family plus one meta file.
type Snap struct {
// key identifies the snapshot (region ID, term, index).
key SnapKey
// displayPath is the string returned by Path; it is built by getDisplayPath.
displayPath string
// CFFiles holds one entry per element of engine_util.CFs, in that order.
CFFiles []*CFFile
// cfIndex is the index into CFFiles that Read and Write are positioned on.
cfIndex int
MetaFile *MetaFile
// SizeTrack points to a counter updated atomically: increased by the size of
// each CF file saved and decreased by the size of each CF file deleted.
SizeTrack *int64
// holdTmpFiles is set once temporary files have been created and cleared
// after the meta file is renamed into place; Delete removes temporary files
// only while it is set.
holdTmpFiles bool
}
// NewSnap creates the in-memory description of the snapshot identified by key
// inside dir, creating dir if it does not exist.
//
// File names start with "gen" when isSending is true and "rev" otherwise.
// If a meta file already exists it is loaded. A load failure is returned
// unless toBuild is set, in which case the broken snapshot is deleted through
// deleter so it can be rebuilt; if that deletion fails the load error is
// returned.
func NewSnap(dir string, key SnapKey, sizeTrack *int64, isSending, toBuild bool,
	deleter SnapshotDeleter) (*Snap, error) {
	if !util.DirExists(dir) {
		if mkErr := os.MkdirAll(dir, 0700); mkErr != nil {
			return nil, errors.WithStack(mkErr)
		}
	}

	snapPrefix := snapRevPrefix
	if isSending {
		snapPrefix = snapGenPrefix
	}
	prefix := fmt.Sprintf("%s_%s", snapPrefix, key)

	// One data file per column family, each with a final and a temporary path.
	cfFiles := make([]*CFFile, 0, len(engine_util.CFs))
	for _, cf := range engine_util.CFs {
		sstPath := filepath.Join(dir, fmt.Sprintf("%s_%s%s", prefix, cf, sstFileSuffix))
		cfFiles = append(cfFiles, &CFFile{
			CF:      cf,
			Path:    sstPath,
			TmpPath: sstPath + tmpFileSuffix,
		})
	}

	metaPath := filepath.Join(dir, fmt.Sprintf("%s%s", prefix, metaFileSuffix))
	s := &Snap{
		key:         key,
		displayPath: getDisplayPath(dir, prefix),
		CFFiles:     cfFiles,
		MetaFile: &MetaFile{
			Path:    metaPath,
			TmpPath: metaPath + tmpFileSuffix,
		},
		SizeTrack: sizeTrack,
	}

	// Nothing on disk yet: the caller gets a fresh snapshot description.
	if !util.FileExists(s.MetaFile.Path) {
		return s, nil
	}

	// load snapshot meta if meta file exists.
	loadErr := s.loadSnapMeta()
	if loadErr == nil {
		return s, nil
	}
	if !toBuild {
		return nil, loadErr
	}
	log.Warnf("failed to load existent snapshot meta when try to build %s: %v", s.Path(), loadErr)
	if !retryDeleteSnapshot(deleter, key, s) {
		log.Warnf("failed to delete snapshot %s because it's already registered elsewhere", s.Path())
		return nil, loadErr
	}
	return s, nil
}
// NewSnapForBuilding returns a self-generated ("gen") snapshot for key that is
// ready to be built: it is created with NewSnap in build mode and then
// prepared with initForBuilding.
func NewSnapForBuilding(dir string, key SnapKey, sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error) {
	const isSending, toBuild = true, true
	s, err := NewSnap(dir, key, sizeTrack, isSending, toBuild, deleter)
	if err != nil {
		return nil, err
	}
	if initErr := s.initForBuilding(); initErr != nil {
		return nil, initErr
	}
	return s, nil
}
// NewSnapForSending returns a self-generated ("gen") snapshot for key prepared
// for reading through Read.
//
// If the snapshot does not exist on disk, it is returned as-is with no file
// opened. Otherwise every non-empty CF file is opened read-only. When one of
// those opens fails, the files opened so far are closed again before the
// error is returned, so that a failed call does not leak file descriptors.
func NewSnapForSending(dir string, key SnapKey, sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error) {
	s, err := NewSnap(dir, key, sizeTrack, true, false, deleter)
	if err != nil {
		return nil, err
	}
	if !s.Exists() {
		// Skip the initialization below if it doesn't exist.
		return s, nil
	}
	for i, cfFile := range s.CFFiles {
		// Empty column families have no file to read from.
		if cfFile.Size == 0 {
			continue
		}
		file, openErr := os.Open(cfFile.Path)
		if openErr != nil {
			for _, opened := range s.CFFiles[:i] {
				if opened.File != nil {
					// Read-only handle on an error path: a Close failure
					// carries no information worth reporting.
					_ = opened.File.Close()
					opened.File = nil
				}
			}
			return nil, errors.WithStack(openErr)
		}
		cfFile.File = file
	}
	return s, nil
}
// NewSnapForReceiving returns a received ("rev") snapshot for key prepared for
// writing through Write, using snapshotMeta as the expected layout (CF names,
// sizes and checksums).
//
// If the snapshot already exists on disk it is returned without opening any
// file. Otherwise the temporary meta file and one temporary file per
// non-empty column family are created and opened for writing, and a CRC32
// digest is attached to each CF file.
//
// Temporary files are opened with O_TRUNC: a stale temporary file left by an
// earlier, interrupted receive would otherwise keep its old tail whenever the
// new content is shorter, producing a file whose size no longer matches the
// meta. If opening any file fails, the handles opened so far are closed
// before the error is returned.
func NewSnapForReceiving(dir string, key SnapKey, snapshotMeta *rspb.SnapshotMeta,
	sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error) {
	s, err := NewSnap(dir, key, sizeTrack, false, false, deleter)
	if err != nil {
		return nil, err
	}
	if err = s.setSnapshotMeta(snapshotMeta); err != nil {
		return nil, err
	}
	if s.Exists() {
		return s, nil
	}

	const tmpFlags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC
	metaTmp, err := os.OpenFile(s.MetaFile.TmpPath, tmpFlags, 0600)
	if err != nil {
		return nil, err
	}
	s.MetaFile.File = metaTmp
	s.holdTmpFiles = true

	for i, cfFile := range s.CFFiles {
		// Empty column families are never written, so they get no file.
		if cfFile.Size == 0 {
			continue
		}
		cfTmp, openErr := os.OpenFile(cfFile.TmpPath, tmpFlags, 0600)
		if openErr != nil {
			// Nothing has been written yet, so Close errors on these handles
			// are not meaningful; the open error is what the caller needs.
			_ = metaTmp.Close()
			s.MetaFile.File = nil
			for _, opened := range s.CFFiles[:i] {
				if opened.File != nil {
					_ = opened.File.Close()
					opened.File = nil
				}
			}
			return nil, openErr
		}
		cfFile.File = cfTmp
		cfFile.WriteDigest = crc32.NewIEEE()
	}
	return s, nil
}
// NewSnapForApplying returns a received ("rev") snapshot for key. It is a thin
// wrapper around NewSnap with both isSending and toBuild set to false.
func NewSnapForApplying(dir string, key SnapKey, sizeTrack *int64, deleter SnapshotDeleter) (*Snap, error) {
	const isSending, toBuild = false, false
	return NewSnap(dir, key, sizeTrack, isSending, toBuild, deleter)
}
// initForBuilding prepares the snapshot for Build. It does nothing when the
// snapshot already exists on disk. Otherwise it creates the temporary meta
// file, marks the snapshot as holding temporary files, and creates one
// temporary file per column family with a table builder writing to it.
//
// Temporary files are opened with O_TRUNC so that bytes left behind by an
// earlier, interrupted build cannot survive past the end of the newly written
// content; saveCFFiles measures and checksums the whole temporary file, so
// such leftovers would otherwise become part of the snapshot.
func (s *Snap) initForBuilding() error {
	if s.Exists() {
		return nil
	}
	const tmpFlags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC
	metaTmp, err := os.OpenFile(s.MetaFile.TmpPath, tmpFlags, 0600)
	if err != nil {
		return err
	}
	s.MetaFile.File = metaTmp
	s.holdTmpFiles = true
	for _, cfFile := range s.CFFiles {
		cfTmp, err := os.OpenFile(cfFile.TmpPath, tmpFlags, 0600)
		if err != nil {
			return err
		}
		cfFile.SstWriter = table.NewExternalTableBuilder(cfTmp, nil, badger.DefaultOptions.TableBuilderOptions)
	}
	return nil
}
// readSnapshotMeta reads the whole meta file at s.MetaFile.Path and decodes it
// into an rspb.SnapshotMeta.
//
// The file is closed before returning; previously the handle was left open on
// every path. The size is taken from the open handle rather than from a
// separate path lookup, so the size and the bytes read always refer to the
// same file.
//
// Any open, stat, read or unmarshal failure is returned wrapped with a stack
// trace.
func (s *Snap) readSnapshotMeta() (*rspb.SnapshotMeta, error) {
	file, err := os.Open(s.MetaFile.Path)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	// Read-only handle: a Close error cannot affect the data already read.
	defer file.Close()

	fi, err := file.Stat()
	if err != nil {
		return nil, errors.WithStack(err)
	}
	buf := make([]byte, fi.Size())
	if _, err = io.ReadFull(file, buf); err != nil {
		return nil, errors.WithStack(err)
	}
	snapshotMeta := new(rspb.SnapshotMeta)
	if err = snapshotMeta.Unmarshal(buf); err != nil {
		return nil, errors.WithStack(err)
	}
	return snapshotMeta, nil
}
// setSnapshotMeta adopts snapshotMeta as the snapshot's meta after checking it
// against s.CFFiles: the number of entries must match and the i-th entry must
// name the i-th column family. For every CF file that already exists on disk
// the on-disk size must equal the size in the meta.
//
// Sizes and checksums are copied into s.CFFiles entry by entry as they pass
// the checks; s.MetaFile.Meta is set only when every entry passed.
func (s *Snap) setSnapshotMeta(snapshotMeta *rspb.SnapshotMeta) error {
	want, got := len(s.CFFiles), len(snapshotMeta.CfFiles)
	if got != want {
		return errors.Errorf("invalid CF number of snapshot meta, expect %d, got %d",
			want, got)
	}
	for idx := range s.CFFiles {
		local, described := s.CFFiles[idx], snapshotMeta.CfFiles[idx]
		if described.Cf != local.CF {
			return errors.Errorf("invalid %d CF in snapshot meta, expect %s, got %s", idx, local.CF, described.Cf)
		}
		expectedSize := described.GetSize_()
		if util.FileExists(local.Path) {
			// Check only the file size for `exists()` to work correctly.
			if sizeErr := checkFileSize(local.Path, expectedSize); sizeErr != nil {
				return sizeErr
			}
		}
		local.Size = uint64(expectedSize)
		local.Checksum = described.GetChecksum()
	}
	s.MetaFile.Meta = snapshotMeta
	return nil
}
// loadSnapMeta reads the meta file from disk, installs it with
// setSnapshotMeta and then verifies that the snapshot is complete.
//
// It returns an error if reading or installing the meta fails, or if the meta
// file is present while a non-empty CF file it describes is missing.
func (s *Snap) loadSnapMeta() error {
	meta, err := s.readSnapshotMeta()
	if err == nil {
		err = s.setSnapshotMeta(meta)
	}
	if err != nil {
		return err
	}
	// check if there is a data corruption when the meta file exists
	// but cf files are deleted.
	if s.Exists() {
		return nil
	}
	return errors.Errorf("snapshot %s is corrupted, some cf file is missing", s.Path())
}
// getDisplayPath returns a single string that stands for all CF files of a
// snapshot: "<dir>/<prefix>_(<cf1>|<cf2>|...).sst". It is meant for display
// (see Snap.Path), not for opening files.
func getDisplayPath(dir string, prefix string) string {
	names := engine_util.CFs[:]
	group := "(" + strings.Join(names, "|") + ")"
	return fmt.Sprintf("%s/%s_%s%s", dir, prefix, group, sstFileSuffix)
}
// validate walks over the CF files and currently always returns nil: the
// check for non-empty files is still a TODO.
func (s *Snap) validate() error {
	for idx := range s.CFFiles {
		if s.CFFiles[idx].Size != 0 {
			// TODO: prepare and validate for ingestion
			continue
		}
		// Skip empty file. The checksum of this cf file should be 0 and
		// this is checked when loading the snapshot meta.
	}
	return nil
}
// saveCFFiles finalizes the CF files produced by a build. For each column
// family the table builder is finished (only if it received any key) and
// closed. A non-empty temporary file is then renamed to its final path, its
// size is recorded and added to the size tracker, and its CRC32 is computed.
// An empty temporary file is deleted instead.
//
// The first error encountered is returned and processing stops there.
func (s *Snap) saveCFFiles() error {
	for _, cf := range s.CFFiles {
		if cf.KVCount > 0 {
			if finishErr := cf.SstWriter.Finish(); finishErr != nil {
				return finishErr
			}
		}
		cf.SstWriter.Close()

		tmpSize, err := util.GetFileSize(cf.TmpPath)
		if err != nil {
			return err
		}
		if tmpSize == 0 {
			// Clean up the `tmp_path` if this cf file is empty.
			if _, err = util.DeleteFileIfExists(cf.TmpPath); err != nil {
				return err
			}
			continue
		}

		if err = os.Rename(cf.TmpPath, cf.Path); err != nil {
			return errors.WithStack(err)
		}
		cf.Size = tmpSize
		// add size
		atomic.AddInt64(s.SizeTrack, int64(tmpSize))
		if cf.Checksum, err = util.CalcCRC32(cf.Path); err != nil {
			return err
		}
	}
	return nil
}
// saveMetaFile marshals s.MetaFile.Meta, writes it through the open handle on
// the temporary meta file, flushes it to stable storage and renames the
// temporary file to its final path. On success the snapshot no longer holds
// temporary files.
//
// The Sync before the rename matters because Exists treats the presence of
// the final meta file as the sign of a complete snapshot: without it a crash
// could leave the final name pointing at incomplete content. Save already
// syncs the meta file in the same way.
//
// Marshal, write, sync and rename failures are returned wrapped with a stack
// trace.
func (s *Snap) saveMetaFile() error {
	bin, err := s.MetaFile.Meta.Marshal()
	if err != nil {
		return errors.WithStack(err)
	}
	if _, err = s.MetaFile.File.Write(bin); err != nil {
		return errors.WithStack(err)
	}
	if err = s.MetaFile.File.Sync(); err != nil {
		return errors.WithStack(err)
	}
	if err = os.Rename(s.MetaFile.TmpPath, s.MetaFile.Path); err != nil {
		return errors.WithStack(err)
	}
	s.holdTmpFiles = false
	return nil
}
// Build produces the snapshot files for region from dbSnap.
//
// If the snapshot already exists and validates, nothing is rebuilt: snapData
// is filled from the existing files and stat is left untouched. If it exists
// but fails validation, it is deleted through deleter and re-initialized; when
// the deletion fails the validation error is returned.
//
// Otherwise the CF files are built and saved, the meta is generated and
// saved, and the key count and total size are reported through stat, with the
// total size and meta also stored in snapData.
func (s *Snap) Build(dbSnap *badger.Txn, region *metapb.Region, snapData *rspb.RaftSnapshotData, stat *SnapStatistics, deleter SnapshotDeleter) error {
	if s.Exists() {
		validateErr := s.validate()
		if validateErr == nil {
			// set snapshot meta data
			snapData.FileSize = s.TotalSize()
			snapData.Meta = s.MetaFile.Meta
			return nil
		}
		log.Errorf("[region %d] file %s is corrupted, will rebuild: %v", region.Id, s.Path(), validateErr)
		if !retryDeleteSnapshot(deleter, s.key, s) {
			log.Errorf("[region %d] failed to delete corrupted snapshot %s because it's already registered elsewhere",
				region.Id, s.Path())
			return validateErr
		}
		if initErr := s.initForBuilding(); initErr != nil {
			return initErr
		}
	}

	builder := newSnapBuilder(s.CFFiles, dbSnap, region)
	if buildErr := builder.build(); buildErr != nil {
		return buildErr
	}
	log.Infof("region %d scan snapshot %s, key count %d, size %d", region.Id, s.Path(), builder.kvCount, builder.size)

	if saveErr := s.saveCFFiles(); saveErr != nil {
		return saveErr
	}
	stat.KVCount = builder.kvCount

	meta, err := genSnapshotMeta(s.CFFiles)
	if err != nil {
		return err
	}
	s.MetaFile.Meta = meta
	if err = s.saveMetaFile(); err != nil {
		return err
	}

	size := s.TotalSize()
	stat.Size = size
	// set snapshot meta data
	snapData.FileSize = size
	snapData.Meta = s.MetaFile.Meta
	return nil
}
// Path returns the display path computed when the Snap was created. It is a
// pattern covering all CF files, not the path of one real file.
func (s *Snap) Path() string {
	display := s.displayPath
	return display
}
// Exists reports whether the snapshot is complete on disk: every CF file with
// a non-zero recorded size must be present, and so must the meta file.
func (s *Snap) Exists() bool {
	for idx := range s.CFFiles {
		cf := s.CFFiles[idx]
		if cf.Size == 0 {
			// Empty column families have no file to look for.
			continue
		}
		if !util.FileExists(cf.Path) {
			return false
		}
	}
	return util.FileExists(s.MetaFile.Path)
}
// Delete removes the snapshot's CF files and meta file and, while the
// snapshot holds temporary files, their temporary counterparts as well. The
// size tracker is decreased by the recorded size of every CF file that was
// actually removed.
//
// Any deletion error causes a panic.
func (s *Snap) Delete() {
	log.Debugf("deleting %s", s.Path())

	// mustRemove deletes path if present, panicking on failure, and reports
	// whether a file was removed.
	mustRemove := func(path string) bool {
		removed, err := util.DeleteFileIfExists(path)
		if err != nil {
			panic(err)
		}
		return removed
	}

	for _, cf := range s.CFFiles {
		if s.holdTmpFiles {
			mustRemove(cf.TmpPath)
		}
		if mustRemove(cf.Path) {
			atomic.AddInt64(s.SizeTrack, -int64(cf.Size))
		}
	}
	mustRemove(s.MetaFile.Path)
	if s.holdTmpFiles {
		mustRemove(s.MetaFile.TmpPath)
	}
}
// Meta returns the os.FileInfo of the snapshot's meta file, or the stat error
// wrapped with a stack trace.
func (s *Snap) Meta() (os.FileInfo, error) {
	info, statErr := os.Stat(s.MetaFile.Path)
	if statErr == nil {
		return info, nil
	}
	return nil, errors.WithStack(statErr)
}
// TotalSize returns the sum of the recorded sizes of all CF files.
func (s *Snap) TotalSize() (total uint64) {
	for idx := range s.CFFiles {
		total += s.CFFiles[idx].Size
	}
	return total
}
// Save finalizes a snapshot whose content was received through Write.
//
// For every non-empty column family it checks that exactly Size bytes were
// written and that the running CRC32 equals the expected checksum, flushes
// the temporary file to stable storage, renames it to its final path and adds
// its size to the size tracker. Finally the meta is marshalled, written,
// synced and renamed into place, after which the snapshot no longer holds
// temporary files.
//
// The CF files are synced before being renamed for the same reason the meta
// file is: Exists treats the final names as proof of a complete snapshot, so
// their content must be durable before those names appear.
//
// It returns an error on a size or checksum mismatch, or on any sync, rename,
// marshal or write failure (the I/O ones wrapped with a stack trace).
func (s *Snap) Save() error {
	log.Debugf("saving to %s", s.MetaFile.Path)
	for _, cfFile := range s.CFFiles {
		if cfFile.Size == 0 {
			// skip empty cf file.
			continue
		}
		// Check each cf file has been fully written, and the checksum matches.
		if cfFile.WrittenSize != cfFile.Size {
			return errors.Errorf("snapshot file %s for CF %s size mismatch, real size %d, expected %d",
				cfFile.Path, cfFile.CF, cfFile.WrittenSize, cfFile.Size)
		}
		checksum := cfFile.WriteDigest.Sum32()
		if cfFile.Checksum != checksum {
			return errors.Errorf("snapshot file %s for CF %s checksum mismatch, real checksum %d, expected %d",
				cfFile.Path, cfFile.CF, checksum, cfFile.Checksum)
		}
		// Make the content durable before it becomes visible under its final name.
		if err := cfFile.File.Sync(); err != nil {
			return errors.WithStack(err)
		}
		if err := os.Rename(cfFile.TmpPath, cfFile.Path); err != nil {
			return errors.WithStack(err)
		}
		atomic.AddInt64(s.SizeTrack, int64(cfFile.Size))
	}
	// write meta file
	bin, err := s.MetaFile.Meta.Marshal()
	if err != nil {
		return errors.WithStack(err)
	}
	if _, err = s.MetaFile.File.Write(bin); err != nil {
		return errors.WithStack(err)
	}
	if err = s.MetaFile.File.Sync(); err != nil {
		return errors.WithStack(err)
	}
	if err = os.Rename(s.MetaFile.TmpPath, s.MetaFile.Path); err != nil {
		return errors.WithStack(err)
	}
	s.holdTmpFiles = false
	return nil
}
// Apply ingests the snapshot's non-empty CF files into opts.DB.
//
// The snapshot is validated first. Each non-empty CF file is then opened and
// all of them are handed to DB.IngestExternalFiles in one call. opts.Region
// is not used by this implementation.
//
// Every file opened here is closed before Apply returns, on success and on
// failure alike; previously the files already opened were leaked when a later
// open failed.
//
// It returns the validation error, the first open error, or the ingest error.
func (s *Snap) Apply(opts ApplyOptions) error {
	if err := s.validate(); err != nil {
		return err
	}

	externalFiles := make([]*os.File, 0, len(s.CFFiles))
	defer func() {
		for _, file := range externalFiles {
			// Read-only handles: a Close error cannot affect the ingest result.
			_ = file.Close()
		}
	}()

	for _, cfFile := range s.CFFiles {
		if cfFile.Size == 0 {
			// Skip empty cf file
			continue
		}
		file, err := os.Open(cfFile.Path)
		if err != nil {
			log.Errorf("open ingest file %s failed: %s", cfFile.Path, err)
			return err
		}
		externalFiles = append(externalFiles, file)
	}

	n, err := opts.DB.IngestExternalFiles(externalFiles)
	if err != nil {
		log.Errorf("ingest sst failed (first %d files succeeded): %s", n, err)
		return err
	}
	log.Infof("apply snapshot ingested %d tables", n)
	return nil
}
// Read implements io.Reader over the concatenation of the non-empty CF files,
// in CFFiles order. Each call returns data from a single file; when the
// current file is exhausted, reading moves on to the next one. io.EOF is
// returned once every file has been consumed.
//
// A zero-length b yields (0, nil). A read error other than io.EOF is returned
// wrapped with a stack trace.
func (s *Snap) Read(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}
	for s.cfIndex < len(s.CFFiles) {
		current := s.CFFiles[s.cfIndex]
		if current.Size != 0 {
			n, err := current.File.Read(b)
			switch {
			case n > 0:
				// Data takes precedence; an error alongside it is not reported.
				return n, nil
			case err == nil:
				// Nothing read and no error: try the same file again.
				continue
			case err != io.EOF:
				return 0, errors.WithStack(err)
			}
		}
		// Empty or exhausted file: advance to the next column family.
		s.cfIndex++
	}
	return 0, io.EOF
}
// Write implements io.Writer by distributing b over the CF files in CFFiles
// order. Each file accepts bytes until its WrittenSize reaches its Size; the
// rest of b spills over into the next non-empty file. Every byte written is
// also fed into the file's CRC32 digest.
//
// A zero-length b yields (0, nil). On a file write error (0, err) is returned
// with the error wrapped with a stack trace. If all files are already full,
// the surplus bytes are dropped and the returned count is smaller than len(b)
// with a nil error.
func (s *Snap) Write(b []byte) (int, error) {
	if len(b) == 0 {
		return 0, nil
	}
	pending := b
	for s.cfIndex < len(s.CFFiles) {
		cf := s.CFFiles[s.cfIndex]
		remaining := cf.Size - cf.WrittenSize
		if cf.Size == 0 || remaining == 0 {
			// Nothing (more) to write for this column family.
			s.cfIndex++
			continue
		}

		// fillsFile is true when pending holds more than this file can take.
		fillsFile := len(pending) > int(remaining)
		chunk := pending
		if fillsFile {
			chunk = pending[:remaining]
		}
		if _, err := cf.File.Write(chunk); err != nil {
			return 0, errors.WithStack(err)
		}
		cf.WriteDigest.Write(chunk)
		cf.WrittenSize += uint64(len(chunk))

		if !fillsFile {
			return len(b), nil
		}
		s.cfIndex++
		pending = pending[remaining:]
	}
	return len(b) - len(pending), nil
}
// Drop deletes the snapshot when it is in an unfinished or broken state: when
// a temporary CF file or the temporary meta file is still on disk, or when
// the snapshot does not fully exist. A complete snapshot is left untouched.
func (s *Snap) Drop() {
	// cleanup if some of the cf files and meta file is partly written
	leftoverTmp := false
	for idx := 0; idx < len(s.CFFiles) && !leftoverTmp; idx++ {
		leftoverTmp = util.FileExists(s.CFFiles[idx].TmpPath)
	}
	if !leftoverTmp {
		leftoverTmp = util.FileExists(s.MetaFile.TmpPath)
	}
	// cleanup if data corruption happens and any file goes missing
	if leftoverTmp || !s.Exists() {
		s.Delete()
	}
}