Skip to content
This repository has been archived by the owner on Sep 20, 2023. It is now read-only.

Get kms key from env #4

Merged
merged 6 commits into from
Dec 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions kms_ext/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import boto3
import dotenv
import os
import structlog
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
Expand All @@ -18,7 +19,6 @@

log = structlog.get_logger()


class KMSCrypto:
"""Encrypt plaintext using KMS-compliant RSA algorithm."""

Expand Down Expand Up @@ -67,15 +67,13 @@ def invoke(self, *args: Any | None, **kwargs: Any) -> None:

def encrypt(
self,
kms_key_id: str,
public_key_path: Path,
dotenv_path: Path = Path(".env"),
output_path: Path = Path("secrets.yml"),
) -> Path:
"""Encrypt a given dotenv file with a given RSA public key (PEM file).

Args:
kms_key_id: The KMS key ID for the keypair being used.
public_key_path: Path to RSA public key (PEM file).
dotenv_path: Path to dotenv file (defaults to '.env')
output_path: Path to output file (defaults to 'secrets.yml')
Expand All @@ -94,7 +92,7 @@ def encrypt(
)
env_vars.append(EnvVar(name=key, value=secret))

secrets = SecretsFile(kms_key_id=kms_key_id, env=env_vars)
secrets = SecretsFile(env=env_vars)

with open(output_path, "w") as secrets_file:
secrets_file.write(secrets.yaml())
Expand All @@ -108,14 +106,19 @@ def decrypt(
) -> Path:
client = boto3.client("kms")

try:
kms_key_id = os.environ["KMS_KEY_ID"]
except KeyError as ex:
raise Exception("The environment variable $KMS_KEY_ID must be set to decrypt") from ex

with open(input_path) as ciphertext_file:
secrets = SecretsFile.parse_raw(ciphertext_file.read())

for env_var in secrets.env:
ciphertext = base64.b64decode(env_var.value.ciphertext)
response = client.decrypt(
CiphertextBlob=ciphertext,
KeyId=secrets.kms_key_id,
KeyId=kms_key_id,
EncryptionAlgorithm=env_var.value.scheme, # "RSAES_OAEP_SHA_256" | "SYMMETRIC_DEFAULT" | "RSAES_OAEP_SHA_1" | "SM2PKE"
)
plaintext = response["Plaintext"].decode("utf-8")
Expand Down
2 changes: 0 additions & 2 deletions kms_ext/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ def describe(

@app.command()
def encrypt(
kms_key_id: str,
public_key_path: Path,
dotenv_path: Optional[Path] = typer.Option(Path(".env")),
output_path: Optional[Path] = typer.Option(Path("secrets.yml")),
) -> None:
"""Encrypt a given dotenv file with a given RSA Public Key (PEM file)."""
ext.encrypt(
kms_key_id=kms_key_id,
public_key_path=public_key_path,
dotenv_path=dotenv_path,
output_path=output_path,
Expand Down
1 change: 0 additions & 1 deletion kms_ext/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@ class EnvVar(BaseModel):


class SecretsFile(YamlModel):
kms_key_id: str
env: List[EnvVar]