Programmazione 4 commenti
Programmare multi processi facilmente con Twisted Matrix /1
diL'aumento del numero di core nei processori moderni e la necessità di sfruttare il più possibile il parallelismo tra diverse macchine, rende necessario trovare metodi efficaci per ottenere il massimo dei vantaggi con il minimo dello sforzo e della complessità. Storicamente si è sempre ricorso alla programmazione multi-thread shared-state ma questa è soggetta a numerosi problemi come ad esempio la gestione dei lock e dei semafori, la scalabilità sia nella stessa macchina che orizzontalmente su piu` macchine, l'imprevedibilità delle ottimizzazioni attuate dai compilatori, la complessità di unit-testing e debugging, i modelli di coerenza e consistenza della cache dei processori e della RAM e altri.
È però chiaro che esistono numerose alternative al modello shared-state, come il modello ad Actors, implementato ad esempio da Erlang o da Scala sebbene in modo molto differente tra loro, ma in questo articolo ci soffermeremo con particolare attenzione sulla gestione di più processi heavyweight in comunicazione tra loro. Il tutto sarà gestito attraverso un framework asincrono a eventi chiamato Twisted Matrix.
Perché proprio questo modello?
Una delle limitazioni, a seconda dei punti di vista, di Python è la presenza del GIL (Global Interpreter Lock). La Virtual Machine dell'implementazione di riferimento del linguaggio, chiamata anche CPython, non è infatti thread-safe e richiede la presenza di un Giant Lock per evitare di corrompere lo stato di esecuzione del processo, questo però non permette di sfruttare, attraverso il multi-threading, la presenza di più processori, con l'eccezione del codice scritto a più basso livello che può rilasciare il GIL per dedicarsi alla sua esecuzione senza intralciare la VM. Nel passato si è cercato numerose volte di rimuovere il GIL per sostituirlo con una serie di lock più fini e precisi, tuttavia tutti gli sforzi hanno prodotto delle versioni di Python almeno 2 volte più lente della versione con GIL e che complicavano incredibilmente la scrittura di moduli di estensione in C.
Il metodo più semplice per risolvere questo tipo di problematiche in Python è l'uso di più processi long-lived dedicati ognuno a uno specifico compito, un tipo di approcio simile a quello degli Actors. Il problema con questo tipo di soluzione è che i processi sono generalmente più pesanti dei thread e senza una soluzione per gestire la concorrenza non stiamo effettivamente risolvendo il problema in quanto ci ritroviamo nella situazione in cui i processi eseguiranno una singola operazione per volta, ipotesi accettabile solo nel caso in cui questi processi worker siano impegnati soltanto con lunghe operazioni di calcolo e pochissimo I/O. Per risolvere anche questo problema ci appoggeremo al modello asincrono sfruttando poi le funzionalità messe a disposizione da Twisted Matrix per la gestione di casi come l'interazione coi database o le interazioni con altri client mediante socket.
Programmazione asincrona con Twisted Matrix
Twisted Matrix è un
framework per la programmazione di rete asincrona, è
sviluppato in Python e implementa già numerosi e diversi
protocolli, sia lato client che lato server, come, ad esempio,
SSH, HTTP(S), POP3, SMTP, IMAP, XMPP e tanti altri. Come tutti i
framework asincroni utilizza al suo interno un event-loop
con una delle tante syscall che reagiscono al cambiamento dello
stato nei file descriptor osservati come select(),
poll(), epoll(), kqueue() e
tante altre. In Twisted questo loop si chiama reactor
dal nome del design pattern che lo descrive. La bontà
della scelta di questo pattern è testimoniata anche dal
relativamente grande numero di network framework che ne fanno
utilizzo: ACE, Boost ASIO, Apache Mina, POE e Ruby EventMachine.
Un'ulteriore particolarità di Twisted Matrix sta nell'aver introdotto un nuovo pattern per la gestione delle callback chiamato Deferred. Non fa parte di questo articolo una spiegazione esaustiva del funzionamento e delle peculiarità di questo pattern, spiegazione che rimando a questo paper, ma vedremo brevemente i principali concetti riguardo al loro funzionamento per poter meglio comprendere il funzionamento del codice che scriveremo.
Una "Deferred" è sostanzialmente una promessa di consegnare un dato nel futuro. È un concetto molto simile alla possibilità di sospendere l'esecuzione di una funzione per riprenderla quando i dati necessari al proseguimento saranno finalmente disponibili. Un breve esempio di codice può mostrarci quello che intendo:
>>> from twisted.internet import defer
>>> d = defer.Deferred()
>>> def callback_1(arg):
... print "Ricevuto", arg
... return arg + 1
...
>>> def callback_2(arg):
... print "Ricevuto", arg
... return arg + "ciao"
...
>>> d.addCallback(callback_1)
<Deferred at 0x572e68>
>>> d.addCallback(callback_2)
<Deferred at 0x572e68>
>>> d.callback(45)
Ricevuto 45
Ricevuto 46
Come vediamo vengono chiamate le due callback nell'ordine in cui
sono state aggiunte alla deferred d. Quello che però
stupisce è che non vediamo traccia di alcuna eccezione che, invece,
sarebbe stata ovvia considerando che la callback_2
cerca di sommare un numero e una stringa, cosa vietata in Python.
In realtà nulla è andato perso e possiamo invece risolvere il problema
facendo una leggera modifica al codice di cui sopra:
>>> def errorback_1(exception):
... print exception
... return "Fine"
...
>>> d = defer.Deferred()
>>> d.addErrback(errorback_1)
<Deferred at 0x572e68>
>>> d.addCallback(callback_1)
<Deferred at 0x572e68>
>>> d.addCallback(callback_2)
<Deferred at 0x572e68>
>>> d.callback(45)
Ricevuto 45
Ricevuto 46
Ancora una volta non vediamo traccia della nostra eccezione. Probabilmente è per colpa del fatto che il gestore di eccezioni è stato aggiunto prima della callback problematica. Riproviamo:
>>> d = defer.Deferred()
>>> d.addCallback(callback_1)
<Deferred at 0x572e68>
>>> d.addCallback(callback_2)
<Deferred at 0x572e68>
>>> d.addErrback(errorback_1)
<Deferred at 0x572e68>
>>> d.callback(45)
Ricevuto 45
Ricevuto 46
[Failure instance: Traceback: <type 'exceptions.TypeError'>: unsupported operand type(s) for +: 'int' and 'str'
<stdin>:1:<module>
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:242:callback
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:307:_startRunCallbacks
—- <exception caught here> —-
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:323:_runCallbacks
<stdin>:3:callback_2
]
Questo significa che le deferred rappresentano un flusso di esecuzione diverso ma analogo a quello del programma principale e bisogna gestirle opportunamente e con attenzione. Tuttavia permettono di associare all'arrivo di alcuni dati, determinato dalla chiamata d.callback(45), a una serie di operazioni che vengono compiute consecutivamente una dopo l'altra in maniera completamente esplicita.
Prima di chiudere, provando il codice degli esempi vi potrebbe capitare di vedere comparire eccezioni come questa:
Unhandled error in Deferred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 242, in callback
self._startRunCallbacks(result)
File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 307, in _startRunCallbacks
self._runCallbacks()
—- <exception caught here> —-
File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 323, in _runCallbacks
self.result = callback(self.result, *args, **kw)
File "<stdin>", line 3, in callback_2
exceptions.TypeError: unsupported operand type(s) for +: 'int' and 'str'
Il problema evidenziato è quello scritto nella prima riga:
Unhandled error in Deferred. Avrete notato che sto
riutilizzando sempre la lettera d per identificare
l'istanza di una deferred. Riutilizzandola come negli esempi vado
a distruggere l'istanza più vecchia che, nel momento in cui viene
deallocata dal garbage collector, avverte della mancata
gestione di un errore.
deferToProcess
La cosa più semplice che possiamo implementare in questa prima
parte è sicuramente una funzione come deferToProcess
analoga a deferToThread già presente in Twisted
Matrix. Quest'ultima prende come argomento la funzione da eseguire
in un thread separato ritornando una deferred che verrà scatenata
quando il thread avrà terminato l'esecuzione passando il valore
ritornato dalla funzione. In tipico stile TDD andiamo prima a
identificare l'API che vorremmo usare assieme al funzionamento che
ci aspettiamo:
def test_deferToProcess(self):
STRING = "ciao\n"
ready, finished = p.deferToProcess(good.main, packages=("test",))
def _eb(reason):
print reason
finished.addErrback(_eb)
finished.addCallback(self.assertEquals, STRING)
ready.addCallback(lambda child: child.write(STRING))
return finished
def test_failing_deferToProcess(self):
ready, finished = p.deferToProcess(bad.main, packages=("test",))
self.assertFailure(finished, error.ProcessTerminated)
return finished
Utilizzando il framework di testing incluso in Twisted Matrix andiamo a definire una piccola batteria di test, di cui i principali ho riportato nell'esempio. In questo pacchetto è possibile scaricare tutto il codice su cui andremo a lavorare durante questo articolo.
Vediamo quindi che l'API desiderabile per ora vedrebbe
deferToProcess ritornare 2 deferred, finished e
ready che rappresentano i due stati di interesse per
il nostro sottoprocesso. deferToProcess accetta inoltre delle
funzioni da eseguire come argomento e in più un argomento
packages che servirà per aggiungere eventuali
package, non presenti nel PYTHONPATH, al nuovo processo avviato.
Andiamo quindi a vedere come viene realizzata la funzionalità di
deferToProcess:
import os
import sys
import imp
import sets
j = os.path.join
from twisted.internet import reactor, protocol, defer, error
from twisted.python import log, filepath, failure, util, reflect
bin = j(util.searchupwards('.', [], ['bin']), 'bin')
run_script = j(bin, 'run')
def spawnProcess(processProtocol, args=(), env={},
path=None, uid=None, gid=None, usePTY=0,
packages=()):
env = env.copy()
pythonpath = []
for pkg in packages:
p = os.path.split(imp.find_module(pkg)[1])[0]
if p.startswith(os.path.join(sys.prefix, 'lib')):
continue
pythonpath.append(p)
pythonpath = list(sets.Set(pythonpath))
pythonpath.extend(env.get('PYTHONPATH', '').split(os.pathsep))
env['PYTHONPATH'] = os.pathsep.join(pythonpath)
args = (sys.executable,) + args
return reactor.spawnProcess(processProtocol, sys.executable, args,
env, path, uid, gid, usePTY)
def _deferToProcess(prot, *args, **kwargs):
def _eb(reason):
reason.trap(error.ProcessDone)
log.err("FATAL: There was an error: %s" % (reason,))
def _cb(data):
log.msg("DONE. Received: %s bytes" % (len(data),))
return data
spawnProcess(prot, tuple(args), **kwargs)
prot.finished.addErrback(_eb)
prot.finished.addCallback(_cb)
return prot.ready, prot.finished
class BaseProtocol(protocol.ProcessProtocol):
def __init__(self, parent=None):
self.finished = defer.Deferred()
self.ready = defer.Deferred()
self.parent = parent
self._buffer = []
def connectionMade(self):
log.msg("Started subprocess")
self.ready.callback(self)
def outReceived(self, data):
self._buffer.append(data)
def errReceived(self, data):
if self.parent is not None:
self.parent.dataReceived(data)
else:
log.msg("stderr from subprocess: %s" % (data,))
def processEnded(self, status):
log.msg("%s" % (status.value,))
if status.check(error.ProcessDone):
self.finished.callback(''.join(self._buffer))
return
self.finished.errback(status)
def write(self, data):
self.transport.write(data)
def die(self):
self.transport.closeStdin()
def deferToProcess(func, *args, **kwargs):
name = reflect.fullFuncName(func)
parent = kwargs.pop('parent', None)
prot = BaseProtocol(parent)
return _deferToProcess(prot, run_script, name, *args, **kwargs)
In Twisted Matrix non viene data la possibilità di non eseguire
una exec(3) subito dopo l'esecuzione della
fork(2), ma viene invece eseguito un processo
completamente nuovo, che nel nostro caso sarà un altro processo
python. Notiamo quindi che nel modulo abbiamo utilizzato un piccolo
script ausiliario per far caricare ed eseguire una funzione il cui
percorso completo verrà passato come argomento. Nel nostro caso
questo script è il seguente:
#!/usr/bin/env python
import sys
from twisted.python import reflect
try:
method = reflect.namedAny(sys.argv[-1])
except Exception, e:
print e
method()
Questo script carica dinamicamente una funzione partendo dalla stringa passata come argomento.
L'altro elemento fondamentale del modulo principale è
rappresentato dalla presenza della classe
BaseProtocol che specifica l'API e il protocollo di
comunicazione col processo figlio appena creato. Nell'esempio
infatti padre e figlio dialogano utilizzando stdin e
stderr mentre stdout viene riservato
alla comunicazione del messaggio di uscita del processo figlio.
Andiamo quindi a vedere come potrebbe essere scritta la funzione
da eseguire nel processo figlio:
import sys
def main():
s = sys.stdin.readline()
while s:
sys.stderr.write("ciao %s" % (s.strip(),))
s = sys.stdin.readline()
Per completare il disegno e la spiegazione finora avanzata è d'obbligo osservare un esempio d'insieme che mostra anche alcune delle potenzialità del semplice sistema finora creato:
import sys
from twisted.internet import reactor, error
from twisted.python import util, log
log.startLogging(sys.stdout)
from process import deferToProcess
import tt
def restart(reason, func):
log.err("FATAL: Restarting after %s" % (reason,))
func()
def dieGently(data):
log.msg(data)
reactor.stop()
def _run():
ready, finished = deferToProcess(tt.main)
def doSomething(child):
child.write("mondo\n")
child.die()
ready.addCallback(doSomething)
finished.addCallback(dieGently).addErrback(restart, _run)
reactor.callLater(1, _run)
reactor.run()
In questo esempio creiamo un processo figlio che va ad eseguire la
funzione main definita precedentemente. In questo caso è piuttosto
semplice aspettarsi che il risultato dell'esecuzione sarà la
creazione della classica stringa "ciao mondo". Ma
possiamo notare aspetti decisamente piu` interessanti nel codice:
come è facile notare abbiamo aggiunto una errback alla
deferred finished, questo per fare in modo di gestire le
terminazioni involontarie del processo figlio. In erlang una delle
funzionalità più utili è il monitoring dei processi figli da parte
di un processo padre che provvede a fare ripartire tutti i
processi che muoiono senza una precisa ragione, in questo esempio
abbiamo cercato di ottenere lo stesso tipo di funzionalità e
l'abbiamo fatto limitandoci a richiamare la funzione principale in
caso di errore durante la vita del figlio. Nel caso in cui, invece,
il figlio terminasse senza problemi nulla verrebbe fatto ripartire
e il programma terminerebbe attraverso dieGently.
Conclusioni
In questa prima puntata abbiamo scritto un piccolo esempio utile per eseguire semplici processi figli che dialogano col padre attraverso un protocollo di comunicazione piuttosto semplice, non abbiamo quindi definito dei veri e propri worker processes, a meno di arricchire ulteriormente il protocollo di comunicazione e di migliorare l'uso che facciamo della deferToProcess.
Sarà infatti obiettivo delle prossime puntate quello di ottenere un risultato ancora più facilmente utilizzabile per distribuire un carico di lavoro su più processi heavy-weight in grado di sfruttare al massimo le possibilità offerte dalla rete e dall'architettura del computer d'esecuzione.
Riferimenti
- Getting C++ Threads Right di Hans Bohem: tutto il video è estremamente interessante ma ai fini di questo articolo la parte più interessante inizia al minuto 8 circa.
- The Problem with Threads di Edward A. Lee, Berkeley
- Pubblicato il
- 31 Mar 2008
- Tag


Ho utilizzato Twisted per un progetto abbastanza grande (una "specie" di server web) e si è rivelato estremamente performante. Certo che la programmazione con le deferred può essere, all'inizio un po' difficoltosa, ma una volta capito per bene il meccanismo delle deferred è anche divertente!
Ciao,
nel tuo esempio dici "Quello che però stupisce è che non vediamo traccia di alcuna eccezione che, invece, sarebbe stata ovvia considerando che la callback_2 cerca di sommare un numero e una stringa, cosa vietata in Python".
Eppure l'eccezione io la vedo eccome, tentando di eseguire questo codice: http://pastebin.ca/964476
L'errore che ottengo è questo: http://pastebin.ca/964477
Qualche idea? Ti ringrazio anticipatamente e complimenti per l'articolo (che ancora devo finire di leggere :P ).
Hehe, si vede non hai finito di leggere, infatti la risposta è poco più in basso :P
(Per i pigri: se esegui lo script all'uscita il GC fa pultito e il deferred protesta per una eccezione non gestita)
Parlando di threads in Python, segnalo questo articolo di Aahz sul GIL: http://www.pyzine.com/Issue001/Section_Articles/article_ThreadingGlobalInterpreter.html