Programmare multi processi facilmente con Twisted Matrix /2

Nell’ultima
puntata
ci siamo fatti le ossa nella gestione di sottoprocessi
utilizzando Twisted
Matrix
e un semplicissimo protocollo di comunicazione tra il
processo padre e ciascun processo figlio. Come abbiamo anticipato
però, se utilizziamo processi heavy-weight, specialmente
in Python, dobbiamo anche concentrarci nell’ammortizzare il costo
della loro creazione riutilizzandoli il più possibile, vedremo
però che per garantire la salute degli worker process
sono necessari numerosi controlli e molta intelligenza nella loro
gestione.

AMPoule

Dalla stesura del primo articolo è passato ormai molto tempo, il
semplice codice di esempio si è evoluto in un progetto più
complesso, organico e flessibile che offre tutte le funzionalità
basilari unite ad alcune caratteristiche più avanzate, il suo nome
è AMPoule e in questo
articolo ci concentreremo sulla sua struttura e sulle diverse
situazioni di utilizzo.

Il progetto si divide in 3 diversi strati:

  • Il ProcessPool vero e proprio che definisce la
    logica fondamentale di funzionamento insieme al codice di
    supporto per garantire la buona salute di tutti i processi
    figli.
  • Il ProcessStarter che definisce come ciascun
    sottoprocesso viene creato e implementa l’interfaccia
    IStarter.
  • L’AMPChild che rappresenta una base per la
    definizione delle funzionalità che ciascun worker
    process
    espone al codice che utilizza il
    pool.

Nel progetto è poi presente la possibilità di esporre, con una
semplice interfaccia a linea di comando, ciascuna funzionalità
presente nel pool attraverso un’interfaccia di rete per aiutare
alla distribuzione del codice su macchine differenti e non solo
sulla stessa. Questa è anche la funzionalità più interessante per
il ProcessPool, permette di scalare in modo
semplice la propria architettura su più macchine senza andare a
toccare il codice della propria applicazione
grazie all’utilizzo di processi separati
senza condivisione di stato. Un thread pool avrebbe potuto
sfruttare, e capita spesso che questo accada, una struttura dati
condivisa tra gli worker e il master e questo sarebbe diventato un
impedimento nel tentativo di muovere la computazione su più
macchine.

Il ProcessPool

In ampoule.pool.ProcessPool possiamo trovare
l’implementazione di tutta la logica di gestione dei
sottoprocessi. Questa classe incorpora la
parte principale del progetto e questo include:

  • La capacità di ricreare processi che si interrompono
    erroneamente.
  • Attraverso il parametro timeout del ProcessPool o della
    singola chiamata, la gestione di un timeout massimo per
    l’esecuzione di una chiamata prima di considerarla
    fallita.
  • La capacità di far crescere e diminuire il numero di
    processi vivi attraverso la definizione di un numero minimo
    e massimo, con un tempo
    massimo di idle per ciascuno dei sottoprocessi,
    attraversato il quale un worker viene ucciso senza essere
    riavviato.
  • La capacità di accodare le chiamate quando tutti gli
    worker sono occupati, questo si traduce nella capacità del
    sistema di bilanciare il carico tra tutti gli worker
    disponibili senza dover introdurre ulteriore logica in quanto
    la coda è svuotata in modalità First Come, First
    Served
    .
  • La possibilità di definire un numero massimo di chiamate a
    cui ciascun sottoprocesso ha la possibilità di rispondere
    prima di essere riciclato, ovvero ucciso e riavviato, per
    garantire un utilizzo di memoria il più possibile costante
    anche alla luce di memory leak che possono essere presenti
    nelle librerie utilizzate dai worker.

A causa della relativa giovinezza del progetto non è ancora stata
sviluppata un’interfaccia che definisca il minimo set di metodi
necessari allo sviluppo di un ProcessPool compatibile, tuttavia
possiamo già osservare alcuni dei metodi principali per gli
utilizzatori: ProcessPool.startAWorker,
ProcessPool.callRemote, ProcessPool.stopAWorker
e chiaramente i metodi start e stop.

