The AppScale TaskQueue Implementation with RabbitMQ

Background processing in Google App Engine (GAE) is possible thanks to the Task Queue API. This API allows developers to run asynchronous tasks through HTTP POST requests. Until recently, the Task Queue API in AppScale (the open source implementation of GAE) reused the GAE SDK implementation, which consists of a local thread that dispatches requests and does exponential backoff upon failures. For several reasons this implementation was not correct in a distributed setting such as AppScale, where multiple application servers run the same application: it did not track the state of tasks across nodes, it did not track task names to prevent task fork bombs, and it did not properly load balance tasks across application servers and nodes.

The AppScale implementation uses RabbitMQ as its message-passing engine in a distributed setting. We've found that RabbitMQ is well documented, has a relatively low memory footprint, and is very stable. Setting it up as a distributed cluster was simple, and it provides features such as message acknowledgement, high availability, and message durability.

[Figure: RabbitMQ task distribution between nodes and application servers]

The figure above shows how tasks are distributed between nodes and application servers. Each machine may have multiple application servers running either the same app or different apps. Within an application server (right box of the figure), a separate thread listens for incoming tasks. When a message is received, this thread posts it to the local load balancer (orange arrow), which balances it across the application servers handling that application. If all application servers are unavailable, the task is re-enqueued and its retry count, which is stored in the message header, is incremented. Each time a failure occurs, the backoff time is a random number of seconds between 0 and 2^n, where n is the number of failures so far.
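The retry bookkeeping described above can be sketched as follows. This is a minimal illustration, not AppScale's code: the header name `retry_count` is hypothetical (the post only says the count lives in the message header), and the delay is drawn as a float since the post does not say whether whole seconds are used.

```python
import random

# Hypothetical header name; the post only says the retry count is in the header.
RETRY_HEADER = "retry_count"

def backoff_seconds(failures, rng=random):
    """Random delay between 0 and 2**failures seconds."""
    return rng.uniform(0, 2 ** failures)

def handle_failure(headers, rng=random):
    """Return (new_headers, delay) for re-enqueuing a failed task.

    The retry count is incremented in a copy of the headers, and the
    delay before re-enqueuing grows with the number of failures.
    """
    failures = headers.get(RETRY_HEADER, 0) + 1
    new_headers = dict(headers)
    new_headers[RETRY_HEADER] = failures
    return new_headers, backoff_seconds(failures, rng)
```

Drawing a random delay (rather than exactly 2^n seconds) spreads retries out, so tasks that failed together do not all hit the application servers again at the same moment.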

Task names and states are stored in the datastore. A task can be in one of three states: running, completed, or error. Currently only a single queue is used, and features like rate limiting and task deferment are not yet implemented but are on the roadmap. Task deferment is not trivial because RabbitMQ does not support delaying messages; it is the main missing feature and is in the works.
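To illustrate how name tracking prevents task fork bombs, here is a minimal in-memory sketch. The dict stands in for the datastore, and `TaskRegistry` and `TaskNameExistsError` are hypothetical names. In a real distributed deployment the check-and-insert in `add` would have to be atomic (e.g., a datastore transaction), which this toy does not model.

```python
RUNNING, COMPLETED, ERROR = "running", "completed", "error"
TASK_STATES = (RUNNING, COMPLETED, ERROR)

class TaskNameExistsError(Exception):
    """Hypothetical error raised when a task name has already been used."""

class TaskRegistry(object):
    """In-memory stand-in for the datastore entities holding task names/states."""

    def __init__(self):
        self._states = {}

    def add(self, task_name):
        # Refusing a reused name stops a task from endlessly re-adding
        # itself under the same name (a task "fork bomb").
        if task_name in self._states:
            raise TaskNameExistsError(task_name)
        # The post lists only three states, so a new task starts as running here.
        self._states[task_name] = RUNNING

    def set_state(self, task_name, state):
        if state not in TASK_STATES:
            raise ValueError("unknown task state: %r" % (state,))
        if task_name not in self._states:
            raise KeyError(task_name)
        self._states[task_name] = state

    def state(self, task_name):
        return self._states.get(task_name)
```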

The load balancer in AppScale is a combination of Nginx and HAProxy. HAProxy lets us queue up requests and serve them to the next available application server, while also performing health checks. Nginx handles SSL and static file serving, which leaves the application servers to handle only dynamic requests.

A RabbitMQ server runs on each node, and the servers are configured as a cluster. Clients that listen on a particular queue receive messages in round-robin fashion. If a client fails while holding a task, RabbitMQ automatically redelivers it to another client, providing fault tolerance. Our testing shows this works as promised. So far we are very happy with RabbitMQ's performance and encourage you to try it for your own message passing needs.
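The delivery behavior described above can be modeled with a toy simulation. This is not how RabbitMQ is implemented internally. Each consumer is a hypothetical `(name, handler)` pair whose handler returns True to acknowledge and False to signal failure. An unacknowledged message goes back to the front of the queue, so the next consumer in the rotation receives it. `max_deliveries` is an assumption added to keep the toy from looping forever.

```python
from collections import deque
from itertools import cycle

def round_robin_dispatch(messages, consumers, max_deliveries=3):
    """Toy model of round-robin delivery with redelivery on failure.

    consumers: list of (name, handler); handler(msg) returns True to ack.
    Returns (acked, dead): acked is a list of (msg, consumer_name) and
    dead lists messages that failed max_deliveries times.
    """
    queue = deque((msg, 0) for msg in messages)
    rotation = cycle(consumers)
    acked, dead = [], []
    while queue:
        msg, deliveries = queue.popleft()
        name, handler = next(rotation)
        if handler(msg):
            acked.append((msg, name))
        elif deliveries + 1 < max_deliveries:
            # Not acknowledged: requeue so another consumer gets it next.
            queue.appendleft((msg, deliveries + 1))
        else:
            dead.append(msg)
    return acked, dead
```

With one healthy consumer and one that always fails, every message ends up acknowledged by the healthy one.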

This implementation will be in the upcoming AppScale 1.6 release. 

 

How to effectively use Range Queries in Cassandra, Hypertable, or HBase

Here is a quick and dirty tutorial on how to do range queries in your favorite BigTable clone datastore (although Cassandra is a BigTable/Dynamo hybrid). Depending on how you set your keys, you can do some fun stuff (like building your own secondary indexes). Let's say you have the following keys in the same keyspace:

my_app/logs/date=some_date1
my_app/logs/date=some_date2
my_app/logs/date=some_date3
my_app/records/employee/name=alice
my_app/records/employee/name=bob
my_app/records/employee/name=claris
my_app/records/employee/name=zed
your_app/logs/date=some_date1
your_app/logs/date=some_date2
your_app/logs/date=some_date3
your_app/records/employee/name=adam
your_app/records/employee/name=alice
your_app/records/employee/name=bob
your_app/records/employee/name=claris
your_app/records/employee/name=zed

Now let's say we want the entire keyspace for just your app. We would set the start key to "your_app/" and the end key to "your_app/~", where '~' is the last printable character in the ASCII table (http://www.asciitable.com/). Note that if your keys contain non-ASCII characters, your end character would be different.

If you want all the records from your app, set the start key to "your_app/records/" and the end key to "your_app/records/~".

If you want just the records from your app with a name that starts with "a", set the start key to "your_app/records/employee/name=a" and the end key to "your_app/records/employee/name=a~".
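The start/end key trick can be tried locally. In this sketch a sorted Python list stands in for a datastore that keeps row keys in lexicographic order, and `bisect` plays the role of the range scan. `prefix_range` and `range_scan` are illustrative helpers, not a datastore client API, and they assume ASCII keys.

```python
import bisect

# Keys from the example above, kept in sorted order like a datastore would.
KEYS = sorted([
    "my_app/logs/date=some_date1",
    "my_app/logs/date=some_date2",
    "my_app/logs/date=some_date3",
    "my_app/records/employee/name=alice",
    "my_app/records/employee/name=bob",
    "my_app/records/employee/name=claris",
    "my_app/records/employee/name=zed",
    "your_app/logs/date=some_date1",
    "your_app/logs/date=some_date2",
    "your_app/logs/date=some_date3",
    "your_app/records/employee/name=adam",
    "your_app/records/employee/name=alice",
    "your_app/records/employee/name=bob",
    "your_app/records/employee/name=claris",
    "your_app/records/employee/name=zed",
])

def prefix_range(prefix):
    """Start and end keys covering every ASCII key that begins with prefix."""
    return prefix, prefix + "~"

def range_scan(keys, start, end):
    """Return all keys k with start <= k <= end from a sorted list."""
    lo = bisect.bisect_left(keys, start)
    hi = bisect.bisect_right(keys, end)
    return keys[lo:hi]
```

For example, `range_scan(KEYS, *prefix_range("your_app/records/employee/name=a"))` returns the keys for adam and alice from your app only.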

In Cassandra you'll get far better results with an order-preserving (lexicographical) partitioner than with the random partitioner. With order-preserving partitioning, keys are kept in sorted order, so a key range is grouped together and can be scanned efficiently; the random partitioner places keys by their hash, so a range of keys is not stored in key order. The partitioner is set in your configuration file, and it cannot be changed once the cluster holds data.

Now go do some range queries. 

Google App Engine Blobstore API and AppScale Implementation

Google App Engine's Blobstore API is the primary method of storing large objects. This blog post talks about the API and how it is implemented in AppScale.

Google App Engine Blobstore Upload
There are two ways to upload blobs: the Files API, with which you supply a large binary object programmatically, and an HTML form. When uploading a file via a form, an upload URL must be created first:
   upload_url = blobstore.create_upload_url('/upload')
This url becomes the action path in your HTML form. The upload url will actually redirect the browser client to another App Engine application which handles the upload directly from the user's browser. If you try to upload a file with a bad session, you'll see this application report an error (http://temporary-blobstore-error.appspot.com).

Behind the scenes, Google could be storing the blob in the Google File System (GFS) or as blocks in Megastore/BigTable. The '/upload' path tells Google where to send the blob's information after it has been successfully uploaded. The upload handler receives a POST from the blobstore application with the file swapped out for a BlobInfo object. This object has information such as the file's name, creation date, content type, and size. The POST also contains the other elements from the form, which are simply forwarded on. A direct link for hosting images can be obtained from your blob:
image_url = images.get_serving_url(blob_key)
The image URL is hosted on the same platform as Picasa (ggpht.com), providing high availability.

Blob Download
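Downloading is as simple as providing a BlobKey (stored within a BlobInfo object); a download handler passes the key to send_blob to serve the blob. The blob's metadata can be fetched with: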
BlobInfo.get(blob_key)
Or if you are serving up an image, just provide the image url. 

AppScale Implementation
There are three components for the blobstore service in AppScale.
  1. Application server (Modified GAE SDK)
  2. Blobstore server (tornado server)
  3. Datastore (AppScale supports a multitude of datastores)

The application server is single threaded (although multiple instances/processes run on every machine), and we don't want an application server to get tied up handling uploads. Therefore a Tornado server handles these uploads, and it does so for all applications.

[Figure: AppScale Blobstore upload workflow]
Let's step through the above workflow of how blobs are uploaded within AppScale. 
  1. The user requests a web page which has an upload file form
  2. The application will create a blobstore session
    1. Store the session info into the datastore (prevents unauthorized uploads)
    2. Create a unique path to the blobstore server running on port 6106 (blob in alpha-numeric)
  3. The action path of the HTML form contains the path from step 2.2
  4. When the user submits the form, it goes to the blobstore server
  5. The blobstore server interacts with the datastore
    1. Verify the session
    2. Store a BlobInfo object
    3. Store the uploaded file in 1MB chunks
    4. Remove the session
  6. A POST is done to the success path given in step 2
    1. Any uploaded files are replaced with their BlobInfo entity
    2. All other form elements are forwarded
  7. The success path handler must do a redirect 
  8. The redirect is forwarded to the user client
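Steps 5 and 6 of the workflow above can be sketched in a few lines. This is not the AppScale Blobstore server's code: a plain dict with hypothetical `sessions`, `blobinfo`, and `chunks` tables stands in for the datastore, `handle_upload` and `read_blob` are illustrative names, and the uploaded file is replaced by its blob key rather than a full BlobInfo entity.

```python
import uuid

CHUNK_SIZE = 1024 * 1024  # 1MB chunks, as in step 5.3

def handle_upload(datastore, session_id, field_name, filename, data, other_fields):
    """Return (success_path, forwarded_fields) for the POST in step 6."""
    # 5.1 Verify the session (prevents unauthorized uploads)
    session = datastore["sessions"].get(session_id)
    if session is None:
        raise PermissionError("unknown or expired upload session")
    blob_key = uuid.uuid4().hex
    # 5.2 Store a BlobInfo-like record
    datastore["blobinfo"][blob_key] = {"filename": filename, "size": len(data)}
    # 5.3 Store the file in 1MB chunks keyed by (blob_key, chunk_index)
    for index, start in enumerate(range(0, len(data), CHUNK_SIZE)):
        datastore["chunks"][(blob_key, index)] = data[start:start + CHUNK_SIZE]
    # 5.4 Remove the session so it cannot be reused
    del datastore["sessions"][session_id]
    # 6.1/6.2 Replace the file with its key and forward the other fields
    forwarded = dict(other_fields)
    forwarded[field_name] = blob_key
    return session["success_path"], forwarded

def read_blob(datastore, blob_key):
    """Reassemble a blob from its chunks."""
    size = datastore["blobinfo"][blob_key]["size"]
    count = (size + CHUNK_SIZE - 1) // CHUNK_SIZE
    return b"".join(datastore["chunks"][(blob_key, i)] for i in range(count))
```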

Application Example
Blobstore Example source code: http://tinyurl.com/3n8fjuj

Additional Resources
AppScale Blobstore Server: http://tinyurl.com/3tue8dk

-- Raj

 

AppScale 1.5

Hello Everyone,

The RaceLab is proud to present AppScale 1.5. In this release we have the following updates:
  • Support for the bulkloader, enabling uploading and downloading of your data
  • Upgraded Java and Python AppServers to GAE 1.4.3
  • Support for Go App Engine apps (SDK version 1.5.0), including support for apps that use multiple processes
  • Fault tolerance for almost all services (processes monitored and revived by god)
  • Faster startup and termination of AppScale, especially over larger numbers of nodes
  • Tools and image now verify that all instances used have AppScale installed
  • EC2 and Eucalyptus credentials are now obscured when they are printed to logs
  • Channel API for Python (multiple receivers can also be used) - implemented via Strophe.js
  • Blobstore and Files API for Python
  • XMPP API for Python - implemented via ejabberd
  • Hybrid cloud support - run AppScale over multiple clouds in a single deployment (e.g., Eucalyptus and EC2, EC2 East Coast and EC2 West Coast)
  • Neptune language support
  • Table caching for MySQL, HBase, Hypertable to improve performance
  • Updated interface for Amazon SimpleDB
  • Upgraded Cassandra version used to 0.7.6-2
  • Upgraded HBase version used to 0.89
  • Upgraded Hadoop version used to 0.20.2
  • Upgraded Hypertable version used to 0.9.43
  • Namespacing support
  • Added Loki, a fault tolerance tester along the lines of Netflix's Chaos Monkey
  • User authorization system for MapReduce, EC2, and Neptune APIs
  • Ability to remove transaction overhead via namespaces
  • Various other bug fixes
  • Xen, KVM, and Eucalyptus images available for download
  • Revamped and simplified wiki documentation
  • Updated home page
  • New EC2 AMI: ami-e554938c

We want to thank the AppScale team and our contributors for their hard work. 

Thank you for your interest in AppScale.

-Raj

My Notes on Fantasm for Google App Engine

Fantasm is a Google App Engine library that abstracts away TaskQueues by configuring workflows as finite state machines. Comparable projects include the Pipeline API and the MapReduce API. Fantasm is great for processing large amounts of data that cannot be processed within a single request because of timeout constraints.

Configuration
Hook Fantasm into your app.yaml file:

- url: /fantasm/.*
  script: fantasm/main.py
  login: admin

State machines are specified in an fsm.yaml file. In this file you give your state machine a name and define its individual states and transitions.

State machines have a single starting state and can have multiple final states.

Each state's execution ends with that state emitting a string (an event) that determines what the next state should be.
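The execution model described above (one start state, actions that emit event strings, transitions keyed by event, one or more final states) can be illustrated with a toy runner. This is not Fantasm's implementation: in Fantasm the machine is declared in fsm.yaml and each state runs as its own task. `run_fsm` and the example states are hypothetical.

```python
def run_fsm(actions, transitions, initial, finals, context):
    """Toy model of a Fantasm-style state machine.

    actions: state name -> function(context) returning an event string (or None)
    transitions: (state, event) -> next state name
    Returns the list of states visited, ending at a final state.
    """
    state = initial
    visited = [state]
    while True:
        event = actions[state](context)
        if state in finals:
            return visited
        # The emitted event picks which transition to follow.
        state = transitions[(state, event)]
        visited.append(state)
```

For instance, a start state can emit "empty" when there is nothing to process and jump straight to the final state, which mirrors the advice below about giving continuation states a path to a final state.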

Make sure you use the full path of the action class. Example:

  - action: serverside.computations.InitialClass

Otherwise you'll get a ModuleNotFound error.

Communication Between States
The "context" is passed from one state to the next as arguments in the URL. By default you should pass only strings, and you should not send more context than fits in a single POST request.

Communication Internal to a State
"obj" is passed from a state's continuation to the actual execution of that state. Unlike the context, "obj" is not serialized between states.

Advanced Settings
It is possible to fork off a new process by calling context.fork(data=dictionary_of_new_context).

Be Careful
Make sure that non-idempotent statements (statements with side effects, like updating an entity in the datastore) are done last. Some race conditions may remain even if you do this, but they should be rare. Use locks via memcache to guard against them, keeping in mind that memcache entries can be evicted at any time.
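A memcache lock usually relies on add() succeeding only when the key is absent. The GAE memcache client's add(key, value, time=...) has these semantics, but to keep this sketch runnable outside App Engine it uses a small in-memory stand-in, `FakeMemcache`, which is hypothetical. `with_lock` is also an illustrative helper. With real memcache, pass an expiry time so a crashed holder cannot keep the lock forever.

```python
class FakeMemcache(object):
    """In-memory stand-in: add() succeeds only if the key is not present."""

    def __init__(self):
        self._data = {}

    def add(self, key, value):
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def delete(self, key):
        self._data.pop(key, None)

def with_lock(cache, lock_key, fn):
    """Run fn() only if the lock is acquired; return (acquired, result)."""
    if not cache.add(lock_key, 1):
        return False, None  # someone else holds the lock
    try:
        return True, fn()
    finally:
        cache.delete(lock_key)  # always release, even if fn() raises
```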

Every state with a continuation should also be able to transition to a final state. The execute method needs this for the case where the query returns no results.

When is your job done?

Right now there is no way to get a callback or a trigger that a job is done.

Useful Iteration

The documentation on the Google Article Site does not talk about this method, which shows up in the testing code. Unlike the continuation function, this method does not require you to use cursors. Here's how to count up all the accounts for your application if your application is really popular (otherwise it might be best to just call count() on the query):

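from google.appengine.ext import db
from fantasm.action import FSMAction, DatastoreContinuationFSMAction

# Placeholder models for this example; use your own
class Accounts(db.Model):
  pass

class Batch(db.Model):
  counter = db.IntegerProperty(default=0)

class AllAccountsClass(DatastoreContinuationFSMAction):
  def getQuery(self, context, obj):
    # Fantasm iterates over this query and hands each result to execute()
    return Accounts.all()

  def execute(self, context, obj):
    if not obj['result']:
      # No results: end the machine (a final state must be reachable)
      return None
    return "peraccount"

# Fan in here every X seconds

class CountAccountsClass(FSMAction):
  def execute(self, contexts, obj):
    """Transactionally update our batch counter"""
    batch_key = "num_accounts"

    def tx():
      batch = Batch.get_by_key_name(batch_key)
      if not batch:
        # For whatever reason it was not already created in a previous state
        batch = Batch(key_name=batch_key, counter=0)
      # Each fanned-in context represents one account
      batch.counter += len(contexts)
      batch.put()
    db.run_in_transaction(tx)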

 

What Does Your State Machine Look Like?

See your state machine by going to the url: fantasm/graph/<state_machine_name>

It uses the Google Chart API.

Fanning In

You can have a state where you specify in the fsm.yaml file to accumulate context every X seconds (fan_in: X). In your execute function you'll have a contexts or list_of_contexts variable where you can get just the length (or more from each context if need be). Then inside a transaction increment some counter.

Code examples: http://code.google.com/p/userinfuser/wiki/Analytics
Fantasm Site: http://code.google.com/p/fantasm/w/list

Fantasm is developed by: http://www.vendasta.com/

Asynchronous URL Fetch for Google App Engine

There are times when you want to do remote logging or a remote API call. You may be okay with losing some updates for the tradeoff of adding little or no overhead for each call. For this case the asynchronous URL Fetch is your solution for Google App Engine. In the case I show below, the call is made and the returned result is never checked. See the GAE documentation on doing async calls which are started early in a request and then checked later in the request.

One thing to note when playing around with it is that it is not truly asynchronous in the SDK. In fact, if you use the code below on the dev server, nothing will happen, because in the SDK the call is only made when you wait on the result. AppScale uses a modified version of the GAE SDK and will support asynchronous fetches in version 1.5.

Make sure to catch exceptions in production. The code below sketches two approaches: one for GAE and one for environments that allow threads.

GAE Method

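import os
import urllib

from google.appengine.api import urlfetch

def is_production_gae():
    # Production reports "Google App Engine/x.y.z"; the dev server reports "Development/x.y"
    return os.environ.get('SERVER_SOFTWARE', '').startswith('Google App Engine')

def url_async_post(url, argsdic):
    if is_production_gae():
        # Start the fetch and never check the result (fire and forget).
        # This will not work on the dev server for GAE, which only makes the
        # call when you wait on the result, so use synchronous calls there.
        rpc = urlfetch.create_rpc(deadline=10)
        urlfetch.make_fetch_call(rpc, url, payload=urllib.urlencode(argsdic),
                                 method=urlfetch.POST)
    else:
        raise NotImplementedError("asynchronous URL Fetch needs production GAE")

def call_remote(api_key, account, urlpath):
    argsdict = {"apikey": api_key,
                "accountid": account}
    url_async_post(urlpath, argsdict)
    return True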
 
Threaded Method
This is how to do it for an environment that allows threads:

import threading
import urllib
import urllib2

def my_threaded(callback=lambda *args, **kwargs: None, daemonic=True):
  """Decorate a function to run in its own thread and report the result
  by calling callback with it. Code yanked from stackoverflow.com"""
  def innerDecorator(func):
    def inner(*args, **kwargs):
      target = lambda: callback(func(*args, **kwargs))
      t = threading.Thread(target=target)
      t.daemon = daemonic  # daemon threads do not block interpreter exit
      t.start()
    return inner
  return innerDecorator

@my_threaded()
def threaded_url_post(url, argsdic):
  # Runs url_post in a background thread; the response is discarded
  url_post(url, argsdic)

def url_post(url, argsdic):
  url_values = ""
  if argsdic:
    url_values = urllib.urlencode(argsdic)

  # Passing data (even an empty string) makes urllib2 send a POST
  req = urllib2.Request(url, url_values)
  response = urllib2.urlopen(req, timeout=5)  # timeout in seconds
  return response.read()

App Engine Channel API in AppScale

One of Google's newest App Engine features is the Channel API which allows for the pushing of messages to a client's javascript code. This blog entry explains AppScale's scalable implementation which is built using ejabberd and strophejs. 

There are two sets of APIs for the developer. First is the Python API, which consists of create_channel(app_client_id) and send_message(app_client_id, message). Under the covers, the create channel API uses AppScale's XMPP service implementation, which lets us leverage ejabberd to take care of distributing and sending messages. The trick is that we must create a temporary account for each new channel. This requires garbage collecting channels that live longer than a prescribed period of time.
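The garbage collection step can be sketched as a sweep over channel creation times. The post does not give the "prescribed period", so `CHANNEL_TTL_SECONDS` is a hypothetical value, and a dict of client id to creation timestamp stands in for wherever AppScale actually records the temporary accounts.

```python
import time

# Hypothetical lifetime; the post only says a "prescribed period of time".
CHANNEL_TTL_SECONDS = 2 * 60 * 60

def collect_stale_channels(channels, now=None, ttl=CHANNEL_TTL_SECONDS):
    """Remove temporary channel accounts older than ttl.

    channels maps client id -> creation timestamp in seconds.
    Returns the sorted list of removed client ids.
    """
    if now is None:
        now = time.time()
    stale = sorted(cid for cid, created in channels.items() if now - created > ttl)
    for cid in stale:
        # In AppScale this step would also unregister the temporary XMPP account.
        del channels[cid]
    return stale
```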

Second, is the javascript API which can be included into the developer's code by adding the following line in the head of the html:
<script src='/_ah/channel/jsapi'></script>
This API creates connections using Strophe.js, a robust open source project that enables BOSH connections to ejabberd (https://github.com/metajack/strophejs). Creating a channel socket actually uses Strophe.js connections and message callbacks. The functions have the same names and functionality to preserve the API, but the implementation is different. Google's implementation uses Google Talk and its XMPP service. Google's production JavaScript is minified and hard to decode, while its SDK version uses polling (with a 500ms poll time) instead of long-lived BOSH connections. AppScale's JavaScript library is also minified to save bandwidth, but the unminified version can be found in appscale/AppServer/google/appengine/tools/appscale-js.js. Within this file you will see a goog.appengine library that maintains the APIs, as well as the Strophe library and the additional MD5 and SHA libraries that Strophe needs.

Nginx is used as a proxy to ejabberd's http-bind path (see http://tinyurl.com/68qbwyc on why a proxy is needed). The proxy connects to ejabberd's http-bind path on port 5280. Long-lived AJAX calls are used to keep overhead low, as opposed to constant polling. You can see this with the resource tracking tools in Firefox or Chrome: a call blocks until a message is returned and is immediately followed by another long-lived connection. The JavaScript library also listens for the unload event, which fires when the client window is closed. Before a full exit, the client library sends a disconnect message to free up resources.

AppScale's implementation allows sending messages to multiple receivers, which is more functionality than GAE's restriction of one sender and one receiver. Any clients given the same application key will see messages that are sent to that application using the send_message(client_id, message) function.

Naming issues
Each XMPP account is registered as <username>@<head-ip>, where username is the first part of your email (e.g., joe.smith for joe.smith@gmail.com). This reserves that username and prevents other emails with the same username (e.g., joe.smith@yahoo.com) from using it.

The XMPP API implementation also creates an XMPP account for each app. If your username conflicts with an app name, you will not be able to use that email. We have ideas on how to alleviate this problem, but it's low on our list. If we see that users definitely don't like this limitation, we will address it.
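The naming rules above can be expressed as a short check. The helper names (`xmpp_username`, `xmpp_jid`, `can_register`) are hypothetical, and the sets of taken usernames and app names stand in for the USERS and APPS data kept by the User/App Server.

```python
def xmpp_username(email):
    """The part of the email before '@', e.g. joe.smith@gmail.com -> joe.smith."""
    return email.split("@", 1)[0]

def xmpp_jid(email, head_ip):
    """XMPP account name of the form <username>@<head-ip>."""
    return "%s@%s" % (xmpp_username(email), head_ip)

def can_register(email, taken_usernames, app_names):
    """An email is usable only if its username is unused by users and apps."""
    username = xmpp_username(email)
    return username not in taken_usernames and username not in app_names
```

Because only the part before "@" is kept, joe.smith@yahoo.com collides with an existing joe.smith@gmail.com account, and an email whose username matches an app name is rejected.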

The User/App Server within AppScale, which is a SOAP frontend to the APPS and USERS table in the datastore, must keep track of which User entry is an app, user, or channel. This is for authentication and also to know which accounts need to be garbage collected.  

Scalable Implementation
In order for XMPP to scale we need DNS. Without it we cannot route between machines, because each machine's domain (its IP address) is different. The default setting will be to route all messages to the head node using Nginx, but we will support DNS configuration for advanced users in the future.

How to add a database in AppScale

This post discusses how to add a datastore to AppScale ("datastore" and "database" are used interchangeably). The developer must automate three primary procedures: installing, starting, and stopping the datastore. Installation is done using shell scripts. Starting and stopping must be written in Ruby (the AppController's language). Moreover, the AppScale DB interface must be implemented in Python.

Reference Code
There are currently nine different datastores already implemented in AppScale, and each can serve as an example of how best to integrate your datastore. Some datastores, however, cannot do range queries or fetch an entire table. For these datastores you must use the dhash interface. The dhash interface shards the key space amongst 16 special keys within the datastore to get around this limitation, but these datastores do not scale as well because each put must access these special keys.
Datastores that use the dhash interface:
  • MemcacheDB (master/slave, written in C)
  • Voldemort (peer to peer, Java)
  • SimpleDB
  • Scalaris
Datastores which use the regular DB interface:
  • Cassandra (peer to peer, Java)
  • HBase (master/slave, Java)
  • Hypertable (master/slave, C++)
  • MongoDB (master/slave, C++)
  • MySQL (peer to peer, C++)
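The dhash idea described above can be sketched as follows, assuming a backing store that only supports get/put by exact key (a dict here). The post only says the key space is sharded amongst 16 special keys. Choosing the shard with an MD5 hash of the row key is an assumption, as are the `__dhash__` key format and the `DHashStore` name; AppScale's actual dhash_datastore.py may differ.

```python
import hashlib

NUM_SPECIAL_KEYS = 16  # from the post: 16 special keys per table

def special_key(table, shard):
    # Hypothetical key format for the special keys.
    return "__dhash__/%s/%d" % (table, shard)

def shard_for(row_key):
    # Assumption: an MD5 hash of the row key picks the shard.
    digest = hashlib.md5(row_key.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_SPECIAL_KEYS

class DHashStore(object):
    """Toy dhash layer over a plain exact-key store (a dict here)."""

    def __init__(self):
        self.kv = {}

    def put(self, table, row_key, value):
        self.kv["%s/%s" % (table, row_key)] = value
        # Every put also updates one shared special key; this extra
        # bookkeeping is why dhash-backed datastores scale worse.
        members = self.kv.setdefault(special_key(table, shard_for(row_key)), set())
        members.add(row_key)

    def get_table(self, table):
        # Without range scans, rebuild the table by reading all special keys.
        rows = {}
        for shard in range(NUM_SPECIAL_KEYS):
            for row_key in self.kv.get(special_key(table, shard), set()):
                rows[row_key] = self.kv["%s/%s" % (table, row_key)]
        return rows
```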
Code Locations
Starting, Stopping, and AppDB Interface paths:
appscale/AppDB/
appscale/AppDB/dbinterface.py
appscale/AppDB/dhash_datastore.py
appscale/AppDB/dbname/
appscale/AppDB/dbname/py_dbname.py
appscale/AppDB/dbname/dbname_helper.rb
appscale/AppDB/dbname/prime_dbname.py
appscale/AppDB/datastore_tester.py
appscale/AppDB/dbname/templates/
appscale/AppDB/dbname/patches/

Installation paths:
appscale/debian/appscale_install_functions.sh
appscale/debian/appscale_install.sh
appscale/debian/control.all
appscale/debian/makedeb_all.sh
appscale/debian/rules.dbname

Tools:
appscale-tools/bin/appscale-run-instances

Installing the Datastore
The scripts needed to install the datastore go in appscale/debian/, where you will see shell scripts for automating installation. Grep the code in this folder for an existing database's name to find a reference.

Initializing and Stopping the Datastore
Your datastore may need configuration files customized each time it is spawned. All configuration files, or templates for them, must go into appscale/AppDB/dbname/templates. The setup_db_config_files function in dbname_helper.rb should use these templates; it receives the master IP, the slave IPs, and credentials (a dictionary of additional arguments). See a reference helper file for the functions that must be implemented.

AppScale DB Interface
The interface is a template for the following functions:
get_entity(table_name, row_key, column_names)
put_entity(table_name, row_key, column_names, cell_values)
get_table(table_name, column_names)
delete_entity(table_name, row_key)
get_schema(table_name)
delete_table(table_name)

The interface is very particular as to what is expected for each template function. Fully understand one of the reference implementations before implementing a new one.
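To show the shape of the six interface functions, here is a toy in-memory implementation. It is only a sketch: it assumes rows map column names to values and returns plain lists, whereas the real AppScale interface has stricter return conventions that you should copy from a reference implementation and check with datastore_tester.py. The class name `InMemoryDatastore` is hypothetical.

```python
class InMemoryDatastore(object):
    """Toy implementation of the six AppScale DB interface functions."""

    def __init__(self):
        self.tables = {}   # table_name -> {row_key: {column: value}}
        self.schemas = {}  # table_name -> list of column names

    def put_entity(self, table_name, row_key, column_names, cell_values):
        table = self.tables.setdefault(table_name, {})
        # The first put defines the schema in this sketch.
        self.schemas.setdefault(table_name, list(column_names))
        table[row_key] = dict(zip(column_names, cell_values))

    def get_entity(self, table_name, row_key, column_names):
        row = self.tables.get(table_name, {}).get(row_key)
        if row is None:
            return []
        return [row.get(c) for c in column_names]

    def get_table(self, table_name, column_names):
        table = self.tables.get(table_name, {})
        # Rows come back in row-key order.
        return [[row.get(c) for c in column_names]
                for _, row in sorted(table.items())]

    def delete_entity(self, table_name, row_key):
        self.tables.get(table_name, {}).pop(row_key, None)

    def get_schema(self, table_name):
        return list(self.schemas.get(table_name, []))

    def delete_table(self, table_name):
        self.tables.pop(table_name, None)
        self.schemas.pop(table_name, None)
```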

AppScale Tools
Add the new database name to the run-instances script (appscale-tools/bin/appscale-run-instances).

Testing
Beyond trying out multiple applications and seeing if they behave correctly, there is also the datastore_tester.py in appscale/AppDB/.
Run this with args: -t <dbname>
This will check to make sure the peculiarities of the interface are correctly implemented.

