Last commit for src/library/FeedDocumentBundle.php: 55621f89eb585b515f5c0b94348a13fae5fefd9c

fixes a getPostingsString bug where string needed to be decode255'd, remove a lot of the code for serving results for older index formats

Chris Pollett [2024-01-24 05:Jan:th]
fixes a getPostingsString bug where string needed to be decode255'd, remove a lot of the code for serving results for older index formats
<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2023  Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Chris Pollett chris@pollett.org
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2023
 * @filesource
 */
namespace seekquarry\yioop\library;

use seekquarry\yioop\configs as C;

/**
 * Used for crawlLog, crawlHash, and garbageCollect
 */
require_once __DIR__ . '/Utility.php';
/**
 * Subclass of IndexDocumentBundle with bloom filters to make it easy to check
 * if a news feed item has been added to the bundle already before adding it
 *
 * @author Chris Pollett
 */
class FeedDocumentBundle extends IndexDocumentBundle
{
   /**
     * how long in seconds before a feed item expires
     */
    const OLD_ITEM_TIME = 4 * C\ONE_WEEK;
    /**
     * Reference to a DatasourceManager to communicate with the database to
     * get a list of search sources (news feeds) associated with this
     * feed bundle
     * @var DatasourceManager
     */
    public $db;
    /**
     * Array of  information about the search sources (news feeds) that
     * were used to collect news items stored in this bundle
     * @var array
     */
    public $feeds;
    /**
     * Used to store unique identifiers of feed items that have been stored
     * in this FeedArchiveBundle. This filter_a is used for checking if items
     * are already in the archive, when it has URL_FILTER_SIZE/2 items
     * filter_b is added to as well as filter_a. When filter_a is of size
     * URL_FILTER_SIZE filter_a is deleted, filter_b is renamed to filter_a
     * and the process is repeated.
     * @var BloomFilterFile
     */
    public $filter_a;
    /**
     * Auxiliary BloomFilterFile used in checking if feed items are in this
     * archive or not. @see $filter_a
     * @var BloomFilterFile
     */
    public $filter_b;
    /**
     * Makes or initializes an FeedArchiveBundle with the provided parameters
     *
     * @param string $dir_name folder name to store this bundle
     * @param bool $read_only_archive whether to open archive only for reading
     *  or reading and writing
     * @param string $description a text name/serialized info about this
     *      IndexDocumentBundle
     * @param int $num_docs_per_partition the number of pages to be stored
     *      in a single shard
     */
    public function __construct($dir_name, $db, $read_only_archive = true,
        $description = null, $num_docs_per_partition =
        C\NUM_DOCS_PER_PARTITION)
    {
        parent::__construct($dir_name, $read_only_archive, $description,
            $num_docs_per_partition);
        $this->feeds = []; // set by FeedsUpdateJob
        $this->db = $db;
        if (file_exists($dir_name . "/filter_a.ftr")) {
            $this->filter_a = BloomFilterFile::load($dir_name .
                "/filter_a.ftr");
            $this->filter_a->filename = $dir_name . "/filter_a.ftr";
        } else {
            $this->filter_a = new BloomFilterFile($dir_name . "/filter_a.ftr",
                C\URL_FILTER_SIZE);
            set_error_handler(null);
            @chmod($dir_name . "/filter_a.ftr", 0755);
            set_error_handler(C\NS_CONFIGS . "yioop_error_handler");
        }
        if (file_exists($dir_name . "/filter_b.ftr")) {
            $this->filter_a = BloomFilterFile::load($dir_name .
                "/filter_b.ftr");
            $this->filter_b->filename = $dir_name . "/filter_b.ftr";
        } else {
            $this->filter_b = null;
        }
    }
    /**
     * Adds pages of feed items to document bundle and adds their unique hashes
     * (guids)) to bloom filters so they are not reindexed
     *
     * @param array $pages array of feed items
     * @param int $visited_urls_count number of feed items
     * @return bool whether or not succeeded in adding pages
     */
    public function addPagesAndSeenKeys($pages, $visited_urls_count)
    {
        foreach ($pages as $page) {
            $key = $page[self::SUMMARY][self::HASH];
            $this->addFilters($key);
        }
        return parent::addPages($pages, $visited_urls_count);
    }
    /**
     * Adds the key (often GUID) of a feed item to the bloom filter pair
     * associated with this archive. This always adds to filter a, if
     * filter a is more than half full it adds to filter b. If filter a is full
     * it is deletedand filter b is renamed filter a and te process continues
     * where a new filter b is created when this becomee half full.
     * @param string $key unique identifier of a feed item
     */
    public function addFilters($key)
    {
        if ($this->filter_a->count > C\URL_FILTER_SIZE/2 &&
            !$this->filter_b) {
            if (file_exists($this->dir_name . "/filter_b.ftr")) {
                $this->filter_b = BloomFilterFile::load($dir_name .
                    "/filter_b.ftr");
            } else {
                $this->filter_b = new BloomFilterFile(
                    $this->dir_name . "/filter_b.ftr", C\URL_FILTER_SIZE);
                chmod($dir_name . "/filter_a.ftr", 0755);
            }
        }
        if ($this->filter_a->count > C\URL_FILTER_SIZE) {
            unlink($this->dir_name . "/filter_a.ftr");
            rename($this->dir_name . "/filter_b.ftr",
                $this->dir_name . "/filter_a.ftr");
        }
        $this->filter_a->add($key);
        if ($this->filter_b) {
            $this->filter_b->add($key);
        }
    }
    /**
     * Copies all feeds items newer than $age to a new shard, then deletes
     * old index shard and database entries older than $age. Finally sets copied
     * shard to be active. If this method is going to take max_execution_time/2
     * it returns false, so an additional job can be schedules; otherwise
     * it returns true
     *
     * @param int $partition bundle partition to build inverted index for
     * @param string $taking_too_long_touch name of file to touch if building
     *  inverted index takes too long (whether SCHEDULE_DIR/crawl_status.txt)
     *  has been recently modified is used in crawling to see if have run out
     *  of new data and the crawl can stopped.
     * @param bool $just_stats whether to just compute stats on the inverted
     *      or to actually  save the results
     * @return mixed whether job executed to completion (true or false) if
     *      !$just_stats, otherwise, an array with NUM_DOCS, NUM_LINKS,
     *      and TERM_STATISTICS (the latter having term frequency info)
     */
    public function buildInvertedIndexPartition($partition = -1,
        $taking_too_long_touch = null, $just_stats = false)
    {
        $age = self::OLD_ITEM_TIME;
        $pre_feeds = $this->feeds ?? [];
        if ($partition < 0) {
            $partition = $this->documents->parameters["SAVE_PARTITION"];
        }
        $dockeys_filename = $this->documents->getPartitionIndex($partition);
        if(!file_exists($dockeys_filename)) {
            crawlLog("----.. No new items to index returning.");
            return false;
        }
        crawlLog("----.. Loading guids of items to index.");
        $doc_index = $this->documents->loadPartitionIndex($partition, true);
        $doc_ids = array_keys($doc_index);
        $num_ids = count($doc_ids);
        crawlLog("----.. Loaded $num_ids items in partition.");
        $feeds = [];
        foreach ($pre_feeds as $pre_feed) {
            if (!isset($pre_feed['NAME'])) {
                continue;
            }
            $feeds[$pre_feed['NAME']] = $pre_feed;
        }
        $time = time();
        $num_sites = 0;
        $completed = true;
        crawlLog("----.. Creating inverted index of items.");
        $i = 0;
        $term_counts = [];
        $seen_url_count = 0;
        $base_folder = $this->getPartitionBaseFolder($partition);
        if (!file_exists($base_folder)) {
            mkdir($base_folder);
        }
        $doc_map_tools = $this->doc_map_tools;
        $postings_tools = $this->postings_tools;
        $last_entries_tools = $this->last_entries_tools;
        $doc_map_filename = $base_folder . "/" . self::DOC_MAP_FILENAME;
        $postings_filename = $base_folder . "/" . self::POSTINGS_FILENAME;
        $positions_filename = $base_folder . "/" . self::POSITIONS_FILENAME;
        $last_entries_filename =  $base_folder . "/" .
            self::LAST_ENTRIES_FILENAME;
        $this->doc_map = "";
        $this->doc_map_counter = 0;
        $this->postings = [];
        $this->positions = "";
        $this->last_entries = [];
        for ($i = 0; $i < $num_ids; $i++) {
            $doc_id = $doc_ids[$i];
            $item = $this->getSummary($doc_id, $partition);
            if (crawlTimeoutLog(
                "----..have added %s items to new index.", $i) &&
                $taking_too_long_touch) {
                if (file_exists($taking_too_long_touch)) {
                    touch($taking_too_long_touch, time());
                }
            }
            $i++;
            if (!isset($item[self::SOURCE_NAME])) {
                continue;
            }
            $source_name = $item[self::SOURCE_NAME];
            if (isset($feeds[$source_name])) {
                $lang = $feeds[$source_name]['LANGUAGE'];
                $media_category = $feeds[$source_name]['CATEGORY'];
                $source_stop_regex =
                    $feeds[$source_name]['TRENDING_STOP_REGEX'];
            } else {
                $lang = C\DEFAULT_LOCALE;
                $media_category = "news";
                $source_stop_regex = "";
            }
            /* r6t was chosen as short enough not to be
               changed by chargramming, but rare enough
               that can be used as a useful splitter
             */
            $phrase_string = $item[self::TITLE] . " r6t " .
                $item[self::DESCRIPTION];
            $word_and_qa_lists = PhraseParser::extractPhrasesInLists(
                $phrase_string, $lang);
            $meta_ids = $this->calculateMetas($lang, $item[self::PUBDATE],
                $source_name, $item[self::HASH], $media_category);
            $len = strlen($phrase_string);
            $word_list = $word_and_qa_lists["WORD_LIST"];
            if (PhraseParser::computeSafeSearchScore($phrase_string,
                $item[self::URL]) < PhraseParser::SAFE_PHRASE_THRESHOLD) {
                $meta_ids[] = "safe:true";
                $meta_ids[] = "safe:all";
            } else {
                $meta_ids[] = "safe:false";
                $meta_ids[] = "safe:all";
            }
            $title_length = 0;
            if (!empty($word_lists["r6t"][0])) {
                $title_length = $word_lists["r6t"][0] + 1;
                unset($word_lists["r6t"]);
            }
            $num_words = 0;
            foreach($word_list as $word => $position_list)
            {
                $num_words += count($position_list);
            }
            $this->addScoresDocMap($doc_id, $num_words,
                intval($item[self::PUBDATE]), 0, $title_length, $title_length,
                [], []);
            $this->addTermPostingLists(0, $num_words,
                $word_list, $meta_ids, $this->doc_map_counter);
            $this->doc_map_counter++;
            $this->updateTrendingTermCounts($term_counts, $phrase_string,
                $word_list, $media_category, $source_name, $lang,
                $item[self::PUBDATE], $source_stop_regex);
        }
        if ($just_stats) {
            $term_stats = [];
            foreach ($this->postings as $term => $postings) {
                $posting_records = $postings_tools->unpack($postings);
                $term_stats[$term] = count($posting_records);
            }
            $statistics = [
                "NUM_DOCS" => count($this->doc_map),
                "NUM_LINKS" => 0,
                "TERM_STATISTICS" => $term_stats
            ];
            return $statistics;
        }
        unset($term_counts['seen']);
        $this->addTermCountsTrendingTable($term_counts);
        $doc_map_tools->save($doc_map_filename, $this->doc_map);
        $postings_tools->save($postings_filename, $this->postings);
        $last_entries_tools->save($last_entries_filename, $this->last_entries);
        file_put_contents($positions_filename, $this->positions);
        set_error_handler(null);
        set_error_handler(C\NS_CONFIGS . "yioop_error_handler");
    }
    /**
     * Given a $site array of information about a web page/document. Use
     * CrawlConstant::URL and CrawlConstant::HASH fields to compute a
     * unique doc id for the array.
     *
     * @param array $site site to compute doc_id for
     * @return string doc_id
     */
    public static function computeDocId($site)
    {
        $raw_guid = unbase64Hash($site[self::HASH]);
        $doc_id = crawlHash($site[self::URL], true) .
            $raw_guid . "f" . substr(crawlHash(
            UrlParser::getHost($site[self::URL]) . "/", true), 1);
        return $doc_id;
    }
    /**
     * Used to calculate the meta words for RSS feed items
     *
     * @param string $lang the locale_tag of the feed item
     * @param int $pubdate UNIX timestamp publication date of item
     * @param string $source_name the name of the feed
     * @param string $guid the guid of the item
     * @param string $media_category determines what media: metas to inject.
     *      Default is news.
     *
     * @return array $meta_ids meta words found
     */
    public function calculateMetas($lang, $pubdate, $source_name, $guid,
        $media_category = "news")
    {
        $meta_ids = ["media:all", "media:$media_category",
            "media:$media_category:" . urlencode( mb_strtolower($source_name)),
            "guid:" . strtolower($guid)];
        $meta_ids[] = 'date:all';
        $meta_ids[] = 'date:' . date('Y', $pubdate);
        $meta_ids[] = 'date:' . date('Y-m', $pubdate);
        $meta_ids[] = 'date:' . date('Y-m-d', $pubdate);
        $meta_ids[] = 'date:' . date('Y-m-d-H', $pubdate);
        $meta_ids[] = 'date:' . date('Y-m-d-H-i', $pubdate);
        $meta_ids[] = 'date:' . date('Y-m-d-H-i-s', $pubdate);
        $meta_ids[] = 'lang:all';
        if ($lang != "") {
            $lang_parts = explode("-", $lang);
            $meta_ids[] = 'lang:' . $lang_parts[0];
            if (isset($lang_parts[1])) {
                $meta_ids[] = 'lang:' . $lang;
            }
        }
        return $meta_ids;
    }
    /**
     * Whether the active filter for this feed contain thee feed item
     * of thee supplied key
     * @param string $key the feed item id to check if in archive
     * @return bool true if it is in the archive, false otherwise
     */
    public function contains($key)
    {
        return $this->filter_a->contains($key);
    }
    /**
     * Forces the current shard to be saved
     */
    public function forceSave()
    {
        $this->buildInvertedIndexPartition();
        $this->filter_a->save();
        chmod($this->dir_name . "/filter_a.ftr", 0777);
        if ($this->filter_b) {
            $this->filter_b->save();
            chmod($this->dir_name . "/filter_b.ftr", 0777);
        }
    }
    /**
     * Updates trending term counts based on the string from the current
     * feed item.
     *
     * @param array &$term_counts lang => [term => occurrences]
     * @param string $source_phrase original non-stemmed phrase from feed
     *      item to adjust $term_counts with. Used to remember non-stemmed
     *      terms. We assume we have already extracted position lists from
     * @param array $word_or_phrase_list associate array of
     *      stemmed_word_or_phrase => positions in feed item of where occurs
     * @param string $media_category of feed source the item case from. We
     *      trending counts grouped by media category
     * @param string $source_name of feed source the item case from. We exclude
     *      from counts the name of the feed source
     * @param string $lang locale_tag for this feed item
     * @param int $pubdate timestamp when string was published (used in
     *      weighting)
     * @param string $source_stop_regex a regex to remove terms which occur
     *      frequently for this particular source
     */
    public function updateTrendingTermCounts(&$term_counts, $source_phrase,
        $word_or_phrase_list, $media_category, $source_name, $lang, $pubdate,
        $source_stop_regex = "")
    {
        $time = time();
        if ($time - $pubdate > C\ONE_DAY) {
            return;
        }
        $time_weight = max(ceil(($time - $pubdate)/C\ONE_HOUR), 1);
        $tokenizer = PhraseParser::getTokenizer($lang);
        $stop_words = (empty($tokenizer::$stop_words)) ? [] :
            $tokenizer::$stop_words;
        $stop_words[] = "tztzlzngth";
        $composites = [];
        foreach ($word_or_phrase_list as $term => $positions) {
            if (strpos($term, "-") !== false) {
                $sub_terms = explode("-", $term);
                $composites += $sub_terms;
            }
        }
        foreach($word_or_phrase_list as $term => $positions) {
            if ((!empty($source_stop_regex) &&
                preg_match($source_stop_regex, $term)) ||
                !is_string($term) || $term == "") {
                continue;
            }
            $occurrences = count($positions);
            $weight = substr_count($term, "-");
            if (!empty($term_counts['seen'][$term])) {
                $term = $term_counts['seen'][$term];
                if (empty($term_counts[$lang][$media_category][$term])) {
                    if (empty($term_counts[$lang])) {
                        $term_counts[$lang] = [];
                    }
                    if (empty($term_counts[$lang][$media_category])) {
                        $term_counts[$lang][$media_category] = [];
                    }
                    $term_counts[$lang][$media_category][$term] = 0;
                }
                $term_counts[$lang][$media_category][$term] +=
                    ($weight * $occurrences) / $time_weight;
            } else if (strpos($term, " ") === false &&
                !in_array($term, $composites) &&
                !in_array($term[0], ["#", "_", "-"] ) &&
                !in_array($term, $stop_words) &&
                substr($term, 0, 4) != "http") {
                $original_term = $term;
                if (preg_match("/\b$term".'[\w|\-]*\b/ui', $source_phrase,
                    $match)) {
                    $term = $match[0];
                    if (!empty($source_stop_regex) &&
                        preg_match($source_stop_regex, $term)) {
                        continue;
                    }
                } else {
                    $term = str_replace("-", " ", $term);
                    if( mb_stristr($source_name, $term) !== false) {
                        continue;
                    }
                    if (($pre_term = substr($term, -6)) == "\_pos\_s") {
                        $term = $pre_term . "\s*(\'|\&apos\;)\s+";
                    }
                    if (preg_match("/\b$term".'[\w|\-]*\b/ui', $source_phrase,
                        $match)) {
                        $term = $match[0];
                    } else {
                        $trunc_term = mb_substr($term, 0, -1);
                        $trunc_original = mb_substr($original_term, 0, -1);
                        if (preg_match("/\b$trunc_term".'[\w|\-]*\b/ui',
                            $source_phrase, $match)) {
                            $term = $match[0];
                        } else if (preg_match(
                            "/\b$trunc_original".'[\w|\-]*\b/ui',
                            $source_phrase, $match)) {
                            $term = $match[0];
                        } else {
                            continue;
                        }
                    }
                    if (!empty($source_stop_regex) &&
                        preg_match($source_stop_regex, $term)) {
                        continue;
                    }
                }
                $lower_term = mb_strtolower($term);
                $lower_parts = preg_split("/\s+|\-/ui", $lower_term);
                $uncommon = false;
                foreach ($lower_parts as $lower_part) {
                    if (!empty($lower_part) &&
                        !in_array($lower_part, $stop_words)) {
                        $uncommon = true;
                        break;
                    }
                }
                if ($uncommon) {
                    $term = preg_replace("/\-/", " ", $lower_term);
                    $term = mb_convert_case($term, MB_CASE_TITLE, 'UTF-8');
                    if (empty($term_counts[$lang][$media_category][$term])) {
                        if (empty($term_counts[$lang])) {
                            $term_counts[$lang] = [];
                        }
                        if (empty($term_counts[$lang][$media_category])) {
                            $term_counts[$lang][$media_category] = [];
                        }
                        $term_counts[$lang][$media_category][$term] = 0;
                    }
                    $term_counts['seen'][$original_term] = $term;
                    if (!empty($term_counts['seen'][$lower_term])) {
                        $term = $term_counts['seen'][$lower_term];
                        if (empty($term_counts[$lang][$media_category][$term])){
                            $term_counts[$lang][$media_category][$term] = 0;
                        }
                    }
                    $term_counts['seen'][$lower_term] = $term;
                    $term_counts[$lang][$media_category][$term] +=
                        ($weight * $occurrences) / $time_weight;
                }
            }
        }
    }
    /**
     * Updates TRENDING_TERM, hourly, daily, and weekly top term occurrences.
     * Removes entries older than a week
     *
     * @param array $term_counts for the most recent update of the
     *      feed index, it should be an array [$lang => [$term => $occurrences]]
     *      for the top NUM_TRENDING terms per language
     */
    public function addTermCountsTrendingTable($term_counts)
    {
        $db = $this->db;
        $time = time();
        $update_intervals = [
            C\ONE_HOUR => C\ONE_DAY,
            C\ONE_DAY => C\ONE_WEEK,
            C\ONE_WEEK => C\ONE_MONTH,
            C\ONE_MONTH => C\ONE_YEAR
        ];
        $reset_interval_sql = "DELETE FROM TRENDING_TERM WHERE " .
            "UPDATE_PERIOD = ? AND LANGUAGE = ? AND ".
            "CATEGORY = ? AND TIMESTAMP >= ? ";
        $interval_sql = "SELECT TERM, SUM(OCCURRENCES) AS OCCURRENCES ".
            "FROM TRENDING_TERM WHERE UPDATE_PERIOD = ? AND " .
            "TIMESTAMP >= ? AND LANGUAGE = ? AND CATEGORY = ? GROUP BY TERM ".
            "ORDER BY OCCURRENCES DESC ".
            $db->limitOffset(C\NUM_TRENDING);
        $insert_sql = "INSERT INTO TRENDING_TERM (TERM, OCCURRENCES, " .
            "UPDATE_PERIOD, TIMESTAMP, LANGUAGE, CATEGORY) VALUES ".
            "(?, ?, ?, ?, ?, ?)";
        $cull_sql = "DELETE FROM TRENDING_TERM WHERE UPDATE_PERIOD = ? " .
            " AND TIMESTAMP < ?";
        $have_culled = [];
        foreach ($term_counts as $lang => $category_term_occurrences) {
            crawlLog("Updating Language $lang trending terms");
            foreach ($category_term_occurrences as $category =>
                $term_occurrences) {
                /* Sort the word occurrence list by occurrence
                   and insert into database.
                 */
                $num_inserted = 0;
                arsort($term_occurrences);
                foreach ($term_occurrences as $term => $occurrences) {
                    $db->execute($insert_sql, [$term, $occurrences, C\ONE_HOUR,
                        $time, $lang, $category]);
                    $num_inserted++;
                    if ($num_inserted >= C\NUM_TRENDING) {
                        break;
                    }
                }
                foreach ($update_intervals as $sub_interval => $interval) {
                    $interval_start = $time - $interval;
                    $recent_timestamp = floor($interval_start/$interval) *
                        $interval;
                    $db->execute($reset_interval_sql,
                        [$interval, $lang, $category, $recent_timestamp]);
                    $result = $db->execute($interval_sql,
                        [$sub_interval, $recent_timestamp, $lang, $category]);
                    while ($interval_info = $db->fetchArray($result)) {
                        $db->execute($insert_sql, [$interval_info['TERM'],
                            $interval_info['OCCURRENCES'], $interval,
                            $recent_timestamp, $lang, $category]);
                    }
                    if (empty($have_culled[$sub_interval])) {
                        $db->execute($cull_sql, [$sub_interval,
                            $recent_timestamp]);
                        $have_culled[$sub_interval] = true;
                    }
                }
            }
        }
    }
}
ViewGit