/[wait]/trunk/lib/WAIT/Table.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/lib/WAIT/Table.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 107 by dpavlin, Mon May 24 20:57:08 2004 UTC revision 108 by dpavlin, Tue Jul 13 17:41:12 2004 UTC
# Line 25  WAIT::Table -- Module for maintaining Ta Line 25  WAIT::Table -- Module for maintaining Ta
25  =cut  =cut
26    
27  package WAIT::Table;  package WAIT::Table;
28    our $VERSION = "2.000";
29    
30  use WAIT::Table::Handle ();  use WAIT::Table::Handle ();
31  require WAIT::Parse::Base;  require WAIT::Parse::Base;
# Line 32  require WAIT::Parse::Base; Line 33  require WAIT::Parse::Base;
33  use strict;  use strict;
34  use Carp;  use Carp;
35  # use autouse Carp => qw( croak($) );  # use autouse Carp => qw( croak($) );
36  use DB_File;  use BerkeleyDB;
37  use Fcntl;  use Fcntl;
 use LockFile::Simple ();  
   
 my $USE_RECNO = 0;  
38    
39  =head2 Creating a Table.  =head2 Creating a Table.
40    
# Line 133  sub new { Line 131  sub new {
131    my $self = {};    my $self = {};
132    
133    # Check for mandatory attrs early    # Check for mandatory attrs early
134    $self->{name}     = $parm{name}     or croak "No name specified";    for my $x (qw(name attr env maindbfile tablename)) {
135    $self->{attr}     = $parm{attr}     or croak "No attributes specified";      $self->{$x}     = $parm{$x}     or croak "No $x specified";
136      }
137    
138    # Do that before we eventually add '_weight' to attributes.    # Do that before we eventually add '_weight' to attributes.
139    $self->{keyset}   = $parm{keyset}   || [[@{$parm{attr}}]];    $self->{keyset}   = $parm{keyset}   || [[@{$parm{attr}}]];
# Line 158  sub new { Line 157  sub new {
157      unshift @{$parm{attr}}, '_weight' unless $attr{'_weight'};      unshift @{$parm{attr}}, '_weight' unless $attr{'_weight'};
158    }    }
159    
160    $self->{file}     = $parm{file}     or croak "No file specified";    $self->{path}     = $parm{path}     or croak "No path specified";
161    if (-d  $self->{file}){    bless $self, $type;
     warn "Warning: Directory '$self->{file}' already exists\n";  
   } elsif (!mkdir($self->{file}, 0775)) {  
     croak "Could not 'mkdir $self->{file}': $!\n";  
   }  
162    
163    $self->{djk}      = $parm{djk}      if defined $parm{djk};    $self->{djk}      = $parm{djk}      if defined $parm{djk};
164    $self->{layout}   = $parm{layout} || new WAIT::Parse::Base;    $self->{layout}   = $parm{layout} || new WAIT::Parse::Base;
# Line 172  sub new { Line 167  sub new {
167    $self->{deleted}  = {};       # no deleted records yet    $self->{deleted}  = {};       # no deleted records yet
168    $self->{indexes}  = {};    $self->{indexes}  = {};
169    
   bless $self, $type;  
   
170    # Checking for readers is not necessary, but let's go with the    # Checking for readers is not necessary, but let's go with the
171    # generic method.    # generic method.
   $self->getlock(O_RDWR|O_CREAT); # dies when failing  
