/[webpac2]/trunk/run.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Contents of /trunk/run.pl

Parent Directory Parent Directory | Revision Log Revision Log


Revision 698 - (show annotations)
Mon Sep 25 11:14:53 2006 UTC (17 years, 7 months ago) by dpavlin
File MIME type: text/plain
File size: 13073 byte(s)
 r990@llin:  dpavlin | 2006-09-25 13:12:42 +0200
 new depends method to track dependencies, input in most places can be input name or
 hash with key 'name' which will be used as input (for example, from configuration file),
 database and input names will have correctly stripped quotes,
 begin removal of old lookup support

1 #!/usr/bin/perl -w
2
3 use strict;
4
5 use Cwd qw/abs_path/;
6 use File::Temp qw/tempdir/;
7 use lib './lib';
8
9 use WebPAC::Common 0.02;
10 use WebPAC::Parser 0.04;
11 use WebPAC::Lookup 0.03;
12 use WebPAC::Input 0.11;
13 use WebPAC::Store 0.03;
14 use WebPAC::Normalize 0.11;
15 use WebPAC::Output::TT;
16 use WebPAC::Validate 0.06;
17 use WebPAC::Output::MARC;
18 use WebPAC::Config;
19 use Getopt::Long;
20 use File::Path;
21 use Time::HiRes qw/time/;
22 use File::Slurp;
23 use Data::Dump qw/dump/;
24 use Storable qw/dclone/;
25
26 use Proc::Queue size => 1;
27 use POSIX ":sys_wait_h"; # imports WNOHANG
28
29 =head1 NAME
30
31 run.pl - start WebPAC indexing
32
33 B<this command will probably go away. Don't get used to it!>
34
35 Options:
36
37 =over 4
38
39 =item --offset 42
40
41 start loading (all) databases at offset 42
42
43 =item --limit 100
44
45 limit loading to 100 records
46
47 =item --clean
48
49 remove database and Hyper Estraier index before indexing
50
51 =item --only=database_name/input_filter
52
53 reindex just single database (legacy name is --one)
54
55 C</input_filter> is optional part which can be C<name>
56 or C<type> from input
57
58 =item --config conf/config.yml
59
60 path to YAML configuration file
61
62 =item --stats
63
64 disable indexing, modify_* in configuration and dump statistics about field
65 and subfield usage for each input
66
67 =item --validate path/to/validation_file
68
69 turn on extra validation of input records, see L<WebPAC::Validate>
70
71 =item --marc-normalize conf/normalize/mapping.pl
72
73 This option specifies normalisation file for MARC creation
74
75 =item --marc-output out/marc/test.marc
76
77 Optional path to output file
78
79 =item --marc-lint
80
81 By default turned on if C<--marc-normalize> is used. You can disable lint
82 messages with C<--no-marc-lint>.
83
84 =item --marc-dump
85
86 Force dump of input and MARC record for debugging.
87
88 =item --parallel 4
89
90 Run databases in parallel (approximately the same as the number of processors in
91 the machine if you want to use full load)
92
93 =item --only-links
94
95 Create just links
96
97 =item --merge
98
99 Create merged index of databases which have links
100
101 =back
102
103 =cut
104
# Command-line options. The values assigned here are the defaults used when
# the corresponding option is not given on the command line.
my $offset;
my $limit;

my $clean = 0;
my $config_path;
my $debug = 0;
my $only_filter;
my $stats = 0;
my $validate_path;
my ($marc_normalize, $marc_output);
my $marc_lint = 1;      # on by default, --no-marc-lint turns it off
my $marc_dump = 0;
my $parallel = 0;
my $only_links = 0;
my $merge = 0;

# logger shared by the rest of the script
my $log = WebPAC::Common->_new()->_get_logger();

GetOptions(
    "limit=i"          => \$limit,
    "offset=i"         => \$offset,
    "clean"            => \$clean,
    # --one is the legacy spelling of --only; both set the same filter
    "one=s"            => \$only_filter,
    "only=s"           => \$only_filter,
    # --config takes a path argument; declared without "=s" it was parsed as
    # a plain flag and $config_path ended up as 1 instead of the given path
    "config=s"         => \$config_path,
    # "+" makes --debug incremental: repeating it raises the level
    "debug+"           => \$debug,
    "stats"            => \$stats,
    "validate=s"       => \$validate_path,
    "marc-normalize=s" => \$marc_normalize,
    "marc-output=s"    => \$marc_output,
    # "!" makes the option negatable (--no-marc-lint, --no-marc-dump, ...)
    "marc-lint!"       => \$marc_lint,
    "marc-dump!"       => \$marc_dump,
    "parallel=i"       => \$parallel,
    "only-links!"      => \$only_links,
    "merge"            => \$merge,
);
141
# Load configuration from the path given with --config.
my $config = WebPAC::Config->new( path => $config_path );

