Last commit for src/library/MessagesBundle.php: 2addb500315b7393a90fe66431d7832b1e7386c7

Adjust copyrights years

Chris Pollett [2024-01-03 21:Jan:rd]
Adjust copyrights years
<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2024  Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Chris Pollett chris@pollett.org
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2024
 * @filesource
 */
namespace seekquarry\yioop\library;

use seekquarry\yioop\configs as C;

/**  For remoteAddress and crawlHash function and Yioop constants */
require_once __DIR__ . "/../library/Utility.php";
/**
 * Encapsulates the temporary storage of messages sent between QueueServers
 * and Fetchers during the course of a Crawl.
 *
 * On disk a MessagesBundle is a folder containing:
 *  - a send subfolder of fetch batch files named
 *    sequence_number-schedule_time-tries.txt,
 *  - a receive subfolder of downloaded result files named
 *    sequence_number.txt,
 *  - message-type subfolders (for example, etag_expires, index_data, robots,
 *    schedules), each split into per-day subfolders of timestamped files.
 *
 * @author Chris Pollett
 */
class MessagesBundle implements CrawlConstants
{
    /**
     * Reference to a database object. Used since it has directory
     * manipulation functions
     * @var object
     */
    public $db;
    /**
     * The folder name of this MessagesBundle
     * @var string
     */
    public $dir_name;
    /**
     * Largest sequence number currently in use for sliding window protocol
     * messages used for fetch batches of urls and the downloaded web page
     * responses for those urls
     * @var int
     */
    public $max_sequence_number;
    /**
     * Contains sequence_number => filename associations about which fetch
     * batches have been successfully downloaded, received and should be
     * moved for further processing. These will be moved once all earlier
     * sequence numbers have been similarly received.
     * @var array
     */
    public $receive;
    /**
     * Contains sequence_number => [filename, schedule_time, tries] associations
     * for fetch batches of urls to crawl which have been produced. Here
     * filename is the name of the file that has the data for the batch,
     * schedule_time is the last time the batch was scheduled to a Fetcher,
     * and tries is the number of times it has been scheduled.
     * @var array
     */
    public $send;
    /**
     * Used in a crawl to set the maximum number of schedules generated at a
     * time (if this many schedules are created already, queue server pauses
     * on schedule creation until one or more of them have been successfully
     * downloaded)
     * In the QueueServer the window size is set to be the total number of
     * currently active fetchers
     * @var int
     */
    public $window_size;
    /**
     * String prefix to be used before filenames of message files in the
     * ETAG_EXPIRES_FOLDER, INDEX_DATA_FOLDER, ROBOTS_FOLDER, and
     * SCHEDULES_FOLDER folders.
     */
    const MESSAGE_PREFIX = "At";
    /**
     * Folder name for the folder within the MessageBundle folder used to
     * contain files with etag and expires header information waiting to be
     * processed by the scheduler
     */
    const ETAG_EXPIRES_FOLDER = "etag_expires";
    /**
     * Folder name for the folder within the MessageBundle folder used to
     * contain files with web page summaries and downloaded page information
     * waiting to be processed by the indexer (i.e., to make an inverted index)
     */
    const INDEX_DATA_FOLDER = "index_data";
    /**
     * Folder name for the folder within the MessageBundle folder used to
     * contain files with robots.txt information waiting to be
     * processed by the scheduler
     */
    const ROBOTS_FOLDER = "robots";
    /**
     * Folder name for the folder within the MessageBundle folder used to
     * contain files with to crawl url information waiting to be
     * processed by the scheduler
     */
    const SCHEDULES_FOLDER = "schedules";
    /**
     * Makes a MessagesBundle with the provided parameters. This method
     * does not initialize send and receive properties related to
     * the fetch batch window (see initWindows()).
     *
     * @param string $dir_name folder name used by this MessagesBundle; it is
     *  created (world-writable) if it does not already exist
     * @param int $window_size number of outstanding url fetch batch
     *  files to allow before no longer accepting new ones
     */
    public function __construct($dir_name, $window_size = 1)
    {
        $this->dir_name = $dir_name;
        $this->makeDirectory($dir_name);
        $this->window_size = $window_size;
        $db_class = C\NS_DATASOURCES . ucfirst(C\DBMS) . "Manager";
        $this->db = new $db_class();
    }
    /**
     * Used to initialize the send and receive property arrays for the
     * send and receive fetch batch window by scanning the send and receive
     * subfolders (creating them if needed). Files whose names do not have the
     * expected number of dash-separated parts are ignored.
     *
     * @param bool $reset_schedule_times whether to reset the schedule
     *  times of each batch currently in the send folder, but not acknowledged,
     *  to 0, forcing them to be rescheduled.
     */
    public function initWindows($reset_schedule_times = false)
    {
        $max_sequence_number = 0;
        $len_suffix = strlen(".txt");
        foreach (['receive', 'send'] as $window_name) {
            $dir = $this->dir_name . "/" . $window_name;
            $this->makeDirectory($dir);
            // receive files: seq.txt ; send files: seq-schedule_time-tries.txt
            $expected_num_parts = ($window_name == 'receive') ? 1 : 3;
            $len_prefix = strlen("$dir/");
            $window_data = [];
            foreach (glob("$dir/*.txt") ?: [] as $file) {
                $base_name = substr($file, $len_prefix, -$len_suffix);
                $base_parts = explode("-", $base_name);
                if (count($base_parts) != $expected_num_parts) {
                    continue;
                }
                $sequence_number = intval($base_parts[0]);
                if ($sequence_number > $max_sequence_number) {
                    $max_sequence_number = $sequence_number;
                }
                if ($window_name == 'send') {
                    list(, $schedule_time, $tries) = $base_parts;
                    $item = ['file' => $file,
                        'schedule_time' => intval($schedule_time),
                        'tries' => intval($tries)];
                    if ($reset_schedule_times) {
                        /* Keep the in-memory schedule time in sync with the
                           renamed file, so the batch really is treated as
                           due for rescheduling.
                         */
                        $item['schedule_time'] = 0;
                        $item['file'] = $this->makeSendWindowFilename(
                            $sequence_number, 0, $tries);
                        rename($file, $item['file']);
                    }
                } else {
                    $item = $file;
                }
                $window_data[$sequence_number] = $item;
            }
            $this->$window_name = $window_data;
        }
        $this->max_sequence_number = $max_sequence_number;
    }
    /**
     * Returns whether the send folder (containing fetch batches which have
     * been produced but not yet downloaded) is full. Batches already
     * received but not yet extracted also count against the window size.
     * @return bool true if it cannot hold another fetch batch, false otherwise
     */
    public function isSendFull()
    {
        return (count($this->receive) + count($this->send) >=
            $this->window_size);
    }
    /**
     * Returns the data from the least sequence numbered file in the send
     * window that either has not previously been returned (tries 0) or
     * that has been previously returned but more than C\SCHEDULE_TIMEOUT
     * seconds earlier. In both cases the try count of the file is bumped and
     * the file is renamed to record the new try count (and, on a timeout,
     * the new schedule time). If a timed-out file is found in the search
     * process with a try count of at least C\MAX_RESCHEDULE_ATTEMPTS, it is
     * instead moved to the receive window with no data received.
     *
     * @return string|bool file contents of found file, false if none found
     */
    public function extractSendWindow()
    {
        ksort($this->send);
        $data = false;
        $time = time();
        foreach ($this->send as $sequence_number => $item) {
            $tries = $item['tries'];
            $timeout = ($item['schedule_time'] + C\SCHEDULE_TIMEOUT) < $time;
            if ($tries == 0 || ($timeout &&
                $tries < C\MAX_RESCHEDULE_ATTEMPTS)) {
                $data = file_get_contents($item['file']);
                $tries++;
                $schedule_time = ($timeout) ? $time : $item['schedule_time'];
                $filename = $this->makeSendWindowFilename(
                    $sequence_number, $schedule_time, $tries);
                clearstatcache();
                if (file_exists($item['file'])) {
                    rename($item['file'], $filename);
                }
                /* Record the new schedule time in memory too; otherwise a
                   rescheduled batch still looks timed out on the next call
                   and is immediately handed out again.
                 */
                $this->send[$sequence_number] = ['file' => $filename,
                    'schedule_time' => $schedule_time, 'tries' => $tries];
                break;
            } else if ($timeout) {
                // too many attempts: give up on the batch with empty data
                $this->updateReceiveWindow($sequence_number, "");
            }
        }
        return $data;
    }
    /**
     * Add a fetch batch to the send folder of the messages bundle if there
     * is space in the folder given the current window size
     * @param string $tmp_send_filename filename of the file with the
     *  fetch batch. If this operation succeeds, the file at this location
     *  will be moved to the send folder
     * @return bool whether operation was successful
     */
    public function insertSendWindow($tmp_send_filename)
    {
        if ($this->isSendFull()) {
            return false;
        }
        $schedule_time = time();
        $tries = 0;
        $this->max_sequence_number++;
        $sequence_number = $this->max_sequence_number;
        $filename = $this->makeSendWindowFilename($sequence_number,
            $schedule_time, $tries);
        rename($tmp_send_filename, $filename);
        $this->quietChmod($filename);
        $this->send[$sequence_number] = ['file' => $filename,
            'schedule_time' => $schedule_time, 'tries' => $tries];
        return true;
    }
    /**
     * Used to output the filename of a file storing a fetch batch of urls in
     * the send folder
     * @param int $sequence_number sequence number used for sliding window
     *  algorithm
     * @param int $schedule_time time at which the given file was last scheduled
     *  for download
     * @param int $tries number of times a fetcher has been scheduled to try
     *  to download the given list of urls
     * @return string the filename to use for the fetch batch with the given
     *  parameters
     */
    public function makeSendWindowFilename($sequence_number, $schedule_time,
        $tries)
    {
        return $this->dir_name .
            "/send/{$sequence_number}-$schedule_time-$tries.txt";
    }
    /**
     * Used to output the filename of a file returned from a fetcher containing
     * downloaded pages for a set of urls
     *
     * @param int $sequence_number sequence number used for sliding window
     *  algorithm
     * @return string the filename to use for the received data with the given
     *  sequence number
     */
    public function makeReceiveWindowFilename($sequence_number)
    {
        return $this->dir_name . "/receive/{$sequence_number}.txt";
    }
    /**
     * Adds a new messages file to the receive subfolder of the
     * MessagesBundle with the given sequence number and containing the
     * passed data, then removes the corresponding batch from the send
     * window. Does nothing if no batch with that sequence number is in the
     * send window.
     *
     * @param int $sequence_number sequence number to use in making the
     *  filename of the new file to add to the receive folder
     * @param string $data message contents of the file to add
     */
    public function updateReceiveWindow($sequence_number, $data)
    {
        if (empty($this->send[$sequence_number])) {
            return;
        }
        $target_name = $this->makeReceiveWindowFilename($sequence_number);
        file_put_contents($target_name, $data);
        $this->quietChmod($target_name);
        $this->receive[$sequence_number] = $target_name;
        unlink($this->send[$sequence_number]['file']);
        unset($this->send[$sequence_number]);
    }
    /**
     * Searches for the file of least sequence number in the receive
     * subfolder of this MessagesBundle. If there is no file in the send
     * subfolder of the MessagesBundle with a smaller sequence number, then the
     * file is read, deleted and its contents returned. Otherwise, this
     * function returns false.
     *
     * @return string|bool data of found file or false
     */
    public function extractReceiveWindow()
    {
        ksort($this->send);
        if (empty($this->receive)) {
            return false;
        }
        ksort($this->receive);
        $first_send_number = array_key_first($this->send);
        $first_receive_number = array_key_first($this->receive);
        // preserve ordering: wait until all earlier batches have come back
        if ($first_send_number !== null &&
            $first_send_number < $first_receive_number) {
            return false;
        }
        $filename = $this->receive[$first_receive_number];
        unset($this->receive[$first_receive_number]);
        clearstatcache();
        if (empty($filename) || !file_exists($filename)) {
            return false;
        }
        $data = file_get_contents($filename);
        set_error_handler(null);
        @unlink($filename);
        restore_error_handler();
        return $data;
    }
    /**
     * Adds a file with contents $data_string and with a name containing the
     * remote address, $time and a hash of the data to a subfolder for the day
     * of $time in the $messages_type subfolder of the MessagesBundle
     *
     * @param string $messages_type the kind of messages being saved
     * @param string &$data_string encoded, compressed, serialized data the
     *     schedule is to contain
     * @param int $time timestamp to use for the message; 0 means now
     */
    public function addMessages($messages_type, &$data_string, $time = 0)
    {
        $type_dir = $this->dir_name . "/$messages_type";
        $address = strtr(remoteAddress(), ["." => "-", ":" => "_"]);
        $time = ($time == 0) ? time() : $time;
        $day = floor($time / C\ONE_DAY);
        $this->makeDirectory($type_dir);
        $day_dir = "$type_dir/$day";
        $this->makeDirectory($day_dir);
        $data_hash = crawlHash($data_string);
        file_put_contents($day_dir . "/" . self::MESSAGE_PREFIX . $time .
            "From" . $address . "WithHash$data_hash.txt", $data_string);
    }
    /**
     * Get the next messages file by timestamp stored in the MessagesBundle
     * subfolder of the given type (etag_expires, robots, schedules,
     * index_data) and either return it as a string or as a deserialized array
     * or object. This subfolder is typically organized into days subsubfolders
     * with the appropriately timestamped files in a given days folder.
     * After reading its data, but before returning it from
     * this method the file itself is deleted. Day folders that are passed
     * over because they contain no matching file are deleted.
     *
     * @param string $type kind of data to look for messages about. This
     *  kind should correspond to a folder in the messages bundle. Typically,
     *  these folders will be (etag_expires, robots, schedules, index_data)
     * @param bool $unpack whether to unpack (webdecode/gzuncompress/
     *  unserialize) the first found file and return its data, or just return
     *  its string contents after webdecoding them.
     * @param string $message_prefix prefix string of filenames in
     *  given folder (etag_expires, robots, schedules, index_data) that have
     *  messages data
     * @return (string|array|object)? message data, or null if none found
     */
    public function nextMessages($type, $unpack = true, $message_prefix =
        self::MESSAGE_PREFIX)
    {
        $dirs = glob("{$this->dir_name}/$type/*", GLOB_ONLYDIR) ?: [];
        $len_prefix = strlen($message_prefix);
        $old_dir = null;
        foreach ($dirs as $dir) {
            $files = glob($dir . '/*.txt') ?: [];
            if ($old_dir !== null) {
                crawlLog("Deleting $old_dir\n");
                $this->db->unlinkRecursive($old_dir);
                /* The idea is that we only go through the outer loop more
                   than once if the earlier data directory was empty.
                   Note: older directories should only have data dirs or
                   deleting like this might cause problems!
                 */
            }
            // return data from the lex-first file beginning with the prefix
            foreach ($files as $file) {
                $base_name = basename($file);
                if (strncmp($base_name, $message_prefix, $len_prefix) == 0) {
                    if ($unpack) {
                        $message_data = $this->unpackMessages($file);
                    } else {
                        $message_data = webdecode(file_get_contents($file));
                    }
                    unlink($file);
                    return $message_data;
                }
            }
            $old_dir = $dir;
        }
        return null;
    }
    /**
     * Returns whether there are any as yet to be produced to crawl schedules
     * for the crawl queue in the given MessagesBundle's schedules folder.
     * @param string $messages_bundle_dir folder to check for schedules
     * @return bool whether any found (true, if yes)
     */
    public static function isResumable($messages_bundle_dir)
    {
        $schedules_dir_name = "$messages_bundle_dir/" . self::SCHEDULES_FOLDER;
        if (!is_dir($schedules_dir_name)) {
            return false;
        }
        $schedules_dir = opendir($schedules_dir_name);
        if ($schedules_dir === false) {
            return false;
        }
        $len_prefix = strlen(self::MESSAGE_PREFIX);
        $found = false;
        // loop (rather than return early) so both handles are always closed
        while (!$found && ($name = readdir($schedules_dir)) !== false) {
            $sub_path = "$schedules_dir_name/$name";
            if ($name == '.' || $name == '..' || !is_dir($sub_path)) {
                continue;
            }
            $sub_dir = opendir($sub_path);
            if ($sub_dir === false) {
                continue;
            }
            while (($sub_name = readdir($sub_dir)) !== false) {
                if (strncmp($sub_name, self::MESSAGE_PREFIX,
                    $len_prefix) == 0) {
                    $found = true;
                    break;
                }
            }
            closedir($sub_dir);
        }
        closedir($schedules_dir);
        return $found;
    }
    /**
     * Decodes the web-string safety encoding and then gunzips and deserializes
     * the result for a messages file used for communication between a Fetcher
     * and the QueueServer
     * @param string $file name of the messages file to read
     * @return array|object|bool unserialized data array or object contained in
     *  the messages file to be processed, false if it could not be decoded
     */
    public function unpackMessages($file)
    {
        crawlLog("Processing File: $file");
        $decode = webdecode(file_get_contents($file));
        set_error_handler(null);
        $decode = @gzuncompress($decode);
        /* NOTE(review): this data originates from Fetchers; unserialize() on
           data that is not trusted allows PHP object injection. Consider
           ['allowed_classes' => false] if only arrays are ever sent.
         */
        $sites = @unserialize($decode);
        restore_error_handler();
        return $sites;
    }
    /**
     * Creates $dir, made world-writable, if it does not already exist
     * @param string $dir folder to create
     */
    private function makeDirectory($dir)
    {
        if (file_exists($dir)) {
            return;
        }
        mkdir($dir);
        $this->quietChmod($dir);
    }
    /**
     * Makes $path world-writable, suppressing any warning (for example,
     * when the process does not own the file). Uses restore_error_handler()
     * rather than re-installing a handler so that PHP's error handler stack
     * does not grow by two entries on every call.
     * @param string $path file or folder to chmod
     * @return bool whether chmod succeeded
     */
    private function quietChmod($path)
    {
        set_error_handler(null);
        $result = @chmod($path, 0777);
        restore_error_handler();
        return $result;
    }
}
ViewGit