POE, un entorno orientado a eventos
Si ya has programado una aplicación gráfica usando algo como Tk o Gtk, sabrás que es un poco diferente de la programación procedural diaria. En la programación normal, se escribe una secuencia de cosas que quieres que el programa haga y éste lo hace. Sin embargo, en las GUI's no se trabaja de ésta manera. Se setea un entorno (por ejemplo una ventana) que responde a ciertos eventos (un click de un botón o la selección de un ítem en un menú). A eso se le llama Paradigma Orientado a Eventos.No sólo se usa en GUI's. Por ejemplo, un servidor en una red no realiza una secuencia de eventos, sino que se sienta a esperar una conexión (un evento) y entonces le sirve a esa conexión según el input del cliente. Cuando el cliente termina y se desconecta, el servidor vuelve a esperar por un próximo evento.
En forma similar se podría escribir un script que mira una carpeta; el script se sienta a mirar y periódicamente busca en los archivos de la carpeta, y cuando detecta cambios dispara una respuesta o realiza algunas acciones.
El núcleo del paradigma orientado a eventos es el bucle principal, a veces llamado 'main loop'. Tk tiene uno, el módulo Event tiene uno, y POE, un entorno orientado a eventos, tiene uno. El bucle principal de POE es manejado por el kernel POE.
POE puede ser pensado como un diminuto sistema operativo que tiene un kernel. Cuando el kernel de un sistema operativo termina de asignar los trabajos en su entorno, se sienta a esperar por nuevos eventos. Éstos pueden ser llamadas al sistema desde el espacio del usuario o interrupciones de hardware. Además de manejar eventos se ocupa también del pasaje de mensajes entre los diferentes componentes, típicamente comunicación entre procesos (IPC).
El kernel POE también sirve a eventos y maneja la comunicación entre las diferentes partes del mundo de POE, aunque su equivalente de los procesos son llamados Sessions.
Hola mundo, POE
Mucha charla y nada de código, rectifiquemos con un breve ejemplo: #!/usr/bin/perl
use strict;
use warnings;
use POE;
POE::Session->create(
inline_states => {
_start => \&start,
hello => \&hello,
},
);
print "Running Kernel\n";
$poe_kernel->run( );
print "Exiting\n";
exit(0);
sub start {
my ($kernel) = $_[KERNEL];
print "Setting up a session\n";
$kernel->yield("hello");
}
sub hello { print "Hola Mundo!\n"; }
Este es el equivalente POE del famoso programa Hola Mundo. Si continuamos con la analogía del sistema operativo (una analogía poco útil pero por ahora la usaremos) entonces iniciamos el kernel de la máquina y creamos un único proceso que imprime "Hola Mundo!" y finaliza.
use POE;
print "Running Kernel\n";
$poe_kernel->run( );
print "Exiting\n";
exit(0);
Aquí está el núcleo de cualquier programa POE, la variable $poe_kernel es provista por el módulo POE y representa el kernel mismo. La llamada a run() en muchos casos no retorna nunca, por ejemplo un servidor que espera en un loop por nuevas conexiones. En nuestro caso, sin embargo, sólo seteamos una pequeña sesión que termina en seguida. En códigos más nuevos se prefiere usar POE::Kernel->run(), en vez de la variable global.
POE::Session->create(
inline_states => {
_start => \&start,
hello => \&hello,
},
);
En esta parte se crea una sesión. Una sesión puede ser pensada como una máquina de estados con múltiples estados, o como un manejador de múltiples eventos, las dos representaciones son equivalentes. Hablando de estados, el ejemplo anterior define dos estados en el parámetro inline_states que se pasa al constructor. Los estados cuyos nombres empiezan con guión bajo son predefinidos por POE, mientras que los otros son definidos por el usuario. La sesión entra al estado _start automáticamente después de ser construída.
Hablando en términos de eventos, decimos que nuestra sesión responde al evento _start y al evento hello y que POE envía un evento _start a la sesión tan pronto como ésta es creada.
Hay otros eventos predefinidos, la mayoría de ellos son para hacer relaciones padre-hijos y señales. Está el evento _stop que es enviado cuando la sesión debe finalizar. Veamos cómo se define un manejador de eventos:
sub start {
my ($kernel) = $_[KERNEL];
print "Setting up a session\n";
$kernel->yield("hello");
}
sub hello { print "Hola Mundo!\n"; }
Se le pasa a nuestro manejador start() un número de parámetros, uno de los cuales es un manejador del kernel POE. Se extrae éste de la lista de parámetros usando la constante KERNEL. En aras de la eficiencia, POE utiliza constantes para indexar la variable @_ en vez de un parámetro hash. A menudo se verán manejadores que comienzan con algo como esto:
my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
Esto es un ordinario array slice, con índices constantes. Retornan el kernel POE, la heap y el objeto sesión actual. La heap es un lugar donde la sesión puede almacenar su privado stuff. Volveremos a esto para ver qué tipo de stuff es bueno almacenar en la heap más tarde.
Ahora que tenemos el kernel, qué hacemos con él? Bien, le decimos que queremos pasar a otro estado, el estado hello:
$kernel->yield("hello");
Usamos yield() para enviar un evento a la sesión actual; si tuviéramos almacenada otra sesión, podríamos comunicarnos con ella enviándole un evento usando el método post(). Veremos un ejemplo más adelante.
Por ahora le dijimos al kernel POE que queremos pasar al estado hello. Pero esto no sucederá hasta que POE corra en su bucle de eventos. Una vez que corre en su bucle con $poe_kernel->run(), el kernel mira en su lista de tareas pendientes, encuentra que la primer cosa para hacer es pasar nuestra sesión al estado hello y dispara el manejador apropiado. Entonces se imprime el mensaje "Hola Mundo!".
Hola de nuevo, POE!
Supongamos que ahora queremos repetir el mensaje cada 5 segundos. Podríamos lograrlo: sub hello {
my ($kernel) = $_[KERNEL];
print "Hola Mundo!\n";
sleep 5;
$kernel->yield("hello");
}
Funciona, pero no es la manera de comportarse en un entorno cooperativo y multitarea. No podemos colgar el kernel entero por 5 segundos porque otras sesiones podrían tener cosas que hacer: por ejemplo en una red que necesita servicios, etc. En vez, permitimos que el kernel maneje el estado hello dentro de 5 segundos en el futuro. Para hacerlo usamos el método delay_set() del kernel:
sub hello {
my ($kernel) = $_[KERNEL];
print "Hola Mundo!\n";
$kernel->delay_set("hello", 5);
}
Nota mental: no usar sleep() dentro de POE porque pueden ocurrir situaciones indeseables.
Ahora seremos más amables. Veamos como podemos hacer con dos sesiones diferentes corriendo.
Se trata de un código ligeramente modificado del maravilloso tutorial de POE de Matt Sergeant:
use POE;
for my $session_no (1..2) {
POE::Session->create(
inline_states => {
hello => \&hello,
_start => sub { $_[KERNEL]->alias_set("session_" . $session_no) },
});
}
$poe_kernel->post("session_1", "hello", "session_2");
$poe_kernel->run( );
exit(0);
sub hello {
my ($kernel, $session, $next) = @_[KERNEL, SESSION, ARG0];
print "Event in ", $kernel->alias_list($session), "\n";
$kernel->post($next, "hello", $session->ID);
}
Ambas sesiones van ejecutando hello() en forma alternada, y para lograr esto una sesión le pide a la otra que se ejecute y viceversa.
Veamos con más detalle; creamos las sesiones en un bucle (en este caso dos) que tienen un manejador start() y un manejador para el evento hello(). Las sesiones comparten el código para sus dos manejadores, pero los argumentos que se les pasan serán distintos en cada caso.
Esta vez el manejador start() hace algo un poco diferente del script anterior. Le dice al kernel que registre un alias para la sesión actual. Cada sesión tiene un ID interno (que se usa más tarde en el script) pero que solamente conoce POE cuando crea las sesiones. Registrando un alias amigable para el programador nosotros obtenemos una manera para referirnos a la sesión más adelante. Esta vez start() no hace un yield().
Nuevamente para ser amigables con el programador, le pedimos al kernel cuál es el alias de nuestra sesión para mostrarla en un mensaje:
print "Event in ", $kernel->alias_list($session), "\n";
Ahora que hay más de una sesión necesitamos decirle al kernel cual de ellas comenzará la acción, por eso hacemos un post al evento hello() de la primera sesión, llamándola por su alias. El tercer parámetro es un argumento más del post, en este caso le pasamos el alias de la segunda sesión:
$poe_kernel->post("session_1", "hello", "session_2");
Cuando hacemos yield() o posteamos eventos, podemos pasar parámetros adicionales al evento, los cuales pasan al manejador del evento. Estos argumentos llegan en la variable @_ comenzando en la posición ARG0. Si tenemos muchos argumentos, podríamos escribir algo como esto para tomarlos a todos:
my ($kernel, $session, @args) = @_[KERNEL, SESSION, ARG0..$#_];
Pero aquí nos interesa solamente el primer argumento, que es el nombre de la próxima sesión a llamar. La sesión 1 le pasa el control a la sesión 2 y viceversa. Ahora que empieza a correr no necesitamos ser amigables con el programador, entonces identificamos a la sesión por su ID interno:
$kernel->post($next, "hello", $session->ID);
Está diciendo: "Yo te estoy llamando a tí ahora, y la próxima vez llamame a mí (por mi ID)".
Con estas dos sesiones corriendo, tenemos un entorno cooperativo y multitarea:
Event in session_1
Event in session_2
Event in session_1
Event in session_2
...
Sin embargo, si vamos a hacer algo interesante con este entorno, tenemos que comenzar a mirar qué nos trae POE para más complejas acciones de I/O.Wheels
Los Wheels son la fuerza de arrastre (ha, ha!) del sistema de I/O de POE. Un wheel es una conexión al mundo exterior que genera los eventos. Miremos los wheels como un equivalente de los manejadores de ficheros, pero son más que eso.El más simple wheel para entender es POE::Wheel::FollowTail, el cual sigue de cerca a un archivo que está creciendo. Se le pasa un nombre de archivo y el wheel genera eventos cuando el archivo tiene actualizaciones. Veamos un ejemplo cortito:
use POE qw(Wheel::FollowTail);
POE::Session->create(
inline_states => {
_start => sub {
my ($heap) = $_[HEAP];
my $log_watcher = POE::Wheel::FollowTail->new(
Filename => "my_log_file.txt",
InputEvent => "got_record",
);
$heap->{watcher} = $log_watcher;
},
got_record => sub { my $record = $_[ARG0]; print $record,"\n"; }
}
);
$poe_kernel->run( );
Primero, notemos la forma compacta de cargar múltiples módulos POE; cualquier parámetro pasado a use POE será interpretado como nombres de módulo bajo POE:: y serán usados.
Como antes, tenemos dos estados. El estado got_record es bonito y fácil de enteder: imprime su argumento. Miremos el estado _start con más detalle:
my $log_watcher = POE::Wheel::FollowTail->new(
Filename => "my_log_file.txt",
InputEvent => "got_record",
);
El trabajo del evento start es setear nuestro wheel. Le decimos que mire al archivo my_log_file.txt y que postee el evento got_record cada vez que vea una nueva línea.
Y qué hacemos con nuestro wheel? Nosotros queremos que el wheel persista por la duración de la sesión sino sería algo inútil, al ser un objeto ordinario de Perl será destruído al finalizar el bloque de ejecución actual sino lo almacenamos en algún lado. Por eso tener un área de almacenamiento por sesión es muy valioso, la heap:
my ($heap) = $_[HEAP];
. . .
$heap->{watcher} = $log_watcher;
Eso es todo lo que necesitamos; el wheel se sienta a mirar el archivo y va generando los eventos, nuestro manejador imprime las líneas que ha visto. Ahora agregaremos otro wheel en la ecuación.
Nota: el script anterior pierde el primer carácter de la línea aunque no sea un "\n"?
Supongamos por alguna razón que nuestro archivo de log es en realidad un log con datos binarios y queremos imprimir las líneas en exadecimal usando el comando hexdump.
Si no tienes el comando hexdump lo puedes imitar creando tu propio script en Perl, como el siguiente y llamarlo hexdump:
my $i = -16;
binmode(STDIN);
my $data; $|++;
printf "%07x ". ("%02x%02x "x8)."\n", $i+=16, map ord, split//,$data
while read STDIN, $data, 16;
El POE::Wheel::Run manejará I/O apoyado en programas externos. Podemos crear un wheel que llame a hexdump y enviarle los datos que queramos:
use POE qw(Wheel::FollowTail Wheel::Run);
POE::Session->create(
inline_states => {
_start => sub {
my ($heap) = $_[HEAP];
my $log_watcher = POE::Wheel::FollowTail->new(
Filename => "my_log_file.txt",
InputEvent => "redirect",
);
my $dumper = POE::Wheel::Run->new(
Program => "/usr/bin/hexdump",
StdoutEvent => "print"
);
$heap->{watcher} = $log_watcher;
$heap->{dumper} = $dumper;
},
redirect => sub {
my ($heap, $data) = @_[HEAP, ARG0];
$heap->{dumper}->put($data);
},
print => sub { my $record = $_[ARG0]; print $record, "\n"; }
}
);
$poe_kernel->run( );
Miremos lo que está sucediendo:
El wheel FollowTail obtiene sus datos y los envía a la sesión, la cual los envía al wheel Run, éste genera un evento print e imprime esos datos. Maravilloso.
Excepto que no funciona. Si ejecutamos el programa con el comando hexdump de Linux, todos nuestros datos desaparecen en el éter y nunca los verás de nuevo. Pero hay algo interesante: si usamos el hexdump echo en Perl, el programa trabaja bien. Cómo es posible?
La clave es el mágico $|++ de nuestra versión. El hexdump del sistema almacena toda la salida si siente que está conectado a un pipe. Como nuestro programa nunca termina, hexdump sólo se queda almacenando datos hasta que nosotros cortamos la ejecución, y entonces todo se pierde. Necesitamos que el comando hexdump piense que está conectado con un terminal real. Por supuesto, POE provee una manera para hacer esto:
my $dumper = POE::Wheel::Run->new(
Program => "/usr/bin/hexdump",
Conduit => "pty",
StdoutEvent => "print"
);
Hay otros wheels que pueden trabajar juntos como este: POE::Wheel::Curses lee datos usando una librería no bloqueante de interface Curses, mientras que POE::Wheel::ReadLine usa Term::ReadKey para implementar una interface de consola de entrada basada en líneas. POE::Wheel::ListenAccept es un socket de bajo nivel que se queda escuchando. Veremos a continuacón dos de los más importantes wheels en el próximo ejemplo: POE::Wheel::ReadWrite y POE::Wheel::SocketFactory.
Despachador de puertos
Ya sabes. Estás en el trabajo, detrás de un agresivo firewall que no te permite usar el IRC. Y tú no puedes trabajar sin tu IRC, así que realizas una tramolla algo sucia para conectarte. Seteas un reenvío de puerto para que cuando se intenta conectar al puerto 6667 en la máquina local, sea llevado al puerto 80 (el cual está permitido en tu firewall) en tu hosted box en el mundo real. Entonces otro despachador escuchará en el puerto 80 de esa máquina y enviará las conexiones a través del puerto 6667 al servidor IRC. Luego seteas el cliente IRC para que se conecte a localhost y voilà, estás conectado! Veamos como POE puede ayudarte a perder tu empleo.Comenzaremos seteando un servidor que escuche conexiones:
my $office = shift;
my ($local_address, $local_port, $remote_address, $remote_port);
($office ? $remote_address : $local_address) = "mybox.real-world.int";
($office ? $local_port : $remote_port) = 6667;
($office ? $remote_port : $local_port) = 80;
if ($office) {
$local_address = "127.0.0.1";
} else {
$remote_address = "irc.perl.org";
}
POE::Session->create(
inline_states => {
_start => \&server_start,
client_connected => \&client_connected,
on_server_error => \&server_error
},
args =>
[ $local_address, $local_port, $remote_address, $remote_port ]
);
$poe_kernel->run;
Una vez que sabemos que despachamos desde la oficina a la máquina hosted o desde la máquina hosted a la oficina, seteamos varias direcciones y puertos en la forma apropiada y creamos una nueva sesión con esos parámetros. Esta sesión iniciará todas las sesiones que necesitemos. Como estamos tratando con tres partes que se unen en este reenvío de puertos vamos a necesitar 3 sesiones y 3 wheels.
Omitiremos el manejo de errores para hacer más clara la explicación. Además en caso de error es poco lo que se puede hacer, más que ignorar y esperar que la próxima conexión sea exitosa.
El primer wheel aparece en el estado start() y tiene que escuchar por el puerto y dirección adecuados, utilizaremos el SocketFactory wheel:
sub server_start {
my ( $heap, $local_addr, $local_port, $remote_addr, $remote_port )
= @_[ HEAP, ARG0, ARG1, ARG2, ARG3 ];
# Store our parameters
$heap->{local_addr} = $local_addr;
$heap->{local_port} = $local_port;
$heap->{remote_addr} = $remote_addr;
$heap->{remote_port} = $remote_port;
# Create and store a wheel
$heap->{server_wheel} = POE::Wheel::SocketFactory->new
( BindAddress => $local_addr,
BindPort => $local_port,
Reuse => 'yes',
SuccessEvent => 'client_connected',
FailureEvent => "on_server_error",
);
}
Cuando el wheel SocketFactory acepta una conexión y postea un evento cliente_connected(), pasa el socket y el peer address y el puerto así:
sub client_connected {
my ( $heap, $socket, $peer_addr, $peer_port ) =
@_[ HEAP, ARG0, ARG1, ARG2];
}
Ahora tenemos un servidor que escucha y acpeta conexiones, pero que hacemos una vez que acepta una? Por lo común, una aplicación no POE, probablemente haría un fork o un hilo para servir el requerimiento y luego volvería a escuchar por nuevas conexiones. En cambio en POE, creamos una nueva sesión para manejar al cliente. Recuerden que hemos almacenado los parámetros de nuestra conexión en la heap de la primera sesión, entonces podemos pasárselos a la nueva sesión:
sub accept {
my ( $heap, $socket, $peer_addr, $peer_port ) =
@_[ HEAP, ARG0, ARG1, ARG2];
POE::Session->new
( _start => \&forwarder_start,
server_connect => \&connected_to_other_side,
client_input => \&forward_outbound,
server_input => \&forward_inbound,
[ $socket, $peer_addr, $peer_port,
$heap->{remote_addr}, $heap->{remote_port} ]
);
}
Cuando esta sesión inicia necesita configurar la conexión con el destino y estar listo para leer y escribir datos desde el cliente. Hacemos esto pasando al cliente $socket que recibimos para nuestro segundo wheel, POE::Wheel::ReadWrite, un wheel genérico de I/O de POE. Igual que en un entorno no POE, nosotros reusamos el socket que tenemos para manejar la conexión como lo hacemos con un filehandle para leer y escribir archivos.
Paremos un momento para mirar el siguiente diagrama:
Hasta ahora nos hemos ocupado del cliente que ha contactado con nosotros; también queremos otro wheel para conectar con el servidor en el otro extremo del túnel de reenvío:
sub forwarder_start {
my ( $heap, $session,
$socket, $peer_host, $peer_port, $remote_addr, $remote_port
) =
@_[ HEAP, SESSION, ARG0, ARG1, ARG2, ARG3, ARG4 ];
($heap->{peer_host}, $heap->{peer_port},
$heap->{remote_addr}, $heap->{remote_port})=
($peer_host, $peer_port, $remote_addr, $remote_port);
$heap->{wheel_client} = POE::Wheel::ReadWrite->new
( Handle => $socket,
Filter => POE::Filter::Stream->new,
InputEvent => 'client_input',
);
$heap->{wheel_server} = POE::Wheel::SocketFactory->new
( RemoteAddress => $remote_addr,
RemotePort => $remote_port,
SuccessEvent => 'server_connect',
);
}
Hay un pequeño detalle; desde que intentamos ser lo más asincrónicos posible, tenemos que tener en cuenta el caso en que aún está estableciéndose la conexión con el servidor, pero ya tenemos datos enviados por el cliente. Agregaremos una cola para almacenar cualquier dato que tengamos antes de establecer la conexión:
$heap->{state} = 'connecting';
$heap->{queue} = [ ];
Ahora veamos qué sucede cuando los datos llegan del cliente. Si aún esperamos la conexión, los ponemos en cola. En otro caso los enviamos a través del otro wheel al servidor:
sub forward_outbound {
my ( $heap, $input ) = @_[ HEAP, ARG0 ];
if ( $heap->{state} eq 'connecting' ) {
push @{ $heap->{queue} }, $input;
}
else {
$heap->{wheel_server}->put($input);
}
}
Una vez que hemos seteado la conexión con el otro lado, necesitamos hacer lo mismo de nuevo y colocar el socket en nuestro tercer wheel, otro ReadWrite wheel.
sub connected_to_other_side {
my ( $kernel, $session, $heap, $socket ) = @_[ KERNEL, SESSION,
HEAP, ARG0
];
$heap->{wheel_server} = POE::Wheel::ReadWrite->new
( Handle => $socket,
Driver => POE::Driver::SysRW->new,
Filter => POE::Filter::Stream->new,
InputEvent => 'server_input',
);
}
Ahora podemos desencolar la cola en caso de que tenga datos pendientes:
$heap->{state} = 'connected';
foreach my $pending ( @{ $heap->{queue} } ) {
$kernel->call( $session, 'client_input', $pending );
}
$heap->{queue} = [ ];
Por cada porción de datos que recibimos, posteamos los datos de regreso al evento client_input; sin embargo, esta vez no seguimos conectados, y el evento pasa los datos al servidor.
Finalmente, necesitamos mover los datos recibidos desde el servidor hacia el tunel al cliente, completando la función forward_inbound:
my ( $heap, $input ) = @_[ HEAP, ARG0 ];
$heap->{wheel_client}->put($input);
Miremos el diagrama final del despachador entero antes de pensar en cómo hacerlo más simple:
No hay comentarios:
Publicar un comentario