Apache ZooKeeper

I don’t know why management of firms that deal with financial data tend to be very conservative about new technology in my part of the world. There are varied reasons for this but the lack of experience of executives is high on the list. The vision to support large-scale deployments of mission critical software is just not there.

Company honchos should read Joel Spolsky’s opinion about the role of the executives in shaping the technology roadmap of the company.

I recently explored options to monitor and operate a simple cluster of Java socket programs and came across ZooKeeper. It is not strictly a requirement for a distributed application but these individual socket programs need to be managed and some may need to be restarted.

Initially I thought of using Google protobuffer and send simple information about the number of messages and load to the centralized ZooKeeper instance so that some decision can be taken based on it. This idea is not fully explored yet but the sample program that uses ZooKeeper nodes and proto buffer messages is shown below.

The messages sent and received from ZooKeeper nodes can be serialized in any way and here proto buffer is used for that.

Producer stores messages in the node


import com.google.protobuf.InvalidProtocolBufferException;
import message.Message;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;

public class DistributedProducer {

    private ZooKeeperCoordinator zkc;

    private String root = "/Protocol Buffer";

    {
        try {
            zkc =
            new ZooKeeperCoordinator( "localhost:2181",
                                      root );
            zkc.connect();
            zkc.setUpRoot();
        } catch (InterruptedException e) {

            System.out.println( "InterruptedException" );

        } catch (IOException e) {

            System.out.println( "IOException" );
        }

    }

    public void send() throws IOException, InterruptedException, KeeperException {

        if( null != zkc ){
            ZooKeeper zk = zkc.getZookeeper();
            Stat stat =
            zk.setData( root + "/protocolbuffer",
                       getData(),
                       -1
                      );
            System.out.println( "Stat is  [" + stat + "]");
        }
    }

    private byte[] getData() throws InvalidProtocolBufferException {
        Message.Load message = Message.Load.newBuilder().setType(
                                 ( Message.Load.LoadType.HIGH)).build();
        Message.Load message1 = Message.Load.parseFrom( message.toByteArray() );
        System.out.println( "Distributed value is [" + message1.getType().toString() +"]");
        return message.toByteArray();
    }
}

Consumer consumes it.


import com.google.protobuf.InvalidProtocolBufferException;
import message.Message;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

public class DistributedConsumer {

    private ZooKeeperCoordinator zkc;

    private String root = "/Protocol Buffer";

    {
        try {
            zkc =
            new ZooKeeperCoordinator( "localhost:2181",
                                      root );
            zkc.connect();
        } catch (InterruptedException e) {

            System.out.println( "InterruptedException" );

        } catch (IOException e) {

            System.out.println( "IOException" );
        }

    }

    public void receive() throws IOException, InterruptedException, KeeperException {

        Stat stat = null;

            if( null != zkc ){
                ZooKeeper zk = zkc.getZookeeper();
                if( null != zk ){
                    byte[] value =
                    zk.getData( root + "/protocolbuffer",
                               false,
                               stat
                              );
                    getData( value );
                }
            }
    }

    private void getData( byte[] value ){

        try {
            Message.Load message = Message.Load.parseFrom( value );
            System.out.println( "Distributed value is  [" + message.getType().toString() + "]");

        } catch (InvalidProtocolBufferException e) {

            System.out.println( "InvalidProtocolBufferException" + e.getMessage() );
            e.printStackTrace();

        }
    }
}

A client that connects to and sets up ZooKeeper


import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZooKeeperCoordinator {
    
    private String host;
    
    private String root;

    private Stat stat;

    public ZooKeeper getZookeeper() {
        return zookeeper;
    }

    private ZooKeeper zookeeper;

    public ZooKeeperCoordinator( String host,
                                 String root ){

        this.host = host;
        this.root = root;

    }

    public void setUpRoot(){

        System.out.println( "Set up root" );
        try {
            stat = zookeeper.exists( root,
                                     false );
            if( null == stat ){
                 zookeeper.create( root,
                                   new byte[ 0 ],
                                   ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                   CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            System.out.println( "KeeperException" );

        } catch (InterruptedException e) {
            System.out.println( "InterruptedException" );
        }
    }

    public void connect() throws InterruptedException, IOException {

        System.out.println( "connect" );
        final CountDownLatch latch = new CountDownLatch( 1 );

        Watcher watcher = new Watcher(){

            public void process(WatchedEvent watchedEvent) {

                if( watchedEvent.getState() == Event.KeeperState.SyncConnected ){
                    latch.countDown();
                }
            }
        };

        zookeeper = new ZooKeeper( host, 1000, watcher );
        latch.await();
        System.out.println( "Awaiting connection" );
    }

    public void close() throws InterruptedException {
        zookeeper.close();
    }
}

The Protocol Buffers documentation has instructions to compile this simple .proto data structure

package message;

message Load {

  enum LoadType {
    HIGH = 0;
    MEDIUM = 1;
    LOW = 2;
  }


  optional LoadType type = 1 [default = MEDIUM];

}

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: