A big Portuguese client challenged us to solve a BAM (business activity monitoring) problem they had.
They receive a lot of events and they need to know, in near real time, whether the rate of events (events per second) is outside some threshold. The rate is computed over a sliding window of the last 30 minutes, and there is a threshold for every combination of event parameters (like an OLAP cube). The check also needs to run every second.
The system receives a maximum of 400 events per second, and each event has 8 parameters, giving a total of 240,000 different combinations of parameter values. This is a lot, especially considering that, for every second, they need to check whether each combination falls outside the threshold or not. Moreover, in order to get the rate of each combination, we need to store 30 minutes of data for every combination.
Alternatively, we could store only the events that arrive and, every second, calculate the rate for each combination. But this solution is impractical, because we would need to make 240,000 queries every second.
Our hypothetical problem
Let's assume our event is PhoneCall(phoneNumber, countryCode, geoZone). We needed the following combinations:
- all events
- events by each parameter value, and by every combination of values of two or all three parameters
Let's also assume we have 4,000 different phone numbers, 5 country codes and 10 geographic zones. So we will have (4000+1)(5+1)(10+1) = 264,066 combinations, in the same ballpark as the 240k of the real case, and a constant stream of 800 events per second. This is the worst case scenario.
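The product can be checked with a few lines of Python. This is only a sketch of the arithmetic, assuming that each parameter is either fixed to one of its values or left as "any value" (which is where the +1 comes from). The dictionary name is made up for the example.

```python
from math import prod

# Distinct values per parameter, taken from the example above.
cardinalities = {"phoneNumber": 4000, "countryCode": 5, "geoZone": 10}

# Each parameter is either fixed to one of its values or left as "any" (+1).
total_combinations = prod(n + 1 for n in cardinalities.values())
print(total_combinations)  # 264066
```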
Spark+Elasticsearch for our first approach
Our first approach was to use Spark Streaming. Spark is known for handling huge amounts of real time data with good performance, and it is built in Scala, which I've been liking more and more lately. :)
Spark Streaming works by grouping the incoming data into small batches of 0.5 seconds. These batches are RDDs (Resilient Distributed Datasets), which are then fed to the Spark engine. Each transformation you apply to an RDD returns a new RDD, so you can pipeline all RDDs through a set of transformations. Check this guide to better understand how this works.
We made a stream which reads messages from a JMS queue and transforms them into PhoneCall events. Then, another stream demultiplexes each event into multiple events according to its parameter values. This produces 8 events for each JMS message. (Our real case was 60 events for each event.) A subsequent stream uses the [countByWindow](https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/streaming/dstream/DStream.html#countByWindow(org.apache.spark.streaming.Duration, org.apache.spark.streaming.Duration)) function for each event combination. countByWindow is set up with a 30-minute window that slides every second, and we divide the function's output by 1,800 seconds (30 minutes) to get the average rate for each window. The final stream in the pipeline checks whether that average rate is outside the threshold, in which case that information is persisted in an ElasticSearch cluster. We chose ElasticSearch because its inserts are fast.
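To make the demultiplexing step concrete, here is a minimal plain-Python sketch (not the Spark code we ran). It assumes a wildcard marker, here called ANY, stands for "any value" of a parameter. The marker, the function name and the sample values are all made up for the example.

```python
from itertools import product
from typing import NamedTuple


class PhoneCall(NamedTuple):
    phoneNumber: str
    countryCode: str
    geoZone: str


ANY = "*"  # hypothetical wildcard marker meaning "any value"


def demultiplex(call: PhoneCall) -> list:
    """Return every combination key that this event counts towards."""
    # For each field, either keep the concrete value or replace it with ANY.
    return list(product(*[(value, ANY) for value in call]))


keys = demultiplex(PhoneCall("912345678", "351", "Lisbon"))
print(len(keys))  # 8
```

With three parameters, each event maps to 2 x 2 x 2 = 8 keys, from the fully specific one down to (ANY, ANY, ANY), which stands for "all events".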
Bottom line, this solution sucks. Not because Spark sucks, but because while Spark can handle enormous amounts of data, it was not made to handle enormous amounts of windows, and this problem requires a lot of them.
Our experiments showed this solution worked well as long as the number of combinations was below 1,000. However, when this number got higher, the pipeline started to lag, the events started to build up, and the process simply crashed with OOM (Out of memory) :(
Clearly, this wasn't the right solution to this problem.
Let's try a more custom solution
Our idea was:
- Have one actor per specific combination of parameter values.
- Have an actor that receives each event, demultiplexes it into multiple events according to its parameter values, and sends each combination to the respective combination actor.
- Each combination actor keeps a 30-minute window of event counts in a circular buffer with one slot per second. Each time a second ticks, the buffer rolls one position, and the actor calculates the number of events per second (number of events/1800 seconds), checks the corresponding threshold and sends the data to the ElasticSearch actor.
- The ElasticSearch actor is just an ActorPublisher that sends the data to the ElasticSearch stream driver.
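The per-combination window can be sketched in plain Python, without Akka. This is only an illustration of the circular buffer idea under our own assumptions, not the actor code itself. The class name, the method names and the min_rate/max_rate threshold parameters are made up for the example.

```python
class CombinationWindow:
    """Plain-Python stand-in for one combination actor (no Akka involved)."""

    def __init__(self, window_seconds=1800, min_rate=0.0, max_rate=float("inf")):
        self.counts = [0] * window_seconds  # one slot per second
        self.pos = 0                        # slot for the current second
        self.min_rate = min_rate            # hypothetical lower threshold
        self.max_rate = max_rate            # hypothetical upper threshold

    def on_event(self):
        """Count one event in the current second."""
        self.counts[self.pos] += 1

    def on_tick(self):
        """Called once per second: compute the rate, then roll the buffer."""
        rate = sum(self.counts) / len(self.counts)
        outside = not (self.min_rate <= rate <= self.max_rate)
        # Roll: move to the oldest slot and reuse it for the next second.
        self.pos = (self.pos + 1) % len(self.counts)
        self.counts[self.pos] = 0
        return rate, outside


# A 4-second window instead of 1,800 seconds, to keep the example short.
window = CombinationWindow(window_seconds=4, max_rate=1.0)
for _ in range(6):
    window.on_event()
rate, outside = window.on_tick()
print(rate, outside)  # 1.5 True
```

This version re-sums the whole buffer on every tick, which is the simplest thing that works but costs one pass over 1,800 slots per combination per second.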
You might ask yourself, isn't an actor for each combination a lot of actors? Well, actually that's something Akka handles pretty well, as you can read from the documentation:
Very lightweight event-driven processes (several million actors per GB of heap memory).
We also did some optimizations, mainly in the combination actors, because this was our big bottleneck when the number of combinations was high.
One of the optimizations was that, when the 1 second tick is received, we take the last calculated total and update it by simply subtracting the value at the end of the buffer and adding the new value at the front (lastValue - buffer[buffer.length-1] + buffer[0]). After that, we roll the buffer.
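Here is a minimal Python sketch of that incremental update, assuming the newest count is pushed at the front of the buffer and the oldest one falls off the end. The function name roll and the sample counts are made up, and a deque with maxlen stands in for our circular buffer.

```python
from collections import deque


def roll(buffer, last_total, new_count):
    """Push the newest per-second count and return the updated window total."""
    expired = buffer[-1]          # value at the end of the buffer, about to leave
    buffer.appendleft(new_count)  # maxlen makes the deque drop `expired`
    return last_total - expired + new_count


window = deque([0] * 5, maxlen=5)  # 5 slots instead of 1,800 to keep it short
total = 0
for count in [3, 1, 4, 1, 5, 9, 2]:
    total = roll(window, total, count)
    assert total == sum(window)    # same answer as re-summing the buffer

print(total)  # 21
```

The update is constant time per tick, instead of a pass over all 1,800 slots, which matters when it runs for every combination every second.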
Well, the results were really nice. On an i7 laptop, we could steadily handle the full stream of 800 events per second. Process memory was somewhere around 4GB, which means we could handle more combinations if needed.
Moreover, knowing that Akka can scale horizontally very well, this solution would clearly benefit from this feature. If we need many more combinations or lots more throughput, we can simply add more machines.
This problem was not trivial. We always need to keep 30 minutes of data for 240k combinations and, each second, check whether each combination is within its threshold. It's both a memory intensive and a CPU intensive problem.
Spark is not as bad as it seems. It has great features, such as horizontal scaling, and it is a much simpler and more intuitive solution. However, it ended up not being a good match to the problem.
Akka is really great for handling CPU intensive problems. The actor model is a really intuitive way of handling concurrency and, once fully understood, it makes it really easy to shape your solution. Also, because of the way Akka was built, it's a memory lightweight framework.