flink with python, execution of job failed2019 Community Moderator ElectionCalling an external command in PythonWhat are metaclasses in Python?Is there a way to run Python on Android?Finding the index of an item given a list containing it in PythonDifference between append vs. extend list methods in PythonHow can I safely create a nested directory in Python?Does Python have a ternary conditional operator?How to get the current time in PythonHow can I make a time delay in Python?Does Python have a string 'contains' substring method?
Are E natural minor and B harmonic minor related?
How can I portion out frozen cookie dough?
Writing text next to a table
Locked Away- What am I?
Either of .... (Plural/Singular)
What can I do if someone tampers with my SSH public key?
How to increase the accuracy of a plot
Can the Witch Sight warlock invocation see through the Mirror Image spell?
(Codewars) Linked Lists-Sorted Insert
Can one live in the U.S. and not use a credit card?
How do spaceships determine each other's mass in space?
Why does Central Limit Theorem break down in my simulation?
Is divide-by-zero a security vulnerability?
Giving a career talk in my old university, how prominently should I tell students my salary?
-1 to the power of a irrational number
What is the purpose of a disclaimer like "this is not legal advice"?
Is it appropriate to ask a former professor to order a book for me through an inter-library loan?
School performs periodic password audits. Is my password compromised?
Do Paladin Auras of Differing Oaths Stack?
How would an energy-based "projectile" blow up a spaceship?
How can a demon take control of a human body during REM sleep?
Did Amazon pay $0 in taxes last year?
Movie: boy escapes the real world and goes to a fantasy world with big furry trolls
Idiom for feeling after taking risk and someone else being rewarded
flink with python, execution of job failed
2019 Community Moderator ElectionCalling an external command in PythonWhat are metaclasses in Python?Is there a way to run Python on Android?Finding the index of an item given a list containing it in PythonDifference between append vs. extend list methods in PythonHow can I safely create a nested directory in Python?Does Python have a ternary conditional operator?How to get the current time in PythonHow can I make a time delay in Python?Does Python have a string 'contains' substring method?
For a first try I want to read JSON data from a file and pass it on to Flink. I defined a source (which reads JSON strings line by line) and a placeholder filter. See Code:
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys
class Json_reader(SourceFunction):
def readjason(self, ctx):
sys.stdin = open('capture.json', 'r')
for line in sys.stdin:
ctx.collect(json.loads(line))
class Dummy_Filter(FilterFunction):
def filter(self, value):
return True
#
# The pipeline definition.
#
def main(factory):
env = factory.get_execution_environment()
env.create_python_source(Json_reader())
.filter(Dummy_Filter())
.output()
env.execute()
When I build the job and move it to my started Flink-cluster, I get the following error message:
VirtualBox:/media/sf_Python$ ./flink-1.7.2/bin/pyflink-stream.sh
./json_parser_flink.py Starting execution of program Failed to run
plan: null Traceback (most recent call last): File "", line
1, in File
"/tmp/flink_streaming_plan_fbe13c4c-6918-46d4-a4bc-36908a2bea24/json_parser_flink.py",
line 25, in main at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at
org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.client.program.ProgramInvocationException: Job
failed. (JobID: 31615948194c951be03d46576929aa23)
The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.
I haven't forgotten to call execute().
python apache-flink
add a comment |
For a first try I want to read JSON data from a file and pass it on to Flink. I defined a source (which reads JSON strings line by line) and a placeholder filter. See Code:
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys
class Json_reader(SourceFunction):
def readjason(self, ctx):
sys.stdin = open('capture.json', 'r')
for line in sys.stdin:
ctx.collect(json.loads(line))
class Dummy_Filter(FilterFunction):
def filter(self, value):
return True
#
# The pipeline definition.
#
def main(factory):
env = factory.get_execution_environment()
env.create_python_source(Json_reader())
.filter(Dummy_Filter())
.output()
env.execute()
When I build the job and move it to my started Flink-cluster, I get the following error message:
VirtualBox:/media/sf_Python$ ./flink-1.7.2/bin/pyflink-stream.sh
./json_parser_flink.py Starting execution of program Failed to run
plan: null Traceback (most recent call last): File "", line
1, in File
"/tmp/flink_streaming_plan_fbe13c4c-6918-46d4-a4bc-36908a2bea24/json_parser_flink.py",
line 25, in main at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at
org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.client.program.ProgramInvocationException: Job
failed. (JobID: 31615948194c951be03d46576929aa23)
The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.
I haven't forgotten to call execute().
python apache-flink
add a comment |
For a first try I want to read JSON data from a file and pass it on to Flink. I defined a source (which reads JSON strings line by line) and a placeholder filter. See Code:
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys
class Json_reader(SourceFunction):
def readjason(self, ctx):
sys.stdin = open('capture.json', 'r')
for line in sys.stdin:
ctx.collect(json.loads(line))
class Dummy_Filter(FilterFunction):
def filter(self, value):
return True
#
# The pipeline definition.
#
def main(factory):
env = factory.get_execution_environment()
env.create_python_source(Json_reader())
.filter(Dummy_Filter())
.output()
env.execute()
When I build the job and move it to my started Flink-cluster, I get the following error message:
VirtualBox:/media/sf_Python$ ./flink-1.7.2/bin/pyflink-stream.sh
./json_parser_flink.py Starting execution of program Failed to run
plan: null Traceback (most recent call last): File "", line
1, in File
"/tmp/flink_streaming_plan_fbe13c4c-6918-46d4-a4bc-36908a2bea24/json_parser_flink.py",
line 25, in main at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at
org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.client.program.ProgramInvocationException: Job
failed. (JobID: 31615948194c951be03d46576929aa23)
The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.
I haven't forgotten to call execute().
python apache-flink
For a first try I want to read JSON data from a file and pass it on to Flink. I defined a source (which reads JSON strings line by line) and a placeholder filter. See Code:
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys
class Json_reader(SourceFunction):
def readjason(self, ctx):
sys.stdin = open('capture.json', 'r')
for line in sys.stdin:
ctx.collect(json.loads(line))
class Dummy_Filter(FilterFunction):
def filter(self, value):
return True
#
# The pipeline definition.
#
def main(factory):
env = factory.get_execution_environment()
env.create_python_source(Json_reader())
.filter(Dummy_Filter())
.output()
env.execute()
When I build the job and move it to my started Flink-cluster, I get the following error message:
VirtualBox:/media/sf_Python$ ./flink-1.7.2/bin/pyflink-stream.sh
./json_parser_flink.py Starting execution of program Failed to run
plan: null Traceback (most recent call last): File "", line
1, in File
"/tmp/flink_streaming_plan_fbe13c4c-6918-46d4-a4bc-36908a2bea24/json_parser_flink.py",
line 25, in main at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
at
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at
org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException:
org.apache.flink.client.program.ProgramInvocationException: Job
failed. (JobID: 31615948194c951be03d46576929aa23)
The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.
I haven't forgotten to call execute().
python apache-flink
python apache-flink
asked Mar 6 at 13:36
mudvaynemudvayne
306
306
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
I found the problem. Fast expects a run() function in the SourceFunction.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55024413%2fflink-with-python-execution-of-job-failed%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
I found the problem. Fast expects a run() function in the SourceFunction.
add a comment |
I found the problem. Fast expects a run() function in the SourceFunction.
add a comment |
I found the problem. Fast expects a run() function in the SourceFunction.
I found the problem. Fast expects a run() function in the SourceFunction.
answered 2 days ago
mudvaynemudvayne
306
306
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55024413%2fflink-with-python-execution-of-job-failed%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown