diff --git a/src/library/media_jobs/RecommendationJob.php b/src/library/media_jobs/RecommendationJob.php
index 5d62cad8d..0c6263c6a 100644
--- a/src/library/media_jobs/RecommendationJob.php
+++ b/src/library/media_jobs/RecommendationJob.php
@@ -111,9 +111,10 @@ class RecommendationJob extends MediaJob
      */
     const MAX_TERM_EMBEDDINGS = 500;
     /**
-     * Maximum number of resources used in making resource recommendations
+     * Maximum number of resources used in making resource recommendations.
+     * Also the maximum number of group items to hold in memory in one go
      */
-    const MAX_RESOURCES = 200;
+    const MAX_BATCH_SIZE = 200;
     /**
      * Sets up the database connection so can access tables related
      * to recommendations. Initialize timing info related to job.
@@ -347,55 +348,91 @@ class RecommendationJob extends MediaJob
             $carry += $difference * $difference;
         }
         $std_deviation = sqrt($carry / self::CONTEXT_WINDOW_LENGTH);
-        $group_item_sql = "SELECT * FROM GROUP_ITEM WHERE ID = PARENT_ID" .
-            " AND TITLE NOT LIKE '%Page%' ORDER BY EDIT_DATE DESC " .
-            $db->limitOffset(self::MAX_GROUP_ITEMS);
-        $results = $db->execute($group_item_sql);
+        /* LIMIT on a COUNT(*) query restricts the one row returned, not
+           the rows counted, so MAX_GROUP_ITEMS is enforced below with
+           min() rather than in the SQL
+         */
+        $item_count_sql = "SELECT COUNT(*) AS NUM_ITEMS FROM GROUP_ITEM ".
+            "WHERE ID = PARENT_ID AND TITLE NOT LIKE '%Page%'";
+        $results = $db->execute($item_count_sql);
+        $num_items = 0;
+        if ($results) {
+            $row = $db->fetchArray($results);
+            $num_items = min(intval($row['NUM_ITEMS'] ?? 0),
+                self::MAX_GROUP_ITEMS);
+        }
+        $num_batches = ceil($num_items/self::MAX_BATCH_SIZE);
+        L\crawlLog("Number of group items will consider: " . $num_items);
         $item_count = 0;
