


Spring 5 reactive websockets: Clients not receiving same data from hot stream


















I have this in my WebSocketHandler implementation:



    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(
                session.receive()
                       .flatMap(webSocketMessage -> {
                           int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

                           Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
                           var publisher = flux
                                   .<String>handle((o, sink) -> {
                                       try {
                                           sink.next(objectMapper.writeValueAsString(o));
                                       } catch (JsonProcessingException e) {
                                           e.printStackTrace();
                                       }
                                   })
                                   .map(session::textMessage);

                           return publisher;
                       })
        );
    }



The Flux<EfficiencyData> is currently generated for testing in the service as follows:



    public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId) {
        return Flux.interval(Duration.ofSeconds(1))
                   .map(aLong -> {
                       longAdder.increment();
                       return new EfficiencyData(new MachineSpeed(
                               RotationSpeed.ofRpm(longAdder.intValue()),
                               RotationSpeed.ofRpm(0),
                               RotationSpeed.ofRpm(400)));
                   }).publish().autoConnect();
    }



I am using publish().autoConnect() to make it a hot stream. I created a unit test that starts 2 threads that do this on the returned Flux:



    flux.log().handle((s, sink) -> {
        LOGGER.info("{}", s.getMachineSpeed().getCurrent());
    }).subscribe();


In this case, I see both threads printing out the same value every second.



However, when I open 2 browser tabs, the two pages do not show the same values. The more websocket clients connect, the further apart the values are (so each value from the original Flux seems to be sent to a different client instead of to all of them).










java spring-webflux project-reactor

asked 2 days ago

Wim Deblauwe






















1 Answer














I managed to fix this thanks to Brian Clozel on Twitter.



The issue is that for each connecting websocket client, I call the service.subscribeToEfficiencyData(id) method, which returns a new Flux every time it is called. So of course, those independent Flux instances are not shared between the different websocket clients.



To fix the issue, I create the Flux instance once, in the constructor or a @PostConstruct method of my service, so that subscribeToEfficiencyData returns the same Flux instance every time.



Note that .publish().autoConnect() on the Flux remains important: without it, the websocket clients will again see different values!
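The difference between the two setups can be sketched in plain Java, without Reactor. This is only a simplified, single-threaded simulation, not the actual service code: TickSource, sourcePerClient and sharedSource are hypothetical names, and a "tick" stands in for one emission of Flux.interval. It assumes, as in the question, that every source increments the same counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

public class SharedSourceDemo {

    /** Hypothetical stand-in for one ticking source; each tick increments the shared counter once. */
    static final class TickSource {
        private final LongAdder counter;
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();

        TickSource(LongAdder counter) {
            this.counter = counter;
        }

        void subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
        }

        /** Increments the counter and delivers the new value to every subscriber of this source. */
        void tick() {
            counter.increment();
            int value = counter.intValue();
            subscribers.forEach(s -> s.accept(value));
        }
    }

    /** Broken setup: a new source per client, all incrementing the same counter. */
    static List<List<Integer>> sourcePerClient(int clients, int ticks) {
        LongAdder counter = new LongAdder();
        List<TickSource> sources = new ArrayList<>();
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < clients; c++) {
            TickSource source = new TickSource(counter);
            List<Integer> inbox = new ArrayList<>();
            source.subscribe(inbox::add);
            sources.add(source);
            received.add(inbox);
        }
        for (int t = 0; t < ticks; t++) {
            sources.forEach(TickSource::tick);
        }
        return received;
    }

    /** Fixed setup: one shared source, one increment per tick, fanned out to all clients. */
    static List<List<Integer>> sharedSource(int clients, int ticks) {
        LongAdder counter = new LongAdder();
        TickSource source = new TickSource(counter);
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < clients; c++) {
            List<Integer> inbox = new ArrayList<>();
            source.subscribe(inbox::add);
            received.add(inbox);
        }
        for (int t = 0; t < ticks; t++) {
            source.tick();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(sourcePerClient(2, 3)); // prints [[1, 3, 5], [2, 4, 6]]
        System.out.println(sharedSource(2, 3));    // prints [[1, 2, 3], [1, 2, 3]]
    }
}
```

With one source per client, the values drift further apart as more clients connect, which matches what I saw in the browser tabs. With a single shared source, every client receives the same sequence.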
















answered yesterday

Wim Deblauwe




























