EzDevInfo.com

apache-spark interview questions

Top apache-spark frequently asked interview questions

How to read multiple text files into a single RDD?

I want to read a bunch of text files from an HDFS location and perform a mapping on them in an iteration using Spark.

JavaRDD<String> records = ctx.textFile(args[1], 1); is capable of reading only one file at a time.

I want to read more than one file and process them as a single RDD. How?
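
One possible approach, sketched here in PySpark rather than Java: `textFile` takes a single string, and that string may be a comma-separated list of paths or glob patterns, all of which end up in one RDD. The helper name `read_many` is hypothetical, and `sc` is assumed to be an existing SparkContext.

```python
def read_many(sc, paths, min_partitions=None):
    """Read several text files (or glob patterns) into a single RDD.

    `sc` is assumed to be an existing SparkContext; `read_many` is a
    hypothetical helper, not part of Spark.
    """
    # textFile takes one string; multiple inputs are comma-separated.
    joined = ",".join(paths)
    if min_partitions is None:
        return sc.textFile(joined)
    return sc.textFile(joined, min_partitions)
```

In the Java API from the question, the same idea would be passing a comma-separated string as the path argument of `ctx.textFile`.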


Source: (StackOverflow)

Apache Spark vs. Apache Storm [closed]

What are the differences between Apache Spark and Apache Storm? What are suitable use cases for each one?


Source: (StackOverflow)

How to turn off INFO logging in PySpark?

I installed Spark using the AWS EC2 guide. I can launch the program fine using the bin/pyspark script to get to the Spark prompt, and I can also complete the Quick Start guide successfully.

However, I cannot for the life of me figure out how to stop all of the verbose INFO logging after each command.

I have tried nearly every possible variation of the code below (commenting lines out, setting levels to OFF) in the log4j.properties file in the conf folder of the directory I launch the application from, as well as on each node, and nothing has any effect. The INFO statements are still printed after each statement I execute.

I am very confused about how this is supposed to work.

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender 
log4j.appender.console.target=System.err     
log4j.appender.console.layout=org.apache.log4j.PatternLayout 
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

Here is my full classpath when I use SPARK_PRINT_LAUNCH_COMMAND:

Spark Command: /Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/bin/java -cp :/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

contents of spark-env.sh:

#!/usr/bin/env bash

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

# Options read when launching programs locally with 
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# - SPARK_CLASSPATH=/root/spark-1.0.1-bin-hadoop2/conf/

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
# - SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
# - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: ‘default’)
# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.

# Options for the daemons used in the standalone deploy mode:
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

export SPARK_SUBMIT_CLASSPATH="$FWDIR/conf"
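
As a possible workaround while the properties file is not being picked up, the log level can also be lowered from inside the running PySpark session through the JVM gateway. This is only a sketch: it assumes a Spark build that uses log4j 1.x (such as the 1.0.1 build in the launch command above), it relies on the non-public `sc._jvm` attribute, and `quiet_logs` is a hypothetical helper name.

```python
def quiet_logs(sc, level="WARN"):
    """Lower the JVM root logger level from a running PySpark session.

    Assumptions: `sc` is an existing SparkContext, Spark is using
    log4j 1.x, and the private `sc._jvm` py4j gateway is available.
    """
    log4j = sc._jvm.org.apache.log4j
    # Level.toLevel converts a name such as "WARN" or "ERROR" to a Level.
    log4j.LogManager.getRootLogger().setLevel(log4j.Level.toLevel(level))
```

Calling `quiet_logs(sc)` at the prompt affects the driver JVM only. Spark 1.4 and later also offer `sc.setLogLevel("WARN")` for the same purpose.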

Source: (StackOverflow)

Spark: What is the difference between cache and persist?

In terms of RDD persistence, what are the differences between cache() and persist() in Spark?


Source: (StackOverflow)

Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects

I am getting strange behavior when calling a function outside of a closure:

  • when the function is in an object, everything works
  • when the function is in a class, I get:

    Task not serializable: java.io.NotSerializableException: testing
    

The problem is that I need my code in a class and not in an object. Any idea why this is happening? Is a Scala object serializable by default?

This is a working code example:

 object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
  }

This is the non-working example:

  object NOTworking extends App {
     new testing().doIT
  }
  // adding "extends Serializable" won't help
  class testing {

    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)

    def doIT =  {
      // again calling the function someFunc
      val after = rddList.map(someFunc(_))
      // this will crash here (Spark is lazy, so the map only runs now)
      after.collect().map(println(_))
    }

    def someFunc(a:Int) = a+1

  }

Source: (StackOverflow)

Errors when using OFF_HEAP Storage with Spark 1.4.0 and Tachyon 0.6.4

I am trying to persist my RDD using off-heap storage on Spark 1.4.0 and Tachyon 0.6.4, like this:

val a = sqlContext.parquetFile("a1.parquet")
a.persist(org.apache.spark.storage.StorageLevel.OFF_HEAP)
a.count()

Afterwards I am getting the following exception.

Any ideas on that?

15/06/16 10:14:53 INFO : Tachyon client (version 0.6.4) is trying to connect master @ localhost/127.0.0.1:19998
15/06/16 10:14:53 INFO : User registered at the master localhost/127.0.0.1:19998 got UserId 3
15/06/16 10:14:53 INFO TachyonBlockManager: Created tachyon directory at /tmp_spark_tachyon/spark-6b2512ab-7bb8-47ca-b6e2-8023d3d7f7dc/driver/spark-tachyon-20150616101453-ded3
15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_3 on ExternalBlockStore on localhost:33548 (size: 0.0 B)
15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_1 on ExternalBlockStore on localhost:33548 (size: 0.0 B)
15/06/16 10:14:53 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5710423667942934352
org.apache.spark.storage.BlockNotFoundException: Block rdd_10_3 not found
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:306)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)

I have also tried the same with a text file, and I was able to persist it in Tachyon. The problem is with persisting a DataFrame originally read from Parquet.


Source: (StackOverflow)

What are the differences between Apache Spark and Apache Flink?

What are the differences between Apache Spark and Apache Flink?

Will Apache Flink replace Hadoop?


Source: (StackOverflow)

Spark Streaming: Why internal processing costs are so high to handle user state of a few MB?

Based on our experiments, the internal processing costs of stateful Spark Streaming take a significant amount of time once the state grows beyond a million objects. As a result latency suffers, because we have to increase the batch interval to avoid unstable behavior (processing time > batch interval).

This has nothing to do with the specifics of our app, since it can be reproduced by the code below.

What exactly are the Spark internal processing/infrastructure costs that make handling user state take so long? Are there any options to decrease processing time besides simply increasing the batch interval?

We planned to use state extensively: at least 100 MB or so on each of a few nodes, keeping all data in memory and only dumping it once an hour.

Increasing the batch interval helps, but we want to keep the batch interval minimal.

The reason is probably not the space occupied by the state but rather the large object graph, because when we changed the list to a large array of primitives, the problem went away.

Just a guess: it might have something to do with org.apache.spark.util.SizeEstimator, which is used internally by Spark, because it shows up from time to time while profiling.

Here is a simple demo that reproduces the problem on a modern Core i7:

  • less than 15 MB of state
  • no stream input at all
  • quickest possible (dummy) 'updateStateByKey' function
  • batch interval 1 second
  • checkpoint (required by Spark, must have) to local disk
  • tested both locally and on YARN

Code:

package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;
        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                // Tried KryoSerializer, but it does not seem to help much
                //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .setMaster("local[*]")
                .setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
        javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)

        List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
        System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
        JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);

        JavaPairDStream<String, State> stream = javaStreamingContext
                .textFileStream(".") // fake: effectively, no input at all
                .mapToPair(input -> (Tuple2<String, State>) null) //  fake to get JavaPairDStream
                .updateStateByKey(
                        (inputs, maybeState) -> maybeState, // simplest possible dummy function
                        new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
                        initialRdd); // set generated state

        stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
            System.out.println("Is empty: " + rdd.isEmpty());
            return null;
        });

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }

    private static List<Tuple2<String, State>> prepareInitialRddData() {
        // 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
        int stateCount = 1000;
        int dataListSize = 200;
        int elementDataSize = 10;
        List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
        for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
            List<String> stateData = new ArrayList<>(dataListSize);
            for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
                stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
            }
            initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
        }
        return initialRddInput;
    }

}

Source: (StackOverflow)

Getting Spark, Python, and MongoDB to work together

I'm having difficulty getting these components to knit together properly. I have Spark installed and working successfully; I can run jobs locally, standalone, and also via YARN. I have followed the steps advised (to the best of my knowledge) here and here.

I'm working on Ubuntu and the various component versions I have are

I had some difficulty following the various steps such as which jars to add to which path, so what I have added are

  • in /usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce I have added mongo-hadoop-core-1.5.0-SNAPSHOT.jar
  • the following environment variables
    • export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
    • export PATH=$PATH:$HADOOP_HOME/bin
    • export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
    • export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
    • export PATH=$PATH:$SPARK_HOME/bin

My Python program is basic

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:password@localhost:27017/mydb.mycollection')

if __name__ == '__main__':
    main()

I am running it using the command

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py

and I am getting the following output as a result

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError

According to here

This exception is raised when an exception occurs in the Java client code. For example, if you try to pop an element from an empty stack. The instance of the Java exception thrown is stored in the java_exception member.

Looking at the source code for pymongo_spark.py and the line throwing the error, it says

"Error while communicating with the JVM. Is the MongoDB Spark jar on Spark's CLASSPATH? : "

So in response I have tried to be sure the right jars are being passed, but I might be doing this all wrong, see below

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py

I have imported pymongo to the same python program to verify that I can at least access MongoDB using that, and I can.

I know there are quite a few moving parts here so if I can provide any more useful information please let me know.
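
One detail that may matter in the commands above: `--jars` takes a comma-separated list, whereas `--driver-class-path` is an ordinary Java classpath, whose entries are separated by `:` on Linux. A bare directory entry on a classpath also does not pick up the jars inside it. The sketch below assembles the command accordingly. `build_submit_command` is a hypothetical helper, the jar paths are the ones quoted in the question, and whether this resolves the Py4JError has not been verified.

```python
import os


def build_submit_command(spark_home, jars, script, master="local[4]"):
    """Return a spark-submit argument list (hypothetical helper)."""
    return [
        os.path.join(spark_home, "bin", "spark-submit"),
        "--jars", ",".join(jars),               # comma-separated list
        "--driver-class-path", ":".join(jars),  # classpath: ':' on Linux
        "--master", master,
        script,
    ]


if __name__ == "__main__":
    spark_home = "/usr/local/share/spark-1.5.1-bin-hadoop2.6"
    lib = spark_home + "/lib"
    cmd = build_submit_command(
        spark_home,
        [lib + "/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar",
         lib + "/mongo-java-driver-3.0.4.jar"],
        os.path.expanduser("~/sparkPythonExample/SparkPythonExample.py"),
    )
    print(" ".join(cmd))
```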


Source: (StackOverflow)

When are accumulators truly reliable?

I want to use an accumulator to gather some stats about the data I'm manipulating in a Spark job. Ideally, I would do that while the job computes the required transformations, but since Spark may re-compute tasks in various cases, the accumulators would not reflect true metrics. Here is how the documentation describes this:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

This is confusing, since most actions do not allow running custom code (where accumulators can be used); they mostly take the results from previous transformations (lazily). The documentation also shows this:

val acc = sc.accumulator(0)
data.map { x => acc += x; f(x) }
// Here, acc is still 0 because no actions have caused the `map` to be computed.

But if we add data.count() at the end, would this be guaranteed to be correct (have no duplicates) or not? Clearly acc is not used "inside actions only", as map is a transformation. So it should not be guaranteed.

On the other hand, discussion on related Jira tickets talks about "result tasks" rather than "actions". For instance here and here. This seems to indicate that the result would indeed be guaranteed to be correct, since we are using acc immediately before an action and thus it should be computed as a single stage.

I'm guessing that this concept of a "result task" has to do with the type of operations involved, a result task being the last one, which includes an action, as in this example, which shows how several operations are divided into stages (in magenta, image taken from here):

A job dividing several operations into multiple purple stages

So hypothetically, a count() action at the end of that chain would be part of the same final stage, and I would be guaranteed that accumulators used in the last map will not include any duplicates?

Clarification around this issue would be great! Thanks.
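
To make the distinction in the quoted documentation concrete, here is a sketch in PySpark (not the Scala of the snippet above) that updates the accumulator inside `foreach`, which is an action that does run user code. It illustrates the "inside actions only" case and does not settle the map-then-count question. `sc` and `rdd` are assumed to exist, and `sum_in_action` is a hypothetical name.

```python
def sum_in_action(sc, rdd):
    """Sum the elements of `rdd` via an accumulator updated in an action.

    `sc` is assumed to be a SparkContext and `rdd` an RDD of numbers.
    """
    acc = sc.accumulator(0)
    # foreach is an action, so per the documentation quoted above each
    # task's update is applied only once, even if a task is restarted.
    rdd.foreach(lambda x: acc.add(x))
    return acc.value
```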


Source: (StackOverflow)

What's the most efficient way to filter a DataFrame

... by checking whether a column's value is in a seq.
Perhaps I'm not explaining it very well; I basically want this (expressed in regular SQL): DF_Column IN seq

First I did it using a broadcast var (where I placed the seq), UDF (that did the checking) and registerTempTable.
The problem is that I didn't get to test it since I ran into a known bug that apparently only appears when using registerTempTable with ScalaIDE.

I ended up creating a new DataFrame out of seq and doing inner join with it (intersection), but I doubt that's the most performant way of accomplishing the task.

Thanks

EDIT: (in response to @YijieShen):
How do I filter based on whether elements of one DataFrame's column are in another DataFrame's column (like SQL select * from A where login in (select username from B))?

E.g: First DF:

login      count
login1     192  
login2     146  
login3     72   

Second DF:

username
login2
login3
login4

The result:

login      count
login2     146  
login3     72   

Attempts:
EDIT-2: I think, now that the bug is fixed, these should work. END EDIT-2

ordered.select("login").filter($"login".contains(empLogins("username")))

and

ordered.select("login").filter($"login" in empLogins("username"))

which both throw Exception in thread "main" org.apache.spark.sql.AnalysisException, respectively:

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter Contains(login#8, username#10);

and

resolved attribute(s) username#10 missing from login#8 in operator 
!Filter login#8 IN (username#10);
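
The SQL form `select * from A where login in (select username from B)` corresponds to a left semi join, which the DataFrame API accepts as a join type. Below is a sketch in PySpark (the attempts above are in Scala), under the assumption that `ordered` and `emp_logins` are DataFrames with the columns shown in the example tables. `keep_known_logins` is a hypothetical name.

```python
def keep_known_logins(ordered, emp_logins):
    """Keep rows of `ordered` whose login appears in emp_logins.username.

    Both arguments are assumed to be Spark DataFrames. A left semi join
    returns only the columns of the left side, as in the expected result.
    """
    condition = ordered["login"] == emp_logins["username"]
    return ordered.join(emp_logins, condition, "leftsemi")
```

Unlike the `filter` attempts, the join refers to both DataFrames explicitly, so the analyzer can resolve `username`.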

Source: (StackOverflow)

Spark performance for Scala vs Python

I prefer Python over Scala. But, as Spark is natively written in Scala, I was expecting my code to run faster in the Scala version than in the Python version, for obvious reasons.

With that assumption, I decided to learn Scala and write the Scala version of some very common preprocessing code for some 1 GB of data. The data is taken from the SpringLeaf competition on Kaggle. To give an overview of the data: it contains 1936 dimensions and 145232 rows. The data is composed of various types, e.g. int, float, string, boolean. I am using 6 cores out of 8 for Spark processing; that's why I used minPartitions=6, so that every core has something to process.

Scala Code

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Python Code

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"')) for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala performance: Stage 0 (38 mins), Stage 1 (18 sec)

Python performance: Stage 0 (11 mins), Stage 1 (7 sec)

The two versions produce different DAG visualisation graphs, which is why stage 0 is shown as map for Scala and as reduceByKey for Python.

But essentially both versions try to transform the data into a (dimension_id, string of list of values) RDD and save it to disk. The output will be used to compute various statistics for each dimension.

Performance-wise, the Scala code seems to run 4 times slower than the Python version on this real data. The good news for me is that this gives me good motivation to stay with Python. The bad news is that I don't quite understand why.


Source: (StackOverflow)

Why can't PySpark find py4j.java_gateway?

I installed Spark, ran the sbt assembly, and can open bin/pyspark with no problem. However, I am running into problems loading the pyspark module into ipython. I'm getting the following error:

In [1]: import pyspark
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-1-c15ae3402d12> in <module>()
----> 1 import pyspark

/usr/local/spark/python/pyspark/__init__.py in <module>()
     61
     62 from pyspark.conf import SparkConf
---> 63 from pyspark.context import SparkContext
     64 from pyspark.sql import SQLContext
     65 from pyspark.rdd import RDD

/usr/local/spark/python/pyspark/context.py in <module>()
     28 from pyspark.conf import SparkConf
     29 from pyspark.files import SparkFiles
---> 30 from pyspark.java_gateway import launch_gateway
     31 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     32     PairDeserializer, CompressedSerializer

/usr/local/spark/python/pyspark/java_gateway.py in <module>()
     24 from subprocess import Popen, PIPE
     25 from threading import Thread
---> 26 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
     27
     28

ImportError: No module named py4j.java_gateway
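
The traceback shows that `pyspark` itself is importable but `py4j` is not. Spark distributions bundle py4j as a source zip under `python/lib`, so one possible fix is to put that archive on `sys.path` before importing `pyspark`. The following is a sketch. `add_spark_to_path` is a hypothetical helper, and the zip is matched by pattern because its version differs between Spark releases.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home):
    """Put Spark's Python sources and its bundled py4j zip on sys.path."""
    python_dir = os.path.join(spark_home, "python")
    # The py4j version differs between Spark releases, so match by pattern.
    pattern = os.path.join(python_dir, "lib", "py4j-*-src.zip")
    py4j_zips = sorted(glob.glob(pattern))
    if not py4j_zips:
        raise IOError("no py4j-*-src.zip found under %s" % python_dir)
    for entry in (python_dir, py4j_zips[-1]):
        if entry not in sys.path:
            sys.path.insert(0, entry)
    return py4j_zips[-1]
```

With the install location from the traceback, this would be called as `add_spark_to_path("/usr/local/spark")` before `import pyspark`. Adding the same two entries to the `PYTHONPATH` environment variable has the same effect.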

Source: (StackOverflow)

How to make saveAsTextFile NOT split output into multiple files?

When using Scala in Spark, whenever I dump the results out using saveAsTextFile, it seems to split the output into multiple parts. I'm just passing a parameter (path) to it.

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

Does the number of output files correspond to the number of reducers it uses? Does this mean the output is compressed?

I know I can combine the output together using bash, but is there an option to store the output in a single text file, without splitting? I looked at the API docs, but they don't say much about this.
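
Each part file corresponds to one partition of the RDD being saved, so one way to get a single part file is to reduce the RDD to one partition first. Below is a sketch in PySpark (the question uses Scala, where the two calls have the same names). `save_as_single_file` is a hypothetical helper.

```python
def save_as_single_file(rdd, path):
    """Write `rdd` under `path` with a single part file.

    `rdd` is assumed to be a Spark RDD. All data passes through one
    task, so this is only sensible when the output is small.
    """
    # coalesce(1) merges the partitions into one without a full shuffle.
    rdd.coalesce(1).saveAsTextFile(path)
```

The output is still a directory named `path`, but it then contains only one part file.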


Source: (StackOverflow)

What is the difference between Apache Mahout and Apache Spark's MLlib?

Consider a MySQL products database with 10 million products for an e-commerce website.

I'm trying to set up a classification module to categorize products. I'm using Apache Sqoop to import data from MySQL to Hadoop.

I wanted to use Mahout on top of it as a machine learning framework, to use one of its classification algorithms, and then I ran into Spark, which comes with MLlib.

  • So what is the difference between the two frameworks?
  • Mainly, what are the advantages, drawbacks and limitations of each?

Source: (StackOverflow)