mirror of https://github.com/microsoft/spark.git
Remove unnecessary lock which was there to work around a bug in
Configuration in Hadoop 0.20.0
Parent: adaba4d550
Commit: 021c50a8d4
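For context, the workaround being removed serialized all Hadoop configuration through a single global lock object. A minimal sketch of that pattern (not the actual Spark file; makeConf is a hypothetical helper for illustration):

import org.apache.hadoop.mapred.JobConf

// An empty object is enough to act as a lock: calling `synchronized`
// on it acquires that object's monitor.
object ConfigureLock {}

def makeConf(): JobConf = ConfigureLock.synchronized {
  // Only one thread at a time may construct and configure a JobConf,
  // working around the Configuration thread-safety bug in Hadoop 0.20.0.
  val conf = new JobConf()
  conf.set("io.file.buffer.size",
    System.getProperty("spark.buffer.size", "65536"))
  conf
}

The change below deletes this lock object and inlines the configuration code without synchronization.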
@@ -34,7 +34,7 @@ class HadoopFile[K, V](
   keyClass: Class[K],
   valueClass: Class[V])
 extends RDD[(K, V)](sc) {
-  @transient val splits_ : Array[Split] = ConfigureLock.synchronized {
+  @transient val splits_ : Array[Split] = {
     val conf = new JobConf()
     FileInputFormat.setInputPaths(conf, path)
     val inputFormat = createInputFormat(conf)
@@ -53,13 +53,11 @@ extends RDD[(K, V)](sc) {
     val split = theSplit.asInstanceOf[HadoopSplit]
     var reader: RecordReader[K, V] = null
 
-    ConfigureLock.synchronized {
-      val conf = new JobConf()
-      val bufferSize = System.getProperty("spark.buffer.size", "65536")
-      conf.set("io.file.buffer.size", bufferSize)
-      val fmt = createInputFormat(conf)
-      reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
-    }
+    val conf = new JobConf()
+    val bufferSize = System.getProperty("spark.buffer.size", "65536")
+    conf.set("io.file.buffer.size", bufferSize)
+    val fmt = createInputFormat(conf)
+    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
     val key: K = keyClass.newInstance()
     val value: V = valueClass.newInstance()
@@ -112,10 +110,3 @@ extends MappedRDD[String, (LongWritable, Text)](
     classOf[LongWritable], classOf[Text]),
   { pair: (LongWritable, Text) => pair._2.toString }
 )
-
-
-/**
- * Object used to ensure that only one thread at a time is configuring Hadoop
- * InputFormat classes. Apparently configuring them is not thread safe!
- */
-object ConfigureLock {}
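With the bug fixed upstream, each task can build its own JobConf, so no Configuration state is shared across threads and the lock is simply dropped. A self-contained sketch of the lock-free path under the same assumptions as the diff (TextInputFormat stands in for the RDD's createInputFormat, and openReader is an invented helper):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, InputSplit, JobConf,
  RecordReader, Reporter, TextInputFormat}

def openReader(path: String, split: InputSplit): RecordReader[LongWritable, Text] = {
  val conf = new JobConf()  // private to this call: nothing to synchronize on
  conf.set("io.file.buffer.size",
    System.getProperty("spark.buffer.size", "65536"))
  FileInputFormat.setInputPaths(conf, path)
  val fmt = new TextInputFormat()
  fmt.configure(conf)  // TextInputFormat is JobConfigurable
  fmt.getRecordReader(split, conf, Reporter.NULL)
}

Besides removing dead workaround code, dropping the lock removes a global serialization point on the per-split reader-creation path, which otherwise forces concurrent tasks to queue.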