EzDevInfo.com

jeromq

Pure Java ZeroMQ

Cleanly interrupt zeromq polling thread

I have a multithreaded application written in Java using jeromq 0.3.2. I'm working on putting it into the Tanuki service wrapper so that it runs as a windows service, and I'm having some trouble with cleanly stopping the threads. The run() method of the proxy is a variation on the Simple Pirate pattern from the guide, and looks like this:

public void run()
{
    ZContext context = new ZContext();
    Socket frontend = context.createSocket(ZMQ.ROUTER);
    Socket backend = context.createSocket(ZMQ.ROUTER);
    frontend.bind("tcp://*:5555");
    backend.bind("inproc://workers");

    while (!Thread.currentThread().isInterrupted())
    {
        ZMQ.Poller poller = new ZMQ.Poller(2);
        poller.register(frontend, ZMQ.Poller.POLLIN);
        poller.register(backend, ZMQ.Poller.POLLIN);
        poller.poll();
        if (poller.pollin(0))
        {
            processFrontend(frontend, backend, context);
        }
        if (poller.pollin(1))
        {
            processBackend(frontend, backend);
        }
    }

    // additonal code to close worker threads
}

How can I cleanly exit from this loop when the controlling wrapper wants to stop the application?

If there are no clients currently connected, then the loop is blocked at the call to poller.poll(), so if the wrapper calls interrupt() on the thread it is ignored. If there are clients currently sending in messages, then a call to interrupt() causes the poller.poll() method to throw a zmq.ZError$IOException: java.nio.channels.ClosedByInterruptException

I've also tried to use:

PollItem[] items = {
    new PollItem(frontend, Poller.POLLIN),
    new PollItem(backend, Poller.POLLIN)
};
int rc = ZMQ.poll(items, 2, -1);

if (rc == -1)
    break;

if (items[0].isReadable())
{
    processFrontend(frontend, backend, context);
}

if (items[1].isReadable())
{
    processBackend(frontend, backend);
}

but the call to ZMQ.poll exhibits the same behaviour. Two possible alternatives are:

  • set a timeout on ZMQ.poll and wrap the content of the run() method in a try/catch for the IOException.
  • add a method to my Runnable that will connect to the frontend and send a special message that will be read in processFrontend and cause the code to break out of the loop.

The first seems a bit dirty, and the second feels a bit fragile. Are there any other approaches I should consider? Is there some other polling method I can use that reacts more cleanly to a thread interrupt?


Source: (StackOverflow)

Assertion error polling from inside a new thread in ZeroMQ (JeroMQ)

I have code that looks like this:

public void handleRequests() {
    ZMQ.Poller items = new ZMQ.Poller(1);
    items.register(clientEndpoint, ZMQ.Poller.POLLIN);
    while (!Thread.currentThread().isInterrupted()) {
        byte[] message;
        items.poll();  // this is the line that throws exception.
        if (items.pollin(0)) {
            message = clientEndpoint.recv(0);
        }
    }
}

It works fine when i call it directly:

foo.handleRequests();

but it fails regularly with assertion errors if it is run in a new thread:

final Runnable listener = worldviewServer::handleRequests;
Executors.newSingleThreadExecutor().execute(listener);

The stack trace I get is shown below:

Exception in thread "pool-6-thread-1" java.lang.AssertionError
at zmq.Mailbox.recv(Mailbox.java:113)
at zmq.SocketBase.process_commands(SocketBase.java:820)
at zmq.SocketBase.getsockopt(SocketBase.java:258)
at zmq.PollItem.readyOps(PollItem.java:107)
at zmq.ZMQ.zmq_poll(ZMQ.java:708)
at zmq.ZMQ.zmq_poll(ZMQ.java:600)
at org.zeromq.ZMQ$Poller.poll(ZMQ.java:1618)
at org.zeromq.ZMQ$Poller.poll(ZMQ.java:1592)
at com.tracelink.worldview.server.Head.handleRequests(Head.java:68)
at com.tracelink.worldview.server.WorldviewServer.handleRequests(WorldviewServer.java:236)
at com.tracelink.worldview.server.fsm.EnablingAction$$Lambda$12/404648734.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I'm using Java 8 with JeroMQ 0.3.5-SNAPSHOT


Source: (StackOverflow)

Advertisements

why jeromq push send failed after a period of time

