Large Scale Data Analysis

Matt Walker

Subscribe to Matt Walker: eMailAlertsEmail Alerts
Get Matt Walker: homepageHomepage mobileMobile rssRSS facebookFacebook twitterTwitter linkedinLinkedIn

Related Topics: Apache Web Server Journal, Java Developer Magazine, Big Data on Ulitzer

Apache Web Server: Article

Four Paths to Java Parallelism

Finding their 'sweet spots'

While some problems are clearly and naturally expressible as dataflow, others can be more difficult to adapt. Particularly difficult to deal with are operators that need all of the input before producing any output, with sorting and aggregating being prime examples of this. Not only does this disrupt any pipeline parallelism occurring in dependent operators, it can impact scalability if all the data ends up held in memory. These problems can usually be solved by clever thinking, taking advantage of additional knowledge of the graph (aggregating sorted data is easier) or dynamically extending the graph (to help manage memory usage). Fortunately, the pre-packaged library in Pervasive DataRush provides sorting and aggregating operators so you don't have to worry about this. Despite any required change in mindset, dataflow has proven to be a successful approach for quickly developing efficient, parallelized solutions for the analysis of large data sets (Falgout, 2008).

Terracotta: JVM Clustering
So far, we've constrained ourselves to running on a single multicore machine. However, we live in a world of inexpensive hardware and networking. If we're willing to accept more administrative complexity, we can make an attempt to gain horizontal scaling through a distributed solution. As your problems get larger, you can just throw more machines at it.

Terracotta is an open source solution for doing just that. It allows multiple JVMs, potentially on different machines, to cluster and behave as a single JVM. Not only does this provide more processing power, it also provides more memory. The best part - this is all transparent to the programmer. You can make a multi-threaded program into a clustered program without rewriting any code. Just specify which objects need to be shared across the cluster and things like change propagation and distributed locking are handled for you.

Unlike the other solutions we discussed, Terracotta does not provide (by itself) any abstractions that hide concurrency. You still have to worry about threads and locking when writing code. Plus, since it's easy to share objects, it's also easy to naively introduce cluster-wide hotspots that kill performance. But if you follow a simple pattern - the Master/Worker pattern - most of these issues can be avoided. Making it even easier, the Terracotta Framework Library already provides an implementation of this pattern.

The Master/Worker pattern uses relatively familiar concepts from multithreading. A master thread breaks a task into smaller units of work, which are then placed on a work queue. The worker threads consume the contents of the work queue, performing these subtasks, and return the results to the master. All you need to do is place workers on each JVM of the cluster and use Terracotta to share the work queue, and you now have a horizontally scalable solution, as illustrated in Figure 3.

Hadoop: Distributed MapReduce
The Terracotta approach to distributing a parallel application comes at the cost of writing your application using threads. Just as our programmer on a single SMP machine wasn't content with threads and the utilities of java.util.concurrent, and so traded up for the higher-level tools in the fork/join framework and Pervasive DataRush, a Terracotta programmer might look to MapReduce to harness a network of cheap, dedicated machines by programming at a higher level of abstraction.

