Server gesendete Ereignisse httpclient
public class SseSubscriber implements BodySubscriber<Void>
protected static final Pattern dataLinePattern = Pattern.compile( "^data: ?(.*)$" );
protected static String extractMessageData( String[] messageLines )
var s = new StringBuilder( );
for ( var line : messageLines )
var m = dataLinePattern.matcher( line );
if ( m.matches( ) )
s.append( 1 ) );
return s.toString( );
protected final Consumer<? super String> messageDataConsumer;
protected final CompletableFuture<Void> future;
protected volatile Subscription subscription;
protected volatile String deferredText;
public SseSubscriber( Consumer<? super String> messageDataConsumer )
this.messageDataConsumer = messageDataConsumer;
this.future = new CompletableFuture<>( );
this.subscription = null;
this.deferredText = null;
public void onSubscribe( Subscription subscription )
this.subscription = subscription;
this.deferredText = "";
this.subscription.request( 1 );
catch ( Exception e )
this.future.completeExceptionally( e );
this.subscription.cancel( );
public void onNext( List<ByteBuffer> buffers )
// Volatile read
var deferredText = this.deferredText;
for ( var buffer : buffers )
// TODO: Safe to assume multi-byte chars don't get split across buffers?
var s = deferredText + UTF_8.decode( buffer );
// -1 means don't discard trailing empty tokens ... so the final token will
// be whatever is left after the last \n\n (possibly the empty string, but
// not necessarily), which is the part we need to defer until the next loop
// iteration
var tokens = s.split( "\n\n", -1 );
// Final token gets deferred, not processed here
for ( var i = 0; i < tokens.length - 1; i++ )
var message = tokens[ i ];
var lines = message.split( "\n" );
var data = extractMessageData( lines );
this.messageDataConsumer.accept( data );
// TODO: Handle lines that start with "event:", "id:", "retry:"
// Defer the final token
deferredText = tokens[ tokens.length - 1 ];
// Volatile write
this.deferredText = deferredText;
this.subscription.request( 1 );
catch ( Exception e )
this.future.completeExceptionally( e );
this.subscription.cancel( );
public void onError( Throwable e )
this.future.completeExceptionally( e );
public void onComplete( )
this.future.complete( null );
catch ( Exception e )
this.future.completeExceptionally( e );
public CompletionStage<Void> getBody( )
return this.future;
Successful Starling