Damien Krotkine home

This post is a compilation of four old posts. They are gathered here in one piece for more clarity, and for archiving purposes. The work described in this post was done over the years 2014-2016.

Riak as Events Storage

Booking.com constantly monitors, inspects, and analyzes our systems in order to make decisions. We capture and channel events from our various subsystems, then perform real-time, medium and long-term computation and analysis.

This is a critical operational process, since our daily work always gives precedence to data. Relying on data removes the guesswork in making sound decisions.

In this series of blog posts, we will outline details of our data pipeline, and take a closer look at the short and medium-term storage layer that was implemented using Riak.

Introduction to Events Storage

Booking.com receives, creates, and sends an enormous amount of data. Usual business-related data is handled by traditional databases, caching systems, etc. We define events as data that is generated by all the subsystems on Booking.com.

In essence, events are free-form documents that contain a variety of metrics. The generated data does not contain any direct operational information. Instead, it is used to report status, states, secondary information, logs, messages, errors and warnings, health, and so on. The data flow represents a detailed status of the platform and contains crucial information that will be harvested and used further down the stream.

To put this in numerical terms: we have billions of events per day, streaming at more than 100 MB per second, and adding up to more than 6 TB per day.

Here are some examples of how we use the events stream:

These typical use cases are made available less than one minute after the related event has been generated.

High Level overview

This is a very simplified diagram of the data flow:

[Figure: simplified diagram of the data flow]

We can generate events by using literally any piece of code that exists on our servers. We pass a HashMap to a function, which packages the provided document into a UDP packet and sends it to a collection layer. This layer aggregates all the events together into “blobs”, which are split by seconds (also called epochs) and other variables. These event blobs are then sent to the storage layer running Riak. Finally, Riak sends them on to Hadoop. The Riak cluster is meant to safely store around ten days of data. It is used for near real-time analysis (something that happened seconds or minutes ago), and medium-term analysis of relatively small amounts of data. We use Hadoop for older data analysis or analysis of a larger volume of data.
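As a rough illustration of the sending side, here is a minimal sketch in Python. It is not our actual sender: JSON stands in for Sereal, and the `send_event` name and the host and port arguments are assumptions made for the example.

```python
import json
import socket


def send_event(event: dict, host: str, port: int) -> int:
    """Serialise one event and fire it at the collection layer over UDP.

    JSON is a stand-in for Sereal here; host and port are hypothetical.
    Returns the number of bytes handed to the socket.
    """
    if not isinstance(event, dict):
        raise TypeError("the top level of an event must be a map")
    payload = json.dumps(event).encode("utf-8")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # Fire and forget: no connection, no acknowledgement, no retry.
        return sock.sendto(payload, (host, port))
```

The sender never waits for the collector, which is the property that makes UDP attractive here.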

The above diagram is a simplified version of our data flow. In practical application, it’s spread across multiple datacenters (DC), and includes an additional aggregation layer.

Individual Events

An event is a small schema-less [1] piece of data sent by our systems. That means that the data can be in any structure with any level of depth, as long as the top level is a HashTable. This is crucial to Booking.com - the goal is to give as much flexibility as possible for the sender, so that it’s easy to add or modify the structure, or the type and number of events.

Events are also tagged in four different ways:

Some common types are:

The subtypes are there for further specification and can answer questions like: “Which one of our web server systems are we talking about?”.

Events are compressed Sereal blobs. Sereal is possibly the best schema-less serialisation format currently available. It was also written at Booking.com.

An individual event is not very big, but a huge number of them are sent every second.

We use UDP as transport because it provides a fast and simple way to send data. There is some (very low) risk of data loss, but sending events has no impact on the senders. We are experimenting with a UDP-to-TCP relay that will be local to the senders.

Aggregated Events

Literally every second, events sharing the same second (called epoch), DC number, type, and subtype are merged together as an Array of events on the aggregation layer. At this point, it’s important to try and get the smallest size possible, so the events of a given epoch are re-serialized as a Sereal blob, using these options:

compress => Sereal::Encoder::SRL_ZLIB,
dedupe_strings => 1

dedupe_strings increases the serialisation time slightly. However, it removes duplicated strings, which occur a lot since events are usually quite similar to one another. We also add gzip compression.

We also add the checksum of the blob as a postfix, to be able to ensure data integrity later on. The following diagram shows what an aggregated blob of events looks like for a given epoch, DC, type, and subtype. You can get more information about the Sereal encoding in the Sereal Specification.

This is the general structure of an events blob:

[Figure: general structure of an events blob]
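The idea of a compressed payload followed by a checksum can be sketched as follows. This is only an illustration under stated assumptions: JSON stands in for Sereal, and the 4-byte big-endian CRC32 suffix is an assumed layout, since the exact checksum algorithm and framing are not spelled out here. The `build_blob` and `read_blob` names are hypothetical.

```python
import json
import struct
import zlib


def build_blob(events: list) -> bytes:
    """Serialise, compress, and append a 4-byte CRC32 (assumed checksum)."""
    payload = zlib.compress(json.dumps(events).encode("utf-8"))
    return payload + struct.pack(">I", zlib.crc32(payload))


def read_blob(blob: bytes) -> list:
    """Verify the trailing checksum, then decompress and deserialise."""
    payload = blob[:-4]
    (expected,) = struct.unpack(">I", blob[-4:])
    if zlib.crc32(payload) != expected:
        raise ValueError("checksum mismatch: blob is corrupted")
    return json.loads(zlib.decompress(payload))
```

A consumer can then detect a damaged blob before trying to decompress it, which is the point of carrying the checksum alongside the data.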

The compressed payload contains the events themselves. It’s an Array of HashMaps, serialised in a Sereal structure and gzip-compressed. Here is an example of a trivial payload of two events:

[
  { cpu => 5  },
  { cpu => 99 }
]

And the gzipped payload would be the compressed version of this binary string:

[Figure: hex dump of the serialised binary string, before compression]

It can be hard to follow these hex digits [2], yet it’s a nice illustration of why the Sereal format helps us to reduce the size of serialised data. The second array element is encoded in far fewer bytes than the first one, since the key has already been seen. The resulting binary is then re-compressed. The Sereal implementation offers multiple compression algorithms, including Snappy and gzip.

A typical blob of events for one second/DC/type/subtype can weigh anywhere from several kilobytes to several megabytes, which translates into a (current) average of around 250 gigabytes per hour.

Side note: smaller subtypes on this level of aggregation aren’t always used, because we want to minimise the data we transmit over our network by having good compression ratios. Therefore we split types into subtypes only when the blobs are big enough. The downside to this approach is that consumers have to fetch data for the whole type, then filter out only subtypes they want. We’re looking at ways to find more balance here.

Data flow size and properties

Data flow properties are important, since they’re used to decide how data should be stored:

Data is used in different ways on the client side. A lot of consumers are actually daemons that will consume the fresh data as soon as possible - usually seconds after an event was emitted. A large number of clients read the last few hours of data in a chronological sequence. On rare occasions, consumers access random data that is over a few days old. Finally, consumers that want to work on larger amounts of older data would have to create Hadoop jobs.

There is a large volume of data to be moved and stored. In numerical terms:

Why Riak

In order to find the best storage solution for our needs, we tested and benchmarked several different products and solutions.

The solutions had to reach the right balance of multiple features:

After exploring a number of distributed file systems and databases, we chose Riak over other distributed Key-Value stores. Riak had good performance and predictable behavior when nodes fail and when scaling up. It also had the advantage of being easy to grasp and implement within a small team. Extending it was very easy (which we’ll see in the next part of this series of blog posts) and we found the system very robust - we never had to face dramatic issues or data loss.

Disclaimer: This is not an endorsement for Riak. We compared it carefully to other solutions over a long period of time and it seemed to be the best product to suit our needs. As an example, we thoroughly tested Cassandra as an alternative: it had a larger community and similar performance but was less robust and predictable; it also lacked some advanced features. The choice is ultimately a question of priorities. The fact that our events are schema-less made it almost impossible for us to use solutions that require knowledge of the data structures. Also we needed a small team to be able to operate the storage, and a way to process data on the cluster itself, using MapReduce or similar mechanisms.

Riak 101

The Riak cluster is a collection of nodes (in our case physical servers), each of which claims ownership of a given key. Depending on the chosen replication factor, each key might be owned by multiple nodes. You can ask any node for a key and your request will be redirected to one of the owners. Same goes for writes.

On closer inspection of Riak, we see that keys are grouped into virtual nodes. Each physical node can own multiple virtual nodes. This simplifies data rebalancing when growing a cluster. Riak does not need to recalculate the owner for each individual key; it will only do it per virtual node.
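To make the virtual node idea concrete, here is a small sketch. It illustrates the principle only and is not Riak's exact hashing: the ring size of 64, the way bucket and key are joined before hashing, and the `partition_for` and `owners` names are assumptions made for the example.

```python
import hashlib

RING_SIZE = 64       # number of virtual nodes (partitions); assumed value
KEYSPACE = 2 ** 160  # size of the SHA-1 output space


def partition_for(bucket: str, key: str) -> int:
    """Map a bucket/key pair to the index of the virtual node that holds it."""
    digest = hashlib.sha1(f"{bucket}/{key}".encode("utf-8")).digest()
    return int.from_bytes(digest, "big") * RING_SIZE // KEYSPACE


def owners(bucket: str, key: str, vnode_owner: list, n_val: int = 3) -> list:
    """Return the physical nodes owning the key: n_val consecutive vnodes."""
    first = partition_for(bucket, key)
    return [vnode_owner[(first + i) % RING_SIZE] for i in range(n_val)]
```

Here `vnode_owner` is a table with one entry per virtual node. Growing the cluster only means changing entries in that table; the key-to-partition mapping never changes, so no per-key recalculation is needed.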

We won’t cover Riak architecture in great detail in this post, but we recommend reading the following article for further information.

Riak clusters configuration

The primary goal of this storage is to keep the data safe. We went with the regular replication number value of three. Even if two nodes owning the same data go down, we won’t lose our data.

Riak offers multiple back-ends for actual data storage. The main three are Memory, LevelDB, and Bitcask. We chose Bitcask, since it was suitable for our particular needs. Bitcask uses log-structured hash tables that provide very fast access. As data gets written to the storage, Bitcask simply appends data to a number of opened files. Even if a key is modified or deleted, the information will be written at the end of these storage files. An in-memory HashTable maps each key to the position of its (latest) value in the files. That way, at most one seek is needed to fetch data from the file system.
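The log-structured idea is easy to show in a few lines. The sketch below is a toy, not Bitcask itself: a `BytesIO` buffer stands in for the on-disk files, deletes and compaction are left out, and the `TinyLogStore` name is made up for the example.

```python
import io


class TinyLogStore:
    """Append-only log plus an in-memory index of the latest value per key."""

    def __init__(self):
        self.log = io.BytesIO()  # stands in for the on-disk data files
        self.keydir = {}         # key -> (offset, length) of the latest value

    def put(self, key: str, value: bytes) -> None:
        offset = self.log.seek(0, io.SEEK_END)  # always write at the end
        self.log.write(value)
        self.keydir[key] = (offset, len(value))

    def get(self, key: str) -> bytes:
        offset, length = self.keydir[key]  # lookup in memory
        self.log.seek(offset)              # a single seek
        return self.log.read(length)
```

Overwriting a key leaves the old bytes in the log and only moves the index entry, which is why periodic compaction is needed to reclaim space.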

Data files are then periodically compacted, and Bitcask provides very good expiration flexibility. Since Riak is a temporary storage solution for us, we set it up with automatic expiration. Our expiration period varies. It depends on the current cluster shape, but usually falls between 8 and 11 days.

Bitcask keeps all of the keys of a node in memory, so keeping large numbers of individual events as key value pairs isn’t trivial. We sidestep any issues by using aggregations of events (blobs), which drastically reduce the number of needed keys.

More information about Bitcask can be found here.

For our conflict resolution strategy, we use Last Write Wins. The nature of our data (which is immutable as we described before) allows us to avoid the need for conflict resolution.

The last important part of our setup is load balancing. It is crucial in an environment with a high level of reads and only a 1 gigabit network. We use our own solution for that, based on Zookeeper. Zooanimal daemons run on the Riak nodes and collect information about system health. The information is then aggregated into simple text files, which contain an ordered list of the IP addresses of the up and running Riak nodes that we can connect to. All our Riak clients simply choose a random node to send their requests to.

We currently have two Riak clusters in different geographical locations, each of which has more than 30 nodes. More nodes equate to more storage space, CPU power, RAM, and network bandwidth.

Data Design

Riak is primarily a key-value store. Although it provides advanced features (secondary indexes, MapReduce, CRDTs), the simplest and most efficient way to store and retrieve data is to use the key-value model.

Riak has three concepts: bucket, key, and value. A bucket is a namespace in which a key is unique. A key is the identifier of the data, and has to be stored in a bucket. A value is the data; it has an associated MIME type, which can make Riak aware of its type.

Riak doesn’t provide efficient ways to retrieve the list of buckets or the list of keys by default [3]. When using Riak, it’s important to know the bucket and key to access. This is usually resolved by using self-explanatory identifiers.

In our case, our events are stored as Sereal-encoded blobs. From these, we know the datacenter, type, subtype, and of course the time at which it was created.

When we need to retrieve data, we always know the time we want. We are also confident in the list of our datacenters. It doesn’t change unexpectedly so we can make it static for our applications. We are not always sure about what types or subtypes will appear in a given epoch for a given datacenter. On some seconds events of certain types may not arrive.

We came up with this simple data design:

The value of chunk is an integer, starting at zero, which keeps event blobs smaller than 500 kilobytes each. We use the integer to split big events blobs into smaller ones, so that Riak can function more efficiently.
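Judging from the curl examples further down in this post, the key names can be built as in the sketch below. The function names are hypothetical; the key formats are the ones used in those examples.

```python
def event_key(epoch: int, dc: int, etype: str, subtype: str, chunk: int) -> str:
    """Key of one events blob chunk in the events bucket."""
    return f"{epoch}:{dc}:{etype}:{subtype}:{chunk}"


def epoch_key(epoch: int, dc: int) -> str:
    """Key of the metadata entry for one epoch and DC in the epochs bucket."""
    return f"{epoch}-{dc}"
```

Because both keys are derived from values the reader already knows (time and datacenter), no key listing is ever required.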

We’ll now see this data design in action when pushing data to Riak.

Pushing to Riak

Pushing data to Riak is done by a number of relocators, which are daemons running on the aggregation layer that then push events blobs to Riak.

Side note: it’s not recommended to store values larger than 1-2 MB in Riak (see this FAQ). Since our blobs can be 5-10 MB in size, we shard them into chunks of 500 KB each. Chunks are valid Sereal documents, which means we do not have to stitch chunks together in order to retrieve the data.
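Splitting on event boundaries, rather than on raw bytes, is what keeps every chunk a valid document. A minimal sketch, assuming JSON in place of Sereal, no compression, and a hypothetical `chunk_events` helper:

```python
import json

MAX_CHUNK_BYTES = 500 * 1000


def serialise(events: list) -> bytes:
    """Stand-in for Sereal encoding."""
    return json.dumps(events).encode("utf-8")


def chunk_events(events: list, max_bytes: int = MAX_CHUNK_BYTES) -> list:
    """Split a list of events into independently decodable chunks."""
    chunks, current = [], []
    for event in events:
        # Close the current chunk if adding this event would exceed the limit.
        if current and len(serialise(current + [event])) > max_bytes:
            chunks.append(serialise(current))
            current = []
        current.append(event)
    if current:
        chunks.append(serialise(current))
    return chunks
```

This naive version re-serialises the current chunk for every event, which is fine for a sketch but too slow for production. A single event larger than the limit ends up alone in an oversized chunk.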

This means that we have quite a lot of blobs to send to Riak, so to maximise our usage of networking, I/O, and CPU, it’s best to send data in a mass-parallel way. To do so, we maintain a number of forked processes (20 per host is a good start), each of which pushes data to Riak.

Pushing data to Riak can be done using the HTTP API, or the Protocol Buffers Client (PBC) API. PBC has slightly better performance.

Whatever protocol is used, it’s important to maximise I/O utilisation. One way is to use an HTTP library that parallelises the requests in terms of I/O (YAHC is an example). Another method is to use an asynchronous Riak Client like AnyEvent::Riak.

We use an in-house library to create and maintain a pool of forks, but there are several existing libraries on CPAN, like Parallel::ForkManager.
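The same pattern, a fixed pool of workers each pushing blobs, can be sketched in Python. Note the swap: this example uses a thread pool rather than forked processes, and `put` is a placeholder for whatever client call actually writes to Riak.

```python
from concurrent.futures import ThreadPoolExecutor


def push_all(put, items, workers: int = 20) -> list:
    """Call put(key, value) for every (key, value) pair using a worker pool."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(put, key, value) for key, value in items]
        # result() re-raises any exception raised by a failed put.
        return [future.result() for future in futures]
```

Since the work is dominated by network I/O, the number of workers matters more than the kind of worker.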

PUT to Riak

Writing data to Riak is rather simple. For a given epoch, we have the list of events blobs, each of them having a different DC/type/subtype combination (remember, DC is short for Data Center). For example:

[Figure: events blobs for one epoch, one per DC/type/subtype combination]

The first task is to slice the blobs into 500 KB chunks and add a postfix index number to their name. That gives:

[Figure: the same blobs after slicing, with chunk index suffixes]

Next, we can store all the event blobs in Riak in the events bucket. We can simulate it with curl:

curl -d <data> -XPUT "http://node:8098/buckets/events/keys/1413813813:1:type1:subtype1:0"
# ...
curl -d <data> -XPUT "http://node:8098/buckets/events/keys/1413813813:2:type3::0"

Side note: we store all events in each of the available Riak clusters. In other words, all events from all DCs will be stored in the Riak cluster which is in DC 1, as well as in the Riak cluster which is in DC 2. We do not use cross DC replication to achieve that - instead we simply push data to all our clusters from the relocators.

Once all the events blobs are stored, we can store the metadata, which is the list of the event keys, in the epochs bucket. This metadata is stored in one key per epoch and DC. So for the current example, we will have 2 keys: 1413813813-1 and 1413813813-2. We have chosen to store the list of events blobs names as pipe-separated values. Here is a simulation with curl for DC 2:

curl -d "type1:subtype1:0|type1:subtype1:1|type3::0" -XPUT "http://riak_host:8098/buckets/epochs/keys/1413813813-2"

Because the epoch and DC are already in the key name, it’s not necessary to repeat that in the content. It’s important to push the metadata after pushing the data.
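The ordering constraint can be sketched as follows. The `store_epoch` function is hypothetical, and `put(bucket, key, value)` stands for any client call that writes one key.

```python
def store_epoch(put, epoch: int, dc: int, blobs: dict) -> None:
    """Store all chunks of one epoch and DC, then the metadata listing them.

    blobs maps names such as "type1:subtype1:0" to their binary value.
    """
    names = sorted(blobs)
    for name in names:
        put("events", f"{epoch}:{dc}:{name}", blobs[name])
    # Written last: a reader who finds the metadata can rely on the data
    # it lists being present already.
    put("epochs", f"{epoch}-{dc}", "|".join(names))
```

If the metadata were written first, a consumer polling the epochs bucket could see keys that do not exist yet.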

PUT options

When pushing data to the Riak cluster, we can use different attributes to change the way data is written, either by specifying them per request when using the PBC API, or by setting the bucket defaults.

Riak’s documentation provides a comprehensive list of the parameters and their meaning. We have set these parameters as follows:

"n_val" : 3,

"allow_mult"      : false,
"last_write_wins" : true,

"w"  : 3,
"dw" : 0,
"pw" : 0,

Here is a brief explanation of these parameters:

In a nutshell, we have a reasonably robust way of writing data. Because our data is immutable and never modified, we don’t want to have siblings or conflict resolution on the application level. Data loss could, in theory, happen if a major network issue happened just after having acknowledged a write, but before the data reached the backend. However, in the worst case we would lose a fraction of one second of events, which is acceptable for us.

Reading from Riak

This is how the data and metadata for a given epoch are laid out in Riak:

bucket: epochs
key: 1428415043-1
value: 1:cell0:WEB:app:chunk0|1:cell0:EMK::chunk0

bucket: events
key: 1428415043:1:cell0:WEB:app:chunk0
value: <binary sereal blob>

bucket: events
key: 1428415043:1:cell0:EMK::chunk0
value: <binary sereal blob>

Fetching one second of data from Riak is quite simple. Given a DC and an epoch, the process is as follows:
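A minimal sketch of such a read, inferred from the layout shown above: read the metadata key, split its value on the pipe character, then fetch each listed data key. The `fetch_epoch` name is hypothetical and `get(bucket, key)` stands for any client call that reads one key. In this layout the metadata entries already carry the DC prefix, so the data key is simply the epoch followed by the entry.

```python
def fetch_epoch(get, epoch: int, dc: int) -> dict:
    """Return {entry name: blob} for one epoch and DC."""
    metadata = get("epochs", f"{epoch}-{dc}")
    if not metadata:
        return {}  # nothing was stored for this second
    return {
        name: get("events", f"{epoch}:{name}")
        for name in metadata.split("|")
    }
```

Each blob would then be checked against its checksum and deserialised by the consumer.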

Reading a time range of data is done the same way. Fetching ten minutes of data from Wed, 01 Jul 2015 11:00:00 GMT would be done by enumerating all the epochs, in this case:

1435748400
1435748401
1435748402
...
1435749000

Then, for each epoch, fetch the data as previously mentioned. It should be noted that Riak is specifically tailored for this kind of workload, where multiple parallel processes perform a huge number of small requests on different keys. This is where distributed systems shine.
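Enumerating the epochs of a time range is a one-liner. In this sketch the `epochs_between` name is made up, the naive datetime is interpreted as GMT, and both ends of the range are included, as in the list above.

```python
import calendar
from datetime import datetime


def epochs_between(start: datetime, seconds: int) -> range:
    """Epochs from start to start + seconds, both ends included."""
    first = calendar.timegm(start.timetuple())  # treats start as GMT
    return range(first, first + seconds + 1)


ten_minutes = epochs_between(datetime(2015, 7, 1, 11, 0, 0), 600)
```

Each epoch in the range can then be fetched independently, which makes the work trivial to spread over parallel workers.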

GET options

The events bucket (where the event data is stored) has the following properties:

"r"            : 1,
"pr"           : 0,
"rw"           : "quorum",
"basic_quorum" : true,
"notfound_ok"  : true,

Again, let’s look at these parameters in detail:

These parameter values allow fetches to be as fast as possible. In theory, such values don’t protect against conflicts or data corruption. However, in the “Aggregated Events” section (see the first post), we’ve seen that every event blob has a suffix checksum. When fetching them from Riak, this enables the consumer to verify that there is no data corruption. The fact that the events are never modified ensures that no version conflict can occur. This is why having such “careless” parameter values is not an issue for this use case.

Real time data processing outside of Riak

After the events are properly stored in Riak, it’s time to use them. The first usage is quite simple: extract data out of them and process it on dedicated machines, usually grouped in clusters or aggregations of machines that perform the same kind of analysis. These machines are called consumers, and they usually run daemons that fetch data from Riak, either continuously or on demand. Most of the continuous consumers are actually small clusters of machines spreading the load of fetching data.

[Figure: real-time data processing outside of Riak]

Some data processing is required at near real-time. This is the case for monitoring, and building graphs. Booking.com heavily uses graphs at every layer of its technical stack. A big portion of graphs are generated from Events. Data is fetched every second from the Riak storage, processed, and dedicated graphing data is sent to an in-house Graphite cluster.

Other forms of monitoring also consume the events stream: events are fetched continuously and aggregated into per-second, per-minute, and daily aggregations in external databases, which are then provided to multiple departments via internal tools.

These kinds of processes try to be as close as possible to real-time. Currently there are 10 to 15 seconds of lag. This lag could be shorter: a portion of it is due to the collection part of the pipeline, and an even bigger part of it is due to the re-serialisation of the events as they are grouped together, to reduce their size. A good deal of optimisation could be done there to reduce the lag down to a couple of seconds [4]. However, there was no operational requirement for reducing it and 15 seconds is small enough for our current needs.

Another way of using the data is to stick to real-time, but accumulate seconds in periods. One example is our Anomaly Detector, which continuously fetches events from the Riak clusters. However, instead of using the data right away, it accumulates it on short moving windows of time (every few minutes) and applies statistical algorithms on it. The goal is to detect anomalous patterns in our data stream and provide the first alert that prompts further action. Needless to say, this client is critical.

Another similar usage is done when gathering data related to A/B testing. A large number of machines harvest data from the events’ flow before processing it and storing the results in dedicated databases for use in experimentation-related tooling.

There are a number of other usages of the data outside of Riak, including manually looking at events to check the behaviour of new features, or analysing past issues and outages.

Limitations of data processing outside of Riak

Fetching data outside of the Riak clusters raises some issues that are difficult to work around without changing the processing mechanism.

First of all, there is a clear network bandwidth limitation to the design: the more consumer clusters there are, the more network bandwidth is used. Even with large clusters (more than 30 nodes), it’s relatively easy to exhaust the network capacity of all the nodes as more and more fetchers try to get data from them.

Secondly, each consumer cluster tends to use only a small part of the events flow. Even though consumers can filter out types, subtypes, and DCs, the resulting events blobs still contain a large quantity of data that is useless to the consumer. For storage efficiency, events need to be stored as large compressed serialised blobs, so splitting them more by allowing more subtyping is not possible [5].

Additionally, statically splitting the events content is too rigid since use of the data changes over time and we do not want to be a bottleneck to change for our downstream consumers. Part of an event from a given type that was critical two years ago might be used for minor monitoring now. A subtype that was heavily used for six months may now be rarely used because of a technical change in the producers.

Finally, the amount of CPU time needed to uncompress, load, and filter the big events blobs is not tiny. It usually takes around five seconds to fetch, uncompress, and filter one second’s worth of events, which means that any real-time data crunching requires multiple threads and likely multiple hosts, usually a small cluster. It would be much simpler if Riak could provide a real-time stream of data exactly tailored to the consumer’s needs.

Next: data filtering and processing inside Riak

What if we could remove the CPU limitations by doing processing on the Riak cluster itself? What if we could work around the network bandwidth issue by generating sub-streams on the fly and in real-time on the Riak cluster?

This is exactly what we implemented, using simple concepts, and leveraging the ease of use and hackability of Riak. These concepts and implementations will be described in the next sections.

Real-time server-side data processing: the theory

The reasoning is actually very simple. The final goal is to perform data processing of the events blobs that are stored in Riak in real-time. Data processing usually produces a very small result, and it appears to be a waste of network bandwidth to fetch data outside of Riak to perform data analysis on consumer clusters, as in this example:

[Figure: data fetched out of Riak and processed on consumer clusters]

This diagram is equivalent to:

[Figure: equivalent diagram]

So instead of bringing the data to the processing code, let’s bring the code to the data:

[Figure: bringing the code to the data]

This is a typical use case for MapReduce. We’re going to see how to use MapReduce on our dataset in Riak, and also why it’s not a usable solution.

For the rest of this post, it’s important to establish a reference for all the events that are stored for a time period of exactly one second. Because we already happen to store our events by the second (and call it an “epoch”), using this unit of measure is a practical consideration that we’ll refer to as epoch-data.

A first attempt: MapReduce

MapReduce is a very well known (if somewhat outdated) way of bringing the code near the data and distributing data processing. There are excellent papers explaining this approach for further background study.

Riak has a very good MapReduce implementation. MapReduce jobs can be written in JavaScript or Erlang. We highly recommend using Erlang for better performance.

To perform events processing of an epoch-data on Riak, we need a MapReduce job. The concepts of metadata and data keys are explained in part 2 of the blog series. Here are the MapReduce phases:

[Figure: the MapReduce phases]

This works just fine. For one epoch-data, the data processing code is properly mapped to the events, and the data is deserialised and processed in around 0.1 second (on our initial 12-node cluster). This is by itself an important result: it takes less than one second to fully process one second’s worth of events. Riak makes it possible to implement a real-time MapReduce processing system [6].

Should we just use MapReduce and be done with it? Not really, because our use case involves multiple consumers doing different data processing at the same time. Let’s see why this is an issue.

The metrics

To be able to test the MapReduce solution, we need a use case and some metrics to measure.

The use case is the following: every second, multiple consumers (say 20) each need the result of one of the data processing jobs (say there are 10 of them) on the previous second’s data.

We’ll consider that an epoch-data is roughly 70 MB, and that data processing results are around 10 KB each. Also, we’ll consider that the Riak cluster is a 30-node ring with 10 real CPUs available for data processing on each node.

The first metric we can measure is the external network bandwidth usage. This is the first factor that encouraged us to move away from fetching the events out of Riak to do external processing. External bandwidth usage is the bandwidth used to transfer data between the cluster as a whole, and the outside world.

The second metric is the internal network bandwidth usage. This represents the network used between the nodes, inside of the Riak cluster.

Another metric is the time (more precisely the CPU-time) it takes to deserialise the data. Because of the heavily compressed nature of our data, decompressing and deserialising one epoch-data takes roughly 5 sec.

The fourth metric is the CPU-time it takes to process the deserialised data, analyse it, and produce a result. This is very fast (compared to deserialisation); let’s assume 0.01 sec at most.

Note: we are not taking into account the impact of storing the data in the cluster (remember that events blobs are being stored every second) because it’s impacting the system the same way in both external processing and MapReduce.

Metrics when doing external processing

When doing standard data processing as seen in the previous part of this blog series, one epoch-data is fetched out from Riak, and deserialised and processed outside of Riak.

External bandwidth usage

The external bandwidth usage is high. For each query, the epoch-data is transferred, so that’s 20 queries times 70MB/s = 1400 MB/s. Of course, this number is properly spread across all the nodes, but that’s still roughly 1400 / 30 = 47 MB/s. That, however, is just for the data processing. There is a small overhead that comes from the clusterised nature of the system and from gossiping, so let’s round that number to 50 MB/s per node, in external output network bandwidth usage.

Internal bandwidth usage

The internal bandwidth usage is very high. Each time a key value is requested, Riak will check its 3 replicas, and return the value. So 3 x 20 x 70MB/s = 4200 MB/s. Per node, it’s 4200 MB/s / 30 = 140 MB/s.

Deserialise time

Deserialise time is zero: the data is deserialised outside of Riak.

Processing time

Processing time is zero: the data is processed outside of Riak.
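The bandwidth figures above can be reproduced with a few lines of arithmetic. The constants are the assumptions stated in the metrics section; the variable names are only for illustration.

```python
CONSUMERS = 20      # queries per second
EPOCH_DATA_MB = 70  # size of one epoch-data
NODES = 30          # nodes in the ring
REPLICAS = 3        # replicas checked on each read

external_total = CONSUMERS * EPOCH_DATA_MB  # MB/s leaving the cluster
external_per_node = external_total / NODES  # before gossip overhead
internal_total = REPLICAS * external_total  # MB/s between the nodes
internal_per_node = internal_total / NODES

print(external_total, round(external_per_node), internal_total, internal_per_node)
```

Both numbers grow linearly with the number of consumers, which is the core problem with external processing.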

Metrics when using MapReduce

When using MapReduce, the data processing code is sent to Riak, included in an ad hoc MapReduce job, and executed on the Riak cluster by sending the orders to the nodes where the epoch-data related data chunks are stored.

External bandwidth usage

When using MapReduce to perform data processing jobs, there is certainly a huge gain in network bandwidth usage. For each query, only the results are transferred, so 20 x 10KB/s = 200 KB/s.

Internal bandwidth usage

The internal usage is also very low: it’s only used to spread the MapReduce jobs, transfer the results, and do bookkeeping. It’s hard to put a proper number on it because of the way jobs and data are spread on the cluster, but overall it’s using a couple of MB/s at most.

Deserialise time

Deserialise time is high: for each query, the data is deserialised, so 20 x 5 = 100 sec of CPU-time for the whole cluster. Each of the 30 nodes has 10 CPUs available for deserialisation, which gives 300 CPUs in total, so the time needed to deserialise one second’s worth of data is 100/300 = 0.33 sec. We can easily see that this is an issue, because one third of all our CPU power is already used for deserialising the same data in each MapReduce instance. It’s a big waste of CPU time.

Processing time

Processing time is 20 x 0.01 = 0.2s for the whole cluster. This is really low compared to the deserialise time.
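The same kind of arithmetic, applied to the MapReduce case, shows where the cost moves. The constants are again the assumptions from the metrics section, and the variable names are illustrative.

```python
CONSUMERS = 20        # queries per second
RESULT_KB = 10        # size of one processing result
DESERIALISE_S = 5     # CPU-seconds to deserialise one epoch-data
PROCESS_S = 0.01      # CPU-seconds to process deserialised data
NODES = 30
CPUS_PER_NODE = 10

external_kb_per_s = CONSUMERS * RESULT_KB      # only results leave the cluster
deserialise_cpu_s = CONSUMERS * DESERIALISE_S  # the same data, decoded 20 times
total_cpus = NODES * CPUS_PER_NODE
cpu_share = deserialise_cpu_s / total_cpus     # fraction of the cluster's CPU
processing_cpu_s = CONSUMERS * PROCESS_S

print(external_kb_per_s, deserialise_cpu_s, round(cpu_share, 2), processing_cpu_s)
```

Bandwidth drops by several orders of magnitude, but deserialisation now costs about 500 times more CPU than the processing itself, and it scales with the number of consumers.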

Limitations of MapReduce

As we’ve seen, using MapReduce has its advantages: it’s a well-known standard, and allows us to create real-time processing jobs. However it doesn’t scale: because MapReduce jobs are isolated, they can’t share the deserialised data, and CPU time is wasted, so it’s not possible to have more than one or two dozen real-time data processing jobs at the same time.

It’s possible to overcome this difficulty by caching the deserialised data in memory, within the Erlang VM, on each node. CPU time would still be 3 times higher than needed (because a map job can run on any of the 3 replicas that contain the targeted data) but at least it wouldn’t be tied to the number of parallel jobs.

Another issue is the fact that writing MapReduce jobs is not that easy, especially because — in this case — it’s a prerequisite to know Erlang.

Last but not least, it’s possible to create very heavy MapReduce jobs, easily consuming all the CPU time. This directly impacts the performance and reliability of the cluster, and in extreme cases the cluster may be unable to store incoming events at a sufficient pace. It’s not trivial to fully protect the cluster against MapReduce misuse.

A better solution: post-commit hooks

To work around these limitations, we explored a different approach to enable real-time data processing on the cluster that scales properly by deserialising data only once, allows us to cap its CPU usage, and allows us to write the processing jobs in any language, while still bringing the code to the data, removing most of the internal and external network usage.

This technical solution is what is currently in production at Booking.com on our Riak events storage clusters, and it uses post-commit hooks and a companion service on the cluster nodes.

Strategy and Features

The previous parts introduced the need for data processing of the events blobs that are stored in Riak in real-time, and the strategy of bringing the code to the data:

[Figure: bringing the code to the data]

Using MapReduce for on-demand data processing worked fine but didn’t scale to many users (see part 3).

Finding an alternative to MapReduce for server-side real-time data processing requires listing the required features of the system and the compromises that can be made:

Real-time isolated data transformation

As seen in the previous parts of this blog series, we need to be able to perform transformations on the incoming events, with as little delay as possible. We don’t want any lag induced by large batch processing. Luckily, these transformations are usually small and fast. Moreover, they are isolated: the real-time processing may involve multiple types and subtypes of events data, but should not depend on knowledge of previous events. Cross-epoch data processing can be implemented by reusing the MapReduce concept: a Map-like transformation is computed on each events blob independently, and the Reduce phase is left up to the consumer.
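As a rough illustration of this split, here is a minimal Python sketch (Python is used only for brevity; it is not the language of the production system). The event layout and the names `map_blob` and `reduce_counts` are hypothetical. The point is only that the per-epoch step needs nothing but its own blob, while the aggregation across epochs happens on the consumer side.

```python
from collections import Counter

def map_blob(events):
    """Map-like step: works on one epoch's events only, no cross-epoch state.

    Hypothetical transformation: count events per type.
    """
    return Counter(event["type"] for event in events)

def reduce_counts(per_epoch_results):
    """Reduce step, run by the consumer over the epochs it is interested in."""
    total = Counter()
    for counts in per_epoch_results:
        total.update(counts)
    return total

# Two made-up epochs (one second of events each).
blobs = {
    1413813813: [{"type": "WEB"}, {"type": "API"}, {"type": "WEB"}],
    1413813814: [{"type": "WEB"}],
}

# Server side: each blob is transformed independently of the others.
mapped = {epoch: map_blob(events) for epoch, events in blobs.items()}

# Consumer side: aggregate across epochs.
totals = reduce_counts(mapped.values())
print(dict(totals))
```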

Performance and scalability

The data processing should have a very limited bandwidth usage and reasonable CPU usage. However, we also need the CPU usage not to be affected by the number of clients using the processed data. This is where the previous attempt using MapReduce showed its limits. Of course, horizontal scalability has to be ensured, to be able to scale with the Riak cluster.

One way of achieving this is to perform the data processing continuously for every datum that reaches Riak, upfront. That way, client requests are actually only querying the results of the processing, and not triggering computation at query time.

No back-processing

The data processing will have to be performed on real-time data, but no back-processing will be done. When a data processing implementation changes, it will be effective on future events only. If old data is changed or added (usually as a result of reprocessing), data processing will be applied, but using the latest version of processing jobs. We don’t want to maintain any history of data processing, nor any migration of processed data.

Only fast transformations

To avoid putting too much pressure on the Riak cluster, we only allow data transformations that produce a small result (to limit storage and bandwidth footprint), and that run quickly, with a strong timeout on execution time. Back-pressure management is very important, and we have a specific strategy to handle it (see “Back-pressure management strategy” below).

The solution: Substreams

Substreams, a simplified overview

With these features and compromises listed, it is now possible to describe the data processing layer that we ended up implementing at Booking.com.

This system is called Substreams. Every second, the list of keys of the data that has just been stored is sent to a companion app - a home-made daemon - running on every Riak node. This daemon fetches the data, decompresses it, runs a list of data transformations on it, and stores the results back into Riak, using the same key name but with a different namespace. Users can now fetch the processed data.

A piece of data transformation code is called a substream because most of the time the data transformation is more about cherry-picking exactly the needed fields and values out of the full stream, rather than performing complex operations.
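To make the cherry-picking idea concrete, here is a small Python sketch. It is only an illustration: the real substreams were written in Perl, and the helper names (`pick`, `make_substream`) and the event fields are invented for the example.

```python
def pick(document, path):
    """Return the value found by following `path` (a list of keys), or None."""
    node = document
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node

def make_substream(paths):
    """Build a transformation that keeps only the listed branches or leaves."""
    def substream(events):
        out = []
        for event in events:
            picked = {".".join(p): pick(event, p) for p in paths}
            # Drop events that contain none of the requested fields.
            if any(value is not None for value in picked.values()):
                out.append(picked)
        return out
    return substream

# Hypothetical events and field names.
events = [
    {"type": "WEB", "http": {"status": 200, "path": "/"}, "wallclock_ms": 12},
    {"type": "API", "http": {"status": 500, "path": "/v1"}, "wallclock_ms": 340},
    {"type": "CRON"},
]
http_timing = make_substream([["http", "status"], ["wallclock_ms"]])
result = http_timing(events)
print(result)
```

The result is much smaller than the input, which is the whole point: consumers only transfer the fields they asked for.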

The companion app is actually a simple pre-forking daemon with a REST API. It’s installed on all nodes of the cluster, with around 10 forks. The REST API is used to send it the list of keys, and to wait for the processing to complete. The events data doesn’t transit via this API; the daemon itself fetches the values from Riak, and stores the substreams (results of data transformation) back into Riak.

The main purpose of this system is to drastically reduce the size of data transferred to the end user by enabling the cherry-picking of specific branches or leaves of the events structures, and also to perform preliminary data processing on the events. Usually, clients are fetching these substreams to perform more complex and broader aggregations and computations (for instance as a data source for Machine Learning).

Compared to MapReduce, this system has multiple benefits:

Data decompressed only once

Deserialisation and decompression is done once, for many data processing jobs

A given binary blob of events (at most 500K of compressed data) is handled by one instance of the companion app, which will decompress it once, then run all the data processing jobs on the decompressed data structure in RAM. This is a big improvement compared to MapReduce, because the most CPU intensive task is actually to decompress and deserialise the data, not to transform it. Here we have the guarantee that data is decompressed only once in its lifetime.
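The following Python sketch illustrates the "decode once, transform many times" idea. It makes several assumptions that are not from the production system: an in-memory dictionary stands in for Riak, gzip plus JSON stands in for the real compressed and serialised format, and the bucket names, the key `blob-1` and the two substreams are made up.

```python
import gzip
import json

# In-memory stand-in for Riak: {(namespace, key): bytes}. Hypothetical layout.
store = {}
decompress_calls = 0

def load_blob(key):
    """Fetch and decode one events blob (the expensive step)."""
    global decompress_calls
    decompress_calls += 1
    return json.loads(gzip.decompress(store[("events", key)]))

def process_keys(keys, substreams):
    """Decode each blob once, then run every substream on the in-memory data."""
    for key in keys:
        events = load_blob(key)                      # done once per blob
        for name, transform in substreams.items():   # cheap, done once per job
            result = transform(events)
            # Same key, different namespace (here: one namespace per substream).
            store[("substream_" + name, key)] = json.dumps(result).encode()

# Usage with made-up data and two made-up substreams.
store[("events", "blob-1")] = gzip.compress(
    json.dumps([{"type": "WEB", "status": 200},
                {"type": "WEB", "status": 500}]).encode()
)
substreams = {
    "statuses": lambda events: [e["status"] for e in events],
    "count": lambda events: len(events),
}
process_keys(["blob-1"], substreams)
print(decompress_calls, store[("substream_statuses", "blob-1")])
```

Adding a third substream to the dictionary adds one more cheap transformation per blob, but no extra decompression.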

Transformation at write time, not at query time

Data is created once and for all

Unlike MapReduce, once a transformation is set up and enabled, it’ll be computed for every epoch, even if nobody uses the result. However, the computation will happen only once, even if multiple users request it later on. Data transformation is already done when users want to fetch the result. That way, the cluster is protected against simultaneous requests from a large number of users. It’s also easier to predict the performance of substream creation.

Hard timeout - open platform

Data decompression and transformation by the companion app is performed under a global timeout that would kill the processing if it takes too long. It’s easy to come up with a realistic timeout value given the average size of event blobs, the number of companion instances, and the total number of nodes. The hard timeout makes sure that data processing is not using too many resources, ensuring that Riak KV works smoothly.
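One possible way to enforce such a timeout is sketched below in Python. This is not the companion app's actual mechanism, which the post does not detail: the sketch aborts the job in-process with an alarm signal, which only works on Unix and in the main thread, whereas a pre-forking server could equally kill the worker process. The names `run_with_timeout` and `SubstreamTimeout` are invented.

```python
import signal
import time

class SubstreamTimeout(Exception):
    """Raised inside the job when the wallclock budget is exhausted."""

def _on_alarm(signum, frame):
    raise SubstreamTimeout()

def run_with_timeout(job, seconds):
    """Run job() and abort it if it exceeds `seconds` of wallclock time.

    Unix-only, main-thread-only sketch based on SIGALRM.
    Returns (finished, result).
    """
    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        return True, job()
    except SubstreamTimeout:
        return False, None
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)   # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)   # restore the old handler

fast = run_with_timeout(lambda: sum(range(1000)), 1.0)
slow = run_with_timeout(lambda: time.sleep(5), 0.1)
print(fast, slow)
```

The slow job is cut off after about a tenth of a second instead of running for five seconds, and the caller simply gets no result for it.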

This mechanism allows the cluster to be an open platform: any developer in the company can create a new substream transformation and quickly get it up and running on the cluster on their own, without asking for permission. There is no critical risk for the business as substream runs are capped by a global timeout. This approach is a good illustration of the flexible and agile spirit in IT that we have at Booking.com.

Implementation using a Riak commit hook

detailed picture with the commit hook

In this diagram we can see where the Riak commit hook kicks in. We can also see that when the companion requests data from the Riak service, there is a high chance that the data is not on the current node and Riak has to get it from other nodes. This is done transparently by Riak, but it consumes bandwidth. In the next section we’ll see how to reduce this bandwidth usage and have full data locality. But for now, let’s focus on the commit hook.

Commit hooks are a feature of Riak that allows the cluster to execute a provided callback just before or just after a value is written, using respectively pre-commit and post-commit hooks. The commit hook is executed on the node that coordinated the write.

We set up a post-commit hook on the metadata bucket (the epochs bucket). We implemented the commit hook callback, which is executed each time a key is stored to that metadata bucket. In part 2 of this series, we explained how the metadata is stored: the key has the form <epoch>-<dc>, and the value is the list of data keys for that epoch, separated by a pipe character (|).

The post-commit hook callback is quite simple: for each metadata key, it gets the value (the list of data keys), and sends it over HTTP in async mode to the companion app. Proper timeouts are set so that the execution of the callback is capped and can’t impact the Riak cluster performance.

Hook implementation

First, let’s write the post commit hook code:

:::erlang
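metadata_stored_hook(RiakObject) ->
    %% The metadata key has the form <<"Epoch-DC">>; only the epoch is used here.
    Key = riak_object:key(RiakObject),
    [ Epoch, _DC ] = binary:split(Key, <<"-">>),
    %% The metadata value is the list of data keys, separated by "|".
    MetaData = riak_object:get_value(RiakObject),
    DataKeys = binary:split(MetaData, <<"|">>, [ global ]),
    %% Assumption: in this simplified version, the keys are sent to the
    %% companion app running on the local node.
    {ok, LocalHost} = inet:gethostname(),
    send_to_REST(Epoch, list_to_binary(LocalHost), DataKeys),
    ok.

send_to_REST(Epoch, Hostname, DataKeys) ->
    Method = post,
    URL = "http://" ++ binary_to_list(Hostname)
       ++ ":5000?epoch=" ++ binary_to_list(Epoch),
    %% Cap the time spent on the request so the hook can't slow Riak down.
    HTTPOptions = [ { timeout, 4000 } ],
    %% Asynchronous request: the reply is ignored.
    Options = [ { body_format, string },
                { sync, false },
                { receiver, fun(_ReplyInfo) -> ok end }
              ],
    Body = iolist_to_binary(mochijson2:encode( DataKeys )),
    httpc:request(Method,
                  {URL, [], "application/json", Body},
                  HTTPOptions, Options),
    ok.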

These two Erlang functions (they are simplified here and would probably not compile as is) are the main part of the hook. The function metadata_stored_hook is the entry point of the commit hook, called when a metadata key is stored. It receives the key and value that were stored, via the RiakObject, and uses the value to extract the list of data keys. This list is then sent to the companion daemon over HTTP using send_to_REST.

The second step is to compile the code and set up Riak so that it can use it properly. This is described in the documentation about custom code.

Enabling the Hook

Finally, the commit hook has to be added to a Riak bucket-type:

riak-admin bucket-type create metadata_with_post_commit \
'{"props":{"postcommit":["metadata_stored_hook"]}}'

Then the type is activated:

riak-admin bucket-type activate metadata_with_post_commit

Now, anything sent to Riak to be stored with a key within a bucket whose bucket-type is metadata_with_post_commit will trigger our callback metadata_stored_hook.

The hook is executed on the coordinator node, that is, the node that received the write request from the client. It’s not necessarily the node where this metadata will be stored.

The companion app

The companion app is a REST service, running on all Riak nodes, listening on port 5000, ready to receive a JSON blob, which is the list of data keys that Riak has just stored. The daemon will fetch these keys from Riak, decompress their values, deserialise them and run the data transformation code on them. The results are then stored back to Riak.

There is little point showing the code of this piece of software here, as it’s trivial to write. We implemented it in Perl using a PSGI preforking web server (Starman). Using a Perl based web server allowed us to also have the data transformation code in Perl, making it easy for anyone in the IT department to write some of their own.
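For readers who have never written such a service, here is a minimal sketch of the request handling, in Python with WSGI (the Python counterpart of PSGI). It is not the production Perl code: `process_keys` is a placeholder for the fetch, decode, transform and store steps, and `call` is only a helper to exercise the app without starting a server.

```python
import io
import json
from urllib.parse import parse_qs

processed = []  # records what was handed to the (hypothetical) processing step

def process_keys(epoch, keys):
    """Placeholder for: fetch from Riak, decode, run substreams, store results."""
    processed.append((epoch, keys))

def app(environ, start_response):
    """WSGI entry point: POST ?epoch=<epoch> with a JSON array of data keys."""
    query = parse_qs(environ.get("QUERY_STRING", ""))
    length = int(environ.get("CONTENT_LENGTH") or 0)
    try:
        epoch = query["epoch"][0]
        keys = json.loads(environ["wsgi.input"].read(length))
        if not isinstance(keys, list):
            raise ValueError("expected a JSON array of keys")
    except (KeyError, ValueError):
        start_response("400 Bad Request", [("Content-Type", "text/plain")])
        return [b"bad request\n"]
    process_keys(epoch, keys)
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok\n"]

def call(query_string, body):
    """Tiny helper to exercise the app without a real server."""
    status = []
    environ = {
        "REQUEST_METHOD": "POST",
        "QUERY_STRING": query_string,
        "CONTENT_LENGTH": str(len(body)),
        "wsgi.input": io.BytesIO(body),
    }
    app(environ, lambda s, headers: status.append(s))
    return status[0]

print(call("epoch=1413813813", b'["key-a", "key-b"]'))
print(call("", b"not json"))
```

Any WSGI server can host `app`; a pre-forking one would be the closest match to the setup described above.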

Optimising intra-cluster network usage

As we saw earlier, if the commit hook simply sends the request to the local companion app on the same Riak node, additional bandwidth is consumed to fetch data from other Riak nodes. As the full stream of events is quite big (around 150 MB per second), this bandwidth usage is significant.

In an effort to optimise the network usage, we have changed the post-commit hook callback to group the keys by the node that is responsible for their values. The keys are then sent to the companion apps running on the associated nodes. That way, a companion app will always receive event keys whose data is stored on the node it is running on. Hence, fetching event values will not use any network bandwidth. We have effectively implemented 100% data locality when computing substreams.

Better implementation where metadata is sent to the Riak node that contains the data

This optimisation is implemented by using Riak’s internal API that gives the list of primary nodes responsible for storing the value of a given key. More precisely, Riak’s Core application API provides the preflist() function (see the API here), which is used to map the hashed key to its primary nodes.

The result is a dramatic reduction of network usage. Data processing is optimised by taking place on one of the nodes that store the given data. Only the metadata (very small footprint) and the results (a tiny fraction of the data) travel on the wire.
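The grouping step can be pictured with the following Python sketch. It is loosely modelled on consistent hashing and is not Riak's actual preflist computation: the ring size, the node names, the round-robin ownership table and the `primary_node` function are all assumptions made for the example.

```python
import hashlib
from collections import defaultdict

RING_SIZE = 8                                   # number of partitions (made up)
NODES = ["riak1", "riak2", "riak3", "riak4"]    # hypothetical node names
# Hypothetical ownership: partitions are assigned to nodes round-robin.
OWNERS = [NODES[i % len(NODES)] for i in range(RING_SIZE)]

def primary_node(key):
    """Map a key to the node owning the partition its hash falls into."""
    digest = hashlib.sha1(key.encode()).digest()
    position = int.from_bytes(digest, "big")    # a point on a 2**160 ring
    partition = position * RING_SIZE // 2**160
    return OWNERS[partition]

def group_by_node(keys):
    """Group data keys by the node that can process them locally."""
    groups = defaultdict(list)
    for key in keys:
        groups[primary_node(key)].append(key)
    return dict(groups)

keys = ["key-%d" % i for i in range(10)]
groups = group_by_node(keys)
for node, node_keys in sorted(groups.items()):
    # One request per node, each carrying only the keys stored on that node.
    print(node, node_keys)
```

Instead of one request to the local companion app, the hook then sends one small request per node, and each companion app only reads data that is already on its own disk.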

Back-pressure management strategy

For a fun and easy-to-read description of what back-pressure is and how to react to it, you can read this great post by Fred Hebert (@mononcqc): Queues Don’t Fix Overload.

What if there are too many substreams, or one substream is buggy and performs very costly computations (especially as we allow developers to easily write their own substreams)? What if, all of a sudden, the full stream of events changes, one type becomes huge, and a previously working substream now takes 10 times longer to compute?

One way of dealing with that is to allow back-pressure: the substream creation system will inform the stream storage (Riak) that it cannot keep up, and that it should reduce the pace at which it stores events. This is however not practical here. Doing back-pressure that way will lead to the storage slowing down, and to the back-pressure being transmitted up the pipeline. However, events can’t be “slowed down”. Applications send events at a given pace and if the pipeline can’t keep up, events are simply lost. So propagating back-pressure upstream will actually lead to load-shedding of events.

The other typical alternative is applied here: doing load-shedding straight away. If a substream computation is too costly in CPU time, wallclock time, disk IO or space, the data processing is simply aborted. This protects the Riak cluster from slowing down events storage - which after all, is its main and critical job.

That leaves the substream consumers downstream with missing data. Substreams creation is not guaranteed anymore. However, we used a trick to mitigate the issue. We implemented a dedicated feature in the common consumer library code; when a substream is unavailable, the full stream is fetched instead, and the data transformation is performed on the client side.

It effectively pushes the overloading issue down to the consumer, who can react appropriately, depending on the guarantees they have to fulfill, and their properties.
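The fallback in the consumer library can be sketched as follows in Python. This is an illustration under stated assumptions, not the actual library: the in-memory `store`, the namespaces, the keys `k1` and `k2`, and the `fetch_substream` function are all invented.

```python
# In-memory stand-in for the cluster: {(namespace, key): value}. Hypothetical.
store = {
    ("events", "k1"): [{"type": "WEB", "status": 200},
                       {"type": "API", "status": 500}],
    ("events", "k2"): [{"type": "WEB", "status": 404}],
    # The substream result for k1 exists; the one for k2 was shed.
    ("statuses", "k1"): [200, 500],
}

def statuses(events):
    """The same transformation the cluster would have run."""
    return [event["status"] for event in events]

def fetch_substream(name, key, transform):
    """Return (result, source): use the substream if present, else fall back."""
    result = store.get((name, key))
    if result is not None:
        return result, "substream"
    # Fallback: fetch the full stream and transform it on the client side.
    return transform(store[("events", key)]), "fallback"

print(fetch_substream("statuses", "k1", statuses))
print(fetch_substream("statuses", "k2", statuses))
```

The consumer gets the same result either way; the only difference is that the fallback path transfers the full blob and spends client CPU instead of cluster CPU.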

This multitude of ways of dealing with the absence of substreams, concentrated at the end of the pipeline, makes for a very safe yet flexible approach. In practice, it is not so rare that a substream result for one epoch is missing (about one blob every couple of days), and such blips have no impact on the consumers. This allows for a very conservative behaviour of the Riak cluster regarding substreams: “when in doubt, stop processing substreams”.

Conclusion

This data processing mechanism proved to be very reliable and well-suited for our needs. The implementation required a surprisingly small amount of code, leveraging features of Riak that proved to be flexible and easy to use.

Notes

[1] It is not strictly true that our events are schema-less. They obey the structure that the producers found the most useful and natural. But there are so many producers, each sending events with a different schema, that it’s almost equivalent to considering them schema-less. Our events can be seen as structured, yet with so many schemas that they can’t be tracked. There is also complete technical freedom to change the structure of an event, if it’s seen as useful by a producer.

[2] After spending some time looking at and decoding Sereal blobs, the human eye easily recognizes common data structures like small HashMaps, small Arrays, small Integers and VarInts, and of course, Strings, since their content is untouched. That makes Sereal an almost human readable serialisation format, especially after a hexdump.

[3] This can be worked around by using secondary indexes (2i) if the backend is eleveldb or Riak Search, to create additional indexes on keys, thus enabling listing them in various ways.

[4] Some optimisation has been done. The main action was to implement a module to split a Sereal blob without deserialising it, thus speeding up the process greatly. This module can be found here: Sereal::Splitter. Most of the time spent in splitting Sereal blobs is now spent in decompressing them. The next optimisation step would be to use a compression algorithm that decompresses faster than the currently used gzip, for instance LZ4_HC.

[5] At that point, the attentive reader may jump in the air and proclaim “LevelDB and snappy compression!”. It is indeed possible to use LevelDB as Riak storage backend, which provides an option to use Snappy compression on the blocks of data stored. However, this compression algorithm is not good enough for our needs (using gzip reduced the size by a factor of almost 2). Also, LevelDB (or at least the eleveldb implementation that is used in Riak) doesn’t provide automatic expiration, which is critical to us, and had issues with reclaiming free space after key deletions, with versions below 2.x.

[6] Using MapReduce on Riak is usually somewhat discouraged because most of the time it’s being used in a wrong way, for instance when performing bulk fetch or bulk insert or traversing a bucket. The MapReduce implementation in Riak is very powerful and efficient, but must be used properly. It works best when used on a small number of keys, even if the size of data processed is very large. The fewer keys the less bookkeeping and the better performance. In our case, there are only a couple of hundred keys for one second worth of data (but somewhat large values, around 400K), which is not a lot. Hence the great performance of MapReduce we’ve witnessed. YMMV.
