Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Checkpoint] Fix symlink issue where symlink file uploaded before checkpoint files upload #3376

Merged
merged 66 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
f52c770
a
bigning Jun 4, 2024
8ee8364
a
bigning Jun 5, 2024
4fecdf6
a
bigning Jun 6, 2024
7e53a3b
a
bigning Jun 6, 2024
76a5f2d
Merge https://github.com/mosaicml/composer into checkpoint_saver
bigning Jun 6, 2024
20cac57
Merge remote-tracking branch 'remotes/origin/checkpoint_saver' into c…
bigning Jun 6, 2024
f772e33
a
bigning Jun 6, 2024
4e391a6
a
bigning Jun 8, 2024
55ac530
a
bigning Jun 8, 2024
e2d267b
a
bigning Jun 10, 2024
8035f50
fix test
bigning Jun 11, 2024
e65110d
a
bigning Jun 11, 2024
40cddfb
a
bigning Jun 11, 2024
91d838c
a
bigning Jun 11, 2024
a23552b
a
bigning Jun 11, 2024
cf4e0f1
fix unit test
bigning Jun 11, 2024
229b57d
Merge https://github.com/mosaicml/composer into checkpoint_saver
bigning Jun 12, 2024
dde135f
Merge branch 'checkpoint_saver' of https://github.com/mosaicml/compos…
bigning Jun 12, 2024
e6884fc
a
bigning Jun 13, 2024
9911766
a
bigning Jun 13, 2024
36a1dc5
a
bigning Jun 13, 2024
e4db035
a
bigning Jun 13, 2024
081033c
a
bigning Jun 13, 2024
ae5ece3
fix 2gpu unit test
bigning Jun 13, 2024
2f5d6b0
a
bigning Jun 13, 2024
28a36e0
a
bigning Jun 13, 2024
703ef5f
Merge https://github.com/mosaicml/composer into checkpoint_saver
bigning Jun 13, 2024
c78f475
a
bigning Jun 13, 2024
7ecfcf3
a
bigning Jun 13, 2024
1280266
fix doctest
bigning Jun 14, 2024
c0cb94d
a
bigning Jun 14, 2024
95fca9f
fix test and lint
bigning Jun 14, 2024
2c77da9
up
bigning Jun 14, 2024
ca46b4f
a
bigning Jun 14, 2024
4f3108c
a
bigning Jun 14, 2024
11307f0
Merge branch 'dev' into checkpoint_saver
bigning Jun 17, 2024
f415d60
a
bigning Jun 18, 2024
a0a3e92
a
bigning Jun 18, 2024
301dd67
a
bigning Jun 18, 2024
c4c094b
a
bigning Jun 18, 2024
5ec3e28
a
bigning Jun 20, 2024
9813816
a
bigning Jun 20, 2024
8c3c5cc
address comments
bigning Jun 20, 2024
c81cc2f
a
bigning Jun 20, 2024
c1174d4
a
bigning Jun 20, 2024
df601d2
a
bigning Jun 20, 2024
a41f427
a
bigning Jun 20, 2024
bc06a7b
rerun test
bigning Jun 20, 2024
c87f36c
add logging
bigning Jun 21, 2024
1ebf5a7
Merge branch 'dev' into checkpoint_saver
bigning Jun 21, 2024
0e8ae23
remove debug comments
bigning Jun 21, 2024
c7541c4
comments
bigning Jun 21, 2024
a9081c2
a
bigning Jun 25, 2024
b98ad33
cleanup
bigning Jun 26, 2024
8a6f5d1
a
bigning Jun 26, 2024
ebbcc46
linter
bigning Jun 26, 2024
3575d1e
lint
bigning Jun 26, 2024
fb8dbba
Update composer/callbacks/checkpoint_saver.py
bigning Jun 28, 2024
df4f59a
comments
bigning Jun 28, 2024
4971526
a
bigning Jun 28, 2024
ebbbf56
fix test
bigning Jun 28, 2024
3bb10c9
fix test
bigning Jun 28, 2024
0d4c7af
comments
bigning Jul 2, 2024
9d4e112
Merge branch 'dev' into checkpoint_saver
bigning Jul 3, 2024
6ed9aa7
a
bigning Jul 3, 2024
b781375
Merge branch 'dev' into checkpoint_saver
bigning Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 224 additions & 20 deletions composer/callbacks/checkpoint_saver.py

Large diffs are not rendered by default.

