From 3dd30fa4a816769eb5f7334061b78edc74e7599e Mon Sep 17 00:00:00 2001
From: Ankush Singh
Date: Mon, 1 Jul 2024 10:34:15 -0700
Subject: [PATCH] Deleting deployment library, tests and target for the tests (#2410)

Summary:
Pull Request resolved: https://github.com/facebookresearch/fbpcs/pull/2410

# Context

This diff stack deletes the deployment library, which was originally developed to replace the deployment shell scripts with Python scripts for better code coverage.

This diff removes the deployment library, its tests, and the test target from the fbpcs repository. The code changes in each file of the diff are as follows:

- fbcode/fbpcs/TARGETS: The python_unittest target for test_pce_deployment_library is removed.
- fbcode/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment.py: This file is deleted.
- fbcode/fbpcs/infra/pce_deployment_library/test/test_aws.py: This file is deleted.
- fbcode/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_utils.py: This file is deleted.
- fbcode/fbpcs/infra/pce_deployment_library/publisher_pce/deploy.py: This file is deleted.

Reviewed By: ajaybhargavb

Differential Revision: D59231591

fbshipit-source-id: 6b49a156c5d2f020322ca480195126b802229207
---
 .../publisher_pce/deploy.py                   | 171 -----------
 .../publisher_pce/publisher_pce_defaults.py   |  16 -
 .../publisher_pce/publisher_pce_utils.py      |  52 ----
 .../pce_deployment_library/test/test_aws.py   | 279 ------------------
 .../test/test_cloud_factory.py                |  39 ---
 .../test/test_terraform_deployment.py         | 160 ----------
 .../test/test_terraform_deployment_utils.py   | 112 -------
 7 files changed, 829 deletions(-)
 delete mode 100644 fbpcs/infra/pce_deployment_library/publisher_pce/deploy.py
 delete mode 100644 fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_defaults.py
 delete mode 100644 fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_utils.py
 delete mode 100644 fbpcs/infra/pce_deployment_library/test/test_aws.py
 delete mode 100644 fbpcs/infra/pce_deployment_library/test/test_cloud_factory.py
 delete mode 100644 fbpcs/infra/pce_deployment_library/test/test_terraform_deployment.py
 delete mode 100644 fbpcs/infra/pce_deployment_library/test/test_terraform_deployment_utils.py

diff --git a/fbpcs/infra/pce_deployment_library/publisher_pce/deploy.py b/fbpcs/infra/pce_deployment_library/publisher_pce/deploy.py
deleted file mode 100644
index 25e8a75ff..000000000
--- a/fbpcs/infra/pce_deployment_library/publisher_pce/deploy.py
+++ /dev/null
@@ -1,171 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-#
-# This source code is licensed under the MIT license found in the
-# LICENSE file in the root directory of this source tree.
-
-import logging
-
-from typing import Any, Dict
-
-from fbpcs.infra.pce_deployment_library.cloud_library.aws.aws import AWS
-from fbpcs.infra.pce_deployment_library.deploy_library.models import (
-    FlaggedOption,
-    TerraformCommand,
-)
-from fbpcs.infra.pce_deployment_library.deploy_library.terraform_library.terraform_deployment import (
-    TerraformDeployment,
-)
-from fbpcs.infra.pce_deployment_library.publisher_pce.publisher_pce_defaults import (
-    TerraformDefaults,
-)
-from fbpcs.infra.pce_deployment_library.publisher_pce.publisher_pce_utils import (
-    PublisherPceUtils,
-)
-
-
-class Deploy:
-    """
-    Class to store the arguments used for deployment and undeployment
-    """
-
-    def __init__(
-        self,
-        s3_bucket_name: str = None,
-        s3_bucket_region: str = None,
-        account_id: str = None,
-        partner_account_id: str = None,
-        aws_region: str = None,
-        tag: str = None,
-        vpc_cidr: str = None,
-        partner_vpc_cidr: str = None,
-        vpc_logging_enabled: bool = False,
-        vpc_log_bucket_arn: str = None,
-    ):
-        self.s3_bucket_name = s3_bucket_name
-        self.s3_bucket_region = s3_bucket_region
-        self.account_id = account_id
-        self.partner_account_id = partner_account_id
-        self.aws_region = aws_region
-        self.tag = tag
-        self.vpc_cidr = vpc_cidr
-        self.partner_vpc_cidr = partner_vpc_cidr
-        self.vpc_logging_enabled = vpc_logging_enabled
-        self.vpc_log_bucket_arn = vpc_log_bucket_arn
-
-        self.tag_postfix = f"-{self.tag}"
-
-        self.log: logging.Logger = logging.getLogger(__name__)
-
-        self.aws = AWS(aws_region=self.aws_region)
-        self.terraform = TerraformDeployment()
-        self.publishe_pce_utils = PublisherPceUtils()
-
-    def deploy_pce(self, bucket_version: bool = True) -> None:
-        self.log.info(
-            "########################Started AWS Infrastructure Deployment########################"
-        )
-        self.log.info("Creating S3 bucket...")
-        self.aws.check_s3_buckets_exists(
-            s3_bucket_name=self.s3_bucket_name, bucket_version=bucket_version
-        )
-        self.terraform.working_directory = TerraformDefaults.PCE_TERRAFORM_FILE_LOCATION
-
-        self.log.info(
-            f"Changing terraform working directory to {self.terraform.working_directory}"
-        )
-        backend_config = self._get_init_backend_config()
-
-        self.log.info("Running terraform init...")
-        terraform_init_log = self.terraform.terraform_init(
-            backend_config=backend_config, reconfigure=FlaggedOption
-        )
-        self.log.info(
-            f"Terraform init output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.INIT, terraform_init_log)}"
-        )
-
-        var_dict = self._get_var()
-        if self.vpc_logging_enabled:
-            opt_params = self._get_vpc_var()
-            var_dict.update(opt_params)
-
-        self.log.debug(f"Running terraform apply with vars: {var_dict}")
-        terraform_create_log = self.terraform.create(var=var_dict)
-        self.log.info(
-            f"Terraform apply output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.APPLY, terraform_create_log)}"
-        )
-
-        self.log.info(
-            "######################## PCE terraform output ########################"
-        )
-        terraform_output_result = self.terraform.terraform_output()
-        self.log.info(
-            f"Terraform output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.OUTPUT, terraform_output_result)}"
-        )
-
-        self.log.info(
-            "########################Finished AWS Infrastructure Deployment########################"
-        )
-
-    def undeploy_pce(self) -> None:
-        self.log.info("Start undeploying...")
-        self.log.info(
-            "########################Check tfstate files########################"
-        )
-        terraform_state_file = f"tfstate/pce{self.tag_postfix}.tfstate"
-        self.aws.check_s3_object_exists(
-            s3_bucket_name=self.s3_bucket_name, key_name=terraform_state_file
-        )
-        self.log.info("All tfstate files exist. Continue...")
-
-        self.terraform.working_directory = TerraformDefaults.PCE_TERRAFORM_FILE_LOCATION
-
-        backend_config = self._get_init_backend_config()
-
-        self.log.info(
-            "########################Delete PCE resources########################"
-        )
-        self.log.info("Running terraform init...")
-        terraform_init_log = self.terraform.terraform_init(
-            backend_config=backend_config, reconfigure=FlaggedOption
-        )
-        self.log.info(
-            f"Terraform init output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.INIT, terraform_init_log)}"
-        )
-
-        var_dict = self._get_var()
-        terraform_destroy_log = self.terraform.destroy(var=var_dict)
-        self.log.info(
-            f"Terraform init output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.DESTROY, terraform_destroy_log)}"
-        )
-
-    def _get_init_backend_config(self) -> Dict[str, Any]:
-        return {
-            "bucket": self.s3_bucket_name,
-            "region": self.aws_region,
-            "key": f"tfstate/pce{self.tag_postfix}.tfstate",
-        }
-
-    def _get_var(self, destroy: bool = False) -> Dict[str, Any]:
-        return_dict = {
-            "aws_region": self.aws_region,
-            "tag_postfix": self.tag_postfix,
-            "pce_id": self.tag,
-        }
-
-        if not destroy:
-            return_dict.update(
-                {
-                    "vpc_cidr": self.vpc_cidr,
-                    "otherparty_vpc_cidr": self.partner_vpc_cidr,
-                }
-            )
-
-        return return_dict
-
-    def _get_vpc_var(self) -> Dict[str, Any]:
-        return {
-            "vpc_logging": {
-                "enabled": self.vpc_logging_enabled,
-                "bucket_arn": self.vpc_log_bucket_arn,
-            }
-        }
diff --git a/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_defaults.py b/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_defaults.py
deleted file mode 100644
index 44f912110..000000000
--- a/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_defaults.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-#
-# This source code is licensed under the MIT license found in the
-# LICENSE file in the root directory of this source tree.
-
-import os
-from enum import Enum
-
-DIR = os.path.dirname
-PCE_TERRAFORM_FILES = "pce/aws_terraform_template/common/pce"
-
-
-class TerraformDefaults(str, Enum):
-    PCE_TERRAFORM_FILE_LOCATION = os.path.join(
-        DIR(DIR(DIR(os.path.realpath(__file__)))), PCE_TERRAFORM_FILES
-    )
diff --git a/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_utils.py b/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_utils.py
deleted file mode 100644
index 61b3bd537..000000000
--- a/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_utils.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-#
-# This source code is licensed under the MIT license found in the
-# LICENSE file in the root directory of this source tree.
-
-import logging
-import re
-
-from fbpcs.infra.pce_deployment_library.errors_library.terraform_errors import (
-    TerraformCommandExectionError,
-)
-
-
-class PublisherPceUtils:
-    def __init__(self):
-        self.log: logging.Logger = logging.getLogger(__name__)
-        # set logger
-        self.log.setLevel(logging.DEBUG)
-        # create file handler which logs even debug messages
-        fh = logging.FileHandler("test.log")
-        fh.setLevel(logging.DEBUG)
-        self.log.addHandler(fh)
-
-    def parse_command_output(self, command, command_result) -> str:
-        ret_code = command_result.return_code
-        if ret_code != 0:
-            self.log.error(f"Failed to run terraform {command}")
-            error = f"Command terraform {command} execution failed with error, {command_result.error}"
-            self.log.error(f"{error}")
-            raise TerraformCommandExectionError(f"{error}")
-
-        return self.sanitize_command_output_logs(command_result.output)
-
-    def sanitize_command_output_logs(self, command_output_log: str) -> str:
-        if not command_output_log:
-            return ""
-
-        ansi_escape = re.compile(
-            r"""
-            \x1B  # ESC
-            (?:   # 7-bit C1 Fe (except CSI)
-                [@-Z\\-_]
-            |     # or [ for CSI, followed by a control sequence
-                \[
-                [0-?]*  # Parameter bytes
-                [ -/]*  # Intermediate bytes
-                [@-~]   # Final byte
-            )
-            """,
-            re.VERBOSE,
-        )
-        return ansi_escape.sub("", command_output_log)
diff --git a/fbpcs/infra/pce_deployment_library/test/test_aws.py b/fbpcs/infra/pce_deployment_library/test/test_aws.py
deleted file mode 100644
index 3777491f6..000000000
--- a/fbpcs/infra/pce_deployment_library/test/test_aws.py
+++ /dev/null
@@ -1,279 +0,0 @@
-#!/usr/bin/env python3
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-#
-# This source code is licensed under the MIT license found in the
-# LICENSE file in the root directory of this source tree.
-
-# pyre-strict
-
-import unittest
-from unittest.mock import create_autospec
-
-from botocore.exceptions import ClientError
-
-from fbpcs.infra.pce_deployment_library.cloud_library.aws.aws import AWS
-from fbpcs.infra.pce_deployment_library.errors_library.aws_errors import (
-    AccessDeniedError,
-    S3BucketCreationError,
-    S3BucketDeleteError,
-    S3BucketDoesntExist,
-    S3BucketVersioningFailedError,
-)
-
-
-class TestAws(unittest.TestCase):
-    def setUp(self) -> None:
-        self.aws = AWS()
-        self.aws.sts.get_caller_identity = create_autospec(
-            self.aws.sts.get_caller_identity
-        )
-
-    def test_check_s3_buckets_exists(self) -> None:
-        s3_bucket_name = "fake_bucket"
-        self.aws.s3_client.head_bucket = create_autospec(self.aws.s3_client.head_bucket)
-
-        with self.subTest("basic"):
-            with self.assertLogs() as captured:
-                self.aws.check_s3_buckets_exists(
-                    s3_bucket_name=s3_bucket_name, bucket_version=False
-                )
-                self.assertEqual(len(captured.records), 2)
-                self.assertEqual(
-                    captured.records[1].getMessage(),
-                    f"S3 bucket {s3_bucket_name} already exists in the AWS account.",
-                )
-
-        with self.subTest("BucketNotFound"):
-            self.aws.s3_client.create_bucket = create_autospec(
-                self.aws.s3_client.create_bucket
-            )
-            self.aws.s3_client.put_bucket_versioning = create_autospec(
-                self.aws.s3_client.put_bucket_versioning
-            )
-            self.aws.s3_client.head_bucket.side_effect = ClientError(
-                error_response={"Error": {"Code": "404"}},
-                operation_name="head_bucket",
-            )
-            with self.assertLogs() as captured:
-                self.aws.check_s3_buckets_exists(
-                    s3_bucket_name=s3_bucket_name, bucket_version=False
-                )
-                self.assertEqual(len(captured.records), 4)
-                self.assertEqual(
-                    captured.records[2].getMessage(),
-                    f"Creating new S3 bucket {s3_bucket_name}",
-                )
-                self.assertEqual(
-                    captured.records[3].getMessage(),
-                    f"Create S3 bucket {s3_bucket_name} operation was successful.",
-                )
-
-        with self.subTest("AccessDenied"):
-            self.aws.s3_client.head_bucket.side_effect = ClientError(
-                error_response={"Error": {"Code": "403"}},
-                operation_name="head_bucket",
-            )
-            with self.assertRaisesRegex(AccessDeniedError, "Access denied*"):
-                self.aws.check_s3_buckets_exists(
-                    s3_bucket_name=s3_bucket_name, bucket_version=False
-                )
-
-        with self.subTest("CatchAllError"):
-            self.aws.s3_client.head_bucket.side_effect = ClientError(
-                error_response={"Error": {"Code": None}},
-                operation_name="head_bucket",
-            )
-            with self.assertRaisesRegex(
-                S3BucketCreationError, "Couldn't create bucket*"
-            ):
-                self.aws.check_s3_buckets_exists(
-                    s3_bucket_name=s3_bucket_name, bucket_version=False
-                )
-
-    def test_create_s3_bucket(self) -> None:
-        self.aws.s3_client.create_bucket = create_autospec(
-            self.aws.s3_client.create_bucket
-        )
-        s3_bucket_name = "fake_bucket"
-
-        with self.subTest("Basic"):
-            with self.assertLogs() as captured:
-                self.aws.create_s3_bucket(
-                    s3_bucket_name=s3_bucket_name, bucket_version=False
-                )
-                self.assertEqual(len(captured.records), 2)
-                self.assertEqual(
-                    captured.records[1].getMessage(),
-                    f"Create S3 bucket {s3_bucket_name} operation was successful.",
-                )
-
-        with self.subTest("CreateBucketException"):
-            self.aws.s3_client.create_bucket.side_effect = ClientError(
-                error_response={"Error": {"Code": None}},
-                operation_name="create_bucket",
-            )
-
-            with self.assertRaisesRegex(
-                S3BucketCreationError, "Failed to create S3 bucket*"
-            ):
-                self.aws.create_s3_bucket(
-                    s3_bucket_name=s3_bucket_name, bucket_version=False
-                )
-
-    def test_update_bucket_versioning(self) -> None:
-        s3_bucket_name = "fake_bucket"
-        self.aws.s3_client.put_bucket_versioning = create_autospec(
-            self.aws.s3_client.put_bucket_versioning
-        )
-
-        with self.subTest("Basic"):
-            with self.assertLogs() as captured:
-                self.aws.update_bucket_versioning(s3_bucket_name=s3_bucket_name)
-                self.assertEqual(len(captured.records), 2)
-                self.assertEqual(
-                    captured.records[1].getMessage(),
-                    f"Bucket {s3_bucket_name} is enabled with versioning.",
-                )
-
-        with self.subTest("S3BucketDoesntExist"):
-            self.aws.s3_client.put_bucket_versioning.side_effect = ClientError(
-                error_response={"Error": {"Code": "404"}},
-                operation_name="put_bucket_versioning",
-            )
-            self.assertRaises(
-                S3BucketDoesntExist,
-                lambda: self.aws.update_bucket_versioning(
-                    s3_bucket_name=s3_bucket_name
-                ),
-            )
-
-        with self.subTest("AccessDeniedException"):
-            self.aws.s3_client.put_bucket_versioning.side_effect = ClientError(
-                error_response={"Error": {"Code": "403"}},
-                operation_name="put_bucket_versioning",
-            )
-            self.assertRaises(
-                AccessDeniedError,
-                lambda: self.aws.update_bucket_versioning(
-                    s3_bucket_name=s3_bucket_name
-                ),
-            )
-
-        with self.subTest("BucketVersioningFailed"):
-            self.aws.s3_client.put_bucket_versioning.side_effect = ClientError(
-                error_response={"Error": {"Code": None}},
-                operation_name="put_bucket_versioning",
-            )
-            self.assertRaises(
-                S3BucketVersioningFailedError,
-                lambda: self.aws.update_bucket_versioning(
-                    s3_bucket_name=s3_bucket_name
-                ),
-            )
-            with self.assertRaisesRegex(
-                S3BucketVersioningFailedError, "Error in versioning S3 bucket*"
-            ):
-                self.aws.update_bucket_versioning(s3_bucket_name=s3_bucket_name)
-
-    def test_delete_s3_bucket(self) -> None:
-        s3_bucket_name = "fake_bucket"
-        self.aws.s3_client.delete_bucket = create_autospec(
-            self.aws.s3_client.delete_bucket
-        )
-
self.subTest("Basic"): - with self.assertLogs() as captured: - self.aws.delete_s3_bucket(s3_bucket_name=s3_bucket_name) - self.assertEqual(len(captured.records), 2) - self.assertEqual( - captured.records[1].getMessage(), - f"Delete S3 bucket {s3_bucket_name} operation was successful.", - ) - - with self.subTest("BucketDeleteFailed"): - self.aws.s3_client.delete_bucket.side_effect = ClientError( - error_response={"Error": {"Code": None}}, - operation_name="delete_bucket", - ) - self.assertRaises( - S3BucketDeleteError, - lambda: self.aws.delete_s3_bucket(s3_bucket_name=s3_bucket_name), - ) - - def test_check_s3_object_exists(self) -> None: - s3_bucket_name = "fake_bucket" - key_name = "fake_file" - account_id = "123456789" - - with self.subTest("basic"): - self.aws.s3_client.head_object = create_autospec( - self.aws.s3_client.head_object - ) - self.aws.s3_client.head_object.return_value = {"ContentLength": 100} - self.assertTrue( - self.aws.check_s3_object_exists( - s3_bucket_name=s3_bucket_name, - key_name=key_name, - account_id=account_id, - ) - ) - - with self.subTest("KeyNotExistError"): - self.aws.s3_client.head_object.side_effect = ClientError( - error_response={"Error": {"Code": "404"}}, - operation_name="head_object", - ) - - with self.assertLogs() as captured: - self.assertFalse( - self.aws.check_s3_object_exists( - s3_bucket_name=s3_bucket_name, - key_name=key_name, - account_id=account_id, - ) - ) - self.assertEqual(len(captured.records), 3) - self.assertEqual( - captured.records[1].getMessage(), - f"Couldn't find file {key_name} in bucket {s3_bucket_name}", - ) - - with self.subTest("AccessDeniedError"): - self.aws.s3_client.head_object.side_effect = ClientError( - error_response={"Error": {"Code": "403"}}, - operation_name="head_object", - ) - - with self.assertLogs() as captured: - self.assertFalse( - self.aws.check_s3_object_exists( - s3_bucket_name=s3_bucket_name, - key_name=key_name, - account_id=account_id, - ) - ) - self.assertEqual(len(captured.records), 3) - self.assertEqual( - captured.records[1].getMessage(), - f"Access denied: failed to access bucket {s3_bucket_name}", - ) - - with self.subTest("CatchAllError"): - self.aws.s3_client.head_object.side_effect = ClientError( - error_response={"Error": {"Code": None}}, - operation_name="head_object", - ) - - with self.assertLogs() as captured: - self.assertFalse( - self.aws.check_s3_object_exists( - s3_bucket_name=s3_bucket_name, - key_name=key_name, - account_id=account_id, - ) - ) - self.assertEqual(len(captured.records), 3) - self.assertEqual( - captured.records[1].getMessage(), - f"Failed to find file {key_name} in bucket {s3_bucket_name}", - ) diff --git a/fbpcs/infra/pce_deployment_library/test/test_cloud_factory.py b/fbpcs/infra/pce_deployment_library/test/test_cloud_factory.py deleted file mode 100644 index 57d312969..000000000 --- a/fbpcs/infra/pce_deployment_library/test/test_cloud_factory.py +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. 
-
-# pyre-strict
-import unittest
-
-from fbpcs.infra.pce_deployment_library.cloud_library.cloud_factory import CloudFactory
-from fbpcs.infra.pce_deployment_library.cloud_library.defaults import CloudPlatforms
-
-
-class TestCloudFactory(unittest.TestCase):
-    def setUp(self) -> None:
-        self.test_cloud_factory = CloudFactory()
-
-    def test_supported_cloud_platforms(self) -> None:
-        expected = CloudPlatforms.list()
-
-        self.assertEqual(
-            expected, self.test_cloud_factory.get_supported_cloud_platforms()
-        )
-
-    def test_create_cloud_object_aws(self) -> None:
-        expected = CloudPlatforms.AWS
-
-        cloud_object = self.test_cloud_factory.create_cloud_object(
-            cloud_type=CloudPlatforms.AWS
-        )
-        self.assertEqual(expected, cloud_object.cloud_type())
-
-    def test_create_cloud_object_gcp(self) -> None:
-        expected = CloudPlatforms.GCP
-
-        cloud_object = self.test_cloud_factory.create_cloud_object(
-            cloud_type=CloudPlatforms.GCP
-        )
-
-        self.assertEqual(expected, cloud_object.cloud_type())
diff --git a/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment.py b/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment.py
deleted file mode 100644
index e6b545c50..000000000
--- a/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment.py
+++ /dev/null
@@ -1,160 +0,0 @@
-#!/usr/bin/env python3
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-#
-# This source code is licensed under the MIT license found in the
-# LICENSE file in the root directory of this source tree.
-
-# pyre-strict
-
-import unittest
-from subprocess import PIPE, Popen
-from typing import Any, Dict, Type
-
-from fbpcs.infra.pce_deployment_library.deploy_library.models import (
-    FlaggedOption,
-    NotFlaggedOption,
-    RunCommandResult,
-    TerraformOptionFlag,
-)
-
-from fbpcs.infra.pce_deployment_library.deploy_library.terraform_library.terraform_deployment import (
-    TerraformDeployment,
-)
-
-
-class TestTerraformDeployment(unittest.TestCase):
-    def setUp(self) -> None:
-        self.terraform_deployment = TerraformDeployment()
-
-    def test_run_command(self) -> None:
-        with self.subTest("basicCaptureTrue"):
-            command = "echo Hello World!\n"
-            capture_output = True
-            test_obj = Popen(["echo", "Hello World!"], stdout=PIPE)
-            test_stdout, test_error = test_obj.communicate()
-            test_return_code = test_obj.returncode
-            test_command_return = RunCommandResult(
-                return_code=test_return_code,
-                output=test_stdout.decode("utf-8"),
-                # pyre-fixme[6]: For 3rd param expected `Optional[str]` but got
-                #  `Union[bytes, str]`.
-                error=test_error if test_error else "",
-            )
-
-            func_ret = self.terraform_deployment.run_command(command=command)
-            self.assertEqual(test_command_return.return_code, func_ret.return_code)
-            self.assertEqual(test_command_return.output, func_ret.output)
-            self.assertEqual(test_command_return.error, func_ret.error)
-
-        with self.subTest("basicCaptureFalse"):
-            command = "echo Hello World!\n"
-            capture_output = False
-
-            test_obj = Popen(["echo", "Hello World!"])
-            test_stdout, test_error = test_obj.communicate()
-            test_return_code = test_obj.returncode
-            test_command_return = RunCommandResult(
-                return_code=test_return_code,
-                # pyre-fixme[6]: For 2nd param expected `Optional[str]` but got `bytes`.
-                output=test_stdout,
-                # pyre-fixme[6]: For 3rd param expected `Optional[str]` but got `bytes`.
-                error=test_stdout,
-            )
-            kwargs: Dict[str, Any] = {"capture_output": capture_output}
-            func_ret = self.terraform_deployment.run_command(command=command, **kwargs)
-            self.assertEqual(test_command_return.return_code, func_ret.return_code)
-            self.assertEqual(test_command_return.output, func_ret.output)
-            self.assertEqual(test_command_return.error, func_ret.error)
-
-        with self.subTest("TestStdErrWithCaptureOutput"):
-            command = "cp"
-            capture_output = True
-
-            func_ret = self.terraform_deployment.run_command(command=command)
-            test_command_return = RunCommandResult(
-                return_code=1,
-                output="",
-                error="cp: missing file operand\nTry 'cp --help' for more information.\n",
-            )
-            self.assertEqual(test_command_return.return_code, func_ret.return_code)
-            self.assertEqual(test_command_return.output, func_ret.output)
-            self.assertEqual(test_command_return.error, func_ret.error)
-
-        with self.subTest("TestStdErrWithoutCaptureOutput"):
-            command = "cp"
-            capture_output = False
-            kwargs: Dict[str, Any] = {"capture_output": capture_output}
-
-            func_ret = self.terraform_deployment.run_command(command=command, **kwargs)
-            test_command_return = RunCommandResult(
-                return_code=1,
-                output=None,
-                error=None,
-            )
-            self.assertEqual(test_command_return.return_code, func_ret.return_code)
-            self.assertEqual(test_command_return.output, func_ret.output)
-            self.assertEqual(test_command_return.error, func_ret.error)
-
-    def test_terraform_init(self) -> None:
-        kwargs: Dict[str, Any] = {"dry_run": True}
-        with self.subTest("BackendConig"):
-            backend_config = {
-                "region": "fake_region",
-                "access_key": "fake_access_key",
-            }
-            expected_command = "terraform init -input=false -dry-run=true -backend-config region=fake_region -backend-config access_key=fake_access_key -reconfigure"
-            expected_value = RunCommandResult(
-                return_code=0, output=f"Dry run command: {expected_command}", error=""
-            )
-            return_value = self.terraform_deployment.terraform_init(
-                backend_config=backend_config, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-        with self.subTest("BackendConigWhiteSpaces"):
-            backend_config = {
-                "region": "fake_region ",
-                "access_key": "fake_access_key ",
-            }
-            expected_command = "terraform init -input=false -dry-run=true -backend-config region=fake_region -backend-config access_key=fake_access_key -reconfigure"
-            expected_value = RunCommandResult(
-                return_code=0, output=f"Dry run command: {expected_command}", error=""
-            )
-            return_value = self.terraform_deployment.terraform_init(
-                backend_config=backend_config, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-        with self.subTest("UnsetReconfigureNoBackendConfig"):
-            expected_command = "terraform init -input=false -dry-run=true"
-            expected_value = RunCommandResult(
-                return_code=0, output=f"Dry run command: {expected_command}", error=""
-            )
-            reconfigure: Type[TerraformOptionFlag] = NotFlaggedOption
-            return_value = self.terraform_deployment.terraform_init(
-                reconfigure=reconfigure, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-        with self.subTest("SetReconfigure"):
-            expected_command = "terraform init -input=false -dry-run=true -reconfigure"
-            expected_value = RunCommandResult(
-                return_code=0, output=f"Dry run command: {expected_command}", error=""
-            )
-            reconfigure = FlaggedOption
-            return_value = self.terraform_deployment.terraform_init(
-                reconfigure=reconfigure, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-    def test_create(self) -> None:
-        # T126572515
-        pass
-
-    def test_destory(self) -> None:
-        # T126573127
-        pass
-
-    def test_plan(self) -> None:
-        # T126574725
-        pass
diff --git a/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment_utils.py b/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment_utils.py
deleted file mode 100644
index e097ace62..000000000
--- a/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment_utils.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Copyright (c) Meta Platforms, Inc. and affiliates.
-#
-# This source code is licensed under the MIT license found in the
-# LICENSE file in the root directory of this source tree.
-
-# pyre-strict
-
-import unittest
-from typing import Any, Dict
-
-from fbpcs.infra.pce_deployment_library.deploy_library.models import FlaggedOption
-
-from fbpcs.infra.pce_deployment_library.deploy_library.terraform_library.terraform_deployment_utils import (
-    TerraformDeploymentUtils,
-)
-
-
-class TestTerraformDeploymentUtils(unittest.TestCase):
-    def setUp(self) -> None:
-        self.terraform_deployment_utils = TerraformDeploymentUtils()
-
-    def test_get_default_options(self) -> None:
-        # T125643751
-        pass
-
-    def test_get_command_list(self) -> None:
-        command: str = "terraform apply"
-        with self.subTest("OptionTypeDict"):
-            kwargs: Dict[str, Any] = {
-                "backend-config": {
-                    "region": "fake_region",
-                    "access_key": "fake_access_key",
-                }
-            }
-            expected_value = [
-                "terraform",
-                "apply",
-                "-backend-config",
-                "region=fake_region",
-                "-backend-config",
-                "access_key=fake_access_key",
-            ]
-            return_value = self.terraform_deployment_utils.get_command_list(
-                command, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-        with self.subTest("OptionTypeList"):
-            kwargs: Dict[str, Any] = {"target": ["fake_region", "fake_access_key"]}
-            expected_value = [
-                "terraform",
-                "apply",
-                '-target="fake_region"',
-                '-target="fake_access_key"',
-            ]
-            return_value = self.terraform_deployment_utils.get_command_list(
-                command, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-        with self.subTest("OptionTypeBool"):
-            kwargs: Dict[str, Any] = {"input": False}
-            expected_value = ["terraform", "apply", "-input=false"]
-            return_value = self.terraform_deployment_utils.get_command_list(
-                command, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-        with self.subTest("OptionTypeFlaggedOption"):
-            kwargs: Dict[str, Any] = {"reconfigure": FlaggedOption}
-            expected_value = ["terraform", "apply", "-reconfigure"]
-            return_value = self.terraform_deployment_utils.get_command_list(
-                command, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-        with self.subTest("OptionTypeDictWithArgs"):
-            kwargs: Dict[str, Any] = {
-                "backend-config": {
-                    "region": "fake_region",
-                    "access_key": "fake_access_key",
-                }
-            }
-            args = ("test_test",)
-            expected_value = [
-                "terraform",
-                "apply",
-                "-backend-config",
-                "region=fake_region",
-                "-backend-config",
-                "access_key=fake_access_key",
-                "test_test",
-            ]
-            return_value = self.terraform_deployment_utils.get_command_list(
-                command, *args, **kwargs
-            )
-            self.assertEqual(expected_value, return_value)
-
-    def test_add_dict_options(self) -> None:
-        pass
-
-    def test_add_list_options(self) -> None:
-        pass
-
-    def test_add_bool_options(self) -> None:
-        pass
-
-    def test_add_flagged_option(self) -> None:
-        pass
-
-    def test_add_other_options(self) -> None:
-        pass