/[meteor]/googlecode.com/svn/trunk/Meteor/Connection.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /googlecode.com/svn/trunk/Meteor/Connection.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 9 by andrew.betts, Fri Dec 8 16:52:58 2006 UTC revision 64 by andrew.betts, Mon Jan 19 11:19:41 2009 UTC
# Line 1  Line 1 
 #!/usr/bin/perl -w  
 ###############################################################################  
 #   Meteor  
 #   An HTTP server for the 2.0 web  
 #   Copyright (c) 2006 contributing authors  
 #  
 #   Subscriber.pm  
 #  
 #       Description:  
 #       Common super-class for controller and subscriber  
 #  
 ###############################################################################  
 #  
 #   This program is free software; you can redistribute it and/or modify it  
 #   under the terms of the GNU General Public License as published by the Free  
 #   Software Foundation; either version 2 of the License, or (at your option)  
 #   any later version.  
 #  
 #   This program is distributed in the hope that it will be useful, but WITHOUT  
 #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or  
 #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for  
 #   more details.  
 #  
 #   You should have received a copy of the GNU General Public License along  
 #   with this program; if not, write to the Free Software Foundation, Inc.,  
 #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA  
 #  
 #   For more information visit www.meteorserver.org  
 #  
 ###############################################################################  
   
 package Meteor::Connection;  
 ###############################################################################  
 # Configuration  
 ###############################################################################  
           
         use strict;  
           
         use Errno qw(EAGAIN);  
           
         our $MAX_READ_SIZE=8192;  
         our $CONNECTION_WRITE_TIMEOUT=120;  
           
         our @Connections=();  
   
 ###############################################################################  
 # Class methods  
 ###############################################################################  
 sub addAllHandleBits {  
         my $class=shift;  
           
         my $rVecRef=shift;  
         my $wVecRef=shift;  
         my $eVecRef=shift;  
           
         map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef)} @Connections;  
 }  
   
 sub checkAllHandleBits {  
         my $class=shift;  
           
         my $rVec=shift;  
         my $wVec=shift;  
         my $eVec=shift;  
           
         map {$_->checkHandleBits($rVec,$wVec,$eVec)} @Connections;  
 }  
   
 sub connectionCount {  
         scalar(@Connections);  
 }  
   
 sub closeAllConnections {  
         my @cons=@Connections;  
           
         map { $_->close(); } @cons;  
 }  
   
 ###############################################################################  
 # Factory methods  
 ###############################################################################  
 sub new {  
         #  
         # Create a new empty instance  
         #  
         my $class=shift;  
           
         my $obj={};  
           
         bless($obj,$class);  
 }  
   
 sub newFromServer {  
         #  
         # new instance from new server connection  
         #  
         my $self=shift->new();  
           
         my $server=shift;  
         my $socket=$server->conSocket();  
           
         $self->{'socket'}=$socket;        
         $self->{'socketFN'}=$socket->fileno();  
           
         $socket->setNonBlocking();  
           
         $self->{'writeBuffer'}='';  
         $self->{'readBuffer'}='';  
           
         push(@Connections,$self);  
           
         &::syslog('debug',"New %s for %s",ref($self),$socket->{'connection'}->{'remoteIP'});  
           
         $self;  
 }  
   
 ###############################################################################  
 # Instance methods  
 ###############################################################################  
 sub write {  
         my $self=shift;  
           
         $self->{'writeBuffer'}.=shift;  
         $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'}));  
 }  
   
 sub addHandleBits {  
         my $self=shift;  
           
         my $rVecRef=shift;  
         my $wVecRef=shift;  
         my $eVecRef=shift;  
           
         my $fno=$self->{'socketFN'};  
           
         if($self->{'writeBuffer'} ne '')  
         {  
                 if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT<time)  
                 {  
                         &::syslog('debug',"%s for %s: write timed out",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});  
                           
                         $self->{'writeBuffer'}='';  
                         $self->close();  
                         return;  
                 }  
                 vec($$wVecRef,$fno,1)=1;  
         }  
   
         vec($$rVecRef,$fno,1)=1;  
         vec($$eVecRef,$fno,1)=1;  
 }  
   
 sub checkHandleBits {  
         my $self=shift;  
           
         my $rVec=shift;  
         my $wVec=shift;  
         my $eVec=shift;  
           
         my $fno=$self->{'socketFN'};  
           
         if(vec($eVec,$fno,1))  
         {  
                 #  
                 # Something went wrong!  
                 #  
                 $self->exceptionReceived();  
                   
                 return;  
         }  
           
         if(vec($rVec,$fno,1))  
         {  
                 #  
                 # Data available for read  
                 #  
                 my $socket=$self->{'socket'};  
                   
                 my $buffer='';  
                 my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE);  
                 if(defined($bytesRead) && $bytesRead>0)  
                 {  
                         $self->{'readBuffer'}.=$buffer;  
                         while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//)  
                         {  
                                 $self->processLine($1);  
                         }  
                 }  
                 elsif(defined($bytesRead) && $bytesRead==0)  
                 {  
                         # Connection closed  
                         $self->{'remoteClosed'}=1;  
                         $self->close();  
                           
                         return;  
                 }  
                 else  
                 {  
                         unless(${!}==EAGAIN)  
                         {  
                                 &::syslog('notice',"Connection closed: $!");  
                                 $self->{'remoteClosed'}=1;  
                                 $self->close();  
                                   
                                 return;  
                         }  
                 }  
         }  
           
         if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '')  
         {  
                 #  
                 # Can write  
                 #  
                 my $socket=$self->{'socket'};  
                   
                 my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'});  
                   
                 if(defined($bytesWritten) && $bytesWritten>0)  
                 {  
                         $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten);  
                         if(length($self->{'writeBuffer'})==0)  
                         {  
                                 delete($self->{'writeBufferTimestamp'});  
                                 $self->close() if(exists($self->{'autoClose'}));  
                         }  
                         else  
                         {  
                                 $self->{'writeBufferTimestamp'}=time;  
                         }  
                 }  
                 else  
                 {  
                         unless(${!}==EAGAIN)  
                         {  
                                 &::syslog('notice',"Connection closed: $!");  
                                 $self->{'remoteClosed'}=1;  
                                 $self->close();  
                                   
                                 return;  
                         }  
                 }  
         }  
 }  
   
 sub exceptionReceived {  
         my $self=shift;  
           
         $self->{'writeBuffer'}='';  
           
         $self->close();  
 }  
   
 sub close {  
         my $self=shift;  
           
         #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});  
           
         unless($self->{'remoteClosed'})  
         {  
                 if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0)  
                 {  
                         $self->{'autoClose'}=1;  
                   
                         &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});  
                   
                         return;  
                 }  
         }  
           
         eval {  
                 $self->{'socket'}->close();  
         };  
           
         #  
         # Remove connection from list of connections  
         #  
         my $idx=undef;  
         for(my $i=0;$i<scalar(@Connections);$i++)  
         {  
                 if($Connections[$i]==$self)  
                 {  
                         $idx=$i;  
                         last;  
                 }  
         }  
           
         if(defined($idx))  
         {  
                 splice(@Connections,$idx,1);  
         }  
           
         &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});  
 }  
   
 1;  
 ############################################################################EOF  
