Line data Source code
1 : #ifndef SOCIAL_NETWORK_MICROSERVICES_SRC_USERMENTIONSERVICE_USERMENTIONHANDLER_H_
2 : #define SOCIAL_NETWORK_MICROSERVICES_SRC_USERMENTIONSERVICE_USERMENTIONHANDLER_H_
3 :
4 : #include <bson.h>
5 : #include <libmemcached/memcached.h>
6 : #include <libmemcached/util.h>
7 : #include <mongoc.h>
8 :
9 : #include "../../gen-cpp/UserMentionService.h"
10 : #include "../../gen-cpp/social_network_types.h"
11 : #include "../ClientPool.h"
12 : #include "../logger.h"
13 : #include "../tracing.h"
14 : #include "../utils.h"
15 :
16 : namespace social_network {
17 :
18 : class UserMentionHandler : public UserMentionServiceIf {
19 : public:
20 : UserMentionHandler(memcached_pool_st *, mongoc_client_pool_t *);
21 0 : ~UserMentionHandler() override = default;
22 :
23 : void ComposeUserMentions(std::vector<UserMention> &_return, int64_t,
24 : const std::vector<std::string> &,
25 : const std::map<std::string, std::string> &) override;
26 :
27 : private:
28 : memcached_pool_st *_memcached_client_pool;
29 : mongoc_client_pool_t *_mongodb_client_pool;
30 : };
31 :
32 1 : UserMentionHandler::UserMentionHandler(
33 : memcached_pool_st *memcached_client_pool,
34 1 : mongoc_client_pool_t *mongodb_client_pool) {
35 1 : _memcached_client_pool = memcached_client_pool;
36 1 : _mongodb_client_pool = mongodb_client_pool;
37 1 : }
38 :
39 0 : void UserMentionHandler::ComposeUserMentions(
40 : std::vector<UserMention> &_return, int64_t req_id,
41 : const std::vector<std::string> &usernames,
42 : const std::map<std::string, std::string> &carrier) {
43 : // 记录收到请求
44 0 : LOG(info) << "Received ComposeUserMentions request [req_id=" << req_id << ", usernames count=" << usernames.size() << "]";
45 :
46 : // Initialize a span
47 0 : TextMapReader reader(carrier);
48 0 : std::map<std::string, std::string> writer_text_map;
49 0 : TextMapWriter writer(writer_text_map);
50 0 : auto parent_span = opentracing::Tracer::Global()->Extract(reader);
51 0 : auto span = opentracing::Tracer::Global()->StartSpan(
52 : "compose_user_mentions_server",
53 0 : {opentracing::ChildOf(parent_span->get())});
54 0 : opentracing::Tracer::Global()->Inject(span->context(), writer);
55 :
56 0 : std::vector<UserMention> user_mentions;
57 0 : if (!usernames.empty()) {
58 0 : std::map<std::string, bool> usernames_not_cached;
59 :
60 0 : for (auto &username : usernames) {
61 0 : usernames_not_cached.emplace(std::make_pair(username, false));
62 : }
63 :
64 : // Find in Memcached
65 : memcached_return_t rc;
66 0 : auto client = memcached_pool_pop(_memcached_client_pool, true, &rc);
67 0 : if (!client) {
68 0 : ServiceException se;
69 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
70 0 : se.message = "Failed to pop a client from memcached pool";
71 0 : throw se;
72 : }
73 :
74 : char **keys;
75 : size_t *key_sizes;
76 0 : keys = new char *[usernames.size()];
77 0 : key_sizes = new size_t[usernames.size()];
78 0 : int idx = 0;
79 0 : for (auto &username : usernames) {
80 0 : std::string key_str = username + ":user_id";
81 0 : keys[idx] = new char[key_str.length() + 1];
82 0 : strcpy(keys[idx], key_str.c_str());
83 0 : key_sizes[idx] = key_str.length();
84 0 : idx++;
85 : }
86 :
87 0 : auto get_span = opentracing::Tracer::Global()->StartSpan(
88 : "compose_user_mentions_memcached_get_client",
89 0 : {opentracing::ChildOf(&span->context())});
90 0 : rc = memcached_mget(client, keys, key_sizes, usernames.size());
91 0 : if (rc != MEMCACHED_SUCCESS) {
92 0 : LOG(error) << "Cannot get usernames of request " << req_id << ": "
93 0 : << memcached_strerror(client, rc);
94 0 : ServiceException se;
95 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
96 0 : se.message = memcached_strerror(client, rc);
97 0 : memcached_pool_push(_memcached_client_pool, client);
98 0 : get_span->Finish();
99 0 : throw se;
100 : }
101 :
102 : char return_key[MEMCACHED_MAX_KEY];
103 : size_t return_key_length;
104 : char *return_value;
105 : size_t return_value_length;
106 : uint32_t flags;
107 :
108 0 : while (true) {
109 : return_value = memcached_fetch(client, return_key, &return_key_length,
110 0 : &return_value_length, &flags, &rc);
111 0 : if (return_value == nullptr) {
112 0 : LOG(debug) << "Memcached mget finished "
113 0 : << memcached_strerror(client, rc);
114 0 : break;
115 : }
116 0 : if (rc != MEMCACHED_SUCCESS) {
117 0 : free(return_value);
118 0 : memcached_quit(client);
119 0 : memcached_pool_push(_memcached_client_pool, client);
120 0 : LOG(error) << "Cannot get components of request " << req_id;
121 0 : ServiceException se;
122 0 : se.errorCode = ErrorCode::SE_MEMCACHED_ERROR;
123 : se.message =
124 0 : "Cannot get usernames of request " + std::to_string(req_id);
125 0 : get_span->Finish();
126 0 : throw se;
127 : }
128 0 : UserMention new_user_mention;
129 0 : std::string username(return_key, return_key + return_key_length);
130 : username =
131 0 : username.substr(0, username.length() - std::strlen(":user_id"));
132 0 : new_user_mention.username = username;
133 0 : new_user_mention.user_id = std::stoul(
134 0 : std::string(return_value, return_value + return_value_length));
135 0 : user_mentions.emplace_back(new_user_mention);
136 0 : usernames_not_cached.erase(username);
137 0 : free(return_value);
138 : }
139 0 : memcached_quit(client);
140 0 : memcached_pool_push(_memcached_client_pool, client);
141 0 : get_span->Finish();
142 0 : for (int i = 0; i < usernames.size(); ++i) {
143 0 : delete keys[i];
144 : }
145 0 : delete[] keys;
146 0 : delete[] key_sizes;
147 :
148 : // Memcached 查询后
149 0 : LOG(info) << "ComposeUserMentions Memcached hit [req_id=" << req_id << ", hit count=" << user_mentions.size() << "]";
150 :
151 : // Find the rest in MongoDB
152 0 : if (!usernames_not_cached.empty()) {
153 : mongoc_client_t *mongodb_client =
154 0 : mongoc_client_pool_pop(_mongodb_client_pool);
155 0 : if (!mongodb_client) {
156 0 : ServiceException se;
157 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
158 0 : se.message = "Failed to pop a client from MongoDB pool";
159 0 : throw se;
160 : }
161 :
162 : auto collection =
163 0 : mongoc_client_get_collection(mongodb_client, "user", "user");
164 0 : if (!collection) {
165 0 : ServiceException se;
166 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
167 0 : se.message = "Failed to create collection user from DB user";
168 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
169 0 : throw se;
170 : }
171 :
172 0 : bson_t *query = bson_new();
173 : bson_t query_child_0;
174 : bson_t query_username_list;
175 : const char *key;
176 0 : idx = 0;
177 : char buf[16];
178 :
179 0 : BSON_APPEND_DOCUMENT_BEGIN(query, "username", &query_child_0);
180 0 : BSON_APPEND_ARRAY_BEGIN(&query_child_0, "$in", &query_username_list);
181 0 : for (auto &item : usernames_not_cached) {
182 0 : bson_uint32_to_string(idx, &key, buf, sizeof buf);
183 0 : BSON_APPEND_UTF8(&query_username_list, key, item.first.c_str());
184 0 : idx++;
185 : }
186 0 : bson_append_array_end(&query_child_0, &query_username_list);
187 0 : bson_append_document_end(query, &query_child_0);
188 :
189 0 : auto find_span = opentracing::Tracer::Global()->StartSpan(
190 : "compose_user_mentions_mongo_find_client",
191 0 : {opentracing::ChildOf(&span->context())});
192 : mongoc_cursor_t *cursor =
193 0 : mongoc_collection_find_with_opts(collection, query, nullptr, nullptr);
194 : const bson_t *doc;
195 :
196 0 : while (mongoc_cursor_next(cursor, &doc)) {
197 : bson_iter_t iter;
198 0 : UserMention new_user_mention;
199 0 : if (bson_iter_init_find(&iter, doc, "user_id")) {
200 0 : new_user_mention.user_id = bson_iter_value(&iter)->value.v_int64;
201 : } else {
202 0 : ServiceException se;
203 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
204 0 : se.message = "Attribute of MongoDB item is not complete";
205 0 : bson_destroy(query);
206 0 : mongoc_cursor_destroy(cursor);
207 0 : mongoc_collection_destroy(collection);
208 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
209 0 : find_span->Finish();
210 0 : throw se;
211 : }
212 0 : if (bson_iter_init_find(&iter, doc, "username")) {
213 0 : new_user_mention.username = bson_iter_value(&iter)->value.v_utf8.str;
214 : } else {
215 0 : ServiceException se;
216 0 : se.errorCode = ErrorCode::SE_MONGODB_ERROR;
217 0 : se.message = "Attribute of MongoDB item is not complete";
218 0 : bson_destroy(query);
219 0 : mongoc_cursor_destroy(cursor);
220 0 : mongoc_collection_destroy(collection);
221 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
222 0 : find_span->Finish();
223 0 : throw se;
224 : }
225 0 : user_mentions.emplace_back(new_user_mention);
226 : }
227 0 : bson_destroy(query);
228 0 : mongoc_cursor_destroy(cursor);
229 0 : mongoc_collection_destroy(collection);
230 0 : mongoc_client_pool_push(_mongodb_client_pool, mongodb_client);
231 0 : find_span->Finish();
232 : }
233 : }
234 :
235 : // MongoDB 查询后(只要 user_mentions 有新增就会体现到 size)
236 0 : LOG(info) << "ComposeUserMentions MongoDB hit [req_id=" << req_id << ", total user_mentions count=" << user_mentions.size() << "]";
237 :
238 0 : _return = user_mentions;
239 0 : span->Finish();
240 :
241 : // 方法结束
242 0 : LOG(info) << "ComposeUserMentions completed [req_id=" << req_id << ", return count=" << user_mentions.size() << "]";
243 0 : }
244 :
245 : } // namespace social_network
246 :
247 : #endif // SOCIAL_NETWORK_MICROSERVICES_SRC_USERMENTIONSERVICE_USERMENTIONHANDLER_H_
|