I'm trying to use push/pull pattern with jeromq(0.3.2). At the beginnig, it works well. but after a period of time. the push side doesn't send out messages and blocked there. I don't know why. I set the sendTimeout param, and print the zmq socket error number. it is 35. Is there something I do not notice? or other suggests?

Thanks!

The push side code:

ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket push4Topic = context.socket(ZMQ.PUSH);

private void init() {
        push4Topic.setTCPKeepAlive(1);
        push4Topic.setSendTimeOut(30000);
        push4Topic.bind(bindUrl);
}

public boolean send(String msg) {
        return push4Topic.send(msg);
}

private void destroy() {
        if (push4Topic != null) {
            push4Topic.close();
        }
        if (context != null) {
            context.term();
        }
        logger.info("destroy() socket destroied");
}

====

I add one monitor thread monitoring the push side. then, I found that ZMQ_EVENT_DISCONNECTED event. what is that mean? my pull side code has problems?


Source: (StackOverflow)

ZeroMQ producing meager results

I am testing out ZeroMQ and I am only getting around 1227 - 1276 messages per second. I have read however that these are supposed to be over 100x this amount.

What am I doing wrong? Is there some configuration I can specify to fix this?

I am using the following functionality:

public static final String SERVER_LOCATION = "127.0.0.1";
public static final int SERVER_BIND_PORT = 5570;

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
    ZContext ctx = new ZContext();

    Socket frontend = ctx.createSocket(ZMQ.PULL);
    frontend.bind("tcp://*:"+SERVER_BIND_PORT);

    int i = 1;
    do{
        ZMsg msg = ZMsg.recvMsg(frontend);
        ZFrame content = msg.pop();
        if(content!= null){
            msg.destroy();
            System.out.println("Received: "+i);
            i++;
            content.destroy();
        }
    }while(true);
}

public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PUSH);

    client.setIdentity("i".getBytes());
    client.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);

    PollItem[] items = new PollItem[] { new PollItem(client, Poller.POLLIN) };
    int i = 1;
    Timer t = new Timer(timeToSpendSending);
    t.start();
    do{
        client.send(/* object to send*/ , 0);
        i++;
    }while(!t.isDone());

    System.out.println("Done with "+i);
}

Timer class used to limit time the program runs for:

class Timer extends Thread{
    int time;
    boolean done;
    public Timer(int time){
        this.time = time;
        done = false;
    }
    public void run(){
        try {
            this.sleep(time);
            done = true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public boolean isDone(){
        return done;
    }
}

Edit: I am using jeroMQ

<dependency>
    <groupId>org.zeromq</groupId>
    <artifactId>jeromq</artifactId>
    <version>0.3.4</version>
</dependency>

Source: (StackOverflow)

JeroMQ does not work across platforms

I'm trying to get a device running a native C build of ZMQ v 3.2.0 to work with a Java application built with JeroMQ (pure Java impl) using pub/sub ZMQ sockets. However, it seems that JeroMQ is using different flag configurations preceding the message payload vs. the C implementation. IIRC, JeroMQ is intended to be compatible with v 3.2.2, so I'm not sure if this is a bug in the JeroMQ port

My Java test code is similar the psenvpub example: public class psenvpub {

public static void main (String[] args) throws Exception {
    // Prepare our context and publisher
    Context context = ZMQ.context(1);
    Socket publisher = context.socket(ZMQ.PUB);

    publisher.bind("tcp://*:15052");
    byte seq = 1;
    while( !Thread.currentThread().isInterrupted() ){
        byte[] message = new byte[8];
        message[0] = 0;
        message[1] = 0;
        message[2] = 0;
        message[3] = 0;
        message[4] = seq++;
        message[5] = 0;
        message[6] = 0;
        message[7] = 0;
        publisher.send(message);
        try{
            Thread.sleep(1000);
        }
        catch(Exception e ){
            break;
        }
    }
}
}

I'm using a perl script for the Native C endpoint:

use strict;
use warnings;

use Vocollect::ZMQ::Context;
use ZMQ::Constants qw(ZMQ_SUB);

my $ctx = Vocollect::ZMQ::Context->new();
my $sock = $ctx->socket(ZMQ_SUB);
$sock->connect('tcp://localhost:15052');
$sock->subscribe('');

while (1) {
    my $msg = $sock->recv(10000);
    print "Received msg\n" if defined($msg);
}

When the subscriber receives its first message, it crashes due to an assert failure in the libzmq source code:

Assertion failed: options.recv_identity (..\..\..\src\socket_base.cpp:990)

Which is:

void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
    //  Test whether IDENTITY flag is valid for this socket type.
    if (unlikely (msg_->flags () & msg_t::identity))
        zmq_assert (options.recv_identity);

    //  Remove MORE flag.
    rcvmore = msg_->flags () & msg_t::more ? true : false;
}

An wireshark trace of the packets sent shows that the handshaking sequence as well as the flags are different between a JeroMQ pub/sub and a native C pub/sub. I have not seen any issues when using endpoints that either both JeroMQ or both native C libzmq.


Source: (StackOverflow)

JeroMq dropping messages – how to prevent?

I have following “hello world” jeromq PUSH-PULL client and server. Only after I set High Water Mark values am I able to transmit without dropping messages.

import org.jeromq.ZMQ;

