MongoDB C++ Driver legacy-1.1.2
Loading...
Searching...
No Matches
message.h
1// message.h
2
3/* Copyright 2009 10gen Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#pragma once
19
20#include <vector>
21
22#include "mongo/platform/atomic_word.h"
23#include "mongo/platform/cstdint.h"
24#include "mongo/base/data_view.h"
25#include "mongo/base/encoded_value_storage.h"
26#include "mongo/util/mongoutils/str.h"
27#include "mongo/util/net/hostandport.h"
28#include "mongo/util/net/operation.h"
29#include "mongo/util/net/sock.h"
30
31namespace mongo {
32
/**
 * Maximum accepted message size on the wire protocol.
 */
const size_t MaxMessageSizeBytes = 48 * 1000 * 1000;

// Types that are only named here; their definitions live further down or in other headers.
class PiggyBackData;
class MessagingPort;
class Message;

// Message identifier type: used for the request id and the response-to id of a message.
typedef uint32_t MSGID;

// Declaration only; the definition is not part of this header.
bool doesOpGetAResponse(int op);
45
46inline const char* opToString(int op) {
47 switch (op) {
48 case 0:
49 return "none";
50 case opReply:
51 return "reply";
52 case dbMsg:
53 return "msg";
54 case dbUpdate:
55 return "update";
56 case dbInsert:
57 return "insert";
58 case dbQuery:
59 return "query";
60 case dbGetMore:
61 return "getmore";
62 case dbDelete:
63 return "remove";
64 case dbKillCursors:
65 return "killcursors";
66 default:
67 massert(16141, str::stream() << "cannot translate opcode " << op, !op);
68 return "";
69 }
70}
71
72inline bool opIsWrite(int op) {
73 switch (op) {
74 case 0:
75 case opReply:
76 case dbMsg:
77 case dbQuery:
78 case dbGetMore:
79 case dbKillCursors:
80 return false;
81
82 case dbUpdate:
83 case dbInsert:
84 case dbDelete:
85 return true;
86
87 default:
88 verify(0);
89 return "";
90 }
91}
92
93namespace MSGHEADER {
#pragma pack(1)
/**
 * Message header layout: four consecutive 32-bit fields, packed without padding.
 * See http://dochub.mongodb.org/core/mongowireprotocol
 */
struct Layout {
    // total message size, including this
    int32_t messageLength;

    // identifier for this message
    int32_t requestID;

    // requestID from the original request (used in responses from db)
    int32_t responseTo;

    int32_t opCode;
};
#pragma pack()
105
107public:
108 typedef ConstDataView view_type;
109
110 ConstView(const char* data) : _data(data) {}
111
112 const char* view2ptr() const {
113 return data().view();
114 }
115
116 int32_t getMessageLength() const {
117 return data().readLE<int32_t>(offsetof(Layout, messageLength));
118 }
119
120 int32_t getRequestID() const {
121 return data().readLE<int32_t>(offsetof(Layout, requestID));
122 }
123
124 int32_t getResponseTo() const {
125 return data().readLE<int32_t>(offsetof(Layout, responseTo));
126 }
127
128 int32_t getOpCode() const {
129 return data().readLE<int32_t>(offsetof(Layout, opCode));
130 }
131
132protected:
133 const view_type& data() const {
134 return _data;
135 }
136
137private:
138 view_type _data;
139};
140
141class View : public ConstView {
142public:
143 typedef DataView view_type;
144
145 View(char* data) : ConstView(data) {}
146
147 using ConstView::view2ptr;
148 char* view2ptr() {
149 return data().view();
150 }
151
152 void setMessageLength(int32_t value) {
153 data().writeLE(value, offsetof(Layout, messageLength));
154 }
155
156 void setRequestID(int32_t value) {
157 data().writeLE(value, offsetof(Layout, requestID));
158 }
159
160 void setResponseTo(int32_t value) {
161 data().writeLE(value, offsetof(Layout, responseTo));
162 }
163
164 void setOpCode(int32_t value) {
165 data().writeLE(value, offsetof(Layout, opCode));
166 }
167
168private:
169 view_type data() const {
170 return const_cast<char*>(ConstView::view2ptr());
171 }
172};
173
174class Value : public EncodedValueStorage<Layout, ConstView, View> {
175public:
176 Value() {
177 BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
178 }
179
181};
182
183} // namespace MSGHEADER
184
185namespace MsgData {
186#pragma pack(1)
187struct Layout {
188 MSGHEADER::Layout header;
189 char data[4];
190};
191#pragma pack()
192
194public:
195 ConstView(const char* storage) : _storage(storage) {}
196
197 const char* view2ptr() const {
198 return storage().view();
199 }
200
201 int32_t getLen() const {
202 return header().getMessageLength();
203 }
204
205 MSGID getId() const {
206 return header().getRequestID();
207 }
208
209 MSGID getResponseTo() const {
210 return header().getResponseTo();
211 }
212
213 int32_t getOperation() const {
214 return header().getOpCode();
215 }
216
217 const char* data() const {
218 return storage().view(offsetof(Layout, data));
219 }
220
221 bool valid() const {
222 if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))
223 return false;
224 if (getOperation() < 0 || getOperation() > 30000)
225 return false;
226 return true;
227 }
228
229 int64_t getCursor() const {
230 verify(getResponseTo() > 0);
231 verify(getOperation() == opReply);
232 return ConstDataView(data() + sizeof(int32_t)).readLE<int64_t>();
233 }
234
235 int dataLen() const; // len without header
236
237protected:
238 const ConstDataView& storage() const {
239 return _storage;
240 }
241
242 MSGHEADER::ConstView header() const {
243 return storage().view(offsetof(Layout, header));
244 }
245
246private:
247 ConstDataView _storage;
248};
249
250class View : public ConstView {
251public:
252 View(char* storage) : ConstView(storage) {}
253
254 using ConstView::view2ptr;
255 char* view2ptr() {
256 return storage().view();
257 }
258
259 void setLen(int value) {
260 return header().setMessageLength(value);
261 }
262
263 void setId(MSGID value) {
264 return header().setRequestID(value);
265 }
266
267 void setResponseTo(MSGID value) {
268 return header().setResponseTo(value);
269 }
270
271 void setOperation(int value) {
272 return header().setOpCode(value);
273 }
274
275 using ConstView::data;
276 char* data() {
277 return storage().view(offsetof(Layout, data));
278 }
279
280private:
281 DataView storage() const {
282 return const_cast<char*>(ConstView::view2ptr());
283 }
284
285 MSGHEADER::View header() const {
286 return storage().view(offsetof(Layout, header));
287 }
288};
289
290class Value : public EncodedValueStorage<Layout, ConstView, View> {
291public:
292 Value() {
293 BOOST_STATIC_ASSERT(sizeof(Value) == sizeof(Layout));
294 }
295
297};
298
299const int MsgDataHeaderSize = sizeof(Value) - 4;
300inline int ConstView::dataLen() const {
301 return getLen() - MsgDataHeaderSize;
302}
303} // namespace MsgData
304
305class Message {
306public:
307 // we assume here that a vector with initial size 0 does no allocation (0 is the default, but
308 // wanted to make it explicit).
309 Message() : _buf(0), _data(0), _freeIt(false) {}
310 Message(void* data, bool freeIt) : _buf(0), _data(0), _freeIt(false) {
311 _setData(reinterpret_cast<char*>(data), freeIt);
312 };
313 Message(Message& r) : _buf(0), _data(0), _freeIt(false) {
314 *this = r;
315 }
316 ~Message() {
317 reset();
318 }
319
320 SockAddr _from;
321
322 MsgData::View header() const {
323 verify(!empty());
324 return _buf ? _buf : _data[0].first;
325 }
326
327 int operation() const {
328 return header().getOperation();
329 }
330
331 MsgData::View singleData() const {
332 massert(13273, "single data buffer expected", _buf);
333 return header();
334 }
335
336 bool empty() const {
337 return !_buf && _data.empty();
338 }
339
340 int size() const {
341 int res = 0;
342 if (_buf) {
343 res = MsgData::ConstView(_buf).getLen();
344 } else {
345 for (MsgVec::const_iterator it = _data.begin(); it != _data.end(); ++it) {
346 res += it->second;
347 }
348 }
349 return res;
350 }
351
352 int dataSize() const {
353 return size() - sizeof(MSGHEADER::Value);
354 }
355
356 // concat multiple buffers - noop if <2 buffers already, otherwise can be expensive copy
357 // can get rid of this if we make response handling smarter
358 void concat() {
359 if (_buf || empty()) {
360 return;
361 }
362
363 verify(_freeIt);
364 int totalSize = 0;
365 for (std::vector<std::pair<char*, int> >::const_iterator i = _data.begin();
366 i != _data.end();
367 ++i) {
368 totalSize += i->second;
369 }
370 char* buf = (char*)malloc(totalSize);
371 char* p = buf;
372 for (std::vector<std::pair<char*, int> >::const_iterator i = _data.begin();
373 i != _data.end();
374 ++i) {
375 memcpy(p, i->first, i->second);
376 p += i->second;
377 }
378 reset();
379 _setData(buf, true);
380 }
381
382 // vector swap() so this is fast
383 Message& operator=(Message& r) {
384 verify(empty());
385 verify(r._freeIt);
386 _buf = r._buf;
387 r._buf = 0;
388 if (r._data.size() > 0) {
389 _data.swap(r._data);
390 }
391 r._freeIt = false;
392 _freeIt = true;
393 return *this;
394 }
395
396 void reset() {
397 if (_freeIt) {
398 if (_buf) {
399 free(_buf);
400 }
401 for (std::vector<std::pair<char*, int> >::const_iterator i = _data.begin();
402 i != _data.end();
403 ++i) {
404 free(i->first);
405 }
406 }
407 _buf = 0;
408 _data.clear();
409 _freeIt = false;
410 }
411
412 // use to add a buffer
413 // assumes message will free everything
414 void appendData(char* d, int size) {
415 if (size <= 0) {
416 return;
417 }
418 if (empty()) {
419 MsgData::View md = d;
420 md.setLen(size); // can be updated later if more buffers added
421 _setData(md.view2ptr(), true);
422 return;
423 }
424 verify(_freeIt);
425 if (_buf) {
426 _data.push_back(std::make_pair(_buf, MsgData::ConstView(_buf).getLen()));
427 _buf = 0;
428 }
429 _data.push_back(std::make_pair(d, size));
430 header().setLen(header().getLen() + size);
431 }
432
433 // use to set first buffer if empty
434 void setData(char* d, bool freeIt) {
435 verify(empty());
436 _setData(d, freeIt);
437 }
438 void setData(int operation, const char* msgtxt) {
439 setData(operation, msgtxt, strlen(msgtxt) + 1);
440 }
441 void setData(int operation, const char* msgdata, size_t len) {
442 verify(empty());
443 size_t dataLen = len + sizeof(MsgData::Value) - 4;
444 MsgData::View d = reinterpret_cast<char*>(malloc(dataLen));
445 memcpy(d.data(), msgdata, len);
446 d.setLen(dataLen);
447 d.setOperation(operation);
448 _setData(d.view2ptr(), true);
449 }
450
451 bool doIFreeIt() {
452 return _freeIt;
453 }
454
455 void send(MessagingPort& p, const char* context);
456
457 std::string toString() const;
458
459private:
460 void _setData(char* d, bool freeIt) {
461 _freeIt = freeIt;
462 _buf = d;
463 }
464 // if just one buffer, keep it in _buf, otherwise keep a sequence of buffers in _data
465 char* _buf;
466 // byte buffer(s) - the first must contain at least a full MsgData unless using _buf for storage
467 // instead
468 typedef std::vector<std::pair<char*, int> > MsgVec;
469 MsgVec _data;
470 bool _freeIt;
471};
472
473
474MSGID nextMessageId();
475
476
477} // namespace mongo
Definition data_view.h:30
Definition data_view.h:71
Definition encoded_value_storage.h:31
Definition message.h:106
Definition message.h:174
Definition message.h:141
Definition message.h:305
Definition message_port.h:74
Definition message.h:193
Definition message.h:290
Definition message.h:250
the idea here is to make one liners easy.
Definition str.h:44
Utility functions for parsing numbers from strings.
Definition compare_numbers.h:20
const size_t MaxMessageSizeBytes
Maximum accepted message size on the wire protocol.
Definition message.h:36
Definition message.h:97
Definition message.h:187
wrapped around os representation of network address
Definition sock.h:96
Definition encoded_value_storage.h:24