Fix out of memory issue caused by Recommendation media job, r=chris

Parth Patel [2022-12-02]
Fix out of memory issue caused by Recommendation media job, r=chris

Signed-off-by: Chris Pollett <chris@pollett.org>
Filename
src/library/media_jobs/RecommendationJob.php
diff --git a/src/library/media_jobs/RecommendationJob.php b/src/library/media_jobs/RecommendationJob.php
index 6f316e9b7..8ff295a92 100644
--- a/src/library/media_jobs/RecommendationJob.php
+++ b/src/library/media_jobs/RecommendationJob.php
@@ -395,14 +395,6 @@ class RecommendationJob extends MediaJob
     public function computeItemEmbeddings($term_embeddings, $item_terms)
     {
         $db = $this->db;
-        $sql = "SELECT * FROM RECOMMENDATION_ITEM_EMBEDDING" .
-            " WHERE ITEM_TYPE = ?";
-        $results = $db->execute($sql, [C\THREAD_RECOMMENDATION]);
-        $item_embeddings = [];
-        while ($row = $db->fetchArray($results)) {
-            $item_embeddings[$row['ID']] = [unserialize($row['VECTOR']),
-                $row['PARENT_ID']];
-        }
         $updated_item_embeddings = [];
         foreach ($item_terms as $item_id => [$terms, $group_id]) {
             $item_embedding = array_fill(0, self::EMBEDDING_VECTOR_SIZE, 0);
@@ -412,21 +404,12 @@ class RecommendationJob extends MediaJob
                         $term_embeddings[$term_id]);
                 }
             }
+            $item_embedding = LinearAlgebra::normalize($item_embedding);
             $updated_item_embeddings[$item_id] = [$item_embedding, $group_id];
-            if (array_key_exists($item_id, $item_embeddings)) {
-                unset($item_embeddings[$item_id]);
-            }
-        }
-        foreach ($item_embeddings as $item_id => [$embedding, $parent_id]) {
-            $updated_item_embeddings[$item_id] = [$embedding, $parent_id];
-        }
-        foreach ($updated_item_embeddings as $item_id => $embedding) {
-            $updated_item_embeddings[$item_id][0] = LinearAlgebra::normalize(
-                $updated_item_embeddings[$item_id][0]);
         }
-        $delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING" .
-            " WHERE ITEM_TYPE = ?";
-        $db->execute($delete_sql, [C\THREAD_RECOMMENDATION]);
+        $base_delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING" .
+            " WHERE ITEM_TYPE = ? AND ID IN (";
+        $delete_sql = $base_delete_sql;
         $base_insert_sql = "INSERT INTO RECOMMENDATION_ITEM_EMBEDDING VALUES ";
         $insert_sql = $base_insert_sql;
         $comma = "";
@@ -437,17 +420,23 @@ class RecommendationJob extends MediaJob
             $serialized_embedding = serialize($embedding);
             $insert_sql .= "$comma($item_id, $item_type," .
                 " '$serialized_embedding', $parent_id)";
+            $delete_sql .= "$comma $item_id";
             $comma = ",";
             $insert_count++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
+                $delete_sql .= ")";
+                $db->execute($delete_sql, [C\THREAD_RECOMMENDATION]);
                 $insert_sql = $db->insertIgnore($insert_sql);
                 $db->execute($insert_sql);
                 $insert_count = 0;
                 $comma = "";
+                $delete_sql = $base_delete_sql;
                 $insert_sql = $base_insert_sql;
             }
         }
         if ($insert_count > 0) {
+            $delete_sql .= ")";
+            $db->execute($delete_sql, [C\THREAD_RECOMMENDATION]);
             $insert_sql = $db->insertIgnore($insert_sql);
             $db->execute($insert_sql);
         }
@@ -587,13 +576,6 @@ class RecommendationJob extends MediaJob
     public function computeGroupEmbeddings($item_embeddings)
     {
         $db = $this->db;
-        $sql = "SELECT * FROM RECOMMENDATION_ITEM_EMBEDDING" .
-            " WHERE ITEM_TYPE = ?";
-        $results = $db->execute($sql, [C\GROUP_RECOMMENDATION]);
-        $group_embeddings = [];
-        while ($row = $db->fetchArray($results)) {
-            $group_embeddings[$row['ID']] = unserialize($row['VECTOR']);
-        }
         $updated_group_embeddings = [];
         foreach ($item_embeddings as $item_id => [$embedding, $parent_id]) {
             if (array_key_exists($parent_id, $updated_group_embeddings)) {
@@ -605,17 +587,11 @@ class RecommendationJob extends MediaJob
         }
         foreach ($updated_group_embeddings as $group_id => $embedding) {
             $embedding = LinearAlgebra::normalize($embedding);
-            if (array_key_exists($group_id, $group_embeddings)) {
-                $embedding = LinearAlgebra::add($embedding,
-                    $group_embeddings[$group_id]);
-                $embedding = LinearAlgebra::normalize($embedding);
-                unset($group_embeddings[$group_id]);
-            }
-            $updated_group_embeddings[$group_id] = $embedding;
-        }
-        foreach ($group_embeddings as $group_id => $embedding) {
             $updated_group_embeddings[$group_id] = $embedding;
         }
+        $base_delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING" .
+            " WHERE ITEM_TYPE = ? AND ID IN (";
+        $delete_sql = $base_delete_sql;
         $base_insert_sql = "INSERT INTO RECOMMENDATION_ITEM_EMBEDDING VALUES ";
         $insert_sql = $base_insert_sql;
         $comma = "";
@@ -625,17 +601,23 @@ class RecommendationJob extends MediaJob
             $serialized_embedding = serialize($embedding);
             $insert_sql .= "$comma($group_id, $item_type," .
                 " '$serialized_embedding', $group_id)";
+            $delete_sql .= "$comma $group_id";
             $comma = ",";
             $insert_count++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
+                $delete_sql .= ")";
+                $db->execute($delete_sql, [C\GROUP_RECOMMENDATION]);
                 $insert_sql = $db->insertIgnore($insert_sql);
                 $db->execute($insert_sql);
                 $insert_count = 0;
                 $comma = "";
+                $delete_sql = $base_delete_sql;
                 $insert_sql = $base_insert_sql;
             }
         }
         if ($insert_count > 0) {
+            $delete_sql .= ")";
+            $db->execute($delete_sql, [C\GROUP_RECOMMENDATION]);
             $insert_sql = $db->insertIgnore($insert_sql);
             $db->execute($insert_sql);
         }
