- 
                Notifications
    You must be signed in to change notification settings 
- Fork 790
Open
Description
I'm experiencing an unusual issue. Under high load, I'm seeing the first frame of a message that isn't the routing id sometimes vanish. I'm doing a blocking synchronous client using zmq_immediate in conjunction with zmq_sndtimeo. So, if the message can't be sent it returns false after a few tries. Something like this
bool cppzmq_client(zmq::context_t* context, bool dowhile)
{
  const int recvtimeout = 4000;
  static std::atomic<int> id = 0;
  zmq::socket_t m_socket = {*context, ZMQ_DEALER};
  m_socket.set(zmq::sockopt::sndhwm, 0);
  m_socket.set(zmq::sockopt::routing_id, std::to_string(id++));
  m_socket.set(zmq::sockopt::immediate, 1);
  m_socket.set(zmq::sockopt::rcvtimeo, recvtimeout);
  zmq::multipart_t message;
  message.pushstr({});
  message.push_back(zmq::message_t{std::string{"stuff"}});
  message.push_back(zmq::message_t{std::string{"more stuff"}});
  message.push_back(zmq::message_t{std::string{"even more stuff"}});
  bool succeeded = false;
  const int sendtimeout = 1000;
  m_socket.set(zmq::sockopt::sndtimeo, sendtimeout);
  m_socket.connect("ipc://@/zmq-client");
  int retry_count = 0;
  do {
    try {
       succeeded = message.send(m_socket);
      } catch (const std::exception& e) {
        if (zmq_errno() != EINTR && zmq_errno() != EAGAIN) {
          throw;
        }
      }
      retry_count++;
      std::cout << "Retry count: " << retry_count << std::endl;
    } while (!succeeded && retry_count < 20);
    
  if(!succeeded) {
    std::cerr << "Failed to send message to broker: " << zmq_strerror(zmq_errno()) << std::endl;
    return false;
  }
  zmq::multipart_t result;
  bool read_succeeded = result.recv(m_socket);
  return read_succeeded && result.peekstr(0) != "MISSING_FRAME";
}
In the send() function, its popping off the first part of the message, but it never restores the first message to the mulitpart_t if the read fails, which leads to retries failing
 bool send(socket_ref socket, int flags = 0)
    {
        flags &= ~(ZMQ_SNDMORE);
        bool more = size() > 0;
        while (more) {
            message_t message = pop();
            more = size() > 0;
#ifdef ZMQ_CPP11
            if (!socket.send(message, static_cast<send_flags>(
                                        (more ? ZMQ_SNDMORE : 0) | flags)))
                return false;
#else
            if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
                return false;
#endif
        }
        clear();
        return true;
    }
Would it be possible to restore the popped message so a retry can occur?
Metadata
Metadata
Assignees
Labels
No labels