Final changes for distributed archive crawls

Shawn Tice [2012-05-16]
Final changes for distributed archive crawls

This includes improved handling of saving and restoring state for the archive
iterators, a new tool arc_extract.php to help test the archive iterators which
extracts batches of pages from an archive directory, and improvements to the
crawl process to control simultaneous access to the archive being iterated over
on the name server.
Filename
bin/arc_extract.php
bin/fetcher.php
configs/config.php
controllers/fetch_controller.php
lib/archive_bundle_iterators/arc_archive_bundle_iterator.php
lib/archive_bundle_iterators/archive_bundle_iterator.php
lib/archive_bundle_iterators/mediawiki_bundle_iterator.php
lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php
lib/archive_bundle_iterators/web_archive_bundle_iterator.php
diff --git a/bin/arc_extract.php b/bin/arc_extract.php
new file mode 100755
index 000000000..15b19c3c4
--- /dev/null
+++ b/bin/arc_extract.php
@@ -0,0 +1,240 @@
+<?php
+/**
+ *  SeekQuarry/Yioop --
+ *  Open Source Pure PHP Search Engine, Crawler, and Indexer
+ *
+ *  Copyright (C) 2009 - 2012  Chris Pollett chris@pollett.org
+ *
+ *  LICENSE:
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ *  END LICENSE
+ *
+ * @author Shawn Tice sctice@gmail.com
+ * @package seek_quarry
+ * @subpackage bin
+ * @license http://www.gnu.org/licenses/ GPL3
+ * @link http://www.seekquarry.com/
+ * @copyright 2009 - 2012
+ * @filesource
+ */
+
+if(php_sapi_name() != 'cli') {echo "BAD REQUEST"; exit();}
+
+/** Calculate base directory of script @ignore */
+define("BASE_DIR", substr(
+    dirname(realpath($_SERVER['PHP_SELF'])), 0,
+    -strlen("/bin")));
+
+/** Some pages are huge, and the page hash function can run out of memory
+ * stripping script, noscript, and style tags. */
+ini_set("memory_limit","500M");
+
+/** This tool does not need logging */
+define("LOG_TO_FILES", false);
+
+/** Load in global configuration settings */
+require_once BASE_DIR.'/configs/config.php';
+if(!PROFILE) {
+    echo "Please configure the search engine instance by visiting" .
+        "its web interface on localhost.\n";
+    exit();
+}
+
+/** NO_CACHE means don't try to use memcache */
+define("NO_CACHE", true);
+
+/** USE_CACHE false rules out file cache as well */
+define("USE_CACHE", false);
+
+/** Load the iterator classes */
+require_once BASE_DIR."/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php";
+require_once BASE_DIR."/lib/archive_bundle_iterators/archive_bundle_iterator.php";
+require_once BASE_DIR."/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php";
+require_once BASE_DIR."/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php";
+require_once BASE_DIR."/lib/archive_bundle_iterators/web_archive_bundle_iterator.php";
+
+/** Load FetchUrl, used by the MediaWiki archive iterator */
+require_once BASE_DIR."/lib/fetch_url.php";
+
+/** Load crawlHash and other utility functions */
+require_once BASE_DIR."/lib/utility.php";
+
+/** Loads common constants for web crawling */
+require_once BASE_DIR."/lib/crawl_constants.php";
+
+/*
+ *  We'll set up multi-byte string handling to use UTF-8
+ */
+mb_internal_encoding("UTF-8");
+mb_regex_encoding("UTF-8");
+
+/** Command-line tool that extracts and prints batches of pages from an
+ *  archive directory, used to test the archive bundle iterators. */
+class ArcExtractor implements CrawlConstants
+{
+
+    const DEFAULT_EXTRACT_NUM = 50;
+
+    const MAX_BUFFER_PAGES = 200;
+
+    /**
+     * Runs the ArcExtractor on the supplied command line arguments
+     */
+    function start()
+    {
+        global $argv;
+
+        $num_to_extract = self::DEFAULT_EXTRACT_NUM;
+
+        if(count($argv) < 2) {
+            $this->usageMessageAndExit();
+        }
+
+        $archive_name = $argv[1];
+        if(!file_exists($archive_name)) {
+            $archive_name = CRAWL_DIR."/cache/".$archive_name;
+            if(!file_exists($archive_name)) {
+                echo "{$archive_name} doesn't exist";
+                exit;
+            }
+        }
+
+        if(isset($argv[2])) {
+            $num_to_extract = max(1, intval($argv[2]));
+        }
+
+        $this->outputShowPages($archive_name, $num_to_extract);
+    }
+
+    /**
+     * Used to list out the pages/summaries stored in a bundle
+     * $archive_name. It lists to stdout $total many documents starting from
+     * either the beginning or wherever the last run left off.
+     *
+     * @param string $archive_name name of bundle to list documents for
+     * @param int $total number of documents to list
+     */
+    function outputShowPages($archive_name, $total)
+    {
+        $fields_to_print = array(
+            self::URL => "URL",
+            self::IP_ADDRESSES => "IP ADDRESSES",
+            self::TIMESTAMP => "DATE",
+            self::HTTP_CODE => "HTTP RESPONSE CODE",
+            self::TYPE => "MIMETYPE",
+            self::ENCODING => "CHARACTER ENCODING",
+            self::DESCRIPTION => "DESCRIPTION",
+            self::PAGE => "PAGE DATA");
+        $archive_kind = $this->getArchiveKind($archive_name);
+        if($archive_kind === false) {
+            $this->badFormatMessageAndExit($archive_name);
+        }
+        $iterator = $this->instantiateIterator($archive_name, $archive_kind);
+        if($iterator === false) {
+            $this->badFormatMessageAndExit($archive_name);
+        }
+        $seen = 0;
+        while(!$iterator->end_of_iterator && $seen < $total) {
+            $num_to_get = min(self::MAX_BUFFER_PAGES, $total - $seen);
+            $objects = $iterator->nextPages($num_to_get);
+            $seen += count($objects);
+            foreach($objects as $object) {
+                $out = "";
+                if(isset($object[self::TIMESTAMP])) {
+                    $object[self::TIMESTAMP] =
+                        date("r", $object[self::TIMESTAMP]);
+                }
+                foreach($fields_to_print as $key => $name) {
+                    if(isset($object[$key])) {
+                        $out .= "[$name]\n";
+                        if($key != self::IP_ADDRESSES) {
+                            $out .= $object[$key]."\n";
+                        } else {
+                            foreach($object[$key] as $address) {
+                                $out .= $address."\n";
+                            }
+                        }
+                    }
+                }
+                $out .= "==========\n\n";
+                echo "BEGIN ITEM, LENGTH:".strlen($out)."\n";
+                echo $out;
+            }
+        }
+    }
+
+    /**
+     * Given a folder name, determines the kind of bundle (if any) it holds.
+     * It does this based on the expected location of the arc_description.ini
+     * file.
+     *
+     * @param string $archive_name the name of folder
+     * @return string the archive bundle type or false if no arc_type is found
+     */
+    function getArchiveKind($archive_name)
+    {
+        $desc_path = "$archive_name/arc_description.ini";
+        if(file_exists($desc_path)) {
+            $desc = parse_ini_file($desc_path);
+            if(!isset($desc['arc_type'])) {
+                return false;
+            }
+            return $desc['arc_type'];
+        }
+        return false;
+    }
+
+    function instantiateIterator($archive_name, $iterator_type)
+    {
+        $iterate_timestamp = filectime($archive_name);
+        $result_timestamp = strval(time());
+        // Create the result dir under the current directory, and name it after
+        // the iterate timestamp so that running the tool twice on the same
+        // archive will result in the second run picking up where the first one
+        // left off.
+        $this->result_name = 'ArchiveExtract'.$iterate_timestamp;
+        if(!file_exists($this->result_name)) {
+            mkdir($this->result_name);
+        }
+        $iterator_class = "{$iterator_type}Iterator";
+        $iterator = new $iterator_class($iterate_timestamp, $archive_name,
+            $result_timestamp, $this->result_name);
+        return $iterator;
+    }
+
+    /**
+     * Outputs a message that $archive_name is not a known bundle type, exits.
+     */
+    function badFormatMessageAndExit($archive_name)
+    {
+        echo "{$archive_name} does not appear to be a valid archive type\n";
+        exit();
+    }
+
+    /**
+     * Outputs the "how to use this tool message" and then exit()'s.
+     */
+    function usageMessageAndExit()
+    {
+        echo "\nDescription coming soon.\n";
+        exit();
+    }
+}
+
+$arc_extractor = new ArcExtractor();
+$arc_extractor->start();
+
+?>
diff --git a/bin/fetcher.php b/bin/fetcher.php
index 4e16fc9c1..e9afdab3c 100755
--- a/bin/fetcher.php
+++ b/bin/fetcher.php
@@ -183,33 +183,39 @@ class Fetcher implements CrawlConstants
      * @var array
      */
     var $meta_words;
