-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSearchTask.scala
More file actions
59 lines (41 loc) · 1.83 KB
/
SearchTask.scala
File metadata and controls
59 lines (41 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark._
import com.mongodb.spark.config._
import scala.collection.JavaConverters._
import org.apache.log4j.BasicConfigurator
import org.apache.log4j.varia.NullAppender
object SearchTask {
def main(args: Array[String]): Unit = {
val nullAppender = new NullAppender
BasicConfigurator.configure(nullAppender)
val conf = new SparkConf()
.setAppName("MongoDB RDD Connector")
.setMaster("local[*]")
val sc = new SparkContext(conf)
//load collection
val mongoUri = "mongodb://localhost:27017/BigData.dictionary"
val DocRDD = MongoSpark.load(sc, ReadConfig(Map("uri" -> mongoUri)))
println("Enter your query:")
val userInput = scala.io.StdIn.readLine()
//println(userInput)
val processedQuery = userInput.toLowerCase.split("\\s+").toSeq
println(processedQuery)
// Search in collection
/*
this is just for one word in the query
val filteredRDD = DocRDD.filter(doc => doc.getString("word") == userInput)
.flatMap(doc => doc.getList("docIds", classOf[String]).asScala) // Extract docIds only
*/
// Collect document IDs for each word
val docIdRdds = processedQuery.map{ word => //a sequence of RDDs, where each RDD contains the document IDs for word .
DocRDD.filter(doc => doc.getString("word") == word)
.flatMap(doc => doc.getList("docIds", classOf[String]).asScala)
}
docIdRdds.zip(processedQuery).foreach { case (rdd, word) =>
val docIds = rdd.collect() //collect each rdd
println(s"Document IDs for '$word': ${docIds.mkString(", ")}")}
val intersectedDocIds = docIdRdds.reduce((rdd1, rdd2) => rdd1.intersection(rdd2)) //operation on two rdd
println("The Documents that these words are mentioned in are:")
intersectedDocIds.collect().foreach(println)
}
}