From ea5b6488770a9f4f583929ada3639eb1ee544467 Mon Sep 17 00:00:00 2001 From: Rithvick Reddy Munagala Date: Fri, 5 Jun 2026 13:10:06 -0400 Subject: [PATCH] [202405] Add GCU standalone wheel package (backport of #4310) Backport the GCU (Generic Config Updater) standalone wheel packaging from master PR #4310 to the 202405 branch, adapted for 202405 API compatibility (following 202412 PR #352 structure). Changes: - Add generic_config_updater/main.py: standalone GCU entry point with all apply-patch orchestration logic (validate, preprocess, scope splitting, parallel dispatch) - Add generic_config_updater/setup.py: builds sonic-gcu wheel with gcu-standalone console_scripts entry point - Add generic_config_updater/pytest.ini and .coveragerc for test/coverage - Modify field_operation_validators.py: inline DEFAULT_SUPPORTED_FECS_LIST to remove utilities_common dependency (wheel isolation) - Add comment in utilities_common/constants.py noting the duplicate - Modify azure-pipelines.yml: add GCU wheel build + publish step - Add tests/generic_config_updater/main_test.py: 88 unit tests adapted from master (removed --time and --path-trace features not in 202405) Testing performed: - Wheel build: sonic_gcu-1.0.0-py3-none-any.whl (48KB) builds successfully - Wheel install: gcu-standalone binary at /usr/local/bin/gcu-standalone - All 7 CLI sub-commands verified: apply-patch, replace, save, rollback, create-checkpoint, delete-checkpoint, list-checkpoints - All flags tested: --verbose, --dry-run, --ignore-non-yang-tables, --format, --parallel - Basic operations: add, replace, remove patches via gcu-standalone - Mixed operations: checkpoint create -> apply patch -> rollback -> delete - Error handling: missing file -> exit 1, invalid patch -> proper error - Regression: existing 'config apply-patch' CLI unaffected - Unit tests: 88/88 pass, flake8 clean - Device: str3-7800-lc3-1 (multi-ASIC Arista 7808, SONiC 202405) Signed-off-by: Rithvick Reddy Munagala --- azure-pipelines.yml | 10 + generic_config_updater/.coveragerc | 6 + .../field_operation_validators.py | 7 +- generic_config_updater/main.py | 793 +++++++++++++++ generic_config_updater/pytest.ini | 3 + generic_config_updater/setup.py | 54 + tests/generic_config_updater/main_test.py | 945 ++++++++++++++++++ utilities_common/constants.py | 4 + 8 files changed, 1821 insertions(+), 1 deletion(-) create mode 100644 generic_config_updater/.coveragerc create mode 100644 generic_config_updater/main.py create mode 100644 generic_config_updater/pytest.ini create mode 100644 generic_config_updater/setup.py create mode 100644 tests/generic_config_updater/main_test.py diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 066ad7708..3a3c47604 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -151,3 +151,13 @@ stages: - publish: '$(System.DefaultWorkingDirectory)/dist/' artifact: wheels displayName: "Publish Python wheels" + + - script: | + set -e + pushd generic_config_updater + python3 setup.py bdist_wheel + displayName: 'Build Python 3 wheel for GCU' + + - publish: '$(System.DefaultWorkingDirectory)/generic_config_updater/dist/' + artifact: gcu_wheels + displayName: "Publish Python wheels for GCU" diff --git a/generic_config_updater/.coveragerc b/generic_config_updater/.coveragerc new file mode 100644 index 000000000..d947f08ae --- /dev/null +++ b/generic_config_updater/.coveragerc @@ -0,0 +1,6 @@ +[run] +branch = True +source = generic_config_updater +omit = + .eggs/* + tests/* diff --git a/generic_config_updater/field_operation_validators.py b/generic_config_updater/field_operation_validators.py index 9227e1522..68f297dab 100644 --- a/generic_config_updater/field_operation_validators.py +++ b/generic_config_updater/field_operation_validators.py @@ -6,7 +6,12 @@ from sonic_py_common import device_info from .gu_common import GenericConfigUpdaterError from swsscommon import swsscommon -from utilities_common.constants import DEFAULT_SUPPORTED_FECS_LIST + +# Default FEC modes when STATE_DB does not advertise supported_fecs for a port. +# Kept local to avoid pulling utilities_common into the GCU wheel. +# NOTE: A duplicate of this list exists in utilities_common/constants.py. +# If you update this list, update that copy too. +DEFAULT_SUPPORTED_FECS_LIST = ['rs', 'fc', 'none', 'auto'] STATE_DB_NAME = 'STATE_DB' REDIS_TIMEOUT_MSECS = 0 diff --git a/generic_config_updater/main.py b/generic_config_updater/main.py new file mode 100644 index 000000000..ce0c1a99f --- /dev/null +++ b/generic_config_updater/main.py @@ -0,0 +1,793 @@ +""" +GCU (Generic Config Updater) — apply-patch orchestration +========================================================= + +This module is the **single source of truth** for all apply-patch logic. +It is consumed by: + +* ``config apply-patch`` (config/main.py — thin entry-point / standalone redirect) +* ``gcu-standalone`` (console_scripts entry point installed by the GCU wheel + into the GCU container virtual-env at + /opt/sonic/gcu/current/bin/gcu-standalone) + +No caller should re-implement scope extraction, parallel execution, +pre-processing, or per-scope dispatch — they should call the helpers +exposed here instead. +""" + +import copy +import json +import logging +import os +import sys +import argparse +import subprocess +import threading +import concurrent.futures + +import jsonpatch +import jsonpointer + +from generic_config_updater.generic_updater import ( + GenericUpdater, + ConfigFormat, + extract_scope, +) +from generic_config_updater.gu_common import ( + HOST_NAMESPACE, + GenericConfigUpdaterError, +) +from sonic_py_common import multi_asic +from utilities_common.general import load_db_config + +logger = logging.getLogger(__name__) + +# Constants +DEFAULT_CONFIG_DB_FILE = '/etc/sonic/config_db.json' + + +# --------------------------------------------------------------------------- +# Lightweight JSON-Patch format validation (RFC 6902) +# --------------------------------------------------------------------------- + +def validate_patch_format(patch): + """Return *True* if *patch* is a structurally valid JSON Patch list.""" + try: + if not isinstance(patch, list): + return False + for change in patch: + if not isinstance(change, dict): + return False + if 'op' not in change or 'path' not in change: + return False + if change['op'] not in ( + 'add', 'remove', 'replace', 'move', 'copy', 'test', + ): + return False + return True + except Exception: + return False + + +# --------------------------------------------------------------------------- +# Running-config retrieval +# --------------------------------------------------------------------------- + +def get_all_running_config(): + """Fetch all running configuration as a JSON string via ``show``.""" + command = ["show", "runningconfiguration", "all"] + proc = subprocess.Popen(command, text=True, stdout=subprocess.PIPE) + all_running_config, _ = proc.communicate() + if proc.returncode: + raise GenericConfigUpdaterError( + f"Fetch all runningconfiguration failed with rc={proc.returncode}" + ) + return all_running_config + + +# --------------------------------------------------------------------------- +# Patch pre-processing helpers +# --------------------------------------------------------------------------- + +def filter_duplicate_patch_operations(patch_ops, all_running_config): + """Remove leaf-list ``add`` ops that would create duplicate entries.""" + if not any(op.get("path", "").endswith("/-") for op in patch_ops): + return patch_ops + + config = ( + json.loads(all_running_config) + if isinstance(all_running_config, str) + else all_running_config + ) + + patch_copy = jsonpatch.JsonPatch([copy.deepcopy(op) for op in patch_ops]) + all_target_config = patch_copy.apply(config) + + def _find_duplicate_entries(cfg): + duplicates = {} + + def _check(obj, path=""): + if isinstance(obj, dict): + for k, v in obj.items(): + _check(v, f"{path}/{k}" if path else f"/{k}") + elif isinstance(obj, list): + seen, dups = set(), set() + for item in obj: + (dups if item in seen else seen).add(item) + if dups: + duplicates[path] = list(dups) + for idx, item in enumerate(obj): + _check(item, f"{path}[{idx}]") + + _check(cfg) + return duplicates + + dups = _find_duplicate_entries(all_target_config) + if not dups: + return patch_ops + + ops_to_remove = set() + for list_path, dup_values in dups.items(): + for op_idx, op in enumerate(patch_ops): + if ( + op.get("op") == "add" + and op.get("path", "").endswith("/-") + and op.get("path").startswith(list_path) + and op.get("value") in dup_values + ): + ops_to_remove.add(op_idx) + + return [op for idx, op in enumerate(patch_ops) if idx not in ops_to_remove] + + +def append_emptytables_if_required(patch_ops, all_running_config): + """Insert ``add`` ops for missing top-level tables before the first + reference to each table so that subsequent ops don't fail.""" + config = ( + json.loads(all_running_config) + if isinstance(all_running_config, str) + else all_running_config + ) + missing_tables = set() + patch_ops_copy = [copy.deepcopy(op) for op in patch_ops] + + for operation in patch_ops_copy: + if 'path' not in operation: + continue + path_parts = operation['path'].strip('/').split('/') + if not path_parts: + continue + + if path_parts[0].startswith('asic') or path_parts[0] == HOST_NAMESPACE: + if len(path_parts) < 2: + continue + table_path = f"/{path_parts[0]}/{path_parts[1]}" + else: + table_path = f"/{path_parts[0]}" + + try: + jsonpointer.resolve_pointer(config, table_path) + except jsonpointer.JsonPointerException: + missing_tables.add(table_path) + + if not missing_tables: + return patch_ops_copy + + for table in missing_tables: + insert_idx = None + for idx, op in enumerate(patch_ops_copy): + if 'path' in op and op['path'].startswith(table): + insert_idx = idx + break + empty_table_patch = {"op": "add", "path": table, "value": {}} + if insert_idx is not None: + patch_ops_copy.insert(insert_idx, empty_table_patch) + else: + patch_ops_copy.append(empty_table_patch) + + return patch_ops_copy + + +# --------------------------------------------------------------------------- +# Full YANG validation of a patch against running config +# --------------------------------------------------------------------------- + +def validate_patch(patch_ops, all_running_config): + """Simulate applying *patch_ops* to *all_running_config* and validate + the result against YANG models. Returns ``True`` on success. + + Raises ``GenericConfigUpdaterError`` on unexpected failures. + """ + try: + from sonic_yang_cfg_generator import SonicYangCfgDbGenerator + except ImportError: + # In environments without sonic_yang_cfg_generator (e.g. minimal + # standalone venv), skip YANG validation. + logger.warning( + "sonic_yang_cfg_generator not available; skipping YANG validation" + ) + return True + + try: + config = ( + json.loads(all_running_config) + if isinstance(all_running_config, str) + else all_running_config + ) + patch_copy = jsonpatch.JsonPatch( + [copy.deepcopy(op) for op in patch_ops] + ) + all_target_config = patch_copy.apply(config) + + target_config = ( + all_target_config.pop(HOST_NAMESPACE) + if multi_asic.is_multi_asic() + else all_target_config + ) + target_config.pop("bgpraw", None) + if not SonicYangCfgDbGenerator().validate_config_db_json( + target_config + ): + return False + + if multi_asic.is_multi_asic(): + for asic in multi_asic.get_namespace_list(): + target_config = all_target_config.pop(asic) + target_config.pop("bgpraw", None) + if not SonicYangCfgDbGenerator().validate_config_db_json( + target_config + ): + return False + + return True + except Exception as e: + raise GenericConfigUpdaterError( + f"Validate json patch: {patch_ops} failed due to: {e}" + ) + + +# --------------------------------------------------------------------------- +# Per-scope dispatch +# --------------------------------------------------------------------------- + +def apply_patch_for_scope(scope_changes, results, config_format, + verbose, dry_run, + ignore_non_yang_tables, ignore_path): + """Apply a patch for a single ASIC scope and record the outcome in + *results* (a shared dict).""" + scope, changes = scope_changes + if scope.lower() == HOST_NAMESPACE or scope == "": + scope = multi_asic.DEFAULT_NAMESPACE + + scope_for_log = scope if scope else HOST_NAMESPACE + thread_id = threading.get_ident() + logger.info( + "apply_patch_for_scope started for %s with %d changes in thread %s", + scope_for_log, len(changes), thread_id, + ) + + try: + GenericUpdater(scope=scope).apply_patch( + jsonpatch.JsonPatch(changes), + config_format, + verbose, + dry_run, + ignore_non_yang_tables, + ignore_path, + ) + results[scope_for_log] = {"success": True, "message": "Success"} + logger.info("apply-patch succeeded for %s", scope_for_log) + except Exception as e: + results[scope_for_log] = {"success": False, "message": str(e)} + logger.error("apply-patch failed for %s: %s", scope_for_log, e) + + +def _apply_patch_wrapper(args): + """Thin wrapper so ``ThreadPoolExecutor.submit`` can unpack a tuple.""" + return apply_patch_for_scope(*args) + + +# --------------------------------------------------------------------------- +# Top-level apply-patch orchestrator +# --------------------------------------------------------------------------- + +def apply_patch_from_file(patch_file_path, config_format_name, verbose, + dry_run, parallel, ignore_non_yang_tables, + ignore_path, preprocess=True): + """Read a JSON-Patch file and apply it — the single implementation + used by all entry points. + + Parameters + ---------- + patch_file_path : str + Path to the JSON-Patch file. + config_format_name : str + ``"CONFIGDB"`` or ``"SONICYANG"``. + verbose : bool + dry_run : bool + parallel : bool + If *True*, apply per-ASIC changes in parallel threads. + ignore_non_yang_tables : bool + ignore_path : tuple/list of str + preprocess : bool + When *True* (default), fetch running config and run + ``append_emptytables_if_required``, ``filter_duplicate_patch_operations`` + and ``validate_patch``. Callers that already performed these steps + (or intentionally want to skip them) can pass *False*. + + Raises + ------ + GenericConfigUpdaterError + On validation failure or any per-scope failure. + """ + # 1. Read & validate patch file + with open(patch_file_path, 'r') as fh: + patch_json = json.loads(fh.read()) + + if not validate_patch_format(patch_json): + raise GenericConfigUpdaterError( + f"Invalid patch format in file: {patch_file_path}" + ) + + patch_ops = patch_json + config_format = ConfigFormat[config_format_name.upper()] + + # 2. Optional pre-processing (running-config fetch + YANG validation) + if preprocess: + all_running_config = get_all_running_config() + patch_ops = append_emptytables_if_required( + patch_ops, all_running_config + ) + patch_ops = filter_duplicate_patch_operations( + patch_ops, all_running_config + ) + if not validate_patch(patch_ops, all_running_config): + raise GenericConfigUpdaterError( + f"Failed validating patch: {patch_ops}" + ) + + # 3. Build a JsonPatch and split by scope + patch = jsonpatch.JsonPatch(patch_ops) + changes_by_scope = {} + + for change in patch: + scope, modified_path = extract_scope(change["path"]) + change["path"] = modified_path + changes_by_scope.setdefault(scope, []).append(change) + + # Empty case — still force YANG validation per scope + if not changes_by_scope: + asic_list = [multi_asic.DEFAULT_NAMESPACE] + if multi_asic.is_multi_asic(): + asic_list.extend(multi_asic.get_namespace_list()) + for asic in asic_list: + changes_by_scope[asic] = [] + + # 4. Dispatch + results = {} + if parallel: + with concurrent.futures.ThreadPoolExecutor() as executor: + arguments = [ + (sc, results, config_format, verbose, dry_run, + ignore_non_yang_tables, ignore_path) + for sc in changes_by_scope.items() + ] + futures = [ + executor.submit(_apply_patch_wrapper, arg) + for arg in arguments + ] + concurrent.futures.wait(futures) + else: + for scope_changes in changes_by_scope.items(): + apply_patch_for_scope( + scope_changes, results, config_format, + verbose, dry_run, ignore_non_yang_tables, ignore_path, + ) + + # 5. Aggregate results + failures = [s for s, r in results.items() if not r['success']] + if failures: + msgs = '\n'.join( + f"- {s}: {results[s]['message']}" for s in failures + ) + raise GenericConfigUpdaterError( + f"Failed to apply patch on the following scopes:\n{msgs}" + ) + + +# --------------------------------------------------------------------------- +# Helper utilities (used by the gcu-standalone entry point) +# --------------------------------------------------------------------------- + +def multiasic_save_to_singlefile(filename): + """Save all ASIC configurations to a single file in multi-asic mode.""" + all_configs = {} + + # Get host configuration + cmd = ["sonic-cfggen", "-d", "--print-data"] + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + host_config = json.loads(result.stdout) + all_configs['localhost'] = host_config + + # Get each ASIC configuration + for namespace in multi_asic.get_namespace_list(): + cmd = ["sonic-cfggen", "-d", "--print-data", "-n", namespace] + result = subprocess.run( + cmd, capture_output=True, text=True, check=True + ) + asic_config = json.loads(result.stdout) + all_configs[namespace] = asic_config + + # Save to file + with open(filename, 'w') as f: + json.dump(all_configs, f, indent=2) + + +def print_error(message): + """Print error message to stderr.""" + print(f"Error: {message}", file=sys.stderr) + + +def print_success(message): + """Print success message.""" + print(message) + + +# --------------------------------------------------------------------------- +# Sub-command implementations (used by gcu-standalone) +# --------------------------------------------------------------------------- + +def create_checkpoint(args): + """Create a checkpoint of the current configuration.""" + try: + if args.verbose: + print(f"Creating checkpoint: {args.checkpoint_name}") + + updater = GenericUpdater() + updater.checkpoint(args.checkpoint_name, args.verbose) + + print_success( + f"Checkpoint '{args.checkpoint_name}' created successfully." + ) + except Exception as ex: + print_error( + f"Failed to create checkpoint '{args.checkpoint_name}': {ex}" + ) + sys.exit(1) + + +def delete_checkpoint(args): + """Delete a checkpoint.""" + try: + if args.verbose: + print(f"Deleting checkpoint: {args.checkpoint_name}") + + updater = GenericUpdater() + updater.delete_checkpoint(args.checkpoint_name, args.verbose) + + print_success( + f"Checkpoint '{args.checkpoint_name}' deleted successfully." + ) + except Exception as ex: + print_error( + f"Failed to delete checkpoint '{args.checkpoint_name}': {ex}" + ) + sys.exit(1) + + +def list_checkpoints(args): + """List all available checkpoints.""" + try: + updater = GenericUpdater() + checkpoints = updater.list_checkpoints(args.verbose) + + if not checkpoints: + print("No checkpoints found.") + return + + print("Available checkpoints:") + for checkpoint in checkpoints: + print(f" - {checkpoint}") + except Exception as ex: + print_error(f"Failed to list checkpoints: {ex}") + sys.exit(1) + + +def apply_patch(args): + """Apply a configuration patch — delegates to apply_patch_from_file.""" + try: + if args.verbose: + print(f"Applying patch from: {args.patch_file}") + print(f"Format: {args.format}") + if args.dry_run: + print("** DRY RUN EXECUTION **") + + apply_patch_from_file( + patch_file_path=args.patch_file, + config_format_name=args.format, + verbose=args.verbose, + dry_run=args.dry_run, + parallel=args.parallel, + ignore_non_yang_tables=args.ignore_non_yang_tables, + ignore_path=args.ignore_path, + preprocess=True, + ) + + print_success("Patch applied successfully.") + except Exception as ex: + print_error(f"Failed to apply patch: {ex}") + sys.exit(1) + + +def replace_config(args): + """Replace the entire configuration with a new configuration.""" + try: + if args.verbose: + print(f"Replacing configuration from: {args.config_file}") + print(f"Format: {args.format}") + + with open(args.config_file, 'r') as f: + target_config = json.loads(f.read()) + + config_format = ConfigFormat[args.format.upper()] + updater = GenericUpdater() + updater.replace( + target_config, config_format, args.verbose, False, + args.ignore_non_yang_tables, args.ignore_path, + ) + + print_success("Configuration replaced successfully.") + except Exception as ex: + print_error(f"Failed to replace configuration: {ex}") + sys.exit(1) + + +def save_config(args): + """Save the current configuration to a file.""" + try: + filename = args.filename if args.filename else DEFAULT_CONFIG_DB_FILE + + if args.verbose: + print(f"Saving configuration to: {filename}") + + if multi_asic.is_multi_asic(): + multiasic_save_to_singlefile(filename) + else: + cmd = ["sonic-cfggen", "-d", "--print-data"] + result = subprocess.run( + cmd, capture_output=True, text=True, check=True + ) + config_to_save = json.loads(result.stdout) + with open(filename, 'w') as f: + json.dump(config_to_save, f, indent=2) + + print_success(f"Configuration saved successfully to '{filename}'.") + except subprocess.CalledProcessError as e: + print_error(f"Failed to get current configuration: {e}") + sys.exit(1) + except Exception as ex: + print_error(f"Failed to save configuration: {ex}") + sys.exit(1) + + +def rollback_config(args): + """Rollback configuration to a checkpoint.""" + try: + if args.verbose: + print(f"Rolling back to checkpoint: {args.checkpoint_name}") + + updater = GenericUpdater() + updater.rollback( + args.checkpoint_name, args.verbose, False, + args.ignore_non_yang_tables, args.ignore_path, + ) + + print_success( + f"Configuration rolled back to " + f"'{args.checkpoint_name}' successfully." + ) + except Exception as ex: + print_error( + f"Failed to rollback to checkpoint " + f"'{args.checkpoint_name}': {ex}" + ) + sys.exit(1) + + +# --------------------------------------------------------------------------- +# Argument parser (shared by gcu-standalone entry point) +# --------------------------------------------------------------------------- + +def build_parser(): + """Build and return the argument parser.""" + parser = argparse.ArgumentParser( + description=( + 'GCU - Generic Config Updater for SONiC configuration management' + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s create-checkpoint my-checkpoint + %(prog)s apply-patch patch.json + %(prog)s apply-patch patch.json --dry-run + %(prog)s replace config.json + %(prog)s save backup.json + """, + ) + + subparsers = parser.add_subparsers( + dest='command', help='Available commands' + ) + + # ---- create-checkpoint ---- + p = subparsers.add_parser( + 'create-checkpoint', + help='Create a checkpoint of the current configuration', + ) + p.add_argument('checkpoint_name', help='Name for the checkpoint') + p.add_argument( + '-v', '--verbose', action='store_true', + help='Print additional details', + ) + + # ---- delete-checkpoint ---- + p = subparsers.add_parser( + 'delete-checkpoint', help='Delete a checkpoint', + ) + p.add_argument( + 'checkpoint_name', help='Name of the checkpoint to delete', + ) + p.add_argument( + '-v', '--verbose', action='store_true', + help='Print additional details', + ) + + # ---- list-checkpoints ---- + p = subparsers.add_parser( + 'list-checkpoints', help='List all available checkpoints', + ) + p.add_argument( + '-v', '--verbose', action='store_true', + help='Print additional details', + ) + + # ---- apply-patch ---- + p = subparsers.add_parser( + 'apply-patch', help='Apply a configuration patch', + ) + p.add_argument('patch_file', help='Path to the JSON patch file') + p.add_argument( + '-f', '--format', choices=['CONFIGDB', 'SONICYANG'], + default='CONFIGDB', + help='Format of the patch file (default: CONFIGDB)', + ) + p.add_argument( + '-v', '--verbose', action='store_true', + help='Print additional details', + ) + p.add_argument( + '-d', '--dry-run', action='store_true', default=False, + help='Test out the command without affecting config state', + ) + p.add_argument( + '-p', '--parallel', action='store_true', + help='Apply changes to all ASICs in parallel (multi-asic only)', + ) + p.add_argument( + '-n', '--ignore-non-yang-tables', action='store_true', + help='Ignore validation for tables without YANG models', + ) + p.add_argument( + '-i', '--ignore-path', action='append', default=[], + help='Ignore validation for config specified by given path ' + '(JsonPointer)', + ) + + # ---- replace ---- + p = subparsers.add_parser( + 'replace', help='Replace the entire configuration', + ) + p.add_argument('config_file', help='Path to the configuration file') + p.add_argument( + '-f', '--format', choices=['CONFIGDB', 'SONICYANG'], + default='CONFIGDB', + help='Format of the configuration file (default: CONFIGDB)', + ) + p.add_argument( + '-v', '--verbose', action='store_true', + help='Print additional details', + ) + p.add_argument( + '-n', '--ignore-non-yang-tables', action='store_true', + help='Ignore validation for tables without YANG models', + ) + p.add_argument( + '-i', '--ignore-path', action='append', default=[], + help='Ignore validation for config specified by given path ' + '(JsonPointer)', + ) + + # ---- save ---- + p = subparsers.add_parser( + 'save', help='Save the current configuration to a file', + ) + p.add_argument( + 'filename', nargs='?', + help=f'Output filename (default: {DEFAULT_CONFIG_DB_FILE})', + ) + p.add_argument( + '-v', '--verbose', action='store_true', + help='Print additional details', + ) + + # ---- rollback ---- + p = subparsers.add_parser( + 'rollback', help='Rollback configuration to a checkpoint', + ) + p.add_argument( + 'checkpoint_name', + help='Name of the checkpoint to rollback to', + ) + p.add_argument( + '-v', '--verbose', action='store_true', + help='Print additional details', + ) + p.add_argument( + '-n', '--ignore-non-yang-tables', action='store_true', + help='Ignore validation for tables without YANG models', + ) + p.add_argument( + '-i', '--ignore-path', action='append', default=[], + help='Ignore validation for config specified by given path ' + '(JsonPointer)', + ) + + return parser + + +# --------------------------------------------------------------------------- +# Main entry point (used by gcu-standalone console_scripts) +# --------------------------------------------------------------------------- + +def main(): + """Main entry point for the gcu-standalone console script.""" + load_db_config() + + parser = build_parser() + args = parser.parse_args() + + if not args.command: + parser.print_help() + return + + # Validate file paths if provided + if hasattr(args, 'patch_file') and args.patch_file: + if not os.path.exists(args.patch_file): + print_error(f"Patch file not found: {args.patch_file}") + sys.exit(1) + + if hasattr(args, 'config_file') and args.config_file: + if not os.path.exists(args.config_file): + print_error(f"Config file not found: {args.config_file}") + sys.exit(1) + + command_functions = { + 'create-checkpoint': create_checkpoint, + 'delete-checkpoint': delete_checkpoint, + 'list-checkpoints': list_checkpoints, + 'apply-patch': apply_patch, + 'replace': replace_config, + 'save': save_config, + 'rollback': rollback_config, + } + + if args.command in command_functions: + command_functions[args.command](args) + else: + print_error(f"Unknown command: {args.command}") + parser.print_help() + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/generic_config_updater/pytest.ini b/generic_config_updater/pytest.ini new file mode 100644 index 000000000..15ae3d126 --- /dev/null +++ b/generic_config_updater/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +addopts = --cov-config=.coveragerc --cov --cov-report html --cov-report term --cov-report xml --junitxml=test-results.xml -vv +testpaths = ../tests/generic_config_updater diff --git a/generic_config_updater/setup.py b/generic_config_updater/setup.py new file mode 100644 index 000000000..3be16848c --- /dev/null +++ b/generic_config_updater/setup.py @@ -0,0 +1,54 @@ +# generic_config_updater/ — Standalone GCU wheel build context +# +# This setup.py builds the 'sonic-gcu' Python wheel, published independently +# from the main 'sonic-utilities' wheel. The gcu-standalone binary installed +# from this wheel is placed at /opt/sonic/gcu/current/bin/gcu-standalone and +# allows the GCU container to deliver fixes to generic_config_updater without +# touching the host sonic-utilities package. +# +# pytest.ini and .coveragerc in this directory configure test runs scoped to +# GCU only (see azure-pipelines.yml 'Build Python 3 wheel for GCU' step). +# +# Dependencies are intentionally absent from install_requires: the GCU venv is +# created with --system-site-packages and the wheel is installed with --no-deps, +# so all runtime dependencies (SONiC packages and third-party libs) are +# inherited from the host SONiC environment. +from setuptools import setup + + +setup( + name='sonic-gcu', + version='1.0.0', + description='GCU package for SONiC', + license='Apache 2.0', + author='SONiC Team', + author_email='linuxnetdev@microsoft.com', + url='https://github.com/sonic-net/sonic-utilities/generic_config_updater', + maintainer='Rithvick Reddy Munagala', + maintainer_email='rimunagala@microsoft.com', + package_dir={'generic_config_updater': '.'}, + packages=[ + 'generic_config_updater', + ], + package_data={ + 'generic_config_updater': ['gcu_services_validator.conf.json', 'gcu_field_operation_validators.conf.json'] + }, + entry_points={ + 'console_scripts': [ + 'gcu-standalone=generic_config_updater.main:main', + ] + }, + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Environment :: Console', + 'Intended Audience :: Developers', + 'Intended Audience :: Information Technology', + 'Intended Audience :: System Administrators', + 'License :: OSI Approved :: Apache Software License', + 'Natural Language :: English', + 'Operating System :: POSIX :: Linux', + 'Programming Language :: Python :: 3.7', + 'Topic :: Utilities', + ], + keywords='SONiC GCU package' +) diff --git a/tests/generic_config_updater/main_test.py b/tests/generic_config_updater/main_test.py new file mode 100644 index 000000000..f7ad7ab1d --- /dev/null +++ b/tests/generic_config_updater/main_test.py @@ -0,0 +1,945 @@ +import io +import json +import os +import subprocess +import sys +import unittest +from argparse import Namespace +from unittest import mock + +# Make sure the repo root is on the path +_TEST_DIR = os.path.dirname(os.path.abspath(__file__)) +_ROOT_DIR = os.path.dirname(os.path.dirname(_TEST_DIR)) +sys.path.insert(0, _ROOT_DIR) + +from generic_config_updater.generic_updater import ConfigFormat # noqa: E402 +from generic_config_updater.gu_common import GenericConfigUpdaterError # noqa: E402 +import generic_config_updater.main as gcu_main # noqa: E402 + + +# --------------------------------------------------------------------------- +# validate_patch_format +# --------------------------------------------------------------------------- + +class TestValidatePatchFormat(unittest.TestCase): + + def test_valid_patch_returns_true(self): + patch = [{"op": "add", "path": "/TABLE/key", "value": "v"}] + self.assertTrue(gcu_main.validate_patch_format(patch)) + + def test_valid_all_ops(self): + for op in ("add", "remove", "replace", "move", "copy", "test"): + patch = [{"op": op, "path": "/X"}] + self.assertTrue(gcu_main.validate_patch_format(patch)) + + def test_empty_list_is_valid(self): + self.assertTrue(gcu_main.validate_patch_format([])) + + def test_not_a_list_returns_false(self): + self.assertFalse(gcu_main.validate_patch_format({"op": "add", "path": "/X"})) + + def test_item_not_dict_returns_false(self): + self.assertFalse(gcu_main.validate_patch_format(["not_a_dict"])) + + def test_missing_op_returns_false(self): + self.assertFalse(gcu_main.validate_patch_format([{"path": "/X"}])) + + def test_missing_path_returns_false(self): + self.assertFalse(gcu_main.validate_patch_format([{"op": "add"}])) + + def test_invalid_op_returns_false(self): + self.assertFalse( + gcu_main.validate_patch_format([{"op": "invalid_op", "path": "/X"}]) + ) + + def test_none_input_returns_false(self): + self.assertFalse(gcu_main.validate_patch_format(None)) + + def test_multiple_valid_changes(self): + patch = [ + {"op": "add", "path": "/A", "value": {}}, + {"op": "remove", "path": "/B"}, + {"op": "replace", "path": "/C", "value": 1}, + ] + self.assertTrue(gcu_main.validate_patch_format(patch)) + + def test_one_invalid_in_list_returns_false(self): + patch = [ + {"op": "add", "path": "/A", "value": {}}, + {"op": "BAD", "path": "/B"}, + ] + self.assertFalse(gcu_main.validate_patch_format(patch)) + + +# --------------------------------------------------------------------------- +# get_all_running_config +# --------------------------------------------------------------------------- + +class TestGetAllRunningConfig(unittest.TestCase): + + def _make_popen(self, stdout, returncode): + proc = mock.Mock() + proc.communicate.return_value = (stdout, None) + proc.returncode = returncode + return proc + + def test_success_returns_config_string(self): + cfg = '{"PORT": {}}' + with mock.patch('subprocess.Popen', return_value=self._make_popen(cfg, 0)): + result = gcu_main.get_all_running_config() + self.assertEqual(result, cfg) + + def test_nonzero_returncode_raises(self): + with mock.patch('subprocess.Popen', + return_value=self._make_popen('', 1)): + with self.assertRaises(GenericConfigUpdaterError): + gcu_main.get_all_running_config() + + +# --------------------------------------------------------------------------- +# filter_duplicate_patch_operations +# --------------------------------------------------------------------------- + +class TestFilterDuplicatePatchOperations(unittest.TestCase): + + def test_no_leaf_list_ops_returned_unchanged(self): + patch_ops = [{"op": "add", "path": "/TABLE/key", "value": "v"}] + config = {} + result = gcu_main.filter_duplicate_patch_operations(patch_ops, json.dumps(config)) + self.assertEqual(result, patch_ops) + + def test_removes_duplicate_leaf_list_add(self): + config = {"ACL_TABLE": {"MY_ACL": {"ports": ["Eth0", "Eth1"]}}} + patch_ops = [ + {"op": "add", "path": "/ACL_TABLE/MY_ACL/ports/-", "value": "Eth0"}, + {"op": "add", "path": "/ACL_TABLE/MY_ACL/ports/-", "value": "Eth2"}, + ] + result = gcu_main.filter_duplicate_patch_operations(patch_ops, json.dumps(config)) + paths_values = [(op["path"], op["value"]) for op in result] + self.assertNotIn(("/ACL_TABLE/MY_ACL/ports/-", "Eth0"), paths_values) + self.assertIn(("/ACL_TABLE/MY_ACL/ports/-", "Eth2"), paths_values) + + def test_no_duplicates_nothing_removed(self): + config = {"ACL_TABLE": {"MY_ACL": {"ports": []}}} + patch_ops = [ + {"op": "add", "path": "/ACL_TABLE/MY_ACL/ports/-", "value": "Eth0"}, + {"op": "add", "path": "/ACL_TABLE/MY_ACL/ports/-", "value": "Eth1"}, + ] + result = gcu_main.filter_duplicate_patch_operations(patch_ops, json.dumps(config)) + self.assertEqual(len(result), 2) + + def test_accepts_dict_config(self): + config = {"ACL_TABLE": {"MY_ACL": {"ports": ["Eth0"]}}} + patch_ops = [ + {"op": "add", "path": "/ACL_TABLE/MY_ACL/ports/-", "value": "Eth0"}, + ] + result = gcu_main.filter_duplicate_patch_operations(patch_ops, config) + self.assertEqual(len(result), 0) + + +# --------------------------------------------------------------------------- +# append_emptytables_if_required +# --------------------------------------------------------------------------- + +class TestAppendEmptyTablesIfRequired(unittest.TestCase): + + def test_no_missing_tables_returned_unchanged(self): + config = {"TABLE1": {}} + patch_ops = [{"op": "add", "path": "/TABLE1/key", "value": "v"}] + result = gcu_main.append_emptytables_if_required(patch_ops, json.dumps(config)) + self.assertEqual(result, patch_ops) + + def test_missing_table_prepended(self): + config = {} + patch_ops = [{"op": "add", "path": "/TABLE1/key", "value": "v"}] + result = gcu_main.append_emptytables_if_required(patch_ops, json.dumps(config)) + self.assertEqual(result[0], {"op": "add", "path": "/TABLE1", "value": {}}) + self.assertEqual(result[1], patch_ops[0]) + + def test_multiple_missing_tables(self): + config = {} + patch_ops = [ + {"op": "add", "path": "/TABLE1/field", "value": "v1"}, + {"op": "add", "path": "/TABLE2/field", "value": "v2"}, + ] + result = gcu_main.append_emptytables_if_required(patch_ops, json.dumps(config)) + created_paths = [op["path"] for op in result if op.get("value") == {}] + self.assertIn("/TABLE1", created_paths) + self.assertIn("/TABLE2", created_paths) + + def test_accepts_dict_config(self): + config = {} + patch_ops = [{"op": "add", "path": "/TABLE1/key", "value": "v"}] + result = gcu_main.append_emptytables_if_required(patch_ops, config) + self.assertEqual(result[0]["path"], "/TABLE1") + + def test_op_without_path_skipped(self): + config = {} + patch_ops = [{"op": "add", "value": "v"}] + result = gcu_main.append_emptytables_if_required(patch_ops, config) + # Should not crash, just return the ops with no empty table inserted + self.assertEqual(len(result), 1) + + def test_empty_path_parts_skipped(self): + config = {} + patch_ops = [{"op": "add", "path": "/", "value": "v"}] + # Should not raise + result = gcu_main.append_emptytables_if_required(patch_ops, config) + self.assertIsInstance(result, list) + + def test_asic_scoped_table_path(self): + """Paths starting with asic0/TABLE should resolve two-level pointer.""" + config = {"asic0": {}} + patch_ops = [{"op": "add", "path": "/asic0/NEW_TABLE/key", "value": "v"}] + result = gcu_main.append_emptytables_if_required(patch_ops, json.dumps(config)) + created_paths = [op["path"] for op in result if op.get("value") == {}] + self.assertIn("/asic0/NEW_TABLE", created_paths) + + +# --------------------------------------------------------------------------- +# validate_patch +# --------------------------------------------------------------------------- + +class TestValidatePatch(unittest.TestCase): + + def _simple_ops(self): + return [{"op": "add", "path": "/TABLE/key", "value": "v"}] + + def test_returns_true_when_yang_not_available(self): + """When sonic_yang_cfg_generator is not importable, validation is skipped.""" + with mock.patch.dict('sys.modules', {'sonic_yang_cfg_generator': None}): + result = gcu_main.validate_patch([], json.dumps({})) + self.assertTrue(result) + + def test_returns_true_on_valid_config(self): + mock_generator = mock.Mock() + mock_generator.validate_config_db_json.return_value = True + mock_module = mock.Mock() + mock_module.SonicYangCfgDbGenerator.return_value = mock_generator + + with mock.patch.dict('sys.modules', {'sonic_yang_cfg_generator': mock_module}): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=False): + result = gcu_main.validate_patch([], json.dumps({})) + self.assertTrue(result) + + def test_returns_false_when_validation_fails(self): + mock_generator = mock.Mock() + mock_generator.validate_config_db_json.return_value = False + mock_module = mock.Mock() + mock_module.SonicYangCfgDbGenerator.return_value = mock_generator + + with mock.patch.dict('sys.modules', {'sonic_yang_cfg_generator': mock_module}): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=False): + result = gcu_main.validate_patch([], json.dumps({})) + self.assertFalse(result) + + def test_raises_on_unexpected_exception(self): + mock_module = mock.Mock() + mock_module.SonicYangCfgDbGenerator.side_effect = RuntimeError("boom") + + with mock.patch.dict('sys.modules', {'sonic_yang_cfg_generator': mock_module}): + with self.assertRaises(GenericConfigUpdaterError): + gcu_main.validate_patch([], json.dumps({})) + + def test_multiasic_validates_all_asics(self): + mock_generator = mock.Mock() + mock_generator.validate_config_db_json.return_value = True + mock_module = mock.Mock() + mock_module.SonicYangCfgDbGenerator.return_value = mock_generator + + config = {"localhost": {}, "asic0": {}, "asic1": {}} + + with mock.patch.dict('sys.modules', {'sonic_yang_cfg_generator': mock_module}): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=True): + with mock.patch('sonic_py_common.multi_asic.get_namespace_list', + return_value=['asic0', 'asic1']): + result = gcu_main.validate_patch([], json.dumps(config)) + self.assertTrue(result) + # Called once per host + once per asic + self.assertEqual(mock_generator.validate_config_db_json.call_count, 3) + + def test_multiasic_returns_false_when_asic_fails(self): + call_count = [0] + + def side_effect(_cfg): + call_count[0] += 1 + # Fail on the second call (first asic) + return call_count[0] != 2 + + mock_generator = mock.Mock() + mock_generator.validate_config_db_json.side_effect = side_effect + mock_module = mock.Mock() + mock_module.SonicYangCfgDbGenerator.return_value = mock_generator + + config = {"localhost": {}, "asic0": {}} + + with mock.patch.dict('sys.modules', {'sonic_yang_cfg_generator': mock_module}): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=True): + with mock.patch('sonic_py_common.multi_asic.get_namespace_list', + return_value=['asic0']): + result = gcu_main.validate_patch([], json.dumps(config)) + self.assertFalse(result) + + +# --------------------------------------------------------------------------- +# apply_patch_for_scope +# --------------------------------------------------------------------------- + +class TestApplyPatchForScope(unittest.TestCase): + + def test_success_records_success(self): + mock_updater = mock.Mock() + results = {} + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + gcu_main.apply_patch_for_scope( + ("", [{"op": "add", "path": "/T/k", "value": "v"}]), + results, ConfigFormat.CONFIGDB, False, False, False, () + ) + self.assertTrue(results[gcu_main.HOST_NAMESPACE]["success"]) + + def test_exception_records_failure(self): + mock_updater = mock.Mock() + mock_updater.apply_patch.side_effect = Exception("scope error") + results = {} + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + gcu_main.apply_patch_for_scope( + ("", [{"op": "add", "path": "/T/k", "value": "v"}]), + results, ConfigFormat.CONFIGDB, False, False, False, () + ) + key = list(results.keys())[0] + self.assertFalse(results[key]["success"]) + self.assertIn("scope error", results[key]["message"]) + + def test_host_namespace_scope_mapping(self): + mock_updater = mock.Mock() + results = {} + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + gcu_main.apply_patch_for_scope( + (gcu_main.HOST_NAMESPACE, []), + results, ConfigFormat.CONFIGDB, False, False, False, () + ) + # HOST_NAMESPACE scope should map correctly + self.assertIn(gcu_main.HOST_NAMESPACE, results) + + +# --------------------------------------------------------------------------- +# apply_patch_from_file +# --------------------------------------------------------------------------- + +class TestApplyPatchFromFile(unittest.TestCase): + + def _make_patch_file(self, patch_ops): + return mock.mock_open(read_data=json.dumps(patch_ops)) + + def test_invalid_format_raises(self): + bad_patch = {"op": "add"} # not a list + with mock.patch('builtins.open', mock.mock_open(read_data=json.dumps(bad_patch))): + with self.assertRaises(GenericConfigUpdaterError): + gcu_main.apply_patch_from_file( + '/fake/patch.json', 'CONFIGDB', + False, False, False, False, () + ) + + def test_success_no_preprocess(self): + patch_ops = [{"op": "add", "path": "/TABLE/key", "value": "v"}] + mock_updater = mock.Mock() + with mock.patch('builtins.open', self._make_patch_file(patch_ops)): + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('generic_config_updater.main.extract_scope', + return_value=('', '/TABLE/key')): + gcu_main.apply_patch_from_file( + '/fake/patch.json', 'CONFIGDB', + False, False, False, False, (), preprocess=False + ) + mock_updater.apply_patch.assert_called_once() + + def test_preprocess_path_runs_helpers(self): + patch_ops = [{"op": "add", "path": "/TABLE/key", "value": "v"}] + running_cfg = json.dumps({"TABLE": {}}) + mock_updater = mock.Mock() + + with mock.patch('builtins.open', self._make_patch_file(patch_ops)): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=False): + with mock.patch('generic_config_updater.main.get_all_running_config', + return_value=running_cfg): + with mock.patch('generic_config_updater.main.append_emptytables_if_required', + return_value=patch_ops) as mock_append: + with mock.patch('generic_config_updater.main.filter_duplicate_patch_operations', + return_value=patch_ops) as mock_filter: + with mock.patch('generic_config_updater.main.validate_patch', + return_value=True) as mock_validate: + with mock.patch('generic_config_updater.main.GenericUpdater', + return_value=mock_updater): + gcu_main.apply_patch_from_file( + '/fake/patch.json', 'CONFIGDB', + False, False, False, False, (), preprocess=True + ) + mock_append.assert_called_once() + mock_filter.assert_called_once() + mock_validate.assert_called_once() + + def test_preprocess_validation_failure_raises(self): + patch_ops = [{"op": "add", "path": "/TABLE/key", "value": "v"}] + running_cfg = json.dumps({}) + + with mock.patch('builtins.open', self._make_patch_file(patch_ops)): + with mock.patch('generic_config_updater.main.get_all_running_config', + return_value=running_cfg): + with mock.patch('generic_config_updater.main.append_emptytables_if_required', + return_value=patch_ops): + with mock.patch('generic_config_updater.main.filter_duplicate_patch_operations', + return_value=patch_ops): + with mock.patch('generic_config_updater.main.validate_patch', + return_value=False): + with self.assertRaises(GenericConfigUpdaterError): + gcu_main.apply_patch_from_file( + '/fake/patch.json', 'CONFIGDB', + False, False, False, False, (), preprocess=True + ) + + def test_parallel_dispatches_with_threadpool(self): + patch_ops = [{"op": "add", "path": "/TABLE/key", "value": "v"}] + mock_updater = mock.Mock() + + with mock.patch('builtins.open', self._make_patch_file(patch_ops)): + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('generic_config_updater.main.extract_scope', + return_value=('', '/TABLE/key')): + gcu_main.apply_patch_from_file( + '/fake/patch.json', 'CONFIGDB', + False, False, True, False, (), preprocess=False + ) + mock_updater.apply_patch.assert_called_once() + + def test_scope_failure_raises(self): + patch_ops = [{"op": "add", "path": "/TABLE/key", "value": "v"}] + mock_updater = mock.Mock() + mock_updater.apply_patch.side_effect = Exception("boom") + + with mock.patch('builtins.open', self._make_patch_file(patch_ops)): + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('generic_config_updater.main.extract_scope', + return_value=('', '/TABLE/key')): + with self.assertRaises(GenericConfigUpdaterError) as ctx: + gcu_main.apply_patch_from_file( + '/fake/patch.json', 'CONFIGDB', + False, False, False, False, (), preprocess=False + ) + self.assertIn("Failed to apply patch", str(ctx.exception)) + + def test_empty_patch_still_validates(self): + """Empty patch ops triggers per-asic validation loop.""" + patch_ops = [] + mock_updater = mock.Mock() + + with mock.patch('builtins.open', self._make_patch_file(patch_ops)): + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=False): + gcu_main.apply_patch_from_file( + '/fake/patch.json', 'CONFIGDB', + False, False, False, False, (), preprocess=False + ) + mock_updater.apply_patch.assert_called_once() + + def test_empty_patch_multiasic(self): + """Empty patch in multiasic triggers all asic namespaces.""" + patch_ops = [] + mock_updater = mock.Mock() + + with mock.patch('builtins.open', self._make_patch_file(patch_ops)): + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=True): + with mock.patch('sonic_py_common.multi_asic.get_namespace_list', + return_value=['asic0', 'asic1']): + gcu_main.apply_patch_from_file( + '/fake/patch.json', 'CONFIGDB', + False, False, False, False, (), preprocess=False + ) + self.assertEqual(mock_updater.apply_patch.call_count, 3) # default + asic0 + asic1 + + +# --------------------------------------------------------------------------- +# print_error / print_success +# --------------------------------------------------------------------------- + +class TestPrintHelpers(unittest.TestCase): + + def test_print_error_writes_to_stderr(self): + captured = io.StringIO() + with mock.patch('sys.stderr', captured): + gcu_main.print_error("something went wrong") + self.assertIn("something went wrong", captured.getvalue()) + + def test_print_success_writes_to_stdout(self): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + gcu_main.print_success("all good") + self.assertIn("all good", captured.getvalue()) + + +# --------------------------------------------------------------------------- +# multiasic_save_to_singlefile +# --------------------------------------------------------------------------- + +class TestMultiasicSaveToSinglefile(unittest.TestCase): + + def test_saves_host_and_asic_configs(self): + host_config = {"PORT": {}} + asic_config = {"VLAN": {}} + + def fake_run(cmd, **kwargs): + result = mock.Mock() + if "-n" in cmd: + result.stdout = json.dumps(asic_config) + else: + result.stdout = json.dumps(host_config) + return result + + mock_open_obj = mock.mock_open() + with mock.patch('subprocess.run', side_effect=fake_run): + with mock.patch('sonic_py_common.multi_asic.get_namespace_list', + return_value=['asic0']): + with mock.patch('builtins.open', mock_open_obj): + gcu_main.multiasic_save_to_singlefile('/tmp/all_config.json') + + written = ''.join( + call.args[0] + for call in mock_open_obj().write.call_args_list + ) + saved = json.loads(written) + self.assertIn('localhost', saved) + self.assertIn('asic0', saved) + + +# --------------------------------------------------------------------------- +# Sub-command functions +# --------------------------------------------------------------------------- + +class TestCreateCheckpoint(unittest.TestCase): + + def _make_args(self, name='cp1', verbose=False): + return Namespace(checkpoint_name=name, verbose=verbose) + + def test_success(self): + mock_updater = mock.Mock() + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('generic_config_updater.main.print_success') as mock_ps: + gcu_main.create_checkpoint(self._make_args()) + mock_updater.checkpoint.assert_called_once_with('cp1', False) + mock_ps.assert_called_once() + + def test_success_verbose(self): + mock_updater = mock.Mock() + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + with mock.patch('generic_config_updater.main.print_success'): + gcu_main.create_checkpoint(self._make_args(verbose=True)) + self.assertIn('cp1', captured.getvalue()) + + def test_failure_calls_sys_exit(self): + mock_updater = mock.Mock() + mock_updater.checkpoint.side_effect = Exception("fail") + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with self.assertRaises(SystemExit): + gcu_main.create_checkpoint(self._make_args()) + + +class TestDeleteCheckpoint(unittest.TestCase): + + def _make_args(self, name='cp1', verbose=False): + return Namespace(checkpoint_name=name, verbose=verbose) + + def test_success(self): + mock_updater = mock.Mock() + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('generic_config_updater.main.print_success') as mock_ps: + gcu_main.delete_checkpoint(self._make_args()) + mock_updater.delete_checkpoint.assert_called_once_with('cp1', False) + mock_ps.assert_called_once() + + def test_success_verbose(self): + mock_updater = mock.Mock() + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + with mock.patch('generic_config_updater.main.print_success'): + gcu_main.delete_checkpoint(self._make_args(verbose=True)) + self.assertIn('cp1', captured.getvalue()) + + def test_failure_calls_sys_exit(self): + mock_updater = mock.Mock() + mock_updater.delete_checkpoint.side_effect = Exception("fail") + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with self.assertRaises(SystemExit): + gcu_main.delete_checkpoint(self._make_args()) + + +class TestListCheckpoints(unittest.TestCase): + + def _make_args(self, verbose=False): + return Namespace(verbose=verbose) + + def test_no_checkpoints(self): + mock_updater = mock.Mock() + mock_updater.list_checkpoints.return_value = [] + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + gcu_main.list_checkpoints(self._make_args()) + self.assertIn('No checkpoints', captured.getvalue()) + + def test_list_without_time(self): + mock_updater = mock.Mock() + mock_updater.list_checkpoints.return_value = ['cp1', 'cp2'] + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + gcu_main.list_checkpoints(self._make_args()) + output = captured.getvalue() + self.assertIn('cp1', output) + self.assertIn('cp2', output) + + def test_failure_calls_sys_exit(self): + mock_updater = mock.Mock() + mock_updater.list_checkpoints.side_effect = Exception("fail") + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with self.assertRaises(SystemExit): + gcu_main.list_checkpoints(self._make_args()) + + +class TestApplyPatchSubcommand(unittest.TestCase): + + def _make_args(self, patch_file='/fake/p.json', fmt='CONFIGDB', + verbose=False, dry_run=False, parallel=False, + ignore_non_yang_tables=False, ignore_path=None): + return Namespace( + patch_file=patch_file, + format=fmt, + verbose=verbose, + dry_run=dry_run, + parallel=parallel, + ignore_non_yang_tables=ignore_non_yang_tables, + ignore_path=ignore_path or [], + ) + + def test_success(self): + with mock.patch('generic_config_updater.main.apply_patch_from_file') as mock_apf: + with mock.patch('generic_config_updater.main.print_success') as mock_ps: + gcu_main.apply_patch(self._make_args()) + mock_apf.assert_called_once() + mock_ps.assert_called_once() + + def test_verbose_prints_details(self): + with mock.patch('generic_config_updater.main.apply_patch_from_file'): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + with mock.patch('generic_config_updater.main.print_success'): + gcu_main.apply_patch(self._make_args(verbose=True, dry_run=True)) + output = captured.getvalue() + self.assertIn('/fake/p.json', output) + self.assertIn('DRY RUN', output) + + def test_failure_calls_sys_exit(self): + with mock.patch('generic_config_updater.main.apply_patch_from_file', + side_effect=Exception("oops")): + with self.assertRaises(SystemExit): + gcu_main.apply_patch(self._make_args()) + + +class TestReplaceConfigSubcommand(unittest.TestCase): + + def _make_args(self, config_file='/fake/cfg.json', fmt='CONFIGDB', + verbose=False, ignore_non_yang_tables=False, + ignore_path=None): + return Namespace( + config_file=config_file, + format=fmt, + verbose=verbose, + ignore_non_yang_tables=ignore_non_yang_tables, + ignore_path=ignore_path or [], + ) + + def test_success(self): + mock_updater = mock.Mock() + cfg = json.dumps({"PORT": {}}) + with mock.patch('builtins.open', mock.mock_open(read_data=cfg)): + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('generic_config_updater.main.print_success') as mock_ps: + gcu_main.replace_config(self._make_args()) + mock_updater.replace.assert_called_once() + mock_ps.assert_called_once() + + def test_verbose_prints_details(self): + mock_updater = mock.Mock() + cfg = json.dumps({}) + with mock.patch('builtins.open', mock.mock_open(read_data=cfg)): + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + with mock.patch('generic_config_updater.main.print_success'): + gcu_main.replace_config(self._make_args(verbose=True)) + self.assertIn('/fake/cfg.json', captured.getvalue()) + + def test_failure_calls_sys_exit(self): + with mock.patch('builtins.open', side_effect=Exception("no file")): + with self.assertRaises(SystemExit): + gcu_main.replace_config(self._make_args()) + + +class TestSaveConfigSubcommand(unittest.TestCase): + + def _make_args(self, filename=None, verbose=False): + return Namespace(filename=filename, verbose=verbose) + + def test_save_defaults_to_config_db_file(self): + fake_cfg = json.dumps({"PORT": {}}) + + def fake_run(cmd, **kwargs): + r = mock.Mock() + r.stdout = fake_cfg + return r + + mock_open_obj = mock.mock_open() + with mock.patch('subprocess.run', side_effect=fake_run): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=False): + with mock.patch('builtins.open', mock_open_obj): + with mock.patch('generic_config_updater.main.print_success') as mock_ps: + gcu_main.save_config(self._make_args()) + mock_ps.assert_called_once() + + def test_save_with_explicit_filename_verbose(self): + fake_cfg = json.dumps({}) + + def fake_run(cmd, **kwargs): + r = mock.Mock() + r.stdout = fake_cfg + return r + + mock_open_obj = mock.mock_open() + with mock.patch('subprocess.run', side_effect=fake_run): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=False): + with mock.patch('builtins.open', mock_open_obj): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + with mock.patch('generic_config_updater.main.print_success'): + gcu_main.save_config(self._make_args(filename='/tmp/out.json', verbose=True)) + self.assertIn('/tmp/out.json', captured.getvalue()) + + def test_save_multiasic(self): + with mock.patch('generic_config_updater.main.multiasic_save_to_singlefile') as mock_save: + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=True): + with mock.patch('generic_config_updater.main.print_success'): + gcu_main.save_config(self._make_args(filename='/tmp/out.json')) + mock_save.assert_called_once_with('/tmp/out.json') + + def test_subprocess_error_exits(self): + with mock.patch('subprocess.run', + side_effect=subprocess.CalledProcessError(1, 'cmd')): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=False): + with self.assertRaises(SystemExit): + gcu_main.save_config(self._make_args()) + + def test_generic_exception_exits(self): + with mock.patch('subprocess.run', side_effect=RuntimeError("oops")): + with mock.patch('sonic_py_common.multi_asic.is_multi_asic', return_value=False): + with self.assertRaises(SystemExit): + gcu_main.save_config(self._make_args()) + + +class TestRollbackConfigSubcommand(unittest.TestCase): + + def _make_args(self, name='cp1', verbose=False, + ignore_non_yang_tables=False, ignore_path=None): + return Namespace( + checkpoint_name=name, + verbose=verbose, + ignore_non_yang_tables=ignore_non_yang_tables, + ignore_path=ignore_path or [], + ) + + def test_success(self): + mock_updater = mock.Mock() + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with mock.patch('generic_config_updater.main.print_success') as mock_ps: + gcu_main.rollback_config(self._make_args()) + mock_updater.rollback.assert_called_once() + mock_ps.assert_called_once() + + def test_verbose_prints_details(self): + mock_updater = mock.Mock() + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + captured = io.StringIO() + with mock.patch('sys.stdout', captured): + with mock.patch('generic_config_updater.main.print_success'): + gcu_main.rollback_config(self._make_args(verbose=True)) + self.assertIn('cp1', captured.getvalue()) + + def test_failure_calls_sys_exit(self): + mock_updater = mock.Mock() + mock_updater.rollback.side_effect = Exception("fail") + with mock.patch('generic_config_updater.main.GenericUpdater', return_value=mock_updater): + with self.assertRaises(SystemExit): + gcu_main.rollback_config(self._make_args()) + + +# --------------------------------------------------------------------------- +# build_parser +# --------------------------------------------------------------------------- + +class TestBuildParser(unittest.TestCase): + + def setUp(self): + self.parser = gcu_main.build_parser() + + def test_parser_returns_argparse_parser(self): + import argparse + self.assertIsNotNone(self.parser) + self.assertIsInstance(self.parser, argparse.ArgumentParser) + + def test_create_checkpoint_command(self): + args = self.parser.parse_args(['create-checkpoint', 'mycp']) + self.assertEqual(args.command, 'create-checkpoint') + self.assertEqual(args.checkpoint_name, 'mycp') + self.assertFalse(args.verbose) + + def test_create_checkpoint_verbose(self): + args = self.parser.parse_args(['create-checkpoint', 'mycp', '--verbose']) + self.assertTrue(args.verbose) + + def test_delete_checkpoint_command(self): + args = self.parser.parse_args(['delete-checkpoint', 'mycp']) + self.assertEqual(args.command, 'delete-checkpoint') + self.assertEqual(args.checkpoint_name, 'mycp') + + def test_list_checkpoints_command(self): + args = self.parser.parse_args(['list-checkpoints']) + self.assertEqual(args.command, 'list-checkpoints') + + def test_apply_patch_command_defaults(self): + args = self.parser.parse_args(['apply-patch', 'my.json']) + self.assertEqual(args.command, 'apply-patch') + self.assertEqual(args.patch_file, 'my.json') + self.assertEqual(args.format, 'CONFIGDB') + self.assertFalse(args.dry_run) + self.assertFalse(args.parallel) + self.assertFalse(args.ignore_non_yang_tables) + self.assertEqual(args.ignore_path, []) + + def test_apply_patch_all_flags(self): + args = self.parser.parse_args([ + 'apply-patch', 'my.json', + '--format', 'SONICYANG', + '--dry-run', + '--parallel', + '--ignore-non-yang-tables', + '--ignore-path', '/T1', + '--ignore-path', '/T2', + '--verbose', + ]) + self.assertEqual(args.format, 'SONICYANG') + self.assertTrue(args.dry_run) + self.assertTrue(args.parallel) + self.assertTrue(args.ignore_non_yang_tables) + self.assertEqual(args.ignore_path, ['/T1', '/T2']) + self.assertTrue(args.verbose) + + def test_replace_command_defaults(self): + args = self.parser.parse_args(['replace', 'cfg.json']) + self.assertEqual(args.command, 'replace') + self.assertEqual(args.config_file, 'cfg.json') + self.assertEqual(args.format, 'CONFIGDB') + + def test_save_command_default_filename(self): + args = self.parser.parse_args(['save']) + self.assertEqual(args.command, 'save') + self.assertIsNone(args.filename) + + def test_save_command_explicit_filename(self): + args = self.parser.parse_args(['save', '/tmp/out.json']) + self.assertEqual(args.filename, '/tmp/out.json') + + def test_rollback_command(self): + args = self.parser.parse_args(['rollback', 'cp1']) + self.assertEqual(args.command, 'rollback') + self.assertEqual(args.checkpoint_name, 'cp1') + + +# --------------------------------------------------------------------------- +# main() +# --------------------------------------------------------------------------- + +class TestMain(unittest.TestCase): + + def _run_main(self, argv): + with mock.patch('sys.argv', ['gcu-standalone'] + argv): + try: + gcu_main.main() + except SystemExit: + pass + + def test_no_command_prints_help(self): + captured = io.StringIO() + with mock.patch('sys.argv', ['gcu-standalone']): + with mock.patch('sys.stdout', captured): + gcu_main.main() + # argparse prints usage/help on no subcommand + # main() calls parser.print_help() and returns + + def test_create_checkpoint_dispatched(self): + with mock.patch('generic_config_updater.main.create_checkpoint') as mock_fn: + with mock.patch('sys.argv', ['gcu', 'create-checkpoint', 'mycp']): + gcu_main.main() + mock_fn.assert_called_once() + + def test_delete_checkpoint_dispatched(self): + with mock.patch('generic_config_updater.main.delete_checkpoint') as mock_fn: + with mock.patch('sys.argv', ['gcu', 'delete-checkpoint', 'mycp']): + gcu_main.main() + mock_fn.assert_called_once() + + def test_list_checkpoints_dispatched(self): + with mock.patch('generic_config_updater.main.list_checkpoints') as mock_fn: + with mock.patch('sys.argv', ['gcu', 'list-checkpoints']): + gcu_main.main() + mock_fn.assert_called_once() + + def test_save_dispatched(self): + with mock.patch('generic_config_updater.main.save_config') as mock_fn: + with mock.patch('sys.argv', ['gcu', 'save']): + gcu_main.main() + mock_fn.assert_called_once() + + def test_rollback_dispatched(self): + with mock.patch('generic_config_updater.main.rollback_config') as mock_fn: + with mock.patch('sys.argv', ['gcu', 'rollback', 'cp1']): + gcu_main.main() + mock_fn.assert_called_once() + + def test_apply_patch_missing_file_exits(self): + with mock.patch('sys.argv', ['gcu', 'apply-patch', '/nonexistent/file.json']): + with self.assertRaises(SystemExit): + gcu_main.main() + + def test_replace_missing_file_exits(self): + with mock.patch('sys.argv', ['gcu', 'replace', '/nonexistent/cfg.json']): + with self.assertRaises(SystemExit): + gcu_main.main() + + def test_apply_patch_existing_file_dispatched(self, ): + with mock.patch('generic_config_updater.main.apply_patch') as mock_fn: + with mock.patch('os.path.exists', return_value=True): + with mock.patch('sys.argv', ['gcu', 'apply-patch', '/fake/patch.json']): + gcu_main.main() + mock_fn.assert_called_once() + + def test_replace_existing_file_dispatched(self): + with mock.patch('generic_config_updater.main.replace_config') as mock_fn: + with mock.patch('os.path.exists', return_value=True): + with mock.patch('sys.argv', ['gcu', 'replace', '/fake/cfg.json']): + gcu_main.main() + mock_fn.assert_called_once() + + +if __name__ == '__main__': + unittest.main() diff --git a/utilities_common/constants.py b/utilities_common/constants.py index 25858b785..a0ef0a391 100644 --- a/utilities_common/constants.py +++ b/utilities_common/constants.py @@ -1,6 +1,10 @@ #All the constant used in sonic-utilities DEFAULT_NAMESPACE = '' + +# NOTE: A duplicate of this list exists in generic_config_updater/field_operation_validators.py +# (kept separate to avoid a utilities_common dependency in the GCU wheel). +# If you update this list, update that copy too. DEFAULT_SUPPORTED_FECS_LIST = [ 'rs', 'fc', 'none', 'auto'] DISPLAY_ALL = 'all' DISPLAY_EXTERNAL = 'frontend'