Line data Source code
1 : #ifndef SOCIAL_NETWORK_MICROSERVICES_SOCIALGRAPHHANDLER_H
2 : #define SOCIAL_NETWORK_MICROSERVICES_SOCIALGRAPHHANDLER_H
3 :
4 : #include <bson/bson.h>
5 : #include <mongoc.h>
6 : #include <sw/redis++/redis++.h>
7 :
8 : #include <chrono>
9 : #include <future>
10 : #include <iostream>
11 : #include <string>
12 : #include <thread>
13 : #include <vector>
14 :
15 : #include "../../gen-cpp/SocialGraphService.h"
16 : #include "../../gen-cpp/UserService.h"
17 : #include "../ClientPool.h"
18 : #include "../ThriftClient.h"
19 : #include "../logger.h"
20 : #include "../tracing.h"
21 :
22 : using namespace sw::redis;
23 :
24 : namespace social_network {
25 :
26 : using std::chrono::duration_cast;
27 : using std::chrono::milliseconds;
28 : using std::chrono::system_clock;
29 :
30 : class SocialGraphHandler : public SocialGraphServiceIf {
31 : public:
32 : SocialGraphHandler(mongoc_client_pool_t *, Redis *,
33 : ClientPool<ThriftClient<UserServiceClient>> *);
34 : SocialGraphHandler(mongoc_client_pool_t *, Redis *, Redis *,
35 : ClientPool<ThriftClient<UserServiceClient>>*);
36 : SocialGraphHandler(mongoc_client_pool_t *, RedisCluster *,
37 : ClientPool<ThriftClient<UserServiceClient>> *);
38 0 : ~SocialGraphHandler() override = default;
39 : bool IsRedisReplicationEnabled();
40 : void GetFollowers(std::vector<int64_t> &, int64_t, int64_t,
41 : const std::map<std::string, std::string> &) override;
42 : void GetFollowees(std::vector<int64_t> &, int64_t, int64_t,
43 : const std::map<std::string, std::string> &) override;
44 : void Follow(int64_t, int64_t, int64_t,
45 : const std::map<std::string, std::string> &) override;
46 : void Unfollow(int64_t, int64_t, int64_t,
47 : const std::map<std::string, std::string> &) override;
48 : void FollowWithUsername(int64_t, const std::string &, const std::string &,
49 : const std::map<std::string, std::string> &) override;
50 : void UnfollowWithUsername(
51 : int64_t, const std::string &, const std::string &,
52 : const std::map<std::string, std::string> &) override;
53 : void InsertUser(int64_t, int64_t,
54 : const std::map<std::string, std::string> &) override;
55 :
56 : private:
57 : mongoc_client_pool_t *_mongodb_client_pool;
58 : Redis *_redis_client_pool;
59 : Redis *_redis_replica_client_pool;
60 : Redis *_redis_primary_client_pool;
61 : RedisCluster *_redis_cluster_client_pool;
62 : ClientPool<ThriftClient<UserServiceClient>> *_user_service_client_pool;
63 : };
64 :
65 1 : SocialGraphHandler::SocialGraphHandler(
66 : mongoc_client_pool_t *mongodb_client_pool, Redis *redis_client_pool,
67 1 : ClientPool<ThriftClient<UserServiceClient>> *user_service_client_pool) {
68 1 : _mongodb_client_pool = mongodb_client_pool;
69 1 : _redis_client_pool = redis_client_pool;
70 1 : _redis_replica_client_pool = nullptr;
71 1 : _redis_primary_client_pool = nullptr;
72 1 : _redis_cluster_client_pool = nullptr;
73 1 : _user_service_client_pool = user_service_client_pool;
74 1 : }
75 :
76 0 : SocialGraphHandler::SocialGraphHandler(
77 : mongoc_client_pool_t* mongodb_client_pool, Redis* redis_replica_client_pool, Redis* redis_primary_client_pool,
78 0 : ClientPool<ThriftClient<UserServiceClient>>* user_service_client_pool) {
79 0 : _mongodb_client_pool = mongodb_client_pool;
80 0 : _redis_client_pool = nullptr;
81 0 : _redis_replica_client_pool = redis_replica_client_pool;
82 0 : _redis_primary_client_pool = redis_primary_client_pool;
83 0 : _redis_cluster_client_pool = nullptr;
84 0 : _user_service_client_pool = user_service_client_pool;
85 0 : }
86 :
87 0 : SocialGraphHandler::SocialGraphHandler(
88 : mongoc_client_pool_t *mongodb_client_pool,
89 : RedisCluster *redis_cluster_client_pool,
90 0 : ClientPool<ThriftClient<UserServiceClient>> *user_service_client_pool) {
91 0 : _mongodb_client_pool = mongodb_client_pool;
92 0 : _redis_client_pool = nullptr;
93 0 : _redis_replica_client_pool = nullptr;
94 0 : _redis_primary_client_pool = nullptr;
95 0 : _redis_cluster_client_pool = redis_cluster_client_pool;
96 0 : _user_service_client_pool = user_service_client_pool;
97 0 : }
98 :
99 0 : bool SocialGraphHandler::IsRedisReplicationEnabled() {
100 0 : return (_redis_primary_client_pool || _redis_replica_client_pool);
101 : }
102 :
103 37801 : void SocialGraphHandler::Follow(
104 : int64_t req_id, int64_t user_id, int64_t followee_id,
105 : const std::map<std::string, std::string> &carrier) {
106 75621 : LOG(info) << "Received Follow request [req_id=" << req_id << ", user_id=" << user_id << ", followee_id=" << followee_id << "]";
107 : // Initialize a span
108 75644 : TextMapReader reader(carrier);
109 75644 : std::map<std::string, std::string> writer_text_map;
110 75643 : TextMapWriter writer(writer_text_map);
111 75644 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
112 75641 : auto span = opentracing::Tracer::Global()->StartSpan(
113 151284 : "follow_server", {opentracing::ChildOf(parent_span->get())});
114 37821 : opentracing::Tracer::Global()->Inject(span->context(), writer);
115 :
116 : int64_t timestamp =
117 75640 : duration_cast<milliseconds>(system_clock::now().time_since_epoch())
118 37823 : .count();
119 :
120 : std::future<void> mongo_update_follower_future =
121 37818 : std::async(std::launch::async, [&]() {
122 : mongoc_client_t *mongodb_client =
123 75632 : mongoc_client_pool_pop(_mongodb_client_pool);
124 37824 : if (!mongodb_client) {
125 0 : ServiceException se;
126 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
127 0 : se.message = "Failed to pop a client from MongoDB pool";
128 0 : throw se;
129 : }
130 : auto collection = mongoc_client_get_collection(
131 37824 : mongodb_client, "social-graph", "social-graph");
132 37817 : if (!collection) {
133 0 : ServiceException se;
134 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
135 0 : se.message = "Failed to create collection social_graph from MongoDB";
136 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
137 0 : throw se;
138 : }
139 :
140 : // Update follower->followee edges
141 : const bson_t *doc;
142 75638 : bson_t *search_not_exist = BCON_NEW(
143 : "$and", "[", "{", "user_id", BCON_INT64(user_id), "}", "{",
144 : "followees", "{", "$not", "{", "$elemMatch", "{", "user_id",
145 : BCON_INT64(followee_id), "}", "}", "}", "}", "]");
146 75641 : bson_t *update = BCON_NEW("$push", "{", "followees", "{", "user_id",
147 : BCON_INT64(followee_id), "timestamp",
148 : BCON_INT64(timestamp), "}", "}");
149 : bson_error_t error;
150 : bson_t reply;
151 75632 : auto update_span = opentracing::Tracer::Global()->StartSpan(
152 151287 : "mongo_update_client", {opentracing::ChildOf(&span->context())});
153 : bool updated = mongoc_collection_find_and_modify(
154 : collection, search_not_exist, nullptr, update, nullptr, false,
155 37819 : false, true, &reply, &error);
156 37814 : if (!updated) {
157 0 : LOG(error) << "Failed to update social graph for user " << user_id
158 0 : << " to MongoDB: " << error.message;
159 0 : ServiceException se;
160 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
161 0 : se.message = error.message;
162 0 : bson_destroy(&reply);
163 0 : bson_destroy(update);
164 0 : bson_destroy(search_not_exist);
165 0 : mongoc_collection_destroy(collection);
166 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
167 0 : throw se;
168 : }
169 37814 : update_span->Finish();
170 37815 : bson_destroy(&reply);
171 37823 : bson_destroy(update);
172 37824 : bson_destroy(search_not_exist);
173 37824 : mongoc_collection_destroy(collection);
174 37814 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
175 113459 : });
176 :
177 : std::future<void> mongo_update_followee_future =
178 37813 : std::async(std::launch::async, [&]() {
179 : mongoc_client_t *mongodb_client =
180 75636 : mongoc_client_pool_pop(_mongodb_client_pool);
181 37822 : if (!mongodb_client) {
182 0 : ServiceException se;
183 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
184 0 : se.message = "Failed to pop a client from MongoDB pool";
185 0 : throw se;
186 : }
187 : auto collection = mongoc_client_get_collection(
188 37822 : mongodb_client, "social-graph", "social-graph");
189 37819 : if (!collection) {
190 0 : ServiceException se;
191 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
192 0 : se.message = "Failed to create collection social_graph from MongoDB";
193 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
194 0 : throw se;
195 : }
196 :
197 : // Update followee->follower edges
198 : bson_t *search_not_exist =
199 75636 : BCON_NEW("$and", "[", "{", "user_id", BCON_INT64(followee_id), "}",
200 : "{", "followers", "{", "$not", "{", "$elemMatch", "{",
201 : "user_id", BCON_INT64(user_id), "}", "}", "}", "}", "]");
202 75640 : bson_t *update = BCON_NEW("$push", "{", "followers", "{", "user_id",
203 : BCON_INT64(user_id), "timestamp",
204 : BCON_INT64(timestamp), "}", "}");
205 : bson_error_t error;
206 75630 : auto update_span = opentracing::Tracer::Global()->StartSpan(
207 : "social_graph_mongo_update_client",
208 151286 : {opentracing::ChildOf(&span->context())});
209 : bson_t reply;
210 : bool updated = mongoc_collection_find_and_modify(
211 : collection, search_not_exist, nullptr, update, nullptr, false,
212 37821 : false, true, &reply, &error);
213 37815 : if (!updated) {
214 0 : LOG(error) << "Failed to update social graph for user " << followee_id
215 0 : << " to MongoDB: " << error.message;
216 0 : ServiceException se;
217 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
218 0 : se.message = error.message;
219 0 : bson_destroy(update);
220 0 : bson_destroy(&reply);
221 0 : bson_destroy(search_not_exist);
222 0 : mongoc_collection_destroy(collection);
223 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
224 0 : throw se;
225 : }
226 37815 : update_span->Finish();
227 37814 : bson_destroy(update);
228 37819 : bson_destroy(&reply);
229 37820 : bson_destroy(search_not_exist);
230 37822 : mongoc_collection_destroy(collection);
231 37823 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
232 113448 : });
233 :
234 37817 : std::future<void> redis_update_future = std::async(std::launch::async, [&]() {
235 75640 : auto redis_span = opentracing::Tracer::Global()->StartSpan(
236 : "social_graph_redis_update_client",
237 151275 : {opentracing::ChildOf(&span->context())});
238 :
239 : {
240 37823 : if (_redis_client_pool) {
241 75640 : auto pipe = _redis_client_pool->pipeline(false);
242 113457 : pipe.zadd(std::to_string(user_id) + ":followees",
243 226925 : std::to_string(followee_id), timestamp, UpdateType::NOT_EXIST)
244 75639 : .zadd(std::to_string(followee_id) + ":followers",
245 226937 : std::to_string(user_id), timestamp, UpdateType::NOT_EXIST);
246 : try {
247 37822 : auto replies = pipe.exec();
248 0 : } catch (const Error &err) {
249 0 : LOG(error) << err.what();
250 0 : throw err;
251 : }
252 : }
253 0 : else if (IsRedisReplicationEnabled()) {
254 0 : auto pipe = _redis_primary_client_pool->pipeline(false);
255 0 : pipe.zadd(std::to_string(user_id) + ":followees",
256 0 : std::to_string(followee_id), timestamp, UpdateType::NOT_EXIST)
257 0 : .zadd(std::to_string(followee_id) + ":followers",
258 0 : std::to_string(user_id), timestamp, UpdateType::NOT_EXIST);
259 : try {
260 0 : auto replies = pipe.exec();
261 : }
262 0 : catch (const Error& err) {
263 0 : LOG(error) << err.what();
264 0 : throw err;
265 : }
266 : }
267 : else {
268 : // TODO: Redis++ currently does not support pipeline with multiple
269 : // hashtags in cluster mode.
270 : // Currently, we send one request for each follower, which may
271 : // incur some performance overhead. We are following the updates
272 : // of Redis++ clients:
273 : // https://github.com/sewenew/redis-plus-plus/issues/212
274 : try {
275 0 : _redis_cluster_client_pool->zadd(
276 0 : std::to_string(user_id) + ":followees",
277 0 : std::to_string(followee_id), timestamp, UpdateType::NOT_EXIST);
278 0 : _redis_cluster_client_pool->zadd(
279 0 : std::to_string(followee_id) + ":followers",
280 0 : std::to_string(user_id), timestamp, UpdateType::NOT_EXIST);
281 0 : } catch (const Error &err) {
282 0 : LOG(error) << err.what();
283 0 : throw err;
284 : }
285 : }
286 : }
287 37820 : redis_span->Finish();
288 113457 : });
289 :
290 : try {
291 37809 : redis_update_future.get();
292 37816 : mongo_update_follower_future.get();
293 37815 : mongo_update_followee_future.get();
294 75641 : LOG(info) << "Follow operation completed [req_id=" << req_id << ", user_id=" << user_id << ", followee_id=" << followee_id << "]";
295 0 : } catch (const std::exception &e) {
296 0 : LOG(warning) << e.what();
297 0 : throw;
298 0 : } catch (...) {
299 0 : throw;
300 : }
301 :
302 37823 : span->Finish();
303 37822 : }
304 :
305 200 : void SocialGraphHandler::Unfollow(
306 : int64_t req_id, int64_t user_id, int64_t followee_id,
307 : const std::map<std::string, std::string> &carrier) {
308 400 : LOG(info) << "Received Unfollow request [req_id=" << req_id << ", user_id=" << user_id << ", followee_id=" << followee_id << "]";
309 : // Initialize a span
310 400 : TextMapReader reader(carrier);
311 400 : std::map<std::string, std::string> writer_text_map;
312 400 : TextMapWriter writer(writer_text_map);
313 400 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
314 400 : auto span = opentracing::Tracer::Global()->StartSpan(
315 800 : "unfollow_server", {opentracing::ChildOf(parent_span->get())});
316 200 : opentracing::Tracer::Global()->Inject(span->context(), writer);
317 :
318 : std::future<void> mongo_update_follower_future =
319 200 : std::async(std::launch::async, [&]() {
320 : mongoc_client_t *mongodb_client =
321 400 : mongoc_client_pool_pop(_mongodb_client_pool);
322 200 : if (!mongodb_client) {
323 0 : ServiceException se;
324 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
325 0 : se.message = "Failed to pop a client from MongoDB pool";
326 0 : throw se;
327 : }
328 : auto collection = mongoc_client_get_collection(
329 200 : mongodb_client, "social-graph", "social-graph");
330 200 : if (!collection) {
331 0 : ServiceException se;
332 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
333 0 : se.message = "Failed to create collection social_graph from MongoDB";
334 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
335 0 : throw se;
336 : }
337 200 : bson_t *query = bson_new();
338 :
339 : // Update follower->followee edges
340 200 : BSON_APPEND_INT64(query, "user_id", user_id);
341 200 : bson_t *update = BCON_NEW("$pull", "{", "followees", "{", "user_id",
342 : BCON_INT64(followee_id), "}", "}");
343 : bson_t reply;
344 : bson_error_t error;
345 400 : auto update_span = opentracing::Tracer::Global()->StartSpan(
346 : "social_graph_mongo_delete_client",
347 800 : {opentracing::ChildOf(&span->context())});
348 : bool updated = mongoc_collection_find_and_modify(
349 : collection, query, nullptr, update, nullptr, false, false, true,
350 200 : &reply, &error);
351 200 : if (!updated) {
352 0 : LOG(error) << "Failed to delete social graph for user " << user_id
353 0 : << " to MongoDB: " << error.message;
354 0 : ServiceException se;
355 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
356 0 : se.message = error.message;
357 0 : bson_destroy(update);
358 0 : bson_destroy(query);
359 0 : bson_destroy(&reply);
360 0 : mongoc_collection_destroy(collection);
361 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
362 0 : throw se;
363 : }
364 200 : update_span->Finish();
365 200 : bson_destroy(update);
366 200 : bson_destroy(query);
367 200 : bson_destroy(&reply);
368 200 : mongoc_collection_destroy(collection);
369 200 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
370 600 : });
371 :
372 : std::future<void> mongo_update_followee_future =
373 200 : std::async(std::launch::async, [&]() {
374 : mongoc_client_t *mongodb_client =
375 400 : mongoc_client_pool_pop(_mongodb_client_pool);
376 200 : if (!mongodb_client) {
377 0 : ServiceException se;
378 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
379 0 : se.message = "Failed to pop a client from MongoDB pool";
380 0 : throw se;
381 : }
382 : auto collection = mongoc_client_get_collection(
383 200 : mongodb_client, "social-graph", "social-graph");
384 200 : if (!collection) {
385 0 : ServiceException se;
386 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
387 0 : se.message = "Failed to create collection social_graph from MongoDB";
388 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
389 0 : throw se;
390 : }
391 200 : bson_t *query = bson_new();
392 :
393 : // Update followee->follower edges
394 200 : BSON_APPEND_INT64(query, "user_id", followee_id);
395 200 : bson_t *update = BCON_NEW("$pull", "{", "followers", "{", "user_id",
396 : BCON_INT64(user_id), "}", "}");
397 : bson_t reply;
398 : bson_error_t error;
399 400 : auto update_span = opentracing::Tracer::Global()->StartSpan(
400 : "social_graph_mongo_delete_client",
401 800 : {opentracing::ChildOf(&span->context())});
402 : bool updated = mongoc_collection_find_and_modify(
403 : collection, query, nullptr, update, nullptr, false, false, true,
404 200 : &reply, &error);
405 200 : if (!updated) {
406 0 : LOG(error) << "Failed to delete social graph for user " << followee_id
407 0 : << " to MongoDB: " << error.message;
408 0 : ServiceException se;
409 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
410 0 : se.message = error.message;
411 0 : bson_destroy(update);
412 0 : bson_destroy(query);
413 0 : bson_destroy(&reply);
414 0 : mongoc_collection_destroy(collection);
415 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
416 0 : throw se;
417 : }
418 200 : update_span->Finish();
419 200 : bson_destroy(update);
420 200 : bson_destroy(query);
421 200 : bson_destroy(&reply);
422 200 : mongoc_collection_destroy(collection);
423 200 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
424 600 : });
425 :
426 200 : std::future<void> redis_update_future = std::async(std::launch::async, [&]() {
427 400 : auto redis_span = opentracing::Tracer::Global()->StartSpan(
428 : "social_graph_redis_update_client",
429 800 : {opentracing::ChildOf(&span->context())});
430 : {
431 200 : if (_redis_client_pool) {
432 400 : auto pipe = _redis_client_pool->pipeline(false);
433 600 : std::string followee_key = std::to_string(user_id) + ":followees";
434 600 : std::string follower_key = std::to_string(followee_id) + ":followers";
435 400 : pipe.zrem(followee_key, std::to_string(followee_id))
436 600 : .zrem(follower_key, std::to_string(user_id));
437 :
438 : try {
439 200 : auto replies = pipe.exec();
440 0 : } catch (const Error &err) {
441 0 : LOG(error) << err.what();
442 0 : throw err;
443 : }
444 : }
445 0 : else if (IsRedisReplicationEnabled()) {
446 0 : auto pipe = _redis_primary_client_pool->pipeline(false);
447 0 : std::string followee_key = std::to_string(user_id) + ":followees";
448 0 : std::string follower_key = std::to_string(followee_id) + ":followers";
449 0 : pipe.zrem(followee_key, std::to_string(followee_id))
450 0 : .zrem(follower_key, std::to_string(user_id));
451 :
452 : try {
453 0 : auto replies = pipe.exec();
454 : }
455 0 : catch (const Error& err) {
456 0 : LOG(error) << err.what();
457 0 : throw err;
458 : }
459 : }
460 : else {
461 0 : std::string followee_key = std::to_string(user_id) + ":followees";
462 0 : std::string follower_key = std::to_string(followee_id) + ":followers";
463 : try {
464 0 : _redis_cluster_client_pool->zrem(followee_key,
465 0 : std::to_string(followee_id));
466 0 : _redis_cluster_client_pool->zrem(follower_key,
467 0 : std::to_string(user_id));
468 0 : } catch (const Error &err) {
469 0 : LOG(error) << err.what();
470 0 : throw err;
471 : }
472 : }
473 : }
474 200 : redis_span->Finish();
475 600 : });
476 :
477 : try {
478 200 : redis_update_future.get();
479 200 : mongo_update_follower_future.get();
480 200 : mongo_update_followee_future.get();
481 400 : LOG(info) << "Unfollow operation completed [req_id=" << req_id << ", user_id=" << user_id << ", followee_id=" << followee_id << "]";
482 0 : } catch (...) {
483 0 : throw;
484 : }
485 :
486 200 : span->Finish();
487 200 : }
488 :
489 0 : void SocialGraphHandler::GetFollowers(
490 : std::vector<int64_t> &_return, const int64_t req_id, const int64_t user_id,
491 : const std::map<std::string, std::string> &carrier) {
492 0 : LOG(info) << "Received GetFollowers request [req_id=" << req_id << ", user_id=" << user_id << "]";
493 : // Initialize a span
494 0 : TextMapReader reader(carrier);
495 0 : std::map<std::string, std::string> writer_text_map;
496 0 : TextMapWriter writer(writer_text_map);
497 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
498 0 : auto span = opentracing::Tracer::Global()->StartSpan(
499 0 : "get_followers_server", {opentracing::ChildOf(parent_span->get())});
500 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
501 :
502 0 : auto redis_span = opentracing::Tracer::Global()->StartSpan(
503 : "social_graph_redis_get_client",
504 0 : {opentracing::ChildOf(&span->context())});
505 :
506 0 : std::vector<std::string> followers_str;
507 0 : std::string key = std::to_string(user_id) + ":followers";
508 : try {
509 0 : if (_redis_client_pool) {
510 0 : _redis_client_pool->zrange(key, 0, -1, std::back_inserter(followers_str));
511 : }
512 0 : else if (IsRedisReplicationEnabled()) {
513 0 : _redis_replica_client_pool->zrange(key, 0, -1, std::back_inserter(followers_str));
514 : }
515 : else {
516 0 : _redis_cluster_client_pool->zrange(key, 0, -1,
517 0 : std::back_inserter(followers_str));
518 : }
519 0 : } catch (const Error &err) {
520 0 : LOG(error) << err.what();
521 0 : throw err;
522 : }
523 0 : redis_span->Finish();
524 :
525 : // If user_id in the sodical graph Redis server, read from Redis
526 0 : if (followers_str.size() > 0) {
527 0 : for (auto const &follower_str : followers_str) {
528 0 : _return.emplace_back(std::stoul(follower_str));
529 : }
530 : }
531 : // If user_id in the sodical graph Redis server, read from MongoDB and
532 : // update Redis.
533 : else {
534 : mongoc_client_t *mongodb_client =
535 0 : mongoc_client_pool_pop(_mongodb_client_pool);
536 0 : if (!mongodb_client) {
537 0 : ServiceException se;
538 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
539 0 : se.message = "Failed to pop a client from MongoDB pool";
540 0 : throw se;
541 : }
542 : auto collection = mongoc_client_get_collection(
543 0 : mongodb_client, "social-graph", "social-graph");
544 0 : if (!collection) {
545 0 : ServiceException se;
546 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
547 0 : se.message = "Failed to create collection social_graph from MongoDB";
548 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
549 0 : throw se;
550 : }
551 0 : bson_t *query = bson_new();
552 0 : BSON_APPEND_INT64(query, "user_id", user_id);
553 0 : auto find_span = opentracing::Tracer::Global()->StartSpan(
554 : "social_graph_mongo_find_client",
555 0 : {opentracing::ChildOf(&span->context())});
556 : mongoc_cursor_t *cursor =
557 0 : mongoc_collection_find_with_opts(collection, query, nullptr, nullptr);
558 : const bson_t *doc;
559 0 : bool found = mongoc_cursor_next(cursor, &doc);
560 0 : if (found) {
561 : bson_iter_t iter_0;
562 : bson_iter_t iter_1;
563 : bson_iter_t user_id_child;
564 : bson_iter_t timestamp_child;
565 0 : int index = 0;
566 0 : std::unordered_map<std::string, double> redis_zset;
567 0 : bson_iter_init(&iter_0, doc);
568 0 : bson_iter_init(&iter_1, doc);
569 :
570 0 : while (bson_iter_find_descendant(
571 : &iter_0,
572 0 : ("followers." + std::to_string(index) + ".user_id").c_str(),
573 0 : &user_id_child) &&
574 0 : BSON_ITER_HOLDS_INT64(&user_id_child) &&
575 0 : bson_iter_find_descendant(
576 : &iter_1,
577 0 : ("followers." + std::to_string(index) + ".timestamp").c_str(),
578 0 : ×tamp_child) &&
579 0 : BSON_ITER_HOLDS_INT64(×tamp_child)) {
580 0 : auto iter_user_id = bson_iter_int64(&user_id_child);
581 0 : auto iter_timestamp = bson_iter_int64(×tamp_child);
582 0 : _return.emplace_back(iter_user_id);
583 : redis_zset.emplace(std::pair<std::string, double>(
584 0 : std::to_string(iter_user_id), (double)iter_timestamp));
585 0 : bson_iter_init(&iter_0, doc);
586 0 : bson_iter_init(&iter_1, doc);
587 0 : index++;
588 : }
589 0 : find_span->Finish();
590 0 : bson_destroy(query);
591 0 : mongoc_cursor_destroy(cursor);
592 0 : mongoc_collection_destroy(collection);
593 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
594 :
595 : // Update Redis
596 0 : std::string key = std::to_string(user_id) + ":followers";
597 0 : auto redis_insert_span = opentracing::Tracer::Global()->StartSpan(
598 : "social_graph_redis_insert_client",
599 0 : {opentracing::ChildOf(&span->context())});
600 : try {
601 0 : if (_redis_client_pool) {
602 0 : _redis_client_pool->zadd(key, redis_zset.begin(), redis_zset.end());
603 : }
604 0 : else if (IsRedisReplicationEnabled()) {
605 0 : _redis_primary_client_pool->zadd(key, redis_zset.begin(), redis_zset.end());
606 : }
607 : else {
608 0 : _redis_cluster_client_pool->zadd(key, redis_zset.begin(),
609 0 : redis_zset.end());
610 : }
611 0 : } catch (const Error &err) {
612 0 : LOG(error) << err.what();
613 0 : throw err;
614 : }
615 0 : redis_span->Finish();
616 : } else {
617 0 : LOG(warning) << "user_id: " << user_id << " not found";
618 0 : find_span->Finish();
619 0 : bson_destroy(query);
620 0 : mongoc_cursor_destroy(cursor);
621 0 : mongoc_collection_destroy(collection);
622 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
623 : }
624 : }
625 0 : LOG(info) << "GetFollowers completed [req_id=" << req_id << ", user_id=" << user_id << ", followers_count=" << _return.size() << "]";
626 0 : span->Finish();
627 0 : }
628 :
629 0 : void SocialGraphHandler::GetFollowees(
630 : std::vector<int64_t> &_return, const int64_t req_id, const int64_t user_id,
631 : const std::map<std::string, std::string> &carrier) {
632 0 : LOG(info) << "Received GetFollowees request [req_id=" << req_id << ", user_id=" << user_id << "]";
633 : // Initialize a span
634 0 : TextMapReader reader(carrier);
635 0 : std::map<std::string, std::string> writer_text_map;
636 0 : TextMapWriter writer(writer_text_map);
637 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
638 0 : auto span = opentracing::Tracer::Global()->StartSpan(
639 0 : "get_followees_server", {opentracing::ChildOf(parent_span->get())});
640 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
641 :
642 0 : auto redis_span = opentracing::Tracer::Global()->StartSpan(
643 : "social_graph_redis_get_client",
644 0 : {opentracing::ChildOf(&span->context())});
645 :
646 0 : std::vector<std::string> followees_str;
647 0 : std::string key = std::to_string(user_id) + ":followees";
648 : try {
649 0 : if (_redis_client_pool) {
650 0 : _redis_client_pool->zrange(key, 0, -1, std::back_inserter(followees_str));
651 : }
652 0 : else if (IsRedisReplicationEnabled()) {
653 0 : _redis_replica_client_pool->zrange(key, 0, -1, std::back_inserter(followees_str));
654 : }
655 : else {
656 0 : _redis_cluster_client_pool->zrange(key, 0, -1,
657 0 : std::back_inserter(followees_str));
658 : }
659 0 : } catch (const Error &err) {
660 0 : LOG(error) << err.what();
661 0 : throw err;
662 : }
663 0 : redis_span->Finish();
664 :
665 : // If user_id in the sodical graph Redis server, read from Redis
666 0 : if (followees_str.size() > 0) {
667 0 : for (auto const &followee_str : followees_str) {
668 0 : _return.emplace_back(std::stoul(followee_str));
669 : }
670 : }
671 : // If user_id in the sodical graph Redis server, read from MongoDB and
672 : // update Redis.
673 : else {
674 0 : redis_span->Finish();
675 : mongoc_client_t *mongodb_client =
676 0 : mongoc_client_pool_pop(_mongodb_client_pool);
677 0 : if (!mongodb_client) {
678 0 : ServiceException se;
679 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
680 0 : se.message = "Failed to pop a client from MongoDB pool";
681 0 : throw se;
682 : }
683 : auto collection = mongoc_client_get_collection(
684 0 : mongodb_client, "social-graph", "social-graph");
685 0 : if (!collection) {
686 0 : ServiceException se;
687 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
688 0 : se.message = "Failed to create collection social_graph from MongoDB";
689 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
690 0 : throw se;
691 : }
692 0 : bson_t *query = bson_new();
693 0 : BSON_APPEND_INT64(query, "user_id", user_id);
694 0 : auto find_span = opentracing::Tracer::Global()->StartSpan(
695 : "social_graph_mongo_find_client",
696 0 : {opentracing::ChildOf(&span->context())});
697 : mongoc_cursor_t *cursor =
698 0 : mongoc_collection_find_with_opts(collection, query, nullptr, nullptr);
699 : const bson_t *doc;
700 0 : bool found = mongoc_cursor_next(cursor, &doc);
701 0 : if (!found) {
702 0 : ServiceException se;
703 0 : se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
704 0 : se.message = "Cannot find user_id in MongoDB.";
705 0 : bson_destroy(query);
706 0 : mongoc_cursor_destroy(cursor);
707 0 : mongoc_collection_destroy(collection);
708 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
709 0 : throw se;
710 : } else {
711 : bson_iter_t iter_0;
712 : bson_iter_t iter_1;
713 : bson_iter_t user_id_child;
714 : bson_iter_t timestamp_child;
715 0 : int index = 0;
716 :
717 0 : bson_iter_init(&iter_0, doc);
718 0 : bson_iter_init(&iter_1, doc);
719 :
720 0 : std::multimap<std::string, double> redis_zset;
721 :
722 0 : while (bson_iter_find_descendant(
723 : &iter_0,
724 0 : ("followees." + std::to_string(index) + ".user_id").c_str(),
725 0 : &user_id_child) &&
726 0 : BSON_ITER_HOLDS_INT64(&user_id_child) &&
727 0 : bson_iter_find_descendant(
728 : &iter_1,
729 0 : ("followees." + std::to_string(index) + ".timestamp").c_str(),
730 0 : ×tamp_child) &&
731 0 : BSON_ITER_HOLDS_INT64(×tamp_child)) {
732 0 : auto iter_user_id = bson_iter_int64(&user_id_child);
733 0 : auto iter_timestamp = bson_iter_int64(×tamp_child);
734 0 : _return.emplace_back(iter_user_id);
735 :
736 : redis_zset.emplace(std::pair<std::string, double>(
737 0 : std::to_string(iter_user_id), (double)iter_timestamp));
738 0 : bson_iter_init(&iter_0, doc);
739 0 : bson_iter_init(&iter_1, doc);
740 0 : index++;
741 : }
742 :
743 0 : find_span->Finish();
744 0 : bson_destroy(query);
745 0 : mongoc_cursor_destroy(cursor);
746 0 : mongoc_collection_destroy(collection);
747 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
748 :
749 : // Update redis
750 0 : std::string key = std::to_string(user_id) + ":followees";
751 0 : auto redis_insert_span = opentracing::Tracer::Global()->StartSpan(
752 : "social_graph_redis_insert_client",
753 0 : {opentracing::ChildOf(&span->context())});
754 : try {
755 0 : if (_redis_client_pool) {
756 0 : _redis_client_pool->zadd(key, redis_zset.begin(), redis_zset.end());
757 : }
758 0 : else if (IsRedisReplicationEnabled()) {
759 0 : _redis_primary_client_pool->zadd(key, redis_zset.begin(), redis_zset.end());
760 : }
761 : else {
762 0 : _redis_cluster_client_pool->zadd(key, redis_zset.begin(),
763 0 : redis_zset.end());
764 : }
765 0 : } catch (const Error &err) {
766 0 : LOG(error) << err.what();
767 0 : throw err;
768 : }
769 0 : redis_span->Finish();
770 : }
771 : }
772 0 : LOG(info) << "GetFollowees completed [req_id=" << req_id << ", user_id=" << user_id << ", followees_count=" << _return.size() << "]";
773 0 : span->Finish();
774 0 : }
775 :
776 962 : void SocialGraphHandler::InsertUser(
777 : int64_t req_id, int64_t user_id,
778 : const std::map<std::string, std::string> &carrier) {
779 1924 : LOG(info) << "Received InsertUser request [req_id=" << req_id << ", user_id=" << user_id << "]";
780 : // Initialize a span
781 1923 : TextMapReader reader(carrier);
782 1924 : std::map<std::string, std::string> writer_text_map;
783 1924 : TextMapWriter writer(writer_text_map);
784 1924 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
785 1924 : auto span = opentracing::Tracer::Global()->StartSpan(
786 3848 : "insert_user_server", {opentracing::ChildOf(parent_span->get())});
787 962 : opentracing::Tracer::Global()->Inject(span->context(), writer);
788 :
789 : mongoc_client_t *mongodb_client =
790 962 : mongoc_client_pool_pop(_mongodb_client_pool);
791 962 : if (!mongodb_client) {
792 0 : ServiceException se;
793 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
794 0 : se.message = "Failed to pop a client from MongoDB pool";
795 0 : throw se;
796 : }
797 : auto collection = mongoc_client_get_collection(mongodb_client, "social-graph",
798 962 : "social-graph");
799 960 : if (!collection) {
800 0 : ServiceException se;
801 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
802 0 : se.message = "Failed to create collection social_graph from MongoDB";
803 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
804 0 : throw se;
805 : }
806 :
807 960 : bson_t *new_doc = BCON_NEW("user_id", BCON_INT64(user_id), "followers", "[",
808 : "]", "followees", "[", "]");
809 : bson_error_t error;
810 1924 : auto insert_span = opentracing::Tracer::Global()->StartSpan(
811 : "social_graph_mongo_insert_client",
812 3848 : {opentracing::ChildOf(&span->context())});
813 : bool inserted = mongoc_collection_insert_one(collection, new_doc, nullptr,
814 962 : nullptr, &error);
815 962 : insert_span->Finish();
816 961 : if (!inserted) {
817 0 : LOG(error) << "Failed to insert social graph for user " << user_id
818 0 : << " to MongoDB: " << error.message;
819 0 : ServiceException se;
820 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
821 0 : se.message = error.message;
822 0 : bson_destroy(new_doc);
823 0 : mongoc_collection_destroy(collection);
824 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
825 0 : throw se;
826 : }
827 961 : bson_destroy(new_doc);
828 962 : mongoc_collection_destroy(collection);
829 961 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
830 1924 : LOG(info) << "InsertUser operation completed [req_id=" << req_id << ", user_id=" << user_id << "]";
831 962 : span->Finish();
832 962 : }
833 :
834 37579 : void SocialGraphHandler::FollowWithUsername(
835 : int64_t req_id, const std::string &user_name,
836 : const std::string &followee_name,
837 : const std::map<std::string, std::string> &carrier) {
838 75171 : LOG(info) << "Received FollowWithUsername request [req_id=" << req_id << ", user_name=" << user_name << ", followee_name=" << followee_name << "]";
839 : // Initialize a span
840 75242 : TextMapReader reader(carrier);
841 75239 : std::map<std::string, std::string> writer_text_map;
842 75234 : TextMapWriter writer(writer_text_map);
843 75236 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
844 75235 : auto span = opentracing::Tracer::Global()->StartSpan(
845 : "follow_with_username_server",
846 150479 : {opentracing::ChildOf(parent_span->get())});
847 37619 : opentracing::Tracer::Global()->Inject(span->context(), writer);
848 :
849 37612 : std::future<int64_t> user_id_future = std::async(std::launch::async, [&]() {
850 75217 : auto user_client_wrapper = _user_service_client_pool->Pop();
851 37624 : if (!user_client_wrapper) {
852 0 : ServiceException se;
853 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
854 0 : se.message = "Failed to connect to social-graph-service";
855 0 : throw se;
856 : }
857 37624 : auto user_client = user_client_wrapper->GetClient();
858 : int64_t _return;
859 : try {
860 37619 : _return = user_client->GetUserId(req_id, user_name, writer_text_map);
861 0 : } catch (...) {
862 0 : _user_service_client_pool->Remove(user_client_wrapper);
863 0 : LOG(error) << "Failed to get user_id from user-service";
864 0 : throw;
865 : }
866 37605 : _user_service_client_pool->Keepalive(user_client_wrapper);
867 37618 : return _return;
868 75239 : });
869 :
870 : std::future<int64_t> followee_id_future =
871 37615 : std::async(std::launch::async, [&]() {
872 75220 : auto user_client_wrapper = _user_service_client_pool->Pop();
873 37620 : if (!user_client_wrapper) {
874 0 : ServiceException se;
875 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
876 0 : se.message = "Failed to connect to social-graph-service";
877 0 : throw se;
878 : }
879 37620 : auto user_client = user_client_wrapper->GetClient();
880 : int64_t _return;
881 : try {
882 : _return =
883 37620 : user_client->GetUserId(req_id, followee_name, writer_text_map);
884 0 : } catch (...) {
885 0 : _user_service_client_pool->Remove(user_client_wrapper);
886 0 : LOG(error) << "Failed to get user_id from user-service";
887 0 : throw;
888 : }
889 37605 : _user_service_client_pool->Keepalive(user_client_wrapper);
890 37612 : return _return;
891 75198 : });
892 :
893 : int64_t user_id;
894 : int64_t followee_id;
895 : try {
896 37609 : user_id = user_id_future.get();
897 37607 : followee_id = followee_id_future.get();
898 0 : } catch (const std::exception &e) {
899 0 : LOG(warning) << e.what();
900 0 : throw;
901 : }
902 :
903 37609 : if (user_id >= 0 && followee_id >= 0) {
904 37619 : Follow(req_id, user_id, followee_id, writer_text_map);
905 : }
906 75184 : LOG(info) << "FollowWithUsername operation completed [req_id=" << req_id << ", user_name=" << user_name << ", followee_name=" << followee_name << "]";
907 37624 : span->Finish();
908 37622 : }
909 :
910 0 : void SocialGraphHandler::UnfollowWithUsername(
911 : int64_t req_id, const std::string &user_name,
912 : const std::string &followee_name,
913 : const std::map<std::string, std::string> &carrier) {
914 0 : LOG(info) << "Received UnfollowWithUsername request [req_id=" << req_id << ", user_name=" << user_name << ", followee_name=" << followee_name << "]";
915 : // Initialize a span
916 0 : TextMapReader reader(carrier);
917 0 : std::map<std::string, std::string> writer_text_map;
918 0 : TextMapWriter writer(writer_text_map);
919 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
920 0 : auto span = opentracing::Tracer::Global()->StartSpan(
921 : "unfollow_with_username_server",
922 0 : {opentracing::ChildOf(parent_span->get())});
923 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
924 :
925 0 : std::future<int64_t> user_id_future = std::async(std::launch::async, [&]() {
926 0 : auto user_client_wrapper = _user_service_client_pool->Pop();
927 0 : if (!user_client_wrapper) {
928 0 : ServiceException se;
929 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
930 0 : se.message = "Failed to connect to social-graph-service";
931 0 : throw se;
932 : }
933 0 : auto user_client = user_client_wrapper->GetClient();
934 : int64_t _return;
935 : try {
936 0 : _return = user_client->GetUserId(req_id, user_name, writer_text_map);
937 0 : } catch (...) {
938 0 : _user_service_client_pool->Remove(user_client_wrapper);
939 0 : LOG(error) << "Failed to get user_id from user-service";
940 0 : throw;
941 : }
942 0 : _user_service_client_pool->Keepalive(user_client_wrapper);
943 0 : return _return;
944 0 : });
945 :
946 : std::future<int64_t> followee_id_future =
947 0 : std::async(std::launch::async, [&]() {
948 0 : auto user_client_wrapper = _user_service_client_pool->Pop();
949 0 : if (!user_client_wrapper) {
950 0 : ServiceException se;
951 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
952 0 : se.message = "Failed to connect to social-graph-service";
953 0 : throw se;
954 : }
955 0 : auto user_client = user_client_wrapper->GetClient();
956 : int64_t _return;
957 : try {
958 : _return =
959 0 : user_client->GetUserId(req_id, followee_name, writer_text_map);
960 0 : } catch (...) {
961 0 : _user_service_client_pool->Remove(user_client_wrapper);
962 0 : LOG(error) << "Failed to get user_id from user-service";
963 0 : throw;
964 : }
965 0 : _user_service_client_pool->Keepalive(user_client_wrapper);
966 0 : return _return;
967 0 : });
968 :
969 : int64_t user_id;
970 : int64_t followee_id;
971 : try {
972 0 : user_id = user_id_future.get();
973 0 : followee_id = followee_id_future.get();
974 0 : } catch (...) {
975 0 : throw;
976 : }
977 :
978 0 : if (user_id >= 0 && followee_id >= 0) {
979 : try {
980 0 : Unfollow(req_id, user_id, followee_id, writer_text_map);
981 0 : } catch (...) {
982 0 : throw;
983 : }
984 : }
985 0 : LOG(info) << "UnfollowWithUsername operation completed [req_id=" << req_id << ", user_name=" << user_name << ", followee_name=" << followee_name << "]";
986 0 : span->Finish();
987 0 : }
988 :
989 : } // namespace social_network
990 :
991 : #endif // SOCIAL_NETWORK_MICROSERVICES_SOCIALGRAPHHANDLER_H
|