For almost half a century, ACID transactions (satisfying the properties of atomicity, consistency, isolation, and durability) have been the abstraction of choice for ensuring consistency in data-storage systems. The well-known atomicity property ensures that either all or none of a transaction's writes take effect in the case of a failure; isolation prevents interference from concurrently running transactions; and durability ensures that writes made by committed transactions are not lost in the case of a failure.
While transactions work well within the scope of a single database product, transactions that span several different data-storage products from distinct vendors have been problematic: many storage systems do not support them, and those that do often perform poorly. Today, large-scale applications are often implemented by combining several distinct data-storage technologies that are optimized for different access patterns. Distributed transactions have failed to gain adoption in most such settings, and most large-scale applications instead rely on ad hoc, unreliable approaches for maintaining the consistency of their data systems.
In recent years, however, there has been an increase in the use of event logs as a data-management mechanism in large-scale applications. This trend includes the event-sourcing approach to data modeling, the use of change data capture systems, and the increasing popularity of log-based publish/subscribe systems such as Apache Kafka. Although many databases use logs internally (for example, write-ahead logs or replication logs), this new generation of log-based systems is different: rather than using logs as an implementation detail, they raise them to the level of the application-programming model.
Since this approach uses application-defined events to solve problems that traditionally fall in the transaction-processing domain, we name it OLEP (online event processing) to contrast with OLTP (online transaction processing) and OLAP (online analytical processing). This article explains the reasons for the emergence of OLEP and shows how it allows applications to guarantee strong consistency properties across heterogeneous data systems, without resorting to atomic commit protocols or distributed locking. The architecture of OLEP systems allows them to achieve consistent high performance, fault tolerance, and scalability.
Different data-storage systems are designed for different access patterns, and there is no single one-size-fits-all storage technology that is able to serve all possible uses of data efficiently. Consequently, many applications today use a combination of several different storage technologies, an approach sometimes known as polyglot persistence.
Note that these storage systems are not fully independent of each other. Rather, it is common for one system to hold a copy or materialized view of data in another system. Thus, when data in one system is updated, it often needs to be updated in another, as illustrated in Figure 1.
OLTP transactions are predefined and short. In the traditional view, as implemented by most relational database products today, a transaction is an interactive session in which a client's queries and data modification commands are interleaved with arbitrary processing and business logic on the client. Moreover, there is no time limit for the duration of a transaction, since the session traditionally may have included human interaction.
However, reality today looks different. Most OLTP database transactions are triggered by a user request made via HTTP to a Web application or Web service. In the vast majority of applications, the span of a transaction extends no longer than the handling of a single HTTP request. This means that by the time the service sends its response to the user, any transactions on the underlying databases have already been committed or aborted. In a user workflow that spans several HTTP requests (for example, adding an item to a cart, going to checkout, confirming the shipping address, entering payment details, and giving a final confirmation), no one transaction spans the entire user workflow; there are only short, noninteractive transactions to handle single steps of the workflow.
Moreover, an OLTP system generally executes a fairly small set of known transaction patterns. On this basis, some database systems encapsulate the business logic of transactions as stored procedures that are registered ahead of time by the application. To execute a transaction, a stored procedure is invoked with certain input parameters, and the procedure then runs to completion on a single execution thread without communicating with any nodes outside of the database.
Heterogeneous distributed transactions are problematic. It is important to distinguish between two types of distributed transactions:
While some homogeneous transaction implementations have proved successful, heterogeneous transactions continue to be problematic. By their nature, they can only rely on a lowest common denominator of participating systems. For example, XA transactions block execution if the application process fails during the prepare phase; moreover, XA provides no deadlock detection and no support for optimistic concurrency-control schemes.3
Many of the systems listed here, such as search indexes, do not support XA or any other heterogeneous transaction model. Thus, ensuring the atomicity of writes across different storage technologies remains a challenging problem for applications.
Figure 1 shows an example of polyglot persistence: an application that needs to maintain records in two separate storage systems such as an OLTP database (for example, an RDBMS) and a full-text search server. If heterogeneous distributed transactions are available, the system can ensure atomicity of writes across the two systems. Most search servers do not support distributed transactions, however, leaving the system vulnerable to these potential inconsistencies:
Figure 2 presents a simple solution to these problems: when the application wants to update a record, rather than performing direct writes to the two storage systems, it appends an update event to a log. The database and the search index each subscribe to this log and write updates to their storage in the order they appear in the log.4 By sequencing updates through a log, the database and the search index apply the same set of writes in the same order, keeping them consistent with each other. In effect, the database and the search index are materialized views onto the sequence of events in the log. This approach solves both of the aforementioned problems as follows:
In this example, the log serializes writes only, but the application may read from the storage systems at any time. Since the log subscribers are asynchronous, reading the index may return a record that does not yet exist in the database, or vice versa; such transient inconsistencies are not a problem for many applications. For those applications that require it, reads can also be serialized through the log; an example of this is presented later.
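The arrangement in Figure 2 can be illustrated with a minimal in-memory sketch. The `Log` and `Subscriber` classes, the event shape (`key`, `value`), and the `max_events` parameter are hypothetical names introduced here for illustration; they are not the API of Kafka or any other log implementation. The sketch only shows that two subscribers applying the same log in the same order converge to the same state, even if one of them temporarily lags.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Log:
    """Append-only, totally ordered sequence of events (one log partition)."""
    events: list = field(default_factory=list)

    def append(self, event: dict) -> int:
        self.events.append(event)
        return len(self.events) - 1  # offset assigned to the new event


@dataclass
class Subscriber:
    """Materialized view that applies events strictly in log order."""
    name: str
    state: dict = field(default_factory=dict)
    next_offset: int = 0

    def poll(self, log: Log, max_events: Optional[int] = None) -> None:
        end = len(log.events)
        if max_events is not None:
            end = min(end, self.next_offset + max_events)
        for event in log.events[self.next_offset:end]:
            self.state[event["key"]] = event["value"]
        self.next_offset = end


log = Log()
log.append({"key": "x", "value": "A"})  # two writers update record x ...
log.append({"key": "x", "value": "B"})  # ... and the log fixes their order

database = Subscriber("database")
search_index = Subscriber("search_index")

database.poll(log)                    # the database is fully caught up
search_index.poll(log, max_events=1)  # the index lags by one event
lagging_view = dict(search_index.state)

search_index.poll(log)                # the index catches up later
```

While the index lags, a direct read returns the older value of `x`; once both subscribers have consumed the whole log, their states are identical. This is the transient inconsistency described above.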
The log abstraction. There are several log implementations that can serve this role, including Apache Kafka, CORFU (from Microsoft Research), Apache Pulsar, and Facebook's LogDevice. The required log abstraction has the following properties:
The following assumptions are made about subscribers of a log:
These assumptions are satisfied by existing log-based stream-processing frameworks such as Apache Kafka Streams and Apache Samza. Updating state deterministically based on an ordered log corresponds to the classic state machine replication principle.5 Since it is possible for an event to be processed more than once when recovering from a failure, state updates must also be idempotent.
Aside: Exactly-once semantics. Some log-based stream processors such as Apache Flink support so-called exactly-once semantics, which means that even though an event may be processed more than once, the effect of the processing will be the same as if it had been processed exactly once. This behavior is implemented by managing side effects within the processing framework and atomically committing these side effects together with the checkpoint that marks a section of the log as processed.
When a log consumer writes to external storage systems, however, as in Figure 2, exactly-once semantics cannot be ensured, since doing so would require a heterogeneous atomic commit protocol across the stream processor and the storage system, which is not available on many storage systems, such as full-text search indexes. Thus, frameworks with exactly-once semantics still exhibit at-least-once processing when interacting with external storage and rely on idempotence to eliminate the effects of duplicate processing.
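One common way to obtain idempotence under at-least-once delivery is to record the offset of the last applied event alongside the state it produced. The sketch below assumes that the store can update both values together (for example, in one local transaction); the class name `AccountView` and its fields are hypothetical and chosen only for this illustration.

```python
class AccountView:
    """Hypothetical store that records the last applied offset with its state."""

    def __init__(self):
        self.balance = 0
        self.last_applied = -1  # offset of the most recent event applied

    def apply(self, offset: int, delta: int) -> bool:
        if offset <= self.last_applied:
            return False  # duplicate delivery: already applied, so skip it
        # Assumption: these two updates are made atomically by the store.
        self.balance += delta
        self.last_applied = offset
        return True


log = [10, -3, 5]  # balance changes, indexed by log offset

view = AccountView()
for offset, delta in enumerate(log):
    view.apply(offset, delta)

# Simulate a crash before the consumer's checkpoint advanced past offset 0:
# on restart, offsets 1 and 2 are delivered a second time.
redelivered = [view.apply(offset, log[offset]) for offset in (1, 2)]
```

The redelivered events are recognized by their offsets and skipped, so the balance is the same as if each event had been processed exactly once.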
Atomicity and enforcing constraints. A classic example where atomicity is required is in a banking/payments system, where a transfer of funds from one account to another account must happen atomically, even if the two accounts are stored on different nodes. Moreover, such a system typically needs to maintain consistency properties or invariants (for example, an account cannot be overdrawn by more than some set limit). Figure 3 shows how such a payments application can be implemented using the OLEP approach instead of distributed transactions. Arrows with solid heads denote appending an event to a log, while arrows with hollow heads denote subscribing to the events in a log. It works as follows:
If the payment executor crashes and restarts, it may reprocess some payment requests that were partially processed before the crash. Since the executor is deterministic, upon recovery it will make the same decisions to approve or decline requests, and thus potentially append duplicate payment events to the source, destination, and fees logs. Based on the ID in the events, however, it is easy for downstream processes to detect and ignore such duplicates.
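The recovery behavior described here can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the article's implementation: the function names `execute` and `fold`, the event fields (`id`, `src`, `dst`, `amount`, `fee`, `delta`), and the log names are all hypothetical, and the executor recomputes the balance from an opening value instead of keeping it in a local database.

```python
from collections import defaultdict


def execute(requests, opening_balance, overdraft_limit=0):
    """Process one source account's payment requests in log order.

    Deterministic: the same requests always yield the same output events.
    """
    balance = opening_balance
    out = []  # (log name, event) pairs to append
    for req in requests:
        total = req["amount"] + req["fee"]
        if balance - total < -overdraft_limit:
            continue  # declined: the account would exceed its overdraft limit
        balance -= total
        out.append(("account:" + req["src"], {"id": req["id"], "delta": -total}))
        out.append(("account:" + req["dst"], {"id": req["id"], "delta": req["amount"]}))
        out.append(("fees", {"id": req["id"], "delta": req["fee"]}))
    return out


def fold(events):
    """Downstream consumer: sum deltas, ignoring events whose ID was already seen."""
    seen, total = set(), 0
    for event in events:
        if event["id"] in seen:
            continue
        seen.add(event["id"])
        total += event["delta"]
    return total


requests = [
    {"id": "p1", "src": "alice", "dst": "bob", "amount": 60, "fee": 1},
    {"id": "p2", "src": "alice", "dst": "bob", "amount": 50, "fee": 1},
]

logs = defaultdict(list)
for _ in range(2):  # the second pass models a crash followed by reprocessing
    for log_name, event in execute(requests, opening_balance=100):
        logs[log_name].append(event)

bob_received = fold(logs["account:bob"])
alice_spent = fold(logs["account:alice"])
fees_collected = fold(logs["fees"])
```

Because the executor makes the same decisions on both passes (approving `p1`, declining `p2`), each output log contains the `p1` event twice, and the downstream consumers discard the second copy based on its ID.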
Multipartition processing. In this payment example, each account has a separate log and thus may be stored on a different node. Moreover, each payment executor only needs to subscribe to events from a single account, and different executors handle different accounts. These factors allow the system to scale linearly to an arbitrary number of accounts.
In this example, the decision of whether to allow the payment request is conditional only on the balance of the source account; you can assume that the payment into the destination account always succeeds, since its balance can only increase. For this reason, the payment executor needs to serialize the payment request only with respect to other events in the source account. If other log partitions need to contribute to the decision, the approval of the payment request can be performed as a multistage process in which each stage serializes the request with respect to a particular log.
Splitting a "transaction" into a multistage pipeline of stream processors allows each stage to make progress based only on local data; it ensures that one partition is never blocked waiting for communication or coordination with another partition. Unlike multipartition transactions, which often impose a scalability bottleneck in distributed transaction implementations, this pipelined design allows OLEP systems to scale linearly.
Advantages of event processing. Besides this scalability advantage, developing applications in an OLEP style has several further advantages:
Disadvantages of the OLEP approach. In the previous examples, log consumers update the state in data stores (the database and search index in Figure 2; the account balances and account statements in Figure 3). While the OLEP approach ensures every event in the log will eventually be processed by every consumer, even in the face of crashes, there is no upper bound on the time until an event is processed.
This means if a client reads from two different data stores that are updated by two different consumers or log partitions, then the values read by the client may be inconsistent with each other. For example, reading the source and destination accounts of a payment may return the source account after the payment has been processed, but the destination account before it has been processed. Thus, even though the accounts will eventually converge toward a consistent state, they may be inconsistent when read at one particular point in time.
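The anomaly can be reproduced with a small sketch in which each account balance is maintained by its own consumer. The dictionary `balances`, the two logs, and the `consume` helper are hypothetical names used only for this illustration.

```python
balances = {"source": 100, "destination": 0}

# One payment of 60, recorded as one event in each account's log.
source_log = [{"id": "p1", "delta": -60}]
destination_log = [{"id": "p1", "delta": 60}]


def consume(account, log):
    """Apply every event in the given log to the given account's balance."""
    for event in log:
        balances[account] += event["delta"]


consume("source", source_log)              # this consumer runs first
total_mid_flight = sum(balances.values())  # a client reads both stores now

consume("destination", destination_log)    # the other consumer catches up
total_after = sum(balances.values())
```

A client reading both balances between the two `consume` calls sees a total of 40 instead of 100: the money has left the source account but has not yet arrived at the destination. After the second consumer catches up, the total is 100 again.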
Debugging is much easier with an append-only log than a mutable database because events can be replayed in order to diagnose what happened in a particular situation.
Note that in an ACID context, preventing this anomaly falls under the heading of isolation, not atomicity; a system with atomicity alone does not guarantee that two accounts will be read in a consistent state. A database transaction running at the "read committed" isolation level (the default in many systems, including PostgreSQL, Oracle DB, and SQL Server) may experience the same anomaly when reading from two accounts.3 Preventing this anomaly requires a stronger isolation level: "repeatable read," snapshot isolation, or serializability.
At present, the OLEP approach does not provide isolation for read requests that are sent directly to data stores (rather than being serialized through the log). Hopefully, future research will enable stronger isolation levels such as snapshot isolation across data stores that are updated from a log.
The New York Times maintains all textual content published since the newspaper's founding in 1851 in a single log partition in Apache Kafka.6 Image files are stored in a separate system, but URLs and captions of images are also stored as log events.
Whenever a piece of content (known as an asset) is published or updated, an event is appended to this log. Several systems subscribe to this log: for example, the full text of each article is written to an indexing service for full-text search; various cached pages (for example, the list of articles with a particular tag, or all pieces by a particular author) need to be updated; and personalization systems notify readers who may be interested in a new article.
Each asset is given a unique identifier, and an event may create or update an asset with a given ID. Moreover, an event may reference the identifiers of other assets, much like a normalized schema in a relational database, where one record may reference the primary key of another record. For example, an image (with caption and other metadata) is an asset that may be referenced by one or more articles.
The order of events in the log satisfies two rules:
For example, an editor might publish an image and then update an article to reference the image. Every consumer of the log then passes through three states in sequence:
Different log consumers will pass through these three states at different times but in the same order. The log order ensures that no consumer is ever in a state where the article references an image that does not yet exist, ensuring referential integrity.
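The effect of log order on referential integrity can be sketched as follows, assuming that a referenced asset always appears in the log before any event that references it. The event shape (`id`, `refs`) and the `replay` function are hypothetical and are not taken from the New York Times implementation.

```python
def replay(events):
    """Apply events in log order; return the state after each event."""
    assets, states = {}, []
    for event in events:
        for ref in event["refs"]:
            if ref not in assets:
                raise LookupError(f"{event['id']} references missing asset {ref}")
        assets[event["id"]] = event["refs"]
        states.append(dict(assets))
    return states


log = [
    {"id": "article-1", "refs": []},           # article without the image
    {"id": "image-1", "refs": []},             # image is published
    {"id": "article-1", "refs": ["image-1"]},  # article now references it
]

states = replay(log)

# Swapping the last two events would violate the ordering assumption.
try:
    replay([log[0], log[2], log[1]])
    misordered_ok = True
except LookupError:
    misordered_ok = False
```

Every consumer that replays `log` sees the same three states in the same order, and in none of them does the article reference an image that is missing. Replaying the events in a different order fails the check, which is why the order of appends to the log matters.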
Moreover, whenever an image or caption is updated, all articles referencing that image need to be updated in caches and search indexes. This can easily be achieved with a log consumer that uses a database to keep track of references between articles and images. This consistency model lends itself very easily to a log, and it provides most of the benefits of distributed transactions without the performance costs.
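A consumer of this kind might look like the following sketch. It keeps the reference index in memory for brevity, whereas the text describes using a database; the class `ReferenceTracker`, its method `on_event`, and the event fields (`id`, `type`, `refs`) are hypothetical names introduced for illustration.

```python
from collections import defaultdict


class ReferenceTracker:
    """Tracks which articles reference which images (in memory for brevity)."""

    def __init__(self):
        self.images_of = {}                  # article ID -> set of image IDs
        self.articles_of = defaultdict(set)  # image ID -> set of article IDs

    def on_event(self, event):
        """Return the IDs of assets whose cached or indexed form is now stale."""
        asset_id = event["id"]
        if event["type"] == "article":
            old = self.images_of.get(asset_id, set())
            new = set(event["refs"])
            for image_id in old - new:
                self.articles_of[image_id].discard(asset_id)
            for image_id in new - old:
                self.articles_of[image_id].add(asset_id)
            self.images_of[asset_id] = new
            return {asset_id}
        # An image or caption changed: every referencing article is stale too.
        return {asset_id} | self.articles_of[asset_id]


tracker = ReferenceTracker()
tracker.on_event({"id": "image-1", "type": "image", "refs": []})
tracker.on_event({"id": "article-1", "type": "article", "refs": ["image-1"]})
tracker.on_event({"id": "article-2", "type": "article", "refs": ["image-1"]})
tracker.on_event({"id": "article-2", "type": "article", "refs": []})  # reference removed

stale = tracker.on_event({"id": "image-1", "type": "image", "refs": []})
```

When the image is updated, the tracker reports the image itself and `article-1` as stale; `article-2` is not included because its reference was removed by an earlier event in the log.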
Further details on the New York Times's approach appear in a blog post.6
Support for distributed transactions across heterogeneous storage technologies is either nonexistent or suffers from poor operational and performance characteristics. In contrast, OLEP is increasingly used to provide good performance and strong consistency guarantees in such settings.
In data systems it is very common for logs (for example, write-ahead logs) to be used as internal implementation details. The OLEP approach is different: it uses event logs, rather than transactions, as the primary application programming model for data management. Traditional databases are still used, but their writes come from a log rather than directly from the application. This approach has been explored by several influential figures in industry, such as Jay Kreps,4 Martin Fowler,2 and Greg Young under names such as event sourcing and CQRS (Command/Query Responsibility Segregation).1,7
The use of OLEP is not simply pragmatism on the part of developers; it offers a number of concrete advantages. These include linear scalability; a means of effectively managing polyglot persistence; support for incremental development, where new application features or storage technologies are added or removed iteratively; excellent support for debugging via direct access to the event log; and improved availability (because running nodes can continue to make progress when other nodes have failed).
Consequently, OLEP is expected to be increasingly used to provide strong consistency in large-scale systems that use heterogeneous storage technologies.
Acknowledgments. This work was supported by a grant from The Boeing Company. Thanks to Pat Helland for feedback on a draft of this article.
1. Betts, D., Domínguez, J., Melnik, G., Simonazzi, F. and Subramanian, M. Exploring CQRS and Event Sourcing. Microsoft Patterns & Practices, 2012; http://aka.ms/cqrs.
2. Fowler, M. Event sourcing, 2005; https://www.martinfowler.com/eaaDev/EventSourcing.html.
4. Kreps, J. The log: What every software engineer should know about real-time data's unifying abstraction. LinkedIn Engineering, 2013; https://bit.ly/199iMwY.
5. Schneider, F.B. Implementing fault-tolerant services using the state machine approach: A tutorial. ACM Computing Surveys 22, 4 (1990), 299–319; https://dl.acm.org/citation.cfm?doid=98163.98167.
6. Svingen, B. Publishing with Apache Kafka at the New York Times (Sept. 5, 2017); https://open.nytimes.com/publishing-with-apache-kafka-at-the-new-york-times-7f0e3b7d2077.
Copyright held by owner/author. Publication rights licensed to ACM.
The Digital Library is published by the Association for Computing Machinery. Copyright © 2019 ACM, Inc.