From 70ea47a9bfbac448a2cd2575074af4a1c80420d3 Mon Sep 17 00:00:00 2001 From: Troy Date: Tue, 10 Dec 2024 23:49:18 +0000 Subject: [PATCH 01/14] update extractor ingestion --- docker-compose.yml | 2 + extractors/pyproject.toml | 1 + extractors/src/clean.py | 17 +++- extractors/src/extract.py | 10 +- extractors/src/insert.py | 25 +++++ extractors/uv.lock | 137 ++++++++++++++++++++++++++++ indexers/Dockerfile | 2 +- indexers/utils/clickhouse_schema.py | 42 ++++++++- 8 files changed, 226 insertions(+), 10 deletions(-) create mode 100644 extractors/src/insert.py diff --git a/docker-compose.yml b/docker-compose.yml index b2eb8762..2ed3748a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -85,6 +85,8 @@ services: image: ghcr.io/synthetixio/data/extractors:${VERSION} build: context: ./extractors + networks: + - data env_file: - .env volumes: diff --git a/extractors/pyproject.toml b/extractors/pyproject.toml index b83cd141..1e94666e 100644 --- a/extractors/pyproject.toml +++ b/extractors/pyproject.toml @@ -5,6 +5,7 @@ description = "Add your description here" readme = "README.md" requires-python = ">=3.12" dependencies = [ + "clickhouse-connect>=0.8.9", "cryo==0.3.0", "duckdb>=1.1.3", "maturin>=1.7.7", diff --git a/extractors/src/clean.py b/extractors/src/clean.py index 7633fcb9..85cca2de 100644 --- a/extractors/src/clean.py +++ b/extractors/src/clean.py @@ -5,6 +5,7 @@ from eth_utils import decode_hex import polars as pl import duckdb +from .insert import insert_data def fix_labels(labels): @@ -58,7 +59,7 @@ def decode_output(contract, function_name, call): def camel_to_snake(name): - name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name).lower() + name = re.sub(r"(?=0.8.9" }, { name = "cryo", specifier = "==0.3.0" }, { name = "duckdb", specifier = ">=1.1.3" }, { name = "maturin", specifier = ">=1.7.7" }, @@ -525,6 +595,21 @@ version = "1.2.0" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/83/63/21480e8ecc218b9b15672d194ea79da8a7389737c21d8406254306733cac/lru-dict-1.2.0.tar.gz", hash = "sha256:13c56782f19d68ddf4d8db0170041192859616514c706b126d0df2ec72a11bd7", size = 10895 } +[[package]] +name = "lz4" +version = "4.3.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a4/31/ec1259ca8ad11568abaf090a7da719616ca96b60d097ccc5799cd0ff599c/lz4-4.3.3.tar.gz", hash = "sha256:01fe674ef2889dbb9899d8a67361e0c4a2c833af5aeb37dd505727cf5d2a131e", size = 171509 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4d/6f/081811b17ccaec5f06b3030756af2737841447849118a6e1078481a78c6c/lz4-4.3.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e36cd7b9d4d920d3bfc2369840da506fa68258f7bb176b8743189793c055e43d", size = 254213 }, + { url = "https://files.pythonhosted.org/packages/53/4d/8e04ef75feff8848ba3c624ce81c7732bdcea5f8f994758afa88cd3d7764/lz4-4.3.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:31ea4be9d0059c00b2572d700bf2c1bc82f241f2c3282034a759c9a4d6ca4dc2", size = 212354 }, + { url = "https://files.pythonhosted.org/packages/a3/04/257a72d6a879dbc8c669018989f776fcdd5b4bf3c2c51c09a54f1ca31721/lz4-4.3.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c9a6fd20767ccaf70649982f8f3eeb0884035c150c0b818ea660152cf3c809", size = 1238643 }, + { url = "https://files.pythonhosted.org/packages/d9/93/4a7e489156fa7ded03ba9cde4a8ca7f373672b5787cac9a0391befa752a1/lz4-4.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:bca8fccc15e3add173da91be8f34121578dc777711ffd98d399be35487c934bf", size = 1265014 }, + { url = "https://files.pythonhosted.org/packages/fd/a4/f84ebc23bc7602623b1b003b4e1120cbf86fb03a35c595c226be1985449b/lz4-4.3.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e7d84b479ddf39fe3ea05387f10b779155fc0990125f4fb35d636114e1c63a2e", size = 1184881 }, + { url = "https://files.pythonhosted.org/packages/de/3d/8ba48305378e84908221de143a21ba0c0ce52778893865cf85b66b1068da/lz4-4.3.3-cp312-cp312-win32.whl", hash = "sha256:337cb94488a1b060ef1685187d6ad4ba8bc61d26d631d7ba909ee984ea736be1", size = 87241 }, + { url = "https://files.pythonhosted.org/packages/c4/5d/7b70965a0692de29af2af1007fe837f46fd456bbe2aa8f838a8543a3b5cb/lz4-4.3.3-cp312-cp312-win_amd64.whl", hash = "sha256:5d35533bf2cee56f38ced91f766cd0038b6abf46f438a80d50c52750088be93f", size = 99776 }, +] + [[package]] name = "maturin" version = "1.7.7" @@ -764,6 +849,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c8/11/fabf6ecabb1fe5b7d96889228ca2a9158c4c3bb732e3b8ee3f7f6d40b703/pyarrow-18.1.0-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:b76130d835261b38f14fc41fdfb39ad8d672afb84c447126b84d5472244cfaba", size = 40043567 }, ] +[[package]] +name = "pycparser" +version = "2.22" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/1d/b2/31537cf4b1ca988837256c910a668b553fceb8f069bedc4b1c826024b52c/pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6", size = 172736 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/13/a3/a812df4e2dd5696d1f351d58b8fe16a405b234ad2886a0dab9183fb78109/pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc", size = 117552 }, +] + [[package]] name = "pycryptodome" version = "3.21.0" @@ -1173,3 +1267,46 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f5/d5/688db678e987c3e0fb17867970700b92603cadf36c56e5fb08f23e822a0c/yarl-1.18.3-cp313-cp313-win_amd64.whl", hash = "sha256:578e281c393af575879990861823ef19d66e2b1d0098414855dd367e234f5b3c", size = 315723 }, { url = "https://files.pythonhosted.org/packages/f5/4b/a06e0ec3d155924f77835ed2d167ebd3b211a7b0853da1cf8d8414d784ef/yarl-1.18.3-py3-none-any.whl", hash = "sha256:b57f4f58099328dfb26c6a771d09fb20dbbae81d20cfb66141251ea063bd101b", size = 45109 }, ] + +[[package]] +name = "zstandard" +version = "0.23.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cffi", marker = "platform_python_implementation == 'PyPy'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ed/f6/2ac0287b442160a89d726b17a9184a4c615bb5237db763791a7fd16d9df1/zstandard-0.23.0.tar.gz", hash = "sha256:b2d8c62d08e7255f68f7a740bae85b3c9b8e5466baa9cbf7f57f1cde0ac6bc09", size = 681701 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7b/83/f23338c963bd9de687d47bf32efe9fd30164e722ba27fb59df33e6b1719b/zstandard-0.23.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:b4567955a6bc1b20e9c31612e615af6b53733491aeaa19a6b3b37f3b65477094", size = 788713 }, + { url = "https://files.pythonhosted.org/packages/5b/b3/1a028f6750fd9227ee0b937a278a434ab7f7fdc3066c3173f64366fe2466/zstandard-0.23.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:1e172f57cd78c20f13a3415cc8dfe24bf388614324d25539146594c16d78fcc8", size = 633459 }, + { url = 
"https://files.pythonhosted.org/packages/26/af/36d89aae0c1f95a0a98e50711bc5d92c144939efc1f81a2fcd3e78d7f4c1/zstandard-0.23.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b0e166f698c5a3e914947388c162be2583e0c638a4703fc6a543e23a88dea3c1", size = 4945707 }, + { url = "https://files.pythonhosted.org/packages/cd/2e/2051f5c772f4dfc0aae3741d5fc72c3dcfe3aaeb461cc231668a4db1ce14/zstandard-0.23.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:12a289832e520c6bd4dcaad68e944b86da3bad0d339ef7989fb7e88f92e96072", size = 5306545 }, + { url = "https://files.pythonhosted.org/packages/0a/9e/a11c97b087f89cab030fa71206963090d2fecd8eb83e67bb8f3ffb84c024/zstandard-0.23.0-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d50d31bfedd53a928fed6707b15a8dbeef011bb6366297cc435accc888b27c20", size = 5337533 }, + { url = "https://files.pythonhosted.org/packages/fc/79/edeb217c57fe1bf16d890aa91a1c2c96b28c07b46afed54a5dcf310c3f6f/zstandard-0.23.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:72c68dda124a1a138340fb62fa21b9bf4848437d9ca60bd35db36f2d3345f373", size = 5436510 }, + { url = "https://files.pythonhosted.org/packages/81/4f/c21383d97cb7a422ddf1ae824b53ce4b51063d0eeb2afa757eb40804a8ef/zstandard-0.23.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:53dd9d5e3d29f95acd5de6802e909ada8d8d8cfa37a3ac64836f3bc4bc5512db", size = 4859973 }, + { url = "https://files.pythonhosted.org/packages/ab/15/08d22e87753304405ccac8be2493a495f529edd81d39a0870621462276ef/zstandard-0.23.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:6a41c120c3dbc0d81a8e8adc73312d668cd34acd7725f036992b1b72d22c1772", size = 4936968 }, + { url = "https://files.pythonhosted.org/packages/eb/fa/f3670a597949fe7dcf38119a39f7da49a8a84a6f0b1a2e46b2f71a0ab83f/zstandard-0.23.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:40b33d93c6eddf02d2c19f5773196068d875c41ca25730e8288e9b672897c105", size = 5467179 }, + { url = "https://files.pythonhosted.org/packages/4e/a9/dad2ab22020211e380adc477a1dbf9f109b1f8d94c614944843e20dc2a99/zstandard-0.23.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:9206649ec587e6b02bd124fb7799b86cddec350f6f6c14bc82a2b70183e708ba", size = 4848577 }, + { url = "https://files.pythonhosted.org/packages/08/03/dd28b4484b0770f1e23478413e01bee476ae8227bbc81561f9c329e12564/zstandard-0.23.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:76e79bc28a65f467e0409098fa2c4376931fd3207fbeb6b956c7c476d53746dd", size = 4693899 }, + { url = "https://files.pythonhosted.org/packages/2b/64/3da7497eb635d025841e958bcd66a86117ae320c3b14b0ae86e9e8627518/zstandard-0.23.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:66b689c107857eceabf2cf3d3fc699c3c0fe8ccd18df2219d978c0283e4c508a", size = 5199964 }, + { url = "https://files.pythonhosted.org/packages/43/a4/d82decbab158a0e8a6ebb7fc98bc4d903266bce85b6e9aaedea1d288338c/zstandard-0.23.0-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:9c236e635582742fee16603042553d276cca506e824fa2e6489db04039521e90", size = 5655398 }, + { url = "https://files.pythonhosted.org/packages/f2/61/ac78a1263bc83a5cf29e7458b77a568eda5a8f81980691bbc6eb6a0d45cc/zstandard-0.23.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a8fffdbd9d1408006baaf02f1068d7dd1f016c6bcb7538682622c556e7b68e35", size = 5191313 }, + { url = 
"https://files.pythonhosted.org/packages/e7/54/967c478314e16af5baf849b6ee9d6ea724ae5b100eb506011f045d3d4e16/zstandard-0.23.0-cp312-cp312-win32.whl", hash = "sha256:dc1d33abb8a0d754ea4763bad944fd965d3d95b5baef6b121c0c9013eaf1907d", size = 430877 }, + { url = "https://files.pythonhosted.org/packages/75/37/872d74bd7739639c4553bf94c84af7d54d8211b626b352bc57f0fd8d1e3f/zstandard-0.23.0-cp312-cp312-win_amd64.whl", hash = "sha256:64585e1dba664dc67c7cdabd56c1e5685233fbb1fc1966cfba2a340ec0dfff7b", size = 495595 }, + { url = "https://files.pythonhosted.org/packages/80/f1/8386f3f7c10261fe85fbc2c012fdb3d4db793b921c9abcc995d8da1b7a80/zstandard-0.23.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:576856e8594e6649aee06ddbfc738fec6a834f7c85bf7cadd1c53d4a58186ef9", size = 788975 }, + { url = "https://files.pythonhosted.org/packages/16/e8/cbf01077550b3e5dc86089035ff8f6fbbb312bc0983757c2d1117ebba242/zstandard-0.23.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:38302b78a850ff82656beaddeb0bb989a0322a8bbb1bf1ab10c17506681d772a", size = 633448 }, + { url = "https://files.pythonhosted.org/packages/06/27/4a1b4c267c29a464a161aeb2589aff212b4db653a1d96bffe3598f3f0d22/zstandard-0.23.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d2240ddc86b74966c34554c49d00eaafa8200a18d3a5b6ffbf7da63b11d74ee2", size = 4945269 }, + { url = "https://files.pythonhosted.org/packages/7c/64/d99261cc57afd9ae65b707e38045ed8269fbdae73544fd2e4a4d50d0ed83/zstandard-0.23.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2ef230a8fd217a2015bc91b74f6b3b7d6522ba48be29ad4ea0ca3a3775bf7dd5", size = 5306228 }, + { url = "https://files.pythonhosted.org/packages/7a/cf/27b74c6f22541f0263016a0fd6369b1b7818941de639215c84e4e94b2a1c/zstandard-0.23.0-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:774d45b1fac1461f48698a9d4b5fa19a69d47ece02fa469825b442263f04021f", size = 5336891 }, + { url = "https://files.pythonhosted.org/packages/fa/18/89ac62eac46b69948bf35fcd90d37103f38722968e2981f752d69081ec4d/zstandard-0.23.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6f77fa49079891a4aab203d0b1744acc85577ed16d767b52fc089d83faf8d8ed", size = 5436310 }, + { url = "https://files.pythonhosted.org/packages/a8/a8/5ca5328ee568a873f5118d5b5f70d1f36c6387716efe2e369010289a5738/zstandard-0.23.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ac184f87ff521f4840e6ea0b10c0ec90c6b1dcd0bad2f1e4a9a1b4fa177982ea", size = 4859912 }, + { url = "https://files.pythonhosted.org/packages/ea/ca/3781059c95fd0868658b1cf0440edd832b942f84ae60685d0cfdb808bca1/zstandard-0.23.0-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:c363b53e257246a954ebc7c488304b5592b9c53fbe74d03bc1c64dda153fb847", size = 4936946 }, + { url = "https://files.pythonhosted.org/packages/ce/11/41a58986f809532742c2b832c53b74ba0e0a5dae7e8ab4642bf5876f35de/zstandard-0.23.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:e7792606d606c8df5277c32ccb58f29b9b8603bf83b48639b7aedf6df4fe8171", size = 5466994 }, + { url = "https://files.pythonhosted.org/packages/83/e3/97d84fe95edd38d7053af05159465d298c8b20cebe9ccb3d26783faa9094/zstandard-0.23.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a0817825b900fcd43ac5d05b8b3079937073d2b1ff9cf89427590718b70dd840", size = 4848681 }, + { url = 
"https://files.pythonhosted.org/packages/6e/99/cb1e63e931de15c88af26085e3f2d9af9ce53ccafac73b6e48418fd5a6e6/zstandard-0.23.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:9da6bc32faac9a293ddfdcb9108d4b20416219461e4ec64dfea8383cac186690", size = 4694239 }, + { url = "https://files.pythonhosted.org/packages/ab/50/b1e703016eebbc6501fc92f34db7b1c68e54e567ef39e6e59cf5fb6f2ec0/zstandard-0.23.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:fd7699e8fd9969f455ef2926221e0233f81a2542921471382e77a9e2f2b57f4b", size = 5200149 }, + { url = "https://files.pythonhosted.org/packages/aa/e0/932388630aaba70197c78bdb10cce2c91fae01a7e553b76ce85471aec690/zstandard-0.23.0-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:d477ed829077cd945b01fc3115edd132c47e6540ddcd96ca169facff28173057", size = 5655392 }, + { url = "https://files.pythonhosted.org/packages/02/90/2633473864f67a15526324b007a9f96c96f56d5f32ef2a56cc12f9548723/zstandard-0.23.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:fa6ce8b52c5987b3e34d5674b0ab529a4602b632ebab0a93b07bfb4dfc8f8a33", size = 5191299 }, + { url = "https://files.pythonhosted.org/packages/b0/4c/315ca5c32da7e2dc3455f3b2caee5c8c2246074a61aac6ec3378a97b7136/zstandard-0.23.0-cp313-cp313-win32.whl", hash = "sha256:a9b07268d0c3ca5c170a385a0ab9fb7fdd9f5fd866be004c4ea39e44edce47dd", size = 430862 }, + { url = "https://files.pythonhosted.org/packages/a2/bf/c6aaba098e2d04781e8f4f7c0ba3c7aa73d00e4c436bcc0cf059a66691d1/zstandard-0.23.0-cp313-cp313-win_amd64.whl", hash = "sha256:f3513916e8c645d0610815c257cbfd3242adfd5c4cfa78be514e5a3ebb42a41b", size = 495578 }, +] diff --git a/indexers/Dockerfile b/indexers/Dockerfile index 8d60c161..523dfbf3 100644 --- a/indexers/Dockerfile +++ b/indexers/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /app COPY package*.json ./ COPY patches/ ./patches/ -RUN apt-get update && apt-get install -y build-essential && npm ci && apt-get remove -y build-essential +RUN apt-get update && apt-get install -y build-essential clang && npm ci COPY pyproject.toml uv.lock ./ RUN uv sync --frozen --no-dev diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index a0298600..3b73aaf2 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -1,6 +1,10 @@ import re from pathlib import Path -from web3._utils.abi import get_abi_input_names, get_abi_input_types +from web3._utils.abi import ( + get_abi_input_names, + get_abi_input_types, + get_abi_output_types, +) def to_snake(name): @@ -9,7 +13,7 @@ def to_snake(name): def map_to_clickhouse_type(sol_type): - if sol_type in ["bytes32", "address", "string"]: + if sol_type in ["address", "string"] or sol_type.startswith("bytes"): return "String" elif re.search(r"\(.*\)|\[\[$", sol_type): return "JSON" @@ -79,6 +83,7 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol output_path: Path where schema files will be saved contract_name: Name of the contract (used for namespacing) """ + # do events events = [item for item in abi if item["type"] == "event"] for event in events: @@ -99,6 +104,39 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol client.command(schema) save_clickhouse_schema(path=path, event_name=event_name, schema=schema) + # do functions + functions = [item for item in abi if item["type"] == "function"] + + for f in functions: + function_name = to_snake(f["name"]) + contract_name = to_snake(contract_name) + function_name = f"{contract_name}_{function_name}" + + input_types = 
get_abi_input_types(f) + input_names = get_abi_input_names(f) + output_types = get_abi_output_types(f) + output_names = [o["name"] for o in f["outputs"]] + + all_names = input_names + output_names + all_types = input_types + output_types + + no_outputs = len(output_types) == 0 + empty_names = '' in all_names + type_mismatch = len(all_names) != len(all_types) + if no_outputs or empty_names or type_mismatch: + continue + fields = list(zip(all_names, all_types)) + + schema = generate_clickhouse_schema( + event_name=event_name, + fields=fields, + network_name=network_name, + protocol_name=protocol_name, + ) + client.command(schema) + save_clickhouse_schema(path=path, event_name=event_name, schema=schema) + + # do the blocks block_schema = ( f"CREATE TABLE IF NOT EXISTS {network_name}.{protocol_name}_block (\n" " `number` UInt64,\n" From c9bd0c197fc526b4acc475deb64d8a3112263685 Mon Sep 17 00:00:00 2001 From: Troy Date: Wed, 11 Dec 2024 14:43:54 -0700 Subject: [PATCH 02/14] update schema script --- indexers/utils/clickhouse_schema.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index 3b73aaf2..853c1240 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -121,20 +121,22 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol all_types = input_types + output_types no_outputs = len(output_types) == 0 - empty_names = '' in all_names + empty_names = "" in all_names type_mismatch = len(all_names) != len(all_types) if no_outputs or empty_names or type_mismatch: continue + else: + print(f"Running query for {function_name}") fields = list(zip(all_names, all_types)) schema = generate_clickhouse_schema( - event_name=event_name, + event_name=function_name, fields=fields, network_name=network_name, protocol_name=protocol_name, ) client.command(schema) - save_clickhouse_schema(path=path, event_name=event_name, schema=schema) + save_clickhouse_schema(path=path, event_name=function_name, schema=schema) # do the blocks block_schema = ( From 1474d4d182886c3e2038bbc1534bfa6399fb6946 Mon Sep 17 00:00:00 2001 From: marcus-snx Date: Tue, 17 Dec 2024 17:56:14 +0200 Subject: [PATCH 03/14] Organize parquet scripts for indexer --- .../scripts/{clean.py => clean_parquet.py} | 32 +++++++++----- indexers/scripts/import_parquet.py | 44 +++++++++++++++++++ indexers/scripts/listener.py | 30 +++---------- indexers/scripts/utils.py | 23 ++++++++++ 4 files changed, 94 insertions(+), 35 deletions(-) rename indexers/scripts/{clean.py => clean_parquet.py} (61%) create mode 100644 indexers/scripts/import_parquet.py create mode 100644 indexers/scripts/utils.py diff --git a/indexers/scripts/clean.py b/indexers/scripts/clean_parquet.py similarity index 61% rename from indexers/scripts/clean.py rename to indexers/scripts/clean_parquet.py index d456a588..2e1c0248 100644 --- a/indexers/scripts/clean.py +++ b/indexers/scripts/clean_parquet.py @@ -3,36 +3,44 @@ import pandas as pd import os +RAW_DATA_PATH = "/parquet-data/indexers/raw" +CLEAN_DATA_PATH = "/parquet-data/indexers/clean" + def clean_parquet_files(network_name: str, protocol_name: str): - source_base = f"/parquet-data/indexers/raw/{network_name}/{protocol_name}" - target_base = f"/parquet-data/indexers/clean/{network_name}/{protocol_name}" + raw_path = Path(f"{RAW_DATA_PATH}/{network_name}/{protocol_name}") + clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") + + if not raw_path.exists(): 
+ raise ValueError(f"Source path {raw_path} does not exist") - protocol_path = Path(source_base) - if not protocol_path.exists(): - raise ValueError(f"Source path {source_base} does not exist") - Path(target_base).mkdir(parents=True, exist_ok=True) + clean_path.mkdir(parents=True, exist_ok=True) - for block_range_dir in protocol_path.iterdir(): + for block_range_dir in raw_path.iterdir(): if not block_range_dir.is_dir(): continue block_range = block_range_dir.name + empty_files = 0 + written_files = 0 for parquet_file in block_range_dir.glob("*.parquet"): event_name = parquet_file.stem - event_dir = Path(target_base) / event_name + event_dir = clean_path / event_name output_file = event_dir / f"{event_name}_{block_range}.parquet" - - # Skip if file already exists if output_file.exists(): continue - df = pd.read_parquet(parquet_file) if df.empty: + empty_files += 1 continue event_dir.mkdir(parents=True, exist_ok=True) df.to_parquet(output_file, index=False) - print(f"Processed {protocol_name} {block_range}") + written_files += 1 + print( + f"Processed {network_name}.{protocol_name}.{block_range}: " + f"\t empty raw files {empty_files}, " + f"\t written files {written_files}" + ) if __name__ == "__main__": diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py new file mode 100644 index 00000000..fd729a35 --- /dev/null +++ b/indexers/scripts/import_parquet.py @@ -0,0 +1,44 @@ +import argparse +import os +from pathlib import Path +import clickhouse_connect +from .utils import create_table_from_path + +CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" +CLEAN_DATA_PATH = "/parquet-data/indexers/clean" + + +def import_parquet_files(client, network_name: str, protocol_name: str): + clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") + db_name = f"raw_{network_name}" + + for event_name in clean_path.iterdir(): + if not event_name.is_dir(): + continue + event_name = event_name.name + table_name = f"{protocol_name}_{event_name}" + file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet" + + client.command(f"drop table if exists {db_name}.{table_name}") + create_table_from_path(client, db_name, table_name, file_path) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--network_name", type=str) + parser.add_argument("--protocol_name", type=str) + args = parser.parse_args() + + network_name = os.getenv("NETWORK_NAME") or args.network_name + protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name + + print(f"Cleaning {network_name} {protocol_name}") + + if network_name is None or protocol_name is None: + raise ValueError("Network and protocol must be provided") + + client = clickhouse_connect.get_client( + host="localhost", port=8123, username="default" + ) + + import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/scripts/listener.py b/indexers/scripts/listener.py index 90e8d405..74bcb6b9 100644 --- a/indexers/scripts/listener.py +++ b/indexers/scripts/listener.py @@ -1,5 +1,3 @@ -import argparse -import os from pathlib import Path import time import re @@ -7,7 +5,7 @@ from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import clickhouse_connect -from clickhouse_connect.driver.client import Client +from utils import create_table_from_path, insert_data_from_path CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" RAW_DATA_PATH = 
"/parquet-data/indexers/raw" @@ -75,10 +73,14 @@ def _clean_parquet(self, path: str): table_name = f"{protocol_name}_{event_name}" clickhouse_file_path = f"{self.clickhouse_path}/{network_name}/{protocol_name}/{event_name}/{event_name}_{block_range}.parquet" if not self.client.command(f"exists {db_name}.{table_name}"): - self._create_table_from_file(db_name, table_name, clickhouse_file_path) + create_table_from_path( + self.client, db_name, table_name, clickhouse_file_path + ) tables_created += 1 else: - self._insert_data_from_file(db_name, table_name, clickhouse_file_path) + insert_data_from_path( + self.client, db_name, table_name, clickhouse_file_path + ) data_insertions += 1 print( @@ -87,24 +89,6 @@ def _clean_parquet(self, path: str): f"tables created {tables_created}, data insertions {data_insertions}" ) - def _create_table_from_file(self, db_name: str, table_name: str, file_path: str): - query = ( - f"create table if not exists {db_name}.{table_name} " - f"engine = MergeTree order by tuple() as " - f"select * from file('{file_path}', 'Parquet')" - ) - try: - self.client.command(query) - except Exception as e: - print(f"Error creating table {db_name}.{table_name}: {e}") - - def _insert_data_from_file(self, db_name: str, table_name: str, file_path: str): - query = f"insert into {db_name}.{table_name} select * from file('{file_path}', 'Parquet')" - try: - self.client.command(query) - except Exception as e: - print(f"Error inserting data into {db_name}.{table_name}: {e}") - def main(): event_handler = FolderEventHandler() diff --git a/indexers/scripts/utils.py b/indexers/scripts/utils.py new file mode 100644 index 00000000..b0100643 --- /dev/null +++ b/indexers/scripts/utils.py @@ -0,0 +1,23 @@ +from clickhouse_connect.driver.client import Client + + +def create_table_from_path(client: Client, db_name: str, table_name: str, path: str): + query = ( + f"create table if not exists {db_name}.{table_name} " + f"engine = MergeTree order by tuple() as " + f"select * from file('{path}', 'Parquet')" + ) + try: + client.command(query) + except Exception as e: + print(f"Error creating table {db_name}.{table_name}: {e}") + + +def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str): + query = ( + f"insert into {db_name}.{table_name} select * from file('{path}', 'Parquet')" + ) + try: + client.command(query) + except Exception as e: + print(f"Error inserting data into {db_name}.{table_name}: {e}") From c0bf476e31f8298255cc60d2d49f6cfedefda40c Mon Sep 17 00:00:00 2001 From: marcus-snx Date: Wed, 18 Dec 2024 19:55:23 +0200 Subject: [PATCH 04/14] Update scripts --- indexers/scripts/clean_parquet.py | 6 +++--- indexers/scripts/import_parquet.py | 11 +++++++---- indexers/scripts/utils.py | 15 ++++++++------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/indexers/scripts/clean_parquet.py b/indexers/scripts/clean_parquet.py index 2e1c0248..5f9343cc 100644 --- a/indexers/scripts/clean_parquet.py +++ b/indexers/scripts/clean_parquet.py @@ -16,7 +16,7 @@ def clean_parquet_files(network_name: str, protocol_name: str): clean_path.mkdir(parents=True, exist_ok=True) - for block_range_dir in raw_path.iterdir(): + for block_range_dir in sorted(raw_path.iterdir()): if not block_range_dir.is_dir(): continue block_range = block_range_dir.name @@ -37,8 +37,8 @@ def clean_parquet_files(network_name: str, protocol_name: str): df.to_parquet(output_file, index=False) written_files += 1 print( - f"Processed {network_name}.{protocol_name}.{block_range}: " - f"\t empty raw files 
{empty_files}, " + f"Processed {network_name}.{protocol_name}.{block_range}:\n" + f"\t empty raw files {empty_files}\n" f"\t written files {written_files}" ) diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py index fd729a35..9e7ba907 100644 --- a/indexers/scripts/import_parquet.py +++ b/indexers/scripts/import_parquet.py @@ -2,10 +2,11 @@ import os from pathlib import Path import clickhouse_connect -from .utils import create_table_from_path +from utils import create_table_from_schema, insert_data_from_path CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" CLEAN_DATA_PATH = "/parquet-data/indexers/clean" +SCHEMAS_PATH = "/parquet-data/indexers/schemas" def import_parquet_files(client, network_name: str, protocol_name: str): @@ -18,9 +19,11 @@ def import_parquet_files(client, network_name: str, protocol_name: str): event_name = event_name.name table_name = f"{protocol_name}_{event_name}" file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet" + schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql" client.command(f"drop table if exists {db_name}.{table_name}") - create_table_from_path(client, db_name, table_name, file_path) + create_table_from_schema(client, schema_path) + insert_data_from_path(client, db_name, table_name, file_path) if __name__ == "__main__": @@ -32,13 +35,13 @@ def import_parquet_files(client, network_name: str, protocol_name: str): network_name = os.getenv("NETWORK_NAME") or args.network_name protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name - print(f"Cleaning {network_name} {protocol_name}") + print(f"Importing {network_name} {protocol_name} to clickhouse") if network_name is None or protocol_name is None: raise ValueError("Network and protocol must be provided") client = clickhouse_connect.get_client( - host="localhost", port=8123, username="default" + host="clickhouse", port=8123, username="default" ) import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/scripts/utils.py b/indexers/scripts/utils.py index b0100643..91f064b6 100644 --- a/indexers/scripts/utils.py +++ b/indexers/scripts/utils.py @@ -1,16 +1,17 @@ from clickhouse_connect.driver.client import Client -def create_table_from_path(client: Client, db_name: str, table_name: str, path: str): - query = ( - f"create table if not exists {db_name}.{table_name} " - f"engine = MergeTree order by tuple() as " - f"select * from file('{path}', 'Parquet')" - ) +def create_table_from_schema(client: Client, path: str): + try: + with open(path, "r") as file: + query = file.read() + except FileNotFoundError: + print(f"Schema file {path} not found") + return try: client.command(query) except Exception as e: - print(f"Error creating table {db_name}.{table_name}: {e}") + print(f"Error creating table from schema {path}: {e}") def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str): From e7204262962ad8dc72428c655c39f92ba6c9ccb2 Mon Sep 17 00:00:00 2001 From: marcus-snx Date: Wed, 18 Dec 2024 20:22:24 +0200 Subject: [PATCH 05/14] Store objects and arrays as string --- indexers/scripts/import_parquet.py | 8 +++++++- indexers/utils/clickhouse_schema.py | 9 +++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py index 9e7ba907..7808fdcf 100644 --- a/indexers/scripts/import_parquet.py +++ b/indexers/scripts/import_parquet.py @@ -41,7 +41,13 @@ def 
import_parquet_files(client, network_name: str, protocol_name: str): raise ValueError("Network and protocol must be provided") client = clickhouse_connect.get_client( - host="clickhouse", port=8123, username="default" + host="clickhouse", + port=8123, + username="default", + # settings={"allow_experimental_json_type": 1}, ) + db_name = f"raw_{network_name}" + client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}") + import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index b1e7edd8..fee006eb 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -11,8 +11,6 @@ def to_snake(name): def map_to_clickhouse_type(sol_type): if sol_type in ["bytes32", "address", "string"]: return "String" - elif re.search(r"\(.*\)|\[\[$", sol_type): - return "JSON" elif re.match(r"uint\d+$", sol_type): bit_size = int(re.search(r"\d+", sol_type).group()) if bit_size <= 8: @@ -43,10 +41,13 @@ def map_to_clickhouse_type(sol_type): return "Int256" elif sol_type == "bool": return "Bool" + elif re.search(r"\(.*\)|\[\[$", sol_type): + return "String" elif sol_type.endswith("[]"): base_type = sol_type[:-2] - clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})" - return clickhouse_type + # clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})" + # return clickhouse_type + return "String" raise ValueError(f"Type {sol_type} not mapped") From a4534cf579c336f575775f67a4ff46bad23338d7 Mon Sep 17 00:00:00 2001 From: marcus-snx Date: Thu, 19 Dec 2024 02:05:17 +0200 Subject: [PATCH 06/14] Update import parquet script --- .gitignore | 1 + docker-compose.yml | 2 +- indexers/scripts/import_parquet.py | 21 +++++++++++++++------ 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 8970ed2c..30e306d9 100644 --- a/.gitignore +++ b/.gitignore @@ -173,6 +173,7 @@ deployments logs # local data +clickhouse_data postgres-data parquet-data data diff --git a/docker-compose.yml b/docker-compose.yml index b2eb8762..039749e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: soft: 262144 hard: 262144 volumes: - - clickhouse_data:/var/lib/clickhouse + - ./clickhouse_data:/var/lib/clickhouse - ./parquet-data:/var/lib/clickhouse/user_files/parquet-data ports: - 8123:8123 diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py index 7808fdcf..0433bc9d 100644 --- a/indexers/scripts/import_parquet.py +++ b/indexers/scripts/import_parquet.py @@ -9,7 +9,21 @@ SCHEMAS_PATH = "/parquet-data/indexers/schemas" +def init_tables_from_schemas(client, network_name: str, protocol_name: str): + print(f"Initializing tables for {network_name} {protocol_name}") + schema_path = Path(f"{SCHEMAS_PATH}/{network_name}/{protocol_name}") + db_name = f"raw_{network_name}" + + for schema_file in schema_path.glob("*.sql"): + event_name = schema_file.stem + table_name = f"{protocol_name}_{event_name}" + + client.command(f"drop table if exists {db_name}.{table_name}") + create_table_from_schema(client, str(schema_file)) + + def import_parquet_files(client, network_name: str, protocol_name: str): + print(f"Inserting {network_name} {protocol_name} data into tables") clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") db_name = f"raw_{network_name}" @@ -19,10 +33,7 @@ def import_parquet_files(client, network_name: str, protocol_name: str): event_name = event_name.name table_name = f"{protocol_name}_{event_name}" 
file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet" - schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql" - client.command(f"drop table if exists {db_name}.{table_name}") - create_table_from_schema(client, schema_path) insert_data_from_path(client, db_name, table_name, file_path) @@ -35,8 +46,6 @@ def import_parquet_files(client, network_name: str, protocol_name: str): network_name = os.getenv("NETWORK_NAME") or args.network_name protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name - print(f"Importing {network_name} {protocol_name} to clickhouse") - if network_name is None or protocol_name is None: raise ValueError("Network and protocol must be provided") @@ -44,10 +53,10 @@ def import_parquet_files(client, network_name: str, protocol_name: str): host="clickhouse", port=8123, username="default", - # settings={"allow_experimental_json_type": 1}, ) db_name = f"raw_{network_name}" client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}") + init_tables_from_schemas(client, network_name, protocol_name) import_parquet_files(client, network_name, protocol_name) From 89d63ea174b1f3475a686f6503e3a90919b39a4d Mon Sep 17 00:00:00 2001 From: marcus-snx Date: Thu, 19 Dec 2024 13:57:33 +0200 Subject: [PATCH 07/14] Fix clean data column naming --- indexers/scripts/clean_parquet.py | 2 ++ indexers/scripts/listener.py | 8 +------- indexers/scripts/utils.py | 6 ++++++ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/indexers/scripts/clean_parquet.py b/indexers/scripts/clean_parquet.py index 5f9343cc..fdd735f0 100644 --- a/indexers/scripts/clean_parquet.py +++ b/indexers/scripts/clean_parquet.py @@ -2,6 +2,7 @@ from pathlib import Path import pandas as pd import os +from utils import convert_case RAW_DATA_PATH = "/parquet-data/indexers/raw" CLEAN_DATA_PATH = "/parquet-data/indexers/clean" @@ -33,6 +34,7 @@ def clean_parquet_files(network_name: str, protocol_name: str): if df.empty: empty_files += 1 continue + df.columns = [convert_case(col) for col in df.columns] event_dir.mkdir(parents=True, exist_ok=True) df.to_parquet(output_file, index=False) written_files += 1 diff --git a/indexers/scripts/listener.py b/indexers/scripts/listener.py index 74bcb6b9..54fc09bc 100644 --- a/indexers/scripts/listener.py +++ b/indexers/scripts/listener.py @@ -1,22 +1,16 @@ from pathlib import Path import time -import re import pandas as pd from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import clickhouse_connect -from utils import create_table_from_path, insert_data_from_path +from utils import create_table_from_path, insert_data_from_path, convert_case CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" RAW_DATA_PATH = "/parquet-data/indexers/raw" CLEAN_DATA_PATH = "/parquet-data/indexers/clean" -def convert_case(name): - snake_case = re.sub(r"(? 
Date: Thu, 19 Dec 2024 14:11:55 +0200 Subject: [PATCH 08/14] Ignore temp dirs --- indexers/scripts/clean_parquet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/indexers/scripts/clean_parquet.py b/indexers/scripts/clean_parquet.py index fdd735f0..397fd38d 100644 --- a/indexers/scripts/clean_parquet.py +++ b/indexers/scripts/clean_parquet.py @@ -20,6 +20,8 @@ def clean_parquet_files(network_name: str, protocol_name: str): for block_range_dir in sorted(raw_path.iterdir()): if not block_range_dir.is_dir(): continue + if "temp" in block_range_dir.name: + continue block_range = block_range_dir.name empty_files = 0 From badf5efdce6f7ab19fff5460182a7d612153a48c Mon Sep 17 00:00:00 2001 From: Troy Date: Thu, 19 Dec 2024 18:50:19 +0000 Subject: [PATCH 09/14] update naming --- extractors/src/clean.py | 2 +- indexers/utils/clickhouse_schema.py | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/extractors/src/clean.py b/extractors/src/clean.py index 85cca2de..784bf99a 100644 --- a/extractors/src/clean.py +++ b/extractors/src/clean.py @@ -81,7 +81,7 @@ def clean_data( ): function_name_snake = camel_to_snake(function_name) contract_name_snake = camel_to_snake(contract_name) - directory_name = f"{contract_name_snake}_call_{function_name_snake}" + directory_name = f"{contract_name_snake}_function_{function_name_snake}" input_labels, output_labels = get_labels(contract, function_name) # fix labels diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index acb95fb8..459f0ea3 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -67,7 +67,7 @@ def generate_clickhouse_schema(event_name, fields, network_name, protocol_name): for field_name, field_type in fields: if field_name == "id": clickhouse_type = "String" - query.append(f" `param_id` String,") + query.append(" `param_id` String,") else: clickhouse_type = map_to_clickhouse_type(field_type) query.append(f" `{to_snake(field_name)}` {clickhouse_type},") @@ -111,7 +111,6 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol network_name=network_name, protocol_name=protocol_name, ) - print(schema) client.command(schema) save_clickhouse_schema(path=path, event_name=event_name, schema=schema) @@ -121,12 +120,15 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol for f in functions: function_name = to_snake(f["name"]) contract_name = to_snake(contract_name) - function_name = f"{contract_name}_{function_name}" + function_name = f"{contract_name}_function_{function_name}" input_types = get_abi_input_types(f) input_names = get_abi_input_names(f) output_types = get_abi_output_types(f) - output_names = [o["name"] for o in f["outputs"]] + output_names = [ + o["name"] if "name" in o else f"output_{ind}" + for ind, o in enumerate(f["outputs"]) + ] all_names = input_names + output_names all_types = input_types + output_types From f1518f0cf3baf274dde867b0a8d07e0313091c8c Mon Sep 17 00:00:00 2001 From: Troy Date: Thu, 19 Dec 2024 19:11:12 +0000 Subject: [PATCH 10/14] update function schema --- indexers/utils/clickhouse_schema.py | 35 ++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index 459f0ea3..d4f650ea 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -54,16 +54,30 @@ def map_to_clickhouse_type(sol_type): raise ValueError(f"Type {sol_type} not 
mapped") -def generate_clickhouse_schema(event_name, fields, network_name, protocol_name): - query = [ - f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_{event_name} (", - " `id` String,", - " `block_number` UInt64,", - " `block_timestamp` DateTime64(3, 'UTC'),", - " `transaction_hash` String,", - " `contract` String,", - " `event_name` String,", - ] +def generate_clickhouse_schema( + event_name, fields, network_name, protocol_name, abi_type="event" +): + if abi_type == "event": + query = [ + f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_{event_name} (", + " `id` String,", + " `block_number` UInt64,", + " `block_timestamp` DateTime64(3, 'UTC'),", + " `transaction_hash` String,", + " `contract` String,", + " `event_name` String,", + ] + elif abi_type == "function": + query = [ + f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_{event_name} (", + " `block_number` UInt64,", + " `contract_address` String,", + " `chain_id` UInt64,", + " `file_location` String,", + ] + else: + raise ValueError(f"Unknown ABI type {abi_type}") + for field_name, field_type in fields: if field_name == "id": clickhouse_type = "String" @@ -147,6 +161,7 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol fields=fields, network_name=network_name, protocol_name=protocol_name, + abi_type="function", ) client.command(schema) save_clickhouse_schema(path=path, event_name=function_name, schema=schema) From 681e569563c03df1d8cbb5ff2f11cce73d6b67dd Mon Sep 17 00:00:00 2001 From: Troy Date: Thu, 19 Dec 2024 19:28:09 +0000 Subject: [PATCH 11/14] table name --- indexers/utils/clickhouse_schema.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index d4f650ea..aec4af59 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -57,9 +57,10 @@ def map_to_clickhouse_type(sol_type): def generate_clickhouse_schema( event_name, fields, network_name, protocol_name, abi_type="event" ): + table_name = f"raw_{network_name}.{protocol_name}_{event_name}" if abi_type == "event": query = [ - f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_{event_name} (", + f"CREATE TABLE IF NOT EXISTS {table_name} (", " `id` String,", " `block_number` UInt64,", " `block_timestamp` DateTime64(3, 'UTC'),", @@ -69,7 +70,7 @@ def generate_clickhouse_schema( ] elif abi_type == "function": query = [ - f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_{event_name} (", + f"CREATE TABLE IF NOT EXISTS {table_name} (", " `block_number` UInt64,", " `contract_address` String,", " `chain_id` UInt64,", From 8c952fb9cc08297ffb170b0e7d74b03cf85bdd7a Mon Sep 17 00:00:00 2001 From: Troy Date: Thu, 19 Dec 2024 19:29:29 +0000 Subject: [PATCH 12/14] fix schema condition --- indexers/utils/clickhouse_schema.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index 9a4fddb1..7c0c8b8a 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -45,12 +45,7 @@ def map_to_clickhouse_type(sol_type): return "Int256" elif sol_type == "bool": return "Bool" - elif re.search(r"\(.*\)|\[\[$", sol_type): - return "String" - elif sol_type.endswith("[]"): - base_type = sol_type[:-2] - # clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})" - # return clickhouse_type + elif sol_type.endswith("[]") or 
re.search(r"\(.*\)|\[\[$", sol_type): return "String" raise ValueError(f"Type {sol_type} not mapped") From 929bf49043736a81d30dbc726819a273dadbe95a Mon Sep 17 00:00:00 2001 From: Troy Date: Thu, 19 Dec 2024 22:47:28 +0000 Subject: [PATCH 13/14] update data insert --- extractors/src/extract.py | 2 +- extractors/src/insert.py | 4 ++-- indexers/utils/clickhouse_schema.py | 20 ++++++++++++-------- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/extractors/src/extract.py b/extractors/src/extract.py index 1f5a652e..0ebdc938 100644 --- a/extractors/src/extract.py +++ b/extractors/src/extract.py @@ -40,7 +40,7 @@ def extract_data( snx = get_synthetix(chain_config) function_name_snake = camel_to_snake(function_name) contract_name_snake = camel_to_snake(contract_name) - output_dir = f"/parquet-data/extractors/raw/{chain_config['name']}/{protocol}/{contract_name_snake}_call_{function_name_snake}" + output_dir = f"/parquet-data/extractors/raw/{chain_config['name']}/{protocol}/{contract_name_snake}_function_{function_name_snake}" # encode the call data contract = snx.contracts[package_name][contract_name]["contract"] diff --git a/extractors/src/insert.py b/extractors/src/insert.py index 059c8417..2ad61773 100644 --- a/extractors/src/insert.py +++ b/extractors/src/insert.py @@ -18,8 +18,8 @@ def insert_data(network: str, protocol: str, contract_name: str, function_name: client: Client = clickhouse_connect.get_client( host="clickhouse", port=8123, user="default" ) - table_name = f"{network}.{protocol}_{contract_name}_{function_name}" - file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network}/{protocol}/{contract_name}_call_{function_name}/*.parquet" + table_name = f"raw_{network}.{protocol}_{contract_name}_{function_name}" + file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network}/{protocol}/{contract_name}_function_{function_name}/*.parquet" query = f"insert into {table_name} select * from file('{file_path}', 'Parquet')" client.command(query) client.close() diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index 7c0c8b8a..b59dc415 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -50,10 +50,8 @@ def map_to_clickhouse_type(sol_type): raise ValueError(f"Type {sol_type} not mapped") -def generate_clickhouse_schema( - event_name, fields, network_name, protocol_name, abi_type="event" -): - table_name = f"raw_{network_name}.{protocol_name}_{event_name}" +def generate_clickhouse_schema(event_name, fields, network_name, abi_type="event"): + table_name = f"raw_{network_name}.{event_name}" if abi_type == "event": query = [ f"CREATE TABLE IF NOT EXISTS {table_name} (", @@ -110,7 +108,7 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol for event in events: event_name = to_snake(event["name"]) contract_name = to_snake(contract_name) - event_name = f"{contract_name}_event_{event_name}" + event_name = f"{protocol_name}_{contract_name}_event_{event_name}" input_names = get_abi_input_names(event) input_types = get_abi_input_types(event) @@ -120,7 +118,6 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol event_name=event_name, fields=fields, network_name=network_name, - protocol_name=protocol_name, ) client.command(schema) save_clickhouse_schema(path=path, event_name=event_name, schema=schema) @@ -131,15 +128,23 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol for f in functions: function_name = to_snake(f["name"]) contract_name = 
to_snake(contract_name) - function_name = f"{contract_name}_function_{function_name}" + function_name = f"{protocol_name}_{contract_name}_function_{function_name}" input_types = get_abi_input_types(f) input_names = get_abi_input_names(f) + input_names = [ + f"input_{ind}" if name == "" else name + for ind, name in enumerate(input_names) + ] output_types = get_abi_output_types(f) output_names = [ o["name"] if "name" in o else f"output_{ind}" for ind, o in enumerate(f["outputs"]) ] + output_names = [ + f"output_{ind}" if name == "" else name + for ind, name in enumerate(output_names) + ] all_names = input_names + output_names all_types = input_types + output_types @@ -157,7 +162,6 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol event_name=function_name, fields=fields, network_name=network_name, - protocol_name=protocol_name, abi_type="function", ) client.command(schema) From c5cc1e02e6a395a72a746ff0c18be8da2902fa73 Mon Sep 17 00:00:00 2001 From: Troy Date: Fri, 20 Dec 2024 20:03:02 +0000 Subject: [PATCH 14/14] fix insert --- extractors/src/insert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extractors/src/insert.py b/extractors/src/insert.py index 2ad61773..2895ea28 100644 --- a/extractors/src/insert.py +++ b/extractors/src/insert.py @@ -18,7 +18,7 @@ def insert_data(network: str, protocol: str, contract_name: str, function_name: client: Client = clickhouse_connect.get_client( host="clickhouse", port=8123, user="default" ) - table_name = f"raw_{network}.{protocol}_{contract_name}_{function_name}" + table_name = f"raw_{network}.{protocol}_{contract_name}_function_{function_name}" file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network}/{protocol}/{contract_name}_function_{function_name}/*.parquet" query = f"insert into {table_name} select * from file('{file_path}', 'Parquet')" client.command(query)
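
For reference, a minimal usage sketch of the ClickHouse helpers this series lands in indexers/scripts/utils.py (create_table_from_schema and insert_data_from_path), mirroring the flow of import_parquet.py after PATCH 06. The network, protocol, and event names below are hypothetical placeholders for illustration, not values taken from this series:

    import clickhouse_connect

    from utils import create_table_from_schema, insert_data_from_path

    # Hypothetical identifiers, for illustration only.
    network_name = "base_mainnet"
    protocol_name = "synthetix"
    event_name = "core_proxy_event_market_updated"

    client = clickhouse_connect.get_client(
        host="clickhouse",  # service name used by import_parquet.py in docker-compose
        port=8123,
        username="default",
    )

    # import_parquet.py creates the raw_<network> database before loading.
    db_name = f"raw_{network_name}"
    client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}")

    # Recreate the table from its saved schema file, then bulk-insert the
    # cleaned parquet files via ClickHouse's file() table function, which
    # reads from the user_files mount defined in docker-compose.
    table_name = f"{protocol_name}_{event_name}"
    schema_path = (
        f"/parquet-data/indexers/schemas/{network_name}/"
        f"{protocol_name}/{event_name}.sql"
    )
    parquet_glob = (
        "/var/lib/clickhouse/user_files/parquet-data/indexers/clean/"
        f"{network_name}/{protocol_name}/{event_name}/*.parquet"
    )

    client.command(f"drop table if exists {db_name}.{table_name}")
    create_table_from_schema(client, schema_path)
    insert_data_from_path(client, db_name, table_name, parquet_glob)

Note that create_table_from_schema and insert_data_from_path both swallow errors with a printed message rather than raising, so a load driven this way continues past individual missing schema files or malformed parquet globs.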