ONJava.com    
 Published on ONJava.com (http://www.onjava.com/)


Interruptible Database Queries

by Slav Boleslawski
06/16/2004

One typical software design goal is responsiveness, understood as how easy and quick it is for the user to interrupt the current operation. Certain operations -- such as complex database queries; network I/O handling; extensive calculations; sorting of, or searching in, large data sets -- can take seconds or even minutes before they complete. Well-designed software allows the user to cancel such a long operation in progress. In this article I will demonstrate how to cancel a time-consuming database query by simply interrupting the thread in which the query runs. Such an interruptible database query will enable you to develop truly interactive programs that respond promptly even to the most impatient users.

A typical database query consists of four steps: creating a connection, creating a statement on this connection, obtaining a ResultSet object by executing the query on the statement, and, finally, retrieving data from the ResultSet object:

Connection conn = createConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(query);
while(rs.next()) {
    // retrieve data from rs
}

The first two steps do not require much time to execute. Establishing a database connection usually takes at most a couple of seconds. In practice, connections are often obtained from a pool of reusable connections, which makes this step very quick.

The bulk of the execution time is spent in the last two steps, so I will focus on them.

Interruptible Query Execution

The executeQuery() method of Statement runs synchronously. It can be interrupted by invoking the cancel() method on the same Statement object from another thread, provided that both the driver and the database support cancellation (see the Statement.cancel() API). We will create a new class called QueryExecutor with a method similar to Statement's executeQuery(), but one that can be cancelled simply by invoking interrupt() on the thread that executes it. So instead of:

ResultSet rs = stmt.executeQuery(query);
while(rs.next()) {
    // retrieve data from rs
}

we will have:

QueryExecutor executor = new QueryExecutor();
ResultSet rs = executor.executeQuery(stmt, query);
while(rs.next()) {
    // retrieve data from rs
}

The QueryExecutor class uses a "worker" thread to execute a query. Its executeQuery() method only passes query parameters to the worker, and then waits for the query to complete.

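The steps of code execution in executeQuery() and the worker need to be carried out in the following order:

1. executeQuery() stores the query parameters and notifies the worker.
2. The worker, which has been waiting for parameters, wakes up and executes the query.
3. Meanwhile, executeQuery() waits for the results.
4. When the query completes, the worker stores the results (a ResultSet or an SQLException) and notifies executeQuery().
5. executeQuery() wakes up and either returns the ResultSet or rethrows the exception.

This order is enforced with thread synchronization: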

public synchronized ResultSet executeQuery(
        Statement statement, String query) 
        throws SQLException, 
        InterruptedException {
    //Set query parameters
    synchronized(params) {
        params.statement = statement;
        params.query = query;
        params.pending = true;
        params.notify();
    }
    
    synchronized(results) {
        try {
            //Wait for the query to complete
            while(!results.serviced) {
                results.wait();
            }
            if (results.exception != null) {
                throw results.exception;
            }
        } catch (InterruptedException e) {
            cancel();
            throw e;
        } finally {
            results.serviced = false;
        }
        return results.rs;
    }
}
//The implementation of the Runnable interface (for the worker thread)
public void run() {
    while(!closeRequest) {
        //Start every query with fresh results, so that a previous
        //query's ResultSet or SQLException is never reported again
        ResultSet rs = null;
        SQLException ex = null;
        synchronized(params) {
            try {
                //Wait for query parameters
                while(!params.pending) {
                    params.wait();
                }
                params.pending = false;
            } catch (InterruptedException e) {
                if (closeRequest) {
                    return;
                }
                //No new query was submitted; go back to waiting
                continue;
            }
            //Execute query
            try {
                rs = params.statement.executeQuery(
                    params.query);
            } catch (SQLException e) {
                if (!cancelRequest) {
                    ex = e;
                }
            }
        }

        //Set query results
        synchronized(results) {
            results.rs = rs;
            results.exception = ex;
            results.serviced = true;
            results.notify();
        }
    }
}

QueryExecutor defines two private classes that help to keep the code cleaner. The Params class encapsulates query parameters:

private class Params {
    public Statement statement;
    public String query;
    public boolean pending;
}

and the Results class holds query results:

private class Results {
    public ResultSet rs;
    public SQLException exception;
    public boolean serviced;
}

First, executeQuery() sets the query parameters in a block synchronized on the params object (created in the constructor of QueryExecutor). The worker waits for these parameters in a block synchronized on the same params object. executeQuery() sets the Params.pending flag, notifies the worker, and then waits for the query to complete, which the worker signals through the serviced flag of Results.

The worker executes the query and, upon completion, stores the query results. If the query completes successfully, the worker sets the ResultSet field (rs) of Results; if an SQLException is thrown during query execution, it sets the exception field of Results instead.

While the worker is busy executing a query, executeQuery() is blocked in its results.wait() call. It is woken up by the notify() call that the worker makes when it finishes. executeQuery() then checks whether an exception was thrown in the worker during query execution. If so, it rethrows that exception; if not, it returns the ResultSet object to the caller.

Let's look more closely at what happens when the thread in which executeQuery() runs is interrupted. executeQuery() spends almost all of its time waiting for a query to complete, so when an interruption occurs, the results.wait() call throws an InterruptedException. This exception should propagate to the caller of executeQuery(), but before that happens, the cancel() method is called to stop the running query:

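private void cancel() {
    //Tell the worker that the SQLException it is about to
    //receive is caused by cancellation, not by a real error
    cancelRequest = true;
    try {
        //Ask the driver to abort the running query
        params.statement.cancel();
        //Wait until the worker has finished with the query
        synchronized(results) {
            while(!results.serviced) {
                results.wait();
            }
        }
    } catch (SQLException e) {
        //The driver could not cancel the statement
        return;
    } catch (InterruptedException e) {
        return;
    } finally {
        cancelRequest = false;
    }
}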

This method calls the cancel() method of Statement and waits for the query to terminate.

When a running query is cancelled in this way, its statement throws an SQLException. To distinguish this exception from those caused by genuine errors during query execution, we use a cancelRequest flag, which is set before the statement is cancelled. Because cancelRequest is written by the thread calling cancel() and read by the worker thread, it (like closeRequest) should be declared volatile. Note that when an SQLException is thrown in the worker, the exception field of Results is set only if the exception was not caused by a statement cancellation:

try {
    rs = params.statement.executeQuery(
        params.query);
} catch (SQLException e) {
    if (!cancelRequest) {
        ex = e;
    }
}
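For comparison, the same hand-off between the calling thread and a worker, including cancellation on interrupt, can be sketched with the java.util.concurrent package (added in Java 5; the sketch also uses Java 8 lambda syntax). This is a swapped-in technique, not the article's QueryExecutor: a single-thread ExecutorService plays the worker, and a Future replaces the Params/Results objects and their flags. FutureQueryExecutor is a hypothetical name.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical alternative to QueryExecutor: a single-thread ExecutorService
// plays the worker, and a Future replaces the Params/Results hand-off.
class FutureQueryExecutor {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    ResultSet executeQuery(Statement statement, String query)
            throws SQLException, InterruptedException {
        // Pass the query to the worker thread (replaces Params and notify())
        Callable<ResultSet> task = () -> statement.executeQuery(query);
        Future<ResultSet> result = worker.submit(task);
        try {
            // Block until the query completes (replaces waiting on Results)
            return result.get();
        } catch (InterruptedException e) {
            // The caller was interrupted: ask the driver to abort the query,
            // then let the interruption propagate to the caller
            try {
                statement.cancel();
            } catch (SQLException ignored) {
                // e.g. the driver does not support cancel(); nothing more to do
            }
            throw e;
        } catch (ExecutionException e) {
            // The query itself failed: rethrow its SQLException unchanged
            Throwable cause = e.getCause();
            if (cause instanceof SQLException) {
                throw (SQLException) cause;
            }
            throw new SQLException(cause);
        }
    }

    // Stops the worker thread; call once the executor is no longer needed
    void close() {
        worker.shutdownNow();
    }
}
```

Because every call gets its own Future, there is no shared serviced flag to reset, and the single-thread executor starts the next query only after the cancelled one has ended. Unlike the article's cancel(), this version does not wait for the worker after cancelling; a ResultSet produced by a query that completes despite the cancellation is simply abandoned here, and production code should close it.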

Interruptible Data Retrieval

We have shown how to interrupt an executing query. When the query completes, it returns a ResultSet object. In this section, we will demonstrate how retrieving data from a ResultSet can be interrupted as well. Then, whether an interruption occurs during query execution or during data retrieval, our database query will respond promptly.

Once a ResultSet object is obtained, retrieval of data follows this simple pattern:

ResultSet rs = executor.executeQuery(stmt, query);
while (rs.next()) {
    //Get some/all column values
}

For demonstration purposes we store the retrieved data in a list of lists, where each element is a List holding the column values of one row:

ResultSet rs = executor.executeQuery(stmt, query);
int columnCount = 
    rs.getMetaData().getColumnCount();
List data = new ArrayList();
while (rs.next()) {
    List row = new ArrayList(columnCount);
    for(int i = 1; i <= columnCount; i++) {
        row.add(rs.getObject(i));
    }
    data.add(row);
}

In our quest for responsive database access, we would like to be able to interrupt the above data retrieval. This is easily achieved by checking, inside the while loop, whether the thread executing the code has been interrupted by another thread:

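while (rs.next()) {
    //Thread.interrupted() also clears the interrupted status;
    //from here on the exception carries the interruption
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    //data retrieval code
}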
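Putting the list-of-lists loop and the interruption check together gives a complete retrieval method. The sketch below is an assumption about how they combine, not the accompanying code; it adds Java 5 generics, and InterruptibleRetrieval and readRows() are hypothetical names.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class InterruptibleRetrieval {
    // Copies every row of rs into a list of column-value lists, checking
    // once per row whether the current thread has been interrupted.
    static List<List<Object>> readRows(ResultSet rs)
            throws SQLException, InterruptedException {
        int columnCount = rs.getMetaData().getColumnCount();
        List<List<Object>> data = new ArrayList<>();
        while (rs.next()) {
            // Clears the interrupted status; the exception now carries the signal
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            List<Object> row = new ArrayList<>(columnCount);
            for (int i = 1; i <= columnCount; i++) {   // JDBC columns are 1-based
                row.add(rs.getObject(i));
            }
            data.add(row);
        }
        return data;
    }
}
```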

The interrupted status of the current thread is checked once for every row retrieved. An obvious question is: what is the penalty for this additional check, and how much overhead does it introduce? A simple test on my 1GHz Pentium PC shows that 1 million checks take one second, that is, about one microsecond per check. This is small compared with the cost of fetching a row from a database, so the overhead can be neglected.
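Readers who want to repeat this measurement on their own machines could start from a crude timing loop like the one below. It is a rough sketch, not a rigorous benchmark: there is no warm-up and no control over JIT compilation, so treat the figure it returns as an order of magnitude only. InterruptCheckCost is a hypothetical name.

```java
class InterruptCheckCost {
    // Returns a rough average cost, in nanoseconds, of one Thread.interrupted()
    // call, which is the same check that guards each row in the retrieval loop.
    static double nanosPerCheck(int checks) {
        boolean wasInterrupted = false;
        long start = System.nanoTime();
        for (int i = 0; i < checks; i++) {
            wasInterrupted |= Thread.interrupted();
        }
        long elapsed = System.nanoTime() - start;
        if (wasInterrupted) {
            // Thread.interrupted() cleared the flag; restore it for the caller
            Thread.currentThread().interrupt();
        }
        return (double) elapsed / checks;
    }
}
```

Calling InterruptCheckCost.nanosPerCheck(1000000) and printing the result gives the per-check cost on the current JVM.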

In the code that accompanies this article, a SingleThreadedDatabaseQuerier class implements the above ideas in its getRecords() method. The class implements a DatabaseQuerier interface for demonstration purposes only, to contrast its getRecords() implementation with the multi-threaded counterpart described in the next section.

The getRecords() method of SingleThreadedDatabaseQuerier is synchronized. This is necessary because the number of concurrent Statements that can be created on the same Connection varies between database drivers. Some drivers, such as the ODBC-JDBC bridge that is part of the standard Java distribution, allow only one statement per connection. Without synchronization, getRecords() would throw an exception if another thread attempted to create a statement while the current thread was still executing getRecords().

Multi-Threaded Database Queries

A MultiThreadedDatabaseQuerier class contains a multi-threaded version of the getRecords() method. Its constructor obtains the maximum number of concurrent statements the driver allows:

maxStatements = 
    conn.getMetaData().getMaxStatements();
if (maxStatements == 0) {
    maxStatements = Integer.MAX_VALUE;
}

The DatabaseMetaData API says that a zero value returned by getMaxStatements() means either that there is no limit or that the limit is unknown. We adopt the optimistic interpretation and assume an unlimited number of statements in this case. More cautious readers should replace Integer.MAX_VALUE with 1.
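The choice between the two readings can be isolated in a small helper, so that switching from the optimistic to the cautious interpretation is a single argument. This is a sketch under that assumption; StatementLimit, interpret() and maxStatements() are hypothetical names, and only getMetaData() and getMaxStatements() are real JDBC calls.

```java
import java.sql.Connection;
import java.sql.SQLException;

class StatementLimit {
    // Interprets a value returned by DatabaseMetaData.getMaxStatements():
    // zero means "no limit or limit unknown", so the caller picks a reading.
    static int interpret(int reported, boolean optimistic) {
        if (reported > 0) {
            return reported;                      // the driver reported a real limit
        }
        return optimistic ? Integer.MAX_VALUE : 1;
    }

    static int maxStatements(Connection conn, boolean optimistic) throws SQLException {
        return interpret(conn.getMetaData().getMaxStatements(), optimistic);
    }
}
```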

MultiThreadedDatabaseQuerier uses the maxStatements value to limit the number of threads executing getRecords() concurrently. To do this, the class implements a counting semaphore, a convenient synchronization construct for managing pools of limited resources. The semaphore has two methods: acquire(), which blocks when the number of statements currently in use reaches the maxStatements limit, and release(), which returns a statement to the pool and notifies any threads waiting in acquire().
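A counting semaphore of this kind can be sketched with wait() and notifyAll(). The class below is a hypothetical illustration of the mechanism, not the code from MultiThreadedDatabaseQuerier; from Java 5 onward, java.util.concurrent.Semaphore provides the same acquire()/release() behavior out of the box.

```java
// Hypothetical counting semaphore limiting how many statements are in use.
class StatementSemaphore {
    private int available;   // statement slots that are still free

    StatementSemaphore(int maxStatements) {
        if (maxStatements < 1) {
            throw new IllegalArgumentException("maxStatements must be >= 1");
        }
        this.available = maxStatements;
    }

    // Blocks while every statement slot is in use, then takes one slot.
    synchronized void acquire() throws InterruptedException {
        while (available == 0) {
            wait();              // releases the monitor while blocked
        }
        available--;
    }

    // Returns a slot and wakes the threads blocked in acquire().
    synchronized void release() {
        available++;
        notifyAll();
    }

    synchronized int availablePermits() {
        return available;
    }
}
```

A caller would wrap its statement work as sem.acquire(); try { ... } finally { sem.release(); }, so that a slot is returned even when the query is interrupted.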

One more change improves the concurrency of getRecords(). Since the executeQuery() method of QueryExecutor is synchronized, getRecords() uses a pool of QueryExecutor objects, so that concurrent threads do not have to queue up for a single QueryExecutor. MultiThreadedDatabaseQuerier implements a simple pool based on a LinkedList, which is filled with QueryExecutor objects in the class constructor. The getQueryExecutor() method waits until the pool is non-empty and then removes the first element from the list. When getRecords() has finished retrieving data, it returns the executor to the pool by calling releaseQueryExecutor().
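A pool like this can be sketched generically, with take() and release() playing the roles of getQueryExecutor() and releaseQueryExecutor(). BlockingPool is a hypothetical name, and this is an illustration of the wait/notify pattern rather than the accompanying code; java.util.concurrent.LinkedBlockingQueue (take()/put()) offers the same behavior in Java 5 and later.

```java
import java.util.LinkedList;
import java.util.List;

// Hypothetical blocking pool in the style of the QueryExecutor pool.
class BlockingPool<T> {
    private final LinkedList<T> free = new LinkedList<>();

    BlockingPool(List<T> items) {
        free.addAll(items);
    }

    // Waits until the pool is non-empty, then removes the first element.
    synchronized T take() throws InterruptedException {
        while (free.isEmpty()) {
            wait();
        }
        return free.removeFirst();
    }

    // Puts an element back and wakes one waiting thread; every waiter
    // needs exactly one element, so notify() is sufficient here.
    synchronized void release(T item) {
        free.addLast(item);
        notify();
    }
}
```

In getRecords(), the executor should be returned in a finally block (executor = pool.take(); try { ... } finally { pool.release(executor); }), so that an interrupted query does not leak an executor from the pool.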

Testing

The code for this article contains a test class called QuerierTest that shows getRecords() in action. Readers should replace the values of the static variables in QuerierTest with values appropriate to their environment. A database Connection can be obtained either through the ODBC-JDBC bridge on the Windows platform (openOdbcConnection()) or through a JDBC driver on other platforms (openJdbcConnection()). The openJdbcConnection() method uses jTDS 0.7.1, a free JDBC driver from SourceForge that can connect to Microsoft SQL Server or Sybase. If you use a different database server, download an appropriate JDBC driver from your database vendor's web site.

If you decide to use the jTDS driver, you need to download it from the SourceForge web site and include the jtds-0.7.1.jar file in your CLASSPATH:

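java -cp .;LIB_DIR\jtds-0.7.1.jar test.QuerierTest

where LIB_DIR is the directory where the jtds-0.7.1.jar file was saved. This command line is for Windows; on Unix-like systems, separate the classpath entries with a colon instead of a semicolon and use forward slashes in the path.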

The test in QuerierTest opens a database connection, creates a DatabaseQuerier, and spawns 10 threads, each of which calls the getRecords() method of the DatabaseQuerier with the same query string; threads 0 and 2 are then interrupted. Comment out or uncomment the appropriate lines in the code to use either the ODBC-JDBC bridge or the JDBC driver, and to test either SingleThreadedDatabaseQuerier or MultiThreadedDatabaseQuerier. The single-threaded querier produces deterministic output when QuerierTest runs:
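The pattern behind this test (start several threads that run the same task, interrupt some of them, and record what each one saw) can be sketched without a database. In the hypothetical InterruptHarness below, a Callable stands in for a getRecords() call; the names and the outcome strings are illustrative and are not taken from QuerierTest.

```java
import java.util.concurrent.Callable;

// Hypothetical QuerierTest-style harness: runs one task on several threads,
// interrupts the selected threads, and reports each thread's outcome.
class InterruptHarness {
    static String[] run(int threadCount, int[] toInterrupt, Callable<?> task)
            throws InterruptedException {
        String[] outcome = new String[threadCount];
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                try {
                    task.call();                      // e.g. querier.getRecords(query)
                    outcome[id] = "completed";
                } catch (InterruptedException e) {
                    outcome[id] = "interrupted";
                } catch (Exception e) {
                    outcome[id] = "failed: " + e;
                }
            });
        }
        for (Thread t : threads) {
            t.start();
        }
        for (int id : toInterrupt) {
            threads[id].interrupt();
        }
        for (Thread t : threads) {
            t.join();                                 // join() makes outcome[] writes visible
        }
        return outcome;
    }
}
```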

Threads created
Threads started
Threads 0,2 interrupted
    InterruptedException in thread 0
Thread 1 completed
    InterruptedException in thread 2
Thread 3 completed
Thread 4 completed
Thread 5 completed
Thread 6 completed
Thread 7 completed
Thread 8 completed
Thread 9 completed

The multi-threaded querier produces output that, on my computer, looks like this:

Threads created
Threads started
Threads 0,2 interrupted
    InterruptedException in thread 0
    InterruptedException in thread 2
Thread 1 completed
Thread 8 completed
Thread 7 completed
Thread 6 completed
Thread 5 completed
Thread 9 completed
Thread 4 completed
Thread 3 completed

A multi-threaded querier is suitable when multiple queries need to run concurrently over the same database connection. It is more responsive than the single-threaded querier, although this comes at the cost of slightly slower execution.

Database queries often run in the context of a GUI. Usually, they are initiated by the user through action events, such as mouse clicks or key presses. Because such events are handled on the event-dispatching thread, which also executes repainting code, time-consuming database queries must run in a background thread; only then will the GUI stay responsive. Once query results are obtained, any changes to the GUI should be made on the event-dispatching thread to keep them in sync with other events and repainting requests. SwingWorker is an excellent candidate for this sort of job. It can run a piece of code (our database query) asynchronously in a background thread and then execute another piece of code (the GUI updates) on the event-dispatching thread. It also provides a way to interrupt the background thread, which stops the running query.
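As a sketch of this arrangement, the class below uses javax.swing.SwingWorker, which became part of the JDK in Java 6. doInBackground() runs on a background thread, done() runs on the event-dispatching thread, and cancel(true) interrupts the background thread, which is the kind of interruption the interruptible querier responds to. QueryWorker and its two constructor arguments are hypothetical; the query Callable would typically wrap a getRecords() call.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import javax.swing.SwingWorker;

// Hypothetical worker: runs a query off the event-dispatching thread (EDT)
// and hands the rows to a GUI callback on the EDT.
class QueryWorker extends SwingWorker<List<List<Object>>, Void> {
    private final Callable<List<List<Object>>> query;  // e.g. wraps getRecords()
    private final Consumer<List<List<Object>>> onRows;  // GUI update, runs on the EDT

    QueryWorker(Callable<List<List<Object>>> query,
                Consumer<List<List<Object>>> onRows) {
        this.query = query;
        this.onRows = onRows;
    }

    @Override
    protected List<List<Object>> doInBackground() throws Exception {
        return query.call();        // runs on a SwingWorker background thread
    }

    @Override
    protected void done() {         // runs on the EDT, also after cancel()
        if (isCancelled()) {
            return;                 // the user cancelled: leave the GUI unchanged
        }
        try {
            onRows.accept(get());
        } catch (InterruptedException | ExecutionException e) {
            // A real GUI would report the failure to the user here
        }
    }
}
```

A button's action listener would call execute() on a new QueryWorker, and a Cancel button would call cancel(true) on it. Because done() checks isCancelled() first, a cancelled query leaves the GUI untouched.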


Slav Boleslawski is a software engineer and database designer living in Sydney, Australia.



Copyright © 2009 O'Reilly Media, Inc.