/[wait]/trunk/lib/WAIT/Table.pm
This is repository of my old source code which isn't updated any more. Go to git.rot13.org for current projects!
ViewVC logotype

Diff of /trunk/lib/WAIT/Table.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

cvs-head/lib/WAIT/Table.pm revision 41 by laperla, Mon Nov 13 20:25:49 2000 UTC trunk/lib/WAIT/Table.pm revision 115 by dpavlin, Wed Jul 14 07:35:56 2004 UTC
# Line 4  Line 4 
4  # Author          : Ulrich Pfeifer  # Author          : Ulrich Pfeifer
5  # Created On      : Thu Aug  8 13:05:10 1996  # Created On      : Thu Aug  8 13:05:10 1996
6  # Last Modified By: Ulrich Pfeifer  # Last Modified By: Ulrich Pfeifer
7  # Last Modified On: Sun Nov 12 17:51:56 2000  # Last Modified On: Wed Jan 23 14:15:15 2002
8  # Language        : CPerl  # Language        : CPerl
9  # Update Count    : 148  # Update Count    : 152
10  # Status          : Unknown, Use with caution!  # Status          : Unknown, Use with caution!
11  #  #
12  # Copyright (c) 1996-1997, Ulrich Pfeifer  # Copyright (c) 1996-1997, Ulrich Pfeifer
# Line 25  WAIT::Table -- Module for maintaining Ta Line 25  WAIT::Table -- Module for maintaining Ta
25  =cut  =cut
26    
27  package WAIT::Table;  package WAIT::Table;
28    our $VERSION = "2.000";
29    
30  use WAIT::Table::Handle ();  use WAIT::Table::Handle ();
31  require WAIT::Parse::Base;  require WAIT::Parse::Base;
32    
33  use strict;  use strict;
34  use Carp;  use Carp qw(cluck croak confess);
35  # use autouse Carp => qw( croak($) );  # use autouse Carp => qw( croak($) );
36  use DB_File;  use BerkeleyDB;
37  use Fcntl;  use Fcntl;
 use LockFile::Simple ();  
   
 my $USE_RECNO = 0;  
38    
39  =head2 Creating a Table.  =head2 Creating a Table.
40    
# Line 84  In that moment the access-defining class Line 82  In that moment the access-defining class
82  structures that cannot be reconstructed via the Data::Dumper dump,  structures that cannot be reconstructed via the Data::Dumper dump,
83  such as database handles or C pointers.  such as database handles or C pointers.
84    
85  =item C<file> => I<fname>  =item C<path> => I<dir>
86    
87  The filename of the records file. Files for indexes will have I<fname>  The path to database. Files for indexes will have I<path>
88  as prefix. I<Mandatory>, but usually taken care of by the  as prefix. I<Mandatory>, but usually taken care of by the
89  WAIT::Database handle when the constructor is called via  WAIT::Database handle when the constructor is called via
90  WAIT::Database::create_table().  WAIT::Database::create_table().
# Line 133  sub new { Line 131  sub new {
131    my $self = {};    my $self = {};
132    
133    # Check for mandatory attrs early    # Check for mandatory attrs early
134    $self->{name}     = $parm{name}     or croak "No name specified";    for my $x (qw(name attr env maindbfile tablename)) {
135    $self->{attr}     = $parm{attr}     or croak "No attributes specified";      $self->{$x}     = $parm{$x}     or croak "No $x specified";
136      }
137    
138    # Do that before we eventually add '_weight' to attributes.    # Do that before we eventually add '_weight' to attributes.
139    $self->{keyset}   = $parm{keyset}   || [[@{$parm{attr}}]];    $self->{keyset}   = $parm{keyset}   || [[@{$parm{attr}}]];
# Line 158  sub new { Line 157  sub new {
157      unshift @{$parm{attr}}, '_weight' unless $attr{'_weight'};      unshift @{$parm{attr}}, '_weight' unless $attr{'_weight'};
158    }    }
159    
160    $self->{file}     = $parm{file}     or croak "No file specified";    $self->{path}     = $parm{path}     or croak "No path specified";
161    if (-d  $self->{file}){    bless $self, $type;
     warn "Warning: Directory '$self->{file}' already exists\n";  
   } elsif (!mkdir($self->{file}, 0775)) {  
     croak "Could not 'mkdir $self->{file}': $!\n";  
   }  
