Python vs Java Streams and lambda

I ported the first Facebook Qualification Round solution to Java 8.

The main idea is to count the frequency of each letter, then assign the value 26 to the most frequent letter, 25 to the next, and so on. If two letters are tied for most frequent, it doesn’t matter which of them gets which value, since the sum will be the same. The Python code below explains the solution pretty well.
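
A quick Python sketch of that idea (reconstructed from the description above, not the original script):

from collections import Counter
from string import ascii_lowercase

def score(s):
    # Count only the lowercase letters, then hand out the values
    # 26, 25, 24, ... starting with the most frequent letter.
    counts = Counter(c for c in s.lower() if c in ascii_lowercase)
    return sum(value * freq
               for value, (letter, freq)
               in zip(range(26, 0, -1), counts.most_common()))

print(score("__mainn__"))   # 124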

I haven’t tested the Java version thoroughly or checked it for bugs, but it is almost as beautiful as the Python, just more verbose.

import java.util.Map;
import java.util.TreeMap;
import java.util.stream.IntStream;

public class WordCount {
	
	public int x = 26;

	public static void main(String... argv){
		
		WordCount wc = new WordCount();
		wc.count();
	}

    private void count() {

        // Strip everything except lowercase letters and whitespace.
        String s = "__mainn__".replaceAll("[^a-z\\s]", "");

        System.out.println(s);

        // Frequency of each letter.
        final Map<Character, Integer> count = s.chars()
                .map(Character::toLowerCase)
                .collect(TreeMap::new,
                        (m, c) -> m.merge((char) c, 1, Integer::sum),
                        Map::putAll);

        // The most frequent letter gets 26, the next 25, and so on;
        // each letter's frequency is multiplied by its assigned value.
        // Not tested: this should also stop once x reaches 0.
        count.entrySet().stream()
                .sorted((l, r) -> r.getValue().compareTo(l.getValue()))
                .forEach(e -> count.merge(e.getKey(), x--, Math::multiplyExact));

        // Treating the values as doubles just to sum them; doesn't seem to matter.
        System.out.println(count.entrySet().stream()
                .mapToDouble(e -> e.getValue()).sum());
    }
}
The output:

mainn
a-1
i-1
m-1
n-2
{a=25, i=24, m=23, n=52}
124.0

Processed 0.25 TB on Amazon EMR clusters

I did that by provisioning 1 m1.medium master node and 15 m1.xlarge core nodes. This is easy and relatively cheap.
Since I use Pig I don’t have to design my own MapReduce jobs, though I still have to learn how to code MR jobs directly in the future.

This command stores the result in a file. I used to count the records in the file afterwards, but I realized I don’t have to, because the command itself prints how many records it writes.

store variable INTO '/user/hadoop/file' USING PigStorage();

Pig JOIN

This execution cost me $1.76 for about 1 hour. The number of machines is the same as in the previous post.

X = FILTER ntriples BY (subject matches '.*business.*');
y = foreach X generate subject as subject2, predicate as predicate2, object as object2 PARALLEL 50;
j = JOIN X BY subject,y BY subject2 PARALLEL 50;
j = DISTINCT j PARALLEL 50;

[Screenshot taken 2014-08-26 at 8.06.23 PM]

Counting the records in the file.

FILE = LOAD 'join-results';
FILE_C = GROUP FILE ALL;
FILE_COUNT = FOREACH FILE_C GENERATE COUNT(FILE);

Streams

I tried to use lambdas to swap elements in the char[] array. Does this mean that I am trying to change the stream while it is streaming? This code is from http://www.cs.uofs.edu/~mccloske/courses/cmps144/invariants_lec.html, but this question is unrelated to those concepts.

If that is a problem, then a new stream will do. How should this be done? I am not looking for a Comparator; I would like to work with this code as it is, without using any API other than lambdas.

For now I am only using lambdas for the printing in this code.

public class DutchNationalFlag {

    private static final int N = 10;

    private static char[] flags = new char[]{'R','B','B','R','R','B','B','R','R','B'};

    public static void main( String... argv){

        new String(flags).chars().mapToObj(i -> (char)i).forEach(System.out::println);

        // Dutch-national-flag pass: move every 'R' to the front of the array.
        // k marks the boundary between the 'R's moved to the front and the 'B's.
        int m = 0, k = 0;
        while (m != N) {
            if (flags[m] != 'B') {
                swap(flags, k, m);
                k = k + 1;
            }
            m = m + 1;
        }
        new String(flags).chars().mapToObj(i -> (char)i).forEach(System.out::println);
    }

    private static void swap(char[] flags, int k, int m) {

        char temp = flags[k];
        flags[k] = flags[m];
        flags[m] =  temp;

    }

}

Possible Solution 1:

This doesn’t do exactly what the original code does: it doesn’t swap and it doesn’t advance k, which is the boundary between ‘B’ and ‘R’. But it produces the result.

    Stream<Character> stream1 = 
    IntStream.range(0, flags.length).mapToObj(i -> (char)flags[i]);

    Stream<Character> stream2 = 
    IntStream.range(0, flags.length).mapToObj(i -> (char)flags[i]);


    Stream.concat(stream2.filter(x-> (x == 'B')), stream1.filter( y->(y == 'R')  )).forEach(System.out::println);
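
For comparison, the same stable partition is a one-liner in Python (a quick sketch that keeps the ‘R’s in front, as the imperative loop above does):

flags = list('RBBRRBBRRB')
# All the 'R's, then all the 'B's, each group in its original order.
print([c for c in flags if c == 'R'] + [c for c in flags if c == 'B'])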

Cluster configuration

[Screenshot taken 2014-08-22 at 11.40.38 AM]

So this is the real deal. The Pig job mentioned in the previous post failed when the actual file was processed on the EMR cluster. It succeeded only after I resized the cluster and added more heap space.

I used 1 m1.small master node, 10 m1.small core nodes and 5 m1.small task nodes. I don’t think that many nodes are needed to process this file; the increased heap alone, without the task nodes, would probably have been sufficient.

[Screenshots taken 2014-08-22 at 11.47.09 AM and 11.47.29 AM]

Big Data analysis on the cloud

I was given this dataset (http://km.aifb.kit.edu/projects/btc-2010/). I believe it is RDF. More importantly, I executed some Pig jobs on it locally, and this is how it worked for me. The main point here is how it helped me learn about Pig MapReduce jobs.

The data is in quads like this.

<http://openean.kaufkauf.net/id/businessentities/GLN_7654990000088> <http://www.w3.org/2000/01/rdf-schema#isDefinedBy> <http://openean.kaufkauf.net/id/businessentities/><http://openean.kaufkauf.net/id/businessentities/GLN_6406510000068> .
<http://openean.kaufkauf.net/id/businessentities/GLN_3521100000068> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/goodrelations/v1#BusinessEntity> <http://openean.kaufkauf.net/id/businessentities/GLN_6406510000068> .

After processing by another Pig script I started working with this data.

(<http://openean.kaufkauf.net/id/businessentities/GLN_7612688000000>,3)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7615990000096>,3)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7634640000088>,3)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7636150000008>,3)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7636690000018>,3)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7654990000088>,1)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7657220000032>,3)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7658940000098>,3)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7659150000014>,3)
(<http://openean.kaufkauf.net/id/businessentities/GLN_7662880000018>,3)

The schema of the data is like this.


count_by_object: {group: chararray,count: long}

x = GROUP count_by_object BY count;
y = FOREACH x GENERATE group,COUNT(count_by_object);

Line 1 shown above groups the tuples by the count. This is what I get.

(1,{(<http://openean.kaufkauf.net/id/businessentities/GLN_7654990000088>,1)})
(3,{(<http://openean.kaufkauf.net/id/businessentities/GLN_0000049021028>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0000054110120>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0078477000014>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0084610000032>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0088720000050>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0120490000028>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0133770000090>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0144360000086>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0146140000040>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0160080000038>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0162990000030>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0165590000028>,3),(<http://openean.kaufkauf.net/id/businessentities/GLN_0166620000056>,3),
.........

Line 2 of the Pig script gives me this result.

(1,1)
(3,333)
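
In other words, the second statement counts how many objects share each count value: one object has a count of 1 and 333 objects have a count of 3. A rough Python analogue (count_by_object here is just a small stand-in sample of the (object, count) pairs shown earlier):

from collections import Counter

count_by_object = [('GLN_7654990000088', 1),
                   ('GLN_7657220000032', 3),
                   ('GLN_7658940000098', 3)]

# Group by the count and count the groups, like the two Pig statements above.
histogram = Counter(count for _, count in count_by_object)
for count_value, how_many in sorted(histogram.items()):
    print((count_value, how_many))   # (1, 1) then (3, 2) for this sample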

It is an interesting way to learn Pig, which internally spawns Hadoop MapReduce jobs. But the real fun is Amazon Elastic MapReduce and its on-demand clusters: if the file is very large, EMR clusters should be used. It is basically Big Data analysis on the cloud.

My AWS Pig Job

I executed some Pig jobs on Elastic MapReduce by cloning the same cluster I used earlier (see the previous blog post). After that cluster setup, these were my billing details.

I am still learning Pig. A sample of my Pig commands:

grunt> fs -mkdir /user/hadoop
grunt> fs -ls /user/hadoop
grunt> register s3n://uw-cse-344-oregon.aws.amazon.com/myudfs.jar
2014-08-20 15:10:26,625 [main] INFO  org.apache.pig.impl.io.FileLocalizer - Downloading file s3n://uw-cse-344-oregon.aws.amazon.com/myudfs.jar to path /tmp/pig8610216688759169361tmp/myudfs.jar
2014-08-20 15:10:26,632 [main] INFO  org.apache.hadoop.fs.s3native.NativeS3FileSystem - Opening 's3n://uw-cse-344-oregon.aws.amazon.com/myudfs.jar' for reading
2014-08-20 15:10:26,693 [main] INFO  org.apache.hadoop.util.NativeCodeLoader - Loaded the native-hadoop library
grunt> raw = LOAD 's3n://uw-cse-344-oregon.aws.amazon.com/cse344-test-file' USING TextLoader as (line:chararray);
grunt> ntriples = foreach raw generate FLATTEN(myudfs.RDFSplit3(line)) as (subject:chararray,predicate:chararray,object:chararray);

After submitting the jobs, one can track them using the job tracker UI.

The successful completion of the Hadoop jobs:

[Screenshot taken 2014-08-20 at 9.03.01 PM]

This is an emancipatory experience 🙂 One is set free from the local offshore job experience.

My first AWS cluster

I have deployed to the cloud before, but this time it is AWS.

[Screenshots taken 2014-08-20 at 10.40.17 AM, 10.42.14 AM, 10.45.01 AM and 10.45.19 AM]

A billing alarm for safety.

[Screenshot taken 2014-08-20 at 11.07.26 AM]

Steve Vinoski recommends these papers

One does not need a degree from an IIT to read and understand these papers. They are accessible; one just has to persevere. In India, technical work is considered taboo because society thinks it is the prerogative of people with advanced degrees.

These are his recommendations.

 

“Eventual Consistency Today: Limitations, Extensions, and Beyond”, Peter Bailis, Ali Ghodsi. This article provides an excellent description of eventual consistency and recent work on eventually consistent systems.

“A comprehensive study of Convergent and Commutative Replicated Data Types”, M. Shapiro, N. Preguiça, C. Baquero, M. Zawirski. This paper explores and details data types that work well for applications built on eventually consistent systems.

“Notes on Distributed Systems for Young Bloods”, J. Hodges. This excellent blog post succinctly summarizes the past few decades of distributed systems research and discoveries, and also explains some implementation concerns we’ve learned along the way to keep in mind when building distributed applications.

“Impossibility of Distributed Consensus with One Faulty Process”, M. Fischer, N. Lynch, M. Paterson. This paper is nearly 30 years old but is critical to understanding fundamental properties of distributed systems.

“Dynamo: Amazon’s Highly Available Key-value Store”, G. DeCandia, et al. A classic paper detailing trade-offs for high availability distributed systems.

Matrix multiplication

This is simplified code that multiplies two matrices – a and b – using data stored in a file.

["a", 0, 0, 63]
["a", 0, 1, 45]
["a", 0, 2, 93]
["a", 0, 3, 32]
["a", 0, 4, 49]
["a", 1, 0, 33]
["a", 1, 3, 26]
["a", 1, 4, 95]
["a", 2, 0, 25]
["a", 2, 1, 11]
["a", 2, 3, 60]
["a", 2, 4, 89]
["a", 3, 0, 24]
["a", 3, 1, 79]
["a", 3, 2, 24]
["a", 3, 3, 47]
["a", 3, 4, 18]
["a", 4, 0, 7]
["a", 4, 1, 98]
["a", 4, 2, 96]
["a", 4, 3, 27]
["b", 0, 0, 63]
["b", 0, 1, 18]
["b", 0, 2, 89]
["b", 0, 3, 28]
["b", 0, 4, 39]
["b", 1, 0, 59]
["b", 1, 1, 76]
["b", 1, 2, 34]
["b", 1, 3, 12]
["b", 1, 4, 6]
["b", 2, 0, 30]
["b", 2, 1, 52]
["b", 2, 2, 49]
["b", 2, 3, 3]
["b", 2, 4, 95]
["b", 3, 0, 77]
["b", 3, 1, 75]
["b", 3, 2, 85]
["b", 4, 1, 46]
["b", 4, 2, 33]
["b", 4, 3, 69]
["b", 4, 4, 88]

I loaded this data into a dict per matrix, so that the keys are the (row, column) tuples.
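
The loading step isn’t shown here; a minimal sketch, assuming each row sits as a JSON array on its own line in a file named matrix.txt (the file name is a guess):

import json

# Each line is a JSON array: [matrix name, row, column, value].
matrixa, matrixb = {}, {}
with open('matrix.txt') as f:
    for line in f:
        name, row, col, value = json.loads(line)
        target = matrixa if name == 'a' else matrixb
        target[(row, col)] = value
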

for x in sorted(matrixa.items()):
    print x

((0, 0), 63)
((0, 1), 45)
((0, 2), 93)
((0, 3), 32)
((0, 4), 49)
((1, 0), 33)
((1, 3), 26)
((1, 4), 95)
((2, 0), 25)
((2, 1), 11)
((2, 3), 60)
((2, 4), 89)
((3, 0), 24)
((3, 1), 79)
((3, 2), 24)
((3, 3), 47)
((3, 4), 18)
((4, 0), 7)
((4, 1), 98)
((4, 2), 96)
((4, 3), 27)

It is a sparse matrix, so not all the values may be present; the code has to check for that.

l = []

for i in range(0, 5):
    for j in range(0, 5):
        z = 0                                  # accumulator for cell (i, j)
        for k in range(0, 5):
            # Missing entries of the sparse matrices count as 0.
            if (i, k) in matrixa:
                x = matrixa[(i, k)]
            else:
                x = 0
            if (k, j) in matrixb:
                y = matrixb[(k, j)]
            else:
                y = 0
            z += x * y
        l.append([i, j, z])

for i in l:
    print i
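
The membership checks could also be collapsed with dict.get, which returns 0 for missing keys; the inner k loop then becomes one line (just an alternative sketch):

z = sum(matrixa.get((i, k), 0) * matrixb.get((k, j), 0) for k in range(5))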

The result is this.

[0, 0, 11878]
[0, 1, 14044]
[0, 2, 16031]
[0, 3, 5964]
[0, 4, 15874]
[1, 0, 4081]
[1, 1, 6914]
[1, 2, 8282]
[1, 3, 7479]
[1, 4, 9647]
[2, 0, 6844]
[2, 1, 9880]
[2, 2, 10636]
[2, 3, 6973]
[2, 4, 8873]
[3, 0, 10512]
[3, 1, 12037]
[3, 2, 10587]
[3, 3, 2934]
[3, 4, 5274]
[4, 0, 11182]
[4, 1, 14591]
[4, 2, 10954]
[4, 3, 1660]
[4, 4, 9981]