Replace redis cluster setup script with shared C code.

This commit is contained in:
Steve Hill
2016-10-12 15:00:22 -04:00
parent c95dc3cbe9
commit f549578a19
7 changed files with 389 additions and 326 deletions
+1
View File
@@ -58,6 +58,7 @@
'../net/instaweb/test.gyp:mod_pagespeed_test',
'../net/instaweb/test.gyp:mod_pagespeed_speed_test',
'install.gyp:*',
'<(DEPTH)/pagespeed/kernel.gyp:redis_cache_cluster_setup',
]
},
{
+4 -90
View File
@@ -30,105 +30,19 @@ function new_redis() {
IDS+=("$ID")
}
function send_command() {
local rc=0
# Capture the output and exit status of the $REDIS_CLI command, guarding
# against set -e failures.
local redis_out=$($REDIS_CLI -p "$1" || { rc=$?; true; })
if [ -n "$redis_out" ]; then
# Output any non-"OK" messages to STDOUT. If any were output, grep returns
# true and we record failure. || true masks grep "failure" from set -e.
grep -vw OK <<< "$redis_out" && rc=1 || true
else
# No output, failure.
rc=1
fi
return $rc
}
function is_config_consistent() {
local CONFIGS=()
for port in "${PORTS[@]}"; do
# 'CLUSTER NODES' command output is specified here:
# http://redis.io/commands/cluster-nodes
# We're choosing following fields: id, ip:port, master/slave, slot-range-1.
CONFIGS+=("$($REDIS_CLI -p "$port" CLUSTER NODES | \
awk '{ print $1" "$2" "$4" "$9; }' | \
sort)")
done
for cfg in "${CONFIGS[@]}"; do
if [[ "${CONFIGS[0]}" != "$cfg" ]]; then
return 1
fi
done
return 0
}
function is_cluster_healthy() {
for port in "${PORTS[@]}"; do
if ! $REDIS_CLI -p "$port" CLUSTER INFO | grep -q "cluster_state:ok"; then
return 1
fi
done
return 0
}
function add_slots() {
local port=$1
local start=$2
local end=$3
# Older versions of redis-cli don't seem to like enormous commands.
# We break these up into chunks of 500 slots so they will "fit".
while [ $(( end - start )) -ge 500 ]; do
echo "CLUSTER ADDSLOTS $(seq -s" " $start $(( start + 499 )) )" |\
send_command $port
(( start += 500 ))
done
echo "CLUSTER ADDSLOTS $(seq -s" " $start $end)" | send_command $port
}
echo Starting replicas...
# new_redis calls start_background_server.sh, which sets EXIT trap in current
# shell. Traps do not stack, so we have to start each server in a new subshell.
(new_redis; (new_redis; (new_redis; (new_redis; (new_redis; (new_redis; (
echo -n "Setting up cluster... "
# Typically Redis Cluster will eventually propagate information itself, but we
# want to set up the cluster as fast as possible, therefore we make full
# configuration.
for a in "${PORTS[@]}"; do
for b in "${PORTS[@]}"; do
echo "CLUSTER MEET 127.0.0.1 $b"
done | send_command $a
done
# This configuration should match one in redis_cache_cluster_test.cc, and any
# changes here should be copied there.
add_slots ${PORTS[0]} 0 5499
add_slots ${PORTS[1]} 5500 10999
add_slots ${PORTS[2]} 11000 16383
echo "CLUSTER REPLICATE ${IDS[0]}" | send_command ${PORTS[3]}
echo "CLUSTER REPLICATE ${IDS[1]}" | send_command ${PORTS[4]}
echo "CLUSTER REPLICATE ${IDS[2]}" | send_command ${PORTS[5]}
export REDIS_CLUSTER_PORTS="${PORTS[*]}"
export REDIS_CLUSTER_IDS="${IDS[*]}"
echo Running redis_cache_cluster_setup...
SETUP_DIR=out/$BUILDTYPE
$SETUP_DIR/redis_cache_cluster_setup || exit 1
echo done
# Although cluster cannot be marked healthy until all slots are covered (e.g.
# each slave knows all masters), we want all nodes (including slaves) to know
# about every other node before we start unit tests.
echo -n "Waiting for configurations to propagate..."
wait_cmd_with_timeout 3 is_config_consistent
echo
# Even if each node received full information about the rest of cluster, it
# can still wait a little before going into 'healthy' mode.
echo -n "Waiting for cluster to become healthy..."
wait_cmd_with_timeout 3 is_cluster_healthy
echo
echo Running tests
"$@"
)))))))
+1
View File
@@ -53,6 +53,7 @@
'<(DEPTH)/pagespeed/kernel.gyp:pagespeed_image_test_util',
'<(DEPTH)/pagespeed/kernel.gyp:pthread_system',
'<(DEPTH)/pagespeed/kernel.gyp:pagespeed_proto_matcher_test_proto',
'<(DEPTH)/pagespeed/kernel.gyp:redis_cache_cluster_setup_lib',
'<(DEPTH)/pagespeed/kernel.gyp:tcp_connection_for_testing',
'<(DEPTH)/pagespeed/kernel.gyp:tcp_server_thread_for_testing',
'<(DEPTH)/testing/gmock.gyp:gmock',
+30
View File
@@ -454,6 +454,35 @@
'<(DEPTH)/third_party/gflags/gflags.gyp:gflags',
],
},
{
'target_name': 'redis_cache_cluster_setup_lib',
'type': '<(library)',
'sources': [
'system/redis_cache_cluster_setup.cc',
],
'include_dirs': [
'<(DEPTH)',
],
'dependencies': [
'pagespeed_base',
'tcp_connection_for_testing',
'<(DEPTH)/third_party/apr/apr.gyp:apr',
],
},
{
'target_name': 'redis_cache_cluster_setup',
'type': 'executable',
'sources': [
'system/redis_cache_cluster_setup_main.cc',
],
'include_dirs': [
'<(DEPTH)',
],
'dependencies': [
'redis_cache_cluster_setup_lib',
'<(DEPTH)/third_party/apr/apr.gyp:apr',
],
},
{
'target_name': 'tcp_connection_for_testing',
'type': '<(library)',
@@ -465,6 +494,7 @@
],
'dependencies': [
'pagespeed_base',
'../net/instaweb/instaweb.gyp:instaweb_system',
'<(DEPTH)/third_party/apr/apr.gyp:apr',
],
},
@@ -0,0 +1,281 @@
/*
* Copyright 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Author: yeputons@google.com (Egor Suvorov)
// Unit-test the redis interface in conjunction with Redis Cluster
#include "pagespeed/system/redis_cache_cluster_setup.h"
#include <cstddef>
#include <cstdlib>
#include <algorithm>
#include "base/logging.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/posix_timer.h"
#include "pagespeed/kernel/base/string.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/system/tcp_connection_for_testing.h"
namespace net_instaweb {
namespace RedisCluster {
namespace {
static const int kReconfigurationPropagationTimeoutMs = 5000;
GoogleString ReadBulkString(TcpConnectionForTesting* conn) {
GoogleString length_str_storage = conn->ReadLineCrLf();
StringPiece length_str = length_str_storage;
// Check that Redis answered with Bulk String
CHECK(length_str.starts_with("$"));
length_str.remove_prefix(1);
CHECK(length_str.ends_with("\r\n"));
length_str.remove_suffix(2);
int length;
CHECK(StringToInt(length_str, &length));
GoogleString result = conn->ReadBytes(length);
CHECK_EQ("\r\n", conn->ReadLineCrLf());
return result;
}
} // namespace
// TODO(cheesy): Instead of CLUSTER NODES, CLUSTER SLOTS provides the same
// information in a machine readable format that is already parsed in
// RedisCache::FetchClusterSlotMapping. The CLUSTER NODES part of this code
// could be replaced with a call to the innards of FetchClusterSlotMapping.
// redisReaderCreate and redisReaderGetReply would likely be needed to turn the
// ReadBulkString result into a redisReply. See:
// https://github.com/redis/hiredis/issues/59
StringVector GetNodeConfig(TcpConnectionForTesting* conn) {
conn->Send("CLUSTER INFO\r\n");
GoogleString cluster_info = ReadBulkString(conn);
if (cluster_info.find("cluster_state:ok\r\n") == GoogleString::npos) {
return {};
}
conn->Send("CLUSTER NODES\r\n");
GoogleString config_csv = ReadBulkString(conn);
StringPieceVector lines;
SplitStringPieceToVector(config_csv, "\r\n", &lines,
true /* omit_empty_strings */);
StringVector current_config;
for (StringPiece line : lines) {
StringPieceVector fields;
SplitStringPieceToVector(line, " ", &fields,
true /* omit_empty_strings */);
CHECK_GE(fields.size(), 8);
GoogleString node_descr;
// See http://redis.io/commands/cluster-nodes. We take three fields
// from node description (node id, ip:port, master/slave) plus
// information about slots served.
StrAppend(&node_descr, fields[0], " ", fields[1], " ", fields[3]);
for (auto it = fields.begin() + 8; it != fields.end(); ++it) {
StrAppend(&node_descr, " ", *it);
}
current_config.push_back(node_descr);
}
std::sort(current_config.begin(), current_config.end());
return current_config;
}
void ResetConfiguration(StringVector* node_ids,
std::vector<int>* ports,
ConnectionList* connections) {
// TODO(cheesy): These should be collapsed onto a single vector of
// struct { conn, port, node_id }.
CHECK_EQ(6, connections->size());
CHECK_EQ(connections->size(), ports->size());
CHECK_EQ(connections->size(), node_ids->size());
LOG(INFO) << "Resetting Redis Cluster configuration back to default";
// Flush the nodes which is required to re-configure the cluster.
FlushAll(connections);
// Reset all nodes.
for (auto& conn : *connections) {
conn->Send("CLUSTER RESET SOFT\r\n");
}
for (auto& conn : *connections) {
CHECK_EQ("+OK\r\n", conn->ReadLineCrLf());
}
// Now make nodes know about each other.
for (auto& conn : *connections) {
for (int port : *ports) {
conn->Send(
StrCat("CLUSTER MEET 127.0.0.1 ", IntegerToString(port), "\r\n"));
}
for (int i = 0, n = ports->size(); i < n; ++i) {
CHECK_EQ("+OK\r\n", conn->ReadLineCrLf());
}
}
// And finally load slots configuration.
// Some of these boundaries are explicitly probed in the SlotBoundaries
// test. If you change the cluster layout, you must also change that test.
static const int slot_ranges[] = { 0, 5500, 11000, 16384 };
for (int i = 0; i < 3; i++) {
GoogleString command = "CLUSTER ADDSLOTS";
for (int slot = slot_ranges[i]; slot < slot_ranges[i + 1]; slot++) {
StrAppend(&command, " ", IntegerToString(slot));
}
StrAppend(&command, "\r\n");
auto& conn = (*connections)[i];
conn->Send(command);
CHECK_EQ("+OK\r\n", conn->ReadLineCrLf());
}
// Nodes learn about each other asynchronously in response to CLUSTER MEET
// above, but if the system hasn't yet converged, REPLICATE will fail. We
// poll the cluster config with GetNodeClusterConfig() until every node
// knows about every other node.
LOG(INFO) << "Reset Redis Cluster configuration back to default, "
"waiting for node propagation...";
PosixTimer timer;
int64 timeout_at_ms = timer.NowMs() + kReconfigurationPropagationTimeoutMs;
bool propagated = false;
while (!propagated && timer.NowMs() < timeout_at_ms) {
size_t num_complete = 0;
// For every connection, pull the node config and verify that it sees
// the right number of nodes.
for (auto& conn : *connections) {
StringVector config = GetNodeConfig(conn.get());
if (config.size() == connections->size()) {
++num_complete;
} else {
break;
}
}
if (num_complete == connections->size()) {
propagated = true;
} else {
timer.SleepMs(50);
}
}
CHECK(propagated) << "All nodes did not report in after CLUSTER MEET";
for (int i = 3; i < 6; i++) {
auto& conn = (*connections)[i];
conn->Send(StrCat("CLUSTER REPLICATE ", (*node_ids)[i - 3], "\r\n"));
CHECK_EQ("+OK\r\n", conn->ReadLineCrLf());
}
// Now wait until all nodes report cluster as healthy and report same
// cluster configuration.
LOG(INFO) << "Reset Redis Cluster configuration back to default, "
"waiting for slot propagation...";
timeout_at_ms = timer.NowMs() + kReconfigurationPropagationTimeoutMs;
bool cluster_is_up = false;
while (!cluster_is_up) {
CHECK_LE(timer.NowMs(), timeout_at_ms)
<< "Redis Cluster configuration did not propagate in time";
StringVector first_node_config;
for (auto& conn : *connections) {
StringVector current_config = GetNodeConfig(conn.get());
if (current_config.empty() ||
current_config.size() != connections->size()) {
break;
}
// Check configs are the same on all nodes.
cluster_is_up = true;
if (first_node_config.empty()) {
first_node_config = current_config;
} else if (first_node_config != current_config) {
cluster_is_up = false;
break;
}
}
if (!cluster_is_up) {
timer.SleepMs(50);
}
}
LOG(INFO) << "Redis Cluster is reset";
}
bool LoadConfiguration(StringVector* node_ids,
std::vector<int>* ports,
ConnectionList* connections) {
// Parsing environment variables.
// TODO(cheesy): We should discover the cluster IDs by querying the ports,
// and not rely on the shell to set REDIS_CLUSTER_IDS for us.
const char* ports_env = getenv("REDIS_CLUSTER_PORTS");
const char* ids_env = getenv("REDIS_CLUSTER_IDS");
if (!ports_env && !ids_env) {
LOG(ERROR) << "Env variables REDIS_CLUSTER_* are not set. Use "
<< "install/run_program_with_redis_cluster.sh for running "
<< "these tests. Do not use real cluster; ALL DATA WILL "
<< "BE ERASED DURING TESTS!";
return false;
}
CHECK(ports_env) << "Env variable REDIS_CLUSTER_PORTS is unspecified";
CHECK(ids_env) << "Env variable REDIS_CLUSTER_IDS is unspecified";
StringPieceVector port_strs;
StringPieceVector id_strs;
SplitStringPieceToVector(ports_env, " ", &port_strs,
/* omit_empty_strings */ true);
SplitStringPieceToVector(ids_env, " ", &id_strs,
/* omit_empty_strings */ true);
CHECK_EQ(port_strs.size(), id_strs.size()) << "REDIS_CLUSTER_PORTS and "
"REDIS_CLUSTER_IDS have "
"different amount of items";
CHECK_EQ(port_strs.size(), 6) << "Six Redis Cluster nodes are expected";
for (auto port_str : port_strs) {
int port;
CHECK(StringToInt(port_str, &port)) << "Invalid port: " << port_str;
ports->push_back(port);
}
for (StringPiece id : id_strs) {
node_ids->push_back(id.as_string());
}
for (int port : *ports) {
connections->emplace_back(new TcpConnectionForTesting());
CHECK(connections->back()->Connect("localhost", port))
<< "Cannot connect to Redis Cluster node";
}
return true;
}
void FlushAll(TcpConnectionForTesting* conn) {
conn->Send("FLUSHALL\r\n");
GoogleString flushall_reply = conn->ReadLineCrLf();
// We'll get READONLY from slave nodes, which isn't a problem.
CHECK(flushall_reply == "+OK\r\n" ||
strings::StartsWith(flushall_reply, "-READONLY"));
}
void FlushAll(ConnectionList* connections) {
for (auto& conn : *connections) {
FlushAll(conn.get());
}
}
} // namespace RedisCluster
} // namespace net_instaweb
@@ -0,0 +1,62 @@
/*
* Copyright 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Author: yeputons@google.com (Egor Suvorov)
*/
#ifndef PAGESPEED_SYSTEM_REDIS_CACHE_CLUSTER_SETUP_H_
#define PAGESPEED_SYSTEM_REDIS_CACHE_CLUSTER_SETUP_H_
#include <memory>
#include <vector>
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/system/tcp_connection_for_testing.h"
namespace net_instaweb {
typedef std::vector<std::unique_ptr<TcpConnectionForTesting>> ConnectionList;
namespace RedisCluster {
// This function checks that node reports cluster as healthy and returns its
// knowledge about cluster configuration. Returns empty vector in case of
// failure.
StringVector GetNodeConfig(TcpConnectionForTesting* conn);
// Reset cluster configuration to our testing default.
//
// TODO(cheesy): node_ids, ports and connections should be collapsed onto a
// single vector of struct { conn, port, node_id }.
void ResetConfiguration(StringVector* node_ids,
std::vector<int>* ports,
ConnectionList* connections);
// Populate node_ids, ports and connections with values suitable to be passed
// into ResetClusterConfiguration. Config is loaded through environment
// variables REDIS_CLUSTER_PORTS and REDIS_CLUSTER_IDS.
bool LoadConfiguration(StringVector* node_ids,
std::vector<int>* ports,
ConnectionList* connections);
// Send redis FLUSHALL command, which removes all stored data.
void FlushAll(TcpConnectionForTesting* connection);
void FlushAll(ConnectionList* connections);
} // namespace RedisCluster
} // namespace net_instaweb
#endif // PAGESPEED_SYSTEM_REDIS_CACHE_CLUSTER_SETUP_H_
+10 -236
View File
@@ -20,8 +20,7 @@
#include "pagespeed/system/redis_cache.h"
#include <cstdlib>
#include <algorithm>
#include <cstddef>
#include <vector>
#include "base/logging.h"
@@ -31,9 +30,7 @@
#include "pagespeed/kernel/base/gtest.h"
#include "pagespeed/kernel/base/null_mutex.h"
#include "pagespeed/kernel/base/mock_timer.h"
#include "pagespeed/kernel/base/posix_timer.h"
#include "pagespeed/kernel/base/scoped_ptr.h"
#include "pagespeed/kernel/base/statistics.h"
#include "pagespeed/kernel/base/string.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/kernel/base/timer.h"
@@ -41,6 +38,7 @@
#include "pagespeed/kernel/cache/cache_test_base.h"
#include "pagespeed/kernel/util/platform.h"
#include "pagespeed/kernel/util/simple_stats.h"
#include "pagespeed/system/redis_cache_cluster_setup.h"
#include "pagespeed/system/tcp_connection_for_testing.h"
namespace net_instaweb {
@@ -49,7 +47,6 @@ namespace {
static const int kReconnectionDelayMs = 10;
static const int kTimeoutUs = 100 * Timer::kMsUs;
static const int kSlaveNodesFlushingTimeoutMs = 1000;
static const int kReconfigurationPropagationTimeoutMs = 5000;
// One can check following constants with CLUSTER KEYSLOT command.
// For testing purposes, both KEY and {}KEY should be in the same slot range.
@@ -80,232 +77,24 @@ class RedisCacheClusterTest : public CacheTestBase {
RedisCache::InitStats(&statistics_);
}
// run_program_with_redis_cluster.sh should take care of this for us, but
// leaving this here to make the test as hermetic as possible.
static void SetUpTestCase() {
StringVector node_ids;
std::vector<int> ports;
ConnectionList connections;
if (LoadClusterConfiguration(&node_ids, &ports, &connections)) {
ResetClusterConfiguration(&node_ids, &ports, &connections);
if (RedisCluster::LoadConfiguration(&node_ids, &ports, &connections)) {
RedisCluster::ResetConfiguration(&node_ids, &ports, &connections);
}
}
// This function checks that node reports cluster as healthy and returns its
// knowledge about cluster configuration. Returns empty vector in case of
// failure.
// TODO(yeputons): there is alternative command CLUSTER SLOTS which will
// produces better machine-readable result which we will have to parse inside
// RedisCache anyway to load cluster map in some C++ structure in advance.
// We can probably re-use that parser here. You may find hiredis' functions
// redisReaderCreate, redisReaderFree and redisReaderGetReply useful.
static StringVector GetNodeClusterConfig(TcpConnectionForTesting* conn) {
conn->Send("CLUSTER INFO\r\n");
GoogleString cluster_info = ReadBulkString(conn);
if (cluster_info.find("cluster_state:ok\r\n") == GoogleString::npos) {
return {};
}
conn->Send("CLUSTER NODES\r\n");
GoogleString config_csv = ReadBulkString(conn);
StringPieceVector lines;
SplitStringPieceToVector(config_csv, "\r\n", &lines,
true /* omit_empty_strings */);
StringVector current_config;
for (StringPiece line : lines) {
StringPieceVector fields;
SplitStringPieceToVector(line, " ", &fields,
true /* omit_empty_strings */);
CHECK_GE(fields.size(), 8);
GoogleString node_descr;
// See http://redis.io/commands/cluster-nodes. We take three fields
// from node description (node id, ip:port, master/slave) plus
// information about slots served.
StrAppend(&node_descr, fields[0], " ", fields[1], " ", fields[3]);
for (int i = 8; i < fields.size(); i++) {
StrAppend(&node_descr, " ", fields[i]);
}
current_config.push_back(node_descr);
}
std::sort(current_config.begin(), current_config.end());
return current_config;
}
// Reset cluster configuration to our testing default. This is duplicated in
// run_program_with_redis_cluster.sh for manual testing, and any changes here
// should be copied to there.
//
// TODO(jefftk): We should have a binary target that sets up the cluster for
// tests, and then run_program_with_redis_cluster.sh can call that binary.
// Then we don't have to duplicate the configuration.
static void ResetClusterConfiguration(StringVector* node_ids,
std::vector<int>* ports,
ConnectionList* connections) {
// TODO(cheesy): These should be collapsed onto a single vector of
// struct { conn, port, node_id }.
CHECK_EQ(6, connections->size());
CHECK_EQ(connections->size(), ports->size());
CHECK_EQ(connections->size(), node_ids->size());
LOG(INFO) << "Resetting Redis Cluster configuration back to default";
// First, flush all data from the cluster and reset all nodes.
for (auto& conn : *connections) {
conn->Send("FLUSHALL\r\nCLUSTER RESET SOFT\r\n");
}
for (auto& conn : *connections) {
GoogleString flushall_reply = conn->ReadLineCrLf();
// We'll get READONLY from slave nodes, which isn't a problem.
CHECK(flushall_reply == "+OK\r\n" ||
StringPiece(flushall_reply).starts_with("-READONLY"));
CHECK_EQ("+OK\r\n", conn->ReadLineCrLf());
}
// Now make nodes know about each other.
for (auto& conn : *connections) {
for (int port : *ports) {
conn->Send(
StrCat("CLUSTER MEET 127.0.0.1 ", IntegerToString(port), "\r\n"));
}
for (int i = 0, n = ports->size(); i < n; ++i) {
CHECK_EQ("+OK\r\n", conn->ReadLineCrLf());
}
}
// And finally load slots configuration.
// Some of these boundaries are explicitly probed in the SlotBoundaries
// test. If you change the cluster layout, you must also change that test.
static const int slot_ranges[] = { 0, 5500, 11000, 16384 };
for (int i = 0; i < 3; i++) {
GoogleString command = "CLUSTER ADDSLOTS";
for (int slot = slot_ranges[i]; slot < slot_ranges[i + 1]; slot++) {
StrAppend(&command, " ", IntegerToString(slot));
}
StrAppend(&command, "\r\n");
auto& conn = (*connections)[i];
conn->Send(command);
CHECK_EQ("+OK\r\n", conn->ReadLineCrLf());
}
// Nodes learn about each other asynchronously in response to CLUSTER MEET
// above, but if the system hasn't yet converged, REPLICATE will fail. We
// poll the cluster config with GetNodeClusterConfig() until every node
// knows about every other node.
LOG(INFO) << "Reset Redis Cluster configuration back to default, "
"waiting for node propagation...";
PosixTimer timer;
int64 timeout_at_ms = timer.NowMs() + kReconfigurationPropagationTimeoutMs;
bool propagated = false;
while (!propagated && timer.NowMs() < timeout_at_ms) {
int num_complete = 0;
// For every connection, pull the node config and verify that it sees
// the right number of nodes.
for (auto& conn : *connections) {
StringVector config = GetNodeClusterConfig(conn.get());
if (config.size() == connections->size()) {
++num_complete;
} else {
break;
}
}
if (num_complete == connections->size()) {
propagated = true;
} else {
timer.SleepMs(50);
}
}
CHECK(propagated) << "All nodes did not report in after CLUSTER MEET";
for (int i = 3; i < 6; i++) {
auto& conn = (*connections)[i];
conn->Send(StrCat("CLUSTER REPLICATE ", (*node_ids)[i - 3], "\r\n"));
CHECK_EQ("+OK\r\n", conn->ReadLineCrLf());
}
// Now wait until all nodes report cluster as healthy and report same
// cluster configuration.
LOG(INFO) << "Reset Redis Cluster configuration back to default, "
"waiting for slot propagation...";
timeout_at_ms = timer.NowMs() + kReconfigurationPropagationTimeoutMs;
bool cluster_is_up = false;
while (!cluster_is_up) {
CHECK_LE(timer.NowMs(), timeout_at_ms)
<< "Redis Cluster configuration did not propagate in time";
StringVector first_node_config;
for (auto& conn : *connections) {
StringVector current_config = GetNodeClusterConfig(conn.get());
if (current_config.empty() ||
current_config.size() != connections->size()) {
break;
}
// Check configs are the same on all nodes.
cluster_is_up = true;
if (first_node_config.empty()) {
first_node_config = current_config;
} else if (first_node_config != current_config) {
cluster_is_up = false;
break;
}
}
if (!cluster_is_up) {
timer.SleepMs(50);
}
}
LOG(INFO) << "Redis Cluster is reset";
}
static bool LoadClusterConfiguration(StringVector* node_ids,
std::vector<int>* ports,
ConnectionList* connections) {
// Parsing environment variables.
const char* ports_env = getenv("REDIS_CLUSTER_PORTS");
const char* ids_env = getenv("REDIS_CLUSTER_IDS");
if (!ports_env && !ids_env) {
LOG(ERROR) << "Env variables REDIS_CLUSTER_* are not set, skipping"
<< "RedisCacheClusterTest.* altogether. Use "
<< "install/run_program_with_redis_cluster.sh for running "
<< "these tests. Do not use real cluster; ALL DATA WILL "
<< "BE ERASED DURING TESTS!";
return false;
}
CHECK(ports_env) << "Env variable REDIS_CLUSTER_PORTS is unspecified";
CHECK(ids_env) << "Env variable REDIS_CLUSTER_IDS is unspecified";
StringPieceVector port_strs;
StringPieceVector id_strs;
SplitStringPieceToVector(ports_env, " ", &port_strs,
/* omit_empty_strings */ true);
SplitStringPieceToVector(ids_env, " ", &id_strs,
/* omit_empty_strings */ true);
CHECK_EQ(port_strs.size(), id_strs.size()) << "REDIS_CLUSTER_PORTS and "
"REDIS_CLUSTER_IDS have "
"different amount of items";
CHECK_EQ(port_strs.size(), 6) << "Six Redis Cluster nodes are expected";
for (auto port_str : port_strs) {
int port;
CHECK(StringToInt(port_str, &port)) << "Invalid port: " << port_str;
ports->push_back(port);
}
for (StringPiece id : id_strs) {
node_ids->push_back(id.as_string());
}
for (int port : *ports) {
connections->emplace_back(new TcpConnectionForTesting());
CHECK(connections->back()->Connect("localhost", port))
<< "Cannot connect to Redis Cluster node";
}
return true;
void TearDown() override {
RedisCluster::FlushAll(&connections_);
}
bool InitRedisClusterOrSkip() {
if (!LoadClusterConfiguration(&node_ids_, &ports_, &connections_)) {
if (!RedisCluster::LoadConfiguration(&node_ids_, &ports_, &connections_)) {
return false; // Already logged an error.
}
@@ -317,21 +106,6 @@ class RedisCacheClusterTest : public CacheTestBase {
return true;
}
static GoogleString ReadBulkString(TcpConnectionForTesting* conn) {
GoogleString length_str_storage = conn->ReadLineCrLf();
StringPiece length_str = length_str_storage;
// Check that Redis answered with Bulk String
CHECK(length_str.starts_with("$"));
length_str.remove_prefix(1);
CHECK(length_str.ends_with("\r\n"));
length_str.remove_suffix(2);
int length;
CHECK(StringToInt(length_str, &length));
GoogleString result = conn->ReadBytes(length);
CHECK_EQ("\r\n", conn->ReadLineCrLf());
return result;
}
CacheInterface* Cache() override { return cache_.get(); }
scoped_ptr<RedisCache> cache_;
@@ -515,7 +289,7 @@ class RedisCacheClusterTestWithReconfiguration : public RedisCacheClusterTest {
protected:
void TearDown() override {
if (!connections_.empty()) {
ResetClusterConfiguration(&node_ids_, &ports_, &connections_);
RedisCluster::ResetConfiguration(&node_ids_, &ports_, &connections_);
}
}
};