import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("yarn") \
    .appName("chris-adhoc") \
    .config('spark.driver.extraLibraryPath', "<--native-libs-->") \
    .config('spark.executor.extraLibraryPath', "<--native-libs-->") \
    .config('spark.driver.cores', "8") \
    .config('spark.driver.memory', "8g") \
    .config('spark.executor.cores', "8") \
    .config('spark.executor.memory', "16g") \
    .config('spark.executor.instances', "16") \
    .config('spark.serializer', "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.warehouse.dir", "<--warehouse-location-->") \
    .enableHiveSupport()\
    .getOrCreate()
%matplotlib inline
inputDF = spark.read.option("delimiter", " ").csv("<--file-location-->/2015_07_22_mktplace_shop_web_log_sample.log.gz")

Check data

inputDF.count()
1158500
sampledRows= inputDF.take(10)
sampledRows
[Row(_c0='2015-07-22T09:00:28.019143Z', _c1='marketpalce-shop', _c2='123.242.248.130:54635', _c3='10.0.6.158:80', _c4='0.000022', _c5='0.026109', _c6='0.00002', _c7='200', _c8='200', _c9='0', _c10='699', _c11='GET https://paytm.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:27.894580Z', _c1='marketpalce-shop', _c2='203.91.211.44:51402', _c3='10.0.4.150:80', _c4='0.000024', _c5='0.15334', _c6='0.000026', _c7='200', _c8='200', _c9='0', _c10='1497', _c11='GET https://paytm.com:443/shop/wallet/txnhistory?page_size=10&page_number=0&channel=web&version=2 HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.1; rv:39.0) Gecko/20100101 Firefox/39.0', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:27.885745Z', _c1='marketpalce-shop', _c2='1.39.32.179:56419', _c3='10.0.4.244:80', _c4='0.000024', _c5='0.164958', _c6='0.000017', _c7='200', _c8='200', _c9='0', _c10='157', _c11='GET https://paytm.com:443/shop/wallet/txnhistory?page_size=10&page_number=0&channel=web&version=2 HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:28.048369Z', _c1='marketpalce-shop', _c2='180.179.213.94:48725', _c3='10.0.6.108:80', _c4='0.00002', _c5='0.002333', _c6='0.000021', _c7='200', _c8='200', _c9='0', _c10='35734', _c11='GET https://paytm.com:443/shop/p/micromax-yu-yureka-moonstone-grey-MOBMICROMAX-YU-DUMM141CD60AF7C_34315 HTTP/1.0', _c12='-', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:28.036251Z', _c1='marketpalce-shop', _c2='120.59.192.208:13527', _c3='10.0.4.217:80', _c4='0.000024', _c5='0.015091', _c6='0.000016', _c7='200', _c8='200', _c9='68', _c10='640', _c11='POST https://paytm.com:443/papi/v1/expresscart/verify HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.89 Safari/537.36', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:28.033793Z', _c1='marketpalce-shop', _c2='117.239.195.66:50524', _c3='10.0.6.195:80', _c4='0.000024', _c5='0.02157', _c6='0.000021', _c7='200', _c8='200', _c9='0', _c10='60', _c11='GET https://paytm.com:443/api/user/favourite?channel=web&version=2 HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:28.055029Z', _c1='marketpalce-shop', _c2='101.60.186.26:33177', _c3='10.0.4.244:80', _c4='0.00002', _c5='0.001098', _c6='0.000022', _c7='200', _c8='200', _c9='0', _c10='1150', _c11='GET https://paytm.com:443/favicon.ico HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.3; rv:27.0) Gecko/20100101 Firefox/27.0', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:28.050298Z', _c1='marketpalce-shop', _c2='59.183.41.47:62014', _c3='10.0.4.227:80', _c4='0.000021', _c5='0.008161', _c6='0.000021', _c7='200', _c8='200', _c9='0', _c10='72', _c11='GET https://paytm.com:443/papi/rr/products/6937770/statistics?channel=web&version=2 HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:28.059081Z', _c1='marketpalce-shop', _c2='117.239.195.66:50538', _c3='10.0.4.227:80', _c4='0.000019', _c5='0.001035', _c6='0.000021', _c7='200', _c8='200', _c9='0', _c10='396', _c11='GET https://paytm.com:443/images/greyStar.png HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2'),
 Row(_c0='2015-07-22T09:00:28.054939Z', _c1='marketpalce-shop', _c2='183.83.237.83:49687', _c3='10.0.6.108:80', _c4='0.000023', _c5='0.008762', _c6='0.000021', _c7='200', _c8='200', _c9='0', _c10='214', _c11='GET https://paytm.com:443/shop/cart?channel=web&version=2 HTTP/1.1', _c12='Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.124 Safari/537.36', _c13='ECDHE-RSA-AES128-GCM-SHA256', _c14='TLSv1.2')]
inputDF.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
inputDF.createOrReplaceTempView("input")

Apply a schema (name the columns we need)

spark.sql("""select _c0 as timestamp, _c1 as elb, _c2 as client_ip from input""").createOrReplaceTempView("normInput")

Check unique clients

spark.sql("""select count(distinct(client_ip)) from normInput""").toPandas()
(index)  count(DISTINCT client_ip)
0 404391
spark.sql("""
select client_ip, max(unix_timestamp(timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) - min(unix_timestamp(timestamp, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) as max_duration from normInput group by client_ip
""").createOrReplaceTempView("max_duration_tbl")
max_duration_distribution = spark.sql("""
select max_duration, count(*) as client_count
from max_duration_tbl
group by max_duration
""").toPandas()

Check the distribution of per-client durations

max_duration_distribution.sort_values("client_count", ascending=False)
(index)  max_duration  client_count
470 0 213999
9623 819 320
4134 820 287
6665 900 279
3622 880 276
18297 887 273
4918 856 272
17706 862 272
13645 898 271
1932 876 271
10771 919 269
6462 833 269
16396 895 267
15518 881 267
15516 809 267
7378 818 266
7982 911 266
7035 890 266
5481 861 266
15023 860 265
7481 806 264
8098 867 262
2448 909 262
12923 931 261
7372 874 261
4132 915 261
17099 848 261
6748 896 261
6935 859 260
11017 922 260
... ... ...
7722 32708 1
7757 57644 1
7724 11523 1
7725 46133 1
7726 58018 1
7727 48566 1
7728 9442 1
7729 43369 1
7730 13272 1
7731 33172 1
7732 14011 1
7733 8883 1
7735 17352 1
7737 46261 1
7738 54709 1
7739 18226 1
7740 26233 1
7741 65776 1
7744 4980 1
7745 2147 1
7746 55424 1
7747 8835 1
7748 17837 1
7750 44074 1
7751 57357 1
7752 42626 1
7753 31023 1
7754 54557 1
7756 28381 1
19595 8826 1

19596 rows × 2 columns

max_duration_distribution.sort_values("max_duration", ascending=False)
(index)  max_duration  client_count
15778 67414 1
3677 67349 1
15594 67348 1
12403 67296 1
4980 67280 1
6416 67277 1
1895 67275 1
14674 67236 1
14216 67225 1
10852 67194 1
15980 67191 1
12046 67185 1
12231 67182 1
11737 67175 1
168 67169 1
4287 67168 1
10964 67144 2
13364 67138 1
6559 67132 1
7031 67124 1
12140 67114 2
15187 67112 1
4587 67111 1
15337 67109 1
741 67107 1
13598 67106 1
5756 67099 1
12699 67098 1
7293 67095 1
7682 67093 1
... ... ...
3 29 88
6287 28 90
4924 27 67
2 26 90
3627 25 77
19403 24 83
16810 23 84
871 22 76
15523 21 67
17107 20 94
354 19 85
14631 18 80
5656 17 85
18624 16 82
15989 15 78
15026 14 72
13965 13 69
10846 12 78
11691 11 87
7488 10 79
4808 9 85
11221 8 77
1095 7 91
4420 6 100
6666 5 84
13649 4 88
10389 3 103
12402 2 75
6747 1 116
470 0 213999

19596 rows × 2 columns

Check the duration distribution within 15 min

max_duration_distribution = spark.sql("""
select int(max_duration / 60) as duration_bucket, count(*) as client_count
from max_duration_tbl
where max_duration > 0 and max_duration < 15 * 60
group by int(max_duration / 60)
""").toPandas()
max_duration_distribution.sort_values("duration_bucket") # Bucketed in minute
(index)  duration_bucket  client_count
14 0 5202
1 1 6924
13 2 7384
4 3 8701
7 4 8434
5 5 8207
3 6 8551
9 7 9346
8 8 9307
6 9 10443
10 10 10396
11 11 11104
0 12 12490
2 13 14140
12 14 14953
max_duration_distribution.sort_values("duration_bucket").plot.bar()
<matplotlib.axes._subplots.AxesSubplot at 0x7f809bc98630>

[figure: bar chart of the bucketed duration distribution, 0-14 minutes]

Check the duration distribution within 30 min

max_duration_distribution_30m = spark.sql("""
select int(max_duration / 60) as duration_bucket, count(*) as client_count
from max_duration_tbl
where max_duration > 0 and max_duration < 30 * 60
group by int(max_duration / 60)
""").toPandas()
max_duration_distribution_30m.sort_values("duration_bucket") # Bucketed in minutes
(index)  duration_bucket  client_count
28 0 5202
5 1 6924
27 2 7384
9 3 8701
16 4 8434
11 5 8207
7 6 8551
19 7 9346
17 8 9307
14 9 10443
20 10 10396
25 11 11104
3 12 12490
6 13 14140
26 14 14953
13 15 13129
8 16 4747
15 17 1024
29 18 350
12 19 138
10 20 107
24 21 71
4 22 70
18 23 68
22 24 62
21 25 69
2 26 57
1 27 54
0 28 68
23 29 41
max_duration_distribution_30m.sort_values("duration_bucket").plot.bar(x="duration_bucket", y="client_count")
<matplotlib.axes._subplots.AxesSubplot at 0x7f80a375e5f8>

[figure: bar chart of client_count by duration_bucket, 0-29 minutes]
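
To complement the bucket plots, the share of clients with a non-zero span that falls inside a given window can be computed directly against max_duration_tbl; a quick sketch:

spark.sql("""
select sum(case when max_duration < 15 * 60 then 1 else 0 end) / count(*) as share_within_15_min,
       sum(case when max_duration < 30 * 60 then 1 else 0 end) / count(*) as share_within_30_min
from max_duration_tbl
where max_duration > 0
""").toPandas()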

Time-wise distribution

spark.sql("""
select substring(timestamp, 0, 13), count(*) from normInput
group by substring(timestamp, 0, 13)
order by substring(timestamp, 0, 13)
""").toPandas()
(index)  substring(timestamp, 0, 13)  count(1)
0 2015-07-22T02 26791
1 2015-07-22T05 62310
2 2015-07-22T06 63185
3 2015-07-22T07 1282
4 2015-07-22T09 99999
5 2015-07-22T10 299997
6 2015-07-22T11 100135
7 2015-07-22T12 12
8 2015-07-22T13 201
9 2015-07-22T15 184
10 2015-07-22T16 299997
11 2015-07-22T17 97369
12 2015-07-22T18 80789
13 2015-07-22T19 97
14 2015-07-22T21 26152
hourly_counts.plot.bar(x="substring(timestamp, 0, 13)", y="count(1)")
<matplotlib.axes._subplots.AxesSubplot at 0x7f809bb882e8>

[figure: bar chart of request count per hour of 2015-07-22]
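
The substring-based hour extraction relies on the fixed ISO-8601 layout of the timestamps. An equivalent sketch using Spark's timestamp functions (assumes Spark 2.3+ for date_trunc) would be:

spark.sql("""
select date_trunc('hour', to_timestamp(substring(timestamp, 0, 19), "yyyy-MM-dd'T'HH:mm:ss")) as hour,
       count(*) as request_count
from normInput
group by date_trunc('hour', to_timestamp(substring(timestamp, 0, 19), "yyyy-MM-dd'T'HH:mm:ss"))
order by hour
""").toPandas()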