Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
117656: streamingest: support reversing replication direction r=msbutler a=dt

After promoting a standby that was replicating from some primary to be
its own active cluster, turning it into the new primary, it is often
desirable to reverse the replication direction, so that changes made to
this now-primary cluster are replicated _back_ to the former primary,
now operating as a standby.

Turning a formerly active, primary cluster into a replicating standby
cluster is particularly common during "failback" flows, where the once
primary cluster is returned to primary status after the standby had
temporarily been made the active cluster.

Re-promoting the primary in such cases requires it have a virtual cluster
that is fully caught up with the promoted standby cluster that is serving
traffic, then performing cut-over from that standby back to the primary.

This _could_ be performed by creating a completely new virtual cluster
in the primary cluster from a replication stream of the temporarily active
standby; just like the creation of a normal secondary replicating cluster
this would start by backfilling all data from the source -- the promoted
standby -- and then continuously applying changes as they are streamed
to it.

However, in cases where this is being done on a cluster _that previously
was the primary cluster_, the cluster may still have a nearly up to date
copy of the virtual cluster, with only those writes that have been applied
by the promoted standby after cutover missing from it. In such cases,
backfilling a completely new virtual cluster from the promoted standby
involves copying far more data than needed; most of that data is _already
on the primary_.

Instead, the new syntax `ALTER VIRTUAL CLUSTER a START REPLICATION FROM a ON x`
can be used to indicate the virtual cluster 'a' should be rewound back to
the time at which virtual cluster 'a' on physical cluster 'x' -- the promoted
standby -- diverged from it. This will check with cluster x to confirm that
its virtual cluster a was indeed replicated from the cluster running the
command, and then communicate the time after which they diverged, once
cluster x was made active and started accepting new writes. The cluster
rewinds virtual cluster x back to that timestamp, then starts replicating
from cluster x at that timestamp.

Release note (enterprise change): A virtual cluster which was previously being
used as the source for physical cluster replication into a standby in another
cluster which has since been activated can now be reconfigured to become a
standby of that now-promoted cluster, reversing the direction of the replication
stream, and does so by reusing the existing data as much as possible.

Epic: CRDB-34233.

Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
craig[bot] and dt committed Jan 12, 2024
2 parents 8c5b9c7 + 23566b2 commit e2ad8d7
Show file tree
Hide file tree
Showing 17 changed files with 364 additions and 72 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgx_v4//:pgx",
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Client interface {

// Complete completes a replication stream consumption.
Complete(ctx context.Context, streamID streampb.StreamID, successfulIngestion bool) error

PriorReplicationDetails(ctx context.Context, tenant roachpb.TenantName) (string, hlc.Timestamp, error)
}

// Topology is a configuration of stream partitions. These are particular to a
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (sc testStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ bo
return nil
}

// PriorReplicationDetails implements the streamclient.Client interface.
func (sc testStreamClient) PriorReplicationDetails(
_ context.Context, _ roachpb.TenantName,
) (string, hlc.Timestamp, error) {
return "", hlc.Timestamp{}, nil
}

