
Overview

ServiceSwitcher is a specialized parallel pipeline that enables dynamic switching between multiple service instances at runtime. This is useful when you need to change between different STT providers, TTS providers, or other frame processors based on user preferences, costs, performance requirements, or other runtime conditions. The switcher uses a strategy pattern to determine which service is active. Two built-in strategies are provided: manual switching for explicit control, and automatic failover for handling service errors.

How It Works

ServiceSwitcher wraps multiple services in a parallel pipeline where only the active service processes frames. Each service is “sandwiched” between two filters that check if it’s the currently active service before allowing frames to pass through. When you switch services, the filters update to redirect frame flow to the newly active service.
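
The gating idea can be illustrated with a small, self-contained toy model. This is only a sketch of the concept, not Pipecat's implementation: ToyService and ToySwitcher are hypothetical stand-ins, and frames are plain strings rather than Pipecat frame objects.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyService:
    """Hypothetical stand-in for a frame processor; records what it handles."""

    name: str
    seen: List[str] = field(default_factory=list)

    def process(self, frame: str) -> str:
        self.seen.append(frame)
        return f"{self.name}:{frame}"


class ToySwitcher:
    """Toy model of the sandwich: each branch is gated on the active service."""

    def __init__(self, services: List[ToyService]):
        self.services = services
        self.active = services[0]  # the first service starts out active

    def switch_to(self, service: ToyService) -> None:
        # Changing the active service changes what every gate lets through.
        self.active = service

    def push(self, frame: str) -> List[str]:
        outputs = []
        for service in self.services:
            # Entry filter: the frame only reaches the active service.
            if service is not self.active:
                continue
            result = service.process(frame)
            # Exit filter: only the active service's output leaves the branch.
            if service is self.active:
                outputs.append(result)
        return outputs


a, b = ToyService("a"), ToyService("b")
switcher = ToySwitcher([a, b])
first = switcher.push("hello")   # handled by "a"
switcher.switch_to(b)
second = switcher.push("world")  # handled by "b"
```

Every branch receives every frame, but the filters ensure that the inactive service never sees it, which is why switching takes effect without rebuilding the pipeline.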

Constructor

from pipecat.pipeline.service_switcher import ServiceSwitcher

# Uses ServiceSwitcherStrategyManual by default
switcher = ServiceSwitcher(services=[stt_service1, stt_service2])

# Or explicitly specify a strategy
from pipecat.pipeline.service_switcher import ServiceSwitcherStrategyFailover
switcher = ServiceSwitcher(
    services=[stt_service1, stt_service2],
    strategy_type=ServiceSwitcherStrategyFailover
)

services (List[FrameProcessor], required)
List of service instances to switch between. Can be any frame processors (STT, TTS, or custom processors).

strategy_type (Type[ServiceSwitcherStrategy], default: ServiceSwitcherStrategyManual)
The strategy class to use for switching logic. Pass the class itself, not an instance.

Switching Strategies

ServiceSwitcherStrategyManual

The manual strategy gives you explicit control over which service is active: you push ManuallySwitchServiceFrame frames into the pipeline.
  • Initial State: The first service in the list is active by default.
  • Switching: Push a ManuallySwitchServiceFrame with the desired service instance.

ServiceSwitcherStrategyFailover

The failover strategy automatically switches to the next available service when the active service reports a non-fatal error. This enables automatic recovery from service failures without manual intervention.
  • Initial State: The first service in the list is active by default.
  • Automatic Failover: When the active service pushes a non-fatal ErrorFrame, the strategy switches to the next service in the list (wrapping around to the first service if needed).
  • Recovery: The failed service remains in the list and can be switched back to manually or via application logic in the on_service_switched event handler. This allows implementing custom recovery policies.

from pipecat.pipeline.service_switcher import ServiceSwitcher, ServiceSwitcherStrategyFailover

switcher = ServiceSwitcher(
    services=[primary_stt, backup_stt],
    strategy_type=ServiceSwitcherStrategyFailover
)

@switcher.strategy.event_handler("on_service_switched")
async def on_switched(strategy, service):
    # Application decides when/how to recover the failed service
    print(f"Switched to: {service.name}")
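
The wrap-around selection described for the failover strategy can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the library's code: ToyFailover is a hypothetical class, and services are represented by plain strings.

```python
from typing import List, Optional


class ToyFailover:
    """Hypothetical model of failover: advance to the next service on error."""

    def __init__(self, services: List[str]):
        self.services = services
        self.active = services[0]  # the first service starts out active

    def handle_error(self, error: str) -> Optional[str]:
        # With a single service there is nothing to fail over to.
        if len(self.services) < 2:
            return None
        index = self.services.index(self.active)
        # The modulo wraps from the last service back to the first.
        self.active = self.services[(index + 1) % len(self.services)]
        return self.active


strategy = ToyFailover(["primary", "backup"])
after_first_error = strategy.handle_error("timeout")   # moves to "backup"
after_second_error = strategy.handle_error("timeout")  # wraps to "primary"
```

Because the failed service stays in the list, a second error on the backup brings the primary back, which is one reason an application may want its own recovery policy in the on_service_switched handler.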

Custom Strategies

You can create your own switching strategy by subclassing ServiceSwitcherStrategy and implementing the handle_frame and/or handle_error methods.
  • handle_frame(frame, direction): Called for control frames (like ManuallySwitchServiceFrame). Should return the newly active service if a switch occurred, or None otherwise.
  • handle_error(error): Called when the active service reports a non-fatal error. Override this to implement custom error-handling logic. Should return the newly active service if a switch occurred, or None otherwise.
Additionally, if you want to maintain either manual switching or automatic failover as an option when writing a custom strategy, your new strategy should inherit from ServiceSwitcherStrategyManual or ServiceSwitcherStrategyFailover, respectively.
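
As an illustration of the shape a custom strategy might take, the sketch below fails over only after several consecutive errors. It is deliberately self-contained: StrategyBase is a hypothetical stand-in that mirrors the two hooks described above, not the real ServiceSwitcherStrategy, and the attribute names (services, active_service), the max_errors parameter, and the ThresholdFailover class are assumptions made for this example.

```python
from typing import Any, List, Optional


class StrategyBase:
    """Hypothetical stand-in with the two hooks a strategy can override."""

    def __init__(self, services: List[Any]):
        self.services = services
        self.active_service = services[0]

    def handle_frame(self, frame: Any, direction: Any) -> Optional[Any]:
        return None  # no switch by default

    def handle_error(self, error: Any) -> Optional[Any]:
        return None  # no switch by default


class ThresholdFailover(StrategyBase):
    """Switch to the next service only after max_errors errors in a row."""

    def __init__(self, services: List[Any], max_errors: int = 3):
        super().__init__(services)
        self.max_errors = max_errors
        self.error_count = 0

    def handle_error(self, error: Any) -> Optional[Any]:
        self.error_count += 1
        if self.error_count < self.max_errors:
            return None  # tolerate the error and keep the current service
        self.error_count = 0
        index = self.services.index(self.active_service)
        self.active_service = self.services[(index + 1) % len(self.services)]
        return self.active_service  # report the newly active service


strategy = ThresholdFailover(["primary", "backup"], max_errors=2)
results = [strategy.handle_error("e") for _ in range(3)]
```

In a real strategy the same logic would live in a subclass of ServiceSwitcherStrategy (or of ServiceSwitcherStrategyFailover, to keep the built-in behavior available), following the return-value contract listed above.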

Usage Examples

Switching Between TTS Services

import os

from pipecat.frames.frames import ManuallySwitchServiceFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.service_switcher import ServiceSwitcher
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.elevenlabs.tts import ElevenLabsTTSService

# Create TTS services
elevenlabs = ElevenLabsTTSService(api_key=os.getenv("ELEVENLABS_API_KEY"), voice_id=os.getenv("ELEVENLABS_VOICE_ID"))
cartesia = CartesiaTTSService(api_key=os.getenv("CARTESIA_API_KEY"), voice_id=os.getenv("CARTESIA_VOICE_ID"))

# Create switcher with both services (uses ServiceSwitcherStrategyManual by default)
tts_switcher = ServiceSwitcher(services=[elevenlabs, cartesia])

# Use in pipeline (transport, stt, llm, and context_aggregator are assumed to be created elsewhere)
pipeline = Pipeline([
    transport.input(),
    stt,
    context_aggregator.user(),
    llm,
    tts_switcher,
    transport.output(),
    context_aggregator.assistant()
])

# Later, switch to Cartesia (task is assumed to be the pipeline task running this pipeline)
await task.queue_frame(ManuallySwitchServiceFrame(service=cartesia))

Event Handlers

| Event | Description |
| --- | --- |
| on_service_switched | Active service was switched |

@switcher.event_handler("on_service_switched")
async def on_service_switched(switcher, service):
    print(f"Switched to: {service.name}")

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| switcher | ServiceSwitcherStrategy | The switcher instance |
| service | FrameProcessor | The newly active service |