LCOV - code coverage report
Current view: top level - src - ClientPool.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 46 65 70.8 %
Date: 2025-11-04 02:47:32 Functions: 5 7 71.4 %

          Line data    Source code
       1             : #ifndef SOCIAL_NETWORK_MICROSERVICES_CLIENTPOOL_H
       2             : #define SOCIAL_NETWORK_MICROSERVICES_CLIENTPOOL_H
       3             : 
       4             : #include <vector>
       5             : #include <mutex>
       6             : #include <condition_variable>
       7             : #include <deque>
       8             : #include <chrono>
       9             : #include <string>
      10             : #include <nlohmann/json.hpp>
      11             : 
      12             : #include "logger.h"
      13             : 
      14             : namespace social_network {
      15             : using json = nlohmann::json;
      16             : 
      17             : template<class TClient>
      18             : class ClientPool {
      19             :  public:
      20             :   ClientPool(const std::string &client_type, const std::string &addr,
      21             :       int port, int min_size, int max_size, int timeout_ms, int keepalive_ms,
      22             :       const json &config_json);
      23             :   ~ClientPool();
      24             : 
      25             :   ClientPool(const ClientPool&) = delete;
      26             :   ClientPool& operator=(const ClientPool&) = delete;
      27             :   ClientPool(ClientPool&&) = default;
      28             :   ClientPool& operator=(ClientPool&&) = default;
      29             : 
      30             :   TClient * Pop();
      31             :   void Push(TClient *);
      32             :   void Keepalive(TClient *);
      33             :   void Remove(TClient *);
      34             : 
      35             :  private:
      36             :   std::deque<TClient *> _pool;
      37             :   std::string _addr;
      38             :   std::string _client_type;
      39             :   int _port;
      40             :   int _min_pool_size{};
      41             :   int _max_pool_size{};
      42             :   int _curr_pool_size{};
      43             :   int _timeout_ms;
      44             :   int _keepalive_ms;
      45             :   std::mutex _mtx;
      46             :   std::condition_variable _cv;
      47             :   const json *_config_json;
      48             : 
      49             : };
      50             : 
      51             : template<class TClient>
      52           1 : ClientPool<TClient>::ClientPool(const std::string &client_type,
      53             :     const std::string &addr, int port, int min_pool_size,
      54             :     int max_pool_size, int timeout_ms, int keepalive_ms,
      55           1 :     const json &config_json) {
      56           1 :   _addr = addr;
      57           1 :   _port = port;
      58           1 :   _min_pool_size = min_pool_size;
      59           1 :   _max_pool_size = max_pool_size;
      60           1 :   _timeout_ms = timeout_ms;
      61           1 :   _client_type = client_type;
      62           1 :   _keepalive_ms = keepalive_ms;
      63           1 :   _config_json = &config_json;
      64             : 
      65           1 :   for (int i = 0; i < min_pool_size; ++i) {
      66           0 :     TClient *client = new TClient(addr, port, keepalive_ms, config_json);
      67           0 :     _pool.emplace_back(client);
      68             :   }
      69           1 :   _curr_pool_size = min_pool_size;
      70           1 : }
      71             : 
      72             : template<class TClient>
      73           0 : ClientPool<TClient>::~ClientPool() {
      74           0 :   while (!_pool.empty()) {
      75           0 :     delete _pool.front();
      76           0 :     _pool.pop_front();
      77             :   }
      78           0 : }
      79             : 
      80             : template<class TClient>
      81         400 : TClient * ClientPool<TClient>::Pop() {
      82         400 :   TClient * client = nullptr;
      83             :   {
      84         800 :     std::unique_lock<std::mutex> cv_lock(_mtx);
      85         400 :     while (_pool.size() == 0 && _curr_pool_size == _max_pool_size) {
      86             :       // Create a new a client if current pool size is less than
      87             :       // the max pool size.
      88           0 :       auto wait_time = std::chrono::system_clock::now() +
      89           0 :           std::chrono::milliseconds(_timeout_ms);
      90             :       bool wait_success = _cv.wait_until(cv_lock, wait_time,
      91           0 :             [this] { return _pool.size() > 0 || _curr_pool_size < _max_pool_size; });
      92           0 :       if (!wait_success) {
      93           0 :         LOG(warning) << "ClientPool pop timeout";
      94           0 :         LOG(info) << _pool.size() << " " << _curr_pool_size;
      95           0 :         cv_lock.unlock();
      96           0 :         return nullptr;
      97             :       }
      98             :     }
      99         400 :     if (_pool.size() > 0) {
     100         300 :       client = _pool.front();
     101         300 :       _pool.pop_front();
     102             :     } else {
     103         100 :       client = new TClient(_addr, _port, _keepalive_ms, *_config_json);
     104         100 :       _curr_pool_size++;
     105             :     }
     106         400 :   cv_lock.unlock();
     107             :   } // cv_lock(_mtx)
     108             : 
     109             : 
     110         400 :   if (client) {
     111             :     try {
     112         400 :       client->Connect();
     113           0 :     } catch (...) {
     114           0 :       LOG(error) << "Failed to connect " + _client_type;
     115           0 :       Remove(client);
     116           0 :       throw;
     117             :     }
     118             :   }
     119         400 :   return client;
     120             : }
     121             : 
     122             : template<class TClient>
     123         301 : void ClientPool<TClient>::Push(TClient *client) {
     124         602 :   std::unique_lock<std::mutex> cv_lock(_mtx);
     125         301 :   _pool.push_back(client);
     126         301 :   cv_lock.unlock();
     127         301 :   _cv.notify_one();
     128         301 : }
     129             : 
     130             : template<class TClient>
     131          99 : void ClientPool<TClient>::Remove(TClient *client) {
     132             :   // No need to delete it from _pool because the *client has been poped out
     133          99 :   delete client;
     134         198 :   std::unique_lock<std::mutex> cv_lock(_mtx);
     135          99 :   _curr_pool_size--;
     136          99 :   cv_lock.unlock();
     137          99 :   _cv.notify_one();
     138          99 : }
     139             : 
     140             : template<class TClient>
     141         400 : void ClientPool<TClient>::Keepalive(TClient *client) {
     142         800 :   long curr_timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
     143        1200 :           std::chrono::system_clock::now().time_since_epoch()).count();
     144         400 :   if (curr_timestamp - client->_connect_timestamp > client->_keepalive_ms) {
     145          99 :     Remove(client);
     146             :   } else {
     147         301 :     Push(client);
     148             :   }
     149         400 : }
     150             : 
     151             : } // namespace social_network
     152             : 
     153             : 
     154             : #endif //SOCIAL_NETWORK_MICROSERVICES_CLIENTPOOL_H

Generated by: LCOV version 1.12