Skip to content
This repository has been archived by the owner on Dec 7, 2024. It is now read-only.

Add support for xt db basis #56

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions gxtdb-rs/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ impl GrpcApi for ServerMock {
) -> Result<tonic::Response<proto_api::EntityResponse>, tonic::Status> {
todo!()
}

async fn db_basis(
&self,
_request: tonic::Request<proto_api::DbBasisRequest>,
) -> Result<tonic::Response<proto_api::DbBasisResponse>, tonic::Status> {
todo!()
}
}

pub async fn client() -> gxtdb_rs::Client {
Expand Down
16 changes: 16 additions & 0 deletions resources/dbbasis.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
syntax = "proto3";

package com.xtdb.protos;
import "common.proto";

message DbBasisRequest {
OptionInt64 tx_id = 1;
OptionDatetime valid_time = 2;
OptionDatetime tx_time = 3;
}

message DbBasisResponse {
string xt_id = 1;
string valid_time = 2;
string xt_tx = 3;
}
4 changes: 3 additions & 1 deletion resources/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package com.xtdb.protos;
import "transactions.proto";
import "entity.proto";
import "common.proto";

import "dbbasis.proto";

message StatusResponse {
string version = 1;
Expand All @@ -27,4 +27,6 @@ service GrpcApi {
returns (com.xtdb.protos.EntityTxResponse);
rpc Entity(com.xtdb.protos.EntityRequest)
returns (com.xtdb.protos.EntityResponse);
rpc DbBasis(com.xtdb.protos.DbBasisRequest)
returns(com.xtdb.protos.DbBasisResponse);
}
95 changes: 95 additions & 0 deletions src/com/xtdb/protos.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
;;----------------------------------------------------------------------------------
;;----------------------------------------------------------------------------------

(declare cis->DbBasisRequest)
(declare ecis->DbBasisRequest)
(declare new-DbBasisRequest)
(declare cis->Empty)
(declare ecis->Empty)
(declare new-Empty)
Expand All @@ -41,6 +44,9 @@
(declare cis->SpeculativeTxResponse)
(declare ecis->SpeculativeTxResponse)
(declare new-SpeculativeTxResponse)
(declare cis->DbBasisResponse)
(declare ecis->DbBasisResponse)
(declare new-DbBasisResponse)
(declare cis->Delete)
(declare ecis->Delete)
(declare new-Delete)
Expand Down Expand Up @@ -197,6 +203,51 @@
;;----------------------------------------------------------------------------------
;;----------------------------------------------------------------------------------

;-----------------------------------------------------------------------------
; DbBasisRequest
;-----------------------------------------------------------------------------
(defrecord DbBasisRequest-record [tx-id valid-time tx-time]
pb/Writer
(serialize [this os]
(serdes.core/write-embedded 1 (:tx-id this) os)
(serdes.core/write-embedded 2 (:valid-time this) os)
(serdes.core/write-embedded 3 (:tx-time this) os))
pb/TypeReflection
(gettype [this]
"com.xtdb.protos.DbBasisRequest"))

(s/def ::DbBasisRequest-spec (s/keys :opt-un []))
(def DbBasisRequest-defaults {})

(defn cis->DbBasisRequest
"CodedInputStream to DbBasisRequest"
[is]
(map->DbBasisRequest-record (tag-map DbBasisRequest-defaults (fn [tag index] (case index 1 [:tx-id (ecis->OptionInt64 is)] 2 [:valid-time (ecis->OptionDatetime is)] 3 [:tx-time (ecis->OptionDatetime is)] [index (serdes.core/cis->undefined tag is)])) is)))

(defn ecis->DbBasisRequest
"Embedded CodedInputStream to DbBasisRequest"
[is]
(serdes.core/cis->embedded cis->DbBasisRequest is))

(defn new-DbBasisRequest
"Creates a new instance from a map, similar to map->DbBasisRequest except that
it properly accounts for nested messages, when applicable.
"
[init]
{:pre [(if (s/valid? ::DbBasisRequest-spec init) true (throw (ex-info "Invalid input" (s/explain-data ::DbBasisRequest-spec init))))]}
(-> (merge DbBasisRequest-defaults init)
(cond-> (some? (get init :tx-id)) (update :tx-id new-OptionInt64))
(cond-> (some? (get init :valid-time)) (update :valid-time new-OptionDatetime))
(cond-> (some? (get init :tx-time)) (update :tx-time new-OptionDatetime))
(map->DbBasisRequest-record)))

(defn pb->DbBasisRequest
"Protobuf to DbBasisRequest"
[input]
(cis->DbBasisRequest (serdes.stream/new-cis input)))

(def ^:protojure.protobuf.any/record DbBasisRequest-meta {:type "com.xtdb.protos.DbBasisRequest" :decoder pb->DbBasisRequest})

;-----------------------------------------------------------------------------
; Empty
;-----------------------------------------------------------------------------
Expand Down Expand Up @@ -524,6 +575,50 @@

(def ^:protojure.protobuf.any/record SpeculativeTxResponse-meta {:type "com.xtdb.protos.SpeculativeTxResponse" :decoder pb->SpeculativeTxResponse})

;-----------------------------------------------------------------------------
; DbBasisResponse
;-----------------------------------------------------------------------------
(defrecord DbBasisResponse-record [xt-id valid-time xt-tx]
pb/Writer
(serialize [this os]
(serdes.core/write-String 1 {:optimize true} (:xt-id this) os)
(serdes.core/write-String 2 {:optimize true} (:valid-time this) os)
(serdes.core/write-String 3 {:optimize true} (:xt-tx this) os))
pb/TypeReflection
(gettype [this]
"com.xtdb.protos.DbBasisResponse"))

(s/def :com.xtdb.protos.DbBasisResponse/xt-id string?)
(s/def :com.xtdb.protos.DbBasisResponse/valid-time string?)
(s/def :com.xtdb.protos.DbBasisResponse/xt-tx string?)
(s/def ::DbBasisResponse-spec (s/keys :opt-un [:com.xtdb.protos.DbBasisResponse/xt-id :com.xtdb.protos.DbBasisResponse/valid-time :com.xtdb.protos.DbBasisResponse/xt-tx]))
(def DbBasisResponse-defaults {:xt-id "" :valid-time "" :xt-tx ""})

(defn cis->DbBasisResponse
"CodedInputStream to DbBasisResponse"
[is]
(map->DbBasisResponse-record (tag-map DbBasisResponse-defaults (fn [tag index] (case index 1 [:xt-id (serdes.core/cis->String is)] 2 [:valid-time (serdes.core/cis->String is)] 3 [:xt-tx (serdes.core/cis->String is)] [index (serdes.core/cis->undefined tag is)])) is)))

(defn ecis->DbBasisResponse
"Embedded CodedInputStream to DbBasisResponse"
[is]
(serdes.core/cis->embedded cis->DbBasisResponse is))

(defn new-DbBasisResponse
"Creates a new instance from a map, similar to map->DbBasisResponse except that
it properly accounts for nested messages, when applicable.
"
[init]
{:pre [(if (s/valid? ::DbBasisResponse-spec init) true (throw (ex-info "Invalid input" (s/explain-data ::DbBasisResponse-spec init))))]}
(map->DbBasisResponse-record (merge DbBasisResponse-defaults init)))

(defn pb->DbBasisResponse
"Protobuf to DbBasisResponse"
[input]
(cis->DbBasisResponse (serdes.stream/new-cis input)))

(def ^:protojure.protobuf.any/record DbBasisResponse-meta {:type "com.xtdb.protos.DbBasisResponse" :decoder pb->DbBasisResponse})

;-----------------------------------------------------------------------------
; Delete
;-----------------------------------------------------------------------------
Expand Down
12 changes: 12 additions & 0 deletions src/com/xtdb/protos/GrpcApi/client.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,15 @@
:metadata metadata}]
(p/then (send-unary-params input params) (fn [_] (invoke-unary client desc output))))))

