diff --git a/oks_cli/main.py b/oks_cli/main.py index ee88881..5a1c780 100755 --- a/oks_cli/main.py +++ b/oks_cli/main.py @@ -9,6 +9,7 @@ from .cache import cache from .quotas import quotas from .netpeering import netpeering +from .user import user from .utils import ctx_update, install_completions, profile_completer, cluster_completer, project_completer @@ -62,6 +63,7 @@ def cli(ctx, project_name, cluster_name, profile, verbose): cli.add_command(cache) cli.add_command(quotas) cli.add_command(netpeering) +cli.add_command(user) def recursive_help(cmd, parent=None): """Recursively prints help for all commands and subcommands.""" diff --git a/oks_cli/user.py b/oks_cli/user.py new file mode 100644 index 0000000..ae67ef3 --- /dev/null +++ b/oks_cli/user.py @@ -0,0 +1,169 @@ +import click +from datetime import datetime +import dateutil.parser +import human_readable +import prettytable +import json + +from nacl.public import PrivateKey, SealedBox +from nacl.encoding import Base64Encoder + +from .utils import do_request, print_output, find_project_id_by_name, ctx_update, login_profile, profile_completer, project_completer, JSONClickException + +# DEIFNE THE USER COMMAND GROUP +@click.group(help="EIM users related commands.") +@click.option('--project-name', '-p', required=False, help="Project Name", shell_complete=project_completer) +@click.option('--profile', help="Configuration profile to use", shell_complete=profile_completer) +@click.pass_context +def user(ctx, project_name, profile): + """Group of commands related to project management.""" + ctx_update(ctx, project_name, None, profile) + +# LIST USERS +@user.command('list', help="List EIM users") +@click.option('--output', '-o', type=click.Choice(["json", "yaml"]), help="Specify output format, by default is json") +@click.option('--project-name', '-p', required=False, help="Project Name", shell_complete=project_completer) +@click.option('--profile', help="Configuration profile to use") +@click.pass_context +def user_list(ctx, output, project_name, profile): + """List users""" + project_name, _, profile = ctx_update(ctx, project_name, None, profile) + login_profile(profile) + + project_id = find_project_id_by_name(project_name) + + data = do_request("GET", f'projects/{project_id}/eim_users') + + if output: + print_output(data, output) + return + + field_names = ["USER", "ACCESS KEY", "STATE", "CREATED", "EXPIRATION DATE"] + table = prettytable.PrettyTable() + table.field_names = field_names + + for user in data: + access_keys = user.get("AccessKeys", []) + access_key = access_keys[0] if access_keys else {} + + + state = access_key.get("State", "N/A") + if state == 'ACTIVE': + state = click.style(state, fg='green') + elif state == "INACTIVE": + state = click.style(state, fg='red') + + + row = [ + user.get("UserName"), + access_key.get("AccessKeyId", "N/A"), + state + ] + + if "CreationDate" in access_key: + created_at = dateutil.parser.parse(access_key.get("CreationDate")) + now = datetime.now(tz=created_at.tzinfo) + row.append(human_readable.date_time(now - created_at)) + else: + row.append("N/A") + + if "ExpirationDate" in access_key: + exp_at = dateutil.parser.parse(access_key.get("ExpirationDate")) + now = datetime.now(tz=exp_at.tzinfo) + row.append(human_readable.date_time(now - exp_at)) + else: + row.append("N/A") + + table.add_row(row) + + click.echo(table) + + +@user.command('create', help="Create a new EIM user") +@click.option('--project-name', '-p', help="Name of project", type=click.STRING, shell_complete=project_completer) +@click.option('--output', '-o', type=click.Choice(["json", "yaml"]), help="Specify output format, by default is json") +@click.option('--profile', help="Configuration profile to use") +@click.option('--user', '-u', required=True, help="OKS User type") +@click.option('--ttl', type=click.STRING, help="TTL in human readable format (5h, 1d, 1w), by default is 1w") +@click.option('--nacl', is_flag=True, help="Use public key encryption on wire") +@click.pass_context +def user_create(ctx, project_name, output, profile, user, ttl, nacl): + """Create a new EIM user.""" + project_name, _, profile = ctx_update(ctx, project_name, None, profile) + login_profile(profile) + + project_id = find_project_id_by_name(project_name) + + params = { + "user": user + } + if ttl: + params["ttl"] = ttl + + if nacl: + ephemeral = PrivateKey.generate() + unsealbox = SealedBox(ephemeral) + + headers = { + 'x-encrypt-nacl': ephemeral.public_key.encode(Base64Encoder).decode('ascii') + } + + raw_data = do_request( + "POST", + f'projects/{project_id}/eim_users', + params=params, + headers=headers + ) + + decrypted = unsealbox.decrypt( + raw_data.get("Data").encode('ascii'), + encoder=Base64Encoder + ).decode('ascii') + + data = json.loads(decrypted) + + # format decrypted errors the same way as the api errors. + if "Errors" in data: + response_context = raw_data.get("ResponseContext") + errors = [] + for error in data.get("Errors", []): + error["Code"] = str(data.get("Code")) + errors.append(error) + + raise JSONClickException(json.dumps({"Errors": errors,"ResponseContext": response_context}, separators=(",", ":"))) + + else: + data = do_request( + "POST", + f'projects/{project_id}/eim_users', + params=params + ) + + print_output(data, output) + +# DELETE USER +@user.command('delete', help="Delete an EIM user") +@click.option('--project-name', '-p', required=False, help="Project name", shell_complete=project_completer) +@click.option('--user', '-u', required=True, help="User name") +@click.option('--output', '-o', type=click.Choice(["json", "yaml"]), help="Specify output format") +@click.option('--dry-run', is_flag=True, help="Run without any action") +@click.option('--force', is_flag=True, help="Force deletion without confirmation") +@click.option('--profile', help="Configuration profile to use", shell_complete=profile_completer) +@click.pass_context +def user_delete(ctx, project_name, user, output, dry_run, force, profile): + """CLI command to delete an EIM user.""" + + project_name, _, profile = ctx_update(ctx, project_name, None, profile) + login_profile(profile) + + project_id = find_project_id_by_name(project_name) + + if dry_run: + message = {"message": f"Dry run: The user '{user}' would be deleted."} + print_output(message, output) + return + + if force or click.confirm(f"Are you sure you want to delete the user '{user}'?", abort=True): + data = do_request("DELETE", f"projects/{project_id}/eim_users/{user}") + print_output(data, output) + diff --git a/oks_cli/utils.py b/oks_cli/utils.py index 9590096..625968f 100644 --- a/oks_cli/utils.py +++ b/oks_cli/utils.py @@ -79,6 +79,12 @@ def find_response_object(data): return response["IP"] elif key == "Nets": return response["Nets"] + elif key == "EimUsers": + return response["EimUsers"] + elif key == "EimUser": + return response["EimUser"] + elif key == "Data": + return response raise click.ClickException("The API response format is incorrect.") diff --git a/tests/test_user.py b/tests/test_user.py new file mode 100644 index 0000000..e0dcb71 --- /dev/null +++ b/tests/test_user.py @@ -0,0 +1,56 @@ +from click.testing import CliRunner +from oks_cli.main import cli +from unittest.mock import patch, MagicMock + +@patch("oks_cli.utils.requests.request") +def test_user_list_command(mock_request, add_default_profile): + mock_request.side_effect = [ + MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "Projects": [{"id": "12345"}]}), + MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "EimUsers": []}) + ] + + runner = CliRunner() + result = runner.invoke(cli, ["user", "list", "-p", "test"]) + assert result.exit_code == 0 + assert 'USER | ACCESS KEY' in result.output + +@patch("oks_cli.utils.requests.request") +def test_user_create_command(mock_request, add_default_profile): + mock_request.side_effect = [ + MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "Projects": [{"id": "12345"}]}), + MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "EimUser": { + "UserName": "OKSAuditor", + "CreationDate": "2026-03-04T12:31:58.000+0000", + "UserId": "BlaBla", + "UserEmail": "bla@email.local", + "LastModificationDate": "2026-03-04T12:31:58.000+0000", + "Path": "/", + "AccessKeys": [ + { + "State": "ACTIVE", + "AccessKeyId": "AK", + "CreationDate": "2026-03-04T12:31:59.841+0000", + "ExpirationDate": "2026-03-11T12:31:59.297+0000", + "SecretKey": "SK", + "LastModificationDate": "2026-03-04T12:31:59.841+0000" + } + ] + }}) + ] + + runner = CliRunner() + result = runner.invoke(cli, ["user", "create", "-p", "test", "-u", "OKSAuditor", "--ttl", "1w"]) + assert result.exit_code == 0 + assert 'bla@email.local' in result.output + +@patch("oks_cli.utils.requests.request") +def test_user_delete_command(mock_request, add_default_profile): + mock_request.side_effect = [ + MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "Projects": [{"id": "12345"}]}), + MagicMock(status_code=200, headers = {}, json=lambda: {"ResponseContext": {}, "Details": "User has been deleted." }) + ] + + runner = CliRunner() + result = runner.invoke(cli, ["user", "delete", "-p", "test", "-u", "OKSAuditor", "--force"]) + assert result.exit_code == 0 + assert 'User has been deleted.' in result.output \ No newline at end of file