Airflow python callable function reusable
airflow_version = 1.10.2; python_version = 3.6.8
I am having trouble understanding how to make a Python callable reusable with Airflow's PythonOperator: the function works when declared within the DAG file itself, but importing it from a helper lib fails.
So, the following works:

    def my_function(temp_file, task_id, **kwargs):
        xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)
        if not xcom_vals:
            return 'Xcom message not retrieved'
        ack_messages = []
        for item in xcom_vals:
            ack_messages += <do stuff>
        return ack_messages

    with DAG(<dag args>):
        process_messages = PythonOperator(
            task_id='get_messages',
            python_callable=my_function,
            op_kwargs={'task_id': 'previous_task_id',
                       'temp_file': temp_file},
            provide_context=True,
        )

But moving my_function to a module lib/helpers.py and then importing it fails with the error:
Broken DAG: [path to dag] cannot import my_function
NOTE: lib/helpers.py contains other (simpler) functions that are successfully imported and used in the current and other DAGs.
How should my_function be implemented so that it is callable by other DAGs?
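For reference, here is a minimal sketch of how the helper might look once it lives in lib/helpers.py. It makes several assumptions: the elided `<do stuff>` step is replaced by a simple append, `FakeTaskInstance` is a hypothetical stand-in used only to exercise the function outside Airflow, and the import line in the comment assumes lib is a package inside the dags folder.

```python
# Sketch of lib/helpers.py (assumed layout: dags/lib/__init__.py exists).
# In the DAG file the import would then be, under that assumption:
#     from lib.helpers import my_function


def my_function(temp_file, task_id, **kwargs):
    """Pull XCom values from an upstream task and collect acknowledgements."""
    xcom_vals = kwargs['ti'].xcom_pull(task_ids=task_id)
    if not xcom_vals:
        return 'Xcom message not retrieved'
    ack_messages = []
    for item in xcom_vals:
        # Assumption: stands in for the elided "<do stuff>" step.
        ack_messages.append(item)
    return ack_messages


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local checks only."""

    def __init__(self, values):
        self._values = values

    def xcom_pull(self, task_ids=None):
        # Return the canned values regardless of which task is asked for.
        return self._values


if __name__ == '__main__':
    ti = FakeTaskInstance(['msg-1', 'msg-2'])
    print(my_function('/tmp/example', 'previous_task_id', ti=ti))
```

The function only depends on the `ti` entry of the context, so nothing in it needs to change when it moves out of the DAG file.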
python airflow
I had a similar problem. What solved it for me was updating the plugin path in airflow.cfg: plugins_folder = /airflow/plugins
– Mntfr, Mar 8 at 16:17
Actually, plugins_folder is already set to $airflow_home/plugins. Are you saying that my_function should be implemented as its own custom operator?
– Miguel Serrano, Mar 8 at 16:24
@dorvak libs is a subpath of dags, so the folder structure is airflow -- dags -- libs, and libs is a py module, so init.py is there.
– Miguel Serrano, Mar 8 at 16:27
@dorvak libs has an __init__.py; in my previous comment the double underscore was parsed as markdown. Thing is, libs/helper.py contains other (simpler) functions that are successfully imported and used in the current and other DAGs!
– Miguel Serrano, Mar 8 at 16:39
Alright, sorry for the confusion (deleted my comments)
– dorvak, Mar 8 at 16:44
asked Mar 8 at 16:13 by Miguel Serrano (edited Mar 15 at 15:54)
1 Answer
Figured this one out: it came down to the Airflow UI and scheduler not parsing the libs folder correctly, possibly some kind of lag after a git sync. Both the UI and the scheduler were parsing the DAG file correctly, but not the lib folder.
What solved it in the end was restarting both the UI and the scheduler pods (we're running Airflow on Kubernetes).
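If you hit something similar, one way to check for this is to open a Python shell inside the scheduler or webserver pod and see which copy of the helper module is on disk there and whether it defines the function. The sketch below uses only the standard library. `check_callable` is a hypothetical helper name, and the module path `lib.helpers` in the usage note is an assumption based on the layout described in the question.

```python
import importlib


def check_callable(module_name, attr_name):
    """Report whether module_name can be imported and exposes a callable attr_name.

    Returns a (ok, detail) tuple; detail is the module's file path on success
    or a short description of what went wrong.
    """
    try:
        module = importlib.import_module(module_name)
        # Reload so that a file changed on disk after the first import is re-read.
        module = importlib.reload(module)
    except ImportError as exc:
        return False, 'cannot import {}: {}'.format(module_name, exc)

    location = getattr(module, '__file__', '<unknown>')
    if not callable(getattr(module, attr_name, None)):
        return False, '{} (from {}) does not define {}'.format(
            module_name, location, attr_name)
    return True, location


if __name__ == '__main__':
    # Demonstrated on a standard-library module so the snippet runs anywhere.
    print(check_callable('json', 'dumps'))
    print(check_callable('json', 'no_such_function'))
```

Inside the pod the call would be something like `check_callable('lib.helpers', 'my_function')`. If it fails there while the file in the repository does define the function, the synced copy is probably stale. This only inspects what is on disk in that pod and does not refresh the running scheduler or webserver processes, so a restart may still be needed, as it was here.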
answered Mar 29 at 10:51 by Miguel Serrano