Programmazione 4 commenti

Programmare multi processi facilmente con Twisted Matrix /1

di Valentino Volonghi

L'aumento del numero di core nei processori moderni e la necessità di sfruttare il più possibile il parallelismo tra diverse macchine, rende necessario trovare metodi efficaci per ottenere il massimo dei vantaggi con il minimo dello sforzo e della complessità. Storicamente si è sempre ricorso alla programmazione multi-thread shared-state ma questa è soggetta a numerosi problemi come ad esempio la gestione dei lock e dei semafori, la scalabilità sia nella stessa macchina che orizzontalmente su piu` macchine, l'imprevedibilità delle ottimizzazioni attuate dai compilatori, la complessità di unit-testing e debugging, i modelli di coerenza e consistenza della cache dei processori e della RAM e altri.

È però chiaro che esistono numerose alternative al modello shared-state, come il modello ad Actors, implementato ad esempio da Erlang o da Scala sebbene in modo molto differente tra loro, ma in questo articolo ci soffermeremo con particolare attenzione sulla gestione di più processi heavyweight in comunicazione tra loro. Il tutto sarà gestito attraverso un framework asincrono a eventi chiamato Twisted Matrix.

Perché proprio questo modello?

Una delle limitazioni, a seconda dei punti di vista, di Python è la presenza del GIL (Global Interpreter Lock). La Virtual Machine dell'implementazione di riferimento del linguaggio, chiamata anche CPython, non è infatti thread-safe e richiede la presenza di un Giant Lock per evitare di corrompere lo stato di esecuzione del processo, questo però non permette di sfruttare, attraverso il multi-threading, la presenza di più processori, con l'eccezione del codice scritto a più basso livello che può rilasciare il GIL per dedicarsi alla sua esecuzione senza intralciare la VM. Nel passato si è cercato numerose volte di rimuovere il GIL per sostituirlo con una serie di lock più fini e precisi, tuttavia tutti gli sforzi hanno prodotto delle versioni di Python almeno 2 volte più lente della versione con GIL e che complicavano incredibilmente la scrittura di moduli di estensione in C.

Il metodo più semplice per risolvere questo tipo di problematiche in Python è l'uso di più processi long-lived dedicati ognuno a uno specifico compito, un tipo di approcio simile a quello degli Actors. Il problema con questo tipo di soluzione è che i processi sono generalmente più pesanti dei thread e senza una soluzione per gestire la concorrenza non stiamo effettivamente risolvendo il problema in quanto ci ritroviamo nella situazione in cui i processi eseguiranno una singola operazione per volta, ipotesi accettabile solo nel caso in cui questi processi worker siano impegnati soltanto con lunghe operazioni di calcolo e pochissimo I/O. Per risolvere anche questo problema ci appoggeremo al modello asincrono sfruttando poi le funzionalità messe a disposizione da Twisted Matrix per la gestione di casi come l'interazione coi database o le interazioni con altri client mediante socket.

Programmazione asincrona con Twisted Matrix

Twisted Matrix è un framework per la programmazione di rete asincrona, è sviluppato in Python e implementa già numerosi e diversi protocolli, sia lato client che lato server, come, ad esempio, SSH, HTTP(S), POP3, SMTP, IMAP, XMPP e tanti altri. Come tutti i framework asincroni utilizza al suo interno un event-loop con una delle tante syscall che reagiscono al cambiamento dello stato nei file descriptor osservati come select(), poll(), epoll(), kqueue() e tante altre. In Twisted questo loop si chiama reactor dal nome del design pattern che lo descrive. La bontà della scelta di questo pattern è testimoniata anche dal relativamente grande numero di network framework che ne fanno utilizzo: ACE, Boost ASIO, Apache Mina, POE e Ruby EventMachine.

Un'ulteriore particolarità di Twisted Matrix sta nell'aver introdotto un nuovo pattern per la gestione delle callback chiamato Deferred. Non fa parte di questo articolo una spiegazione esaustiva del funzionamento e delle peculiarità di questo pattern, spiegazione che rimando a questo paper, ma vedremo brevemente i principali concetti riguardo al loro funzionamento per poter meglio comprendere il funzionamento del codice che scriveremo.

Una "Deferred" è sostanzialmente una promessa di consegnare un dato nel futuro. È un concetto molto simile alla possibilità di sospendere l'esecuzione di una funzione per riprenderla quando i dati necessari al proseguimento saranno finalmente disponibili. Un breve esempio di codice può mostrarci quello che intendo:

