diff --git a/bin/arc_extract.php b/bin/arc_extract.php new file mode 100755 index 000000000..15b19c3c4 --- /dev/null +++ b/bin/arc_extract.php @@ -0,0 +1,240 @@ +<?php +/** + * SeekQuarry/Yioop -- + * Open Source Pure PHP Search Engine, Crawler, and Indexer + * + * Copyright (C) 2009 - 2012 Chris Pollett chris@pollett.org + * + * LICENSE: + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * END LICENSE + * + * @author Shawn Tice sctice@gmail.com + * @package seek_quarry + * @subpackage bin + * @license http://www.gnu.org/licenses/ GPL3 + * @link http://www.seekquarry.com/ + * @copyright 2009 - 2012 + * @filesource + */ + +if(php_sapi_name() != 'cli') {echo "BAD REQUEST"; exit();} + +/** Calculate base directory of script @ignore */ +define("BASE_DIR", substr( + dirname(realpath($_SERVER['PHP_SELF'])), 0, + -strlen("/bin"))); + +/** Some pages are huge, and the page hash function can run out of memory + * stripping script, noscript, and style tags. */ +ini_set("memory_limit","500M"); + +/** This tool does not need logging */ +define("LOG_TO_FILES", false); + +/** Load in global configuration settings */ +require_once BASE_DIR.'/configs/config.php'; +if(!PROFILE) { + echo "Please configure the search engine instance by visiting" . 
+ "its web interface on localhost.\n"; + exit(); +} + +/** NO_CACHE means don't try to use memcache */ +define("NO_CACHE", true); + +/** USE_CACHE false rules out file cache as well */ +define("USE_CACHE", false); + +/** Load the iterator classes */ +require_once BASE_DIR."/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php"; +require_once BASE_DIR."/lib/archive_bundle_iterators/archive_bundle_iterator.php"; +require_once BASE_DIR."/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php"; +require_once BASE_DIR."/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php"; +require_once BASE_DIR."/lib/archive_bundle_iterators/web_archive_bundle_iterator.php"; + +/** Load FetchUrl, used by the MediaWiki archive iterator */ +require_once BASE_DIR."/lib/fetch_url.php"; + +/** Load FetchUrl, used by the MediaWiki archive iterator */ +require_once BASE_DIR."/lib/utility.php"; + +/** Loads common constants for web crawling */ +require_once BASE_DIR."/lib/crawl_constants.php"; + +/* + * We'll set up multi-byte string handling to use UTF-8 + */ +mb_internal_encoding("UTF-8"); +mb_regex_encoding("UTF-8"); + +/** + */ +class ArcExtractor implements CrawlConstants +{ + + const DEFAULT_EXTRACT_NUM = 50; + + const MAX_BUFFER_PAGES = 200; + + /** + * Runs the ArcExtractor on the supplied command line arguments + */ + function start() + { + global $argv; + + $num_to_extract = self::DEFAULT_EXTRACT_NUM; + + if(count($argv) < 2) { + $this->usageMessageAndExit(); + } + + $archive_name = $argv[1]; + if(!file_exists($archive_name)) { + $archive_name = CRAWL_DIR."/cache/".$archive_name; + if(!file_exists($archive_name)) { + echo "{$archive_name} doesn't exist"; + exit; + } + } + + if(isset($argv[2])) { + $num_to_extract = max(1, intval($argv[2])); + } + + $this->outputShowPages($archive_name, $num_to_extract); + } + + /** + * Used to list out the pages/summaries stored in a bundle + * $archive_name. 
It lists to stdout $num many documents starting from + * either the beginning or wherever the last run left off. + * + * @param string $archive_name name of bundle to list documents for + * @param int $num number of documents to list + */ + function outputShowPages($archive_name, $total) + { + $fields_to_print = array( + self::URL => "URL", + self::IP_ADDRESSES => "IP ADDRESSES", + self::TIMESTAMP => "DATE", + self::HTTP_CODE => "HTTP RESPONSE CODE", + self::TYPE => "MIMETYPE", + self::ENCODING => "CHARACTER ENCODING", + self::DESCRIPTION => "DESCRIPTION", + self::PAGE => "PAGE DATA"); + $archive_kind = $this->getArchiveKind($archive_name); + if($archive_kind === false) { + $this->badFormatMessageAndExit($archive_name); + } + $iterator = $this->instantiateIterator($archive_name, $archive_kind); + if($iterator === false) { + $this->badFormatMessageAndExit($archive_name); + } + $seen = 0; + while(!$iterator->end_of_iterator && $seen < $total) { + $num_to_get = min(self::MAX_BUFFER_PAGES, $total - $seen); + $objects = $iterator->nextPages($num_to_get); + $seen += count($objects); + foreach($objects as $object) { + $out = ""; + if(isset($object[self::TIMESTAMP])) { + $object[self::TIMESTAMP] = + date("r", $object[self::TIMESTAMP]); + } + foreach($fields_to_print as $key => $name) { + if(isset($object[$key])) { + $out .= "[$name]\n"; + if($key != self::IP_ADDRESSES) { + $out .= $object[$key]."\n"; + } else { + foreach($object[$key] as $address) { + $out .= $address."\n"; + } + } + } + } + $out .= "==========\n\n"; + echo "BEGIN ITEM, LENGTH:".strlen($out)."\n"; + echo $out; + } + } + } + + /** + * Given a folder name, determines the kind of bundle (if any) it holds. + * It does this based on the expected location of the arc_description.ini + * file. 
+ * + * @param string $archive_name the name of folder + * @return string the archive bundle type or false if no arc_type is found + */ + function getArchiveKind($archive_name) + { + $desc_path = "$archive_name/arc_description.ini"; + if(file_exists($desc_path)) { + $desc = parse_ini_file($desc_path); + if(!isset($desc['arc_type'])) { + return false; + } + return $desc['arc_type']; + } + return false; + } + + function instantiateIterator($archive_name, $iterator_type) + { + $iterate_timestamp = filectime($archive_name); + $result_timestamp = strval(time()); + // Create the result dir under the current directory, and name it after + // the iterate timestamp so that running the tool twice on the same + // archive will result in the second run picking up where the first one + // left off. + $this->result_name = 'ArchiveExtract'.$iterate_timestamp; + if(!file_exists($this->result_name)) { + mkdir($this->result_name); + } + $iterator_class = "{$iterator_type}Iterator"; + $iterator = new $iterator_class($iterate_timestamp, $archive_name, + $result_timestamp, $this->result_name); + return $iterator; + } + + /** + * Outputs the "hey, this isn't a known bundle message" and then exit()'s. + */ + function badFormatMessageAndExit($archive_name) + { + echo "{$archive_name} does not appear to be a valid archive type\n"; + exit(); + } + + /** + * Outputs the "how to use this tool message" and then exit()'s. 
+ */ + function usageMessageAndExit() + { + echo "\nDescription coming soon.\n"; + exit(); + } +} + +$arc_extractor = new ArcExtractor(); +$arc_extractor->start(); + +?> diff --git a/bin/fetcher.php b/bin/fetcher.php index 4e16fc9c1..e9afdab3c 100755 --- a/bin/fetcher.php +++ b/bin/fetcher.php @@ -183,33 +183,39 @@ class Fetcher implements CrawlConstants * @var array */ var $meta_words; + /** * WebArchiveBundle used to store complete web pages and auxiliary data * @var object */ var $web_archive; + /** * Timestamp of the current crawl * @var int */ var $crawl_time; + /** * Contains the list of web pages to crawl from a queue_server * @var array */ var $to_crawl; + /** * Contains the list of web pages to crawl that failed on first attempt * (we give them one more try before bailing on them) * @var array */ var $to_crawl_again; + /** * Summary information for visited sites that the fetcher hasn't sent to * a queue_server yet * @var array */ var $found_sites; + /** * Timestamp from a queue_server of the current schedule of sites to * download. This is sent back to the server once this schedule is completed @@ -217,24 +223,28 @@ class Fetcher implements CrawlConstants * @var int */ var $schedule_time; + /** * The sum of the number of words of all the page description for the current * crawl. This is used in computing document statistics. * @var int */ var $sum_seen_site_description_length; + /** * The sum of the number of words of all the page titles for the current * crawl. This is used in computing document statistics. * @var int */ var $sum_seen_title_length; + /** * The sum of the number of words in all the page links for the current * crawl. This is used in computing document statistics. 
* @var int */ var $sum_seen_site_link_length; + /** * Number of sites crawled in the current crawl * @var int @@ -257,14 +267,25 @@ class Fetcher implements CrawlConstants var $crawl_type; /** - * Maximum number of bytes to download of a webpage - * @var int + * For an archive crawl, holds the name of the type of archive being + * iterated over (this is the class name of the iterator, without the word + * 'Iterator') + * @var string */ - var $page_range_request; + var $arc_type; /** - * If self::ARCHIVE_CRAWL is being down, then this field holds the iterator - * object used to iterate over the archive + * For a non-web archive crawl, holds the path to the directory that + * contains the archive files and their description (web archives have a + * different structure and are already distributed across machines and + * fetchers) + * @var string + */ + var $arc_dir; + + /** + * If an web archive crawl (i.e. a re-crawl) is active then this field + * holds the iterator object used to iterate over the archive * @var object */ var $archive_iterator; @@ -290,6 +311,12 @@ class Fetcher implements CrawlConstants */ var $fetcher_num; + /** + * Maximum number of bytes to download of a webpage + * @var int + */ + var $page_range_request; + /** * An array to keep track of hosts which have had a lot of http errors * @var array @@ -401,7 +428,7 @@ class Fetcher implements CrawlConstants if($info[self::CRAWL_TIME] == 0) { $info[self::STATUS] = self::NO_DATA_STATE; } - } else if ($this->crawl_type == self::ARCHIVE_CRAWL && + } else if($this->crawl_type == self::ARCHIVE_CRAWL && !empty($this->arc_dir)) { // An archive crawl with data coming from the name server. 
$info = $this->checkArchiveScheduler(); @@ -700,6 +727,10 @@ class Fetcher implements CrawlConstants $this->crawl_type = $info[self::CRAWL_TYPE]; $this->arc_dir = $info[self::ARC_DIR]; $this->arc_type = $info[self::ARC_TYPE]; + } else { + $this->crawl_type = self::WEB_CRAWL; + $this->arc_dir = ''; + $this->arc_type = ''; } // Load any batch that might exist for changed-to crawl @@ -844,8 +875,13 @@ class Fetcher implements CrawlConstants } $end_info = strpos($response_string, "\n"); - $info_string = substr($response_string, 0, $end_info); - $info = unserialize($info_string); + if($end_info !== false && ($info_string = + substr($response_string, 0, $end_info)) != '') { + $info = unserialize($info_string); + } else { + $info = array(); + $info[self::STATUS] = self::NO_DATA_STATE; + } $this->setCrawlParamsFromArray($info); if(isset($info[self::SITES])) { diff --git a/configs/config.php b/configs/config.php old mode 100755 new mode 100644 index 03ff9aaac..06428c100 --- a/configs/config.php +++ b/configs/config.php @@ -265,6 +265,17 @@ define('MAX_PHRASE_LEN', 2); /** number of multi curl page requests in one go */ define('NUM_MULTI_CURL_PAGES', 100); +/** number of pages to extract from an archive in one go */ +define('ARCHIVE_BATCH_SIZE', 500); + +/** + Time in seconds to wait to acquire an exclusive lock before we're no longer + allowed to extract the next batch of pages for an archive crawl. This is + intended to prevent a fetcher from waiting to acquire the lock, then + getting it just before cURL gives up and times out the request. 
+ */ +define('ARCHIVE_LOCK_TIMEOUT', 8); + /** time in seconds before we give up on a page */ define('PAGE_TIMEOUT', 30); diff --git a/controllers/fetch_controller.php b/controllers/fetch_controller.php index e536fb954..97e7c31f1 100755 --- a/controllers/fetch_controller.php +++ b/controllers/fetch_controller.php @@ -161,9 +161,19 @@ class FetchController extends Controller implements CrawlConstants $this->displayView($view, $data); } + /** + * Checks to see whether there are more pages to extract from the current + * archive, and if so returns the next batch to the requesting fetcher. The + * iteration progress is automatically saved on each call to nextPages, so + * that the next fetcher will get the next batch of pages. If there is no + * current archive to iterate over, or the iterator has reached the end of + * the archive then indicate that there is no more data by setting the + * status to NO_DATA_STATE. + */ function archiveSchedule() { $view = "fetch"; + $request_start = time(); if(isset($_REQUEST['crawl_time'])) {; $crawl_time = $this->clean($_REQUEST['crawl_time'], 'int'); @@ -177,6 +187,8 @@ class FetchController extends Controller implements CrawlConstants $fetch_pages = true; $info = unserialize(file_get_contents($messages_filename)); if($info[self::STATUS] == 'STOP_CRAWL') { + // The stop crawl message gets created by the admin_controller + // when the "stop crawl" button is pressed. @unlink($messages_filename); @unlink($lock_filename); $fetch_pages = false; @@ -187,13 +199,22 @@ class FetchController extends Controller implements CrawlConstants $info = array(); } + $pages = array(); if($fetch_pages) { $archive_iterator = NULL; - $pages = array(); + // Start by trying to acquire an exclusive lock on the iterator + // lock file, so that the same batch of pages isn't extracted more + // than once. For now the call to acquire the lock blocks, so that + // fetchers will queue up. 
If the time between requesting the lock + // and acquiring it is greater than ARCHIVE_LOCK_TIMEOUT then we + // give up on this request and try back later. $lock_fd = fopen($lock_filename, 'w'); $have_lock = flock($lock_fd, LOCK_EX); - if(file_exists($info[self::ARC_DIR]) && $have_lock) { + $elapsed_time = time() - $request_start; + + if($have_lock && $elapsed_time <= ARCHIVE_LOCK_TIMEOUT && + file_exists($info[self::ARC_DIR])) { $iterate_timestamp = $info[self::CRAWL_INDEX]; $iterate_dir = $info[self::ARC_DIR]; $result_timestamp = $crawl_time; @@ -217,30 +238,28 @@ class FetchController extends Controller implements CrawlConstants } if($archive_iterator && !$archive_iterator->end_of_iterator) { - if($archive_iterator->end_of_iterator) { - // Stop crawl here. - } else { - $info[self::SITES] = array(); - $pages = $archive_iterator->nextPages( - 500); - $delta = time() - $time; - debugLogFile("fetch took $delta seconds", "nameserver"); - } + $info[self::SITES] = array(); + $pages = $archive_iterator->nextPages( + ARCHIVE_BATCH_SIZE); + $delta = time() - $time; } if($have_lock) { flock($lock_fd, LOCK_UN); } fclose($lock_fd); + } - $info_string = serialize($info); + if(!empty($pages)) { $pages_string = gzcompress(serialize($pages)); - $data['MESSAGE'] = $info_string."\n".$pages_string; } else { $info[self::STATUS] = self::NO_DATA_STATE; - $data['MESSAGE'] = serialize($info)."\n"; + $pages_string = ''; } + $info_string = serialize($info); + $data['MESSAGE'] = $info_string."\n".$pages_string; + $this->displayView($view, $data); } diff --git a/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php b/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php index cd704ed70..02d8cc1ae 100644 --- a/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php +++ b/lib/archive_bundle_iterators/arc_archive_bundle_iterator.php @@ -53,12 +53,22 @@ require_once BASE_DIR. 
class ArcArchiveBundleIterator extends ArchiveBundleIterator implements CrawlConstants { + /** + * The path to the directory containing the archive partitions to be + * iterated over. + * @var string + */ + var $iterate_dir; + /** + * The path to the directory where the iteration status is stored. + * @var string + */ + var $result_dir; /** * The number of arc files in this arc archive bundle * @var int */ var $num_partitions; - /** * Counting in glob order for this arc archive bundle directory, the * current active file number of the arc file being process. @@ -67,7 +77,12 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator */ var $current_partition_num; /** - current byte offset into the current arc file + * current number of pages into the current arc file + * @var int + */ + var $current_page_num; + /** + * current byte offset into the current arc file * @var int */ var $current_offset; @@ -82,7 +97,6 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator */ var $fh; - /** * Creates a arc archive iterator with the given parameters. 
* @@ -91,27 +105,21 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator * @param string $result_timestamp timestamp of the arc archive bundle * results are being stored in */ - function __construct($prefix, $iterate_timestamp, $result_timestamp) + function __construct($iterate_timestamp, $iterate_dir, + $result_timestamp, $result_dir) { - $this->fetcher_prefix = $prefix; $this->iterate_timestamp = $iterate_timestamp; + $this->iterate_dir = $iterate_dir; $this->result_timestamp = $result_timestamp; - $archive_name = $this->get_archive_name($iterate_timestamp); + $this->result_dir = $result_dir; $this->partitions = array(); - foreach(glob("$archive_name/*.arc.gz") as $filename) { + foreach(glob("{$this->iterate_dir}/*.arc.gz") as $filename) { $this->partitions[] = $filename; } $this->num_partitions = count($this->partitions); - if(file_exists("$archive_name/iterate_status.txt")) { - $info = unserialize(file_get_contents( - "$archive_name/iterate_status.txt")); - $this->end_of_iterator = $info['end_of_iterator']; - $this->current_partition_num = $info['current_partition_num']; - $this->current_offset = $info['current_offset']; - $this->fh=gzopen( - $this->partitions[$this->current_partition_num], "rb"); - gzseek($this->fh, $this->current_offset); + if(file_exists("{$this->result_dir}/iterate_status.txt")) { + $this->restoreCheckpoint(); } else { $this->reset(); } @@ -138,8 +146,7 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator $this->end_of_iterator = false; $this->current_offset = 0; $this->fh = NULL; - $archive_name = $this->get_archive_name($this->result_timestamp); - @unlink("$archive_name/iterate_status.txt"); + @unlink("{$this->result_dir}/iterate_status.txt"); } /** @@ -153,6 +160,7 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator function nextPages($num) { $pages = array(); + $page_count = 0; for($i = 0; $i < $num; $i++) { $page = $this->nextPage(); if(!$page) { @@ -168,19 +176,15 @@ class ArcArchiveBundleIterator 
extends ArchiveBundleIterator $this->partitions[$this->current_partition_num], "rb"); } else { $pages[] = $page; + $page_count++; } } if(is_resource($this->fh)) { $this->current_offset = gztell($this->fh); + $this->current_page_num += $page_count; } - $archive_name = $this->get_archive_name($this->result_timestamp); - $info = array(); - $info['end_of_iterator'] = $this->end_of_iterator; - $info['current_partition_num'] = $this->current_partition_num; - $info['current_offset'] = $this->current_offset; - file_put_contents("$archive_name/iterate_status.txt", - serialize($info)); + $this->saveCheckpoint(); return $pages; } @@ -213,6 +217,5 @@ class ArcArchiveBundleIterator extends ArchiveBundleIterator $site[self::WEIGHT] = 1; return $site; } - } ?> diff --git a/lib/archive_bundle_iterators/archive_bundle_iterator.php b/lib/archive_bundle_iterators/archive_bundle_iterator.php index 51dfb9bfc..cc4122d75 100644 --- a/lib/archive_bundle_iterators/archive_bundle_iterator.php +++ b/lib/archive_bundle_iterators/archive_bundle_iterator.php @@ -53,13 +53,11 @@ abstract class ArchiveBundleIterator implements CrawlConstants * @var int */ var $iterate_timestamp; - /** * Timestamp of the archive that is being used to store results in * @var int */ var $result_timestamp; - /** * Whether or not the iterator still has more documents * @var bool @@ -67,22 +65,44 @@ abstract class ArchiveBundleIterator implements CrawlConstants var $end_of_iterator; /** - * The fetcher prefix associated with this archive. - * @var string + * Stores the current progress to the file iterate_status.txt in the result + * dir such that a new instance of the iterator could be constructed and + * return the next set of pages without having to process all of the pages + * that came before. Each iterator should make a call to saveCheckpoint + * after extracting a batch of pages. 
+ * @param array $info any extra info a subclass wants to save */ - var $fetcher_prefix; + function saveCheckpoint($info = array()) + { + $info['end_of_iterator'] = $this->end_of_iterator; + $info['current_partition_num'] = $this->current_partition_num; + $info['current_page_num'] = $this->current_page_num; + $info['current_offset'] = $this->current_offset; + file_put_contents("{$this->result_dir}/iterate_status.txt", + serialize($info)); + } /** - * Returns the path to an archive given its timestamp. - * - * @param string $timestamp the archive timestamp - * @return string the path to the archive, based off of the fetcher prefix - * used when this iterator was constructed + * Restores the internal state from the file iterate_status.txt in the + * result dir such that the next call to nextPages will pick up from just + * after the last checkpoint. Each iterator should make a call to + * restoreCheckpoint at the end of the constructor method after the + * instance members have been initialized. + * @return array the data serialized when saveCheckpoint was called */ - function get_archive_name($timestamp) + function restoreCheckpoint() { - return CRAWL_DIR.'/cache/'.$this->fetcher_prefix. 
- self::archive_base_name.$timestamp; + $info = unserialize(file_get_contents( + "{$this->result_dir}/iterate_status.txt")); + $this->end_of_iterator = $info['end_of_iterator']; + $this->current_partition_num = $info['current_partition_num']; + $this->current_offset = $info['current_offset']; + if(!$this->end_of_iterator) { + $this->fh=gzopen( + $this->partitions[$this->current_partition_num], "rb"); + gzseek($this->fh, $this->current_offset); + } + return $info; } /** diff --git a/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php b/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php index d60fb9497..8fcd76300 100644 --- a/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php +++ b/lib/archive_bundle_iterators/mediawiki_bundle_iterator.php @@ -55,21 +55,30 @@ require_once BASE_DIR.'/lib/bzip2_block_iterator.php'; class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator implements CrawlConstants { + /** + * The path to the directory containing the archive partitions to be + * iterated over. + * @var string + */ + var $iterate_dir; + /** + * The path to the directory where the iteration status is stored. + * @var string + */ + var $result_dir; /** * The number of arc files in this arc archive bundle * @var int */ var $num_partitions; - /** * Counting in glob order for this arc archive bundle directory, the * current active file number of the arc file being processed. - * * @var int */ var $current_partition_num; /** - current number of wiki pages into the Media Wiki xml.bz2 file + * current number of wiki pages into the Media Wiki xml.bz2 file * @var int */ var $current_page_num; @@ -86,7 +95,6 @@ class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator /** * Used to hold data that was in the buffer but before a siteinfo or a page * when that data gets parsed out. 
- * * @var string */ var $remainder; @@ -158,20 +166,47 @@ class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator $this->num_partitions = count($this->partitions); if(file_exists("{$this->result_dir}/iterate_status.txt")) { - $info = unserialize(file_get_contents( - "{$this->result_dir}/iterate_status.txt")); - $this->end_of_iterator = $info['end_of_iterator']; - $this->current_partition_num = $info['current_partition_num']; - $this->current_page_num = $info['current_page_num']; - $this->buffer = $info['buffer']; - $this->remainder = $info['remainder']; - $this->header = $info['header']; - $this->bz2_iterator = $info['bz2_iterator']; + $this->restoreCheckpoint(); } else { $this->reset(); } } + /** + * Saves the current state so that a new instantiation can pick up just + * after the last batch of pages extracted. + */ + function saveCheckpoint($info = array()) + { + $info['end_of_iterator'] = $this->end_of_iterator; + $info['current_partition_num'] = $this->current_partition_num; + $info['current_page_num'] = $this->current_page_num; + $info['buffer'] = $this->buffer; + $info['remainder'] = $this->remainder; + $info['header'] = $this->header; + $info['bz2_iterator'] = $this->bz2_iterator; + file_put_contents("{$this->result_dir}/iterate_status.txt", + serialize($info)); + } + + /** + * Restores state from a previous instantiation, after the last batch of + * pages extracted. 
+ */ + function restoreCheckpoint() + { + $info = unserialize(file_get_contents( + "{$this->result_dir}/iterate_status.txt")); + $this->end_of_iterator = $info['end_of_iterator']; + $this->current_partition_num = $info['current_partition_num']; + $this->current_page_num = $info['current_page_num']; + $this->buffer = $info['buffer']; + $this->remainder = $info['remainder']; + $this->header = $info['header']; + $this->bz2_iterator = $info['bz2_iterator']; + return $info; + } + /** * Estimates the important of the site according to the weighting of * the particular archive iterator @@ -223,7 +258,7 @@ class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator */ function getNextTagData($tag) { - while(!stristr($this->buffer, "</$tag")) { + while(stripos($this->buffer, "</$tag") === false) { if(is_null($this->bz2_iterator) || $this->bz2_iterator->is_eof()) { return false; } @@ -333,16 +368,7 @@ class MediaWikiArchiveBundleIterator extends ArchiveBundleIterator $this->current_page_num += $page_count; } - $info = array(); - $info['end_of_iterator'] = $this->end_of_iterator; - $info['current_partition_num'] = $this->current_partition_num; - $info['current_page_num'] = $this->current_page_num; - $info['buffer'] = $this->buffer; - $info['remainder'] = $this->remainder; - $info['header'] = $this->header; - $info['bz2_iterator'] = $this->bz2_iterator; - file_put_contents("{$this->result_dir}/iterate_status.txt", - serialize($info)); + $this->saveCheckpoint(); return $pages; } diff --git a/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php b/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php index 75d74e290..5ab3a55a5 100644 --- a/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php +++ b/lib/archive_bundle_iterators/odp_rdf_bundle_iterator.php @@ -54,25 +54,34 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator implements CrawlConstants { /** - * The number of arc files in this arc archive bundle + * The path to the directory containing 
the archive partitions to be + * iterated over. + * @var string + */ + var $iterate_dir; + /** + * The path to the directory where the iteration status is stored. + * @var string + */ + var $result_dir; + /** + * The number of odp rdf files in this archive bundle * @var int */ var $num_partitions; - /** - * Counting in glob order for this arc archive bundle directory, the - * current active file number of the arc file being process. - * + * Counting in glob order for this odp rdf archive bundle directory, the + * current active file number of the file being processed. * @var int */ var $current_partition_num; /** - current number of wiki pages into the Media Wiki xml.bz2 file + * current number of pages into the current odp rdf file * @var int */ var $current_page_num; /** - * Array of filenames of arc files in this directory (glob order) + * Array of filenames of odp rdf files in this directory (glob order) * @var array */ var $partitions; @@ -82,7 +91,7 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator */ var $buffer; /** - * Associative array containing global properties like base url of th + * Associative array containing global properties like base url of the * current open odp rdf file * @var array */ @@ -92,11 +101,17 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator * @var resource */ var $fh; + /** + * Offset into the current odp rdf file + * @var int + */ + var $current_offset; /** - * How many bytes to read into buffer from bz2 stream in one go + * How many bytes to read into buffer from gzip stream in one go + * @var int */ - const BLOCK_SIZE = 8192; + const BLOCK_SIZE = 1024; /** * Creates an open directory rdf archive iterator with the given parameters. 
@@ -106,14 +121,15 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator * @param string $result_timestamp timestamp of the arc archive bundle * results are being stored in */ - function __construct($prefix, $iterate_timestamp, $result_timestamp) + function __construct($iterate_timestamp, $iterate_dir, + $result_timestamp, $result_dir) { - $this->fetcher_prefix = $prefix; $this->iterate_timestamp = $iterate_timestamp; + $this->iterate_dir = $iterate_dir; $this->result_timestamp = $result_timestamp; - $archive_name = $this->get_archive_name($iterate_timestamp); + $this->result_dir = $result_dir; $this->partitions = array(); - foreach(glob("$archive_name/*.gz") as $filename) { + foreach(glob("{$this->iterate_dir}/*.gz") as $filename) { $this->partitions[] = $filename; } $this->num_partitions = count($this->partitions); @@ -121,22 +137,31 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator $url_parts = @parse_url($this->header['base_address']); $this->header['ip_address'] = gethostbyname($url_parts['host']); - if(file_exists("$archive_name/iterate_status.txt")) { - $info = unserialize(file_get_contents( - "$archive_name/iterate_status.txt")); - $this->end_of_iterator = $info['end_of_iterator']; - $this->current_partition_num = $info['current_partition_num']; - $this->current_offset = $info['current_offset']; - - $this->fh=gzopen( - $this->partitions[$this->current_partition_num], "r"); - $this->buffer = ""; - $this->readPages($this->current_page_num, false); + if(file_exists("{$this->result_dir}/iterate_status.txt")) { + $this->restoreCheckpoint(); } else { $this->reset(); } } + /** + * Add the buffer contents to the standard gzip archive checkpoint. + */ + function saveCheckpoint($info = array()) + { + $info['buffer'] = $this->buffer; + parent::saveCheckpoint($info); + } + + /** + * Restore the buffer from the checkpoint info. 
+     */
+    function restoreCheckpoint()
+    {
+        $info = parent::restoreCheckpoint();
+        $this->buffer = $info['buffer'];
+    }
+
     /**
      * Estimates the important of the site according to the weighting of
      * the particular archive iterator
@@ -160,34 +185,33 @@ class OdpRdfArchiveBundleIterator
      */
     function getNextTagsData($tags)
     {
+        // Match one complete element (e.g. <Topic ...>...</Topic>); \1
+        // back-references the captured opening tag name.
+        $regex = '@<('.implode('|', $tags).')[^>]*?>.*?</\1[^>]*?>@si';
+        // NOTE(review): the search offset must stay at 0 on every pass: an
+        // element's opening tag can lie anywhere in the buffer while its
+        // closing tag only arrives in a later gzread(), so advancing the
+        // offset past buffered data skips the opener and spins to EOF.
+        $done = false;
+        $search_failed = false;
+        $offset = 0;
         do {
-            $done = false;
-            if(!$this->fh || feof($this->fh)) {return false;}
+            if($search_failed && (!$this->fh || feof($this->fh))) {
+                return false;
+            }
             $this->buffer .= gzread($this->fh, self::BLOCK_SIZE);
-
-            foreach($tags as $tag) {
-                if(stristr($this->buffer, "</$tag")) {
-                    $done = true;
-                }
+            if(preg_match($regex, $this->buffer, $matches,
+                PREG_OFFSET_CAPTURE, $offset)) {
+                $done = true;
+                $search_failed = false;
+            } else {
+                $search_failed = true;
             }
+            // (offset intentionally not advanced; see note above)
         } while(!$done);
-        $found_tag = "";
-        $min_pos_tag = strlen($this->buffer);
-        foreach($tags as $tag) {
-            $pos_tag = strpos($this->buffer, $tag);
-            if( $pos_tag !== false) {
-                if($found_tag == "" || $pos_tag < $min_pos_tag) {
-                    $found_tag = $tag;
-                    $min_pos_tag = $pos_tag;
-                }
-            }
-        }
-        $start_info = strpos($this->buffer, "<$found_tag");
-        $pre_end_info = strpos($this->buffer, "</$found_tag", $start_info);
-        $end_info = strpos($this->buffer, ">", $pre_end_info) + 1;
-        $tag_info = substr($this->buffer, $start_info,
-            $end_info - $start_info);
-        $this->buffer = substr($this->buffer, $end_info);
+        $found_tag = $matches[1][0];
+        $start = $matches[0][1];
+        $length = strlen($matches[0][0]);
+        $tag_info = substr($this->buffer, $start, $length);
+        $this->buffer = substr($this->buffer, $start + $length);
 
         return array($tag_info, $found_tag);
     }
@@ -263,11 +287,10 @@ class OdpRdfArchiveBundleIterator
extends ArchiveBundleIterator { $this->current_partition_num = -1; $this->end_of_iterator = false; - $this->current_offset = 0; $this->fh = NULL; + $this->current_offset = 0; $this->buffer = ""; - $archive_name = $this->get_archive_name($this->result_timestamp); - @unlink("$archive_name/iterate_status.txt"); + @unlink("{$this->result_dir}/iterate_status.txt"); } /** @@ -307,6 +330,7 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator } $this->fh = gzopen( $this->partitions[$this->current_partition_num], "r"); + $this->current_offset = 0; } else { if($return_pages) { $pages[] = $page; @@ -315,16 +339,11 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator } } if(is_resource($this->fh)) { + $this->current_offset = gztell($this->fh); $this->current_page_num += $page_count; } - $archive_name = $this->get_archive_name($this->result_timestamp); - $info = array(); - $info['end_of_iterator'] = $this->end_of_iterator; - $info['current_partition_num'] = $this->current_partition_num; - $info['current_page_num'] = $this->current_page_num; - file_put_contents("$archive_name/iterate_status.txt", - serialize($info)); + $this->saveCheckpoint(); return $pages; } @@ -336,8 +355,12 @@ class OdpRdfArchiveBundleIterator extends ArchiveBundleIterator function readPage($return_page) { if(!is_resource($this->fh)) return NULL; - list($page_info, $tag) = $this->getNextTagsData( + $tag_data = $this->getNextTagsData( array("Topic","ExternalPage")); + if(!$tag_data) { + return false; + } + list($page_info, $tag) = $tag_data; if(!$return_page) { return true; } diff --git a/lib/archive_bundle_iterators/web_archive_bundle_iterator.php b/lib/archive_bundle_iterators/web_archive_bundle_iterator.php index 3cd2a399c..680610945 100644 --- a/lib/archive_bundle_iterators/web_archive_bundle_iterator.php +++ b/lib/archive_bundle_iterators/web_archive_bundle_iterator.php @@ -89,6 +89,24 @@ class WebArchiveBundleIterator extends ArchiveBundleIterator * @var object */ var 
$archive; + /** + * The fetcher prefix associated with this archive. + * @var string + */ + var $fetcher_prefix; + + /** + * Returns the path to an archive given its timestamp. + * + * @param string $timestamp the archive timestamp + * @return string the path to the archive, based off of the fetcher prefix + * used when this iterator was constructed + */ + function get_archive_name($timestamp) + { + return CRAWL_DIR.'/cache/'.$this->fetcher_prefix. + self::archive_base_name.$timestamp; + } /** * Creates a web archive iterator with the given parameters. @@ -108,25 +126,50 @@ class WebArchiveBundleIterator extends ArchiveBundleIterator $this->archive = new WebArchiveBundle($archive_name); $archive_name = $this->get_archive_name($result_timestamp); if(file_exists("$archive_name/iterate_status.txt")) { - $info = unserialize(file_get_contents( - "$archive_name/iterate_status.txt")); - $this->count = $this->archive->count; - $this->num_partitions = $this->archive->write_partition+1; - $this->overall_index = $info['overall_index']; - $this->end_of_iterator = $info['end_of_iterator']; - $this->partition_index = $info['partition_index']; - $this->current_partition_num = $info['current_partition_num']; - $this->partition = $this->archive->getPartition( - $this->current_partition_num, false); - $this->partition->iterator_pos = $info['iterator_pos']; + $this->restoreCheckpoint(); } else { $this->reset(); } + } + + /** + * Saves the current state so that a new instantiation can pick up just + * after the last batch of pages extracted. 
+     */
+    function saveCheckpoint($info = array())
+    {
+        $info['overall_index'] = $this->overall_index;
+        $info['end_of_iterator'] = $this->end_of_iterator;
+        $info['partition_index'] = $this->partition_index;
+        $info['current_partition_num'] = $this->current_partition_num;
+        $info['iterator_pos'] = $this->partition->iterator_pos;
+        $archive_name = $this->get_archive_name($this->result_timestamp);
+        file_put_contents("$archive_name/iterate_status.txt",
+            serialize($info));
+    }
+    /**
+     * Restores state from a previous instantiation, after the last batch of
+     * pages extracted.
+     */
+    function restoreCheckpoint()
+    {
+        // Fix: $archive_name was undefined here; build the status-file path
+        // exactly as saveCheckpoint() does, from the result timestamp.
+        $info = unserialize(file_get_contents($this->get_archive_name(
+            $this->result_timestamp)."/iterate_status.txt"));
+        $this->count = $this->archive->count;
+        $this->num_partitions = $this->archive->write_partition+1;
+        $this->overall_index = $info['overall_index'];
+        $this->end_of_iterator = $info['end_of_iterator'];
+        $this->partition_index = $info['partition_index'];
+        $this->current_partition_num = $info['current_partition_num'];
+        $this->partition = $this->archive->getPartition(
+            $this->current_partition_num, false);
+        $this->partition->iterator_pos = $info['iterator_pos'];
+        return $info;
     }

     /**
-     * Estimates the important of the site according to the weighting of
+     * Estimates the importance of the site according to the weighting of
      * the particular archive iterator
      * @param $site an associative array containing info about a web page
      * @return bool false we assume files were crawled roughly according to
@@ -171,16 +214,7 @@ class WebArchiveBundleIterator extends ArchiveBundleIterator
         $this->end_of_iterator = ($this->overall_index >= $this->count ) ?
true : false; - $archive_name = $this->get_archive_name($this->result_timestamp); - $info = array(); - $info['overall_index'] = $this->overall_index; - $info['end_of_iterator'] = $this->end_of_iterator; - $info['partition_index'] = $this->partition_index; - $info['current_partition_num'] = $this->current_partition_num; - $info['iterator_pos'] =$this->partition->iterator_pos; - file_put_contents("$archive_name/iterate_status.txt", - serialize($info)); - + $this->saveCheckpoint(); return $objects; }