Engineer #1 at Quintilon

June 30th, 2024

I joined Quintilon as an engineer in September 2023, tasked with building an entire MVP from the ground up. The product is a co-pilot for small business owners and entrepreneurs: essentially an assistant that helps somebody build their own business.

I built four main pieces at Quintilon that together make up the MVP: two microservices, a GraphQL federation layer (i.e. an API gateway), and a Next.js web application.

Conversations

The main technical challenge in building this MVP was figuring out how to handle conversations. The first iteration of the conversation service was built using LangChain's conversation buffer memory. We could build a basic conversation with something like this:

import { OpenAI } from "@langchain/openai";
import { BufferMemory } from "langchain/memory";
import { ConversationChain } from "langchain/chains";

const model = new OpenAI({});
const memory = new BufferMemory();
const chain = new ConversationChain({ llm: model, memory: memory });
const res1 = await chain.call({ input: "Hi, I'm Aryaman! " });
console.log({ res1 });

This also required us to maintain conversations in our database (we used Azure SQL with Prisma).

Although we came up with a solution that worked quite well and was extensible for when we decided to plug in a vector database, this was right around the time that OpenAI announced their Assistants API. The Assistants API was a total game changer for us. All we had to do was preconfigure our Assistant with some basic prompting, and then we could use the API to create and manage threads between users and the assistant, completely removing the need for storing conversations on our end.

This could be done by creating a thread with all user context, and then creating a run to execute the thread and prompt a response.

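import OpenAI from "openai";

// The client reads OPENAI_API_KEY from the environment by default
const openai = new OpenAI();

// First, we must create the thread

// Retrieve the user's input
const message = getUserInput();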

const thread = await openai.beta.threads.create({
    messages: [
        {
            role: "user",
            content: message
        }
    ]
});

// At this point, the thread has been created and is now ready to be executed on by an assistant of your choice.

// We must get the ID of our pre-configured assistant
const ASSISTANT_ID = getAssistantID();

// Create the run using our specific assistant
const run = await openai.beta.threads.runs.create(
    thread.id,
    { assistant_id: ASSISTANT_ID }
);

This is how to create a thread and then start a run on it with the Assistants API. However, now we have a problem: creating the run doesn't actually give us a response from the assistant. It just creates the run and gives us a run ID. At the time of writing, OpenAI has introduced streaming responses for runs, but while we were building this product, polling was the only supported way to receive responses.

To come up with a way of receiving responses, we must look at what states a run can be in.

Runs

As soon as the create-run call returns, the run is queued and then moves into the in_progress state. The assistant then produces its output, and once it is done, the run moves into the completed state (or some other terminal state, but we don't care about those edge cases for now).
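The states mentioned above can be written down as a small type, which makes it easier to decide what a poller should do with each one. This is only a sketch: the status strings are the ones the Assistants API (beta) documents for a run, while `PollAction` and `nextAction` are hypothetical names introduced here. It also assumes the assistant has no function tools, so `requires_action` is simply treated as "not done yet".

```typescript
// Status strings documented for a run in the Assistants API (beta).
type RunStatus =
    | "queued" | "in_progress" | "requires_action" | "cancelling"
    | "cancelled" | "failed" | "completed" | "incomplete" | "expired";

// Statuses after which a run will not change again.
const TERMINAL: ReadonlySet<RunStatus> = new Set<RunStatus>([
    "completed", "failed", "cancelled", "expired", "incomplete",
]);

// Hypothetical helper type: what a poller should do with a run in a given state.
type PollAction = "deliver" | "give_up" | "keep_polling";

function nextAction(status: RunStatus): PollAction {
    if (status === "completed") return "deliver";   // the reply is ready
    if (TERMINAL.has(status)) return "give_up";     // finished, but without a usable reply
    return "keep_polling";                          // queued, in_progress, etc.
}
```

Only `completed` leads to a delivered response; the other terminal states are the edge cases we set aside for now.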

Processing these runs, polling for responses, and delivering those responses to the frontend requires some careful orchestration.

The first part of the process was creating a ThreadPoller class that wrapped up all the polling logic and invoked a callback when a response was received. The class looked something like this:

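// Uses the same `openai` client as the earlier snippet.

// Identifies a run that we are waiting on
interface ThreadObject {
    thread_id: string;
    run_id: string;
}

class ThreadPoller {
    private queue: ThreadObject[] = [];
    private batchSize: number;
    private TIMEOUT_MS: number;

    // Defaults apply when an argument is omitted
    constructor(batchSize = 50, timeout_ms = 1000) {
        this.batchSize = batchSize;
        this.TIMEOUT_MS = timeout_ms;
    }

    public queueItem(run: ThreadObject) {
        this.queue.push(run);
    }

    public async process() {
        // Take at most batchSize items off the front of the queue
        const toProcess = this.queue.splice(0, Math.min(this.batchSize, this.queue.length));
        await Promise.all(toProcess.map(async (item) => {
            const response = await openai.beta.threads.runs.retrieve(item.thread_id, item.run_id);

            if (response.status === 'completed') {
                // runCallback is defined elsewhere and handles the finished run
                runCallback(item);
            } else {
                // Not finished yet, so check again on a later pass
                this.queueItem(item);
            }
        }));

        // Pass a function so the next pass is scheduled instead of running immediately
        setTimeout(() => this.process(), this.TIMEOUT_MS);
    }
}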
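The poller above puts a run back on the queue for as long as it has not finished, so a run that never completes would be polled indefinitely. One way to bound that, sketched here under assumptions, is to count attempts per item and drop it past a limit. The `attempts` field, `QueuedRun`, and `requeueOrDrop` are hypothetical additions, not part of the original class.

```typescript
// Hypothetical queue item: the two IDs from the poller plus an attempt counter.
interface QueuedRun {
    thread_id: string;
    run_id: string;
    attempts: number;
}

// Returns the item to put back on the queue, or null once the limit is reached.
function requeueOrDrop(item: QueuedRun, maxAttempts: number): QueuedRun | null {
    const attempts = item.attempts + 1;
    return attempts >= maxAttempts ? null : { ...item, attempts };
}
```

In the else branch of the poller, the result would be pushed back onto the queue when it is not null, and reported as a timeout otherwise.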

Now we have a reliable method for polling Assistant responses. How do we propagate them to the end user? If you've been paying attention, you'll remember these microservices expose data through a GraphQL API. To push an update like this, we have to extend that API with GraphQL subscriptions served over a WebSocket. This gives us a publish-subscribe model, where the Next.js application subscribes to updates published by our conversations service.

Using the graphql-subscriptions library, we can publish the response at the point where runCallback is called above:

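import { PubSub } from "graphql-subscriptions";

const pubsub = new PubSub();
...
if (response.status === 'completed') {
    // The run object does not carry the reply; the newest message on the thread does
    // (messages are listed newest first by default)
    const messages = await openai.beta.threads.messages.list(item.thread_id);
    pubsub.publish('RESPONSE_RECEIVED', {
        response: {
            threadId: item.thread_id,
            runId: item.run_id,
            content: messages.data[0].content
        }
    });
} else { ... }
...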

The GraphQL subscription resolver then simply listens for this event and forwards each published response to all client-side subscribers.

const resolvers = {
  Subscription: {
    // The field name matches the top-level key of the published payload (`response`)
    response: {
      subscribe: () => pubsub.asyncIterator(['RESPONSE_RECEIVED']),
    },
  },
  // ...other resolvers...
};

This resolver can be updated to deliver only the responses a specific client is interested in (i.e. the responses for a specific thread).
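A minimal sketch of that filtering, assuming the subscription takes a `threadId` argument (a hypothetical argument, as are the `SubscriptionArgs` and `isForThread` names). The predicate itself has no dependencies; graphql-subscriptions provides a `withFilter` helper that accepts a function of this shape.

```typescript
// Shape of the payload published on RESPONSE_RECEIVED (mirrors the publish call above).
interface ResponsePayload {
    response: { threadId: string; runId: string; content: unknown };
}

// Hypothetical subscription arguments: the client names the thread it cares about.
interface SubscriptionArgs {
    threadId: string;
}

// Filter predicate: deliver an event only to subscribers of the same thread.
function isForThread(payload: ResponsePayload, variables: SubscriptionArgs): boolean {
    return payload.response.threadId === variables.threadId;
}

// With graphql-subscriptions, a predicate like this is passed as the second
// argument to withFilter, roughly:
//   subscribe: withFilter(() => pubsub.asyncIterator(['RESPONSE_RECEIVED']), isForThread)
```

Filtering on the server keeps other users' responses from ever reaching a client, rather than relying on the frontend to discard them.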

Now, with this, we have described how to build a full-fledged, microservices-based application that can handle conversations with LLMs. Or, you could just use the Vercel AI SDK and call it a day :).