/[webpac2]/trunk/run.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/run.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 389 by dpavlin, Sun Jan 22 13:38:17 2006 UTC revision 769 by dpavlin, Fri Nov 3 20:21:09 2006 UTC
# Line 4  use strict; Line 4  use strict;
4    
5  use Cwd qw/abs_path/;  use Cwd qw/abs_path/;
6  use File::Temp qw/tempdir/;  use File::Temp qw/tempdir/;
 use Data::Dumper;  
7  use lib './lib';  use lib './lib';
8    
9  use WebPAC::Common 0.02;  use WebPAC::Common 0.02;
10  use WebPAC::Lookup;  use WebPAC::Parser 0.08;
11  use WebPAC::Input 0.03;  use WebPAC::Input 0.14;
12  use WebPAC::Store 0.03;  use WebPAC::Store 0.14;
13  use WebPAC::Normalize::XML;  use WebPAC::Normalize 0.22;
14  use WebPAC::Output::TT;  use WebPAC::Output::TT;
15  use WebPAC::Output::Estraier 0.08;  use WebPAC::Validate 0.06;
16  use YAML qw/LoadFile/;  use WebPAC::Output::MARC;
17    use WebPAC::Config;
18  use Getopt::Long;  use Getopt::Long;
19  use File::Path;  use File::Path;
20  use Time::HiRes qw/time/;  use Time::HiRes qw/time/;
21    use File::Slurp;
22    use Data::Dump qw/dump/;
23    use Storable qw/dclone/;
24    
25    use Proc::Queue size => 1;
26    use POSIX ":sys_wait_h"; # imports WNOHANG
27    
28  =head1 NAME  =head1 NAME
29    
# Line 41  limit loading to 100 records Line 47  limit loading to 100 records
47    
48  remove database and Hyper Estraier index before indexing  remove database and Hyper Estraier index before indexing
49    
50  =item --one=database_name  =item --only=database_name/input_filter
51    
52    reindex just single database (legacy name is --one)
53    
54  reindex just single database  C</input_filter> is optional part which can be C<name>
55    or C<type> from input
56    
57  =item --config conf/config.yml  =item --config conf/config.yml
58    
59  path to YAML configuration file  path to YAML configuration file
60    
61    =item --stats
62    
63    disable indexing, modify_* in configuration and dump statistics about field
64    and subfield usage for each input
65    
66    =item --validate path/to/validation_file
67    
68    turn on extra validation of imput records, see L<WebPAC::Validation>
69    
70    =item --marc-lint
71    
72    By default turned on if normalisation file has C<marc*> directives. You can disable lint
73    messages with C<--no-marc-lint>.
74    
75    =item --marc-dump
76    
77    Force dump or input and marc record for debugging.
78    
79    =item --parallel 4
80    
81    Run databases in parallel (aproximatly same as number of processors in
82    machine if you want to use full load)
83    
84    =item --only-links
85    
86    Create just links
87    
88    =item --merge
89    
90    Create merged index of databases which have links
91    
92  =back  =back
93    
94  =cut  =cut
# Line 57  my $offset; Line 97  my $offset;
97  my $limit;  my $limit;
98    
99  my $clean = 0;  my $clean = 0;
100  my $config = 'conf/config.yml';  my $config_path;
101  my $debug = 0;  my $debug = 0;
102  my $one_db_name;  my $only_filter;
103    my $stats = 0;
104    my $validate_path;
105    my $marc_lint = 1;
106    my $marc_dump = 0;
107    my $parallel = 0;
108    my $only_links = 0;
109    my $merge = 0;
110    
111    my $log = _new WebPAC::Common()->_get_logger();
112    
113  GetOptions(  GetOptions(
114          "limit=i" => \$limit,          "limit=i" => \$limit,
115          "offset=i" => \$offset,          "offset=i" => \$offset,
116          "clean" => \$clean,          "clean" => \$clean,
117          "one=s" => \$one_db_name,          "one=s" => \$only_filter,
118          "config" => \$config,          "only=s" => \$only_filter,
119          "debug" => \$debug,          "config" => \$config_path,
120            "debug+" => \$debug,
121            "stats" => \$stats,
122            "validate=s" => \$validate_path,
123            "marc-lint!" => \$marc_lint,
124            "marc-dump!" => \$marc_dump,
125            "parallel=i" => \$parallel,
126            "only-links!" => \$only_links,
127            "merge" => \$merge,
128  );  );
129    
130  $config = LoadFile($config);  my $config = new WebPAC::Config( path => $config_path );
131    
132    #print "config = ",dump($config) if ($debug);
133    
134    die "no databases in config file!\n" unless ($config->databases);
135    
136    $log->info( "-" x 79 );
137    
138    
139    my $estcmd_fh;
140    my $estcmd_path = './estcmd-merge.sh';
141    if ($merge) {
142            open($estcmd_fh, '>', $estcmd_path) || $log->logdie("can't open $estcmd_path: $!");
143            print $estcmd_fh 'cd /data/estraier/_node/ || exit 1',$/;
144            print $estcmd_fh 'sudo /etc/init.d/hyperestraier stop',$/;
145            $log->info("created merge batch file $estcmd_path");
146    }
147    
148    
149  print "config = ",Dumper($config) if ($debug);  my $validate;
150    $validate = new WebPAC::Validate(
151            path => $validate_path,
152    ) if ($validate_path);
153    
154  die "no databases in config file!\n" unless ($config->{databases});  
155    my $use_indexer = $config->use_indexer;
156    $stats ||= $validate;
157    if ($stats) {
158            $log->debug("disabled indexing for stats collection");
159            $use_indexer = undef;
160    } else {
161            $log->info("using $use_indexer indexing engine...");
162    }
163    
164    # parse normalize files and create source files for lookup and normalization
165    
166    my $parser = new WebPAC::Parser( config => $config );
167    
168  my $total_rows = 0;  my $total_rows = 0;
169  my $start_t = time();  my $start_t = time();
170    
171  while (my ($database, $db_config) = each %{ $config->{databases} }) {  my @links;
172    
173          next if ($one_db_name && $database !~ m/$one_db_name/i);  if ($parallel) {
174            $log->info("Using $parallel processes for speedup");
175            Proc::Queue::size($parallel);
176    }
177    
178    sub create_ds_config {
179            my ($db_config, $database, $input, $mfn) = @_;
180            my $c = dclone( $db_config );
181            $c->{_} = $database || $log->logconfess("need database");
182            $c->{_mfn} = $mfn || $log->logconfess("need mfn");
183            $c->{input} = $input || $log->logconfess("need input");
184            return $c;
185    }
186    
187    while (my ($database, $db_config) = each %{ $config->databases }) {
188    
189            my ($only_database,$only_input) = split(m#/#, $only_filter) if ($only_filter);
190            next if ($only_database && $database !~ m/$only_database/i);
191    
192            if ($parallel) {
193                    my $f=fork;
194                    if(defined ($f) and $f==0) {
195                            $log->info("Created processes $$ for speedup");
196                    } else {
197                            next;
198                    }
199            }
200    
201            my $indexer;
202            if ($use_indexer && $parser->have_rules( 'search', $database )) {
203    
204                    my $cfg_name = $use_indexer;
205                    $cfg_name =~ s/\-.*$//;
206    
207                    my $indexer_config = $config->get( $cfg_name ) || $log->logdie("can't find '$cfg_name' part in confguration");
208                    $indexer_config->{database} = $database;
209                    $indexer_config->{clean} = $clean;
210                    $indexer_config->{label} = $db_config->{name};
211    
212                    # force clean if database has links
213                    $indexer_config->{clean} = 1 if ($db_config->{links});
214    
215                    if ($use_indexer eq 'hyperestraier') {
216    
217                            # open Hyper Estraier database
218                            use WebPAC::Output::Estraier '0.10';
219                            $indexer = new WebPAC::Output::Estraier( %{ $indexer_config } );
220                    
221                    } elsif ($use_indexer eq 'hyperestraier-native') {
222    
223                            # open Hyper Estraier database
224                            use WebPAC::Output::EstraierNative;
225                            $indexer = new WebPAC::Output::EstraierNative( %{ $indexer_config } );
226    
227                    } elsif ($use_indexer eq 'kinosearch') {
228    
229                            # open KinoSearch
230                            use WebPAC::Output::KinoSearch;
231                            $indexer_config->{clean} = 1 unless (-e $indexer_config->{index_path});
232                            $indexer = new WebPAC::Output::KinoSearch( %{ $indexer_config } );
233    
234                    } else {
235                            $log->logdie("unknown use_indexer: $use_indexer");
236                    }
237    
238                    $log->logide("can't continue without valid indexer") unless ($indexer);
239            }
240    
         my $log = _new WebPAC::Common()->_get_logger();  
241    
242          #          #
243          # open Hyper Estraier database          # store Hyper Estraier links to other databases
244          #          #
245            if (ref($db_config->{links}) eq 'ARRAY' && $use_indexer) {
246                    foreach my $link (@{ $db_config->{links} }) {
247                            if ($use_indexer eq 'hyperestraier') {
248                                    if ($merge) {
249                                            print $estcmd_fh 'sudo -u www-data estcmd merge ' . $database . ' ' . $link->{to},$/;
250                                    } else {
251                                            $log->info("saving link $database -> $link->{to} [$link->{credit}]");
252                                            push @links, sub {
253                                                    $log->info("adding link $database -> $link->{to} [$link->{credit}]");
254                                                    $indexer->add_link(
255                                                            from => $database,
256                                                            to => $link->{to},
257                                                            credit => $link->{credit},
258                                                    );
259                                            };
260                                    }
261                            } else {
262                                    $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");
263                            }
264                    }
265            }
266            next if ($only_links);
267    
         my $est_config = $config->{hyperestraier} || $log->logdie("can't find 'hyperestraier' part in confguration");  
         $est_config->{database} = $database;  
         $est_config->{clean} = $clean;  
   
         my $est = new WebPAC::Output::Estraier( %{ $est_config } );  
268    
269          #          #
270          # now WebPAC::Store          # now WebPAC::Store
# Line 101  while (my ($database, $db_config) = each Line 272  while (my ($database, $db_config) = each
272          my $abs_path = abs_path($0);          my $abs_path = abs_path($0);
273          $abs_path =~ s#/[^/]*$#/#;          $abs_path =~ s#/[^/]*$#/#;
274    
275          my $db_path = $config->{webpac}->{db_path} . '/' . $database;          my $db_path = $config->webpac('db_path');
276    
277          if ($clean) {          if ($clean) {
278                  $log->info("creating new database $database in $db_path");                  $log->info("creating new database '$database' in $db_path");
279                  rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");                  rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");
280          } else {          } else {
281                  $log->info("working on $database in $db_path");                  $log->info("working on database '$database' in $db_path");
282          }          }
283    
284          my $db = new WebPAC::Store(          my $store = new WebPAC::Store(
285                  path => $db_path,                  path => $db_path,
                 database => $database,  
286                  debug => $debug,                  debug => $debug,
287          );          );
288    
# Line 130  while (my ($database, $db_config) = each Line 300  while (my ($database, $db_config) = each
300                  $log->info("database $database doesn't have inputs defined");                  $log->info("database $database doesn't have inputs defined");
301          }          }
302    
         my @supported_inputs = keys %{ $config->{webpac}->{inputs} };  
   
