Making MySQL Queries Asynchronous in Tornado

Posted by justin on June 10, 2011 Chartio, Features

At Chartio, we really like rocking Tornado as part of our stack. Tornado makes writing python web apps easy and straightforward to understand. Chartio’s architecture, however, requires connecting to MySQL database servers and executing long-running queries against them. This is the sort of thing that the current MySQL driver chokes on in a Tornado setup – SQL queries block until the results are retrieved. While this isn’t a big deal in nearly all production environments (you want to know when your database gets slow, so you can optimize queries or add indexing), it matters a lot to us because a slow query can affect other customers queries. There are some ways to alleviate the pressure, for example, by using many tornado processes but multiprocesses will only get you so far.

With these constraints in mind, we’ve developed a way to make non-blocking queries to MySQL connections. Keep in mind, there are some caveats here – the connection process and result retrieval will still be blocking, but the actual queries themselves can be run in a non-blocking manner. Since we’re performing data analytics, this is a terrific way to optimize resources for us – the queries are almost always our bottleneck, and the results are normally small.

Technical Details

So how do we do it? While looking into how the python driver actually connects to MySQL, I noticed that it uses the the mysql_real_query function call, as defined in sql-common/client.c of the mysql development api. This particular function looks like this:

    mysql_real_query(MYSQL *mysql, const char *query, ulong length)
    {
      DBUG_ENTER("mysql_real_query");
      DBUG_PRINT("enter",("handle: 0x%lx", (long) mysql));
      DBUG_PRINT("query",("Query = '%-.4096s'",query));

    if (mysql_send_query(mysql,query,length))
        DBUG_RETURN(1);
      DBUG_RETURN((int) (*mysql->methods->read_query_result)(mysql));
    }

You’ll notice that there are 2 primary parts of this call: mysql_send_query and read_query_result. This is very interesting, because it implies that there are 2 steps to querying the back end, and sending the actual query isn’t blocking. The read_query_result command waits until the query is complete before retrieving results. By watching the MySQL connection file descriptor, we can tell when it’s readable and that the query has completed.

Our approach requires modifying the (excellent) python mysql drivers to expose some functions from the mysql layer. First, we need the file descriptor of the database connection to be exposed, so we can poll it using the Tornado ioloop:

Add to the :

    static MyMemberlist(_mysql_ConnectionObject_memberlist)[]

This member:

    MyMember(
             "fd",
             T_UINT,
             offsetof(_mysql_ConnectionObject,connection.net.fd),
             RO,
             "File descriptor of the server connection"
             ),

Next, we need to expose the mysql_send_query and read_query_result functions to the database connection object in python:

    static PyObject *
    _mysql_ConnectionObject_send_query(
        _mysql_ConnectionObject *self,
        PyObject *args)
    {
        char *query;
        int len, r;
        if (!PyArg_ParseTuple(args, "s#:query", &query, &len)) return NULL;
        check_connection(self);
        r = mysql_send_query(&(self->connection), query, len);
        if (r) return _mysql_Exception(self);
        Py_INCREF(Py_None);
        return Py_None;
    }

    static PyObject *
    _mysql_ConnectionObject_read_query_result(
        _mysql_ConnectionObject *self,
        PyObject *args)
    {
        int r;
        r = self->connection.methods->read_query_result(&(self->connection));
        if (r) return _mysql_Exception(self);
        Py_INCREF(Py_None);
        return Py_None;
    }

And then add them as methods to the object:

    {
            "send_query",
            (PyCFunction)_mysql_ConnectionObject_send_query,
            METH_VARARGS,
            _mysql_ConnectionObject_send_query__doc__
        },
        {
            "read_query_result",
            (PyCFunction)_mysql_ConnectionObject_read_query_result,
            METH_VARARGS,
            _mysql_ConnectionObject_read_query_result__doc__
        },

Putting it all together

We now have the necessary hooks into the c layer to use this from within python. We subclass the connection class as such:

    class NBConn(Connection):
        """ Non-blocking basic select queries
            This is only non-blocking for the query part. It assumes the
            database is churning on the query itself, not the number of
            rows or establishing the connection.
            Retrieving rows could potentially be made non-blocking as
            well.
        """

    def nb_query(self, query, callback, on_error=None, args=None):
            """ Non-blocking query. callback is function that takes list
                of tuple args """
            self.send_query(query)
            ioloop.IOLoop.instance().add_handler(self.fd,
                self.cb_factory(callback, on_error), ioloop.IOLoop.READ)

    def cb_factory(self, callback, on_error=None):
            """ Returns a function that handles the ioloop call back """

    def cb(fd, ev):
                res = []
                try:
                    self.read_query_result()
                    # Collect results
                    result = self.use_result()
                    while True:
                        row = result.fetch_row()
                        if not row:
                            break
                        res.append(row[0])
                    # Fire callback with results
                    callback(res)
                except Exception, e:
                    if on_error:
                        return on_error(e)
                    else:
                        raise e
                finally:
                    self.nb_cleanup()
            return cb

    def nb_cleanup(self):
            ioloop.IOLoop.instance().remove_handler(self.fd)
            self.close()

Verifying the concept

We can test this using a basic tornado app that runs a slow query… in this case a predicable 1 second sleep query like so:

    class MainHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
        def get(self):
            self.conn = mysql.NBConn(
                host='127.0.0.1',
                user='dbuser',
                passwd='dbpasswd',
                db='fakedb',
                port=3306
            )
            self.conn.nb_query('SELECT sleep(1);', self.query_done)

    def query_done(self, res):
            self.finish(json.dumps(res))

    def handle_error(self, exception):
            self.finish({'error': str(exception)})

Normally, this tornado application would only be able to deliver 1 request per second, even with concurrent connections. Hitting this with apache benchmark to make many simultaneous connections:

    ab -c 100 -n 10000 127.0.0.1:8888

Gives us:

Requests per second: 97.73 [#/sec] (mean)

Ah, that’s more like it.

Conclusions

So, if you want to use Tornado and have database connections that are truly bound by query execution speed like ours, there’s no reason you can’t do non-blocking queries. Giant caveats abound – if you have large amounts of data to retrieve or the connection takes a long time to establish, this probably isn’t for you. For Chartio, however, it means we can handle databases as big as you can throw at us, and we’ll take it in stride.

I’ll publish the code to this driver in a complete form when we’re confident that it’s stable, but we wanted to share our findings with anyone who’s having a problem like this themselves. And if you want to work on these sorts of problems with us at Chartio, please contact us as jobs@chartio.com. We’re looking for Python and Javascript generalists who like solving hard problems in pragmatic ways.

Notes

After coding this up, I found that this approach had roughly been done at the c level via this site: http://jan.kneschke.de/projects/mysql/async-mysql-queries-with-c-api/