From 9bf406cc8445d099be0245adbc76ac0c566e5099 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Wed, 27 Mar 2019 14:30:15 -0700 Subject: [PATCH] Add configurable part_size (#745) --- docs/API.md | 88 +++++++++++----------- docs/zh_CN/API.md | 8 +- examples/remove_all_bucket_notification.py | 2 +- minio/api.py | 42 ++++++++--- minio/helpers.py | 8 +- tests/unit/optimal_part_test.py | 10 +-- 6 files changed, 91 insertions(+), 67 deletions(-) diff --git a/docs/API.md b/docs/API.md index 706840b9f..586cd942b 100644 --- a/docs/API.md +++ b/docs/API.md @@ -28,18 +28,18 @@ s3Client = Minio('s3.amazonaws.com', -|Bucket operations | Object operations| Presigned operations | Bucket policy/notification operations -|:---|:---|:---|:---| -| [`make_bucket`](#make_bucket) | [`get_object`](#get_object) | [`presigned_get_object`](#presigned_get_object) | [`get_bucket_policy`](#get_bucket_policy) | -| [`list_buckets`](#list_buckets) | [`put_object`](#put_object) | [`presigned_put_object`](#presigned_put_object) | [`set_bucket_policy`](#set_bucket_policy) | -| [`bucket_exists`](#bucket_exists) | [`copy_object`](#copy_object) | [`presigned_post_policy`](#presigned_post_policy) | [`get_bucket_notification`](#get_bucket_notification) | -| [`remove_bucket`](#remove_bucket) | [`stat_object`](#stat_object) | | [`set_bucket_notification`](#set_bucket_notification) | -| [`list_objects`](#list_objects) | [`remove_object`](#remove_object) | | [`remove_all_bucket_notification`](#remove_all_bucket_notification) | -| [`list_objects_v2`](#list_objects_v2) | [`remove_objects`](#remove_objects) | | [`listen_bucket_notification`](#listen_bucket_notification) | -| [`list_incomplete_uploads`](#list_incomplete_uploads) | [`remove_incomplete_upload`](#remove_incomplete_upload) | | | -| | [`fput_object`](#fput_object) | | | -| | [`fget_object`](#fget_object) | | | -| | [`get_partial_object`](#get_partial_object) | | | +| Bucket operations | Object operations | Presigned operations | Bucket policy/notification operations | +|:------------------------------------------------------|:--------------------------------------------------------|:--------------------------------------------------|:--------------------------------------------------------------------| +| [`make_bucket`](#make_bucket) | [`get_object`](#get_object) | [`presigned_get_object`](#presigned_get_object) | [`get_bucket_policy`](#get_bucket_policy) | +| [`list_buckets`](#list_buckets) | [`put_object`](#put_object) | [`presigned_put_object`](#presigned_put_object) | [`set_bucket_policy`](#set_bucket_policy) | +| [`bucket_exists`](#bucket_exists) | [`copy_object`](#copy_object) | [`presigned_post_policy`](#presigned_post_policy) | [`get_bucket_notification`](#get_bucket_notification) | +| [`remove_bucket`](#remove_bucket) | [`stat_object`](#stat_object) | | [`set_bucket_notification`](#set_bucket_notification) | +| [`list_objects`](#list_objects) | [`remove_object`](#remove_object) | | [`remove_all_bucket_notification`](#remove_all_bucket_notification) | +| [`list_objects_v2`](#list_objects_v2) | [`remove_objects`](#remove_objects) | | [`listen_bucket_notification`](#listen_bucket_notification) | +| [`list_incomplete_uploads`](#list_incomplete_uploads) | [`remove_incomplete_upload`](#remove_incomplete_upload) | | | +| | [`fput_object`](#fput_object) | | | +| | [`fget_object`](#fget_object) | | | +| | [`get_partial_object`](#get_partial_object) | | | ## 1. Constructor @@ -474,7 +474,7 @@ There is no return value. 
If there are errors from the target server/service, a `ResponseError` is thrown. If there are validation errors, `InvalidArgumentError` or `TypeError` may be thrown. The input configuration cannot be empty - to delete the notification -configuration on a bucket, use the `remove_all_bucket_notifications()` +configuration on a bucket, use the `remove_all_bucket_notification()` API. __Example__ @@ -544,8 +544,8 @@ except (ArgumentError, TypeError) as err: print(err) ``` - -### remove_all_bucket_notifications(bucket_name) + +### remove_all_bucket_notification(bucket_name) Remove all notifications configured on the bucket. @@ -563,7 +563,7 @@ __Example__ ```py # Remove all the notifications config for a bucket. -minioClient.remove_all_bucket_notifications('mybucket') +minioClient.remove_all_bucket_notification('mybucket') ``` @@ -763,29 +763,30 @@ except ResponseError as err: ``` -### put_object(bucket_name, object_name, data, length, content_type='application/octet-stream', metadata=None) +### put_object(bucket_name, object_name, data, length, content_type='application/octet-stream', metadata=None, progress=None, part_size=5*1024*1024) Add a new object to the object storage server. If provided metadata key is not one of the valid/supported metadata names, the metadata information is saved with prefix `X-Amz-Meta-` prepended to the original metadata key name. NOTE: Maximum object size supported by this API is 5TiB. __Parameters__ -|Param |Type |Description | -|:---|:---|:---| -|``bucket_name`` |_string_ |Name of the bucket. | -|``object_name`` |_string_ |Name of the object. | -|``data`` |_io.RawIOBase_ |Any python object implementing io.RawIOBase. | -|``length`` |_int_ |Total length of object. | -|``content_type`` |_string_ | Content type of the object. (optional, defaults to 'application/octet-stream'). | -|``metadata`` |_dict_ | Any additional metadata. (optional, defaults to None). | -|``sse`` |_dict_ |Server-Side Encryption headers (optional, defaults to None). | -|``progress`` |_subclass_of_threading_ |A progress object (optional, defaults to None). | +| Param | Type | Description | +|:-----------------|:------------------------|:--------------------------------------------------------------------------------| +| ``bucket_name`` | _string_ | Name of the bucket. | +| ``object_name`` | _string_ | Name of the object. | +| ``data`` | _io.RawIOBase_ | Any python object implementing io.RawIOBase. | +| ``length`` | _int_ | Total length of object. | +| ``content_type`` | _string_ | Content type of the object. (optional, defaults to 'application/octet-stream'). | +| ``metadata`` | _dict_ | Any additional metadata. (optional, defaults to None). | +| ``sse`` | _dict_ | Server-Side Encryption headers (optional, defaults to None). | +| ``progress`` | _subclass_of_threading_ | A progress object (optional, defaults to None). | +| ``part_size`` | _int_ | Multipart part size. | __Return Value__ -|Param |Type |Description | -|:---|:---|:---| -|``etag``|_string_ |Object etag computed by the server. | +| Param | Type | Description | +|:---------|:---------|:------------------------------------| +| ``etag`` | _string_ | Object etag computed by the server. 
| __Example__ @@ -814,26 +815,27 @@ except ResponseError as err: ``` -### fput_object(bucket_name, object_name, file_path, content_type='application/octet-stream', metadata=None) +### fput_object(bucket_name, object_name, file_path, content_type='application/octet-stream', metadata=None, progress=None, part_size=5*1024*1024) Uploads contents from a file, `file_path`, to `object_name`. If provided metadata key is not one of the valid/supported metadata names, the metadata information is saved with prefix `X-Amz-Meta-` prepended to the original metadata key name. __Parameters__ -|Param |Type |Description | -|:---|:---|:---| -|``bucket_name`` |_string_ |Name of the bucket. | -|``object_name`` |_string_ |Name of the object. | -|``file_path`` |_string_ |Path on the local filesystem from which object data will be read. | -|``content_type`` |_string_ | Content type of the object (optional, defaults to 'application/octet-stream'). | -|``metadata`` |_dict_ | Any additional metadata (optional, defaults to None). | -|``sse`` |_dict_ |Server-Side Encryption headers (optional, defaults to None). | -|``progress`` |_subclass_of_threading_ |A progress object (optional, defaults to None). | +| Param | Type | Description | +|:-----------------|:------------------------|:-------------------------------------------------------------------------------| +| ``bucket_name`` | _string_ | Name of the bucket. | +| ``object_name`` | _string_ | Name of the object. | +| ``file_path`` | _string_ | Path on the local filesystem from which object data will be read. | +| ``content_type`` | _string_ | Content type of the object (optional, defaults to 'application/octet-stream'). | +| ``metadata`` | _dict_ | Any additional metadata (optional, defaults to None). | +| ``sse`` | _dict_ | Server-Side Encryption headers (optional, defaults to None). | +| ``progress`` | _subclass_of_threading_ | A progress object (optional, defaults to None). | +| ``part_size`` | _int_ | Multipart part size. | __Return Value__ -|Param |Type |Description | -|:---|:---|:---| -|``etag``|_string_ |Object etag computed by the server. | +| Param | Type | Description | +|:---------|:---------|:------------------------------------| +| ``etag`` | _string_ | Object etag computed by the server. | __Example__ diff --git a/docs/zh_CN/API.md b/docs/zh_CN/API.md index 201534da5..ac6b258c6 100644 --- a/docs/zh_CN/API.md +++ b/docs/zh_CN/API.md @@ -439,7 +439,7 @@ notification = minioClient.get_bucket_notification('mybucket') * __Value__ (string) -- 指定规则适用的值。 -没有返回值。如果目标服务报错,会抛出`ResponseError`。如果有验证错误,会抛出`InvalidArgumentError`或者`TypeError`。输入参数的configuration不能为空 - 为了删除存储桶上的通知配置,参考`remove_all_bucket_notifications()` API。 +没有返回值。如果目标服务报错,会抛出`ResponseError`。如果有验证错误,会抛出`InvalidArgumentError`或者`TypeError`。输入参数的configuration不能为空 - 为了删除存储桶上的通知配置,参考`remove_all_bucket_notification()` API。 __示例__ @@ -508,8 +508,8 @@ except (ArgumentError, TypeError) as err: print(err) ``` - -### remove_all_bucket_notifications(bucket_name) + +### remove_all_bucket_notification(bucket_name) 删除存储桶上配置的所有通知。 @@ -526,7 +526,7 @@ __示例__ ```py # Remove all the notifications config for a bucket. 
-minioClient.remove_all_bucket_notifications('mybucket') +minioClient.remove_all_bucket_notification('mybucket') ``` diff --git a/examples/remove_all_bucket_notification.py b/examples/remove_all_bucket_notification.py index 3db2e8ccb..fcd31e1e0 100644 --- a/examples/remove_all_bucket_notification.py +++ b/examples/remove_all_bucket_notification.py @@ -26,6 +26,6 @@ try: # Remove all notification config for a bucket. - client.remove_all_bucket_notifications('my-bucketname') + client.remove_all_bucket_notification('my-bucketname') except ResponseError as err: print(err) diff --git a/minio/api.py b/minio/api.py index 6b7b10abd..cdb19d12c 100644 --- a/minio/api.py +++ b/minio/api.py @@ -78,8 +78,11 @@ mkdir_p, dump_http, amzprefix_user_metadata, is_supported_header,is_amz_header) from .helpers import (MAX_MULTIPART_OBJECT_SIZE, + MAX_PART_SIZE, MAX_POOL_SIZE, - MIN_PART_SIZE) + MIN_PART_SIZE, + DEFAULT_PART_SIZE, + MAX_MULTIPART_COUNT) from .signer import (sign_v4, presign_v4, generate_credential_string, post_presign_signature) @@ -541,7 +544,8 @@ def listen_bucket_notification(self, bucket_name, prefix='', suffix='', def fput_object(self, bucket_name, object_name, file_path, content_type='application/octet-stream', - metadata=None, sse=None, progress=None): + metadata=None, sse=None, progress=None, + part_size=DEFAULT_PART_SIZE): """ Add a new object to the cloud storage server. @@ -555,6 +559,7 @@ def fput_object(self, bucket_name, object_name, file_path, :param metadata: Any additional metadata to be uploaded along with your PUT request. :param progress: A progress object + :param part_size: Multipart part size :return: etag """ @@ -562,7 +567,8 @@ def fput_object(self, bucket_name, object_name, file_path, with open(file_path, 'rb') as file_data: file_size = os.stat(file_path).st_size return self.put_object(bucket_name, object_name, file_data, - file_size, content_type, metadata, sse, progress) + file_size, content_type, metadata, sse, + progress, part_size) def fget_object(self, bucket_name, object_name, file_path, request_headers=None, sse=None): """ @@ -751,7 +757,8 @@ def copy_object(self, bucket_name, object_name, object_source, def put_object(self, bucket_name, object_name, data, length, content_type='application/octet-stream', - metadata=None, sse=None, progress=None): + metadata=None, sse=None, progress=None, + part_size=DEFAULT_PART_SIZE): """ Add a new object to the cloud storage server. @@ -775,6 +782,7 @@ def put_object(self, bucket_name, object_name, data, length, :param metadata: Any additional metadata to be uploaded along with your PUT request. 
:param progress: A progress object + :param part_size: Multipart part size :return: etag """ @@ -792,9 +800,18 @@ def put_object(self, bucket_name, object_name, data, length, raise ValueError( 'Invalid input data does not implement a callable read() method') - if length > MAX_MULTIPART_OBJECT_SIZE: - raise InvalidArgumentError('Input content size is bigger ' - ' than allowed maximum of 5TiB.') + if length > (part_size * MAX_MULTIPART_COUNT): + raise InvalidArgumentError('Part size * max_parts(10000) is ' + ' lesser than input length.') + + if part_size < MIN_PART_SIZE: + raise InvalidArgumentError('Input part size is smaller ' + ' than allowed minimum of 5MiB.') + + if part_size > MAX_PART_SIZE: + raise InvalidArgumentError('Input part size is bigger ' + ' than allowed maximum of 5GiB.') + if not metadata: metadata = {} @@ -803,10 +820,11 @@ def put_object(self, bucket_name, object_name, data, length, metadata['Content-Type'] = 'application/octet-stream' if \ not content_type else content_type - if length > MIN_PART_SIZE: + if length > part_size: return self._stream_put_object(bucket_name, object_name, data, length, metadata=metadata, - sse=sse, progress=progress) + sse=sse, progress=progress, + part_size=part_size) current_data = data.read(length) if len(current_data) != length: @@ -1550,7 +1568,8 @@ def _upload_part_routine(self, part_info): def _stream_put_object(self, bucket_name, object_name, data, content_size, - metadata=None, sse=None, progress=None): + metadata=None, sse=None, + progress=None, part_size=MIN_PART_SIZE): """ Streaming multipart upload operation. @@ -1562,6 +1581,7 @@ def _stream_put_object(self, bucket_name, object_name, :param metadata: Any additional metadata to be uploaded along with your object. :param progress: A progress object + :param part_size: Multipart part size """ is_valid_bucket_name(bucket_name) is_non_empty_string(object_name) @@ -1579,7 +1599,7 @@ def _stream_put_object(self, bucket_name, object_name, # Calculate optimal part info. total_parts_count, part_size, last_part_size = optimal_part_info( - content_size) + content_size, part_size) # Instantiate a thread pool with 3 worker threads pool = ThreadPool(_PARALLEL_UPLOADERS) diff --git a/minio/helpers.py b/minio/helpers.py index fe4954373..9c3f45026 100644 --- a/minio/helpers.py +++ b/minio/helpers.py @@ -46,8 +46,10 @@ # Constants MAX_MULTIPART_COUNT = 10000 # 10000 parts MAX_MULTIPART_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 * 1024 # 5TiB +MAX_PART_SIZE = 5 * 1024 * 1024 * 1024 # 5GiB MAX_POOL_SIZE = 10 MIN_PART_SIZE = 5 * 1024 * 1024 # 5MiB +DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB _VALID_BUCKETNAME_REGEX = re.compile('^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$') _ALLOWED_HOSTNAME_REGEX = re.compile( @@ -616,7 +618,7 @@ def get_md5_base64digest(content): return Hasher.md5(content).base64digest() -def optimal_part_info(length): +def optimal_part_info(length, part_size): """ Calculate optimal part size for multipart uploads. @@ -633,8 +635,8 @@ def optimal_part_info(length): # Use floats for part size for all calculations to avoid # overflows during float64 to int64 conversions. part_size_float = math.ceil(length/MAX_MULTIPART_COUNT) - part_size_float = (math.ceil(part_size_float/MIN_PART_SIZE) - * MIN_PART_SIZE) + part_size_float = (math.ceil(part_size_float/part_size) + * part_size) # Total parts count. total_parts_count = int(math.ceil(length/part_size_float)) # Part size. 
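
The arithmetic behind the updated `optimal_part_info(length, part_size)` can be sanity-checked outside the library. The sketch below is a minimal standalone re-implementation, not the library function itself; it assumes the final part simply carries the remainder, and with the 5TiB maximum object size and the 5MiB minimum part size it reproduces the values asserted in the unit tests that follow.

```py
import math

MAX_MULTIPART_COUNT = 10000                # maximum number of parts per upload
MAX_MULTIPART_OBJECT_SIZE = 5 * 1024 ** 4  # 5TiB
MIN_PART_SIZE = 5 * 1024 * 1024            # 5MiB


def sketch_optimal_part_info(length, part_size=MIN_PART_SIZE):
    """Standalone approximation of the part layout chosen by optimal_part_info()."""
    # Spread the object over at most MAX_MULTIPART_COUNT parts...
    candidate = math.ceil(length / MAX_MULTIPART_COUNT)
    # ...then round that candidate up to a multiple of the requested part_size.
    actual_part_size = math.ceil(candidate / part_size) * part_size
    total_parts_count = int(math.ceil(length / actual_part_size))
    # Assumption: the last part carries whatever remains after the full-size parts.
    last_part_size = length - (total_parts_count - 1) * actual_part_size
    return total_parts_count, actual_part_size, last_part_size


# Matches the expectations in tests/unit/optimal_part_test.py:
print(sketch_optimal_part_info(MAX_MULTIPART_OBJECT_SIZE, MIN_PART_SIZE))
# (9987, 550502400, 241172480)
```
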
diff --git a/tests/unit/optimal_part_test.py b/tests/unit/optimal_part_test.py index bf98ebaac..f8ebf6833 100644 --- a/tests/unit/optimal_part_test.py +++ b/tests/unit/optimal_part_test.py @@ -17,28 +17,28 @@ from nose.tools import eq_, raises from unittest import TestCase -from minio.helpers import optimal_part_info, MAX_MULTIPART_OBJECT_SIZE +from minio.helpers import optimal_part_info, MAX_MULTIPART_OBJECT_SIZE, MIN_PART_SIZE from minio.error import InvalidArgumentError class TraceTest(TestCase): @raises(InvalidArgumentError) def test_input_size_wrong(self): - optimal_part_info(MAX_MULTIPART_OBJECT_SIZE + 1) + optimal_part_info(MAX_MULTIPART_OBJECT_SIZE + 1, MIN_PART_SIZE) def test_input_size_valid_maximum(self): - total_parts_count, part_size, last_part_size = optimal_part_info(MAX_MULTIPART_OBJECT_SIZE) + total_parts_count, part_size, last_part_size = optimal_part_info(MAX_MULTIPART_OBJECT_SIZE, MIN_PART_SIZE) eq_(total_parts_count, 9987) eq_(part_size, 550502400) eq_(last_part_size, 241172480) def test_input_size_valid(self): - total_parts_count, part_size, last_part_size = optimal_part_info(MAX_MULTIPART_OBJECT_SIZE/1024) + total_parts_count, part_size, last_part_size = optimal_part_info(MAX_MULTIPART_OBJECT_SIZE/1024, MIN_PART_SIZE) eq_(total_parts_count, 1024) eq_(part_size, 5242880) eq_(last_part_size, 5242880) def test_input_size_is_special_value(self): - total_parts_count, part_size, last_part_size = optimal_part_info(-1) + total_parts_count, part_size, last_part_size = optimal_part_info(-1, MIN_PART_SIZE) eq_(total_parts_count, 9987) eq_(part_size, 550502400) eq_(last_part_size, 241172480)
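
A usage sketch of the new `part_size` argument. The endpoint, credentials, bucket, object and file names below are placeholders, and 16MiB is only an example value; per the validation added in `put_object`, a custom part size must fall between the 5MiB minimum and the 5GiB maximum, and `part_size * 10000` must cover the object length.

```py
import os

from minio import Minio
from minio.error import ResponseError

# Placeholder endpoint and credentials -- substitute your own.
client = Minio('s3.amazonaws.com',
               access_key='YOUR-ACCESSKEYID',
               secret_key='YOUR-SECRETACCESSKEY',
               secure=True)

# Upload with a 16MiB multipart part size instead of the 5MiB default.
part_size = 16 * 1024 * 1024

try:
    # Assumes 'mybucket' already exists and 'my-large-file.bin' is a local file.
    with open('my-large-file.bin', 'rb') as file_data:
        file_size = os.stat('my-large-file.bin').st_size
        etag = client.put_object('mybucket', 'my-object',
                                 file_data, file_size,
                                 part_size=part_size)
    print(etag)

    # fput_object accepts the same keyword argument.
    etag = client.fput_object('mybucket', 'my-other-object',
                              'my-large-file.bin',
                              part_size=part_size)
    print(etag)
except ResponseError as err:
    print(err)
```
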