172        
173    # Call create_index() and create_index() for compatibility    # Call create_index() and create_index() for compatibility
174    for (@{$self->{keyset}||[]}) {    for (@{$self->{keyset}||[]}) {
# Line 187  sub new { Line 179  sub new {
179      # carp "Specification of inverted indexes at table create time is deprecated";      # carp "Specification of inverted indexes at table create time is deprecated";
180      my $att  = shift @{$parm{invindex}};      my $att  = shift @{$parm{invindex}};
181      my @spec = @{shift @{$parm{invindex}}};      my @spec = @{shift @{$parm{invindex}}};
182      my @opt;      my @opt  = ();
183    
184      if (ref($spec[0])) {      if (ref($spec[0])) {
185        carp "Secondary pipelines are deprecated\n";        warn "Secondary pipelines are deprecated";
186        @opt = %{shift @spec};        @opt = %{shift @spec};
187      }      }
188      $self->create_inverted_index(attribute => $att, pipeline  => \@spec, @opt);      $self->create_inverted_index(attribute => $att,
189                                     pipeline  => \@spec,
190                                     @opt);
191    }    }
192    
193    $self;    $self;
194    # end of backwarn compatibility stuff    # end of backwarn compatibility stuff
195  }  }
196    
197    for my $accessor (qw(maindbfile tablename)) {
198      no strict 'refs';
199      *{$accessor} = sub {
200        my($self) = @_;
201        return $self->{$accessor} if $self->{$accessor};
202        require Carp;
203        Carp::confess("accessor $accessor not there");
204      }
205    }
206    
207  =head2 Creating an index  =head2 Creating an index
208    
209    $tb->create_index('docid');    $tb->create_index('docid');
# Line 222  sub create_index { Line 226  sub create_index {
226    require WAIT::Index;    require WAIT::Index;
227    
228    my $name = join '-', @_;    my $name = join '-', @_;
229      #### warn "WARNING: Suspect use of \$_ in method create_index. name[$name]_[$_]";
230    $self->{indexes}->{$name} =    $self->{indexes}->{$name} =
231      new WAIT::Index file => $self->{file}.'/'.$name, attr => $_;      WAIT::Index->new(
232                         file => $self->file.'/'.$name,
233                         subname => $name,
234                         env  => $self->{env},
235                         maindbfile => $self->maindbfile,
236                         tablename => $self->tablename,
237                         attr => $_,
238                        );
239  }  }
240    
241  =head2 Creating an inverted index  =head2 Creating an inverted index
# Line 284  sub create_inverted_index { Line 296  sub create_inverted_index {
296    }    }
297    
298    my $name = join '_', ($parm{attribute}, @{$parm{pipeline}});    my $name = join '_', ($parm{attribute}, @{$parm{pipeline}});
299    my $idx = new WAIT::InvertedIndex(file   => $self->{file}.'/'.$name,    my $idx = WAIT::InvertedIndex->new(file   => $self->file.'/'.$name,
300                                      filter => [@{$parm{pipeline}}], # clone                                       subname=> $name,
301                                      name   => $name,                                       env    => $self->{env},
302                                      attr   => $parm{attribute},                                       maindbfile => $self->maindbfile,
303                                      %opt, # backward compatibility stuff                                       tablename => $self->tablename,
304                                     );                                       filter => [@{$parm{pipeline}}], # clone
305                                         name   => $name,
306                                         attr   => $parm{attribute},
307                                         %opt, # backward compatibility stuff
308                                        );
309    # We will have to use $parm{predicate} here    # We will have to use $parm{predicate} here
310    push @{$self->{inverted}->{$parm{attribute}}}, $idx;    push @{$self->{inverted}->{$parm{attribute}}}, $idx;
311  }  }
312    
313  sub dir {  sub dir {
314    $_[0]->{file};    $_[0]->file;
315  }  }
316    
317  =head2 C<$tb-E<gt>layout>  =head2 C<$tb-E<gt>layout>
# Line 324  Must be called via C<WAIT::Database::dro Line 340  Must be called via C<WAIT::Database::dro
340  sub drop {  sub drop {
341    my $self = shift;    my $self = shift;
342    
   unless ($self->{write_lock}){  
     warn "Cannot drop table without write lock. Nothing done";  
     return;  
   }  
     
343    if ((caller)[0] eq 'WAIT::Database') { # database knows about this    if ((caller)[0] eq 'WAIT::Database') { # database knows about this
344      $self->close;               # just make sure      $self->close;               # just make sure
345    
346      my $file = $self->{file};      my $file = $self->file;
347    
348      for (values %{$self->{indexes}}) {      for (values %{$self->{indexes}}) {
349        $_->drop;        $_->drop;
# Line 340  sub drop { Line 351  sub drop {
351      unlink "$file/records";      unlink "$file/records";
352      rmdir "$file/read" or warn "Could not rmdir '$file/read'";      rmdir "$file/read" or warn "Could not rmdir '$file/read'";
353    
     # $self->unlock;  
     ! (!-e $file or rmdir $file);  
354    } else {    } else {
355      croak ref($self)."::drop called directly";      croak ref($self)."::drop called directly";
356    }    }
# Line 355  sub mrequire ($) { Line 364  sub mrequire ($) {
364    require $module;    require $module;
365  }  }
366    
367    sub path {
368      my($self) = @_;
369      return $self->{path} if $self->{path};
370      require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$self],[qw(self)])->Indent(1)->Useqq(1)->Dump; # XXX
371      require Carp;
372      Carp::confess("NO file attr");
373    }
374    
375  sub open {  sub open {
376    my $self = shift;    my $self = shift;
377    my $file = $self->{file} . '/records';    my $file = $self->file . '/records';
378    
379    mrequire ref($self);           # that's tricky eh?    mrequire ref($self);           # that's tricky eh?
380    if (defined $self->{'layout'}) {    if (defined $self->{'layout'}) {
# Line 368  sub open { Line 385  sub open {
385    }    }
386    if (exists $self->{indexes}) {    if (exists $self->{indexes}) {
387      require WAIT::Index;      require WAIT::Index;
388      for (values %{$self->{indexes}}) {      for my $Ind (values %{$self->{indexes}}) {
389        $_->{mode} = $self->{mode};        for my $x (qw(mode env maindbfile)) {
390            $Ind->{$x} = $self->{$x};
391          }
392      }      }
393    }    }
394    if (exists $self->{inverted}) {    if (exists $self->{inverted}) {
395      my ($att, $idx);      my ($att, $idx);
396      for $att (keys %{$self->{inverted}}) {      for $att (keys %{$self->{inverted}}) {
397        for $idx (@{$self->{inverted}->{$att}}) {        for $idx (@{$self->{inverted}->{$att}}) {
398          $idx->{mode} = $self->{mode};          for my $x (qw(mode env maindbfile)) {
399              $idx->{$x} = $self->{$x};
400            }
401        }        }
402      }      }
403      require WAIT::InvertedIndex;      require WAIT::InvertedIndex;
404    }    }
405    
406    $self->getlock($self->{mode});    # CONFUSION: WAIT knows two *modes*: read-only or read-write.
407      # BerkeleyDB means file permissions when talking about Mode.
408      # BerkeleyDB has the "Flags" attribute to specify
409      # read/write/lock/etc subsystems.
410    
411      my $flags;
412      if ($self->{mode} & O_RDWR) {
413        $flags = DB_CREATE; # | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_CDB;
414        warn "Flags on table $file set to 'writing'";
415      } else {
416        $flags = DB_RDONLY;
417        # warn "Flags on table $file set to 'readonly'";
418      }
419    unless (defined $self->{dbh}) {    unless (defined $self->{dbh}) {
420      if ($USE_RECNO) {      my $subname = $self->tablename . "/records";
421        $self->{dbh} = tie(@{$self->{db}}, 'DB_File', $file,      $self->{dbh} =
422                           $self->{mode}, 0664, $DB_RECNO);          tie(%{$self->{db}}, 'BerkeleyDB::Btree',
423      } else {              $self->{env} ? (Env => $self->{env}) : (),
424        $self->{dbh} =              # Filename => $file,
425          tie(%{$self->{db}}, 'DB_File', $file,              Filename => $self->maindbfile,
426                           $self->{mode}, 0664, $DB_BTREE);              Subname => $subname,
427      }              Mode => 0664,
428                Flags => $flags,
429                $WAIT::Database::Cachesize?(Cachesize => $WAIT::Database::Cachesize):(),
430                $WAIT::Database::Pagesize?(Pagesize => $WAIT::Database::Pagesize):(),
431               )
432                or die "Cannot tie: $BerkeleyDB::Error;
433     DEBUG: Filename[$self->{maindbfile}]subname[$subname]Mode[0664]Flags[$flags]";
434    }    }
     
     
435    $self;    $self;
436  }  }
437    
# Line 473  sub insert { Line 509  sub insert {
509    unless ($gotkey) {    unless ($gotkey) {
510      $key = $self->{nextk}++;      $key = $self->{nextk}++;
511    }    }
512    if ($USE_RECNO) {    $self->{db}->{$key} = $tuple;
     $self->{db}->[$key] = $tuple;  
   } else {  
     $self->{db}->{$key} = $tuple;  
   }  
