Skip to content
2 changes: 2 additions & 0 deletions oks_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .cache import cache
from .quotas import quotas
from .netpeering import netpeering
from .user import user

from .utils import ctx_update, install_completions, profile_completer, cluster_completer, project_completer

Expand Down Expand Up @@ -62,6 +63,7 @@ def cli(ctx, project_name, cluster_name, profile, verbose):
cli.add_command(cache)
cli.add_command(quotas)
cli.add_command(netpeering)
cli.add_command(user)

def recursive_help(cmd, parent=None):
"""Recursively prints help for all commands and subcommands."""
Expand Down
169 changes: 169 additions & 0 deletions oks_cli/user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import click
from datetime import datetime
import dateutil.parser
import human_readable
import prettytable
import json

from nacl.public import PrivateKey, SealedBox
from nacl.encoding import Base64Encoder

from .utils import do_request, print_output, find_project_id_by_name, ctx_update, login_profile, profile_completer, project_completer, JSONClickException

# DEIFNE THE USER COMMAND GROUP
@click.group(help="EIM users related commands.")
@click.option('--project-name', '-p', required=False, help="Project Name", shell_complete=project_completer)
@click.option('--profile', help="Configuration profile to use", shell_complete=profile_completer)
@click.pass_context
def user(ctx, project_name, profile):
"""Group of commands related to project management."""
ctx_update(ctx, project_name, None, profile)

# LIST USERS
@user.command('list', help="List EIM users")
@click.option('--output', '-o', type=click.Choice(["json", "yaml"]), help="Specify output format, by default is json")
@click.option('--project-name', '-p', required=False, help="Project Name", shell_complete=project_completer)
@click.option('--profile', help="Configuration profile to use")
@click.pass_context
def user_list(ctx, output, project_name, profile):
"""List users"""
project_name, _, profile = ctx_update(ctx, project_name, None, profile)
login_profile(profile)

project_id = find_project_id_by_name(project_name)

data = do_request("GET", f'projects/{project_id}/eim_users')

if output:
print_output(data, output)
return

field_names = ["USER", "ACCESS KEY", "STATE", "CREATED", "EXPIRATION DATE"]
table = prettytable.PrettyTable()
table.field_names = field_names

for user in data:
access_keys = user.get("AccessKeys", [])
access_key = access_keys[0] if access_keys else {}


state = access_key.get("State", "N/A")
if state == 'ACTIVE':
state = click.style(state, fg='green')
elif state == "INACTIVE":
state = click.style(state, fg='red')


row = [
user.get("UserName"),
access_key.get("AccessKeyId", "N/A"),
state
]

if "CreationDate" in access_key:
created_at = dateutil.parser.parse(access_key.get("CreationDate"))
now = datetime.now(tz=created_at.tzinfo)
row.append(human_readable.date_time(now - created_at))
else:
row.append("N/A")

if "ExpirationDate" in access_key:
exp_at = dateutil.parser.parse(access_key.get("ExpirationDate"))
now = datetime.now(tz=exp_at.tzinfo)
row.append(human_readable.date_time(now - exp_at))
else:
row.append("N/A")

table.add_row(row)

click.echo(table)


@user.command('create', help="Create a new EIM user")
@click.option('--project-name', '-p', help="Name of project", type=click.STRING, shell_complete=project_completer)
@click.option('--output', '-o', type=click.Choice(["json", "yaml"]), help="Specify output format, by default is json")
@click.option('--profile', help="Configuration profile to use")
@click.option('--user', '-u', required=True, help="OKS User type")
@click.option('--ttl', type=click.STRING, help="TTL in human readable format (5h, 1d, 1w), by default is 1w")
@click.option('--nacl', is_flag=True, help="Use public key encryption on wire")
@click.pass_context
def user_create(ctx, project_name, output, profile, user, ttl, nacl):
"""Create a new EIM user."""
project_name, _, profile = ctx_update(ctx, project_name, None, profile)
login_profile(profile)

project_id = find_project_id_by_name(project_name)

params = {
"user": user
}
if ttl:
params["ttl"] = ttl

if nacl:
ephemeral = PrivateKey.generate()
unsealbox = SealedBox(ephemeral)

headers = {
'x-encrypt-nacl': ephemeral.public_key.encode(Base64Encoder).decode('ascii')
}

raw_data = do_request(
"POST",
f'projects/{project_id}/eim_users',
params=params,
headers=headers
)

decrypted = unsealbox.decrypt(
raw_data.get("Data").encode('ascii'),
encoder=Base64Encoder
).decode('ascii')

data = json.loads(decrypted)

# format decrypted errors the same way as the api errors.
if "Errors" in data:
response_context = raw_data.get("ResponseContext")
errors = []
for error in data.get("Errors", []):
error["Code"] = str(data.get("Code"))
errors.append(error)

raise JSONClickException(json.dumps({"Errors": errors,"ResponseContext": response_context}, separators=(",", ":")))

else:
data = do_request(
"POST",
f'projects/{project_id}/eim_users',
params=params
)

print_output(data, output)

# DELETE USER
@user.command('delete', help="Delete an EIM user")
@click.option('--project-name', '-p', required=False, help="Project name", shell_complete=project_completer)
@click.option('--user', '-u', required=True, help="User name")
@click.option('--output', '-o', type=click.Choice(["json", "yaml"]), help="Specify output format")
@click.option('--dry-run', is_flag=True, help="Run without any action")
@click.option('--force', is_flag=True, help="Force deletion without confirmation")
@click.option('--profile', help="Configuration profile to use", shell_complete=profile_completer)
@click.pass_context
def user_delete(ctx, project_name, user, output, dry_run, force, profile):
"""CLI command to delete an EIM user."""

project_name, _, profile = ctx_update(ctx, project_name, None, profile)
login_profile(profile)

project_id = find_project_id_by_name(project_name)

if dry_run:
message = {"message": f"Dry run: The user '{user}' would be deleted."}
print_output(message, output)
return

if force or click.confirm(f"Are you sure you want to delete the user '{user}'?", abort=True):
data = do_request("DELETE", f"projects/{project_id}/eim_users/{user}")
print_output(data, output)

6 changes: 6 additions & 0 deletions oks_cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ def find_response_object(data):
return response["IP"]
elif key == "Nets":
return response["Nets"]
elif key == "EimUsers":
return response["EimUsers"]
elif key == "EimUser":
return response["EimUser"]
elif key == "Data":
return response

raise click.ClickException("The API response format is incorrect.")

Expand Down
56 changes: 56 additions & 0 deletions tests/test_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from click.testing import CliRunner
from oks_cli.main import cli
from unittest.mock import patch, MagicMock

@patch("oks_cli.utils.requests.request")
def test_user_list_command(mock_request, add_default_profile):
mock_request.side_effect = [
MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "Projects": [{"id": "12345"}]}),
MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "EimUsers": []})
]

runner = CliRunner()
result = runner.invoke(cli, ["user", "list", "-p", "test"])
assert result.exit_code == 0
assert 'USER | ACCESS KEY' in result.output

@patch("oks_cli.utils.requests.request")
def test_user_create_command(mock_request, add_default_profile):
mock_request.side_effect = [
MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "Projects": [{"id": "12345"}]}),
MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "EimUser": {
"UserName": "OKSAuditor",
"CreationDate": "2026-03-04T12:31:58.000+0000",
"UserId": "BlaBla",
"UserEmail": "bla@email.local",
"LastModificationDate": "2026-03-04T12:31:58.000+0000",
"Path": "/",
"AccessKeys": [
{
"State": "ACTIVE",
"AccessKeyId": "AK",
"CreationDate": "2026-03-04T12:31:59.841+0000",
"ExpirationDate": "2026-03-11T12:31:59.297+0000",
"SecretKey": "SK",
"LastModificationDate": "2026-03-04T12:31:59.841+0000"
}
]
}})
]

runner = CliRunner()
result = runner.invoke(cli, ["user", "create", "-p", "test", "-u", "OKSAuditor", "--ttl", "1w"])
assert result.exit_code == 0
assert 'bla@email.local' in result.output

@patch("oks_cli.utils.requests.request")
def test_user_delete_command(mock_request, add_default_profile):
mock_request.side_effect = [
MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "Projects": [{"id": "12345"}]}),
MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "Details": "User has been deleted." })
]

runner = CliRunner()
result = runner.invoke(cli, ["user", "delete", "-p", "test", "-u", "OKSAuditor", "--force"])
assert result.exit_code == 0
assert 'User has been deleted.' in result.output
Loading