Broadcaster & Pub/Sub
The Broadcaster is the pub/sub engine at the heart of Atmosphere. Every real-time interaction flows through it: clients subscribe by connecting to a path, and messages are delivered to all subscribers when you call broadcast(). This chapter covers the Broadcaster API, the built-in implementations, BroadcasterFactory for managing multiple broadcasters, BroadcasterCache for missed messages, BroadcastFilter for message transformation, and @DeliverTo for controlling delivery scope.
How Broadcasting Works
When you annotate a class with @ManagedService(path = "/chat"), Atmosphere creates a Broadcaster for the /chat path. Every client that connects to /chat is subscribed to that Broadcaster as an AtmosphereResource. When your @Message method returns a value, that value is broadcast to every subscribed resource.
The flow:
- Client connects to /chat: Atmosphere creates an AtmosphereResource and adds it to the /chat Broadcaster.
- Client sends a message: Atmosphere invokes your @Message method.
- Your method returns a value: Atmosphere calls broadcaster.broadcast(returnValue).
- The Broadcaster delivers the message to every subscribed AtmosphereResource, using the appropriate transport for each client.
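The flow above can be sketched as a toy model in plain Java. This is not Atmosphere's implementation: MiniHub, connect, and onClientMessage are hypothetical names, an in-memory list stands in for each AtmosphereResource, and delivery is synchronous to keep the sketch short.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.UnaryOperator;

// Toy model of the connect/message/broadcast flow; not Atmosphere's implementation.
final class MiniHub {
    // One subscriber list per path, standing in for one Broadcaster per path.
    private final Map<String, List<List<Object>>> inboxesByPath = new ConcurrentHashMap<>();

    // Step 1: a client connects; its inbox stands in for an AtmosphereResource.
    List<Object> connect(String path) {
        List<Object> inbox = new CopyOnWriteArrayList<>();
        inboxesByPath.computeIfAbsent(path, p -> new CopyOnWriteArrayList<>()).add(inbox);
        return inbox;
    }

    // Steps 2-4: invoke the handler, then deliver its return value to every subscriber of the path.
    void onClientMessage(String path, String incoming, UnaryOperator<String> handler) {
        String returned = handler.apply(incoming);
        for (List<Object> inbox : inboxesByPath.getOrDefault(path, List.of())) {
            inbox.add(returned);
        }
    }
}
```

Clients connected to a different path never see the message, which mirrors the one-Broadcaster-per-path behavior described above.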
The Broadcaster Interface
The Broadcaster interface (org.atmosphere.cpr.Broadcaster) defines the core operations:
Broadcasting Messages

    // Broadcast to ALL subscribers
    Future<Object> broadcast(Object message);

    // Broadcast to a SPECIFIC subscriber
    Future<Object> broadcast(Object message, AtmosphereResource resource);

    // Broadcast to a SUBSET of subscribers
    Future<Object> broadcast(Object message, Set<AtmosphereResource> subset);

All broadcast methods return a Future<Object>. Broadcasting is asynchronous by default: the Broadcaster uses an ExecutorService to deliver messages. Call future.get() if you need to block until delivery completes.
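As a minimal sketch of the wait-for-delivery pattern, the class below models an asynchronous broadcast with the JDK's own ExecutorService. AsyncDelivery and broadcastAndWait are hypothetical stand-ins, not Atmosphere types; the one-second timeout is an arbitrary choice for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Stand-in for an asynchronous broadcaster; not Atmosphere's implementation.
final class AsyncDelivery {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    final List<Object> delivered = new CopyOnWriteArrayList<>();

    // Returns immediately; the Future completes once the message has been "delivered".
    Future<Object> broadcast(Object message) {
        return executor.submit(() -> {
            delivered.add(message);
            return message;
        });
    }

    void close() {
        executor.shutdown();
    }

    // Blocks for at most one second and reports whether delivery finished in time.
    static boolean broadcastAndWait(AsyncDelivery hub, Object message) {
        Future<Object> future = hub.broadcast(message);
        try {
            future.get(1, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }
}
```

Using the timed get() rather than the untimed one keeps a slow or stuck delivery from blocking the calling thread indefinitely.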
Managing Subscribers

    // Get all subscribed resources
    Collection<AtmosphereResource> getAtmosphereResources();

    // Add a resource (subscribe)
    Broadcaster addAtmosphereResource(AtmosphereResource resource);

    // Remove a resource (unsubscribe)
    Broadcaster removeAtmosphereResource(AtmosphereResource resource);

Identification

    // Get the broadcaster's ID (typically the path)
    String getID();

    // Set the broadcaster's ID
    void setID(String name);

Scheduled Broadcasting

    // Broadcast after a delay
    Future<Object> delayBroadcast(Object message);
    Future<Object> delayBroadcast(Object message, long delay, TimeUnit unit);

    // Broadcast periodically
    Future<Object> scheduleFixedBroadcast(Object message, long period, TimeUnit unit);
    Future<Object> scheduleFixedBroadcast(Object message, long waitFor, long period, TimeUnit unit);

DefaultBroadcaster
DefaultBroadcaster is the default implementation used by @ManagedService. It:
- Supports all transports (WebSocket, SSE, long-polling, streaming)
- Uses an ExecutorService for asynchronous message delivery
- Uses ReentrantLock (not synchronized) to avoid virtual thread pinning on JDK 21+
- Supports BroadcasterCache and BroadcastFilter
- Handles subscriber lifecycle (automatic removal on disconnect)
You rarely need to interact with DefaultBroadcaster directly. The @ManagedService annotation uses it automatically:

    @ManagedService(path = "/chat") // uses DefaultBroadcaster by default
    public class Chat { ... }

SimpleBroadcaster
SimpleBroadcaster extends DefaultBroadcaster and is a lighter-weight implementation designed for WebSocket-only scenarios. It is typically used with @WebSocketHandlerService:

    @WebSocketHandlerService(path = "/chat", broadcaster = SimpleBroadcaster.class)
    public class WebSocketChat extends WebSocketStreamingHandlerAdapter { ... }

You can also use it with @ManagedService if you know all clients will use WebSocket:

    @ManagedService(path = "/chat", broadcaster = SimpleBroadcaster.class)
    public class Chat { ... }

BroadcasterFactory
BroadcasterFactory is the registry that manages all active Broadcaster instances. You inject it and use it to look up, create, or iterate over broadcasters.
Injection

    @Inject
    private BroadcasterFactory factory;

Looking Up Broadcasters

    // Look up by ID (returns null if not found)
    Broadcaster b = factory.lookup("/chat");

    // Look up by ID, creating if it does not exist
    Broadcaster b = factory.lookup("/chat", true);

    // Look up with Optional (preferred over lookup, which returns null)
    Optional<Broadcaster> b = factory.findBroadcaster("/chat");

The findBroadcaster method (added in 4.0) returns an Optional instead of null, making the absent-broadcaster case explicit at the call site.
Looking Up with Type

    // Look up with a specific Broadcaster type
    SimpleBroadcaster b = factory.lookup(SimpleBroadcaster.class, "/chat");

    // Look up with type, creating if it does not exist
    SimpleBroadcaster b = factory.lookup(SimpleBroadcaster.class, "/chat", true);

Listing All Broadcasters

    // Get all active broadcasters
    Collection<Broadcaster> all = factory.lookupAll();

Creating New Broadcasters

    // Create a broadcaster with a generated ID
    Broadcaster b = factory.get();

    // Create a broadcaster with a specific ID
    Broadcaster b = factory.get("/my-channel");

    // Create a broadcaster with a specific type and ID
    SimpleBroadcaster b = factory.get(SimpleBroadcaster.class, "/my-channel");

Practical Example: Cross-Broadcaster Messaging
Using BroadcasterFactory, you can send messages from one endpoint to subscribers of another:

    @ManagedService(path = "/notifications")
    public class Notifications {

        @Inject
        private BroadcasterFactory factory;

        @org.atmosphere.config.service.Message
        public void onMessage(String message) {
            // Forward the message to the /chat broadcaster as well
            factory.findBroadcaster("/chat").ifPresent(b -> b.broadcast(message));
        }
    }

BroadcasterCache
When a client temporarily disconnects (network glitch, page navigation, etc.), messages broadcast during the disconnection are lost unless a BroadcasterCache is configured. The cache stores messages and replays them when the client reconnects.
UUIDBroadcasterCache
UUIDBroadcasterCache is the default cache used by @ManagedService. It tracks which messages each AtmosphereResource has received using the resource’s UUID. When a client reconnects with the same UUID, any missed messages are delivered.

    @ManagedService(path = "/chat",
            broadcasterCache = UUIDBroadcasterCache.class) // this is the default
    public class Chat { ... }

UUIDBroadcasterCache is configured automatically when you use @ManagedService. You do not need to set it explicitly unless you want a different implementation.
How It Works
- When a message is broadcast, the cache stores it along with the set of resource UUIDs that received it.
- When a client reconnects, the cache checks which messages that client’s UUID has not received.
- Those missed messages are delivered to the reconnecting client.
- Cached messages expire after a configurable time period.
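The replay steps above can be modeled in a few lines. This is a simplified sketch, not UUIDBroadcasterCache itself: ReplayCache, store, retrieveMissed, and the explicit clock parameter are all hypothetical, chosen so the expiry behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of UUID-based replay; not Atmosphere's UUIDBroadcasterCache.
final class ReplayCache {
    // A cached message plus the UUIDs that have already received it.
    private record Entry(Object message, Set<String> receivedBy, long storedAtMillis) {}

    private final List<Entry> entries = new ArrayList<>();
    private final long ttlMillis;

    ReplayCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // On broadcast: remember the message and which UUIDs were connected to receive it.
    synchronized void store(Object message, Set<String> connectedUuids, long nowMillis) {
        entries.add(new Entry(message, new HashSet<>(connectedUuids), nowMillis));
    }

    // On reconnect: drop expired entries, then return what this UUID has not yet seen.
    synchronized List<Object> retrieveMissed(String uuid, long nowMillis) {
        entries.removeIf(e -> nowMillis - e.storedAtMillis() > ttlMillis);
        List<Object> missed = new ArrayList<>();
        for (Entry e : entries) {
            if (e.receivedBy().add(uuid)) { // add() returns true only if the UUID was absent
                missed.add(e.message());
            }
        }
        return missed;
    }
}
```

Marking the UUID as a receiver at retrieval time means a second reconnect does not replay the same messages again.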
BroadcastFilter
A BroadcastFilter intercepts messages before they are delivered to subscribers. Filters can transform the message, pass it through unchanged, or abort delivery entirely.
The BroadcastFilter Interface

    public interface BroadcastFilter {

        record BroadcastAction(ACTION action, Object message, Object originalMessage) {

            public enum ACTION {
                CONTINUE, // pass to next filter
                ABORT,    // discard the message
                SKIP      // stop filtering, deliver the current message
            }
        }

        BroadcastAction filter(String broadcasterId, Object originalMessage, Object message);
    }

The filter method receives the broadcaster ID, the original message, and the (possibly already transformed) message. It returns a BroadcastAction that tells the framework what to do:
- CONTINUE: pass the (possibly transformed) message to the next filter in the chain.
- ABORT: discard the message entirely; it will not be delivered.
- SKIP: stop the filter chain and deliver the message as-is.
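To make the three actions concrete, here is a small model of how a chain might evaluate them. The nested Action, Result, and Filter types are local stand-ins rather than Atmosphere's classes, and the sketch assumes that SKIP delivers the message carried by the returned result.

```java
import java.util.List;
import java.util.Optional;

// Toy model of filter-chain semantics; types are local stand-ins, not Atmosphere's.
final class FilterChain {
    enum Action { CONTINUE, ABORT, SKIP }

    record Result(Action action, Object message) {}

    interface Filter {
        Result filter(String broadcasterId, Object originalMessage, Object message);
    }

    // Returns the message to deliver, or empty if a filter aborted delivery.
    static Optional<Object> apply(String broadcasterId, Object original, List<Filter> filters) {
        Object current = original;
        for (Filter f : filters) {
            Result r = f.filter(broadcasterId, original, current);
            switch (r.action()) {
                case ABORT:
                    return Optional.empty();         // discard entirely
                case SKIP:
                    return Optional.of(r.message()); // deliver now, skip remaining filters
                default:
                    current = r.message();           // CONTINUE: hand the result to the next filter
            }
        }
        return Optional.of(current);
    }
}
```

Each filter sees both the untouched original and the output of the filters before it, which is why the filter method takes two message arguments.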
Configuring Filters
Filters are added via the broadcastFilters attribute of @ManagedService:

    @ManagedService(path = "/chat", broadcastFilters = {XSSHtmlFilter.class})
    public class Chat { ... }

Multiple filters are applied in the order they are listed.
Alternatively, annotate the filter class itself with @BroadcasterFilterService and Atmosphere will discover and register it at startup:

    @BroadcasterFilterService
    public class ProfanityFilter implements BroadcastFilter { /* ... */ }

PerRequestBroadcastFilter
The standard BroadcastFilter sees the message but not the target client. When you need to transform or filter differently per subscriber (e.g., based on role or session data), implement PerRequestBroadcastFilter. It adds a four-argument filter method that receives the AtmosphereResource being delivered to:

    public class RoleFilter implements PerRequestBroadcastFilter {
        // isAdmin(...) and redact(...) are application-specific helpers, not shown here.

        @Override
        public BroadcastAction filter(String broadcasterId, AtmosphereResource r,
                                      Object originalMessage, Object message) {
            // Access r.getRequest() for session/auth info
            if (isAdmin(r)) {
                return new BroadcastAction(ACTION.CONTINUE, message);
            }
            return new BroadcastAction(ACTION.CONTINUE, redact(message));
        }

        @Override
        public BroadcastAction filter(String broadcasterId, Object originalMessage, Object message) {
            return new BroadcastAction(ACTION.CONTINUE, message);
        }
    }

The four-argument method is called once per subscriber, per message. The three-argument method (from BroadcastFilter) is called once per broadcast, before the per-resource pass. Return CONTINUE from the three-argument method to let the per-resource filter run.
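The two passes can be pictured with a short stand-alone model. TwoPassDelivery and its Subscriber record are hypothetical and much simpler than the real filter API; the sketch only shows that the per-broadcast step runs once while the per-subscriber step runs for every target.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

// Toy model of the per-broadcast and per-subscriber passes; not Atmosphere's implementation.
final class TwoPassDelivery {
    // Subscriber stand-in: a name plus a role flag (hypothetical fields).
    record Subscriber(String name, boolean admin) {}

    // Runs the per-broadcast transform once, then the per-subscriber transform for each target.
    static Map<String, String> deliver(String message,
                                       Iterable<Subscriber> subscribers,
                                       UnaryOperator<String> perBroadcast,
                                       BiFunction<Subscriber, String, String> perSubscriber) {
        String shared = perBroadcast.apply(message); // once per broadcast
        Map<String, String> delivered = new LinkedHashMap<>();
        for (Subscriber s : subscribers) {
            delivered.put(s.name(), perSubscriber.apply(s, shared)); // once per subscriber
        }
        return delivered;
    }
}
```

Work that is identical for every client belongs in the first pass; only the role-dependent part needs to run per subscriber.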
BroadcasterListener
A BroadcasterListener receives lifecycle events from a Broadcaster: creation, destruction, resource add/remove, and message queuing. Annotate the class with @BroadcasterListenerService for automatic discovery:

    @BroadcasterListenerService
    public class MyListener implements BroadcasterListener {
        public void onPostCreate(Broadcaster b) { /* new broadcaster */ }
        public void onComplete(Broadcaster b) { /* broadcast delivered */ }
        public void onPreDestroy(Broadcaster b) { /* broadcaster shutting down */ }
        public void onAddAtmosphereResource(Broadcaster b, AtmosphereResource r) { /* client joined */ }
        public void onRemoveAtmosphereResource(Broadcaster b, AtmosphereResource r) { /* client left */ }
        public void onMessage(Broadcaster b, Deliver deliver) { /* message queued */ }
    }

This is useful for monitoring, auditing, or triggering side effects when broadcasters are created/destroyed or when clients subscribe/unsubscribe.
@DeliverTo
By default, the return value of a @Message method is broadcast to all subscribers on the endpoint’s Broadcaster. The @DeliverTo annotation changes the delivery scope.
DELIVER_TO.BROADCASTER
This is the default behavior. The return value is delivered to all subscribers on this Broadcaster:

    @org.atmosphere.config.service.Message(encoders = {JacksonEncoder.class}, decoders = {JacksonDecoder.class})
    @DeliverTo(DELIVER_TO.BROADCASTER)
    public Message onMessage(Message message) {
        return message; // sent to all subscribers on this broadcaster
    }

DELIVER_TO.RESOURCE
Deliver only to the resource that sent the message. Useful for acknowledgments or request/response patterns:

    @org.atmosphere.config.service.Message(decoders = {JacksonDecoder.class})
    @DeliverTo(DELIVER_TO.RESOURCE)
    public String onMessage(Message message) {
        return "Received: " + message.getMessage(); // sent only to the sender
    }

DELIVER_TO.ALL
Deliver to all resources across all Broadcaster instances. This is a global broadcast:

    @org.atmosphere.config.service.Message(encoders = {JacksonEncoder.class}, decoders = {JacksonDecoder.class})
    @DeliverTo(DELIVER_TO.ALL)
    public Message onMessage(Message message) {
        return message; // sent to every connected client, on every broadcaster
    }

Import
The enum and annotation are in org.atmosphere.config.service:

    import org.atmosphere.config.service.DeliverTo;
    import org.atmosphere.config.service.DeliverTo.DELIVER_TO;

Broadcaster Scope
In addition to @DeliverTo (which controls delivery at the method level), the Broadcaster itself has a scope that controls which resources it can reach:

    public enum SCOPE {
        REQUEST,     // only the current request's AtmosphereResource
        APPLICATION, // all resources in the current web application (default)
        VM           // all resources in the current JVM
    }

Most applications use the default APPLICATION scope.
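Returning to the method-level @DeliverTo values, the following sketch models how each one selects recipients. DeliveryScopes and its Scope enum are hypothetical stand-ins for illustration, with plain strings in place of AtmosphereResource instances; it does not model the Broadcaster SCOPE enum.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the three @DeliverTo scopes; names are local stand-ins, not Atmosphere types.
final class DeliveryScopes {
    enum Scope { RESOURCE, BROADCASTER, ALL }

    // Picks recipients given the sender, the sender's broadcaster ID, and all subscriptions.
    static Set<String> recipients(Scope scope, String sender, String broadcasterId,
                                  Map<String, List<String>> subscribersByBroadcaster) {
        Set<String> result = new LinkedHashSet<>();
        switch (scope) {
            case RESOURCE:
                result.add(sender); // only the client that sent the message
                break;
            case BROADCASTER:
                result.addAll(subscribersByBroadcaster.getOrDefault(broadcasterId, List.of()));
                break;
            case ALL:
                subscribersByBroadcaster.values().forEach(result::addAll); // every broadcaster
                break;
        }
        return result;
    }
}
```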
Patterns and Best Practices
One Broadcaster Per Topic
The natural pattern is one Broadcaster per topic or channel. With @ManagedService, this happens automatically based on the path:

    @ManagedService(path = "/chat/{room}")
    public class ChatRoom {
        @PathParam("room")
        private String room;
        // Each room value (/chat/general, /chat/support) gets its own Broadcaster
    }

Programmatic Broadcasting
When you need to broadcast from outside a @ManagedService class (e.g., from a REST controller or a scheduled task), use BroadcasterFactory:

    @Inject
    private BroadcasterFactory factory;

    // Named notifySubscribers to avoid confusion with Object.notifyAll()
    public void notifySubscribers(String message) {
        factory.findBroadcaster("/notifications").ifPresent(b -> b.broadcast(message));
    }

Targeted Delivery
To send a message to a specific client, use the two-argument broadcast:

    @Inject
    private BroadcasterFactory factory;

    public void notifyUser(AtmosphereResource target, String message) {
        factory.findBroadcaster("/notifications").ifPresent(b -> b.broadcast(message, target));
    }

Broadcaster vs. Room
The Broadcaster is a low-level pub/sub primitive. If your application needs presence tracking, stable member identity, or message history, consider the Room API instead. Rooms wrap Broadcasters and add higher-level features:
| Feature | Broadcaster | Room |
|---|---|---|
| Level | Low-level | High-level |
| Identity | Path-based ID | Named group |
| Presence | Manual tracking | Built-in events |
| Direct messaging | Manual UUID lookup | room.sendTo(memberId, msg) |
| Message history | Via BroadcasterCache | room.enableHistory(n) |
| Client protocol | Manual | Built into atmosphere.js |
For most chat and collaboration apps, prefer Rooms. Use Broadcaster directly when you need fine-grained control over message delivery, custom filters, or lifecycle policies.
Server-Side Broadcasts
You are not limited to broadcasting from within a @ManagedService or @WebSocketHandlerService handler. Any server-side code (a scheduled task, a JMS listener, a background thread, a REST controller) can look up a Broadcaster and push messages to connected clients.
Broadcasting from a Scheduled Task
Use BroadcasterFactory.findBroadcaster() to find a Broadcaster by its path, then call broadcast():

    @Inject
    private BroadcasterFactory factory;

    public void startPeriodicUpdates(ScheduledExecutorService scheduler) {
        scheduler.scheduleAtFixedRate(() -> {
            factory.findBroadcaster("/dashboard").ifPresent(b ->
                b.broadcast("{\"type\":\"heartbeat\",\"time\":" + System.currentTimeMillis() + "}")
            );
        }, 0, 10, TimeUnit.SECONDS);
    }

Using scheduleFixedBroadcast
The Broadcaster itself has a built-in scheduleFixedBroadcast method that simplifies periodic broadcasting without an external scheduler:

    // The cast gives the lambda a target type; a bare lambda cannot be passed as Object.
    // This assumes the Broadcaster invokes Callable messages at delivery time.
    factory.findBroadcaster("/dashboard").ifPresent(b ->
        b.scheduleFixedBroadcast((Callable<String>) () -> "Server Push " + Instant.now(),
                10, TimeUnit.SECONDS));

Broadcasting from External Events
A common pattern is reacting to an external event source (database change, message queue, file watcher) and pushing the update to connected clients:

    // Called by a JMS listener, Kafka consumer, or any background thread
    public void onExternalEvent(String topic, String payload) {
        factory.findBroadcaster("/events/" + topic).ifPresent(b -> b.broadcast(payload));
    }

The key point: BroadcasterFactory is thread-safe and can be called from any thread. Inject it or obtain it from the AtmosphereConfig.
Performance Tuning
Virtual Threads (Default)
Atmosphere 4.0 uses virtual threads (Executors.newVirtualThreadPerTaskExecutor()) by default for message delivery and async writes. Virtual threads are cheap to create and do not require pool sizing; the JVM schedules them onto a small number of carrier threads automatically. This means:
- No thread pool exhaustion: each broadcast gets its own virtual thread at near-zero cost
- No maxProcessingThreads tuning needed: the old 2.x advice about bounding thread pools does not apply
- ReentrantLock instead of synchronized: DefaultBroadcaster uses ReentrantLock to avoid virtual thread pinning
To opt out and use platform threads instead (e.g., for compatibility with libraries that don’t work well with virtual threads):

    <init-param>
        <param-name>org.atmosphere.cpr.useVirtualThreads</param-name>
        <param-value>false</param-value>
    </init-param>

Out-of-Order Broadcasts
By default, message ordering is guaranteed: broadcast("A") followed by broadcast("B") delivers A before B to every client. If your application does not require strict ordering, disabling this constraint can improve throughput:

    <init-param>
        <param-name>org.atmosphere.cpr.Broadcaster.supportOutOfOrderBroadcast</param-name>
        <param-value>true</param-value>
    </init-param>

Summary
| Concept | Purpose |
|---|---|
| Broadcaster | Pub/sub hub; delivers messages to subscribed AtmosphereResource instances |
| DefaultBroadcaster | Default implementation; supports all transports, async delivery |
| SimpleBroadcaster | Lighter implementation; typically used with WebSocket-only endpoints |
| BroadcasterFactory | Registry for looking up, creating, and iterating over Broadcaster instances |
| UUIDBroadcasterCache | Default cache; replays missed messages on reconnection |
| BroadcastFilter | Intercepts and transforms messages before delivery |
| PerRequestBroadcastFilter | Per-subscriber message filtering (e.g., role-based redaction) |
| BroadcasterListener | Lifecycle events: creation, destruction, subscribe, unsubscribe |
| @DeliverTo | Controls delivery scope: RESOURCE, BROADCASTER, or ALL |