Software: Apache/2.4.41 (Ubuntu). PHP/8.0.30 uname -a: Linux apirnd 5.4.0-204-generic #224-Ubuntu SMP Thu Dec 5 13:38:28 UTC 2024 x86_64 uid=33(www-data) gid=33(www-data) groups=33(www-data) Safe-mode: OFF (not secure) /var/www/html/openAi/ drwxr-xr-x | |
| Viewing file: Select action/file-type: require("dotenv").config();
const mongoose = require("mongoose");
const {Product} = require("./models/Product");
const {Variant} = require("./models/Variant");
const {Category} = require("./models/Category");
const {ShopSetting} = require("./models/ShopSetting");
const ProductVector = require("./models/ProductVector");
const buildSearchableText = require("./services/vectorHandler");
const { generateEmbedding } = require("./services/embeddings");
async function connectDB() {
await mongoose.connect(process.env.MONGO_URI);
console.log("✅ MongoDB connected");
}
async function getCategoryNames(idsString) {
if (!idsString) return [];
const ids = idsString.split(",").map(Number);
const categories = await Category.find({ id: { $in: ids } });
return categories.map(c => c.name);
}
async function processVariantChange(variantDoc,retryCount=5) {
try {
const product = await Product.findOne({ id: variantDoc.product_id });
if (!product) return;
const categories = await getCategoryNames(product.categories);
const subCategories = await getCategoryNames(product.sub_categories);
const text =await buildSearchableText(product, variantDoc, categories, subCategories);
var shopSetting = await ShopSetting.findOne({ id: product.shop_id },{page_id:1});
const embedding = await generateEmbedding(text);
await ProductVector.updateOne(
{ product_id: product.id, variant_id: variantDoc.id },
{
$set: {
v_id: variantDoc._id,
product_id: product.id,
variant_id: variantDoc.id,
vendor_id: product.vendor_id,
page_id: shopSetting ? shopSetting.page_id : 0,
searchable_text: text,
embedding,
visibility: variantDoc.visibility,
}
},
{ upsert: true }
);
return "done";
} catch (err) {
if (retryCount > 1) {
// Wait 500ms before retry
await new Promise(res => setTimeout(res, 500));
return processVariantChange(variantDoc,retryCount - 1); // recursive retry
} else {
// throw err; // all retries failed
return err;
}
}
}
async function productWiseChange(product_id,retryCount=5) {
try{
const product = await Product.findOne({ _id: product_id });
if (!product) return;
const categories = await getCategoryNames(product.categories);
const subCategories = await getCategoryNames(product.sub_categories);
var shopSetting = await ShopSetting.findOne({ id: product.shop_id },{page_id:1});
const variants= await Variant.find({product_id : product.id});
await Promise.all(
variants.map(async (variantDoc) => {
const text = await buildSearchableText(product, variantDoc, categories, subCategories);
const embedding = await generateEmbedding(text);
await ProductVector.updateOne(
{ product_id: product.id, variant_id: variantDoc.id },
{
$set: {
v_id: variantDoc._id,
product_id: product.id,
variant_id: variantDoc.id,
vendor_id: product.vendor_id,
page_id: shopSetting ? shopSetting.page_id : 0,
searchable_text: text,
embedding,
visibility: variantDoc.visibility,
}
},
{ upsert: true }
);
})
);
return "done";
} catch (err) {
console.log(err);
if (retryCount > 1) {
// Wait 500ms before retry
await new Promise(res => setTimeout(res, 500));
return productWiseChange(product_id, retryCount - 1); // recursive retry
} else {
// throw err; // all retries failed
return err;
}
}
}
async function deleteProduct(product_id,retryCount=5) {
try{
const product = await Product.findOne({ _id: product_id });
if (!product) return;
const result = await Variant.deleteMany({
product_id: product.id,
});
return result;
} catch (err) {
if (retryCount > 1) {
await new Promise(res => setTimeout(res, 500));
return deleteProduct(product_id, retryCount - 1); // recursive retry
} else {
return err;
}
}
}
async function productVisibility(product_id,retryCount=5) {
try{
const product = await Product.findOne({ _id: product_id });
if (!product) return;
const result = await Variant.updateMany(
{ product_id: product.id },
{ visibility : product.visibility } // fields to update
);
return result;
} catch (err) {
if (retryCount > 1) {
await new Promise(res => setTimeout(res, 500));
return productVisibility(product_id, retryCount - 1); // recursive retry
} else {
return err;
}
}
}
async function startChangeStream() {
const variantCollection = mongoose.connection.collection("Variants");
const changeStream = variantCollection.watch();
console.log("👀 Watching Variants collection for changes...");
changeStream.on("change", async (change) => {
console.log("🔔 Change detected:", change.operationType);
if (change.operationType === "insert" || change.operationType === "update" || change.operationType === "replace") {
const variantId = change.documentKey._id;
const variantDoc = await Variant.findOne({ _id: variantId });
if (variantDoc) await processVariantChange(variantDoc,1);
}
else if (change.operationType === "delete") {
const variantId = change.documentKey._id;
await ProductVector.deleteOne({ v_id: variantId });
console.log(`🗑️ Deleted vector for variant ${variantId}`);
}
});
}
async function productChangeStream() {
const prodcutCollection = mongoose.connection.collection("Products");
const changeStream = prodcutCollection.watch();
console.log("👀 Watching Variants collection for changes...");
changeStream.on("change", async (change) => {
console.log( change.updateDescription.updatedFields);
if (change.operationType === "update") {
var updatedFields=change.updateDescription.updatedFields;
const keysToCheck = ['title', 'description','categories','sub_categories'];
const hasAny = keysToCheck.some(key => key in updatedFields);
if (hasAny) {
console.log(hasAny);
await productWiseChange(change.documentKey._id);
}
if ("is_archived" in updatedFields) {
await deleteProduct(change.documentKey._id);
}
if ("visibility" in updatedFields) {
await productVisibility(change.documentKey._id);
}
}
// if (change.operationType === "insert" || change.operationType === "update" || change.operationType === "replace") {
// const variantId = change.documentKey._id;
// const variantDoc = await Variant.findOne({ _id: variantId });
// if (variantDoc) await processVariantChange(variantDoc);
// }
// else if (change.operationType === "delete") {
// const variantId = change.documentKey._id;
// await ProductVector.deleteOne({ v_id: variantId });
// console.log(`🗑️ Deleted vector for variant ${variantId}`);
// }
});
}
(async () => {
await connectDB();
await startChangeStream();
await productChangeStream();
})();
|
:: Command execute :: | |
--[ c99shell v. 2.5 [PHP 8 Update] [24.05.2025] | Generation time: 0.0055 ]-- |