Come vedremo più avanti è possibile reimplementare le modalità di
creazione dei sottoprocessi, anche eventualmente utilizzando
differenti protocolli di comunicazione tra master e worker,
ridefinendo alcuni dei metodi esposti dall’implementazione del
ProcessPool è possibile adattare il Pool ai propri bisogni
limitando il contatto con i moltissimi dettagli di basso livello
necessari ad implementare in modo sicuro tutte queste
funzionalità.

Dal punto di vista della comunicazione tra padre e figlio, questa
implementazione si aspetta di comunicare attraverso il protocollo
AMP, definito in twisted.protocols.amp, si tratta di una
scelta dettata dalla leggerezza del protocollo e dalla relativa
semplicità di porting del protocollo in altri linguaggi, esistono
infatti implementazioni in C, Python senza l’uso di Twisted
Matrix, Scheme e altri ancora. Questo implica anche che tutte le
limitazioni e i pregi del protocollo andranno anche a definire i
limiti della comunicazione tra padre e figlio e, fino a un certo
punto, anche l’API che il ProcessPool si aspetta di trovare nei
worker.

Il ProcessStarter

Come abbiamo già osservato, implementa l’interfaccia documentata
in ampoule.iampoule.IStarter che definisce due metodi
principali: IStarter.startPythonProcess e
IStarter.startAMPProcess. L’implementazione di default
del ProcessPool utilizza AMP come protocollo di comunicazione, un
semplice sistema basato sull’rfc822, tuttavia lo starter cerca di
garantire ai suoi utilizzatori la possibilità di sfruttare
alternative attraverso l’uso della più “semplice”
startPythonProcess che si limita a creare un sottoprocesso
utilizzando il protocollo definito dallo sviluppatore, chiaramente
però per poterlo utilizzare è necessario andare a ridefinire alcuni
dei metodi che abbiamo visto nel ProcessPool.

La separazione tra ProcessStarter e ProcessPool permette poi di
semplificare il processo di testing unitario permettendo di verificare
separatamente il funzionamento del codice che crea sottoprocessi e
quello che li gestisce, con al limite la possibilità di testare il
codice di gestione senza dover per forza andare a creare
worker.

L’implementazione di default del ProcessStarter permette di
definire alcuni parametri interessanti nei processi figli, come ad
esempio il reactor che dovrebbe essere utilizzato, se select o
gtk2 oppure epoll, oppure l’aggiunta di alcuni package che devono
assolutamente essere presenti nel PYTHONPATH dei processi figli,
attraverso il parametro packages.

AMPChild

La classe base per la definizione dei figli supporta un piccolo
numero di comandi che sono utili per il testing e per la gestione
di ciascun worker. Questo piccolo set di comandi rappresenta anche
l’interfaccia che tutti i processi figli dovrebbero implementare a
prescindere dal loro protocollo, in particolar modo il metodo
shutdown permette di uccidere un worker process senza
provocare una situazione di errore che ne provocherebbe il riavvio.

All’interno di questa classe possiamo eseguire numerose
personalizzazioni e sfruttare il
costruttore per eseguire tutto il codice bloccante di cui
disponiamo in quanto, durante l’inizializzazione della classe
figlio, il reactor non è ancora stato avviato. Il funzionamento
delle pipe va poi a garantire che i messaggi inviati ai figli non
vengano persi e che il figlio possa occuparsi di processarli
quando finalmente avrà finito di inizializzarsi. Inoltre la
funzione di accodamento delle chiamate nel ProcessPool garantisce
che una sola chiamata per volta sia inviata a un worker e in caso
di fallimento questa chiamata verrà ritornata al caller con il
giusto errore.

Qualche esempio…

Dopo aver visto più nel dettaglio il funzionamento di AMPoule
possiamo provare a usarlo per ottenere qualcosa di utile. Iniziamo
a guardare uno degli esempi disponibili:

from ampoule import child, util
from twisted.protocols import amp

class Pid(amp.Command):
    response = [("pid", amp.Integer())]

class MyChild(child.AMPChild):
    @Pid.responder
    def pid(self):
        import os
        return {"pid": os.getpid()}

@util.mainpoint
def main(args):
    import sys
    from twisted.internet import reactor, defer

    # abilitiamo anche il logging per vedere cosa succede nei figli 
    from twisted.python import log
    log.startLogging(sys.stdout)

    from ampoule import pool
    
    @defer.inlineCallbacks
    def _run():
        pp = pool.ProcessPool(MyChild, min=1, max=1)
        yield pp.start()
        result = yield pp.callRemote(Pid)
        print "The Child process PID is:", result['pid']
        yield pp.stop()
        reactor.stop()
    
    reactor.callLater(1, _run)
    reactor.run()

In questo semplice esempio usiamo molte funzionalità speciali di
Python e Twisted Matrix e, anche se a prima vista non sembrerebbe,
stiamo usando un hack per poter scrivere questo codice in un singolo
file che ci permetta di avviare questo script usando python nomefile.py
a cui tutti gli sviluppatori Python sono abituati.

Eseguendolo otteniamo il seguente output:

2009-03-08 16:58:24-0700 [-] Log opened.
2009-03-08 16:58:25-0700 [-] Subprocess 0 started.
2009-03-08 16:58:25-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 16:58:25-0700 [-] ProcessPool stats:
2009-03-08 16:58:25-0700 [-] 	workers: 1
2009-03-08 16:58:25-0700 [-] 	timeout: None
2009-03-08 16:58:25-0700 [-] 	parent: None
2009-03-08 16:58:25-0700 [-] 	child: <class 'pid.MyChild'>
2009-03-08 16:58:25-0700 [-] 	max idle: 20
2009-03-08 16:58:25-0700 [-] 	recycle after: 500
2009-03-08 16:58:25-0700 [-] 	ProcessStarter:
2009-03-08 16:58:25-0700 [-] 		ProcessStarter(bootstrap='import sys\n\ndef main(reactor, ampChildPath):\n    from twisted.application import reactors\n    reactors.installReactor(reactor)\n    \n    from twisted.python import log\n    log.startLogging(sys.stderr)\n\n    from twisted.internet import reactor, stdio\n    from twisted.python import reflect\n\n    ampChild = reflect.namedAny(ampChildPath)\n    stdio.StandardIO(ampChild(), 3, 4)\n    reactor.run()\nmain(sys.argv[-2], sys.argv[-1])\n',
	                                 args=(),
	                                 env={},
	                                 path=None,
	                                 uid=None,
	                                 gid=None,
	                                 usePTY=0,
	                                 packages=('twisted', 'ampoule'),
	                                 childReactor='select')
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] Log opened.
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] MyChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x535ff0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x535eb0>)
2009-03-08 16:58:27-0700 [-] The Child process PID is: 11771
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] Shutdown message received, goodbye.
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] MyChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x535ff0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x535eb0>)
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] Main loop terminated.
2009-03-08 16:58:27-0700 [-] Process: 0 ended
2009-03-08 16:58:27-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 16:58:27-0700 [-] STOPPING: ''
2009-03-08 16:58:27-0700 [-] Main loop terminated.

In questo output sono visibili alcune cose interessanti: le linee
che iniziano con FROM 0 sono linee che vengono effettivamente
generate da ciascuno dei processi figli, il numero 0 è un id unico
che ogni figlio riceve durante l’avvio. L’altro dettaglio interessante
è la print dettagliata di tutte le statistiche e dello stato del process
pool in cui è possibile vedere anche tutti gli argomenti con cui è
stato inizializzato il ProcessStarter.

Il primo punto per capire il file è l’uso del decoratore
util.mainpoint, la sua presenza è interamente legata
alla possibilità di importare il modulo che stiamo definendo. In
Python infatti molti sanno che un modo per rendere condizionale
l’esecuzione di codice all’interno di un modulo è quello di usare:

