Compare commits

...

8 Commits

Author SHA1 Message Date
Ivan Milinović
3b9bc5c1ab
Merge branch 'master' into disk-rollback-on-fail 2024-03-25 09:16:30 +01:00
imilinovic
4dfcd0a7e7 remove in memory test 2024-03-18 13:01:10 +01:00
imilinovic
d083226487 Merge branch 'master' of github.com:memgraph/memgraph into disk-rollback-on-fail 2024-03-18 01:00:17 +01:00
imilinovic
6c555ce4fd fix failing tests 2024-03-14 13:58:56 +01:00
imilinovic
2c615b63b2 remove file 2024-03-14 01:41:17 +01:00
imilinovic
7beeab6964 add tests and fix wrong logic 2024-03-14 01:38:52 +01:00
imilinovic
ec3a772258 newline 2024-03-13 13:03:15 +01:00
imilinovic
7ceeb3bc6d rollback transaction on failed commit 2024-03-13 13:01:07 +01:00
5 changed files with 143 additions and 2 deletions

View File

@ -1776,12 +1776,15 @@ utils::BasicResult<StorageManipulationError, void> DiskStorage::DiskAccessor::Co
logging::AssertRocksDBStatus(transaction_.disk_transaction_->SetCommitTimestamp(*commit_timestamp_));
}
auto commitStatus = transaction_.disk_transaction_->Commit();
delete transaction_.disk_transaction_;
transaction_.disk_transaction_ = nullptr;
if (!commitStatus.ok()) {
Abort();
spdlog::error("rocksdb: Commit failed with status {}", commitStatus.ToString());
return StorageManipulationError{SerializationError{}};
}
delete transaction_.disk_transaction_;
transaction_.disk_transaction_ = nullptr;
spdlog::trace("rocksdb: Commit successful");
if (flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
disk_storage->indices_.text_index_.Commit();

View File

@ -77,6 +77,7 @@ add_subdirectory(garbage_collection)
add_subdirectory(query_planning)
add_subdirectory(awesome_functions)
add_subdirectory(high_availability)
add_subdirectory(concurrent_write)
add_subdirectory(concurrency)
add_subdirectory(replication_experimental)

View File

@ -0,0 +1,7 @@
function(copy_concurrent_write_e2e_python_files FILE_NAME)
copy_e2e_python_files(concurrent_write ${FILE_NAME})
endfunction()
copy_concurrent_write_e2e_python_files(write.py)
copy_e2e_files(concurrent_write workloads.yaml)

View File

@ -0,0 +1,27 @@
args: &args
- "--bolt-port"
- "7687"
- "--log-level"
- "TRACE"
in_memory_cluster: &in_memory_cluster
cluster:
main:
args: *args
log_file: "concurrent-write-e2e.log"
setup_queries: []
validation_queries: []
disk_cluster: &disk_cluster
cluster:
main:
args: *args
log_file: "concurrent-write-e2e.log"
setup_queries: ["STORAGE MODE ON_DISK_TRANSACTIONAL"]
validation_queries: []
workloads:
- name: "Disk concurrent write"
binary: "tests/e2e/pytest_runner.sh"
args: ["concurrent_write/write.py"]
<<: *disk_cluster

View File

@ -0,0 +1,103 @@
import sys
import threading
import time
import typing
import mgclient
import pytest
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
return cursor.fetchall()
class AtomicInteger:
def __init__(self, value=0):
self._value = int(value)
self._lock = threading.Lock()
def inc(self, d=1):
with self._lock:
self._value += int(d)
return self._value
def dec(self, d=1):
return self.inc(-d)
@property
def value(self):
with self._lock:
return self._value
@value.setter
def value(self, v):
with self._lock:
self._value = int(v)
return self._value
def sleep_until(wanted_cnt):
while cnt.value != wanted_cnt:
time.sleep(0.1)
def client_success():
connection = mgclient.connect(host="localhost", port=7687)
connection.autocommit = False
cursor = connection.cursor()
execute_and_fetch_all(cursor, "MATCH (n1:N1) DELETE n1;")
cnt.inc() # 1
sleep_until(2)
connection.commit()
cnt.inc() # 3
def client_fail():
connection = mgclient.connect(host="localhost", port=7687)
connection.autocommit = False
cursor = connection.cursor()
try:
sleep_until(1)
execute_and_fetch_all(cursor, "MATCH (n1:N1), (n2:N2) CREATE (n1)-[:R]->(n2);")
cnt.inc() # 2
sleep_until(3)
connection.commit() # this should fail
except mgclient.DatabaseError:
return
except Exception:
assert False
def test_concurrent_write():
connection = mgclient.connect(host="localhost", port=7687)
connection.autocommit = True
cursor = connection.cursor()
execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n;")
execute_and_fetch_all(cursor, "CREATE (:N1), (:N2);")
t1 = threading.Thread(target=client_success)
t2 = threading.Thread(target=client_fail)
global cnt
cnt = AtomicInteger(0)
t1.start()
t2.start()
t1.join()
t2.join()
assert execute_and_fetch_all(cursor, "MATCH (n:N1) RETURN inDegree(n);") == []
assert execute_and_fetch_all(cursor, "MATCH (n:N1) RETURN outDegree(n);") == []
assert execute_and_fetch_all(cursor, "MATCH (n:N2) RETURN inDegree(n);")[0][0] == 0
assert execute_and_fetch_all(cursor, "MATCH (n:N2) RETURN outDegree(n);")[0][0] == 0
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))