(defn DbBasis
([client params] (DbBasis client {} params))
([client metadata params]
(let [input (async/chan 1)
output (async/chan 1)
desc {:service "com.xtdb.protos.GrpcApi"
:method "DbBasis"
:input {:f com.xtdb.protos/new-DbBasisRequest :ch input}
:output {:f com.xtdb.protos/pb->DbBasisResponse :ch output}
:metadata metadata}]
(p/then (send-unary-params input params) (fn [_] (invoke-unary client desc output))))))

9 changes: 7 additions & 2 deletions src/com/xtdb/protos/GrpcApi/server.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
(SubmitTx [this param])
(SpeculativeTx [this param])
(EntityTx [this param])
(Entity [this param]))
(Entity [this param])
(DbBasis [this param]))

(def GrpcApi-service-name "com.xtdb.protos.GrpcApi")

Expand All @@ -35,10 +36,14 @@
(defn- Entity-dispatch
[ctx request]
(Entity ctx request))
(defn- DbBasis-dispatch
[ctx request]
(DbBasis ctx request))

(def ^:const rpc-metadata
[{:pkg "com.xtdb.protos" :service "GrpcApi" :method "Status" :method-fn Status-dispatch :server-streaming false :client-streaming false :input pb->Empty :output new-StatusResponse}
{:pkg "com.xtdb.protos" :service "GrpcApi" :method "SubmitTx" :method-fn SubmitTx-dispatch :server-streaming false :client-streaming false :input pb->SubmitRequest :output new-SubmitResponse}
{:pkg "com.xtdb.protos" :service "GrpcApi" :method "SpeculativeTx" :method-fn SpeculativeTx-dispatch :server-streaming false :client-streaming false :input pb->SpeculativeTxRequest :output new-SpeculativeTxResponse}
{:pkg "com.xtdb.protos" :service "GrpcApi" :method "EntityTx" :method-fn EntityTx-dispatch :server-streaming false :client-streaming false :input pb->EntityRequest :output new-EntityTxResponse}
{:pkg "com.xtdb.protos" :service "GrpcApi" :method "Entity" :method-fn Entity-dispatch :server-streaming false :client-streaming false :input pb->EntityRequest :output new-EntityResponse}])
{:pkg "com.xtdb.protos" :service "GrpcApi" :method "Entity" :method-fn Entity-dispatch :server-streaming false :client-streaming false :input pb->EntityRequest :output new-EntityResponse}
{:pkg "com.xtdb.protos" :service "GrpcApi" :method "DbBasis" :method-fn DbBasis-dispatch :server-streaming false :client-streaming false :input pb->DbBasisRequest :output new-DbBasisResponse}])
21 changes: 5 additions & 16 deletions src/gxtdb/adapters/db.clj
Original file line number Diff line number Diff line change
@@ -1,18 +1,7 @@
(ns gxtdb.adapters.db
(:require [gxtdb.adapters.tx-time :refer [->clj-time]]
[gxtdb.utils :refer [not-nil?]]
[xtdb.api :as xt]))

