Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Airbyte based readers #428

Merged
merged 15 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions llama_hub/airbyte_cdk/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test.py
54 changes: 54 additions & 0 deletions llama_hub/airbyte_cdk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Airbyte CDK Loader

The Airbyte CDK Loader is a shim for sources created using the [Airbyte Python CDK](https://docs.airbyte.com/connector-development/cdk-python/). It allows you to load data from any Airbyte source into LlamaIndex.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install airbyte-cdk: `pip install airbyte-cdk`
* Install a source via git (or implement your own): `pip install git+https://github.com/airbytehq/airbyte.git@master#egg=source_github&subdirectory=airbyte-integrations/connectors/source-github`
flash1293 marked this conversation as resolved.
Show resolved Hide resolved

## Usage

Implement and import your own source. You can find lots of resources for how to achieve this on the [Airbyte documentation page](https://docs.airbyte.com/connector-development/).

Here's an example usage of the AirbyteCdkReader.

```python
from llama_index import download_loader
from llama_hub.airbyte_cdk.base import AirbyteCDKReader
from source_github.source import SourceGithub # this is just an example, you can use any source here - this one is loaded from the Airbyte Github repo via pip install git+https://github.com/airbytehq/airbyte.git@master#egg=source_github&subdirectory=airbyte-integrations/connectors/source-github`


github_config = {
# ...
}
reader = AirbyteCDKReader(source_class=SourceGithub,config=github_config)
documents = reader.load_data(stream_name="issues")
```

By default all fields are stored as metadata in the documents and the text is set to the JSON representation of all the fields. Construct the text of the document by passing a `record_handler` to the reader:
```python
def handle_record(record, id):
return Document(doc_id=id, text=record.data["title"], extra_info=record.data)

reader = AirbyteCDKReader(source_class=SourceGithub,config=github_config, record_handler=handle_record)
```

## Lazy loads

The `reader.load_data` endpoint will collect all documents and return them as a list. If there are a large number of documents, this can cause issues. By using `reader.lazy_load_data` instead, an iterator is returned which can be consumed document by document without the need to keep all documents in memory.

## Incremental loads

If a stream supports it, this loader can be used to load data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python

reader = AirbyteCDKReader(source_class=SourceGithub,config=github_config)
documents = reader.load_data(stream_name="issues")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="issues", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
54 changes: 54 additions & 0 deletions llama_hub/airbyte_cdk/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import json
from typing import Any, Callable, Iterator, List, Mapping, Optional

from llama_index.readers.base import BaseReader
from llama_index.readers.schema.base import Document
from airbyte_protocol.models.airbyte_protocol import AirbyteRecordMessage
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we should figure out a better test system on our side, but currently tests are failing because we assume that third-party imports are lazy imports.

i know specifically for AirbyteRecordMessage it's used in the RecordHandler type used to type record_handler. For this do you think we could type record_handler as Any first, and then within __init__ lazy import AirbyteRecordMessage, import RecordHandler, and do an isinstance() check on record_handler?

A bit hacky but will at least allow tests to pass. thanks

from airbyte_cdk.sources.embedded.base_integration import BaseEmbeddedIntegration
from airbyte_cdk.sources.embedded.runner import CDKRunner

RecordHandler = Callable[[AirbyteRecordMessage, Optional[str]], Document]


class AirbyteCDKReader(BaseReader):
"""AirbyteCDKReader reader.

Retrieve documents from an Airbyte source implemented using the CDK.

Args:
source_class: The Airbyte source class.
config: The config object for the Airbyte source.
"""

def __init__(
self,
source_class: Any,
config: Mapping[str, Any],
record_handler: Optional[RecordHandler] = None,
) -> None:
"""Initialize with parameters."""

class CDKIntegration(BaseEmbeddedIntegration):
def _handle_record(
self, record: AirbyteRecordMessage, id: Optional[str]
) -> Document:
if record_handler:
return record_handler(record, id)
return Document(
doc_id=id, text=json.dumps(record.data), extra_info=record.data
)

self._integration = CDKIntegration(
config=config,
runner=CDKRunner(source=source_class(), name=source_class.__name__),
)

def load_data(self, *args: Any, **kwargs: Any) -> List[Document]:
return list(self.lazy_load_data(*args, **kwargs))

def lazy_load_data(self, *args: Any, **kwargs: Any) -> Iterator[Document]:
return self._integration._load_data(*args, **kwargs)

@property
def last_state(self):
return self._integration.last_state
2 changes: 2 additions & 0 deletions llama_hub/airbyte_cdk/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
airbyte-cdk
airbyte-protocol-models
62 changes: 62 additions & 0 deletions llama_hub/airbyte_gong/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Airbyte Gong Loader

The Airbyte Gong Loader allows you to access different Gong objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the gong source: `pip install airbyte-source-gong`

## Usage

Here's an example usage of the AirbyteGongReader.

```python
from llama_hub.airbyte_gong.base import AirbyteGongReader

gong_config = {
# ...
}
reader = AirbyteGongReader(config=gong_config)
documents = reader.load_data(stream_name="calls")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/gong/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on Github: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-gong/source_gong/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-gong/source_gong/spec.yaml).

The general shape looks like this:
```python
{
"access_key": "<access key name>",
"access_key_secret": "<access key secret>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}
```

By default all fields are stored as metadata in the documents and the text is set to the JSON representation of all the fields. Construct the text of the document by passing a `record_handler` to the reader:
```python
def handle_record(record, id):
return Document(doc_id=id, text=record.data["title"], extra_info=record.data)

reader = AirbyteGongReader(config=gong_config, record_handler=handle_record)
```

## Lazy loads

The `reader.load_data` endpoint will collect all documents and return them as a list. If there are a large number of documents, this can cause issues. By using `reader.lazy_load_data` instead, an iterator is returned which can be consumed document by document without the need to keep all documents in memory.

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python

reader = AirbyteGongReader(config={...})
documents = reader.load_data(stream_name="calls")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="calls", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
22 changes: 22 additions & 0 deletions llama_hub/airbyte_gong/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Any, Mapping, Optional
from llama_hub.airbyte_cdk.base import AirbyteCDKReader, RecordHandler


class AirbyteGongReader(AirbyteCDKReader):
"""AirbyteGongReader reader.

Retrieve documents from Gong

Args:
config: The config object for the gong source.
"""

def __init__(
self,
config: Mapping[str, Any],
record_handler: Optional[RecordHandler] = None,
) -> None:
"""Initialize with parameters."""
import source_gong

super().__init__(source_class=source_gong.SourceGong, config=config, record_handler=record_handler)
1 change: 1 addition & 0 deletions llama_hub/airbyte_gong/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
airbyte-source-gong
64 changes: 64 additions & 0 deletions llama_hub/airbyte_hubspot/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Airbyte Hubspot Loader

The Airbyte Hubspot Loader allows you to access different Hubspot objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the hubspot source: `pip install airbyte-source-hubspot`

## Usage

Here's an example usage of the AirbyteHubspotReader.

```python
from llama_hub.airbyte_hubspot.base import AirbyteHubspotReader

hubspot_config = {
# ...
}
reader = AirbyteHubspotReader(config=hubspot_config)
documents = reader.load_data(stream_name="products")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/hubspot/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on Github: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml).

The general shape looks like this:
```python
{
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
"credentials": {
"credentials_title": "Private App Credentials",
"access_token": "<access token of your private app>"
}
}
```

By default all fields are stored as metadata in the documents and the text is set to the JSON representation of all the fields. Construct the text of the document by passing a `record_handler` to the reader:
```python
def handle_record(record, id):
return Document(doc_id=id, text=record.data["title"], extra_info=record.data)

reader = AirbyteHubspotReader(config=hubspot_config, record_handler=handle_record)
```

## Lazy loads

The `reader.load_data` endpoint will collect all documents and return them as a list. If there are a large number of documents, this can cause issues. By using `reader.lazy_load_data` instead, an iterator is returned which can be consumed document by document without the need to keep all documents in memory.

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python

reader = AirbyteHubspotReader(config={...})
documents = reader.load_data(stream_name="products")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="products", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
22 changes: 22 additions & 0 deletions llama_hub/airbyte_hubspot/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Any, Mapping, Optional
from llama_hub.airbyte_cdk.base import AirbyteCDKReader, RecordHandler


class AirbyteHubspotReader(AirbyteCDKReader):
"""AirbyteHubspotReader reader.

Retrieve documents from Hubspot

Args:
config: The config object for the hubspot source.
"""

def __init__(
self,
config: Mapping[str, Any],
record_handler: Optional[RecordHandler] = None,
) -> None:
"""Initialize with parameters."""
import source_hubspot

super().__init__(source_class=source_hubspot.SourceHubspot, config=config, record_handler=record_handler)
1 change: 1 addition & 0 deletions llama_hub/airbyte_hubspot/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
source_hubspot
69 changes: 69 additions & 0 deletions llama_hub/airbyte_salesforce/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Airbyte Salesforce Loader

The Airbyte Salesforce Loader allows you to access different Salesforce objects.

## Installation

* Install llama_hub: `pip install llama_hub`
* Install the salesforce source: `pip install airbyte-source-salesforce`

## Usage

Here's an example usage of the AirbyteSalesforceReader.

```python
from llama_hub.airbyte_salesforce.base import AirbyteSalesforceReader

salesforce_config = {
# ...
}
reader = AirbyteSalesforceReader(config=salesforce_config)
documents = reader.load_data(stream_name="asset")
```

## Configuration

Check out the [Airbyte documentation page](https://docs.airbyte.com/integrations/sources/salesforce/) for details about how to configure the reader.
The JSON schema the config object should adhere to can be found on Github: [https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml).

The general shape looks like this:
```python
{
"client_id": "<oauth client id>",
"client_secret": "<oauth client secret>",
"refresh_token": "<oauth refresh token>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
"is_sandbox": False, # set to True if you're using a sandbox environment
"streams_criteria": [ # Array of filters for salesforce objects that should be loadable
{"criteria": "exacts", "value": "Account"}, # Exact name of salesforce object
{"criteria": "starts with", "value": "Asset"}, # Prefix of the name
# Other allowed criteria: ends with, contains, starts not with, ends not with, not contains, not exacts
],
}
```

By default all fields are stored as metadata in the documents and the text is set to the JSON representation of all the fields. Construct the text of the document by passing a `record_handler` to the reader:
```python
def handle_record(record, id):
return Document(doc_id=id, text=record.data["title"], extra_info=record.data)

reader = AirbyteSalesforceReader(config=salesforce_config, record_handler=handle_record)
```

## Lazy loads

The `reader.load_data` endpoint will collect all documents and return them as a list. If there are a large number of documents, this can cause issues. By using `reader.lazy_load_data` instead, an iterator is returned which can be consumed document by document without the need to keep all documents in memory.

## Incremental loads

This loader supports loading data incrementally (only returning documents that weren't loaded last time or got updated in the meantime):
```python

reader = AirbyteSalesforceReader(config={...})
documents = reader.load_data(stream_name="asset")
current_state = reader.last_state # can be pickled away or stored otherwise

updated_documents = reader.load_data(stream_name="asset", state=current_state) # only loads documents that were updated since last time
```

This loader is designed to be used as a way to load data into [LlamaIndex](https://github.com/jerryjliu/gpt_index/tree/main/gpt_index) and/or subsequently used as a Tool in a [LangChain](https://github.com/hwchase17/langchain) Agent. See [here](https://github.com/emptycrown/llama-hub/tree/main) for examples.
Empty file.
22 changes: 22 additions & 0 deletions llama_hub/airbyte_salesforce/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Any, Mapping, Optional
from llama_hub.airbyte_cdk.base import AirbyteCDKReader, RecordHandler


class AirbyteSalesforceReader(AirbyteCDKReader):
"""AirbyteSalesforceReader reader.

Retrieve documents from Salesforce

Args:
config: The config object for the salesforce source.
"""

def __init__(
self,
config: Mapping[str, Any],
record_handler: Optional[RecordHandler] = None,
) -> None:
"""Initialize with parameters."""
import source_salesforce

super().__init__(source_class=source_salesforce.SourceSalesforce, config=config, record_handler=record_handler)
1 change: 1 addition & 0 deletions llama_hub/airbyte_salesforce/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
airbyte-source-salesforce
Loading
Loading