hashbeat
Thursday, March 28, 2013
Exposable: An Attribute Access Mixin/Wrapper
[link to code]
I'm writing/have written (what amounts to) an "application server" in Python & JavaScript. User apps have two interrelated parts: a front-end JS part, and potentially a back-end Python part (if server-side logic is required). Sometimes I'd like the JS part of the app to read/write attributes of the Python part. It is tedious to have to whitelist, by hand, which attributes may be accessed:
// js
promise = me.getAttr(["a", "b"]); // 'me' is a js app of some kind
promise.then(function (response) {
    console.log(response.a); // prints '1'
    console.log(response.b); // prints 'null'
});

# python
class SomePythonAppThatCounterpartsTheJsAppAbove(object):

    def __init__(self):
        self._a = 1
        self._b = 2

    @property
    def a(self):
        ...
        return self._a

    @property
    def b(self):
        ...
        return self._b

    def on_get(self, attr):
        allowed = ['a']
        if attr in allowed:
            try:
                return getattr(self, attr)
            except AttributeError:
                pass
        # an implicit None value is returned here
This works fine and all, but the definition of the SomePythonAppThatCounterpartsTheJsAppAbove class (from here on called App) is fluid, and as I add/remove/alter attributes I also need to remember to update that 'allowed' list inside the on_get method.
With the Exposable class I can simply decorate or un-decorate the attributes as I add them, remove them, rename them, or alter what they mean.
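To give a feel for the decorator idea, here is a minimal sketch. The names expose and ExposableSketch are made up for this illustration, and the real Exposable class on Github may well be built differently; the point is only that the "allowed" list moves onto the attributes themselves.

```python
def expose(fn):
    """Mark a getter as readable from the outside (hypothetical marker)."""
    fn._exposed = True
    return fn


class ExposableSketch(object):
    """Hypothetical mixin: on_get only returns attributes marked with @expose."""

    def on_get(self, attr):
        # Look the name up on the class so we see the property object itself.
        candidate = getattr(type(self), attr, None)
        getter = getattr(candidate, "fget", None)  # only properties have fget
        if getattr(getter, "_exposed", False):
            return getattr(self, attr)
        return None  # not exposed (or missing), same as the hand-written version


class App(ExposableSketch):

    def __init__(self):
        self._a = 1
        self._b = 2

    @property
    @expose
    def a(self):
        return self._a

    @property
    def b(self):  # not decorated, so it stays private to the server
        return self._b


app = App()
print(app.on_get("a"))  # 1
print(app.on_get("b"))  # None
```

Un-exposing 'a' is then a one-line change right where 'a' is defined, with no separate list to keep in sync.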
I also realized that sometimes the permissions on server-side app attributes were user-dependent. An admin required access to some parts of an app that a regular user would never have access to. Ideally I wanted to grab a "policy" from a user regarding an app in question, and then apply that policy to the app whenever the user requested access to attributes. So the Exposable class mutated into a wrapper, and then (because it was so simple!) a context manager.
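A rough sketch of the wrapper-plus-context-manager shape, under the assumption that a "policy" is just a collection of attribute names a user may read. PolicyWrapperSketch, Report, and both policies are hypothetical names for this example, not the API of the code on Github.

```python
class PolicyWrapperSketch(object):
    """Hypothetical wrapper: applies a per-user policy to attribute reads."""

    def __init__(self, app, policy):
        self._app = app
        self._policy = set(policy)  # names this user may read (an assumption)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # never swallow exceptions

    def on_get(self, attr):
        if attr in self._policy:
            return getattr(self._app, attr, None)
        return None


class Report(object):  # stand-in for a server-side app
    title = "Q1"
    salary_table = "secret"


admin_policy = ["title", "salary_table"]
user_policy = ["title"]

with PolicyWrapperSketch(Report(), user_policy) as view:
    print(view.on_get("title"))         # Q1
    print(view.on_get("salary_table"))  # None

with PolicyWrapperSketch(Report(), admin_policy) as view:
    print(view.on_get("salary_table"))  # secret
```

The app itself never changes; only the policy handed to the wrapper does.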
You can check out the work on Github. Feedback definitely appreciated.
Labels:
attributes,
decorators,
mixin,
mixins,
Python,
wrapper
Wednesday, December 5, 2012
Twisted Static & WebSocket Server
Going forward, I've decided to use web technologies for as many applications as possible at work. The README for the github repo explains things nicely, so I won't repeat. Gist: it's a Twisted web server that serves up an initial app payload, then communicates with it via WebSocket thereafter.
I am absolutely and most definitely interested in comments/criticisms. Please do so if you've got the time! (Or fork and make better.)
[https://github.com/jomido/tswss]
Tuesday, October 9, 2012
No Use Case: Meshing Part 3 - Schedulers
This is the third article (part 3) in a series of articles regarding the Python programming language. I periodically find myself playing with features of the language outside of any actual project. As such, discoveries that I make (some neat, others of little interest) often have no use case.
Despite having no purpose, I can't help but think that at some point these discoveries will prove useful. So I'm documenting them.
NOTE: I am unable to resist writing to an invisible, and most likely non-existent, audience. The writing takes the tone that someone besides myself is reading this.
[Part 1]: Generators & Coroutines
[Part 2]: Tasklets
In part 2 of this article I had a long example showcasing the tasklet, with sink, doubler, and tripler tasklets. In that example you passed in a coroutine (sink()) as an argument to the coroutine of the tasklet (doubler, or tripler). There is nothing stopping you from passing in another tasklet instead, so that one tasklet targets another tasklet (or multiple tasklets).
from tasklet import coroutine, Tasklet

@coroutine
def start(target):
    while True:
        i = (yield)
        target(i * 2)

@coroutine
def finish():
    while True:
        i = (yield)
        print (i)

f = Tasklet(finish)
s = Tasklet(start, f).send(1).next()
f.next()
# 2
A Simple Scheduler
The original vision was of a fabric-like, stretchy mesh of nodes that could be poked, prodded, and squished into work. For this I need some sort of prime mover to actuate the inert mesh into a living thing. Enter the scheduler.
Here is a simple one that interleaves paths as they branch throughout the mesh (I've left out the try-except GeneratorExit clauses, since they're not strictly necessary; also, I've forgone using .send(), and elected to just call the tasklets in this scenario, which is equivalent to .send() in the tasklet API):
from tasklet import coroutine, Tasklet

@coroutine
def start(target):
    while True:
        i = (yield)
        target(i + 1)

@coroutine
def branch(left, right, rand):
    import random
    while True:
        i = (yield)
        if i % 2 == 0:
            left(i)
        elif random.randint(1, 2) == 1:
            rand(i)
        else:
            right(i)

@coroutine
def doubler(target):
    while True:
        i = (yield)
        target('2x{}'.format(i))

@coroutine
def tripler(target):
    while True:
        i = (yield)
        target('3x{}'.format(i))

@coroutine
def recycler(beginning, sink):
    import random
    while True:
        i = (yield)
        if random.randint(1, 2) == 1:
            beginning(i)
        else:
            sink(i)

@coroutine
def sink():
    while True:
        i = (yield)
        print ("Sink got: {}".format(i))

def taskmaster(tasks):
    """######################################################################
    So simple.

    It basically takes a set of tasklets and runs them as a group,
    interleaving as a natural consequence of how it steps over them.
    When no one in the group succeeds at a flush() call, we know
    there is no 'data' left to flow through this particular pipeworks,
    or 'mesh' of tasklets.

    For more fine-grained interleaving, task.flush() can be changed to
    task.next(). However, for large meshes, or meshes where there are
    chunks of work to be done concentrated in a single node of the mesh,
    it would be less efficient.

    This scheduler can be turned into a tasklet itself with minimal effort.
    ######################################################################"""
    while any([task.flush() for task in tasks]): pass

s = Tasklet(sink)
d = Tasklet(doubler, s)
t = Tasklet(tripler, s)
b = Tasklet() # placeholder for now; we set this in just a second...
a = Tasklet(start, b)
r = Tasklet(recycler, a, s)
b.set(branch, d, t, r) # r needs a, a needs b, b needs r: circular reference!
                       # so must set b again to get access to r; didn't
                       # implement set() for this reason, but it seems like it's
                       # come in handy for circular refs.

tasks = [s, d, t, b, a, r] # order doesn't matter here

a(0)(1)(2)(3) # queue up some stuff
taskmaster(tasks) # interleave
a(10)(11) # queue up some more
taskmaster(tasks) # interleave
If you run this, the result should be something similar to this:
Sink got: 2x2
Sink got: 2x4
Sink got: 3x1
Sink got: 3x3
Sink got: 11
Sink got: 2x12
Of course, since both the 'branch' and 'recycler' coroutines use randomness, the results will vary with each run. In the branch coroutine, even values always move along to the doubler; odd values have a 50% chance of going to the recycler and a 50% chance of going to the tripler. Any data sent to the recycler coroutine has a 50% chance of recycling back to the start coroutine, and a 50% chance of heading on to the sink coroutine.
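To make the odds concrete, here is a small sketch that walks the same routing rules as the code above (start adds one, branch sends evens to the doubler and splits odds between recycler and tripler, recycler splits between start and sink) and computes the exact chance of each line the sink could print for a single input. It does not use the tasklets at all; outcome_distribution is a name I've made up for the example.

```python
from fractions import Fraction


def outcome_distribution(n):
    """Exact odds of each thing the sink can print when n is sent to 'start'."""
    half = Fraction(1, 2)
    dist = {}

    def add(label, p):
        dist[label] = dist.get(label, 0) + p

    def start(i, p):
        branch(i + 1, p)

    def branch(i, p):
        if i % 2 == 0:
            add('2x{}'.format(i), p)          # evens always go to the doubler
        else:
            recycler(i, p * half)             # odd: half the time recycle...
            add('3x{}'.format(i), p * half)   # ...half the time triple

    def recycler(i, p):
        start(i, p * half)                    # back to the beginning
        add(str(i), p * half)                 # or straight to the sink

    start(n, Fraction(1))
    return dist


for label, p in sorted(outcome_distribution(0).items()):
    print("{}: {}".format(label, p))
# 1: 1/4
# 2x2: 1/4
# 3x1: 1/2
```

A recycled value is odd, so the extra +1 from start makes it even and it lands in the doubler on the second pass; nothing loops forever.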
The basic idea, then, has been realized:
"In a mesh nodes do work on whatever fluids are sent their way (same as a pipeworks), but can also send work to zero or more other nodes in the entire mesh. That is, the pathways are not strictly defined outside of the control of the node - they're not rigid pipes. Nodes sit within some form of malleable landscape (read: mesh) and can rewire the pathways as they see fit. Nodes are smart, and contain internal logic that allows them to become more than just dumb processing stations in a pre-configured chain. They're programs unto themselves."
That's pretty much all there is to "meshing". There's a general prime mover that causes the meshing to happen, but each node in the mesh (tasklet) is free to contain behavioural logic that lets it dynamically push content (data) to different parts of the mesh.
A Priority Scheduler
Moving forward, I can imagine different kinds of schedulers. In particular, having some way to prioritize tasklets seems like it could come in handy. Tasklets, then, need a mechanism to tell the scheduler how important they are. Here is an extension of the Tasklet class, a PriorityTasklet:
from tasklet import coroutine, Tasklet

try:
    from Queue import PriorityQueue, Empty # Python 2.x
except ImportError:
    from queue import PriorityQueue, Empty # Python 3.x

class PriorityTasklet(Tasklet):

    def __init__(self, fn=None, priority=None, *args, **kwargs):
        Tasklet.__init__(self, fn, *args, **kwargs)
        self.priority = priority

    def get_priority(self):
        if self.priority:
            return self._get_value(self.priority)
        else:
            return 0

    def name(self, name):
        self._name = name
        return self

    def _get_value(self, obj):
        # obj may either be some value, or a function that returns a
        # value. In either case, this helper function returns the
        # eventual value. This works to any depth. If the attribute
        # is a function that returns a function, for instance, this
        # will still get the eventual value. The functions must
        # accept no parameters.
        while callable(obj):
            obj = obj()
        return obj
The 'priority' parameter accepts either an integer or a function that returns an integer. (Or a function that returns a function that returns an integer, etc.) The integer will be used to prioritize the tasklet in relation to other tasklets. The lower the integer, the higher the priority. I've also added a method to allow changing the name of the tasklet (which is normally automatically assigned as the name of the passed-in coroutine):
import random

@coroutine
def echoer(target, suffix):
    while True:
        i = (yield)
        target(", ".join([i, suffix]))

@coroutine
def sink():
    while True:
        i = (yield)
        print ("Sink got: {}".format(i))

def random_priority():
    return random.randint(1, 20)

s = PriorityTasklet(sink, 1000)
e1 = PriorityTasklet(echoer,
                     random_priority,
                     s,
                     "world!").name("echoer_world")
e2 = PriorityTasklet(echoer,
                     random_priority,
                     s,
                     "womb!").name("echoer_womb")

e1.send ("Hello") ("Bonjour") ("Ola") # queue up some work on echoer_world
e2.send ("Goodbye") ("Au revoir") ("Ciao") # queue up some work on echoer_womb
The priority for the sink is set purposefully low (1000; remember, the lower the number the higher the priority). The priority for the echoers is set to a function that returns a random number between 1 and 20.
The priority scheduler comes next, which inherits from the standard library's PriorityQueue class:
class PriorityScheduler(PriorityQueue):

    def __init__(self, tasklets=None):
        PriorityQueue.__init__(self)
        if tasklets is None:
            self.tasklets = []
        else:
            self.tasklets = tasklets

    def flush(self, incremental=False):
        self._fill()
        while self._process(incremental):
            self._fill()

    def _fill(self):
        # queue up every tasklet that has pending items
        for task in self.tasklets:
            if task.items:
                self.put((task.get_priority(), task))

    def _process(self, incremental):
        results = []
        while True:
            try:
                priority, task = self.get(block=False)
                print ("Scheduler executes {} on priority {}".format(task, priority))
                for i, item in enumerate(task.items):
                    if i == 0:
                        print (" {} <-- processing this item".format(item))
                    else:
                        print (" {}".format(item))
                result = task.next() if incremental else task.flush()
                if task.items:
                    print ("Scheduler puts {} back into priority queue.".format(task))
                    self.put((task.get_priority(), task))
                try:
                    results.extend(result)
                except TypeError: # result is not an iterable
                    results.append(result)
            except Empty:
                break
        return results
You create and call the scheduler like so, passing in the tasklets as a list:
scheduler = PriorityScheduler([s, e1, e2])
scheduler.flush()
Which will result in similar output to this:
# Scheduler executes echoer_womb on priority 8
# (('Goodbye',), {}) <-- processing this data
# (('Au revoir',), {}) <-- processing this data
# (('Ciao',), {}) <-- processing this data
# Scheduler executes echoer_world on priority 15
# (('Hello',), {}) <-- processing this data
# (('Bonjour',), {}) <-- processing this data
# (('Ola',), {}) <-- processing this data
# Scheduler executes sink on priority 1000
# (('Goodbye, womb!',), {}) <-- processing this data
# (('Au revoir, womb!',), {}) <-- processing this data
# (('Ciao, womb!',), {}) <-- processing this data
# (('Hello, world!',), {}) <-- processing this data
# (('Bonjour, world!',), {}) <-- processing this data
# (('Ola, world!',), {}) <-- processing this data
# Sink got: Goodbye, womb!
# Sink got: Au revoir, womb!
# Sink got: Ciao, womb!
# Sink got: Hello, world!
# Sink got: Bonjour, world!
# Sink got: Ola, world!
Optionally, you can specify that the scheduler should call next() on the tasklets that it processes from its queue, rather than flush():
scheduler.flush(incremental=True)
Which will make the processing somewhat more interleaved:
# Scheduler executes echoer_womb on priority 16
# (('Goodbye',), {}) <-- processing just this data
# (('Au revoir',), {})
# (('Ciao',), {})
# Scheduler puts echoer_womb back into priority queue.
# Scheduler executes echoer_womb on priority 1
# (('Au revoir',), {}) <-- processing just this data
# (('Ciao',), {})
# Scheduler puts echoer_womb back into priority queue.
# Scheduler executes echoer_womb on priority 10
# (('Ciao',), {}) <-- processing just this data
# Scheduler executes echoer_world on priority 20
# (('Hello',), {}) <-- processing just this data
# (('Bonjour',), {})
# (('Ola',), {})
# Scheduler puts echoer_world back into priority queue.
# Scheduler executes echoer_world on priority 18
# (('Bonjour',), {}) <-- processing just this data
# (('Ola',), {})
# Scheduler puts echoer_world back into priority queue.
# Scheduler executes echoer_world on priority 1
# (('Ola',), {}) <-- processing just this data
# Scheduler executes sink on priority 1000
# (('Goodbye, womb!',), {}) <-- processing just this data
# (('Au revoir, womb!',), {})
# (('Ciao, womb!',), {})
# (('Hello, world!',), {})
# (('Bonjour, world!',), {})
# (('Ola, world!',), {})
# Sink got: Goodbye, womb!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
# (('Au revoir, womb!',), {}) <-- processing just this data
# (('Ciao, womb!',), {})
# (('Hello, world!',), {})
# (('Bonjour, world!',), {})
# (('Ola, world!',), {})
# Sink got: Au revoir, womb!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
# (('Ciao, womb!',), {}) <-- processing just this data
# (('Hello, world!',), {})
# (('Bonjour, world!',), {})
# (('Ola, world!',), {})
# Sink got: Ciao, womb!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
# (('Hello, world!',), {}) <-- processing just this data
# (('Bonjour, world!',), {})
# (('Ola, world!',), {})
# Sink got: Hello, world!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
# (('Bonjour, world!',), {}) <-- processing just this data
# (('Ola, world!',), {})
# Sink got: Bonjour, world!
# Scheduler puts sink back into priority queue.
# Scheduler executes sink on priority 1000
# (('Ola, world!',), {}) <-- processing just this data
# Sink got: Ola, world!
The output here will be different each time it is run, since the priority for the echoer tasklets is reassessed every time they are put back into the queue.
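One thing to watch with (priority, task) tuples: if two tasklets ever draw the same priority, the queue falls back to comparing the tasks themselves, and on Python 3 that raises a TypeError for objects that don't define an ordering. A common workaround is to slip a running counter in between, so ties are settled by insertion order. This is a sketch of my own and not part of the scheduler above; TieSafeQueue, put_task, get_task, and Job are made-up names.

```python
import itertools

try:
    from Queue import PriorityQueue, Empty  # Python 2.x
except ImportError:
    from queue import PriorityQueue, Empty  # Python 3.x


class TieSafeQueue(PriorityQueue):
    """Hypothetical PriorityQueue that never compares the queued objects."""

    def __init__(self):
        PriorityQueue.__init__(self)
        self._counter = itertools.count()

    def put_task(self, priority, task):
        # (priority, sequence number, task): the unique sequence number
        # settles ties before the comparison ever reaches the task.
        self.put((priority, next(self._counter), task))

    def get_task(self):
        priority, _, task = self.get(block=False)
        return priority, task


class Job(object):  # stand-in for a tasklet; defines no ordering
    def __init__(self, name):
        self.name = name


q = TieSafeQueue()
q.put_task(5, Job("first"))
q.put_task(5, Job("second"))  # same priority as "first"
q.put_task(1, Job("urgent"))

order = []
while True:
    try:
        priority, job = q.get_task()
    except Empty:
        break
    order.append(job.name)
print(order)  # ['urgent', 'first', 'second']
```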
Other Ideas
An Implicit Scheduler
An implicit scheduler is a scheduler that you don't have to manually create. Rather, it exists from the get-go as a singleton, and you'd do a from tasklet import Tasklet, scheduler. You don't add tasklets to it, either. When you create a Tasklet instance, the class would take care of registering it with the scheduler singleton. When you're done setting up your tasklets, you'd just call scheduler.run(), or possibly some other utility function that causes the scheduler to "step".
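Here is roughly what I have in mind, as a self-contained sketch. ImplicitScheduler and MiniTasklet are hypothetical stand-ins (MiniTasklet is a stripped-down queue-in-front-of-a-function, not the real Tasklet class); the only point is that registration happens inside the constructor.

```python
import collections


class ImplicitScheduler(object):
    """Hypothetical module-level scheduler that tasklets register with."""

    def __init__(self):
        self.tasklets = []

    def register(self, tasklet):
        self.tasklets.append(tasklet)

    def run(self):
        # Keep sweeping until a full pass finds no pending work anywhere.
        while any([t.flush() for t in self.tasklets]):
            pass


scheduler = ImplicitScheduler()  # the singleton you would import


class MiniTasklet(object):
    """Stripped-down stand-in for Tasklet: a queue in front of a function."""

    def __init__(self, fn):
        self.fn = fn
        self.items = collections.deque()
        scheduler.register(self)  # registration happens here, implicitly

    def __call__(self, item):
        self.items.append(item)
        return self

    def flush(self):
        done = []
        while self.items:
            item = self.items.popleft()
            self.fn(item)
            done.append(item)
        return done


received = []
sink = MiniTasklet(received.append)
doubler = MiniTasklet(lambda i: sink(i * 2))

doubler(1)(2)(3)  # queue up some work
scheduler.run()   # nobody handed the tasklets to the scheduler by hand
print(received)   # [2, 4, 6]
```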
A Step Back
Dennis M. Ritchie on pipes and coroutines (search for 'pipes' and 'coroutine')
pythonect
Conclusion
This is where the adventure ends, I'm afraid. Hope you had fun. Thanks for bearing with me if all of this is dead familiar to you already, and I've screwed up or been sloppy somewhere along the line.
Certainly there is more to explore, and you can do so by reading up on the myriad of projects that use coroutines (see Further Reading, below). Many of them happen to define something called a 'tasklet' of their own. I'm not sure if there is a standard definition for this word kicking around somewhere. If you find it, please let me know in the comments.
Also, if you have an actual use case for coroutines, or "tasklets", feel free to let go in the comments as well!
Further Reading
Stackless --> Greenlet --> Gevent, Cogen, Concurrence, asyncoro
Labels:
coroutine,
coroutines,
generator,
generators,
meshing,
Python,
scheduler,
schedulers,
tasklet,
tasklets
Monday, August 13, 2012
No Use Case: Meshing Part 2 - Tasklets
This is the second article (part 2) in a series of articles regarding the Python programming language. I periodically find myself playing with features of the language outside of any actual project. As such, discoveries that I make (some neat, others of little interest) often have no use case.
Despite having no purpose, I can't help but think that at some point these discoveries will prove useful. So I'm documenting them.
NOTE: I am unable to resist writing to an invisible, and most likely non-existent, audience. The writing takes the tone that someone besides myself is reading this.
[Part 1]: Generators & Coroutines
[Part 2]: Tasklets (this article)
[Part 3]: Schedulers
In part 1 of this article, I wrote:
From http://linux.die.net/Linux-CLI/the-unix-tools-philosophy.html, or "Chapter 3: the Unix Tools Philosophy":
"A tool is a simple program, usually designed for a specific purpose..."
"[A pipe allows] the output of one program to be sent to the input of another. The tools philosophy was to have small programs to accomplish a particular task instead of trying to develop large monolithic programs to do a large number of tasks. To accomplish more complex tasks, tools would simply be connected together, using pipes."
However, I see no reason that one could not structure a large program to contain this philosophy internally. That is, inside of a single program one can have a bunch of functions that do small things to data - functions which can be connected together to form larger chains of work. The data is the water in a system of pipeworks, where each location that work can be done is a node (a function). The node is where the water is churned or spun around or split apart and then sent on its way(s) again.
In a mesh nodes do work on whatever fluids are sent their way (same as a pipeworks), but can also send work to zero or more other nodes in the entire mesh. That is, the pathways are not strictly defined outside of the control of the node - they're not rigid pipes. Nodes sit within some form of malleable landscape (read: mesh) and can rewire the pathways as they see fit. Nodes are smart, and contain internal logic that allows them to become more than just dumb processing stations in a pre-configured chain. They're programs unto themselves. That's all rather vague (to me, too!), so you'll have to keep reading if you're as curious as I am.
I then went on to discuss coroutines, and how I was implementing them at that time. Essentially, this was the pattern:
@coroutine
def multiplier_coroutine(multiplier):
    while True:
        try:
            i = (yield)
            print (multiplier * i)
        except GeneratorExit:
            print ("multiplier has closed.")
            return # an explicit "raise StopIteration" here is a RuntimeError on Python 3.7+ (PEP 479)

tripler = multiplier_coroutine(3)
tripler.send(3)
# 9
# multiplier has closed <- because it's been garbage collected when the script ends
That pattern is essentially what I was thinking could be a "node" in an amorphous pipeworks I want to call a "mesh". The water flowing through the mesh is the data; the paths the data take to flow between work stations are stretchable, bendable, rearrangeable pipes; the points of intersection where the work is done (the work stations) are the nodes (read: coroutines) - they determine what those stretchy pipes look like, at run-time. That is, they determine to which node or nodes (if any) they should pass the incoming data along to (after they've done some work on it).
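For reference, here is a self-contained sketch of that node pattern with the priming decorator written out. It collects results in a list rather than printing, so it is easy to inspect. The names multiplier_node and log are my own for this example, and the decorator is one common way to write it, not necessarily the exact one from part 1.

```python
import functools


def coroutine(func):
    """Prime a generator so it is ready to receive .send() right away."""
    @functools.wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # advance to the first (yield)
        return cr
    return start


@coroutine
def multiplier_node(multiplier, log):
    try:
        while True:
            i = (yield)
            log.append(multiplier * i)
    except GeneratorExit:
        # close() was called; falling off the end finishes cleanly
        log.append("closed")


log = []
tripler = multiplier_node(3, log)
tripler.send(3)
tripler.send(5)
tripler.close()
print(log)  # [9, 15, 'closed']
```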
Additionally, there would need to be some kind of prime mover that squished and prodded the entire mesh into doing actual work. Some sort of overarching "scheduler" that would take care of which node went next. More on that later.
NIHD (Not Invented Here Digression)
To keep on calling the coroutines "nodes" seemed a little out of place, since that word was (and is) widely used in other contexts to mean different things. So I came up with "tasks", and then "tasklets". As it turned out, "tasklet" was already taken: http://www.stackless.com/. Just search that page and you'll see what I mean. In fact, multiple projects seemed to be using this word. Which made me think I must have picked it up subconsciously from somewhere. Also, from the Stackless docs, lo and behold:
"Tasklets wrap functions, allowing them to be launched as microthreads to be run within the scheduler."
Son of a *.
Others had already worked on this, and quite heavily it seemed. Was I just reinventing "microthreads"? Not wanting to have someone else's fun spoil my own, I elected to forgo mucking about in Stackless until after I'd run into a suitable denouement with my own explorations. If I studied up on Stackless' concepts, I might begin to lose the greenhorn edge: that fearless machete a beginner has that experts have long ago disdained as an embarrassing and over-optimistic instrument. However, to see this concept stated so bluntly in different terms certainly helped to clarify what I was doing.
At the end of a good ten minutes or so with an online thesaurus, I decided to stick with "tasklet". I may have been suffering from NIH, but I decided to, at least temporarily, stick my head in the sand. So that's that.
[end digression]
The Tasklet
Basically, a tasklet (as defined by me) is just a class wrapper around a coroutine. To build the tasklet instance, you pass in an optional coroutine, and the optional args and kwargs that the coroutine will take when it is first fired up:
from tasklet import Tasklet

task = Tasklet(doubler, sink()  )
"""            ^        ^      ^
   -------     |        |      |_ [your coroutine kwargs]
   | API |     |        |_______ [your coroutine args]
   -------     |_______________ [your coroutine]

   Everything is optional. You can do this:

       task = Tasklet()

   And it will still work. Empty tasklets are like
   black holes, though: anything sent to them will
   disappear forever. So:

       null = Tasklet()
"""
Here is the definition of tasklet.py as it stands:
import sys
import collections

_PY_VERSION = sys.version_info[0] # an int (2 or 3); sys.version[0] is the string '2' or '3'

def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next() if _PY_VERSION == 2 else next(cr)
        return cr
    start.__name__ = func.func_name if _PY_VERSION == 2 else func.__name__
    return start

class Tasklet(object):

    def __init__(self, fn=None, *args, **kwargs):
        self.set(fn, *args, **kwargs)

    def set(self, fn=None, *args, **kwargs):
        def default_coro(*args, **kwargs):
            try:
                while True:
                    i = (yield)
            except GeneratorExit:
                return # raising StopIteration here is a RuntimeError on Python 3.7+ (PEP 479)
        if fn is None:
            fn = default_coro
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.coro = fn(*args, **kwargs)
        self.coro.gi_frame.f_globals['self'] = self # magic injection!
        self.items = collections.deque()
        self.closed = False
        self._name = fn.__name__
        return self

    def send(self, *args, **kwargs):
        # queue the item; nothing runs until next(), last() or flush()
        self.items.append((args, kwargs))
        return self

    def next(self):
        # process the oldest queued item, if any
        try:
            return self.__next__()
        except StopIteration:
            pass

    def last(self):
        # process the newest queued item, if any
        try:
            return self.__next__(last=True)
        except StopIteration:
            pass

    def flush(self):
        # process everything that is queued, oldest first
        return [pending for pending in self]

    def clear(self):
        self.items.clear()
        return self

    def rescope(self, *args, **kwargs):
        # restart the coroutine with new arguments, keeping the queue
        dq = self.items
        self.set(self.fn, *args, **kwargs)
        self.items = dq
        return self

    def restart(self):
        self.set(self.fn, *self.args, **self.kwargs)

    def close(self):
        self.coro.close()
        self.closed = True
        return self

    def __call__(self, *args, **kwargs):
        return self.send(*args, **kwargs)

    def __next__(self, last=False):
        try:
            if last:
                args, kwargs = self.items.pop()
            else:
                args, kwargs = self.items.popleft()
        except IndexError:
            raise StopIteration
        try:
            self.coro.send(*args, **kwargs)
            return (args, kwargs)
        except StopIteration:
            self.closed = True
            raise
        except TypeError as ex:
            # If the user neglects to decorate their coroutines with the
            # @coroutine decorator defined in this module, you can catch that
            # here and make sure you start up the coroutine first.
            if str(ex) == "can't send non-None value to a just-started generator":
                self.coro.next() if _PY_VERSION == 2 else next(self.coro)
                self.coro.send(*args, **kwargs)
                return (args, kwargs)
            else:
                raise

    def __iter__(self):
        while True:
            try:
                item = self.__next__()
            except StopIteration:
                return # a StopIteration escaping a generator is a RuntimeError on Python 3.7+ (PEP 479)
            yield item

    def __repr__(self):
        return self._name
A queue (a deque, specifically) exists on the tasklet instance, into which you can pass work items (i.e. data). You can pass as much data as you like into the tasklet - nothing happens, it just piles up until you start calling the API on the tasklet to actually do some work. The API consists of the following methods:
set
send
next
last
flush
clear
rescope
restart
close
As well, the tasklet is both a callable and an iterable. Calling the tasklet is exactly the same as invoking the send() method. Iterating over the tasklet is exactly the same as calling the next() method repeatedly.
A code example will make it clearer:
from tasklet import coroutine, Tasklet

# / / / SETUP SOME COROUTINES

@coroutine
def doubler(target):
    while True:
        try:
            i = (yield)
            target.send('2x{}'.format(i))
        except GeneratorExit:
            target.send("Doubler has closed.")
            return  # 'raise StopIteration' here is an error on Python 3.7+

@coroutine
def tripler(target):
    while True:
        try:
            i = (yield)
            target.send('3x{}'.format(i))
        except GeneratorExit:
            target.send("Tripler has closed.")
            return

@coroutine
def sink():
    while True:
        try:
            i = (yield)
            print ("Sink got: {}".format(i))
        except GeneratorExit:
            print ("Sink has closed.")
            return

# / / / END SETTING UP COROUTINES

# / / / BEGIN EXAMPLE

task = Tasklet(doubler, sink())
task(1)       # queued up
task.send(2)  # same thing, queued up; .send() is optional, can keep it for
              # readability if one likes
print ("Nothing has happened yet...")

for pending in task:        # this fires the send, iteratively, to exhaustion
    args, kwargs = pending  # you can examine what was sent, or just pass
print ("Okay, done pending jobs in task")

task.close()  # explicitly close the underlying coroutine
print ("All done\n")

if task.closed:
    task.set(tripler, sink())  # resets the task to something else

task(4)
task(5)
task.flush()  # runs all pending jobs in the task; does nothing if there
              # is nothing to do; you can examine what was sent, since it
              # returns all pending args; essentially, this is '[pending
              # for pending in task]'; you receive the result as a list
              # of tuples

task(6).next()  # fires _next_ item that is pending - in this case, 6
task(7)         # queued
task.next()     # fires next one - in this case, 7
task(8)
task(9).last()  # fires _last_ item that is pending - in this case, 9
task.next()     # 8
task.next()     # does nothing if there is nothing to do,
                # could return a NO-OP signal instead

task(10)
task(11)
task.clear()    # clears all pending jobs
task(12)
task.flush()

@coroutine
def new_sink():
    while True:
        try:
            i = (yield)
            print ("New Sink got: {}".format(i))
        except GeneratorExit:
            return

task.rescope(new_sink())  # if you want the same coro, and only need
                          # to change the initial args/kwargs, you
                          # can 'rescope' the coroutine, instead of
                          # using set()
task(13).next()
task.close()
task(14).next()  # nothing happens here
task.restart()
task(15).next()
If you run this, you will get the following output:
Nothing has happened yet...
Sink got: 2x1
Sink got: 2x2
Okay, done pending jobs in task
Sink got: Doubler has closed.
All done
Sink has closed.
Sink got: 3x4
Sink got: 3x5
Sink got: 3x6
Sink got: 3x7
Sink got: 3x9
Sink got: 3x8
Sink got: 3x12
Sink got: Tripler has closed.
Sink has closed.
New Sink got: 3x13
New Sink got: Tripler has closed.
New Sink got: 3x15
New Sink got: Tripler has closed.
The example illustrates all of a tasklet's methods, but here is a reiteration:
set (coroutine, *args, **kwargs)
(re)sets the tasklet with a new coroutine, coroutine args & kwargs
send (*args, **kwargs)
optionally used to queue work items onto the tasklet; note that you can just call the tasklet instead: tasklet(*args, **kwargs) is equivalent to tasklet.send(*args, **kwargs); also note that nothing actually occurs when you do this (the work item is only queued, and no work is done on it)
next ()
causes the tasklet to process the next item in its queue; i.e. this fires the body of the coroutine inside the tasklet (the code after the 'while True' line); returns the data that the coroutine worked on as a tuple of (args, kwargs)
last ()
causes the tasklet to process the last item in its queue; returns the data that the coroutine worked on as a tuple of (args, kwargs)
flush ()
causes the tasklet to process all the items in its queue; returns all the data that the coroutine worked on as a list of tuples of (args, kwargs)
clear ()
clears the tasklet's queue
rescope (*args, **kwargs)
like set, but preserves the tasklet's initial coroutine and current work items; rescope only resets the coroutine's args & kwargs
restart ()
equivalent to set(self.fn, *self.args, **self.kwargs); can be used as shown, to start back up a closed tasklet
close ()
closes the tasklet
The set, send, clear, rescope and close methods all return the tasklet object itself, so one can chain calls (flush returns a list, so it has to come last in a chain):
task(1)(2)(3).rescope(new_sink())(4).flush()
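The difference between next() and last() comes down to which end of the deque gets popped. A minimal sketch using only collections.deque; the variable names are illustrative and not part of tasklet.py:

```python
import collections

items = collections.deque()
for work in (8, 9):
    # stored the same way send() stores work items: an (args, kwargs) pair
    items.append(((work,), {}))

newest = items.pop()      # the end last() takes from: most recently queued
oldest = items.popleft()  # the end next() takes from: oldest queued

print(newest)  # ((9,), {})
print(oldest)  # ((8,), {})
```

This is why, in the example above, task(9).last() fires 9 before the following next() fires 8.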
Back in part one, I chose to describe the tasklet as having two parts: an initialization part that set up some internal context (or state), and a behavioural part that did the actual work.
@coroutine
def doubler(target):
    # the code here is run once, when the tasklet is created; in this case,
    # the 'target' variable is just injected and set up, and no further
    # action takes place
    while True:
        # the code here is run every time next() is called on the tasklet
        try:
            i = (yield)
            target.send('2x{}'.format(i))
        except GeneratorExit:
            target.send("Doubler has closed.")
            return  # 'raise StopIteration' here is an error on Python 3.7+
When you rescope doubler, you are just changing the target variable that comprises the tasklet's internal state. Nothing else changes. Any data (work items) previously sent to doubler remain in its queue. However, if you had some initialization code before the while True line, that would also fire once upon rescope. Additionally, any state you may have been keeping track of after the while True will also be gone.
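To make that concrete, here is a rough sketch with a plain generator standing in for the tasklet. The counting_doubler function, the log list and the hand-rolled "rescope" are assumptions made up for this illustration; Tasklet.rescope does the equivalent by calling set() and then putting the old deque back.

```python
import collections

def counting_doubler(target, log):
    log.append("init")  # initialization part: runs once per generator
    count = 0           # state that lives inside the generator
    while True:
        i = (yield)
        count += 1
        target.append("2x{} (item #{})".format(i, count))

log = []
queue = collections.deque([1, 2, 3])  # pending work items
first_target, second_target = [], []

coro = counting_doubler(first_target, log)
next(coro)                  # prime: runs the initialization part
coro.send(queue.popleft())  # processes 1

# "Rescope" by hand: same function, new arguments, same queue.
coro = counting_doubler(second_target, log)
next(coro)                  # the initialization part runs again
coro.send(queue.popleft())  # processes 2, but the counter has restarted

print(log)            # ['init', 'init']
print(first_target)   # ['2x1 (item #1)']
print(second_target)  # ['2x2 (item #1)']
print(list(queue))    # [3]
```

The queue survives, while everything held in the generator's frame starts over.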
A Few Points of Interest
Minimum Implementation
The pattern for a coroutine implemented thus far has been:
@coroutine
def func(*args, **kwargs):
    # initialization code
    while True:
        # behavioural code
        try:
            val = (yield)
            # do something
        except GeneratorExit:
            return  # 'raise StopIteration' here is an error on Python 3.7+
The try-except GeneratorExit clause exists so that you have the option of forwarding a message somewhere when the tasklet closes (by being garbage collected, by explicitly closing, by rescoping or restarting, etc.). However, a minimum implementation is this:
@coroutine
def func(*args, **kwargs):
    while True:
        i = (yield)
Well, the above is almost a minimum implementation: the @coroutine decorator is optional. If you leave it off, the Tasklet class primes the coroutine itself the first time it sends a work item. These are the same:
@coroutine
def func(*args, **kwargs):
    while True:
        i = (yield)

task = Tasklet(func)

#...is equivalent to:

def func(*args, **kwargs):
    while True:
        i = (yield)

task = Tasklet(func)
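The reason an undecorated coroutine needs this help is that Python refuses to send a value into a generator that has not yet reached its first yield. A small sketch of the fallback, using a plain generator; the echo function and the received list are hypothetical names used only here:

```python
def echo(received):
    while True:
        received.append((yield))

received = []
gen = echo(received)
error = None

try:
    gen.send(1)      # not primed yet, so Python raises TypeError
except TypeError as ex:
    error = str(ex)
    next(gen)        # advance to the first yield ("priming")
    gen.send(1)      # now the value is delivered

print(error)
print(received)      # [1]
```

Tasklet.__next__ takes the same route: it catches the TypeError, primes the coroutine, and then repeats the send.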
.send() vs. ()
Although the .send method exists for readability, it makes sense to me to just be able to call the tasklet with arguments. Therefore, these are equivalent:
task.send(1)
task(1)
self
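There is a funny-looking line in the Tasklet class right after the coroutine is created in the set method:
self.coro.gi_frame.f_globals['self'] = self # magic injection!
This little bit of magic injects the tasklet instance into the coroutine's global namespace under the name 'self' (f_globals is the module-level namespace of the coroutine function, not its locals). Because that namespace is shared by every coroutine defined in the same module, 'self' always refers to the tasklet that was set most recently. Within that limit, it allows the coroutine definition to reference the tasklet instance: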
@coroutine
def my_coro():
    while True:
        i = (yield)
        print ("{} got {}".format(self, i))

task = Tasklet(my_coro)
task(1).next()
# my_coro got 1
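Here is a rough sketch of what that injection does, with plain generators and strings standing in for tasklets. The name 'owner' and the whoami function are assumptions chosen for this illustration (tasklet.py uses the name 'self'); the point is that gi_frame.f_globals is one dictionary shared by every generator created from functions in the same module.

```python
def whoami(seen):
    while True:
        (yield)
        # 'owner' is not a local or an argument: it is looked up
        # in the module-level globals each time this line runs
        seen.append(owner)

seen = []

a = whoami(seen)
a.gi_frame.f_globals['owner'] = 'tasklet A'  # same trick, hypothetical name
next(a)
a.send(None)

b = whoami(seen)
b.gi_frame.f_globals['owner'] = 'tasklet B'  # overwrites the shared global
next(b)
b.send(None)
a.send(None)  # 'a' now sees 'tasklet B' as well

print(seen)   # ['tasklet A', 'tasklet B', 'tasklet B']
```

So the injected name is reliable only for the tasklet that was set last among coroutines sharing a module.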
Recursion
Two tasklets can target each other without fear of hitting a recursion maximum:
from tasklet import Tasklet

NO_LOOPS = 10000

def first(target):
    while True:
        i = (yield) + 1
        if i > NO_LOOPS: print ("{} has {}".format(self, i))
        target(i)

def second(target):
    while True:
        target((yield))

first = Tasklet(first, None)  # <-- pass in None as a placeholder for now
second = Tasklet(second, first)
first.rescope(second).send(1)

[task.next() for i in range(NO_LOOPS) for task in [first, second]]
# first has 10001
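One way to see why no recursion limit is hit: calling a tasklet only appends to its deque, so neither coroutine ever resumes the other directly. A minimal sketch of the same ping-pong with plain generators and deques standing in for the two tasklets; relay, inbox_a and inbox_b are hypothetical names, not part of tasklet.py:

```python
import collections

def relay(target_inbox, increment):
    while True:
        value = (yield)
        # enqueue only; the target does not run until the driver sends to it
        target_inbox.append(value + increment)

LOOPS = 10000
inbox_a, inbox_b = collections.deque(), collections.deque()

a = relay(inbox_b, 1)  # a adds one and hands the result to b
b = relay(inbox_a, 0)  # b hands the value straight back to a
next(a)
next(b)

inbox_a.append(1)
for _ in range(LOOPS):
    a.send(inbox_a.popleft())  # each send returns before the next begins
    b.send(inbox_b.popleft())

print(inbox_a[0])  # 10001
```

The call stack never grows beyond the driver loop plus one generator frame, however many round trips are made.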
In part 3 of this adventure, I will (try to) discuss rudimentary schedulers that can be used to drive a mesh of tasklets.
[Part 1]: Generators & Coroutines
[Part 2]: Tasklets (this article)
[Part 3]: Schedulers
