diff --git a/.gitignore b/.gitignore index 8970ed2c..30e306d9 100644 --- a/.gitignore +++ b/.gitignore @@ -173,6 +173,7 @@ deployments logs # local data +clickhouse_data postgres-data parquet-data data diff --git a/docker-compose.yml b/docker-compose.yml index b2eb8762..7a470d9a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: soft: 262144 hard: 262144 volumes: - - clickhouse_data:/var/lib/clickhouse + - ./clickhouse_data:/var/lib/clickhouse - ./parquet-data:/var/lib/clickhouse/user_files/parquet-data ports: - 8123:8123 @@ -85,6 +85,8 @@ services: image: ghcr.io/synthetixio/data/extractors:${VERSION} build: context: ./extractors + networks: + - data env_file: - .env volumes: diff --git a/extractors/pyproject.toml b/extractors/pyproject.toml index b83cd141..1e94666e 100644 --- a/extractors/pyproject.toml +++ b/extractors/pyproject.toml @@ -5,6 +5,7 @@ description = "Add your description here" readme = "README.md" requires-python = ">=3.12" dependencies = [ + "clickhouse-connect>=0.8.9", "cryo==0.3.0", "duckdb>=1.1.3", "maturin>=1.7.7", diff --git a/extractors/src/clean.py b/extractors/src/clean.py index 7633fcb9..784bf99a 100644 --- a/extractors/src/clean.py +++ b/extractors/src/clean.py @@ -5,6 +5,7 @@ from eth_utils import decode_hex import polars as pl import duckdb +from .insert import insert_data def fix_labels(labels): @@ -58,7 +59,7 @@ def decode_output(contract, function_name, call): def camel_to_snake(name): - name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name).lower() + name = re.sub(r"(?=0.8.9" }, { name = "cryo", specifier = "==0.3.0" }, { name = "duckdb", specifier = ">=1.1.3" }, { name = "maturin", specifier = ">=1.7.7" }, @@ -525,6 +595,21 @@ version = "1.2.0" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/83/63/21480e8ecc218b9b15672d194ea79da8a7389737c21d8406254306733cac/lru-dict-1.2.0.tar.gz", hash = "sha256:13c56782f19d68ddf4d8db0170041192859616514c706b126d0df2ec72a11bd7", size = 10895 } +[[package]] +name = "lz4" +version = "4.3.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a4/31/ec1259ca8ad11568abaf090a7da719616ca96b60d097ccc5799cd0ff599c/lz4-4.3.3.tar.gz", hash = "sha256:01fe674ef2889dbb9899d8a67361e0c4a2c833af5aeb37dd505727cf5d2a131e", size = 171509 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4d/6f/081811b17ccaec5f06b3030756af2737841447849118a6e1078481a78c6c/lz4-4.3.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e36cd7b9d4d920d3bfc2369840da506fa68258f7bb176b8743189793c055e43d", size = 254213 }, + { url = "https://files.pythonhosted.org/packages/53/4d/8e04ef75feff8848ba3c624ce81c7732bdcea5f8f994758afa88cd3d7764/lz4-4.3.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:31ea4be9d0059c00b2572d700bf2c1bc82f241f2c3282034a759c9a4d6ca4dc2", size = 212354 }, + { url = "https://files.pythonhosted.org/packages/a3/04/257a72d6a879dbc8c669018989f776fcdd5b4bf3c2c51c09a54f1ca31721/lz4-4.3.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c9a6fd20767ccaf70649982f8f3eeb0884035c150c0b818ea660152cf3c809", size = 1238643 }, + { url = "https://files.pythonhosted.org/packages/d9/93/4a7e489156fa7ded03ba9cde4a8ca7f373672b5787cac9a0391befa752a1/lz4-4.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bca8fccc15e3add173da91be8f34121578dc777711ffd98d399be35487c934bf", size = 1265014 }, + { url = "https://files.pythonhosted.org/packages/fd/a4/f84ebc23bc7602623b1b003b4e1120cbf86fb03a35c595c226be1985449b/lz4-4.3.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e7d84b479ddf39fe3ea05387f10b779155fc0990125f4fb35d636114e1c63a2e", size = 1184881 }, + { url = "https://files.pythonhosted.org/packages/de/3d/8ba48305378e84908221de143a21ba0c0ce52778893865cf85b66b1068da/lz4-4.3.3-cp312-cp312-win32.whl", hash = "sha256:337cb94488a1b060ef1685187d6ad4ba8bc61d26d631d7ba909ee984ea736be1", size = 87241 }, + { url = "https://files.pythonhosted.org/packages/c4/5d/7b70965a0692de29af2af1007fe837f46fd456bbe2aa8f838a8543a3b5cb/lz4-4.3.3-cp312-cp312-win_amd64.whl", hash = "sha256:5d35533bf2cee56f38ced91f766cd0038b6abf46f438a80d50c52750088be93f", size = 99776 }, +] + [[package]] name = "maturin" version = "1.7.7" @@ -764,6 +849,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c8/11/fabf6ecabb1fe5b7d96889228ca2a9158c4c3bb732e3b8ee3f7f6d40b703/pyarrow-18.1.0-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:b76130d835261b38f14fc41fdfb39ad8d672afb84c447126b84d5472244cfaba", size = 40043567 }, ] +[[package]] +name = "pycparser" +version = "2.22" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1d/b2/31537cf4b1ca988837256c910a668b553fceb8f069bedc4b1c826024b52c/pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6", size = 172736 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/13/a3/a812df4e2dd5696d1f351d58b8fe16a405b234ad2886a0dab9183fb78109/pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc", size = 117552 }, +] + [[package]] name = "pycryptodome" version = "3.21.0" @@ -1173,3 +1267,46 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f5/d5/688db678e987c3e0fb17867970700b92603cadf36c56e5fb08f23e822a0c/yarl-1.18.3-cp313-cp313-win_amd64.whl", hash = "sha256:578e281c393af575879990861823ef19d66e2b1d0098414855dd367e234f5b3c", size = 315723 }, { url = "https://files.pythonhosted.org/packages/f5/4b/a06e0ec3d155924f77835ed2d167ebd3b211a7b0853da1cf8d8414d784ef/yarl-1.18.3-py3-none-any.whl", hash = "sha256:b57f4f58099328dfb26c6a771d09fb20dbbae81d20cfb66141251ea063bd101b", size = 45109 }, ] + +[[package]] +name = "zstandard" +version = "0.23.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cffi", marker = "platform_python_implementation == 'PyPy'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ed/f6/2ac0287b442160a89d726b17a9184a4c615bb5237db763791a7fd16d9df1/zstandard-0.23.0.tar.gz", hash = "sha256:b2d8c62d08e7255f68f7a740bae85b3c9b8e5466baa9cbf7f57f1cde0ac6bc09", size = 681701 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7b/83/f23338c963bd9de687d47bf32efe9fd30164e722ba27fb59df33e6b1719b/zstandard-0.23.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:b4567955a6bc1b20e9c31612e615af6b53733491aeaa19a6b3b37f3b65477094", size = 788713 }, + { url = "https://files.pythonhosted.org/packages/5b/b3/1a028f6750fd9227ee0b937a278a434ab7f7fdc3066c3173f64366fe2466/zstandard-0.23.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1e172f57cd78c20f13a3415cc8dfe24bf388614324d25539146594c16d78fcc8", size = 633459 }, + { url = "https://files.pythonhosted.org/packages/26/af/36d89aae0c1f95a0a98e50711bc5d92c144939efc1f81a2fcd3e78d7f4c1/zstandard-0.23.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b0e166f698c5a3e914947388c162be2583e0c638a4703fc6a543e23a88dea3c1", size = 4945707 }, + { url = "https://files.pythonhosted.org/packages/cd/2e/2051f5c772f4dfc0aae3741d5fc72c3dcfe3aaeb461cc231668a4db1ce14/zstandard-0.23.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:12a289832e520c6bd4dcaad68e944b86da3bad0d339ef7989fb7e88f92e96072", size = 5306545 }, + { url = "https://files.pythonhosted.org/packages/0a/9e/a11c97b087f89cab030fa71206963090d2fecd8eb83e67bb8f3ffb84c024/zstandard-0.23.0-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d50d31bfedd53a928fed6707b15a8dbeef011bb6366297cc435accc888b27c20", size = 5337533 }, + { url = "https://files.pythonhosted.org/packages/fc/79/edeb217c57fe1bf16d890aa91a1c2c96b28c07b46afed54a5dcf310c3f6f/zstandard-0.23.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:72c68dda124a1a138340fb62fa21b9bf4848437d9ca60bd35db36f2d3345f373", size = 5436510 }, + { url = "https://files.pythonhosted.org/packages/81/4f/c21383d97cb7a422ddf1ae824b53ce4b51063d0eeb2afa757eb40804a8ef/zstandard-0.23.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:53dd9d5e3d29f95acd5de6802e909ada8d8d8cfa37a3ac64836f3bc4bc5512db", size = 4859973 }, + { url = "https://files.pythonhosted.org/packages/ab/15/08d22e87753304405ccac8be2493a495f529edd81d39a0870621462276ef/zstandard-0.23.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:6a41c120c3dbc0d81a8e8adc73312d668cd34acd7725f036992b1b72d22c1772", size = 4936968 }, + { url = "https://files.pythonhosted.org/packages/eb/fa/f3670a597949fe7dcf38119a39f7da49a8a84a6f0b1a2e46b2f71a0ab83f/zstandard-0.23.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:40b33d93c6eddf02d2c19f5773196068d875c41ca25730e8288e9b672897c105", size = 5467179 }, + { url = "https://files.pythonhosted.org/packages/4e/a9/dad2ab22020211e380adc477a1dbf9f109b1f8d94c614944843e20dc2a99/zstandard-0.23.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:9206649ec587e6b02bd124fb7799b86cddec350f6f6c14bc82a2b70183e708ba", size = 4848577 }, + { url = "https://files.pythonhosted.org/packages/08/03/dd28b4484b0770f1e23478413e01bee476ae8227bbc81561f9c329e12564/zstandard-0.23.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:76e79bc28a65f467e0409098fa2c4376931fd3207fbeb6b956c7c476d53746dd", size = 4693899 }, + { url = "https://files.pythonhosted.org/packages/2b/64/3da7497eb635d025841e958bcd66a86117ae320c3b14b0ae86e9e8627518/zstandard-0.23.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:66b689c107857eceabf2cf3d3fc699c3c0fe8ccd18df2219d978c0283e4c508a", size = 5199964 }, + { url = "https://files.pythonhosted.org/packages/43/a4/d82decbab158a0e8a6ebb7fc98bc4d903266bce85b6e9aaedea1d288338c/zstandard-0.23.0-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:9c236e635582742fee16603042553d276cca506e824fa2e6489db04039521e90", size = 5655398 }, + { url = "https://files.pythonhosted.org/packages/f2/61/ac78a1263bc83a5cf29e7458b77a568eda5a8f81980691bbc6eb6a0d45cc/zstandard-0.23.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a8fffdbd9d1408006baaf02f1068d7dd1f016c6bcb7538682622c556e7b68e35", size = 5191313 }, + { url = "https://files.pythonhosted.org/packages/e7/54/967c478314e16af5baf849b6ee9d6ea724ae5b100eb506011f045d3d4e16/zstandard-0.23.0-cp312-cp312-win32.whl", hash = "sha256:dc1d33abb8a0d754ea4763bad944fd965d3d95b5baef6b121c0c9013eaf1907d", size = 430877 }, + { url = "https://files.pythonhosted.org/packages/75/37/872d74bd7739639c4553bf94c84af7d54d8211b626b352bc57f0fd8d1e3f/zstandard-0.23.0-cp312-cp312-win_amd64.whl", hash = "sha256:64585e1dba664dc67c7cdabd56c1e5685233fbb1fc1966cfba2a340ec0dfff7b", size = 495595 }, + { url = "https://files.pythonhosted.org/packages/80/f1/8386f3f7c10261fe85fbc2c012fdb3d4db793b921c9abcc995d8da1b7a80/zstandard-0.23.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:576856e8594e6649aee06ddbfc738fec6a834f7c85bf7cadd1c53d4a58186ef9", size = 788975 }, + { url = "https://files.pythonhosted.org/packages/16/e8/cbf01077550b3e5dc86089035ff8f6fbbb312bc0983757c2d1117ebba242/zstandard-0.23.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:38302b78a850ff82656beaddeb0bb989a0322a8bbb1bf1ab10c17506681d772a", size = 633448 }, + { url = "https://files.pythonhosted.org/packages/06/27/4a1b4c267c29a464a161aeb2589aff212b4db653a1d96bffe3598f3f0d22/zstandard-0.23.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d2240ddc86b74966c34554c49d00eaafa8200a18d3a5b6ffbf7da63b11d74ee2", size = 4945269 }, + { url = "https://files.pythonhosted.org/packages/7c/64/d99261cc57afd9ae65b707e38045ed8269fbdae73544fd2e4a4d50d0ed83/zstandard-0.23.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2ef230a8fd217a2015bc91b74f6b3b7d6522ba48be29ad4ea0ca3a3775bf7dd5", size = 5306228 }, + { url = "https://files.pythonhosted.org/packages/7a/cf/27b74c6f22541f0263016a0fd6369b1b7818941de639215c84e4e94b2a1c/zstandard-0.23.0-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:774d45b1fac1461f48698a9d4b5fa19a69d47ece02fa469825b442263f04021f", size = 5336891 }, + { url = "https://files.pythonhosted.org/packages/fa/18/89ac62eac46b69948bf35fcd90d37103f38722968e2981f752d69081ec4d/zstandard-0.23.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6f77fa49079891a4aab203d0b1744acc85577ed16d767b52fc089d83faf8d8ed", size = 5436310 }, + { url = "https://files.pythonhosted.org/packages/a8/a8/5ca5328ee568a873f5118d5b5f70d1f36c6387716efe2e369010289a5738/zstandard-0.23.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ac184f87ff521f4840e6ea0b10c0ec90c6b1dcd0bad2f1e4a9a1b4fa177982ea", size = 4859912 }, + { url = "https://files.pythonhosted.org/packages/ea/ca/3781059c95fd0868658b1cf0440edd832b942f84ae60685d0cfdb808bca1/zstandard-0.23.0-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:c363b53e257246a954ebc7c488304b5592b9c53fbe74d03bc1c64dda153fb847", size = 4936946 }, + { url = "https://files.pythonhosted.org/packages/ce/11/41a58986f809532742c2b832c53b74ba0e0a5dae7e8ab4642bf5876f35de/zstandard-0.23.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:e7792606d606c8df5277c32ccb58f29b9b8603bf83b48639b7aedf6df4fe8171", size = 5466994 }, + { url = "https://files.pythonhosted.org/packages/83/e3/97d84fe95edd38d7053af05159465d298c8b20cebe9ccb3d26783faa9094/zstandard-0.23.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a0817825b900fcd43ac5d05b8b3079937073d2b1ff9cf89427590718b70dd840", size = 4848681 }, + { url = "https://files.pythonhosted.org/packages/6e/99/cb1e63e931de15c88af26085e3f2d9af9ce53ccafac73b6e48418fd5a6e6/zstandard-0.23.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:9da6bc32faac9a293ddfdcb9108d4b20416219461e4ec64dfea8383cac186690", size = 4694239 }, + { url = "https://files.pythonhosted.org/packages/ab/50/b1e703016eebbc6501fc92f34db7b1c68e54e567ef39e6e59cf5fb6f2ec0/zstandard-0.23.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:fd7699e8fd9969f455ef2926221e0233f81a2542921471382e77a9e2f2b57f4b", size = 5200149 }, + { url = "https://files.pythonhosted.org/packages/aa/e0/932388630aaba70197c78bdb10cce2c91fae01a7e553b76ce85471aec690/zstandard-0.23.0-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:d477ed829077cd945b01fc3115edd132c47e6540ddcd96ca169facff28173057", size = 5655392 }, + { url = "https://files.pythonhosted.org/packages/02/90/2633473864f67a15526324b007a9f96c96f56d5f32ef2a56cc12f9548723/zstandard-0.23.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:fa6ce8b52c5987b3e34d5674b0ab529a4602b632ebab0a93b07bfb4dfc8f8a33", size = 5191299 }, + { url = "https://files.pythonhosted.org/packages/b0/4c/315ca5c32da7e2dc3455f3b2caee5c8c2246074a61aac6ec3378a97b7136/zstandard-0.23.0-cp313-cp313-win32.whl", hash = "sha256:a9b07268d0c3ca5c170a385a0ab9fb7fdd9f5fd866be004c4ea39e44edce47dd", size = 430862 }, + { url = "https://files.pythonhosted.org/packages/a2/bf/c6aaba098e2d04781e8f4f7c0ba3c7aa73d00e4c436bcc0cf059a66691d1/zstandard-0.23.0-cp313-cp313-win_amd64.whl", hash = "sha256:f3513916e8c645d0610815c257cbfd3242adfd5c4cfa78be514e5a3ebb42a41b", size = 495578 }, +] diff --git a/indexers/scripts/clean.py b/indexers/scripts/clean_parquet.py similarity index 56% rename from indexers/scripts/clean.py rename to indexers/scripts/clean_parquet.py index d456a588..397fd38d 100644 --- a/indexers/scripts/clean.py +++ b/indexers/scripts/clean_parquet.py @@ -2,37 +2,49 @@ from pathlib import Path import pandas as pd import os +from utils import convert_case + +RAW_DATA_PATH = "/parquet-data/indexers/raw" +CLEAN_DATA_PATH = "/parquet-data/indexers/clean" def clean_parquet_files(network_name: str, protocol_name: str): - source_base = f"/parquet-data/indexers/raw/{network_name}/{protocol_name}" - target_base = f"/parquet-data/indexers/clean/{network_name}/{protocol_name}" + raw_path = Path(f"{RAW_DATA_PATH}/{network_name}/{protocol_name}") + clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") + + if not raw_path.exists(): + raise ValueError(f"Source path {raw_path} does not exist") - protocol_path = Path(source_base) - if not protocol_path.exists(): - raise ValueError(f"Source path {source_base} does not exist") - Path(target_base).mkdir(parents=True, exist_ok=True) + clean_path.mkdir(parents=True, exist_ok=True) - for block_range_dir in protocol_path.iterdir(): + for block_range_dir in sorted(raw_path.iterdir()): if not block_range_dir.is_dir(): continue + if "temp" in block_range_dir.name: + continue block_range = block_range_dir.name + empty_files = 0 + written_files = 0 for parquet_file in block_range_dir.glob("*.parquet"): event_name = parquet_file.stem - event_dir = Path(target_base) / event_name + event_dir = clean_path / event_name output_file = event_dir / f"{event_name}_{block_range}.parquet" - - # Skip if file already exists if output_file.exists(): continue - df = pd.read_parquet(parquet_file) if df.empty: + empty_files += 1 continue + df.columns = [convert_case(col) for col in df.columns] event_dir.mkdir(parents=True, exist_ok=True) df.to_parquet(output_file, index=False) - print(f"Processed {protocol_name} {block_range}") + written_files += 1 + print( + f"Processed {network_name}.{protocol_name}.{block_range}:\n" + f"\t empty raw files {empty_files}\n" + f"\t written files {written_files}" + ) if __name__ == "__main__": diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py new file mode 100644 index 00000000..0433bc9d --- /dev/null +++ b/indexers/scripts/import_parquet.py @@ -0,0 +1,62 @@ +import argparse +import os +from pathlib import Path +import clickhouse_connect +from utils import create_table_from_schema, insert_data_from_path + +CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" +CLEAN_DATA_PATH = "/parquet-data/indexers/clean" +SCHEMAS_PATH = "/parquet-data/indexers/schemas" + + +def init_tables_from_schemas(client, network_name: str, protocol_name: str): + print(f"Initializing tables for {network_name} {protocol_name}") + schema_path = Path(f"{SCHEMAS_PATH}/{network_name}/{protocol_name}") + db_name = f"raw_{network_name}" + + for schema_file in schema_path.glob("*.sql"): + event_name = schema_file.stem + table_name = f"{protocol_name}_{event_name}" + + client.command(f"drop table if exists {db_name}.{table_name}") + create_table_from_schema(client, str(schema_file)) + + +def import_parquet_files(client, network_name: str, protocol_name: str): + print(f"Inserting {network_name} {protocol_name} data into tables") + clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") + db_name = f"raw_{network_name}" + + for event_name in clean_path.iterdir(): + if not event_name.is_dir(): + continue + event_name = event_name.name + table_name = f"{protocol_name}_{event_name}" + file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet" + + insert_data_from_path(client, db_name, table_name, file_path) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--network_name", type=str) + parser.add_argument("--protocol_name", type=str) + args = parser.parse_args() + + network_name = os.getenv("NETWORK_NAME") or args.network_name + protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name + + if network_name is None or protocol_name is None: + raise ValueError("Network and protocol must be provided") + + client = clickhouse_connect.get_client( + host="clickhouse", + port=8123, + username="default", + ) + + db_name = f"raw_{network_name}" + client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}") + + init_tables_from_schemas(client, network_name, protocol_name) + import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/scripts/listener.py b/indexers/scripts/listener.py index 90e8d405..54fc09bc 100644 --- a/indexers/scripts/listener.py +++ b/indexers/scripts/listener.py @@ -1,24 +1,16 @@ -import argparse -import os from pathlib import Path import time -import re import pandas as pd from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import clickhouse_connect -from clickhouse_connect.driver.client import Client +from utils import create_table_from_path, insert_data_from_path, convert_case CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" RAW_DATA_PATH = "/parquet-data/indexers/raw" CLEAN_DATA_PATH = "/parquet-data/indexers/clean" -def convert_case(name): - snake_case = re.sub(r"(?