/[webpac2]/trunk/run.pl
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/run.pl

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 493 by dpavlin, Sun May 14 13:42:48 2006 UTC revision 608 by dpavlin, Tue Aug 1 16:47:40 2006 UTC
# Line 4  use strict; Line 4  use strict;
4    
5  use Cwd qw/abs_path/;  use Cwd qw/abs_path/;
6  use File::Temp qw/tempdir/;  use File::Temp qw/tempdir/;
 use Data::Dumper;  
7  use lib './lib';  use lib './lib';
8    
9  use WebPAC::Common 0.02;  use WebPAC::Common 0.02;
10  use WebPAC::Lookup;  use WebPAC::Lookup 0.03;
11  use WebPAC::Input 0.03;  use WebPAC::Input 0.07;
12  use WebPAC::Store 0.03;  use WebPAC::Store 0.03;
13  use WebPAC::Normalize::XML;  use WebPAC::Normalize 0.11;
 use WebPAC::Normalize::Set;  
14  use WebPAC::Output::TT;  use WebPAC::Output::TT;
15    use WebPAC::Validate;
16    use WebPAC::Output::MARC;
17  use YAML qw/LoadFile/;  use YAML qw/LoadFile/;
18  use Getopt::Long;  use Getopt::Long;
19  use File::Path;  use File::Path;
20  use Time::HiRes qw/time/;  use Time::HiRes qw/time/;
21  use File::Slurp;  use File::Slurp;
22    use Data::Dump qw/dump/;
23    use Storable qw/dclone/;
24    
25    use Proc::Queue size => 1;
26    use POSIX ":sys_wait_h"; # imports WNOHANG
27    
28  =head1 NAME  =head1 NAME
29    
# Line 42  limit loading to 100 records Line 47  limit loading to 100 records
47    
48  remove database and Hyper Estraier index before indexing  remove database and Hyper Estraier index before indexing
49    
50  =item --only=database_name  =item --only=database_name/input_filter
51    
52  reindex just single database (legacy name is --one)  reindex just single database (legacy name is --one)
53    
54    C</input_filter> is optional part which can be C<name>
55    or C<type> from input
56    
57  =item --config conf/config.yml  =item --config conf/config.yml
58    
59  path to YAML configuration file  path to YAML configuration file
60    
61  =item --force-set  =item --stats
62    
63    disable indexing and dump statistics about field and subfield
64    usage for each input
65    
66    =item --validate path/to/validation_file
67    
68    turn on extra validation of imput records, see L<WebPAC::Validation>
69    
70    =item --marc-normalize conf/normalize/mapping.pl
71    
72    This option specifies normalisation file for MARC creation
73    
74    =item --marc-output out/marc/test.marc
75    
76    Optional path to output file
77    
78    =item --marc-lint
79    
80    By default turned on if C<--marc-normalize> is used. You can disable lint
81    messages with C<--no-marc-lint>.
82    
83    =item --marc-dump
84    
85  force conversion C<normalize->path> in C<config.yml> from  Force dump or input and marc record for debugging.
86  C<.xml> to C<.pl>  
87    =item --parallel 4
88    
89    Run databases in parallel (aproximatly same as number of processors in
90    machine if you want to use full load)
91    
92    =item --only-links
93    
94    Create just links
95    
96    =item --merge
97    
98    Create merged index of databases which have links
99    
100  =back  =back
101    
# Line 65  my $limit; Line 107  my $limit;
107  my $clean = 0;  my $clean = 0;
108  my $config = 'conf/config.yml';  my $config = 'conf/config.yml';
109  my $debug = 0;  my $debug = 0;
110  my $only_db_name;  my $only_filter;
111  my $force_set = 0;  my $stats = 0;
112    my $validate_path;
113    my ($marc_normalize, $marc_output);
114    my $marc_lint = 1;
115    my $marc_dump = 0;
116    my $parallel = 0;
117    my $only_links = 0;
118    my $merge = 0;
119    
120  GetOptions(  GetOptions(
121          "limit=i" => \$limit,          "limit=i" => \$limit,
122          "offset=i" => \$offset,          "offset=i" => \$offset,
123          "clean" => \$clean,          "clean" => \$clean,
124          "one=s" => \$only_db_name,          "one=s" => \$only_filter,
125          "only=s" => \$only_db_name,          "only=s" => \$only_filter,
126          "config" => \$config,          "config" => \$config,
127          "debug" => \$debug,          "debug+" => \$debug,
128          "force-set" => \$force_set,          "stats" => \$stats,
129            "validate=s" => \$validate_path,
130            "marc-normalize=s" => \$marc_normalize,
131            "marc-output=s" => \$marc_output,
132            "marc-lint!" => \$marc_lint,
133            "marc-dump!" => \$marc_dump,
134            "parallel=i" => \$parallel,
135            "only-links!" => \$only_links,
136            "merge" => \$merge,
137  );  );
138    
139  $config = LoadFile($config);  $config = LoadFile($config);
140    
141  print "config = ",Dumper($config) if ($debug);  print "config = ",dump($config) if ($debug);
142    
143  die "no databases in config file!\n" unless ($config->{databases});  die "no databases in config file!\n" unless ($config->{databases});
144    
145  my $log = _new WebPAC::Common()->_get_logger();  my $log = _new WebPAC::Common()->_get_logger();
146    $log->info( "-" x 79 );
147    
148    
149    my $estcmd_fh;
150    my $estcmd_path = './estcmd-merge.sh';
151    if ($merge) {
152            open($estcmd_fh, '>', $estcmd_path) || $log->logdie("can't open $estcmd_path: $!");
153            print $estcmd_fh 'cd /data/estraier/_node/ || exit 1',$/;
154            print $estcmd_fh 'sudo /etc/init.d/hyperestraier stop',$/;
155            $log->info("created merge batch file $estcmd_path");
156    }
157    
158    
159    my $validate;
160    $validate = new WebPAC::Validate(
161            path => $validate_path,
162    ) if ($validate_path);
163    
164    
165  my $use_indexer = $config->{use_indexer} || 'hyperestraier';  my $use_indexer = $config->{use_indexer} || 'hyperestraier';
166  $log->info("using $use_indexer indexing engine...");  if ($stats) {
167            $log->debug("option --stats disables update of indexing engine...");
168            $use_indexer = undef;
169    } else {
170            $log->info("using $use_indexer indexing engine...");
171    }
172    
173    # disable indexing when creating marc
174    $use_indexer = undef if ($marc_normalize);
175    
176  my $total_rows = 0;  my $total_rows = 0;
177  my $start_t = time();  my $start_t = time();
178    
179    my @links;
180    
181    if ($parallel) {
182            $log->info("Using $parallel processes for speedup");
183            Proc::Queue::size($parallel);
184    }
185    
186  while (my ($database, $db_config) = each %{ $config->{databases} }) {  while (my ($database, $db_config) = each %{ $config->{databases} }) {
187    
188          next if ($only_db_name && $database !~ m/$only_db_name/i);          my ($only_database,$only_input) = split(m#/#, $only_filter) if ($only_filter);
189            next if ($only_database && $database !~ m/$only_database/i);
190    
191            if ($parallel) {
192                    my $f=fork;
193                    if(defined ($f) and $f==0) {
194                            $log->info("Created processes $$ for speedup");
195                    } else {
196                            next;
197                    }
198            }
199    
200          my $indexer;          my $indexer;
201            if ($use_indexer) {
202                    my $indexer_config = $config->{$use_indexer} || $log->logdie("can't find '$use_indexer' part in confguration");
203                    $indexer_config->{database} = $database;
204                    $indexer_config->{clean} = $clean;
205                    $indexer_config->{label} = $db_config->{name};
206    
207                    # force clean if database has links
208                    $indexer_config->{clean} = 1 if ($db_config->{links});
209    
210                    if ($use_indexer eq 'hyperestraier') {
211    
212                            # open Hyper Estraier database
213                            use WebPAC::Output::Estraier '0.10';
214                            $indexer = new WebPAC::Output::Estraier( %{ $indexer_config } );
215                    
216                    } elsif ($use_indexer eq 'kinosearch') {
217    
218                            # open KinoSearch
219                            use WebPAC::Output::KinoSearch;
220                            $indexer_config->{clean} = 1 unless (-e $indexer_config->{index_path});
221                            $indexer = new WebPAC::Output::KinoSearch( %{ $indexer_config } );
222    
223          my $indexer_config = $config->{$use_indexer} || $log->logdie("can't find '$use_indexer' part in confguration");                  } else {
224          $indexer_config->{database} = $database;                          $log->logdie("unknown use_indexer: $use_indexer");
225          $indexer_config->{clean} = $clean;                  }
         $indexer_config->{label} = $db_config->{name};  
   
         # important: clean database just once!  
         $clean = 0;  
   
         if ($use_indexer eq 'hyperestraier') {  
   
                 # open Hyper Estraier database  
                 use WebPAC::Output::Estraier '0.10';  
                 $indexer = new WebPAC::Output::Estraier( %{ $indexer_config } );  
           
         } elsif ($use_indexer eq 'kinosearch') {  
   
                 # open KinoSearch  
                 use WebPAC::Output::KinoSearch;  
                 $indexer_config->{clean} = 1 unless (-e $indexer_config->{index_path});  
                 $indexer = new WebPAC::Output::KinoSearch( %{ $indexer_config } );  
226    
227          } else {                  $log->logide("can't continue without valid indexer") unless ($indexer);
                 $log->logdie("unknown use_indexer: $use_indexer");  
228          }          }
229    
230          $log->logide("can't continue without valid indexer") unless ($indexer);  
231            #
232            # store Hyper Estraier links to other databases
233            #
234            if (ref($db_config->{links}) eq 'ARRAY' && $use_indexer) {
235                    foreach my $link (@{ $db_config->{links} }) {
236                            if ($use_indexer eq 'hyperestraier') {
237                                    if ($merge) {
238                                            print $estcmd_fh 'sudo -u www-data estcmd merge ' . $database . ' ' . $link->{to},$/;
239                                    } else {
240                                            $log->info("saving link $database -> $link->{to} [$link->{credit}]");
241                                            push @links, sub {
242                                                    $log->info("adding link $database -> $link->{to} [$link->{credit}]");
243                                                    $indexer->add_link(
244                                                            from => $database,
245                                                            to => $link->{to},
246                                                            credit => $link->{credit},
247                                                    );
248                                            };
249                                    }
250                            } else {
251                                    $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");
252                            }
253                    }
254            }
255            next if ($only_links);
256    
257    
258          #          #
259          # now WebPAC::Store          # now WebPAC::Store
# Line 135  while (my ($database, $db_config) = each Line 264  while (my ($database, $db_config) = each
264          my $db_path = $config->{webpac}->{db_path} . '/' . $database;          my $db_path = $config->{webpac}->{db_path} . '/' . $database;
265    
266          if ($clean) {          if ($clean) {
267                  $log->info("creating new database $database in $db_path");                  $log->info("creating new database '$database' in $db_path");
268                  rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");                  rmtree( $db_path ) || $log->warn("can't remove $db_path: $!");
269          } else {          } else {
270                  $log->debug("working on $database in $db_path");                  $log->info("working on database '$database' in $db_path");
271          }          }
272    
273          my $db = new WebPAC::Store(          my $db = new WebPAC::Store(
# Line 165  while (my ($database, $db_config) = each Line 294  while (my ($database, $db_config) = each
294    
295          foreach my $input (@inputs) {          foreach my $input (@inputs) {
296    
297                    next if ($only_input && ($input->{name} !~ m#$only_input#i && $input->{type} !~ m#$only_input#i));
298    
299                  my $type = lc($input->{type});                  my $type = lc($input->{type});
300    
301                  die "I know only how to handle input types ", join(",", @supported_inputs), " not '$type'!\n" unless (grep(/$type/, @supported_inputs));                  die "I know only how to handle input types ", join(",", @supported_inputs), " not '$type'!\n" unless (grep(/$type/, @supported_inputs));
302    
303                  my $lookup = new WebPAC::Lookup(                  my $lookup;
304                          lookup_file => $input->{lookup},                  if ($input->{lookup}) {
305                  );                          $lookup = new WebPAC::Lookup(
306                                    lookup_file => $input->{lookup},
307                            );
308                            delete( $input->{lookup} );
309                    }
310    
311                  my $input_module = $config->{webpac}->{inputs}->{$type};                  my $input_module = $config->{webpac}->{inputs}->{$type};
312    
313                  $log->info("working on input '$input->{path}' [$input->{type}] using $input_module lookup '$input->{lookup}'");                  $log->info("working on input '$input->{name}' in $input->{path} [type: $input->{type}] using $input_module",
314                            $input->{lookup} ? "lookup '$input->{lookup}'" : ""
315                    );
316    
317                  my $input_db = new WebPAC::Input(                  my $input_db = new WebPAC::Input(
318                          module => $input_module,                          module => $input_module,
319                          code_page => $config->{webpac}->{webpac_encoding},                          encoding => $config->{webpac}->{webpac_encoding},
320                          limit => $limit || $input->{limit},                          limit => $limit || $input->{limit},
321                          offset => $offset,                          offset => $offset,
322                          lookup => $lookup,                          lookup_coderef => sub {
323                                    my $rec = shift || return;
324                                    $lookup->add( $rec );
325                            },
326                          recode => $input->{recode},                          recode => $input->{recode},
327                            stats => $stats,
328                            modify_records => $input->{modify_records},
329                  );                  );
330                  $log->logdie("can't create input using $input_module") unless ($input);                  $log->logdie("can't create input using $input_module") unless ($input);
331    
332                  my $maxmfn = $input_db->open(                  my $maxmfn = $input_db->open(
333                          path => $input->{path},                          path => $input->{path},
334                          code_page => $input->{encoding},        # database encoding                          code_page => $input->{encoding},        # database encoding
335                            %{ $input },
336                  );                  );
337    
338                  my $n = new WebPAC::Normalize::XML(                  my @norm_array = ref($input->{normalize}) eq 'ARRAY' ?
339                  #       filter => { 'foo' => sub { shift } },                          @{ $input->{normalize} } : ( $input->{normalize} );
                         db => $db,  
                         lookup_regex => $lookup->regex,  
                         lookup => $lookup,  
                         prefix => $input->{name},  
                 );  
   
                 my $rules;  
                 my $normalize_path = $input->{normalize}->{path};  
340    
341                  if ($force_set) {                  if ($marc_normalize) {
342                          my $new_norm_path = $normalize_path;                          @norm_array = ( {
343                          $new_norm_path =~ s/\.xml$/.pl/;                                  path => $marc_normalize,
344                          if (-e $new_norm_path) {                                  output => $marc_output || 'out/marc/' . $database . '-' . $input->{name} . '.marc',
345                                  $log->debug("--force-set replaced $normalize_path with $new_norm_path");                          } );
                                 $normalize_path = $new_norm_path;  
                         } else {  
                                 $log->debug("--force-set failed on $new_norm_path, fallback to $normalize_path");  
                         }  
346                  }                  }
347    
348                  if ($normalize_path =~ m/\.xml$/i) {                  foreach my $normalize (@norm_array) {
                         $n->open(  
                                 tag => $input->{normalize}->{tag},  
                                 xml_file => $normalize_path,  
                         );  
                 } elsif ($normalize_path =~ m/\.(?:yml|yaml)$/i) {  
                         $n->open_yaml(  
                                 path => $normalize_path,  
                                 tag => $input->{normalize}->{tag},  
                         );  
                 } elsif ($normalize_path =~ m/\.(?:pl)$/i) {  
                         $n = undef;  
                         $log->info("using WebPAC::Normalize::Set to process $normalize_path");  
                         $rules = read_file( $normalize_path ) or die "can't open $normalize_path: $!";  
                 }  
349    
350                  foreach my $pos ( 0 ... $input_db->size ) {                          my $normalize_path = $normalize->{path} || $log->logdie("can't find normalize path in config");
351    
352                          my $row = $input_db->fetch || next;                          $log->logdie("Found '$normalize_path' as normalization file which isn't supported any more!") unless ( $normalize_path =~ m!\.pl$!i );
353    
354                          my $mfn = $row->{'000'}->[0];                          my $rules = read_file( $normalize_path ) or die "can't open $normalize_path: $!";
355    
356                          if (! $mfn || $mfn !~ m#^\d+$#) {                          $log->info("Using $normalize_path for normalization...");
357                                  $log->warn("record $pos doesn't have valid MFN but '$mfn', using $pos");  
358                                  $mfn = $pos;                          my $marc = new WebPAC::Output::MARC(
359                                  push @{ $row->{'000'} }, $pos;                                  path => $normalize->{output},
360                          }                                  lint => $marc_lint,
361                                    dump => $marc_dump,
362                            ) if ($normalize->{output});
363    
364                            # reset position in database
365                            $input_db->seek(1);
366    
367                            foreach my $pos ( 0 ... $input_db->size ) {
368    
369                                    my $row = $input_db->fetch || next;
370    
371                                    my $mfn = $row->{'000'}->[0];
372    
373                                    if (! $mfn || $mfn !~ m#^\d+$#) {
374                                            $log->warn("record $pos doesn't have valid MFN but '$mfn', using $pos");
375                                            $mfn = $pos;
376                                            push @{ $row->{'000'} }, $pos;
377                                    }
378    
379    
380                                    if ($validate) {
381                                            my @errors = $validate->validate_errors( $row );
382                                            $log->error( "MFN $mfn validation errors:\n", join("\n", @errors) ) if (@errors);
383                                    }
384    
385                                    my $ds_config = dclone($db_config);
386    
387                                    # default values -> database key
388                                    $ds_config->{_} = $database;
389    
390                                    # current mfn
391                                    $ds_config->{_mfn} = $mfn;
392    
393                          my $ds = $n ? $n->data_structure($row) :                                  # attach current input
394                                  WebPAC::Normalize::Set::data_structure(                                  $ds_config->{input} = $input;
395    
396                                    my $ds = WebPAC::Normalize::data_structure(
397                                          row => $row,                                          row => $row,
398                                          rules => $rules,                                          rules => $rules,
399                                          lookup => $lookup->lookup_hash,                                          lookup => $lookup ? $lookup->lookup_hash : undef,
400                                            config => $ds_config,
401                                            marc_encoding => 'utf-8',
402                                  );                                  );
403    
404                          $indexer->add(                                  $db->save_ds(
405                                  id => $input->{name} . "/" . $mfn,                                          id => $mfn,
406                                  ds => $ds,                                          ds => $ds,
407                                  type => $config->{$use_indexer}->{type},                                          prefix => $input->{name},
408                          );                                  ) if ($ds && !$stats);
409    
410                                    $indexer->add(
411                                            id => $input->{name} . "/" . $mfn,
412                                            ds => $ds,
413                                            type => $config->{$use_indexer}->{type},
414                                    ) if ($indexer && $ds);
415    
416                                    if ($marc) {
417                                            my $i = 0;
418    
419                                            while (my $fields = WebPAC::Normalize::_get_marc_fields( fetch_next => 1 ) ) {
420                                                    $marc->add(
421                                                            id => $mfn . ( $i ? "/$i" : '' ),
422                                                            fields => $fields,
423                                                            leader => WebPAC::Normalize::marc_leader(),
424                                                            row => $row,
425                                                    );
426                                                    $i++;
427                                            }
428    
429                                            $log->info("Created $i instances of MFN $mfn\n") if ($i > 1);
430                                    }
431    
432                                    $total_rows++;
433                            }
434    
435                            $log->info("statistics of fields usage:\n", $input_db->stats) if ($stats);
436    
437                            # close MARC file
438                            $marc->finish if ($marc);
439    
                         $total_rows++;  
440                  }                  }
441    
442          };          }
443    
444          eval { $indexer->finish } if ($indexer->can('finish'));          eval { $indexer->finish } if ($indexer && $indexer->can('finish'));
445    
446          my $dt = time() - $start_t;          my $dt = time() - $start_t;
447          $log->info("$total_rows records indexed in " .          $log->info("$total_rows records ", $indexer ? "indexed " : "",
448                  sprintf("%.2f sec [%.2f rec/sec]",                  sprintf("in %.2f sec [%.2f rec/sec]",
449                          $dt, ($total_rows / $dt)                          $dt, ($total_rows / $dt)
450                  )                  )
451          );          );
452    
453          #  
454          # add Hyper Estraier links to other databases          # end forked process
455          #          if ($parallel) {
456          if (ref($db_config->{links}) eq 'ARRAY') {                  $log->info("parallel process $$ finished");
457                  foreach my $link (@{ $db_config->{links} }) {                  exit(0);
                         if ($use_indexer eq 'hyperestraier') {  
                                 $log->info("adding link $database -> $link->{to} [$link->{credit}]");  
                                 $indexer->add_link(  
                                         from => $database,  
                                         to => $link->{to},  
                                         credit => $link->{credit},  
                                 );  
                         } else {  
                                 $log->warn("NOT IMPLEMENTED WITH $use_indexer: adding link $database -> $link->{to} [$link->{credit}]");  
                         }  
                 }  
458          }          }
459    
460  }  }
461    
462    if ($parallel) {
463            # wait all children to finish
464            sleep(1) while wait != -1;
465            $log->info("all parallel processes finished");
466    }
467    
468    #
469    # handle links or merge after indexing
470    #
471    
472    if ($merge) {
473            print $estcmd_fh 'sudo /etc/init.d/hyperestraier start',$/;
474            close($estcmd_fh);
475            chmod 0700, $estcmd_path || $log->warn("can't chmod 0700 $estcmd_path: $!");
476            system $estcmd_path;
477    } else {
478            foreach my $link (@links) {
479                    $log->logdie("coderef in link ", Dumper($link), " is ", ref($link), " and not CODE") unless (ref($link) eq 'CODE');
480                    $link->();
481            }
482    }

Legend:
Removed from v.493  
changed lines
  Added in v.608

  ViewVC Help
Powered by ViewVC 1.1.26