Spring 5 reactive websockets: Clients not receiving same data from hot stream2019 Community Moderator ElectionSpring 5 Web Reactive - Hot Publishing - How to use EmitterProcessor to bridge a MessageListener to an event streamReceiving a Flux in a Spring Boot ClientSpring 5 Web Reactive - Web Client - Use of flatmap() on the response streamHow to use Spring Reactive WebSocket and transform it into the Flux stream?Log the payload with spring reactive web filter and web clientHow to ensure Reactive Stream completes with Spring WebFlux and WebSocketsTo use Spring data Cassandra Reactive or notSpring Cloud Stream Reactive Listener Without OutputMono Slice with data transformation in spring reactive cassandraHow to push data over reactive websocket with Spring in response to request?
Tabular environment - text vertically positions itself by bottom of tikz picture in adjacent cell
After Brexit, will the EU recognize British passports that are valid for more than ten years?
What does it take to become a wilderness skills guide as a business?
Does an unused member variable take up memory?
Draw this image in the TIKZ package
Having the player face themselves after the mid-game
Should we avoid writing fiction about historical events without extensive research?
Why do we say 'Pairwise Disjoint', rather than 'Disjoint'?
Ultrafilters as a double dual
Why does this boat have a landing pad? (SpaceX's GO Searcher) Any plans for propulsive capsule landings?
Did Amazon pay $0 in taxes last year?
Was this cameo in Captain Marvel computer generated?
Short story about an infectious indestructible metal bar?
How do you make a gun that shoots melee weapons and/or swords?
Why is there an extra space when I type "ls" on the Desktop?
Is it a Cyclops number? "Nobody" knows!
Averaging over columns while ignoring zero entries
Short story about cities being connected by a conveyor belt
Should I apply for my boss's promotion?
Mixed Feelings - What am I
Will the concrete slab in a partially heated shed conduct a lot of heat to the unconditioned area?
How to educate team mate to take screenshots for bugs with out unwanted stuff
How to write a chaotic neutral protagonist and prevent my readers from thinking they are evil?
What does *dead* mean in *What do you mean, dead?*?
Spring 5 reactive websockets: Clients not receiving same data from hot stream
2019 Community Moderator ElectionSpring 5 Web Reactive - Hot Publishing - How to use EmitterProcessor to bridge a MessageListener to an event streamReceiving a Flux in a Spring Boot ClientSpring 5 Web Reactive - Web Client - Use of flatmap() on the response streamHow to use Spring Reactive WebSocket and transform it into the Flux stream?Log the payload with spring reactive web filter and web clientHow to ensure Reactive Stream completes with Spring WebFlux and WebSocketsTo use Spring data Cassandra Reactive or notSpring Cloud Stream Reactive Listener Without OutputMono Slice with data transformation in spring reactive cassandraHow to push data over reactive websocket with Spring in response to request?
I have this in my WebSocketHandler
implementation:
@Override
public Mono<Void> handle(WebSocketSession session)
return session.send(
session.receive()
.flatMap(webSocketMessage ->
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) ->
try
sink.next(objectMapper.writeValueAsString(o));
catch (JsonProcessingException e)
e.printStackTrace();
)
.map(session::textMessage);
return publisher;
)
);
The Flux<EfficiencyData>
is currently generated for testing in the service as follows:
public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId)
return Flux.interval(Duration.ofSeconds(1))
.map(aLong ->
longAdder.increment();
return new EfficiencyData(new MachineSpeed(
RotationSpeed.ofRpm(longAdder.intValue()),
RotationSpeed.ofRpm(0),
RotationSpeed.ofRpm(400)));
).publish().autoConnect();
I am using publish().autoConnect()
to make it a hot stream. I created a unit test that starts 2 threads that do this on the returned Flux
:
flux.log().handle((s, sink) ->
LOGGER.info("", s.getMachineSpeed().getCurrent());
).subscribe();
In this case, I see both threads printing out the same value every second.
However, when I open 2 browser tabs, I don't see the same values in both my web pages. The more websocket clients that connect, the more the values are apart (So each value from the original Flux seems to be sent to a different client, instead of sent to all of them).
java spring-webflux project-reactor
add a comment |
I have this in my WebSocketHandler
implementation:
@Override
public Mono<Void> handle(WebSocketSession session)
return session.send(
session.receive()
.flatMap(webSocketMessage ->
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) ->
try
sink.next(objectMapper.writeValueAsString(o));
catch (JsonProcessingException e)
e.printStackTrace();
)
.map(session::textMessage);
return publisher;
)
);
The Flux<EfficiencyData>
is currently generated for testing in the service as follows:
public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId)
return Flux.interval(Duration.ofSeconds(1))
.map(aLong ->
longAdder.increment();
return new EfficiencyData(new MachineSpeed(
RotationSpeed.ofRpm(longAdder.intValue()),
RotationSpeed.ofRpm(0),
RotationSpeed.ofRpm(400)));
).publish().autoConnect();
I am using publish().autoConnect()
to make it a hot stream. I created a unit test that starts 2 threads that do this on the returned Flux
:
flux.log().handle((s, sink) ->
LOGGER.info("", s.getMachineSpeed().getCurrent());
).subscribe();
In this case, I see both threads printing out the same value every second.
However, when I open 2 browser tabs, I don't see the same values in both my web pages. The more websocket clients that connect, the more the values are apart (So each value from the original Flux seems to be sent to a different client, instead of sent to all of them).
java spring-webflux project-reactor
add a comment |
I have this in my WebSocketHandler
implementation:
@Override
public Mono<Void> handle(WebSocketSession session)
return session.send(
session.receive()
.flatMap(webSocketMessage ->
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) ->
try
sink.next(objectMapper.writeValueAsString(o));
catch (JsonProcessingException e)
e.printStackTrace();
)
.map(session::textMessage);
return publisher;
)
);
The Flux<EfficiencyData>
is currently generated for testing in the service as follows:
public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId)
return Flux.interval(Duration.ofSeconds(1))
.map(aLong ->
longAdder.increment();
return new EfficiencyData(new MachineSpeed(
RotationSpeed.ofRpm(longAdder.intValue()),
RotationSpeed.ofRpm(0),
RotationSpeed.ofRpm(400)));
).publish().autoConnect();
I am using publish().autoConnect()
to make it a hot stream. I created a unit test that starts 2 threads that do this on the returned Flux
:
flux.log().handle((s, sink) ->
LOGGER.info("", s.getMachineSpeed().getCurrent());
).subscribe();
In this case, I see both threads printing out the same value every second.
However, when I open 2 browser tabs, I don't see the same values in both my web pages. The more websocket clients that connect, the more the values are apart (So each value from the original Flux seems to be sent to a different client, instead of sent to all of them).
java spring-webflux project-reactor
I have this in my WebSocketHandler
implementation:
@Override
public Mono<Void> handle(WebSocketSession session)
return session.send(
session.receive()
.flatMap(webSocketMessage ->
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) ->
try
sink.next(objectMapper.writeValueAsString(o));
catch (JsonProcessingException e)
e.printStackTrace();
)
.map(session::textMessage);
return publisher;
)
);
The Flux<EfficiencyData>
is currently generated for testing in the service as follows:
public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId)
return Flux.interval(Duration.ofSeconds(1))
.map(aLong ->
longAdder.increment();
return new EfficiencyData(new MachineSpeed(
RotationSpeed.ofRpm(longAdder.intValue()),
RotationSpeed.ofRpm(0),
RotationSpeed.ofRpm(400)));
).publish().autoConnect();
I am using publish().autoConnect()
to make it a hot stream. I created a unit test that starts 2 threads that do this on the returned Flux
:
flux.log().handle((s, sink) ->
LOGGER.info("", s.getMachineSpeed().getCurrent());
).subscribe();
In this case, I see both threads printing out the same value every second.
However, when I open 2 browser tabs, I don't see the same values in both my web pages. The more websocket clients that connect, the more the values are apart (So each value from the original Flux seems to be sent to a different client, instead of sent to all of them).
java spring-webflux project-reactor
java spring-webflux project-reactor
asked 2 days ago
Wim DeblauweWim Deblauwe
10.6k969135
10.6k969135
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
Managed to fix this thanks to Brian Clozel on twitter.
The issue is that for each connecting websocket client, I call the service.subscribeToEfficiencyData(id)
method, which returns a new Flux every time it is called. So of course, those independent Flux'es are not being shared between the different websocket clients.
To fix the issue, I create the Flux
instance in the constructor or a PostConstruct
method of my service so the subscribeToEfficiencyData
returns the same Flux instance every time.
Note that .publish().autoConnect()
on the Flux remains important, because without that websocket clients will again see different values!
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55023517%2fspring-5-reactive-websockets-clients-not-receiving-same-data-from-hot-stream%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
Managed to fix this thanks to Brian Clozel on twitter.
The issue is that for each connecting websocket client, I call the service.subscribeToEfficiencyData(id)
method, which returns a new Flux every time it is called. So of course, those independent Flux'es are not being shared between the different websocket clients.
To fix the issue, I create the Flux
instance in the constructor or a PostConstruct
method of my service so the subscribeToEfficiencyData
returns the same Flux instance every time.
Note that .publish().autoConnect()
on the Flux remains important, because without that websocket clients will again see different values!
add a comment |
Managed to fix this thanks to Brian Clozel on twitter.
The issue is that for each connecting websocket client, I call the service.subscribeToEfficiencyData(id)
method, which returns a new Flux every time it is called. So of course, those independent Flux'es are not being shared between the different websocket clients.
To fix the issue, I create the Flux
instance in the constructor or a PostConstruct
method of my service so the subscribeToEfficiencyData
returns the same Flux instance every time.
Note that .publish().autoConnect()
on the Flux remains important, because without that websocket clients will again see different values!
add a comment |
Managed to fix this thanks to Brian Clozel on twitter.
The issue is that for each connecting websocket client, I call the service.subscribeToEfficiencyData(id)
method, which returns a new Flux every time it is called. So of course, those independent Flux'es are not being shared between the different websocket clients.
To fix the issue, I create the Flux
instance in the constructor or a PostConstruct
method of my service so the subscribeToEfficiencyData
returns the same Flux instance every time.
Note that .publish().autoConnect()
on the Flux remains important, because without that websocket clients will again see different values!
Managed to fix this thanks to Brian Clozel on twitter.
The issue is that for each connecting websocket client, I call the service.subscribeToEfficiencyData(id)
method, which returns a new Flux every time it is called. So of course, those independent Flux'es are not being shared between the different websocket clients.
To fix the issue, I create the Flux
instance in the constructor or a PostConstruct
method of my service so the subscribeToEfficiencyData
returns the same Flux instance every time.
Note that .publish().autoConnect()
on the Flux remains important, because without that websocket clients will again see different values!
answered yesterday
Wim DeblauweWim Deblauwe
10.6k969135
10.6k969135
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55023517%2fspring-5-reactive-websockets-clients-not-receiving-same-data-from-hot-stream%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown