Compare commits

..

No commits in common. "disk-rollback-on-fail" and "master" have entirely different histories.

5 changed files with 2 additions and 143 deletions
src/storage/v2/disk
tests/e2e

View File

@ -1776,15 +1776,12 @@ utils::BasicResult<StorageManipulationError, void> DiskStorage::DiskAccessor::Co
logging::AssertRocksDBStatus(transaction_.disk_transaction_->SetCommitTimestamp(*commit_timestamp_));
}
auto commitStatus = transaction_.disk_transaction_->Commit();
delete transaction_.disk_transaction_;
transaction_.disk_transaction_ = nullptr;
if (!commitStatus.ok()) {
Abort();
spdlog::error("rocksdb: Commit failed with status {}", commitStatus.ToString());
return StorageManipulationError{SerializationError{}};
}
delete transaction_.disk_transaction_;
transaction_.disk_transaction_ = nullptr;
spdlog::trace("rocksdb: Commit successful");
if (flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
disk_storage->indices_.text_index_.Commit();

View File

@ -77,7 +77,6 @@ add_subdirectory(garbage_collection)
add_subdirectory(query_planning)
add_subdirectory(awesome_functions)
add_subdirectory(high_availability)
add_subdirectory(concurrent_write)
add_subdirectory(concurrency)
add_subdirectory(replication_experimental)

View File

@ -1,7 +0,0 @@
function(copy_concurrent_write_e2e_python_files FILE_NAME)
copy_e2e_python_files(concurrent_write ${FILE_NAME})
endfunction()
copy_concurrent_write_e2e_python_files(write.py)
copy_e2e_files(concurrent_write workloads.yaml)

View File

@ -1,27 +0,0 @@
args: &args
- "--bolt-port"
- "7687"
- "--log-level"
- "TRACE"
in_memory_cluster: &in_memory_cluster
cluster:
main:
args: *args
log_file: "concurrent-write-e2e.log"
setup_queries: []
validation_queries: []
disk_cluster: &disk_cluster
cluster:
main:
args: *args
log_file: "concurrent-write-e2e.log"
setup_queries: ["STORAGE MODE ON_DISK_TRANSACTIONAL"]
validation_queries: []
workloads:
- name: "Disk concurrent write"
binary: "tests/e2e/pytest_runner.sh"
args: ["concurrent_write/write.py"]
<<: *disk_cluster

View File

@ -1,103 +0,0 @@
import sys
import threading
import time
import typing
import mgclient
import pytest
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
class AtomicInteger:
def __init__(self, value=0):
self._value = int(value)
self._lock = threading.Lock()
def inc(self, d=1):
with self._lock:
self._value += int(d)
return self._value
def dec(self, d=1):
return self.inc(-d)
@property
def value(self):
with self._lock:
return self._value
@value.setter
def value(self, v):
with self._lock:
self._value = int(v)
return self._value
def sleep_until(wanted_cnt):
while cnt.value != wanted_cnt:
time.sleep(0.1)
def client_success():
connection = mgclient.connect(host="localhost", port=7687)
connection.autocommit = False
cursor = connection.cursor()
execute_and_fetch_all(cursor, "MATCH (n1:N1) DELETE n1;")
cnt.inc() # 1
sleep_until(2)
connection.commit()
cnt.inc() # 3
def client_fail():
connection = mgclient.connect(host="localhost", port=7687)
connection.autocommit = False
cursor = connection.cursor()
try:
sleep_until(1)
execute_and_fetch_all(cursor, "MATCH (n1:N1), (n2:N2) CREATE (n1)-[:R]->(n2);")
cnt.inc() # 2
sleep_until(3)
connection.commit() # this should fail
except mgclient.DatabaseError:
return
except Exception:
assert False
def test_concurrent_write():
connection = mgclient.connect(host="localhost", port=7687)
connection.autocommit = True
cursor = connection.cursor()
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")
execute_and_fetch_all(cursor, "CREATE (:N1), (:N2);")
t1 = threading.Thread(target=client_success)
t2 = threading.Thread(target=client_fail)
global cnt
cnt = AtomicInteger(0)
t1.start()
t2.start()
t1.join()
t2.join()
assert execute_and_fetch_all(cursor, "MATCH (n:N1) RETURN inDegree(n);") == []
assert execute_and_fetch_all(cursor, "MATCH (n:N1) RETURN outDegree(n);") == []
assert execute_and_fetch_all(cursor, "MATCH (n:N2) RETURN inDegree(n);")[0][0] == 0
assert execute_and_fetch_all(cursor, "MATCH (n:N2) RETURN outDegree(n);")[0][0] == 0
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))