Tuesday, June 28, 2011

Cassandra RandomPartitioner Vs OrderPreservingPartitioner


When building a Cassandra cluster, the “key” question (sorry, that’s weak) is whether to use the RandomPartitioner (RP), or the OrderPreservingPartitioner (OPP). These control how your data is distributed over your nodes. Once you have chosen your partitioner, you cannot change it without wiping your data, so think carefully!
For Cassandra newbies, like me and my team of HBasers wanting to try a quick port of our project (more on why in another post), nailing the exact issues is quite daunting. So here is a quick summary.
What OPP gives you
Using OPP provides you with three obvious advantages over RP:
1. You can perform range slices. That is, you can scan over ranges of your rows as though you were moving a cursor through a traditional index. For example, if you are using user ids as your keys, you could scan over the rows for users whose names begin with J, e.g. jake, james, jamie etc.
2. You can store real time full text indexes inside Cassandra, which are built using the aforementioned feature, e.g. see Lucandra.
3. If you screw up, you can scan over your data to recover/delete orphaned keys.
***UPDATE*** Since v0.6 you *can* now scan your keys when using RP, although obviously not in any particular order. Typically you request a page of rows starting with the empty ("") key, and then use the apparently random end key from the page as the start key when you request another page. At the time of writing, this method only seems to work with KeyRange not TokenRing. If you are using Java to access Cassandra read the change log for v0.804 of Pelops.
Given that Web applications typically need/benefit from the above, the question is why would you *not* use OPP? The answer is a nuanced one about load balancing.
The problem with OPP
With both RP and OPP, by default Cassandra will tend to evenly distribute individual keys and their corresponding rows over the nodes in the cluster. The default algorithm is nice and simple: every time you add a new node, it will assign a range of keys to that node such that it takes responsibility for half the keys stored on the node that currently stores the most keys (more on options for overriding the default behaviour later).
The nuance is that this simple default algorithm will tend to lead to good load balancing when RP is used, but not necessarily when OPP is used.
The reason is that although the algorithm succeeds in assigning key ranges such that as your cluster scales nodes receive roughly similar numbers of keys, with OPP on any given node those keys are unlikely to be drawn equally from the different column families present within your database…
If the distribution of keys used by individual column families is different, their sets of keys will not fall evenly across the ranges assigned to nodes. Thus nodes will end up storing preponderances of keys (and the associated data) corresponding to one column family or another. If as is likely column families store differing quantities of data with their keys, or store data accessed according to differing usage patterns, then some nodes will end up with disproportionately more data than others, or serving more “hot” data than others. <yikes!>
By contrast, when using RP the distribution of the keys occurring within individual column families does not matter. This is because an MD5 hash of keys is used as the “real” key by the system for the purposes of locating the key and data on nodes (the MD5 hashes randomly map any input key to a point in the 0..2**127 range). The result is that the keys from each individual column family are spread evenly across the ranges/nodes, meaning that data and access corresponding to those column families is evenly distributed across the cluster.
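To make the hashing step concrete, here is a minimal sketch of how a row key could be mapped to a point in the 0..2**127 range. The class and method names are hypothetical, and taking the absolute value of the signed digest is an assumption about how the mapping might be done, not a copy of Cassandra's own code.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, for illustration only.
final class Md5Token {
    /** Maps a row key to a non-negative integer derived from its MD5 digest. */
    static BigInteger tokenFor(String rowKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(rowKey.getBytes(StandardCharsets.UTF_8));
            // The 16 digest bytes read as a signed 128-bit number lie in
            // -2**127 .. 2**127-1; the absolute value lies in 0 .. 2**127.
            return new BigInteger(digest).abs();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        // Neighbouring keys end up at unrelated points of the token space.
        for (String key : new String[] {"jake", "james", "jamie"}) {
            System.out.println(key + " -> " + tokenFor(key));
        }
    }
}
```

Because adjacent keys such as jake and james receive unrelated tokens, any ordering in the original keys is lost, which is exactly why load spreads evenly and why ordered range scans are not available.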
If you must have OPP
You may quite reasonably feel that you must have the range scan features that come with OPP, for example because you want to use Lucandra. The question then becomes how you can ameliorate the aforementioned problems with load balancing.
The best you can do is to identify the data upon which you do not need to perform range scans. This data can then be randomly distributed across your cluster using a simple idiom where the key is actually written as <MD5(ROWKEY)>.<ROWKEY>
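A minimal sketch of the <MD5(ROWKEY)>.<ROWKEY> idiom follows. The class name, the hex encoding of the digest and the helper for recovering the original key are all assumptions made for illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, for illustration only.
final class DecoratedKey {
    private static final int MD5_HEX_LENGTH = 32;

    /** Returns "<md5 hex of rowKey>.<rowKey>" so that keys scatter under OPP. */
    static String decorate(String rowKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(rowKey.getBytes(StandardCharsets.UTF_8));
            // Zero-padded so every prefix has the same width.
            String hex = String.format("%032x", new BigInteger(1, digest));
            return hex + "." + rowKey;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Strips the fixed-width hash prefix and the separator. */
    static String undecorate(String storedKey) {
        return storedKey.substring(MD5_HEX_LENGTH + 1);
    }
}
```

Since the prefix is computed from the row key itself, a reader who knows the row key can always rebuild the stored key for a point lookup; only range scans over the original key order are given up.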
But be clear, the items whose keys must be undecorated (because you wish to perform range scans over them), may still not map evenly onto the key ranges held by the nodes. The only recourse you have then, is to consider manually specifying the key ranges assigned to nodes. This is typically done when you bootstrap a new node, but you can also rebalance an existing cluster by simply decommissioning nodes, deleting their data, and then bootstrapping them back in. To do this safely, you obviously have to do this one at a time, but then I’m sure I didn’t have to tell you that…
You can see where this is going now right? You’ve just made a whole load of work for yourself, and anyway, even if you have the time, if you have lots of different column families with widely differing key distributions then getting load balancing right is going to be a nightmare.
This is the basic reason that fully seasoned Cassandra heads, in my experience, seem to prefer RP *unless* a mono use setup is proposed, for example where a cluster is used simply to store a full-text index with Lucandra.
If you have a database with a seriously heterogeneous set of column families, and need range scans, you might now be thinking you should actually be using HBase, which is designed for this. That would not be a bad choice (!), but there are good reasons for hanging with Cassandra if you can, which I will cover in a future post. Read on…
If you must use RP (very likely)
So having delved a little more deeply into the implications of OPP, you decide you really should go with RP. But, what to do with those indexes you need?
Well, first of all there is a really simple if brutal solution: simply store your index inside a single column family row as a series of columns. Since Cassandra can in principle cope with millions of columns, this is perfectly possible. Although it is true each index won’t be distributed across your whole cluster, the load will at the least be distributed across the nodes holding the replicas. If you use a typical replication factor (RF) of 3 the load associated with each index will be shared by 3 nodes etc.
In the vast majority of cases, this will be enough, and it will be sufficient that the rest of your data is properly balanced across your cluster.
But, I hear you saying, this is too brutal. Your index is too massive to fit on 3 nodes, is extremely hot and this just won’t work. You moved to Cassandra because you want your load distributed across your entire cluster. Period.
This is a perfectly reasonable point of view.
The only solution in this case is to build an index system over the top of the simple hashmap provided. We are taking this approach, and it will be elaborated with some sample code in a later post.
Basic indexing strategy for RP
For those that need to know the basic strategy now, here it is: you need to start off with the simple approach where you store your entire index using columns under a single key. As the number of columns grows past some threshold you define, the columns should be split such that half the entries are migrated to a new key/row. Thus the index is split across the cluster evenly.
Each range can be stored under a key named in a predictable way, for example <INDEX>.<SPLIT NO.> The start and end index entries stored in each split should themselves be stored in a dedicated column family that is used to record index meta information using the same key name, ensuring that the meta information is also distributed.
You can then progressively test the existence of splits simply by attempting to open the key for the meta that would be used to describe the split. If you can retrieve the meta information, you know that the split also exists. It won’t be necessary to cache this information to make the process reasonably performant – Cassandra already caches data in memory, and also uses Bloom filters to determine whether or not a requested row exists (Bloom filters enable a Cassandra node to rapidly determine whether it holds a key without traversing its list of keys).
There you have it, an index offering range scans fully distributed over your cluster!
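The strategy above can be sketched in memory as follows. Two plain maps stand in for the index column family and the meta column family, only the start entry of each split is recorded, and every name here (SplitIndex, maxColumns and so on) is hypothetical. It is a toy model of the idea, not the implementation promised for the later post.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model: rows are keyed "<INDEX>.<SPLIT NO.>" as described above.
final class SplitIndex {
    private final String name;
    private final int maxColumns;
    // Stand-in for the index column family: row key -> sorted columns.
    private final Map<String, TreeMap<String, String>> rows = new HashMap<>();
    // Stand-in for the meta column family: row key -> first entry of the split.
    private final Map<String, String> metaStart = new HashMap<>();

    SplitIndex(String name, int maxColumns) {
        this.name = name;
        this.maxColumns = maxColumns;
        createSplit("", new TreeMap<>());
    }

    private String key(int splitNo) { return name + "." + splitNo; }

    /** Counts splits by probing for meta rows, as the text suggests. */
    int splitCount() {
        int n = 0;
        while (metaStart.containsKey(key(n))) n++;
        return n;
    }

    private void createSplit(String start, TreeMap<String, String> columns) {
        String rowKey = key(splitCount());
        rows.put(rowKey, columns);
        metaStart.put(rowKey, start);
    }

    /** Finds the split with the greatest start entry that is <= entry. */
    private String splitFor(String entry) {
        String best = null;
        for (int n = 0; metaStart.containsKey(key(n)); n++) {
            String start = metaStart.get(key(n));
            if (start.compareTo(entry) <= 0
                    && (best == null || start.compareTo(metaStart.get(best)) > 0)) {
                best = key(n);
            }
        }
        return best;
    }

    void put(String entry, String value) {
        TreeMap<String, String> columns = rows.get(splitFor(entry));
        columns.put(entry, value);
        if (columns.size() > maxColumns) {
            // Past the threshold: migrate the upper half to a new row.
            int middle = columns.size() / 2, i = 0;
            String mid = null;
            for (String c : columns.keySet()) { if (i++ == middle) { mid = c; break; } }
            TreeMap<String, String> upper = new TreeMap<>(columns.tailMap(mid, true));
            columns.tailMap(mid, true).clear();
            createSplit(mid, upper);
        }
    }

    /** Range scan over [from, to); a real version would use meta to skip splits. */
    TreeMap<String, String> range(String from, String to) {
        TreeMap<String, String> out = new TreeMap<>();
        for (int n = 0; metaStart.containsKey(key(n)); n++) {
            out.putAll(rows.get(key(n)).subMap(from, true, to, false));
        }
        return out;
    }
}
```

For example, with a threshold of 4 columns, inserting jake, james, jamie, anna and zoe triggers one split, after which a scan from "j" to "k" still returns jake, james and jamie in order even though they now live under two different row keys.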
Full text search sanity check
Implementing a full text index will of course involve more work than a simple left-side/ISAM style index, although the principles are the same. Given the existence of Lucandra though, I would suggest that before proceeding to create your full text index using the described approach, you first examine another possibility: running your full text searches off a dedicated cluster.
If you are running in the cloud, for example on EC2 or Rackspace Cloud, you can start your dedicated full text search cluster at low cost on small instances that can be scaled up if necessary later. Otherwise, consider virtualization or configuring Cassandra to run two clusters in parallel on the same nodes (more on this possibility in a later post).
The beauty of open source is that many problems have already been solved for you, and Lucandra is too good an opportunity to miss if you need full text search on Cassandra.

Cassandra Hector Api - Querying by timestamp

You can do this with the SliceQuery provided by the Hector API.

Let's take an example where you want the results to be fetched based on time of posting.

Here are the simple steps you need to take.

Create the column family with a LongType comparator, so that we can use System.currentTimeMillis() as the column name.

create column family MyCF with column_type = 'Standard' and comparator = 'LongType';

You can use the SliceQuery to get the records within a particular time range, or the most recent or oldest records, sorted ascending or descending within the range, and you can also limit the number of returned results.

Here is the code to do the querying

Let's try out the example with a SuperColumn name as a Long

StringSerializer ss = StringSerializer.get();
SuperSliceQuery<String, Long, String, String> query = HFactory.createSuperSliceQuery(keyspace, ss, 
LongSerializer.get(), ss, ss);

Here the SuperColumn name serializer is LongSerializer.get().
Similarly, for a Column name as a Long, you need to make the column name serializer LongSerializer.get().


query.setColumnFamily(MY_CF);
query.setKey(myRowKey);



// sortOrder == 0: ascending. Pass start then end with reversed = false;
// limit is the number of results to be returned.
if (sortOrder == 0)
    query.setRange(startTimePoint, endTimePoint, false, limit);
// sortOrder == 1: descending. Since we want the results in descending order,
// pass end then start with reversed = true.
else if (sortOrder == 1)
    query.setRange(endTimePoint, startTimePoint, true, limit);


//    query.setRange(long startTime,long endTime, boolean reverseOrder, int limit);




Similarly we can do this for Standard type ColumnFamily using SliceQuery instead of SuperSliceQuery
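The start, end, reversed and limit semantics used above can be tried out without a cluster. The sketch below is an in-memory stand-in, not Hector code: a sorted map plays the part of one row whose column names are timestamps, and the class and method names are hypothetical. It assumes both bounds are inclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory stand-in for a row with LongType column names.
final class TimeSlice {
    /**
     * Returns up to limit column names between start and finish (inclusive).
     * When reversed is true, start must be the later timestamp, mirroring
     * the "end to start" argument order described above.
     */
    static List<Long> slice(NavigableMap<Long, String> row,
                            long start, long finish, boolean reversed, int limit) {
        NavigableMap<Long, String> view = reversed
                ? row.subMap(finish, true, start, true).descendingMap()
                : row.subMap(start, true, finish, true);
        List<Long> names = new ArrayList<>();
        for (Long name : view.keySet()) {
            if (names.size() == limit) break;
            names.add(name);
        }
        return names;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> row = new TreeMap<>();
        for (long t = 10; t <= 40; t += 10) row.put(t, "post@" + t);
        System.out.println(slice(row, 10, 30, false, 2)); // oldest first
        System.out.println(slice(row, 30, 10, true, 2));  // newest first
    }
}
```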

Hope this was useful. Please leave a comment if it was useful for you.




exception connecting to localhost/9160. reason: connection refused

Connection refused when starting the Cassandra CLI.


Go to the root directory of Cassandra:


bin/cassandra-cli --host {host-ip} --port 9160


If you are having trouble with this, check your {cassandra-root-directory}/conf/cassandra.yaml.


The Thrift address (rpc_address) is the address used as the host-ip for connecting to the CLI.
Make it your local IP, and if you are having trouble connecting on port 9160, try changing rpc_port to 8070 and then connect to cassandra-cli using the command


bin/cassandra-cli --host {local-IP} --port 8070




This worked for me; I hope it works for you too.
Leave a comment if you find this useful.

Cassandra Hector - All Host Pools Marked Down


2011-02-22 18:59:09,894 [main] ERROR (CassandraService.java:2023) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,894 [main] ERROR (CassandraService.java:2023) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,895 [main] ERROR (CassandraService.java:2023) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,895 [main] ERROR (CassandraService.java:2023) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,895 [main] ERROR (CassandraService.java:2023) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,895 [main] ERROR (CassandraService.java:2023) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,895 [main] ERROR (CassandraService.java:2099) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,895 [main] ERROR (CassandraService.java:2099) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,896 [main] ERROR (CassandraService.java:582) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,896 [main] ERROR (CassandraService.java:582) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,896 [main] ERROR (CassandraService.java:2099) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
2011-02-22 18:59:09,896 [main] ERROR (CassandraService.java:2099) -
me.prettyprint.hector.api.exceptions.HectorException: All host pools
marked down. Retry burden pushed out to client.
This means no connections to the cluster were established. Can you
verify connectability from the third host to the cluster? Might there
be a firewall adjustment or similar required?

Cassandra Hector throwing error even after Cassandra is up again


Hector state when a cassandra node goes down

Hi,

I am using hector-0.7.0-26 in my application, persisting data over the
network to a Cassandra cluster of 3 nodes.
Everything works well until I manually bring down one of the Cassandra
nodes. I need to bring down a Cassandra node as part of negative
testing on my application. After the node goes down, I see Hector
exceptions on the client side, and it seems that Hector is trying to
connect to the downed host.
Below is the exception trace, which is seen repeatedly in the logs.
My question is: when Hector sees that the node is down, doesn't it
close the connections to the node and stop trying again until it
detects the node to be up again?
What should be done on the client side (while using Hector) to ensure
that Hector cleans up the connections to a dead node and stops trying
to reuse it?

[pool-1-thread-1] ERROR (HThriftClient.java:88) - Unable to open transport to asp.corp.apple.com(17.108.122.70):9162
org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused
 at org.apache.thrift.transport.TSocket.open(TSocket.java:185)
 at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
 at me.prettyprint.cassandra.connection.HThriftClient.open(HThriftClient.java:84)
 at me.prettyprint.cassandra.connection.CassandraHostRetryService$RetryRunner.verifyConnection(CassandraHostRetryService.java:114)
 at me.prettyprint.cassandra.connection.CassandraHostRetryService$RetryRunner.run(CassandraHostRetryService.java:94)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
 at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:619)
Caused by: java.net.ConnectException: Connection refused
 at java.net.PlainSocketImpl.socketConnect(Native Method)
 at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:333)
 at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:195)
 at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:182)
 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
 at java.net.Socket.connect(Socket.java:529)
 at org.apache.thrift.transport.TSocket.open(TSocket.java:180)
 ... 13 more

Actually this is working as anticipated. The following line from the
stack trace:

me.prettyprint.cassandra.connection.HThriftClient.open(HThriftClient.java:

indicates this is the host retry service (running in a background
thread) attempting to connect to the downed host every 10 seconds (by
default). What was just fixed in master and tip of 0.7.0 was an issue
with incorrect handling of UnavailableException on a consistency level
failure. This will be released at some point today (marked as
0.7.0-28).

On looking at the above trace again, that error could probably be more
clear about what is going on. I'll clean that up today as well.

Cassandra Hector Failover


The Hector devs have added a very simple load balancing feature, as well as improved failover behavior, to Hector. Hector is a Java Cassandra client; to read more about it please see my previous post Hector – a Java Cassandra client.
In version 0.5.0-6 I added poor-man’s load balancing as well as improved failover behavior.
The interface CassandraClientPool used to have this method for obtaining clients:
/**
 * Borrows a client from the pool defined by url:port
 * @param url
 * @param port
 * @return
 */
CassandraClient borrowClient(String url, int port)
    throws IllegalStateException, PoolExhaustedException, Exception;
Now with the added LB and failover it has:
/**
 * Borrow a load-balanced client, a random client from the array of given client addresses.
 *
 * This method is typically used to allow load balancing b/w the list of given client URLs. The
 * method will return a random client from the array of the given url:port pairs.
 * The method will try connecting each host in the list and will only stop when there's one
 * successful connection, so in that sense it's also useful for failover.
 *
 * @param clientUrls An array of "url:port" cassandra client addresses.
 *
 * @return A randomly chosen client from the array of clientUrls.
 * @throws Exception
 */
CassandraClient borrowClient(String[] clientUrls) throws Exception;
And usage looks like this:
// Get a connection to any of the hosts cas1, cas2 or cas3
CassandraClient client = pool.borrowClient(new String[] {"cas1:9160", "cas2:9160", "cas3:9160"});
So, when calling borrowClient(String[]) the method randomly chooses any of the clients in the array and connects to it. That’s what I call poor man’s load balancing: just plain dumb random, not real load balancing. By all means, true load balancing which takes into account performance measurements such as response time and throughput is infinitely better than the plain random selection I’m employing here, and in my opinion it should be left to your ops folks to deal with and not to the program. However, if you only need a very simplistic approach of random selection, then this method may suit your needs.
A nice side effect of using this method is improved failover. In previous versions hector implemented failover, but in order to find out about the ring structure it had to connect to at least one host in the ring first and query it to learn about the rest. The result was that if a new connection happened to be made to an unavailable host, the new client could not connect to that host to learn about other live hosts, so it failed right away. With this new method, which takes an array of hosts, the client keeps connecting to hosts in the list in random order until it finds one that’s up. In the example above the client may choose to connect to cas2 first; if cas2 is down it’ll try to connect to (say) cas3 and if cas3 is also down it’ll try to connect to cas1; only if all three hosts are down will it give up and return an error. Failing to connect to hosts is considered an error, but a recoverable error, so it’s transparent to the client of hector but is reported to JMX and has its own special counter (RecoverableLoadBalancedConnectErrors).
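The try-hosts-in-random-order behavior described above can be sketched as follows. This is not Hector's implementation: the class name is hypothetical, and the connection attempt is abstracted into a predicate supplied by the caller so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of random selection with failover.
final class RandomFailover {
    /** Returns the first reachable host, trying the given hosts in random order. */
    static String pickHost(String[] clientUrls, Predicate<String> canConnect) {
        List<String> candidates = new ArrayList<>(Arrays.asList(clientUrls));
        Collections.shuffle(candidates);
        for (String url : candidates) {
            if (canConnect.test(url)) {
                return url;
            }
            // A failed attempt is recoverable: a real client would bump a
            // counter here and move on to the next candidate.
        }
        throw new IllegalStateException(
                "All hosts are down: " + Arrays.toString(clientUrls));
    }
}
```

Calling pickHost with cas1:9160, cas2:9160 and cas3:9160 and a predicate that only accepts cas3:9160 always returns cas3:9160, whatever order the shuffle produces; an exception is thrown only when the predicate rejects every host.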

Cassandra Data Model Pitfalls


For me, the main thing is a decision whether to use the OrderPreservingPartitioner or RandomPartitioner.
If you use the RandomPartitioner, range scans are not possible. This means that you must know the exact key for any activity, INCLUDING CLEANING UP OLD DATA.
So if you've got a lot of churn, unless you have some magic way of knowing exactly which keys you've inserted stuff for, using the random partitioner you can easily "lose" stuff, which causes a disc space leak and will eventually consume all storage.
On the other hand, you can ask the ordered partitioner "what keys do I have in Column Family X between A and B" ? - and it'll tell you. You can then clean them up.
However, there is a downside as well. As Cassandra doesn't do automatic load balancing, if you use the ordered partitioner, in all likelihood all your data will end up in just one or two nodes and none in the others, which means you'll waste resources.
I don't have any easy answer for this, except you can get "best of both worlds" in some cases by putting a short hash value (of something you can enumerate easily from other data sources) on the beginning of your keys - for example a 16-bit hex hash of the user ID - which will give you 4 hex digits, followed by whatever the key is you really wanted to use.
Then if you had a list of recently-deleted users, you can just hash their IDs and range scan to clean up anything related to them.
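A minimal sketch of the short hash prefix idea follows. The choice of String.hashCode() as the hash, the ":" separator and all names are assumptions; any stable hash of the user ID would do.

```java
// Hypothetical helper, for illustration only.
final class PrefixedKey {
    /** Four hex digits taken from the low 16 bits of the user ID's hash. */
    static String prefix(String userId) {
        return String.format("%04x", userId.hashCode() & 0xFFFF);
    }

    /** Builds "<4 hex digits><userId>:<rest of key>". */
    static String key(String userId, String rest) {
        return prefix(userId) + userId + ":" + rest;
    }

    /** Inclusive lower bound of the range holding everything for this user. */
    static String scanStart(String userId) {
        return prefix(userId) + userId + ":";
    }

    /** Upper bound: the same stem followed by the largest char value. */
    static String scanEnd(String userId) {
        return prefix(userId) + userId + ":" + '\uffff';
    }
}
```

To clean up after a deleted user you would recompute scanStart and scanEnd from the user ID alone and range scan between them, while the 4-digit prefix spreads different users across the key space.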
The next tricky bit is secondary indexes - Cassandra doesn't have any - so if you need to look up X by Y, you need to insert the data under both keys, or have a pointer. Likewise, these pointers may need to be cleaned up when the thing they point to doesn't exist, but there's no easy way of querying stuff on this basis, so your app needs to Just Remember.
And application bugs may leave orphaned keys that you've forgotten about, and you'll have no way of easily detecting them, unless you write some garbage collector which periodically scans every single key in the db (this is going to take a while - but you can do it in chunks) to check for ones which aren't needed any more.
None of this is based on real usage, just what I've figured out during research. We don't use Cassandra in production.

Cassandra Load Balancing


Modifying Schema on a Live Cluster

This page discusses features available in 0.7.

Under the Hood

A new system table called definitions keeps track of two things: keyspace definitions (SCHEMA_CF) and keyspace changes (MIGRATIONS_CF). TimeUUIDs are used throughout to match migrations up with schema and vice-versa.

Keyspace Definitions (SCHEMA_CF)

The current set of keyspace definitions are stored in a single row, one keyspace per column with a TimeUUID as the row key (also serves as version identifier), keyspace name as column name, and definition serialization as the column value. There exists a special row, keyed by "Last Migration" that contains a single column indicating the current schema version UUID. This makes it easy to look up the version and then retrieve it.

Migrations (MIGRATIONS_CF)

MIGRATIONS_CF tracks the individual modifications (add, drop, rename) that are made to the schema. It consists of a single row keyed by "Migrations Key" with one column per migration. Each column has the migration version UUID as its name, with the serialized migration as its value.

Operations

Client Operation

  • Add column family or keyspace
  • Drop column family or keyspace
  • Rename column family or keyspace
These are all executed via the Thrift API. It is expected that you have ALL access if you are using security. For rename and drop operations the client will block until all associated files are renamed or deleted.

Server Migration Process

Applying a migration consists of the following steps:
  1. Generate the migration, which includes a new version UUID.
    1. DROPs only: snapshot the data that is going away.
  2. Update SCHEMA_CF with a new schema row.
  3. Update MIGRATIONS_CF by appending a migration column.
  4. Update the "Last Migration" row in SCHEMA_CF.
  5. Flush the definitions table.
  6. Update runtime data structures (create directories, do deletions, etc.)

Handling Failure

A node can fail during any step of the update process. Here is an examination of what will happen if a node fails after each part of the update process (see Server Migration Process above).
  1. Nothing has been applied. Update fails outright.
    1. Same. You will have an extra snapshot though.
  2. Extra data exists in SCHEMA_CF but will be ignored because "Last Migration" was not updated.
  3. Extra data exists in SCHEMA_CF and MIGRATIONS_CF but will be ignored because "Last Migration" was not updated.
  4. Broken: commit log will not be replayed until *after* schemas are loaded on restart. This means that the "Last Migration" will be read, but will not be able to be loaded and applied.
  5. Startup will happen normally.
  6. Startup will happen normally.
If a node crashes during a migration, chances are you will have to do some manual cleanup. For example, if a node crashes after steps 4 or 5 of a DROP migration, you will need to manually delete the data files. (Not deleting them does no harm unless you 'recreate' the same CF via ADD later on. Then you have an instant database.)

Starting Up

When a node starts up, it checks SCHEMA_CF to find out the latest schema version it has. If it finds nothing (as would happen with a new cluster), it loads nothing and logs a warning. Otherwise, it uses the uuid it just read in to load the correct row from SCHEMA_CF. That row is deserialized into one or more keyspace definitions which are then loaded in a manner similar to the load-from-xml approach used in the past.
At the same time, the node incorporates its schema version UUID into the gossip digests it sends to other nodes. It may be the case that this node does not have the latest schema definitions (as a result of network partition, bootstrapping a new node, or any other reason you can think of). When a version mismatch is detected the definition promulgation mechanism described next is invoked.

Definition Promulgation

Definition promulgation consists of two phases: announce and push. announce is a way for node A to declare to node B "this is the schema version I have". If the versions are equal, the message is ignored. If A is newer, B responds with an announce to A (this functions as a request for updates). If A is older, B responds with a push containing all the migrations from B that A doesn't have.
When a schema update originates from the client (Thrift), gossip promulgation is bypassed and this announce-announce-push approach is used to push migrations to other nodes.
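The announce rules can be condensed into a small decision function. In this sketch schema versions are plain long values where a larger number means newer; that simplification, and every name used, is an assumption (the real versions are TimeUUIDs).

```java
// Hypothetical model of how node B reacts to an announce from node A.
final class Promulgation {
    enum Reply { IGNORE, ANNOUNCE, PUSH }

    static Reply onAnnounce(long versionOfA, long versionOfB) {
        if (versionOfA == versionOfB) {
            return Reply.IGNORE;      // same schema, nothing to do
        }
        if (versionOfA > versionOfB) {
            return Reply.ANNOUNCE;    // B is behind: announce back to request updates
        }
        return Reply.PUSH;            // A is behind: push the migrations A lacks
    }
}
```

When B answers with its own announce, A runs the same function with the roles swapped and ends up in the PUSH branch, which gives the announce-announce-push sequence.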

New Cluster (Fresh 0.7)

For new clusters, things will work best if you start with one node and apply migrations using Thrift until you get the schema you want. Then bring new nodes online and they will pull migrations from the first node (or each other in a large cluster).
Alternatively, you could then shut down the first node and manually copy its SCHEMA_CF and MIGRATIONS_CF to each new node in the cluster.
The simplest method of applying these schema changes is with bin/cassandra-cli. You can either do this interactively, or place the commands in a file and apply them in batch mode (type help and help <command> to see the available commands). For example:
$ cat schema.txt
/* Create a new keyspace */
create keyspace Keyspace1 with replication_factor = 3 and placement_strategy = 'org.apache.cassandra.locator.RackUnawareStrategy';

/* Switch to the new keyspace */
use Keyspace1;

/* Create new column families */
create column family Standard1 with column_type = 'Standard' and comparator = 'BytesType';
create column family Standard2 with column_type = 'Standard' and comparator = 'UTF8Type' and rows_cached = 10000;
$ bin/cassandra-cli --host localhost --batch < schema.txt

Existing Cluster (Upgrade from 0.6)

To provide some backwards compatibility, we've provided a JMX method in the StorageServiceMBean that can be used to manually load schema definitions from storage-conf.xml. This is a one-shot operation though, and will only work on a system that contains no existing migrations. If you are upgrading a cluster, you will probably only have to do this for one node (a seed). Gossip will take care of promulgating the changes to the rest of the nodes as they come online.
For those who don't know how to do it (like me):
ps aux | grep cassandra # get pid of cassandra
jconsole PID
MBeans -> org.apache.cassandra.db -> StorageService -> Operations -> loadSchemaFromYAML
Lastly, there is a system tool that can poke the same JMX method without having to remember its location:
bin/schematool HOST PORT import

Concurrency

It is entirely possible and expected that a node will receive migration pushes from multiple nodes. Because of this, all migrations are applied on a single-threaded stage and versions are checked throughout to make sure that no migration is applied twice, and no migration is applied out of sync.
Each migration knows the version UUID of the migration that immediately precedes it. If a node is asked to apply a migration and its current version UUID does not match the last version UUID of the migration, the migration is discarded.
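The version check can be sketched like this. The class names are hypothetical, and a synchronized method stands in for the single-threaded stage mentioned above.

```java
import java.util.UUID;

// Hypothetical model of a node that applies chained migrations.
final class SchemaNode {
    static final class Migration {
        final UUID previousVersion;
        final UUID newVersion;

        Migration(UUID previousVersion, UUID newVersion) {
            this.previousVersion = previousVersion;
            this.newVersion = newVersion;
        }
    }

    private UUID currentVersion;

    SchemaNode(UUID initialVersion) {
        this.currentVersion = initialVersion;
    }

    /** Applies the migration only if it directly follows the current version. */
    synchronized boolean apply(Migration m) {
        if (!m.previousVersion.equals(currentVersion)) {
            return false; // duplicate or out-of-sync migration: discard it
        }
        currentVersion = m.newVersion;
        return true;
    }

    synchronized UUID currentVersion() {
        return currentVersion;
    }
}
```

A migration pushed by two different nodes is applied the first time and discarded the second time, because by then the node's current version no longer matches the migration's predecessor.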
One weakness of this model is that it is vulnerable if a new update starts before another update is promulgated to all live nodes--only one migration can be active within a cluster at any time. One way to get around this is to choose one node and only initiate migrations through it.


Ring management

Each Cassandra server [node] is assigned a unique Token that determines what keys it is the first replica for. If you sort all nodes' Tokens, the Range of keys each is responsible for is (PreviousToken, MyToken], that is, from the previous token (exclusive) to the node's token (inclusive). The machine with the lowest Token gets both all keys less than that token, and all keys greater than the largest Token; this is called a "wrapping Range."
(Note that there is nothing special about being the "primary" replica, in the sense of being a point of failure.)
When the RandomPartitioner is used, Tokens are integers from 0 to 2**127. Keys are converted to this range by MD5 hashing for comparison with Tokens. (Thus, keys are always convertible to Tokens, but the reverse is not always true.)
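The (PreviousToken, MyToken] rule and the wrapping range can be sketched with a sorted map from token to node. The class name and the small example tokens are hypothetical; real RandomPartitioner tokens are much larger.

```java
import java.math.BigInteger;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical ring model: token -> node name, kept in token order.
final class Ring {
    private final TreeMap<BigInteger, String> nodesByToken = new TreeMap<>();

    void addNode(String name, BigInteger token) {
        nodesByToken.put(token, name);
    }

    /** Returns the node that is the first replica for a key's token. */
    String firstReplicaFor(BigInteger keyToken) {
        // The owner is the node with the smallest token >= keyToken,
        // because each range is (PreviousToken, MyToken].
        Map.Entry<BigInteger, String> owner = nodesByToken.ceilingEntry(keyToken);
        if (owner == null) {
            // Past the largest token: wrap around to the lowest token.
            owner = nodesByToken.firstEntry();
        }
        return owner.getValue();
    }
}
```

With nodes A, B and C at tokens 10, 20 and 30, a key token of 10 belongs to A (inclusive upper bound), 11 belongs to B, and both 5 and 31 belong to A through the wrapping range.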

Token selection

Using a strong hash function means RandomPartitioner keys will, on average, be evenly spread across the Token space, but you can still have imbalances if your Tokens do not divide up the range evenly, so you should specify InitialToken to your first nodes as i * (2**127 / N) for i = 0 .. N-1. In Cassandra 0.7, you should specify initial_token in cassandra.yaml.
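The formula i * (2**127 / N) needs arbitrary-precision arithmetic, since the values do not fit in a long. A minimal sketch, with a hypothetical class name:

```java
import java.math.BigInteger;

// Hypothetical helper that evaluates i * (2**127 / N).
final class InitialTokens {
    private static final BigInteger TOKEN_SPACE = BigInteger.ONE.shiftLeft(127);

    static BigInteger initialToken(int i, int nodeCount) {
        return TOKEN_SPACE.divide(BigInteger.valueOf(nodeCount))
                .multiply(BigInteger.valueOf(i));
    }

    public static void main(String[] args) {
        int nodeCount = 4;
        for (int i = 0; i < nodeCount; i++) {
            System.out.println("node " + (i + 1) + " = " + initialToken(i, nodeCount));
        }
    }
}
```

For N = 4 this yields 0, 42535295865117307932921825928971026432, 85070591730234615865843651857942052864 and 127605887595351923798765477786913079296.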
With NetworkTopologyStrategy, you should calculate the tokens for the nodes in each DC independently. Tokens still need to be unique, so you can add 1 to the tokens in the 2nd DC, add 2 in the 3rd, and so on. Thus, for a 4-node cluster in 2 datacenters, you would have
DC1
node 1 = 0
node 2 = 85070591730234615865843651857942052864

DC2
node 3 = 1
node 4 = 85070591730234615865843651857942052865
If you happen to have the same number of nodes in each data center, you can also alternate data centers when assigning tokens:
[DC1] node 1 = 0
[DC2] node 2 = 42535295865117307932921825928971026432
[DC1] node 3 = 85070591730234615865843651857942052864
[DC2] node 4 = 127605887595351923798765477786913079296
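Both layouts above can be reproduced with a few lines of Python. This is a minimal sketch; the function names are invented for illustration, and it assumes the i * (2**127 / N) formula with integer division.

```python
TOKEN_SPACE = 2 ** 127


def evenly_spaced_tokens(node_count, offset=0):
    """Tokens i * (2**127 / N) for i = 0 .. N-1, shifted by offset."""
    return [i * (TOKEN_SPACE // node_count) + offset for i in range(node_count)]


def tokens_per_datacenter(nodes_per_dc):
    """Compute each DC independently; add 1 in the 2nd DC, 2 in the 3rd, etc."""
    return [evenly_spaced_tokens(count, offset=dc_index)
            for dc_index, count in enumerate(nodes_per_dc)]


def alternating_tokens(dc_names, nodes_per_dc):
    """Space all nodes evenly and hand consecutive tokens to alternating DCs."""
    tokens = evenly_spaced_tokens(len(dc_names) * nodes_per_dc)
    return [(dc_names[i % len(dc_names)], token) for i, token in enumerate(tokens)]


offset_layout = tokens_per_datacenter([2, 2])
alternating_layout = alternating_tokens(["DC1", "DC2"], 2)
```

`offset_layout` gives the first table (DC1 at 0 and 2**126, DC2 shifted by 1), and `alternating_layout` gives the second.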
With order preserving partitioners, your key distribution will be application-dependent. You should still take your best guess at specifying initial tokens (guided by sampling actual data, if possible), but you will be more dependent on active load balancing (see below) and/or adding new nodes to hot spots.
Once data is placed on the cluster, the partitioner may not be changed without wiping and starting over.

Replication

A Cassandra cluster always divides up the key space into ranges delimited by Tokens as described above, but additional replica placement is customizable via IReplicaPlacementStrategy in the configuration file. The standard strategies are
  • RackUnawareStrategy: replicas are always placed on the next (in increasing Token order) N-1 nodes along the ring
  • RackAwareStrategy: replica 2 is placed on the first node along the ring that belongs to a different data center than the first replica; the remaining N-2 replicas, if any, are placed on the first nodes along the ring in the same rack as the first
Note that with RackAwareStrategy, succeeding nodes along the ring should alternate data centers to avoid hot spots. For instance, if you have nodes A, B, C, and D in increasing Token order, and instead of alternating you place A and B in DC1, and C and D in DC2, then nodes C and A will have disproportionately more data on them because they will be the replica destination for every Token range in the other data center.
  • The corollary to this is, if you want to start with a single DC and add another later, when you add the second DC you should add as many nodes as you have in the first rather than adding a node or two at a time gradually.
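The hot spot is easy to see with a toy model. The sketch below is not Cassandra's placement code; it only models the two rules stated above (next N-1 nodes for RackUnawareStrategy, and "first node in another data center" for replica 2 of RackAwareStrategy), and all function names are hypothetical.

```python
def rack_unaware_replicas(ring, primary_index, replication_factor):
    """Primary plus the next N-1 nodes in increasing token order, wrapping."""
    return [ring[(primary_index + i) % len(ring)]
            for i in range(replication_factor)]


def second_replica_other_dc(ring, dc_of, primary_index):
    """First node after the primary that sits in a different data center."""
    for step in range(1, len(ring)):
        candidate = ring[(primary_index + step) % len(ring)]
        if dc_of[candidate] != dc_of[ring[primary_index]]:
            return candidate
    return None  # only one data center in the ring


def second_replica_counts(ring, dc_of):
    """How many ranges each node holds as the cross-DC second replica."""
    counts = {node: 0 for node in ring}
    for index in range(len(ring)):
        target = second_replica_other_dc(ring, dc_of, index)
        if target is not None:
            counts[target] += 1
    return counts


ring = ["A", "B", "C", "D"]  # increasing token order
grouped = second_replica_counts(
    ring, {"A": "DC1", "B": "DC1", "C": "DC2", "D": "DC2"})
alternating = second_replica_counts(
    ring, {"A": "DC1", "B": "DC2", "C": "DC1", "D": "DC2"})
```

With the grouped layout, C and A each receive the second replica for two ranges while B and D receive none; with alternating data centers every node receives exactly one.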
Replication factor is not really intended to be changed in a live cluster either, but increasing it is conceptually simple: update the replication_factor from the CLI (see below), then run repair against each node in your cluster so that all the new replicas that are supposed to have the data, actually do.
Until repair is finished, you have 3 options:
  • read at ConsistencyLevel.QUORUM or ALL (depending on your existing replication factor) to make sure that a replica that actually has the data is consulted
  • continue reading at lower CL, accepting that some requests will fail (usually only the first for a given query, if ReadRepair is enabled)
  • take downtime while repair runs
The same options apply to changing replication strategy.
Reducing replication factor is easily done and only requires running cleanup afterwards to remove extra replicas.
To update the replication factor on a live cluster, forget about cassandra.yaml. Rather you want to use cassandra-cli:
  • update keyspace Keyspace1 with replication_factor = 3;

Network topology

Besides datacenters, you can also tell Cassandra which nodes are in the same rack within a datacenter. Cassandra will use this to route both reads and data movement for Range changes to the nearest replicas. This is configured by a user-pluggable EndpointSnitch class in the configuration file.
EndpointSnitch is related to, but distinct from, replication strategy itself: RackAwareStrategy needs a properly configured Snitch to place replicas correctly, but even absent a Strategy that cares about datacenters, the rest of Cassandra will still be location-sensitive.
There is an example of a custom Snitch implementation in http://svn.apache.org/repos/asf/cassandra/tags/cassandra-0.6.1/contrib/property_snitch/.

Range changes

Bootstrap

Adding new nodes is called "bootstrapping."
To bootstrap a node, turn AutoBootstrap on in the configuration file, and start it.
If you explicitly specify an InitialToken in the configuration, the new node will bootstrap to that position on the ring. Otherwise, it will pick a Token that will give it half the keys from the node with the most disk space used, that does not already have another node bootstrapping into its Range.
Important things to note:
  1. You should wait long enough for all the nodes in your cluster to become aware of the bootstrapping node via gossip before starting another bootstrap. The new node will log "Bootstrapping" when this is safe, 2 minutes after starting. (90s to make sure it has accurate load information, and 30s waiting for other nodes to start sending it inserts happening in its to-be-assumed part of the token ring.)
  2. Relating to point 1, one can only bootstrap N nodes at a time with automatic token picking, where N is the size of the existing cluster. If you need to more than double the size of your cluster, you have to wait for the first N nodes to finish until your cluster is size 2N before bootstrapping more nodes. So if your current cluster is 5 nodes and you want to add 7 nodes, bootstrap 5 and let those finish before bootstrapping the last two.
  3. As a safety measure, Cassandra does not automatically remove data from nodes that "lose" part of their Token Range to a newly added node. Run nodetool cleanup on the source node(s) (neighboring nodes that shared the same subrange) when you are satisfied the new node is up and working. If you do not do this the old data will still be counted against the load on that node and future bootstrap attempts at choosing a location will be thrown off.
  4. When bootstrapping a new node, existing nodes have to divide the key space before beginning replication. This can take a while, so be patient.
  5. During bootstrap, a node will drop the Thrift port and will not be accessible from nodetool.
  6. Bootstrap can take many hours when a lot of data is involved. See Streaming for how to monitor progress.
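The batching rule in point 2 can be worked out mechanically. Here is a minimal sketch (the function name is made up) that splits the new nodes into rounds, where each round may be at most as large as the cluster is at that moment:

```python
def bootstrap_batches(current_size, nodes_to_add):
    """Split new nodes into batches no larger than the cluster size at that time."""
    if current_size < 1:
        raise ValueError("need at least one existing node to bootstrap from")
    batches = []
    while nodes_to_add > 0:
        batch = min(current_size, nodes_to_add)
        batches.append(batch)
        current_size += batch   # the cluster has grown once the batch finishes
        nodes_to_add -= batch
    return batches


plan = bootstrap_batches(5, 7)
```

For the example in the text, `plan` is `[5, 2]`: bootstrap 5 nodes, wait for them to finish, then bootstrap the remaining 2.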
Cassandra is smart enough to transfer data from the nearest source node(s), if your EndpointSnitch is configured correctly. So, the new node doesn't need to be in the same datacenter as the primary replica for the Range it is bootstrapping into, as long as another replica is in the datacenter with the new one.
Bootstrap progress can be monitored using nodetool with the netstats argument (0.7 and later) or streams (Cassandra 0.6).
During bootstrap, nodetool may report that the new node is neither receiving nor sending any streams. This is because the sending node first makes a local copy of the data it will send to the receiving node, which shows up on the sending node as "AntiCompacting... AntiCompacted" log messages.

Moving or Removing nodes

Removing nodes entirely

You can take a node out of the cluster with nodetool decommission (run against the live node you want to remove), or nodetool removetoken (run against any other machine) to remove a dead one. This will assign the ranges the old node was responsible for to other nodes, and replicate the appropriate data there. If decommission is used, the data will stream from the decommissioned node. If removetoken is used, the data will stream from the remaining replicas.
No data is removed automatically from the node being decommissioned, so if you want to put the node back into service at a different token on the ring, it should be removed manually.

Moving nodes

nodetool move: move the target node to a given Token. Moving is essentially a convenience over decommission + bootstrap.
As with bootstrap, see Streaming for how to monitor progress.

Load balancing

If you add nodes to your cluster, your ring will be unbalanced, and the only way to get perfect balance is to compute new tokens for every node and assign them to each node manually using the nodetool move command.
Here's a python program which can be used to calculate new tokens for the nodes. There's more info on the subject at Ben Black's presentation at Cassandra Summit 2010. http://www.datastax.com/blog/slides-and-videos-cassandra-summit-2010
    def tokens(nodes):
        # Print one evenly spaced token per node: i * (2**127 / nodes).
        for x in range(nodes):
            print(2 ** 127 // nodes * x)
In versions of Cassandra 0.7.* and lower, there's also nodetool loadbalance: essentially a convenience over decommission + bootstrap, only instead of telling the target node where to move on the ring it will choose its location based on the same heuristic as Token selection on bootstrap. You should not use this as it doesn't rebalance the entire ring.
The status of move and balancing operations can be monitored using nodetool with the netstats argument. (Cassandra 0.6.* and lower use the streams argument.)

Consistency

Cassandra allows clients to specify the desired consistency level on reads and writes. (See API.) If R + W > N, where R, W, and N are respectively the read replica count, the write replica count, and the replication factor, all client reads will see the most recent write. Otherwise, readers may see older versions, for periods of typically a few ms; this is called "eventual consistency." See http://www.allthingsdistributed.com/2008/12/eventually_consistent.html and http://queue.acm.org/detail.cfm?id=1466448 for more.
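The R + W > N rule is simple arithmetic, sketched below. The function names are made up for illustration; the quorum size of floor(N / 2) + 1 is the usual majority definition and is not spelled out in the text above.

```python
def quorum(replication_factor):
    """Majority of replicas: floor(N / 2) + 1."""
    return replication_factor // 2 + 1


def reads_see_latest_write(read_replicas, write_replicas, replication_factor):
    """True when R + W > N, so every read set overlaps every write set."""
    return read_replicas + write_replicas > replication_factor


n = 3
quorum_both = reads_see_latest_write(quorum(n), quorum(n), n)  # 2 + 2 > 3
one_and_one = reads_see_latest_write(1, 1, n)                  # 1 + 1 <= 3
write_all_read_one = reads_see_latest_write(1, n, n)           # 1 + 3 > 3
```

With a replication factor of 3, reading and writing at QUORUM (or writing at ALL and reading at ONE) satisfies the rule, while reading and writing at ONE does not.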
See below about consistent backups.

Repairing missing or inconsistent data

Cassandra repairs data in two ways:
  1. Read Repair: every time a read is performed, Cassandra compares the versions at each replica (in the background, if a low consistency was requested by the reader to minimize latency), and the newest version is sent to any out-of-date replicas.
  2. Anti-Entropy: when nodetool repair is run, Cassandra computes a Merkle tree for each range of data on that node, and compares it with the versions on other replicas, to catch any out-of-sync data that hasn't been read recently. This is intended to be run infrequently (e.g., weekly) because computing the Merkle tree is relatively expensive in disk I/O and CPU: it scans ALL the data on the machine (but it is very network efficient).
Running nodetool repair: Like all nodetool operations in 0.7, repair is blocking: it will wait for the repair to finish and then exit. This may take a long time on large data sets.
It is safe to run repair against multiple machines at the same time, but to minimize the impact on your application workload it is recommended to wait for it to complete on one node before invoking it against the next.

Frequency of nodetool repair

Unless your application performs no deletes, it is vital that production clusters run nodetool repair periodically on all nodes in the cluster. The hard requirement for repair frequency is the value used for GCGraceSeconds (see DistributedDeletes). Running nodetool repair often enough to guarantee that all nodes have performed a repair in a given period GCGraceSeconds long, ensures that deletes are not "forgotten" in the cluster.
Consider how to schedule your repairs. A repair causes additional disk and CPU activity on the nodes participating in the repair, and it will typically be a good idea to spread repairs out over time so as to minimize the chances of repairs running concurrently on many nodes.
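One way to think about scheduling is to pick a repair cycle comfortably shorter than GCGraceSeconds and spread the nodes evenly across it. The sketch below does only that arithmetic; the function name, the node names, and the `safety_factor` of 0.5 are assumptions for illustration, and it does not account for how long each repair actually takes.

```python
def repair_offsets(nodes, gc_grace_seconds, safety_factor=0.5):
    """Stagger one repair per node inside a cycle shorter than GCGraceSeconds."""
    if not nodes:
        return {}
    cycle = gc_grace_seconds * safety_factor  # repeat the whole schedule this often
    spacing = cycle / len(nodes)              # gap between consecutive nodes
    return {node: index * spacing for index, node in enumerate(nodes)}


TEN_DAYS = 10 * 24 * 60 * 60  # 10 days expressed in seconds
schedule = repair_offsets(["node1", "node2", "node3", "node4"], TEN_DAYS)
```

Here each node starts its repair 108000 seconds (30 hours) after the previous one, and the whole cycle repeats every 5 days, so every node is repaired well within a 10-day GCGraceSeconds as long as individual repairs finish in time.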

Dealing with the consequences of nodetool repair not running within GCGraceSeconds

If nodetool repair has not been run often enough, to the point that GCGraceSeconds has passed, you risk forgotten deletes (see DistributedDeletes). In addition to deleted data popping back up, you may see inconsistencies in the data returned from different nodes that will not self-heal by read-repair or further nodetool repair. Some further details on this latter effect are documented in CASSANDRA-1316.
There are at least three ways to deal with this scenario.
  1. Treat the node in question as failed, and replace it as described further below.
  2. To minimize the number of forgotten deletes, first increase GCGraceSeconds across the cluster (rolling restart required), perform a full repair on all nodes, and then change GCGraceSeconds back again. This has the advantage of ensuring tombstones spread as much as possible, minimizing the amount of data that may "pop back up" (forgotten delete).
  3. Yet another option, which will result in more forgotten deletes than the previous suggestion but is easier to do, is to ensure 'nodetool repair' has been run on all nodes, and then perform a compaction to expire tombstones. Following this, read-repair and regular nodetool repair should cause the cluster to converge.

Handling failure

If a node goes down and comes back up, the ordinary repair mechanisms will be adequate to deal with any inconsistent data. Remember though that if a node misses updates and is not repaired for longer than your configured GCGraceSeconds (default: 10 days), it could have missed remove operations permanently. Unless your application performs no removes, you should wipe its data directory, re-bootstrap it, and removetoken its old entry in the ring (see below).
If a node goes down entirely, then you have two options:
  1. (Recommended approach) Bring up the replacement node with a new IP address, with the initial token set to (failed node's token) - 1 and AutoBootstrap set to true in cassandra.yaml (storage-conf.xml for 0.6 or earlier). This will place the replacement node in front of the failed node. Then the bootstrap process begins. While this process runs, the node will not receive reads. Once the process has finished on the replacement node, run nodetool removetoken once, supplying the token of the dead node, and nodetool cleanup on each node. You can obtain the dead node's token by running nodetool ring on any live node, unless there was some kind of outage and the other nodes came back up but the dead one did not; in that case, you can retrieve the token from the live nodes' system tables.
  2. (Alternative approach) Bring up a replacement node with the same IP and token as the old, and run nodetool repair. Until the repair process is complete, clients reading only from this node may get no data back. Using a higher ConsistencyLevel on reads will avoid this.
The reason why you run nodetool cleanup on all live nodes is to remove old Hinted Handoff writes stored for the dead node.

Backing up data

Cassandra can snapshot data while online using nodetool snapshot. You can then back up those snapshots using any desired system, although leaving them where they are is probably the option that makes the most sense on large clusters. nodetool snapshot triggers a node-wide flush, so all data written before the execution of the snapshot command is contained within the snapshot.
With some combinations of operating system/jvm you may receive an error related to the inability to create a process during the snapshotting, such as this on Linux
Exception in thread "main" java.io.IOException: Cannot run program "ln": java.io.IOException: error=12, Cannot allocate memory
This is caused by the operating system trying to allocate the child "ln" process a memory space as large as the parent process (the cassandra server), even though it's not going to use it. So if you have a machine with 8GB of RAM and no swap, and you gave 6GB to the cassandra server, it will fail during this because the operating system wants 12 GB of virtual memory before allowing you to create the process.
This error can be worked around by one of the following:
  • dropping the jna.jar file into Cassandra's lib folder (requires at least Cassandra 0.6.6)
OR
  • creating a swap file, snapshotting, removing swap file
OR
  • turning on "memory overcommit"
To restore a snapshot:
  1. shut down the node
  2. clear out the old commitlog and sstables
  3. move the sstables from the snapshot location to the live data directory.

Consistent backups

You can get an eventually consistent backup by snapshotting all nodes; no individual node's backup is guaranteed to be consistent, but if you restore from that snapshot then clients will get eventually consistent behavior as usual.
There is no such thing as a consistent view of the data in the strict sense, except in the trivial case of writes with consistency level = ALL.

Import / export

As an alternative to taking snapshots it's possible to export SSTables to JSON format using the bin/sstable2json command:
Usage: sstable2json [-f outfile] <sstable> [-k key [-k key [...]]]
bin/sstable2json takes one required argument, the full path to an SSTable data file (a file ending in -Data.db), and an optional argument for an output file (by default, output is written to stdout). You can also pass the names of specific keys using the -k argument to limit what is exported.
Note: If you are not running the exporter on in-place SSTables, there are a couple of things to keep in mind.
  1. The corresponding configuration must be present (same as it would be to run a node).
  2. SSTables are expected to be in a directory named for the keyspace (same as they would be on a production node).
JSON exported SSTables can be "imported" to create new SSTables using bin/json2sstable:
Usage: json2sstable -K keyspace -c column_family <json> <sstable>
bin/json2sstable takes arguments for keyspace and column family names, and full paths for the JSON input file and the destination SSTable file name.
You can also import pre-serialized rows of data using the BinaryMemtable interface. This is useful for importing via Hadoop or another source where you want to do some preprocessing of the data to import.
NOTE: Starting with version 0.7, json2sstable and sstable2json must be run in such a way that the schema can be loaded from system tables. This means that cassandra.yaml must be found in the classpath and refer to valid storage directories.

Monitoring

Running nodetool cfstats can provide an overview of each Column Family, and important metrics to graph your cluster. Cassandra also exposes internal metrics as JMX data. This is a common standard in the JVM world; OpenNMS, Nagios, and Munin at least offer some level of JMX support. For a non-stupid JMX plugin for Munin check out https://github.com/tcurdt/jmx2munin . The specifics of the JMX Interface are documented at JmxInterface.
Some folks prefer to deal with non-JMX clients; for them there is a JMX-to-REST bridge available at http://code.google.com/p/polarrose-jmx-rest-bridge/ . Bridging to SNMP is a bit more work but can be done with https://github.com/tcurdt/jmx2snmp
Important metrics to watch on a per-Column Family basis would be: Read Count, Read Latency, Write Count and Write Latency. Pending Tasks tell you if things are backing up. These metrics can also be exposed using any JMX client such as jconsole. (See also http://simplygenius.com/2010/08/jconsole-via-socks-ssh-tunnel.html for how to proxy JConsole to firewalled machines.)
You can also use jconsole, and the MBeans tab to look at PendingTasks for thread pools. If you see one particular thread backing up, this can give you an indication of a problem. One example would be ROW-MUTATION-STAGE indicating that write requests are arriving faster than they can be handled. A more subtle example is the FLUSH stages: if these start backing up, cassandra is accepting writes into memory fast enough, but the sort-and-write-to-disk stages are falling behind.
If you are seeing a lot of tasks being built up, your hardware or configuration tuning is probably the bottleneck.
Running nodetool tpstats will dump all of those threads to console if you don't want to use jconsole. Example:
Pool Name                    Active   Pending      Completed
FILEUTILS-DELETE-POOL             0         0            119
MESSAGING-SERVICE-POOL            3         4       81594002
STREAM-STAGE                      0         0              3
RESPONSE-STAGE                    0         0       48353537
ROW-READ-STAGE                    0         0          13754
LB-OPERATIONS                     0         0              0
COMMITLOG                         1         0       78080398
GMFD                              0         0        1091592
MESSAGE-DESERIALIZER-POOL         0         0      126022919
LB-TARGET                         0         0              0
CONSISTENCY-MANAGER               0         0           2899
ROW-MUTATION-STAGE                1         2       81719765
MESSAGE-STREAMING-POOL            0         0            129
LOAD-BALANCER-STAGE               0         0              0
FLUSH-SORTER-POOL                 0         0            218
MEMTABLE-POST-FLUSHER             0         0            218
COMPACTION-POOL                   0         0            464
FLUSH-WRITER-POOL                 0         0            218
HINTED-HANDOFF-POOL               0         0            154
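If you want to alert on this output rather than eyeball it, a few lines of Python can pick out the pools with a non-zero Pending count. This is a sketch that assumes the four-column layout shown above (other Cassandra versions may print different columns); the function name is made up, and the sample text is a subset of the example output.

```python
def pools_with_pending(tpstats_text):
    """Return {pool name: pending count} for every pool with Pending > 0."""
    backed_up = {}
    for line in tpstats_text.splitlines():
        fields = line.split()
        # Data rows look like: NAME ACTIVE PENDING COMPLETED
        if len(fields) != 4 or not all(f.isdigit() for f in fields[1:]):
            continue  # header or blank line
        name, _active, pending, _completed = fields
        if int(pending) > 0:
            backed_up[name] = int(pending)
    return backed_up


SAMPLE = """\
Pool Name                    Active   Pending      Completed
MESSAGING-SERVICE-POOL            3         4       81594002
ROW-READ-STAGE                    0         0          13754
ROW-MUTATION-STAGE                1         2       81719765
FLUSH-WRITER-POOL                 0         0            218
"""

backed_up = pools_with_pending(SAMPLE)
```

On the sample, only MESSAGING-SERVICE-POOL and ROW-MUTATION-STAGE are reported. A single snapshot with a handful of pending tasks is not alarming by itself; it is the trend over repeated runs that matters.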

Monitoring with MX4J

mx4j provides an HTML and HTTP interface to JMX. Starting from version 0.7.0 cassandra lets you hook up mx4j very easily. To enable mx4j on a Cassandra node:
  • Download mx4j-tools.jar from http://mx4j.sourceforge.net/
  • Add mx4j-tools.jar to the classpath (e.g. under lib/)
  • Start cassandra
  • In the log you should see a message such as HttpAdaptor started on port 8081
  • To choose a different port (8081 is the default) or a different listen address (0.0.0.0 is not the default) edit conf/cassandra-env.sh and uncomment #MX4J_ADDRESS="-Dmx4jaddress=0.0.0.0" and #MX4J_PORT="-Dmx4jport=8081"
Now browse to http://cassandra:8081/ and use the HTML interface.
If you want XML then add &template=identity to the end of any URL, e.g. http://cassandra:8081/?&template=identity


Monday, June 27, 2011

Cassandra error after installation in windows


Followed the steps to install the apache-cassandra latest build. Upon first startup (./cassandra -f), I get this:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/cassandra/thrift/CassandraDaemon
Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.thrift.CassandraDaemon
    at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    at java.lang.ClassLoader.loadClassInternal(ClassLoader.java:334)
Could not find the main class: org.apache.cassandra.thrift.CassandraDaemon. Program will exit.
I exported the JAVA_HOME path, etc. What am I doing wrong? I should note that I am on an Ubuntu Lucid machine.


The first thing you should do is set the CASSANDRA_HOME environment variable to the Cassandra root directory. Then try running cassandra -f again and everything should run smoothly. (Cassandra checks the CASSANDRA_HOME environment variable to find the lib folder it needs to run the daemon.)