Utility Services

In addition to the various AI services available in Pipecat, there are a handful of utility service classes that help you build your app.

Aggregators

These should all be imported from pipecat.pipeline.aggregators.

LLMUserResponseAggregator and LLMAssistantResponseAggregator

These services are the best way to take a conversation between a user and a bot and save it to a list of messages that can be used for LLM completions. Both initializers take a messages list. Create it in your app and pass the same list to both services, so that user and bot turns are recorded in one conversation history:

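from pipecat.pipeline.aggregators import LLMAssistantResponseAggregator, LLMUserResponseAggregator

messages = []  # shared conversation history; `transport` is your app's transport instance
user_aggregator = LLMUserResponseAggregator(messages, transport._my_participant_id)
bot_aggregator = LLMAssistantResponseAggregator(messages, transport._my_participant_id)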

Each service listens for a start frame, one or more text frames, and an end frame.

LLMUserResponseAggregator:

  • Start: UserStartedSpeakingFrame
  • Text: TranscriptionFrame
  • End: UserStoppedSpeakingFrame

(Note: This relies on voice activity detection (VAD) to generate the started and stopped speaking frames.)

LLMAssistantResponseAggregator:

  • Start: LLMResponseStartFrame
  • Text: TextFrame
  • End: LLMResponseEndFrame

You can see this in action in the chatbot example app.
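To illustrate why both aggregators share one messages list, here is a minimal, self-contained sketch of the pattern. It does not use Pipecat: TextFrame, StartFrame, EndFrame, and TurnAggregator are hypothetical stand-ins (StartFrame and EndFrame play the role of UserStartedSpeakingFrame/UserStoppedSpeakingFrame or LLMResponseStartFrame/LLMResponseEndFrame). The {"role": ..., "content": ...} dictionaries assume an OpenAI-style chat message format, and the choice of joiner for each side is also an assumption.

```python
from dataclasses import dataclass


@dataclass
class TextFrame:
    text: str


class StartFrame:
    """Hypothetical marker that opens a turn (e.g. UserStartedSpeakingFrame)."""


class EndFrame:
    """Hypothetical marker that closes a turn (e.g. UserStoppedSpeakingFrame)."""


class TurnAggregator:
    """Hypothetical sketch, not Pipecat's class: appends one turn to a shared message list."""

    def __init__(self, messages: list, role: str, joiner: str):
        self.messages = messages  # the same list object is shared by both aggregators
        self.role = role
        self.joiner = joiner
        self.parts = None  # None means we are not inside a turn

    def process(self, frame) -> None:
        if isinstance(frame, StartFrame):
            self.parts = []
        elif isinstance(frame, TextFrame) and self.parts is not None:
            self.parts.append(frame.text)
        elif isinstance(frame, EndFrame) and self.parts is not None:
            content = self.joiner.join(self.parts)
            if content:
                self.messages.append({"role": self.role, "content": content})
            self.parts = None


messages = []
user = TurnAggregator(messages, "user", joiner=" ")  # assumption: transcripts need spaces
bot = TurnAggregator(messages, "assistant", joiner="")  # assumption: LLM text carries its own spaces

for frame in [StartFrame(), TextFrame("What's"), TextFrame("the weather?"), EndFrame()]:
    user.process(frame)
for frame in [StartFrame(), TextFrame("It's"), TextFrame(" sunny."), EndFrame()]:
    bot.process(frame)

print(messages)
# [{'role': 'user', 'content': "What's the weather?"}, {'role': 'assistant', 'content': "It's sunny."}]
```

Because both aggregators append to the same list object, the history stays in conversation order and is ready to be passed to an LLM for the next completion.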

UserResponseAggregator

If you want to accumulate user speech between 'started' and 'stopped' talking events, but you don't want to store that in a messages list for an LLM, you can use a UserResponseAggregator. This will accumulate all TranscriptionFrames received between a UserStartedSpeakingFrame and a UserStoppedSpeakingFrame, then emit them as a single TextFrame.

LLMFullResponseAggregator

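This works like UserResponseAggregator, but for LLM output: it accumulates the TextFrames received between an LLMResponseStartFrame and an LLMResponseEndFrame, then emits them as a single TextFrame.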
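UserResponseAggregator and LLMFullResponseAggregator follow the same start/accumulate/end pattern and differ only in which frames they watch. The generic function below is a hypothetical sketch of that pattern, not Pipecat's implementation. The frame classes are stand-ins named after the frames mentioned in this section and are not imported from Pipecat. In this sketch the start/end markers are consumed and all other frames pass through unchanged. The real classes may handle markers differently.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class TextFrame:
    text: str


@dataclass
class TranscriptionFrame(TextFrame):
    """Hypothetical stand-in: a transcription is a kind of text frame."""


class UserStartedSpeakingFrame:
    """Hypothetical stand-in marker."""


class UserStoppedSpeakingFrame:
    """Hypothetical stand-in marker."""


class LLMResponseStartFrame:
    """Hypothetical stand-in marker."""


class LLMResponseEndFrame:
    """Hypothetical stand-in marker."""


def aggregate_between(frames: Iterable, start_type: type, end_type: type,
                      text_type: type, joiner: str) -> Iterator:
    """Collapse text frames seen between start/end markers into one TextFrame."""
    parts = None  # None means "not between a start and an end marker"
    for frame in frames:
        if isinstance(frame, start_type):
            parts = []
        elif isinstance(frame, end_type):
            if parts:
                yield TextFrame(joiner.join(parts))  # one frame for the whole turn
            parts = None
        elif parts is not None and isinstance(frame, text_type):
            parts.append(frame.text)
        else:
            yield frame  # everything else passes through unchanged


user_out = list(aggregate_between(
    [UserStartedSpeakingFrame(), TranscriptionFrame("Hello"),
     TranscriptionFrame("there"), UserStoppedSpeakingFrame()],
    UserStartedSpeakingFrame, UserStoppedSpeakingFrame, TranscriptionFrame, joiner=" "))
llm_out = list(aggregate_between(
    [LLMResponseStartFrame(), TextFrame("Hi,"), TextFrame(" nice to meet you."),
     LLMResponseEndFrame()],
    LLMResponseStartFrame, LLMResponseEndFrame, TextFrame, joiner=""))
print(user_out)  # [TextFrame(text='Hello there')]
print(llm_out)   # [TextFrame(text='Hi, nice to meet you.')]
```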

SentenceAggregator

This uses an approach similar to the one text-to-speech services use: it accumulates TextFrames until it sees one with sentence-ending punctuation, then emits all of the accumulated text as a single frame. For example:

<TextFrame text="Hello, it's">
<TextFrame text=" nice to meet">
<TextFrame text=" you.">

Becomes a single TextFrame containing the text "Hello, it's nice to meet you."
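A minimal sketch of this buffering behavior follows. It is not Pipecat's SentenceAggregator. The TextFrame class is a stand-in, and two behaviors are assumptions: only the end of the accumulated text is checked for ".", "?", or "!", and any trailing partial sentence is flushed when the input ends.

```python
import re
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class TextFrame:
    text: str


# Assumption: text "ends a sentence" if it ends with ., ?, or ! (plus optional whitespace).
SENTENCE_END = re.compile(r"[.?!]\s*$")


def aggregate_sentences(frames: Iterable[TextFrame]) -> Iterator[TextFrame]:
    buffer = ""
    for frame in frames:
        buffer += frame.text
        if SENTENCE_END.search(buffer):
            yield TextFrame(buffer)  # emit the whole accumulated sentence at once
            buffer = ""
    if buffer:
        yield TextFrame(buffer)  # assumption: flush a trailing partial sentence at end of input


frames = [TextFrame("Hello, it's"), TextFrame(" nice to meet"), TextFrame(" you.")]
print(list(aggregate_sentences(frames)))
# [TextFrame(text="Hello, it's nice to meet you.")]
```

The comma in "Hello, it's" does not end the sentence, so the first two frames stay buffered until " you." arrives.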

StatelessTextTransformer

This service transforms the text of any TextFrames it sees. For example, this instance:

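from pipecat.pipeline.aggregators import StatelessTextTransformer
transformer = StatelessTextTransformer(lambda x: x.upper())  # uppercase every TextFrame's text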

Given <TextFrame text="Hello!">, it emits <TextFrame text="HELLO!">.
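The core idea fits in a few lines. This is a hypothetical sketch, not Pipecat's StatelessTextTransformer. TextFrame and AudioFrame are stand-ins, and passing non-text frames through unchanged is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator


@dataclass
class TextFrame:
    text: str


@dataclass
class AudioFrame:
    """Hypothetical non-text frame, used to show pass-through behavior."""
    audio: bytes


def transform_text(frames: Iterable, fn: Callable[[str], str]) -> Iterator:
    for frame in frames:
        if isinstance(frame, TextFrame):
            yield TextFrame(fn(frame.text))  # each frame is handled on its own: no buffer
        else:
            yield frame  # assumption: non-text frames pass through unchanged


out = list(transform_text([TextFrame("Hello!"), AudioFrame(b"\x00\x01")], lambda x: x.upper()))
print(out)  # [TextFrame(text='HELLO!'), AudioFrame(audio=b'\x00\x01')]
```

Each output depends only on the current frame, so the transformer keeps no state between frames. That is what distinguishes it from SentenceAggregator, which must buffer text across frames.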

Advanced Aggregators

These services enable some complex pipeline architectures.

ParallelPipeline

This service lets you run a set of services in parallel instead of sequentially. For example, the 05-sync-speech-and-image example in the framework repo uses it like this:

pipeline = Pipeline(
    processors=[
        llm,
        sentence_aggregator,
        ParallelPipeline(
            [[month_prepender, tts], [llm_full_response_aggregator, imagegen]]
        ),
        gated_aggregator,
    ],
)

Keep in mind that the parallel pipeline doesn't let frames get out of order. It takes frame A from its source queue and sends it through all of the parallel branches at the same time, but it doesn't start processing frame B until every branch has finished with frame A and yielded its frames to the sink queue.
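The ordering guarantee can be modeled with asyncio. The sketch below is not Pipecat's ParallelPipeline. Frames are plain strings, each hypothetical stage maps one frame to exactly one frame, and the stage names slow_upper and fast_tag are invented for illustration. Each source frame is fanned out to all branches with asyncio.gather, and the loop does not take the next frame until every branch has finished.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical: a stage maps one frame (here just a string) to one output frame.
Stage = Callable[[str], Awaitable[str]]


async def parallel_pipeline(source: list[str], branches: list[list[Stage]]) -> list[str]:
    sink: list[str] = []

    async def run_branch(branch: list[Stage], frame: str) -> None:
        for stage in branch:
            frame = await stage(frame)
        sink.append(frame)  # within one frame, branches land in completion order

    for frame in source:
        # Fan the frame out to every branch concurrently, and wait for all of them
        # before taking the next frame from the source.
        await asyncio.gather(*(run_branch(branch, frame) for branch in branches))
    return sink


async def slow_upper(text: str) -> str:
    await asyncio.sleep(0.02)  # simulate a slow service, e.g. image generation
    return text.upper()


async def fast_tag(text: str) -> str:
    await asyncio.sleep(0)
    return f"<{text}>"


sink = asyncio.run(parallel_pipeline(["a", "b"], [[slow_upper], [fast_tag]]))
print(sink)  # ['<a>', 'A', '<b>', 'B']
```

Even though fast_tag finishes almost immediately, "<b>" only appears after "A": frame "b" is not started until both branches are done with frame "a".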

GatedAggregator

For more information on the GatedAggregator, take a look at this example in the framework, as well as the comments in the source code.

VisionImageFrameAggregator

Use a VisionImageFrameAggregator to build a VisionImageFrame out of a TextFrame and an ImageFrame. See Vision Services for more info.
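The pairing can be sketched as follows. This is a hypothetical illustration, not Pipecat's VisionImageFrameAggregator. All frame classes are stand-ins, and the behavior shown is assumed: the text arrives before the image, the latest text is held until an image arrives, and an image with no pending text passes through unchanged.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class TextFrame:
    text: str


@dataclass
class ImageFrame:
    image: bytes


@dataclass
class VisionImageFrame:
    text: str
    image: bytes


def aggregate_vision(frames: Iterable) -> Iterator:
    pending_text: Optional[str] = None
    for frame in frames:
        if isinstance(frame, TextFrame):
            pending_text = frame.text  # assumption: hold the latest prompt until an image arrives
        elif isinstance(frame, ImageFrame) and pending_text is not None:
            yield VisionImageFrame(pending_text, frame.image)  # combine prompt and image
            pending_text = None
        else:
            yield frame  # assumption: unmatched frames pass through unchanged


out = list(aggregate_vision([
    ImageFrame(b"raw"),
    TextFrame("What is in this picture?"),
    ImageFrame(b"img"),
]))
print(out)
# [ImageFrame(image=b'raw'), VisionImageFrame(text='What is in this picture?', image=b'img')]
```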