Apache ZooKeeper
February 24, 2012 Leave a comment
I don’t know why management of firms that deal with financial data tend to be very conservative about new technology in my part of the world. There are varied reasons for this but the lack of experience of executives is high on the list. The vision to support large-scale deployments of mission critical software is just not there.
Company honchos should read Joel Spolsky’s opinion about the role of the executives in shaping the technology roadmap of the company.
I recently explored options to monitor and operate a simple cluster of Java socket programs and came across ZooKeeper. It is not strictly a requirement for a distributed application but these individual socket programs need to be managed and some may need to be restarted.
Initially I thought of using Google protobuffer and send simple information about the number of messages and load to the centralized ZooKeeper instance so that some decision can be taken based on it. This idea is not fully explored yet but the sample program that uses ZooKeeper nodes and proto buffer messages is shown below.
The messages sent and received from ZooKeeper nodes can be serialized in any way and here proto buffer is used for that.
Producer stores messages in the node
import com.google.protobuf.InvalidProtocolBufferException; import message.Message; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.io.IOException; public class DistributedProducer { private ZooKeeperCoordinator zkc; private String root = "/Protocol Buffer"; { try { zkc = new ZooKeeperCoordinator( "localhost:2181", root ); zkc.connect(); zkc.setUpRoot(); } catch (InterruptedException e) { System.out.println( "InterruptedException" ); } catch (IOException e) { System.out.println( "IOException" ); } } public void send() throws IOException, InterruptedException, KeeperException { if( null != zkc ){ ZooKeeper zk = zkc.getZookeeper(); Stat stat = zk.setData( root + "/protocolbuffer", getData(), -1 ); System.out.println( "Stat is [" + stat + "]"); } } private byte[] getData() throws InvalidProtocolBufferException { Message.Load message = Message.Load.newBuilder().setType( ( Message.Load.LoadType.HIGH)).build(); Message.Load message1 = Message.Load.parseFrom( message.toByteArray() ); System.out.println( "Distributed value is [" + message1.getType().toString() +"]"); return message.toByteArray(); } }
Consumer consumes it.
import com.google.protobuf.InvalidProtocolBufferException; import message.Message; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; public class DistributedConsumer { private ZooKeeperCoordinator zkc; private String root = "/Protocol Buffer"; { try { zkc = new ZooKeeperCoordinator( "localhost:2181", root ); zkc.connect(); } catch (InterruptedException e) { System.out.println( "InterruptedException" ); } catch (IOException e) { System.out.println( "IOException" ); } } public void receive() throws IOException, InterruptedException, KeeperException { Stat stat = null; if( null != zkc ){ ZooKeeper zk = zkc.getZookeeper(); if( null != zk ){ byte[] value = zk.getData( root + "/protocolbuffer", false, stat ); getData( value ); } } } private void getData( byte[] value ){ try { Message.Load message = Message.Load.parseFrom( value ); System.out.println( "Distributed value is [" + message.getType().toString() + "]"); } catch (InvalidProtocolBufferException e) { System.out.println( "InvalidProtocolBufferException" + e.getMessage() ); e.printStackTrace(); } } }
A client that connects to and sets up ZooKeeper
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class ZooKeeperCoordinator { private String host; private String root; private Stat stat; public ZooKeeper getZookeeper() { return zookeeper; } private ZooKeeper zookeeper; public ZooKeeperCoordinator( String host, String root ){ this.host = host; this.root = root; } public void setUpRoot(){ System.out.println( "Set up root" ); try { stat = zookeeper.exists( root, false ); if( null == stat ){ zookeeper.create( root, new byte[ 0 ], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out.println( "KeeperException" ); } catch (InterruptedException e) { System.out.println( "InterruptedException" ); } } public void connect() throws InterruptedException, IOException { System.out.println( "connect" ); final CountDownLatch latch = new CountDownLatch( 1 ); Watcher watcher = new Watcher(){ public void process(WatchedEvent watchedEvent) { if( watchedEvent.getState() == Event.KeeperState.SyncConnected ){ latch.countDown(); } } }; zookeeper = new ZooKeeper( host, 1000, watcher ); latch.await(); System.out.println( "Awaiting connection" ); } public void close() throws InterruptedException { zookeeper.close(); } }
The Protocol Buffers documentation has instructions to compile this simple .proto data structure
package message; message Load { enum LoadType { HIGH = 0; MEDIUM = 1; LOW = 2; } optional LoadType type = 1 [default = MEDIUM]; }