Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
name := "spark-streaming"

version := "0.1"
version := "0.2"

scalaVersion := "2.12.10"
scalaVersion := "2.13.12"

val sparkVersion = "3.0.2"
val postgresVersion = "42.2.2"
val cassandraConnectorVersion = "3.0.0" // preview version at the moment of writing (July 7, 2020)
val akkaVersion = "2.5.24"
val akkaHttpVersion = "10.1.7"
val sparkVersion = "3.5.0"
val postgresVersion = "42.6.0"
val cassandraConnectorVersion = "3.4.1" // preview version at the moment of writing (October, 2023)
val akkaVersion = "2.6.19"
val akkaHttpVersion = "10.2.10"
val twitter4jVersion = "4.0.7"

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the akka versions as per the akka licencing.

val kafkaVersion = "2.4.0"
val log4jVersion = "2.4.1"
val nlpLibVersion = "3.5.1"
val kafkaVersion = "3.6.0"
val log4jVersion = "2.20.0"
val nlpLibVersion = "4.5.4"

evictionErrorLevel := Level.Warn

resolvers ++= Seq(
"bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven",
Expand All @@ -32,7 +34,7 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % sparkVersion,

// streaming-kafka
"org.apache.spark" % "spark-sql-kafka-0-10_2.12" % sparkVersion,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,

// low-level integrations
"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.3.8
sbt.version = 1.9.6
4 changes: 2 additions & 2 deletions src/main/scala/part4integrations/IntegratingAkka.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package part4integrations

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.{Dataset, SparkSession}
Expand Down Expand Up @@ -49,7 +49,7 @@ object IntegratingAkka {

object ReceiverSystem {
implicit val actorSystem = ActorSystem("ReceiverSystem", ConfigFactory.load("akkaconfig/remoteActors").getConfig("remoteSystem"))
implicit val actorMaterializer = ActorMaterializer()
implicit val actorMaterializer = Materializer(actorSystem)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Symbol ActorMaterializer is deprecated. In recent versions of Akka, Materializer.apply(system) is the recommended way to create a system-wide materializer.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

class Destination extends Actor with ActorLogging {
override def receive = {
Expand Down
26 changes: 12 additions & 14 deletions src/main/scala/part5twitter/CustomReceiverApp.scala
Original file line number Diff line number Diff line change
@@ -1,38 +1,36 @@
package part5twitter

import java.net.Socket

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.concurrent.{Future, Promise}
import scala.io.Source

class CustomSocketReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
import scala.concurrent.ExecutionContext.Implicits.global

val socketPromise: Promise[Socket] = Promise[Socket]()
val socketFuture = socketPromise.future
/* This variable is used to store the Socket instance, when the receiver starts, it establishes the socket
connection and reads data from the socket.
*/
var mayBeSocket: Option[Socket] = None

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated as per

// called asynchronously
override def onStart(): Unit = {
val socket = new Socket(host, port)
mayBeSocket = Some(new Socket(host, port))
val socket = mayBeSocket.get

// run on another thread
Future {
Source.fromInputStream(socket.getInputStream)
.getLines()
.foreach(line => store(line)) // store makes this string available to Spark
}
// Read data from the socket and store it
val source = Source.fromInputStream(socket.getInputStream)
source.getLines().foreach(line => store(line))

socketPromise.success(socket)
source.close()
socket.close()
}

// called asynchronously
override def onStop(): Unit = socketFuture.foreach(socket => socket.close())
override def onStop(): Unit = mayBeSocket.foreach(socket => socket.close())
}

object CustomReceiverApp {
Expand Down
5 changes: 2 additions & 3 deletions src/main/scala/part5twitter/SentimentAnalysis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import edu.stanford.nlp.util.CoreMap

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

object SentimentAnalysis {

Expand All @@ -25,7 +24,7 @@ object SentimentAnalysis {
// split the text into sentences and attach scores to each
val sentences = annotation.get(classOf[CoreAnnotations.SentencesAnnotation]).asScala
val sentiments = sentences.map { sentence: CoreMap =>
val tree = sentence.get(classOf[SentimentCoreAnnotations.AnnotatedTree])
val tree = sentence.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])
// convert the score to a double for each sentence

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnnotatedTree is no longer available, so used SentimentAnnotatedTree.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

RNNCoreAnnotations.getPredictedClass(tree).toDouble
}
Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/part5twitter/TwitterReceiver.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package part5twitter

import java.io.{OutputStream, PrintStream}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import twitter4j._

import scala.concurrent.Promise

class TwitterReceiver extends Receiver[Status](StorageLevel.MEMORY_ONLY) {
import scala.concurrent.ExecutionContext.Implicits.global

val twitterStreamPromise = Promise[TwitterStream]
val twitterStreamFuture = twitterStreamPromise.future
/* This variable is used to keep track of the TwitterStream object, which is created when the receiver is started. It starts as None and is updated
to Some(twitterStream) when the Twitter stream is successfully created in the onStart method. This approach allows the code to check whether the Twitter
stream has been initialized & is active before performing cleanup or shutdown operations in the onStop method, ensuring that these operations only occur
if a Twitter stream has been created.
*/
var mayBeStream: Option[TwitterStream] = None

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated as per your suggestion.

private def simpleStatusListener = new StatusListener {
override def onStatus(status: Status): Unit = store(status)
Expand All @@ -38,11 +38,11 @@ class TwitterReceiver extends Receiver[Status](StorageLevel.MEMORY_ONLY) {
.addListener(simpleStatusListener)
.sample("en") // call the Twitter sample endpoint for English tweets

twitterStreamPromise.success(twitterStream)
mayBeStream = Some(twitterStream)
}

// run asynchronously
override def onStop(): Unit = twitterStreamFuture.foreach { twitterStream =>
override def onStop(): Unit = mayBeStream.foreach { twitterStream =>
twitterStream.cleanUp()
twitterStream.shutdown()
}
Expand Down
23 changes: 12 additions & 11 deletions src/main/scala/part7science/ScienceServer.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package part7science

import java.util.Properties

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import akka.http.scaladsl.server.Directives._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer}

import scala.util.Using
import scala.io.Source


object ScienceServer {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val materializer: Materializer = Materializer(system)

val kafkaTopic = "science"
val kafkaBootstrapServer = "localhost:9092"
Expand All @@ -33,12 +32,14 @@ object ScienceServer {
def getRoute(producer: KafkaProducer[Long, String]) = {
pathEndOrSingleSlash {
get {
complete(
HttpEntity(
ContentTypes.`text/html(UTF-8)`,
Source.fromFile("src/main/html/whackamole.html").getLines().mkString("")
)
)
complete {
Using(Source.fromFile("src/main/html/whackamole.html")) { source =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice of you to auto-close the file 👍

HttpEntity(
ContentTypes.`text/html(UTF-8)`,
source.getLines().mkString("")
)
}
}
}
} ~
path("api" / "report") {
Expand All @@ -60,7 +61,7 @@ object ScienceServer {

// spinning up the server
val kafkaProducer = getProducer()
val bindingFuture = Http().bindAndHandle(getRoute(kafkaProducer), "localhost", 9988)
val bindingFuture = Http().newServerAt("localhost", 9988).bind(getRoute(kafkaProducer))

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Symbol bindAndHandle is deprecated. Used Http().newServerAt(...)...bindFlow() to create server bindings.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Symbol bindAndHandle is deprecated. Used Http().newServerAt(...)...bindFlow() to create server bindings.

// cleanup
bindingFuture.foreach { binding =>
Expand Down