Thursday, December 27, 2012

[ZeroMQ] The example of ZeroMQ via C#

Recently I found two articles that give a very good introduction to ZeroMQ and a useful use case for the synchronized PUB-SUB pattern. Although they use C# rather than C, I still think we can learn some important concepts from them. The articles also provide very good images to illustrate the communication patterns.

http://www.codeproject.com/Articles/488207/ZeroMQ-via-Csharp-Introduction
  • There are several communication patterns described

http://www.codeproject.com/Articles/514959/ZeroMQ-via-Csharp-Multi-part-messages-JSON-and-Syn
  • Multi-part messages
  • Synchronized Pub-Sub pattern using PUB-SUB + REQ-REP





Thursday, December 13, 2012

[SWIG] How to add C function in Python by using SWIG

Here is a simple example of using SWIG to wrap C functions automatically: it generates the wrapper code, which is then built into a shared library for Python.

/*** File : example.c ***/
#include <time.h>
double My_variable = 3.0;
int fact(int n) {
if (n <= 1) return 1;
else return n*fact(n-1);
}
int my_mod(int x, int y) {
return (x%y);
}
char *get_time()
{
time_t ltime;
time(&ltime);
return ctime(&ltime);
}
/*************************/


/*** example.i ***/
%module example
%{
/* Put header files here or function declarations like below */
extern double My_variable;
extern int fact(int n);
extern int my_mod(int x, int y);
extern char *get_time();
%}
extern double My_variable;
extern int fact(int n);
extern int my_mod(int x, int y);
extern char *get_time();
/******************/


Steps:
1. $> swig -python example.i
2. $> gcc -c -fpic example.c example_wrap.c -I/usr/include/python2.7 -I/usr/lib/python2.7/config
3. $> ld -shared example.o example_wrap.o -o _example.so
4. $> python
>>> import example
>>> example.fact(5)
120 -> Oh, yes!

Here is another example:

Basically, you just have to write an interface file (*.i) that includes the headers you need. For example, example.i:
  %module example
  %{
  /* Includes the header in the wrapper code */
  #include "header.h"
  %}

  /* Parse the header file to generate wrappers */
  %include "header.h"
Then use SWIG to generate a wrapper, compile it with your C++ code, and you're done (with -c++, SWIG writes the wrapper to example_wrap.cxx):
$ swig -c++ -python example.i
$ g++ -c -fPIC example.cc example_wrap.cxx \
    -I/usr/local/include/python2.1
$ g++ -shared example.o example_wrap.o -o _example.so
Now you have your Python module:
>>> import example
>>> example.foo()


What if your C function has pointer arguments? How do you deal with them in Python?

For example:
void add(double a, double b, double *result) {
 *result = a + b;
}

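Please refer to the following document, which describes the typemaps SWIG provides for pointer arguments. For example, putting %include "typemaps.i" and %apply double *OUTPUT { double *result }; in the interface file before the declaration of add turns result into a return value, so Python can call example.add(3, 4) and get 7.0 back.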
http://www.swig.org/Doc1.3/Arguments.html




Wednesday, December 12, 2012

[Trema] The L2 isolation mechanism in sliceable switch

Anyone who has read the sliceable switch documents below will probably get a headache from the sheer amount of content and description.
https://github.com/trema/apps/wiki/sliceable_switch_tutorial
https://github.com/trema/apps/wiki/sliceable_switch_features

Here is a flow chart of the slice function, summarized from the source code ( slice.c ). It should give you a clear picture of the L2 isolation mechanism in the sliceable switch, especially the slice function. See the following chart:

Broadly speaking, the slice function checks the MAC binding first, then the port_mac binding, and finally the port binding. Some configuration options also affect the result: for instance, enabling "restrict hosts on port" forces the slice function to check the port_mac binding; otherwise that check is skipped.
With this flow chart in hand, you can work through the test cases in https://github.com/trema/apps/wiki/sliceable_switch_features







Monday, December 10, 2012

[MongoDB] Install MongoDB and try a simple example of mongodb_c_driver

MongoDB Installation
http://docs.mongodb.org/manual/tutorial/install-mongodb-on-debian-or-ubuntu-linux/
or
https://www.digitalocean.com/community/articles/how-to-install-mongodb-on-ubuntu-12-04
For instance, in my environment:
  > sudo apt-key adv --keyserver keyserver.ubuntu.com --recv 7F0CEB10
  > echo "deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen" | sudo tee -a /etc/apt/sources.list.d/10gen.list
  > sudo apt-get -y update
  > sudo apt-get -y install mongodb-10gen

Try some commands in the MongoDB shell
> mongo
MongoDB shell version: 2.2.2
connecting to: test

> show dbs
db    (empty)
local    (empty)
test    0.203125GB

 > use test
switched to db test

> show collections
foo
system.indexes

### Insert new data ###
> db.foo.save({a:1})
> doc = {
... "name" : "kristina",
... "contact info" : {
... "twitter" : "@kchodorow",
... "email" : "kristina@10gen.com"
... },
... "friends" : 400232,
... "pic" : BinData(...),
... "member since" : new Date()}
> db.foo.insert(doc)
> db.foo.save({a:2})
> db.foo.save({a:3})

### Find the data ###
> db.foo.find()
{ "_id" : ObjectId("50c592540770027c182d31b9"), "a" : 1 }
{ "_id" : ObjectId("50c59e030770027c182d31ba"), "name" : "kristina", "contact info" : { "twitter" : "@kchodorow", "email" : "kristina@10gen.com" }, "friends" : 400232, "member since" : ISODate("2012-12-10T08:30:50.389Z") }
{ "_id" : ObjectId("50c59e1f0770027c182d31bb"), "a" : 2 }
{ "_id" : ObjectId("50c59e220770027c182d31bc"), "a" : 3 }

> db.foo.findOne()
{ "_id" : ObjectId("50c592540770027c182d31b9"), "a" : 1 }
> db.foo.find({"a":1})
{ "_id" : ObjectId("50c592540770027c182d31b9"), "a" : 1 }
> db.foo.find({"name":"kristina"})
{ "_id" : ObjectId("50c59e030770027c182d31ba"), "name" : "kristina", "contact info" : { "twitter" : "@kchodorow", "email" : "kristina@10gen.com" }, "friends" : 400232, "member since" : ISODate("2012-12-10T08:30:50.389Z") }

Try a simple example of the MongoDB C driver
For more details on the API, please refer to: http://api.mongodb.org/c/current/tutorial.htm
 

> gcc --std=c99 -I/usr/local/include -L/usr/local/lib -o mongodb_test mongodb_test.c -lmongoc
> ./mongodb_test
WARNING: mongo_connect() is deprecated, please use mongo_client()
MONGO_OK:connection succeeded
0


mongodb_test.c (source code)

#include <stdio.h>
#include "mongo.h"

int main() {
    mongo conn[1];
    int status;

    status = mongo_connect( conn, "127.0.0.1", 27017 );
    if( status != MONGO_OK ) {
        switch ( conn->err ) {
            case MONGO_CONN_SUCCESS:    printf( "connection succeeded\n" ); break;
            //case MONGO_CONN_BAD_ARG:  printf( "bad arguments\n" ); return 1;
            case MONGO_CONN_NO_SOCKET:  printf( "no socket\n" ); return 1;
            case MONGO_CONN_FAIL:       printf( "connection failed\n" ); return 1;
            case MONGO_CONN_NOT_MASTER: printf( "not master\n" ); return 1;
        }
    }else{
        printf( "MONGO_OK:connection succeeded\n%d\n", status );
    }
    mongo_destroy( conn );
    return 0;
}


Sunday, December 9, 2012

[Memcached] Install memcached and try libmemcached C API


Install from package
> sudo apt-get install memcached

or Install from source code
We need to have :
  1. libevent downloaded from : http://libevent.org/ 
    • > ./configure --prefix=/usr
    • > make
    • > sudo make install
  2. memcached downloaded from : http://memcached.org/
    • > ./configure --prefix=/usr/local
    • > make
    • > sudo make install
Check the status of memcached
 > sudo service memcached status


Install libmemcached C API from source code

  1. libmemcached C API downloaded from http://libmemcached.org/libMemcached.html
    • > ./configure --prefix=/usr
    • > make 
    • > sudo make install

Try the libmemcached C API with a simple program
> gcc -o mem_test2 mem_test2.c -lmemcached -lpthread
> ./mem_test2
Save key:key1 data:"This is c first value" success.
Fetch key:key1 data:This is c first value
Delete Key key1 success.

mem_test2.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libmemcached/memcached.h>

int main(int argc, char *argv[])
{
    memcached_st *memc;
    memcached_return rc;
    memcached_server_st *servers;

    //connect multi server
    memc = memcached_create(NULL);
    servers = memcached_server_list_append(NULL, "localhost", 11211, &rc);
    servers = memcached_server_list_append(servers, "localhost", 11212, &rc);
    rc = memcached_server_push(memc, servers);
    memcached_server_free(servers);

    //Save multi data
    size_t i;
    const char *keys[] = {"key1", "key2", "key3"};
    size_t key_length[] = {4, 4, 4};
    const char *values[] = {"This is c first value", "This is c second value", "This is c third value"};
    size_t val_length[] = {21, 22, 21};
    for (i = 0; i < 3; i++) {
        rc = memcached_set(memc, keys[i], key_length[i], values[i], val_length[i], (time_t) 180, (uint32_t) 0);
        if (rc == MEMCACHED_SUCCESS) {
            printf("Save key:%s data:\"%s\" success.\n", keys[i], values[i]);
        }
    }

    //Fetch multi data
    char return_key[MEMCACHED_MAX_KEY];
    size_t return_key_length;
    char *return_value;
    size_t return_value_length;
    uint32_t flags;
    rc = memcached_mget(memc, keys, key_length, 3);
    while ((return_value = memcached_fetch(memc, return_key, &return_key_length, &return_value_length, &flags, &rc))) {
        if (rc == MEMCACHED_SUCCESS) {
            //print the key by length instead of relying on a terminating null
            printf("Fetch key:%.*s data:%s\n", (int) return_key_length, return_key, return_value);
        }
        free(return_value);   //the fetched value is allocated by the library; the caller frees it
    }

    //Delete multi data
    for (i = 0; i < 3; i++) {
        //set the key again first so that there is always something to delete
        rc = memcached_set(memc, keys[i], key_length[i], values[i], val_length[i], (time_t) 180, (uint32_t) 0);
        rc = memcached_delete(memc, keys[i], key_length[i], (time_t) 0);
        if (rc == MEMCACHED_SUCCESS) {
            printf("Delete %s success\n", keys[i]);
        }
    }

    //free
    memcached_free(memc);
    return 0;
}



Tuesday, December 4, 2012

[Presentation] OpenStack 2012 fall summit observation - Quantum/SDN

The 3rd Taiwan OpenStack User Group (TWOSUG) meetup will be held on Dec 5, 2012. I will give a presentation in one of the sessions, titled "OpenStack 2012 fall summit observation - Quantum/SDN". The talk focuses on Quantum and SDN, and the slides are shared on SlideShare:
http://www.slideshare.net/teyenliu/open-stack-2012-fall-summit-observation-with-quantumsdn-15493510

Friday, November 23, 2012

[iptables] some common examples of iptables rule


  • List the rules (filter table) without DNS lookup
    • > iptables -L -n
  • List the rules with their line numbers
    • > iptables -L -nv --line-numbers
  • List the NAT table without DNS lookup
    • > iptables -t nat -L -n
  • Do NAT ( SNAT )
    • > echo "1" > /proc/sys/net/ipv4/ip_forward
    • > iptables -t nat -A POSTROUTING -s ${INSIDE_NETWORK}/${INSIDE_NETMASK} -o ${OUTSIDE_DEVICE} -j MASQUERADE
    • or > iptables -t nat -A POSTROUTING -s ${INSIDE_NETWORK}/${INSIDE_NETMASK} -o ${OUTSIDE_DEVICE} -j SNAT --to ${TARGET_IP}
  • Do DNAT
    • > iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80 -j DNAT --to-destination 192.168.100.10:80
  • Drop packets from 192.168.2.20 to 192.168.1.100 on TCP port 80 (filtering belongs in the filter table, so the rule goes on the FORWARD chain rather than in the nat table)
    • > iptables -A FORWARD -s 192.168.2.20 -d 192.168.1.100 -p tcp --dport 80 -j DROP
  • Accept packets from 192.168.100.0/24 that arrive on interface eth1
    • > iptables -A INPUT -i eth1 -s 192.168.100.0/24 -j ACCEPT
  • Insert a logging rule just before the last rule (the one that drops packets); something like this does the trick
    • > iptables -I INPUT (next-to-the-last rule number) -j LOG --log-prefix "blocked packets : "


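iptables [-AI chain] [-io interface] [-p protocol] \
> [-s source IP/network] [-d destination IP/network] -j [ACCEPT|DROP|REJECT|LOG]
Options and parameters:

-S: list the rules
-t: select the table ( nat / filter ); without -t the default is filter
-AI chain: "insert" or "append" a rule on the given chain
    -A : append a new rule after the existing ones. For example, if there are already
         four rules, -A adds a fifth.
    -I : insert a rule. If no position is given, the rule becomes the first one.
         For example, with four existing rules, the new rule becomes rule 1 and the original four become rules 2-5.
    chain: INPUT, OUTPUT, FORWARD, etc. The chain name is also related to -io; see below.

-io interface: set the interface through which packets enter or leave
    -i : the interface the packet arrives on, e.g. eth0 or lo. Used with the INPUT chain;
    -o : the interface the packet leaves through. Used with the OUTPUT chain;

-p protocol: set which kind of packet this rule applies to
   The main ones are: tcp, udp, icmp, and all

-s source IP/network: set the source of the packets this rule applies to; either a single IP or a network, e.g.:
   IP: 192.168.0.100
   Network: 192.168.0.0/24 or 192.168.0.0/255.255.255.0
   To negate the match, add !, for example:
   -s ! 192.168.100.0/24 matches packets that do not come from 192.168.100.0/24;

-d destination IP/network: same as -s, except that it refers to the destination IP or network.

-j : followed by the action; the main actions are ACCEPT, DROP, REJECT, and LOG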

iptables -L -n -v -x

iptables -N TRAFFIC_ACCT
iptables -I FORWARD -j TRAFFIC_ACCT
iptables -D FORWARD -j TRAFFIC_ACCT
iptables -X TRAFFIC_ACCT


iptables -A TRAFFIC_ACCT -p tcp
iptables -A TRAFFIC_ACCT -p udp
iptables -A TRAFFIC_ACCT -p icmp

Monday, November 19, 2012

[Mongrel2] How to write a handler for mongrel2 web server

Mongrel2 is an application, language, and network architecture agnostic web server that focuses on web applications. Its most powerful feature is the Handler, which lets your application deal with requests as ZeroMQ messages.
There are already some articles about handlers and how to get started, for instance:
http://www.ioncannon.net/programming/1384/example-mongrel2-handler-in-ruby/
http://brubeck.io/demos.html

It took me almost two days of study to understand how handlers are used and to write a simple handler that responds to a request, but I finally got it working. So, in this article, I will explain the concept behind the Mongrel2 web server and give a simple example.


  • From this diagram, you can quickly see how the Mongrel2 web server works with your handler and which ZeroMQ socket types it uses.
  • Second, here is the handler example in detail:
The conf file ==> mongrel2.conf
brubeck_handler = Handler(
    send_spec='tcp://127.0.0.1:9999',
    send_ident='34f9ceee-cd52-4b7f-b197-88bf2f0ec378',
    recv_spec='tcp://127.0.0.1:9998',
    recv_ident='')

media_dir = Dir(
    base='media/',
    index_file='index.html',
    default_ctype='text/plain')

brubeck_host = Host(
    name="localhost",
    routes={
        '/media/': media_dir,
        '/handlers': brubeck_handler})

brubeck_serv = Server(
    uuid="f400bf85-4538-4f7a-8908-67e313d515c2",
    access_log="/log/mongrel2.access.log",
    error_log="/log/mongrel2.error.log",
    chroot="./",
    default_host="localhost",
    name="brubeck test",
    pid_file="/run/mongrel2.pid",
    port=6767,
    hosts = [brubeck_host]
)

settings = { "zeromq.threads": 1 }

servers = [brubeck_serv]


Load the conf file to mongrel2
> m2sh load -config mongrel2.conf -db the.db

Run the mongrel2 web server
> m2sh start -db the.db -host localhost

Run the simple handler source code ( in Python ) ==> simple_handler.py
> python simple_handler.py

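#!/usr/bin/env python
# Written for Python 2, where recv() returns a str.
import zmq

ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)    # requests from Mongrel2 (send_spec)
pull.connect("tcp://127.0.0.1:9999")
pub = ctx.socket(zmq.PUB)      # responses back to Mongrel2 (recv_spec)
pub.connect("tcp://127.0.0.1:9998")

while True:
    msg = pull.recv()
    print(msg)
    # Request format: UUID ID PATH SIZE:HEADERS,SIZE:BODY,
    sender_uuid, client_id, request_path, request_message = msg.split(" ", 3)
    ret_content = "Hi...This is Danny..."
    # Response format: UUID SIZE:ID, BODY (SIZE is the length of the ID field)
    ret_msg = '%s %d:%s, HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s' % (
        sender_uuid, len(client_id), client_id, len(ret_content), ret_content)
    pub.send(ret_msg, 0)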




Use curl to test it; you should get the following result:
> curl http://localhost:6767/handlers
Hi...This is Danny...
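
To make the message format used by the handler more concrete, here is a minimal sketch that parses a request and builds a reply with the Python standard library only (no ZeroMQ involved). It assumes the format described in the Mongrel2 manual, where a request looks like UUID ID PATH SIZE:HEADERS,SIZE:BODY, and a response looks like UUID SIZE:ID, BODY. The function names and the sample request are made up for illustration.

```python
def parse_netstring(data):
    """Split b'LEN:payload,rest' into (payload, rest)."""
    length, _, rest = data.partition(b":")
    size = int(length)
    if rest[size:size + 1] != b",":
        raise ValueError("netstring is not terminated by a comma")
    return rest[:size], rest[size + 1:]


def parse_request(msg):
    """Parse b'UUID ID PATH SIZE:HEADERS,SIZE:BODY,' into its five fields."""
    sender, conn_id, path, rest = msg.split(b" ", 3)
    headers, rest = parse_netstring(rest)
    body, _ = parse_netstring(rest)
    return sender, conn_id, path, headers, body


def build_response(sender, conn_id, body):
    """Build b'UUID SIZE:ID, HTTP-response' addressed to one connection."""
    http = b"HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n%s" % (len(body), body)
    return b"%s %d:%s, %s" % (sender, len(conn_id), conn_id, http)


# Hypothetical request, shaped like what the PULL socket would receive.
sample_headers = b'{"METHOD":"GET"}'
sample = b"34f9ceee 12 /handlers %d:%s,0:," % (len(sample_headers), sample_headers)
sender, conn_id, path, headers, body = parse_request(sample)
reply = build_response(sender, conn_id, b"Hi...This is Danny...")
print(reply)
```

The length prefix in front of the connection ID matters: a hard-coded 1 only works while the ID is a single digit, which is why the sketch computes it from the ID itself.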



Sunday, November 11, 2012

[ZeroMQ] Study notes on the ØMQ guide -- Part II

Shared Queue ( DEALER and ROUTER sockets)
The only constraint is that services must be stateless, all state being in the request or in some shared storage such as a database.
The broker uses zmq_poll(3) to monitor its two sockets for activity and, when there is some, it shuttles messages between them. It doesn't actually manage any queues explicitly; ØMQ does that automatically on each socket.

Built-in Proxy Function
Please see the example msgqueue.c, which replaces rrbroker.c with the built-in proxy function.

Transport Bridging
fig18.png

Handling Errors
In most of the C examples we've seen so far there's been no error handling. Real code should do error handling on every single ØMQ call.
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc != 0) {
    printf ("E: bind failed: %s\n", strerror (errno));
    return -1;
}

We'll use a publish-subscribe model to send kill messages to the workers:
  • The sink creates a PUB socket on a new endpoint.
  • Workers connect their input socket to this endpoint.
  • When the sink detects the end of the batch it sends a kill to its PUB socket.
  • When a worker detects this kill message, it exits.
It doesn't take much new code in the sink:
void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");

//Send kill signal to workers
zmq_msg_t message;
zmq_msg_init_data (&message, "KILL", 5, NULL, NULL);
zmq_msg_send (&message, control, 0);
zmq_msg_close (&message);

Handling Interrupt Signals
s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break;          //  Ctrl-C used
}
zmq_close (client);

Detecting Memory Leaks
valgrind: sudo apt-get install valgrind

Multithreading with ØMQ
If you've spent years learning tricks to make your MT code work at all, let alone rapidly, with locks and semaphores and critical sections, you will be disgusted when you realize it was all for nothing. If there's one lesson we've learned from 30+ years of concurrent programming it is: just don't share state.
You should follow some rules to write happy multithreaded code with ØMQ:
  • You must not access the same data from multiple threads. Using classic MT techniques like mutexes is an anti-pattern in ØMQ applications. The only exception to this is a ØMQ context object, which is threadsafe.
  • You must create a ØMQ context for your process, and pass that to all threads that you want to connect via inproc sockets.
  • You may treat threads as separate tasks, with their own context, but these threads cannot communicate over inproc. However they will be easier to break into standalone processes afterwards.
  • You must not share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to do this, but it demands semaphores, locks, or mutexes. This will make your application slow and fragile. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.

Signaling between Threads ( PAIR sockets )
To signal between threads, the only mechanism that you should use is ØMQ messages.
This is a classic pattern for multithreading with ØMQ ( The Relay Race ) :
  1. Two threads communicate over inproc, using a shared context.
  2. The parent thread creates one socket, binds it to an inproc:// endpoint, and then starts the child thread, passing the context to it.
  3. The child thread creates the second socket, connects it to that inproc:// endpoint, and then signals to the parent thread that it's ready.
fig21.png
mtrelay.c : multithreaded relay in C
PAIR sockets make the best choice for coordination between pairs of threads.

Node Coordination
syncpub.c: Synchronized publisher in C
  • The publisher knows in advance how many subscribers it expects. This is just a magic number it gets from somewhere.
  • The publisher starts up and waits for all subscribers to connect. This is the node coordination part. Each subscriber subscribes and then tells the publisher it's ready via another socket.
  • When the publisher has all subscribers connected, it starts to publish data.
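
The handshake in the three points above can be simulated in a single process. The sketch below is only an illustration of the idea, not syncpub.c: threads stand in for nodes, queue.Queue objects stand in for the sockets, and every name in it is an assumption. As a simplification, each subscriber's ready message carries its own inbox, whereas the real example uses a separate socket for the ready signal.

```python
import queue
import threading


def publisher(expected, ready, data):
    """Wait for `expected` subscribers, then publish every item to all of them."""
    # Blocks until every expected subscriber has reported in.
    inboxes = [ready.get() for _ in range(expected)]
    for item in data:
        for inbox in inboxes:
            inbox.put(item)
    for inbox in inboxes:
        inbox.put(None)  # end-of-stream marker


def subscriber(ready, received):
    """Report readiness, then collect everything that is published."""
    inbox = queue.Queue()
    ready.put(inbox)  # "I am subscribed and ready"
    while True:
        item = inbox.get()
        if item is None:
            break
        received.append(item)


def run(expected=3, data=("a", "b", "c")):
    ready = queue.Queue()
    results = [[] for _ in range(expected)]
    threads = [threading.Thread(target=publisher, args=(expected, ready, data))]
    threads += [threading.Thread(target=subscriber, args=(ready, r)) for r in results]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


print(run())
```

Because the publisher does not send anything until all expected subscribers have reported in, no subscriber misses the start of the stream, whatever order the threads start in.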

Zero Copy
ØMQ's message API lets you send and receive messages directly from and to application buffers without copying data. We call this "zero-copy", and it can improve performance in some applications.
There is no way to do zero-copy on receive: ØMQ delivers you a buffer that you can store as long as you wish but it will not write data directly into application buffers.

Pub-Sub Message Envelopes
Pub-Sub Envelope with Separate Key
fig23.png
psenvpub.c: Pub-Sub envelope publisher in C
s_sendmore (publisher, "B");
s_send (publisher, "We would like to see this");
psenvsub.c: Pub-Sub envelope subscriber in C
//Read envelope with address
char *address = s_recv (subscriber);
//Read message contents
char *contents = s_recv (subscriber);
printf ("[%s] %s\n", address, contents);
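
To show why the key travels in its own frame, here is a small sketch of the matching step, assuming (as ØMQ subscriptions do) that a subscription is a prefix match against the first frame of a message. It uses plain Python lists in place of sockets; the deliver function and the extra "A" message are made up for contrast.

```python
def deliver(subscriptions, messages):
    """Return the messages whose first frame (the key) starts with a subscribed prefix."""
    return [frames for frames in messages
            if any(frames[0].startswith(prefix) for prefix in subscriptions)]


published = [
    [b"A", b"We don't want to see this"],   # assumed extra message, for contrast
    [b"B", b"We would like to see this"],
]

for address, contents in deliver([b"B"], published):
    print("[%s] %s" % (address.decode(), contents.decode()))
```

Since only the key frame is matched, the data frame can never trigger a subscription by accident, which is the point of the envelope.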

High Water Marks
When you can send messages rapidly from process to process, you soon discover that memory is a precious resource, and one that can be trivially filled up.
The answer for messaging is to set limits on the size of buffers, and then when we reach those limits, take some sensible action. In most cases (not for a subway system, though), the answer is to throw away messages. In a few others, it's to wait.
In ØMQ/2.x the HWM was infinite by default. In ØMQ/3.x it's set to 1,000 by default, which is more sensible.
Some sockets (PUB, PUSH) only have transmit buffers. Some (SUB, PULL, REQ, REP) only have receive buffers. Some (DEALER, ROUTER, PAIR) have both transmit and receive buffers.
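
The two reactions to a full buffer mentioned above (throw the message away, or wait) can be sketched with a bounded queue from the standard library. This is only an analogy, not how ØMQ implements its buffers; the send function and the tiny limit of 3 are assumptions for illustration.

```python
import queue


def send(buffer, message, drop_when_full=True):
    """Queue `message`; return False if it was dropped because the buffer is full."""
    try:
        # block=False raises queue.Full at the limit; block=True waits for room instead.
        buffer.put(message, block=not drop_when_full)
        return True
    except queue.Full:
        return False


hwm = 3                                  # assumed tiny limit, for illustration only
buffer = queue.Queue(maxsize=hwm)
accepted = [send(buffer, n) for n in range(5)]
print(accepted)                          # [True, True, True, False, False]
```

With drop_when_full=False the same call would wait until a reader makes room, which corresponds to the "in a few others, it's to wait" case.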

Friday, November 9, 2012

[ZeroMQ] Study notes on the ØMQ guide -- Part I

I have introduced ZeroMQ before as a very powerful tool for turning your application into a distributed system. If you look through http://zguide.zeromq.org/page:all, you will realize there is a lot of material that needs to be studied in detail. For that reason, I will summarize what I have studied so far and give some notes on the important concepts.
Request-Reply pattern:
The REQ-REP socket pair is lockstep. The client does zmq_msg_send(3) and then zmq_msg_recv(3), in a loop.
Both the client and the server create a ØMQ context to work with, and a socket.
If you kill the server (Ctrl-C) and restart it, the client won't recover properly.

Take care of string in C:
When you receive string data from ØMQ, in C, you simply cannot trust that it's safely terminated. Every single time you read a string you should allocate a new buffer with space for an extra byte, copy the string, and terminate it properly with a null.
So let's establish the rule that ØMQ strings are length-specified, and are sent on the wire without a trailing null.

Version Reporting:
int major, minor, patch;
zmq_version (&major, &minor, &patch);
printf ("Current 0MQ version is %d.%d.%d\n", major, minor, patch);

Publish-Subscribe Pattern:
  • A subscriber can connect to more than one publisher, using one 'connect' call each time. Data will then arrive and be interleaved ("fair-queued") so that no single publisher drowns out the others.
  • If a publisher has no connected subscribers, then it will simply drop all messages.
  • If you're using TCP, and a subscriber is slow, messages will queue up on the publisher. We'll look at how to protect publishers against this, using the "high-water mark" later.
  • In the current versions of ØMQ, filtering happens at the subscriber side, not the publisher side. This means, over TCP, that a publisher will send all messages to all subscribers, which will then drop messages they don't want.
Note that when you use a SUB socket you must set a subscription using zmq_setsockopt(3) and ZMQ_SUBSCRIBE.
The PUB-SUB socket pair is asynchronous. The client does zmq_msg_recv(3), in a loop.
The subscriber will always miss the first messages that the publisher sends, because while the subscriber is connecting to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out. The guide offers two ways to deal with this:
1. Chapter Two of the guide explains how to synchronize a publisher and subscribers so that you don't start to publish data until the subscriber(s) really are connected and ready.
2. The alternative to synchronization is to simply assume that the published data stream is infinite and has no start, and no end. This is how the guide's weather client example is built.
You can stop and restart the server as often as you like, and the client will keep working.

Divide and Conquer
We have to synchronize the start of the batch with all workers being up and running.
If you don't synchronize the start of the batch somehow, the system won't run in parallel at all.
The ventilator's PUSH socket distributes tasks to workers evenly. This is called load-balancing and it's something we'll look at again in more detail.
The sink's PULL socket collects results from workers evenly. This is called fair-queuing.
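
Both terms boil down to taking turns. The following sketch imitates the two behaviours with plain lists, assuming every worker is always ready; it is a toy model rather than what ØMQ does internally, and the function names are made up.

```python
from collections import deque
from itertools import cycle


def load_balance(tasks, workers):
    """Hand out tasks to the workers in turn, as the ventilator's PUSH socket does."""
    assigned = {worker: [] for worker in workers}
    for task, worker in zip(tasks, cycle(workers)):
        assigned[worker].append(task)
    return assigned


def fair_queue(sources):
    """Take one result from each source in turn, as the sink's PULL socket does."""
    pending = [deque(source) for source in sources]
    merged = []
    while any(pending):              # an empty deque counts as False
        for results in pending:
            if results:
                merged.append(results.popleft())
    return merged


print(load_balance(range(5), ["w1", "w2"]))
print(fair_queue([[1, 2, 3], [10], [20, 21]]))
```

In the second call, the busy first source cannot crowd out the other two: each source contributes at most one result per round.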

Programming with 0MQ
Sockets are not threadsafe. It became legal behavior to migrate sockets from one thread to another in ØMQ/2.1, but this remains dangerous unless you use a "full memory barrier".
Every ØMQ application starts by creating a context with the zmq_ctx_new(3) call. You should create and use exactly one context in your process.
Do one zmq_ctx_new(3) at the start of your main line code, and one zmq_ctx_destroy(3) at the end.
If you're using the fork() system call, each process needs its own context.
  • Always close a message the moment you are done with it, using zmq_msg_close(3).
  • If you are opening and closing a lot of sockets, that's probably a sign you need to redesign your application.
  • When you exit the program, close your sockets and then call zmq_ctx_destroy(3). This destroys the context.
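To shut down a multithreaded application cleanly:
First, do not try to use the same socket from multiple threads.
Next, you need to shut down each socket that has ongoing requests.
Finally, destroy the context. This makes any blocking receive, poll, or send in threads that share the context return with an error. Catch that error, then set linger on, close the sockets in that thread, and exit. Do not destroy the same context twice.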

Brokers are an excellent thing in reducing the complexity of large networks. But adding broker-based messaging to a product like Zookeeper would make it worse, not better. It would mean adding an additional big box, and a new single point of failure. A broker rapidly becomes a bottleneck and a new risk to manage.

And this is ØMQ: an efficient, embeddable library that solves most of the problems an application needs to become nicely elastic across a network, without much cost.
  • It handles I/O asynchronously, in background threads. These communicate with application threads using lock-free data structures, so concurrent ØMQ applications need no locks, semaphores, or other wait states.
  • Components can come and go dynamically and ØMQ will automatically reconnect. This means you can start components in any order. You can create "service-oriented architectures" (SOAs) where services can join and leave the network at any time.
  • It queues messages automatically when needed. It does this intelligently, pushing messages as close as possible to the receiver before queuing them.
  • It has ways of dealing with over-full queues (called "high water mark"). When a queue is full, ØMQ automatically blocks senders, or throws away messages, depending on the kind of messaging you are doing (the so-called "pattern").
  • It lets your applications talk to each other over arbitrary transports: TCP, multicast, in-process, inter-process. You don't need to change your code to use a different transport.
  • It handles slow/blocked readers safely, using different strategies that depend on the messaging pattern.
  • It lets you route messages using a variety of patterns such as request-reply and publish-subscribe. These patterns are how you create the topology, the structure of your network.
  • It lets you create proxies to queue, forward, or capture messages with a single call. Proxies can reduce the interconnection complexity of a network.
  • It delivers whole messages exactly as they were sent, using a simple framing on the wire. If you write a 10k message, you will receive a 10k message.
  • It does not impose any format on messages. They are blobs of zero to gigabytes large. When you want to represent data you choose some other product on top, such as Google's protocol buffers, XDR, and others.
  • It handles network errors intelligently. Sometimes it retries, sometimes it tells you an operation failed.
  • It reduces your carbon footprint. Doing more with less CPU means your boxes use less power, and you can keep your old boxes in use for longer. Al Gore would love ØMQ.

Missing Message Problem Solver
fig9.png
The main change in 3.x is that PUB-SUB works properly, as in, the publisher only sends subscribers stuff they actually want. In 2.x, publishers send everything and the subscribers filter.
Most of the API is backwards compatible, except a few blockheaded changes that went into 3.0 with no real regard to the cost of breaking existing code. The syntax of zmq_send(3) and zmq_recv(3) changed, and ZMQ_NOBLOCK got rebaptised to ZMQ_DONTWAIT.
So the minimal change for C/C++ apps that use the low-level libzmq API is to replace all calls to zmq_send with zmq_msg_send, and zmq_recv with zmq_msg_recv.

Plugging Sockets into the Topology
So mostly it's obvious which node should be doing zmq_bind(3) (the server) and which should be doing zmq_connect(3) (the client).
A server node can bind to many endpoints and it can do this using a single socket. This means it will accept connections across different transports:
zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");

Using Sockets to Carry Data
Let's look at the main differences between TCP sockets and ØMQ sockets when it comes to carrying data:
  • ØMQ sockets carry messages, rather than bytes (as in TCP) or frames (as in UDP). A message is a length-specified blob of binary data. We'll come to messages shortly, their design is optimized for performance and thus somewhat tricky to understand.
  • ØMQ sockets do their I/O in a background thread. This means that messages arrive in a local input queue, and are sent from a local output queue, no matter what your application is busy doing. These are configurable memory queues, by the way.
  • ØMQ sockets can, depending on the socket type, be connected to (or from, it's the same) many other sockets. Where TCP emulates a one-to-one phone call, ØMQ implements one-to-many (like a radio broadcast), many-to-many (like a post office), many-to-one (like a mail box), and even one-to-one.
  • ØMQ sockets can send to many endpoints (creating a fan-out model), or receive from many endpoints (creating a fan-in model).

Unicast Transports
ØMQ provides a set of unicast transports (inproc, ipc, and tcp) and multicast transports (epgm, pgm). The inter-process transport, ipc, is like tcp except that it is abstracted from the LAN, so you don't need to specify IP addresses or domain names.
It has one limitation: it does not work on Windows. This may be fixed in future versions of ØMQ.
The inter-thread transport, inproc, is a connected signaling transport. It is much faster than tcp or ipc. This transport has a specific limitation compared to ipc and tcp: you must do bind before connect.

ØMQ is not a neutral carrier
ØMQ imposes a framing on the transport protocols it uses. This framing is not compatible with existing protocols, which tend to use their own framing.
You need to implement whatever protocol you want to speak in any case, but you can connect that protocol server (which can be extremely thin) to a ØMQ backend that does the real work. The beautiful part here is that you can then extend your backend with code in any language, running locally or remotely, as you wish. Zed Shaw's Mongrel2 web server is a great example of such an architecture.

I/O Threads
We said that ØMQ does I/O in a background thread. When you create a new context it starts with one I/O thread.
int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);

Limiting Socket Use
By default, a ØMQ socket will continue to accept connections until your operating system runs out of file handles. You can set a limit using another zmq_ctx_set(3) call:
int max_sockets = 1024;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_MAX_SOCKETS, max_sockets);
assert (zmq_ctx_get (context, ZMQ_MAX_SOCKETS) == max_sockets);

Core Messaging Patterns
  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Publish-subscribe, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out / fan-in pattern that can have multiple steps, and loops. This is a parallel task distribution and collection pattern.
  • Exclusive pair, which connects two sockets in an exclusive pair. This is a pattern you should use only to connect two threads in a process. We'll see an example at the end of this chapter.
The zmq_socket(3) man page is fairly clear about the patterns, it's worth reading several times until it starts to make sense. These are the socket combinations that are valid for a connect-bind pair (either side can bind):
  • PUB and SUB
  • REQ and REP
  • REQ and ROUTER
  • DEALER and REP
  • DEALER and ROUTER
  • DEALER and DEALER
  • ROUTER and ROUTER
  • PUSH and PULL
  • PAIR and PAIR

Working with Messages
In memory, ØMQ messages are zmq_msg_t structures.
Note that when you have passed a message to zmq_msg_send(3), ØMQ will clear the message, i.e. set the size to zero. You cannot send the same message twice, and you cannot access the message data after sending it.
If you want to send the same message more than once, create a second message, initialize it using zmq_msg_init(3) and then use zmq_msg_copy(3) to create a copy of the first message.
ØMQ also supports multi-part messages, which let you send or receive a list of frames as a single on-the-wire message.

Handle Multiple Sockets
To actually read from multiple sockets at once, use zmq_poll(3).

Multi-part Messages
zmq_msg_send (&message, socket, ZMQ_SNDMORE);

zmq_msg_send (&message, socket, ZMQ_SNDMORE);

zmq_msg_send (&message, socket, 0);

while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, socket, 0);
    //Process the message frame
    zmq_msg_close (&message);
    int more;    //ZMQ_RCVMORE is an int in ØMQ/3.x (int64_t in 2.x)
    size_t more_size = sizeof (more);
    zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
    if (!more)
        break;    //Last message frame
}
If you are using zmq_poll(3), when you receive the first part of a message, all the rest has also arrived.
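
The receive loop above is easier to follow once the sockets are taken out of the picture. In the sketch below, a plain iterator of (payload, more) pairs stands in for the socket, where more plays the role of the ZMQ_RCVMORE flag; the function name and the sample frames are assumptions for illustration.

```python
def recv_multipart(frames):
    """Collect frames until one arrives with the `more` flag cleared."""
    parts = []
    for payload, more in frames:
        parts.append(payload)
        if not more:
            break          # last frame of this message
    return parts


# Two messages on the "wire": one with three frames, one with a single frame.
wire = iter([(b"a", True), (b"b", True), (b"c", False), (b"next", False)])
first = recv_multipart(wire)
second = recv_multipart(wire)
print(first, second)
```

Each call stops at the frame boundary of one message, so the next call starts cleanly with the first frame of the following message.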

Intermediaries and Proxies
The messaging industry calls this "intermediation", meaning that the stuff in the middle deals with either side. In ØMQ we call these proxies, queues, forwarders, devices, or brokers, depending on the context.
In classic messaging, this is the job of the "message broker". ØMQ doesn't come with a message broker as such, but it lets us build intermediaries quite easily.
Pub-Sub Network with a Proxy ==>
fig13.png
The best analogy is an HTTP proxy; it's there but doesn't have any special role. Adding a pub-sub proxy solves the dynamic discovery problem in our example.