00001 #ifndef RDMA_WRAP_H
00002 #define RDMA_WRAP_H
00003
00004 #include "rdma_factories.h"
00005
00006 #include <rdma/rdma_cma.h>
00007
00008 #include "qpid/RefCounted.h"
00009 #include "qpid/sys/IOHandle.h"
00010 #include "qpid/sys/posix/PrivatePosix.h"
00011
00012 #include <fcntl.h>
00013
00014 #include <vector>
00015 #include <algorithm>
00016 #include <iostream>
00017 #include <stdexcept>
00018 #include <boost/shared_ptr.hpp>
00019 #include <boost/intrusive_ptr.hpp>
00020
00021 namespace Rdma {
00022 const int DEFAULT_TIMEOUT = 2000;
00023 const int DEFAULT_BACKLOG = 100;
00024 const int DEFAULT_CQ_ENTRIES = 256;
00025 const int DEFAULT_WR_ENTRIES = 64;
00026 const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
00027 0,
00028 0,
00029 4,
00030 4,
00031 0,
00032 5,
00033 7
00034 };
00035
00036 struct Buffer {
00037 friend class QueuePair;
00038
00039 char* const bytes;
00040 const int32_t byteCount;
00041 int32_t dataStart;
00042 int32_t dataCount;
00043
00044 Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
00045 bytes(b),
00046 byteCount(s),
00047 dataStart(0),
00048 dataCount(0),
00049 mr(CHECK_NULL(::ibv_reg_mr(
00050 pd, bytes, byteCount,
00051 ::IBV_ACCESS_LOCAL_WRITE)))
00052 {}
00053
00054 ~Buffer() {
00055 (void) ::ibv_dereg_mr(mr);
00056 delete [] bytes;
00057 }
00058
00059 private:
00060 ::ibv_mr* mr;
00061 };
00062
class Connection;

