This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Airbyte based readers #428

Merged 15 commits on Aug 21, 2023
1 change: 1 addition & 0 deletions llama_hub/airbyte_cdk/.gitignore
@@ -0,0 +1 @@
test.py
42 changes: 42 additions & 0 deletions llama_hub/airbyte_cdk/README.md
@@ -0,0 +1,42 @@
# Airbyte CDK Loader

The Airbyte CDK Loader is a shim for sources created using the [Airbyte Python CDK](https://docs.airbyte.com/connector-development/cdk-python/). It allows you to load data from any Airbyte source into LlamaIndex.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install airbyte-cdk: `pip install airbyte-cdk`
* Install a source via git (or implement your own): `pip install git+https://github.com/airbytehq/airbyte.git@master#egg=source_github&subdirectory=airbyte-integrations/connectors/source-github`

## Usage

Here's an example usage of the AirbyteCDKReader.

```python
from llama_hub.airbyte_cdk.base import AirbyteCDKReader
from source_github.source import SourceGithub


github_config = {
    # ...
}
reader = AirbyteCDKReader(source_class=SourceGithub, config=github_config)
documents = reader.load_data(stream_name="issues")
```

By default, all fields are stored as metadata in the documents, and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader.
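One simple way to do this is a plain loop over the metadata fields. The sketch below uses stand-in documents rather than a live source, and the field names are made up for illustration:

```python
from types import SimpleNamespace

# Hypothetical documents as returned by the reader: empty text, with all
# record fields stored in `extra_info` (field names here are illustrative).
documents = [
    SimpleNamespace(text="", extra_info={"title": "Fix loader bug", "state": "open"}),
]

for doc in documents:
    # Concatenate the metadata fields into a simple text representation.
    doc.text = "\n".join(f"{key}: {value}" for key, value in doc.extra_info.items())
```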

## Incremental loads

If a stream supports it, this loader can be used to load data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteCDKReader(source_class=SourceGithub, config=github_config)
documents = reader.load_data(stream_name="issues")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="issues", state=current_state) # only loads documents that were updated since last time
```
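Since the state object needs to survive between runs, one option is to persist it with `pickle`. The sketch below uses a stand-in dict; the real shape of `reader.last_state` depends on the source:

```python
import os
import pickle
import tempfile

# Stand-in for `reader.last_state`; the real object depends on the source.
current_state = {"issues": {"updated_at": "2023-08-01T00:00:00Z"}}

# Write the state to disk after a load...
path = os.path.join(tempfile.gettempdir(), "airbyte_reader_state.pkl")
with open(path, "wb") as f:
    pickle.dump(current_state, f)

# ...and read it back before the next incremental load.
with open(path, "rb") as f:
    restored_state = pickle.load(f)
```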

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
33 changes: 33 additions & 0 deletions llama_hub/airbyte_cdk/base.py
@@ -0,0 +1,33 @@
from typing import Any, List, Mapping, Optional

from llama_index.readers.base import BaseReader
from llama_index.readers.schema.base import Document
from airbyte_protocol.models.airbyte_protocol import AirbyteRecordMessage
Collaborator:
so we should figure out a better test system on our side, but currently tests are failing because we assume that third-party imports are lazy imports.

i know specifically for AirbyteRecordMessage it's used in the RecordHandler type used to type record_handler. For this do you think we could type record_handler as Any first, and then within __init__ lazy import AirbyteRecordMessage, import RecordHandler, and do an isinstance() check on record_handler?

A bit hacky but will at least allow tests to pass. thanks
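The suggestion above might look roughly like this sketch (the names mirror the discussion and are not the final implementation):

```python
from typing import Any, Optional


class LazilyTypedReader:
    """Sketch: accept `record_handler` as Any and validate it lazily."""

    def __init__(self, record_handler: Optional[Any] = None) -> None:
        if record_handler is not None:
            # The third-party import only runs when a handler is passed,
            # so importing this module stays dependency-free.
            from airbyte_protocol.models.airbyte_protocol import (  # noqa: F401
                AirbyteRecordMessage,
            )

            if not callable(record_handler):
                raise TypeError("record_handler must be callable")
        self._record_handler = record_handler
```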

from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration
from airbyte_cdk.sources.embedded.runner import CDKRunner


class AirbyteCDKReader(BaseReader, BaseEmbeddedIntegration):
Collaborator:
would prefer composition vs. multiple inheritance, easier to inspect the specific object (BaseEmbeddedIntegration) you're using
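The composition alternative being suggested could look like this sketch, with a fake integration standing in for `BaseEmbeddedIntegration`:

```python
from typing import Any, Iterator, List


class FakeIntegration:
    """Stand-in for BaseEmbeddedIntegration, just for illustration."""

    def _load_data(self, stream_name: str) -> Iterator[dict]:
        yield {"stream": stream_name}


class ComposedReader:
    def __init__(self, integration: Any) -> None:
        # Holding the integration as an attribute keeps it easy to
        # inspect or swap out, compared with multiple inheritance.
        self._integration = integration

    def load_data(self, stream_name: str) -> List[dict]:
        return list(self._integration._load_data(stream_name))
```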

"""AirbyteCDKReader reader.

Retrieve documents from an Airbyte source implemented using the CDK.

Args:
source_class: The Airbyte source class.
config: The config object for the Airbyte source.
"""

def __init__(
self,
source_class: Any,
config: Mapping[str, Any],
) -> None:
"""Initialize with parameters."""

super().__init__(config=config, runner=CDKRunner(source=source_class(), name=source_class.__name__))

    def _handle_record(
        self, record: AirbyteRecordMessage, id: Optional[str]
    ) -> Document:
        return Document(doc_id=id, text="", extra_info=record.data)

    def load_data(self, *args: Any, **load_kwargs: Any) -> List[Document]:
        return list(self._load_data(*args, **load_kwargs))


oof, guessing there's no iterator method available for llamaindex?

Could we implement one anyway / is there any value in doing so?

Contributor Author:

Good question, what do you think, @jerryjliu ? Probably something we can split out of the first PR though

Collaborator:

by default every loader implements load_data but you're free to define custom methods too!

Contributor Author:

OK cool, let's add a lazy_load as well that's returning an iterable.

Collaborator:

out of curiosity where is _load_data defined?

Contributor Author:

it's on the base integration class that's coming from the CDK. We chose this approach so we can roll out improvements without having to update the loader itself

Collaborator:

i see, makes sense
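The `lazy_load` idea from this thread could be sketched like so, with a toy `_load_data` generator standing in for the one provided by the CDK base integration:

```python
from typing import Any, Iterator, List


class SketchReader:
    def _load_data(self, **kwargs: Any) -> Iterator[str]:
        # Stand-in for the generator provided by the CDK base integration.
        yield from ("doc-1", "doc-2")

    def lazy_load(self, **kwargs: Any) -> Iterator[str]:
        # Expose the underlying generator so callers can stream documents.
        return self._load_data(**kwargs)

    def load_data(self, **kwargs: Any) -> List[str]:
        # Eager variant, kept for the standard loader interface.
        return list(self.lazy_load(**kwargs))
```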

2 changes: 2 additions & 0 deletions llama_hub/airbyte_cdk/requirements.txt
@@ -0,0 +1,2 @@
airbyte-cdk
airbyte-protocol-models
1 change: 1 addition & 0 deletions llama_hub/airbyte_hubspot/.gitignore
@@ -0,0 +1 @@
test.py
54 changes: 54 additions & 0 deletions llama_hub/airbyte_hubspot/README.md
@@ -0,0 +1,54 @@
# Airbyte Hubspot Loader

The Airbyte Hubspot Loader allows you to access different Hubspot objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the hubspot source: `pip install source_hubspot`

## Usage

Here's an example usage of the AirbyteHubspotReader.

```python
from llama_hub.airbyte_hubspot.base import AirbyteHubspotReader

hubspot_config = {
    # ...
}
reader = AirbyteHubspotReader(config=hubspot_config)
documents = reader.load_data(stream_name="products")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/hubspot/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on GitHub: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml).

The general shape looks like this:
```python
{
    "start_date": "<date from which to start retrieving records, in ISO format, e.g. 2020-10-20T00:00:00Z>",
    "credentials": {
        "credentials_title": "Private App Credentials",
        "access_token": "<access token of your private app>"
    }
}
```

By default, all fields are stored as metadata in the documents, and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader.

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteHubspotReader(config=hubspot_config)
documents = reader.load_data(stream_name="products")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="products", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
21 changes: 21 additions & 0 deletions llama_hub/airbyte_hubspot/base.py
@@ -0,0 +1,21 @@
from typing import Any, Mapping
from llama_hub.airbyte_cdk.base import AirbyteCDKReader


class AirbyteHubspotReader(AirbyteCDKReader):
"""AirbyteHubspotReader reader.

Retrieve documents from Hubspot

Args:
config: The config object for the hubspot source.
"""

def __init__(
self,
config: Mapping[str, Any],
) -> None:
"""Initialize with parameters."""
import source_hubspot

super().__init__(source_class=source_hubspot.SourceHubspot, config=config)
1 change: 1 addition & 0 deletions llama_hub/airbyte_hubspot/requirements.txt
@@ -0,0 +1 @@
source_hubspot
1 change: 1 addition & 0 deletions llama_hub/airbyte_salesforce/.gitignore
@@ -0,0 +1 @@
test.py
59 changes: 59 additions & 0 deletions llama_hub/airbyte_salesforce/README.md
@@ -0,0 +1,59 @@
# Airbyte Salesforce Loader

The Airbyte Salesforce Loader allows you to access different Salesforce objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the salesforce source: `pip install source_salesforce`

## Usage

Here's an example usage of the AirbyteSalesforceReader.

```python
from llama_hub.airbyte_salesforce.base import AirbyteSalesforceReader

salesforce_config = {
    # ...
}
reader = AirbyteSalesforceReader(config=salesforce_config)
documents = reader.load_data(stream_name="asset")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/salesforce/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on GitHub: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml).

The general shape looks like this:
```python
{
    "client_id": "<oauth client id>",
    "client_secret": "<oauth client secret>",
    "refresh_token": "<oauth refresh token>",
    "start_date": "<date from which to start retrieving records, in ISO format, e.g. 2020-10-20T00:00:00Z>",
    "is_sandbox": False,  # set to True if you're using a sandbox environment
    "streams_criteria": [  # array of filters for Salesforce objects that should be loadable
        {"criteria": "exacts", "value": "Account"},  # exact name of the Salesforce object
        {"criteria": "starts with", "value": "Asset"},  # prefix of the name
        # Other allowed criteria: "ends with", "contains", "starts not with",
        # "ends not with", "not contains", "not exacts"
    ],
}
```

By default, all fields are stored as metadata in the documents, and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader.


Could we add an example for how we expect people to do this?

Contributor Author:

I honestly thought of a simple for loop, added an example for that. I couldn't find a concept within llama index that would abstract this away, @jerryjliu what do you think about this?


## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteSalesforceReader(config=salesforce_config)
documents = reader.load_data(stream_name="asset")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="asset", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
21 changes: 21 additions & 0 deletions llama_hub/airbyte_salesforce/base.py
@@ -0,0 +1,21 @@
from typing import Any, Mapping
from llama_hub.airbyte_cdk.base import AirbyteCDKReader


class AirbyteSalesforceReader(AirbyteCDKReader):
"""AirbyteSalesforceReader reader.

Retrieve documents from Salesforce

Args:
config: The config object for the salesforce source.
"""

def __init__(
self,
config: Mapping[str, Any],
) -> None:
"""Initialize with parameters."""
import source_salesforce

super().__init__(source_class=source_salesforce.SourceSalesforce, config=config)
1 change: 1 addition & 0 deletions llama_hub/airbyte_salesforce/requirements.txt
@@ -0,0 +1 @@
airbyte-source-salesforce
1 change: 1 addition & 0 deletions llama_hub/airbyte_shopify/.gitignore
@@ -0,0 +1 @@
test.py
55 changes: 55 additions & 0 deletions llama_hub/airbyte_shopify/README.md
@@ -0,0 +1,55 @@
# Airbyte Shopify Loader

The Airbyte Shopify Loader allows you to access different Shopify objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the shopify source: `pip install source_shopify`

## Usage

Here's an example usage of the AirbyteShopifyReader.

```python
from llama_hub.airbyte_shopify.base import AirbyteShopifyReader

shopify_config = {
    # ...
}
reader = AirbyteShopifyReader(config=shopify_config)
documents = reader.load_data(stream_name="orders")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/shopify/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on GitHub: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-shopify/source_shopify/spec.json](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-shopify/source_shopify/spec.json).

The general shape looks like this:
```python
{
    "start_date": "<date from which to start retrieving records, in ISO format, e.g. 2020-10-20T00:00:00Z>",
    "shop": "<name of the shop you want to retrieve documents from>",
    "credentials": {
        "auth_method": "api_password",
        "api_password": "<your api password>"
    }
}
```

By default, all fields are stored as metadata in the documents, and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader.

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteShopifyReader(config=shopify_config)
documents = reader.load_data(stream_name="orders")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="orders", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
21 changes: 21 additions & 0 deletions llama_hub/airbyte_shopify/base.py
@@ -0,0 +1,21 @@
from typing import Any, Mapping
from llama_hub.airbyte_cdk.base import AirbyteCDKReader


class AirbyteShopifyReader(AirbyteCDKReader):
"""AirbyteShopifyReader reader.

Retrieve documents from Shopify

Args:
config: The config object for the shopify source.
"""

def __init__(
self,
config: Mapping[str, Any],
) -> None:
"""Initialize with parameters."""
import source_shopify

super().__init__(source_class=source_shopify.SourceShopify, config=config)
1 change: 1 addition & 0 deletions llama_hub/airbyte_shopify/requirements.txt
@@ -0,0 +1 @@
airbyte-source-shopify
1 change: 1 addition & 0 deletions llama_hub/airbyte_stripe/.gitignore
@@ -0,0 +1 @@
test.py
53 changes: 53 additions & 0 deletions llama_hub/airbyte_stripe/README.md
@@ -0,0 +1,53 @@
# Airbyte Stripe Loader

The Airbyte Stripe Loader allows you to access different Stripe objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the stripe source: `pip install source_stripe`

## Usage

Here's an example usage of the AirbyteStripeReader.

```python
from llama_hub.airbyte_stripe.base import AirbyteStripeReader

stripe_config = {
    # ...
}
reader = AirbyteStripeReader(config=stripe_config)
documents = reader.load_data(stream_name="invoices")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/stripe/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on GitHub: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/spec.yaml).

The general shape looks like this:
```python
{
    "client_secret": "<secret key>",
    "account_id": "<account id>",
    "start_date": "<date from which to start retrieving records, in ISO format, e.g. 2020-10-20T00:00:00Z>",
}
```

By default, all fields are stored as metadata in the documents, and the text is set to an empty string. Construct the text of the document by transforming the documents returned by the reader.

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python
reader = AirbyteStripeReader(config=stripe_config)
documents = reader.load_data(stream_name="invoices")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="invoices", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.