Skip to content

Instantly share code, notes, and snippets.

@Logioniz
Created January 28, 2017 01:51
Show Gist options
  • Save Logioniz/444a9f84a577c8a45f7314beaf5f261d to your computer and use it in GitHub Desktop.
Save Logioniz/444a9f84a577c8a45f7314beaf5f261d to your computer and use it in GitHub Desktop.
Example of an I/O loop (modeled on Mojo::IOLoop)
#!/usr/bin/perl
package Reactor::Select;
use Mojo::Base -base;
use IO::Select;

# Minimal reactor built on IO::Select.
#
# State lives under $self->{select}:
#   read / write - IO::Select sets of handles watched for each kind of event
#   fd           - hash keyed by the stringified handle, holding
#                  {cb => CODE, read => 1?, write => 1?}

# Create a reactor with empty read and write sets.
sub new {
    my $self = shift->SUPER::new;
    $self->{select}{read}  = IO::Select->new;
    $self->{select}{write} = IO::Select->new;
    return $self;
}

# add($fd, $read, $write, $cb)
# Register $fd for read and/or write events; $cb is later invoked as
# $reactor->$cb($fd, 'read' | 'write'). Does nothing when neither flag is true.
sub add {
    my ($self, $fd, $read, $write, $cb) = @_;
    return unless $read || $write;

    # Re-registering must not leave the handle in a set it no longer wants.
    $self->remove($fd) if exists $self->{select}{fd}{$fd};

    my $params = {cb => $cb};
    if ($read) {
        $self->{select}{read}->add($fd);
        $params->{read} = 1;
    }
    if ($write) {
        $self->{select}{write}->add($fd);
        $params->{write} = 1;
    }
    $self->{select}{fd}{$fd} = $params;
}

# remove($fd)
# Forget $fd entirely. Returns the removed registration (undef if unknown).
sub remove {
    my ($self, $fd) = @_;
    my $params = delete $self->{select}{fd}{$fd};
    if ($params) {
        $self->{select}{read}->remove($fd)  if $params->{read};
        $self->{select}{write}->remove($fd) if $params->{write};
    }
    return $params;
}

# watch($fd, $read, $write)
# Change which events a registered handle is watched for. Unknown handles are
# ignored: without a stored callback there would be nothing to dispatch to.
sub watch {
    my ($self, $fd, $read, $write) = @_;
    my $params = $self->{select}{fd}{$fd} or return $self;

    my %wanted = (read => $read, write => $write);
    for my $mode (qw/read write/) {
        if ($wanted{$mode} && !$params->{$mode}) {
            $self->{select}{$mode}->add($fd);
            $params->{$mode} = 1;
        }
        elsif (!$wanted{$mode} && $params->{$mode}) {
            $self->{select}{$mode}->remove($fd);
            delete $params->{$mode};
        }
    }
    return $self;
}

