Compare commits

...

47 Commits

Author SHA1 Message Date
jbajic
190fe3c373 Update markers 2022-07-05 12:20:31 +02:00
jbajic
81a1bd7957 Fix replication errors 2022-07-04 16:27:17 +02:00
jbajic
c18555bf68 Remove serialization logic 2022-07-04 16:26:52 +02:00
jbajic
73d7f6cabc Add encoding to wal 2022-07-01 16:23:30 +02:00
jbajic
9e0540f8ec Add serialization for uint8 2022-07-01 16:21:01 +02:00
jbajic
534d5ee954 Update storage interaface 2022-07-01 16:19:39 +02:00
jbajic
e65805abe9 Add interface for serialization 2022-07-01 11:36:35 +02:00
jbajic
2279575d2c Fix GetSchema 2022-07-01 11:35:15 +02:00
jbajic
201f9f1023 Fix clang tidy errors 2022-06-28 18:02:01 +02:00
jbajic
eb922babb8 Fix tests 2022-06-28 17:15:16 +02:00
jbajic
21cdd07103 Merge branch 'E112-MG-implement-partial-schema' into T0849-MG-create-schema-ddl-expressions 2022-06-28 16:37:15 +02:00
jbajic
beb7c41497 Finish interpreter tests 2022-06-28 15:22:56 +02:00
jbajic
cbbecebb74 Drop float support 2022-06-28 12:27:01 +02:00
jbajic
0e43cf3145 Add notifications 2022-06-28 12:25:14 +02:00
jbajic
44910def21 Fix conflicting keywords 2022-06-28 12:17:11 +02:00
jbajic
b90e856619 Catch repeating type name in schema definition 2022-06-27 14:28:20 +02:00
jbajic
e9053368d7 Add privilege and cypher visitor tests 2022-06-24 17:28:02 +02:00
jbajic
5d22af7c36 Fix grammar with schemaTypeMap 2022-06-24 17:27:32 +02:00
jbajic
ff34e26878 Remove list and map from lexer 2022-06-24 15:51:40 +02:00
jbajic
ed6f9a91a2 Rename DeleteSchema to DropSchema 2022-06-24 14:08:45 +02:00
jbajic
ed814f3188 Add verification on creation and deletion 2022-06-24 13:39:22 +02:00
Jure Bajic
6666eea897
Add initial schema implementation
* Add initial schema implementation

* Add index to schema

* List schemas and enable multiple properties

* Implement SchemaTypes

* Apply suggestions from code review

Co-authored-by: Jeremy B <97525434+42jeremy@users.noreply.github.com>
Co-authored-by: János Benjamin Antal <antaljanosbenjamin@users.noreply.github.com>

* Address review comments

* Remove Map and List

* Apply suggestions from code review

Co-authored-by: Kostas Kyrimis  <kostaskyrim@gmail.com>

Co-authored-by: Jeremy B <97525434+42jeremy@users.noreply.github.com>
Co-authored-by: János Benjamin Antal <antaljanosbenjamin@users.noreply.github.com>
Co-authored-by: Kostas Kyrimis  <kostaskyrim@gmail.com>
2022-06-23 14:04:44 +02:00
jbajic
21ab15f8e3 Override visitPropertyType 2022-06-15 19:12:57 +02:00
jbajic
88d54d4e2f Enable Create schema ddl 2022-06-15 18:23:30 +02:00
jbajic
cb0e072c7d Rename schemaproperty to schemapropertytype 2022-06-15 09:03:20 +02:00
jbajic
7f81fda1c6 Fix naming for schemaproperty type to schema type 2022-06-15 09:01:55 +02:00
jbajic
8eb136bc82 Temporary create ddl logic 2022-06-14 16:44:48 +02:00
jbajic
33f8b27a2a Fix grammar 2022-06-14 16:44:25 +02:00
jbajic
a2aabdb5ae Add common schema type 2022-06-14 16:08:00 +02:00
jbajic
eebb48d094 Fix schema visitor 2022-06-13 14:00:39 +02:00
jbajic
c95c428355 Add show schema query 2022-06-10 17:42:32 +02:00
jbajic
1afe86d4ce Implement show schemas 2022-06-10 14:14:50 +02:00
jbajic
69a5db6875 Add PrepareSchemaQuery function 2022-06-10 14:14:50 +02:00
jbajic
904e3df3a1 Update metadata 2022-06-10 14:14:50 +02:00
jbajic
9922f90e81 Add schema visitors 2022-06-10 14:14:50 +02:00
jbajic
c4863b36a5 Add drop schema query 2022-06-10 14:14:50 +02:00
jbajic
0cf677eef6 Add missing keywords into lexer 2022-06-10 14:14:50 +02:00
jbajic
300eae544d Add privileges for schema 2022-06-10 14:14:50 +02:00
jbajic
382f85cd4d Add create and show schema queries 2022-06-10 14:14:50 +02:00
jbajic
88b98a97cb Add schema operations in storage 2022-06-10 14:14:49 +02:00
jbajic
27f688d7ca Remove Map and List 2022-06-10 14:12:52 +02:00
jbajic
53a5b77f14 Address review comments 2022-05-27 11:40:06 +02:00
Jure Bajic
8cb56a4970
Apply suggestions from code review
Co-authored-by: Jeremy B <97525434+42jeremy@users.noreply.github.com>
Co-authored-by: János Benjamin Antal <antaljanosbenjamin@users.noreply.github.com>
2022-05-26 17:22:54 +02:00
jbajic
eb4d23c504 Implement SchemaTypes 2022-05-25 10:41:43 +02:00
jbajic
65b8e055a6 List schemas and enable multiple properties 2022-05-25 10:41:18 +02:00
jbajic
f8e1d8c3f6 Add index to schema 2022-05-19 16:56:48 +02:00
jbajic
01618fb0f2 Add initial schema implementation 2022-05-19 14:42:01 +02:00
33 changed files with 1088 additions and 26 deletions

View File

@ -37,7 +37,7 @@ const std::vector<Permission> kPermissionsAll = {
Permission::CONSTRAINT, Permission::DUMP, Permission::AUTH, Permission::REPLICATION,
Permission::DURABILITY, Permission::READ_FILE, Permission::FREE_MEMORY, Permission::TRIGGER,
Permission::CONFIG, Permission::STREAM, Permission::MODULE_READ, Permission::MODULE_WRITE,
Permission::WEBSOCKET};
Permission::WEBSOCKET, Permission::SCHEMA};
} // namespace
std::string PermissionToString(Permission permission) {
@ -84,6 +84,8 @@ std::string PermissionToString(Permission permission) {
return "MODULE_WRITE";
case Permission::WEBSOCKET:
return "WEBSOCKET";
case Permission::SCHEMA:
return "SCHEMA";
}
}

View File

@ -38,7 +38,8 @@ enum class Permission : uint64_t {
STREAM = 1U << 17U,
MODULE_READ = 1U << 18U,
MODULE_WRITE = 1U << 19U,
WEBSOCKET = 1U << 20U
WEBSOCKET = 1U << 20U,
SCHEMA = 1U << 21U
};
// clang-format on

19
src/common/types.hpp Normal file
View File

@ -0,0 +1,19 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
namespace memgraph::common {
enum class SchemaType : uint8_t { BOOL, INT, STRING, DATE, LOCALTIME, LOCALDATETIME, DURATION };
} // namespace memgraph::common

View File

@ -57,6 +57,8 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) {
return auth::Permission::MODULE_WRITE;
case query::AuthQuery::Privilege::WEBSOCKET:
return auth::Permission::WEBSOCKET;
case query::AuthQuery::Privilege::SCHEMA:
return auth::Permission::SCHEMA;
}
}
} // namespace memgraph::glue

View File

