Introduction

This guide explains how to write a new kind of client/server on top of Katta.
Katta manages loading shards, maintaining the pool of nodes, keeping up replication levels, and so on.
The domain-specific work is done by custom client and server classes.
Katta ships with Lucene and MapFile clients and servers. If these do not meet
your needs, you must write your own client and server. This guide describes one
sequence of steps to achieve this, but you do not have to do it in this order.

The Problem

For this example, our problem is grepping a large file for regular expressions.
We started off just using grep, but the file size has grown to the point where latency is too
large. So we need to cut the file up into pieces and distribute the work across a pool of
nodes. Katta to the rescue!
Specifically, our data set is an ordered list of newline terminated strings. Our search
key is a regular expression, and the result is a list of strings that match the regular
expression, in the order they occur in the original list.
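For example (with made-up data): if the original list is

able
baker
charlie
delta

then grep("a.*e") returns able, baker, charlie, in that order. delta is skipped because no "e" follows its "a".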

Katta Infrastructure

A Katta index is a directory containing nothing but "shards" (directories). The index name is chosen at
addIndex time (see below). We will use "grep" as our index name. The name of the index is used as part of the shard names.

You can have multiple shards per node, and this allows for a more even distribution as
the number of nodes changes. We decide on 10 nodes and 100 shards, so each node starts out
serving about 10 shards; if a node drops out, its shards can be spread across the remaining nine.

So we create a directory "grep_v1", and subdirectories "shard00" .. "shard99". The final shard
names will then be "grep_shard00" .. "grep_shard99". We will be sorting based on shard name
in the client, so we zero-pad the numbers.

The contents of the shard directories are totally up to you. Katta does not care; it just
copies the directories to the server nodes and notifies them where the directories are. So we cut
the input file into 100 equal-sized pieces and place one piece in each shard directory as the file "data.txt",
as sketched below.
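
You can cut the file up any way you like. Here is a minimal sketch that splits by line count; the IndexBuilder class name and the input.txt path are ours, purely for illustration:

import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.Scanner;

// Illustrative only: cut input.txt into 100 roughly equal shard files by line count.
public class IndexBuilder {

  public static void main(String[] args) throws FileNotFoundException {
    File input = new File("input.txt");
    long linesPerShard = (countLines(input) + 99) / 100; // round up
    Scanner scanner = new Scanner(input);
    for (int i = 0; i < 100; i++) {
      File shard = new File("grep_v1", String.format("shard%02d", i));
      shard.mkdirs();
      PrintWriter out = new PrintWriter(new File(shard, "data.txt"));
      for (long n = 0; n < linesPerShard && scanner.hasNextLine(); n++) {
        out.println(scanner.nextLine());
      }
      out.close();
    }
    scanner.close();
  }

  private static long countLines(File file) throws FileNotFoundException {
    long count = 0;
    Scanner scanner = new Scanner(file);
    while (scanner.hasNextLine()) {
      scanner.nextLine();
      count++;
    }
    scanner.close();
    return count;
  }
}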

See the Katta documentation for deployment options to make the index available to the nodes.
It is common to upload to a Hadoop file system, but this is not required.
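
If you do upload to HDFS, one way is the Hadoop FileSystem API (the destination path here is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: copy the local index directory into HDFS; adjust paths for your cluster.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path("grep_v1"), new Path("/katta/indexes/grep_v1"));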

Also see the Katta documentation for setting up ZooKeeper. At this point we will assume we have in
place one or more ZooKeeper nodes, 10 server machines, one or more client machines, and one machine
to add the index (you can do this on any of the other machines, or use a separate machine). Now we
need the custom software.

Custom Content Server

We start by defining an interface for our server. These methods will be called by our custom
client classes, not by external users, so no one but us will see them. Hadoop RPC is used, so
all the data must be Writable. We only need one method, which takes a String as input, but to be
Writable we use Text. To enable sorting in the client, we need to return the shard names
along with the list of matching strings for each shard, so we return a map of Text to list of Text:

interface IGrepServer extends VersionedProtocol {
  public Map<Text, List<Text>> grep(Text regex, String[] shards) throws Exception;
}

The VersionedProtocol is required because in the client Hadoop RPC returns proxy objects of that type.

Due to replication and retries, the list of shards may be a subset of the shards on each node.
In the simple no-replication case, each node will get exactly one call listing its full set of
shards, and no retries will occur.

In the client, we will use this interface to look up the grep() method. All of our RPC
calls from the client to the server nodes are listed in this interface.

Now we are ready to write our actual server class. This class must implement our IGrepServer
interface and also the IContentServer interface. IContentServer lists all the calls our server will
get from the Katta Node class (start using shard, stop using shard, shutdown). To keep things
simple we extend AbstractServer, which implements this interface and maintains the list of
shards (directories) for us. It also takes care of VersionedProtocol (always returns version 0).

public class GrepServer extends AbstractServer implements IGrepServer {

  public Map<Text, List<Text>> grep(Text regex, String[] shards) throws Exception {
    Map<Text, List<Text>> result = new HashMap<Text, List<Text>>();
    for (String shard : shards) {
      result.put(new Text(shard), grepShard(regex.toString(), _shards.get(shard)));
    }
    return result;
  }

  // Grep the shard's data file using the Scanner class.
  private List<Text> grepShard(String regex, File shard) throws FileNotFoundException {
    File dataFile = new File(shard, "data.txt");
    List<Text> matches = new ArrayList<Text>();
    Pattern pattern = Pattern.compile(regex);
    Scanner scanner = new Scanner(dataFile);
    while (scanner.hasNextLine()) {
      String line = scanner.nextLine();
      if (pattern.matcher(line).find()) {
        matches.add(new Text(line));
      }
    }
    scanner.close();
    return matches;
  }

  public Map<String, String> getShardMetaData(String shardName) throws Exception {
    return null;
  }

  public void shutdown() throws IOException {
  }
}

You could search the N shards in parallel if you wish.
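
For example, with one thread per shard (a sketch only; the thread-pool policy here is our choice, not something Katta dictates):

// Hypothetical parallel variant of grep(); requires java.util.concurrent imports.
public Map<Text, List<Text>> grep(final Text regex, String[] shards) throws Exception {
  ExecutorService pool = Executors.newFixedThreadPool(shards.length);
  Map<Text, Future<List<Text>>> futures = new HashMap<Text, Future<List<Text>>>();
  for (final String shard : shards) {
    futures.put(new Text(shard), pool.submit(new Callable<List<Text>>() {
      public List<Text> call() throws Exception {
        return grepShard(regex.toString(), _shards.get(shard));
      }
    }));
  }
  pool.shutdown();
  Map<Text, List<Text>> result = new HashMap<Text, List<Text>>();
  for (Map.Entry<Text, Future<List<Text>>> entry : futures.entrySet()) {
    result.put(entry.getKey(), entry.getValue().get()); // rethrows any shard failure
  }
  return result;
}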

Custom Client

To outside users, we want to hide the fact that we use Katta, so we define a client interface. This is
not used or required by Katta, but it is good practice.

public interface IGrepClient {
  public List<String> grep(String regex) throws Exception;
}

The actual client class creates an instance of Client, uses its broadcastToAll() method to get a
result set, then throws an exception if any occurred. If not, it appends all the per-shard lists
in alphabetical shard order, which guarantees that the final list is in the right order. You could
pass line numbers explicitly in your data, but we wanted to keep it simple for this example.

public class GrepClient implements IGrepClient {

  private Client client;
  private Method method;

  public GrepClient() throws KattaException {
    client = new Client(IGrepServer.class);
    try {
      method = IGrepServer.class.getMethod("grep", Text.class, String[].class);
    } catch (NoSuchMethodException e) {
      throw new RuntimeException("Missing method", e);
    }
  }

  public List<String> grep(String regex) throws KattaException {
    // Call Katta.
    ClientResult<Map<Text, List<Text>>> clientResult =
        client.broadcastToAll(3000, true, method, 1, new Text(regex), null);
    // If there were any errors, pick one and throw it.
    if (clientResult.isError()) {
      throw clientResult.getKattaException();
    }
    // Combine into 1 big map of shard -> strings, sorted by shard name.
    Map<Text, List<Text>> combined = new TreeMap<Text, List<Text>>();
    for (Map<Text, List<Text>> result : clientResult.getResults()) {
      combined.putAll(result);
    }
    // Build results.
    List<String> strings = new ArrayList<String>();
    for (List<Text> texts : combined.values()) {
      for (Text text : texts) {
        strings.add(text.toString());
      }
    }
    return strings;
  }
}

Things To Notice

The only server class used by the client is IGrepServer, which contains only the RPC
methods used by the client.

The second argument to grep() is the array of shard names. The last two arguments to broadcastToAll() are
the arguments to pass into the method. The second of these will be overwritten by Katta, so we pass in null.
The fourth argument to broadcastToAll() tells Katta which argument (if any) to overwrite (1 means the second
argument). If this is < 0, no overwriting is done.

The first two arguments to broadcastToAll() say that we are willing to wait up to 3 seconds for complete results
(data or an error for all shards; due to retries this might not be all nodes), and then terminate the call.

The ClientResult object keeps track of results on a per-set-of-shards basis. Because we need to know the
exact shard for each string (for sorting), we pass that inside our result. An annotated version of the
broadcastToAll() call appears below.
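
For reference, here is the same call with each argument labeled (the comments are ours):

ClientResult<Map<Text, List<Text>>> clientResult = client.broadcastToAll(
    3000,            // wait up to 3000 ms for complete results
    true,            // then shut the call down
    method,          // the IGrepServer.grep() Method to invoke on each node
    1,               // index of the argument Katta overwrites with shard names
    new Text(regex), // argument 0: the regular expression
    null);           // argument 1: placeholder, replaced per node with its shards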

Starting The Nodes

Make sure your classes are available on the classpath. One way is to drop your jar into Katta's lib directory.
Then either start an individual node with

bin/katta startNode -c com.foo.GrepServer

or configure node.server.class in katta.node.properties.
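
With the properties route, that is a single line in katta.node.properties:

node.server.class=com.foo.GrepServer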

Deploy The Index

bin/katta addIndex grep <path>/grep_v1 1

This adds an index with the name "grep". In this example the index location ends in "/grep_v1", but the deployed
index name is "grep". The last argument is the replication level; use 1 for no replication.

Using The Client

Now you are ready to use your client:

System.out.println("Katta references: " + new GrepClient().grep("[Kk]atta"));

Conclusion

I hope this has made it clear what you need to do to create a custom client/server using Katta, and
how easy it is. Please see the Katta documentation and discussion groups for more information.