59 changes: 5 additions & 54 deletions composer/loggers/remote_uploader_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,15 @@
from composer.loggers import Logger, MosaicMLLogger
from composer.loggers.logger_destination import LoggerDestination
from composer.utils import (
GCSObjectStore,
LibcloudObjectStore,
MLFlowObjectStore,
ObjectStore,
ObjectStoreTransientError,
OCIObjectStore,
S3ObjectStore,
SFTPObjectStore,
UCObjectStore,
build_remote_backend,
dist,
format_name_with_dist,
get_file,
retry,
validate_credentials,
)
from composer.utils.object_store.mlflow_object_store import MLFLOW_DBFS_PATH_PREFIX

Expand All @@ -50,37 +46,6 @@
__all__ = ['RemoteUploaderDownloader']


def _build_remote_backend(remote_backend_name: str, backend_kwargs: dict[str, Any]):
    """Instantiate the object store identified by ``remote_backend_name`` using ``backend_kwargs``."""
    # `dbfs` is special-cased: the concrete store depends on the path prefix
    # (`MLFlowObjectStore` for MLflow DBFS paths, otherwise `UCObjectStore`),
    # so it cannot live in the simple name -> class table below.
    if remote_backend_name == 'dbfs':
        path = backend_kwargs['path']
        if path.startswith(MLFLOW_DBFS_PATH_PREFIX):
            return MLFlowObjectStore(**backend_kwargs)
        # Validate if the path conforms to the requirements for UC volume paths
        UCObjectStore.validate_path(path)
        return UCObjectStore(**backend_kwargs)

    remote_backend_name_to_cls = {
        's3': S3ObjectStore,
        'oci': OCIObjectStore,
        'sftp': SFTPObjectStore,
        'libcloud': LibcloudObjectStore,
        'gs': GCSObjectStore,
    }
    remote_backend_cls = remote_backend_name_to_cls.get(remote_backend_name)
    if remote_backend_cls is None:
        supported_remote_backends = list(remote_backend_name_to_cls.keys()) + ['dbfs']
        raise ValueError(
            f'The remote backend {remote_backend_name} is not supported. Please use one of ({supported_remote_backends})',
        )

    return remote_backend_cls(**backend_kwargs)


