
flambo

A Clojure DSL for Apache Spark

Leiningen checkouts when library has a slash in its name

I'm trying to use the Checkout Dependencies feature in Leiningen to work on a project that uses a checkout of Flambo. My project.clj looks something like:

(defproject spark-streaming "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [yieldbot/flambo "0.4.0-SNAPSHOT"]
                 [amazonica "0.2.29"]
                 [clj-time "0.8.0"]] ;other stuff omitted

My directory structure looks like this:

|- checkouts
  |- <need symlink to yieldbot/flambo>  
|- src
|- project.clj

What I tried:

  1. Since the library name contains a slash, I can't make a symlink named "yieldbot/flambo" to the actual location of Flambo.
  2. I can't do this either:

    |- checkouts
      |- yieldbot
        |- flambo   //this is a symlink

because Lein expects a project.clj in the yieldbot directory. Doing lein classpath confirms that my checkout isn't being used.

  3. Directly check out flambo into checkouts/yieldbot. Doesn't work, for the same reason as above.

What can I do?
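
For what it's worth, here is a sketch of a possible direction (my own, not confirmed against the original answers). It rests on the assumption that Leiningen resolves checkouts by reading the project.clj inside each entry of checkouts/ rather than by the symlink's name, so the symlink does not have to match the yieldbot/flambo coordinate; a symlink simply named flambo pointing at a local Flambo clone might be enough:

    |- checkouts
      |- flambo        <- symlink to the local Flambo clone (name need not match the group/artifact)
    |- src
    |- project.clj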


Source: (StackOverflow)

Results of updating a vector in clojure

I have a function developed in Clojure using the flambo Spark API functions:

(:require [flambo.api :as f]
          [clojure.string :as s])

(defn get-distinct-column-val
 "input = {:col val}"
 [ xctx input ]
 (let [{:keys [ col ]} input
       column-values []
       result (f/map (:rdd xctx) (f/fn [row]
                                   (if (s/blank? (get row col))
                                       nil
                                       (assoc column-values (get row col)))))]
   (println "Col values: " column-values)
   (distinct column-values)))

When I try to print out the value of column-values I get:

 Col values:  []

Is there any reason why this is so?

I tried replacing the println in the above function with this:

(println "Result: " result)

and got the following:

 #<JavaRDD MapPartitionsRDD[16] at map at NativeMethodAccessorImpl.java:-2>

Thanks!
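
For context, f/map only builds a new RDD lazily and column-values is an immutable local bound once to [], so nothing is ever added to it; that is why it prints []. Here is a minimal sketch (my own, not from the post; distinct-column-vals is a made-up name) of keeping the work inside the RDD and collecting only at the end, assuming flambo.api's filter, distinct and collect alongside the map already used above:

    (defn distinct-column-vals
      [xctx {:keys [col]}]
      (-> (:rdd xctx)
          (f/map (f/fn [row] (get row col)))         ;; pull out the column value per row
          (f/filter (f/fn [v] (not (s/blank? v))))   ;; drop blank values
          f/distinct                                 ;; de-duplicate on the cluster
          f/collect))                                ;; realize the result locally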


Source: (StackOverflow)


Alternative to mutable data structure in clojure [duplicate]

This question already has an answer here:

I developed a function in Clojure to fill in an empty column from the last non-empty value. I'm using flambo, a Clojure wrapper for Apache Spark, for some of its functions:

(:require [flambo.api :as f]
          [clojure.string :as s]) ;; s/blank? below needs clojure.string

(defn replicate-val
  [rdd input]
  (let [{:keys [col]} input
        prev-col-val (atom [])
        result (f/map rdd (f/fn [row]
                            (if-not (s/blank? (get row col))
                              (do
                                (swap! prev-col-val assoc 0 (get row col))
                                row)
                              (assoc row col (get @prev-col-val 0)))))]
    result))

I don't really like the idea of mutating prev-col-val to keep track of state. Any ideas on how to refactor the above to preserve Clojure's immutable data structures?

Input is of the form:

[["04" "2" "3"] ["04" "" "5"] ["5" "16" ""] ["07" "" "36"] ["07" "" "34"] ["07" "25" "34"]]

and desired output is:

[["04" "2" "3"] ["04" "2" "5"] ["5" "16" ""] ["07" "16" "36"] ["07" "16" "34"] ["07" "25" "34"]]

Thanks


Source: (StackOverflow)

Convert clojure vector to flambo sql row

I'm working on a function to convert a vector into an SQL row, then convert it to a data frame and save it into a table using the SQLContext in Apache Spark. I'm developing in Clojure and I got lost along the way. I thought of implementing the solution thus:

  1. For each rdd (vector), convert it to rows (a rough sketch of this step appears after the code below).
  2. Convert the rows to a data frame.
  3. Save the data frame to a table.
  4. Use the sqlContext to query for particular information in the table.
  5. Convert the result from the query back into an RDD again for further analysis.

    (defn assign-ecom
      []
      (let [rdd-fields (-> (:rdd @transformed-rdd)
                           (f/map #(sql/row->vec %))
                           f/collect)]
        (clojure.pprint/pprint rdd-fields)))
    

I'm using the flambo v0.60 API functions for abstracting Apache Spark functions. I also welcome any suggestions on how to go about solving the problem. Thanks
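
As a rough sketch of step 1 only (my own interop guess, not flambo.sql; vec->row is a made-up name and it assumes a Spark version that ships org.apache.spark.sql.RowFactory), a plain vector could be turned into a Spark SQL Row like this:

    (import '[org.apache.spark.sql RowFactory])

    (defn vec->row
      "Turn a Clojure vector such as [\"04\" \"2\" \"3\"] into a Spark SQL Row."
      [v]
      (RowFactory/create (object-array v)))

    ;; mapped over the RDD from step 1:
    (f/map (:rdd @transformed-rdd) (f/fn [v] (vec->row v)))

Steps 2 and 3 would then still need a StructType schema passed to the SQLContext's createDataFrame, which is left out here.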

Here's the link to the Flambo row->vec docs: Flambo documentation


Source: (StackOverflow)

Convert from clojure.lang.LazySeq to type org.apache.spark.api.java.JavaRDD

I developed a function in Clojure to fill in an empty column from the last non-empty value. I'm assuming this works, given:

(:require [flambo.api :as f])

(defn replicate-val
  [rdd input]
  (let [{:keys [col]} input
        result (reductions (fn [a b]
                             (if (empty? (nth b col))
                               (assoc b col (nth a col))
                               b))
                           rdd)]
    (println "Result type is: " (type result))))

Got this:

;=> "Result type is:  clojure.lang.LazySeq"

The question is: how do I convert this back to type JavaRDD using flambo (the Spark wrapper)?

I tried (f/map result #(.toJavaRDD %)) in the let form to attempt to convert to JavaRDD type

I got this error

"No matching method found: map for class clojure.lang.LazySeq"

which is expected because result is of type clojure.lang.LazySeq

The question is how do I make this conversion, or how can I refactor the code to accommodate this.

Here is a sample input rdd:

(type rdd) ;=> "org.apache.spark.api.java.JavaRDD"

But looks like:

[["04" "2" "3"] ["04" "" "5"] ["5" "16" ""] ["07" "" "36"] ["07" "" "34"] ["07" "25" "34"]]

Required output is:

[["04" "2" "3"] ["04" "2" "5"] ["5" "16" ""] ["07" "16" "36"] ["07" "16" "34"] ["07" "25" "34"]]

Thanks.


Source: (StackOverflow)

Untuple a Clojure sequence

I have a function that deduplicates with preference. I thought of implementing the solution in Clojure using flambo functions thus:

  1. From the data set, use group-by to group duplicates (i.e. based on a specified :key).

  2. Given a :val as input, use a filter to check if some of the values for each row are equal to this :val.

  3. Use a map to untuple the duplicates to return single vectors (not very sure if that is the right way though; I tried using a flat-map without any luck).

For a sample data-set

(def rdd
   (f/parallelize sc [ ["Coke" "16" ""] ["Pepsi" "" "5"] ["Coke" "2" "3"] ["Coke" "" "36"] ["Pepsi" "" "34"] ["Pepsi" "25" "34"]]))

I tried this:

(defn dedup-rows
  [rows input]
  (let [{:keys [key-col col val]} input
        result (-> rows
                   (f/group-by (f/fn [row]
                                 (get row key-col)))
                   (f/values)
                   (f/map (f/fn [rows]
                            (if (= (count rows) 1)
                              rows
                              (filter (fn [row]
                                        (let [col-val (get row col)
                                              equal? (= col-val val)]
                                          (if (not equal?)
                                            true
                                            false)))
                                      rows)))))]
    result))

If I run this function thus:

(dedup-rows rdd {:key-col 0 :col 1 :val ""})

it produces

;=> [(["Pepsi" 25 34]), (["Coke" 16 ] ["Coke" 2 3])]]

I don't know what else to do to handle the result so that it produces:

;=> [["Pepsi" 25 34],["Coke" 16 ],["Coke" 2 3]]

I tried f/map f/untuple as the last form in the -> macro with no luck.

Any suggestions? I will really appreciate if there's another way to go about this. Thanks.

PS: when grouped

;=> [[["Pepsi" "" 5], ["Pepsi" "" 34], ["Pepsi" 25 34]], [["Coke" 16 ""], ["Coke" 2 3], ["Coke" "" 36]]]

For each group, rows that have "" are considered duplicates and hence removed from the group.
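
One possible direction (my own sketch, not a confirmed answer; dedup-rows* is a made-up name): replacing the final f/map with f/flat-map flattens one level, so each group's surviving rows are emitted individually instead of as a nested sequence:

    (defn dedup-rows*
      [rows {:keys [key-col col val]}]
      (-> rows
          (f/group-by (f/fn [row] (get row key-col)))
          (f/values)
          (f/flat-map (f/fn [grouped]
                        (if (= 1 (count grouped))
                          grouped                                    ;; keep a lone row as-is
                          (remove (fn [row] (= (get row col) val))   ;; drop rows whose col equals :val
                                  grouped))))))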


Source: (StackOverflow)

RDDs and vectors in clojure

Given a function that filters out dates greater than a maximum date, where the maximum is determined from a subset of a given dataset (RDD), and then uses that maximum to check whether a given vector contains a date value greater than it, I tried the following:

(defn future-rows
  "input = { :col val :qty-col val}
   :col = Date column reference"
  [row input]
  (let [{:keys [col qty-col]} input
        get-qty-max-date (-> (:rdd @scope-rdd)
                             (f/map #(if-not (or (s/blank? (get % qty-col))
                                                 (not (pos? (read-string (get % qty-col)))))
                                      (get % col)
                                      false))
                             (f/reduce #(if (pos? (compare % %2)) %1 %2)))]
    (when-not (pos? (compare (get row col) get-qty-max-date)) row)))

Here row is a vector. The challenge I have is that get-qty-max-date is of type RDD. How do I make the comparison in the when-not form?

NB: The idea is the future-rows function is going to be used as a predicate

Given an RDD:

[[" " "2009/12/02"] ["4" "2005/02/08"] ["0" "2014/12/02"] ["5" "2005/08/01"] ["2" "2007/09/02"]]

When future-rows is used as a predicate, the desired output is:

[["4" "2005/02/08"] ["5" "2005/08/01"] ["2" "2007/09/02"]]

where input is { :col 1 :qty-col 0 }. For the above function the maximum date determined is 2007/09/02, hence the dates 2009/12/02 and 2014/12/02, which are greater, are removed from the data set.

If there's any other approach on how to go about doing this, I will appreciate it.

So, say we have a main function that does this:

(defn remove-rows [xctx input]
  (f/filter (:rdd xctx) #(future-rows % {:col 1 :qty-col 0})))

it will produce the desired output.
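
A possible refactoring sketch (mine, not the post's code; max-qty-date and remove-rows* are made-up names, and it assumes clojure.string is required as s and that f/reduce returns a plain value on the driver rather than an RDD): compute the maximum date once and close over it in the filter predicate:

    (defn max-qty-date
      "Largest date in column col among rows whose qty-col holds a positive number."
      [rdd {:keys [col qty-col]}]
      (-> rdd
          (f/filter (f/fn [row]
                      (and (not (s/blank? (get row qty-col)))
                           (pos? (read-string (get row qty-col))))))
          (f/map (f/fn [row] (get row col)))
          (f/reduce (f/fn [a b] (if (pos? (compare a b)) a b)))))

    (defn remove-rows*
      [xctx input]
      (let [max-date (max-qty-date (:rdd @scope-rdd) input)   ;; computed once as a plain string
            keep?    (f/fn [row]
                       (not (pos? (compare (get row (:col input)) max-date))))]
        (f/filter (:rdd xctx) keep?)))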

Thanks!


Source: (StackOverflow)

Access file name in Clojure Flambo Library

In the Clojure Flambo library that wraps Apache Spark, how can I access the file name for the current element in an RDD?
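
One direction worth sketching (my own guess, not a confirmed flambo feature): an RDD built with textFile carries no per-element file name, but reading with the underlying JavaSparkContext's .wholeTextFiles yields (file-name, file-content) pairs, so the name travels with the data. The path below is a placeholder, and ft is assumed to be flambo.tuple as aliased elsewhere in these posts:

    (-> (.wholeTextFiles sc "hdfs:///some/dir")               ;; pair RDD of [file-name contents]
        (f/map (ft/key-val-fn (f/fn [file-name contents]
                                [file-name (count contents)]))))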


Source: (StackOverflow)

zipWithUniqueId() in flambo using clojure

I want to create an RDD such that each row has an index. I tried the following.

Given an rdd:

["a" "b" "c"] 

(defn make-row-index [input]
  (let [{:keys [col]} input]
    (swap! @rdd assoc :rdd (-> (:rdd xctx)
                          (f/map #(vector %1 %2 ) (range))))))

Desired output:

 (["a" 0] ["b" 1] ["c" 2])

I got an arity error, since f/map is used as (f/map rdd fn). I wanted to use zipWithUniqueId() from Apache Spark, but I'm lost on how to implement this and I can't find an equivalent function in flambo. Any suggestions and help are appreciated.

Apache-spark zip with Index

Map implementation in flambo
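
A minimal interop sketch (my own, not part of the flambo API; add-row-index is a made-up name): the underlying JavaRDD exposes .zipWithIndex (and .zipWithUniqueId) directly, returning a pair RDD of (element, index) tuples that can be unpacked with flambo.tuple's key-val-fn, aliased ft here as in other snippets on this page:

    (defn add-row-index
      [xctx]
      (-> (.zipWithIndex (:rdd xctx))            ;; JavaPairRDD of (row, index)
          (f/map (ft/key-val-fn (f/fn [row idx]
                                  (vector row idx))))))

On the sample RDD ["a" "b" "c"] this would yield something like (["a" 0] ["b" 1] ["c" 2]).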

Thanks


Source: (StackOverflow)

Extraneous groupBy in spark DAG

According to the Spark DAG visualization there is a groupBy being performed in Stage 1 after the groupBy being performed in Stage 0. I only have one groupBy in my code and wouldn't expect any of the other transformations I'm doing to result in a groupBy.

Here's the code (clojure / flambo):

;; stage 0
(-> (.textFile sc path 8192)
    (f/map (f/fn [msg] (json/parse-string msg true)))
    (f/group-by (f/fn [msg] (:mmsi msg)) 8192)

;; stage 1
    (f/map-values (f/fn [values] (sort-by :timestamp (vec values))))
    (f/flat-map (ft/key-val-fn (f/fn [mmsi messages]
                                 (let [state-map (atom {}) draught-map (atom {})]
                                   (map #(mk-line % state-map draught-map) (vec messages))))))
    (f/map (f/fn [line] (json/generate-string line)))
    (f/save-as-text-file path))

It's clear to me how Stage 0 is the sequence textFile, map, groupBy and Stage 1 is map-values, map-values, flat-map, map, saveAsTextFile, but where does the groupBy in stage 1 come from?

[Spark DAG visualization screenshot]

Since groupBy causes a shuffle which is computationally expensive and time-consuming I don't want an extraneous one if it can be helped.


Source: (StackOverflow)

Implementing a flambo mapValues function in clojure

I have a Clojure function that uses the flambo v0.60 functions API to do some analysis on a sample data set. I noticed that when I use (get rdd 2), instead of getting the second element in the RDD collection, it's getting the second character of the first element of the RDD collection. My assumption is that Clojure is treating each row of the RDD collection as a whole string rather than a vector, so I can't get the second element in the collection. I'm thinking of using the map-values function to convert the mapped values into a vector from which I can get the second element. I tried this:

(defn split-on-tab-transformation [xctx input]
  (assoc xctx :rdd (-> (:rdd xctx)
                       (spark/map (spark/fn [row] (s/split row #"\t")))
                       (spark/map-values vec))))

Unfortunately I got an error: java.lang.IllegalArgumentException: No matching method found: mapValues for class org.apache.spark.api.java.JavaRDD...

This code returns the first collection in the RDD (assuming I removed the (spark/map-values vec) in the above function):

(defn get-distinct-column-val
  "input = {:col val}"
  [xctx input]
  (let [rdds (-> (:rdd xctx)
                 (f/map (f/fn [row] row))
                 f/first)]
    (clojure.pprint/pprint rdds)))

Output:

[2.00000 770127      200939.000000   \t6094\tBENTONVILLE, AR DPS\t22.500000\t5.000000\t2.500000\t5.000000\t0.000000\t0.000000\t0.000000\t0.000000\t0.000000\t1\tStore Tab\t0.000000\t4.50\t3.83\t5.00\t0.000000\t0.000000\t0.000000\t0.000000\t19.150000]

If I try to get the second element, 770127:

(defn get-distinct-column-val
  "input = {:col val}"
  [xctx input]
  (let [rdds (-> (:rdd xctx)
                 (f/map (f/fn [row] row))
                 f/first)]
    (clojure.pprint/pprint (get rdds 1))))

I get :

[\.]

Flambo documentation for map-values

I'm new to clojure and I'd appreciate any help. Thanks
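
As a closing sketch (my own illustration; column-of is a made-up name, and it assumes flambo.api is aliased as f and clojure.string as s, as in the earlier snippets): map-values only exists on pair RDDs, which is why it fails on a plain JavaRDD. Splitting each line on tabs inside f/map already yields a vector per row (clojure.string/split returns a vector), so a column can then be read with get in a second map:

    (defn column-of
      [xctx col]
      (-> (:rdd xctx)
          (f/map (f/fn [line] (s/split line #"\t")))   ;; line -> vector of fields
          (f/map (f/fn [row] (get row col)))))         ;; pick one column per row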


Source: (StackOverflow)