How to call a REST end point using Airflow DAG

  airflow, airflow-scheduler, docker, rest

I’m new to Apache Airflow. I want to call a REST end point using DAG.
REST end point for example

@PostMapping(path = "/api/employees", consumes = "application/json")

Now I want to call this rest end point using Airflow DAG, and schedule it. What I’m doing is using SimpleHttpOperator to call the Rest end point.

t1 = SimpleHttpOperator(
task_id='post_op',
endpoint='http://localhost:8084/api/employees',
data=json.dumps({"department": "Digital","id": 102,"name": "Rakesh","salary": 80000}),
headers={"Content-Type": "application/json"},
dag=dag,)

When I trigger the DAG the task is getting failed

[2019-12-30 09:09:06,330] {{taskinstance.py:862}} INFO - Executing <Task(SimpleHttpOperator): 
post_op> on 2019-12-30T08:57:00.674386+00:00
[2019-12-30 09:09:06,331] {{base_task_runner.py:133}} INFO - Running: ['airflow', 'run', 
'example_http_operator', 'post_op', '2019-12-30T08:57:00.674386+00:00', '--job_id', '6', '--pool', 
'default_pool', '--raw', '-sd', 'DAGS_FOLDER/ExampleHttpOperator.py', '--cfg_path', 
'/tmp/tmpf9t6kzxb']
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 
09:09:07,445] {{__init__.py:51}} INFO - Using executor SequentialExecutor
[2019-12-30 09:09:07,446] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 
09:09:07,446] {{dagbag.py:92}} INFO - Filling up the DagBag from 
/usr/local/airflow/dags/ExampleHttpOperator.py
[2019-12-30 09:09:07,473] {{base_task_runner.py:115}} INFO - Job 6: Subtask post_op [2019-12-30 
09:09:07,472] {{cli.py:545}} INFO - Running <TaskInstance: example_http_operator.post_op 2019-12- 
30T08:57:00.674386+00:00 [running]> on host 855dbc2ce3a3
[2019-12-30 09:09:07,480] {{http_operator.py:87}} INFO - Calling HTTP method
[2019-12-30 09:09:07,483] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,483] 
{{base_hook.py:84}} INFO - Using connection to: id: http_default. Host: https://www.google.com/, 
Port: None, Schema: None, Login: None, Password: None, extra: {}
[2019-12-30 09:09:07,484] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,484] 
{{http_hook.py:131}} INFO - Sending 'POST' to url: 
https://www.google.com/http://localhost:8084/api/employees
[2019-12-30 09:09:07,501] {{logging_mixin.py:112}} INFO - [2019-12-30 09:09:07,501] 
{{http_hook.py:181}} WARNING - HTTPSConnectionPool(host='www.google.com', port=443): Max retries 
exceeded with url: /http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: 
SysCallError(-1, 'Unexpected EOF')"))) Tenacity will retry to execute the operation
[2019-12-30 09:09:07,501] {{taskinstance.py:1058}} ERROR - 
HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url: 
/http://localhost:8084/api/employees (Caused by SSLError(SSLError("bad handshake: SysCallError(-1, 
'Unexpected EOF')")))
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 485, in wrap_socket
cnx.do_handshake()
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1934, in do_handshake
self._raise_ssl_error(self._ssl, result)
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1664, in _raise_ssl_error
raise SysCallError(-1, "Unexpected EOF")
OpenSSL.SSL.SysCallError: (-1, 'Unexpected EOF')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
chunked=chunked,
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 376, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 994, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.7/site-packages/urllib3/connection.py", line 394, in connect
ssl_context=context,
File "/usr/local/lib/python3.7/site-packages/urllib3/util/ssl_.py", line 370, in ssl_wrap_socket
return context.wrap_socket(sock, server_hostname=server_hostname)
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 491, in wrap_socket
raise ssl.SSLError("bad handshake: %r" % e)
ssl.SSLError: ("bad handshake: SysCallError(-1, 'Unexpected EOF')",)

Airflow is running on Docker and the docker image is puckel/docker-airflow.
Why it is calling the host http_default. Host: https://www.google.com/

Source: StackOverflow

LEAVE A COMMENT