Sunday, November 11, 2012

[ZeroMQ] Study notes on the ØMQ guide -- Part II

Shared Queue (DEALER and ROUTER sockets)
The only constraint is that services must be stateless, all state being in the request or in some shared storage such as a database.
The broker then uses zmq_poll() to monitor its two sockets for activity, and when there is some, it shuttles messages between them. It doesn't actually manage any queues explicitly; ØMQ does that automatically on each socket.

Built-in Proxy Function
See the msgqueue.c example, which replaces rrbroker.c with the built-in proxy function, zmq_proxy().

Transport Bridging

Handling Errors
In most of the C examples we've seen so far there's been no error handling. Real code should do error handling on every single ØMQ call.
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc != 0) {
    printf ("E: bind failed: %s\n", strerror (errno));
    return -1;
}

We'll use a publish-subscribe model to send kill messages to the workers:
  • The sink creates a PUB socket on a new endpoint.
  • Workers connect their input socket to this endpoint.
  • When the sink detects the end of the batch it sends a kill to its PUB socket.
  • When a worker detects this kill message, it exits.
It doesn't take much new code in the sink:
void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");

//  Send kill signal to workers
//  (static buffer, so no free function or hint is needed)
zmq_msg_t message;
zmq_msg_init_data (&message, "KILL", 5, NULL, NULL);
zmq_msg_send (&message, control, 0);
zmq_msg_close (&message);

Handling Interrupt Signals
s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break;          //  Ctrl-C used
}
zmq_close (client);
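
The snippet above relies on a helper that installs a signal handler and sets a flag. As a rough sketch of how such a helper could be written with POSIX sigaction(), here is a standalone version. The names demo_interrupted and demo_catch_signals are made up for this note and are not the zhelpers.h functions themselves.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <stddef.h>

/* Set to 1 by the handler; sig_atomic_t is safe to write from a signal handler. */
static volatile sig_atomic_t demo_interrupted = 0;

static void demo_signal_handler(int signal_value)
{
    (void) signal_value;        /* unused */
    demo_interrupted = 1;
}

/* Install the handler for SIGINT (Ctrl-C) and SIGTERM.
   Returns 0 on success, -1 on failure. */
static int demo_catch_signals(void)
{
    struct sigaction action;
    action.sa_handler = demo_signal_handler;
    action.sa_flags = 0;        /* no SA_RESTART: blocking calls return early */
    sigemptyset(&action.sa_mask);
    if (sigaction(SIGINT, &action, NULL) != 0)
        return -1;
    return sigaction(SIGTERM, &action, NULL);
}
```

Because SA_RESTART is not set, a blocking call is interrupted when the signal arrives, which is what lets the receive loop notice the flag and break out.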

Detecting Memory Leaks
valgrind: sudo apt-get install valgrind

Multithreading with ØMQ
If you've spent years learning tricks to make your MT code work at all, let alone rapidly, with locks and semaphores and critical sections, you will be disgusted when you realize it was all for nothing. If there's one lesson we've learned from 30+ years of concurrent programming it is: just don't share state.
You should follow some rules to write happy multithreaded code with ØMQ:
  • You must not access the same data from multiple threads. Using classic MT techniques like mutexes is an anti-pattern in ØMQ applications. The only exception to this is a ØMQ context object, which is threadsafe.
  • You must create a ØMQ context for your process, and pass that to all threads that you want to connect via inproc sockets.
  • You may treat threads as separate tasks, with their own context, but these threads cannot communicate over inproc. However they will be easier to break into standalone processes afterwards.
  • You must not share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.

Signaling between Threads (PAIR sockets)
When threads need to coordinate, the only mechanism that you should use is ØMQ messages.
This is a classic pattern for multithreading with ØMQ (the relay race):
  1. Two threads communicate over inproc, using a shared context.
  2. The parent thread creates one socket, binds it to an inproc:// endpoint, and then starts the child thread, passing the context to it.
  3. The child thread creates the second socket, connects it to that inproc:// endpoint, and then signals to the parent thread that it's ready.
mtrelay.c : multithreaded relay in C
For these reasons, PAIR makes the best choice for coordination between pairs of threads.

Node Coordination
syncpub.c: Synchronized publisher in C
  • The publisher knows in advance how many subscribers it expects. This is just a magic number it gets from somewhere.
  • The publisher starts up and waits for all subscribers to connect. This is the node coordination part. Each subscriber subscribes and then tells the publisher it's ready via another socket.
  • When the publisher has all subscribers connected, it starts to publish data.
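
The waiting step in the list above boils down to counting "ready" notifications until the expected number is reached. The following sketch shows only that counting loop, using a plain file descriptor (for example a pipe) as a stand-in for the synchronization socket. It is not the syncpub.c code, and wait_for_subscribers is a hypothetical name.

```c
#include <unistd.h>

/* Block until `expected` one-byte "ready" notifications have arrived on fd.
   Returns the number received, which is less than `expected` only if the
   descriptor hits end-of-file or a read error first. */
static int wait_for_subscribers(int fd, int expected)
{
    int ready = 0;
    while (ready < expected) {
        char token;
        ssize_t n = read(fd, &token, 1);
        if (n <= 0)
            break;              /* EOF or error: stop waiting */
        ready++;
    }
    return ready;
}
```

In a real ØMQ publisher the read would be a receive on the second socket, followed by a reply to the subscriber, and publishing would begin only once the count matches the magic number.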

Zero Copy
ØMQ's message API lets you send and receive messages directly from and to application buffers without copying data. We call this "zero-copy", and it can improve performance in some applications.
There is no way to do zero-copy on receive: ØMQ delivers you a buffer that you can store as long as you wish but it will not write data directly into application buffers.

Pub-Sub Message Envelopes
Pub-Sub Envelope with Separate Key
psenvpub.c: Pub-Sub envelope publisher in C
s_sendmore (publisher, "B");
s_send (publisher, "We would like to see this");
psenvsub.c: Pub-Sub envelope subscriber in C
//Read envelope with address
char *address = s_recv (subscriber);
//Read message contents
char *contents = s_recv (subscriber);
printf ("[%s] %s\n", address, contents);
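
With the key in its own frame, a subscriber's filter is compared against that key only, and ØMQ subscriptions are prefix matches. The sketch below models that comparison in plain C so the matching rule is easy to see. subscription_matches is an illustrative name, not a ØMQ function.

```c
#include <string.h>

/* Return 1 if `key` starts with the subscription `prefix`, else 0.
   An empty prefix matches every key, like subscribing to everything. */
static int subscription_matches(const char *prefix, const char *key)
{
    size_t prefix_len = strlen(prefix);
    return strncmp(key, prefix, prefix_len) == 0;
}
```

A subscriber filtering on "B" would receive the message sent above with key "B" and skip one with key "A". Since the match is a prefix match, a key such as "Bee" would also pass, so keys should be chosen with that in mind.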

High Water Marks
When you can send messages rapidly from process to process, you soon discover that memory is a precious resource, and one that can be trivially filled up.
The answer for messaging is to set limits on the size of buffers, and then when we reach those limits, take some sensible action. In most cases (not for a subway system, though), the answer is to throw away messages. In a few others, it's to wait.
In ØMQ/2.x the HWM was infinite by default. In ØMQ/3.x it's set to 1,000 by default, which is more sensible.
Some sockets (PUB, PUSH) only have transmit buffers. Some (SUB, PULL, REQ, REP) only have receive buffers. Some (DEALER, ROUTER, PAIR) have both transmit and receive buffers.
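
To make the "set a limit, then throw away" policy concrete, here is a toy bounded queue in plain C. It is only a model of the idea, not how ØMQ implements its buffers. DEMO_HWM, demo_queue_t and demo_queue_send are names invented for this sketch, and the limit is tiny for illustration.

```c
#include <stddef.h>

#define DEMO_HWM 3              /* hypothetical high-water mark */

typedef struct {
    const char *items[DEMO_HWM];
    size_t count;               /* messages currently queued */
    size_t dropped;             /* messages discarded at the limit */
} demo_queue_t;

/* Try to queue a message. Returns 1 if it was queued, or 0 if it was
   dropped because the queue had already reached its high-water mark. */
static int demo_queue_send(demo_queue_t *q, const char *msg)
{
    if (q->count >= DEMO_HWM) {
        q->dropped++;
        return 0;
    }
    q->items[q->count++] = msg;
    return 1;
}
```

The alternative policy mentioned above, waiting instead of dropping, would replace the early return with a block until a consumer has removed something from the queue.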