162    
163    $self->{djk}      = $parm{djk}      if defined $parm{djk};    $self->{djk}      = $parm{djk}      if defined $parm{djk};
164    $self->{layout}   = $parm{layout} || new WAIT::Parse::Base;    $self->{layout}   = $parm{layout} || new WAIT::Parse::Base;
# Line 172  sub new { Line 167  sub new {
167    $self->{deleted}  = {};       # no deleted records yet    $self->{deleted}  = {};       # no deleted records yet
168    $self->{indexes}  = {};    $self->{indexes}  = {};
169    
   bless $self, $type;  
   
170    # Checking for readers is not necessary, but let's go with the    # Checking for readers is not necessary, but let's go with the
171    # generic method.    # generic method.
   $self->getlock(O_RDWR|O_CREAT); # dies when failing  
172        
173    # Call create_index() and create_index() for compatibility    # Call create_index() and create_index() for compatibility
174    for (@{$self->{keyset}||[]}) {    for (@{$self->{keyset}||[]}) {
# Line 187  sub new { Line 179  sub new {
179      # carp "Specification of inverted indexes at table create time is deprecated";      # carp "Specification of inverted indexes at table create time is deprecated";
180      my $att  = shift @{$parm{invindex}};      my $att  = shift @{$parm{invindex}};
181      my @spec = @{shift @{$parm{invindex}}};      my @spec = @{shift @{$parm{invindex}}};
182      my @opt;      my @opt  = ();
183    
184      if (ref($spec[0])) {      if (ref($spec[0])) {
185        carp "Secondary pipelines are deprecated\n";        warn "Secondary pipelines are deprecated";
186        @opt = %{shift @spec};        @opt = %{shift @spec};
187      }      }
188      $self->create_inverted_index(attribute => $att, pipeline  => \@spec, @opt);      $self->create_inverted_index(attribute => $att,
189                                     pipeline  => \@spec,
190                                     @opt);
191    }    }
192    
193    $self;    $self;
194    # end of backwarn compatibility stuff    # end of backwarn compatibility stuff
195  }  }
196    
197    for my $accessor (qw(maindbfile tablename)) {
198      no strict 'refs';
199      *{$accessor} = sub {
200        my($self) = @_;
201        return $self->{$accessor} if $self->{$accessor};
202        require Carp;
203        Carp::confess("accessor $accessor not there");
204      }
205    }
206    
207  =head2 Creating an index  =head2 Creating an index
208    
209    $tb->create_index('docid');    $tb->create_index('docid');
210    
211  =item C<create_index>  C<create_index>
   
