Line data Source code
1 : #ifndef SOCIAL_NETWORK_MICROSERVICES_POSTSTORAGEHANDLER_H
2 : #define SOCIAL_NETWORK_MICROSERVICES_POSTSTORAGEHANDLER_H
3 :
4 : #include <bson/bson.h>
5 : #include <libmemcached/memcached.h>
6 : #include <libmemcached/util.h>
7 : #include <mongoc.h>
8 :
9 : #include <future>
10 : #include <iostream>
11 : #include <nlohmann/json.hpp>
12 : #include <string>
13 :
14 : #include "../../gen-cpp/PostStorageService.h"
15 : #include "../logger.h"
16 : #include "../tracing.h"
17 :
18 : namespace social_network {
19 : using json = nlohmann::json;
20 :
21 : class PostStorageHandler : public PostStorageServiceIf {
22 : public:
23 : PostStorageHandler(memcached_pool_st *, mongoc_client_pool_t *);
24 0 : ~PostStorageHandler() override = default;
25 :
26 : void StorePost(int64_t req_id, const Post &post,
27 : const std::map<std::string, std::string> &carrier) override;
28 :
29 : void ReadPost(Post &_return, int64_t req_id, int64_t post_id,
30 : const std::map<std::string, std::string> &carrier) override;
31 :
32 : void ReadPosts(std::vector<Post> &_return, int64_t req_id,
33 : const std::vector<int64_t> &post_ids,
34 : const std::map<std::string, std::string> &carrier) override;
35 :
36 : private:
37 : memcached_pool_st *_memcached_client_pool;
38 : mongoc_client_pool_t *_mongodb_client_pool;
39 : };
40 :
41 1 : PostStorageHandler::PostStorageHandler(
42 : memcached_pool_st *memcached_client_pool,
43 1 : mongoc_client_pool_t *mongodb_client_pool) {
44 1 : _memcached_client_pool = memcached_client_pool;
45 1 : _mongodb_client_pool = mongodb_client_pool;
46 1 : }
47 :
48 0 : void PostStorageHandler::StorePost(
49 : int64_t req_id, const social_network::Post &post,
50 : const std::map<std::string, std::string> &carrier) {
51 : // 新增:记录收到的存储请求
52 0 : LOG(info) << "Received StorePost request [req_id=" << req_id << ", post_id=" << post.post_id << "]";
53 : // Initialize a span
54 0 : TextMapReader reader(carrier);
55 0 : std::map<std::string, std::string> writer_text_map;
56 0 : TextMapWriter writer(writer_text_map);
57 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
58 0 : auto span = opentracing::Tracer::Global()->StartSpan(
59 0 : "store_post_server", {opentracing::ChildOf(parent_span->get())});
60 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
61 :
62 : mongoc_client_t *mongodb_client =
63 0 : mongoc_client_pool_pop(_mongodb_client_pool);
64 0 : if (!mongodb_client) {
65 0 : ServiceException se;
66 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
67 0 : se.message = "Failed to pop a client from MongoDB pool";
68 0 : throw se;
69 : }
70 :
71 : auto collection =
72 0 : mongoc_client_get_collection(mongodb_client, "post", "post");
73 0 : if (!collection) {
74 0 : ServiceException se;
75 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
76 0 : se.message = "Failed to create collection user from DB user";
77 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
78 0 : throw se;
79 : }
80 :
81 0 : bson_t *new_doc = bson_new();
82 0 : BSON_APPEND_INT64(new_doc, "post_id", post.post_id);
83 0 : BSON_APPEND_INT64(new_doc, "timestamp", post.timestamp);
84 0 : BSON_APPEND_UTF8(new_doc, "text", post.text.c_str());
85 0 : BSON_APPEND_INT64(new_doc, "req_id", post.req_id);
86 0 : BSON_APPEND_INT32(new_doc, "post_type", post.post_type);
87 :
88 : bson_t creator_doc;
89 0 : BSON_APPEND_DOCUMENT_BEGIN(new_doc, "creator", &creator_doc);
90 0 : BSON_APPEND_INT64(&creator_doc, "user_id", post.creator.user_id);
91 0 : BSON_APPEND_UTF8(&creator_doc, "username", post.creator.username.c_str());
92 0 : bson_append_document_end(new_doc, &creator_doc);
93 :
94 : const char *key;
95 0 : int idx = 0;
96 : char buf[16];
97 :
98 : bson_t url_list;
99 0 : BSON_APPEND_ARRAY_BEGIN(new_doc, "urls", &url_list);
100 0 : for (auto &url : post.urls) {
101 0 : bson_uint32_to_string(idx, &key, buf, sizeof buf);
102 : bson_t url_doc;
103 0 : BSON_APPEND_DOCUMENT_BEGIN(&url_list, key, &url_doc);
104 0 : BSON_APPEND_UTF8(&url_doc, "shortened_url", url.shortened_url.c_str());
105 0 : BSON_APPEND_UTF8(&url_doc, "expanded_url", url.expanded_url.c_str());
106 0 : bson_append_document_end(&url_list, &url_doc);
107 0 : idx++;
108 : }
109 0 : bson_append_array_end(new_doc, &url_list);
110 :
111 : bson_t user_mention_list;
112 0 : idx = 0;
113 0 : BSON_APPEND_ARRAY_BEGIN(new_doc, "user_mentions", &user_mention_list);
114 0 : for (auto &user_mention : post.user_mentions) {
115 0 : bson_uint32_to_string(idx, &key, buf, sizeof buf);
116 : bson_t user_mention_doc;
117 0 : BSON_APPEND_DOCUMENT_BEGIN(&user_mention_list, key, &user_mention_doc);
118 0 : BSON_APPEND_INT64(&user_mention_doc, "user_id", user_mention.user_id);
119 0 : BSON_APPEND_UTF8(&user_mention_doc, "username",
120 0 : user_mention.username.c_str());
121 0 : bson_append_document_end(&user_mention_list, &user_mention_doc);
122 0 : idx++;
123 : }
124 0 : bson_append_array_end(new_doc, &user_mention_list);
125 :
126 : bson_t media_list;
127 0 : idx = 0;
128 0 : BSON_APPEND_ARRAY_BEGIN(new_doc, "media", &media_list);
129 0 : for (auto &media : post.media) {
130 0 : bson_uint32_to_string(idx, &key, buf, sizeof buf);
131 : bson_t media_doc;
132 0 : BSON_APPEND_DOCUMENT_BEGIN(&media_list, key, &media_doc);
133 0 : BSON_APPEND_INT64(&media_doc, "media_id", media.media_id);
134 0 : BSON_APPEND_UTF8(&media_doc, "media_type", media.media_type.c_str());
135 0 : bson_append_document_end(&media_list, &media_doc);
136 0 : idx++;
137 : }
138 0 : bson_append_array_end(new_doc, &media_list);
139 :
140 : bson_error_t error;
141 0 : auto insert_span = opentracing::Tracer::Global()->StartSpan(
142 : "post_storage_mongo_insert_client",
143 0 : {opentracing::ChildOf(&span->context())});
144 : bool inserted = mongoc_collection_insert_one(collection, new_doc, nullptr,
145 0 : nullptr, &error);
146 0 : insert_span->Finish();
147 :
148 0 : if (!inserted) {
149 0 : LOG(error) << "Error: Failed to insert post to MongoDB: " << error.message;
150 0 : ServiceException se;
151 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
152 0 : se.message = error.message;
153 0 : bson_destroy(new_doc);
154 0 : mongoc_collection_destroy(collection);
155 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
156 0 : throw se;
157 : }
158 :
159 0 : bson_destroy(new_doc);
160 0 : mongoc_collection_destroy(collection);
161 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
162 :
163 : // 新增:成功插入MongoDB日志
164 0 : LOG(info) << "Post inserted into MongoDB successfully [req_id=" << req_id << ", post_id=" << post.post_id << "]";
165 :
166 0 : span->Finish();
167 0 : }
168 :
169 0 : void PostStorageHandler::ReadPost(
170 : Post &_return, int64_t req_id, int64_t post_id,
171 : const std::map<std::string, std::string> &carrier) {
172 : // 新增:记录收到的读取请求
173 0 : LOG(info) << "Received ReadPost request [req_id=" << req_id << ", post_id=" << post_id << "]";
174 : // Initialize a span
175 0 : TextMapReader reader(carrier);
176 0 : std::map<std::string, std::string> writer_text_map;
177 0 : TextMapWriter writer(writer_text_map);
178 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
179 0 : auto span = opentracing::Tracer::Global()->StartSpan(
180 0 : "read_post_server", {opentracing::ChildOf(parent_span->get())});
181 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
182 :
183 0 : std::string post_id_str = std::to_string(post_id);
184 :
185 : memcached_return_t memcached_rc;
186 : memcached_st *memcached_client =
187 0 : memcached_pool_pop(_memcached_client_pool, true, &memcached_rc);
188 0 : if (!memcached_client) {
189 0 : ServiceException se;
190 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
191 0 : se.message = "Failed to pop a client from memcached pool";
192 0 : throw se;
193 : }
194 :
195 : size_t post_mmc_size;
196 : uint32_t memcached_flags;
197 0 : auto get_span = opentracing::Tracer::Global()->StartSpan(
198 0 : "post_storage_mmc_get_client", {opentracing::ChildOf(&span->context())});
199 : char *post_mmc =
200 0 : memcached_get(memcached_client, post_id_str.c_str(), post_id_str.length(),
201 0 : &post_mmc_size, &memcached_flags, &memcached_rc);
202 0 : if (!post_mmc && memcached_rc != MEMCACHED_NOTFOUND) {
203 0 : ServiceException se;
204 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
205 0 : se.message = memcached_strerror(memcached_client, memcached_rc);
206 0 : memcached_pool_push(_memcached_client_pool, memcached_client);
207 0 : throw se;
208 : }
209 0 : memcached_pool_push(_memcached_client_pool, memcached_client);
210 0 : get_span->Finish();
211 :
212 0 : if (post_mmc) {
213 0 : LOG(debug) << "Get post " << post_id << " cache hit from Memcached";
214 : json post_json =
215 0 : json::parse(std::string(post_mmc, post_mmc + post_mmc_size));
216 0 : _return.req_id = post_json["req_id"];
217 0 : _return.timestamp = post_json["timestamp"];
218 0 : _return.post_id = post_json["post_id"];
219 0 : _return.creator.user_id = post_json["creator"]["user_id"];
220 0 : _return.creator.username = post_json["creator"]["username"];
221 0 : _return.post_type = post_json["post_type"];
222 0 : _return.text = post_json["text"];
223 0 : for (auto &item : post_json["media"]) {
224 0 : Media media;
225 0 : media.media_id = item["media_id"];
226 0 : media.media_type = item["media_type"];
227 0 : _return.media.emplace_back(media);
228 : }
229 0 : for (auto &item : post_json["user_mentions"]) {
230 0 : UserMention user_mention;
231 0 : user_mention.username = item["username"];
232 0 : user_mention.user_id = item["user_id"];
233 0 : _return.user_mentions.emplace_back(user_mention);
234 : }
235 0 : for (auto &item : post_json["urls"]) {
236 0 : Url url;
237 0 : url.shortened_url = item["shortened_url"];
238 0 : url.expanded_url = item["expanded_url"];
239 0 : _return.urls.emplace_back(url);
240 : }
241 0 : free(post_mmc);
242 : } else {
243 : // If not cached in memcached
244 : mongoc_client_t *mongodb_client =
245 0 : mongoc_client_pool_pop(_mongodb_client_pool);
246 0 : if (!mongodb_client) {
247 0 : ServiceException se;
248 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
249 0 : se.message = "Failed to pop a client from MongoDB pool";
250 0 : throw se;
251 : }
252 :
253 : auto collection =
254 0 : mongoc_client_get_collection(mongodb_client, "post", "post");
255 0 : if (!collection) {
256 0 : ServiceException se;
257 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
258 0 : se.message = "Failed to create collection user from DB user";
259 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
260 0 : throw se;
261 : }
262 :
263 0 : bson_t *query = bson_new();
264 0 : BSON_APPEND_INT64(query, "post_id", post_id);
265 0 : auto find_span = opentracing::Tracer::Global()->StartSpan(
266 : "post_storage_mongo_find_client",
267 0 : {opentracing::ChildOf(&span->context())});
268 : mongoc_cursor_t *cursor =
269 0 : mongoc_collection_find_with_opts(collection, query, nullptr, nullptr);
270 : const bson_t *doc;
271 0 : bool found = mongoc_cursor_next(cursor, &doc);
272 0 : find_span->Finish();
273 0 : if (!found) {
274 : bson_error_t error;
275 0 : if (mongoc_cursor_error(cursor, &error)) {
276 0 : LOG(warning) << error.message;
277 0 : bson_destroy(query);
278 0 : mongoc_cursor_destroy(cursor);
279 0 : mongoc_collection_destroy(collection);
280 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
281 0 : ServiceException se;
282 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
283 0 : se.message = error.message;
284 0 : throw se;
285 : } else {
286 0 : LOG(warning) << "Post_id: " << post_id << " doesn't exist in MongoDB";
287 0 : bson_destroy(query);
288 0 : mongoc_cursor_destroy(cursor);
289 0 : mongoc_collection_destroy(collection);
290 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
291 0 : ServiceException se;
292 0 : se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
293 : se.message =
294 0 : "Post_id: " + std::to_string(post_id) + " doesn't exist in MongoDB";
295 0 : throw se;
296 : }
297 : } else {
298 0 : LOG(debug) << "Post_id: " << post_id << " found in MongoDB";
299 0 : auto post_json_char = bson_as_json(doc, nullptr);
300 0 : json post_json = json::parse(post_json_char);
301 0 : _return.req_id = post_json["req_id"];
302 0 : _return.timestamp = post_json["timestamp"];
303 0 : _return.post_id = post_json["post_id"];
304 0 : _return.creator.user_id = post_json["creator"]["user_id"];
305 0 : _return.creator.username = post_json["creator"]["username"];
306 0 : _return.post_type = post_json["post_type"];
307 0 : _return.text = post_json["text"];
308 0 : for (auto &item : post_json["media"]) {
309 0 : Media media;
310 0 : media.media_id = item["media_id"];
311 0 : media.media_type = item["media_type"];
312 0 : _return.media.emplace_back(media);
313 : }
314 0 : for (auto &item : post_json["user_mentions"]) {
315 0 : UserMention user_mention;
316 0 : user_mention.username = item["username"];
317 0 : user_mention.user_id = item["user_id"];
318 0 : _return.user_mentions.emplace_back(user_mention);
319 : }
320 0 : for (auto &item : post_json["urls"]) {
321 0 : Url url;
322 0 : url.shortened_url = item["shortened_url"];
323 0 : url.expanded_url = item["expanded_url"];
324 0 : _return.urls.emplace_back(url);
325 : }
326 0 : bson_destroy(query);
327 0 : mongoc_cursor_destroy(cursor);
328 0 : mongoc_collection_destroy(collection);
329 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
330 :
331 : // upload post to memcached
332 : memcached_client =
333 0 : memcached_pool_pop(_memcached_client_pool, true, &memcached_rc);
334 0 : if (!memcached_client) {
335 0 : ServiceException se;
336 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
337 0 : se.message = "Failed to pop a client from memcached pool";
338 0 : throw se;
339 : }
340 0 : auto set_span = opentracing::Tracer::Global()->StartSpan(
341 : "post_storage_mmc_set_client",
342 0 : {opentracing::ChildOf(&span->context())});
343 :
344 0 : memcached_rc = memcached_set(
345 : memcached_client, post_id_str.c_str(), post_id_str.length(),
346 : post_json_char, std::strlen(post_json_char), static_cast<time_t>(0),
347 0 : static_cast<uint32_t>(0));
348 0 : if (memcached_rc != MEMCACHED_SUCCESS) {
349 0 : LOG(warning) << "Failed to set post to Memcached: "
350 0 : << memcached_strerror(memcached_client, memcached_rc);
351 : }
352 0 : set_span->Finish();
353 0 : bson_free(post_json_char);
354 0 : memcached_pool_push(_memcached_client_pool, memcached_client);
355 : }
356 : }
357 :
358 : // 新增:读取完成日志
359 0 : LOG(info) << "ReadPost completed [req_id=" << req_id << ", post_id=" << post_id << "]";
360 0 : span->Finish();
361 0 : }
362 600 : void PostStorageHandler::ReadPosts(
363 : std::vector<Post> &_return, int64_t req_id,
364 : const std::vector<int64_t> &post_ids,
365 : const std::map<std::string, std::string> &carrier) {
366 : // 新增:记录收到的批量读取请求
367 1200 : LOG(info) << "Received ReadPosts request [req_id=" << req_id << ", post_ids count=" << post_ids.size() << "]";
368 : // Initialize a span
369 600 : TextMapReader reader(carrier);
370 600 : std::map<std::string, std::string> writer_text_map;
371 600 : TextMapWriter writer(writer_text_map);
372 600 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
373 1200 : auto span = opentracing::Tracer::Global()->StartSpan(
374 : "post_storage_read_posts_server",
375 1800 : {opentracing::ChildOf(parent_span->get())});
376 600 : opentracing::Tracer::Global()->Inject(span->context(), writer);
377 :
378 600 : if (post_ids.empty()) {
379 600 : return;
380 : }
381 :
382 0 : std::set<int64_t> post_ids_not_cached(post_ids.begin(), post_ids.end());
383 0 : if (post_ids_not_cached.size() != post_ids.size()) {
384 0 : LOG(error)<< "Post_ids are duplicated";
385 0 : ServiceException se;
386 0 : se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
387 0 : se.message = "Post_ids are duplicated";
388 0 : throw se;
389 : }
390 0 : std::map<int64_t, Post> return_map;
391 : memcached_return_t memcached_rc;
392 : auto memcached_client =
393 0 : memcached_pool_pop(_memcached_client_pool, true, &memcached_rc);
394 0 : if (!memcached_client) {
395 0 : ServiceException se;
396 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
397 0 : se.message = "Failed to pop a client from memcached pool";
398 0 : throw se;
399 : }
400 :
401 : char **keys;
402 : size_t *key_sizes;
403 0 : keys = new char *[post_ids.size()];
404 0 : key_sizes = new size_t[post_ids.size()];
405 0 : int idx = 0;
406 0 : for (auto &post_id : post_ids) {
407 0 : std::string key_str = std::to_string(post_id);
408 0 : keys[idx] = new char[key_str.length() + 1];
409 0 : strcpy(keys[idx], key_str.c_str());
410 0 : key_sizes[idx] = key_str.length();
411 0 : idx++;
412 : }
413 : memcached_rc =
414 0 : memcached_mget(memcached_client, keys, key_sizes, post_ids.size());
415 0 : if (memcached_rc != MEMCACHED_SUCCESS) {
416 0 : LOG(error) << "Cannot get post_ids of request " << req_id << ": "
417 0 : << memcached_strerror(memcached_client, memcached_rc);
418 0 : ServiceException se;
419 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
420 0 : se.message = memcached_strerror(memcached_client, memcached_rc);
421 0 : memcached_pool_push(_memcached_client_pool, memcached_client);
422 0 : throw se;
423 : }
424 :
425 : char return_key[MEMCACHED_MAX_KEY];
426 : size_t return_key_length;
427 : char *return_value;
428 : size_t return_value_length;
429 : uint32_t flags;
430 0 : auto get_span = opentracing::Tracer::Global()->StartSpan(
431 0 : "post_storage_mmc_mget_client", {opentracing::ChildOf(&span->context())});
432 :
433 0 : while (true) {
434 : return_value =
435 : memcached_fetch(memcached_client, return_key, &return_key_length,
436 0 : &return_value_length, &flags, &memcached_rc);
437 0 : if (return_value == nullptr) {
438 0 : LOG(debug) << "Memcached mget finished";
439 0 : break;
440 : }
441 0 : if (memcached_rc != MEMCACHED_SUCCESS) {
442 0 : free(return_value);
443 0 : memcached_quit(memcached_client);
444 0 : memcached_pool_push(_memcached_client_pool, memcached_client);
445 0 : LOG(error) << "Cannot get posts of request " << req_id;
446 0 : ServiceException se;
447 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
448 0 : se.message = "Cannot get posts of request " + std::to_string(req_id);
449 0 : throw se;
450 : }
451 0 : Post new_post;
452 : json post_json = json::parse(
453 0 : std::string(return_value, return_value + return_value_length));
454 0 : new_post.req_id = post_json["req_id"];
455 0 : new_post.timestamp = post_json["timestamp"];
456 0 : new_post.post_id = post_json["post_id"];
457 0 : new_post.creator.user_id = post_json["creator"]["user_id"];
458 0 : new_post.creator.username = post_json["creator"]["username"];
459 0 : new_post.post_type = post_json["post_type"];
460 0 : new_post.text = post_json["text"];
461 0 : for (auto &item : post_json["media"]) {
462 0 : Media media;
463 0 : media.media_id = item["media_id"];
464 0 : media.media_type = item["media_type"];
465 0 : new_post.media.emplace_back(media);
466 : }
467 0 : for (auto &item : post_json["user_mentions"]) {
468 0 : UserMention user_mention;
469 0 : user_mention.username = item["username"];
470 0 : user_mention.user_id = item["user_id"];
471 0 : new_post.user_mentions.emplace_back(user_mention);
472 : }
473 0 : for (auto &item : post_json["urls"]) {
474 0 : Url url;
475 0 : url.shortened_url = item["shortened_url"];
476 0 : url.expanded_url = item["expanded_url"];
477 0 : new_post.urls.emplace_back(url);
478 : }
479 0 : return_map.insert(std::make_pair(new_post.post_id, new_post));
480 0 : post_ids_not_cached.erase(new_post.post_id);
481 0 : free(return_value);
482 : }
483 0 : get_span->Finish();
484 0 : memcached_quit(memcached_client);
485 0 : memcached_pool_push(_memcached_client_pool, memcached_client);
486 0 : for (int i = 0; i < post_ids.size(); ++i) {
487 0 : delete keys[i];
488 : }
489 0 : delete[] keys;
490 0 : delete[] key_sizes;
491 :
492 0 : std::vector<std::future<void>> set_futures;
493 0 : std::map<int64_t, std::string> post_json_map;
494 :
495 : // Find the rest in MongoDB
496 0 : if (!post_ids_not_cached.empty()) {
497 : mongoc_client_t *mongodb_client =
498 0 : mongoc_client_pool_pop(_mongodb_client_pool);
499 0 : if (!mongodb_client) {
500 0 : ServiceException se;
501 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
502 0 : se.message = "Failed to pop a client from MongoDB pool";
503 0 : throw se;
504 : }
505 : auto collection =
506 0 : mongoc_client_get_collection(mongodb_client, "post", "post");
507 0 : if (!collection) {
508 0 : ServiceException se;
509 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
510 0 : se.message = "Failed to create collection user from DB user";
511 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
512 0 : throw se;
513 : }
514 0 : bson_t *query = bson_new();
515 : bson_t query_child;
516 : bson_t query_post_id_list;
517 : const char *key;
518 0 : idx = 0;
519 : char buf[16];
520 :
521 0 : BSON_APPEND_DOCUMENT_BEGIN(query, "post_id", &query_child);
522 0 : BSON_APPEND_ARRAY_BEGIN(&query_child, "$in", &query_post_id_list);
523 0 : for (auto &item : post_ids_not_cached) {
524 0 : bson_uint32_to_string(idx, &key, buf, sizeof buf);
525 0 : BSON_APPEND_INT64(&query_post_id_list, key, item);
526 0 : idx++;
527 : }
528 0 : bson_append_array_end(&query_child, &query_post_id_list);
529 0 : bson_append_document_end(query, &query_child);
530 : mongoc_cursor_t *cursor =
531 0 : mongoc_collection_find_with_opts(collection, query, nullptr, nullptr);
532 : const bson_t *doc;
533 :
534 0 : auto find_span = opentracing::Tracer::Global()->StartSpan(
535 0 : "mongo_find_client", {opentracing::ChildOf(&span->context())});
536 0 : while (true) {
537 0 : bool found = mongoc_cursor_next(cursor, &doc);
538 0 : if (!found) {
539 0 : break;
540 : }
541 0 : Post new_post;
542 0 : char *post_json_char = bson_as_json(doc, nullptr);
543 0 : json post_json = json::parse(post_json_char);
544 0 : new_post.req_id = post_json["req_id"];
545 0 : new_post.timestamp = post_json["timestamp"];
546 0 : new_post.post_id = post_json["post_id"];
547 0 : new_post.creator.user_id = post_json["creator"]["user_id"];
548 0 : new_post.creator.username = post_json["creator"]["username"];
549 0 : new_post.post_type = post_json["post_type"];
550 0 : new_post.text = post_json["text"];
551 0 : for (auto &item : post_json["media"]) {
552 0 : Media media;
553 0 : media.media_id = item["media_id"];
554 0 : media.media_type = item["media_type"];
555 0 : new_post.media.emplace_back(media);
556 : }
557 0 : for (auto &item : post_json["user_mentions"]) {
558 0 : UserMention user_mention;
559 0 : user_mention.username = item["username"];
560 0 : user_mention.user_id = item["user_id"];
561 0 : new_post.user_mentions.emplace_back(user_mention);
562 : }
563 0 : for (auto &item : post_json["urls"]) {
564 0 : Url url;
565 0 : url.shortened_url = item["shortened_url"];
566 0 : url.expanded_url = item["expanded_url"];
567 0 : new_post.urls.emplace_back(url);
568 : }
569 0 : post_json_map.insert({new_post.post_id, std::string(post_json_char)});
570 0 : return_map.insert({new_post.post_id, new_post});
571 0 : bson_free(post_json_char);
572 : }
573 0 : find_span->Finish();
574 : bson_error_t error;
575 0 : if (mongoc_cursor_error(cursor, &error)) {
576 0 : LOG(warning) << error.message;
577 0 : bson_destroy(query);
578 0 : mongoc_cursor_destroy(cursor);
579 0 : mongoc_collection_destroy(collection);
580 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
581 0 : ServiceException se;
582 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
583 0 : se.message = error.message;
584 0 : throw se;
585 : }
586 0 : bson_destroy(query);
587 0 : mongoc_cursor_destroy(cursor);
588 0 : mongoc_collection_destroy(collection);
589 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
590 :
591 : // upload posts to memcached
592 0 : set_futures.emplace_back(std::async(std::launch::async, [&]() {
593 : memcached_return_t _rc;
594 : auto _memcached_client =
595 0 : memcached_pool_pop(_memcached_client_pool, true, &_rc);
596 0 : if (!_memcached_client) {
597 0 : LOG(error) << "Failed to pop a client from memcached pool";
598 0 : ServiceException se;
599 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
600 0 : se.message = "Failed to pop a client from memcached pool";
601 0 : throw se;
602 : }
603 0 : auto set_span = opentracing::Tracer::Global()->StartSpan(
604 0 : "mmc_set_client", {opentracing::ChildOf(&span->context())});
605 0 : for (auto &it : post_json_map) {
606 0 : std::string id_str = std::to_string(it.first);
607 0 : _rc = memcached_set(_memcached_client, id_str.c_str(), id_str.length(),
608 : it.second.c_str(), it.second.length(),
609 0 : static_cast<time_t>(0), static_cast<uint32_t>(0));
610 : }
611 0 : memcached_pool_push(_memcached_client_pool, _memcached_client);
612 0 : set_span->Finish();
613 0 : }));
614 : }
615 :
616 0 : if (return_map.size() != post_ids.size()) {
617 : try {
618 0 : for (auto &it : set_futures) {
619 0 : it.get();
620 : }
621 0 : } catch (...) {
622 0 : LOG(warning) << "Failed to set posts to memcached";
623 : }
624 0 : LOG(error) << "Return set incomplete";
625 0 : ServiceException se;
626 0 : se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
627 0 : se.message = "Return set incomplete";
628 0 : throw se;
629 : }
630 :
631 0 : for (auto &post_id : post_ids) {
632 0 : _return.emplace_back(return_map[post_id]);
633 : }
634 :
635 : try {
636 0 : for (auto &it : set_futures) {
637 0 : it.get();
638 : }
639 0 : } catch (...) {
640 0 : LOG(warning) << "Failed to set posts to memcached";
641 : }
642 :
643 : // 新增:批量读取完成日志
644 0 : LOG(info) << "ReadPosts completed [req_id=" << req_id << ", total return count=" << _return.size() << "]";
645 : }
646 :
647 : } // namespace social_network
648 :
649 : #endif // SOCIAL_NETWORK_MICROSERVICES_POSTSTORAGEHANDLER_H
|