LCOV - code coverage report
Current view: top level - src/PostStorageService - PostStorageHandler.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 62 433 14.3 %
Date: 2025-11-03 23:12:19 Functions: 3 7 42.9 %

          Line data    Source code
       1             : #ifndef SOCIAL_NETWORK_MICROSERVICES_POSTSTORAGEHANDLER_H
       2             : #define SOCIAL_NETWORK_MICROSERVICES_POSTSTORAGEHANDLER_H
       3             : 
       4             : #include <bson/bson.h>
       5             : #include <libmemcached/memcached.h>
       6             : #include <libmemcached/util.h>
       7             : #include <mongoc.h>
       8             : 
       9             : #include <future>
      10             : #include <iostream>
      11             : #include <nlohmann/json.hpp>
      12             : #include <string>
      13             : 
      14             : #include "../../gen-cpp/PostStorageService.h"
      15             : #include "../logger.h"
      16             : #include "../tracing.h"
      17             : 
      18             : namespace social_network {
      19             : using json = nlohmann::json;
      20             : 
      21             : class PostStorageHandler : public PostStorageServiceIf {
      22             :  public:
      23             :   PostStorageHandler(memcached_pool_st *, mongoc_client_pool_t *);
      24           0 :   ~PostStorageHandler() override = default;
      25             : 
      26             :   void StorePost(int64_t req_id, const Post &post,
      27             :                  const std::map<std::string, std::string> &carrier) override;
      28             : 
      29             :   void ReadPost(Post &_return, int64_t req_id, int64_t post_id,
      30             :                 const std::map<std::string, std::string> &carrier) override;
      31             : 
      32             :   void ReadPosts(std::vector<Post> &_return, int64_t req_id,
      33             :                  const std::vector<int64_t> &post_ids,
      34             :                  const std::map<std::string, std::string> &carrier) override;
      35             : 
      36             :  private:
      37             :   memcached_pool_st *_memcached_client_pool;
      38             :   mongoc_client_pool_t *_mongodb_client_pool;
      39             : };
      40             : 
      41           1 : PostStorageHandler::PostStorageHandler(
      42             :     memcached_pool_st *memcached_client_pool,
      43           1 :     mongoc_client_pool_t *mongodb_client_pool) {
      44           1 :   _memcached_client_pool = memcached_client_pool;
      45           1 :   _mongodb_client_pool = mongodb_client_pool;
      46           1 : }
      47             : 
      48         200 : void PostStorageHandler::StorePost(
      49             :     int64_t req_id, const social_network::Post &post,
      50             :     const std::map<std::string, std::string> &carrier) {
      51             :   // 新增:记录收到的存储请求
      52         400 :   LOG(info) << "Received StorePost request [req_id=" << req_id << ", post_id=" << post.post_id << "]";
      53             :   // Initialize a span
      54         400 :   TextMapReader reader(carrier);
      55         400 :   std::map<std::string, std::string> writer_text_map;
      56         400 :   TextMapWriter writer(writer_text_map);
      57         400 :   auto parent_span = opentracing::Tracer::Global()->Extract(reader);
      58         400 :   auto span = opentracing::Tracer::Global()->StartSpan(
      59         800 :       "store_post_server", {opentracing::ChildOf(parent_span->get())});
      60         200 :   opentracing::Tracer::Global()->Inject(span->context(), writer);
      61             : 
      62             :   mongoc_client_t *mongodb_client =
      63         200 :       mongoc_client_pool_pop(_mongodb_client_pool);
      64         200 :   if (!mongodb_client) {
      65           0 :     ServiceException se;
      66           0 :     se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      67           0 :     se.message = "Failed to pop a client from MongoDB pool";
      68           0 :     throw se;
      69             :   }
      70             : 
      71             :   auto collection =
      72         200 :       mongoc_client_get_collection(mongodb_client, "post", "post");
      73         200 :   if (!collection) {
      74           0 :     ServiceException se;
      75           0 :     se.errorCode = ErrorCode::SE_MONGODB_ERROR;
      76           0 :     se.message = "Failed to create collection user from DB user";
      77           0 :     mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
      78           0 :     throw se;
      79             :   }
      80             : 
      81         200 :   bson_t *new_doc = bson_new();
      82         200 :   BSON_APPEND_INT64(new_doc, "post_id", post.post_id);
      83         200 :   BSON_APPEND_INT64(new_doc, "timestamp", post.timestamp);
      84         200 :   BSON_APPEND_UTF8(new_doc, "text", post.text.c_str());
      85         200 :   BSON_APPEND_INT64(new_doc, "req_id", post.req_id);
      86         200 :   BSON_APPEND_INT32(new_doc, "post_type", post.post_type);
      87             : 
      88             :   bson_t creator_doc;
      89         200 :   BSON_APPEND_DOCUMENT_BEGIN(new_doc, "creator", &creator_doc);
      90         200 :   BSON_APPEND_INT64(&creator_doc, "user_id", post.creator.user_id);
      91         200 :   BSON_APPEND_UTF8(&creator_doc, "username", post.creator.username.c_str());
      92         200 :   bson_append_document_end(new_doc, &creator_doc);
      93             : 
      94             :   const char *key;
      95         200 :   int idx = 0;
      96             :   char buf[16];
      97             : 
      98             :   bson_t url_list;
      99         200 :   BSON_APPEND_ARRAY_BEGIN(new_doc, "urls", &url_list);
     100         200 :   for (auto &url : post.urls) {
     101           0 :     bson_uint32_to_string(idx, &key, buf, sizeof buf);
     102             :     bson_t url_doc;
     103           0 :     BSON_APPEND_DOCUMENT_BEGIN(&url_list, key, &url_doc);
     104           0 :     BSON_APPEND_UTF8(&url_doc, "shortened_url", url.shortened_url.c_str());
     105           0 :     BSON_APPEND_UTF8(&url_doc, "expanded_url", url.expanded_url.c_str());
     106           0 :     bson_append_document_end(&url_list, &url_doc);
     107           0 :     idx++;
     108             :   }
     109         200 :   bson_append_array_end(new_doc, &url_list);
     110             : 
     111             :   bson_t user_mention_list;
     112         200 :   idx = 0;
     113         200 :   BSON_APPEND_ARRAY_BEGIN(new_doc, "user_mentions", &user_mention_list);
     114         200 :   for (auto &user_mention : post.user_mentions) {
     115           0 :     bson_uint32_to_string(idx, &key, buf, sizeof buf);
     116             :     bson_t user_mention_doc;
     117           0 :     BSON_APPEND_DOCUMENT_BEGIN(&user_mention_list, key, &user_mention_doc);
     118           0 :     BSON_APPEND_INT64(&user_mention_doc, "user_id", user_mention.user_id);
     119           0 :     BSON_APPEND_UTF8(&user_mention_doc, "username",
     120           0 :                      user_mention.username.c_str());
     121           0 :     bson_append_document_end(&user_mention_list, &user_mention_doc);
     122           0 :     idx++;
     123             :   }
     124         200 :   bson_append_array_end(new_doc, &user_mention_list);
     125             : 
     126             :   bson_t media_list;
     127         200 :   idx = 0;
     128         200 :   BSON_APPEND_ARRAY_BEGIN(new_doc, "media", &media_list);
     129         200 :   for (auto &media : post.media) {
     130           0 :     bson_uint32_to_string(idx, &key, buf, sizeof buf);
     131             :     bson_t media_doc;
     132           0 :     BSON_APPEND_DOCUMENT_BEGIN(&media_list, key, &media_doc);
     133           0 :     BSON_APPEND_INT64(&media_doc, "media_id", media.media_id);
     134           0 :     BSON_APPEND_UTF8(&media_doc, "media_type", media.media_type.c_str());
     135           0 :     bson_append_document_end(&media_list, &media_doc);
     136           0 :     idx++;
     137             :   }
     138         200 :   bson_append_array_end(new_doc, &media_list);
     139             : 
     140             :   bson_error_t error;
     141         400 :   auto insert_span = opentracing::Tracer::Global()->StartSpan(
     142             :       "post_storage_mongo_insert_client",
     143         800 :       {opentracing::ChildOf(&span->context())});
     144             :   bool inserted = mongoc_collection_insert_one(collection, new_doc, nullptr,
     145         200 :                                                nullptr, &error);
     146         200 :   insert_span->Finish();
     147             : 
     148         200 :   if (!inserted) {
     149           0 :     LOG(error) << "Error: Failed to insert post to MongoDB: " << error.message;
     150           0 :     ServiceException se;
     151           0 :     se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     152           0 :     se.message = error.message;
     153           0 :     bson_destroy(new_doc);
     154           0 :     mongoc_collection_destroy(collection);
     155           0 :     mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     156           0 :     throw se;
     157             :   }
     158             : 
     159         200 :   bson_destroy(new_doc);
     160         200 :   mongoc_collection_destroy(collection);
     161         200 :   mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     162             : 
     163             :   // 新增:成功插入MongoDB日志
     164         400 :   LOG(info) << "Post inserted into MongoDB successfully [req_id=" << req_id << ", post_id=" << post.post_id << "]";
     165             : 
     166         200 :   span->Finish();
     167         200 : }
     168             : 
     169           0 : void PostStorageHandler::ReadPost(
     170             :     Post &_return, int64_t req_id, int64_t post_id,
     171             :     const std::map<std::string, std::string> &carrier) {
     172             :   // 新增:记录收到的读取请求
     173           0 :   LOG(info) << "Received ReadPost request [req_id=" << req_id << ", post_id=" << post_id << "]";
     174             :   // Initialize a span
     175           0 :   TextMapReader reader(carrier);
     176           0 :   std::map<std::string, std::string> writer_text_map;
     177           0 :   TextMapWriter writer(writer_text_map);
     178           0 :   auto parent_span = opentracing::Tracer::Global()->Extract(reader);
     179           0 :   auto span = opentracing::Tracer::Global()->StartSpan(
     180           0 :       "read_post_server", {opentracing::ChildOf(parent_span->get())});
     181           0 :   opentracing::Tracer::Global()->Inject(span->context(), writer);
     182             : 
     183           0 :   std::string post_id_str = std::to_string(post_id);
     184             : 
     185             :   memcached_return_t memcached_rc;
     186             :   memcached_st *memcached_client =
     187           0 :       memcached_pool_pop(_memcached_client_pool, true, &memcached_rc);
     188           0 :   if (!memcached_client) {
     189           0 :     ServiceException se;
     190           0 :     se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
     191           0 :     se.message = "Failed to pop a client from memcached pool";
     192           0 :     throw se;
     193             :   }
     194             : 
     195             :   size_t post_mmc_size;
     196             :   uint32_t memcached_flags;
     197           0 :   auto get_span = opentracing::Tracer::Global()->StartSpan(
     198           0 :       "post_storage_mmc_get_client", {opentracing::ChildOf(&span->context())});
     199             :   char *post_mmc =
     200           0 :       memcached_get(memcached_client, post_id_str.c_str(), post_id_str.length(),
     201           0 :                     &post_mmc_size, &memcached_flags, &memcached_rc);
     202           0 :   if (!post_mmc && memcached_rc != MEMCACHED_NOTFOUND) {
     203           0 :     ServiceException se;
     204           0 :     se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
     205           0 :     se.message = memcached_strerror(memcached_client, memcached_rc);
     206           0 :     memcached_pool_push(_memcached_client_pool, memcached_client);
     207           0 :     throw se;
     208             :   }
     209           0 :   memcached_pool_push(_memcached_client_pool, memcached_client);
     210           0 :   get_span->Finish();
     211             : 
     212           0 :   if (post_mmc) {
     213           0 :     LOG(debug) << "Get post " << post_id << " cache hit from Memcached";
     214             :     json post_json =
     215           0 :         json::parse(std::string(post_mmc, post_mmc + post_mmc_size));
     216           0 :     _return.req_id = post_json["req_id"];
     217           0 :     _return.timestamp = post_json["timestamp"];
     218           0 :     _return.post_id = post_json["post_id"];
     219           0 :     _return.creator.user_id = post_json["creator"]["user_id"];
     220           0 :     _return.creator.username = post_json["creator"]["username"];
     221           0 :     _return.post_type = post_json["post_type"];
     222           0 :     _return.text = post_json["text"];
     223           0 :     for (auto &item : post_json["media"]) {
     224           0 :       Media media;
     225           0 :       media.media_id = item["media_id"];
     226           0 :       media.media_type = item["media_type"];
     227           0 :       _return.media.emplace_back(media);
     228             :     }
     229           0 :     for (auto &item : post_json["user_mentions"]) {
     230           0 :       UserMention user_mention;
     231           0 :       user_mention.username = item["username"];
     232           0 :       user_mention.user_id = item["user_id"];
     233           0 :       _return.user_mentions.emplace_back(user_mention);
     234             :     }
     235           0 :     for (auto &item : post_json["urls"]) {
     236           0 :       Url url;
     237           0 :       url.shortened_url = item["shortened_url"];
     238           0 :       url.expanded_url = item["expanded_url"];
     239           0 :       _return.urls.emplace_back(url);
     240             :     }
     241           0 :     free(post_mmc);
     242             :   } else {
     243             :     // If not cached in memcached
     244             :     mongoc_client_t *mongodb_client =
     245           0 :         mongoc_client_pool_pop(_mongodb_client_pool);
     246           0 :     if (!mongodb_client) {
     247           0 :       ServiceException se;
     248           0 :       se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     249           0 :       se.message = "Failed to pop a client from MongoDB pool";
     250           0 :       throw se;
     251             :     }
     252             : 
     253             :     auto collection =
     254           0 :         mongoc_client_get_collection(mongodb_client, "post", "post");
     255           0 :     if (!collection) {
     256           0 :       ServiceException se;
     257           0 :       se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     258           0 :       se.message = "Failed to create collection user from DB user";
     259           0 :       mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     260           0 :       throw se;
     261             :     }
     262             : 
     263           0 :     bson_t *query = bson_new();
     264           0 :     BSON_APPEND_INT64(query, "post_id", post_id);
     265           0 :     auto find_span = opentracing::Tracer::Global()->StartSpan(
     266             :         "post_storage_mongo_find_client",
     267           0 :         {opentracing::ChildOf(&span->context())});
     268             :     mongoc_cursor_t *cursor =
     269           0 :         mongoc_collection_find_with_opts(collection, query, nullptr, nullptr);
     270             :     const bson_t *doc;
     271           0 :     bool found = mongoc_cursor_next(cursor, &doc);
     272           0 :     find_span->Finish();
     273           0 :     if (!found) {
     274             :       bson_error_t error;
     275           0 :       if (mongoc_cursor_error(cursor, &error)) {
     276           0 :         LOG(warning) << error.message;
     277           0 :         bson_destroy(query);
     278           0 :         mongoc_cursor_destroy(cursor);
     279           0 :         mongoc_collection_destroy(collection);
     280           0 :         mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     281           0 :         ServiceException se;
     282           0 :         se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     283           0 :         se.message = error.message;
     284           0 :         throw se;
     285             :       } else {
     286           0 :         LOG(warning) << "Post_id: " << post_id << " doesn't exist in MongoDB";
     287           0 :         bson_destroy(query);
     288           0 :         mongoc_cursor_destroy(cursor);
     289           0 :         mongoc_collection_destroy(collection);
     290           0 :         mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     291           0 :         ServiceException se;
     292           0 :         se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
     293             :         se.message =
     294           0 :             "Post_id: " + std::to_string(post_id) + " doesn't exist in MongoDB";
     295           0 :         throw se;
     296             :       }
     297             :     } else {
     298           0 :       LOG(debug) << "Post_id: " << post_id << " found in MongoDB";
     299           0 :       auto post_json_char = bson_as_json(doc, nullptr);
     300           0 :       json post_json = json::parse(post_json_char);
     301           0 :       _return.req_id = post_json["req_id"];
     302           0 :       _return.timestamp = post_json["timestamp"];
     303           0 :       _return.post_id = post_json["post_id"];
     304           0 :       _return.creator.user_id = post_json["creator"]["user_id"];
     305           0 :       _return.creator.username = post_json["creator"]["username"];
     306           0 :       _return.post_type = post_json["post_type"];
     307           0 :       _return.text = post_json["text"];
     308           0 :       for (auto &item : post_json["media"]) {
     309           0 :         Media media;
     310           0 :         media.media_id = item["media_id"];
     311           0 :         media.media_type = item["media_type"];
     312           0 :         _return.media.emplace_back(media);
     313             :       }
     314           0 :       for (auto &item : post_json["user_mentions"]) {
     315           0 :         UserMention user_mention;
     316           0 :         user_mention.username = item["username"];
     317           0 :         user_mention.user_id = item["user_id"];
     318           0 :         _return.user_mentions.emplace_back(user_mention);
     319             :       }
     320           0 :       for (auto &item : post_json["urls"]) {
     321           0 :         Url url;
     322           0 :         url.shortened_url = item["shortened_url"];
     323           0 :         url.expanded_url = item["expanded_url"];
     324           0 :         _return.urls.emplace_back(url);
     325             :       }
     326           0 :       bson_destroy(query);
     327           0 :       mongoc_cursor_destroy(cursor);
     328           0 :       mongoc_collection_destroy(collection);
     329           0 :       mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     330             : 
     331             :       // upload post to memcached
     332             :       memcached_client =
     333           0 :           memcached_pool_pop(_memcached_client_pool, true, &memcached_rc);
     334           0 :       if (!memcached_client) {
     335           0 :         ServiceException se;
     336           0 :         se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
     337           0 :         se.message = "Failed to pop a client from memcached pool";
     338           0 :         throw se;
     339             :       }
     340           0 :       auto set_span = opentracing::Tracer::Global()->StartSpan(
     341             :           "post_storage_mmc_set_client",
     342           0 :           {opentracing::ChildOf(&span->context())});
     343             : 
     344           0 :       memcached_rc = memcached_set(
     345             :           memcached_client, post_id_str.c_str(), post_id_str.length(),
     346             :           post_json_char, std::strlen(post_json_char), static_cast<time_t>(0),
     347           0 :           static_cast<uint32_t>(0));
     348           0 :       if (memcached_rc != MEMCACHED_SUCCESS) {
     349           0 :         LOG(warning) << "Failed to set post to Memcached: "
     350           0 :                      << memcached_strerror(memcached_client, memcached_rc);
     351             :       }
     352           0 :       set_span->Finish();
     353           0 :       bson_free(post_json_char);
     354           0 :       memcached_pool_push(_memcached_client_pool, memcached_client);
     355             :     }
     356             :   }
     357             : 
     358             :   // 新增:读取完成日志
     359           0 :   LOG(info) << "ReadPost completed [req_id=" << req_id << ", post_id=" << post_id << "]";
     360           0 :   span->Finish();
     361           0 : }
     362         600 : void PostStorageHandler::ReadPosts(
     363             :     std::vector<Post> &_return, int64_t req_id,
     364             :     const std::vector<int64_t> &post_ids,
     365             :     const std::map<std::string, std::string> &carrier) {
     366             :   // 新增:记录收到的批量读取请求
     367        1200 :   LOG(info) << "Received ReadPosts request [req_id=" << req_id << ", post_ids count=" << post_ids.size() << "]";
     368             :   // Initialize a span
     369         600 :   TextMapReader reader(carrier);
     370         600 :   std::map<std::string, std::string> writer_text_map;
     371         600 :   TextMapWriter writer(writer_text_map);
     372         600 :   auto parent_span = opentracing::Tracer::Global()->Extract(reader);
     373        1200 :   auto span = opentracing::Tracer::Global()->StartSpan(
     374             :       "post_storage_read_posts_server",
     375        1800 :       {opentracing::ChildOf(parent_span->get())});
     376         600 :   opentracing::Tracer::Global()->Inject(span->context(), writer);
     377             : 
     378         600 :   if (post_ids.empty()) {
     379         600 :     return;
     380             :   }
     381             : 
     382           0 :   std::set<int64_t> post_ids_not_cached(post_ids.begin(), post_ids.end());
     383           0 :   if (post_ids_not_cached.size() != post_ids.size()) {
     384           0 :     LOG(error)<< "Post_ids are duplicated";
     385           0 :     ServiceException se;
     386           0 :     se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
     387           0 :     se.message = "Post_ids are duplicated";
     388           0 :     throw se;
     389             :   }
     390           0 :   std::map<int64_t, Post> return_map;
     391             :   memcached_return_t memcached_rc;
     392             :   auto memcached_client =
     393           0 :       memcached_pool_pop(_memcached_client_pool, true, &memcached_rc);
     394           0 :   if (!memcached_client) {
     395           0 :     ServiceException se;
     396           0 :     se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
     397           0 :     se.message = "Failed to pop a client from memcached pool";
     398           0 :     throw se;
     399             :   }
     400             : 
     401             :   char **keys;
     402             :   size_t *key_sizes;
     403           0 :   keys = new char *[post_ids.size()];
     404           0 :   key_sizes = new size_t[post_ids.size()];
     405           0 :   int idx = 0;
     406           0 :   for (auto &post_id : post_ids) {
     407           0 :     std::string key_str = std::to_string(post_id);
     408           0 :     keys[idx] = new char[key_str.length() + 1];
     409           0 :     strcpy(keys[idx], key_str.c_str());
     410           0 :     key_sizes[idx] = key_str.length();
     411           0 :     idx++;
     412             :   }
     413             :   memcached_rc =
     414           0 :       memcached_mget(memcached_client, keys, key_sizes, post_ids.size());
     415           0 :   if (memcached_rc != MEMCACHED_SUCCESS) {
     416           0 :     LOG(error) << "Cannot get post_ids of request " << req_id << ": "
     417           0 :                << memcached_strerror(memcached_client, memcached_rc);
     418           0 :     ServiceException se;
     419           0 :     se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
     420           0 :     se.message = memcached_strerror(memcached_client, memcached_rc);
     421           0 :     memcached_pool_push(_memcached_client_pool, memcached_client);
     422           0 :     throw se;
     423             :   }
     424             : 
     425             :   char return_key[MEMCACHED_MAX_KEY];
     426             :   size_t return_key_length;
     427             :   char *return_value;
     428             :   size_t return_value_length;
     429             :   uint32_t flags;
     430           0 :   auto get_span = opentracing::Tracer::Global()->StartSpan(
     431           0 :       "post_storage_mmc_mget_client", {opentracing::ChildOf(&span->context())});
     432             : 
     433           0 :   while (true) {
     434             :     return_value =
     435             :         memcached_fetch(memcached_client, return_key, &return_key_length,
     436           0 :                         &return_value_length, &flags, &memcached_rc);
     437           0 :     if (return_value == nullptr) {
     438           0 :       LOG(debug) << "Memcached mget finished";
     439           0 :       break;
     440             :     }
     441           0 :     if (memcached_rc != MEMCACHED_SUCCESS) {
     442           0 :       free(return_value);
     443           0 :       memcached_quit(memcached_client);
     444           0 :       memcached_pool_push(_memcached_client_pool, memcached_client);
     445           0 :       LOG(error) << "Cannot get posts of request " << req_id;
     446           0 :       ServiceException se;
     447           0 :       se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
     448           0 :       se.message = "Cannot get posts of request " + std::to_string(req_id);
     449           0 :       throw se;
     450             :     }
     451           0 :     Post new_post;
     452             :     json post_json = json::parse(
     453           0 :         std::string(return_value, return_value + return_value_length));
     454           0 :     new_post.req_id = post_json["req_id"];
     455           0 :     new_post.timestamp = post_json["timestamp"];
     456           0 :     new_post.post_id = post_json["post_id"];
     457           0 :     new_post.creator.user_id = post_json["creator"]["user_id"];
     458           0 :     new_post.creator.username = post_json["creator"]["username"];
     459           0 :     new_post.post_type = post_json["post_type"];
     460           0 :     new_post.text = post_json["text"];
     461           0 :     for (auto &item : post_json["media"]) {
     462           0 :       Media media;
     463           0 :       media.media_id = item["media_id"];
     464           0 :       media.media_type = item["media_type"];
     465           0 :       new_post.media.emplace_back(media);
     466             :     }
     467           0 :     for (auto &item : post_json["user_mentions"]) {
     468           0 :       UserMention user_mention;
     469           0 :       user_mention.username = item["username"];
     470           0 :       user_mention.user_id = item["user_id"];
     471           0 :       new_post.user_mentions.emplace_back(user_mention);
     472             :     }
     473           0 :     for (auto &item : post_json["urls"]) {
     474           0 :       Url url;
     475           0 :       url.shortened_url = item["shortened_url"];
     476           0 :       url.expanded_url = item["expanded_url"];
     477           0 :       new_post.urls.emplace_back(url);
     478             :     }
     479           0 :     return_map.insert(std::make_pair(new_post.post_id, new_post));
     480           0 :     post_ids_not_cached.erase(new_post.post_id);
     481           0 :     free(return_value);
     482             :   }
     483           0 :   get_span->Finish();
     484           0 :   memcached_quit(memcached_client);
     485           0 :   memcached_pool_push(_memcached_client_pool, memcached_client);
     486           0 :   for (int i = 0; i < post_ids.size(); ++i) {
     487           0 :     delete keys[i];
     488             :   }
     489           0 :   delete[] keys;
     490           0 :   delete[] key_sizes;
     491             : 
     492           0 :   std::vector<std::future<void>> set_futures;
     493           0 :   std::map<int64_t, std::string> post_json_map;
     494             : 
     495             :   // Find the rest in MongoDB
     496           0 :   if (!post_ids_not_cached.empty()) {
     497             :     mongoc_client_t *mongodb_client =
     498           0 :         mongoc_client_pool_pop(_mongodb_client_pool);
     499           0 :     if (!mongodb_client) {
     500           0 :       ServiceException se;
     501           0 :       se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     502           0 :       se.message = "Failed to pop a client from MongoDB pool";
     503           0 :       throw se;
     504             :     }
     505             :     auto collection =
     506           0 :         mongoc_client_get_collection(mongodb_client, "post", "post");
     507           0 :     if (!collection) {
     508           0 :       ServiceException se;
     509           0 :       se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     510           0 :       se.message = "Failed to create collection user from DB user";
     511           0 :       mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     512           0 :       throw se;
     513             :     }
     514           0 :     bson_t *query = bson_new();
     515             :     bson_t query_child;
     516             :     bson_t query_post_id_list;
     517             :     const char *key;
     518           0 :     idx = 0;
     519             :     char buf[16];
     520             : 
     521           0 :     BSON_APPEND_DOCUMENT_BEGIN(query, "post_id", &query_child);
     522           0 :     BSON_APPEND_ARRAY_BEGIN(&query_child, "$in", &query_post_id_list);
     523           0 :     for (auto &item : post_ids_not_cached) {
     524           0 :       bson_uint32_to_string(idx, &key, buf, sizeof buf);
     525           0 :       BSON_APPEND_INT64(&query_post_id_list, key, item);
     526           0 :       idx++;
     527             :     }
     528           0 :     bson_append_array_end(&query_child, &query_post_id_list);
     529           0 :     bson_append_document_end(query, &query_child);
     530             :     mongoc_cursor_t *cursor =
     531           0 :         mongoc_collection_find_with_opts(collection, query, nullptr, nullptr);
     532             :     const bson_t *doc;
     533             : 
     534           0 :     auto find_span = opentracing::Tracer::Global()->StartSpan(
     535           0 :         "mongo_find_client", {opentracing::ChildOf(&span->context())});
     536           0 :     while (true) {
     537           0 :       bool found = mongoc_cursor_next(cursor, &doc);
     538           0 :       if (!found) {
     539           0 :         break;
     540             :       }
     541           0 :       Post new_post;
     542           0 :       char *post_json_char = bson_as_json(doc, nullptr);
     543           0 :       json post_json = json::parse(post_json_char);
     544           0 :       new_post.req_id = post_json["req_id"];
     545           0 :       new_post.timestamp = post_json["timestamp"];
     546           0 :       new_post.post_id = post_json["post_id"];
     547           0 :       new_post.creator.user_id = post_json["creator"]["user_id"];
     548           0 :       new_post.creator.username = post_json["creator"]["username"];
     549           0 :       new_post.post_type = post_json["post_type"];
     550           0 :       new_post.text = post_json["text"];
     551           0 :       for (auto &item : post_json["media"]) {
     552           0 :         Media media;
     553           0 :         media.media_id = item["media_id"];
     554           0 :         media.media_type = item["media_type"];
     555           0 :         new_post.media.emplace_back(media);
     556             :       }
     557           0 :       for (auto &item : post_json["user_mentions"]) {
     558           0 :         UserMention user_mention;
     559           0 :         user_mention.username = item["username"];
     560           0 :         user_mention.user_id = item["user_id"];
     561           0 :         new_post.user_mentions.emplace_back(user_mention);
     562             :       }
     563           0 :       for (auto &item : post_json["urls"]) {
     564           0 :         Url url;
     565           0 :         url.shortened_url = item["shortened_url"];
     566           0 :         url.expanded_url = item["expanded_url"];
     567           0 :         new_post.urls.emplace_back(url);
     568             :       }
     569           0 :       post_json_map.insert({new_post.post_id, std::string(post_json_char)});
     570           0 :       return_map.insert({new_post.post_id, new_post});
     571           0 :       bson_free(post_json_char);
     572             :     }
     573           0 :     find_span->Finish();
     574             :     bson_error_t error;
     575           0 :     if (mongoc_cursor_error(cursor, &error)) {
     576           0 :       LOG(warning) << error.message;
     577           0 :       bson_destroy(query);
     578           0 :       mongoc_cursor_destroy(cursor);
     579           0 :       mongoc_collection_destroy(collection);
     580           0 :       mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     581           0 :       ServiceException se;
     582           0 :       se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     583           0 :       se.message = error.message;
     584           0 :       throw se;
     585             :     }
     586           0 :     bson_destroy(query);
     587           0 :     mongoc_cursor_destroy(cursor);
     588           0 :     mongoc_collection_destroy(collection);
     589           0 :     mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     590             : 
     591             :     // upload posts to memcached
     592           0 :     set_futures.emplace_back(std::async(std::launch::async, [&]() {
     593             :       memcached_return_t _rc;
     594             :       auto _memcached_client =
     595           0 :           memcached_pool_pop(_memcached_client_pool, true, &_rc);
     596           0 :       if (!_memcached_client) {
     597           0 :         LOG(error) << "Failed to pop a client from memcached pool";
     598           0 :         ServiceException se;
     599           0 :         se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
     600           0 :         se.message = "Failed to pop a client from memcached pool";
     601           0 :         throw se;
     602             :       }
     603           0 :       auto set_span = opentracing::Tracer::Global()->StartSpan(
     604           0 :           "mmc_set_client", {opentracing::ChildOf(&span->context())});
     605           0 :       for (auto &it : post_json_map) {
     606           0 :         std::string id_str = std::to_string(it.first);
     607           0 :         _rc = memcached_set(_memcached_client, id_str.c_str(), id_str.length(),
     608             :                             it.second.c_str(), it.second.length(),
     609           0 :                             static_cast<time_t>(0), static_cast<uint32_t>(0));
     610             :       }
     611           0 :       memcached_pool_push(_memcached_client_pool, _memcached_client);
     612           0 :       set_span->Finish();
     613           0 :     }));
     614             :   }
     615             : 
     616           0 :   if (return_map.size() != post_ids.size()) {
     617             :     try {
     618           0 :       for (auto &it : set_futures) {
     619           0 :         it.get();
     620             :       }
     621           0 :     } catch (...) {
     622           0 :       LOG(warning) << "Failed to set posts to memcached";
     623             :     }
     624           0 :     LOG(error) << "Return set incomplete";
     625           0 :     ServiceException se;
     626           0 :     se.errorCode = ErrorCode::SE_THRIFT_HANDLER_ERROR;
     627           0 :     se.message = "Return set incomplete";
     628           0 :     throw se;
     629             :   }
     630             : 
     631           0 :   for (auto &post_id : post_ids) {
     632           0 :     _return.emplace_back(return_map[post_id]);
     633             :   }
     634             : 
     635             :   try {
     636           0 :     for (auto &it : set_futures) {
     637           0 :       it.get();
     638             :     }
     639           0 :   } catch (...) {
     640           0 :     LOG(warning) << "Failed to set posts to memcached";
     641             :   }
     642             : 
     643             :   // 新增:批量读取完成日志
     644           0 :   LOG(info) << "ReadPosts completed [req_id=" << req_id << ", total return count=" << _return.size() << "]";
     645             : }
     646             : 
     647             : }  // namespace social_network
     648             : 
     649             : #endif  // SOCIAL_NETWORK_MICROSERVICES_POSTSTORAGEHANDLER_H

Generated by: LCOV version 1.12