Commit

refactor(solvera.cpp): remove redis dependency and add concurrency control
ozeliurs committed Oct 2, 2024
1 parent 970eb32 commit 679e19e
Showing 1 changed file with 11 additions and 52 deletions.
63 changes: 11 additions & 52 deletions api/solver/solvera.cpp
--- a/api/solver/solvera.cpp
+++ b/api/solver/solvera.cpp
@@ -6,36 +6,11 @@
 
 #include "board/scan_utils.h"
 
-#include <sw/redis++/redis++.h>
 #include <unifex/timed_single_thread_context.hpp>
 unifex::timed_single_thread_context timer;
-void update_cache(std::mutex &mutex,
-                  const std::string &pieces_hash,
-                  sw::redis::Redis &redis,
-                  SharedData &shared_data)
-{
-    std::scoped_lock lock(mutex);
-    spdlog::info("Adding {} hashes to redis", shared_data.hashes.size());
-    // insert into redis
-    try
-    {
-        redis.sadd(pieces_hash, shared_data.hashes.begin(), shared_data.hashes.end());
-    }
-    catch (const sw::redis::Error &e)
-    {
-        spdlog::error("Could not add hashes to redis: {}", e.what());
-    }
-    spdlog::info("Added hashes to redis");
-    auto temp = std::unordered_set<BoardHash>{};
-    redis.smembers(pieces_hash, std::inserter(temp, temp.end()));
-    // copy the temp set into the shared data
-    for (const auto &hash : temp)
-    {
-        shared_data.hashes.insert(hash);
-    }
-    shared_data.redis_hash_count = temp.size();
-    spdlog::info("Loaded {} hashes from redis", shared_data.redis_hash_count);
-}
+std::atomic<int> concurrent_jobs_count = 0;
+int max_concurrent_jobs = 1;
+
 
 auto build_response(const SharedData &shared_data, double elapsed_seconds) -> SolverRPC::Response
 {
@@ -189,6 +164,12 @@ auto handle_server_solver_request(agrpc::GrpcContext &grpc_context,
     return agrpc::register_sender_rpc_handler<SolverRPC>(
         grpc_context, service1, [&](SolverRPC &rpc, SolverRPC::Request &request) -> unifex::task<void> {
             spdlog::info("Received request");
+            if (concurrent_jobs_count >= max_concurrent_jobs)
+            {
+                spdlog::info("Too many concurrent jobs");
+                co_return;
+            }
+            concurrent_jobs_count++;
             auto [board, pieces] = load_board_pieces_from_request(request);
             std::mutex mutex;
             Board max_board = create_board(board.size);
@@ -209,32 +190,11 @@ auto handle_server_solver_request(agrpc::GrpcContext &grpc_context,
             shared_data.hash_length_threshold = request.hash_threshold();
             auto start = std::chrono::high_resolution_clock::now();
             auto pieces_hash = hash_pieces_board(pieces, board);
-            // load hashes from redis if they exist
-
-            sw::redis::ConnectionOptions connection_options;
-            connection_options.host = get_env_var("REDIS_HOST", "localhost");
-            connection_options.port = static_cast<int>(
-                strtol(get_env_var("REDIS_PORT", "6379").c_str(), nullptr, 10));
-            connection_options.password = get_env_var("REDIS_PASSWORD", "");
-            connection_options.db = 0;
-            sw::redis::Redis redis = sw::redis::Redis(connection_options);
 
-            // check if connection is working
-            try
-            {
-                redis.ping();
-                spdlog::info("Connected to redis");
-            }
-            catch (const sw::redis::Error &e)
-            {
-                spdlog::error("Could not connect to redis: {}", e.what());
-                co_return;
-            }
             auto last_cache_pull = std::chrono::high_resolution_clock::now();
             if (request.use_cache())
             {
                 spdlog::info("Using cache");
-                update_cache(mutex, pieces_hash, redis, shared_data);
                 last_cache_pull = std::chrono::high_resolution_clock::now();
             }
             spdlog::info("Starting solver", board.size);
@@ -259,6 +219,7 @@ auto handle_server_solver_request(agrpc::GrpcContext &grpc_context,
                     spdlog::info("Found solution");
                     // stop threads
                     spdlog::info("Stopping threads");
+                    concurrent_jobs_count--;
                     shared_data.stop = true;
                     for (auto &thread : threads)
                     {
@@ -271,15 +232,14 @@ auto handle_server_solver_request(agrpc::GrpcContext &grpc_context,
                     std::string out = std::string();
                     step_by_step.AppendToString(&out);
 
-                    redis.sadd("solutions_" + pieces_hash, out);
-
                     co_await rpc.finish(grpc::Status::OK);
                     co_return;
                 }
                 spdlog::info("Writing response");
                 if (!co_await rpc.write(build_response(shared_data, seconds_since_start)))
                 {
                     spdlog::info("Client cancelled request");
+                    concurrent_jobs_count--;
                     spdlog::info("Stopping threads");
                     // stop threads
                     shared_data.stop = true;
@@ -299,7 +259,6 @@ auto handle_server_solver_request(agrpc::GrpcContext &grpc_context,
                        / 1000.0;
                 if (request.use_cache() && (seconds_since_last_cache_pull > request.cache_pull_interval()))
                 {
-                    update_cache(mutex, pieces_hash, redis, shared_data);
                     last_cache_pull = std::chrono::high_resolution_clock::now();
                 }
                 spdlog::info("Response written");
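The concurrency control introduced here is a simple admission check: a global std::atomic<int> counter is compared against max_concurrent_jobs when a request arrives, incremented when the job is accepted, and decremented when a solution is found or the client cancels. The sketch below isolates that pattern outside the gRPC handler; run_solver_job is a hypothetical stand-in for the real handler coroutine, and the check-then-increment is kept as written in the diff.

#include <atomic>
#include <cstdio>

// Mirrors the globals added in solvera.cpp.
std::atomic<int> concurrent_jobs_count = 0;
int max_concurrent_jobs = 1;

// Hypothetical stand-in for the request handler coroutine.
bool run_solver_job()
{
    // Reject the request when the limit is already reached
    // (the real handler logs "Too many concurrent jobs" and co_returns).
    if (concurrent_jobs_count >= max_concurrent_jobs)
    {
        std::puts("Too many concurrent jobs");
        return false;
    }
    concurrent_jobs_count++;

    // ... solver work would run here ...

    // Release the slot on every exit path (solution found or client cancelled).
    concurrent_jobs_count--;
    return true;
}

int main()
{
    run_solver_job();
    return 0;
}

As a design note, the load and the increment are two separate atomic operations, so two requests arriving at the same instant can both pass the check; a fetch_add followed by a rollback (or a compare_exchange loop) would make the admission test race-free.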
