spark/docs/configuration.md

13 KiB

layout title
global Spark Configuration

Spark provides three main locations to configure the system:

  • Environment variables for launching Spark workers, which can be set either in your driver program or in the conf/spark-env.sh script.
  • Java system properties, which control internal configuration parameters and can be set either programmatically (by calling System.setProperty before creating a SparkContext) or through the SPARK_JAVA_OPTS environment variable in spark-env.sh.
  • Logging configuration, which is done through log4j.properties.

Environment Variables

Spark determines how to initialize the JVM on worker nodes, or even on the local node when you run spark-shell, by running the conf/spark-env.sh script in the directory where it is installed. This script does not exist by default in the Git repository, but but you can create it by copying conf/spark-env.sh.template. Make sure that you make the copy executable.

Inside spark-env.sh, you must set at least the following two variables:

  • SCALA_HOME, to point to your Scala installation, or SCALA_LIBRARY_PATH to point to the directory for Scala library JARs (if you install Scala as a Debian or RPM package, there is no SCALA_HOME, but these libraries are in a separate path, typically /usr/share/java; look for scala-library.jar).
  • MESOS_NATIVE_LIBRARY, if you are running on a Mesos cluster.

In addition, there are four other variables that control execution. These should be set in the environment that launches the job's driver program instead of spark-env.sh, because they will be automatically propagated to workers. Setting these per-job instead of in spark-env.sh ensures that different jobs can have different settings for these variables.

  • SPARK_JAVA_OPTS, to add JVM options. This includes any system properties that you'd like to pass with -D.
  • SPARK_CLASSPATH, to add elements to Spark's classpath.
  • SPARK_LIBRARY_PATH, to add search directories for native libraries.
  • SPARK_MEM, to set the amount of memory used per node. This should be in the same format as the JVM's -Xmx option, e.g. 300m or 1g. Note that this option will soon be deprecated in favor of the spark.executor.memory system property, so we recommend using that in new code.

Beware that if you do set these variables in spark-env.sh, they will override the values set by user programs, which is undesirable; if you prefer, you can choose to have spark-env.sh set them only if the user program hasn't, as follows:

{% highlight bash %} if [ -z "$SPARK_JAVA_OPTS" ] ; then SPARK_JAVA_OPTS="-verbose:gc" fi {% endhighlight %}

System Properties

To set a system property for configuring Spark, you need to either pass it with a -D flag to the JVM (for example java -Dspark.cores.max=5 MyProgram) or call System.setProperty in your code before creating your Spark context, as follows:

{% highlight scala %} System.setProperty("spark.cores.max", "5") val sc = new SparkContext(...) {% endhighlight %}

Most of the configurable system properties control internal settings that have reasonable default values. However, there are at least five properties that you will commonly want to control:

Property NameDefaultMeaning
spark.executor.memory 512m Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. `512m`, `2g`).
spark.serializer spark.JavaSerializer Class to use for serializing objects that will be sent over the network or need to be cached in serialized form. The default of Java serialization works with any Serializable Java object but is quite slow, so we recommend using spark.KryoSerializer and configuring Kryo serialization when speed is necessary. Can be any subclass of spark.Serializer).
spark.kryo.registrator (none) If you use Kryo serialization, set this class to register your custom classes with Kryo. You need to set it to a class that extends spark.KryoRegistrator). See the tuning guide for more details.
spark.local.dir /tmp Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories.
spark.cores.max (infinite) When running on a standalone deploy cluster or a Mesos cluster in "coarse-grained" sharing mode, how many CPU cores to request at most. The default will use all available cores.

Apart from these, the following properties are also available, and may be useful in some situations:

Property NameDefaultMeaning
spark.mesos.coarse false If set to "true", runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use for the whole duration of the Spark job.
spark.default.parallelism 8 Default number of tasks to use for distributed shuffle operations (groupByKey, reduceByKey, etc) when not set by user.
spark.storage.memoryFraction 0.66 Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase it if you configure your own old generation size.
spark.ui.port 3030 Port for your application's dashboard, which shows memory and workload data
spark.ui.retained_stages 1000 How many stages the Spark UI remembers before garbage collecting.
spark.shuffle.compress true Whether to compress map output files. Generally a good idea.
spark.broadcast.compress true Whether to compress broadcast variables before sending them. Generally a good idea.
spark.rdd.compress false Whether to compress serialized RDD partitions (e.g. for StorageLevel.MEMORY_ONLY_SER). Can save substantial space at the cost of some extra CPU time.
spark.io.compression.codec spark.io.SnappyCompressionCodec The compression codec class to use for various compressions. By default, Spark provides two codecs: spark.io.LZFCompressionCodec and spark.io.SnappyCompressionCodec.
spark.io.compression.snappy.block.size 32768 Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec is used.
spark.reducer.maxMbInFlight 48 Maximum size (in megabytes) of map outputs to fetch simultaneously from each reduce task. Since each output requires us to create a buffer to receive it, this represents a fixed memory overhead per reduce task, so keep it small unless you have a large amount of memory.
spark.closure.serializer spark.JavaSerializer Serializer class to use for closures. Generally Java is fine unless your distributed functions (e.g. map functions) reference large objects in the driver program.
spark.kryo.referenceTracking true Whether to track references to the same object when serializing data with Kryo, which is necessary if your object graphs have loops and useful for efficiency if they contain multiple copies of the same object. Can be disabled to improve performance if you know this is not the case.
spark.kryoserializer.buffer.mb 2 Maximum object size to allow within Kryo (the library needs to create a buffer at least as large as the largest single object you'll serialize). Increase this if you get a "buffer limit exceeded" exception inside Kryo. Note that there will be one buffer per core on each worker.
spark.broadcast.factory spark.broadcast.HttpBroadcastFactory Which broadcast implementation to use.
spark.locality.wait 3000 Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
spark.locality.wait.process spark.locality.wait Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.
spark.locality.wait.node spark.locality.wait Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).
spark.locality.wait.rack spark.locality.wait Customize the locality wait for rack locality.
spark.worker.timeout 60 Number of seconds after which the standalone deploy master considers a worker lost if it receives no heartbeats.
spark.akka.frameSize 10 Maximum message size to allow in "control plane" communication (for serialized tasks and task results), in MB. Increase this if your tasks need to send back large results to the driver (e.g. using collect() on a large dataset).
spark.akka.threads 4 Number of actor threads to use for communication. Can be useful to increase on large clusters when the driver has a lot of CPU cores.
spark.akka.timeout 20 Communication timeout between Spark nodes, in seconds.
spark.driver.host (local hostname) Hostname or IP address for the driver to listen on.
spark.driver.port (random) Port for the driver to listen on.
spark.cleaner.ttl (disable) Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be forgetten. This is useful for running Spark for many hours / days (for example, running 24/7 in case of Spark Streaming applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
spark.streaming.blockInterval 200 Duration (milliseconds) of how long to batch new objects coming from network receivers.
spark.task.maxFailures 4 Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1.

Configuring Logging

Spark uses log4j for logging. You can configure it by adding a log4j.properties file in the conf directory. One way to start is to copy the existing log4j.properties.template located there.