
@AiEndpoint & StreamingSession

This chapter introduces Atmosphere’s AI platform. If you completed the Getting Started guide, you already have a running Atmosphere application — that is all you need to start streaming LLM texts. The core concepts (Broadcaster, Rooms, Interceptors) are useful background but not prerequisites for @AiEndpoint.

@AiEndpoint turns a plain Java class into a streaming AI chat endpoint, and StreamingSession delivers streaming texts from an LLM to the browser in real time.

The result is a chat endpoint that:

  1. Accepts user messages over WebSocket or SSE
  2. Sends them to an LLM (Gemini, GPT, Claude, or a local Ollama model)
  3. Streams the response back chunk by chunk
  4. Handles connect/disconnect lifecycle automatically
  5. Runs the LLM call on a virtual thread so it never blocks the transport

@AiEndpoint eliminates the boilerplate of @ManagedService + @Ready + @Disconnect + @Message for AI streaming use cases. The annotated class must have exactly one method annotated with @Prompt.

| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| path | String | (required) | The URL path for this AI endpoint |
| timeout | long | 120_000 | Max inactive timeout in milliseconds before the connection is closed |
| systemPrompt | String | "" | Inline system prompt text |
| systemPromptResource | String | "" | Classpath path to a system prompt file (e.g., "prompts/system-prompt.md"). Takes precedence over systemPrompt |
| interceptors | Class<? extends AiInterceptor>[] | {} | AI interceptors applied to every prompt (FIFO for preProcess, LIFO for postProcess) |
| conversationMemory | boolean | false | Enable automatic multi-turn conversation memory per client |
| maxHistoryMessages | int | 20 | Max messages retained in conversation memory per client (10 turns) |
| tools | Class<?>[] | {} | Tool provider classes with @AiTool-annotated methods |
| excludeTools | Class<?>[] | {} | Tool classes to exclude (only relevant when tools is empty, meaning all tools available) |
| fallbackStrategy | String | "NONE" | Fallback strategy for model routing when the primary backend fails |
| guardrails | Class<? extends AiGuardrail>[] | {} | Guardrail classes that inspect requests before the LLM call and responses after |
| contextProviders | Class<? extends ContextProvider>[] | {} | RAG context augmentation providers |
| model | String | "" | Override the model name for this endpoint (otherwise uses AiConfig.get().model()) |
| filters | Class<? extends BroadcastFilter>[] | {} | Broadcast filters for this endpoint’s Broadcaster |
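
Several attributes can be combined on one annotation. A hypothetical sketch (the path and class name below are illustrative, not from a sample):

```java
@AiEndpoint(path = "/atmosphere/support-chat",  // illustrative path
            timeout = 300_000,                  // close after 5 minutes of inactivity
            conversationMemory = true,          // keep multi-turn history per client
            maxHistoryMessages = 40,            // i.e., 20 turns
            model = "gpt-4o")                   // override AiConfig.get().model()
public class SupportChat {

    @Prompt
    public void onPrompt(String message, StreamingSession session) {
        session.stream(message); // route to the resolved backend
    }
}
```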

The framework automatically:

  • Configures broadcaster cache and inactive timeout
  • Logs connect/disconnect events
  • Creates a StreamingSession per message
  • Invokes the @Prompt method on a virtual thread

@Prompt marks the method that handles incoming user messages. It accepts two method signatures:

// Minimal: message + session
@Prompt
public void onPrompt(String message, StreamingSession session) { ... }

// With resource access
@Prompt
public void onPrompt(String message, StreamingSession session, AtmosphereResource resource) { ... }

The method is invoked on a virtual thread, so it may perform blocking I/O (HTTP calls to LLM APIs) without blocking the Atmosphere thread pool.

This is the AiChat class from the spring-boot-ai-chat sample:

@AiEndpoint(path = "/atmosphere/ai-chat",
            systemPromptResource = "prompts/system-prompt.md")
public class AiChat {

    private static final Logger logger = LoggerFactory.getLogger(AiChat.class);

    @Ready
    public void onReady(AtmosphereResource resource) {
        logger.info("Client {} connected (broadcaster: {})",
                resource.uuid(), resource.getBroadcaster().getID());
    }

    @Disconnect
    public void onDisconnect(AtmosphereResourceEvent event) {
        logger.info("Client {} disconnected", event.getResource().uuid());
    }

    @Prompt
    public void onPrompt(String message, StreamingSession session) {
        logger.info("Received prompt: {}", message);
        var settings = AiConfig.get();
        if (settings == null || settings.client().apiKey() == null
                || settings.client().apiKey().isBlank()) {
            DemoResponseProducer.stream(message, session);
            return;
        }
        session.stream(message);
    }
}

Key observations:

  1. @Ready and @Disconnect work the same as in @ManagedService — they handle connection lifecycle.
  2. @Prompt receives the user’s raw message and a StreamingSession.
  3. session.stream(message) sends the message to the resolved AI backend and streams the response back. This is the simplest way to invoke the LLM — the framework resolves the correct adapter (Spring AI, LangChain4j, ADK, or built-in) automatically.
  4. Demo fallback — if no API key is configured, the sample uses DemoResponseProducer to simulate streaming. This pattern is useful for local development without an API key.
  5. System prompt — loaded once at startup from prompts/system-prompt.md on the classpath via PromptLoader.

StreamingSession is the core SPI interface that all AI framework adapters push streaming texts through. It extends AutoCloseable and is thread-safe.

public interface StreamingSession extends AutoCloseable {
    String sessionId();
    void send(String streamingText);
    void sendMetadata(String key, Object value);
    void progress(String message);
    void complete();
    void complete(String summary);
    void error(Throwable t);
    boolean isClosed();
    void sendContent(Content content);
    void stream(String message);
}

| Method | Description |
| --- | --- |
| sessionId() | Unique identifier for this streaming session |
| send(streamingText) | Send a text chunk to the client (typically a single streaming text from the LLM) |
| sendMetadata(key, value) | Send structured metadata alongside the stream (e.g., model name, usage stats) |
| progress(message) | Send a human-readable progress update (e.g., “Thinking…”, “Searching documents…”) |
| complete() | Signal that the stream has completed successfully |
| complete(summary) | Signal completion with an aggregated final response |
| error(throwable) | Signal that the stream has failed |
| isClosed() | Whether this session has been completed or errored |
| sendContent(content) | Send multi-modal content (text, images, files) |
| stream(message) | Send the user message to the resolved AI backend and stream the response back |

The send() and stream() methods serve fundamentally different purposes:

  • send(streamingText) — pushes a single streaming text/chunk to the client. You call this yourself when you are manually generating or forwarding streaming texts. All AI adapter implementations call send() internally.
  • stream(message) — sends the user’s message to the AI backend resolved by the @AiEndpoint infrastructure and streams the response automatically. This is a one-call shortcut that handles the entire LLM round-trip.

In the AiChat example, session.stream(message) is used because the framework knows how to route to the correct AI backend. If you wanted to handle the LLM call yourself, you would call session.send() for each streaming text.
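
To make the manual path concrete, here is a runnable sketch of the send-per-chunk flow. MiniSession is a stand-in that only mimics the StreamingSession methods used (send, progress, complete, error); in a real endpoint you would receive the framework's session in @Prompt.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for StreamingSession, capturing what a real session would push to the client.
class MiniSession {
    final List<String> frames = new ArrayList<>();
    boolean closed = false;

    void send(String chunk)   { frames.add("streaming-text:" + chunk); }
    void progress(String msg) { frames.add("progress:" + msg); }
    void complete()           { frames.add("complete"); closed = true; }
    void error(Throwable t)   { frames.add("error:" + t.getMessage()); closed = true; }
}

class ManualStreaming {
    // The manual pattern: announce progress, push each chunk, then signal completion;
    // on failure, signal error so the client can stop waiting.
    static void streamManually(List<String> chunks, MiniSession session) {
        try {
            session.progress("Thinking...");
            for (String chunk : chunks) {
                session.send(chunk);
            }
            session.complete();
        } catch (RuntimeException e) {
            session.error(e);
        }
    }
}
```

The same shape (progress, then send per chunk, then complete or error) applies when forwarding chunks from any hand-rolled LLM client.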

The sendContent(Content) method supports sending different content types:

// Text content (delegates to send())
session.sendContent(new Content.Text("Hello world"));
// Binary content types require overriding sendContent() in your session implementation

The wire protocol for content uses structured JSON:

{"type":"content","contentType":"text","data":"...","sessionId":"...","seq":1}
{"type":"content","contentType":"image","mimeType":"image/png","data":"<base64>","sessionId":"...","seq":2}

Every message from StreamingSession is a JSON object written directly to the WebSocket (or SSE) connection:

{"type":"streaming-text","data":"Hello","sessionId":"abc-123","seq":1}
{"type":"streaming-text","data":" world","sessionId":"abc-123","seq":2}
{"type":"progress","data":"Thinking...","sessionId":"abc-123","seq":3}
{"type":"metadata","data":"{\"model\":\"gemini-2.5-flash\"}","sessionId":"abc-123","seq":4}
{"type":"complete","data":"","sessionId":"abc-123","seq":5}

| Type | Description |
| --- | --- |
| streaming-text | A single streaming text/chunk from the LLM |
| progress | A human-readable status update (e.g., “Searching documents…”) |
| metadata | Structured metadata (model name, usage stats) |
| complete | Stream finished successfully |
| error | Stream failed — data contains the error message |

The seq field is a monotonically increasing counter for deduplication on reconnect.
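
The deduplication policy a client can apply is simple: ignore any frame whose seq is not greater than the last one applied. The real clients handle this in JavaScript; the sketch below only illustrates the idea in Java:

```java
// Tracks the highest sequence number applied so far and rejects replays.
class SeqDeduper {
    private long lastSeq = 0;

    // Returns true if the frame is new and should be applied to the UI.
    boolean accept(long seq) {
        if (seq <= lastSeq) {
            return false; // duplicate delivered during a reconnect replay
        }
        lastSeq = seq;
        return true;
    }
}
```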

Option B: @ManagedService (Manual Approach)


The @AiEndpoint annotation handles lifecycle, session creation, and virtual thread dispatch automatically. For more control, you can use @ManagedService directly:

@ManagedService(path = "/ai-chat")
public class AiChat {

    @Inject
    private AtmosphereResource resource;

    @Message
    public void onMessage(String prompt) {
        var settings = AiConfig.get();
        var session = StreamingSessions.start(resource);
        var request = ChatCompletionRequest.builder(settings.model())
                .system("You are a helpful assistant.")
                .user(prompt)
                .build();
        Thread.startVirtualThread(() ->
                settings.client().streamChatCompletion(request, session));
    }
}

Key differences from @AiEndpoint:

  • You create the StreamingSession yourself via StreamingSessions.start(resource).
  • You build the ChatCompletionRequest manually with model name, system prompt, and user message.
  • You launch the LLM call on a virtual thread explicitly with Thread.startVirtualThread().
  • You have full control over @Ready, @Disconnect, and error handling.

Both approaches produce the same wire protocol on the client side.

AiConfig provides global LLM configuration. It can be set programmatically, from environment variables, or from Atmosphere init-params.

AiConfig.configure("remote", "gemini-2.5-flash", apiKey, null);

Parameters: mode (“remote” or “local”), model name, apiKey, and optional baseUrl (null for auto-detection).
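
For example, to target a local Ollama model instead of a cloud API (assuming Ollama is running locally; the base URL then auto-detects to http://localhost:11434/v1):

```java
// Local mode: no API key required; base URL auto-detected for Ollama.
AiConfig.configure("local", "llama3.2", null, null);
```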

| Variable | Description | Default |
| --- | --- | --- |
| LLM_MODE | remote (cloud API) or local (Ollama) | remote |
| LLM_MODEL | Model name (e.g., gemini-2.5-flash, gpt-4o, llama3.2) | gemini-2.5-flash |
| LLM_API_KEY | API key (also checks OPENAI_API_KEY, GEMINI_API_KEY) | (none) |
| LLM_BASE_URL | Override the API endpoint | (auto-detected) |

Auto-detection resolves the base URL from the model name:

  • Models starting with gpt- or o1/o3 route to https://api.openai.com/v1
  • All other remote models route to https://generativelanguage.googleapis.com/v1beta/openai
  • Local mode routes to http://localhost:11434/v1 (Ollama)

The same settings can be supplied as Atmosphere init-params:

@ManagedService(path = "/ai-chat", atmosphereConfig = {
    AiConfig.LLM_MODEL + "=gemini-2.5-flash",
    AiConfig.LLM_MODE + "=remote",
    AiConfig.LLM_API_KEY + "=AIza..."
})

Read the resolved settings at runtime via AiConfig.get():

var settings = AiConfig.get();
if (settings != null) {
    String model = settings.model();     // e.g., "gemini-2.5-flash"
    String baseUrl = settings.baseUrl(); // resolved API endpoint
    boolean local = settings.isLocal();  // true if mode is "local"
    var client = settings.client();      // OpenAiCompatibleClient instance
}

The LlmSettings record returned by AiConfig.get() contains:

| Field | Type | Description |
| --- | --- | --- |
| client() | OpenAiCompatibleClient | HTTP client for the LLM API |
| model() | String | Model name |
| mode() | String | "remote" or "local" |
| baseUrl() | String | Resolved API endpoint URL |

AI interceptors run around the @Prompt method, separate from the transport-level AtmosphereInterceptor from Chapter 8:

public interface AiInterceptor {

    default AiRequest preProcess(AiRequest request, AtmosphereResource resource) {
        return request;
    }

    default void postProcess(AiRequest request, AtmosphereResource resource) {
    }
}

  • preProcess runs FIFO (first declared, first executed). Return a modified AiRequest (e.g., with augmented message or different model) or the original request unchanged.
  • postProcess runs LIFO (last declared, first executed), matching the AtmosphereInterceptor convention.
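
This ordering can be demonstrated with a self-contained sketch. The AiRequest and AtmosphereResource stand-ins below are minimal placeholders for the framework types, and the chain driver only illustrates what the @AiEndpoint infrastructure does internally:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the framework types (illustrative only).
record AiRequest(String message) {}
class AtmosphereResource {}

interface AiInterceptor {
    default AiRequest preProcess(AiRequest request, AtmosphereResource resource) { return request; }
    default void postProcess(AiRequest request, AtmosphereResource resource) {}
}

// Records the order in which its callbacks fire.
class TracingInterceptor implements AiInterceptor {
    private final String name;
    private final List<String> trace;

    TracingInterceptor(String name, List<String> trace) {
        this.name = name;
        this.trace = trace;
    }

    @Override
    public AiRequest preProcess(AiRequest request, AtmosphereResource resource) {
        trace.add(name + ".pre");
        return request;
    }

    @Override
    public void postProcess(AiRequest request, AtmosphereResource resource) {
        trace.add(name + ".post");
    }
}

class InterceptorChain {
    // preProcess runs in declaration order (FIFO); postProcess runs in reverse (LIFO).
    static void run(List<AiInterceptor> chain, AiRequest request, List<String> trace) {
        var resource = new AtmosphereResource();
        for (var interceptor : chain) {
            request = interceptor.preProcess(request, resource);
        }
        trace.add("LLM");
        for (int i = chain.size() - 1; i >= 0; i--) {
            chain.get(i).postProcess(request, resource);
        }
    }
}
```

Declaring interceptors {A.class, B.class} therefore yields the call order A.pre, B.pre, [LLM], B.post, A.post.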

Specify interceptors on the annotation:

@AiEndpoint(path = "/ai-chat",
            interceptors = {RagInterceptor.class, LoggingInterceptor.class})

The spring-boot-ai-tools sample includes a CostMeteringInterceptor that estimates input costs in preProcess and sends routing metadata to the client in postProcess:

public class CostMeteringInterceptor implements AiInterceptor {

    @Override
    public AiRequest preProcess(AiRequest request, AtmosphereResource resource) {
        int totalChars = request.systemPrompt().length() + request.message().length();
        for (ChatMessage msg : request.history()) {
            totalChars += msg.content().length();
        }
        long estimatedStreamingTexts = totalChars / 4;
        // Store for postProcess
        resource.getRequest().setAttribute("cost.estStreamingTexts", estimatedStreamingTexts);
        resource.getRequest().setAttribute("cost.startNanos", System.nanoTime());
        return request;
    }

    @Override
    public void postProcess(AiRequest request, AtmosphereResource resource) {
        var startNanos = (Long) resource.getRequest().getAttribute("cost.startNanos");
        long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
        // Send metadata to the client via the streaming session
        var session = resource.getRequest().getAttribute(
                AiStreamingSession.STREAMING_SESSION_ATTR);
        if (session instanceof StreamingSession s && !s.isClosed()) {
            s.sendMetadata("routing.latency", elapsedMs);
        }
    }
}

The @Prompt method is invoked on a virtual thread. This is critical because LLM API calls are blocking HTTP requests that can take seconds or even minutes. Running them on virtual threads means:

  1. No thread pool exhaustion — Atmosphere’s platform thread pool is not consumed by LLM calls
  2. Simple blocking code — No need for reactive APIs or CompletableFuture; just call the API and it blocks the virtual thread
  3. Natural control flow — Use try/catch, loops, and sequential logic without callback chains

This is why session.stream(message) works so simply — it blocks the virtual thread until streaming completes, but the underlying platform threads are free to handle other connections.

Enable automatic conversation memory with conversationMemory = true:

@AiEndpoint(path = "/chat",
            conversationMemory = true,
            maxHistoryMessages = 30)

When enabled, the framework:

  1. Accumulates user/assistant turns per AtmosphereResource (keyed by resource.uuid())
  2. Injects the history into every AiRequest so all adapters get multi-turn context
  3. Clears memory automatically when the client disconnects

The AiConversationMemory interface defines the SPI:

public interface AiConversationMemory {
    List<ChatMessage> getHistory(String conversationId);
    void addMessage(String conversationId, ChatMessage message);
    void clear(String conversationId);
    int maxMessages();
}

The default implementation is InMemoryConversationMemory, which uses a sliding window capped at maxHistoryMessages.
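
As a concrete illustration, here is a self-contained sliding-window implementation in the spirit of InMemoryConversationMemory (whose internals are not shown in this chapter). The ChatMessage record is a stand-in for the framework's type, and the interface is reproduced from above:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the framework's ChatMessage type (illustrative only).
record ChatMessage(String role, String content) {}

interface AiConversationMemory {
    List<ChatMessage> getHistory(String conversationId);
    void addMessage(String conversationId, ChatMessage message);
    void clear(String conversationId);
    int maxMessages();
}

// Sliding window: keeps only the most recent maxMessages entries per conversation.
class SlidingWindowMemory implements AiConversationMemory {
    private final int maxMessages;
    private final Map<String, Deque<ChatMessage>> store = new ConcurrentHashMap<>();

    SlidingWindowMemory(int maxMessages) { this.maxMessages = maxMessages; }

    @Override
    public List<ChatMessage> getHistory(String conversationId) {
        var history = store.get(conversationId);
        if (history == null) return List.of();
        synchronized (history) { return new ArrayList<>(history); }
    }

    @Override
    public void addMessage(String conversationId, ChatMessage message) {
        var history = store.computeIfAbsent(conversationId, id -> new ArrayDeque<>());
        synchronized (history) {
            history.addLast(message);
            while (history.size() > maxMessages) history.removeFirst(); // evict oldest
        }
    }

    @Override
    public void clear(String conversationId) { store.remove(conversationId); }

    @Override
    public int maxMessages() { return maxMessages; }
}
```

The conversationId would be the resource.uuid() mentioned above, and clear() corresponds to the automatic cleanup on disconnect.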

AiGuardrail classes run before and after the LLM call:

@AiEndpoint(path = "/chat",
            guardrails = {ContentSafetyGuardrail.class})

Execution order: guardrails -> interceptors -> [LLM] -> interceptors -> guardrails

ContextProvider classes augment the prompt with RAG context:

@AiEndpoint(path = "/chat",
            contextProviders = {DocumentSearchProvider.class})

Use subscribeStreaming from atmosphere.js to connect to an @AiEndpoint:

import { subscribeStreaming } from 'atmosphere.js';

const handle = await subscribeStreaming(atmosphere, {
  url: '/ai-chat',
  transport: 'websocket',
}, {
  onStreamingText: (streamingText) => output.textContent += streamingText,
  onProgress: (msg) => status.textContent = msg,
  onMetadata: (meta) => { /* model info, usage */ },
  onComplete: () => console.log('Done'),
  onError: (err) => console.error(err),
});

handle.send('Explain virtual threads in Java 21');
handle.close(); // disconnect when done

The callbacks map directly to the wire protocol message types: streaming-text, progress, metadata, complete, and error.

The useStreaming hook manages connection lifecycle, streaming text accumulation, and streaming state:

import { useStreaming } from 'atmosphere.js/react';

function AiChat() {
  const { fullText, isStreaming, progress, send, reset } = useStreaming({
    request: { url: '/ai-chat', transport: 'websocket' },
  });

  return (
    <div>
      <button onClick={() => send('What is Atmosphere?')} disabled={isStreaming}>Ask</button>
      {isStreaming && <span>{progress ?? 'Generating...'}</span>}
      <p>{fullText}</p>
      <button onClick={reset}>Clear</button>
    </div>
  );
}

fullText accumulates all streaming-text messages into a single string. isStreaming is true between send() and complete/error. reset clears the accumulated text for a new prompt.

The Vue composable provides the same API surface as the React hook, with all values returned as Vue Ref or ComputedRef objects:

<script setup lang="ts">
import { useStreaming } from 'atmosphere.js/vue';

const { fullText, isStreaming, progress, send, reset } = useStreaming(
  { url: '/ai-chat', transport: 'websocket' },
);
</script>

<template>
  <button @click="send('What is Atmosphere?')" :disabled="isStreaming">Ask</button>
  <span v-if="isStreaming">{{ progress ?? 'Generating...' }}</span>
  <p>{{ fullText }}</p>
  <button @click="reset">Clear</button>
</template>

fullText is a computed ref that joins streaming texts automatically. Cleanup is handled via onUnmounted.

The Svelte store follows the same store contract as createAtmosphereStore. Use $store auto-subscription syntax:

<script>
import { createStreamingStore } from 'atmosphere.js/svelte';

const { store, send, reset } = createStreamingStore(
  { url: '/ai-chat', transport: 'websocket' },
);
</script>

<button on:click={() => send('What is Atmosphere?')} disabled={$store.isStreaming}>Ask</button>
{#if $store.isStreaming}
  <span>{$store.progress ?? 'Generating...'}</span>
{/if}
<p>{$store.fullText}</p>
<button on:click={reset}>Clear</button>

$store.fullText, $store.isStreaming, $store.progress, and $store.error update reactively. The store connects when the first subscriber appears and disconnects when all unsubscribe.

The samples/spring-boot-ai-chat/ sample contains the complete AiChat endpoint shown above, along with a browser client. Run it with:

./mvnw spring-boot:run -pl samples/spring-boot-ai-chat

| Concept | Purpose |
| --- | --- |
| @AiEndpoint | Annotation that wires up an AI chat endpoint with streaming, lifecycle, and configuration |
| @Prompt | Marks the method that handles user messages (invoked on a virtual thread) |
| StreamingSession | SPI for pushing streaming texts to clients: send(), stream(), complete(), error() |
| AiConfig | Global LLM configuration (model, API key, base URL) |
| AiInterceptor | Pre/post processing around the prompt (cost metering, RAG, logging) |
| AiConversationMemory | Multi-turn conversation history per client |
| AiGuardrail | Safety checks before and after LLM calls |
| ContextProvider | RAG context augmentation |

In the next chapter, you will learn about @AiTool — Atmosphere’s framework-agnostic annotation for declaring tools that any LLM can call.