Developing a custom GoogleDriveOperator

A common operation that can be automated is the recurring upload of a document to a Google Drive folder. For example, this could be a monthly upload of an Excel analysis of sales data to be shared with the team.

The Airflow provider package apache-airflow-providers-google offers components for easy integration with Google services on GCP. However, it does not provide an operator to upload a local file to Google Drive out of the box. That is why in this article we will build our own Airflow operator that does just that.

We will do this in four consecutive steps:

  1. Configuring the Google Drive API and creating a service account on GCP
  2. Configuring Domain-wide Delegation on our Google Workspace
  3. Writing the code for our custom GoogleDriveOperator
  4. Testing a minimal DAG that uploads a text file to our Google Drive account

To follow these steps we will need:

  • A Google account with admin privileges on GCP and the Google Workspace it belongs to
  • An Airflow 2.0.x installation

I have created a public GitHub repo with an Airflow installation on Docker and all the code in this article. Clone the project and launch it with docker-compose run.

1. Configuring the Google Drive API and creating a service account on GCP

Airflow uploads files to Google Drive through the Google Drive API on Google Cloud Platform.

Go to https://cloud.google.com/ and click on Console. Accept the conditions. We are greeted by the following dashboard (that might differ a bit depending on your organization).

Creating a new GCP project

We will create a new GCP project dedicated to this feature. Click on Select a project and then on NEW PROJECT. Let’s call it “airflow-to-drive”. Select the project so that the name is displayed in the top bar.

Enabling the Google Drive API

Now that the project is set up, we can enable the Google Drive API. In the search bar at the top of the page, search for “Google Drive API” and select the first result.

Enable the Google Drive API by clicking on ENABLE.

Creating a service account

Airflow will use a Google service account to authenticate against the Google Drive API. Service accounts are a special type of Google account that represent a non-human user.

From the menu go to APIs & Services and select Service Accounts.

Currently there are no service accounts in this project. Create one by clicking on create service account.

Let’s give the service account the same name as the project “airflow-to-drive”.

Grant the service account the Owner role for this project.

Click on Done. The service account is now visible.

Now that we have the service account, we create a JSON keyfile that contains its credentials. Airflow will use these credentials to authenticate against the Google Drive API. Go to the KEYS tab and click on Add key. Select Create new key and choose the JSON format.

If you are using the provided GitHub repo, save the JSON keyfile in the docker/ directory. Make sure you add its path to .gitignore so that you don’t accidentally push your keyfile to GitHub. Do not share this key with anyone!

.
├── dags
│   └── upload_to_drive_dag.py
├── docker
│   ├── airflow-to-drive-308020-f4e5a3186f4d.json
│   └── Dockerfile
├── docker-compose.yml
├── .git
├── .gitignore
├── logs
├── plugins
│   └── google_drive_operator.py
├── postgresql
│   └── data
├── README.md
├── requirements.txt
└── tmp

Enabling Domain-wide Delegation

Domain-wide Delegation allows a service account to perform actions on behalf of another user within the Google Workspace. This is important because files created or uploaded by a service account are not visible to other users. With Domain-wide Delegation enabled for our service account, we will be able to upload files on behalf of our own Google account so that the uploaded files are directly accessible.

Go to Service Accounts in the menu and click on the service account’s email. This opens a page that shows the unique ID of the service account. Take note of this number because we will need it when we configure our Google Workspace.

Expand the section under Domain-wide Delegation. Check the box that says Enable G Suite Domain-wide Delegation. We can enter “airflow.to.drive” as the product name for the consent screen. This name is not important because we won’t use this feature. Click on Save.

The GCP configuration is now complete!

2. Configuring Domain-wide Delegation on our Google Workspace

Next we have to authorize Domain-wide Delegation for the service account in our Google Workspace. Go to https://myaccount.google.com/ and click on Admin console in the top right corner.

Go to Security and then Settings.

On the bottom of the list, open the section API controls.

Find the section about Domain wide delegation and click on Manage domain wide delegation.

Currently, there are no entries. Click on Add new.

Fill in the service account’s unique Client ID we noted in the previous part and add the following scope: https://www.googleapis.com/auth/drive. This grants the service account with that particular client ID full access to Google Drive. Click on Authorize.

The service account is now authorized to upload files to Google Drive in the name of any user in that Google Workspace.

Google Workspace is now configured!
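
Before moving on, we can optionally sanity-check the whole setup outside of Airflow. The snippet below is a minimal sketch, not part of the article's repo: it uses the google-auth and google-api-python-client libraries (installed as dependencies of the provider package) to list a few Drive files on behalf of our own account. The keyfile path and the delegated email address are placeholders you need to adapt.

from google.oauth2 import service_account
from googleapiclient.discovery import build

SCOPES = ['https://www.googleapis.com/auth/drive']

# Placeholders: point to your own JSON keyfile and your own Workspace user.
credentials = service_account.Credentials.from_service_account_file(
    'docker/airflow-to-drive-keyfile.json', scopes=SCOPES
)
delegated_credentials = credentials.with_subject('you@yourdomain.com')

# Build a Drive v3 client and list a few files owned by the delegated user.
drive = build('drive', 'v3', credentials=delegated_credentials)
response = drive.files().list(pageSize=5, fields='files(id, name)').execute()
for f in response.get('files', []):
    print(f['id'], f['name'])

If this prints files from your personal Drive (or nothing on an empty Drive) without raising an authorization error, both the service account and the Domain-wide Delegation are configured correctly.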

3. Writing the code for our custom GoogleDriveOperator

In this part we will go over the Python code of the custom Airflow operator.

Like all Airflow operators, our GoogleDriveOperator inherits from the Airflow BaseOperator class. The upload itself is performed by the GoogleDriveHook provided by the apache-airflow-providers-google package. The operator accepts the path to the local file we want to upload, the target folder on Google Drive and some arguments required by the GoogleDriveHook. If the parameter delete is set to True, the local file is deleted after a successful upload. Place this file in the plugins/ folder so that it can be readily imported in the DAG module.

import os

from airflow.models.baseoperator import BaseOperator
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
from airflow.utils.decorators import apply_defaults
from typing import Optional, Union, Sequence


class GoogleDriveOperator(BaseOperator):
    """Upload a list of files to a Google Drive folder.
    This operator uploads a list of local files to a Google Drive folder.
    The local fils is deleted after upload (optional).
    Args:
        local_path: Python list of local file paths (templated)
        drive_folder: path of the Drive folder
        api_version: version of the Google Drive API
        gcp_conn_id: Airflow Connections ID for GCP
        delegate_to: Google Account to which to delegate the file upload
            requires Domain-Wide Delegation enabled
        impersonation_chain: service account to impersonate
        delete: should the local file be deleted after upload?
    """

    template_fields = ('local_path', )

    @apply_defaults
    def __init__(
        self,
        local_path: str,
        drive_folder: str,
        gcp_conn_id: str,
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        api_version: str = 'v3',
        delete: bool = True,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.local_path = local_path
        self.drive_folder = drive_folder
        self.api_version = api_version
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.impersonation_chain = impersonation_chain
        self.delete = delete

    def execute(self, context):
        hook = GoogleDriveHook(
            api_version=self.api_version,
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain
        )

        self.log.info(f'Uploading file: {self.local_path}')
        file_name = self.local_path.split('/')[-1]

        try:
            hook.upload_file(
                local_location=self.local_path,
                remote_location=os.path.join(self.drive_folder, file_name)
            )
        except FileNotFoundError:
            self.log.error(f"File {self.local_path} can't be found")
            # Re-raise so that the task fails instead of silently continuing.
            raise

        if self.delete:
            os.remove(self.local_path)
            self.log.info(f'Deleted local file: {self.local_path}')
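
Before wiring the operator into a DAG, we can smoke-test it in isolation. The pytest-style sketch below is a suggestion and not part of the article's repo; it assumes the plugins/ folder is on the Python path and mocks GoogleDriveHook so that no GCP credentials are needed.

from unittest import mock

from google_drive_operator import GoogleDriveOperator


def test_upload_and_delete(tmp_path):
    # Create a throwaway local file to "upload".
    local_file = tmp_path / 'report.txt'
    local_file.write_text('hello')

    task = GoogleDriveOperator(
        task_id='upload_report',
        local_path=str(local_file),
        drive_folder='google-drive-operator',
        gcp_conn_id='airflow-to-drive',
    )

    # Replace the real hook so no call to the Google Drive API is made.
    with mock.patch('google_drive_operator.GoogleDriveHook') as hook_cls:
        task.execute(context={})

    hook_cls.return_value.upload_file.assert_called_once()
    assert not local_file.exists()  # the local file was deleted after "upload"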

4. Testing a minimal DAG that uploads a text file to our Google Drive account

Now that everything is set up we can test the GoogleDriveOperator and upload some files to our personal Google Drive.

Let’s write a minimal daily DAG consisting of two tasks:

  1. Creating a text file with the current date in the tmp/ folder of the $AIRFLOW_HOME using the BashOperator
  2. Uploading this file to a specified folder on Google Drive using the GoogleDriveOperator

from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

from google_drive_operator import GoogleDriveOperator

dag = DAG(
    dag_id='upload_to_drive',
    description='Create spare parts analysis sheet in Google Drive',
    schedule_interval='@daily',
    start_date=datetime(2021, 2, 10),
    end_date=datetime(2021, 2, 13)
)

create_file = BashOperator(
    task_id='create_file',
    bash_command=(
        'echo file created on {{ ds }} > '
        '${AIRFLOW_HOME}/tmp/my_file_{{ ds }}.txt'
    ),
    dag=dag
)

upload_file = GoogleDriveOperator(
    task_id='upload_file',
    local_path='tmp/my_file_{{ ds }}.txt',
    drive_folder='google-drive-operator',
    gcp_conn_id='airflow-to-drive',
    delegate_to='denis@gontcharov.be',
    dag=dag
)

create_file >> upload_file

This DAG will be executed daily from 2021-02-10 until 2021-02-13. On each day a file such as my_file_2021-02-10.txt is created with the contents "file created on 2021-02-10". This file is then uploaded to the folder google-drive-operator in the root of our Google account's Drive. Note that the delegate_to parameter specifies the email address of the Google account on whose behalf the service account uploads the file. This Google account will be the owner of the uploaded file.

Creating the Airflow connection

There is one last thing left to do: we have to create the Airflow connection specified by the gcp_conn_id in the upload_file task. Fire up the Airflow webserver and go to the Admin tab. Open Connections and create a new connection with the following fields:

  • Conn Id: airflow-to-drive (the value passed to gcp_conn_id in the DAG)
  • Conn Type: Google Cloud
  • Keyfile Path: /opt/airflow/«name of your JSON keyfile».json (alternatively, we can paste the contents of this JSON keyfile into the Keyfile JSON field)
  • Project Id: airflow-to-drive
  • Scopes: https://www.googleapis.com/auth/drive
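
If you prefer to script this step instead of clicking through the UI, the connection can also be created programmatically from inside the Airflow environment. The sketch below is one possible approach, not taken from the article's repo; the keyfile path is a placeholder, and the extra__google_cloud_platform__* keys are the extras field names the Google provider expects in Airflow 2.0.x.

import json

from airflow.models.connection import Connection
from airflow.utils.session import create_session

conn = Connection(
    conn_id='airflow-to-drive',
    conn_type='google_cloud_platform',
    extra=json.dumps({
        # Placeholder: replace with the path to your own JSON keyfile.
        'extra__google_cloud_platform__key_path': '/opt/airflow/your-keyfile.json',
        'extra__google_cloud_platform__project': 'airflow-to-drive',
        'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/drive',
    }),
)

# Persist the connection in the Airflow metadata database.
with create_session() as session:
    session.add(conn)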

Running the DAG

Go back to the DAGs section and activate the DAG.

After execution, the text files can be seen in our Google Drive folder.

Note that thanks to Domain-wide Delegation, the Owner of the files is “me”. If google-drive-operator is a shared folder, anyone who is authorized can access these files.