Big Computing: Big Data Analysis can require more than just Hadoop

Monday, August 15, 2011

Big Data Analysis can require more than just Hadoop

Bryan Lewis gave a talk last week at the New York CTO Breakfast on Big Data Analytics with a focus on Algorithms, Storage and Communication. I have attached the handout of that talk here for your enjoyment.

Bryan W. Lewis

You’ve got (a lot of) data. What would you like to do with it? Summarize it? Make predictions using it? Can your desired analysis approach work on partitioned data? If not, then what? Is latency important? Are data updated slowly or quickly? What about pragmatic issues like hardware failure?

There has never been a better time to ask such questions than today. I am awed by the capability and variety of possible solutions.

Big data analysis traditionally involved big, vertical software solutions. That is changing. We will discuss some new ideas for analysis of large data, highlighting ideas that are less well-known than they should be. Our discussion is organized across three broad topics: Algorithms, Storage, and Communication.

Algorithms


Consider the well-studied business problem of constructing recommendation systems. The data generally consist of customer attributes, customer purchase histories, customer/product ratings, and other product attributes. They can often amount to many billions of data points, often arranged as very sparse numeric arrays (not every customer buys every item, for example).

The business problem is to use those data to recommend products that customers are likely to like and buy.

Unsupervised machine learning (ML) methods are popular for building recommendation systems, and ML is a very hot topic these days. Clustering methods are unsupervised ML methods that group customer preferences by identifying clusters of nearby points in plots of the data in appropriate scales. You may have heard of the widely used k-means clustering method, or of k-nearest neighbor methods, which similarly rely on distances between points.
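To make the idea of clustering concrete, here is a minimal sketch of k-means (Lloyd's algorithm) in plain Python. The toy ratings and the choice to seed the centroids with the first k points are assumptions made for illustration only; production implementations choose starting points more carefully.

```python
import math

def kmeans(points, k, iterations=20):
    """Lloyd's algorithm: alternate between assigning points to the nearest
    centroid and moving each centroid to the mean of its assigned points."""
    # Assumption for this sketch: seed the centroids with the first k points.
    centroids = [tuple(p) for p in points[:k]]
    labels = [0] * len(points)
    for _ in range(iterations):
        # Assignment step: each point goes to its closest centroid.
        labels = [
            min(range(k), key=lambda j: math.dist(p, centroids[j]))
            for p in points
        ]
        # Update step: each centroid moves to the mean of its members.
        new_centroids = []
        for j in range(k):
            members = [p for p, lab in zip(points, labels) if lab == j]
            if members:
                new_centroids.append(
                    tuple(sum(c) / len(members) for c in zip(*members))
                )
            else:
                new_centroids.append(centroids[j])  # empty cluster: keep old centroid
        if new_centroids == centroids:
            break  # assignments can no longer change
        centroids = new_centroids
    return centroids, labels

# Hypothetical toy data: (rating of product A, rating of product B) per customer.
ratings = [(1.0, 1.2), (4.8, 5.0), (0.8, 1.0), (5.0, 4.6), (1.1, 0.9), (4.9, 4.9)]
centroids, labels = kmeans(ratings, k=2)
print(labels)
```

Note that every pass compares every point with every centroid, so the work per iteration grows with the number of points, the number of clusters, and the number of attributes.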

Clustering algorithms tend to exhibit greatly increased computational cost with increasing data size. That can become a serious issue for some product recommendation problems. Even problems with relatively modest storage requirements by today's standards can result in prohibitively expensive computation.

Clustering algorithms can also get confused by some data, reducing the quality of their solutions (recommendations, in this case). Researchers are interested in improving the quality of recommendations without over-specializing models to specific data sets.

Projection methods represent another class of unsupervised ML algorithms. They typically reduce or compress the data by eliminating redundancy. The singular value decomposition (SVD) is one of the most important of these methods.

Alex Lin[1] has recently given a number of nice talks for the NYC Predictive Analytics group advocating combinations of SVD and clustering for recommendation systems. The general idea is to compress the data with the SVD and then cluster it. The clustering algorithms require less computational effort on the reduced-sized problems and, as Alex has shown, can exhibit better quality recommendations because the SVD has "cleansed" the data of confusing redundancies.

(Combinations of ML methods are often quite successful, and have been essential to the solution of very challenging problems like the Netflix prize.)

But the SVD is computationally expensive too (although not as much as traditional clustering). Fortunately there are some relatively recent state-of-the-art methods for the efficient computation of the SVD for these kinds of problems. My favorite is the IRLBA algorithm by Jim Baglama[2]. The method is presently available in MATLAB and R. It works well with sparse and dense data, it is straightforward to parallelize, and it effectively exploits the high-performance linear algebra kernels of modern CPUs and GPUs. It's pretty amazing: I was able to compute a basic partial SVD of the Netflix prize data set in a few minutes on a laptop[3] with it.
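The sketch below is not IRLBA. It is plain power iteration, the simplest method that shares one key property with Lanczos-type methods: it touches the matrix only through products with A and its transpose, so a sparse matrix never has to be stored densely. The sparse-row representation (one dict per row) and the toy ratings are assumptions made for illustration.

```python
import math

def matvec(rows, v):
    """Compute A @ v, where each sparse row is a dict {column: value}."""
    return [sum(val * v[j] for j, val in row.items()) for row in rows]

def rmatvec(rows, u, n_cols):
    """Compute A.T @ u without ever forming the transpose."""
    out = [0.0] * n_cols
    for ui, row in zip(u, rows):
        for j, val in row.items():
            out[j] += val * ui
    return out

def norm(x):
    return math.sqrt(sum(t * t for t in x))

def leading_singular_triplet(rows, n_cols, iterations=100):
    """Power iteration on A.T @ A; returns (sigma, u, v) for the largest
    singular value. Assumes the start vector is not orthogonal to v."""
    v = [1.0 / math.sqrt(n_cols)] * n_cols  # deterministic starting vector
    sigma, u = 0.0, []
    for _ in range(iterations):
        u = matvec(rows, v)
        sigma = norm(u)                # ||A v|| converges to the top singular value
        u = [t / sigma for t in u]
        v = rmatvec(rows, u, n_cols)
        nv = norm(v)
        v = [t / nv for t in v]
    return sigma, u, v

# Hypothetical sparse ratings: 3 customers x 3 products, zeros are not stored.
rows = [{0: 5.0, 1: 4.0}, {0: 4.0, 1: 5.0}, {2: 3.0}]
sigma, u, v = leading_singular_triplet(rows, n_cols=3)
print(round(sigma, 6))
```

For this toy matrix the singular values are 9, 3, and 1, so the iteration settles on 9. A partial SVD method computes several leading triplets at once and converges much faster, but the cost per step is still dominated by sparse matrix-vector products like these.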

Why do we care about a choice of algorithm like IRLBA? Let's compare algorithm costs using the Netflix Prize data:

              Memory Cost    Computation Cost
Naive SVD     64 GB          5 TFlop
IRLBA         < 1 GB         < 5 GFlop

Of course, we shouldn't (couldn't in some cases) use a naive SVD approach to solve this. IRLBA compares favorably (often about an order of magnitude better) to other methods too, such as the one used in the Apache Mahout ML library[4] associated with the Hadoop project.

A different class of difficult clustering problems involves unbounded sets of data. The Java-based Massive Online Analysis[5] project, related to the Weka data mining project, might be more appropriate in such cases. It implements a number of state-of-the-art updating clustering algorithms. The idea is to compute clusters on a subset of data, storing a reduced set of statistics about the clusters, and then update the stats as new data arrives.
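The updating idea can be sketched in a few lines. The class below is not taken from Massive Online Analysis; it is a simple sequential k-means, and its name, its starting centroids, and the choice to count each starting centroid as one observation are all assumptions for illustration. The only state kept per cluster is a count and a running mean.

```python
import math

class StreamingClusters:
    """Keeps only a count and a running mean per cluster, never the raw data."""

    def __init__(self, initial_centroids):
        self.centroids = [list(c) for c in initial_centroids]
        # Assumption: treat each starting centroid as one observed point.
        self.counts = [1] * len(self.centroids)

    def update(self, point):
        """Assign a new point to its nearest cluster and update that cluster's mean."""
        j = min(
            range(len(self.centroids)),
            key=lambda i: math.dist(point, self.centroids[i]),
        )
        self.counts[j] += 1
        n = self.counts[j]
        # Incremental mean: new_mean = old_mean + (x - old_mean) / n
        self.centroids[j] = [c + (x - c) / n for c, x in zip(self.centroids[j], point)]
        return j

clusters = StreamingClusters([(0.0, 0.0), (10.0, 10.0)])
for point in [(2.0, 0.0), (10.0, 12.0), (1.0, 3.0)]:  # hypothetical arriving data
    clusters.update(point)
print(clusters.centroids, clusters.counts)
```

Each arriving point costs one distance computation per cluster and a constant amount of memory, no matter how much data has already been seen.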

Functions of Graphs

Let's think briefly about a different interesting example: evaluating functions of graphs. Such problems crop up, for example, in social networks, protein folding, communications, and epidemiology.  Some of the basic quantities we might compute on graphs include the number of other nodes connected to each node, how long the network path is between each pair of nodes, measures of graph centrality, and many others. As anyone working at LinkedIn will attest, one can spend a considerable amount of time computing these quantities for large graphs.
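For reference, the exact versions of two of these quantities are easy to write down. The following sketch uses a hypothetical four-node undirected graph stored as an adjacency dict and computes node degrees and breadth-first path lengths. It is the straightforward baseline, not one of the fast estimation methods.

```python
from collections import deque

def degrees(graph):
    """Number of neighbors of each node."""
    return {node: len(neighbors) for node, neighbors in graph.items()}

def path_lengths_from(graph, source):
    """Breadth-first search: hop counts from source to every reachable node."""
    dist = {source: 0}
    frontier = deque([source])
    while frontier:
        node = frontier.popleft()
        for neighbor in graph[node]:
            if neighbor not in dist:
                dist[neighbor] = dist[node] + 1
                frontier.append(neighbor)
    return dist

# Hypothetical undirected graph with edges a-b, b-c, b-d, c-d.
graph = {
    "a": {"b"},
    "b": {"a", "c", "d"},
    "c": {"b", "d"},
    "d": {"b", "c"},
}
print(degrees(graph))
print(path_lengths_from(graph, "a"))
```

Getting the path length between every pair of nodes this way means one full search per node, which is where the time goes on large graphs.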

Michele Benzi and Paula Boito[6] have recently come up with new methods to very efficiently estimate some of these basic quantities. They've taken ideas related to those used for efficient computation of the SVD and applied them, in a different context and in a clever way, to graph problems. The results are remarkable: they estimate quantities within known bounds with several orders of magnitude lower computational effort than more typically used algorithms. Their methods are easily parallelized and work well on sparse data. I expect that these results will be rapidly adopted and expanded to the computation of more complicated functions of graphs.

Storage


The storage of data in ways that promote efficient data analysis is an incredibly active and rapidly developing topic. It covers a lot of ground: from traditional data warehousing and relational databases to new distributed file and NoSQL systems. Let’s start with an example application.

Clinical Looking Glass (CLG)[7] from Montefiore Medical Center is a web-based health care outcomes analysis and decision support application. It provides a wide variety of actuarial and predictive models that work on large scale data (up to millions of patient records over many years). Data resides in traditional relational database systems and can be sliced and diced by queries to support longitudinal or cross-sectional analyses. CLG presents a simple, graphical interface to users' web browsers. Concrete examples of the use of CLG include retrospectively reproducing published trials on large real world populations and computing survival models on large populations.

The business challenges for Montefiore include adapting state of the art methodologies to the web-based application framework in a way that can scale easily to handle large problems. Users expect reasonably fast response times, which is a critical design consideration. The system must support the best algorithms implemented in a variety of languages. The analysis functions must be connected to results of the complex queries against the DBMS generated by the web application. Many methods are computationally intensive and require parallel computing implementations to satisfy the fast response design requirement. Intermediate results of computations must occasionally be shared among parallel workers in pipelined computations.  We desire stateful interaction between the web client and analysis engines to allow for user-steered interactive data exploration.

Using a big central DBMS alone was not easily horizontally scalable. We implemented a hybrid solution that supplements existing relational data storage with in-memory storage provided by the Redis NoSQL database, one of my favorites.

Redis is one of a wave of popular new NoSQL databases that includes Voldemort, Riak, SimpleDB, Cassandra, HBase, Memcached, CouchDB, LevelDB, MongoDB, and many others. Redis is a networked key-value store that supports really interesting structured values. Stored values can be sets and queues. Redis also provides a publish/subscribe model, uniquely placing it in a convergence of databases and messaging. We chose to use Redis in CLG for its flexibility: we use it both as an in-memory networked data cache and to coordinate parallel computation through work queues. Redis is also simple to use and configure, and works well across OS platforms (both important to us).

Our hybrid DBMS/Redis implementation decouples parallel worker functions from the main DBMS system. The parallel computation is organized in queues, also provided by Redis. I believe that queues are the best way to organize parallel computing tasks. It’s an old but good idea: anonymous worker functions block for work from a generic queue. Master functions place tasks in the generic queue, and workers withdraw them on a first come, first served basis. The master then looks for results in a unique result queue. Workers place results in the appropriate result queue and block for more work, and so on.
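The queue pattern just described can be sketched on a single machine with Python's standard library. Here `queue.Queue` and threads stand in for the Redis-backed queues and remote workers; Redis lists with blocking pops can play the same role across machines. The function names, the job identifier, and the squaring "analysis" are placeholders, not part of CLG.

```python
import queue
import threading

def worker(task_queue, result_queues):
    """Anonymous worker: block for work, compute, post the result, repeat."""
    while True:
        task = task_queue.get()      # blocks until a task is available
        if task is None:             # sentinel value: shut down
            break
        job_id, task_id, x = task
        # Placeholder analysis: square the input.
        result_queues[job_id].put((task_id, x * x))

def run_job(job_id, inputs, task_queue, result_queues):
    """Master: enqueue tasks, then collect results from this job's own queue."""
    result_queues[job_id] = queue.Queue()   # unique result queue for this job
    for task_id, x in enumerate(inputs):
        task_queue.put((job_id, task_id, x))
    results = [None] * len(inputs)
    for _ in inputs:
        task_id, value = result_queues[job_id].get()
        results[task_id] = value            # results may arrive in any order
    return results

task_queue = queue.Queue()    # the generic work queue shared by all workers
result_queues = {}
workers = [
    threading.Thread(target=worker, args=(task_queue, result_queues))
    for _ in range(3)
]
for w in workers:
    w.start()

squares = run_job("job-1", [1, 2, 3, 4, 5], task_queue, result_queues)

for _ in workers:
    task_queue.put(None)      # one shutdown sentinel per worker
for w in workers:
    w.join()
print(squares)
```

Workers never know who submitted a task or how many other workers exist, so more can be started at any time without changing the master.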

This simple approach to organizing parallel computation has a few important features. It’s naturally load balanced and it is elastic by design since workers may be added at any time. Redis additionally makes it trivial to set up fault tolerance so that failed tasks get re-queued. Thus, workers may also be removed at any time. Cross-platform online examples (soon including ready to use EC2 AMIs) may be found here: Compare the simplicity of this approach with the relative complexity of Zookeeper[8], for example.

This implementation has helped improve CLG performance by more than 10x for some data- and computationally intensive algorithms. And, the parallel back-end analysis functions can be run across clusters not directly connected to the main data warehouse.

Clouds of Opportunity

Quantitative financial methods sometimes involve computationally and data-intensive back-testing of models over historic data sets. Back-testing is also an embarrassingly parallel (easy to implement in parallel) problem. We worked with a quant desk at a mid-town hedge fund to help them implement a parallel back-testing strategy to run on a Linux cluster in their data center. Using the ideas described above, they were able to add the significant, and otherwise unused, horsepower from their desktop PCs overnight by signing them up to work queues when they went home. With fault-tolerant job resubmission, the quants simply terminated any running jobs in the morning. In the old days, we called this “resource scavenging.” Redis makes this kind of thing really trivial to implement.

Communication


Mike Kane[9] is working on models for anticipation of trading up/down limits (circuit breakers) under the new SEC rules for financial markets. The models are trained on huge volumes of historic daily stock trade and quote data using Amazon EC2, R and Redis[10]. Once developed, the idea is to apply the model to real time trade and quote data to anticipate an up/down limit and possible associated trading opportunities. Most data processing and parallel computing models can’t respond fast enough to handle trade and quote data in real time. The United States consolidated market exchanges presently average over 200,000 quotes and 28,000 trades per second, with peaks of almost 50,000 trades per second[11]. The latency required to keep up with that stream of market data is on the order of 3 ms.

Complex event processing (CEP) is a database turned inside out. Traditional databases run queries against stored data. CEP engines stream data through stored queries. CEP queries can filter and aggregate results in ways similar to traditional database operations. And they are designed to do this quickly with extremely low latency. Mike is working with Sybase/Aleri to adapt sophisticated statistical models from R into the Aleri CEP engine. A single Aleri CEP engine can process 300,000 events/second with 1.5 ms latency[12] on garden variety Intel CPUs.
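The "stored query, streaming data" inversion can be illustrated with a Python generator. This is only a conceptual sketch and has nothing to do with the Aleri or Esper APIs; the event fields (`symbol`, `price`) and the moving-average query are assumptions chosen for the example.

```python
from collections import deque

def moving_average_query(events, symbol, window):
    """A 'stored query': filter the stream by symbol and, for each matching
    trade, emit the mean price of the last `window` matching trades."""
    recent = deque(maxlen=window)   # old prices fall out automatically
    for event in events:
        if event["symbol"] != symbol:
            continue                # filter step
        recent.append(event["price"])
        yield sum(recent) / len(recent)   # aggregate step

# Hypothetical trade stream; in practice this would be an unbounded feed.
trades = [
    {"symbol": "ABC", "price": 10.0},
    {"symbol": "XYZ", "price": 50.0},
    {"symbol": "ABC", "price": 12.0},
    {"symbol": "ABC", "price": 14.0},
]
averages = list(moving_average_query(iter(trades), "ABC", window=2))
print(averages)
```

The query holds only a small window of state and produces an answer as each event passes through, rather than waiting for the data to be stored and then queried.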

If we want to embed analytics from languages like R in a CEP engine like Aleri, we only have a few approaches that can keep up with high data rates. One commonly used approach is shared memory--that is linking via compiled code and/or using POSIX or similar shared memory resources across programs. This is the approach that Mike and I took to integrate R with the Esper CEP engine, for example[13].

The shared memory approach is viable, but potentially difficult to implement. It lacks the conceptual appeal and simplicity of the queueing approaches described above. One must beware of thread safety issues, deadlocks, and other potential problems.

An alternate approach uses the 0MQ (Zero-MQ) library. 0MQ is a conceptual extension of sockets: sockets on steroids. In general, it does what we always wished sockets could do, in a way that is quite simple to use. Endpoints can come and go dynamically; multiple endpoints may be assigned to one “socket”; messages are generally content-oriented (instead of byte-oriented); “sockets” are uniformly defined over multiple fabrics (IPC, networks, etc.); pub/sub, queue, and peer-to-peer communication patterns are supported; and on and on. 0MQ grew out of, but eventually separated totally from, AMQP developed by J. P. Morgan.

0MQ is serverless: every node is a peer. That makes it naturally horizontally scalable. It’s uniquely positioned (among messaging libraries) as a communication framework for parallel/distributed computation. It provides low-latency (as low as microseconds, depending on network fabric), high-throughput (as many as tens of millions of messages/sec) communication, while also stressing the CPU less than traditional TCP/IP and some other protocols. Idiomatic 0MQ libraries are available for many languages (I am working on an R implementation presently).

Websockets and Summary

The landscape for analysis of large data today is very diverse. Many more languages and technologies are likely to be involved in your project today than just a few years ago.

Consider web-based applications. We have integrated R and other analysis languages in big vertical web services stacks like IIS/.NET, but integration can be awkward: one must marshal C# data to unmanaged C++ and then to R for analysis and then reverse the whole process to return results. The stateless/RESTful design of many web applications, often advantageous, in our case precludes stateful client/back-end interaction.

The new Websocket API provides a beautiful solution that can directly couple JavaScript clients to back-end analytic services in a very interactive way. JSON is the natural serialization format between client and analysis service for small data. This lightweight coupling of analytics service and web client totally avoids web services middleware, in our case making everything much simpler.
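What stateful, JSON-based interaction might look like on the service side is sketched below. The websocket transport itself is left out; the class only shows a per-connection object that keeps data between messages. The message format (`op`, `values`) and the class name are invented for this example.

```python
import json
import statistics

class AnalysisSession:
    """State that lives for the duration of one client connection."""

    def __init__(self):
        self.values = []

    def handle(self, message):
        """Take one JSON text message from the client, return one JSON reply."""
        request = json.loads(message)
        op = request.get("op")
        if op == "append":
            # The data stays on the server between messages.
            self.values.extend(request["values"])
            reply = {"ok": True, "n": len(self.values)}
        elif op == "summary" and self.values:
            reply = {
                "ok": True,
                "n": len(self.values),
                "mean": statistics.fmean(self.values),
            }
        else:
            reply = {"ok": False, "error": "unknown op or no data"}
        return json.dumps(reply)

session = AnalysisSession()
session.handle('{"op": "append", "values": [1, 2, 3]}')
session.handle('{"op": "append", "values": [6]}')
summary = json.loads(session.handle('{"op": "summary"}'))
print(summary)
```

Because the session object persists across messages, the client can send data once and then steer the analysis with small follow-up requests, which a stateless request/response design would not allow without resending or re-querying the data.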

I am not alone in my enthusiasm for this approach. Kaazing[14] (the original developer of the Websocket API) and Push Technology[15] are two commercial enterprises behind websockets, and their kit is on fire today in financial, gaming, and other markets.

We have discussed the emerging combination of good algorithms in diverse languages, backed by NoSQL and other innovative data storage methods, and organized with queues and modern messaging methods, including Websockets for direct client interaction. It’s a great way to do large data analysis.


