LCOV - code coverage report
Current view: top level - src/HomeTimelineService - HomeTimelineHandler.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 72 155 46.5 %
Date: 2025-11-04 00:23:31 Functions: 3 8 37.5 %

          Line data    Source code
       1             : #ifndef SOCIAL_NETWORK_MICROSERVICES_SRC_HOMETIMELINESERVICE_HOMETIMELINEHANDLER_H_
       2             : #define SOCIAL_NETWORK_MICROSERVICES_SRC_HOMETIMELINESERVICE_HOMETIMELINEHANDLER_H_
       3             : 
       4             : #include <sw/redis++/redis++.h>
       5             : 
       6             : #include <future>
       7             : #include <iostream>
       8             : #include <string>
       9             : 
      10             : #include "../../gen-cpp/HomeTimelineService.h"
      11             : #include "../../gen-cpp/PostStorageService.h"
      12             : #include "../../gen-cpp/SocialGraphService.h"
      13             : #include "../ClientPool.h"
      14             : #include "../ThriftClient.h"
      15             : #include "../logger.h"
      16             : #include "../tracing.h"
      17             : 
      18             : using namespace sw::redis;
      19             : namespace social_network {
      20             : class HomeTimelineHandler : public HomeTimelineServiceIf {
      21             :  public:
      22             :   HomeTimelineHandler(Redis *,
      23             :                       ClientPool<ThriftClient<PostStorageServiceClient>> *,
      24             :                       ClientPool<ThriftClient<SocialGraphServiceClient>> *);
      25             : 
      26             : 
      27             :   HomeTimelineHandler(Redis *,Redis *,
      28             :       ClientPool<ThriftClient<PostStorageServiceClient>>*,
      29             :       ClientPool<ThriftClient<SocialGraphServiceClient>>*);
      30             : 
      31             : 
      32             :   HomeTimelineHandler(RedisCluster *,
      33             :                       ClientPool<ThriftClient<PostStorageServiceClient>> *,
      34             :                       ClientPool<ThriftClient<SocialGraphServiceClient>> *);
      35           0 :   ~HomeTimelineHandler() override = default;
      36             : 
      37             :   bool IsRedisReplicationEnabled();
      38             : 
      39             :   void ReadHomeTimeline(std::vector<Post> &, int64_t, int64_t, int, int,
      40             :                         const std::map<std::string, std::string> &) override;
      41             : 
      42             :   void WriteHomeTimeline(int64_t, int64_t, int64_t, int64_t,
      43             :                          const std::vector<int64_t> &,
      44             :                          const std::map<std::string, std::string> &) override;
      45             : 
      46             :  private:
      47             :      Redis *_redis_replica_pool;
      48             :      Redis *_redis_primary_pool;
      49             :      Redis *_redis_client_pool;
      50             :      RedisCluster *_redis_cluster_client_pool;
      51             :      ClientPool<ThriftClient<PostStorageServiceClient>> *_post_client_pool;
      52             :      ClientPool<ThriftClient<SocialGraphServiceClient>> *_social_graph_client_pool;
      53             : };
      54             : 
      55           1 : HomeTimelineHandler::HomeTimelineHandler(
      56             :     Redis *redis_pool,
      57             :     ClientPool<ThriftClient<PostStorageServiceClient>> *post_client_pool,
      58             :     ClientPool<ThriftClient<SocialGraphServiceClient>>
      59           1 :         *social_graph_client_pool) {
      60           1 :     _redis_primary_pool = nullptr;
      61           1 :     _redis_replica_pool = nullptr;
      62           1 :     _redis_client_pool = redis_pool;
      63           1 :     _redis_cluster_client_pool = nullptr;
      64           1 :     _post_client_pool = post_client_pool;
      65           1 :     _social_graph_client_pool = social_graph_client_pool;
      66           1 : }
      67             : 
      68           0 : HomeTimelineHandler::HomeTimelineHandler(
      69             :     RedisCluster *redis_pool,
      70             :     ClientPool<ThriftClient<PostStorageServiceClient>> *post_client_pool,
      71             :     ClientPool<ThriftClient<SocialGraphServiceClient>>
      72           0 :         *social_graph_client_pool) {
      73           0 :     _redis_primary_pool = nullptr;
      74           0 :     _redis_replica_pool = nullptr;
      75           0 :     _redis_client_pool = nullptr;
      76           0 :     _redis_cluster_client_pool = redis_pool; 
      77           0 :     _post_client_pool = post_client_pool;
      78           0 :     _social_graph_client_pool = social_graph_client_pool;
      79           0 : }
      80             : 
      81           0 : HomeTimelineHandler::HomeTimelineHandler(
      82             :     Redis *redis_replica_pool,
      83             :     Redis *redis_primary_pool,
      84             :     ClientPool<ThriftClient<PostStorageServiceClient>>* post_client_pool,
      85             :     ClientPool<ThriftClient<SocialGraphServiceClient>>
      86           0 :     * social_graph_client_pool) {
      87           0 :     _redis_primary_pool = redis_primary_pool;
      88           0 :     _redis_replica_pool = redis_replica_pool;
      89           0 :     _redis_client_pool = nullptr;
      90           0 :     _redis_cluster_client_pool = nullptr;
      91           0 :     _post_client_pool = post_client_pool;
      92           0 :     _social_graph_client_pool = social_graph_client_pool;
      93           0 : }
      94             : 
      95           0 : bool HomeTimelineHandler::IsRedisReplicationEnabled() {
      96           0 :     return (_redis_primary_pool || _redis_replica_pool);
      97             : }
      98             : 
      99         200 : void HomeTimelineHandler::WriteHomeTimeline(
     100             :     int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
     101             :     const std::vector<int64_t> &user_mentions_id,
     102             :     const std::map<std::string, std::string> &carrier) {
     103         400 :   LOG(info) << "Start writing home timeline [req_id=" << req_id << ", user_id=" << user_id << ", post_id=" << post_id << "]";
     104             :   // Initialize a span
     105         400 :   TextMapReader reader(carrier);
     106         400 :   auto parent_span = opentracing::Tracer::Global()->Extract(reader);
     107         400 :   auto span = opentracing::Tracer::Global()->StartSpan(
     108         800 :       "write_home_timeline_server", {opentracing::ChildOf(parent_span->get())});
     109             : 
     110             :   // Find followers of the user
     111         400 :   auto followers_span = opentracing::Tracer::Global()->StartSpan(
     112         800 :       "get_followers_client", {opentracing::ChildOf(&span->context())});
     113         400 :   std::map<std::string, std::string> writer_text_map;
     114         400 :   TextMapWriter writer(writer_text_map);
     115         200 :   opentracing::Tracer::Global()->Inject(followers_span->context(), writer);
     116             : 
     117         200 :   auto social_graph_client_wrapper = _social_graph_client_pool->Pop();
     118         200 :   if (!social_graph_client_wrapper) {
     119           0 :     ServiceException se;
     120           0 :     se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
     121           0 :     se.message = "Failed to connect to social-graph-service";
     122           0 :     throw se;
     123             :   }
     124         200 :   auto social_graph_client = social_graph_client_wrapper->GetClient();
     125         400 :   std::vector<int64_t> followers_id;
     126             :   try {
     127             :     social_graph_client->GetFollowers(followers_id, req_id, user_id,
     128         200 :                                       writer_text_map);
     129           0 :   } catch (...) {
     130           0 :     LOG(error) << "Failed to get followers from social-network-service";
     131           0 :     _social_graph_client_pool->Remove(social_graph_client_wrapper);
     132           0 :     throw;
     133             :   }
     134         200 :   _social_graph_client_pool->Keepalive(social_graph_client_wrapper);
     135         200 :   followers_span->Finish();
     136         400 :   LOG(info) << "Successfully got followers [req_id=" << req_id << ", user_id=" << user_id << ", followers_count=" << followers_id.size() << "]";
     137             : 
     138         400 :   std::set<int64_t> followers_id_set(followers_id.begin(), followers_id.end());
     139         200 :   followers_id_set.insert(user_mentions_id.begin(), user_mentions_id.end());
     140             : 
     141             :   // Update Redis ZSet
     142             :   // Zset key: follower_id, Zset value: post_id_str, Zset score: timestamp_str
     143         400 :   auto redis_span = opentracing::Tracer::Global()->StartSpan(
     144             :       "write_home_timeline_redis_update_client",
     145         800 :       {opentracing::ChildOf(&span->context())});
     146         400 :   std::string post_id_str = std::to_string(post_id);
     147             : 
     148             :   {
     149         200 :     if (_redis_client_pool) {
     150         400 :       auto pipe = _redis_client_pool->pipeline(false);
     151        6800 :       for (auto &follower_id : followers_id_set) {
     152       13200 :         pipe.zadd(std::to_string(follower_id), post_id_str, timestamp,
     153       19800 :                   UpdateType::NOT_EXIST);
     154             :       }
     155             :       try {
     156         400 :         auto replies = pipe.exec();
     157         400 :         LOG(info) << "Successfully wrote to redis home timeline [req_id=" << req_id << ", post_id=" << post_id << ", followers_count=" << followers_id_set.size() << "]";
     158           0 :       } catch (const Error &err) {
     159           0 :         LOG(error) << err.what();
     160           0 :         throw err;
     161             :       }
     162             :     }
     163             :     
     164           0 :     else if (IsRedisReplicationEnabled()) {
     165           0 :         auto pipe = _redis_primary_pool->pipeline(false);
     166           0 :         for (auto& follower_id : followers_id_set) {
     167           0 :             pipe.zadd(std::to_string(follower_id), post_id_str, timestamp,
     168           0 :                 UpdateType::NOT_EXIST);
     169             :         }
     170             :         try {
     171           0 :             auto replies = pipe.exec();
     172           0 :             LOG(info) << "Successfully wrote to redis home timeline [req_id=" << req_id << ", post_id=" << post_id << ", followers_count=" << followers_id_set.size() << "]";
     173             :         }
     174           0 :         catch (const Error& err) {
     175           0 :             LOG(error) << err.what();
     176           0 :             throw err;
     177             :         }
     178             :     }
     179             :     
     180             :     else {
     181             :       // Create multi-pipeline that match with shards pool
     182           0 :       std::map<std::shared_ptr<ConnectionPool>, std::shared_ptr<Pipeline>> pipe_map;
     183           0 :       auto *shards_pool = _redis_cluster_client_pool->get_shards_pool();
     184             : 
     185           0 :       for (auto &follower_id : followers_id_set) {
     186           0 :         auto conn = shards_pool->fetch(std::to_string(follower_id));
     187           0 :         auto pipe = pipe_map.find(conn);
     188           0 :         if(pipe == pipe_map.end()) {//Not found, create new pipeline and insert
     189           0 :           auto new_pipe = std::make_shared<Pipeline>(_redis_cluster_client_pool->pipeline(std::to_string(follower_id), false));
     190           0 :           pipe_map.insert(make_pair(conn, new_pipe));
     191           0 :           auto *_pipe = new_pipe.get();
     192           0 :           _pipe->zadd(std::to_string(follower_id), post_id_str, timestamp,
     193           0 :                   UpdateType::NOT_EXIST);
     194             :         }else{//Found, use exist pipeline
     195           0 :           std::pair<std::shared_ptr<ConnectionPool>, std::shared_ptr<Pipeline>> found = *pipe;
     196           0 :           auto *_pipe = found.second.get();
     197           0 :           _pipe->zadd(std::to_string(follower_id), post_id_str, timestamp,
     198           0 :                   UpdateType::NOT_EXIST);
     199             :         }
     200             :       }
     201             :       // LOG(info) <<"followers_id_set items:" << followers_id_set.size()<<"; pipeline items:" << pipe_map.size();
     202             :       try {
     203           0 :         for(auto const &it : pipe_map) {
     204           0 :           auto _pipe = it.second.get();
     205           0 :           _pipe->exec();
     206             :         }
     207           0 :         LOG(info) << "Successfully wrote to redis home timeline [req_id=" << req_id << ", post_id=" << post_id << ", followers_count=" << followers_id_set.size() << "]";
     208           0 :       } catch (const Error &err) {
     209           0 :         LOG(error) << err.what();
     210           0 :         throw err;
     211             :       }
     212             :     }
     213             :   }
     214         200 :   redis_span->Finish();
     215         400 :   LOG(info) << "Home timeline write completed [req_id=" << req_id << ", post_id=" << post_id << "]";
     216         200 : }
     217             : 
     218             : 
     219        1000 : void HomeTimelineHandler::ReadHomeTimeline(
     220             :     std::vector<Post> &_return, int64_t req_id, int64_t user_id, int start_idx,
     221             :     int stop_idx, const std::map<std::string, std::string> &carrier) {
     222        2000 :   LOG(info) << "Start reading home timeline [req_id=" << req_id << ", user_id=" << user_id << ", start=" << start_idx << ", stop=" << stop_idx << "]";
     223             :   // Initialize a span
     224        1200 :   TextMapReader reader(carrier);
     225        1200 :   std::map<std::string, std::string> writer_text_map;
     226        1200 :   TextMapWriter writer(writer_text_map);
     227        1200 :   auto parent_span = opentracing::Tracer::Global()->Extract(reader);
     228        2000 :   auto span = opentracing::Tracer::Global()->StartSpan(
     229        3200 :       "read_home_timeline_server", {opentracing::ChildOf(parent_span->get())});
     230        1000 :   opentracing::Tracer::Global()->Inject(span->context(), writer);
     231             : 
     232        1000 :   if (stop_idx <= start_idx || start_idx < 0) {
     233         800 :     return;
     234             :   }
     235             : 
     236         400 :   auto redis_span = opentracing::Tracer::Global()->StartSpan(
     237             :       "read_home_timeline_redis_find_client",
     238         800 :       {opentracing::ChildOf(&span->context())});
     239             : 
     240         400 :   std::vector<std::string> post_ids_str;
     241             :   try {
     242         200 :     if (_redis_client_pool) {
     243         600 :       _redis_client_pool->zrevrange(std::to_string(user_id), start_idx,
     244         200 :                                     stop_idx - 1,
     245         200 :                                     std::back_inserter(post_ids_str));
     246             :     }
     247           0 :     else if (IsRedisReplicationEnabled()) {
     248           0 :         _redis_replica_pool->zrevrange(std::to_string(user_id), start_idx,
     249           0 :                                        stop_idx - 1,
     250           0 :                                        std::back_inserter(post_ids_str));
     251             :     }
     252             :     
     253             :     else {
     254           0 :       _redis_cluster_client_pool->zrevrange(std::to_string(user_id), start_idx,
     255           0 :                                             stop_idx - 1,
     256           0 :                                             std::back_inserter(post_ids_str));
     257             :     }
     258           0 :   } catch (const Error &err) {
     259           0 :     LOG(error) << err.what();
     260           0 :     throw err;
     261             :   }
     262         200 :   redis_span->Finish();
     263             : 
     264         400 :   std::vector<int64_t> post_ids;
     265         200 :   for (auto &post_id_str : post_ids_str) {
     266           0 :     post_ids.emplace_back(std::stoul(post_id_str));
     267             :   }
     268             : 
     269         200 :   auto post_client_wrapper = _post_client_pool->Pop();
     270         200 :   if (!post_client_wrapper) {
     271           0 :     ServiceException se;
     272           0 :     se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
     273           0 :     se.message = "Failed to connect to post-storage-service";
     274           0 :     throw se;
     275             :   }
     276         200 :   auto post_client = post_client_wrapper->GetClient();
     277             :   try {
     278         200 :     post_client->ReadPosts(_return, req_id, post_ids, writer_text_map);
     279           0 :   } catch (...) {
     280           0 :     _post_client_pool->Remove(post_client_wrapper);
     281           0 :     LOG(error) << "Failed to read posts from post-storage-service";
     282           0 :     throw;
     283             :   }
     284         200 :   _post_client_pool->Keepalive(post_client_wrapper);
     285         400 :   LOG(info) << "Successfully got posts from post-storage-service [req_id=" << req_id << ", user_id=" << user_id << ", post_count=" << _return.size() << "]";
     286             : 
     287         200 :   span->Finish();
     288         400 :   LOG(info) << "Home timeline read completed [req_id=" << req_id << ", user_id=" << user_id << "]";
     289             : }
     290             : 
     291             : }  // namespace social_network
     292             : 
     293             : #endif  // SOCIAL_NETWORK_MICROSERVICES_SRC_HOMETIMELINESERVICE_HOMETIMELINEHANDLER_H_

Generated by: LCOV version 1.12