Skip to content

Commit

Permalink
Improve binary protocol compatibility for memcached.
Browse files Browse the repository at this point in the history
* Support CAS operations for Delete, Increment, Decrement,
  Append, and Prepend in addition to Set/Add/Replace commands.
* Return CAS unique value in response for Touch.
  • Loading branch information
ymmt2005 committed Dec 26, 2013
1 parent 8f0a249 commit ab9de63
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 36 deletions.
64 changes: 40 additions & 24 deletions src/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,14 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
if( obj.expired() ) {
if( cmd.cas_unique() != 0 ||
cmd.command() == binary_command::Replace ||
cmd.command() == binary_command::ReplaceQ )
return false;
cmd.command() == binary_command::ReplaceQ ) {
r.error( binary_status::NotFound );
return true;
}
} else if( cmd.command() == binary_command::Add ||
cmd.command() == binary_command::AddQ ) {
return false;
r.error( binary_status::Exists );
return true;
}
if( cmd.cas_unique() != 0 &&
cmd.cas_unique() != obj.cas_unique() ) {
Expand Down Expand Up @@ -284,13 +287,8 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
return std::move(o);
};
}
if( ! m_hash.apply(cybozu::hash_key(p, len), h, c) ) {
if( cmd.cas_unique() != 0 ) {
r.error( binary_status::NotFound );
} else {
r.error( binary_status::NotStored );
}
}
if( ! m_hash.apply(cybozu::hash_key(p, len), h, c) )
r.error( binary_status::NotFound );
break;
case binary_command::RaU:
case binary_command::RaUQ:
Expand Down Expand Up @@ -325,13 +323,8 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
repl_object(m_slaves, k, obj);
return true;
};
if( ! m_hash.apply(cybozu::hash_key(p, len), h, c) ) {
if( cmd.cas_unique() != 0 ) {
r.error( binary_status::NotFound );
} else {
r.error( binary_status::NotStored );
}
}
if( ! m_hash.apply(cybozu::hash_key(p, len), h, c) )
r.error( binary_status::NotFound );
break;
case binary_command::Append:
case binary_command::AppendQ:
Expand All @@ -342,12 +335,17 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
return true;
}
if( obj.expired() ) return false;
if( cmd.cas_unique() != 0 &&
cmd.cas_unique() != obj.cas_unique() ) {
r.error( binary_status::Exists );
return true;
}
const char* p2;
std::size_t len2;
std::tie(p2, len2) = cmd.data();
obj.append(p2, len2);
if( ! cmd.quiet() )
r.success();
r.set( obj.cas_unique() );
if( ! m_slaves.empty() )
repl_object(m_slaves, k, obj);
return true;
Expand All @@ -364,12 +362,17 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
return true;
}
if( obj.expired() ) return false;
if( cmd.cas_unique() != 0 &&
cmd.cas_unique() != obj.cas_unique() ) {
r.error( binary_status::Exists );
return true;
}
const char* p2;
std::size_t len2;
std::tie(p2, len2) = cmd.data();
obj.prepend(p2, len2);
if( ! cmd.quiet() )
r.success();
r.set( obj.cas_unique() );
if( ! m_slaves.empty() )
repl_object(m_slaves, k, obj);
return true;
Expand All @@ -385,6 +388,11 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
r.error( binary_status::Locked );
return false;
}
if( cmd.cas_unique() != 0 &&
cmd.cas_unique() != obj.cas_unique() ) {
r.error( binary_status::Exists );
return false;
}
if( obj.locked_by_self() )
remove_lock(k);
if( ! cmd.quiet() )
Expand All @@ -406,6 +414,11 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
return true;
}
if( obj.expired() ) return false;
if( cmd.cas_unique() != 0 &&
cmd.cas_unique() != obj.cas_unique() ) {
r.error( binary_status::Exists );
return true;
}
try {
std::uint64_t n = obj.incr( cmd.value() );
if( ! cmd.quiet() )
Expand Down Expand Up @@ -439,6 +452,11 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
return true;
}
if( obj.expired() ) return false;
if( cmd.cas_unique() != 0 &&
cmd.cas_unique() != obj.cas_unique() ) {
r.error( binary_status::Exists );
return true;
}
try {
std::uint64_t n = obj.decr( cmd.value() );
if( ! cmd.quiet() )
Expand All @@ -465,18 +483,16 @@ void memcache_socket::cmd_bin(const memcache::binary_request& cmd) {
break;
case binary_command::Touch:
std::tie(p, len) = cmd.key();
h = [this,&cmd](const cybozu::hash_key& k, object& obj) -> bool {
h = [this,&cmd,&r](const cybozu::hash_key& k, object& obj) -> bool {
if( obj.expired() ) return false;
obj.touch( cmd.exptime() );
if( ! m_slaves.empty() )
repl_object(m_slaves, k, obj, false);
r.set( obj.cas_unique() );
return true;
};
if( m_hash.apply(cybozu::hash_key(p, len), h, c) ) {
r.success();
} else {
if( ! m_hash.apply(cybozu::hash_key(p, len), h, c) )
r.error( binary_status::NotFound );
}
break;
case binary_command::Lock:
case binary_command::LockQ:
Expand Down
54 changes: 42 additions & 12 deletions test/protocol_binary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,48 +340,50 @@ class client {

void incr(const std::string& key, std::uint64_t value, bool q,
std::uint32_t expire = ~(std::uint32_t)0,
std::uint64_t initial = 0) {
std::uint64_t initial = 0, std::uint64_t cas = 0) {
char extra[20];
cybozu::hton(value, extra);
cybozu::hton(initial, &extra[8]);
cybozu::hton(expire, &extra[16]);
request req(q ? binary_command::IncrementQ : binary_command::Increment,
(std::uint16_t)key.size(), key.data(),
sizeof(extra), extra, 0, nullptr);
sizeof(extra), extra, 0, nullptr, nullptr, cas);
send(req.data(), req.length());
}

void decr(const std::string& key, std::uint64_t value, bool q,
std::uint32_t expire = ~(std::uint32_t)0,
std::uint64_t initial = 0) {
std::uint64_t initial = 0, std::uint64_t cas = 0) {
char extra[20];
cybozu::hton(value, extra);
cybozu::hton(initial, &extra[8]);
cybozu::hton(expire, &extra[16]);
request req(q ? binary_command::DecrementQ : binary_command::Decrement,
(std::uint16_t)key.size(), key.data(),
sizeof(extra), extra, 0, nullptr);
sizeof(extra), extra, 0, nullptr, nullptr, cas);
send(req.data(), req.length());
}

void append(const std::string& key, const std::string& value, bool q) {
void append(const std::string& key, const std::string& value, bool q,
std::uint64_t cas = 0) {
request req(q ? binary_command::AppendQ : binary_command::Append,
(std::uint16_t)key.size(), key.data(), 0, nullptr,
(std::uint32_t)value.size(), value.data());
(std::uint32_t)value.size(), value.data(), nullptr, cas);
send(req.data(), req.length());
}

void prepend(const std::string& key, const std::string& value, bool q) {
void prepend(const std::string& key, const std::string& value, bool q,
std::uint64_t cas = 0) {
request req(q ? binary_command::PrependQ : binary_command::Prepend,
(std::uint16_t)key.size(), key.data(), 0, nullptr,
(std::uint32_t)value.size(), value.data());
(std::uint32_t)value.size(), value.data(), nullptr, cas);
send(req.data(), req.length());
}

void remove(const std::string& key, bool q) {
void remove(const std::string& key, bool q, std::uint64_t cas = 0) {
request req(q ? binary_command::DeleteQ : binary_command::Delete,
(std::uint16_t)key.size(), key.data(),
0, nullptr, 0, nullptr);
0, nullptr, 0, nullptr, nullptr, cas);
send(req.data(), req.length());
}

Expand Down Expand Up @@ -604,15 +606,19 @@ AUTOTEST(touch) {
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, Touch);
ASSERT_OK(r);
std::uint64_t cas = r.cas_unique();
cybozu_assert( cas != 0 );

std::this_thread::sleep_for(std::chrono::seconds(4));
c.get("abc", false);
cybozu_assert( c.get_response(r) );
ASSERT_OK(r);
cybozu_assert( r.cas_unique() == cas );

c.touch("abc", 1);
cybozu_assert( c.get_response(r) );
ASSERT_OK(r);
cybozu_assert( r.cas_unique() == cas );

std::this_thread::sleep_for(std::chrono::seconds(3));
c.get("abc", false);
Expand All @@ -628,7 +634,7 @@ AUTOTEST(add) {
c.add("abc", "123", true, 0, 0);
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, AddQ);
cybozu_assert( r.status() == binary_status::NotStored );
cybozu_assert( r.status() == binary_status::Exists );

c.remove("abc", true);
c.add("abc", "123", false, 0, 0);
Expand All @@ -644,7 +650,7 @@ AUTOTEST(replace) {
c.replace("not exist", "hoge", true, 100, 0);
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, ReplaceQ);
cybozu_assert( r.status() == binary_status::NotStored );
cybozu_assert( r.status() == binary_status::NotFound );

c.set("abc", "def", true, 10, 0);
c.replace("abc", "123", false, 100, 0);
Expand Down Expand Up @@ -742,18 +748,25 @@ AUTOTEST(incr_decr) {
ASSERT_OK(r);
cybozu_assert( r.value() == 12 );
cybozu_assert( r.flags() == 0 );
std::uint64_t cas = r.cas_unique();
cybozu_assert( cas != 0 );

c.incr("abc", 10, false);
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, Increment);
ASSERT_OK(r);
cybozu_assert( r.value() == 22 );
cybozu_assert( r.cas_unique() != 0 );
cybozu_assert( cas != r.cas_unique() );
cas = r.cas_unique();

c.decr("abc", 1, false);
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, Decrement);
ASSERT_OK(r);
cybozu_assert( r.value() == 21 );
cybozu_assert( r.cas_unique() != 0 );
cybozu_assert( cas != r.cas_unique() );

c.decr("abc", 100, false);
cybozu_assert( c.get_response(r) );
Expand Down Expand Up @@ -796,6 +809,8 @@ AUTOTEST(append_prepend) {
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, Append);
ASSERT_OK(r);
std::uint64_t cas = r.cas_unique();
cybozu_assert( cas != 0 );

c.get("ttt", false);
cybozu_assert( c.get_response(r) );
Expand All @@ -806,6 +821,8 @@ AUTOTEST(append_prepend) {
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, Prepend);
ASSERT_OK(r);
cybozu_assert( r.cas_unique() != 0 );
cybozu_assert( cas != r.cas_unique() );

c.get("ttt", false);
cybozu_assert( c.get_response(r) );
Expand All @@ -829,6 +846,19 @@ AUTOTEST(delete) {
ASSERT_COMMAND(r, Delete);
ASSERT_OK(r);

c.set("abc", "def", false, 10, 0);
cybozu_assert( c.get_response(r) );
ASSERT_OK(r);
std::uint64_t cas = r.cas_unique();
c.remove("abc", true, cas + 1);
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, DeleteQ);
cybozu_assert( r.status() == binary_status::Exists );
c.remove("abc", false, cas);
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, Delete);
ASSERT_OK(r);

c.remove("abc", false);
cybozu_assert( c.get_response(r) );
ASSERT_COMMAND(r, Delete);
Expand Down

0 comments on commit ab9de63

Please sign in to comment.