+
     /**
      * WebArchiveBundle  used to store complete web pages and auxiliary data
      * @var object
      */
     var $web_archive;
+
     /**
      * Timestamp of the current crawl
      * @var int
      */
     var $crawl_time;
+
     /**
      * Contains the list of web pages to crawl from a queue_server
      * @var array
      */
     var $to_crawl;
+
     /**
      * Contains the list of web pages to crawl that failed on first attempt
      * (we give them one more try before bailing on them)
      * @var array
      */
     var $to_crawl_again;
+
     /**
      * Summary information for visited sites that the fetcher hasn't sent to
      * a queue_server yet
      * @var array
      */
     var $found_sites;
+
     /**
      * Timestamp from a queue_server of the current schedule of sites to
      * download. This is sent back to the server once this schedule is completed
@@ -217,24 +223,28 @@ class Fetcher implements CrawlConstants
      * @var int
      */
     var $schedule_time;
+
     /**
      * The sum of the number of words of all the page description for the current
      * crawl. This is used in computing document statistics.
      * @var int
      */
     var $sum_seen_site_description_length;
+
     /**
      * The sum of the number of words of all the page titles for the current
      * crawl. This is used in computing document statistics.
      * @var int
      */
     var $sum_seen_title_length;
+
     /**
      * The sum of the number of words in all the page links for the current
      * crawl. This is used in computing document statistics.
      * @var int
      */
     var $sum_seen_site_link_length;
+
     /**
      * Number of sites crawled in the current crawl
      * @var int
@@ -257,14 +267,25 @@ class Fetcher implements CrawlConstants
     var $crawl_type;

     /**
-     * Maximum number of bytes to download of a webpage
-     * @var int
+     * For an archive crawl, holds the name of the type of archive being
+     * iterated over (this is the class name of the iterator, without the word
+     * 'Iterator')
+     * @var string
      */
