This is an extended description of Buckybase for Netidee 2007.
Abstract
Clusters of cheap commodity servers connected by fast networks are emerging as the standard infrastructure for large web applications, providing linearly scalable storage and parallel computing power, by distributing data and computations across the cluster.
However, managing a cluster, configuring or programming the storage software, and ensuring that the data is stored safely and in a balanced fashion across the nodes (even when nodes fail or are added) is not trivial, and requires skilled operators.
Furthermore, many web applications need to provide interesting views, reports, and aggregations of the data. These views should be kept up-to-date with fresh data as fast as possible. Common analysis and query models, which are designed for single-machine use, can no longer be applied when the data is split across many nodes. This means that data processing needs to be performed with custom programs, each of which needs to be written specially to gather the data from many machines and deal with network programming and events, which places a heavy burden on developers.
To address these problems, and exploit the opportunities of parallel computation on clusters, we have designed a combined storage and analysis system, Buckybase. It is inspired by recent advances in storage systems (such as consistent hashing, Google's GFS and Bigtable, and Amazon's S3) and analysis systems (Google's MapReduce and Sawzall, continuous query systems, and sensor networks).
Buckybase's storage subsystem provides an interface for N-way replicated storage of data objects and their metadata in a hierarchical namespace, as well as for creating arbitrary indexes. Buckybase's analysis subsystem provides an interface for aggregations that are defined in a MapReduce-like model, which breaks down processing into small, associative and commutative building blocks, enabling massive parallelism and fault tolerance without extra effort from the programmer.
Buckybase is autonomic, which means that it is self-configuring, self-healing, and self-optimizing, requiring virtually zero operator intervention, while providing high availability and load balancing. Buckybase's analysis subsystem is continuous, which means that views and reports are updated immediately when input data changes.
Buckybase offers low-maintenance, scalable storage and comfortable analysis functions to large web and information applications. Applications can focus on the data models and views they want to provide, while leaving the heavy lifting of replicated, load-balanced storage and parallel, fault-tolerant analysis to Buckybase.
Technical Description
Overview
Data Center Environment
Buckybase is designed for a data center environment, and should be able to scale to hundreds of nodes. While node additions and failures are expected to happen regularly, the environment is expected to not be as turbulent as, say, an internet-wide P2P environment. We assume that the nodes have ample RAM and disk storage, and are connected by a high-bandwidth, low-latency network.
Programming Languages
Buckybase will be developed in Java, because it is reasonably efficient, offers a large number of libraries, provides platform independence, and is widely known among programmers. In addition, customization and scripting of the system are expected to be done in a "dynamic" language, such as JavaScript.
API
Most clients will access Buckybase via a REST-based HTTP 1.1 interface.
Installation and Configuration
Installation of a Buckybase cluster requires only copying the Buckybase .jar files to the nodes and starting them. From then on, the system is auto-configuring.
General Infrastructure
Heartbeat and Nodelist Master
The nodes in the cluster need to know each other. UDP broadcasts from all nodes could be used, but would lead to network overload in large clusters.
Instead, the machines elect a single machine as the temporary nodelist master. The nodelist master announces its existence via UDP broadcast to all nodes, which then send heartbeat messages to the master (via normal UDP unicast). The master provides the "canonical" nodelist to all machines. When a node stops sending heartbeats, the master removes it from the nodelist.
The master designation is only temporary: if the master node fails, another node will become the master, which means that the master is not a single point of failure of the system.
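The master's bookkeeping could look roughly like the following sketch. The class name NodeList, its methods, and the timeoutMs parameter are hypothetical; the election of the master and the actual UDP messaging are not shown, and the timeout rule is an assumption.

```javascript
// Hypothetical nodelist kept by the current master.
// Assumption: a node counts as failed once no heartbeat has
// arrived from it for more than `timeoutMs` milliseconds.
class NodeList {
  constructor(timeoutMs) {
    this.timeoutMs = timeoutMs;
    this.lastSeen = new Map(); // node address -> time of last heartbeat (ms)
  }

  // Called whenever a heartbeat message arrives from `node`.
  recordHeartbeat(node, nowMs) {
    this.lastSeen.set(node, nowMs);
  }

  // Drops nodes whose heartbeats have stopped and returns the
  // canonical nodelist in a stable (sorted) order.
  canonicalList(nowMs) {
    for (const [node, seen] of this.lastSeen) {
      if (nowMs - seen > this.timeoutMs) this.lastSeen.delete(node);
    }
    return [...this.lastSeen.keys()].sort();
  }
}
```

Because every node receives the same sorted list, all nodes can later evaluate the placement function on identical input.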
Time Synchronization
To prevent problems caused by non-monotonic time, the clocks of all machines in the cluster, as well as those of the client (web application) machines, should be synchronized, for example via NTP.
Storage Subsystem
Summary
Buckybase offers clients an interface for getting, putting, and deleting data objects in a hierarchical namespace. Directories are distributed across the nodes, i.e., all objects in a directory are stored on the same node. (In order to balance data in a more granular fashion, applications can split directories into smaller partitions.) Assignment of directories to nodes is done with a hashing function that minimizes the data movement (which is necessary for balancing the data) when the cluster topology changes.
Data is automatically replicated to multiple, usually three, nodes. Buckybase offers a relaxed consistency model, which guarantees that updates will eventually be distributed to the correct nodes (unless all nodes to which the data was pushed fail, which is highly unlikely). Object updates are performed atomically, but there are no multi-object transactions. As an optimization, an application can tell the system to place related directories on the same machine.
Buckybase should scale to millions of directories, each containing tens of thousands of objects.
Objects
The system is primarily designed for the storage of small (around 100 KB) data objects, such as weblog posts, comments, or database entries.
Objects consist of a name, metadata, and a body. The name is a byte-string, usually UTF-8 encoded, and is unique in the directory in which the object is stored. The metadata is a list of key-value pairs that is used to store information such as the length and MIME content-type of the object's body. The body of the object is an uninterpreted byte-sequence.
Namespace
Objects are grouped together into directories. If the user adds an object to the system at /dir1/dir2/name, /dir1/dir2 is the directory of the object, while name is its name.
The system offers a pseudo-hierarchical namespace, similar to Amazon's S3 store: the namespace is flat in that directories need not be created before objects can be placed in them, but still the system allows for the browsing of the namespace hierarchy. For example, if the user places two objects at /foo/bar/object-1, and /foo/quux/object-2, bar and quux will be visible as subdirectories of /foo.
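As a sketch of how the hierarchy can be derived from a flat set of object paths, consider the following. The function names splitPath and listSubdirectories are hypothetical, and a real implementation would use an index rather than scanning all paths.

```javascript
// Splits a full object path into its directory and object name.
function splitPath(path) {
  const i = path.lastIndexOf("/");
  return { directory: path.slice(0, i) || "/", name: path.slice(i + 1) };
}

// Derives the visible subdirectories of `parent` from a flat list of
// object paths; no directory has to be created explicitly.
function listSubdirectories(objectPaths, parent) {
  const prefix = parent.endsWith("/") ? parent : parent + "/";
  const subdirs = new Set();
  for (const path of objectPaths) {
    const { directory } = splitPath(path);
    if (directory.startsWith(prefix) && directory.length > prefix.length) {
      // Keep only the first path segment below `parent`.
      subdirs.add(directory.slice(prefix.length).split("/")[0]);
    }
  }
  return [...subdirs].sort();
}
```

With the two objects from the example above, listSubdirectories(paths, "/foo") yields bar and quux.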
Directory Placement
Directories are evenly distributed over the nodes in the cluster. All objects in a directory are stored on the same node, called the directory's primary node. The assignment from directory names to primary nodes is done with a hash function, called the placement function. To minimize the necessary data movement when nodes are added to or removed from the cluster, we use consistent hashing, a technique known from its use in the Akamai content delivery network. (An ordinary hash function, for example the hash value modulo the number of nodes, would reassign most directories every time the cluster topology changes, resulting in massive data movement.)
Placing directories via a function, instead of a metadata table, means that the system doesn't require a central authority overseeing directory placement.
Replication
To ensure high availability and data safety, every directory and all its objects are not only stored on a single primary node, but usually on three or more nodes. These secondary and tertiary nodes are also chosen with consistent hashing.
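Placement and replication with consistent hashing could be sketched as follows. This is an illustration under assumptions, not the final design: the hash (32-bit FNV-1a over UTF-16 code units), the number of virtual points per node, and the names buildRing and placement are all hypothetical.

```javascript
// 32-bit FNV-1a hash (assumption: any well-mixing hash would do).
function fnv1a(str) {
  let h = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    h ^= str.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return h >>> 0; // force unsigned
}

// Builds the hash ring: each node contributes several virtual points,
// which evens out the share of the ring owned by each node.
function buildRing(nodes, pointsPerNode = 64) {
  const ring = [];
  for (const node of nodes) {
    for (let i = 0; i < pointsPerNode; i++) {
      ring.push({ hash: fnv1a(node + "#" + i), node });
    }
  }
  ring.sort((a, b) => a.hash - b.hash);
  return ring;
}

// Walks the ring clockwise from the directory's hash and collects the
// first `replicas` distinct nodes: primary, secondary, tertiary, ...
function placement(ring, directory, replicas = 3) {
  const h = fnv1a(directory);
  let start = ring.findIndex((p) => p.hash >= h);
  if (start === -1) start = 0; // wrap around the end of the ring
  const chosen = [];
  for (let i = 0; i < ring.length && chosen.length < replicas; i++) {
    const node = ring[(start + i) % ring.length].node;
    if (!chosen.includes(node)) chosen.push(node);
  }
  return chosen;
}
```

When a node is added, only its own points appear on the ring, so a directory's primary either stays where it was or moves to the new node. The linear scan in placement keeps the sketch short; a binary search over the sorted ring would be the obvious refinement.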
Data Movement and Hand-Off
When the cluster topology changes, some directories need to move to different machines to maintain a balanced distribution of data. For example, when a node is added, it will take over some directories that were previously stored on other nodes. Likewise, when a node fails, the directories that were stored on it will be copied from the secondary and tertiary nodes to the new primary nodes of the data.
After a topology change, objects are not immediately available at the new primary node, because they are still being copied. In this case, the client application needs to contact either the old primary node, or the secondary or tertiary node. (Alternatively, this lookup could also be performed by the new primary node.)
To guarantee that no data is lost, a node never drops data, even if it is no longer responsible for it (per the placement hash function), but keeps it until it is sure that the data is suitably replicated on the correct nodes. Only then will it hand off the data and remove it from its own storage.
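The hand-off rule can be stated as a small predicate. The function name mayDrop and the idea of explicit acknowledgements from the responsible nodes are assumptions made for this sketch.

```javascript
// Decides whether node `self` may delete its copy of a directory.
// responsible: nodes chosen by the placement function for the directory
// confirmed:   nodes that have acknowledged holding a complete copy
function mayDrop(self, responsible, confirmed) {
  // A node that is still responsible always keeps its data.
  if (responsible.includes(self)) return false;
  // Otherwise, drop only once every responsible node has a full copy.
  return responsible.every((node) => confirmed.includes(node));
}
```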
Data Access
To read an object, a client application needs to determine the primary node of the data using the nodelist and placement function, and send a request for the object to that node.
To update an object, the client application first needs to determine the primary, secondary, and tertiary nodes for the directory using the nodelist. Then it sends the object's data to the three nodes. (Alternatively, the application could send the update request to the primary, and let the primary send the data to the other nodes.)
Object updates are performed atomically: either the object is completely updated or not, but there are no partial updates of objects. However, there are no multi-object transactions.
Consistency
The model is eventual consistency: an update request may only reach some of the three nodes, not all of them. To nevertheless guarantee that all objects eventually arrive at all three nodes, these nodes perform periodic synchronization. The rsync algorithm provides an inspiration for how this synchronization could be performed efficiently.
When a client application updates an object, it generates a timestamp, and sends it along with the object's data to the three nodes. This timestamp is used by the nodes, which otherwise have no common frame of reference, during synchronization. When a node receives an object during synchronization, it compares the object's timestamp with that of its existing copy (if any), and replaces the copy only if the new timestamp is greater.
As an alternative to wall-clock timestamps, Lamport timestamps (logical clocks) have been suggested.
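The timestamp comparison amounts to a "last writer wins" rule. A minimal sketch, assuming each replica is an in-memory map from object name to a record with timestamp and body fields (all names here are hypothetical):

```javascript
// Applies an incoming copy only if it is strictly newer than the
// stored one. `store` maps object name -> { timestamp, body }.
function applyIfNewer(store, name, incoming) {
  const existing = store.get(name);
  if (existing === undefined || incoming.timestamp > existing.timestamp) {
    store.set(name, incoming);
    return true;
  }
  return false;
}

// One-way synchronization pass: offer every object of `source` to `target`.
function synchronize(source, target) {
  for (const [name, object] of source) {
    applyIfNewer(target, name, object);
  }
}
```

Running synchronize in both directions between two replicas leaves both with the newest version of every object. This sketch transfers all objects; an rsync-like scheme would first exchange checksums to find the objects that actually differ.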
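For reference, a Lamport clock is a per-node counter that advances on local events and jumps past any larger value seen in a received message. The class below is a generic sketch of that rule, not a worked-out design for Buckybase.

```javascript
// Generic Lamport logical clock.
class LamportClock {
  constructor() {
    this.time = 0;
  }

  // Local event (for example, accepting an update): advance the counter.
  tick() {
    this.time += 1;
    return this.time;
  }

  // On receiving a message stamped `remoteTime`: move past both clocks.
  receive(remoteTime) {
    this.time = Math.max(this.time, remoteTime) + 1;
    return this.time;
  }
}
```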
Partitioning
Because each directory is stored in full on a single node, storage may become imbalanced when a directory grows overly large. To counter this, client applications may perform partitioning by splitting the objects into multiple directories.
For example, a large directory /dir could be split into subdirectories /dir/01, /dir/02, etc., based on the last two (three, ...) digits of a checksum of the object's name, which evenly distributes the objects across 100 (1000, ...) subdirectories. Using the hierarchical namespace, the application can then find all of these subdirectories.
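A client-side helper for this scheme might look as follows. The checksum (32-bit FNV-1a) and the function name partitionedDirectory are assumptions for illustration; any checksum with evenly spread low digits would serve.

```javascript
// 32-bit FNV-1a checksum of the object name (assumed checksum function).
function checksum(str) {
  let h = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    h ^= str.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return h >>> 0; // unsigned, so the modulo below is never negative
}

// Maps an object name to a partition subdirectory such as "/dir/07",
// using the last `digits` decimal digits of the checksum.
function partitionedDirectory(baseDir, objectName, digits = 2) {
  const buckets = 10 ** digits;
  const bucket = checksum(objectName) % buckets;
  return baseDir + "/" + String(bucket).padStart(digits, "0");
}
```

The object itself is then stored under partitionedDirectory("/dir", name) + "/" + name, and readers compute the same path from the name alone.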
Locality Optimization
Sometimes it makes sense to be able to group directories together on a node, for example, when one directory holds data derived from another one.
To place a directory foo at the same node as another directory /dir, it is placed into a special, virtual subdirectory /dir/.local/foo. The .local marker instructs the placement hash function to place foo on the same node as /dir.
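One way to realize this is to strip the marker before hashing, so that both directories present the same key to the placement function. The function name placementKey is hypothetical.

```javascript
// Returns the directory name that is fed to the placement hash.
// For "/dir/.local/foo" this is "/dir", so foo lands on /dir's node.
function placementKey(directory) {
  const marker = "/.local/";
  const i = directory.indexOf(marker);
  if (i === -1) return directory; // ordinary directory: hash its own name
  return directory.slice(0, i) || "/";
}
```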
Indexes
It should be possible to create arbitrary indexes on data objects, for example, to provide a date-sorted view of the objects in a directory. The exact mechanism for defining indexes remains to be designed.
Analysis Subsystem
MapReduce
MapReduce is a programming model developed by Google to simplify data processing on large clusters. To quote from the original paper:
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model [...]
The client provides the MapReduce system with small functions, and MapReduce takes care of executing them on the cluster, exploiting parallelism, and re-executing tasks whose machines failed, all without extra effort from the programmer. The system is especially scalable, because the reduce functions need to be associative and commutative, which means that many optimizations can be applied in their execution.
For example, let's say we have a large corpus of documents, and we want to compute how many times each word occurs in the corpus. In pseudo-code, the user-supplied map and reduce functions could look like this:
function map(key, value) {
  // key: document name, value: document text
  for (const word of value.split(/\s+/)) {
    if (word) emit(word, "1");
  }
}

function reduce(key, values) {
  // key: a word, values: list of emitted counts for that word
  let result = 0;
  for (const value of values) {
    result += parseInt(value, 10);
  }
  emit(String(result));
}
The system then applies the map function to each document, yielding a list of key-value (word-count) pairs. The pairs are then grouped by key, and each set of values with the same key is passed to the reduce function, which adds the individual counts together and outputs the final result, the total count of occurrences of that word.
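To make the data flow concrete, here is a sequential, single-process stand-in for the cluster. It is only a sketch: runMapReduce is a hypothetical driver, and unlike in the pseudo-code above, emit is passed to the user functions as an explicit parameter.

```javascript
// Word-count functions; `emit` is passed in explicitly in this sketch.
function mapFn(key, value, emit) {
  for (const word of value.split(/\s+/)) {
    if (word) emit(word, "1");
  }
}

function reduceFn(key, values, emit) {
  let result = 0;
  for (const value of values) result += parseInt(value, 10);
  emit(String(result));
}

// Sequential stand-in for the cluster: map every document, group the
// intermediate pairs by key, then reduce each group.
function runMapReduce(documents, mapFn, reduceFn) {
  const groups = new Map(); // intermediate key -> list of values
  for (const [name, text] of Object.entries(documents)) {
    mapFn(name, text, (k, v) => {
      if (!groups.has(k)) groups.set(k, []);
      groups.get(k).push(v);
    });
  }
  const results = {};
  for (const [k, values] of groups) {
    reduceFn(k, values, (out) => { results[k] = out; });
  }
  return results;
}
```

On a cluster, the map calls run in parallel on the nodes that hold the documents, and the grouping step becomes a shuffle over the network; the user-supplied functions stay the same.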
Continuous MapReduce
Sawzall, also by Google, is a programming language based on MapReduce, and in the paper on Sawzall, the authors already hint at extending the model to continuous operation:
A more radical system model would eliminate the batch-processing mode entirely. It would be convenient for tasks such as performance monitoring to have the input be fed continuously to the Sawzall program, with the aggregators keeping up with the data flow. The aggregators would be maintained in some on-line server that could be queried at any time to discover the current value of any table or table entry.
To make MapReduce continuous, that is, able to incrementally update the outputs when the inputs change, we need to change the reduce function so that it takes the old result, the list of added values, and the list of removed values:
function reduce(key, result, addedValues, removedValues) {
  // key: a word, result: result of previous reduction (null if none)
  // addedValues: counts to be added
  // removedValues: counts to be subtracted
  let total = (result == null) ? 0 : parseInt(result, 10);
  for (const v of addedValues) {
    total += parseInt(v, 10);
  }
  for (const v of removedValues) {
    total -= parseInt(v, 10);
  }
  emit(String(total));
}
Say, a document contains the text "hello world". The outputs of the map phase are [("hello", "1"), ("world", "1")]. When the user updates the document text to "goodbye world", the map outputs are [("goodbye", "1"), ("world", "1")]. Thus, we would like to decrement the total count for "hello", and increment the total count for "goodbye".
By "diffing" the map outputs from before and after the change, we arrive at a list of added map outputs, [("goodbye", "1")], and a list of removed map outputs [("hello", "1")]. (The output for "world" remains unchanged, which means we don't have to update the total word count for "world".)
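The diffing step can be sketched as a multiset difference over the two lists of map outputs. The function name diffMapOutputs and the representation of pairs as two-element arrays are assumptions of this sketch.

```javascript
// Multiset difference of two lists of [key, value] map outputs.
// Pairs present in both lists cancel out and appear in neither result.
function diffMapOutputs(before, after) {
  const counts = new Map(); // serialized pair -> net change in occurrences
  const bump = (pair, delta) => {
    const id = JSON.stringify(pair);
    counts.set(id, (counts.get(id) || 0) + delta);
  };
  for (const pair of before) bump(pair, -1);
  for (const pair of after) bump(pair, +1);

  const added = [];
  const removed = [];
  for (const [id, n] of counts) {
    for (let i = 0; i < n; i++) added.push(JSON.parse(id));
    for (let i = 0; i < -n; i++) removed.push(JSON.parse(id));
  }
  return { added, removed };
}
```

Counting occurrences rather than using plain sets matters when a word appears several times in a document: going from two occurrences to one must still produce one removed pair.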
Thus, the system calls the reduce function with the existing count of "goodbye", say 37, as reduce("goodbye", "37", ["1"], []), incrementing the counter for goodbye by one. Similarly, the counter for hello is decremented by one: reduce("hello", "16", [], ["1"]).
Of course, we don't have to update the result whenever a single document changes. Rather, we can wait a certain amount of time for other documents to change, and then call the reduce function with the combined added and removed values from all these documents.
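Such batching could be sketched as follows. The Batcher class is hypothetical, results are kept in an in-memory map, and the reduce function returns its output instead of calling emit; when to flush (a timer, a size limit) is left open.

```javascript
// Continuous reduce for word counts; returns the new result as a string.
function reduceFn(key, result, addedValues, removedValues) {
  let total = result == null ? 0 : parseInt(result, 10);
  for (const v of addedValues) total += parseInt(v, 10);
  for (const v of removedValues) total -= parseInt(v, 10);
  return String(total);
}

// Collects added and removed map outputs from several document changes
// and applies them with a single reduce call per key.
class Batcher {
  constructor() {
    this.pending = new Map(); // key -> { added: [...], removed: [...] }
  }

  entry(key) {
    if (!this.pending.has(key)) this.pending.set(key, { added: [], removed: [] });
    return this.pending.get(key);
  }

  // diff: { added: [[key, value], ...], removed: [[key, value], ...] }
  record(diff) {
    for (const [k, v] of diff.added) this.entry(k).added.push(v);
    for (const [k, v] of diff.removed) this.entry(k).removed.push(v);
  }

  // results: Map of key -> current result; updated in place.
  flush(results) {
    for (const [k, { added, removed }] of this.pending) {
      results.set(k, reduceFn(k, results.get(k), added, removed));
    }
    this.pending.clear();
  }
}
```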
Using this extension to MapReduce, the total counts for each word will now be immediately updated when one or more documents change. The extension retains the expressivity and performance properties of MapReduce, which means that a large number of aggregations and reports can be created and updated continuously.
Continuous MapReduce Execution
The details of executing continuous MapReduce tasks on a cluster remain to be worked out in the project planning phase.