Jan 06 2014

In a past life, I worked for a wireless service provider that used a vendor-supplied application to measure how much data bandwidth customers were consuming; that data was then fed to the billing system to produce the customers’ monthly bills.

The app was poorly written (IMHO) and woefully single-threaded, incapable of leveraging the oodles of compute resources provided in the form of twelve two-node VCS/RHEL-based clusters. There were two data centers, with one such “cluster of clusters” at each site.

The physical infrastructure was pretty standard and, to a degree, over-engineered (the infrastructure under consideration having been built in the 2007-2008 timeframe): HP DL580 G5 servers with 128GB of RAM each, a pair of Gigabit Ethernet NICs for cluster interconnects, another pair bonded together as public interfaces, and 4 x 4Gb FC HBAs connecting through Brocade DCX core switches (dual fabric) to an almost-dedicated EMC CLARiiON CX4-960.

The application was basically a bunch of processes that watched traffic as it flowed through the network cores and calculated bandwidth usage based on end-user handset IP addresses (I’m watering it down to keep the narrative fast and fluid).

Each location (separated by about 200 miles) acted as the fault-tolerant component for the other, so traffic was cross-fed between the two data centers over a pair of OC12 links.

The application was a series of processes/services that formed a queue across the various machines of each cluster (over TCP ports). Even processes within a single physical system communicated via IP addresses and TCP ports.

The problem we started observing, beginning a few months after deployment, was that data would queue up, slowing down and skewing the usage calculations; the implications for billing I don’t necessarily have to spell out (ridiculous).

The software vendor’s standard response was:

  1. It is not an application problem.
  2. The various components of the application write into FIFO queues on SAN-based filesystems, so the SAN must be the bottleneck: storage too slow, not enough IOPS available, and/or poor response times. Their basis for this seemed to be an arbitrary metric of CPU I/O wait percentage rising above 5% (or perhaps even lower at times).

Our own findings told a different story:

  1. After much deliberation poring over the NAR reports of the almost-dedicated EMC CX4-960s (and working with EMC), we ascertained that neither the storage arrays nor the SAN contributed any latency that could explain the poor performance of this app.
  2. The processes, being woefully single-threaded, would barely ever use more than 15% of the total CPU available on the servers (each server having 16 cores at its disposal).
  3. Memory usage was nominal and well within acceptable limits.
  4. The network throughput wasn’t anywhere near saturation.

We decided to start profiling the application (despite great protestations from the vendor) during normal as well as problematic periods, at the layer where we were seeing the issues as well as the layers immediately upstream and downstream of it.

What we observed was that:

  1. Under normal circumstances, the app would spend most of its time in the read, write, send, or recv syscalls.
  2. When the app was performing poorly, it would spend most of its time in the poll syscall. It became apparent that it was waiting on TCP sockets from app instances in the remote site (and the issue was bidirectional). The sketch below shows the kind of tooling involved.
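
The profiling itself required nothing exotic. Here is a minimal sketch of the approach, assuming strace (which ships with RHEL); our exact invocations and tooling differed, but the idea is the same:

# Summarize which syscalls an already-running process spends its time in:
strace -c -f -p <pid>

# Then watch individual calls with per-call latencies (-T) to spot the poll stalls:
strace -T -p <pid>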


Once this bit of information was carefully vetted and provided to the vendor, they finally gave us their minimum throughput requirement: the application needed at least 30 Mbps between the sites. The assumption made was that on an OC12 (bandwidth of 622 Mbps), 30 Mbps was quite achievable.

However, it turns out that latency plays a huge role in the actual throughput achieved over a WAN connection! (*sarcasm alert*)

The average RTT between the two sites was 30 ms. Given that the servers were running RHEL 4.6 with the default (untuned) TCP send and receive buffer sizes of 64 KB, the 30 ms RTT capped throughput at about 17 Mbps.
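
That figure falls straight out of the bandwidth-delay product: TCP can keep at most one window’s worth of data in flight per round trip, so maximum throughput is roughly window size divided by RTT. A quick illustrative calculation (using the numbers above):

# Bandwidth-delay product math for a 64 KB window at 30 ms RTT
window_bytes = 64 * 1024   # untuned RHEL 4.6 default TCP buffer
rtt = 0.030                # 30 ms round trip between the sites

max_mbps = window_bytes * 8 / rtt / 1e6
print('max throughput: %.1f Mbps' % max_mbps)   # ~17.5 Mbps

# Conversely, the window needed to sustain a target rate at this RTT:
target_mbps = 80
needed_kb = target_mbps * 1e6 / 8 * rtt / 1024
print('buffer needed: %.0f KB' % needed_kb)     # ~293 KB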

It turns out that the methodologies we (*NIX admins) would typically use to generate network traffic and measure “throughput” aren’t necessarily equipped to handle wide-area networks. For example, SSH has an artificial bottleneck in its code that throttles the TCP window size to a default of 64K (in the version of OpenSSH we were using at that time), hardcoded in the channels.h file. Initial tests were indeed baffling, since we could never cross 22 Mbps of throughput on the WAN. After a little research we realized that the default TCP window sizes (for passive FTP, scp, etc.) were simply not tuned for high-RTT connections.

Thus began the process of tweaking the buffer sizes and generating synthetic loads using iperf.

After we established that the default TCP buffer sizes were inadequate, we calculated the buffer sizes required to provide at least 80 Mbps of throughput and implemented them across the environment; a sketch of the kind of tuning involved follows. The queuing stopped immediately after.
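
On Linux, this boils down to raising the kernel’s socket buffer ceilings and TCP buffer defaults. The values below are purely illustrative, sized from the bandwidth-delay math above rather than the exact figures we deployed:

# /etc/sysctl.conf -- raise socket buffer limits for high-RTT links
net.core.rmem_max = 524288
net.core.wmem_max = 524288
# TCP buffer sizes: min / default / max, in bytes
net.ipv4.tcp_rmem = 4096 262144 524288
net.ipv4.tcp_wmem = 4096 262144 524288

Apply with sysctl -p; iperf’s -w flag lets you override the socket buffer per test run, which made it easy to validate a candidate window size across the WAN before changing kernel defaults.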

Jan 02 2014

This is a continuation of my previous post on this topic.

First, a disclaimer –

  • I have focused primarily on CentOS. I will try to update this to accommodate more than one Linux distro (and may even write about Solaris-based implementations in the future).

Here’s the skeleton of the Cloudera Manager setup plugin:

import posixpath

from starcluster import threadpool
from starcluster import clustersetup
from starcluster.logger import log

class ClouderaMgr(clustersetup.ClusterSetup):

        def __init__(self, cdh_mgr_agentdir='/etc/cloudera-scm-agent'):
                self.cdh_mgr_agentdir = cdh_mgr_agentdir
                self.cdh_mgr_agent_conf = posixpath.join(cdh_mgr_agentdir, 'config.ini')
                self.cdh_mgr_repo_conf = '/etc/yum.repos.d/cloudera-cdh4.repo'
                self.cdh_mgr_repo_url = 'http://archive.cloudera.com/cm4/redhat/6/x86_64/cm/cloudera-manager.repo'
                self._pool = None

        @property
        def pool(self):
                if self._pool is None:
                        self._pool = threadpool.get_thread_pool(20, disable_threads=False)
                return self._pool

        def _install_cdhmgr_repo(self, node):
                # Fetch the Cloudera Manager repo file and append it to the yum repo config
                node.ssh.execute('wget -O /root/cloudera-manager.repo %s' % self.cdh_mgr_repo_url)
                node.ssh.execute('cat /root/cloudera-manager.repo >> %s' % self.cdh_mgr_repo_conf)

        def _install_cdhmgr_agent(self, node):
                # Every node (master included) runs the Cloudera Manager agent
                node.ssh.execute('yum install -y cloudera-manager-agent')
                node.ssh.execute('yum install -y cloudera-manager-daemons')

        def _install_cdhmgr(self, master):
                # The master runs the Cloudera Manager server and its embedded DB;
                # stop the agent and disable services that CM will manage itself
                master.ssh.execute('/sbin/service cloudera-scm-agent stop')
                master.ssh.execute('/sbin/chkconfig cloudera-scm-agent off')
                master.ssh.execute('/sbin/chkconfig hue off')
                master.ssh.execute('/sbin/chkconfig oozie off')
                master.ssh.execute('/sbin/chkconfig hadoop-httpfs off')
                master.ssh.execute('yum install -y cloudera-manager-server')
                master.ssh.execute('yum install -y cloudera-manager-server-db')
                master.ssh.execute('/sbin/service cloudera-scm-server-db start')
                master.ssh.execute('/sbin/service cloudera-scm-server start')

        def _setup_hadoop_user(self, node, user):
                node.ssh.execute('gpasswd -a %s hadoop' % user)

        def _install_agent_conf(self, node):
                # Point each agent at the master; config.ini ships with server_host=localhost
                cmd = '/bin/sed -e "s/server_host=localhost/server_host=master/g" %s > /tmp/config.ini; mv /tmp/config.ini %s'
                node.ssh.execute(cmd % (self.cdh_mgr_agent_conf, self.cdh_mgr_agent_conf))

        def _open_ports(self, master):
                # Open the CM web UI (7180) and the NameNode/JobTracker UIs (50070/50030)
                ports = [7180, 50070, 50030]
                ec2 = master.ec2
                for group in master.cluster_groups:
                        for port in ports:
                                has_perm = ec2.has_permission(group, 'tcp', port, port, '0.0.0.0/0')
                                if not has_perm:
                                        ec2.conn.authorize_security_group(group_id=group.id,
                                                                          ip_protocol='tcp',
                                                                          from_port=port,
                                                                          to_port=port,
                                                                          cidr_ip='0.0.0.0/0')

        def run(self, nodes, master, user, user_shell, volumes):
                for node in nodes:
                        self._install_cdhmgr_repo(node)
                        self._install_cdhmgr_agent(node)
                        self._install_agent_conf(node)
                self._install_cdhmgr(master)
                self._open_ports(master)

        def on_add_node(self, node, nodes, master, user, user_shell, volumes):
                # Only the newly added node needs the repo, agent, and config
                self._install_cdhmgr_repo(node)
                self._install_cdhmgr_agent(node)
                self._install_agent_conf(node)

This plugin (saved as cdh_mgr.py in your StarCluster plugins directory, so that the setup_class reference below resolves) can be referenced and executed in the cluster configuration as follows:

[plugin cdhmgr]
setup_class = cdh_mgr.ClouderaMgr

[cluster testcluster]
keyname = testcluster
cluster_size = 2
cluster_user = sgeadmin
cluster_shell = bash
master_image_id = ami-232b034a
master_instance_type = t1.micro
node_image_id = ami-232b034a
node_instance_type = t1.micro
plugins = wgetter,rpminstaller,repoinstaller,pkginstaller,cdhmgr

Once this step is successfully executed by StarCluster, you will be able to access the Cloudera Manager web GUI on port 7180 of your master node’s public IP and/or DNS name.
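
For example, with the configuration above in place, bringing everything up is a one-liner:

starcluster start testcluster

When it completes, starcluster listclusters will show the running instances, including the master’s public DNS name to point your browser at.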

<...more to follow...>

Dec 13 2013

I ran into an incredible tool called StarCluster, an open-source project from MIT (http://star.mit.edu/cluster/).

StarCluster is built using Sun Microsystems’ N1 Grid Engine software (Sun used it to do deployments for HPC environments). The folks at MIT developed on a fork of it (SGE – Sun Grid Engine), and StarCluster was formed.

StarCluster has hooks into the AWS API and is used to dynamically and automatically provision multi-node clusters in Amazon’s EC2 cloud. The base StarCluster software provisions a pre-selected virtual machine image (AMI) onto a pre-defined VM type (e.g. t1.micro, m1.small, etc.). The distributed StarCluster software includes a Hadoop plugin (http://star.mit.edu/cluster/docs/0.93.3/plugins/hadoop.html) which installs the generic Apache Foundation Hadoop stack on the cluster of nodes (or single node) provisioned thereby. It then uses a tool called dumbo (http://klbostee.github.io/dumbo/) to drive the Hadoop framework. This is great.

But I want to leverage Cloudera’s Hadoop distribution (CDH) with its Cloudera Manager based administration, and want to roll this into the StarCluster framework. Therefore I have to start developing the necessary plugin to implement Cloudera Hadoop on top of StarCluster.

I won’t delve deeper into StarCluster’s inner workings in this post, as I only started working on it a couple of days ago. I do intend to see how I can leverage this toolkit and add more functionality to it (like integrating Ganglia-based monitoring, etc.) in due time.

Why StarCluster and not a Vagrant, Jenkins, Puppet combination?

Tools like StarCluster are meant to do rapid and effective deployments of distributed computing environments on a gigantic scale (N1 Grid Engine was used to deploy hundreds of physical servers back in the days when Sun still shone on the IT industry). While Vagrant, Puppet, Jenkins, etc. can be put together to build such a framework, it is in essence easier (less effort) to use StarCluster to operationalize a Hadoop cluster on EC2. And after I spend some more time on its inner workings, I hope to be able to use it with other virtualization technologies too – perhaps integrating with an on-premise Xen, VMware, or KVM cluster.

Let’s skim the surface:

How StarCluster works

Let’s start with how StarCluster works (this presupposes a working knowledge of Amazon’s AWS/EC2 infrastructure).

It is in essence a toolkit that you install on your workstation/laptop and use to drive configuration and deployment of your distributed clustering environment on EC2.

To set it up, follow the instructions on MIT’s website. It was pretty straightforward on my MacBook Pro running OS X 10.9 (Mavericks). I already had Xcode installed, so I cloned the StarCluster Git distribution and ran through the two commands that needed to be run, as listed here – http://star.mit.edu/cluster/docs/latest/installation.html

After that, I set up my preliminary configuration file (~/.starcluster/config) and we were ready to roll; a minimal version is sketched below. I chose to test the StarCluster functionality without any plugins by setting up a single node.
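
For reference, here is a minimal configuration of the sort I started with (the credentials and key name below are placeholders for your own values):

[global]
DEFAULT_TEMPLATE = testcluster

[aws info]
AWS_ACCESS_KEY_ID = <your access key>
AWS_SECRET_ACCESS_KEY = <your secret key>
AWS_USER_ID = <your account id>

[key testcluster]
KEY_LOCATION = ~/.ssh/testcluster.rsa

[cluster testcluster]
keyname = testcluster
cluster_size = 1
cluster_user = sgeadmin
cluster_shell = bash
node_image_id = ami-0db22764
node_instance_type = t1.micro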

It’s important to choose the right AMI, because not all AMIs can be deployed on the VM type of your choice. For instance, I wanted to use the smallest instance type, t1.micro, to do my testing, since I won’t really be using this for any actual work. I settled on a CentOS 5.4 AMI from RealStack.

After verifying that my single-node cluster was being set up correctly, I proceeded to develop a plugin to do the initial Cloudera Hadoop setup, along with a Cloudera Manager installation on the master node. With that in place, we can roll out a cluster, connect to Cloudera Manager, and use its interface to configure the Hadoop cluster. If further automation is possible with Cloudera Manager, I will work on that as an addendum to this effort.

The Plugins

Following the instructions here, I started building out the framework for my plugin. I called the foundational plugin centos (since my selected OS is CentOS). The CDH-specific one will be called Cloudera.

The following directory structure is set up after StarCluster is installed –

In your home directory, a .starcluster directory, under which we have:

total 8
drwxr-xr-x   9 dwai  dlahiri   306 Dec 12 18:15 logs
drwxr-xr-x   4 dwai  dlahiri   136 Dec 13 10:01 plugins
-rw-r--r--   1 dwai  dlahiri  2409 Dec 13 10:01 config
drwxr-xr-x+ 65 dwai  dlahiri  2210 Dec 13 10:01 ..
drwxr-xr-x   5 dwai  dlahiri   170 Dec 13 10:01 .

Your new plugins will go into the plugins directory listed above (for now).

It is as simple as this (the plugin is in essence a Python script – we call it centos.py):

from starcluster.clustersetup import ClusterSetup
from starcluster.logger import log


class WgetPackages(ClusterSetup):
	"""Fetch a package/file onto every node with wget."""
	def __init__(self, pkg_to_wget):
		self.pkg_to_wget = pkg_to_wget
		log.debug('pkg_to_wget = %s' % pkg_to_wget)

	def run(self, nodes, master, user, user_shell, volumes):
		for node in nodes:
			log.info("Wgetting %s on %s" % (self.pkg_to_wget, node.alias))
			node.ssh.execute('wget %s' % self.pkg_to_wget)

class RpmInstaller(ClusterSetup):
	"""Install a locally downloaded RPM on every node via yum."""
	def __init__(self, rpm_to_install):
		self.rpm_to_install = rpm_to_install
		log.debug('rpm_to_install = %s' % rpm_to_install)

	def run(self, nodes, master, user, user_shell, volumes):
		for node in nodes:
			log.info("Installing %s on %s" % (self.rpm_to_install, node.alias))
			node.ssh.execute('yum -y --nogpgcheck localinstall %s' % self.rpm_to_install)

class RepoConfigurator(ClusterSetup):
	"""Import a repository GPG key on every node."""
	def __init__(self, repo_to_install):
		self.repo_to_install = repo_to_install
		log.debug('repo_to_install = %s' % repo_to_install)

	def run(self, nodes, master, user, user_shell, volumes):
		for node in nodes:
			log.info("Installing %s on %s" % (self.repo_to_install, node.alias))
			node.ssh.execute('rpm --import %s' % self.repo_to_install)

class PackageInstaller(ClusterSetup):
	"""Install a yum package on every node."""
	def __init__(self, pkg_to_install):
		self.pkg_to_install = pkg_to_install
		log.debug('pkg_to_install = %s' % pkg_to_install)

	def run(self, nodes, master, user, user_shell, volumes):
		for node in nodes:
			log.info("Installing %s on %s" % (self.pkg_to_install, node.alias))
			node.ssh.execute('yum -y install %s' % self.pkg_to_install)

Now we can reference the plugin in our configuration file:

[cluster testcluster]
keyname = testcluster
cluster_size = 1
cluster_user = sgeadmin
cluster_shell = bash
node_image_id = ami-0db22764
node_instance_type = t1.micro
plugins = wgetter,rpminstaller,repoinstaller,pkginstaller,sge
permissions = zookeeper.1, zookeeper.2, accumulo.monitor, hdfs, jobtracker, tablet_server, master_server, accumulo_logger, accumulo_tracer, datanode_data, datanode_metadata, tasktrackers, namenode_http_monitor, datanode_http_monitor, accumulo, accumulo_http_monitor

[plugin wgetter]
setup_class = centos.WgetPackages
pkg_to_wget = "http://archive.cloudera.com/cdh4/one-click-install/redhat/5/x86_64/cloudera-cdh-4-0.x86_64.rpm"

[plugin rpminstaller]
setup_class = centos.RpmInstaller
rpm_to_install = cloudera-cdh-4-0.x86_64.rpm

[plugin repoinstaller]
setup_class = centos.RepoConfigurator
repo_to_install = "http://archive.cloudera.com/cdh4/redhat/5/x86_64/cdh/RPM-GPG-KEY-cloudera"

[plugin pkginstaller]
setup_class = centos.PackageInstaller
pkg_to_install = hadoop-0.20-mapreduce-jobtracker

This is not complete by any means – I intend to post the complete plugin + framework in a series of subsequent posts.