Conditionally import cloud hooks #4

Open
wants to merge 2 commits into base: master
README.md: 37 additions & 15 deletions
@@ -1,5 +1,5 @@
 # Github Plugin
-This plugin moves data from the Github API to Google Cloud Storage based on the specified object.
+This plugin moves data from the Github API to S3 or Google Cloud Storage based on the specified object.

 ## Hooks
 ### GithubHook
@@ -10,18 +10,40 @@ Core Airflow S3Hook with the standard boto dependency.

 ## Operators
 ### GithubtoCloudStorageOperator
-This operator composes the logic for this plugin. It fetches the Github specified object and saves the result in GCS. The parameters it can accept include the following:
-```:param src: Path to the local file. (templated)
-:type src: str
-:param dst: Destination path within the specified bucket. (templated)
-:type dst: str
-:param bucket: The bucket to upload to. (templated)
-:type bucket: str
-:param google_cloud_storage_conn_id: The Airflow connection ID to upload with
-:type google_cloud_storage_conn_id: str
-:param mime_type: The mime-type string
-:type mime_type: str
-:param delegate_to: The account to impersonate, if any
-:type delegate_to: str
-:param gzip: Allows for file to be compressed and uploaded as gzip
+This operator composes the logic for this plugin. It fetches the specified Github object and saves the result in GCS or S3. The parameters it can accept include the following:
+```
+:param github_conn_id: The Github connection id.
+:type github_conn_id: string
+:param github_org: The Github organization.
+:type github_org: string
+:param github_repo: The Github repository. Required for
+    commits, commit_comments, issue_comments,
+    and issues objects.
+:type github_repo: string
+:param github_object: The desired Github object. The currently
+    supported values are:
+        - commits
+        - commit_comments
+        - issue_comments
+        - issues
+        - members
+        - organizations
+        - pull_requests
+        - repositories
+:type github_object: string
+:param payload: The associated github parameters to
+    pass into the object request as
+    keyword arguments.
+:type payload: dict
+:param destination: The final destination where the data
+    should be stored. Possible values include:
+        - GCS
+        - S3
+:type destination: string
+:param dest_conn_id: The destination connection id.
+:type dest_conn_id: string
+:param bucket: The bucket to be used to store the data.
+:type bucket: string
+:param key: The filename to be used to store the data.
+:type key: string
 ```
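For orientation, here is a minimal sketch of how the operator might be wired into a DAG once this change lands. It is not part of the PR's diff: the import path, DAG settings, connection ids, org, repo, bucket, and key are all illustrative assumptions.

```
# Hypothetical usage sketch (not from this PR). Assumes the plugin's operator
# is importable from the path shown and that the named connections exist.
from datetime import datetime

from airflow import DAG
from github_plugin.operators.github_to_cloud_storage_operator import (
    GithubtoCloudStorageOperator,  # class name taken from the README heading above
)

dag = DAG(
    dag_id='github_issues_to_s3',      # illustrative DAG id
    start_date=datetime(2018, 1, 1),
    schedule_interval='@daily',
)

pull_issues = GithubtoCloudStorageOperator(
    task_id='pull_github_issues',
    github_conn_id='github_default',   # Github API credentials
    github_org='my-org',               # placeholder organization
    github_repo='my-repo',             # required for the issues object
    github_object='issues',
    payload={'state': 'all'},          # forwarded as request keyword arguments
    destination='S3',                  # or 'GCS'
    dest_conn_id='aws_default',        # connection for the chosen destination
    bucket='my-data-bucket',
    key='github/issues.json',
    dag=dag,
)
```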
operators/github_to_cloud_storage_operator.py: 2 additions & 1 deletion
@@ -5,7 +5,6 @@

 from airflow.utils.decorators import apply_defaults
 from airflow.models import BaseOperator
-from airflow.hooks import S3Hook, GoogleCloudStorageHook

 from github_plugin.hooks.github_hook import GithubHook

@@ -112,6 +111,7 @@ def output_manager(self, output):
         output = '\n'.join([json.dumps(flatten(record)) for record in output])

         if self.destination.lower() == 's3':
+            from airflow.hooks import S3Hook
             s3 = S3Hook(self.dest_conn_id)

             s3.load_string(
@@ -124,6 +124,7 @@ def output_manager(self, output):
             s3.connection.close()

         elif self.destination.lower() == 'gcs':
+            from airflow.hooks import GoogleCloudStorageHook
             with NamedTemporaryFile('w') as tmp:
                 tmp.write(output)

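Taken together, the two hunks implement what the PR title promises: the module-level `from airflow.hooks import S3Hook, GoogleCloudStorageHook` is removed, and each hook is imported only inside the branch that uses it, so the operator module imports cleanly when just one cloud dependency (boto for S3, the Google Cloud client for GCS) is installed. Below is a simplified sketch of the resulting method; the import placement mirrors the diff, but the `load_string` and `upload` arguments and the temporary-file handling are paraphrased from context, not copied from it.

```
# Simplified sketch of output_manager after this PR (a method of the operator
# class). json, flatten, and NamedTemporaryFile are assumed to come from the
# module's existing top-level imports; the upload arguments are assumptions.
def output_manager(self, output):
    # Serialize records as newline-delimited JSON (shown as context in the diff).
    output = '\n'.join([json.dumps(flatten(record)) for record in output])

    if self.destination.lower() == 's3':
        # Deferred import: boto is only required when writing to S3.
        from airflow.hooks import S3Hook

        s3 = S3Hook(self.dest_conn_id)
        s3.load_string(
            string_data=output,
            key=self.key,
            bucket_name=self.bucket,
            replace=True,
        )
        s3.connection.close()

    elif self.destination.lower() == 'gcs':
        # Deferred import: the GCP client is only required when writing to GCS.
        from airflow.hooks import GoogleCloudStorageHook

        with NamedTemporaryFile('w') as tmp:
            tmp.write(output)
            tmp.flush()  # ensure data hits disk before upload
            gcs = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.dest_conn_id)
            gcs.upload(bucket=self.bucket, object=self.key, filename=tmp.name)
```

The trade-off of this pattern is that a missing dependency now surfaces as an ImportError at task run time rather than at DAG parse time, which is precisely what lets the plugin be used with only one of the two cloud SDKs installed.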