(defn off
"Computes a lazy-seq in another (dedicated) thread."
[s]
(let [ex (. java.util.concurrent.Executors newSingleThreadExecutor)]
((fn this [s]
(if s
(let [future-rest (java.util.concurrent.FutureTask. #(rest s))]
(.execute ex future-rest)
(lazy-cons (first s) (this (.get future-rest))))
(.shutdown ex))) s)))
Monday, June 23, 2008
Lazier than lazy
This post has moved,
go to its new location
If you can't put off what you have to do, ask someone else to do it for you.
Labels:
utilities
Wide Finder 2 : adding agents
This post has moved,
go to its new location
Below is the code that allows me to make the serial code parallel without too much effort: by replacing a doseq with a pdoseq.
The input is split into batches of 10000 lines (hardcoded, sorry) that are processed in independent threads. The results of each batch are then merged using the specified rules.
(pdoseq line (-> (. System in) (java.io.InputStreamReader. "US-ASCII") java.io.BufferedReader. line-seq)This new line specifies how to merge each var that can be mutated.
[u-hits (merge-with +), u-bytes (merge-with +), s404s (merge-with +), clients (merge-with +), refs (merge-with +)]
The input is split into batches of 10000 lines (hardcoded, sorry) that are processed in independent threads. The results of each batch are then merged using the specified rules.
NB: merge-with
is the Clojure's function for merging two maps. Here for each var it is specified that intermediate values must be merged as maps while adding values on key collisions.
Runtime on 10M lines on a Core 2 Duo (serial: 2m45s):
real 1m36.296sIt doesn't work that well on the T2000:
user 3m2.651s
sys 0m3.536s
Serial code (for 1M lines):You need to know that Clojure takes 11s to boot on this computer, so without Clojure's boot time: we got a 15x ((3m59s - 11s)/(37s - 11s)) speedup, far from 32x. That's because the main loop dispatching batches of lines to workers can't keep pace.
real 2m43.273s
user 4m11.942s # user > real because of parallel GC
sys 0m16.093s
Parallel code (for 1M lines):
real 0m37.121s
user 3m59.365s
sys 0m13.944s
On the whole, I'm happy with these first results: using Clojure I had been able to make the serial code run in parallel without much changes to the main logic.
The code below can't handle the full data set (200M lines) mainly because of: memory requirements and some "malformed" log lines.
So far my best runtime on the whole dataset is 27 minutes.
To be continued...
;;;;;;;;;;; helper code
(defn pfor-each
"equivalent to (reduce merge-fn agent-init (map work-fn s)) but with some
parallelism (several map workers but only one reducer)"
[s work-fn agent-init merge-fn]
(let [nagents (.. Runtime (getRuntime) (availableProcessors))
agents (map agent (replicate nagents nil))
result (agent agent-init)]
((fn [[agent & etc-agents] s] ;a fn and not a loop to be sure to not retain
;a reference to the head of the sequence
(if-let [x & etc] s
(do
(await agent) ;awaiting agent to not flood memory under pending jobs
(when-let r @agent
(send result merge-fn r))
(send agent work-fn x)
(recur etc-agents etc))
(do
(doseq agent agents
(await agent)
(when-let r @agent
(send result merge-fn r)))
(await result)
@result))) (cycle agents) s)))
(defn batch
"Returns a lazy sequence of lists of n items each -- the last one may have less
items."
[s n] (when s (lazy-cons (take n s) (batch (drop n s) n))))
(defmacro pdoseq
"Like doseq except you have to specify how to merge vars mutated by the body."
[item s merge-rules & body]
(let [captured-vars (take-nth 2 merge-rules)
mergers (take-nth 2 (drop 1 merge-rules))
init-syms (take (count captured-vars) (repeatedly gensym))
syms-a (take (count captured-vars) (repeatedly gensym))
syms-b (take (count captured-vars) (repeatedly gensym))
result-syms (take (count captured-vars) (repeatedly gensym))]
`(let [[~@init-syms :as init#] [~@captured-vars]
work# (fn [_# items#]
(binding [~@(interleave captured-vars init-syms)]
(doseq ~item items#
~@body)
[~@captured-vars]))
merger# (fn [[~@syms-a] [~@syms-b]]
(vector ~@(map #((if (seq? %1) concat cons) %1 %&) mergers syms-a syms-b)))
[~@result-syms] (pfor-each (batch ~s 10000) work# init# merger#)]
~@(map list (repeat `set!) captured-vars result-syms))))
;;;;;;;;;;;; main code
(def u-hits)
(def u-bytes)
(def s404s)
(def clients)
(def refs)
(defmacro acc [h k v]
`(set! ~h (assoc ~h ~k (+ (get ~h ~k 0) ~v))))
(defn top [n h] ; the previous top function wasn't a real port of Tim Bray's one
(loop [top-n (replicate n (first {"-" 0})) kvs (seq h)]
(if-let [[k v :as kv] & etc] kvs
(if (> v (val (first top-n)))
(let [[lt gt] (split-with #(< (val %) v) (rest top-n))]
(recur (concat lt (cons kv gt)) etc))
(recur top-n etc))
(reverse top-n))))
(defn record [client u bytes ref]
(acc u-bytes u bytes)
(when (re-matches #"^/ongoing/When/\\d\\d\\dx/\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+$" u)
(acc u-hits u 1)
(acc clients client 1)
(when-not (or (= ref "\"-\"") (re-find #"^\"http://www.tbray.org/ongoing/" ref)
(acc refs (subs ref 1 (dec (count ref))) 1))))) ; lose the quotes
(defn printf [#^String fmt & args]
(let [f (java.util.Formatter. *out*)]
(.format f (. java.util.Locale ENGLISH) fmt (to-array args))))
(defn report
([label hash] (report label hash false))
([label hash shrink]
(println (str "Top " label ":"))
(let [fmt (if shrink " %9.1fM: %s\n" " %10d: %s\n")]
(doseq [key val] (top 10 hash)
(let [key (if (< 60 (count key)) (str (subs key 0 60) "...") key)
val (if shrink (/ val 1024.0 1024.0) val)]
(printf fmt val key))))
(binding [u-hits {} u-bytes {} s404s {} clients {} refs {}]
(pdoseq line (-> (. System in) (java.io.InputStreamReader. "US-ASCII") java.io.BufferedReader. line-seq)
[u-hits (merge-with +), u-bytes (merge-with +), s404s (merge-with +), clients (merge-with +), refs (merge-with +)]
(let [f (.split #"\\s+" line)]
(when (= "\"GET" (get f 5))
(let [[client u status bytes ref] (map #(get f %) [0 6 8 9 10])]
(cond
(= "200" status) (record client u (.parseInt Integer bytes) ref)
(= "304" status) (record client u 0 ref)
(= "404" status) (acc s404s u 1))))))
(print (count u-hits) "resources," (count s404s) "404s," (count clients) "clients\n\n")
(report "URIs by hit" u-hits)
(report "URIs by bytes" u-bytes true)
(report "404s" s404s)
(report "client addresses" clients)
(report "referrers" refs)
)
;;; explicit exit to shut down agent threads
(flush)
(. System exit 0)
Labels:
wide finder 2
Wednesday, June 18, 2008
Several mistakes (updated)
This post has moved,
go to its new location
In the previous post, there were several mistakes. They are all fixed by now, except one:
Yup, this code is buggy: it is intended to sort hash entries by descending val order (values are numbers) and it breaks on large data sets.
With large data sets, some values get big and doesn't fit into an int anymore while some stay small thus the difference of such two numbers doesn't fit into an int while Comparator.compare must return an int: overflow.
Here is one way to fix that:
I have hope that there will be a better way to fix that in a near future.
(defn top [n h]
(take n (sort #(- (val %2) (val %1)) h)))
Yup, this code is buggy: it is intended to sort hash entries by descending val order (values are numbers) and it breaks on large data sets.
With large data sets, some values get big and doesn't fit into an int anymore while some stay small thus the difference of such two numbers doesn't fit into an int while Comparator.compare must return an int: overflow.
Here is one way to fix that:
(defn top [n h]
(take n (sort #(.compare clojure.lang.Numbers (val %2) (val %1)) h)))
(defn top [n h]
(take n (sort #(compare (val %2) (val %1)) h)))
Labels:
mistakes
Friday, June 13, 2008
WideFinder 2 in Clojure (naive port from Ruby)
This post has moved,
go to its new location
I ported the reference implementation of Wide Finder 2 from Ruby to Clojure nearly line by line.
On my box, this code is more than 25% faster than the original Ruby when processing 10M lines (2'45" to 3'45") — but Ruby is faster up to 100k lines.
My next post will show how one can achieve some parallelization without altering much the logic:
On my box, this code is more than 25% faster than the original Ruby when processing 10M lines (2'45" to 3'45") — but Ruby is faster up to 100k lines.
(def u-hits)
(def u-bytes)
(def s404s)
(def clients)
(def refs)
(defmacro acc [h k v]
`(set! ~h (assoc ~h ~k (+ (get ~h ~k 0) ~v))))
(defn top [n h]
(take n (sort #(- (val %2) (val %1)) h)))
(defn record [client u bytes ref]
(acc u-bytes u bytes)
(when (re-matches #"^/ongoing/When/\\d\\d\\dx/\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+$" u)
(acc u-hits u 1)
(acc clients client 1)
(when-not (or (= ref "\"-\"") (re-find #"^\"http://www.tbray.org/ongoing/" ref)
(acc refs (subs ref 1 (dec (count ref))) 1))))) ; lose the quotes
(defn printf [#^String fmt & args]
(let [f (java.util.Formatter. *out*)]
(.format f (. java.util.Locale ENGLISH) fmt (to-array args))))
(defn report
([label hash] (report label hash false))
([label hash shrink]
(println (str "Top " label ":"))
(let [fmt (if shrink " %9.1fM: %s\n" " %10d: %s\n")]
(doseq [key val] (top 10 hash)
(let [key (if (< 60 (count key)) (str (subs key 0 60) "...") key)
val (if shrink (/ val 1024 1024) val)]
(printf fmt val key))))))
(binding [u-hits {} u-bytes {} s404s {} clients {} refs {}]
(doseq line (-> (. System in) (java.io.InputStreamReader. "US-ASCII") java.io.BufferedReader. line-seq)
(let [f (.split #"\\s+" line)]
(when (= "\"GET" (get f 5))
(let [[client u status bytes ref] (map #(get f %) [0 6 8 9 10])]
(cond
(= "200" status) (record client u (.parseInt Integer bytes) ref)
(= "304" status) (record client u 0 ref)
(= "404" status) (acc s404s u 1))))))
(print (count u-hits) "resources," (count s404s) "404s," (count clients) "clients\n\n")
(report "URIs by hit" u-hits)
(report "URIs by bytes" u-bytes true)
(report "404s" s404s)
(report "client addresses" clients)
(report "referrers" refs))
My next post will show how one can achieve some parallelization without altering much the logic:
(pdoseq line (-> (. System in) (java.io.InputStreamReader. "US-ASCII") java.io.BufferedReader. line-seq)
[u-hits (merge-with +), u-bytes (merge-with +), s404s (merge-with +), clients (merge-with +), refs (merge-with +)]
Labels:
wide finder 2
Tuesday, June 10, 2008
Google Treasure Hunt, Question #4
This post has moved,
go to its new location
A friend of mine asked me how I solved the fourth question of Google Treasure Hunt 2008 using Clojure. I didn't keep the original code around, so below is how I could have done it.
First, define a primes seq.
Second, define a function which returns the sequence of sums of N consecutive primes:
Third, define a function which, taking a list of increasing sequences, returns the first common value.
Last, use them! Here is a sample question:
And here is how to compute the answer:
(Right now this code may throw a StackOverflow exception, please use one of those definition of
First, define a primes seq.
Second, define a function which returns the sequence of sums of N consecutive primes:
(defn sum-primes [n] (map #(apply + %) (partition n 1 primes)))
Third, define a function which, taking a list of increasing sequences, returns the first common value.
(defn find= [seqs]
(if (apply == (map first seqs))
(ffirst seqs)
(let [[s1 & etc] (sort #(- (first %1) (first %2)) seqs)]
(recur (cons (rest s1) etc)))))
Last, use them! Here is a sample question:
Find the smallest number that can be expressed as
the sum of 3 consecutive prime numbers,
the sum of 5 consecutive prime numbers,
the sum of 11 consecutive prime numbers,
the sum of 1493 consecutive prime numbers,
and is itself a prime number.
And here is how to compute the answer:
(find= (cons primes (map sum-primes [3, 5, 11, 1493])))returns 9174901 in twenty seconds or so.
partition
.)Saturday, June 7, 2008
Primes
This post has moved,
go to its new location
Last night on #clojure Lau_of_DK asked for a way to define the sequence of prime numbers.
Having helped Lou Franco in his effort to parallelize primes computation and solved the fourth question of Google Treasure Hunt using Clojure, I thought I knew pretty well how to produce primes in Clojure but I stumbled accross some Haskell code that was far smarter. Here it is, now ported to Clojure:
It's interesting to note that the seq is seeded with 1 and 2 because Clojure's lazy seqs have a off by one evaluation (when one asks for the nth value, the nth+1 is computed — to know if the end of the seq is reached). No, no, no! I was plain wrong: if I need to seed with [1 2] 2 it's because of the take-while whose predicate must return at least one false.
Update: In comments, Cale Gibbard points out that my definition of prime numbers is loose: 1 isn't a prime. I fixed the code.
Having helped Lou Franco in his effort to parallelize primes computation and solved the fourth question of Google Treasure Hunt using Clojure, I thought I knew pretty well how to produce primes in Clojure but I stumbled accross some Haskell code that was far smarter. Here it is, now ported to Clojure:
(def primes (lazy-cons 2 ((fn this[n]
(let [potential-divisors (take-while #(<= (* % %) n) primes)]
(if (some #(zero? (rem n %)) potential-divisors)
(recur (inc n))
(lazy-cons n (this (inc n)))))) 3)))
Update: In comments, Cale Gibbard points out that my definition of prime numbers is loose: 1 isn't a prime. I fixed the code.
Wednesday, June 4, 2008
How to genclass a servlet
This post has moved,
go to its new location
This is an old post, refers to Clojure.org for details on how gen-class works now.
Although generating a servlet class is a canonical example of using Clojure's genclass I had never do this — I've used a handmade ClojureServlet which predates genclass.
Today, as I was starting a new project, I decided to get rid of this legacy ClojureServlet and to generate a brand new one.
Right at the end of genclass.clj there's the following example:
(clojure/gen-and-save-class "/Users/rich/dev/clojure/gen/"
'org.clojure.ClojureServlet
:extends javax.servlet.http.HttpServlet)
I changed the path, created the directory structure (
gen-and-save-class
doesn't create the org/clojure/
directories) and ran this code (NB: servlet-api.jar must be on the classpath). Et voilĂ , org/clojure/ClojureServlet.class
!Now to the Clojure code, I created
org/clojure/ClojureServlet.clj
to define the methods and launched Tomcat to test the servlet. It crashed in the init
method.In
org/clojure/ClojureServlet.clj
I have:(in-ns 'org.clojure.ClojureServlet)
(clojure/refer 'clojure)
(defn init [this config]
;censored
)
Cause: init does not call super.init(ServletConfig) as GenericServlet mandates it. Hopefully GenericServlet provides an init() which is easier to override, so I changed the code to:
(in-ns 'org.clojure.ClojureServlet)which override only the init method taking no arguments. The bug was fixed.
(clojure/refer 'clojure)
(defn init-void [this]
;censored, again
)
To override a specific signature, define a var named
methodName-firstArgClassName-secondClassName...
(e.g. init-ServletConfig) or methodName-void
if no arguments.
Labels:
mistakes
Subscribe to:
Posts (Atom)