Programmare multi processi facilmente con Twisted Matrix /1

L’aumento del numero di core nei processori moderni e la
necessità di sfruttare il più possibile il parallelismo tra
diverse macchine, rende necessario trovare metodi efficaci per
ottenere il massimo dei vantaggi con il minimo dello sforzo e
della complessità. Storicamente si è sempre ricorso alla
programmazione multi-thread shared-state ma questa è
soggetta a numerosi problemi come ad
esempio la gestione dei lock e dei semafori, la scalabilità sia
nella stessa macchina che orizzontalmente su piu` macchine,
l’imprevedibilità delle ottimizzazioni attuate dai compilatori, la
complessità di unit-testing e debugging, i modelli di coerenza e
consistenza della cache dei processori e della RAM e altri.

È però chiaro che esistono numerose alternative al modello
shared-state, come il modello ad Actors,
implementato ad esempio da Erlang o da Scala
sebbene in modo molto differente tra loro, ma in questo articolo
ci soffermeremo con particolare attenzione sulla gestione di più
processi heavyweight in comunicazione tra loro. Il tutto
sarà gestito attraverso un framework asincrono a eventi chiamato
Twisted Matrix.

Perché proprio questo modello?

Una delle limitazioni, a seconda dei punti di vista, di
Python è la presenza del GIL
(Global Interpreter Lock). La Virtual Machine
dell’implementazione di riferimento del linguaggio, chiamata anche
CPython, non è infatti thread-safe e richiede la
presenza di un Giant Lock per evitare di corrompere lo
stato di esecuzione del processo, questo però non permette di
sfruttare, attraverso il multi-threading, la presenza di
più processori, con l’eccezione del codice scritto a più basso
livello che può rilasciare il GIL per dedicarsi alla sua
esecuzione senza intralciare la VM. Nel passato si è cercato
numerose volte di rimuovere il GIL per sostituirlo con una serie
di lock più fini e precisi, tuttavia tutti gli sforzi hanno
prodotto delle versioni di Python almeno 2 volte più lente della
versione con GIL e che complicavano incredibilmente la scrittura
di moduli di estensione in C.

Il metodo più semplice per risolvere questo tipo di problematiche
in Python è l’uso di più processi long-lived dedicati
ognuno a uno specifico compito, un tipo di approcio simile a
quello degli Actors. Il problema con questo tipo di soluzione è
che i processi sono generalmente più pesanti dei thread e senza
una soluzione per gestire la concorrenza non stiamo effettivamente
risolvendo il problema in quanto ci ritroviamo nella situazione in
cui i processi eseguiranno una singola operazione per volta,
ipotesi accettabile solo nel caso in cui questi processi worker
siano impegnati soltanto con lunghe operazioni di calcolo e
pochissimo I/O. Per risolvere anche questo problema ci appoggeremo
al modello asincrono sfruttando poi le funzionalità messe a
disposizione da Twisted Matrix per la gestione di casi come
l’interazione coi database o le interazioni con altri client
mediante socket.

Programmazione asincrona con Twisted Matrix

Twisted Matrix è un
framework per la programmazione di rete asincrona, è
sviluppato in Python e implementa già numerosi e diversi
protocolli, sia lato client che lato server, come, ad esempio,
SSH, HTTP(S), POP3, SMTP, IMAP, XMPP e tanti altri. Come tutti i
framework asincroni utilizza al suo interno un event-loop
con una delle tante syscall che reagiscono al cambiamento dello
stato nei file descriptor osservati come select(),
poll(), epoll(), kqueue() e
tante altre. In Twisted questo loop si chiama reactor
dal nome del design pattern che lo descrive. La bontà
della scelta di questo pattern è testimoniata anche dal
relativamente grande numero di network framework che ne fanno
utilizzo: ACE, Boost ASIO, Apache Mina, POE e Ruby EventMachine.

Un’ulteriore particolarità di Twisted Matrix sta nell’aver
introdotto un nuovo pattern per la gestione delle callback
chiamato Deferred. Non fa parte di questo articolo una
spiegazione esaustiva del funzionamento e delle peculiarità di
questo pattern, spiegazione che rimando a questo
paper, ma vedremo brevemente i principali concetti riguardo al
loro funzionamento per poter meglio comprendere il funzionamento
del codice che scriveremo.

Una “Deferred” è sostanzialmente una promessa di consegnare un
dato nel futuro. È un concetto molto simile alla possibilità di
sospendere l’esecuzione di una funzione per riprenderla quando i
dati necessari al proseguimento saranno finalmente disponibili.
Un breve esempio di codice può mostrarci quello che intendo:

>>> from twisted.internet import defer
>>> d = defer.Deferred()
>>> def callback_1(arg):
...     print "Ricevuto", arg
...     return arg + 1
... 
>>> def callback_2(arg):
...     print "Ricevuto", arg
...     return arg + "ciao"
... 
>>> d.addCallback(callback_1)

>>> d.addCallback(callback_2)

>>> d.callback(45)
Ricevuto 45
Ricevuto 46

Come vediamo vengono chiamate le due callback nell’ordine in cui
sono state aggiunte alla deferred d. Quello che però
stupisce è che non vediamo traccia di alcuna eccezione che, invece,
sarebbe stata ovvia considerando che la callback_2
cerca di sommare un numero e una stringa, cosa vietata in Python.

In realtà nulla è andato perso e possiamo invece risolvere il problema
facendo una leggera modifica al codice di cui sopra:

>>> def errorback_1(exception):
...     print exception
...     return "Fine"
... 
>>> d = defer.Deferred()
>>> d.addErrback(errorback_1)

>>> d.addCallback(callback_1)

>>> d.addCallback(callback_2)

>>> d.callback(45)
Ricevuto 45
Ricevuto 46

Ancora una volta non vediamo traccia della nostra eccezione. Probabilmente
è per colpa del fatto che il gestore di eccezioni è stato aggiunto
prima della callback problematica. Riproviamo:

>>> d = defer.Deferred()
>>> d.addCallback(callback_1)

>>> d.addCallback(callback_2)

>>> d.addErrback(errorback_1)

>>> d.callback(45)
Ricevuto 45
Ricevuto 46
[Failure instance: Traceback: : unsupported operand type(s) for +: 'int' and 'str'
:1:
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:242:callback
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:307:_startRunCallbacks
—-  —-
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:323:_runCallbacks
:3:callback_2
]

Questo significa che le deferred rappresentano un flusso di
esecuzione diverso ma analogo a quello del programma principale e
bisogna gestirle opportunamente e con attenzione. Tuttavia
permettono di associare all’arrivo di alcuni dati, determinato
dalla chiamata d.callback(45), a una serie di operazioni
che vengono compiute consecutivamente una dopo l’altra in maniera
completamente esplicita.

Prima di chiudere, provando il codice degli esempi vi potrebbe capitare
di vedere comparire eccezioni come questa:

Unhandled error in Deferred:
Traceback (most recent call last):
  File "", line 1, in 

  File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 242, in callback
    self._startRunCallbacks(result)
  File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 307, in _startRunCallbacks
    self._runCallbacks()
—-  —-
  File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 323, in _runCallbacks
    self.result = callback(self.result, *args, **kw)
  File "", line 3, in callback_2

exceptions.TypeError: unsupported operand type(s) for +: 'int' and 'str'

Il problema evidenziato è quello scritto nella prima riga:
Unhandled error in Deferred. Avrete notato che sto
riutilizzando sempre la lettera d per identificare
l’istanza di una deferred. Riutilizzandola come negli esempi vado
a distruggere l’istanza più vecchia che, nel momento in cui viene
deallocata dal garbage collector, avverte della mancata
gestione di un errore.

deferToProcess

La cosa più semplice che possiamo implementare in questa prima
parte è sicuramente una funzione come deferToProcess
analoga a deferToThread già presente in Twisted
Matrix. Quest’ultima prende come argomento la funzione da eseguire
in un thread separato ritornando una deferred che verrà scatenata
quando il thread avrà terminato l’esecuzione passando il valore
ritornato dalla funzione. In tipico stile TDD andiamo prima a
identificare l’API che vorremmo usare assieme al funzionamento che
ci aspettiamo:

def test_deferToProcess(self):
    STRING = "ciao\n"
    ready, finished = p.deferToProcess(good.main, packages=("test",))
    def _eb(reason):
        print reason
    finished.addErrback(_eb)
    finished.addCallback(self.assertEquals, STRING)
    ready.addCallback(lambda child: child.write(STRING))
    return finished

def test_failing_deferToProcess(self):
    ready, finished = p.deferToProcess(bad.main, packages=("test",))
    self.assertFailure(finished, error.ProcessTerminated)
    return finished

Utilizzando il framework di testing incluso in Twisted Matrix andiamo
a definire una piccola batteria di test, di cui i principali ho riportato
nell’esempio. In questo pacchetto è
possibile scaricare tutto il codice su cui andremo a lavorare durante
questo articolo.

Vediamo quindi che l’API desiderabile per ora vedrebbe
deferToProcess ritornare 2 deferred, finished e
ready che rappresentano i due stati di interesse per
il nostro sottoprocesso. deferToProcess accetta inoltre delle
funzioni da eseguire come argomento e in più un argomento
packages che servirà per aggiungere eventuali
package, non presenti nel PYTHONPATH, al nuovo processo avviato.

Andiamo quindi a vedere come viene realizzata la funzionalità di
deferToProcess:

import os
import sys
import imp
import sets
j = os.path.join

from twisted.internet import reactor, protocol, defer, error
from twisted.python import log, filepath, failure, util, reflect

bin = j(util.searchupwards('.', [], ['bin']), 'bin')
run_script = j(bin, 'run')

def spawnProcess(processProtocol, args=(), env={},
                 path=None, uid=None, gid=None, usePTY=0,
                 packages=()):
    env = env.copy()

    pythonpath = []
    for pkg in packages:
        p = os.path.split(imp.find_module(pkg)[1])[0]
        if p.startswith(os.path.join(sys.prefix, 'lib')):
            continue
        pythonpath.append(p)
    pythonpath = list(sets.Set(pythonpath))
    pythonpath.extend(env.get('PYTHONPATH', '').split(os.pathsep))
    env['PYTHONPATH'] = os.pathsep.join(pythonpath)

    args = (sys.executable,) + args

    return reactor.spawnProcess(processProtocol, sys.executable, args,
                                env, path, uid, gid, usePTY)

def _deferToProcess(prot, *args, **kwargs):
    def _eb(reason):
        reason.trap(error.ProcessDone)
        log.err("FATAL: There was an error: %s" % (reason,))
    def _cb(data):
        log.msg("DONE. Received: %s bytes" % (len(data),))
        return data

    spawnProcess(prot, tuple(args), **kwargs)
    prot.finished.addErrback(_eb)
    prot.finished.addCallback(_cb)
    return prot.ready, prot.finished

class BaseProtocol(protocol.ProcessProtocol):
    def __init__(self, parent=None):
        self.finished = defer.Deferred()
        self.ready = defer.Deferred()
        self.parent = parent
        self._buffer = []

    def connectionMade(self):
        log.msg("Started subprocess")
        self.ready.callback(self)

    def outReceived(self, data):
        self._buffer.append(data)

    def errReceived(self, data):
        if self.parent is not None:
            self.parent.dataReceived(data)
        else:
            log.msg("stderr from subprocess: %s" % (data,))

    def processEnded(self, status):
        log.msg("%s" % (status.value,))
        if status.check(error.ProcessDone):
            self.finished.callback(''.join(self._buffer))
            return
        self.finished.errback(status)

    def write(self, data):
        self.transport.write(data)

    def die(self):
        self.transport.closeStdin()

def deferToProcess(func, *args, **kwargs):
    name = reflect.fullFuncName(func)
    parent = kwargs.pop('parent', None)
    prot = BaseProtocol(parent)
    return _deferToProcess(prot, run_script, name, *args, **kwargs)

In Twisted Matrix non viene data la possibilità di non eseguire
una exec(3) subito dopo l’esecuzione della
fork(2), ma viene invece eseguito un processo
completamente nuovo, che nel nostro caso sarà un altro processo
python. Notiamo quindi che nel modulo abbiamo utilizzato un piccolo
script ausiliario per far caricare ed eseguire una funzione il cui
percorso completo verrà passato come argomento. Nel nostro caso
questo script è il seguente:

#!/usr/bin/env python
import sys

from twisted.python import reflect
try:
    method = reflect.namedAny(sys.argv[-1])
except Exception, e:
    print e

method()

Questo script carica dinamicamente una funzione partendo dalla
stringa passata come argomento.

L’altro elemento fondamentale del modulo principale è
rappresentato dalla presenza della classe
BaseProtocol che specifica l’API e il protocollo di
comunicazione col processo figlio appena creato. Nell’esempio
infatti padre e figlio dialogano utilizzando stdin e
stderr mentre stdout viene riservato
alla comunicazione del messaggio di uscita del processo figlio.

Andiamo quindi a vedere come potrebbe essere scritta la funzione
da eseguire nel processo figlio:

import sys

def main():
    s = sys.stdin.readline()
    while s:
        sys.stderr.write("ciao %s" % (s.strip(),))
        s = sys.stdin.readline()

Per completare il disegno e la spiegazione finora avanzata è
d’obbligo osservare un esempio d’insieme che mostra anche alcune
delle potenzialità del semplice sistema finora creato:

import sys

from twisted.internet import reactor, error
from twisted.python import util, log
log.startLogging(sys.stdout)

from process import deferToProcess

import tt

def restart(reason, func):
    log.err("FATAL: Restarting after %s" % (reason,))
    func()

def dieGently(data):
    log.msg(data)
    reactor.stop()

def _run():
    ready, finished = deferToProcess(tt.main)

    def doSomething(child):
        child.write("mondo\n")
        child.die()

    ready.addCallback(doSomething)
    finished.addCallback(dieGently).addErrback(restart, _run)

reactor.callLater(1, _run)
reactor.run()

In questo esempio creiamo un processo figlio che va ad eseguire la
funzione main definita precedentemente. In questo caso è piuttosto
semplice aspettarsi che il risultato dell’esecuzione sarà la
creazione della classica stringa "ciao mondo". Ma
possiamo notare aspetti decisamente piu` interessanti nel codice:
come è facile notare abbiamo aggiunto una errback alla
deferred finished, questo per fare in modo di gestire le
terminazioni involontarie del processo figlio. In erlang una delle
funzionalità più utili è il monitoring dei processi figli da parte
di un processo padre che provvede a fare ripartire tutti i
processi che muoiono senza una precisa ragione, in questo esempio
abbiamo cercato di ottenere lo stesso tipo di funzionalità e
l’abbiamo fatto limitandoci a richiamare la funzione principale in
caso di errore durante la vita del figlio. Nel caso in cui, invece,
il figlio terminasse senza problemi nulla verrebbe fatto ripartire
e il programma terminerebbe attraverso dieGently.

