Skip to content

Commit

Permalink
base tests on raft_fixture
Browse files Browse the repository at this point in the history
tbc
  • Loading branch information
bashtanov committed May 16, 2024
1 parent 836e3c9 commit d78899a
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 358 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ set(srcs
topic_table_partition_generator_test.cc
idempotency_tests.cc
feature_barrier_test.cc
tm_stm_tests.cc
tm_coordinator_mapper_tests.cc
tx_hash_ranges_tests.cc
rm_stm_tests.cc
Expand All @@ -52,6 +51,7 @@ endforeach()
set(srcs
distributed_kv_stm_tests.cc
id_allocator_stm_test.cc
tm_stm_tests.cc
)

foreach(cluster_test_src ${srcs})
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tests/distributed_kv_stm_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ using test_value = int;
using stm_t = cluster::distributed_kv_stm<test_key, test_value>;
using stm_cssshptrr_t = const ss::shared_ptr<stm_t>&;

struct kv_stm_fixture : stm_raft_fixture<kv_stm_fixture, stm_t> {
struct kv_stm_fixture : stm_raft_fixture<stm_t> {
static constexpr auto TIMEOUT = 30s;
//
stm_shptrs_t create_stms(
Expand Down
3 changes: 1 addition & 2 deletions src/v/cluster/tests/id_allocator_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ namespace {
ss::logger idstmlog{"idstm-test"};
using stm_ssshptr_t = const ss::shared_ptr<cluster::id_allocator_stm>&;

struct id_allocator_stm_fixture
: stm_raft_fixture<id_allocator_stm_fixture, cluster::id_allocator_stm> {
struct id_allocator_stm_fixture : stm_raft_fixture<cluster::id_allocator_stm> {
//
id_allocator_stm_fixture() {
test_local_cfg.get("id_allocator_batch_size").set_value(int16_t(1));
Expand Down
304 changes: 179 additions & 125 deletions src/v/cluster/tests/tm_stm_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,182 +19,236 @@
#include "model/timestamp.h"
#include "raft/consensus_utils.h"
#include "raft/tests/raft_group_fixture.h"
#include "raft/tests/simple_raft_fixture.h"
#include "random/generators.h"
#include "storage/record_batch_builder.h"
#include "storage/tests/utils/disk_log_builder.h"
#include "test_utils/async.h"
#include "test_utils/test.h"
#include "tests/raft_fixture.h"

#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/defer.hh>

#include <gtest/gtest.h>

#include <cstdint>
#include <system_error>

static ss::logger tm_logger{"tm_stm-test"};
namespace {
using namespace raft;

ss::logger tm_logger{"tm_stm-test"};

struct tm_cache_struct {
tm_cache_struct() { cache = ss::make_lw_shared<cluster::tm_stm_cache>(); }

ss::lw_shared_ptr<cluster::tm_stm_cache> cache;
};

using op_status = cluster::tm_stm::op_status;
using stm_t = cluster::tm_stm;
using stm_cssshptrr_t = const ss::shared_ptr<stm_t>&;
using op_status = stm_t::op_status;
using tm_transaction = cluster::tm_transaction;
using tx_status = cluster::tm_transaction::tx_status;
using partitions_t = std::vector<tm_transaction::tx_partition>;

ss::future<> check_tx(
const checked<tm_transaction, op_status>& res,
kafka::transactional_id tx_id) {
ASSERT_TRUE_CORO(res);
ASSERT_EQ_CORO(res.assume_value().id, tx_id);
}

ss::future<> assert_success(op_status status) {
ASSERT_EQ_CORO(status, op_status::success);
}

ss::future<tm_transaction> expect_tx(
checked<tm_transaction, op_status>&& res, kafka::transactional_id tx_id) {
auto res_mv = std::move(res);
co_await check_tx(res_mv, tx_id);
auto ret = std::move(res_mv).value();
co_return ret;
}

static tm_transaction expect_tx(checked<tm_transaction, op_status> maybe_tx) {
BOOST_REQUIRE(maybe_tx.has_value());
return maybe_tx.value();
auto expect_tx(kafka::transactional_id tx_id) {
return [tx_id](checked<tm_transaction, op_status>&& res)
-> ss::future<tm_transaction> {
return expect_tx(std::move(res), tx_id);
};
}
struct tm_stm_test_fixture : simple_raft_fixture {
void create_stm_and_start_raft() {
create_raft();
raft::state_machine_manager_builder stm_m_builder;

_stm = stm_m_builder.create_stm<cluster::tm_stm>(
struct tm_stm_test_fixture : stm_raft_fixture<stm_t> {
static constexpr std::chrono::milliseconds TIMEOUT = 30s;

stm_shptrs_t create_stms(
state_machine_manager_builder& builder, raft_node_instance& node) {
return builder.create_stm<stm_t>(
tm_logger,
_raft.get(),
std::ref(_feature_table),
std::ref(tm_cache.cache));
node.raft().get(),
node.get_feature_table(),
tm_cache.cache);
}

template<class Func>
auto retry_with_term(Func&& func) {
return retry_with_leader(
model::timeout_clock::now() + TIMEOUT,
[this,
func = std::forward<Func>(func)](raft_node_instance& leader_node) {
auto stm = get_stm<0>(leader_node);
auto term = leader_node.raft()->term();
return func(stm, term);
});
}

ss::future<> register_new_producer(
kafka::transactional_id tx_id, model::producer_identity pid //
) {
return retry_with_term(
[tx_id, pid](stm_cssshptrr_t stm, model::term_id term) {
return stm->register_new_producer(term, tx_id, 0s, pid);
})
.then(assert_success);
}

_raft->start(std::move(stm_m_builder)).get();
_started = true;
ss::future<> add_partitions(
kafka::transactional_id tx_id, const partitions_t& partitions //
) {
return retry_with_term([tx_id, &partitions](
stm_cssshptrr_t stm, model::term_id term) {
return stm->add_partitions(term, tx_id, partitions);
})
.then(assert_success);
}

ss::future<tm_transaction> get_tx(kafka::transactional_id tx_id) {
return stm_retry_with_leader<0>(
TIMEOUT,
[tx_id](stm_cssshptrr_t stm) { return stm->get_tx(tx_id); })
.then(expect_tx(tx_id));
}

ss::future<tm_transaction> mark_tx_ongoing(kafka::transactional_id tx_id) {
return retry_with_term(
[tx_id](stm_cssshptrr_t stm, model::term_id term)
-> ss::future<checked<tm_transaction, op_status>> {
return stm->mark_tx_ongoing(term, tx_id);
})
.then(expect_tx(tx_id));
}

ss::future<tm_transaction> mark_tx_prepared(kafka::transactional_id tx_id) {
return retry_with_term(
[tx_id](stm_cssshptrr_t stm, model::term_id term)
-> ss::future<checked<tm_transaction, op_status>> {
return stm->mark_tx_prepared(term, tx_id);
})
.then(expect_tx(tx_id));
}

ss::shared_ptr<cluster::tm_stm> _stm;
tm_cache_struct tm_cache;
};

FIXTURE_TEST(test_tm_stm_new_tx, tm_stm_test_fixture) {
create_stm_and_start_raft();
auto& stm = *_stm;

wait_for_confirmed_leader();
wait_for_meta_initialized();

auto tx_id = kafka::transactional_id("app-id-1");
auto pid = model::producer_identity{1, 0};

auto op_code = stm
.register_new_producer(
_raft->term(), tx_id, std::chrono::milliseconds(0), pid)
.get0();
BOOST_REQUIRE_EQUAL(op_code, op_status::success);
auto tx1 = expect_tx(stm.get_tx(tx_id).get0());
BOOST_REQUIRE_EQUAL(tx1.id, tx_id);
BOOST_REQUIRE_EQUAL(tx1.pid, pid);
BOOST_REQUIRE_EQUAL(tx1.status, tx_status::ready);
BOOST_REQUIRE_EQUAL(tx1.partitions.size(), 0);
expect_tx(stm.mark_tx_ongoing(_raft->term(), tx_id).get0());
TEST_F_CORO(tm_stm_test_fixture, test_tm_stm_new_tx) {
co_await initialize_state_machines();

kafka::transactional_id tx_id("app-id-1");
model::producer_identity pid{1, 0};

co_await register_new_producer(tx_id, pid);

auto tx1 = co_await get_tx(tx_id);
ASSERT_EQ_CORO(tx1.pid, pid);
ASSERT_EQ_CORO(tx1.status, tx_status::ready);
ASSERT_EQ_CORO(tx1.partitions.size(), 0);
co_await mark_tx_ongoing(tx_id);

std::vector<tm_transaction::tx_partition> partitions = {
tm_transaction::tx_partition{
.ntp = model::ntp("kafka", "topic", 0), .etag = model::term_id(0)},
tm_transaction::tx_partition{
.ntp = model::ntp("kafka", "topic", 1), .etag = model::term_id(0)}};
BOOST_REQUIRE_EQUAL(
stm.add_partitions(_raft->term(), tx_id, partitions).get0(),
cluster::tm_stm::op_status::success);
BOOST_REQUIRE_EQUAL(tx1.partitions.size(), 0);
auto tx2 = expect_tx(stm.get_tx(tx_id).get0());
BOOST_REQUIRE_EQUAL(tx2.id, tx_id);
BOOST_REQUIRE_EQUAL(tx2.pid, pid);
BOOST_REQUIRE_EQUAL(tx2.status, tx_status::ongoing);
BOOST_REQUIRE_GT(tx2.tx_seq, tx1.tx_seq);
BOOST_REQUIRE_EQUAL(tx2.partitions.size(), 2);
auto tx4 = expect_tx(stm.mark_tx_prepared(_raft->term(), tx_id).get());
BOOST_REQUIRE_EQUAL(tx4.id, tx_id);
BOOST_REQUIRE_EQUAL(tx4.pid, pid);
BOOST_REQUIRE_EQUAL(tx4.status, tx_status::prepared);
BOOST_REQUIRE_EQUAL(tx4.tx_seq, tx2.tx_seq);
BOOST_REQUIRE_EQUAL(tx4.partitions.size(), 2);
auto tx5 = expect_tx(stm.mark_tx_ongoing(_raft->term(), tx_id).get0());
BOOST_REQUIRE_EQUAL(tx5.id, tx_id);
BOOST_REQUIRE_EQUAL(tx5.pid, pid);
BOOST_REQUIRE_EQUAL(tx5.status, tx_status::ongoing);
BOOST_REQUIRE_GT(tx5.tx_seq, tx2.tx_seq);
BOOST_REQUIRE_EQUAL(tx5.partitions.size(), 0);
co_await add_partitions(tx_id, partitions);

ASSERT_EQ_CORO(tx1.partitions.size(), 0);

auto tx2 = co_await get_tx(tx_id);
ASSERT_EQ_CORO(tx2.pid, pid);
ASSERT_EQ_CORO(tx2.status, tx_status::ongoing);
ASSERT_GT_CORO(tx2.tx_seq, tx1.tx_seq);
ASSERT_EQ_CORO(tx2.partitions.size(), 2);

auto tx4 = co_await mark_tx_prepared(tx_id);
ASSERT_EQ_CORO(tx4.pid, pid);
ASSERT_EQ_CORO(tx4.status, tx_status::prepared);
ASSERT_EQ_CORO(tx4.tx_seq, tx4.tx_seq);
ASSERT_EQ_CORO(tx4.partitions.size(), 2);

auto tx5 = co_await mark_tx_ongoing(tx_id);
ASSERT_EQ_CORO(tx5.pid, pid);
ASSERT_EQ_CORO(tx5.status, tx_status::ongoing);
ASSERT_GT_CORO(tx5.tx_seq, tx4.tx_seq);
ASSERT_EQ_CORO(tx5.partitions.size(), 0);
}

FIXTURE_TEST(test_tm_stm_seq_tx, tm_stm_test_fixture) {
create_stm_and_start_raft();
auto& stm = *_stm;
TEST_F_CORO(tm_stm_test_fixture, test_tm_stm_seq_tx) {
co_await initialize_state_machines();

wait_for_confirmed_leader();
wait_for_meta_initialized();
kafka::transactional_id tx_id("app-id-1");
model::producer_identity pid{1, 0};

auto tx_id = kafka::transactional_id("app-id-1");
auto pid = model::producer_identity{1, 0};
co_await register_new_producer(tx_id, pid);

auto op_code = stm
.register_new_producer(
_raft->term(), tx_id, std::chrono::milliseconds(0), pid)
.get0();
BOOST_REQUIRE_EQUAL(op_code, op_status::success);
auto tx1 = expect_tx(stm.get_tx(tx_id).get0());
auto tx2 = stm.mark_tx_ongoing(_raft->term(), tx_id).get0();
std::vector<tm_transaction::tx_partition> partitions = {
auto tx1 = co_await get_tx(tx_id);
co_await mark_tx_ongoing(tx_id);

partitions_t partitions = {
tm_transaction::tx_partition{
.ntp = model::ntp("kafka", "topic", 0), .etag = model::term_id(0)},
tm_transaction::tx_partition{
.ntp = model::ntp("kafka", "topic", 1), .etag = model::term_id(0)}};
BOOST_REQUIRE_EQUAL(
stm.add_partitions(_raft->term(), tx_id, partitions).get0(),
cluster::tm_stm::op_status::success);
auto tx3 = expect_tx(stm.get_tx(tx_id).get0());
auto tx5 = expect_tx(stm.mark_tx_prepared(_raft->term(), tx_id).get());
auto tx6 = expect_tx(stm.mark_tx_ongoing(_raft->term(), tx_id).get0());
BOOST_REQUIRE_EQUAL(tx6.id, tx_id);
BOOST_REQUIRE_EQUAL(tx6.pid, pid);
BOOST_REQUIRE_EQUAL(tx6.status, tx_status::ongoing);
BOOST_REQUIRE_EQUAL(tx6.partitions.size(), 0);
BOOST_REQUIRE_NE(tx6.tx_seq, tx1.tx_seq);
co_await add_partitions(tx_id, partitions);
co_await get_tx(tx_id);
co_await mark_tx_prepared(tx_id);
auto tx6 = co_await mark_tx_ongoing(tx_id);
ASSERT_EQ_CORO(tx6.pid, pid);
ASSERT_EQ_CORO(tx6.status, tx_status::ongoing);
ASSERT_EQ_CORO(tx6.partitions.size(), 0);
ASSERT_NE_CORO(tx6.tx_seq, tx1.tx_seq);
}

FIXTURE_TEST(test_tm_stm_re_tx, tm_stm_test_fixture) {
create_stm_and_start_raft();
auto& stm = *_stm;
TEST_F_CORO(tm_stm_test_fixture, test_tm_stm_re_tx) {
co_await initialize_state_machines();

wait_for_confirmed_leader();
wait_for_meta_initialized();
kafka::transactional_id tx_id("app-id-1");
model::producer_identity pid1{1, 0};

auto tx_id = kafka::transactional_id("app-id-1");
auto pid1 = model::producer_identity{1, 0};
co_await register_new_producer(tx_id, pid1);
co_await get_tx(tx_id);

auto op_code = stm
.register_new_producer(
_raft->term(), tx_id, std::chrono::milliseconds(0), pid1)
.get0();
BOOST_REQUIRE(op_code == op_status::success);
auto tx1 = expect_tx(stm.get_tx(tx_id).get0());
std::vector<tm_transaction::tx_partition> partitions = {
partitions_t partitions = {
tm_transaction::tx_partition{
.ntp = model::ntp("kafka", "topic", 0), .etag = model::term_id(0)},
tm_transaction::tx_partition{
.ntp = model::ntp("kafka", "topic", 1), .etag = model::term_id(0)}};
auto tx2 = stm.mark_tx_ongoing(_raft->term(), tx_id).get0();
BOOST_REQUIRE_EQUAL(
stm.add_partitions(_raft->term(), tx_id, partitions).get0(),
cluster::tm_stm::op_status::success);
auto tx3 = expect_tx(stm.get_tx(tx_id).get0());
auto tx5 = expect_tx(stm.mark_tx_prepared(_raft->term(), tx_id).get());
auto tx6 = expect_tx(stm.mark_tx_ongoing(_raft->term(), tx_id).get0());

auto pid2 = model::producer_identity{1, 1};
auto expected_pid = model::producer_identity(3, 5);
op_code = stm
.re_register_producer(
_raft->term(),
tx_id,
std::chrono::milliseconds(0),
pid2,
expected_pid,
pid1)
.get0();
BOOST_REQUIRE_EQUAL(op_code, op_status::success);
auto tx7 = expect_tx(stm.get_tx(tx_id).get0());
BOOST_REQUIRE_EQUAL(tx7.id, tx_id);
BOOST_REQUIRE_EQUAL(tx7.pid, pid2);
BOOST_REQUIRE_EQUAL(tx7.status, tx_status::ready);
BOOST_REQUIRE_EQUAL(tx7.partitions.size(), 0);
co_await mark_tx_ongoing(tx_id);
co_await add_partitions(tx_id, partitions);
co_await get_tx(tx_id);
co_await mark_tx_prepared(tx_id);
co_await mark_tx_ongoing(tx_id);

model::producer_identity pid2{1, 1};
model::producer_identity expected_pid(3, 5);
co_await retry_with_term([tx_id, pid1, expected_pid, pid2](
stm_cssshptrr_t stm, model::term_id term) {
return stm->re_register_producer(
term, tx_id, 0s, pid2, expected_pid, pid1);
}).then(assert_success);
auto tx7 = co_await get_tx(tx_id);
ASSERT_EQ_CORO(tx7.pid, pid2);
ASSERT_EQ_CORO(tx7.status, tx_status::ready);
ASSERT_EQ_CORO(tx7.partitions.size(), 0);
}
} // namespace
Loading

0 comments on commit d78899a

Please sign in to comment.