Thursday, March 28, 2013

Exposable: An Attribute Access Mixin/Wrapper


[link to code]

I'm writing/have written (what amounts to) an "application server" in Python & JavaScript. User apps have two interrelated parts: a front-end JS part, and potentially a back-end Python part (if server-side logic is required). Sometimes I'd like the JS part of the app to read/write attributes of the Python part, but only the attributes I choose, and it is tedious to whitelist them by hand:

    // js

    promise = me.getAttr(["a", "b"]); // 'me' is a js app of some kind
    promise.then(function (response) {
        console.log(response.a);  // prints '1'
        console.log(response.b);  // prints 'null'
    });

    # python

    class SomePythonAppThatCounterpartsTheJsAppAbove(object):

        def __init__(self):

            self._a = 1
            self._b = 2

        @property
        def a(self):
            ...
            return self._a

        @property
        def b(self):
            ...
            return self._b

        def on_get(self, attr):

            allowed = ['a']
            if attr in allowed:
                try:
                    return getattr(self, attr)
                except AttributeError:
                    pass

            # an implicit None value is returned here

This works fine and all, but the definition of the SomePythonAppThatCounterpartsTheJsAppAbove class (from here on called App) is fluid, and as I add/remove/alter attributes I also need to remember to update that 'allowed' list inside the on_get method.

With the Exposable class I can simply decorate or un-decorate the attributes as I add them, remove them, rename them, or alter what they mean.
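The real Exposable code lives in the linked repository and isn't reproduced here. As a rough illustration of the decorate/un-decorate idea only, here is a minimal sketch; the `exposed` decorator and `ExposableMixin` names are hypothetical, and the sketch assumes exposed attributes are properties or methods defined on the class:

```python
def exposed(fn):
    """Hypothetical marker decorator: flags a getter as readable from the JS side."""
    fn._exposed = True
    return fn


class ExposableMixin(object):
    """Hypothetical mixin: on_get() only returns attributes whose getter is marked."""

    def on_get(self, attr):
        member = getattr(type(self), attr, None)
        # a property keeps the decorated getter function on .fget
        getter = member.fget if isinstance(member, property) else member
        if getattr(getter, '_exposed', False):
            return getattr(self, attr)
        return None  # unexposed or missing attributes read as None (null in JS)


class App(ExposableMixin):

    def __init__(self):
        self._a = 1
        self._b = 2

    @property
    @exposed          # delete this line to hide 'a'; no separate list to keep in sync
    def a(self):
        return self._a

    @property
    def b(self):
        return self._b


app = App()
print(app.on_get('a'), app.on_get('b'))  # 1 None
```

The permission now sits next to the attribute it governs, so renaming or removing a property takes its exposure with it.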

I also realized that sometimes the permissions on server-side app attributes were user-dependent. An admin required access to some parts of an app that a regular user would never have access to. Ideally I wanted to grab a "policy" from a user regarding the app in question, and then apply that policy to the app whenever the user requested access to attributes. So the Exposable class mutated into a wrapper, and then (because it was so simple!) a context manager.
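Again as a hypothetical sketch rather than the repository's code: a wrapper that applies a per-user policy (here just a collection of allowed attribute names, which is an assumption) and doubles as a context manager. In this sketch, leaving the `with` block detaches the wrapper from the app, so a view granted for one request can't quietly be reused later; that behaviour is a design choice of the sketch, not something the original describes.

```python
class PolicyView(object):
    """Hypothetical wrapper: reads only the attributes a policy allows."""

    def __init__(self, app, allowed):
        self._app = app
        self._allowed = frozenset(allowed)

    def get(self, attr):
        if self._app is not None and attr in self._allowed:
            return getattr(self._app, attr, None)
        return None  # denied, missing, or expired all read as None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._app = None  # detach: the view is only valid inside the block
        return False      # never swallow exceptions


class App(object):

    def __init__(self):
        self.a = 1
        self.audit_log = ['login']


# hypothetical per-user policies, e.g. loaded from a user record
POLICIES = {'admin': ['a', 'audit_log'], 'user': ['a']}

app = App()
with PolicyView(app, POLICIES['user']) as view:
    user_sees = (view.get('a'), view.get('audit_log'))    # (1, None)
with PolicyView(app, POLICIES['admin']) as view:
    admin_sees = (view.get('a'), view.get('audit_log'))   # (1, ['login'])
after_exit = view.get('a')                                 # None
```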

You can check out the work on GitHub. Feedback definitely appreciated.

Wednesday, December 5, 2012

Twisted Static & WebSocket Server

Going forward, I've decided to use web technologies for as many applications as possible at work. The README for the GitHub repo explains things nicely, so I won't repeat it here. Gist: it's a Twisted web server that serves up an initial app payload, then communicates with it via WebSocket thereafter.

I am absolutely and most definitely interested in comments/criticisms. Please leave some if you've got the time! (Or fork it and make it better.)

[https://github.com/jomido/tswss]

Tuesday, October 9, 2012

No Use Case: Meshing Part 3 - Schedulers



This is the third article (part 3) in a series of articles regarding the Python programming language. I periodically find myself playing with features of the language outside of any actual project. As such, discoveries that I make (some neat, others of little interest) often have no use case.

Despite having no purpose, I can't help but think that at some point these discoveries will prove useful. So I'm documenting them.

NOTE: I am unable to resist writing to an invisible, and most likely non-existent, audience. The writing takes the tone that someone besides myself is reading this.


[Part 1]: Generators & Coroutines
[Part 2]: Tasklets
[Part 3]: Schedulers (this article)

In part 2 of this series I had a long example showcasing the tasklet, with sink, doubler, and tripler coroutines. In that example you passed in a coroutine (sink()) as an argument to the coroutine of the tasklet (doubler, or tripler). There is nothing stopping you from passing in another tasklet instead, so that one tasklet targets another tasklet (or multiple tasklets).

from tasklet import coroutine, Tasklet

@coroutine
def start(target):
    while True:
        i = (yield)
        target(i * 2)

@coroutine
def finish():
    while True:
        i = (yield)
        print (i)

f = Tasklet(finish)
s = Tasklet(start, f)
s.send(1).next()  # start doubles 1 and queues the result on f
f.next()          # finish runs on the queued value and prints it

# 2
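Fan-out follows the same pattern. As a self-contained sketch using plain coroutines instead of tasklets (the `broadcast` and `collector` names are hypothetical, and the `coroutine` decorator below is a simplified stand-in for the one in tasklet.py), one node can forward every value it receives to several targets:

```python
def coroutine(func):
    """Prime a generator so it is ready to receive values via send()."""
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    start.__name__ = func.__name__
    return start


@coroutine
def broadcast(*targets):
    # hypothetical fan-out node: forward every value to all targets
    while True:
        i = (yield)
        for target in targets:
            target.send(i)


@coroutine
def collector(results, factor):
    # hypothetical sink: record i * factor
    while True:
        i = (yield)
        results.append(i * factor)


doubled, tripled = [], []
b = broadcast(collector(doubled, 2), collector(tripled, 3))
b.send(1)
b.send(5)
# doubled == [2, 10], tripled == [3, 15]
```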


A Simple Scheduler


The original vision was of a fabric-like, stretchy mesh of nodes that could be poked, prodded, and squished into work. For this I need some sort of prime mover to actuate the inert mesh into a living thing. Enter the scheduler.

Here is a simple one that interleaves paths as they branch throughout the mesh (I've left out the try-except GeneratorExit clauses, since they're not strictly necessary; also, I've forgone using .send() and elected to just call the tasklets in this scenario, which is equivalent to .send() in the tasklet API):

from tasklet import coroutine, Tasklet

@coroutine
def start(target):

    while True:
        i = (yield)
        target(i + 1)

@coroutine
def branch(left, right, rand):        

    import random                    

    while True:
        i = (yield)
        if i % 2 == 0:
            left(i)
        elif random.randint(1, 2) == 1:
            rand(i)
        else:
            right(i)

@coroutine        
def doubler(target):

    while True:
        i = (yield)
        target('2x{}'.format(i))

@coroutine
def tripler(target):
    while True:
        i = (yield)
        target('3x{}'.format(i))

@coroutine
def recycler(beginning, sink):

    import random

    while True:
        i = (yield)
        if random.randint(1, 2) == 1:
            beginning(i)
        else:
            sink(i)

@coroutine
def sink():
    while True:
        i = (yield)
        print ("Sink got: {}".format(i))

def taskmaster(tasks):

    """######################################################################

    So simple.

    It basically takes a set of tasklets and runs them as a group,
    interleaving as a natural consequence of how it steps over them.
    When no tasklet in the group has anything left to flush(), we know
    there is no 'data' left to flow through this particular pipeworks,
    or 'mesh' of tasklets.

    For more fine-grained interleaving, task.flush() can be changed to
    task.next().  However, for large meshes, or meshes where there are
    chunks of work to be done concentrated in a single node of the mesh,
    it would be less efficient.

    This scheduler can be turned into a tasklet itself with minimal effort.

    ######################################################################"""


    while any([task.flush() for task in tasks]): pass
   
s = Tasklet(sink)
d = Tasklet(doubler, s)
t = Tasklet(tripler, s)
b = Tasklet() # placeholder for now; we set this in just a second...      
a = Tasklet(start, b)
r = Tasklet(recycler, a, s)

b.set(branch, d, t, r)  # r needs a, a needs b, b needs r: circular reference!
                        # so must set b again to get access to r; didn't
                        # implement set() for this reason, but it has
                        # come in handy for circular refs.

tasks = [s, d, t, b, a, r] # order doesn't matter here

a(0)(1)(2)(3) # queue up some stuff

taskmaster(tasks) # interleave

a(10)(11) # queue up some more

taskmaster(tasks) # interleave

If you run this, the result should be something similar to this:

Sink got: 2x2
Sink got: 2x4
Sink got: 3x1
Sink got: 3x3
Sink got: 11
Sink got: 2x12

Of course, since both the 'branch' and 'recycler' coroutines implement a measure of randomness, the results will vary with each run. Any even number sent to the branch coroutine always moves along to the doubler; an odd number has a 50% chance of moving along to the tripler and a 50% chance of moving along to the recycler. Any data sent to the recycler coroutine has a 50% chance of recycling back to the start coroutine, and a 50% chance of heading on to the sink coroutine.



The basic idea, then, has been realized:

"In a mesh nodes do work on whatever fluids are sent their way (same as a pipeworks), but can also send work to zero or more other nodes in the entire mesh. That is, the pathways are not strictly defined outside of the control of the node - they're not rigid pipes. Nodes sit within some form of malleable landscape (read: mesh) and can rewire the pathways as they see fit. Nodes are smart, and contain internal logic that allows them to become more than just dumb processing stations in a pre-configured chain. They're programs unto themselves."

That's pretty much all there is to "meshing". There's a general prime mover that causes the meshing to happen, but each node in the mesh (tasklet) is free to contain behavioural logic that lets it dynamically push content (data) to different parts of the mesh.
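The taskmaster() docstring claims the scheduler can be turned into a tasklet itself with minimal effort. One way that might look, sketched under the assumption that every scheduled object offers a flush() method returning the items it processed: move the loop into a coroutine, so that each value sent in triggers one full interleaving pass. `taskmaster_node` and `StubTask` are hypothetical names, and the stub stands in for real tasklets so the block runs on its own.

```python
def coroutine(func):
    """Prime a generator so it is ready to receive values via send()."""
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    start.__name__ = func.__name__
    return start


@coroutine
def taskmaster_node(tasks):
    # each value sent in (its content is ignored) runs one full pass,
    # exactly like calling taskmaster(tasks)
    while True:
        (yield)
        while any([task.flush() for task in tasks]):
            pass


class StubTask(object):
    """Stand-in with the one method the loop needs: flush() drains a queue."""

    def __init__(self, items, log):
        self.items = list(items)
        self.log = log

    def flush(self):
        done, self.items = self.items, []
        self.log.extend(done)
        return done


log = []
stub = StubTask([1, 2, 3], log)
node = taskmaster_node([stub])
node.send(None)        # one scheduling pass
stub.items.append(4)
node.send(None)        # and another
# log == [1, 2, 3, 4]
```

Wrapped in a tasklet, e.g. Tasklet(taskmaster_node, tasks), it could be queued and fired like any other node. Since a generator's send() takes exactly one argument, you would queue a dummy value such as None rather than calling the tasklet with no arguments.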

A Priority Scheduler


Moving forward, I can imagine different kinds of schedulers. In particular, having some way to prioritize tasklets seems like it could come in handy. Tasklets, then, need a mechanism to tell the scheduler how important they are. Here is an extension of the Tasklet class, a PriorityTasklet:

from tasklet import coroutine, Tasklet
try:
    from Queue import PriorityQueue, Empty # Python 2.x
except ImportError:
    from queue import PriorityQueue, Empty # Python 3.x

class PriorityTasklet(Tasklet):

    def __init__(self, fn=None, priority=None, *args, **kwargs):

        Tasklet.__init__(self, fn, *args, **kwargs)
        self.priority = priority

    def get_priority(self):

        if self.priority:
            return self._get_value(self.priority)
        else:
            return 0

    def name(self, name):

        self._name = name
        return self

    def _get_value(self, obj):

        # obj may either be some value, or a function that returns a
        # value. In either case, this helper function returns the
        # eventual value. This works to any depth. If the attribute
        # is a function that returns a function, for instance, this
        # will still get the eventual value. The functions must
        # accept no parameters.              

        while callable(obj):
            obj = obj()
       
        return obj

The 'priority' parameter accepts either an integer or a function that returns an integer. (Or a function that returns a function that returns an integer, etc.) The integer will be used to prioritize the tasklet in relation to other tasklets. The lower the integer, the higher the priority. I've also added a method to allow changing the name of the tasklet (which is normally automatically assigned as the name of the passed-in coroutine):

import random

@coroutine
def echoer(target, suffix):

    while True:
        i = (yield)
        target(", ".join([i, suffix]))

@coroutine
def sink():
    while True:
        i = (yield)
        print ("Sink got: {}".format(i))

def random_priority():
    return random.randint(1, 20)

s = PriorityTasklet(sink, 1000)
e1 = PriorityTasklet(echoer,
                     random_priority,
                     s,
                     "world!").name("echoer_world")
e2 = PriorityTasklet(echoer,
                     random_priority,
                     s,
                     "womb!").name("echoer_womb")

e1.send ("Hello")   ("Bonjour")   ("Ola")  # queue up some work on echoer_world
e2.send ("Goodbye") ("Au revoir") ("Ciao") # queue up some work on echoer_womb

The priority for the sink is set purposefully low (1000; remember, the lower the number the higher the priority). The priority for the echoers is set to a function that returns a random number between 1 and 20.

The priority scheduler comes next; it subclasses the standard library's PriorityQueue class:

import itertools

class PriorityScheduler(PriorityQueue):

    def __init__(self, tasklets=None):

        PriorityQueue.__init__(self)
        self.tasklets = [] if tasklets is None else tasklets
        # ever-increasing tie-breaker: when two tasklets share a priority,
        # the queue compares these counters instead of the tasklets
        # themselves (which Python 3 cannot order)
        self._counter = itertools.count()

    def flush(self, incremental=False):

        self._fill()

        while self._process(incremental):
            self._fill()

    def _enqueue(self, task):

        self.put((task.get_priority(), next(self._counter), task))

    def _fill(self):

        for task in self.tasklets:
            if task.items:
                self._enqueue(task)

    def _process(self, incremental):

        results = []

        while True:
            try:
                priority, _, task = self.get(block=False)
            except Empty:
                break
            print ("Scheduler executes {} on priority {}".format(task, priority))
            for i, item in enumerate(task.items):
                if not incremental:
                    print ("    {} <-- processing this data".format(item))
                elif i == 0:
                    print ("    {} <-- processing just this data".format(item))
                else:
                    print ("    {}".format(item))
            result = task.next() if incremental else task.flush()
            if task.items:
                print ("Scheduler puts {} back into priority queue.".format(task))
                self._enqueue(task)   # priority is reassessed on every put
            if isinstance(result, list):  # flush() returns a list of (args, kwargs)
                results.extend(result)
            else:                         # next() returns a single (args, kwargs)
                results.append(result)

        return results

You create and call the scheduler like so, passing in the tasklets as a list:

scheduler = PriorityScheduler([s, e1, e2])
scheduler.flush()

Which will result in similar output to this:

# Scheduler executes echoer_womb on priority 8
#     (('Goodbye',), {}) <-- processing this data
#     (('Au revoir',), {}) <-- processing this data
#     (('Ciao',), {}) <-- processing this data
# Scheduler executes echoer_world on priority 15
#     (('Hello',), {}) <-- processing this data
#     (('Bonjour',), {}) <-- processing this data
#     (('Ola',), {}) <-- processing this data
# Scheduler executes sink on priority 1000
#     (('Goodbye, womb!',), {}) <-- processing this data
#     (('Au revoir, womb!',), {}) <-- processing this data
#     (('Ciao, womb!',), {}) <-- processing this data
#     (('Hello, world!',), {}) <-- processing this data
#     (('Bonjour, world!',), {}) <-- processing this data
#     (('Ola, world!',), {}) <-- processing this data
# Sink got: Goodbye, womb!
# Sink got: Au revoir, womb!
# Sink got: Ciao, womb!
# Sink got: Hello, world!
# Sink got: Bonjour, world!
# Sink got: Ola, world!

Optionally, you can specify that the scheduler should call next() on the tasklets that it processes from its queue, rather than flush():

scheduler.flush(incremental=True)

Which will make the processing somewhat more interleaved:

# Scheduler executes echoer_womb on priority 16
#     (('Goodbye',), {}) <-- processing just this data
#     (('Au revoir',), {})
#     (('Ciao',), {})
# Scheduler puts echoer_womb back into priority queue.
# Scheduler executes echoer_womb on priority 1
#     (('Au revoir',), {}) <-- processing just this data
#     (('Ciao',), {})
# Scheduler puts echoer_womb back into priority queue.
# Scheduler executes echoer_womb on priority 10
#     (('Ciao',), {}) <-- processing just this data
# Scheduler executes echoer_world on priority 20
#     (('Hello',), {}) <-- processing just this data
#     (('Bonjour',), {})
#     (('Ola',), {})
# Scheduler puts echoer_world back into priority queue.
# Scheduler executes echoer_world on priority 18
#     (('Bonjour',), {}) <-- processing just this data
#     (('Ola',), {})
# Scheduler puts echoer_world back into priority queue.
# Scheduler executes echoer_world on priority 1
#     (('Ola',), {}) <-- processing just this data
# Scheduler executes sink on priority 1000
#     (('Goodbye, womb!',), {}) <-- processing just this data
#     (('Au revoir, womb!',), {})
#     (('Ciao, womb!',), {})
#     (('Hello, world!',), {})
#     (('Bonjour, world!',), {})
#     (('Ola, world!',), {})
# Sink got: Goodbye, womb!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
#     (('Au revoir, womb!',), {}) <-- processing just this data
#     (('Ciao, womb!',), {})
#     (('Hello, world!',), {})
#     (('Bonjour, world!',), {})
#     (('Ola, world!',), {})
# Sink got: Au revoir, womb!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
#     (('Ciao, womb!',), {}) <-- processing just this data
#     (('Hello, world!',), {})
#     (('Bonjour, world!',), {})
#     (('Ola, world!',), {})
# Sink got: Ciao, womb!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
#     (('Hello, world!',), {}) <-- processing just this data
#     (('Bonjour, world!',), {})
#     (('Ola, world!',), {})
# Sink got: Hello, world!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
#     (('Bonjour, world!',), {}) <-- processing just this data
#     (('Ola, world!',), {})
# Sink got: Bonjour, world!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
#     (('Ola, world!',), {}) <-- processing just this data
# Sink got: Ola, world!

The output here will be different each time it is run, since the priority for the echoer tasklets is reassessed every time they are put back into the queue.
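One caveat with putting (priority, tasklet) tuples into a PriorityQueue: tuples are compared element by element, so two entries with the same priority fall through to comparing the tasklets themselves, and Python 3 raises a TypeError for objects that define no ordering. With random priorities between 1 and 20, such ties can happen. The standard library's heapq documentation suggests adding an insertion counter as a tie-breaker; here is a standalone sketch of that idea, with a hypothetical `Job` class standing in for a tasklet:

```python
import itertools

try:
    from Queue import PriorityQueue, Empty  # Python 2.x
except ImportError:
    from queue import PriorityQueue, Empty  # Python 3.x


class Job(object):
    """Hypothetical stand-in for a tasklet: deliberately defines no ordering."""

    def __init__(self, name):
        self.name = name


def drain(queue):
    """Pop entries until the queue is empty; return job names in the order popped."""
    names = []
    while True:
        try:
            priority, _, job = queue.get(block=False)
        except Empty:
            return names
        names.append(job.name)


counter = itertools.count()
q = PriorityQueue()
for priority, name in [(5, 'a'), (1, 'b'), (5, 'c'), (1, 'd')]:
    # the counter settles ties, so two Job objects are never compared
    q.put((priority, next(counter), Job(name)))

order = drain(q)
# order == ['b', 'd', 'a', 'c']: lowest number first, then insertion order
```

A side effect of the counter is that tasklets with equal priority are served first-in, first-out, which is usually what you want from a scheduler anyway.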

Other Ideas


An Implicit Scheduler

An implicit scheduler is a scheduler that you don't have to create manually. Rather, it exists from the get-go as a singleton, and you'd import it with from tasklet import Tasklet, scheduler. You don't add tasklets to it, either: when you create a Tasklet instance, the class would take care of registering it with the scheduler singleton. When you're done setting up your tasklets, you'd just call scheduler.run(), or possibly some other utility function that causes the scheduler to "step".
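A minimal sketch of that idea, assuming a pared-down Tasklet (just a queue, __call__, and flush()) rather than the full class from part 2. `Scheduler`, `register()`, and `run()` are hypothetical names; in a real module the `scheduler` instance would live in tasklet.py so that `from tasklet import Tasklet, scheduler` picks up the singleton.

```python
import collections


class Scheduler(object):
    """Hypothetical singleton scheduler: tasklets register themselves on creation."""

    def __init__(self):
        self.tasklets = []

    def register(self, tasklet):
        self.tasklets.append(tasklet)

    def run(self):
        # same rule as taskmaster(): keep passing over every tasklet
        # until a full pass finds no pending work anywhere
        while any([t.flush() for t in self.tasklets]):
            pass


scheduler = Scheduler()  # the module-level singleton


class Tasklet(object):
    """Pared-down stand-in for the article's Tasklet, plus auto-registration."""

    def __init__(self, fn, *args, **kwargs):
        self.coro = fn(*args, **kwargs)
        next(self.coro)                # prime the generator
        self.items = collections.deque()
        scheduler.register(self)       # the "implicit" part

    def __call__(self, item):
        self.items.append(item)
        return self

    def flush(self):
        done = []
        while self.items:
            item = self.items.popleft()
            self.coro.send(item)
            done.append(item)
        return done


def doubler(target):
    while True:
        i = (yield)
        target(2 * i)


def sink(results):
    while True:
        results.append((yield))


results = []
s = Tasklet(sink, results)
d = Tasklet(doubler, s)
d(1)(2)
scheduler.run()        # no tasklet list passed in anywhere
# results == [2, 4]
```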

A Step Back


Dennis M. Ritchie on pipes and coroutines (search for 'pipes' and 'coroutine')
pythonect

Conclusion


This is where the adventure ends, I'm afraid. Hope you had fun. Thanks for bearing with me if all of this is already dead familiar to you, or if I've screwed up or been sloppy somewhere along the line.

Certainly there is more to explore, and you can do so by reading up on the myriad of projects that use coroutines (see Further Reading, below). Many of them happen to define something called a 'tasklet' of their own. I'm not sure if there is a standard definition for this word kicking around somewhere. If you find it, please let me know in the comments.

Also, if you have an actual use case for coroutines, or "tasklets", feel free to share it in the comments as well!

Further Reading


Stackless  -->  Greenlet  -->  Gevent
Cogen
Concurrence
asyncoro

Monday, August 13, 2012

No Use Case: Meshing Part 2 - Tasklets



This is the second article (part 2) in a series of articles regarding the Python programming language. I periodically find myself playing with features of the language outside of any actual project. As such, discoveries that I make (some neat, others of little interest) often have no use case.

Despite having no purpose, I can't help but think that at some point these discoveries will prove useful. So I'm documenting them.

NOTE: I am unable to resist writing to an invisible, and most likely non-existent, audience. The writing takes the tone that someone besides myself is reading this.


[Part 1]: Generators & Coroutines
[Part 2]: Tasklets (this article)
[Part 3]: Schedulers


In part 1 of this series, I wrote:

From http://linux.die.net/Linux-CLI/the-unix-tools-philosophy.html, or "Chapter 3: the Unix Tools Philosophy": 
"A tool is a simple program, usually designed for a specific purpose..."
"[A pipe allows] the output of one program to be sent to the input of another. The tools philosophy was to have small programs to accomplish a particular task instead of trying to develop large monolithic programs to do a large number of tasks. To accomplish more complex tasks, tools would simply be connected together, using pipes."

However, I see no reason that one could not structure a large program to contain this philosophy internally. That is, inside of a single program one can have a bunch of functions that do small things to data - functions which can be connected together to form larger chains of work. The data is the water in a system of pipeworks, where each location that work can be done is a node (a function). The node is where the water is churned or spun around or split apart and then sent on its way(s) again.

In a mesh nodes do work on whatever fluids are sent their way (same as a pipeworks), but can also send work to zero or more other nodes in the entire mesh. That is, the pathways are not strictly defined outside of the control of the node - they're not rigid pipes. Nodes sit within some form of malleable landscape (read: mesh) and can rewire the pathways as they see fit. Nodes are smart, and contain internal logic that allows them to become more than just dumb processing stations in a pre-configured chain. They're programs unto themselves. That's all rather vague (to me, too!), so you'll have to keep reading if you're as curious as I am.

I then went on to discuss coroutines, and how I was implementing them at that time. Essentially, this was the pattern:

from tasklet import coroutine

@coroutine
def multiplier_coroutine(multiplier):
    while True:
        try:
            i = (yield)
            print (multiplier * i)
        except GeneratorExit:
            print ("multiplier has closed.")
            return  # raising StopIteration here is a RuntimeError on Python 3.7+ (PEP 479)

tripler = multiplier_coroutine(3)
tripler.send(3)

# 9
# multiplier has closed <- because it's been garbage collected when the script ends

That pattern is essentially what I was thinking could be a "node" in an amorphous pipeworks I want to call a "mesh". The water flowing through the mesh is the data; the paths the data take between work stations are stretchable, bendable, rearrangeable pipes; the points of intersection where the work is done (the work stations) are the nodes (read: coroutines), and they determine what those stretchy pipes look like at run-time. That is, after doing some work on the incoming data, each node decides which node or nodes (if any) to pass it along to.

Additionally, there would need to be some kind of prime mover that squished and prodded the entire mesh into doing actual work. Some sort of overarching "scheduler" that would take care of which node went next. More on that later.

NIHD (Not Invented Here Digression)


To keep on calling the coroutines "nodes" seemed a little out of place, since that word was (and is) widely used in other contexts to mean different things. So I came up with "tasks", and then "tasklets". As it turned out, "tasklet" was already taken: http://www.stackless.com/. Just search that page and you'll see what I mean. In fact, multiple projects seemed to be using this word. Which made me think I must have picked it up subconsciously from somewhere. Also, from the Stackless docs, lo and behold:

"Tasklets wrap functions, allowing them to be launched as microthreads to be run within the scheduler."

Son of a *.

Others had already worked on this, and quite heavily it seemed. Was I just reinventing "microthreads"? Not wanting to have someone else's fun spoil my own, I elected to forgo mucking about in Stackless until after I'd reached a suitable denouement with my own explorations. If I studied up on Stackless' concepts, I might begin to lose the greenhorn edge: that fearless machete a beginner wields, which experts long ago disdained as an embarrassing and over-optimistic instrument. However, seeing this concept stated so bluntly in different terms certainly helped to clarify what I was doing.

At the end of a good ten minutes or so with an online thesaurus, I decided to stick with "tasklet". I may have been suffering from NIH, but I decided to, at least temporarily, stick my head in the sand. So that's that.

[end digression]

The Tasklet


Basically, a tasklet (as defined by me) is just a class wrapper around a coroutine. To build the tasklet instance, you pass in an optional coroutine, and the optional args and kwargs that the coroutine will take when it is first fired up:

from tasklet import Tasklet

task = Tasklet(doubler, sink()      )
"""               ^       ^     ^
    -------       |       |     |_ [your coroutine kwargs]
    | API |       |       |_______ [your coroutine args]
    -------       |_______________ [your coroutine]

    Everything is optional.  You can do this:

        task = Tasklet()
 
    And it will still work. Empty tasklets are like
    black holes, though: anything sent to them will
    disappear forever. So:

        null = Tasklet()
"""

Here is the definition of tasklet.py as it stands:

import sys
import collections

_PY_VERSION = sys.version_info[0]  # an int, so the '== 2' checks below actually work

def coroutine(func):

    def start(*args, **kwargs):
   
        cr = func(*args, **kwargs)
        cr.next() if _PY_VERSION == 2 else next(cr)
        return cr

    start.__name__ = func.func_name if _PY_VERSION == 2 else func.__name__
    return start

class Tasklet(object):

    def __init__(self, fn=None, *args, **kwargs):
        self.set(fn, *args, **kwargs)

    def set(self, fn=None, *args, **kwargs):
   
        def default_coro(*args, **kwargs):
            try:
                while True:
                    i = (yield)
            except GeneratorExit:
                return  # ends the generator; raising StopIteration here is a RuntimeError on Python 3.7+ (PEP 479)
   
        if fn == None:
            fn = default_coro        

        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.coro = fn(*args, **kwargs)
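        # magic injection! binds this tasklet to a global named 'self' in the
        # coroutine's module; that namespace is shared, so the most recently
        # set tasklet from a given module wins
        self.coro.gi_frame.f_globals['self'] = self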
        self.items = collections.deque()
        self.closed = False
        self._name = fn.__name__
   
        return self
   
    def send(self, *args, **kwargs):
        self.items.append((args, kwargs))
   
        return self

    def next(self):
        try:
            return self.__next__()
        except StopIteration:
            pass
   
    def last(self):
        try:
            return self.__next__(last=True)
        except StopIteration:
            pass
   
    def flush(self):    
        return [pending for pending in self]
   
    def clear(self):
        self.items.clear()
   
        return self

    def rescope(self, *args, **kwargs):
        dq = self.items
        self.set(self.fn, *args, **kwargs)
        self.items = dq

        return self

    def restart(self):

        return self.set(self.fn, *self.args, **self.kwargs)
   
    def close(self):
        self.coro.close()
        self.closed = True
   
        return self

    def __call__(self, *args, **kwargs):
        return self.send(*args, **kwargs)
   
    def __next__(self, last=False):
   
        try:
            if last:
                args, kwargs = self.items.pop()
            else:
                args, kwargs = self.items.popleft()        
        except IndexError:
            raise StopIteration
   
        try:
            self.coro.send(*args, **kwargs)
            return (args, kwargs)
        except StopIteration:
            self.closed = True
            raise

        except TypeError as ex:
       
            # If the user neglects to decorate their coroutines with the
            # @coroutine decorator defined in this module, you can catch that
            # here and make sure you start up the coroutine first.

            if str(ex) == "can't send non-None value to a just-started generator":
                self.coro.next() if _PY_VERSION == 2 else next(self.coro)
                self.coro.send(*args, **kwargs)
                return (args, kwargs)
            else:
                raise

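    def __iter__(self):

        while True:
            try:
                yield self.__next__()
            except StopIteration:
                # stop quietly; letting StopIteration escape a generator
                # is a RuntimeError on Python 3.7+ (PEP 479)
                return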

    def __repr__(self):
        return self._name

A queue (a deque, specifically) exists on the tasklet instance, into which you can pass work items (i.e. data). You can pass as much data as you like into the tasklet - nothing happens, it just piles up until you start calling the API on the tasklet to actually do some work. The API consists of the following methods:

    set
    send
    next
    last
    flush
    clear
    rescope
    restart
    close

As well, the tasklet is both a callable and an iterable. Calling the tasklet is exactly the same as invoking the send() method. Iterating over the tasklet is exactly the same as calling the next() method repeatedly.

A code example will make it clearer:

from tasklet import coroutine, Tasklet

# / / / SETUP SOME COROUTINES

@coroutine        
def doubler(target):
    while True:
        try:
            i = (yield)
            target.send('2x{}'.format(i))
        except GeneratorExit:
            target.send("Doubler has closed.")
            return

@coroutine
def tripler(target):
    while True:
        try:
            i = (yield)
            target.send('3x{}'.format(i))
        except GeneratorExit:
            target.send("Tripler has closed.")
            return

@coroutine
def sink():
    while True:
        try:
            i = (yield)
            print ("Sink got: {}".format(i))
        except GeneratorExit:
            print ("Sink has closed.")
            return

# / / / END SETTING UP COROUTINES



# / / / BEGIN EXAMPLE

task = Tasklet(doubler, sink())
   
task(1)      # queued up
task.send(2) # same thing, queued up; .send() is optional, can keep it for
             # readability if one likes

print ("Nothing has happened yet...")

for pending in task:        # this fires the send, iteratively, to exhaustion
    args, kwargs = pending  # you can examine what was sent, or just pass

print ("Okay, done pending jobs in task")
task.close() # explicitly close the underlying coroutine

print ("All done\n")

if task.closed:
    task.set(tripler, sink()) # resets the task to something else

task(4)
task(5)
task.flush() # runs all pending jobs in the task; does nothing if there
             # is nothing to do; you can examine what was sent, since it
             # returns all pending args; essentially, this is '[pending
             # for pending in task]'; you receive the result as a list
             # of tuples
         
task(6).next() # fires _next_ item that is pending - in this case, 6  
task(7) # queued
task.next() # fires next one - in this case, 7

task(8)
task(9).last() # fires _last_ item that is pending - in this case, 9
task.next() # 8

task.next() # does nothing if there is nothing to do,
            # could return a NO-OP signal instead  

task(10)
task(11)

task.clear() # clears all pending jobs

task(12)
task.flush()

@coroutine
def new_sink():
    while True:
        try:
            i = (yield)
            print ("New Sink got: {}".format(i))
        except GeneratorExit:
            return

task.rescope(new_sink()) # if you want the same coro, and only need
                         # to change the initial args/kwargs, you
                         # can 'rescope' the coroutine, instead of
                         # using set()
task(13).next()
task.close()
task(14).next() # nothing happens here
task.restart()
task(15).next()

If you run this, you will get the following output:

Nothing has happened yet...
Sink got: 2x1
Sink got: 2x2
Okay, done pending jobs in task
Sink got: Doubler has closed.
All done

Sink has closed.
Sink got: 3x4
Sink got: 3x5
Sink got: 3x6
Sink got: 3x7
Sink got: 3x9
Sink got: 3x8
Sink got: 3x12
Sink got: Tripler has closed.
Sink has closed.
New Sink got: 3x13
New Sink got: Tripler has closed.
New Sink got: 3x15
New Sink got: Tripler has closed.

The example illustrates all of a tasklet's methods, but here is a recap:

set(coroutine, *args, **kwargs): (re)sets the tasklet with a new coroutine and new coroutine args & kwargs.

send(*args, **kwargs): queues a work item onto the tasklet. You can also just call the tasklet instead: tasklet(*args, **kwargs) is equivalent to tasklet.send(*args, **kwargs). Note that this only queues the item; no work is done on it until it is processed by next(), last() or flush().

next(): causes the tasklet to process the next (oldest) item in its queue, i.e. it fires the body of the coroutine inside the tasklet (the code after the 'while True' line); returns the data that the coroutine worked on as a tuple of (args, kwargs).

last(): causes the tasklet to process the last (most recently queued) item in its queue; returns the data that the coroutine worked on as a tuple of (args, kwargs).

flush(): causes the tasklet to process all the items in its queue; returns all the data that the coroutine worked on as a list of (args, kwargs) tuples.

clear(): clears the tasklet's queue without processing anything.

rescope(*args, **kwargs): like set, but keeps the tasklet's coroutine function and its pending work items; rescope only resets the coroutine's args & kwargs.

restart(): equivalent to set(self.fn, *self.args, **self.kwargs); as shown above, it can be used to bring a closed tasklet back up.

close(): closes the tasklet.
These methods all return the tasklet object itself, so one can chain calls:

task(1)(2)(3).flush().rescope(new_sink())(4).flush()
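The Tasklet source isn't reproduced in this part, so here is a hypothetical, much-simplified MiniTasklet (not the actual Tasklet class) that sketches how one coroutine plus a queue can give the set/send/next/last/flush/clear/rescope/restart/close behaviour recapped above. Assumptions: work items live in a collections.deque, the coroutine receives only the first positional argument of each item, and every method returns self so calls chain (this sketch does not return the processed data).

```python
from collections import deque


class MiniTasklet(object):
    """Hypothetical stand-in for a tasklet: one coroutine plus a FIFO queue."""

    def __init__(self, fn, *args, **kwargs):
        self.coro = None
        self.queue = deque()          # pending (args, kwargs) work items
        self.set(fn, *args, **kwargs)

    def set(self, fn, *args, **kwargs):
        # Close any previous coroutine, then create and prime the new one.
        if self.coro is not None:
            self.coro.close()
        self.fn, self.args, self.kwargs = fn, args, kwargs
        self.coro = fn(*args, **kwargs)
        next(self.coro)               # advance to the first (yield)
        return self

    def rescope(self, *args, **kwargs):
        # Same coroutine function, new arguments; the queue is untouched.
        return self.set(self.fn, *args, **kwargs)

    def restart(self):
        return self.set(self.fn, *self.args, **self.kwargs)

    def send(self, *args, **kwargs):
        # Only queue the work item; nothing runs yet.
        self.queue.append((args, kwargs))
        return self

    __call__ = send                   # task(x) is task.send(x)

    def _process(self, item):
        args, kwargs = item
        # Sketch assumption: the coroutine receives only the first positional arg.
        self.coro.send(args[0] if args else None)

    def next(self):
        if self.queue:
            self._process(self.queue.popleft())   # oldest item
        return self

    def last(self):
        if self.queue:
            self._process(self.queue.pop())       # newest item
        return self

    def flush(self):
        while self.queue:
            self._process(self.queue.popleft())
        return self

    def clear(self):
        self.queue.clear()
        return self

    def close(self):
        self.coro.close()
        return self


def doubling_collector(out):
    # Hypothetical coroutine: appends twice each received value to 'out'.
    while True:
        out.append((yield) * 2)


seen, other = [], []
task = MiniTasklet(doubling_collector, seen)
task(1)(2)(3).next().last().flush()       # processes 1, then 3, then 2
task(4).rescope(other).flush()            # the queued 4 survives the rescope
```

After this runs, seen holds [2, 6, 4] and other holds [8]: rescope swapped the coroutine's argument but left the pending item in the queue.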

Back in part one, I chose to describe the tasklet as having two parts: an initialization part that set up some internal context (or state), and a behavioural part that did the actual work.

@coroutine
def doubler(target):

    # the code here is run once, when the tasklet is created; in this case,
    # the 'target' variable is just injected and set up, and no further
    # action takes place

    while True:

        # the code here is run every time next() is called on the tasklet

        try:
            i = (yield)
            target.send('2x{}'.format(i))
        except GeneratorExit:
            target.send("Doubler has closed.")
            return  # end the coroutine; 'raise StopIteration' here is a RuntimeError on Python 3.7+

When you rescope doubler, you are only changing the target variable that makes up the tasklet's internal state; any data (work items) previously sent to doubler remain in its queue. Be aware, though, that rescoping closes the old coroutine and starts a new one: any initialization code before the while True line fires again, and any state you were keeping track of inside the while True loop is lost.
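To see the rescoping behaviour in isolation, here is a sketch with plain generators and no Tasklet class. The hypothetical counter() has initialization code and a count kept inside its loop, while the work items sit in a separate deque standing in for the tasklet's queue. As an assumption, "rescoping" is modelled as closing the generator and building a new one with new arguments.

```python
from collections import deque

log = []


def counter(label):
    # Initialization code: runs once per generator object.
    log.append("init " + label)
    count = 0                      # state kept across loop iterations
    while True:
        item = (yield)
        count += 1
        log.append("{} #{}: {}".format(label, count, item))


queue = deque(["a", "b", "c"])     # work items live outside the coroutine

coro = counter("old")
next(coro)                         # prime: runs the init code
coro.send(queue.popleft())         # logs "old #1: a"

# "Rescope": close the old generator and build a new one with new arguments.
coro.close()
coro = counter("new")
next(coro)                         # the init code runs again
coro.send(queue.popleft())         # logs "new #1: b" (the count restarted)
```

The log ends up as init old, old #1: a, init new, new #1: b, and "c" is still waiting in the queue.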

A Few Points of Interest


Minimum Implementation

The pattern for a coroutine implemented thus far has been:

@coroutine
def func(*args, **kwargs):

    # initialization code

    while True:

        # behavioural code
        try:
            val = (yield)
            # do something
        except GeneratorExit:
            return  # ends the coroutine cleanly when the tasklet closes it

The try-except GeneratorExit clause exists so that you have the option of forwarding a message somewhere when the tasklet closes (by being garbage collected, by explicitly closing, by rescoping or restarting, etc.). However, a minimum implementation is this:

@coroutine      
def func(*args, **kwargs):

    while True:
        i = (yield)
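Here is the GeneratorExit pattern on its own, as a sketch that uses a plain list as the target instead of another tasklet (an assumption to keep it self-contained). It ends the generator with return; the hypothetical old_style() generator shows why: under PEP 479 (Python 3.7 and later), a StopIteration raised inside a generator is turned into a RuntimeError, so close() reports an error instead of finishing quietly.

```python
def doubler(target):
    while True:
        try:
            i = (yield)
            target.append("2x{}".format(i))
        except GeneratorExit:
            # Forward a final message, then end the generator with return.
            target.append("Doubler has closed.")
            return


out = []
d = doubler(out)
next(d)          # prime
d.send(1)
d.close()        # throws GeneratorExit into the paused generator


def old_style():
    try:
        yield
    except GeneratorExit:
        raise StopIteration   # pre-PEP 479 idiom


g = old_style()
next(g)
try:
    g.close()
    outcome = "closed cleanly"
except RuntimeError:
    outcome = "RuntimeError"   # Python 3.7+: the StopIteration became a RuntimeError
```

out ends up as ["2x1", "Doubler has closed."]; the minimal version without the try-except is also fine to close, it simply forwards nothing.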

@coroutine

Well, the above is almost a minimum implementation: the @coroutine decorator is optional. The Tasklet class ensures that the decorator is applied to prime the coroutine, even if you don't explicitly decorate your coroutine definition with it. These two are equivalent:

@coroutine
def func(*args, **kwargs):

    while True:
        i = (yield)    

task = Tasklet(func)

#...is equivalent to:

def func(*args, **kwargs):

    while True:
        i = (yield)    

task = Tasklet(func)
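Here is a sketch of how priming can work whether or not the decorator was applied. coroutine() below is the common priming decorator (create the generator, advance it to its first yield); make_primed() is a hypothetical helper, not necessarily what the Tasklet class does, that uses inspect.getgeneratorstate() to prime only generators that have not started yet. Priming twice would push a stray None through the loop body.

```python
import functools
import inspect


def coroutine(fn):
    """Decorator: create the generator and advance it to its first yield."""
    @functools.wraps(fn)
    def start(*args, **kwargs):
        gen = fn(*args, **kwargs)
        next(gen)
        return gen
    return start


def make_primed(fn, *args, **kwargs):
    """Hypothetical helper: prime the generator unless it is already primed."""
    gen = fn(*args, **kwargs)
    if inspect.getgeneratorstate(gen) == inspect.GEN_CREATED:
        next(gen)
    return gen


received = []


@coroutine
def decorated():
    while True:
        received.append(("decorated", (yield)))


def plain():
    while True:
        received.append(("plain", (yield)))


a = make_primed(decorated)   # already primed by the decorator; left alone
b = make_primed(plain)       # not started yet; primed here
a.send(1)
b.send(2)
```

received contains only ("decorated", 1) and ("plain", 2); no None slipped through either loop.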

.send() vs. ()

The .send method exists for readability, but it makes sense to me to also be able to call the tasklet directly with arguments. Therefore, these are equivalent:

task.send(1)
task(1)
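One way to make calling and .send() interchangeable is to give the class a __call__ method. In this hypothetical Queueing class (a queue-only stand-in, not the Tasklet), __call__ delegates to send() at call time, and both return the object itself so that calls chain.

```python
class Queueing(object):
    """Hypothetical minimal queue-only object, not the article's Tasklet."""

    def __init__(self):
        self.pending = []

    def send(self, *args, **kwargs):
        self.pending.append((args, kwargs))
        return self                      # returning self allows q(1)(2)(3)

    def __call__(self, *args, **kwargs):
        # Delegate at call time (rather than writing "__call__ = send") so a
        # subclass that overrides send() is also honoured by q(...).
        return self.send(*args, **kwargs)


class Tagged(Queueing):
    """Hypothetical subclass that prefixes every work item with a tag."""

    def send(self, *args, **kwargs):
        return super(Tagged, self).send("tagged", *args, **kwargs)


t = Tagged()
t.send(1)
t(2)(3)
```

With the shorter alias __call__ = send in the base class, t(2) would bypass the subclass's send() and those items would not be tagged; delegating avoids that.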

self

There is a funny-looking line in the Tasklet class right after the coroutine is created in the __init__ method:

self.coro.gi_frame.f_globals['self'] = self # magic injection!

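This little bit of magic injects the tasklet instance into the coroutine's global namespace (the frame's f_globals), not its locals, as a variable called 'self'. It allows the coroutine definition to reference the tasklet instance. Because f_globals is the global namespace of the module in which the coroutine function was defined, every coroutine in that module that refers to 'self' sees the same value: whatever the most recent injection stored there.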

@coroutine
def my_coro():

    while True:
        i = (yield)
        print ("{} got {}".format(self, i))

task = Tasklet(my_coro)
task(1).next()

# my_coro got 1
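A small experiment with plain generators shows where the injected name ends up. coro_a and coro_b are hypothetical coroutines defined in the same module, and the strings stand in for tasklet instances: both generator frames share one f_globals dictionary, so the second injection overwrites the first.

```python
def coro_a():
    while True:
        yield self        # 'self' is not assigned here, so it is a global lookup


def coro_b():
    while True:
        yield self


a = coro_a()
b = coro_b()

# Inject into each generator's frame globals, as the Tasklet __init__ does.
a.gi_frame.f_globals['self'] = "tasklet A"
b.gi_frame.f_globals['self'] = "tasklet B"

shared = a.gi_frame.f_globals is b.gi_frame.f_globals
seen_by_a = next(a)   # runs coro_a up to its first yield
```

shared is True and seen_by_a is "tasklet B": coro_a sees the value injected for the other generator.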

Recursion

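Two tasklets can target each other without hitting Python's maximum recursion depth. Calling a tasklet only queues a work item, which is processed later by next(), so one tasklet sending to another never nests calls on the stack: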

from tasklet import Tasklet

NO_LOOPS = 10000

def first(target):

    while True:
        i = (yield) + 1
        if i > NO_LOOPS:
            print ("{} has {}".format(self, i))
        target(i)

def second(target):

    while True:
        target((yield))

first = Tasklet(first, None) # <-- pass in None as a placeholder for now
second = Tasklet(second, first)
first.rescope(second).send(1)

for i in range(NO_LOOPS):
    for task in [first, second]:
        task.next()

# first has 10001
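The reason queueing sidesteps the recursion limit can be shown without the Tasklet class. In this sketch, each hypothetical coroutine appends its output to the other's deque instead of calling it, and a flat loop feeds one item to each side per round, so the stack depth stays constant even for ten times the interpreter's recursion limit.

```python
import sys
from collections import deque

LOOPS = sys.getrecursionlimit() * 10   # far deeper than nested calls could go

inbox = {"first": deque([1]), "second": deque()}
result = {}


def first():
    while True:
        i = (yield) + 1
        if i > LOOPS:
            result["first"] = i
        inbox["second"].append(i)      # queue for the other side; no call made


def second():
    while True:
        inbox["first"].append((yield))


coros = {"first": first(), "second": second()}
for c in coros.values():
    next(c)                            # prime both coroutines

# Drive both sides from a flat loop: one item each per round.
for _ in range(LOOPS):
    for name in ("first", "second"):
        if inbox[name]:
            coros[name].send(inbox[name].popleft())
```

result ends up as {"first": LOOPS + 1}, mirroring the "first has 10001" line above. If the two generators instead called each other's .send() directly, Python would not reach the recursion limit at all: re-entering a generator that is still running raises ValueError: generator already executing.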

In part 3 of this adventure, I will (try to) discuss rudimentary schedulers that can be used to drive a mesh of tasklets.



[Part 1]: Generators & Coroutines
[Part 2]: Tasklets (this article)
[Part 3]: Schedulers