diff --git a/cli/cmd/runtime/start.go b/cli/cmd/runtime/start.go index f18405aeeac..ea79ea64020 100644 --- a/cli/cmd/runtime/start.go +++ b/cli/cmd/runtime/start.go @@ -37,6 +37,7 @@ import ( _ "github.com/rilldata/rill/runtime/drivers/gcs" _ "github.com/rilldata/rill/runtime/drivers/https" _ "github.com/rilldata/rill/runtime/drivers/mysql" + _ "github.com/rilldata/rill/runtime/drivers/pinot" _ "github.com/rilldata/rill/runtime/drivers/postgres" _ "github.com/rilldata/rill/runtime/drivers/redshift" _ "github.com/rilldata/rill/runtime/drivers/s3" diff --git a/docs/docs/build/olap/olap.md b/docs/docs/build/olap/olap.md index 3c90a5feb07..95277bbcee6 100644 --- a/docs/docs/build/olap/olap.md +++ b/docs/docs/build/olap/olap.md @@ -25,6 +25,7 @@ Rill supports the use of several different OLAP engines to power your dashboards - [DuckDB](/reference/olap-engines/duckdb.md) - [Druid](/reference/olap-engines/druid.md) - [ClickHouse](/reference/olap-engines/clickhouse.md) +- [Pinot](/reference/olap-engines/pinot.md) :::note Additional OLAP Engines @@ -60,4 +61,12 @@ When ClickHouse has been configured as the [default OLAP engine](../../reference
![External ClickHouse tables](/img/build/connect/external-tables/external-clickhouse-table.png) -
\ No newline at end of file + + +## Pinot + +When Pinot has been configured as the [default OLAP engine](../../reference/project-files/rill-yaml.md#configuring-the-default-olap-engine) for your project, any existing external tables that Rill can read and query should be shown through the Rill Developer UI under `Tables` section in left pane. You can then create dashboards using these external Pinot tables. + +
+![External Pinot tables](/img/build/connect/external-tables/external-pinot-table.png) +
diff --git a/docs/docs/reference/olap-engines/pinot.md b/docs/docs/reference/olap-engines/pinot.md new file mode 100644 index 00000000000..d1185dc9a08 --- /dev/null +++ b/docs/docs/reference/olap-engines/pinot.md @@ -0,0 +1,99 @@ +--- +title: Pinot +description: Power Rill dashboards using Pinot +sidebar_label: Pinot +sidebar_position: 4 +--- + +## Overview + +[Apache Pinot](https://docs.pinot.apache.org/) is a real-time distributed OLAP datastore purpose-built for low-latency, high-throughput analytics, and perfect for user-facing analytical workloads. + +Rill supports connecting to an existing Pinot cluster and using it as an OLAP engine to power Rill dashboards built against [external tables](../../build/olap/olap.md#external-olap-tables). + +## Connection string (DSN) + +Rill connects to Pinot using the [Pinot Golang Client](https://docs.pinot.apache.org/users/clients/golang) and requires a connection string of the following format: `http://:@:`. +`host`and `port` should be of the Pinot Controller server. If `user` or `password` contain special characters they should be URL encoded (i.e. `p@ssword` -> `p%40ssword`). This should be set in the `connector.pinot.dsn` property in Rill. + +As an example, this typically looks something like: + +```bash +connector.pinot.dsn="https://username:password@localhost:9000" +``` + +:::info Need help connecting to Pinot? + +If you would like to connect Rill to an existing Pinot instance, please don't hesitate to [contact us](../../contact.md). We'd love to help! + +::: + +## Setting the default OLAP connection + +You'll also need to update the `olap_connector` property in your project's `rill.yaml` to change the default OLAP engine to Pinot: + +```yaml + +olap_connector: pinot + +``` + +:::note + +For more information about available properties in `rill.yaml`, see our [project YAML](../project-files/rill-yaml.md) documentation. + +::: + +:::info Interested in using multiple OLAP engines in the same project? + +Please see our [Using Multiple OLAP Engines](multiple-olap.md) page. + +::: + +## Configuring Rill Developer + +When using Rill for local development, there are two options to configure Rill to enable Pinot as an OLAP engine: +- You can set `connector.pinot.dsn` in your project's `.env` file or try pulling existing credentials locally using `rill env pull` if the project has already been deployed to Rill Cloud +- You can pass in `connector.pinot.dsn` as a variable to `rill start` directly (e.g. `rill start --var connector.pinot.dsn=...`) + +:::tip Getting DSN errors in dashboards after setting `.env`? + +If you are facing issues related to DSN connection errors in your dashboards even after setting the connection string via the project's `.env` file, try restarting Rill using the `rill start --reset` command. + +::: + +## Configuring Rill Cloud + +When deploying a Pinot-backed project to Rill Cloud, you have the following options to pass the appropriate connection string to Rill Cloud: +- Use the `rill env configure` command to set `connector.pinot.dsn` after deploying the project +- If `connector.pinot.dsn` has already been set in your project `.env`, you can push and update these variables directly in your cloud deployment by using the `rill env push` command + +:::info + +Note that you must `cd` into the Git repository that your project was deployed from before running `rill env configure`. + +::: + +## Support for Multi-Valued dimensions + +Multi-Valued dimensions needed to be defined in the dashboard yaml as expressions using `arrayToMv` function. For example if `RandomAirports` is a multi-valued column in a Pinot table then the dimension definition will look like: + +```yaml +- label: RandomAirports + expression: arrayToMv(RandomAirports) + name: RandomAirports + description: "Random Airports" + ignore: false +``` +Refer to the [Dashboard YAML](../project-files/dashboards) reference page for all dimension properties detail. + +:::note + +Pinot does not support unnest function so don't set `unnest` property to true in the dimension definition of dashboard yaml. + +::: + +## Additional Notes + +- At the moment, we do not support modeling with Pinot. If this is something you're interested in, please [contact us](../../contact.md). +- For dashboards powered by Pinot, [measure definitions](../../build/dashboards/dashboards.md#measures) are required to follow [Pinot SQL](https://docs.pinot.apache.org/users/user-guide-query/querying-pinot) syntax. \ No newline at end of file diff --git a/docs/static/img/build/connect/external-tables/external-pinot-table.png b/docs/static/img/build/connect/external-tables/external-pinot-table.png new file mode 100644 index 00000000000..7f097262f0f Binary files /dev/null and b/docs/static/img/build/connect/external-tables/external-pinot-table.png differ diff --git a/go.mod b/go.mod index afb29e588db..fc4b213f2ee 100644 --- a/go.mod +++ b/go.mod @@ -75,6 +75,7 @@ require ( github.com/snowflakedb/gosnowflake v1.8.0 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 + github.com/startreedata/pinot-client-go v0.4.0 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.27.0 github.com/testcontainers/testcontainers-go/modules/clickhouse v0.27.0 @@ -206,6 +207,7 @@ require ( github.com/go-openapi/swag v0.22.3 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-zookeeper/zk v1.0.3 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.1.0 // indirect diff --git a/go.sum b/go.sum index 58d73a611f1..30ef05dac02 100644 --- a/go.sum +++ b/go.sum @@ -706,6 +706,7 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63n github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8/go.mod h1:I0gYDMZ6Z5GRU7l58bNFSkPTFN6Yl12dsUlAZ8xy98g= +github.com/ProtonMail/go-crypto v0.0.0-20230518184743-7afd39499903/go.mod h1:8TI4H3IbrackdNgv+92dI+rhpCaLqM0IfpgCgenFvRE= github.com/ProtonMail/go-crypto v1.0.0 h1:LRuvITjQWX+WIfr930YHG2HNfjR1uOfyf5vE0kC2U78= github.com/ProtonMail/go-crypto v1.0.0/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= @@ -717,6 +718,7 @@ github.com/ViViDboarder/gotifier v0.0.0-20140619195515-0f19f3d7c54c h1:qLWjxZGLd github.com/ViViDboarder/gotifier v0.0.0-20140619195515-0f19f3d7c54c/go.mod h1:/nH+y85gO3ta3b6JtRWGA5hPIH35XJr/ZHXlfrBRx3A= github.com/XSAM/otelsql v0.27.0 h1:i9xtxtdcqXV768a5C6SoT/RkG+ue3JTOgkYInzlTOqs= github.com/XSAM/otelsql v0.27.0/go.mod h1:0mFB3TvLa7NCuhm/2nU7/b2wEtsczkj8Rey8ygO7V+A= +github.com/acomagu/bufpipe v1.0.4/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4= github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY= github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= @@ -745,6 +747,7 @@ github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2O github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg= github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= +github.com/apache/calcite-avatica-go/v5 v5.3.0/go.mod h1:xgozzeFAHCh2ZZ7NCrD4CHx9waunSMOMXLDZRj9Gn3s= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.18.1 h1:lNhK/1nqjbwbiOPDBPFJVKxgDEGSepKuTh6OLiXW8kg= github.com/apache/thrift v0.18.1/go.mod h1:rdQn/dCcDKEWjjylUeueum4vQEjG2v8v2PqriUnbr+I= @@ -1164,10 +1167,12 @@ github.com/go-fonts/liberation v0.2.0/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2H github.com/go-fonts/stix v0.1.0/go.mod h1:w/c1f0ldAUlJmLBvlbkvVXLAD+tAMqobIIQpmnUIzUY= github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI= github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic= +github.com/go-git/go-billy/v5 v5.4.1/go.mod h1:vjbugF6Fz7JIflbVpl1hJsGjSHNltrSw45YK/ukIvQg= github.com/go-git/go-billy/v5 v5.5.0 h1:yEY4yhzCDuMGSv83oGxiBotRzhwhNr8VZyphhiu+mTU= github.com/go-git/go-billy/v5 v5.5.0/go.mod h1:hmexnoNsr2SJU1Ju67OaNz5ASJY3+sHgFRpCtpDCKow= github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMje31YglSBqCdIqdhKBW8lokaMrL3uTkpGYlE2OOT4= github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII= +github.com/go-git/go-git/v5 v5.7.0/go.mod h1:coJHKEOk5kUClpsNlXrUvPrDxY3w3gjHvhcZd8Fodw8= github.com/go-git/go-git/v5 v5.12.0 h1:7Md+ndsjrzZxbddRDZjF14qK+NN56sy6wkqaVrjZtys= github.com/go-git/go-git/v5 v5.12.0/go.mod h1:FTM9VKtnI2m65hNI/TenDDDnUf2Q9FHnXYjuz9i5OEY= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -1241,6 +1246,8 @@ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8Wd github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.4 h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho= github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= +github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg= +github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gocarina/gocsv v0.0.0-20231116093920-b87c2d0e983a h1:RYfmiM0zluBJOiPDJseKLEN4BapJ42uSi9SZBQ2YyiA= github.com/gocarina/gocsv v0.0.0-20231116093920-b87c2d0e983a/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI= github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= @@ -1473,6 +1480,8 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= @@ -1496,6 +1505,7 @@ github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0/go.mod h1:N0 github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/icholy/digest v0.1.22/go.mod h1:uLAeDdWKIWNFMH0wqbwchbTQOmJWhzSnL7zmqSPqEEc= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= @@ -1566,6 +1576,12 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI= github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ= @@ -1676,6 +1692,7 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ github.com/marcboeker/go-duckdb v1.6.2 h1:BlsvrL5dFmTSOCmLG3iLTCaGgH/typTOwgfrE/IrCdI= github.com/marcboeker/go-duckdb v1.6.2/go.mod h1:WtWeqqhZoTke/Nbd7V9lnBx7I2/A/q0SAq/urGzPCMs= github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho= +github.com/matryer/is v1.2.0/go.mod h1:2fLPjFQM9rhQ15aVEtbuwhJinnOqrmgXPNdZsdwlWXA= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= @@ -1991,6 +2008,7 @@ github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I= github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8= github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4= github.com/shirou/gopsutil/v3 v3.23.11 h1:i3jP9NjCPUz7FiZKxlMnODZkdSIp2gnzfrvsu9CuWEQ= @@ -2014,6 +2032,7 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/skeema/knownhosts v1.1.1/go.mod h1:g4fPeYpque7P0xefxtGzV81ihjC8sX2IqpAoNkjxbMo= github.com/skeema/knownhosts v1.2.2 h1:Iug2P4fLmDw9f41PB6thxUkNUkJzB5i+1/exaj40L3A= github.com/skeema/knownhosts v1.2.2/go.mod h1:xYbVRSPxqBZFrdmDyMmsOs+uX1UZC3nTN3ThzgDxUwo= github.com/slack-go/slack v0.12.5 h1:ddZ6uz6XVaB+3MTDhoW04gG+Vc/M/X1ctC+wssy2cqs= @@ -2054,6 +2073,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/startreedata/pinot-client-go v0.4.0 h1:2AVI5HtvOGelgBKikMze/zlqK7IKS7SaIUJumruX1ZM= +github.com/startreedata/pinot-client-go v0.4.0/go.mod h1:nLpzufhX949nlHBG0Z02fi7xyxVTmqgMe7c1D50StWs= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/runtime/drivers/olap.go b/runtime/drivers/olap.go index 21457d31fa6..4c2f7475e4f 100644 --- a/runtime/drivers/olap.go +++ b/runtime/drivers/olap.go @@ -115,6 +115,7 @@ const ( DialectDuckDB DialectDruid DialectClickHouse + DialectPinot ) func (d Dialect) String() string { @@ -127,6 +128,8 @@ func (d Dialect) String() string { return "druid" case DialectClickHouse: return "clickhouse" + case DialectPinot: + return "pinot" default: panic("not implemented") } diff --git a/runtime/drivers/pinot/olap.go b/runtime/drivers/pinot/olap.go new file mode 100644 index 00000000000..8047e4bd9ce --- /dev/null +++ b/runtime/drivers/pinot/olap.go @@ -0,0 +1,321 @@ +package pinot + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/jmoiron/sqlx" + runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" + "github.com/rilldata/rill/runtime/drivers" + "golang.org/x/sync/errgroup" +) + +var _ drivers.OLAPStore = &connection{} + +// AddTableColumn implements drivers.OLAPStore. +func (c *connection) AddTableColumn(ctx context.Context, tableName, columnName, typ string) error { + return fmt.Errorf("pinot: data transformation not yet supported") +} + +// AlterTableColumn implements drivers.OLAPStore. +func (c *connection) AlterTableColumn(ctx context.Context, tableName, columnName, newType string) error { + return fmt.Errorf("pinot: data transformation not yet supported") +} + +// CreateTableAsSelect implements drivers.OLAPStore. +func (c *connection) CreateTableAsSelect(ctx context.Context, name string, view bool, sql string) error { + return fmt.Errorf("pinot: data transformation not yet supported") +} + +// DropTable implements drivers.OLAPStore. +func (c *connection) DropTable(ctx context.Context, name string, view bool) error { + return fmt.Errorf("pinot: data transformation not yet supported") +} + +// InsertTableAsSelect implements drivers.OLAPStore. +func (c *connection) InsertTableAsSelect(ctx context.Context, name string, byName bool, sql string) error { + return fmt.Errorf("pinot: data transformation not yet supported") +} + +// RenameTable implements drivers.OLAPStore. +func (c *connection) RenameTable(ctx context.Context, name, newName string, view bool) error { + return fmt.Errorf("pinot: data transformation not yet supported") +} + +func (c *connection) Dialect() drivers.Dialect { + return drivers.DialectPinot +} + +func (c *connection) WithConnection(ctx context.Context, priority int, longRunning, tx bool, fn drivers.WithConnectionFunc) error { + return fmt.Errorf("pinot: WithConnection not supported") +} + +func (c *connection) EstimateSize() (int64, bool) { + return 0, false +} + +func (c *connection) Exec(ctx context.Context, stmt *drivers.Statement) error { + res, err := c.Execute(ctx, stmt) + if err != nil { + return err + } + if stmt.DryRun { + return nil + } + return res.Close() +} + +func (c *connection) Execute(ctx context.Context, stmt *drivers.Statement) (*drivers.Result, error) { + if stmt.DryRun { + rows, err := c.db.QueryxContext(ctx, "EXPLAIN PLAN FOR "+stmt.Query, stmt.Args...) + if err != nil { + return nil, err + } + + return nil, rows.Close() + } + + var cancelFunc context.CancelFunc + if stmt.ExecutionTimeout != 0 { + ctx, cancelFunc = context.WithTimeout(ctx, stmt.ExecutionTimeout) + } + + rows, err := c.db.QueryxContext(ctx, stmt.Query, stmt.Args...) + if err != nil { + if cancelFunc != nil { + cancelFunc() + } + return nil, err + } + + schema, err := rowsToSchema(rows) + if err != nil { + rows.Close() + if cancelFunc != nil { + cancelFunc() + } + return nil, err + } + + r := &drivers.Result{Rows: rows, Schema: schema} + r.SetCleanupFunc(func() error { + if cancelFunc != nil { + cancelFunc() + } + return nil + }) + + return r, nil +} + +type informationSchema struct { + c *connection +} + +func (c *connection) InformationSchema() drivers.InformationSchema { + return informationSchema{c: c} +} + +func (i informationSchema) All(ctx context.Context) ([]*drivers.Table, error) { + // query /tables endpoint, for each table name, query /tables/{tableName}/schema + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, i.c.baseURL+"/tables", http.NoBody) + for k, v := range i.c.headers { + req.Header.Set(k, v) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + var tablesResp pinotTables + err = json.NewDecoder(resp.Body).Decode(&tablesResp) + if err != nil { + return nil, err + } + + tables := make([]*drivers.Table, 0, len(tablesResp.Tables)) + // fetch table schemas in parallel with concurrency of 5 + g, ctx := errgroup.WithContext(ctx) + sem := make(chan struct{}, 5) + for _, tableName := range tablesResp.Tables { + tableName := tableName + g.Go(func() error { + sem <- struct{}{} + defer func() { <-sem }() + + table, err := i.Lookup(ctx, "", "", tableName) + if err != nil { + fmt.Printf("Error fetching schema for table %s: %v\n", tableName, err) + return nil + } + tables = append(tables, table) + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + + return tables, nil +} + +func (i informationSchema) Lookup(ctx context.Context, db, schema, name string) (*drivers.Table, error) { + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, i.c.baseURL+"/tables/"+name+"/schema", http.NoBody) + for k, v := range i.c.headers { + req.Header.Set(k, v) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + var schemaResponse pinotSchema + err = json.NewDecoder(resp.Body).Decode(&schemaResponse) + if err != nil { + return nil, err + } + + unsupportedCols := make(map[string]string) + var schemaFields []*runtimev1.StructType_Field + for _, field := range schemaResponse.DateTimeFieldSpecs { + if field.DataType != "TIMESTAMP" { + unsupportedCols[field.Name] = field.DataType + "_(DATE_TIME_FIELD)" + continue + } + schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, true)}) + } + for _, field := range schemaResponse.DimensionFieldSpecs { + singleValueField := true + if field.SingleValueField != nil { + singleValueField = *field.SingleValueField + } + if !singleValueField { + // Skip array fields for now + unsupportedCols[field.Name] = field.DataType + "_ARRAY" + continue + } + schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, singleValueField)}) + } + for _, field := range schemaResponse.MetricFieldSpecs { + singleValueField := true + if field.SingleValueField != nil { + singleValueField = *field.SingleValueField + } + if !singleValueField { + // Skip array fields for now + unsupportedCols[field.Name] = field.DataType + "_ARRAY" + continue + } + schemaFields = append(schemaFields, &runtimev1.StructType_Field{Name: field.Name, Type: databaseTypeToPB(field.DataType, !field.NotNull, singleValueField)}) + } + + // Mapping the schemaResponse to your Table structure + table := &drivers.Table{ + Database: "", + DatabaseSchema: "", + Name: name, + View: false, + Schema: &runtimev1.StructType{Fields: schemaFields}, + UnsupportedCols: unsupportedCols, + } + + return table, nil +} + +func rowsToSchema(r *sqlx.Rows) (*runtimev1.StructType, error) { + if r == nil { + return nil, nil + } + + cts, err := r.ColumnTypes() + if err != nil { + return nil, err + } + + fields := make([]*runtimev1.StructType_Field, len(cts)) + for i, ct := range cts { + nullable, ok := ct.Nullable() + if !ok { + nullable = true + } + + fields[i] = &runtimev1.StructType_Field{ + Name: ct.Name(), + Type: databaseTypeToPB(ct.DatabaseTypeName(), nullable, true), + } + } + + return &runtimev1.StructType{Fields: fields}, nil +} + +func databaseTypeToPB(dbt string, nullable, singleValueField bool) *runtimev1.Type { + t := &runtimev1.Type{Nullable: nullable} + if !singleValueField { + // currently we don't support array fields, so unreachable code + t.Code = runtimev1.Type_CODE_ARRAY + t.ArrayElementType = databaseTypeToPB(dbt, false, true) + return t + } + switch dbt { + case "INT": + t.Code = runtimev1.Type_CODE_INT32 + case "LONG": + t.Code = runtimev1.Type_CODE_INT64 + case "FLOAT": + t.Code = runtimev1.Type_CODE_FLOAT32 + case "DOUBLE": + t.Code = runtimev1.Type_CODE_FLOAT64 + case "BIG_DECIMAL": + t.Code = runtimev1.Type_CODE_STRING + case "BOOLEAN": + t.Code = runtimev1.Type_CODE_BOOL + case "STRING": + t.Code = runtimev1.Type_CODE_STRING + case "TIMESTAMP": + t.Code = runtimev1.Type_CODE_TIMESTAMP + case "JSON": + t.Code = runtimev1.Type_CODE_JSON + case "BYTES": + t.Code = runtimev1.Type_CODE_BYTES + default: + t.Code = runtimev1.Type_CODE_STRING + } + + return t +} + +type pinotTables struct { + Tables []string `json:"tables"` +} + +type pinotSchema struct { + SchemaName string `json:"schemaName"` + EnableColumnBasedNullHandling bool `json:"enableColumnBasedNullHandling"` + DimensionFieldSpecs []pinotFieldSpec `json:"dimensionFieldSpecs"` + MetricFieldSpecs []pinotFieldSpec `json:"metricFieldSpecs"` + DateTimeFieldSpecs []pinotFieldSpec `json:"dateTimeFieldSpecs"` +} + +type pinotFieldSpec struct { + Name string `json:"name"` + DataType string `json:"dataType"` + SingleValueField *bool `json:"singleValueField"` + NotNull bool `json:"notNull"` + DefaultNullValue interface{} `json:"defaultNullValue"` + Format string `json:"format"` // only for timeFieldSpec + Granularity string `json:"granularity"` // only for timeFieldSpec +} diff --git a/runtime/drivers/pinot/pinot.go b/runtime/drivers/pinot/pinot.go new file mode 100644 index 00000000000..e1fe7faebc8 --- /dev/null +++ b/runtime/drivers/pinot/pinot.go @@ -0,0 +1,160 @@ +package pinot + +import ( + "context" + "fmt" + + "github.com/jmoiron/sqlx" + "github.com/rilldata/rill/runtime/drivers" + "github.com/rilldata/rill/runtime/drivers/pinot/sqldriver" + "github.com/rilldata/rill/runtime/pkg/activity" + "go.uber.org/zap" +) + +func init() { + drivers.Register("pinot", driver{}) +} + +var spec = drivers.Spec{ + DisplayName: "Pinot", + Description: "Connect to Apache Pinot.", + DocsURL: "https://docs.rilldata.com/reference/olap-engines/pinot", + ConfigProperties: []*drivers.PropertySpec{ + { + Key: "dsn", + Type: drivers.StringPropertyType, + Required: true, + DisplayName: "Connection string", + Placeholder: "http(s)://username:password@localhost:9000", + Secret: true, + }, + }, + SourceProperties: nil, + ImplementsOLAP: true, +} + +type driver struct{} + +// Open a connection to Apache Pinot using HTTP API. +func (d driver) Open(instanceID string, config map[string]any, client *activity.Client, logger *zap.Logger) (drivers.Handle, error) { + if instanceID == "" { + return nil, fmt.Errorf("pinot driver can't be shared") + } + dsn, ok := config["dsn"].(string) + if !ok || dsn == "" { + return nil, fmt.Errorf("require dsn to open pinot connection") + } + + db, err := sqlx.Open("pinot", dsn) + if err != nil { + return nil, err + } + + // very roughly approximating num queries required for a typical page load + db.SetMaxOpenConns(20) + + err = db.Ping() + if err != nil { + return nil, fmt.Errorf("pinot: %w", err) + } + + controller, headers, err := sqldriver.ParseDSN(dsn) + if err != nil { + return nil, err + } + + conn := &connection{ + db: db, + config: config, + baseURL: controller, + headers: headers, + } + return conn, nil +} + +func (d driver) Spec() drivers.Spec { + return spec +} + +func (d driver) HasAnonymousSourceAccess(ctx context.Context, src map[string]any, logger *zap.Logger) (bool, error) { + return false, fmt.Errorf("not implemented") +} + +func (d driver) TertiarySourceConnectors(ctx context.Context, src map[string]any, logger *zap.Logger) ([]string, error) { + return nil, fmt.Errorf("not implemented") +} + +type connection struct { + db *sqlx.DB + config map[string]any + baseURL string + headers map[string]string +} + +// Driver implements drivers.Connection. +func (c *connection) Driver() string { + return "pinot" +} + +// Config used to open the Connection +func (c *connection) Config() map[string]any { + return c.config +} + +// Close implements drivers.Connection. +func (c *connection) Close() error { + return c.db.Close() +} + +func (c *connection) AsRegistry() (drivers.RegistryStore, bool) { + return nil, false +} + +func (c *connection) AsCatalogStore(instanceID string) (drivers.CatalogStore, bool) { + return nil, false +} + +func (c *connection) AsRepoStore(instanceID string) (drivers.RepoStore, bool) { + return nil, false +} + +func (c *connection) AsAdmin(instanceID string) (drivers.AdminService, bool) { + return nil, false +} + +func (c *connection) AsAI(instanceID string) (drivers.AIService, bool) { + return nil, false +} + +func (c *connection) AsOLAP(instanceID string) (drivers.OLAPStore, bool) { + return c, true +} + +func (c *connection) Migrate(ctx context.Context) (err error) { + return nil +} + +func (c *connection) MigrationStatus(ctx context.Context) (current, desired int, err error) { + return 0, 0, nil +} + +func (c *connection) AsObjectStore() (drivers.ObjectStore, bool) { + return nil, false +} + +func (c *connection) AsTransporter(from, to drivers.Handle) (drivers.Transporter, bool) { + return nil, false +} + +func (c *connection) AsFileStore() (drivers.FileStore, bool) { + return nil, false +} + +func (c *connection) AsSQLStore() (drivers.SQLStore, bool) { + return nil, false +} + +// AsNotifier implements drivers.Connection. +func (c *connection) AsNotifier(properties map[string]any) (drivers.Notifier, error) { + return nil, drivers.ErrNotNotifier +} diff --git a/runtime/drivers/pinot/sqldriver/driver.go b/runtime/drivers/pinot/sqldriver/driver.go new file mode 100644 index 00000000000..6e9abe6d395 --- /dev/null +++ b/runtime/drivers/pinot/sqldriver/driver.go @@ -0,0 +1,298 @@ +package sqldriver + +import ( + "context" + "database/sql" + sqlDriver "database/sql/driver" + "encoding/base64" + "errors" + "fmt" + "io" + "math" + "math/big" + "net/url" + "reflect" + "strings" + "time" + + "github.com/startreedata/pinot-client-go/pinot" +) + +type pinotDriver struct{} + +func (d *pinotDriver) Open(dsn string) (sqlDriver.Conn, error) { + address, headers, err := ParseDSN(dsn) + if err != nil { + return nil, err + } + pinotConn, err := pinot.NewWithConfig(&pinot.ClientConfig{ + ExtraHTTPHeader: headers, + ControllerConfig: &pinot.ControllerConfig{ + ExtraControllerAPIHeaders: headers, + ControllerAddress: address, + }, + }) + if err != nil { + return nil, err + } + // We have joins and nested queries which are supported by multistage engine + pinotConn.UseMultistageEngine(true) + return &conn{pinotConn: pinotConn}, nil +} + +func init() { + sql.Register("pinot", &pinotDriver{}) +} + +type conn struct { + pinotConn *pinot.Connection +} + +func (c *conn) Prepare(query string) (sqlDriver.Stmt, error) { + return nil, fmt.Errorf("unsupported") +} + +func (c *conn) Close() error { + return nil +} + +func (c *conn) Begin() (sqlDriver.Tx, error) { + return nil, fmt.Errorf("unsupported") +} + +func (c *conn) QueryContext(ctx context.Context, query string, args []sqlDriver.NamedValue) (sqlDriver.Rows, error) { + if len(args) > 0 { + q, err := completeQuery(query, args) + if err != nil { + return nil, err + } + query = q + } + // TODO: cancel the query if ctx is done + resp, err := c.pinotConn.ExecuteSQL("", query) + if err != nil { + return nil, err + } + if resp.Exceptions != nil && len(resp.Exceptions) > 0 { + if len(resp.Exceptions) == 1 { + return nil, fmt.Errorf("query error: %q: %q", resp.Exceptions[0].ErrorCode, resp.Exceptions[0].Message) + } + errMsg := "query errors:\n" + for _, e := range resp.Exceptions { + errMsg += fmt.Sprintf("\t%q: %q\n", e.ErrorCode, e.Message) + } + return nil, errors.New(errMsg) + } + + cols := colSchema(resp.ResultTable) + + return &rows{results: resp.ResultTable, columns: cols, numRows: resp.ResultTable.GetRowCount(), currIdx: 0}, nil +} + +func (c *conn) ExecContext(ctx context.Context, query string, args []sqlDriver.NamedValue) (sqlDriver.Result, error) { + return nil, fmt.Errorf("unsupported") +} + +func (c *conn) Ping(ctx context.Context) error { + rows, err := c.QueryContext(ctx, "SELECT 1", nil) + if err != nil { + return err + } + defer rows.Close() + return nil +} + +type rows struct { + results *pinot.ResultTable + columns []column + numRows int + currIdx int +} + +func (r *rows) Columns() []string { + return r.results.DataSchema.ColumnNames +} + +func (r *rows) Close() error { + return nil +} + +func (r *rows) Next(dest []sqlDriver.Value) error { + if r.currIdx >= r.numRows { + return io.EOF + } + for i := range len(r.Columns()) { + dest[i] = r.goValue(r.currIdx, i, r.results.GetColumnDataType(i)) + } + r.currIdx++ + return nil +} + +func (r *rows) ColumnTypeScanType(index int) reflect.Type { + return r.columns[index].scanType +} + +func (r *rows) ColumnTypeDatabaseTypeName(index int) string { + return r.columns[index].pinotType +} + +type column struct { + name string + pinotType string + scanType reflect.Type +} + +func colSchema(results *pinot.ResultTable) []column { + var cols []column + for i := 0; i < results.GetColumnCount(); i++ { + cols = append(cols, column{ + name: results.GetColumnName(i), + pinotType: results.GetColumnDataType(i), + scanType: scanType(results.GetColumnDataType(i)), + }) + } + return cols +} + +func scanType(pinotType string) reflect.Type { + switch pinotType { + case "INT": + return reflect.TypeOf(int32(0)) + case "LONG": + return reflect.TypeOf(int64(0)) + case "FLOAT": + return reflect.TypeOf(float32(0)) + case "DOUBLE": + return reflect.TypeOf(float64(0)) + case "STRING": + return reflect.TypeOf("") + case "BYTES": + return reflect.TypeOf("") + case "BIG_DECIMAL": + return reflect.TypeOf(big.Float{}) + case "TIMESTAMP": + return reflect.TypeOf(time.Time{}) + case "BOOLEAN": + return reflect.TypeOf(false) + default: + return reflect.TypeOf("") + } +} + +func (r *rows) goValue(rowIdx, coldIdx int, pinotType string) interface{} { + if r.results.Get(rowIdx, coldIdx) == nil { + return nil + } + switch pinotType { + case "INT": + // check if interface is string as it may be NaN + if reflect.TypeOf(r.results.Get(rowIdx, coldIdx)).String() == "string" { + return int32(math.NaN()) + } + return r.results.GetInt(rowIdx, coldIdx) + case "LONG": + if reflect.TypeOf(r.results.Get(rowIdx, coldIdx)).String() == "string" { + return int64(math.NaN()) + } + return r.results.GetLong(rowIdx, coldIdx) + case "FLOAT": + if reflect.TypeOf(r.results.Get(rowIdx, coldIdx)).String() == "string" { + return float32(math.NaN()) + } + return r.results.GetFloat(rowIdx, coldIdx) + case "DOUBLE": + if reflect.TypeOf(r.results.Get(rowIdx, coldIdx)).String() == "string" { + return math.NaN() + } + return r.results.GetDouble(rowIdx, coldIdx) + case "STRING": + return r.results.GetString(rowIdx, coldIdx) + case "BYTES": + // return hex string as it is + return r.results.GetString(rowIdx, coldIdx) + case "BIG_DECIMAL": + return r.results.Get(rowIdx, coldIdx) + case "TIMESTAMP": + // convert iso8601 formatted string to time.Time + t, err := time.Parse("2006-01-02 15:04:05.0", r.results.GetString(rowIdx, coldIdx)) + if err != nil { + return err + } + return t + case "BOOLEAN": + return r.results.Get(rowIdx, coldIdx).(bool) + default: + return reflect.TypeOf("") + } +} + +// ParseDSN parses the DSN string to extract the controller address and basic auth credentials +func ParseDSN(dsn string) (string, map[string]string, error) { + // validate dsn - it should be a valid URL, may contain basic auth credentials + u, err := url.Parse(dsn) + if err != nil { + return "", nil, fmt.Errorf("invalid DSN: %w", err) + } + + var authHeader map[string]string + if u.User != nil { + uname := u.User.Username() + pwd, passwordSet := u.User.Password() + if uname == "" || !passwordSet { + return "", nil, fmt.Errorf("DSN should contain valid basic auth credentials") + } + // clear user info from URL so that u.String() doesn't include it + u.User = nil + authString := fmt.Sprintf("%s:%s", uname, pwd) + authHeader = map[string]string{ + "Authorization": fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(authString))), + } + } + return u.String(), authHeader, nil +} + +func completeQuery(query string, args []sqlDriver.NamedValue) (string, error) { + parts := strings.Split(query, "?") + if len(parts)-1 != len(args) { + return "", fmt.Errorf("mismatch in the number of placeholders and arguments") + } + + var sb strings.Builder + for i, part := range parts { + sb.WriteString(part) + if i < len(args) { + argStr, err := formatArg(args[i].Value) + if err != nil { + return "", err + } + sb.WriteString(argStr) + } + } + + return sb.String(), nil +} + +func formatArg(value sqlDriver.Value) (string, error) { + switch v := value.(type) { + case string: + // Escape any single quotes in the string + escaped := strings.ReplaceAll(v, "'", "''") + return fmt.Sprintf("'%s'", escaped), nil + case *big.Int, *big.Float: + // For pinot types - BIG_INT and BIG_DECIMAL - enclose in single quotes + return fmt.Sprintf("'%v'", v), nil + case []byte: + // For pinot type - BYTES - convert to Hex string and enclose in single quotes + hexString := fmt.Sprintf("%x", v) + return fmt.Sprintf("'%s'", hexString), nil + case time.Time: + // For pinot type - TIMESTAMP - convert to below ISO8601 format that it expects and enclose in single quotes + return fmt.Sprintf("'%s'", v.Format("2006-01-02 15:04:05.000Z")), nil + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool: + // For types - INT, LONG, FLOAT, DOUBLE and BOOLEAN use as-is + return fmt.Sprintf("%v", v), nil + default: + // Throw error for unsupported types + return "", fmt.Errorf("unsupported type: %T", v) + } +} diff --git a/runtime/drivers/pinot/sqldriver/sql_param_test.go b/runtime/drivers/pinot/sqldriver/sql_param_test.go new file mode 100644 index 00000000000..497a48166f8 --- /dev/null +++ b/runtime/drivers/pinot/sqldriver/sql_param_test.go @@ -0,0 +1,37 @@ +package sqldriver + +import ( + "testing" + "time" + + sqlDriver "database/sql/driver" + "github.com/stretchr/testify/require" +) + +func Test_main(t *testing.T) { + query := "SELECT * FROM users WHERE id = ? AND name = ?" + args := []sqlDriver.NamedValue{ + {Ordinal: 1, Value: 123}, + {Ordinal: 2, Value: "John"}, + } + q, err := completeQuery(query, args) + require.NoError(t, err) + require.Equal(t, "SELECT * FROM users WHERE id = 123 AND name = 'John'", q) + + args = []sqlDriver.NamedValue{ + {Ordinal: 1, Value: 123.5}, + {Ordinal: 2, Value: ""}, + } + q, err = completeQuery(query, args) + require.NoError(t, err) + require.Equal(t, "SELECT * FROM users WHERE id = 123.5 AND name = ''", q) + + now := time.Now() + args = []sqlDriver.NamedValue{ + {Ordinal: 1, Value: now}, + {Ordinal: 2, Value: true}, + } + q, err = completeQuery(query, args) + require.NoError(t, err) + require.Equal(t, "SELECT * FROM users WHERE id = '"+now.Format("2006-01-02 15:04:05.000Z")+"' AND name = true", q) +} diff --git a/runtime/queries/metricsview.go b/runtime/queries/metricsview.go index 017f75743d6..c51930935d5 100644 --- a/runtime/queries/metricsview.go +++ b/runtime/queries/metricsview.go @@ -373,11 +373,11 @@ func (builder *ExpressionBuilder) buildLikeExpression(cond *runtimev1.Condition) var clause string // Build [NOT] len(list_filter("dim", x -> x ILIKE ?)) > 0 - if unnest && builder.dialect != drivers.DialectDruid { + if unnest && builder.dialect != drivers.DialectDruid && builder.dialect != drivers.DialectPinot { clause = fmt.Sprintf("%s len(list_filter((%s), x -> x ILIKE %s)) > 0", notKeyword, leftExpr, rightExpr) } else { - if builder.dialect == drivers.DialectDruid { - // Druid does not support ILIKE + if builder.dialect == drivers.DialectDruid || builder.dialect == drivers.DialectPinot { + // Druid and Pinot does not support ILIKE clause = fmt.Sprintf("LOWER(%s) %s LIKE LOWER(CAST(%s AS VARCHAR))", leftExpr, notKeyword, rightExpr) } else { clause = fmt.Sprintf("(%s) %s ILIKE %s", leftExpr, notKeyword, rightExpr) diff --git a/runtime/queries/metricsview_aggregation.go b/runtime/queries/metricsview_aggregation.go index 792e20e86df..2fae1dce01c 100644 --- a/runtime/queries/metricsview_aggregation.go +++ b/runtime/queries/metricsview_aggregation.go @@ -90,7 +90,7 @@ func (q *MetricsViewAggregation) Resolve(ctx context.Context, rt *runtime.Runtim } defer release() - if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse { + if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse && olap.Dialect() != drivers.DialectPinot { return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } @@ -879,6 +879,9 @@ func (q *MetricsViewAggregation) buildTimestampExpr(mv *runtimev1.MetricsViewSpe return fmt.Sprintf("date_trunc('%s', %s)", dialect.ConvertToDateTruncSpecifier(dim.TimeGrain), col), nil, nil } return fmt.Sprintf("toTimezone(date_trunc('%s', toTimezone(%s::TIMESTAMP, ?)), ?)", dialect.ConvertToDateTruncSpecifier(dim.TimeGrain), col), []any{dim.TimeZone, dim.TimeZone}, nil + case drivers.DialectPinot: + // ToDateTime format truncates millis to secs because we don't support that, for example timeseries api does timestamppb.New(ts) which truncates to seconds + return fmt.Sprintf("ToDateTime(date_trunc('%s', %s, 'MILLISECONDS', ?), 'yyyy-MM-dd''T''HH:mm:ss''Z''')", dialect.ConvertToDateTruncSpecifier(dim.TimeGrain), col), []any{dim.TimeZone}, nil default: return "", nil, fmt.Errorf("unsupported dialect %q", dialect) } diff --git a/runtime/queries/metricsview_comparison_toplist.go b/runtime/queries/metricsview_comparison_toplist.go index 8cc83550460..aac3d8c979e 100644 --- a/runtime/queries/metricsview_comparison_toplist.go +++ b/runtime/queries/metricsview_comparison_toplist.go @@ -92,7 +92,7 @@ func (q *MetricsViewComparison) Resolve(ctx context.Context, rt *runtime.Runtime } defer release() - if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse { + if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse && olap.Dialect() != drivers.DialectPinot { return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } @@ -649,14 +649,14 @@ func (q *MetricsViewComparison) buildMetricsComparisonTopListSQL(mv *runtimev1.M if dialect != drivers.DialectDruid { if q.measuresMeta[m.Name].expand { columnsTuple = fmt.Sprintf( - "base.%[1]s AS %[1]s, comparison.%[1]s AS %[2]s, base.%[1]s - comparison.%[1]s AS %[3]s, (base.%[1]s - comparison.%[1]s)/comparison.%[1]s::DOUBLE AS %[4]s", + "base.%[1]s AS %[1]s, comparison.%[1]s AS %[2]s, base.%[1]s - comparison.%[1]s AS %[3]s, CAST((base.%[1]s - comparison.%[1]s) AS DOUBLE)/comparison.%[1]s AS %[4]s", safeName(m.Name), safeName(m.Name+"__previous"), safeName(m.Name+"__delta_abs"), safeName(m.Name+"__delta_rel"), ) labelTuple = fmt.Sprintf( - "base.%[1]s AS %[5]s, comparison.%[1]s AS %[2]s, base.%[1]s - comparison.%[1]s AS %[3]s, (base.%[1]s - comparison.%[1]s)/comparison.%[1]s::DOUBLE AS %[4]s", + "base.%[1]s AS %[5]s, comparison.%[1]s AS %[2]s, base.%[1]s - comparison.%[1]s AS %[3]s, CAST((base.%[1]s - comparison.%[1]s) AS DOUBLE)/comparison.%[1]s AS %[4]s", safeName(m.Name), safeName(labelMap[m.Name]+" (prev)"), safeName(labelMap[m.Name]+" (Δ)"), @@ -1126,11 +1126,7 @@ func (q *MetricsViewComparison) Export(ctx context.Context, rt *runtime.Runtime, return err } } - case drivers.DialectDruid: - if err := q.generalExport(ctx, rt, instanceID, w, opts, mv); err != nil { - return err - } - case drivers.DialectClickHouse: + case drivers.DialectDruid, drivers.DialectClickHouse, drivers.DialectPinot: if err := q.generalExport(ctx, rt, instanceID, w, opts, mv); err != nil { return err } diff --git a/runtime/queries/metricsview_rows.go b/runtime/queries/metricsview_rows.go index 539e629ed76..13651f1d625 100644 --- a/runtime/queries/metricsview_rows.go +++ b/runtime/queries/metricsview_rows.go @@ -72,7 +72,7 @@ func (q *MetricsViewRows) Resolve(ctx context.Context, rt *runtime.Runtime, inst } defer release() - if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse { + if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse && olap.Dialect() != drivers.DialectPinot { return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } @@ -152,11 +152,7 @@ func (q *MetricsViewRows) Export(ctx context.Context, rt *runtime.Runtime, insta return err } } - case drivers.DialectDruid: - if err := q.generalExport(ctx, rt, instanceID, w, opts, q.MetricsView); err != nil { - return err - } - case drivers.DialectClickHouse: + case drivers.DialectDruid, drivers.DialectClickHouse, drivers.DialectPinot: if err := q.generalExport(ctx, rt, instanceID, w, opts, q.MetricsView); err != nil { return err } diff --git a/runtime/queries/metricsview_time_range.go b/runtime/queries/metricsview_time_range.go index 2fab3a87e6d..d97e103c34c 100644 --- a/runtime/queries/metricsview_time_range.go +++ b/runtime/queries/metricsview_time_range.go @@ -77,7 +77,9 @@ func (q *MetricsViewTimeRange) Resolve(ctx context.Context, rt *runtime.Runtime, case drivers.DialectDruid: return q.resolveDruid(ctx, olap, q.MetricsView.TimeDimension, escapeMetricsViewTable(drivers.DialectDruid, q.MetricsView), policyFilter, priority) case drivers.DialectClickHouse: - return q.resolveClickHouse(ctx, olap, q.MetricsView.TimeDimension, escapeMetricsViewTable(drivers.DialectClickHouse, q.MetricsView), policyFilter, priority) + return q.resolveClickHouseAndPinot(ctx, olap, q.MetricsView.TimeDimension, escapeMetricsViewTable(drivers.DialectClickHouse, q.MetricsView), policyFilter, priority) + case drivers.DialectPinot: + return q.resolveClickHouseAndPinot(ctx, olap, q.MetricsView.TimeDimension, escapeMetricsViewTable(drivers.DialectPinot, q.MetricsView), policyFilter, priority) default: return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } @@ -232,7 +234,7 @@ func (q *MetricsViewTimeRange) resolveDruid(ctx context.Context, olap drivers.OL return nil } -func (q *MetricsViewTimeRange) resolveClickHouse(ctx context.Context, olap drivers.OLAPStore, timeDim, escapedTableName, filter string, priority int) error { +func (q *MetricsViewTimeRange) resolveClickHouseAndPinot(ctx context.Context, olap drivers.OLAPStore, timeDim, escapedTableName, filter string, priority int) error { if filter != "" { filter = fmt.Sprintf(" WHERE %s", filter) } diff --git a/runtime/queries/metricsview_timeseries.go b/runtime/queries/metricsview_timeseries.go index 8702a792eac..bc2bf51051a 100644 --- a/runtime/queries/metricsview_timeseries.go +++ b/runtime/queries/metricsview_timeseries.go @@ -169,6 +169,11 @@ func (q *MetricsViewTimeSeries) Resolve(ctx context.Context, rt *runtime.Runtime if v != nil { t = *v } + case int64: + if olap.Dialect() != drivers.DialectPinot { + panic(fmt.Sprintf("unexpected type for timestamp column: %T", v)) + } + t = time.UnixMilli(v) default: panic(fmt.Sprintf("unexpected type for timestamp column: %T", v)) } @@ -347,6 +352,9 @@ func (q *MetricsViewTimeSeries) buildMetricsTimeseriesSQL(olap drivers.OLAPStore case drivers.DialectDruid: args = append([]any{timezone}, args...) sql = q.buildDruidSQL(mv, tsAlias, selectCols, whereClause, havingClause) + case drivers.DialectPinot: + args = append([]any{timezone}, args...) + sql = q.buildPinotSQL(mv, tsAlias, selectCols, whereClause, havingClause) case drivers.DialectClickHouse: sql = q.buildClickHouseSQL(mv, tsAlias, selectCols, whereClause, havingClause, timezone) default: @@ -356,6 +364,25 @@ func (q *MetricsViewTimeSeries) buildMetricsTimeseriesSQL(olap drivers.OLAPStore return sql, tsAlias, args, nil } +func (q *MetricsViewTimeSeries) buildPinotSQL(mv *runtimev1.MetricsViewSpec, tsAlias string, selectCols []string, whereClause, havingClause string) string { + dateTruncSpecifier := drivers.DialectPinot.ConvertToDateTruncSpecifier(q.TimeGranularity) + + // TODO: handle shift, currently we add validation error for this, see runtime/validate.go + + timeClause := fmt.Sprintf("DATETRUNC('%s', %s,'MILLISECONDS', ?)", dateTruncSpecifier, safeName(mv.TimeDimension)) + sql := fmt.Sprintf( + `SELECT %s AS %s, %s FROM %s WHERE %s GROUP BY 1 %s ORDER BY 1`, + timeClause, + tsAlias, + strings.Join(selectCols, ", "), + safeName(mv.Table), + whereClause, + havingClause, + ) + + return sql +} + func (q *MetricsViewTimeSeries) buildDruidSQL(mv *runtimev1.MetricsViewSpec, tsAlias string, selectCols []string, whereClause, havingClause string) string { tsSpecifier := convertToDruidTimeFloorSpecifier(q.TimeGranularity) diff --git a/runtime/queries/metricsview_toplist.go b/runtime/queries/metricsview_toplist.go index 717539451a4..46303168362 100644 --- a/runtime/queries/metricsview_toplist.go +++ b/runtime/queries/metricsview_toplist.go @@ -73,7 +73,7 @@ func (q *MetricsViewToplist) Resolve(ctx context.Context, rt *runtime.Runtime, i } defer release() - if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse { + if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse && olap.Dialect() != drivers.DialectPinot { return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } @@ -137,11 +137,7 @@ func (q *MetricsViewToplist) Export(ctx context.Context, rt *runtime.Runtime, in return err } } - case drivers.DialectDruid: - if err := q.generalExport(ctx, rt, instanceID, w, opts); err != nil { - return err - } - case drivers.DialectClickHouse: + case drivers.DialectDruid, drivers.DialectClickHouse, drivers.DialectPinot: if err := q.generalExport(ctx, rt, instanceID, w, opts); err != nil { return err } diff --git a/runtime/queries/metricsview_totals.go b/runtime/queries/metricsview_totals.go index e1cc066d512..9993766f23f 100644 --- a/runtime/queries/metricsview_totals.go +++ b/runtime/queries/metricsview_totals.go @@ -69,7 +69,7 @@ func (q *MetricsViewTotals) Resolve(ctx context.Context, rt *runtime.Runtime, in } defer release() - if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse { + if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectClickHouse && olap.Dialect() != drivers.DialectPinot { return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } diff --git a/runtime/queries/table_columns.go b/runtime/queries/table_columns.go index 4798ab3235a..acce7af2405 100644 --- a/runtime/queries/table_columns.go +++ b/runtime/queries/table_columns.go @@ -123,7 +123,7 @@ func (q *TableColumns) Resolve(ctx context.Context, rt *runtime.Runtime, instanc } return nil }) - case drivers.DialectClickHouse, drivers.DialectDruid: + case drivers.DialectClickHouse, drivers.DialectDruid, drivers.DialectPinot: tbl, err := olap.InformationSchema().Lookup(ctx, q.Database, q.DatabaseSchema, q.TableName) if err != nil { return err diff --git a/runtime/queries/table_head.go b/runtime/queries/table_head.go index ee02c04a375..3a08da5af49 100644 --- a/runtime/queries/table_head.go +++ b/runtime/queries/table_head.go @@ -64,7 +64,7 @@ func (q *TableHead) Resolve(ctx context.Context, rt *runtime.Runtime, instanceID } defer release() - if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectClickHouse && olap.Dialect() != drivers.DialectDruid { + if olap.Dialect() != drivers.DialectDuckDB && olap.Dialect() != drivers.DialectClickHouse && olap.Dialect() != drivers.DialectDruid && olap.Dialect() != drivers.DialectPinot { return fmt.Errorf("not available for dialect '%s'", olap.Dialect()) } diff --git a/runtime/validate.go b/runtime/validate.go index 7b4787eff86..89dbafbeec0 100644 --- a/runtime/validate.go +++ b/runtime/validate.go @@ -98,6 +98,11 @@ func (r *Runtime) ValidateMetricsView(ctx context.Context, instanceID string, mv validateIndividualDimensionsAndMeasures(ctx, olap, t, mv, fields, res) } + // Pinot does have any native support for time shift using time grain specifiers + if olap.Dialect() == drivers.DialectPinot && (mv.FirstDayOfWeek > 1 || mv.FirstMonthOfYear > 1) { + res.OtherErrs = append(res.OtherErrs, fmt.Errorf("time shift not supported for Pinot dialect, so FirstDayOfWeek and FirstMonthOfYear should be 1")) + } + // Check the default theme exists if mv.DefaultTheme != "" { _, err := ctrl.Get(ctx, &runtimev1.ResourceName{Kind: ResourceKindTheme, Name: mv.DefaultTheme}, false) diff --git a/web-common/src/features/tables/TableMenuItems.svelte b/web-common/src/features/tables/TableMenuItems.svelte index 31e40de959e..84d291ab7f2 100644 --- a/web-common/src/features/tables/TableMenuItems.svelte +++ b/web-common/src/features/tables/TableMenuItems.svelte @@ -21,7 +21,7 @@ export let connector: string; export let database: string = ""; - export let databaseSchema: string; + export let databaseSchema: string = ""; export let table: string; const queryClient = useQueryClient(); diff --git a/web-common/src/features/tables/TablePreviewWorkspace.svelte b/web-common/src/features/tables/TablePreviewWorkspace.svelte index a26714b9204..87c7623935d 100644 --- a/web-common/src/features/tables/TablePreviewWorkspace.svelte +++ b/web-common/src/features/tables/TablePreviewWorkspace.svelte @@ -5,7 +5,7 @@ export let connector: string; export let database: string = ""; - export let databaseSchema: string; + export let databaseSchema: string = ""; export let table: string; diff --git a/web-common/src/features/tables/olap-config.ts b/web-common/src/features/tables/olap-config.ts index 01af70e3a89..25023e795ab 100644 --- a/web-common/src/features/tables/olap-config.ts +++ b/web-common/src/features/tables/olap-config.ts @@ -15,6 +15,8 @@ export function makeFullyQualifiedTableName( // return `${database}.${databaseSchema}.${table}`; // For now, only show the table name return table; + case "pinot": + return table; default: throw new Error(`Unsupported OLAP connector: ${connector}`); } @@ -33,6 +35,8 @@ export function makeTablePreviewHref( return `/connector/druid/${databaseSchema}/${table}`; case "duckdb": return `/connector/duckdb/${database}/${databaseSchema}/${table}`; + case "pinot": + return `/connector/pinot/${table}`; default: throw new Error(`Unsupported connector: ${connector}`); } diff --git a/web-local/src/routes/(application)/connector/pinot/+page.ts b/web-local/src/routes/(application)/connector/pinot/+page.ts new file mode 100644 index 00000000000..734f8c17c7b --- /dev/null +++ b/web-local/src/routes/(application)/connector/pinot/+page.ts @@ -0,0 +1,5 @@ +import { redirect } from "@sveltejs/kit"; + +export function load() { + throw redirect(307, `/`); +} diff --git a/web-local/src/routes/(application)/connector/pinot/[table]/+page.svelte b/web-local/src/routes/(application)/connector/pinot/[table]/+page.svelte new file mode 100644 index 00000000000..4e6f0115ada --- /dev/null +++ b/web-local/src/routes/(application)/connector/pinot/[table]/+page.svelte @@ -0,0 +1,23 @@ + + + + Rill Developer | {table} + + +