Monday, October 1, 2012

Installing Cassandra


Introduction


This document aims to provide a few easy-to-follow steps that take the first-time user from installation, to running a single-node Cassandra, and on to an overview of configuring a multinode cluster. Cassandra is meant to run on a cluster of nodes, but it will run equally well on a single machine. This is a handy way of getting familiar with the software while avoiding the complexities of a larger system.

Step 0: Prerequisites and connection to the community


Cassandra requires the most stable version of Java 1.6 you can deploy. For Sun's JVM, this means at least u19; u21 is better. Cassandra also runs on the IBM JVM, and should run on JRockit as well.
The best way to ensure you always have up to date information on the project, releases, stability, bugs, and features is to subscribe to the users mailing list (subscription required) and participate in the #cassandra channel on IRC.

Step 1: Download Cassandra Kit


  • Download links for the latest stable release can always be found on the website.
  • Users of Debian or Debian-based derivatives can install the latest stable release in package form, see DebianPackaging for details.
  • Users of RPM-based distributions can get packages from Datastax.
  • If you are interested in building Cassandra from source, please refer to the How to Build page. For more details about other builds, please refer to the Cassandra versions and builds page.
  • If you plan to run the "snapshot" command on Cassandra, it is better to install jna.jar as well. Please refer to the Backup Data section.

Step 2: Edit configuration files


Cassandra configuration files can be found in the conf directory under the top directory of the binary and source distributions. If you have installed Cassandra from RPM packages, the configuration files will be placed in /etc/cassandra/conf.

Step 2.1: Edit cassandra.yaml


The distribution's sample configuration conf/cassandra.yaml contains reasonable defaults for single node operation, but you will need to make sure that the paths exist for data_file_directories, commitlog_directory, and saved_caches_directory.
Verify that storage_port and rpc_port do not conflict with other services on your computer. By default, Cassandra uses 7000 for storage_port and 9160 for rpc_port. The storage_port must be identical on all Cassandra nodes in a cluster. Cassandra client applications use rpc_port to connect to Cassandra.
It is a good idea to change cluster_name to avoid unnecessary conflicts with existing clusters.
For initial_token, you can leave the value blank, but I recommend setting it to 0 if you are configuring your first node.
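A quick way to check the port advice above before starting Cassandra is to see whether anything already listens on the two default ports. The sketch below is only an illustration: the function names are made up for this post, and it assumes you are checking the local machine (127.0.0.1) with the default values 7000 and 9160.

```python
import socket

# Default ports named in the text above; adjust if your cassandra.yaml differs.
DEFAULT_PORTS = {"storage_port": 7000, "rpc_port": 9160}


def port_is_free(port, host="127.0.0.1"):
    """Return True if nothing accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)
        # connect_ex returns 0 only when a listener accepted the connection.
        return sock.connect_ex((host, port)) != 0


def check_ports(ports=None, host="127.0.0.1"):
    """Map each setting name to True (free) or False (already in use)."""
    ports = DEFAULT_PORTS if ports is None else ports
    return {name: port_is_free(port, host) for name, port in ports.items()}
```

Calling check_ports() returns a dictionary keyed by setting name; a False value means another service already uses that port and the setting should be changed.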

Step 2.2: Edit log4j-server.properties


conf/log4j-server.properties contains the path of the log file. Edit the line if needed.
# Edit the next line to point to your logs directory
log4j.appender.R.File=/var/log/cassandra/system.log

Step 2.3: Edit cassandra-env.sh


Cassandra has a JMX (Java Management Extensions) interface, and the JMX_PORT is defined in conf/cassandra-env.sh. Edit the following line if needed.
# Specifies the default port over which Cassandra will be available for
# JMX connections.
JMX_PORT="7199"

By default, Cassandra allocates memory based on the physical memory your system has. For example, it allocates a 1GB heap on a 2GB system and a 2GB heap on an 8GB system. If you want to specify the Cassandra heap size yourself, remove the leading pound sign (#) from the following lines and set the memory sizes.
#MAX_HEAP_SIZE="4G"
#HEAP_NEWSIZE="800M"

If you are not familiar with Java GC, 1/4 of MAX_HEAP_SIZE may be a good starting point for HEAP_NEWSIZE.
Cassandra needs more than a few GB of heap for production use, but you can run it with a smaller footprint for a test drive. If you want to assign 128MB as the maximum, edit the lines as follows.
MAX_HEAP_SIZE="128M"
HEAP_NEWSIZE="32M"

If you face OutOfMemory exceptions or massive GCs with this configuration, increase these values. Don't start your production service with such a tiny heap configuration!
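To make the heap arithmetic above concrete, here is a small sketch. The default-heap formula in it is an assumption chosen because it reproduces the two examples given (1GB on a 2GB system, 2GB on an 8GB system); check your own cassandra-env.sh for the calculation it really uses. The function names are made up for illustration.

```python
def default_max_heap_mb(system_memory_mb):
    """Assumed default: max(min(RAM/2, 1024 MB), min(RAM/4, 8192 MB))."""
    half = min(system_memory_mb // 2, 1024)
    quarter = min(system_memory_mb // 4, 8192)
    return max(half, quarter)


def suggested_new_size_mb(max_heap_mb):
    """Starting point from the text: a quarter of MAX_HEAP_SIZE."""
    return max_heap_mb // 4


def env_lines(max_heap_mb):
    """Render the two cassandra-env.sh lines for a chosen heap size."""
    return [
        'MAX_HEAP_SIZE="%dM"' % max_heap_mb,
        'HEAP_NEWSIZE="%dM"' % suggested_new_size_mb(max_heap_mb),
    ]
```

For example, env_lines(128) yields the same two lines as the 128MB test-drive configuration shown above.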
  • Note for Mac users:
    Some people running OS X have trouble getting Java 6 to work. If you've kept up with Apple's updates, Java 6 should already be installed (it comes in Mac OS X 10.5 Update 1). Unfortunately, Apple does not default to using it. What you have to do is change your JAVA_HOME environment setting to /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home and add /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin to the beginning of your PATH.

Step 3: Start up Cassandra


And now for the moment of truth: start up Cassandra by invoking bin/cassandra -f from the command line. The service should start in the foreground and log gratuitously to standard-out. Assuming you don't see messages with scary words like "error", or "fatal", or anything that looks like a Java stack trace, then chances are you've succeeded.
Press "Control-C" to stop Cassandra.
If you start Cassandra without the "-f" option, it runs in the background, so you need to kill the process to stop it.

Step 4: Using cassandra-cli


bin/cassandra-cli is an interactive command line interface for Cassandra. You can define a schema and store and fetch data with the tool. Run the following command to connect to your Cassandra instance.
bin/cassandra-cli -h host -p rpc_port

example:
% bin/cassandra-cli -h 127.0.0.1 -p 9160

Then you will see the following cassandra-cli prompt.
Connected to: "Test Cluster" on 127.0.0.1/9160
Welcome to Cassandra CLI version 1.0.7

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] 

Tuesday, September 18, 2012

Apache Cassandra - Load Balancing the cluster


Load balancing


When new nodes are added to the cluster, the data is not automatically shared equally across them, and the load is not shared proportionately. This leaves the cluster unbalanced.
In order to share the data equally, we need to shift the token ranges by using the nodetool move command.
Token ranges must be calculated so that each node holds an almost equal share of the data.
Here's a Python program which can be used to calculate new tokens for the nodes.
    def tokens(nodes):
        # Print one evenly spaced token per node on a ring of size 2 ** 127.
        for x in range(nodes):
            print(2 ** 127 // nodes * x)
Run this program with the number of nodes in the cluster, move each node to one of the printed tokens with nodetool move, and then try the nodetool ring command. This will tell you the load on each node in the cluster. Repeat the moves if needed until nodetool ring shows the load shared equally between the nodes.
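As a sketch of how the token calculation and nodetool move fit together, the following pairs each host with an evenly spaced token and builds the matching command line. It assumes the same 2 ** 127 ring size as the program above; the host names and function names are hypothetical, and the commands are only printed, not executed.

```python
RING_SIZE = 2 ** 127  # token space assumed by the program above


def balanced_tokens(node_count):
    """Evenly spaced tokens, one per node."""
    return [RING_SIZE // node_count * i for i in range(node_count)]


def move_commands(hosts):
    """Pair each host with its token and build a nodetool move command."""
    tokens = balanced_tokens(len(hosts))
    return ["nodetool -h %s move %d" % (host, token)
            for host, token in zip(hosts, tokens)]


# Hypothetical host names, for illustration only.
for line in move_commands(["node1", "node2", "node3", "node4"]):
    print(line)
```

The first host always gets token 0. Review the printed commands, then run them one node at a time and follow each move with nodetool cleanup.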

In versions of Cassandra 0.7.* and lower, there's also nodetool loadbalance: essentially a convenience over decommission + bootstrap, only instead of telling the target node where to move on the ring it will choose its location based on the same heuristic as Token selection on bootstrap. You should not use this as it doesn't rebalance the entire ring.
The status of move and balancing operations can be monitored using nodetool with the netstats argument. (Cassandra 0.6.* and lower use the streams argument.)

Apache Cassandra - Moving a node to a different token range


Moving nodes


nodetool move: move the target node to a given Token. Moving is both a convenience over and more efficient than decommission + bootstrap. After moving a node, nodetool cleanup should be run to remove any unnecessary data.
As with bootstrap, see Streaming for how to monitor progress.


Streaming :

Transfer


The following steps occur for Stream Transfers.
  1. Source has a list of ranges it must transfer to another node.
  2. Source copies the data in those ranges to sstable files in preparation for streaming. This is called anti-compaction (because compaction merges multiple sstable files into one, and this does the opposite).
  3. Source builds a list of PendingFiles, which contain information on each sstable to be transferred.
  4. Source starts streaming the first file from the list, followed by the log "Waiting for transfer to $some_node to complete". The header for the stream contains information on the streamed file for the Destination to interpret what to do with the incoming stream.
  5. Destination receives the file, writes it to disk, and sends a FileStatus.
  6. On successful transfer, the Source streams the next file until it's done; on error, it re-streams the same file.
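The retry behaviour in the last two steps can be pictured with a toy model. This is not Cassandra's code: send is a hypothetical callback that stands in for streaming one file and reading back the FileStatus, and the max_attempts limit is an assumption added so the loop cannot spin forever.

```python
def stream_files(pending_files, send, max_attempts=3):
    """Send files in order; re-send a file when send() reports failure.

    send(file_name) is a hypothetical callback returning True on success.
    Returns the log of attempts as (file_name, succeeded) tuples.
    """
    log = []
    for file_name in pending_files:
        for _ in range(max_attempts):
            ok = send(file_name)
            log.append((file_name, ok))
            if ok:
                break  # move on to the next pending file
        else:
            # Every attempt failed; the limit is an assumption of this sketch.
            raise RuntimeError("giving up on %s" % file_name)
    return log
```

A send callback that fails once for one file produces a log in which that file appears twice, first as a failure and then as a success, before the next file is streamed.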

Request


  1. Destination compiles a list of ranges it needs from another node.
  2. Destination sends a StreamRequestMessage to the Source node with the list of ranges.
  3. Source prepares the SSTables for those ranges and creates the PendingFiles.
  4. Source starts streaming the first file in the list. The header for the first stream contains info on the current stream and a list of the remaining PendingFiles that fall in the requested ranges.
  5. Destination receives the stream and writes it to disk, followed by the log message "Streaming added org.apache.cassandra.io.sstable.SSTableReader(path='/var/lib/cassandra/data/Keyspace1/Standard1-e-1-Data.db')".
  6. Destination then takes the lead and requests the remaining files one at a time. If an error occurs, it re-requests the same file; if not, it continues with the next file until done.
  7. Source streams each of the requested files. The files are already anti-compacted, so it just streams them to the Destination.

Apache Cassandra - How to remove nodes from a live cluster

If you have decided to remove a node from a live cluster you can follow one of the two options based on which one suits you better.

Nodetool is available in the cassandra home directory

{CASSANDRA_HOME}/bin/nodetool decommission
{CASSANDRA_HOME}/bin/nodetool removetoken

Removing nodes entirely


* Decommission :

You can take a node out of the cluster by running nodetool decommission against a live node. This will assign the ranges the old node was responsible for to other nodes, and replicate the appropriate data there. If decommission is used, the data will stream from the decommissioned node.
No data is removed automatically from the node being decommissioned, so if you want to put the node back into service at a different token on the ring, the data should be removed manually.

* Remove Token :

You can execute nodetool removetoken on any available machine to remove a dead one. This will assign the token range the old node was responsible for to other nodes. If removetoken is used, the data will stream from the remaining replicas.

Tuesday, August 7, 2012

Cassandra Hector - Fetch all rows

How to do a "select * from my_table" in Cassandra using the Hector library?

Did some googling around, and this seems to be working ...





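import java.util.Iterator;
import java.util.UUID;

import me.prettyprint.cassandra.model.QuorumAllConsistencyLevelPolicy;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.serializers.UUIDSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;

public class Dumper {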
    private final Cluster cluster;
    private final Keyspace keyspace;

    public Dumper() {
        this.cluster = HFactory.getOrCreateCluster("Name", "hostname");
        this.keyspace = HFactory.createKeyspace("Keyspace", cluster, new QuorumAllConsistencyLevelPolicy());
    }

    public void run() {
        int row_count = 100;

        RangeSlicesQuery<UUID, String, Long> rangeSlicesQuery = HFactory
            .createRangeSlicesQuery(keyspace, UUIDSerializer.get(), StringSerializer.get(), LongSerializer.get())
            .setColumnFamily("Column Family")
            .setRange(null, null, false, 10)
            .setRowCount(row_count);

        UUID last_key = null;

        while (true) {
            rangeSlicesQuery.setKeys(last_key, null);
            System.out.println(" > " + last_key);

            QueryResult<OrderedRows<UUID, String, Long>> result = rangeSlicesQuery.execute();
            OrderedRows<UUID, String, Long> rows = result.get();
            Iterator<Row<UUID, String, Long>> rowsIterator = rows.iterator();

            // we'll skip this first one, since it is the same as the last one from previous time we executed
            if (last_key != null && rowsIterator.hasNext()) rowsIterator.next();

            while (rowsIterator.hasNext()) {
              Row<UUID, String, Long> row = rowsIterator.next();
              last_key = row.getKey();

              if (row.getColumnSlice().getColumns().isEmpty()) {
                continue;
              }


              System.out.println(row);
            }

            if (rows.getCount() < row_count)
                break;
        }
    }

    public static void main(String[] args) {
        new Dumper().run();
    }
}


This will page through the column family in pages of 100 rows. It will only fetch 10 columns for each row (you will want to page very long rows too).
This is for a column family with uuids for row keys, strings for column names and longs for values. Hopefully it should be obvious how to change this.
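The paging trick itself (start each page at the last key seen, then skip the first row because it repeats) is independent of Hector. Here is a rough sketch of it in Python against an in-memory sorted list that stands in for the column family. fetch_page and fetch_all are made-up names, and a real cluster returns rows in partitioner order rather than sorted key order.

```python
import bisect


def fetch_page(sorted_keys, start_key, row_count):
    """Stand-in for a range query: up to row_count keys >= start_key."""
    start = 0 if start_key is None else bisect.bisect_left(sorted_keys, start_key)
    return sorted_keys[start:start + row_count]


def fetch_all(sorted_keys, row_count):
    """Page through every key, skipping the overlapping first row."""
    if row_count < 2:
        raise ValueError("row_count must be at least 2 to make progress")
    result = []
    last_key = None
    while True:
        page = fetch_page(sorted_keys, last_key, row_count)
        # After the first page, row 0 repeats the previous page's last key.
        rows = page[1:] if last_key is not None else page
        result.extend(rows)
        if page:
            last_key = page[-1]
        if len(page) < row_count:
            break  # a short page means the end was reached
    return result
```

Because every page after the first gives up one row to the overlap, a page size of 1 would never advance, which is why the sketch rejects it.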

Cassandra Replication Factor - What is it ?


Replication


A Cassandra cluster always divides up the key space into ranges delimited by Tokens as described above, but additional replica placement is customizable via IReplicaPlacementStrategy in the configuration file. The standard strategies are
  • RackUnawareStrategy: replicas are always placed on the next (in increasing Token order) N-1 nodes along the ring
  • RackAwareStrategy: replica 2 is placed on the first node along the ring that belongs to a different data center than the first; the remaining N-2 replicas, if any, are placed on the first nodes along the ring in the same rack as the first
Note that with RackAwareStrategy, succeeding nodes along the ring should alternate data centers to avoid hot spots. For instance, if you have nodes A, B, C, and D in increasing Token order, and instead of alternating you place A and B in DC1, and C and D in DC2, then nodes C and A will have disproportionately more data on them because they will be the replica destination for every Token range in the other data center.
  • The corollary to this is, if you want to start with a single DC and add another later, when you add the second DC you should add as many nodes as you have in the first rather than adding a node or two at a time gradually.
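The hot spot described above can be reproduced with a small model of the two strategies. This is a simplified sketch, not Cassandra's implementation: it only follows the placement rules as worded above, uses N=2 for the rack-aware case, and the function names and data center labels are made up.

```python
def rack_unaware_replicas(ring, primary_index, n):
    """The primary node plus the next n-1 nodes along the ring."""
    return [ring[(primary_index + i) % len(ring)] for i in range(n)]


def rack_aware_second_replica(ring, dc_of, primary_index):
    """First node after the primary that is in another data center."""
    primary_dc = dc_of[ring[primary_index]]
    for step in range(1, len(ring)):
        node = ring[(primary_index + step) % len(ring)]
        if dc_of[node] != primary_dc:
            return node
    return None  # only one data center in the ring


def ranges_held(ring, dc_of):
    """Count ranges per node with N=2: own range plus second replicas."""
    counts = {node: 1 for node in ring}
    for i in range(len(ring)):
        other = rack_aware_second_replica(ring, dc_of, i)
        if other is not None:
            counts[other] += 1
    return counts


ring = ["A", "B", "C", "D"]
grouped = {"A": "DC1", "B": "DC1", "C": "DC2", "D": "DC2"}
alternating = {"A": "DC1", "B": "DC2", "C": "DC1", "D": "DC2"}
print(ranges_held(ring, grouped))
print(ranges_held(ring, alternating))
```

With the grouped layout, A and C each end up holding three ranges while B and D hold one; with alternating data centers every node holds two.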
Replication factor is not really intended to be changed in a live cluster either, but increasing it is conceptually simple: update the replication_factor from the CLI (see below), then run repair against each node in your cluster so that all the new replicas that are supposed to have the data, actually do.
Until repair is finished, you have 3 options:
  • read at ConsistencyLevel.QUORUM or ALL (depending on your existing replication factor) to make sure that a replica that actually has the data is consulted
  • continue reading at lower CL, accepting that some requests will fail (usually only the first for a given query, if ReadRepair is enabled)
  • take downtime while repair runs
The same options apply to changing replication strategy.
Reducing replication factor is easily done and only requires running cleanup afterwards to remove extra replicas.
To update the replication factor on a live cluster, forget about cassandra.yaml. Rather you want to use cassandra-cli:
  • update keyspace Keyspace1 with strategy_options = {replication_factor:3};

Cassandra replication


About Replication in Cassandra

Replication is the process of storing copies of data on multiple nodes to ensure reliability and fault tolerance.
Cassandra stores copies, called replicas, of each row based on the row key. You set the number of replicas when you create a keyspace using the replica placement strategy. In addition to setting the number of replicas, this strategy sets the distribution of the replicas across the nodes in the cluster depending on the cluster’s topology.
The total number of replicas across the cluster is referred to as the replication factor. A replication factor of 1 means that there is only one copy of each row on one node. A replication factor of 2 means two copies of each row, where each copy is on a different node. All replicas are equally important; there is no primary or master replica. As a general rule, the replication factor should not exceed the number of nodes in the cluster. However, you can increase the replication factor and then add the desired number of nodes afterwards. When replication factor exceeds the number of nodes, writes are rejected, but reads are served as long as the desired consistency level can be met.
To determine the physical location of nodes and their proximity to each other, the replication strategy also relies on the cluster-configured snitch, which is described below.

Replication Strategy

The available strategies are:

SimpleStrategy

Use SimpleStrategy for simple single data center clusters. This strategy is the default replica placement strategy when creating a keyspace using the Cassandra CLI. See Creating a Keyspace. When using the Cassandra Query Language interface, you must explicitly specify a strategy. See CREATE KEYSPACE.
SimpleStrategy places the first replica on a node determined by the partitioner. Additional replicas are placed on the next nodes clockwise in the ring without considering rack or data center location.


NetworkTopologyStrategy

Use NetworkTopologyStrategy when you have (or plan to have) your cluster deployed across multiple data centers. This strategy specifies how many replicas you want in each data center.
When deciding how many replicas to configure in each data center, the two primary considerations are (1) being able to satisfy reads locally, without incurring cross-datacenter latency, and (2) failure scenarios. The two most common ways to configure multiple data center clusters are:
  • Two replicas in each data center. This configuration tolerates the failure of a single node per replication group and still allows local reads at a consistency level of ONE.
  • Three replicas in each data center. This configuration tolerates the failure of one node per replication group at a strong consistency level of LOCAL_QUORUM, or tolerates multiple node failures per data center using consistency level ONE.
Asymmetrical replication groupings are also possible. For example, you can have three replicas per data center to serve real-time application requests and use a single replica for running analytics.
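The failure tolerance figures quoted for the two common configurations follow from simple arithmetic: a quorum is a majority of the replicas, and the number of replicas that may be down is whatever is left over. A minimal sketch, with made-up function names and only the two consistency levels mentioned above:

```python
def quorum(replicas):
    """Replicas that must respond for a quorum: a strict majority."""
    return replicas // 2 + 1


def required(level, replicas):
    """Responses needed in the local data center for a consistency level."""
    if level == "ONE":
        return 1
    if level == "LOCAL_QUORUM":
        return quorum(replicas)
    raise ValueError("unsupported level: %s" % level)


def tolerated_failures(level, replicas):
    """Replicas per replication group that can be down."""
    return replicas - required(level, replicas)
```

With two replicas, ONE tolerates one failure; with three replicas, LOCAL_QUORUM tolerates one failure and ONE tolerates two, matching the list above.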
The NetworkTopologyStrategy determines replica placement independently within each data center as follows:
  • The first replica is placed according to the partitioner (same as with SimpleStrategy).
  • Additional replicas are placed by walking the ring clockwise until a node in a different rack is found. If no such node exists, additional replicas are placed in different nodes in the same rack.
NetworkTopologyStrategy attempts to place replicas on distinct racks because nodes in the same rack (or similar physical grouping) can fail at the same time due to power, cooling, or network issues.
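The placement rules above can be sketched for a single data center as follows. This is one reading of the description, not the actual NetworkTopologyStrategy code: it treats "a different rack" as "a rack that holds no replica yet", and the function name, node names, and rack labels are hypothetical.

```python
def place_in_datacenter(ring, rack_of, primary_index, replicas):
    """Pick replica nodes for one data center.

    ring: node names of this data center in token order.
    rack_of: mapping from node name to rack label.
    """
    chosen = [ring[primary_index]]
    used_racks = {rack_of[chosen[0]]}
    skipped = []
    # Walk the ring clockwise, preferring racks that hold no replica yet.
    for step in range(1, len(ring)):
        if len(chosen) == replicas:
            break
        node = ring[(primary_index + step) % len(ring)]
        if rack_of[node] not in used_racks:
            chosen.append(node)
            used_racks.add(rack_of[node])
        else:
            skipped.append(node)
    # Not enough distinct racks: fall back to skipped nodes in ring order.
    for node in skipped:
        if len(chosen) == replicas:
            break
        chosen.append(node)
    return chosen
```

For a ring n1, n2, n3, n4 where n1 and n2 share rack r1 and n3 and n4 share rack r2, two replicas starting at n1 land on n1 and n3, skipping n2 because it shares a rack with the first replica.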