Life360 Engineering

Kent Hoxsey
Infrastructure Engineer
Oct 27, 2017
From the Engineering dept

Wading Deeper into Kinesis Streams

Challenges and a few solutions working with Kinesis Streams

In a previous episode, we outlined the steps we took to pipe data from an NSQ topic to a Kinesis Stream. If that article felt way too happy, devoid of any obstacle, that's because we left out a few things. Where that post was just getting our feet wet, this time we wade in a little deeper to illustrate some of the bits that gave us trouble.

Put vs Offer

One thing that did go exactly as described previously was the initial producer coding. The AWS Big Data Blog post on Writing Efficient Producers was remarkably well-suited to our needs. Getting that code up and running in our development environment was simple and obvious, and it started doing useful work almost immediately.

However, there is a huge difference in message rate between our development and production environments. Once we put that initial code under production-scale loads, it began having problems. The code was re-queueing most (if not all) of the NSQ messages. Not only was the code not feeding messages into Kinesis, it was throwing tens of thousands of REQs back into the NSQ pool, raising alerts and garnering undesired attention from the ops team.

A requeue (REQ) in NSQ is a mechanism for the system to keep you from losing a message that your Consumer could not process. It isn't an error (the queue is doing its intended job), but we really want to design our system so REQs don't happen in the normal case.

Reviewing our code, the only call site that could be causing the REQs was the failure side of the NSQ consumer. At first this was confusing, because a) this pattern came directly from the example producer and b) it was working fine in our dev/QA environment:

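consumer = new NSQConsumer(lookup, nsqTopic, nsqChannel, (msg) -> {
    try {
        // Timed offer: waits at most largeIshTimeout (unit assumed to be ms here),
        // then returns false if the queue is still full
        if (!messageQueue.offer(msg, largeIshTimeout, TimeUnit.MILLISECONDS)) {
            msg.requeue();  // queue still full: hand the message back to NSQ as a REQ
        }
    } catch (InterruptedException ie) {
        msg.requeue();      // interrupted while waiting: also a REQ
    }
});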

As developers, sometimes our job is to accept the reality of our implementation choices even when that reality clashes with our understanding. Often that clash is the universe signaling an incipient AHA! moment. In this case, experimenting with the BlockingQueue interface quickly paid off: switching to messageQueue.put(msg) eliminated the REQs and started messages flowing smoothly through the NSQ-Kinesis bridge.

With the problem corrected, we had a chance to think more about what had happened and what it meant for the design. For this particular application, the BlockingQueue::offer approach was incorrect. offer never waits indefinitely: when the queue is full it returns false, either immediately or once its timeout expires, leaving the code posting to the queue to handle the queue-full situation as it sees fit. The reason it gave us no trouble in the dev environment was simply that our queue was large enough to hold all of the (much lower) traffic volume arriving during the KPL service window. Using the blocking call BlockingQueue::put instead allowed our application to block the NSQ consumer and wait while the KPL services the backlog. Not every application will make the same tradeoff between latency and completeness, so our experience should be taken as a useful example of working with the KPL and not the One True Method.
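To make the difference concrete, here is a small self-contained Java sketch that uses an ArrayBlockingQueue with a single slot as a stand-in for our message buffer (the class and method names are ours, for illustration only). On a full queue, the timed offer gives up and returns false, which in a consumer like ours turns into a REQ; put simply waits until a slower thread, standing in for the KPL side, frees a slot.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PutVsOffer {

    // offer() on a full queue waits at most the timeout, then gives up and returns false.
    static boolean offerWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // the single slot is now taken
        try {
            return queue.offer("second", 10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // put() on a full queue blocks until a consumer (standing in for the KPL) frees a slot.
    static String putWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        Thread drainer = new Thread(() -> {
            try {
                Thread.sleep(50); // simulate a slow downstream
                queue.take();     // frees the single slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        drainer.start();
        try {
            queue.put("second"); // blocks until the drainer calls take()
            drainer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println("offer accepted: " + offerWhenFull()); // offer accepted: false
        System.out.println("head after put: " + putWhenFull());   // head after put: second
    }
}
```

The tradeoff is visible here: put converts a full queue into backpressure on the caller (for us, the NSQ consumer) instead of into a failure the caller must handle.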

Error handling

Handling errors for a KPL-based program is a bit different from driving the API directly. The KPL is a relatively complex engine, running in its own process to manage the various stages of aggregation, encoding, and connection handling. When errors occur (access issues, throttling, etc.), the KPL reports them back to your program through the future returned for each record rather than throwing exceptions at the call site. It is up to your code to decide what to look at and how to handle the conditions. Here is a simple example to call from the onFailure method of the callback attached to each record's future, taken from that same AWS Big Data Blog post:

    private void dumpError(UserRecordFailedException e) {
        UserRecordResult result = e.getResult();

        String errorList = StringUtils.join(result.getAttempts().stream()
                .map(a -> String.format("Delay after prev attempt: %d ms, "
                                + "Duration: %d ms, Code: %s, "
                                + "Message: %s",
                        a.getDelay(),
                        a.getDuration(),
                        a.getErrorCode(),
                        a.getErrorMessage()))
                .collect(Collectors.toList()), "\n");

        System.out.println(String.format("Record failed to put, attempts:\n%s", errorList));
    }

You will get a huge amount of duplication from that method, because most errors apply to every single message in the aggregated KPL record. But during normal operation there won't be any error messages at all, so it is easy to raise an alert when errors show up. To that end, our code publishes a metric counting the failed attempts reported in errorList, and we alert whenever that count spikes.
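A minimal sketch of collapsing that duplication before it reaches a metric, assuming a hypothetical FailedAttempt record that mirrors just the error code and message from the KPL's Attempt objects. It counts attempts per error code and keeps a running total that a metrics reporter could publish; the sample attempts in main are illustrative, not captured from a real failure.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

public class KplErrorMetric {

    // Hypothetical stand-in for the KPL's Attempt, reduced to the two fields used here.
    record FailedAttempt(String errorCode, String errorMessage) {}

    // Running total of failed attempts; a reporter would publish this periodically
    // and the monitoring system would alert on spikes.
    private final AtomicLong failedAttempts = new AtomicLong();

    // Adds the attempts to the running total and returns one count per error code,
    // collapsing the duplicates produced by an aggregated record.
    Map<String, Integer> add(List<FailedAttempt> attempts) {
        Map<String, Integer> byCode = new TreeMap<>();
        for (FailedAttempt attempt : attempts) {
            byCode.merge(attempt.errorCode(), 1, Integer::sum);
        }
        failedAttempts.addAndGet(attempts.size());
        return byCode;
    }

    long total() {
        return failedAttempts.get();
    }

    public static void main(String[] args) {
        KplErrorMetric metric = new KplErrorMetric();
        Map<String, Integer> summary = metric.add(List.of(
                new FailedAttempt("ProvisionedThroughputExceededException", "Rate exceeded"),
                new FailedAttempt("ProvisionedThroughputExceededException", "Rate exceeded"),
                new FailedAttempt("InternalFailure", "Internal service failure")));
        System.out.println(summary);        // {InternalFailure=1, ProvisionedThroughputExceededException=2}
        System.out.println(metric.total()); // 3
    }
}
```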

Provisioning and Throttling: Stream Writes

Up to this point, all of our efforts and thinking have focused on getting the nsq-to-kinesis bridge running and processing messages without throwing alarms. Then comes the magical moment when things start to work as intended. With messages flowing and CPU and memory on our KPL instances stable, we get our first chance to look at Kinesis under load...and we notice weird behavior: surging waves of CPU load on our bridge nodes, and our throughput tickling the bright red line in the Kinesis monitoring chart for PutRecords(bytes). We're under-provisioned for our write volume, so our producers are being throttled.

Initially we estimated how many stream shards to provision based on the Kinesis pricing guide. In practice, even though our message-rate estimate was accurate, our message-size estimate was low and we ended up under-provisioned. Since we were just getting started and had no production dependencies, we could simply stop the message feed and resize the stream.

The best estimate we found for our shard sizing was the network traffic metric in our monitoring system: our write-path usage matches the aggregated incoming network traffic very closely. But a caveat: since we are using the KPL with record aggregation turned on, our PutRecords(count) totals are never anywhere near either our actual message rate or the provisioned limit of 1,000 records per second per shard. If we were not aggregating records, we would need to provision for a much higher number of smaller records.
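The arithmetic behind that caveat can be sketched in a few lines of Java. It uses the per-shard write limits of 1 MiB/s and 1,000 records/s; the traffic figures and the 50-messages-per-record aggregation ratio in main are hypothetical, and in practice the byte rate would come from a measurement like our network traffic metric.

```java
public class ShardEstimate {

    // Per-shard write limits for a Kinesis stream.
    static final long WRITE_BYTES_PER_SHARD = 1_048_576L; // 1 MiB/s
    static final long WRITE_RECORDS_PER_SHARD = 1_000L;   // 1,000 records/s

    // Shards needed for a write load. With KPL aggregation on, recordsPerSec is
    // the rate of aggregated Kinesis records, not the rate of individual messages.
    static long shardsForWrites(long bytesPerSec, long recordsPerSec) {
        long byBytes = ceilDiv(bytesPerSec, WRITE_BYTES_PER_SHARD);
        long byRecords = ceilDiv(recordsPerSec, WRITE_RECORDS_PER_SHARD);
        return Math.max(1, Math.max(byBytes, byRecords));
    }

    // Integer division rounded up (for non-negative a and positive b).
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        // Hypothetical load: 5,000 messages/s averaging 600 bytes (3,000,000 bytes/s).
        long bytesPerSec = 5_000L * 600;
        // Without aggregation, every message is its own Kinesis record.
        System.out.println(shardsForWrites(bytesPerSec, 5_000)); // 5
        // With aggregation, assume roughly 50 messages per KPL record: 100 records/s.
        System.out.println(shardsForWrites(bytesPerSec, 100));   // 3
    }
}
```

With aggregation the byte rate becomes the binding limit, which is why our network traffic metric turned out to be the better guide.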

Note also that you can check for throttling in your producer code and take appropriate action. In our case we really need to record the messages in the Kinesis stream for other uses, so we try to maintain some headroom in the stream to accommodate surges and spikes.

Provisioning and Throttling: Stream Reads

When you provision shards for a Kinesis stream, each shard provides 1 MB/s of write capacity and twice that in read capacity. Looking at this read/write asymmetry, it appears that Kinesis provides enough capacity for every message written to the stream to be read back twice. In our case that means we are provisioning enough capacity to write the data to the stream, read it with our consumer application, and read it one more time to archive it somewhere.

However, it also means that if you want to read a stream for any other applications, you'll need to provision for more read capacity. The Kinesis documentation suggests it is possible to add shards without interrupting the stream consumers, but when we tried it we had some issues.
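As a rough sketch of that read-side math, assuming every consuming application reads the full write stream and all of them share each shard's 2 MiB/s read budget, the shard count is the larger of the write-side and read-side requirements. The 3 MiB/s write rate in main is hypothetical.

```java
public class ReadCapacity {

    // Per-shard limits: reads get twice the write bandwidth, shared by every reader.
    static final long WRITE_BYTES_PER_SHARD = 1_048_576L;    // 1 MiB/s
    static final long READ_BYTES_PER_SHARD = 2 * 1_048_576L; // 2 MiB/s

    // Shards needed so that `readers` applications can each read the full write stream.
    static long shardsFor(long writeBytesPerSec, int readers) {
        long forWrites = ceilDiv(writeBytesPerSec, WRITE_BYTES_PER_SHARD);
        long forReads = ceilDiv(writeBytesPerSec * readers, READ_BYTES_PER_SHARD);
        return Math.max(1, Math.max(forWrites, forReads));
    }

    // Integer division rounded up (for non-negative a and positive b).
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        long write = 3 * 1_048_576L; // hypothetical 3 MiB/s of writes
        for (int readers = 1; readers <= 4; readers++) {
            System.out.println(readers + " reader(s): " + shardsFor(write, readers) + " shards");
        }
    }
}
```

With those numbers, one or two readers fit within the 3 shards the writes already require, while a third application pushes the stream to 5 shards.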

For a little while this summer, we were bumping up against the write thresholds. Increasing the number of shards in the AWS console was easy enough, but we were surprised by what followed. We tried to scale up our stream by 50%, with the idea that AWS would spin up N/2 new shards and then shift over a portion of the partition key space to each new shard. What actually happened was that AWS started up 3*N new shards and shifted the partition key space around in ways we couldn't track. If only we had been prepared to collect and parse all the logs, it would have been interesting to see how the keyspace changed.
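We never dug through the logs, so we can't say exactly what AWS did, but the geometry of the hash key space suggests why a 50% increase is messier than doubling. Kinesis hashes each partition key with MD5 into a 128-bit integer, and each shard owns a contiguous range of that space. The Java sketch below assumes the stream starts and ends with evenly split ranges and counts how many of the old range boundaries survive the resize; the helper names and the sample key are ours.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

public class KeySpace {

    // Size of the hash key space: keys run from 0 to 2^128 - 1.
    static final BigInteger SPACE = BigInteger.ONE.shiftLeft(128);

    // Kinesis maps a partition key to a 128-bit hash key with MD5.
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // treat the 16 bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Which of n evenly split ranges a key falls into (assumes a uniform layout).
    static int shardIndex(String partitionKey, int n) {
        return hashKey(partitionKey).multiply(BigInteger.valueOf(n)).shiftRight(128).intValueExact();
    }

    // Starting hash keys of ranges 1..n-1 in an evenly split layout.
    static Set<BigInteger> boundaries(int n) {
        Set<BigInteger> starts = new HashSet<>();
        for (int i = 1; i < n; i++) {
            starts.add(SPACE.multiply(BigInteger.valueOf(i)).divide(BigInteger.valueOf(n)));
        }
        return starts;
    }

    // How many of the old layout's interior boundaries also exist in the new layout.
    static int preservedBoundaries(int before, int after) {
        Set<BigInteger> kept = new HashSet<>(boundaries(before));
        kept.retainAll(boundaries(after));
        return kept.size();
    }

    public static void main(String[] args) {
        System.out.println("4 -> 8 shards keeps " + preservedBoundaries(4, 8) + " of 3 boundaries"); // 3
        System.out.println("4 -> 6 shards keeps " + preservedBoundaries(4, 6) + " of 3 boundaries"); // 1
        System.out.println("key 'user-42' -> range " + shardIndex("user-42", 6) + " of 6");
    }
}
```

Doubling from 4 to 8 shards keeps every old boundary, so each shard can simply split in two. Going from 4 to 6 keeps only the midpoint, so most old ranges have to be split and recombined, which could plausibly account for the burst of intermediate shards we saw.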

In the end, the stream settled into our new provisioned capacity and things are good, but it was a useful caution that we need to study the scale-up more. It is possible we just did it wrong, or that our consumer application is not handling a shard event properly. In any case, if scaling streams up or down is part of your operational plan, it is worth practicing with full production volumes before you need to do it with live services.

Provisioning and Throttling: Checkpointing

Kinesis consumers get their own library, the KCL, and are similarly simple to implement. KCL applications seem to have some additional responsibilities compared to producers: they must scale properly as the stream's shards change, and they must record where they are in the stream. That second item (checkpointing) gave us a couple of interesting moments.

The KCL requires a DynamoDB table named for the application and stream, and will set it up automatically for you. So the stream-consumer role differs quite a lot from the producer role. We defined our consumer role to be read-only on the stream, with read/write access to the DynamoDB tables used for checkpointing:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Resource": "arn:aws:dynamodb:us-east-1:1234567890AB:table/*.qa-thing-stream",
            "Action": [
                "dynamodb:DescribeTable",
                "dynamodb:CreateTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:DeleteItem",
                "dynamodb:UpdateItem",
                "dynamodb:Scan"
            ]
        },
        {
            "Action": [
                "cloudwatch:ListMetrics",
                "cloudwatch:PutMetricData"
            ],
            "Effect": "Allow",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Resource": "arn:aws:kinesis:us-east-1:1234567890AB:stream/qa-thing-stream",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:ListStreams",
                "kinesis:Get*"
            ]
        }
    ]
}

Note that AWS allows for creative wildcarding in the ARNs, which makes it possible to differentiate multiple uses of the streams: for example, partitioning between dev/QA/prod using a namespace, or managing one role for all consumers of a stream as in the example above. The KCL will expect to create and update a table named for the application and the stream, so an application called ReadThings that reads from the stream qa-thing-stream will expect to create and update a DynamoDB table named ReadThings.qa-thing-stream. The wildcard in the DynamoDB resource ARN allows any application with the role to manage checkpoints for one stream, qa-thing-stream. This role can therefore be granted to any application that needs to read from qa-thing-stream, and the KCL will be able to create and maintain its checkpoints properly.
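To check which table names a wildcard resource will cover, here is a rough Java approximation of IAM's ARN matching, in which * matches any run of characters and ? matches exactly one. It ignores other IAM features such as policy variables, so treat it as a sanity check rather than a policy evaluator; the ArnWildcard class is ours.

```java
import java.util.regex.Pattern;

public class ArnWildcard {

    // Approximates IAM resource matching: '*' matches any run of characters,
    // '?' matches exactly one, and everything else is literal.
    static boolean matches(String pattern, String arn) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            switch (c) {
                case '*' -> regex.append(".*");
                case '?' -> regex.append('.');
                default -> regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), arn); // must match the whole ARN
    }

    public static void main(String[] args) {
        String resource = "arn:aws:dynamodb:us-east-1:1234567890AB:table/*.qa-thing-stream";
        String prefix = "arn:aws:dynamodb:us-east-1:1234567890AB:table/";
        System.out.println(matches(resource, prefix + "ReadThings.qa-thing-stream"));   // true
        System.out.println(matches(resource, prefix + "ReadThings.prod-thing-stream")); // false
    }
}
```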

Each DynamoDB table has its own provisioned read and write capacity. Like everything else in AWS, you pay for that provisioned capacity, and the defaults are low enough to keep from surprising you on your bill while still providing enough capacity to get your application running. Whether that capacity is sufficient for your application depends on how often you checkpoint your consumer.

This is the tricky part. It is pretty easy to overrun the default provisioned capacity in your consumer by checkpointing too often. When we had this problem, it presented as instability in the shards (leases shifting around among multiple consumers). The stream monitoring will show that you are within provisioned limits, yet the consumer will act like it is being throttled. The throttling will only become obvious when you look at the monitoring for the DynamoDB table, and see that the application is flatlining at its write-throughput limits.

The fix in our case was to increase the provisioned write capacity on the DynamoDB table, and checkpoint our consumer state much less often. This particular application only needs to read from the tip of the stream, so the checkpoint is more about monitoring whether the consumer is getting behind than saving status for recovery.
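A minimal sketch of checkpointing less often, assuming one hypothetical CheckpointThrottle per record processor (and so per shard): inside the processor's processRecords, you would call the checkpointer's checkpoint() only when shouldCheckpoint returns true. The static helper estimates the resulting steady-state checkpoint write rate on the DynamoDB table; the 12-shard figures in main are hypothetical.

```java
public class CheckpointThrottle {

    private final long intervalMillis;
    private long lastCheckpointMillis;

    CheckpointThrottle(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.lastCheckpointMillis = startMillis;
    }

    // True at most once per interval; the caller checkpoints only when this returns true.
    boolean shouldCheckpoint(long nowMillis) {
        if (nowMillis - lastCheckpointMillis >= intervalMillis) {
            lastCheckpointMillis = nowMillis;
            return true;
        }
        return false;
    }

    // Steady-state checkpoint writes per second on the table: one per shard per interval.
    static double checkpointWritesPerSecond(int shards, long intervalMillis) {
        return shards * 1000.0 / intervalMillis;
    }

    public static void main(String[] args) {
        System.out.println(checkpointWritesPerSecond(12, 100));    // 120.0
        System.out.println(checkpointWritesPerSecond(12, 60_000)); // 0.2
    }
}
```

Going from a checkpoint every 100 ms to one a minute cuts checkpoint writes on a 12-shard stream from 120 per second to 0.2 per second, which is the kind of reduction that keeps a modestly provisioned table out of throttling.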

After-the-fact fix: while writing this post, we found a couple of very useful entries on this topic in the AWS Big Data Blog. They discuss DynamoDB Streams and so probably escaped our earlier attention, but one goes into detail about the parameters for tuning your KCL client and provisioning DynamoDB properly.

We are in the process of building an example consumer with the KCL for our team to use as a starting point for Kinesis applications, and will probably blog about that when we get it working properly.