Channel communication between tasks
I am trying to set up channel-based communication between one hyper service and one tokio stream. The problem is that the compiler complains with the following error:
    closure is FnOnce because it moves the variable tx_queue out of its environment.
After reading the explanation provided by rustc --explain E0525, it appears that tokio::sync::mpsc::Sender implements Clone but does not implement Copy (unless I overlooked something).
So I am a bit stuck. How can I get my service to send messages to a tokio stream via a tokio::sync::mpsc channel? I am sure I am missing something obvious but cannot see what :/
An excerpt of the problematic code (modified to make it shorter as @E_net4 requested):
    extern crate hyper;
    extern crate tokio;
    extern crate tokio_signal;
    use futures::Stream;
    use hyper::rt::Future;
    use hyper::service::service_fn_ok;
    use hyper::{Body, Request, Response, Server};
    use futures::sink::Sink;
    use futures::sync::{mpsc, oneshot};
    use futures::{future, stream};
    fn main() {
        // ...
        // ... |_: Request<Body>| { ... });
        let graceful = http_server
            .with_graceful_shutdown(rx1)
            .map_err(/* ... */);
        // ...
    }
    pub fn start_queue(rx: mpsc::Receiver<usize>) -> impl Future<Item = (), Error = ()> {
        // ...
    }
The entire code is available here: https://gist.github.com/jeromer/52aa2da43c5c93584c6ee55be68dd04e
Thanks :)
rust rust-tokio hyper
asked Mar 8 at 11:33 by Jérôme R (edited Mar 8 at 13:40)
Please try to make a better Minimal, Complete, and Verifiable example, so that everything we need to reproduce the problem is in the question itself. That code snippet alone does not compile.
– E_net4
Mar 8 at 11:53
1 Answer
futures::sync::mpsc::Sender::send consumes the Sender and produces a Send object, which is a future that has to be run to completion to actually send the data. If the channel is full, that future does not complete until someone else receives from the channel. Upon completion it gives you back the Sender, which you can use to send more data.
In this case I don't think you can structure the code with just a single instance of the Sender. You need to clone it so that there is a new clone for every call of the service function. Notice that both closures are move now:
    let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
        // This closure has one instance of tx_queue that was moved in here.
        // Now we make a copy to be moved into the closure below.
        let tx_queue = tx_queue.clone();
        service_fn_ok(move |_: Request<Body>| {
            // ...
        })
    });
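The ownership problem can be reproduced without hyper or tokio. The following is a minimal sketch using only the standard library, assuming std::sync::mpsc::Sender as a stand-in for the futures Sender (both are Clone but not Copy). The names consume, make_service, call_twice and collect_sent are hypothetical and exist only for this illustration. consume takes the Sender by value, the way send does, so a closure that has to be callable more than once keeps its own Sender as a template and hands a clone to each call:

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical stand-in for a consuming send: it takes the Sender by value,
// so the caller gives up ownership of whatever it passes in.
fn consume(tx: Sender<usize>, value: usize) {
    tx.send(value).expect("receiver was dropped");
}

// Stand-in for the outer `serve` closure: every call builds a new "service"
// closure that owns its own clone of the Sender.
fn make_service(tx: &Sender<usize>) -> impl Fn(usize) {
    let tx = tx.clone();
    // `move` transfers the clone into the closure. Cloning again inside keeps
    // the closure `Fn`; passing `tx` itself to `consume` would make it `FnOnce`.
    move |value| consume(tx.clone(), value)
}

// Requires `Fn`, so a closure that moves its captured Sender out is rejected.
fn call_twice<F: Fn(usize)>(service: F) {
    service(1);
    service(2);
}

// Runs the whole pipeline and returns what arrived on the receiving side.
fn collect_sent() -> Vec<usize> {
    let (tx, rx) = channel();
    call_twice(make_service(&tx));
    call_twice(make_service(&tx));
    drop(tx); // drop the last Sender so the iterator below terminates
    rx.iter().collect()
}

fn main() {
    println!("{:?}", collect_sent()); // prints [1, 2, 1, 2]
}
```

Replacing tx.clone() with tx inside the returned closure makes the compiler reject make_service, because the closure could then only be called once.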
However, compiling this server code gives you the following warning:
    warning: unused `futures::sink::send::Send` that must be used
As I said, send just gives you a future that must be run to actually perform the sending. If you ignore the return value, nothing will happen. In this case, it would be best to spawn it as a separate task (so that it doesn't hold up the response to the client). To spawn it, you need an executor from the runtime, which also has to be cloned for the inner closure:
    let executor = runtime.executor();
    let http_server = Server::bind(&([127, 0, 0, 1], 3000).into()).serve(move || {
        // ... service_fn_ok(move |_: Request<Body>| { ... }) ...
    });
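The reason an ignored Send future sends nothing is that a future only does work when something polls it. The sketch below illustrates this with the standard library's std::future::Future trait rather than the futures 0.1 API used in this answer, so it is an analogy and not the hyper/tokio code itself. SendOnPoll, NoopWaker, poll_once and demo are hypothetical names, the value 111 is arbitrary, and poll_once only stands in for what an executor does with a spawned task:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future that performs its send only when it is polled.
struct SendOnPoll {
    tx: Sender<usize>,
    value: usize,
}

impl Future for SendOnPoll {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
        self.tx.send(self.value).expect("receiver was dropped");
        Poll::Ready(())
    }
}

// Waker that does nothing; enough for a future that is ready on its first poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Stand-in for an executor: polls the future exactly once.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut fut).poll(&mut cx)
}

// Returns what the receiver sees before and after the future is polled.
fn demo() -> (Result<usize, TryRecvError>, Result<usize, TryRecvError>) {
    let (tx, rx) = channel();
    let fut = SendOnPoll { tx, value: 111 };
    let before = rx.try_recv(); // the future exists, but nothing was sent yet
    let _ = poll_once(fut);
    let after = rx.try_recv(); // polling performed the send
    (before, after)
}

fn main() {
    let (before, after) = demo();
    println!("before polling: {:?}", before); // Err(Empty)
    println!("after polling: {:?}", after); // Ok(111)
}
```

Spawning the Send future on the executor hands it to something that will poll it, which is why the value only reaches the channel once the future is spawned rather than dropped.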
answered Mar 8 at 13:37 by michalsrb
Ok I see what you mean. Thanks for the help :)
– Jérôme R
Mar 8 at 13:42
Damn, the error message makes no sense. Also, the double clone sucks a little; I couldn't figure out how to fix the error because of it.
– Stargateur
Mar 8 at 14:28
The cloning looks bad, but it is inevitable. Both of the closures can be called multiple times (they are Fn, not FnOnce), so they cannot give out their own instance of tx_queue because they would not have it the next time they are called. So they keep one instance as a template and clone it when called. The clone is then moved/consumed.
– michalsrb
Mar 9 at 20:16