#print "config = ",dump($config) if ($debug);

if ( !$config->databases ) {
    die "no databases in config file!\n";
}

$log->info( "-" x 79 );


# With --merge, shell commands are collected in a batch file which is
# finished and executed once all databases have been processed.
my $estcmd_fh;
my $estcmd_path = './estcmd-merge.sh';
if ($merge) {
    open( $estcmd_fh, '>', $estcmd_path )
        or $log->logdie("can't open $estcmd_path: $!");
    print {$estcmd_fh}
        'cd /data/estraier/_node/ || exit 1', $/,
        'sudo /etc/init.d/hyperestraier stop', $/;
    $log->info("created merge batch file $estcmd_path");
}


# Validator exists only when --validate was given; otherwise stays undef.
my $validate = $validate_path
    ? WebPAC::Validate->new( path => $validate_path )
    : undef;


# Indexing engine name comes from configuration; --stats switches it off.
my $use_indexer = $config->use_indexer;
if ( !$stats ) {
    $log->info("using $use_indexer indexing engine...");
}
else {
    $log->debug("option --stats disables update of indexing engine...");
    undef $use_indexer;
}

# disable indexing when creating marc
if ($marc_normalize) {
    undef $use_indexer;
}

# parse normalize files and create source files for lookup and normalization

my $parser = WebPAC::Parser->new( config => $config );

# counters used for the records/second summary after each database
my $total_rows = 0;
my $start_t    = time();

# deferred add_link callbacks, executed after all databases are processed
my @links;

