EzDevInfo.com

core.async

Facilities for async programming and communication in Clojure

Comparing core.async and Functional Reactive Programming (+Rx)

I am a little confused when comparing Clojure's core.async to the so-called Reactive Extensions (Rx) and FRP in general. They seem to tackle a similar problem of asynchronicity, so I wonder what the principal differences are and in which cases one is preferred over the other. Can someone please explain?

EDIT: To encourage more in-depth answers I want to make the question more specific:

  1. Core.async allows me to write synchronous-looking code. However, as I understand it, FRP only needs one level of nested callbacks (all the functions that handle logic are passed as arguments to the FRP API). It seems that both approaches make the callback pyramids unnecessary. It is true that in JS I have to write function() {...} many times, but the main problem, the nested callbacks, is gone in FRP as well. Do I get it right?

  2. "FRP complects the communication of messages with flow of control" Can you (someone) please give a more specific explanation?

  3. Can't I pass around FRP's observable endpoints the same way as I pass channels?

In general I understand where both approaches historically come from, and I have tried a few tutorials in both of them. However, I seem to be "paralyzed" by the non-obviousness of the differences. Is there an example of code that would be hard to write in one of these and easy in the other? And what is the architectural reason for that?


Source: (StackOverflow)

More elegant way to handle error and timeouts in core.async?

Of course I want to wrap various requests to external services with core.async, while still returning results from these operations through some chan.

I want to take care of both thrown exceptions and timeouts (i.e. the operation takes longer than expected to return), and to be able to choose among various services for the same task but with different approaches or qualities of service.

The smallest viable example that shows handling an error, a timeout, and a correctly returned result seems to be this:

(require '[clojure.core.async :refer [chan go timeout <! >! alt!]])

(def logchan (chan 1))

(go (loop []
      (when-let [v (<! logchan)]
        (println v)
        (recur))))

(dotimes [_ 10] 
  (go 
    (let [result-chan  (chan 1)
          error-chan   (chan 1)
          timeout-chan (timeout 100)]
      (go
        (try 
          (do (<! (timeout (rand-int 200)))
              (>! result-chan (/ 1 (rand-int 2))))
          (catch Exception e (>! error-chan :error))))
      (>! logchan (alt! [result-chan error-chan timeout-chan] 
                    ([v] (if v v :timeout)))))))

This code prints something like

1
:error
1
:error
:error
:timeout
:error
:timeout
:timeout

This is not very elegant. I especially don't like the way of returning :error and :timeout. The nil-check in alt! is clearly not what I want either.

Is there some better way to accomplish the three goals of returning result, protect from long timeouts and handle errors? The syntax is quite OK (most things above are really to provoke those three errors).
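One possible direction, sketched here under assumptions rather than offered as the canonical answer: deliver every outcome as a tagged map on a single channel, so that neither a nil-check nor separate result and error channels are needed. The helper name call-with-timeout and the :status keys are hypothetical, not part of core.async. It uses async/thread because the wrapped call may block, and note that a timed-out call keeps running in the background; it is only ignored.

```clojure
(require '[clojure.core.async :as async :refer [go <!! timeout alt!]])

(defn call-with-timeout
  "Hypothetical helper: runs (f) on a separate thread and returns a channel
  that yields exactly one map: {:status :ok :value v},
  {:status :error :exception e} or {:status :timeout}."
  [f ms]
  (let [work (async/thread
               (try
                 {:status :ok :value (f)}
                 (catch Exception e
                   ;; The exception travels as a value instead of being lost.
                   {:status :error :exception e})))]
    (go
      (alt!
        work         ([outcome] outcome)
        (timeout ms) {:status :timeout}))))

;; Example calls (each blocks the calling thread until an outcome arrives):
(println (<!! (call-with-timeout #(/ 1 1) 1000)))
(println (:status (<!! (call-with-timeout #(throw (ex-info "boom" {})) 1000))))
(println (<!! (call-with-timeout #(Thread/sleep 500) 50)))
```

The caller can then dispatch on :status with case instead of inspecting the value itself.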


Source: (StackOverflow)

Why is the threadpool for core.async in clojure created with fixed thread pool of # of cores times 2 plus 42?

The threadpool implementation in core.async clojure library uses a FixedThreadPoolExecutor of size = # of cores * 2 + 42.

(defonce the-executor
  (Executors/newFixedThreadPool
    (-> (Runtime/getRuntime)
        (.availableProcessors)
        (* 2)
        (+ 42))
    (conc/counted-thread-factory "async-dispatch-%d" true)))

Is there a reason to use these numbers (# of cores times 2 plus 42) in particular? Is this optimal for all devices? I just want to know how Rich Hickey (and contributors) settled on these numbers.


Thank you nullptr.

Here is the discussion for those who are interested: http://clojure-log.n01se.net/date/2013-08-29.html#15:45a


Source: (StackOverflow)

Server push of data from Clojure to ClojureScript

I'm writing an application server in Clojure that will use ClojureScript on the client.

I'd like to find an efficient, idiomatic way to push data from the server to the client as realtime events, ideally using some combination of:

  • http-kit
  • core.async
  • Ring

(But I'm open to other possibilities)

Can anyone provide a good example / approach to doing this?


Source: (StackOverflow)

core.async go block fails to compile when protocol invocation form contains <!

I was implementing a function involving a core.async go block, when I stumbled on a strange compilation error :

CompilerException java.lang.IllegalArgumentException: 
No method in multimethod '-item-to-ssa' for dispatch value: :protocol-invoke, 
compiling:(NO_SOURCE_PATH:2:3) 

I experimented a little to try and strip down the problem, and found it was very generic. Say I have any protocol MyProtocol :

(defprotocol MyProtocol
  (do-something [this param] "some method"))

The following code will not compile, failing with the exception I showed you above :

(defn uncompilable! [me ch] 
  (go 
    (do-something me (<! ch)) ;; apparently, it hurts to use <! in a protocol method invocation 
    ))

However, the following 2 will compile without any problem :

(defn compilable! [me ch] 
  (let [call-it #(do-something me %)] ; wrapping the protocol call in a function
    (go 
     (call-it (<! ch))
     )))

(defn compilable-2! [me ch] 
  (go 
    (let [my-value (<! ch)] ; taking out the <! call
      (do-something me my-value))
    ))

Apparently, this has to do with the -item-to-ssa multimethod that can be found in the clojure.core.async.impl.ioc-macros namespace.

It seems to me the '<! inside protocol method invocation form' is a situation which the go macro fails to handle.

Does someone have an explanation about this? Should I file a bug?


This happened using [org.clojure/core.async "0.1.346.0-17112a-alpha"] and both [org.clojure/clojure "1.7.0-alpha1"] and [org.clojure/clojure "1.6.0"].

FYI, this happened when I was implementing a Ring-type middleware to marry http-kit async web server and core.async.


Source: (StackOverflow)

How does one clearly structure dependencies between core.async channels?

Let's say I have a corpus of computations that I want to run asynchronously using core.async, but unfortunately a few of the functions depend on the output of other functions. How do I go about structuring this cleanly in my code, while also getting the best performance?

A few potential solutions I've come across are

  • Prismatic's Graph - seems reasonable, although I haven't tested it with core.async channels; the fact that it requires the use of fnk is a little off-putting for me because it requires buying into their DSL for function definitions, but if that's the best solution then I don't mind.
  • Javelin cells - only for ClojureScript (currently) and uses FRP instead of CSP as the implementation, but it does a very good job of modeling dependencies among computations via formula cells.
  • Onyx - made for distributed computation (as a competitor to Apache Storm, etc) but has a "workflow" abstraction that handles dependencies between computations and works with core.async. This seems like the closest fit to my problem domain, but I'm not sure if I need the overhead of all the cluster management features.

What's the canonical solution for this problem?

Edit: added Onyx


Source: (StackOverflow)

Is there an equivalent of Haskell's CHP for Scala?

Clojure has this amazing library implementing Tony Hoare's Communicating Sequential Processes called core.async.

Haskell appears to have an equivalent called chp. (Not sure if it compiles under GHC 7.8).

My question is: is there an equivalent of Haskell's CHP for Scala?


Source: (StackOverflow)

How to create a channel from another with transducers?

I want to create a clojure.core.async channel from another one, passing through only specific messages. For that I found a function called filter<.

=> (def c1 (chan))
=> (def c2 (filter< even? c1))
=> (put! c1 1)
=> (put! c1 2)
=> (<!! c2)
2

But the function and its friends are marked as deprecated:

Deprecated - this function will be removed. Use transducer instead

There are some ways to use channels with transducer like chan with the xform parameter. How can I build a new channel from an existing one using transducers?
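As a minimal sketch of one way to do this (assuming a core.async version whose chan accepts a transducer): create the new channel with an xform and connect it to the existing channel with pipe. The name filtered-chan is hypothetical, not a core.async function. A channel with a transducer must have a buffer, hence the buffer size of 1.

```clojure
(require '[clojure.core.async :as async :refer [chan pipe put! <!!]])

(defn filtered-chan
  "Hypothetical helper: returns a new channel that carries only the values
  from `in` that satisfy `pred`. pipe returns the destination channel and,
  by default, closes it when `in` closes."
  [pred in]
  (pipe in (chan 1 (filter pred))))

;; Same scenario as in the question, without filter<:
(def c1 (chan))
(def c2 (filtered-chan even? c1))
(put! c1 1)
(put! c1 2)
(println (<!! c2))
```

Any transducer, or a composition built with comp, can take the place of (filter pred).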


Source: (StackOverflow)

Could core.async have implemented its functions in terms of sequences?

Rich Hickey's Strange Loop transducers presentation tells us that there are two implementations of map in Clojure 1.6, one for sequences in clojure.core and one for channels in core.async.

Now we know that in 1.7 we have transducers, for which a foldr (reduce) function is returned from higher order functions like map and filter when given a function but not a collection.

What I'm trying to articulate and failing, is why core.async functions can't return a sequence, or be Seq-like. I have a feeling that the 'interfaces' (protocols) are different but I can't see how.

Surely if you're taking the first item off a channel then you can represent that as taking the first item off a sequence?

My question is: Could core.async have implemented its functions in terms of sequences?


Source: (StackOverflow)

Extending functionality of clojure core.async

Is it recommended to extend the functionality of core.async with my own asynchronous functions?

The asynchrony of channels is handled by put! and take! which accept callbacks, but the protocols are nested in the async.impl.protocols namespace. Does impl mean stay out! in this case, or is it okay to implement them?

For example, I could wrap a netty channel or a java socket as a ReadPort and a WritePort.


Source: (StackOverflow)

Gracefully exit a Clojure core.async go loop on kill

I have a top-level core.async go loop. I want it to run indefinitely, at least until I signal it to stop with CTRL-C or kill or similar. I'm currently using java.lang.Runtime/addShutdownHook like this:

(ns async-demo.core
  (:require [clojure.core.async :as async
             :refer [<! >! <!! timeout chan alt! go]]))
(defn run [] (go (loop [] (recur))))
(.addShutdownHook (Runtime/getRuntime) (Thread. #(println "SHUTDOWN")))

Here are my problems:

  1. If I start the REPL and (run) then it starts and runs in a background thread. When I exit the REPL, I don't see the desired shutdown message.

  2. However, when I run from lein run, the go loop exits immediately and displays "SHUTDOWN".

Neither is what I want.

I don't necessarily expect to find a solution that works for all JVM's. I develop on a Mac and deploy to Ubuntu, so I'd like to find a solution that works for both:

  • Mac JVM: java version "1.7.0_45" Java(TM) SE Runtime Environment (build 1.7.0_45-b18) Java HotSpot(TM) 64-Bit Server VM (build 24.45-b08, mixed mode)

  • Ubuntu JVM: java version "1.7.0_25" OpenJDK Runtime Environment (IcedTea 2.3.10) (7u25-2.3.10-1ubuntu0.12.04.2) OpenJDK 64-Bit Server VM (build 23.7-b01, mixed mode)


Source: (StackOverflow)

Clojure core.async, CPU hangs after timeout. Any way to properly kill macro thread produced by (go..) block?

Based on the core.async walkthrough example, I created the similar code below to handle some CPU-intensive jobs using multiple channels with a timeout of 10 seconds. However, after the main thread returns, CPU usage remains around 700% (on an 8-CPU machine). I have to manually run nrepl-close in Emacs to shut down the Java process.

Is there a proper way to kill the macro thread produced by a (go..) block? I tried calling close! on each chan, but it doesn't work. I want to make sure the Java process's CPU usage drops back to 0 after the main thread returns.

(defn RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB [] (do...   (str ...)))


(let [n 1000
      cs (repeatedly n chan)]
  (doseq [c cs] 
    (go 
     (>! c  (RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB ))))

  (dotimes [i n]
    (let [[result source] (alts!!  (conj cs (timeout 10000))) ]  ;;wait for 10 seconds for each job
      (if  (list-contains? cs source)  ;;if returned chan belongs to cs 
        (prn "OK JOB FINISHED " result)
        (prn "JOB TIMEOUT")
        )))

 (doseq [i cs]
   (close! i))  ;;not useful for "killing" macro thread

 (prn "JOBS ARE DONE"))

;;Btw list-contains? function is used to judge whether an element is in a list
;;http://stackoverflow.com/questions/3249334/test-whether-a-list-contains-a-specific-value-in-clojure
(defn list-contains? [coll value]
  (let [s (seq coll)]
    (if s
      (if (= (first s) value) true (recur (rest s) value))
      false)))

Source: (StackOverflow)

How do I unit test clojure.core.async go macros?

I'm trying to write unit tests for code that uses core.async go macros. When I write the test naively, as follows, it appears that the code inside the go blocks doesn't get executed.

(ns app.core-test
  (:require [clojure.test :refer :all]
            [clojure.core.async :as async]))

(deftest test1 []
  (let [chan (async/chan)]
    (async/go
     (is (= (async/<! chan) "Hello")))
    (async/go
     (async/>! chan "Hello"))))

I've managed to get the following working, but it's extremely hacky.

(deftest test1 []
  (let [result (async/chan)
        chan (async/chan)]
    (async/go
     (is (= (async/<! chan) "Hello"))
     (async/>! result true))
    (async/go
     (async/>! chan "Hello"))
    (async/alts!! [result (async/timeout 10000)])))

Any suggestions as how I can do this properly?
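A sketch of one common pattern, given as an illustration rather than a definitive recommendation: keep the assertion on the test thread and block there for the value, with a timeout so a broken test cannot hang forever. The helper take-with-timeout!! and its ::timeout sentinel are hypothetical names, not part of core.async or clojure.test.

```clojure
(ns app.core-test
  (:require [clojure.test :refer [deftest is]]
            [clojure.core.async :as async]))

(defn take-with-timeout!!
  "Hypothetical helper: blocks for a value from ch for at most ms
  milliseconds and returns ::timeout if nothing arrives in time."
  [ch ms]
  (let [[v port] (async/alts!! [ch (async/timeout ms)])]
    (if (= port ch) v ::timeout)))

(deftest test1
  (let [ch (async/chan)]
    (async/go (async/>! ch "Hello"))
    ;; The assertion runs on the test thread, so the test cannot
    ;; finish before the check has been evaluated.
    (is (= "Hello" (take-with-timeout!! ch 1000)))))
```

Because the blocking take happens inside the test body, the test runner waits for the go block instead of returning before it has run.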


Source: (StackOverflow)

How to best shut down a clojure core.async pipeline of processes

I have a Clojure processing app that is a pipeline of channels. Each processing step does its computations asynchronously (i.e. makes an HTTP request using http-kit or something similar), and puts its result on the output channel. This way the next step can read from that channel and do its computation.

My main function looks like this

(defn -main [args]
 (-> file/tmp-dir
  (schedule/scheduler)
  (search/searcher)
  (process/resultprocessor)
  (buy/buyer)
  (report/reporter)))

Currently, the scheduler step drives the pipeline (it hasn't got an input channel), and provides the chain with workload.

When I run this in the REPL:

(-main "some args")

It basically runs forever due to the infinity of the scheduler. What is the best way to change this architecture such that I can shut down the whole system from the REPL? Does closing each channel mean the system terminates?

Would some broadcast channel help?
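To make the "closing channels" idea concrete, here is a minimal sketch under the assumption that every step is a go-loop which stops when its input closes and then closes its own output. The stage function and the source and sink names are hypothetical and unrelated to the scheduler, searcher, and other steps named above. With that convention, closing the first channel shuts down the whole chain.

```clojure
(require '[clojure.core.async :as async :refer [chan go-loop <! >! <!! close!]])

(defn stage
  "Hypothetical pipeline step: applies f to every value taken from `in` and
  puts the result on a new channel. When `in` is closed and drained, the
  loop ends and the output channel is closed, so shutdown propagates
  downstream. f must not return nil, since nil cannot be put on a channel."
  [f in]
  (let [out (chan)]
    (go-loop []
      (if-some [v (<! in)]
        (do (>! out (f v))
            (recur))
        (close! out)))
    out))

(def source (chan))
(def sink (->> source (stage inc) (stage str)))

(async/put! source 1)
(println (<!! sink))
;; Closing the head of the pipeline terminates every stage in turn.
(close! source)
(println (<!! sink))
```

A step that produces work on its own, like the scheduler, has no input to close, so it would additionally need some stop signal to check between iterations.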


Source: (StackOverflow)

Is it sane to use core.async channels to consume http-kit's post results in clojure?

I am new to Clojure and am writing a library that sends POST requests to a server and waits for a response. I consume the response by placing it onto a core.async channel. Is this sane or is there a better way?

Here is a high level overview of what I am doing:

(defn my-post-request [channel options]
  (client/post "http://www.example.com" options
          (fn [{:keys [status headers body error]}] ;; asynchronous handle response
              (go (>! channel body)))))

(defn request-caller [options]
  (let [channel (chan)]
    (my-post-request channel options)
    (json/parse-string (<!! (go (<! channel))))))

Here is the actual code that I am using: https://github.com/gilmaso/btc-trading/blob/master/src/btc_trading/btc_china.clj#L63

It works, but I have had a hard time verifying if it is the right way to go about this.
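For comparison, a sketch of a frequently used variant: call put! directly inside the callback instead of starting a go block, and return the channel to the caller. To stay self-contained it uses fake-post, a made-up stand-in for a callback-style client call; it is not http-kit's API, and post->chan is likewise a hypothetical name.

```clojure
(require '[clojure.core.async :as async :refer [chan put! <!!]])

(defn fake-post
  "Hypothetical stand-in for a callback-style HTTP client (not http-kit):
  invokes callback with a response map on another thread."
  [url options callback]
  (future (callback {:status 200 :body (str "echo:" (:body options))}))
  nil)

(defn post->chan
  "Returns a channel that will receive the response body, or the error if
  one is reported. Assumes at least one of the two is non-nil, because
  nil cannot be put on a channel."
  [url options]
  (let [ch (chan 1)]
    (fake-post url options
               (fn [{:keys [body error]}]
                 ;; put! never blocks, so it is safe on a callback thread.
                 (put! ch (or error body))))
    ch))

(println (<!! (post->chan "http://www.example.com" {:body "hi"})))
```

Returning the channel leaves it to the caller whether to block with <!! or to park with <! inside a go block.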


Source: (StackOverflow)