class RemoteUploaderDownloader(LoggerDestination):
r"""Logger destination that uploads (downloads) files to (from) a remote backend.

Expand Down Expand Up @@ -339,7 +304,7 @@ def __init__(
def remote_backend(self) -> ObjectStore:
"""The :class:`.ObjectStore` instance for the main thread."""
if self._remote_backend is None:
self._remote_backend = _build_remote_backend(self.remote_backend_name, self.backend_kwargs)
self._remote_backend = build_remote_backend(self.remote_backend_name, self.backend_kwargs)
return self._remote_backend

def init(self, state: State, logger: Logger) -> None:
Expand All @@ -359,7 +324,7 @@ def init(self, state: State, logger: Logger) -> None:
retry(
ObjectStoreTransientError,
self.num_attempts,
)(lambda: _validate_credentials(self.remote_backend, file_name_to_test))()
)(lambda: validate_credentials(self.remote_backend, file_name_to_test))()

# If the remote backend is an `MLFlowObjectStore`, the original path kwarg may have placeholders that can be
# updated with information generated at runtime, i.e., the MLFlow experiment and run IDs. This information
Expand Down Expand Up @@ -635,20 +600,6 @@ def _remote_file_name(self, remote_file_name: str):
return key_name


def _validate_credentials(
remote_backend: ObjectStore,
remote_file_name_to_test: str,
) -> None:
# Validates the credentials by attempting to touch a file in the bucket
# raises an error if there was a credentials failure.
with tempfile.NamedTemporaryFile('wb') as f:
f.write(b'credentials_validated_successfully')
remote_backend.upload_object(
object_name=remote_file_name_to_test,
filename=f.name,
)


def _upload_worker(
file_queue: Union[queue.Queue[tuple[str, str, bool]], multiprocessing.JoinableQueue[tuple[str, str, bool]]],
completed_queue: Union[queue.Queue[str], multiprocessing.JoinableQueue[str]],
Expand All @@ -663,7 +614,7 @@ def _upload_worker(
The worker will continuously poll ``file_queue`` for files to upload. Once ``is_finished`` is set, the worker will
exit once ``file_queue`` is empty.
"""
remote_backend = _build_remote_backend(remote_backend_name, backend_kwargs)
remote_backend = build_remote_backend(remote_backend_name, backend_kwargs)
while True:
try:
file_path_to_upload, remote_file_name, overwrite = file_queue.get(block=True, timeout=0.5)
Expand Down
20 changes: 7 additions & 13 deletions composer/trainer/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1387,16 +1387,6 @@ def __init__(
mosaicml_logger = MosaicMLLogger()
loggers.append(mosaicml_logger)

# Remote Uploader Downloader
# Keep the ``RemoteUploaderDownloader`` below client-provided loggers so the loggers init callbacks run before
# the ``RemoteUploaderDownloader`` init. This is necessary to use an ``MLFlowObjectStore`` to log objects to a
# run managed by an ``MLFlowLogger``, as the ``MLFlowObjectStore`` relies on the ``MLFlowLogger`` to initialize
# the active MLFlow run.
if save_folder is not None:
remote_ud = maybe_create_remote_uploader_downloader_from_uri(save_folder, loggers)
if remote_ud is not None:
loggers.append(remote_ud)

# Logger
self.logger = Logger(state=self.state, destinations=loggers)

Expand Down Expand Up @@ -1476,6 +1466,7 @@ def __init__(
ignore_keys=save_ignore_keys,
save_interval=save_interval,
num_checkpoints_to_keep=save_num_checkpoints_to_keep,
save_folder=save_folder,
)
self.state.callbacks.append(self._checkpoint_saver)

Expand Down Expand Up @@ -1894,14 +1885,17 @@ def _try_checkpoint_download(
self,
latest_checkpoint_path: str,
save_latest_remote_file_name: str,
loggers: Sequence[LoggerDestination],
loggers: Sequence[Union[LoggerDestination, ObjectStore]],
load_progress_bar: bool,
) -> None:
"""Attempts to download the checkpoint from the logger destinations."""
log.debug(
f'Trying to download {save_latest_remote_file_name} to {latest_checkpoint_path} on rank {dist.get_global_rank()}',
)
for logger in loggers:
remote_destination = list(loggers)
if self._checkpoint_saver is not None and self._checkpoint_saver.remote_uploader is not None:
remote_destination.append(self._checkpoint_saver.remote_uploader.remote_backend)
for logger in remote_destination:
try:
# Fetch from logger. If it succeeds, stop trying the rest of the loggers
get_file(
Expand Down Expand Up @@ -1943,7 +1937,7 @@ def _get_autoresume_checkpoint(
f'Looking for autoresume checkpoint: {save_latest_remote_file_name} (remote), {latest_checkpoint_path} (local)',
)

if self.state.deepspeed_enabled or self.state.fsdp_sharded_state_dict_enabled:
if self.state.deepspeed_enabled:
# If latest checkpoint is not saved locally, try to fetch from loggers
if not os.path.exists(latest_checkpoint_path):
log.debug(f'Attempting to download the checkpoint on to rank {dist.get_global_rank()}')
Expand Down
6 changes: 6 additions & 0 deletions composer/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
maybe_create_object_store_from_uri,
maybe_create_remote_uploader_downloader_from_uri,
parse_uri,
validate_credentials,
)
from composer.utils.import_helpers import MissingConditionalImportError, import_object
from composer.utils.inference import ExportFormat, Transform, export_for_inference, export_with_logger, quantize_dynamic
Expand Down Expand Up @@ -72,8 +73,10 @@
S3ObjectStore,
SFTPObjectStore,
UCObjectStore,
build_remote_backend,
)
from composer.utils.parallelism import FSDPConfig, ParallelismConfig, TPConfig, create_fsdp_config
from composer.utils.remote_uploader import RemoteUploader
from composer.utils.retrying import retry
from composer.utils.string_enum import StringEnum
from composer.utils.warnings import VersionedDeprecationWarning
Expand Down Expand Up @@ -156,4 +159,7 @@
'ParallelismConfig',
'MLFLOW_EXPERIMENT_ID_FORMAT_KEY',
'MLFLOW_RUN_ID_FORMAT_KEY',
'RemoteUploader',
'validate_credentials',
'build_remote_backend',
]
16 changes: 16 additions & 0 deletions composer/utils/file_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
'maybe_create_object_store_from_uri',
'maybe_create_remote_uploader_downloader_from_uri',
'parse_uri',
'validate_credentials',
]


