Line data Source code
1 : #ifndef SOCIAL_NETWORK_MICROSERVICES_SRC_COMPOSEPOSTSERVICE_COMPOSEPOSTHANDLER_H_
2 : #define SOCIAL_NETWORK_MICROSERVICES_SRC_COMPOSEPOSTSERVICE_COMPOSEPOSTHANDLER_H_
3 :
4 : #include <chrono>
5 : #include <future>
6 : #include <iostream>
7 : #include <nlohmann/json.hpp>
8 : #include <string>
9 : #include <vector>
10 :
11 : #include "../../gen-cpp/ComposePostService.h"
12 : #include "../../gen-cpp/HomeTimelineService.h"
13 : #include "../../gen-cpp/MediaService.h"
14 : #include "../../gen-cpp/PostStorageService.h"
15 : #include "../../gen-cpp/TextService.h"
16 : #include "../../gen-cpp/UniqueIdService.h"
17 : #include "../../gen-cpp/UserService.h"
18 : #include "../../gen-cpp/UserTimelineService.h"
19 : #include "../../gen-cpp/social_network_types.h"
20 : #include "../ClientPool.h"
21 : #include "../ThriftClient.h"
22 : #include "../logger.h"
23 : #include "../tracing.h"
24 :
25 : namespace social_network {
26 : using json = nlohmann::json;
27 : using std::chrono::duration_cast;
28 : using std::chrono::milliseconds;
29 : using std::chrono::system_clock;
30 :
31 : class ComposePostHandler : public ComposePostServiceIf {
32 : public:
33 : ComposePostHandler(ClientPool<ThriftClient<PostStorageServiceClient>> *,
34 : ClientPool<ThriftClient<UserTimelineServiceClient>> *,
35 : ClientPool<ThriftClient<UserServiceClient>> *,
36 : ClientPool<ThriftClient<UniqueIdServiceClient>> *,
37 : ClientPool<ThriftClient<MediaServiceClient>> *,
38 : ClientPool<ThriftClient<TextServiceClient>> *,
39 : ClientPool<ThriftClient<HomeTimelineServiceClient>> *);
40 0 : ~ComposePostHandler() override = default;
41 :
42 : void ComposePost(int64_t req_id, const std::string &username, int64_t user_id,
43 : const std::string &text,
44 : const std::vector<int64_t> &media_ids,
45 : const std::vector<std::string> &media_types,
46 : PostType::type post_type,
47 : const std::map<std::string, std::string> &carrier) override;
48 :
49 : private:
50 : ClientPool<ThriftClient<PostStorageServiceClient>> *_post_storage_client_pool;
51 : ClientPool<ThriftClient<UserTimelineServiceClient>>
52 : *_user_timeline_client_pool;
53 :
54 : ClientPool<ThriftClient<UserServiceClient>> *_user_service_client_pool;
55 : ClientPool<ThriftClient<UniqueIdServiceClient>>
56 : *_unique_id_service_client_pool;
57 : ClientPool<ThriftClient<MediaServiceClient>> *_media_service_client_pool;
58 : ClientPool<ThriftClient<TextServiceClient>> *_text_service_client_pool;
59 : ClientPool<ThriftClient<HomeTimelineServiceClient>>
60 : *_home_timeline_client_pool;
61 :
62 : void _UploadUserTimelineHelper(
63 : int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
64 : const std::map<std::string, std::string> &carrier);
65 :
66 : void _UploadPostHelper(int64_t req_id, const Post &post,
67 : const std::map<std::string, std::string> &carrier);
68 :
69 : void _UploadHomeTimelineHelper(
70 : int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
71 : const std::vector<int64_t> &user_mentions_id,
72 : const std::map<std::string, std::string> &carrier);
73 :
74 : Creator _ComposeCreaterHelper(
75 : int64_t req_id, int64_t user_id, const std::string &username,
76 : const std::map<std::string, std::string> &carrier);
77 : TextServiceReturn _ComposeTextHelper(
78 : int64_t req_id, const std::string &text,
79 : const std::map<std::string, std::string> &carrier);
80 : std::vector<Media> _ComposeMediaHelper(
81 : int64_t req_id, const std::vector<std::string> &media_types,
82 : const std::vector<int64_t> &media_ids,
83 : const std::map<std::string, std::string> &carrier);
84 : int64_t _ComposeUniqueIdHelper(
85 : int64_t req_id, PostType::type post_type,
86 : const std::map<std::string, std::string> &carrier);
87 : };
88 :
89 1 : ComposePostHandler::ComposePostHandler(
90 : ClientPool<social_network::ThriftClient<PostStorageServiceClient>>
91 : *post_storage_client_pool,
92 : ClientPool<social_network::ThriftClient<UserTimelineServiceClient>>
93 : *user_timeline_client_pool,
94 : ClientPool<ThriftClient<UserServiceClient>> *user_service_client_pool,
95 : ClientPool<ThriftClient<UniqueIdServiceClient>>
96 : *unique_id_service_client_pool,
97 : ClientPool<ThriftClient<MediaServiceClient>> *media_service_client_pool,
98 : ClientPool<ThriftClient<TextServiceClient>> *text_service_client_pool,
99 : ClientPool<ThriftClient<HomeTimelineServiceClient>>
100 1 : *home_timeline_client_pool) {
101 1 : _post_storage_client_pool = post_storage_client_pool;
102 1 : _user_timeline_client_pool = user_timeline_client_pool;
103 1 : _user_service_client_pool = user_service_client_pool;
104 1 : _unique_id_service_client_pool = unique_id_service_client_pool;
105 1 : _media_service_client_pool = media_service_client_pool;
106 1 : _text_service_client_pool = text_service_client_pool;
107 1 : _home_timeline_client_pool = home_timeline_client_pool;
108 1 : }
109 :
110 200 : Creator ComposePostHandler::_ComposeCreaterHelper(
111 : int64_t req_id, int64_t user_id, const std::string &username,
112 : const std::map<std::string, std::string> &carrier) {
113 400 : TextMapReader reader(carrier);
114 400 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
115 400 : auto span = opentracing::Tracer::Global()->StartSpan(
116 800 : "compose_creator_client", {opentracing::ChildOf(parent_span->get())});
117 400 : std::map<std::string, std::string> writer_text_map;
118 400 : TextMapWriter writer(writer_text_map);
119 200 : opentracing::Tracer::Global()->Inject(span->context(), writer);
120 :
121 200 : auto user_client_wrapper = _user_service_client_pool->Pop();
122 200 : if (!user_client_wrapper) {
123 0 : ServiceException se;
124 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
125 0 : se.message = "Failed to connect to user-service";
126 0 : LOG(error) << se.message;
127 0 : span->Finish();
128 0 : throw se;
129 : }
130 :
131 200 : auto user_client = user_client_wrapper->GetClient();
132 200 : Creator _return_creator;
133 : try {
134 : user_client->ComposeCreatorWithUserId(_return_creator, req_id, user_id,
135 200 : username, writer_text_map);
136 0 : } catch (...) {
137 0 : LOG(error) << "Failed to send compose-creator to user-service";
138 0 : _user_service_client_pool->Remove(user_client_wrapper);
139 0 : span->Finish();
140 0 : throw;
141 : }
142 200 : _user_service_client_pool->Keepalive(user_client_wrapper);
143 200 : span->Finish();
144 400 : return _return_creator;
145 : }
146 :
147 200 : TextServiceReturn ComposePostHandler::_ComposeTextHelper(
148 : int64_t req_id, const std::string &text,
149 : const std::map<std::string, std::string> &carrier) {
150 400 : TextMapReader reader(carrier);
151 400 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
152 400 : auto span = opentracing::Tracer::Global()->StartSpan(
153 800 : "compose_text_client", {opentracing::ChildOf(parent_span->get())});
154 400 : std::map<std::string, std::string> writer_text_map;
155 400 : TextMapWriter writer(writer_text_map);
156 200 : opentracing::Tracer::Global()->Inject(span->context(), writer);
157 :
158 200 : auto text_client_wrapper = _text_service_client_pool->Pop();
159 200 : if (!text_client_wrapper) {
160 0 : ServiceException se;
161 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
162 0 : se.message = "Failed to connect to text-service";
163 0 : LOG(error) << se.message;
164 : ;
165 0 : span->Finish();
166 0 : throw se;
167 : }
168 :
169 200 : auto text_client = text_client_wrapper->GetClient();
170 200 : TextServiceReturn _return_text;
171 : try {
172 200 : text_client->ComposeText(_return_text, req_id, text, writer_text_map);
173 0 : } catch (...) {
174 0 : LOG(error) << "Failed to send compose-text to text-service";
175 0 : _text_service_client_pool->Remove(text_client_wrapper);
176 0 : span->Finish();
177 0 : throw;
178 : }
179 200 : _text_service_client_pool->Keepalive(text_client_wrapper);
180 200 : span->Finish();
181 400 : return _return_text;
182 : }
183 :
184 200 : std::vector<Media> ComposePostHandler::_ComposeMediaHelper(
185 : int64_t req_id, const std::vector<std::string> &media_types,
186 : const std::vector<int64_t> &media_ids,
187 : const std::map<std::string, std::string> &carrier) {
188 400 : TextMapReader reader(carrier);
189 400 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
190 400 : auto span = opentracing::Tracer::Global()->StartSpan(
191 800 : "compose_media_client", {opentracing::ChildOf(parent_span->get())});
192 400 : std::map<std::string, std::string> writer_text_map;
193 400 : TextMapWriter writer(writer_text_map);
194 200 : opentracing::Tracer::Global()->Inject(span->context(), writer);
195 :
196 200 : auto media_client_wrapper = _media_service_client_pool->Pop();
197 0 : if (!media_client_wrapper) {
198 0 : ServiceException se;
199 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
200 0 : se.message = "Failed to connect to media-service";
201 0 : LOG(error) << se.message;
202 : ;
203 0 : span->Finish();
204 0 : throw se;
205 : }
206 :
207 0 : auto media_client = media_client_wrapper->GetClient();
208 0 : std::vector<Media> _return_media;
209 : try {
210 : media_client->ComposeMedia(_return_media, req_id, media_types, media_ids,
211 0 : writer_text_map);
212 0 : } catch (...) {
213 0 : LOG(error) << "Failed to send compose-media to media-service";
214 0 : _media_service_client_pool->Remove(media_client_wrapper);
215 0 : span->Finish();
216 0 : throw;
217 : }
218 0 : _media_service_client_pool->Keepalive(media_client_wrapper);
219 0 : span->Finish();
220 0 : return _return_media;
221 : }
222 :
223 200 : int64_t ComposePostHandler::_ComposeUniqueIdHelper(
224 : int64_t req_id, const PostType::type post_type,
225 : const std::map<std::string, std::string> &carrier) {
226 400 : TextMapReader reader(carrier);
227 400 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
228 400 : auto span = opentracing::Tracer::Global()->StartSpan(
229 800 : "compose_unique_id_client", {opentracing::ChildOf(parent_span->get())});
230 400 : std::map<std::string, std::string> writer_text_map;
231 400 : TextMapWriter writer(writer_text_map);
232 200 : opentracing::Tracer::Global()->Inject(span->context(), writer);
233 :
234 200 : auto unique_id_client_wrapper = _unique_id_service_client_pool->Pop();
235 200 : if (!unique_id_client_wrapper) {
236 0 : ServiceException se;
237 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
238 0 : se.message = "Failed to connect to unique_id-service";
239 0 : LOG(error) << se.message;
240 0 : span->Finish();
241 0 : throw se;
242 : }
243 :
244 200 : auto unique_id_client = unique_id_client_wrapper->GetClient();
245 : int64_t _return_unique_id;
246 : try {
247 : _return_unique_id =
248 200 : unique_id_client->ComposeUniqueId(req_id, post_type, writer_text_map);
249 0 : } catch (...) {
250 0 : LOG(error) << "Failed to send compose-unique_id to unique_id-service";
251 0 : _unique_id_service_client_pool->Remove(unique_id_client_wrapper);
252 0 : span->Finish();
253 0 : throw;
254 : }
255 200 : _unique_id_service_client_pool->Keepalive(unique_id_client_wrapper);
256 200 : span->Finish();
257 400 : return _return_unique_id;
258 : }
259 :
260 0 : void ComposePostHandler::_UploadPostHelper(
261 : int64_t req_id, const Post &post,
262 : const std::map<std::string, std::string> &carrier) {
263 :
264 0 : LOG(info) << "start upload post to storage service [req_id=" << req_id << ", post_id=" << post.post_id << "]";
265 :
266 0 : TextMapReader reader(carrier);
267 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
268 0 : auto span = opentracing::Tracer::Global()->StartSpan(
269 0 : "store_post_client", {opentracing::ChildOf(parent_span->get())});
270 0 : std::map<std::string, std::string> writer_text_map;
271 0 : TextMapWriter writer(writer_text_map);
272 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
273 :
274 0 : auto post_storage_client_wrapper = _post_storage_client_pool->Pop();
275 0 : if (!post_storage_client_wrapper) {
276 0 : ServiceException se;
277 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
278 0 : se.message = "Failed to connect to post-storage-service";
279 0 : LOG(error) << se.message;
280 : ;
281 0 : throw se;
282 : }
283 0 : auto post_storage_client = post_storage_client_wrapper->GetClient();
284 : try {
285 0 : post_storage_client->StorePost(req_id, post, writer_text_map);
286 0 : LOG(info) << "post storage completed [req_id=" << req_id << ", post_id=" << post.post_id << "]";
287 0 : } catch (...) {
288 0 : _post_storage_client_pool->Remove(post_storage_client_wrapper);
289 0 : LOG(error) << "Failed to store post to post-storage-service";
290 0 : throw;
291 : }
292 0 : _post_storage_client_pool->Keepalive(post_storage_client_wrapper);
293 :
294 0 : span->Finish();
295 0 : LOG(info) << "finish upload post to storage service [req_id=" << req_id << "]";
296 0 : }
297 :
298 0 : void ComposePostHandler::_UploadUserTimelineHelper(
299 : int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
300 : const std::map<std::string, std::string> &carrier) {
301 0 : TextMapReader reader(carrier);
302 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
303 0 : auto span = opentracing::Tracer::Global()->StartSpan(
304 0 : "write_user_timeline_client", {opentracing::ChildOf(parent_span->get())});
305 0 : std::map<std::string, std::string> writer_text_map;
306 0 : TextMapWriter writer(writer_text_map);
307 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
308 :
309 0 : auto user_timeline_client_wrapper = _user_timeline_client_pool->Pop();
310 0 : if (!user_timeline_client_wrapper) {
311 0 : ServiceException se;
312 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
313 0 : se.message = "Failed to connect to user-timeline-service";
314 0 : LOG(error) << se.message;
315 : ;
316 0 : throw se;
317 : }
318 0 : auto user_timeline_client = user_timeline_client_wrapper->GetClient();
319 : try {
320 : user_timeline_client->WriteUserTimeline(req_id, post_id, user_id, timestamp,
321 0 : writer_text_map);
322 0 : } catch (...) {
323 0 : _user_timeline_client_pool->Remove(user_timeline_client_wrapper);
324 0 : throw;
325 : }
326 0 : _user_timeline_client_pool->Keepalive(user_timeline_client_wrapper);
327 :
328 0 : span->Finish();
329 0 : }
330 :
331 0 : void ComposePostHandler::_UploadHomeTimelineHelper(
332 : int64_t req_id, int64_t post_id, int64_t user_id, int64_t timestamp,
333 : const std::vector<int64_t> &user_mentions_id,
334 : const std::map<std::string, std::string> &carrier) {
335 0 : TextMapReader reader(carrier);
336 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
337 0 : auto span = opentracing::Tracer::Global()->StartSpan(
338 0 : "write_home_timeline_client", {opentracing::ChildOf(parent_span->get())});
339 0 : std::map<std::string, std::string> writer_text_map;
340 0 : TextMapWriter writer(writer_text_map);
341 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
342 :
343 0 : auto home_timeline_client_wrapper = _home_timeline_client_pool->Pop();
344 0 : if (!home_timeline_client_wrapper) {
345 0 : ServiceException se;
346 0 : se.errorCode = ErrorCode::SE_THRIFT_CONN_ERROR;
347 0 : se.message = "Failed to connect to home-timeline-service";
348 0 : LOG(error) << se.message;
349 : ;
350 0 : throw se;
351 : }
352 0 : auto home_timeline_client = home_timeline_client_wrapper->GetClient();
353 : try {
354 : home_timeline_client->WriteHomeTimeline(req_id, post_id, user_id, timestamp,
355 0 : user_mentions_id, writer_text_map);
356 0 : } catch (...) {
357 0 : _home_timeline_client_pool->Remove(home_timeline_client_wrapper);
358 0 : LOG(error) << "Failed to write home timeline to home-timeline-service";
359 0 : throw;
360 : }
361 0 : _home_timeline_client_pool->Keepalive(home_timeline_client_wrapper);
362 :
363 0 : span->Finish();
364 0 : }
365 :
366 200 : void ComposePostHandler::ComposePost(
367 : const int64_t req_id, const std::string &username, int64_t user_id,
368 : const std::string &text, const std::vector<int64_t> &media_ids,
369 : const std::vector<std::string> &media_types, const PostType::type post_type,
370 : const std::map<std::string, std::string> &carrier) {
371 :
372 400 : LOG(info) << "start process compose post request [req_id=" << req_id << ", user=" << username << "]";
373 :
374 400 : TextMapReader reader(carrier);
375 400 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
376 400 : auto span = opentracing::Tracer::Global()->StartSpan(
377 800 : "compose_post_server", {opentracing::ChildOf(parent_span->get())});
378 400 : std::map<std::string, std::string> writer_text_map;
379 400 : TextMapWriter writer(writer_text_map);
380 200 : opentracing::Tracer::Global()->Inject(span->context(), writer);
381 :
382 : auto text_future =
383 : std::async(std::launch::async, &ComposePostHandler::_ComposeTextHelper,
384 400 : this, req_id, text, writer_text_map);
385 : auto creator_future =
386 : std::async(std::launch::async, &ComposePostHandler::_ComposeCreaterHelper,
387 400 : this, req_id, user_id, username, writer_text_map);
388 : auto media_future =
389 : std::async(std::launch::async, &ComposePostHandler::_ComposeMediaHelper,
390 400 : this, req_id, media_types, media_ids, writer_text_map);
391 : auto unique_id_future = std::async(
392 : std::launch::async, &ComposePostHandler::_ComposeUniqueIdHelper, this,
393 400 : req_id, post_type, writer_text_map);
394 :
395 400 : Post post;
396 : auto timestamp =
397 400 : duration_cast<milliseconds>(system_clock::now().time_since_epoch())
398 200 : .count();
399 200 : post.timestamp = timestamp;
400 :
401 200 : post.post_id = unique_id_future.get();
402 400 : LOG(info) << "get unique id: " << post.post_id << " [req_id=" << req_id << "]";
403 :
404 200 : post.creator = creator_future.get();
405 400 : LOG(info) << "get creator info [req_id=" << req_id << "]";
406 :
407 200 : post.media = media_future.get();
408 0 : LOG(info) << "get media info, media count: " << post.media.size() << " [req_id=" << req_id << "]";
409 :
410 0 : auto text_return = text_future.get();
411 0 : post.text = text_return.text;
412 0 : post.urls = text_return.urls;
413 0 : post.user_mentions = text_return.user_mentions;
414 0 : LOG(info) << "get text info, text length: " << post.text.length()
415 0 : << ", user mention count: " << post.user_mentions.size()
416 0 : << " [req_id=" << req_id << "]";
417 :
418 0 : post.req_id = req_id;
419 0 : post.post_type = post_type;
420 :
421 0 : std::vector<int64_t> user_mention_ids;
422 0 : for (auto &item : post.user_mentions) {
423 0 : user_mention_ids.emplace_back(item.user_id);
424 : }
425 :
426 : //In mixed workloed condition, need to make sure _UploadPostHelper execute
427 : //Before _UploadUserTimelineHelper and _UploadHomeTimelineHelper.
428 : //Change _UploadUserTimelineHelper and _UploadHomeTimelineHelper to deferred.
429 : //To let them start execute after post_future.get() return.
430 : auto post_future =
431 : std::async(std::launch::async, &ComposePostHandler::_UploadPostHelper,
432 0 : this, req_id, post, writer_text_map);
433 : auto user_timeline_future = std::async(
434 : std::launch::deferred, &ComposePostHandler::_UploadUserTimelineHelper, this,
435 0 : req_id, post.post_id, user_id, timestamp, writer_text_map);
436 : auto home_timeline_future = std::async(
437 : std::launch::deferred, &ComposePostHandler::_UploadHomeTimelineHelper, this,
438 : req_id, post.post_id, user_id, timestamp, user_mention_ids,
439 0 : writer_text_map);
440 :
441 0 : post_future.get();
442 0 : LOG(info) << "post storage completed [req_id=" << req_id << ", post_id=" << post.post_id << "]";
443 :
444 0 : user_timeline_future.get();
445 0 : LOG(info) << "user timeline update completed [req_id=" << req_id << "]";
446 :
447 0 : home_timeline_future.get();
448 0 : LOG(info) << "home timeline update completed [req_id=" << req_id << "]";
449 :
450 0 : span->Finish();
451 0 : LOG(info) << "compose post request completed [req_id=" << req_id << ", post_id=" << post.post_id << "]";
452 0 : }
453 :
454 : } // namespace social_network
455 :
456 : #endif // SOCIAL_NETWORK_MICROSERVICES_SRC_COMPOSEPOSTSERVICE_COMPOSEPOSTHANDLER_H_
|