/[cwmp]/google/trunk/lib/CWMP/Queue.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /google/trunk/lib/CWMP/Queue.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 227 - (show annotations)
Sun Nov 25 18:51:26 2007 UTC (16 years, 6 months ago) by dpavlin
File size: 2892 byte(s)
 r266@brr:  dpavlin | 2007-11-25 19:50:35 +0100
 - first pass with Devel::LeakTrace::Fast
 - remove DBM::Deep store
 - CWMP::Queue now supports dir and clean args
 - create new parser for each request

1 package CWMP::Queue;
2
3 use strict;
4 use warnings;
5
6
7 use base qw/Class::Accessor/;
8 __PACKAGE__->mk_accessors( qw/
9 id
10 dir
11 clean
12 debug
13
14 / );
15
16 #use Carp qw/confess/;
17 use Data::Dump qw/dump/;
18 use File::Spec;
19 use File::Path qw/mkpath rmtree/;
20 use IPC::DirQueue;
21 use YAML::Syck qw/Dump/;
22 use Carp qw/confess/;
23
24 #use Devel::LeakTrace::Fast;
25
26 =head1 NAME
27
28 CWMP::Queue - implement commands queue for CPE
29
30 =head1 METHODS
31
32 =head2 new
33
34 my $obj = CWMP::Queue->new({
35 id => 'CPE_serial_number',
36 dir => 'queue',
37 clean => 1,
38 debug => 1
39 });
40
41 =cut
42
43 sub new {
44 my $class = shift;
45 my $self = $class->SUPER::new( @_ );
46
47 die "need id" unless $self->id;
48
49 warn "created ", __PACKAGE__, "(", dump( @_ ), ") object\n" if $self->debug;
50
51 my $dir = File::Spec->catfile( $self->dir || 'queue', $self->id );
52
53 if ( -e $dir && $self->clean ) {
54 rmtree $dir || die "can't remove $dir: $!";
55 warn "## clean $dir\n" if $self->debug;
56 }
57
58 if ( ! -e $dir ) {
59 mkpath $dir || die "can't create $dir: $!";
60 print "created new queue $dir\n";
61 }
62
63 my $id = $self->id;
64
65 if ( ! defined( $self->{dq}->{$id} ) ) {
66 $self->{dq}->{$id} = IPC::DirQueue->new({
67 dir => $dir,
68 ordered => 1,
69 queue_fanout => 0,
70 });
71 warn "## created queue object for CPE $id path $dir\n" if $self->debug;
72 }
73
74 return $self;
75 }
76
77 =head2 enqueue
78
79 $q->enqueue(
80 'CommandToDispatch', {
81 'foo.bar.baz' => 42,
82 }
83 );
84
85 =cut
86
87 sub enqueue {
88 my $self = shift;
89
90 my $id = $self->id;
91 my ( $dispatch, $args ) = @_;
92
93 warn "## enqueue( $dispatch with ", dump( $args ), " ) for $id\n" if $self->debug;
94
95 $self->{dq}->{$id}->enqueue_string( Dump({ dispatch => $dispatch, args => $args }) );
96 }
97
98 =head2 dequeue
99
100 my $job = $q->dequeue;
101 my ( $dispatch, $args ) = $job->dispatch;
102 # after dispatch is processed
103 $job->finish;
104
105 =cut
106
107 sub dequeue {
108 my $self = shift;
109
110 my $id = $self->id;
111
112 my $job = $self->{dq}->{$id}->pickup_queued_job();
113 return unless defined $job;
114
115 warn "## dequeue for $id = ", dump( $job ), " )\n" if $self->debug;
116
117 return CWMP::Queue::Job->new({ job => $job, debug => $self->debug });
118 }
119
120 =head2 dq
121
122 Accessor to C<IPC::DirQueue> object
123
124 my $dq = $q->dq;
125
126 =cut
127
128 sub dq {
129 my $self = shift;
130 return $self->{dq}->{$self->id};
131 }
132
133 package CWMP::Queue::Job;
134
135 =head1 CWMP::Queue::Job
136
137 Single queued job
138
139 =cut
140
141 use base qw/Class::Accessor/;
142 __PACKAGE__->mk_accessors( qw/
143 job
144 debug
145 / );
146
147 use YAML qw/LoadFile/;
148 use Data::Dump qw/dump/;
149
150 =head2 dispatch
151
152 my ( $dispatch, $args ) = $job->dispatch;
153
154 =cut
155
156 sub dispatch {
157 my $self = shift;
158 my $path = $self->job->get_data_path || die "get_data_path?";
159 my $data = LoadFile( $path ) || die "can't read $path: $!";
160 warn "## dispatch returns ",dump($data),"\n" if $self->debug;
161 return ( $data->{dispatch}, $data->{args} );
162 }
163
164 =head2 finish
165
166 Finish job and remove it from queue
167
168 $job->finish;
169
170 =cut
171
172 sub finish {
173 my $self = shift;
174 $self->job->finish;
175 return 1;
176 }
177
178 1;

  ViewVC Help
Powered by ViewVC 1.1.26