-        while ($row = $db->fetchArray($results)) {
+        for ($item_batch = 0; $item_batch < $num_batches; $item_batch++) {
+            $batch_start = $item_batch * self::MAX_BATCH_SIZE;
+            // shorten the final batch so at most $num_items rows are read
+            $batch_size = min(self::MAX_BATCH_SIZE, $num_items - $batch_start);
+            /* ID breaks ties between equal EDIT_DATEs so the ordering is the
+               same for every batch query and no item is skipped or repeated
+             */
+            $group_item_sql = "SELECT * FROM GROUP_ITEM WHERE ID = PARENT_ID" .
+                " AND TITLE NOT LIKE '%Page%' " .
+                "ORDER BY EDIT_DATE DESC, ID DESC " .
+                $db->limitOffset($batch_start, $batch_size);
             L\crawlTimeoutLog("Have processed $item_count many group items");
-            $item_id = $row['ID'];
-            $text_corpus = $row['TITLE'] . " " . $row['DESCRIPTION'];
-            $text_corpus = mb_strtolower($text_corpus);
-            $terms = $this->cleanRemoveStopWords($text_corpus);
-            $item_terms[$item_id] = [$terms, $row['GROUP_ID']];
-            $num_terms = count($terms);
-            for ($i = 0; $i < $num_terms; $i++) {
-                L\crawlTimeoutLog("Have processed $i of $num_terms terms");
-                [$term_id, $term] = $terms[$i];
-                $term_hash = unpack('N', hash(self::HASH_ALGORITHM, $term, true)
-                    )[1] % C\EMBEDDING_VECTOR_SIZE + 1;
-                $term_sign_hash = hash(self::SIGN_HASH_ALGORITHM, $term, true);
-                $term_sign = unpack('N', $term_sign_hash)[1] % 2 == 0 ? -1 : 1;
-                $term_embedding = $this->getTermEmbedding($term_id,
-                    C\THREAD_RECOMMENDATION);
-                $term_embedding = unpack("E*", $term_embedding);
-                for ($j = $i - 1; $j >= 0 &&
-                    $j >= $i - self::CONTEXT_WINDOW_LENGTH; $j--) {
-                    [$context_term_id, $context_term] = $terms[$j];
-                    $context_term_embedding = $this->getTermEmbedding(
-                        $context_term_id, C\THREAD_RECOMMENDATION);
-                    $context_term_embedding = unpack("E*",
-                        $context_term_embedding);
-                    $weight = exp(-1 * pow(($i - $j) / $std_deviation, 2));
-                    $context_term_hash = unpack('N', hash(self::HASH_ALGORITHM,
-                        $context_term, true))[1] % C\EMBEDDING_VECTOR_SIZE + 1;
-                    $context_term_sign_hash = hash(self::SIGN_HASH_ALGORITHM,
-                        $context_term, true);
-                    $context_term_sign = unpack('N', $context_term_sign_hash)[1]
-                        % 2 == 0 ? -1 : 1;
-                    $term_embedding[$context_term_hash] +=
-                        $context_term_sign * $weight;
-                    $context_term_embedding[$term_hash] += $term_sign * $weight;
-                    $context_term_embedding = pack("E*",
-                        ...$context_term_embedding);
-                    $this->updateTermEmbeddingCache($context_term_id,
-                        $context_term_embedding, C\THREAD_RECOMMENDATION,
-                        "thread_context_term_update");
+            $results = $db->execute($group_item_sql);
+            $batch_items = [];
+            while ($row = $db->fetchArray($results)) {
+                $item_id = $row['ID'];
+                $text_corpus = $row['TITLE'] . " " . $row['DESCRIPTION'];
+                $text_corpus = mb_strtolower($text_corpus);
+                $terms = $this->cleanRemoveStopWords($text_corpus);
+                $batch_items[$item_id] = [$terms, $row['GROUP_ID']];
+            }
+            foreach ($batch_items as $item_id => $term_data) {
+                $item_terms[$item_id] = $term_data;
+                $terms = $term_data[0];
+                $num_terms = count($terms);
+                for ($i = 0; $i < $num_terms; $i++) {
+                    L\crawlTimeoutLog("Have processed $i of $num_terms terms");
+                    [$term_id, $term] = $terms[$i];
+                    $term_hash = unpack('N',
+                        hash(self::HASH_ALGORITHM, $term, true))[1] %
+                        C\EMBEDDING_VECTOR_SIZE + 1;
+                    $term_sign_hash =
+                        hash(self::SIGN_HASH_ALGORITHM, $term, true);
+                    $term_sign =
+                        unpack('N', $term_sign_hash)[1] % 2 == 0 ? -1 : 1;
+                    $term_embedding = $this->getTermEmbedding($term_id,
+                        C\THREAD_RECOMMENDATION);
+                    $term_embedding = unpack("E*", $term_embedding);
+                    for ($j = $i - 1; $j >= 0 &&
+                        $j >= $i - self::CONTEXT_WINDOW_LENGTH; $j--) {
+                        [$context_term_id, $context_term] = $terms[$j];
+                        $context_term_embedding = $this->getTermEmbedding(
+                            $context_term_id, C\THREAD_RECOMMENDATION);
+                        $context_term_embedding = unpack("E*",
+                            $context_term_embedding);
+                        $weight = exp(-1 * pow(($i - $j) / $std_deviation, 2));
+                        $context_term_hash =
+                            unpack('N', hash(self::HASH_ALGORITHM,
+                            $context_term, true))[1] %
+                            C\EMBEDDING_VECTOR_SIZE + 1;
+                        $context_term_sign_hash =
+                            hash(self::SIGN_HASH_ALGORITHM,$context_term, true);
+                        $context_term_sign =
+                            unpack('N', $context_term_sign_hash)[1] % 2 == 0 ?
+                            -1 : 1;
+                        $term_embedding[$context_term_hash] +=
+                            $context_term_sign * $weight;
+                        $context_term_embedding[$term_hash] +=
+                            $term_sign * $weight;
+                        $context_term_embedding = pack("E*",
+                            ...$context_term_embedding);
+                        $this->updateTermEmbeddingCache($context_term_id,
+                            $context_term_embedding, C\THREAD_RECOMMENDATION,
+                            "thread_context_term_update");
+                    }
+                    $term_embedding = pack("E*", ...$term_embedding);
+                    $this->updateTermEmbeddingCache($term_id, $term_embedding,
+                        C\THREAD_RECOMMENDATION, "thread_term_update");
                 }
-                $term_embedding = pack("E*", ...$term_embedding);
-                $this->updateTermEmbeddingCache($term_id, $term_embedding,
-                    C\THREAD_RECOMMENDATION, "thread_term_update");
+                $item_count++;
             }
-            $item_count++;
         }
@@ -882,7 +919,7 @@ class RecommendationJob extends MediaJob
                 $descriptions[$resource_id] = $description;
                 $resource_metadata[$resource_id] = [$group_id, $page_id,
                     $resource_file];
-                if (count($descriptions) >= self::MAX_RESOURCES) {
+                if (count($descriptions) >= self::MAX_BATCH_SIZE) {
                     L\crawlLog("Reached max resources limit");
                     file_put_contents(self::RECOMMENDATION_FILE,
                         implode(PHP_EOL, $thumb_folders_copy));
@@ -1282,7 +1319,6 @@ class RecommendationJob extends MediaJob
         $db = $this->db;
         $term_embedding = $this->lru_cache->get($term_id);
         if (!isset($term_embedding)) {
-            $db->beginTransaction();
             $sql = "SELECT VECTOR FROM RECOMMENDATION_TERM_EMBEDDING " .
                 "WHERE ITEM_TYPE = ? AND ID = ? " . $db->limitOffset(1);
             $result = $db->execute($sql, [$item_type, $term_id]);
@@ -1297,11 +1333,10 @@ class RecommendationJob extends MediaJob
                 $db->closeCursor($result);
                 $term_embedding = base64_decode($row['VECTOR'], true);
             }
-            $db->commit();
         }
         if ($update) {
             $this->updateTermEmbeddingCache($term_id, $term_embedding,
-                $item_type, "getTermEmbedding");
+                $item_type, "get_term_embedding");
         }
         return $term_embedding;
     }
@@ -1323,14 +1358,15 @@ class RecommendationJob extends MediaJob
             $on_conflict = in_array($db->to_upper_dbms, ["MYSQL"]) ? 
-                " ON DUPLICATE KEY " :
-                " ON CONFLICT (ITEM_TYPE, ID) DO UPDATE ";
-            $db->beginTransaction();
+                " ON DUPLICATE KEY UPDATE " :
+                " ON CONFLICT (ITEM_TYPE, ID) DO UPDATE SET ";
+            /* MySQL's upsert clause has no SET keyword, so the keyword is
+               part of $on_conflict rather than of the shared SQL below
+             */
             $sql = "INSERT INTO RECOMMENDATION_TERM_EMBEDDING VALUES ".
-                "(?, ?, ?) $on_conflict SET VECTOR = ?";
+                "(?, ?, ?) $on_conflict VECTOR = ?";
             $vector = base64_encode($evicted_item[1]);
             $db->pre_message = "$message {$evicted_item[0]} was evicted";
             $db->execute($sql, [$evicted_item[0], $item_type, $vector,
                 $vector]);
-            $db->commit();
         }
     }
     /**