-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathItemBasedRecommendationSystem.py
More file actions
224 lines (168 loc) · 10.6 KB
/
Copy pathItemBasedRecommendationSystem.py
File metadata and controls
224 lines (168 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
from pyspark import SparkContext
import time
import sys
startTime = time.time()
sc=SparkContext("local[*]","Sample2.1")
sc.setLogLevel("ERROR")
trainFile = sys.argv[1]
testFile = sys.argv[2]
outputFile = sys.argv[3]
#trainFile = './yelp_train.csv'
#testFile = './yelp_val.csv'
#outputFile = './output2_1.csv'
def pearsonCoefficientDenominatorHelper(currentBusinessDifferences,candidateBusinessDifferences):
#Denominator of W
currentBusinessDenominator = sum([x * x for x in currentBusinessDifferences])** 0.5
candidateBusinessDenominator = sum([x * x for x in candidateBusinessDifferences])** 0.5
return currentBusinessDenominator, candidateBusinessDenominator
def pearsonCoefficientNumeratorHelper(currentBusinessDifferences,candidateBusinessDifferences):
#Numerator of W
numeratorW = sum([currentBusinessDifferences[x] * candidateBusinessDifferences[x] for x in range(len(candidateBusinessDifferences))])
return numeratorW
def pearsonCoefficientHelper(currentBusiness,candidateBusiness,currentBusinessRatings,candidateBusinessRatings,commonUsers):
#Ratings of currentBusiness and candidateBusiness by all common users
currentBusinessRatingsByCommonUsers = [businessAndRatedByUsersAndRatingDictionary[currentBusiness][user] for user in commonUsers]
candidateBusinessRatingsByCommonUsers = [businessAndRatedByUsersAndRatingDictionary[candidateBusiness][user] for user in commonUsers]
#Averages of ALL the ratings of currentBusiness and candidateBusiness
currentBusinessAverage = sum(currentBusinessRatings) / float(len(currentBusinessRatings))
candidateBusinessAverage = sum(candidateBusinessRatings) / float(len(candidateBusinessRatings))
#Diff of rating and avg rating for active business and business candidateBusiness, but only for common users
currentBusinessDifferences = [x - currentBusinessAverage for x in currentBusinessRatingsByCommonUsers]
candidateBusinessDifferences = [x - candidateBusinessAverage for x in candidateBusinessRatingsByCommonUsers]
currentBusinessDenominator, candidateBusinessDenominator= pearsonCoefficientDenominatorHelper(currentBusinessDifferences,candidateBusinessDifferences)
numeratorW=pearsonCoefficientNumeratorHelper(currentBusinessDifferences,candidateBusinessDifferences)
return numeratorW, currentBusinessDenominator, candidateBusinessDenominator
def pearsonCoefficient(x):
currentUser = x[0]
currentBusiness = x[1]
#All business rated by the current user
restBusinesses = x[2]
weightsAndOtherDetailsList = []
#All users that rated the currentBusiness
usersWhoRatedCurrentBusiness = businessAndRatedByUsersDictionary[currentBusiness]
#Ratings of currentBusiness by ALL the users that rated it
#currentBusinessRatings = [businessAndRatedByUsersAndRatingDictionary[currentBusiness][user] for user in usersWhoRatedCurrentBusiness]
currentBusinessRatings = businessRatingBroadcasted.value[currentBusiness]
for candidateBusiness in restBusinesses:
#All users who rated candidateBusiness
usersWhoRatedCandidateBusiness = businessAndRatedByUsersDictionary[candidateBusiness]
#Ratings of candidateBusiness by all the users who rated it
#candidateBusinessRatings = [businessAndRatedByUsersAndRatingDictionary[candidateBusiness][user] for user in usersWhoRatedCandidateBusiness]
candidateBusinessRatings = businessRatingBroadcasted.value[candidateBusiness]
#Common Users to currentBusiness and candidateBusiness
commonUsers = usersWhoRatedCandidateBusiness.intersection(usersWhoRatedCurrentBusiness)
#When no common users
if len(commonUsers) == 0:
s1=sum(currentBusinessRatings)/len(currentBusinessRatings)
s2=sum(candidateBusinessRatings)/len(candidateBusinessRatings)
w1=float(s1)/s2
w2=float(s2)/s1
wFinal=min(w1,w2)
weightsAndOtherDetailsList.append([candidateBusiness, wFinal, businessAndRatedByUsersAndRatingDictionary[candidateBusiness][currentUser]])
continue
numeratorW, currentBusinessDenominator, candidateBusinessDenominator= pearsonCoefficientHelper(currentBusiness,candidateBusiness,currentBusinessRatings,candidateBusinessRatings,commonUsers)
weight=0
#If denominator equals 0(implies numerator also equals 0) then set weight to 1
if(currentBusinessDenominator==0 or candidateBusinessDenominator==0):
weight=1
else:
weight = numeratorW / (currentBusinessDenominator * candidateBusinessDenominator)
weightsAndOtherDetailsList.append([candidateBusiness, weight, businessAndRatedByUsersAndRatingDictionary[candidateBusiness][currentUser]])
#Sorted to get top 50 W
sortedWeightsAndOtherDetailsList = sorted(weightsAndOtherDetailsList, key=lambda x: -x[1])
return [(currentUser, currentBusiness), sortedWeightsAndOtherDetailsList[:50]]
def predictRatings(data):
currentUser = data[0][0]
currentBusiness = data[0][1]
weightsList = data[1]
numeratorP = 0
denominatorP = 0
for w in weightsList:
if(w[1]>0):
numeratorP += w[1] * float(w[2])
denominatorP += abs(w[1])
prediction = 3.0
if (denominatorP != 0):
prediction = numeratorP / float(denominatorP)
if(prediction>5):
prediction=5.0
elif(prediction<1):
prediction=1.0
return [(currentUser), {currentBusiness: prediction}]
trainRDD = sc.textFile(trainFile).map(lambda line: line.split(","))
trainRDDFirstRow = trainRDD.first()
trainRDDFinal = trainRDD.filter(lambda row: row != trainRDDFirstRow)
testRDD = sc.textFile(testFile).map(lambda line: line.split(","))
testRDDFirstRow = testRDD.first()
testRDDFinal=testRDD.filter(lambda row: row != testRDDFirstRow)
#RDD containing user and its ratings
userRating = trainRDDFinal.map(lambda row: (row[0], {float(row[2])})).reduceByKey(lambda a, b: a | b)
#Dictionary of the above
userRatingDictionary = dict(userRating.collect())
#Broadcasting the above dictionary
userRatingBroadcasted = sc.broadcast(userRatingDictionary)
#RDD containing business and its ratings
businessRating = trainRDDFinal.map(lambda row: (row[1], {float(row[2])})).reduceByKey(lambda a, b: a | b)
#Dictionary of the above
businessRatingDictionary = dict(businessRating.collect())
#Broadcasting the above dictionary
businessRatingBroadcasted = sc.broadcast(businessRatingDictionary)
#RDD containing all distinct businesses in training set
trainBusinessRDD = trainRDDFinal.map(lambda row: row[1]).distinct()
#Set containing all distinct business in training set
trainBusinessSet = set(trainBusinessRDD.collect())
#RDD with user, business from test set
testDataRDD = testRDDFinal.map(lambda row: (row[0], row[1]))
#RDD containing user and all its rated business
userAndRatedBusinessesRDD = trainRDDFinal.map(lambda row: (row[0], {row[1]})).reduceByKey(lambda a, b: a | b)
#Dictionary of the above
userAndRatedBusinessesDictionary = dict(userAndRatedBusinessesRDD.collect())
#Broadcasting the above dictionary
userAndRatedBusinessesBroadcasted = sc.broadcast(userAndRatedBusinessesDictionary)
#RDD containing business and all users that rated it
businessAndRatedByUsersRDD = trainRDDFinal.map(lambda row: (row[1], {row[0]})).reduceByKey(lambda a, b: a | b)
#Dictionary of the above
businessAndRatedByUsersDictionary = dict(businessAndRatedByUsersRDD.collect())
#RDD containing business as key and user and rating as value
businessAndRatedByUsersAndRatingRDD = trainRDDFinal.map(lambda row: (row[1], {row[0]: float(row[2])})).reduceByKey(lambda x, y: {**x, **y})
#Dictionary of the above
businessAndRatedByUsersAndRatingDictionary = dict(businessAndRatedByUsersAndRatingRDD.collect())
#RDD of TEST user, business where business is present in the train dataset
testDataFilteredRDD = testDataRDD.filter(lambda x: x[1] in trainBusinessSet)
#RDD of TEST user, business, all business rated by the user
userBusinessAndOtherRatedBusinessesRDD = testDataFilteredRDD.map(lambda row: (row[0], row[1], userAndRatedBusinessesBroadcasted.value[row[0]]))
#RDD of TEST key: (currentUser, businessToBePredicted) and value: top 50 similar business as per W-> (business, W, Rating of this business by currentUser)
top50BusinessRDD = userBusinessAndOtherRatedBusinessesRDD.map(pearsonCoefficient)
#RDD of TEST currentUser as key and currentBusiness, predicted_rating as its value
predictedRatingsRDD = top50BusinessRDD.map(predictRatings).reduceByKey(lambda x, y: {**x, **y})
#Dictionary of the above
predictedRatingsDictionary = dict(predictedRatingsRDD.collect())
#Test data's RDD with key as User and value as Business and Rating
#Remove Rating Part when upload on VOCAREUM as test data does not consider rating
#Also make reduceByKey as x | y on VOCAREUM
testUserBusinessAndRatingRDD = testRDDFinal.map(lambda row: (row[0], {row[1]})).reduceByKey(lambda x, y: x | y)
#Dictionary of the above
testUserBusinessAndRatingDictionary = dict(testUserBusinessAndRatingRDD.collect())
with open(outputFile, 'w') as file:
file.write("user_id, business_id, prediction\n")
for user in testUserBusinessAndRatingDictionary:
for business in testUserBusinessAndRatingDictionary[user]:
if user in predictedRatingsDictionary and business in predictedRatingsDictionary[user]:
file.write(user + "," + business + "," + str(float(predictedRatingsDictionary[user][business])) + "\n")
#COLD START CASES
else:
#When no business and user present in training data
if user not in predictedRatingsDictionary and business not in predictedRatingsDictionary[user]:
file.write(user + "," + business + "," + str(float(3)) + "\n")
#When no business present in training data
elif business not in predictedRatingsDictionary[user]:
userRatingArray=userRatingBroadcasted.value[user]
userAverageRating=sum(userRatingArray)/float(len(userRatingArray))
file.write(user + "," + business + "," + str(float(userAverageRating)) + "\n")
#When no user present in training data
else:
businessRatingArray=businessRatingBroadcasted.value[business]
businessAverageRating=sum(businessRatingArray)/float(len(businessRatingArray))
file.write(user + "," + business + "," + str(float(businessAverageRating)) + "\n")
endTime=time.time()
print("Duration : ", endTime - startTime)