-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuser_microservice.py
More file actions
105 lines (87 loc) · 4.96 KB
/
user_microservice.py
File metadata and controls
105 lines (87 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from locust import HttpLocust, TaskSet, task
import json
class UserBehavior(TaskSet):
    """Load-test scenario that walks through the user microservice endpoints.

    Each simulated user logs in once as ``load_test_user`` (``on_start``),
    repeatedly runs ``user_lifetime`` against a ``temp_user`` account, and
    deletes every ``temp_user`` it can find when it stops (``on_stop``).
    """

    # Single source of truth for the target host. Every request goes over
    # HTTPS so the bearer token is never sent in plaintext.
    BASE_URL = "https://usermicroservice-climatetree.azurewebsites.net"
    USER_API = BASE_URL + "/v1/user"

    def on_start(self):
        """Authenticate and cache the auth header before any task is scheduled."""
        self.token = self.login()
        self.headers = {'Authorization': 'Bearer ' + self.token}
        self.all_ids = []

    def on_stop(self):
        """Delete every ``temp_user`` account found by name search."""
        # Search all test users by name.
        by_name = self.client.get(
            self.USER_API + "/searchname/temp_user", headers=self.headers
        )
        for user in self._users_from(by_name):
            if user["userId"] not in self.all_ids:
                self.all_ids.append(user["userId"])
        # Delete all test users collected so far.
        for user_id in self.all_ids:
            self.client.delete(
                self.USER_API + "/" + str(user_id), headers=self.headers
            )

    @staticmethod
    def _users_from(response):
        """Return the ``"users"`` list from a search response, or ``[]``.

        A failed request or a non-JSON body yields an empty list so that the
        calling task keeps running; Locust already records the failed request.
        """
        try:
            payload = response.json()
        except ValueError:
            return []
        if not isinstance(payload, dict):
            return []
        return payload.get("users") or []

    def login(self):
        """Log in as ``load_test_user`` and return the JWT from the response.

        Raises:
            ValueError: if the response body is not valid JSON.
            KeyError: if the body has no ``jwtToken`` field.
        """
        response = self.client.post(self.USER_API + "/login", json={
            "username": "load_test_user",
            "email": "load_test_user@gmail.com",
        })
        return response.json()['jwtToken']

    @task(1)
    def user_lifetime(self):
        """Exercise login, search, role, blacklist and role-request endpoints."""
        # Log in as temp_user (the original comment describes this call as
        # "create another user"). The response is not needed.
        self.client.post(self.USER_API + "/login", json={
            "username": "temp_user",
            "email": "temp_user@gmail.com",
        })

        bad_credentials = {
            "username": "load_test_user",
            "email": "<load_test_user@gmail.com>",
        }

        # Negative test: a 401 is the expected outcome. Anything else is
        # reported as a failure so an unexpected 2xx is not counted as a pass.
        with self.client.post(self.BASE_URL + '/login', json=bad_credentials,
                              catch_response=True) as unauthorized:
            if unauthorized.status_code == 401:
                unauthorized.success()
            else:
                unauthorized.failure(
                    "expected 401, got %s" % unauthorized.status_code
                )

        # Negative test: a 405 (method not allowed) is the expected outcome.
        with self.client.post(self.USER_API + '/login1', json=bad_credentials,
                              catch_response=True) as method:
            if method.status_code == 405:
                method.success()
            else:
                method.failure("expected 405, got %s" % method.status_code)

        # Search user by name.
        by_name = self.client.get(
            self.USER_API + "/searchname/temp_user", headers=self.headers
        )
        # Search user by role.
        self.client.get(self.USER_API + "/searchrole/3", headers=self.headers)
        # Search user by email.
        self.client.get(
            self.USER_API + "/searchemail/temp_user@gmail.com",
            headers=self.headers,
        )
        # Search blacklisted users.
        # NOTE(review): this request is sent without the auth header, as in
        # the original -- confirm whether that is intentional.
        self.client.get(self.USER_API + "/flagged_users")

        # Parse the search result once and reuse it for all follow-up calls.
        all_users = self._users_from(by_name)
        if all_users:
            first_user = all_users[0]
            cur_user_id = str(first_user["userId"])

            # Update user role. Both checks use the role read from the search
            # result, so a user starting in neither role gets both updates.
            role_id = first_user["role"]["roleId"]
            if role_id is not None and role_id != 2:
                self.client.put(
                    self.USER_API + "/" + cur_user_id + "/2",
                    headers=self.headers,
                )
            if role_id is not None and role_id != 3:
                self.client.put(
                    self.USER_API + "/" + cur_user_id + "/3",
                    headers=self.headers,
                )

            # Toggle the blacklist state of the user.
            if first_user["blacklisted"]:
                toggle_url = self.USER_API + "/unblacklist/" + cur_user_id
            else:
                toggle_url = self.USER_API + "/blacklist/" + cur_user_id
            self.client.put(toggle_url, headers=self.headers)

            # Request a role change for the user that was actually found,
            # rather than for a hard-coded user id.
            role_change_url = (
                self.USER_API + "/request_role_change/" + cur_user_id + "/2"
            )
            self.client.post(role_change_url, headers=self.headers)

        # Get all role change requests.
        self.client.get(
            self.USER_API + "/get_all_role_update_requests",
            headers=self.headers,
        )
class WebsiteUser(HttpLocust):
    """Simulated HTTP user that runs the ``UserBehavior`` task set."""

    # Tasks this user executes.
    task_set = UserBehavior

    # Lower and upper bounds of the wait between tasks (Locust documents
    # min_wait/max_wait as milliseconds).
    min_wait = 5000
    max_wait = 9000