513    for (values %{$self->{indexes}}) {    for (values %{$self->{indexes}}) {
514      unless ($_->insert($key, %parm)) {      unless ($_->insert($key, %parm)) {
515        # duplicate key, undo changes        # duplicate key, undo changes
# Line 528  sub fetch { Line 560  sub fetch {
560    return () if exists $self->{deleted}->{$key};    return () if exists $self->{deleted}->{$key};
561    
562    defined $self->{db} or $self->open;    defined $self->{db} or $self->open;
563    if ($USE_RECNO) {    $self->unpack($self->{db}->{$key});
     $self->unpack($self->{db}->[$key]);  
   } else {  
     $self->unpack($self->{db}->{$key});  
   }  
564  }  }
565    
566  sub delete_by_key {  sub delete_by_key {
# Line 592  sub unpack { Line 620  sub unpack {
620    
621  sub set {  sub set {
622    my ($self, $iattr, $value) = @_;    my ($self, $iattr, $value) = @_;
     
   unless ($self->{write_lock}){  
     warn "Cannot set iattr[$iattr] without write lock. Nothing done";  
     return;  
   }  
   
623    # in the rare case that they haven't written a single record yet, we    # in the rare case that they haven't written a single record yet, we
624    # make sure, the inverted inherits our $self->{mode}:    # make sure, the inverted inherits our $self->{mode}:
625    defined $self->{db} or $self->open;    defined $self->{db} or $self->open;
626    
627    for my $att (keys %{$self->{inverted}}) {    for my $att (keys %{$self->{inverted}}) {
628      require WAIT::InvertedIndex;      if ($] > 5.003) {         # avoid bug in perl up to 5.003_05
     if ($^V gt v5.003) {         # avoid bug in perl up to 5.003_05  
629        my $idx;        my $idx;
630        for $idx (@{$self->{inverted}->{$att}}) {        for $idx (@{$self->{inverted}->{$att}}) {
631          $idx->set($iattr, $value);          $idx->set($iattr, $value);
# Line 620  sub set { Line 641  sub set {
641  sub close {  sub close {
642    my $self = shift;    my $self = shift;
643    
644      require Carp; Carp::cluck("------->Closing A Table<-------");
645    
646    if (exists $self->{'access'}) {    if (exists $self->{'access'}) {
647      eval {$self->{'access'}->close}; # dont bother if not opened      eval {$self->{'access'}->close}; # dont bother if not opened
648    }    }
# Line 646  sub close { Line 669  sub close {
669    }    }
670    if ($self->{dbh}) {    if ($self->{dbh}) {
671      delete $self->{dbh};      delete $self->{dbh};
   
     if ($USE_RECNO) {  
       untie @{$self->{db}};  
     } else {  
       untie %{$self->{db}};  
     }  
     delete $self->{db};  
672    }    }
673      untie %{$self->{db}};
674    $self->unlock;    for my $att (qw(env db file maindbfile)) {
675          delete $self->{$att};
676    1;      warn "----->Deleted att $att<-----";
 }  
   
 # Locking  
 #  
 # We allow multiple readers to coexists.  But write access excludes  
 # all read access and vice versa.  In practice read access on tables  
 # open for writing will mostly work ;-)  
   
 # If a "write" lock is requested, an existing "read" lock will be  
 # released.  If a "read" lock ist requested, an existing "write" lock  
 # will be released.  Requiring a lock already hold has no effect.  
   
 sub getlock {  
   my ($self, $mode) = @_;  
   
   # autoclean cleans on DESTROY, stale sends SIGZERO to the owner  
   #  
   my $lockmgr = LockFile::Simple->make(-autoclean => 1, -stale => 1);  
   my $file    = $self->{file} . '/records';  
   my $lockdir = $self->{file} . '/read';  
   
   unless (-d $lockdir) {  
     mkdir $lockdir, 0755 or die "Could not mkdir $lockdir: $!";  
677    }    }
     
   if ($mode & O_RDWR) {         # Get a write lock.  Release it again  
                                 # and die if there is any valid  
                                 # readers.  
       
     # Have a write lock already  
     return $self if $self->{write_lock};  
   
     if ($self->{read_lock}) {   # We are a becoming a writer now. So  
                                 # we release the read lock to avoid  
                                 # blocking ourselves.  
       $self->{read_lock}->release;  
       delete $self->{read_lock};  
     }  
   
     # Get the preliminary write lock  
     $self->{write_lock} = $lockmgr->lock($self->{file} . '/write')  
       or die "Can't lock '$self->{file}/write'";  
       
     # If we actually want to write we must check if there are any  
     # readers.  The write lock is confirmed if wen cannot find any  
     # valid readers.  
       
     local *DIR;  
     opendir DIR, $lockdir or  
       die "Could not opendir '$lockdir': $!";  
     for my $lockfile (grep { -f "$lockdir/$_" } readdir DIR) {  
       # Check if the locks are still valid.  Since we are protected by  
       # a write lock, we could use a plain file.  But we want to use  
       # the stale testing from LockFile::Simple.  
       if (my $lck = $lockmgr->trylock("$lockdir/$lockfile")) {  
         warn "Removing stale lockfile '$lockdir/$lockfile'";  
         $lck->release;  
       } else {                  # Found an active reader, rats!  
         $self->{write_lock}->release;  
         die "Cannot write table '$file' while it's in use";  
       }  
     }  
     closedir DIR;  
   } else {  
     # Have a read lock already  
     return $self if $self->{read_lock};  
   
     # Get the preliminary write lock to protect the directory  
     # operations.  
   
     my $write_lock = $lockmgr->lock($self->{file} . '/read/write')  
       or die "Can't lock '$self->{file}/read/write'";  
   
     # Find a new read slot.  Maybe the plain file would be better?  
     my $id = time;  
     while (-f "$lockdir/$id.lock") { # here assume ".lock" format!  
       $id++;  
     }  
   
     $self->{read_lock} = $lockmgr->lock("$lockdir/$id")  
       or die "Can't lock '$lockdir/$id'";  
678    
679      # We are a reader now. So we release the write lock    1;
     $write_lock->release;  
   }  
   return $self;  
680  }  }
681    
682  sub unlock {  sub DESTROY {
683    my $self = shift;    my $self = shift;
684    
685    # Either we have a read or a write lock (or we close the table already)    delete $self->{env};
   # unless ($self->{read_lock} || $self->{write_lock}) {  
   #   warn "WAIT::Table::unlock: Table aparently hold's no lock"  
   # }  
   if ($self->{write_lock}) {  
     $self->{write_lock}->release();  
     delete $self->{write_lock};  
   }  
   if ($self->{read_lock}) {  
     $self->{read_lock}->release();  
     delete $self->{read_lock};  
   }  
686    
687  }    # require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$self],[qw(self)])->Indent(1)->Useqq(1)->Dump; # XXX
688    
 sub DESTROY {  
   my $self = shift;  
   
   if ($self->{write_lock} || $self->{read_lock}) {  
     warn "Table handle destroyed without closing it first";  
     $self->unlock;  
   }  
689  }  }
690    
691  sub open_scan {  sub open_scan {
# Line 827  sub intervall { Line 742  sub intervall {
742    bless \%result, 'WAIT::Query::Raw';    bless \%result, 'WAIT::Query::Raw';
743  }  }
744    
745  sub search {  sub search_ref {
746    my $self  = shift;    my $self  = shift;
747    my ($query, $attr, $cont, $raw);    my ($query, $attr, $cont, $raw);
748    if (ref $_[0]) {    if (ref $_[0]) {
749      $query = shift;      $query = shift;
750          # require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$query],[qw()])->Indent(1)->Useqq(1)->Dump; # XXX
751    
752      $attr = $query->{attr};      $attr = $query->{attr};
753      $cont = $query->{cont};      $cont = $query->{cont};
754      $raw  = $query->{raw};      $raw  = $query->{raw};
# Line 872  sub search { Line 788  sub search {
788    }    }
789    if (defined $cont and $cont ne '') {    if (defined $cont and $cont ne '') {
790      for (@{$self->{inverted}->{$attr}}) {      for (@{$self->{inverted}->{$attr}}) {
791        my %r = $_->search($query, $cont);        my $r = $_->search_ref($query, $cont);
792        my ($key, $val);        my ($key, $val);
793        while (($key, $val) = each %r) {        while (($key, $val) = each %$r) {
794          if (exists $result{$key}) {          if (exists $result{$key}) {
795            $result{$key} += $val;            $result{$key} += $val;
796          } else {          } else {
# Line 888  sub search { Line 804  sub search {
804    for (keys %result) {    for (keys %result) {
805      delete $result{$_} if $self->{deleted}->{$_}      delete $result{$_} if $self->{deleted}->{$_}
806    }    }
807    %result;    \%result;
808    }
809    
810    sub parse_query {
811      my($self, $attr, $query) = @_;
812      return unless defined $query && length $query;
813      my %qt;
814      for (@{$self->{inverted}->{$attr}}) {
815        grep $qt{$_}++, $_->parse($query);
816      }
817      [keys %qt];
818  }  }
819    
820  sub hilight_positions {  sub hilight_positions {

Legend:
Removed from v.107  
changed lines
  Added in v.108

  ViewVC Help
Powered by ViewVC 1.1.26