/[webpac2]/trunk/run.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/run.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 389 by dpavlin, Sun Jan 22 13:38:17 2006 UTC revision 710 by dpavlin, Mon Sep 25 18:58:43 2006 UTC
# Line 4  use strict; Line 4  use strict;
4    
5  use Cwd qw/abs_path/;  use Cwd qw/abs_path/;
6  use File::Temp qw/tempdir/;  use File::Temp qw/tempdir/;
 use Data::Dumper;  
7  use lib './lib';  use lib './lib';
8    
9  use WebPAC::Common 0.02;  use WebPAC::Common 0.02;
10  use WebPAC::Lookup;  use WebPAC::Parser 0.04;
11  use WebPAC::Input 0.03;  use WebPAC::Input 0.13;
12  use WebPAC::Store 0.03;  use WebPAC::Store 0.10;
13  use WebPAC::Normalize::XML;  use WebPAC::Normalize 0.11;
14  use WebPAC::Output::TT;  use WebPAC::Output::TT;
15  use WebPAC::Output::Estraier 0.08;  use WebPAC::Validate 0.06;
16  use YAML qw/LoadFile/;  use WebPAC::Output::MARC;
17    use WebPAC::Config;
18  use Getopt::Long;  use Getopt::Long;
19  use File::Path;  use File::Path;
20  use Time::HiRes qw/time/;  use Time::HiRes qw/time/;
21    use File::Slurp;
22    use Data::Dump qw/dump/;
23    use Storable qw/dclone/;
24    
25    use Proc::Queue size => 1;
26    use POSIX ":sys_wait_h"; # imports WNOHANG
27    
28  =head1 NAME  =head1 NAME
29    
# Line 41  limit loading to 100 records Line 47  limit loading to 100 records
47    
48  remove database and Hyper Estraier index before indexing  remove database and Hyper Estraier index before indexing
49    
50  =item --one=database_name  =item --only=database_name/input_filter
51    
52    reindex just single database (legacy name is --one)
53    
54  reindex just single database  C</input_filter> is optional part which can be C<name>
55    or C<type> from input
56    
57  =item --config conf/config.yml  =item --config conf/config.yml
58    
59  path to YAML configuration file  path to YAML configuration file
60    
61    =item --stats
62    
63    disable indexing, modify_* in configuration and dump statistics about field
64    and subfield usage for each input
65    
66    =item --validate path/to/validation_file
67    
68    turn on extra validation of imput records, see L<WebPAC::Validation>
69    
70    =item --marc-normalize conf/normalize/mapping.pl
71    
72    This option specifies normalisation file for MARC creation
73    
74    =item --marc-output out/marc/test.marc
75    
76    Optional path to output file
77    
78    =item --marc-lint
79    
80    By default turned on if C<--marc-normalize> is used. You can disable lint
81    messages with C<--no-marc-lint>.
82    
83    =item --marc-dump
84    
85    Force dump or input and marc record for debugging.
86    
87    =item --parallel 4
88    
89    Run databases in parallel (aproximatly same as number of processors in
90    machine if you want to use full load)
91    
92    =item --only-links
93    
94    Create just links
95    
96    =item --merge
97    
98    Create merged index of databases which have links
99    
100  =back  =back
101    
102  =cut  =cut
# Line 57  my $offset; Line 105  my $offset;
105  my $limit;  my $limit;
106    
107  my $clean = 0;  my $clean = 0;
108  my $config = 'conf/config.yml';  my $config_path;
109  my $debug = 0;  my $debug = 0;
110  my $one_db_name;  my $only_filter;
111    my $stats = 0;
112    my $validate_path;
113    my ($marc_normalize, $marc_output);
114    my $marc_lint = 1;
115    my $marc_dump = 0;
116    my $parallel = 0;
117    my $only_links = 0;
118    my $merge = 0;
119    
120    my $log = _new WebPAC::Common()->_get_logger();
121    
122  GetOptions(  GetOptions(
123          "limit=i" => \$limit,          "limit=i" => \$limit,
124          "offset=i" => \$offset,          "offset=i" => \$offset,
125          "clean" => \$clean,          "clean" => \$clean,
126          "one=s" => \$one_db_name,          "one=s" => \$only_filter,
127          "config" => \$config,          "only=s" => \$only_filter,
128          "debug" => \$debug,          "config" => \$config_path,
129            "debug+" => \$debug,
130            "stats" => \$stats,
131            "validate=s" => \$validate_path,
132            "marc-normalize=s" => \$marc_normalize,
133            "marc-output=s" => \$marc_output,
134            "marc-lint!" => \$marc_lint,
135            "marc-dump!" => \$marc_dump,
136            "parallel=i" => \$parallel,
137            "only-links!" => \$only_links,
138            "merge" => \$merge,
139  );  );
140    
141  $config = LoadFile($config);  my $config = new WebPAC::Config( path => $config_path );
142    
143    #print "config = ",dump($config) if ($debug);
144    
145    die "no databases in config file!\n" unless ($config->databases);
146    
147    $log->info( "-" x 79 );
148    
149    
150    my $estcmd_fh;
151    my $estcmd_path = './estcmd-merge.sh';
152    if ($merge) {
153            open($estcmd_fh, '>', $estcmd_path) || $log->logdie("can't open $estcmd_path: $!");
154            print $estcmd_fh 'cd /data/estraier/_node/ || exit 1',$/;
155            print $estcmd_fh 'sudo /etc/init.d/hyperestraier stop',$/;
156            $log->info("created merge batch file $estcmd_path");
157    }
158    
 print "config = ",Dumper($config) if ($debug);  