if ($parallel) {
    $log->info("Using $parallel processes for speedup");
    Proc::Queue::size($parallel);
}
191
# Main loop: process every database from the configuration (optionally
# restricted with --only). For each database: set up the indexer, record or
# emit links, open the store, then run every input through every
# normalization file, saving / indexing / MARC-exporting each record.
while (my ($database, $db_config) = each %{ $config->databases }) {

    # --only=database_name/input_filter; both parts are applied as
    # case-insensitive regexes. Declared unconditionally because
    # "my ... if COND" has undefined behaviour in Perl.
    my ($only_database, $only_input);
    ($only_database, $only_input) = split(m#/#, $only_filter) if ($only_filter);
    next if ($only_database && $database !~ m/$only_database/i);

    if ($parallel) {
        my $f = fork;
        if (defined($f) and $f == 0) {
            # child: falls through and processes this database
            $log->info("Created processes $$ for speedup");
        } else {
            # parent continues with the next database; a failed fork is
            # reported instead of silently skipping the database
            $log->warn("can't fork for database '$database': $!") unless (defined $f);
            next;
        }
    }

    my $indexer;
    if ($use_indexer) {

        # configuration section name is the indexer name up to the first '-'
        my $cfg_name = $use_indexer;
        $cfg_name =~ s/\-.*$//;

        my $indexer_config = $config->get( $cfg_name ) || $log->logdie("can't find '$cfg_name' part in confguration");
        $indexer_config->{database} = $database;
        $indexer_config->{clean} = $clean;
        $indexer_config->{label} = $db_config->{name};

        # force clean if database has links
        $indexer_config->{clean} = 1 if ($db_config->{links});

        if ($use_indexer eq 'hyperestraier') {

            # open Hyper Estraier database
            use WebPAC::Output::Estraier '0.10';
            $indexer = WebPAC::Output::Estraier->new( %{ $indexer_config } );

        } elsif ($use_indexer eq 'hyperestraier-native') {

            # open Hyper Estraier database
            use WebPAC::Output::EstraierNative;
            $indexer = WebPAC::Output::EstraierNative->new( %{ $indexer_config } );

        } elsif ($use_indexer eq 'kinosearch') {

            # open KinoSearch
            use WebPAC::Output::KinoSearch;
            # a missing index path means there is nothing to update, so start clean
            $indexer_config->{clean} = 1 unless (-e $indexer_config->{index_path});
            $indexer = WebPAC::Output::KinoSearch->new( %{ $indexer_config } );

        } else {
            $log->logdie("unknown use_indexer: $use_indexer");
        }

        # was "logide", which would fail with "no such method" instead of
        # reporting the real problem
        $log->logdie("can't continue without valid indexer") unless ($indexer);
    }


    #
    # store Hyper Estraier links to other databases
    #
    if (ref($db_config->{links}) eq 'ARRAY' && $use_indexer) {
        foreach my $link (@{ $db_config->{links} }) {
            if ($use_indexer eq 'hyperestraier') {
                if ($merge) {
                    # --merge: write a merge command into the batch file
                    print $estcmd_fh 'sudo -u www-data estcmd merge ' . $database . ' ' . $link->{to},$/;
                } else {
                    # otherwise defer add_link until all databases are done
                    $log->info("saving link $database -> $link->{to} [$link->{credit}]");
                    push @links, sub {
                        $log->info("adding link $database -> $link->{to} [$link->{credit}]");
                        $indexer->add_link(
                            from => $database,
                            to => $link->{to},
                            credit => $link->{credit},
                        );
                    };
                }
            } else {
                $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");
            }
        }
    }
    next if ($only_links);


    #
    # now WebPAC::Store
    #
    # directory of this script; not referenced again in this loop
    my $abs_path = abs_path($0);
    $abs_path =~ s#/[^/]*$#/#;

    my $db_path = $config->get('webpac')->{db_path} . '/' . $database;

    if ($clean) {
        $log->info("creating new database '$database' in $db_path");
        rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");
    } else {
        $log->info("working on database '$database' in $db_path");
    }

    my $db = WebPAC::Store->new(
        path => $db_path,
        database => $database,
        debug => $debug,
    );


    #
    # now, iterate through input formats
    #

    # 'input' in configuration may be a single entry or an array of them
    my @inputs;
    if (ref($db_config->{input}) eq 'ARRAY') {
        @inputs = @{ $db_config->{input} };
    } elsif ($db_config->{input}) {
        push @inputs, $db_config->{input};
    } else {
        $log->info("database $database doesn't have inputs defined");
    }

    foreach my $input (@inputs) {

        # input part of --only matches against either name or type of input
        next if ($only_input && ($input->{name} !~ m#$only_input#i && $input->{type} !~ m#$only_input#i));

        my $type = lc($input->{type});

        die "I know only how to handle input types ", join(",", $config->webpac('inputs') ), " not '$type'!\n" unless (grep(/$type/, $config->webpac('inputs')));

        my $input_module = $config->webpac('inputs')->{$type};

        $log->info("working on input '$input->{name}' in $input->{path} [type: $input->{type}] using $input_module",
            $input->{lookup} ? "lookup '$input->{lookup}'" : ""
        );

        if ($stats) {
            # disable modification of records if --stats is in use
            delete($input->{modify_records});
            delete($input->{modify_file});
        }

        warn "depends on: ", dump( $parser->depends($database, $input->{name}), $parser->{depends}, $parser->lookup_create_rules($database, $input->{name}), $parser->{_lookup_create} );

        # NOTE(review): $lookup is never assigned in this script, so the
        # lookup_coderef below would fail if it is ever invoked with a record
        my $lookup;

        my $input_db = WebPAC::Input->new(
            module => $input_module,
            encoding => $config->webpac('webpac_encoding'),
            limit => $limit || $input->{limit},
            offset => $offset,
            lookup_coderef => sub {
                my $rec = shift || return;
                $lookup->add( $rec );
            },
            recode => $input->{recode},
            stats => $stats,
            modify_records => $input->{modify_records},
            modify_file => $input->{modify_file},
        );
        # check the object which was just created ($input, the configuration
        # entry tested before, is always true at this point)
        $log->logdie("can't create input using $input_module") unless ($input_db);

        my $maxmfn = $input_db->open(
            path => $input->{path},
            code_page => $input->{encoding},    # database encoding
            %{ $input },
        );

        # report file is written only for --stats and/or --validate
        my $report_fh;
        if ($stats || $validate) {
            my $path = "out/report/" . $database . '-' . $input->{name} . '.txt';
            open($report_fh, '>', $path) || $log->logdie("can't open $path: $!");

            print $report_fh "Report for database '$database' input '$input->{name}' records ",
                $offset || 1, "-", $limit || $input->{limit} || $maxmfn, "\n\n";
            $log->info("Generating report file $path");
        }

        # 'normalize' in configuration may be a single entry or an array
        my @norm_array = ref($input->{normalize}) eq 'ARRAY' ?
            @{ $input->{normalize} } : ( $input->{normalize} );

        # --marc-normalize replaces configured normalization files
        if ($marc_normalize) {
            @norm_array = ( {
                path => $marc_normalize,
                output => $marc_output || 'out/marc/' . $database . '-' . $input->{name} . '.marc',
            } );
        }

        foreach my $normalize (@norm_array) {

            my $normalize_path = $normalize->{path} || $log->logdie("can't find normalize path in config");

            $log->logdie("Found '$normalize_path' as normalization file which isn't supported any more!") unless ( $normalize_path =~ m!\.pl$!i );

            my $rules = read_file( $normalize_path ) or die "can't open $normalize_path: $!";

            $log->info("Using $normalize_path for normalization...");

            # Declared unconditionally: with "my ... if COND" inside a loop
            # the variable could keep its value from a previous iteration.
            my $marc;
            $marc = WebPAC::Output::MARC->new(
                path => $normalize->{output},
                lint => $marc_lint,
                dump => $marc_dump,
            ) if ($normalize->{output});

            # reset position in database
            $input_db->seek(1);

            # generate name of config key for indexer (strip everything after -)
            my $indexer_config = $use_indexer;
            $indexer_config =~ s/^(\w+)-?.*$/$1/g if ($indexer_config);

            foreach my $pos ( 0 .. $input_db->size ) {

                my $row = $input_db->fetch || next;

                my $mfn = $row->{'000'}->[0];

                # fall back to position when record has no numeric MFN
                if (! $mfn || $mfn !~ m#^\d+$#) {
                    $log->warn("record $pos doesn't have valid MFN but '$mfn', using $pos");
                    $mfn = $pos;
                    push @{ $row->{'000'} }, $pos;
                }


                if ($validate) {
                    if ( my $errors = $validate->validate_errors( $row, $input_db->dump ) ) {
                        $log->error( "MFN $mfn validation error:\n",
                            $validate->report_error( $errors )
                        );
                    }
                }

                # per-record copy so the additions below don't leak into
                # the database configuration
                my $ds_config = dclone($db_config);

                # default values -> database key
                $ds_config->{_} = $database;

                # current mfn
                $ds_config->{_mfn} = $mfn;

                # attach current input
                $ds_config->{input} = $input;

                my $ds = WebPAC::Normalize::data_structure(
                    row => $row,
                    rules => $rules,
                    lookup => $lookup ? $lookup->lookup_hash : undef,
                    config => $ds_config,
                    marc_encoding => 'utf-8',
                );

                # nothing is stored when --stats is in use
                $db->save_ds(
                    id => $mfn,
                    ds => $ds,
                    prefix => $input->{name},
                ) if ($ds && !$stats);

                $indexer->add(
                    id => $input->{name} . "/" . $mfn,
                    ds => $ds,
                    type => $config->get($indexer_config)->{type},
                ) if ($indexer && $ds);

                if ($marc) {
                    my $i = 0;

                    # one input record may produce several MARC records;
                    # the second and later ones get "/$i" appended to the id
                    while (my $fields = WebPAC::Normalize::_get_marc_fields( fetch_next => 1 ) ) {
                        $marc->add(
                            id => $mfn . ( $i ? "/$i" : '' ),
                            fields => $fields,
                            leader => WebPAC::Normalize::marc_leader(),
                            row => $row,
                        );
                        $i++;
                    }

                    $log->info("Created $i instances of MFN $mfn\n") if ($i > 1);
                }

                $total_rows++;
            }

            if ($validate) {
                my $errors = $validate->report;
                if ($errors) {
                    $log->info("validation errors:\n$errors\n" );
                    print $report_fh "$errors\n" if ($report_fh);
                }
            }

            if ($stats) {
                my $s = $input_db->stats;
                $log->info("statistics of fields usage:\n$s");
                print $report_fh "Statistics of fields usage:\n$s" if ($report_fh);
            }

            # close MARC file
            $marc->finish if ($marc);

            # close report
            close($report_fh) if ($report_fh)
        }

    }

    eval { $indexer->finish } if ($indexer && $indexer->can('finish'));

    # $total_rows and $start_t are script-wide, so the figures are cumulative
    my $dt = time() - $start_t;
    $log->info("$total_rows records ", $indexer ? "indexed " : "",
        sprintf("in %.2f sec [%.2f rec/sec]",
            # guard against division by zero on a zero elapsed time
            $dt, ($dt > 0 ? $total_rows / $dt : 0)
        )
    );


    # end forked process
    if ($parallel) {
        $log->info("parallel process $$ finished");
        exit(0);
    }

}
509
if ($parallel) {
    # wait all children to finish (wait returns -1 when none are left)
    sleep(1) while wait != -1;
    $log->info("all parallel processes finished");
}

#
# handle links or merge after indexing
#

if ($merge) {
    # finish the batch file started before the main loop and run it
    print $estcmd_fh 'sudo /etc/init.d/hyperestraier start',$/;
    close($estcmd_fh) or $log->warn("can't close $estcmd_path: $!");
    # "or" instead of "||": with "||" the warning was bound to $estcmd_path
    # (always true) and a failed chmod was never reported
    chmod 0700, $estcmd_path or $log->warn("can't chmod 0700 $estcmd_path: $!");
    system($estcmd_path) == 0
        or $log->warn("$estcmd_path failed with status $?");
} else {
    # run add_link callbacks collected while processing databases
    foreach my $link (@links) {
        # dump() is what this file imports from Data::Dump; Dumper() was
        # never imported, so the error path itself would have failed
        $log->logdie("coderef in link ", dump($link), " is ", ref($link), " and not CODE") unless (ref($link) eq 'CODE');
        $link->();
    }
}

Properties

Name Value
svn:executable *

  ViewVC Help
Powered by ViewVC 1.1.26