LCOV - code coverage report
Current view: top level - src/UserMentionService - UserMentionHandler.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 21 142 14.8 %
Date: 2025-11-04 01:59:25 Functions: 2 4 50.0 %

          Line data    Source code
       1             : #ifndef SOCIAL_NETWORK_MICROSERVICES_SRC_USERMENTIONSERVICE_USERMENTIONHANDLER_H_
       2             : #define SOCIAL_NETWORK_MICROSERVICES_SRC_USERMENTIONSERVICE_USERMENTIONHANDLER_H_
       3             : 
       4             : #include <bson.h>
       5             : #include <libmemcached/memcached.h>
       6             : #include <libmemcached/util.h>
       7             : #include <mongoc.h>
       8             : 
       9             : #include "../../gen-cpp/UserMentionService.h"
      10             : #include "../../gen-cpp/social_network_types.h"
      11             : #include "../ClientPool.h"
      12             : #include "../logger.h"
      13             : #include "../tracing.h"
      14             : #include "../utils.h"
      15             : 
      16             : namespace social_network {
      17             : 
      18             : class UserMentionHandler : public UserMentionServiceIf {  // Implements the generated UserMentionServiceIf interface on top of a memcached pool and a MongoDB pool.
      19             :  public:
      20             :   UserMentionHandler(memcached_pool_st *, mongoc_client_pool_t *);  // Stores both pool pointers exactly as given.
      21           0 :   ~UserMentionHandler() override = default;  // Defaulted: nothing in this class releases the pools.
      22             :   // Fills _return with a UserMention for each username that could be resolved; may throw ServiceException.
      23             :   void ComposeUserMentions(std::vector<UserMention> &_return, int64_t,
      24             :                            const std::vector<std::string> &,
      25             :                            const std::map<std::string, std::string> &) override;
      26             :   // Client pools handed in at construction; held as raw pointers.
      27             :  private:
      28             :   memcached_pool_st *_memcached_client_pool;
      29             :   mongoc_client_pool_t *_mongodb_client_pool;
      30             : };
      31             : 
      32           1 : inline UserMentionHandler::UserMentionHandler(  // inline: defined in a header, so it must be inline to be includable from more than one translation unit.
      33             :     memcached_pool_st *memcached_client_pool,
      34           1 :     mongoc_client_pool_t *mongodb_client_pool) {
      35           1 :   _memcached_client_pool = memcached_client_pool;  // Pointers are stored as-is; no null check is made here.
      36           1 :   _mongodb_client_pool = mongodb_client_pool;
      37           1 : }
      38             : 
      39         200 : inline void UserMentionHandler::ComposeUserMentions(  // Resolves usernames to UserMention entries: memcached first, MongoDB for the misses. inline because it is defined in a header.
      40             :     std::vector<UserMention> &_return, int64_t req_id,  // _return is overwritten with the result; req_id only appears in log and error text.
      41             :     const std::vector<std::string> &usernames,  // Names to resolve; an empty list skips both lookups.
      42             :     const std::map<std::string, std::string> &carrier) {  // Text map the parent span context is extracted from. Throws ServiceException on memcached / MongoDB failures.
      43             :   // Log receipt of the request.
      44         400 :   LOG(info) << "Received ComposeUserMentions request [req_id=" << req_id << ", usernames count=" << usernames.size() << "]";
      45             : 
      46             :   // Start the server span as a child of the context extracted from carrier.
      47         400 :   TextMapReader reader(carrier);
      48         400 :   std::map<std::string, std::string> writer_text_map;
      49         400 :   TextMapWriter writer(writer_text_map);
      50         400 :   auto parent_span = opentracing::Tracer::Global()->Extract(reader);
      51         400 :   auto span = opentracing::Tracer::Global()->StartSpan(
      52             :       "compose_user_mentions_server",
      53         800 :       {opentracing::ChildOf(parent_span->get())});
      54         200 :   opentracing::Tracer::Global()->Inject(span->context(), writer);
      55             : 
      56         400 :   std::vector<UserMention> user_mentions;
      57         200 :   if (!usernames.empty()) {
      58           0 :     std::map<std::string, bool> usernames_not_cached;  // Names still unresolved; an entry is erased on each memcached hit.
      59             : 
      60           0 :     for (auto &username : usernames) {
      61           0 :       usernames_not_cached.emplace(std::make_pair(username, false));
      62             :     }
      63             : 
      64             :     // Look the names up in Memcached with a single multi-get.
      65             :     memcached_return_t rc;
      66           0 :     auto client = memcached_pool_pop(_memcached_client_pool, true, &rc);
      67           0 :     if (!client) {
      68           0 :       ServiceException se;
      69           0 :       se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
      70           0 :       se.message = "Failed to pop a client from memcached pool";
      71           0 :       throw se;
      72             :     }
      73             : 
      74             :     char **keys;
      75             :     size_t *key_sizes;
      76           0 :     keys = new char *[usernames.size()];  // Raw arrays for the C API; released after the fetch loop below.
      77           0 :     key_sizes = new size_t[usernames.size()];
      78           0 :     int idx = 0;
      79           0 :     for (auto &username : usernames) {
      80           0 :       std::string key_str = username + ":user_id";  // Cache key format: "<username>:user_id".
      81           0 :       keys[idx] = new char[key_str.length() + 1];
      82           0 :       strcpy(keys[idx], key_str.c_str());
      83           0 :       key_sizes[idx] = key_str.length();
      84           0 :       idx++;
      85             :     }
      86             : 
      87           0 :     auto get_span = opentracing::Tracer::Global()->StartSpan(
      88             :         "compose_user_mentions_memcached_get_client",
      89           0 :         {opentracing::ChildOf(&span->context())});
      90           0 :     rc = memcached_mget(client, keys, key_sizes, usernames.size());
      91           0 :     if (rc != MEMCACHED_SUCCESS) {
      92           0 :       LOG(error) << "Cannot get usernames of request " << req_id << ": "
      93           0 :                  << memcached_strerror(client, rc);
      94           0 :       ServiceException se;
      95           0 :       se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
      96           0 :       se.message = memcached_strerror(client, rc);
      97           0 :       memcached_pool_push(_memcached_client_pool, client);
      98           0 :       get_span->Finish();
      99           0 :       throw se;  // NOTE(review): keys / key_sizes are not released on this path.
     100             :     }
     101             : 
     102             :     char return_key[MEMCACHED_MAX_KEY];
     103             :     size_t return_key_length;
     104             :     char *return_value;
     105             :     size_t return_value_length;
     106             :     uint32_t flags;
     107             : 
     108           0 :     while (true) {  // Drain the multi-get results; a null value ends the loop.
     109             :       return_value = memcached_fetch(client, return_key, &return_key_length,
     110           0 :                                      &return_value_length, &flags, &rc);
     111           0 :       if (return_value == nullptr) {
     112           0 :         LOG(debug) << "Memcached mget finished "
     113           0 :                    << memcached_strerror(client, rc);
     114           0 :         break;
     115             :       }
     116           0 :       if (rc != MEMCACHED_SUCCESS) {
     117           0 :         free(return_value);
     118           0 :         memcached_quit(client);
     119           0 :         memcached_pool_push(_memcached_client_pool, client);
     120           0 :         LOG(error) << "Cannot get components of request " << req_id;
     121           0 :         ServiceException se;
     122           0 :         se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
     123             :         se.message =
     124           0 :             "Cannot get usernames of request " + std::to_string(req_id);
     125           0 :         get_span->Finish();
     126           0 :         throw se;  // NOTE(review): keys / key_sizes are not released on this path either.
     127             :       }
     128           0 :       UserMention new_user_mention;
     129           0 :       std::string username(return_key, return_key + return_key_length);
     130             :       username =  // Strip the ":user_id" suffix that was appended when the key was built.
     131           0 :           username.substr(0, username.length() - std::strlen(":user_id"));
     132           0 :       new_user_mention.username = username;
     133           0 :       new_user_mention.user_id = std::stoul(
     134           0 :           std::string(return_value, return_value + return_value_length));
     135           0 :       user_mentions.emplace_back(new_user_mention);
     136           0 :       usernames_not_cached.erase(username);  // Resolved from cache, so MongoDB need not be asked for it.
     137           0 :       free(return_value);
     138             :     }
     139           0 :     memcached_quit(client);
     140           0 :     memcached_pool_push(_memcached_client_pool, client);
     141           0 :     get_span->Finish();
     142           0 :     for (size_t i = 0; i < usernames.size(); ++i) {  // size_t matches the unsigned type of size().
     143           0 :       delete[] keys[i];  // Each key came from new char[], so the array form of delete is required.
     144             :     }
     145           0 :     delete[] keys;
     146           0 :     delete[] key_sizes;
     147             : 
     148             :     // After the Memcached lookup.
     149           0 :     LOG(info) << "ComposeUserMentions Memcached hit [req_id=" << req_id << ", hit count=" << user_mentions.size() << "]";
     150             : 
     151             :     // Find the rest in MongoDB
     152           0 :     if (!usernames_not_cached.empty()) {
     153             :       mongoc_client_t *mongodb_client =
     154           0 :           mongoc_client_pool_pop(_mongodb_client_pool);
     155           0 :       if (!mongodb_client) {
     156           0 :         ServiceException se;
     157           0 :         se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     158           0 :         se.message = "Failed to pop a client from MongoDB pool";
     159           0 :         throw se;
     160             :       }
     161             : 
     162             :       auto collection =
     163           0 :           mongoc_client_get_collection(mongodb_client, "user", "user");
     164           0 :       if (!collection) {
     165           0 :         ServiceException se;
     166           0 :         se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     167           0 :         se.message = "Failed to create collection user from DB user";
     168           0 :         mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     169           0 :         throw se;
     170             :       }
     171             : 
     172           0 :       bson_t *query = bson_new();
     173             :       bson_t query_child_0;
     174             :       bson_t query_username_list;
     175             :       const char *key;
     176           0 :       idx = 0;  // Reused as the running array index for the "$in" list.
     177             :       char buf[16];
     178             : 
     179           0 :       BSON_APPEND_DOCUMENT_BEGIN(query, "username", &query_child_0);  // Builds {"username": {"$in": [<uncached names>]}}.
     180           0 :       BSON_APPEND_ARRAY_BEGIN(&query_child_0, "$in", &query_username_list);
     181           0 :       for (auto &item : usernames_not_cached) {
     182           0 :         bson_uint32_to_string(idx, &key, buf, sizeof buf);
     183           0 :         BSON_APPEND_UTF8(&query_username_list, key, item.first.c_str());
     184           0 :         idx++;
     185             :       }
     186           0 :       bson_append_array_end(&query_child_0, &query_username_list);
     187           0 :       bson_append_document_end(query, &query_child_0);
     188             : 
     189           0 :       auto find_span = opentracing::Tracer::Global()->StartSpan(
     190             :           "compose_user_mentions_mongo_find_client",
     191           0 :           {opentracing::ChildOf(&span->context())});
     192             :       mongoc_cursor_t *cursor =
     193           0 :           mongoc_collection_find_with_opts(collection, query, nullptr, nullptr);
     194             :       const bson_t *doc;
     195             : 
     196           0 :       while (mongoc_cursor_next(cursor, &doc)) {  // NOTE(review): no cursor error check follows the loop; a failed query looks like zero results.
     197             :         bson_iter_t iter;
     198           0 :         UserMention new_user_mention;
     199           0 :         if (bson_iter_init_find(&iter, doc, "user_id")) {
     200           0 :           new_user_mention.user_id = bson_iter_value(&iter)->value.v_int64;  // Assumes user_id is stored as int64 - TODO confirm against the writer.
     201             :         } else {
     202           0 :           ServiceException se;
     203           0 :           se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     204           0 :           se.message = "Attribute of MongoDB item is not complete";
     205           0 :           bson_destroy(query);
     206           0 :           mongoc_cursor_destroy(cursor);
     207           0 :           mongoc_collection_destroy(collection);
     208           0 :           mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     209           0 :           find_span->Finish();
     210           0 :           throw se;
     211             :         }
     212           0 :         if (bson_iter_init_find(&iter, doc, "username")) {
     213           0 :           new_user_mention.username = bson_iter_value(&iter)->value.v_utf8.str;
     214             :         } else {
     215           0 :           ServiceException se;
     216           0 :           se.errorCode = ErrorCode::SE_MONGODB_ERROR;
     217           0 :           se.message = "Attribute of MongoDB item is not complete";
     218           0 :           bson_destroy(query);
     219           0 :           mongoc_cursor_destroy(cursor);
     220           0 :           mongoc_collection_destroy(collection);
     221           0 :           mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     222           0 :           find_span->Finish();
     223           0 :           throw se;
     224             :         }
     225           0 :         user_mentions.emplace_back(new_user_mention);  // Not written back to memcached here; names found in neither store are simply omitted.
     226             :       }
     227           0 :       bson_destroy(query);
     228           0 :       mongoc_cursor_destroy(cursor);
     229           0 :       mongoc_collection_destroy(collection);
     230           0 :       mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
     231           0 :       find_span->Finish();
     232             :     }
     233             :   }
     234             : 
     235             :   // After the MongoDB lookup (any newly added user_mentions are reflected in size); this is logged even when MongoDB was not queried.
     236         400 :   LOG(info) << "ComposeUserMentions MongoDB hit [req_id=" << req_id << ", total user_mentions count=" << user_mentions.size() << "]";
     237             : 
     238         200 :   _return = user_mentions;  // Copied rather than moved: user_mentions.size() is still read by the final log line.
     239         200 :   span->Finish();
     240             : 
     241             :   // End of method.
     242         400 :   LOG(info) << "ComposeUserMentions completed [req_id=" << req_id << ", return count=" << user_mentions.size() << "]";
     243         200 : }
     244             : 
     245             : }  // namespace social_network
     246             : 
     247             : #endif  // SOCIAL_NETWORK_MICROSERVICES_SRC_USERMENTIONSERVICE_USERMENTIONHANDLER_H_

Generated by: LCOV version 1.12