>>> from twisted.internet import defer
>>> d = defer.Deferred()
>>> def callback_1(arg):
...     print "Ricevuto", arg
...     return arg + 1
... 
>>> def callback_2(arg):
...     print "Ricevuto", arg
...     return arg + "ciao"
... 
>>> d.addCallback(callback_1)
<Deferred at 0x572e68>
>>> d.addCallback(callback_2)
<Deferred at 0x572e68>
>>> d.callback(45)
Ricevuto 45
Ricevuto 46

Come vediamo vengono chiamate le due callback nell'ordine in cui sono state aggiunte alla deferred d. Quello che però stupisce è che non vediamo traccia di alcuna eccezione che, invece, sarebbe stata ovvia considerando che la callback_2 cerca di sommare un numero e una stringa, cosa vietata in Python. In realtà nulla è andato perso e possiamo invece risolvere il problema facendo una leggera modifica al codice di cui sopra:

>>> def errorback_1(exception):
...     print exception
...     return "Fine"
... 
>>> d = defer.Deferred()
>>> d.addErrback(errorback_1)
<Deferred at 0x572e68>
>>> d.addCallback(callback_1)
<Deferred at 0x572e68>
>>> d.addCallback(callback_2)
<Deferred at 0x572e68>
>>> d.callback(45)
Ricevuto 45
Ricevuto 46

Ancora una volta non vediamo traccia della nostra eccezione. Probabilmente è per colpa del fatto che il gestore di eccezioni è stato aggiunto prima della callback problematica. Riproviamo:

>>> d = defer.Deferred()
>>> d.addCallback(callback_1)
<Deferred at 0x572e68>
>>> d.addCallback(callback_2)
<Deferred at 0x572e68>
>>> d.addErrback(errorback_1)
<Deferred at 0x572e68>
>>> d.callback(45)
Ricevuto 45
Ricevuto 46
[Failure instance: Traceback: <type 'exceptions.TypeError'>: unsupported operand type(s) for +: 'int' and 'str'
<stdin>:1:<module>
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:242:callback
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:307:_startRunCallbacks
- <exception caught here> -
/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py:323:_runCallbacks
<stdin>:3:callback_2
]

Questo significa che le deferred rappresentano un flusso di esecuzione diverso ma analogo a quello del programma principale e bisogna gestirle opportunamente e con attenzione. Tuttavia permettono di associare all'arrivo di alcuni dati, determinato dalla chiamata d.callback(45), a una serie di operazioni che vengono compiute consecutivamente una dopo l'altra in maniera completamente esplicita.

Prima di chiudere, provando il codice degli esempi vi potrebbe capitare di vedere comparire eccezioni come questa:

Unhandled error in Deferred:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>

  File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 242, in callback
    self._startRunCallbacks(result)
  File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 307, in _startRunCallbacks
    self._runCallbacks()
- <exception caught here> -
  File "/Users/dialtone/dev/Twisted/trunk/twisted/internet/defer.py", line 323, in _runCallbacks
    self.result = callback(self.result, *args, **kw)
  File "<stdin>", line 3, in callback_2

exceptions.TypeError: unsupported operand type(s) for +: 'int' and 'str'

Il problema evidenziato è quello scritto nella prima riga: Unhandled error in Deferred. Avrete notato che sto riutilizzando sempre la lettera d per identificare l'istanza di una deferred. Riutilizzandola come negli esempi vado a distruggere l'istanza più vecchia che, nel momento in cui viene deallocata dal garbage collector, avverte della mancata gestione di un errore.

deferToProcess

La cosa più semplice che possiamo implementare in questa prima parte è sicuramente una funzione come deferToProcess analoga a deferToThread già presente in Twisted Matrix. Quest'ultima prende come argomento la funzione da eseguire in un thread separato ritornando una deferred che verrà scatenata quando il thread avrà terminato l'esecuzione passando il valore ritornato dalla funzione. In tipico stile TDD andiamo prima a identificare l'API che vorremmo usare assieme al funzionamento che ci aspettiamo:

def test_deferToProcess(self):
    STRING = "ciao\n"
    ready, finished = p.deferToProcess(good.main, packages=("test",))
    def _eb(reason):
        print reason
    finished.addErrback(_eb)
    finished.addCallback(self.assertEquals, STRING)
    ready.addCallback(lambda child: child.write(STRING))
    return finished

