diff --git a/build.sbt b/build.sbt index 358f80b..9f460a5 100644 --- a/build.sbt +++ b/build.sbt @@ -1,18 +1,20 @@ name := "spark-streaming" -version := "0.1" +version := "0.2" -scalaVersion := "2.12.10" +scalaVersion := "2.13.12" -val sparkVersion = "3.0.2" -val postgresVersion = "42.2.2" -val cassandraConnectorVersion = "3.0.0" // preview version at the moment of writing (July 7, 2020) -val akkaVersion = "2.5.24" -val akkaHttpVersion = "10.1.7" +val sparkVersion = "3.5.0" +val postgresVersion = "42.6.0" +val cassandraConnectorVersion = "3.4.1" // preview version at the moment of writing (October, 2023) +val akkaVersion = "2.6.19" +val akkaHttpVersion = "10.2.10" val twitter4jVersion = "4.0.7" -val kafkaVersion = "2.4.0" -val log4jVersion = "2.4.1" -val nlpLibVersion = "3.5.1" +val kafkaVersion = "3.6.0" +val log4jVersion = "2.20.0" +val nlpLibVersion = "4.5.4" + +evictionErrorLevel := Level.Warn resolvers ++= Seq( "bintray-spark-packages" at "https://dl.bintray.com/spark-packages/maven", @@ -32,7 +34,7 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % sparkVersion, // streaming-kafka - "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % sparkVersion, + "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion, // low-level integrations "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion, diff --git a/project/build.properties b/project/build.properties index 40d3e51..4790c79 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.3.8 \ No newline at end of file +sbt.version = 1.9.6 \ No newline at end of file diff --git a/src/main/scala/part4integrations/IntegratingAkka.scala b/src/main/scala/part4integrations/IntegratingAkka.scala index 04b4fcf..e49230c 100644 --- a/src/main/scala/part4integrations/IntegratingAkka.scala +++ b/src/main/scala/part4integrations/IntegratingAkka.scala @@ -1,7 +1,7 @@ package part4integrations import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props} -import akka.stream.{ActorMaterializer, OverflowStrategy} +import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy} import akka.stream.scaladsl.{Sink, Source} import com.typesafe.config.ConfigFactory import org.apache.spark.sql.{Dataset, SparkSession} @@ -49,7 +49,7 @@ object IntegratingAkka { object ReceiverSystem { implicit val actorSystem = ActorSystem("ReceiverSystem", ConfigFactory.load("akkaconfig/remoteActors").getConfig("remoteSystem")) - implicit val actorMaterializer = ActorMaterializer() + implicit val actorMaterializer = Materializer(actorSystem) class Destination extends Actor with ActorLogging { override def receive = { diff --git a/src/main/scala/part5twitter/CustomReceiverApp.scala b/src/main/scala/part5twitter/CustomReceiverApp.scala index ce0406c..ae2cac3 100644 --- a/src/main/scala/part5twitter/CustomReceiverApp.scala +++ b/src/main/scala/part5twitter/CustomReceiverApp.scala @@ -1,38 +1,36 @@ package part5twitter import java.net.Socket - import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.{Seconds, StreamingContext} -import scala.concurrent.{Future, Promise} import scala.io.Source class CustomSocketReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) { - import scala.concurrent.ExecutionContext.Implicits.global - val socketPromise: Promise[Socket] = Promise[Socket]() - val socketFuture = socketPromise.future + /* This variable is used to store the Socket instance, when the receiver starts, it establishes the socket + connection and reads data from the socket. + */ + var mayBeSocket: Option[Socket] = None // called asynchronously override def onStart(): Unit = { - val socket = new Socket(host, port) + mayBeSocket = Some(new Socket(host, port)) + val socket = mayBeSocket.get - // run on another thread - Future { - Source.fromInputStream(socket.getInputStream) - .getLines() - .foreach(line => store(line)) // store makes this string available to Spark - } + // Read data from the socket and store it + val source = Source.fromInputStream(socket.getInputStream) + source.getLines().foreach(line => store(line)) - socketPromise.success(socket) + source.close() + socket.close() } // called asynchronously - override def onStop(): Unit = socketFuture.foreach(socket => socket.close()) + override def onStop(): Unit = mayBeSocket.foreach(socket => socket.close()) } object CustomReceiverApp { diff --git a/src/main/scala/part5twitter/SentimentAnalysis.scala b/src/main/scala/part5twitter/SentimentAnalysis.scala index 3e52480..d19667d 100644 --- a/src/main/scala/part5twitter/SentimentAnalysis.scala +++ b/src/main/scala/part5twitter/SentimentAnalysis.scala @@ -7,8 +7,7 @@ import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations import edu.stanford.nlp.pipeline.StanfordCoreNLP import edu.stanford.nlp.sentiment.SentimentCoreAnnotations import edu.stanford.nlp.util.CoreMap - -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ object SentimentAnalysis { @@ -25,7 +24,7 @@ object SentimentAnalysis { // split the text into sentences and attach scores to each val sentences = annotation.get(classOf[CoreAnnotations.SentencesAnnotation]).asScala val sentiments = sentences.map { sentence: CoreMap => - val tree = sentence.get(classOf[SentimentCoreAnnotations.AnnotatedTree]) + val tree = sentence.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree]) // convert the score to a double for each sentence RNNCoreAnnotations.getPredictedClass(tree).toDouble } diff --git a/src/main/scala/part5twitter/TwitterReceiver.scala b/src/main/scala/part5twitter/TwitterReceiver.scala index 61280aa..bf18a0d 100644 --- a/src/main/scala/part5twitter/TwitterReceiver.scala +++ b/src/main/scala/part5twitter/TwitterReceiver.scala @@ -1,18 +1,18 @@ package part5twitter import java.io.{OutputStream, PrintStream} - import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver import twitter4j._ -import scala.concurrent.Promise - class TwitterReceiver extends Receiver[Status](StorageLevel.MEMORY_ONLY) { - import scala.concurrent.ExecutionContext.Implicits.global - val twitterStreamPromise = Promise[TwitterStream] - val twitterStreamFuture = twitterStreamPromise.future + /* This variable is used to keep track of the TwitterStream object, which is created when the receiver is started. It starts as None and is updated + to Some(twitterStream) when the Twitter stream is successfully created in the onStart method. This approach allows the code to check whether the Twitter + stream has been initialized & is active before performing cleanup or shutdown operations in the onStop method, ensuring that these operations only occur + if a Twitter stream has been created. + */ + var mayBeStream: Option[TwitterStream] = None private def simpleStatusListener = new StatusListener { override def onStatus(status: Status): Unit = store(status) @@ -38,11 +38,11 @@ class TwitterReceiver extends Receiver[Status](StorageLevel.MEMORY_ONLY) { .addListener(simpleStatusListener) .sample("en") // call the Twitter sample endpoint for English tweets - twitterStreamPromise.success(twitterStream) + mayBeStream = Some(twitterStream) } // run asynchronously - override def onStop(): Unit = twitterStreamFuture.foreach { twitterStream => + override def onStop(): Unit = mayBeStream.foreach { twitterStream => twitterStream.cleanUp() twitterStream.shutdown() } diff --git a/src/main/scala/part7science/ScienceServer.scala b/src/main/scala/part7science/ScienceServer.scala index 39cb217..c75a261 100644 --- a/src/main/scala/part7science/ScienceServer.scala +++ b/src/main/scala/part7science/ScienceServer.scala @@ -1,21 +1,20 @@ package part7science import java.util.Properties - import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes} -import akka.stream.ActorMaterializer +import akka.stream.Materializer import akka.http.scaladsl.server.Directives._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer} - +import scala.util.Using import scala.io.Source object ScienceServer { implicit val system = ActorSystem() - implicit val materializer = ActorMaterializer() + implicit val materializer: Materializer = Materializer(system) val kafkaTopic = "science" val kafkaBootstrapServer = "localhost:9092" @@ -33,12 +32,14 @@ object ScienceServer { def getRoute(producer: KafkaProducer[Long, String]) = { pathEndOrSingleSlash { get { - complete( - HttpEntity( - ContentTypes.`text/html(UTF-8)`, - Source.fromFile("src/main/html/whackamole.html").getLines().mkString("") - ) - ) + complete { + Using(Source.fromFile("src/main/html/whackamole.html")) { source => + HttpEntity( + ContentTypes.`text/html(UTF-8)`, + source.getLines().mkString("") + ) + } + } } } ~ path("api" / "report") { @@ -60,7 +61,7 @@ object ScienceServer { // spinning up the server val kafkaProducer = getProducer() - val bindingFuture = Http().bindAndHandle(getRoute(kafkaProducer), "localhost", 9988) + val bindingFuture = Http().newServerAt("localhost", 9988).bind(getRoute(kafkaProducer)) // cleanup bindingFuture.foreach { binding =>