Skip to content

Commit

Permalink
Start adding S3 support for VRTStack (#245)
Browse files Browse the repository at this point in the history
* Start adding S3 support for `VRTStack`

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add tests, gdal and rasterio open

* add boto3 to test reqs

* move `GeneralPath` to `_types` to modify `Filename/PathOrStr`

* remove glob until it is necessary

* add pydantic fixes to allow `GeneralPath`

* fix boto mocking

* docstring typo

* typo, add resolve

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
scottstanie and pre-commit-ci[bot] authored May 29, 2024
1 parent eea1de4 commit e4e9369
Show file tree
Hide file tree
Showing 8 changed files with 468 additions and 10 deletions.
43 changes: 36 additions & 7 deletions src/dolphin/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
import sys
from enum import Enum
from os import PathLike
from typing import TYPE_CHECKING, NamedTuple, TypeVar, Union
from typing import (
TYPE_CHECKING,
NamedTuple,
Protocol,
TypeVar,
Union,
runtime_checkable,
)

if sys.version_info >= (3, 10):
from typing import ParamSpec
Expand All @@ -22,12 +29,6 @@
PathLikeStr = PathLike


PathOrStr = Union[str, PathLikeStr]
Filename = PathOrStr # May add a deprecation notice for `Filename`
# TypeVar added for generic functions which should return the same type as the input
PathLikeT = TypeVar("PathLikeT", str, PathLikeStr)


class Bbox(NamedTuple):
"""Bounding box named tuple, defining extent in cartesian coordinates.
Expand Down Expand Up @@ -104,3 +105,31 @@ class TropoType(str, Enum):
"""Hydrostatic (same as dry, named differently in raider)"""
COMB = "comb"
"""Combined wet + dry delay."""


@runtime_checkable
class GeneralPath(Protocol):
"""A protocol to handle paths that can be either local or S3 paths."""

def parent(self): ...

def suffix(self): ...

def resolve(self): ...

def exists(self): ...

def read_text(self): ...

def __truediv__(self, other): ...

def __str__(self) -> str: ...

def __fspath__(self) -> str:
return str(self)


PathOrStr = Union[str, PathLikeStr, GeneralPath]
Filename = PathOrStr # May add a deprecation notice for `Filename`
# TypeVar added for generic functions which should return the same type as the input
PathLikeT = TypeVar("PathLikeT", str, PathLikeStr, GeneralPath)
1 change: 1 addition & 0 deletions src/dolphin/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from ._background import *
from ._blocks import *
from ._core import *
from ._paths import *
from ._process import *
from ._readers import *
from ._writers import *
197 changes: 197 additions & 0 deletions src/dolphin/io/_paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
from __future__ import annotations

import copy
import logging
import re
from pathlib import Path
from typing import Union
from urllib.parse import ParseResult, urlparse

from dolphin._types import GeneralPath

__all__ = ["S3Path"]


logger = logging.getLogger(__name__)


class S3Path(GeneralPath):
"""A convenience class to handle paths on S3.
This class relies on `pathlib.Path` for operations using `urllib` to parse the url.
If passing a url with a trailing slash, the slash will be preserved
when converting back to string.
Note that pure path manipulation functions do *not* require `boto3`,
but functions which interact with S3 (e.g. `exists()`, `.read_text()`) do.
Attributes
----------
bucket : str
Name of bucket in the url
path : pathlib.Path
The URL path after s3://<bucket>/
key : str
Alias of `path` converted to a string
Examples
--------
>>> from dolphin.io import S3Path
>>> s3_path = S3Path("s3://bucket/path/to/file.txt")
>>> str(s3_path)
's3://bucket/path/to/file.txt'
>>> s3_path.parent
S3Path("s3://bucket/path/to/")
>>> str(s3_path.parent)
's3://bucket/path/to/'
"""

def __init__(self, s3_url: Union[str, "S3Path"], unsigned: bool = False):
"""Create an S3Path.
Parameters
----------
s3_url : str or S3Path
The S3 url to parse.
unsigned : bool, optional
If True, disable signing requests to S3.
"""
# Names come from the urllib.parse.ParseResult
if isinstance(s3_url, S3Path):
self._scheme: str = s3_url._scheme
self._netloc: str = s3_url._netloc
self.bucket: str = s3_url.bucket
self.path: Path = s3_url.path
self._trailing_slash: str = s3_url._trailing_slash
else:
parsed: ParseResult = urlparse(s3_url)
self._scheme = parsed.scheme
self._netloc = self.bucket = parsed.netloc
self._parsed = parsed
self.path = Path(parsed.path)
self._trailing_slash = "/" if s3_url.endswith("/") else ""

if self._scheme != "s3":
raise ValueError(f"{s3_url} is not an S3 url")

self._unsigned = unsigned

@classmethod
def from_bucket_key(cls, bucket: str, key: str):
"""Create a `S3Path` from the bucket name and key/prefix.
Matches API of some Boto3 functions which use this format.
Parameters
----------
bucket : str
Name of S3 bucket.
key : str
S3 url of path after the bucket.
"""
return cls(f"s3://{bucket}/{key}")

def get_path(self):
# For S3 paths, we need to add the double slash and netloc back to the front
return f"{self._scheme}://{self._netloc}{self.path.as_posix()}{self._trailing_slash}"

@property
def key(self) -> str:
"""Name of key/prefix within the bucket with leading slash removed."""
return f"{str(self.path.as_posix()).lstrip('/')}{self._trailing_slash}"

@property
def parent(self):
parent_path = self.path.parent
# Since this is a parent, it will will always end in a slash
if self._scheme == "s3":
# For S3 paths, we need to add the scheme and netloc back to the front
return S3Path(f"{self._scheme}://{self._netloc}{parent_path.as_posix()}/")
else:
# For local paths, we can just convert the path to a string
return S3Path(str(parent_path) + "/")

@property
def suffix(self):
return self.path.suffix

def resolve(self) -> S3Path:
"""Resolve the path to an absolute path- S3 paths are always absolute."""
return self

def _get_client(self):
import boto3
from botocore import UNSIGNED
from botocore.config import Config

if self._unsigned:
return boto3.client("s3", config=Config(signature_version=UNSIGNED))
else:
return boto3.client("s3")

def exists(self) -> bool:
"""Whether this path exists on S3."""
client = self._get_client()
resp = client.list_objects_v2(
Bucket=self.bucket,
Prefix=self.key,
MaxKeys=1,
)
return resp.get("KeyCount") == 1

def read_text(self) -> str:
"""Download/read the S3 file as text."""
return self._download_as_bytes().decode()

def read_bytes(self) -> bytes:
"""Download/read the S3 file as bytes."""
return self._download_as_bytes()

def _download_as_bytes(self) -> bytes:
"""Download file to a `BytesIO` buffer to read as bytes."""
from io import BytesIO

client = self._get_client()

bio = BytesIO()
client.download_fileobj(self.bucket, self.key, bio)
bio.seek(0)
out = bio.read()
bio.close()
return out

def __truediv__(self, other):
new = copy.deepcopy(self)
new.path = self.path / other
new._trailing_slash = "/" if str(other).endswith("/") else ""
return new

def __eq__(self, other):
if isinstance(other, S3Path):
return self.get_path() == other.get_path()
elif isinstance(other, str):
return self.get_path() == other
else:
return False

def __repr__(self):
return f'S3Path("{self.get_path()}")'

def __str__(self):
return self.get_path()

def to_gdal(self):
"""Convert this S3Path to a GDAL URL."""
return f"/vsis3/{self.bucket}/{self.key}"


def fix_s3_url(url):
"""Fix an S3 URL that has been altered by pathlib.
Will replace s3:/my-bucket/... with s3://my-bucket/...
"""
return re.sub(r"s3:/((?!/).*)", r"s3://\1", str(url))
16 changes: 13 additions & 3 deletions src/dolphin/io/_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from dolphin.io._blocks import iter_blocks

from ._background import _DEFAULT_TIMEOUT, BackgroundReader
from ._paths import S3Path
from ._utils import _ensure_slices, _unpack_3d_slices

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -715,7 +716,10 @@ def __init__(

# files: list[Filename] = [Path(f) for f in file_list]
self._use_abs_path = use_abs_path
if use_abs_path:
files: list[Filename | S3Path]
if any(str(f).startswith("s3://") for f in file_list):
files = [S3Path(str(f)) for f in file_list]
elif use_abs_path:
files = [utils._resolve_gdal_path(p) for p in file_list]
else:
files = list(file_list)
Expand Down Expand Up @@ -808,12 +812,18 @@ def _write(self):
ds = None

@property
def _gdal_file_strings(self):
def _gdal_file_strings(self) -> list[str]:
"""Get the GDAL-compatible paths to write to the VRT.
If we're not using .h5 or .nc, this will just be the file_list as is.
"""
return [io.format_nc_filename(f, self.subdataset) for f in self.file_list]
out = []
for f in self.file_list:
if isinstance(f, S3Path):
out.append(f.to_gdal())
else:
out.append(io.format_nc_filename(f, self.subdataset))
return out

def __fspath__(self):
# Allows os.fspath() to work on the object, enabling rasterio.open()
Expand Down
12 changes: 12 additions & 0 deletions src/dolphin/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class BaseStack(BaseModel):
description="Index of the SLC to use as reference during phase linking",
)

model_config = {
# For the `Filename, so it can handle the `GeneralPath` protocol`
# https://github.com/pydantic/pydantic/discussions/5767
"arbitrary_types_allowed": True
}

@field_validator("dates", mode="before")
@classmethod
def _check_if_not_tuples(cls, v):
Expand Down Expand Up @@ -190,6 +196,12 @@ class CompressedSlcInfo(BaseModel):
description="Folder/location where ministack will write outputs to.",
)

model_config = {
# For the `Filename, so it can handle the `GeneralPath` protocol`
# https://github.com/pydantic/pydantic/discussions/5767
"arbitrary_types_allowed": True
}

@field_validator("real_slc_dates", mode="before")
@classmethod
def _untuple_dates(cls, v):
Expand Down
Loading

0 comments on commit e4e9369

Please sign in to comment.