212  must be called with a list of attributes. This must be a subset of the  must be called with a list of attributes. This must be a subset of the
213  attributes specified when the table was created. Currently this  attributes specified when the table was created. Currently this
214  method must be called before the first tuple is inserted in the  method must be called before the first tuple is inserted in the
# Line 222  sub create_index { Line 225  sub create_index {
225    require WAIT::Index;    require WAIT::Index;
226    
227    my $name = join '-', @_;    my $name = join '-', @_;
228      #### warn "WARNING: Suspect use of \$_ in method create_index. name[$name]_[$_]";
229    $self->{indexes}->{$name} =    $self->{indexes}->{$name} =
230      new WAIT::Index file => $self->{file}.'/'.$name, attr => $_;      WAIT::Index->new(
231                         path => $self->path.'/'.$name,
232                         subname => $name,
233                         env  => $self->{env},
234                         maindbfile => $self->maindbfile,
235                         tablename => $self->tablename,
236                         attr => $_,
237                        );
238  }  }
239    
240  =head2 Creating an inverted index  =head2 Creating an inverted index
# Line 284  sub create_inverted_index { Line 295  sub create_inverted_index {
295    }    }
296    
297    my $name = join '_', ($parm{attribute}, @{$parm{pipeline}});    my $name = join '_', ($parm{attribute}, @{$parm{pipeline}});
298    my $idx = new WAIT::InvertedIndex(file   => $self->{file}.'/'.$name,    my $idx = WAIT::InvertedIndex->new(path   => $self->path.'/'.$name,
299                                      filter => [@{$parm{pipeline}}], # clone                                       subname=> $name,
300                                      name   => $name,                                       env    => $self->{env},
301                                      attr   => $parm{attribute},                                       maindbfile => $self->maindbfile,
302                                      %opt, # backward compatibility stuff                                       tablename => $self->tablename,
303                                     );                                       filter => [@{$parm{pipeline}}], # clone
304                                         name   => $name,
305                                         attr   => $parm{attribute},
306                                         %opt, # backward compatibility stuff
307                                        );
308    # We will have to use $parm{predicate} here    # We will have to use $parm{predicate} here
309    push @{$self->{inverted}->{$parm{attribute}}}, $idx;    push @{$self->{inverted}->{$parm{attribute}}}, $idx;
310  }  }
311    
312  sub dir {  sub dir {
313    $_[0]->{file};    $_[0]->path;
314  }  }
315    
316  =head2 C<$tb-E<gt>layout>  =head2 C<$tb-E<gt>layout>
# Line 324  Must be called via C<WAIT::Database::dro Line 339  Must be called via C<WAIT::Database::dro
339  sub drop {  sub drop {
340    my $self = shift;    my $self = shift;
341    
   unless ($self->{write_lock}){  
     warn "Cannot drop table without write lock. Nothing done";  
     return;  
   }  
     
342    if ((caller)[0] eq 'WAIT::Database') { # database knows about this    if ((caller)[0] eq 'WAIT::Database') { # database knows about this
343      $self->close;               # just make sure      $self->close;               # just make sure
344    
345      my $file = $self->{file};  #    my $path = $self->path;
346    
347      for (values %{$self->{indexes}}) {      for (values %{$self->{indexes}}) {
348        $_->drop;        $_->drop;
349      }      }
350      unlink "$file/records";  #    unlink "$path/records";
351      rmdir "$file/read" or warn "Could not rmdir '$file/read'";  #    rmdir "$path/read" or warn "Could not rmdir '$path/read'";
352    
     # $self->unlock;  
     ! (!-e $file or rmdir $file);  
353    } else {    } else {
354      croak ref($self)."::drop called directly";      confess ref($self)."::drop called directly";
355    }    }
356  }  }
357    
# Line 355  sub mrequire ($) { Line 363  sub mrequire ($) {
363    require $module;    require $module;
364  }  }
365    
366    sub path {
367      my($self) = @_;
368      return $self->{path} if $self->{path};
369      require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$self],[qw(self)])->Indent(1)->Useqq(1)->Dump; # XXX
370      require Carp;
371      confess("NO path attr");
372    }
373    
374  sub open {  sub open {
375    my $self = shift;    my $self = shift;
376    my $file = $self->{file} . '/records';    my $path = $self->path . '/records';
377    
378    mrequire ref($self);           # that's tricky eh?    mrequire ref($self);           # that's tricky eh?
379    if (defined $self->{'layout'}) {    if (defined $self->{'layout'}) {
# Line 368  sub open { Line 384  sub open {
384    }    }
385    if (exists $self->{indexes}) {    if (exists $self->{indexes}) {
386      require WAIT::Index;      require WAIT::Index;
387      for (values %{$self->{indexes}}) {      for my $Ind (values %{$self->{indexes}}) {
388        $_->{mode} = $self->{mode};        for my $x (qw(mode env maindbfile)) {
389            $Ind->{$x} = $self->{$x};
390          }
391      }      }
392    }    }
393    if (exists $self->{inverted}) {    if (exists $self->{inverted}) {
394      my ($att, $idx);      my ($att, $idx);
395      for $att (keys %{$self->{inverted}}) {      for $att (keys %{$self->{inverted}}) {
396        for $idx (@{$self->{inverted}->{$att}}) {        for $idx (@{$self->{inverted}->{$att}}) {
397          $idx->{mode} = $self->{mode};          for my $x (qw(mode env maindbfile)) {
398              $idx->{$x} = $self->{$x};
399            }
400        }        }
401      }      }
402      require WAIT::InvertedIndex;      require WAIT::InvertedIndex;
403    }    }
404    
405      # CONFUSION: WAIT knows two *modes*: read-only or read-write.
406      # BerkeleyDB means file permissions when talking about Mode.
407      # BerkeleyDB has the "Flags" attribute to specify
408      # read/write/lock/etc subsystems.
409    
410      my $flags;
411      if ($self->{mode} & O_RDWR) {
412        $flags = DB_CREATE; # | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_CDB;
413        #warn "DEBUG: Flags on table $path set to 'writing'";
414      } else {
415        $flags = DB_RDONLY;
416        #warn "DEBUG: Flags on table $path set to 'readonly'";
417      }
418    unless (defined $self->{dbh}) {    unless (defined $self->{dbh}) {
419      if ($USE_RECNO) {      my $subname = $self->tablename . "/records";
420        $self->{dbh} = tie(@{$self->{db}}, 'DB_File', $file,      $self->{dbh} =
421                           $self->{mode}, 0664, $DB_RECNO);          tie(%{$self->{db}}, 'BerkeleyDB::Btree',
422      } else {              $self->{env} ? (Env => $self->{env}) : (),
423        $self->{dbh} =              # Filename => $file,
424          tie(%{$self->{db}}, 'DB_File', $file,              Filename => $self->maindbfile,
425                           $self->{mode}, 0664, $DB_BTREE);              Subname => $subname,
426      }              Mode => 0664,
427                Flags => $flags,
428                $WAIT::Database::Cachesize?(Cachesize => $WAIT::Database::Cachesize):(),
429                $WAIT::Database::Pagesize?(Pagesize => $WAIT::Database::Pagesize):(),
430               )
431                or confess "Cannot tie: $BerkeleyDB::Error\nDEBUG: Filename[$self->{maindbfile}]subname[$subname]Mode[0664]Flags[$flags]";
432    }    }
     
   $self->getlock($self->{mode});  
     
433    $self;    $self;
434  }  }
435    
# Line 471  sub insert { Line 507  sub insert {
507    unless ($gotkey) {    unless ($gotkey) {
508      $key = $self->{nextk}++;      $key = $self->{nextk}++;
509    }    }
510    if ($USE_RECNO) {    $self->{db}->{$key} = $tuple;
     $self->{db}->[$key] = $tuple;  
   } else {  
     $self->{db}->{$key} = $tuple;  
   }  