@ -17,6 +17,7 @@
#include <variant>
#include <vector>
#include "common/types.hpp"
#include "query/frontend/ast/ast_visitor.hpp"
#include "query/frontend/semantic/symbol.hpp"
#include "query/interpret/awesome_memgraph_functions.hpp"
@ -2253,7 +2254,7 @@ cpp<#
(lcp:define-enum privilege
(create delete match merge set remove index stats auth constraint
dump replication durability read_file free_memory trigger config stream module_read module_write
websocket)
websocket schema)
(:serialize))
#>cpp
AuthQuery() = default;
@ -2295,7 +2296,7 @@ const std::vector<AuthQuery::Privilege> kPrivilegesAll = {
AuthQuery::Privilege::FREE_MEMORY, AuthQuery::Privilege::TRIGGER,
AuthQuery::Privilege::CONFIG, AuthQuery::Privilege::STREAM,
AuthQuery::Privilege::MODULE_READ, AuthQuery::Privilege::MODULE_WRITE,
AuthQuery::Privilege::WEBSOCKET};
AuthQuery::Privilege::WEBSOCKET, AuthQuery::Privilege::SCHEMA};
cpp<#
(lcp:define-class info-query (query)
@ -2665,5 +2666,37 @@ cpp<#
(:serialize (:slk))
(:clone))
(lcp:define-class schema-query (query)
((action "Action" :scope :public)
(label "LabelIx" :scope :public
:slk-load (lambda (member)
#>cpp
slk::Load(&self->${member}, reader, storage);
cpp<#)
:clone (lambda (source dest)
#>cpp
${dest} = storage->GetLabelIx(${source}.name);
cpp<#))
(schema_type_map "std::unordered_map<PropertyIx, common::SchemaType>"
:slk-save #'slk-save-property-map
:slk-load #'slk-load-property-map
:scope :public))
(:public
(lcp:define-enum action
(create-schema drop-schema show-schema show-schemas)
(:serialize))
#>cpp
SchemaQuery() = default;
DEFVISITABLE(QueryVisitor<void>);
cpp<#)
(:private
#>cpp
friend class AstStorage;
cpp<#)
(:serialize (:slk))
(:clone))
(lcp:pop-namespace) ;; namespace query
(lcp:pop-namespace) ;; namespace memgraph

View File

@ -94,6 +94,7 @@ class StreamQuery;
class SettingQuery;
class VersionQuery;
class Foreach;
class SchemaQuery;
using TreeCompositeVisitor = utils::CompositeVisitor<
SingleQuery, CypherUnion, NamedExpression, OrOperator, XorOperator, AndOperator, NotOperator, AdditionOperator,
@ -125,9 +126,9 @@ class ExpressionVisitor
None, ParameterLookup, Identifier, PrimitiveLiteral, RegexMatch> {};
template <class TResult>
class QueryVisitor
: public utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery, IndexQuery, AuthQuery, InfoQuery,
ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery, FreeMemoryQuery, TriggerQuery,
IsolationLevelQuery, CreateSnapshotQuery, StreamQuery, SettingQuery, VersionQuery> {};
class QueryVisitor : public utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery, IndexQuery, AuthQuery,
InfoQuery, ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery,
FreeMemoryQuery, TriggerQuery, IsolationLevelQuery, CreateSnapshotQuery,
StreamQuery, SettingQuery, VersionQuery, SchemaQuery> {};
} // namespace memgraph::query

View File

@ -27,6 +27,7 @@
#include <boost/preprocessor/cat.hpp>
#include "common/types.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/ast/ast_visitor.hpp"
@ -1338,6 +1339,7 @@ antlrcpp::Any CypherMainVisitor::visitPrivilege(MemgraphCypher::PrivilegeContext
if (ctx->MODULE_READ()) return AuthQuery::Privilege::MODULE_READ;
if (ctx->MODULE_WRITE()) return AuthQuery::Privilege::MODULE_WRITE;
if (ctx->WEBSOCKET()) return AuthQuery::Privilege::WEBSOCKET;
if (ctx->SCHEMA()) return AuthQuery::Privilege::SCHEMA;
LOG_FATAL("Should not get here - unknown privilege!");
}
@ -2336,6 +2338,94 @@ antlrcpp::Any CypherMainVisitor::visitForeach(MemgraphCypher::ForeachContext *ct
return for_each;
}
antlrcpp::Any CypherMainVisitor::visitSchemaQuery(MemgraphCypher::SchemaQueryContext *ctx) {
MG_ASSERT(ctx->children.size() == 1, "SchemaQuery should have exactly one child!");
auto *schema_query = ctx->children[0]->accept(this).as<SchemaQuery *>();
query_ = schema_query;
return schema_query;
}
antlrcpp::Any CypherMainVisitor::visitShowSchema(MemgraphCypher::ShowSchemaContext *ctx) {
auto *schema_query = storage_->Create<SchemaQuery>();
schema_query->action_ = SchemaQuery::Action::SHOW_SCHEMA;
schema_query->label_ = AddLabel(ctx->labelName()->accept(this));
query_ = schema_query;
return schema_query;
}
antlrcpp::Any CypherMainVisitor::visitShowSchemas(MemgraphCypher::ShowSchemasContext * /*ctx*/) {
auto *schema_query = storage_->Create<SchemaQuery>();
schema_query->action_ = SchemaQuery::Action::SHOW_SCHEMAS;
query_ = schema_query;
return schema_query;
}
antlrcpp::Any CypherMainVisitor::visitPropertyType(MemgraphCypher::PropertyTypeContext *ctx) {
MG_ASSERT(ctx->symbolicName());
const auto property_type = utils::ToLowerCase(ctx->symbolicName()->accept(this).as<std::string>());
if (property_type == "bool") {
return common::SchemaType::BOOL;
}
if (property_type == "string") {
return common::SchemaType::STRING;
}
if (property_type == "integer") {
return common::SchemaType::INT;
}
if (property_type == "date") {
return common::SchemaType::DATE;
}
if (property_type == "duration") {
return common::SchemaType::DURATION;
}
if (property_type == "localdatetime") {
return common::SchemaType::LOCALDATETIME;
}
if (property_type == "localtime") {
return common::SchemaType::LOCALTIME;
}
throw SyntaxException("Property type must be one of the supported types!");
}
/**
* @return Schema*
*/
antlrcpp::Any CypherMainVisitor::visitSchemaTypeMap(MemgraphCypher::SchemaTypeMapContext *ctx) {
std::unordered_map<PropertyIx, common::SchemaType> map;
for (auto *property_key_pair : ctx->propertyKeyTypePair()) {
PropertyIx key = property_key_pair->propertyKeyName()->accept(this);
common::SchemaType type = property_key_pair->propertyType()->accept(this);
if (!map.insert({key, type}).second) {
throw SemanticException("Same property name can't appear twice in a schema map.");
}
}
return map;
}
antlrcpp::Any CypherMainVisitor::visitCreateSchema(MemgraphCypher::CreateSchemaContext *ctx) {
auto *schema_query = storage_->Create<SchemaQuery>();
schema_query->action_ = SchemaQuery::Action::CREATE_SCHEMA;
schema_query->label_ = AddLabel(ctx->labelName()->accept(this));
if (!ctx->schemaTypeMap()) {
throw SemanticException("Schema property map must exist!");
}
schema_query->schema_type_map_ =
ctx->schemaTypeMap()->accept(this).as<std::unordered_map<PropertyIx, common::SchemaType>>();
query_ = schema_query;
return schema_query;
}
/**
* @return Schema*
*/
antlrcpp::Any CypherMainVisitor::visitDropSchema(MemgraphCypher::DropSchemaContext *ctx) {
auto *schema_query = storage_->Create<SchemaQuery>();
schema_query->action_ = SchemaQuery::Action::DROP_SCHEMA;
schema_query->label_ = AddLabel(ctx->labelName()->accept(this));
query_ = schema_query;
return schema_query;
}
LabelIx CypherMainVisitor::AddLabel(const std::string &name) { return storage_->GetLabelIx(name); }
PropertyIx CypherMainVisitor::AddProperty(const std::string &name) { return storage_->GetPropertyIx(name); }

View File

@ -849,6 +849,41 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitForeach(MemgraphCypher::ForeachContext *ctx) override;
/**
* @return Schema*
*/
antlrcpp::Any visitPropertyType(MemgraphCypher::PropertyTypeContext *ctx) override;
/**
* @return Schema*
*/
antlrcpp::Any visitSchemaTypeMap(MemgraphCypher::SchemaTypeMapContext *ctx) override;
/**
* @return Schema*
*/
antlrcpp::Any visitSchemaQuery(MemgraphCypher::SchemaQueryContext *ctx) override;
/**
* @return Schema*
*/
antlrcpp::Any visitShowSchema(MemgraphCypher::ShowSchemaContext *ctx) override;
/**
* @return Schema*
*/
antlrcpp::Any visitShowSchemas(MemgraphCypher::ShowSchemasContext *ctx) override;
/**
* @return Schema*
*/
antlrcpp::Any visitCreateSchema(MemgraphCypher::CreateSchemaContext *ctx) override;
/**
* @return Schema*
*/
antlrcpp::Any visitDropSchema(MemgraphCypher::DropSchemaContext *ctx) override;
public:
Query *query() { return query_; }
const static std::string kAnonPrefix;

View File

@ -46,10 +46,10 @@ memgraphCypherKeyword : cypherKeyword
| DROP
| DUMP
| EXECUTE
| FOR
| FOREACH
| FREE
| FROM
| FOR
| FOREACH
| GLOBAL
| GRANT
| HEADER
@ -76,6 +76,8 @@ memgraphCypherKeyword : cypherKeyword
| ROLE
| ROLES
| QUOTE
| SCHEMA
| SCHEMAS
| SESSION
| SETTING
| SETTINGS
@ -122,6 +124,7 @@ query : cypherQuery
| streamQuery
| settingQuery
| versionQuery
| schemaQuery
;
authQuery : createRole
@ -192,6 +195,12 @@ settingQuery : setSetting
| showSettings
;
schemaQuery : showSchema
| showSchemas
| createSchema
| dropSchema
;
loadCsv : LOAD CSV FROM csvFile ( WITH | NO ) HEADER
( IGNORE BAD ) ?
( DELIMITER delimiter ) ?
@ -254,6 +263,7 @@ privilege : CREATE
| MODULE_READ
| MODULE_WRITE
| WEBSOCKET
| SCHEMA
;
privilegeList : privilege ( ',' privilege )* ;
@ -374,3 +384,17 @@ showSetting : SHOW DATABASE SETTING settingName ;
showSettings : SHOW DATABASE SETTINGS ;
versionQuery : SHOW VERSION ;
showSchema : SHOW SCHEMA ON ':' labelName ;
showSchemas : SHOW SCHEMAS ;
propertyType : symbolicName ;
propertyKeyTypePair : propertyKeyName propertyType ;
schemaTypeMap : '(' propertyKeyTypePair ( ',' propertyKeyTypePair )* ')' ;
createSchema : CREATE SCHEMA ON ':' labelName schemaTypeMap ;
dropSchema : DROP SCHEMA ON ':' labelName ;

View File

@ -89,6 +89,8 @@ REVOKE : R E V O K E ;
ROLE : R O L E ;
ROLES : R O L E S ;
QUOTE : Q U O T E ;
SCHEMA : S C H E M A ;
SCHEMAS : S C H E M A S ;
SERVICE_URL : S E R V I C E UNDERSCORE U R L ;
SESSION : S E S S I O N ;
SETTING : S E T T I N G ;

View File

@ -80,6 +80,8 @@ class PrivilegeExtractor : public QueryVisitor<void>, public HierarchicalTreeVis
void Visit(VersionQuery & /*version_query*/) override { AddPrivilege(AuthQuery::Privilege::STATS); }
void Visit(SchemaQuery & /*schema_query*/) override { AddPrivilege(AuthQuery::Privilege::SCHEMA); }
bool PreVisit(Create & /*unused*/) override {
AddPrivilege(AuthQuery::Privilege::CREATE);
return false;

View File

@ -204,8 +204,9 @@ const trie::Trie kKeywords = {"union",
"pulsar",
"service_url",
"version",
"websocket"
"foreach"};
"websocket",
"foreach",
"schema"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(

View File

@ -44,6 +44,7 @@
#include "query/trigger.hpp"
#include "query/typed_value.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/schemas.hpp"
#include "utils/algorithm.hpp"
#include "utils/csv_parsing.hpp"
#include "utils/event_counter.hpp"
@ -820,6 +821,108 @@ Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &param
}
}
Callback HandleSchemaQuery(SchemaQuery *schema_query, InterpreterContext *interpreter_context,
std::vector<Notification> *notifications) {
Callback callback;
switch (schema_query->action_) {
case SchemaQuery::Action::SHOW_SCHEMAS: {
callback.header = {"label", "primary_key", "primary_key_type"};
callback.fn = [interpreter_context]() {
auto *db = interpreter_context->db;
auto schemas_info = db->ListAllSchemas();
std::vector<std::vector<TypedValue>> results;
results.reserve(schemas_info.schemas.size());
for (const auto &[label_id, schema_types] : schemas_info.schemas) {
std::vector<TypedValue> schema_info_row;
schema_info_row.reserve(3);
schema_info_row.emplace_back(db->LabelToName(label_id));
std::vector<std::string> primary_key_properties;
primary_key_properties.reserve(schema_types.size());
std::transform(schema_types.begin(), schema_types.end(), std::back_inserter(primary_key_properties),
[&db](const auto &schema_type) {
return db->PropertyToName(schema_type.property_id) +
"::" + storage::SchemaTypeToString(schema_type.type);
});
schema_info_row.emplace_back(utils::Join(primary_key_properties, ", "));
schema_info_row.emplace_back(schema_types.size() == 1 ? "Single" : "Composite");
results.push_back(std::move(schema_info_row));
}
return results;
};
return callback;
}
case SchemaQuery::Action::SHOW_SCHEMA: {
callback.header = {"property_name", "property_type"};
callback.fn = [interpreter_context, primary_label = schema_query->label_]() {
auto *db = interpreter_context->db;
const auto label = db->NameToLabel(primary_label.name);
const auto schemas_info = db->GetSchema(label);
MG_ASSERT(schemas_info.schemas.size() < 2, "There can be only one schema under single label!");
std::vector<std::vector<TypedValue>> results;
if (!schemas_info.schemas.empty()) {
const auto schema = schemas_info.schemas[0];
for (const auto &schema_property : schema.second) {
std::vector<TypedValue> schema_info_row;
schema_info_row.reserve(2);
schema_info_row.emplace_back(db->PropertyToName(schema_property.property_id));
schema_info_row.emplace_back(storage::SchemaTypeToString(schema_property.type));
results.push_back(std::move(schema_info_row));
}
}
return results;
};
return callback;
}
case SchemaQuery::Action::CREATE_SCHEMA: {
auto schema_type_map = schema_query->schema_type_map_;
if (schema_query->schema_type_map_.empty()) {
throw SyntaxException("One or more types have to be defined in schema definition.");
}
callback.fn = [interpreter_context, primary_label = schema_query->label_,
schema_type_map = std::move(schema_type_map)]() {
auto *db = interpreter_context->db;
const auto label = db->NameToLabel(primary_label.name);
std::vector<storage::SchemaPropertyType> schemas_types;
schemas_types.reserve(schema_type_map.size());
for (const auto &schema_type : schema_type_map) {
auto property_id = db->NameToProperty(schema_type.first.name);
schemas_types.push_back({schema_type.second, property_id});
}
if (!db->CreateSchema(label, schemas_types)) {
throw QueryException(fmt::format("Schema on label :{} already exists!", primary_label.name));
}
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_SCHEMA,
fmt::format("Create schema on label :{}", schema_query->label_.name));
return callback;
}
case SchemaQuery::Action::DROP_SCHEMA: {
callback.fn = [interpreter_context, primary_label = schema_query->label_]() {
auto *db = interpreter_context->db;
const auto label = db->NameToLabel(primary_label.name);
if (!db->DropSchema(label)) {
throw QueryException(fmt::format("Schema on label :{} does not exist!", primary_label.name));
}
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::DROP_SCHEMA,
fmt::format("Dropped schema on label :{}", schema_query->label_.name));
return callback;
}
}
return callback;
}
// Struct for lazy pulling from a vector
struct PullPlanVector {
explicit PullPlanVector(std::vector<std::vector<TypedValue>> values) : values_(std::move(values)) {}
@ -2015,6 +2118,32 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_
RWType::NONE};
}
PreparedQuery PrepareSchemaQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
InterpreterContext *interpreter_context, std::vector<Notification> *notifications) {
if (in_explicit_transaction) {
throw ConstraintInMulticommandTxException();
}
auto *schema_query = utils::Downcast<SchemaQuery>(parsed_query.query);
MG_ASSERT(schema_query);
auto callback = HandleSchemaQuery(schema_query, interpreter_context, notifications);
return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges),
[handler = std::move(callback.fn), action = QueryHandlerResult::NOTHING,
pull_plan = std::shared_ptr<PullPlanVector>(nullptr)](
AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
if (!pull_plan) {
auto results = handler();
pull_plan = std::make_shared<PullPlanVector>(std::move(results));
}
if (pull_plan->Pull(stream, n)) {
return action;
}
return std::nullopt;
},
RWType::NONE};
}
void Interpreter::BeginTransaction() {
const auto prepared_query = PrepareTransactionQuery("BEGIN");
prepared_query.query_handler(nullptr, {});
@ -2148,6 +2277,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_);
} else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
} else if (utils::Downcast<SchemaQuery>(parsed_query.query)) {
prepared_query = PrepareSchemaQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
&query_execution->notifications);
} else {
LOG_FATAL("Should not get here -- unknown query type!");
}

