diff --git a/minio/api.py b/minio/api.py
index ffd687519..b02bddf9c 100644
--- a/minio/api.py
+++ b/minio/api.py
@@ -1528,7 +1528,7 @@ def _stream_put_object(self, bucket_name, object_name,
 
         # Instantiate a thread pool with 3 worker threads
         pool = ThreadPool(_PARALLEL_UPLOADERS)
-        parts_to_upload = []
+        pool.start_parallel()
 
         # Generate new parts and upload <= current_part_size until
         # part_number reaches total_parts_count calculated for the
@@ -1539,13 +1539,11 @@ def _stream_put_object(self, bucket_name, object_name,
                                  else last_part_size)
             part_data = read_full(data, current_part_size)
 
-            # Append current part information
-            parts_to_upload.append((bucket_name, object_name, upload_id,
+            pool.add_task(self._upload_part_routine, (bucket_name, object_name, upload_id,
                                     part_number, part_data, sse))
 
-        # Run parts upload in parallel
         try:
-            pool.parallel_run(self._upload_part_routine, parts_to_upload)
+            upload_result = pool.result()
         except:
             # Any exception that occurs sends an abort on the
             # on-going multipart operation.
@@ -1556,8 +1554,8 @@ def _stream_put_object(self, bucket_name, object_name,
 
         # Update uploaded_parts with the part uploads result
         # and check total uploaded data.
-        while not pool.result().empty():
-            part_number, etag, total_read = pool.result().get()
+        while not upload_result.empty():
+            part_number, etag, total_read = upload_result.get()
             uploaded_parts[part_number] = UploadPart(bucket_name,
                                                      object_name,
                                                      upload_id,
diff --git a/minio/thread_pool.py b/minio/thread_pool.py
index 45e18ff41..369e90598 100644
--- a/minio/thread_pool.py
+++ b/minio/thread_pool.py
@@ -42,20 +42,25 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue):
         self.start()
 
     def run(self):
-        fast_quit = False
-        while not self.tasks_queue.empty():
-            func, args, kargs = self.tasks_queue.get()
-            if not fast_quit:
+        """ Continuously receive tasks and execute them """
+        while True:
+            task = self.tasks_queue.get()
+            if task is None:
+                self.tasks_queue.task_done()
+                break
+            # No exception detected in any thread,
+            # continue the execution.
+            if self.exceptions_queue.empty():
                 try:
+                    # Execute the task
+                    func, args, kargs = task
                     result = func(*args, **kargs)
                     self.results_queue.put(result)
                 except Exception as e:
                     self.exceptions_queue.put(e)
-                    fast_quit = True
             # Mark this task as done, whether an exception happened or not
             self.tasks_queue.task_done()
 
-
 class ThreadPool:
     """ Pool of threads consuming tasks from a queue """
 
@@ -69,22 +74,21 @@ def add_task(self, func, *args, **kargs):
         """ Add a task to the queue """
         self.tasks_queue.put((func, args, kargs))
 
-    def parallel_run(self, func, args_list):
-        """ Add a list of tasks to the queue """
-        for args in args_list:
-            self.add_task(func, args)
-
+    def start_parallel(self):
+        """ Prepare threads to run tasks """
         for _ in range(self.num_threads):
             Worker(self.tasks_queue, self.results_queue,
                    self.exceptions_queue)
 
+    def result(self):
+        """ Stop threads and return the result of all called tasks """
+        # Send None to all threads to cleanly stop them
+        for _ in range(self.num_threads):
+            self.tasks_queue.put(None)
         # Wait for completion of all the tasks in the queue
         self.tasks_queue.join()
-
         # Check if one of the thread raised an exception, if yes
         # raise it here in the function
         if not self.exceptions_queue.empty():
             raise self.exceptions_queue.get()
-
-    def result(self):
-        """ Return the result of all called tasks """
         return self.results_queue