Using Hadoop IPC/RPC for distributed applications
Posted by Kelvin on 02 Jun 2008 at 01:59 pm | Tagged as: Lucene / Solr / Elasticsearch / Nutch, programming
Hadoop is growing to be a pretty large framework – release 0.17.0 has 483 classes!
Previously, I wrote about Hadoop SequenceFile. SequenceFile is part of the org.apache.hadoop.io package. Other notable classes in that package are ArrayFile and MapFile, which are persistent array and dictionary data structures respectively.
About Hadoop IPC
Here, I'm going to introduce Hadoop IPC, the Inter-Process Communication subsystem. Hadoop IPC is the underlying mechanism used whenever out-of-process applications need to communicate with one another.
Hadoop IPC
1. uses binary serialization via java.io.DataOutputStream and java.io.DataInputStream, unlike SOAP or XML-RPC.
2. is a simple low-fuss RPC mechanism.
3. is unicast and does not support multicast.
Why use Hadoop IPC over RMI or Java serialization (java.io.Serializable)? Here's what Doug Cutting has to say:
Why didn't I use Serialization when we first started Hadoop? Because it looked big-and-hairy and I thought we needed something lean-and-mean, where we had precise control over exactly how objects are written and read, since that is central to Hadoop. With Serialization you can get some control, but you have to fight for it.
The logic for not using RMI was similar. Effective, high-performance inter-process communications are critical to Hadoop. I felt like we'd need to precisely control how things like connections, timeouts and buffers are handled, and RMI gives you little control over those.
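To make the "lean-and-mean" point a bit more concrete, here's a rough sketch that writes the same one-field object twice: once with built-in Java serialization and once by hand with DataOutputStream. It uses only the JDK. The Status class and the SerializationSizeDemo helper are names I made up for this illustration, and writeUTF is just a convenient stand-in for whatever string encoding Hadoop actually uses.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationSizeDemo {

    // Hypothetical payload, used only for this comparison.
    static class Status implements Serializable {
        private static final long serialVersionUID = 1L;
        final String status;

        Status(String status) {
            this.status = status;
        }
    }

    // Built-in Java serialization: class metadata travels with the data.
    static byte[] withJavaSerialization(Status s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(s);
        }
        return bytes.toByteArray();
    }

    // Hand-written encoding: only the field value is written
    // (a 2-byte length prefix followed by the encoded characters).
    static byte[] withDataOutput(Status s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(s.status);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Status s = new Status("alive");
        System.out.println("Java serialization: " + withJavaSerialization(s).length + " bytes");
        System.out.println("DataOutputStream:   " + withDataOutput(s).length + " bytes");
    }
}
```

The hand-written version decides exactly which bytes go on the wire, which is the kind of control the quote is talking about. The trade-off is that reader and writer have to agree on the field order themselves.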
Code!!!
I'm going to jump straight into some code examples to illustrate how easy Hadoop IPC can be.
Essentially, all unicast RPC calls involve a client and a server.
To create a server,
    Configuration conf = new Configuration();
    // start a server on localhost:16000; "this" is the object that implements the protocol interface
    Server server = RPC.getServer(this, "localhost", 16000, conf);
    server.start();
To create a client,
    Configuration conf = new Configuration();
    // the server's InetSocketAddress
    InetSocketAddress addr = new InetSocketAddress("localhost", 16000);
    ClientProtocol client = (ClientProtocol) RPC.waitForProxy(ClientProtocol.class, ClientProtocol.versionID, addr, conf);
In this example, ClientProtocol is an interface implemented by the server class. ClientProtocol.java looks like this:
    interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
        public static final long versionID = 1L;

        HeartbeatResponse heartbeat();
    }
ClientProtocol defines a single method, heartbeat(), which returns a HeartbeatResponse object. heartbeat() is the remote method the client calls periodically to let the server know that it (the client) is still alive. The server then returns a HeartbeatResponse object which provides the client with some information.
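If you're wondering how the client ends up holding an object that implements the interface when the real implementation lives in the server, here's a rough in-process sketch of the idea using java.lang.reflect.Proxy. This is not Hadoop's code and no Hadoop classes are used. HeartbeatProtocolSketch, HeartbeatServerSketch and ProxyRpcSketch are hypothetical stand-ins, heartbeat() returns a plain String to keep things short, and the call is forwarded directly instead of going over a socket. The version check is my assumption about what a versioned protocol handshake roughly looks like.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for a versioned protocol interface.
interface HeartbeatProtocolSketch {
    long VERSION_ID = 1L;

    long getProtocolVersion(String protocol, long clientVersion);

    String heartbeat();
}

// The "server": the only class that really implements the interface.
class HeartbeatServerSketch implements HeartbeatProtocolSketch {
    int heartbeatsSeen = 0;

    public long getProtocolVersion(String protocol, long clientVersion) {
        return VERSION_ID;
    }

    public String heartbeat() {
        heartbeatsSeen++;
        return "ok";
    }
}

class ProxyRpcSketch {
    // Builds a client-side object that implements the interface and forwards
    // every call to the target. A real RPC layer would serialize the method
    // name and arguments here and send them to a remote server instead.
    static HeartbeatProtocolSketch connect(HeartbeatProtocolSketch target, long clientVersion) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        HeartbeatProtocolSketch client = (HeartbeatProtocolSketch) Proxy.newProxyInstance(
                HeartbeatProtocolSketch.class.getClassLoader(),
                new Class<?>[] { HeartbeatProtocolSketch.class },
                handler);

        // Refuse to hand out a proxy if client and server disagree on the version.
        long serverVersion = client.getProtocolVersion(
                HeartbeatProtocolSketch.class.getName(), clientVersion);
        if (serverVersion != clientVersion) {
            throw new IllegalStateException(
                    "version mismatch: client=" + clientVersion + ", server=" + serverVersion);
        }
        return client;
    }

    public static void main(String[] args) {
        HeartbeatServerSketch server = new HeartbeatServerSketch();
        HeartbeatProtocolSketch client = connect(server, HeartbeatProtocolSketch.VERSION_ID);
        System.out.println(client.heartbeat());
    }
}
```

From the caller's point of view, client.heartbeat() looks like an ordinary method call, which is what makes this style of RPC low-fuss.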
Here's what HeartbeatResponse.java looks like:
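    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.UTF8;

    public class HeartbeatResponse implements org.apache.hadoop.io.Writable {
        String status;

        // serialize this object's fields to the stream
        public void write(DataOutput out) throws IOException {
            UTF8.writeString(out, status);
        }

        // populate this object's fields from the stream, in the same order they were written
        public void readFields(DataInput in) throws IOException {
            this.status = UTF8.readString(in);
        }
    }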
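To see the write()/readFields() pair in action without a running cluster, here's a small round-trip sketch that only needs the JDK. WritableLike is a local stand-in I've assumed, with the same two methods as org.apache.hadoop.io.Writable. HeartbeatResponseSketch and WritableRoundTripDemo are hypothetical names, and writeUTF/readUTF take the place of UTF8.writeString/readString, so the bytes won't necessarily match what Hadoop puts on the wire.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Assumption: a local stand-in with the same two methods as
// org.apache.hadoop.io.Writable, so this compiles without Hadoop.
interface WritableLike {
    void write(DataOutput out) throws IOException;

    void readFields(DataInput in) throws IOException;
}

// Hypothetical copy of HeartbeatResponse built on the stand-in interface.
class HeartbeatResponseSketch implements WritableLike {
    String status;

    public void write(DataOutput out) throws IOException {
        out.writeUTF(status);
    }

    public void readFields(DataInput in) throws IOException {
        this.status = in.readUTF();
    }
}

class WritableRoundTripDemo {
    // Sender side: serialize the object's fields into a byte array.
    static byte[] toBytes(WritableLike w) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        w.write(out);
        out.flush();
        return bytes.toByteArray();
    }

    // Receiver side: create an empty instance, then populate it from the bytes.
    static HeartbeatResponseSketch fromBytes(byte[] data) throws IOException {
        HeartbeatResponseSketch response = new HeartbeatResponseSketch();
        response.readFields(new DataInputStream(new ByteArrayInputStream(data)));
        return response;
    }

    public static void main(String[] args) throws IOException {
        HeartbeatResponseSketch sent = new HeartbeatResponseSketch();
        sent.status = "alive";
        HeartbeatResponseSketch received = fromBytes(toBytes(sent));
        System.out.println(received.status);
    }
}
```

Note that the receiving side starts from an empty object and fills it in, so a class like this needs a no-argument constructor, and status must be non-null before write() is called.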
Summary
So, to summarize, you'll need:
1. A server which implements an interface (which itself extends VersionedProtocol).
2. One or more clients which call the interface methods.
3. Method arguments and return types that implement org.apache.hadoop.io.Writable.