зеркало из https://github.com/microsoft/spark.git
Merge remote-tracking branch 'mrpotes/master'
This commit is contained in:
Коммит
1667158544
|
@ -26,8 +26,8 @@ import spark.SparkContext._
|
|||
*/
|
||||
object TwitterAlgebirdCMS {
|
||||
def main(args: Array[String]) {
|
||||
if (args.length < 3) {
|
||||
System.err.println("Usage: TwitterAlgebirdCMS <master> <twitter_username> <twitter_password>" +
|
||||
if (args.length < 1) {
|
||||
System.err.println("Usage: TwitterAlgebirdCMS <master>" +
|
||||
" [filter1] [filter2] ... [filter n]")
|
||||
System.exit(1)
|
||||
}
|
||||
|
@ -40,12 +40,11 @@ object TwitterAlgebirdCMS {
|
|||
// K highest frequency elements to take
|
||||
val TOPK = 10
|
||||
|
||||
val Array(master, username, password) = args.slice(0, 3)
|
||||
val filters = args.slice(3, args.length)
|
||||
val (master, filters) = (args.head, args.tail)
|
||||
|
||||
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
|
||||
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
|
||||
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
|
||||
val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
|
||||
|
||||
val users = stream.map(status => status.getUser.getId)
|
||||
|
||||
|
|
|
@ -21,20 +21,19 @@ import spark.streaming.dstream.TwitterInputDStream
|
|||
*/
|
||||
object TwitterAlgebirdHLL {
|
||||
def main(args: Array[String]) {
|
||||
if (args.length < 3) {
|
||||
System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
|
||||
if (args.length < 1) {
|
||||
System.err.println("Usage: TwitterAlgebirdHLL <master>" +
|
||||
" [filter1] [filter2] ... [filter n]")
|
||||
System.exit(1)
|
||||
}
|
||||
|
||||
/** Bit size parameter for HyperLogLog, trades off accuracy vs size */
|
||||
val BIT_SIZE = 12
|
||||
val Array(master, username, password) = args.slice(0, 3)
|
||||
val filters = args.slice(3, args.length)
|
||||
val (master, filters) = (args.head, args.tail)
|
||||
|
||||
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
|
||||
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
|
||||
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
|
||||
val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
|
||||
|
||||
val users = stream.map(status => status.getUser.getId)
|
||||
|
||||
|
|
|
@ -12,18 +12,17 @@ import spark.SparkContext._
|
|||
*/
|
||||
object TwitterPopularTags {
|
||||
def main(args: Array[String]) {
|
||||
if (args.length < 3) {
|
||||
System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
|
||||
if (args.length < 1) {
|
||||
System.err.println("Usage: TwitterPopularTags <master>" +
|
||||
" [filter1] [filter2] ... [filter n]")
|
||||
System.exit(1)
|
||||
}
|
||||
|
||||
val Array(master, username, password) = args.slice(0, 3)
|
||||
val filters = args.slice(3, args.length)
|
||||
val (master, filters) = (args.head, args.tail)
|
||||
|
||||
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
|
||||
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
|
||||
val stream = ssc.twitterStream(username, password, filters)
|
||||
val stream = ssc.twitterStream(None, filters)
|
||||
|
||||
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
|
|||
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
|
||||
import org.apache.hadoop.fs.Path
|
||||
import twitter4j.Status
|
||||
import twitter4j.auth.Authorization
|
||||
|
||||
|
||||
/**
|
||||
|
@ -380,18 +381,16 @@ class StreamingContext private (
|
|||
|
||||
/**
|
||||
* Create an input stream that returns tweets received from Twitter.
|
||||
* @param username Twitter username
|
||||
* @param password Twitter password
|
||||
* @param twitterAuth Twitter4J authentication
|
||||
* @param filters Set of filter strings to get only those tweets that match them
|
||||
* @param storageLevel Storage level to use for storing the received objects
|
||||
*/
|
||||
def twitterStream(
|
||||
username: String,
|
||||
password: String,
|
||||
twitterAuth: Option[Authorization] = None,
|
||||
filters: Seq[String] = Nil,
|
||||
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
|
||||
): DStream[Status] = {
|
||||
val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
|
||||
val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel)
|
||||
registerInputStream(inputStream)
|
||||
inputStream
|
||||
}
|
||||
|
|
|
@ -4,23 +4,18 @@ import spark.streaming._
|
|||
import receivers.{ActorReceiver, ReceiverSupervisorStrategy}
|
||||
import spark.streaming.dstream._
|
||||
import spark.storage.StorageLevel
|
||||
|
||||
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
|
||||
import spark.api.java.{JavaSparkContext, JavaRDD}
|
||||
|
||||
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
|
||||
|
||||
import twitter4j.Status
|
||||
|
||||
import akka.actor.Props
|
||||
import akka.actor.SupervisorStrategy
|
||||
import akka.zeromq.Subscribe
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
import java.lang.{Long => JLong, Integer => JInt}
|
||||
import java.io.InputStream
|
||||
import java.util.{Map => JMap}
|
||||
import twitter4j.auth.Authorization
|
||||
|
||||
/**
|
||||
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
|
||||
|
@ -312,44 +307,76 @@ class JavaStreamingContext(val ssc: StreamingContext) {
|
|||
|
||||
/**
|
||||
* Create an input stream that returns tweets received from Twitter.
|
||||
* @param username Twitter username
|
||||
* @param password Twitter password
|
||||
* @param twitterAuth Twitter4J Authorization
|
||||
* @param filters Set of filter strings to get only those tweets that match them
|
||||
* @param storageLevel Storage level to use for storing the received objects
|
||||
*/
|
||||
def twitterStream(
|
||||
username: String,
|
||||
password: String,
|
||||
twitterAuth: Authorization,
|
||||
filters: Array[String],
|
||||
storageLevel: StorageLevel
|
||||
): JavaDStream[Status] = {
|
||||
ssc.twitterStream(username, password, filters, storageLevel)
|
||||
ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an input stream that returns tweets received from Twitter using
|
||||
* java.util.Preferences to store OAuth token. OAuth key and secret should
|
||||
* be provided using system properties twitter4j.oauth.consumerKey and
|
||||
* twitter4j.oauth.consumerSecret
|
||||
* @param filters Set of filter strings to get only those tweets that match them
|
||||
* @param storageLevel Storage level to use for storing the received objects
|
||||
*/
|
||||
def twitterStream(
|
||||
filters: Array[String],
|
||||
storageLevel: StorageLevel
|
||||
): JavaDStream[Status] = {
|
||||
ssc.twitterStream(None, filters, storageLevel)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an input stream that returns tweets received from Twitter.
|
||||
* @param username Twitter username
|
||||
* @param password Twitter password
|
||||
* @param twitterAuth Twitter4J Authorization
|
||||
* @param filters Set of filter strings to get only those tweets that match them
|
||||
*/
|
||||
def twitterStream(
|
||||
username: String,
|
||||
password: String,
|
||||
twitterAuth: Authorization,
|
||||
filters: Array[String]
|
||||
): JavaDStream[Status] = {
|
||||
ssc.twitterStream(username, password, filters)
|
||||
ssc.twitterStream(Some(twitterAuth), filters)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an input stream that returns tweets received from Twitter using
|
||||
* java.util.Preferences to store OAuth token. OAuth key and secret should
|
||||
* be provided using system properties twitter4j.oauth.consumerKey and
|
||||
* twitter4j.oauth.consumerSecret
|
||||
* @param filters Set of filter strings to get only those tweets that match them
|
||||
*/
|
||||
def twitterStream(
|
||||
filters: Array[String]
|
||||
): JavaDStream[Status] = {
|
||||
ssc.twitterStream(None, filters)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an input stream that returns tweets received from Twitter.
|
||||
* @param username Twitter username
|
||||
* @param password Twitter password
|
||||
* @param twitterAuth Twitter4J Authorization
|
||||
*/
|
||||
def twitterStream(
|
||||
username: String,
|
||||
password: String
|
||||
twitterAuth: Authorization
|
||||
): JavaDStream[Status] = {
|
||||
ssc.twitterStream(username, password)
|
||||
ssc.twitterStream(Some(twitterAuth))
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an input stream that returns tweets received from Twitter using
|
||||
* java.util.Preferences to store OAuth token. OAuth key and secret should
|
||||
* be provided using system properties twitter4j.oauth.consumerKey and
|
||||
* twitter4j.oauth.consumerSecret
|
||||
*/
|
||||
def twitterStream(): JavaDStream[Status] = {
|
||||
ssc.twitterStream()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -3,34 +3,59 @@ package spark.streaming.dstream
|
|||
import spark._
|
||||
import spark.streaming._
|
||||
import storage.StorageLevel
|
||||
|
||||
import twitter4j._
|
||||
import twitter4j.auth.BasicAuthorization
|
||||
import twitter4j.auth.Authorization
|
||||
import java.util.prefs.Preferences
|
||||
import twitter4j.conf.PropertyConfiguration
|
||||
import twitter4j.auth.OAuthAuthorization
|
||||
import twitter4j.auth.AccessToken
|
||||
|
||||
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
|
||||
*
|
||||
* @constructor create a new Twitter stream using the supplied Twitter4J authorization to authenticate.
|
||||
* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
|
||||
* such that this may return a sampled subset of all tweets during each interval.
|
||||
*
|
||||
* Includes a simple implementation of OAuth using consumer key and secret provided using system
|
||||
* properties twitter4j.oauth.consumerKey and twitter4j.oauth.consumerSecret
|
||||
*/
|
||||
private[streaming]
|
||||
class TwitterInputDStream(
|
||||
@transient ssc_ : StreamingContext,
|
||||
username: String,
|
||||
password: String,
|
||||
twitterAuth: Option[Authorization],
|
||||
filters: Seq[String],
|
||||
storageLevel: StorageLevel
|
||||
) extends NetworkInputDStream[Status](ssc_) {
|
||||
|
||||
lazy val createOAuthAuthorization: Authorization = {
|
||||
val userRoot = Preferences.userRoot();
|
||||
val token = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN, null))
|
||||
val tokenSecret = Option(userRoot.get(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, null))
|
||||
val oAuth = new OAuthAuthorization(new PropertyConfiguration(System.getProperties()))
|
||||
if (token.isEmpty || tokenSecret.isEmpty) {
|
||||
val requestToken = oAuth.getOAuthRequestToken()
|
||||
println("Authorize application using URL: "+requestToken.getAuthorizationURL())
|
||||
println("Enter PIN: ")
|
||||
val pin = Console.readLine
|
||||
val accessToken = if (pin.length() > 0) oAuth.getOAuthAccessToken(requestToken, pin) else oAuth.getOAuthAccessToken()
|
||||
userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN, accessToken.getToken())
|
||||
userRoot.put(PropertyConfiguration.OAUTH_ACCESS_TOKEN_SECRET, accessToken.getTokenSecret())
|
||||
userRoot.flush()
|
||||
} else {
|
||||
oAuth.setOAuthAccessToken(new AccessToken(token.get, tokenSecret.get));
|
||||
}
|
||||
oAuth
|
||||
}
|
||||
|
||||
override def getReceiver(): NetworkReceiver[Status] = {
|
||||
new TwitterReceiver(username, password, filters, storageLevel)
|
||||
new TwitterReceiver(if (twitterAuth.isEmpty) createOAuthAuthorization else twitterAuth.get, filters, storageLevel)
|
||||
}
|
||||
}
|
||||
|
||||
private[streaming]
|
||||
class TwitterReceiver(
|
||||
username: String,
|
||||
password: String,
|
||||
twitterAuth: Authorization,
|
||||
filters: Seq[String],
|
||||
storageLevel: StorageLevel
|
||||
) extends NetworkReceiver[Status] {
|
||||
|
@ -40,8 +65,7 @@ class TwitterReceiver(
|
|||
|
||||
protected override def onStart() {
|
||||
blockGenerator.start()
|
||||
twitterStream = new TwitterStreamFactory()
|
||||
.getInstance(new BasicAuthorization(username, password))
|
||||
twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
|
||||
twitterStream.addListener(new StatusListener {
|
||||
def onStatus(status: Status) = {
|
||||
blockGenerator += status
|
||||
|
|
Загрузка…
Ссылка в новой задаче