Fivetran
Important Capabilities
Capability | Status | Notes |
---|---|---|
Platform Instance | ✅ | Enabled by default |
This plugin extracts Fivetran users, connectors, destinations, and sync history. It is in beta and has only been tested with the Snowflake connector.
Integration Details
This source extracts the following:
- Connectors in Fivetran as Data Pipelines and Data Jobs, to represent data lineage between source and destination.
- Connector sources - DataJob input Datasets.
- Connector destination - DataJob output Datasets.
- Connector runs - DataProcessInstances as DataJob runs.
Configuration Notes
- Fivetran provides the Fivetran Platform Connector, which writes log events and metadata about connectors, destinations, users, and roles to your destination.
- You need to set up the Fivetran Platform Connector and start its initial sync before using this source. See the Fivetran documentation for setup steps.
- Once the initial sync of the Fivetran Platform Connector is complete, provide the connector's destination platform and its configuration in the recipe.
Concept mapping
Fivetran | DataHub |
---|---|
Connector | DataJob |
Source | Dataset |
Destination | Dataset |
Connector Run | DataProcessInstance |
A connector's source and destination are each mapped to a Dataset, as the input and output of the connector's DataJob respectively.
Snowflake destination Configuration Guide
- If the destination of your Fivetran Platform Connector is Snowflake, you need to provide a user and a role with the correct privileges in order to fetch metadata.
- A Snowflake system admin can follow this guide to create a fivetran_datahub role, grant it the required privileges, and assign it to a user by executing the following Snowflake commands as a user with the ACCOUNTADMIN role or the MANAGE GRANTS privilege.
create or replace role fivetran_datahub;
// Grant access to a warehouse to run queries to view metadata
grant operate, usage on warehouse "<your-warehouse>" to role fivetran_datahub;
// Grant access to view database and schema in which your log and metadata tables exist
grant usage on DATABASE "<fivetran-log-database>" to role fivetran_datahub;
grant usage on SCHEMA "<fivetran-log-database>"."<fivetran-log-schema>" to role fivetran_datahub;
// Grant access to execute select query on schema in which your log and metadata tables exist
grant select on all tables in SCHEMA "<fivetran-log-database>"."<fivetran-log-schema>" to role fivetran_datahub;
// Grant the fivetran_datahub role to the Snowflake user.
grant role fivetran_datahub to user snowflake_user;
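When the role has to be provisioned for several warehouses or log schemas, the statements above can be templated rather than edited by hand. The following is a minimal sketch: `quote_ident` and `render_grants` are hypothetical helpers introduced here, the object names in the usage lines are placeholders, and the code only builds SQL strings without connecting to Snowflake.

```python
from typing import List


def quote_ident(name: str) -> str:
    """Wrap a Snowflake identifier in double quotes, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def render_grants(warehouse: str, database: str, schema: str, user: str,
                  role: str = "fivetran_datahub") -> List[str]:
    """Return the grant statements from this guide for the given object names."""
    wh = quote_ident(warehouse)
    db = quote_ident(database)
    sc = quote_ident(schema)
    return [
        f"create or replace role {role};",
        f"grant operate, usage on warehouse {wh} to role {role};",
        f"grant usage on DATABASE {db} to role {role};",
        f"grant usage on SCHEMA {db}.{sc} to role {role};",
        f"grant select on all tables in SCHEMA {db}.{sc} to role {role};",
        f"grant role {role} to user {user};",
    ]


# Placeholder names; substitute your own warehouse, database, schema and user.
for statement in render_grants("COMPUTE_WH", "FIVETRAN_DB", "FIVETRAN_LOG", "snowflake_user"):
    print(statement)
```

The printed statements can then be reviewed and run by a user with the ACCOUNTADMIN role or the MANAGE GRANTS privilege.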
Advanced Configurations
Working with Platform Instances
If your Fivetran setup refers to multiple instances of a source or destination system, you need to configure a platform instance for each of these systems in the Fivetran recipe to generate correct lineage edges. Refer to the document Working with Platform Instances to learn more.
When configuring a platform instance for a source system, use the connector id as the key; for a destination system, use the destination id as the key.
Example - Multiple Postgres source connectors, each reading from a different Postgres instance
# Map of connector source to platform instance
sources_to_platform_instance:
  postgres_connector_id1:
    platform_instance: cloud_postgres_instance
    env: PROD
  postgres_connector_id2:
    platform_instance: local_postgres_instance
    env: DEV
Example - Multiple Snowflake destinations, each writing to a different Snowflake instance
# Map of destination to platform instance
destination_to_platform_instance:
  snowflake_destination_id1:
    platform_instance: prod_snowflake_instance
    env: PROD
  snowflake_destination_id2:
    platform_instance: dev_snowflake_instance
    env: PROD
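To illustrate how these maps are keyed, here is a small sketch of a lookup by connector id. The `PlatformDetail` dataclass mirrors the two fields shown in the examples, but `resolve_platform_detail` and its fallback to an instance-less PROD entry are assumptions made for illustration, not the plugin's actual code.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class PlatformDetail:
    platform_instance: Optional[str] = None
    env: str = "PROD"  # same default as in the config table


def resolve_platform_detail(mapping: Dict[str, PlatformDetail], key: str) -> PlatformDetail:
    """Look up a connector id (sources) or destination id (destinations).

    Falling back to PlatformDetail() for unknown keys is an assumption of this sketch.
    """
    return mapping.get(key, PlatformDetail())


# Same entries as the Postgres example above.
sources_to_platform_instance = {
    "postgres_connector_id1": PlatformDetail("cloud_postgres_instance", "PROD"),
    "postgres_connector_id2": PlatformDetail("local_postgres_instance", "DEV"),
}

detail = resolve_platform_detail(sources_to_platform_instance, "postgres_connector_id2")
print(detail.platform_instance, detail.env)
```

The same lookup applies to `destination_to_platform_instance`, with the destination id as the key.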
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[fivetran]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: fivetran
  config:
    # Fivetran log connector destination server configurations
    fivetran_log_config:
      destination_platform: snowflake
      destination_config:
        # Coordinates
        account_id: "abc48144"
        warehouse: "COMPUTE_WH"
        database: "fivetran_log_database"
        log_schema: "fivetran_log_schema"
        # Credentials
        username: "${SNOWFLAKE_USER}"
        password: "${SNOWFLAKE_PASS}"
        role: "snowflake_role"
    connector_patterns:
      allow:
        - connector_name
    # Optional -- This mapping is optional and only required to configure platform-instance for source
    # A mapping of Fivetran connector id to data platform instance
    sources_to_platform_instance:
      calendar_elected:
        platform_instance: cloud_postgres_instance
        env: DEV
    # Optional -- This mapping is optional and only required to configure platform-instance for destination.
    # A mapping of Fivetran destination id to data platform instance
    destination_to_platform_instance:
      calendar_elected:
        platform_instance: cloud_postgres_instance
        env: DEV
sink:
  # sink configs
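The `connector_patterns` block in the recipe filters connectors using regular expressions. As a rough sketch of allow/deny semantics, the code below keeps a name when it matches at least one allow pattern and no deny pattern, with matching anchored at the start of the name. That rule, the `is_allowed` helper, and the connector names are assumptions for illustration; the actual behavior is defined by DataHub's AllowDenyPattern class and may differ in detail.

```python
import re
from typing import List


def is_allowed(name: str, allow: List[str], deny: List[str], ignore_case: bool = True) -> bool:
    """Return True if name matches an allow pattern and no deny pattern."""
    flags = re.IGNORECASE if ignore_case else 0
    # Deny patterns take precedence in this sketch.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    return any(re.match(pattern, name, flags) for pattern in allow)


# Hypothetical connector names.
connectors = ["postgres_prod", "Postgres_Dev", "salesforce_prod"]
kept = [c for c in connectors if is_allowed(c, allow=["postgres_.*"], deny=[".*_dev"])]
print(kept)
```

With case-insensitive matching, which is the documented default for `ignoreCase`, `Postgres_Dev` is caught by the `.*_dev` deny pattern even though its casing differs.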
Config Details
Note that a dot (.) is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
fivetran_log_config ✅ FivetranLogConfig | Fivetran log connector destination server configurations. |
fivetran_log_config.destination_platform string | The destination platform where fivetran connector log tables are dumped. Default: snowflake |
fivetran_log_config.destination_config DestinationConfig | If destination platform is 'snowflake', provide snowflake configuration. |
fivetran_log_config.destination_config.account_id ❓ string | Snowflake account identifier, e.g. xy12345, xy12345.us-east-2.aws, xy12345.us-central1.gcp, xy12345.central-us.azure, xy12345.us-west-2.privatelink. Refer to Account Identifiers for more details. |
fivetran_log_config.destination_config.database ❓ string | The fivetran connector log database. |
fivetran_log_config.destination_config.log_schema ❓ string | The fivetran connector log schema. |
fivetran_log_config.destination_config.authentication_type string | The type of authenticator to use when connecting to Snowflake. Supports "DEFAULT_AUTHENTICATOR", "OAUTH_AUTHENTICATOR", "EXTERNAL_BROWSER_AUTHENTICATOR" and "KEY_PAIR_AUTHENTICATOR". Default: DEFAULT_AUTHENTICATOR |
fivetran_log_config.destination_config.connect_args object | Connect args to pass to Snowflake SqlAlchemy driver |
fivetran_log_config.destination_config.options object | Any options specified here will be passed to SQLAlchemy.create_engine as kwargs. |
fivetran_log_config.destination_config.password string(password) | Snowflake password. |
fivetran_log_config.destination_config.private_key string | Private key in a form of '-----BEGIN PRIVATE KEY-----\nprivate-key\n-----END PRIVATE KEY-----\n' if using key pair authentication. Encrypted version of private key will be in a form of '-----BEGIN ENCRYPTED PRIVATE KEY-----\nencrypted-private-key\n-----END ENCRYPTED PRIVATE KEY-----\n' See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html |
fivetran_log_config.destination_config.private_key_password string(password) | Password for your private key. Required if using key pair authentication with encrypted private key. |
fivetran_log_config.destination_config.private_key_path string | The path to the private key if using key pair authentication. Ignored if private_key is set. See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html |
fivetran_log_config.destination_config.role string | Snowflake role. |
fivetran_log_config.destination_config.scheme string | Default: snowflake |
fivetran_log_config.destination_config.username string | Snowflake username. |
fivetran_log_config.destination_config.warehouse string | Snowflake warehouse. |
fivetran_log_config.destination_config.oauth_config OAuthConfiguration | oauth configuration - https://docs.snowflake.com/en/user-guide/python-connector-example.html#connecting-with-oauth |
fivetran_log_config.destination_config.oauth_config.authority_url ❓ string | Authority url of your identity provider |
fivetran_log_config.destination_config.oauth_config.client_id ❓ string | client id of your registered application |
fivetran_log_config.destination_config.oauth_config.provider ❓ Enum | Identity provider for OAuth. Supported providers are microsoft and okta. |
fivetran_log_config.destination_config.oauth_config.client_secret string(password) | client secret of the application if use_certificate = false |
fivetran_log_config.destination_config.oauth_config.encoded_oauth_private_key string | base64 encoded private key content if use_certificate = true |
fivetran_log_config.destination_config.oauth_config.encoded_oauth_public_key string | base64 encoded certificate content if use_certificate = true |
fivetran_log_config.destination_config.oauth_config.scopes array(string) | Scopes required to connect to Snowflake. |
fivetran_log_config.destination_config.oauth_config.use_certificate boolean | Whether to use a certificate and private key to authenticate using OAuth. Default: False |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to |
env string | The environment that all assets produced by this connector belong to Default: PROD |
connector_patterns AllowDenyPattern | Regex patterns for connectors to filter in ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
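connector_patterns.allow array(string) | List of regex patterns to include in ingestion. Default: ['.*'] |
connector_patterns.deny array(string) | List of regex patterns to exclude from ingestion. Default: [] |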
connector_patterns.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
destination_to_platform_instance map(str,PlatformDetail) | A mapping of destination dataset to platform instance. Use destination id as key. |
destination_to_platform_instance.`key`.platform_instance string | The instance of the platform that all assets produced by this recipe belong to |
destination_to_platform_instance.`key`.env string | The environment that all assets produced by DataHub platform ingestion source belong to Default: PROD |
sources_to_platform_instance map(str,PlatformDetail) | A mapping of all of a connector's source datasets to platform instance. Use connector id as key. |
sources_to_platform_instance.`key`.platform_instance string | The instance of the platform that all assets produced by this recipe belong to |
sources_to_platform_instance.`key`.env string | The environment that all assets produced by DataHub platform ingestion source belong to Default: PROD |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Fivetran Stateful Ingestion Config. |
stateful_ingestion.enabled boolean | Whether stateful ingestion is enabled. Default: False |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
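To make the dotted notation concrete, the sketch below flattens a nested recipe config into the dotted paths used in the Field column. `flatten` is a hypothetical helper written for this illustration, and the sample values are placeholders taken from the starter recipe.

```python
from typing import Any, Dict


def flatten(config: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Map each leaf value of a nested dict to its dotted path."""
    flat: Dict[str, Any] = {}
    for key, value in config.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested sections, extending the dotted prefix.
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


config = {
    "fivetran_log_config": {
        "destination_platform": "snowflake",
        "destination_config": {
            "account_id": "abc48144",
            "log_schema": "fivetran_log_schema",
        },
    },
    "env": "PROD",
}

for path, value in flatten(config).items():
    print(f"{path} = {value}")
```

For example, the nested `account_id` key appears as `fivetran_log_config.destination_config.account_id`, which is the form the table uses.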
The JSONSchema for this configuration is inlined below.
{
"title": "FivetranSourceConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "Fivetran Stateful Ingestion Config.",
"allOf": [
{
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
}
]
},
"fivetran_log_config": {
"title": "Fivetran Log Config",
"description": "Fivetran log connector destination server configurations.",
"allOf": [
{
"$ref": "#/definitions/FivetranLogConfig"
}
]
},
"connector_patterns": {
"title": "Connector Patterns",
"description": "Regex patterns for connectors to filter in ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"sources_to_platform_instance": {
"title": "Sources To Platform Instance",
"description": "A mapping of the connector's all sources dataset to platform instance. Use connector id as key.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/PlatformDetail"
}
},
"destination_to_platform_instance": {
"title": "Destination To Platform Instance",
"description": "A mapping of destination dataset to platform instance. Use destination id as key.",
"default": {},
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/PlatformDetail"
}
}
},
"required": [
"fivetran_log_config"
],
"additionalProperties": false,
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether stateful ingestion is enabled.",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"OAuthIdentityProvider": {
"title": "OAuthIdentityProvider",
"description": "An enumeration.",
"enum": [
"microsoft",
"okta"
]
},
"OAuthConfiguration": {
"title": "OAuthConfiguration",
"type": "object",
"properties": {
"provider": {
"description": "Identity provider for OAuth. Supported providers are microsoft and okta.",
"allOf": [
{
"$ref": "#/definitions/OAuthIdentityProvider"
}
]
},
"authority_url": {
"title": "Authority Url",
"description": "Authority url of your identity provider",
"type": "string"
},
"client_id": {
"title": "Client Id",
"description": "client id of your registered application",
"type": "string"
},
"scopes": {
"title": "Scopes",
"description": "scopes required to connect to snowflake",
"type": "array",
"items": {
"type": "string"
}
},
"use_certificate": {
"title": "Use Certificate",
"description": "Do you want to use certificate and private key to authenticate using oauth",
"default": false,
"type": "boolean"
},
"client_secret": {
"title": "Client Secret",
"description": "client secret of the application if use_certificate = false",
"type": "string",
"writeOnly": true,
"format": "password"
},
"encoded_oauth_public_key": {
"title": "Encoded Oauth Public Key",
"description": "base64 encoded certificate content if use_certificate = true",
"type": "string"
},
"encoded_oauth_private_key": {
"title": "Encoded Oauth Private Key",
"description": "base64 encoded private key content if use_certificate = true",
"type": "string"
}
},
"required": [
"provider",
"authority_url",
"client_id",
"scopes"
],
"additionalProperties": false
},
"DestinationConfig": {
"title": "DestinationConfig",
"type": "object",
"properties": {
"options": {
"title": "Options",
"description": "Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.",
"type": "object"
},
"scheme": {
"title": "Scheme",
"default": "snowflake",
"type": "string"
},
"username": {
"title": "Username",
"description": "Snowflake username.",
"type": "string"
},
"password": {
"title": "Password",
"description": "Snowflake password.",
"type": "string",
"writeOnly": true,
"format": "password"
},
"private_key": {
"title": "Private Key",
"description": "Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n' if using key pair authentication. Encrypted version of private key will be in a form of '-----BEGIN ENCRYPTED PRIVATE KEY-----\\nencrypted-private-key\\n-----END ENCRYPTED PRIVATE KEY-----\\n' See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html",
"type": "string"
},
"private_key_path": {
"title": "Private Key Path",
"description": "The path to the private key if using key pair authentication. Ignored if `private_key` is set. See: https://docs.snowflake.com/en/user-guide/key-pair-auth.html",
"type": "string"
},
"private_key_password": {
"title": "Private Key Password",
"description": "Password for your private key. Required if using key pair authentication with encrypted private key.",
"type": "string",
"writeOnly": true,
"format": "password"
},
"oauth_config": {
"title": "Oauth Config",
"description": "oauth configuration - https://docs.snowflake.com/en/user-guide/python-connector-example.html#connecting-with-oauth",
"allOf": [
{
"$ref": "#/definitions/OAuthConfiguration"
}
]
},
"authentication_type": {
"title": "Authentication Type",
"description": "The type of authenticator to use when connecting to Snowflake. Supports \"DEFAULT_AUTHENTICATOR\", \"OAUTH_AUTHENTICATOR\", \"EXTERNAL_BROWSER_AUTHENTICATOR\" and \"KEY_PAIR_AUTHENTICATOR\".",
"default": "DEFAULT_AUTHENTICATOR",
"type": "string"
},
"account_id": {
"title": "Account Id",
"description": "Snowflake account identifier. e.g. xy12345, xy12345.us-east-2.aws, xy12345.us-central1.gcp, xy12345.central-us.azure, xy12345.us-west-2.privatelink. Refer [Account Identifiers](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#format-2-legacy-account-locator-in-a-region) for more details.",
"type": "string"
},
"warehouse": {
"title": "Warehouse",
"description": "Snowflake warehouse.",
"type": "string"
},
"role": {
"title": "Role",
"description": "Snowflake role.",
"type": "string"
},
"connect_args": {
"title": "Connect Args",
"description": "Connect args to pass to Snowflake SqlAlchemy driver",
"type": "object"
},
"database": {
"title": "Database",
"description": "The fivetran connector log database.",
"type": "string"
},
"log_schema": {
"title": "Log Schema",
"description": "The fivetran connector log schema.",
"type": "string"
}
},
"required": [
"account_id",
"database",
"log_schema"
],
"additionalProperties": false
},
"FivetranLogConfig": {
"title": "FivetranLogConfig",
"type": "object",
"properties": {
"destination_platform": {
"title": "Destination Platform",
"description": "The destination platform where fivetran connector log tables are dumped.",
"default": "snowflake",
"type": "string"
},
"destination_config": {
"title": "Destination Config",
"description": "If destination platform is 'snowflake', provide snowflake configuration.",
"allOf": [
{
"$ref": "#/definitions/DestinationConfig"
}
]
}
},
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"PlatformDetail": {
"title": "PlatformDetail",
"type": "object",
"properties": {
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to",
"type": "string"
},
"env": {
"title": "Env",
"description": "The environment that all assets produced by DataHub platform ingestion source belong to",
"default": "PROD",
"type": "string"
}
},
"additionalProperties": false
}
}
}
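As a quick pre-flight check, the `required` lists from the schema above can be compared against a recipe's config before running ingestion. The sketch below hard-codes those lists as copied from the schema and uses a hypothetical helper, `missing_required`; it is not a full JSON Schema validator and does not cover the `oauth_config` requirements.

```python
from typing import Any, Dict, List

# Required keys copied from the "required" arrays in the schema above.
REQUIRED_TOP_LEVEL = ["fivetran_log_config"]
REQUIRED_DESTINATION = ["account_id", "database", "log_schema"]


def missing_required(config: Dict[str, Any]) -> List[str]:
    """Return dotted paths of required fields that are absent from config."""
    missing = [key for key in REQUIRED_TOP_LEVEL if key not in config]
    destination = config.get("fivetran_log_config", {}).get("destination_config")
    # destination_config itself is optional; its required keys apply only when it is set.
    if destination is not None:
        missing += [
            f"fivetran_log_config.destination_config.{key}"
            for key in REQUIRED_DESTINATION
            if key not in destination
        ]
    return missing


# Placeholder config that omits log_schema on purpose.
config = {
    "fivetran_log_config": {
        "destination_platform": "snowflake",
        "destination_config": {
            "account_id": "abc48144",
            "database": "fivetran_log_database",
        },
    }
}
print(missing_required(config))
```

An empty list means every field marked as required in the schema is present; it does not guarantee that the values themselves are valid.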
Code Coordinates
- Class Name: datahub.ingestion.source.fivetran.fivetran.FivetranSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Fivetran, feel free to ping us on our Slack.