From 268667238a49a570d28e189c6ac6b04c397ae55c Mon Sep 17 00:00:00 2001 From: Hemang Date: Fri, 7 Jun 2019 12:49:16 +0530 Subject: [PATCH 1/3] add parallel blob object upload --- storage/google/cloud/storage/blob.py | 87 ++++++++++++++++++++++++++++ storage/tests/unit/test_blob.py | 45 ++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 9c89c52b9e24..8c6de356dbc3 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -33,6 +33,7 @@ import os import time import warnings +import threading from six.moves.urllib.parse import parse_qsl from six.moves.urllib.parse import quote @@ -100,6 +101,7 @@ _DEFAULT_CHUNKSIZE = 104857600 # 1024 * 1024 B * 100 = 100 MB _MAX_MULTIPART_SIZE = 8388608 # 8 MB +_MAX_THREAD_LIMIT = 10 class Blob(_PropertyMixin): @@ -187,6 +189,9 @@ def __init__( if generation is not None: self._properties["generation"] = generation + self._file_count = 0 + self._lock = threading.Lock() + @property def chunk_size(self): """Get the blob's default chunk size. @@ -1950,6 +1955,88 @@ def updated(self): if value is not None: return _rfc3339_to_datetime(value) + @staticmethod + def _get_files(path): + """Get the list of full path of all files in the directory. + :param path: str + :return: The path to the directory. + + :rtype: list + :returns: The list of all file path in the directory. + """ + filepath_list = [] + for directory, _, filenames in os.walk(path): + for filename in filenames: + filepath_list.append(os.path.join(directory, filename)) + return filepath_list + + def upload_parallel( + self, path, content_type=None, client=None, predefined_acl=None + ): + """Upload this blob's contents parallel from the content of file in directory. + + The content type of the upload will be determined in order + of precedence: + + - The value passed in to this method (if not :data:`None`) + - The value stored on the current blob + - The value given by ``mimetypes.guess_type`` + - The default value ('application/octet-stream') + + .. note:: + The effect of uploading to an existing blob depends on the + "versioning" and "lifecycle" policies defined on the blob's + bucket. In the absence of those policies, upload will + overwrite any existing contents. + + See the `object versioning + `_ and + `lifecycle `_ + API documents for details. + + If :attr:`user_project` is set on the bucket, bills the API request + to that project. + + :type path: str + :param path: The path to the directory. + + :type content_type: str + :param content_type: Optional type of content being uploaded. + + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back + to the ``client`` stored on the blob's bucket. + + :type predefined_acl: str + :param predefined_acl: (Optional) predefined access control list + """ + threads = [] + files_list = self._get_files(path) + total_files = len(files_list) + max_thread_to_start = min(total_files, _MAX_THREAD_LIMIT) + for _ in range(max_thread_to_start): + thread = threading.Thread( + target=self._upload_from_list, + args=(files_list, total_files, content_type, client, predefined_acl), + ) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + def _upload_from_list( + self, files_list, total_files, content_type, client, predefined_acl + ): + while total_files > self._file_count: + self.upload_from_filename( + files_list[self._file_count], content_type, client, predefined_acl + ) + self._lock.acquire() + self._file_count += 1 + self._lock.release() + time.sleep(0.3) + def _get_encryption_headers(key, source=False): """Builds customer encryption key headers diff --git a/storage/tests/unit/test_blob.py b/storage/tests/unit/test_blob.py index b264ddcb8acf..db2a5881c6ba 100644 --- a/storage/tests/unit/test_blob.py +++ b/storage/tests/unit/test_blob.py @@ -1852,6 +1852,51 @@ def test_upload_from_filename(self): self.assertEqual(stream.mode, "rb") self.assertEqual(stream.name, temp.name) + def _do_upload_parallel_mock_call_helper(self, blob, client, content_type, size): + self.assertEqual(blob._do_upload.call_count, 10) + mock_call = blob._do_upload.mock_calls[0] + call_name, pos_args, kwargs = mock_call + self.assertEqual(call_name, "") + self.assertEqual(len(pos_args), 6) + self.assertEqual(pos_args[0], client) + self.assertEqual(pos_args[2], content_type) + self.assertEqual(pos_args[3], size) + self.assertIsNone(pos_args[4]) # num_retries + self.assertIsNone(pos_args[5]) # predefined_acl + self.assertEqual(kwargs, {}) + return pos_args[1] + + def test_upload_parallel(self): + import tempfile + import shutil + + google.cloud.storage.blob._MAX_THREAD_LIMIT = 5 + blob = self._make_one("blob-name", bucket=None) + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {"metadata": {"mint": "ice-cream"}} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + + # Make sure `metadata` is empty before the request. + self.assertIsNone(blob.metadata) + data = b"soooo much data" + content_type = u"image/svg+xml" + client = mock.sentinel.client + temp_dir = tempfile.mkdtemp() + for _ in range(0, 10): + file_handle, name = tempfile.mkstemp(dir=temp_dir) + os.close(file_handle) + with open(name, "wb") as file_obj: + file_obj.write(data) + + blob.upload_parallel(temp_dir, content_type, client) + self.assertEqual(blob.metadata, created_json["metadata"]) + stream = self._do_upload_parallel_mock_call_helper( + blob, client, content_type, len(data) + ) + self.assertTrue(stream.closed) + self.assertEqual(stream.mode, "rb") + shutil.rmtree(temp_dir) + def _upload_from_string_helper(self, data, **kwargs): from google.cloud._helpers import _to_bytes From e300afd1315f260c1758d580ecd5d6666c84044d Mon Sep 17 00:00:00 2001 From: Hemang Date: Wed, 26 Jun 2019 13:19:57 +0530 Subject: [PATCH 2/3] change file variables scope --- storage/google/cloud/storage/blob.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 8c6de356dbc3..35392313bcf5 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -190,6 +190,8 @@ def __init__( self._properties["generation"] = generation self._file_count = 0 + self._total_files = 0 + self._files_list = [] self._lock = threading.Lock() @property @@ -1973,9 +1975,9 @@ def _get_files(path): def upload_parallel( self, path, content_type=None, client=None, predefined_acl=None ): - """Upload this blob's contents parallel from the content of file in directory. + """Upload this blob's contents in parallel, from the contents of a file in the directory. - The content type of the upload will be determined in order + The type of the uploaded content will be determined in the order of precedence: - The value passed in to this method (if not :data:`None`) @@ -2011,13 +2013,13 @@ def upload_parallel( :param predefined_acl: (Optional) predefined access control list """ threads = [] - files_list = self._get_files(path) - total_files = len(files_list) - max_thread_to_start = min(total_files, _MAX_THREAD_LIMIT) + self._files_list = self._get_files(path) + self._total_files = len(self._files_list) + max_thread_to_start = min(self._total_files, _MAX_THREAD_LIMIT) for _ in range(max_thread_to_start): thread = threading.Thread( target=self._upload_from_list, - args=(files_list, total_files, content_type, client, predefined_acl), + args=(content_type, client, predefined_acl), ) thread.start() threads.append(thread) @@ -2025,12 +2027,10 @@ def upload_parallel( for thread in threads: thread.join() - def _upload_from_list( - self, files_list, total_files, content_type, client, predefined_acl - ): - while total_files > self._file_count: + def _upload_from_list(self, content_type, client, predefined_acl): + while self._total_files > self._file_count: self.upload_from_filename( - files_list[self._file_count], content_type, client, predefined_acl + self._files_list[self._file_count], content_type, client, predefined_acl ) self._lock.acquire() self._file_count += 1 From 42f1d9e7fe14b3145d39bf70855b2f1323d0b0de Mon Sep 17 00:00:00 2001 From: Hemang Date: Wed, 26 Jun 2019 15:34:08 +0530 Subject: [PATCH 3/3] changes to avoid race condition --- storage/google/cloud/storage/blob.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 35392313bcf5..0f251b5f2b43 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -2029,12 +2029,16 @@ def upload_parallel( def _upload_from_list(self, content_type, client, predefined_acl): while self._total_files > self._file_count: - self.upload_from_filename( - self._files_list[self._file_count], content_type, client, predefined_acl - ) self._lock.acquire() + current_file_index = self._file_count self._file_count += 1 self._lock.release() + self.upload_from_filename( + self._files_list[current_file_index], + content_type, + client, + predefined_acl, + ) time.sleep(0.3)