(defn ->db-basis [params]
(let [valid-time (some-> params :valid-time ->clj-time)
tx-time (some-> params :tx-time ->clj-time)
tx-id (some-> params :tx-id :value :some)]
(case [(not-nil? valid-time) (not-nil? tx-time) (not-nil? tx-id)]
[true true true] {::xt/valid-time valid-time}
[true false false] {::xt/valid-time valid-time}
[true true false] {::xt/valid-time valid-time ::xt/tx-time tx-time}
[true false true] {::xt/valid-time valid-time ::xt/tx tx-id}
[false true false] {::xt/tx-time tx-time}
[false false true] {::xt/tx tx-id}
nil)))
(:require [gxtdb.utils :as utils]))

(defn db-basis->proto [db-basis-response]
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change broke stuff, I would probably test it with duplication for now

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, db-basis->proto and ->db-basis are different things and had different usages

{:xt-id (-> db-basis-response :xt/id str)
:valid-time (-> db-basis-response :xtdb.api/valid-time utils/->inst-str)
:tx-time (-> db-basis-response :xtdb.api/tx-time utils/->inst-str)})
6 changes: 3 additions & 3 deletions src/gxtdb/controllers.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns gxtdb.controllers
(:require [clojure.pprint :as pprint]
[gxtdb.adapters.db :refer [->db-basis]]
[gxtdb.adapters.db :as adapters.db]
[gxtdb.adapters.entity :as adapters.entity]
[gxtdb.adapters.tx-log :as tx-log-adapter]
[gxtdb.logic.time :refer [assoc-some-time]]
Expand All @@ -19,12 +19,12 @@

(defn entity-tx [xtdb-node params]
(let [id (->id (:id-type params) (:entity-id params))
db-basis (->db-basis params)
db-basis (adapters.db/db-basis->proto params)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it breaks here

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep the old one

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would checkout src/gxtdb and start fresh

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok! I reverted ->db-basis, and now the tests are passing. Now, going to check src/gxtdb

db (open-db xtdb-node (:open-snapshot params) db-basis)]
(->> id (xt/entity-tx db) adapters.entity/entity-tx->proto)))

(defn entity [xtdb-node params]
(let [id (->id (:id-type params) (:entity-id params))
db-basis (->db-basis params)
db-basis (adapters.db/db-basis->proto params)
db (open-db xtdb-node (:open-snapshot params) db-basis)]
(->> id (xt/entity db) adapters.entity/entity->proto)))
16 changes: 8 additions & 8 deletions test/gxtdb/adapters/db_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns gxtdb.adapters.db-test
(:require [clojure.test :refer [deftest testing is]]
[xtdb.api :as xt]
[gxtdb.adapters.db :as db]))
[gxtdb.adapters.db :refer [db-basis->proto]]))

