Programmazione 3 commenti
Programmare multi processi facilmente con Twisted Matrix /2
diNell’ultima puntata ci siamo fatti le ossa nella gestione di sottoprocessi utilizzando Twisted Matrix e un semplicissimo protocollo di comunicazione tra il processo padre e ciascun processo figlio. Come abbiamo anticipato però, se utilizziamo processi heavy-weight, specialmente in Python, dobbiamo anche concentrarci nell’ammortizzare il costo della loro creazione riutilizzandoli il più possibile, vedremo però che per garantire la salute degli worker process sono necessari numerosi controlli e molta intelligenza nella loro gestione.
AMPoule
Dalla stesura del primo articolo è passato ormai molto tempo, il semplice codice di esempio si è evoluto in un progetto più complesso, organico e flessibile che offre tutte le funzionalità basilari unite ad alcune caratteristiche più avanzate, il suo nome è AMPoule e in questo articolo ci concentreremo sulla sua struttura e sulle diverse situazioni di utilizzo.
Il progetto si divide in 3 diversi strati:
- Il ProcessPool vero e proprio che definisce la logica fondamentale di funzionamento insieme al codice di supporto per garantire la buona salute di tutti i processi figli.
- Il ProcessStarter che definisce come ciascun sottoprocesso viene creato e implementa l'interfaccia IStarter.
- L'AMPChild che rappresenta una base per la definizione delle funzionalità che ciascun worker process espone al codice che utilizza il pool.
Nel progetto è poi presente la possibilità di esporre, con una semplice interfaccia a linea di comando, ciascuna funzionalità presente nel pool attraverso un'interfaccia di rete per aiutare alla distribuzione del codice su macchine differenti e non solo sulla stessa. Questa è anche la funzionalità più interessante per il ProcessPool, permette di scalare in modo semplice la propria architettura su più macchine senza andare a toccare il codice della propria applicazione grazie all'utilizzo di processi separati senza condivisione di stato. Un thread pool avrebbe potuto sfruttare, e capita spesso che questo accada, una struttura dati condivisa tra gli worker e il master e questo sarebbe diventato un impedimento nel tentativo di muovere la computazione su più macchine.
Il ProcessPool
In ampoule.pool.ProcessPool possiamo trovare l'implementazione di tutta la logica di gestione dei sottoprocessi. Questa classe incorpora la parte principale del progetto e questo include:
- La capacità di ricreare processi che si interrompono erroneamente.
- Attraverso il parametro timeout del ProcessPool o della singola chiamata, la gestione di un timeout massimo per l'esecuzione di una chiamata prima di considerarla fallita.
- La capacità di far crescere e diminuire il numero di processi vivi attraverso la definizione di un numero minimo e massimo, con un tempo massimo di idle per ciascuno dei sottoprocessi, attraversato il quale un worker viene ucciso senza essere riavviato.
- La capacità di accodare le chiamate quando tutti gli worker sono occupati, questo si traduce nella capacità del sistema di bilanciare il carico tra tutti gli worker disponibili senza dover introdurre ulteriore logica in quanto la coda è svuotata in modalità First Come, First Served.
- La possibilità di definire un numero massimo di chiamate a cui ciascun sottoprocesso ha la possibilità di rispondere prima di essere riciclato, ovvero ucciso e riavviato, per garantire un utilizzo di memoria il più possibile costante anche alla luce di memory leak che possono essere presenti nelle librerie utilizzate dai worker.
A causa della relativa giovinezza del progetto non è ancora stata sviluppata un'interfaccia che definisca il minimo set di metodi necessari allo sviluppo di un ProcessPool compatibile, tuttavia possiamo già osservare alcuni dei metodi principali per gli utilizzatori: ProcessPool.startAWorker, ProcessPool.callRemote, ProcessPool.stopAWorker e chiaramente i metodi start e stop.
Come vedremo più avanti è possibile reimplementare le modalità di creazione dei sottoprocessi, anche eventualmente utilizzando differenti protocolli di comunicazione tra master e worker, ridefinendo alcuni dei metodi esposti dall'implementazione del ProcessPool è possibile adattare il Pool ai propri bisogni limitando il contatto con i moltissimi dettagli di basso livello necessari ad implementare in modo sicuro tutte queste funzionalità.
Dal punto di vista della comunicazione tra padre e figlio, questa implementazione si aspetta di comunicare attraverso il protocollo AMP, definito in twisted.protocols.amp, si tratta di una scelta dettata dalla leggerezza del protocollo e dalla relativa semplicità di porting del protocollo in altri linguaggi, esistono infatti implementazioni in C, Python senza l'uso di Twisted Matrix, Scheme e altri ancora. Questo implica anche che tutte le limitazioni e i pregi del protocollo andranno anche a definire i limiti della comunicazione tra padre e figlio e, fino a un certo punto, anche l'API che il ProcessPool si aspetta di trovare nei worker.
Il ProcessStarter
Come abbiamo già osservato, implementa l'interfaccia documentata in ampoule.iampoule.IStarter che definisce due metodi principali: IStarter.startPythonProcess e IStarter.startAMPProcess. L'implementazione di default del ProcessPool utilizza AMP come protocollo di comunicazione, un semplice sistema basato sull'rfc822, tuttavia lo starter cerca di garantire ai suoi utilizzatori la possibilità di sfruttare alternative attraverso l'uso della più "semplice" startPythonProcess che si limita a creare un sottoprocesso utilizzando il protocollo definito dallo sviluppatore, chiaramente però per poterlo utilizzare è necessario andare a ridefinire alcuni dei metodi che abbiamo visto nel ProcessPool.
La separazione tra ProcessStarter e ProcessPool permette poi di
semplificare il processo di testing unitario permettendo di verificare
separatamente il funzionamento del codice che crea sottoprocessi e
quello che li gestisce, con al limite la possibilità di testare il
codice di gestione senza dover per forza andare a creare
worker.
L'implementazione di default del ProcessStarter permette di definire alcuni parametri interessanti nei processi figli, come ad esempio il reactor che dovrebbe essere utilizzato, se select o gtk2 oppure epoll, oppure l'aggiunta di alcuni package che devono assolutamente essere presenti nel PYTHONPATH dei processi figli, attraverso il parametro packages.
AMPChild
La classe base per la definizione dei figli supporta un piccolo numero di comandi che sono utili per il testing e per la gestione di ciascun worker. Questo piccolo set di comandi rappresenta anche l'interfaccia che tutti i processi figli dovrebbero implementare a prescindere dal loro protocollo, in particolar modo il metodo shutdown permette di uccidere un worker process senza provocare una situazione di errore che ne provocherebbe il riavvio.
All'interno di questa classe possiamo eseguire numerose personalizzazioni e sfruttare il costruttore per eseguire tutto il codice bloccante di cui disponiamo in quanto, durante l'inizializzazione della classe figlio, il reactor non è ancora stato avviato. Il funzionamento delle pipe va poi a garantire che i messaggi inviati ai figli non vengano persi e che il figlio possa occuparsi di processarli quando finalmente avrà finito di inizializzarsi. Inoltre la funzione di accodamento delle chiamate nel ProcessPool garantisce che una sola chiamata per volta sia inviata a un worker e in caso di fallimento questa chiamata verrà ritornata al caller con il giusto errore.
Qualche esempio...
Dopo aver visto più nel dettaglio il funzionamento di AMPoule possiamo provare a usarlo per ottenere qualcosa di utile. Iniziamo a guardare uno degli esempi disponibili:
from ampoule import child, util
from twisted.protocols import amp
class Pid(amp.Command):
response = [("pid", amp.Integer())]
class MyChild(child.AMPChild):
@Pid.responder
def pid(self):
import os
return {"pid": os.getpid()}
@util.mainpoint
def main(args):
import sys
from twisted.internet import reactor, defer
# abilitiamo anche il logging per vedere cosa succede nei figli
from twisted.python import log
log.startLogging(sys.stdout)
from ampoule import pool
@defer.inlineCallbacks
def _run():
pp = pool.ProcessPool(MyChild, min=1, max=1)
yield pp.start()
result = yield pp.callRemote(Pid)
print "The Child process PID is:", result['pid']
yield pp.stop()
reactor.stop()
reactor.callLater(1, _run)
reactor.run()
In questo semplice esempio usiamo molte funzionalità speciali di
Python e Twisted Matrix e, anche se a prima vista non sembrerebbe,
stiamo usando un hack per poter scrivere questo codice in un singolo
file che ci permetta di avviare questo script usando python nomefile.py
a cui tutti gli sviluppatori Python sono abituati.
Eseguendolo otteniamo il seguente output:
2009-03-08 16:58:24-0700 [-] Log opened.
2009-03-08 16:58:25-0700 [-] Subprocess 0 started.
2009-03-08 16:58:25-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 16:58:25-0700 [-] ProcessPool stats:
2009-03-08 16:58:25-0700 [-] workers: 1
2009-03-08 16:58:25-0700 [-] timeout: None
2009-03-08 16:58:25-0700 [-] parent: None
2009-03-08 16:58:25-0700 [-] child: <class 'pid.MyChild'>
2009-03-08 16:58:25-0700 [-] max idle: 20
2009-03-08 16:58:25-0700 [-] recycle after: 500
2009-03-08 16:58:25-0700 [-] ProcessStarter:
2009-03-08 16:58:25-0700 [-] ProcessStarter(bootstrap='import sys\n\ndef main(reactor, ampChildPath):\n from twisted.application import reactors\n reactors.installReactor(reactor)\n \n from twisted.python import log\n log.startLogging(sys.stderr)\n\n from twisted.internet import reactor, stdio\n from twisted.python import reflect\n\n ampChild = reflect.namedAny(ampChildPath)\n stdio.StandardIO(ampChild(), 3, 4)\n reactor.run()\nmain(sys.argv[-2], sys.argv[-1])\n',
args=(),
env={},
path=None,
uid=None,
gid=None,
usePTY=0,
packages=('twisted', 'ampoule'),
childReactor='select')
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] Log opened.
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] MyChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x535ff0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x535eb0>)
2009-03-08 16:58:27-0700 [-] The Child process PID is: 11771
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] Shutdown message received, goodbye.
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] MyChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x535ff0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x535eb0>)
2009-03-08 16:58:27-0700 [-] FROM 0: 2009-03-08 16:58:27-0700 [-] Main loop terminated.
2009-03-08 16:58:27-0700 [-] Process: 0 ended
2009-03-08 16:58:27-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 16:58:27-0700 [-] STOPPING: ''
2009-03-08 16:58:27-0700 [-] Main loop terminated.
In questo output sono visibili alcune cose interessanti: le linee
che iniziano con FROM 0 sono linee che vengono effettivamente
generate da ciascuno dei processi figli, il numero 0 è un id unico
che ogni figlio riceve durante l'avvio. L'altro dettaglio interessante
è la print dettagliata di tutte le statistiche e dello stato del process
pool in cui è possibile vedere anche tutti gli argomenti con cui è
stato inizializzato il ProcessStarter.
Il primo punto per capire il file è l'uso del decoratore
util.mainpoint, la sua presenza è interamente legata
alla possibilità di importare il modulo che stiamo definendo. In
Python infatti molti sanno che un modo per rendere condizionale
l'esecuzione di codice all'interno di un modulo è quello di usare:
if __name__ == "__main__":
# qua va il codice eseguito solo quando il programma viene fatto
# partire con "python nomefile.py"
pass
Questo ha la spiacevole conseguenza di rendere difficile ricavare
dinamicamente il nome del modulo dal suo interno.
util.mainpoint va quindi sopperire a questo problema.
Scoperta la natura dell'hack che stiamo usando passiamo a parlare
dell'altro decoratore: defer.inlineCallbacks. Da
Python 2.5 i generatori hanno ricevuto la
possibilità di ricevere e inviare valori al loro chiamante. In
Twisted Matrix si è deciso di sfruttare questa nuova funzionalità
assieme alle deferred per rendere più semplice la
programmazione asincrona, senza tuttavia rimuovere completamente
le tracce di questo tipo di programmazione, al fine di non trarre
in inganno gli sviluppatori. Decorando qualsiasi funzione con
questo decoratore e` possibile utilizzare yield per
sospendere l'esecuzione della funzione finchè la deferred
ritornata non si sblocca o continuando l'esecuzione oppure con
un'eccezione.
A questo punto abbiamo visto gli elementi necessari per iniziare a
osservare più in dettaglio questo esempio e quelli che verranno.
Quando si scrivono software multi-processo è sempre estremamente
utile capire in che processo viene eseguita ciascuna linea di
codice che scriviamo in quanto da questo dipende la modalità di
comunicazione tra padre e figlio in un process pool. Nell'esempio
in questione possiamo dire che tutto il codice all'interno della
funzione _run è eseguito nel processo padre, mentre
la classe MyChild sarà eseguita nel processo figlio.
In realtà questo codice sarà compilato completamente sia dal padre
che dal figlio, ma grazie a util.mainpoint solo il
padre eseguirà quella sezione, il figlio infatti importerà
soltanto questo modulo. La classe Pid rappresenta la
modalità di comunicazione tra master e worker o, in altri termini,
ciò che il figlio è in grado di eseguire. In questo particolare
esempio l'unica cosa che facciamo fare al figlio è ritornare il
pid del processo in cui si trova, non ci sono poi ulteriori spunti
interessanti, è un esempio molto semplice che serve solo a dare
un'idea del minimo indispensabile per distribuire il lavoro su
più processi.
Passiamo a vedere un esempio appena più interessante allora:
from ampoule import child, util
@util.mainpoint
def main(args):
from twisted.internet import reactor, defer
from ampoule import pool, Ping
import time
@defer.inlineCallbacks
def _run():
pp = pool.ProcessPool(child.AMPChild, recycleAfter=10000)
pp.min = 1
pp.max = 5
yield pp.start()
t = time.time()
REPEATS = 40000
l = [pp.callRemote(Ping) for x in xrange(REPEATS)]
yield defer.DeferredList(l)
print REPEATS/(time.time() - t)
yield pp.stop()
reactor.stop()
reactor.callLater(1, _run)
reactor.run()
In questo esempio cerchiamo di sfruttare un process pool nel modo in cui andremo a usarlo effettivamente nel nostro software. Tuttavia il comando che eseguiamo risulta estremamente semplice e quindi ciò che andremo a misurare è l'effettiva velocità di round-trip per le chiamate ai processi figli. Eseguendo tutto sulla stessa macchina tuttavia otteniamo in output esattamente in numero di chiamate al secondo che siamo stati in grado di eseguire. Il collo di bottiglia in questo sistema è chiaramente rappresentato dal codice che serializza e deserializza i comandi e le risposte, per la maggior parte degli usi però non andremo mai a preoccuparci di questo in quanto stiamo cercando di spostare l'esecuzione di codice bloccante, che quindi impiegherà un lungo tempo per essere eseguito, essendo l'overhead di serializzazione più basso della computazione, possiamo permettendoci di non preoccuparci del limite di chiamate al secondo. Abilitando il logging sarà possibile vedere ciò che avviene nei sottoprocessi e variando i parametri con cui viene creato il ProcessPool potremo osservare le modalità di crescita e riduzione delle dimensioni del ProcessPool. Ad esempio sulla mia macchina, passando 5000 come valore per recycleAfter osservo questo:
2009-03-08 17:22:18-0700 [-] Log opened.
2009-03-08 17:22:19-0700 [-] Subprocess 0 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:19-0700 [-] ProcessPool stats:
2009-03-08 17:22:19-0700 [-] workers: 1
2009-03-08 17:22:19-0700 [-] timeout: None
2009-03-08 17:22:19-0700 [-] parent: None
2009-03-08 17:22:19-0700 [-] child: <class 'ampoule.child.AMPChild'>
2009-03-08 17:22:19-0700 [-] max idle: 20
2009-03-08 17:22:19-0700 [-] recycle after: 5000
2009-03-08 17:22:19-0700 [-] ProcessStarter:
2009-03-08 17:22:19-0700 [-] ProcessStarter(bootstrap='import sys\n\ndef main(reactor, ampChildPath):\n from twisted.application import reactors\n reactors.installReactor(reactor)\n \n from twisted.python import log\n log.startLogging(sys.stderr)\n\n from twisted.internet import reactor, stdio\n from twisted.python import reflect\n\n ampChild = reflect.namedAny(ampChildPath)\n stdio.StandardIO(ampChild(), 3, 4)\n reactor.run()\nmain(sys.argv[-2], sys.argv[-1])\n',
args=(),
env={},
path=None,
uid=None,
gid=None,
usePTY=0,
packages=('twisted', 'ampoule'),
childReactor='select')
2009-03-08 17:22:19-0700 [-] Subprocess 1 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:19-0700 [-] Subprocess 2 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:19-0700 [-] Subprocess 3 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:19-0700 [-] Subprocess 4 started.
2009-03-08 17:22:19-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:21-0700 [-] FROM 2: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 0: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 3: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 4: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 1: 2009-03-08 17:22:21-0700 [-] Log opened.
2009-03-08 17:22:21-0700 [-] FROM 0: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:21-0700 [-] FROM 2: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:21-0700 [-] FROM 4: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:21-0700 [-] FROM 3: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:21-0700 [-] FROM 1: 2009-03-08 17:22:21-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:33-0700 [-] FROM 0: 2009-03-08 17:22:33-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:33-0700 [-] FROM 0: 2009-03-08 17:22:33-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:33-0700 [-] FROM 0: 2009-03-08 17:22:33-0700 [-] Main loop terminated.
2009-03-08 17:22:33-0700 [-] Process: 0 ended
2009-03-08 17:22:33-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:33-0700 [-] STOPPING: ''
2009-03-08 17:22:33-0700 [-] Subprocess 5 started.
2009-03-08 17:22:33-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] FROM 1: 2009-03-08 17:22:34-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:34-0700 [-] FROM 1: 2009-03-08 17:22:34-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:34-0700 [-] FROM 1: 2009-03-08 17:22:34-0700 [-] Main loop terminated.
2009-03-08 17:22:34-0700 [-] Process: 1 ended
2009-03-08 17:22:34-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] STOPPING: ''
2009-03-08 17:22:34-0700 [-] Subprocess 6 started.
2009-03-08 17:22:34-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] FROM 4: 2009-03-08 17:22:34-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:34-0700 [-] FROM 4: 2009-03-08 17:22:34-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:34-0700 [-] FROM 4: 2009-03-08 17:22:34-0700 [-] Main loop terminated.
2009-03-08 17:22:34-0700 [-] Process: 4 ended
2009-03-08 17:22:34-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] STOPPING: ''
2009-03-08 17:22:34-0700 [-] Subprocess 7 started.
2009-03-08 17:22:34-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] FROM 2: 2009-03-08 17:22:34-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:34-0700 [-] FROM 2: 2009-03-08 17:22:34-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:34-0700 [-] FROM 2: 2009-03-08 17:22:34-0700 [-] Main loop terminated.
2009-03-08 17:22:34-0700 [-] Process: 2 ended
2009-03-08 17:22:34-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] STOPPING: ''
2009-03-08 17:22:34-0700 [-] Subprocess 8 started.
2009-03-08 17:22:34-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] FROM 3: 2009-03-08 17:22:34-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:34-0700 [-] FROM 3: 2009-03-08 17:22:34-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:34-0700 [-] FROM 3: 2009-03-08 17:22:34-0700 [-] Main loop terminated.
2009-03-08 17:22:34-0700 [-] Process: 3 ended
2009-03-08 17:22:34-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:34-0700 [-] STOPPING: ''
2009-03-08 17:22:34-0700 [-] Subprocess 9 started.
2009-03-08 17:22:34-0700 [-] AMP connection established (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:36-0700 [-] FROM 5: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 5: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:36-0700 [-] FROM 6: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 8: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 7: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 6: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:36-0700 [-] FROM 7: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:36-0700 [-] FROM 8: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:36-0700 [-] FROM 9: 2009-03-08 17:22:36-0700 [-] Log opened.
2009-03-08 17:22:36-0700 [-] FROM 9: 2009-03-08 17:22:36-0700 [-] AMPChild connection established (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] 1592.24963936
2009-03-08 17:22:44-0700 [-] FROM 5: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 5: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 5: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] FROM 8: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 8: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 8: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] FROM 7: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 7: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 7: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] FROM 6: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 6: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 6: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] Process: 6 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] FROM 9: 2009-03-08 17:22:44-0700 [-] Shutdown message received, goodbye.
2009-03-08 17:22:44-0700 [-] FROM 9: 2009-03-08 17:22:44-0700 [-] AMPChild connection lost (HOST:<twisted.internet._posixstdio.PipeAddress object at 0x108f8b0> PEER:<twisted.internet._posixstdio.PipeAddress object at 0x10652d0>)
2009-03-08 17:22:44-0700 [-] FROM 9: 2009-03-08 17:22:44-0700 [-] Main loop terminated.
2009-03-08 17:22:44-0700 [-] Process: 5 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] Process: 7 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] Process: 8 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] Process: 9 ended
2009-03-08 17:22:44-0700 [-] AMP connection lost (HOST:('no host',) PEER:('subprocess',))
2009-03-08 17:22:44-0700 [-] STOPPING: ''
2009-03-08 17:22:44-0700 [-] Main loop terminated.
Come esempio finale andiamo a realizzare qualcosa che allo stesso tempo risulta estremamente utile e molto interessante. Cercheremo di realizzare un piccolo servizio di creazione di screenshot di siti internet che non richiede la presenza di X11 per essere utilizzato. L'esempio che vedremo è molto lungo e anche abbastanza complesso in quanto cerca di tenere presenti alcuni dettagli importanti per avere un servizio anche basilare. Non sarà compito di questo articolo arrivare al setup completo di un sistema basato su questo codice, ma le librerie necessarie, come libgtkwebkit e xvfb sono disponibili in molte distribuzioni linux recenti. Altro punto è la selezione del reactor gtk2 nei sottoprocessi per poter integrare Twisted Matrix con GTK+2.
# This code is MIT licensed, copyright of adroll.com
import re
import os
import uuid
import base64
import urlparse
import gtk
import gobject
import webkit
from twisted.protocols import amp
from twisted.internet import defer, reactor
from ampoule import child, util
DOCUMENT_WIDTH=990 # 1024, with the scrollbar taken into account
DOCUMENT_HEIGHT=768
SCREENSHOT_WIDTH = 320
SCREENSHOT_HEIGHT = 240
# Utility functions
def generate_uuid():
"""
create a 22 char globally unique identifier
"""
u = uuid.uuid4()
b = base64.b32encode(u.bytes)
b = b.rstrip("=\n") # lose the "==" that finishes a base64 value
return b.decode('utf-8')
####################
# Ampoule integration
class FetchURL(amp.Command):
"""
The command definition for requesting a URL.
"""
arguments = [("url", amp.String())]
response = [("filepath", amp.Path())]
class ScreenshotService(child.AMPChild):
"""
The child which ampoule manages.
"""
_browser = None
@FetchURL.responder
def fetch_url(self, url):
if not self._browser:
self._browser = Browser()
return self._browser.fetch_url(url)
#####################
# gtk code that runs inside the Twisted Matrix reactor
class Browser(gtk.Window):
"""
Represents the browser application.
"""
def __init__(self):
gtk.Window.__init__(self)
# Variables to ensure that no requests are outstanding before taking
# the screenshot
self._redirect = False
self._redirect_caught = False
self._load_wait = False
# Create the interface
self._scrolled = gtk.ScrolledWindow()
self._browser = webkit.WebView()
self._browser.set_size_request(DOCUMENT_WIDTH, DOCUMENT_HEIGHT)
self._scrolled.add(self._browser)
self.add(self._scrolled)
# Setup callbacks and set the size of the window.
self.connect('destroy', lambda *a, **kw: reactor.stop())
self._browser.connect('load-finished', self._loaded)
self._browser.connect('navigation-requested', self._handle_redirects)
self.show_all()
def fetch_url(self, url):
"""
Fetches the given url.
@param url: The url to visit.
"""
def cb(_):
self._fetch_url_deferred = defer.Deferred()
self._browser.open(url)
return self._fetch_url_deferred
def eb(_):
return {"filepath" : u""}
# preemptively check if the url exists, if not don't waste
# resources.
hostname = urlparse.urlparse(url).hostname
return reactor.resolve(hostname).addCallback(cb).addErrback(eb)
def _send_response_back(self, filepath):
"""
Here we return the result to our caller.
"""
self._fetch_url_deferred.callback({"filepath" : filepath})
self._fetch_url_deferred = None
def _handle_redirects(self, pane, frame, request):
"""
Handles each new network request
Note these parameters aren't properly converted to Python by the pygtkwebkit wrapper.
@param pane: WebkitPane
@param frame: WebkitFrame
@param request: Webkit Request
"""
if self._load_wait:
self._redirect = True
def _loaded(self, view, frame):
"""
Handles each document loaded event.
Register a callback to grab a snapshot of the webpage.
It needs to be delayed in case there are other requests being made that
we didn't know about (ie javascript redirects)
"""
self._load_wait = True
reactor.callLater(0.5, self._snapshot)
def _snapshot(self):
"""
Create the full length pixmap and capture the first screen full of it
and scroll as necessary to capture the full page.
"""
# If a redirect happened than the first loaded signal was bogus, wait
# for the next loaded signal.
#
# We have to use another variable here because there are multiple
# requested signals per load event
self._load_wait = False
if self._redirect and not self._redirect_caught:
self._redirect_caught = True
return False
else:
self._redirect = False
self._redirect_caught = False
# Get the dimensions of the HTML widget
window = self._scrolled.window
(x, y, _, _, depth) = window.get_geometry()
(width, height) = self._browser.window.get_geometry()[2:4]
# Create a pixmap of the screen above the fold
self._pixbuf = gtk.gdk.Pixbuf(gtk.gdk.COLORSPACE_RGB, False, 8, width, height)
# Capture the visible part of the page
self._pixbuf.get_from_drawable(window, self.get_colormap(), 0, 0, 0, 0, width, height)
# Cut off the image to 4:3 aspect ratio
above_fold = gtk.gdk.Pixbuf(gtk.gdk.COLORSPACE_RGB, False, 8, 990, 742)
self._pixbuf.copy_area(0, 0, 990, 742, above_fold, 0, 0)
# Scale to SCREENSHOT_WIDTH x SCREENSHOT_HEIGHT
medium_size = above_fold.scale_simple(SCREENSHOT_WIDTH, SCREENSHOT_HEIGHT, gtk.gdk.INTERP_BILINEAR)
# Figure out filename
medium_size_filename = u"/tmp/%s.jpg" % (generate_uuid(),)
# Save the image
medium_size.save(medium_size_filename.encode('utf-8'), "jpeg")
self._send_response_back(medium_size_filename)
# Don't reschedule this
return False
Per ragioni di spazio, non andremo a sviscerare l'esempio in ogni
dettaglio, anche perché molti di essi riguardano più che altro il
funzionamento di WebKit e GTK+, tuttavia notiamo come l'integrazione
di questo codice con AMPoule è particolarmente semplice e si limita
alla definizione di un semplice comando FetchURL e
della classe ScreenshotService che definisce il servizio.
All'interno della classe Browser invece gli unici
punti rilevanti sono il metodo Browser.fetch_url e
Browser._send_response_back che si occupano
rispettivamente di creare la deferred per ritornare il risultato e
di sbloccare la deferred una volta che lo screenshot è stato
salvato.
Il codice per rendere questo esempio un vero e proprio servizio non è molto diverso da quello che abbiamo visto negli esempi precedenti e lo lascio quindi come esercizio per i lettori più curiosi. Utilizzando il plugin di AMPoule per il tool twistd sarà poi possibile esporre questo servizio di screenshot sulla rete per potervi accedere da macchine remote, anche se in questo caso diventerebbe necessaria l'introduzione di un filesystem condiviso per poter salvare lo screenshot in modo che tutti vi possano accedere usando lo stesso URI.
- Pubblicato il
- 15 Mar 2009
- Tag


Non ho ancora letto l'articolo, ma è curioso leggerlo proprio il giorno dopo che ho scoperto http://opensource.hyves.org/concurrence/
tra le altre cose, nella pagina dicono: "The goal of Concurrence is to provide an easier programming model for writing high performance network applications than existing solutions (Multi-threading, Twisted, asyncore etc)."
secondo te quanto puo' essere veloce / stabile / affidabile questa soluzione rispetto a twisted?
Anche io ho visto Concurrence poco tempo fa per la prima volta, non l'ho pubblicato in conseguenza comunque :).
Leggendo un po' il codice di concurrence comunque direi che l'API non e` per nulla molto pulita e semplice, il progetto in se e` promettente e offre molto di cio` che serve per sviluppare applicazioni web, pero` dal punto di vista dell'API e` davvero molto molto indietro e l'autore, secondo me, ha fatto scelte molto discutibili a riguardo.
L'esempio che mostra qua e` enormemente piu` complesso dello stesso esempio fatto con Twisted. Inoltre anche per Twisted esistono numerose estensioni, anche semplicemente twisted.internet.task.Cooperator, che permettono di usare dei generatori per gestire l'elaborazione, e per chi e` davvero innamorato delle greenlet esiste anche corotwine e l'esempio dello stesso chat server in questa estensione di Twisted e` qua anche se e` un po' che non riceve molte attenzioni.
L'altro problemino di concurrence e` che utilizza sendmsg per inviare gli fd tra processi sulla stessa macchina, questo lo rende automaticamente non portabile al di fuori degli UNIX.
Comunque e` uno dei tanti progetti interessanti a questo riguardo che si stanno creando in Python. Altri progetti interessanti simili a concurrence, o per concetto o per realizzazione, sono eventlet, fibra che pero` non supporta ancora I/O non bloccante, e appunto questo Concurrence che sembra molto completo ma ha il problema della complessita` della sua API su cui non e` stato fatto molto lavoro di semplificazione secondo me, da seguire per vedere dove andra` a finire ma per ora non scriverei applicazioni in produzione, magari qualche prototipo se volessi provarlo. Twisted Matrix dalla sua ha l'immensa maturita` e la vastita` delle librerie che lo supportano. Scrivere una piccola alternativa a Twisted e` relativamente molto facile, coprire tutti i casi estremi e le problematiche e renderlo portabile e integrato con altri event-loop (come le librerie grafiche) invece e` molto complesso. Se volessi davvero guardare verso networking ad alte o altissime prestazioni andrei verso erlang.
Il modulo sembra davvero molto interessante.
Ne avrei avuto bisogno un paio d'anni fa per un progetto che ho seguito (tutt'ora in produzione) e, alla fin fine, ho optato per scrivermi una piccola libreria custom basata su Perspective Broker. Ma dagli esempi che fai mi sembra che quest'approccio sia decisamente più "generalista" e riutilizzabile in svariate situazione.
Complimenti, e mi riprometto di darci un'occhiata approfondita ed eventualmente preparare una paginetta di recensione.