Life360 Engineering

Kent Hoxsey
Infrastructure Engineer
Sep 25, 2017

Streaming data with Kinesis on AWS

Moving a high-volume message stream from NSQ to Kinesis

One really interesting aspect of our work at Life360 is supporting the flow of location messages from millions of devices. Our current message rates average around 20k/s, which is large enough to be challenging without touching the red line where "absolutely terrifying" lives. While we rebuild our services to handle much higher message rates, we are also working out an approach to scaling the underlying message-handling infrastructure.

Distributed queues, NSQ, and one or two issues

We depend on NSQ throughout our stack, and get a lot of value from its simplicity and ease of use. It is straightforward to deploy and operate, and has yet to show us any capacity limitations. The standard deployment pattern is to colocate an nsqd with any service that publishes messages, which distributes work nicely and makes scaling a no-brainer.

However, there is an aspect of its reliability model that can be problematic for us. When a message cannot be processed due to an error or hangup in a consumer, it gets requeued for another consumer to handle. The requeue operation does not preserve ordering, so requeued messages go to the end of the line for processing. This isn't often a problem, but it is an issue we have known we would need to address at some point.

We have been considering a move to an Event Sourcing model, built around a distributed, durable commit log (there is a great set of posts on this subject linked below). This introduces a different scaling model: the log becomes an infrastructure entity in its own right, capable of serving potentially many different consumer applications. Such a log also comes with some nice properties, including an explicit ordering of its messages. If a consumer fails and another needs to pick up the failed workload, the new consumer can simply start at the point in the stream where the previous one stopped, without any special handling of messages to ensure ordering.
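Jumping ahead to the Kinesis API we ended up using, here is a rough sketch of that resume behavior with the plain AWS SDK for Java (the KCL wraps this bookkeeping up for you; the stream name, shard ID, and sequence number below are placeholders, not our production values):

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;

AmazonKinesis client = AmazonKinesisClientBuilder.defaultClient();

// Placeholder: the sequence number the previous consumer had processed.
String lastSequenceNumber = "LAST-CHECKPOINTED-SEQUENCE-NUMBER";

// Ask for an iterator positioned just after that point in the shard.
String iterator = client.getShardIterator(new GetShardIteratorRequest()
        .withStreamName("location-stream")              // placeholder stream name
        .withShardId("shardId-000000000000")
        .withShardIteratorType("AFTER_SEQUENCE_NUMBER")
        .withStartingSequenceNumber(lastSequenceNumber))
    .getShardIterator();

// Records come back in exactly the order they were written to the shard.
GetRecordsResult batch = client.getRecords(
        new GetRecordsRequest().withShardIterator(iterator).withLimit(1000));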

That "durable" property yields some other benefits as well, since we can choose how long we want to retain the messages in the log. Being able to look back in time enables applications to retry messages from a certain point in time, or warm up caches, or look back at WTF just happened after a fault. One of my favorite discussions of this is Martin Kleppman's Strange Loop presentation linked on his blog post on the subject.

Enter Kinesis

There were two candidates we considered seriously: Kafka and AWS Kinesis. In our case, Kinesis won out for ease of implementation and operation. Once we got started, we had a prototype stream operating within a few minutes through the AWS Console, and later backed it with Terraform once we developed a better understanding of the configuration we wanted to run. It was gratifying to get the underlying infrastructure running quickly and turn our attention to the messaging challenges right away.
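For illustration, the same provisioning can also be done programmatically with the AWS SDK for Java; this is just a sketch with placeholder values, not our Console/Terraform setup:

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

AmazonKinesis client = AmazonKinesisClientBuilder.defaultClient();

// Each shard provides 1 MB/s of write throughput and 1,000 records/s,
// so the shard count sets the stream's capacity. Values are placeholders.
client.createStream("location-stream", 4);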

One of the considerations for any team adopting a new infrastructure component is the availability of client libraries and abstractions. In the Kinesis space, the two key libraries are the Kinesis Producer Library (KPL) and the Kinesis Client Library (KCL). Both live under the awslabs organization on GitHub, along with a bunch of other useful code. There are a few links at the end of this article with examples we found helpful for getting up to speed on KCL and KPL applications.

Actual Work: Bridging messages from NSQ to Kinesis

While it was great to get a stream in place with one magic incantation, that was only the starting point for the real work. Our roadmap is to convert the messaging currently running through NSQ to write into Kinesis streams, and to develop new services that consume those streams. To get data flowing quickly, we executed a down-and-dirty sysadmin hack:

  1. nsq_to_file consuming an ephemeral channel on the NSQ topic we wanted and writing to local files
  2. kinesis-agent pumping those files into our new stream

This simple setup took every one of the 25 seconds you would expect to get running, and provided a working stream in our development environment to exercise our new consumer application code. kinesis-agent is another awslabs project, a log-watcher that loads data into a Kinesis stream. It is intended as a simple way to load and parse log data for downstream analytics, but for our purposes it provided a fast and simple way to bootstrap our development setup.

Sidebar: running the file bridge
  1. Create an IAM role for writing to the Kinesis stream
  2. Attach a write-specific policy for your stream
  3. Clone, build, install the kinesis-agent project
  4. Configure nsq_to_file to stream your topic to disk
  5. Configure /etc/aws/agent.json for your file-watcher pattern (and credentials if needed)

Note that we created a stream-writing role and assigned it to an instance profile, which is much nicer than trying to manage access keys and secrets for each agent install. Terraform makes this ridiculously easy.

So, how does it work?

Make no mistake, this first effort was an exercise in time management. We needed data flowing from NSQ to our Kinesis stream as quickly as possible so we could validate our stream-consumer code. As you might imagine, the various buffers involved in going from the NSQ output utility to the Kinesis input utility led to some significant wait time.

In some cases this simple setup might be adequate, but it was insufficient for our needs. For one thing, there seemed to be no easy way to parse our partition key out of our messages, so we let the agent assign a random partition key. This meant that messages arriving from a particular user would go to a random shard in the stream, which in turn meant they would be handled by a random consumer in our multiple-consumer production setup. That was adequate for an initial proof of concept, but not for real use: we wanted the messages from a single user to go to the same shard consistently, to allow for use cases that consider the full stream.

The other issue with this setup is latency. The quick-and-dirty approach inserts a file write/read into the pipeline, along with some buffering and batching. At the message volumes running through our dev environment, we were getting a batch put through to Kinesis roughly every 30 seconds.

Acceptable as a development workaround, not useful for our production needs. But really easy.

NSQ-Kinesis Bridge

With a working test environment in place, the next priority was a full KPL producer that could satisfy our production message rates and latency requirements. The best example for what we needed was the Java client in the AWS Big Data Blog post on the KPL. That code required only minor changes to consume data from an NSQ topic and write it to Kinesis.

The structure of the KPL example is simple: the intake consumer and the stream writer each run on their own thread and communicate via a BlockingQueue. Using the JavaNSQClient library, the NSQ consumer setup is a short lambda that posts each new message onto the shared queue:

// Push each incoming NSQ message onto the shared queue for the writer
// thread; if the put is interrupted, requeue the message so another
// consumer can pick it up.
consumer = new NSQConsumer(lookup, nsqTopic, nsqChannel, (msg) -> {
    try {
        messageQueue.put(msg);
    } catch (InterruptedException ie) {
        msg.requeue();
    }
});
consumer.start();

The producer is a lightly edited revamp of the final producer from that blog post: it extracts our partition key from the message, passes the message to Kinesis, and then reports the success or failure back to NSQ:

protected void runOnce() throws Exception {
    // Block until the NSQ consumer thread hands us a message.
    NSQMessage msg = inputQueue.take();
    String partitionKey = getPartitionKey(msg.getMessage());
    ByteBuffer data = ByteBuffer.wrap(msg.getMessage());

    // Hand the record to the KPL, which batches and aggregates records
    // before calling PutRecords.
    ListenableFuture<UserRecordResult> f =
        kinesis.addUserRecord(STREAM_NAME, partitionKey, data);
    Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            // Kinesis accepted the record; tell NSQ we are done with it.
            msg.finished();
        }

        @Override
        public void onFailure(Throwable t) {
            // The KPL exhausted its retries; requeue so NSQ redelivers.
            if (t instanceof UserRecordFailedException) {
                msg.requeue();
            }
        }
    });
}

Our final implementation has a few other niceties (collecting and publishing metrics for Prometheus, parameter handling for runtime constants such as the NSQ topic/channel and Kinesis stream name, and so on), but the changes above are the only substantive ones required to work with NSQ.
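The getPartitionKey helper is specific to our message format and isn't shown above. As a rough sketch only (assuming a JSON payload with a userId field, which is an illustration rather than our actual schema), it could look something like this:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.UUID;

private static final ObjectMapper MAPPER = new ObjectMapper();

// Hypothetical sketch: uses the user ID as the partition key so that
// all of a user's messages land on the same shard.
static String getPartitionKey(byte[] message) {
    try {
        JsonNode json = MAPPER.readTree(message);
        return json.get("userId").asText();
    } catch (Exception e) {
        // Fall back to a random key if the message cannot be parsed.
        return UUID.randomUUID().toString();
    }
}

Anything stable per user works here; the important property is that the same user always maps to the same shard.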

Performance Observations

Our current message throughput runs about 20k/s, with a gentle oscillation over the day as the bulk of our users go through their commute hours. With the KPL's record aggregation, we never get close to the per-shard records-per-second limits even though we routinely use most of the provisioned stream bandwidth.

The initial implementation focused on operation and correctness rather than latency. Once up and running, our baseline setup showed an average delivery latency of ~80ms. Note that since the KPL batches records together, the average may not be a good indication of the distribution. However, average latency is the metric AWS delivers in CloudWatch, making it the easiest measurement to use for broad-brush tuning. (Instrumenting to collect latency as a histogram is on the TODO list.)

The most obvious parameter to tweak is MaxBufferedTime, but therein lies an interesting issue. We initially assumed that our message rate would keep the KPL turning over buffers quickly, so we began with a MaxBufferedTime of 500ms. With that setting, latency for our PutRecords calls averaged 80ms in CloudWatch. Cutting the value to 200ms reduced our average latency to 20ms or less, which is sufficient for our current needs.
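The knob referred to as MaxBufferedTime above is RecordMaxBufferedTime in the KPL configuration. A minimal sketch of setting it when constructing the producer (the region here is a placeholder and other parameters are left at their defaults):

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRegion("us-east-1")           // placeholder region
        .setAggregationEnabled(true)      // pack many user records into each Kinesis record
        .setRecordMaxBufferedTime(200);   // ms; the tuning discussed above
KinesisProducer kinesis = new KinesisProducer(config);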

To date, neither memory nor CPU has been an issue for us. We run a trio of c4.xlarge instances, which hover around 50% RAM and 40% CPU usage. The cluster may be a bit network-constrained, but nothing worrying yet. The KPL is not particularly memory-hungry, but we do notice that its outgoing bandwidth is about 25% more than our incoming. There is probably another set of experiments to figure out optimal tuning and sizing for this application.

Running Happily

We have been running our NSQ-to-Kinesis messaging bridge at production volume for several weeks now, and our full streaming service for a bit less than that. Overall it is working well for us. In a future post we'll cover some of the challenges we faced as we implemented our streaming service, and share a few gotchas (and workarounds) we stumbled over in our efforts.

Links:

Event Sourcing: Messaging as the Single Source of Truth (and don't miss the links at the end of that post)

Useful Kinesis examples: