Skip to content

Commit

Permalink
libmemcached: add MEMCACHED_BEHAVIOR_META_PROTOCOL
Browse files Browse the repository at this point in the history
  • Loading branch information
m6w6 committed May 18, 2021
1 parent c98c359 commit d02c98c
Show file tree
Hide file tree
Showing 28 changed files with 785 additions and 272 deletions.
14 changes: 10 additions & 4 deletions ChangeLog-1.1.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# ChangeLog v1.1

## v 1.1.0-dev

> released TBD
**Changes from beta3:**

* Initial transparent support for the memcached META protocol:
* Add `MEMCACHED_BEHAVIOR_META_PROTOCOL` enabling the META protocol.
* Add `-m` switch to client programs enabling the META protocol.

## v 1.1.0-beta3

> released 2021-04-15
Expand Down Expand Up @@ -37,10 +47,6 @@

> released 2020-12-21
**NOTE:**
This is a bug fix release, not a feature release. The minor version number
was incremented due to the following changes:

* Ported build system to CMake.
* Ported test suite to Catch2.
* Build requires C++11 compiler support.
Expand Down
20 changes: 16 additions & 4 deletions docs/source/ChangeLog-1.1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@
ChangeLog v1.1
==============

v 1.1.0-dev
-----------

..
released TBD


**Changes from beta3:**


* Initial transparent support for the memcached META protocol:

* Add ``MEMCACHED_BEHAVIOR_META_PROTOCOL`` enabling the META protocol.
* Add ``-m`` switch to client programs enabling the META protocol.

v 1.1.0-beta3
-------------

Expand Down Expand Up @@ -56,10 +72,6 @@ v 1.1.0-beta1
released 2020-12-21


**NOTE:**\ :raw-html-m2r:`<br>`
This is a bug fix release, not a feature release. The minor version number
was incremented due to the following changes:


* Ported build system to CMake.
* Ported test suite to Catch2.
Expand Down
6 changes: 6 additions & 0 deletions docs/source/libmemcached/memcached_behavior.rst
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ SYNOPSIS
Enable the use of the binary protocol. Please note that you cannot
toggle this flag on an open connection.

.. enumerator:: MEMCACHED_BEHAVIOR_META_PROTOCOL

Enable the use of the META protocol. This setting can be switched on and
off at will when using an ASCII protocol connection, but causes
a reconnect when using the binary protocol.

.. enumerator:: MEMCACHED_BEHAVIOR_IO_MSG_WATERMARK

Set this value to tune the number of messages that may be sent before
Expand Down
2 changes: 1 addition & 1 deletion include/libmemcached-1.0/struct/memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct memcached_st {
bool tcp_keepalive : 1;
bool is_aes : 1;
bool is_fetching_version : 1;
bool not_used : 1;
bool meta_protocol : 1;
} flags;

memcached_server_distribution_t distribution;
Expand Down
1 change: 1 addition & 0 deletions include/libmemcached-1.0/types/behavior.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum memcached_behavior_t {
MEMCACHED_BEHAVIOR_REMOVE_FAILED_SERVERS,
MEMCACHED_BEHAVIOR_DEAD_TIMEOUT,
MEMCACHED_BEHAVIOR_SERVER_TIMEOUT_LIMIT,
MEMCACHED_BEHAVIOR_META_PROTOCOL,
MEMCACHED_BEHAVIOR_MAX
};

Expand Down
11 changes: 11 additions & 0 deletions src/bin/common/options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ class client_options {
}
return true;
};
def("meta", 'm', no_argument, "Use the text based meta memcached protocol.")
.apply = [](const client_options &opt_, const extended_option &ext, memcached_st *memc) {
if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_META_PROTOCOL, ext.set)) {
if (!opt_.isset("quiet")) {
std::cerr << memcached_last_error_message(memc);
}
return false;
}
return true;
};