View File

@ -38,6 +38,8 @@ constexpr std::string_view GetCodeString(const NotificationCode code) {
return "CreateIndex"sv;
case NotificationCode::CREATE_STREAM:
return "CreateStream"sv;
case NotificationCode::CREATE_SCHEMA:
return "CreateSchema"sv;
case NotificationCode::CHECK_STREAM:
return "CheckStream"sv;
case NotificationCode::CREATE_TRIGGER:
@ -48,6 +50,8 @@ constexpr std::string_view GetCodeString(const NotificationCode code) {
return "DropReplica"sv;
case NotificationCode::DROP_INDEX:
return "DropIndex"sv;
case NotificationCode::DROP_SCHEMA:
return "DropSchema"sv;
case NotificationCode::DROP_STREAM:
return "DropStream"sv;
case NotificationCode::DROP_TRIGGER:
@ -68,6 +72,10 @@ constexpr std::string_view GetCodeString(const NotificationCode code) {
return "ReplicaPortWarning"sv;
case NotificationCode::SET_REPLICA:
return "SetReplica"sv;
case NotificationCode::SHOW_SCHEMA:
return "ShowSchema"sv;
case NotificationCode::SHOW_SCHEMAS:
return "ShowSchemas"sv;
case NotificationCode::START_STREAM:
return "StartStream"sv;
case NotificationCode::START_ALL_STREAMS:
@ -114,4 +122,4 @@ std::string ExecutionStatsKeyToString(const ExecutionStats::Key key) {
}
}
} // namespace memgraph::query
} // namespace memgraph::query

View File

