/[webpac2]/trunk/run.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/run.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 493 by dpavlin, Sun May 14 13:42:48 2006 UTC revision 611 by dpavlin, Wed Aug 23 10:04:08 2006 UTC
# Line 4  use strict; Line 4  use strict;
4    
5  use Cwd qw/abs_path/;  use Cwd qw/abs_path/;
6  use File::Temp qw/tempdir/;  use File::Temp qw/tempdir/;
 use Data::Dumper;  
7  use lib './lib';  use lib './lib';
8    
9  use WebPAC::Common 0.02;  use WebPAC::Common 0.02;
10  use WebPAC::Lookup;  use WebPAC::Lookup 0.03;
11  use WebPAC::Input 0.03;  use WebPAC::Input 0.07;
12  use WebPAC::Store 0.03;  use WebPAC::Store 0.03;
13  use WebPAC::Normalize::XML;  use WebPAC::Normalize 0.11;
 use WebPAC::Normalize::Set;  
14  use WebPAC::Output::TT;  use WebPAC::Output::TT;
15    use WebPAC::Validate;
16    use WebPAC::Output::MARC;
17  use YAML qw/LoadFile/;  use YAML qw/LoadFile/;
18  use Getopt::Long;  use Getopt::Long;
19  use File::Path;  use File::Path;
20  use Time::HiRes qw/time/;  use Time::HiRes qw/time/;
21  use File::Slurp;  use File::Slurp;
22    use Data::Dump qw/dump/;
23    use Storable qw/dclone/;
24    
25    use Proc::Queue size => 1;
26    use POSIX ":sys_wait_h"; # imports WNOHANG
27    
28  =head1 NAME  =head1 NAME
29    
# Line 42  limit loading to 100 records Line 47  limit loading to 100 records
47    
48  remove database and Hyper Estraier index before indexing  remove database and Hyper Estraier index before indexing
49    
50  =item --only=database_name  =item --only=database_name/input_filter
51    
52  reindex just single database (legacy name is --one)  reindex just single database (legacy name is --one)
53    
54    C</input_filter> is optional part which can be C<name>
55    or C<type> from input
56    
57  =item --config conf/config.yml  =item --config conf/config.yml
58    
59  path to YAML configuration file  path to YAML configuration file
60    
61  =item --force-set  =item --stats
62    
63    disable indexing and dump statistics about field and subfield
64    usage for each input
65    
66    =item --validate path/to/validation_file
67    
68    turn on extra validation of imput records, see L<WebPAC::Validation>
69    
70    =item --marc-normalize conf/normalize/mapping.pl
71    
72    This option specifies normalisation file for MARC creation
73    
74    =item --marc-output out/marc/test.marc
75    
76    Optional path to output file
77    
78    =item --marc-lint
79    
80    By default turned on if C<--marc-normalize> is used. You can disable lint
81    messages with C<--no-marc-lint>.
82    
83    =item --marc-dump
84    
85  force conversion C<normalize->path> in C<config.yml> from  Force dump or input and marc record for debugging.
86  C<.xml> to C<.pl>  
87    =item --parallel 4
88    
89    Run databases in parallel (aproximatly same as number of processors in
90    machine if you want to use full load)
91    
92    =item --only-links
93    
94    Create just links
95    
96    =item --merge
97    
98    Create merged index of databases which have links
99    
100  =back  =back
101    
# Line 65  my $limit; Line 107  my $limit;
107  my $clean = 0;  my $clean = 0;
108  my $config = 'conf/config.yml';  my $config = 'conf/config.yml';
109  my $debug = 0;  my $debug = 0;
110  my $only_db_name;  my $only_filter;
111  my $force_set = 0;  my $stats = 0;
112    my $validate_path;
113    my ($marc_normalize, $marc_output);
114    my $marc_lint = 1;
115    my $marc_dump = 0;
116    my $parallel = 0;
117    my $only_links = 0;
118    my $merge = 0;
119    
120    my $log = _new WebPAC::Common()->_get_logger();
121    
122    my $hostname = `hostname`;
123    chomp($hostname);
124    $hostname =~ s/\..+$//;
125    if (-e "conf/$hostname.yml") {
126            $config = "conf/$hostname.yml";
127            $log->info("using host configuration file: $config");
128    }
129    
130  GetOptions(  GetOptions(
131          "limit=i" => \$limit,          "limit=i" => \$limit,
132          "offset=i" => \$offset,          "offset=i" => \$offset,
133          "clean" => \$clean,          "clean" => \$clean,
134          "one=s" => \$only_db_name,          "one=s" => \$only_filter,
135          "only=s" => \$only_db_name,          "only=s" => \$only_filter,
136          "config" => \$config,          "config" => \$config,
137          "debug" => \$debug,          "debug+" => \$debug,
138          "force-set" => \$force_set,          "stats" => \$stats,
139            "validate=s" => \$validate_path,
140            "marc-normalize=s" => \$marc_normalize,
141            "marc-output=s" => \$marc_output,
142            "marc-lint!" => \$marc_lint,
143            "marc-dump!" => \$marc_dump,
144            "parallel=i" => \$parallel,
145            "only-links!" => \$only_links,
146            "merge" => \$merge,
147  );  );
148    
149  $config = LoadFile($config);  $config = LoadFile($config);
150    
151  print "config = ",Dumper($config) if ($debug);  #print "config = ",dump($config) if ($debug);
152    
153  die "no databases in config file!\n" unless ($config->{databases});  die "no databases in config file!\n" unless ($config->{databases});
154    
155  my $log = _new WebPAC::Common()->_get_logger();  $log->info( "-" x 79 );
156    
157    
158    my $estcmd_fh;
159    my $estcmd_path = './estcmd-merge.sh';
160    if ($merge) {
161            open($estcmd_fh, '>', $estcmd_path) || $log->logdie("can't open $estcmd_path: $!");
162            print $estcmd_fh 'cd /data/estraier/_node/ || exit 1',$/;
163            print $estcmd_fh 'sudo /etc/init.d/hyperestraier stop',$/;
164            $log->info("created merge batch file $estcmd_path");
165    }
166    
167    
168    my $validate;
169    $validate = new WebPAC::Validate(
170            path => $validate_path,
171    ) if ($validate_path);
172    
173    
174  my $use_indexer = $config->{use_indexer} || 'hyperestraier';  my $use_indexer = $config->{use_indexer} || 'hyperestraier';
175  $log->info("using $use_indexer indexing engine...");  if ($stats) {
176            $log->debug("option --stats disables update of indexing engine...");
177            $use_indexer = undef;
178    } else {
179            $log->info("using $use_indexer indexing engine...");
180    }
181    
182    # disable indexing when creating marc
183    $use_indexer = undef if ($marc_normalize);
184    
185  my $total_rows = 0;  my $total_rows = 0;
186  my $start_t = time();  my $start_t = time();
187    
188    my @links;
189    
190    if ($parallel) {
191            $log->info("Using $parallel processes for speedup");
192            Proc::Queue::size($parallel);
193    }
194    
195  while (my ($database, $db_config) = each %{ $config->{databases} }) {  while (my ($database, $db_config) = each %{ $config->{databases} }) {
196    
197          next if ($only_db_name && $database !~ m/$only_db_name/i);          my ($only_database,$only_input) = split(m#/#, $only_filter) if ($only_filter);
198            next if ($only_database && $database !~ m/$only_database/i);
199    
200            if ($parallel) {
201                    my $f=fork;
202                    if(defined ($f) and $f==0) {
203                            $log->info("Created processes $$ for speedup");
204                    } else {
205                            next;
206                    }
207            }
208    
209          my $indexer;          my $indexer;
210            if ($use_indexer) {
211                    my $indexer_config = $config->{$use_indexer} || $log->logdie("can't find '$use_indexer' part in confguration");
212                    $indexer_config->{database} = $database;
213                    $indexer_config->{clean} = $clean;
214                    $indexer_config->{label} = $db_config->{name};
215    
216                    # force clean if database has links
217                    $indexer_config->{clean} = 1 if ($db_config->{links});
218    
219                    if ($use_indexer eq 'hyperestraier') {
220    
221                            # open Hyper Estraier database
222                            use WebPAC::Output::Estraier '0.10';
223                            $indexer = new WebPAC::Output::Estraier( %{ $indexer_config } );
224                    
225                    } elsif ($use_indexer eq 'kinosearch') {
226    
227                            # open KinoSearch
228                            use WebPAC::Output::KinoSearch;
229                            $indexer_config->{clean} = 1 unless (-e $indexer_config->{index_path});
230                            $indexer = new WebPAC::Output::KinoSearch( %{ $indexer_config } );
231    
232          my $indexer_config = $config->{$use_indexer} || $log->logdie("can't find '$use_indexer' part in confguration");                  } else {
233          $indexer_config->{database} = $database;                          $log->logdie("unknown use_indexer: $use_indexer");
234          $indexer_config->{clean} = $clean;                  }
         $indexer_config->{label} = $db_config->{name};  
   
         # important: clean database just once!  
         $clean = 0;  
   
         if ($use_indexer eq 'hyperestraier') {  
   
                 # open Hyper Estraier database  
                 use WebPAC::Output::Estraier '0.10';  
                 $indexer = new WebPAC::Output::Estraier( %{ $indexer_config } );  
           
         } elsif ($use_indexer eq 'kinosearch') {  
   
                 # open KinoSearch  
                 use WebPAC::Output::KinoSearch;  
                 $indexer_config->{clean} = 1 unless (-e $indexer_config->{index_path});  
                 $indexer = new WebPAC::Output::KinoSearch( %{ $indexer_config } );  
235    
236          } else {                  $log->logide("can't continue without valid indexer") unless ($indexer);
237                  $log->logdie("unknown use_indexer: $use_indexer");          }
238    
239    
240            #
241            # store Hyper Estraier links to other databases
242            #
243            if (ref($db_config->{links}) eq 'ARRAY' && $use_indexer) {
244                    foreach my $link (@{ $db_config->{links} }) {
245                            if ($use_indexer eq 'hyperestraier') {
246                                    if ($merge) {
247                                            print $estcmd_fh 'sudo -u www-data estcmd merge ' . $database . ' ' . $link->{to},$/;
248                                    } else {
249                                            $log->info("saving link $database -> $link->{to} [$link->{credit}]");
250                                            push @links, sub {
251                                                    $log->info("adding link $database -> $link->{to} [$link->{credit}]");
252                                                    $indexer->add_link(
253                                                            from => $database,
254                                                            to => $link->{to},
255                                                            credit => $link->{credit},
256                                                    );
257                                            };
258                                    }
259                            } else {
260                                    $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");
261                            }
262                    }
263          }          }
264            next if ($only_links);
265    
         $log->logide("can't continue without valid indexer") unless ($indexer);  
266    
267          #          #
268          # now WebPAC::Store          # now WebPAC::Store
# Line 135  while (my ($database, $db_config) = each Line 273  while (my ($database, $db_config) = each
273          my $db_path = $config->{webpac}->{db_path} . '/' . $database;          my $db_path = $config->{webpac}->{db_path} . '/' . $database;
274    
275          if ($clean) {          if ($clean) {
276                  $log->info("creating new database $database in $db_path");                  $log->info("creating new database '$database' in $db_path");
277                  rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");                  rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");
278          } else {          } else {
279                  $log->debug("working on $database in $db_path");                  $log->info("working on database '$database' in $db_path");
280          }          }
281    
282          my $db = new WebPAC::Store(          my $db = new WebPAC::Store(
# Line 165  while (my ($database, $db_config) = each Line 303  while (my ($database, $db_config) = each
303    
304          foreach my $input (@inputs) {          foreach my $input (@inputs) {
305    
306                    next if ($only_input && ($input->{name} !~ m#$only_input#i && $input->{type} !~ m#$only_input#i));
307    
308                  my $type = lc($input->{type});                  my $type = lc($input->{type});
309    
310                  die "I know only how to handle input types ", join(",", @supported_inputs), " not '$type'!\n" unless (grep(/$type/, @supported_inputs));                  die "I know only how to handle input types ", join(",", @supported_inputs), " not '$type'!\n" unless (grep(/$type/, @supported_inputs));
311    
312                  my $lookup = new WebPAC::Lookup(                  my $lookup;
313                          lookup_file => $input->{lookup},                  if ($input->{lookup}) {
314                  );                          $lookup = new WebPAC::Lookup(
315                                    lookup_file => $input->{lookup},
316                            );
317                            delete( $input->{lookup} );
318                    }
319    
320                  my $input_module = $config->{webpac}->{inputs}->{$type};                  my $input_module = $config->{webpac}->{inputs}->{$type};
321    
322                  $log->info("working on input '$input->{path}' [$input->{type}] using $input_module lookup '$input->{lookup}'");                  $log->info("working on input '$input->{name}' in $input->{path} [type: $input->{type}] using $input_module",
323                            $input->{lookup} ? "lookup '$input->{lookup}'" : ""
324                    );
325    
326                  my $input_db = new WebPAC::Input(                  my $input_db = new WebPAC::Input(
327                          module => $input_module,                          module => $input_module,
328                          code_page => $config->{webpac}->{webpac_encoding},                          encoding => $config->{webpac}->{webpac_encoding},
329                          limit => $limit || $input->{limit},                          limit => $limit || $input->{limit},
330                          offset => $offset,                          offset => $offset,
331                          lookup => $lookup,                          lookup_coderef => sub {
332                                    my $rec = shift || return;
333                                    $lookup->add( $rec );
334                            },
335                          recode => $input->{recode},                          recode => $input->{recode},
336                            stats => $stats,
337                            modify_records => $input->{modify_records},
338                  );                  );
339                  $log->logdie("can't create input using $input_module") unless ($input);                  $log->logdie("can't create input using $input_module") unless ($input);
340    
341                  my $maxmfn = $input_db->open(                  my $maxmfn = $input_db->open(
342                          path => $input->{path},                          path => $input->{path},
343                          code_page => $input->{encoding},        # database encoding                          code_page => $input->{encoding},        # database encoding
344                            %{ $input },
345                  );                  );
346    
347                  my $n = new WebPAC::Normalize::XML(                  my @norm_array = ref($input->{normalize}) eq 'ARRAY' ?
348                  #       filter => { 'foo' => sub { shift } },                          @{ $input->{normalize} } : ( $input->{normalize} );
                         db => $db,  
                         lookup_regex => $lookup->regex,  
                         lookup => $lookup,  
                         prefix => $input->{name},  
                 );  
   
                 my $rules;  
                 my $normalize_path = $input->{normalize}->{path};  
349    
350                  if ($force_set) {                  if ($marc_normalize) {
351                          my $new_norm_path = $normalize_path;                          @norm_array = ( {
352                          $new_norm_path =~ s/\.xml$/.pl/;                                  path => $marc_normalize,
353                          if (-e $new_norm_path) {                                  output => $marc_output || 'out/marc/' . $database . '-' . $input->{name} . '.marc',
354                                  $log->debug("--force-set replaced $normalize_path with $new_norm_path");                          } );
                                 $normalize_path = $new_norm_path;  
                         } else {  
                                 $log->debug("--force-set failed on $new_norm_path, fallback to $normalize_path");  
                         }  
355                  }                  }
356    
357                  if ($normalize_path =~ m/\.xml$/i) {                  foreach my $normalize (@norm_array) {
                         $n->open(  
                                 tag => $input->{normalize}->{tag},  
                                 xml_file => $normalize_path,  
                         );  
                 } elsif ($normalize_path =~ m/\.(?:yml|yaml)$/i) {  
                         $n->open_yaml(  
                                 path => $normalize_path,  
                                 tag => $input->{normalize}->{tag},  
                         );  
                 } elsif ($normalize_path =~ m/\.(?:pl)$/i) {  
                         $n = undef;  
                         $log->info("using WebPAC::Normalize::Set to process $normalize_path");  
                         $rules = read_file( $normalize_path ) or die "can't open $normalize_path: $!";  
                 }  
358    
359                  foreach my $pos ( 0 ... $input_db->size ) {                          my $normalize_path = $normalize->{path} || $log->logdie("can't find normalize path in config");
360    
361                          my $row = $input_db->fetch || next;                          $log->logdie("Found '$normalize_path' as normalization file which isn't supported any more!") unless ( $normalize_path =~ m!\.pl$!i );
362    
363                          my $mfn = $row->{'000'}->[0];                          my $rules = read_file( $normalize_path ) or die "can't open $normalize_path: $!";
364    
365                          if (! $mfn || $mfn !~ m#^\d+$#) {                          $log->info("Using $normalize_path for normalization...");
366                                  $log->warn("record $pos doesn't have valid MFN but '$mfn', using $pos");  
367                                  $mfn = $pos;                          my $marc = new WebPAC::Output::MARC(
368                                  push @{ $row->{'000'} }, $pos;                                  path => $normalize->{output},
369                          }                                  lint => $marc_lint,
370                                    dump => $marc_dump,
371                            ) if ($normalize->{output});
372    
373                            # reset position in database
374                            $input_db->seek(1);
375    
376                            foreach my $pos ( 0 ... $input_db->size ) {
377    
378                                    my $row = $input_db->fetch || next;
379    
380                                    my $mfn = $row->{'000'}->[0];
381    
382                                    if (! $mfn || $mfn !~ m#^\d+$#) {
383                                            $log->warn("record $pos doesn't have valid MFN but '$mfn', using $pos");
384                                            $mfn = $pos;
385                                            push @{ $row->{'000'} }, $pos;
386                                    }
387    
388    
389                                    if ($validate) {
390                                            my @errors = $validate->validate_errors( $row );
391                                            $log->error( "MFN $mfn validation errors:\n", join("\n", @errors) ) if (@errors);
392                                    }
393    
394                                    my $ds_config = dclone($db_config);
395    
396                          my $ds = $n ? $n->data_structure($row) :                                  # default values -> database key
397                                  WebPAC::Normalize::Set::data_structure(                                  $ds_config->{_} = $database;
398    
399                                    # current mfn
400                                    $ds_config->{_mfn} = $mfn;
401    
402                                    # attach current input
403                                    $ds_config->{input} = $input;
404    
405                                    my $ds = WebPAC::Normalize::data_structure(
406                                          row => $row,                                          row => $row,
407                                          rules => $rules,                                          rules => $rules,
408                                          lookup => $lookup->lookup_hash,                                          lookup => $lookup ? $lookup->lookup_hash : undef,
409                                            config => $ds_config,
410                                            marc_encoding => 'utf-8',
411                                  );                                  );
412    
413                          $indexer->add(                                  $db->save_ds(
414                                  id => $input->{name} . "/" . $mfn,                                          id => $mfn,
415                                  ds => $ds,                                          ds => $ds,
416                                  type => $config->{$use_indexer}->{type},                                          prefix => $input->{name},
417                          );                                  ) if ($ds && !$stats);
418    
419                                    $indexer->add(
420                                            id => $input->{name} . "/" . $mfn,
421                                            ds => $ds,
422                                            type => $config->{$use_indexer}->{type},
423                                    ) if ($indexer && $ds);
424    
425                                    if ($marc) {
426                                            my $i = 0;
427    
428                                            while (my $fields = WebPAC::Normalize::_get_marc_fields( fetch_next => 1 ) ) {
429                                                    $marc->add(
430                                                            id => $mfn . ( $i ? "/$i" : '' ),
431                                                            fields => $fields,
432                                                            leader => WebPAC::Normalize::marc_leader(),
433                                                            row => $row,
434                                                    );
435                                                    $i++;
436                                            }
437    
438                                            $log->info("Created $i instances of MFN $mfn\n") if ($i > 1);
439                                    }
440    
441                                    $total_rows++;
442                            }
443    
444                            $log->info("statistics of fields usage:\n", $input_db->stats) if ($stats);
445    
446                            # close MARC file
447                            $marc->finish if ($marc);
448    
                         $total_rows++;  
449                  }                  }
450    
451          };          }
452    
453          eval { $indexer->finish } if ($indexer->can('finish'));          eval { $indexer->finish } if ($indexer && $indexer->can('finish'));
454    
455          my $dt = time() - $start_t;          my $dt = time() - $start_t;
456          $log->info("$total_rows records indexed in " .          $log->info("$total_rows records ", $indexer ? "indexed " : "",
457                  sprintf("%.2f sec [%.2f rec/sec]",                  sprintf("in %.2f sec [%.2f rec/sec]",
458                          $dt, ($total_rows / $dt)                          $dt, ($total_rows / $dt)
459                  )                  )
460          );          );
461    
462          #  
463          # add Hyper Estraier links to other databases          # end forked process
464          #          if ($parallel) {
465          if (ref($db_config->{links}) eq 'ARRAY') {                  $log->info("parallel process $$ finished");
466                  foreach my $link (@{ $db_config->{links} }) {                  exit(0);
                         if ($use_indexer eq 'hyperestraier') {  
                                 $log->info("adding link $database -> $link->{to} [$link->{credit}]");  
                                 $indexer->add_link(  
                                         from => $database,  
                                         to => $link->{to},  
                                         credit => $link->{credit},  
                                 );  
                         } else {  
                                 $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");  
                         }  
                 }  
467          }          }
468    
469  }  }
470    
471    if ($parallel) {
472            # wait all children to finish
473            sleep(1) while wait != -1;
474            $log->info("all parallel processes finished");
475    }
476    
477    #
478    # handle links or merge after indexing
479    #
480    
481    if ($merge) {
482            print $estcmd_fh 'sudo /etc/init.d/hyperestraier start',$/;
483            close($estcmd_fh);
484            chmod 0700, $estcmd_path || $log->warn("can't chmod 0700 $estcmd_path: $!");
485            system $estcmd_path;
486    } else {
487            foreach my $link (@links) {
488                    $log->logdie("coderef in link ", Dumper($link), " is ", ref($link), " and not CODE") unless (ref($link) eq 'CODE');
489                    $link->();
490            }
491    }

Legend:
Removed from v.493  
changed lines
  Added in v.611

  ViewVC Help
Powered by ViewVC 1.1.26