diff --git a/exaudfclient/base/python/python3/python_ext_dataframe.cc b/exaudfclient/base/python/python3/python_ext_dataframe.cc
index 36f0fffdb..972560c66 100644
--- a/exaudfclient/base/python/python3/python_ext_dataframe.cc
+++ b/exaudfclient/base/python/python3/python_ext_dataframe.cc
@@ -1,3 +1,4 @@
+#include <algorithm>
 #include "exaudflib/swig/swig_common.h"
 #include "debug_message.h"
 
@@ -162,12 +163,18 @@ struct PyPtr {
     PyUniquePtr ptr;
 };
 
-inline void checkPyPtrIsNull(const PyPtr& obj) {
+inline void checkPyPtrIsNotNull(const PyPtr& obj) {
     // Error message set by Python
     if (!obj)
         throw std::runtime_error("F-UDF-CL-SL-PYTHON-1039");
 }
 
+inline void checkPyObjIsNotNull(const PyObject *obj) {
+    // Error message set by Python
+    if (!obj)
+        throw std::runtime_error("F-UDF-CL-SL-PYTHON-1142");
+}
+
 
 struct ColumnInfo
@@ -620,7 +627,7 @@ inline void handleEmitNpyUint64(
             throw std::runtime_error(ss.str().c_str());
         }
     }
-    checkPyPtrIsNull(pyValue);
+    checkPyPtrIsNotNull(pyValue);
     pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyValue.get(), NULL));
 }
 
@@ -657,7 +664,7 @@ inline void handleEmitNpyUint32(
             throw std::runtime_error(ss.str().c_str());
         }
     }
-    checkPyPtrIsNull(pyValue);
+    checkPyPtrIsNotNull(pyValue);
     pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyValue.get(), NULL));
 }
 
@@ -694,7 +701,7 @@ inline void handleEmitNpyUint16(
             throw std::runtime_error(ss.str().c_str());
         }
     }
-    checkPyPtrIsNull(pyValue);
+    checkPyPtrIsNotNull(pyValue);
     pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyValue.get(), NULL));
 }
 
@@ -731,7 +738,7 @@ inline void handleEmitNpyUint8(
             throw std::runtime_error(ss.str().c_str());
         }
     }
-    checkPyPtrIsNull(pyValue);
+    checkPyPtrIsNotNull(pyValue);
     pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyValue.get(), NULL));
 }
 
@@ -768,7 +775,7 @@ inline void handleEmitNpyFloat64(
             throw std::runtime_error(ss.str().c_str());
         }
     }
-    checkPyPtrIsNull(pyValue);
+    checkPyPtrIsNotNull(pyValue);
     pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyValue.get(), NULL));
 }
 
@@ -805,7 +812,7 @@ inline void handleEmitNpyFloat32(
             throw std::runtime_error(ss.str().c_str());
         }
     }
-    checkPyPtrIsNull(pyValue);
+    checkPyPtrIsNotNull(pyValue);
     pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyValue.get(), NULL));
 }
 
@@ -842,7 +849,7 @@ inline void handleEmitNpyBool(
             throw std::runtime_error(ss.str().c_str());
         }
     }
-    checkPyPtrIsNull(pyValue);
+    checkPyPtrIsNotNull(pyValue);
     pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyValue.get(), NULL));
 }
 
@@ -856,19 +863,20 @@ inline void handleEmitPyBool(
         PyPtr& pyValue,
         PyPtr& pyResult,
         PyPtr& pySetNullMethodName){
-    PyPtr pyBool(PyList_GetItem(columnArrays[c].get(), r));
-    checkPyPtrIsNull(pyBool);
-    if (isNoneOrNA(pyBool.get())) {
+    //PyList_GetItem returns a 'borrowed' reference. Must not call XDECREF() on it.
+    PyObject *pyBool = PyList_GetItem(columnArrays[c].get(), r);
+    checkPyObjIsNotNull(pyBool);
+    if (isNoneOrNA(pyBool)) {
         pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pySetNullMethodName.get(), pyColSetMethods[c].first.get(), NULL));
         return;
     }
     switch (colInfo[c].type) {
         case SWIGVMContainers::BOOLEAN:
-            if (pyBool.get() == Py_True) {
+            if (pyBool == Py_True) {
                 Py_INCREF(Py_True);
                 pyValue.reset(Py_True);
             }
-            else if (pyBool.get() == Py_False) {
+            else if (pyBool == Py_False) {
                 Py_INCREF(Py_False);
                 pyValue.reset(Py_False);
             }
@@ -883,7 +891,7 @@ inline void handleEmitPyBool(
             throw std::runtime_error(ss.str().c_str());
         }
     }
-    checkPyPtrIsNull(pyValue);
+    checkPyPtrIsNotNull(pyValue);
     pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyValue.get(), NULL));
 }
 
@@ -897,9 +905,10 @@ inline void handleEmitPyInt(
         PyPtr& pyValue,
         PyPtr& pyResult,
         PyPtr& pySetNullMethodName){
-    PyPtr pyInt(PyList_GetItem(columnArrays[c].get(), r));
-    checkPyPtrIsNull(pyInt);
-    if (isNoneOrNA(pyInt.get())) {
+    //PyList_GetItem returns a 'borrowed' reference. Must not call XDECREF() on it.
+    PyObject *pyInt = PyList_GetItem(columnArrays[c].get(), r);
+    checkPyObjIsNotNull(pyInt);
+    if (isNoneOrNA(pyInt)) {
         pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pySetNullMethodName.get(), pyColSetMethods[c].first.get(), NULL));
         return;
     }
@@ -907,14 +916,18 @@ inline void handleEmitPyInt(
     switch (colInfo[c].type) {
         case SWIGVMContainers::INT64:
         case SWIGVMContainers::INT32:
-            pyValue.reset(pyInt.release());
+        {
+            //pyInt points to a 'borrowed' reference. We need to explicitly increase the ref counter here, as pyValue will decrease it again later.
+            Py_INCREF(pyInt);
+            pyValue.reset(pyInt);
             break;
+        }
         case SWIGVMContainers::NUMERIC:
-            pyValue.reset(PyObject_Str(pyInt.get()));
+            pyValue.reset(PyObject_Str(pyInt));
            break;
         case SWIGVMContainers::DOUBLE:
         {
-            double value = PyFloat_AsDouble(pyInt.get());
+            double value = PyFloat_AsDouble(pyInt);
             if (value < 0 && PyErr_Occurred())
                 throw std::runtime_error("F-UDF-CL-SL-PYTHON-1067: emit() PY_INT: PyFloat_AsDouble error");
             pyValue.reset(PyFloat_FromDouble(value));
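
For reference, the ownership rule these hunks apply at every PyList_GetItem call site, as a minimal sketch. It reuses PyPtr and checkPyObjIsNotNull from this file; the helper name is illustrative and not part of the change:

    // PyList_GetItem hands out a borrowed reference: the list keeps ownership.
    // Wrapping it in an owning PyPtr without an INCREF would over-release it;
    // releasing it out of the PyPtr without an INCREF (the old code) leaks the
    // bookkeeping instead. Taking our own reference first makes both ends match.
    void storeListItem(PyObject *list, Py_ssize_t i, PyPtr &out) {
        PyObject *item = PyList_GetItem(list, i);  // borrowed reference
        checkPyObjIsNotNull(item);                 // NULL => error already set by Python
        Py_INCREF(item);                           // take a reference of our own ...
        out.reset(item);                           // ... which PyPtr may later Py_XDECREF
    }
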
@@ -940,9 +953,10 @@ inline void handleEmitPyFloat(
         PyPtr& pyValue,
         PyPtr& pyResult,
         PyPtr& pySetNullMethodName){
-    PyPtr pyFloat(PyList_GetItem(columnArrays[c].get(), r));
-    checkPyPtrIsNull(pyFloat);
-    if (isNoneOrNA(pyFloat.get())) {
+    //PyList_GetItem returns a 'borrowed' reference. Must not call XDECREF() on it.
+    PyObject *pyFloat = PyList_GetItem(columnArrays[c].get(), r);
+    checkPyObjIsNotNull(pyFloat);
+    if (isNoneOrNA(pyFloat)) {
         pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pySetNullMethodName.get(), pyColSetMethods[c].first.get(), NULL));
         return;
     }
@@ -951,7 +965,7 @@ inline void handleEmitPyFloat(
         case SWIGVMContainers::INT64:
         case SWIGVMContainers::INT32:
         {
-            double value = PyFloat_AsDouble(pyFloat.get());
+            double value = PyFloat_AsDouble(pyFloat);
             if (value < 0 && PyErr_Occurred())
                 throw std::runtime_error("F-UDF-CL-SL-PYTHON-1139: emit() PY_FLOAT: PyFloat_AsDouble error");
             if (npy_isnan(value)) {
@@ -963,11 +977,15 @@ inline void handleEmitPyFloat(
             break;
         }
         case SWIGVMContainers::NUMERIC:
-            pyValue.reset(PyObject_Str(pyFloat.get()));
+            pyValue.reset(PyObject_Str(pyFloat));
             break;
         case SWIGVMContainers::DOUBLE:
-            pyValue.reset(pyFloat.release());
+        {
+            //pyFloat points to a 'borrowed' reference. We need to explicitly increase the ref counter here, as pyValue will decrease it again later.
+            Py_INCREF(pyFloat);
+            pyValue.reset(pyFloat);
             break;
+        }
         default:
         {
             std::stringstream ss;
@@ -991,8 +1009,9 @@ inline void handleEmitPyDecimal(
         PyPtr& pyIntMethodName,
         PyPtr& pyFloatMethodName
         ){
-    PyPtr pyDecimal(PyList_GetItem(columnArrays[c].get(), r));
-    if (isNoneOrNA(pyDecimal.get())) {
+    //PyList_GetItem returns a 'borrowed' reference. Must not call XDECREF() on it.
+    PyObject *pyDecimal = PyList_GetItem(columnArrays[c].get(), r);
+    if (isNoneOrNA(pyDecimal)) {
         pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pySetNullMethodName.get(), pyColSetMethods[c].first.get(), NULL));
         return;
     }
@@ -1001,16 +1020,16 @@ inline void handleEmitPyDecimal(
         case SWIGVMContainers::INT64:
         case SWIGVMContainers::INT32:
         {
-            PyPtr pyInt(PyObject_CallMethodObjArgs(pyDecimal.get(), pyIntMethodName.get(), NULL));
+            PyPtr pyInt(PyObject_CallMethodObjArgs(pyDecimal, pyIntMethodName.get(), NULL));
             pyValue.reset(pyInt.release());
             break;
         }
         case SWIGVMContainers::NUMERIC:
-            pyValue.reset(PyObject_Str(pyDecimal.get()));
+            pyValue.reset(PyObject_Str(pyDecimal));
             break;
         case SWIGVMContainers::DOUBLE:
         {
-            PyPtr pyFloat(PyObject_CallMethodObjArgs(pyDecimal.get(), pyFloatMethodName.get(), NULL));
+            PyPtr pyFloat(PyObject_CallMethodObjArgs(pyDecimal, pyFloatMethodName.get(), NULL));
             pyValue.reset(pyFloat.release());
             break;
         }
@@ -1034,25 +1053,25 @@ inline void handleEmitPyStr(
         PyPtr& pyValue,
         PyPtr& pyResult,
         PyPtr& pySetNullMethodName){
-
-    PyPtr pyString(PyList_GetItem(columnArrays[c].get(), r));
-    if (isNoneOrNA(pyString.get())) {
+    //PyList_GetItem returns a 'borrowed' reference. Must not call XDECREF() on it.
+    PyObject *pyString = PyList_GetItem(columnArrays[c].get(), r);
+    if (isNoneOrNA(pyString)) {
         pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pySetNullMethodName.get(), pyColSetMethods[c].first.get(), NULL));
         return;
     }
     switch (colInfo[c].type) {
         case SWIGVMContainers::NUMERIC:
-            pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyString.get(), NULL));
+            pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyString, NULL));
             break;
         case SWIGVMContainers::STRING:
         {
             Py_ssize_t size = -1;
-            const char *str = PyUnicode_AsUTF8AndSize(pyString.get(), &size);
+            const char *str = PyUnicode_AsUTF8AndSize(pyString, &size);
             if (!str && size < 0)
                 throw std::runtime_error("F-UDF-CL-SL-PYTHON-1137: invalid size of string");
             PyPtr pySize(PyLong_FromSsize_t(size));
-            pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyString.get(), pySize.get(), NULL));
+            pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyString, pySize.get(), NULL));
             break;
         }
         default:
@@ -1075,8 +1094,9 @@ inline void handleEmitPyDate(
         PyPtr& pyResult,
         PyPtr& pySetNullMethodName,
         PyPtr& pyIsoformatMethodName){
-    PyPtr pyDate(PyList_GetItem(columnArrays[c].get(), r));
-    if (isNoneOrNA(pyDate.get())) {
+    //PyList_GetItem returns a 'borrowed' reference. Must not call XDECREF() on it.
+    PyObject *pyDate = PyList_GetItem(columnArrays[c].get(), r);
+    if (isNoneOrNA(pyDate)) {
         pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pySetNullMethodName.get(), pyColSetMethods[c].first.get(), NULL));
         return;
     }
@@ -1084,7 +1104,7 @@ inline void handleEmitPyDate(
     switch (colInfo[c].type) {
         case SWIGVMContainers::DATE:
         {
-            PyPtr pyIsoDate(PyObject_CallMethodObjArgs(pyDate.get(), pyIsoformatMethodName.get(), NULL));
+            PyPtr pyIsoDate(PyObject_CallMethodObjArgs(pyDate, pyIsoformatMethodName.get(), NULL));
             pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyIsoDate.get(), NULL));
             break;
         }
@@ -1106,8 +1126,9 @@ inline void handleEmitPyTimestamp(
         PyPtr& pyValue,
         PyPtr& pyResult,
         PyPtr& pySetNullMethodName){
-    PyPtr pyTimestamp(PyList_GetItem(columnArrays[c].get(), r));
-    if (isNoneOrNA(pyTimestamp.get())) {
+    //PyList_GetItem returns a 'borrowed' reference. Must not call XDECREF() on it.
+    PyObject *pyTimestamp = PyList_GetItem(columnArrays[c].get(), r);
+    if (isNoneOrNA(pyTimestamp)) {
         pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pySetNullMethodName.get(), pyColSetMethods[c].first.get(), NULL));
         return;
     }
@@ -1118,8 +1139,8 @@ inline void handleEmitPyTimestamp(
             // We call here pandas.Timestamp.tz_localize(None), because we need to remove the timezone from the timestamp.
             // Exasol doesn't support timezones, and if we don't remove the timezone, pandas.Timestamp.isoformat will add
             // it to the generated string.
-            pyTimestamp.reset(PyObject_CallMethod(pyTimestamp.get(), "tz_localize", "z", NULL));
-            PyPtr pyIsoDatetime(PyObject_CallMethod(pyTimestamp.get(), "isoformat", "s", " "));
+            PyPtr pyTzLocalize(PyObject_CallMethod(pyTimestamp, "tz_localize", "z", NULL));
+            PyPtr pyIsoDatetime(PyObject_CallMethod(pyTzLocalize.get(), "isoformat", "s", " "));
             pyResult.reset(PyObject_CallMethodObjArgs(
                 resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyIsoDatetime.get(), NULL));
             break;
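
For reference, what the tz_localize(None) step does on the pandas side; a sketch with illustrative values:

    import pandas as pd

    ts = pd.Timestamp("2020-07-27 14:22:33.600699+02:00")
    ts.isoformat(" ")                    # '2020-07-27 14:22:33.600699+02:00' - the offset survives
    ts.tz_localize(None).isoformat(" ")  # '2020-07-27 14:22:33.600699' - wall time, no timezone
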
@@ -1145,8 +1166,9 @@ inline void handleEmitNpyDateTime(
         PyPtr& pyResult,
         PyPtr& pySetNullMethodName,
         PyPtr& pdNaT){
-    PyPtr pyTimestamp(PyList_GetItem(columnArrays[c].get(), r));
-    if (pyTimestamp.get() == pdNaT.get()) {
+    //PyList_GetItem returns a 'borrowed' reference. Must not call XDECREF() on it.
+    PyObject *pyTimestamp = PyList_GetItem(columnArrays[c].get(), r);
+    if (pyTimestamp == pdNaT.get()) {
         pyResult.reset(PyObject_CallMethodObjArgs(resultHandler, pySetNullMethodName.get(), pyColSetMethods[c].first.get(), NULL));
         return;
     }
@@ -1154,7 +1176,7 @@ inline void handleEmitNpyDateTime(
     switch (colInfo[c].type) {
         case SWIGVMContainers::TIMESTAMP:
         {
-            PyPtr pyIsoDatetime(PyObject_CallMethod(pyTimestamp.get(), "isoformat", "s", " "));
+            PyPtr pyIsoDatetime(PyObject_CallMethod(pyTimestamp, "isoformat", "s", " "));
             pyResult.reset(PyObject_CallMethodObjArgs(
                 resultHandler, pyColSetMethods[c].second.get(), pyColSetMethods[c].first.get(), pyIsoDatetime.get(), NULL));
             break;
@@ -1189,32 +1211,34 @@ void emit(PyObject *resultHandler, std::vector<ColumnInfo>& colInfo, PyObject *d
     PyPtr data;
+    PyPtr arrayPtr;
     PyArrayObject *pyArray;
     PyPtr colArray;
-    if(colTypes.size()==1 && colTypes.at(0).second == NPY_DATETIME){
-        // if we get an dataframe with a single datetime column with type datetime[ns],
-        // it doesn't get transformed into a 2D Array with the attributes values,
-        // instead we get a DatetimeIndex like this:
-        // DatetimeIndex(['2020-07-27 14:22:33.600699', ...], dtype='datetime64[ns]', freq=None)
-        // As a workaround we add a column to the dataframe via resetIndex, use values to get a 2D Array and
-        // slice out the datetime column, which then will be an Array of pandas.Timestamp objects
-        PyPtr resetIndex(PyObject_CallMethod(dataframe, "reset_index", NULL));
-        data=PyPtr(PyObject_GetAttrString(resetIndex.get(), "values"));
-        pyArray = reinterpret_cast<PyArrayObject*>(PyArray_FROM_OTF(data.get(), NPY_OBJECT, NPY_ARRAY_IN_ARRAY));
+    bool allColsAreDateTime =
+        std::all_of(colTypes.begin(), colTypes.end(),
+                    [](std::pair<std::string, int> colType) {
+                        return colType.second == NPY_DATETIME;
+                    });
+    if(allColsAreDateTime) {
+        // If we get a dataframe in which all columns have type datetime[ns], we need to cast
+        // the values explicitly to type 'object', because we call
+        // PyArray_FROM_OTF(data.get(), NPY_OBJECT, NPY_ARRAY_IN_ARRAY) with parameter NPY_OBJECT,
+        // while per default the dtype of the values is datetime[ns].
+        PyPtr asTypeFunc(PyObject_GetAttrString(dataframe, "astype"));
+        PyPtr keywordArgs(PyDict_New());
+        PyDict_SetItemString(keywordArgs.get(), "copy", Py_False);
+        PyPtr funcArgs(Py_BuildValue("(s)", "object"));
+        PyPtr castedValues(PyObject_Call(asTypeFunc.get(), funcArgs.get(), keywordArgs.get()));
+        data.reset(PyObject_GetAttrString(castedValues.get(), "values"));
+        arrayPtr = PyPtr(PyArray_FROM_OTF(data.get(), NPY_OBJECT, NPY_ARRAY_IN_ARRAY));
+        pyArray = reinterpret_cast<PyArrayObject*>(arrayPtr.get());
         numRows = PyArray_DIM(pyArray, 0);
-        numCols = PyArray_DIM(pyArray, 1)-1;
+        numCols = PyArray_DIM(pyArray, 1);
         // Transpose to column-major
-        PyPtr transpose = PyPtr(PyArray_Transpose(pyArray, NULL));
-
-        PyPtr pyStart(PyLong_FromLong(1));
-        PyPtr pyStop(PyLong_FromLong(2));
-        PyPtr slice(PySlice_New(pyStart.get(), pyStop.get(), Py_None));
-        colArray=PyPtr(PyObject_GetItem(transpose.get(), slice.get()));
-
-
+        colArray = PyPtr(PyArray_Transpose(pyArray, NULL));
     }else{
         data=PyPtr(PyObject_GetAttrString(dataframe, "values"));
-        pyArray = reinterpret_cast<PyArrayObject*>(PyArray_FROM_OTF(data.get(), NPY_OBJECT, NPY_ARRAY_IN_ARRAY));
+        arrayPtr = PyPtr(PyArray_FROM_OTF(data.get(), NPY_OBJECT, NPY_ARRAY_IN_ARRAY));
+        pyArray = reinterpret_cast<PyArrayObject*>(arrayPtr.get());
         numRows = PyArray_DIM(pyArray, 0);
         numCols = PyArray_DIM(pyArray, 1);
         // Transpose to column-major
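
For reference, the pandas behaviour the new all-datetime branch works around; a sketch with illustrative values:

    import pandas as pd
    from datetime import datetime

    ts = pd.Timestamp(datetime(2020, 7, 27, 14, 22, 33, 600699))
    df = pd.DataFrame([[ts, ts]], dtype="datetime64[ns]")
    df.values.dtype                                # dtype('<M8[ns]'): plain numpy datetime64 values
    df.astype("object", copy=False).values.dtype   # dtype('O'): entries stay pandas.Timestamp objects

The cast keeps the per-cell objects as pandas.Timestamp, which is what the emit handlers above expect; wrapping the PyArray_FROM_OTF result in the new arrayPtr also closes the leak of that array.
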
diff --git a/test_container/build/deps/requirements.txt b/test_container/build/deps/requirements.txt
index 860734413..3d224a6ea 100644
--- a/test_container/build/deps/requirements.txt
+++ b/test_container/build/deps/requirements.txt
@@ -4,3 +4,4 @@ lxml
 docker
 scipy
 https://github.com/exasol/exasol-python-test-framework/releases/download/0.4.0/exasol_python_test_framework-0.4.0-py3-none-any.whl
+humanfriendly
\ No newline at end of file
diff --git a/test_container/tests/test/pandas/all/dataframe.py b/test_container/tests/test/pandas/all/dataframe.py
index 58c9fd738..1e5ba1f25 100755
--- a/test_container/tests/test/pandas/all/dataframe.py
+++ b/test_container/tests/test/pandas/all/dataframe.py
@@ -818,6 +818,36 @@ def run(ctx):
                 (datetime.datetime(2020, 7, 27, 14, 22, 33, 673000),)
             ], rows)
 
+    def test_dataframe_set_emits_timestamp_many_column(self):
+        import datetime
+        emits = [f"ts{i} timestamp" for i in range(40)]
+        emits_str = ",".join(emits)
+        udf_sql = udf.fixindent('''
+            CREATE OR REPLACE PYTHON3 SET SCRIPT foo(sec int) EMITS (%s) AS
+
+            def run(ctx):
+                import pandas as pd
+                import numpy as np
+                from datetime import datetime
+
+                ts1 = pd.Timestamp(datetime(2020, 7, 27, 14, 22, 33, 673251))
+                ts2 = pd.Timestamp(datetime(2021, 7, 27, 14, 22, 33, 673251))
+                df = pd.DataFrame([[ts1, ts2]*20]*40, dtype="datetime64[ns]")
+
+                ctx.emit(df)
+            /
+            ''' % (emits_str))
+        print(udf_sql)
+        self.query(udf_sql)
+        select_sql = 'SELECT foo(1)'
+        print(select_sql)
+        rows = self.query(select_sql)
+        ts1 = datetime.datetime(2020, 7, 27, 14, 22, 33, 673000)
+        ts2 = datetime.datetime(2021, 7, 27, 14, 22, 33, 673000)
+
+        result_rows = [[ts1, ts2] * 20] * 40
+        self.assertRowsEqual(rows, result_rows)
+
     def test_dataframe_set_emits_timestamp_with_timezone_only_fail(self):
         import datetime
         udf_sql = udf.fixindent('''
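
The shape arithmetic behind the new test, spelled out as a sketch: every column is datetime64[ns], so emitting this frame exercises the all-datetime branch added above.

    import pandas as pd
    from datetime import datetime

    ts1 = pd.Timestamp(datetime(2020, 7, 27, 14, 22, 33, 673251))
    ts2 = pd.Timestamp(datetime(2021, 7, 27, 14, 22, 33, 673251))
    row = [ts1, ts2] * 20                                   # 40 values, one per emitted TIMESTAMP column
    df = pd.DataFrame([row] * 40, dtype="datetime64[ns]")   # 40 rows x 40 columns
    assert df.shape == (40, 40)
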
diff --git a/test_container/tests/test/pandas/all/dataframe_memory_leak.py b/test_container/tests/test/pandas/all/dataframe_memory_leak.py
new file mode 100755
index 000000000..ee9820bcd
--- /dev/null
+++ b/test_container/tests/test/pandas/all/dataframe_memory_leak.py
@@ -0,0 +1,293 @@
+#!/usr/bin/env python3
+import textwrap
+from decimal import Decimal
+from datetime import date
+from datetime import datetime
+
+from exasol_python_test_framework import udf
+from exasol_python_test_framework.exatest.testcase import useData
+from exasol_python_test_framework.udf.udf_debug import UdfDebugger
+from typing import List, Tuple, Union
+
+
+class PandasDataFrameMemoryLeakTest(udf.TestCase):
+    def setUp(self):
+        self.maxDiff=None
+
+        self.query('CREATE SCHEMA FN2', ignore_errors=True)
+        self.query('OPEN SCHEMA FN2', ignore_errors=True)
+
+        self.create_col_defs = [
+            ('C0','INT IDENTITY'),
+            ('C1','Decimal(2,0)'),
+            ('C2','Decimal(4,0)'),
+            ('C3','Decimal(8,0)'),
+            ('C4','Decimal(16,0)'),
+            ('C5','Decimal(36,0)'),
+            ('C6','DOUBLE'),
+            ('C7','BOOLEAN'),
+            ('C8','VARCHAR(500)'),
+            ('C9','CHAR(10)'),
+            ('C10','DATE'),
+            ('C11','TIMESTAMP')
+        ]
+        self.create_col_defs_str = ','.join(
+            '%s %s'%(name,type_decl)
+            for name, type_decl
+            in self.create_col_defs
+        )
+        self.col_defs = self.create_col_defs[1:]
+        self.col_defs_str = ','.join(
+            '%s %s'%(name,type_decl)
+            for name, type_decl
+            in self.col_defs
+        )
+        self.col_names = [name for name, type_decl in self.col_defs]
+        self.col_names_str = ','.join(self.col_names)
+
+        self.col_tuple = (
+            Decimal('1'),
+            Decimal('1234'),
+            Decimal('12345678'),
+            Decimal('1234567890123456'),
+            Decimal('123456789012345678901234567890123456'),
+            12345.6789,
+            True,
+            'abcdefghij',
+            'abcdefgh ',
+            date(2018, 10, 12),
+            datetime(2018, 10, 12, 12, 15, 30, 123000)
+        )
+        self.create_table_1()
+
+
+    def create_table(self,table_name,create_col_defs_str):
+        create_table_sql='CREATE TABLE %s (%s)' % (table_name,create_col_defs_str)
+        print("Create Table Statement %s"%create_table_sql)
+        self.query(create_table_sql)
+
+    def create_table_1(self):
+        self.create_table("TEST1",self.create_col_defs_str)
+        self.import_via_insert("TEST1",[self.col_tuple],column_names=self.col_names)
+        num_inserts = 17 # => ~128K rows
+        for i in range(num_inserts):
+            insert_sql = 'INSERT INTO TEST1 (%s) SELECT %s FROM TEST1' % (self.col_names_str, self.col_names_str)
+            print("Insert Statement %s"%insert_sql)
+            self.query(insert_sql)
+        self.num_rows = 2**num_inserts
+
+    def test_dataframe_scalar_emits(self):
+        """
+        This test checks that the largest memory block of a tracemalloc snapshot diff is not larger than 100KB, where
+        the memory block snapshots are retrieved during the first/last invocation of the scalar UDF,
+        but after the emit().
+        Reasoning for 100KB is that the number of rows is > 100K, so if there was 1 Byte leaking during every execution,
+        it would be found here.
+        """
+        udf_def_str = udf.fixindent('''
+            CREATE OR REPLACE PYTHON3 SCALAR SCRIPT
+            foo(%s)
+            EMITS(%s) AS
+
+            %%perNodeAndCallInstanceLimit 1;
+
+            import tracemalloc
+            import gc
+            snapshot_begin = None
+            memory_check_executed = False
+            tracemalloc.start()
+            counter = 0
+
+            def run(ctx):
+                df = ctx.get_dataframe()
+                global memory_check_executed
+                global snapshot_begin
+                global counter
+                ctx.emit(df)
+                if counter == 0:
+                    print("Retrieving start snapshot", flush=True)
+                    snapshot_begin = tracemalloc.take_snapshot()
+                if counter == %s:
+                    assert memory_check_executed == False #Sanity check for row number
+                    print("Checking memory usage", flush=True)
+                    gc.collect()
+                    snapshot_end = tracemalloc.take_snapshot()
+                    top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno')
+                    first_item = top_stats_begin_end[0] #First item is always the largest one
+                    if first_item.size_diff > 100000:
+                        raise RuntimeError(f"scalar emit UDF uses too much memory: {first_item}")
+                    memory_check_executed = True
+                counter = counter + 1
+            /
+
+            ''' % (self.col_defs_str, self.col_defs_str, self.num_rows - 1))
+        self.query(udf_def_str)
+        select_sql = 'SELECT foo(%s) FROM FN2.TEST1' % (self.col_names_str)
+        rows = self.query(select_sql)
+        self.assertEqual(self.num_rows, len(rows))
+
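The measurement pattern all tests in this file share, as a standalone sketch (runnable outside the UDF framework; the workload and threshold are illustrative):

    import gc
    import tracemalloc

    tracemalloc.start()
    snapshot_begin = tracemalloc.take_snapshot()     # baseline after warm-up

    data = [bytearray(64) for _ in range(10000)]     # stand-in workload

    gc.collect()                                     # drop collectable garbage first
    snapshot_end = tracemalloc.take_snapshot()
    top_stats = snapshot_end.compare_to(snapshot_begin, 'lineno')
    first_item = top_stats[0]                        # entries are sorted by largest size change
    print(first_item.size_diff)                      # growth in bytes attributed to one source line
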
+ """ + udf_def_str = udf.fixindent(''' + CREATE OR REPLACE PYTHON3 SCALAR SCRIPT + foo(%s) + EMITS(%s) AS + + %%perNodeAndCallInstanceLimit 1; + + import tracemalloc + import gc + snapshot_begin = None + memory_check_executed = False + tracemalloc.start() + counter = 0 + + def run(ctx): + df = ctx.get_dataframe() + global memory_check_executed + global snapshot_begin + global counter + ctx.emit(df) + if counter == 0: + print("Retrieving start snapshot", flush=True) + snapshot_begin = tracemalloc.take_snapshot() + if counter == %s: + assert memory_check_executed == False #Sanity check for row number + print("Checking memory usage", flush=True) + gc.collect() + snapshot_end = tracemalloc.take_snapshot() + top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno') + first_item = top_stats_begin_end[0] #First item is always the largest one + if first_item.size_diff > 100000: + raise RuntimeError(f"scalar emit UDF uses too much memory: {first_item}") + memory_check_executed = True + counter = counter + 1 + / + + ''' % (self.col_defs_str, self.col_defs_str, self.num_rows - 1)) + self.query(udf_def_str) + select_sql = 'SELECT foo(%s) FROM FN2.TEST1' % (self.col_names_str) + rows = self.query(select_sql) + self.assertEqual(self.num_rows, len(rows)) + + def test_dataframe_scalar_emits_multiple(self): + """ + This test checks that the largest memory block of a tracemalloc snapshot diff is not larger than 150KB, where + the memory block snapshots are retrieved during the first/last invocation of the scalar UDF, + but after the emit(). The emit is called multiple times in this test. The scalar UDF gets 2^3 = 8 rows as input, + and emits 2~14 = 16384 dataframes of row size = 1 in each execution. + For some reason + (which would require further investigation) this test allocates ~ 120KB of memory in the module. + (Which is more compared to the other tests in this file). + However, manual tests showed that a further increase of the number of emitted dataframes (2~17 * 5) + does not increase the allocated memory even more. 
+ """ + batch_count = 8 + batch_size = int(self.num_rows / batch_count) + udf_def_str = udf.fixindent(f''' + CREATE OR REPLACE PYTHON3 SCALAR SCRIPT + foo({self.col_defs_str}) + EMITS({self.col_defs_str}) AS + + %perNodeAndCallInstanceLimit 1; + + import tracemalloc + import gc + snapshot_begin = None + memory_check_executed = False + tracemalloc.start() + counter = 0 + + def run(ctx): + df = ctx.get_dataframe() + assert df.shape[0] == 1 + global memory_check_executed + global snapshot_begin + global counter + for idx_batch in range({batch_size}): + ctx.emit(df) + if counter == 0 and idx_batch == 0: + print("Retrieving start snapshot", flush=True) + snapshot_begin = tracemalloc.take_snapshot() + if counter == {batch_count - 1}: + assert memory_check_executed == False #Sanity check for row number + print("Checking memory usage", flush=True) + gc.collect() + snapshot_end = tracemalloc.take_snapshot() + top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno') + first_item = top_stats_begin_end[0] #First item is always the largest one + if first_item.size_diff > 150000: + raise RuntimeError(f"scalar emit UDF uses too much memory: {{first_item}}") + memory_check_executed = True + counter = counter + 1 + / + + ''') + print(udf_def_str) + self.query(udf_def_str) + select_sql = 'SELECT foo(%s) FROM FN2.TEST1 WHERE C0 <= %d; ' % (self.col_names_str, batch_count) + print(select_sql) + rows = self.query(select_sql) + self.assertEqual(self.num_rows, len(rows)) + + def test_dataframe_scalar_returns(self): + """ + This test checks that the largest memory block of a tracemalloc snapshot diff is not larger than 100KB, where + the memory block snapshots are retrieved during the first/last invocation of the scalar UDF. + Reasoning for 100KB is that the number of rows is > 100K, so if there was 1 Byte leaking during every execution, + it would be found here. + """ + udf_sql = udf.fixindent(''' + CREATE OR REPLACE PYTHON3 SCALAR SCRIPT + foo(%s) + RETURNS DECIMAL(10,5) AS + + + %%perNodeAndCallInstanceLimit 1; + + import tracemalloc + import gc + snapshot_begin = None + memory_check_executed = False + tracemalloc.start() + counter = 0 + + def run(ctx): + df = ctx.get_dataframe() + global memory_check_executed + global snapshot_begin + global counter + if counter == 0: + print("Retrieving start snapshot", flush=True) + snapshot_begin = tracemalloc.take_snapshot() + if counter == %s: + assert memory_check_executed == False #Sanity check for row number + print("Checking memory usage", flush=True) + gc.collect() + snapshot_end = tracemalloc.take_snapshot() + top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno') + first_item = top_stats_begin_end[0] #First item is always the largest one + if first_item.size_diff > 100000: + raise RuntimeError(f"scalar emit UDF uses too much memory: {first_item}") + memory_check_executed = True + counter = counter + 1 + + return (df.iloc[0, 0] + df.iloc[0, 1]).item() + / + ''' % (self.col_defs_str, self.num_rows - 1)) + self.query(udf_sql) + print(udf_sql) + select_sql = 'SELECT foo(%s) FROM FN2.TEST1' % (self.col_names_str) + print(select_sql) + rows = self.query(select_sql) + self.assertEqual(self.num_rows, len(rows)) + + + def test_dataframe_set_emits(self): + """ + This test checks that the largest memory block of a tracemalloc snapshot diff is not larger than 15KB, + where the memory block snapshots are retrieved during the first/last get+emit + of a batch of 2^14 = 16384 rows. In total, 2^3 = 8 batches are retrieved and emitted. 
+ """ + batch_count = 8 + batch_size = int(self.num_rows / batch_count) + udf_sql = udf.fixindent(f''' + CREATE OR REPLACE PYTHON3 SET SCRIPT + foo({self.col_defs_str}) + EMITS({self.col_defs_str}) AS + + import tracemalloc + import gc + tracemalloc.start() + + def process_df(ctx): + df = ctx.get_dataframe(num_rows={batch_size}) + ctx.emit(df) + + def run(ctx): + + for batch_idx in range({batch_count}): + print(f"Processing batch {{batch_idx}}", flush=True) + + process_df(ctx) + if batch_idx == 0: + gc.collect() + snapshot_begin = tracemalloc.take_snapshot() + + if batch_idx == ({batch_count} - 1): + gc.collect() + snapshot_end = tracemalloc.take_snapshot() + top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno') + first_item = top_stats_begin_end[0] #First item is always the largest one + print(f"Largest memory item is {{first_item}}", flush=True) + if first_item.size_diff > 20000: + raise RuntimeError(f"scalar emit UDF uses too much memory: {{first_item}}") + / + ''') + print(udf_sql) + self.query(udf_sql) + select_sql = 'SELECT foo(%s) FROM TEST1' % self.col_names_str + print(select_sql) + rows = self.query(select_sql) + self.assertEqual(self.num_rows, len(rows)) + + +if __name__ == '__main__': + udf.main() + diff --git a/test_container/tests/test/pandas/all/emit_dtypes_memory_leak.py b/test_container/tests/test/pandas/all/emit_dtypes_memory_leak.py new file mode 100755 index 000000000..efda77120 --- /dev/null +++ b/test_container/tests/test/pandas/all/emit_dtypes_memory_leak.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 + +from decimal import Decimal +from datetime import date +from datetime import datetime + +from exasol_python_test_framework import udf +from exasol_python_test_framework.exatest.testcase import useData +from exasol_python_test_framework.udf.udf_debug import UdfDebugger +from typing import List, Tuple, Union +import humanfriendly + + +class PandasDataFrameEmitDTypesMemoryLeakCheck(udf.TestCase): + """ + This test creates huge dataframes inside a UDF, emits it, + and validates that memory consumption is within expected range. 
+ """ + + def setUp(self): + self.maxDiff = None + + self.query(f'CREATE SCHEMA {self.__class__.__name__}', ignore_errors=True) + self.query(f'OPEN SCHEMA {self.__class__.__name__}', ignore_errors=True) + + max_memory_usage_bytes = humanfriendly.parse_size("200KB") + + int_dataframe_value_str = "1" + + float16_dataframe_value_str = '1.1' + float_dataframe_value_str = "1.1" + + str_dataframe_value_str = "'a'" + + bool_dataframe_value_str = "True" + + decimal_dataframe_value_str = "Decimal('1.1')" + + timestamp_dataframe_value_str = 'pd.Timestamp(datetime(2020, 7, 27, 14, 22, 33, 673251))' + datetime_dataframe_value_str = 'datetime(2020, 7, 27, 14, 22, 33, 673251)' + date_dataframe_value_str = 'date(2020, 7, 27)' + + types = [ + # Full columns without None or NaN / Int + + ("uint8", "integer", int_dataframe_value_str), + ("uint16", "integer", int_dataframe_value_str), + ("uint32", "integer", int_dataframe_value_str), + ("uint64", "integer", int_dataframe_value_str), + ("int8", "integer", int_dataframe_value_str), + ("int16", "integer", int_dataframe_value_str), + ("int32", "integer", int_dataframe_value_str), + ("int64", "integer", int_dataframe_value_str), + ("object", "integer", int_dataframe_value_str), + + # Full columns without None or NaN / Float + + ("float16", "double", float16_dataframe_value_str), + ("float32", "double", float_dataframe_value_str), + ("float64", "double", float_dataframe_value_str), + ("float", "double", float_dataframe_value_str), + ("double", "double", float_dataframe_value_str), + ("object", "double", float_dataframe_value_str), + + # Full columns without None or NaN / Int to Float + + ("uint8", "double", int_dataframe_value_str), + ("uint16", "double", int_dataframe_value_str), + ("uint32", "double", int_dataframe_value_str), + ("uint64", "double", int_dataframe_value_str), + ("int8", "double", int_dataframe_value_str), + ("int16", "double", int_dataframe_value_str), + ("int32", "double", int_dataframe_value_str), + ("int64", "double", int_dataframe_value_str), + ("object", "double", int_dataframe_value_str), + + # Full columns without None or NaN / Float to Int + + ("float16", "integer", float16_dataframe_value_str), + ("float32", "integer", float_dataframe_value_str), + ("float64", "integer", float_dataframe_value_str), + ("float", "integer", float_dataframe_value_str), + ("double", "integer", float_dataframe_value_str), + ("object", "integer", float_dataframe_value_str), + + # Full columns without None or NaN / Int to Decimal + + ("uint8", "DECIMAL(10,5)", int_dataframe_value_str), + ("uint16", "DECIMAL(10,5)", int_dataframe_value_str), + ("uint32", "DECIMAL(10,5)", int_dataframe_value_str), + ("uint64", "DECIMAL(10,5)", int_dataframe_value_str), + ("int8", "DECIMAL(10,5)", int_dataframe_value_str), + ("int16", "DECIMAL(10,5)", int_dataframe_value_str), + ("int32", "DECIMAL(10,5)", int_dataframe_value_str), + ("int64", "DECIMAL(10,5)", int_dataframe_value_str), + ("object", "DECIMAL(10,5)", int_dataframe_value_str), + + # Full columns without None or NaN / Float to Decimal + + ("float16", "DECIMAL(10,5)", float16_dataframe_value_str), + ("float32", "DECIMAL(10,5)", float_dataframe_value_str), + ("float64", "DECIMAL(10,5)", float_dataframe_value_str), + ("float", "DECIMAL(10,5)", float_dataframe_value_str), + ("double", "DECIMAL(10,5)", float_dataframe_value_str), + ("object", "DECIMAL(10,5)", float_dataframe_value_str), + + # Full columns without None or NaN / Decimal + + ("object", "DECIMAL(10,5)", decimal_dataframe_value_str), + + # Full columns without 
+
+        ("string", "VARCHAR(2000000)", str_dataframe_value_str),
+        ("object", "VARCHAR(2000000)", str_dataframe_value_str),
+
+        # Full columns without None or NaN / Boolean
+
+        ("bool_", "boolean", bool_dataframe_value_str),
+        ("boolean", "boolean", bool_dataframe_value_str),
+        ("object", "boolean", bool_dataframe_value_str),
+
+        # Full columns without None or NaN / Date and Time
+
+        ("datetime64[ns]", "timestamp", timestamp_dataframe_value_str),
+        ("object", "timestamp", timestamp_dataframe_value_str),
+        ("object", "DATE", date_dataframe_value_str),
+    ]
+
+    @useData(types)
+    def test_dtype_emit(self, dtype: str, sql_type: str, dataframe_value_str: str):
+        emit_cols = [f"o{i} {sql_type}" for i in range(25)]
+        emit_cols_str = ",".join(emit_cols)
+        udf_def_str = udf.fixindent(f'''
+            CREATE OR REPLACE PYTHON3 SCALAR SCRIPT test_dtype_emit("batch_size" integer, "batch_count" integer)
+            EMITS ({emit_cols_str}) AS
+
+            import gc
+            import tracemalloc
+            import pandas as pd
+            from decimal import Decimal
+            from datetime import datetime, date
+
+            def run(ctx):
+                tracemalloc.start()
+                for i in range(ctx.batch_count):
+                    df = pd.DataFrame([[{dataframe_value_str} for c in range(25)] for r in range(ctx.batch_size)],
+                                      dtype="{dtype}")
+                    ctx.emit(df)
+                    if i == 0:
+                        snapshot_begin = tracemalloc.take_snapshot()
+                    elif i == ctx.batch_count - 1:
+                        snapshot_end = tracemalloc.take_snapshot()
+                        top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno')
+                        first_item = top_stats_begin_end[0] #First item is always the largest one
+                        if first_item.size_diff > {self.max_memory_usage_bytes}:
+                            raise RuntimeError(f"scalar emit UDF uses too much memory: {{first_item}}")
+
+            /
+            ''')
+        print(udf_def_str)
+        self.query(udf_def_str)
+        rows = self.query('''SELECT test_dtype_emit(100, 1000)''')
+        assert len(rows[0]) == 25
+        assert len(rows) == 100000
+
+
+if __name__ == '__main__':
+    udf.main()