.. _quickstart-with-virtualenv: ============================== Quickstart with ``virtualenv`` ============================== Prerequisites ============= ------------ Installation ------------ To start, install |virtualenv|_ and create a virtual environment for our experiments. .. sidebar:: ``virtualenv`` and ``zc.buildout`` I prefer |zc.buildout|_ for production deployments, but |virtualenv|_ is very nice for quick experimentation. The other quick-start uses |zc.buildout|_. :: $ easy_install virtualenv $ virtualenv quickstart Install |async|_ in the virtual environment. :: $ cd quickstart/ $ ./bin/easy_install zc.async .. |zc.buildout| replace:: ``zc.buildout`` .. _`zc.buildout`: http://pypi.python.org/pypi/zc.buildout .. |virtualenv| replace:: ``virtualenv`` .. _virtualenv: http://pypi.python.org/pypi/virtualenv .. |async| replace:: ``zc.async`` .. _`async`: http://pypi.python.org/pypi/zc.async ------------ Dependencies ------------ .. sidebar:: Example Dependencies Here's an example listing of the site-packages brought in by a run of this quick-start. :: $ ls lib/python2.5/site-packages/ Twisted-8.1.0-py2.5-macosx-10.5-i386.egg ZConfig-2.5.1-py2.5.egg ZODB3-3.8.1b5-py2.5-macosx-10.5-i386.egg easy-install.pth pytz-2008c-py2.5.egg rwproperty-1.0-py2.5.egg setuptools-0.6c8-py2.5.egg setuptools.pth uuid-1.30-py2.5.egg zc.async-1.4.0-py2.5.egg zc.dict-1.2.1-py2.5.egg zc.queue-1.1-py2.5.egg zc.twist-1.3-py2.5-macosx-10.5-i386.egg zdaemon-2.0.2-py2.5.egg zope.bforest-1.2-py2.5.egg zope.component-3.4.0-py2.5.egg zope.deferredimport-3.4.0-py2.5.egg zope.deprecation-3.4.0-py2.5.egg zope.event-3.4.0-py2.5.egg zope.i18nmessageid-3.4.3-py2.5-macosx-10.5-i386.egg zope.interface-3.4.1-py2.5-macosx-10.5-i386.egg zope.minmax-1.1.0-py2.5.egg zope.proxy-3.4.1-py2.5-macosx-10.5-i386.egg zope.testing-3.6.0-py2.5.egg This installed several packages. - the ZODB_, an object database from the Zope project; - Twisted_, a framework for networked applications; - the component architecture from the Zope project; - and a few smaller packages. All of these, and zc.async, are distributed under BSD-like licenses such as LGPL and ZPL_. .. _ZODB: http://pypi.python.org/pypi/ZODB3 .. _Twisted: http://pypi.python.org/pypi/Twisted .. _ZPL: http://en.wikipedia.org/wiki/Zope_Public_License ---------- ZEO Server ---------- zc.async relies on a distributed ZODB technology called ZEO ("Zope Enterprise Objects") to distribute work. ZEO has a central database server to which client processes connect. Let's start the ZEO Server:: $ ./bin/runzeo -a 9999 -f test.fs & That starts a database server, accessible on port 9999 of your local machine, saving the data in the test.fs file. Starting |async| ================ -------- A Client -------- Now let's start a Python with a client connection to the database server. Start up ``bin/python`` (not your system python, but the one in virtualenv's ``quickstart/bin``):: $ ./bin/python This will be our single client process. You might have many, each connecting to the main database server, and each able to perform and/or request |async| jobs. ------------------- Database Connection ------------------- Connect to the database. :: import ZEO.ClientStorage import ZODB storage = ZEO.ClientStorage.ClientStorage( ('127.0.0.1', 9999)) db = ZODB.DB(storage) .. When run as a doctest, this uses a simple FileStorage, rather than a ClientStorage. >>> import ZODB.FileStorage >>> storage = ZODB.FileStorage.FileStorage( ... 'zc_async.fs', create=True) >>> from ZODB.DB import DB >>> db = DB(storage) ------------- Start |async| ------------- Basics ------ Now we do some basic configuration. This first bit installs some default adapters. You might not ever have to worry too much about them. >>> import zc.async.configure >>> zc.async.configure.base() Policy ------ This second part is policy, and if you ever put zc.async in production, you'll want to understand what's going on here. >>> zc.async.configure.start(db, poll_interval=1) Now the system has a ``dispatcher`` polling for jobs every second. As we'll see below, a ``dispatcher`` polls for jobs in one or more queues, to assign them to worker threads. Using |async| ============= --------- The Queue --------- The ``start`` function also installed a queue. To get zc.async to do work, you put a job in a queue, and commit the transaction. First, let's get the queue that we have installed. We need to open a connection to the database. Then we get the queue. >>> conn = db.open() >>> import zc.async.interfaces >>> q = zc.async.interfaces.IQueue(conn) ----- A Job ----- .. sidebar:: A Silly Example This is a silly example. Imagine instead that this was some really long-running job. Maybe you have lots of these jobs coming in, and you need to have many machines to claim jobs and perform them, so that you can scale. Maybe this job divides itself up into parallel or serial jobs, and this parent job isn't done until all the children jobs run to completion. Or maybe this is a silly example. .. Let's put a job in our queue. This silly example will return the current time. >>> import time >>> j = q.put(time.time) It's not done yet. >>> j.result >>> j.status u'pending-status' ------------- A Transaction ------------- We have to commit the transaction for the dispatcher to see the job. >>> import transaction >>> transaction.commit() -------- A Result -------- Now wait a second and then try this. "transaction.begin" will sync up our database with database changes made elsewhere. .. This lets us "wait a second". >>> import zc.async.testing >>> res = zc.async.testing.wait_for_result(j) .. >>> _ = transaction.begin() >>> j.result 1216179006.856108 >>> j.status u'completed-status' -------------------------- Another Job (And Closures) -------------------------- You can also make closures. The Job class accepts arguments similarly to the Python 2.5 :func:`functools.partial`: ``Job(func, \*args, \*\*keywords)``. This instantiates a new callable (a Job instance) with partial application of the given arguments and keywords. You can then pass the job instance to the :meth:`~zc.async.interfaces.IQueue.put` method. Generating RSA keys is actually a reasonable real-world use case for something like this. :: import subprocess j = q.put(zc.async.job.Job( subprocess.call, ['openssl', 'genrsa', '-out', 'key.pem', '1024'])) transaction.commit() We need to begin the transaction to see the result... :: j.result _ = transaction.begin() j.result ...which in this case is simply ``0``, indicating a successful UNIX process. :: 0 We can open the file to show the result. :: subprocess.call(['cat', 'key.pem']) This will show you the key, which should look something like this:: -----BEGIN RSA PRIVATE KEY----- MIICXgIBAAKBgQCYAZW+HjDGJhRHnUlZZWqhrGOxU2K/RhssmcMs0JLnWI2cWmZ+ ... CEcz6ZbO8zm4AEGI/dqLicZh3bhunhflAovW6WxbNKLENQ== -----END RSA PRIVATE KEY----- 0 Running Your Own Code ===================== ------------------------ A Monte Carlo Simulation ------------------------ .. sidebar:: Another Silly Example Monte Carlo simulations are generally better suited to other tasks in real-world use; pi is typically calculated by `far more sophisticated means`_. We've now seen some simple examples from the standard library. But how do you get your own work done? Let's say we want to implement an old chestnut of a problem: use a `Monte Carlo simulation`_ (that is, "throwing darts and analyzing the results") to `calculate pi`_. This will use |async| much like the map-reduce approach of `Hadoop`_: we will want to distribute the work of running the simulation to multiple machines so the result can be done faster. .. _`Monte Carlo simulation`: http://en.wikipedia.org/wiki/Monte_Carlo_method .. _`Hadoop`: http://hadoop.apache.org/core/ .. _`calculate pi`: http://math.fullerton.edu/mathews/n2003/MonteCarloPiMod.html .. _`far more sophisticated means`: http://en.wikipedia.org/wiki/Computing_π --------------------------------- Picklable Callables and Arguments --------------------------------- You want a job to have a reference to your own callable, so the job will get the work you define performed. This reference, of the job to your callable, will need to be persisted in the database. .. sidebar:: ZODB Persistence Rules We don't need these now. But if you are really curious about ZODB persistence rules, here's a start. - Anything pickleable can be persisted. Module global functions can be pickled (by name), for instance, and will come in handy for our examples. - Custom classes should typically inherit from persistent.Persistent. Instances of persistent.Persistent subclasses are each stored as a single record in the database, and references to them are handled efficiently. - Subclasses of persistent.Persistent produce instances that recognize when their *direct* attributes have been written, and inform the database that they are dirty, for the next committed transaction. Standard, non-persistent-aware mutables such as lists, sets and dicts do *not* know when they have been changed, and are best avoided, or at least not mutated. A variety of persistent-aware replacements for these types are available. - Use the transaction module to commit and abort transactions in the ZODB. More advanced concerns include the following. - An optimistic implementation of MVCC_ in the ZODB means that you should be careful reading (not writing) one persistent.Persistent instance (record) and writing another persistent.Persistent instance with a value that depends on the first, read value within a transaction. - Concurrent transactions can produce write conflict errors. Transactions are often automatically retried in |async| and other ZODB-based systems if conflict errors are encountered. If this is not desired for a given |async| job, it should be given a ``zc.async.job.NeverRetry`` retry policy, as discussed elsewhere in this documentation. For ZODB documentation see http://www.zope.org/Wikis/ZODB/guide/zodb.html Because zc.async uses the ZODB for its persistence mechanism, the ZODB's persistence rules are in effect. Luckily, these are fairly simple. For now, we'll stay as simple as it gets: if you use *module global functions* and *immutables*, and share software across instances, you'll be fine. ZODB allows a lot more, if you're willing to follow a few more rules, but that one rule will get us moving for this quick-start. .. _MVCC: http://en.wikipedia.org/wiki/Multiversion_concurrency_control ------------- Process UUIDs ------------- Exit the Python interpreter (control-D). Look around in the directory (``ls``): you should see a few database files, the key.pem you created, and a new file: ``uuid.txt``. It should look something like this:: $ cat uuid.txt afd1e0d0-52e1-11dd-879b-0017f2c49bdd ------------------------------------------------------------------------ The value above (and this file) is created and used by the zc.async package. It is intended to uniquely identify this software instance when it is used to start a zc.async dispatcher. This allows multiple dispatchers, each in its own software instance, to connect to a single database to do work. In order to decide where to look for this file (or to create it, if necessary), the module looks in ``os.environ['ZC_ASYNC_UUID']`` for a file name. If you are using zdaemon (http://pypi.python.org/pypi/zdaemon) to daemonize your process, you can set this in a zdaemon environment section of your zdaemon.conf. Supervisor (http://supervisord.org/) also provides this functionality. Other similar tools probably do as well. If the ``ZC_ASYNC_UUID`` is not found in the environment, it will use ``os.path.join(os.getgwd(), 'uuid.txt')`` as the file name. To get a new identifier for this software instance, delete this file, restart Python, and import zc.async.instanceuuid. This file will be recreated with a new value. That text is intended to be self-explanatory, so hopefully it made sense to you. We'll handle these UUIDs more explicitly in a moment. ----------- Make a File ----------- Make a new Python file. Let's call it ``pi.py``. Save this file in ``lib/python2.5/site-packages/``. Use the following for the file content. .. sidebar:: Code Walkthrough This code walkthrough focuses on the elements needed to use |async|, rather than the calculation of pi. Numerous explanations of this Monte Carlo simulation to calculate pi are on the internet. I liked `this one`_. We begin with some imports. The ``generate_sample`` code will be run on multiple processes to produce samples for our Monte Carlo function simulation. It is ignorant of |async|. Once the samples are all done, we'll reduce the results with ``process_samples``. It will return an approximation of pi, with accuracy strongly influenced by the total size of the aggregated samples, assuming even distribution of the random numbers. As you'll see soon, we'll be using a |async| convenience function for :func:`~zc.async.job.parallel` jobs that gives all of the completed jobs that have been running in parallel to this, the postprocessing call. Therefore, ``process_samples`` gets the ``result`` of ``generate_sample`` off of each job. Other than that, this function is ignorant of |async|. The last code block should look similar to our previous example of starting up a dispatcher, except this one uses the main, installed Twisted reactor, rather than a threaded instance. It creates the database, configures zc.async, and starts the reactor. We use a short poll_interval so we can have a perceptually snappy response in this quick start. .. literalinclude:: pi1.py :linenos: .. We'll need these defined when we run this as a test. >>> import random >>> import math >>> import types >>> import sys >>> def generate_sample(size=100000): ... count = 0 ... for i in range(size): ... if math.hypot(random.random(), random.random()) < 1: ... count += 1 ... return count, size ... >>> def process_samples(*sample_jobs): ... count = 0 ... size = 0 ... for j in sample_jobs: ... count += j.result[0] ... size += j.result[1] ... return 4.0 * count / size ... >>> _pi = types.ModuleType('pi') >>> sys.modules['pi'] = _pi >>> _pi.generate_sample = generate_sample >>> _pi.process_samples = process_samples .. _`this one`: http://math.fullerton.edu/mathews/n2003/MonteCarloPiMod.html -------------------------------- Our First Monte Carlo Experiment -------------------------------- We'll need the ZEO server running. If you've gone through this file from the start, it should still be running. If not, use this:: $ ./bin/runzeo -a 9999 -f test.fs & Now, for our first experiment with the Monte Carlo simulation, we'll start a single worker process. Enter this in the terminal:: $ ./bin/python lib/python2.5/site-packages/pi.py & That will start our worker process. Now let's start an interpreter. :: $ ./bin/python Now get a database and a connection, as we've seen before. We'll also set up the base zc.async configuration. :: import ZEO.ClientStorage import ZODB storage = ZEO.ClientStorage.ClientStorage( ('127.0.0.1', 9999)) db = ZODB.DB(storage) conn = db.open() import zc.async.configure zc.async.configure.base() We don't have any adapters installed, so ``zc.async.interfaces.IQueue(conn)`` won't work. This will though, and still looks pretty good:: >>> import zc.async.queue >>> q = zc.async.queue.getDefaultQueue(conn) Now we can start some jobs in :func:`~zc.async.job.parallel`. :: >>> import pi >>> import zc.async.job >>> j = q.put(zc.async.job.parallel( ... pi.generate_sample, pi.generate_sample, pi.generate_sample, ... postprocess=pi.process_samples)) >>> import transaction >>> transaction.commit() Wait a few seconds. If the result is empty (None), begin the transaction again and check the result again. Eventually, these next two lines should give you a result: an approximation of pi. .. This lets us "wait a second". >>> zc.async.testing.wait_for_result(j) # doctest: +ELLIPSIS 3.1... .. :: _ = transaction.begin() j.result For one run, I got ``3.1386666666666665``. Cool. -------- Closures -------- We've already seen Jobs used as closures in the openssl example. You can also use them to pass a different ``size`` argument to ``generate_sample``. Let's try it. >>> j = q.put(zc.async.job.parallel( ... zc.async.job.Job(pi.generate_sample, 1000000), ... zc.async.job.Job(pi.generate_sample, size=1000000), ... postprocess=pi.process_samples)) >>> transaction.commit() Wait a bit, again. Retry these two lines until you get a result. It should be well under a minute on most machines. .. This lets us "wait a second". >>> zc.async.testing.wait_for_result(j, seconds=20) # doctest: +ELLIPSIS 3.1... .. :: _ = transaction.begin() j.result My run got ``3.1434359999999999``. Cool. ------------- Configuration ------------- Unfortunately, the parallel runs we've done so far would actually make the calculation go slower than if we did it in a single function call! That's because we ran it in a single Python worker process, and the overhead of threads just makes the jobs a bit less efficient. Threads help for some uses of |async|, but not this one. Let's assume you are running these experiments on a machine with two processor cores. We should actually start two worker processes then, one for each core. We'll need to make sure each worker process has its own OID. Moreover, we need to configure each process to only take one ``generate_sample`` job at a time. Let's adjust our code to do that. Agents ------ A worker process regularly polls the database for new jobs. The software component that polls is called a ``dispatcher``. Dispatchers look in the database to ask their personal ``agent`` (or agents) to determine what they should get their threads to do. Think of the ``agent`` as a "`talent agent`_" or a "booking agent" for the process. By default, using the zc.async.configure helpers, each dispatcher is given a single agent that will choose the first job in the queue, and that wants to run no more than three jobs at a time. For our Monte Carlo job, we'll give each process an additional agent. The new agent will only accept up to one instance of a ``generate_sample`` job at a time. We will also reconfigure the existing agent to accept up to three of anything *except* ``generate_sample``. We'll do that in our file. Here's the revised version. The only changes are the imports, the three new functions, and setting up ``install_agent`` in the ``if __name__ == '__main__':`` block. .. sidebar:: Code Walkthrough for Changes We import three new modules from |async|, :mod:zc.async.queue, :mod:zc.async.instanceuuid, and :mod:zc.async.agent. The :class:`~zc.async.interfaces.IAgent` implementation (:class:`zc.async.agent.Agent`) uses a function called a :attr:`~zc.async.agent.Agent.chooser` to determine its policy for choosing agents. We define two chooser functions, one for each agent: ``choose_generate_sample`` and ``choose_another``. Then we have a function ``install_agent`` to set up the old agent to use ``choose_another``, and create a new agent of ``size=1`` with the ``choose_generate_sample`` chooser. We make sure this function is installed to run when the Twisted reactor starts. That's it. It's important to note that the references to these functions are persistent, and by name. If you change the location or name of these functions, you will need to keep the old names and locations around, at least for long enough to switch your agents to use the new names. This is a general pattern for module globals--functions and classes--to which the ZODB has direct references or instances. We could have accomplished the same basic policy changes with several different agent configurations. For instance, we could have written a chooser that looked through its agents's current jobs to verify that it did not have another ``generate_sample``. The only trick to this kind of agent configuration is that you always want to have at least some catch-all dispatchers who are willing to do just about any jobs (what our ``choose_another`` choose accomplishes). This supports jobs that the |async| system sometimes needs to run for exceptional circumstances. .. literalinclude:: pi2.py :linenos: .. _`talent agent`: http://en.wikipedia.org/wiki/Talent_agent ------------- Demonstration ------------- Now let's start up our workers, and see how they work. We're going to have two workers now, and they will each need separate UUIDs. A really simple approach will be to make two separate working directories for the two worker processes. (We also could use the environmental variable, ``ZC_ASYNC_UUID``, described in the `Process UUIDs`_ section above.) :: $ mkdir worker1 $ mv uuid.txt worker1 $ cd worker1 $ ../bin/python ../lib/python2.5/site-packages/pi.py & $ cd .. $ mkdir worker2 $ cd worker2 $ ../bin/python ../lib/python2.5/site-packages/pi.py & Now we'll start the Python process in which we will test our code. We'll move to the main directory, but as long as we don't start another worker, it doesn't really matter. :: $ cd .. $ ./bin/python And now, our test. :: import ZEO.ClientStorage import ZODB storage = ZEO.ClientStorage.ClientStorage( ('127.0.0.1', 9999)) db = ZODB.DB(storage) conn = db.open() import zc.async.configure zc.async.configure.base() import pi import zc.async.job import zc.async.queue q = zc.async.queue.getDefaultQueue(conn) j = q.put(zc.async.job.parallel( zc.async.job.Job(pi.generate_sample, 5000000), zc.async.job.Job(pi.generate_sample, size=5000000), postprocess=pi.process_samples)) import transaction transaction.commit() Wait a few seconds and then try these lines. :: _ = transaction.begin() j.result If the result is empty (None), repeat those two lines again (thatæ is, begin the transaction again and check the result again). Eventually, these lines should give you a result: an approximation of pi. Just to prove to ourselves that we saved some time, let's do a comparison test: the same number of samples, but not in parallel. :: j2 = q.put(zc.async.job.parallel( zc.async.job.Job(pi.generate_sample, 10000000), postprocess=pi.process_samples)) transaction.commit() _ = transaction.begin() j2.result Once both jobs are complete, compare their run-time. :: j.active_end - j.active_start j2.active_end - j2.active_start On my machine, even in this simple, short example, running in parallel with a job per processor/core took 7.8 seconds, while running all in one process took 13.4 seconds. Monitoring ========== Soon, you may want to be able to monitor or introspect what's going on in your zc.async work. The package provides several tools to do that. We'll take a look at a few here. We will be turning on a monitor port. This port should be protected behind a firewall, like your ZEO ports. If you like the functionality that we describe here but would prefer to expose it in a different manner, note that most of the Python functions in ``monitor.py`` and ``monitordb.py`` power the zc.async commands in the monitor port, and can be used without the monitor itself. To enable the monitoring port, we need to install some extra dependencies for zc.async: "[monitor]". Exit the Python interpreter an make sure you are in the top "quickstart" directory, and then enter this command: $ ./bin/easy_install zc.async[monitor] If you take a glance at the output, you'll see we've only added a few dependencies: ``simplejson``, ``zc.ngi``, and ``zc.monitor``. Now let's turn on the port and the zc.async commands. At the top of the ``pi.py`` file, add some imports:: import os import zope.component import zc.monitor import zc.monitor.interfaces import zc.async.monitor import zc.async.monitordb Then down in the ``if __name__ == '__main__':`` block, add these lines at the top. :: monitor_port = os.environ.get('MONITOR_PORT') if monitor_port: for f in (zc.monitor.interactive, zc.monitor.quit, zc.monitor.help, zc.async.monitor.async, zc.async.monitordb.asyncdb): zope.component.provideUtility( f, zc.monitor.interfaces.IMonitorPlugin, f.__name__) zc.monitor.start(int(monitor_port)) The file should look like this now. .. literalinclude:: pi3.py :linenos: Now stop and restart your two worker instances, this time providing two different ports in the environment for each worker. Here's one way to do it. First we'll shut down the previous instances. If you used the lines above to start them before, type ``fg`` and RETURN, and then CTRL-C to stop worker 2; and then do the same thing (``fg`` and RETURN, and then CTRL-C) to stop worker 1. :: $ cd worker1 $ MONITOR_PORT=9991 ../bin/python ../lib/python2.5/site-packages/pi.py & $ cd ../worker2 $ MONITOR_PORT=9992 ../bin/python ../lib/python2.5/site-packages/pi.py & Now you can open connections to these two ports, 9991 and 9992, and query the worker (using the ``async`` command, primarily) and the state of zc.async in the database itself (the ``asyncdb`` command). The easiest way to experiment with this is using telnet. Try this. :: $ telnet 127.0.0.1 9991 You should see something like this:: Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. Now you can experiment with commands. Try this (followed by a RETURN):: help You should see something like this:: Supported commands: async -- Monitor zc.async activity in this process. asyncdb -- Monitor and introspect zc.async activity in the database. help -- Get help about server commands interactive -- Turn on monitor's interactive mode quit -- Quit the monitor Connection closed by foreign host. $ Hm, it dumped us straight back to the shell! ``zc.monitor`` behaves that way tp be friendly to automated monitoring processes using the port. We can use the ``interactive`` command to make things a bit more pleasant for ourselves. Reuse the telnet command shown above, or maybe connect to 9992 (``telnet 127.0.0.1 9992``) to see that you can. This time, type the ``interactive`` command first. You should see this reply:: Interactive mode on. Use "quit" To exit. Now experiment! Try some of these commands to see what you get. :: help async async help async help status async status async help poll async poll async UUID async utcnow help asyncdb asyncdb help asyncdb help UUIDs asyncdb UUIDs asyncdb lastcompleted asyncdb count completed asyncdb count completed uuid:THIS That last command shows how many jobs this worker has completed for as long as the database has records, which by default means between seven and eight days. The help for ``asyncdb count``, which is available from ``asyncdb help count``, is not short, but tells you the many options available. When you are done, use the ``quit`` command to exit the telnet connection to the monitor port. Another important tool is logging. The ``zc.async.event`` logger gets important events--pay special attention to critical events! ``zc.async.trace`` lets you follow along with every detail, if so desired. These monitoring and introspection tools, combined with logging, provide powerful tools to keep track of your zc.async work. Other Configuration =================== We're at the end of this quickstart. To close, here's a quick survey of some other configuration opportunities available that we haven't seen here. - Callbacks are a very important per-job configuration. You can add them to a job, to be run unconditionally, conditionally if the result is an instance of a ``twisted.python.failure.Failure``, or conditionally if the result is not a ``Failure``. See :meth:`~zc.async.interfaces.IJob.addCallback`, :meth:`~zc.async.interfaces.IJob.addCallbacks`, and :attr:`~zc.async.interfaces.IJob.callbacks` .. note:: Unlike Twisted callbacks, all callbacks for the same job get the same result; if you would like to chain results, the callbacks themselves are Jobs, so attach a callback to your callback. - You can request that a job :attr:`~zc.async.interfaces.IJob.begin_after` a given timezone-aware datetime. If not given, this defaults to now, for the purposes of calculating the effective :attr:`~zc.async.interfaces.IJob.begin_by` datetime, described below. - You can specify that a job should :attr:`~zc.async.interfaces.IJob.begin_by` a given duration (datetime.timedelta) *after* the jobs's :attr:`~zc.async.interfaces.IJob.begin_after` value. When the queue :gets ready to offer the job for an agent to choose, if the effective ``begin_by`` value has passed, the queue will instead offer a call to the job's :meth:`~zc.async.interfaces.IJob.fail` method. .. note:: There is no built-in way to stop a running job, short of stopping the process. This can be approximated by use in your job of :func:`~zc.async.local.getLiveAnnotation` to poll for stop requests; or the brave can write some C to use PyThreadState_SetAsyncExc_. - You can set up quotas for certain arbitrary quota names that you define. This is a limit: no more than the given quota can run at once, total, across all workers. This can let you decrease the chance of conflict errors for long-running jobs that write to the same data structures. See :attr:`~zc.async.interfaces.IQueue.quotas`, :class:`~zc.async.interfaces.IQuotas`, and :attr:`~zc.async.interfaces.IJob.quota_names`. - Retry policies determine how jobs should be retried if they raise an uncaught exception while running, if their commit raises an error, or if they are interrupted. They can be configured per job, or as defaults for callback and non-callback jobs. See :class:`~zc.async.interfaces.IRetryPolicy`, :attr:`~zc.async.interfaces.IJob.retry_policy_factory`, :meth:`~zc.async.interfaces.IJob.getRetryPolicy`; the default retry policies :class:`~zc.async.job.RetryCommonFourTimes`, :class:`~zc.async.job.RetryCommonForever` and :class:`~zc.async.job.NeverRetry`; and the ``retry_policy_factory`` argument to :meth:`~zc.async.interfaces.IJob.addCallback`, :meth:`~zc.async.interfaces.IJob.addCallbacks`, and :meth:`~zc.async.interfaces.IQueue.put`. - The :attr:`~zc.async.interfaces.IJob.failure_log_level` determines at what level Failure results for a given job should be logged. This is usually logging.ERROR, and logging.CRITICAL for callbacks. This can be set directly on the job, or applied via the ``failure_log_level`` argument to :meth:`~zc.async.interfaces.IJob.addCallback`, :meth:`~zc.async.interfaces.IJob.addCallbacks`, and :meth:`~zc.async.interfaces.IQueue.put`. - Custom job subclasses can take advantage of :meth:`~zc.async.interfaces.IJob.setUp` and :meth:`~zc.async.interfaces.IJob.tearDown` hooks to set up and tear down state. An example is the Zope 3-specific :class:`~zc.async.z3.Job`. - A custom queue might support broadcasting jobs across dispatchers, or targeting specific dispatchers. These features may be developed for |async| itself at a later date. There are many other topics to discuss--testing, debugging, and Zope 3 integration, for instance--but this is a quick start, so we'll end here. .. _PyThreadState_SetAsyncExc: http://docs.python.org/api/threads.html .. Now we are going to stop the reactor. >>> import zc.async.dispatcher >>> dispatcher = zc.async.dispatcher.get() >>> reactor = dispatcher.reactor >>> reactor.callFromThread(reactor.stop) >>> dispatcher.thread.join(3)