Channel communication between tasks

I am trying to set up channel-based communication between a hyper service and a tokio stream. The problem is that the compiler rejects the code with the following error:




closure is FnOnce because it moves the variable tx_queue out of
its environment.




After reading the explanation provided by rustc --explain E0525 it appears that tokio::sync::mpsc::Sender implements Clone but does not implement Copy (unless I overlooked something).



So I am a bit stuck. How can I get my service to send messages to a tokio stream via a tokio::sync::mpsc channel? I am sure I am missing something obvious, but I cannot see what :/



An excerpt of the problematic code (modified to make it shorter as @E_net4 requested):



extern crate hyper;
extern crate tokio;
extern crate tokio_signal;

use futures::Stream;
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Request, Response, Server};

use futures::sink::Sink;
use futures::sync::{mpsc, oneshot};
use futures::{future, stream};

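fn main() {
    // ... (elided in the source: channel setup and the service closure taking `_: Request<Body>`)

    let graceful = http_server
        .with_graceful_shutdown(rx1)
        .map_err(/* ... elided ... */);

    // ... (rest of main elided in the source)
}

pub fn start_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
    // ... (body elided in the source)
}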



The entire code is available here: https://gist.github.com/jeromer/52aa2da43c5c93584c6ee55be68dd04e



Thanks :)

  • Please try to make a better Minimal, Complete, and Verifiable example, so that everything we need to reproduce the problem is in the question itself. That code snippet alone does not compile.

    – E_net4
    Mar 8 at 11:53

rust rust-tokio hyper

asked Mar 8 at 11:33, edited Mar 8 at 13:40
Jérôme R

1 Answer

futures::sync::mpsc::Sender::send consumes the Sender and produces a Send object, which is a future that has to be run to completion to actually send the data. If the channel is full, that future will not complete until someone else receives from the channel and makes room. Upon completion it gives you back the Sender, which you can use to send more data.
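To make the "consumes the Sender, returns a future, hands the Sender back" shape concrete, here is a minimal sketch using only the standard library. It is not the futures 0.1 API: `SendLater`, `send_later`, `poll_once` and `NoopWake` are hypothetical names made up for illustration, and the sketch uses `std::future::Future` with a `std::sync::mpsc` channel instead of the futures crate. It only shows that nothing reaches the receiver until the future is polled.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the `Send` future: it owns the sender until polled.
struct SendLater {
    tx: Option<Sender<usize>>,
    value: usize,
}

/// Consumes the sender, like the `send` described above, and does no work yet.
fn send_later(tx: Sender<usize>, value: usize) -> SendLater {
    SendLater { tx: Some(tx), value }
}

impl Future for SendLater {
    // On completion the future hands the sender back.
    type Output = Sender<usize>;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let tx = self.tx.take().expect("polled after completion");
        let value = self.value;
        tx.send(value).expect("receiver dropped");
        Poll::Ready(tx)
    }
}

/// Waker that does nothing; enough for a future that is ready on first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and returns its output if it is already complete.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match Pin::new(&mut fut).poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}

/// Returns (items queued before polling, items queued after polling).
fn demo() -> (usize, usize) {
    let (tx, rx): (Sender<usize>, Receiver<usize>) = mpsc::channel();
    let pending = send_later(tx, 111);
    let before = rx.try_iter().count(); // nothing has been sent yet
    let tx = poll_once(pending).expect("ready on first poll");
    let after = rx.try_iter().count(); // the value arrived only after polling
    drop(tx);
    (before, after)
}
```

Calling `demo()` should show an empty channel before the poll and one queued item after it, which is the same reason an ignored `Send` future never delivers anything.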



In this case I don't think you can structure the code with just a single instance of the Sender. You need to clone it so that there is a new clone for every call of the service function. Notice that both closures are move now:



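let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
    // This closure has one instance of tx_queue that was moved-in here.
    // Now we make a copy to be moved into the closure below.
    let tx_queue = tx_queue.clone();
    service_fn_ok(move |_: Request<Body>| {
        // ... (elided in the source: per the text, clone tx_queue again, call send on the clone, return the response)
    })
});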


But this would give you the following warning:



warning: unused `futures::sink::send::Send` that must be used


As I said, the send just gives you a future that must be run to actually perform the sending. If you ignore the return value, nothing will happen. In this case, it would be best to spawn it as a separate task (so it doesn't block responding to the client). To spawn it, you need an executor from the runtime, which also has to be cloned for the inner closure:



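let executor = runtime.executor();
let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
    // ... (elided in the source: clone tx_queue and executor, then spawn the send from the closure taking `_: Request<Body>`)
});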
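As a rough analogy for "spawn the send so the handler does not wait", here is a sketch with OS threads and a standard-library rendezvous channel. This is an assumption-laden stand-in, not tokio or hyper code: `handle_request` is a hypothetical handler, `thread::spawn` plays the role of the executor, and the response text and the value 111 are placeholders.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender};
use std::thread::{self, JoinHandle};

/// Hypothetical handler: hands the send to another thread and replies at once.
fn handle_request(tx: &SyncSender<usize>, value: usize) -> (&'static str, JoinHandle<()>) {
    let tx = tx.clone(); // fresh clone for the spawned work
    let worker = thread::spawn(move || {
        // With capacity 0 this blocks until the receiver takes the value.
        tx.send(value).expect("receiver dropped");
    });
    ("Hello World!", worker)
}

/// Returns the response and the value that later reached the receiver.
fn demo() -> (&'static str, usize) {
    // Rendezvous channel: a send completes only when a receive happens.
    let (tx, rx): (SyncSender<usize>, Receiver<usize>) = mpsc::sync_channel(0);
    // The handler returns although nobody has received yet.
    let (response, worker) = handle_request(&tx, 111);
    let received = rx.recv().expect("sender dropped");
    worker.join().expect("worker panicked");
    (response, received)
}
```

The handler gets its response out before the channel has room, and the value still arrives once the receiving side catches up. With tokio the spawned unit would be a task on the runtime's executor rather than a thread.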





  • Ok I see what you mean. Thanks for the help :)

    – Jérôme R
    Mar 8 at 13:42











  • Damn, the error message makes no sense. Also, the double clone sucks a little; I couldn't figure out how to fix the error because of that.

    – Stargateur
    Mar 8 at 14:28












  • The cloning looks bad, but it is inevitable. Both of the closures can be called multiple times (they are Fn, not FnOnce), so they cannot give away their own instance of tx_queue, because they would not have it the next time they are called. So they keep one instance as a template and clone it when called. The clone is then moved/consumed.

    – michalsrb
    Mar 9 at 20:16
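
The "keep one instance as a template and clone it when called" idea from the comment above can be tried without hyper or tokio. The sketch below uses only `std::sync::mpsc`. `send_consuming` and `make_factory` are hypothetical helpers: the first stands in for a `send` that takes the sender by value, the second mimics the outer closure that produces one inner handler per call.

```rust
use std::sync::mpsc::{self, Sender};

/// Stand-in for a `send` that takes the sender by value (hypothetical helper).
fn send_consuming(tx: Sender<usize>, value: usize) {
    tx.send(value).expect("receiver dropped");
}

/// Builds a factory closure; each call of the factory yields a handler closure.
/// Both are `Fn`, so both can be called any number of times.
fn make_factory(tx: Sender<usize>) -> impl Fn() -> Box<dyn Fn(usize)> {
    move || {
        // The factory keeps `tx` as a template and clones it per handler.
        let tx = tx.clone();
        let handler: Box<dyn Fn(usize)> = Box::new(move |value: usize| {
            // The handler keeps its own template and gives away a fresh clone.
            send_consuming(tx.clone(), value);
        });
        handler
    }
}

/// Calls two handlers several times and collects what reached the receiver.
fn demo() -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    let factory = make_factory(tx);
    let first = factory();
    let second = factory();
    first(1);
    first(2);
    second(3);
    // Drop every sender so the receiver's iterator terminates.
    drop(first);
    drop(second);
    drop(factory);
    rx.iter().collect()
}
```

Removing either `clone()` makes the corresponding closure move its only sender out on the first call, so it would only be FnOnce and no longer satisfy the `Fn` bounds.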











answered Mar 8 at 13:37
michalsrb