if __name__ == "__main__":
   # qua va il codice eseguito solo quando il programma viene fatto
   # partire con "python nomefile.py"
   pass

Questo ha la spiacevole conseguenza di rendere difficile ricavare
dinamicamente il nome del modulo dal suo interno.
util.mainpoint va quindi sopperire a questo problema.

Scoperta la natura dell’hack che stiamo usando passiamo a parlare
dell’altro decoratore: defer.inlineCallbacks. Da
Python 2.5 i generatori hanno ricevuto la
possibilità di ricevere e inviare valori al loro chiamante. In
Twisted Matrix si è deciso di sfruttare questa nuova funzionalità
assieme alle deferred per rendere più semplice la
programmazione asincrona, senza tuttavia rimuovere completamente
le tracce di questo tipo di programmazione, al fine di non trarre
in inganno gli sviluppatori. Decorando qualsiasi funzione con
questo decoratore e` possibile utilizzare yield per
sospendere l’esecuzione della funzione finchè la deferred
ritornata non si sblocca o continuando l’esecuzione oppure con
un’eccezione.

A questo punto abbiamo visto gli elementi necessari per iniziare a
osservare più in dettaglio questo esempio e quelli che verranno.
Quando si scrivono software multi-processo è sempre estremamente
utile capire in che processo viene eseguita ciascuna linea di
codice che scriviamo in quanto da questo dipende la modalità di
comunicazione tra padre e figlio in un process pool. Nell’esempio
in questione possiamo dire che tutto il codice all’interno della
funzione _run è eseguito nel processo padre, mentre
la classe MyChild sarà eseguita nel processo figlio.
In realtà questo codice sarà compilato completamente sia dal padre
che dal figlio, ma grazie a util.mainpoint solo il
padre eseguirà quella sezione, il figlio infatti importerà
soltanto questo modulo. La classe Pid rappresenta la
modalità di comunicazione tra master e worker o, in altri termini,
ciò che il figlio è in grado di eseguire. In questo particolare
esempio l’unica cosa che facciamo fare al figlio è ritornare il
pid del processo in cui si trova, non ci sono poi ulteriori spunti
interessanti, è un esempio molto semplice che serve solo a dare
un’idea del minimo indispensabile per distribuire il lavoro su
più processi.

Passiamo a vedere un esempio appena più interessante allora:

from ampoule import child, util

@util.mainpoint
def main(args):
    from twisted.internet import reactor, defer
    from ampoule import pool, Ping
    import time
    
    @defer.inlineCallbacks
    def _run():
        pp = pool.ProcessPool(child.AMPChild, recycleAfter=10000)
        pp.min = 1
        pp.max = 5
        yield pp.start()
        t = time.time()
        REPEATS = 40000
        l = [pp.callRemote(Ping) for x in xrange(REPEATS)]
        yield defer.DeferredList(l)
        print REPEATS/(time.time() - t)
        yield pp.stop()
        reactor.stop()
    
    reactor.callLater(1, _run)
    reactor.run()

In questo esempio cerchiamo di sfruttare un process pool nel modo
in cui andremo a usarlo effettivamente nel nostro software.
Tuttavia il comando che eseguiamo risulta estremamente semplice e
quindi ciò che andremo a misurare è l’effettiva velocità di
round-trip per le chiamate ai processi figli. Eseguendo tutto
sulla stessa macchina tuttavia otteniamo in output esattamente in
numero di chiamate al secondo che siamo stati in grado di
eseguire. Il collo di bottiglia in questo sistema è chiaramente
rappresentato dal codice che serializza e deserializza i comandi e
le risposte, per la maggior parte degli usi però
non andremo mai a preoccuparci di questo in quanto
stiamo cercando di spostare l’esecuzione di codice bloccante, che
quindi
impiegherà un lungo tempo per essere eseguito, essendo l’overhead
di serializzazione più basso della computazione, possiamo permettendoci di non
preoccuparci del limite di chiamate al secondo. Abilitando il
logging sarà possibile vedere ciò che avviene nei sottoprocessi e
variando i parametri con cui viene creato il ProcessPool potremo
osservare le modalità di crescita e riduzione delle dimensioni del
ProcessPool. Ad esempio sulla mia macchina, passando 5000 come
valore per recycleAfter osservo questo:

2009-03-08 17:22:18-0700 [-] Log opened.
2009-03-08 17:22:19-0700 [-] Subprocess 0 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:19-0700 [-] ProcessPool stats:
2009-03-08 17:22:19-0700 [-] 	workers: 1
2009-03-08 17:22:19-0700 [-] 	timeout: None
2009-03-08 17:22:19-0700 [-] 	parent: None
2009-03-08 17:22:19-0700 [-] 	child: <class 'ampoule.child.AMPChild'>
2009-03-08 17:22:19-0700 [-] 	max idle: 20
2009-03-08 17:22:19-0700 [-] 	recycle after: 5000
2009-03-08 17:22:19-0700 [-] 	ProcessStarter:
2009-03-08 17:22:19-0700 [-] 		ProcessStarter(bootstrap='import sys\n\ndef main(reactor, ampChildPath):\n    from twisted.application import reactors\n    reactors.installReactor(reactor)\n    \n    from twisted.python import log\n    log.startLogging(sys.stderr)\n\n    from twisted.internet import reactor, stdio\n    from twisted.python import reflect\n\n    ampChild = reflect.namedAny(ampChildPath)\n    stdio.StandardIO(ampChild(), 3, 4)\n    reactor.run()\nmain(sys.argv[-2], sys.argv[-1])\n',
	                                 args=(),
	                                 env={},
	                                 path=None,
	                                 uid=None,
	                                 gid=None,
	                                 usePTY=0,
	                                 packages=('twisted', 'ampoule'),
	                                 childReactor='select')
2009-03-08 17:22:19-0700 [-] Subprocess 1 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:19-0700 [-] Subprocess 2 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:19-0700 [-] Subprocess 3 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:19-0700 [-] Subprocess 4 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:21-0700 [-] FROM 2: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 0: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 3: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 4: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 1: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 0: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:21-0700 [-] FROM 2: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:21-0700 [-] FROM 4: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:21-0700 [-] FROM 3: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:21-0700 [-] FROM 1: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:33-0700 [-] FROM 0: 2009-03-08 17:22:33-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:33-0700 [-] FROM 0: 2009-03-08 17:22:33-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:33-0700 [-] FROM 0: 2009-03-08 17:22:33-0700 [-] Main loop terminated.
2009-03-08 17:22:33-0700 [-] Process: 0 ended
2009-03-08 17:22:33-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:33-0700 [-] STOPPING: ''
2009-03-08 17:22:33-0700 [-] Subprocess 5 started.
2009-03-08 17:22:33-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] FROM 1: 2009-03-08 17:22:34-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:34-0700 [-] FROM 1: 2009-03-08 17:22:34-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:34-0700 [-] FROM 1: 2009-03-08 17:22:34-0700 [-] Main loop terminated.
2009-03-08 17:22:34-0700 [-] Process: 1 ended
2009-03-08 17:22:34-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] STOPPING: ''
2009-03-08 17:22:34-0700 [-] Subprocess 6 started.
2009-03-08 17:22:34-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] FROM 4: 2009-03-08 17:22:34-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:34-0700 [-] FROM 4: 2009-03-08 17:22:34-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:34-0700 [-] FROM 4: 2009-03-08 17:22:34-0700 [-] Main loop terminated.
2009-03-08 17:22:34-0700 [-] Process: 4 ended
2009-03-08 17:22:34-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] STOPPING: ''
2009-03-08 17:22:34-0700 [-] Subprocess 7 started.
2009-03-08 17:22:34-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] FROM 2: 2009-03-08 17:22:34-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:34-0700 [-] FROM 2: 2009-03-08 17:22:34-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:34-0700 [-] FROM 2: 2009-03-08 17:22:34-0700 [-] Main loop terminated.
2009-03-08 17:22:34-0700 [-] Process: 2 ended
2009-03-08 17:22:34-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] STOPPING: ''
2009-03-08 17:22:34-0700 [-] Subprocess 8 started.
2009-03-08 17:22:34-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] FROM 3: 2009-03-08 17:22:34-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:34-0700 [-] FROM 3: 2009-03-08 17:22:34-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:34-0700 [-] FROM 3: 2009-03-08 17:22:34-0700 [-] Main loop terminated.
2009-03-08 17:22:34-0700 [-] Process: 3 ended
2009-03-08 17:22:34-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] STOPPING: ''
2009-03-08 17:22:34-0700 [-] Subprocess 9 started.
2009-03-08 17:22:34-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:36-0700 [-] FROM 5: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 5: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:36-0700 [-] FROM 6: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 8: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 7: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 6: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:36-0700 [-] FROM 7: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:36-0700 [-] FROM 8: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:36-0700 [-] FROM 9: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 9: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] 1592.24963936
2009-03-08 17:22:44-0700 [-] FROM 5: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 5: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 5: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] FROM 8: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 8: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 8: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] FROM 7: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 7: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 7: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] FROM 6: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 6: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 6: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] Process: 6 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] FROM 9: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 9: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 9: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] Process: 5 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] Process: 7 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] Process: 8 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] Process: 9 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] Main loop terminated.

Come esempio finale andiamo a realizzare qualcosa che allo stesso
tempo risulta estremamente utile e molto interessante. Cercheremo
di realizzare un piccolo servizio di creazione di screenshot di
siti internet che non richiede la presenza di X11 per essere
utilizzato. L’esempio che vedremo è molto lungo e anche abbastanza
complesso in quanto cerca di tenere presenti alcuni dettagli
importanti per avere un servizio anche basilare. Non sarà compito
di questo articolo arrivare al setup completo di un sistema basato
su questo codice, ma le librerie necessarie, come libgtkwebkit e
xvfb sono disponibili in molte distribuzioni linux recenti. Altro
punto è la selezione del reactor gtk2 nei sottoprocessi per
poter integrare Twisted Matrix con GTK+2.

# This code is MIT licensed, copyright of adroll.com
import re
import os
import uuid
import base64
import urlparse

import gtk
import gobject
import webkit

from twisted.protocols import amp
from twisted.internet import defer, reactor

from ampoule import child, util

DOCUMENT_WIDTH=990 # 1024, with the scrollbar taken into account
DOCUMENT_HEIGHT=768

SCREENSHOT_WIDTH = 320
SCREENSHOT_HEIGHT = 240

# Utility functions
def generate_uuid():
    """
    create a 22 char globally unique identifier
    """
    u = uuid.uuid4()
    b = base64.b32encode(u.bytes)
                     
    b = b.rstrip("=\n") # lose the "==" that finishes a base64 value
    return b.decode('utf-8')
####################

# Ampoule integration
class FetchURL(amp.Command):
    """
    The command definition for requesting a URL.
    """
    arguments = [("url", amp.String())]
    response = [("filepath", amp.Path())]

class ScreenshotService(child.AMPChild):
    """
    The child which ampoule manages.
    """
    _browser = None

    @FetchURL.responder
    def fetch_url(self, url):
        if not self._browser:
            self._browser = Browser()

        return self._browser.fetch_url(url)
#####################

# gtk code that runs inside the Twisted Matrix reactor
class Browser(gtk.Window):
    """
    Represents the browser application.
    """
    def __init__(self):
        gtk.Window.__init__(self)

        # Variables to ensure that no requests are outstanding before taking
        # the screenshot
        self._redirect = False
        self._redirect_caught = False
        self._load_wait = False

        # Create the interface
        self._scrolled = gtk.ScrolledWindow()
        self._browser = webkit.WebView()
        self._browser.set_size_request(DOCUMENT_WIDTH, DOCUMENT_HEIGHT)
        self._scrolled.add(self._browser)
        self.add(self._scrolled)

        # Setup callbacks and set the size of the window.
        self.connect('destroy', lambda *a, **kw: reactor.stop())
        self._browser.connect('load-finished', self._loaded)
        self._browser.connect('navigation-requested', self._handle_redirects)

        self.show_all()

    def fetch_url(self, url):
        """
        Fetches the given url.

        @param url: The url to visit.
        """

        def cb(_):
            self._fetch_url_deferred = defer.Deferred()
            self._browser.open(url)
            return self._fetch_url_deferred

        def eb(_):
            return {"filepath" : u""}
        
        # preemptively check if the url exists, if not don't waste
        # resources.
        hostname = urlparse.urlparse(url).hostname
        return reactor.resolve(hostname).addCallback(cb).addErrback(eb)

    def _send_response_back(self, filepath):
        """
        Here we return the result to our caller.
        """
        self._fetch_url_deferred.callback({"filepath" : filepath})
        self._fetch_url_deferred = None

    def _handle_redirects(self, pane, frame, request):
        """
        Handles each new network request

        Note these parameters aren't properly converted to Python by the pygtkwebkit wrapper.

        @param pane: WebkitPane
        @param frame: WebkitFrame
        @param request: Webkit Request
        """
        if self._load_wait:
            self._redirect = True

    def _loaded(self, view, frame):
        """
        Handles each document loaded event.

        Register a callback to grab a snapshot of the webpage.

        It needs to be delayed in case there are other requests being made that
        we didn't know about (ie javascript redirects)
        """
        self._load_wait = True
        reactor.callLater(0.5, self._snapshot)

    def _snapshot(self):
        """
        Create the full length pixmap and capture the first screen full of it
        and scroll as necessary to capture the full page.
        """

        # If a redirect happened than the first loaded signal was bogus, wait
        # for the next loaded signal. 
        #
        # We have to use another variable here because there are multiple
        # requested signals per load event
        self._load_wait = False
        if self._redirect and not self._redirect_caught:
            self._redirect_caught = True
            return False
        else:
            self._redirect = False
            self._redirect_caught = False

        # Get the dimensions of the HTML widget
        window = self._scrolled.window
        (x, y, _, _, depth) = window.get_geometry()
        (width, height) = self._browser.window.get_geometry()[2:4]

        # Create a pixmap of the screen above the fold
        self._pixbuf = gtk.gdk.Pixbuf(gtk.gdk.COLORSPACE_RGB, False, 8, width, height)

        # Capture the visible part of the page
        self._pixbuf.get_from_drawable(window, self.get_colormap(), 0, 0, 0, 0, width, height)

        # Cut off the image to 4:3 aspect ratio
        above_fold = gtk.gdk.Pixbuf(gtk.gdk.COLORSPACE_RGB, False, 8, 990, 742)
        self._pixbuf.copy_area(0, 0, 990, 742, above_fold, 0, 0)

        # Scale to SCREENSHOT_WIDTH x SCREENSHOT_HEIGHT
        medium_size = above_fold.scale_simple(SCREENSHOT_WIDTH, SCREENSHOT_HEIGHT, gtk.gdk.INTERP_BILINEAR)

        # Figure out filename
        medium_size_filename = u"/tmp/%s.jpg" % (generate_uuid(),)

        # Save the image
        medium_size.save(medium_size_filename.encode('utf-8'), "jpeg")
        self._send_response_back(medium_size_filename)

        # Don't reschedule this
        return False

Per ragioni di spazio, non andremo a sviscerare l’esempio in ogni
dettaglio, anche perché molti di essi riguardano più che altro il
funzionamento di WebKit e GTK+, tuttavia notiamo come l’integrazione
di questo codice con AMPoule è particolarmente semplice e si limita
alla definizione di un semplice comando FetchURL e
della classe ScreenshotService che definisce il servizio.

All’interno della classe Browser invece gli unici
punti rilevanti sono il metodo Browser.fetch_url e
Browser._send_response_back che si occupano
rispettivamente di creare la deferred per ritornare il risultato e
di sbloccare la deferred una volta che lo screenshot è stato
salvato.

Il codice per rendere questo esempio un vero e proprio servizio
non è molto diverso da quello che abbiamo visto negli esempi
precedenti e lo lascio quindi come esercizio per i lettori più
curiosi. Utilizzando il plugin di AMPoule per il tool twistd sarà
poi possibile esporre questo servizio di screenshot sulla rete per
potervi accedere da macchine remote, anche se in questo caso
diventerebbe necessaria l’introduzione di un filesystem condiviso
per poter salvare lo screenshot in modo che tutti vi possano
accedere usando lo stesso URI.

Comments

  1. Non ho ancora letto l’articolo, ma è curioso leggerlo proprio il giorno dopo che ho scoperto http://opensource.hyves.org/concurrence/

    tra le altre cose, nella pagina dicono:
    “The goal of Concurrence is to provide an easier programming model for writing high performance network applications than existing solutions (Multi-threading, Twisted, asyncore etc).”

    secondo te quanto puo’ essere veloce / stabile / affidabile questa soluzione rispetto a twisted?

  2. Anche io ho visto Concurrence poco tempo fa per la prima volta, non l’ho pubblicato in conseguenza comunque :).

    Leggendo un po’ il codice di concurrence comunque direi che l’API non e` per nulla molto pulita e semplice, il progetto in se e` promettente e offre molto di cio` che serve per sviluppare applicazioni web, pero` dal punto di vista dell’API e` davvero molto molto indietro e l’autore, secondo me, ha fatto scelte molto discutibili a riguardo.

    L’esempio che mostra qua e` enormemente piu` complesso dello stesso esempio fatto con Twisted. Inoltre anche per Twisted esistono numerose estensioni, anche semplicemente twisted.internet.task.Cooperator, che permettono di usare dei generatori per gestire l’elaborazione, e per chi e` davvero innamorato delle greenlet esiste anche corotwine e l’esempio dello stesso chat server in questa estensione di Twisted e` qua anche se e` un po’ che non riceve molte attenzioni.

    L’altro problemino di concurrence e` che utilizza sendmsg per inviare gli fd tra processi sulla stessa macchina, questo lo rende automaticamente non portabile al di fuori degli UNIX.

    Comunque e` uno dei tanti progetti interessanti a questo riguardo che si stanno creando in Python. Altri progetti interessanti simili a concurrence, o per concetto o per realizzazione, sono eventlet, fibra che pero` non supporta ancora I/O non bloccante, e appunto questo Concurrence che sembra molto completo ma ha il problema della complessita` della sua API su cui non e` stato fatto molto lavoro di semplificazione secondo me, da seguire per vedere dove andra` a finire ma per ora non scriverei applicazioni in produzione, magari qualche prototipo se volessi provarlo. Twisted Matrix dalla sua ha l’immensa maturita` e la vastita` delle librerie che lo supportano. Scrivere una piccola alternativa a Twisted e` relativamente molto facile, coprire tutti i casi estremi e le problematiche e renderlo portabile e integrato con altri event-loop (come le librerie grafiche) invece e` molto complesso. Se volessi davvero guardare verso networking ad alte o altissime prestazioni andrei verso erlang.

  3. Alan Franzoni says:

    Il modulo sembra davvero molto interessante.

    Ne avrei avuto bisogno un paio d’anni fa per un progetto che ho seguito (tutt’ora in produzione) e, alla fin fine, ho optato per scrivermi una piccola libreria custom basata su Perspective Broker. Ma dagli esempi che fai mi sembra che quest’approccio sia decisamente più “generalista” e riutilizzabile in svariate situazione.

    Complimenti, e mi riprometto di darci un’occhiata approfondita ed eventualmente preparare una paginetta di recensione.

Policy per i commenti: Apprezzo moltissimo i vostri commenti, critiche incluse. Per evitare spam e troll, e far rimanere il discorso civile, i commenti sono moderati e prontamente approvati poco dopo il loro invio.