Last commit for src/library/media_jobs/RecommendationJob.php: 2addb500315b7393a90fe66431d7832b1e7386c7

Adjust copyrights years

Chris Pollett [2024-01-03 21:Jan:rd]
Adjust copyrights years
<?php
/**
 * SeekQuarry/Yioop --
 * Open Source Pure PHP Search Engine, Crawler, and Indexer
 *
 * Copyright (C) 2009 - 2022  Chris Pollett chris@pollett.org
 *
 * LICENSE:
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 * END LICENSE
 *
 * @author Sarika Padmashali padmashalisarika@gmail.com
 * (Reworked so could scale for yioop.com by Chris Pollett)
 * @license https://www.gnu.org/licenses/ GPL3
 * @link https://www.seekquarry.com/
 * @copyright 2009 - 2022
 * @filesource
 */
namespace seekquarry\yioop\library\media_jobs;

use seekquarry\yioop\configs as C;
use seekquarry\yioop\library as L;
use seekquarry\yioop\library\LinearAlgebra as LinearAlgebra;
use seekquarry\yioop\library\LRUCache as LRUCache;
use seekquarry\yioop\library\PhraseParser as PhraseParser;
use seekquarry\yioop\models\CronModel;

/**
 * Recommendation Job recommends the trending threads as well
 * as threads and groups which are relevant based on the
 * users viewing history
 */
