Project Upgrade for Spark Streaming. #1

Open
Ayush21-AI wants to merge 1 commit into rockthejvm:master from Ayush21-AI:Project-upgrade-for-Spark-Streaming

@Ayush21-AI

@daniel-ciocirlan Please review this PR.

  implicit val actorSystem = ActorSystem("ReceiverSystem", ConfigFactory.load("akkaconfig/remoteActors").getConfig("remoteSystem"))
- implicit val actorMaterializer = ActorMaterializer()
+ implicit val actorMaterializer = Materializer(actorSystem)

Author

Symbol ActorMaterializer is deprecated. In recent versions of Akka, Materializer.apply(system) creates a materializer tied to the lifetime of the actor system.
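
A minimal sketch of the replacement discussed in this comment, assuming Akka 2.6.x with akka-stream on the classpath. The object name, system name, and toy stream are illustrative only and are not taken from the project.

```scala
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of the project under review.
object MaterializerSketch {

  // Starts a throwaway actor system, runs a tiny stream, and shuts down again.
  def runSum(): Int = {
    implicit val system: ActorSystem = ActorSystem("SketchSystem")
    // Non-deprecated replacement for ActorMaterializer()
    implicit val materializer: Materializer = Materializer(system)
    try {
      // Sum the numbers 1 to 5 through a stream that uses the materializer
      val result = Source(1 to 5).runWith(Sink.fold[Int, Int](0)(_ + _))
      Await.result(result, 5.seconds)
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(runSum()) // prints 15
}
```

In Akka 2.6 an implicit ActorSystem in scope is usually enough to run streams, because a system-wide materializer is then resolved implicitly. The explicit value is kept here only to mirror the diff.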

Contributor

👍

  val sentiments = sentences.map { sentence: CoreMap =>
-   val tree = sentence.get(classOf[SentimentCoreAnnotations.AnnotatedTree])
+   val tree = sentence.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])
    // convert the score to a double for each sentence

Author

AnnotatedTree is no longer available, so I used SentimentAnnotatedTree instead.

Contributor

👍

  val kafkaProducer = getProducer()
- val bindingFuture = Http().bindAndHandle(getRoute(kafkaProducer), "localhost", 9988)
+ val bindingFuture = Http().newServerAt("localhost", 9988).bind(getRoute(kafkaProducer))

Author

Symbol bindAndHandle is deprecated. Used Http().newServerAt(...).bind(...) to create the server binding.
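
For context, a minimal sketch of the builder-style binding, assuming Akka HTTP 10.2.x and Akka 2.6.x. The `/ping` route and the `BindSketch` and `start` names are hypothetical stand-ins for the project's `getRoute(kafkaProducer)`.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object; not part of the project under review.
object BindSketch {

  // Illustrative route only: GET /ping answers "pong"
  val route: Route = path("ping") {
    get {
      complete("pong")
    }
  }

  // Builder-style replacement for the deprecated Http().bindAndHandle(route, host, port)
  def start(port: Int)(implicit system: ActorSystem): Future[Http.ServerBinding] =
    Http().newServerAt("localhost", port).bind(route)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("BindSketch")
    val binding = Await.result(start(9988), 10.seconds)
    println(s"Server bound to ${binding.localAddress}")
    // Release the port and stop the actor system so the JVM can exit
    Await.result(binding.unbind(), 10.seconds)
    Await.result(system.terminate(), 10.seconds)
  }
}
```

Passing a Route to `bind` relies on the implicit conversion from a route to a request handler, which needs an actor system in implicit scope at the call site.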

Contributor

👍

@Ayush21-AI

Author

@daniel-ciocirlan Many files use StreamingContext and DStream, both of which are deprecated as of Apache Spark 3.x. They have been superseded by Structured Streaming, which offers improved performance, scalability, and fault tolerance.

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

The suggested path is to migrate to Structured Streaming.

Correct me if I'm wrong: to migrate our application to Structured Streaming, we need to replace the StreamingContext object with a SparkSession and use readStream to create a DataFrame representing our input data stream.

Do we need to do this?

I need your guidance.
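
To make the question concrete, here is a rough sketch of what the Structured Streaming entry point could look like, assuming Spark 3.x with spark-sql on the classpath. It uses Spark's built-in `rate` test source instead of the project's real inputs, and the `StructuredStreamingSketch` and `rateInput` names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

// Hypothetical example object; not part of the project under review.
object StructuredStreamingSketch {

  // A streaming DataFrame created through readStream instead of a DStream.
  // The "rate" source is a built-in test source that emits (timestamp, value) rows.
  def rateInput(spark: SparkSession): DataFrame =
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()

  def main(args: Array[String]): Unit = {
    // SparkSession takes the place of new StreamingContext(spark.sparkContext, Seconds(1))
    val spark = SparkSession.builder()
      .appName("Structured Streaming sketch")
      .master("local[2]")
      .getOrCreate()

    val query = rateInput(spark).writeStream
      .format("console")
      .outputMode("append")
      // Roughly plays the role of the 1-second batch interval
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    // Let the query run for a few seconds, then shut everything down
    query.awaitTermination(5000)
    query.stop()
    spark.stop()
  }
}
```

Sources that have no built-in Structured Streaming connector would need more work than this sketch shows.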

@daniel-ciocirlan daniel-ciocirlan left a comment

Contributor

Thanks for this, just some minor comments on lib upgrades

Comment thread build.sbt Outdated
val sparkVersion = "3.5.0"
val postgresVersion = "42.6.0"
val cassandraConnectorVersion = "3.4.1" // preview version at the moment of writing (October, 2023)
val akkaVersion = "2.8.0"

Contributor

Please use Akka 2.6 at most, as they changed their license model and we don't want to use non-OSS libraries.

Comment thread build.sbt Outdated
val postgresVersion = "42.6.0"
val cassandraConnectorVersion = "3.4.1" // preview version at the moment of writing (October, 2023)
val akkaVersion = "2.8.0"
val akkaHttpVersion = "10.5.0"

Contributor

Same for this one; I don't remember which was the last Akka HTTP version that was still OSS.

)
)
complete {
Using(Source.fromFile("src/main/html/whackamole.html")) { source =>

Contributor

nice of you to auto-close the file 👍
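
For readers unfamiliar with the pattern, a small self-contained sketch of `scala.util.Using` (Scala 2.13+), which closes the resource whether or not the body throws. The temporary file and the `UsingSketch` and `readAll` names are illustrative, not the project's code.

```scala
import java.nio.file.Files

import scala.io.Source
import scala.util.{Failure, Success, Try, Using}

// Hypothetical example object; not part of the project under review.
object UsingSketch {

  // Reads a whole file. Using closes the Source even if reading fails,
  // and reports any exception as a Failure instead of throwing.
  def readAll(path: String): Try[String] =
    Using(Source.fromFile(path))(_.mkString)

  def main(args: Array[String]): Unit = {
    // Write a throwaway file so the example does not depend on project files
    val tmp = Files.createTempFile("using-sketch", ".html")
    Files.write(tmp, "<h1>hello</h1>".getBytes("UTF-8"))

    readAll(tmp.toString) match {
      case Success(html) => println(html)
      case Failure(err)  => println(s"Could not read file: ${err.getMessage}")
    }

    Files.delete(tmp)
  }
}
```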

@daniel-ciocirlan

Contributor

@Ayush21-AI about DStreams - you can keep it as is for now. Please reply with a +1 if the Twitter project works with this new upgrade.

There is a way for us to read regular rows, but that means a more significant overhaul - we need to register a custom data source that implements the DataSource V2 API. That upgrade will involve some more interesting work in the code, if you're up to it. We can make that a separate PR.

Ayush21-AI force-pushed the Project-upgrade-for-Spark-Streaming branch from 1b068c0 to e0cf1f8 on October 12, 2023 20:59

Comment thread build.sbt
val cassandraConnectorVersion = "3.4.1" // preview version at the moment of writing (October, 2023)
val akkaVersion = "2.6.19"
val akkaHttpVersion = "10.2.10"
val twitter4jVersion = "4.0.7"

Author

Updated the Akka versions to comply with the Akka licensing change.

@Ayush21-AI

Author

@daniel-ciocirlan I updated the Akka versions. While running the TwitterProject, I'm getting the error below:
23/10/13 02:34:26 ERROR Utils: Exception encountered
java.io.NotSerializableException: Promises and Futures cannot be serialized
at scala.concurrent.impl.Promise$DefaultPromise.writeObject(Promise.scala:373)

This error indicates that we are trying to serialize a Promise or Future object. This is not possible because these objects are not serializable.
I think the issue arises from using Promise[TwitterStream] and Future[TwitterStream] inside the TwitterReceiver class, which extends the Receiver class.

Could you suggest what to do?
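
A standalone sketch of the failure mode described above, using only the standard library and assuming Scala 2.13 (the `Promise.scala` frame in the stack trace comes from its standard library). `PromiseHolder` and `OptionHolder` are hypothetical stand-ins for a receiver, which Spark has to serialize in order to ship it to executors. The second class shows one possible alternative: a mutable Option field that is still None at serialization time.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import scala.concurrent.Promise
import scala.util.Try

// Hypothetical stand-in for a receiver that keeps a Promise as a field
class PromiseHolder extends Serializable {
  val streamPromise: Promise[String] = Promise[String]()
}

// Hypothetical alternative: an Option field that starts out empty
class OptionHolder extends Serializable {
  var maybeStream: Option[String] = None
}

object SerializationSketch {

  // Java-serializes the object and returns the number of bytes written,
  // or a Failure carrying the exception thrown during serialization.
  def serialize(obj: AnyRef): Try[Int] = Try {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj)
    finally out.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    // On Scala 2.13 this is expected to be a Failure wrapping NotSerializableException
    println(serialize(new PromiseHolder))
    // None is serializable, so this is expected to be a Success
    println(serialize(new OptionHolder))
  }
}
```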

@daniel-ciocirlan

Contributor

I don't understand - can you revert to the original code and check whether this worked?
We aren't changing anything in the code itself so that would be a bit weird.

Ayush21-AI force-pushed the Project-upgrade-for-Spark-Streaming branch from e0cf1f8 to 0496777 on October 16, 2023 20:08
Ayush21-AI force-pushed the Project-upgrade-for-Spark-Streaming branch from 0496777 to ca3563b on October 16, 2023 20:13

connection and reads data from the socket.
*/
var mayBeSocket: Option[Socket] = None

Author

Updated as per

if a Twitter stream has been created.
*/
var mayBeStream: Option[TwitterStream] = None

Author

Updated as per your suggestion.
