<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2023  Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Chris Pollett chris@pollett.org
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2023
 * @filesource
 */
namespace seekquarry\yioop\library;

use seekquarry\yioop\configs as C;

/**
 * Used for crawlLog, crawlHash, and garbageCollect
 */
require_once __DIR__ . '/Utility.php';
/**
 * Encapsulates a set of web page documents and an inverted word-index of terms
 * from these documents which allow one to search for documents containing a
 * particular word.
 *
 * @author Chris Pollett
 */
class IndexDocumentBundle implements CrawlConstants
{
    /**
     * File name used to store within the folder of the IndexDocumentBundle
     * parameter/configuration information about the bundle
     */
    const ARCHIVE_INFO_FILE = "archive_info.txt";
    /**
 * The version of this IndexDocumentBundle. The lowest format number is
 * 3.0, as prior inverted index/document stores used IndexArchiveBundles
     */
    const DEFAULT_VERSION = "3.2";
    /**
     * Default values for the configuration parameters of an
     * IndexDocumentBundle
     */
    const DEFAULT_PARAMETERS = ["DESCRIPTION" => "",
        "VERSION" => self::DEFAULT_VERSION
    ];
    /**
     * Subfolder of IndexDocumentBundle to store the btree with
     * term => posting list information (i.e., the inverted index)
     */
    const DICTIONARY_FOLDER = "dictionary";
    /**
 * DocIds are made of three parts: hash of url, hash of document, hash
 * of url hostname. Each of these hashes is DOCID_PART_LEN bytes long
     */
    const DOCID_PART_LEN = 8;
    /**
     * Length of DocIds used by this IndexDocumentBundle
     */
    const DOCID_LEN = 24;
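/* Taken together (a sketch of the layout, as computeDocId() below
 * constructs it):
 *   bytes  0-7:  crawlHash of the document's url
 *   bytes  8-15: hash of the document itself (for text documents the
 *                first 4 bytes are replaced by a hash of the title)
 *   byte   16:   a letter code giving the document's large scale type
 *                (the high bit is set for company level domain urls)
 *   bytes 17-23: bytes 1-7 of crawlHash of the hostname followed by "/"
 */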
    /**
     * Length of TermIds used by this IndexDocumentBundle
     */
    const TERMID_LEN = 16;
    /**
     * Partition i in an IndexDocumentBundle has a subfolder i
     * within self::POSITIONS_DOC_MAP_FOLDER. Within this subfolder i,
     * self::DOC_MAP_FILENAME is the name of the file used to store the
     * document map for the partition. The document map consists of a sequence
     * of records associated with each doc_id of a document stored in the
     * partition. The first record is ["POS" => $num_words,
     * "SCORE" => floatval($global_score_for_document)]. The second record is:
     * ["POS" => $length_of_title_of_document, "SCORE" =>
     *          floatval($num_description_scores)]]
     * Here a description score is a score for the importance for a section
     * of a document. Subsequence records, list [POS => the length of the jth
     * section of the document, SCORE => its score].
     */
    const DOC_MAP_FILENAME = "doc_map";
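/* For instance (a hedged sketch of the record layout, not literal file
 * bytes), a document with 500 terms, overall score 2.5, and two summary
 * sections scored 0.8 and 0.3 would give rise to the doc map records:
 *   ["POS" => 500, "SCORE" => 2.5],
 *   ["POS" => $packed_host_title_path_end_positions, "SCORE" => 2.0],
 *   ["POS" => $length_section_1, "SCORE" => 0.8],
 *   ["POS" => $length_section_2, "SCORE" => 0.3]
 * (see addScoresDocMap() for how the second record's POS is packed)
 */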
    /**
     * Folder used to store the partition data of this IndexDocumentBundle
 * These consist of .txt.gz files for each partition, which are used
 * to store summaries of documents and actual documents (web pages), and
 * .ix files, which are used to store doc_ids and the associated offsets
 * to their summary and actual document within the .txt.gz file
     */
    const DOCUMENTS_FOLDER = "documents";
    /**
 * Name of the last entries file used to help compute difference lists
 * for doc_map_index and position list offsets used in postings for the
 * partition. This file is also used to track the total number of
 * occurrences of a term in a partition
     */
    const LAST_ENTRIES_FILENAME = "last_entries";
    /**
 * The filename of a file used to keep track of the integer index of
 * the next partition whose documents can be added to
 * this IndexDocumentBundle's dictionary. I.e., it should always be that
 * next_partition <= save_partition
     */
    const NEXT_PARTITION_FILE = "next_partition.txt";
    /**
     * Names for the files which appear within a partition sub-folder
     */
    const PARTITION_FILENAMES = [self::DOC_MAP_FILENAME,
        self::LAST_ENTRIES_FILENAME, self::POSITIONS_FILENAME,
        self::POSTINGS_FILENAME];
    /**
 * Name of the file within a partition's positions_doc_maps folder used
 * to contain the partition's position lists for all terms in the
 * partition.
     */
    const POSITIONS_FILENAME = "positions";
    /**
     * Name of the file within a partition's positions_doc_maps folder with
     * posting information for all terms in that partition. This consists of
     * key value pairs term_id => posting records for all documents with that
     * term.
     */
    const POSTINGS_FILENAME = "postings";
    /**
     * Temporary name for postings from a POSTINGS_FILENAME file while
     * they are being compressed.
     */
    const TEMP_POSTINGS_FILENAME = "temp_postings";
    /**
 * How many bytes of postings to buffer before writing to disk in
 * addPartitionPostingsDictionary()
     */
    const POSTINGS_BUFFER_SIZE = 1000000;
    /**
     * Name of the folder used to hold position lists and document maps. Within
     * this folder there is a subfolder for each partition which contains a
 * doc_map file, a postings file for the docs within the partition,
 * a position lists file for those postings, and a last_entries file
 * used in the computation of difference lists for doc_map_index and
 * position list offsets, as well as the number of occurrences of terms.
     */
    const POSITIONS_DOC_MAP_FOLDER = "positions_doc_maps";
    /**
     * Folder name to use for this IndexDocumentBundle
     * @var string
     */
    public $dir_name;
    /**
     * A short text name for this IndexDocumentBundle
     * @var string
     */
    public $description;
    /**
 * Index of the next partition whose posting data should be added to
 * the bundle's dictionary
 * @var int
     */
    public $next_partition_to_add;
    /**
 * BPlusTree dictionary for the inverted index of this IndexDocumentBundle.
 * This contains entries of the form (term, partition, num docs with term
 * in that partition, num occurrences of term in that partition,
 * posting list info for the term in that partition)
     * @var object
     */
    public $dictionary;
    /**
     * PartitionDocumentBundle for web page documents
     * @var object
     */
    public $documents;
    /**
 * Packed table string of doc_id => doc_record entries for the partition
 * currently being indexed
 * @var string
     */
    public $doc_map;
    /**
     * Used to keep track of the previous values
     * posting quantities so difference lists can be computed. For example,
     * previous $doc_map_index, previous position list offset. It also tracks
     * the total number of occurrences of a term within a partition.
     * @var array
     */
    public $last_entries;
    /**
 * Map from int -> unpack format string (one pack code for each of the
 * four posting fields) used to unpack posting info
     * @var array
     */
    public $unpack_map;
    /**
 * Array of the number of bytes each of $unpack_map's format strings
 * consumes
     * @var array
     */
    public $unpack_len_map;
    /**
 * A string consisting of a concatenated sequence of term position
 * information, for each document in turn, and within a document for
 * each term in that document.
     * @var string
     */
    public $positions;
    /**
 * Associative array of $term_id => posting list
 * records for that term in the partition.
     * @var array
     */
    public $postings;
    /**
     * Makes or initializes an IndexDocumentBundle with the provided parameters
     *
     * @param string $dir_name folder name to store this bundle
     * @param bool $read_only_archive whether to open archive only for reading
     *  or reading and writing
     * @param string $description a text name/serialized info about this
     *  IndexDocumentBundle
     * @param int $num_docs_per_partition the number of documents to be stored
     *  in a single partition
     * @param int $max_keys the maximum number of keys used by the BPlusTree
     *  used for the inverted index
     */
    public function __construct($dir_name, $read_only_archive = true,
        $description = null, $num_docs_per_partition =
        C\NUM_DOCS_PER_PARTITION, $max_keys = BPlusTree::MAX_KEYS)
    {
        $this->dir_name = $dir_name;
        $is_dir = is_dir($this->dir_name);
        if (!$is_dir && !$read_only_archive) {
            mkdir($this->dir_name);
            mkdir($this->dir_name . "/". self::POSITIONS_DOC_MAP_FOLDER);
        } else if (!$is_dir) {
            return false;
        }
        $archive_info_path = $this->dir_name . "/" . self::ARCHIVE_INFO_FILE;
        $this->archive_info = self::DEFAULT_PARAMETERS;
        if (!empty($description)) {
            $this->archive_info["DESCRIPTION"] = $description;
        }
        $just_got_info = false;
        if (file_exists($archive_info_path)) {
            $this->archive_info = unserialize(file_get_contents(
                $archive_info_path));
            $just_got_info = true;
        }
        if ($this->archive_info['VERSION'] < "3.1") {
            $this->archive_info['RECORD_COMPRESSOR'] =
                C\NS_COMPRESSORS . "GzipCompressor";
        }
        $this->archive_info['RECORD_COMPRESSOR'] ??=
            C\NS_COMPRESSORS . "NonCompressor";
        $record_compressor = $this->archive_info['RECORD_COMPRESSOR'];
        $this->archive_info['BLOB_COMPRESSOR'] ??=
            C\NS_COMPRESSORS . "GzipCompressor";
        $blob_compressor = $this->archive_info['BLOB_COMPRESSOR'];
        if ($just_got_info &&
            empty($this->archive_info['BPLUS_BLOB_COMPRESSOR'])) {
            $this->archive_info['BPLUS_BLOB_COMPRESSOR'] =
                C\NS_COMPRESSORS . "GzipCompressor";
        }
        $this->archive_info['BPLUS_BLOB_COMPRESSOR'] ??=
            C\NS_COMPRESSORS . "NonCompressor";
        $bplus_blob_compressor = $this->archive_info['BPLUS_BLOB_COMPRESSOR'];
        if (!$read_only_archive && !$just_got_info) {
            file_put_contents($archive_info_path,
                serialize($this->archive_info));
        }
        $next_partition_path = $this->dir_name . "/".
            self::NEXT_PARTITION_FILE;
        if (file_exists($next_partition_path)) {
            $this->next_partition_to_add = intval(
                file_get_contents($next_partition_path));
        } else if (!$read_only_archive) {
            $this->next_partition_to_add = 0;
            file_put_contents($next_partition_path,
                $this->next_partition_to_add);
        }
        $this->documents = new PartitionDocumentBundle($dir_name . "/" .
            self::DOCUMENTS_FOLDER, ["PRIMARY KEY" => [self::DOC_ID,
            self::DOCID_LEN],
            self::SUMMARY => "SERIAL", self::PAGE => "SERIAL"],
            $num_docs_per_partition,
            PartitionDocumentBundle::PARTITION_SIZE_THRESHOLD,
            $record_compressor, $blob_compressor);
        if (!$read_only_archive) {
            $this->documents->index_cache_size = 1;
        }
        $this->doc_map_tools = new PackedTableTools([
            "PRIMARY KEY" => ["DOC_KEYS", self::DOCID_LEN], "POS" => "INT",
            "SCORE" => "FLOAT"], $record_compressor);
        $this->postings_tools = new PackedTableTools([
            "PRIMARY KEY" => ["TERM", self::TERMID_LEN],
            "DOC_MAP_INDEX" => "INT", "FREQUENCY" => "INT",
            "POSITIONS_OFFSET" => "INT", "POSITIONS_LEN" => "INT"],
            $record_compressor);
        $unpack_codes = [0 => "C", 1 => "n", 2 => "N", 3 => "J"];
        $len_codes = [0 => 1, 1 => 2, 2 => 4, 3 => 8];
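        /* Build all 256 combinations of the four pack codes, one code per
           posting field, so that a single byte can select how a posting
           record was packed. The map index is 64*$i + 16*$j + 4*$k + $m;
           e.g., entry 0 is
           "CDOC_MAP_INDEX/CFREQUENCY/CPOSITIONS_OFFSET/CPOSITIONS_LEN"
           and consumes 4 bytes. */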
        for ($i = 0; $i < 4; $i++) {
            for ($j = 0; $j < 4; $j++) {
                for ($k = 0; $k < 4; $k++) {
                    for ($m = 0; $m < 4; $m++) {
                        $this->unpack_map[] =
                            $unpack_codes[$i] . "DOC_MAP_INDEX/" .
                            $unpack_codes[$j] . "FREQUENCY/" .
                            $unpack_codes[$k] . "POSITIONS_OFFSET/" .
                            $unpack_codes[$m] . "POSITIONS_LEN";
                        $this->unpack_len_map[] = $len_codes[$i] +
                            $len_codes[$j] + $len_codes[$k] + $len_codes[$m];
                    }
                }
            }
        }
        $this->last_entries_tools = new PackedTableTools([
            "PRIMARY KEY" => ["TERM", 16], "LAST_INDEX" => "INT",
            "LAST_OFFSET" => "INT", "NUM_OCCURRENCES" => "INT"],
            $record_compressor);
        if (!$read_only_archive) {
            $this->documents->initCountIfNotExists("VISITED_URLS_COUNT");
        }
        if ($this->archive_info['VERSION'] < "3.2") {
            $this->dictionary = new BPlusTree($this->dir_name . "/" .
                self::DICTIONARY_FOLDER, ["PRIMARY KEY" =>
                ["TERM", self::TERMID_LEN], "PARTITION" => "INT",
                "NUM_DOCS" => "INT", "NUM_OCCURRENCES" => "INT",
                "POSTINGS" => "BLOB"], $max_keys,
                $record_compressor, $bplus_blob_compressor);
        } else {
            $this->dictionary = new BPlusTree($this->dir_name . "/" .
                self::DICTIONARY_FOLDER, ["PRIMARY KEY" =>
                ["TERM", self::TERMID_LEN], "PARTITION" => "INT",
                "NUM_DOCS" => "INT", "NUM_OCCURRENCES" => "INT",
                "POSTINGS_OFFSET" => "INT", "POSTINGS_LEN" => "INT"],
                $max_keys, $record_compressor, $bplus_blob_compressor);
        }
    }
    /**
     * Add the array of $pages to the documents PartitionDocumentBundle
     *
     * @param array $pages data to store
     * @param int $visited_urls_count number to add to the count of visited urls
     *     (visited urls is a smaller number than the total count of objects
     *     stored in the index).
     * @return bool success or failure of adding the pages
     */
    public function addPages($pages, $visited_urls_count)
    {
        crawlLog("Indexer adding pages to document bundle...");
        $success = $this->documents->put($pages);
        $this->documents->addCount($visited_urls_count,
            "VISITED_URLS_COUNT");
        return $success;
    }
    /**
     * For every partition between next partition and save partition, adds
     * the posting list information to the dictionary BPlusTree. At the
     * end of this process next partition and save partition should be the same
     *
     * @param string $taking_too_long_touch a filename of a file to touch
     *  so its last modified time becomes the current time. In a typical
     *  Yioop crawl this is done for the crawl_status.txt file to prevent
     *  Yioop's web interface from stopping the crawl because it has seen
     *  no recent  progress activity on a crawl.
     * @param bool $till_equal if set to true, will keep adding partitions
     *  until the next partition reaches the save partition; if set to false,
     *  only adds one partition
     * @return bool whether at least one partition was added to the dictionary
     */
    public function updateDictionary($taking_too_long_touch = null,
        $till_equal = true)
    {
        $next_partition = $this->next_partition_to_add;
        $save_partition = $this->documents->parameters["SAVE_PARTITION"];
        $current_num_docs = $this->documents->parameters['ACTIVE_COUNT'];
        crawlLog("Current save partition has $current_num_docs documents.");
        $memory_limit = metricToInt(ini_get("memory_limit"));
        $before_usage = memory_get_usage();
        crawlLog("Indexer Memory  limit is $memory_limit. Usage is " .
            $before_usage);
        $advanced_partition = false;
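        /* if $till_equal, keep looping until $next_partition reaches
           $save_partition; otherwise, once $advanced_partition becomes true,
           the comparison (true <= false) below fails and only one partition
           is added */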
        while ($next_partition < $save_partition && $advanced_partition <=
            $till_equal) {
            crawlLog("Indexer adding Partition to dictionary...");
            crawlLog("...because save partition changed");
            $switch_time = microtime(true);
            // Save current shard dictionary to main dictionary
            $this->buildInvertedIndexPartition($next_partition,
                $taking_too_long_touch);
            $num_freed = garbageCollect();
            $this->addPartitionPostingsDictionary(
                $next_partition, $taking_too_long_touch);
            crawlLog("Indexer force running garbage collector after partition".
                 " advance. This freed $num_freed bytes.");
            $after_usage = memory_get_usage();
            crawlLog(
                "Indexer after partition changed memory usage: $after_usage");
            crawlLog("Switch Partition time:".
                changeInMicrotime($switch_time));
            $next_partition++;
            file_put_contents($this->dir_name . "/". self::NEXT_PARTITION_FILE,
                $next_partition);
            file_put_contents($this->dir_name . "/". self::NEXT_PARTITION_FILE
                . "-advanced", $next_partition);
            $advanced_partition = true;
        }
        $this->next_partition_to_add = $next_partition;
        return $advanced_partition;
    }
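    /* A minimal usage sketch (assumes $bundle is a writable
       IndexDocumentBundle): merge one partition at a time into the
       dictionary until the next partition catches up with the save
       partition:
           while ($bundle->updateDictionary(null, false)) { }
     */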
    /**
     * Adds the previously constructed inverted index $partition to the inverted
     * index of the whole bundle
     *
     * @param int $partition which partitions inverted index to add, by
     *  default the current save partition
     * @param string $taking_too_long_touch a filename of a file to touch
     *  so its last modified time becomes the current time. In a typical
     *  Yioop crawl this is done for the crawl_status.txt file to prevent
     *  Yioop's web interface from stopping the crawl because it has seen
     *  no recent  progress activity on a crawl.
     */
    public function addPartitionPostingsDictionary($partition = -1,
        $taking_too_long_touch = null)
    {
        $save_partition = $this->documents->parameters["SAVE_PARTITION"];
        if ($partition < 0 ) {
            if ($save_partition <= 0) {
                return false;
            }
            $partition = $save_partition - 1;
        }
        $base_folder = $this->getPartitionBaseFolder($partition);
        $postings_tools = $this->postings_tools;
        unset($this->postings, $this->doc_map, $this->positions,
            $this->last_entries);
        $last_entries_tools = $this->last_entries_tools;
        $dictionary = $this->dictionary;
        $postings_filename = $base_folder . "/" . self::POSTINGS_FILENAME;
        $last_entries_filename = $base_folder . "/" .
            self::LAST_ENTRIES_FILENAME;
        if (!file_exists($postings_filename)) {
            crawlLog($postings_filename);
            crawlLog("Postings file for partition $partition does not exist");
            return false;
        }
        if (!file_exists($last_entries_filename)) {
            crawlLog(
                "Last entries file for partition $partition does not exist");
            return false;
        }
        crawlLog("Start Adding Partition Posting Info to Dictionary");
        $start_time = microtime(true);
        $postings_string = $postings_tools->load($postings_filename,
            PackedTableTools::AS_STRING_MODE);
        $temp_postings_filename = $base_folder . "/" .
            self::TEMP_POSTINGS_FILENAME;
        rename($postings_filename, $temp_postings_filename);
        $key_len = $this->postings_tools->key_len;
        $this->last_entries = $last_entries_tools->load($last_entries_filename);
        $num_postings = substr_count($postings_string, "\xFF") + 1;
        $last_marker = 0;
        $out_postings = "";
        $postings_offset = 0;
        $fh = fopen($postings_filename, "w");
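        /* $postings_string consists of records separated by \xFF bytes;
           each record is a term_id ($key_len bytes) followed by that term's
           encode255'd posting rows. Below, each record is decoded, its rows
           appended to the rewritten postings file, and the rows'
           (offset, length) within that file recorded in the dictionary */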
        for ($i = 0; $i < $num_postings; $i++) {
            $cur_marker = strpos($postings_string, "\xFF", $last_marker);
            $diff = ($cur_marker === false) ? null :
                $cur_marker - $last_marker;
            $pre_row = substr($postings_string, $last_marker, $diff);
            $last_marker = $cur_marker + 1;
            $term = substr($pre_row, 0, $key_len);
            $row = decode255(substr($pre_row, $key_len));
            $postings_len = strlen($row);
            $out_postings .= $row;
            if (crawlTimeoutLog("..Indexer Still processing partition ".
                "$partition. Have completed $i postings of $num_postings.") &&
                $taking_too_long_touch) {
                if (file_exists($taking_too_long_touch)) {
                    touch($taking_too_long_touch, time());
                }
            }
            $start = 0;
            $num_docs_term = vByteDecode($row, $start);
            $num_occurrences_term = 0;
            $last_entry = $last_entries_tools->find($this->last_entries, $term);
            if (!empty($last_entry)) {
                $last_entry_row =
                    $last_entries_tools->unpack($last_entry);
                $num_occurrences_term = $last_entry_row[0]["NUM_OCCURRENCES"];
            }
            $dictionary->put(["TERM" => $term, "PARTITION" => $partition,
                "NUM_DOCS" => $num_docs_term,
                "NUM_OCCURRENCES"  => $num_occurrences_term,
                "POSTINGS_OFFSET" => $postings_offset,
                "POSTINGS_LEN" => $postings_len]);
            $postings_offset += $postings_len;
            if (strlen($out_postings) > self::POSTINGS_BUFFER_SIZE) {
                fwrite($fh, $out_postings);
                $out_postings = "";
            }
        }
        $dictionary->flushLastPutNode();
        fwrite($fh, $out_postings);
        fclose($fh);
        unlink($temp_postings_filename);
        crawlLog("...Finished Adding Partition Posting Info to " .
            "Dictionary: " . changeInMicrotime($start_time));
        if (!C\nsdefined("KEEP_PARTITION_CALCULATIONS") ||
            !C\KEEP_PARTITION_CALCULATIONS) {
            if (file_exists($last_entries_filename)) {
                unlink($last_entries_filename);
            }
            crawlLog("..Done deleting partition posting calculations.");
        }
    }
    /**
     * Gets the file path corresponding to the partition with index $partition
     *
     * @param int $partition desired partition index
     * @return string file path to where this partitions index data is stored
     *  (Not the original documents which are stored in the
     *  PartitionDocumentBundle)
     */
    public function getPartitionBaseFolder($partition)
    {
        $base_folder = $this->dir_name . "/" . self::POSITIONS_DOC_MAP_FOLDER
            . "/$partition";
        return $base_folder;
    }
    /**
     * Given the $doc_id of a document and a $partition to look for it in,
     * returns the document summary info if present and [] otherwise.
     *
     * @param string $doc_id of document to look up
     * @param int $partition to look for document in
     * @return array desired summary or [] if look up failed
     */
    public function getSummary($doc_id, $partition)
    {
        $row = $this->documents->get($doc_id, $partition, [self::SUMMARY]);
        return $row[self::SUMMARY] ?? [];
    }
    /**
     * Given the $doc_id of a document and a $partition to look for it in,
     * returns the cached page of the document if present and [] otherwise
     *
     * @param string $doc_id of document to look up
     * @param int $partition to look for document in
     * @return array desired page cache or [] if look up failed
     */
    public function getCachePage($doc_id, $partition)
    {
        $row = $this->documents->get($doc_id, $partition, [self::PAGE]);
        return $row[self::PAGE] ?? [];
    }
    /**
     * Builds an inverted index for a partition of the documents
     * PartitionDocumentBundle.
     * @param int $partition to build index for
     * @param string $taking_too_long_touch a filename of a file to touch
     *  so its last modified time becomes the current time. In a typical
     *  Yioop crawl this is done for the crawl_status.txt file to prevent
     *  Yioop's web interface from stopping the crawl because it has seen
     *  no recent  progress activity on a crawl.
     * @param bool $just_stats if true, rather than saving the built index
     *  files, return an array of statistics about the partition
     * @return mixed whether the job executed to completion (true or false) if
     *  !$just_stats; otherwise, an array with NUM_DOCS, NUM_LINKS,
     *  and TERM_STATISTICS (the latter having term frequency info)
     */
    public function buildInvertedIndexPartition($partition = -1,
        $taking_too_long_touch = null, $just_stats = false)
    {
        $start_time = microtime(true);
        crawlLog("  Indexer start building inverted index ...  Current Memory:".
            memory_get_usage());
        if ($partition < 0) {
            $partition = $this->documents->parameters["SAVE_PARTITION"];
        }
        crawlLog(
            "Indexer Building index inverted index for partition $partition");
        $base_folder = $this->getPartitionBaseFolder($partition);
        if (!file_exists($base_folder)) {
            if (!file_exists($this->dir_name . "/".
                self::POSITIONS_DOC_MAP_FOLDER)) {
                mkdir($this->dir_name . "/". self::POSITIONS_DOC_MAP_FOLDER);
            }
            mkdir($base_folder);
        }
        /* set up $doc_map_filename, $last_entries_filename,
           $positions_filename, $postings_filename, etc
         */
        foreach (self::PARTITION_FILENAMES as $filename) {
            $component_filename = $base_folder . "/" . $filename;
            if (file_exists($component_filename)) {
                unlink($component_filename);
            }
            $component = $filename . "_filename";
            $$component = $component_filename;
        }
        $doc_map_tools = $this->doc_map_tools;
        $postings_tools = $this->postings_tools;
        $last_entries_tools = $this->last_entries_tools;
        $this->doc_map = "";
        $this->doc_map_counter = 0;
        $this->postings = [];
        $this->last_entries = [];
        $this->positions = "";
        crawlLog("Indexer Preparing Index Map...");
        $index_map = $this->prepareIndexMap($partition);
        crawlLog("Done Prepare Index Map. Number of documents in mapped ".
            "partition:" . count($index_map));
        $cnt = 0;
        $non_aux_doc_cnt = 0;
        $link_cnt = 0;
        $num_partition = count($index_map);
        $doc_field = self::DOC_ID;
        $score_field = self::SCORE;
        $aux_docs_field = self::AUX_DOCS;
        $get_summaries_time = 0;
        $aux_get_summaries_time = 0;
        $safe_score_time = 0;
        $safe_meta_score_time = 0;
        $invert_pages_time = 0;
        $invert_metas_time = 0;
        $invert_links_time = 0;
        $this->extract_phrase_time = 0;
        foreach ($index_map as $hash_url => $url_info) {
            $site = [];
            $non_aux_doc_cnt++;
            if (!empty($url_info[$doc_field])) {
                $start_get_summaries = microtime(true);
                $site = $this->getSummary($url_info[$doc_field], $partition);
                $get_summaries_time += changeInMicrotime($start_get_summaries);
                if (empty($site) || !is_array($site)) {
                    continue;
                }
            }
            /* if $site is still empty here then the current grouped urls
               didn't have a document (downloaded webpage) amongst themselves
             */
            $max_description_len ??= C\MAX_DESCRIPTION_LEN;
            $max_description_len = (empty($site[self::DESCRIPTION])) ?
                $max_description_len : max($max_description_len,
                strlen($site[self::DESCRIPTION]));
            $metas_only = ($url_info[$aux_docs_field] == 'metas_only');
            $aux_description = "";
            $tmp_description = $site[self::DESCRIPTION] ?? "";
            if (isset($site[self::TYPE]) && $site[self::TYPE] == "link") {
                $site_url = $site[self::TITLE];
            } else {
                $site_url = str_replace('|', "%7C", $site[self::URL] ?? "");
            }
            if ($metas_only) {
                $start_safe_meta_time = microtime(true);
                if (PhraseParser::computeSafeSearchScore($tmp_description,
                        $site_url) < PhraseParser::SAFE_PHRASE_THRESHOLD) {
                    $site[self::IS_SAFE] = true;
                    $url_info[self::IS_SAFE] = true;
                } else {
                    $site[self::IS_SAFE] = false;
                    $url_info[self::IS_SAFE] = false;
                }
                $safe_meta_score_time +=
                    changeInMicrotime($start_safe_meta_time);
                $site[self::JUST_METAS] = true;
                $start_invert_metas = microtime(true);
                $site_url = $this->invertOneSite($site, $url_info, $link_cnt);
                $invert_metas_time +=
                    changeInMicrotime($start_invert_metas);
                continue;
            }
            /*
               Index pages that were hashed together or links to page
               before page itself.
             */
            $pre_aux_docs = explode("\xFF", $url_info[$aux_docs_field]);
            $aux_sites = [];
            foreach ($pre_aux_docs as $pre_aux_doc) {
                $aux_doc = decode255($pre_aux_doc);
                $start_get_summaries = microtime(true);
                $aux_site = $this->getSummary($aux_doc, $partition);
                $aux_get_summaries_time +=
                    changeInMicrotime($start_get_summaries);
                if (empty($aux_site) || !is_array($aux_site)) {
                    $aux_site = []; // make sure empty
                    continue;
                }
                $aux_site[self::JUST_METAS] = true;
                if (!empty($aux_site[self::DESCRIPTION])) {
                    if (strlen($aux_description) +
                        strlen($aux_site[self::DESCRIPTION]) <
                        $max_description_len) {
                        $aux_description .= " .. " .
                            $aux_site[self::DESCRIPTION];
                    }
                }
                $aux_sites[] = $aux_site;
            }
            unset($pre_aux_docs);
            if (empty($site) && !empty($aux_site) && is_array($aux_site)) {
                //use the last aux_site for $site if $site is empty
                unset($aux_site[self::JUST_METAS]);
                $site = $aux_site;
                $site[self::DESCRIPTION] = "";
            }
            $site[self::DESCRIPTION] ??= "";
            $site[self::DESCRIPTION] .= $aux_description;
            $start_safe_time = microtime(true);
            if (PhraseParser::computeSafeSearchScore($site[self::DESCRIPTION],
                $site_url) < PhraseParser::SAFE_PHRASE_THRESHOLD) {
                $site[self::IS_SAFE] = true;
                $url_info[self::IS_SAFE] = true;
            } else {
                $site[self::IS_SAFE] = false;
                $url_info[self::IS_SAFE] = false;
            }
            $safe_score_time +=
                changeInMicrotime($start_safe_time);
            $cnt++;
            $start_invert_page = microtime(true);
            $site_url = $this->invertOneSite($site, $url_info, $link_cnt);
            $invert_pages_time += changeInMicrotime($start_invert_page);
            foreach ($aux_sites as $aux_site) {
                $cnt++;
                $start_invert_links = microtime(true);
                $site_url = $this->invertOneSite($aux_site, $url_info,
                    $link_cnt);
                $invert_links_time += changeInMicrotime($start_invert_links);
            }
            $memory_usage = memory_get_usage();
            $link_to = (isset($site[self::TYPE]) &&
                $site[self::TYPE] == "link") ? "LINK TO:" : "";
            $time_string = makeTimestamp();
            if ($site_url &&
                crawlTimeoutLog("..Indexer Still building inverted index ".
                    "for partition $partition \n" .
                    "$time_string ....Current Indexer Memory Usage is %s.\n" .
                    $time_string .
                    " ....Indexer has processed %s of %s documents.\n" .
                    $time_string .
                    " ....Total links or docs processed by Indexer is %s.\n" .
                    "$time_string ....Last url Indexer processed was %s.",
                $memory_usage, $non_aux_doc_cnt, $num_partition,
                $non_aux_doc_cnt + $cnt, $link_to . $site_url) &&
                $taking_too_long_touch) {
                if (file_exists($taking_too_long_touch)) {
                    touch($taking_too_long_touch, time());
                }
            }
        }
        if ($just_stats) {
            $term_stats = [];
            foreach ($this->postings as $term => $postings) {
                list($posting_records,) = $this->unpackPostings($postings);
                $term_stats[$term] = count($posting_records);
            }
            $statistics = [
                "NUM_DOCS" => $this->doc_map_counter,
                "NUM_LINKS" => $link_cnt,
                "TERM_STATISTICS" => $term_stats
            ];
            return $statistics;
        }
        $start_save_times = microtime(true);
        $doc_map_tools->save($doc_map_filename, $this->doc_map);
        ksort($this->postings);
        $postings_tools->save($postings_filename, $this->postings);
        $last_entries_tools->save($last_entries_filename, $this->last_entries);
        file_put_contents($positions_filename, $this->positions);
        $final_save_time = changeInMicrotime($start_save_times);
        $time_string = makeTimestamp();
        crawlLog("  Indexer build inverted index time ".
            changeInMicrotime($start_time) .
            "\n$time_string  ..Component times:" .
            "\n$time_string  ....Get page summaries time: $get_summaries_time" .
            "\n$time_string  ....Get link summaries time: " .
                $aux_get_summaries_time .
            "\n$time_string  ....Compute Safe Page time: $safe_score_time" .
            "\n$time_string  ....Compute Safe Meta time: $safe_meta_score_time".
            "\n$time_string  ....Invert pages time: $invert_pages_time" .
            "\n$time_string  ....Invert meta pages time: $invert_metas_time" .
            "\n$time_string  ....Invert links time: $invert_links_time" .
            "\n$time_string  ....Final file saves time: $final_save_time" .
            "\n$time_string  ----" .
            "\n$time_string  ....Of Invert times, time in " .
                "extractPhrasesInLists:". $this->extract_phrase_time);
        return true;
    }
    /**
     * Used to create inverted index for one site and add its information to
     * the current partition.
     *
     * @param array $site site to invert
     * @param array $url_info collection of url and hashes of documents which
     *   map to the same document
     * @param int &$link_cnt current count of number of links discovered so far
     * @return string $site_url canonical url for site
     */
    public function invertOneSite($site, $url_info, &$link_cnt)
    {
        $interim_time = microtime(true);
        if (!isset($site[self::HASH]) ||
            (isset($site[self::ROBOT_METAS]) &&
            in_array("JUSTFOLLOW", $site[self::ROBOT_METAS]))) {
            return "";
        }
        //this case might occur on a recrawl
        if (isset($site[self::TYPE]) && $site[self::TYPE] == "link") {
            $is_link = true;
            $link_cnt++;
            $site_url = $site[self::TITLE];
            $host = UrlParser::getHost($site_url);
            $link_parts = explode('|', $site[self::HASH]);
            if (isset($link_parts[5])) {
                $link_origin = $link_parts[5];
            } else {
                $link_origin = $site_url;
            }
            $url_info = [];
            if (!empty($site[self::LANG])) {
                $url_info[self::LANG] = $site[self::LANG];
            }
            $meta_ids = PhraseParser::calculateLinkMetas($site_url,
                $host, $site[self::DESCRIPTION], $link_origin,
                $url_info);
            $link_to = "LINK TO:";
        } else {
            $is_link = false;
            $site_url = str_replace('|', "%7C", $site[self::URL]);
            $meta_ids = PhraseParser::calculateMetas($site);
            $link_to = "";
        }
        $word_lists = [];
        $host_keywords_end_pos = 0;
        $title_end_pos = 0;
        $path_keywords_end_pos = 0;
        $triplet_lists = [];
        /*
            self::JUST_METAS check to avoid getting sitemaps in results
            for popular words
         */
        $lang = null;
        if (!isset($site[self::JUST_METAS])) {
            $host_words = UrlParser::getWordsInHostUrl($site_url);
            $path_words = UrlParser::getWordsLastPathPartUrl(
                $site_url);
            if ($is_link) {
                $phrase_string = $site[self::DESCRIPTION];
            } else {
                if (isset($site[self::LANG])) {
                    if (isset($this->programming_language_extension[
                        $site[self::LANG]])) {
                        $phrase_string = $site[self::DESCRIPTION];
                    } else {
                        /* r6t was chosen as short enough not to be
                           changed by chargramming, but rare enough
                           that it can be used as a useful splitter
                         */
                        $phrase_string = $host_words . " r6t ".
                            $site[self::TITLE] . " r6t ". $path_words .
                            " r6t ". $site[self::DESCRIPTION];
                    }
                } else {
                    $phrase_string = $host_words . " r6t " .
                        $site[self::TITLE] . " r6t " . $path_words .
                        " r6t ". $site[self::DESCRIPTION];
                }
            }
            /* at this point we have already extracted meta words;
               we attempt to compute the lang here as a value different
               from empty or mul for the purposes of stemming and
               chargramming. (This helps with extracting words from images
               because the image itself might have had too few words to
               guess the language, but combined with inlink text it does)
             */
            if (empty($site[self::LANG]) || $site[self::LANG] == "mul") {
                $lang = guessLocaleFromString(
                    $site[self::DESCRIPTION]);
            } else {
                $lang = $site[self::LANG];
            }
            $word_and_qa_lists = PhraseParser::extractPhrasesInLists(
                $phrase_string, $lang);
            if (!isset($this->extract_phrase_time)) {
                $this->extract_phrase_time = 0;
            }
            $this->extract_phrase_time +=
                $word_and_qa_lists['TIMES']['TOTAL_TIME'] ?? 0;
            $word_lists = $word_and_qa_lists['WORD_LIST'];
            if (!empty($word_lists["r6t"][2])) {
                if ($path_keywords_end_pos < 255) {
                    $host_keywords_end_pos = $word_lists["r6t"][0];
                    $title_end_pos = $word_lists["r6t"][1];
                }
                $path_keywords_end_pos = $word_lists["r6t"][2];
                unset($word_lists["r6t"]);
            } else if (!empty($word_lists["r6t"])) {
                $path_keywords_end_pos = $word_lists["r6t"][
                    count($word_lists["r6t"]) - 1];
                unset($word_lists["r6t"]);
            }
        }
        $description_scores =
            (empty($site[self::DESCRIPTION_SCORES])) ? [] :
            $site[self::DESCRIPTION_SCORES];
        $user_ranks =
            (empty($site[self::USER_RANKS])) ? [] :
            $site[self::USER_RANKS];
        $num_words = 0;
        foreach ($word_lists as $word => $position_list) {
            $num_words += count($position_list);
        }
        $doc_id = ($url_info[self::DOC_ID] ??
            ($url_info[self::AUX_DOCS][0] ?? ""));
        if (empty($doc_id)) {
            return "";
        }
        $this->addScoresDocMap($doc_id, $num_words,
            $url_info[self::SCORE], $host_keywords_end_pos, $title_end_pos,
            $path_keywords_end_pos, $description_scores, $user_ranks);
        $this->addTermPostingLists(0, $num_words,
            $word_lists, $meta_ids, $this->doc_map_counter);
        $this->doc_map_counter++;
        $interim_elapse = changeInMicrotime($interim_time);
        if ($interim_elapse > 5) {
            crawlLog("..Indexer Inverting " . $link_to . $site_url .
            "...took > 5s.");
        }
        return $site_url;
    }
    /**
     * Given a $site array of information about a web page/document, uses
     * the CrawlConstant::URL and CrawlConstant::HASH fields to compute a
     * unique doc id for the array.
     *
     * @param array $site site to compute doc_id for
     * @return string the computed doc_id
     */
    public static function computeDocId($site)
    {
        $doc_id = false;
        if (isset($site[self::TYPE]) && $site[self::TYPE] == "link") {
            $doc_id = $site[self::HTTP_CODE];
        } else {
            $letter_code = 'b';
            $main_type = (!empty($site[self::TYPE])) ?
                substr($site[self::TYPE], 0, 4) : "binary";
            if (!empty($site[self::IS_VIDEO])) {
                $letter_code = 'v';
            } else if ($main_type == "text" ) {
                $letter_code = 't';
            } else if ($main_type == "imag") {
                $letter_code = 'p';
            }
            $hash = $site[self::HASH];
            if ($letter_code == "t" && !empty($site[self::TITLE])) {
                $trim_title = trim($site[self::TITLE]);
                $hash = substr(crawlHash($trim_title, true), 0, 4) .
                    substr($hash, 4);
            }
            $site_url = str_replace('|', "%7C", $site[self::URL]);
            $host = UrlParser::getHost($site_url);
            $cld = UrlParser::getCompanyLevelDomain($site_url);
            if (in_array($site_url, ["https://$cld/",
                "https://www.$cld/", "http://$cld/", "http://www.$cld/"])) {
                $letter_code = chr(ord($letter_code) + 128);
            }
            $doc_id = crawlHash($site_url, true) . $hash .
                $letter_code . substr(crawlHash($host . "/", true), 1);
        }
        return $doc_id;
    }
    /**
     * Used to add a doc_id => doc_record to the current partition's
     * document map ($this->doc_map). A doc record records the number of words
     * in the document, an overall score for the document, the end positions
     * of its host keyword, title, and url path keyword portions, scores for
     * each of the sentences included in the summary for the document, and
     * classifier scores for each classifier that was used by the crawl.
     *
     * @param string $doc_id new document id to add a record for
     * @param int $num_words number of terms in the document associated with
     *  the doc_id
     * @param float $score overall score for the importance of this document
     * @param int $host_keywords_end_pos end of the portion of the
     *  document summary containing terms coming from the hostname
     * @param int $title_end_pos end of the portion of the document
     *  summary containing terms in the title
     * @param int $path_keywords_end_pos end of the portion of the
     *  document summary containing terms in the url path
     * @param array $description_scores pairs of the form (length of summary
     *  portion, score for that portion)
     * @param array $user_ranks for each user defined classifier for this crawl
     *  the float score of the classifier on this document
     */
    public function addScoresDocMap($doc_id, $num_words, $score,
        $host_keywords_end_pos, $title_end_pos, $path_keywords_end_pos,
        $description_scores, $user_ranks)
    {
        $num_description_scores = count($description_scores);
        $preface_positions =
            ((($host_keywords_end_pos << 8) + $title_end_pos) << 8) +
            $path_keywords_end_pos;
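        /* e.g., host keywords end 3, title end 12, path keywords end 7
           pack as (3 << 16) + (12 << 8) + 7 = 199687 */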
        $out_rows = [["POS" => $num_words, "SCORE" => floatval($score)],
            ["POS" => $preface_positions, "SCORE" =>
                floatval($num_description_scores)]];
        foreach ($description_scores as $position => $section_score) {
            $out_rows[] = ["POS" => $position,
                "SCORE" => floatval($section_score)];
        }
        foreach ($user_ranks as $user_rank) {
            $out_rows[] = ["POS" => 0, "SCORE" => floatval($user_rank)];
        }
        $entry = $this->doc_map_tools->pack($out_rows);
        $this->doc_map_tools->add($this->doc_map, $doc_id, $entry,
            PackedTableTools::ADD_MEM_TABLE_STRING);
    }
    /**
     * Adds posting records associated to a document to the posting lists for
     * a partition.
     *
     * @param int $position_offset number of header bytes that might be used
     *  before including any position data in the file that positions will
     *  eventually be stored.
     * @param int $doc_length length of document in terms for the document
     *  for which we are adding posting data.
     * @param array $word_lists term => positions within current document of
     *  that term for the document whose posting data we are adding
     * @param array $meta_ids meta terms associated with the document we are
     *  adding. An example, meta term might be "media:news"
     * @param int $doc_map_index which document within the partition is the one
     *  we are adding. I.e., 5 would mean there were 5 earlier documents whose
     *  postings we have already added.
     */
    public function addTermPostingLists($position_offset, $doc_length,
        $word_lists, $meta_ids, $doc_map_index)
    {
        $postings_tools = $this->postings_tools;
        $last_entries_tools = $this->last_entries_tools;
        foreach ($meta_ids as $meta_id) {
            $word_lists[$meta_id] = [];
        }
        foreach ($word_lists as $word => $position_list) {
            $term_id = canonicalTerm($word);
            $meta_prefix = substr($word, 0, 5);
            $site_meta = ($meta_prefix == "site:" || $meta_prefix == "info:");
            $occurrences = $site_meta ? $doc_length : count($position_list);
            if (!$site_meta && $occurrences > 0) {
                $encoded_position_list = encodePositionList($position_list);
                $offset = $position_offset + strlen($this->positions);
                $len = strlen($encoded_position_list);
                $this->positions .= $encoded_position_list;
            } else {
                $offset = 0;
                $len = 0;
            }
            $last_entry = $last_entries_tools->find($this->last_entries,
                $term_id);
            if (empty($last_entry)) {
                list($last_index, $last_offset, $num_occurrences) = [0, 0, 0];
            } else {
                $last_entry_row = $last_entries_tools->unpack($last_entry);
                list($last_index, $last_offset, $num_occurrences) =
                    array_values($last_entry_row[0]);
            }
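            /* store gaps (difference lists) rather than absolute doc map
               indices and position offsets -- both only grow within a
               partition, so the gaps pack into fewer bytes */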
            $diff_doc_map_index = $doc_map_index - $last_index;
            $diff_offset = (!$site_meta && $occurrences > 0) ?
                $offset - $last_offset : 0;
            $entry = $postings_tools->pack([
                "DOC_MAP_INDEX" => $diff_doc_map_index,
                "FREQUENCY" => $occurrences, "POSITIONS_OFFSET" => $diff_offset,
                "POSITIONS_LEN" => $len]);
            $postings_tools->add($this->postings, $term_id, $entry,
                PackedTableTools::ADD_MEM_TABLE, PackedTableTools::APPEND_MODE);
            $add_entry = $last_entries_tools->pack(
                ["LAST_INDEX" => $doc_map_index, "LAST_OFFSET" => $offset,
                "NUM_OCCURRENCES" => $num_occurrences + $occurrences]);
            $last_entries_tools->add($this->last_entries, $term_id, $add_entry);
        }
    }
    /**
     * Checks if a doc_id $key is that of a host url.
     * I.e., a url https://www.yahoo.com/ as opposed to
     * https://www.yahoo.com/foo
     * @param string $key doc_id to check
     * @return bool true if $key is the doc_id of a host url
     */
    public static function isAHostDocId($key)
    {
        if (strlen($key) == self::DOCID_LEN && substr($key, 1, 7) ==
            substr($key, -7)) {
            return true;
        }
        return false;
    }
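    /* E.g., for the url https://www.yahoo.com/, crawlHash of the url and
       crawlHash of the hostname . "/" coincide, so bytes 1-7 of the doc_id
       repeat as its last 7 bytes (a sketch of the reasoning, given how
       computeDocId() above builds doc_ids) */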
    /**
     * Checks if a doc_id $key is that of a Company level domain (cld) or
     * www.cld.
     * I.e., a url https://yahoo.com/  or https://www.yahoo.com/ as opposed to
     * https://foo.yahoo.com/
     * @param string $key doc_id to check
     * @return bool true if $key is the doc_id of a company level domain url
     */
    public static function isACldDocId($key)
    {
        return (ord($key[self::DOCID_PART_LEN << 1] ?? "\0") & 128) > 0;
    }
    /**
     * Checks if a doc_id corresponds to a particular large scale type among
     * external_link, internal_link, link (union of previous two),
     * binary, feed, image, text, video, document (union of previous five)
     *
     * @param string $key doc_id to check
     * @param string|array $types if a string, then a particular type from the
     *  above list to check against; if an array, then an array of such types
     * @return bool true if the type of $key is among $types
     */
    public static function isType($key, $types)
    {
        $type_map = [
            "b" => "binary",
            "d" => "old_doc",
            "e" => "external_link",
            "f" => "feed",
            "i" => "internal_link",
            "l" => "old_link",
            "p" => "image",
            "t" => "text",
            "v" => "video",
        ];
        if (is_string($types)) {
            $types = [$types];
        }
        if (in_array("link", $types)) {
            $types = array_merge($types, ["external_link", "internal_link",
                "old_link"]);
        } else if (in_array("doc", $types) || in_array("document", $types)) {
            $types = array_merge($types, ["binary", "feed", "image",
                "old_doc", "text", "video"]);
        }
        $key_type = chr(ord($key[self::DOCID_PART_LEN << 1] ?? 0) & 127);
        return in_array($type_map[$key_type] ?? "old_link", $types);
    }
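    /* Example (a sketch):
           $is_link = IndexDocumentBundle::isType($doc_id, "link");
       checks for external, internal, or old-style links, while
           $is_doc = IndexDocumentBundle::isType($doc_id, "document");
       checks for any kind of downloaded document */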
    /**
     * As pre-step to calculating the inverted index information for a partition
     * this method groups documents and links to documents into single objects.
     * It also does simple deduplication of documents that have the same hash.
     * It then returns an array of the grouped document data.
     * Grouping is done by giving a score to each document based on
     * (number of docs in the index - the order in which the doc was added).
     * For two entries with
     * the same hash_url, a document will be chosen over a link as the
     * representative; otherwise, the one with higher score will be chosen as
     * the representative. The representative document is given the sum of
     * the scores of its constituents. A second phase where documents are
     * grouped by hash of the text body is also done. Finally, the returned
     * documents are sorted by their scores. So the order of documents from
     * this process is roughly in the order of importance.
     *
     * @param int $partition index of partition to do deduplication for
     *  in the case that test index is empty
     * @param array $test_index is non-empty only when testing what
     *  this method does. In which case, it should consist of an array
     *  of $doc_id => string pairs representing a possible record for each doc.
     *  As deduplication is done entirely based on the components of the doc_id
     *  (hash_url, doc_type, hash_doc, hash_host) the string doesn't matter
     *  too much.
     * @return array groups doc_id => records associated with that doc_id
     */
    public function prepareIndexMap($partition, $test_index = [])
    {
        if (empty($test_index)) {
            $doc_index = $this->documents->loadPartitionIndex($partition, true);
        } else {
            $doc_index = $test_index;
        }
        if (empty($doc_index)) {
            return [];
        }
        $doc_ids = array_keys($doc_index);
        $num_ids = count($doc_ids);
        $grouped_urls = [];
        $grouped_hashes = [];
        $score = $num_ids;
        $doc_key_len = self::DOCID_PART_LEN;
        $doc_field = self::DOC_ID;
        $score_field = self::SCORE;
        $aux_docs_field = self::AUX_DOCS;
        foreach ($doc_ids as $doc_id) {
            list($hash_url, $hash_code, ) = str_split($doc_id, $doc_key_len);
            $current_grouped_urls = $grouped_urls[$hash_url] ??
                [$aux_docs_field => "", $score_field => 0];
            $current_grouped_hashes = $grouped_hashes[$hash_code] ?? "";
            if (!$this->isType($doc_id, "link")) {
                $current_grouped_hashes .= "\xFF". encode255($hash_url);
                $current_grouped_urls[$doc_field] = $doc_id;
            } else {
                $current_grouped_urls[$aux_docs_field] .= "\xFF".
                    encode255($doc_id);
            }
            $current_grouped_urls[$score_field] += $score;
            if (!empty($current_grouped_hashes)) {
                $grouped_hashes[$hash_code] = $current_grouped_hashes;
            }
            $grouped_urls[$hash_url] = $current_grouped_urls;
            $score--;
        }
        foreach ($grouped_hashes as $pre_same_hash_group) {
            if (strlen($pre_same_hash_group) <= 2 * $doc_key_len) {
                continue;
            }
            $max_score = 0;
            $max_url = "";
            $same_hash_group = explode("\xFF", $pre_same_hash_group);
            foreach ($same_hash_group as $pre_hash_url) {
                if (empty($pre_hash_url)) {
                    continue;
                }
                $hash_url = decode255($pre_hash_url);
                $hash_score = $grouped_urls[$hash_url][$score_field];
                if ($hash_score > $max_score) {
                    $max_score = $hash_score;
                    $max_url = $hash_url;
                }
            }
            $max_group = $grouped_urls[$max_url];
            foreach ($same_hash_group as $pre_hash_url) {
                if (empty($pre_hash_url)) {
                    continue;
                }
                $hash_url = decode255($pre_hash_url);
                if ($hash_url != $max_url) {
                    $hash_group = $grouped_urls[$hash_url];
                    $max_group[$score_field] += $hash_group[$score_field];
                    if ($max_group[$aux_docs_field] != "metas_only") {
                        $max_group[$aux_docs_field] .= "\xFF" .
                            encode255($hash_group[$doc_field]) .
                            $hash_group[$aux_docs_field];
                    }
                    $hash_group[$aux_docs_field] = "metas_only";
                    $grouped_urls[$hash_url] = $hash_group;
                }
            }
            $grouped_urls[$max_url] = $max_group;
        }
        unset($grouped_hashes);
        uasort($grouped_urls, function ($a, $b) use ($score_field) {
            return intval($b[$score_field] - $a[$score_field]);
        });
        return $grouped_urls;
    }
    /**
     * Forces the inverted index for the current save partition to be built
     * and saved
     */
    public function forceSave()
    {
        $this->buildInvertedIndexPartition();
    }
    /**
     * Used when a crawl stops to perform final dictionary operations
     * to produce a working stand-alone index.
     */
    public function stopIndexing()
    {
        $this->forceSave();
    }
    /**
     * Gets posting list information for the term $term_id from the
     * partitions of this bundle's dictionary
     *
     * @param string $term_id id of phrase or word to look up in the bundle
     *     dictionary
     * @param int $threshold once the number of documents found exceeds this
     *     amount, stop looking for more dictionary entries
     * @param int $offset which partition to begin looking for occurrences
     *     of the term from
     * @param int $num_partitions how many partitions from $offset to search
     *     forward in
     * @param bool $with_remaining_total whether to also compute the total
     *     number of postings found
     * @return array dictionary information for the term: a ROWS array of
     *     per-partition entries (each with PARTITION, NUM_DOCS, and
     *     NUM_OCCURRENCES fields), together with totals such as TOTAL_COUNT
     *     and TOTAL_OCCURRENCES and per-partition statistics
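     *
     * A hypothetical usage sketch (the md5-based term id derivation and the
     * single-argument constructor call are illustrative assumptions, not
     * necessarily the exact scheme this bundle uses):
     * <code>
     * $index = new IndexDocumentBundle($index_dir);
     * // hypothetical term id: a raw md5 happens to be TERMID_LEN bytes long
     * $term_id = substr(md5("yioop", true), 0,
     *     IndexDocumentBundle::TERMID_LEN);
     * $info = $index->getWordInfo($term_id, 100);
     * $partitions_with_term = count($info['ROWS'] ?? []);
     * $estimated_docs = $info['TOTAL_COUNT'] ?? 0;
     * </code>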
     */
    public function getWordInfo($term_id, $threshold = -1,
        $offset = 0, $num_partitions = -1, $with_remaining_total = false)
    {
        $dictionary = $this->dictionary ?? [];
        if (!$dictionary) {
            return [];
        }
        $result = $dictionary->get($term_id, true, true, true, false, $offset,
            $num_partitions);
        if (empty($result)) {
            $result = [];
        }
        $max_found_partition = 0;
        $doc_count = 0;
        $occurrence_count = 0;
        $num_rows = 0;
        $threshold_met = false;
        $save_partition = $this->documents->parameters["SAVE_PARTITION"];
        if (empty($result['ROWS'])) {
            $result['ROWS'] = [];
        }
        foreach ($result['ROWS'] as $row) {
            if ($threshold > 0 && $doc_count > $threshold) {
                $result['ROWS'] = array_slice($result['ROWS'], 0, $num_rows);
                $threshold_met = true;
                break;
            }
            $max_found_partition = ($max_found_partition < $row['PARTITION']) ?
                $row['PARTITION'] : $max_found_partition;
            $doc_count += $row['NUM_DOCS'];
            $occurrence_count += $row['NUM_OCCURRENCES'];
            $num_rows++;
        }
        $parameters = $this->documents->parameters;
        $result['AVG_ITEMS_PER_PARTITION'] = $doc_count/max($num_rows, 1.0);
        $result['TOTAL_NUM_DOCS'] = $parameters["VISITED_URLS_COUNT"] ?? 0;
        $result['TOTAL_NUM_LINKS_AND_DOCS'] = $parameters["ACTIVE_COUNT"] +
            $parameters["COUNT"];
        $result['MAX_ITEMS_PER_PARTITION'] = $parameters["MAX_ITEMS_PER_FILE"];
        $result['TOTAL_NUMBER_OF_PARTITIONS'] = $parameters["SAVE_PARTITION"]
            + 1;
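        // If the scan stopped early because $threshold was exceeded,
        // extrapolate the totals from the fraction of partitions examined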
        if ($threshold_met) {
            $fraction_seen = ($save_partition - $offset) /
                max($max_found_partition - $offset, 1);
            $result['TOTAL_COUNT'] = $fraction_seen * $doc_count;
            $result['TOTAL_OCCURRENCES'] = $fraction_seen * $occurrence_count;
            $result['THESHOLD_EXCEEDED'] = true;
            return $result;
        }
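        // Also check the postings file of the active save partition, whose
        // terms have not yet been merged into the dictionary B+-tree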
        $base_folder = $this->getPartitionBaseFolder($save_partition);
        $postings_filename = $base_folder . "/" . self::POSTINGS_FILENAME;
        $postings_tool = $this->postings_tools;
        $active_postings = [];
        if (file_exists($postings_filename)) {
            $active_dictionary =
                $postings_tool->load($postings_filename,
                $postings_tool::AS_STRING_MODE, true);
            $active_postings_entry =
                $postings_tool->findRowFromKeyTableString($active_dictionary,
                $term_id);
            $active_postings = (empty($active_postings_entry)) ? [] :
                $postings_tool->unpack($active_postings_entry);
        }
        if (!empty($active_postings)) {
            $row = ["PARTITION" => $save_partition,
                "NUM_DOCS" => count($active_postings),
                "POSTINGS" => $active_postings];
            $doc_count += $row["NUM_DOCS"];
            $active_occurrences = $this->deDeltaPostingsSumFrequencies(
                $row["POSTINGS"]);
            $row['NUM_OCCURRENCES'] = $active_occurrences;
            $occurrence_count += $active_occurrences;
            $result['ROWS'][] = $row;
        }
        $result['TOTAL_COUNT'] = $doc_count;
        $result['TOTAL_OCCURRENCES'] = $occurrence_count;
        return $result;
    }
    /**
     * Gets the postings stored in the postings file of a partition from
     * byte $offset to $offset + $len.
     *
     * @param int $partition partition to retrieve postings from
     * @param int $offset byte offset into the partition's postings file at
     *  which the posting list starts
     * @param int $len length in bytes of the posting list to retrieve
     * @return string encoded posting list data -- vbyte encoded number of
     *  postings, followed by the posting data in PackedTableTools format
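     *
     * A minimal sketch (the POSTINGS_OFFSET and POSTINGS_LEN row keys are
     * assumptions for illustration; the actual dictionary rows may name
     * these fields differently):
     * <code>
     * $row = $info['ROWS'][0];
     * $postings_str = $index->getPostingsString($row['PARTITION'],
     *     $row['POSTINGS_OFFSET'], $row['POSTINGS_LEN']);
     * </code>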
     */
    public function getPostingsString($partition, $offset, $len)
    {
        static $file_handles = [];
        if (empty($file_handles[$partition])) {
            $postings_filename = $this->getPartitionBaseFolder($partition) .
                "/" . IndexDocumentBundle::POSTINGS_FILENAME;
            if (file_exists($postings_filename)) {
                $fh = fopen($postings_filename, "r");
            } else {
                return "";
            }
            $file_handles[$partition] = $fh;
        } else {
            $fh = $file_handles[$partition];
        }
        if ($fh && fseek($fh, $offset) == 0 && $len > 0) {
            $out = fread($fh, $len);
            return $out;
        }
        return "";
    }
    /**
     * Given the postings for a term in a partition as a string, unpacks them
     * into an array of postings, undoing the delta coding of doc_map_indices
     * and of position offsets. Each posting represents the occurrences of a
     * term in one document, so its frequency component is the number of
     * occurrences of the term in that document. This method also computes
     * the sum of these frequencies over all postings in the partition.
     *
     * @param string $postings_string compressed string representation of a
     *   set of postings for a term
     * @return array a pair [array of unpacked postings, sum of the
     *   frequencies of all the postings]
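     *
     * A minimal sketch continuing the getPostingsString() example above:
     * <code>
     * list($postings, $sum_frequencies) =
     *     $index->unpackPostings($postings_str);
     * foreach ($postings as $posting) {
     *     // DOC_MAP_INDEX locates the document in the partition's doc map;
     *     // FREQUENCY counts the term's occurrences in that document
     *     $doc_index = $posting['DOC_MAP_INDEX'];
     *     $frequency = $posting['FREQUENCY'];
     * }
     * </code>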
     */
    public function unpackPostings($postings_string)
    {
        $unpack_map = $this->unpack_map;
        $unpack_len_map = $this->unpack_len_map;
        $current_pos = 0;
        if (empty($postings_string)) {
            return [[], 0];
        }
        $num_items = vByteDecode($postings_string, $current_pos);
        $items = [];
        $sum_frequencies = 0;
        $doc_map_index = 0;
        $positions_offset = 0;
        $len_posting_strings = strlen($postings_string);
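        // DOC_MAP_INDEX and POSITIONS_OFFSET are delta coded with respect
        // to the previous posting; accumulate to restore absolute values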
        for ($i = 0; $i < $num_items; $i++) {
            if (!isset($postings_string[$current_pos])) {
                crawlLog("Posting decode error");
                break;
            }
            $int_info = ord($postings_string[$current_pos]);
            $current_pos++;
            $len_unpack_info = $unpack_len_map[$int_info];
            if ($current_pos + $len_unpack_info > $len_posting_strings) {
                crawlLog("Posting decode error");
                break; //sanity check break
            }
            $pre_item = unpack($unpack_map[$int_info], $postings_string,
                $current_pos);
            $item = $pre_item;
            $item["DOC_MAP_INDEX"] += $doc_map_index;
            $item["POSITIONS_OFFSET"] += $positions_offset;
            $doc_map_index += $pre_item["DOC_MAP_INDEX"];
            $positions_offset += $pre_item["POSITIONS_OFFSET"];
            $sum_frequencies += $pre_item["FREQUENCY"];
            $current_pos += $len_unpack_info;
            $items[] = $item;
        }
        return [$items, $sum_frequencies];
    }
    /**
     * Within postings, DOC_MAP_INDEX and POSITIONS_OFFSET values are stored
     * as delta lists (differences over previous values); this method undoes
     * the delta coding to restore the actual DOC_MAP_INDEX and
     * POSITIONS_OFFSET values. It also computes the sum of the frequencies
     * of the items within the list of postings. This method is currently
     * only used for the active partition in an index (the one whose terms
     * haven't yet been added to the B+-tree).
     *
     * @param array &$postings a reference to an array of posting lists for a
     *  term (this will be changed by this method)
     * @return int sum of the frequencies of term occurrences as given by the
     *  above postings
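     *
     * For example, if three postings stored DOC_MAP_INDEX deltas 5, 2, 3,
     * after this call they hold the absolute indices 5, 7, and 10, and the
     * return value is the sum of the three postings' FREQUENCY fields.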
     */
    public function deDeltaPostingsSumFrequencies(&$postings)
    {
        if (empty($postings) || !is_array($postings)) {
            return 0;
        }
        list($doc_map_index, $sum_frequencies, $positions_offset) =
            array_values($postings[0]);
        $num_postings = count($postings);
        for ($i = 1; $i < $num_postings; $i++) {
            $posting = & $postings[$i];
            list($doc_map_delta, $frequency, $positions_delta) =
                array_values($posting);
            $sum_frequencies += $frequency;
            $doc_map_index += $doc_map_delta;
            $positions_offset += $positions_delta;
            $posting["DOC_MAP_INDEX"] = $doc_map_index;
            $posting["POSITIONS_OFFSET"] = $positions_offset;
        }
        return $sum_frequencies;
    }
    /**
     * Gets the description, count of documents, and number of partitions of
     * the documents stored in the supplied directory. If the file
     * arc_description.txt exists, the directory is viewed as a dummy index
     * archive whose sole purpose is to allow conversions of downloaded data,
     * such as arc files, into Yioop! format.
     *
     * @param string $dir_name path to a directory containing a documents
     *      IndexDocumentBundle
     * @return array summary of the given archive
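     *
     * A minimal usage sketch ($some_index_dir is an assumed path to an
     * existing bundle):
     * <code>
     * $info = IndexDocumentBundle::getArchiveInfo($some_index_dir);
     * $num_docs = $info['COUNT'] ?? 0;
     * </code>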
     */
    public static function getArchiveInfo($dir_name)
    {
        if (file_exists($dir_name . "/arc_description.txt")) {
            $crawl = [];
            $info = [];
            $crawl['DESCRIPTION'] = substr(
                file_get_contents($dir_name . "/arc_description.txt"), 0, 256);
            $crawl['ARCFILE'] = true;
            $info['VISITED_URLS_COUNT'] = 0;
            $info['COUNT'] = 0;
            $info['NUM_DOCS_PER_PARTITION'] = 0;
            $info['WRITE_PARTITION'] = 0;
            $info["VERSION"] = self::DEFAULT_VERSION;
            $info['DESCRIPTION'] = serialize($crawl);
            return $info;
        }
        $info_path = $dir_name . "/" . self::ARCHIVE_INFO_FILE;
        if (!file_exists($info_path)) {
            $info = [];
            $info['DESCRIPTION'] =
                "Archive does not exist OR Archive description file not found";
            $info['COUNT'] = 0;
            $info['NUM_DOCS_PER_PARTITION'] = -1;
            $info["VERSION"] = self::DEFAULT_VERSION;
            return $info;
        }
        $info = unserialize(file_get_contents($info_path));
        if (!is_array($info)) {
            $info = [];
        }
        $table_info = PartitionDocumentBundle::getParameterInfo($dir_name . "/".
            self::DOCUMENTS_FOLDER);
        if (!is_array($table_info)) {
            $table_info = [];
        }
        $info = array_diff_key($info, $table_info);
        $info = array_merge($table_info, $info);
        return $info;
    }
    /**
     * Sets the archive info struct for the web archive bundle associated with
     * this bundle. This struct has fields like: DESCRIPTION
     * (serialized store of global parameters of the crawl like seed sites,
     * timestamp, etc).
     *
     * @param string $dir_name folder with archive bundle
     * @param array $update_info struct with above fields
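     *
     * A hypothetical sketch, updating only the stored description of a
     * bundle ($some_index_dir and $new_crawl_params are assumptions for
     * illustration):
     * <code>
     * IndexDocumentBundle::setArchiveInfo($some_index_dir,
     *     ["DESCRIPTION" => serialize($new_crawl_params)]);
     * </code>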
     */
    public static function setArchiveInfo($dir_name, $update_info)
    {
        $archive_info_path = $dir_name . "/" . self::ARCHIVE_INFO_FILE;
        if (file_exists($archive_info_path)) {
            $info = self::getArchiveInfo($dir_name);
        }
        if (empty($info) || !is_array($info)) {
            $info = [];
        }
        $pdb_info = [];
        $got_pdb_info = false;
        if (!empty($info)) {
            $doc_folder = $dir_name . "/" . self::DOCUMENTS_FOLDER;
            if (file_exists($doc_folder)) {
                $pdb_info = PartitionDocumentBundle::getParameterInfo(
                    $doc_folder);
                if (!empty($pdb_info)) {
                    $got_pdb_info = true;
                }
                // avoid getting same data (_COUNTS) stored in two locations
                if (!empty($info) && !empty($pdb_info)) {
                    $info = array_diff_key($info, $pdb_info);
                }
            }
        }
        $pdb_change = false;
        foreach ($update_info as $field => $value) {
            if (isset($pdb_info[$field])) {
                $pdb_info[$field] = $value;
                $pdb_change = true;
            }
            $info[$field] = $value;
        }
        if (empty($info["VERSION"])) {
            $info["VERSION"] = self::DEFAULT_VERSION;
        }
        file_put_contents($archive_info_path, serialize($info));
        if ($got_pdb_info && $pdb_change) {
            $parameter_path = $doc_folder . "/" .
                PartitionDocumentBundle::PARAMETERS_FILE;
            file_put_contents($parameter_path, serialize($pdb_info),
                LOCK_EX);
        }
    }
    /**
     * Returns the last time the archive info of the bundle was modified.
     *
     * @param string $dir_name folder with archive bundle
     * @return mixed either the modification time if the file exists or false
     */
    public static function getParamModifiedTime($dir_name)
    {
        $doc_param_path = $dir_name . "/" . self::DOCUMENTS_FOLDER . "/" .
            PartitionDocumentBundle::PARAMETERS_FILE;
        if (file_exists($doc_param_path)) {
            clearstatcache();
            return filemtime($doc_param_path);
        }
        return false;
    }
}