// Tells which completion queue a QueuePairEvent was taken from.
// NONE marks an empty event (nothing was available).
enum QueueDirection { NONE, SEND, RECV };
00070
00071 class QueuePairEvent {
00072 boost::shared_ptr< ::ibv_cq > cq;
00073 ::ibv_wc wc;
00074 QueueDirection dir;
00075
00076 friend class QueuePair;
00077
00078 QueuePairEvent() :
00079 dir(NONE)
00080 {}
00081
00082 QueuePairEvent(
00083 const ::ibv_wc& w,
00084 boost::shared_ptr< ::ibv_cq > c,
00085 QueueDirection d) :
00086 cq(c),
00087 wc(w),
00088 dir(d)
00089 {
00090 assert(dir != NONE);
00091 }
00092
00093 public:
00094 operator bool() const {
00095 return dir != NONE;
00096 }
00097
00098 QueueDirection getDirection() const {
00099 return dir;
00100 }
00101
00102 ::ibv_wc_opcode getEventType() const {
00103 return wc.opcode;
00104 }
00105
00106 ::ibv_wc_status getEventStatus() const {
00107 return wc.status;
00108 }
00109
00110 Buffer* getBuffer() const {
00111 Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
00112 b->dataCount = wc.byte_len;
00113 return b;
00114 }
00115 };
00116
00117
00118
00119
00120
00121
00122
00123 class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
00124 boost::shared_ptr< ::ibv_pd > pd;
00125 boost::shared_ptr< ::ibv_comp_channel > cchannel;
00126 boost::shared_ptr< ::ibv_cq > scq;
00127 boost::shared_ptr< ::ibv_cq > rcq;
00128 boost::shared_ptr< ::rdma_cm_id > id;
00129 int outstandingSendEvents;
00130 int outstandingRecvEvents;
00131
00132 friend class Connection;
00133
00134 QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
00135 ~QueuePair();
00136
00137 public:
00138 typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
00139
00140
00141 Buffer* createBuffer(int s) {
00142 return new Buffer(pd.get(), new char[s], s);
00143 }
00144
00145
00146
00147 void nonblocking() {
00148 ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
00149 }
00150
00151
00152
00153 QueuePair::intrusive_ptr getNextChannelEvent() {
00154
00155 ::ibv_cq* cq;
00156 void* ctx;
00157 int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
00158 if (rc == -1 && errno == EAGAIN)
00159 return 0;
00160 CHECK(rc);
00161
00162
00163 if (cq == scq.get()) {
00164 if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
00165 ::ibv_ack_cq_events(cq, outstandingSendEvents);
00166 outstandingSendEvents = 0;
00167 }
00168 } else if (cq == rcq.get()) {
00169 if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
00170 ::ibv_ack_cq_events(cq, outstandingRecvEvents);
00171 outstandingRecvEvents = 0;
00172 }
00173 }
00174
00175 return static_cast<QueuePair*>(ctx);
00176 }
00177
00178 QueuePairEvent getNextEvent() {
00179 ::ibv_wc w;
00180 if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
00181 return QueuePairEvent(w, scq, SEND);
00182 else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
00183 return QueuePairEvent(w, rcq, RECV);
00184 else
00185 return QueuePairEvent();
00186 }
00187
00188 void postRecv(Buffer* buf);
00189 void postSend(Buffer* buf);
00190 void notifyRecv();
00191 void notifySend();
00192 };
00193
00194 class ConnectionEvent {
00195 friend class Connection;
00196
00197
00198
00199 boost::intrusive_ptr<Connection> id;
00200 boost::intrusive_ptr<Connection> listen_id;
00201 boost::shared_ptr< ::rdma_cm_event > event;
00202
00203 ConnectionEvent() {}
00204 ConnectionEvent(::rdma_cm_event* e);
00205
00206
00207 public:
00208 operator bool() const {
00209 return event;
00210 }
00211
00212 ::rdma_cm_event_type getEventType() const {
00213 return event->event;
00214 }
00215
00216 ::rdma_conn_param getConnectionParam() const {
00217 if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) {
00218 return event->param.conn;
00219 } else {
00220 ::rdma_conn_param p = {};
00221 return p;
00222 }
00223 }
00224
00225 boost::intrusive_ptr<Connection> getConnection () const {
00226 return id;
00227 }
00228
00229 boost::intrusive_ptr<Connection> getListenId() const {
00230 return listen_id;
00231 }
00232 };
00233
00234
00235
00236
00237
00238
00239
00240 class Connection : public qpid::sys::IOHandle, public qpid::RefCounted {
00241 boost::shared_ptr< ::rdma_event_channel > channel;
00242 boost::shared_ptr< ::rdma_cm_id > id;
00243 QueuePair::intrusive_ptr qp;
00244
00245 void* context;
00246
00247 friend class ConnectionEvent;
00248 friend class QueuePair;
00249
00250
00251
00252 Connection(::rdma_cm_id* i) :
00253 qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
00254 id(i, destroyId),
00255 context(0)
00256 {
00257 impl->fd = id->channel->fd;
00258
00259
00260
00261 if (i)
00262 i->context = this;
00263 }
00264
00265 Connection() :
00266 qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
00267 channel(mkEChannel()),
00268 id(mkId(channel.get(), this, RDMA_PS_TCP)),
00269 context(0)
00270 {
00271 impl->fd = channel->fd;
00272 }
00273
00274
00275
00276 void ensureQueuePair() {
00277 assert(id.get());
00278
00279
00280 if (qp)
00281 return;
00282
00283 qp = new QueuePair(id);
00284 }
00285
00286 public:
00287 typedef boost::intrusive_ptr<Connection> intrusive_ptr;
00288
00289 static intrusive_ptr make() {
00290 return new Connection();
00291 }
00292
00293 static intrusive_ptr find(::rdma_cm_id* i) {
00294 if (!i)
00295 return 0;
00296 Connection* id = static_cast< Connection* >(i->context);
00297 if (!id)
00298 throw std::logic_error("Couldn't find existing Connection");
00299 return id;
00300 }
00301
00302 template <typename T>
00303 void addContext(T* c) {
00304
00305 if (!context)
00306 context = c;
00307 }
00308
00309 template <typename T>
00310 T* getContext() {
00311 return static_cast<T*>(context);
00312 }
00313
00314
00315
00316 void nonblocking() {
00317 assert(id.get());
00318 ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
00319 }
00320
00321
00322
00323 ConnectionEvent getNextEvent() {
00324 assert(id.get());
00325 ::rdma_cm_event* e;
00326 int rc = ::rdma_get_cm_event(id->channel, &e);
00327 if (rc == -1 && errno == EAGAIN)
00328 return ConnectionEvent();
00329 CHECK(rc);
00330 return ConnectionEvent(e);
00331 }
00332
00333 void bind(sockaddr& src_addr) const {
00334 assert(id.get());
00335 CHECK(::rdma_bind_addr(id.get(), &src_addr));
00336 }
00337
00338 void listen(int backlog = DEFAULT_BACKLOG) const {
00339 assert(id.get());
00340 CHECK(::rdma_listen(id.get(), backlog));
00341 }
00342
00343 void resolve_addr(
00344 sockaddr& dst_addr,
00345 sockaddr* src_addr = 0,
00346 int timeout_ms = DEFAULT_TIMEOUT) const
00347 {
00348 assert(id.get());
00349 CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms));
00350 }
00351
00352 void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const {
00353 assert(id.get());
00354 CHECK(::rdma_resolve_route(id.get(), timeout_ms));
00355 }
00356
00357 void disconnect() const {
00358 assert(id.get());
00359 CHECK(::rdma_disconnect(id.get()));
00360 }
00361
00362
00363 void connect() {
00364 assert(id.get());
00365
00366
00367 ensureQueuePair();
00368
00369 ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
00370 CHECK(::rdma_connect(id.get(), &p));
00371 }
00372
00373 template <typename T>
00374 void connect(const T* data) {
00375 assert(id.get());
00376
00377 ensureQueuePair();
00378
00379 ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
00380 p.private_data = data;
00381 p.private_data_len = sizeof(T);
00382 CHECK(::rdma_connect(id.get(), &p));
00383 }
00384
00385
00386
00387 template <typename T>
00388 void accept(const ::rdma_conn_param& param, const T* data) {
00389 assert(id.get());
00390
00391 ensureQueuePair();
00392
00393 ::rdma_conn_param p = param;
00394 p.private_data = data;
00395 p.private_data_len = sizeof(T);
00396 CHECK(::rdma_accept(id.get(), &p));
00397 }
00398
00399 void accept(const ::rdma_conn_param& param) {
00400 assert(id.get());
00401
00402 ensureQueuePair();
00403
00404 ::rdma_conn_param p = param;
00405 p.private_data = 0;
00406 p.private_data_len = 0;
00407 CHECK(::rdma_accept(id.get(), &p));
00408 }
00409
00410 template <typename T>
00411 void reject(const T* data) const {
00412 assert(id.get());
00413 CHECK(::rdma_reject(id.get(), data, sizeof(T)));
00414 }
00415
00416 void reject() const {
00417 assert(id.get());
00418 CHECK(::rdma_reject(id.get(), 0, 0));
00419 }
00420
00421 QueuePair::intrusive_ptr getQueuePair() {
00422 assert(id.get());
00423
00424 ensureQueuePair();
00425
00426 return qp;
00427 }
00428 };
00429
00430 inline QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
00431 qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
00432 pd(allocPd(i->verbs)),
00433 cchannel(mkCChannel(i->verbs)),
00434 scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
00435 rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
00436 id(i),
00437 outstandingSendEvents(0),
00438 outstandingRecvEvents(0)
00439 {
00440 impl->fd = cchannel->fd;
00441
00442
00443
00444 scq->cq_context = this;
00445 rcq->cq_context = this;
00446
00447 ::ibv_qp_init_attr qp_attr = {};
00448
00449
00450 qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES;
00451 qp_attr.cap.max_send_sge = 4;
00452 qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES;
00453 qp_attr.cap.max_recv_sge = 4;
00454
00455 qp_attr.send_cq = scq.get();
00456 qp_attr.recv_cq = rcq.get();
00457 qp_attr.qp_type = IBV_QPT_RC;
00458
00459 CHECK(::rdma_create_qp(id.get(), pd.get(), &qp_attr));
00460
00461
00462 id->qp->qp_context = this;
00463 }
00464
00465 inline QueuePair::~QueuePair() {
00466 if (outstandingSendEvents > 0)
00467 ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
00468 if (outstandingRecvEvents > 0)
00469 ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
00470
00471 ::rdma_destroy_qp(id.get());
00472 }
00473
00474 inline void QueuePair::notifyRecv() {
00475 CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
00476 }
00477
00478 inline void QueuePair::notifySend() {
00479 CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
00480 }
00481
00482 inline void QueuePair::postRecv(Buffer* buf) {
00483 ::ibv_recv_wr rwr = {};
00484 ::ibv_sge sge;
00485
00486 sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
00487 sge.length = buf->dataCount;
00488 sge.lkey = buf->mr->lkey;
00489
00490 rwr.wr_id = reinterpret_cast<uint64_t>(buf);
00491 rwr.sg_list = &sge;
00492 rwr.num_sge = 1;
00493
00494 ::ibv_recv_wr* badrwr = 0;
00495 CHECK_IBV(::ibv_post_recv(id->qp, &rwr, &badrwr));
00496 if (badrwr)
00497 throw std::logic_error("ibv_post_recv(): Bad rwr");
00498 }
00499
00500 inline void QueuePair::postSend(Buffer* buf) {
00501 ::ibv_send_wr swr = {};
00502 ::ibv_sge sge;
00503
00504 sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
00505 sge.length = buf->dataCount;
00506 sge.lkey = buf->mr->lkey;
00507
00508 swr.wr_id = reinterpret_cast<uint64_t>(buf);
00509 swr.opcode = IBV_WR_SEND;
00510 swr.send_flags = IBV_SEND_SIGNALED;
00511 swr.sg_list = &sge;
00512 swr.num_sge = 1;
00513
00514 ::ibv_send_wr* badswr = 0;
00515 CHECK_IBV(::ibv_post_send(id->qp, &swr, &badswr));
00516 if (badswr)
00517 throw std::logic_error("ibv_post_send(): Bad swr");
00518 }
00519
00520 inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
00521 id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
00522 Connection::find(e->id) : new Connection(e->id)),
00523 listen_id(Connection::find(e->listen_id)),
00524 event(e, acker)
00525 {}
00526 }
00527
00528 inline std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
00529 # define CHECK_TYPE(t) case t: o << #t; break;
00530 switch(t) {
00531 CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
00532 CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
00533 CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
00534 CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
00535 CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
00536 CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
00537 CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
00538 CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
00539 CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
00540 CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
00541 CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
00542 CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
00543 CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
00544 CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
00545 }
00546 # undef CHECK_TYPE
00547 return o;
00548 }
00549
00550 #endif // RDMA_WRAP_H