Tuesday, June 28, 2011

Cassandra Load Balancing

Modifying Schema on a Live Cluster

This page discusses features available in 0.7.

Under the Hood

A new system table called definitions keeps track of two things: keyspace definitions (SCHEMA_CF) and keyspace changes (MIGRATIONS_CF). TimeUUIDs are used throughout to match migrations up with schema and vice-versa.

Keyspace Definitions (SCHEMA_CF)

The current set of keyspace definitions is stored in a single row, one keyspace per column, with a TimeUUID as the row key (which also serves as the version identifier), the keyspace name as the column name, and the serialized definition as the column value. There is a special row, keyed by "Last Migration", that contains a single column indicating the current schema version UUID. This makes it easy to look up the current version and then retrieve the matching row.

Migrations (MIGRATIONS_CF)

MIGRATIONS_CF tracks the individual modifications (add, drop, rename) that are made to the schema. It consists of a single row keyed by "Migrations Key" with one column per migration. Each column has the migration version UUID as its name, with the serialized migration as its value.
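As a rough mental model only, the two column families can be pictured as nested dictionaries. This is a sketch, not Cassandra's storage format: the placeholder byte strings, the "version" column name, and the helper names are all assumptions made for illustration.

```python
import uuid

LAST_MIGRATION_KEY = "Last Migration"   # special row in SCHEMA_CF
MIGRATIONS_KEY = "Migrations Key"       # the single row in MIGRATIONS_CF


def new_definitions():
    """Return empty stand-ins for SCHEMA_CF and MIGRATIONS_CF."""
    return {"SCHEMA_CF": {}, "MIGRATIONS_CF": {MIGRATIONS_KEY: {}}}


def current_keyspaces(definitions):
    """Look up the current version, then fetch the schema row it names."""
    schema_cf = definitions["SCHEMA_CF"]
    pointer = schema_cf.get(LAST_MIGRATION_KEY)
    if pointer is None:
        return None, {}                  # fresh node: nothing to load
    version = pointer["version"]         # hypothetical column name
    return version, schema_cf[version]


definitions = new_definitions()
version = uuid.uuid1()                   # TimeUUID: row key and version id
definitions["SCHEMA_CF"][version] = {"Keyspace1": b"<serialized definition>"}
definitions["MIGRATIONS_CF"][MIGRATIONS_KEY][version] = b"<serialized migration>"
definitions["SCHEMA_CF"][LAST_MIGRATION_KEY] = {"version": version}
```

The two-step lookup in current_keyspaces mirrors the "look up the version and then retrieve it" flow described above.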


Client Operation

  • Add column family or keyspace
  • Drop column family or keyspace
  • Rename column family or keyspace
These are all executed via the Thrift API. It is expected that you have ALL access if you are using security. For rename and drop operations the client will block until all associated files are renamed or deleted.

Server Migration Process

Applying a migration consists of the following steps:
  1. Generate the migration, which includes a new version UUID.
    1. DROPs only: snapshot the data that is going away.
  2. Update SCHEMA_CF with a new schema row.
  3. Update MIGRATIONS_CF by appending a migration column.
  4. Update the "Last Migration" row in SCHEMA_CF.
  5. Flush the definitions table.
  6. Update runtime data structures (create directories, do deletions, etc.)

Handling Failure

A node can fail during any step of the update process. Here is an examination of what will happen if a node fails after each part of the update process (see Server Migration Process above).
  1. Nothing has been applied. Update fails outright.
    1. Same. You will have an extra snapshot though.
  2. Extra data exists in SCHEMA_CF but will be ignored because "Last Migration" was not updated.
  3. Extra data exists in SCHEMA_CF and MIGRATIONS_CF but will be ignored because "Last Migration" was not updated.
  4. Broken: commit log will not be replayed until *after* schemas are loaded on restart. This means that the "Last Migration" will be read, but will not be able to be loaded and applied.
  5. Startup will happen normally.
  6. Startup will happen normally.
If a node crashes during a migration, chances are you will have to do some manual cleanup. For example, if a node crashes after steps 4 or 5 of a DROP migration, you will need to manually delete the data files. (Not deleting them does no harm unless you 'recreate' the same CF via ADD later on. Then you have an instant database.)
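The reason a crash after steps 1 to 3 is harmless can be illustrated with a small simulation. This is a sketch under stated assumptions: the store is an in-memory dictionary, the fail_after parameter is a hypothetical way to simulate a crash, and steps 5 and 6 (flush and runtime updates) are left out.

```python
import uuid


def apply_migration(store, new_keyspaces, fail_after=None):
    """Walk through steps 1-4; fail_after simulates a crash after that step."""
    version = uuid.uuid1()                                    # step 1
    if fail_after == 1:
        return version
    store["schema"][version] = dict(new_keyspaces)            # step 2
    if fail_after == 2:
        return version
    store["migrations"][version] = {"previous": store["last_migration"]}  # step 3
    if fail_after == 3:
        return version
    store["last_migration"] = version                         # step 4
    return version


def visible_schema(store):
    """Only the row named by the "Last Migration" pointer is ever loaded."""
    last = store["last_migration"]
    return {} if last is None else store["schema"][last]


store = {"schema": {}, "migrations": {}, "last_migration": None}
v1 = apply_migration(store, {"Keyspace1": "definition"})
orphan = apply_migration(
    store, {"Keyspace1": "definition", "Keyspace2": "definition"}, fail_after=3
)
```

After the simulated crash, the orphaned schema row and migration column exist, but visible_schema still returns only Keyspace1 because the pointer was never moved.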

Starting Up

When a node starts up, it checks SCHEMA_CF to find out the latest schema version it has. If it finds nothing (as would happen with a new cluster), it loads nothing and logs a warning. Otherwise, it uses the uuid it just read in to load the correct row from SCHEMA_CF. That row is deserialized into one or more keyspace definitions which are then loaded in a manner similar to the load-from-xml approach used in the past.
At the same time, the node incorporates its schema version UUID into the gossip digests it sends to other nodes. It may be the case that this node does not have the latest schema definitions (as a result of network partition, bootstrapping a new node, or any other reason you can think of). When a version mismatch is detected the definition promulgation mechanism described next is invoked.

Definition Promulgation

Definition promulgation consists of two phases: announce and push. announce is a way for node A to declare to node B "this is the schema version I have". If the versions are equal, the message is ignored. If A is newer, B responds with an announce to A (this functions as a request for updates). If A is older, B responds with a push containing all the migrations from B that A doesn't have.
When a schema update originates from the client (Thrift), gossip promulgation is bypassed and this announce-announce-push approach is used to push migrations to other nodes.
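The announce rules can be written down as a small decision function. This is a sketch, not the actual messaging code: it assumes versions are version-1 UUIDs compared by their embedded timestamp, and the tuple-based message shape is made up for illustration.

```python
import uuid


def on_announce(local_version, remote_version, local_migrations):
    """Decide how node B reacts to an announce from node A.

    local_migrations is a list of (version, payload) pairs in applied order.
    Returns a (message_type, body) tuple, or None when nothing is sent.
    """
    if remote_version == local_version:
        return None                                   # equal: ignore
    if remote_version.time > local_version.time:
        return ("announce", local_version)            # A is newer: request updates
    missing = [m for m in local_migrations if m[0].time > remote_version.time]
    return ("push", missing)                          # A is older: send what it lacks
```

Comparing the time field rather than the raw UUID bytes matters, because the byte layout of a version-1 UUID does not sort chronologically.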

New Cluster (Fresh 0.7)

For new clusters, things will work best if you start with one node and apply migrations using Thrift until you get the schema you want. Then bring new nodes online and they will pull migrations from the first node (or each other in a large cluster).
Alternatively, you could then shut down the first node and manually copy its SCHEMA_CF and MIGRATIONS_CF to each new node in the cluster.
The simplest method of applying these schema changes is with bin/cassandra-cli. You can either do this interactively, or place the commands in a file and apply them in batch mode (type help and help <command> to see the available commands). For example:
$ cat schema.txt
/* Create a new keyspace */
create keyspace Keyspace1 with replication_factor = 3 and placement_strategy = 'org.apache.cassandra.locator.RackUnawareStrategy';

/* Switch to the new keyspace */
use Keyspace1;

/* Create new column families */
create column family Standard1 with column_type = 'Standard' and comparator = 'BytesType';
create column family Standard2 with column_type = 'Standard' and comparator = 'UTF8Type' and rows_cached = 10000;
$ bin/cassandra-cli --host localhost --batch < schema.txt

Existing Cluster (Upgrade from 0.6)

To provide some backwards compatibility, we've provided a JMX method in the StorageServiceMBean that can be used to manually load schema definitions from storage-conf.xml. This is a one-shot operation though, and will only work on a system that contains no existing migrations. If you are upgrading a cluster, you will probably only have to do this for one node (a seed). Gossip will take care of promulgating the changes to the rest of the nodes as they come online.
For those who don't know how to do it (like me):
ps aux | grep cassandra # get pid of cassandra
jconsole PID
MBeans -> org.apache.cassandra.db -> StorageService -> Operations -> loadSchemaFromYAML
Lastly, there is a system tool that can poke the same JMX method without having to remember its location:
bin/schematool HOST PORT import


It is entirely possible and expected that a node will receive migration pushes from multiple nodes. Because of this, all migrations are applied on a single-threaded stage and versions are checked throughout to make sure that no migration is applied twice, and no migration is applied out of sync.
Each migration knows the version UUID of the migration that immediately precedes it. If a node is asked to apply a migration and its current version UUID does not match the last version UUID of the migration, the migration is discarded.
One weakness of this model is that it is vulnerable if a new update starts before another update is promulgated to all live nodes--only one migration can be active within a cluster at any time. One way to get around this is to choose one node and only initiate migrations through it.
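The "each migration knows its predecessor" check can be sketched as follows. The dictionaries and field names are hypothetical; the point is only that a push is applied when it directly follows the node's current version and is dropped otherwise.

```python
def try_apply(node, migration):
    """Apply migration only if it directly follows the node's current version."""
    if migration["previous"] != node["version"]:
        return False                 # duplicate or out-of-order push: discard
    node["version"] = migration["version"]
    node["applied"].append(migration["version"])
    return True


node = {"version": "v0", "applied": []}
m1 = {"previous": "v0", "version": "v1"}
m2 = {"previous": "v1", "version": "v2"}

# The same migrations arrive from two different peers, partly out of order.
results = [try_apply(node, m) for m in (m2, m1, m1, m2, m2)]
```

Here the early m2 and the repeated pushes are discarded, so each migration is applied exactly once and in order. The sketch also shows the weakness mentioned above: a second migration built on v0 by another node would be discarded everywhere that already moved to v1.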

Ring management

Each Cassandra server [node] is assigned a unique Token that determines what keys it is the first replica for. If you sort all nodes' Tokens, the Range of keys each is responsible for is (PreviousToken, MyToken], that is, from the previous token (exclusive) to the node's token (inclusive). The machine with the lowest Token gets both all keys less than that token, and all keys greater than the largest Token; this is called a "wrapping Range."
(Note that there is nothing special about being the "primary" replica, in the sense of being a point of failure.)
When the RandomPartitioner is used, Tokens are integers from 0 to 2**127. Keys are converted to this range by MD5 hashing for comparison with Tokens. (Thus, keys are always convertible to Tokens, but the reverse is not always true.)
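To make the (PreviousToken, MyToken] rule and the wrapping Range concrete, here is a small lookup sketch. The token derivation assumes the MD5 digest is read as a signed big-endian integer and its absolute value taken; treat that detail as an assumption rather than a statement of the exact implementation. The three-node ring in the usage below is invented.

```python
import hashlib
from bisect import bisect_left


def md5_token(key: bytes) -> int:
    """Hash a key into the token space (assumed: abs of the signed digest)."""
    return abs(int.from_bytes(hashlib.md5(key).digest(), "big", signed=True))


def primary_node(token: int, ring: dict) -> str:
    """ring maps node token -> node name; each range is (previous, mine]."""
    tokens = sorted(ring)
    i = bisect_left(tokens, token)          # first node token >= key token
    return ring[tokens[i % len(tokens)]]    # past the end wraps to the lowest


ring = {0: "A", 100: "B", 200: "C"}
```

With this ring, token 100 belongs to B (inclusive upper bound), token 101 belongs to C, and token 201 wraps around to A.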

Token selection

Using a strong hash function means RandomPartitioner keys will, on average, be evenly spread across the Token space, but you can still have imbalances if your Tokens do not divide up the range evenly, so you should specify InitialToken to your first nodes as i * (2**127 / N) for i = 0 .. N-1. In Cassandra 0.7, you should specify initial_token in cassandra.yaml.
With NetworkTopologyStrategy, you should calculate the tokens for the nodes in each DC independently. Tokens still need to be unique, so you can add 1 to the tokens in the 2nd DC, add 2 in the 3rd, and so on. Thus, for a 4-node cluster in 2 datacenters, you would have
node 1 = 0
node 2 = 85070591730234615865843651857942052864

node 3 = 1
node 4 = 85070591730234615865843651857942052865
If you happen to have the same number of nodes in each data center, you can also alternate data centers when assigning tokens:
[DC1] node 1 = 0
[DC2] node 2 = 42535295865117307932921825928971026432
[DC1] node 3 = 85070591730234615865843651857942052864
[DC2] node 4 = 127605887595351923798765477786913079296
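Both layouts above can be reproduced with a few lines of Python. This is a sketch using the i * (2**127 / N) formula from the text; the function names are made up for illustration.

```python
def dc_tokens(nodes_per_dc):
    """One token list per datacenter, each offset by the DC's index."""
    return [
        [i * (2 ** 127 // n) + offset for i in range(n)]
        for offset, n in enumerate(nodes_per_dc)
    ]


def alternating_tokens(dc_names, nodes_per_dc):
    """Evenly spaced tokens assigned to datacenters in round-robin order."""
    total = len(dc_names) * nodes_per_dc
    step = 2 ** 127 // total
    return [(dc_names[i % len(dc_names)], i * step) for i in range(total)]


per_dc = dc_tokens([2, 2])
interleaved = alternating_tokens(["DC1", "DC2"], 2)
```

per_dc holds the offset-by-one layout and interleaved holds the alternating layout; both match the token values listed above.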
With order preserving partitioners, your key distribution will be application-dependent. You should still take your best guess at specifying initial tokens (guided by sampling actual data, if possible), but you will be more dependent on active load balancing (see below) and/or adding new nodes to hot spots.
Once data is placed on the cluster, the partitioner may not be changed without wiping and starting over.


A Cassandra cluster always divides up the key space into ranges delimited by Tokens as described above, but additional replica placement is customizable via IReplicaPlacementStrategy in the configuration file. The standard strategies are
  • RackUnawareStrategy: replicas are always placed on the next (in increasing Token order) N-1 nodes along the ring
  • RackAwareStrategy: replica 2 is placed on the first node along the ring that belongs to a different data center than the first; the remaining N-2 replicas, if any, are placed on the first nodes along the ring in the same rack as the first
Note that with RackAwareStrategy, succeeding nodes along the ring should alternate data centers to avoid hot spots. For instance, if you have nodes A, B, C, and D in increasing Token order, and instead of alternating you place A and B in DC1, and C and D in DC2, then nodes C and A will have disproportionately more data on them because they will be the replica destination for every Token range in the other data center.
  • The corollary to this is, if you want to start with a single DC and add another later, when you add the second DC you should add as many nodes as you have in the first rather than adding a node or two at a time gradually.
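The hot-spot effect described above is easy to check with a toy model. This sketch only models the first two replicas (the primary plus the "first node in another data center" rule) and counts how many token ranges each node ends up storing; it is not the actual strategy code, and the helper names are assumptions.

```python
from collections import Counter


def rack_unaware(ring, start, rf):
    """ring: node names in increasing token order; replicas follow the primary."""
    return [ring[(start + k) % len(ring)] for k in range(rf)]


def second_replica_other_dc(ring, dc_of, start):
    """First node after the primary that sits in a different datacenter."""
    for k in range(1, len(ring)):
        node = ring[(start + k) % len(ring)]
        if dc_of[node] != dc_of[ring[start]]:
            return node
    return None


def ranges_held(ring, dc_of):
    """Count the token ranges each node stores when two replicas are kept."""
    held = Counter()
    for i, primary in enumerate(ring):
        held[primary] += 1
        other = second_replica_other_dc(ring, dc_of, i)
        if other is not None:
            held[other] += 1
    return dict(held)


ring = ["A", "B", "C", "D"]
clustered = ranges_held(ring, {"A": "DC1", "B": "DC1", "C": "DC2", "D": "DC2"})
alternating = ranges_held(ring, {"A": "DC1", "B": "DC2", "C": "DC1", "D": "DC2"})
```

With A and B in DC1 and C and D in DC2, nodes A and C each hold three ranges while B and D hold one. With alternating data centers every node holds two.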
Replication factor is not really intended to be changed in a live cluster either, but increasing it is conceptually simple: update the replication_factor from the CLI (see below), then run repair against each node in your cluster so that all the new replicas that are supposed to have the data, actually do.
Until repair is finished, you have 3 options:
  • read at ConsistencyLevel.QUORUM or ALL (depending on your existing replication factor) to make sure that a replica that actually has the data is consulted
  • continue reading at lower CL, accepting that some requests will fail (usually only the first for a given query, if ReadRepair is enabled)
  • take downtime while repair runs
The same options apply to changing replication strategy.
Reducing replication factor is easily done and only requires running cleanup afterwards to remove extra replicas.
To update the replication factor on a live cluster, forget about cassandra.yaml. Rather you want to use cassandra-cli:
  • update keyspace Keyspace1 with replication_factor = 3;

Network topology

Besides datacenters, you can also tell Cassandra which nodes are in the same rack within a datacenter. Cassandra will use this to route both reads and data movement for Range changes to the nearest replicas. This is configured by a user-pluggable EndpointSnitch class in the configuration file.
EndpointSnitch is related to, but distinct from, replication strategy itself: RackAwareStrategy needs a properly configured Snitch to place replicas correctly, but even absent a Strategy that cares about datacenters, the rest of Cassandra will still be location-sensitive.
There is an example of a custom Snitch implementation in http://svn.apache.org/repos/asf/cassandra/tags/cassandra-0.6.1/contrib/property_snitch/.

Range changes


Adding new nodes is called "bootstrapping."
To bootstrap a node, turn AutoBootstrap on in the configuration file, and start it.
If you explicitly specify an InitialToken in the configuration, the new node will bootstrap to that position on the ring. Otherwise, it will pick a Token that will give it half the keys from the node with the most disk space used, that does not already have another node bootstrapping into its Range.
Important things to note:
  1. You should wait long enough for all the nodes in your cluster to become aware of the bootstrapping node via gossip before starting another bootstrap. The new node will log "Bootstrapping" when this is safe, 2 minutes after starting. (90s to make sure it has accurate load information, and 30s waiting for other nodes to start sending it inserts happening in its to-be-assumed part of the token ring.)
  2. Relating to point 1, one can only bootstrap N nodes at a time with automatic token picking, where N is the size of the existing cluster. If you need to more than double the size of your cluster, you have to wait for the first N nodes to finish until your cluster is size 2N before bootstrapping more nodes. So if your current cluster is 5 nodes and you want to add 7 nodes, bootstrap 5 and let those finish before bootstrapping the last two.
  3. As a safety measure, Cassandra does not automatically remove data from nodes that "lose" part of their Token Range to a newly added node. Run nodetool cleanup on the source node(s) (neighboring nodes that shared the same subrange) when you are satisfied the new node is up and working. If you do not do this the old data will still be counted against the load on that node and future bootstrap attempts at choosing a location will be thrown off.
  4. When bootstrapping a new node, existing nodes have to divide the key space before beginning replication. This can take a while, so be patient.
  5. During bootstrap, a node will drop the Thrift port and will not be accessible from nodetool.
  6. Bootstrap can take many hours when a lot of data is involved. See Streaming for how to monitor progress.
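The doubling limit in point 2 can be turned into a small planning helper. This is a sketch of the arithmetic only, with a made-up function name.

```python
def bootstrap_batches(current_size, nodes_to_add):
    """Split an expansion into rounds of at most current-cluster-size nodes."""
    if current_size < 1:
        raise ValueError("need at least one existing node")
    batches = []
    while nodes_to_add > 0:
        batch = min(current_size, nodes_to_add)
        batches.append(batch)
        current_size += batch       # the cluster grows once the round finishes
        nodes_to_add -= batch
    return batches
```

For the example in the text, bootstrap_batches(5, 7) yields a round of 5 followed by a round of 2. Each round must finish before the next one starts.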
Cassandra is smart enough to transfer data from the nearest source node(s), if your EndpointSnitch is configured correctly. So, the new node doesn't need to be in the same datacenter as the primary replica for the Range it is bootstrapping into, as long as another replica is in the datacenter with the new one.
Bootstrap progress can be monitored using nodetool with the netstats argument (0.7 and later) or streams (Cassandra 0.6).
During bootstrap, nodetool may report that the new node is neither receiving nor sending any streams. This is because the sending node first makes a local copy of the data it will send to the receiving node; that step shows up on the sending node as "AntiCompacting... AntiCompacted" log messages.

Moving or Removing nodes

Removing nodes entirely

You can take a node out of the cluster with nodetool decommission to a live node, or nodetool removetoken (to any other machine) to remove a dead one. This will assign the ranges the old node was responsible for to other nodes, and replicate the appropriate data there. If decommission is used, the data will stream from the decommissioned node. If removetoken is used, the data will stream from the remaining replicas.
No data is removed automatically from the node being decommissioned, so if you want to put the node back into service at a different token on the ring, it should be removed manually.

Moving nodes

nodetool move: move the target node to a given Token. Moving is essentially a convenience over decommission + bootstrap.
As with bootstrap, see Streaming for how to monitor progress.

Load balancing

If you add nodes to your cluster, your ring will be unbalanced, and the only way to get perfect balance is to compute new tokens for every node and assign them to each node manually using the nodetool move command.
Here's a python program which can be used to calculate new tokens for the nodes. There's more info on the subject at Ben Black's presentation at Cassandra Summit 2010. http://www.datastax.com/blog/slides-and-videos-cassandra-summit-2010
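def tokens(nodes):
    # Print evenly spaced RandomPartitioner tokens: x * (2**127 / nodes).
    # Integer division keeps the tokens exact on both Python 2 and 3.
    for x in range(nodes):
        print(2 ** 127 // nodes * x)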
In versions of Cassandra 0.7.* and lower, there's also nodetool loadbalance: essentially a convenience over decommission + bootstrap, only instead of telling the target node where to move on the ring it will choose its location based on the same heuristic as Token selection on bootstrap. You should not use this as it doesn't rebalance the entire ring.
The status of move and balancing operations can be monitored using nodetool with the netstats argument. (Cassandra 0.6.* and lower use the streams argument.)


Cassandra allows clients to specify the desired consistency level on reads and writes. (See API.) If R + W > N, where R, W, and N are respectively the read replica count, the write replica count, and the replication factor, all client reads will see the most recent write. Otherwise, readers may see older versions, for periods of typically a few ms; this is called "eventual consistency." See http://www.allthingsdistributed.com/2008/12/eventually_consistent.html and http://queue.acm.org/detail.cfm?id=1466448 for more.
See below about consistent backups.
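The R + W > N rule is simple enough to check mechanically. This is a minimal sketch with made-up helper names; quorum is taken to mean a majority of the N replicas.

```python
def quorum(n):
    """Smallest replica count that forms a majority of n."""
    return n // 2 + 1


def reads_see_latest_write(r, w, n):
    """True when every read set must overlap every write set."""
    return r + w > n


n = 3
quorum_both = reads_see_latest_write(quorum(n), quorum(n), n)   # 2 + 2 > 3
one_and_one = reads_see_latest_write(1, 1, n)                   # 1 + 1 > 3 fails
write_all = reads_see_latest_write(1, n, n)                     # 1 + 3 > 3
```

With a replication factor of 3, reading and writing at quorum overlaps on at least one replica, while reading and writing a single replica does not.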

Repairing missing or inconsistent data

Cassandra repairs data in two ways:
  1. Read Repair: every time a read is performed, Cassandra compares the versions at each replica (in the background, if a low consistency was requested by the reader to minimize latency), and the newest version is sent to any out-of-date replicas.
  2. Anti-Entropy: when nodetool repair is run, Cassandra computes a Merkle tree for each range of data on that node, and compares it with the versions on other replicas, to catch any out of sync data that hasn't been read recently. This is intended to be run infrequently (e.g., weekly) because computing the Merkle tree scans ALL the data on the machine and is therefore relatively expensive in disk I/O and CPU (but it is very network efficient).
Running nodetool repair: Like all nodetool operations in 0.7, repair is blocking: it will wait for the repair to finish and then exit. This may take a long time on large data sets.
It is safe to run repair against multiple machines at the same time, but to minimize the impact on your application workload it is recommended to wait for it to complete on one node before invoking it against the next.

Frequency of nodetool repair

Unless your application performs no deletes, it is vital that production clusters run nodetool repair periodically on all nodes in the cluster. The hard requirement for repair frequency is the value used for GCGraceSeconds (see DistributedDeletes). Running nodetool repair often enough to guarantee that all nodes have performed a repair in a given period GCGraceSeconds long, ensures that deletes are not "forgotten" in the cluster.
Consider how to schedule your repairs. A repair causes additional disk and CPU activity on the nodes participating in the repair, and it will typically be a good idea to spread repairs out over time so as to minimize the chances of repairs running concurrently on many nodes.
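One way to think about scheduling is to fit a full pass over the cluster inside a fraction of GCGraceSeconds and stagger the nodes evenly. The sketch below does only that arithmetic. The safety_factor parameter and the node names are assumptions, and 864000 seconds is the 10-day default GCGraceSeconds.

```python
def repair_offsets(nodes, gc_grace_seconds=864000, safety_factor=0.5):
    """Start offset in seconds for each node's repair within one cycle.

    The cycle is kept shorter than GCGraceSeconds so that a delayed run
    still finishes before tombstones become eligible for collection.
    """
    cycle = int(gc_grace_seconds * safety_factor)
    step = cycle // len(nodes)
    return {node: i * step for i, node in enumerate(nodes)}


offsets = repair_offsets(["n1", "n2", "n3", "n4"])
```

With four nodes and the defaults, repairs start 30 hours apart and the whole cycle repeats every 5 days. The offsets would then be fed to whatever scheduler you already use.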

Dealing with the consequences of nodetool repair not running within GCGraceSeconds

If nodetool repair has not been run often enough, to the point that GCGraceSeconds has passed, you risk forgotten deletes (see DistributedDeletes). In addition to deleted data popping back up, you may see inconsistencies in the data returned from different nodes that will not self-heal by read-repair or further nodetool repair. Some further details on this latter effect are documented in CASSANDRA-1316.
There are at least three ways to deal with this scenario.
  1. Treat the node in question as failed, and replace it as described further below.
  2. To minimize the number of forgotten deletes, first increase GCGraceSeconds across the cluster (rolling restart required), perform a full repair on all nodes, and then change GCGraceSeconds back again. This has the advantage of ensuring tombstones spread as much as possible, minimizing the amount of data that may "pop back up" (forgotten delete).
  3. Yet another option, which will result in more forgotten deletes than the previous suggestion but is easier to do, is to ensure 'nodetool repair' has been run on all nodes, and then perform a compaction to expire tombstones. Following this, read-repair and regular nodetool repair should cause the cluster to converge.

Handling failure

If a node goes down and comes back up, the ordinary repair mechanisms will be adequate to deal with any inconsistent data. Remember though that if a node misses updates and is not repaired for longer than your configured GCGraceSeconds (default: 10 days), it could have missed remove operations permanently. Unless your application performs no removes, you should wipe its data directory, re-bootstrap it, and removetoken its old entry in the ring (see below).
If a node goes down entirely, then you have two options:
  1. (Recommended approach) Bring up the replacement node with a new IP address, with the initial token set to (failure node's token) - 1 and AutoBootstrap set to true in cassandra.yaml (storage-conf.xml for 0.6 or earlier). This will place the replacement node in front of the failure node. Then the bootstrap process begins. While this process runs, the node will not receive reads. Once the process is finished on the replacement node, run nodetool removetoken once, supplying the token of the dead node, and nodetool cleanup on each node. You can obtain the dead node's token by running nodetool ring on any live node, unless there was some kind of outage and the others came up but not the down one; in that case, you can retrieve the token from the live nodes' system tables.
  2. (Alternative approach) Bring up a replacement node with the same IP and token as the old, and run nodetool repair. Until the repair process is complete, clients reading only from this node may get no data back. Using a higher ConsistencyLevel on reads will avoid this.
The reason why you run nodetool cleanup on all live nodes is to remove old Hinted Handoff writes stored for the dead node.

Backing up data

Cassandra can snapshot data while online using nodetool snapshot. You can then back up those snapshots using any desired system, although leaving them where they are is probably the option that makes the most sense on large clusters. nodetool snapshot triggers a node-wide flush, so all data written before the execution of the snapshot command is contained within the snapshot.
With some combinations of operating system and JVM you may receive an error related to the inability to create a process during the snapshotting, such as this on Linux:
Exception in thread "main" java.io.IOException: Cannot run program "ln": java.io.IOException: error=12, Cannot allocate memory
This is caused by the operating system trying to allocate the child "ln" process a memory space as large as the parent process (the cassandra server), even though it's not going to use it. So if you have a machine with 8GB of RAM and no swap, and you gave 6GB to the cassandra server, it will fail during this because the operating system wants 12 GB of virtual memory before allowing you to create the process.
This error can be worked around by any of the following:
  • dropping the jna.jar file into Cassandra's lib folder (requires at least Cassandra 0.6.6)
  • creating a swap file, snapshotting, removing swap file
  • turning on "memory overcommit"
To restore a snapshot:
  1. shut down the node
  2. clear out the old commitlog and sstables
  3. move the sstables from the snapshot location to the live data directory.

Consistent backups

You can get an eventually consistent backup by snapshotting all nodes; no individual node's backup is guaranteed to be consistent, but if you restore from that snapshot then clients will get eventually consistent behavior as usual.
There is no such thing as a consistent view of the data in the strict sense, except in the trivial case of writes with consistency level = ALL.

Import / export

As an alternative to taking snapshots it's possible to export SSTables to JSON format using the bin/sstable2json command:
Usage: sstable2json [-f outfile] <sstable> [-k key [-k key [...]]]
bin/sstable2json takes one required argument, the full path to an SSTable data file (a file ending in -Data.db), and an optional argument for an output file (by default, output is written to stdout). You can also pass the names of specific keys using the -k argument to limit what is exported.
Note: If you are not running the exporter on in-place SSTables, there are a couple of things to keep in mind.
  1. The corresponding configuration must be present (same as it would be to run a node).
  2. SSTables are expected to be in a directory named for the keyspace (same as they would be on a production node).
JSON exported SSTables can be "imported" to create new SSTables using bin/json2sstable:
Usage: json2sstable -K keyspace -c column_family <json> <sstable>
bin/json2sstable takes arguments for keyspace and column family names, and full paths for the JSON input file and the destination SSTable file name.
You can also import pre-serialized rows of data using the BinaryMemtable interface. This is useful for importing via Hadoop or another source where you want to do some preprocessing of the data to import.
NOTE: Starting with version 0.7, json2sstable and sstable2json must be run in such a way that the schema can be loaded from system tables. This means that cassandra.yaml must be found in the classpath and refer to valid storage directories.
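If you script exports and imports, the two usage lines above translate directly into argument lists. The sketch below only builds the command lines and does not run anything. The function names and the example paths are hypothetical; the flags are the ones shown in the usage text.

```python
def sstable2json_args(sstable, outfile=None, keys=()):
    """Build: sstable2json [-f outfile] <sstable> [-k key [-k key [...]]]"""
    args = ["bin/sstable2json"]
    if outfile is not None:
        args += ["-f", outfile]
    args.append(sstable)
    for key in keys:
        args += ["-k", key]
    return args


def json2sstable_args(keyspace, column_family, json_path, sstable):
    """Build: json2sstable -K keyspace -c column_family <json> <sstable>"""
    return ["bin/json2sstable", "-K", keyspace, "-c", column_family,
            json_path, sstable]


# Hypothetical paths, for illustration only.
export_cmd = sstable2json_args(
    "/data/Keyspace1/Standard1-1-Data.db", outfile="dump.json", keys=["k1", "k2"]
)
import_cmd = json2sstable_args(
    "Keyspace1", "Standard1", "dump.json", "/data/Keyspace1/Standard1-2-Data.db"
)
```

Either list can be handed to subprocess.run(args, check=True) from the Cassandra install directory, subject to the configuration requirements in the notes above.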


Running nodetool cfstats can provide an overview of each Column Family, and important metrics to graph your cluster. Cassandra also exposes internal metrics as JMX data. This is a common standard in the JVM world; OpenNMS, Nagios, and Munin at least offer some level of JMX support. For a non-stupid JMX plugin for Munin, check out jmx2munin (https://github.com/tcurdt/jmx2munin). The specifics of the JMX interface are documented at JmxInterface.
Some folks prefer to deal with non-JMX clients; for them there is a JMX-to-REST bridge available at http://code.google.com/p/polarrose-jmx-rest-bridge/ and, although bridging to SNMP is a bit more work, it can be done with jmx2snmp (https://github.com/tcurdt/jmx2snmp).
Important metrics to watch on a per-Column Family basis would be: Read Count, Read Latency, Write Count and Write Latency. Pending Tasks tell you if things are backing up. These metrics can also be exposed using any JMX client such as jconsole. (See also http://simplygenius.com/2010/08/jconsole-via-socks-ssh-tunnel.html for how to proxy JConsole to firewalled machines.)
You can also use jconsole, and the MBeans tab to look at PendingTasks for thread pools. If you see one particular thread backing up, this can give you an indication of a problem. One example would be ROW-MUTATION-STAGE indicating that write requests are arriving faster than they can be handled. A more subtle example is the FLUSH stages: if these start backing up, cassandra is accepting writes into memory fast enough, but the sort-and-write-to-disk stages are falling behind.
If you are seeing a lot of tasks being built up, your hardware or configuration tuning is probably the bottleneck.
Running nodetool tpstats will dump all of those threads to console if you don't want to use jconsole. Example:
Pool Name                    Active   Pending      Completed
FILEUTILS-DELETE-POOL             0         0            119
MESSAGING-SERVICE-POOL            3         4       81594002
STREAM-STAGE                      0         0              3
RESPONSE-STAGE                    0         0       48353537
ROW-READ-STAGE                    0         0          13754
LB-OPERATIONS                     0         0              0
COMMITLOG                         1         0       78080398
GMFD                              0         0        1091592
MESSAGE-DESERIALIZER-POOL         0         0      126022919
LB-TARGET                         0         0              0
CONSISTENCY-MANAGER               0         0           2899
ROW-MUTATION-STAGE                1         2       81719765
MESSAGE-STREAMING-POOL            0         0            129
LOAD-BALANCER-STAGE               0         0              0
FLUSH-SORTER-POOL                 0         0            218
MEMTABLE-POST-FLUSHER             0         0            218
COMPACTION-POOL                   0         0            464
FLUSH-WRITER-POOL                 0         0            218
HINTED-HANDOFF-POOL               0         0            154
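Output in this shape is easy to scan automatically for pools that are backing up. The parser below is a sketch that assumes the four-column layout shown above (name, active, pending, completed); the function name and threshold parameter are made up.

```python
def backed_up_pools(tpstats_output, threshold=0):
    """Return {pool name: pending count} for pools above the threshold."""
    pending = {}
    for line in tpstats_output.splitlines():
        parts = line.split()
        # Data rows have a name plus three integer columns; skip anything else.
        if len(parts) != 4 or not all(p.isdigit() for p in parts[1:]):
            continue
        name, _active, waiting, _completed = parts
        if int(waiting) > threshold:
            pending[name] = int(waiting)
    return pending


sample = """\
Pool Name                    Active   Pending      Completed
MESSAGING-SERVICE-POOL            3         4       81594002
ROW-READ-STAGE                    0         0          13754
ROW-MUTATION-STAGE                1         2       81719765
FLUSH-WRITER-POOL                 0         0            218
"""
flagged = backed_up_pools(sample)
```

For the sample rows, MESSAGING-SERVICE-POOL and ROW-MUTATION-STAGE are flagged. A small pending count at a single instant is normal, so in practice you would sample repeatedly and look for counts that keep growing.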

Monitoring with MX4J

mx4j provides an HTML and HTTP interface to JMX. Starting from version 0.7.0 cassandra lets you hook up mx4j very easily. To enable mx4j on a Cassandra node:
  • Download mx4j-tools.jar from http://mx4j.sourceforge.net/
  • Add mx4j-tools.jar to the classpath (e.g. under lib/)
  • Start cassandra
  • In the log you should see a message such as HttpAdaptor started on port 8081
  • To choose a different port (8081 is the default) or a different listen address ( is not the default) edit conf/cassandra-env.sh and uncomment #MX4J_ADDRESS="-Dmx4jaddress=" and #MX4J_PORT="-Dmx4jport=8081"
Now browse to http://cassandra:8081/ and use the HTML interface.
If you want XML then add &template=identity to the end of any URL, e.g. http://cassandra:8081/?&template=identity
