Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions api/src/app/enums/operating_systems.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,32 @@ class OpenLabsOS(Enum):
OpenLabsOS.WINDOWS_2019: 32,
OpenLabsOS.WINDOWS_2022: 32,
}

# SSH username mapping for each OS by provider
AWS_SSH_USERNAME_MAP = {
OpenLabsOS.DEBIAN_11: "admin",
OpenLabsOS.DEBIAN_12: "admin",
OpenLabsOS.UBUNTU_20: "ubuntu",
OpenLabsOS.UBUNTU_22: "ubuntu",
OpenLabsOS.UBUNTU_24: "ubuntu",
OpenLabsOS.SUSE_12: "ec2-user",
OpenLabsOS.SUSE_15: "ec2-user",
OpenLabsOS.KALI: "kali",
OpenLabsOS.WINDOWS_2016: "Administrator",
OpenLabsOS.WINDOWS_2019: "Administrator",
OpenLabsOS.WINDOWS_2022: "Administrator",
}

AZURE_SSH_USERNAME_MAP = {
OpenLabsOS.DEBIAN_11: "azureuser",
OpenLabsOS.DEBIAN_12: "azureuser",
OpenLabsOS.UBUNTU_20: "azureuser",
OpenLabsOS.UBUNTU_22: "azureuser",
OpenLabsOS.UBUNTU_24: "azureuser",
OpenLabsOS.SUSE_12: "azureuser",
OpenLabsOS.SUSE_15: "azureuser",
OpenLabsOS.KALI: "azureuser",
OpenLabsOS.WINDOWS_2016: "azureuser",
OpenLabsOS.WINDOWS_2019: "azureuser",
OpenLabsOS.WINDOWS_2022: "azureuser",
}
157 changes: 155 additions & 2 deletions api/tests/integration/api/v1/test_ranges.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
import pytest
from httpx import AsyncClient

