Katta is a distributed application that runs on many commodity hardware servers, very similar to Hadoop MapReduce, Hadoop DFS, HBase, Bigtable, or Hypertable.


A master server manages the nodes and the assignment of index shards to nodes. Nodes serve index shards. A client searches an index by connecting to all relevant nodes and merging the results from all nodes into a unified result list.
The kind of index/shard can differ (e.g. Lucene), depending on which content servers the nodes host.

Content Server

Each node hosts a so-called content server. The content server determines which type of shards/indices the Katta cluster supports. There are different content server implementations in Katta: currently one for Lucene indices and one for Hadoop mapfiles. And you can implement your own!
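Conceptually, a content server only needs to know how to load and unload shards from local disk. The following sketch models that abstraction (illustrative only; the method names are assumptions, and Katta's actual Java interface differs):

```python
from abc import ABC, abstractmethod

class ContentServer(ABC):
    """Sketch of the content-server abstraction: each node hosts one
    content server, which defines what kind of shards it can serve."""

    @abstractmethod
    def add_shard(self, shard_name: str, shard_path: str) -> None:
        """Load a deployed shard from local disk."""

    @abstractmethod
    def remove_shard(self, shard_name: str) -> None:
        """Unload an undeployed shard."""

class LuceneLikeContentServer(ContentServer):
    """Toy stand-in for a Lucene content server: tracks which shards
    are loaded, without opening real index files."""
    def __init__(self):
        self.shards = {}
    def add_shard(self, shard_name, shard_path):
        self.shards[shard_name] = shard_path
    def remove_shard(self, shard_name):
        del self.shards[shard_name]

server = LuceneLikeContentServer()
server.add_shard("indexX_shard0", "/tmp/katta-shards/indexX_shard0")
print(sorted(server.shards))
# → ['indexX_shard0']
```

A mapfile content server (or your own format) would implement the same two operations against different on-disk data.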

Data Structure

An index for Katta is a folder with a set of subfolders. These subfolders are called index shards. They can contain Apache Lucene indices or Apache Hadoop mapfiles (or your own format). A Lucene index shard can be created simply by using the Lucene index writer; in that case, creating a Katta index is nothing more than copying a bunch of Lucene indices together into one folder. Therefore Katta indices can be created using Hadoop MapReduce (we provide some tools for that), a single server, or whatever suits your needs.
This allows you to structure your index in the way that makes the most sense for your application, e.g. keeping documents with common related terms together in one shard.
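The on-disk layout is just directories, so assembling a Katta index can be sketched in a few lines. This only lays out the folder structure; in a real deployment each shard subfolder would hold the Lucene index files or Hadoop mapfile produced by your indexing job:

```python
import os
import tempfile

def create_katta_index(index_dir, shard_names):
    """A Katta index is a folder whose subfolders are the shards.
    Here we only create the directory structure."""
    os.makedirs(index_dir, exist_ok=True)
    for shard in shard_names:
        os.makedirs(os.path.join(index_dir, shard), exist_ok=True)
    return sorted(os.listdir(index_dir))

root = tempfile.mkdtemp()
print(create_katta_index(os.path.join(root, "indexX"),
                         ["shard0", "shard1", "shard2"]))
# → ['shard0', 'shard1', 'shard2']
```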

Master-Node Communication

Master-node communication is critical in distributed systems, since the master needs to learn about a node failure as fast as possible. Usually such communication is realized with heartbeat messages from the nodes to the master. Katta uses a different approach: the distributed configuration and locking service ZooKeeper, a Yahoo! Research project, is used for master-node communication. ZooKeeper lets you read and write into a kind of distributed virtual file system, though it is not a real file system. Nodes announce themselves during startup by writing an ephemeral file into the folder “/nodes/live”. The master subscribes to changes of this folder; in the case of a node failure, ZooKeeper removes the ephemeral file and sends a notification to the master. A similar procedure handles master failover, though here only the active master writes a “/master” file, and secondary masters subscribe to notifications on this file.

Besides this, all the work of master and nodes is organized in operations, single units of work. There are MasterOperations like IndexDeploy-/IndexUndeployOperation and NodeOperations like ShardDeploy-/ShardUndeployOperation. These operations are stored in ZooKeeper in the form of blocking queues. Each component (master, node) has its own queue and processes incoming operations sequentially.
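The ephemeral-file pattern can be illustrated with a small in-memory toy (this stands in for ZooKeeper itself; a real deployment would use the ZooKeeper client, and all names here are illustrative):

```python
class ToyZk:
    """In-memory stand-in for the ZooKeeper pattern described above:
    nodes write ephemeral files under /nodes/live, and watchers on
    that folder are notified when a file appears or disappears."""
    def __init__(self):
        self.files = {}      # path -> owning session
        self.watchers = {}   # folder -> list of callbacks
    def watch(self, folder, callback):
        self.watchers.setdefault(folder, []).append(callback)
    def create_ephemeral(self, path, session):
        self.files[path] = session
        self._notify(path.rsplit("/", 1)[0])
    def expire_session(self, session):
        """Simulates a node crash: its ephemeral files vanish."""
        for path in [p for p, s in self.files.items() if s == session]:
            del self.files[path]
            self._notify(path.rsplit("/", 1)[0])
    def _notify(self, folder):
        live = sorted(p for p in self.files if p.startswith(folder + "/"))
        for cb in self.watchers.get(folder, []):
            cb(live)

events = []
zk = ToyZk()
zk.watch("/nodes/live", events.append)          # the master watches the folder
zk.create_ephemeral("/nodes/live/node1", session="node1")
zk.create_ephemeral("/nodes/live/node2", session="node2")
zk.expire_session("node1")                      # node1 crashes
print(events[-1])
# → ['/nodes/live/node2']
```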
The following workflow describes how an index is deployed in Katta and is a good example of the other workflows as well.

  • A DeployClient constructs an IndexDeployOperation and adds it to the master queue
  • The master picks up the operation, generates a shard-assignment plan, and adds a bunch of ShardDeployOperations to the queues of a set of nodes
  • The nodes pick up the operations, try to deploy the contained shards (copy them, e.g. from HDFS, and add them to the content server, e.g. Lucene), and on success publish them to ZooKeeper under the “/shard-to-nodes” folder
  • The master is notified once all node operations are completed (or a node crashed) and publishes the index with some metadata to ZooKeeper, e.g. under a path like “/indices/indexX”
  • A search client watches the “/indices” folder and is automatically notified when a new index is available. It explores the “/shard-to-nodes” folder to identify which nodes serve the shards of the new index
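The steps above can be sketched as a toy model in which plain in-process queues stand in for the blocking queues in ZooKeeper (the round-robin assignment and all names here are assumptions for illustration, not Katta's actual planning logic):

```python
from collections import deque

master_queue = deque()
node_queues = {"node1": deque(), "node2": deque()}
shard_to_nodes = {}   # stands in for the "/shard-to-nodes" folder
indices = {}          # stands in for the "/indices" folder

# 1. The deploy client enqueues an IndexDeployOperation.
master_queue.append(("deploy-index", "indexX", ["shard0", "shard1"]))

# 2. The master picks it up and plans the shard assignment
#    (here: trivially round-robin across nodes).
op, index, shards = master_queue.popleft()
nodes = sorted(node_queues)
for i, shard in enumerate(shards):
    node_queues[nodes[i % len(nodes)]].append(("deploy-shard", index, shard))

# 3. Each node processes its queue, "deploys" its shards,
#    and publishes them under /shard-to-nodes.
for node, queue in node_queues.items():
    while queue:
        _, idx, shard = queue.popleft()
        shard_to_nodes.setdefault(shard, []).append(node)

# 4. Once all shards are served, the master publishes the index.
if all(shard_to_nodes.get(s) for s in shards):
    indices[index] = {"shards": shards}

print(indices)
print(shard_to_nodes)
```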

Client-Node Communication

A client communicates with the nodes on a search request. For client-node communication we decided to use Hadoop RPC, since it is a fast and easy-to-use Java implementation for synchronous communication (Apache Mina is faster only for asynchronous communication). For each search request to the client, we send a request to all nodes serving a shard of the index we search in (see INodeSelectionPolicy for details).

All requests are executed in parallel using multiple threads, and Hadoop RPC keeps the TCP/IP connections open, as far as we know.
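The job of a node-selection policy is to pick, for every shard of the queried index, one node that serves it. A minimal stand-in might look like this (illustrative only; real INodeSelectionPolicy implementations in Katta can apply other strategies, and the `turn` parameter is an assumption of this sketch):

```python
def select_nodes(shard_to_nodes, turn=0):
    """Pick one serving node per shard. `turn` rotates the choice so
    that repeated queries spread the load across replicas."""
    return {shard: nodes[turn % len(nodes)]
            for shard, nodes in sorted(shard_to_nodes.items())}

shard_map = {
    "shard0": ["node1", "node2"],   # shard0 is replicated on two nodes
    "shard1": ["node2"],
}
print(select_nodes(shard_map, turn=0))
# → {'shard0': 'node1', 'shard1': 'node2'}
print(select_nodes(shard_map, turn=1))
# → {'shard0': 'node2', 'shard1': 'node2'}
```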

Loading Shards to Nodes

Since performance is critical in search, Katta first copies shards to the local hard disk of the node.

All URIs understood by the Hadoop file systems can be used as a source. For example, “file:” deploys an index from a shared local disk or a NAS that all nodes can access. Of course “hdfs:” is also supported, to deploy an index from the Hadoop distributed file system, which is one of the most common cases. Amazon’s S3 is also supported; refer to the Hadoop file system documentation for more details.
Katta’s shard replication is not a replacement for securing the source index, e.g. with a sufficiently high replication level within the Hadoop DFS.

To avoid slowing down search while deploying, Katta also provides a throttling mechanism for shard deployment.
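The idea behind such throttling can be sketched as a rate-capped copy loop (a sketch of the general technique, not Katta's implementation): after each chunk, sleep long enough that the average throughput stays at or below the configured limit.

```python
import io
import time

def throttled_copy(src, dst, bytes_per_sec, chunk_size=64 * 1024):
    """Copy a stream while capping average throughput at bytes_per_sec."""
    start, copied = time.monotonic(), 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            return copied
        dst.write(chunk)
        copied += len(chunk)
        # How long this much data "should" take at the target rate:
        expected = copied / bytes_per_sec
        elapsed = time.monotonic() - start
        if expected > elapsed:
            time.sleep(expected - elapsed)

src = io.BytesIO(b"x" * 200_000)
dst = io.BytesIO()
n = throttled_copy(src, dst, bytes_per_sec=1_000_000)
print(n)
# → 200000
```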

Distributed Scoring For Lucene Implementation

Katta supports distributed scoring for its Lucene implementation, because we do not expect the term distribution to be fully balanced across all shards.

Each search query in Katta results in two network roundtrips: first we get the document frequencies for the query from all nodes, and in the second trip we pass these values along with the search query to all nodes. Note that we also provide a simple count method that just counts the documents matching the query; it needs only one network roundtrip.
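The first roundtrip exists so that every node scores against *global* term statistics rather than its own shard's. A sketch of the merge and of a classic Lucene-style IDF (the formula here is the textbook `1 + ln(N / (df + 1))` form, an assumption of this sketch rather than a statement about Katta's exact scorer):

```python
import math

def merge_doc_freqs(per_node_stats):
    """First roundtrip: each node reports (doc_freq per term, num_docs)
    for its shards; the client sums them into global statistics."""
    total_docs, global_df = 0, {}
    for doc_freqs, num_docs in per_node_stats:
        total_docs += num_docs
        for term, df in doc_freqs.items():
            global_df[term] = global_df.get(term, 0) + df
    return global_df, total_docs

def idf(df, num_docs):
    """Inverse document frequency from the merged, global statistics.
    In the second roundtrip every node scores with these values, so
    scores stay comparable even when a term is concentrated in a few
    shards."""
    return 1.0 + math.log(num_docs / (df + 1))

node_a = ({"katta": 90}, 100)   # the term is frequent in this shard
node_b = ({"katta": 1}, 100)    # ...and rare in this one
global_df, n = merge_doc_freqs([node_a, node_b])
print(global_df, n, idf(global_df["katta"], n))
```

Without the merge, node_b would treat "katta" as a rare (high-IDF) term and node_a as a common one, and their scores could not be compared in the unified result list.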


Katta provides a Java API for managing the system that can be integrated into your management and monitoring application: IDeployClient for deploying and un-deploying indices, ILuceneClient for querying Lucene indices, and IMapFileClient for querying Hadoop mapfiles.

These I*Clients are the integration points for connecting your website or application to search results.

Finally, Katta provides a command line tool to manage system-level functionality such as deploying and un-deploying indexes.