Line data Source code
1 : #ifndef SOCIAL_NETWORK_MICROSERVICES_SRC_HOMETIMELINESERVICE_HOMETIMELINEHANDLER_H_
2 : #define SOCIAL_NETWORK_MICROSERVICES_SRC_HOMETIMELINESERVICE_HOMETIMELINEHANDLER_H_
3 :
4 : #include <sw/redis++/redis++.h>
5 :
6 : #include <future>
7 : #include <iostream>
8 : #include <string>
9 :
10 : #include "../../gen-cpp/HomeTimelineService.h"
11 : #include "../../gen-cpp/PostStorageService.h"
12 : #include "../../gen-cpp/SocialGraphService.h"
13 : #include "../ClientPool.h"
14 : #include "../ThriftClient.h"
15 : #include "../logger.h"
16 : #include "../tracing.h"
17 :
18 : using namespace sw::redis;
19 : namespace social_network {
20 : class HomeTimelineHandler : public HomeTimelineServiceIf {
21 : public:
22 : HomeTimelineHandler(Redis *,
23 : ClientPool<ThriftClient<PostStorageServiceClient>> *,
24 : ClientPool<ThriftClient<SocialGraphServiceClient>> *);
25 :
26 :
27 : HomeTimelineHandler(Redis *,Redis *,
28 : ClientPool<ThriftClient<PostStorageServiceClient>>*,
29 : ClientPool<ThriftClient<SocialGraphServiceClient>>*);
30 :
31 :
32 : HomeTimelineHandler(RedisCluster *,
33 : ClientPool<ThriftClient<PostStorageServiceClient>> *,
34 : ClientPool<ThriftClient<SocialGraphServiceClient>> *);
35 0 : ~HomeTimelineHandler() override = default;
36 :
37 : bool IsRedisReplicationEnabled();
38 :
39 : void ReadHomeTimeline(std::vector<Post> &, int64_t, int64_t, int, int,
40 : const std::map<std::string, std::string> &) override;
41 :
42 : void WriteHomeTimeline(int64_t, int64_t, int64_t, int64_t,
43 : const std::vector<int64_t> &,
44 : const std::map<std::string, std::string> &) override;
45 :
46 : private:
47 : Redis *_redis_replica_pool;
48 : Redis *_redis_primary_pool;
49 : Redis *_redis_client_pool;
50 : RedisCluster *_redis_cluster_client_pool;
51 : ClientPool<ThriftClient<PostStorageServiceClient>> *_post_client_pool;
52 : ClientPool<ThriftClient<SocialGraphServiceClient>> *_social_graph_client_pool;
53 : };
54 :
55 1 : HomeTimelineHandler::HomeTimelineHandler(
56 : Redis *redis_pool,
57 : ClientPool<ThriftClient<PostStorageServiceClient>> *post_client_pool,
58 : ClientPool<ThriftClient<SocialGraphServiceClient>>
59 1 : *social_graph_client_pool) {
60 1 : _redis_primary_pool = nullptr;
61 1 : _redis_replica_pool = nullptr;
62 1 : _redis_client_pool = redis_pool;
63 1 : _redis_cluster_client_pool = nullptr;
64 1 : _post_client_pool = post_client_pool;
65 1 : _social_graph_client_pool = social_graph_client_pool;
66 1 : }
67 :
68 0 : HomeTimelineHandler::HomeTimelineHandler(
69 : RedisCluster *redis_pool,
70 : ClientPool<ThriftClient<PostStorageServiceClient>> *post_client_pool,
71 : ClientPool<ThriftClient<SocialGraphServiceClient>>
72 0 : *social_graph_client_pool) {
73 0 : _redis_primary_pool = nullptr;
74 0 : _redis_replica_pool = nullptr;
75 0 : _redis_client_pool = nullptr;
76 0 : _redis_cluster_client_pool = redis_pool;
77 0 : _post_client_pool = post_client_pool;
78 0 : _social_graph_client_pool = social_graph_client_pool;
79 0 : }
80 :
81 0 : HomeTimelineHandler::HomeTimelineHandler(
82 : Redis *redis_replica_pool,
83 : Redis *redis_primary_pool,
84 : ClientPool<ThriftClient<PostStorageServiceClient>>* post_client_pool,
85 : ClientPool<ThriftClient<SocialGraphServiceClient>>
86 0 : * social_graph_client_pool) {
87 0 : _redis_primary_pool = redis_primary_pool;
88 0 : _redis_replica_pool = redis_replica_pool;
89 0 : _redis_client_pool = nullptr;
90 0 : _redis_cluster_client_pool = nullptr;
91 0 : _post_client_pool = post_client_pool;
92 0 : _social_graph_client_pool = social_graph_client_pool;
93 0 : }
94 :
95 0 : bool HomeTimelineHandler::IsRedisReplicationEnabled() {
96 0 : return (_redis_primary_pool || _redis_replica_pool);
97 : }
98 :
99 200 : void HomeTimelineHandler::WriteHomeTimeline(
100 : int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
101 : const std::vector<int64_t> &user_mentions_id,
102 : const std::map<std::string, std::string> &carrier) {
103 400 : LOG(info) << "Start writing home timeline [req_id=" << req_id << ", user_id=" << user_id << ", post_id=" << post_id << "]";
104 : // Initialize a span
105 400 : TextMapReader reader(carrier);
106 400 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
107 400 : auto span = opentracing::Tracer::Global()->StartSpan(
108 800 : "write_home_timeline_server", {opentracing::ChildOf(parent_span->get())});
109 :
110 : // Find followers of the user
111 400 : auto followers_span = opentracing::Tracer::Global()->StartSpan(
112 800 : "get_followers_client", {opentracing::ChildOf(&span->context())});
113 400 : std::map<std::string, std::string> writer_text_map;
114 400 : TextMapWriter writer(writer_text_map);
115 200 : opentracing::Tracer::Global()->Inject(followers_span->context(), writer);
116 :
117 200 : auto social_graph_client_wrapper = _social_graph_client_pool->Pop();
118 200 : if (!social_graph_client_wrapper) {
119 0 : ServiceException se;
120 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
121 0 : se.message = "Failed to connect to social-graph-service";
122 0 : throw se;
123 : }
124 200 : auto social_graph_client = social_graph_client_wrapper->GetClient();
125 400 : std::vector<int64_t> followers_id;
126 : try {
127 : social_graph_client->GetFollowers(followers_id, req_id, user_id,
128 200 : writer_text_map);
129 0 : } catch (...) {
130 0 : LOG(error) << "Failed to get followers from social-network-service";
131 0 : _social_graph_client_pool->Remove(social_graph_client_wrapper);
132 0 : throw;
133 : }
134 200 : _social_graph_client_pool->Keepalive(social_graph_client_wrapper);
135 200 : followers_span->Finish();
136 400 : LOG(info) << "Successfully got followers [req_id=" << req_id << ", user_id=" << user_id << ", followers_count=" << followers_id.size() << "]";
137 :
138 400 : std::set<int64_t> followers_id_set(followers_id.begin(), followers_id.end());
139 200 : followers_id_set.insert(user_mentions_id.begin(), user_mentions_id.end());
140 :
141 : // Update Redis ZSet
142 : // Zset key: follower_id, Zset value: post_id_str, Zset score: timestamp_str
143 400 : auto redis_span = opentracing::Tracer::Global()->StartSpan(
144 : "write_home_timeline_redis_update_client",
145 800 : {opentracing::ChildOf(&span->context())});
146 400 : std::string post_id_str = std::to_string(post_id);
147 :
148 : {
149 200 : if (_redis_client_pool) {
150 400 : auto pipe = _redis_client_pool->pipeline(false);
151 6800 : for (auto &follower_id : followers_id_set) {
152 13200 : pipe.zadd(std::to_string(follower_id), post_id_str, timestamp,
153 19800 : UpdateType::NOT_EXIST);
154 : }
155 : try {
156 400 : auto replies = pipe.exec();
157 400 : LOG(info) << "Successfully wrote to redis home timeline [req_id=" << req_id << ", post_id=" << post_id << ", followers_count=" << followers_id_set.size() << "]";
158 0 : } catch (const Error &err) {
159 0 : LOG(error) << err.what();
160 0 : throw err;
161 : }
162 : }
163 :
164 0 : else if (IsRedisReplicationEnabled()) {
165 0 : auto pipe = _redis_primary_pool->pipeline(false);
166 0 : for (auto& follower_id : followers_id_set) {
167 0 : pipe.zadd(std::to_string(follower_id), post_id_str, timestamp,
168 0 : UpdateType::NOT_EXIST);
169 : }
170 : try {
171 0 : auto replies = pipe.exec();
172 0 : LOG(info) << "Successfully wrote to redis home timeline [req_id=" << req_id << ", post_id=" << post_id << ", followers_count=" << followers_id_set.size() << "]";
173 : }
174 0 : catch (const Error& err) {
175 0 : LOG(error) << err.what();
176 0 : throw err;
177 : }
178 : }
179 :
180 : else {
181 : // Create multi-pipeline that match with shards pool
182 0 : std::map<std::shared_ptr<ConnectionPool>, std::shared_ptr<Pipeline>> pipe_map;
183 0 : auto *shards_pool = _redis_cluster_client_pool->get_shards_pool();
184 :
185 0 : for (auto &follower_id : followers_id_set) {
186 0 : auto conn = shards_pool->fetch(std::to_string(follower_id));
187 0 : auto pipe = pipe_map.find(conn);
188 0 : if(pipe == pipe_map.end()) {//Not found, create new pipeline and insert
189 0 : auto new_pipe = std::make_shared<Pipeline>(_redis_cluster_client_pool->pipeline(std::to_string(follower_id), false));
190 0 : pipe_map.insert(make_pair(conn, new_pipe));
191 0 : auto *_pipe = new_pipe.get();
192 0 : _pipe->zadd(std::to_string(follower_id), post_id_str, timestamp,
193 0 : UpdateType::NOT_EXIST);
194 : }else{//Found, use exist pipeline
195 0 : std::pair<std::shared_ptr<ConnectionPool>, std::shared_ptr<Pipeline>> found = *pipe;
196 0 : auto *_pipe = found.second.get();
197 0 : _pipe->zadd(std::to_string(follower_id), post_id_str, timestamp,
198 0 : UpdateType::NOT_EXIST);
199 : }
200 : }
201 : // LOG(info) <<"followers_id_set items:" << followers_id_set.size()<<"; pipeline items:" << pipe_map.size();
202 : try {
203 0 : for(auto const &it : pipe_map) {
204 0 : auto _pipe = it.second.get();
205 0 : _pipe->exec();
206 : }
207 0 : LOG(info) << "Successfully wrote to redis home timeline [req_id=" << req_id << ", post_id=" << post_id << ", followers_count=" << followers_id_set.size() << "]";
208 0 : } catch (const Error &err) {
209 0 : LOG(error) << err.what();
210 0 : throw err;
211 : }
212 : }
213 : }
214 200 : redis_span->Finish();
215 400 : LOG(info) << "Home timeline write completed [req_id=" << req_id << ", post_id=" << post_id << "]";
216 200 : }
217 :
218 :
219 1000 : void HomeTimelineHandler::ReadHomeTimeline(
220 : std::vector<Post> &_return, int64_t req_id, int64_t user_id, int start_idx,
221 : int stop_idx, const std::map<std::string, std::string> &carrier) {
222 2000 : LOG(info) << "Start reading home timeline [req_id=" << req_id << ", user_id=" << user_id << ", start=" << start_idx << ", stop=" << stop_idx << "]";
223 : // Initialize a span
224 1200 : TextMapReader reader(carrier);
225 1200 : std::map<std::string, std::string> writer_text_map;
226 1200 : TextMapWriter writer(writer_text_map);
227 1200 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
228 2000 : auto span = opentracing::Tracer::Global()->StartSpan(
229 3200 : "read_home_timeline_server", {opentracing::ChildOf(parent_span->get())});
230 1000 : opentracing::Tracer::Global()->Inject(span->context(), writer);
231 :
232 1000 : if (stop_idx <= start_idx || start_idx < 0) {
233 800 : return;
234 : }
235 :
236 400 : auto redis_span = opentracing::Tracer::Global()->StartSpan(
237 : "read_home_timeline_redis_find_client",
238 800 : {opentracing::ChildOf(&span->context())});
239 :
240 400 : std::vector<std::string> post_ids_str;
241 : try {
242 200 : if (_redis_client_pool) {
243 600 : _redis_client_pool->zrevrange(std::to_string(user_id), start_idx,
244 200 : stop_idx - 1,
245 200 : std::back_inserter(post_ids_str));
246 : }
247 0 : else if (IsRedisReplicationEnabled()) {
248 0 : _redis_replica_pool->zrevrange(std::to_string(user_id), start_idx,
249 0 : stop_idx - 1,
250 0 : std::back_inserter(post_ids_str));
251 : }
252 :
253 : else {
254 0 : _redis_cluster_client_pool->zrevrange(std::to_string(user_id), start_idx,
255 0 : stop_idx - 1,
256 0 : std::back_inserter(post_ids_str));
257 : }
258 0 : } catch (const Error &err) {
259 0 : LOG(error) << err.what();
260 0 : throw err;
261 : }
262 200 : redis_span->Finish();
263 :
264 400 : std::vector<int64_t> post_ids;
265 200 : for (auto &post_id_str : post_ids_str) {
266 0 : post_ids.emplace_back(std::stoul(post_id_str));
267 : }
268 :
269 200 : auto post_client_wrapper = _post_client_pool->Pop();
270 200 : if (!post_client_wrapper) {
271 0 : ServiceException se;
272 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
273 0 : se.message = "Failed to connect to post-storage-service";
274 0 : throw se;
275 : }
276 200 : auto post_client = post_client_wrapper->GetClient();
277 : try {
278 200 : post_client->ReadPosts(_return, req_id, post_ids, writer_text_map);
279 0 : } catch (...) {
280 0 : _post_client_pool->Remove(post_client_wrapper);
281 0 : LOG(error) << "Failed to read posts from post-storage-service";
282 0 : throw;
283 : }
284 200 : _post_client_pool->Keepalive(post_client_wrapper);
285 400 : LOG(info) << "Successfully got posts from post-storage-service [req_id=" << req_id << ", user_id=" << user_id << ", post_count=" << _return.size() << "]";
286 :
287 200 : span->Finish();
288 400 : LOG(info) << "Home timeline read completed [req_id=" << req_id << ", user_id=" << user_id << "]";
289 : }
290 :
291 : } // namespace social_network
292 :
293 : #endif // SOCIAL_NETWORK_MICROSERVICES_SRC_HOMETIMELINESERVICE_HOMETIMELINEHANDLER_H_
|