Large Scale Data Analysis

Matt Walker

Related Topics: Apache Web Server Journal, Java Developer Magazine, Big Data on Ulitzer

Four Paths to Java Parallelism

Finding their 'sweet spots'

Fork/Join: Divide-and-Conquer
If your problem fits into the heap of a single JVM on an SMP machine and can be decomposed using divide-and-conquer, then help is coming in Java 7's fork/join framework. How do you know whether your problem fits the divide-and-conquer mold? Such an algorithm solves a problem by splitting it into smaller subproblems, solving them, and then merging the results to form the solution to the original problem. The beauty of the paradigm is that the subproblems can be solved independently of one another, allowing the computer to work on them simultaneously. Figure 1 shows one way to apply divide-and-conquer to grepping for a pattern in a file of size lines, where size is the number of lines.
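
To make the split-solve-merge shape concrete, here is a minimal sequential sketch of the grep decomposition. It assumes the file has already been read into a String[] and treats a match as a plain substring match rather than a regular expression; the class and method names are hypothetical. Nothing runs in parallel yet. The point is that the two recursive calls share no state, which is exactly what makes them candidates for simultaneous execution.

```java
import java.util.ArrayList;
import java.util.List;

// Sequential divide-and-conquer grep (illustrative sketch, hypothetical names):
// split the line range in half, solve each half independently, then merge.
class DivideAndConquerGrep {
    // Returns the indices in [lo, hi) of lines containing the pattern as a substring.
    static List<Integer> grep(String[] lines, String pattern, int lo, int hi) {
        if (hi - lo <= 1) {
            // Base case: zero or one line, so test it directly.
            List<Integer> hits = new ArrayList<>();
            if (hi - lo == 1 && lines[lo].contains(pattern)) {
                hits.add(lo);
            }
            return hits;
        }
        int mid = (lo + hi) >>> 1;
        // The two halves do not depend on each other.
        List<Integer> left = grep(lines, pattern, lo, mid);
        List<Integer> right = grep(lines, pattern, mid, hi);
        // Merge: every left-half index precedes every right-half index,
        // so concatenation keeps the result in line order.
        left.addAll(right);
        return left;
    }

    public static void main(String[] args) {
        String[] lines = {"alpha", "beta", "alphabet", "gamma"};
        System.out.println(grep(lines, "alpha", 0, lines.length)); // [0, 2]
    }
}
```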

If the computation is large enough, the original problem may be split recursively into smaller and smaller pieces until solving a given subproblem sequentially is cheaper than subdividing it further and solving the pieces in parallel. For expensive computations on large data, you can imagine a divide-and-conquer implementation generating a large number of subproblems, far outstripping the parallelism of the underlying hardware. Luckily, the fork/join execution model is designed to handle exactly this situation efficiently (Goetz, 2007).
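
The sketch below adds the sequential cutoff described above, using a RecursiveTask<Integer> to count matching lines. The class name, the substring matching, and the THRESHOLD of 1,000 lines are illustrative assumptions; the cutoff is arbitrary, not tuned. For brevity it submits work to ForkJoinPool.commonPool(), which arrived in Java 8; under Java 7 you would create your own ForkJoinPool instead.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Counts lines containing a pattern. Ranges larger than THRESHOLD are split;
// smaller ones are scanned sequentially because splitting would cost more.
class CountMatchesTask extends RecursiveTask<Integer> {
    static final int THRESHOLD = 1_000; // hypothetical cutoff; tune empirically

    private final String[] lines;
    private final String pattern;
    private final int lo, hi;

    CountMatchesTask(String[] lines, String pattern, int lo, int hi) {
        this.lines = lines;
        this.pattern = pattern;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Integer compute() {
        if (hi - lo <= THRESHOLD) {
            int count = 0;
            for (int i = lo; i < hi; i++) {
                if (lines[i].contains(pattern)) {
                    count++;
                }
            }
            return count;
        }
        int mid = (lo + hi) >>> 1;
        CountMatchesTask left = new CountMatchesTask(lines, pattern, lo, mid);
        CountMatchesTask right = new CountMatchesTask(lines, pattern, mid, hi);
        left.fork();                       // schedule the left half asynchronously
        int rightCount = right.compute();  // solve the right half in this thread
        return left.join() + rightCount;   // wait for the left half, then merge
    }

    static int count(String[] lines, String pattern) {
        return ForkJoinPool.commonPool().invoke(new CountMatchesTask(lines, pattern, 0, lines.length));
    }

    public static void main(String[] args) {
        String[] lines = new String[10_000];
        for (int i = 0; i < lines.length; i++) {
            lines[i] = (i % 3 == 0) ? "error at line " + i : "ok";
        }
        System.out.println(count(lines, "error")); // 3334
    }
}
```

Forking one half and computing the other in the current thread, rather than forking both, keeps the current worker busy instead of leaving it idle while it waits.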

The heart of fork/join is the ForkJoinTask, a concurrency construct much lighter than a thread. Listing 1 shows the code to implement the grep computation as a RecursiveAction, a ForkJoinTask designed explicitly for divide-and-conquer problems. Because tasks cost very little, you're free to create a large number of them, assigning each a fine-grained unit of work (you're responsible for determining the exact amount). Each ForkJoinWorkerThread in the ForkJoinPool executes many tasks, and you typically size the pool to match the underlying hardware, such as the number of available cores.
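
Here is a hedged sketch of grep as a RecursiveAction in the spirit of the description above; it is not a reproduction of Listing 1. Because a RecursiveAction returns no result, each leaf records its verdicts in a shared boolean[]. Leaves cover disjoint index ranges, so no locking is needed. The pool is sized from Runtime.availableProcessors(); the class name and the leaf size of 512 are assumptions.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.regex.Pattern;

// Grep as a resultless fork/join task. Each leaf writes only its own slots of
// `matches`, so tasks never touch the same element.
class GrepAction extends RecursiveAction {
    static final int THRESHOLD = 512; // hypothetical leaf size

    private final String[] lines;
    private final Pattern pattern;   // Pattern is thread-safe; Matchers are per call
    private final boolean[] matches;
    private final int lo, hi;

    GrepAction(String[] lines, Pattern pattern, boolean[] matches, int lo, int hi) {
        this.lines = lines;
        this.pattern = pattern;
        this.matches = matches;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            for (int i = lo; i < hi; i++) {
                matches[i] = pattern.matcher(lines[i]).find(); // match anywhere in the line
            }
            return;
        }
        int mid = (lo + hi) >>> 1;
        // invokeAll forks the subtasks and returns once both have completed.
        invokeAll(new GrepAction(lines, pattern, matches, lo, mid),
                  new GrepAction(lines, pattern, matches, mid, hi));
    }

    static boolean[] grep(String[] lines, String regex) {
        boolean[] matches = new boolean[lines.length];
        // Size the pool to the hardware: one worker per available core.
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        try {
            pool.invoke(new GrepAction(lines, Pattern.compile(regex), matches, 0, lines.length));
        } finally {
            pool.shutdown();
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(grep(new String[]{"foo", "bar", "food"}, "fo+"))); // [true, false, true]
    }
}
```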

The fork/join execution model hinges on restrictions on tasks that are largely implicit. First, tasks should not perform blocking synchronization: if every task currently being executed by the pool were to block simultaneously, your application would deadlock, a very real danger on a machine with only a handful of cores. Second, tasks should not depend on one another; they should be coordinated only through fork and join. Other inter-task communication can lead to performance degradation or, as in the first case, deadlock. Finally, fork/join tasks are ill-suited to performing I/O, and the framework offers no tools to support that use case. These restrictions would be impractical to enforce rigidly, but the API hints at them by refusing to let a task throw checked exceptions (Lea, JSR 166y API).

That's not all the fork/join framework has to offer. If your problem involves sorting data by one or more key fields, searching the data for some or all records that meet your criteria, or summarizing the data with aggregations such as an average, you may be able to code at a significantly higher level using ParallelArray. Compare the brevity of the ParallelArray implementation of grep in Listing 2 with our previous implementation. ParallelArray is built on top of the task model discussed earlier and is designed for a narrower class of problems. For sorting, searching, and summarizing in-memory arrays, it provides a declarative, SQL-like interface for manipulating your data, somewhat like a poor man's LINQ (Goetz, 2008).
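
ParallelArray did not ultimately ship with Java 7; it remained in the separate extra166y package. As a stand-in, this sketch expresses the same declarative grep with Java 8 parallel streams, which also execute on a fork/join pool by default. It illustrates the style only and is not the ParallelArray API; the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Declarative grep with a parallel stream (a stand-in for ParallelArray, not its API).
class StreamGrep {
    static List<String> grep(String[] lines, String regex) {
        Pattern p = Pattern.compile(regex);
        return Arrays.stream(lines)
                .parallel()                              // split across the fork/join pool
                .filter(line -> p.matcher(line).find())  // keep lines containing a match
                .collect(Collectors.toList());           // encounter order is preserved
    }

    public static void main(String[] args) {
        System.out.println(grep(new String[]{"foo", "bar", "food"}, "fo+")); // [foo, food]
    }
}
```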

The fork/join package is replete with operation interfaces designed to be passed to the various methods of ParallelArray (Lea, JSR 166y API). By defining a predicate operation, you can filter your data down to the subset relevant to your computation (see withFilter). With a procedure operation, you can perform an arbitrary action on each record (see apply). The various typed mapping operations let you transform your data (see withMapping). Replacements can be performed with the built-in sort and cumulate methods, or with the more general replaceWith* family of methods. Likewise, you can call the general-purpose reduce method directly with a reducer operation, or use summary functions such as min and max. You can even compute several of these aggregations at once using the summary method (Goetz, 2008; Lea, JSR 166y API).
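
Java 8 streams again serve as a stand-in, since their operations line up roughly with the ParallelArray methods named above: filter with withFilter, mapToInt with withMapping, reduce with reduce, Arrays.parallelSort with sort, and summaryStatistics with summary. The correspondence is approximate, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;

// Filter, map, reduce, sort, and multi-aggregate summaries with Java 8 APIs
// (an analogy to ParallelArray's operations, not that API itself).
class StreamSummary {
    // Predicate + mapping + several aggregates computed in one pass.
    static IntSummaryStatistics lengthsOfMatches(String[] lines, String needle) {
        return Arrays.stream(lines)
                .parallel()
                .filter(line -> line.contains(needle))  // cf. withFilter
                .mapToInt(String::length)               // cf. withMapping
                .summaryStatistics();                   // count, sum, min, max, average
    }

    // General-purpose reduction with an associative operator (cf. reduce).
    static int totalLength(String[] lines) {
        return Arrays.stream(lines).parallel().mapToInt(String::length).reduce(0, Integer::sum);
    }

    // Parallel sort on a copy, leaving the input untouched (cf. sort).
    static String[] sortedCopy(String[] lines) {
        String[] copy = lines.clone();
        Arrays.parallelSort(copy);
        return copy;
    }

    public static void main(String[] args) {
        String[] lines = {"error: disk", "ok", "error: net", "warn"};
        IntSummaryStatistics stats = lengthsOfMatches(lines, "error");
        System.out.println(stats.getMin() + " " + stats.getMax() + " " + stats.getAverage()); // 10 11 10.5
        System.out.println(totalLength(lines)); // 27
        System.out.println(Arrays.toString(sortedCopy(lines))); // [error: disk, error: net, ok, warn]
    }
}
```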

The utilities in the upcoming Java 7 fork/join package provide powerful and succinct mechanisms for fine-grained parallelism. As discussed earlier, they are tuned for in-memory problems: explicitly in the case of ParallelArray and implicitly in that of ForkJoinTask. What they don't provide are facilities to help you when your problem outgrows the heap. What can you do when you find yourself in this situation?

Pervasive DataRush: Dataflow Programming
When processing large data sets, you'll notice that the processing often follows a series of steps: read the data, perform A on it, then perform both B and C on that result, and so on. You can express this processing as a graph of operations connected by the flow of data. This is the essence of dataflow programming: execution as the streaming of data through a graph. Because the data is streamed, only the data required by the currently active operations needs to be in memory at any given time, which allows very large data sets to be processed.
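
A minimal sketch of the streaming idea, built from plain threads and one bounded BlockingQueue rather than any DataRush API. Operator A upper-cases records and hands them to operator B, which keeps those starting with "E". The bounded queue caps how many records sit between the two operators at once. For brevity the source is an in-memory list, whereas a real pipeline would read from a file; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Two-operator dataflow pipeline (hypothetical illustration, not DataRush).
class TinyPipeline {
    // End-of-stream marker, compared by identity so no real record can collide with it.
    private static final String EOS = new String("<end-of-stream>");

    static List<String> run(List<String> input) {
        // At most 16 records are buffered between the operators, however large the input.
        BlockingQueue<String> link = new ArrayBlockingQueue<>(16);
        List<String> output = new ArrayList<>();

        // Operator A: transform each record and stream it downstream immediately.
        Thread a = new Thread(() -> {
            try {
                for (String s : input) {
                    link.put(s.toUpperCase());
                }
                link.put(EOS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Operator B: runs concurrently with A, a few records behind it.
        Thread b = new Thread(() -> {
            try {
                for (String s = link.take(); s != EOS; s = link.take()) {
                    if (s.startsWith("E")) {
                        output.add(s);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join(); // join() makes B's writes to output visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("error: disk", "ok", "error: net"))); // [ERROR: DISK, ERROR: NET]
    }
}
```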

Besides offering the potential to scale to problems larger than the heap would otherwise permit, dataflow graphs are a useful model for parallelism because they can express, and therefore exploit, multiple forms of it. By its very nature, a dataflow graph exhibits pipeline parallelism: if each operator generates output incrementally, dependent operators can execute simultaneously, just a few steps behind. Also, if an operator's results are independent for each piece of data, the operator can be replaced with multiple copies, each receiving a portion of the original input. This is called horizontal partitioning. Finally, the output of an operator might undergo several separate sets of processing whose results are later merged as input to another operator (this is most common with record data). The different branches can execute in parallel; this is vertical parallelism.
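
Horizontal partitioning can be sketched with an ExecutorService: the input is dealt round-robin into one partition per copy of a record-independent operator, the copies run concurrently, and their outputs are concatenated. This is a generic illustration under stated assumptions (a pure Function as the operator, an in-memory list as input), not how DataRush partitions data; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Replaces one record-independent operator with `copies` instances,
// each processing only its own slice of the input.
class HorizontalPartitioning {
    static <I, O> List<O> runPartitioned(List<I> input, Function<I, O> op, int copies) {
        // Deal records round-robin, one partition per operator copy.
        List<List<I>> partitions = new ArrayList<>();
        for (int p = 0; p < copies; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % copies).add(input.get(i));
        }

        ExecutorService pool = Executors.newFixedThreadPool(copies);
        try {
            List<Future<List<O>>> results = new ArrayList<>();
            for (List<I> partition : partitions) {
                // Each copy sees only its own partition, so the copies share no state.
                Callable<List<O>> copy = () -> {
                    List<O> out = new ArrayList<>();
                    for (I record : partition) {
                        out.add(op.apply(record));
                    }
                    return out;
                };
                results.add(pool.submit(copy));
            }
            // Merge: outputs come back grouped by partition, not in original order.
            List<O> merged = new ArrayList<>();
            for (Future<List<O>> f : results) {
                merged.addAll(f.get());
            }
            return merged;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPartitioned(List.of(1, 2, 3, 4, 5, 6), x -> x * 10, 2)); // [10, 30, 50, 20, 40, 60]
    }
}
```

Round-robin dealing balances partitions well when records cost about the same to process; if downstream operators need the original order, a real system would have to restore it after the merge.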

Pervasive DataRush is a library and dataflow engine that lets you construct and execute dataflow graphs in Java. You develop new operators simply by extending a base class and implementing a few methods; Listing 3 shows how an operator might look. Because operators share data only through their inputs and outputs, the framework handles all threading and synchronization, leaving you free to focus on the operator's processing logic. In addition to the dataflow engine, Pervasive DataRush includes a library of common operators.

Assembling a dataflow graph in Pervasive DataRush is just as straightforward. You compose a graph by adding operators to it. Because an operator requires its input sources in order to be constructed, outputs are wired to inputs as you build the graph. Once you have finished composing, just invoke the run method and the graph begins execution. Because this is all done in Java, composition can be made conditional on pre-execution processing; the most common example is adjusting the graph to the number of available processors. You can take this idea even further: since operators are also written in Java, they can compose graphs of their own, extending a currently executing graph. Figure 2 shows a sample dataflow graph, and Listing 4 shows the Pervasive DataRush code used to construct it. It is also possible to construct a graph that is used like an operator (in fact, most of the operator library is built this way); such a graph is constructed in the same way.
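
To illustrate the shape of this programming model, here is a toy dataflow framework in plain Java. It is not the Pervasive DataRush API: Op, MiniGraph, and the operator classes are all hypothetical, and the framework supports only linear chains of String records. Operator authors implement a single process method while the base class owns the thread and queues; each operator takes its upstream in its constructor, so wiring happens during composition; and run starts one thread per operator. The optional UpperOp stage shows conditional composition on a simple flag, whereas a real graph might instead branch on Runtime.getRuntime().availableProcessors().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical mini dataflow framework for linear chains of String records.
// NOT the Pervasive DataRush API: every class and method name here is made up.
abstract class Op implements Runnable {
    // End-of-stream marker, compared by identity so it cannot collide with data.
    static final String EOS = new String("<eos>");
    final BlockingQueue<String> out = new ArrayBlockingQueue<>(64);
    private final Op upstream; // null for a source

    // Wiring happens at construction: an operator cannot exist without its input.
    Op(Op upstream) { this.upstream = upstream; }

    // The only method an operator author writes. A source is called once
    // with a null record and emits its whole stream.
    protected abstract void process(String record, Consumer<String> emit);

    // Framework-owned loop: all threading and queue hand-off live here.
    @Override
    public final void run() {
        Consumer<String> emit = this::put;
        if (upstream == null) {
            process(null, emit);
        } else {
            for (String r = take(upstream.out); r != EOS; r = take(upstream.out)) {
                process(r, emit);
            }
        }
        put(EOS);
    }

    private void put(String s) {
        try { out.put(s); } catch (InterruptedException e) { throw new IllegalStateException(e); }
    }

    private static String take(BlockingQueue<String> q) {
        try { return q.take(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
    }
}

class ListSourceOp extends Op {
    private final List<String> data;
    ListSourceOp(List<String> data) { super(null); this.data = data; }
    @Override protected void process(String ignored, Consumer<String> emit) { data.forEach(emit); }
}

class FilterOp extends Op {
    private final String needle;
    FilterOp(Op upstream, String needle) { super(upstream); this.needle = needle; }
    @Override protected void process(String r, Consumer<String> emit) {
        if (r.contains(needle)) emit.accept(r);
    }
}

class UpperOp extends Op {
    UpperOp(Op upstream) { super(upstream); }
    @Override protected void process(String r, Consumer<String> emit) { emit.accept(r.toUpperCase()); }
}

// Terminal operator: collects records and emits nothing downstream.
class CollectOp extends Op {
    final List<String> results = new ArrayList<>();
    CollectOp(Op upstream) { super(upstream); }
    @Override protected void process(String r, Consumer<String> emit) { results.add(r); }
}

class MiniGraph {
    private final List<Op> ops = new ArrayList<>();

    <T extends Op> T add(T op) { ops.add(op); return op; }

    // One thread per operator; join() also publishes each operator's writes to the caller.
    void run() {
        List<Thread> threads = new ArrayList<>();
        for (Op op : ops) { Thread t = new Thread(op); threads.add(t); t.start(); }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    static List<String> errorReport(List<String> lines, boolean shout) {
        MiniGraph g = new MiniGraph();
        Op source = g.add(new ListSourceOp(lines));
        Op errors = g.add(new FilterOp(source, "error"));
        // Composition is ordinary Java, so it can branch on pre-execution checks.
        Op last = shout ? g.add(new UpperOp(errors)) : errors;
        CollectOp sink = g.add(new CollectOp(last));
        g.run();
        return sink.results;
    }

    public static void main(String[] args) {
        System.out.println(errorReport(List.of("error a", "ok", "error b"), true)); // [ERROR A, ERROR B]
    }
}
```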

About Matt Walker

Matt Walker is a scientist at Etsy, where he is building out their data analytics platform and researching techniques for search and advertising. Previously, he worked as a researcher at Adtuitive and as an engineer at Pervasive Software. He holds an MS in computer science from UT and received his BS in electrical and computer engineering from Rice University.

About Kevin Irwin

Kevin Irwin is a senior engineer at Pervasive Software, working on performance and concurrency within the Pervasive DataRush dataflow engine. With 15 years of industry experience, he previously worked at companies including Sun and IBM, developing high-performance, scalable enterprise software. Kevin holds an MCS and a BA in mathematics and computer science from Rice University.
