Skip to content

Commit

Permalink
Merge pull request #86 from GoogleCloudPlatform/5789B3FF9E750C15D6B74…
Browse files Browse the repository at this point in the history
…524C048BD98

Project import generated by Copybara.
  • Loading branch information
gauravsnj authored Nov 9, 2022
2 parents 1ce6d3b + bba8a59 commit d3ce8c2
Show file tree
Hide file tree
Showing 66 changed files with 1,097 additions and 369 deletions.
12 changes: 12 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
bazel-*
.git
.gitignore
*.md
LICENSE
.dockerignore
build/docker/
build/gcb/
build/kokoro/
copybara
METADATA
OWNERS*
8 changes: 4 additions & 4 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ http_archive(

load("@io_bazel_rules_go//go:deps.bzl", "go_register_toolchains", "go_rules_dependencies")

go_register_toolchains(version = "1.19.1")
go_register_toolchains(version = "1.19.2")

http_archive(
name = "bazel_gazelle",
Expand Down Expand Up @@ -278,12 +278,12 @@ http_archive(

http_archive(
name = "com_google_zetasql",
url = "https://github.com/google/zetasql/archive/2022.08.1.tar.gz",
strip_prefix = "zetasql-2022.08.1",
url = "https://github.com/google/zetasql/archive/177d495a064e38684c462cf883e22428273bd996.tar.gz",
strip_prefix = "zetasql-177d495a064e38684c462cf883e22428273bd996",
# Patches applied:
# - Give visibility to ZetaSQL's base library to reuse some utilities
patches = ["//build/bazel:zetasql.patch"],
sha256 = "4c9611fa2fc2bde0e7877ff36fa3ebc0400477a2fe86589025d49a06897e5296",
sha256 = "4092dce28d3fb5b0071d0268bcb3ba13e28eb4f981c5c267688b8c3590ca7705",
)

http_archive(
Expand Down
File renamed without changes.
5 changes: 1 addition & 4 deletions backend/database/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ cc_library(
"//backend/schema/updater:scoped_schema_change_lock",
"//backend/storage",
"//backend/storage:in_memory_storage",
"//backend/transaction:actions",
"//backend/transaction:read_only_transaction",
"//backend/transaction:read_write_transaction",
"//common:clock",
Expand All @@ -46,7 +45,6 @@ cc_library(
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/time",
"@com_google_absl//absl/types:span",
"@com_google_absl//absl/types:variant",
"@com_google_zetasql//zetasql/public:type",
],
Expand All @@ -60,10 +58,9 @@ cc_test(
deps = [
":database",
"//backend/access:read",
"//backend/common:ids",
"//backend/datamodel:key_set",
"//backend/schema/updater:schema_updater",
"//backend/transaction:read_only_transaction",
"//backend/transaction:read_write_transaction",
"//common:clock",
"//common:errors",
"//tests/common:proto_matchers",
Expand Down
26 changes: 11 additions & 15 deletions backend/database/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,17 @@

#include "absl/memory/memory.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/variant.h"
#include "backend/actions/manager.h"
#include "backend/common/ids.h"
#include "backend/locking/manager.h"
#include "backend/locking/request.h"
#include "backend/query/query_engine.h"
#include "backend/schema/catalog/versioned_catalog.h"
#include "backend/schema/updater/schema_updater.h"
#include "backend/schema/updater/scoped_schema_change_lock.h"
#include "backend/storage/in_memory_storage.h"
#include "backend/transaction/actions.h"
#include "backend/transaction/options.h"
#include "absl/status/status.h"
#include "zetasql/base/status_macros.h"
Expand All @@ -53,7 +48,7 @@ namespace backend {
Database::Database() : transaction_id_generator_(1) {}

absl::StatusOr<std::unique_ptr<Database>> Database::Create(
Clock* clock, const std::vector<std::string>& create_statements) {
Clock* clock, const SchemaChangeOperation& schema_change_operation) {
auto database = absl::WrapUnique(new Database());
database->clock_ = clock;
database->storage_ = std::make_unique<InMemoryStorage>();
Expand All @@ -63,13 +58,13 @@ absl::StatusOr<std::unique_ptr<Database>> Database::Create(
std::make_unique<QueryEngine>(database->type_factory_.get());
database->action_manager_ = std::make_unique<ActionManager>();

if (create_statements.empty()) {
if (schema_change_operation.statements.empty()) {
database->versioned_catalog_ = std::make_unique<VersionedCatalog>();
} else {
SchemaUpdater updater;
ZETASQL_ASSIGN_OR_RETURN(
std::unique_ptr<const Schema> schema,
updater.CreateSchemaFromDDL(create_statements,
updater.CreateSchemaFromDDL(schema_change_operation,
database->GetSchemaChangeContext()));
database->versioned_catalog_ =
std::make_unique<VersionedCatalog>(std::move(schema));
Expand Down Expand Up @@ -107,11 +102,11 @@ SchemaChangeContext Database::GetSchemaChangeContext() {
};
}

absl::Status Database::UpdateSchema(absl::Span<const std::string> statements,
int* num_succesful_statements,
absl::Time* commit_timestamp,
absl::Status* backfill_status) {
if (statements.empty()) {
absl::Status Database::UpdateSchema(
const SchemaChangeOperation& schema_change_operation,
int* num_succesful_statements, absl::Time* commit_timestamp,
absl::Status* backfill_status) {
if (schema_change_operation.statements.empty()) {
return error::UpdateDatabaseMissingStatements();
}

Expand All @@ -130,8 +125,9 @@ absl::Status Database::UpdateSchema(absl::Span<const std::string> statements,
context.schema_change_timestamp = update_timestamp;
const Schema* existing_schema = versioned_catalog_->GetLatestSchema();
SchemaUpdater updater;
ZETASQL_ASSIGN_OR_RETURN(auto result, updater.UpdateSchemaFromDDL(
existing_schema, statements, context));
ZETASQL_ASSIGN_OR_RETURN(auto result,
updater.UpdateSchemaFromDDL(
existing_schema, schema_change_operation, context));
*commit_timestamp = update_timestamp;
*num_succesful_statements = result.num_successful_statements;
*backfill_status = result.backfill_status;
Expand Down
20 changes: 10 additions & 10 deletions backend/database/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/span.h"
#include "absl/types/variant.h"
#include "backend/actions/manager.h"
#include "backend/common/ids.h"
Expand Down Expand Up @@ -57,7 +56,7 @@ class Database {
// create_statements. Returns an error if create_statements are invalid, or if
// failed to create the database.
static absl::StatusOr<std::unique_ptr<Database>> Create(
Clock* clock, const std::vector<std::string>& create_statements);
Clock* clock, const SchemaChangeOperation& schema_change_operation);

// Creates a read only transaction attached to this database.
absl::StatusOr<std::unique_ptr<ReadOnlyTransaction>>
Expand All @@ -75,15 +74,16 @@ class Database {
// incoming schema change requests will be rejected with a FAILED_PRECONDITION
// error.
//
// DDL statements in `statements` are applied one-by-one until they either all
// succeed or the first failure is encoutered.
// DDL statements in `schema_change_operation.statements` are applied
// one-by-one until they either all succeed or the first failure is
// encoutered.
//
// On return `num_successful_statements` will contain the number of
// successfully applied DDL statements and `commit_timestamp` will contain the
// timestamp at which they were applied.
//
// If all the statements in `statements` are applied succesfully, both
// `backfill_status` and the returned status will be set to
// If all the statements in `schema_change_operation.statements` are applied
// succesfully, both `backfill_status` and the returned status will be set to
// absl::OkStatus().
//
// If all the statements are semantically valid then the return status will
Expand All @@ -96,10 +96,10 @@ class Database {
// encountered while processing the backfill/verification actions for the
// statements, then the first such error will be returned in
// `backfill_status`.
absl::Status UpdateSchema(absl::Span<const std::string> statements,
int* num_succesful_statements,
absl::Time* commit_timestamp,
absl::Status* backfill_status);
absl::Status UpdateSchema(
const SchemaChangeOperation& schema_change_operation,
int* num_succesful_statements, absl::Time* commit_timestamp,
absl::Status* backfill_status);

// Retrives the current version of the schema.
const Schema* GetLatestSchema() const;
Expand Down
82 changes: 49 additions & 33 deletions backend/database/database_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#include "tests/common/proto_matchers.h"
#include "absl/status/status.h"
#include "backend/access/read.h"
#include "backend/common/ids.h"
#include "backend/datamodel/key_set.h"
#include "backend/schema/updater/schema_updater.h"
#include "backend/transaction/options.h"
#include "common/clock.h"
#include "common/errors.h"
Expand All @@ -43,7 +43,7 @@ using zetasql::values::Int64;

class DatabaseTest : public ::testing::Test {
public:
DatabaseTest() {}
DatabaseTest() = default;

ReadArg read_column(std::string table_name, std::string column_name) {
ReadArg args;
Expand All @@ -58,21 +58,24 @@ class DatabaseTest : public ::testing::Test {
};

TEST_F(DatabaseTest, CreateSuccessful) {
ZETASQL_EXPECT_OK(Database::Create(&clock_, /*create_statements=*/{}));
ZETASQL_EXPECT_OK(Database::Create(&clock_, SchemaChangeOperation{}));

ZETASQL_EXPECT_OK(Database::Create(&clock_, {R"(
std::vector<std::string> create_statements = {R"(
CREATE TABLE T(
k1 INT64,
k2 INT64,
) PRIMARY KEY(k1)
)",
R"(
CREATE INDEX I on T(k1))"}));
R"(
CREATE INDEX I on T(k1))"};

ZETASQL_EXPECT_OK(Database::Create(
&clock_, SchemaChangeOperation{.statements = create_statements}));
}

TEST_F(DatabaseTest, UpdateSchemaSuccessful) {
ZETASQL_ASSERT_OK_AND_ASSIGN(auto db,
Database::Create(&clock_, /*create_statements=*/{}));
Database::Create(&clock_, SchemaChangeOperation{}));

std::vector<std::string> update_statements = {R"(
CREATE TABLE T(
Expand All @@ -87,18 +90,22 @@ TEST_F(DatabaseTest, UpdateSchemaSuccessful) {
absl::Status backfill_status;
int completed_statements;
absl::Time commit_ts;
ZETASQL_EXPECT_OK(db->UpdateSchema(update_statements, &completed_statements,
&commit_ts, &backfill_status));
ZETASQL_EXPECT_OK(
db->UpdateSchema(SchemaChangeOperation{.statements = update_statements},
&completed_statements, &commit_ts, &backfill_status));
ZETASQL_EXPECT_OK(backfill_status);
}

TEST_F(DatabaseTest, UpdateSchemaPartialSuccess) {
ZETASQL_ASSERT_OK_AND_ASSIGN(auto db, Database::Create(&clock_, {R"(
std::vector<std::string> create_statements = {R"(
CREATE TABLE T(
k1 INT64,
k2 INT64,
) PRIMARY KEY(k1)
)"}));
)"};
ZETASQL_ASSERT_OK_AND_ASSIGN(
auto db, Database::Create(&clock_, SchemaChangeOperation{
.statements = create_statements}));

ZETASQL_ASSERT_OK_AND_ASSIGN(
std::unique_ptr<ReadWriteTransaction> txn,
Expand Down Expand Up @@ -133,8 +140,9 @@ TEST_F(DatabaseTest, UpdateSchemaPartialSuccess) {
absl::Time commit_ts;

// The statements are semantically valid, indicated by an OK return status.
ZETASQL_EXPECT_OK(db->UpdateSchema(update_statements, &completed_statements,
&commit_ts, &backfill_status));
ZETASQL_EXPECT_OK(
db->UpdateSchema(SchemaChangeOperation{.statements = update_statements},
&completed_statements, &commit_ts, &backfill_status));

// But the backfill statements fail.
EXPECT_EQ(backfill_status,
Expand All @@ -145,13 +153,15 @@ TEST_F(DatabaseTest, UpdateSchemaPartialSuccess) {
}

TEST_F(DatabaseTest, ConcurrentSchemaChangeIsAborted) {
ZETASQL_ASSERT_OK_AND_ASSIGN(auto db, Database::Create(&clock_, {
R"(
std::vector<std::string> create_statements = {R"(
CREATE TABLE T(
k1 INT64,
k2 INT64,
) PRIMARY KEY(k1))",
}));
) PRIMARY KEY(k1)
)"};
ZETASQL_ASSERT_OK_AND_ASSIGN(
auto db, Database::Create(&clock_, SchemaChangeOperation{
.statements = create_statements}));

// Initiate a Read inside a read-write transaction to acquire locks.
std::unique_ptr<RowCursor> row_cursor;
Expand All @@ -160,40 +170,46 @@ TEST_F(DatabaseTest, ConcurrentSchemaChangeIsAborted) {
db->CreateReadWriteTransaction(ReadWriteOptions(), RetryState()));
ZETASQL_EXPECT_OK(txn->Read(read_column("T", "k1"), &row_cursor));

absl::Status backfill_status;
int completed_statements;
absl::Time commit_ts;
EXPECT_EQ(
db->UpdateSchema({R"(
std::vector<std::string> update_statements = {R"(
CREATE TABLE T(
k1 INT64,
k2 INT64,
) PRIMARY KEY(k1)
)"},
)"};
absl::Status backfill_status;
int completed_statements;
absl::Time commit_ts;
EXPECT_EQ(
db->UpdateSchema(SchemaChangeOperation{.statements = update_statements},
&completed_statements, &commit_ts, &backfill_status),
error::ConcurrentSchemaChangeOrReadWriteTxnInProgress());
}

TEST_F(DatabaseTest, SchemaChangeLocksSuccesfullyReleased) {
ZETASQL_ASSERT_OK_AND_ASSIGN(auto db, Database::Create(&clock_, {R"(
std::vector<std::string> create_statements = {R"(
CREATE TABLE T(
k1 INT64,
k2 INT64,
) PRIMARY KEY(k1))"}));
) PRIMARY KEY(k1)
)"};
ZETASQL_ASSERT_OK_AND_ASSIGN(
auto db, Database::Create(&clock_, SchemaChangeOperation{
.statements = create_statements}));

// Schema update will fail.
absl::Status backfill_status;
int completed_statements;
absl::Time commit_ts;
EXPECT_FALSE(db->UpdateSchema({R"(
std::vector<std::string> update_statements = {R"(
CREATE TABLE T(
k1 INT64,
k2 INT64,
) PRIMARY KEY(k1)
)"},
&completed_statements, &commit_ts,
&backfill_status)
.ok());
)"};
absl::Status backfill_status;
int completed_statements;
absl::Time commit_ts;
EXPECT_FALSE(
db->UpdateSchema(SchemaChangeOperation{.statements = update_statements},
&completed_statements, &commit_ts, &backfill_status)
.ok());

// Can still run transactions as locks would have been released.
std::unique_ptr<RowCursor> row_cursor;
Expand Down
Loading

0 comments on commit d3ce8c2

Please sign in to comment.