from src.app.enums.operating_systems import (
AWS_SSH_USERNAME_MAP,
AZURE_SSH_USERNAME_MAP,
OpenLabsOS,
)
from src.app.enums.providers import OpenLabsProvider
from src.app.schemas.range_schemas import DeployedRangeSchema
from tests.api_test_utils import get_range, get_range_key, login_user
from tests.deploy_test_utils import (
Expand Down Expand Up @@ -113,10 +119,18 @@ async def test_jumpbox_direct_connection(
)

# Connect directly to the jumpbox using its public IP
# Jumpbox typically uses Ubuntu, so get the Ubuntu username for the provider
if range_info.provider == OpenLabsProvider.AWS:
jumpbox_username = AWS_SSH_USERNAME_MAP[OpenLabsOS.UBUNTU_22]
elif range_info.provider == OpenLabsProvider.AZURE:
jumpbox_username = AZURE_SSH_USERNAME_MAP[OpenLabsOS.UBUNTU_22]
else:
pytest.fail(f"Unsupported provider: {range_info.provider}")

await asyncio.to_thread(
ssh_client.connect,
hostname=str(range_info.jumpbox_public_ip),
username="ubuntu",
username=jumpbox_username,
pkey=private_key,
timeout=10,
)
Expand All @@ -126,7 +140,7 @@ async def test_jumpbox_direct_connection(
command_output = stdout.read().decode("utf-8").strip()
error_output = stderr.read().decode("utf-8").strip()

assert "ubuntu" in command_output
assert jumpbox_username in command_output
assert (
not error_output
), f"Error executing 'id' command on jumpbox: {error_output}"
Expand Down Expand Up @@ -155,3 +169,142 @@ async def test_jumpbox_direct_connection(
finally:
if ssh_client:
ssh_client.close()

async def test_jumpbox_to_vm_connections(
self,
integration_client: AsyncClient,
provider_deployed_ranges_for_provider: dict[
RangeType, tuple[DeployedRangeSchema, str, str]
],
range_type: RangeType,
) -> None:
"""Test SSH connection from the jumpbox to all VMs in the range.

This test verifies:
1. Successful SSH authentication to the jumpbox.
2. SSH tunneling capability from jumpbox to all VMs in the range.
3. Command execution on all accessible VMs through the jumpbox.
"""
deployed_range = provider_deployed_ranges_for_provider[range_type]
range_info, email, password = deployed_range

assert await login_user(
integration_client, email, password
), "Failed to login to the deployed range account."

private_key_str = await get_range_key(integration_client, range_info.id)
assert (
private_key_str
), f"Could not retrieve key for range with ID: {range_info.id}"

# Extract all private IPs and their OS from range_info
host_info: list[dict[str, str]] = []
for vpc in range_info.vpcs:
for subnet in vpc.subnets:
for host in subnet.hosts:
host_info.append(
{
"ip": str(host.ip_address),
"os": host.os.value,
"hostname": host.hostname,
}
)

ssh_client = None
try:
private_key_file = io.StringIO(private_key_str)
private_key = paramiko.RSAKey.from_private_key(private_key_file)

ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(
paramiko.AutoAddPolicy() # noqa: S507
)

# Connect to the jumpbox using its public IP
# Jumpbox typically uses Ubuntu, so get the Ubuntu username for the provider
if range_info.provider == OpenLabsProvider.AWS:
jumpbox_username = AWS_SSH_USERNAME_MAP[OpenLabsOS.UBUNTU_22]
elif range_info.provider == OpenLabsProvider.AZURE:
jumpbox_username = AZURE_SSH_USERNAME_MAP[OpenLabsOS.UBUNTU_22]
else:
pytest.fail(f"Unsupported provider: {range_info.provider}")

await asyncio.to_thread(
ssh_client.connect,
hostname=str(range_info.jumpbox_public_ip),
username=jumpbox_username,
pkey=private_key,
timeout=10,
)

# Get jumpbox transport for tunneling
jumpbox_transport = ssh_client.get_transport()
assert jumpbox_transport is not None, "Failed to get SSH transport"

for host_data in host_info:
ip = host_data["ip"]
os_name = host_data["os"]
hostname = host_data["hostname"]

target_client = None
try:
# Create a tunnel channel through the jumpbox
src_addr = (str(range_info.jumpbox_public_ip), 22)
dest_addr = (ip, 22)
jumpbox_channel = jumpbox_transport.open_channel(
"direct-tcpip", dest_addr, src_addr
)

target_client = paramiko.SSHClient()
target_client.set_missing_host_key_policy(
paramiko.AutoAddPolicy() # noqa: S507
)

# Get the appropriate SSH username for this OS based on provider
os_enum = OpenLabsOS(os_name)
if range_info.provider == OpenLabsProvider.AWS:
username = AWS_SSH_USERNAME_MAP[os_enum]
elif range_info.provider == OpenLabsProvider.AZURE:
username = AZURE_SSH_USERNAME_MAP[os_enum]
else:
pytest.fail(f"Unsupported provider: {range_info.provider}")

await asyncio.to_thread(
target_client.connect,
hostname=ip,
username=username,
pkey=private_key,
sock=jumpbox_channel,
timeout=10,
)

# Validate command execution with 'id' command
_, stdout, stderr = await asyncio.to_thread(
target_client.exec_command, "id"
)
command_output = stdout.read().decode("utf-8").strip()
error_output = stderr.read().decode("utf-8").strip()

assert (
username in command_output
), f"Expected username '{username}' not found in output: {command_output}"
assert (
not error_output
), f"Error executing 'id' command on {hostname} ({ip}): {error_output}"
print(
f"Successfully verified user identity on {hostname} ({ip}) with username '{username}'"
)
except Exception as e:
pytest.fail(
f"Exception connecting to {hostname} ({ip}) with username '{username}': {e}"
)
finally:
if target_client:
target_client.close()
except paramiko.AuthenticationException:
pytest.fail(
"SSH authentication failed for jumpbox. Check username and private key.",
)
finally:
if ssh_client:
ssh_client.close()
Loading