Handle user-defined metadata and expose it with SHOW TRANSACTIONS(#945)

This commit is contained in:
andrejtonev 2023-05-29 11:40:14 +02:00 committed by GitHub
parent cdfcbc106c
commit d842adbed3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 560 additions and 57 deletions

View File

@ -1,6 +1,7 @@
---
BasedOnStyle: Google
---
Language: Cpp
BasedOnStyle: Google
Standard: "c++20"
UseTab: Never
DerivePointerAlignment: false

View File

@ -3,6 +3,7 @@ repos:
rev: v4.4.0
hooks:
- id: check-yaml
args: [--allow-multiple-documents]
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -63,7 +63,8 @@ class Session {
* if an explicit transaction was started.
*/
virtual std::pair<std::vector<std::string>, std::optional<int>> Interpret(
const std::string &query, const std::map<std::string, Value> &params) = 0;
const std::string &query, const std::map<std::string, Value> &params,
const std::map<std::string, memgraph::communication::bolt::Value> &metadata) = 0;
/**
* Put results of the processed query in the `encoder`.
@ -85,7 +86,7 @@ class Session {
*/
virtual std::map<std::string, Value> Discard(std::optional<int> n, std::optional<int> qid) = 0;
virtual void BeginTransaction() = 0;
virtual void BeginTransaction(const std::map<std::string, memgraph::communication::bolt::Value> &) = 0;
virtual void CommitTransaction() = 0;
virtual void RollbackTransaction() = 0;

View File

@ -12,6 +12,7 @@
#pragma once
#include <map>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
@ -22,6 +23,7 @@
#include "communication/bolt/v1/state.hpp"
#include "communication/bolt/v1/value.hpp"
#include "communication/exceptions.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/logging.hpp"
#include "utils/message.hpp"
@ -71,6 +73,23 @@ inline std::pair<std::string, std::string> ExceptionToErrorMessage(const std::ex
"should be in database logs."};
}
namespace helpers {
/** Extracts metadata from the extras field.
* NOTE: In order to avoid a copy, the metadata in moved.
* TODO: Update if extra field is used for anything else.
*/
inline std::map<std::string, Value> ConsumeMetadata(Value &extra) {
std::map<std::string, Value> md;
auto &md_tv = extra.ValueMap()["tx_metadata"];
if (md_tv.IsMap()) {
md = std::move(md_tv.ValueMap());
}
return md;
}
} // namespace helpers
namespace details {
template <bool is_pull, typename TSession>
@ -209,7 +228,7 @@ State HandleRunV1(TSession &session, const State state, const Marker marker) {
try {
// Interpret can throw.
const auto [header, qid] = session.Interpret(query.ValueString(), params.ValueMap());
const auto [header, qid] = session.Interpret(query.ValueString(), params.ValueMap(), {});
// Convert std::string to Value
std::vector<Value> vec;
std::map<std::string, Value> data;
@ -250,6 +269,7 @@ State HandleRunV4(TSession &session, const State state, const Marker marker) {
// Even though this part seems unnecessary it is needed to move the buffer
if (!session.decoder_.ReadValue(&extra, Value::Type::Map)) {
spdlog::trace("Couldn't read extra field!");
return State::Close;
}
if (state != State::Idle) {
@ -266,7 +286,8 @@ State HandleRunV4(TSession &session, const State state, const Marker marker) {
try {
// Interpret can throw.
const auto [header, qid] = session.Interpret(query.ValueString(), params.ValueMap());
const auto [header, qid] =
session.Interpret(query.ValueString(), params.ValueMap(), helpers::ConsumeMetadata(extra));
// Convert std::string to Value
std::vector<Value> vec;
std::map<std::string, Value> data;
@ -360,7 +381,7 @@ State HandleBegin(TSession &session, const State state, const Marker marker) {
}
try {
session.BeginTransaction();
session.BeginTransaction(helpers::ConsumeMetadata(extra));
} catch (const std::exception &e) {
return HandleFailure(session, e);
}

View File

@ -533,16 +533,29 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
using memgraph::communication::bolt::Session<memgraph::communication::v2::InputStream,
memgraph::communication::v2::OutputStream>::TEncoder;
void BeginTransaction() override { interpreter_.BeginTransaction(); }
void BeginTransaction(const std::map<std::string, memgraph::communication::bolt::Value> &metadata) override {
std::map<std::string, memgraph::storage::PropertyValue> metadata_pv;
for (const auto &[key, bolt_value] : metadata) {
metadata_pv.emplace(key, memgraph::glue::ToPropertyValue(bolt_value));
}
interpreter_.BeginTransaction(metadata_pv);
}
void CommitTransaction() override { interpreter_.CommitTransaction(); }
void RollbackTransaction() override { interpreter_.RollbackTransaction(); }
std::pair<std::vector<std::string>, std::optional<int>> Interpret(
const std::string &query, const std::map<std::string, memgraph::communication::bolt::Value> &params) override {
const std::string &query, const std::map<std::string, memgraph::communication::bolt::Value> &params,
const std::map<std::string, memgraph::communication::bolt::Value> &metadata) override {
std::map<std::string, memgraph::storage::PropertyValue> params_pv;
for (const auto &kv : params) params_pv.emplace(kv.first, memgraph::glue::ToPropertyValue(kv.second));
std::map<std::string, memgraph::storage::PropertyValue> metadata_pv;
for (const auto &[key, bolt_param] : params) {
params_pv.emplace(key, memgraph::glue::ToPropertyValue(bolt_param));
}
for (const auto &[key, bolt_md] : metadata) {
metadata_pv.emplace(key, memgraph::glue::ToPropertyValue(bolt_md));
}
const std::string *username{nullptr};
if (user_) {
username = &user_->username();
@ -554,7 +567,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
}
#endif
try {
auto result = interpreter_.Prepare(query, params_pv, username);
auto result = interpreter_.Prepare(query, params_pv, username, metadata_pv);
if (user_ && !memgraph::glue::AuthChecker::IsUserAuthorized(*user_, result.privileges)) {
interpreter_.Abort();
throw memgraph::communication::bolt::ClientError(

View File

@ -15,6 +15,7 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <functional>
@ -112,6 +113,16 @@ void UpdateTypeCount(const plan::ReadWriteTypeChecker::RWType type) {
}
}
template <typename T>
concept HasEmpty = requires(T t) {
{ t.empty() } -> std::convertible_to<bool>;
};
template <typename T>
inline std::optional<T> GenOptional(const T &in) {
return in.empty() ? std::nullopt : std::make_optional<T>(in);
}
struct Callback {
std::vector<std::string> header;
using CallbackFunction = std::function<std::vector<std::vector<TypedValue>>()>;
@ -1177,11 +1188,14 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
}
PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) {
PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper,
const std::map<std::string, storage::PropertyValue> &metadata) {
std::function<void()> handler;
if (query_upper == "BEGIN") {
handler = [this] {
// TODO: Evaluate doing move(metadata). Currently the metadata is very small, but this will be important if it ever
// becomes large.
handler = [this, metadata] {
if (in_explicit_transaction_) {
throw ExplicitTransactionUsageException("Nested transactions are not supported.");
}
@ -1190,6 +1204,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
in_explicit_transaction_ = true;
expect_rollback_ = false;
metadata_ = GenOptional(metadata);
db_accessor_ =
std::make_unique<storage::Storage::Accessor>(interpreter_context_->db->Access(GetIsolationLevelOverride()));
@ -1220,6 +1235,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
expect_rollback_ = false;
in_explicit_transaction_ = false;
metadata_ = std::nullopt;
};
} else if (query_upper == "ROLLBACK") {
handler = [this] {
@ -1232,6 +1248,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
Abort();
expect_rollback_ = false;
in_explicit_transaction_ = false;
metadata_ = std::nullopt;
};
} else {
LOG_FATAL("Should not get here -- unknown transaction query!");
@ -2212,6 +2229,14 @@ std::vector<std::vector<TypedValue>> TransactionQueueQueryHandler::ShowTransacti
const auto &typed_queries = interpreter->GetQueries();
results.push_back({TypedValue(interpreter->username_.value_or("")),
TypedValue(std::to_string(transaction_id.value())), TypedValue(typed_queries)});
// Handle user-defined metadata
std::map<std::string, TypedValue> metadata_tv;
if (interpreter->metadata_) {
for (const auto &md : *(interpreter->metadata_)) {
metadata_tv.emplace(md.first, TypedValue(md.second));
}
}
results.back().push_back(TypedValue(metadata_tv));
}
}
return results;
@ -2281,7 +2306,7 @@ Callback HandleTransactionQueueQuery(TransactionQueueQuery *transaction_query,
Callback callback;
switch (transaction_query->action_) {
case TransactionQueueQuery::Action::SHOW_TRANSACTIONS: {
callback.header = {"username", "transaction_id", "query"};
callback.header = {"username", "transaction_id", "query", "metadata"};
callback.fn = [handler = TransactionQueueQueryHandler(), interpreter_context, username,
hasTransactionManagementPrivilege]() mutable {
std::vector<std::vector<TypedValue>> results;
@ -2717,8 +2742,8 @@ std::optional<uint64_t> Interpreter::GetTransactionId() const {
return {};
}
void Interpreter::BeginTransaction() {
const auto prepared_query = PrepareTransactionQuery("BEGIN");
void Interpreter::BeginTransaction(const std::map<std::string, storage::PropertyValue> &metadata) {
const auto prepared_query = PrepareTransactionQuery("BEGIN", metadata);
prepared_query.query_handler(nullptr, {});
}
@ -2738,10 +2763,13 @@ void Interpreter::RollbackTransaction() {
Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
const std::map<std::string, storage::PropertyValue> &params,
const std::string *username) {
const std::string *username,
const std::map<std::string, storage::PropertyValue> &metadata) {
if (!in_explicit_transaction_) {
query_executions_.clear();
transaction_queries_->clear();
// Handle user-defined metadata in auto-transactions
metadata_ = GenOptional(metadata);
}
// This will be done in the handle transaction query. Our handler can save username and then send it to the kill and
@ -2761,7 +2789,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
std::optional<int> qid =
in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
query_execution->prepared_query.emplace(PrepareTransactionQuery(trimmed_query));
query_execution->prepared_query.emplace(PrepareTransactionQuery(trimmed_query, metadata));
return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid};
}
@ -2962,6 +2990,7 @@ void Interpreter::Abort() {
expect_rollback_ = false;
in_explicit_transaction_ = false;
metadata_ = std::nullopt;
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveTransactions);

View File

@ -261,6 +261,7 @@ class Interpreter final {
std::optional<std::string> username_;
bool in_explicit_transaction_{false};
bool expect_rollback_{false};
std::optional<std::map<std::string, storage::PropertyValue>> metadata_{}; //!< User defined transaction metadata
/**
* Prepare a query for execution.
@ -271,7 +272,8 @@ class Interpreter final {
* @throw query::QueryException
*/
PrepareResult Prepare(const std::string &query, const std::map<std::string, storage::PropertyValue> &params,
const std::string *username);
const std::string *username,
const std::map<std::string, storage::PropertyValue> &metadata = {});
/**
* Execute the last prepared query and stream *all* of the results into the
@ -315,7 +317,7 @@ class Interpreter final {
std::map<std::string, TypedValue> Pull(TStream *result_stream, std::optional<int> n = {},
std::optional<int> qid = {});
void BeginTransaction();
void BeginTransaction(const std::map<std::string, storage::PropertyValue> &metadata = {});
/*
Returns transaction id or empty if the db_accessor is not initialized.
@ -405,7 +407,8 @@ class Interpreter final {
std::optional<storage::IsolationLevel> interpreter_isolation_level;
std::optional<storage::IsolationLevel> next_transaction_isolation_level;
PreparedQuery PrepareTransactionQuery(std::string_view query_upper);
PreparedQuery PrepareTransactionQuery(std::string_view query_upper,
const std::map<std::string, storage::PropertyValue> &metadata = {});
void Commit();
void AdvanceCommand();
void AbortCommand(std::unique_ptr<QueryExecution> *query_execution);

View File

@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Neo4j.Driver.Simple" Version="4.1.1" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,75 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using Neo4j.Driver;
public class Transactions {
public static void Main(string[] args) {
var driver =
GraphDatabase.Driver("bolt://localhost:7687", AuthTokens.None,
(builder) => builder.WithEncryptionLevel(EncryptionLevel.None));
ClearDatabase(driver);
// Explicit transaction query.
using (var session = driver.Session()) {
Console.WriteLine("Checking explicit transaction metadata...");
var txMetadata = new Dictionary<string, object> {
{ "ver", "transaction" }, { "str", "oho" }, { "num", 456 }
};
using (var tx = session.BeginTransaction(txConfig => txConfig.WithMetadata(txMetadata))) {
tx.Run("MATCH (n) RETURN n LIMIT 1").Consume();
// Check transaction info from another thread
Thread show_tx = new Thread(() => ShowTx(ref driver));
show_tx.Start();
show_tx.Join();
// End current transaction
tx.Commit();
}
}
// Implicit transaction query
using (var session = driver.Session()) {
Console.WriteLine("Checking implicit transaction metadata...");
var txMetadata = new Dictionary<string, object> {
{ "ver", "session" }, { "str", "aha" }, { "num", 123 }
};
CheckMD(session.Run("SHOW TRANSACTIONS", txConfig => txConfig.WithMetadata(txMetadata)));
}
Console.WriteLine("All ok!");
}
private static void ClearDatabase(IDriver driver) {
using (var session = driver.Session()) session.Run("MATCH (n) DETACH DELETE n").Consume();
}
public static void ShowTx(ref IDriver driver) {
using (var session = driver.Session()) {
CheckMD(session.Run("SHOW TRANSACTIONS"));
}
}
public static void CheckMD(IResult tx_md) {
int n = 0;
try {
foreach (var res in tx_md) {
var md = res["metadata"].As<Dictionary<string, object>>();
if (md.Count != 0) {
if (md["ver"].As<string>() == "transaction" && md["str"].As<string>() == "oho" &&
md["num"].As<int>() == 456) {
n = n + 1;
} else if (md["ver"].As<string>() == "session" && md["str"].As<string>() == "aha" &&
md["num"].As<int>() == 123) {
n = n + 1;
}
}
}
} catch {
n = 0;
}
if (n == 0) {
Console.WriteLine("Metadata error!");
Environment.Exit(1);
}
}
}

View File

@ -0,0 +1,90 @@
package main
import "github.com/neo4j/neo4j-go-driver/neo4j"
import "log"
import "fmt"
func handle_error(err error) {
log.Fatal("Error occured: %s", err)
}
func check_md(result neo4j.Result, err error) {
if err != nil {
handle_error(err)
}
n := 0
for result.Next() {
md, ok := result.Record().Get("metadata")
if !ok {
log.Fatal("Failed to read metadata!")
}
md_map, ok := md.(map[string]interface{})
if ok {
ver_val, ver_ok := md_map["ver"]
str_val, str_ok := md_map["str"]
num_val, num_ok := md_map["num"]
if (ver_ok && str_ok && num_ok) {
if ((ver_val.(string) == "session" && str_val.(string) == "aha" && num_val.(int64) == 123) ||
(ver_val.(string) == "transaction" && str_val.(string) == "oho" && num_val.(int64) == 456)) {
n++
}
}
}
}
if n == 0 {
log.Fatal("Wrong metadata values!")
}
_, err = result.Consume()
if err != nil {
handle_error(err)
}
}
func check_tx(driver neo4j.Driver) {
sessionConfig := neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}
session, err := driver.NewSession(sessionConfig)
if err != nil {
log.Fatal("An error occurred while creating a session: %s", err)
}
defer session.Close()
result, err := session.Run("SHOW TRANSACTIONS", nil)
check_md(result, err)
}
func main() {
configForNeo4j40 := func(conf *neo4j.Config) { conf.Encrypted = false }
driver, err := neo4j.NewDriver("bolt://localhost:7687", neo4j.BasicAuth("", "", ""), configForNeo4j40)
if err != nil {
log.Fatal("An error occurred opening conn: %s", err)
}
defer driver.Close()
sessionConfig := neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}
session, err := driver.NewSession(sessionConfig)
if err != nil {
log.Fatal("An error occurred while creating a session: %s", err)
}
defer session.Close()
// Implicit transaction
fmt.Println("Checking implicit transaction metadata...")
result, err := session.Run("SHOW TRANSACTIONS", nil, neo4j.WithTxMetadata(map[string]interface{}{"ver":"session", "str":"aha", "num":123}))
check_md(result, err)
// Explicit transaction
fmt.Println("Checking explicit transaction metadata...")
tx, err := session.BeginTransaction(neo4j.WithTxMetadata(map[string]interface{}{"ver":"transaction", "str":"oho", "num":456}))
if err != nil {
handle_error(err)
}
tx.Run("MATCH (n) RETURN n LIMIT 1", map[string]interface{}{})
go check_tx(driver)
tx.Commit()
fmt.Println("All ok!")
}

View File

@ -12,3 +12,4 @@ go get github.com/neo4j/neo4j-go-driver/neo4j
go run docs_how_to_query.go
go run transactions.go
go run metadata.go

View File

@ -1,38 +1,39 @@
import static org.neo4j.driver.v1.Values.parameters;
import java.util.*;
import org.neo4j.driver.v1.*;
import org.neo4j.driver.v1.types.*;
import static org.neo4j.driver.v1.Values.parameters;
import java.util.*;
public class Basic {
public static void main(String[] args) {
Config config = Config.build().withoutEncryption().toConfig();
Driver driver = GraphDatabase.driver( "bolt://localhost:7687", AuthTokens.basic( "neo4j", "1234" ), config );
public static void main(String[] args) {
Config config = Config.build().withoutEncryption().toConfig();
Driver driver =
GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "1234"), config);
try ( Session session = driver.session() ) {
StatementResult rs1 = session.run( "MATCH (n) DETACH DELETE n" );
System.out.println( "Database cleared." );
try (Session session = driver.session()) {
StatementResult rs1 = session.run("MATCH (n) DETACH DELETE n");
System.out.println("Database cleared.");
StatementResult rs2 = session.run( "CREATE (alice: Person {name: 'Alice', age: 22})" );
System.out.println( "Record created." );
StatementResult rs2 = session.run("CREATE (alice: Person {name: 'Alice', age: 22})");
System.out.println("Record created.");
StatementResult rs3 = session.run( "MATCH (n) RETURN n" );
System.out.println( "Record matched." );
StatementResult rs3 = session.run("MATCH (n) RETURN n");
System.out.println("Record matched.");
List<Record> records = rs3.list();
Record record = records.get( 0 );
Node node = record.get( "n" ).asNode();
if ( !node.get("name").asString().equals( "Alice" ) || node.get("age").asInt() != 22 ) {
System.out.println( "Data doesn't match!" );
System.exit( 1 );
}
List<org.neo4j.driver.v1.Record> records = rs3.list();
org.neo4j.driver.v1.Record record = records.get(0);
Node node = record.get("n").asNode();
if (!node.get("name").asString().equals("Alice") || node.get("age").asInt() != 22) {
System.out.println("Data doesn't match!");
System.exit(1);
}
System.out.println( "All ok!" );
}
catch ( Exception e ) {
System.out.println( e );
System.exit( 1 );
}
driver.close();
System.out.println("All ok!");
} catch (Exception e) {
System.out.println(e);
System.exit(1);
}
driver.close();
}
}

View File

@ -0,0 +1,111 @@
import static org.neo4j.driver.Values.parameters;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;
import org.neo4j.driver.Transaction;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TransientException;
public class Metadata {
public static String createPerson(Transaction tx, String name) {
Result result =
tx.run("CREATE (a:Person {name: $name}) RETURN a.name", parameters("name", name));
return result.single().get(0).asString();
}
public static void checkMd(Result result) {
int n = 0;
while (result.hasNext()) {
Record r = result.next();
Value md = r.get("metadata");
if (md != null && Objects.equals(md.get("ver").asString(), "transaction")
&& Objects.equals(md.get("str").asString(), "oho")
&& Objects.equals(md.get("num").asInt(), 456)) {
n = n + 1;
} else if (md != null && Objects.equals(md.get("ver").asString(), "session")
&& Objects.equals(md.get("str").asString(), "aha")
&& Objects.equals(md.get("num").asInt(), 123)) {
n = n + 1;
}
}
if (n == 0) {
System.out.println("Error while reading metadata!");
System.exit(1);
}
}
public static void main(String[] args) {
Config config = Config.builder()
.withoutEncryption()
.withMaxTransactionRetryTime(0, TimeUnit.SECONDS)
.build();
Driver driver =
GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "1234"), config);
try (Session session = driver.session()) {
// Explicit transaction
System.out.println("Checking explicit transaction metadata...");
try {
TransactionConfig tx_config =
TransactionConfig.builder()
.withTimeout(Duration.ofSeconds(2))
.withMetadata(Map.ofEntries(Map.entry("ver", "transaction"),
Map.entry("str", "oho"), Map.entry("num", 456)))
.build();
Transaction tx = session.beginTransaction(tx_config);
tx.run("MATCH (n) RETURN n LIMIT 1");
// Check the metadata from another thread
try {
Runnable checkTx = new Runnable() {
public void run() {
try (Session s = driver.session()) {
checkMd(s.run("SHOW TRANSACTIONS"));
} catch (ClientException e) {
System.out.println(e);
}
}
};
Thread thread = new Thread(checkTx);
thread.start();
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
tx.commit();
} catch (ClientException e) {
System.out.println(e);
}
// Implicit transaction
System.out.println("Checking implicit transaction metadata...");
try {
TransactionConfig tx_config = TransactionConfig.builder()
.withTimeout(Duration.ofSeconds(2))
.withMetadata(Map.ofEntries(Map.entry("ver", "session"),
Map.entry("str", "aha"), Map.entry("num", 123)))
.build();
checkMd(session.run("SHOW TRANSACTIONS", tx_config));
} catch (ClientException e) {
System.out.println(e);
}
System.out.println("All ok!");
} catch (Exception e) {
System.out.println(e);
System.exit(1);
}
driver.close();
}
}

View File

@ -35,3 +35,6 @@ java -classpath .:$DRIVER:$REACTIVE_STREAM_DEP MaxQueryLength
javac -classpath .:$DRIVER:$REACTIVE_STREAM_DEP Transactions.java
java -classpath .:$DRIVER:$REACTIVE_STREAM_DEP Transactions
javac -classpath .:$DRIVER:$REACTIVE_STREAM_DEP Metadata.java
java -classpath .:$DRIVER:$REACTIVE_STREAM_DEP Metadata

View File

@ -0,0 +1,31 @@
var neo4j = require('neo4j-driver');
var driver = neo4j.driver("bolt://localhost:7687",
neo4j.auth.basic("", ""),
{ encrypted: 'ENCRYPTION_OFF' });
var session = driver.session();
function die() {
session.close();
driver.close();
process.exit(1);
}
function run_query(query, callback) {
var run = session.run(query, {}, {metadata:{"ver":"session", "str":"aha", "num":123}});
run.then(callback).catch(function (error) {
console.log(error);
die();
});
}
console.log("Checking implicit transaction metadata...");
run_query("SHOW TRANSACTIONS;", function (result) {
const md = result.records[0].get("metadata");
if (md["ver"] != "session" || md["str"] != "aha" || md["num"] != 123){
console.log("Error while reading metadata!");
die();
}
console.log("All ok!");
session.close();
driver.close();
});

View File

@ -15,3 +15,4 @@ fi
node docs_how_to_query.js
node max_query_length.js
node metadata.js

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import threading
import neo4j
def check_md(tx_md):
n = 0
for record in tx_md:
md = record[3]
if md["ver"] == "session" and md["str"] == "aha" and md["num"] == 123:
n = n + 1
elif md["ver"] == "transaction" and md["str"] == "oho" and md["num"] == 456:
n = n + 1
return n
def session_run(driver):
print("Checking implicit transaction metadata...")
with driver.session() as session:
query = neo4j.Query("SHOW TRANSACTIONS", timeout=2, metadata={"ver": "session", "str": "aha", "num": 123})
result = session.run(query).values()
assert check_md(result) == 1, "metadata info error!"
def show_tx(driver, tx_md):
with driver.session() as session:
query = neo4j.Query("SHOW TRANSACTIONS", timeout=2, metadata={"ver": "session", "str": "aha", "num": 123})
for t in session.run(query).values():
tx_md.append(t)
def transaction_run(driver):
print("Checking explicit transaction metadata...")
with driver.session() as session:
tx = session.begin_transaction(timeout=2, metadata={"ver": "transaction", "str": "oho", "num": 456})
tx.run("MATCH (n) RETURN n LIMIT 1").consume()
tx_md = []
th = threading.Thread(target=show_tx, args=(driver, tx_md))
th.start()
if th.is_alive():
th.join()
tx.commit()
assert check_md(tx_md) == 2, "metadata info error!"
if __name__ == "__main__":
driver = neo4j.GraphDatabase.driver("bolt://localhost:7687", auth=("user", "pass"), encrypted=False)
session_run(driver)
transaction_run(driver)
driver.close()
print("All ok!")

View File

@ -31,3 +31,4 @@ source ve3/bin/activate
python3 docs_how_to_query.py || exit 1
python3 max_query_length.py || exit 1
python3 transactions.py || exit 1
python3 metadata.py || exit 1

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -12,6 +12,7 @@
#include <string>
#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include "bolt_common.hpp"
#include "communication/bolt/v1/session.hpp"
@ -27,6 +28,7 @@ using memgraph::communication::bolt::Value;
static const char *kInvalidQuery = "invalid query";
static const char *kQueryReturn42 = "RETURN 42";
static const char *kQueryReturnMultiple = "UNWIND [1,2,3] as n RETURN n";
static const char *kQueryShowTx = "SHOW TRANSACTIONS";
static const char *kQueryEmpty = "no results";
class TestSessionData {};
@ -39,10 +41,18 @@ class TestSession : public Session<TestInputStream, TestOutputStream> {
: Session<TestInputStream, TestOutputStream>(input_stream, output_stream) {}
std::pair<std::vector<std::string>, std::optional<int>> Interpret(
const std::string &query, const std::map<std::string, Value> &params) override {
const std::string &query, const std::map<std::string, Value> &params,
const std::map<std::string, Value> &metadata) override {
if (!metadata.empty()) md_ = metadata;
if (query == kQueryReturn42 || query == kQueryEmpty || query == kQueryReturnMultiple) {
query_ = query;
return {{"result_name"}, {}};
} else if (query == kQueryShowTx) {
if (md_.at("str").ValueString() != "aha" || md_.at("num").ValueInt() != 123) {
throw ClientError("Wrong metadata!");
}
query_ = query;
return {{"username", "transaction_id", "query", "metadata"}, {}};
} else {
query_ = "";
throw ClientError("client sent invalid query");
@ -71,6 +81,9 @@ class TestSession : public Session<TestInputStream, TestOutputStream> {
}
return {std::pair("has_more", true)};
} else if (query_ == kQueryShowTx) {
encoder->MessageRecord({"", 1234567890, query_, md_});
return {};
} else {
throw ClientError("client sent invalid query");
}
@ -78,11 +91,11 @@ class TestSession : public Session<TestInputStream, TestOutputStream> {
std::map<std::string, Value> Discard(std::optional<int>, std::optional<int>) override { return {}; }
void BeginTransaction() override {}
void CommitTransaction() override {}
void RollbackTransaction() override {}
void BeginTransaction(const std::map<std::string, Value> &metadata) override { md_ = metadata; }
void CommitTransaction() override { md_.clear(); }
void RollbackTransaction() override { md_.clear(); }
void Abort() override {}
void Abort() override { md_.clear(); }
bool Authenticate(const std::string &username, const std::string &password) override { return true; }
@ -90,6 +103,7 @@ class TestSession : public Session<TestInputStream, TestOutputStream> {
private:
std::string query_;
std::map<std::string, Value> md_;
};
// TODO: This could be done in fixture.
@ -157,6 +171,10 @@ inline constexpr uint8_t handshake_req[] = {0x60, 0x60, 0xb0, 0x17, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
inline constexpr uint8_t handshake_resp[] = {0x00, 0x00, 0x03, 0x04};
inline constexpr uint8_t route[]{0xb3, 0x66, 0xa0, 0x90, 0xc0};
const std::string extra_w_metadata =
"\xa2\x8b\x74\x78\x5f\x6d\x65\x74\x61\x64\x61\x74\x61\xa2\x83\x73\x74\x72\x83\x61\x68\x61\x83\x6e\x75\x6d\x7b\x8a"
"\x74\x78\x5f\x74\x69\x6d\x65\x6f\x75\x74\xc9\x07\xd0";
inline constexpr uint8_t commit[] = {0xb0, 0x12};
} // namespace v4_3
// Write bolt chunk header (length)
@ -229,10 +247,11 @@ void ExecuteInit(TestInputStream &input_stream, TestSession &session, std::vecto
}
// Write bolt encoded run request
void WriteRunRequest(TestInputStream &input_stream, const char *str, const bool is_v4 = false) {
void WriteRunRequest(TestInputStream &input_stream, const char *str, const bool is_v4 = false,
const std::string &extra = "\xA0") {
// write chunk header
auto len = strlen(str);
WriteChunkHeader(input_stream, (3 + is_v4) + 2 + len + 1);
WriteChunkHeader(input_stream, (3 + is_v4 * extra.size()) + 2 + len + 1);
const auto *run_header = is_v4 ? v4::run_req_header : run_req_header;
const auto run_header_size = is_v4 ? sizeof(v4::run_req_header) : sizeof(run_req_header);
@ -250,7 +269,7 @@ void WriteRunRequest(TestInputStream &input_stream, const char *str, const bool
if (is_v4) {
// write empty map for extra field
input_stream.Write("\xA0", 1); // TinyMap
input_stream.Write(extra.data(), extra.size()); // TinyMap
}
// write chunk tail
@ -1122,3 +1141,27 @@ TEST(BoltSession, ResetInIdle) {
EXPECT_EQ(session.state_, State::Idle);
}
}
TEST(BoltSession, PassMetadata) {
// v4+
{
INIT_VARS;
ExecuteHandshake(input_stream, session, output, v4_3::handshake_req, v4_3::handshake_resp);
ExecuteInit(input_stream, session, output, true);
WriteRunRequest(input_stream, kQueryShowTx, true, v4_3::extra_w_metadata);
session.Execute();
ASSERT_EQ(session.state_, State::Result);
ExecuteCommand(input_stream, session, v4::pullall_req, sizeof(v4::pullall_req));
ASSERT_EQ(session.state_, State::Idle);
PrintOutput(output);
constexpr std::array<uint8_t, 5> md_num_123{0x83, 0x6E, 0x75, 0x6D, 0x7B};
constexpr std::array<uint8_t, 8> md_str_aha{0x83, 0x73, 0x74, 0x72, 0x83, 0x61, 0x68, 0x61};
auto find_num = std::search(begin(output), end(output), begin(md_num_123), end(md_num_123));
EXPECT_NE(find_num, end(output));
auto find_str = std::search(begin(output), end(output), begin(md_str_aha), end(md_str_aha));
EXPECT_NE(find_str, end(output));
}
}