Tests and fix for datetime only columns
1. Added memory leak check tests
2. Added a test for a dataframe with multiple datetime columns
tomuben committed May 28, 2024
1 parent 804b8b0 commit f0e3837
Showing 4 changed files with 438 additions and 18 deletions.
32 changes: 14 additions & 18 deletions exaudfclient/base/python/python3/python_ext_dataframe.cc
@@ -1204,28 +1204,24 @@ void emit(PyObject *resultHandler, std::vector<ColumnInfo>& colInfo, PyObject *d
     PyPtr arrayPtr;
     PyArrayObject *pyArray;
     PyPtr colArray;
-    if(colTypes.size()==1 && colTypes.at(0).second == NPY_DATETIME){
-        // if we get an dataframe with a single datetime column with type datetime[ns],
-        // it doesn't get transformed into a 2D Array with the attributes values,
-        // instead we get a DatetimeIndex like this:
-        // DatetimeIndex(['2020-07-27 14:22:33.600699', ...], dtype='datetime64[ns]', freq=None)
-        // As a workaround we add a column to the dataframe via resetIndex, use values to get a 2D Array and
-        // slice out the datetime column, which then will be an Array of pandas.Timestamp objects
-        PyPtr resetIndex(PyObject_CallMethod(dataframe, "reset_index", NULL));
-        data=PyPtr(PyObject_GetAttrString(resetIndex.get(), "values"));
+    bool allColsAreDateTime =
+        std::all_of(colTypes.begin(), colTypes.end(),
+                    [](std::pair<std::string, int> colType) {
+                        return colType.second == NPY_DATETIME;
+                    });
+    if(allColsAreDateTime) {
+        PyPtr asTypeFunc (PyObject_GetAttrString(dataframe, "astype"));
+        PyPtr keywordArgs(PyDict_New());
+        PyDict_SetItemString(keywordArgs.get(), "copy", Py_False);
+        PyPtr funcArgs(Py_BuildValue("(s)", "object"));
+        PyPtr castedValues(PyObject_Call(asTypeFunc.get(), funcArgs.get(), keywordArgs.get()));
+        data.reset(PyObject_GetAttrString(castedValues.get(), "values"));
         arrayPtr = PyPtr(PyArray_FROM_OTF(data.get(), NPY_OBJECT, NPY_ARRAY_IN_ARRAY));
         pyArray = reinterpret_cast<PyArrayObject*>(arrayPtr.get());
         numRows = PyArray_DIM(pyArray, 0);
-        numCols = PyArray_DIM(pyArray, 1)-1;
+        numCols = PyArray_DIM(pyArray, 1);
         // Transpose to column-major
-        PyPtr transpose = PyPtr(PyArray_Transpose(pyArray, NULL));
-
-        PyPtr pyStart(PyLong_FromLong(1));
-        PyPtr pyStop(PyLong_FromLong(2));
-        PyPtr slice(PySlice_New(pyStart.get(), pyStop.get(), Py_None));
-        colArray=PyPtr(PyObject_GetItem(transpose.get(), slice.get()));
-
-
+        colArray = PyPtr(PyArray_Transpose(pyArray, NULL));
     }else{
         data=PyPtr(PyObject_GetAttrString(dataframe, "values"));
         arrayPtr = PyPtr(PyArray_FROM_OTF(data.get(), NPY_OBJECT, NPY_ARRAY_IN_ARRAY));
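
To make the intent of the new branch easier to follow, here is a minimal standalone pandas sketch (not part of the commit; the column names are illustrative) of the behavior it handles: on a frame whose columns are all datetime64[ns], .values stays a datetime64[ns] ndarray, whereas casting the frame to object dtype first, which the new code does through astype (with copy=False), yields pandas.Timestamp elements. This is also why the old reset_index() workaround, the numCols - 1 correction, and the slice that cut the extra index column back out are no longer needed.

import pandas as pd
from datetime import datetime

ts = pd.Timestamp(datetime(2020, 7, 27, 14, 22, 33, 600699))
df = pd.DataFrame({"a": [ts, ts], "b": [ts, ts]}, dtype="datetime64[ns]")

# .values keeps the numpy dtype, so the elements are numpy.datetime64 scalars.
print(df.values.dtype)         # datetime64[ns]

# Casting to object dtype first (the C++ branch calls astype("object", copy=False))
# produces a 2D object array whose elements are pandas.Timestamp objects.
obj_values = df.astype("object").values
print(obj_values.dtype)        # object
print(type(obj_values[0, 0]))  # pandas Timestamp
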
30 changes: 30 additions & 0 deletions test_container/tests/test/pandas/all/dataframe.py
@@ -818,6 +818,36 @@ def run(ctx):
            (datetime.datetime(2020, 7, 27, 14, 22, 33, 673000),)
        ], rows)

    def test_dataframe_set_emits_timestamp_many_column(self):
        import datetime
        emits = [f"ts{i} timestamp" for i in range(40)]
        emits_str = ",".join(emits)
        udf_sql = udf.fixindent('''
            CREATE OR REPLACE PYTHON3 SET SCRIPT foo(sec int) EMITS (%s) AS
            def run(ctx):
                import pandas as pd
                import numpy as np
                from datetime import datetime
                ts1 = pd.Timestamp(datetime(2020, 7, 27, 14, 22, 33, 673251))
                ts2 = pd.Timestamp(datetime(2021, 7, 27, 14, 22, 33, 673251))
                df = pd.DataFrame([[ts1, ts2]*20]*40, dtype="datetime64[ns]")
                ctx.emit(df)
            /
            ''' % (emits_str))
        print(udf_sql)
        self.query(udf_sql)
        select_sql = 'SELECT foo(1)'
        print(select_sql)
        rows = self.query(select_sql)
        ts1 = datetime.datetime(2020, 7, 27, 14, 22, 33, 673000)
        ts2 = datetime.datetime(2021, 7, 27, 14, 22, 33, 673000)

        result_rows = [[ts1, ts2] * 20] * 40
        self.assertRowsEqual(rows, result_rows)

    def test_dataframe_set_emits_timestamp_with_timezone_only_fail(self):
        import datetime
        udf_sql = udf.fixindent('''
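
As a rough standalone illustration (not part of the commit) of what this test exercises, the frame built inside the UDF is a 40x40 block in which every column has dtype datetime64[ns], i.e. exactly the all-datetime case handled by the new C++ branch above:

import pandas as pd
from datetime import datetime

ts1 = pd.Timestamp(datetime(2020, 7, 27, 14, 22, 33, 673251))
ts2 = pd.Timestamp(datetime(2021, 7, 27, 14, 22, 33, 673251))

# Same construction as in the test: 40 rows, each alternating ts1/ts2 across
# 40 columns, all forced to datetime64[ns].
df = pd.DataFrame([[ts1, ts2] * 20] * 40, dtype="datetime64[ns]")

print(df.shape)            # (40, 40)
print(df.dtypes.unique())  # [dtype('<M8[ns]')] -- every column is a datetime column

The expected rows use 673000 microseconds rather than 673251 because, as in the existing single-column test above, the values round-trip through an Exasol TIMESTAMP column, which is stored with millisecond precision.
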
228 changes: 228 additions & 0 deletions test_container/tests/test/pandas/all/dataframe_memory_leak.py
@@ -0,0 +1,228 @@
#!/usr/bin/env python3

from decimal import Decimal
from datetime import date
from datetime import datetime

from exasol_python_test_framework import udf
from exasol_python_test_framework.exatest.testcase import useData
from exasol_python_test_framework.udf.udf_debug import UdfDebugger
from typing import List, Tuple, Union


class PandasDataFrameMemoryLeakTest(udf.TestCase):
    def setUp(self):
        self.maxDiff=None

        self.query('CREATE SCHEMA FN2', ignore_errors=True)
        self.query('OPEN SCHEMA FN2', ignore_errors=True)

        self.create_col_defs = [
            ('C0','INT IDENTITY'),
            ('C1','Decimal(2,0)'),
            ('C2','Decimal(4,0)'),
            ('C3','Decimal(8,0)'),
            ('C4','Decimal(16,0)'),
            ('C5','Decimal(36,0)'),
            ('C6','DOUBLE'),
            ('C7','BOOLEAN'),
            ('C8','VARCHAR(500)'),
            ('C9','CHAR(10)'),
            ('C10','DATE'),
            ('C11','TIMESTAMP')
        ]
        self.create_col_defs_str = ','.join(
            '%s %s'%(name,type_decl)
            for name, type_decl
            in self.create_col_defs
        )
        self.col_defs = self.create_col_defs[1:]
        self.col_defs_str = ','.join(
            '%s %s'%(name,type_decl)
            for name, type_decl
            in self.col_defs
        )
        self.col_names = [name for name, type_decl in self.col_defs]
        self.col_names_str = ','.join(self.col_names)

        self.col_tuple = (
            Decimal('1'),
            Decimal('1234'),
            Decimal('12345678'),
            Decimal('1234567890123456'),
            Decimal('123456789012345678901234567890123456'),
            12345.6789,
            True,
            'abcdefghij',
            'abcdefgh ',
            date(2018, 10, 12),
            datetime(2018, 10, 12, 12, 15, 30, 123000)
        )
        self.create_table_1()


    def create_table(self,table_name,create_col_defs_str):
        create_table_sql='CREATE TABLE %s (%s)' % (table_name,create_col_defs_str)
        print("Create Table Statement %s"%create_table_sql)
        self.query(create_table_sql)

    def create_table_1(self):
        self.create_table("TEST1",self.create_col_defs_str)
        self.import_via_insert("TEST1",[self.col_tuple],column_names=self.col_names)
        num_inserts = 17 # => ~128 K rows
        for i in range(num_inserts):
            insert_sql = 'INSERT INTO TEST1 (%s) SELECT %s FROM TEST1' % (self.col_names_str, self.col_names_str)
            print("Insert Statement %s"%insert_sql)
            self.query(insert_sql)
        self.num_rows = 2**num_inserts

    def test_dataframe_scalar_emits(self):
        """
        This test checks that the largest memory block of a tracemalloc snapshot diff is not larger than 100KB, where
        the memory block snapshots are retrieved during the first/last invocation of the scalar UDF,
        but after the emit().
        Reasoning for 100KB is that the number of rows is > 100K, so if there was 1 Byte leaking during every execution,
        it would be found here.
        """
        udf_def_str = udf.fixindent('''
            CREATE OR REPLACE PYTHON3 SCALAR SCRIPT
            foo(%s)
            EMITS(%s) AS
            %%perNodeAndCallInstanceLimit 1;
            import tracemalloc
            import gc
            snapshot_begin = None
            memory_check_executed = False
            tracemalloc.start()
            counter = 0
            def run(ctx):
                df = ctx.get_dataframe()
                global memory_check_executed
                global snapshot_begin
                global counter
                ctx.emit(df)
                if counter == 0:
                    print("Retrieving start snapshot", flush=True)
                    snapshot_begin = tracemalloc.take_snapshot()
                if counter == %s:
                    assert memory_check_executed == False #Sanity check for row number
                    print("Checking memory usage", flush=True)
                    gc.collect()
                    snapshot_end = tracemalloc.take_snapshot()
                    top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno')
                    first_item = top_stats_begin_end[0] #First item is always the largest one
                    if first_item.size_diff > 100000:
                        raise RuntimeError(f"scalar emit UDF uses too much memory: {first_item}")
                    memory_check_executed = True
                counter = counter + 1
            /
            ''' % (self.col_defs_str, self.col_defs_str, self.num_rows - 1))
        self.query(udf_def_str)
        select_sql = 'SELECT foo(%s) FROM FN2.TEST1' % (self.col_names_str)
        rows = self.query(select_sql)
        self.assertEqual(self.num_rows, len(rows))

    def test_dataframe_scalar_returns(self):
        """
        This test checks that the largest memory block of a tracemalloc snapshot diff is not larger than 100KB, where
        the memory block snapshots are retrieved during the first/last invocation of the scalar UDF.
        Reasoning for 100KB is that the number of rows is > 100K, so if there was 1 Byte leaking during every execution,
        it would be found here.
        """
        udf_sql = udf.fixindent('''
            CREATE OR REPLACE PYTHON3 SCALAR SCRIPT
            foo(%s)
            RETURNS DECIMAL(10,5) AS
            %%perNodeAndCallInstanceLimit 1;
            import tracemalloc
            import gc
            snapshot_begin = None
            memory_check_executed = False
            tracemalloc.start()
            counter = 0
            def run(ctx):
                df = ctx.get_dataframe()
                global memory_check_executed
                global snapshot_begin
                global counter
                if counter == 0:
                    print("Retrieving start snapshot", flush=True)
                    snapshot_begin = tracemalloc.take_snapshot()
                if counter == %s:
                    assert memory_check_executed == False #Sanity check for row number
                    print("Checking memory usage", flush=True)
                    gc.collect()
                    snapshot_end = tracemalloc.take_snapshot()
                    top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno')
                    first_item = top_stats_begin_end[0] #First item is always the largest one
                    if first_item.size_diff > 100000:
                        raise RuntimeError(f"scalar emit UDF uses too much memory: {first_item}")
                    memory_check_executed = True
                counter = counter + 1
                return (df.iloc[0, 0] + df.iloc[0, 1]).item()
            /
            ''' % (self.col_defs_str, self.num_rows - 1))
        self.query(udf_sql)
        print(udf_sql)
        select_sql = 'SELECT foo(%s) FROM FN2.TEST1' % (self.col_names_str)
        print(select_sql)
        rows = self.query(select_sql)
        self.assertEqual(self.num_rows, len(rows))


    def test_dataframe_set_emits(self):
        """
        This test validates that the <EXASCRIPT> module does not leak more than 100kb of RAM in a set/emits UDF.
        The test is different from the others as it checks only the <EXASCRIPT> for leaks,
        not the rest of the UDF client; the reason for that is that a
        set UDF reads all input rows at once into a dataframe.
        <EXASCRIPT> is the module name
        of the pyextdataframe.so library (named during the runtime compilation during execution of a Python UDF).
        """
        udf_sql = udf.fixindent('''
            CREATE OR REPLACE PYTHON3 SET SCRIPT
            foo(%s)
            EMITS(%s) AS
            import tracemalloc
            import gc
            tracemalloc.start()
            def process_df(ctx):
                df = ctx.get_dataframe(num_rows="all")
                ctx.emit(df)
            def run(ctx):
                snapshot_begin = tracemalloc.take_snapshot()
                process_df(ctx)
                gc.collect()
                snapshot_end = tracemalloc.take_snapshot()
                top_stats_begin_end = snapshot_end.compare_to(snapshot_begin, 'lineno')
                filtered_top_stats_begin_end = [stat for stat in top_stats_begin_end
                                                if stat.traceback[0].filename == "<EXASCRIPT>"]
                first_item = filtered_top_stats_begin_end[0] #First item is always the largest one
                if first_item.size_diff > 100000:
                    raise RuntimeError(f"scalar emit UDF uses too much memory: {first_item}")
            /
            ''' % (self.col_defs_str, self.col_defs_str))
        print(udf_sql)
        self.query(udf_sql)
        select_sql = 'SELECT foo(%s) FROM FN2.TEST1' % (self.col_names_str)
        print(select_sql)
        rows = self.query(select_sql)
        self.assertEqual(self.num_rows, len(rows))


if __name__ == '__main__':
    udf.main()
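
For readers unfamiliar with the pattern used throughout this new file, here is a minimal standalone sketch (not part of the commit; allocate_some() and the threshold are illustrative) of the tracemalloc snapshot-diff check the tests perform: take a snapshot before and after the code under test, diff the snapshots by line number, and fail if the largest growth exceeds a budget. The set-emits test above additionally filters the statistics to frames whose filename is "<EXASCRIPT>" so that only allocations attributed to the pyextdataframe module are counted.

import gc
import tracemalloc

tracemalloc.start()

retained = []

def allocate_some(n):
    # Illustrative "leak": keeping references alive makes the allocations
    # show up as growth in the snapshot diff.
    retained.extend(bytearray(64) for _ in range(n))

snapshot_begin = tracemalloc.take_snapshot()
allocate_some(10000)
gc.collect()
snapshot_end = tracemalloc.take_snapshot()

# compare_to() returns StatisticDiff objects sorted with the largest change
# first, which is why the tests only look at the first item.
top_stats = snapshot_end.compare_to(snapshot_begin, 'lineno')
first_item = top_stats[0]
print(first_item)
if first_item.size_diff > 100000:
    # With 10000 retained 64-byte objects this check deliberately trips,
    # showing how a real leak would surface in the UDF tests.
    raise RuntimeError(f"code under test uses too much memory: {first_item}")
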
