Airflow python callable function reusable Announcing the arrival of Valued Associate #679: Cesar Manara Planned maintenance scheduled April 17/18, 2019 at 00:00UTC (8:00pm US/Eastern) The Ask Question Wizard is Live! Data science time! April 2019 and salary with experience Should we burninate the [wrap] tag?Calling an external command in PythonWhat are metaclasses in Python?Finding the index of an item given a list containing it in PythonDifference between append vs. extend list methods in PythonHow can I safely create a nested directory in Python?Does Python have a ternary conditional operator?How to get the current time in PythonUsing global variables in a functionHow to make a chain of function decorators?Does Python have a string 'contains' substring method?

WAN encapsulation

How much radiation do nuclear physics experiments expose researchers to nowadays?

Is the Standard Deduction better than Itemized when both are the same amount?

When -s is used with third person singular. What's its use in this context?

How can I make names more distinctive without making them longer?

What LEGO pieces have "real-world" functionality?

When is phishing education going too far?

What is the longest distance a 13th-level monk can jump while attacking on the same turn?

The logistics of corpse disposal

Is 1 ppb equal to 1 μg/kg?

How to deal with a team lead who never gives me credit?

Antler Helmet: Can it work?

Why is black pepper both grey and black?

Withdrew £2800, but only £2000 shows as withdrawn on online banking; what are my obligations?

Is there a service that would inform me whenever a new direct route is scheduled from a given airport?

How do I mention the quality of my school without bragging

Is a manifold-with-boundary with given interior and non-empty boundary essentially unique?

How to find all the available tools in macOS terminal?

What are the pros and cons of Aerospike nosecones?

When to stop saving and start investing?

Is there a concise way to say "all of the X, one of each"?

How to motivate offshore teams and trust them to deliver?

Check which numbers satisfy the condition [A*B*C = A! + B! + C!]

Should I call the interviewer directly, if HR aren't responding?



Airflow python callable function reusable



Announcing the arrival of Valued Associate #679: Cesar Manara
Planned maintenance scheduled April 17/18, 2019 at 00:00UTC (8:00pm US/Eastern)
The Ask Question Wizard is Live!
Data science time! April 2019 and salary with experience
Should we burninate the [wrap] tag?Calling an external command in PythonWhat are metaclasses in Python?Finding the index of an item given a list containing it in PythonDifference between append vs. extend list methods in PythonHow can I safely create a nested directory in Python?Does Python have a ternary conditional operator?How to get the current time in PythonUsing global variables in a functionHow to make a chain of function decorators?Does Python have a string 'contains' substring method?



.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty height:90px;width:728px;box-sizing:border-box;








3















airflow_version = 1.10.2; python_version = 3.6.8



I am having trouble understanding how to make a python callable more reusable to airflow's PythonOperator, as the same function declared within the dag file itself works, but importing it from an helper lib fails.



So, the following works:



def my_function(temp_file, task_id, **kwargs):

xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

if not xcom_vals:
return 'Xcom message not retrieved'

ack_messages = []

for item in xcom_vals:

ack_messages += <do stuff>

return ack_messages

with DAG(<dag args>):

process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs='task_id': 'previous_task_id',
'temp_file': temp_file,
provide_context=True,
)


But, moving my_function to a module lib/helpers.py and then importing it fails with error.



Broken DAG: [path to dag] cannot import my_function


NOTE: lib/helpers.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's.



How should my_function be implemented so that it is callable by other dags?










share|improve this question
























  • I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

    – Mntfr
    Mar 8 at 16:17











  • Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

    – Miguel Serrano
    Mar 8 at 16:24












  • @dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

    – Miguel Serrano
    Mar 8 at 16:27












  • @dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

    – Miguel Serrano
    Mar 8 at 16:39











  • Alright, sorry, for the confusion (deleted my comments)

    – dorvak
    Mar 8 at 16:44

















3















airflow_version = 1.10.2; python_version = 3.6.8



I am having trouble understanding how to make a python callable more reusable to airflow's PythonOperator, as the same function declared within the dag file itself works, but importing it from an helper lib fails.



So, the following works:



def my_function(temp_file, task_id, **kwargs):

xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

if not xcom_vals:
return 'Xcom message not retrieved'

ack_messages = []

for item in xcom_vals:

ack_messages += <do stuff>

return ack_messages

with DAG(<dag args>):

process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs='task_id': 'previous_task_id',
'temp_file': temp_file,
provide_context=True,
)


But, moving my_function to a module lib/helpers.py and then importing it fails with error.



Broken DAG: [path to dag] cannot import my_function


NOTE: lib/helpers.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's.



How should my_function be implemented so that it is callable by other dags?










share|improve this question
























  • I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

    – Mntfr
    Mar 8 at 16:17











  • Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

    – Miguel Serrano
    Mar 8 at 16:24












  • @dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

    – Miguel Serrano
    Mar 8 at 16:27












  • @dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

    – Miguel Serrano
    Mar 8 at 16:39











  • Alright, sorry, for the confusion (deleted my comments)

    – dorvak
    Mar 8 at 16:44













3












3








3








airflow_version = 1.10.2; python_version = 3.6.8



I am having trouble understanding how to make a python callable more reusable to airflow's PythonOperator, as the same function declared within the dag file itself works, but importing it from an helper lib fails.



So, the following works:



def my_function(temp_file, task_id, **kwargs):

xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

if not xcom_vals:
return 'Xcom message not retrieved'

ack_messages = []

for item in xcom_vals:

ack_messages += <do stuff>

return ack_messages

with DAG(<dag args>):

process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs='task_id': 'previous_task_id',
'temp_file': temp_file,
provide_context=True,
)


But, moving my_function to a module lib/helpers.py and then importing it fails with error.



Broken DAG: [path to dag] cannot import my_function


NOTE: lib/helpers.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's.



How should my_function be implemented so that it is callable by other dags?










share|improve this question
















airflow_version = 1.10.2; python_version = 3.6.8



I am having trouble understanding how to make a python callable more reusable to airflow's PythonOperator, as the same function declared within the dag file itself works, but importing it from an helper lib fails.



So, the following works:



def my_function(temp_file, task_id, **kwargs):

xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)

if not xcom_vals:
return 'Xcom message not retrieved'

ack_messages = []

for item in xcom_vals:

ack_messages += <do stuff>

return ack_messages

with DAG(<dag args>):

process_messages = PythonOperator(
task_id='get_messages',
python_callable=my_function,
op_kwargs='task_id': 'previous_task_id',
'temp_file': temp_file,
provide_context=True,
)


But, moving my_function to a module lib/helpers.py and then importing it fails with error.



Broken DAG: [path to dag] cannot import my_function


NOTE: lib/helpers.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's.



How should my_function be implemented so that it is callable by other dags?







python airflow






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Mar 15 at 15:54







Miguel Serrano

















asked Mar 8 at 16:13









Miguel SerranoMiguel Serrano

215




215












  • I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

    – Mntfr
    Mar 8 at 16:17











  • Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

    – Miguel Serrano
    Mar 8 at 16:24












  • @dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

    – Miguel Serrano
    Mar 8 at 16:27












  • @dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

    – Miguel Serrano
    Mar 8 at 16:39











  • Alright, sorry, for the confusion (deleted my comments)

    – dorvak
    Mar 8 at 16:44

















  • I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

    – Mntfr
    Mar 8 at 16:17











  • Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

    – Miguel Serrano
    Mar 8 at 16:24












  • @dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

    – Miguel Serrano
    Mar 8 at 16:27












  • @dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

    – Miguel Serrano
    Mar 8 at 16:39











  • Alright, sorry, for the confusion (deleted my comments)

    – dorvak
    Mar 8 at 16:44
















I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

– Mntfr
Mar 8 at 16:17





I had a similar problem what solved it for me was update the plugin path in file airflow.cfg plugins_folder = /airflow/plugins

– Mntfr
Mar 8 at 16:17













Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

– Miguel Serrano
Mar 8 at 16:24






Actually plugins_folder is already set to $airflow_home/plugins. You are saying that my_function should be implemented with its own custom operator?

– Miguel Serrano
Mar 8 at 16:24














@dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

– Miguel Serrano
Mar 8 at 16:27






@dorvak libs is subpath to dags, so folder structure is airflow -- dags -- libs, and libs is a py module so, init.py is there :

– Miguel Serrano
Mar 8 at 16:27














@dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

– Miguel Serrano
Mar 8 at 16:39





@dorvak libs has a __init__.py, on my previous comment double underscore was parsed as md. Thing is libs/helper.py contains other functions (although more simple) that are successfully imported and used in the current and other DAG's!

– Miguel Serrano
Mar 8 at 16:39













Alright, sorry, for the confusion (deleted my comments)

– dorvak
Mar 8 at 16:44





Alright, sorry, for the confusion (deleted my comments)

– dorvak
Mar 8 at 16:44












1 Answer
1






active

oldest

votes


















0














Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.






share|improve this answer























    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "1"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );













    draft saved

    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55066989%2fairflow-python-callable-function-reusable%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    0














    Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



    What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.






    share|improve this answer



























      0














      Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



      What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.






      share|improve this answer

























        0












        0








        0







        Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



        What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.






        share|improve this answer













        Figured this one out, it came down to the airflow UI and scheduler not parsing the libs folders correctly, some kind of lag behaviour after a git sync? So both the UI and scheduler were parsing the dag file correctly, but not the lib folder.



        What solved this behaviour in the end was a restart both to the UI and scheduler pod (we're running airflow on kubernetes) and that was it.







        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered Mar 29 at 10:51









        Miguel SerranoMiguel Serrano

        215




        215





























            draft saved

            draft discarded
















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid


            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.

            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55066989%2fairflow-python-callable-function-reusable%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            AWS Lex not identifying response if by a variable The 2019 Stack Overflow Developer Survey Results Are In Announcing the arrival of Valued Associate #679: Cesar Manara Planned maintenance scheduled April 17/18, 2019 at 00:00UTC (8:00pm US/Eastern) The Ask Question Wizard is Live! Data science time! April 2019 and salary with experienceEnforcing custom enumeration in AWS LEX for slot valuesHow to give response based on user response in Amazon Lex?Intercepting AWS Lambda Response to a AWS Lex QueryLex chat bot error: Reached second execution of fulfillment lambda on the same utteranceamazon lex showing invalid responseLambda response send back to Lex slot?Response card in Amazon lexAmazon Lex - Lambda response return HTML to botHow can I solve 424 (Failed Dependency) (python) obtained from Amazon lex?

            Алба-Юлія

            Захаров Федір Захарович