mirror of
https://github.com/talent-plan/tinykv.git
synced 2024-12-26 12:50:11 +08:00
fix transaction test build
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
This commit is contained in:
parent
ca5a7773ac
commit
73f60164cb
@ -301,13 +301,26 @@ func ClearMeta(engines *engine_util.Engines, kvWB, raftWB *engine_util.WriteBatc
|
||||
return nil
|
||||
}
|
||||
|
||||
// Append the given entries to the raft log and update ps.raftState also delete log entries that will
|
||||
// never be committed
|
||||
func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.WriteBatch) error {
|
||||
// Your Code Here (2B).
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply the peer with given snapshot
|
||||
func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) {
|
||||
// Hint: things need to do here including: update peer storage state like raftState and applyState, etc,
|
||||
// and send RegionTaskApply task to region worker through ps.regionSched, also remember call ps.clearMeta
|
||||
// and ps.clearExtraData to delete stale data
|
||||
// Your Code Here (2B).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Save memory states to disk.
|
||||
// Do not modify ready in this function, this is a requirement to advance the ready object properly later.
|
||||
func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, error) {
|
||||
// Hint: things need to do here including: update peer storage state like raftState and applyState, etc,
|
||||
// When applying snapshot, send RegionTaskApply task to region worker through ps.regionSched, also remember call ps.clearMeta
|
||||
// and ps.clearExtraData to delete stale data
|
||||
|
||||
// Hint: you may call `Append()` and `ApplySnapshot()` in this function
|
||||
// Your Code Here (2B).
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pingcap-incubator/tinykv/scheduler/pkg/tsoutil"
|
||||
|
||||
"github.com/pingcap-incubator/tinykv/kv/storage"
|
||||
@ -17,7 +18,8 @@ var _ tinykvpb.TinyKvServer = new(Server)
|
||||
// Server is a TinyKV server, it 'faces outwards', sending and receiving messages from clients such as TinySQL.
|
||||
type Server struct {
|
||||
storage storage.Storage
|
||||
// used in 4A/4B
|
||||
|
||||
// (Used in 4A/4B)
|
||||
Latches *latches.Latches
|
||||
}
|
||||
|
||||
@ -32,22 +34,22 @@ func NewServer(storage storage.Storage) *Server {
|
||||
|
||||
// Raw API.
|
||||
func (server *Server) RawGet(_ context.Context, req *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) {
|
||||
// Your code here (1).
|
||||
// Your Code Here (1).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) RawPut(_ context.Context, req *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) {
|
||||
// Your code here (1).
|
||||
// Your Code Here (1).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) RawDelete(_ context.Context, req *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) {
|
||||
// Your code here (1).
|
||||
// Your Code Here (1).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) RawScan(_ context.Context, req *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
|
||||
// Your code here (1).
|
||||
// Your Code Here (1).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@ -65,37 +67,37 @@ func (server *Server) Snapshot(stream tinykvpb.TinyKv_SnapshotServer) error {
|
||||
|
||||
// Transactional API.
|
||||
func (server *Server) KvGet(_ context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
|
||||
// Your code here (4B).
|
||||
// Your Code Here (4B).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) KvPrewrite(_ context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
|
||||
// Your code here (4B).
|
||||
// Your Code Here (4B).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) KvCommit(_ context.Context, req *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) {
|
||||
// Your code here (4B).
|
||||
// Your Code Here (4B).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) KvScan(_ context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) {
|
||||
// Your code here (4C).
|
||||
// Your Code Here (4C).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) KvCheckTxnStatus(_ context.Context, req *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) {
|
||||
// Your code here (4C).
|
||||
// Your Code Here (4C).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) KvBatchRollback(_ context.Context, req *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) {
|
||||
// Your code here (4C).
|
||||
// Your Code Here (4C).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (server *Server) KvResolveLock(_ context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) {
|
||||
// Your code here (4C).
|
||||
// Your Code Here (4C).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ func (lock *Lock) IsLockedFor(key []byte, txnStartTs uint64, resp interface{}) b
|
||||
// AllLocksForTxn returns all locks for the current transaction.
|
||||
func AllLocksForTxn(txn *MvccTxn) ([]KlPair, error) {
|
||||
var result []KlPair
|
||||
iter := txn.Reader().IterCF(engine_util.CfLock)
|
||||
iter := txn.Reader.IterCF(engine_util.CfLock)
|
||||
defer iter.Close()
|
||||
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
@ -90,7 +90,7 @@ func AllLocksForTxn(txn *MvccTxn) ([]KlPair, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if lock.Ts == txn.StartTS() {
|
||||
if lock.Ts == txn.StartTS {
|
||||
result = append(result, KlPair{item.Key(), lock})
|
||||
}
|
||||
}
|
||||
|
@ -10,75 +10,73 @@ import (
|
||||
// MvccTxn groups together writes as part of a single transaction. It also provides an abstraction over low-level
|
||||
// storage, lowering the concepts of timestamps, writes, and locks into plain keys and values.
|
||||
type MvccTxn struct {
|
||||
// Your code here (4a).
|
||||
StartTS uint64
|
||||
Reader storage.StorageReader
|
||||
writes []storage.Modify
|
||||
}
|
||||
|
||||
func (txn *MvccTxn) Reader() storage.StorageReader {
|
||||
// Your code here (4a).
|
||||
return nil
|
||||
}
|
||||
|
||||
func (txn *MvccTxn) StartTS() uint64 {
|
||||
// Your code here (4a).
|
||||
return 0
|
||||
func NewMvccTxn(reader storage.StorageReader, startTs uint64) *MvccTxn {
|
||||
return &MvccTxn{
|
||||
Reader: reader,
|
||||
StartTS: startTs,
|
||||
}
|
||||
}
|
||||
|
||||
// Writes returns all changes added to this transaction.
|
||||
func (txn *MvccTxn) Writes() []storage.Modify {
|
||||
// Your code here (4a).
|
||||
return nil
|
||||
return txn.writes
|
||||
}
|
||||
|
||||
// PutWrite records a write at key and ts.
|
||||
func (txn *MvccTxn) PutWrite(key []byte, ts uint64, write *Write) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
}
|
||||
|
||||
// GetLock returns a lock if key is locked. It will return (nil, nil) if there is no lock on key, and (nil, err)
|
||||
// if an error occurs during lookup.
|
||||
func (txn *MvccTxn) GetLock(key []byte) (*Lock, error) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// PutLock adds a key/lock to this transaction.
|
||||
func (txn *MvccTxn) PutLock(key []byte, lock *Lock) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
}
|
||||
|
||||
// DeleteLock adds a delete lock to this transaction.
|
||||
func (txn *MvccTxn) DeleteLock(key []byte) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
}
|
||||
|
||||
// GetValue finds the value for key, valid at the start timestamp of this transaction.
|
||||
// I.e., the most recent value committed before the start of this transaction.
|
||||
func (txn *MvccTxn) GetValue(key []byte) ([]byte, error) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// PutValue adds a key/value write to this transaction.
|
||||
func (txn *MvccTxn) PutValue(key []byte, value []byte) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
}
|
||||
|
||||
// DeleteValue removes a key/value pair in this transaction.
|
||||
func (txn *MvccTxn) DeleteValue(key []byte) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
}
|
||||
|
||||
// CurrentWrite searches for a write with this transaction's start timestamp. It returns a Write from the DB and that
|
||||
// write's commit timestamp, or an error.
|
||||
func (txn *MvccTxn) CurrentWrite(key []byte) (*Write, uint64, error) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
return nil, 0, nil
|
||||
}
|
||||
|
||||
// MostRecentWrite finds the most recent write with the given key. It returns a Write from the DB and that
|
||||
// write's commit timestamp, or an error.
|
||||
func (txn *MvccTxn) MostRecentWrite(key []byte) (*Write, uint64, error) {
|
||||
// Your code here (4a).
|
||||
// Your Code Here (4A).
|
||||
return nil, 0, nil
|
||||
}
|
||||
|
||||
|
@ -2,9 +2,10 @@ package mvcc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
|
||||
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap-incubator/tinykv/kv/storage"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -30,13 +31,13 @@ func TestDecodeKey(t *testing.T) {
|
||||
assert.Equal(t, []byte{42, 0, 5}, DecodeUserKey(EncodeKey([]byte{42, 0, 5}, 234234)))
|
||||
}
|
||||
|
||||
func testTxn(startTs uint64, f func(m *storage.MemStorage)) MvccTxn {
|
||||
func testTxn(startTs uint64, f func(m *storage.MemStorage)) *MvccTxn {
|
||||
mem := storage.NewMemStorage()
|
||||
if f != nil {
|
||||
f(mem)
|
||||
}
|
||||
reader, _ := mem.Reader(&kvrpcpb.Context{})
|
||||
return NewTxn(reader, startTs)
|
||||
return NewMvccTxn(reader, startTs)
|
||||
}
|
||||
|
||||
func assertPutInTxn(t *testing.T, txn *MvccTxn, key []byte, value []byte, cf string) {
|
||||
@ -67,7 +68,7 @@ func TestPutLock4A(t *testing.T) {
|
||||
}
|
||||
|
||||
txn.PutLock([]byte{1}, &lock)
|
||||
assertPutInTxn(t, &txn, []byte{1}, lock.ToBytes(), engine_util.CfLock)
|
||||
assertPutInTxn(t, txn, []byte{1}, lock.ToBytes(), engine_util.CfLock)
|
||||
}
|
||||
|
||||
func TestPutWrite4A(t *testing.T) {
|
||||
@ -78,7 +79,7 @@ func TestPutWrite4A(t *testing.T) {
|
||||
}
|
||||
|
||||
txn.PutWrite([]byte{16, 240}, 0, &write)
|
||||
assertPutInTxn(t, &txn, EncodeKey([]byte{16, 240}, 0), write.ToBytes(), engine_util.CfWrite)
|
||||
assertPutInTxn(t, txn, EncodeKey([]byte{16, 240}, 0), write.ToBytes(), engine_util.CfWrite)
|
||||
}
|
||||
|
||||
func TestPutValue4A(t *testing.T) {
|
||||
@ -86,7 +87,7 @@ func TestPutValue4A(t *testing.T) {
|
||||
value := []byte{1, 1, 2, 3, 5, 8, 13}
|
||||
|
||||
txn.PutValue([]byte{32}, value)
|
||||
assertPutInTxn(t, &txn, EncodeKey([]byte{32}, 453325345), value, engine_util.CfDefault)
|
||||
assertPutInTxn(t, txn, EncodeKey([]byte{32}, 453325345), value, engine_util.CfDefault)
|
||||
}
|
||||
|
||||
func TestGetLock4A(t *testing.T) {
|
||||
@ -108,13 +109,13 @@ func TestGetLock4A(t *testing.T) {
|
||||
func TestDeleteLock4A(t *testing.T) {
|
||||
txn := testTxn(42, nil)
|
||||
txn.DeleteLock([]byte{1})
|
||||
assertDeleteInTxn(t, &txn, []byte{1}, engine_util.CfLock)
|
||||
assertDeleteInTxn(t, txn, []byte{1}, engine_util.CfLock)
|
||||
}
|
||||
|
||||
func TestDeleteValue4A(t *testing.T) {
|
||||
txn := testTxn(63454245, nil)
|
||||
txn.DeleteValue([]byte{17, 255, 0})
|
||||
assertDeleteInTxn(t, &txn, EncodeKey([]byte{17, 255, 0}, 63454245), engine_util.CfDefault)
|
||||
assertDeleteInTxn(t, txn, EncodeKey([]byte{17, 255, 0}, 63454245), engine_util.CfDefault)
|
||||
}
|
||||
|
||||
func singleEntry(m *storage.MemStorage) {
|
||||
|
Loading…
Reference in New Issue
Block a user