-    var $page_range_request;
+    var $arc_type;

     /**
-     * If self::ARCHIVE_CRAWL is being down, then this field holds the iterator
-     * object used to iterate over the archive
+     * For a non-web archive crawl, holds the path to the directory that
+     * contains the archive files and their description (web archives have a
+     * different structure and are already distributed across machines and
+     * fetchers)
+     * @var string
+     */
+    var $arc_dir;
+
+    /**
+     * If a web archive crawl (i.e. a re-crawl) is active then this field
+     * holds the iterator object used to iterate over the archive
      * @var object
      */
     var $archive_iterator;
@@ -290,6 +311,12 @@ class Fetcher implements CrawlConstants
      */
     var $fetcher_num;

+    /**
+     * Maximum number of bytes to download of a webpage
+     * @var int
+     */
+    var $page_range_request;
+
     /**
      * An array to keep track of hosts which have had a lot of http errors
      * @var array
@@ -401,7 +428,7 @@ class Fetcher implements CrawlConstants
                 if($info[self::CRAWL_TIME] == 0) {
                     $info[self::STATUS] = self::NO_DATA_STATE;
                 }
-            } else if ($this->crawl_type == self::ARCHIVE_CRAWL &&
+            } else if($this->crawl_type == self::ARCHIVE_CRAWL &&
                     !empty($this->arc_dir)) {
                 // An archive crawl with data coming from the name server.
                 $info = $this->checkArchiveScheduler();
@@ -700,6 +727,10 @@ class Fetcher implements CrawlConstants
                 $this->crawl_type = $info[self::CRAWL_TYPE];
                 $this->arc_dir = $info[self::ARC_DIR];
                 $this->arc_type = $info[self::ARC_TYPE];
+            } else {
+                $this->crawl_type = self::WEB_CRAWL;
+                $this->arc_dir = '';
+                $this->arc_type = '';
             }

             // Load any batch that might exist for changed-to crawl
@@ -844,8 +875,13 @@ class Fetcher implements CrawlConstants
         }

         $end_info = strpos($response_string, "\n");
-        $info_string = substr($response_string, 0, $end_info);
-        $info = unserialize($info_string);
+        if($end_info !== false && ($info_string =
+                substr($response_string, 0, $end_info)) != '') {
+            $info = unserialize($info_string);
+        } else {
+            $info = array();
+            $info[self::STATUS] = self::NO_DATA_STATE;
+        }
         $this->setCrawlParamsFromArray($info);

         if(isset($info[self::SITES])) {
diff --git a/configs/config.php b/configs/config.php
old mode 100755
new mode 100644
index 03ff9aaac..06428c100
--- a/configs/config.php
+++ b/configs/config.php
@@ -265,6 +265,17 @@ define('MAX_PHRASE_LEN', 2);
 /** number of multi curl page requests in one go */
 define('NUM_MULTI_CURL_PAGES', 100);

+/** number of pages to extract from an archive in one go */
+define('ARCHIVE_BATCH_SIZE', 500);
+
+/**
+    Time in seconds to wait to acquire an exclusive lock before we're no longer
+    allowed to extract the next batch of pages for an archive crawl. This is
+    intended to prevent a fetcher from waiting to acquire the lock, then
+    getting it just before cURL gives up and times out the request.
+ */
+define('ARCHIVE_LOCK_TIMEOUT', 8);
+
 /** time in seconds before we give up on a page */
 define('PAGE_TIMEOUT', 30);

diff --git a/controllers/fetch_controller.php b/controllers/fetch_controller.php
index e536fb954..97e7c31f1 100755
--- a/controllers/fetch_controller.php
+++ b/controllers/fetch_controller.php
@@ -161,9 +161,19 @@ class FetchController extends Controller implements CrawlConstants
         $this->displayView($view, $data);
     }