def("buffer", 'B', no_argument, "Buffer requests.")
.apply = [](const client_options &opt, const extended_option &ext, memcached_st *memc) {
if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, ext.set)) {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/memslap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ int main(int argc, char *argv[]) {
opt.add("noreply", 'R', no_argument, "Enable the NOREPLY behavior for storage commands.")
.apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) {
if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, ext.set)) {
if(!opt_.isset("quiet")) {
if (!opt_.isset("quiet")) {
std::cerr << memcached_last_error_message(memc);
}
return false;
Expand Down
47 changes: 46 additions & 1 deletion src/libmemcached/auto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,47 @@ static void auto_response(memcached_instance_st *instance, const bool reply, mem
}
}

static memcached_return_t meta_incr_decr(memcached_instance_st *instance, bool is_incr, bool w_init,
const char *key, size_t key_len,
uint64_t offset, uint64_t initial, uint32_t expiration) {
char new_buf[32] = " N", inl_buf[32] = " J", dlt_buf[32] = " D", exp_buf[32] = " T";
size_t new_len = strlen(new_buf), inl_len = strlen(inl_buf), dlt_len = strlen(dlt_buf), exp_len = strlen(exp_buf);
size_t io_num = 0;
libmemcached_io_vector_st io_vec[10] = {};

io_vec[io_num++] = {memcached_literal_param("ma ")};
io_vec[io_num++] = {memcached_array_string(instance->root->_namespace),
memcached_array_size(instance->root->_namespace)},
io_vec[io_num++] = {key, key_len};

if (!is_incr) {
io_vec[io_num++] = {memcached_literal_param(" MD")};
}
if (w_init) {
new_len += snprintf(new_buf + new_len, sizeof(new_buf) - new_len, "%" PRIu32, expiration);
io_vec[io_num++] = {new_buf, new_len};
inl_len += snprintf(inl_buf + inl_len, sizeof(inl_buf) - inl_len, "%" PRIu64, initial);
io_vec[io_num++] = {inl_buf, inl_len};
}
if (offset != 1) {
dlt_len += snprintf(dlt_buf + dlt_len, sizeof(dlt_buf) - dlt_len, "%" PRIu64, offset);
io_vec[io_num++] = {dlt_buf, dlt_len};
}
if (expiration) {
exp_len += snprintf(exp_buf + exp_len, sizeof(exp_buf) - exp_len, "%" PRIu32, expiration);
io_vec[io_num++] = {exp_buf, exp_len};
}

if (memcached_is_replying(instance->root)) {
io_vec[io_num++] = {memcached_literal_param(" v")};
} else {
io_vec[io_num++] = {memcached_literal_param(" q")};
}
io_vec[io_num++] = {memcached_literal_param(" O+\r\n")};

return memcached_vdo(instance, io_vec, io_num, true);
}

static memcached_return_t text_incr_decr(memcached_instance_st *instance, const bool is_incr,
const char *key, size_t key_length, const uint64_t offset,
const bool reply) {
Expand Down Expand Up @@ -145,6 +186,8 @@ static memcached_return_t increment_decrement_by_key(const protocol_binary_comma
if (memcached_is_binary(memc)) {
rc = binary_incr_decr(instance, command, key, key_length, uint64_t(offset), 0,
MEMCACHED_EXPIRATION_NOT_ADD, reply);
} else if (memcached_is_meta(memc)) {
rc = meta_incr_decr(instance, command == PROTOCOL_BINARY_CMD_INCREMENT, false, key, key_length, offset, 0, 0);
} else {
rc = text_incr_decr(instance, command == PROTOCOL_BINARY_CMD_INCREMENT ? true : false, key,
key_length, offset, reply);
Expand Down Expand Up @@ -189,7 +232,9 @@ increment_decrement_with_initial_by_key(const protocol_binary_command command, M
if (memcached_is_binary(memc)) {
rc = binary_incr_decr(instance, command, key, key_length, offset, initial, uint32_t(expiration),
reply);

} else if (memcached_is_meta(memc)) {
rc = meta_incr_decr(instance, command == PROTOCOL_BINARY_CMD_INCREMENT, true,
key, key_length, offset, initial, uint32_t(expiration));
} else {
rc = memcached_set_error(
*memc, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
Expand Down
14 changes: 12 additions & 2 deletions src/libmemcached/behavior.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,22 @@ memcached_return_t memcached_behavior_set(memcached_st *shell, const memcached_b
break;

case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
send_quit(
ptr); // We need t shutdown all of the connections to make sure we do the correct protocol
// We need t shutdown all of the connections to make sure we do the correct protocol
send_quit(ptr);
if (data) {
ptr->flags.verify_key = false;
}
ptr->flags.binary_protocol = bool(data);
break;

case MEMCACHED_BEHAVIOR_META_PROTOCOL:
if (data && ptr->flags.binary_protocol) {
send_quit(ptr);
ptr->flags.binary_protocol = false;
}
ptr->flags.meta_protocol = bool(data);
break;

case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
ptr->flags.support_cas = bool(data);
break;
Expand Down Expand Up @@ -623,6 +631,8 @@ const char *libmemcached_string_behavior(const memcached_behavior_t flag) {
return "MEMCACHED_BEHAVIOR_KETAMA_HASH";
case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
return "MEMCACHED_BEHAVIOR_BINARY_PROTOCOL";
case MEMCACHED_BEHAVIOR_META_PROTOCOL:
return "MEMCACHED_BEHAVIOR_META_PROTOCOL";
case MEMCACHED_BEHAVIOR_SND_TIMEOUT:
return "MEMCACHED_BEHAVIOR_SND_TIMEOUT";
case MEMCACHED_BEHAVIOR_RCV_TIMEOUT:
Expand Down
36 changes: 30 additions & 6 deletions src/libmemcached/delete.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,30 @@ memcached_return_t memcached_delete(memcached_st *shell, const char *key, size_t
return memcached_delete_by_key(shell, key, key_length, key, key_length, expiration);
}

static inline memcached_return_t meta_delete(memcached_instance_st *instance,
const char *key, size_t key_length,
time_t expiration) {

char ex_buf[32] = " I T";
size_t io_num = 0, ex_len = strlen(ex_buf);
libmemcached_io_vector_st io_vec[6] = {};
io_vec[io_num++] = {memcached_literal_param("md ")};
io_vec[io_num++] = {memcached_array_string(instance->root->_namespace),
memcached_array_size(instance->root->_namespace)};
io_vec[io_num++] = {key, key_length};
if (!memcached_is_replying(instance->root)) {
io_vec[io_num++] = {" q", 2};
}
if (expiration) {
ex_len += snprintf(ex_buf + ex_len, sizeof(ex_buf) - ex_len, "%llu", (unsigned long long) expiration);
io_vec[io_num++] = {ex_buf, ex_len};
}
io_vec[io_num++] = {memcached_literal_param("\r\n")};

/* Send command header, only flush if we are NOT buffering */
return memcached_vdo(instance, io_vec, io_num, !memcached_is_buffering(instance->root));
}

static inline memcached_return_t ascii_delete(memcached_instance_st *instance, uint32_t,
const char *key, const size_t key_length,
const bool reply, const bool is_buffering) {
Expand Down Expand Up @@ -85,10 +109,9 @@ static inline memcached_return_t binary_delete(memcached_instance_st *instance,
return rc;
}

memcached_return_t memcached_delete_by_key(memcached_st *shell, const char *group_key,
memcached_return_t memcached_delete_by_key(memcached_st *memc, const char *group_key,
size_t group_key_length, const char *key,
size_t key_length, time_t expiration) {
Memcached *memc = memcached2Memcached(shell);
LIBMEMCACHED_MEMCACHED_DELETE_START();

memcached_return_t rc;
Expand All @@ -100,16 +123,15 @@ memcached_return_t memcached_delete_by_key(memcached_st *shell, const char *grou
return memcached_last_error(memc);
}

if (expiration) {
if (expiration && !memcached_is_meta(memc)) {
return memcached_set_error(
*memc, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
memcached_literal_param(
"Memcached server version does not allow expiration of deleted items"));
}

uint32_t server_key =
memcached_generate_hash_with_redistribution(memc, group_key, group_key_length);
memcached_instance_st *instance = memcached_instance_fetch(memc, server_key);
auto server_key = memcached_generate_hash_with_redistribution(memc, group_key, group_key_length);
auto *instance = memcached_instance_fetch(memc, server_key);

bool is_buffering = memcached_is_buffering(instance->root);
bool is_replying = memcached_is_replying(instance->root);
Expand All @@ -132,6 +154,8 @@ memcached_return_t memcached_delete_by_key(memcached_st *shell, const char *grou

if (memcached_is_binary(memc)) {
rc = binary_delete(instance, server_key, key, key_length, is_replying, is_buffering);
} else if (memcached_is_meta(memc)) {
rc = meta_delete(instance, key, key_length, expiration);
} else {
rc = ascii_delete(instance, server_key, key, key_length, is_replying, is_buffering);
}
Expand Down
4 changes: 2 additions & 2 deletions src/libmemcached/fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ memcached_result_st *memcached_fetch_result(memcached_st *ptr, memcached_result_
*error = MEMCACHED_END;
} else if (*error == MEMCACHED_MAXIMUM_RETURN and result->count) {
*error = MEMCACHED_END;
} else if (*error == MEMCACHED_MAXIMUM_RETURN) // while() loop was never entered
{
} else if (*error == MEMCACHED_MAXIMUM_RETURN) {
// while() loop was never entered
*error = MEMCACHED_NOTFOUND;
} else if (connection_failures) {
/*
Expand Down
Loading

0 comments on commit d02c98c

Please sign in to comment.