For the pages collection, I would like to perform RAG based on a similarity search over the page data, restricted to documents whose creator userId field matches the userId of the currently logged-in user.
The LLM side is implemented with LangChain.
However, I am getting an error saying that the field needs to be indexed as token.
Error during the document retrieval or generation process: MongoServerError: PlanExecutor error during aggregation :: caused by :: Path 'userId' needs to be indexed as token
    at Connection.sendCommand (/workspaces/Siki/backend/node_modules/mongodb/src/cmap/connection.ts:511:17)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async Connection.command (/workspaces/Siki/backend/node_modules/mongodb/src/cmap/connection.ts:575:22)
    at async Server.command (/workspaces/Siki/backend/node_modules/mongodb/src/sdam/server.ts:322:16)
    at async executeOperation (/workspaces/Siki/backend/node_modules/mongodb/src/operations/execute_operation.ts:181:12)
    at async AggregationCursor._initialize (/workspaces/Siki/backend/node_modules/mongodb/src/cursor/aggregation_cursor.ts:71:22)
    at async AggregationCursor.[kInit] (/workspaces/Siki/backend/node_modules/mongodb/src/cursor/abstract_cursor.ts:644:21)
    at async next (/workspaces/Siki/backend/node_modules/mongodb/src/cursor/abstract_cursor.ts:717:7)
    at async AggregationCursor.[Symbol.asyncIterator] (/workspaces/Siki/backend/node_modules/mongodb/src/cursor/abstract_cursor.ts:302:26)
    at async AggregationCursor.toArray (/workspaces/Siki/backend/node_modules/mongodb/src/cursor/abstract_cursor.ts:438:22) {
  ok: 0,
  code: 8,
  codeName: 'UnknownError',
  '$clusterTime': {
    clusterTime: new Timestamp({ t: 1713417868, i: 4 }),
    signature: {
      hash: Binary.createFromBase64('WUOtgKIQnk2CBY4KhQ86oggZnZo=', 0),
      keyId: new Long('7314012942091943938')
    }
  },
  operationTime: new Timestamp({ t: 1713417868, i: 4 }),
  [Symbol(errorLabels)]: Set(0) {}
}
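As far as I understand, the preFilter I pass to the retriever ends up in the $vectorSearch aggregation stage that LangChain builds, roughly like the sketch below. This is my own reconstruction, not the actual generated pipeline; the index name "vector_index", the path "vector", and the numCandidates value come from my own configuration and assumptions:

```typescript
// My sketch of the aggregation stage I believe LangChain generates from the
// retriever options (reconstruction only; names come from my configuration).
const pipelineStage = {
  $vectorSearch: {
    index: "vector_index",        // indexName passed to MongoDBAtlasVectorSearch
    path: "vector",               // embeddingKey (the field holding the embeddings)
    queryVector: [] as number[],  // embedding of the user message goes here
    numCandidates: 30,            // I believe the default is a multiple of k
    limit: 3,                     // k
    filter: { userId: "<logged-in userId>" }, // where preFilter lands, as I understand it
  },
};
```

If this reconstruction is right, the filter is evaluated inside the search index itself, which would explain why a regular B-tree index on userId does not help.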
As a countermeasure, I created an Atlas Search index in the MongoDB Atlas UI as follows, but the result is unchanged and the same error occurs.
{
  "mappings": {
    "dynamic": false,
    "fields": {
      "userId": {
        "analyzer": "lucene.standard",
        "type": "string"
      }
    }
  },
  "storedSource": {
    "include": [
      "userId"
    ]
  }
}
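From the Atlas Search documentation, my understanding is that analyzed string fields are meant for full-text queries, while exact-match filtering wants a token mapping, so I also considered a definition like the following. This is an untested sketch; I have not confirmed that this is what the vector search filter requires:

```json
{
  "mappings": {
    "dynamic": false,
    "fields": {
      "userId": {
        "type": "token"
      }
    }
  }
}
```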
import { ChatOpenAI } from "@langchain/openai";
import { MongoDBAtlasVectorSearch } from "@langchain/mongodb";
import { MongoClient } from "mongodb";
import { OpenAIEmbeddings } from "@langchain/openai";
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { StringOutputParser } from "@langchain/core/output_parsers";
import { createStuffDocumentsChain } from "langchain/chains/combine_documents";
import { sleep } from "langchain/util/time";
import { SystemPrompt, contentPrompt } from "./promptUtils";

async function retrieverGeneration(userId: string, userMessage: string) {
  let client;
  try {
    client = new MongoClient(process.env.MONGODB_URI || "");
    await client.connect();
    console.log("MongoDB connected successfully.");
  } catch (error) {
    console.error("Failed to connect to MongoDB:", error);
    throw new Error("MongoDB connection failed.");
  }
  try {
    const namespace = "Siki.pages";
    const [dbName, collectionName] = namespace.split(".");
    const collection = client.db(dbName).collection(collectionName);

    // Check if the index already exists and create it if it does not
    const indexExists = await collection.indexExists("userId_1");
    if (!indexExists) {
      await collection.createIndex({ userId: 1 });
      console.log("Index created on 'userId'");
    }

    const vectorStore = new MongoDBAtlasVectorSearch(
      new OpenAIEmbeddings({
        openAIApiKey: process.env.OPENAI_API_KEY,
        batchSize: 2048,
        modelName: "text-embedding-3-small",
      }),
      {
        collection,
        indexName: "vector_index",
        textKey: "content",
        embeddingKey: "vector",
      }
    );

    const retriever = vectorStore.asRetriever({
      k: 3,
      searchType: "similarity",
      filter: {
        preFilter: {
          userId: userId, // Filtering by userId
        },
      },
    });

    await sleep(2000);

    const prompt = ChatPromptTemplate.fromMessages([
      ["system", SystemPrompt],
      ["human", contentPrompt],
    ]);
    const llm = new ChatOpenAI({
      modelName: "gpt-4",
      temperature: 0,
      openAIApiKey: process.env.OPENAI_API_KEY,
    });
    const ragChain = await createStuffDocumentsChain({
      llm,
      prompt,
      outputParser: new StringOutputParser(),
    });

    const retrievedDocs = await retriever.getRelevantDocuments(userMessage);
    console.log("Documents retrieved:", retrievedDocs.length);

    const response = await ragChain.invoke({
      question: userMessage,
      context: retrievedDocs,
    });
    console.log("Response generated:", response);
    return response;
  } catch (error) {
    console.error("Error during the document retrieval or generation process:", error);
    throw error;
  } finally {
    await client.close();
    console.log("MongoDB connection closed.");
  }
}

export { retrieverGeneration };
How can I use LangChain to filter the documents by userId and then run RAG over the filtered results?