diff --git a/src/memgraph.cpp b/src/memgraph.cpp
index ed1414752..19a73403a 100644
--- a/src/memgraph.cpp
+++ b/src/memgraph.cpp
@@ -186,6 +186,15 @@ DEFINE_bool(telemetry_enabled, false,
             "the database runtime (vertex and edge counts and resource usage) "
             "to allow for easier improvement of the product.");
 
+// Streams flags
+// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
+DEFINE_uint32(
+    stream_transaction_conflict_retries, 30,
+    "Number of times to retry when a stream transformation fails to commit because of conflicting transactions");
+// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
+DEFINE_uint32(
+    stream_transaction_retry_interval, 500,
+    "Retry interval in milliseconds when a stream transformation fails to commit because of conflicting transactions");
 // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
 DEFINE_string(kafka_bootstrap_servers, "",
               "List of default Kafka brokers as a comma separated list of broker host or host:port.");
@@ -1122,12 +1131,15 @@ int main(int argc, char **argv) {
   }
   storage::Storage db(db_config);
 
-  query::InterpreterContext interpreter_context{&db,
-                                                {.query = {.allow_load_csv = FLAGS_allow_load_csv},
-                                                 .execution_timeout_sec = FLAGS_query_execution_timeout_sec,
-                                                 .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
-                                                 .default_pulsar_service_url = FLAGS_pulsar_service_url},
-                                                FLAGS_data_directory};
+  query::InterpreterContext interpreter_context{
+      &db,
+      {.query = {.allow_load_csv = FLAGS_allow_load_csv},
+       .execution_timeout_sec = FLAGS_query_execution_timeout_sec,
+       .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
+       .default_pulsar_service_url = FLAGS_pulsar_service_url,
+       .stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,
+       .stream_transaction_retry_interval = std::chrono::milliseconds(FLAGS_stream_transaction_retry_interval)},
+      FLAGS_data_directory};
 #ifdef MG_ENTERPRISE
   SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
 #else
diff --git a/src/query/common.hpp b/src/query/common.hpp
index 9e3dffce2..c855d3885 100644
--- a/src/query/common.hpp
+++ b/src/query/common.hpp
@@ -33,10 +33,6 @@ namespace impl {
 bool TypedValueCompare(const TypedValue &a, const TypedValue &b);
 }  // namespace impl
 
-constexpr inline std::string_view kSerializationErrorMessage{
-    "Cannot resolve conflicting transactions. You can retry this transaction when the conflicting transaction is "
-    "finished."};
-
 /// Custom Comparator type for comparing vectors of TypedValues.
 ///
 /// Does lexicographical ordering of elements based on the above
