From ef3887a0c9322811643b31a3006e8e494f11bce9 Mon Sep 17 00:00:00 2001 From: Aki Ariga Date: Sun, 17 Sep 2023 15:10:33 -0700 Subject: [PATCH 1/6] Add parallel upload for msgpack option --- pytd/tests/test_writer.py | 28 ++++++++++++---------- pytd/writer.py | 50 ++++++++++++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/pytd/tests/test_writer.py b/pytd/tests/test_writer.py index 510d90d..0202d7a 100644 --- a/pytd/tests/test_writer.py +++ b/pytd/tests/test_writer.py @@ -264,7 +264,7 @@ def test_write_dataframe_tempfile_deletion(self): # file pointer to a temp CSV file fp = self.writer._bulk_import.call_args[0][1] # temp file should not exist - self.assertFalse(os.path.isfile(fp.name)) + self.assertFalse(os.path.isfile(fp[0].name)) # Case #2: bulk import failed self.writer._bulk_import = MagicMock(side_effect=Exception()) @@ -273,7 +273,7 @@ def test_write_dataframe_tempfile_deletion(self): pd.DataFrame([[1, 2], [3, 4]]), self.table, "overwrite" ) fp = self.writer._bulk_import.call_args[0][1] - self.assertFalse(os.path.isfile(fp.name)) + self.assertFalse(os.path.isfile(fp[0].name)) def test_write_dataframe_msgpack(self): df = pd.DataFrame([[1, 2], [3, 4]]) @@ -286,7 +286,7 @@ def test_write_dataframe_msgpack(self): ) size = _bytes.getbuffer().nbytes api_client.create_bulk_import().upload_part.assert_called_with( - "part", ANY, size + "part-0", ANY, size ) self.assertFalse(api_client.create_bulk_import().upload_file.called) @@ -300,15 +300,17 @@ def test_write_dataframe_msgpack_with_int_na(self): ], dtype="Int64", ) - expected_list = [ + expected_list = ( {"a": 1, "b": 2, "c": None, "time": 1234}, {"a": 3, "b": 4, "c": 5, "time": 1234}, - ] + ) self.writer._write_msgpack_stream = MagicMock() self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") self.assertTrue(self.writer._write_msgpack_stream.called) + print(self.writer._write_msgpack_stream.call_args[0][0][0:2]) self.assertEqual( - self.writer._write_msgpack_stream.call_args[0][0], expected_list + self.writer._write_msgpack_stream.call_args[0][0][0:2], + expected_list, ) @unittest.skipIf( @@ -320,15 +322,16 @@ def test_write_dataframe_msgpack_with_string_na(self): dtype="string", ) df["time"] = 1234 - expected_list = [ + expected_list = ( {"a": "foo", "b": "bar", "c": None, "time": 1234}, {"a": "buzz", "b": "buzz", "c": "alice", "time": 1234}, - ] + ) self.writer._write_msgpack_stream = MagicMock() self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") self.assertTrue(self.writer._write_msgpack_stream.called) self.assertEqual( - self.writer._write_msgpack_stream.call_args[0][0], expected_list + self.writer._write_msgpack_stream.call_args[0][0][0:2], + expected_list, ) @unittest.skipIf( @@ -340,15 +343,16 @@ def test_write_dataframe_msgpack_with_boolean_na(self): dtype="boolean", ) df["time"] = 1234 - expected_list = [ + expected_list = ( {"a": "true", "b": "false", "c": None, "time": 1234}, {"a": "false", "b": "true", "c": "true", "time": 1234}, - ] + ) self.writer._write_msgpack_stream = MagicMock() self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") self.assertTrue(self.writer._write_msgpack_stream.called) self.assertEqual( - self.writer._write_msgpack_stream.call_args[0][0], expected_list + self.writer._write_msgpack_stream.call_args[0][0][0:2], + expected_list, ) def test_write_dataframe_invalid_if_exists(self): diff --git a/pytd/writer.py b/pytd/writer.py index d486c39..fae5b46 100644 --- a/pytd/writer.py +++ b/pytd/writer.py @@ -6,7 
+6,9 @@ import tempfile import time import uuid +from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack +from itertools import zip_longest import msgpack import numpy as np @@ -308,7 +310,9 @@ class BulkImportWriter(Writer): td-client-python's bulk importer. """ - def write_dataframe(self, dataframe, table, if_exists, fmt="csv", keep_list=False): + def write_dataframe( + self, dataframe, table, if_exists, fmt="csv", keep_list=False, max_workers=5 + ): """Write a given DataFrame to a Treasure Data table. This method internally converts a given :class:`pandas.DataFrame` into a @@ -403,6 +407,10 @@ def write_dataframe(self, dataframe, table, if_exists, fmt="csv", keep_list=Fals Or, you can use :func:`Client.load_table_from_dataframe` function as well. >>> client.load_table_from_dataframe(df, "bulk_import", keep_list=True) + + max_workers : int, optional, default: 5 + The maximum number of threads that can be used to execute the given calls. + This is used only when ``fmt`` is ``msgpack``. """ if self.closed: raise RuntimeError("this writer is already closed and no longer available") @@ -420,26 +428,31 @@ def write_dataframe(self, dataframe, table, if_exists, fmt="csv", keep_list=Fals _cast_dtypes(dataframe, keep_list=keep_list) with ExitStack() as stack: + fps = [] if fmt == "csv": fp = tempfile.NamedTemporaryFile(suffix=".csv", delete=False) stack.callback(os.unlink, fp.name) stack.callback(fp.close) dataframe.to_csv(fp.name) + fps.append(fp) elif fmt == "msgpack": _replace_pd_na(dataframe) - fp = io.BytesIO() - fp = self._write_msgpack_stream(dataframe.to_dict(orient="records"), fp) - stack.callback(fp.close) + records = dataframe.to_dict(orient="records") + for group in zip_longest(*(iter(records),) * 10000): + fp = io.BytesIO() + fp = self._write_msgpack_stream(group, fp) + fps.append(fp) + stack.callback(fp.close) else: raise ValueError( f"unsupported format '{fmt}' for bulk import. " "should be 'csv' or 'msgpack'" ) - self._bulk_import(table, fp, if_exists, fmt) + self._bulk_import(table, fps, if_exists, fmt, max_workers=max_workers) stack.close() - def _bulk_import(self, table, file_like, if_exists, fmt="csv"): + def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5): """Write a specified CSV file to a Treasure Data table. This method uploads the file to Treasure Data via bulk import API. @@ -449,7 +462,7 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv"): table : :class:`pytd.table.Table` Target table. - file_like : File like object + file_like : List of file like objects Data in this file will be loaded to a target table. if_exists : str, {'error', 'overwrite', 'append', 'ignore'} @@ -462,6 +475,10 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv"): fmt : str, optional, {'csv', 'msgpack'}, default: 'csv' File format for bulk import. See also :func:`write_dataframe` + + max_workers : int, optional, default: 5 + The maximum number of threads that can be used to execute the given calls. + This is used only when ``fmt`` is ``msgpack``. """ params = None if table.exists: @@ -489,11 +506,19 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv"): try: logger.info(f"uploading data converted into a {fmt} file") if fmt == "msgpack": - size = file_like.getbuffer().nbytes - # To skip API._prepare_file(), which recreate msgpack again. 
- bulk_import.upload_part("part", file_like, size) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + _ = [ + executor.submit( + bulk_import.upload_part, + f"part-{i}", + fp, + fp.getbuffer().nbytes, + ) + for i, fp in enumerate(file_like) + ] else: - bulk_import.upload_file("part", fmt, file_like) + fp = file_like[0] + bulk_import.upload_file("part", fmt, fp) bulk_import.freeze() except Exception as e: bulk_import.delete() @@ -535,6 +560,9 @@ def _write_msgpack_stream(self, items, stream): with gzip.GzipFile(mode="wb", fileobj=stream) as gz: packer = msgpack.Packer() for item in items: + # Ignore None created by zip_longest + if not item: + break try: mp = packer.pack(item) except (OverflowError, ValueError): From a0767f37dd5bfb47e3724821dc49f009cfc808d3 Mon Sep 17 00:00:00 2001 From: Aki Ariga Date: Fri, 16 Aug 2024 18:50:34 -0700 Subject: [PATCH 2/6] Add chunk_record_size argument --- pytd/writer.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pytd/writer.py b/pytd/writer.py index fae5b46..9bb3346 100644 --- a/pytd/writer.py +++ b/pytd/writer.py @@ -311,7 +311,14 @@ class BulkImportWriter(Writer): """ def write_dataframe( - self, dataframe, table, if_exists, fmt="csv", keep_list=False, max_workers=5 + self, + dataframe, + table, + if_exists, + fmt="csv", + keep_list=False, + max_workers=5, + chunk_record_size=10_000, ): """Write a given DataFrame to a Treasure Data table. @@ -407,10 +414,14 @@ def write_dataframe( Or, you can use :func:`Client.load_table_from_dataframe` function as well. >>> client.load_table_from_dataframe(df, "bulk_import", keep_list=True) - + max_workers : int, optional, default: 5 The maximum number of threads that can be used to execute the given calls. This is used only when ``fmt`` is ``msgpack``. + + chunk_record_size : int, optional, default: 10_000 + The number of records to be written in a single file. This is used only when + ``fmt`` is ``msgpack``. """ if self.closed: raise RuntimeError("this writer is already closed and no longer available") @@ -439,7 +450,7 @@ def write_dataframe( _replace_pd_na(dataframe) records = dataframe.to_dict(orient="records") - for group in zip_longest(*(iter(records),) * 10000): + for group in zip_longest(*(iter(records),) * chunk_record_size): fp = io.BytesIO() fp = self._write_msgpack_stream(group, fp) fps.append(fp) From 80ed76b2c8dfb2862d1e3a79cfb6a91219f24450 Mon Sep 17 00:00:00 2001 From: Aki Ariga Date: Mon, 26 Aug 2024 21:18:26 -0700 Subject: [PATCH 3/6] Use NamedTemporaryFile instead of io.BytesIO for msgpack This change is to avoid the need to keep the entire msgpack in memory, which can be a problem for large data sets. 
--- pytd/tests/test_writer.py | 74 ++++++++++++++++----------------------- pytd/writer.py | 34 ++++++++++++------ 2 files changed, 53 insertions(+), 55 deletions(-) diff --git a/pytd/tests/test_writer.py b/pytd/tests/test_writer.py index 0202d7a..ca746ad 100644 --- a/pytd/tests/test_writer.py +++ b/pytd/tests/test_writer.py @@ -1,7 +1,7 @@ -import io import os +import tempfile import unittest -from unittest.mock import ANY, MagicMock +from unittest.mock import ANY, MagicMock, patch import numpy as np import pandas as pd @@ -89,9 +89,6 @@ def test_cast_dtypes(self): # This is for consistency of _get_schema self.assertTrue(pd.isna(dft["O"][2])) - @unittest.skipIf( - pd.__version__ < "1.0.0", "pd.NA is not supported in this pandas version" - ) def test_cast_dtypes_nullable(self): dft = pd.DataFrame( { @@ -281,14 +278,12 @@ def test_write_dataframe_msgpack(self): api_client = self.table.client.api_client self.assertTrue(api_client.create_bulk_import.called) self.assertTrue(api_client.create_bulk_import().upload_part.called) - _bytes = BulkImportWriter()._write_msgpack_stream( - df.to_dict(orient="records"), io.BytesIO() - ) - size = _bytes.getbuffer().nbytes - api_client.create_bulk_import().upload_part.assert_called_with( - "part-0", ANY, size - ) - self.assertFalse(api_client.create_bulk_import().upload_file.called) + with tempfile.NamedTemporaryFile(delete=False) as fp: + fp = BulkImportWriter()._write_msgpack_stream(df.to_dict(orient="records"), fp) + api_client.create_bulk_import().upload_part.assert_called_with( + "part-0", ANY, 62 + ) + self.assertFalse(api_client.create_bulk_import().upload_file.called) def test_write_dataframe_msgpack_with_int_na(self): # Although this conversion ensures pd.NA Int64 dtype to None, @@ -305,17 +300,15 @@ def test_write_dataframe_msgpack_with_int_na(self): {"a": 3, "b": 4, "c": 5, "time": 1234}, ) self.writer._write_msgpack_stream = MagicMock() - self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") - self.assertTrue(self.writer._write_msgpack_stream.called) - print(self.writer._write_msgpack_stream.call_args[0][0][0:2]) - self.assertEqual( - self.writer._write_msgpack_stream.call_args[0][0][0:2], - expected_list, - ) + with patch("pytd.writer.os.unlink"): + self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") + self.assertTrue(self.writer._write_msgpack_stream.called) + print(self.writer._write_msgpack_stream.call_args[0][0][0:2]) + self.assertEqual( + self.writer._write_msgpack_stream.call_args[0][0][0:2], + expected_list, + ) - @unittest.skipIf( - pd.__version__ < "1.0.0", "pd.NA not supported in this pandas version" - ) def test_write_dataframe_msgpack_with_string_na(self): df = pd.DataFrame( data=[{"a": "foo", "b": "bar"}, {"a": "buzz", "b": "buzz", "c": "alice"}], @@ -327,16 +320,14 @@ def test_write_dataframe_msgpack_with_string_na(self): {"a": "buzz", "b": "buzz", "c": "alice", "time": 1234}, ) self.writer._write_msgpack_stream = MagicMock() - self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") - self.assertTrue(self.writer._write_msgpack_stream.called) - self.assertEqual( - self.writer._write_msgpack_stream.call_args[0][0][0:2], - expected_list, - ) + with patch("pytd.writer.os.unlink"): + self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") + self.assertTrue(self.writer._write_msgpack_stream.called) + self.assertEqual( + self.writer._write_msgpack_stream.call_args[0][0][0:2], + expected_list, + ) - @unittest.skipIf( - pd.__version__ < "1.0.0", "pd.NA not supported in 
this pandas version" - ) def test_write_dataframe_msgpack_with_boolean_na(self): df = pd.DataFrame( data=[{"a": True, "b": False}, {"a": False, "b": True, "c": True}], @@ -348,12 +339,13 @@ def test_write_dataframe_msgpack_with_boolean_na(self): {"a": "false", "b": "true", "c": "true", "time": 1234}, ) self.writer._write_msgpack_stream = MagicMock() - self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") - self.assertTrue(self.writer._write_msgpack_stream.called) - self.assertEqual( - self.writer._write_msgpack_stream.call_args[0][0][0:2], - expected_list, - ) + with patch("pytd.writer.os.unlink"): + self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") + self.assertTrue(self.writer._write_msgpack_stream.called) + self.assertEqual( + self.writer._write_msgpack_stream.call_args[0][0][0:2], + expected_list, + ) def test_write_dataframe_invalid_if_exists(self): with self.assertRaises(ValueError): @@ -412,9 +404,6 @@ def test_write_dataframe_with_int_na(self): self.writer.td_spark.spark.createDataFrame.call_args[0][0], expected_df ) - @unittest.skipIf( - pd.__version__ < "1.0.0", "pd.NA is not supported in this pandas version" - ) def test_write_dataframe_with_string_na(self): df = pd.DataFrame( data=[{"a": "foo", "b": "bar"}, {"a": "buzz", "b": "buzz", "c": "alice"}], @@ -427,9 +416,6 @@ def test_write_dataframe_with_string_na(self): self.writer.td_spark.spark.createDataFrame.call_args[0][0], expected_df ) - @unittest.skipIf( - pd.__version__ < "1.0.0", "pd.NA is not supported in this pandas version" - ) def test_write_dataframe_with_boolean_na(self): df = pd.DataFrame( data=[{"a": True, "b": False}, {"a": False, "b": True, "c": True}], diff --git a/pytd/writer.py b/pytd/writer.py index 9bb3346..9423f2d 100644 --- a/pytd/writer.py +++ b/pytd/writer.py @@ -1,6 +1,5 @@ import abc import gzip -import io import logging import os import tempfile @@ -450,11 +449,18 @@ def write_dataframe( _replace_pd_na(dataframe) records = dataframe.to_dict(orient="records") - for group in zip_longest(*(iter(records),) * chunk_record_size): - fp = io.BytesIO() - fp = self._write_msgpack_stream(group, fp) - fps.append(fp) - stack.callback(fp.close) + try: + for group in zip_longest(*(iter(records),) * chunk_record_size): + fp = tempfile.NamedTemporaryFile(suffix=".msgpack.gz", delete=False) + fp = self._write_msgpack_stream(group, fp) + fps.append(fp) + stack.callback(os.unlink, fp.name) + stack.callback(fp.close) + except OSError as e: + raise RuntimeError( + "failed to create a temporary file. " + "Increase chunk_record_size may mitigate the issue." + ) from e else: raise ValueError( f"unsupported format '{fmt}' for bulk import. " @@ -514,19 +520,21 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5): bulk_import = table.client.api_client.create_bulk_import( session_name, table.database, table.table, params=params ) + s_time = time.time() try: logger.info(f"uploading data converted into a {fmt} file") if fmt == "msgpack": with ThreadPoolExecutor(max_workers=max_workers) as executor: - _ = [ + for i, fp in enumerate(file_like): + fsize = fp.tell() + fp.seek(0) executor.submit( bulk_import.upload_part, f"part-{i}", fp, - fp.getbuffer().nbytes, + fsize, ) - for i, fp in enumerate(file_like) - ] + logger.debug(f"to upload {fp.name} to TD. 
File size: {fsize}B") else: fp = file_like[0] bulk_import.upload_file("part", fmt, fp) @@ -535,6 +543,8 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5): bulk_import.delete() raise RuntimeError(f"failed to upload file: {e}") + logger.info(f"uploaded data in {time.time() - s_time:.2f} sec") + logger.info("performing a bulk import job") job = bulk_import.perform(wait=True) @@ -581,7 +591,9 @@ def _write_msgpack_stream(self, items, stream): mp = packer.pack(normalized_msgpack(item)) gz.write(mp) - stream.seek(0) + logger.debug( + f"created a msgpack file: {stream.name}. File size: {stream.tell()}" + ) return stream From bfe2233c15c17b89c320a7c21a1add4e79fa619b Mon Sep 17 00:00:00 2001 From: Aki Ariga Date: Tue, 27 Aug 2024 14:11:40 -0700 Subject: [PATCH 4/6] Convert dataframe to dict by chunk to reduce memory consumption --- pytd/tests/test_writer.py | 16 +++++++++------- pytd/writer.py | 18 +++++++++--------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/pytd/tests/test_writer.py b/pytd/tests/test_writer.py index ca746ad..9bbfd8a 100644 --- a/pytd/tests/test_writer.py +++ b/pytd/tests/test_writer.py @@ -279,7 +279,9 @@ def test_write_dataframe_msgpack(self): self.assertTrue(api_client.create_bulk_import.called) self.assertTrue(api_client.create_bulk_import().upload_part.called) with tempfile.NamedTemporaryFile(delete=False) as fp: - fp = BulkImportWriter()._write_msgpack_stream(df.to_dict(orient="records"), fp) + fp = BulkImportWriter()._write_msgpack_stream( + df.to_dict(orient="records"), fp + ) api_client.create_bulk_import().upload_part.assert_called_with( "part-0", ANY, 62 ) @@ -295,10 +297,10 @@ def test_write_dataframe_msgpack_with_int_na(self): ], dtype="Int64", ) - expected_list = ( + expected_list = [ {"a": 1, "b": 2, "c": None, "time": 1234}, {"a": 3, "b": 4, "c": 5, "time": 1234}, - ) + ] self.writer._write_msgpack_stream = MagicMock() with patch("pytd.writer.os.unlink"): self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") @@ -315,10 +317,10 @@ def test_write_dataframe_msgpack_with_string_na(self): dtype="string", ) df["time"] = 1234 - expected_list = ( + expected_list = [ {"a": "foo", "b": "bar", "c": None, "time": 1234}, {"a": "buzz", "b": "buzz", "c": "alice", "time": 1234}, - ) + ] self.writer._write_msgpack_stream = MagicMock() with patch("pytd.writer.os.unlink"): self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") @@ -334,10 +336,10 @@ def test_write_dataframe_msgpack_with_boolean_na(self): dtype="boolean", ) df["time"] = 1234 - expected_list = ( + expected_list = [ {"a": "true", "b": "false", "c": None, "time": 1234}, {"a": "false", "b": "true", "c": "true", "time": 1234}, - ) + ] self.writer._write_msgpack_stream = MagicMock() with patch("pytd.writer.os.unlink"): self.writer.write_dataframe(df, self.table, "overwrite", fmt="msgpack") diff --git a/pytd/writer.py b/pytd/writer.py index 9423f2d..e005f0a 100644 --- a/pytd/writer.py +++ b/pytd/writer.py @@ -7,7 +7,6 @@ import uuid from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack -from itertools import zip_longest import msgpack import numpy as np @@ -448,11 +447,15 @@ def write_dataframe( elif fmt == "msgpack": _replace_pd_na(dataframe) - records = dataframe.to_dict(orient="records") try: - for group in zip_longest(*(iter(records),) * chunk_record_size): - fp = tempfile.NamedTemporaryFile(suffix=".msgpack.gz", delete=False) - fp = self._write_msgpack_stream(group, fp) + for start in range(0, 
len(dataframe), chunk_record_size): + records = dataframe.iloc[ + start : start + chunk_record_size + ].to_dict(orient="records") + fp = tempfile.NamedTemporaryFile( + suffix=".msgpack.gz", delete=False + ) + fp = self._write_msgpack_stream(records, fp) fps.append(fp) stack.callback(os.unlink, fp.name) stack.callback(fp.close) @@ -543,7 +546,7 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5): bulk_import.delete() raise RuntimeError(f"failed to upload file: {e}") - logger.info(f"uploaded data in {time.time() - s_time:.2f} sec") + logger.debug(f"uploaded data in {time.time() - s_time:.2f} sec") logger.info("performing a bulk import job") job = bulk_import.perform(wait=True) @@ -581,9 +584,6 @@ def _write_msgpack_stream(self, items, stream): with gzip.GzipFile(mode="wb", fileobj=stream) as gz: packer = msgpack.Packer() for item in items: - # Ignore None created by zip_longest - if not item: - break try: mp = packer.pack(item) except (OverflowError, ValueError): From b000278b14d36dcc9dd752b1f708bb17f059a1ee Mon Sep 17 00:00:00 2001 From: Aki Ariga Date: Wed, 28 Aug 2024 08:25:53 -0700 Subject: [PATCH 5/6] Better wording for an error message Co-authored-by: Roman Shtykh --- pytd/writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytd/writer.py b/pytd/writer.py index e005f0a..b8552d1 100644 --- a/pytd/writer.py +++ b/pytd/writer.py @@ -462,7 +462,7 @@ def write_dataframe( except OSError as e: raise RuntimeError( "failed to create a temporary file. " - "Increase chunk_record_size may mitigate the issue." + "Larger chunk_record_size may mitigate the issue." ) from e else: raise ValueError( From 04c5500864e901204e6fb4ff2a3a440b75d917c9 Mon Sep 17 00:00:00 2001 From: Aki Ariga Date: Wed, 28 Aug 2024 08:27:23 -0700 Subject: [PATCH 6/6] Rename variable to prural --- pytd/writer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pytd/writer.py b/pytd/writer.py index b8552d1..4da413b 100644 --- a/pytd/writer.py +++ b/pytd/writer.py @@ -472,7 +472,7 @@ def write_dataframe( self._bulk_import(table, fps, if_exists, fmt, max_workers=max_workers) stack.close() - def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5): + def _bulk_import(self, table, file_likes, if_exists, fmt="csv", max_workers=5): """Write a specified CSV file to a Treasure Data table. This method uploads the file to Treasure Data via bulk import API. @@ -482,7 +482,7 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5): table : :class:`pytd.table.Table` Target table. - file_like : List of file like objects + file_likes : List of file like objects Data in this file will be loaded to a target table. if_exists : str, {'error', 'overwrite', 'append', 'ignore'} @@ -528,7 +528,7 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5): logger.info(f"uploading data converted into a {fmt} file") if fmt == "msgpack": with ThreadPoolExecutor(max_workers=max_workers) as executor: - for i, fp in enumerate(file_like): + for i, fp in enumerate(file_likes): fsize = fp.tell() fp.seek(0) executor.submit( @@ -539,7 +539,7 @@ def _bulk_import(self, table, file_like, if_exists, fmt="csv", max_workers=5): ) logger.debug(f"to upload {fp.name} to TD. File size: {fsize}B") else: - fp = file_like[0] + fp = file_likes[0] bulk_import.upload_file("part", fmt, fp) bulk_import.freeze() except Exception as e:
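A minimal usage sketch of the options this series adds. This is an illustration, not part of the patches: the database name, chunk size, and worker count are placeholders, and the keyword arguments are assumed to be forwarded from ``Client.load_table_from_dataframe`` to ``BulkImportWriter.write_dataframe`` as in the docstring example above.

    >>> import pandas as pd
    >>> import pytd
    >>> client = pytd.Client(database="sample_datasets")
    >>> df = pd.DataFrame({"a": range(1_000_000), "b": range(1_000_000)})
    >>> df["time"] = 1234567890
    >>> # msgpack upload: records are gzip-msgpacked into 20,000-record
    >>> # temporary files and uploaded by up to 8 threads in parallel
    >>> client.load_table_from_dataframe(
    ...     df,
    ...     "bulk_import",
    ...     if_exists="overwrite",
    ...     fmt="msgpack",
    ...     max_workers=8,
    ...     chunk_record_size=20_000,
    ... )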