-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreate-okta-group
More file actions
executable file
·122 lines (97 loc) · 3.46 KB
/
create-okta-group
File metadata and controls
executable file
·122 lines (97 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python
"""
Configurable via environment variables:
OKTA_API_SUBDOMAIN: an okta url subdomain like "example" for the url below
OKTA_API_URL: a url like https://example.okta.com/api/1
OKTA_API_KEY: a token from your Okta admin site at the path /admin/access/api/tokens
If a subdomain is provided it takes priority over a provided url.
This script then takes a minimum of two arguments:
the first being the name of a group and all
remaining arguments are usernames or emails of
users to add to that group.
"""
import argparse
import os
import sys
import requests
OKTA_API_SUBDOMAIN = os.environ.get('OKTA_API_SUBDOMAIN')
if OKTA_API_SUBDOMAIN:
OKTA_API_URL = f"https://{OKTA_API_SUBDOMAIN}.okta.com"
else:
OKTA_API_URL = os.environ.get('OKTA_API_URL')
OKTA_API_URL = OKTA_API_URL + '/api/v1'
OKTA_API_KEY = os.environ.get('OKTA_API_KEY')
OKTA_API_HEADERS = {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': f"SSWS {OKTA_API_KEY}"
}
OKTA_GROUP_ALREADY_EXISTS_MESSAGE = \
'name: An object with this field already' \
'exists in the current organization'
GROUP_IDS = {}
class UnauthorizedError(Exception):
pass
class UnknownError(Exception):
pass
def okta_api_post_request(path: str, json: dict = {}) -> requests.models.Response:
response = requests.post(
OKTA_API_URL + path,
headers=OKTA_API_HEADERS,
json=json
)
if response.status_code == 401:
raise UnauthorizedError
return response
def okta_api_get_request(path: str) -> requests.models.Response:
response = requests.get(
OKTA_API_URL + path,
headers=OKTA_API_HEADERS
)
if response.status_code == 401:
raise UnauthorizedError
return response
def add_okta_group(name: str, description: str = '') -> bool:
payload = dict(profile=dict(name=name, description=description))
response = okta_api_post_request('/groups', json=payload)
if response.status_code > 399:
raise UnknownError
else:
return True
def get_okta_group_id(name: str) -> str:
if name in GROUP_IDS:
return GROUP_IDS[name]
response = okta_api_get_request(f"/groups?q={name}")
response_json = response.json()
group_id = ''
if response.status_code == 200:
if len(response_json) == 1:
group_id = response_json[0]['id']
else:
for group in response_json:
if group['profile']['name'] == name:
group_id = group['id']
if not group_id:
print(f"No group with name {name} found", file=sys.stderr)
GROUP_IDS[name] = group_id
return group_id
def parse_args(args: list) -> argparse.Namespace:
parser = argparse.ArgumentParser(description='Create a group in Okta')
parser.add_argument('name')
parser.add_argument('-d', '--description', default='')
return parser.parse_args(args)
def main(args: list) -> None:
parameters = parse_args(args)
if get_okta_group_id(parameters.name):
print(f"A group in Okta matching {parameters.name} already exists.")
sys.exit(1)
else:
add_okta_group(name=parameters.name, description=parameters.description)
if __name__ == '__main__':
if not (OKTA_API_URL or OKTA_API_KEY or len(sys.argv) < 2):
print(__doc__)
sys.exit(2)
try:
main(sys.argv[1:])
except UnauthorizedError:
print('Request was unauthorized. Please check your Okta token and try again.', file=sys.stderr)