Broadcaster & Pub/Sub

The Broadcaster is the pub/sub engine at the heart of Atmosphere. Every real-time interaction flows through it: clients subscribe by connecting to a path, and messages are delivered to all subscribers when you call broadcast(). This chapter covers the Broadcaster API, the built-in implementations, BroadcasterFactory for managing multiple broadcasters, BroadcasterCache for missed messages, BroadcastFilter for message transformation, and @DeliverTo for controlling delivery scope.

When you annotate a class with @ManagedService(path = "/chat"), Atmosphere creates a Broadcaster for the /chat path. Every client that connects to /chat is subscribed to that Broadcaster as an AtmosphereResource. When your @Message method returns a value, that value is broadcast to every subscribed resource.

The flow:

  1. Client connects to /chat — Atmosphere creates an AtmosphereResource and adds it to the /chat Broadcaster.
  2. Client sends a message — Atmosphere invokes your @Message method.
  3. Your method returns a value — Atmosphere calls broadcaster.broadcast(returnValue).
  4. The Broadcaster delivers the message to every subscribed AtmosphereResource, using the appropriate transport for each client.
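
The four steps above can be modelled with nothing but the JDK. The sketch below is not Atmosphere code: MiniBroadcaster, subscribe, onMessage, and inbox are hypothetical names, and each subscriber's "transport" is just an in-memory list standing in for an AtmosphereResource.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Toy model of the connect / message / broadcast flow; not Atmosphere's implementation. */
final class MiniBroadcaster {
    // Subscriber id -> messages delivered to that subscriber (stands in for a transport).
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();
    // Stands in for the endpoint's @Message method.
    private final UnaryOperator<String> handler;

    MiniBroadcaster(UnaryOperator<String> handler) {
        this.handler = handler;
    }

    /** Step 1: a client connects and is added to the broadcaster. */
    void subscribe(String clientId) {
        inboxes.putIfAbsent(clientId, new ArrayList<>());
    }

    /** Steps 2 and 3: invoke the handler, then broadcast its return value. */
    void onMessage(String incoming) {
        broadcast(handler.apply(incoming));
    }

    /** Step 4: deliver the message to every subscriber. */
    void broadcast(String message) {
        for (List<String> inbox : inboxes.values()) {
            inbox.add(message);
        }
    }

    /** Returns a snapshot of what the given subscriber has received so far. */
    List<String> inbox(String clientId) {
        return List.copyOf(inboxes.getOrDefault(clientId, List.of()));
    }
}
```

With a handler such as String::toUpperCase and two subscribers, a single onMessage("hello") call leaves "HELLO" in both inboxes, which mirrors how a @Message return value reaches every subscribed resource.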

The Broadcaster interface (org.atmosphere.cpr.Broadcaster) defines the core operations:

// Broadcast to ALL subscribers
Future<Object> broadcast(Object message);
// Broadcast to a SPECIFIC subscriber
Future<Object> broadcast(Object message, AtmosphereResource resource);
// Broadcast to a SUBSET of subscribers
Future<Object> broadcast(Object message, Set<AtmosphereResource> subset);

All broadcast methods return a Future<Object>. Broadcasting is asynchronous by default — the Broadcaster uses an ExecutorService to deliver messages. Use future.get() if you need to wait for delivery to complete.
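
Waiting on the returned Future is ordinary java.util.concurrent usage. The following sketch models asynchronous delivery with a plain ExecutorService; AsyncDeliverySketch, deliverAsync, and broadcastAndWait are hypothetical names and no Atmosphere classes are involved. It illustrates bounding the wait with a timeout instead of blocking indefinitely.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy model of asynchronous delivery; not Atmosphere's implementation. */
final class AsyncDeliverySketch {
    /** Submits delivery to the executor and returns immediately, like an async broadcast. */
    static Future<Object> deliverAsync(ExecutorService executor, Object message, List<Object> sink) {
        return executor.submit(() -> {
            sink.add(message);   // stands in for writing to every subscriber
            return message;
        });
    }

    /** Broadcasts one message and waits at most one second for delivery to finish. */
    static List<Object> broadcastAndWait(Object message) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Object> sink = new CopyOnWriteArrayList<>();
        try {
            Future<Object> future = deliverAsync(executor, message, sink);
            future.get(1, TimeUnit.SECONDS);   // blocks until delivered or timed out
            return sink;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delivery", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("delivery did not complete", e);
        } finally {
            executor.shutdown();
        }
    }
}
```

The same get(timeout, unit) call applies to the Future returned by broadcast(); calling it on a request thread trades latency for the certainty that delivery has finished.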

// Get all subscribed resources
Collection<AtmosphereResource> getAtmosphereResources();
// Add a resource (subscribe)
Broadcaster addAtmosphereResource(AtmosphereResource resource);
// Remove a resource (unsubscribe)
Broadcaster removeAtmosphereResource(AtmosphereResource resource);
// Get the broadcaster's ID (typically the path)
String getID();
// Set the broadcaster's ID
void setID(String name);
// Broadcast after a delay
Future<Object> delayBroadcast(Object message);
Future<Object> delayBroadcast(Object message, long delay, TimeUnit unit);
// Broadcast periodically
Future<Object> scheduleFixedBroadcast(Object message, long period, TimeUnit unit);
Future<Object> scheduleFixedBroadcast(Object message, long waitFor, long period, TimeUnit unit);
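
Periodic broadcasting can be pictured as a scheduled task that produces a message on every tick. The sketch below is a JDK-only model under that assumption; PeriodicBroadcastSketch and collect are hypothetical names, and the list sink stands in for the subscribers.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Toy model of a fixed-period broadcast; not Atmosphere's implementation. */
final class PeriodicBroadcastSketch {
    /** Emits supplier values at a fixed period until `ticks` messages have been collected. */
    static List<String> collect(Supplier<String> message, int ticks, long periodMillis) {
        List<String> sink = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(ticks);
        // One scheduler thread, so ticks never run concurrently with each other.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                sink.add(message.get());   // stands in for broadcast(message)
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for ticks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ticks", e);
        } finally {
            scheduler.shutdownNow();       // always cancel the periodic task
        }
        return List.copyOf(sink);
    }
}
```

As in the model, the Future returned by scheduleFixedBroadcast is the handle for stopping the schedule; cancelling it ends the periodic task.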

DefaultBroadcaster is the default implementation used by @ManagedService. It:

  • Supports all transports (WebSocket, SSE, long-polling, streaming)
  • Uses an ExecutorService for asynchronous message delivery
  • Uses ReentrantLock (not synchronized) to avoid virtual thread pinning on JDK 21+
  • Supports BroadcasterCache and BroadcastFilter
  • Handles subscriber lifecycle (automatic removal on disconnect)

You rarely need to interact with DefaultBroadcaster directly. The @ManagedService annotation uses it automatically:

@ManagedService(path = "/chat") // uses DefaultBroadcaster by default
public class Chat { ... }

SimpleBroadcaster extends DefaultBroadcaster and is a lighter-weight implementation designed for WebSocket-only scenarios. It is typically used with @WebSocketHandlerService:

@WebSocketHandlerService(path = "/chat", broadcaster = SimpleBroadcaster.class)
public class WebSocketChat extends WebSocketStreamingHandlerAdapter { ... }

You can also use it with @ManagedService if you know all clients will use WebSocket:

@ManagedService(path = "/chat", broadcaster = SimpleBroadcaster.class)
public class Chat { ... }

BroadcasterFactory is the registry that manages all active Broadcaster instances. You inject it and use it to look up, create, or iterate over broadcasters.

@Inject
private BroadcasterFactory factory;
// Look up by ID (returns null if not found)
Broadcaster b = factory.lookup("/chat");
// Look up by ID, creating if it does not exist
Broadcaster b = factory.lookup("/chat", true);
// Look up with Optional (preferred over lookup which returns null)
Optional<Broadcaster> b = factory.findBroadcaster("/chat");

The findBroadcaster method (added in 4.0) returns an Optional instead of null, making the absent-broadcaster case explicit at the call site.

// Look up with a specific Broadcaster type
SimpleBroadcaster b = factory.lookup(SimpleBroadcaster.class, "/chat");
// Look up with type, creating if it does not exist
SimpleBroadcaster b = factory.lookup(SimpleBroadcaster.class, "/chat", true);
// Get all active broadcasters
Collection<Broadcaster> all = factory.lookupAll();
// Create a broadcaster with a generated ID
Broadcaster b = factory.get();
// Create a broadcaster with a specific ID
Broadcaster b = factory.get("/my-channel");
// Create a broadcaster with a specific type and ID
SimpleBroadcaster b = factory.get(SimpleBroadcaster.class, "/my-channel");
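
The lookup styles differ mainly in how they treat a missing ID. The sketch below models that difference with a ConcurrentHashMap; RegistrySketch, Channel, and find are hypothetical stand-ins and not BroadcasterFactory itself.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Toy registry mirroring lookup(id), lookup(id, true) and findBroadcaster(id). */
final class RegistrySketch {
    /** Stand-in for a Broadcaster; it only carries its ID. */
    static final class Channel {
        final String id;

        Channel(String id) {
            this.id = id;
        }
    }

    private final Map<String, Channel> channels = new ConcurrentHashMap<>();

    /** Returns the channel, or null when absent. */
    Channel lookup(String id) {
        return channels.get(id);
    }

    /** Returns the channel, creating it atomically when absent and createIfNull is true. */
    Channel lookup(String id, boolean createIfNull) {
        return createIfNull ? channels.computeIfAbsent(id, Channel::new) : channels.get(id);
    }

    /** Returns the channel wrapped in an Optional, never null. */
    Optional<Channel> find(String id) {
        return Optional.ofNullable(channels.get(id));
    }
}
```

The Optional-returning form forces the caller to decide what happens when nothing is registered, which is why the chapter's examples use findBroadcaster(...).ifPresent(...).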

Practical Example: Cross-Broadcaster Messaging

Using BroadcasterFactory, you can send messages from one endpoint to subscribers of another:

@ManagedService(path = "/notifications")
public class Notifications {
    @Inject
    private BroadcasterFactory factory;

    @org.atmosphere.config.service.Message
    public void onMessage(String message) {
        // Forward the message to the /chat broadcaster as well
        factory.findBroadcaster("/chat").ifPresent(b -> b.broadcast(message));
    }
}

When a client temporarily disconnects (network glitch, page navigation, etc.), messages broadcast during the disconnection are lost unless a BroadcasterCache is configured. The cache stores messages and replays them when the client reconnects.

UUIDBroadcasterCache is the default cache used by @ManagedService. It tracks which messages each AtmosphereResource has received using the resource’s UUID. When a client reconnects with the same UUID, any missed messages are delivered.

@ManagedService(path = "/chat", broadcasterCache = UUIDBroadcasterCache.class) // this is the default
public class Chat { ... }

The UUIDBroadcasterCache is configured automatically when you use @ManagedService. You do not need to set it explicitly unless you want to use a different implementation.

  1. When a message is broadcast, the cache stores it along with the set of resource UUIDs that received it.
  2. When a client reconnects, the cache checks which messages that client’s UUID has not received.
  3. Those missed messages are delivered to the reconnecting client.
  4. Cached messages expire after a configurable time period.
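
The four steps above can be sketched as a small replay cache. This is a simplified JDK-only model, not UUIDBroadcasterCache itself: ReplayCacheSketch, store, and replayFor are hypothetical names, and the clock is passed in explicitly so the expiry behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a UUID-keyed replay cache; not Atmosphere's implementation. */
final class ReplayCacheSketch {
    private static final class Entry {
        final String message;
        final long storedAtMillis;
        final Set<String> deliveredTo = new HashSet<>();

        Entry(String message, long storedAtMillis, Set<String> delivered) {
            this.message = message;
            this.storedAtMillis = storedAtMillis;
            this.deliveredTo.addAll(delivered);
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final long ttlMillis;

    ReplayCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Step 1: store the message with the UUIDs that received it live. */
    synchronized void store(String message, Set<String> connectedUuids, long nowMillis) {
        entries.add(new Entry(message, nowMillis, connectedUuids));
    }

    /** Steps 2 to 4: drop expired entries, then return and mark what this UUID missed. */
    synchronized List<String> replayFor(String uuid, long nowMillis) {
        entries.removeIf(e -> nowMillis - e.storedAtMillis > ttlMillis);
        List<String> missed = new ArrayList<>();
        for (Entry e : entries) {
            // add() returns true only if this UUID had not been recorded yet
            if (e.deliveredTo.add(uuid)) {
                missed.add(e.message);
            }
        }
        return missed;
    }
}
```

Because replayFor marks each returned message as delivered, a second reconnect with the same UUID gets nothing twice; a client whose UUID changes would instead look like a brand-new subscriber.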

A BroadcastFilter intercepts messages before they are delivered to subscribers. Filters can transform the message, pass it through unchanged, or abort delivery entirely.

public interface BroadcastFilter {
    record BroadcastAction(ACTION action, Object message, Object originalMessage) {
        public enum ACTION {
            CONTINUE, // pass to next filter
            ABORT,    // discard the message
            SKIP      // stop filtering, deliver the current message
        }
    }

    BroadcastAction filter(String broadcasterId, Object originalMessage, Object message);
}

The filter method receives the broadcaster ID, the original message, and the (possibly already transformed) message. It returns a BroadcastAction that tells the framework what to do:

  • CONTINUE — pass the (possibly transformed) message to the next filter in the chain.
  • ABORT — discard the message entirely; it will not be delivered.
  • SKIP — stop the filter chain and deliver the message as-is.
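
How the three actions steer a chain is easiest to see in a loop. The sketch below is a JDK-only model with hypothetical names (FilterChainSketch, Action, Result, Filter, run); it assumes that SKIP delivers the message returned by the filter that issued it, and it uses an empty Optional to represent an aborted message.

```java
import java.util.List;
import java.util.Optional;

/** Toy model of a filter chain with CONTINUE / ABORT / SKIP; not Atmosphere's implementation. */
final class FilterChainSketch {
    enum Action { CONTINUE, ABORT, SKIP }

    /** Result of one filter: what to do next and the (possibly transformed) message. */
    static final class Result {
        final Action action;
        final String message;

        Result(Action action, String message) {
            this.action = action;
            this.message = message;
        }
    }

    interface Filter {
        Result apply(String original, String current);
    }

    /** Runs the filters in order; an empty Optional means the message was aborted. */
    static Optional<String> run(List<Filter> filters, String original) {
        String current = original;
        for (Filter f : filters) {
            Result r = f.apply(original, current);
            if (r.action == Action.ABORT) {
                return Optional.empty();   // discard: nothing is delivered
            }
            current = r.message;
            if (r.action == Action.SKIP) {
                break;                     // deliver now, skipping the remaining filters
            }
        }
        return Optional.of(current);
    }
}
```

Placing a SKIP-returning filter early is a way to exempt certain messages (for example, system notices) from later transformations, while ABORT suits validation-style filters.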

Filters are added via the broadcastFilters attribute of @ManagedService:

@ManagedService(path = "/chat", broadcastFilters = {XSSHtmlFilter.class})
public class Chat { ... }

Multiple filters are applied in the order they are listed.

Alternatively, annotate the filter class itself with @BroadcasterFilterService and Atmosphere will discover and register it at startup:

@BroadcasterFilterService
public class ProfanityFilter implements BroadcastFilter { /* ... */ }

The standard BroadcastFilter sees the message but not the target client. When you need to transform or filter differently per subscriber (e.g., based on role or session data), implement PerRequestBroadcastFilter. It adds a four-argument filter method that receives the AtmosphereResource being delivered to:

public class RoleFilter implements PerRequestBroadcastFilter {
    // isAdmin(...) and redact(...) are application-specific helpers (not shown)

    @Override
    public BroadcastAction filter(String broadcasterId,
                                  AtmosphereResource r,
                                  Object originalMessage,
                                  Object message) {
        // Access r.getRequest() for session/auth info
        if (isAdmin(r)) {
            return new BroadcastAction(ACTION.CONTINUE, message);
        }
        return new BroadcastAction(ACTION.CONTINUE, redact(message));
    }

    @Override
    public BroadcastAction filter(String broadcasterId,
                                  Object originalMessage,
                                  Object message) {
        // Per-broadcast pass: let the message through so the per-resource pass runs
        return new BroadcastAction(ACTION.CONTINUE, message);
    }
}

The four-argument method is called once per subscriber, per message. The three-argument method (from BroadcastFilter) is called once per broadcast, before the per-resource pass. Return CONTINUE from the three-argument method to let the per-resource filter run.
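
The two passes described above can be modelled as "filter once, then filter per recipient". The sketch below is JDK-only and uses hypothetical names (TwoPassFilterSketch, deliver); subscribers are plain strings standing in for AtmosphereResource instances.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

/** Toy model of per-broadcast plus per-subscriber filtering; not Atmosphere's implementation. */
final class TwoPassFilterSketch {
    /**
     * Applies globalFilter once per broadcast, then perSubscriberFilter once per subscriber.
     * Returns subscriber id -> message that subscriber would receive.
     */
    static Map<String, String> deliver(Iterable<String> subscribers,
                                       String message,
                                       UnaryOperator<String> globalFilter,
                                       BiFunction<String, String, String> perSubscriberFilter) {
        // Pass 1 (three-argument filter): runs once, without knowing the target
        String afterGlobal = globalFilter.apply(message);
        Map<String, String> delivered = new LinkedHashMap<>();
        for (String subscriber : subscribers) {
            // Pass 2 (four-argument filter): runs per subscriber and sees the target
            delivered.put(subscriber, perSubscriberFilter.apply(subscriber, afterGlobal));
        }
        return delivered;
    }
}
```

Since the second pass runs for every subscriber on every message, expensive work (such as a database lookup per recipient) multiplies quickly; caching the role on the session is a common way to keep it cheap.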

A BroadcasterListener receives lifecycle events from a Broadcaster — creation, destruction, resource add/remove, and message queuing. Annotate the class with @BroadcasterListenerService for automatic discovery:

@BroadcasterListenerService
public class MyListener implements BroadcasterListener {
    public void onPostCreate(Broadcaster b) { /* new broadcaster */ }
    public void onComplete(Broadcaster b) { /* broadcast delivered */ }
    public void onPreDestroy(Broadcaster b) { /* broadcaster shutting down */ }
    public void onAddAtmosphereResource(Broadcaster b, AtmosphereResource r) { /* client joined */ }
    public void onRemoveAtmosphereResource(Broadcaster b, AtmosphereResource r) { /* client left */ }
    public void onMessage(Broadcaster b, Deliver deliver) { /* message queued */ }
}

This is useful for monitoring, auditing, or triggering side effects when broadcasters are created/destroyed or when clients subscribe/unsubscribe.

By default, the return value of a @Message method is broadcast to all subscribers on the endpoint’s Broadcaster. The @DeliverTo annotation changes the delivery scope.

The default behavior — deliver to all subscribers on this Broadcaster:

@org.atmosphere.config.service.Message(encoders = {JacksonEncoder.class}, decoders = {JacksonDecoder.class})
@DeliverTo(DELIVER_TO.BROADCASTER)
public Message onMessage(Message message) {
    return message; // sent to all subscribers on this broadcaster
}

Deliver only to the resource that sent the message. Useful for acknowledgments or request/response patterns:

@org.atmosphere.config.service.Message(decoders = {JacksonDecoder.class})
@DeliverTo(DELIVER_TO.RESOURCE)
public String onMessage(Message message) {
    return "Received: " + message.getMessage(); // sent only to the sender
}

Deliver to all resources across all Broadcaster instances. This is a global broadcast:

@org.atmosphere.config.service.Message(encoders = {JacksonEncoder.class}, decoders = {JacksonDecoder.class})
@DeliverTo(DELIVER_TO.ALL)
public Message onMessage(Message message) {
    return message; // sent to every connected client, on every broadcaster
}

The enum and annotation are in org.atmosphere.config.service:

import org.atmosphere.config.service.DeliverTo;
import org.atmosphere.config.service.DeliverTo.DELIVER_TO;
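
The three delivery scopes amount to three ways of computing a recipient set. The sketch below models that with plain collections; DeliveryScopeSketch, Scope, and recipients are hypothetical names, and client IDs are strings standing in for AtmosphereResource instances.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of RESOURCE / BROADCASTER / ALL delivery; not Atmosphere's implementation. */
final class DeliveryScopeSketch {
    enum Scope { RESOURCE, BROADCASTER, ALL }

    /**
     * Resolves which client ids receive a reply.
     * subscriptions maps a broadcaster id to the client ids subscribed to it.
     */
    static Set<String> recipients(Scope scope,
                                  String sender,
                                  String broadcasterId,
                                  Map<String, List<String>> subscriptions) {
        Set<String> out = new LinkedHashSet<>();
        switch (scope) {
            case RESOURCE:
                out.add(sender);   // only the client that sent the message
                break;
            case BROADCASTER:
                out.addAll(subscriptions.getOrDefault(broadcasterId, List.of()));
                break;
            case ALL:
                for (List<String> clients : subscriptions.values()) {
                    out.addAll(clients);   // every client on every broadcaster
                }
                break;
        }
        return out;
    }
}
```

Seen this way, ALL is the scope to use sparingly: its recipient set grows with the whole deployment rather than with one topic.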

In addition to @DeliverTo (which controls delivery at the method level), the Broadcaster itself has a scope that controls which resources it can reach:

public enum SCOPE {
    REQUEST,     // only the current request's AtmosphereResource
    APPLICATION, // all resources in the current web application (default)
    VM           // all resources in the current JVM
}

Most applications use the default APPLICATION scope.

The natural pattern is one Broadcaster per topic or channel. With @ManagedService, this happens automatically based on the path:

@ManagedService(path = "/chat/{room}")
public class ChatRoom {
    @PathParam("room")
    private String room;
    // Each room value (/chat/general, /chat/support) gets its own Broadcaster
}

When you need to broadcast from outside a @ManagedService class (e.g., from a REST controller or a scheduled task), use BroadcasterFactory:

@Inject
private BroadcasterFactory factory;

public void notifyAll(String message) {
    factory.findBroadcaster("/notifications").ifPresent(b -> b.broadcast(message));
}

To send a message to a specific client, use the two-argument broadcast:

@Inject
private BroadcasterFactory factory;

public void notifyUser(AtmosphereResource target, String message) {
    factory.findBroadcaster("/notifications").ifPresent(b -> b.broadcast(message, target));
}

The Broadcaster is a low-level pub/sub primitive. If your application needs presence tracking, stable member identity, or message history, consider the Room API instead. Rooms wrap Broadcasters and add higher-level features:

| Feature | Broadcaster | Room |
|---|---|---|
| Level | Low-level | High-level |
| Identity | Path-based ID | Named group |
| Presence | Manual tracking | Built-in events |
| Direct messaging | Manual UUID lookup | room.sendTo(memberId, msg) |
| Message history | Via BroadcasterCache | room.enableHistory(n) |
| Client protocol | Manual | Built into atmosphere.js |

For most chat and collaboration apps, prefer Rooms. Use Broadcaster directly when you need fine-grained control over message delivery, custom filters, or lifecycle policies.

You are not limited to broadcasting from within a @ManagedService or @WebSocketHandlerService handler. Any server-side code — a scheduled task, a JMS listener, a background thread, a REST controller — can look up a Broadcaster and push messages to connected clients.

Use BroadcasterFactory.findBroadcaster() (or lookup(), which returns null when nothing matches) to find a Broadcaster by its path, then call broadcast():

@Inject
private BroadcasterFactory factory;

public void startPeriodicUpdates(ScheduledExecutorService scheduler) {
    scheduler.scheduleAtFixedRate(() -> {
        factory.findBroadcaster("/dashboard").ifPresent(b ->
            b.broadcast("{\"type\":\"heartbeat\",\"time\":" + System.currentTimeMillis() + "}")
        );
    }, 0, 10, TimeUnit.SECONDS);
}

The Broadcaster itself has a built-in scheduleFixedBroadcast method that simplifies periodic broadcasting without an external scheduler:

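factory.findBroadcaster("/dashboard").ifPresent(b ->
    // The parameter type is Object, so the lambda needs an explicit target type (Callable)
    b.scheduleFixedBroadcast((Callable<String>) () -> "Server Push " + Instant.now(), 10, TimeUnit.SECONDS)
);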

A common pattern is reacting to an external event source (database change, message queue, file watcher) and pushing the update to connected clients:

// Called by a JMS listener, Kafka consumer, or any background thread
public void onExternalEvent(String topic, String payload) {
    factory.findBroadcaster("/events/" + topic).ifPresent(b -> b.broadcast(payload));
}

The key point: BroadcasterFactory is thread-safe and can be called from any thread. Inject it or obtain it from the AtmosphereConfig.

Atmosphere 4.0 uses virtual threads (Executors.newVirtualThreadPerTaskExecutor()) by default for message delivery and async writes. Virtual threads are cheap to create and do not require pool sizing — the JVM schedules them onto a small number of carrier threads automatically. This means:

  • No thread pool exhaustion — each broadcast gets its own virtual thread at near-zero cost
  • No maxProcessingThreads tuning needed — the old 2.x advice about bounding thread pools does not apply
  • ReentrantLock instead of synchronized: DefaultBroadcaster uses ReentrantLock to avoid virtual thread pinning

To opt out and use platform threads instead (e.g., for compatibility with libraries that don’t work well with virtual threads):

<init-param>
    <param-name>org.atmosphere.cpr.useVirtualThreads</param-name>
    <param-value>false</param-value>
</init-param>

By default, message ordering is guaranteed: broadcast("A") followed by broadcast("B") delivers A before B to every client. If your application does not require strict ordering, disabling this constraint can improve throughput:

<init-param>
    <param-name>org.atmosphere.cpr.Broadcaster.supportOutOfOrderBroadcast</param-name>
    <param-value>true</param-value>
</init-param>
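
One common way to obtain an ordering guarantee like the one described above is to funnel deliveries through a single worker. Whether Atmosphere does exactly this internally is not stated here, so the sketch below is only an illustration of the trade-off; OrderedDeliverySketch and deliverInOrder are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model of ordered delivery; not Atmosphere's implementation. */
final class OrderedDeliverySketch {
    /** Delivers messages through one worker thread, so arrival order equals submission order. */
    static List<String> deliverInOrder(List<String> messages) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        ExecutorService single = Executors.newSingleThreadExecutor();
        for (String m : messages) {
            single.execute(() -> received.add(m));   // queued FIFO behind earlier messages
        }
        single.shutdown();                           // no new tasks; queued ones still run
        try {
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("delivery did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delivery", e);
        }
        return new ArrayList<>(received);
    }
}
```

Replacing the single worker with a multi-threaded pool would let deliveries overlap and raise throughput, at the cost of no longer guaranteeing that A arrives before B.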

| Concept | Purpose |
|---|---|
| Broadcaster | Pub/sub hub; delivers messages to subscribed AtmosphereResource instances |
| DefaultBroadcaster | Default implementation; supports all transports, async delivery |
| SimpleBroadcaster | Lighter implementation; typically used with WebSocket-only endpoints |
| BroadcasterFactory | Registry for looking up, creating, and iterating over Broadcaster instances |
| UUIDBroadcasterCache | Default cache; replays missed messages on reconnection |
| BroadcastFilter | Intercepts and transforms messages before delivery |
| PerRequestBroadcastFilter | Per-subscriber message filtering (e.g., role-based redaction) |
| BroadcasterListener | Lifecycle events: creation, destruction, subscribe, unsubscribe |
| @DeliverTo | Controls delivery scope: RESOURCE, BROADCASTER, or ALL |