Skip to content

Commit 5957b8e

Browse files
✅ test: add Python integration tests for security group
1 parent 91b862e commit 5957b8e

File tree

3 files changed

+172
-0
lines changed

3 files changed

+172
-0
lines changed

tests/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

tests/integration_utils.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import base64
2+
import random
3+
import string
4+
import time
5+
6+
def random_string(length=10):
7+
alphabet = string.ascii_lowercase + string.digits
8+
return "".join(random.choice(alphabet) for _ in range(length))
9+
10+
11+
def tagged_name(prefix="osc-sdk-python-test", length=10):
12+
return "{}-{}".format(prefix, random_string(length))
13+
14+
15+
def log_step(message):
16+
print("[tests] {}".format(message), flush=True)
17+
18+
19+
def wait_until(fetch, ready, timeout=300, interval=10, failure=None, describe=None):
20+
deadline = time.time() + timeout
21+
last_value = None
22+
while time.time() < deadline:
23+
last_value = fetch()
24+
if describe is not None:
25+
log_step(describe(last_value))
26+
if ready(last_value):
27+
return last_value
28+
if failure is not None and failure(last_value):
29+
raise AssertionError("Resource entered an unexpected state: {}".format(last_value))
30+
time.sleep(interval)
31+
raise AssertionError("Timed out waiting for resource state: {}".format(last_value))
32+
33+
34+
def first(items, message):
35+
if not items:
36+
raise AssertionError(message)
37+
return items[0]
38+
39+
40+
def first_subregion_name(gw):
41+
subregions = gw.ReadSubregions()
42+
subregion = first(subregions.get("Subregions"), "No subregions returned")
43+
name = subregion.get("SubregionName")
44+
if not name:
45+
raise AssertionError("SubregionName is missing")
46+
return name
47+
48+
49+
def latest_public_ubuntu_image_id(gw):
50+
images = gw.ReadImages(
51+
Filters={
52+
"AccountAliases": ["Outscale"],
53+
"ImageNames": ["Ubuntu*"],
54+
"PermissionsToLaunchGlobalPermission": True,
55+
"States": ["available"],
56+
},
57+
ResultsPerPage=10,
58+
).get("Images", [])
59+
image = first(
60+
sorted(images, key=lambda item: item.get("CreationDate", ""), reverse=True),
61+
"No public Ubuntu image returned",
62+
)
63+
image_id = image.get("ImageId")
64+
if not image_id:
65+
raise AssertionError("ImageId is missing")
66+
return image_id
67+
68+
69+
def read_single(gw, action, key, resource_id_key, resource_id):
70+
response = getattr(gw, action)(
71+
Filters={
72+
resource_id_key: [resource_id],
73+
}
74+
)
75+
items = response.get(key)
76+
if not items or len(items) != 1:
77+
raise AssertionError(
78+
"{} did not return exactly one resource for {}".format(action, resource_id)
79+
)
80+
return items[0]
81+
82+
83+
def create_name_tag(resource_id):
84+
return {
85+
"ResourceIds": [resource_id],
86+
"Tags": [{"Key": "Name", "Value": tagged_name("osc-sdk-python-tag")}],
87+
}
88+
89+
90+
def linux_http_user_data():
91+
script = """#!/bin/sh
92+
set -eu
93+
mkdir -p /tmp/python-sdk-lb
94+
echo ok > /tmp/python-sdk-lb/index.html
95+
nohup python3 -m http.server 80 --directory /tmp/python-sdk-lb >/tmp/python-sdk-lb/http.log 2>&1 &
96+
"""
97+
return base64.b64encode(script.encode("utf-8")).decode("ascii")

tests/test_security_group.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import sys
2+
import unittest
3+
4+
sys.path.append("..")
5+
from osc_sdk_python import Gateway
6+
from tests.integration_utils import log_step, tagged_name
7+
8+
9+
class TestSecurityGroup(unittest.TestCase):
10+
def test_security_group_lifecycle(self):
11+
gw = Gateway()
12+
security_group_id = None
13+
tcp = "tcp"
14+
ip_range = "0.0.0.0/0"
15+
try:
16+
log_step("Creating security group")
17+
response = gw.CreateSecurityGroup(
18+
SecurityGroupName=tagged_name("osc-sdk-python-sg"),
19+
Description="Test security group lifecycle",
20+
)
21+
security_group = response.get("SecurityGroup")
22+
self.assertIsInstance(security_group, dict)
23+
security_group_id = security_group.get("SecurityGroupId")
24+
self.assertTrue(security_group_id)
25+
log_step("Created security group {}".format(security_group_id))
26+
27+
log_step("Creating inbound SSH rule on {}".format(security_group_id))
28+
rule_response = gw.CreateSecurityGroupRule(
29+
SecurityGroupId=security_group_id,
30+
Flow="Inbound",
31+
IpProtocol=tcp,
32+
FromPortRange=22,
33+
ToPortRange=22,
34+
IpRange=ip_range,
35+
)
36+
self.assertIsInstance(rule_response.get("SecurityGroup"), dict)
37+
38+
log_step("Reading security group {}".format(security_group_id))
39+
read_response = gw.ReadSecurityGroups(
40+
Filters={"SecurityGroupIds": [security_group_id]}
41+
)
42+
security_groups = read_response.get("SecurityGroups")
43+
self.assertIsInstance(security_groups, list)
44+
self.assertEqual(len(security_groups), 1)
45+
46+
rules = security_groups[0].get("InboundRules", [])
47+
self.assertTrue(
48+
any(
49+
rule.get("FromPortRange") == 22
50+
and rule.get("ToPortRange") == 22
51+
and rule.get("IpProtocol") == tcp
52+
and ip_range in rule.get("IpRanges", [])
53+
for rule in rules
54+
),
55+
"expected SSH inbound rule on the security group",
56+
)
57+
58+
log_step("Deleting inbound SSH rule on {}".format(security_group_id))
59+
gw.DeleteSecurityGroupRule(
60+
SecurityGroupId=security_group_id,
61+
Flow="Inbound",
62+
IpProtocol=tcp,
63+
FromPortRange=22,
64+
ToPortRange=22,
65+
IpRange=ip_range,
66+
)
67+
finally:
68+
if security_group_id:
69+
log_step("Deleting security group {}".format(security_group_id))
70+
gw.DeleteSecurityGroup(SecurityGroupId=security_group_id)
71+
72+
73+
if __name__ == "__main__":
74+
unittest.main()

0 commit comments

Comments
 (0)