The MapReduce interface is simple: provide an implementation of the map and reduce functions. Map takes a key and a value and produces a list of (key, value) pairs, potentially of a different type (Map :: (K,V) -> [(K', V')]). Reduce then takes a list of values all corresponding to the same key and produces a final list of values (Reduce :: (K', [V']) -> [V']). Behind the scenes, the framework spreads your data over multiple machines and orchestrates the distributed computation using the map and reduce you provided (Dean & Ghemawat, 2004).

Despite its simplicity, MapReduce is actually applicable to a wide range of problems. It was designed to index a copy of the Internet, building up both a model of its link structure and the word content of its pages. Large scale distributed sorts and searches obviously lend themselves to MapReduce, but recent research shows a broad class of machine-learning algorithms can also be recast to fit the model (Chu, et al., 2007).

One challenge is that Google keeps MapReduce behind closed doors. Fortunately, Apache provides Hadoop, an open source implementation of MapReduce built on top of the Hadoop Distributed File System (HDFS). Hadoop relies upon HDFS for much of its fault tolerance, particularly through replication. As we delve into the details of how Hadoop works, refer to Listing 5 for yet another implementation of grep.

Before you begin a Hadoop job, you must store its input in HDFS. At the start of the map phase, Hadoop logically partitions the data and allocates one map task, called a Mapper, per partition (there may be hundreds of these on a single machine). The map task invokes the user-defined map function once per (key, value) pair in its local portion of the data set. The output of the map task is sorted by key, then partitioned per reduce task and saved to disk.

In the reduce phase, each reduce task, called a Reducer, starts by fetching the sorted result files assigned to its partition during the map phase and merging them. The reduce task invokes the user-defined reduce function on each incoming key group, writing the results unsorted to disk. The number of reduce tasks is exposed as a tunable to the programmer; increasing the number increases overhead, but also better balances the load across the cluster (Hadoop, 2008).

What It All Means
In the end, it all boils down to standard good engineering practice: choose the right tool for the job, subject to the constraints of your problem. The tools we've discussed provide you with higher-level abstractions than threads, allowing suitable problems to be solved in far more intuitive, robust ways. They are effectively non-competing, each occupying its own niche of the problem space. And it's quite possible that a problem will require a combination approach. Both fork/join and Pervasive DataRush can be used to implement Hadoop tasks. It's also possible, in theory and with slight modification, to use both with Terracotta.

As you begin to parallelize your applications, bear in mind the requirements of each of the tools we've discussed. Fork/join requires you to specify your implementation using divide-and-conquer. The problem must fit into memory on a single, multicore machine. Pervasive DataRush requires the same hardware, but handles scaling your problem out of memory and onto disk more gracefully. This adds the requirement of an ample local disk. Rather than the divide-and-conquer of fork/join, Pervasive DataRush requires you to recast your implementation in dataflow.

If your application is already threaded, Terracotta doesn't require any rewriting. However, programmers must take on the burden of designing, implementing, and troubleshooting concurrent code. Terracotta really starts to shine when you give it multiple machines and the behavior of your code partitions readily across them. Hadoop requires the same or more machines and the infrastructure of a distributed file system, but lets you program to a much higher-level MapReduce interface. It also gives you fault tolerance, which is vital as you scale up the number of machines.

Your job as a software engineer is to distill the fundamental nature of your application and choose the tool whose "sweet spot" most closely aligns with it. The high-level overview we've provided here will give you a start in your research. You can find further help in the references below.


  • Chu, C.-T., Kim, S. K., Lin, Y.-A., Yu, Y., Bradski, G., Ng, A. Y., et al. (2007). Map-Reduce for Machine Learning on Multicore. NIPS (pp. 281 - 288). Cambridge: MIT Press.
  • Dean, J., & Ghemawat, S. (2004). MapReduce: Simplified Data Processing on Large Clusters. OSDI'04: Sixth Symposium on Operating System Design and Implementation. San Francisco.
  • Falgout, J. (2008, March 30). Crunching Big Data with Java. Retrieved from JDJ.
  • Goetz, B. (2007, November 13). Java theory and practice: Stick a fork in it, Part 1. Retrieved August 7, 2008, from IBM developerWorks.
  • Goetz, B. (2008, March 4). Java theory and practice: Stick a fork in it, Part 2. Retrieved August 7, 2008, from IBM developerWorks.
  • Hadoop. (2008, July 7). Hadoop Map-Reduce Tutorial. Retrieved August 8, 2008, from Hadoop.
  • Hadoop. (2007, February 13). Project Description. Retrieved August 8, 2008, from Hadoop Wiki.
  • Lea, D. (2000). A Java Fork/Join Framework. Java Grande, (pp. 36-43).
  • Lea, D. (n.d.). JSR 166y API. Retrieved August 7, 2008, from JSR 166y API.
  • Pervasive Software. (n.d.). Retrieved August 7, 2008, from Pervasive DataRush.
  • Sun. (2006). java.uti.concurrent Package API. Retrieved August 7, 2008, from Java 6 API.
  • Terracotta. (n.d.). Retrieved August 7, 2008, from Terracotta.
  • Terracotta. (2008, June 30). Master Worker. Retrieved August 7, 2008, from Terracotta Forge Labs.

More Stories By Matt Walker

Matt Walker is a scientist at Etsy, where he is building out their data analytics platform and researching techniques for search and advertising. Previously, he worked as a researcher at Adtuitive and as an engineer at Pervasive Software. He holds an MS in computer science from UT and received his BS in electrical and computer engineering from Rice University.

More Stories By Kevin Irwin

Kevin Irwin is a senior engineer at Pervasive Software, working on performance and concurrency within the Pervasive DataRush dataflow engine. With 15 years of industry experience, he previously worked at companies including Sun and IBM, developing high-performance, scalable enterprise software. Kevin holds an MCS and a BA in mathematics and computer science from Rice University.

Comments (1) View Comments

Share your thoughts on this story.

Add your comment
You must be signed in to add a comment. Sign-in | Register

In accordance with our Comment Policy, we encourage comments that are on topic, relevant and to-the-point. We will remove comments that include profanity, personal attacks, racial slurs, threats of violence, or other inappropriate material that violates our Terms and Conditions, and will block users who make repeated violations. We ask all readers to expect diversity of opinion and to treat one another with dignity and respect.

Most Recent Comments
oletizi 12/09/08 11:58:12 AM EST

While it’s true that familiarity with concurrent programming principles is needed to make full use of all of Terracotta’s developer-facing features, the extensive library of Terracotta Integration Modules (TIMs) for use with third-party technologies allows many people to make use of Terracotta *without* needing to know anything about concurrent programming.

This can be seen to great effect in the high-scale reference web application we built to show how Terracotta is used in a real-world scenario. When you look at the code to examinator, you’ll find very little concurrency-aware code. All of the concurrency is handled inside the various TIMs used by the application (e.g., Spring Webflow, Spring MVC, Spring Security, …).

You can see a full list of available TIMs here:

Like Terracotta itself, all of these TIMs are open source and free for use in production.