(def all-none-input
{:valid-time {:value {:none {}}}
Expand All @@ -23,16 +23,16 @@
:tx-time {:value {:some "2023-06-12T21:32:44.717-05:00"}}
:tx-id {:value {:some 3}}})

(deftest ->db-basis-test
(deftest db-basis->proto-test
(testing "When all fields are none"
(is (= nil (db/->db-basis all-none-input))))
(is (= nil (-> db-basis->proto all-none-input))))
(testing "When all tx inputs are some"
(is (= nil (db/->db-basis tx-some-inputs))))
(is (= nil (-> db-basis->proto tx-some-inputs))))
(testing "When only tx-id input is some"
(is (= {::xt/tx 3} (db/->db-basis {:tx-id {:value {:some 3}}}))))
(is (= (:tx_id 3) (-> db-basis->proto {:tx-id {:value {:some 3}}}))))
(testing "When only tx-time input is some"
(is (= {::xt/tx-time #inst "2023-06-13T02:32:44.717-00:00"} (db/->db-basis {:tx-time {:value {:some "2023-06-12T21:32:44.717-05:00"}}}))))
(is (= (:tx-time #inst "2023-06-13T02:32:44.717-00:00") (-> db-basis->proto {:tx-time {:value {:some "2023-06-12T21:32:44.717-05:00"}}}))))
(testing "When only valid-time is some"
(is (= {::xt/valid-time #inst "2023-06-13T02:32:44.717-00:00"} (db/->db-basis valid-time-input))))
(is (= (:valid-time #inst "2023-06-13T02:32:44.717-00:00") (-> db-basis->proto valid-time-input))))
(testing "When all fields are some"
(is (= {::xt/valid-time #inst "2023-06-13T02:32:44.717-00:00"} (db/->db-basis all-some-input)))))
(is (= (:valid-time #inst "2023-06-13T02:32:44.717-00:00") (-> db-basis->proto all-some-input)))))