Conclusioni

In questa prima puntata abbiamo scritto un piccolo esempio utile
per eseguire semplici processi figli che dialogano col padre
attraverso un protocollo di comunicazione piuttosto semplice, non
abbiamo quindi definito dei veri e propri worker processes,
a meno di arricchire ulteriormente il protocollo di comunicazione
e di migliorare l’uso che facciamo della deferToProcess.

Sarà infatti obiettivo delle prossime puntate quello di ottenere
un risultato ancora più facilmente utilizzabile per distribuire un
carico di lavoro su più processi heavy-weight in grado di
sfruttare al massimo le possibilità offerte dalla rete e
dall’architettura del computer d’esecuzione.

Riferimenti

Comments

  1. Lorenzo says:

    Ho utilizzato Twisted per un progetto abbastanza grande (una “specie” di server web) e si è rivelato estremamente performante. Certo che la programmazione con le deferred può essere, all’inizio un po’ difficoltosa, ma una volta capito per bene il meccanismo delle deferred è anche divertente!

  2. Ciao,

    nel tuo esempio dici “Quello che però stupisce è che non vediamo traccia di alcuna eccezione che, invece, sarebbe stata ovvia considerando che la callback_2 cerca di sommare un numero e una stringa, cosa vietata in Python”.

    Eppure l’eccezione io la vedo eccome, tentando di eseguire questo codice: http://pastebin.ca/964476

    L’errore che ottengo è questo: http://pastebin.ca/964477

    Qualche idea? Ti ringrazio anticipatamente e complimenti per l’articolo (che ancora devo finire di leggere 😛 ).

  3. Hehe, si vede non hai finito di leggere, infatti la risposta è poco più in basso 😛

    (Per i pigri: se esegui lo script all’uscita il GC fa pultito e il deferred protesta per una eccezione non gestita)

  4. Michele Simionato says:

    Parlando di threads in Python, segnalo questo
    articolo di Aahz sul GIL:
    http://www.pyzine.com/Issue001/Section_Articles/article_ThreadingGlobalInterpreter.html

Policy per i commenti: Apprezzo moltissimo i vostri commenti, critiche incluse. Per evitare spam e troll, e far rimanere il discorso civile, i commenti sono moderati e prontamente approvati poco dopo il loro invio.