
Commit edc812a

Author: Kong Mu
Merge pull request #2 from visualskyrim/develop
Finish the first version
2 parents 6641804 + 4ebc616 · commit edc812a

22 files changed

Lines changed: 1955 additions & 42 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+.idea
+project/target/
+target/

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+# CHANGELOG
+
+# 1.0.0
+> 2020-04-07
+
+## Added
+- Basic functionality to sessionize the accesses based on IP.

README.md

Lines changed: 38 additions & 42 deletions
@@ -1,63 +1,59 @@
# DataEngineerChallenge

-This is an interview challenge for PayPay. Please feel free to fork. Pull Requests will be ignored.

-The challenge is to make make analytical observations about the data using the distributed tools below.
+## Overview

-## Processing & Analytical goals:
+This document describes the solution to https://github.com/Pay-Baymax/DataEngineerChallenge.
+This repo only shows the Spark solution.

-1. Sessionize the web log by IP. Sessionize = aggregrate all page hits by visitor/IP during a session.
-   https://en.wikipedia.org/wiki/Session_(web_analytics)

-2. Determine the average session time
+## Solution
+### Understand the input data
+This was my first step: I needed to look into the data to find out its size, schema and so on.
+So I used **Jupyter** to inspect the input data that I had uploaded to HDFS.

-3. Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.
+From this inspection, I learned:
+- The potential duration of a session can be extremely long (up to 11 hours).
+- Normal traffic is around 100k requests per hour in this data set, with peak traffic around 300k.
+- Most sessions are likely to end within about 20 minutes, and plenty of sessions end after 15 minutes.
+- The data covers 15 hours.

-4. Find the most engaged users, ie the IPs with the longest session times
+For the details of this inspection, please refer to the [inspection notebook](./doc/Data%20Inspect/Data%20Inspect.md).
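As an aside (not part of this diff): a minimal sketch of what such an inspection could look like in Spark, assuming a hypothetical HDFS path for the uploaded log and relying on the AWS ELB access-log layout, where the first space-separated field is the timestamp and the third is `client:port`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object InspectLog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("inspect-elb-log").getOrCreate()
    import spark.implicits._

    // Hypothetical HDFS path; the real location of the uploaded log may differ.
    val raw = spark.read.text("hdfs:///data/weblog/access.log.gz")

    // ELB access logs are space separated: field 0 is an ISO-8601 timestamp,
    // field 2 is "client-ip:port" (IPv4 assumed here).
    val parsed = raw
      .select(split($"value", " ").as("f"))
      .select(
        $"f".getItem(0).cast("timestamp").as("ts"),
        split($"f".getItem(2), ":").getItem(0).as("ip"))

    // Rough traffic profile: request count per hour.
    parsed.groupBy(window($"ts", "1 hour")).count().orderBy("window").show(24, truncate = false)
  }
}
```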

-## Additional questions for Machine Learning Engineer (MLE) candidates:
-1. Predict the expected load (requests/second) in the next minute
+### Design Consideration

-2. Predict the session length for a given IP
+#### Should I use streaming?

-3. Predict the number of unique URL visits by a given IP
+Absolutely, yes. If we think only about the mission, it makes perfect sense to process the traffic data in a streaming application.
+Normally I would set up Logstash to stream the access log from AWS to a Kafka topic, then build a streaming application to provide real-time analysis.
+However, given that the data actually comes as a packed file, I assume the scenario is more of a batch context.
+That's why I chose Spark to build a batch application.

-## Tools allowed (in no particular order):
-- Spark (any language, but prefer Scala or Java)
-- Pig
-- MapReduce (Hadoop 2.x only)
-- Flink
-- Cascading, Cascalog, or Scalding

-If you need Hadoop, we suggest
-HDP Sandbox:
-http://hortonworks.com/hdp/downloads/
-or
-CDH QuickStart VM:
-http://www.cloudera.com/content/cloudera/en/downloads.html
+#### How about the granularity of the batch?

+Based on the requirement, it would make little sense to compute the sessions of the first hour of the day only at the start of the next day in a daily batch.
+Why not show them within the next hour with an hourly batch?
+Besides, the timestamps in the data are in UTC, so introducing a concept of "day" would be confusing.

-### Additional notes:
-- You are allowed to use whatever libraries/parsers/solutions you can find provided you can explain the functions you are implementing in detail.
-- IP addresses do not guarantee distinct users, but this is the limitation of the data. As a bonus, consider what additional data would help make better analytical conclusions
-- For this dataset, complete the sessionization by time window rather than navigation. Feel free to determine the best session window time on your own, or start with 15 minutes.
-- The log file was taken from an AWS Elastic Load Balancer:
-  http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html#access-log-entry-format
+Another benefit of an hourly batch is that we can potentially reduce the cluster cost by using fewer resources to process hourly data instead of daily data.

+#### How should we deal with sessions that do not end within one hour?

+Since we need to calculate sessions in the next hour, and a session can theoretically last forever, we need two things:
+- Concatenate the accesses from the last hour that are not part of any ended session with the accesses of the current hour.
+- A limit on how long a session can last at most.

-## How to complete this challenge:
+We need the second one because, if some sessions last too long, we will run into a serious data skew problem.
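As an illustration (not part of this diff), here is a minimal sketch of that carry-over logic. The names, schema and thresholds are placeholders (a 15-minute inactivity gap and a 4-hour cap are assumptions, not necessarily the values used in this repo):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Simplified, hypothetical schema; the real job carries more fields from the ELB log.
case class Access(ip: String, ts: Long, url: String)                               // ts = epoch seconds
case class Session(ip: String, start: Long, end: Long, hits: Int, uniqueUrls: Int)

object Sessionize {
  val GapSeconds = 15 * 60          // inactivity gap that closes a session (assumed)
  val MaxSessionSeconds = 4 * 3600  // cap so a single session cannot span many batches (assumed)

  /** Split one IP's accesses into sessions, honouring the inactivity gap and the cap. */
  def cut(accesses: Seq[Access]): Seq[Vector[Access]] =
    accesses.sortBy(_.ts).foldLeft(Vector.empty[Vector[Access]]) { (acc, a) =>
      acc.lastOption match {
        case Some(cur) if a.ts - cur.last.ts <= GapSeconds &&
                          a.ts - cur.head.ts <= MaxSessionSeconds =>
          acc.init :+ (cur :+ a)
        case _ => acc :+ Vector(a)
      }
    }

  /** One hourly batch: pending accesses carried over from the previous hour plus this hour's accesses. */
  def run(spark: SparkSession, pending: Dataset[Access], current: Dataset[Access],
          hourEnd: Long): (Dataset[Session], Dataset[Access]) = {
    import spark.implicits._

    val grouped = pending.union(current).rdd
      .groupBy(_.ip)
      .mapValues(as => cut(as.toSeq))
      .cache()

    // A session has "ended" once the gap has elapsed after its last access by the end of the hour.
    val ended = grouped.flatMap { case (ip, sessions) =>
      sessions.filter(s => hourEnd - s.last.ts > GapSeconds).map { s =>
        Session(ip, s.head.ts, s.last.ts, s.size, s.map(_.url).distinct.size)
      }
    }.toDS()

    // Everything else is written out as pending accesses and becomes input for the next hourly batch.
    val stillPending = grouped.flatMap { case (_, sessions) =>
      sessions.filter(s => hourEnd - s.last.ts <= GapSeconds).flatten
    }.toDS()

    (ended, stillPending)
  }
}
```

The actual implementation in this repo may structure the job differently (for example with Spark SQL window functions); the sketch only makes the carry-over and capping idea concrete.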

-1. Fork this repo in github
-2. Complete the processing and analytics as defined first to the best of your ability with the time provided.
-3. Place notes in your code to help with clarity where appropriate. Make it readable enough to present to the PayPay interview team.
-4. Include the test code and data in your solution.
-5. Complete your work in your own github repo and send the results to us and/or present them during your interview.
+#### What should the output look like?

-## What are we looking for? What does this prove?
+According to the [Analytical goals](https://github.com/Pay-Baymax/DataEngineerChallenge#processing--analytical-goals),
+all the metrics of interest are defined on the **session**, not on individual accesses.
+That being so, it makes more sense to me to output sessions with these metrics directly, rather than outputting accesses with a session id attached to them.
+This gives us two benefits:
+- It is easier and faster to calculate the duration, the number of sessions and the average accesses per session, since they are already aggregated at the session level.
+- It avoids the confusion of "*If a session lasts for two hours and we then check the number of sessions for each hour, should this session count as one session in each hour?*"

-We want to see how you handle:
-- New technologies and frameworks
-- Messy (ie real) data
-- Understanding data transformation
-This is not a pass or fail test, we want to hear about your challenges and your successes with this particular problem.
+Other than that, we also output the pending accesses that have not yet been cut into a session. This result is used as the input for the next hour's batch.
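To make the session-level output concrete (again an illustration, not part of this diff), the requested metrics could then be read straight off that output, reusing the hypothetical `Session` schema from the sketch above:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._

// Same hypothetical session-level schema as in the sketch above.
case class Session(ip: String, start: Long, end: Long, hits: Int, uniqueUrls: Int)

object SessionMetrics {
  def report(spark: SparkSession, sessions: Dataset[Session]): Unit = {
    import spark.implicits._

    // Average session time, in seconds.
    sessions.agg(avg($"end" - $"start").as("avg_session_seconds")).show()

    // Unique URL visits per session are already pre-aggregated into `uniqueUrls`.
    sessions.select($"ip", $"uniqueUrls").show(10, truncate = false)

    // Most engaged users: IPs with the longest total session time.
    sessions.groupBy($"ip")
      .agg(sum($"end" - $"start").as("total_session_seconds"))
      .orderBy(desc("total_session_seconds"))
      .show(10, truncate = false)
  }
}
```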

build.sbt

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+
+lazy val versions = new {
+  val sessionize = "1.0.0"
+  val jodaTime = "2.9.3"
+  val log4j = "1.2.17"
+  val scalatest = "3.0.3"
+  val sparkVersion = "2.3.3"
+  val typesafe = "1.3.1"
+}
+
+lazy val root = (project in file("."))
+  .configs(Test)
+  .settings(
+    inThisBuild(List(
+      organization := "com.rakuten.rat",
+      scalaVersion := "2.11.8",
+      version := versions.sessionize
+    )),
+    name := "sessionize",
+    fork := true,
+    parallelExecution := false,
+    libraryDependencies ++= Seq(
+      "org.apache.thrift" % "libthrift" % "0.11.0",
+      "org.apache.spark" %% "spark-core" % versions.sparkVersion % "provided",
+      "org.apache.spark" %% "spark-sql" % versions.sparkVersion % "provided",
+      "org.apache.spark" %% "spark-hive" % versions.sparkVersion % "provided",
+      "com.typesafe" % "config" % versions.typesafe,
+      "joda-time" % "joda-time" % versions.jodaTime,
+      "log4j" % "log4j" % versions.log4j,
+      "log4j" % "apache-log4j-extras" % versions.log4j,
+      "com.sksamuel.elastic4s" %% "elastic4s-core" % "5.6.0",
+      "com.sksamuel.elastic4s" %% "elastic4s-http" % "5.6.0",
+      "org.json4s" %% "json4s-native" % "3.2.11",
+      "org.json4s" %% "json4s-jackson" % "3.2.11",
+      "org.scalatest" %% "scalatest" % versions.scalatest % "test",
+      "com.github.scopt" %% "scopt" % "3.6.0"
+    )
+  )
+
+parallelExecution in Test := false
+parallelExecution := false
+
+//unmanagedSourceDirectories in Compile += baseDirectory.value / "src" / "main" / "thrift-java"
+//scroogeThriftSourceFolder in Compile := { baseDirectory.value / "whitelist-store-service/src/main/thrift" }
+
+lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
+
+/* scalastyle >= 0.9.0 */
+compileScalastyle := scalastyle.in(Compile).toTask("").value
+
+lazy val upgrade = TaskKey[Unit]("upgrade", "Upgrade version")
+
+(compile in Compile) := ((compile in Compile) dependsOn compileScalastyle).value
