diff --git a/src/library/media_jobs/RecommendationJob.php b/src/library/media_jobs/RecommendationJob.php index 9259407bc..3c006f4f2 100644 --- a/src/library/media_jobs/RecommendationJob.php +++ b/src/library/media_jobs/RecommendationJob.php @@ -114,7 +114,11 @@ class RecommendationJob extends MediaJob /** * MAX term embeddings fetched from database to initialize LRUCache */ - const MAX_TERM_EMBEDDINGS = 1000; + const MAX_TERM_EMBEDDINGS = 500; + /** + * Maximum number of resources used in making resource recommendations + */ + const MAX_RESOURCES = 200; /** * Sets up the database connection so can access tables related * to recommendations. Initialize timing info related to job. @@ -407,7 +411,9 @@ class RecommendationJob extends MediaJob { $db = $this->db; $updated_item_embeddings = []; + $item_count = 0; foreach ($item_terms as $item_id => [$terms, $group_id]) { + L\crawlTimeoutLog("Have done $item_count many group items"); $item_embedding = array_fill(1, self::EMBEDDING_VECTOR_SIZE, 0); foreach ($terms as [$term_id, $term]) { $term_embedding = $this->getTermEmbedding($term_id, @@ -419,6 +425,7 @@ class RecommendationJob extends MediaJob $item_embedding = LinearAlgebra::normalize($item_embedding); $item_embedding = pack("E*", ...$item_embedding); $updated_item_embeddings[$item_id] = [$item_embedding, $group_id]; + $item_count++; } $base_delete_sql = "DELETE FROM RECOMMENDATION_ITEM_EMBEDDING" . " WHERE ITEM_TYPE = ? AND ID IN ("; @@ -427,15 +434,18 @@ class RecommendationJob extends MediaJob $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; + $total_insert = 0; $item_type = C\THREAD_RECOMMENDATION; foreach ($updated_item_embeddings as $item_id => [$embedding, $parent_id]) { + L\crawlTimeoutLog("Have inserted $total_insert many group items"); $embedding = base64_encode($embedding); $insert_sql .= "$comma($item_id, $item_type, " . "'$embedding', $parent_id)"; $delete_sql .= "$comma $item_id"; $comma = ","; $insert_count++; + $total_insert++; if ($insert_count == self::BATCH_SQL_INSERT_NUM) { $delete_sql .= ")"; $db->execute($delete_sql, [C\THREAD_RECOMMENDATION]); @@ -480,7 +490,9 @@ class RecommendationJob extends MediaJob C\MOST_RECENT_VIEW]); $item_user_embeddings = []; $user_items = []; + $user_count = 0; while ($row = $db->fetchArray($results)) { + L\crawlTimeoutLog("Have done $user_count many user embeddings"); $user_id = $row['USER_ID']; $item_ids = explode(",", $row['ITEM_IDS']); $item_ids = array_unique($item_ids); @@ -500,6 +512,7 @@ class RecommendationJob extends MediaJob $item_user_embeddings[$user_id]); $item_user_embeddings[$user_id] = pack("E*", ...$item_user_embeddings[$user_id]); + $user_count++; } return [$item_user_embeddings, $user_items]; } @@ -531,11 +544,15 @@ class RecommendationJob extends MediaJob $user_groups[$user_id] = $group_ids; } $item_user_recommendations = []; + $user_count = 0; foreach ($item_user_embeddings as $user_id => $embedding) { + L\crawlTimeoutLog("Have done $user_count many user recommendation"); $embedding = unpack("E*", $embedding); if (array_key_exists($user_id, $user_groups)) { + $item_count = 0; foreach ($item_embeddings as $item_id => [$item_embedding, $parent_id]) { + L\crawlTimeoutLog("Have done $item_count many items"); if (in_array($item_id, $user_items[$user_id]) || !in_array($parent_id, $user_groups[$user_id])) { continue; @@ -545,8 +562,10 @@ class RecommendationJob extends MediaJob $item_embedding, $embedding); $item_user_recommendations[] = [$user_id, $item_id, $similarity]; + $item_count++; } } + $user_count++; } $delete_sql = "DELETE FROM GROUP_ITEM_RECOMMENDATION WHERE" . " ITEM_TYPE = ?"; @@ -555,13 +574,16 @@ class RecommendationJob extends MediaJob $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; + $total_insert = 0; $item_type = C\THREAD_RECOMMENDATION; foreach ($item_user_recommendations as $recommendation) { [$user_id, $item_id, $similarity] = $recommendation; + L\crawlTimeoutLog("Have inserted $total_insert recommendations"); $insert_sql .= "$comma($user_id, $item_id" . ", $item_type, $similarity, {$this->update_time})"; $comma = ","; $insert_count++; + $total_insert++; if ($insert_count == self::BATCH_SQL_INSERT_NUM) { $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); @@ -588,7 +610,9 @@ class RecommendationJob extends MediaJob { $db = $this->db; $updated_group_embeddings = []; + $group_count = 0; foreach ($item_embeddings as $item_id => [$embedding, $parent_id]) { + L\crawlTimeoutLog("Have done $group_count many groups"); if (array_key_exists($parent_id, $updated_group_embeddings)) { $embedding = unpack("E*", $embedding); $group_embedding = unpack("E*", @@ -598,6 +622,7 @@ class RecommendationJob extends MediaJob } else { $updated_group_embeddings[$parent_id] = $embedding; } + $group_count++; } foreach ($updated_group_embeddings as $group_id => $embedding) { $embedding = unpack("E*", $embedding); @@ -612,14 +637,17 @@ class RecommendationJob extends MediaJob $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; + $total_insert = 0; $item_type = C\GROUP_RECOMMENDATION; foreach ($updated_group_embeddings as $group_id => $embedding) { + L\crawlTimeoutLog("Have inserted $total_insert group embeddings"); $embedding = serialize(unpack("E*", $embedding)); $insert_sql .= "$comma($group_id, $item_type, " . "'$embedding', $group_id)"; $delete_sql .= "$comma $group_id"; $comma = ","; $insert_count++; + $total_insert++; if ($insert_count == self::BATCH_SQL_INSERT_NUM) { $delete_sql .= ")"; $db->execute($delete_sql, [C\GROUP_RECOMMENDATION]); @@ -665,14 +693,18 @@ class RecommendationJob extends MediaJob C\MOST_RECENT_VIEW]); $group_user_embeddings = []; $user_groups = []; + $user_count = 0; while ($row = $db->fetchArray($results)) { + L\crawlTimeoutLog("Have done $user_count many user embeddings"); $user_id = $row['USER_ID']; $group_ids = explode(",", $row['ITEM_IDS']); $group_ids = array_unique($group_ids); $group_user_embeddings[$user_id] = array_fill(1, self::EMBEDDING_VECTOR_SIZE, 0); $user_groups[$user_id] = []; + $group_count = 0; foreach ($group_ids as $group_id) { + L\crawlTimeoutLog("Have done $group_count many groups"); if (array_key_exists($group_id, $group_embeddings)) { $embedding = unpack("E*", $group_embeddings[$group_id]); @@ -680,9 +712,11 @@ class RecommendationJob extends MediaJob $group_user_embeddings[$user_id], $embedding); $user_groups[$user_id][] = $group_id; } + $group_count++; } $group_user_embeddings[$user_id] = pack("E*", ...LinearAlgebra::normalize($group_user_embeddings[$user_id])); + $user_count++; } return [$group_user_embeddings, $user_groups]; } @@ -709,9 +743,13 @@ class RecommendationJob extends MediaJob $exclude_group_ids[] = $row['GROUP_ID']; } $group_user_recommendations = []; + $user_count = 0; foreach ($group_user_embeddings as $user_id => $embedding) { + L\crawlTimeoutLog("Have done $user_count many user"); $embedding = unpack("E*", $embedding); + $group_count = 0; foreach ($group_embeddings as $group_id => $group_embedding) { + L\crawlTimeoutLog("Have done $group_count many groups"); if (in_array($group_id, $exclude_group_ids) || in_array($group_id, $user_groups[$user_id]) || in_array($group_id, $user_group_impression[$user_id])) { @@ -722,7 +760,9 @@ class RecommendationJob extends MediaJob $group_embedding); $group_user_recommendations[] = [$user_id, $group_id, $similarity]; + $group_count++; } + $user_count++; } $delete_sql = "DELETE FROM GROUP_ITEM_RECOMMENDATION WHERE" . " ITEM_TYPE = ?"; @@ -731,13 +771,16 @@ class RecommendationJob extends MediaJob $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; + $total_insert = 0; $item_type = C\GROUP_RECOMMENDATION; foreach ($group_user_recommendations as $recommendation) { + L\crawlTimeoutLog("Have inserted $total_insert recommendations"); [$user_id, $group_id, $similarity] = $recommendation; $insert_sql .= "$comma($user_id, $group_id" . ", $item_type, $similarity, {$this->update_time})"; $comma = ","; $insert_count++; + $total_insert++; if ($insert_count == self::BATCH_SQL_INSERT_NUM) { $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); @@ -804,9 +847,11 @@ class RecommendationJob extends MediaJob file_get_contents(self::RECOMMENDATION_FILE)); } $thumb_folders = array_unique($thumb_folders); + $thumb_folders_copy = $thumb_folders; $descriptions = []; $resource_metadata = []; foreach ($thumb_folders as $thumb_folder) { + array_shift($thumb_folders_copy); if (empty($thumb_folder)) { continue; } @@ -828,6 +873,12 @@ class RecommendationJob extends MediaJob $descriptions[$resource_id] = $description; $resource_metadata[$resource_id] = [$group_id, $page_id, $resource_file]; + if (count($descriptions) >= self::MAX_RESOURCES) { + L\crawlLog("Reached max resources limit"); + file_put_contents(self::RECOMMENDATION_FILE, + implode(PHP_EOL, $thumb_folders_copy)); + return [$descriptions, $resource_metadata]; + } } } return [$descriptions, $resource_metadata]; @@ -894,7 +945,9 @@ class RecommendationJob extends MediaJob $carry += $difference * $difference; } $std_deviation = sqrt($carry / self::CONTEXT_WINDOW_LENGTH); + $resource_count = 0; foreach ($descriptions as $resource_id => $description) { + L\crawlTimeoutLog("Have processed $resource_count many resources"); $resource_terms[$resource_id] = []; $meta_details_terms[$resource_id] = []; $description_parts = explode("\n", $description); @@ -911,7 +964,9 @@ class RecommendationJob extends MediaJob } if (count($resource_terms[$resource_id]) > 0) { $terms = $resource_terms[$resource_id]; - for ($i = 0; $i < count($terms); $i++) { + $num_terms = count($terms); + for ($i = 0; $i < $num_terms; $i++) { + L\crawlTimeoutLog("Have processed $i of $num_terms terms"); [$term_id, $term] = $terms[$i]; $term_hash = $term_id % self::EMBEDDING_VECTOR_SIZE + 1; $term_sign_hash = hash(self::SIGN_HASH_ALGORITHM, @@ -948,6 +1003,7 @@ class RecommendationJob extends MediaJob $this->updateTermEmbeddingCache($term_id, $term_embedding, C\RESOURCE_RECOMMENDATION); } + $resource_count++; } } return [$resource_terms, $meta_details_terms]; @@ -1006,7 +1062,9 @@ class RecommendationJob extends MediaJob { $db = $this->db; $updated_item_embeddings = []; + $resource_count = 0; foreach ($resource_terms as $resource_id => $terms) { + L\crawlTimeoutLog("Have processed $resource_count many resources"); $item_embedding = array_fill(1, self::EMBEDDING_VECTOR_SIZE, 0); foreach ($terms as [$term_id, $term]) { $term_embedding = $this->getTermEmbedding($term_id, @@ -1017,6 +1075,7 @@ class RecommendationJob extends MediaJob } $updated_item_embeddings[$resource_id] = pack("E*", ...$item_embedding); + $resource_count++; } foreach ($meta_details_terms as $resource_id => $meta_terms) { if (!array_key_exists($resource_id, $updated_item_embeddings)) { @@ -1049,13 +1108,16 @@ class RecommendationJob extends MediaJob $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; + $total_insert = 0; $item_type = C\RESOURCE_RECOMMENDATION; foreach ($updated_item_embeddings as $resource_id => $embedding) { + L\crawlTimeoutLog("Have inserted $total_insert many resources"); $embedding = base64_encode($embedding); $insert_sql .= "$comma($resource_id, $item_type," . " '$embedding', $resource_id)"; $comma = ","; $insert_count++; + $total_insert++; if ($insert_count == self::BATCH_SQL_INSERT_NUM) { $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); @@ -1096,7 +1158,9 @@ class RecommendationJob extends MediaJob C\MOST_RECENT_VIEW]); $user_embeddings = []; $user_items = []; + $user_count = 0; while ($row = $db->fetchArray($results)) { + L\crawlTimeoutLog("Have processed $user_count many users"); $user_id = $row['USER_ID']; $item_ids = explode(",", $row['ITEM_IDS']); $item_ids = array_unique($item_ids); @@ -1114,6 +1178,7 @@ class RecommendationJob extends MediaJob } $user_embeddings[$user_id] = pack("E*", ...LinearAlgebra::normalize($user_embeddings[$user_id])); + $user_count++; } return [$user_embeddings, $user_items]; } @@ -1131,9 +1196,13 @@ class RecommendationJob extends MediaJob { $db = $this->db; $recommendations = []; + $user_count = 0; foreach ($user_embeddings as $user_id => $user_embedding) { + L\crawlTimeoutLog("Have processed $user_count many users"); $user_embedding = unpack("E*", $user_embedding); + $resource_count = 0; foreach ($item_embeddings as $item_id => $item_embedding) { + L\crawlTimeoutLog("Have processed $resource_count resources"); if (in_array($item_id, $user_items[$user_id]) || !array_key_exists($item_id, $resource_metadata)) { continue; @@ -1146,7 +1215,9 @@ class RecommendationJob extends MediaJob unset($resource_metadata[$item_id]); $recommendations[] = [$user_id, $group_id, $page_id, $resource_path, $similarity, $item_id]; + $resource_count++; } + $user_count++; } $delete_sql = "DELETE FROM GROUP_RESOURCE_RECOMMENDATION"; $db->execute($delete_sql); @@ -1155,7 +1226,9 @@ class RecommendationJob extends MediaJob $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; + $total_insert = 0; foreach ($recommendations as $recommendation) { + L\crawlTimeoutLog("Have inserted $total_insert recommendations"); list($user_id, $group_id, $page_id, $resource_path, $score, $item_id) = $recommendation; $time = $this->update_time; @@ -1163,6 +1236,7 @@ class RecommendationJob extends MediaJob "\"$resource_path\", $score, $time, $item_id)"; $comma = ","; $insert_count++; + $total_insert++; if ($insert_count == self::BATCH_SQL_INSERT_NUM) { $insert_sql = $db->insertIgnore($insert_sql); $db->execute($insert_sql); @@ -1243,12 +1317,15 @@ class RecommendationJob extends MediaJob $insert_sql = $base_insert_sql; $comma = ""; $insert_count = 0; + $total_insert = 0; foreach ($this->lru_cache->getAll() as $id => $embedding) { + L\crawlTimeoutLog("Have inserted $total_insert many embeddings"); $embedding = base64_encode($embedding); $insert_sql .= "$comma($id, $item_type, '$embedding')"; $delete_sql .= "$comma $id"; $comma = ","; $insert_count++; + $total_insert++; if ($insert_count == self::BATCH_SQL_INSERT_NUM) { $delete_sql .= ")"; $db->execute($delete_sql, [$item_type]);