Compare commits
8 Commits
master
...
disk-rollb
Author | SHA1 | Date | |
---|---|---|---|
|
3b9bc5c1ab | ||
|
4dfcd0a7e7 | ||
|
d083226487 | ||
|
6c555ce4fd | ||
|
2c615b63b2 | ||
|
7beeab6964 | ||
|
ec3a772258 | ||
|
7ceeb3bc6d |
@ -1776,12 +1776,15 @@ utils::BasicResult<StorageManipulationError, void> DiskStorage::DiskAccessor::Co
|
||||
logging::AssertRocksDBStatus(transaction_.disk_transaction_->SetCommitTimestamp(*commit_timestamp_));
|
||||
}
|
||||
auto commitStatus = transaction_.disk_transaction_->Commit();
|
||||
delete transaction_.disk_transaction_;
|
||||
transaction_.disk_transaction_ = nullptr;
|
||||
if (!commitStatus.ok()) {
|
||||
Abort();
|
||||
spdlog::error("rocksdb: Commit failed with status {}", commitStatus.ToString());
|
||||
return StorageManipulationError{SerializationError{}};
|
||||
}
|
||||
|
||||
delete transaction_.disk_transaction_;
|
||||
transaction_.disk_transaction_ = nullptr;
|
||||
|
||||
spdlog::trace("rocksdb: Commit successful");
|
||||
if (flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
|
||||
disk_storage->indices_.text_index_.Commit();
|
||||
|
@ -77,6 +77,7 @@ add_subdirectory(garbage_collection)
|
||||
add_subdirectory(query_planning)
|
||||
add_subdirectory(awesome_functions)
|
||||
add_subdirectory(high_availability)
|
||||
add_subdirectory(concurrent_write)
|
||||
add_subdirectory(concurrency)
|
||||
|
||||
add_subdirectory(replication_experimental)
|
||||
|
7
tests/e2e/concurrent_write/CMakeLists.txt
Normal file
7
tests/e2e/concurrent_write/CMakeLists.txt
Normal file
@ -0,0 +1,7 @@
|
||||
function(copy_concurrent_write_e2e_python_files FILE_NAME)
|
||||
copy_e2e_python_files(concurrent_write ${FILE_NAME})
|
||||
endfunction()
|
||||
|
||||
copy_concurrent_write_e2e_python_files(write.py)
|
||||
|
||||
copy_e2e_files(concurrent_write workloads.yaml)
|
27
tests/e2e/concurrent_write/workloads.yaml
Normal file
27
tests/e2e/concurrent_write/workloads.yaml
Normal file
@ -0,0 +1,27 @@
|
||||
args: &args
|
||||
- "--bolt-port"
|
||||
- "7687"
|
||||
- "--log-level"
|
||||
- "TRACE"
|
||||
|
||||
in_memory_cluster: &in_memory_cluster
|
||||
cluster:
|
||||
main:
|
||||
args: *args
|
||||
log_file: "concurrent-write-e2e.log"
|
||||
setup_queries: []
|
||||
validation_queries: []
|
||||
|
||||
disk_cluster: &disk_cluster
|
||||
cluster:
|
||||
main:
|
||||
args: *args
|
||||
log_file: "concurrent-write-e2e.log"
|
||||
setup_queries: ["STORAGE MODE ON_DISK_TRANSACTIONAL"]
|
||||
validation_queries: []
|
||||
|
||||
workloads:
|
||||
- name: "Disk concurrent write"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["concurrent_write/write.py"]
|
||||
<<: *disk_cluster
|
103
tests/e2e/concurrent_write/write.py
Normal file
103
tests/e2e/concurrent_write/write.py
Normal file
@ -0,0 +1,103 @@
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import typing
|
||||
|
||||
import mgclient
|
||||
import pytest
|
||||
|
||||
|
||||
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
|
||||
cursor.execute(query, params)
|
||||
return cursor.fetchall()
|
||||
|
||||
|
||||
class AtomicInteger:
|
||||
def __init__(self, value=0):
|
||||
self._value = int(value)
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def inc(self, d=1):
|
||||
with self._lock:
|
||||
self._value += int(d)
|
||||
return self._value
|
||||
|
||||
def dec(self, d=1):
|
||||
return self.inc(-d)
|
||||
|
||||
@property
|
||||
def value(self):
|
||||
with self._lock:
|
||||
return self._value
|
||||
|
||||
@value.setter
|
||||
def value(self, v):
|
||||
with self._lock:
|
||||
self._value = int(v)
|
||||
return self._value
|
||||
|
||||
|
||||
def sleep_until(wanted_cnt):
|
||||
while cnt.value != wanted_cnt:
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
def client_success():
|
||||
connection = mgclient.connect(host="localhost", port=7687)
|
||||
connection.autocommit = False
|
||||
cursor = connection.cursor()
|
||||
|
||||
execute_and_fetch_all(cursor, "MATCH (n1:N1) DELETE n1;")
|
||||
cnt.inc() # 1
|
||||
sleep_until(2)
|
||||
|
||||
connection.commit()
|
||||
cnt.inc() # 3
|
||||
|
||||
|
||||
def client_fail():
|
||||
connection = mgclient.connect(host="localhost", port=7687)
|
||||
connection.autocommit = False
|
||||
cursor = connection.cursor()
|
||||
|
||||
try:
|
||||
sleep_until(1)
|
||||
execute_and_fetch_all(cursor, "MATCH (n1:N1), (n2:N2) CREATE (n1)-[:R]->(n2);")
|
||||
cnt.inc() # 2
|
||||
|
||||
sleep_until(3)
|
||||
connection.commit() # this should fail
|
||||
except mgclient.DatabaseError:
|
||||
return
|
||||
except Exception:
|
||||
assert False
|
||||
|
||||
|
||||
def test_concurrent_write():
|
||||
connection = mgclient.connect(host="localhost", port=7687)
|
||||
connection.autocommit = True
|
||||
cursor = connection.cursor()
|
||||
|
||||
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")
|
||||
execute_and_fetch_all(cursor, "CREATE (:N1), (:N2);")
|
||||
|
||||
t1 = threading.Thread(target=client_success)
|
||||
t2 = threading.Thread(target=client_fail)
|
||||
|
||||
global cnt
|
||||
cnt = AtomicInteger(0)
|
||||
|
||||
t1.start()
|
||||
t2.start()
|
||||
|
||||
t1.join()
|
||||
t2.join()
|
||||
|
||||
assert execute_and_fetch_all(cursor, "MATCH (n:N1) RETURN inDegree(n);") == []
|
||||
assert execute_and_fetch_all(cursor, "MATCH (n:N1) RETURN outDegree(n);") == []
|
||||
assert execute_and_fetch_all(cursor, "MATCH (n:N2) RETURN inDegree(n);")[0][0] == 0
|
||||
assert execute_and_fetch_all(cursor, "MATCH (n:N2) RETURN outDegree(n);")[0][0] == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
Loading…
Reference in New Issue
Block a user