Expand Down Expand Up @@ -737,3 +738,18 @@ def create_symlink_file(
raise ValueError('The symlink filename must end with .symlink.')
with open(destination_filename, 'x') as f:
f.write(existing_path)


def validate_credentials(
    remote_backend: ObjectStore,
    remote_file_name_to_test: str,
) -> None:
    """Upload a tiny text file to test if the credentials are setup correctly.

    Args:
        remote_backend (ObjectStore): The object store whose credentials are validated.
        remote_file_name_to_test (str): Remote object name the probe file is written to.

    Raises:
        Exception: Whatever ``remote_backend.upload_object`` raises on a credentials failure.
    """
    # Validates the credentials by attempting to touch a file in the bucket
    # raises an error if there was a credentials failure.
    with tempfile.NamedTemporaryFile('wb') as f:
        f.write(b'credentials_validated_successfully')
        # Flush buffered bytes to disk before the backend re-opens the file by
        # name; without this the uploaded probe object may be empty.
        f.flush()
        remote_backend.upload_object(
            object_name=remote_file_name_to_test,
            filename=f.name,
        )
2 changes: 2 additions & 0 deletions composer/utils/object_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from composer.utils.object_store.s3_object_store import S3ObjectStore
from composer.utils.object_store.sftp_object_store import SFTPObjectStore
from composer.utils.object_store.uc_object_store import UCObjectStore
from composer.utils.object_store.utils import build_remote_backend

__all__ = [
'ObjectStore',
Expand All @@ -28,4 +29,5 @@
'UCObjectStore',
'MLFLOW_EXPERIMENT_ID_FORMAT_KEY',
'MLFLOW_RUN_ID_FORMAT_KEY',
'build_remote_backend',
]
48 changes: 48 additions & 0 deletions composer/utils/object_store/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2022 MosaicML Composer authors
bigning marked this conversation as resolved.
Show resolved Hide resolved
# SPDX-License-Identifier: Apache-2.0

"""Helpers for working with object stores."""

from typing import Any

from composer.utils.object_store.gcs_object_store import GCSObjectStore
from composer.utils.object_store.libcloud_object_store import LibcloudObjectStore
from composer.utils.object_store.mlflow_object_store import MLFLOW_DBFS_PATH_PREFIX, MLFlowObjectStore
from composer.utils.object_store.oci_object_store import OCIObjectStore
from composer.utils.object_store.s3_object_store import S3ObjectStore
from composer.utils.object_store.sftp_object_store import SFTPObjectStore
from composer.utils.object_store.uc_object_store import UCObjectStore

__all__ = ['build_remote_backend']


def build_remote_backend(remote_backend_name: str, backend_kwargs: dict[str, Any]):
    """Build object store given the backend name and kwargs.

    Args:
        remote_backend_name (str): Name of the backend; one of ``s3``, ``oci``,
            ``sftp``, ``libcloud``, ``gs``, or ``dbfs``.
        backend_kwargs (dict[str, Any]): Keyword arguments forwarded to the chosen
            object store's constructor. For ``dbfs``, must contain a ``path`` key.

    Returns:
        The constructed object store instance.

    Raises:
        ValueError: If ``remote_backend_name`` is not one of the supported backends.
    """
    remote_backend_cls = None
    # Straightforward one-to-one mapping from backend name to object store class.
    remote_backend_name_to_cls = {
        's3': S3ObjectStore,
        'oci': OCIObjectStore,
        'sftp': SFTPObjectStore,
        'libcloud': LibcloudObjectStore,
        'gs': GCSObjectStore,
    }

    # Handle `dbfs` backend as a special case, since it can map to either :class:`.UCObjectStore`
    # or :class:`.MLFlowObjectStore`.
    if remote_backend_name == 'dbfs':
        path = backend_kwargs['path']
        if path.startswith(MLFLOW_DBFS_PATH_PREFIX):
            remote_backend_cls = MLFlowObjectStore
        else:
            # Validate if the path conforms to the requirements for UC volume paths
            UCObjectStore.validate_path(path)
            remote_backend_cls = UCObjectStore
    else:
        remote_backend_cls = remote_backend_name_to_cls.get(remote_backend_name, None)
        if remote_backend_cls is None:
            supported_remote_backends = list(remote_backend_name_to_cls.keys()) + ['dbfs']
            raise ValueError(
                f'The remote backend {remote_backend_name} is not supported. Please use one of ({supported_remote_backends})',
            )

    return remote_backend_cls(**backend_kwargs)
Loading
Loading