/[cwmp]/google/trunk/lib/CWMP/Queue.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Annotation of /google/trunk/lib/CWMP/Queue.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 227 - (hide annotations)
Sun Nov 25 18:51:26 2007 UTC (16 years, 6 months ago) by dpavlin
File size: 2892 byte(s)
 r266@brr:  dpavlin | 2007-11-25 19:50:35 +0100
 - first pass with Devel::LeakTrace::Fast
 - remove DBM::Deep store
 - CWMP::Queue now supports dir and clean args
 - create new parser for each request

1 dpavlin 194 package CWMP::Queue;
2    
3     use strict;
4     use warnings;
5    
6    
7     use base qw/Class::Accessor/;
8     __PACKAGE__->mk_accessors( qw/
9     id
10 dpavlin 227 dir
11     clean
12 dpavlin 194 debug
13    
14     / );
15    
16     #use Carp qw/confess/;
17     use Data::Dump qw/dump/;
18     use File::Spec;
19 dpavlin 227 use File::Path qw/mkpath rmtree/;
20 dpavlin 194 use IPC::DirQueue;
21 dpavlin 227 use YAML::Syck qw/Dump/;
22 dpavlin 194 use Carp qw/confess/;
23    
24 dpavlin 227 #use Devel::LeakTrace::Fast;
25    
26 dpavlin 194 =head1 NAME
27    
28     CWMP::Queue - implement commands queue for CPE
29    
30     =head1 METHODS
31    
32     =head2 new
33    
34     my $obj = CWMP::Queue->new({
35     id => 'CPE_serial_number',
36 dpavlin 227 dir => 'queue',
37     clean => 1,
38 dpavlin 194 debug => 1
39     });
40    
41     =cut
42    
43     sub new {
44     my $class = shift;
45     my $self = $class->SUPER::new( @_ );
46    
47     die "need id" unless $self->id;
48    
49     warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug;
50    
51 dpavlin 227 my $dir = File::Spec->catfile( $self->dir || 'queue', $self->id );
52 dpavlin 194
53 dpavlin 227 if ( -e $dir && $self->clean ) {
54     rmtree $dir || die "can't remove $dir: $!";
55     warn "## clean $dir\n" if $self->debug;
56     }
57    
58 dpavlin 194 if ( ! -e $dir ) {
59     mkpath $dir || die "can't create $dir: $!";
60     print "created new queue $dir\n";
61     }
62    
63     my $id = $self->id;
64    
65     if ( ! defined( $self->{dq}->{$id} ) ) {
66     $self->{dq}->{$id} = IPC::DirQueue->new({
67     dir => $dir,
68     ordered => 1,
69     queue_fanout => 0,
70     });
71     warn "## created queue object for CPE $id path $dir\n" if $self->debug;
72     }
73    
74     return $self;
75     }
76    
77     =head2 enqueue
78    
79     $q->enqueue(
80 dpavlin 199 'CommandToDispatch', {
81     'foo.bar.baz' => 42,
82     }
83 dpavlin 194 );
84    
85     =cut
86    
87     sub enqueue {
88     my $self = shift;
89    
90     my $id = $self->id;
91 dpavlin 199 my ( $dispatch, $args ) = @_;
92    
93     warn "## enqueue( $dispatch with ", dump( $args ), " ) for $id\n" if $self->debug;
94 dpavlin 197
95 dpavlin 199 $self->{dq}->{$id}->enqueue_string( Dump({ dispatch => $dispatch, args => $args }) );
96 dpavlin 194 }
97    
98     =head2 dequeue
99    
100 dpavlin 197 my $job = $q->dequeue;
101 dpavlin 199 my ( $dispatch, $args ) = $job->dispatch;
102 dpavlin 197 # after dispatch is processed
103     $job->finish;
104 dpavlin 194
105     =cut
106    
107     sub dequeue {
108     my $self = shift;
109    
110     my $id = $self->id;
111    
112 dpavlin 197 my $job = $self->{dq}->{$id}->pickup_queued_job();
113     return unless defined $job;
114 dpavlin 194
115 dpavlin 199 warn "## dequeue for $id = ", dump( $job ), " )\n" if $self->debug;
116 dpavlin 194
117 dpavlin 199 return CWMP::Queue::Job->new({ job => $job, debug => $self->debug });
118 dpavlin 194 }
119 dpavlin 197
120 dpavlin 199 =head2 dq
121    
122     Accessor to C<IPC::DirQueue> object
123    
124     my $dq = $q->dq;
125    
126     =cut
127    
128     sub dq {
129     my $self = shift;
130     return $self->{dq}->{$self->id};
131     }
132    
133 dpavlin 197 package CWMP::Queue::Job;
134    
135 dpavlin 199 =head1 CWMP::Queue::Job
136    
137     Single queued job
138    
139     =cut
140    
141 dpavlin 197 use base qw/Class::Accessor/;
142     __PACKAGE__->mk_accessors( qw/
143     job
144 dpavlin 199 debug
145 dpavlin 197 / );
146    
147     use YAML qw/LoadFile/;
148 dpavlin 199 use Data::Dump qw/dump/;
149 dpavlin 197
150 dpavlin 199 =head2 dispatch
151    
152     my ( $dispatch, $args ) = $job->dispatch;
153    
154     =cut
155    
156 dpavlin 197 sub dispatch {
157     my $self = shift;
158     my $path = $self->job->get_data_path || die "get_data_path?";
159 dpavlin 199 my $data = LoadFile( $path ) || die "can't read $path: $!";
160     warn "## dispatch returns ",dump($data),"\n" if $self->debug;
161     return ( $data->{dispatch}, $data->{args} );
162 dpavlin 197 }
163    
164 dpavlin 199 =head2 finish
165    
166     Finish job and remove it from queue
167    
168     $job->finish;
169    
170     =cut
171    
172 dpavlin 197 sub finish {
173     my $self = shift;
174     $self->job->finish;
175 dpavlin 227 return 1;
176 dpavlin 197 }
177    
178 dpavlin 194 1;

  ViewVC Help
Powered by ViewVC 1.1.26