Flink with Python, execution of job failed

As a first attempt, I want to read JSON data from a file and pass it on to Flink. I defined a source (which reads JSON strings line by line) and a placeholder filter. See the code:



from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FilterFunction
import json
import sys

class Json_reader(SourceFunction):
    def readjason(self, ctx):
        sys.stdin = open('capture.json', 'r')
        for line in sys.stdin:
            ctx.collect(json.loads(line))


class Dummy_Filter(FilterFunction):
    def filter(self, value):
        return True

#
# The pipeline definition.
#
def main(factory):
    env = factory.get_execution_environment()
    env.create_python_source(Json_reader()) \
        .filter(Dummy_Filter()) \
        .output()
    env.execute()


When I build the job and submit it to my running Flink cluster, I get the following error message:




VirtualBox:/media/sf_Python$ ./flink-1.7.2/bin/pyflink-stream.sh ./json_parser_flink.py
Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
  File "", line 1, in
  File "/tmp/flink_streaming_plan_fbe13c4c-6918-46d4-a4bc-36908a2bea24/json_parser_flink.py", line 25, in main
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
    at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 31615948194c951be03d46576929aa23)



The program didn't contain a Flink job. Perhaps you forgot to call
execute() on the execution environment.




I haven't forgotten to call execute().










      python apache-flink






asked Mar 6 at 13:36 by mudvayne

          1 Answer

I found the problem. Flink expects a run() method in the SourceFunction, and my class only defined readjason().
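A minimal sketch of what the corrected source might look like, with the reading logic moved into `run()` and a `cancel()` method added, since the Java `SourceFunction` interface declares both. The `SourceFunction` base class and the `ListContext` collector below are hypothetical stand-ins so the snippet runs outside Flink; in the real job, `SourceFunction` would be imported from `org.apache.flink.streaming.api.functions.source` as in the question. The `path` argument and the `_running` flag are assumptions made for illustration, not something taken from the original job.

```python
import json


class SourceFunction(object):
    """Stand-in for Flink's SourceFunction interface (assumption: used
    here only so the sketch runs without a Flink installation)."""

    def run(self, ctx):
        raise NotImplementedError

    def cancel(self):
        raise NotImplementedError


class JsonReader(SourceFunction):
    """Reads one JSON document per line and emits each as a record."""

    def __init__(self, path):
        self._path = path        # hypothetical parameter; the question hard-codes 'capture.json'
        self._running = True     # cleared by cancel() to stop the read loop

    def run(self, ctx):
        # The method must be named run(); a differently named method is never called.
        with open(self._path, 'r') as source:
            for line in source:
                if not self._running:
                    break
                line = line.strip()
                if line:         # skip blank lines instead of failing in json.loads
                    ctx.collect(json.loads(line))

    def cancel(self):
        self._running = False


class ListContext(object):
    """Hypothetical stand-in for the source context: stores collected records."""

    def __init__(self):
        self.records = []

    def collect(self, record):
        self.records.append(record)
```

In the job itself, an instance would be passed to `env.create_python_source(...)` exactly as in the pipeline from the question. Opening the file with `with` rather than reassigning `sys.stdin` is a design choice of this sketch: it closes the file when the source finishes or is cancelled.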






answered 2 days ago by mudvayne