Add resource limit and timeout logs in Recommendation job, r=chris

Parth Patel [2022-12-12 06h; minutes and seconds not recoverable]
Add resource limit and timeout logs in Recommendation job, r=chris

Signed-off-by: Chris Pollett <chris@pollett.org>
Filename
src/library/media_jobs/RecommendationJob.php
diff --git a/src/library/media_jobs/RecommendationJob.php b/src/library/media_jobs/RecommendationJob.php
index 9259407bc..3c006f4f2 100644
--- a/src/library/media_jobs/RecommendationJob.php
+++ b/src/library/media_jobs/RecommendationJob.php
@@ -114,7 +114,11 @@ class RecommendationJob extends MediaJob
     /**
      * MAX term embeddings fetched from database to initialize LRUCache
      */
-    const MAX_TERM_EMBEDDINGS = 1000;
+    const MAX_TERM_EMBEDDINGS = 500;
+    /**
+     * Maximum number of resources used in making resource recommendations
+     */
+    const MAX_RESOURCES = 200;
     /**
      * Sets up the database connection so can access tables related
      * to recommendations. Initialize timing info related to job.
@@ -407,7 +411,9 @@ class RecommendationJob extends MediaJob
     {
         $db = $this->db;
         $updated_item_embeddings = [];
+        $item_count = 0;
         foreach ($item_terms as $item_id => [$terms, $group_id]) {
+            L\crawlTimeoutLog("Have done $item_count many group items");
             $item_embedding = array_fill(1, self::EMBEDDING_VECTOR_SIZE, 0);
             foreach ($terms as [$term_id, $term]) {
                 $term_embedding = $this->getTermEmbedding($term_id,
@@ -419,6 +425,7 @@ class RecommendationJob extends MediaJob
             $item_embedding = LinearAlgebra::normalize($item_embedding);
             $item_embedding = pack("E*", ...$item_embedding);
             $updated_item_embeddings[$item_id] = [$item_embedding, $group_id];
+            $item_count++;
         }
         $base_delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING" .
             " WHERE ITEM_TYPE = ? AND ID IN (";
@@ -427,15 +434,18 @@ class RecommendationJob extends MediaJob
         $insert_sql = $base_insert_sql;
         $comma = "";
         $insert_count = 0;
+        $total_insert = 0;
         $item_type = C\THREAD_RECOMMENDATION;
         foreach ($updated_item_embeddings as
             $item_id => [$embedding, $parent_id]) {
+            L\crawlTimeoutLog("Have inserted $total_insert many group items");
             $embedding = base64_encode($embedding);
             $insert_sql .= "$comma($item_id, $item_type, " .
                 "'$embedding', $parent_id)";
             $delete_sql .= "$comma $item_id";
             $comma = ",";
             $insert_count++;
+            $total_insert++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
                 $delete_sql .= ")";
                 $db->execute($delete_sql, [C\THREAD_RECOMMENDATION]);
@@ -480,7 +490,9 @@ class RecommendationJob extends MediaJob
                 C\MOST_RECENT_VIEW]);
         $item_user_embeddings = [];
         $user_items = [];
+        $user_count = 0;
         while ($row = $db->fetchArray($results)) {
+            L\crawlTimeoutLog("Have done $user_count many user embeddings");
             $user_id = $row['USER_ID'];
             $item_ids = explode(",", $row['ITEM_IDS']);
             $item_ids = array_unique($item_ids);
@@ -500,6 +512,7 @@ class RecommendationJob extends MediaJob
                 $item_user_embeddings[$user_id]);
             $item_user_embeddings[$user_id] = pack("E*",
                 ...$item_user_embeddings[$user_id]);
+            $user_count++;
         }
         return [$item_user_embeddings, $user_items];
     }
@@ -531,11 +544,15 @@ class RecommendationJob extends MediaJob
             $user_groups[$user_id] = $group_ids;
         }
         $item_user_recommendations = [];
+        $user_count = 0;
         foreach ($item_user_embeddings as $user_id => $embedding) {
+            L\crawlTimeoutLog("Have done $user_count many user recommendation");
             $embedding = unpack("E*", $embedding);
             if (array_key_exists($user_id, $user_groups)) {
+                $item_count = 0;
                 foreach ($item_embeddings as
                     $item_id => [$item_embedding, $parent_id]) {
+                    L\crawlTimeoutLog("Have done $item_count many items");
                     if (in_array($item_id, $user_items[$user_id]) ||
                         !in_array($parent_id, $user_groups[$user_id])) {
                         continue;
@@ -545,8 +562,10 @@ class RecommendationJob extends MediaJob
                         $item_embedding, $embedding);
                     $item_user_recommendations[] = [$user_id,
                         $item_id, $similarity];
+                    $item_count++;
                 }
             }
+            $user_count++;
         }
         $delete_sql = "DELETE FROM GROUP_ITEM_RECOMMENDATION WHERE" .
             " ITEM_TYPE = ?";
@@ -555,13 +574,16 @@ class RecommendationJob extends MediaJob
         $insert_sql = $base_insert_sql;
         $comma = "";
         $insert_count = 0;
+        $total_insert = 0;
         $item_type = C\THREAD_RECOMMENDATION;
         foreach ($item_user_recommendations as $recommendation) {
             [$user_id, $item_id, $similarity] = $recommendation;
+            L\crawlTimeoutLog("Have inserted $total_insert recommendations");
             $insert_sql .= "$comma($user_id, $item_id" .
                 ", $item_type, $similarity, {$this->update_time})";
             $comma = ",";
             $insert_count++;
+            $total_insert++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
                 $insert_sql = $db->insertIgnore($insert_sql);
                 $db->execute($insert_sql);
@@ -588,7 +610,9 @@ class RecommendationJob extends MediaJob
     {
         $db = $this->db;
         $updated_group_embeddings = [];
+        $group_count = 0;
         foreach ($item_embeddings as $item_id => [$embedding, $parent_id]) {
+            L\crawlTimeoutLog("Have done $group_count many groups");
             if (array_key_exists($parent_id, $updated_group_embeddings)) {
                 $embedding = unpack("E*", $embedding);
                 $group_embedding = unpack("E*",
@@ -598,6 +622,7 @@ class RecommendationJob extends MediaJob
             } else {
                 $updated_group_embeddings[$parent_id] = $embedding;
             }
+            $group_count++;
         }
         foreach ($updated_group_embeddings as $group_id => $embedding) {
             $embedding = unpack("E*", $embedding);
@@ -612,14 +637,17 @@ class RecommendationJob extends MediaJob
         $insert_sql = $base_insert_sql;
         $comma = "";
         $insert_count = 0;
+        $total_insert = 0;
         $item_type = C\GROUP_RECOMMENDATION;
         foreach ($updated_group_embeddings as $group_id => $embedding) {
+            L\crawlTimeoutLog("Have inserted $total_insert group embeddings");
             $embedding = serialize(unpack("E*", $embedding));
             $insert_sql .= "$comma($group_id, $item_type, " .
                 "'$embedding', $group_id)";
             $delete_sql .= "$comma $group_id";
             $comma = ",";
             $insert_count++;
+            $total_insert++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
                 $delete_sql .= ")";
                 $db->execute($delete_sql, [C\GROUP_RECOMMENDATION]);
@@ -665,14 +693,18 @@ class RecommendationJob extends MediaJob
                 C\MOST_RECENT_VIEW]);
         $group_user_embeddings = [];
         $user_groups = [];
+        $user_count = 0;
         while ($row = $db->fetchArray($results)) {
+            L\crawlTimeoutLog("Have done $user_count many user embeddings");
             $user_id = $row['USER_ID'];
             $group_ids = explode(",", $row['ITEM_IDS']);
             $group_ids = array_unique($group_ids);
             $group_user_embeddings[$user_id] = array_fill(1,
                 self::EMBEDDING_VECTOR_SIZE, 0);
             $user_groups[$user_id] = [];
+            $group_count = 0;
             foreach ($group_ids as $group_id) {
+                L\crawlTimeoutLog("Have done $group_count many groups");
                 if (array_key_exists($group_id, $group_embeddings)) {
                     $embedding = unpack("E*",
                         $group_embeddings[$group_id]);
@@ -680,9 +712,11 @@ class RecommendationJob extends MediaJob
                         $group_user_embeddings[$user_id], $embedding);
                     $user_groups[$user_id][] = $group_id;
                 }
+                $group_count++;
             }
             $group_user_embeddings[$user_id] = pack("E*",
                 ...LinearAlgebra::normalize($group_user_embeddings[$user_id]));
+            $user_count++;
         }
         return [$group_user_embeddings, $user_groups];
     }
@@ -709,9 +743,13 @@ class RecommendationJob extends MediaJob
             $exclude_group_ids[] = $row['GROUP_ID'];
         }
         $group_user_recommendations = [];
+        $user_count = 0;
         foreach ($group_user_embeddings as $user_id => $embedding) {
+            L\crawlTimeoutLog("Have done $user_count many user");
             $embedding = unpack("E*", $embedding);
+            $group_count = 0;
             foreach ($group_embeddings as $group_id => $group_embedding) {
+                L\crawlTimeoutLog("Have done $group_count many groups");
                 if (in_array($group_id, $exclude_group_ids) ||
                     in_array($group_id, $user_groups[$user_id]) ||
                     in_array($group_id, $user_group_impression[$user_id])) {
@@ -722,7 +760,9 @@ class RecommendationJob extends MediaJob
                     $group_embedding);
                 $group_user_recommendations[] = [$user_id, $group_id,
                     $similarity];
+                $group_count++;
             }
+            $user_count++;
         }
         $delete_sql = "DELETE FROM GROUP_ITEM_RECOMMENDATION WHERE" .
             " ITEM_TYPE = ?";
@@ -731,13 +771,16 @@ class RecommendationJob extends MediaJob
         $insert_sql = $base_insert_sql;
         $comma = "";
         $insert_count = 0;
+        $total_insert = 0;
         $item_type = C\GROUP_RECOMMENDATION;
         foreach ($group_user_recommendations as $recommendation) {
+            L\crawlTimeoutLog("Have inserted $total_insert recommendations");
             [$user_id, $group_id, $similarity] = $recommendation;
             $insert_sql .= "$comma($user_id, $group_id" .
                 ", $item_type, $similarity, {$this->update_time})";
             $comma = ",";
             $insert_count++;
+            $total_insert++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
                 $insert_sql = $db->insertIgnore($insert_sql);
                 $db->execute($insert_sql);
@@ -804,9 +847,11 @@ class RecommendationJob extends MediaJob
                 file_get_contents(self::RECOMMENDATION_FILE));
         }
         $thumb_folders = array_unique($thumb_folders);
+        $thumb_folders_copy = $thumb_folders;
         $descriptions = [];
         $resource_metadata = [];
         foreach ($thumb_folders as $thumb_folder) {
+            array_shift($thumb_folders_copy);
             if (empty($thumb_folder)) {
                 continue;
             }
@@ -828,6 +873,12 @@ class RecommendationJob extends MediaJob
                 $descriptions[$resource_id] = $description;
                 $resource_metadata[$resource_id] = [$group_id,
                     $page_id, $resource_file];
+                if (count($descriptions) >= self::MAX_RESOURCES) {
+                    L\crawlLog("Reached max resources limit");
+                    file_put_contents(self::RECOMMENDATION_FILE,
+                        implode(PHP_EOL, $thumb_folders_copy));
+                    return [$descriptions, $resource_metadata];
+                }
             }
         }
         return [$descriptions, $resource_metadata];
@@ -894,7 +945,9 @@ class RecommendationJob extends MediaJob
             $carry += $difference * $difference;
         }
         $std_deviation = sqrt($carry / self::CONTEXT_WINDOW_LENGTH);
+        $resource_count = 0;
         foreach ($descriptions as $resource_id => $description) {
+            L\crawlTimeoutLog("Have processed $resource_count many resources");
             $resource_terms[$resource_id] = [];
             $meta_details_terms[$resource_id] = [];
             $description_parts = explode("\n", $description);
@@ -911,7 +964,9 @@ class RecommendationJob extends MediaJob
             }
             if (count($resource_terms[$resource_id]) > 0) {
                 $terms = $resource_terms[$resource_id];
-                for ($i = 0; $i < count($terms); $i++) {
+                $num_terms = count($terms);
+                for ($i = 0; $i < $num_terms; $i++) {
+                    L\crawlTimeoutLog("Have processed $i of $num_terms terms");
                     [$term_id, $term] = $terms[$i];
                     $term_hash = $term_id % self::EMBEDDING_VECTOR_SIZE + 1;
                     $term_sign_hash = hash(self::SIGN_HASH_ALGORITHM,
@@ -948,6 +1003,7 @@ class RecommendationJob extends MediaJob
                     $this->updateTermEmbeddingCache($term_id, $term_embedding,
                         C\RESOURCE_RECOMMENDATION);
                 }
+                $resource_count++;
             }
         }
         return [$resource_terms, $meta_details_terms];
@@ -1006,7 +1062,9 @@ class RecommendationJob extends MediaJob
     {
         $db = $this->db;
         $updated_item_embeddings = [];
+        $resource_count = 0;
         foreach ($resource_terms as $resource_id => $terms) {
+            L\crawlTimeoutLog("Have processed $resource_count many resources");
             $item_embedding = array_fill(1, self::EMBEDDING_VECTOR_SIZE, 0);
             foreach ($terms as [$term_id, $term]) {
                 $term_embedding = $this->getTermEmbedding($term_id,
@@ -1017,6 +1075,7 @@ class RecommendationJob extends MediaJob
             }
             $updated_item_embeddings[$resource_id] = pack("E*",
                 ...$item_embedding);
+            $resource_count++;
         }
         foreach ($meta_details_terms as $resource_id => $meta_terms) {
             if (!array_key_exists($resource_id, $updated_item_embeddings)) {
@@ -1049,13 +1108,16 @@ class RecommendationJob extends MediaJob
         $insert_sql = $base_insert_sql;
         $comma = "";
         $insert_count = 0;
+        $total_insert = 0;
         $item_type = C\RESOURCE_RECOMMENDATION;
         foreach ($updated_item_embeddings as $resource_id => $embedding) {
+            L\crawlTimeoutLog("Have inserted $total_insert many resources");
             $embedding = base64_encode($embedding);
             $insert_sql .= "$comma($resource_id, $item_type," .
                 " '$embedding', $resource_id)";
             $comma = ",";
             $insert_count++;
+            $total_insert++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
                 $insert_sql = $db->insertIgnore($insert_sql);
                 $db->execute($insert_sql);
@@ -1096,7 +1158,9 @@ class RecommendationJob extends MediaJob
                 C\MOST_RECENT_VIEW]);
         $user_embeddings = [];
         $user_items = [];
+        $user_count = 0;
         while ($row = $db->fetchArray($results)) {
+            L\crawlTimeoutLog("Have processed $user_count many users");
             $user_id = $row['USER_ID'];
             $item_ids = explode(",", $row['ITEM_IDS']);
             $item_ids = array_unique($item_ids);
@@ -1114,6 +1178,7 @@ class RecommendationJob extends MediaJob
             }
             $user_embeddings[$user_id] = pack("E*",
                 ...LinearAlgebra::normalize($user_embeddings[$user_id]));
+            $user_count++;
         }
         return [$user_embeddings, $user_items];
     }
@@ -1131,9 +1196,13 @@ class RecommendationJob extends MediaJob
     {
         $db = $this->db;
         $recommendations = [];
+        $user_count = 0;
         foreach ($user_embeddings as $user_id => $user_embedding) {
+            L\crawlTimeoutLog("Have processed $user_count many users");
             $user_embedding = unpack("E*", $user_embedding);
+            $resource_count = 0;
             foreach ($item_embeddings as $item_id => $item_embedding) {
+                L\crawlTimeoutLog("Have processed $resource_count resources");
                 if (in_array($item_id, $user_items[$user_id]) ||
                     !array_key_exists($item_id, $resource_metadata)) {
                     continue;
@@ -1146,7 +1215,9 @@ class RecommendationJob extends MediaJob
                 unset($resource_metadata[$item_id]);
                 $recommendations[] = [$user_id, $group_id, $page_id,
                     $resource_path, $similarity, $item_id];
+                $resource_count++;
             }
+            $user_count++;
         }
         $delete_sql = "DELETE FROM GROUP_RESOURCE_RECOMMENDATION";
         $db->execute($delete_sql);
@@ -1155,7 +1226,9 @@ class RecommendationJob extends MediaJob
         $insert_sql = $base_insert_sql;
         $comma = "";
         $insert_count = 0;
+        $total_insert = 0;
         foreach ($recommendations as $recommendation) {
+            L\crawlTimeoutLog("Have inserted $total_insert recommendations");
             list($user_id, $group_id, $page_id, $resource_path,
                 $score, $item_id) = $recommendation;
             $time = $this->update_time;
@@ -1163,6 +1236,7 @@ class RecommendationJob extends MediaJob
                 "\"$resource_path\", $score, $time, $item_id)";
             $comma = ",";
             $insert_count++;
+            $total_insert++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
                 $insert_sql = $db->insertIgnore($insert_sql);
                 $db->execute($insert_sql);
@@ -1243,12 +1317,15 @@ class RecommendationJob extends MediaJob
         $insert_sql = $base_insert_sql;
         $comma = "";
         $insert_count = 0;
+        $total_insert = 0;
         foreach ($this->lru_cache->getAll() as $id => $embedding) {
+            L\crawlTimeoutLog("Have inserted $total_insert many embeddings");
             $embedding = base64_encode($embedding);
             $insert_sql .= "$comma($id, $item_type, '$embedding')";
             $delete_sql .= "$comma $id";
             $comma = ",";
             $insert_count++;
+            $total_insert++;
             if ($insert_count == self::BATCH_SQL_INSERT_NUM) {
                 $delete_sql .= ")";
                 $db->execute($delete_sql, [$item_type]);
ViewGit