@@ -764,14 +746,13 @@ class RecommendationJob extends MediaJob
             getWikiResourceDescriptions();
         L\crawlLog("...Finished fetching descriptions for the wiki page " .
             "resources...");
-        $item_embeddings = $this->getWikiResourceEmbeddings();
         L\crawlLog("...Start computing wiki term embeddings...");
         [$term_embeddings, $resource_terms, $meta_details_terms] =
-            $this->computeWikiTermEmbeddings($descriptions, $item_embeddings);
+            $this->computeWikiTermEmbeddings($descriptions);
         L\crawlLog("...Finished computing wiki term embeddings...");
         L\crawlLog("...Start computing wiki resource embeddings...");
         $item_embeddings = $this->computeWikiResourceEmbeddings($resource_terms,
-            $meta_details_terms, $term_embeddings, $item_embeddings);
+            $meta_details_terms, $term_embeddings);
         L\crawlLog("...Finished computing wiki resource embeddings...");
         L\crawlLog("...Start computing wiki user embeddings...");
         [$user_embeddings, $user_items] = $this->computeWikiUserEmbeddings(
@@ -852,36 +833,18 @@ class RecommendationJob extends MediaJob
         }
         return $file_paths;
     }
-    /**
-     * Retrieves wiki resources embeddings from the database
-     *
-     * @return array $item_embeddings embedding vector for resources
-     */
-    public function getWikiResourceEmbeddings()
-    {
-        $db = $this->db;
-        $sql = "SELECT * FROM RECOMMENDATION_ITEM_EMBEDDING WHERE" .
-            " ITEM_TYPE = ?";
-        $results = $db->execute($sql, [C\RESOURCE_RECOMMENDATION]);
-        $item_embeddings = [];
-        while ($row = $db->fetchArray($results)) {
-            $item_embeddings[$row['ID']] = unserialize($row['VECTOR']);
-        }
-        return $item_embeddings;
-    }
     /**
      * Computes the embedding for new terms in the description of wiki
      * resources and updates the embedding of existing terms using Hash2Vec
      * approach
      *
      * @param array $descriptions of resources
-     * @param array $item_embeddings embedding vector for resources
      * @return array [$term_embeddings, $resource_terms, $meta_details_term]
      * first with key being term id and value is the embedding vector for that
      * term, second with key being resource id and value being array of clean
      * terms in that resource description
      */
-    public function computeWikiTermEmbeddings($descriptions, $item_embeddings)
+    public function computeWikiTermEmbeddings($descriptions)
     {
         $db = $this->db;
         $select_sql = "SELECT * FROM RECOMMENDATION_TERM_EMBEDDING WHERE" .
@@ -917,9 +880,6 @@ class RecommendationJob extends MediaJob
                         $resource_terms[$resource_id]);
                 }
             }
-            if (array_key_exists($resource_id, $item_embeddings)) {
-                continue;
-            }
             if (count($resource_terms[$resource_id]) > 0) {
                 $terms = $resource_terms[$resource_id];
                 for ($i = 0; $i < count($terms); $i++) {
@@ -1018,12 +978,11 @@ class RecommendationJob extends MediaJob
      * @param array $resource_terms of processed terms from resource description
      * @param array $meta_details_terms of raw resource descriptions
      * @param array $term_embeddings of term embeddings
-     * @param array $item_embeddings of existing wiki resource embeddings
      * @return array $updated_item_embeddings array of updated wiki resource
      * embeddings
      */
     public function computeWikiResourceEmbeddings($resource_terms,
-        $meta_details_terms, $term_embeddings, $item_embeddings)
+        $meta_details_terms, $term_embeddings)
     {
         $db = $this->db;
         $updated_item_embeddings = [];
@@ -1036,12 +995,6 @@ class RecommendationJob extends MediaJob
                 }
             }
             $updated_item_embeddings[$resource_id] = $item_embedding;
-            if (array_key_exists($resource_id, $item_embeddings)) {
-                unset($item_embeddings[$resource_id]);
-            }
-        }
-        foreach ($item_embeddings as $resource_id => $embedding) {
-            $updated_item_embeddings[$resource_id] = $embedding;
         }
         foreach ($meta_details_terms as $resource_id => $meta_terms) {
             if (!array_key_exists($resource_id, $updated_item_embeddings)) {
ViewGit