Last active September 19, 2024 21:00
THEORY: Distributed Transactions and why you should avoid them (Two-Phase Commit, Saga Pattern, TCC, Idempotency, etc.)

Distributed Transactions and why you should avoid them

  1. Many modern technologies don't support it (RabbitMQ, Kafka, etc.);
  2. It is a form of synchronous Inter-Process Communication, and this reduces availability;
  3. All participants of the distributed transaction need to be available for a distributed commit, which, again, reduces availability.

Implementing business transactions that span multiple services is not straightforward. Distributed transactions are best avoided because of the CAP theorem. Moreover, many modern (NoSQL) databases don’t support them. The best solution is to use the Saga Pattern.


One of the most well-known patterns for distributed transactions is called Saga. The first paper about it, by Hector Garcia-Molina and Kenneth Salem, was published back in 1987, and it has been a popular solution ever since.

There are a couple of different ways to implement a saga transaction, but the two most popular are:

  • Events/Choreography: there is no central coordination; each service produces events, listens to other services’ events, and decides whether an action should be taken;
  • Command/Orchestration: a coordinator service is responsible for centralizing the saga’s decision-making and sequencing the business logic;
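The orchestration style can be sketched as a coordinator that runs each local step in order and, when one fails, runs the compensating actions of the steps that already completed, in reverse order. This is a minimal in-memory sketch under stated assumptions: the `SagaStep` type, the step names, and the failing `charge_card` step are hypothetical. A real orchestrator would also persist its progress so it can resume after a crash, and would retry failed compensations.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]        # local transaction in one service
    compensation: Callable[[], None]  # semantic undo of `action`


def run_saga(steps: List[SagaStep], log: List[str]) -> bool:
    """Orchestrator: run steps in order; on failure, compensate in reverse."""
    completed: List[SagaStep] = []
    for step in steps:
        try:
            step.action()
        except Exception as exc:
            log.append(f"failed:{step.name}:{exc}")
            for done in reversed(completed):
                done.compensation()  # a real orchestrator would retry these
                log.append(f"compensated:{done.name}")
            return False
        log.append(f"done:{step.name}")
        completed.append(step)
    return True


def charge_card() -> None:
    raise RuntimeError("card declined")  # hypothetical failing step


log: List[str] = []
ok = run_saga(
    [
        SagaStep("create_order", lambda: None, lambda: None),
        SagaStep("reserve_stock", lambda: None, lambda: None),
        SagaStep("charge_card", charge_card, lambda: None),
    ],
    log,
)
print(ok, log)
```

Note that the compensations only restore a *semantically* consistent state: between the failure and the compensation, other transactions can already see the intermediate changes.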
rponte commented Mar 21, 2022

More on distributed transactions and delivery guarantees:

⭐️ What You Want Is What You Don’t: Understanding Trade-Offs in Distributed Messaging

If you’re distributed, forget about ordering and start thinking about commutativity. Forget about guaranteed delivery and start thinking about idempotence.

Other articles:

  • Nobody Needs Reliable Messaging
  • What is commutativity and why is it so useful in distributed systems?
    • If you can make the order not matter, you often can eliminate a lot of coordination code.

  • ⭐️ You Cannot Have Exactly-Once Delivery
  • ⭐️ You Cannot Have Exactly-Once Delivery Redux
  • ⭐️ Exactly-once message delivery (or Exactly-once processing)
    • Finally the message broker itself. Many message brokers claim to support exactly-once message processing to some degree. Unfortunately this is very confusing because the message broker is precisely the place where exactly-once message processing cannot be implemented.

      What we wanted to prove in this article is that exactly-once processing necessarily requires some form of participation from the message processing endpoint. This participation may be in form of using a shared data store, taking part in distributed transactions or implementing a protocol.

  • ⭐️ Dissecting SQS FIFO Queues — Does Ordered and Exactly Once Messaging Really Exist?
    • FIFO SQS queues offer exactly once processing only if 1) publishers never publish duplicate messages wider than five minutes apart and 2) consumers never fail to delete messages they have processed from the queue. [...] If your system cannot be designed to handle these two cases, you will not be able to use FIFO queues for exactly once processing.

    • [...] The practical implication of this result is that any first-in-first-out and exactly-once message queue will have limited throughput. This result is a proof, not an implementation detail.

    • [...] if you are building two applications that need in-order and exactly-once messaging to function, what you probably want is just to use the same database or commit log. [...] When in-order and exactly-once messaging looks like a good solution to your problem, consider using the same datastore as an alternative that is usually simpler, more performant, and less error prone.

    • For publishers, we saw that the only way to guarantee ordered message delivery is to restrict throughput to a single publisher per message group. For consumers, we saw that the only way to guarantee exactly-once and in order processing is to process messages one at a time by coordinating message processing through a transactional datastore. You also need to somehow guarantee that duplicate publishes never happen at wider than five minute intervals, or that consumers check existing processed data and drop duplicates during processing.

    • By using FIFO queues, you can guarantee in-order message processing at the expense of reduced throughput, but will still need to handle duplicate messages in the degenerate case.

  • ⭐️ FIFO, Exactly-Once, and Other Costs
    • In the article, Kevin shows how FIFO delivery is really only meaningful when you have one single-threaded publisher and one single-threaded receiver.

    • When a message is delivered to a consumer, it remains in the queue until it’s acked. The visibility timeout prevents other consumers from processing it. With FIFO queues, this also means head-of-line blocking for other messages in the same group.

    • When this happens [a prolonged GC pause, crash, network delay, whatever], the visibility timeout expires and the messages are redelivered and, potentially, reprocessed. [...] We might do this by using a database transaction to atomically process and acknowledge the messages. An alternative, yet similar, approach might be to use a write-ahead-log-like strategy whereby the consuming system reads messages from SQS and transactionally stores them in a database for future processing. Once the messages have been committed, the consumer deletes the messages from SQS. In either of these approaches, we’re basically shifting the onus of exactly-once processing onto an ACID-compliant relational database.

    • Note that this is really how Kafka achieves its exactly-once semantics. It requires end-to-end cooperation for exactly-once to work. State changes in your application need to be committed transactionally with your Kafka offsets.

    • [...] Solving either of these problems [SQS FIFO conditions to achieve Exactly-once processing] probably requires some kind of coordination between the application and queue, likely in the form of a database transaction. And if you’re using a database either as the message source, sink, or both, what are exactly-once FIFO queues actually buying you? You’re paying a seemingly large cost in throughput for little perceived value. Your messages are already going through some sort of transactional boundary that provides ordering and uniqueness.

    • Where I see FIFO and exactly-once semantics being useful is when talking to systems which cannot cooperate with the end-to-end transaction. This might be a legacy service or a system with side effects, such as sending an email.

    • When people describe a messaging system with FIFO and exactly-once semantics, they’re usually providing a poor description of a relational database with support for ACID transactions. Providing these semantics in a messaging system likely still involves database transactions, it’s just more complicated. It turns out relational databases are really good at ensuring invariants like exactly-once.

    • That’s not to say that developers are dumb or incapable of understanding, but the fact is your average developer is simply not thinking about all of the edge cases brought on by operating distributed systems at scale. They see “exactly-once FIFO queues” in SQS or “exactly-once delivery” in Kafka and take it at face value.

    • On the contrary, exactly-once semantics require careful construction of your application, assume a closed, transactional world, and do not support the case where I think people want exactly-once the most: side effects.

    • Interestingly, one of my chief concerns about Kafka’s implementation was what the difficulty of ensuring end-to-end cooperation would be in practice. Side effects into downstream systems with no support for idempotency or transactions could make it difficult. [...] so all you really need to do is commit your offsets and state changes together.

    • That’s not to say exactly-once semantics, as offered in systems like SQS and Kafka, are not useful. I think we just need to be more conscientious of the other costs and encourage developers to more deeply understand the solution space—too much sprinkling on of Kafka or exactly-once or FIFO and not enough thinking about the actual business problem.

Messaging and At-Least-Once Delivery

Working with messaging under At-Least-Once guarantees means embracing its trade-offs (⭐️):

Apart from being duplicated, in-flight messages can get re-ordered. There are many reasons for this to happen, one of the most obvious being message re-delivery mechanism. If a delivery fails, a message is available for reprocessing only after some back-off period. Any other in-flight message can be processed during that time causing the respective order of those messages to change.

When combined, duplication and re-ordering can result, at the receiver side, in many different processing sequences. The only guarantee is that the resulting sequence contains at least one copy of each message sent.


rponte commented Apr 25, 2022

Message Ordering and its complexity

Message ordering is one of the trickiest things to get right in messaging.

It doesn't matter which broker you use, be it #RabbitMQ, #ActiveMQ or #Kafka.

Even with a single producer and a single consumer, there are still edge cases that can lead to out-of-order messages and, consequently, data inconsistency 🤡

In the end, we must embrace a principle called “The End-to-End Principle” (E2E Principle).

By E2E Principle, I mean bringing some “intelligence” to the edges of your system, that is, to your producer and consumer, and being guided by:

  1. embrace At-Least-Once Delivery;

  2. don't trust your broker's delivery order;

  3. commutativity instead of ordering;

  4. idempotency and/or deduplication;

1) embrace At-Least-Once Delivery

It is practically impossible to implement Exactly-Once Delivery semantics, so don't blindly believe what vendors tell you!

What we can do is emulate it by implementing mechanisms on top of At-Least-Once Delivery semantics, such as idempotency, deduplication, transactions, etc.

2) don't trust your broker's delivery order

The order in which messages enter and leave your broker matters little or not at all; relying on it is the surest way to get burned.

What really matters is the business order of the messages (business event order), because that is what you will design and implement your solution around.

3) commutativity instead of ordering

Since message order can neither be trusted nor guaranteed by the broker, accept that messages can (and eventually will) arrive out of order.

So write your consumer code to handle it; don't wait for the worst to happen.
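One way to make a consumer tolerant of reordering and duplicates is to make the update itself commutative: each event carries a business version (for example, a per-entity sequence number assigned by the producer), and the consumer keeps only the highest one. The `version` field and the in-memory store below are assumptions for illustration, and this last-writer-wins approach only fits events that carry full state snapshots, not deltas.

```python
from dataclasses import dataclass
from itertools import permutations
from typing import Dict, Tuple


@dataclass(frozen=True)
class StatusChanged:
    order_id: str
    version: int  # business order, assigned by the producer (assumption)
    status: str


def apply(store: Dict[str, Tuple[int, str]], event: StatusChanged) -> None:
    """Keep the event with the highest version; older or duplicate ones are ignored."""
    current = store.get(event.order_id)
    if current is None or event.version > current[0]:
        store[event.order_id] = (event.version, event.status)


events = [
    StatusChanged("o-1", 1, "CREATED"),
    StatusChanged("o-1", 2, "PAID"),
    StatusChanged("o-1", 3, "SHIPPED"),
]

# Every delivery order (including a duplicate of "PAID") converges to one state.
final_states = set()
for order in permutations(events + [events[1]]):
    store: Dict[str, Tuple[int, str]] = {}
    for e in order:
        apply(store, e)
    final_states.add(store["o-1"])
print(final_states)  # {(3, 'SHIPPED')}
```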

4) idempotency and deduplication

Although both concepts are easy to understand, they are not necessarily easy to implement.

This is especially true if you deal with external systems, that is, if your logic causes side effects in other services, such as an RDBMS, a cache, or any other process.


In short, without the E2E Principle, that “intelligence” at the edges, you will eventually run into message ordering problems and data inconsistency.

It might happen to you now or in 10 years, who knows 🤷‍♀️

Do you need to worry about it? I'd say yes, but implementing something to solve it is another story. Designing and implementing for fault tolerance has a cost, and how critical it is depends on your context.


rponte commented Dec 8, 2022

Great articles on the Outbox and Inbox Patterns, delivery guarantees, and using Postgres Logical Replication instead of a background job:

rponte commented Mar 15, 2023

rponte commented Mar 30, 2023

iseki0 commented Nov 12, 2023

But as you said, saga is also a distributed transaction pattern. So... what does "avoid distributed transactions" mean?

rponte commented Nov 17, 2023

This article has a very didactic explanation of how to implement the different types of delivery semantics, as we can see below:

Offset Manager

Each message in Kafka is associated with an offset - an integer number denoting its position in the current partition. By storing this number, we essentially provide a checkpoint for our consumer. If it fails and comes back, it knows from where to continue. As such, it is vital for implementing various processing guarantees in Kafka:

  • For at-most-once, we need to save $offset + 1 before processing $offset. If our consumer fails before successfully processing $offset and restarts, it will continue from $offset + 1 and not reprocess $offset.
  • For at-least-once, we need to successfully process $offset before saving $offset + 1. If our consumer fails before saving $offset + 1 and restarts, it will continue from $offset and reprocess it.
  • For exactly-once using an external transactional storage, we need to process $offset and save $offset + 1 within one transaction and roll back if anything goes wrong.
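The three rules can be compared side by side with an in-memory log and a single simulated crash. This is a toy model, not the Kafka client API: `saved` stands in for wherever the checkpoint is stored, and `crash_at` is a hypothetical fault-injection parameter that crashes the consumer between its two operations the first time it reaches that offset.

```python
from typing import List, Optional


def consume(log: List[str], mode: str, crash_at: Optional[int] = None) -> List[str]:
    """Toy consumer that crashes once at `crash_at`, then restarts from its checkpoint.

    Returns the messages whose processing took effect, in order.
    """
    processed: List[str] = []
    saved = 0          # checkpoint: the next offset to read after a restart
    crashed = False
    offset = saved
    while offset < len(log):
        crash_now = offset == crash_at and not crashed
        if mode == "at-most-once":
            saved = offset + 1                  # 1. save $offset + 1
            if not crash_now:
                processed.append(log[offset])   # 2. process $offset
        elif mode == "at-least-once":
            processed.append(log[offset])       # 1. process $offset
            if not crash_now:
                saved = offset + 1              # 2. save $offset + 1
        elif mode == "exactly-once":
            if not crash_now:                   # process + save commit atomically;
                processed.append(log[offset])   # a crash rolls both back
                saved = offset + 1
        else:
            raise ValueError(f"unknown mode: {mode}")
        if crash_now:
            crashed = True
        offset = saved                          # continue (or restart) from checkpoint
    return processed


for mode in ("at-most-once", "at-least-once", "exactly-once"):
    print(mode, consume(["a", "b", "c"], mode, crash_at=1))
```

With a crash at offset 1, at-most-once loses `b`, at-least-once processes `b` twice, and the transactional variant processes each message once.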

rponte commented Jan 19, 2024

rponte commented Jan 24, 2024

My RT when it finally clicked:

excellent question from @tomazfernandes_ 👏🏻👏🏻

a robust, fault-tolerant, cheap-to-implement solution that is retriable, idempotent and embraces eventual consistency is precisely the Outbox Pattern 🤤🤤

long live RDBMS ACID + that background job that runs every 1 min 🥳

rponte commented Apr 3, 2024

Tweet by @DominikTornow

The ingredients of failure tolerance & transparency are more basic than you would think:

1⃣ Guarantee retries via persisting the invocation
2⃣ Guarantee correctness of retries via idempotent steps

Outlined in Fault Tolerance via Idempotence
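The two ingredients can be sketched as follows: the invocation is recorded before anything runs, and each step checks a record of completed steps (keyed by invocation and step name), so retrying the whole invocation only re-executes what has not finished. Here plain dicts stand in for durable storage, and the step names and the simulated timeout are hypothetical; this illustrates the idea and is not any framework's API.

```python
from typing import Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[], str]]

# Stand-ins for durable storage (assumption: database tables in a real system).
invocations: Dict[str, str] = {}                  # invocation_id -> "pending" | "done"
completed_steps: Dict[Tuple[str, str], str] = {}  # (invocation_id, step) -> result


def run_step(inv_id: str, name: str, fn: Callable[[], str]) -> str:
    """Idempotent step: return the recorded result instead of re-running fn."""
    key = (inv_id, name)
    if key not in completed_steps:
        completed_steps[key] = fn()
    return completed_steps[key]


def invoke(inv_id: str, steps: List[Step]) -> None:
    invocations.setdefault(inv_id, "pending")  # 1. persist the invocation first
    for name, fn in steps:
        run_step(inv_id, name, fn)             # 2. each step is safe to retry
    invocations[inv_id] = "done"


def retry_pending(steps: List[Step]) -> None:
    """Recovery loop: re-drive every invocation that never reached "done"."""
    for inv_id, status in list(invocations.items()):
        if status == "pending":
            invoke(inv_id, steps)


# Demo: "charge" times out on its first attempt (hypothetical failure).
calls: List[str] = []


def reserve() -> str:
    calls.append("reserve")
    return "r-1"


def charge() -> str:
    calls.append("charge")
    if calls.count("charge") == 1:
        raise TimeoutError("payment gateway timeout")
    return "c-1"


steps: List[Step] = [("reserve", reserve), ("charge", charge)]
try:
    invoke("inv-1", steps)
except TimeoutError:
    pass  # the process "crashes"; the persisted invocation survives
retry_pending(steps)
print(calls)  # ['reserve', 'charge', 'charge']
```

A step that crashes after its side effect but before its result is recorded will run again, which is why the steps themselves must also be idempotent downstream.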


rponte commented Jul 7, 2024

Reservation Queue Pattern

An excellent way to understand this pattern is to remember that it works the same way as the SQS Visibility Timeout: when a consumer claims a message from the queue, no other consumer can claim it, so it has to process the message before the visibility timeout expires.
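The claim-with-timeout mechanism can be sketched with an in-memory queue: claiming a message sets an `invisible_until` deadline, other consumers skip it until that deadline passes, and deleting (acknowledging) it requires the receipt handed out at the latest claim. The class and method names are hypothetical and only loosely modeled on SQS receive/delete semantics; time is passed in explicitly so the behavior is deterministic.

```python
import itertools
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class _Entry:
    body: str
    invisible_until: float = 0.0
    receipt: Optional[int] = None


class ReservationQueue:
    def __init__(self, visibility_timeout: float) -> None:
        self.visibility_timeout = visibility_timeout
        self._entries: Dict[int, _Entry] = {}
        self._ids = itertools.count()
        self._receipts = itertools.count(1)

    def send(self, body: str) -> None:
        self._entries[next(self._ids)] = _Entry(body)

    def claim(self, now: float) -> Optional[Tuple[int, int, str]]:
        """Reserve the first visible message; returns (msg_id, receipt, body)."""
        for msg_id, entry in self._entries.items():
            if entry.invisible_until <= now:
                entry.invisible_until = now + self.visibility_timeout
                entry.receipt = next(self._receipts)  # older receipts become stale
                return msg_id, entry.receipt, entry.body
        return None

    def delete(self, msg_id: int, receipt: int) -> bool:
        """Ack: only the holder of the latest reservation may delete."""
        entry = self._entries.get(msg_id)
        if entry is None or entry.receipt != receipt:
            return False
        del self._entries[msg_id]
        return True
```

If a slow consumer exceeds the timeout, the message becomes visible again and a second consumer may process it, so the stale consumer's late ack is rejected and the processing itself still needs to be idempotent.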

rponte commented Jul 10, 2024

Although I like to think that Saga Pattern is a kind of (loose) distributed transaction, I liked how this article described the Saga Pattern as not being a distributed transaction:

Unfortunately, as I said earlier, the saga is not a distributed transaction, but its mere replacement. That means that the saga doesn’t meet some of the requirements applied to transactions (I’ll spare the ACID explanation[3]). The absence of Isolation[4] is the main concern, which means that all the intermediate changes made by one saga are available for other sagas (and usually even for end users), despite the fact that they may be rolled back if the saga fails. At the same time several sagas can operate the same resources without even suspecting about each other. Consequences of this behavior may be unpredictable and even dangerous for the whole system’s data integrity

rponte commented Jul 22, 2024

rponte commented Aug 12, 2024

Starbucks Does Not Use Two-Phase Commit

Excellent articles from 2004 and 2005 written by Gregor Hohpe, co-creator of the EIP book, about error handling in distributed systems:

Error-handling strategies for loosely coupled systems

Basically, both articles say that when we try to write to two or more systems in a business transaction and one of them fails, we have 4 strategies for error handling:

  1. Write-off: just ignore, do nothing;
  2. Retry: try to send the message again. Retry is a plausible option if there’s a realistic chance that the retry will succeed;
  3. Compensating Action: try to undo already completed operations to return the system to a consistent state;
  4. Transaction coordinator: follow the Two-Phase Commit (2PC) approach. Although a two-phase commit can make life a lot simpler, it can also hurt the free flow of messages (and therefore scalability) because it requires the allocation of a stateful transaction resource across the flow of multiple, asynchronous actions. When amounts and stakes are larger, a pessimistic two-phase-commit approach is more appropriate;

Those strategies are exactly the ones discussed in this amazing talk: Six Little Lines of Fail
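Retry and compensating action are often combined: retry a bounded number of times with exponential backoff, and fall back to a compensating action if the step still fails. A minimal sketch under stated assumptions: the attempt count, the delays, and the `ship` and `refund_payment` names are hypothetical, and `sleep` is injectable so the example runs instantly.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry(fn: Callable[[], T], attempts: int = 3, base_delay: float = 0.1,
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn; on failure, back off exponentially (0.1s, 0.2s, ...) and retry."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts - 1):
        try:
            return fn()
        except Exception:
            sleep(base_delay * 2 ** attempt)
    return fn()  # last attempt: any exception propagates to the caller


# Demo: shipping keeps failing, so we compensate the already-completed payment.
delays: List[float] = []
actions: List[str] = []


def ship() -> str:
    raise ConnectionError("shipping service unavailable")  # hypothetical outage


try:
    retry(ship, attempts=3, sleep=delays.append)
except ConnectionError:
    actions.append("refund_payment")  # compensating action (a write-off would just log)
print(delays, actions)  # [0.1, 0.2] ['refund_payment']
```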


Other content

rponte commented Aug 13, 2024

Some talks on how Nubank has designed and implemented consistency in their distributed systems with Kafka and Datomic:

  • Kafka Summit 2019: Maintaining Consistency for a Financial Event-Driven Architecture
    • First principle: "Whenever an event happens, the entire world must eventually be in a state where the event happened exactly once. Not zero times, not more than once, exactly once."
      (The idea here is to have a generic solution to maintain consistency among the systems, using techniques such as DLQs, background jobs, and retries to allow replaying events. In scenarios of outages, it is also important to have circuit breakers well configured between services)
    • Second principle: "Every event shall be idempotent."
      (Once all events (and operations) are idempotent, the first principle becomes much easier to implement)
  • Clojure South 2019
    • Maintaining integrity in distributed systems with a side effect machine (🇧🇷 pt_BR)
    • Challenges of Building an Event Sourced Banking Account | Rafael Ring
      • This talk is about how Nubank implements Event Sourcing and deals with its hard parts, such as eventual consistency, concurrency, reads, and state calculation performance;
      • Concurrency: Make sure to have idempotent services and, whenever possible, rely on uniqueness checks from your database to avoid duplicating data;
      • Eventual Consistency: Try to present the data as eventual to the customer, use distributed tracing, and ensure that the system knows how to deal with failure;
      • Reads: BFFs can help aggregate data for the customer views, and polling can be used to remove the async part of reads;
      • State calculation performance: For very intensive calculations, building a cache with the same event sourcing principles may help; the up-to-date balance feature is the main example of using a cache inside the same database, leveraging ACID properties to guarantee that after each transaction (like a deposit) everything is synchronized atomically;

rponte commented Aug 15, 2024

rponte commented Aug 28, 2024

⭐️ Cache Consistency with Database by Daniel Wu

It’s often impossible to implement a linearizability consistency model with distributed cache and database systems considering all kinds of errors and failures. Every cache pattern has its limitation and in some cases you cannot get sequential consistency, or sometimes you get unexpected latency between cache and database. With all the solutions I showed in this article, there are always corner cases that you might encounter with high concurrency. So, there is no silver bullet for this, know the limitation and define your consistency requirement before you choose a solution. If you want linearizability consistency with fault-tolerance, you’d better not use cache at all.

rponte commented Aug 29, 2024

Excellent content by Tyler Treat:

Building Reliable Systems

  • Slides: Push It to the Limit: Considerations for Building Reliable Systems - by Tyler Treat
  • Designed to Fail
    • With monolithic systems, we care more about preventing failure from occurring. [...] With highly distributed microservice architectures where failure is all but guaranteed, we embrace it.

    • What does it mean to embrace failure? Anticipating failure is understanding the behavior when things go wrong, building systems to be resilient to it, and having a game plan for when it happens, either manual or automated.

    • The key to being highly available is learning to be partially available. Frequently, one of the requirements for partial availability is telling the client “no.”

    • [...] we made explicit decisions to drop messages on the floor if we detect the system is becoming overloaded. As queues become backed up, incoming messages are discarded, a statsd counter is incremented, and a backpressure notification is sent to the client. Upon receiving this notification, the client can respond accordingly by failing fast, exponentially backing off, or using some other flow-control strategy.

    • If an overloaded service is not essential to core business, we fail fast on calls to it to prevent availability or latency problems upstream. For example, a spam-detection service is not essential to an email system, so if it’s unavailable or overwhelmed, we can simply bypass it.

    • By isolating and controlling it, we can prevent failure from becoming widespread and unpredictable. By building in backpressure mechanisms and other types of intentional “failure” modes, we can ensure better availability and reliability for our systems through graceful degradation.
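The load-shedding behavior described in these quotes can be sketched as a bounded queue that rejects new work when it is full, counts the drop (a plain counter standing in for the statsd counter), and tells the client to back off. The class name, the capacity, and the reply strings are hypothetical.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque


@dataclass
class BoundedIngress:
    capacity: int
    queue: Deque[str] = field(default_factory=deque)
    dropped: int = 0  # stand-in for a statsd counter

    def offer(self, message: str) -> str:
        """Accept the message, or drop it and signal backpressure to the client."""
        if len(self.queue) >= self.capacity:
            self.dropped += 1
            return "BACKPRESSURE"  # client should fail fast or back off
        self.queue.append(message)
        return "ACCEPTED"

    def drain(self, n: int) -> None:
        """Worker side: process (here, just remove) up to n queued messages."""
        for _ in range(min(n, len(self.queue))):
            self.queue.popleft()


ingress = BoundedIngress(capacity=2)
replies = [ingress.offer(f"m{i}") for i in range(4)]
print(replies, ingress.dropped)
# ['ACCEPTED', 'ACCEPTED', 'BACKPRESSURE', 'BACKPRESSURE'] 2
```

Rejecting early keeps the queue, and therefore latency, bounded: the system stays partially available instead of degrading for everyone.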

Distributed Systems Are a UX Problem

  • Blog: Distributed Systems Are a UX Problem
  • Video: Distributed systems are a UX problem
    • We need to shift the focus from system properties and guarantees to business rules and application behavior.

  • Slides: Distributed Systems Are a UX Problem - by Tyler Treat
    • We choose consistency over availability, or we can choose availability by making local decisions with the knowledge at hand and designing the UX accordingly.

    • Managing partial failure is a matter of dealing with partial knowledge and managing risk.

    • Systems don't make decisions, they make guesses.

    • Systems need the capacity to apologize.

    • Customers judge you not by your failures, but by how you handle your failures.

    • Businesses need both code and people to manage apologies.

rponte commented Aug 30, 2024

rponte commented Sep 2, 2024

Life beyond Distributed Transactions: an Apostate’s Opinion

rponte commented Sep 2, 2024

rponte commented Sep 5, 2024


rponte commented Sep 10, 2024

Exactly-once message processing

Distributed algorithms are difficult. If you find yourself struggling to understand one of them, we assure you – you are not alone. We have spent last couple of years researching ways to ensure exactly-once message processing in systems that exchange messages in an asynchronous and durable way (a.k.a. message queues) and you know what? We still struggle and make silly mistakes. The reason is that even a very simple distributed algorithm generates vast numbers of possible execution paths.

Very good article, Exactly-once intuition, about a set of heuristics that are very helpful in sketching the structure of an algorithm that achieves exactly-once message processing. Below is a summary of those heuristics:

  1. The transaction and the side effects: The outcome of processing a message consists of two parts. There is a transactional part and a side effects part. The transaction consists of application state change and of marking the incoming message as processed. The side effects include things like creating objects in non-transactional data stores (e.g. uploading a blob) and sending messages.
  2. Until the transaction is committed, nothing happened: In order for an algorithm to behave correctly, it has to guarantee that until a transaction is committed, no effects of the processing are visible to the external observers.
  3. Prepare - Commit - Publish: [...] For this reason any correct algorithm has to make sure the side effects are made durable, but not visible (prepared), before the transaction is committed. Then, after the commit, the side effects are published.
  4. Side effects stored in each processing attempt are isolated: [...] In our PDF example each processing attempt would generate its own PDF document but only the attempt that succeeded to commit would publish its outgoing messages, announcing to the world the true location of the PDF.
  5. Register - Cleanup: Although we can’t avoid generating garbage, a well-behaved algorithm ensures that the garbage is eventually cleaned up.
  6. Concurrency control ensures serialization of processing: [...] the outbox record also contains the side effects information. It can exist in only two states: created and dispatched. The transition from created to dispatched does not generate any new information so it does not require concurrency control to prevent lost writes.
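Several of these heuristics can be condensed into a small handler: each attempt writes its side effect under an attempt-specific key (prepared, but not announced); the commit records the outbox entry that marks the message as processed and points at that attempt's side effect; only after the commit are outgoing messages published and the entry moved from created to dispatched; and a cleanup job removes side effects no committed entry references. This is a simplified sketch of the heuristics, not the article's own code: the dicts standing in for a blob store and a transactional database, and the PDF naming, are hypothetical.

```python
import uuid
from typing import Callable, Dict, List

blob_store: Dict[str, bytes] = {}  # non-transactional side-effect store
db: Dict[str, dict] = {}           # stand-in for a transactional store: message_id -> outbox record
published: List[str] = []          # messages sent to the broker


def handle(message_id: str, render: Callable[[], bytes],
           crash_before_commit: bool = False) -> None:
    if message_id in db:  # already committed: never redo the transaction
        record = db[message_id]
    else:
        # Prepare: durable but invisible side effect, isolated per attempt.
        blob_key = f"{message_id}/{uuid.uuid4()}.pdf"
        blob_store[blob_key] = render()
        if crash_before_commit:
            raise RuntimeError("crash")  # blob becomes garbage; nothing "happened"
        # Commit: one atomic write marks the message processed and stores the outbox.
        record = {"status": "created", "outgoing": [f"PdfReady:{blob_key}"]}
        db[message_id] = record
    # Publish after commit, then mark dispatched (safe to repeat on redelivery).
    if record["status"] == "created":
        published.extend(record["outgoing"])
        record["status"] = "dispatched"


def cleanup() -> List[str]:
    """Register-Cleanup: delete blobs not referenced by any committed outbox record."""
    referenced = {m.split(":", 1)[1] for r in db.values() for m in r["outgoing"]}
    garbage = [k for k in blob_store if k not in referenced]
    for k in garbage:
        del blob_store[k]
    return garbage
```

If the process crashes after publishing but before marking the record dispatched, the messages are published again on redelivery, so downstream consumers still deduplicate.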

rponte commented Sep 16, 2024

rponte commented Sep 18, 2024

Scaling Shared Data in Distributed Systems

  • Consistency, by definition, requires linearizability. In multi-threaded programs, we achieve this with mutexes. In distributed systems, we use transactions and distributed locking. Intuitively, both involve performance trade-offs.

  • There are several different strategies, each with their own pros and cons: Immutable Data > Last-Write Wins > Application-Level Conflict Resolution > Causal Ordering > Distributed Data Types

  • Use weakly consistent models when you can because they afford you high availability and low latency, and rely on stronger models only when absolutely necessary. Do what makes sense for your system.
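As an example of the last strategy in that list (distributed data types, also known as CRDTs), a grow-only counter lets each replica increment only its own slot and merges by taking the element-wise maximum. Merge is commutative, associative, and idempotent, so replicas converge regardless of message order or duplication. A minimal sketch with hypothetical replica names.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class GCounter:
    replica_id: str
    counts: Dict[str, int] = field(default_factory=dict)

    def increment(self, n: int = 1) -> None:
        if n < 0:
            raise ValueError("a grow-only counter cannot decrease")
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + n

    def merge(self, other: "GCounter") -> None:
        """Element-wise max: commutative, associative and idempotent."""
        for rid, c in other.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), c)

    def value(self) -> int:
        return sum(self.counts.values())


a, b = GCounter("a"), GCounter("b")
a.increment(3)
b.increment(2)
a.merge(b)
b.merge(a)
a.merge(b)  # a duplicate merge has no effect
print(a.value(), b.value())  # 5 5
```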

