ONJava.com -- The Independent Source for Enterprise Java

Enterprise Streaming

by Amir Shevat
04/20/2005

The concept of input and output streams was first introduced in Java 1.0. Not to be confused with the concept of multimedia streaming, Java streams serve as a standard way to write data to a destination and to read data from a source. Components such as files, sockets, and even the keyboard and the screen (System.in and System.out) are common examples of destinations and sources that can communicate using input and output streams. In fact, some objects, such as sockets, can be both a source and a destination at the same time.

The Java Message Service (JMS) is a standard way for enterprise applications to communicate with each other in a distributed environment. While this is a well-known and proven method, it is a complex and sometimes cumbersome message-driven framework that lacks some of the power that the simpler stream framework provides. MantaRay, an open source data messaging project (JMS provider) that is based on a peer-to-peer, serverless architecture, has solved this problem by combining the benefits of JMS and streaming.

This article will review the power hidden in the concept of streams and introduce the enterprise stream developed by the MantaRay open source project, a new type of stream whose destination and source are JMS topics and queues.

The Power of Input and Output Streams

Generalization

One of the most powerful features of streams is that whatever the destination or the source may be, the communications API always stays the same. Writing to a socket is the same as writing to a file. A FileOutputStream object may give you additional functionality that is unique to files, but the basic API to read and write still stays the same. The result is that if you do not use any additional functionality, you can easily replace the source/destination without changing your code.
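
As a minimal sketch of this idea, the method below knows only about OutputStream, so the same code can write to memory or to a file. The class and method names are illustrative and do not come from MantaRay or the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

class StreamGeneralization {

    // Depends only on the OutputStream API, so any destination works.
    static void writeGreeting(OutputStream out) throws IOException {
        out.write("hello, stream".getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        // Destination 1: an in-memory buffer.
        ByteArrayOutputStream memory = new ByteArrayOutputStream();
        writeGreeting(memory);

        // Destination 2: a temporary file.
        Path file = Files.createTempFile("greeting", ".txt");
        try (OutputStream fileOut = new FileOutputStream(file.toFile())) {
            writeGreeting(fileOut);
        }

        // Both destinations received the same bytes.
        boolean same = Arrays.equals(memory.toByteArray(),
                                     Files.readAllBytes(file));
        System.out.println("same bytes: " + same);
        Files.delete(file);
    }
}
```

Replacing the destination only changes the line that constructs the stream; writeGreeting itself is untouched.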

Figure 1 shows how streams are used.

Figure 1. Using streams

The stream API is the same for every destination and is quite primitive. For example, the read() method of the InputStream class returns an int in the range 0 to 255, or -1 when the end of the stream is reached. For this reason, Java provides a set of utilities that help you write more complex data to these streams. These utilities include helper classes such as writers and readers, as well as wrapper streams that wrap the original stream. Wrapper streams are discussed in the next section.
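
The short sketch below illustrates both points under simple assumptions: a raw read() loop that handles one byte at a time (read() returns -1 at the end of the stream), and a DataOutputStream/DataInputStream pair that handles a whole int. The class and method names are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

class PrimitiveRead {

    // Sums every byte until read() reports the end of the stream with -1.
    static int sumBytes(InputStream in) throws IOException {
        int sum = 0;
        int b;
        while ((b = in.read()) != -1) {
            sum += b; // b is always in the range 0..255 here
        }
        return sum;
    }

    // Writes an int through a DataOutputStream and reads it back.
    static int roundTrip(int value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(value); // encoded as four bytes
        out.flush();
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        return in.readInt();
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = {(byte) 0xFF, 1, 2};
        // The byte 0xFF is read as 255, not -1, so the sum is 258.
        System.out.println(sumBytes(new ByteArrayInputStream(raw)));
        System.out.println(roundTrip(-123456));
    }
}
```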

Stream Wrapping and Changing

Another powerful feature is wrapping one stream over another. If you write data to a file but want the data to be compressed, you simply create a FileOutputStream object and wrap it with a ZipOutputStream object. You then write to ZipOutputStream, which compresses the data and delegates it to FileOutputStream, which writes it to the physical file. Similarly, you can read from that .zip file using the FileInputStream and ZipInputStream objects.
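
Here is a small sketch of that wrapping, assuming a single entry per archive. Note that ZipOutputStream requires an entry to be opened with putNextEntry() before any data is written. The class name, method names, and entry name are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

class ZipWrapping {

    // Wraps the target stream so that everything written is compressed.
    static void writeZipped(OutputStream target, String entryName,
                            String text) throws IOException {
        try (ZipOutputStream zip = new ZipOutputStream(target)) {
            zip.putNextEntry(new ZipEntry(entryName));
            zip.write(text.getBytes(StandardCharsets.UTF_8));
            zip.closeEntry();
        }
    }

    // Wraps the source stream and returns the first entry as text.
    static String readZipped(InputStream source) throws IOException {
        try (ZipInputStream zip = new ZipInputStream(source)) {
            if (zip.getNextEntry() == null) {
                throw new IOException("empty archive");
            }
            ByteArrayOutputStream text = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = zip.read(buffer)) != -1) {
                text.write(buffer, 0, n);
            }
            return new String(text.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("example", ".zip");
        writeZipped(new FileOutputStream(file.toFile()),
                    "note.txt", "compress me");
        System.out.println(readZipped(new FileInputStream(file.toFile())));
        Files.delete(file);
    }
}
```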

In fact, you can chain multiple streams together. For example, you can wrap a socket's output stream with a CipherOutputStream object in order to encrypt the data, and wrap that in turn with a ZipOutputStream object in order to compress the data. As the data passes from stream to stream, each object performs its own operation on it.
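
The following sketch chains a compressing stream over an encrypting stream. It makes several assumptions for the sake of a self-contained example: a ByteArrayOutputStream stands in for the socket's output stream, GZIPOutputStream is used instead of ZipOutputStream to avoid archive entries, and AES in CBC mode with a caller-supplied key and IV is chosen arbitrarily. The class and method names are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.CipherOutputStream;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;

class ChainedStreams {

    static SecretKey newKey() throws GeneralSecurityException {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(128);
        return generator.generateKey();
    }

    // Data flows: caller -> compress -> encrypt -> sink.
    static byte[] compressAndEncrypt(byte[] plain, SecretKey key, byte[] iv)
            throws IOException, GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
        // Stand-in for a socket's output stream.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(
                new CipherOutputStream(sink, cipher))) {
            out.write(plain);
        }
        return sink.toByteArray();
    }

    // Data flows: source -> decrypt -> decompress -> caller.
    static byte[] decryptAndDecompress(byte[] wire, SecretKey key, byte[] iv)
            throws IOException, GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
        try (InputStream in = new GZIPInputStream(new CipherInputStream(
                new ByteArrayInputStream(wire), cipher))) {
            ByteArrayOutputStream plain = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) != -1) {
                plain.write(buffer, 0, n);
            }
            return plain.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        SecretKey key = newKey();
        byte[] iv = new byte[16]; // fixed IV, acceptable only for a demo
        byte[] wire = compressAndEncrypt("secret data".getBytes("UTF-8"), key, iv);
        System.out.println(new String(decryptAndDecompress(wire, key, iv), "UTF-8"));
    }
}
```

The compressing stream is the outermost wrapper because encrypted data does not compress well; the reading side unwraps in the opposite order.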

Figure 2 shows an example of wrapping streams.

Figure 2. Wrapping streams

MantaRay's Enterprise Streams

Why Are Enterprise Streams Needed?

While most streams work with physical IO components, MantaRay enterprise streams work with JMS queues and topics. JMS is a message-oriented standard for transferring data asynchronously over queues and topics, commonly used in the enterprise environment.

For example, let's take two applications that want to communicate over a queue. One application sends a message to a queue and the other receives the message. Here is the code involved in the process, implemented using JMS 1.0.2:

// sender code:

javax.jms.QueueConnectionFactory conFactory 
    = new ...// look up in JNDI or create an instance

javax.jms.QueueConnection con 
    = conFactory.createQueueConnection();

// creates a non transacted session with
// automatic acknowledge
javax.jms.QueueSession sendSession 
    = con.createQueueSession(false
        ,Session.AUTO_ACKNOWLEDGE);

javax.jms.Queue sendQueue 
    = sendSession.createQueue (sQueue);

javax.jms.QueueSender sender 
    = sendSession.createSender(sendQueue);

javax.jms.TextMessage msg 
    = sendSession.createTextMessage();

msg.setText( "some text" );

sender.send( msg,
    javax.jms.DeliveryMode.NON_PERSISTENT,
    javax.jms.Message.DEFAULT_PRIORITY,
    MESSAGE_TTL);

// receiver code:

javax.jms.QueueConnectionFactory conFactory 
    = new ... // look up in JNDI or create an instance 

javax.jms.QueueConnection con 
    = conFactory.createQueueConnection();
        
// creates a non transacted session with
// automatic acknowledge
javax.jms.QueueSession receiveSession 
    = con.createQueueSession(false
        ,Session.AUTO_ACKNOWLEDGE);

javax.jms.Queue receiveQueue 
    = receiveSession.createQueue (rQueue);

javax.jms.QueueReceiver qReceiver 
    = receiveSession.createReceiver(receiveQueue);

// start the connection so that messages
// can be delivered to the receiver
con.start();

javax.jms.TextMessage msg 
    = (javax.jms.TextMessage) qReceiver.receive();

As you can see, not only is this code a bit complex, it is also message-oriented and not stream-oriented. When a user writes data into an enterprise stream, the stream cuts the data into packets and wraps them in a JMS message envelope. It then sends the message on the predefined queue or topic, where it is processed by the input stream, which unwraps the data and makes it available for reading by the user at its destination.
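
To make the packet idea concrete, here is a simplified sketch of the technique. It is not MantaRay's implementation: a java.util.concurrent.BlockingQueue of byte arrays stands in for the JMS queue, each byte array stands in for a JMS message, and an empty packet is an assumed convention for marking the end of the stream. All class names are hypothetical.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PacketStreams {

    /** Cuts written bytes into packets and puts each one on the queue. */
    static class PacketOutputStream extends OutputStream {
        private final BlockingQueue<byte[]> queue;
        private final byte[] packet;
        private int used = 0;

        PacketOutputStream(BlockingQueue<byte[]> queue, int packetSize) {
            this.queue = queue;
            this.packet = new byte[packetSize];
        }

        @Override public void write(int b) throws IOException {
            packet[used++] = (byte) b;
            if (used == packet.length) flush(); // packet is full
        }

        @Override public void flush() throws IOException {
            if (used > 0) {
                send(Arrays.copyOf(packet, used));
                used = 0;
            }
        }

        @Override public void close() throws IOException {
            flush();
            send(new byte[0]); // empty packet marks the end of the stream
        }

        private void send(byte[] message) throws IOException {
            try {
                queue.put(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }

    /** Takes packets from the queue and exposes them as a byte stream. */
    static class PacketInputStream extends InputStream {
        private final BlockingQueue<byte[]> queue;
        private byte[] current = null;
        private int pos = 0;
        private boolean ended = false;

        PacketInputStream(BlockingQueue<byte[]> queue) {
            this.queue = queue;
        }

        @Override public int read() throws IOException {
            while (!ended && (current == null || pos == current.length)) {
                try {
                    current = queue.take(); // blocks until a packet arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
                pos = 0;
                if (current.length == 0) ended = true;
            }
            return ended ? -1 : (current[pos++] & 0xFF);
        }
    }

    public static void main(String[] args) throws IOException {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        DataOutputStream out =
            new DataOutputStream(new PacketOutputStream(queue, 4));
        out.writeInt(7);
        out.writeInt(-42);
        out.close();

        DataInputStream in = new DataInputStream(new PacketInputStream(queue));
        System.out.println(in.readInt() + " " + in.readInt());
    }
}
```

Because both classes extend the standard stream types, the caller can wrap them in DataOutputStream and DataInputStream without knowing that packets are involved.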

Figure 3 shows how data is processed in MantaRay enterprise stream.

Figure 3. The process of data in MantaRay enterprise stream

Because enterprise streams extend the InputStream and OutputStream classes, just as socket and file streams do, they bring all of the strengths of streaming:

  • They are easy to use because they provide the same interface all streams share.
  • You can wrap them with other streams to achieve extended functionality such as compression and encryption.
  • You do not have to worry about breaking the data into packets, buffer allocation, or any other low-level issue regarding data delivery.

Working with Enterprise Streams

JMS queues are all about point-to-point communications. When using an enterprise stream over a queue, there should be only one output stream and only one input stream on the same queue. Although most JMS providers, including MantaRay, enable multiple producers and consumers on the same queue, this is not defined in the JMS specification, and is thus considered misuse when working with enterprise streams over a queue. The reason for this is that a message on a specific queue is delivered to one and only one consumer; therefore, multiple consumers may "steal" vital data from one another.
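
The toy simulation below shows why a second consumer corrupts a stream carried over a queue. It assumes round-robin delivery purely for illustration; a real JMS provider may distribute messages among consumers in any way. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

class QueueStealing {

    // Delivers each packet to exactly one consumer, in round-robin order,
    // and returns what each consumer ended up with.
    static String[] consume(String[] packets, int consumers) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList(packets));
        StringBuilder[] received = new StringBuilder[consumers];
        for (int i = 0; i < consumers; i++) {
            received[i] = new StringBuilder();
        }
        int next = 0;
        while (!queue.isEmpty()) {
            received[next].append(queue.poll());
            next = (next + 1) % consumers;
        }
        String[] result = new String[consumers];
        for (int i = 0; i < consumers; i++) {
            result[i] = received[i].toString();
        }
        return result;
    }

    public static void main(String[] args) {
        String[] packets = {"AB", "CD", "EF", "GH"};
        // One consumer sees the whole stream: [ABCDEFGH]
        System.out.println(Arrays.toString(consume(packets, 1)));
        // Two consumers each see only fragments: [ABEF, CDGH]
        System.out.println(Arrays.toString(consume(packets, 2)));
    }
}
```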

JMS topics define many-to-many communication. With enterprise streams that use a topic as the destination or source, there can be multiple subscribers but only one publisher on the same topic. The reason for this is that otherwise, multiple publishers on the same topic will get their data scrambled and create senseless output for the subscribers.

However, because an enterprise stream over a topic can have multiple subscribers, it provides a broadcast-like, one-to-many capability. The user of the output stream does not need to manage a separate output stream for each peer. Instead, the user simply writes the data to one output stream, and the data packets are received by all of the topic's subscribers.

Enterprise streams bind to a topic or queue by using the connect method. Only after the stream is connected can you write to it or read from it. When streaming over a queue, the data produced by the output stream is stored in the queue until it is consumed by the input stream. Because of this ability to store data, the order of connection is not important. The output stream can connect and send data, and the input stream can then connect at any time to receive all of the data from the beginning of the stream. This is not the case with topics: an input stream connected to a topic receives only the data produced after it connected. While this may not be a problem in some cases (for example, with a continuous CPU usage report), it may be important in other cases (for example, file transfers).
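
The difference in delivery semantics can be illustrated with a small in-memory model. The Topic and StoringQueue classes below are hypothetical stand-ins written for this sketch; they are not JMS or MantaRay classes and ignore details such as durable subscriptions.

```java
import java.util.ArrayList;
import java.util.List;

class DeliverySemantics {

    /** Delivers each value only to the inboxes subscribed at publish time. */
    static class Topic {
        private final List<List<Integer>> inboxes = new ArrayList<>();

        List<Integer> subscribe() {
            List<Integer> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(int value) {
            for (List<Integer> inbox : inboxes) {
                inbox.add(value);
            }
        }
    }

    /** Stores values until a consumer connects and drains them. */
    static class StoringQueue {
        private final List<Integer> stored = new ArrayList<>();

        void send(int value) {
            stored.add(value);
        }

        List<Integer> receiveAll() {
            List<Integer> drained = new ArrayList<>(stored);
            stored.clear();
            return drained;
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        List<Integer> early = topic.subscribe();
        topic.publish(1);
        topic.publish(2);
        List<Integer> late = topic.subscribe(); // connects after 1 and 2
        topic.publish(3);
        System.out.println("early subscriber: " + early); // [1, 2, 3]
        System.out.println("late subscriber: " + late);   // [3]

        StoringQueue queue = new StoringQueue();
        queue.send(1);
        queue.send(2);
        queue.send(3);
        // The consumer connects only now but still gets everything.
        System.out.println("late queue consumer: " + queue.receiveAll());
    }
}
```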

MantaRay Enterprise Streaming Example

Graph Feeder-Viewer

Consider a graph feeder component that generates continuous data--anything from memory usage, to stock prices, to factory output, to the number of Martians on earth at the moment. This data needs to be displayed in several different locations in the form of a dynamically changing graph called the graph viewer.

Because the data is continuous and the communication is of the one-to-many type, this task can be easily solved by using MantaRay enterprise streams.

Figure 4 shows the graph view component.

Figure 4. Graph view component

Below is a short code example of the graph feeder component:


import java.io.DataOutputStream;

import org.mr.api.blocks.MantaOutputStream;
...
/**
 * creates random data and sends it to the graph viewer 
 */
public class GraphFeeder {

    public static void main(String[] args)
        throws Exception {

        // create an enterprise output stream with
        // a 4-byte packet hint; the hint helps
        // the stream cut the data into packets
        MantaOutputStream out = 
            new MantaOutputStream(4);

        // connect the enterprise output stream to 
        // a topic called graph
        out.connect("graph", 
            MantaOutputStream.TOPIC);

        // wrap the data in a DataOutputStream 
        // so we can easily write integers  
        DataOutputStream dos = 
            new DataOutputStream(out);

        int currentStatus = 0;

        for (int rounds = 0; rounds < 30000; rounds++) {

            // generate random fluctuations
            // in the graph 
            int rand =(int) 
                (System.currentTimeMillis()%777);

            if(rand%2 ==0){

                currentStatus++;

            }else{

                currentStatus--;

            }

            // write data to the stream
            dos.writeInt(currentStatus);

            // sleep for a while before generating
            //  more random data
            Thread.sleep(rand/3);

        }
    }
}


Below is a short code example of the graph viewer component:

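import java.io.DataInputStream;
import java.io.IOException;

import org.mr.api.blocks.MantaInputStream;
import org.mr.api.blocks.MantaOutputStream;
...
/**
 * Shows the data read from
 * the input stream in a graphical way
 */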
public class GraphViewer {
    // runs the program
    public static void main(String[] args) 
        throws IOException {

        // init the program
        GraphViewer view = new GraphViewer();
                
        view.init();
    }
        
    // the program logic
    public void init() throws IOException {
        // init graph view
        InitGraph();

        // create an enterprise input stream
        MantaInputStream in = 
            new MantaInputStream();

        // connect the enterprise input stream to
        // a topic called graph 
        in.connect("graph", 
            MantaOutputStream.TOPIC);

        // wrap the stream in a DataInputStream so 
        // we can easily read integers 
        DataInputStream dis = 
            new DataInputStream(in);

        int input =0;

        boolean go = true;

        while(go){

            try{

                // read data
                input=dis.readInt();

                // update the graph only after
                // a successful read
                updateGraph(input);

            }catch(IOException e){

                // stop on a read failure or
                // at the end of the stream
                e.printStackTrace();

                go =false;

            }
        }
    }

}


This example shows the ability to wrap one stream around another: a DataOutputStream was used to write the data as integers, and a DataInputStream was used to read them back. The full code for the graph viewer example can be found in the sample folder of MantaRay's latest release. The code shown here was simplified to suit the article format.

JMS provides enhanced features, such as selectors, that may be useful for some tasks and are not provided by enterprise streams. In addition, message-oriented tasks may be done more simply using the JMS API. The example above shows how a stream-oriented task can benefit from the simple stream-oriented API of MantaRay's enterprise InputStream and OutputStream.

Conclusion

Combining JMS with streaming can be very powerful. Applications can use all of the capabilities of streams while still using the J2EE framework as a communications infrastructure. Streams are also very useful for users who do not want to work directly with JMS objects, which can sometimes be complex and cumbersome.

Enterprise streams are part of MantaRay's set of "building blocks"--simple and intuitive utilities that simplify the process of writing a distributed application and extend the capabilities MantaRay provides for distributed application communication.

Amir Shevat is a senior software developer with eight years of experience in computing.