159    
160  die "no databases in config file!\n" unless ($config->{databases});  my $validate;
161    $validate = new WebPAC::Validate(
162            path => $validate_path,
163    ) if ($validate_path);
164    
165    
166    my $use_indexer = $config->use_indexer;
167    if ($stats) {
168            $log->debug("option --stats disables update of indexing engine...");
169            $use_indexer = undef;
170    } else {
171            $log->info("using $use_indexer indexing engine...");
172    }
173    
174    # disable indexing when creating marc
175    $use_indexer = undef if ($marc_normalize);
176    
177    # parse normalize files and create source files for lookup and normalization
178    
179    my $parser = new WebPAC::Parser( config => $config );
180    
181  my $total_rows = 0;  my $total_rows = 0;
182  my $start_t = time();  my $start_t = time();
183    
184  while (my ($database, $db_config) = each %{ $config->{databases} }) {  my @links;
185    
186    if ($parallel) {
187            $log->info("Using $parallel processes for speedup");
188            Proc::Queue::size($parallel);
189    }
190    
191    sub create_ds_config {
192            my ($db_config, $database, $input, $mfn) = @_;
193            my $c = dclone( $db_config );
194            $c->{_} = $database || $log->logconfess("need database");
195            $c->{_mfn} = $mfn || $log->logconfess("need mfn");
196            $c->{input} = $input || $log->logconfess("need input");
197            return $c;
198    }
199    
200    while (my ($database, $db_config) = each %{ $config->databases }) {
201    
202            my ($only_database,$only_input) = split(m#/#, $only_filter) if ($only_filter);
203            next if ($only_database && $database !~ m/$only_database/i);
204    
205            if ($parallel) {
206                    my $f=fork;
207                    if(defined ($f) and $f==0) {
208                            $log->info("Created processes $$ for speedup");
209                    } else {
210                            next;
211                    }
212            }
213    
214            my $indexer;
215            if ($use_indexer) {
216    
217                    my $cfg_name = $use_indexer;
218                    $cfg_name =~ s/\-.*$//;
219    
220                    my $indexer_config = $config->get( $cfg_name ) || $log->logdie("can't find '$cfg_name' part in confguration");
221                    $indexer_config->{database} = $database;
222                    $indexer_config->{clean} = $clean;
223                    $indexer_config->{label} = $db_config->{name};
224    
225                    # force clean if database has links
226                    $indexer_config->{clean} = 1 if ($db_config->{links});
227    
228                    if ($use_indexer eq 'hyperestraier') {
229    
230          next if ($one_db_name && $database !~ m/$one_db_name/i);                          # open Hyper Estraier database
231                            use WebPAC::Output::Estraier '0.10';
232                            $indexer = new WebPAC::Output::Estraier( %{ $indexer_config } );
233                    
234                    } elsif ($use_indexer eq 'hyperestraier-native') {
235    
236                            # open Hyper Estraier database
237                            use WebPAC::Output::EstraierNative;
238                            $indexer = new WebPAC::Output::EstraierNative( %{ $indexer_config } );
239    
240                    } elsif ($use_indexer eq 'kinosearch') {
241    
242                            # open KinoSearch
243                            use WebPAC::Output::KinoSearch;
244                            $indexer_config->{clean} = 1 unless (-e $indexer_config->{index_path});
245                            $indexer = new WebPAC::Output::KinoSearch( %{ $indexer_config } );
246    
247                    } else {
248                            $log->logdie("unknown use_indexer: $use_indexer");
249                    }
250    
251                    $log->logide("can't continue without valid indexer") unless ($indexer);
252            }
253    
         my $log = _new WebPAC::Common()->_get_logger();  
254    
255          #          #
256          # open Hyper Estraier database          # store Hyper Estraier links to other databases
257          #          #
258            if (ref($db_config->{links}) eq 'ARRAY' && $use_indexer) {
259                    foreach my $link (@{ $db_config->{links} }) {
260                            if ($use_indexer eq 'hyperestraier') {
261                                    if ($merge) {
262                                            print $estcmd_fh 'sudo -u www-data estcmd merge ' . $database . ' ' . $link->{to},$/;
263                                    } else {
264                                            $log->info("saving link $database -> $link->{to} [$link->{credit}]");
265                                            push @links, sub {
266                                                    $log->info("adding link $database -> $link->{to} [$link->{credit}]");
267                                                    $indexer->add_link(
268                                                            from => $database,
269                                                            to => $link->{to},
270                                                            credit => $link->{credit},
271                                                    );
272                                            };
273                                    }
274                            } else {
275                                    $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");
276                            }
277                    }
278            }
279            next if ($only_links);
280    
         my $est_config = $config->{hyperestraier} || $log->logdie("can't find 'hyperestraier' part in confguration");  
         $est_config->{database} = $database;  
         $est_config->{clean} = $clean;  
   
         my $est = new WebPAC::Output::Estraier( %{ $est_config } );  
281    
282          #          #
283          # now WebPAC::Store          # now WebPAC::Store
# Line 101  while (my ($database, $db_config) = each Line 285  while (my ($database, $db_config) = each
285          my $abs_path = abs_path($0);          my $abs_path = abs_path($0);
286          $abs_path =~ s#/[^/]*$#/#;          $abs_path =~ s#/[^/]*$#/#;
287    
288          my $db_path = $config->{webpac}->{db_path} . '/' . $database;          my $db_path = $config->get('webpac')->{db_path} . '/' . $database;
289    
290          if ($clean) {          if ($clean) {
291                  $log->info("creating new database $database in $db_path");                  $log->info("creating new database '$database' in $db_path");
292                  rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");                  rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");
293          } else {          } else {
294                  $log->info("working on $database in $db_path");                  $log->info("working on database '$database' in $db_path");
295          }          }
296    
297          my $db = new WebPAC::Store(          my $store = new WebPAC::Store(
298                  path => $db_path,                  path => $db_path,
299                  database => $database,                  database => $database,
300                  debug => $debug,                  debug => $debug,
# Line 130  while (my ($database, $db_config) = each Line 314  while (my ($database, $db_config) = each
314                  $log->info("database $database doesn't have inputs defined");                  $log->info("database $database doesn't have inputs defined");
315          }          }
316    
         my @supported_inputs = keys %{ $config->{webpac}->{inputs} };  
   
317          foreach my $input (@inputs) {          foreach my $input (@inputs) {
318    
319                    my $input_name = $input->{name} || $log->logdie("input without a name isn't valid: ",dump($input));
320    
321                    next if ($only_input && ($input_name !~ m#$only_input#i && $input->{type} !~ m#$only_input#i));
322    
323                  my $type = lc($input->{type});                  my $type = lc($input->{type});
324    
325                  die "I know only how to handle input types ", join(",", @supported_inputs), " not '$type'!\n" unless (grep(/$type/, @supported_inputs));                  die "I know only how to handle input types ", join(",", $config->webpac('inputs') ), " not '$type'!\n" unless (grep(/$type/, $config->webpac('inputs')));
326    
327                  my $lookup = new WebPAC::Lookup(                  my $input_module = $config->webpac('inputs')->{$type};
                         lookup_file => $input->{lookup},  
                 );  
328    
329                  my $input_module = $config->{webpac}->{inputs}->{$type};                  my @lookups = $parser->have_lookup_create($database, $input);
330    
331                  $log->info("working on input $input->{path} [$input->{type}] using $input_module");                  $log->info("working on input '$input_name' in $input->{path} [type: $input->{type}] using $input_module",
332                            @lookups ? " creating lookups: ".join(", ", @lookups) : ""
333                    );
334    
335                    if ($stats) {
336                            # disable modification of records if --stats is in use
337                            delete($input->{modify_records});
338                            delete($input->{modify_file});
339                    }
340    
341                  my $input_db = new WebPAC::Input(                  my $input_db = new WebPAC::Input(
342                          module => $input_module,                          module => $input_module,
343                          code_page => $config->{webpac}->{webpac_encoding},                          encoding => $config->webpac('webpac_encoding'),
344                          limit => $limit || $input->{limit},                          limit => $limit || $input->{limit},
345                          offset => $offset,                          offset => $offset,
346                          lookup => $lookup,                          recode => $input->{recode},
347                            stats => $stats,
348                            modify_records => $input->{modify_records},
349                            modify_file => $input->{modify_file},
350                  );                  );
351                  $log->logdie("can't create input using $input_module") unless ($input);                  $log->logdie("can't create input using $input_module") unless ($input);
352    
353                    if (defined( $input->{lookup} )) {
354                            $log->warn("$database/$input_name has depriciated lookup definition, removing it...");
355                            delete( $input->{lookup} );
356                    }
357    
358                    my $lookup;
359                    my $lookup_coderef;
360    
361                    if (@lookups) {
362    
363                            my $rules = $parser->lookup_create_rules($database, $input) || $log->logdie("no rules found for $database/$input");
364    
365                            $lookup_coderef = sub {
366                                    my $rec = shift || die "need rec!";
367                                    my $mfn = $rec->{'000'}->[0] || die "need mfn in 000";
368    
369                                    WebPAC::Normalize::data_structure(
370                                            row => $rec,
371                                            rules => $rules,
372                                            lookup => $lookup,
373                                            config => create_ds_config( $db_config, $database, $input, $mfn ),
374                                    );
375    
376                                    warn "current lookup = ", dump($lookup) if ($lookup);
377                            };
378    
379                            WebPAC::Normalize::_set_lookup( undef );
380    
381                            $log->debug("created lookup_coderef using:\n$rules");
382    
383                    };
384    
385                  my $maxmfn = $input_db->open(                  my $maxmfn = $input_db->open(
386                          path => $input->{path},                          path => $input->{path},
387                          code_page => $input->{encoding},        # database encoding                          code_page => $input->{encoding},        # database encoding
388                            lookup_coderef => $lookup_coderef,
389                            %{ $input },
390                  );                  );
391    
392                  my $n = new WebPAC::Normalize::XML(                  my $lookup_data = WebPAC::Normalize::_get_lookup();
393                  #       filter => { 'foo' => sub { shift } },  
394                          db => $db,                  $log->debug("created following lookups: ", dump( $lookup_data ));
                         lookup_regex => $lookup->regex,  
                         lookup => $lookup,  
                         prefix => $input->{name},  
                 );  
395    
396                  my $normalize_path = $input->{normalize}->{path};                  foreach my $key (keys %$lookup_data) {
397                            $store->save_lookup( $database, $input_name, $key, $lookup_data->{$key} );
398                    }
399    
400                    my $report_fh;
401                    if ($stats || $validate) {
402                            my $path = "out/report/${database}-${input_name}.txt";
403                            open($report_fh, '>', $path) || $log->logdie("can't open $path: $!");
404    
405                            print $report_fh "Report for database '$database' input '$input_name' records ",
406                                    $offset || 1, "-", $limit || $input->{limit} || $maxmfn, "\n\n";
407                            $log->info("Generating report file $path");
408                    }
409    
410                    my @norm_array = ref($input->{normalize}) eq 'ARRAY' ?
411                            @{ $input->{normalize} } : ( $input->{normalize} );
412    
413                    if ($marc_normalize) {
414                            @norm_array = ( {
415                                    path => $marc_normalize,
416                                    output => $marc_output || "out/marc/${database}-${input_name}.marc",
417                            } );
418                    }
419    
420                    foreach my $normalize (@norm_array) {
421    
422                            my $normalize_path = $normalize->{path} || $log->logdie("can't find normalize path in config");
423    
424                            $log->logdie("Found '$normalize_path' as normalization file which isn't supported any more!") unless ( $normalize_path =~ m!\.pl$!i );
425    
426                            my $rules = read_file( $normalize_path ) or die "can't open $normalize_path: $!";
427    
428                            $log->info("Using $normalize_path for normalization...");
429    
430                            my $marc = new WebPAC::Output::MARC(
431                                    path => $normalize->{output},
432                                    lint => $marc_lint,
433                                    dump => $marc_dump,
434                            ) if ($normalize->{output});
435    
436                            # reset position in database
437                            $input_db->seek(1);
438    
439                            # generate name of config key for indexer (strip everything after -)
440                            my $indexer_config = $use_indexer;
441                            $indexer_config =~ s/^(\w+)-?.*$/$1/g if ($indexer_config);
442    
443                            foreach my $pos ( 0 ... $input_db->size ) {
444    
445                                    my $row = $input_db->fetch || next;
446    
447                                    my $mfn = $row->{'000'}->[0];
448    
449                                    if (! $mfn || $mfn !~ m#^\d+$#) {
450                                            $log->warn("record $pos doesn't have valid MFN but '$mfn', using $pos");
451                                            $mfn = $pos;
452                                            push @{ $row->{'000'} }, $pos;
453                                    }
454    
455    
456                                    if ($validate) {
457                                            if ( my $errors = $validate->validate_errors( $row, $input_db->dump ) ) {
458                                                    $log->error( "MFN $mfn validation error:\n",
459                                                            $validate->report_error( $errors )
460                                                    );
461                                            }
462                                    }
463    
464                                    my $ds = WebPAC::Normalize::data_structure(
465                                            row => $row,
466                                            rules => $rules,
467                                            lookup => $lookup ? $lookup->lookup_hash : undef,
468                                            config => create_ds_config( $db_config, $database, $input, $mfn ),
469                                            marc_encoding => 'utf-8',
470                                    );
471    
472                                    $store->save_ds(
473                                            id => $mfn,
474                                            ds => $ds,
475                                            prefix => $input_name,
476                                    ) if ($ds && !$stats);
477    
478                                    $indexer->add(
479                                            id => "${input_name}/${mfn}",
480                                            ds => $ds,
481                                            type => $config->get($indexer_config)->{type},
482                                    ) if ($indexer && $ds);
483    
484                                    if ($marc) {
485                                            my $i = 0;
486    
487                                            while (my $fields = WebPAC::Normalize::_get_marc_fields( fetch_next => 1 ) ) {
488                                                    $marc->add(
489                                                            id => $mfn . ( $i ? "/$i" : '' ),
490                                                            fields => $fields,
491                                                            leader => WebPAC::Normalize::marc_leader(),
492                                                            row => $row,
493                                                    );
494                                                    $i++;
495                                            }
496    
497                                            $log->info("Created $i instances of MFN $mfn\n") if ($i > 1);
498                                    }
499    
500                                    $total_rows++;
501                            }
502    
503                  if ($normalize_path =~ m/\.xml$/i) {                          if ($validate) {
504                          $n->open(                                  my $errors = $validate->report;
505                                  tag => $input->{normalize}->{tag},                                  if ($errors) {
506                                  xml_file => $input->{normalize}->{path},                                          $log->info("validation errors:\n$errors\n" );
507                          );                                          print $report_fh "$errors\n" if ($report_fh);
508                  } elsif ($normalize_path =~ m/\.(?:yml|yaml)$/i) {                                  }
                         $n->open_yaml(  
                                 path => $normalize_path,  
                                 tag => $input->{normalize}->{tag},  
                         );  
                 }  
   
                 foreach my $pos ( 0 ... $input_db->size ) {  
   
                         my $row = $input_db->fetch || next;  
   
                         my $mfn = $row->{'000'}->[0];  
   
                         if (! $mfn || $mfn !~ m#^\d+$#) {  
                                 $log->warn("record $pos doesn't have valid MFN but '$mfn', using $pos");  
                                 $mfn = $pos;  
                                 push @{ $row->{'000'} }, $pos;  
509                          }                          }
510    
511                          my $ds = $n->data_structure($row);                          if ($stats) {
512                                    my $s = $input_db->stats;
513                                    $log->info("statistics of fields usage:\n$s");
514                                    print $report_fh "Statistics of fields usage:\n$s" if ($report_fh);
515                            }
516    
517                          $est->add(                          # close MARC file
518                                  id => $input->{name} . "/" . $mfn,                          $marc->finish if ($marc);
                                 ds => $ds,  
                                 type => $config->{hyperestraier}->{type},  
                         );  
519    
520                          $total_rows++;                          # close report
521                            close($report_fh) if ($report_fh)
522                  }                  }
523    
524          };          }
525    
526            eval { $indexer->finish } if ($indexer && $indexer->can('finish'));
527    
528          my $dt = time() - $start_t;          my $dt = time() - $start_t;
529          $log->info("$total_rows records indexed in " .          $log->info("$total_rows records ", $indexer ? "indexed " : "",
530                  sprintf("%.2f sec [%.2f rec/sec]",                  sprintf("in %.2f sec [%.2f rec/sec]",
531                          $dt, ($total_rows / $dt)                          $dt, ($total_rows / $dt)
532                  )                  )
533          );          );
534    
535          #  
536          # add Hyper Estraier links to other databases          # end forked process
537          #          if ($parallel) {
538          if (ref($db_config->{links}) eq 'ARRAY') {                  $log->info("parallel process $$ finished");
539                  foreach my $link (@{ $db_config->{links} }) {                  exit(0);
                         $log->info("adding link $database -> $link->{to} [$link->{credit}]");  
                         $est->add_link(  
                                 from => $database,  
                                 to => $link->{to},  
                                 credit => $link->{credit},  
                         );  
                 }  
540          }          }
541    
542  }  }
543    
544    if ($parallel) {
545            # wait all children to finish
546            sleep(1) while wait != -1;
547            $log->info("all parallel processes finished");
548    }
549    
550    #
551    # handle links or merge after indexing
552    #
553    
554    if ($merge) {
555            print $estcmd_fh 'sudo /etc/init.d/hyperestraier start',$/;
556            close($estcmd_fh);
557            chmod 0700, $estcmd_path || $log->warn("can't chmod 0700 $estcmd_path: $!");
558            system $estcmd_path;
559    } else {
560            foreach my $link (@links) {
561                    $log->logdie("coderef in link ", Dumper($link), " is ", ref($link), " and not CODE") unless (ref($link) eq 'CODE');
562                    $link->();
563            }
564    }

Legend:
Removed from v.389  
changed lines
  Added in v.710

  ViewVC Help
Powered by ViewVC 1.1.26