
VStreamer: For larger compressed transaction payloads, stream the internal contents #17239

Open
wants to merge 5 commits into base: main

Conversation

@mattlord mattlord (Contributor) commented Nov 15, 2024

Description

For larger compressed transaction payloads (> ZstdInMemoryDecompressorMaxSize) we were already streaming the internal events as we decompressed the payload, but in the vstreamer we were still reading the entire contents into memory before sending them to the consumer (vplayer).

In this PR, we stream the internal contents all the way from the binlog consumer to the vstream consumer so that we do not need to hold the entire contents, which can be tens or even hundreds of GiBs, in memory all at once. As you can see in the test/demonstration below, we allocate and use drastically less memory when processing the payloads: in this case approximately 14-18 times less.

Here's a manual test/demonstration on macOS (note that we end up with over 40 million rows in the customer table):

make build 

cd examples/local
alias vtctldclient='command vtctldclient --server=localhost:15999'

# Setup function to wait for the copy phase to complete
function wait_for_workflow_running() {
    local keyspace=customer
    local workflow=commerce2customer
    local wait_secs=900
    local result=""

    echo "Waiting for the ${workflow} workflow in the ${keyspace} keyspace to finish the copy phase..."

    for _ in $(seq 1 ${wait_secs}); do
        result=$(vtctldclient Workflow --keyspace="${keyspace}" show --workflow="${workflow}" 2>/dev/null | grep "Copy phase completed")
        if [[ ${result} != "" ]]; then
            break
        fi
        sleep 1
    done;

    if [[ ${result} == "" ]]; then
        echo "Timed out after ${wait_secs} seconds waiting for the ${workflow} workflow in the ${keyspace} keyspace to reach the running state"
    else
        echo "The ${workflow} workflow in the ${keyspace} keyspace is now running. $(sed -rn 's/.*"(Copy phase.*)".*/\1/p' <<< "${result}")."
    fi
}


./101_initial_cluster.sh; ./201_customer_tablets.sh


# Enable binlog transaction compression
for uid in $(vtctldclient GetTablets | awk '{print $1}' | cut -d- -f2 | bc); do
  command mysql -u root --socket "${VTDATAROOT}/vt_0000000${uid}/mysql.sock" -e "stop replica; set global binlog_transaction_compression=ON; start replica;"
done


# Load data in the customer table
commerce_primary_uid=$(vtctldclient GetTablets --keyspace commerce --tablet-type primary --shard "0" | awk '{print $1}' | cut -d- -f2 | bc)
table_file="${VTDATAROOT}/vt_0000000${commerce_primary_uid}/data/vt_commerce/customer.ibd"

# Generate 5MiB of initial data
size=$((5*1024*1024))
while [[ $(stat -f "%z" "${table_file}") -lt ${size} ]]; do
    command mysql -u root --socket "${VTDATAROOT}/vt_0000000${commerce_primary_uid}/mysql.sock" vt_commerce -e "insert into customer (customer_id, email) values (${RANDOM}*${RANDOM}, '${RANDOM}[email protected]')" 2> /dev/null
done

say "Initial data load completed"

# Grow that to at least 2GiB
size=$((2*1024*1024*1024))
i=1
while [[ $(stat -f "%z" "${table_file}") -lt ${size} ]]; do
    command mysql -u root --socket "${VTDATAROOT}/vt_0000000${commerce_primary_uid}/mysql.sock" vt_commerce -e "insert into customer (email) select concat(${i}, email) from customer limit 5000000"
    let i=i+1
done

say "Full data load completed"

# Move the customer table from commerce to customer
vtctldclient MoveTables --workflow commerce2customer --target-keyspace customer create --source-keyspace commerce --tables "customer,corder" --tablet-types="primary"
wait_for_workflow_running

say "Workflow is running"

rm -f /tmp/vttablet-sample

sample $(cat ${VTDATAROOT}/vt_0000000${commerce_primary_uid}/vttablet.pid) 600 -file /tmp/vttablet-sample &

command mysql -u root --socket "${VTDATAROOT}/vt_0000000${commerce_primary_uid}/mysql.sock" vt_commerce -e "update customer set email = concat('update1_', email) limit 10000000"

say "Bulk update complete"


# Wait for the transaction payload event to be processed
while [[ $(curl -s "http://localhost:15${commerce_primary_uid}/debug/vars" | jq -r '.CompressedTransactionPayloadsViaStream') -ne 1 ]]; do
    sleep 1
done
# Wait for the workflow to catch up
sleep 5
while [[ $(vtctldclient MoveTables --target-keyspace=customer --workflow=commerce2customer show --compact --include-logs=false | jq -r '.workflows[0].max_v_replication_transaction_lag') -gt 1 ]]; do
    sleep 1
done
say "Bulk update has been successfully replicated"

kill -INT %1
grep "Physical footprint" /tmp/vttablet-sample
rm /tmp/vttablet-sample

say "Update test complete"


sample $(cat ${VTDATAROOT}/vt_0000000${commerce_primary_uid}/vttablet.pid) 1200 -file /tmp/vttablet-sample &

command mysql -u root --socket "${VTDATAROOT}/vt_0000000${commerce_primary_uid}/mysql.sock" vt_commerce -e "update customer set email = concat('update2_', email) limit 20000000"

say "Bulk update two complete"


# Wait for the transaction payload event to be processed
while [[ $(curl -s "http://localhost:15${commerce_primary_uid}/debug/vars" | jq -r '.CompressedTransactionPayloadsViaStream') -ne 2 ]]; do
    sleep 1
done
# Wait for the workflow to catch up
sleep 5
while [[ $(vtctldclient MoveTables --target-keyspace=customer --workflow=commerce2customer show --compact --include-logs=false | jq -r '.workflows[0].max_v_replication_transaction_lag') -gt 1 ]]; do
    sleep 1
done
say "Bulk update has been successfully replicated"

kill -INT %1
grep "Physical footprint" /tmp/vttablet-sample
rm /tmp/vttablet-sample

say "Update test two complete"


# Do a vdiff to be sure everything was replicated correctly
vtctldclient vdiff --target-keyspace customer --workflow commerce2customer create --tablet-types=primary --wait

Results on the PR branch:

update1:
Physical footprint:         310.5M
Physical footprint (peak):  310.5M

update2:
Physical footprint:         586.5M
Physical footprint (peak):  747.3M

Results on the main branch:

update1:
Physical footprint:         5.1G
Physical footprint (peak):  5.7G

update2:
Physical footprint:         10.3G
Physical footprint (peak):  10.9G

Related Issue(s)

Checklist

  • "Backport to:" labels have been added if this change should be back-ported to release branches
  • If this change is to be back-ported to previous releases, a justification is included in the PR description
  • Tests were added or are not required
  • Did the new or modified tests pass consistently locally and on CI?
  • Documentation was added or is not required

For larger payloads (> ZstdInMemoryDecompressorMaxSize) we were already
streaming the internal events as we decompressed the payload, but in
the vstreamer we were still reading the entire contents into memory
before sending them to the consumer (vplayer).

With this we stream the internal contents all the way from the binlog
consumer to the vstream consumer so that we do not need to hold the
entire contents, which can be 10s of GiBs, in memory all at once.

Signed-off-by: Matt Lord <[email protected]>

vitess-bot bot commented Nov 15, 2024

Review Checklist

Hello reviewers! 👋 Please follow this checklist when reviewing this Pull Request.

General

  • Ensure that the Pull Request has a descriptive title.
  • Ensure there is a link to an issue (except for internal cleanup and flaky test fixes), new features should have an RFC that documents use cases and test cases.

Tests

  • Bug fixes should have at least one unit or end-to-end test, enhancement and new features should have a sufficient number of tests.

Documentation

  • Apply the release notes (needs details) label if users need to know about this change.
  • New features should be documented.
  • There should be some code comments as to why things are implemented the way they are.
  • There should be a comment at the top of each new or modified test to explain what the test does.

New flags

  • Is this flag really necessary?
  • Flag names must be clear and intuitive, use dashes (-), and have a clear help text.

If a workflow is added or modified:

  • Each item in Jobs should be named in order to mark it as required.
  • If the workflow needs to be marked as required, the maintainer team must be notified.

Backward compatibility

  • Protobuf changes should be wire-compatible.
  • Changes to _vt tables and RPCs need to be backward compatible.
  • RPC changes should be compatible with vitess-operator
  • If a flag is removed, then it should also be removed from vitess-operator and arewefastyet, if used there.
  • vtctl command output order should be stable and awk-able.

@vitess-bot vitess-bot bot added NeedsBackportReason If backport labels have been applied to a PR, a justification is required NeedsDescriptionUpdate The description is not clear or comprehensive enough, and needs work NeedsIssue A linked issue is missing for this Pull Request NeedsWebsiteDocsUpdate What it says labels Nov 15, 2024
@mattlord mattlord changed the title For larger payloads, stream the internal contents VStreamer: For larger payloads, stream the internal contents Nov 15, 2024
@mattlord mattlord changed the title VStreamer: For larger payloads, stream the internal contents VStreamer: For larger compressed transaction payloads, stream the internal contents Nov 15, 2024
@github-actions github-actions bot added this to the v22.0.0 milestone Nov 15, 2024
@mattlord mattlord removed the NeedsBackportReason If backport labels have been applied to a PR, a justification is required label Nov 15, 2024

codecov bot commented Nov 15, 2024

Codecov Report

Attention: Patch coverage is 16.66667% with 20 lines in your changes missing coverage. Please review.

Project coverage is 67.39%. Comparing base (216fd70) to head (0bc59bc).

Files with missing lines Patch % Lines
go/vt/vttablet/tabletserver/vstreamer/vstreamer.go 9.09% 20 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main   #17239   +/-   ##
=======================================
  Coverage   67.39%   67.39%           
=======================================
  Files        1570     1570           
  Lines      252917   252937   +20     
=======================================
+ Hits       170446   170460   +14     
- Misses      82471    82477    +6     


@mattlord mattlord removed NeedsDescriptionUpdate The description is not clear or comprehensive enough, and needs work NeedsWebsiteDocsUpdate What it says NeedsIssue A linked issue is missing for this Pull Request labels Nov 15, 2024