def test_failing_deferToProcess(self):
    ready, finished = p.deferToProcess(bad.main, packages=("test",))
    self.assertFailure(finished, error.ProcessTerminated)
    return finished

Utilizzando il framework di testing incluso in Twisted Matrix andiamo a definire una piccola batteria di test, di cui i principali ho riportato nell'esempio. In questo pacchetto è possibile scaricare tutto il codice su cui andremo a lavorare durante questo articolo.

Vediamo quindi che l'API desiderabile per ora vedrebbe deferToProcess ritornare 2 deferred, finished e ready che rappresentano i due stati di interesse per il nostro sottoprocesso. deferToProcess accetta inoltre delle funzioni da eseguire come argomento e in più un argomento packages che servirà per aggiungere eventuali package, non presenti nel PYTHONPATH, al nuovo processo avviato. Andiamo quindi a vedere come viene realizzata la funzionalità di deferToProcess:

import os
import sys
import imp
import sets
j = os.path.join

from twisted.internet import reactor, protocol, defer, error
from twisted.python import log, filepath, failure, util, reflect

bin = j(util.searchupwards('.', [], ['bin']), 'bin')
run_script = j(bin, 'run')

def spawnProcess(processProtocol, args=(), env={},
                 path=None, uid=None, gid=None, usePTY=0,
                 packages=()):
    env = env.copy()

    pythonpath = []
    for pkg in packages:
        p = os.path.split(imp.find_module(pkg)[1])[0]
        if p.startswith(os.path.join(sys.prefix, 'lib')):
            continue
        pythonpath.append(p)
    pythonpath = list(sets.Set(pythonpath))
    pythonpath.extend(env.get('PYTHONPATH', '').split(os.pathsep))
    env['PYTHONPATH'] = os.pathsep.join(pythonpath)

    args = (sys.executable,) + args

    return reactor.spawnProcess(processProtocol, sys.executable, args,
                                env, path, uid, gid, usePTY)

def _deferToProcess(prot, *args, **kwargs):
    def _eb(reason):
        reason.trap(error.ProcessDone)
        log.err("FATAL: There was an error: %s" % (reason,))
    def _cb(data):
        log.msg("DONE. Received: %s bytes" % (len(data),))
        return data

    spawnProcess(prot, tuple(args), **kwargs)
    prot.finished.addErrback(_eb)
    prot.finished.addCallback(_cb)
    return prot.ready, prot.finished

class BaseProtocol(protocol.ProcessProtocol):
    def __init__(self, parent=None):
        self.finished = defer.Deferred()
        self.ready = defer.Deferred()
        self.parent = parent
        self._buffer = []

    def connectionMade(self):
        log.msg("Started subprocess")
        self.ready.callback(self)

    def outReceived(self, data):
        self._buffer.append(data)

    def errReceived(self, data):
        if self.parent is not None:
            self.parent.dataReceived(data)
        else:
            log.msg("stderr from subprocess: %s" % (data,))

    def processEnded(self, status):
        log.msg("%s" % (status.value,))
        if status.check(error.ProcessDone):
            self.finished.callback(''.join(self._buffer))
            return
        self.finished.errback(status)

    def write(self, data):
        self.transport.write(data)

    def die(self):
        self.transport.closeStdin()

def deferToProcess(func, *args, **kwargs):
    name = reflect.fullFuncName(func)
    parent = kwargs.pop('parent', None)
    prot = BaseProtocol(parent)
    return _deferToProcess(prot, run_script, name, *args, **kwargs)

In Twisted Matrix non viene data la possibilità di non eseguire una exec(3) subito dopo l'esecuzione della fork(2), ma viene invece eseguito un processo completamente nuovo, che nel nostro caso sarà un altro processo python. Notiamo quindi che nel modulo abbiamo utilizzato un piccolo script ausiliario per far caricare ed eseguire una funzione il cui percorso completo verrà passato come argomento. Nel nostro caso questo script è il seguente:

#!/usr/bin/env python
import sys

from twisted.python import reflect
try:
    method = reflect.namedAny(sys.argv[-1])
except Exception, e:
    print e

method()

Questo script carica dinamicamente una funzione partendo dalla stringa passata come argomento.

L'altro elemento fondamentale del modulo principale è rappresentato dalla presenza della classe BaseProtocol che specifica l'API e il protocollo di comunicazione col processo figlio appena creato. Nell'esempio infatti padre e figlio dialogano utilizzando stdin e stderr mentre stdout viene riservato alla comunicazione del messaggio di uscita del processo figlio. Andiamo quindi a vedere come potrebbe essere scritta la funzione da eseguire nel processo figlio:

import sys

def main():
    s = sys.stdin.readline()
    while s:
        sys.stderr.write("ciao %s" % (s.strip(),))
        s = sys.stdin.readline()

Per completare il disegno e la spiegazione finora avanzata è d'obbligo osservare un esempio d'insieme che mostra anche alcune delle potenzialità del semplice sistema finora creato:

import sys

from twisted.internet import reactor, error
from twisted.python import util, log
log.startLogging(sys.stdout)

from process import deferToProcess

import tt

def restart(reason, func):
    log.err("FATAL: Restarting after %s" % (reason,))
    func()

def dieGently(data):
    log.msg(data)
    reactor.stop()

def _run():
    ready, finished = deferToProcess(tt.main)

    def doSomething(child):
        child.write("mondo\n")
        child.die()

    ready.addCallback(doSomething)
    finished.addCallback(dieGently).addErrback(restart, _run)

reactor.callLater(1, _run)
reactor.run()

In questo esempio creiamo un processo figlio che va ad eseguire la funzione main definita precedentemente. In questo caso è piuttosto semplice aspettarsi che il risultato dell'esecuzione sarà la creazione della classica stringa "ciao mondo". Ma possiamo notare aspetti decisamente piu` interessanti nel codice: come è facile notare abbiamo aggiunto una errback alla deferred finished, questo per fare in modo di gestire le terminazioni involontarie del processo figlio. In erlang una delle funzionalità più utili è il monitoring dei processi figli da parte di un processo padre che provvede a fare ripartire tutti i processi che muoiono senza una precisa ragione, in questo esempio abbiamo cercato di ottenere lo stesso tipo di funzionalità e l'abbiamo fatto limitandoci a richiamare la funzione principale in caso di errore durante la vita del figlio. Nel caso in cui, invece, il figlio terminasse senza problemi nulla verrebbe fatto ripartire e il programma terminerebbe attraverso dieGently.

Conclusioni

In questa prima puntata abbiamo scritto un piccolo esempio utile per eseguire semplici processi figli che dialogano col padre attraverso un protocollo di comunicazione piuttosto semplice, non abbiamo quindi definito dei veri e propri worker processes, a meno di arricchire ulteriormente il protocollo di comunicazione e di migliorare l'uso che facciamo della deferToProcess.

Sarà infatti obiettivo delle prossime puntate quello di ottenere un risultato ancora più facilmente utilizzabile per distribuire un carico di lavoro su più processi heavy-weight in grado di sfruttare al massimo le possibilità offerte dalla rete e dall'architettura del computer d'esecuzione.

Riferimenti

Pubblicato il
31 Mar 2008
Tag

Commenti

  • Lorenzo il 31 Mar 2008

    Ho utilizzato Twisted per un progetto abbastanza grande (una "specie" di server web) e si è rivelato estremamente performante. Certo che la programmazione con le deferred può essere, all'inizio un po' difficoltosa, ma una volta capito per bene il meccanismo delle deferred è anche divertente!

  • Andrea Grandi il 31 Mar 2008

    Ciao,

    nel tuo esempio dici "Quello che però stupisce è che non vediamo traccia di alcuna eccezione che, invece, sarebbe stata ovvia considerando che la callback_2 cerca di sommare un numero e una stringa, cosa vietata in Python".

    Eppure l'eccezione io la vedo eccome, tentando di eseguire questo codice: http://pastebin.ca/964476

    L'errore che ottengo è questo: http://pastebin.ca/964477

    Qualche idea? Ti ringrazio anticipatamente e complimenti per l'articolo (che ancora devo finire di leggere :P ).

  • Matteo Bertini il 31 Mar 2008

    Hehe, si vede non hai finito di leggere, infatti la risposta è poco più in basso :P

    (Per i pigri: se esegui lo script all'uscita il GC fa pultito e il deferred protesta per una eccezione non gestita)

  • Michele Simionato il 1 Apr 2008

    Parlando di threads in Python, segnalo questo articolo di Aahz sul GIL: http://www.pyzine.com/Issue001/Section_Articles/article_ThreadingGlobalInterpreter.html

Screencast e videocorsi di programmazione
Stacktrace RSS Feed Stacktrace via E-mail
Hai idee per un articolo? Faccelo sapere!