zhujiangtaotaise 2020-02-09
//  Legacy context-creation entry point, implemented on top of
//  zmq_ctx_new () and zmq_ctx_set ().
//
//  io_threads_: value stored in the context's ZMQ_IO_THREADS option;
//               must be non-negative.
//
//  Returns the newly created context, or NULL on failure. A negative
//  io_threads_ sets errno to EINVAL. If zmq_ctx_new () fails, its NULL
//  result is returned unchanged.
void *zmq_init (int io_threads_)
{
    if (io_threads_ < 0) {
        errno = EINVAL;
        return NULL;
    }

    void *ctx = zmq_ctx_new ();

    //  zmq_ctx_new () may return NULL; never hand a NULL context to
    //  zmq_ctx_set (), and leave whatever error state creation produced.
    if (ctx)
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);

    return ctx;
}
// New context API void *zmq_ctx_new (void) { // We do this before the ctx constructor since its embedded mailbox_t // object needs the network to be up and running (at least on Windows). if (!zmq::initialize_network ()) { return NULL; } // Create 0MQ context. zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; /* 这里使用的是不抛出异常的new,因为zmq作为库来说,要与不支持异常的调用者兼容,如C。*/ if (ctx) { if (!ctx->valid ()) { delete ctx; return NULL; } } return ctx; }
//  Default-constructs a context: the tag is set to the "good" value, the
//  context is flagged as starting and not terminating, no reaper is attached
//  yet, and the tunables (socket limit, maximum message size, I/O thread
//  count, blocky, IPv6, zero-copy) receive their initial values.
zmq::ctx_t::ctx_t () :
    _tag (ZMQ_CTX_TAG_VALUE_GOOD),
    _starting (true),
    _terminating (false),
    _reaper (NULL),
    _max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
    _max_msgsz (INT_MAX),
    _io_thread_count (ZMQ_IO_THREADS_DFLT),
    _blocky (true),
    _ipv6 (false),
    _zero_copy (true)
{
#if defined ZMQ_HAVE_VMCI
    //  VMCI support compiled in: start with both VMCI fields set to -1.
    _vmci_family = -1;
    _vmci_fd = -1;
#endif

#if defined HAVE_FORK
    //  Remember the id of the process that constructed this context.
    _pid = getpid ();
#endif

    //  Initialise crypto library, if needed.
    zmq::random_open ();
}
//  Constructs the reaper for context ctx_ with thread id tid_.
//
//  When the reaper's mailbox is valid, a poller is allocated and, if the
//  mailbox exposes a usable file descriptor, that descriptor is registered
//  with the poller for input events. When the mailbox is not valid nothing
//  else is set up: _poller stays NULL and _pid is not recorded.
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_),
    _mailbox_handle (static_cast<poller_t::handle_t> (NULL)),
    _poller (NULL),
    _sockets (0),
    _terminating (false)
{
    if (_mailbox.valid ()) {
        //  On Linux, poller_t is in practice epoll_t.
        _poller = new (std::nothrow) poller_t (*ctx_);
        alloc_assert (_poller);

        //  _mailbox.get_fd () yields the read descriptor of the mailbox
        //  owned by this reaper_t; watch it for incoming data.
        if (_mailbox.get_fd () != retired_fd) {
            _mailbox_handle = _poller->add_fd (_mailbox.get_fd (), this);
            _poller->set_pollin (_mailbox_handle);
        }

#if defined HAVE_FORK
        //  Remember the id of the process that constructed this reaper.
        _pid = getpid ();
#endif
    }
}