    public class TestTcpServer {

    public static void main(String[] args) {

    ZMQ.Context context = ZMQ.context(1);
    ZMQ.Socket socket = context.socket(ZMQ.PULL);

    System.out.println("Binding TCP server on port 5555");

    //socket.setRcvHWM(100_000);

    socket.bind("tcp://*:5555");
    int x;
    x = 0;
    while (true) {
        x++;
        byte[] raw = socket.recv(0);
        String rawMessage = new String(raw);
        if (x > 99_997) {
            System.out.println(x);
            System.out.println(rawMessage);
        }
    }

    }

}

//client

import java.io.IOException;
import org.jeromq.ZMQ;

public class TestTcpClient {

/**
 * @param args
 * @throws InterruptedException
 * @throws IOException
 */
public static void main(String[] args) throws InterruptedException,
        IOException {
    ZMQ.Context context = ZMQ.context(1);
    ZMQ.Socket socket = context.socket(ZMQ.PUSH);
    socket.connect("tcp://localhost:5555");

    //socket.setRcvHWM(100_000);

    System.out.println("Sending 100 000 transactions over TCP...");                   long start = System.currentTimeMillis();

    for (int request_nbr = 0; request_nbr != 100_000; request_nbr++) {
        String requestString = "message";
        byte[] request = requestString.getBytes();
        boolean success = socket.send(request, 0);
        if (!success) {
            System.out.println("sending message failed!");
        }

    }
    long end = System.currentTimeMillis();
    System.out.print("Time: ");
    System.out.print(end - start);
    System.out.println(" ms");      
    socket.close();
    context.term();     
}

}

According to 0Mq documentation, only PUB sockets will drop messages when high water mark is reached. Since I am using PUSH-PULL sockets, why are messages being dropped?

It seems to me that HWM is a dynamic property of the system, so while I have been able to resolve dropping messages in this hello world example, I wonder if I can count on jeromq not to drop messages in a real world situation?


Source: (StackOverflow)

Android Application hanging on subscriber.recv()

I am writing an Android application that receives a continuous stream of data. I've set up the connection inside a runnable like so:

Runnable runnable = new Runnable()
    {
        public void run()
        {
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
        subscriber.connect("tcp://[IP]:[Port]");
            TextView strDisplay = (TextView) findViewById(R.id.stringDisplay);
            while (!Thread.currentThread ().isInterrupted ())
            {
                // Read message contents
                Log.i("Output", "the while loop ran up to here");
                //*HANGS ON THE LINE BELOW*
                String testcase = subscriber.recvStr(0);
                strDisplay.setText(testcase);
                Log.i("Output", "The while loop completed");
            }

Now, after much scouring of the interwebs, I've come to two conclusions:

1) that recvStr() is a blocking call that waits until it receives something. So that means it hasn't connected properly or something else

and

2) that I may have to set up a filter of some sort?

I can't figure out what I should do next. Any help from someone experienced with JeroMQ or Android server access is greatly appreciated


Source: (StackOverflow)

JeroMQ - monitor - endpoints remains

Jeromq - Version 0.3.4

I'm currently working on a web application and encountering into a situation which I cannot start a monitor on inproc address X because it's not unregistered before and will remain forever...

I'm using Req socket and Pair to monitor my response from a Router socket. The idea is to get the updated connectivity status from the monitor thread and if it's not connected, to destroy the sockets and re-send.

The idea of the code:

Socket socket = zContext.socket(ZMQ.REQ);
socket.monitor("inproc://test", ZMQ.EVENT_ALL);

// start monitor thread and inside Pair socket
MonitorThread mt = new MonitorThread(zContext, "inproc://test");
new Thread(mt).start()

socket.connect("tcp://127.0.0.1:5556");
socket.send("test".getBytes());
socket.setReceiveTimeOut(100);
byte[] data = socket.recv();

if (data == null) { // check flag in mt and resend if needed
socket.monitor(null, 0); // should stop the monitor
// stop the monitor Pair socket inside the mt
// stop and recreate the socket & monitor
// send again
}

it looks like after several loops, the Reaper thread not succeed (or on delay) to stop the monitor of the Req socket - socket.monitor(null, 0); and then, the creation of the monitor is stuck! - socket.monitor("inproc://test", ZMQ.EVENT_ALL);

Can you please help me?


Source: (StackOverflow)

why does jeromq use setReuseAddress(true)?

I'm new to zeromq and not that experienced with sockets.

Are ZeroMQ sockets supposed to only allow one socket to bind() to a port?

The jeromq implementation allows more than one; pyzmq does not. Who's correct?

The jeromq ZMQ.Socket.bind() function eventually comes down to this:

https://github.com/zeromq/jeromq/blob/master/src/main/java/zmq/TcpListener.java#L141

//  Set address to listen on.
public int set_address(final String addr_)  {
    address.resolve(addr_, options.ipv4only > 0 ? true : false);

    try {
        handle = ServerSocketChannel.open();
        handle.configureBlocking(false);
        handle.socket().setReuseAddress(true);
        handle.socket().bind(address.address(), options.backlog);
        if (address.getPort()==0)
            address.updatePort(handle.socket().getLocalPort());
    } catch (IOException e) {
        close ();
        return ZError.EADDRINUSE;
    }
    endpoint = address.toString();
    socket.event_listening(endpoint, handle);
    return 0;
}

Python:

C:\tmp\jeromq\jeromq-0.3.2\target>python
Python 2.7.5 |Anaconda 1.9.1 (64-bit)| (default, May 31 2013, 10:45:37) [MSC v.1
500 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> ctx=zmq.Context()
>>> s=ctx.socket(zmq.PUB)
>>> s.bind_to_random_port('tcp://127.0.0.1')
56356
>>> s2=ctx.socket(zmq.PUB)
>>> s2.bind('tcp://127.0.0.1:56356')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "socket.pyx", line 465, in zmq.core.socket.Socket.bind (zmq\core\socket.c
:4749)
zmq.core.error.ZMQError: Address in use

Source: (StackOverflow)

zmq frames dropping between threads

I have a use case where we are using multithreaded zmq setup. There's a new zmq thread being forked using Zthread.fork. This thread publishes message to a pub socket. We are seeing frames not being received atomically on the sub socket.

The HWM has been set correctly on both PUB-SUB sockets. I suspect the frames being dropped at the PAIR socket that's established between parent and child thread when we do Zthread.fork. However, can't see any API methods to set HWM on socket created by the forked thread.

Any ideas will be appreciated. Thanks.


Source: (StackOverflow)

JeroMQ from time to time yields "IOException. too many files open"

I have a tomcat7 application that communicates with another application of mine using an IPC end point with JeroMQ in Java. There is a client server scheme and the client waits fro sometime for the response from the server and if it does not receive the response fails the first time without retry.

The code is below

@Override
public List<Result> call() throws Exception {
    final List<Result> results = new LinkedList<>();
    try {
        for (DTO dto : messages) {
            Message m = MessageHelper.MessageMapper(dto);

            Thread.sleep(dto.getDelayBeforeSend());
            final Result mtresult = send(dto);
            results.add(result);
        }
    } catch (RuntimeException e) {
        LOGGER.error("Flow => Uncaught Exception: {}", e.getMessage());
        LOGGER.debug("Flow => Uncaught Exception: ", e);
        Thread t = Thread.currentThread();
        t.getUncaughtExceptionHandler().uncaughtException(t, e);
    }
    return results;
}

private Result send(Message m) {
    ZMQ.Socket client = MQSocketFactory.getMQSocket(serverEndpoint).createRequester();
    try {
        final byte[] DTO = Helper.serializeMessage(m);
        int retriesLeft = 1;
        Result result = new Result(MessageConstants.MESSAGE_FAIL);

        while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {

            client.send(DTO, 0);
            int expect_reply = 1;

            while (expect_reply > 0) {

                ZMQ.PollItem items[] = { new ZMQ.PollItem(client, Poller.POLLIN) };
                int rc = ZMQ.poll(items, 3000);
                if (rc == -1) break; // Interrupted

                if (items[0].isReadable()) {
                    final byte[] reply = client.recv(0);
                    if (reply == null) break;
                    result = new Result(new String(reply));
                    if (result.isSuccessful()) {
                        LOGGER.trace("Server replied OK. Result: [{}]", result);
                        retriesLeft = 0;
                        expect_reply = 0;
                    } else LOGGER.error("Malformed reply from server: [{}]", result);

                } else if (--retriesLeft == 0) {
                    LOGGER.error("Server:[{}] seems to be offline, abandoning sending message [{}]!", serverEndpoint, m);
                    break;
                } else {
                    LOGGER.warn("No response from server, retrying...");
                    client = MQSocketFactory.getMQSocket(serverEndpoint).resetRequester(client);
                    client.send(DTO, 0);
                }
            }
        }
        return result;
    } finally {
        MQSocketFactory.getMQSocket(serverEndpoint).destroyRequester(client);

    }
}

Now the MQSocketFactory class is like below:

public final class MQSocketFactory {

private static final Map<String, MQSocket> store = new HashMap<String, MQSocket>();

private static final Logger LOGGER = LoggerFactory.getLogger(MQSocketFactory.class);

public static MQSocket getMQSocket(String endpointName) {
    synchronized (store) {
        MQSocket result = store.get(endpointName);
        if (result == null) {
            result = new MQSocket(endpointName);
            store.put(endpointName, result);
        }
        return result;
    }
}

public static final class MQSocket {

    private final String endpoint;
    private final ZMQ.Context ctx;

    private MQSocket(String endpointName) {
        this.endpoint = endpointName;
        this.ctx = ZMQ.context(1);
    }

    public ZMQ.Socket createRequester() {
        ZMQ.Socket client = null;
        try {
            client = ctx.socket(ZMQ.REQ);
            assert (client != null);
            client.connect(endpoint);
        } catch (Exception e) {
            LOGGER.error("Error: {}", e.getMessage());
            LOGGER.error("Error: {}", e);
        }
        return client;
    }

    public ZMQ.Socket resetRequester(ZMQ.Socket socket) {
        destroyRequester(socket);
        return createRequester();
    }

    public void destroyRequester(ZMQ.Socket socket) {
        if (socket != null) {
            try {
                socket.close();
            } catch (Exception e) {
                LOGGER.error("Error: {}", e.getMessage());
                LOGGER.debug("Error: {}", e);
            }
        }
    }

    public ZMQ.Context getContext() {
        return ctx;
    }

    // Responder Unit
    private ZMQ.Socket responder;

    public ZMQ.Socket createResponder() {
        if (responder == null) {
            this.responder = ctx.socket(ZMQ.REP);
            responder.bind(endpoint);
        }
        return responder;
    }

    public ZMQ.Socket resetResponder() {
        destroyResponder();
        return createResponder();
    }

    public void destroyResponder() {
        try {
            responder.close();
        } catch (Exception e) {
            LOGGER.error("Error: {}", e.getMessage());
            LOGGER.debug("Error: {}", e);
        }
    }

}

}

I have done this specifically so every sockets gets closed after the request is done so as to avoid this particular problem with IOExcpetion Too Many file Open. However from very rarely I get this issue and I cannot figure out why. The application maybe working for like days under pretty much the same load and everything being ok but at some points it start throwing the exception and I don't know why.

Also is there a way to increase the ulimit in tomcat7? Right now is 1024.


Source: (StackOverflow)

PUSH with jeroMQ

I think it is something wrong with my zmq.jar so I tried with jeroMQ but I have the same problem.

This is my method:

private boolean submitEvent(String ioMessage) {
    log.info("SEND");

    ZMQ.Context context = ZMQ.context();

    ZMQ.Socket sender = context.socket(ZMQ.PUSH);

    sender.connect("tcp://localhost:8086");

    sender.send("MESSAGE");

    return true;

}

I have a script in python which is PULL and if I try a push script also in python, it receives everything.

So my problem is in java.

I see in logs the first line ("send") but I haven't receive anything in the script.

What should I change?


Source: (StackOverflow)

ZeroMQ port-forwarding in Java

I have a program written in Java that should send messages out via ZeroMQ to be listened and responded to by another program I have running.

The problem is that the listener is on another computer on the local network and I need to somehow perform port forwarding in ZeroMQ in order for the messages to be received across the network.

    ZMQ.Context context = ZMQ.context(1);
    ZMQ.Socket requester = context.socket(ZMQ.REQ);
    requester.connect("tcp://192.168.78.14:5570"); //192.168.78.14 is the address of listening machine

    String msg = "Message";
    requester.send(msg.getBytes());

    byte[] reply = requester.recv();

When the above code is run, the program waits indefinitely for a response (that never comes) and so effectively is unresponsive. The listener is listening on tcp://127.0.0.1:5570 for messages. Setting it to tcp://0.0.0.0:5570 or tcp://*:5570 which supposedly opens listening to all addresses on given port also yields nothing.

Any help is appreciated.


Source: (StackOverflow)

Using polling in jeromq

I am learning to use zeromq polling in android . I am polling on a req socket and a sub socket in the android program(client). So that this client can receive both reply messages from the server and also published messages.

My polling is not working. Both the req socket and the publish socket does not get polled in. If i don't use polling both the sockets receive the message.

I tried searching online but could not find anything relevant. The client code is this :

    public void run()
   {
    ZMQ.Context context = ZMQ.context(1);
    ZMQ.Socket reqsocket = context.socket(ZMQ.REQ);
    ZMQ.Socket subsocket =context.socket(ZMQ.SUB);
    reqsocket.connect("tcp://10.186.3.174:8081");
    subsocket.connect("tcp://10.186.3.174:8083");
    subsocket.subscribe("".getBytes());
    byte[] receivedmessage;
    Poller poller=context.poller();
    poller.register(reqsocket,Poller.POLLIN);
    poller.register(subsocket,Poller.POLLIN);

    reqsocket.send(msg.getBytes(),0); 

    while(!Thread.currentThread().isInterrupted())
     {

        if(poller.pollin(0))
        {
            receivedmessage=s.recv(0);

        }
          if(poller.pollin(0))
          {
            receivedmessage=subsocket.recv(0);

          }
   }
    s.close();
    context.term();

}

Am i missing out something or doing something wrong?


Source: (StackOverflow)

zeromq route-req java example does not work

I run this official Java example in Eclipse, with jeromq-0.3.2.jar library, it doesn't work if I "run" it, it only works if I set some break point and "debug" it.

It seems the message is lost. My own application using route-req pattern has this problem too.

This is their official example, if this doesn't work, what can? can someone try it and figure out why?

http://zguide.zeromq.org/java:rtreq

or codes are here:

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

import java.util.Random;

/**
* ROUTER-TO-REQ example
*/
public class rtreq
{
    private static Random rand = new Random();
    private static final int NBR_WORKERS = 10;

    private static class Worker extends Thread {

        @Override
        public void run() {

            Context context = ZMQ.context(1);
            Socket worker = context.socket(ZMQ.REQ);
            ZHelper.setId (worker);  //  Set a printable identity

            worker.connect("tcp://localhost:5671");

            int total = 0;
            while (true) {
                //  Tell the broker we're ready for work
                worker.send ("Hi Boss");

                //  Get workload from broker, until finished
                String workload = worker.recvStr ();
                boolean finished = workload.equals ("Fired!");
                if (finished) {
                    System.out.printf ("Completed: %d tasks\n", total);
                    break;
                }
                total++;

                //  Do some random work
                try {
                    Thread.sleep (rand.nextInt (500) + 1);
                } catch (InterruptedException e) {
                }
            }
            worker.close();
            context.term();
        }
    }

    /**
     * While this example runs in a single process, that is just to make
     * it easier to start and stop the example. Each thread has its own
     * context and conceptually acts as a separate process.
     */
    public static void main (String[] args) throws Exception {
        Context context = ZMQ.context(1);
        Socket broker = context.socket(ZMQ.ROUTER);
        broker.bind("tcp://*:5671");

        for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++)
        {
            Thread worker = new Worker ();
            worker.start ();
        }

        //  Run for five seconds and then tell workers to end
        long endTime = System.currentTimeMillis () + 5000;
        int workersFired = 0;
        while (true) {
            //  Next message gives us least recently used worker
            String identity = broker.recvStr ();
            broker.sendMore (identity);
            broker.recvStr ();     //  Envelope delimiter
            broker.recvStr ();     //  Response from worker
            broker.sendMore ("");

            //  Encourage workers until it's time to fire them
            if (System.currentTimeMillis () < endTime)
                broker.send ("Work harder");
            else {
                broker.send ("Fired!");
                if (++workersFired == NBR_WORKERS)
                    break;
            }
        }

        broker.close();
        context.term();
    }
}

Source: (StackOverflow)