diff --git a/fbpcs/infra/pce_deployment_library/publisher_pce/deploy.py b/fbpcs/infra/pce_deployment_library/publisher_pce/deploy.py deleted file mode 100644 index 25e8a75ff..000000000 --- a/fbpcs/infra/pce_deployment_library/publisher_pce/deploy.py +++ /dev/null @@ -1,171 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. - -import logging - -from typing import Any, Dict - -from fbpcs.infra.pce_deployment_library.cloud_library.aws.aws import AWS -from fbpcs.infra.pce_deployment_library.deploy_library.models import ( - FlaggedOption, - TerraformCommand, -) -from fbpcs.infra.pce_deployment_library.deploy_library.terraform_library.terraform_deployment import ( - TerraformDeployment, -) -from fbpcs.infra.pce_deployment_library.publisher_pce.publisher_pce_defaults import ( - TerraformDefaults, -) -from fbpcs.infra.pce_deployment_library.publisher_pce.publisher_pce_utils import ( - PublisherPceUtils, -) - - -class Deploy: - """ - Class to store the arguments used for deployment and undeployment - """ - - def __init__( - self, - s3_bucket_name: str = None, - s3_bucket_region: str = None, - account_id: str = None, - partner_account_id: str = None, - aws_region: str = None, - tag: str = None, - vpc_cidr: str = None, - partner_vpc_cidr: str = None, - vpc_logging_enabled: bool = False, - vpc_log_bucket_arn: str = None, - ): - self.s3_bucket_name = s3_bucket_name - self.s3_bucket_region = s3_bucket_region - self.account_id = account_id - self.partner_account_id = partner_account_id - self.aws_region = aws_region - self.tag = tag - self.vpc_cidr = vpc_cidr - self.partner_vpc_cidr = partner_vpc_cidr - self.vpc_logging_enabled = vpc_logging_enabled - self.vpc_log_bucket_arn = vpc_log_bucket_arn - - self.tag_postfix = f"-{self.tag}" - - self.log: logging.Logger = logging.getLogger(__name__) - - self.aws = AWS(aws_region=self.aws_region) - self.terraform = TerraformDeployment() - self.publishe_pce_utils = PublisherPceUtils() - - def deploy_pce(self, bucket_version: bool = True) -> None: - self.log.info( - "########################Started AWS Infrastructure Deployment########################" - ) - self.log.info("Creating S3 bucket...") - self.aws.check_s3_buckets_exists( - s3_bucket_name=self.s3_bucket_name, bucket_version=bucket_version - ) - self.terraform.working_directory = TerraformDefaults.PCE_TERRAFORM_FILE_LOCATION - - self.log.info( - f"Changing terraform working directory to {self.terraform.working_directory}" - ) - backend_config = self._get_init_backend_config() - - self.log.info("Running terraform init...") - terraform_init_log = self.terraform.terraform_init( - backend_config=backend_config, reconfigure=FlaggedOption - ) - self.log.info( - f"Terraform init output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.INIT, terraform_init_log)}" - ) - - var_dict = self._get_var() - if self.vpc_logging_enabled: - opt_params = self._get_vpc_var() - var_dict.update(opt_params) - - self.log.debug(f"Running terraform apply with vars: {var_dict}") - terraform_create_log = self.terraform.create(var=var_dict) - self.log.info( - f"Terraform apply output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.APPLY, terraform_create_log)}" - ) - - self.log.info( - "######################## PCE terraform output ########################" - ) - terraform_output_result = self.terraform.terraform_output() - self.log.info( - f"Terraform output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.OUTPUT, terraform_output_result)}" - ) - - self.log.info( - "########################Finished AWS Infrastructure Deployment########################" - ) - - def undeploy_pce(self) -> None: - self.log.info("Start undeploying...") - self.log.info( - "########################Check tfstate files########################" - ) - terraform_state_file = f"tfstate/pce{self.tag_postfix}.tfstate" - self.aws.check_s3_object_exists( - s3_bucket_name=self.s3_bucket_name, key_name=terraform_state_file - ) - self.log.info("All tfstate files exist. Continue...") - - self.terraform.working_directory = TerraformDefaults.PCE_TERRAFORM_FILE_LOCATION - - backend_config = self._get_init_backend_config() - - self.log.info( - "########################Delete PCE resources########################" - ) - self.log.info("Running terraform init...") - terraform_init_log = self.terraform.terraform_init( - backend_config=backend_config, reconfigure=FlaggedOption - ) - self.log.info( - f"Terraform init output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.INIT, terraform_init_log)}" - ) - - var_dict = self._get_var() - terraform_destroy_log = self.terraform.destroy(var=var_dict) - self.log.info( - f"Terraform init output is: {self.publishe_pce_utils.parse_command_output(TerraformCommand.DESTROY, terraform_destroy_log)}" - ) - - def _get_init_backend_config(self) -> Dict[str, Any]: - return { - "bucket": self.s3_bucket_name, - "region": self.aws_region, - "key": f"tfstate/pce{self.tag_postfix}.tfstate", - } - - def _get_var(self, destroy: bool = False) -> Dict[str, Any]: - return_dict = { - "aws_region": self.aws_region, - "tag_postfix": self.tag_postfix, - "pce_id": self.tag, - } - - if not destroy: - return_dict.update( - { - "vpc_cidr": self.vpc_cidr, - "otherparty_vpc_cidr": self.partner_vpc_cidr, - } - ) - - return return_dict - - def _get_vpc_var(self) -> Dict[str, Any]: - return { - "vpc_logging": { - "enabled": self.vpc_logging_enabled, - "bucket_arn": self.vpc_log_bucket_arn, - } - } diff --git a/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_defaults.py b/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_defaults.py deleted file mode 100644 index 44f912110..000000000 --- a/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_defaults.py +++ /dev/null @@ -1,16 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. - -import os -from enum import Enum - -DIR = os.path.dirname -PCE_TERRAFORM_FILES = "pce/aws_terraform_template/common/pce" - - -class TerraformDefaults(str, Enum): - PCE_TERRAFORM_FILE_LOCATION = os.path.join( - DIR(DIR(DIR(os.path.realpath(__file__)))), PCE_TERRAFORM_FILES - ) diff --git a/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_utils.py b/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_utils.py deleted file mode 100644 index 61b3bd537..000000000 --- a/fbpcs/infra/pce_deployment_library/publisher_pce/publisher_pce_utils.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. - -import logging -import re - -from fbpcs.infra.pce_deployment_library.errors_library.terraform_errors import ( - TerraformCommandExectionError, -) - - -class PublisherPceUtils: - def __init__(self): - self.log: logging.Logger = logging.getLogger(__name__) - # set logger - self.log.setLevel(logging.DEBUG) - # create file handler which logs even debug messages - fh = logging.FileHandler("test.log") - fh.setLevel(logging.DEBUG) - self.log.addHandler(fh) - - def parse_command_output(self, command, command_result) -> str: - ret_code = command_result.return_code - if ret_code != 0: - self.log.error(f"Failed to run terraform {command}") - error = f"Command terraform {command} execution failed with error, {command_result.error}" - self.log.error(f"{error}") - raise TerraformCommandExectionError(f"{error}") - - return self.sanitize_command_output_logs(command_result.output) - - def sanitize_command_output_logs(self, command_output_log: str) -> str: - if not command_output_log: - return "" - - ansi_escape = re.compile( - r""" - \x1B # ESC - (?: # 7-bit C1 Fe (except CSI) - [@-Z\\-_] - | # or [ for CSI, followed by a control sequence - \[ - [0-?]* # Parameter bytes - [ -/]* # Intermediate bytes - [@-~] # Final byte - ) - """, - re.VERBOSE, - ) - return ansi_escape.sub("", command_output_log) diff --git a/fbpcs/infra/pce_deployment_library/test/test_aws.py b/fbpcs/infra/pce_deployment_library/test/test_aws.py deleted file mode 100644 index 3777491f6..000000000 --- a/fbpcs/infra/pce_deployment_library/test/test_aws.py +++ /dev/null @@ -1,279 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import unittest -from unittest.mock import create_autospec - -from botocore.exceptions import ClientError - -from fbpcs.infra.pce_deployment_library.cloud_library.aws.aws import AWS -from fbpcs.infra.pce_deployment_library.errors_library.aws_errors import ( - AccessDeniedError, - S3BucketCreationError, - S3BucketDeleteError, - S3BucketDoesntExist, - S3BucketVersioningFailedError, -) - - -class TestAws(unittest.TestCase): - def setUp(self) -> None: - self.aws = AWS() - self.aws.sts.get_caller_identity = create_autospec( - self.aws.sts.get_caller_identity - ) - - def test_check_s3_buckets_exists(self) -> None: - s3_bucket_name = "fake_bucket" - self.aws.s3_client.head_bucket = create_autospec(self.aws.s3_client.head_bucket) - - with self.subTest("basic"): - with self.assertLogs() as captured: - self.aws.check_s3_buckets_exists( - s3_bucket_name=s3_bucket_name, bucket_version=False - ) - self.assertEqual(len(captured.records), 2) - self.assertEqual( - captured.records[1].getMessage(), - f"S3 bucket {s3_bucket_name} already exists in the AWS account.", - ) - - with self.subTest("BucketNotFound"): - self.aws.s3_client.create_bucket = create_autospec( - self.aws.s3_client.create_bucket - ) - self.aws.s3_client.put_bucket_versioning = create_autospec( - self.aws.s3_client.put_bucket_versioning - ) - self.aws.s3_client.head_bucket.side_effect = ClientError( - error_response={"Error": {"Code": "404"}}, - operation_name="head_bucket", - ) - with self.assertLogs() as captured: - self.aws.check_s3_buckets_exists( - s3_bucket_name=s3_bucket_name, bucket_version=False - ) - self.assertEqual(len(captured.records), 4) - self.assertEqual( - captured.records[2].getMessage(), - f"Creating new S3 bucket {s3_bucket_name}", - ) - self.assertEqual( - captured.records[3].getMessage(), - f"Create S3 bucket {s3_bucket_name} operation was successful.", - ) - - with self.subTest("AccessDenied"): - self.aws.s3_client.head_bucket.side_effect = ClientError( - error_response={"Error": {"Code": "403"}}, - operation_name="head_bucket", - ) - with self.assertRaisesRegex(AccessDeniedError, "Access denied*"): - self.aws.check_s3_buckets_exists( - s3_bucket_name=s3_bucket_name, bucket_version=False - ) - - with self.subTest("CatchAllError"): - self.aws.s3_client.head_bucket.side_effect = ClientError( - error_response={"Error": {"Code": None}}, - operation_name="head_bucket", - ) - with self.assertRaisesRegex( - S3BucketCreationError, "Couldn't create bucket*" - ): - self.aws.check_s3_buckets_exists( - s3_bucket_name=s3_bucket_name, bucket_version=False - ) - - def test_create_s3_bucket(self) -> None: - self.aws.s3_client.create_bucket = create_autospec( - self.aws.s3_client.create_bucket - ) - s3_bucket_name = "fake_bucket" - - with self.subTest("Basic"): - with self.assertLogs() as captured: - self.aws.create_s3_bucket( - s3_bucket_name=s3_bucket_name, bucket_version=False - ) - self.assertEqual(len(captured.records), 2) - self.assertEqual( - captured.records[1].getMessage(), - f"Create S3 bucket {s3_bucket_name} operation was successful.", - ) - - with self.subTest("CreateBucketException"): - self.aws.s3_client.create_bucket.side_effect = ClientError( - error_response={"Error": {"Code": None}}, - operation_name="create_bucket", - ) - - with self.assertRaisesRegex( - S3BucketCreationError, "Failed to create S3 bucket*" - ): - self.aws.create_s3_bucket( - s3_bucket_name=s3_bucket_name, bucket_version=False - ) - - def test_update_bucket_versioning(self) -> None: - s3_bucket_name = "fake_bucket" - self.aws.s3_client.put_bucket_versioning = create_autospec( - self.aws.s3_client.put_bucket_versioning - ) - - with self.subTest("Basic"): - with self.assertLogs() as captured: - self.aws.update_bucket_versioning(s3_bucket_name=s3_bucket_name) - self.assertEqual(len(captured.records), 2) - self.assertEqual( - captured.records[1].getMessage(), - f"Bucket {s3_bucket_name} is enabled with versioning.", - ) - - with self.subTest("S3BucketDoesntExist"): - self.aws.s3_client.put_bucket_versioning.side_effect = ClientError( - error_response={"Error": {"Code": "404"}}, - operation_name="put_bucket_versioning", - ) - self.assertRaises( - S3BucketDoesntExist, - lambda: self.aws.update_bucket_versioning( - s3_bucket_name=s3_bucket_name - ), - ) - - with self.subTest("AccessDeniedException"): - self.aws.s3_client.put_bucket_versioning.side_effect = ClientError( - error_response={"Error": {"Code": "403"}}, - operation_name="put_bucket_versioning", - ) - self.assertRaises( - AccessDeniedError, - lambda: self.aws.update_bucket_versioning( - s3_bucket_name=s3_bucket_name - ), - ) - - with self.subTest("BucketVersioningFailed"): - self.aws.s3_client.put_bucket_versioning.side_effect = ClientError( - error_response={"Error": {"Code": None}}, - operation_name="put_bucket_versioning", - ) - self.assertRaises( - S3BucketVersioningFailedError, - lambda: self.aws.update_bucket_versioning( - s3_bucket_name=s3_bucket_name - ), - ) - with self.assertRaisesRegex( - S3BucketVersioningFailedError, "Error in versioning S3 bucket*" - ): - self.aws.update_bucket_versioning(s3_bucket_name=s3_bucket_name) - - def test_delete_s3_bucket(self) -> None: - s3_bucket_name = "fake_bucket" - self.aws.s3_client.delete_bucket = create_autospec( - self.aws.s3_client.delete_bucket - ) - - with self.subTest("Basic"): - with self.assertLogs() as captured: - self.aws.delete_s3_bucket(s3_bucket_name=s3_bucket_name) - self.assertEqual(len(captured.records), 2) - self.assertEqual( - captured.records[1].getMessage(), - f"Delete S3 bucket {s3_bucket_name} operation was successful.", - ) - - with self.subTest("BucketDeleteFailed"): - self.aws.s3_client.delete_bucket.side_effect = ClientError( - error_response={"Error": {"Code": None}}, - operation_name="delete_bucket", - ) - self.assertRaises( - S3BucketDeleteError, - lambda: self.aws.delete_s3_bucket(s3_bucket_name=s3_bucket_name), - ) - - def test_check_s3_object_exists(self) -> None: - s3_bucket_name = "fake_bucket" - key_name = "fake_file" - account_id = "123456789" - - with self.subTest("basic"): - self.aws.s3_client.head_object = create_autospec( - self.aws.s3_client.head_object - ) - self.aws.s3_client.head_object.return_value = {"ContentLength": 100} - self.assertTrue( - self.aws.check_s3_object_exists( - s3_bucket_name=s3_bucket_name, - key_name=key_name, - account_id=account_id, - ) - ) - - with self.subTest("KeyNotExistError"): - self.aws.s3_client.head_object.side_effect = ClientError( - error_response={"Error": {"Code": "404"}}, - operation_name="head_object", - ) - - with self.assertLogs() as captured: - self.assertFalse( - self.aws.check_s3_object_exists( - s3_bucket_name=s3_bucket_name, - key_name=key_name, - account_id=account_id, - ) - ) - self.assertEqual(len(captured.records), 3) - self.assertEqual( - captured.records[1].getMessage(), - f"Couldn't find file {key_name} in bucket {s3_bucket_name}", - ) - - with self.subTest("AccessDeniedError"): - self.aws.s3_client.head_object.side_effect = ClientError( - error_response={"Error": {"Code": "403"}}, - operation_name="head_object", - ) - - with self.assertLogs() as captured: - self.assertFalse( - self.aws.check_s3_object_exists( - s3_bucket_name=s3_bucket_name, - key_name=key_name, - account_id=account_id, - ) - ) - self.assertEqual(len(captured.records), 3) - self.assertEqual( - captured.records[1].getMessage(), - f"Access denied: failed to access bucket {s3_bucket_name}", - ) - - with self.subTest("CatchAllError"): - self.aws.s3_client.head_object.side_effect = ClientError( - error_response={"Error": {"Code": None}}, - operation_name="head_object", - ) - - with self.assertLogs() as captured: - self.assertFalse( - self.aws.check_s3_object_exists( - s3_bucket_name=s3_bucket_name, - key_name=key_name, - account_id=account_id, - ) - ) - self.assertEqual(len(captured.records), 3) - self.assertEqual( - captured.records[1].getMessage(), - f"Failed to find file {key_name} in bucket {s3_bucket_name}", - ) diff --git a/fbpcs/infra/pce_deployment_library/test/test_cloud_factory.py b/fbpcs/infra/pce_deployment_library/test/test_cloud_factory.py deleted file mode 100644 index 57d312969..000000000 --- a/fbpcs/infra/pce_deployment_library/test/test_cloud_factory.py +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict -import unittest - -from fbpcs.infra.pce_deployment_library.cloud_library.cloud_factory import CloudFactory -from fbpcs.infra.pce_deployment_library.cloud_library.defaults import CloudPlatforms - - -class TestCloudFactory(unittest.TestCase): - def setUp(self) -> None: - self.test_cloud_factory = CloudFactory() - - def test_supported_cloud_platforms(self) -> None: - expected = CloudPlatforms.list() - - self.assertEqual( - expected, self.test_cloud_factory.get_supported_cloud_platforms() - ) - - def test_create_cloud_object_aws(self) -> None: - expected = CloudPlatforms.AWS - - cloud_object = self.test_cloud_factory.create_cloud_object( - cloud_type=CloudPlatforms.AWS - ) - self.assertEqual(expected, cloud_object.cloud_type()) - - def test_create_cloud_object_gcp(self) -> None: - expected = CloudPlatforms.GCP - - cloud_object = self.test_cloud_factory.create_cloud_object( - cloud_type=CloudPlatforms.GCP - ) - - self.assertEqual(expected, cloud_object.cloud_type()) diff --git a/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment.py b/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment.py deleted file mode 100644 index e6b545c50..000000000 --- a/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) Meta Platforms, Inc. and affiliates. -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import unittest -from subprocess import PIPE, Popen -from typing import Any, Dict, Type - -from fbpcs.infra.pce_deployment_library.deploy_library.models import ( - FlaggedOption, - NotFlaggedOption, - RunCommandResult, - TerraformOptionFlag, -) - -from fbpcs.infra.pce_deployment_library.deploy_library.terraform_library.terraform_deployment import ( - TerraformDeployment, -) - - -class TestTerraformDeployment(unittest.TestCase): - def setUp(self) -> None: - self.terraform_deployment = TerraformDeployment() - - def test_run_command(self) -> None: - with self.subTest("basicCaptureTrue"): - command = "echo Hello World!\n" - capture_output = True - test_obj = Popen(["echo", "Hello World!"], stdout=PIPE) - test_stdout, test_error = test_obj.communicate() - test_return_code = test_obj.returncode - test_command_return = RunCommandResult( - return_code=test_return_code, - output=test_stdout.decode("utf-8"), - # pyre-fixme[6]: For 3rd param expected `Optional[str]` but got - # `Union[bytes, str]`. - error=test_error if test_error else "", - ) - - func_ret = self.terraform_deployment.run_command(command=command) - self.assertEqual(test_command_return.return_code, func_ret.return_code) - self.assertEqual(test_command_return.output, func_ret.output) - self.assertEqual(test_command_return.error, func_ret.error) - - with self.subTest("basicCaptureFalse"): - command = "echo Hello World!\n" - capture_output = False - - test_obj = Popen(["echo", "Hello World!"]) - test_stdout, test_error = test_obj.communicate() - test_return_code = test_obj.returncode - test_command_return = RunCommandResult( - return_code=test_return_code, - # pyre-fixme[6]: For 2nd param expected `Optional[str]` but got `bytes`. - output=test_stdout, - # pyre-fixme[6]: For 3rd param expected `Optional[str]` but got `bytes`. - error=test_stdout, - ) - kwargs: Dict[str, Any] = {"capture_output": capture_output} - func_ret = self.terraform_deployment.run_command(command=command, **kwargs) - self.assertEqual(test_command_return.return_code, func_ret.return_code) - self.assertEqual(test_command_return.output, func_ret.output) - self.assertEqual(test_command_return.error, func_ret.error) - - with self.subTest("TestStdErrWithCaptureOutput"): - command = "cp" - capture_output = True - - func_ret = self.terraform_deployment.run_command(command=command) - test_command_return = RunCommandResult( - return_code=1, - output="", - error="cp: missing file operand\nTry 'cp --help' for more information.\n", - ) - self.assertEqual(test_command_return.return_code, func_ret.return_code) - self.assertEqual(test_command_return.output, func_ret.output) - self.assertEqual(test_command_return.error, func_ret.error) - - with self.subTest("TestStdErrWithoutCaptureOutput"): - command = "cp" - capture_output = False - kwargs: Dict[str, Any] = {"capture_output": capture_output} - - func_ret = self.terraform_deployment.run_command(command=command, **kwargs) - test_command_return = RunCommandResult( - return_code=1, - output=None, - error=None, - ) - self.assertEqual(test_command_return.return_code, func_ret.return_code) - self.assertEqual(test_command_return.output, func_ret.output) - self.assertEqual(test_command_return.error, func_ret.error) - - def test_terraform_init(self) -> None: - kwargs: Dict[str, Any] = {"dry_run": True} - with self.subTest("BackendConig"): - backend_config = { - "region": "fake_region", - "access_key": "fake_access_key", - } - expected_command = "terraform init -input=false -dry-run=true -backend-config region=fake_region -backend-config access_key=fake_access_key -reconfigure" - expected_value = RunCommandResult( - return_code=0, output=f"Dry run command: {expected_command}", error="" - ) - return_value = self.terraform_deployment.terraform_init( - backend_config=backend_config, **kwargs - ) - self.assertEqual(expected_value, return_value) - - with self.subTest("BackendConigWhiteSpaces"): - backend_config = { - "region": "fake_region ", - "access_key": "fake_access_key ", - } - expected_command = "terraform init -input=false -dry-run=true -backend-config region=fake_region -backend-config access_key=fake_access_key -reconfigure" - expected_value = RunCommandResult( - return_code=0, output=f"Dry run command: {expected_command}", error="" - ) - return_value = self.terraform_deployment.terraform_init( - backend_config=backend_config, **kwargs - ) - self.assertEqual(expected_value, return_value) - - with self.subTest("UnsetReconfigureNoBackendConfig"): - expected_command = "terraform init -input=false -dry-run=true" - expected_value = RunCommandResult( - return_code=0, output=f"Dry run command: {expected_command}", error="" - ) - reconfigure: Type[TerraformOptionFlag] = NotFlaggedOption - return_value = self.terraform_deployment.terraform_init( - reconfigure=reconfigure, **kwargs - ) - self.assertEqual(expected_value, return_value) - - with self.subTest("SetReconfigure"): - expected_command = "terraform init -input=false -dry-run=true -reconfigure" - expected_value = RunCommandResult( - return_code=0, output=f"Dry run command: {expected_command}", error="" - ) - reconfigure = FlaggedOption - return_value = self.terraform_deployment.terraform_init( - reconfigure=reconfigure, **kwargs - ) - self.assertEqual(expected_value, return_value) - - def test_create(self) -> None: - # T126572515 - pass - - def test_destory(self) -> None: - # T126573127 - pass - - def test_plan(self) -> None: - # T126574725 - pass diff --git a/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment_utils.py b/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment_utils.py deleted file mode 100644 index e097ace62..000000000 --- a/fbpcs/infra/pce_deployment_library/test/test_terraform_deployment_utils.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# -# This source code is licensed under the MIT license found in the -# LICENSE file in the root directory of this source tree. - -# pyre-strict - -import unittest -from typing import Any, Dict - -from fbpcs.infra.pce_deployment_library.deploy_library.models import FlaggedOption - -from fbpcs.infra.pce_deployment_library.deploy_library.terraform_library.terraform_deployment_utils import ( - TerraformDeploymentUtils, -) - - -class TestTerraformDeploymentUtils(unittest.TestCase): - def setUp(self) -> None: - self.terraform_deployment_utils = TerraformDeploymentUtils() - - def test_get_default_options(self) -> None: - # T125643751 - pass - - def test_get_command_list(self) -> None: - command: str = "terraform apply" - with self.subTest("OptionTypeDict"): - kwargs: Dict[str, Any] = { - "backend-config": { - "region": "fake_region", - "access_key": "fake_access_key", - } - } - expected_value = [ - "terraform", - "apply", - "-backend-config", - "region=fake_region", - "-backend-config", - "access_key=fake_access_key", - ] - return_value = self.terraform_deployment_utils.get_command_list( - command, **kwargs - ) - self.assertEqual(expected_value, return_value) - - with self.subTest("OptionTypeList"): - kwargs: Dict[str, Any] = {"target": ["fake_region", "fake_access_key"]} - expected_value = [ - "terraform", - "apply", - '-target="fake_region"', - '-target="fake_access_key"', - ] - return_value = self.terraform_deployment_utils.get_command_list( - command, **kwargs - ) - self.assertEqual(expected_value, return_value) - - with self.subTest("OptionTypeBool"): - kwargs: Dict[str, Any] = {"input": False} - expected_value = ["terraform", "apply", "-input=false"] - return_value = self.terraform_deployment_utils.get_command_list( - command, **kwargs - ) - self.assertEqual(expected_value, return_value) - - with self.subTest("OptionTypeFlaggedOption"): - kwargs: Dict[str, Any] = {"reconfigure": FlaggedOption} - expected_value = ["terraform", "apply", "-reconfigure"] - return_value = self.terraform_deployment_utils.get_command_list( - command, **kwargs - ) - self.assertEqual(expected_value, return_value) - - with self.subTest("OptionTypeDictWithArgs"): - kwargs: Dict[str, Any] = { - "backend-config": { - "region": "fake_region", - "access_key": "fake_access_key", - } - } - args = ("test_test",) - expected_value = [ - "terraform", - "apply", - "-backend-config", - "region=fake_region", - "-backend-config", - "access_key=fake_access_key", - "test_test", - ] - return_value = self.terraform_deployment_utils.get_command_list( - command, *args, **kwargs - ) - self.assertEqual(expected_value, return_value) - - def test_add_dict_options(self) -> None: - pass - - def test_add_list_options(self) -> None: - pass - - def test_add_bool_options(self) -> None: - pass - - def test_add_flagged_option(self) -> None: - pass - - def test_add_other_options(self) -> None: - pass