LCOV - code coverage report
Current view: top level - src/UserTimelineService - UserTimelineHandler.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 98 212 46.2 %
Date: 2025-11-03 23:12:51 Functions: 4 9 44.4 %

          Line data    Source code
       1             : #ifndef SOCIAL_NETWORK_MICROSERVICES_SRC_USERTIMELINESERVICE_USERTIMELINEHANDLER_H_
       2             : #define SOCIAL_NETWORK_MICROSERVICES_SRC_USERTIMELINESERVICE_USERTIMELINEHANDLER_H_
       3             : 
       4             : #include <bson/bson.h>
       5             : #include <mongoc.h>
       6             : #include <sw/redis++/redis++.h>
       7             : 
       8             : #include <future>
       9             : #include <iostream>
      10             : #include <string>
      11             : 
      12             : #include "../../gen-cpp/PostStorageService.h"
      13             : #include "../../gen-cpp/UserTimelineService.h"
      14             : #include "../ClientPool.h"
      15             : #include "../ThriftClient.h"
      16             : #include "../logger.h"
      17             : #include "../tracing.h"
      18             : 
      19             : using namespace sw::redis;
      20             : 
      21             : namespace social_network {
      22             : 
      23             : class UserTimelineHandler : public UserTimelineServiceIf {
      24             :  public:
      25             :   UserTimelineHandler(Redis *, mongoc_client_pool_t *,
      26             :                       ClientPool<ThriftClient<PostStorageServiceClient>> *);
      27             : 
      28             :   UserTimelineHandler(Redis *, Redis *, mongoc_client_pool_t *,
      29             :       ClientPool<ThriftClient<PostStorageServiceClient>> *);
      30             : 
      31             :   UserTimelineHandler(RedisCluster *, mongoc_client_pool_t *,
      32             :                       ClientPool<ThriftClient<PostStorageServiceClient>> *);
      33           0 :   ~UserTimelineHandler() override = default;
      34             : 
      35             :   bool IsRedisReplicationEnabled();
      36             : 
      37             :   void WriteUserTimeline(
      38             :       int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
      39             :       const std::map<std::string, std::string> &carrier) override;
      40             : 
      41             :   void ReadUserTimeline(std::vector<Post> &, int64_t, int64_t, int, int,
      42             :                         const std::map<std::string, std::string> &) override;
      43             : 
      44             :  private:
      45             :   Redis *_redis_client_pool;
      46             :   Redis *_redis_replica_pool;
      47             :   Redis *_redis_primary_pool;
      48             :   RedisCluster *_redis_cluster_client_pool;
      49             :   mongoc_client_pool_t *_mongodb_client_pool;
      50             :   ClientPool<ThriftClient<PostStorageServiceClient>> *_post_client_pool;
      51             : };
      52             : 
      53           1 : UserTimelineHandler::UserTimelineHandler(
      54             :     Redis *redis_pool, mongoc_client_pool_t *mongodb_pool,
      55           1 :     ClientPool<ThriftClient<PostStorageServiceClient>> *post_client_pool) {
      56           1 :   _redis_client_pool = redis_pool;
      57           1 :   _redis_replica_pool = nullptr;
      58           1 :   _redis_primary_pool = nullptr;
      59           1 :   _redis_cluster_client_pool = nullptr;
      60           1 :   _mongodb_client_pool = mongodb_pool;
      61           1 :   _post_client_pool = post_client_pool;
      62           1 : }
      63             : 
      64           0 : UserTimelineHandler::UserTimelineHandler(
      65             :     Redis* redis_replica_pool, Redis* redis_primary_pool, mongoc_client_pool_t* mongodb_pool,
      66           0 :     ClientPool<ThriftClient<PostStorageServiceClient>>* post_client_pool) {
      67           0 :     _redis_client_pool = nullptr;
      68           0 :     _redis_replica_pool = redis_replica_pool;
      69           0 :     _redis_primary_pool = redis_primary_pool;
      70           0 :     _redis_cluster_client_pool = nullptr;
      71           0 :     _mongodb_client_pool = mongodb_pool;
      72           0 :     _post_client_pool = post_client_pool;
      73           0 : }
      74             : 
      75           0 : UserTimelineHandler::UserTimelineHandler(
      76             :     RedisCluster *redis_pool, mongoc_client_pool_t *mongodb_pool,
      77           0 :     ClientPool<ThriftClient<PostStorageServiceClient>> *post_client_pool) {
      78           0 :   _redis_cluster_client_pool = redis_pool;
      79           0 :   _redis_replica_pool = nullptr;
      80           0 :   _redis_primary_pool = nullptr;
      81           0 :   _redis_client_pool = nullptr;
      82           0 :   _mongodb_client_pool = mongodb_pool;
      83           0 :   _post_client_pool = post_client_pool;
      84           0 : }
      85             : 
      86           0 : bool UserTimelineHandler::IsRedisReplicationEnabled() {
      87           0 :     return (_redis_primary_pool || _redis_replica_pool);
      88             : }
      89             : 
      90         200 : void UserTimelineHandler::WriteUserTimeline(
      91             :     int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
      92             :     const std::map<std::string, std::string> &carrier) {
      93             :   // 记录收到写入请求
      94         400 :   LOG(info) << "Received WriteUserTimeline request [req_id=" << req_id << ", user_id=" << user_id << ", post_id=" << post_id << ", timestamp=" << timestamp << "]";
      95             :   // Initialize a span
      96         400 :   TextMapReader reader(carrier);
      97         400 :   std::map<std::string, std::string> writer_text_map;
      98         400 :   TextMapWriter writer(writer_text_map);
      99         400 :   auto parent_span = opentracing::Tracer::Global()->Extract(reader);
     100         400 :   auto span = opentracing::Tracer::Global()->StartSpan(
     101         800 :       "write_user_timeline_server", {opentracing::ChildOf(parent_span->get())});
     102         200 :   opentracing::Tracer::Global()->Inject(span->context(), writer);
     103             : 
     104             :   mongoc_client_t *mongodb_client =
     105         200 :       mongoc_client_pool_pop(_mongodb_client_pool);
     106         200 :   if (!mongodb_client) {
     107           0 :     ServiceException se;
     108           0 :     se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     109           0 :     se.message = "Failed to pop a client from MongoDB pool";
     110           0 :     throw se;
     111             :   }
     112             :   auto collection = mongoc_client_get_collection(
     113         200 :       mongodb_client, "user-timeline", "user-timeline");
     114         200 :   if (!collection) {
     115           0 :     ServiceException se;
     116           0 :     se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     117           0 :     se.message = "Failed to create collection user-timeline from MongoDB";
     118           0 :     mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     119           0 :     throw se;
     120             :   }
     121         200 :   bson_t *query = bson_new();
     122             : 
     123         200 :   BSON_APPEND_INT64(query, "user_id", user_id);
     124             :   bson_t *update =
     125         200 :       BCON_NEW("$push", "{", "posts", "{", "$each", "[", "{", "post_id",
     126             :                BCON_INT64(post_id), "timestamp", BCON_INT64(timestamp), "}",
     127             :                "]", "$position", BCON_INT32(0), "}", "}");
     128             :   bson_error_t error;
     129             :   bson_t reply;
     130         400 :   auto update_span = opentracing::Tracer::Global()->StartSpan(
     131             :       "write_user_timeline_mongo_insert_client",
     132         800 :       {opentracing::ChildOf(&span->context())});
     133             :   bool updated = mongoc_collection_find_and_modify(collection, query, nullptr,
     134             :                                                    update, nullptr, false, true,
     135         200 :                                                    true, &reply, &error);
     136         200 :   update_span->Finish();
     137             : 
     138         200 :   if (!updated) {
     139             :     // update the newly inserted document (upsert: false)
     140             :     updated = mongoc_collection_find_and_modify(collection, query, nullptr,
     141             :                                                 update, nullptr, false, false,
     142           0 :                                                 true, &reply, &error);
     143           0 :     if (!updated) {
     144           0 :       LOG(error) << "Failed to update user-timeline for user " << user_id
     145           0 :                  << " to MongoDB: " << error.message;
     146           0 :       ServiceException se;
     147           0 :       se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     148           0 :       se.message = error.message;
     149           0 :       bson_destroy(update);
     150           0 :       bson_destroy(query);
     151           0 :       bson_destroy(&reply);
     152           0 :       mongoc_collection_destroy(collection);
     153           0 :       mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     154           0 :       throw se;
     155             :     }
     156             :   }
     157             : 
     158         200 :   bson_destroy(update);
     159         200 :   bson_destroy(&reply);
     160         200 :   bson_destroy(query);
     161         200 :   mongoc_collection_destroy(collection);
     162         200 :   mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     163             : 
     164             :   // Update user's timeline in redis
     165         400 :   auto redis_span = opentracing::Tracer::Global()->StartSpan(
     166             :       "write_user_timeline_redis_update_client",
     167         800 :       {opentracing::ChildOf(&span->context())});
     168             :   try {
     169         200 :     if (_redis_client_pool)
     170         400 :       _redis_client_pool->zadd(std::to_string(user_id), std::to_string(post_id),
     171         200 :                               timestamp, UpdateType::NOT_EXIST);
     172           0 :     else if (IsRedisReplicationEnabled()) {
     173           0 :         _redis_primary_pool->zadd(std::to_string(user_id), std::to_string(post_id),
     174           0 :                               timestamp, UpdateType::NOT_EXIST);
     175             :     }
     176             :     else
     177           0 :       _redis_cluster_client_pool->zadd(std::to_string(user_id), std::to_string(post_id),
     178           0 :                               timestamp, UpdateType::NOT_EXIST);
     179             : 
     180           0 :   } catch (const Error &err) {
     181           0 :     LOG(error) << err.what();
     182           0 :     throw err;
     183             :   }
     184         200 :   redis_span->Finish();
     185         200 :   span->Finish();
     186             :   // 写入完成日志
     187         400 :   LOG(info) << "WriteUserTimeline completed [req_id=" << req_id << ", user_id=" << user_id << ", post_id=" << post_id << "]";
     188         200 : }
     189             : 
     190         800 : void UserTimelineHandler::ReadUserTimeline(
     191             :     std::vector<Post> &_return, int64_t req_id, int64_t user_id, int start,
     192             :     int stop, const std::map<std::string, std::string> &carrier) {
     193             :   // 记录收到读取请求
     194        1600 :   LOG(info) << "Received ReadUserTimeline request [req_id=" << req_id << ", user_id=" << user_id << ", start=" << start << ", stop=" << stop << "]";
     195             :   // Initialize a span
     196        1200 :   TextMapReader reader(carrier);
     197        1200 :   std::map<std::string, std::string> writer_text_map;
     198        1200 :   TextMapWriter writer(writer_text_map);
     199        1200 :   auto parent_span = opentracing::Tracer::Global()->Extract(reader);
     200        1600 :   auto span = opentracing::Tracer::Global()->StartSpan(
     201        2800 :       "read_user_timeline_server", {opentracing::ChildOf(parent_span->get())});
     202         800 :   opentracing::Tracer::Global()->Inject(span->context(), writer);
     203             : 
     204         800 :   if (stop <= start || start < 0) {
     205         400 :     return;
     206             :   }
     207             : 
     208         800 :   auto redis_span = opentracing::Tracer::Global()->StartSpan(
     209             :       "read_user_timeline_redis_find_client",
     210        1600 :       {opentracing::ChildOf(&span->context())});
     211             : 
     212         800 :   std::vector<std::string> post_ids_str;
     213             :   try {
     214         400 :     if (_redis_client_pool)
     215         800 :       _redis_client_pool->zrevrange(std::to_string(user_id), start, stop - 1,
     216         400 :                                   std::back_inserter(post_ids_str));
     217           0 :     else if (IsRedisReplicationEnabled()) {
     218           0 :         _redis_replica_pool->zrevrange(std::to_string(user_id), start, stop - 1,
     219           0 :             std::back_inserter(post_ids_str));
     220             :     }
     221             :     else
     222           0 :       _redis_cluster_client_pool->zrevrange(std::to_string(user_id), start, stop - 1,
     223           0 :                                   std::back_inserter(post_ids_str));
     224           0 :   } catch (const Error &err) {
     225           0 :     LOG(error) << err.what();
     226           0 :     throw err;
     227             :   }
     228         400 :   redis_span->Finish();
     229             : 
     230         800 :   std::vector<int64_t> post_ids;
     231         400 :   for (auto &post_id_str : post_ids_str) {
     232           0 :     post_ids.emplace_back(std::stoul(post_id_str));
     233             :   }
     234             : 
     235             :   // find in mongodb
     236         400 :   int mongo_start = start + post_ids.size();
     237         800 :   std::unordered_map<std::string, double> redis_update_map;
     238         400 :   if (mongo_start < stop) {
     239             :     // Instead find post_ids from mongodb
     240             :     mongoc_client_t *mongodb_client =
     241         400 :         mongoc_client_pool_pop(_mongodb_client_pool);
     242         400 :     if (!mongodb_client) {
     243           0 :       ServiceException se;
     244           0 :       se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     245           0 :       se.message = "Failed to pop a client from MongoDB pool";
     246           0 :       throw se;
     247             :     }
     248             :     auto collection = mongoc_client_get_collection(
     249         400 :         mongodb_client, "user-timeline", "user-timeline");
     250         400 :     if (!collection) {
     251           0 :       ServiceException se;
     252           0 :       se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     253           0 :       se.message = "Failed to create collection user-timeline from MongoDB";
     254           0 :       throw se;
     255             :     }
     256             : 
     257         400 :     bson_t *query = BCON_NEW("user_id", BCON_INT64(user_id));
     258         400 :     bson_t *opts = BCON_NEW("projection", "{", "posts", "{", "$slice", "[",
     259             :                             BCON_INT32(0), BCON_INT32(stop), "]", "}", "}");
     260             : 
     261         800 :     auto find_span = opentracing::Tracer::Global()->StartSpan(
     262             :         "user_timeline_mongo_find_client",
     263        1600 :         {opentracing::ChildOf(&span->context())});
     264             :     mongoc_cursor_t *cursor =
     265         400 :         mongoc_collection_find_with_opts(collection, query, opts, nullptr);
     266         400 :     find_span->Finish();
     267             :     const bson_t *doc;
     268         400 :     bool found = mongoc_cursor_next(cursor, &doc);
     269         400 :     if (found) {
     270             :       bson_iter_t iter_0;
     271             :       bson_iter_t iter_1;
     272             :       bson_iter_t post_id_child;
     273             :       bson_iter_t timestamp_child;
     274           0 :       int idx = 0;
     275           0 :       bson_iter_init(&iter_0, doc);
     276           0 :       bson_iter_init(&iter_1, doc);
     277           0 :       while (bson_iter_find_descendant(
     278           0 :                  &iter_0, ("posts." + std::to_string(idx) + ".post_id").c_str(),
     279           0 :                  &post_id_child) &&
     280           0 :              BSON_ITER_HOLDS_INT64(&post_id_child) &&
     281           0 :              bson_iter_find_descendant(
     282             :                  &iter_1,
     283           0 :                  ("posts." + std::to_string(idx) + ".timestamp").c_str(),
     284           0 :                  &timestamp_child) &&
     285           0 :              BSON_ITER_HOLDS_INT64(&timestamp_child)) {
     286           0 :         auto curr_post_id = bson_iter_int64(&post_id_child);
     287           0 :         auto curr_timestamp = bson_iter_int64(&timestamp_child);
     288           0 :         if (idx >= mongo_start) {
     289             :           //In mixed workload condition, post may composed between redis and mongo read
     290             :           //mongodb index will shift and duplicate post_id occurs
     291           0 :           if ( std::find(post_ids.begin(), post_ids.end(), curr_post_id) == post_ids.end() ) {
     292           0 :             post_ids.emplace_back(curr_post_id);
     293             :           }
     294             :         }
     295           0 :         redis_update_map.insert(std::make_pair(std::to_string(curr_post_id),
     296           0 :                                                (double)curr_timestamp));
     297           0 :         bson_iter_init(&iter_0, doc);
     298           0 :         bson_iter_init(&iter_1, doc);
     299           0 :         idx++;
     300             :       }
     301             :     }
     302         400 :     bson_destroy(opts);
     303         400 :     bson_destroy(query);
     304         400 :     mongoc_cursor_destroy(cursor);
     305         400 :     mongoc_collection_destroy(collection);
     306         400 :     mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     307             :   }
     308             : 
     309             :   std::future<std::vector<Post>> post_future =
     310         400 :       std::async(std::launch::async, [&]() {
     311         800 :         auto post_client_wrapper = _post_client_pool->Pop();
     312         400 :         if (!post_client_wrapper) {
     313           0 :           ServiceException se;
     314           0 :           se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
     315           0 :           se.message = "Failed to connect to post-storage-service";
     316           0 :           throw se;
     317             :         }
     318         400 :         std::vector<Post> _return_posts;
     319         400 :         auto post_client = post_client_wrapper->GetClient();
     320             :         try {
     321         400 :           post_client->ReadPosts(_return_posts, req_id, post_ids,
     322         800 :                                  writer_text_map);
     323           0 :         } catch (...) {
     324           0 :           _post_client_pool->Remove(post_client_wrapper);
     325           0 :           LOG(error) << "Failed to read posts from post-storage-service";
     326           0 :           throw;
     327             :         }
     328         400 :         _post_client_pool->Keepalive(post_client_wrapper);
     329         400 :         return _return_posts;
     330         800 :       });
     331             : 
     332         400 :   if (redis_update_map.size() > 0) {
     333           0 :     auto redis_update_span = opentracing::Tracer::Global()->StartSpan(
     334             :         "user_timeline_redis_update_client",
     335           0 :         {opentracing::ChildOf(&span->context())});
     336             :     try {
     337           0 :       if (_redis_client_pool)
     338           0 :         _redis_client_pool->zadd(std::to_string(user_id),
     339             :                                redis_update_map.begin(),
     340           0 :                                redis_update_map.end());
     341           0 :       else if (IsRedisReplicationEnabled()) {
     342           0 :           _redis_primary_pool->zadd(std::to_string(user_id),
     343             :               redis_update_map.begin(),
     344           0 :               redis_update_map.end());
     345             :       }
     346             :       else
     347           0 :         _redis_cluster_client_pool->zadd(std::to_string(user_id),
     348             :                                redis_update_map.begin(),
     349           0 :                                redis_update_map.end());
     350             : 
     351           0 :     } catch (const Error &err) {
     352           0 :       LOG(error) << err.what();
     353           0 :       throw err;
     354             :     }
     355           0 :     redis_update_span->Finish();
     356             :   }
     357             : 
     358             :   try {
     359         400 :     _return = post_future.get();
     360           0 :   } catch (...) {
     361           0 :     LOG(error) << "Failed to get post from post-storage-service";
     362           0 :     throw;
     363             :   }
     364         400 :   span->Finish();
     365             :   // 读取完成日志
     366         800 :   LOG(info) << "ReadUserTimeline completed [req_id=" << req_id << ", user_id=" << user_id << ", return count=" << _return.size() << "]";
     367             : }
     368             : 
     369             : }  // namespace social_network
     370             : 
     371             : #endif  // SOCIAL_NETWORK_MICROSERVICES_SRC_USERTIMELINESERVICE_USERTIMELINEHANDLER_H_

Generated by: LCOV version 1.12