Skip to content

Commit

Permalink
Merge pull request #289 from chdb-io/patchset-2.2.0b1
Browse files Browse the repository at this point in the history
Fix starvation caused by GIL
  • Loading branch information
auxten authored Dec 12, 2024
2 parents 59075e8 + 09da2d5 commit 01afb91
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/Processors/Sources/PythonSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ extern const int PY_EXCEPTION_OCCURED;

PythonSource::PythonSource(
py::object & data_source_,
bool isInheritsFromPyReader_,
const Block & sample_block_,
PyColumnVecPtr column_cache,
size_t data_source_row_count,
Expand All @@ -56,6 +57,7 @@ PythonSource::PythonSource(
size_t num_streams)
: ISource(sample_block_.cloneEmpty())
, data_source(data_source_)
, isInheritsFromPyReader(isInheritsFromPyReader_)
, sample_block(sample_block_)
, column_cache(column_cache)
, data_source_row_count(data_source_row_count)
Expand Down Expand Up @@ -544,7 +546,7 @@ Chunk PythonSource::generate()

try
{
if (isInheritsFromPyReader(data_source))
if (isInheritsFromPyReader)
{
PyObjectVecPtr data;
py::gil_scoped_acquire acquire;
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/Sources/PythonSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class PythonSource : public ISource
public:
PythonSource(
py::object & data_source_,
bool isInheritsFromPyReader_,
const Block & sample_block_,
PyColumnVecPtr column_cache,
size_t data_source_row_count,
Expand All @@ -42,6 +43,7 @@ class PythonSource : public ISource

private:
py::object & data_source; // Do not own the reference
bool isInheritsFromPyReader; // If the data_source is a PyReader object

Block sample_block;
PyColumnVecPtr column_cache;
Expand Down
5 changes: 3 additions & 2 deletions src/Storages/StoragePython.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ Pipe StoragePython::read(

if (isInheritsFromPyReader(data_source))
{
return Pipe(std::make_shared<PythonSource>(data_source, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));
return Pipe(
std::make_shared<PythonSource>(data_source, true, sample_block, column_cache, data_source_row_count, max_block_size, 0, 1));
}

prepareColumnCache(column_names, sample_block.getColumns(), sample_block);
Expand All @@ -79,7 +80,7 @@ Pipe StoragePython::read(
// num_streams = 32; // for chdb testing
for (size_t stream = 0; stream < num_streams; ++stream)
pipes.emplace_back(std::make_shared<PythonSource>(
data_source, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
data_source, false, sample_block, column_cache, data_source_row_count, max_block_size, stream, num_streams));
return Pipe::unitePipes(std::move(pipes));
}

Expand Down
43 changes: 43 additions & 0 deletions tests/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
SELECT COUNT(*) FROM Python(hits);
SELECT COUNT(*) FROM Python(hits) WHERE AdvEngineID <> 0;
SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM Python(hits);
SELECT AVG(UserID) FROM Python(hits);
SELECT COUNT(DISTINCT UserID) FROM Python(hits);
SELECT COUNT(DISTINCT SearchPhrase) FROM Python(hits);
SELECT MIN(EventDate), MAX(EventDate) FROM Python(hits);
SELECT AdvEngineID, COUNT(*) FROM Python(hits) WHERE AdvEngineID <> 0 GROUP BY AdvEngineID ORDER BY COUNT(*) DESC;
SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM Python(hits) GROUP BY RegionID ORDER BY u DESC LIMIT 10;
SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) FROM Python(hits) GROUP BY RegionID ORDER BY c DESC LIMIT 10;
SELECT MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE MobilePhoneModel <> '' GROUP BY MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT MobilePhone, MobilePhoneModel, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE MobilePhoneModel <> '' GROUP BY MobilePhone, MobilePhoneModel ORDER BY u DESC LIMIT 10;
SELECT SearchPhrase, COUNT(*) AS c FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, COUNT(DISTINCT UserID) AS u FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY u DESC LIMIT 10;
SELECT SearchEngineID, SearchPhrase, COUNT(*) AS c FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchEngineID, SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT UserID, COUNT(*) FROM Python(hits) GROUP BY UserID ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, SearchPhrase LIMIT 10;
SELECT UserID, extract(minute FROM EventTime) AS m, SearchPhrase, COUNT(*) FROM Python(hits) GROUP BY UserID, m, SearchPhrase ORDER BY COUNT(*) DESC LIMIT 10;
SELECT UserID FROM Python(hits) WHERE UserID = 435090932899640449;
SELECT COUNT(*) FROM Python(hits) WHERE URL LIKE '%google%';
SELECT SearchPhrase, MIN(URL), COUNT(*) AS c FROM Python(hits) WHERE URL LIKE '%google%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT SearchPhrase, MIN(URL), MIN(Title), COUNT(*) AS c, COUNT(DISTINCT UserID) FROM Python(hits) WHERE Title LIKE '%Google%' AND URL NOT LIKE '%.google.%' AND SearchPhrase <> '' GROUP BY SearchPhrase ORDER BY c DESC LIMIT 10;
SELECT * FROM Python(hits) WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY EventTime LIMIT 10;
SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY SearchPhrase LIMIT 10;
SELECT SearchPhrase FROM Python(hits) WHERE SearchPhrase <> '' ORDER BY EventTime, SearchPhrase LIMIT 10;
SELECT CounterID, AVG(length(URL)) AS l, COUNT(*) AS c FROM Python(hits) WHERE URL <> '' GROUP BY CounterID HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM Python(hits) WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), SUM(ResolutionWidth + 5), SUM(ResolutionWidth + 6), SUM(ResolutionWidth + 7), SUM(ResolutionWidth + 8), SUM(ResolutionWidth + 9), SUM(ResolutionWidth + 10), SUM(ResolutionWidth + 11), SUM(ResolutionWidth + 12), SUM(ResolutionWidth + 13), SUM(ResolutionWidth + 14), SUM(ResolutionWidth + 15), SUM(ResolutionWidth + 16), SUM(ResolutionWidth + 17), SUM(ResolutionWidth + 18), SUM(ResolutionWidth + 19), SUM(ResolutionWidth + 20), SUM(ResolutionWidth + 21), SUM(ResolutionWidth + 22), SUM(ResolutionWidth + 23), SUM(ResolutionWidth + 24), SUM(ResolutionWidth + 25), SUM(ResolutionWidth + 26), SUM(ResolutionWidth + 27), SUM(ResolutionWidth + 28), SUM(ResolutionWidth + 29), SUM(ResolutionWidth + 30), SUM(ResolutionWidth + 31), SUM(ResolutionWidth + 32), SUM(ResolutionWidth + 33), SUM(ResolutionWidth + 34), SUM(ResolutionWidth + 35), SUM(ResolutionWidth + 36), SUM(ResolutionWidth + 37), SUM(ResolutionWidth + 38), SUM(ResolutionWidth + 39), SUM(ResolutionWidth + 40), SUM(ResolutionWidth + 41), SUM(ResolutionWidth + 42), SUM(ResolutionWidth + 43), SUM(ResolutionWidth + 44), SUM(ResolutionWidth + 45), SUM(ResolutionWidth + 46), SUM(ResolutionWidth + 47), SUM(ResolutionWidth + 48), SUM(ResolutionWidth + 49), SUM(ResolutionWidth + 50), SUM(ResolutionWidth + 51), SUM(ResolutionWidth + 52), SUM(ResolutionWidth + 53), SUM(ResolutionWidth + 54), SUM(ResolutionWidth + 55), SUM(ResolutionWidth + 56), SUM(ResolutionWidth + 57), SUM(ResolutionWidth + 58), SUM(ResolutionWidth + 59), SUM(ResolutionWidth + 60), SUM(ResolutionWidth + 61), SUM(ResolutionWidth + 62), SUM(ResolutionWidth + 63), SUM(ResolutionWidth + 64), SUM(ResolutionWidth + 65), SUM(ResolutionWidth + 66), SUM(ResolutionWidth + 67), SUM(ResolutionWidth + 68), SUM(ResolutionWidth + 69), SUM(ResolutionWidth + 70), SUM(ResolutionWidth + 71), SUM(ResolutionWidth + 72), SUM(ResolutionWidth + 73), 
SUM(ResolutionWidth + 74), SUM(ResolutionWidth + 75), SUM(ResolutionWidth + 76), SUM(ResolutionWidth + 77), SUM(ResolutionWidth + 78), SUM(ResolutionWidth + 79), SUM(ResolutionWidth + 80), SUM(ResolutionWidth + 81), SUM(ResolutionWidth + 82), SUM(ResolutionWidth + 83), SUM(ResolutionWidth + 84), SUM(ResolutionWidth + 85), SUM(ResolutionWidth + 86), SUM(ResolutionWidth + 87), SUM(ResolutionWidth + 88), SUM(ResolutionWidth + 89) FROM Python(hits);
SELECT SearchEngineID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY SearchEngineID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) WHERE SearchPhrase <> '' GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM Python(hits) GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS c FROM Python(hits) GROUP BY URL ORDER BY c DESC LIMIT 10;
SELECT 1, URL, COUNT(*) AS c FROM Python(hits) GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM Python(hits) GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10;
SELECT Title, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10;
SELECT URL, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000;
SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100;
SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000;
SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM Python(hits) WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000;
96 changes: 96 additions & 0 deletions tests/test_state2_dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python3

import unittest
import timeit
import datetime
import json
import tempfile
import pandas as pd
import chdb
import os
from urllib.request import urlretrieve


class TestChDBDataFrame(unittest.TestCase):
    """Run the ClickBench query set against a pandas DataFrame through chdb.

    Downloads (once) and loads the `hits` sample as a DataFrame, then executes
    every query from queries.sql three times each via chdb's Python() table
    function, asserting non-empty results and printing a ClickBench-style
    timing report as JSON.
    """

    @classmethod
    def setUpClass(cls):
        """Fetch the parquet sample, build the DataFrame, and load queries."""
        # Download parquet file if it doesn't exist
        parquet_file = "hits_0.parquet"
        if not os.path.exists(parquet_file):
            print(f"Downloading {parquet_file}...")
            url = "https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_0.parquet"
            urlretrieve(url, parquet_file)
            print("Download complete!")

        # Load data and prepare DataFrame
        cls.hits = pd.read_parquet(parquet_file)
        cls.dataframe_size = cls.hits.memory_usage().sum()

        # Fix types: the parquet stores epoch seconds / epoch days, so the
        # columns must be converted to real datetimes for date comparisons.
        cls.hits["EventTime"] = pd.to_datetime(cls.hits["EventTime"], unit="s")
        cls.hits["EventDate"] = pd.to_datetime(cls.hits["EventDate"], unit="D")

        # Convert object columns to string so chdb sees a uniform dtype.
        for col in cls.hits.columns:
            if cls.hits[col].dtype == "O":
                cls.hits[col] = cls.hits[col].astype(str)

        # Load queries from next to this test file, so the test does not
        # depend on the current working directory.
        queries_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "queries.sql"
        )
        with open(queries_path) as f:
            cls.queries = f.readlines()

    def setUp(self):
        """Open a fresh on-disk chdb session so tests cannot share state."""
        self.tmp_dir = tempfile.TemporaryDirectory()
        self.conn = chdb.connect(self.tmp_dir.name)

    def tearDown(self):
        """Close the session before removing its backing directory."""
        self.conn.close()
        self.tmp_dir.cleanup()

    def test_dataframe_size(self):
        """Sanity check: the loaded DataFrame occupies memory."""
        self.assertGreater(self.dataframe_size, 0, "DataFrame size should be positive")

    def test_query_execution(self):
        """Execute each benchmark query 3x and report timings as JSON."""
        # The queries reference Python(hits); chdb resolves `hits` by name
        # from the calling scope, so bind the class attribute to a local —
        # the class attribute alone is not visible to the lookup.
        hits = self.hits  # noqa: F841 — looked up by name inside chdb
        queries_times = []
        for i, query in enumerate(self.queries, 1):
            times = []
            result = None
            for _ in range(3):
                start = timeit.default_timer()
                result = self.conn.query(query, "CSV")
                end = timeit.default_timer()
                times.append(end - start)

            # Verify the query produced a result (checks the last of the 3 runs)
            self.assertIsNotNone(result, f"Query {i} returned None")

            queries_times.append(times)
            # Verify execution times are reasonable
            self.assertTrue(
                all(t > 0 for t in times), f"Query {i} has invalid execution times"
            )

        # ClickBench-compatible result record.
        result_json = {
            "system": "chDB 2.2(DataFrame)",
            "date": datetime.date.today().strftime("%Y-%m-%d"),
            "machine": "",
            "cluster_size": 1,
            "comment": "",
            "tags": [
                "C++",
                "column-oriented",
                "embedded",
                "stateless",
                "serverless",
                "dataframe",
                "ClickHouse derivative",
            ],
            "load_time": 0,
            "data_size": int(self.dataframe_size),
            "result": queries_times,
        }

        print(json.dumps(result_json, indent=2))


# Allow running this benchmark test directly: python test_state2_dataframe.py
if __name__ == "__main__":
    unittest.main()

0 comments on commit 01afb91

Please sign in to comment.