@ -26,12 +26,14 @@ enum class SeverityLevel : uint8_t { INFO, WARNING };
enum class NotificationCode : uint8_t {
CREATE_CONSTRAINT,
CREATE_INDEX,
CREATE_SCHEMA,
CHECK_STREAM,
CREATE_STREAM,
CREATE_TRIGGER,
DROP_CONSTRAINT,
DROP_INDEX,
DROP_REPLICA,
DROP_SCHEMA,
DROP_STREAM,
DROP_TRIGGER,
EXISTANT_INDEX,
@ -42,6 +44,8 @@ enum class NotificationCode : uint8_t {
REPLICA_PORT_WARNING,
REGISTER_REPLICA,
SET_REPLICA,
SHOW_SCHEMA,
SHOW_SCHEMAS,
START_STREAM,
START_ALL_STREAMS,
STOP_STREAM,

View File

@ -10,6 +10,7 @@ set(storage_v2_src_files
indices.cpp
property_store.cpp
vertex_accessor.cpp
schemas.cpp
storage.cpp)
##### Replication #####

View File

@ -178,6 +178,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
RecoveryInfo recovery_info;
RecoveredIndicesAndConstraints indices_constraints;
memgraph::storage::SchemasMap recovered_schemas;
std::optional<uint64_t> snapshot_timestamp;
if (!snapshot_files.empty()) {
spdlog::info("Try recovering from snapshot directory {}.", snapshot_directory);

View File

@ -38,6 +38,7 @@ enum class Marker : uint8_t {
SECTION_DELTA = 0x26,
SECTION_EPOCH_HISTORY = 0x27,
SECTION_OFFSETS = 0x42,
SECTION_SCHEMAS = 0x43,
DELTA_VERTEX_CREATE = 0x50,
DELTA_VERTEX_DELETE = 0x51,
@ -56,6 +57,8 @@ enum class Marker : uint8_t {
DELTA_EXISTENCE_CONSTRAINT_DROP = 0x5e,
DELTA_UNIQUE_CONSTRAINT_CREATE = 0x5f,
DELTA_UNIQUE_CONSTRAINT_DROP = 0x60,
DELTA_SCHEMA_CREATE = 0x61,
DELTA_SCHEMA_DROP = 0x62,
VALUE_FALSE = 0x00,
VALUE_TRUE = 0xff,
@ -63,7 +66,7 @@ enum class Marker : uint8_t {
/// List of all available markers.
/// IMPORTANT: Don't forget to update this list when you add a new Marker.
static const Marker kMarkersAll[] = {
constexpr Marker kMarkersAll[] = {
Marker::TYPE_NULL,
Marker::TYPE_BOOL,
Marker::TYPE_INT,
@ -99,6 +102,8 @@ static const Marker kMarkersAll[] = {
Marker::DELTA_EXISTENCE_CONSTRAINT_DROP,
Marker::DELTA_UNIQUE_CONSTRAINT_CREATE,
Marker::DELTA_UNIQUE_CONSTRAINT_DROP,
Marker::DELTA_SCHEMA_CREATE,
Marker::DELTA_SCHEMA_DROP,
Marker::VALUE_FALSE,
Marker::VALUE_TRUE,
};

View File

@ -333,6 +333,7 @@ std::optional<PropertyValue> Decoder::ReadPropertyValue() {
case Marker::SECTION_DELTA:
case Marker::SECTION_EPOCH_HISTORY:
case Marker::SECTION_OFFSETS:
case Marker::SECTION_SCHEMAS:
case Marker::DELTA_VERTEX_CREATE:
case Marker::DELTA_VERTEX_DELETE:
case Marker::DELTA_VERTEX_ADD_LABEL:
@ -350,6 +351,8 @@ std::optional<PropertyValue> Decoder::ReadPropertyValue() {
case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
case Marker::DELTA_UNIQUE_CONSTRAINT_CREATE:
case Marker::DELTA_UNIQUE_CONSTRAINT_DROP:
case Marker::DELTA_SCHEMA_CREATE:
case Marker::DELTA_SCHEMA_DROP:
case Marker::VALUE_FALSE:
case Marker::VALUE_TRUE:
return std::nullopt;
@ -432,6 +435,7 @@ bool Decoder::SkipPropertyValue() {
case Marker::SECTION_DELTA:
case Marker::SECTION_EPOCH_HISTORY:
case Marker::SECTION_OFFSETS:
case Marker::SECTION_SCHEMAS:
case Marker::DELTA_VERTEX_CREATE:
case Marker::DELTA_VERTEX_DELETE:
case Marker::DELTA_VERTEX_ADD_LABEL:
@ -449,6 +453,8 @@ bool Decoder::SkipPropertyValue() {
case Marker::DELTA_EXISTENCE_CONSTRAINT_DROP:
case Marker::DELTA_UNIQUE_CONSTRAINT_CREATE:
case Marker::DELTA_UNIQUE_CONSTRAINT_DROP:
case Marker::DELTA_SCHEMA_CREATE:
case Marker::DELTA_SCHEMA_DROP:
case Marker::VALUE_FALSE:
case Marker::VALUE_TRUE:
return false;

View File

@ -30,6 +30,7 @@ namespace memgraph::storage::durability {
/// Structure used to hold information about a snapshot.
struct SnapshotInfo {
uint64_t offset_schemas;
uint64_t offset_edges;
uint64_t offset_vertices;
uint64_t offset_indices;

View File

@ -69,6 +69,10 @@ namespace memgraph::storage::durability {
// * unique constraint create, unique constraint drop
// * label name
// * property names
// * schema create, schema drop
// * label name
// * property names
// * property type
//
// IMPORTANT: When changing WAL encoding/decoding bump the snapshot/WAL version
// in `version.hpp`.
@ -93,6 +97,10 @@ Marker OperationToMarker(StorageGlobalOperation operation) {
return Marker::DELTA_UNIQUE_CONSTRAINT_CREATE;
case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP:
return Marker::DELTA_UNIQUE_CONSTRAINT_DROP;
case StorageGlobalOperation::SCHEMA_CREATE:
return Marker::DELTA_SCHEMA_CREATE;
case StorageGlobalOperation::SCHEMA_DROP:
return Marker::DELTA_SCHEMA_DROP;
}
}
@ -122,7 +130,7 @@ Marker VertexActionToMarker(Delta::Action action) {
}
}
// This function convertes a Marker to a WalDeltaData::Type. It checks for the
// This function converts a Marker to a WalDeltaData::Type. It checks for the
// validity of the marker and throws if an invalid marker is specified.
// @throw RecoveryFailure
WalDeltaData::Type MarkerToWalDeltaDataType(Marker marker) {
@ -161,6 +169,10 @@ WalDeltaData::Type MarkerToWalDeltaDataType(Marker marker) {
return WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE;
case Marker::DELTA_UNIQUE_CONSTRAINT_DROP:
return WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP;
case Marker::DELTA_SCHEMA_CREATE:
return WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE;
case Marker::DELTA_SCHEMA_DROP:
return WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP;
case Marker::TYPE_NULL:
case Marker::TYPE_BOOL:
@ -309,6 +321,11 @@ WalDeltaData ReadSkipWalDeltaData(BaseDecoder *decoder) {
if (!decoder->SkipString()) throw RecoveryFailure("Invalid WAL data!");
}
}
break;
}
case WalDeltaData::Type::SCHEMA_CREATE:
case WalDeltaData::Type::SCHEMA_DROP: {
break;
}
}
@ -456,6 +473,10 @@ bool operator==(const WalDeltaData &a, const WalDeltaData &b) {
case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP:
return a.operation_label_properties.label == b.operation_label_properties.label &&
a.operation_label_properties.properties == b.operation_label_properties.properties;
case WalDeltaData::Type::SCHEMA_CREATE:
case WalDeltaData::Type::SCHEMA_DROP: {
return a.operation_label_create_schema.label == b.operation_label_create_schema.label;
}
}
}
bool operator!=(const WalDeltaData &a, const WalDeltaData &b) { return !(a == b); }
@ -615,6 +636,49 @@ void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Storage
}
break;
}
case StorageGlobalOperation::SCHEMA_CREATE:
case StorageGlobalOperation::SCHEMA_DROP: {
MG_ASSERT(!properties.empty(), "Invalid function call!");
encoder->WriteMarker(OperationToMarker(operation));
encoder->WriteString(name_id_mapper->IdToName(label.AsUint()));
encoder->WriteUint(properties.size());
for (const auto &property : properties) {
encoder->WriteString(name_id_mapper->IdToName(property.AsUint()));
}
break;
}
}
}
void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, StorageGlobalOperation operation,
const Schemas::Schema &schema, uint64_t timestamp) {
encoder->WriteMarker(Marker::SECTION_DELTA);
encoder->WriteUint(timestamp);
switch (operation) {
case StorageGlobalOperation::LABEL_INDEX_CREATE:
case StorageGlobalOperation::LABEL_INDEX_DROP:
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE:
case StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP:
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE:
case StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP:
case StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE:
case StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP:
case StorageGlobalOperation::SCHEMA_DROP: {
throw RecoveryFailure("Unsupported action!");
}
case StorageGlobalOperation::SCHEMA_CREATE: {
encoder->WriteMarker(OperationToMarker(operation));
encoder->WriteString(name_id_mapper->IdToName(schema.first.AsUint()));
encoder->WriteUint(schema.second.size());
for (const auto &schema_type : schema.second) {
encoder->WriteString(name_id_mapper->IdToName(schema_type.property_id.AsUint()));
}
encoder->WriteUint(schema.second.size());
for (const auto &schema_type : schema.second) {
encoder->WriteUint(static_cast<uint64_t>(schema_type.type));
}
break;
}
}
}
@ -847,6 +911,10 @@ RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConst
"The unique constraint doesn't exist!");
break;
}
case WalDeltaData::Type::SCHEMA_CREATE:
case WalDeltaData::Type::SCHEMA_DROP: {
break;
}
}
ret.next_timestamp = std::max(ret.next_timestamp, timestamp + 1);
++deltas_applied;
@ -967,6 +1035,11 @@ void WalFile::AppendOperation(StorageGlobalOperation operation, LabelId label, c
UpdateStats(timestamp);
}
void WalFile::AppendOperation(StorageGlobalOperation operation, const Schemas::Schema &schema, uint64_t timestamp) {
EncodeOperation(&wal_, name_id_mapper_, operation, schema, timestamp);
UpdateStats(timestamp);
}
void WalFile::Sync() { wal_.Sync(); }
uint64_t WalFile::GetSize() { return wal_.GetSize(); }

View File

@ -24,6 +24,7 @@
#include "storage/v2/id_types.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/schemas.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/file_locker.hpp"
#include "utils/skip_list.hpp"
@ -63,6 +64,8 @@ struct WalDeltaData {
EXISTENCE_CONSTRAINT_DROP,
UNIQUE_CONSTRAINT_CREATE,
UNIQUE_CONSTRAINT_DROP,
SCHEMA_CREATE,
SCHEMA_DROP,
};
Type type{Type::TRANSACTION_END};
@ -102,6 +105,11 @@ struct WalDeltaData {
std::string label;
std::set<std::string> properties;
} operation_label_properties;
struct {
std::string label;
std::vector<SchemaPropertyType> schema_properties_types;
} operation_label_create_schema;
};
bool operator==(const WalDeltaData &a, const WalDeltaData &b);
@ -117,6 +125,8 @@ enum class StorageGlobalOperation {
EXISTENCE_CONSTRAINT_DROP,
UNIQUE_CONSTRAINT_CREATE,
UNIQUE_CONSTRAINT_DROP,
SCHEMA_CREATE,
SCHEMA_DROP,
};
constexpr bool IsWalDeltaDataTypeTransactionEnd(const WalDeltaData::Type type) {
@ -148,6 +158,8 @@ constexpr bool IsWalDeltaDataTypeTransactionEnd(const WalDeltaData::Type type) {
case WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP:
case WalDeltaData::Type::UNIQUE_CONSTRAINT_CREATE:
case WalDeltaData::Type::UNIQUE_CONSTRAINT_DROP:
case WalDeltaData::Type::SCHEMA_CREATE:
case WalDeltaData::Type::SCHEMA_DROP:
return true;
}
}
@ -188,6 +200,10 @@ void EncodeTransactionEnd(BaseEncoder *encoder, uint64_t timestamp);
void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, StorageGlobalOperation operation,
LabelId label, const std::set<PropertyId> &properties, uint64_t timestamp);
/// Function used to encode non-transactional operation related.
void EncodeOperation(BaseEncoder *encoder, NameIdMapper *name_id_mapper, StorageGlobalOperation operation,
const Schemas::Schema &schema, uint64_t timestamp);
/// Function used to load the WAL data into the storage.
/// @throw RecoveryFailure
RecoveryInfo LoadWal(const std::filesystem::path &path, RecoveredIndicesAndConstraints *indices_constraints,
@ -218,6 +234,8 @@ class WalFile {
void AppendOperation(StorageGlobalOperation operation, LabelId label, const std::set<PropertyId> &properties,
uint64_t timestamp);
void AppendOperation(StorageGlobalOperation operation, const Schemas::Schema &schema, uint64_t timestamp);
void Sync();
uint64_t GetSize();

View File

@ -562,6 +562,12 @@ void Storage::ReplicationClient::ReplicaStream::AppendOperation(durability::Stor
EncodeOperation(&encoder, &self_->storage_->name_id_mapper_, operation, label, properties, timestamp);
}
void Storage::ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation,
const Schemas::Schema &schema, uint64_t timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeOperation(&encoder, &self_->storage_->name_id_mapper_, operation, schema, timestamp);
}
replication::AppendDeltasRes Storage::ReplicationClient::ReplicaStream::Finalize() { return stream_.AwaitResponse(); }
////// CurrentWalHandler //////

View File

@ -28,6 +28,7 @@
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#include "storage/v2/schemas.hpp"
#include "storage/v2/storage.hpp"
#include "utils/file.hpp"
#include "utils/file_locker.hpp"
@ -62,6 +63,10 @@ class Storage::ReplicationClient {
void AppendOperation(durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties, uint64_t timestamp);
/// @throw rpc::RpcFailedException
void AppendOperation(durability::StorageGlobalOperation operation, const Schemas::Schema &schema,
uint64_t timestamp);
private:
/// @throw rpc::RpcFailedException
replication::AppendDeltasRes Finalize();

View File

@ -312,13 +312,13 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
switch (delta.type) {
case durability::WalDeltaData::Type::VERTEX_CREATE: {
spdlog::trace(" Create vertex {}", delta.vertex_create_delete.gid.AsUint());
auto transaction = get_transaction(timestamp);
auto *transaction = get_transaction(timestamp);
transaction->CreateVertex(delta.vertex_create_delete.gid);
break;
}
case durability::WalDeltaData::Type::VERTEX_DELETE: {
spdlog::trace(" Delete vertex {}", delta.vertex_create_delete.gid.AsUint());
auto transaction = get_transaction(timestamp);
auto *transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_create_delete.gid, storage::View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
auto ret = transaction->DeleteVertex(&*vertex);
@ -328,7 +328,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
case durability::WalDeltaData::Type::VERTEX_ADD_LABEL: {
spdlog::trace(" Vertex {} add label {}", delta.vertex_add_remove_label.gid.AsUint(),
delta.vertex_add_remove_label.label);
auto transaction = get_transaction(timestamp);
auto *transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, storage::View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
auto ret = vertex->AddLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label));
@ -338,7 +338,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
case durability::WalDeltaData::Type::VERTEX_REMOVE_LABEL: {
spdlog::trace(" Vertex {} remove label {}", delta.vertex_add_remove_label.gid.AsUint(),
delta.vertex_add_remove_label.label);
auto transaction = get_transaction(timestamp);
auto *transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_add_remove_label.gid, storage::View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
auto ret = vertex->RemoveLabel(transaction->NameToLabel(delta.vertex_add_remove_label.label));
@ -348,7 +348,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
case durability::WalDeltaData::Type::VERTEX_SET_PROPERTY: {
spdlog::trace(" Vertex {} set property {} to {}", delta.vertex_edge_set_property.gid.AsUint(),
delta.vertex_edge_set_property.property, delta.vertex_edge_set_property.value);
auto transaction = get_transaction(timestamp);
auto *transaction = get_transaction(timestamp);
auto vertex = transaction->FindVertex(delta.vertex_edge_set_property.gid, storage::View::NEW);
if (!vertex) throw utils::BasicException("Invalid transaction!");
auto ret = vertex->SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property),
@ -360,7 +360,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
spdlog::trace(" Create edge {} of type {} from vertex {} to vertex {}",
delta.edge_create_delete.gid.AsUint(), delta.edge_create_delete.edge_type,
delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint());
auto transaction = get_transaction(timestamp);
auto *transaction = get_transaction(timestamp);
auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, storage::View::NEW);
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, storage::View::NEW);
@ -375,7 +375,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
spdlog::trace(" Delete edge {} of type {} from vertex {} to vertex {}",
delta.edge_create_delete.gid.AsUint(), delta.edge_create_delete.edge_type,
delta.edge_create_delete.from_vertex.AsUint(), delta.edge_create_delete.to_vertex.AsUint());
auto transaction = get_transaction(timestamp);
auto *transaction = get_transaction(timestamp);
auto from_vertex = transaction->FindVertex(delta.edge_create_delete.from_vertex, storage::View::NEW);
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, storage::View::NEW);
@ -398,7 +398,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
"Can't set properties on edges because properties on edges "
"are disabled!");
auto transaction = get_transaction(timestamp);
auto *transaction = get_transaction(timestamp);
// The following block of code effectively implements `FindEdge` and
// yields an accessor that is only valid for managing the edge's
@ -550,6 +550,31 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
if (ret != UniqueConstraints::DeletionStatus::SUCCESS) throw utils::BasicException("Invalid transaction!");
break;
}
case durability::WalDeltaData::Type::SCHEMA_CREATE: {
// std::stringstream ss;
// utils::PrintIterable(ss, delta.operation_label_create_schema);
// spdlog::trace(" Create schema on label :{}", delta.operation_label_create_schema.label, ss.str());
// if (commit_timestamp_and_accessor) {
// throw utils::BasicException("Invalid transaction!");
// }
// if (!storage_->CreateSchema(storage_->NameToLabel(delta.operation_label_create_schema.label),
// delta.operation_label_create_schema.schema_properties_types, timestamp)) {
// throw utils::BasicException("Invalid transaction!");
// }
break;
}
case durability::WalDeltaData::Type::SCHEMA_DROP: {
// std::stringstream ss;
// utils::PrintIterable(ss, delta.operation_label);
// spdlog::trace(" Drop schema on label :{}", delta.operation_label.label, ss.str());
// if (commit_timestamp_and_accessor) {
// throw utils::BasicException("Invalid transaction!");
// }
// if (!storage_->DropSchema(storage_->NameToLabel(delta.operation_label.label), timestamp)) {
// throw utils::BasicException("Invalid transaction!");
// }
break;
}
}
}

