From 6f3ec8fd4033931b0ab2b4955f2d69dd94cb2ccf Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 11 Jan 2024 00:24:59 +0000 Subject: [PATCH 1/5] sql/parser: add ALTER V CLUSTER START REP FROM syntax Release note: none. Epic: none. --- pkg/sql/parser/sql.y | 10 +++++++ pkg/sql/parser/testdata/alter_virtual_cluster | 8 +++++ pkg/sql/sem/tree/alter_tenant.go | 30 ++++++++++++++++--- pkg/sql/sem/tree/walk.go | 18 +++++++++++ 4 files changed, 62 insertions(+), 4 deletions(-) diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index dc0fcf1accf3..8ab19e5012a8 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -6922,6 +6922,16 @@ alter_virtual_cluster_replication_stmt: Options: *$6.tenantReplicationOptions(), } } +| ALTER virtual_cluster virtual_cluster_spec START REPLICATION OF d_expr ON d_expr opt_with_replication_options + { + /* SKIP DOC */ + $$.val = &tree.AlterTenantReplication{ + TenantSpec: $3.tenantSpec(), + ReplicationSourceTenantName: &tree.TenantSpec{IsName: true, Expr: $7.expr()}, + ReplicationSourceAddress: $9.expr(), + Options: *$10.tenantReplicationOptions(), + } + } // %Help: ALTER VIRTUAL CLUSTER SETTING - alter cluster setting overrides for virtual clusters diff --git a/pkg/sql/parser/testdata/alter_virtual_cluster b/pkg/sql/parser/testdata/alter_virtual_cluster index 67fb60a56697..a55cd31bf687 100644 --- a/pkg/sql/parser/testdata/alter_virtual_cluster +++ b/pkg/sql/parser/testdata/alter_virtual_cluster @@ -255,6 +255,14 @@ ALTER VIRTUAL CLUSTER ('foo') SET REPLICATION RETENTION = ('-2h') -- fully paren ALTER VIRTUAL CLUSTER '_' SET REPLICATION RETENTION = '_' -- literals removed ALTER VIRTUAL CLUSTER 'foo' SET REPLICATION RETENTION = '-2h' -- identifiers removed +parse +ALTER TENANT 'foo' START REPLICATION OF 'bar' ON 'baz' WITH RETENTION = '-1h' +---- +ALTER VIRTUAL CLUSTER 'foo' SET REPLICATION RETENTION = '-1h' -- normalized! +ALTER VIRTUAL CLUSTER ('foo') SET REPLICATION RETENTION = ('-1h') -- fully parenthesized +ALTER VIRTUAL CLUSTER '_' SET REPLICATION RETENTION = '_' -- literals removed +ALTER VIRTUAL CLUSTER 'foo' SET REPLICATION RETENTION = '-1h' -- identifiers removed + parse ALTER VIRTUAL CLUSTER 'foo' RENAME TO bar ---- diff --git a/pkg/sql/sem/tree/alter_tenant.go b/pkg/sql/sem/tree/alter_tenant.go index 23077428e129..c5cf0ca283dc 100644 --- a/pkg/sql/sem/tree/alter_tenant.go +++ b/pkg/sql/sem/tree/alter_tenant.go @@ -18,10 +18,15 @@ type ReplicationCutoverTime struct { // AlterTenantReplication represents an ALTER VIRTUAL CLUSTER REPLICATION statement. type AlterTenantReplication struct { - TenantSpec *TenantSpec - Command JobCommand - Cutover *ReplicationCutoverTime - Options TenantReplicationOptions + TenantSpec *TenantSpec + Command JobCommand + Cutover *ReplicationCutoverTime + ReplicationSourceTenantName *TenantSpec + // ReplicationSourceAddress is the address of the source cluster that we are + // replicating data from. + ReplicationSourceAddress Expr + + Options TenantReplicationOptions } var _ Statement = &AlterTenantReplication{} @@ -45,6 +50,23 @@ func (n *AlterTenantReplication) Format(ctx *FmtCtx) { } else if n.Command == PauseJob || n.Command == ResumeJob { ctx.WriteString(JobCommandToStatement[n.Command]) ctx.WriteString(" REPLICATION") + } else if n.ReplicationSourceTenantName != nil { + ctx.WriteString("START REPLICATION OF ") + ctx.FormatNode(n.ReplicationSourceTenantName) + ctx.WriteString(" ON ") + _, canOmitParentheses := n.ReplicationSourceAddress.(alreadyDelimitedAsSyntacticDExpr) + if !canOmitParentheses { + ctx.WriteByte('(') + } + ctx.FormatNode(n.ReplicationSourceAddress) + if !canOmitParentheses { + ctx.WriteByte(')') + } + + if !n.Options.IsDefault() { + ctx.WriteString(" WITH ") + ctx.FormatNode(&n.Options) + } } } diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index 6a0bbd8f163a..7e5de049c5e0 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -1032,6 +1032,24 @@ func (n *AlterTenantReplication) walkStmt(v Visitor) Statement { ret.Cutover.Timestamp = e } } + if n.ReplicationSourceAddress != nil { + e, changed := WalkExpr(v, n.ReplicationSourceAddress) + if changed { + if ret == n { + ret = n.copyNode() + } + ret.ReplicationSourceAddress = e + } + } + if n.ReplicationSourceTenantName != nil { + ts, changed := walkTenantSpec(v, n.ReplicationSourceTenantName) + if changed { + if ret == n { + ret = n.copyNode() + } + ret.TenantSpec = ts + } + } if n.Options.Retention != nil { e, changed := WalkExpr(v, n.Options.Retention) if changed { From 35c07bf21364742cbea15e10dd10266c385ad576 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 11 Jan 2024 00:25:48 +0000 Subject: [PATCH 2/5] streamingccl/streamclient: add PriorReplicationDetails to client Release note: none. Epic: none. --- pkg/ccl/streamingccl/streamclient/BUILD.bazel | 1 + pkg/ccl/streamingccl/streamclient/client.go | 2 ++ .../streamingccl/streamclient/client_test.go | 7 +++++ .../streamclient/partitioned_stream_client.go | 26 +++++++++++++++++++ .../streamclient/random_stream_client.go | 8 ++++++ .../stream_ingestion_processor_test.go | 7 +++++ 6 files changed, 51 insertions(+) diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index ff439321905b..cca821db4ebc 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -39,6 +39,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", + "@com_github_cockroachdb_apd_v3//:apd", "@com_github_cockroachdb_errors//:errors", "@com_github_jackc_pgconn//:pgconn", "@com_github_jackc_pgx_v4//:pgx", diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index ef14a1c6106c..9b31d4efd5b3 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -89,6 +89,8 @@ type Client interface { // Complete completes a replication stream consumption. Complete(ctx context.Context, streamID streampb.StreamID, successfulIngestion bool) error + + PriorReplicationDetails(ctx context.Context, tenant roachpb.TenantName) (string, hlc.Timestamp, error) } // Topology is a configuration of stream partitions. These are particular to a diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 36d7b2573b78..2150341b31df 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -111,6 +111,13 @@ func (sc testStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ bo return nil } +// PriorReplicationDetails implements the streamclient.Client interface. +func (sc testStreamClient) PriorReplicationDetails( + _ context.Context, _ roachpb.TenantName, +) (string, hlc.Timestamp, error) { + return "", hlc.Timestamp{}, nil +} + type testStreamSubscription struct { eventCh chan streamingccl.Event } diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 56ba69314aa4..c16f7583f841 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -13,6 +13,7 @@ import ( "net" "net/url" + "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -254,6 +255,31 @@ func (p *partitionedStreamClient) Complete( return nil } +// PriorReplicationDetails implements the streamclient.Client interface. +func (p *partitionedStreamClient) PriorReplicationDetails( + ctx context.Context, tenant roachpb.TenantName, +) (string, hlc.Timestamp, error) { + ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.PriorReplicationDetails") + defer sp.Finish() + + var srcID string + var activated string + p.mu.Lock() + defer p.mu.Unlock() + row := p.mu.srcConn.QueryRow(ctx, + `SELECT source_id, activation_time FROM [SHOW VIRTUAL CLUSTER $1 WITH PRIOR REPLICATION DETAILS]`, tenant) + if err := row.Scan(&srcID, &activated); err != nil { + return "", hlc.Timestamp{}, errors.Wrapf(err, "error querying prior replication details for %s", tenant) + } + + d, _, err := apd.NewFromString(activated) + if err != nil { + return "", hlc.Timestamp{}, err + } + ts, err := hlc.DecimalToHLC(d) + return srcID, ts, err +} + type partitionedStreamSubscription struct { err error srcConnConfig *pgx.ConnConfig diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 977235fc93b3..79eac412fcf4 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -548,6 +548,14 @@ func (m *RandomStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ return nil } +// PriorReplicationDetails implements the streamclient.Client interface. +func (p *RandomStreamClient) PriorReplicationDetails( + ctx context.Context, tenant roachpb.TenantName, +) (string, hlc.Timestamp, error) { + return "", hlc.Timestamp{}, nil + +} + type randomStreamSubscription struct { receiveFn func(ctx context.Context) error eventCh chan streamingccl.Event diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 3859868474d5..c8dd444b0f23 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -159,6 +159,13 @@ func (m *mockStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ bo return nil } +// PriorReplicationDetails implements the streamclient.Client interface. +func (m *mockStreamClient) PriorReplicationDetails( + _ context.Context, _ roachpb.TenantName, +) (string, hlc.Timestamp, error) { + return "", hlc.Timestamp{}, nil +} + // errorStreamClient always returns an error when consuming a partition. type errorStreamClient struct{ mockStreamClient } From 81ff5deaabfc17fffe396179b667a2a869194b44 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 11 Jan 2024 00:29:39 +0000 Subject: [PATCH 3/5] jobs,streamingest: remove unused DestinationTenantName Release note: none. Epic: none. --- .../streamingccl/streamingest/stream_ingestion_planning.go | 3 +-- pkg/jobs/jobspb/jobs.proto | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index cb2c49b4d1f9..e17e01ff9b4f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -301,8 +301,7 @@ func ingestionPlanHook( Span: keys.MakeTenantSpan(destinationTenantID), ReplicationTTLSeconds: retentionTTLSeconds, - DestinationTenantID: destinationTenantID, - DestinationTenantName: roachpb.TenantName(dstTenantName), + DestinationTenantID: destinationTenantID, SourceTenantName: roachpb.TenantName(sourceTenant), SourceTenantID: replicationProducerSpec.SourceTenantID, diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index c78463e79a1d..9817c0259103 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -96,9 +96,7 @@ message StreamIngestionDetails { (gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"]; - string destination_tenant_name = 9 [ - (gogoproto.nullable) = false, - (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TenantName"]; + reserved 9; // ID of the protected timestamp record that protects the destination tenant's // keyspan from GC while it is being replicated into. From d4b8ff75cb72773c03550d7468ac92b08019454f Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 11 Jan 2024 00:38:04 +0000 Subject: [PATCH 4/5] streamingest: pure refactor to pull createReplicationJob out Release note: none. Epic: none. --- .../streamingest/stream_ingestion_planning.go | 134 +++++++++++------- 1 file changed, 79 insertions(+), 55 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index e17e01ff9b4f..28df1e17461d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -265,71 +265,95 @@ func ingestionPlanHook( } } - // Create a new stream with stream client. - client, err := streamclient.NewStreamClient(ctx, streamAddress, p.ExecCfg().InternalDB) - if err != nil { - return err - } - - // Create the producer job first for the purpose of observability, user is - // able to know the producer job id immediately after executing - // CREATE VIRTUAL CLUSTER ... FROM REPLICATION. - req := streampb.ReplicationProducerRequest{} - if !options.resumeTimestamp.IsEmpty() { - req = streampb.ReplicationProducerRequest{ - ReplicationStartTime: options.resumeTimestamp, - - // NB: These are checked against any - // PreviousSourceTenant on the source's tenant - // record. - TenantID: destinationTenantID, - ClusterID: p.ExtendedEvalContext().ClusterID, - } - } - - replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(sourceTenant), req) - if err != nil { - return err - } - if err := client.Close(ctx); err != nil { - return err - } + return createReplicationJob( + ctx, + p, + streamAddress, + sourceTenant, + destinationTenantID, + retentionTTLSeconds, + options.resumeTimestamp, + jobID, + ingestionStmt, + ) + } - streamIngestionDetails := jobspb.StreamIngestionDetails{ - StreamAddress: string(streamAddress), - StreamID: uint64(replicationProducerSpec.StreamID), - Span: keys.MakeTenantSpan(destinationTenantID), - ReplicationTTLSeconds: retentionTTLSeconds, + return fn, nil, nil, false, nil +} - DestinationTenantID: destinationTenantID, +func createReplicationJob( + ctx context.Context, + p sql.PlanHookState, + streamAddress streamingccl.StreamAddress, + sourceTenant string, + destinationTenantID roachpb.TenantID, + retentionTTLSeconds int32, + resumeTimestamp hlc.Timestamp, + jobID jobspb.JobID, + stmt *tree.CreateTenantFromReplication, +) error { + + // Create a new stream with stream client. + client, err := streamclient.NewStreamClient(ctx, streamAddress, p.ExecCfg().InternalDB) + if err != nil { + return err + } - SourceTenantName: roachpb.TenantName(sourceTenant), - SourceTenantID: replicationProducerSpec.SourceTenantID, - SourceClusterID: replicationProducerSpec.SourceClusterID, - ReplicationStartTime: replicationProducerSpec.ReplicationStartTime, + // Create the producer job first for the purpose of observability, user is + // able to know the producer job id immediately after executing + // CREATE VIRTUAL CLUSTER ... FROM REPLICATION. + req := streampb.ReplicationProducerRequest{} + if !resumeTimestamp.IsEmpty() { + req = streampb.ReplicationProducerRequest{ + ReplicationStartTime: resumeTimestamp, + + // NB: These are checked against any + // PreviousSourceTenant on the source's tenant + // record. + TenantID: destinationTenantID, + ClusterID: p.ExtendedEvalContext().ClusterID, } + } - jobDescription, err := streamIngestionJobDescription(p, from, ingestionStmt) - if err != nil { - return err - } + replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(sourceTenant), req) + if err != nil { + return err + } + if err := client.Close(ctx); err != nil { + return err + } - jr := jobs.Record{ - Description: jobDescription, - Username: p.User(), - Progress: jobspb.StreamIngestionProgress{ - ReplicatedTime: options.resumeTimestamp, - }, - Details: streamIngestionDetails, - } + streamIngestionDetails := jobspb.StreamIngestionDetails{ + StreamAddress: string(streamAddress), + StreamID: uint64(replicationProducerSpec.StreamID), + Span: keys.MakeTenantSpan(destinationTenantID), + ReplicationTTLSeconds: retentionTTLSeconds, + + DestinationTenantID: destinationTenantID, + SourceTenantName: roachpb.TenantName(sourceTenant), + SourceTenantID: replicationProducerSpec.SourceTenantID, + SourceClusterID: replicationProducerSpec.SourceClusterID, + ReplicationStartTime: replicationProducerSpec.ReplicationStartTime, + } - _, err = p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn( - ctx, jr, jobID, p.InternalSQLTxn(), - ) + jobDescription, err := streamIngestionJobDescription(p, string(streamAddress), stmt) + if err != nil { return err } - return fn, nil, nil, false, nil + jr := jobs.Record{ + Description: jobDescription, + Username: p.User(), + Progress: jobspb.StreamIngestionProgress{ + ReplicatedTime: resumeTimestamp, + }, + Details: streamIngestionDetails, + } + + _, err = p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn( + ctx, jr, jobID, p.InternalSQLTxn(), + ) + return err } func init() { From 23566b2ea435d218f73936da3953598198566e45 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 11 Jan 2024 01:03:17 +0000 Subject: [PATCH 5/5] streamingest: support reversing replication direction After promoting a standby that was replicating from some primary to be its own active cluster, turning it into the new primary, it is often desirable to reverse the replication direction, so that changes made to this now-primary cluster are replicated _back_ to the former primary, now operating as a standby. Turning a formerly active, primary cluster into a replicating standby cluster is particularly common during "failback" flows, where the once primary cluster is returned to primary status after the standby had temporarily been made the active cluster. Re-promoting the primary in such cases requires it have a virtual cluster that is fully caught up with the promoted standby cluster that is serving traffic, then performing cut-over from that standby back to the primary. This _could_ be performed by creating a completely new virtual cluster in the primary cluster from a replication stream of the temporarily active standby; just like the creation of a normal secondary replicating cluster this would start by backfilling all data from the source -- the promoted standby -- and then continuously applying changes as they are streamed to it. However, in cases where this is being done on a cluster _that previously was the primary cluster_, the cluster may still have a nearly up to date copy of the virtual cluster, with only those writes that have been applied by the promoted standby after cutover missing from it. In such cases, backfilling a completely new virtual cluster from the promoted standby involves copying far more data than needed; most of that data is _already on the primary_. Instead, the new syntax `ALTER VIRTUAL CLUSTER a START REPLICATION FROM a ON x` can be used to indicate the virtual cluster 'a' should be rewound back to the time at which virtual cluster 'a' on physical cluster 'x' -- the promoted standby -- diverged from it. This will check with cluster x to confirm that its virtual cluster a was indeed replicated from the cluster running the command, and then communicate the time after which they diverged, once cluster x was made active and started accepting new writes. The cluster rewinds virtual cluster x back to that timestamp, then starts replicating from cluster x at that timestamp. Release note (enterprise change): A virtual cluster which was previously being used as the source for physical cluster replication into a standby in another cluster which has since been activated can now be reconfigured to become a standby of that now-promoted cluster, reversing the direction of the replication stream, and does so by reusing the existing data as much as possible. Epic: CRDB-34233. --- .../streamingest/alter_replication_job.go | 125 +++++++++++++++++- .../streamingest/stream_ingestion_dist.go | 31 ++++- .../streamingest/stream_ingestion_job_test.go | 6 +- .../streamingest/stream_ingestion_planning.go | 10 +- pkg/jobs/jobspb/jobs.proto | 4 + pkg/sql/exprutil/type_check.go | 2 +- pkg/sql/sem/tree/create.go | 4 +- 7 files changed, 172 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index 16792a3c6477..e76d9dda44b2 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -10,10 +10,12 @@ package streamingest import ( "context" + "fmt" "math" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -104,7 +106,8 @@ func alterReplicationJobTypeCheck( if err := exprutil.TypeCheck( ctx, alterReplicationJobOp, p.SemaCtx(), exprutil.TenantSpec{TenantSpec: alterStmt.TenantSpec}, - exprutil.Strings{alterStmt.Options.Retention}, + exprutil.TenantSpec{TenantSpec: alterStmt.ReplicationSourceTenantName}, + exprutil.Strings{alterStmt.Options.Retention, alterStmt.ReplicationSourceAddress}, ); err != nil { return false, nil, err } @@ -182,6 +185,24 @@ func alterReplicationJobHook( return nil, nil, nil, false, err } + var srcAddr, srcTenant string + if alterTenantStmt.ReplicationSourceAddress != nil { + srcAddr, err = exprEval.String(ctx, alterTenantStmt.ReplicationSourceAddress) + if err != nil { + return nil, nil, nil, false, err + } + + _, _, srcTenant, err = exprEval.TenantSpec(ctx, alterTenantStmt.ReplicationSourceTenantName) + if err != nil { + return nil, nil, nil, false, err + } + } + + retentionTTLSeconds := defaultRetentionTTLSeconds + if ret, ok := options.GetRetention(); ok { + retentionTTLSeconds = ret + } + fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { if err := utilccl.CheckEnterpriseEnabled( p.ExecCfg().Settings, @@ -198,6 +219,24 @@ func alterReplicationJobHook( if err != nil { return err } + + // If a source address is being provided, we're enabling replication into an + // existing virtual cluster. It must be inactive, and we'll verify that it + // was the cluster from which the one it will replicate was replicated, i.e. + // that we're reversing the direction of replication. We will then revert it + // to the time they diverged and pick up from there. + if alterTenantStmt.ReplicationSourceAddress != nil { + return alterTenantRestartReplication( + ctx, + p, + tenInfo, + srcAddr, + srcTenant, + retentionTTLSeconds, + alterTenantStmt, + ) + } + if tenInfo.PhysicalReplicationConsumerJobID == 0 { return errors.Newf("tenant %q (%d) does not have an active replication job", tenInfo.Name, tenInfo.ID) @@ -238,6 +277,90 @@ func alterReplicationJobHook( return fn, nil, nil, false, nil } +func alterTenantRestartReplication( + ctx context.Context, + p sql.PlanHookState, + tenInfo *mtinfopb.TenantInfo, + srcAddr string, + srcTenant string, + retentionTTLSeconds int32, + alterTenantStmt *tree.AlterTenantReplication, +) error { + dstTenantID, err := roachpb.MakeTenantID(tenInfo.ID) + if err != nil { + return err + } + + // Here, we try to prevent the user from making a few + // mistakes. Starting a replication stream into an + // existing tenant requires both that it is offline and + // that it is consistent as of the provided timestamp. + if tenInfo.ServiceMode != mtinfopb.ServiceModeNone { + return errors.Newf("cannot start replication for tenant %q (%s) in service mode %s; service mode must be %s", + tenInfo.Name, + dstTenantID, + tenInfo.ServiceMode, + mtinfopb.ServiceModeNone, + ) + } + + streamAddress := streamingccl.StreamAddress(srcAddr) + streamURL, err := streamAddress.URL() + if err != nil { + return errors.Wrap(err, "url") + } + streamAddress = streamingccl.StreamAddress(streamURL.String()) + + client, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(srcAddr), p.ExecCfg().InternalDB) + if err != nil { + return errors.Wrap(err, "creating client") + } + srcID, resumeTS, err := client.PriorReplicationDetails(ctx, roachpb.TenantName(srcTenant)) + if err != nil { + return errors.CombineErrors(errors.Wrap(err, "fetching prior replication details"), client.Close(ctx)) + } + if err := client.Close(ctx); err != nil { + return err + } + + if expected := fmt.Sprintf("%s:%s", p.ExtendedEvalContext().ClusterID, dstTenantID); srcID != expected { + return errors.Newf( + "tenant %q on specified cluster reports it was replicated from %q; %q cannot be rewound to start replication", + srcTenant, srcID, expected, + ) + } + + const revertFirst = true + + jobID := p.ExecCfg().JobRegistry.MakeJobID() + // Reset the last revert timestamp. + tenInfo.LastRevertTenantTimestamp = hlc.Timestamp{} + tenInfo.PhysicalReplicationConsumerJobID = jobID + tenInfo.DataState = mtinfopb.DataStateAdd + if err := sql.UpdateTenantRecord(ctx, p.ExecCfg().Settings, + p.InternalSQLTxn(), tenInfo); err != nil { + return err + } + + return errors.Wrap(createReplicationJob( + ctx, + p, + streamAddress, + srcTenant, + dstTenantID, + retentionTTLSeconds, + resumeTS, + revertFirst, + jobID, + &tree.CreateTenantFromReplication{ + TenantSpec: alterTenantStmt.TenantSpec, + ReplicationSourceTenantName: alterTenantStmt.ReplicationSourceTenantName, + ReplicationSourceAddress: alterTenantStmt.ReplicationSourceAddress, + Options: alterTenantStmt.Options, + }, + ), "creating replication job") +} + // alterTenantJobCutover returns the cutover timestamp that was used to initiate // the cutover process - if the command is 'ALTER VIRTUAL CLUSTER .. COMPLETE REPLICATION // TO LATEST' then the frontier high water timestamp is used. diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go index e9afd97e7dc3..3d64bf3cb6b6 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" + "github.com/cockroachdb/cockroach/pkg/ccl/revertccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -67,9 +68,35 @@ func startDistIngestion( } else { heartbeatTimestamp = initialScanTimestamp } - msg := redact.Sprintf("resuming stream (producer job %d) from %s", streamID, heartbeatTimestamp) - updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg) + + if streamProgress.InitialRevertRequired { + updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, "reverting existing data to prepare for replication") + + log.Infof(ctx, "reverting tenant %s to time %s before starting replication", details.DestinationTenantID, replicatedTime) + + spanToRevert := keys.MakeTenantSpan(details.DestinationTenantID) + if err := revertccl.RevertSpansFanout(ctx, execCtx.ExecCfg().DB, execCtx, + []roachpb.Span{spanToRevert}, + replicatedTime, + false, /* ignoreGCThreshold */ + revertccl.RevertDefaultBatchSize, + nil, /* onCompletedCallback */ + ); err != nil { + return err + } + + if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + md.Progress.GetStreamIngest().InitialRevertRequired = false + ju.UpdateProgress(md.Progress) + updateRunningStatusInternal(md, ju, jobspb.InitializingReplication, string(msg)) + return nil + }); err != nil { + return errors.Wrap(err, "failed to update job progress") + } + } else { + updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg) + } client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB, streamclient.WithStreamID(streamID)) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 80496fb122d6..a0adae445a58 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -251,8 +251,7 @@ func TestTenantStreamingFailback(t *testing.T) { sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f STOP SERVICE") waitUntilTenantServerStopped(t, serverA.SystemLayer(), "f") t.Logf("starting replication g->f") - sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f RESET DATA TO SYSTEM TIME $1::decimal", ts1) - sqlA.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER f FROM REPLICATION OF g ON $1 WITH RESUME TIMESTAMP = '%s'", ts1), serverBURL.String()) + sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f START REPLICATION OF g ON $1", serverBURL.String()) _, consumerFJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlA, roachpb.TenantName("f")) t.Logf("waiting for f@%s", ts2) replicationtestutils.WaitUntilReplicatedTime(t, @@ -281,8 +280,7 @@ func TestTenantStreamingFailback(t *testing.T) { sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g STOP SERVICE") waitUntilTenantServerStopped(t, serverB.SystemLayer(), "g") t.Logf("starting replication f->g") - sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g RESET DATA TO SYSTEM TIME $1::decimal", ts3) - sqlB.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER g FROM REPLICATION OF f ON $1 WITH RESUME TIMESTAMP = '%s'", ts3), serverAURL.String()) + sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g START REPLICATION OF f ON $1", serverAURL.String()) _, consumerGJobID = replicationtestutils.GetStreamJobIds(t, ctx, sqlB, roachpb.TenantName("g")) t.Logf("waiting for g@%s", ts3) replicationtestutils.WaitUntilReplicatedTime(t, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index 28df1e17461d..da05ab1fe582 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -177,6 +177,7 @@ func ingestionPlanHook( sourceTenant, dstTenantName, dstTenantID) } + var revertFirst bool // If we don't have a resume timestamp, make a new tenant jobID := p.ExecCfg().JobRegistry.MakeJobID() var destinationTenantID roachpb.TenantID @@ -214,6 +215,8 @@ func ingestionPlanHook( // clause and the tenant already existed. Nothing else to do. return nil } + // No revert required since this is a new tenant. + revertFirst = false } else { tenantRecord, err := sql.GetTenantRecordByName( ctx, p.ExecCfg().Settings, @@ -250,6 +253,8 @@ func ingestionPlanHook( tenantRecord.LastRevertTenantTimestamp, ) } + // No revert required, since we verified that we were already reverted. + revertFirst = false // Reset the last revert timestamp. tenantRecord.LastRevertTenantTimestamp = hlc.Timestamp{} @@ -273,6 +278,7 @@ func ingestionPlanHook( destinationTenantID, retentionTTLSeconds, options.resumeTimestamp, + revertFirst, jobID, ingestionStmt, ) @@ -289,6 +295,7 @@ func createReplicationJob( destinationTenantID roachpb.TenantID, retentionTTLSeconds int32, resumeTimestamp hlc.Timestamp, + revertFirst bool, jobID jobspb.JobID, stmt *tree.CreateTenantFromReplication, ) error { @@ -345,7 +352,8 @@ func createReplicationJob( Description: jobDescription, Username: p.User(), Progress: jobspb.StreamIngestionProgress{ - ReplicatedTime: resumeTimestamp, + ReplicatedTime: resumeTimestamp, + InitialRevertRequired: revertFirst, }, Details: streamIngestionDetails, } diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 9817c0259103..13ea902903a1 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -191,6 +191,10 @@ message StreamIngestionProgress { // the source tenant. bool initial_split_complete = 9; + // InitialRevertRequiredd is true if the stream requires an initial revert to + // the start time before it can continue (e.g. when reusing a tenant's data). + bool initial_revert_required = 10; + // Next Id: 10 } diff --git a/pkg/sql/exprutil/type_check.go b/pkg/sql/exprutil/type_check.go index 109046c724fc..1a70c569e11e 100644 --- a/pkg/sql/exprutil/type_check.go +++ b/pkg/sql/exprutil/type_check.go @@ -110,7 +110,7 @@ func (b Bools) typeCheck(ctx context.Context, op string, semaCtx *tree.SemaConte } func (ts TenantSpec) typeCheck(ctx context.Context, op string, semaCtx *tree.SemaContext) error { - if ts.All { + if ts.TenantSpec == nil || ts.All { return nil } if ts.IsName { diff --git a/pkg/sql/sem/tree/create.go b/pkg/sql/sem/tree/create.go index 7d4794f90f62..06494695f4e0 100644 --- a/pkg/sql/sem/tree/create.go +++ b/pkg/sql/sem/tree/create.go @@ -2250,7 +2250,9 @@ func (node *CreateTenantFromReplication) Format(ctx *FmtCtx) { // NB: we do not anonymize the tenant name because we assume that tenant names // do not contain sensitive information. ctx.FormatNode(node.TenantSpec) - ctx.FormatNode(node.Like) + if node.Like != nil { + ctx.FormatNode(node.Like) + } if node.ReplicationSourceAddress != nil { ctx.WriteString(" FROM REPLICATION OF ")