1    #!/usr/bin/perl -w
2    ###############################################################################
3    #   Meteor
4    #   An HTTP server for the 2.0 web
5    #   Copyright (c) 2006 contributing authors
6    #
7    #   Subscriber.pm
8    #
9    #       Description:
10    #       Common super-class for controller and subscriber
11    #
12    ###############################################################################
13    #
14    #   This program is free software; you can redistribute it and/or modify it
15    #   under the terms of the GNU General Public License as published by the Free
16    #   Software Foundation; either version 2 of the License, or (at your option)
17    #   any later version.
18    #
19    #   This program is distributed in the hope that it will be useful, but WITHOUT
20    #   ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
21    #   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
22    #   more details.
23    #
24    #   You should have received a copy of the GNU General Public License along
25    #   with this program; if not, write to the Free Software Foundation, Inc.,
26    #   59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27    #
28    #   For more information visit www.meteorserver.org
29    #
30    ###############################################################################
31    
32    package Meteor::Connection;
33    ###############################################################################
34    # Configuration
35    ###############################################################################
36            
37            use strict;
38            
39            use Errno qw(EAGAIN);
40            
41            our $MAX_READ_SIZE=8192;
42            our $CONNECTION_WRITE_TIMEOUT=120;
43            
44            our @Connections=();
45    
46    ###############################################################################
47    # Class methods
48    ###############################################################################
49    sub addAllHandleBits {
50            my $class=shift;
51            
52            my $rVecRef=shift;
53            my $wVecRef=shift;
54            my $eVecRef=shift;
55            my @cons=@Connections;
56            map {$_->addHandleBits($rVecRef,$wVecRef,$eVecRef) if(defined($_)) } @cons;
57    }
58    
59    sub checkAllHandleBits {
60            my $class=shift;
61            
62            my $rVec=shift;
63            my $wVec=shift;
64            my $eVec=shift;
65            
66            my @cons=@Connections;
67            map {$_->checkHandleBits($rVec,$wVec,$eVec) if(defined($_)) } @cons;
68    }
69    
70    sub connectionCount {
71            scalar(@Connections);
72    }
73    
74    sub closeAllConnections {
75            my @cons=@Connections;
76            
77            map { $_->close(); } @cons;
78    }
79    
80    sub listConnections {
81            my $class=shift;
82            my $list='';
83            foreach my $conn (@Connections) {
84                    $list .= $conn->{'socketFN'}.' '.$conn->{'ip'}.' '.$conn->{'type'};
85                    if (exists($conn->{'subscriberID'})) {
86                            $list .= ' '.$conn->{'subscriberID'};
87                    }
88                    $list .= "$::CRLF";
89            }
90            $list;
91    }
92    
93    sub describeConnWithFileNum {
94            my $class=shift;
95            my $filenum=shift;
96            foreach my $conn (@Connections) {
97                    if ($conn->{'socketFN'}==$filenum) {
98                            my $ret = "";
99                            if (exists($conn->{'socketFN'})) {
100                                    $ret .= "socketFN: ".$conn->{'socketFN'}."$::CRLF";
101                            }
102                            if (exists($conn->{'connectionStart'})) {
103                                    $ret .= "connectionStart: ".$conn->{'connectionStart'}."$::CRLF";
104                            }
105                            if (exists($conn->{'writeBuffer'})) {
106                                    $ret .= "writeBuffer: ".$conn->{'writeBuffer'}."$::CRLF";
107                            }
108                            if (exists($conn->{'readBuffer'})) {
109                                    $ret .= "readBuffer: ".$conn->{'readBuffer'}."$::CRLF";
110                            }
111                            if (exists($conn->{'bytesWritten'})) {
112                                    $ret .= "bytesWritten: ".$conn->{'bytesWritten'}."$::CRLF";
113                            }
114                            if (exists($conn->{'type'})) {
115                                    $ret .= "type: ".$conn->{'type'}."$::CRLF";
116                            }
117                            if (exists($conn->{'mode'})) {
118                                    $ret .= "mode: ".$conn->{'mode'}."$::CRLF";
119                            }
120                            if (exists($conn->{'ip'})) {
121                                    $ret .= "ip: ".$conn->{'ip'}."$::CRLF";
122                            }
123                            if (exists($conn->{'headerBuffer'})) {
124                                    $ret .= "headerBuffer: ".$conn->{'headerBuffer'}."$::CRLF";
125                            }
126                            if (exists($conn->{'messageCount'})) {
127                                    $ret .= "messageCount: ".$conn->{'messageCount'}."$::CRLF";
128                            }
129                            if (exists($conn->{'connectionStart'})) {
130                                    $ret .= "age: ".(time-$conn->{'connectionStart'})."$::CRLF";
131                            }
132                            if (exists($conn->{'connectionTimeLimit'})) {
133                                    $ret .= "connectionTimeLimit: ".$conn->{'connectionTimeLimit'}."$::CRLF";
134                            }
135                            if (exists($conn->{'useragent'})) {
136                                    $ret .= "useragent: ".$conn->{'useragent'}."$::CRLF";
137                            }
138                            if (exists($conn->{'subscriberID'})) {
139                                    $ret .= "subscriberID: ".$conn->{'subscriberID'}."$::CRLF";
140                            }
141                            return $ret;
142                    }
143            }
144            return -1;
145    }
146    
147    sub destroyBadRequests {
148            foreach my $conn (@Connections) {
149                    if (time-$conn->{'connectionStart'} > 30 && !$conn->{'autoClose'} && !exists($conn->{'subscriberID'}) && $conn->{'type'} eq 'Meteor::Subscriber') {
150                            &::syslog('debug',"Closing misbehaving subscriber %s",$conn->{'socketFN'});
151                            $conn->close();
152                    }
153            }
154    }
155    
156    ###############################################################################
157    # Factory methods
158    ###############################################################################
159    sub new {
160            #
161            # Create a new empty instance
162            #
163            my $class=shift;
164            
165            my $obj={};
166            
167            bless($obj,$class);
168    }
169    
170    sub newFromServer {
171            #
172            # new instance from new server connection
173            #
174            my $self=shift->new();
175            
176            $::Statistics->{'total_requests'}++;
177            
178            my $server=shift;
179            my $socket=$server->conSocket();
180            
181            $self->{'socket'}=$socket;      
182            $self->{'socketFN'}=$socket->fileno();
183            $self->{'connectionStart'}=time;
184            
185            $socket->setNonBlocking();
186            
187            $self->{'writeBuffer'}='';
188            $self->{'readBuffer'}='';
189            $self->{'bytesWritten'}=0;
190            $self->{'type'}=ref($self);
191            $self->{'ip'}=$socket->{'connection'}->{'remoteIP'};
192            
193            push(@Connections,$self);
194            
195            &::syslog('debug',"New %s for %s using file number %s",ref($self),$self->{'ip'},$self->{'socketFN'});
196            
197            $self;
198    }
199    
200    ###############################################################################
201    # Instance methods
202    ###############################################################################
203    sub write {
204            my $self=shift;
205            
206            $self->{'writeBuffer'}.=shift;
207            $self->{'writeBufferTimestamp'}=time unless(exists($self->{'writeBufferTimestamp'}));
208    }
209    
210    sub addHandleBits {
211            my $self=shift;
212            
213            my $rVecRef=shift;
214            my $wVecRef=shift;
215            my $eVecRef=shift;
216            
217            my $fno=$self->{'socketFN'};
218            
219            if($self->{'writeBuffer'} ne '')
220            {
221                    if(exists($self->{'writeBufferTimestamp'}) && $self->{'writeBufferTimestamp'}+$CONNECTION_WRITE_TIMEOUT<time)
222                    {
223                            &::syslog('debug',"%s for %s: write timed out",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
224                            
225                            $self->{'writeBuffer'}='';
226                            $self->close();
227                            return;
228                    }
229                    vec($$wVecRef,$fno,1)=1;
230            }
231    
232            vec($$rVecRef,$fno,1)=1;
233            vec($$eVecRef,$fno,1)=1;
234    }
235    
236    sub checkHandleBits {
237            my $self=shift;
238            
239            my $rVec=shift;
240            my $wVec=shift;
241            my $eVec=shift;
242            
243            my $fno=$self->{'socketFN'};
244            
245            if(vec($eVec,$fno,1))
246            {
247                    #
248                    # Something went wrong!
249                    #
250                    $self->exceptionReceived();
251                    
252                    return;
253            }
254            
255            if(vec($rVec,$fno,1))
256            {
257                    #
258                    # Data available for read
259                    #
260                    my $socket=$self->{'socket'};
261                    
262                    my $buffer='';
263                    my $bytesRead=sysread($socket->{'handle'},$buffer,$MAX_READ_SIZE);
264                    if(defined($bytesRead) && $bytesRead>0)
265                    {
266                            $::Statistics->{'total_inbound_bytes'}+=$bytesRead;
267                            $self->{'readBuffer'}.=$buffer;
268                            while($self->{'readBuffer'}=~s/^([^\r\n]*)\r?\n//)
269                            {
270                                    $self->processLine($1);
271                            }
272                    }
273                    elsif(defined($bytesRead) && $bytesRead==0)
274                    {
275                            # Connection closed
276                            $self->{'remoteClosed'}=1;
277                            $self->close(1, 'remoteClosed');
278                            
279                            return;
280                    }
281                    else
282                    {
283                            unless(${!}==EAGAIN)
284                            {
285                                    &::syslog('notice',"Connection closed: $!");
286                                    $self->{'remoteClosed'}=1;
287                                    $self->close(1, 'remoteClosed');
288                                    
289                                    return;
290                            }
291                    }
292            }
293            
294            if(vec($wVec,$fno,1) && $self->{'writeBuffer'} ne '')
295            {
296                    #
297                    # Can write
298                    #
299                    my $socket=$self->{'socket'};
300                    
301                    my $bytesWritten=syswrite($socket->{'handle'},$self->{'writeBuffer'});
302                    
303                    if(defined($bytesWritten) && $bytesWritten>0)
304                    {
305                            $::Statistics->{'total_outbound_bytes'}+=$bytesWritten;
306                            $self->{'bytesWritten'}+=$bytesWritten;
307                            $self->{'writeBuffer'}=substr($self->{'writeBuffer'},$bytesWritten);
308                            if(length($self->{'writeBuffer'})==0)
309                            {
310                                    delete($self->{'writeBufferTimestamp'});
311                                    $self->close(1) if(exists($self->{'autoClose'}));
312                            }
313                            else
314                            {
315                                    $self->{'writeBufferTimestamp'}=time;
316                            }
317                    }
318                    else
319                    {
320                            unless(${!}==EAGAIN)
321                            {
322                                    &::syslog('notice',"Connection closed: $!");
323                                    $self->{'remoteClosed'}=1;
324                                    $self->close(1, 'remoteClosed');
325                                    
326                                    return;
327                            }
328                    }
329            }
330    }
331    
332    sub exceptionReceived {
333            my $self=shift;
334            
335            $self->{'writeBuffer'}='';
336            
337            $self->close();
338    }
339    
340    sub close {
341            my $self=shift;
342            
343            #&::syslog('debug',"Close called for %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
344            
345            unless($self->{'remoteClosed'})
346            {
347                    if(!exists($self->{'autoClose'}) && length($self->{'writeBuffer'})>0)
348                    {
349                            $self->{'autoClose'}=1;
350                    
351                            &::syslog('debug',"Will close %s for %s when write buffer empty",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
352                    
353                            return;
354                    }
355            }
356            
357            eval {
358                    $self->{'socket'}->close();
359            };
360            
361            #
362            # Remove connection from list of connections
363            #
364            my $idx=undef;
365            my $numcon = scalar(@Connections);
366            for(my $i=0;$i<$numcon;$i++)
367            {
368                    if($Connections[$i]==$self)
369                    {
370                            $idx=$i;
371                            last;
372                    }
373            }
374            
375            if(defined($idx))
376            {
377                    splice(@Connections,$idx,1);
378            }
379            
380            &::syslog('debug',"Closed %s for %s",ref($self),$self->{'socket'}->{'connection'}->{'remoteIP'});
381            
382            $self->didClose();
383    }
384    
385    sub didClose {
386    }
387    
388    1;
389    ############################################################################EOF

Legend:
Removed from v.9  
changed lines
  Added in v.64

  ViewVC Help
Powered by ViewVC 1.1.26