Thursday, December 3, 2015

On Cassandra Striping Across Availability Zones

We've been given mixed advice on how Cassandra stripes its data across AWS Availability Zones (mapped to "Racks" via the Ec2MultiRegionSnitch). Some said that striping is guaranteed to be across all 3 AZs our nodes are deployed across per Region, while others said the striping is simply a "best effort" operation. This has meant that we've had to take a conservative approach and assume that an equal distribution of each partition's replicas across AZs cannot be relied on. The practical implication of this was that when an entire AWS AZ, or even 2 nodes within an AZ, went down, we'd have to fail over to the Disaster Recovery half of our cluster (an AWS Region mapped to a logical "Datacenter" via the Ec2MultiRegionSnitch) to achieve the consistency level of LOCAL_QUORUM required by our applications for "strong consistency". This was not cool... So we've run a few tests to see what the real story is.

Our approach was to develop a little test app that iteratively selected all rows in one of the biggest tables in our schema, "log", where all our raw messages are stored. The app used a consistency level of ALL, which meant that all replicas of the partition being requested must reply for the select to be considered successful. As is standard for our deployments, we used a replication factor of 3 (every partition is replicated 3 times per logical datacenter, of which we have 2). This means that if one of 3 replicas for a partition is unavailable, the select will fail. The app will ignore these failures (as opposed to retrying the query) and just output the count of successfully selected partitions.

This app was then used in 4 different scenarios:
  1. After a large, heavy backload had been performed. Done simply to get the count of partitions with all of the replicas available.
  2. After an entire AZ was taken down. In this test we'd expect no selects at a consistency level of ALL to succeed if striping was perfectly equally distributing replicas across AZs.
  3. After yet more load had been applied while an entire AZ is still down. This was to determine that with an AZ completely unavailable, we don't regress to striping all 3 replicas across just 2 AZs. Again we expect (desire) all selects with a consistency level of ALL to fail.
  4. After the AZ has been brought up again. This is mainly done for the sake of completeness and we of course expect all selects with a consistency level of ALL to succeed.

What happened?

Test 1
The result of Test 1 was that 296,904,900 partitions were successfully selected. This isn't interesting in itself, but this number will come in handy later.

Test 2
The result of Test 2 was that 0 selects were successful; a good result indicating that data has been striped consistently across 3 AZs. Importantly, as we later found out, this testing was done with a "fetch size" (the number of rows to attempt to fetch at a time, for purposes of pagination, etc.) of 1. Out of curiosity we ran this test again with just 1, 2, and 3 of our 4 nodes in the AZ down. We'd of course expect to see roughly 74,226,225 (one quarter of 296,904,900) selects succeed at a consistency level of ALL per node in the AZ still up. Interestingly, if a single partition in a fetch does not have the required consistency level available, the entire fetch appears to fail. So the only way to get the precise number of successful selects with a consistency level of ALL and multiple nodes down in an AZ is to use a fetch size of 1, which is practically infeasible because of how long it takes. Thankfully, with an entire AZ down it seems that Cassandra is smart enough to know that all queries at consistency level ALL will fail, and it fails them very quickly, even with a fetch size of 1.

Successful selects by fetch size and number of nodes down in the AZ:

Fetch Size | All Nodes in AZ Up | 1 Node in AZ Down | 2 Nodes in AZ Down | 3 Nodes in AZ Down | All Nodes in AZ Down
1          | 296,904,900*       | ~222,678,675*     | ~148,452,450*      | ~74,226,225*       | 0
10         | 296,904,900*       | 48,093,960        | 17,281,840         | 8,161,280          | 0
100        | 296,904,900        | 41,047,204        | 5,071,200          | 1,059,500          | 0
1,000      | 296,904,900*       | 5,644,000         | 3,213,000          | 605,000            | 0
10,000     | 296,904,900*       | 2,660,000         | 2,360,000          | 470,000            | 0

* Not an actual measurement, but what we'd expect to see.
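
To get a feel for why bigger fetch sizes return fewer rows when only some nodes are down, here is a toy simulation in Python. It is only a sketch and not the test app itself: the function name, the seed, and the placement model (one replica per partition in the affected AZ, placed on a pseudo-random node) are all assumptions of mine. A real scan walks token ranges, so neighbouring partitions often share the same replicas, which is likely why the measured numbers fall off more slowly than this model does.

```python
import random


def successful_selects(num_partitions, nodes_in_az, nodes_down, fetch_size, seed=42):
    """Count rows returned at consistency ALL when some nodes in one AZ are down.

    Assumes every partition has exactly one replica in the affected AZ, placed
    on a pseudo-random node there, and that a page fails as a whole if any
    partition in it has its replica on a down node.
    """
    rng = random.Random(seed)
    # Node index (within the AZ) holding each partition's replica.
    placement = [rng.randrange(nodes_in_az) for _ in range(num_partitions)]
    down = set(range(nodes_down))  # nodes 0..nodes_down-1 are down

    succeeded = 0
    for start in range(0, num_partitions, fetch_size):
        page = placement[start:start + fetch_size]
        # The whole page is lost if even one row cannot reach all replicas.
        if not any(node in down for node in page):
            succeeded += len(page)
    return succeeded


if __name__ == "__main__":
    total = 100_000
    for fetch_size in (1, 10, 100):
        row = [successful_selects(total, 4, k, fetch_size) for k in range(5)]
        print(fetch_size, row)
```

With a fetch size of 1 the count tracks the share of nodes still up; as the fetch size grows, more and more pages contain at least one unlucky partition and are thrown away whole.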

Test 3
The result of Test 3 was that we once again found that no selects were successful, even with a fetch size of 1. This is good because it indicates that striping has not regressed to putting all 3 replicas across Cassandra nodes in the only 2 AZs still available. Instead it will be writing hinted handoffs that persist for 3 hours, so that if the downed nodes were only temporarily unavailable, the missed writes could be replicated to them within those 3 hours. As our nodes in the downed AZ are completely gone (losing the data on their ephemeral disks), we'll instead need to stream all lost data to them when they're eventually replaced.

Test 4
As we'd sent in more messages since Test 1, the total number of entries in the log table was now 301,373,773. As an aside, a handy way to monitor bootstrap streaming progress is with the following command-line fu:
watch -n 10 'nodetool netstats | grep Receiving | gawk {'"'"' print $11/$4*100"% Complete, "$11/1024/1024/1024" GB remaining" '"'"'}'

How does this manifest in our applications which use a consistency level of LOCAL_QUORUM for "strong consistency"?


All of our applications use a hard-coded consistency level of LOCAL_QUORUM for all reads and writes to Cassandra in order to ensure "strong consistency". As an outcome of our testing, we now know that with our standard configuration of a Replication Factor of 3, Ec2MultiRegionSnitch, NetworkTopologyStrategy, and Cassandra nodes distributed across 3 AWS Availability Zones per Region, we can afford to lose an entire AZ of nodes (e.g. lose all of AZ C) and still not need to fail over to the Disaster Recovery region to consistently achieve our LOCAL_QUORUM consistency. However, if we lose one node in one AZ and another node in a different AZ (e.g. one node in AZ A and another in AZ B), we will have to fail over for some requests.
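
The arithmetic behind that conclusion can be sketched in a few lines of Python. This is a simplified model rather than anything Cassandra runs: it assumes exactly one replica per AZ, 4 nodes per AZ as in our test, and that every combination of nodes is equally likely to hold a partition's replicas. The names `quorum` and `unavailable_fraction` are made up for the example.

```python
from itertools import product

AZS = ("A", "B", "C")
NODES_PER_AZ = 4
REPLICATION_FACTOR = 3


def quorum(replication_factor):
    """Replicas that must respond for QUORUM / LOCAL_QUORUM."""
    return replication_factor // 2 + 1


def unavailable_fraction(down_nodes):
    """Fraction of replica placements that cannot reach LOCAL_QUORUM.

    down_nodes is a set of (az, node_index) tuples. Assumes one replica per
    AZ, with every combination of nodes equally likely to hold a partition.
    """
    placements = list(product(range(NODES_PER_AZ), repeat=len(AZS)))
    failed = 0
    for placement in placements:
        replicas = set(zip(AZS, placement))
        alive = len(replicas - down_nodes)
        if alive < quorum(REPLICATION_FACTOR):
            failed += 1
    return failed / len(placements)


if __name__ == "__main__":
    whole_az_c = {("C", i) for i in range(NODES_PER_AZ)}
    print(unavailable_fraction(whole_az_c))            # every partition keeps 2 replicas
    print(unavailable_fraction({("A", 0), ("B", 0)}))  # some partitions drop to 1 replica
```

Losing all of AZ C costs every partition exactly one replica, so 2 of 3 remain and LOCAL_QUORUM holds. Losing one node in A and one in B only hurts the partitions that happen to live on both of those nodes, but those partitions are left with a single replica.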

The following is a diagram of how all this manifests in terms of data striping in Cassandra with our configuration. Only writes have been shown as read behaviour is the same.


Wednesday, June 10, 2015

Why not to multi-tenant Cassandra

Cassandra and the Cloud go together like <insert hilarious comparison here>. So do multi-tenancy and the Cloud.

As well as being able to save costs by elastically horizontally scaling in response to load during a day, it’s an attractive idea to make further strides by sharing hardware between customers (multi-tenanting). 

I’m not against multi-tenancy per se… I think it’s a great idea. But I am against doing it with Cassandra and here’s why…

You basically have 3 real options for multi-tenanting Cassandra, each with pros and cons:

By Keyspace
Pros:
  • Tenants may have different replication factors.
  • Tenants may have different schemas (enabling them to potentially run different applications).
  • Tenants don’t share SSTables and data is effectively isolated by Cassandra.
Cons:
  • Your scalability is limited because holding all the extra keyspaces and their tables in memory does not play well with Cassandra.
By Table
Pros:
  • Tenants may have different schemas (enabling them to potentially run different applications).
  • Tenants don’t share SSTables and data is effectively isolated by Cassandra.
Cons:
  • Your scalability is limited because holding all the extra tables in memory does not play well with Cassandra.
  • More complex application logic is needed to deal with different tables per tenant.
By Row
Pros:
  • A scalable approach as your memory requirements are the same as for one tenant.
Cons:
  • Tenants must have compatible schemas.
  • Tenants share both SSTables and Memtables, so isolation has to be done in the application, but active tenants can hinder the effectiveness of the shared caches for other tenants and cause compaction issues that will impact every tenant.
Having multiple Logical Datacenters has also been suggested to isolate the workloads of different customers. But this is designed for different workloads (e.g. BAU and Analytics) on the same data. If you want to isolate by tenant (data) then you’ll lose the benefits of multi-tenancy as you’ll no longer be sharing the hardware.

You cannot win.

If you really want to save wastage costs from unused hardware, I'd consider increasing the granularity of your instances so that the wastage is reduced.

Thursday, May 14, 2015

Cassandra Compaction Strategies Under Heavy Delete Workflows

There are many factors at play when we look at what compaction strategies to use for our tables in Cassandra. Bulk deleting large quantities of data is something that is usually avoided in Cassandra. However, we have requirements that necessitate it (e.g. a requirement to hard-delete patient data when they've opted out of our service) and there are certain advantages to being able to do it (e.g. freeing up space when data has become "orphaned", never to be read again).

The big concern with deleting large quantities of data is tombstones. Storage in Cassandra is log-structured in that we always append writes, avoiding disk seeks (which is how we achieve such awesome write performance). Even deletes are writes, in the form of tombstones which mark data as having been deleted. Having lots of tombstones is a bad thing as we need to pull back the tombstones along with the upserts (inserts and updates are pretty much the same thing) for our data when we read it back. This spreads reads across SSTables and means we're holding more information in memory in order to resolve the current state of the data we're pulling back.
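
As a rough illustration of why tombstones cost us on reads, here is a toy model in Python. It is not how Cassandra is implemented: each SSTable is just a dict of key to (timestamp, value), `TOMBSTONE`, `read` and `compact` are names I've made up, and the newest timestamp simply wins.

```python
TOMBSTONE = object()  # marker standing in for a delete


def read(key, sstables):
    """Return the current value for key, or None if deleted or never written.

    Every SSTable has to be consulted, because the newest cell (upsert or
    tombstone) could be in any of them.
    """
    newest = None
    for sstable in sstables:
        cell = sstable.get(key)
        if cell is not None and (newest is None or cell[0] > newest[0]):
            newest = cell
    if newest is None or newest[1] is TOMBSTONE:
        return None
    return newest[1]


def compact(sstables, purge_tombstones=False):
    """Merge SSTables into one, keeping only the newest cell per key."""
    merged = {}
    for sstable in sstables:
        for key, cell in sstable.items():
            if key not in merged or cell[0] > merged[key][0]:
                merged[key] = cell
    if purge_tombstones:  # only safe once gc_grace_seconds has passed
        merged = {k: c for k, c in merged.items() if c[1] is not TOMBSTONE}
    return [merged]


if __name__ == "__main__":
    sstables = [
        {"a": (1, "v1"), "b": (1, "x")},
        {"a": (2, "v2")},
        {"b": (3, TOMBSTONE)},
    ]
    print(read("a", sstables), read("b", sstables))
    print(len(compact(sstables)[0]))
```

Until compaction merges the files, the deleted data for "b" and its tombstone both sit on disk, and every read has to reconcile them.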

Because of this, it's important to understand the behaviour of different compaction strategies in relation to tombstones when we have a heavy delete workload. These were my findings:

Leveled Compaction Strategy (LCS)

Compaction load during heavy deletes was greatest with LCS. The upside was that this enabled more data to be compacted away during the delete workload, reducing the size consumed by the table as we went.

Delete throughput was also lower under LCS than the other strategies due to the compaction load, but improved over time as the data size reduced.

Until we get Cassandra 3.0, nodetool compact (triggering a "major" compaction) is a no-op under LCS. This means that we can not opt to compact away all of the deleted data, leaving only the tombstones (assuming gc_grace_seconds hasn't expired). This is a big blow to the space saving use case as well as potentially causing issues around whether the data has been "hard" deleted.

A selling point for LCS over STCS is that you don't need the 50% space overhead in order to ensure compaction can safely happen.

Size Tiered Compaction Strategy (STCS)

The default compaction strategy, STCS, had a very light compaction load during the delete workflow. As a result, the data volumes actually grew with the tombstones being written and not much of the original data being compacted away.

Delete throughput was about the same as that of DTCS, around 34% higher than that of LCS in this test. Like with DTCS, delete throughput also remained fairly constant over time.

Triggering a "major" compaction with nodetool compact on STCS is generally not recommended: when you compact all of your data into one giant SSTable, it is going to be a very long time (if ever) before future SSTables reach a size at which they'll compact with your monolithic SSTable. However, this did reduce the table size considerably as the tombstones (gc_grace_seconds hadn't expired) are a lot smaller than the actual data.
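
To show why the monolithic SSTable ends up stranded, here is a simplified sketch of size-tiered bucketing in Python. It is an approximation of the idea, not Cassandra's code: the function names are mine, the 0.5 / 1.5 / 4 values mirror what I understand to be the STCS defaults for bucket_low, bucket_high and min_threshold, and details such as min_sstable_size are left out.

```python
def size_tiered_buckets(sizes, bucket_low=0.5, bucket_high=1.5):
    """Group SSTable sizes into buckets of roughly similar size."""
    buckets = []  # each bucket is a list of sizes
    for size in sorted(sizes):
        for bucket in buckets:
            average = sum(bucket) / len(bucket)
            if bucket_low * average <= size <= bucket_high * average:
                bucket.append(size)
                break
        else:
            # No existing bucket is close enough in size, so start a new one.
            buckets.append([size])
    return buckets


def compaction_candidates(sizes, min_threshold=4):
    """Buckets with enough similar-sized SSTables to be worth compacting."""
    return [b for b in size_tiered_buckets(sizes) if len(b) >= min_threshold]


if __name__ == "__main__":
    # One 500 GB SSTable left over from a major compaction, plus fresh 1 GB ones.
    print(compaction_candidates([500, 1, 1.1, 0.9, 1.2]))
```

The small SSTables bucket together and get compacted, while the 500 GB one sits alone until enough data has accumulated to produce peers of a similar size.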

A downside to STCS is that it requires a 50% space overhead to guarantee it has enough room to perform a compaction. At large scale, this can be a significant cost in hardware that could otherwise be utilised.

Date Tiered Compaction Strategy (DTCS)

As with STCS, the compaction load was very light (we were only compacting the last hour of data together) and our data volume actually grew as we wrote more tombstones (the writes occurred a long time before the deletes).

Delete throughput was about the same as that of STCS, around 34% higher than that of LCS in this test, and remained fairly constant over time.

Triggering a major compaction had the same result as with STCS in that we wound up with one monolithic table and saved a tonne of space. However, unlike STCS the monolithic nature of this SSTable isn't so much of a concern under DTCS as we're not interested in compacting new data together with old data anyway as we're usually using it for more time-series type data.

How does this feed back into what Compaction Strategy to use and where?

In order of descending preference...

DTCS - Use anywhere we're going to do many immutable or very, very infrequently updated/deleted writes and we're only fetching back either a single logical row or a slice of rows with very close timestamps. Preferred over LCS as it scales better with data volume and enables major compactions.

LCS - Use for large tables (size overhead) where DTCS isn't suitable and we're unlikely to be concerned with freeing up space from bulk delete operations. Preferred over STCS as it means we can push our disk space utilisation further. Note: Requires some grunt to power (decent disks, decent compaction settings).

STCS - Use for small tables and tables unsuited to DTCS but where we really need to free up space or have data hard-deleted in bulk.

Tuesday, September 23, 2014

On Naming Things

"New York, New York. So good they named it twice." Gerard Kenny 
Naming things is hard.

That being said, I haven't found any area that has struggled with naming things as much as Java Garbage Collection...

For instance, even official Oracle documentation uses the term "infant mortality" to describe objects that "die" in the Young generation. While I get the metaphor (joke?), it's not really appropriate to talk about the infant mortality rate associated with your application when you're working in the Health domain. Saying things like "...the minor collection can take advantage of the high infant mortality rate." doesn't endear you to your coworkers.

There's also a lot of inconsistency and repurposing of terms.

The terms "Young Generation" and "New Generation" are often used interchangeably (even within the same documents)...

"The NewSize and MaxNewSize parameters control the new generation’s minimum and maximum size. Regulate the new generation size by setting these parameters equal. The bigger the younger generation, the less often minor collections occur. The size of the young generation relative to the old generation is controlled by NewRatio. For example, setting -XX:NewRatio=3 means that the ratio between the old and young generation is 1:3, the combined size of eden and the survivor spaces will be fourth of the heap." - Sun Java System Application Server Enterprise Edition 8.2 Performance Tuning Guide

The same goes for "Old Generation" and "Tenured Generation"... 

Watch out though! The term "Tenured" is also used synonymously with "Promoted" when it comes to objects moving through the generations.

It gets worse when you read people's blogs and see them try to provide their own interpretations of the terminology. I read one today where the guy was trying to say some major collections aren't really major collections in his opinion... It just adds to the confusion.

Monday, July 7, 2014

On JMeter Reporting To Database

For this post I thought I'd talk about a plug-in I developed for Apache JMeter.

It's called Aggregate Report To Database. If you're familiar with Apache JMeter, you're no doubt familiar with the Aggregate Report plug-in. My Aggregate Report To Database plug-in builds upon the functionality of the Aggregate Report to enable both automatic and manually triggered saving of results to a Database (currently only Oracle, but would be very easy to enable others).



What does this achieve?

The main reason I developed this plug-in was in order to make it simple for development teams to take the performance testing scripts I'd made for their products and re-purpose them for nightly performance frameworks. Now teams can leverage existing scripts and have them run against their nightly builds to have traceability of performance improvements and regressions to builds, to quantify performance improvements and regressions, and to analyse performance trends over time.

How does it achieve this?

The build system triggers a JMeter test to run, the results of which are automatically saved into a relational database at the end of the test (so that saving the results has no impact on the test itself). We use Atlassian Confluence as an internal wiki. A page is configured by teams in the wiki to automatically graph the results (by querying the results database) of the past 2 weeks of performance testing against nightly builds for all the different features in a product. This makes it easy for a team to see at a glance how performance of different functionality has trended over time by simply loading a page. It's also an interesting set of graphs to show at a sprint demo.
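
The flow above can be sketched roughly as follows. To be clear, this is not the plug-in's code (which is Java and writes to Oracle): it is a Python illustration using the standard library's sqlite3 as a stand-in database, and the table name, column names, `aggregate` and `save_report` are all hypothetical. The 90th percentile uses the nearest-rank method, which may differ slightly from JMeter's own calculation.

```python
import sqlite3
from statistics import mean, median

SCHEMA = """
CREATE TABLE IF NOT EXISTS aggregate_report (
    build_id TEXT, label TEXT, samples INTEGER,
    average_ms REAL, median_ms REAL, p90_ms REAL,
    min_ms REAL, max_ms REAL, error_pct REAL
)
"""


def aggregate(samples):
    """Summarise a non-empty list of (elapsed_ms, success) samples."""
    elapsed = sorted(ms for ms, _ in samples)
    count = len(elapsed)
    errors = sum(1 for _, ok in samples if not ok)
    p90_rank = (count * 90 + 99) // 100  # nearest-rank 90th percentile
    return {
        "samples": count,
        "average_ms": mean(elapsed),
        "median_ms": median(elapsed),
        "p90_ms": elapsed[p90_rank - 1],
        "min_ms": elapsed[0],
        "max_ms": elapsed[-1],
        "error_pct": 100.0 * errors / count,
    }


def save_report(conn, build_id, samples_by_label):
    """Write one aggregate row per label once the test has finished."""
    conn.execute(SCHEMA)
    for label, samples in samples_by_label.items():
        row = aggregate(samples)
        conn.execute(
            "INSERT INTO aggregate_report VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (build_id, label, row["samples"], row["average_ms"], row["median_ms"],
             row["p90_ms"], row["min_ms"], row["max_ms"], row["error_pct"]),
        )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    save_report(conn, "build-1", {"login": [(100, True), (200, True), (300, False), (400, True)]})
    print(conn.execute("SELECT * FROM aggregate_report").fetchall())
```

Keying each row by build is what lets a wiki page graph the same label across nightly builds.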

Where can I get this plug-in?

I haven't currently published this plug-in publicly, but this is something I am definitely interested in doing if there is significant interest in getting it. I'd just have to clean a few things up and get permission from my employer to make it open source.

I'd like to thank Viktoriia Kuznetcova for her help in developing this plug-in. =)

 






Sunday, June 8, 2014

On Java Garbage Collection Analysis & Tuning

1. Introduction

The purpose of this page is to serve as an introductory guide (or a cheat-sheet for the experienced but out of practice) on how to perform basic GC analysis and tuning.
This is not intended to be an in depth guide to the vast field of JVM tuning.

2. Collectors and their pros/cons

In most enterprise level applications, you will only ever want to use UseParallelOldGC or UseConcMarkSweepGC.
Here are the main different collectors and their pros and cons.
  • -XX:+UseSerialGC
    • A single GC thread.
    • Only suited to single processor machines and up to 100MB of data.
    • This will likely never be the best option with modern web applications.
  • -XX:+UseParallelGC
    • Multiple GC threads for minor collections, single threaded for major collections (old generation).
    • I don't know of any reason to use this collector if you're on a JVM (Java 5u6+) that has UseParallelOldGC.
  • -XX:+UseParallelOldGC
    • Multiple GC threads for both minor and major collections (old generation).
    • This is what you want to use if you're interested in getting the highest throughput out of your application, and pause times are not as big of a concern.
    • Colloquially referred to as the throughput collector.
  • -XX:+UseConcMarkSweepGC
    • Performs most of its garbage collection activity concurrently, which results in less overall throughput than UseParallelOldGC, but also less time spent in "Stop The World" pauses.
    • This is what you want to use if response times are more important than overall throughput, for example on HP Itanium hardware that has poor collection rates.
    • Has 2 stop the world pause events with each major collection (mark and re-mark), although these are relatively short compared to the Full GCs of UseParallelOldGC.
    • You won't have to perform any Full GCs unless you encounter a concurrent mode failure (objects are being tenured into the old generation faster than they can be reclaimed from it).
    • Colloquially referred to as the concurrent collector.

3. Some background terminology and process

Terminology around Java Garbage Collection can be very confusing with multiple names for the same thing and reused terms (objects are tenured, plus there is a tenured generation).
Hopefully this will help...

3.1. Generations

  • New / Young Generation
    • Spaces in this generation have minor collections.
    • Consists of:
      • Eden Space
      • Survivor Spaces
  • Old / Tenured Generation
    • There is one space in this generation and it has major collections (e.g. CMS or Full GC).
  • Permanent (Perm) Generation
    • Used by the JVM for storing classes, methods etc.
  • Code Cache
    • Used by the HotSpot compiler for compilation and storage of native code.

3.2. Tenuring/Promotion

Objects are initially created in the eden space (in the new/young generation).
Minor collections will tenure/promote objects that have not yet been de-referenced (memory pointers still exist that point to these objects) from eden and through the survivor spaces and into the old generation.
Major collections happen in the old generation. There is nowhere for objects to be promoted to from this generation, so objects will either be collected and their space freed up, or persist and potentially cause issues.

4. How to identify a typical memory leak

Note: The tool I prefer using and have used in these screenshots, is HPJMeter.

4.1. What does a healthy JVM look like?

We can see that this JVM is healthy from a number of different views.
The following is a graph of old generation size before GC from a healthy production server under peak load using the concurrent collector.

Here we can see that the size of the old generation isn't increasing after each consecutive time we have a CMS collection. The size of the old generation after a CMS collection goes up and down, but there is no overall trend of increasing.
We could also see this on a graph of reclaimed bytes.

Here we see that the number of bytes reclaimed by the CMS collections fluctuates with the load (more concurrent users means more objects persisted), but the trend is flat.

4.2. What does a memory leak look like?

The following is a graph of the old generation before GC from a very unhealthy development server using the throughput collector.

We can see that with every successive Full GC, the old generation gets bigger and bigger.
Also note that the interval between Full GCs decreases with time.
This continues until we see the purple line, at which time we are in what is commonly referred to as GC Hell. At this point the JVM is unable to clear any objects and is spending almost all of its time in garbage collection. The user will experience the system becoming completely unresponsive.
Once the JVM gets to a point where it is spending 98% of time in garbage collection and less than 2% of the heap is being recovered, an OutOfMemoryError (OOM) will be thrown. This will typically bring the application down!
The following is a graph showing reclaimed bytes for the same period. We can see here that with each successive collection we are reclaiming fewer and fewer bytes. If this is allowed to continue, an OOM error will certainly be thrown.
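
If you'd rather not eyeball a graph, the same trend check can be approximated in code. The following Python sketch fits a straight line through the old generation occupancy recorded after each major collection and flags a sustained upward slope. The function names and the 1 MB per collection threshold are assumptions for illustration, and you would still need to pull the numbers out of your GC log yourself.

```python
def slope(values):
    """Least-squares slope of values against their index (0, 1, 2, ...)."""
    n = len(values)
    if n < 2:
        raise ValueError("need at least two collections to see a trend")
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(values))
    variance = sum((x - mean_x) ** 2 for x in range(n))
    return covariance / variance


def looks_like_leak(old_gen_after_gc_mb, threshold_mb_per_gc=1.0):
    """True if old generation occupancy after GC keeps trending upwards."""
    return slope(old_gen_after_gc_mb) > threshold_mb_per_gc


if __name__ == "__main__":
    healthy = [510, 495, 520, 500, 515, 498]   # goes up and down, flat overall
    leaking = [500, 560, 640, 700, 790, 850]   # grows with every collection
    print(looks_like_leak(healthy), looks_like_leak(leaking))
```

A flat or negative slope matches the healthy graph in 4.1; a steadily positive one matches the leak described here.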


5. What tuning is typically done

5.1. Sizing the heap


The maximum heap size should be set high enough so that you have enough space for your retained heap (where your heap size drops to after a major collection) under peak load and enough head-room so that the interval between major collections isn't too low.
Typically the concurrent collector requires more head-room than the throughput collector.
Finding the ideal heap size will require load testing and analysis.
Once you have determined what maximum heap size works best for you, you may want to set the minimum heap size to the same value in order to avoid poor performance while the JVM gradually increases the heap size from the minimum to your maximum.

5.2. Sizing the generations

Depending on the nature of your application (how transient the objects are), you'll want to size the generations differently.
Sizes of generations can be explicit or by ratio.
Ratio is somewhat preferable in my personal opinion as this should mean that once we understand the "nature" of our application, we can adjust for the scale of deployment by simply adjusting the heap size.
The main ratio you're likely to need to size is the New Ratio.
Setting the New Ratio can sometimes be confusing. If you have the New Ratio set to 2 (-XX:NewRatio=2), the ratio between the young and the old generation becomes 1:2. This means that 2/3 of the heap is occupied by the old generation and 1/3 is occupied by the new.
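As a quick sanity check of that arithmetic, here is a small Python helper (the function name is just for illustration, and it only splits the heap between young and old, ignoring the permanent generation):

```python
def generation_sizes(heap_mb, new_ratio):
    """Split the heap into (young, old) sizes for a given -XX:NewRatio.

    NewRatio is old / young, so young takes 1 part and old takes
    new_ratio parts out of (new_ratio + 1).
    """
    young = heap_mb / (new_ratio + 1)
    old = heap_mb - young
    return young, old


if __name__ == "__main__":
    print(generation_sizes(3072, 2))  # young is 1/3 of the heap
    print(generation_sizes(4096, 3))  # young is 1/4 of the heap
```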

5.3. CMS Initiating Occupancy Fraction

When using the concurrent collector, you'll want to set the CMS Initiating Occupancy Fraction.
This can be done by setting -XX:CMSInitiatingOccupancyFraction=<N> where <N> is the percentage of the tenured (old) generation size that needs to be full before a CMS collection is triggered.
This will of course have to be a percentage of the old generation that is higher than where our retained heap size sits at peak load.
It will have to be high enough so that we aren't constantly having CMS collections and their associated pauses.
It will also have to be lower than the point at which triggering a CMS will happen too late and result in Concurrent Mode Failure and a Full GC.
Again, tuning this will require load testing and analysis.

6. What other JVM parameters are handy

  • -XX:+DisableExplicitGC
    • Applications can programmatically trigger a Full GC by calling System.gc().
    • This is not desirable and this flag will disable this feature.
  • -XX:+HeapDumpOnOutOfMemoryError 
    • This will cause the JVM to generate a heap dump when you run out of memory (once the JVM gets to a point where it is spending 98% of time in garbage collection and less than 2% of the heap is being recovered).
    • Having a heap dump will mean you can analyse it in Eclipse Memory Analyzer (MAT) in order to determine which objects are being persisted and causing the OOM error.

Wednesday, June 4, 2014

On Performance Engineering Yourself

The Force Is Strong With This One


Lately I've been paired performance testing a product in what I like to think of as a sort of Emperor Palpatine and Darth Maul/Vader arrangement. My apprentice has been running load tests in various configurations whilst I've been working behind the scenes to generate 250 million insurance claims with distributions representative of real data to underpin future testing.


Cloud City

Due to the scale of the testing that we're doing, this would have been impossible to do on internal hardware. As such we chose to use Amazon Web Services (AWS) after previously trying a few other cloud providers and being disappointed with their performance and reliability. So far AWS has been brilliant, no complaints whatsoever.

The possibilities offered by doing our testing in the cloud are quite exciting and have made me realise that the key place where optimisation is required in order to get the most value out of it is actually ourselves. Previously, we were effectively bottlenecked on the available hardware. For example, if I was generating test data, the database would be hammered and no use for running load tests. This meant we had to manage our time so that load tests were run during the day and data generation and soak tests were run overnight and looked at the following morning.


A New Hope

Now we can quickly and easily spin up (using automated tooling) separate environments for each of us to work on tasks in parallel without worrying about interfering with each other. The coolest thing about this is it makes sense from a cost perspective too. If I spin up 32 machines for 1 hour, it costs the same as 16 of those machines for 2 hours. But the most important thing is that the overall cost of spinning up more nodes is lower, because the person-hours spent are halved. And people are by far the biggest cost.
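
Here is that cost argument as a back-of-the-envelope Python sketch. The hourly rates are made-up numbers purely for illustration, and it assumes the work parallelises perfectly, which real workloads rarely do.

```python
def total_cost(machines, hours, machine_rate, people, person_rate):
    """Machine cost plus the cost of the people waiting on the result."""
    machine_cost = machines * hours * machine_rate
    people_cost = people * hours * person_rate
    return machine_cost + people_cost


if __name__ == "__main__":
    # Hypothetical rates: 0.50 per machine-hour, 80 per person-hour.
    wide = total_cost(machines=32, hours=1, machine_rate=0.5, people=1, person_rate=80)
    narrow = total_cost(machines=16, hours=2, machine_rate=0.5, people=1, person_rate=80)
    print(wide, narrow)
```

The machine bill is identical in both cases (32 machine-hours), so the whole difference comes from the extra hour of a person's time.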

What this means though is that to get the most value out of the cloud, we need to be able to make our work highly parallelized. What if my apprentice could run every one of his planned load tests at the very same time? What might previously have been a week's worth of testing could be finished in an hour, and importantly at lower cost. Of course there is overhead with analysis of results and reporting, but some of that can be automated too.

What if my data generation could take half the time if I threw twice the hardware at it? The possibilities are awesome. But they require us to optimise ourselves as engineers just like we would normally optimise our software.