View File

@ -31,6 +31,8 @@ class Encoder final : public durability::BaseEncoder {
void WriteUint(uint64_t value) override;
// void WriteUint(uint8_t value) override;
void WriteDouble(double value) override;
void WriteString(const std::string_view &value) override;

View File

@ -0,0 +1,83 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <unordered_map>
#include <utility>
#include <vector>
#include "storage/v2/property_value.hpp"
#include "storage/v2/schemas.hpp"
namespace memgraph::storage {
SchemaViolation::SchemaViolation(ValidationStatus status, LabelId label) : status{status}, label{label} {}
SchemaViolation::SchemaViolation(ValidationStatus status, LabelId label, SchemaPropertyType violated_type)
: status{status}, label{label}, violated_type{violated_type} {}
SchemaViolation::SchemaViolation(ValidationStatus status, LabelId label, SchemaPropertyType violated_type,
PropertyValue violated_property_value)
: status{status}, label{label}, violated_type{violated_type}, violated_property_value{violated_property_value} {}
Schemas::SchemasList Schemas::ListSchemas() const {
Schemas::SchemasList ret;
ret.reserve(schemas_.size());
for (const auto &[label_props, schema_property] : schemas_) {
ret.emplace_back(label_props, schema_property);
}
return ret;
}
std::optional<Schemas::Schema> Schemas::GetSchema(const LabelId primary_label) const {
if (auto schema_map = schemas_.find(primary_label); schema_map != schemas_.end()) {
return Schema{schema_map->first, schema_map->second};
}
return std::nullopt;
}
bool Schemas::CreateSchema(const LabelId primary_label, const std::vector<SchemaPropertyType> &schemas_types) {
if (schemas_.contains(primary_label)) {
return false;
}
schemas_.insert({primary_label, schemas_types});
return true;
}
bool Schemas::DropSchema(const LabelId primary_label) { return schemas_.erase(primary_label); }
std::optional<SchemaViolation> Schemas::ValidateVertex(const LabelId primary_label, const Vertex &vertex) {
// TODO Check for multiple defined primary labels
const auto schema = schemas_.find(primary_label);
if (schema == schemas_.end()) {
return SchemaViolation(SchemaViolation::ValidationStatus::NO_SCHEMA_DEFINED_FOR_LABEL, primary_label);
}
if (!utils::Contains(vertex.labels, primary_label)) {
return SchemaViolation(SchemaViolation::ValidationStatus::VERTEX_HAS_NO_PRIMARY_LABEL, primary_label);
}
for (const auto &schema_type : schema->second) {
if (!vertex.properties.HasProperty(schema_type.property_id)) {
return SchemaViolation(SchemaViolation::ValidationStatus::VERTEX_HAS_NO_PROPERTY, primary_label, schema_type);
}
// Property type check
// TODO Can this be replaced with just property id check?
if (auto vertex_property = vertex.properties.GetProperty(schema_type.property_id);
PropertyTypeToSchemaType(vertex_property) != schema_type.type) {
return SchemaViolation(SchemaViolation::ValidationStatus::VERTEX_PROPERTY_WRONG_TYPE, primary_label, schema_type,
vertex_property);
}
}
// TODO after the introduction of vertex hashing introduce check for vertex
// primary key uniqueness
return std::nullopt;
}
} // namespace memgraph::storage

156
src/storage/v2/schemas.hpp Normal file
View File

@ -0,0 +1,156 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <memory>
#include <optional>
#include <unordered_map>
#include <utility>
#include <vector>
#include "common/types.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/indices.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/temporal.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/result.hpp"
namespace memgraph::storage {
class SchemaViolationException : public utils::BasicException {
using utils::BasicException::BasicException;
};
struct SchemaPropertyType {
common::SchemaType type;
PropertyId property_id;
};
struct SchemaViolation {
enum class ValidationStatus : uint8_t {
VERTEX_HAS_NO_PRIMARY_LABEL,
VERTEX_HAS_NO_PROPERTY,
NO_SCHEMA_DEFINED_FOR_LABEL,
VERTEX_PROPERTY_WRONG_TYPE
};
SchemaViolation(ValidationStatus status, LabelId label);
SchemaViolation(ValidationStatus status, LabelId label, SchemaPropertyType violated_type);
SchemaViolation(ValidationStatus status, LabelId label, SchemaPropertyType violated_type,
PropertyValue violated_property_value);
ValidationStatus status;
LabelId label;
std::optional<SchemaPropertyType> violated_type;
std::optional<PropertyValue> violated_property_value;
};
/// Structure that represents a collection of schemas
/// Schema can be mapped under only one label => primary label
class Schemas {
public:
using Schema = std::pair<LabelId, std::vector<SchemaPropertyType>>;
using SchemasMap = std::unordered_map<LabelId, std::vector<SchemaPropertyType>>;
using SchemasList = std::vector<Schema>;
Schemas() = default;
Schemas(const Schemas &) = delete;
Schemas(Schemas &&) = delete;
Schemas &operator=(const Schemas &) = delete;
Schemas &operator=(Schemas &&) = delete;
~Schemas() = default;
[[nodiscard]] SchemasList ListSchemas() const;
[[nodiscard]] std::optional<Schemas::Schema> GetSchema(LabelId primary_label) const;
// Returns true if it was successfully created or false if the schema
// already exists
[[nodiscard]] bool CreateSchema(LabelId label, const std::vector<SchemaPropertyType> &schemas_types);
// Returns true if it was successfully dropped or false if the schema
// does not exist
[[nodiscard]] bool DropSchema(LabelId label);
[[nodiscard]] std::optional<SchemaViolation> ValidateVertex(LabelId primary_label, const Vertex &vertex);
private:
SchemasMap schemas_;
};
inline std::optional<common::SchemaType> PropertyTypeToSchemaType(const PropertyValue &property_value) {
switch (property_value.type()) {
case PropertyValue::Type::Bool: {
return common::SchemaType::BOOL;
}
case PropertyValue::Type::Int: {
return common::SchemaType::INT;
}
case PropertyValue::Type::String: {
return common::SchemaType::STRING;
}
case PropertyValue::Type::TemporalData: {
switch (property_value.ValueTemporalData().type) {
case TemporalType::Date: {
return common::SchemaType::DATE;
}
case TemporalType::LocalDateTime: {
return common::SchemaType::LOCALDATETIME;
}
case TemporalType::LocalTime: {
return common::SchemaType::LOCALTIME;
}
case TemporalType::Duration: {
return common::SchemaType::DURATION;
}
}
}
case PropertyValue::Type::Double:
case PropertyValue::Type::Null:
case PropertyValue::Type::Map:
case PropertyValue::Type::List: {
return std::nullopt;
}
}
}
inline std::string SchemaTypeToString(const common::SchemaType type) {
switch (type) {
case common::SchemaType::BOOL: {
return "Bool";
}
case common::SchemaType::INT: {
return "Integer";
}
case common::SchemaType::STRING: {
return "String";
}
case common::SchemaType::DATE: {
return "Date";
}
case common::SchemaType::LOCALTIME: {
return "LocalTime";
}
case common::SchemaType::LOCALDATETIME: {
return "LocalDateTime";
}
case common::SchemaType::DURATION: {
return "Duration";
}
}
}
} // namespace memgraph::storage

View File

@ -28,6 +28,7 @@
#include "storage/v2/indices.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/schemas.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/file.hpp"
@ -37,6 +38,7 @@
#include "utils/rw_lock.hpp"
#include "utils/spin_lock.hpp"
#include "utils/stat.hpp"
#include "utils/synchronized.hpp"
#include "utils/uuid.hpp"
/// REPLICATION ///
@ -456,12 +458,13 @@ VertexAccessor Storage::Accessor::CreateVertex() {
OOMExceptionEnabler oom_exception;
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
auto acc = storage_->vertices_.access();
auto delta = CreateDeleteObjectDelta(&transaction_);
auto *delta = CreateDeleteObjectDelta(&transaction_);
auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
MG_ASSERT(inserted, "The vertex must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!");
delta->prev.Set(&*it);
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_);
return {&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_};
}
VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) {
@ -1227,6 +1230,49 @@ ConstraintsInfo Storage::ListAllConstraints() const {
return {ListExistenceConstraints(constraints_), constraints_.unique_constraints.ListConstraints()};
}
SchemasInfo Storage::ListAllSchemas() const {
std::shared_lock<utils::RWLock> storage_guard_(main_lock_);
return {schemas_.ListSchemas()};
}
SchemasInfo Storage::GetSchema(const LabelId primary_label) const {
std::shared_lock<utils::RWLock> storage_guard_(main_lock_);
if (const auto schema = schemas_.GetSchema(primary_label); schema) {
return {{*schema}};
}
return {};
}
bool Storage::CreateSchema(const LabelId primary_label, const std::vector<SchemaPropertyType> &schemas_types,
const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto ret = schemas_.CreateSchema(primary_label, schemas_types);
if (!ret) {
return ret;
}
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE, *schemas_.GetSchema(primary_label),
commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
last_commit_timestamp_ = commit_timestamp;
return schemas_.CreateSchema(primary_label, schemas_types);
}
bool Storage::DropSchema(const LabelId primary_label, std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto res = schemas_.DropSchema(primary_label);
if (!res) {
return res;
}
const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp);
AppendToWal(durability::StorageGlobalOperation::SCHEMA_DROP, primary_label, {}, commit_timestamp);
commit_log_->MarkFinished(commit_timestamp);
last_commit_timestamp_ = commit_timestamp;
return res;
}
StorageInfo Storage::GetInfo() const {
auto vertex_count = vertices_.size();
auto edge_count = edge_count_.load(std::memory_order_acquire);
@ -1769,6 +1815,25 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId
FinalizeWalFile();
}
void Storage::AppendToWal(durability::StorageGlobalOperation operation, const Schemas::Schema &schema,
uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) return;
wal_file_->AppendOperation(operation, schema, final_commit_timestamp);
{
if (replication_role_.load() == ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(wal_file_->SequenceNumber());
client->IfStreamingTransaction(
[&](auto &stream) { stream.AppendOperation(operation, schema, final_commit_timestamp); });
client->FinalizeTransactionReplication();
}
});
}
}
FinalizeWalFile();
}
utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot() {
if (replication_role_.load() != ReplicationRole::MAIN) {
return CreateSnapshotError::DisabledForReplica;

View File

@ -16,6 +16,7 @@
#include <optional>
#include <shared_mutex>
#include <variant>
#include <vector>
#include "io/network/endpoint.hpp"
#include "storage/v2/commit_log.hpp"
@ -25,14 +26,18 @@
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/indices.hpp"
#include "storage/v2/isolation_level.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/result.hpp"
#include "storage/v2/schemas.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/exceptions.hpp"
#include "utils/file_locker.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/rw_lock.hpp"
@ -173,6 +178,11 @@ struct ConstraintsInfo {
std::vector<std::pair<LabelId, std::set<PropertyId>>> unique;
};
/// Structure used to return information about existing schemas in the storage
struct SchemasInfo {
Schemas::SchemasList schemas;
};
/// Structure used to return information about the storage.
struct StorageInfo {
uint64_t vertex_count;
@ -364,7 +374,7 @@ class Storage final {
IndicesInfo ListAllIndices() const;
/// Creates an existence constraint. Returns true if the constraint was
/// successfuly added, false if it already exists and a `ConstraintViolation`
/// successfully added, false if it already exists and a `ConstraintViolation`
/// if there is an existing vertex violating the constraint.
///
/// @throw std::bad_alloc
@ -402,6 +412,15 @@ class Storage final {
ConstraintsInfo ListAllConstraints() const;
SchemasInfo ListAllSchemas() const;
SchemasInfo GetSchema(LabelId primary_label) const;
bool CreateSchema(LabelId primary_label, const std::vector<SchemaPropertyType> &schemas_types,
std::optional<uint64_t> desired_commit_timestamp = {});
bool DropSchema(LabelId primary_label, std::optional<uint64_t> desired_commit_timestamp = {});
StorageInfo GetInfo() const;
bool LockPath();
@ -466,6 +485,8 @@ class Storage final {
void AppendToWal(const Transaction &transaction, uint64_t final_commit_timestamp);
void AppendToWal(durability::StorageGlobalOperation operation, LabelId label, const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp);
void AppendToWal(durability::StorageGlobalOperation operation, const Schemas::Schema &schema,
uint64_t final_commit_timestamp);
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
@ -491,6 +512,7 @@ class Storage final {
Constraints constraints_;
Indices indices_;
Schemas schemas_;
// Transaction engine
utils::SpinLock engine_lock_;

View File

@ -2213,6 +2213,8 @@ TEST_P(CypherMainVisitorTest, GrantPrivilege) {
{AuthQuery::Privilege::MODULE_READ});
check_auth_query(&ast_generator, "GRANT MODULE_WRITE TO user", AuthQuery::Action::GRANT_PRIVILEGE, "", "", "user", {},
{AuthQuery::Privilege::MODULE_WRITE});
check_auth_query(&ast_generator, "GRANT SCHEMA TO user", AuthQuery::Action::GRANT_PRIVILEGE, "", "", "user", {},
{AuthQuery::Privilege::SCHEMA});
}
TEST_P(CypherMainVisitorTest, DenyPrivilege) {
@ -2253,6 +2255,8 @@ TEST_P(CypherMainVisitorTest, DenyPrivilege) {
{AuthQuery::Privilege::MODULE_READ});
check_auth_query(&ast_generator, "DENY MODULE_WRITE TO user", AuthQuery::Action::DENY_PRIVILEGE, "", "", "user", {},
{AuthQuery::Privilege::MODULE_WRITE});
check_auth_query(&ast_generator, "DENY SCHEMA TO user", AuthQuery::Action::DENY_PRIVILEGE, "", "", "user", {},
{AuthQuery::Privilege::SCHEMA});
}
TEST_P(CypherMainVisitorTest, RevokePrivilege) {
@ -2295,6 +2299,8 @@ TEST_P(CypherMainVisitorTest, RevokePrivilege) {
{}, {AuthQuery::Privilege::MODULE_READ});
check_auth_query(&ast_generator, "REVOKE MODULE_WRITE FROM user", AuthQuery::Action::REVOKE_PRIVILEGE, "", "", "user",
{}, {AuthQuery::Privilege::MODULE_WRITE});
check_auth_query(&ast_generator, "REVOKE SCHEMA FROM user", AuthQuery::Action::REVOKE_PRIVILEGE, "", "", "user", {},
{AuthQuery::Privilege::SCHEMA});
}
TEST_P(CypherMainVisitorTest, ShowPrivileges) {
@ -4211,3 +4217,76 @@ TEST_P(CypherMainVisitorTest, Foreach) {
ASSERT_TRUE(dynamic_cast<RemoveProperty *>(*++clauses.begin()));
}
}
TEST_P(CypherMainVisitorTest, TestShowSchemas) {
auto &ast_generator = *GetParam();
auto *query = dynamic_cast<SchemaQuery *>(ast_generator.ParseQuery("SHOW SCHEMAS"));
ASSERT_TRUE(query);
EXPECT_EQ(query->action_, SchemaQuery::Action::SHOW_SCHEMAS);
}
TEST_P(CypherMainVisitorTest, TestShowSchema) {
auto &ast_generator = *GetParam();
EXPECT_THROW(ast_generator.ParseQuery("SHOW SCHEMA ON label"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("SHOW SCHEMA :label"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("SHOW SCHEMA label"), SyntaxException);
auto *query = dynamic_cast<SchemaQuery *>(ast_generator.ParseQuery("SHOW SCHEMA ON :label"));
ASSERT_TRUE(query);
EXPECT_EQ(query->action_, SchemaQuery::Action::SHOW_SCHEMA);
EXPECT_EQ(query->label_.name, "label");
}
TEST_P(CypherMainVisitorTest, TestCreateSchema) {
auto &ast_generator = *GetParam();
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON :label"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON :label()"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON :label(123 INTEGER)"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON :label(name TYPE)"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON :label(name, age)"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON :label(name, DURATION)"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON label(name INTEGER)"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON :label(name INTEGER, name INTEGER)"), SemanticException);
EXPECT_THROW(ast_generator.ParseQuery("CREATE SCHEMA ON :label(name INTEGER, name STRING)"), SemanticException);
{
auto *query = dynamic_cast<SchemaQuery *>(ast_generator.ParseQuery("CREATE SCHEMA ON :label1(name STRING)"));
ASSERT_TRUE(query);
EXPECT_EQ(query->action_, SchemaQuery::Action::CREATE_SCHEMA);
EXPECT_EQ(query->label_.name, "label1");
}
{
auto *query = dynamic_cast<SchemaQuery *>(ast_generator.ParseQuery("CREATE SCHEMA ON :label2(name string)"));
ASSERT_TRUE(query);
EXPECT_EQ(query->action_, SchemaQuery::Action::CREATE_SCHEMA);
EXPECT_EQ(query->label_.name, "label2");
}
{
auto *query = dynamic_cast<SchemaQuery *>(
ast_generator.ParseQuery("CREATE SCHEMA ON :label3(first_name STRING, last_name STRING)"));
ASSERT_TRUE(query);
EXPECT_EQ(query->action_, SchemaQuery::Action::CREATE_SCHEMA);
EXPECT_EQ(query->label_.name, "label3");
}
{
auto *query = dynamic_cast<SchemaQuery *>(
ast_generator.ParseQuery("CREATE SCHEMA ON :label4(name STRING, age INTEGER, dur DURATION, birthday "
"LOCALDATETIME, some_time LOCALTIME, speaks_truth BOOL)"));
ASSERT_TRUE(query);
EXPECT_EQ(query->action_, SchemaQuery::Action::CREATE_SCHEMA);
EXPECT_EQ(query->label_.name, "label4");
}
}
TEST_P(CypherMainVisitorTest, TestDropSchema) {
auto &ast_generator = *GetParam();
EXPECT_THROW(ast_generator.ParseQuery("DROP SCHEMA"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("DROP SCHEMA ON label"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("DROP SCHEMA :label"), SyntaxException);
EXPECT_THROW(ast_generator.ParseQuery("DROP SCHEMA ON :label()"), SyntaxException);
auto *query = dynamic_cast<SchemaQuery *>(ast_generator.ParseQuery("DROP SCHEMA ON :label"));
ASSERT_TRUE(query);
EXPECT_EQ(query->action_, SchemaQuery::Action::DROP_SCHEMA);
EXPECT_EQ(query->label_.name, "label");
}

View File

@ -10,8 +10,10 @@
// licenses/APL.txt.
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <filesystem>
#include <unordered_set>
#include "communication/bolt/v1/value.hpp"
#include "communication/result_stream_faker.hpp"
@ -38,6 +40,12 @@ auto ToEdgeList(const memgraph::communication::bolt::Value &v) {
list.push_back(x.ValueEdge());
}
return list;
}
auto StringToUnorderedSet(const std::string &element, const size_t number_of_split_elements) {
const auto element_split = memgraph::utils::Split(element, ", ");
MG_ASSERT(element_split.size() == number_of_split_elements);
return std::unordered_set<std::string>(element_split.begin(), element_split.end());
};
struct InterpreterFaker {
@ -1465,3 +1473,148 @@ TEST_F(InterpreterTest, LoadCsvClauseNotification) {
"conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
ASSERT_EQ(notification["description"].ValueString(), "");
}
TEST_F(InterpreterTest, CreateSchemaMulticommandTransaction) {
Interpret("BEGIN");
ASSERT_THROW(Interpret("CREATE SCHEMA ON :label(name STRING, age INTEGER)"),
memgraph::query::ConstraintInMulticommandTxException);
Interpret("ROLLBACK");
}
TEST_F(InterpreterTest, ShowSchemasMulticommandTransaction) {
Interpret("BEGIN");
ASSERT_THROW(Interpret("SHOW SCHEMAS"), memgraph::query::ConstraintInMulticommandTxException);
Interpret("ROLLBACK");
}
TEST_F(InterpreterTest, ShowSchemaMulticommandTransaction) {
Interpret("BEGIN");
ASSERT_THROW(Interpret("SHOW SCHEMA ON :label"), memgraph::query::ConstraintInMulticommandTxException);
Interpret("ROLLBACK");
}
TEST_F(InterpreterTest, DropSchemaMulticommandTransaction) {
Interpret("BEGIN");
ASSERT_THROW(Interpret("DROP SCHEMA ON :label"), memgraph::query::ConstraintInMulticommandTxException);
Interpret("ROLLBACK");
}
TEST_F(InterpreterTest, SchemaTestCreateAndShow) {
// Empty schema type map should result with syntax exception.
ASSERT_THROW(Interpret("CREATE SCHEMA ON :label();"), memgraph::query::SyntaxException);
// Duplicate properties are should also cause an exception
ASSERT_THROW(Interpret("CREATE SCHEMA ON :label(name STRING, name STRING);"), memgraph::query::SemanticException);
ASSERT_THROW(Interpret("CREATE SCHEMA ON :label(name STRING, name INTEGER);"), memgraph::query::SemanticException);
{
// Cannot create same schema twice
Interpret("CREATE SCHEMA ON :label(name STRING, age INTEGER)");
ASSERT_THROW(Interpret("CREATE SCHEMA ON :label(name STRING);"), memgraph::query::QueryException);
}
// Show schema
{
auto stream = Interpret("SHOW SCHEMA ON :label");
ASSERT_EQ(stream.GetHeader().size(), 2U);
const auto &header = stream.GetHeader();
ASSERT_EQ(header[0], "property_name");
ASSERT_EQ(header[1], "property_type");
ASSERT_EQ(stream.GetResults().size(), 2U);
std::unordered_map<std::string, std::string> result_table{{"age", "Integer"}, {"name", "String"}};
const auto &result = stream.GetResults().front();
ASSERT_EQ(result.size(), 2U);
const auto key1 = result[0].ValueString();
ASSERT_TRUE(result_table.contains(key1));
ASSERT_EQ(result[1].ValueString(), result_table[key1]);
const auto &result2 = stream.GetResults().front();
ASSERT_EQ(result2.size(), 2U);
const auto key2 = result2[0].ValueString();
ASSERT_TRUE(result_table.contains(key2));
ASSERT_EQ(result[1].ValueString(), result_table[key2]);
}
// Create Another Schema
Interpret("CREATE SCHEMA ON :label2(place STRING, dur DURATION)");
// Show schemas
{
auto stream = Interpret("SHOW SCHEMAS");
ASSERT_EQ(stream.GetHeader().size(), 3U);
const auto &header = stream.GetHeader();
ASSERT_EQ(header[0], "label");
ASSERT_EQ(header[1], "primary_key");
ASSERT_EQ(header[2], "primary_key_type");
ASSERT_EQ(stream.GetResults().size(), 2U);
std::unordered_map<std::string, std::pair<std::unordered_set<std::string>, std::string>> result_table{
{"label", {{"name::String", "age::Integer"}, "Composite"}},
{"label2", {{"place::String", "dur::Duration"}, "Composite"}}};
const auto &result = stream.GetResults().front();
ASSERT_EQ(result.size(), 3U);
const auto key1 = result[0].ValueString();
ASSERT_TRUE(result_table.contains(key1));
const auto primary_key_split = StringToUnorderedSet(result[1].ValueString(), 2);
ASSERT_TRUE(primary_key_split == result_table[key1].first);
ASSERT_EQ(result[2].ValueString(), result_table[key1].second);
const auto &result2 = stream.GetResults().front();
ASSERT_EQ(result2.size(), 3U);
const auto key2 = result2[0].ValueString();
ASSERT_TRUE(result_table.contains(key2));
const auto primary_key_split2 = StringToUnorderedSet(result2[1].ValueString(), 2);
ASSERT_TRUE(primary_key_split2 == result_table[key2].first);
ASSERT_EQ(result2[2].ValueString(), result_table[key2].second);
}
}
TEST_F(InterpreterTest, SchemaTestCreateDropAndShow) {
Interpret("CREATE SCHEMA ON :label(name STRING, age INTEGER)");
// Wrong syntax for dropping schema.
ASSERT_THROW(Interpret("DROP SCHEMA ON :label();"), memgraph::query::SyntaxException);
// Cannot drop non existant schema.
ASSERT_THROW(Interpret("DROP SCHEMA ON :label1;"), memgraph::query::QueryException);
// Create Schema and Drop
auto get_number_of_schemas = [this]() {
auto stream = Interpret("SHOW SCHEMAS");
return stream.GetResults().size();
};
ASSERT_EQ(get_number_of_schemas(), 1);
Interpret("CREATE SCHEMA ON :label1(name STRING, age INTEGER)");
ASSERT_EQ(get_number_of_schemas(), 2);
Interpret("CREATE SCHEMA ON :label2(name STRING, sex BOOL)");
ASSERT_EQ(get_number_of_schemas(), 3);
Interpret("DROP SCHEMA ON :label1");
ASSERT_EQ(get_number_of_schemas(), 2);
Interpret("CREATE SCHEMA ON :label3(name STRING, birthday LOCALDATETIME)");
ASSERT_EQ(get_number_of_schemas(), 3);
Interpret("DROP SCHEMA ON :label2");
ASSERT_EQ(get_number_of_schemas(), 2);
Interpret("CREATE SCHEMA ON :label4(name STRING, age DURATION)");
ASSERT_EQ(get_number_of_schemas(), 3);
Interpret("DROP SCHEMA ON :label3");
ASSERT_EQ(get_number_of_schemas(), 2);
Interpret("DROP SCHEMA ON :label");
ASSERT_EQ(get_number_of_schemas(), 1);
// Show schemas
auto stream = Interpret("SHOW SCHEMAS");
ASSERT_EQ(stream.GetHeader().size(), 3U);
const auto &header = stream.GetHeader();
ASSERT_EQ(header[0], "label");
ASSERT_EQ(header[1], "primary_key");
ASSERT_EQ(header[2], "primary_key_type");
ASSERT_EQ(stream.GetResults().size(), 1U);
std::unordered_map<std::string, std::pair<std::unordered_set<std::string>, std::string>> result_table{
{"label4", {{"name::String", "age::Duration"}, "Composite"}}};
const auto &result = stream.GetResults().front();
ASSERT_EQ(result.size(), 3U);
const auto key1 = result[0].ValueString();
ASSERT_TRUE(result_table.contains(key1));
const auto primary_key_split = StringToUnorderedSet(result[1].ValueString(), 2);
ASSERT_TRUE(primary_key_split == result_table[key1].first);
ASSERT_EQ(result[2].ValueString(), result_table[key1].second);
}

View File

@ -192,6 +192,11 @@ TEST_F(TestPrivilegeExtractor, ShowVersion) {
EXPECT_THAT(GetRequiredPrivileges(query), UnorderedElementsAre(AuthQuery::Privilege::STATS));
}
TEST_F(TestPrivilegeExtractor, SchemaQuery) {
auto *query = storage.Create<SchemaQuery>();
EXPECT_THAT(GetRequiredPrivileges(query), UnorderedElementsAre(AuthQuery::Privilege::SCHEMA));
}
TEST_F(TestPrivilegeExtractor, CallProcedureQuery) {
{
auto *query = QUERY(SINGLE_QUERY(CALL_PROCEDURE("mg.get_module_files")));