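Airbyte, launched in July 2020, is one of the newest open-source ETL tools. It differs from other ETL tools in that it provides out-of-the-box connectors through both a UI and an API, and it allows community developers to monitor and maintain the tool. Connectors run as Docker containers and can be built in the language of your choice. By offering modular components and optional feature subsets, Airbyte provides greater flexibility.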

Quick start

Deploy Airbyte locally

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up

Once it has started, open http://localhost:8000

Add a source

image-20220619122412393

Add a destination

First, start a local Postgres database with docker-compose

  1. mkdir ./data

  2. Create docker-compose.yaml:

    version: "3"
    services:
      postgres:
        image: postgres:14
        container_name: postgres
        restart: always
        environment:
          POSTGRES_DB: postgres
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: 123456
        ports:
          - "5432:5432"
        volumes:
          - ./data:/var/lib/postgresql/data
    
  3. Start it with docker-compose up -d
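Before configuring the destination in Airbyte, it can help to confirm that the database port is reachable. The sketch below is not part of the original setup; it uses only the Python standard library, and the helper name `is_port_open` is hypothetical. It assumes the container publishes port 5432 on localhost, as in the compose file above.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling `is_port_open("localhost", 5432)` after `docker-compose up -d` should return True once Postgres is accepting connections. A successful TCP connection only shows the port is open, not that the credentials are valid.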

Then configure the destination in Airbyte

image-20220619134741698

Create a connection

image-20220619134925755

image-20220619134942144

image-20220619135033394

Trigger a sync

image-20220619135226032

View the results

image-20220619135522388

Develop a custom API source with the Airbyte CDK

Generate the source template

cd airbyte-integrations/connector-templates/generator # assumes you are starting from the root of the Airbyte project.
# Install NPM from https://www.npmjs.com/get-npm if you don't have it
./generate.sh

Choose Python HTTP API Source and enter the source name; this tutorial uses solar

image-20220619140718389

Install the Python dependencies

cd ../../connectors/source-solar
python -m venv .venv # Create a virtual environment in the .venv directory
source .venv/bin/activate # enable the venv
pip install -r requirements.txt

Run the sample

python main.py spec

Output like the following means the project was initialized successfully

{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Solar Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO": {"type": "string", "description": "describe me"}}}}}

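Define the input parameters

The simplest way to do this is to create a spec.yaml file at source_<name>/spec.yaml that describes the connector's inputs according to the ConnectorSpecification schema. This is a good starting point when developing a source. Using JsonSchema, define what the inputs are (for example, a username and password).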

vi ./source_solar/spec.yaml
documentationUrl: https://api.le-systeme-solaire.net/swagger/#/
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Solar System openData
  type: object
  required:
    - page_size
  additionalProperties: false
  properties:
    page_size:
      type: integer
      title: Page size
      description: Number of items per page
      default: 10
      minimum: 1
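
To make the rules in this spec concrete, here is a minimal hand-written sketch of the checks it implies for a config such as {"page_size": 10}. This is only an illustration: the function name `validate_config` is hypothetical, and in practice the CDK validates the config against the JsonSchema itself.

```python
from typing import Any, List, Mapping


def validate_config(config: Mapping[str, Any]) -> List[str]:
    """Check a config dict against the rules in spec.yaml; return error messages."""
    errors = []
    if "page_size" not in config:
        errors.append("page_size is required")
    else:
        value = config["page_size"]
        # bool is a subclass of int in Python, so exclude it explicitly.
        if not isinstance(value, int) or isinstance(value, bool):
            errors.append("page_size must be an integer")
        elif value < 1:
            errors.append("page_size must be >= 1")
    # additionalProperties: false means no other keys are allowed.
    extra = sorted(set(config) - {"page_size"})
    if extra:
        errors.append(f"unexpected properties: {extra}")
    return errors
```

An empty list means the config satisfies the spec; for example, `validate_config({"page_size": 0})` reports that the value is below the minimum.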

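Connection health check

This operation verifies that the configuration supplied by the user can be used to connect to the underlying data source. Note that this user-supplied configuration contains the values described in spec.yaml. To make requests to an API, we would normally need to specify access credentials. In our case the check is fairly simple, because the API does not require any. Edit ./source_solar/source.py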

class SolarStream(HttpStream, ABC):
    """
    TODO remove this comment

    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
    parsing responses etc..

    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.

    Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
    contains the endpoints
        - GET v1/customers
        - GET v1/employees

    then you should have three classes:
    class SolarStream(HttpStream, ABC) which is the current class
    class Customers(SolarStream) contains behavior to pull data for customers using v1/customers
    class Employees(SolarStream) contains behavior to pull data for employees using v1/employees

    If some streams implement incremental sync, it is typical to create another class
    class IncrementalSolarStream((SolarStream), ABC) then have concrete stream implementations extend it. An example
    is provided below.

    See the reference docs for the full list of configurable options.
    """

    # TODO: Fill in the url base. Required.
    url_base = "https://api.le-systeme-solaire.net/rest/"
    
...
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
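        try:
            # A timeout keeps the check from hanging if the API is unreachable.
            resp = requests.get(SolarStream.url_base, timeout=30)
        except requests.RequestException as e:
            return False, repr(e)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        return False, resp.text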
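
The contract of check_connection, (True, None) on success and (False, error) otherwise, can be exercised without any network access. The sketch below is an illustration under assumptions, not the connector's code: `check_api` and its `fetch` parameter are hypothetical names, and `fetch` stands in for the HTTP call by returning a (status code, body) pair.

```python
from typing import Callable, Optional, Tuple


def check_api(fetch: Callable[[], Tuple[int, str]]) -> Tuple[bool, Optional[str]]:
    """Mirror check_connection's contract: (True, None) on success, (False, error) otherwise."""
    try:
        status, body = fetch()
    except Exception as exc:  # e.g. a DNS failure or timeout raised by the HTTP client
        return False, repr(exc)
    if status == 200:
        return True, None
    # Any non-200 response is reported with its body as the error message.
    return False, body
```

Passing the HTTP call in as a function keeps the decision logic testable on its own; for example, `check_api(lambda: (404, "not found"))` returns `(False, "not found")`.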

Create the input file and run the check

mkdir -p sample_files/secrets
echo '{"page_size": 10}' > ./sample_files/secrets/config.json
python main.py check --config ./sample_files/secrets/config.json

Output like the following means the connection check succeeded

{"type": "LOG", "log": {"level": "INFO", "message": "Ping response code: 200"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}
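
Each line the connector prints is a standalone JSON message with a type field, so the result of the check can be extracted programmatically. The following is a small sketch using only the standard library; the helper name `connection_status` is hypothetical, and the sample lines are taken from the output above.

```python
import json
from typing import Iterable, Optional


def connection_status(lines: Iterable[str]) -> Optional[str]:
    """Return the status of the first CONNECTION_STATUS message, or None."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between messages
        message = json.loads(line)
        if message.get("type") == "CONNECTION_STATUS":
            return message["connectionStatus"]["status"]
    return None


output = [
    '{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}}',
    '{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}}',
]
print(connection_status(output))  # prints SUCCEEDED
```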

Declare the stream

Declaring a stream for an API endpoint takes only the following steps:

  1. Add an HttpStream class in ./source_solar/source.py
class SolarStream(HttpStream, ABC):
    """
    TODO remove this comment

    This class represents a stream output by the connector.
    This is an abstract base class meant to contain all the common functionality at the API level e.g: the API base URL, pagination strategy,
    parsing responses etc..

    Each stream should extend this class (or another abstract subclass of it) to specify behavior unique to that stream.

    Typically for REST APIs each stream corresponds to a resource in the API. For example if the API
    contains the endpoints
        - GET v1/customers
        - GET v1/employees

    then you should have three classes:
    class SolarStream(HttpStream, ABC) which is the current class
    class Customers(SolarStream) contains behavior to pull data for customers using v1/customers
    class Employees(SolarStream) contains behavior to pull data for employees using v1/employees

    If some streams implement incremental sync, it is typical to create another class
    class IncrementalSolarStream((SolarStream), ABC) then have concrete stream implementations extend it. An example
    is provided below.

    See the reference docs for the full list of configurable options.
    """

    # TODO: Fill in the url base. Required.
    url_base = "https://api.le-systeme-solaire.net/rest/"
    page = 1

    def __init__(self, config: Mapping[str, Any], **kwargs):
        super().__init__(**kwargs)
        self.page_size = config["page_size"]


    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        TODO: Override this method to define a pagination strategy. If you will not be using pagination, no action is required - just return None.

        This method should return a Mapping (e.g: dict) containing whatever information required to make paginated requests. This dict is passed
        to most other methods in this class to help you form headers, request bodies, query params, etc..

        For example, if the API accepts a 'page' parameter to determine which page of the result to return, and a response from the API contains a
        'page' number, then this method should probably return a dict {'page': response.json()['page'] + 1} to increment the page count by 1.
        The request_params method should then read the input next_page_token and set the 'page' param to next_page_token['page'].

        :param response: the most recent response from the API
        :return If there is another page in the result, a mapping (e.g: dict) containing information needed to query the next page in the response.
                If there are no more pages in the result, return None.
        """
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
        Usually contains common params e.g. pagination size etc.
        """
        return {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        yield response.json()


class Bodies(SolarStream):

    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return "bodies"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        response_data = response.json()
        if len(response_data["bodies"]) == self.page_size:
            self.page += 1
            return {"page": f"{self.page}, {self.page_size}"}

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """
        TODO: Override this method to define any query parameters to be set. Remove this method if you don't need to define request params.
        Usually contains common params e.g. pagination size etc.
        """
        params = {
            "order": "id,desc",
            "page": f"{self.page}, {self.page_size}"
        }
        if next_page_token:
            params.update(next_page_token)
        return params
  2. Instantiate the class
from airbyte_cdk.sources.streams.http.auth import NoAuth

class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
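        try:
            # A timeout keeps the check from hanging if the API is unreachable.
            resp = requests.get(SolarStream.url_base, timeout=30)
        except requests.RequestException as e:
            return False, repr(e)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        return False, resp.text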

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: Replace the streams below with your own streams.

        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # TODO remove the authenticator if not required.
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config)]
  3. Create bodies.json in ./source_solar/schemas
{
  "type": "object",
  "required": [
    "page_size"
  ],
  "properties": {
    "bodies": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "englishName": {
            "type": "string"
          },
          "isPlanet": {
            "type": "boolean"
          },
          "moons": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "moon": {
                  "type": "string"
                },
                "rel": {
                  "type": "string"
                }
              }
            }
          },
          "semimajorAxis": {
            "type": "number"
          },
          "perihelion": {
            "type": "number"
          },
          "aphelion": {
            "type": "number"
          },
          "eccentricity": {
            "type": "number"
          },
          "inclination": {
            "type": "number"
          },
          "mass": {
            "type": "object",
            "properties": {
              "massValue": {
                "type": "number"
              },
              "massExponent": {
                "type": "integer"
              }
            }
          },
          "vol": {
            "type": "object",
            "properties": {
              "volValue": {
                "type": "number"
              },
              "volExponent": {
                "type": "integer"
              }
            }
          },
          "density": {
            "type": "number"
          },
          "gravity": {
            "type": "number"
          },
          "escape": {
            "type": "number"
          },
          "meanRadius": {
            "type": "number"
          },
          "equaRadius": {
            "type": "number"
          },
          "polarRadius": {
            "type": "number"
          },
          "flattening": {
            "type": "number"
          },
          "dimension": {
            "type": "string"
          },
          "sideralOrbit": {
            "type": "number"
          },
          "sideralRotation": {
            "type": "number"
          },
          "aroundPlanet": {
            "type": "object",
            "properties": {
              "planet": {
                "type": "string"
              },
              "rel": {
                "type": "string"
              }
            }
          },
          "discoveredBy": {
            "type": "string"
          },
          "discoveryDate": {
            "type": "string"
          },
          "alternativeName": {
            "type": "string"
          },
          "axialTilt": {
            "type": "number"
          },
          "avgTemp": {
            "type": "number"
          },
          "mainAnomaly": {
            "type": "number"
          },
          "argPeriapsis": {
            "type": "number"
          },
          "longAscNode": {
            "type": "number"
          },
          "bodyType": {
            "type": "string"
          },
          "rel": {
            "type": "string"
          }
        }
      }
    }
  }
}
  4. Run discover to check the stream
python main.py discover --config ./sample_files/secrets/config.json

The expected output is

{"type": "CATALOG", "catalog": {"streams": [{"name": "bodies", "json_schema": {"type": "object", "required": ["page_size"], "properties": {"bodies": {"type": "array", "items": {"type": "object", "properties": {"id": {"type": "string"}, "name": {"type": "string"}, "englishName": {"type": "string"}, "isPlanet": {"type": "boolean"}, "moons": {"type": "array", "items": {"type": "object", "properties": {"moon": {"type": "string"}, "rel": {"type": "string"}}}}, "semimajorAxis": {"type": "number"}, "perihelion": {"type": "number"}, "aphelion": {"type": "number"}, "eccentricity": {"type": "number"}, "inclination": {"type": "number"}, "mass": {"type": "object", "properties": {"massValue": {"type": "number"}, "massExponent": {"type": "integer"}}}, "vol": {"type": "object", "properties": {"volValue": {"type": "number"}, "volExponent": {"type": "integer"}}}, "density": {"type": "number"}, "gravity": {"type": "number"}, "escape": {"type": "number"}, "meanRadius": {"type": "number"}, "equaRadius": {"type": "number"}, "polarRadius": {"type": "number"}, "flattening": {"type": "number"}, "dimension": {"type": "string"}, "sideralOrbit": {"type": "number"}, "sideralRotation": {"type": "number"}, "aroundPlanet": {"type": "object", "properties": {"planet": {"type": "string"}, "rel": {"type": "string"}}}, "discoveredBy": {"type": "string"}, "discoveryDate": {"type": "string"}, "alternativeName": {"type": "string"}, "axialTilt": {"type": "number"}, "avgTemp": {"type": "number"}, "mainAnomaly": {"type": "number"}, "argPeriapsis": {"type": "number"}, "longAscNode": {"type": "number"}, "bodyType": {"type": "string"}, "rel": {"type": "string"}}}}}}, "supported_sync_modes": ["full_refresh"], "source_defined_primary_key": [["id"]]}]}}

Read data

Create the configured catalog

vi ./sample_files/configured_catalog.json
{
  "streams": [
    {
      "stream": {
        "name": "bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}
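
Instead of writing this file by hand, the configured catalog could also be derived from the CATALOG message printed by discover. The sketch below assumes that message has already been parsed into a dict; the function name `configure_catalog` is hypothetical, and it hard-codes the same full_refresh and overwrite modes used above.

```python
from typing import Any, Dict


def configure_catalog(catalog_message: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a CATALOG message into a configured catalog using full refresh + overwrite."""
    configured = []
    for stream in catalog_message["catalog"]["streams"]:
        configured.append(
            {
                "stream": stream,  # reuse the discovered stream definition as-is
                "sync_mode": "full_refresh",
                "destination_sync_mode": "overwrite",
            }
        )
    return {"streams": configured}
```

The result can be written to ./sample_files/configured_catalog.json with `json.dump`. Unlike the hand-written file, it carries the full discovered json_schema rather than an empty object.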

Run the read

python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json

The output is

{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSolar"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: bodies "}}
{"type": "RECORD", "record": {"stream": "bodies", "data": {"bodies": [{"id": "ymir", "name": "Ymir", "englishName": "Ymir", "isPlanet": false, "moons": null, "semimajorAxis": 23040000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.187, "inclination": 167.9, "mass": {"massValue": 3.97, "massExponent": 15}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 9.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 1315.4, "sideralRotation": 0.0, "aroundPlanet": {"planet": "saturne", "rel": "https://api.le-systeme-solaire.net/rest/bodies/saturne"}, "discoveredBy": "Brett J. Gladman", "discoveryDate": "07/08/2000", "alternativeName": "S/2000 S 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ymir"}, {"id": "weywot", "name": "Weywot", "englishName": "Weywot", "isPlanet": false, "moons": null, "semimajorAxis": 14500, "perihelion": 12470, "aphelion": 16530, "eccentricity": 0.148, "inclination": 14.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 170.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 12.438, "sideralRotation": 0.0, "aroundPlanet": {"planet": "quaoar", "rel": "https://api.le-systeme-solaire.net/rest/bodies/quaoar"}, "discoveredBy": "Michael E. Brown, T.A. 
Suer", "discoveryDate": "22/02/2007", "alternativeName": "S/2006 (50000) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/weywot"}, {"id": "vesta", "name": "(4) Vesta", "englishName": "4 Vesta", "isPlanet": false, "moons": null, "semimajorAxis": 353343000, "perihelion": 321767000, "aphelion": 384920000, "eccentricity": 0.0893, "inclination": 7.1337, "mass": {"massValue": 2.7, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 265.0, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "560x544x454", "sideralOrbit": 1325.886, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "H. W. Olbers", "discoveryDate": "29/03/1807", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vesta"}, {"id": "venus", "name": "V\u00e9nus", "englishName": "Venus", "isPlanet": true, "moons": null, "semimajorAxis": 108208475, "perihelion": 107477000, "aphelion": 108939000, "eccentricity": 0.0067, "inclination": 
3.39, "mass": {"massValue": 4.86747, "massExponent": 24}, "vol": {"volValue": 9.2843, "volExponent": 11}, "density": 5.243, "gravity": 8.87, "escape": 10360.0, "meanRadius": 6051.8, "equaRadiu
s": 6051.8, "polarRadius": 6051.8, "flattening": 0.0, "dimension": "", "sideralOrbit": 224.701, "sideralRotation": -5832.5, "aroundPlanet": null, "discoveredBy": "", "discoveryDate": "", "alte
rnativeName": "", "axialTilt": 177.36, "avgTemp": 737, "mainAnomaly": 50.115, "argPeriapsis": 54.78, "longAscNode": 76.785, "bodyType": "Planet", "rel": "https://api.le-systeme-solaire.net/res
t/bodies/venus"}, {"id": "varuna", "name": "(20000) Varuna", "englishName": "20000 Varuna", "isPlanet": false, "moons": null, "semimajorAxis": 6451398000, "perihelion": 6120810000, "aphelion":
 6781985000, "eccentricity": 0.051, "inclination": 17.158, "mass": {"massValue": 1.55, "massExponent": 20}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 330.0, "eq
uaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 103440.0, "sideralRotation": 0.0, "aroundPlanet": null, "discoveredBy": "Robert S. McMillan Spacewatch",
 "discoveryDate": "28/11/2000", "alternativeName": "2000 WR106", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Asteroid", "rel": "http
s://api.le-systeme-solaire.net/rest/bodies/varuna"}, {"id": "vanth", "name": "Vanth", "englishName": "Vanth", "isPlanet": false, "moons": null, "semimajorAxis": 9000, "perihelion": 0, "aphelio
n": 0, "eccentricity": 0.0009, "inclination": 105.03, "mass": {"massValue": 3.6, "massExponent": 19}, "vol": null, "density": 1.0, "gravity": 0.0, "escape": 0.0, "meanRadius": 434.0, "equaRadi
us": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 9.539, "sideralRotation": 0.0, "aroundPlanet": {"planet": "orcus", "rel": "https://api.le-systeme-solaire.net/
rest/bodies/orcus"}, "discoveredBy": "Michael E. Brown, T.A. Suer", "discoveryDate": "13/11/2005", "alternativeName": "S/2005 (90482) 1", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "arg
Periapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/vanth"}, {"id": "valetudo", "name": "Val\u00e9tudo", "englishName": "Valetudo", 
"isPlanet": false, "moons": null, "semimajorAxis": 18928000, "perihelion": 0, "aphelion": 0, "eccentricity": 0.222, "inclination": 34.0, "mass": null, "vol": null, "density": 1.0, "gravity": 0
.0, "escape": 0.0, "meanRadius": 0.5, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 533.0, "sideralRotation": 0.0, "aroundPlanet": {"planet": "jupi
ter", "rel": "https://api.le-systeme-solaire.net/rest/bodies/jupiter"}, "discoveredBy": "Scott Sheppard", "discoveryDate": "17/07/2018", "alternativeName": "S/2016 J 2", "axialTilt": 0, "avgTe
mp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/valetudo"}, {"id": "uranus", "name": "Uranus", "
englishName": "Uranus", "isPlanet": true, "moons": [{"moon": "Ariel", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ariel"}, {"moon": "Umbriel", "rel": "https://api.le-systeme-solaire
.net/rest/bodies/umbriel"}, {"moon": "Titania", "rel": "https://api.le-systeme-solaire.net/rest/bodies/titania"}, {"moon": "Ob\u00e9ron", "rel": "https://api.le-systeme-solaire.net/rest/bodies
/oberon"}, {"moon": "Miranda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/miranda"}, {"moon": "Cord\u00e9lia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cordelia"}, {"
moon": "Oph\u00e9lie", "rel": "https://api.le-systeme-solaire.net/rest/bodies/ophelia"}, {"moon": "Bianca", "rel": "https://api.le-systeme-solaire.net/rest/bodies/bianca"}, {"moon": "Cressida"
, "rel": "https://api.le-systeme-solaire.net/rest/bodies/cressida"}, {"moon": "Desd\u00e9mone", "rel": "https://api.le-systeme-solaire.net/rest/bodies/desdemona"}, {"moon": "Juliette", "rel": 
"https://api.le-systeme-solaire.net/rest/bodies/juliet"}, {"moon": "Portia", "rel": "https://api.le-systeme-solaire.net/rest/bodies/portia"}, {"moon": "Rosalinde", "rel": "https://api.le-syste
me-solaire.net/rest/bodies/rosalind"}, {"moon": "Belinda", "rel": "https://api.le-systeme-solaire.net/rest/bodies/belinda"}, {"moon": "Puck", "rel": "https://api.le-systeme-solaire.net/rest/bo
dies/puck"}, {"moon": "Caliban", "rel": "https://api.le-systeme-solaire.net/rest/bodies/caliban"}, {"moon": "Sycorax", "rel": "https://api.le-systeme-solaire.net/rest/bodies/sycorax"}, {"moon"
: "Prospero", "rel": "https://api.le-systeme-solaire.net/rest/bodies/prospero"}, {"moon": "Setebos", "rel": "https://api.le-systeme-solaire.net/rest/bodies/setebos"}, {"moon": "Stephano", "rel
": "https://api.le-systeme-solaire.net/rest/bodies/stephano"}, {"moon": "Trinculo", "rel": "https://api.le-systeme-solaire.net/rest/bodies/trinculo"}, {"moon": "Francisco", "rel": "https://api
.le-systeme-solaire.net/rest/bodies/francisco"}, {"moon": "Margaret", "rel": "https://api.le-systeme-solaire.net/rest/bodies/margaret"}, {"moon": "Ferdinand", "rel": "https://api.le-systeme-so
laire.net/rest/bodies/ferdinand"}, {"moon": "Perdita", "rel": "https://api.le-systeme-solaire.net/rest/bodies/perdita"}, {"moon": "Mab", "rel": "https://api.le-systeme-solaire.net/rest/bodies/
mab"}, {"moon": "Cupid", "rel": "https://api.le-systeme-solaire.net/rest/bodies/cupid"}], "semimajorAxis": 2870658186, "perihelion": 2734998229, "aphelion": 3006318143, "eccentricity": 0.0457,
 "inclination": 0.772, "mass": {"massValue": 8.68127, "massExponent": 25}, "vol": {"volValue": 6.833, "volExponent": 13}, "density": 1.27, "gravity": 8.87, "escape": 21380.0, "meanRadius": 253
62.0, "equaRadius": 25559.0, "polarRadius": 24973.0, "flattening": 0.02293, "dimension": "", "sideralOrbit": 30685.4, "sideralRotation": -17.24, "aroundPlanet": null, "discoveredBy": "William 
Herschel", "discoveryDate": "13/03/1781", "alternativeName": "", "axialTilt": 97.77, "avgTemp": 76, "mainAnomaly": 142.2386, "argPeriapsis": 98.862, "longAscNode": 73.967, "bodyType": "Planet"
, "rel": "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, {"id": "umbriel", "name": "Umbriel", "englishName": "Umbriel", "isPlanet": false, "moons": null, "semimajorAxis": 266000, "pe
rihelion": 265100, "aphelion": 267500, "eccentricity": 0.0039, "inclination": 0.13, "mass": {"massValue": 12.2, "massExponent": 20}, "vol": null, "density": 1.46, "gravity": 0.0, "escape": 0.0
, "meanRadius": 584.7, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 4.14418, "sideralRotation": 99.499, "aroundPlanet": {"planet": "uranus", "rel"
: "https://api.le-systeme-solaire.net/rest/bodies/uranus"}, "discoveredBy": "William Lassell", "discoveryDate": "24/10/1851", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly"
: 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/umbriel"}, {"id": "triton", "name": "Triton", "englishName": "Triton"
, "isPlanet": false, "moons": null, "semimajorAxis": 354760, "perihelion": 354753, "aphelion": 354765, "eccentricity": 2e-05, "inclination": 157.345, "mass": {"massValue": 2.14, "massExponent"
: 22}, "vol": null, "density": 2.05, "gravity": 0.78, "escape": 0.0, "meanRadius": 1353.4, "equaRadius": 0.0, "polarRadius": 0.0, "flattening": 0.0, "dimension": "", "sideralOrbit": 5.87685, "
sideralRotation": 141.0444, "aroundPlanet": {"planet": "neptune", "rel": "https://api.le-systeme-solaire.net/rest/bodies/neptune"}, "discoveredBy": "William Lassell", "discoveryDate": "10/10/1
846", "alternativeName": "", "axialTilt": 0, "avgTemp": 0, "mainAnomaly": 0.0, "argPeriapsis": 0.0, "longAscNode": 0.0, "bodyType": "Moon", "rel": "https://api.le-systeme-solaire.net/rest/bodies/triton"}]}, "emitted_at": 1656152784004}}

...

{"type": "LOG", "log": {"level": "INFO", "message": "Read 29 records from bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:35.475559"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
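
The count of 29 records follows from the pagination strategy: parse_response yields each page as a single record, and next_page_token keeps going only while a page is full. The simulation below is a sketch with a fake in-memory API (`fetch_page` and `count_requests` are hypothetical names). The total of 284 bodies is an assumption, taken from the record count of the detail stream later in this tutorial.

```python
from typing import List


def fetch_page(total: int, page: int, page_size: int) -> List[int]:
    """Stand-in for the API: return the items on a 1-based page."""
    start = (page - 1) * page_size
    return list(range(start, min(start + page_size, total)))


def count_requests(total: int, page_size: int) -> int:
    """Count requests made when paging stops at the first non-full page."""
    page = 1
    requests_made = 0
    while True:
        items = fetch_page(total, page, page_size)
        requests_made += 1
        # Same stop condition as Bodies.next_page_token: a short page ends the sync.
        if len(items) != page_size:
            return requests_made
        page += 1


print(count_requests(284, 10))  # prints 29: 28 full pages plus one partial page
```

One consequence of this stop condition is that a total that is an exact multiple of the page size costs one extra request, which returns an empty page.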

Handling dependencies between streams

  1. Create an HttpSubStream class
class SolarSubStream(HttpSubStream, SolarStream, ABC):
    # Skip invalid links instead of raising on HTTP errors
    raise_on_http_errors = False

    def __init__(self, parent: SolarStream, **kwargs):
        super().__init__(parent=parent, **kwargs)

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        TODO: Override this method to define how a response is parsed.
        :return an iterable containing each record in the response
        """
        yield response.json()
  2. Create the class for the detail endpoint
class DetailOfBodies(SolarSubStream):
    primary_key = 'id'

    def path(self, *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None,
             next_page_token: Mapping[str, Any] = None) -> str:
        return f'bodies/{stream_slice["id"]}'

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
        for stream_slices in self.parent.stream_slices(sync_mode=SyncMode.full_refresh):
            for record in self.parent.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slices):
                for data in record["bodies"]:
                    yield {"id": data["id"]}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        if response.status_code == 200:
            yield response.json()
        else:
            self.logger.warning(f"Get data failed: code={response.status_code}, url={response.url}, message={response.text}")
  3. Instantiate the detail stream
class SourceSolar(AbstractSource):
    def check_connection(self, logger, config) -> Tuple[bool, any]:
        """
        See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
        for an example.

        :param config:  the user-input config object conforming to the connector's spec.yaml
        :param logger:  logger object
        :return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
        """
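        try:
            # A timeout keeps the check from hanging if the API is unreachable.
            resp = requests.get(SolarStream.url_base, timeout=30)
        except requests.RequestException as e:
            return False, repr(e)
        status = resp.status_code
        logger.info(f"Ping response code: {status}")
        if status == 200:
            return True, None
        return False, resp.text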

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        TODO: Replace the streams below with your own streams.

        :param config: A Mapping of the user input configuration as defined in the connector spec.
        """
        # TODO remove the authenticator if not required.
        auth = NoAuth()
        return [Bodies(authenticator=auth, config=config),
                DetailOfBodies(parent=Bodies(authenticator=auth, config=config), authenticator=auth, config=config)]
  4. Declare the response schema
vi ./source_solar/schemas/detail_of_bodies.json
{
  "type": "object",
  "required": [
    "page_size"
  ],
  "properties": {
    "id": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "englishName": {
      "type": "string"
    },
    "isPlanet": {
      "type": "boolean"
    },
    "moons": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "moon": {
            "type": "string"
          },
          "rel": {
            "type": "string"
          }
        }
      }
    },
    "semimajorAxis": {
      "type": "number"
    },
    "perihelion": {
      "type": "number"
    },
    "aphelion": {
      "type": "number"
    },
    "eccentricity": {
      "type": "number"
    },
    "inclination": {
      "type": "number"
    },
    "mass": {
      "type": "object",
      "properties": {
        "massValue": {
          "type": "number"
        },
        "massExponent": {
          "type": "integer"
        }
      }
    },
    "vol": {
      "type": "object",
      "properties": {
        "volValue": {
          "type": "number"
        },
        "volExponent": {
          "type": "integer"
        }
      }
    },
    "density": {
      "type": "number"
    },
    "gravity": {
      "type": "number"
    },
    "escape": {
      "type": "number"
    },
    "meanRadius": {
      "type": "number"
    },
    "equaRadius": {
      "type": "number"
    },
    "polarRadius": {
      "type": "number"
    },
    "flattening": {
      "type": "number"
    },
    "dimension": {
      "type": "string"
    },
    "sideralOrbit": {
      "type": "number"
    },
    "sideralRotation": {
      "type": "number"
    },
    "aroundPlanet": {
      "type": "object",
      "properties": {
        "planet": {
          "type": "string"
        },
        "rel": {
          "type": "string"
        }
      }
    },
    "discoveredBy": {
      "type": "string"
    },
    "discoveryDate": {
      "type": "string"
    },
    "alternativeName": {
      "type": "string"
    },
    "axialTilt": {
      "type": "number"
    },
    "avgTemp": {
      "type": "number"
    },
    "mainAnomaly": {
      "type": "number"
    },
    "argPeriapsis": {
      "type": "number"
    },
    "longAscNode": {
      "type": "number"
    },
    "bodyType": {
      "type": "string"
    }
  }
}
  5. Update the configured catalog ./sample_files/configured_catalog.json
{
  "streams": [
    {
      "stream": {
        "name": "bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    },
    {
      "stream": {
        "name": "detail_of_bodies",
        "json_schema": {},
        "supported_sync_modes": [
          "full_refresh"
        ],
        "source_defined_primary_key": [
          [
            "id"
          ]
        ]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}
  6. Test reading the data
python main.py read --config ./sample_files/secrets/config.json --catalog ./sample_files/configured_catalog.json

The final output shows that 284 records were read

{"type": "LOG", "log": {"level": "INFO", "message": "Read 284 records from detail_of_bodies stream"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing detail_of_bodies"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceSolar runtimes:\nSyncing stream bodies 0:00:51.038142\nSyncing stream detail_of_bodies 0:01:41.687393"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing SourceSolar"}}
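
The relationship between the two streams can be reproduced in isolation: every page record from the parent contributes one slice per body id, and each slice becomes one request path for the detail stream. The sketch below mirrors DetailOfBodies.stream_slices and path with plain functions. The names `slices_from_parent` and `detail_path` are hypothetical, and the sample pages are trimmed-down stand-ins for real responses.

```python
from typing import Any, Dict, Iterable, Iterator


def slices_from_parent(parent_records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, str]]:
    """Yield one slice per body id found in the parent's page records."""
    for record in parent_records:
        for body in record["bodies"]:
            yield {"id": body["id"]}


def detail_path(stream_slice: Dict[str, str]) -> str:
    """Build the relative path requested for one slice."""
    return f'bodies/{stream_slice["id"]}'


pages = [
    {"bodies": [{"id": "ymir"}, {"id": "weywot"}]},
    {"bodies": [{"id": "vesta"}]},
]
paths = [detail_path(s) for s in slices_from_parent(pages)]
print(paths)  # prints ['bodies/ymir', 'bodies/weywot', 'bodies/vesta']
```

Because the detail stream reads the whole parent stream to build its slices, the parent endpoint is paged through again during the sync, which is consistent with the longer runtime reported for detail_of_bodies.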

Deploy the custom connector

  1. Build the connector Docker image
docker build . -t airbyte/source-solar:dev
docker images

This shows

REPOSITORY                     TAG             IMAGE ID       CREATED          SIZE
airbyte/source-solar           dev             b42429012ce0   22 seconds ago   120MB
  2. Open the Airbyte UI at http://localhost:8000/ and create a new connector

image-20220626210721477

image-20220626211703757

Once created, it appears in the connector list

image-20220626211844654

Use the custom connector

Create a new connection

image-20220626212028444

image-20220626212107982

image-20220626212142644

Run the sync

image-20221203202150388

The sync succeeds

image-20221203202516832

Inspect the database

image-20221203202717347

Because we chose Normalized tabular data, the sync automatically expands our JSON objects, so several tables are created

image-20221203202815537

Of these, solar_bodies corresponds to the raw data; its bodies column holds the original response data
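
To give an intuition for why one nested JSON object ends up as several tables, here is a deliberately simplified sketch. It is not Airbyte's normalization algorithm, and the function name `flatten` and the table naming scheme are assumptions made for illustration: nested objects become prefixed columns, and lists of objects become rows in a child table.

```python
from typing import Any, Dict, List


def flatten(record: Dict[str, Any], table: str) -> Dict[str, List[Dict[str, Any]]]:
    """Split one nested record into rows grouped by table name."""
    tables: Dict[str, List[Dict[str, Any]]] = {}
    row: Dict[str, Any] = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # A nested object is folded into the parent row as prefixed columns.
            for sub_key, sub_value in value.items():
                row[f"{key}_{sub_key}"] = sub_value
        elif isinstance(value, list) and all(isinstance(v, dict) for v in value):
            # A list of objects becomes a child table named <table>_<key>.
            child = f"{table}_{key}"
            for item in value:
                for name, rows in flatten(item, child).items():
                    tables.setdefault(name, []).extend(rows)
        else:
            row[key] = value
    tables.setdefault(table, []).append(row)
    return tables
```

For a body such as Uranus, the mass object would turn into mass_massValue and mass_massExponent columns, while the moons array would produce rows in a separate bodies_moons table.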

image-20221203203008198

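Summary

This concludes the Airbyte getting-started tutorial. Compared with other ETL tools, Airbyte can run ETL directly against any service with clear API documentation, and combined with dbt it can also implement an ELT workflow. The whole process only requires writing Python scripts, which makes it very flexible. Airbyte is still in its early days, but it has a promising future.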

Hi everyone, I'm 子文, owner of 萬事平安屋, a young man who loves to tinker.