Rewriting computeItemTermEmbeddings to batch rows, a=chris

Chris Pollett [2022-12-15, 23h]
Rewriting computeItemTermEmbeddings to batch rows, a=chris
Filename
src/library/media_jobs/RecommendationJob.php
diff --git a/src/library/media_jobs/RecommendationJob.php b/src/library/media_jobs/RecommendationJob.php
index 5d62cad8d..0c6263c6a 100644
--- a/src/library/media_jobs/RecommendationJob.php
+++ b/src/library/media_jobs/RecommendationJob.php
@@ -111,9 +111,10 @@ class RecommendationJob extends MediaJob
      */
     const MAX_TERM_EMBEDDINGS = 500;
     /**
-     * Maximum number of resources used in making resource recommendations
+     * Maximum number of resources used in making resource recommendations;
+     * also the maximum number of group items to hold in memory in one go
      */
-    const MAX_RESOURCES = 200;
+    const MAX_BATCH_SIZE = 200;
     /**
      * Sets up the database connection so can access tables related
      * to recommendations. Initialize timing info related to job.
@@ -347,55 +348,81 @@ class RecommendationJob extends MediaJob
             $carry += $difference * $difference;
         }
         $std_deviation = sqrt($carry / self::CONTEXT_WINDOW_LENGTH);
-        $group_item_sql = "SELECT * FROM GROUP_ITEM WHERE ID = PARENT_ID" .
-            " AND TITLE NOT LIKE '%Page%' ORDER BY EDIT_DATE DESC " .
+        $item_count_sql = "SELECT COUNT(*) AS NUM_ITEMS FROM GROUP_ITEM ".
+            "WHERE ID = PARENT_ID AND TITLE NOT LIKE '%Page%'" .
             $db->limitOffset(self::MAX_GROUP_ITEMS);
-        $results = $db->execute($group_item_sql);
+        $results = $db->execute($item_count_sql);
+        $num_items = 0;
+        if ($results) {
+            $row = $db->fetchArray($results);
+            $num_items = $row['NUM_ITEMS'] ?? 0;
+        }
+        $num_batches = ceil($num_items/self::MAX_BATCH_SIZE);
+        L\crawlLog("Number of group items will consider: " . $num_items);
         $item_count = 0;
-        while ($row = $db->fetchArray($results)) {
+        for ($item_batch = 0; $item_batch < $num_batches; $item_batch++) {
+            $group_item_sql = "SELECT * FROM GROUP_ITEM WHERE ID = PARENT_ID" .
+                " AND TITLE NOT LIKE '%Page%' ORDER BY EDIT_DATE DESC " .
+                $db->limitOffset($item_batch * self::MAX_BATCH_SIZE,
+                self::MAX_BATCH_SIZE);
             L\crawlTimeoutLog("Have processed $item_count many group items");
-            $item_id = $row['ID'];
-            $text_corpus = $row['TITLE'] . " " . $row['DESCRIPTION'];
-            $text_corpus = mb_strtolower($text_corpus);
-            $terms = $this->cleanRemoveStopWords($text_corpus);
-            $item_terms[$item_id] = [$terms, $row['GROUP_ID']];
-            $num_terms = count($terms);
-            for ($i = 0; $i < $num_terms; $i++) {
-                L\crawlTimeoutLog("Have processed $i of $num_terms terms");
-                [$term_id, $term] = $terms[$i];
-                $term_hash = unpack('N', hash(self::HASH_ALGORITHM, $term, true)
-                    )[1] % C\EMBEDDING_VECTOR_SIZE + 1;
-                $term_sign_hash = hash(self::SIGN_HASH_ALGORITHM, $term, true);
-                $term_sign = unpack('N', $term_sign_hash)[1] % 2 == 0 ? -1 : 1;
-                $term_embedding = $this->getTermEmbedding($term_id,
-                    C\THREAD_RECOMMENDATION);
-                $term_embedding = unpack("E*", $term_embedding);
-                for ($j = $i - 1; $j >= 0 &&
-                    $j >= $i - self::CONTEXT_WINDOW_LENGTH; $j--) {
-                    [$context_term_id, $context_term] = $terms[$j];
-                    $context_term_embedding = $this->getTermEmbedding(
-                        $context_term_id, C\THREAD_RECOMMENDATION);
-                    $context_term_embedding = unpack("E*",
-                        $context_term_embedding);
-                    $weight = exp(-1 * pow(($i - $j) / $std_deviation, 2));
-                    $context_term_hash = unpack('N', hash(self::HASH_ALGORITHM,
-                        $context_term, true))[1] % C\EMBEDDING_VECTOR_SIZE + 1;
-                    $context_term_sign_hash = hash(self::SIGN_HASH_ALGORITHM,
-                        $context_term, true);
-                    $context_term_sign = unpack('N', $context_term_sign_hash)[1]
-                        % 2 == 0 ? -1 : 1;
-                    $term_embedding[$context_term_hash] +=
-                        $context_term_sign * $weight;
-                    $context_term_embedding[$term_hash] += $term_sign * $weight;
-                    $context_term_embedding = pack("E*",
-                        ...$context_term_embedding);
-                    $this->updateTermEmbeddingCache($context_term_id,
-                        $context_term_embedding, C\THREAD_RECOMMENDATION,
-                        "thread_context_term_update");
+            $results = $db->execute($group_item_sql);
+            $batch_items = [];
+            while ($row = $db->fetchArray($results)) {
+                $item_id = $row['ID'];
+                $text_corpus = $row['TITLE'] . " " . $row['DESCRIPTION'];
+                $text_corpus = mb_strtolower($text_corpus);
+                $terms = $this->cleanRemoveStopWords($text_corpus);
+                $batch_items[$item_id] = [$terms, $row['GROUP_ID']];
+            }
+            foreach ($batch_items as $item_id => $term_data) {
+                $item_terms[$item_id] = $term_data;
+                $terms = $term_data[0];
+                $num_terms = count($terms);
+                for ($i = 0; $i < $num_terms; $i++) {
+                    L\crawlTimeoutLog("Have processed $i of $num_terms terms");
+                    [$term_id, $term] = $terms[$i];
+                    $term_hash = unpack('N',
+                        hash(self::HASH_ALGORITHM, $term, true))[1] %
+                        C\EMBEDDING_VECTOR_SIZE + 1;
+                    $term_sign_hash =
+                        hash(self::SIGN_HASH_ALGORITHM, $term, true);
+                    $term_sign =
+                        unpack('N', $term_sign_hash)[1] % 2 == 0 ? -1 : 1;
+                    $term_embedding = $this->getTermEmbedding($term_id,
+                        C\THREAD_RECOMMENDATION);
+                    $term_embedding = unpack("E*", $term_embedding);
+                    for ($j = $i - 1; $j >= 0 &&
+                        $j >= $i - self::CONTEXT_WINDOW_LENGTH; $j--) {
+                        [$context_term_id, $context_term] = $terms[$j];
+                        $context_term_embedding = $this->getTermEmbedding(
+                            $context_term_id, C\THREAD_RECOMMENDATION);
+                        $context_term_embedding = unpack("E*",
+                            $context_term_embedding);
+                        $weight = exp(-1 * pow(($i - $j) / $std_deviation, 2));
+                        $context_term_hash =
+                            unpack('N', hash(self::HASH_ALGORITHM,
+                            $context_term, true))[1] %
+                            C\EMBEDDING_VECTOR_SIZE + 1;
+                        $context_term_sign_hash =
+                            hash(self::SIGN_HASH_ALGORITHM,$context_term, true);
+                        $context_term_sign =
+                            unpack('N', $context_term_sign_hash)[1] % 2 == 0 ?
+                            -1 : 1;
+                        $term_embedding[$context_term_hash] +=
+                            $context_term_sign * $weight;
+                        $context_term_embedding[$term_hash] +=
+                            $term_sign * $weight;
+                        $context_term_embedding = pack("E*",
+                            ...$context_term_embedding);
+                        $this->updateTermEmbeddingCache($context_term_id,
+                            $context_term_embedding, C\THREAD_RECOMMENDATION,
+                            "thread_context_term_update");
+                    }
+                    $term_embedding = pack("E*", ...$term_embedding);
+                    $this->updateTermEmbeddingCache($term_id, $term_embedding,
+                        C\THREAD_RECOMMENDATION, "thread_term_update");
                 }
-                $term_embedding = pack("E*", ...$term_embedding);
-                $this->updateTermEmbeddingCache($term_id, $term_embedding,
-                    C\THREAD_RECOMMENDATION, "thread_term_update");
             }
             $item_count++;
         }
@@ -882,7 +909,7 @@ class RecommendationJob extends MediaJob
                 $descriptions[$resource_id] = $description;
                 $resource_metadata[$resource_id] = [$group_id,
                     $page_id, $resource_file];
-                if (count($descriptions) >= self::MAX_RESOURCES) {
+                if (count($descriptions) >= self::MAX_BATCH_SIZE) {
                     L\crawlLog("Reached max resources limit");
                     file_put_contents(self::RECOMMENDATION_FILE,
                         implode(PHP_EOL, $thumb_folders_copy));
@@ -1282,7 +1309,6 @@ class RecommendationJob extends MediaJob
         $db = $this->db;
         $term_embedding = $this->lru_cache->get($term_id);
         if (!isset($term_embedding)) {
-            $db->beginTransaction();
             $sql = "SELECT VECTOR FROM RECOMMENDATION_TERM_EMBEDDING " .
                 "WHERE ITEM_TYPE = ? AND ID = ? " . $db->limitOffset(1);
             $result = $db->execute($sql, [$item_type, $term_id]);
@@ -1297,11 +1323,10 @@ class RecommendationJob extends MediaJob
                 $db->closeCursor($result);
                 $term_embedding = base64_decode($row['VECTOR'], true);
             }
-            $db->commit();
         }
         if ($update) {
             $this->updateTermEmbeddingCache($term_id, $term_embedding,
-                $item_type, "getTermEmbedding");
+                $item_type, "get_term_embedding");
         }
         return $term_embedding;
     }
@@ -1323,14 +1348,12 @@ class RecommendationJob extends MediaJob
             $on_conflict = in_array($db->to_upper_dbms, ["MYSQL"]) ?
                 " ON DUPLICATE KEY " :
                 " ON CONFLICT (ITEM_TYPE, ID) DO UPDATE ";
-            $db->beginTransaction();
             $sql = "INSERT INTO RECOMMENDATION_TERM_EMBEDDING VALUES ".
                 "(?, ?, ?) $on_conflict SET VECTOR = ?";
             $vector = base64_encode($evicted_item[1]);
             $db->pre_message = "$message {$evicted_item[0]} was evicted";
             $db->execute($sql, [$evicted_item[0], $item_type,
                 $vector, $vector]);
-            $db->commit();
         }
     }
     /**
ViewGit