Line data Source code
1 : #ifndef SOCIAL_NETWORK_MICROSERVICES_SRC_USERTIMELINESERVICE_USERTIMELINEHANDLER_H_
2 : #define SOCIAL_NETWORK_MICROSERVICES_SRC_USERTIMELINESERVICE_USERTIMELINEHANDLER_H_
3 :
4 : #include <bson/bson.h>
5 : #include <mongoc.h>
6 : #include <sw/redis++/redis++.h>
7 :
8 : #include <future>
9 : #include <iostream>
10 : #include <string>
11 :
12 : #include "../../gen-cpp/PostStorageService.h"
13 : #include "../../gen-cpp/UserTimelineService.h"
14 : #include "../ClientPool.h"
15 : #include "../ThriftClient.h"
16 : #include "../logger.h"
17 : #include "../tracing.h"
18 :
19 : using namespace sw::redis;
20 :
21 : namespace social_network {
22 :
23 : class UserTimelineHandler : public UserTimelineServiceIf {
24 : public:
25 : UserTimelineHandler(Redis *, mongoc_client_pool_t *,
26 : ClientPool<ThriftClient<PostStorageServiceClient>> *);
27 :
28 : UserTimelineHandler(Redis *, Redis *, mongoc_client_pool_t *,
29 : ClientPool<ThriftClient<PostStorageServiceClient>> *);
30 :
31 : UserTimelineHandler(RedisCluster *, mongoc_client_pool_t *,
32 : ClientPool<ThriftClient<PostStorageServiceClient>> *);
33 0 : ~UserTimelineHandler() override = default;
34 :
35 : bool IsRedisReplicationEnabled();
36 :
37 : void WriteUserTimeline(
38 : int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
39 : const std::map<std::string, std::string> &carrier) override;
40 :
41 : void ReadUserTimeline(std::vector<Post> &, int64_t, int64_t, int, int,
42 : const std::map<std::string, std::string> &) override;
43 :
44 : private:
45 : Redis *_redis_client_pool;
46 : Redis *_redis_replica_pool;
47 : Redis *_redis_primary_pool;
48 : RedisCluster *_redis_cluster_client_pool;
49 : mongoc_client_pool_t *_mongodb_client_pool;
50 : ClientPool<ThriftClient<PostStorageServiceClient>> *_post_client_pool;
51 : };
52 :
53 1 : UserTimelineHandler::UserTimelineHandler(
54 : Redis *redis_pool, mongoc_client_pool_t *mongodb_pool,
55 1 : ClientPool<ThriftClient<PostStorageServiceClient>> *post_client_pool) {
56 1 : _redis_client_pool = redis_pool;
57 1 : _redis_replica_pool = nullptr;
58 1 : _redis_primary_pool = nullptr;
59 1 : _redis_cluster_client_pool = nullptr;
60 1 : _mongodb_client_pool = mongodb_pool;
61 1 : _post_client_pool = post_client_pool;
62 1 : }
63 :
64 0 : UserTimelineHandler::UserTimelineHandler(
65 : Redis* redis_replica_pool, Redis* redis_primary_pool, mongoc_client_pool_t* mongodb_pool,
66 0 : ClientPool<ThriftClient<PostStorageServiceClient>>* post_client_pool) {
67 0 : _redis_client_pool = nullptr;
68 0 : _redis_replica_pool = redis_replica_pool;
69 0 : _redis_primary_pool = redis_primary_pool;
70 0 : _redis_cluster_client_pool = nullptr;
71 0 : _mongodb_client_pool = mongodb_pool;
72 0 : _post_client_pool = post_client_pool;
73 0 : }
74 :
75 0 : UserTimelineHandler::UserTimelineHandler(
76 : RedisCluster *redis_pool, mongoc_client_pool_t *mongodb_pool,
77 0 : ClientPool<ThriftClient<PostStorageServiceClient>> *post_client_pool) {
78 0 : _redis_cluster_client_pool = redis_pool;
79 0 : _redis_replica_pool = nullptr;
80 0 : _redis_primary_pool = nullptr;
81 0 : _redis_client_pool = nullptr;
82 0 : _mongodb_client_pool = mongodb_pool;
83 0 : _post_client_pool = post_client_pool;
84 0 : }
85 :
86 0 : bool UserTimelineHandler::IsRedisReplicationEnabled() {
87 0 : return (_redis_primary_pool || _redis_replica_pool);
88 : }
89 :
90 200 : void UserTimelineHandler::WriteUserTimeline(
91 : int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
92 : const std::map<std::string, std::string> &carrier) {
93 : // 记录收到写入请求
94 400 : LOG(info) << "Received WriteUserTimeline request [req_id=" << req_id << ", user_id=" << user_id << ", post_id=" << post_id << ", timestamp=" << timestamp << "]";
95 : // Initialize a span
96 400 : TextMapReader reader(carrier);
97 400 : std::map<std::string, std::string> writer_text_map;
98 400 : TextMapWriter writer(writer_text_map);
99 400 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
100 400 : auto span = opentracing::Tracer::Global()->StartSpan(
101 800 : "write_user_timeline_server", {opentracing::ChildOf(parent_span->get())});
102 200 : opentracing::Tracer::Global()->Inject(span->context(), writer);
103 :
104 : mongoc_client_t *mongodb_client =
105 200 : mongoc_client_pool_pop(_mongodb_client_pool);
106 200 : if (!mongodb_client) {
107 0 : ServiceException se;
108 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
109 0 : se.message = "Failed to pop a client from MongoDB pool";
110 0 : throw se;
111 : }
112 : auto collection = mongoc_client_get_collection(
113 200 : mongodb_client, "user-timeline", "user-timeline");
114 200 : if (!collection) {
115 0 : ServiceException se;
116 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
117 0 : se.message = "Failed to create collection user-timeline from MongoDB";
118 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
119 0 : throw se;
120 : }
121 200 : bson_t *query = bson_new();
122 :
123 200 : BSON_APPEND_INT64(query, "user_id", user_id);
124 : bson_t *update =
125 200 : BCON_NEW("$push", "{", "posts", "{", "$each", "[", "{", "post_id",
126 : BCON_INT64(post_id), "timestamp", BCON_INT64(timestamp), "}",
127 : "]", "$position", BCON_INT32(0), "}", "}");
128 : bson_error_t error;
129 : bson_t reply;
130 400 : auto update_span = opentracing::Tracer::Global()->StartSpan(
131 : "write_user_timeline_mongo_insert_client",
132 800 : {opentracing::ChildOf(&span->context())});
133 : bool updated = mongoc_collection_find_and_modify(collection, query, nullptr,
134 : update, nullptr, false, true,
135 200 : true, &reply, &error);
136 200 : update_span->Finish();
137 :
138 200 : if (!updated) {
139 : // update the newly inserted document (upsert: false)
140 : updated = mongoc_collection_find_and_modify(collection, query, nullptr,
141 : update, nullptr, false, false,
142 0 : true, &reply, &error);
143 0 : if (!updated) {
144 0 : LOG(error) << "Failed to update user-timeline for user " << user_id
145 0 : << " to MongoDB: " << error.message;
146 0 : ServiceException se;
147 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
148 0 : se.message = error.message;
149 0 : bson_destroy(update);
150 0 : bson_destroy(query);
151 0 : bson_destroy(&reply);
152 0 : mongoc_collection_destroy(collection);
153 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
154 0 : throw se;
155 : }
156 : }
157 :
158 200 : bson_destroy(update);
159 200 : bson_destroy(&reply);
160 200 : bson_destroy(query);
161 200 : mongoc_collection_destroy(collection);
162 200 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
163 :
164 : // Update user's timeline in redis
165 400 : auto redis_span = opentracing::Tracer::Global()->StartSpan(
166 : "write_user_timeline_redis_update_client",
167 800 : {opentracing::ChildOf(&span->context())});
168 : try {
169 200 : if (_redis_client_pool)
170 400 : _redis_client_pool->zadd(std::to_string(user_id), std::to_string(post_id),
171 200 : timestamp, UpdateType::NOT_EXIST);
172 0 : else if (IsRedisReplicationEnabled()) {
173 0 : _redis_primary_pool->zadd(std::to_string(user_id), std::to_string(post_id),
174 0 : timestamp, UpdateType::NOT_EXIST);
175 : }
176 : else
177 0 : _redis_cluster_client_pool->zadd(std::to_string(user_id), std::to_string(post_id),
178 0 : timestamp, UpdateType::NOT_EXIST);
179 :
180 0 : } catch (const Error &err) {
181 0 : LOG(error) << err.what();
182 0 : throw err;
183 : }
184 200 : redis_span->Finish();
185 200 : span->Finish();
186 : // 写入完成日志
187 400 : LOG(info) << "WriteUserTimeline completed [req_id=" << req_id << ", user_id=" << user_id << ", post_id=" << post_id << "]";
188 200 : }
189 :
190 800 : void UserTimelineHandler::ReadUserTimeline(
191 : std::vector<Post> &_return, int64_t req_id, int64_t user_id, int start,
192 : int stop, const std::map<std::string, std::string> &carrier) {
193 : // 记录收到读取请求
194 1600 : LOG(info) << "Received ReadUserTimeline request [req_id=" << req_id << ", user_id=" << user_id << ", start=" << start << ", stop=" << stop << "]";
195 : // Initialize a span
196 1200 : TextMapReader reader(carrier);
197 1200 : std::map<std::string, std::string> writer_text_map;
198 1200 : TextMapWriter writer(writer_text_map);
199 1200 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
200 1600 : auto span = opentracing::Tracer::Global()->StartSpan(
201 2800 : "read_user_timeline_server", {opentracing::ChildOf(parent_span->get())});
202 800 : opentracing::Tracer::Global()->Inject(span->context(), writer);
203 :
204 800 : if (stop <= start || start < 0) {
205 400 : return;
206 : }
207 :
208 800 : auto redis_span = opentracing::Tracer::Global()->StartSpan(
209 : "read_user_timeline_redis_find_client",
210 1600 : {opentracing::ChildOf(&span->context())});
211 :
212 800 : std::vector<std::string> post_ids_str;
213 : try {
214 400 : if (_redis_client_pool)
215 800 : _redis_client_pool->zrevrange(std::to_string(user_id), start, stop - 1,
216 400 : std::back_inserter(post_ids_str));
217 0 : else if (IsRedisReplicationEnabled()) {
218 0 : _redis_replica_pool->zrevrange(std::to_string(user_id), start, stop - 1,
219 0 : std::back_inserter(post_ids_str));
220 : }
221 : else
222 0 : _redis_cluster_client_pool->zrevrange(std::to_string(user_id), start, stop - 1,
223 0 : std::back_inserter(post_ids_str));
224 0 : } catch (const Error &err) {
225 0 : LOG(error) << err.what();
226 0 : throw err;
227 : }
228 400 : redis_span->Finish();
229 :
230 800 : std::vector<int64_t> post_ids;
231 400 : for (auto &post_id_str : post_ids_str) {
232 0 : post_ids.emplace_back(std::stoul(post_id_str));
233 : }
234 :
235 : // find in mongodb
236 400 : int mongo_start = start + post_ids.size();
237 800 : std::unordered_map<std::string, double> redis_update_map;
238 400 : if (mongo_start < stop) {
239 : // Instead find post_ids from mongodb
240 : mongoc_client_t *mongodb_client =
241 400 : mongoc_client_pool_pop(_mongodb_client_pool);
242 400 : if (!mongodb_client) {
243 0 : ServiceException se;
244 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
245 0 : se.message = "Failed to pop a client from MongoDB pool";
246 0 : throw se;
247 : }
248 : auto collection = mongoc_client_get_collection(
249 400 : mongodb_client, "user-timeline", "user-timeline");
250 400 : if (!collection) {
251 0 : ServiceException se;
252 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
253 0 : se.message = "Failed to create collection user-timeline from MongoDB";
254 0 : throw se;
255 : }
256 :
257 400 : bson_t *query = BCON_NEW("user_id", BCON_INT64(user_id));
258 400 : bson_t *opts = BCON_NEW("projection", "{", "posts", "{", "$slice", "[",
259 : BCON_INT32(0), BCON_INT32(stop), "]", "}", "}");
260 :
261 800 : auto find_span = opentracing::Tracer::Global()->StartSpan(
262 : "user_timeline_mongo_find_client",
263 1600 : {opentracing::ChildOf(&span->context())});
264 : mongoc_cursor_t *cursor =
265 400 : mongoc_collection_find_with_opts(collection, query, opts, nullptr);
266 400 : find_span->Finish();
267 : const bson_t *doc;
268 400 : bool found = mongoc_cursor_next(cursor, &doc);
269 400 : if (found) {
270 : bson_iter_t iter_0;
271 : bson_iter_t iter_1;
272 : bson_iter_t post_id_child;
273 : bson_iter_t timestamp_child;
274 0 : int idx = 0;
275 0 : bson_iter_init(&iter_0, doc);
276 0 : bson_iter_init(&iter_1, doc);
277 0 : while (bson_iter_find_descendant(
278 0 : &iter_0, ("posts." + std::to_string(idx) + ".post_id").c_str(),
279 0 : &post_id_child) &&
280 0 : BSON_ITER_HOLDS_INT64(&post_id_child) &&
281 0 : bson_iter_find_descendant(
282 : &iter_1,
283 0 : ("posts." + std::to_string(idx) + ".timestamp").c_str(),
284 0 : ×tamp_child) &&
285 0 : BSON_ITER_HOLDS_INT64(×tamp_child)) {
286 0 : auto curr_post_id = bson_iter_int64(&post_id_child);
287 0 : auto curr_timestamp = bson_iter_int64(×tamp_child);
288 0 : if (idx >= mongo_start) {
289 : //In mixed workload condition, post may composed between redis and mongo read
290 : //mongodb index will shift and duplicate post_id occurs
291 0 : if ( std::find(post_ids.begin(), post_ids.end(), curr_post_id) == post_ids.end() ) {
292 0 : post_ids.emplace_back(curr_post_id);
293 : }
294 : }
295 0 : redis_update_map.insert(std::make_pair(std::to_string(curr_post_id),
296 0 : (double)curr_timestamp));
297 0 : bson_iter_init(&iter_0, doc);
298 0 : bson_iter_init(&iter_1, doc);
299 0 : idx++;
300 : }
301 : }
302 400 : bson_destroy(opts);
303 400 : bson_destroy(query);
304 400 : mongoc_cursor_destroy(cursor);
305 400 : mongoc_collection_destroy(collection);
306 400 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
307 : }
308 :
309 : std::future<std::vector<Post>> post_future =
310 400 : std::async(std::launch::async, [&]() {
311 800 : auto post_client_wrapper = _post_client_pool->Pop();
312 400 : if (!post_client_wrapper) {
313 0 : ServiceException se;
314 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
315 0 : se.message = "Failed to connect to post-storage-service";
316 0 : throw se;
317 : }
318 400 : std::vector<Post> _return_posts;
319 400 : auto post_client = post_client_wrapper->GetClient();
320 : try {
321 400 : post_client->ReadPosts(_return_posts, req_id, post_ids,
322 800 : writer_text_map);
323 0 : } catch (...) {
324 0 : _post_client_pool->Remove(post_client_wrapper);
325 0 : LOG(error) << "Failed to read posts from post-storage-service";
326 0 : throw;
327 : }
328 400 : _post_client_pool->Keepalive(post_client_wrapper);
329 400 : return _return_posts;
330 800 : });
331 :
332 400 : if (redis_update_map.size() > 0) {
333 0 : auto redis_update_span = opentracing::Tracer::Global()->StartSpan(
334 : "user_timeline_redis_update_client",
335 0 : {opentracing::ChildOf(&span->context())});
336 : try {
337 0 : if (_redis_client_pool)
338 0 : _redis_client_pool->zadd(std::to_string(user_id),
339 : redis_update_map.begin(),
340 0 : redis_update_map.end());
341 0 : else if (IsRedisReplicationEnabled()) {
342 0 : _redis_primary_pool->zadd(std::to_string(user_id),
343 : redis_update_map.begin(),
344 0 : redis_update_map.end());
345 : }
346 : else
347 0 : _redis_cluster_client_pool->zadd(std::to_string(user_id),
348 : redis_update_map.begin(),
349 0 : redis_update_map.end());
350 :
351 0 : } catch (const Error &err) {
352 0 : LOG(error) << err.what();
353 0 : throw err;
354 : }
355 0 : redis_update_span->Finish();
356 : }
357 :
358 : try {
359 400 : _return = post_future.get();
360 0 : } catch (...) {
361 0 : LOG(error) << "Failed to get post from post-storage-service";
362 0 : throw;
363 : }
364 400 : span->Finish();
365 : // 读取完成日志
366 800 : LOG(info) << "ReadUserTimeline completed [req_id=" << req_id << ", user_id=" << user_id << ", return count=" << _return.size() << "]";
367 : }
368 :
369 : } // namespace social_network
370 :
371 : #endif // SOCIAL_NETWORK_MICROSERVICES_SRC_USERTIMELINESERVICE_USERTIMELINEHANDLER_H_
|