511    for (values %{$self->{indexes}}) {    for (values %{$self->{indexes}}) {
512      unless ($_->insert($key, %parm)) {      unless ($_->insert($key, %parm)) {
513        # duplicate key, undo changes        # duplicate key, undo changes
# Line 526  sub fetch { Line 558  sub fetch {
558    return () if exists $self->{deleted}->{$key};    return () if exists $self->{deleted}->{$key};
559    
560    defined $self->{db} or $self->open;    defined $self->{db} or $self->open;
561    if ($USE_RECNO) {    $self->unpack($self->{db}->{$key});
     $self->unpack($self->{db}->[$key]);  
   } else {  
     $self->unpack($self->{db}->{$key});  
   }  
562  }  }
563    
564  sub delete_by_key {  sub delete_by_key {
# Line 538  sub delete_by_key { Line 566  sub delete_by_key {
566    my $key   = shift;    my $key   = shift;
567    
568    unless ($key) {    unless ($key) {
569      Carp::cluck "Warning: delete_by_key called without key. Looks like a bug in WAIT?";      cluck "Warning: delete_by_key called without key. Looks like a bug in WAIT?";
570      return;      return;
571    }    }
572    
# Line 590  sub unpack { Line 618  sub unpack {
618    
619  sub set {  sub set {
620    my ($self, $iattr, $value) = @_;    my ($self, $iattr, $value) = @_;
621        # in the rare case that they haven't written a single record yet, we
622    unless ($self->{write_lock}){    # make sure, the inverted inherits our $self->{mode}:
623      warn "Cannot set iattr[$iattr] without write lock. Nothing done";    defined $self->{db} or $self->open;
624      return;  
   }  
625    for my $att (keys %{$self->{inverted}}) {    for my $att (keys %{$self->{inverted}}) {
626      if ($] > 5.003) {         # avoid bug in perl up to 5.003_05      if ($] > 5.003) {         # avoid bug in perl up to 5.003_05
627        my $idx;        my $idx;
# Line 612  sub set { Line 639  sub set {
639  sub close {  sub close {
640    my $self = shift;    my $self = shift;
641    
642      #cluck("DEBUG: Closing A Table");
643    
644    if (exists $self->{'access'}) {    if (exists $self->{'access'}) {
645      eval {$self->{'access'}->close}; # dont bother if not opened      eval {$self->{'access'}->close}; # dont bother if not opened
646    }    }
# Line 638  sub close { Line 667  sub close {
667    }    }
668    if ($self->{dbh}) {    if ($self->{dbh}) {
669      delete $self->{dbh};      delete $self->{dbh};
   
     if ($USE_RECNO) {  
       untie @{$self->{db}};  
     } else {  
       untie %{$self->{db}};  
     }  
     delete $self->{db};  
670    }    }
671      untie %{$self->{db}};
672    $self->unlock;    for my $att (qw(env db path maindbfile)) {
673          delete $self->{$att};
674    1;      #cluck "DEBUG: Deleted att $att";
 }  
   
 # Locking  
 #  
 # We allow multiple readers to coexists.  But write access excludes  
 # all read access and vice versa.  In practice read access on tables  
 # open for writing will mostly work ;-)  
   
 # If a "write" lock is requested, an existing "read" lock will be  
 # released.  If a "read" lock ist requested, an existing "write" lock  
 # will be released.  Requiring a lock already hold has no effect.  
   
 sub getlock {  
   my ($self, $mode) = @_;  
   
   # autoclean cleans on DESTROY, stale sends SIGZERO to the owner  
   #  
   my $lockmgr = LockFile::Simple->make(-autoclean => 1, -stale => 1);  
   my $file    = $self->{file} . '/records';  
   my $lockdir = $self->{file} . '/read';  
   
   unless (-d $lockdir) {  
     mkdir $lockdir, 0755 or die "Could not mkdir $lockdir: $!";  
675    }    }
     
   if ($mode & O_RDWR) {         # Get a write lock.  Release it again  
                                 # and die if there is any valid  
                                 # readers.  
       
     # Have a write lock already  
     return $self if $self->{write_lock};  
   
     if ($self->{read_lock}) {   # We are a becoming a writer now. So  
                                 # we release the read lock to avoid  
                                 # blocking ourselves.  
       $self->{read_lock}->release;  
       delete $self->{read_lock};  
     }  
   
     # Get the preliminary write lock  
     $self->{write_lock} = $lockmgr->lock($self->{file} . '/write')  
       or die "Can't lock '$self->{file}/write'";  
       
     # If we actually want to write we must check if there are any  
     # readers.  The write lock is confirmed if wen cannot find any  
     # valid readers.  
       
     local *DIR;  
     opendir DIR, $lockdir or  
       die "Could not opendir '$lockdir': $!";  
     for my $lockfile (grep { -f "$lockdir/$_" } readdir DIR) {  
       # Check if the locks are still valid.  Since we are protected by  
       # a write lock, we could use a plain file.  But we want to use  
       # the stale testing from LockFile::Simple.  
       if (my $lck = $lockmgr->trylock("$lockdir/$lockfile")) {  
         warn "Removing stale lockfile '$lockdir/$lockfile'";  
         $lck->release;  
       } else {                  # Found an active reader, rats!  
         $self->{write_lock}->release;  
         die "Cannot write table '$file' while it's in use";  
       }  
     }  
     closedir DIR;  
   } else {  
     # Have a read lock already  
     return $self if $self->{read_lock};  
   
     # Get the preliminary write lock to protect the directory  
     # operations.  If we already have a write lock, it will go.  
   
     $self->{write_lock} ||= $lockmgr->lock($self->{file} . '/write')  
       or die "Can't lock '$self->{file}/write'";  
   
     # Find a new read slot.  Maybe the plain file would be better?  
     my $id = time;  
     while (-f "$lockdir/$id.lock") { # here assume ".lock" format!  
       $id++;  
     }  
676    
677      $self->{read_lock} = $lockmgr->lock("$lockdir/$id")    1;
       or die "Can't lock '$lockdir/$id'";  
   
     # We are a reader now. So we release the write lock  
     $self->{write_lock}->release;  
     delete $self->{write_lock};  
   }  
   return $self;  
678  }  }
679    
680  sub unlock {  sub DESTROY {
681    my $self = shift;    my $self = shift;
682    
683    # Either we have a read or a write lock (or we close the table already)    delete $self->{env};
   # unless ($self->{read_lock} || $self->{write_lock}) {  
   #   warn "WAIT::Table::unlock: Table aparently hold's no lock"  
   # }  
   if ($self->{write_lock}) {  
     $self->{write_lock}->release();  
     delete $self->{write_lock};  
   }  
   if ($self->{read_lock}) {  
     $self->{read_lock}->release();  
     delete $self->{read_lock};  
   }  
684    
685  }    # require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$self],[qw(self)])->Indent(1)->Useqq(1)->Dump; # XXX
   
 sub DESTROY {  
   my $self = shift;  
686    
   if ($self->{write_lock} || $self->{read_lock}) {  
     warn "Table handle destroyed without closing it first";  
     $self->unlock;  
   }  
687  }  }
688    
689  sub open_scan {  sub open_scan {
# Line 820  sub intervall { Line 740  sub intervall {
740    bless \%result, 'WAIT::Query::Raw';    bless \%result, 'WAIT::Query::Raw';
741  }  }
742    
743  sub search {  sub search_ref {
744    my $self  = shift;    my $self  = shift;
745    my ($query, $attr, $cont, $raw);    my ($query, $attr, $cont, $raw);
746    if (ref $_[0]) {    if (ref $_[0]) {
747      $query = shift;      $query = shift;
748          # require Data::Dumper; print STDERR "Line " . __LINE__ . ", File: " . __FILE__ . "\n" . Data::Dumper->new([$query],[qw()])->Indent(1)->Useqq(1)->Dump; # XXX
749    
750      $attr = $query->{attr};      $attr = $query->{attr};
751      $cont = $query->{cont};      $cont = $query->{cont};
752      $raw  = $query->{raw};      $raw  = $query->{raw};
753    } else {    } else {
754      require Carp;      cluck("Using three argument search interface is deprecated, use hashref interface instead");
     Carp::cluck("Using three argument search interface is deprecated, use hashref interface instead");  
755      $attr = shift;      $attr = shift;
756      $cont = shift;      $cont = shift;
757      $raw  = shift;      $raw  = shift;
# Line 865  sub search { Line 785  sub search {
785    }    }
786    if (defined $cont and $cont ne '') {    if (defined $cont and $cont ne '') {
787      for (@{$self->{inverted}->{$attr}}) {      for (@{$self->{inverted}->{$attr}}) {
788        my %r = $_->search($query, $cont);        my $r = $_->search_ref($query, $cont);
789        my ($key, $val);        my ($key, $val);
790        while (($key, $val) = each %r) {        while (($key, $val) = each %$r) {
791          if (exists $result{$key}) {          if (exists $result{$key}) {
792            $result{$key} += $val;            $result{$key} += $val;
793          } else {          } else {
# Line 881  sub search { Line 801  sub search {
801    for (keys %result) {    for (keys %result) {
802      delete $result{$_} if $self->{deleted}->{$_}      delete $result{$_} if $self->{deleted}->{$_}
803    }    }
804    %result;    \%result;
805    }
806    
807    sub parse_query {
808      my($self, $attr, $query) = @_;
809      return unless defined $query && length $query;
810      my %qt;
811      for (@{$self->{inverted}->{$attr}}) {
812        grep $qt{$_}++, $_->parse($query);
813      }
814      [keys %qt];
815  }  }
816    
817  sub hilight_positions {  sub hilight_positions {

Legend:
Removed from v.41  
changed lines
  Added in v.115

  ViewVC Help
Powered by ViewVC 1.1.26