Skip to content

Commit bc7d4ae

Browse files
✅ test: add python integration tests for LB
1 parent 60abd9e commit bc7d4ae

File tree

1 file changed

+124
-0
lines changed

1 file changed

+124
-0
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import sys
2+
import time
3+
import unittest
4+
5+
sys.path.append("..")
6+
from osc_sdk_python import Gateway
7+
from tests.integration_utils import (
8+
create_name_tag,
9+
first_subregion_name,
10+
latest_public_ubuntu_image_id,
11+
linux_http_user_data,
12+
log_step,
13+
read_single,
14+
tagged_name,
15+
)
16+
17+
18+
class TestLoadBalancerBackend(unittest.TestCase):
19+
def test_load_balancer_backend_lifecycle(self):
20+
gw = Gateway()
21+
subregion_name = first_subregion_name(gw)
22+
image_id = latest_public_ubuntu_image_id(gw)
23+
log_step("Using subregion {} and image {}".format(subregion_name, image_id))
24+
vm_id = None
25+
load_balancer_name = tagged_name("osc-sdk-python-lb")
26+
load_balancer_created = False
27+
try:
28+
log_step("Creating backend VM")
29+
vm_response = gw.CreateVms(
30+
ImageId=image_id,
31+
MinVmsCount=1,
32+
MaxVmsCount=1,
33+
Placement={"SubregionName": subregion_name, "Tenancy": "default"},
34+
UserData=linux_http_user_data(),
35+
VmType="tinav4.c1r1p2",
36+
)
37+
vms = vm_response.get("Vms")
38+
self.assertIsInstance(vms, list)
39+
self.assertEqual(len(vms), 1)
40+
vm_id = vms[0].get("VmId")
41+
self.assertTrue(vm_id)
42+
log_step("Created backend VM {}".format(vm_id))
43+
44+
gw.CreateTags(**create_name_tag(vm_id))
45+
log_step("Tagged backend VM {}".format(vm_id))
46+
47+
for _ in range(36):
48+
vm = read_single(gw, "ReadVms", "Vms", "VmIds", vm_id)
49+
log_step("VM {} state={}".format(vm_id, vm.get("State")))
50+
if vm.get("State") == "running":
51+
break
52+
if vm.get("State") in ("terminated", "shutting-down"):
53+
self.fail("VM {} entered unexpected state {}".format(vm_id, vm.get("State")))
54+
time.sleep(10)
55+
56+
log_step("Creating load balancer {}".format(load_balancer_name))
57+
load_balancer_response = gw.CreateLoadBalancer(
58+
LoadBalancerName=load_balancer_name,
59+
Listeners=[
60+
{
61+
"BackendPort": 80,
62+
"LoadBalancerPort": 80,
63+
"LoadBalancerProtocol": "TCP",
64+
"BackendProtocol": "TCP",
65+
}
66+
],
67+
SubregionNames=[subregion_name],
68+
Tags=[{"Key": "Name", "Value": tagged_name("osc-sdk-python-lb-tag")}],
69+
)
70+
self.assertIsInstance(load_balancer_response.get("LoadBalancer"), dict)
71+
load_balancer_created = True
72+
73+
log_step("Linking backend VM {} to {}".format(vm_id, load_balancer_name))
74+
gw.LinkLoadBalancerBackendMachines(
75+
LoadBalancerName=load_balancer_name, BackendVmIds=[vm_id]
76+
)
77+
78+
log_step("Reading load balancer {}".format(load_balancer_name))
79+
read_balancers = gw.ReadLoadBalancers(
80+
Filters={"LoadBalancerNames": [load_balancer_name]}
81+
)
82+
balancers = read_balancers.get("LoadBalancers")
83+
self.assertIsInstance(balancers, list)
84+
self.assertEqual(len(balancers), 1)
85+
self.assertIn(vm_id, balancers[0].get("BackendVmIds", []))
86+
87+
health = None
88+
for _ in range(18):
89+
health = gw.ReadVmsHealth(
90+
LoadBalancerName=load_balancer_name, BackendVmIds=[vm_id]
91+
)
92+
entry_count = len(health.get("BackendVmHealth", []) or [])
93+
log_step(
94+
"Backend health entries for {}: {}".format(
95+
load_balancer_name, entry_count
96+
)
97+
)
98+
if any(
99+
entry.get("VmId") == vm_id
100+
for entry in health.get("BackendVmHealth", []) or []
101+
):
102+
break
103+
time.sleep(10)
104+
105+
self.assertIsNotNone(health)
106+
self.assertTrue(
107+
any(
108+
entry.get("VmId") == vm_id
109+
for entry in health.get("BackendVmHealth", []) or []
110+
)
111+
)
112+
log_step("Backend VM {} is registered in {}".format(vm_id, load_balancer_name))
113+
finally:
114+
if load_balancer_created:
115+
log_step("Deleting load balancer {}".format(load_balancer_name))
116+
gw.DeleteLoadBalancer(LoadBalancerName=load_balancer_name)
117+
if vm_id:
118+
time.sleep(1)
119+
log_step("Deleting backend VM {}".format(vm_id))
120+
gw.DeleteVms(VmIds=[vm_id])
121+
122+
123+
if __name__ == "__main__":
124+
unittest.main()

0 commit comments

Comments
 (0)