class RecommendationJob extends MediaJob
{
    /**
     * Unix timestamp of the last time checkPrerequisites() allowed this job
     * to run (0 until the first run). It is also the timestamp written on
     * the recommendation rows produced during that run.
     * @var int
     */
    public $update_time;
    /**
     * Used to track what is the active recommendation timestamp. It is read
     * from the cron model under the key "item_group_recommendations" at the
     * start of nondistributedTasks().
     * @var int
     */
    public $active_time;
    /**
     * Associative array of the number of items a term appears in
     * @var array
     */
    public $item_idf;
    /**
     * Associative array of the number of user views a term appears in
     * @var array
     */
    public $user_idf;
    /**
     * LRUCache for term embeddings. It is created in
     * computeItemTermEmbeddings() with capacity MAX_TERM_EMBEDDINGS.
     * @var LRUCache
     */
    public $lru_cache;
    /**
     * Maximum number of group items used in making recommendations
     * @var int
     */
    const MAX_GROUP_ITEMS = 50000;
    /**
     * Maximum number of terms used in making recommendations
     * @var int
     */
    const MAX_TERMS = 20000;
    /**
     * File containing paths to description folders of wiki page resources
     * that should be used to create data corpus for computing recommendations
     * @var string
     */
    const RECOMMENDATION_FILE = C\APP_DIR . "/resources/recommendation.txt";
    /**
     * Length of context window for calculating term embeddings, i.e., how
     * many preceding terms are paired with each term of an item
     * @var int
     */
    const CONTEXT_WINDOW_LENGTH = 5;
    /**
     * Update period to consider for fetching the records from
     * ITEM_IMPRESSION_SUMMARY table
     * @var int
     */
    const UPDATE_PERIOD = C\ONE_MONTH;
    /**
     * Stop words to exclude from the descriptions fetched by DescriptionUpdate
     * media job
     * @var array
     */
    const DESCRIPTION_STOP_WORDS = ["author", "authors", "plot", "genre",
        "genres", "star", "stars", "credits", "rating", "ratings", "year",
        "director", "cast", "runtime"];
    /**
     * Hash algorithm to be used for calculating hash in Hash2Vec embedding
     * (selects which vector component a term contributes to)
     * @var string
     */
    const HASH_ALGORITHM = "md5";
    /**
     * Hash algorithm to be used for calculating sign in Hash2Vec term embedding
     * (selects whether a term contributes +1 or -1 times its weight)
     * @var string
     */
    const SIGN_HASH_ALGORITHM = "crc32";
    /**
     * MAX term embeddings fetched from database to initialize LRUCache
     * @var int
     */
    const MAX_TERM_EMBEDDINGS = 500;
    /**
     * Maximum number of resources used in making resource recommendations/
     * Maximum number of group items to hold in memory in one go
     * @var int
     */
    const MAX_BATCH_SIZE = 200;
    /**
     * Prepares the job for use: flags it as a task that is carried out only
     * by the name server, zeroes the timing fields, creates the cron model,
     * and opens a connection to the configured DBMS so the recommendation
     * tables can be read and written.
     */
    public function init()
    {
        $this->name_server_does_client_tasks = true;
        $this->name_server_does_client_tasks_only = true;
        $this->update_time = 0;
        $this->active_time = 0;
        $this->cron_model = new CronModel();
        // the datasource manager class name is derived from the DBMS setting
        $manager_class = C\NS_DATASOURCES . ucfirst(C\DBMS) . "Manager";
        $this->db = new $manager_class();
        $this->db->connect();
        $this->size = C\EMBEDDING_VECTOR_SIZE;
    }
    /**
     * Only update if it has been more than a day (C\ONE_DAY seconds) since
     * the last update. When the job is allowed to run, the current time is
     * recorded as the new last update time.
     *
     * @return bool whether more than a day has passed since the last update
     */
    public function checkPrerequisites()
    {
        $now = time();
        $elapsed = $now - $this->update_time;
        if ($elapsed <= C\ONE_DAY) {
            L\crawlLog("Time since last update not exceeded, skipping".
                " Recommendation MediaJob $elapsed");
            return false;
        }
        $this->update_time = $now;
        L\crawlLog("Prerequisites for Recommendation Media Job met");
        return true;
    }
    /**
     * Carries out one complete recommendation run. For now this is only
     * done on the name server as Yioop currently only supports one DBMS at
     * a time.
     *
     * In order, the run computes thread and group recommendations, wiki
     * resource recommendations, and recommendations for users who do not
     * have any yet. Once all three have finished, the cron time stored
     * under "item_group_recommendations" is set to $this->update_time.
     */
    public function nondistributedTasks()
    {
        L\crawlLog("Performing the Recommendation Media Job");
        $this->active_time = $this->cron_model->getCronTime(
            "item_group_recommendations");
        L\crawlLog("Current Active Recommendation Timestamp: ".
            $this->active_time);
        L\crawlLog("...Start computing similarity-based group and item ".
            "recommendations...");
        $this->computeThreadGroupRecommendations();
        L\crawlLog("...Finished computing similarity-based group and item ".
            "recommendations.");
        L\crawlLog("...Start computing similarity-based wiki resource " .
            "recommendations...");
        $this->computeWikiResourceRecommendations();
        // trailing space after "wiki" keeps the logged words separated
        L\crawlLog("...Finished computing similarity-based wiki " .
            "resource recommendations...");
        L\crawlLog("...Start computing new user recommendations...");
        $this->initializeNewUserRecommendations();
        L\crawlLog("...Finished computing new user recommendations...");
        $this->cron_model->updateCronTime(
            "item_group_recommendations", $this->update_time);
    }
    /**
     * Computes recommendations for users who have yet to receive any
     * recommendation of the given type based on what are the most popular
     * recommendations.
     *
     * Thread and group recommendations: the C\MAX_RECOMMENDATIONS items of
     * each type with the highest summed SCORE in GROUP_ITEM_RECOMMENDATION
     * are copied to every user that has no row in that table.
     *
     * Resource recommendations: every resource in
     * GROUP_RESOURCE_RECOMMENDATION, with its summed SCORE, is copied to
     * every user that has no row in that table.
     * NOTE(review): unlike the thread/group query this one has no LIMIT --
     * confirm that copying all resources to each new user is intended.
     *
     * Rows are written in multi-row INSERT statements that are flushed once
     * more than C\BATCH_SQL_INSERT_NUM rows have accumulated, and are
     * stamped with $this->update_time.
     */
    public function initializeNewUserRecommendations()
    {
        $db = $this->db;
        $popular_recommendations = [
            C\THREAD_RECOMMENDATION => [], C\GROUP_RECOMMENDATION => []];
        $sql = "SELECT ITEM_ID, SUM(SCORE) AS TOTAL_SCORE FROM " .
            "GROUP_ITEM_RECOMMENDATION WHERE ITEM_TYPE = ? " .
            "GROUP BY ITEM_ID ORDER BY TOTAL_SCORE DESC ".
            $db->limitOffset(C\MAX_RECOMMENDATIONS);
        foreach (array_keys($popular_recommendations) as $type) {
            $results = $db->execute($sql, [$type]);
            while ($row = $db->fetchArray($results)) {
                $popular_recommendations[$type][] = $row;
            }
        }
        $new_user_sql = "SELECT USER_ID AS USER_ID ".
            "FROM USERS WHERE USER_ID NOT IN ".
            "(SELECT USER_ID FROM GROUP_ITEM_RECOMMENDATION)";
        $new_user_results = $db->execute($new_user_sql);
        $base_recommend_sql = "INSERT INTO GROUP_ITEM_RECOMMENDATION VALUES ";
        $insert_recommend_sql = $base_recommend_sql;
        $comma = "";
        $insert_count = 0;
        while ($row = $db->fetchArray($new_user_results)) {
            $user_id = $row['USER_ID'];
            foreach ($popular_recommendations as $type => $recommendations) {
                foreach ($recommendations as $recommendation) {
                    $insert_recommend_sql .=
                        "$comma ($user_id, {$recommendation['ITEM_ID']}, ".
                        "$type, {$recommendation['TOTAL_SCORE']}," .
                        $this->update_time . ")";
                    $comma = ",";
                    $insert_count++;
                }
                if ($insert_count > C\BATCH_SQL_INSERT_NUM) {
                    $db->execute($insert_recommend_sql);
                    $insert_recommend_sql = $base_recommend_sql;
                    $insert_count = 0;
                    $comma = "";
                }
            }
        }
        if ($insert_count > 0) {
            $db->execute($insert_recommend_sql);
        }
        $sql = "SELECT GROUP_ID, PAGE_ID, RESOURCE_PATH, RESOURCE_ID," .
            " SUM(SCORE) AS TOTAL_SCORE FROM" .
            " GROUP_RESOURCE_RECOMMENDATION GROUP BY GROUP_ID," .
            " PAGE_ID, RESOURCE_PATH, RESOURCE_ID ORDER BY TOTAL_SCORE DESC";
        $results = $db->execute($sql);
        $resource_recommendations = [];
        while ($row = $db->fetchArray($results)) {
            /* RESOURCE_PATH is placed between single quotes in the INSERT
               below. Escape it once here so a path containing a quote
               cannot break (or alter) the generated statement.
             */
            $row['RESOURCE_PATH'] = $db->escapeString(
                (string)($row['RESOURCE_PATH'] ?? ""));
            $resource_recommendations[] = $row;
        }
        $base_recommend_sql = "INSERT INTO GROUP_RESOURCE_RECOMMENDATION" .
            " VALUES ";
        $insert_recommend_sql = $base_recommend_sql;
        $comma = "";
        $insert_count = 0;
        $new_user_sql = "SELECT USER_ID FROM USERS WHERE USER_ID NOT IN" .
            "(SELECT USER_ID FROM GROUP_RESOURCE_RECOMMENDATION)";
        $new_user_results = $db->execute($new_user_sql);
        while ($row = $db->fetchArray($new_user_results)) {
            $user_id = $row['USER_ID'];
            foreach ($resource_recommendations as $recommendation) {
                $insert_recommend_sql .=
                    "$comma ($user_id, {$recommendation['GROUP_ID']}, ".
                    "{$recommendation['PAGE_ID']}, " .
                    "'{$recommendation['RESOURCE_PATH']}', ".
                    "{$recommendation['TOTAL_SCORE']}, {$this->update_time}, ".
                    "{$recommendation['RESOURCE_ID']})";
                $comma = ",";
                $insert_count++;
                if ($insert_count > C\BATCH_SQL_INSERT_NUM) {
                    $db->execute($insert_recommend_sql);
                    $insert_recommend_sql = $base_recommend_sql;
                    $insert_count = 0;
                    $comma = "";
                }
            }
        }
        if ($insert_count > 0) {
            $db->execute($insert_recommend_sql);
        }
    }
    /**
     * Manages the whole process of computing thread and group recommendations
     * for users. Makes a series of calls to handle parts of this computation
     * before synthesizing the result.
     *
     * The steps, in order, are: term embeddings, item embeddings, writing
     * the cached term embeddings back to the database, per-user item
     * embeddings, item recommendations, group embeddings, per-user group
     * embeddings, and group recommendations. Intermediate arrays are unset
     * as soon as no later step needs them, since they can be large.
     */
    public function computeThreadGroupRecommendations()
    {
        L\crawlLog("...Start computing Item Term Embeddings...");
        $item_terms = $this->computeItemTermEmbeddings();
        L\crawlLog("...Finished computing Item Term Embeddings...");
        L\crawlLog("...Start computing Item Embeddings...");
        $item_embeddings = $this->computeItemEmbeddings($item_terms);
        L\crawlLog("...Finished computing Item Embeddings...");
        // the per-item term lists are only needed to build item embeddings
        unset($item_terms);
        L\crawlLog("...Start write back term embeddings from cache to db");
        $this->saveTermEmbeddingsCacheToDb(C\THREAD_RECOMMENDATION);
        L\crawlLog("...Finished write back term embeddings from cache to db");
        L\crawlLog("...Start computing Item User Embeddings...");
        [$item_user_embeddings, $user_items] = $this->
            computeItemUserEmbeddings($item_embeddings);
        L\crawlLog("...Finished computing Item User Embeddings...");
        L\crawlLog("...Start computing Item User Recommendations...");
        $user_groups = $this->computeItemUserRecommendations($item_embeddings,
            $item_user_embeddings, $user_items);
        L\crawlLog("...Finished computing Item User Recommendations...");
        unset($item_user_embeddings);
        unset($user_items);
        L\crawlLog("...Start computing Group Embeddings...");
        $group_embeddings = $this->computeGroupEmbeddings($item_embeddings);
        L\crawlLog("...Finished computing Group Embeddings...");
        // item embeddings are not used by any of the remaining steps
        unset($item_embeddings);
        L\crawlLog("...Start computing Group User Embeddings...");
        [$group_user_embeddings, $user_group_impression] =
            $this->computeGroupUserEmbeddings($group_embeddings);
        L\crawlLog("...Finished computing Group User Embeddings...");
        L\crawlLog("...Start computing Group User Recommendations...");
        $this->computeGroupUserRecommendations($group_embeddings,
            $group_user_embeddings, $user_groups, $user_group_impression);
        L\crawlLog("...Finished computing Group User Recommendations...");
        unset($group_embeddings);
        unset($group_user_embeddings);
        unset($user_group_impression);
        unset($user_groups);
    }
    /**
     * Computes the term embeddings for individual items (main thread only and
     * not comments) in groups feeds for the terms in their title and
     * description text. Processes at most (up to a final partial batch)
     * MAX_GROUP_ITEMS items, most recently edited first, reading them from
     * GROUP_ITEM in batches of MAX_BATCH_SIZE.
     *
     * The term embedding cache is first seeded with up to
     * MAX_TERM_EMBEDDINGS embeddings stored in
     * RECOMMENDATION_TERM_EMBEDDING. Then, for each term of an item and each
     * of the up to CONTEXT_WINDOW_LENGTH terms preceding it, both terms'
     * embeddings are updated: the component selected by hashing the other
     * term with HASH_ALGORITHM is changed by that other term's sign (from
     * SIGN_HASH_ALGORITHM) times a weight that decays with the distance
     * between the two terms. Updated embeddings are written through
     * updateTermEmbeddingCache().
     *
     * @return array $item_terms terms in each item: item id => [terms,
     *      group id], where terms is what cleanRemoveStopWords() returned
     *      for the item's lower-cased title and description
     */
    public function computeItemTermEmbeddings()
    {
        $db = $this->db;
        $this->lru_cache = new LRUCache(self::MAX_TERM_EMBEDDINGS);
        $select_sql = "SELECT * FROM RECOMMENDATION_TERM_EMBEDDING WHERE" .
            " ITEM_TYPE = ? " . $db->limitOffset(self::MAX_TERM_EMBEDDINGS);
        $results = $db->execute($select_sql, [C\THREAD_RECOMMENDATION]);
        $item_terms = [];
        L\crawlLog("Start Populating LRUCache of Embeddings...");
        while ($row = $db->fetchArray($results)) {
            $vector = is_string($row['VECTOR']) ?
                base64_decode($row['VECTOR'], true) : false;
            if ($vector === false) {
                // report through the job log rather than dumping to stdout
                L\crawlLog("Skipping stored term embedding with id " .
                    $row['ID'] . " as its vector could not be decoded");
                continue;
            }
            $this->lru_cache->put($row['ID'], $vector);
        }
        L\crawlLog("Finish Populating LRUCache of Embeddings");
        /* standard deviation of the distances 1..CONTEXT_WINDOW_LENGTH;
           used below to scale how quickly a context term's weight decays
         */
        $context_distance_sum = (self::CONTEXT_WINDOW_LENGTH *
            (self::CONTEXT_WINDOW_LENGTH + 1)) / 2.0;
        $mean = $context_distance_sum / self::CONTEXT_WINDOW_LENGTH;
        $carry = 0.0;
        for ($i = 1; $i <= self::CONTEXT_WINDOW_LENGTH; $i++) {
            $difference = $i - $mean;
            $carry += $difference * $difference;
        }
        $std_deviation = sqrt($carry / self::CONTEXT_WINDOW_LENGTH);
        /* maps a term to [vector component (1-based), sign (-1 or 1)] */
        $hash_term = function ($term) {
            $component = unpack('N',
                hash(self::HASH_ALGORITHM, $term, true))[1] %
                C\EMBEDDING_VECTOR_SIZE + 1;
            $sign_hash = hash(self::SIGN_HASH_ALGORITHM, $term, true);
            $sign = unpack('N', $sign_hash)[1] % 2 == 0 ? -1 : 1;
            return [$component, $sign];
        };
        $item_count_sql = "SELECT COUNT(*) AS NUM_ITEMS FROM GROUP_ITEM ".
            "WHERE ID = PARENT_ID AND TITLE NOT LIKE '%Page%'";
        $results = $db->execute($item_count_sql);
        $num_items = 0;
        if ($results) {
            $row = $db->fetchArray($results);
            $num_items = $row['NUM_ITEMS'] ?? 0;
        }
        /* A LIMIT clause on a COUNT(*) query only limits the single result
           row, not the count itself, so the cap is applied here instead.
         */
        $num_items = min((int)$num_items, self::MAX_GROUP_ITEMS);
        $num_batches = (int)ceil($num_items / self::MAX_BATCH_SIZE);
        L\crawlLog("Number of group items will consider: " . $num_items);
        $item_count = 0;
        for ($item_batch = 0; $item_batch < $num_batches; $item_batch++) {
            $group_item_sql = "SELECT * FROM GROUP_ITEM WHERE ID = PARENT_ID" .
                " AND TITLE NOT LIKE '%Page%' ORDER BY EDIT_DATE DESC " .
                $db->limitOffset($item_batch * self::MAX_BATCH_SIZE,
                self::MAX_BATCH_SIZE);
            L\crawlTimeoutLog("Have processed $item_count many group items");
            $results = $db->execute($group_item_sql);
            $batch_items = [];
            while ($row = $db->fetchArray($results)) {
                $item_id = $row['ID'];
                $text_corpus = $row['TITLE'] . " " . $row['DESCRIPTION'];
                $text_corpus = mb_strtolower($text_corpus);
                $terms = $this->cleanRemoveStopWords($text_corpus);
                $batch_items[$item_id] = [$terms, $row['GROUP_ID']];
            }
            foreach ($batch_items as $item_id => $term_data) {
                $item_terms[$item_id] = $term_data;
                $terms = $term_data[0];
                $num_terms = count($terms);
                for ($i = 0; $i < $num_terms; $i++) {
                    L\crawlTimeoutLog("Have processed $i of $num_terms terms");
                    [$term_id, $term] = $terms[$i];
                    [$term_hash, $term_sign] = $hash_term($term);
                    $term_embedding = $this->getTermEmbedding($term_id,
                        C\THREAD_RECOMMENDATION);
                    $term_embedding = unpack("E*", $term_embedding);
                    // walk back over the terms inside the context window
                    $window_start = max(0, $i - self::CONTEXT_WINDOW_LENGTH);
                    for ($j = $i - 1; $j >= $window_start; $j--) {
                        [$context_term_id, $context_term] = $terms[$j];
                        $context_term_embedding = $this->getTermEmbedding(
                            $context_term_id, C\THREAD_RECOMMENDATION);
                        $context_term_embedding = unpack("E*",
                            $context_term_embedding);
                        $weight = exp(-1 * pow(($i - $j) / $std_deviation, 2));
                        [$context_term_hash, $context_term_sign] =
                            $hash_term($context_term);
                        $term_embedding[$context_term_hash] +=
                            $context_term_sign * $weight;
                        $context_term_embedding[$term_hash] +=
                            $term_sign * $weight;
                        $context_term_embedding = pack("E*",
                            ...$context_term_embedding);
                        $this->updateTermEmbeddingCache($context_term_id,
                            $context_term_embedding, C\THREAD_RECOMMENDATION,
                            "thread_context_term_update");
                    }
                    $term_embedding = pack("E*", ...$term_embedding);
                    $this->updateTermEmbeddingCache($term_id, $term_embedding,
                        C\THREAD_RECOMMENDATION, "thread_term_update");
                }
                // counted per item so the progress log reports items
                $item_count++;
            }
        }
        return $item_terms;
    }
    /**
     * Computes the item embeddings for individual items (main thread only and
     * not comments) in groups feeds using the term embeddings for their terms.
     * An item's embedding is the normalized sum of the embeddings of its
     * terms. The embeddings are also stored, base64 encoded, in
     * RECOMMENDATION_ITEM_EMBEDDING; any rows already there for the same
     * items are deleted first. Database writes are done in groups of
     * C\BATCH_SQL_INSERT_NUM items.
     *
     * @param array $item_terms terms in each item, item id => [terms,
     *      group id] where each term is a [term id, term] pair
     * @return array $updated_item_embeddings containing embeddings for items,
     *      item id => [packed embedding, group id]
     */
    public function computeItemEmbeddings($item_terms)
    {
        $db = $this->db;
        $updated_item_embeddings = [];
        $num_done = 0;
        foreach ($item_terms as $item_id => $term_info) {
            [$terms, $group_id] = $term_info;
            L\crawlTimeoutLog("Have done $num_done many group items");
            $vector_sum = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0);
            foreach ($terms as $term_pair) {
                $packed_term = $this->getTermEmbedding($term_pair[0],
                    C\THREAD_RECOMMENDATION, true);
                $vector_sum = LinearAlgebra::add($vector_sum,
                    unpack("E*", $packed_term));
            }
            $unit_vector = LinearAlgebra::normalize($vector_sum);
            $updated_item_embeddings[$item_id] =
                [pack("E*", ...$unit_vector), $group_id];
            $num_done++;
        }
        $item_type = C\THREAD_RECOMMENDATION;
        $total_insert = 0;
        $batches = array_chunk($updated_item_embeddings,
            C\BATCH_SQL_INSERT_NUM, true);
        foreach ($batches as $batch) {
            $id_list = [];
            $value_rows = [];
            foreach ($batch as $item_id => [$packed_item, $parent_id]) {
                L\crawlTimeoutLog(
                    "Have inserted $total_insert many group items");
                $encoded = base64_encode($packed_item);
                $value_rows[] = "($item_id, $item_type, " .
                    "'$encoded', $parent_id)";
                $id_list[] = " $item_id";
                $total_insert++;
            }
            // remove any stale rows for these items, then add the new ones
            $delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING" .
                " WHERE ITEM_TYPE = ? AND ID IN (" .
                implode(",", $id_list) . ")";
            $db->execute($delete_sql, [C\THREAD_RECOMMENDATION]);
            $insert_sql = $db->insertIgnore(
                "INSERT INTO RECOMMENDATION_ITEM_EMBEDDING VALUES " .
                implode(",", $value_rows));
            $db->execute($insert_sql);
        }
        return $updated_item_embeddings;
    }
    /**
     * Computes, for each user, an embedding from the items that user has
     * impressions for in the ITEM_IMPRESSION_SUMMARY table. Rows considered
     * are thread impressions that either belong to the current UPDATE_PERIOD
     * bucket or are C\MOST_RECENT_VIEW rows. A user's embedding is the
     * normalized sum of the embeddings of those of their items which appear
     * in $item_embeddings.
     * NOTE(review): user id 2 is excluded by the query, presumably because
     * it is the public user -- confirm.
     *
     * @param array $item_embeddings embedding vectors of items,
     *      item id => [packed embedding, group id]
     * @return array [$item_user_embedding, $user_items] user embeddings for
     *      items (user id => packed embedding) and, for each user, the ids
     *      of the items that contributed to it
     */
    public function computeItemUserEmbeddings($item_embeddings)
    {
        $db = $this->db;
        //SQLITE and MYSQL use GROUP_CONCAT, Postgres uses STRING_AGG
        $db_list_function = "STRING_AGG";
        if (in_array($db->to_upper_dbms, ["SQLITE3", "MYSQL"])) {
            $db_list_function = "GROUP_CONCAT";
        }
        // start of the UPDATE_PERIOD long bucket containing the current time
        $timestamp = floor(time() / self::UPDATE_PERIOD ) * self::UPDATE_PERIOD;
        $condition = "ITEM_TYPE = ? AND USER_ID <> 2 AND" .
            " ((UPDATE_PERIOD = ? AND UPDATE_TIMESTAMP = ?) OR" .
            " (UPDATE_PERIOD = ?))";
        $impression_sql = "SELECT USER_ID, ".
            "$db_list_function(CAST(ITEM_ID AS VARCHAR), ',') AS " .
            "ITEM_IDS FROM ITEM_IMPRESSION_SUMMARY WHERE $condition " .
            "GROUP BY USER_ID";
        $query_values = [C\THREAD_IMPRESSION, self::UPDATE_PERIOD, $timestamp,
            C\MOST_RECENT_VIEW];
        $results = $db->execute($impression_sql, $query_values);
        $item_user_embeddings = [];
        $user_items = [];
        $num_users = 0;
        while ($row = $db->fetchArray($results)) {
            L\crawlTimeoutLog("Have done $num_users many user embeddings");
            $user_id = $row['USER_ID'];
            $vector_sum = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0);
            $contributing_items = [];
            $viewed_ids = array_unique(explode(",", $row['ITEM_IDS']));
            foreach ($viewed_ids as $item_id) {
                if (!array_key_exists($item_id, $item_embeddings)) {
                    continue;
                }
                $vector_sum = LinearAlgebra::add($vector_sum,
                    unpack("E*", $item_embeddings[$item_id][0]));
                $contributing_items[] = $item_id;
            }
            $unit_vector = LinearAlgebra::normalize($vector_sum);
            $item_user_embeddings[$user_id] = pack("E*", ...$unit_vector);
            $user_items[$user_id] = $contributing_items;
            $num_users++;
        }
        return [$item_user_embeddings, $user_items];
    }
    /**
     * Computes the items recommendation for user based on the similarity
     * (as computed by LinearAlgebra::similarity) between user embeddings and
     * item embeddings. Recommendations are calculated for the items user
     * have not interacted with yet and items should be from the groups where
     * the user is already a member.
     *
     * All existing thread rows of GROUP_ITEM_RECOMMENDATION are deleted and
     * replaced by the newly computed scores, written in multi-row INSERTs of
     * C\BATCH_SQL_INSERT_NUM rows and stamped with $this->update_time.
     *
     * @param array $item_embeddings embeddings vectors for items,
     *      item id => [packed embedding, group id]
     * @param array $item_user_embeddings embeddings vectors for user,
     *      user id => packed embedding
     * @param array $user_items items id for user in impression table
     * @return array $user_groups group ids where the user is a member,
     *      user id => array of group ids
     */
    public function computeItemUserRecommendations($item_embeddings,
        $item_user_embeddings, $user_items)
    {
        L\crawlLog("...Computing User Item Similarity Scores.");
        $db = $this->db;
        //SQLITE and MYSQL use GROUP_CONCAT, Postgres uses STRING_AGG
        $db_list_function = in_array($db->to_upper_dbms, ["SQLITE3", "MYSQL"]) ?
            "GROUP_CONCAT" : "STRING_AGG";
        $user_group_sql = "SELECT USER_ID, $db_list_function(" .
            "CAST(GROUP_ID AS VARCHAR), ',') " .
            "AS GROUP_IDS FROM USER_GROUP GROUP BY USER_ID";
        $results = $db->execute($user_group_sql);
        $user_groups = [];
        while ($row = $db->fetchArray($results)) {
            $user_id = $row['USER_ID'];
            $group_ids = explode(",", $row['GROUP_IDS']);
            $user_groups[$user_id] = $group_ids;
        }
        $item_user_recommendations = [];
        $user_count = 0;
        foreach ($item_user_embeddings as $user_id => $embedding) {
            L\crawlTimeoutLog("Have done $user_count many user recommendation");
            if (array_key_exists($user_id, $user_groups)) {
                $embedding = unpack("E*", $embedding);
                /* Index the user's viewed items and groups by id once, so
                   each membership test in the item loop is a hash lookup
                   rather than a scan of the whole list.
                 */
                $viewed_items = array_flip($user_items[$user_id] ?? []);
                $member_groups = array_flip($user_groups[$user_id]);
                $item_count = 0;
                foreach ($item_embeddings as
                    $item_id => [$item_embedding, $parent_id]) {
                    L\crawlTimeoutLog("Have done $item_count many items");
                    if (isset($viewed_items[$item_id]) ||
                        !isset($member_groups[$parent_id])) {
                        continue;
                    }
                    $item_embedding = unpack("E*", $item_embedding);
                    $similarity = LinearAlgebra::similarity(
                        $item_embedding, $embedding);
                    $item_user_recommendations[] = [$user_id,
                        $item_id, $similarity];
                    $item_count++;
                }
            }
            $user_count++;
        }
        $delete_sql = "DELETE FROM GROUP_ITEM_RECOMMENDATION WHERE" .
            " ITEM_TYPE = ?";
        $db->execute($delete_sql, [C\THREAD_RECOMMENDATION]);
        $base_insert_sql = "INSERT INTO GROUP_ITEM_RECOMMENDATION VALUES ";
        $insert_sql = $base_insert_sql;
        $comma = "";
        $insert_count = 0;
        $total_insert = 0;
        $item_type = C\THREAD_RECOMMENDATION;
        foreach ($item_user_recommendations as $recommendation) {
            [$user_id, $item_id, $similarity] = $recommendation;
            L\crawlTimeoutLog("Have inserted $total_insert recommendations");
            $insert_sql .= "$comma($user_id, $item_id" .
                ", $item_type, $similarity, {$this->update_time})";
            $comma = ",";
            $insert_count++;
            $total_insert++;
            if ($insert_count == C\BATCH_SQL_INSERT_NUM) {
                $insert_sql = $db->insertIgnore($insert_sql);
                $db->execute($insert_sql);
                $insert_count = 0;
                $comma = "";
                $insert_sql = $base_insert_sql;
            }
        }
        if ($insert_count > 0) {
            $insert_sql = $db->insertIgnore($insert_sql);
            $db->execute($insert_sql);
        }
        return $user_groups;
    }
    /**
     * Computes the group embeddings using the item embeddings for the items in
     * a group. A group's embedding is the normalized sum of the embeddings
     * of its items. The embeddings are also stored in
     * RECOMMENDATION_ITEM_EMBEDDING with item type C\GROUP_RECOMMENDATION,
     * as a PHP-serialized array of floats, after deleting any rows already
     * there for the same groups. Database writes are done in groups of
     * C\BATCH_SQL_INSERT_NUM embeddings.
     *
     * @param array $item_embeddings embedding for the items,
     *      item id => [packed embedding, group id]
     * @return array $updated_group_embeddings containing embeddings for groups,
     *      group id => packed embedding
     */
    public function computeGroupEmbeddings($item_embeddings)
    {
        $db = $this->db;
        $updated_group_embeddings = [];
        $num_done = 0;
        foreach ($item_embeddings as $item_id => $item_info) {
            [$packed_item, $parent_id] = $item_info;
            // note: this progress counter advances once per item
            L\crawlTimeoutLog("Have done $num_done many groups");
            if (!array_key_exists($parent_id, $updated_group_embeddings)) {
                // first item seen for this group starts the running sum
                $updated_group_embeddings[$parent_id] = $packed_item;
            } else {
                $item_vector = unpack("E*", $packed_item);
                $group_vector = unpack("E*",
                    $updated_group_embeddings[$parent_id]);
                $updated_group_embeddings[$parent_id] = pack("E*",
                    ...LinearAlgebra::add($item_vector, $group_vector));
            }
            $num_done++;
        }
        foreach (array_keys($updated_group_embeddings) as $group_id) {
            $unit_vector = LinearAlgebra::normalize(
                unpack("E*", $updated_group_embeddings[$group_id]));
            $updated_group_embeddings[$group_id] = pack("E*",
                ...$unit_vector);
        }
        $item_type = C\GROUP_RECOMMENDATION;
        $total_insert = 0;
        $batches = array_chunk($updated_group_embeddings,
            C\BATCH_SQL_INSERT_NUM, true);
        foreach ($batches as $batch) {
            $id_list = [];
            $value_rows = [];
            foreach ($batch as $group_id => $packed_group) {
                L\crawlTimeoutLog(
                    "Have inserted $total_insert group embeddings");
                $stored = serialize(unpack("E*", $packed_group));
                $value_rows[] = "($group_id, $item_type, " .
                    "'$stored', $group_id)";
                $id_list[] = " $group_id";
                $total_insert++;
            }
            // remove any stale rows for these groups, then add the new ones
            $delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING" .
                " WHERE ITEM_TYPE = ? AND ID IN (" .
                implode(",", $id_list) . ")";
            $db->execute($delete_sql, [C\GROUP_RECOMMENDATION]);
            $insert_sql = $db->insertIgnore(
                "INSERT INTO RECOMMENDATION_ITEM_EMBEDDING VALUES " .
                implode(",", $value_rows));
            $db->execute($insert_sql);
        }
        return $updated_group_embeddings;
    }
    /**
     * Builds one embedding per user for group recommendations. For each user
     * (other than USER_ID 2) the group ids found in ITEM_IMPRESSION_SUMMARY,
     * either for the current UPDATE_PERIOD bucket or as most recent view rows,
     * are looked up in $group_embeddings; the embeddings found are summed and
     * the sum is normalized
     *
     * @param array $group_embeddings group_id => packed ("E*") embedding
     * @return array [$group_user_embeddings, $user_groups] where the first
     *      maps user_id => packed normalized embedding and the second maps
     *      user_id => ids of the viewed groups that had an embedding
     */
    public function computeGroupUserEmbeddings($group_embeddings)
    {
        $db = $this->db;
        // SQLITE and MYSQL use GROUP_CONCAT, anything else gets STRING_AGG
        $list_aggregate = "STRING_AGG";
        if (in_array($db->to_upper_dbms, ["SQLITE3", "MYSQL"])) {
            $list_aggregate = "GROUP_CONCAT";
        }
        $period = self::UPDATE_PERIOD;
        // start of the update period that contains the current time
        $period_start = floor(time() / $period) * $period;
        $where = "ITEM_TYPE = ? AND USER_ID <> 2 AND" .
            " ((UPDATE_PERIOD = ? AND UPDATE_TIMESTAMP = ?) OR" .
            " (UPDATE_PERIOD = ?))";
        $sql = "SELECT USER_ID, $list_aggregate( CAST(ITEM_ID AS VARCHAR)," .
            " ',') AS ITEM_IDS FROM ITEM_IMPRESSION_SUMMARY WHERE $where " .
            "GROUP BY USER_ID";
        $rows = $db->execute($sql, [C\GROUP_IMPRESSION, $period,
            $period_start, C\MOST_RECENT_VIEW]);
        $group_user_embeddings = [];
        $user_groups = [];
        $users_done = 0;
        while ($row = $db->fetchArray($rows)) {
            L\crawlTimeoutLog("Have done $users_done many user embeddings");
            $user_id = $row['USER_ID'];
            $viewed_ids = array_unique(explode(",", $row['ITEM_IDS']));
            // unpack("E*") yields 1-based keys, so the accumulator is too
            $sum = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0);
            $matched_ids = [];
            $groups_done = 0;
            foreach ($viewed_ids as $group_id) {
                L\crawlTimeoutLog("Have done $groups_done many groups");
                $groups_done++;
                if (!array_key_exists($group_id, $group_embeddings)) {
                    continue;
                }
                $sum = LinearAlgebra::add($sum,
                    unpack("E*", $group_embeddings[$group_id]));
                $matched_ids[] = $group_id;
            }
            $group_user_embeddings[$user_id] = pack("E*",
                ...LinearAlgebra::normalize($sum));
            $user_groups[$user_id] = $matched_ids;
            $users_done++;
        }
        return [$group_user_embeddings, $user_groups];
    }
    /**
     * Computes group recommendations for users from the cosine similarity
     * between each user embedding and each group embedding and then replaces
     * the GROUP_RECOMMENDATION rows of GROUP_ITEM_RECOMMENDATION with the
     * result. A group is not scored for a user if it is invite only, if it is
     * listed for that user in $user_groups, or if it is listed for that user
     * in $user_group_impression
     *
     * @param array $group_embeddings group_id => packed ("E*") embedding
     * @param array $group_user_embeddings user_id => packed ("E*") embedding
     * @param array $user_groups user_id => array of group ids to skip for
     *      that user; a user without an entry has nothing skipped
     * @param array $user_group_impression user_id => array of group ids the
     *      user has already seen; a user without an entry is treated as
     *      having seen no groups
     */
    public function computeGroupUserRecommendations($group_embeddings,
        $group_user_embeddings, $user_groups, $user_group_impression)
    {
        $db = $this->db;
        $invite_groups_sql = "SELECT GROUP_ID FROM SOCIAL_GROUPS" .
            " WHERE REGISTER_TYPE = ?";
        $results = $db->execute($invite_groups_sql, [C\INVITE_ONLY_JOIN]);
        $exclude_group_ids = [];
        while ($row = $db->fetchArray($results)) {
            $exclude_group_ids[] = $row['GROUP_ID'];
        }
        $group_user_recommendations = [];
        $user_count = 0;
        foreach ($group_user_embeddings as $user_id => $embedding) {
            L\crawlTimeoutLog("Have done $user_count many user");
            $embedding = unpack("E*", $embedding);
            /* Looked up once per user rather than once per group. The
               ?? [] guards keep in_array() from receiving null (a TypeError
               on PHP 8) when a user has no entry in one of the maps.
             */
            $member_group_ids = $user_groups[$user_id] ?? [];
            $seen_group_ids = $user_group_impression[$user_id] ?? [];
            $group_count = 0;
            foreach ($group_embeddings as $group_id => $group_embedding) {
                L\crawlTimeoutLog("Have done $group_count many groups");
                if (in_array($group_id, $exclude_group_ids) ||
                    in_array($group_id, $member_group_ids) ||
                    in_array($group_id, $seen_group_ids)) {
                    continue;
                }
                $group_embedding = unpack("E*", $group_embedding);
                $similarity = LinearAlgebra::similarity($embedding,
                    $group_embedding);
                $group_user_recommendations[] = [$user_id, $group_id,
                    $similarity];
                $group_count++;
            }
            $user_count++;
        }
        // old group recommendations are dropped before the new ones go in
        $delete_sql = "DELETE FROM GROUP_ITEM_RECOMMENDATION WHERE" .
            " ITEM_TYPE = ?";
        $db->execute($delete_sql, [C\GROUP_RECOMMENDATION]);
        $base_insert_sql = "INSERT INTO GROUP_ITEM_RECOMMENDATION VALUES ";
        $insert_sql = $base_insert_sql;
        $comma = "";
        $insert_count = 0;
        $total_insert = 0;
        $item_type = C\GROUP_RECOMMENDATION;
        foreach ($group_user_recommendations as $recommendation) {
            L\crawlTimeoutLog("Have inserted $total_insert recommendations");
            [$user_id, $group_id, $similarity] = $recommendation;
            $insert_sql .= "$comma($user_id, $group_id" .
                ", $item_type, $similarity, {$this->update_time})";
            $comma = ",";
            $insert_count++;
            $total_insert++;
            // send a multi-row insert every BATCH_SQL_INSERT_NUM rows
            if ($insert_count == C\BATCH_SQL_INSERT_NUM) {
                $insert_sql = $db->insertIgnore($insert_sql);
                $db->execute($insert_sql);
                $insert_count = 0;
                $comma = "";
                $insert_sql = $base_insert_sql;
            }
        }
        // flush whatever is left over from the last partial batch
        if ($insert_count > 0) {
            $insert_sql = $db->insertIgnore($insert_sql);
            $db->execute($insert_sql);
        }
    }
    /**
     * Drives the computation of wiki resource recommendations for users:
     * fetches resource descriptions, computes term embeddings, computes
     * resource embeddings, writes the cached term embeddings back to the
     * database, computes user embeddings, and finally computes and stores
     * the recommendations. Progress is logged before and after each stage
     */
    public function computeWikiResourceRecommendations()
    {
        $item_type = C\RESOURCE_RECOMMENDATION;
        L\crawlLog("...Start fetching descriptions for the wiki page resour" .
            "ces...");
        $fetched = $this->getWikiResourceDescriptions();
        $descriptions = $fetched[0];
        $metadata = $fetched[1];
        L\crawlLog("...Finished fetching descriptions for the wiki page res" .
            "ources...");
        L\crawlLog("...Start computing wiki term embeddings...");
        $term_lists = $this->computeWikiTermEmbeddings($descriptions);
        $body_terms = $term_lists[0];
        $meta_terms = $term_lists[1];
        L\crawlLog("...Finished computing wiki term embeddings...");
        L\crawlLog("...Start computing wiki resource embeddings...");
        $resource_embeddings = $this->computeWikiResourceEmbeddings(
            $body_terms, $meta_terms);
        L\crawlLog("...Finished computing wiki resource embeddings...");
        // term lists are no longer needed once resource embeddings exist
        unset($body_terms, $meta_terms);
        L\crawlLog("...Start write back term embeddings from cache to db");
        $this->saveTermEmbeddingsCacheToDb($item_type);
        L\crawlLog("...Finished write back term embeddings from cache to db");
        L\crawlLog("...Start computing wiki user embeddings...");
        $user_data = $this->computeWikiUserEmbeddings($resource_embeddings);
        $viewer_embeddings = $user_data[0];
        $viewer_items = $user_data[1];
        L\crawlLog("...Finished computing wiki user embeddings...");
        L\crawlLog("...Start computing wiki resource recommendations...");
        $this->computeWikiUserRecommendations($resource_embeddings,
            $viewer_embeddings, $viewer_items, $metadata);
        L\crawlLog("...Done computing wiki resource recommendations...");
        unset($viewer_embeddings, $viewer_items, $resource_embeddings,
            $metadata);
    }
    /**
     * Fetches the descriptions of wiki resources found under the folders
     * listed in RECOMMENDATION_FILE. Each non-empty line of that file is
     * expected to have the form group_id###page_id###folder; lines with fewer
     * than three fields are skipped. Once MAX_BATCH_SIZE descriptions have
     * been collected, the lines for folders not yet visited are written back
     * to RECOMMENDATION_FILE and the batch is returned
     *
     * @return array [$descriptions, $resource_metadata] where $descriptions
     *      maps resource_id => description text and $resource_metadata maps
     *      resource_id => [group_id, page_id, resource file path]
     */
    public function getWikiResourceDescriptions()
    {
        $thumb_folders = [];
        if (file_exists(self::RECOMMENDATION_FILE)) {
            $folder_list = file_get_contents(self::RECOMMENDATION_FILE);
            if ($folder_list !== false) {
                $thumb_folders = explode("\n", $folder_list);
            }
        }
        $thumb_folders = array_unique($thumb_folders);
        // shrinks in step with the loop so it always holds unvisited lines
        $thumb_folders_copy = $thumb_folders;
        $descriptions = [];
        $resource_metadata = [];
        foreach ($thumb_folders as $thumb_folder) {
            array_shift($thumb_folders_copy);
            if (empty($thumb_folder)) {
                continue;
            }
            $fields = explode("###", $thumb_folder);
            if (count($fields) < 3) {
                // malformed line: there is no folder to scan
                continue;
            }
            [$group_id, $page_id, $folder] = $fields;
            $folder = trim($folder, " \n\r\t\v\x00");
            $files = $this->getDescriptionFiles($folder);
            foreach ($files as $file) {
                /* the last four characters of the path are dropped
                   (presumably a .txt suffix -- TODO confirm against the
                   code that writes the description files)
                 */
                $resource_file = substr($file, 0, strlen($file) - 4);
                /* 'n' unpacks an unsigned 16 bit value, so the id is the
                   first two bytes of the md5 digest. A resource whose id
                   collides with one already collected is skipped below.
                 */
                $resource_id = unpack('n', md5($group_id . $page_id .
                    $resource_file, true))[1];
                if (array_key_exists($resource_id, $descriptions)) {
                    continue;
                }
                $description = file_get_contents($file);
                if ($description === false) {
                    // unreadable file: do not record false as a description
                    continue;
                }
                if ($description === "Description search sources" .
                    " failed to find description.") {
                    continue;
                }
                $descriptions[$resource_id] = $description;
                $resource_metadata[$resource_id] = [$group_id,
                    $page_id, $resource_file];
                if (count($descriptions) >= self::MAX_BATCH_SIZE) {
                    L\crawlLog("Reached max resources limit");
                    file_put_contents(self::RECOMMENDATION_FILE,
                        implode(PHP_EOL, $thumb_folders_copy));
                    return [$descriptions, $resource_metadata];
                }
            }
        }
        return [$descriptions, $resource_metadata];
    }
    /**
     * Returns the paths of all files under a given thumb folder, descending
     * recursively into subfolders. The entries ".", "..",
     * "needs_description.txt", "subfolder_counts.txt" and ".DS_Store" are
     * ignored at every level
     *
     * @param string $thumb_folder path of a thumb folder
     * @return array list of file paths found; empty if $thumb_folder is not
     *      a directory or cannot be read
     */
    public function getDescriptionFiles($thumb_folder)
    {
        if (!is_dir($thumb_folder)) {
            return [];
        }
        $exclude_files = [".", "..", "needs_description.txt",
            "subfolder_counts.txt", ".DS_Store"];
        $files = scandir($thumb_folder);
        if ($files === false) {
            // scandir fails on, for example, an unreadable directory
            L\crawlLog("...Could not read folder $thumb_folder," .
                " skipping it...");
            return [];
        }
        $file_paths = [];
        foreach ($files as $file) {
            if (in_array($file, $exclude_files, true)) {
                continue;
            }
            $to_process = $thumb_folder . "/" . $file;
            if (is_dir($to_process)) {
                L\crawlLog("...$to_process is a folder," .
                    " looking files inside it...");
                // append in place rather than re-merging the whole list
                foreach ($this->getDescriptionFiles($to_process) as
                    $sub_file_path) {
                    $file_paths[] = $sub_file_path;
                }
            } else {
                $file_paths[] = $to_process;
            }
        }
        return $file_paths;
    }
    /**
     * Computes the embedding for new terms in the description of wiki
     * resources and updates the embedding of existing terms using Hash2Vec
     * approach. Each description is split into lines; a line that yields
     * fewer than CONTEXT_WINDOW_LENGTH terms is treated as meta detail, any
     * other line contributes to the resource's term sequence. For every term
     * in that sequence, the term and each of the up to CONTEXT_WINDOW_LENGTH
     * terms before it update one another's embeddings with a weight that
     * decays with their distance. Updated embeddings are kept in the LRU
     * cache this method creates
     *
     * @param array $descriptions resource_id => description text
     * @return array [$resource_terms, $meta_details_terms], each mapping
     *      resource_id => list of [term_id, term] pairs
     */
    public function computeWikiTermEmbeddings($descriptions)
    {
        $db = $this->db;
        $this->lru_cache = new LRUCache(self::MAX_TERM_EMBEDDINGS);
        // warm the cache with term embeddings already stored in the database
        $select_sql = "SELECT * FROM RECOMMENDATION_TERM_EMBEDDING WHERE " .
            "ITEM_TYPE = ? " . $db->limitOffset(self::MAX_TERM_EMBEDDINGS);
        $results = $db->execute($select_sql, [C\RESOURCE_RECOMMENDATION]);
        $resource_terms = [];
        $meta_details_terms = [];
        while ($row = $db->fetchArray($results)) {
            if (is_string($row['VECTOR'])) {
                $this->lru_cache->put($row['ID'],
                    base64_decode($row['VECTOR'], true));
            } else {
                // report through the job log rather than dumping to stdout
                L\crawlLog("Skipping term embedding with non-string " .
                    "VECTOR for term id " . ($row['ID'] ?? "unknown"));
            }
        }
        /* Population standard deviation of the distances
           1..CONTEXT_WINDOW_LENGTH; it scales the distance weighting below
         */
        $context_distance_sum = (self::CONTEXT_WINDOW_LENGTH *
            (self::CONTEXT_WINDOW_LENGTH + 1)) / 2.0;
        $mean = $context_distance_sum / self::CONTEXT_WINDOW_LENGTH;
        $carry = 0.0;
        for ($i = 1; $i <= self::CONTEXT_WINDOW_LENGTH; $i++) {
            $difference = $i - $mean;
            $carry += $difference * $difference;
        }
        $std_deviation = sqrt($carry / self::CONTEXT_WINDOW_LENGTH);
        $resource_count = 0;
        foreach ($descriptions as $resource_id => $description) {
            L\crawlTimeoutLog("Have processed $resource_count many resources");
            $resource_terms[$resource_id] = [];
            $meta_details_terms[$resource_id] = [];
            $description_parts = explode("\n", $description);
            foreach ($description_parts as $description_part) {
                $description_part = mb_strtolower($description_part);
                $terms = $this->cleanRemoveStopWords($description_part, true);
                /* array_merge puts the new line's terms in front of the
                   terms gathered from earlier lines
                 */
                if (count($terms) < self::CONTEXT_WINDOW_LENGTH) {
                    $meta_details_terms[$resource_id] = array_merge($terms,
                        $meta_details_terms[$resource_id]);
                } else {
                    $resource_terms[$resource_id] = array_merge($terms,
                        $resource_terms[$resource_id]);
                }
            }
            if (count($resource_terms[$resource_id]) > 0) {
                $terms = $resource_terms[$resource_id];
                $num_terms = count($terms);
                for ($i = 0; $i < $num_terms; $i++) {
                    L\crawlTimeoutLog("Have processed $i of $num_terms terms");
                    [$term_id, $term] = $terms[$i];
                    // coordinate in 1..EMBEDDING_VECTOR_SIZE for this term
                    $term_hash = unpack('N', hash(self::HASH_ALGORITHM, $term,
                        true))[1] % C\EMBEDDING_VECTOR_SIZE + 1;
                    // second hash decides if the term contributes +1 or -1
                    $term_sign_hash = hash(self::SIGN_HASH_ALGORITHM,
                        $term, true);
                    $term_sign = unpack('N', $term_sign_hash)[1]
                        % 2 == 0 ? -1 : 1;
                    $term_embedding = $this->getTermEmbedding($term_id,
                        C\RESOURCE_RECOMMENDATION);
                    $term_embedding = unpack("E*", $term_embedding);
                    // walk back over the preceding context window
                    for ($j = $i - 1; $j >= 0 &&
                        $j >= $i - self::CONTEXT_WINDOW_LENGTH; $j--) {
                        [$context_term_id, $context_term] = $terms[$j];
                        $context_term_embedding = $this->getTermEmbedding(
                            $context_term_id, C\RESOURCE_RECOMMENDATION);
                        $context_term_embedding = unpack("E*",
                            $context_term_embedding);
                        // weight falls off with the distance $i - $j
                        $weight = exp(-1 * pow(($i - $j) / $std_deviation, 2));
                        $context_term_hash = unpack('N', hash(
                            self::HASH_ALGORITHM, $context_term, true))[1] %
                            C\EMBEDDING_VECTOR_SIZE + 1;
                        $context_term_sign_hash = hash(
                            self::SIGN_HASH_ALGORITHM, $context_term, true);
                        $context_term_sign = unpack('N',
                            $context_term_sign_hash)[1] % 2 == 0 ? -1 : 1;
                        /* the update is symmetric: the term gains weight at
                           the context term's coordinate and vice versa
                         */
                        $term_embedding[$context_term_hash] +=
                            $context_term_sign * $weight;
                        $context_term_embedding[$term_hash] +=
                            $term_sign * $weight;
                        $context_term_embedding = pack("E*",
                            ...$context_term_embedding);
                        $this->updateTermEmbeddingCache($context_term_id,
                            $context_term_embedding, C\RESOURCE_RECOMMENDATION,
                            "resource_context_term_update");
                    }
                    $term_embedding = pack("E*", ...$term_embedding);
                    $this->updateTermEmbeddingCache($term_id, $term_embedding,
                        C\RESOURCE_RECOMMENDATION, "resource_term_update");
                }
                $resource_count++;
            }
        }
        return [$resource_terms, $meta_details_terms];
    }
    /**
     * Split the given text into terms, clean the terms by removing non
     * alphanumeric characters and remove the stop terms in order to reduce the
     * noise while calculating the embeddings
     *
     * @param string $text which needs to be processed
     * @param boolean $description_stop_word_flag to remove
     * words present in DESCRIPTION_STOP_WORDS
     * @return array list of [term_id, term] pairs where term_id is the
     *      value L\canonicalTerm returns for the term
     */
    public function cleanRemoveStopWords($text,
        $description_stop_word_flag = false)
    {
        $raw_terms = preg_split("/[\s,\/\._-]+/", $text);
        $terms = [];
        foreach ($raw_terms as $term) {
            /* The &rsquo entity has to be turned into a quote before the
               \W pass: that pass deletes '&' and ';', after which the
               entity can no longer be recognised and its letters "rsquo"
               would stay inside the term. The \W pass then removes the
               quote itself, along with any other quote characters.
             */
            $term = preg_replace("/&rsquo;?/", "'", $term);
            /* NOTE(review): no u modifier, so \W is matched byte-wise and
               also strips the bytes of multi-byte UTF-8 letters -- confirm
               whether non-ASCII terms are meant to survive
             */
            $term = preg_replace("/\W/", "", $term);
            if (strlen($term) > 0) {
                $terms[] = $term;
            }
        }
        $text_locale = L\guessLocaleFromString($text);
        $stop_obj = PhraseParser::getTokenizer($text_locale);
        // not every tokenizer offers stop word removal
        if ($stop_obj && method_exists($stop_obj, "stoptermsRemover")) {
            $terms = $stop_obj->stoptermsRemover($terms);
        }
        $term_ids = [];
        foreach ($terms as $term) {
            if ($description_stop_word_flag &&
                in_array($term, self::DESCRIPTION_STOP_WORDS)) {
                continue;
            }
            $term_id = L\canonicalTerm($term);
            $term_ids[] = [$term_id, $term];
        }
        return $term_ids;
    }
    /**
     * Computes the embeddings of wiki page resources. A resource's embedding
     * starts as the sum of the embeddings of its description terms; each meta
     * detail term longer than one character then adds +1 or -1 at a hashed
     * coordinate; finally the vector is normalized. The stored
     * RESOURCE_RECOMMENDATION rows of RECOMMENDATION_ITEM_EMBEDDING are
     * replaced by the new embeddings
     *
     * @param array $resource_terms resource_id => list of [term_id, term]
     *      pairs taken from the resource description
     * @param array $meta_details_terms resource_id => list of
     *      [term_id, term] pairs treated as meta details
     * @return array resource_id => packed ("E*") normalized embedding
     */
    public function computeWikiResourceEmbeddings($resource_terms,
        $meta_details_terms)
    {
        $db = $this->db;
        $dimension = C\EMBEDDING_VECTOR_SIZE;
        $item_type = C\RESOURCE_RECOMMENDATION;
        $embeddings = [];
        $resources_done = 0;
        // pass 1: sum the term embeddings of each resource
        foreach ($resource_terms as $resource_id => $term_pairs) {
            L\crawlTimeoutLog("Have processed $resources_done many resources");
            $sum = array_fill(1, $dimension, 0);
            foreach ($term_pairs as $term_pair) {
                $packed_term = $this->getTermEmbedding($term_pair[0],
                    C\RESOURCE_RECOMMENDATION, true);
                $sum = LinearAlgebra::add($sum, unpack("E*", $packed_term));
            }
            $embeddings[$resource_id] = pack("E*", ...$sum);
            $resources_done++;
        }
        // pass 2: fold the meta detail terms in at their hashed coordinates
        foreach ($meta_details_terms as $resource_id => $meta_pairs) {
            $vector = array_key_exists($resource_id, $embeddings) ?
                unpack("E*", $embeddings[$resource_id]) :
                array_fill(1, $dimension, 0);
            foreach ($meta_pairs as $meta_pair) {
                $meta_term = $meta_pair[1];
                if (strlen($meta_term) <= 1) {
                    continue;
                }
                $digest = hash(self::HASH_ALGORITHM, $meta_term, true);
                $slot = (unpack('N', $digest)[1] % $dimension) + 1;
                $sign_digest = hash(self::SIGN_HASH_ALGORITHM, $meta_term,
                    true);
                $sign = (unpack('N', $sign_digest)[1] % 2 == 0) ? -1 : 1;
                $vector[$slot] += $sign * 1.0;
            }
            $embeddings[$resource_id] = pack("E*", ...$vector);
        }
        // pass 3: normalize every embedding
        foreach ($embeddings as $resource_id => $packed) {
            $normalized = LinearAlgebra::normalize(unpack("E*", $packed));
            $embeddings[$resource_id] = pack("E*", ...$normalized);
        }
        $db->execute("DELETE FROM RECOMMENDATION_ITEM_EMBEDDING WHERE" .
            " ITEM_TYPE = ?", [C\RESOURCE_RECOMMENDATION]);
        $insert_prefix = "INSERT INTO RECOMMENDATION_ITEM_EMBEDDING VALUES ";
        $pending_rows = [];
        $inserted = 0;
        foreach ($embeddings as $resource_id => $packed) {
            L\crawlTimeoutLog("Have inserted $inserted many resources");
            $encoded = base64_encode($packed);
            $pending_rows[] = "($resource_id, $item_type, '$encoded', " .
                "$resource_id)";
            $inserted++;
            // a full batch goes out as a single multi-row insert
            if (count($pending_rows) == C\BATCH_SQL_INSERT_NUM) {
                $db->execute($db->insertIgnore($insert_prefix .
                    implode(",", $pending_rows)));
                $pending_rows = [];
            }
        }
        if (count($pending_rows) > 0) {
            $db->execute($db->insertIgnore($insert_prefix .
                implode(",", $pending_rows)));
        }
        return $embeddings;
    }
    /**
     * Builds one embedding per user for wiki resource recommendations. For
     * each user (other than USER_ID 2) the resource ids found in
     * ITEM_IMPRESSION_SUMMARY, either for the current UPDATE_PERIOD bucket or
     * as most recent view rows, are looked up in $item_embeddings; the
     * embeddings found are summed and the sum is normalized
     *
     * @param array $item_embeddings resource_id => packed ("E*") embedding
     * @return array [$user_embeddings, $user_items] where the first maps
     *      user_id => packed normalized embedding and the second maps
     *      user_id => ids of the viewed resources that had an embedding
     */
    public function computeWikiUserEmbeddings($item_embeddings)
    {
        $db = $this->db;
        // SQLITE and MYSQL use GROUP_CONCAT, anything else gets STRING_AGG
        $uses_group_concat = in_array($db->to_upper_dbms,
            ["SQLITE3", "MYSQL"]);
        $aggregate = $uses_group_concat ? "GROUP_CONCAT" : "STRING_AGG";
        // start of the update period that contains the current time
        $bucket_start = floor(time() / self::UPDATE_PERIOD) *
            self::UPDATE_PERIOD;
        $filter = "ITEM_TYPE = ? AND USER_ID <> 2 AND ((UPDATE_PERIOD = ? " .
            "AND UPDATE_TIMESTAMP = ?) OR (UPDATE_PERIOD = ?))";
        $query = "SELECT USER_ID, $aggregate( CAST(ITEM_ID AS VARCHAR), " .
            "',') AS ITEM_IDS FROM ITEM_IMPRESSION_SUMMARY WHERE $filter " .
            "GROUP BY USER_ID";
        $query_values = [C\RESOURCE_IMPRESSION, self::UPDATE_PERIOD,
            $bucket_start, C\MOST_RECENT_VIEW];
        $impressions = $db->execute($query, $query_values);
        $user_embeddings = [];
        $user_items = [];
        $processed = 0;
        while ($row = $db->fetchArray($impressions)) {
            L\crawlTimeoutLog("Have processed $processed many users");
            $user_id = $row['USER_ID'];
            // unpack("E*") yields 1-based keys, so the accumulator is too
            $total = array_fill(1, C\EMBEDDING_VECTOR_SIZE, 0);
            $known_items = [];
            foreach (array_unique(explode(",", $row['ITEM_IDS'])) as
                $item_id) {
                if (!array_key_exists($item_id, $item_embeddings)) {
                    continue;
                }
                $total = LinearAlgebra::add($total,
                    unpack("E*", $item_embeddings[$item_id]));
                $known_items[] = $item_id;
            }
            $user_embeddings[$user_id] = pack("E*",
                ...LinearAlgebra::normalize($total));
            $user_items[$user_id] = $known_items;
            $processed++;
        }
        return [$user_embeddings, $user_items];
    }
    /**
     * Computes the wiki resource recommendations based on cosine similarity
     * between resource embeddings and user embeddings, then replaces the
     * contents of GROUP_RESOURCE_RECOMMENDATION with the result. Every user
     * is scored against every resource that has metadata and that is not
     * listed for that user in $user_items
     *
     * @param array $item_embeddings of wiki resources embeddings
     * @param array $user_embeddings of users consumed wiki resources
     * embeddings
     * @param array $user_items of users consumed wiki resources; a user
     *      without an entry is treated as having consumed nothing
     * @param array $resource_metadata resource_id =>
     *      [group_id, page_id, resource path]
     */
    public function computeWikiUserRecommendations($item_embeddings,
        $user_embeddings, $user_items, $resource_metadata)
    {
        $db = $this->db;
        $recommendations = [];
        $user_count = 0;
        foreach ($user_embeddings as $user_id => $user_embedding) {
            L\crawlTimeoutLog("Have processed $user_count many users");
            $user_embedding = unpack("E*", $user_embedding);
            // guard: in_array() must not receive null for unknown users
            $seen_item_ids = $user_items[$user_id] ?? [];
            $resource_count = 0;
            foreach ($item_embeddings as $item_id => $item_embedding) {
                L\crawlTimeoutLog("Have processed $resource_count resources");
                if (in_array($item_id, $seen_item_ids) ||
                    !array_key_exists($item_id, $resource_metadata)) {
                    continue;
                }
                $item_embedding = unpack("E*", $item_embedding);
                $similarity = LinearAlgebra::similarity($user_embedding,
                    $item_embedding);
                /* The metadata entry is deliberately kept: removing it
                   here would hide the resource from every later user, so
                   only the first user reached could be recommended it.
                 */
                [$group_id, $page_id, $resource_path] =
                    $resource_metadata[$item_id];
                $recommendations[] = [$user_id, $group_id, $page_id,
                    $resource_path, $similarity, $item_id];
                $resource_count++;
            }
            $user_count++;
        }
        // the table is emptied before the new recommendations go in
        $delete_sql = "DELETE FROM GROUP_RESOURCE_RECOMMENDATION";
        $db->execute($delete_sql);
        $base_insert_sql = "INSERT INTO GROUP_RESOURCE_RECOMMENDATION " .
            "VALUES ";
        $insert_sql = $base_insert_sql;
        $comma = "";
        $insert_count = 0;
        $total_insert = 0;
        $time = $this->update_time;
        foreach ($recommendations as $recommendation) {
            L\crawlTimeoutLog("Have inserted $total_insert recommendations");
            [$user_id, $group_id, $page_id, $resource_path,
                $score, $item_id] = $recommendation;
            /* The path is built from file names on disk and is placed
               inside a quoted SQL literal, so embedded single quotes are
               doubled to keep the statement well formed.
             */
            $quoted_path = str_replace("'", "''", $resource_path);
            $insert_sql .= "$comma($user_id, $group_id, $page_id, " .
                "'$quoted_path', $score, $time, $item_id)";
            $comma = ",";
            $insert_count++;
            $total_insert++;
            // send a multi-row insert every BATCH_SQL_INSERT_NUM rows
            if ($insert_count == C\BATCH_SQL_INSERT_NUM) {
                $insert_sql = $db->insertIgnore($insert_sql);
                $db->execute($insert_sql);
                $insert_count = 0;
                $comma = "";
                $insert_sql = $base_insert_sql;
            }
        }
        // flush whatever is left over from the last partial batch
        if ($insert_count > 0) {
            $insert_sql = $db->insertIgnore($insert_sql);
            $db->execute($insert_sql);
        }
    }
    /**
     * Returns the term embedding either from LRU cache or database. If the
     * term is in neither, or its stored VECTOR cannot be base64 decoded, a
     * zero vector of EMBEDDING_VECTOR_SIZE components is returned instead
     *
     * @param int $term_id
     * @param int $item_type
     * @param boolean $update indicates whether to update the cache
     * @return string $term_embedding packed ("E*") embedding
     */
    public function getTermEmbedding($term_id, $item_type, $update = false)
    {
        $db = $this->db;
        $term_embedding = $this->lru_cache->get($term_id);
        if (!isset($term_embedding)) {
            $sql = "SELECT VECTOR FROM RECOMMENDATION_TERM_EMBEDDING " .
                "WHERE ITEM_TYPE = ? AND ID = ? " . $db->limitOffset(1);
            $result = $db->execute($sql, [$item_type, $term_id]);
            $row = null;
            if ($result) {
                $row = $db->fetchArray($result);
                // release the cursor whether or not a row came back
                $db->closeCursor($result);
            }
            $term_embedding = false;
            if ($row && isset($row['VECTOR']) && is_string($row['VECTOR'])) {
                // strict decoding gives false for malformed base64
                $term_embedding = base64_decode($row['VECTOR'], true);
            }
            if ($term_embedding === false) {
                $term_embedding = pack("E*", ...array_fill(1,
                    C\EMBEDDING_VECTOR_SIZE, 0.0));
            }
        }
        if ($update) {
            $this->updateTermEmbeddingCache($term_id, $term_embedding,
                $item_type, "get_term_embedding");
        }
        return $term_embedding;
    }
    /**
     * Updates LRU cache of term embeddings and save the evicted
     * embedding back to database. The evicted row is inserted into
     * RECOMMENDATION_TERM_EMBEDDING, or has its VECTOR overwritten if a row
     * for it already exists
     *
     * @param int $term_id
     * @param string $term_embedding
     * @param int $item_type
     * @param string $message prefix for the text assigned to the database
     *      object's pre_message field when an eviction is written back
     */
    public function updateTermEmbeddingCache($term_id, $term_embedding,
        $item_type, $message = "")
    {
        $db = $this->db;
        $evicted_item = $this->lru_cache->put($term_id, $term_embedding);
        if (isset($evicted_item)) {
            /* MySQL spells the upsert ON DUPLICATE KEY UPDATE col = value;
               the ON CONFLICT ... DO UPDATE SET form is used for the other
               database systems
             */
            $on_conflict = in_array($db->to_upper_dbms, ["MYSQL"]) ?
                "ON DUPLICATE KEY UPDATE VECTOR = ?" :
                "ON CONFLICT (ITEM_TYPE, ID) DO UPDATE SET VECTOR = ?";
            $sql = "INSERT INTO RECOMMENDATION_TERM_EMBEDDING VALUES ".
                "(?, ?, ?) $on_conflict";
            // $evicted_item[0] is used as the ID, $evicted_item[1] as VECTOR
            $vector = base64_encode($evicted_item[1]);
            $db->pre_message = "$message {$evicted_item[0]} was evicted";
            $db->execute($sql, [$evicted_item[0], $item_type,
                $vector, $vector]);
        }
    }
    /**
     * Persists every term embedding held in the LRU cache to
     * RECOMMENDATION_TERM_EMBEDDING and then discards the cache. Work is done
     * in batches of BATCH_SQL_INSERT_NUM terms: the rows for a batch's ids
     * are deleted first and the batch is then inserted
     *
     * @param int $item_type value for ITEM_TYPE column
     */
    public function saveTermEmbeddingsCacheToDb($item_type)
    {
        L\crawlLog("Doing final persistence flush of LRU cache for $item_type");
        $db = $this->db;
        $delete_prefix = "DELETE FROM RECOMMENDATION_TERM_EMBEDDING WHERE " .
            "ITEM_TYPE = ? AND ID IN (";
        $insert_prefix = "INSERT INTO RECOMMENDATION_TERM_EMBEDDING VALUES ";
        // deletes then inserts one batch; the delete always runs first
        $write_batch = function ($id_list, $value_rows) use ($db,
            $item_type, $delete_prefix, $insert_prefix) {
            $db->execute($delete_prefix . implode(",", $id_list) . ")",
                [$item_type]);
            $db->execute($db->insertIgnore($insert_prefix .
                implode(",", $value_rows)));
        };
        $id_list = [];
        $value_rows = [];
        $written = 0;
        foreach ($this->lru_cache->getAll() as $id => $packed) {
            L\crawlTimeoutLog("Have inserted $written many embeddings");
            $encoded = base64_encode($packed);
            $value_rows[] = "('$id', $item_type, '$encoded')";
            $id_list[] = " '$id'";
            $written++;
            if (count($value_rows) == C\BATCH_SQL_INSERT_NUM) {
                $write_batch($id_list, $value_rows);
                $id_list = [];
                $value_rows = [];
            }
        }
        if (count($value_rows) > 0) {
            $write_batch($id_list, $value_rows);
        }
        unset($this->lru_cache);
    }
}
ViewGit