Mirror of https://github.com/microsoft/spark.git
Removed stream id from the constructor of NetworkReceiver to make it easier for PluggableNetworkInputDStream.
This commit is contained in:
Parent: 365506fb03
Commit: 0a2e333341
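
For context, the hunks below remove the streamId constructor parameter everywhere and instead let the tracker assign the id after construction. A minimal sketch of what a third-party ("pluggable") receiver and input stream could look like against this revised API — the class names, the package, the constant data source, and the exact import paths are assumptions made for illustration, not code from this commit:

package spark.streaming.examples   // hypothetical location, only for this sketch

import spark.storage.StorageLevel               // assumed import path for StorageLevel
import spark.streaming.StreamingContext
import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}

// A custom receiver no longer takes a stream id; NetworkInputTracker injects it
// via setStreamId() before the receiver is started on a worker.
class ConstantReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String] {

  @volatile private var thread: Thread = null

  protected def onStart() {
    thread = new Thread("ConstantReceiver") {
      override def run() {
        // By the time onStart() runs, streamId (a protected var in NetworkReceiver)
        // has been set, so block ids can follow the "input-<streamId>-<time>" scheme.
        val data = Seq("hello", "world").iterator
        pushBlock("input-" + streamId + "-" + System.currentTimeMillis, data, null, storageLevel)
      }
    }
    thread.start()
  }

  protected def onStop() {
    if (thread != null) thread.interrupt()
  }
}

// The matching input stream only has to build the receiver; it no longer
// forwards its id to the receiver's constructor.
class ConstantInputDStream(ssc_ : StreamingContext, storageLevel: StorageLevel)
  extends NetworkInputDStream[String](ssc_) {

  override def createReceiver(): NetworkReceiver[String] = new ConstantReceiver(storageLevel)
}
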
@@ -23,16 +23,17 @@ class TwitterInputDStream(
   ) extends NetworkInputDStream[Status](ssc_) {
 
   override def createReceiver(): NetworkReceiver[Status] = {
-    new TwitterReceiver(id, username, password, filters, storageLevel)
+    new TwitterReceiver(username, password, filters, storageLevel)
   }
 }
 
-class TwitterReceiver(streamId: Int,
-    username: String,
-    password: String,
-    filters: Seq[String],
-    storageLevel: StorageLevel
-  ) extends NetworkReceiver[Status](streamId) {
+class TwitterReceiver(
+    username: String,
+    password: String,
+    filters: Seq[String],
+    storageLevel: StorageLevel
+  ) extends NetworkReceiver[Status] {
 
   var twitterStream: TwitterStream = _
   lazy val blockGenerator = new BlockGenerator(storageLevel)
@@ -18,7 +18,10 @@ private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: Act
 private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
 private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
 
+/**
+ * This class manages the execution of the receivers of NetworkInputDStreams.
+ */
 private[streaming]
 class NetworkInputTracker(
     @transient ssc: StreamingContext,
     @transient networkInputStreams: Array[NetworkInputDStream[_]])
@@ -32,16 +35,20 @@ class NetworkInputTracker(
 
   var currentTime: Time = null
 
+  /** Start the actor and receiver execution thread. */
   def start() {
     ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
     receiverExecutor.start()
   }
 
+  /** Stop the receiver execution thread. */
   def stop() {
     // TODO: stop the actor as well
     receiverExecutor.interrupt()
     receiverExecutor.stopReceivers()
   }
 
+  /** Return all the blocks received from a receiver. */
   def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized {
     val queue = receivedBlockIds.synchronized {
       receivedBlockIds.getOrElse(receiverId, new Queue[String]())
@@ -53,6 +60,7 @@ class NetworkInputTracker(
     result.toArray
   }
 
+  /** Actor to receive messages from the receivers. */
   private class NetworkInputTrackerActor extends Actor {
     def receive = {
       case RegisterReceiver(streamId, receiverActor) => {
@@ -83,7 +91,8 @@ class NetworkInputTracker(
         }
       }
     }
 
+  /** This thread class runs all the receivers on the cluster. */
   class ReceiverExecutor extends Thread {
     val env = ssc.env
 
@@ -97,13 +106,22 @@ class NetworkInputTracker(
         stopReceivers()
       }
     }
 
+    /**
+     * Get the receivers from the NetworkInputDStreams, distributes them to the
+     * worker nodes as a parallel collection, and runs them.
+     */
     def startReceivers() {
-      val receivers = networkInputStreams.map(_.createReceiver())
+      val receivers = networkInputStreams.map(nis => {
+        val rcvr = nis.createReceiver()
+        rcvr.setStreamId(nis.id)
+        rcvr
+      })
 
       // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
 
      // Create the parallel collection of receivers to distributed them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
@@ -113,21 +131,21 @@ class NetworkInputTracker(
          ssc.sc.makeRDD(receivers, receivers.size)
        }
 
      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
        if (!iterator.hasNext) {
          throw new Exception("Could not start receiver as details not found.")
        }
        iterator.next().start()
      }
      // Distribute the receivers and start them
      ssc.sc.runJob(tempRDD, startReceiver)
    }
 
    /** Stops the receivers. */
    def stopReceivers() {
      //implicit val ec = env.actorSystem.dispatcher
      // Signal the receivers to stop
      receiverInfo.values.foreach(_ ! StopReceiver)
      //val listOfFutures = receiverInfo.values.map(_.ask(StopReceiver)(timeout)).toList
      //val futureOfList = Future.sequence(listOfFutures)
      //Await.result(futureOfList, timeout)
    }
  }
}
@@ -26,7 +26,7 @@ class FlumeInputDStream[T: ClassManifest](
 ) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
 
   override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
-    new FlumeReceiver(id, host, port, storageLevel)
+    new FlumeReceiver(host, port, storageLevel)
   }
 }
 
@@ -112,11 +112,10 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
  * Flume Avro interface.*/
 private[streaming]
 class FlumeReceiver(
-  streamId: Int,
   host: String,
   port: Int,
   storageLevel: StorageLevel
-) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+) extends NetworkReceiver[SparkFlumeEvent] {
 
   lazy val blockGenerator = new BlockGenerator(storageLevel)
 
@@ -96,15 +96,15 @@ class KafkaInputDStream[T: ClassManifest](
 } */
 
   def createReceiver(): NetworkReceiver[T] = {
-    new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
+    new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
       .asInstanceOf[NetworkReceiver[T]]
   }
 }
 
 private[streaming]
-class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
+class KafkaReceiver(host: String, port: Int, groupId: String,
   topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
-  storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
+  storageLevel: StorageLevel) extends NetworkReceiver[Any] {
 
   // Timeout for establishing a connection to Zookeper in ms.
   val ZK_TIMEOUT = 10000
@@ -17,6 +17,15 @@ import akka.util.duration._
 import spark.streaming.util.{RecurringTimer, SystemClock}
 import java.util.concurrent.ArrayBlockingQueue
 
+/**
+ * Abstract class for defining any InputDStream that has to start a receiver on worker
+ * nodes to receive external data. Specific implementations of NetworkInputDStream must
+ * define the createReceiver() function that creates the receiver object of type
+ * [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
+ * data.
+ * @param ssc_ Streaming context that will execute this input stream
+ * @tparam T Class type of the object of this stream
+ */
 abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
   extends InputDStream[T](ssc_) {
 
@@ -25,7 +34,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
   val id = ssc.getNewNetworkStreamId()
 
   /**
-   * This method creates the receiver object that will be sent to the workers
+   * Creates the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
    * of a NetworkInputDStream.
    */
@@ -48,7 +57,11 @@ private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverM
 private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
 private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
 
-abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging {
+/**
+ * Abstract class of a receiver that can be run on worker nodes to receive external data. See
+ * [[spark.streaming.dstream.NetworkInputDStream]] for an explanation.
+ */
+abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging {
 
   initLogging()
 
@@ -59,17 +72,22 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
 
   lazy protected val receivingThread = Thread.currentThread()
 
-  /** This method will be called to start receiving data. */
+  protected var streamId: Int = -1
+
+  /**
+   * This method will be called to start receiving data. All your receiver
+   * starting code should be implemented by defining this function.
+   */
   protected def onStart()
 
   /** This method will be called to stop receiving data. */
   protected def onStop()
 
-  /** This method conveys a placement preference (hostname) for this receiver. */
+  /** Conveys a placement preference (hostname) for this receiver. */
   def getLocationPreference() : Option[String] = None
 
   /**
-   * This method starts the receiver. First is accesses all the lazy members to
+   * Starts the receiver. First is accesses all the lazy members to
    * materialize them. Then it calls the user-defined onStart() method to start
    * other threads, etc required to receiver the data.
    */
@@ -92,7 +110,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
   }
 
   /**
-   * This method stops the receiver. First it interrupts the main receiving thread,
+   * Stops the receiver. First it interrupts the main receiving thread,
    * that is, the thread that called receiver.start(). Then it calls the user-defined
    * onStop() method to stop other threads and/or do cleanup.
    */
@@ -103,7 +121,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
   }
 
   /**
-   * This method stops the receiver and reports to exception to the tracker.
+   * Stops the receiver and reports to exception to the tracker.
    * This should be called whenever an exception has happened on any thread
    * of the receiver.
    */
@@ -115,7 +133,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
 
 
   /**
-   * This method pushes a block (as iterator of values) into the block manager.
+   * Pushes a block (as iterator of values) into the block manager.
    */
   def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
     val buffer = new ArrayBuffer[T] ++ iterator
@@ -125,7 +143,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
   }
 
   /**
-   * This method pushes a block (as bytes) into the block manager.
+   * Pushes a block (as bytes) into the block manager.
    */
   def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
     env.blockManager.putBytes(blockId, bytes, level)
@@ -157,6 +175,10 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
     }
   }
 
+  protected[streaming] def setStreamId(id: Int) {
+    streamId = id
+  }
+
   /**
    * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
    * appropriately named blocks at regular intervals. This class starts two threads,
@@ -202,7 +224,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
       val newBlockBuffer = currentBuffer
       currentBuffer = new ArrayBuffer[T]
       if (newBlockBuffer.size > 0) {
-        val blockId = "input-" + NetworkReceiver.this.streamId + "- " + (time - blockInterval)
+        val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
         val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
         blocksForPushing.add(newBlock)
       }
@@ -26,13 +26,13 @@ class RawInputDStream[T: ClassManifest](
 ) extends NetworkInputDStream[T](ssc_ ) with Logging {
 
   def createReceiver(): NetworkReceiver[T] = {
-    new RawNetworkReceiver(id, host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
+    new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
   }
 }
 
 private[streaming]
-class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
-  extends NetworkReceiver[Any](streamId) {
+class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
+  extends NetworkReceiver[Any] {
 
   var blockPushingThread: Thread = null
 
@@ -16,18 +16,17 @@ class SocketInputDStream[T: ClassManifest](
 ) extends NetworkInputDStream[T](ssc_) {
 
   def createReceiver(): NetworkReceiver[T] = {
-    new SocketReceiver(id, host, port, bytesToObjects, storageLevel)
+    new SocketReceiver(host, port, bytesToObjects, storageLevel)
   }
 }
 
 private[streaming]
 class SocketReceiver[T: ClassManifest](
-    streamId: Int,
     host: String,
     port: Int,
     bytesToObjects: InputStream => Iterator[T],
     storageLevel: StorageLevel
-  ) extends NetworkReceiver[T](streamId) {
+  ) extends NetworkReceiver[T] {
 
   lazy protected val blockGenerator = new BlockGenerator(storageLevel)
 
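
Taken together, the hunks above move the stream id out of every receiver constructor: NetworkInputTracker.startReceivers() now calls setStreamId(nis.id) on each freshly created receiver before distributing it, and the receiver's BlockGenerator uses that id when naming blocks. A small sketch restating that hand-off outside the tracker (not code from the commit; it is declared inside package spark.streaming only because setStreamId is protected[streaming], and the object name is made up):

package spark.streaming

import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}

object StreamIdHandOffSketch {
  // Mirrors the ordering NetworkInputTracker.startReceivers() relies on after this change.
  def wire(nis: NetworkInputDStream[_]): NetworkReceiver[_] = {
    val rcvr = nis.createReceiver()   // receiver is built without any stream id
    rcvr.setStreamId(nis.id)          // the owning DStream's id is attached afterwards
    // When start() later runs on a worker, streamId is already set, so blocks are
    // named "input-<id>-<time>" as in the BlockGenerator hunk above.
    rcvr
  }
}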