303          foreach my $input (@inputs) {          foreach my $input (@inputs) {
304    
305                    my $input_name = $input->{name} || $log->logdie("input without a name isn't valid: ",dump($input));
306    
307                    next if ($only_input && ($input_name !~ m#$only_input#i && $input->{type} !~ m#$only_input#i));
308    
309                  my $type = lc($input->{type});                  my $type = lc($input->{type});
310    
311                  die "I know only how to handle input types ", join(",", @supported_inputs), " not '$type'!\n" unless (grep(/$type/, @supported_inputs));                  die "I know only how to handle input types ", join(",", $config->webpac('inputs') ), " not '$type'!\n" unless (grep(/$type/, $config->webpac('inputs')));
312    
313                  my $lookup = new WebPAC::Lookup(                  my $input_module = $config->webpac('inputs')->{$type};
                         lookup_file => $input->{lookup},  
                 );  
314    
315                  my $input_module = $config->{webpac}->{inputs}->{$type};                  my @lookups = $parser->have_lookup_create($database, $input);
316    
317                  $log->info("working on input $input->{path} [$input->{type}] using $input_module");                  $log->info("working on input '$input_name' in $input->{path} [type: $input->{type}] using $input_module",
318                            @lookups ? " creating lookups: ".join(", ", @lookups) : ""
319                    );
320    
321                    if ($stats) {
322                            # disable modification of records if --stats is in use
323                            delete($input->{modify_records});
324                            delete($input->{modify_file});
325                    }
326    
327                  my $input_db = new WebPAC::Input(                  my $input_db = new WebPAC::Input(
328                          module => $input_module,                          module => $input_module,
329                          code_page => $config->{webpac}->{webpac_encoding},                          encoding => $config->webpac('webpac_encoding'),
330                          limit => $limit || $input->{limit},                          limit => $limit || $input->{limit},
331                          offset => $offset,                          offset => $offset,
332                          lookup => $lookup,                          recode => $input->{recode},
333                            stats => $stats,
334                            modify_records => $input->{modify_records},
335                            modify_file => $input->{modify_file},
336                  );                  );
337                  $log->logdie("can't create input using $input_module") unless ($input);                  $log->logdie("can't create input using $input_module") unless ($input);
338    
339                    if (defined( $input->{lookup} )) {
340                            $log->warn("$database/$input_name has depriciated lookup definition, removing it...");
341                            delete( $input->{lookup} );
342                    }
343    
344                    my $lookup_coderef;
345    
346                    if (@lookups) {
347    
348                            my $rules = $parser->lookup_create_rules($database, $input) || $log->logdie("no rules found for $database/$input");
349    
350                            $lookup_coderef = sub {
351                                    my $rec = shift || die "need rec!";
352                                    my $mfn = $rec->{'000'}->[0] || die "need mfn in 000";
353    
354                                    WebPAC::Normalize::data_structure(
355                                            row => $rec,
356                                            rules => $rules,
357                                            config => create_ds_config( $db_config, $database, $input, $mfn ),
358                                    );
359    
360                                    #warn "current lookup: ", dump(WebPAC::Normalize::_get_lookup());
361                            };
362    
363                            WebPAC::Normalize::_set_lookup( undef );
364    
365                            $log->debug("created lookup_coderef using:\n$rules");
366    
367                    };
368    
369                    my $lookup_jar;
370    
371                  my $maxmfn = $input_db->open(                  my $maxmfn = $input_db->open(
372                          path => $input->{path},                          path => $input->{path},
373                          code_page => $input->{encoding},        # database encoding                          code_page => $input->{encoding},        # database encoding
374                  );                          lookup_coderef => $lookup_coderef,
375                            lookup => $lookup_jar,
376                            %{ $input },
377                            load_row => sub {
378                                    my $a = shift;
379                                    return $store->load_row(
380                                            database => $database,
381                                            input => $input_name,
382                                            id => $a->{id},
383                                    );
384                            },
385                            save_row => sub {
386                                    my $a = shift;
387                                    return $store->save_row(
388                                            database => $database,
389                                            input => $input_name,
390                                            id => $a->{id},
391                                            row => $a->{row},
392                                    );
393                            },
394    
                 my $n = new WebPAC::Normalize::XML(  
                 #       filter => { 'foo' => sub { shift } },  
                         db => $db,  
                         lookup_regex => $lookup->regex,  
                         lookup => $lookup,  
                         prefix => $input->{name},  
395                  );                  );
396    
397                  my $normalize_path = $input->{normalize}->{path};                  my $lookup_data = WebPAC::Normalize::_get_lookup();
398    
399                  if ($normalize_path =~ m/\.xml$/i) {                  if (defined( $lookup_data->{$database}->{$input_name} )) {
400                          $n->open(                          $log->debug("created following lookups: ", sub { dump( $lookup_data ) } );
401                                  tag => $input->{normalize}->{tag},  
402                                  xml_file => $input->{normalize}->{path},                          foreach my $key (keys %{ $lookup_data->{$database}->{$input_name} }) {
403                          );                                  $store->save_lookup(
404                  } elsif ($normalize_path =~ m/\.(?:yml|yaml)$/i) {                                          database => $database,
405                          $n->open_yaml(                                          input => $input_name,
406                                  path => $normalize_path,                                          key => $key,
407                                  tag => $input->{normalize}->{tag},                                          data => $lookup_data->{$database}->{$input_name}->{$key},
408                                    );
409                            }
410                    }
411    
412                    my $report_fh;
413                    if ($stats || $validate) {
414                            my $path = "out/report/${database}-${input_name}.txt";
415                            open($report_fh, '>', $path) || $log->logdie("can't open $path: $!");
416    
417                            print $report_fh "Report for database '$database' input '$input_name' records ",
418                                    $offset || 1, "-", $limit || $input->{limit} || $maxmfn, "\n\n";
419                            $log->info("Generating report file $path");
420                    }
421    
422                    my $marc;
423                    if ($parser->have_rules( 'marc', $database, $input_name )) {
424                            $marc = new WebPAC::Output::MARC(
425                                    path => "out/marc/${database}-${input_name}.marc",
426                                    lint => $marc_lint,
427                                    dump => $marc_dump,
428                          );                          );
429                  }                  }
430    
431                    my $rules = $parser->normalize_rules($database,$input_name) || $log->logdie("no normalize rules found for $database/$input_name");
432                    $log->debug("parsed normalize rules:\n$rules");
433    
434                    # reset position in database
435                    $input_db->seek(1);
436    
437                    # generate name of config key for indexer (strip everything after -)
438                    my $indexer_config = $use_indexer;
439                    $indexer_config =~ s/^(\w+)-?.*$/$1/g if ($indexer_config);
440    
441                    my $lookup_hash;
442                    my $depends = $parser->depends($database,$input_name);
443            
444                    if ($depends) {
445                            $log->debug("$database/$input_name depends on: ", dump($depends)) if ($depends);
446                            $log->logdie("parser->depends didn't return HASH") unless (ref($depends) eq 'HASH');
447    
448                            foreach my $db (keys %$depends) {
449                                    foreach my $i (keys %{$depends->{$db}}) {
450                                            foreach my $k (keys %{$depends->{$db}->{$i}}) {
451                                                    my $t = time();
452                                                    $log->debug("loading lookup $db/$i");
453                                                    $lookup_hash->{$db}->{$i}->{$k} = $store->load_lookup(
454                                                            database => $db,
455                                                            input => $i,
456                                                            key => $k,
457                                                    );
458                                                    $log->debug(sprintf("lookup $db/$i took %.2fs", time() - $t));
459                                            }
460                                    }
461                            }
462    
463                            $log->debug("lookup_hash = ", sub { dump( $lookup_hash ) });
464                    }
465    
466    
467                  foreach my $pos ( 0 ... $input_db->size ) {                  foreach my $pos ( 0 ... $input_db->size ) {
468    
469                          my $row = $input_db->fetch || next;                          my $row = $input_db->fetch || next;
470    
471                            $total_rows++;
472    
473                          my $mfn = $row->{'000'}->[0];                          my $mfn = $row->{'000'}->[0];
474    
475                          if (! $mfn || $mfn !~ m#^\d+$#) {                          if (! $mfn || $mfn !~ m#^\d+$#) {
# Line 194  while (my ($database, $db_config) = each Line 478  while (my ($database, $db_config) = each
478                                  push @{ $row->{'000'} }, $pos;                                  push @{ $row->{'000'} }, $pos;
479                          }                          }
480    
                         my $ds = $n->data_structure($row);  
481    
482                          $est->add(                          if ($validate) {
483                                  id => $input->{name} . "/" . $mfn,                                  if ( my $errors = $validate->validate_rec( $row, $input_db->dump ) ) {
484                                  ds => $ds,                                          $log->error( "MFN $mfn validation error:\n",
485                                  type => $config->{hyperestraier}->{type},                                                  $validate->report_error( $errors )
486                                            );
487                                    }
488                                    next;   # validation doesn't create any output
489                            }
490    
491                            my $ds = WebPAC::Normalize::data_structure(
492                                    row => $row,
493                                    rules => $rules,
494                                    lookup => $lookup_hash,
495                                    config => create_ds_config( $db_config, $database, $input, $mfn ),
496                                    marc_encoding => 'utf-8',
497                                    load_row_coderef => sub {
498                                            my ($database,$input,$mfn) = @_;
499                                            return $store->load_row(
500                                                    database => $database,
501                                                    input => $input,
502                                                    id => $mfn,
503                                            );
504                                    },
505                          );                          );
506    
507                          $total_rows++;                          $log->debug("ds = ", sub { dump($ds) }) if ($ds);
508    
509                            $store->save_ds(
510                                    database => $database,
511                                    input => $input_name,
512                                    id => $mfn,
513                                    ds => $ds,
514                            ) if ($ds && !$stats);
515    
516                            $indexer->add(
517                                    id => "${input_name}/${mfn}",
518                                    ds => $ds,
519                                    type => $config->get($indexer_config)->{type},
520                            ) if ($indexer && $ds);
521    
522                            if ($marc) {
523                                    my $i = 0;
524    
525                                    while (my $fields = WebPAC::Normalize::_get_marc_fields( fetch_next => 1 ) ) {
526                                            $marc->add(
527                                                    id => $mfn . ( $i ? "/$i" : '' ),
528                                                    fields => $fields,
529                                                    leader => WebPAC::Normalize::marc_leader(),
530                                                    row => $row,
531                                            );
532                                            $i++;
533                                    }
534    
535                                    $log->info("Created $i instances of MFN $mfn\n") if ($i > 1);
536                            }
537                    }
538    
539                    if ($validate) {
540                            my $errors = $validate->report;
541                            if ($errors) {
542                                    $log->info("validation errors:\n$errors\n" );
543                                    print $report_fh "$errors\n" if ($report_fh);
544                            }
545                    }
546    
547                    if ($stats) {
548                            my $s = $input_db->stats;
549                            $log->info("statistics of fields usage:\n$s");
550                            print $report_fh "Statistics of fields usage:\n$s" if ($report_fh);
551                  }                  }
552    
553          };                  # close MARC file
554                    $marc->finish if ($marc);
555    
556                    # close report
557                    close($report_fh) if ($report_fh)
558    
559            }
560    
561            eval { $indexer->finish } if ($indexer && $indexer->can('finish'));
562    
563          my $dt = time() - $start_t;          my $dt = time() - $start_t;
564          $log->info("$total_rows records indexed in " .          $log->info("$total_rows records ", $indexer ? "indexed " : "",
565                  sprintf("%.2f sec [%.2f rec/sec]",                  sprintf("in %.2f sec [%.2f rec/sec]",
566                          $dt, ($total_rows / $dt)                          $dt, ($total_rows / $dt)
567                  )                  )
568          );          );
569    
570          #  
571          # add Hyper Estraier links to other databases          # end forked process
572          #          if ($parallel) {
573          if (ref($db_config->{links}) eq 'ARRAY') {                  $log->info("parallel process $$ finished");
574                  foreach my $link (@{ $db_config->{links} }) {                  exit(0);
                         $log->info("adding link $database -> $link->{to} [$link->{credit}]");  
                         $est->add_link(  
                                 from => $database,  
                                 to => $link->{to},  
                                 credit => $link->{credit},  
                         );  
                 }  
575          }          }
576    
577  }  }
578    
579    if ($parallel) {
580            # wait all children to finish
581            sleep(1) while wait != -1;
582            $log->info("all parallel processes finished");
583    }
584    
585    #
586    # handle links or merge after indexing
587    #
588    
589    if ($merge) {
590            print $estcmd_fh 'sudo /etc/init.d/hyperestraier start',$/;
591            close($estcmd_fh);
592            chmod 0700, $estcmd_path || $log->warn("can't chmod 0700 $estcmd_path: $!");
593            system $estcmd_path;
594    } else {
595            foreach my $link (@links) {
596                    $log->logdie("coderef in link ", Dumper($link), " is ", ref($link), " and not CODE") unless (ref($link) eq 'CODE');
597                    $link->();
598            }
599    }

Legend:
Removed from v.389  
changed lines
  Added in v.769

  ViewVC Help
Powered by ViewVC 1.1.26