Running unit tests on Hadoop 2

This is a quick blog post. If you’ve attempted to run the unit tests on any mavenized branch of Hadoop (Hadoop 2, Hadoop 2.0.5, Hadoop 3.0, trunk, etc.) you probably know how to do this:

$ mvn clean test

If you’ve attempted to do this, you’ve probably also been flummoxed by unit test failures that have nothing to do with the code changes that you made. How do you get Maven to continue running the unit tests and produce a consolidated report at the end, instead of stopping at the first unit test failure? Use the following command:

$ mvn -Dmaven.test.failure.ignore=true test

Thanks Cos (Konstantin Boudnik) for this extremely useful pointer!

Continuous Availability versus High Availability

Wikipedia’s page on Continuous Availability is available here:

http://en.wikipedia.org/wiki/Continuous_availability

A quick perusal tells us that High Availability can be ‘accomplished by providing redundancy or quickly restarting failed components’. This is very different from ‘Continuously Available’ systems that enable continuous operation through planned and unplanned outages of one or more components.

As large global organizations move from using Hadoop for batch storage and retrieval to mission critical real-time applications where the cost of even one minute of downtime is unacceptable, mere high availability will not be enough.

Solutions such as HDFS NameNode High Availability (NameNode HA) that come with Apache Hadoop 2.0 and Hadoop distributions based on it are subject to downtimes of 5 to 15 minutes. In addition, NameNode HA is limited to a single data center, and only one NameNode can be active at a time, which creates a performance bottleneck as well as an availability bottleneck. Deployments that incorporate WANdisco Non-Stop Hadoop are not subject to any downtime, regardless of whether a single NameNode server or an entire data center goes offline. There is no need for maintenance windows with Non-Stop Hadoop, since you can simply bring down the NameNode servers one at a time and perform your maintenance operations. The remaining active NameNodes continue to support real-time client applications as well as batch jobs.

The business advantages of Continuously Available, multi-data center aware systems are well known to IT decision makers. Here are some examples that illustrate how both real-time and batch applications can benefit and how new use cases can be supported:

  • A Batch Big Data DAG is a chain of applications wherein the output of a preceding job is used as the input to a subsequent job. At companies such as Yahoo, these DAGs take six to eight hours to run, and they are run every day. Fifteen minutes of NameNode downtime may cause one of these jobs to fail. As a result of this single failure, the entire DAG may not run to completion, creating delays that can last many hours.
  • Global clickstream analysis applications that enable businesses to see and respond to customer behavior or detect potentially fraudulent activity in real-time.
  • A web site or service built to use HBase as a backing store will be down if the HDFS underlying HBase goes down when the NameNode fails. This is likely to result in lost revenue and erode customer goodwill.  Non-Stop Hadoop eliminates this risk.
  • Continuous Availability systems such as WANdisco Non-Stop Hadoop can be administered with fewer staff. This is because the failure of one out of five NameNodes is not an emergency; it can be dealt with by staff during regular business hours. Significant savings in staffing can be achieved since Continuously Available systems do not require 24×7 sysadmin staff. In addition, in a distributed multi-data center environment, Non-Stop Hadoop can be managed from one location.
  • There are no passive or standby servers or data centers that essentially sit idle until disaster strikes.  All servers are active and provide full read and write access to the same data at every location.

See a demo of Non-Stop Hadoop for Cloudera and Non-Stop Hadoop for Hortonworks in action and read what leading industry analysts like Gartner’s Merv Adrian have to say about the need for continuous Hadoop availability.

 

Introduction to Hadoop 2, with a simple tool for generating Hadoop 2 config files

Introduction to Hadoop 2
Core Hadoop 2 consists of the distributed filesystem HDFS and the compute framework YARN.

HDFS is a distributed filesystem that can be used to store anywhere from a few gigabytes to many petabytes of data. It is distributed in the sense that it utilizes a number of slave servers, ranging from 3 to a few thousand, to store and serve files from.

YARN is the compute framework for Hadoop 2. It manages the distribution of compute jobs to the very same slave servers that store the HDFS data. Scheduling work on the servers that already hold the data means that, wherever possible, compute jobs read their input locally instead of reaching out over the network.

Naturally, traditional software written to run on a single server will not work on Hadoop. New software needs to be developed using a special programming paradigm called MapReduce. Hadoop’s native MapReduce framework uses Java; however, Hadoop MR programs can be written in almost any language. Also, higher level languages such as Pig may be used to write scripts that compile into Hadoop MR jobs.
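To make the paradigm concrete, here is a minimal sketch of the map and reduce steps for a word count, written in plain Java with no Hadoop dependency. The class name WordCountSketch and its methods are made up for illustration; a real Hadoop MR job would use Hadoop's own Mapper and Reducer classes and would run the two phases across many servers.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: WordCountSketch is a hypothetical class, not part of Hadoop.
class WordCountSketch {

    // Map phase: turn one line of input into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Reduce phase: sum the values of all pairs that share the same key.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    // Run map over every line, then reduce all the pairs. In Hadoop, a shuffle
    // step between the two phases groups the pairs by key across servers.
    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> all = new ArrayList<>();
        for (String line : lines) {
            all.addAll(map(line));
        }
        return reduce(all);
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("to be or not to be", "to do")));
    }
}
```

The point of splitting the work this way is that map calls are independent of each other, so they can run on whichever server holds each piece of the input.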

Hadoop 2 Server Daemons
HDFS:  HDFS consists of a master metadata server called the NameNode, and a number of slave servers called DataNodes.

The NameNode function is provided by a single Java daemon, the NameNode. The NameNode daemon runs on just one machine, the master. DataNode functionality is provided by a Java daemon called DataNode that runs on each slave server.

Since the NameNode function is provided by a single Java daemon, it turns out to be a Single Point of Failure (SPOF). Open source Hadoop 2 has a number of ways to keep a standby server waiting to take over the function of the NameNode daemon, should the single NameNode fail. All of these standby solutions take 5 to 15 minutes to fail over. While this failover is underway, batch MR jobs on the cluster will fail. Further, an active HBase cluster with a high write load may not survive the NameNode failover. Our company WANdisco has a commercial product called the NonStop NameNode that solves this SPOF problem.

Configuration parameters for the HDFS daemons NameNode and DataNode are all stored in a file called hdfs-site.xml. At the end of this blog entry, I have included a simple Java program that generates all the config files necessary for running a Hadoop 2 cluster, including hdfs-site.xml.

YARN:
The YARN framework has a single master daemon called the YARN Resource Manager that runs on a master node, and a YARN Node Manager on each of the slave nodes. Additionally, YARN has a single Proxy server and a single MapReduce job history server. As indicated by the name, the function of the MapReduce job history server is to store and serve a history of the MapReduce jobs that were run on the cluster.

The configuration for all of these daemons is stored in yarn-site.xml.

Daemons that comprise core Hadoop (HDFS and YARN)

| Daemon Name | Number | Description | Web Port (if any) | RPC Port (if any) |
| --- | --- | --- | --- | --- |
| NameNode | 1 | HDFS Metadata Server, usually run on the master | 50070 | 8020 |
| DataNode | 1 per slave | HDFS Data Server, one per slave server | 50075 | 50010 (Data transfer RPC), 50020 (Block metadata RPC) |
| ResourceManager | 1 | YARN ResourceManager, usually on a master server | 8088 | 8030 (Scheduler RPC), 8031 (Resource Tracker RPC), 8032 (Resource Manager RPC), 8033 (Admin RPC) |
| NodeManager | 1 per slave | YARN NodeManager, one per slave | 8042 | 8040 (Localizer), 8041 (NodeManager RPC) |
| ProxyServer | 1 | YARN Proxy Server | 8034 | |
| JobHistory | 1 | MapReduce Job History Server | 19888 | 10020 |
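
Once the daemons are up, a quick way to confirm the ports in the table is to try a TCP connection to each one. Below is a minimal sketch in Java. The class name PortProbeSketch, the host name master.example.com and the two-second timeout are placeholders, and the port numbers are simply the defaults listed above, so adjust them if your config files override them.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: PortProbeSketch is a hypothetical helper, not a Hadoop tool.
class PortProbeSketch {

    // Default master-side ports, copied from the table above.
    static Map<String, Integer> masterPorts() {
        Map<String, Integer> ports = new LinkedHashMap<>();
        ports.put("NameNode web", 50070);
        ports.put("NameNode RPC", 8020);
        ports.put("ResourceManager web", 8088);
        ports.put("ResourceManager RPC", 8032);
        return ports;
    }

    // Returns true if something accepts a TCP connection on host:port.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host name; pass your own master host as the first argument.
        String host = args.length > 0 ? args[0] : "master.example.com";
        for (Map.Entry<String, Integer> e : masterPorts().entrySet()) {
            boolean up = isListening(host, e.getValue(), 2000);
            System.out.println(e.getKey() + " (" + e.getValue() + "): "
                    + (up ? "listening" : "not reachable"));
        }
    }
}
```

A successful connection only shows that something is listening on the port; it does not prove that the daemon behind it is healthy.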

A Simple Program for generating Hadoop 2 Config files
This is a simple program that generates core-site.xml, hdfs-site.xml, yarn-site.xml and capacity-scheduler.xml. You need to supply this program with the following information:

  1. nnHost: HDFS Name Node Server hostname
  2. nnMetadataDir: The directory on the Name Node server’s local filesystem where the NameNode metadata will be stored, nominally /var/hadoop/name. If you are using our WDD rpms, note that all documentation will refer to this directory.
  3. dnDataDir: The directory on each DataNode or slave machine where HDFS data blocks are stored. This location should be on the biggest disk on the DataNode. Nominally /var/hadoop/data. If you are using our WDD rpms, note that this is the location that all of our documentation will refer to.
  4. yarnRmHost: YARN Resource Manager hostname

Download the following jar: makeconf
Here is an example run:

$ java -classpath makeconf.jar com.wandisco.hadoop.makeconf.MakeHadoopConf nnHost=hdfs.hadoop.wandisco nnMetadataDir=/var/hadoop/name dnDataDir=/var/hadoop/data yarnRmHost=yarn.hadoop.wandisco
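
Purely to illustrate the idea, here is a minimal sketch of how such a generator could work: parse the key=value arguments, then emit Hadoop's configuration XML. The class HadoopConfSketch is hypothetical and is not the code in makeconf.jar. The property names (fs.defaultFS, dfs.namenode.name.dir, dfs.datanode.data.dir, yarn.resourcemanager.address) are standard Hadoop 2 keys and the ports are the defaults from the table earlier in this post, but which keys the real tool writes is not shown here, so treat the selection as an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a hypothetical stand-in, not the MakeHadoopConf class in makeconf.jar.
class HadoopConfSketch {

    // Parse arguments of the form key=value, e.g. nnHost=hdfs.hadoop.wandisco
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + arg);
            }
            params.put(arg.substring(0, eq), arg.substring(eq + 1));
        }
        return params;
    }

    // Look up a required parameter, failing loudly if it was not supplied.
    static String require(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing parameter: " + key);
        }
        return value;
    }

    // Render properties in the <configuration> format used by the *-site.xml files.
    // Values are assumed to contain no XML special characters.
    static String toXml(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<configuration>\n");
        for (Map.Entry<String, String> p : props.entrySet()) {
            sb.append("  <property>\n")
              .append("    <name>").append(p.getKey()).append("</name>\n")
              .append("    <value>").append(p.getValue()).append("</value>\n")
              .append("  </property>\n");
        }
        return sb.append("</configuration>\n").toString();
    }

    static String coreSite(Map<String, String> params) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("fs.defaultFS", "hdfs://" + require(params, "nnHost") + ":8020");
        return toXml(props);
    }

    static String hdfsSite(Map<String, String> params) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("dfs.namenode.name.dir", require(params, "nnMetadataDir"));
        props.put("dfs.datanode.data.dir", require(params, "dnDataDir"));
        return toXml(props);
    }

    static String yarnSite(Map<String, String> params) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("yarn.resourcemanager.address", require(params, "yarnRmHost") + ":8032");
        return toXml(props);
    }

    public static void main(String[] args) {
        Map<String, String> params = parseArgs(args);
        System.out.println(coreSite(params));
        System.out.println(hdfsSite(params));
        System.out.println(yarnSite(params));
    }
}
```

The sketch prints the three documents to standard output instead of writing files, and it accepts the same four key=value arguments as the example run above.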

Download the source code for this simple program here: makeconf-source

Incidentally, if you are looking for an easy way to get Hadoop 2, try our free Hadoop distro – the WANdisco Distro (WDD). It is a packaged, tested and certified version of the latest Hadoop 2.

 

Answers to questions from the Webinar of Dec 11, 2012

 Download the webinar slides here.

Question 1: Are there any special considerations or support of Spring technologies for this (i.e. Spring-Data, Spring-Integration, Spring-Batch)?

Answer: We are continuously looking at technologies that make Hadoop easier to use and program. Spring-Data, Spring-Integration and Spring-Batch show promise. When sufficient momentum is gathered by these projects, we will work with the Spring community to include a tested version of these technologies in the AltoStor Appliance.

Question 2: What is the hadoop version underneath the appliance?

Answer: Hadoop 2. We intend to remain close to the latest version of Hadoop at any given point in time, apart from our own bug fixes and changes.

Question 3: Will the pricing model be based on the number of name nodes or the size of the cluster?

Answer: Pricing decisions have not been made yet. We will announce pricing in the first quarter of 2013.

Question 4: Can you comment on how load balancing is resolved across active nodes? Is there a load balance router concept?

Answer: We do not require/depend on any specialized hardware such as load balancers or NFS filers. By “load balancing” we simply mean that application requests (read or write) can be directed to any NameNode based on its proximity to the client or available resources. Thus NameNodes can share the workload and provide higher overall cluster performance compared to active-standby architecture.

Question 5: How does Active-Active replication impact processing time relative to current Hadoop architectures?

Answer: Active-Active replication will result in load balancing of clients across many NameNodes, i.e. fewer clients will be serviced by each NameNode. Since NameNodes share the workload on a busy cluster, you should expect faster response time for clients. Generally, more active NameNodes can perform a proportionally larger amount of work.


Design of the Hadoop HDFS NameNode: Part 1 – Request processing

NameNode Design Part 1 - Client Request Processing

HDFS is Hadoop’s File System. It is a distributed file system in that it uses a multitude of machines to implement its functionality. Contrast that with NTFS, FAT32, ext3 etc. which are all single machine filesystems.

HDFS is architected such that the metadata, i.e. the information about file names, directories, permissions, etc. is separated from the user data. HDFS consists of the NameNode, which is HDFS’s metadata server, and DataNodes, where user data is stored. There can be only one active instance of the NameNode. A number of DataNodes (a handful to several thousand) can be part of this HDFS served by the single NameNode.

Here is how a client RPC request to the Hadoop HDFS NameNode flows through the NameNode. This pertains to the Hadoop trunk code base on Dec 2, 2012, i.e. a few months after Hadoop 2.0.2-alpha was released.

The Hadoop NameNode receives requests from HDFS clients in the form of Hadoop RPC requests over a TCP connection. Typical client requests include mkdir, getBlockLocations, create file, etc. Remember that HDFS separates metadata from actual file data, and that the NameNode is the metadata server. Hence, these requests are pure metadata requests; no data transfer is involved. The following diagram traces the path of an HDFS client request through the NameNode. The various thread pools used by the system, the locks taken and released by these threads, the queues used, etc. are described in detail in this post.

 

  • As shown in the diagram, a Listener object listens to the TCP port serving RPC requests from the client. It accepts new connections from clients, and adds them to the Server object’s connectionList.
  • Next, a number of RPC Reader threads read requests from the connections in connectionList, decode the RPC requests, and add them to the RPC call queue, Server.callQueue.
  • Now, the actual worker threads kick in. These are the Handler threads. The threads pick up RPC calls and process them. The processing involves the following:
    • First grab the write lock for the namespace
    • Change the in-memory namespace
    • Write to the in-memory FSEdits log (journal)
    • Release the write lock on the namespace. Note that the journal has not been sync’d yet, which means we cannot return success to the RPC client yet
  • Next, each Handler thread calls logSync. Upon returning from this call, it is guaranteed that the logfile modifications have been sync’d to disk. Exactly how this is guaranteed is messy. Here are the details:
    • Every time an edit entry is written to the edits log, a unique txid is assigned to this specific edit. The Handler retrieves this txid and saves it. It is later used to verify whether this specific edit log entry has been sync’d to disk
    • When logSync is called by a Handler, it first checks whether the last sync’d txid is greater than or equal to the txid of the edit just logged by the Handler. If it is, the entry is already on disk and the Handler can mark the RPC call as complete. If the Handler’s txid is greater than the last sync’d txid, then the Handler has to do the following:
      • Try to grab the sync lock and sync all pending transactions
      • If it cannot grab the sync lock, wait 1000ms and try again in a loop
      • Once a sync has covered its txid, the log entry for the transaction made by this Handler has been persisted. The Handler can now mark the RPC as complete.
  • Finally, the single Responder thread picks up completed RPCs and returns the result of the RPC call to the RPC client. Note that the Responder thread uses NIO to asynchronously send responses back to waiting clients. Hence one thread is sufficient.
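
The txid check in logSync can be sketched in a few lines of Java. This is not the NameNode's actual code: EditLogPollingSketch and its fields are names invented here, and the flush to disk is simulated by a counter. It only shows the pattern described above: remember your txid, return early if a previous sync already covered it, otherwise try for the sync lock and poll until your txid is on disk.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: invented names and a simulated disk sync; not the Hadoop source.
class EditLogPollingSketch {
    private final AtomicLong lastWrittenTxid = new AtomicLong(0);
    private final AtomicLong syncCount = new AtomicLong(0);
    private final ReentrantLock syncLock = new ReentrantLock();
    private volatile long lastSyncedTxid = 0;
    private final long pollMillis;

    EditLogPollingSketch(long pollMillis) {
        this.pollMillis = pollMillis;
    }

    // Called while the Handler holds the namespace write lock:
    // buffer the edit in memory and hand back its txid.
    long logEdit() {
        return lastWrittenTxid.incrementAndGet();
    }

    // Called after the namespace lock is released: returns once myTxid is durable.
    void logSync(long myTxid) {
        while (myTxid > lastSyncedTxid) {
            if (syncLock.tryLock()) {
                try {
                    if (myTxid > lastSyncedTxid) {
                        long target = lastWrittenTxid.get(); // covers every edit logged so far
                        // A real implementation would flush the journal to disk here.
                        syncCount.incrementAndGet();
                        lastSyncedTxid = target;
                    }
                } finally {
                    syncLock.unlock();
                }
            } else {
                sleepQuietly(pollMillis); // another Handler is syncing: wait, then re-check
            }
        }
    }

    long lastSyncedTxid() { return lastSyncedTxid; }

    long syncCount() { return syncCount.get(); }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for sync", e);
        }
    }

    public static void main(String[] args) {
        EditLogPollingSketch log = new EditLogPollingSketch(1000);
        long first = log.logEdit();
        long second = log.logEdit();
        log.logSync(second); // one sync covers both edits
        log.logSync(first);  // already durable, so no second sync happens
        System.out.println("synced txid=" + log.lastSyncedTxid() + ", syncs=" + log.syncCount());
    }
}
```

Note how one sync covers every edit written so far, so a Handler that arrives late often finds its txid already on disk and returns without syncing.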

There is one thing about this design that bothers me:

  • The threads that wait for their txid to sync sleep 1000ms, check, sleep 1000ms, check, and continue polling in this way. It may make sense to remove the polling mechanism and replace it with an actual wait/notify mechanism.
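
Here is one way the suggested change could look, as a hedged sketch rather than a patch against Hadoop. EditLogNotifySketch is a made-up class with a simulated disk sync; it replaces the timed poll with Java's built-in wait/notifyAll, so a waiting Handler wakes up as soon as the thread doing the sync finishes.

```java
// Illustration only: invented names and a simulated disk sync; not the Hadoop source.
class EditLogNotifySketch {
    private long lastWrittenTxid = 0;
    private long lastSyncedTxid = 0;
    private boolean syncRunning = false;
    private int syncCount = 0;

    // Buffer an edit in memory and hand back its txid.
    synchronized long logEdit() {
        return ++lastWrittenTxid;
    }

    // Returns once myTxid is durable, without any timed polling.
    void logSync(long myTxid) {
        long target;
        synchronized (this) {
            // Block until the running sync (if any) finishes; notifyAll() wakes us.
            while (myTxid > lastSyncedTxid && syncRunning) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for sync", e);
                }
            }
            if (myTxid <= lastSyncedTxid) {
                return; // a previous sync already covered this edit
            }
            syncRunning = true;
            target = lastWrittenTxid; // sync everything logged so far
        }

        // A real implementation would flush the journal to disk here, outside
        // the monitor, so that other Handlers can keep logging edits meanwhile.

        synchronized (this) {
            lastSyncedTxid = target;
            syncCount++;
            syncRunning = false;
            notifyAll(); // wake every Handler waiting on this sync
        }
    }

    synchronized long lastSyncedTxid() { return lastSyncedTxid; }

    synchronized int syncCount() { return syncCount; }

    public static void main(String[] args) throws InterruptedException {
        EditLogNotifySketch log = new EditLogNotifySketch();
        Thread[] handlers = new Thread[4];
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new Thread(() -> log.logSync(log.logEdit()));
            handlers[i].start();
        }
        for (Thread handler : handlers) {
            handler.join();
        }
        System.out.println("synced up to txid " + log.lastSyncedTxid()
                + " using " + log.syncCount() + " sync(s)");
    }
}
```

The waiting loop re-checks its condition after every wake-up, which is the usual guard against spurious wake-ups with wait/notifyAll.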

That’s all in this writeup, folks.

Setting up Amanda to backup a linux box to Amazon S3

Here is my experience with compiling and configuring Amanda to back up to the cloud (S3).

First compiling and installing amanda

  1. Create a user ‘amanda’ in group ‘backup’. Login as this user
  2. Download amanda-3.3.2.tar.gz and untar it
  3. As user amanda, run ‘./configure --enable-s3-device’ and ‘make’
  4. As user root, run ‘make install’
  5. Now this next instruction is basically a hack to work around some packaging bugs in amanda:
    • chgrp backup /usr/local/sbin/am*

Next, creating a backup configuration called ‘MyConfig’ for backing up /etc to a virtual tape in the directory /amanda

  1. As root, create a directory /amanda, and chown it to amanda:backup
  2. Run the following commands as user amanda:
    • mkdir -p /amanda/vtapes/slot{1,2,3,4}
    • mkdir -p /amanda/holding
    • mkdir -p /amanda/state/{curinfo,log,index}
  3. As root, create a directory ‘/usr/local/etc/amanda/MyConfig’, and chown it to amanda:backup.
  4. As user amanda, create a file /usr/local/etc/amanda/MyConfig/amanda.conf with the following contents:

 

org "MyConfig"
infofile "/amanda/state/curinfo"
logdir "/amanda/state/log"
indexdir "/amanda/state/index"
dumpuser "amanda"

tpchanger "chg-disk:/amanda/vtapes"
labelstr "MyData[0-9][0-9]"
autolabel "MyData%%" EMPTY VOLUME_ERROR
tapecycle 4
dumpcycle 3 days
amrecover_changer "changer"

tapetype "TEST-TAPE"
define tapetype TEST-TAPE {
  length 100 mbytes
  filemark 4 kbytes
}

define dumptype simple-gnutar-local {
    auth "local"
    compress none
    program "GNUTAR"
}

holdingdisk hd1 {
    directory "/amanda/holding"
    use 50 mbytes
    chunksize 1 mbyte
}
  • Create a file /usr/local/etc/amanda/MyConfig/disklist with the following contents:
localhost /etc simple-gnutar-local
  • As root make a directory /usr/local/var/amanda/gnutar-lists, and change ownership of this directory to amanda:backup
  • Next, as user amanda, run the program ‘amcheck MyConfig’. It should finish its report with ‘Client check: 1 host checked in 2.064 seconds.  0 problems found.’
  • Next, run ‘amdump MyConfig’. The program should exit with return value 0
  • Finally, run ‘amreport MyConfig’ to get the status of your backup effort!

That’s it for testing a local backup to the virtual tape /amanda. Now, on to trying this with S3 as the tape backend.

  • Create a new config directory /usr/local/etc/amanda/MyS3Config, and in it a file amanda.conf with the following contents:
org "MyS3Config"
infofile "/amanda/state/curinfo"
logdir "/amanda/state/log"
indexdir "/amanda/state/index"
dumpuser "amanda"

# amazonaws S3
device_property "S3_ACCESS_KEY" "YOUR_AMAZON_KEY_ID_HERE"
device_property "S3_SECRET_KEY" "YOUR_AMAZON_SECRET_KEY_HERE"
device_property "S3_SSL" "YES"
tpchanger "chg-multi:s3:YOUR_NAME-backups/MyS3Config1/slot-{01,02,03,04,05,06,07,08,09,10}"
changerfile  "s3-statefile"

tapetype S3
define tapetype S3 {
    comment "S3 Bucket"
    length 10240 gigabytes # Bucket size 10TB
}

define dumptype simple-gnutar-local {
    auth "local"
    compress none
    program "GNUTAR"
}

holdingdisk hd1 {
    directory "/amanda/holding"
    use 50 mbytes
    chunksize 1 mbyte
}
  • Enter your S3 Access Key ID and Secret Key, and change YOUR_NAME to your name in the above config example
  • Now run the command
    • amlabel MyS3Config MyS3Config-1 slot 1
  •  If this works fine, then run the following command to create 9 more slots
    • for i in 2 3 4 5 6 7 8 9 10; do amlabel MyS3Config MyS3Config-$i slot $i; done;
  • Finally run the following to verify:
    • amdevcheck MyS3Config s3:YOUR_NAME-backups/MyS3Config1/slot-10
  • Now you are ready to run a backup. Define one for the /etc directory on your machine by adding the file /usr/local/etc/amanda/MyS3Config/disklist with the following line:
    • localhost /etc simple-gnutar-local
  • Try running a backup using the command:
    • amdump MyS3Config
  • Check the return value of the command, and if that is 0, check the contents of the backup bucket on S3 using S3fox or some such tool.

That’s all folks. You have created a backup of your linux machine’s /etc directory on Amazon’s S3 cloud storage service.

It would be interesting to try this with Amazon’s Glacier service, which is much cheaper than S3.

Running hdfs from the freshly built 0.23.3 source tree in distributed mode

Download the Hadoop Creation Scripts used in this post here

Here is what I did to run the newly built hadoop in distributed mode using three CentOS VMs. I have these VMs, and the scripts that I used for creating hadoop available for download later in this blog post.

First, setting up three VMs for running Hadoop

  • Install ESXi on a suitable machine. I used a 4 core machine with 8GB of RAM and two 300GB disks
  • Create three 32 bit CentOS 6.3 VMs. Each VM has 32GB of disk space, and 2GB of RAM. To the CentOS installer, I specified ‘Basic Server’ as the installation type.
  • The root password is altostor
  • I assigned IP addresses 10.0.0.175, 10.0.0.176 and 10.0.0.177 to the three virtual machines. I assigned the hostnames master.altostor.net, slave0.altostor.net and slave1.altostor.net to these machines and created a hosts file.
  • I set up passwordless ssh so that the ‘root’ user can ssh from master.altostor.net to slave0 and slave1 without typing in a password. The web is full of guides for this, but in brief:
    • Turn off selinux on the three VMs by editing /etc/selinux/config and setting SELINUX=disabled.
    • Run ‘ssh-keygen -t rsa’ on the master
    • Create a /root/.ssh directory on slave0 and slave1 and set its permissions to 700
    • scp the file /root/.ssh/id_rsa.pub to slave0 and slave1 as /root/.ssh/authorized_keys. Set permissions of authorized_keys to 644. On the master node itself, copy the id_rsa.pub as authorized_keys.
    • From master, test that you can ssh into ‘slave0’, ‘slave1’ and to itself ‘master’
    • Create a group hadoop and a user hdfs in this new group on all three machines

Next, creating the hadoop cluster. Download the archive with the scripts and config files necessary for installing hadoop from the link at the top of this post. Unpack it on the master, drop in the hadoop binary hadoop-0.23.3.tar.gz, and run ‘./create-hadoop.sh’. The script does the following:

  • master setup:
    • kills all java processes
    • deletes the data directory, pid directory and log directory
    • copies config files (config file templates are included with the script tar file) and customizes them
    • creates the slaves file with slave0 and slave1 in it
    • formats the HDFS filesystem
    • starts up the namenode and creates /user in hdfs. Note that the NameNode java process is running as the linux user root
  • slave setup:
    • kills all running java processes
    • removes the hadoop data, pid and log directories
    • copies the hadoop binaries directory over from master to slave using scp
    • starts up the hadoop DataNode process as linux user root

 

 

Running hdfs from the freshly built hadoop 0.23.3 in pseudo distributed mode

So, here’s what I did to run the freshly built hadoop 0.23.3 bits in pseudo distributed mode (this is the mode where each of the hadoop daemons runs in its own process in a single physical/virtual machine).

First I configured passwordless ssh back into the same machine (localhost). I needed to turn off selinux on this CentOS 6.3 VM in order to accomplish that. Seems like selinux is working very hard to make CentOS/Redhat completely unusable. I edited /etc/selinux/config and changed the line SELINUX=enforcing to SELINUX=disabled. Reboot, and then ‘ssh-keygen’ and then ‘ssh-copy-id -i ~/.ssh/id_rsa.pub jagane@localhost’

Now, I untarred the file <src_root>/hadoop-dist/target/hadoop-0.23.3.tar.gz into /opt. mkdir /opt/hadoop-0.23.3/conf, /opt/nn and /opt/dn. Create the following files in /opt/hadoop-0.23.3/conf.

/opt/hadoop-0.23.3/conf/core-site.xml:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
 <name>fs.default.name</name>
 <value>hdfs://localhost:8020</value>
 </property>
</configuration>

/opt/hadoop-0.23.3/conf/hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
 <name>dfs.replication</name>
 <value>1</value>
 </property>
 <property>
 <name>dfs.namenode.name.dir</name>
 <value>/opt/nn</value>
 </property>
 <property>
 <name>dfs.datanode.data.dir</name>
 <value>/opt/dn</value>
 </property>
</configuration>

/opt/hadoop-0.23.3/conf/hadoop-env.sh

export JAVA_HOME=/usr/java/java
export HADOOP_HOME=/opt/hadoop-0.23.3
export HADOOP_MAPRED_HOME=${HADOOP_HOME}
export HADOOP_COMMON_HOME=${HADOOP_HOME}
export HADOOP_HDFS_HOME=${HADOOP_HOME}
export YARN_HOME=${HADOOP_HOME}
export HADOOP_CONF_DIR=${HADOOP_HOME}/conf/
export YARN_CONF_DIR=${HADOOP_HOME}/conf/

Now run the following command to format hdfs

$ (cd /opt/hadoop-0.23.3; ./bin/hdfs namenode -format)

Next, start up the namenode as follows:

$ (cd /opt/hadoop-0.23.3; ./sbin/hadoop-daemon.sh --config /opt/hadoop-0.23.3/conf start namenode)

Finally, start up the datanode

$ (cd /opt/hadoop-0.23.3; ./sbin/hadoop-daemon.sh --config /opt/hadoop-0.23.3/conf start datanode)

At this point, running the command jps will show you the datanode and the namenode running.

That’s all for now.

Compiling Hadoop 0.23.3 from source

Just a quick blog entry – here’s what I did to compile hadoop 0.23.3 from source.

  • Install CentOS 6.3 in a VM using vmware player. I used a VM with 100GB disk space and 1GB memory. More memory will probably help
  • Download the source bundle, and untar it
  • yum groupinstall "Development Tools"
  • change uid for the user ‘jagane’ from 500 to 1000
  • yum install cmake
  • yum install openssl
  • yum install openssl-devel
  • Download google protobuf sources, ./configure, make, make install.
  • install the latest maven
  • mvn clean install package -Pdist -Dtar -Pnative -DskipTests=true

Look for the built tar.gz files in hadoop-dist/target. That’s all.

A Real Time Sentiment Analysis Application using Hadoop and HBase in the Cloud

Download TwitterSampler.java

Download the html files

I did a talk about a Real Time Sentiment Analysis Application at the Hadoop Summit 2012.

Here are the slides from this presentation:

http://www.slideshare.net/Hadoop_Summit/realtime-sentiment-analysis-app-using-hadoop-and-h-base

This is an application that evaluates the sentiment of twitter users towards a small number of pre-determined keywords, stores them in HBase, and displays a graph of sentiment versus time. Users can scroll back and forth in time to view how the sentiment tracked over time.

Download the java files and the html files for this project from the links at the top of this post.

There are three parts to this program.

  1. Using the twitter API to get a stream of tweets (public status updates)
  2. Doing sentiment analysis on the tweets and storing them in HBase
  3. Using a javascript program running in the browser to call back into HBase using the REST gateway, and plotting the output

I am co-founder of AltoStor – we develop software that turns HBase and Hadoop into a metered, billed service in the public cloud or in Enterprise VMware. I used our HBase Workbench to develop this entire project. The code itself will run on stock Hadoop 1.0.x with HBase 0.92.x – you don’t need the Workbench. The Workbench is simply the easiest way for you to get started developing Big Data applications.

Good luck playing with these technologies

— Jagane

About Jagane Sundar