For a while now I've been interested in functional programming, Clojure in particular, but I haven't had the time or the opportunity to geek out with it. I've also done quite a bit with Hadoop. Now I've jumped right in with Cascalog, the natural intersection of Clojure and Hadoop. Why start at the shallow end of the tech pool when you can dive right in at the deep end?
Cascalog is described on the Cascalog site as "Data processing on Hadoop without the hassle". Think of it as an alternative to Hive that works at a higher level. You build queries in an expressive and composable way, as one would expect with a Clojure library, and because those queries run as Hadoop jobs you get scalability for free. Here is an example, my first ever Cascalog query (https://github.com/snyderep/cascalog-sensor-data/blob/master/src/chariot_sensor_data/core.clj):
(ns cascalog-sensor-data.core
  (:require [cascalog.logic.ops :as c])
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)]))

;; Source tap for the CSV file; :classes gives one type per column, in order.
(def sensorinfo-tap
  (hfs-delimited "/Users/eric/Documents/motedata_2013_10_28.csv"
                 :delimiter ","
                 :classes [Float Integer Float Float Integer Integer Float]
                 :skip-header? true))

;; Prints, for each mote, the number of readings and the maximum
;; humidity and temperature. ?mote is not aggregated, so it is the grouping key.
(defn print-mote-counts
  []
  (?- (stdout)
      (<- [?mote ?count ?max_humidity ?max_temperature]
          (sensorinfo-tap :> ?timestamp ?mote ?humidity ?temperature ?pirValue ?motionState ?micValue)
          (c/count :> ?count)
          (c/max ?humidity :> ?max_humidity)
          (c/max ?temperature :> ?max_temperature))))
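If the grouping in that query isn't obvious, here is roughly what it computes, written as plain Clojure over an in-memory collection. This is only a sketch for intuition: the sample rows are made up, the rows are trimmed to four columns, and `mote-stats` is a name I picked for illustration. It is not how Cascalog executes the query.

```clojure
;; Hypothetical sample rows, trimmed to [timestamp mote humidity temperature].
(def sample-rows
  [[1.0 1 40.0 21.5]
   [2.0 1 57.2 25.4]
   [3.0 2 53.6 26.4]
   [4.0 2 50.1 24.0]])

(defn mote-stats
  "Groups rows by mote id and returns [mote count max-humidity max-temperature]
  for each group, ordered by mote id."
  [rows]
  (for [[mote group] (sort-by key (group-by second rows))]
    [mote
     (count group)
     (apply max (map #(nth % 2) group))
     (apply max (map #(nth % 3) group))]))

;; Print one result row per mote.
(doseq [row (mote-stats sample-rows)]
  (println row))
```

The Cascalog version says the same thing declaratively: any output variable that isn't produced by an aggregator (here `?mote`) becomes the grouping key. The Cascalog query is the one being run in the rest of this post.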
Running the Cascalog query locally yields the expected ton of log messages from Cascalog/Cascading and Hadoop, followed by the results. Keep in mind that we could easily put this data into HDFS and run this on a true Hadoop cluster.
$ lein run
13/11/26 10:05:17 INFO util.HadoopUtil: resolving application jar from found main method on: clojure.main
13/11/26 10:05:17 INFO planner.HadoopPlanner: using application jar: /Users/eric/.m2/repository/org/clojure/clojure/1.5.1/clojure-1.5.1.jar
13/11/26 10:05:17 INFO property.AppProps: using app.id: 95FF0B909C0248509D17736FCE4F178B
13/11/26 10:05:19 INFO flow.Flow: [] starting
13/11/26 10:05:19 INFO flow.Flow: [] source: Hfs["TextDelimited[['!G__4145', '!G__4146', '!G__4147', '!G__4148', '!G__4149', '!G__4150', '!G__4151']]"]["/Users/eric/Documents/motedata_2013_10_28.csv"]
13/11/26 10:05:19 INFO flow.Flow: [] sink: StdoutTap["SequenceFile[[UNKNOWN]->['?mote', '?count', '?max_humidity', '?max_temperature']]"]["/var/folders/2n/hxcfl5xj70d0fkcrps676xc80000gp/T/temp90912110954941277911385478317162124000"]
13/11/26 10:05:19 INFO flow.Flow: [] parallel execution is enabled: false
13/11/26 10:05:19 INFO flow.Flow: [] starting jobs: 1
13/11/26 10:05:19 INFO flow.Flow: [] allocating threads: 1
13/11/26 10:05:19 INFO flow.FlowStep: [] starting step: (1/1) ...1277911385478317162124000
… and much much more
RESULTS
-----------------------
1 513378 57.2 25.4
2 118543 53.6 26.4
3 513093 61.2 25.7
4 513613 61.2 27.6
-----------------------
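As for running this against data in HDFS, here is a rough sketch of how the query might be reshaped so the paths aren't hard-coded. I haven't run this on a cluster. The namespace and the names `sensor-tap`, `mote-stats` and `MoteStats` are hypothetical, and I'm assuming the Cascalog version in use provides `defmain` and `hfs-textline` in `cascalog.api`.

```clojure
(ns cascalog-sensor-data.cluster ; hypothetical namespace
  (:require [cascalog.logic.ops :as c])
  (:use [cascalog.api]
        [cascalog.more-taps :only (hfs-delimited)]))

;; Same tap options as before, but the path is a parameter so it can
;; point at a local file or at a location in HDFS.
(defn sensor-tap [path]
  (hfs-delimited path
                 :delimiter ","
                 :classes [Float Integer Float Float Integer Integer Float]
                 :skip-header? true))

;; Returns the query instead of executing it, so callers choose the sink.
(defn mote-stats [tap]
  (<- [?mote ?count ?max_humidity ?max_temperature]
      (tap :> ?timestamp ?mote ?humidity ?temperature ?pirValue ?motionState ?micValue)
      (c/count :> ?count)
      (c/max ?humidity :> ?max_humidity)
      (c/max ?temperature :> ?max_temperature)))

;; Entry point (assumed to be AOT-compiled into a jar): reads in-path
;; and writes the per-mote results as text lines under out-path.
(defmain MoteStats [in-path out-path]
  (?- (hfs-textline out-path)
      (mote-stats (sensor-tap in-path))))
```

Splitting the tap and the query into functions is where the composability mentioned earlier shows up: the same `mote-stats` query can be pointed at `(stdout)` while experimenting locally and at an HDFS sink on a cluster.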