Shared Queue (DEALER and ROUTER sockets)
The only constraint is that services must be stateless, all state being in the request or in some shared storage such as a database.
It then uses zmq_poll[3] to monitor these two sockets for activity and, when there is some, it shuttles messages between its two sockets. It doesn't actually manage any queues explicitly; ØMQ does that automatically on each socket.
Built-in Proxy Function
Please see the example msgqueue.c, which replaces rrbroker.c with the built-in proxy function.
Transport Bridging
Handling Errors
In most of the C examples we've seen so far there's been no error handling. Real code should do error handling on every single ØMQ call.
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc != 0) {
    printf ("E: bind failed: %s\n", strerror (errno));
    return -1;
}
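Repeating the same check after every call gets tedious, so it is common to wrap it. The sketch below assumes a hypothetical helper, check_rc (our name, not part of the ØMQ API), that formats a failure message in the same style as the snippet above. It uses only the C standard library; the rc argument stands in for the return code of a call such as zmq_bind, which returns -1 and sets errno on failure.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

// Hypothetical helper, not part of the ØMQ API.
// rc is the return code of a call that reports failure as -1 and sets errno.
// On failure, writes "E: <what> failed: <reason>" into buf and returns -1.
// On success, leaves buf empty and returns 0.
static int
check_rc (int rc, const char *what, char *buf, size_t buf_size)
{
    int saved_errno = errno;    // Capture before any other call changes it
    assert (what && buf);
    if (buf_size > 0)
        buf [0] = '\0';
    if (rc != -1)
        return 0;
    snprintf (buf, buf_size, "E: %s failed: %s", what, strerror (saved_errno));
    return -1;
}
```

A caller would print buf and return when check_rc reports -1, which keeps each call site down to one or two lines.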
We'll use a publish-subscribe model to send kill messages to the workers:
- The sink creates a PUB socket on a new endpoint.
- Workers connect their input socket to this endpoint.
- When the sink detects the end of the batch, it sends a kill to its PUB socket.
- When a worker detects this kill message, it exits.
void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");
…
// Send kill signal to workers
// "KILL" is a string literal, so no free function or hint is passed
zmq_msg_init_data (&message, "KILL", 5, NULL, NULL);
zmq_msg_send (&message, control, 0);
zmq_msg_close (&message);
Handling Interrupt Signals
s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break; // Ctrl-C used
    free (message); // s_recv returns a heap-allocated string
}
zmq_close (client);
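The snippet above relies on two helpers, s_catch_signals and s_interrupted. As a rough sketch of how such helpers could be written with POSIX sigaction (the real helper header may differ in detail), the handler only sets a flag, and the main loop checks that flag after each blocking call. Leaving SA_RESTART unset is deliberate here: it lets a blocking call return early with EINTR so the loop gets a chance to exit.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L   // Expose sigaction() under strict -std=c99
#endif
#include <assert.h>
#include <signal.h>
#include <stddef.h>

// Set to 1 by the handler; the main loop tests it after each blocking call
static volatile sig_atomic_t s_interrupted = 0;

static void
s_signal_handler (int signal_value)
{
    (void) signal_value;        // Unused: both signals are treated alike
    s_interrupted = 1;
}

// Install the handler for SIGINT (Ctrl-C) and SIGTERM
static void
s_catch_signals (void)
{
    struct sigaction action;
    action.sa_handler = s_signal_handler;
    action.sa_flags = 0;        // No SA_RESTART: blocking calls fail with EINTR
    sigemptyset (&action.sa_mask);
    int rc = sigaction (SIGINT, &action, NULL);
    assert (rc == 0);
    rc = sigaction (SIGTERM, &action, NULL);
    assert (rc == 0);
    (void) rc;                  // Silence unused warning when NDEBUG is set
}
```

Doing nothing in the handler beyond setting a sig_atomic_t flag keeps it async-signal-safe; all cleanup, such as closing sockets, happens in normal program flow.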
Detecting Memory Leaks
To install valgrind, for example on Ubuntu or Debian:
sudo apt-get install valgrind
Multithreading with ØMQ
If you've spent years learning tricks to make your MT code work at all, let alone rapidly, with locks and semaphores and critical sections, you will be disgusted when you realize it was all for nothing. If there's one lesson we've learned from 30+ years of concurrent programming, it is: just don't share state.
You should follow some rules to write happy multithreaded code with ØMQ:
- You must not access the same data from multiple threads. Using classic MT techniques like mutexes is an anti-pattern in ØMQ applications. The only exception to this is a ØMQ context object, which is threadsafe.
- You must create a ØMQ context for your process, and pass that to all threads that you want to connect via inproc sockets.
- You may treat threads as separate tasks, with their own context, but these threads cannot communicate over inproc. However they will be easier to break into standalone processes afterwards.
- You must not share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.
Signaling between Threads (PAIR sockets)
To coordinate threads, the only mechanism that you should use is ØMQ messages.
This is a classic pattern for multithreading with ØMQ (The Relay Race):
- Two threads communicate over inproc, using a shared context.
- The parent thread creates one socket, binds it to an inproc:// endpoint, and then starts the child thread, passing the context to it.
- The child thread creates the second socket, connects it to that inproc:// endpoint, and then signals to the parent thread that it's ready.
mtrelay.c: Multithreaded relay in C
For these reasons, PAIR makes the best choice for coordination between pairs of threads.
Node Coordination
syncpub.c: Synchronized publisher in C
- The publisher knows in advance how many subscribers it expects. This is just a magic number it gets from somewhere.
- The publisher starts up and waits for all subscribers to connect. This is the node coordination part. Each subscriber subscribes and then tells the publisher it's ready via another socket.
- When the publisher has all subscribers connected, it starts to publish data.
Zero Copy
ØMQ's message API lets you send and receive messages directly from and to application buffers without copying data. We call this "zero-copy", and it can improve performance in some applications.
There is no way to do zero-copy on receive: ØMQ delivers you a buffer that you can store as long as you wish, but it will not write data directly into application buffers.
Pub-Sub Message Envelopes
Pub-Sub Envelope with Separate Key
psenvpub.c: Pub-Sub envelope publisher in C
s_sendmore (publisher, "B");
s_send (publisher, "We would like to see this");
psenvsub.c: Pub-Sub envelope subscriber in C
// Read envelope with address
char *address = s_recv (subscriber);
// Read message contents
char *contents = s_recv (subscriber);
printf ("[%s] %s\n", address, contents);
// s_recv returns heap-allocated strings
free (address);
free (contents);
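A SUB socket compares each subscription against the start of the message, so with a separate key frame the match is made against the key and never against the data that follows. The sketch below models that prefix comparison in plain C. The function filter_matches is a hypothetical name used for illustration only; it is not part of the ØMQ API, and ØMQ performs this matching internally.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Hypothetical illustration of prefix matching; not a ØMQ function.
// Returns 1 if the subscription filter is a prefix of the key frame.
// An empty filter (length 0) matches every message.
static int
filter_matches (const char *filter, size_t filter_len,
                const char *key, size_t key_len)
{
    assert (filter && key);
    if (filter_len > key_len)
        return 0;               // Filter is longer than the key: no match
    return memcmp (filter, key, filter_len) == 0;
}
```

Note that a filter of "B" matches the key "B" but would also match a key such as "BANANA", so keys are best chosen so that no key is a prefix of another.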
High Water Marks
When you can send messages rapidly from process to process, you soon discover that memory is a precious resource, and one that can be trivially filled up.
The answer for messaging is to set limits on the size of buffers, and then when we reach those limits, take some sensible action. In most cases (not for a subway system, though), the answer is to throw away messages. In a few others, it's to wait.
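To make the "throw away" policy concrete, here is a minimal sketch of a bounded queue in plain C. It is a model of the idea only, not how ØMQ implements its buffers; the names bounded_queue_t, queue_push, queue_pop, and the limit QUEUE_HWM are all assumptions made for this example.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_HWM 3     // Illustrative limit, far below any realistic HWM

typedef struct {
    int    items [QUEUE_HWM];
    size_t head;        // Index of the oldest queued item
    size_t count;       // Number of items currently queued
    size_t dropped;     // Messages thrown away at the HWM
} bounded_queue_t;

// Returns 1 if the item was queued, 0 if the queue was at its HWM
// and the item was dropped.
static int
queue_push (bounded_queue_t *self, int item)
{
    assert (self);
    if (self->count == QUEUE_HWM) {
        self->dropped++;
        return 0;
    }
    self->items [(self->head + self->count) % QUEUE_HWM] = item;
    self->count++;
    return 1;
}

// Returns 1 and stores the oldest item in *item, or 0 if the queue is empty
static int
queue_pop (bounded_queue_t *self, int *item)
{
    assert (self && item);
    if (self->count == 0)
        return 0;
    *item = self->items [self->head];
    self->head = (self->head + 1) % QUEUE_HWM;
    self->count--;
    return 1;
}
```

The "wait" policy would differ only in queue_push: instead of counting a drop and returning 0, the sender would block until queue_pop frees a slot.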
In ØMQ/2.x the HWM was infinite by default. In ØMQ/3.x it's set to 1,000 by default, which is more sensible.
Some sockets (PUB, PUSH) only have transmit buffers. Some (SUB, PULL, REQ, REP) only have receive buffers. Some (DEALER, ROUTER, PAIR) have both transmit and receive buffers.