# wait($timeout)
# Block in select() for at most $timeout seconds, then run the callbacks of
# all ready handles: readable ones first, then writable ones.
sub wait {
    my ($self, $timeout) = @_;
    my $select = $self->{select};

    # IO::Select::select returns an empty list on timeout or error.
    my ($readable, $writable)
        = IO::Select::select($select->{read}, $select->{write}, undef, $timeout);

    for my $fh (@{$readable // []}) { $self->_dispatch($fh, 'read') }
    for my $fh (@{$writable // []}) { $self->_dispatch($fh, 'write') }
    return;
}

# Invoke the callback for one ready handle, unless an earlier callback in the
# same tick removed the handle or stopped watching this kind of event.
sub _dispatch {
    my ($self, $fh, $event) = @_;
    return unless exists $self->{select}{fd}{$fh};    # avoid autovivifying a stale entry
    my $params = $self->{select}{fd}{$fh};
    return unless $params->{$event};
    my $cb = $params->{cb} or return;
    $self->$cb($fh, $event);
}
package IOLoop;
use Mojo::Base -base;
use Time::HiRes 'time';
use List::Util 'min';

# Event loop combining a Reactor::Select (handle readiness) with one-shot timers.
#
# reactor - the Reactor::Select instance doing the actual select() call
# timers  - hash ref: timer id => {time => absolute due time, cb => CODE}
has 'reactor';
has 'timers';

# Build a loop with a fresh reactor and no timers.
# NOTE(review): $self->{delay} is not read anywhere in this file, and it forms
# a reference cycle with the loop (the Delay holds the loop in its ioloop
# attribute) - confirm whether it is still needed.
sub new {
    my $self = shift->SUPER::new;
    $self->reactor(Reactor::Select->new);
    $self->timers({});
    $self->{delay} = Delay->new(steps => [])->ioloop($self);
    return $self;
}

# add($fd, $read, $write, $cb)
# Register a handle with the reactor. $cb is invoked as
# $ioloop->$cb($fd, 'read' | 'write').
sub add {
    my $cb   = pop;
    my $self = shift;
    $self->reactor->add(@_, sub {
        shift;    # the reactor itself; callers get the loop instead
        $self->$cb(@_);
    });
    return $self;
}

# remove($fd) - stop watching a handle.
sub remove {
    my $self = shift;
    $self->reactor->remove(@_);
    return $self;
}

# watch($fd, $read, $write) - change the watched events of a handle.
sub watch {
    my $self = shift;
    $self->reactor->watch(@_);
    return $self;
}

# Run ticks until stop() clears the running flag.
sub wait {
    my $self = shift;
    ++$self->{running};
    $self->one_tick while $self->{running};
}

# Run a single loop iteration: wait for I/O until the earliest timer is due
# (or 0.5s when there are no timers), then fire every timer that is due.
sub one_tick {
    my $self   = shift;
    my $timers = $self->timers;

    my $next_due = min map { $_->{time} } values %$timers;
    my $timeout  = defined $next_due ? $next_due - time : 0.5;
    $timeout = 0 if $timeout < 0;
    $self->reactor->wait($timeout);

    # Fire due timers in chronological order (registration order on ties)
    # rather than in hash order. A timer due exactly now counts as due.
    my $now = time;
    my @due = sort { $timers->{$a}{time} <=> $timers->{$b}{time} || $a <=> $b }
        grep { $timers->{$_}{time} <= $now } keys %$timers;
    for my $timer_id (@due) {
        my $timer = delete $timers->{$timer_id} or next;
        my $cb = $timer->{cb};
        $self->$cb();
    }
}

# Start the loop unless it is already running; blocks until stop().
sub start {
    my $self = shift;
    $self->wait unless $self->{running};
}

# Ask the loop to exit after the current tick.
sub stop {
    delete shift->{running};
}

# timer($seconds, $cb)
# Schedule $cb to run once, as $ioloop->$cb(), about $seconds from now.
# Returns the stored timer record.
sub timer {
    my ($self, $seconds, $cb) = @_;

    # A per-loop sequence number is collision-free and lets one_tick keep
    # registration order between timers that share a due time.
    my $timer_id = ++$self->{timer_seq};
    $self->timers->{$timer_id} = {
        time => $seconds + time,
        cb   => $cb
    };
}

# next_tick($cb) - run $cb on the next loop iteration.
sub next_tick {
    shift->timer(0, @_);
}

# True while the loop is inside wait().
sub is_running {
    shift->{running};
}

# Process-wide shared loop, created on first use.
sub singleton {
    state $ioloop = shift->new(@_);
}
package EventEmitter;
use Mojo::Base -base;

# Tiny publish/subscribe base class. Subscribers are kept per event name in
# $self->{event}{$name} as an array ref of code refs, in subscription order.

# on($event, $cb) - alias for subscribe.
sub on {
    shift->subscribe(@_);
}

# subscribe($event, $cb)
# Append $cb to the subscribers of $event. Returns $self for chaining.
sub subscribe {
    my ($self, $event, $cb) = @_;
    push @{$self->{event}{$event}}, $cb;
    return $self;
}

# unsubscribe($event)      - remove every subscriber of $event
# unsubscribe($event, $cb) - remove only $cb
# Always returns $self, so the call can be chained like subscribe and emit.
sub unsubscribe {
    my ($self, $event, $cb) = @_;
    my $subscribers = $self->{event}{$event} or return $self;

    # Build a new list instead of editing in place, so an emit that is
    # currently iterating the old list is not disturbed.
    my @remaining = $cb ? grep { $_ ne $cb } @$subscribers : ();
    if (@remaining) {
        $self->{event}{$event} = \@remaining;
    }
    else {
        delete $self->{event}{$event};
    }
    return $self;
}

# emit($event, @args)
# Call every subscriber of $event as $self->$cb(@args), in subscription order.
# Returns $self.
sub emit {
    my ($self, $event) = (shift, shift);
    my $subscribers = $self->{event}{$event} or return $self;
    for my $cb (@$subscribers) {
        $self->$cb(@_);
    }
    return $self;
}
package Delay;
use Mojo::Base 'EventEmitter';

# Sequential flow control: a list of step callbacks that run one after the
# other. A step hands out callbacks with begin(); the next step starts once
# all of them have been called ('all' mode, the default) or as soon as the
# first one has been called ('race' mode). 'finish' is emitted once the last
# step has been started.
#
# Internal state:
#   steps        - remaining step callbacks (shifted off as they start)
#   steps_state  - per step: {count => pending operations, action => 'all' |
#                  'race', args => collected callback arguments}
#   current_step - index of the step whose operations are in flight
#   data         - free-form storage exposed through data()
has 'ioloop';

# Fresh bookkeeping record for one step.
sub _new_step_state {
    return {count => 0, action => 'all', args => []};
}

# new(steps => [CODE, ...], ...)
sub new {
    my $self = shift->SUPER::new(@_);
    $self->{steps_state} = [map { _new_step_state() } @{$self->{steps} // []}];
    @$self{qw/current_step data/} = (0, {});
    return $self;
}

# Register one pending operation for the current step and return the callback
# that marks it as done; the arguments given to that callback are forwarded
# to the next step.
sub begin {
    my $self         = shift;
    my $step_id      = $self->{current_step};
    my $operation_id = $self->{steps_state}[$step_id]{count}++;
    return sub {
        $self->_next($step_id, $operation_id, \@_);
    }
}

# Run the loop until 'finish' unless it is already running.
sub wait {
    my $self = shift;
    return $self if $self->ioloop->is_running;
    $self->on(finish => sub { $self->ioloop->stop });
    $self->ioloop->start;
    return $self;
}

# Schedule the first step on the next loop tick.
sub perform {
    my $self = shift;
    $self->ioloop->next_tick(sub { $self->_run_first });
    return $self;
}

# Delay->run(@steps)
# Build a delay on the singleton loop, schedule it, and wait for it.
sub run {
    my $self = shift->new(steps => \@_);
    $self->ioloop(IOLoop->singleton);
    $self->perform->wait;
}

# add_step($cb) - append another step.
sub add_step {
    my ($self, $cb) = @_;
    push @{$self->{steps}},       $cb;
    push @{$self->{steps_state}}, _new_step_state();
    return $self;
}

# data($key)         - read a stored value
# data($key, $value) - store a value, returns $self
sub data {
    my $self = shift;
    return $self->{data}{shift()} unless @_ > 1;
    $self->{data}{$_[0]} = $_[1];
    return $self;
}

# Switch the current step to 'race' mode: the first finished operation
# advances the delay and the next step receives ($operation_id, @its_args).
# NOTE(review): the stored timeout is read from $_[1] (i.e. the value of a
# "name => value" pair) and is not used anywhere in this package - confirm
# the intended call style before relying on it.
sub race {
    my $self = shift;
    @{$self->{steps_state}[$self->{current_step}]}{qw/action timeout/} = ('race', $_[1]);
    return $self;
}

# Emit 'finish' at most once.
sub _finish {
    my $self = shift;
    return $self if $self->{finished}++;
    return $self->emit('finish');
}

# Start the first step. With no steps there is nothing to call, and when the
# first step is also the last one 'finish' is emitted here, since _next
# (which normally does that) only runs after a step completes.
sub _run_first {
    my $self = shift;
    my $cb   = shift @{$self->{steps}};
    $self->$cb() if $cb;
    $self->_finish unless @{$self->{steps}};
}

# Called by a begin() callback when its operation has completed.
sub _next {
    my ($self, $step_id, $operation_id, $params) = @_;

    # Late completions from a step that was already left (e.g. race losers).
    return unless $step_id == $self->{current_step};

    my $state = $self->{steps_state}[$step_id];
    --$state->{count};
    if ($state->{action} eq 'all') {
        $state->{args}[$operation_id] = $params;
        return if $state->{count} > 0;
    }
    else {
        $state->{args}[0] = [$operation_id, @$params];
    }

    $self->{current_step}++;
    my @args = map { @{$_ // []} } @{delete $state->{args}};

    # The last step may itself have used begin(); when those operations
    # complete there is no further step to call.
    my $cb = shift @{$self->{steps}};
    $self->$cb(@args) if $cb;
    $self->_finish unless @{$self->{steps}};
}
package Stream;
use Mojo::Base 'EventEmitter';
use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK);

# Non-blocking buffered stream over a handle registered with an IOLoop.
#
# Events emitted: read ($bytes), write ($bytes_written), drain, error ($!),
# close.
has 'ioloop';
has 'handle';

# new(ioloop => $loop, handle => $fh)
# Registers the handle for read and write events right away. The reactor
# callback holds a reference to the stream, so it stays alive until close().
sub new {
    my $self = shift->SUPER::new(@_);
    $self->{ioloop}->add($self->{handle}, 1, 1, sub { pop() eq 'write' ? $self->_write : $self->_read });
    $self->{buffer} = '';
    return $self;
}

# Stop watching the handle, close it, and emit 'close'. Calling it again on
# an already closed stream is a no-op (an 'error' handler may close the
# stream before the internal error path does).
sub close {
    my $self   = shift;
    my $handle = delete $self->{handle} or return $self;

    # Unregister while the handle still has a file descriptor; some
    # IO::Select versions locate the handle by fileno.
    $self->ioloop->remove($handle);
    $handle->close;
    $self->emit('close');
}

# write($chunk, $cb?)
# Queue $chunk for sending. $cb, if given, is subscribed to the next 'drain'
# (emitted once the buffer has been written out completely).
sub write {
    my ($self, $chunk, $cb) = @_;
    $self->{buffer} .= $chunk // '';
    if ($cb) {
        $self->on(drain => $cb);
    }
    elsif (!length $self->{buffer}) {
        return $self;
    }
    $self->ioloop->watch($self->{handle}, 1, 1) if $self->{handle};
    return $self;
}

# Handle is writable: flush as much of the buffer as the socket accepts.
sub _write {
    my $self   = shift;
    my $handle = $self->{handle} or return;

    if (length $self->{buffer}) {
        my $written = $handle->syswrite($self->{buffer});
        unless (defined $written) {

            # Retry
            return if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;

            # Closed (maybe real error); same policy as _read
            return $! == ECONNRESET ? $self->close : $self->emit(error => $!)->close;
        }
        $self->emit(write => substr($self->{buffer}, 0, $written, ''));
    }

    unless (length $self->{buffer}) {

        # Drain callbacks are one-shot. Detach the current list (stored by
        # EventEmitter under {event}{drain}) before calling it, so a callback
        # that queues more data with its own drain callback neither fires
        # that new callback prematurely nor loses it.
        my $drain = delete $self->{event}{drain};
        for my $cb (@{$drain // []}) {
            $self->$cb();
        }
    }

    return if length $self->{buffer};
    $self->ioloop->watch($handle, 1, 0) if $self->{handle};
}

# Handle is readable: read up to 100_000 bytes and emit them, or close on
# end-of-file / error.
sub _read {
    my $self   = shift;
    my $handle = $self->{handle} or return;
    my $read   = $handle->sysread(my $buffer, 100_000, 0);
    return $read == 0 ? $self->close : $self->emit(read => $buffer) if defined $read;

    # Retry
    return if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;

    # Closed (maybe real error)
    $! == ECONNRESET ? $self->close : $self->emit(error => $!)->close;
}
package Server;
use Mojo::Base 'EventEmitter';
use IO::Socket::IP;

# Non-blocking TCP listener. Emits accept ($client_handle) for every
# accepted connection.
has 'ioloop';
has 'handle';

# listen(%options)
# Create the listening socket; %options are passed to IO::Socket::IP->new
# unchanged. Dies with the constructor's error message when the socket
# cannot be created (for example when the port is already in use).
sub listen {
    my ($self, %options) = @_;
    my $handle = IO::Socket::IP->new(%options)
        or die "Can't create listening socket: $@";
    $handle->blocking(0);
    $self->handle($handle);
    return $self;
}

# Start accepting: watch the listening socket for readability.
sub start {
    my $self = shift;
    $self->ioloop->add($self->handle, 1, 0, sub { $self->_accept(@_) });
    return $self;
}

# Stop accepting new connections (the socket itself stays open).
sub stop {
    my $self = shift;
    $self->ioloop->remove($self->handle);
    return $self;
}

# Loop callback, invoked with ($ioloop, $listening_handle, $event).
# Accepts one pending connection, makes it non-blocking and emits 'accept'.
sub _accept {
    my ($self, undef, $server_fd) = @_;
    my $client_fd = $server_fd->accept or return;
    $client_fd->blocking(0);
    $self->emit(accept => $client_fd);
}
package Client;
use Mojo::Base 'EventEmitter';
use Errno 'EINPROGRESS';
use IO::Socket::IP;

# Non-blocking TCP connector. Emits connect ($handle) once the connection is
# established, or error ($reason) when it fails.
has 'ioloop';
has 'handle';

# connect(%options)
# Start a non-blocking connect; %options are passed to IO::Socket::IP->new.
# When the socket cannot even be created, 'error' is emitted right away.
sub connect {
    my ($self, %options) = @_;
    my $handle = IO::Socket::IP->new(%options, Blocking => 0);
    return $self->emit(error => $@ || 'Cannot create socket') unless $handle;

    $self->handle($handle);
    $handle->blocking(0);

    # A pending connect is signalled through write readiness.
    $self->ioloop->add($handle, 0, 1, sub { $self->_connect(@_) });
    return $self;
}

# Loop callback, invoked with ($ioloop, $handle, $event) when the connecting
# socket becomes ready.
sub _connect {
    my ($self, undef, $server_fd) = @_;

    # need to re-add $server_handle
    if ($server_fd->isa('IO::Socket::IP') && !$server_fd->connect) {
        my $error = $!;    # copy before remove() can clobber $!

        # Always unregister: on failure the socket would stay ready and this
        # callback would fire again on every tick.
        $self->ioloop->remove($server_fd);
        return $self->emit(error => $error) unless $error == EINPROGRESS;

        # Still connecting: keep waiting for write readiness, which is how
        # completion of a non-blocking connect is reported.
        $self->ioloop->add($server_fd, 0, 1, sub { $self->_connect(@_) });
        return;
    }

    unless ($server_fd->connected) {
        my $error = $! || 'Not connected';
        $self->ioloop->remove($server_fd);
        return $self->emit(error => $error);
    }

    # Connected: hand the socket over; the receiver registers it again
    # with its own callback.
    $self->ioloop->remove($server_fd);
    $self->emit(connect => $server_fd);
}
package main;
use Mojo::Base -strict;
use IO::Socket::IP;
use DDP;
# Shared event loop used by the server() and client() examples below.
my $ioloop = IOLoop->singleton;
# Partial request data per connection, keyed by the (stringified) client
# handle; becomes a hash ref on first use in server().
my $clients_data;
# Disabled SIGINT handler (warns on each SIGINT, dies on the third).
# my $sigint_count = 0;
# local $SIG{INT} = sub { warn "sigint $$"; die "sigint" if ++$sigint_count == 3; };
# Demo HTTP-ish server on port 8080. For every request (detected by the blank
# line that ends the header) it runs a three-step Delay that exercises
# timers, nested delays and race(), and writes a fixed response in the second
# step. Starts the shared loop, so the call does not return while the loop is
# running.
sub server {
    Server->new(ioloop => $ioloop)
        ->listen(Listen => 1, LocalPort => 8080, ReuseAddr => 1)
        ->on(accept => sub {
            my ($server, $client_fd) = @_;
            warn "ACCEPT\n\n";
            Stream
                ->new(ioloop => $server->ioloop, handle => $client_fd)
                ->on(read => sub {
                    my ($stream, $data) = @_;
                    warn "Server READ: $client_fd\n\n";

                    # Look for the end of the header in everything received
                    # so far, not only in the latest chunk: the terminator
                    # may be split across two reads.
                    my $client_data_all = ($clients_data->{$client_fd} // '') . $data;
                    if ($client_data_all !~ m/\r?\n\r?\n/m) {
                        $clients_data->{$client_fd} = $client_data_all;
                        return;
                    }
                    delete $clients_data->{$client_fd};
                    warn "$client_data_all\n\n";

                    my $time;
                    Delay->run(
                        sub {
                            $time = time;
                            my $d = shift;
                            $ioloop->timer(1 => $d->begin);
                            $ioloop->timer(2 => $d->begin);
                            f_4_sec($d->begin);
                        }, sub {
                            warn 'Time in seconds from 1 -> 2: ', time - $time, "\n\n";
                            p @_;
                            my $d = shift;

                            # HTTP header lines end in CRLF; the 13 body
                            # bytes are unchanged.
                            $stream->write("HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, world\n" => sub {});
                            $time = time;
                            $ioloop->timer(3 => $d->begin);
                            f_1_sec($d->begin);
                            $d->race;
                        }, sub {
                            warn 'Time in seconds from 2 -> 3: ', time - $time, "\n\n";
                        }
                    )
                    ->on(finish => sub {
                        warn 'Delay FINISH';
                    });
                })
                ->on(write => sub {
                    warn "Server WRITE\n\n";
                })
                ->on(close => sub {
                    warn "Client close socket: $client_fd\n\n";

                    # Drop any partial request buffered for this connection.
                    delete $clients_data->{$client_fd};
                })
        })
        ->start
        ->ioloop->start;

    # Earlier variant that talks to the loop directly instead of using
    # Stream; kept for reference.
    # Server->new(ioloop => $ioloop)
    # ->listen(Listen => 1, LocalPort => 8080, ReuseAddr => 1)
    # ->on(accept => sub {
    # my ($server, $client_fd) = @_;
    # warn "ACCEPT\n\n";
    # $server->ioloop->add($client_fd, 1, 0, sub {
    # my ($ioloop, $client_fd, $method) = @_;
    # $client_fd->recv(my $data, 1000);
    # if (length $data == 0) {
    # warn "Client close socket: $client_fd\n\n";
    # $ioloop->remove($client_fd);
    # delete $clients_data->{$client_fd};
    # return;
    # }
    # warn "Server READ: $client_fd\n\n";
    # if ($data !~ m/\r?\n\r?\n/m) {
    # $clients_data->{$client_fd} = ($clients_data->{$client_fd} // '') . $data;
    # return;
    # }
    # my $client_data_all = ($clients_data->{$client_fd} // '') . $data;
    # delete $clients_data->{$client_fd};
    # warn "$client_data_all\n\n";
    # my $time;
    # Delay->run(
    # sub {
    # $time = time;
    # my $d = shift;
    # $ioloop->timer(1 => $d->begin);
    # $ioloop->timer(2 => $d->begin);
    # f_4_sec($d->begin);
    # }, sub {
    # warn 'Time in seconds from 1 -> 2: ', time - $time, "\n\n";
    # p @_;
    # my $d = shift;
    # $client_fd->send("HTTP/1.1 200 OK\nContent-Length: 13\n\nHello, world\n");
    # $time = time;
    # $ioloop->timer(3 => $d->begin);
    # f_1_sec($d->begin);
    # $d->race;
    # }, sub {
    # warn 'Time in seconds from 2 -> 3: ', time - $time, "\n\n";
    # }
    # );
    # # $client_fd->close;
    # # $ioloop->remove($client_fd);
    # });
    # })
    # ->start
    # ->ioloop->start;
}
# Demo client: connects to 127.0.0.1:12345, sends one POST request with a
# 10,000 byte body through a Stream and logs read/write/drain events. Starts
# the shared loop, so the call does not return while the loop is running.
sub client {
    # Earlier variant that talks to the loop directly instead of using
    # Stream; kept for reference.
    # Client->new(ioloop => $ioloop)
    # ->on(connect => sub {
    # my ($client, $server_fd) = @_;
    # warn "CONNECT\n\n";
    # $server_fd->send("GET / HTTP/1.0\r\nHost: www.google.ru\r\n\r\n");
    # $client->ioloop->add($server_fd, 1, 0, sub {
    # my ($ioloop, $server_fd, $method) = @_;
    # $server_fd->recv(my $data, 1000);
    # if (length $data == 0) {
    # warn "Server close socket: $server_fd\n\n";
    # $ioloop->remove($server_fd);
    # return;
    # }
    # warn "Client READ: $server_fd\n\n";
    # warn "$data\n\n";
    # });
    # })
    # ->on(error => sub {
    # p @_;
    # })
    # ->connect(PeerHost => 'www.google.ru', PeerPort => '80')
    # ->ioloop->start;

    # Derive Content-Length from the body actually sent, so the header
    # cannot disagree with the payload.
    my $body    = '1' x 10_000;
    my $request = "POST / HTTP/1.0\r\nHost: www.google.ru\r\nContent-Length: "
        . length($body) . "\r\n\r\n" . $body;

    Client->new(ioloop => $ioloop)
        ->on(connect => sub {
            my ($client, $server_fd) = @_;
            warn "CONNECT\n\n";
            my $stream = Stream
                ->new(ioloop => $client->ioloop, handle => $server_fd)
                ->on(read => sub {
                    warn "CLIENT READ\n\n";
                    warn $_[1] . "\n\n";
                })
                ->on(write => sub {
                    warn "CLIENT WRITE\n\n";
                })
                ->write($request => sub {
                    warn "CLIENT DRAIN\n\n";
                });
            #$ioloop->{stream} = $stream;
        })
        ->on(error => sub {
            p @_;
        })
        ->connect(PeerHost => '127.0.0.1', PeerPort => '12345')
        #->connect(PeerHost => 'www.google.ru', PeerPort => '80')
        ->ioloop->start;
}
# f_4_sec($cb)
# Asynchronous helper built from a nested Delay: a 3 second timer, then a
# race between a 1 second and a 3 second timer, then $cb is called with the
# value stored under 'param' and the string '123'.
sub f_4_sec {
    my ($on_done) = @_;

    my @steps = (

        # Step 1: wait three seconds and stash a value for the last step.
        sub {
            my ($delay) = @_;
            IOLoop->singleton->timer(3 => $delay->begin);
            $delay->data(param => 'my params');
        },

        # Step 2: two timers, the first one to fire moves the delay on.
        sub {
            my ($delay) = @_;
            IOLoop->singleton->timer(1 => $delay->begin);
            IOLoop->singleton->timer(3 => $delay->begin);
            $delay->race;
        },

        # Step 3: report back to the caller.
        sub {
            my ($delay) = @_;
            $on_done->($delay->data('param'), '123');
        },
    );

    Delay->run(@steps);
}
# f_1_sec($cb)
# Asynchronous helper built from a nested Delay: waits for a 1 second timer,
# then calls $cb with ('my params', 343).
sub f_1_sec {
    my ($on_done) = @_;

    my $wait_one_second = sub {
        my ($delay) = @_;
        IOLoop->singleton->timer(1 => $delay->begin);
    };
    my $report = sub {
        $on_done->('my params', 343);
    };

    Delay->run($wait_one_second, $report);
}
# Disabled scratch code: a plain IO::Socket::IP request that does not go
# through the event loop.
# my $handle = IO::Socket::IP->new(PeerHost => 'www.google.ru', PeerPort => '80');
# warn 111;
# $handle->connect();
# warn 222;
# $handle->send("GET / HTTP/1.0\r\nHost: www.google.ru\r\n\r\n");
# warn 333;
# $handle->recv(my $data, 1000);
# warn $data;
# Entry point: the client example is disabled and the server example runs.
# server() ends by starting the event loop, which keeps ticking until
# IOLoop::stop is called.
#client();
server();
__END__
History
1) Create reactor (new, add, wait), Create ioloop (new, add, wait, start, stop)
2) Add reactor remove, Add ioloop remove
3) Add ioloop timer
4) Add ioloop next_tick, is_running, singleton, Add Delay
5) Add server
6) Add client
7) Add Delay
8) Add EventEmitter
9) Add stream
10) Add one_tick for IOLoop
TODO
1) Add timeout support to the race method in Delay
2) SSL support
3) Check for memory leaks (I have NO weaken in my code :)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment