Last commit for src/library/PartitionDocumentBundle.php: afd6930f42e31d81a53d42061b5fd758f56c62de

First pass at modifying Yioop to again use Logarithmic Merge Trees for its dictionary structures

Chris Pollett [2024-01-15 02:Jan:th]
First pass at modifying Yioop to again use Logarithmic Merge Trees for its dictionary structures
Folder structure of IndexDocumentBundles also modified and now supports overflow folder (which
could be on a different hard drive). ArcTool has been updated to support migration to new
indexes
<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2023  Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Chris Pollett chris@pollett.org
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2023
 * @filesource
 */
namespace seekquarry\yioop\library;

use seekquarry\yioop\configs as C;

/**
 * Loads crawlLog functions if needed
 */
require_once __DIR__ . "/Utility.php";
/**
 * A partition document bundle is a collection of partitions, each of which
 * in turn can hold a concatenated sequence of compressed documents, and which
 * are managed together. It is a successor format to the earlier
 * WebArchiveBundle of Yioop. The partition document bundle stores individual
 * records using a record format defined via the PackedTableTools class.
 * This basic format has been extended by two new types BLOB and SERIAL (a
 * PHP serialized object represented as a blob).
 * Data for columns of these types are stored in separate archive files from
 * the rest of the record. Offsets into these archive files for blobs and
 * serials are stored in a record as columns representing a difference list
 * of ints together with a LAST_BLOB_LEN column. Using this info, the blobs
 * and serials associated with a record can be retrieved. How many documents
 * are collected together into a partition can be tuned for read, write, and
 * in-memory efficiency.
 *
 * @author Chris Pollett
 */
class PartitionDocumentBundle
{
    /**
     * Compression strategy used to compress blob and serial columns
     */
    const DEFAULT_COMPRESSOR = C\NS_COMPRESSORS . "NonCompressor";
    /**
     * Default parameters to use when constructing a PartitionDocumentBundle
     */
    const DEFAULT_PARAMETERS = ["RECORD_COMPRESSOR" => self::DEFAULT_COMPRESSOR,
        "BLOB_COMPRESSOR" => self::DEFAULT_COMPRESSOR,
        "COUNT" => 0, "PARTITION_SIZE_THRESHOLD" =>
        self::PARTITION_SIZE_THRESHOLD,
        "FORMAT" => ["PRIMARY KEY" => "KEY", "VALUE" => "BLOB"],
        "MAX_ITEMS_PER_FILE" => self::MAX_ITEMS_PER_FILE,
        "SAVE_PARTITION" => 0, "ACTIVE_COUNT" => 0
    ];
    /**
     * Extension for PartitionDocumentBundle partition files used to contain
     * records
     */
    const INDEX_EXTENSION = ".ix";
    /**
     * Default maximum number of records to store in a partition
     */
    const MAX_ITEMS_PER_FILE = 16384;
    /**
     * File name of file used to store the parameters of this
     * PartitionDocumentBundle
     */
    const PARAMETERS_FILE = "pdb_parameters.txt";
    /**
     * Prefix to file names of PartitionDocumentBundle partition files
     */
    const PARTITION_PREFIX = "partition_";
    /**
     * Maximum number of bytes a partition can have before the next partition
     * is started. Notice this implies a maximum file size to store
     * in BLOB columns
     */
    const PARTITION_SIZE_THRESHOLD = 2147483648;
    /**
     * Upper bound on the number of partition indexes kept in index_cache
     */
    const MAX_INDEX_CACHE_SIZE = 50;
    /**
     * Used to store the file handle to, the partition file name, and the
     * instance time for the last time an item's blob/serial columns were
     * added to the PartitionDocumentBundle
     * @var array
     */
    public $add_archive_cache = [null, "", -1];
    /**
     * Used to store the file handle to, the partition file name, and the
     * instance time for the last time an item's blob/serial columns were
     * accessed for the PartitionDocumentBundle
     * @var array
     */
    public $get_archive_cache = [null, "", -1];
    /**
     * Array of column names for the columns in a PartitionDocumentBundle which
     * are of type BLOB or SERIAL
     * @var array
     */
    public $blob_columns;
    /**
     * The seekquarry\yioop\library\compressors\Compressor object used to
     * compress record files.
     * @var object
     */
    public $record_compressor;
    /**
     * The seekquarry\yioop\library\compressors\Compressor object used to
     * compress blob columns.
     * @var object
     */
    public $blob_compressor;
    /**
     * Folder path where the PartitionDocumentBundle is stored
     * @var string
     */
    public $folder;
    /**
     * In memory cache of partition indexes from the PartitionDocumentBundle.
     * Maps partition number => [loaded index, microtime of last use]
     * @var array
     */
    public $index_cache = [];
    /**
     * Maximum number of items the partition cache is allowed to hold
     * @var int
     */
    public $index_cache_size;
    /**
     * Used to keep track of when this instance was created, as part of managing
     * file handles expiration (could be set/updated externally to reflect
     * some other instance using the bundle)
     * @var int
     */
    public $instance_time;
    /**
     * Name of primary key column for records
     * @var string
     */
    public $key_field;
    /**
     * Stores the constructor parameters used to create this
     * PartitionDocumentBundle
     * @var array
     */
    public $parameters;
    /**
     * In-memory (PackedTableTools string form) index of the current save
     * partition. Built up by put() and written to disk by put() and
     * advanceSavePartition(). Null until put() is first called.
     * @var mixed
     */
    public $save_index;
    /**
     * Array of column names for the columns in a PartitionDocumentBundle which
     * are of type SERIAL
     * @var array
     */
    public $serial_columns;
    /**
     * The PackedTableTools object used to pack and unpack records in
     * partitions
     * @var object
     */
    public $table_tools;
    /**
     * Used to create a new instance of a PartitionDocumentBundle
     *
     * @param string $folder the path to the folder to store this
     *  PartitionDocumentBundle
     * @param array $format the column names, keys and types for this
     *  PartitionDocumentBundle object (ignored if the folder already has
     *  stored parameters with a FORMAT)
     * @param int $max_items_per_file maximum number of items to store
     *  in a partition before making the next partition
     * @param int $partition_size_threshold maximum length of a partition
     *  file in bytes before a new partition file should be started
     * @param string $record_compressor_type class name of the
     *  seekquarry\yioop\library\compressors\Compressor used to
     *  compress record files excluding blob columns.
     * @param string $blob_compressor_type class name of the
     *  seekquarry\yioop\library\compressors\Compressor used to
     *  compress blob columns.
     */
    public function __construct($folder, $format =
        self::DEFAULT_PARAMETERS["FORMAT"],
        $max_items_per_file = self::MAX_ITEMS_PER_FILE,
        $partition_size_threshold = self::PARTITION_SIZE_THRESHOLD,
        $record_compressor_type = self::DEFAULT_COMPRESSOR,
        $blob_compressor_type = self::DEFAULT_COMPRESSOR)
    {
        $initial_parameters = self::DEFAULT_PARAMETERS;
        $initial_parameters["PARTITION_SIZE_THRESHOLD"] =
            $partition_size_threshold;
        $initial_parameters["MAX_ITEMS_PER_FILE"] = $max_items_per_file;
        $initial_parameters["RECORD_COMPRESSOR"] = $record_compressor_type;
        $initial_parameters["BLOB_COMPRESSOR"] = $blob_compressor_type;
        /* NOTE(review): compressors come from the constructor arguments, not
           from any RECORD_COMPRESSOR/BLOB_COMPRESSOR stored in the parameters
           file; callers reopening a bundle must pass the matching types. */
        $this->record_compressor = new $record_compressor_type();
        $this->blob_compressor = new $blob_compressor_type();
        $initial_parameters["FORMAT"] = $format;
        $this->instance_time = hrtime(true);
        $memory_limit = metricToInt(ini_get('memory_limit'));
        /* memory_limit of -1 means unlimited; without this check the cache
           size would be negative and every partition load would evict */
        $this->index_cache_size = ($memory_limit <= 0) ?
            self::MAX_INDEX_CACHE_SIZE :
            (int)min(self::MAX_INDEX_CACHE_SIZE,
            floor($memory_limit / 128000000));
        $this->folder = $folder;
        $folder_paths = [$folder];
        $changed_parameters = false;
        foreach ($folder_paths as $folder_path) {
            if (!file_exists($folder_path)) {
                $changed_parameters = true;
                if (!mkdir($folder_path)) {
                    return null;
                }
            }
        }
        $this->parameters = self::getParameterInfo($folder);
        // fill in any parameter not already stored on disk
        foreach (self::DEFAULT_PARAMETERS as $field => $value) {
            if (!isset($this->parameters[$field])) {
                $this->parameters[$field] = $initial_parameters[$field];
                $changed_parameters = true;
            }
        }
        $format = $this->parameters["FORMAT"];
        $packed_table_format = [];
        $this->blob_columns = [];
        $this->serial_columns = [];
        $this->key_field = null;
        foreach ($format as $field_name => $type) {
            $upper_field_name = strtoupper($field_name);
            if ($upper_field_name == "PRIMARY KEY") {
                $packed_table_format["PRIMARY KEY"] = $type;
                $this->key_field = (is_array($type)) ? $type[0] : $type;
                continue;
            }
            $upper_type = strtoupper($type);
            if (in_array($upper_type, ["BLOB", "SERIAL"])) {
                $this->blob_columns[] = $field_name;
                if ($upper_type == "SERIAL") {
                    $this->serial_columns[] = $field_name;
                }
                // the record itself only holds an offset delta for blobs
                $packed_table_format[$field_name] = "INT";
            } else if (isset(PackedTableTools::TYPE_SYNONYMS[$upper_type])) {
                $packed_table_format[$field_name] =
                    PackedTableTools::TYPE_SYNONYMS[$upper_type];
            } else {
                return null;
            }
        }
        if (empty($this->key_field)) {
            return null;
        }
        if (!empty($this->blob_columns)) {
            $packed_table_format["LAST_BLOB_LEN"] = "INT";
        }
        $this->table_tools = new PackedTableTools($packed_table_format,
            $record_compressor_type);
        if ($changed_parameters) {
            $this->saveParameters();
        }
    }
    /**
     * Returns $fields columns from the record associated with $key in
     * the $partition partition of this PartitionDocumentBundle if exists.
     * If $fields is empty all columns returned.
     *
     * @param string $key to look up in partition
     * @param int $partition to look for record in
     * @param array $fields names of fields in this PartitionDocumentBundle
     *      to return. Either a list of names, or an array in_name => out_name
     *      to rename columns in the result
     * @return array|false unpacked record on success, otherwise false
     */
    public function get($key, $partition, $fields = [])
    {
        $table_tools = $this->table_tools;
        $index = $this->loadPartitionIndex($partition, false,
            PackedTableTools::AS_STRING_MODE);
        if ($index === false) {
            // partition index missing or has no records
            return false;
        }
        if (is_string($index)) {
            $index_data_raw = $table_tools->findRowFromKeyTableString($index,
                $key);
        } else {
            $index_data_raw = $table_tools->find($index, $key);
        }
        if (empty($index_data_raw)) {
            return false;
        }
        if (!empty($fields)) {
            // a plain list of names maps each name to itself
            if (isset($fields[0]) && $fields == array_values($fields)) {
                $fields = array_combine($fields, $fields);
            }
        }
        $index_data = $table_tools->unpack($index_data_raw)[0];
        if (!empty($this->blob_columns)) {
            $partition_filename = $this->getPartition($partition);
            $num_blob_columns = count($this->blob_columns);
            /* first blob column holds the absolute offset; each later blob
               column holds the length of the previous blob, and
               LAST_BLOB_LEN holds the length of the final blob */
            $offset = intval($index_data[$this->blob_columns[0]]);
            for ($i = 0; $i < $num_blob_columns; $i++) {
                $column_name = $this->blob_columns[$i];
                $len = ($i + 1 < $num_blob_columns) ?
                    intval($index_data[$this->blob_columns[$i + 1]]) :
                    intval($index_data["LAST_BLOB_LEN"]);
                if (empty($fields) || isset($fields[$column_name])) {
                    if ($len == 0) {
                        $index_data[$column_name] = "";
                    } else {
                        /* use PHP's default handler so @ can suppress read
                           warnings; restore (rather than re-set) so handlers
                           do not accumulate on PHP's handler stack */
                        set_error_handler(null);
                        try {
                            $index_data[$column_name] = @$this->getArchive(
                                $partition_filename, $offset, $len);
                        } finally {
                            restore_error_handler();
                        }
                    }
                }
                $offset += $len;
            }
            unset($index_data["LAST_BLOB_LEN"]);
        }
        if (empty($fields)) {
            foreach ($this->serial_columns as $field_name) {
                $index_data[$field_name] = unserialize(
                    $index_data[$field_name]);
            }
            $index_data[$this->key_field] = $key;
            return $index_data;
        }
        $out_data = [];
        if (isset($fields[$this->key_field])) {
            $out_data[$fields[$this->key_field]] = $key;
            unset($fields[$this->key_field]);
        }
        set_error_handler(null);
        try {
            foreach ($fields as $in_name => $out_name) {
                if (isset($index_data[$in_name])) {
                    if (in_array($in_name, $this->serial_columns)) {
                        $out_data[$out_name] = @unserialize(
                            $index_data[$in_name]);
                    } else {
                        $out_data[$out_name] = $index_data[$in_name];
                    }
                }
            }
        } finally {
            restore_error_handler();
        }
        return $out_data;
    }
    /**
     * Retrieve a BLOB string in the file $archive_filename at byte position
     * $offset of length $len. It uncompresses this string using
     * the blob compressor's uncompress method and returns the result.
     * The opened file handle is cached for subsequent reads of the same file.
     *
     * @param string $archive_filename the filename of a partition archive
     *  file to get a blob object from
     * @param int $offset a byte position in that file
     * @param int $len number of bytes from $offset to read.
     * @return string|false the result of uncompressing the string at $offset
     *  of length $len, or false if the file could not be opened or read, or
     *  if $len is not positive
     */
    public function getArchive($archive_filename, $offset, $len)
    {
        [$fh, $cached_filename, $cached_instance_time] =
            $this->get_archive_cache;
        if (!is_resource($fh) || $cached_filename != $archive_filename ||
            $cached_instance_time != $this->instance_time) {
            if (is_resource($fh)) {
                fclose($fh);
            }
            $fh = fopen($archive_filename, "r");
            if ($fh === false) {
                // reset so the next call retries the open
                $this->get_archive_cache = [null, "", -1];
                return false;
            }
            $cached_filename = $archive_filename;
            $cached_instance_time = $this->instance_time;
        }
        $this->get_archive_cache = [$fh, $cached_filename,
            $cached_instance_time];
        $value = false;
        // fread() requires a positive length
        if ($len > 0 && fseek($fh, $offset) == 0) {
            $compressed_file = fread($fh, $len);
            if ($compressed_file !== false) {
                $value = $this->blob_compressor->uncompress($compressed_file);
            }
        }
        return $value;
    }
    /**
     * Returns the path to the archive file (used to store BLOB and SERIAL
     * columns) for the $i partition in this PartitionDocumentBundle
     *
     * @param int $i partition to get the archive file name for
     * @return string path of $i partition archive file
     */
    public function getPartition($i)
    {
        return $this->folder . "/" . self::PARTITION_PREFIX .
            $i . $this->blob_compressor->fileExtension();
    }
    /**
     * Returns the path to the index file (used to store all columns of
     * a partition record except blob and serial columns) for the $i partition
     * in this PartitionDocumentBundle
     *
     * @param int $i partition to get the index file name for
     * @return string path of $i partition index file
     */
    public function getPartitionIndex($i)
    {
        return $this->folder . "/" . self::PARTITION_PREFIX .
            $i . self::INDEX_EXTENSION;
    }
    /**
     * Returns the loaded index file for the $partition partition of
     * this PartitionDocumentBundle. If $force_load is set to true then reloads
     * from disk rather than use a cached value if present. When the cache has
     * more than index_cache_size entries, the least recently used entry is
     * evicted before loading.
     *
     * @param int $partition which partition index to read
     * @param bool $force_load whether to reload the index from disk or to
     *  use a cached value if present
     * @param int $mode PackedTableTools mode to use when reading in partition
     * @return mixed either a string if $mode is AS_STRING_MODE, or
     *  array $key => packed records pairs where records are
     *  packed according to this PartitionDocumentBundle's signature;
     *  false if the index is empty or missing
     */
    public function loadPartitionIndex($partition, $force_load = false,
        $mode = PackedTableTools::REPLACE_MODE)
    {
        $index_file_name = $this->getPartitionIndex($partition);
        $time = microtime(true);
        if (!empty($this->index_cache[$partition]) && !$force_load) {
            $index = $this->index_cache[$partition][0];
            $this->index_cache[$partition][1] = $time;
        } else {
            $this->index_cache ??= [];
            $cache_size = count($this->index_cache);
            if ((php_sapi_name() == 'cli') &&
                !C\nsdefined("IS_OWN_WEB_SERVER")) {
                crawlLog("PartitionDocumentBundle cache size: $cache_size");
            }
            if ($cache_size > $this->index_cache_size) {
                $oldest_partition = -1;
                $oldest_time = 2 * $time;
                foreach ($this->index_cache as $index_partition => $cache_info){
                    if (empty($cache_info[1])) {
                        // entries without a use time are dropped outright
                        unset($this->index_cache[$index_partition]);
                    } else if ($cache_info[1] <= $oldest_time) {
                        $oldest_time = $cache_info[1];
                        $oldest_partition = $index_partition;
                    }
                }
                unset($this->index_cache[$oldest_partition]);
            }
            if ($force_load) {
                unset($this->index_cache[$partition]);
            }
            $this->index_cache[$partition] = [
                $this->table_tools->load($index_file_name,
                $mode), $time];
            $index = $this->index_cache[$partition][0];
        }
        if (empty($index)) {
            return false;
        }
        return $index;
    }
    /**
     * Used to add new records to the PartitionDocumentBundle
     *
     * @param array $row_or_rows either array of record with fields given
     *      by this PartitionDocumentBundle's signature or an array of rows.
     * @return bool success or not
     */
    public function put($row_or_rows)
    {
        $rows = (empty($row_or_rows[$this->key_field])) ?
             $row_or_rows : [$row_or_rows];
        $table_tools = $this->table_tools;
        $num_rows = count($rows);
        $i = 0;
        $save_partition = $this->parameters["SAVE_PARTITION"];
        // remove $save_partition from read cache
        unset($this->index_cache[$save_partition]);
        $save_partition_name = $this->getPartition($save_partition);
        clearstatcache();
        $save_partition_len = file_exists($save_partition_name) ?
            filesize($save_partition_name) : 0;
        $this->save_index = $this->loadPartitionIndex($save_partition, false,
            PackedTableTools::AS_STRING_MODE);
        unset($this->index_cache[$save_partition]);
        if ($this->save_index === false) {
            // empty/missing index: start from an empty string table, as is
            // done when a new save partition is started below
            $this->save_index = "";
        }
        // loop-invariant column information
        $blob_columns = $this->blob_columns ?? [];
        $serial_columns = $this->serial_columns ?? [];
        $format = $this->parameters["FORMAT"];
        unset($format['PRIMARY KEY']);
        $format_keys = array_keys($format);
        foreach ($rows as $row) {
            crawlTimeoutLog("..still adding pages partition document bundle. " .
                " Have added %s of %s.", $i, $num_rows);
            $key = $row[$this->key_field] ?? false;
            if ($key === false) {
                crawlLog("PartitionDocumentBundle Put Failed A");
                return false;
            }
            unset($row[$this->key_field]);
            $value = $row;
            if (($this->parameters['ACTIVE_COUNT'] >
                $this->parameters["MAX_ITEMS_PER_FILE"]) || (
                $save_partition_len >
                $this->parameters["PARTITION_SIZE_THRESHOLD"])) {
                $active_count = $this->parameters['ACTIVE_COUNT'];
                crawlLog("PartitionDocumentBundle advancing partition because ".
                    "ACTIVE_COUNT = $active_count and SAVE_PARTITION_LEN = ".
                    $save_partition_len);
                $this->advanceSavePartition();
                $save_partition = $this->parameters["SAVE_PARTITION"];
                $save_partition_len = 0;
                $this->save_index = "";
            }
            $out_value = [];
            foreach ($format_keys as $format_key) {
                $out_value[$format_key] = $value[$format_key] ?? null;
            }
            if (!empty($blob_columns)) {
                /* store the first blob's absolute offset, then the
                   difference between consecutive offsets */
                $old_offset = 0;
                foreach ($blob_columns as $blob_column) {
                    $blob = $out_value[$blob_column] ?? "";
                    if (in_array($blob_column, $serial_columns)) {
                        $blob = serialize($blob);
                    }
                    if (($add_info = $this->addArchive($blob)) === false) {
                        crawlLog("PartitionDocumentBundle Put Failed C");
                        return false;
                    }
                    [$offset, $len] = $add_info;
                    $save_partition_len += $len;
                    $out_value[$blob_column] = $offset - $old_offset;
                    $old_offset = $offset;
                }
                $out_value["LAST_BLOB_LEN"] = $len;
            }
            $out_value = $table_tools->pack($out_value);
            if ($table_tools->add($this->save_index, $key, $out_value,
                $table_tools::ADD_MEM_TABLE_STRING)) {
                $this->parameters['ACTIVE_COUNT']++;
            } else {
                crawlLog("PartitionDocumentBundle Put Failed D");
                return false;
            }
            $i++;
        }
        $save_index_name = $this->getPartitionIndex($save_partition);
        if (isset($this->save_index)) {
            $table_tools->save($save_index_name, $this->save_index);
        }
        $this->saveParameters();
        return true;
    }
    /**
     * Saves the current save partition index, makes a later partition the
     * save partition, and starts it with an empty index. Does not call
     * saveParameters().
     *
     * @param int $new_save_partition partition number to advance to. If 0
     *  (the default), the parameters "SAVE_PARTITION" value plus one is used.
     * @return bool true if advanced, false if $new_save_partition is not
     *  beyond the current save partition
     */
    public function advanceSavePartition($new_save_partition = 0)
    {
        $save_partition = $this->parameters["SAVE_PARTITION"];
        $new_save_partition = (($new_save_partition) > 0) ?
            $new_save_partition : $save_partition + 1;
        if ($new_save_partition <= $save_partition) {
            return false;
        }
        $save_index_name = $this->getPartitionIndex($save_partition);
        if (isset($this->save_index)) {
            $this->table_tools->save($save_index_name, $this->save_index);
        }
        $new_save_index_name = $this->getPartitionIndex($new_save_partition);
        if (file_exists($new_save_index_name)) {
            unlink($new_save_index_name);
        }
        // both indexes just changed on disk, so cached copies are stale
        unset($this->index_cache[$save_partition],
            $this->index_cache[$new_save_partition]);
        $this->parameters["SAVE_PARTITION"] = $new_save_partition;
        $this->parameters['COUNT'] += $this->parameters['ACTIVE_COUNT'];
        $this->parameters['ACTIVE_COUNT'] = 0;
        return true;
    }
    /**
     * Creates a new counter $field to be maintained
     *
     * @param string $field field of info struct to add a counter for
     */
    public function initCountIfNotExists($field = "COUNT")
    {
        $this->parameters[$field] ??= 0;
        $this->saveParameters();
    }
    /**
     * Add $num to maintained counter $field
     *
     * @param int $num number of items to add to current count
     * @param string $field field of info struct to add to the count of
     */
    public function addCount($num, $field = "COUNT")
    {
        $this->parameters[$field] ??= 0;
        $this->parameters[$field] += $num;
        $this->saveParameters();
    }
    /**
     * Save the operating parameters of this PartitionDocumentBundle
     */
    public function saveParameters()
    {
        $parameter_path = $this->folder . "/" . self::PARAMETERS_FILE;
        file_put_contents($parameter_path, serialize($this->parameters),
            LOCK_EX);
    }
    /**
     * Returns the parameters (such as its signature, max number of
     * documents per partition and counts) used to configure the
     * PartitionDocumentBundle stored at $folder
     *
     * @param string $folder file path to a stored PartitionDocumentBundle
     * @return array configuration info about the PartitionDocumentBundle
     *  (empty if no parameters file exists)
     */
    public static function getParameterInfo($folder)
    {
        $parameter_path = $folder . "/" . self::PARAMETERS_FILE;
        if (!file_exists($parameter_path)) {
            return [];
        }
        $parameters = unserialize(file_get_contents($parameter_path));
        if (!is_array($parameters)) {
            $parameters = [];
        }
        if (!empty($parameters["COMPRESSOR"])) {
            //original format didn't distinguish between compressor use
            $parameters["RECORD_COMPRESSOR"] ??= $parameters["COMPRESSOR"];
            $parameters["BLOB_COMPRESSOR"] ??= $parameters["COMPRESSOR"];
        }
        if (empty($parameters['SAVE_PARTITION']) ||
            $parameters['SAVE_PARTITION'] == 0) {
            // guess the save partition as the last index file; glob() may
            // return false on error, which count() rejects in PHP 8
            $index_files = glob("$folder/*" . self::INDEX_EXTENSION) ?: [];
            $parameters['SAVE_PARTITION'] = max(count($index_files) - 1, 0);
        }
        return $parameters;
    }
    /**
     * Used to add a blob item to the end of the current save partition file.
     * The opened file handle is cached for subsequent appends.
     *
     * @param string $value blob item to be added to file
     * @return array|false [offset into save partition, length stored,
     *      partition number of current save partition], or false if the
     *      partition file could not be opened or fully written
     */
    protected function addArchive($value)
    {
        [$fh, $cached_filename, $cached_instance_time] =
            $this->add_archive_cache;
        $save_partition = $this->parameters["SAVE_PARTITION"];
        $partition_filename = $this->getPartition($save_partition);
        if (!is_resource($fh) ||
            $partition_filename != $cached_filename ||
            $cached_instance_time != $this->instance_time) {
            if (is_resource($fh)) {
                fclose($fh);
            }
            $fh = fopen($partition_filename, "c+");
            if ($fh === false) {
                // reset so the next call retries the open
                $this->add_archive_cache = [null, "", -1];
                return false;
            }
            $cached_filename = $partition_filename;
            $cached_instance_time = $this->instance_time;
        }
        $this->add_archive_cache = [$fh, $cached_filename,
            $cached_instance_time];
        if (fseek($fh, 0, SEEK_END) != 0 || ($offset = ftell($fh)) === false) {
            return false;
        }
        $compressed_value = $this->blob_compressor->compress($value);
        $len = strlen($compressed_value);
        // a short write would leave offsets pointing at missing data
        if (fwrite($fh, $compressed_value, $len) !== $len) {
            return false;
        }
        return [$offset, $len, $save_partition];
    }
}
ViewGit