Line data Source code
1 : #ifndef SOCIAL_NETWORK_MICROSERVICES_CLIENTPOOL_H
2 : #define SOCIAL_NETWORK_MICROSERVICES_CLIENTPOOL_H
3 :
4 : #include <vector>
5 : #include <mutex>
6 : #include <condition_variable>
7 : #include <deque>
8 : #include <chrono>
9 : #include <string>
10 : #include <nlohmann/json.hpp>
11 :
12 : #include "logger.h"
13 :
14 : namespace social_network {
15 : using json = nlohmann::json;
16 :
17 : template<class TClient>
18 : class ClientPool {
19 : public:
20 : ClientPool(const std::string &client_type, const std::string &addr,
21 : int port, int min_size, int max_size, int timeout_ms, int keepalive_ms,
22 : const json &config_json);
23 : ~ClientPool();
24 :
25 : ClientPool(const ClientPool&) = delete;
26 : ClientPool& operator=(const ClientPool&) = delete;
27 : ClientPool(ClientPool&&) = default;
28 : ClientPool& operator=(ClientPool&&) = default;
29 :
30 : TClient * Pop();
31 : void Push(TClient *);
32 : void Keepalive(TClient *);
33 : void Remove(TClient *);
34 :
35 : private:
36 : std::deque<TClient *> _pool;
37 : std::string _addr;
38 : std::string _client_type;
39 : int _port;
40 : int _min_pool_size{};
41 : int _max_pool_size{};
42 : int _curr_pool_size{};
43 : int _timeout_ms;
44 : int _keepalive_ms;
45 : std::mutex _mtx;
46 : std::condition_variable _cv;
47 : const json *_config_json;
48 :
49 : };
50 :
51 : template<class TClient>
52 1 : ClientPool<TClient>::ClientPool(const std::string &client_type,
53 : const std::string &addr, int port, int min_pool_size,
54 : int max_pool_size, int timeout_ms, int keepalive_ms,
55 1 : const json &config_json) {
56 1 : _addr = addr;
57 1 : _port = port;
58 1 : _min_pool_size = min_pool_size;
59 1 : _max_pool_size = max_pool_size;
60 1 : _timeout_ms = timeout_ms;
61 1 : _client_type = client_type;
62 1 : _keepalive_ms = keepalive_ms;
63 1 : _config_json = &config_json;
64 :
65 1 : for (int i = 0; i < min_pool_size; ++i) {
66 0 : TClient *client = new TClient(addr, port, keepalive_ms, config_json);
67 0 : _pool.emplace_back(client);
68 : }
69 1 : _curr_pool_size = min_pool_size;
70 1 : }
71 :
72 : template<class TClient>
73 0 : ClientPool<TClient>::~ClientPool() {
74 0 : while (!_pool.empty()) {
75 0 : delete _pool.front();
76 0 : _pool.pop_front();
77 : }
78 0 : }
79 :
80 : template<class TClient>
81 75211 : TClient * ClientPool<TClient>::Pop() {
82 75211 : TClient * client = nullptr;
83 : {
84 150459 : std::unique_lock<std::mutex> cv_lock(_mtx);
85 75248 : while (_pool.size() == 0 && _curr_pool_size == _max_pool_size) {
86 : // Create a new a client if current pool size is less than
87 : // the max pool size.
88 0 : auto wait_time = std::chrono::system_clock::now() +
89 0 : std::chrono::milliseconds(_timeout_ms);
90 : bool wait_success = _cv.wait_until(cv_lock, wait_time,
91 0 : [this] { return _pool.size() > 0 || _curr_pool_size < _max_pool_size; });
92 0 : if (!wait_success) {
93 0 : LOG(warning) << "ClientPool pop timeout";
94 0 : LOG(info) << _pool.size() << " " << _curr_pool_size;
95 0 : cv_lock.unlock();
96 0 : return nullptr;
97 : }
98 : }
99 75248 : if (_pool.size() > 0) {
100 74885 : client = _pool.front();
101 74885 : _pool.pop_front();
102 : } else {
103 363 : client = new TClient(_addr, _port, _keepalive_ms, *_config_json);
104 363 : _curr_pool_size++;
105 : }
106 75248 : cv_lock.unlock();
107 : } // cv_lock(_mtx)
108 :
109 :
110 75248 : if (client) {
111 : try {
112 75247 : client->Connect();
113 0 : } catch (...) {
114 0 : LOG(error) << "Failed to connect " + _client_type;
115 0 : Remove(client);
116 0 : throw;
117 : }
118 : }
119 75245 : return client;
120 : }
121 :
122 : template<class TClient>
123 74847 : void ClientPool<TClient>::Push(TClient *client) {
124 149772 : std::unique_lock<std::mutex> cv_lock(_mtx);
125 74926 : _pool.push_back(client);
126 74926 : cv_lock.unlock();
127 74925 : _cv.notify_one();
128 74921 : }
129 :
130 : template<class TClient>
131 322 : void ClientPool<TClient>::Remove(TClient *client) {
132 : // No need to delete it from _pool because the *client has been poped out
133 322 : delete client;
134 644 : std::unique_lock<std::mutex> cv_lock(_mtx);
135 322 : _curr_pool_size--;
136 322 : cv_lock.unlock();
137 322 : _cv.notify_one();
138 321 : }
139 :
140 : template<class TClient>
141 75229 : void ClientPool<TClient>::Keepalive(TClient *client) {
142 150445 : long curr_timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(
143 225662 : std::chrono::system_clock::now().time_since_epoch()).count();
144 75203 : if (curr_timestamp - client->_connect_timestamp > client->_keepalive_ms) {
145 322 : Remove(client);
146 : } else {
147 74881 : Push(client);
148 : }
149 75242 : }
150 :
151 : } // namespace social_network
152 :
153 :
154 : #endif //SOCIAL_NETWORK_MICROSERVICES_CLIENTPOOL_H
|