@@ -95,7 +91,7 @@ storage::PropertyValue PropsSetChecked(T *record, const storage::PropertyId &key
     if (maybe_old_value.HasError()) {
       switch (maybe_old_value.GetError()) {
         case storage::Error::SERIALIZATION_ERROR:
-          throw QueryRuntimeException(kSerializationErrorMessage);
+          throw TransactionSerializationException();
         case storage::Error::DELETED_OBJECT:
           throw QueryRuntimeException("Trying to set properties on a deleted object.");
         case storage::Error::PROPERTIES_DISABLED:
diff --git a/src/query/config.hpp b/src/query/config.hpp
index 4d18cf19c..aaee9326d 100644
--- a/src/query/config.hpp
+++ b/src/query/config.hpp
@@ -10,6 +10,7 @@
 // licenses/APL.txt.
 
 #pragma once
+#include <chrono>
 #include <string>
 
 namespace query {
@@ -23,5 +24,7 @@ struct InterpreterConfig {
 
   std::string default_kafka_bootstrap_servers;
   std::string default_pulsar_service_url;
+  uint32_t stream_transaction_conflict_retries;
+  std::chrono::milliseconds stream_transaction_retry_interval;
 };
 }  // namespace query
diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp
index 9b530aa3e..f0881de94 100644
--- a/src/query/exceptions.hpp
+++ b/src/query/exceptions.hpp
@@ -130,6 +130,18 @@ class ExplicitTransactionUsageException : public QueryRuntimeException {
   using QueryRuntimeException::QueryRuntimeException;
 };
 
+/**
+ * An exception for serialization error
+ */
+class TransactionSerializationException : public QueryException {
+ public:
+  using QueryException::QueryException;
+  TransactionSerializationException()
+      : QueryException(
+            "Cannot resolve conflicting transactions. You can retry this transaction when the conflicting transaction "
+            "is finished") {}
+};
+
 class ReconstructionException : public QueryException {
  public:
   ReconstructionException()
diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index b8d2a1fff..b8652d3a3 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -181,7 +181,7 @@ VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, Frame *fram
     if (maybe_error.HasError()) {
       switch (maybe_error.GetError()) {
         case storage::Error::SERIALIZATION_ERROR:
-          throw QueryRuntimeException(kSerializationErrorMessage);
+          throw TransactionSerializationException();
         case storage::Error::DELETED_OBJECT:
           throw QueryRuntimeException("Trying to set a label on a deleted node.");
         case storage::Error::VERTEX_HAS_EDGES:
@@ -298,7 +298,7 @@ EdgeAccessor CreateEdge(const EdgeCreationInfo &edge_info, DbAccessor *dba, Vert
   } else {
     switch (maybe_edge.GetError()) {
       case storage::Error::SERIALIZATION_ERROR:
-        throw QueryRuntimeException(kSerializationErrorMessage);
+        throw TransactionSerializationException();
       case storage::Error::DELETED_OBJECT:
         throw QueryRuntimeException("Trying to create an edge on a deleted node.");
       case storage::Error::VERTEX_HAS_EDGES:
@@ -1921,7 +1921,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
       if (maybe_value.HasError()) {
         switch (maybe_value.GetError()) {
           case storage::Error::SERIALIZATION_ERROR:
-            throw QueryRuntimeException(kSerializationErrorMessage);
+            throw TransactionSerializationException();
           case storage::Error::DELETED_OBJECT:
           case storage::Error::VERTEX_HAS_EDGES:
           case storage::Error::PROPERTIES_DISABLED:
@@ -1947,7 +1947,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
           if (res.HasError()) {
             switch (res.GetError()) {
               case storage::Error::SERIALIZATION_ERROR:
-                throw QueryRuntimeException(kSerializationErrorMessage);
+                throw TransactionSerializationException();
               case storage::Error::DELETED_OBJECT:
               case storage::Error::VERTEX_HAS_EDGES:
               case storage::Error::PROPERTIES_DISABLED:
@@ -1978,7 +1978,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, ExecutionContext &context) {
           if (res.HasError()) {
             switch (res.GetError()) {
               case storage::Error::SERIALIZATION_ERROR:
-                throw QueryRuntimeException(kSerializationErrorMessage);
+                throw TransactionSerializationException();
               case storage::Error::VERTEX_HAS_EDGES:
                 throw RemoveAttachedVertexException();
               case storage::Error::DELETED_OBJECT:
@@ -2128,7 +2128,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
         case storage::Error::DELETED_OBJECT:
           throw QueryRuntimeException("Trying to set properties on a deleted graph element.");
         case storage::Error::SERIALIZATION_ERROR:
-          throw QueryRuntimeException(kSerializationErrorMessage);
+          throw TransactionSerializationException();
         case storage::Error::PROPERTIES_DISABLED:
           throw QueryRuntimeException("Can't set property because properties on edges are disabled.");
         case storage::Error::VERTEX_HAS_EDGES:
@@ -2184,7 +2184,7 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
           case storage::Error::DELETED_OBJECT:
             throw QueryRuntimeException("Trying to set properties on a deleted graph element.");
           case storage::Error::SERIALIZATION_ERROR:
-            throw QueryRuntimeException(kSerializationErrorMessage);
+            throw TransactionSerializationException();
           case storage::Error::PROPERTIES_DISABLED:
             throw QueryRuntimeException("Can't set property because properties on edges are disabled.");
           case storage::Error::VERTEX_HAS_EDGES:
@@ -2299,7 +2299,7 @@ bool SetLabels::SetLabelsCursor::Pull(Frame &frame, ExecutionContext &context) {
     if (maybe_value.HasError()) {
       switch (maybe_value.GetError()) {
         case storage::Error::SERIALIZATION_ERROR:
-          throw QueryRuntimeException(kSerializationErrorMessage);
+          throw TransactionSerializationException();
         case storage::Error::DELETED_OBJECT:
           throw QueryRuntimeException("Trying to set a label on a deleted node.");
         case storage::Error::VERTEX_HAS_EDGES:
@@ -2357,7 +2357,7 @@ bool RemoveProperty::RemovePropertyCursor::Pull(Frame &frame, ExecutionContext &
         case storage::Error::DELETED_OBJECT:
           throw QueryRuntimeException("Trying to remove a property on a deleted graph element.");
         case storage::Error::SERIALIZATION_ERROR:
-          throw QueryRuntimeException(kSerializationErrorMessage);
+          throw TransactionSerializationException();
         case storage::Error::PROPERTIES_DISABLED:
           throw QueryRuntimeException(
               "Can't remove property because properties on edges are "
@@ -2428,7 +2428,7 @@ bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, ExecutionContext &cont
     if (maybe_value.HasError()) {
       switch (maybe_value.GetError()) {
         case storage::Error::SERIALIZATION_ERROR:
-          throw QueryRuntimeException(kSerializationErrorMessage);
+          throw TransactionSerializationException();
         case storage::Error::DELETED_OBJECT:
           throw QueryRuntimeException("Trying to remove labels from a deleted node.");
         case storage::Error::VERTEX_HAS_EDGES:
diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp
index 220a50152..a4b769b0d 100644
--- a/src/query/stream/streams.cpp
+++ b/src/query/stream/streams.cpp
@@ -232,26 +232,30 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
 
   auto *memory_resource = utils::NewDeleteResource();
 
-  auto consumer_function =
-      [interpreter_context = interpreter_context_, memory_resource, stream_name,
-       transformation_name = stream_info.common_info.transformation_name, owner = owner,
-       interpreter = std::make_shared<Interpreter>(interpreter_context_),
-       result = mgp_result{nullptr, memory_resource}](const std::vector<typename TStream::Message> &messages) mutable {
-        auto accessor = interpreter_context->db->Access();
-        EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size());
-        CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
+  auto consumer_function = [interpreter_context = interpreter_context_, memory_resource, stream_name,
+                            transformation_name = stream_info.common_info.transformation_name, owner = owner,
+                            interpreter = std::make_shared<Interpreter>(interpreter_context_),
+                            result = mgp_result{nullptr, memory_resource},
+                            total_retries = interpreter_context_->config.stream_transaction_conflict_retries,
+                            retry_interval = interpreter_context_->config.stream_transaction_retry_interval](
+                               const std::vector<typename TStream::Message> &messages) mutable {
+    auto accessor = interpreter_context->db->Access();
+    EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size());
+    CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
 
-        DiscardValueResultStream stream;
+    DiscardValueResultStream stream;
 
-        spdlog::trace("Start transaction in stream '{}'", stream_name);
-        utils::OnScopeExit cleanup{[&interpreter, &result]() {
-          result.rows.clear();
-          interpreter->Abort();
-        }};
+    spdlog::trace("Start transaction in stream '{}'", stream_name);
+    utils::OnScopeExit cleanup{[&interpreter, &result]() {
+      result.rows.clear();
+      interpreter->Abort();
+    }};
+
+    const static std::map<std::string, storage::PropertyValue> empty_parameters{};
+    uint32_t i = 0;
+    while (true) {
+      try {
         interpreter->BeginTransaction();
-
-        const static std::map<std::string, storage::PropertyValue> empty_parameters{};
-
         for (auto &row : result.rows) {
           spdlog::trace("Processing row in stream '{}'", stream_name);
           auto [query_value, params_value] =
@@ -264,7 +268,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
               interpreter->Prepare(query, params_prop.IsNull() ? empty_parameters : params_prop.ValueMap(), nullptr);
           if (!interpreter_context->auth_checker->IsUserAuthorized(owner, prepare_result.privileges)) {
             throw StreamsException{
-                "Couldn't execute query '{}' for stream '{}' becuase the owner is not authorized to execute the "
+                "Couldn't execute query '{}' for stream '{}' because the owner is not authorized to execute the "
                 "query!",
                 query, stream_name};
           }
@@ -274,8 +278,16 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
         spdlog::trace("Commit transaction in stream '{}'", stream_name);
         interpreter->CommitTransaction();
         result.rows.clear();
-      };
-
+        break;
+      } catch (const query::TransactionSerializationException &e) {
+        if (i == total_retries) {
+          throw;
+        }
+        ++i;
+        std::this_thread::sleep_for(retry_interval);
+      }
+    }
+  };
   auto insert_result = map.try_emplace(
       stream_name, StreamData<TStream>{std::move(stream_info.common_info.transformation_name), std::move(owner),
                                        std::make_unique<SynchronizedStreamSource<TStream>>(