+    /**
+     * Checks to see whether there are more pages to extract from the current
+     * archive, and if so returns the next batch to the requesting fetcher. The
+     * iteration progress is automatically saved on each call to nextPages, so
+     * that the next fetcher will get the next batch of pages. If there is no
+     * current archive to iterate over, or the iterator has reached the end of
+     * the archive then indicate that there is no more data by setting the
+     * status to NO_DATA_STATE.
+     */
     function archiveSchedule()
     {
         $view = "fetch";
+        $request_start = time();

         if(isset($_REQUEST['crawl_time'])) {;
             $crawl_time = $this->clean($_REQUEST['crawl_time'], 'int');
@@ -177,6 +187,8 @@ class FetchController extends Controller implements CrawlConstants
             $fetch_pages = true;
             $info = unserialize(file_get_contents($messages_filename));
             if($info[self::STATUS] == 'STOP_CRAWL') {
+                // The stop crawl message gets created by the admin_controller
+                // when the "stop crawl" button is pressed.
                 @unlink($messages_filename);
                 @unlink($lock_filename);
                 $fetch_pages = false;
@@ -187,13 +199,22 @@ class FetchController extends Controller implements CrawlConstants
             $info = array();
         }

+        $pages = array();
         if($fetch_pages) {
             $archive_iterator = NULL;
-            $pages = array();

+            // Start by trying to acquire an exclusive lock on the iterator
+            // lock file, so that the same batch of pages isn't extracted more
+            // than once.  For now the call to acquire the lock blocks, so that
+            // fetchers will queue up. If the time between requesting the lock
+            // and acquiring it is greater than ARCHIVE_LOCK_TIMEOUT then we
+            // give up on this request and try back later.
             $lock_fd = fopen($lock_filename, 'w');
             $have_lock = flock($lock_fd, LOCK_EX);
-            if(file_exists($info[self::ARC_DIR]) && $have_lock) {
+            $elapsed_time = time() - $request_start;
+
+            if($have_lock && $elapsed_time <= ARCHIVE_LOCK_TIMEOUT &&
+                    file_exists($info[self::ARC_DIR])) {
                 $iterate_timestamp = $info[self::CRAWL_INDEX];
                 $iterate_dir = $info[self::ARC_DIR];
                 $result_timestamp = $crawl_time;
@@ -217,30 +238,28 @@ class FetchController extends Controller implements CrawlConstants
             }

             if($archive_iterator && !$archive_iterator->end_of_iterator) {
-                if($archive_iterator->end_of_iterator) {
-                    // Stop crawl here.
-                } else {
-                    $info[self::SITES] = array();
-                    $pages = $archive_iterator->nextPages(
-                        500);
-                    $delta = time() - $time;
-                    debugLogFile("fetch took $delta seconds", "nameserver");
-                }
+                $info[self::SITES] = array();
+                $pages = $archive_iterator->nextPages(
+                    ARCHIVE_BATCH_SIZE);
+                $delta = time() - $time;
             }

             if($have_lock) {
                 flock($lock_fd, LOCK_UN);
             }
             fclose($lock_fd);
+        }

-            $info_string = serialize($info);
+        if(!empty($pages)) {
             $pages_string = gzcompress(serialize($pages));
-            $data['MESSAGE'] = $info_string."\n".$pages_string;
         } else {
             $info[self::STATUS] = self::NO_DATA_STATE;
-            $data['MESSAGE'] = serialize($info)."\n";
+            $pages_string = '';
         }

+        $info_string = serialize($info);
+        $data['MESSAGE'] = $info_string."\n".$pages_string;
+
         $this->displayView($view, $data);
     }

diff --git a/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php b/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php
index cd704ed70..02d8cc1ae 100644
--- a/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php
+++ b/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php
@@ -53,12 +53,22 @@ require_once BASE_DIR.
 class ArcArchiveBundleIterator extends ArchiveBundleIterator
     implements CrawlConstants
 {
+    /**
+     * The path to the directory containing the archive partitions to be
+     * iterated over.
+     * @var string
+     */
+    var $iterate_dir;
+    /**
+     * The path to the directory where the iteration status is stored.
+     * @var string
+     */
+    var $result_dir;
     /**
      * The number of arc files in this arc archive bundle
      *  @var int
      */
     var $num_partitions;
-
     /**
      *  Counting in glob order for this arc archive bundle directory, the
      *  current active file number of the arc file being process.
@@ -67,7 +77,12 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator
      */
     var $current_partition_num;
     /**
-        current byte offset into the current arc file
+     *  current number of pages into the current arc file
+     *  @var int
+     */
+    var $current_page_num;
+    /**
+     *  current byte offset into the current arc file
      *  @var int
      */
     var $current_offset;
@@ -82,7 +97,6 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator
      */
     var $fh;

-
     /**
      * Creates a arc archive iterator with the given parameters.
      *
@@ -91,27 +105,21 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator
      * @param string $result_timestamp timestamp of the arc archive bundle
      *      results are being stored in
      */
-    function __construct($prefix, $iterate_timestamp, $result_timestamp)
+    function __construct($iterate_timestamp, $iterate_dir,
+        $result_timestamp, $result_dir)
     {
-        $this->fetcher_prefix = $prefix;
         $this->iterate_timestamp = $iterate_timestamp;
+        $this->iterate_dir = $iterate_dir;
         $this->result_timestamp = $result_timestamp;
-        $archive_name = $this->get_archive_name($iterate_timestamp);
+        $this->result_dir = $result_dir;
         $this->partitions = array();
-        foreach(glob("$archive_name/*.arc.gz") as $filename) {
+        foreach(glob("{$this->iterate_dir}/*.arc.gz") as $filename) {
             $this->partitions[] = $filename;
         }
         $this->num_partitions = count($this->partitions);

-        if(file_exists("$archive_name/iterate_status.txt")) {
-            $info = unserialize(file_get_contents(
-                "$archive_name/iterate_status.txt"));
-            $this->end_of_iterator = $info['end_of_iterator'];
-            $this->current_partition_num = $info['current_partition_num'];
-            $this->current_offset = $info['current_offset'];
-            $this->fh=gzopen(
-                $this->partitions[$this->current_partition_num], "rb");
-            gzseek($this->fh, $this->current_offset);
+        if(file_exists("{$this->result_dir}/iterate_status.txt")) {
+            $this->restoreCheckpoint();
         } else {
             $this->reset();
         }
@@ -138,8 +146,7 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator
         $this->end_of_iterator = false;
         $this->current_offset = 0;
         $this->fh = NULL;
-        $archive_name = $this->get_archive_name($this->result_timestamp);
-        @unlink("$archive_name/iterate_status.txt");
+        @unlink("{$this->result_dir}/iterate_status.txt");
     }

     /**
@@ -153,6 +160,7 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator
     function nextPages($num)
     {
         $pages = array();
+        $page_count = 0;
         for($i = 0; $i < $num; $i++) {
             $page = $this->nextPage();
             if(!$page) {
@@ -168,19 +176,15 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator
                     $this->partitions[$this->current_partition_num], "rb");
             } else {
                 $pages[] = $page;
+                $page_count++;
             }
         }
         if(is_resource($this->fh)) {
             $this->current_offset = gztell($this->fh);
+            $this->current_page_num += $page_count;
         }

-        $archive_name = $this->get_archive_name($this->result_timestamp);
-        $info = array();
-        $info['end_of_iterator'] = $this->end_of_iterator;
-        $info['current_partition_num'] = $this->current_partition_num;
-        $info['current_offset'] = $this->current_offset;
-        file_put_contents("$archive_name/iterate_status.txt",
-            serialize($info));
+        $this->saveCheckpoint();
         return $pages;
     }

@@ -213,6 +217,5 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator
         $site[self::WEIGHT] = 1;
         return $site;
     }
-
 }
 ?>
diff --git a/lib/archive_bundle_iterators/archive_bundle_iterator.php b/lib/archive_bundle_iterators/archive_bundle_iterator.php
index 51dfb9bfc..cc4122d75 100644
--- a/lib/archive_bundle_iterators/archive_bundle_iterator.php
+++ b/lib/archive_bundle_iterators/archive_bundle_iterator.php
@@ -53,13 +53,11 @@ abstract class ArchiveBundleIterator implements CrawlConstants
      * @var int
      */
      var $iterate_timestamp;
-
     /**
      * Timestamp of the archive that is being used to store results in
      * @var int
      */
      var $result_timestamp;
-
     /**
      * Whether or not the iterator still has more documents
      * @var bool
@@ -67,22 +65,44 @@ abstract class ArchiveBundleIterator implements CrawlConstants
      var $end_of_iterator;

     /**
-     * The fetcher prefix associated with this archive.
-     * @var string
+     * Stores the current progress to the file iterate_status.txt in the result
+     * dir such that a new instance of the iterator could be constructed and
+     * return the next set of pages without having to process all of the pages
+     * that came before. Each iterator should make a call to saveCheckpoint
+     * after extracting a batch of pages.
+     * @param array $info any extra info a subclass wants to save
      */
-    var $fetcher_prefix;
+    function saveCheckpoint($info = array())
+    {
+        $info['end_of_iterator'] = $this->end_of_iterator;
+        $info['current_partition_num'] = $this->current_partition_num;
+        $info['current_page_num'] = $this->current_page_num;
+        $info['current_offset'] = $this->current_offset;
+        file_put_contents("{$this->result_dir}/iterate_status.txt",
+            serialize($info));
+    }

     /**
-     * Returns the path to an archive given its timestamp.
-     *
-     * @param string $timestamp the archive timestamp
-     * @return string the path to the archive, based off of the fetcher prefix
-     *     used when this iterator was constructed
+     * Restores the internal state from the file iterate_status.txt in the
+     * result dir such that the next call to nextPages will pick up from just
+     * after the last checkpoint. Each iterator should make a call to
+     * restoreCheckpoint at the end of the constructor method after the
+     * instance members have been initialized.
+     * @return array the data serialized when saveCheckpoint was called
      */
-    function get_archive_name($timestamp)
+    function restoreCheckpoint()
     {
-        return CRAWL_DIR.'/cache/'.$this->fetcher_prefix.
-            self::archive_base_name.$timestamp;
+        $info = unserialize(file_get_contents(
+            "{$this->result_dir}/iterate_status.txt"));
+        $this->end_of_iterator = $info['end_of_iterator'];
+        $this->current_partition_num = $info['current_partition_num'];
+        $this->current_offset = $info['current_offset'];
+        if(!$this->end_of_iterator) {
+            $this->fh=gzopen(
+                $this->partitions[$this->current_partition_num], "rb");
+            gzseek($this->fh, $this->current_offset);
+        }
+        return $info;
     }

     /**
diff --git a/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php b/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php
index d60fb9497..8fcd76300 100644
--- a/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php
+++ b/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php
@@ -55,21 +55,30 @@ require_once BASE_DIR.'/lib/bzip2_block_iterator.php';
 class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator
     implements CrawlConstants
 {
+    /**
+     * The path to the directory containing the archive partitions to be
+     * iterated over.
+     * @var string
+     */
+    var $iterate_dir;
+    /**
+     * The path to the directory where the iteration status is stored.
+     * @var string
+     */
+    var $result_dir;
     /**
      * The number of arc files in this arc archive bundle
      *  @var int
      */
     var $num_partitions;
-
     /**
      *  Counting in glob order for this arc archive bundle directory, the
      *  current active file number of the arc file being processed.
-     *
      *  @var int
      */
     var $current_partition_num;
     /**
-        current number of wiki pages into the Media Wiki xml.bz2 file
+     *  current number of wiki pages into the Media Wiki xml.bz2 file
      *  @var int
      */
     var $current_page_num;
@@ -86,7 +95,6 @@ class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator
     /**
      * Used to hold data that was in the buffer but before a siteinfo or a page
      * when that data gets parsed out.
-     *
      *  @var string
      */
     var $remainder;
@@ -158,20 +166,47 @@ class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator
         $this->num_partitions = count($this->partitions);

         if(file_exists("{$this->result_dir}/iterate_status.txt")) {
-            $info = unserialize(file_get_contents(
-                "{$this->result_dir}/iterate_status.txt"));
-            $this->end_of_iterator = $info['end_of_iterator'];
-            $this->current_partition_num = $info['current_partition_num'];
-            $this->current_page_num = $info['current_page_num'];
-            $this->buffer = $info['buffer'];
-            $this->remainder = $info['remainder'];
-            $this->header = $info['header'];
-            $this->bz2_iterator = $info['bz2_iterator'];
+            $this->restoreCheckpoint();
         } else {
             $this->reset();
         }
     }

+    /**
+     * Saves the current state so that a new instantiation can pick up just
+     * after the last batch of pages extracted.
+     */
+    function saveCheckpoint($info = array())
+    {
+        $info['end_of_iterator'] = $this->end_of_iterator;
+        $info['current_partition_num'] = $this->current_partition_num;
+        $info['current_page_num'] = $this->current_page_num;
+        $info['buffer'] = $this->buffer;
+        $info['remainder'] = $this->remainder;
+        $info['header'] = $this->header;
+        $info['bz2_iterator'] = $this->bz2_iterator;
+        file_put_contents("{$this->result_dir}/iterate_status.txt",
+            serialize($info));
+    }
+
+    /**
+     * Restores state from a previous instantiation, after the last batch of
+     * pages extracted.
+     */
+    function restoreCheckpoint()
+    {
+        $info = unserialize(file_get_contents(
+            "{$this->result_dir}/iterate_status.txt"));
+        $this->end_of_iterator = $info['end_of_iterator'];
+        $this->current_partition_num = $info['current_partition_num'];
+        $this->current_page_num = $info['current_page_num'];
+        $this->buffer = $info['buffer'];
+        $this->remainder = $info['remainder'];
+        $this->header = $info['header'];
+        $this->bz2_iterator = $info['bz2_iterator'];
+        return $info;
+    }
+
     /**
      * Estimates the important of the site according to the weighting of
      * the particular archive iterator
@@ -223,7 +258,7 @@ class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator
      */
     function getNextTagData($tag)
     {
-        while(!stristr($this->buffer, "</$tag")) {
+        while(stripos($this->buffer, "</$tag") === false) {
             if(is_null($this->bz2_iterator) || $this->bz2_iterator->is_eof()) {
                 return false;
             }
@@ -333,16 +368,7 @@ class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator
             $this->current_page_num += $page_count;
         }

-        $info = array();
-        $info['end_of_iterator'] = $this->end_of_iterator;
-        $info['current_partition_num'] = $this->current_partition_num;
-        $info['current_page_num'] = $this->current_page_num;
-        $info['buffer'] = $this->buffer;
-        $info['remainder'] = $this->remainder;
-        $info['header'] = $this->header;
-        $info['bz2_iterator'] = $this->bz2_iterator;
-        file_put_contents("{$this->result_dir}/iterate_status.txt",
-            serialize($info));
+        $this->saveCheckpoint();
         return $pages;
     }

diff --git a/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php b/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php
index 75d74e290..5ab3a55a5 100644
--- a/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php
+++ b/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php
@@ -54,25 +54,34 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
     implements CrawlConstants
 {
     /**
-     * The number of arc files in this arc archive bundle
+     * The path to the directory containing the archive partitions to be
+     * iterated over.
+     * @var string
+     */
+    var $iterate_dir;
+    /**
+     * The path to the directory where the iteration status is stored.
+     * @var string
+     */
+    var $result_dir;
+    /**
+     * The number of odp rdf files in this archive bundle
      *  @var int
      */
     var $num_partitions;
-
     /**
-     *  Counting in glob order for this arc archive bundle directory, the
-     *  current active file number of the arc file being process.
-     *
+     *  Counting in glob order for this odp rdf archive bundle directory, the
+     *  current active file number of the file being processed.
      *  @var int
      */
     var $current_partition_num;
     /**
-        current number of wiki pages into the Media Wiki xml.bz2 file
+     *  current number of pages into the current odp rdf file
      *  @var int
      */
     var $current_page_num;
     /**
-     *  Array of filenames of arc files in this directory (glob order)
+     *  Array of filenames of odp rdf files in this directory (glob order)
      *  @var array
      */
     var $partitions;
@@ -82,7 +91,7 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
      */
     var $buffer;
     /**
-     *  Associative array containing global properties like base url of th
+     *  Associative array containing global properties like base url of the
      *  current open odp rdf file
      *  @var array
      */
@@ -92,11 +101,17 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
      *  @var resource
      */
     var $fh;
+    /**
+     *  Offset into the current odp rdf file
+     *  @var int
+     */
+    var $current_offset;

     /**
-     * How many bytes to read into buffer from bz2 stream in one go
+     * How many bytes to read into buffer from gzip stream in one go
+     * @var int
      */
-    const BLOCK_SIZE = 8192;
+    const BLOCK_SIZE = 1024;

     /**
      * Creates an open directory rdf archive iterator with the given parameters.
@@ -106,14 +121,15 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
      * @param string $result_timestamp timestamp of the arc archive bundle
      *      results are being stored in
      */
-    function __construct($prefix, $iterate_timestamp, $result_timestamp)
+    function __construct($iterate_timestamp, $iterate_dir,
+            $result_timestamp, $result_dir)
     {
-        $this->fetcher_prefix = $prefix;
         $this->iterate_timestamp = $iterate_timestamp;
+        $this->iterate_dir = $iterate_dir;
         $this->result_timestamp = $result_timestamp;
-        $archive_name = $this->get_archive_name($iterate_timestamp);
+        $this->result_dir = $result_dir;
         $this->partitions = array();
-        foreach(glob("$archive_name/*.gz") as $filename) {
+        foreach(glob("{$this->iterate_dir}/*.gz") as $filename) {
             $this->partitions[] = $filename;
         }
         $this->num_partitions = count($this->partitions);
@@ -121,22 +137,31 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
         $url_parts = @parse_url($this->header['base_address']);
         $this->header['ip_address'] = gethostbyname($url_parts['host']);

-        if(file_exists("$archive_name/iterate_status.txt")) {
-            $info = unserialize(file_get_contents(
-                "$archive_name/iterate_status.txt"));
-            $this->end_of_iterator = $info['end_of_iterator'];
-            $this->current_partition_num = $info['current_partition_num'];
-            $this->current_offset = $info['current_offset'];
-
-            $this->fh=gzopen(
-                $this->partitions[$this->current_partition_num], "r");
-            $this->buffer = "";
-            $this->readPages($this->current_page_num, false);
+        if(file_exists("{$this->result_dir}/iterate_status.txt")) {
+            $this->restoreCheckpoint();
         } else {
             $this->reset();
         }
     }

+    /**
+     * Add the buffer contents to the standard gzip archive checkpoint.
+     */
+    function saveCheckpoint($info = array())
+    {
+        $info['buffer'] = $this->buffer;
+        parent::saveCheckpoint($info);
+    }
+
+    /**
+     * Restore the buffer from the checkpoint info.
+     */
+    function restoreCheckpoint()
+    {
+        $info = parent::restoreCheckpoint();
+        $this->buffer = $info['buffer'];
+    }
+
     /**
      * Estimates the important of the site according to the weighting of
      * the particular archive iterator
@@ -160,34 +185,33 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
      */
     function getNextTagsData($tags)
     {
+        $max_tag_len = 0;
+        $regex = '@<('.implode('|', $tags).')[^>]*?>.*?</\1[^>]*?>@si';
+        foreach($tags as $tag) {
+            $max_tag_len = max(strlen($tag) + 2, $max_tag_len);
+        }
+        $done = false;
+        $search_failed = false;
+        $offset = 0;
         do {
-            $done = false;
-            if(!$this->fh || feof($this->fh)) {return false;}
+            if($search_failed && (!$this->fh || feof($this->fh))) {
+                return false;
+            }
             $this->buffer .= gzread($this->fh, self::BLOCK_SIZE);
-
-            foreach($tags as $tag) {
-                if(stristr($this->buffer, "</$tag")) {
-                    $done = true;
-                }
+            if(preg_match($regex, $this->buffer, $matches,
+                    PREG_OFFSET_CAPTURE, $offset)) {
+                $done = true;
+                $search_failed = false;
+            } else {
+                $search_failed = true;
             }
+            $offset = 0; // match may begin before newly-read data; search all
         } while(!$done);
-        $found_tag = "";
-        $min_pos_tag = strlen($this->buffer);
-        foreach($tags as $tag) {
-            $pos_tag = strpos($this->buffer, $tag);
-            if( $pos_tag !== false) {
-                if($found_tag == "" || $pos_tag < $min_pos_tag) {
-                    $found_tag = $tag;
-                    $min_pos_tag = $pos_tag;
-                }
-            }
-        }
-        $start_info = strpos($this->buffer, "<$found_tag");
-        $pre_end_info = strpos($this->buffer, "</$found_tag", $start_info);
-        $end_info = strpos($this->buffer, ">", $pre_end_info) + 1;
-        $tag_info = substr($this->buffer, $start_info,
-            $end_info - $start_info);
-        $this->buffer = substr($this->buffer, $end_info);
+        $found_tag = $matches[1][0];
+        $start = $matches[0][1];
+        $length = strlen($matches[0][0]);
+        $tag_info = substr($this->buffer, $start, $length);
+        $this->buffer = substr($this->buffer, $start + $length);
         return array($tag_info, $found_tag);
     }

@@ -263,11 +287,10 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
     {
         $this->current_partition_num = -1;
         $this->end_of_iterator = false;
-        $this->current_offset = 0;
         $this->fh = NULL;
+        $this->current_offset = 0;
         $this->buffer = "";
-        $archive_name = $this->get_archive_name($this->result_timestamp);
-        @unlink("$archive_name/iterate_status.txt");
+        @unlink("{$this->result_dir}/iterate_status.txt");
     }

     /**
@@ -307,6 +330,7 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
                 }
                 $this->fh = gzopen(
                     $this->partitions[$this->current_partition_num], "r");
+                $this->current_offset = 0;
             } else {
                 if($return_pages) {
                     $pages[] = $page;
@@ -315,16 +339,11 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
             }
         }
         if(is_resource($this->fh)) {
+            $this->current_offset = gztell($this->fh);
             $this->current_page_num += $page_count;
         }

-        $archive_name = $this->get_archive_name($this->result_timestamp);
-        $info = array();
-        $info['end_of_iterator'] = $this->end_of_iterator;
-        $info['current_partition_num'] = $this->current_partition_num;
-        $info['current_page_num'] = $this->current_page_num;
-        file_put_contents("$archive_name/iterate_status.txt",
-            serialize($info));
+        $this->saveCheckpoint();
         return $pages;
     }

@@ -336,8 +355,12 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator
     function readPage($return_page)
     {
         if(!is_resource($this->fh)) return NULL;
-        list($page_info, $tag) = $this->getNextTagsData(
+        $tag_data = $this->getNextTagsData(
             array("Topic","ExternalPage"));
+        if(!$tag_data) {
+            return false;
+        }
+        list($page_info, $tag) = $tag_data;
         if(!$return_page) {
             return true;
         }
diff --git a/lib/archive_bundle_iterators/web_archive_bundle_iterator.php b/lib/archive_bundle_iterators/web_archive_bundle_iterator.php
index 3cd2a399c..680610945 100644
--- a/lib/archive_bundle_iterators/web_archive_bundle_iterator.php
+++ b/lib/archive_bundle_iterators/web_archive_bundle_iterator.php
@@ -89,6 +89,24 @@ class WebArchiveBundleIterator extends ArchiveBundleIterator
      * @var object
      */
     var $archive;
+    /**
+     * The fetcher prefix associated with this archive.
+     * @var string
+     */
+    var $fetcher_prefix;
+
+    /**
+     * Returns the path to an archive given its timestamp.
+     *
+     * @param string $timestamp the archive timestamp
+     * @return string the path to the archive, based off of the fetcher prefix
+     *     used when this iterator was constructed
+     */
+    function get_archive_name($timestamp)
+    {
+        return CRAWL_DIR.'/cache/'.$this->fetcher_prefix.
+            self::archive_base_name.$timestamp;
+    }

     /**
      * Creates a web archive iterator with the given parameters.
@@ -108,25 +126,50 @@ class WebArchiveBundleIterator extends ArchiveBundleIterator
         $this->archive = new WebArchiveBundle($archive_name);
         $archive_name = $this->get_archive_name($result_timestamp);
         if(file_exists("$archive_name/iterate_status.txt")) {
-            $info = unserialize(file_get_contents(
-                "$archive_name/iterate_status.txt"));
-            $this->count = $this->archive->count;
-            $this->num_partitions = $this->archive->write_partition+1;
-            $this->overall_index = $info['overall_index'];
-            $this->end_of_iterator = $info['end_of_iterator'];
-            $this->partition_index = $info['partition_index'];
-            $this->current_partition_num = $info['current_partition_num'];
-            $this->partition =  $this->archive->getPartition(
-                    $this->current_partition_num, false);
-            $this->partition->iterator_pos = $info['iterator_pos'];
+            $this->restoreCheckpoint();
         } else {
             $this->reset();
         }
+    }
+
+    /**
+     * Saves the current state so that a new instantiation can pick up just
+     * after the last batch of pages extracted.
+     */
+    function saveCheckpoint($info = array())
+    {
+        $info['overall_index'] = $this->overall_index;
+        $info['end_of_iterator'] = $this->end_of_iterator;
+        $info['partition_index'] = $this->partition_index;
+        $info['current_partition_num'] = $this->current_partition_num;
+        $info['iterator_pos'] = $this->partition->iterator_pos;
+        $archive_name = $this->get_archive_name($this->result_timestamp);
+        file_put_contents("$archive_name/iterate_status.txt",
+            serialize($info));
+    }

+    /**
+     * Restores state from a previous instantiation, after the last batch of
+     * pages extracted.
+     */
+    function restoreCheckpoint()
+    {
+        $info = unserialize(file_get_contents($this->get_archive_name(
+            $this->result_timestamp)."/iterate_status.txt"));
+        $this->count = $this->archive->count;
+        $this->num_partitions = $this->archive->write_partition+1;
+        $this->overall_index = $info['overall_index'];
+        $this->end_of_iterator = $info['end_of_iterator'];
+        $this->partition_index = $info['partition_index'];
+        $this->current_partition_num = $info['current_partition_num'];
+        $this->partition =  $this->archive->getPartition(
+                $this->current_partition_num, false);
+        $this->partition->iterator_pos = $info['iterator_pos'];
+        return $info;
     }

     /**
-     * Estimates the important of the site according to the weighting of
+     * Estimates the importance of the site according to the weighting of
      * the particular archive iterator
      * @param $site an associative array containing info about a web page
      * @return bool false we assume files were crawled roughly according to
@@ -171,16 +214,7 @@ class WebArchiveBundleIterator extends ArchiveBundleIterator
         $this->end_of_iterator = ($this->overall_index >= $this->count ) ?
             true : false;

-        $archive_name = $this->get_archive_name($this->result_timestamp);
-        $info = array();
-        $info['overall_index'] = $this->overall_index;
-        $info['end_of_iterator'] = $this->end_of_iterator;
-        $info['partition_index'] = $this->partition_index;
-        $info['current_partition_num'] = $this->current_partition_num;
-        $info['iterator_pos'] =$this->partition->iterator_pos;
-        file_put_contents("$archive_name/iterate_status.txt",
-            serialize($info));
-
+        $this->saveCheckpoint();
         return $objects;
     }
ViewGit