type testStreamSubscription struct {
eventCh chan streamingccl.Event
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"net"
"net/url"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -254,6 +255,31 @@ func (p *partitionedStreamClient) Complete(
return nil
}

// PriorReplicationDetails implements the streamclient.Client interface.
func (p *partitionedStreamClient) PriorReplicationDetails(
ctx context.Context, tenant roachpb.TenantName,
) (string, hlc.Timestamp, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.PriorReplicationDetails")
defer sp.Finish()

var srcID string
var activated string
p.mu.Lock()
defer p.mu.Unlock()
row := p.mu.srcConn.QueryRow(ctx,
`SELECT source_id, activation_time FROM [SHOW VIRTUAL CLUSTER $1 WITH PRIOR REPLICATION DETAILS]`, tenant)
if err := row.Scan(&srcID, &activated); err != nil {
return "", hlc.Timestamp{}, errors.Wrapf(err, "error querying prior replication details for %s", tenant)
}

d, _, err := apd.NewFromString(activated)
if err != nil {
return "", hlc.Timestamp{}, err
}
ts, err := hlc.DecimalToHLC(d)
return srcID, ts, err
}

type partitionedStreamSubscription struct {
err error
srcConnConfig *pgx.ConnConfig
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,14 @@ func (m *RandomStreamClient) Complete(_ context.Context, _ streampb.StreamID, _
return nil
}

// PriorReplicationDetails implements the streamclient.Client interface.
func (p *RandomStreamClient) PriorReplicationDetails(
ctx context.Context, tenant roachpb.TenantName,
) (string, hlc.Timestamp, error) {
return "", hlc.Timestamp{}, nil

}

type randomStreamSubscription struct {
receiveFn func(ctx context.Context) error
eventCh chan streamingccl.Event
Expand Down
125 changes: 124 additions & 1 deletion pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ package streamingest

import (
"context"
"fmt"
"math"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -104,7 +106,8 @@ func alterReplicationJobTypeCheck(
if err := exprutil.TypeCheck(
ctx, alterReplicationJobOp, p.SemaCtx(),
exprutil.TenantSpec{TenantSpec: alterStmt.TenantSpec},
exprutil.Strings{alterStmt.Options.Retention},
exprutil.TenantSpec{TenantSpec: alterStmt.ReplicationSourceTenantName},
exprutil.Strings{alterStmt.Options.Retention, alterStmt.ReplicationSourceAddress},
); err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -182,6 +185,24 @@ func alterReplicationJobHook(
return nil, nil, nil, false, err
}

var srcAddr, srcTenant string
if alterTenantStmt.ReplicationSourceAddress != nil {
srcAddr, err = exprEval.String(ctx, alterTenantStmt.ReplicationSourceAddress)
if err != nil {
return nil, nil, nil, false, err
}

_, _, srcTenant, err = exprEval.TenantSpec(ctx, alterTenantStmt.ReplicationSourceTenantName)
if err != nil {
return nil, nil, nil, false, err
}
}

retentionTTLSeconds := defaultRetentionTTLSeconds
if ret, ok := options.GetRetention(); ok {
retentionTTLSeconds = ret
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings,
Expand All @@ -198,6 +219,24 @@ func alterReplicationJobHook(
if err != nil {
return err
}

// If a source address is being provided, we're enabling replication into an
// existing virtual cluster. It must be inactive, and we'll verify that it
// was the cluster from which the one it will replicate was replicated, i.e.
// that we're reversing the direction of replication. We will then revert it
// to the time they diverged and pick up from there.
if alterTenantStmt.ReplicationSourceAddress != nil {
return alterTenantRestartReplication(
ctx,
p,
tenInfo,
srcAddr,
srcTenant,
retentionTTLSeconds,
alterTenantStmt,
)
}

if tenInfo.PhysicalReplicationConsumerJobID == 0 {
return errors.Newf("tenant %q (%d) does not have an active replication job",
tenInfo.Name, tenInfo.ID)
Expand Down Expand Up @@ -238,6 +277,90 @@ func alterReplicationJobHook(
return fn, nil, nil, false, nil
}

func alterTenantRestartReplication(
ctx context.Context,
p sql.PlanHookState,
tenInfo *mtinfopb.TenantInfo,
srcAddr string,
srcTenant string,
retentionTTLSeconds int32,
alterTenantStmt *tree.AlterTenantReplication,
) error {
dstTenantID, err := roachpb.MakeTenantID(tenInfo.ID)
if err != nil {
return err
}

// Here, we try to prevent the user from making a few
// mistakes. Starting a replication stream into an
// existing tenant requires both that it is offline and
// that it is consistent as of the provided timestamp.
if tenInfo.ServiceMode != mtinfopb.ServiceModeNone {
return errors.Newf("cannot start replication for tenant %q (%s) in service mode %s; service mode must be %s",
tenInfo.Name,
dstTenantID,
tenInfo.ServiceMode,
mtinfopb.ServiceModeNone,
)
}

streamAddress := streamingccl.StreamAddress(srcAddr)
streamURL, err := streamAddress.URL()
if err != nil {
return errors.Wrap(err, "url")
}
streamAddress = streamingccl.StreamAddress(streamURL.String())

client, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(srcAddr), p.ExecCfg().InternalDB)
if err != nil {
return errors.Wrap(err, "creating client")
}
srcID, resumeTS, err := client.PriorReplicationDetails(ctx, roachpb.TenantName(srcTenant))
if err != nil {
return errors.CombineErrors(errors.Wrap(err, "fetching prior replication details"), client.Close(ctx))
}
if err := client.Close(ctx); err != nil {
return err
}

if expected := fmt.Sprintf("%s:%s", p.ExtendedEvalContext().ClusterID, dstTenantID); srcID != expected {
return errors.Newf(
"tenant %q on specified cluster reports it was replicated from %q; %q cannot be rewound to start replication",
srcTenant, srcID, expected,
)
}

const revertFirst = true

jobID := p.ExecCfg().JobRegistry.MakeJobID()
// Reset the last revert timestamp.
tenInfo.LastRevertTenantTimestamp = hlc.Timestamp{}
tenInfo.PhysicalReplicationConsumerJobID = jobID
tenInfo.DataState = mtinfopb.DataStateAdd
if err := sql.UpdateTenantRecord(ctx, p.ExecCfg().Settings,
p.InternalSQLTxn(), tenInfo); err != nil {
return err
}

return errors.Wrap(createReplicationJob(
ctx,
p,
streamAddress,
srcTenant,
dstTenantID,
retentionTTLSeconds,
resumeTS,
revertFirst,
jobID,
&tree.CreateTenantFromReplication{
TenantSpec: alterTenantStmt.TenantSpec,
ReplicationSourceTenantName: alterTenantStmt.ReplicationSourceTenantName,
ReplicationSourceAddress: alterTenantStmt.ReplicationSourceAddress,
Options: alterTenantStmt.Options,
},
), "creating replication job")
}

// alterTenantJobCutover returns the cutover timestamp that was used to initiate
// the cutover process - if the command is 'ALTER VIRTUAL CLUSTER .. COMPLETE REPLICATION
// TO LATEST' then the frontier high water timestamp is used.
Expand Down
31 changes: 29 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/revertccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -67,9 +68,35 @@ func startDistIngestion(
} else {
heartbeatTimestamp = initialScanTimestamp
}

msg := redact.Sprintf("resuming stream (producer job %d) from %s", streamID, heartbeatTimestamp)
updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg)

if streamProgress.InitialRevertRequired {
updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, "reverting existing data to prepare for replication")

log.Infof(ctx, "reverting tenant %s to time %s before starting replication", details.DestinationTenantID, replicatedTime)

spanToRevert := keys.MakeTenantSpan(details.DestinationTenantID)
if err := revertccl.RevertSpansFanout(ctx, execCtx.ExecCfg().DB, execCtx,
[]roachpb.Span{spanToRevert},
replicatedTime,
false, /* ignoreGCThreshold */
revertccl.RevertDefaultBatchSize,
nil, /* onCompletedCallback */
); err != nil {
return err
}

if err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
md.Progress.GetStreamIngest().InitialRevertRequired = false
ju.UpdateProgress(md.Progress)
updateRunningStatusInternal(md, ju, jobspb.InitializingReplication, string(msg))
return nil
}); err != nil {
return errors.Wrap(err, "failed to update job progress")
}
} else {
updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg)
}

client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB,
streamclient.WithStreamID(streamID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ func TestTenantStreamingFailback(t *testing.T) {
sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f STOP SERVICE")
waitUntilTenantServerStopped(t, serverA.SystemLayer(), "f")
t.Logf("starting replication g->f")
sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f RESET DATA TO SYSTEM TIME $1::decimal", ts1)
sqlA.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER f FROM REPLICATION OF g ON $1 WITH RESUME TIMESTAMP = '%s'", ts1), serverBURL.String())
sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f START REPLICATION OF g ON $1", serverBURL.String())
_, consumerFJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlA, roachpb.TenantName("f"))
t.Logf("waiting for f@%s", ts2)
replicationtestutils.WaitUntilReplicatedTime(t,
Expand Down Expand Up @@ -281,8 +280,7 @@ func TestTenantStreamingFailback(t *testing.T) {
sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g STOP SERVICE")
waitUntilTenantServerStopped(t, serverB.SystemLayer(), "g")
t.Logf("starting replication f->g")
sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g RESET DATA TO SYSTEM TIME $1::decimal", ts3)
sqlB.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER g FROM REPLICATION OF f ON $1 WITH RESUME TIMESTAMP = '%s'", ts3), serverAURL.String())
sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g START REPLICATION OF f ON $1", serverAURL.String())
_, consumerGJobID = replicationtestutils.GetStreamJobIds(t, ctx, sqlB, roachpb.TenantName("g"))
t.Logf("waiting for g@%s", ts3)
replicationtestutils.WaitUntilReplicatedTime(t,
Expand Down
Loading

0 comments on commit e2ad8d7

Please sign in to comment.