Destination
A destination is a location in which dlt creates and maintains the current version of the schema and loads your data. Destinations come in various forms: databases, data lakes, vector stores, or files. dlt handles this variety via modules which you declare when creating a pipeline.
We maintain a set of built-in destinations that you can use right away.
Declare the destination type
We recommend that you declare the destination type when creating a pipeline instance with dlt.pipeline. This allows the run method to synchronize your local pipeline state with the destination, and the extract and normalize methods to create compatible load packages and schemas. You can also pass the destination to the run and load methods.
- Use destination shorthand type
import dlt
pipeline = dlt.pipeline("pipeline", destination="filesystem")
Above we use the built-in filesystem destination. You can use shorthand types only for built-ins.
- Use full destination factory type
import dlt
pipeline = dlt.pipeline("pipeline", destination="dlt.destinations.filesystem")
Above we use the built-in filesystem destination by providing the factory type filesystem from the module dlt.destinations. You can pass destinations from external modules as well.
- Import destination factory
import dlt
from dlt.destinations import filesystem
pipeline = dlt.pipeline("pipeline", destination=filesystem)
Above we import the destination factory for filesystem and pass it to the pipeline.
All examples above are equivalent: they create the same destination class with default parameters and pull the required config and secret values from configuration.
Pass explicit parameters and a name to a destination
You can instantiate the destination factory yourself to configure it explicitly. When doing this, you work with destinations the same way you work with sources:
import dlt
from dlt.destinations import filesystem
azure_bucket = filesystem("az://dlt-azure-bucket", destination_name="production_az_bucket")
pipeline = dlt.pipeline("pipeline", destination=azure_bucket)
Above we import and instantiate the filesystem destination factory. We pass the explicit URL of the bucket and name the destination production_az_bucket.
If a destination is not named, its shorthand type (the Python factory name) serves as the destination name. Name your destination explicitly if you need several separate configurations of destinations of the same type (e.g. you wish to maintain credentials for development, staging and production storage buckets in the same config file). The destination name is also stored in the load info and pipeline traces, so name your destinations explicitly when you need names more descriptive than, for example, filesystem.
Configure a destination
We recommend passing the credentials and other required parameters to configuration via TOML files, environment variables or other config providers. This allows you, for example, to easily switch to production destinations after deployment.
We recommend using the default config section layout as below:
[destination.filesystem]
bucket_url="az://dlt-azure-bucket"
[destination.filesystem.credentials]
azure_storage_account_name="dltdata"
azure_storage_account_key="storage key"
or via environment variables:
DESTINATION__FILESYSTEM__BUCKET_URL=az://dlt-azure-bucket
DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME=dltdata
DESTINATION__FILESYSTEM__CREDENTIALS__AZURE_STORAGE_ACCOUNT_KEY="storage key"
For named destinations, use their names in the config section:
[destination.production_az_bucket]
bucket_url="az://dlt-azure-bucket"
[destination.production_az_bucket.credentials]
azure_storage_account_name="dltdata"
azure_storage_account_key="storage key"
Note that when you use the dlt init command to create or add a data source, dlt creates a sample configuration for the selected destination.
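The TOML sections and the environment variable names shown in this section follow the same layout: the section path is upper-cased and its parts are joined with double underscores. The sketch below reproduces that mapping for the examples above. The helper name toml_path_to_env is hypothetical and is not part of dlt; it only illustrates the naming pattern visible in the examples.

```python
def toml_path_to_env(dotted_path: str) -> str:
    """Turn a dotted config path into the matching environment variable name.

    Hypothetical helper for illustration only, not a dlt function.
    """
    # Split "destination.filesystem.bucket_url" into its parts,
    # upper-case each part and join them with a double underscore.
    return "__".join(part.upper() for part in dotted_path.split("."))


# Default layout: the section is named after the destination type.
default_key = toml_path_to_env("destination.filesystem.bucket_url")
# Named destination: the section is named after destination_name.
named_key = toml_path_to_env(
    "destination.production_az_bucket.credentials.azure_storage_account_key"
)

print(default_key)
print(named_key)
```

Under this pattern, a destination named production_az_bucket would read its bucket URL from DESTINATION__PRODUCTION_AZ_BUCKET__BUCKET_URL.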
Pass explicit credentials
You can pass credentials explicitly when creating a destination factory instance. This replaces the credentials argument in the dlt.pipeline and pipeline.load methods, which is now deprecated. You can pass the required credentials object, its dictionary representation, or the supported native form as shown below:
import dlt
from dlt.destinations import postgres
# pass full credentials - together with the password (not recommended)
pipeline = dlt.pipeline(
"pipeline",
destination=postgres(credentials="postgresql://loader:loader@localhost:5432/dlt_data"),
)
You can create and pass partial credentials and dlt will fill in the missing data. Below we pass a postgres connection string without the password and expect that it will be present in environment variables (or any other config provider):
import dlt
from dlt.destinations import postgres
# pass credentials without password
# dlt will retrieve the password from, for example, DESTINATION__POSTGRES__CREDENTIALS__PASSWORD
prod_postgres = postgres(credentials="postgresql://loader@localhost:5432/dlt_data")
pipeline = dlt.pipeline("pipeline", destination=prod_postgres)
import dlt
from dlt.destinations import filesystem
from dlt.sources.credentials import AzureCredentials
credentials = AzureCredentials()
# fill only the account name, leave key to be taken from secrets
credentials.azure_storage_account_name = "production_storage"
pipeline = dlt.pipeline(
"pipeline", destination=filesystem("az://dlt-azure-bucket", credentials=credentials)
)
Please read how to use the various built-in credentials types.
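To make the idea of partial credentials concrete, here is a minimal standard-library sketch of filling a missing password in a connection string from an environment variable. It is not how dlt resolves credentials internally. The helper fill_password is hypothetical, and the environment variable is set in the code only so that the example is self-contained.

```python
import os
from urllib.parse import quote, urlsplit


def fill_password(conn_str: str, env_key: str) -> str:
    """Return conn_str with the password taken from os.environ[env_key] if it is missing.

    Hypothetical helper for illustration only, not a dlt function.
    """
    parts = urlsplit(conn_str)
    if parts.password is not None:
        return conn_str  # the password was passed explicitly, nothing to fill
    # Raises KeyError when the secret is not configured.
    password = quote(os.environ[env_key], safe="")
    host = parts.hostname
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    netloc = f"{parts.username}:{password}@{host}"
    return parts._replace(netloc=netloc).geturl()


# Set here only to keep the example self-contained; normally this comes from the environment.
os.environ["DESTINATION__POSTGRES__CREDENTIALS__PASSWORD"] = "loader"

full = fill_password(
    "postgresql://loader@localhost:5432/dlt_data",
    "DESTINATION__POSTGRES__CREDENTIALS__PASSWORD",
)
print(full)
```

The explicit part (user, host, port, database) stays in code, while the secret is supplied by the environment at run time.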
Inspect destination capabilities
Destination capabilities tell dlt what a given destination can and cannot do. For example, they tell which file formats it can load and what the maximum query or identifier length is. Inspect destination capabilities as follows:
import dlt
pipeline = dlt.pipeline("snowflake_test", destination="snowflake")
print(dict(pipeline.destination.capabilities()))
Pass additional parameters and change destination capabilities
The destination factory accepts additional parameters that pre-configure it and change destination capabilities:
import dlt
duck_ = dlt.destinations.duckdb(naming_convention="duck_case", recommended_file_size=120000)
print(dict(duck_.capabilities()))
The example above overrides naming_convention and recommended_file_size in the destination capabilities.
Configure multiple destinations in a pipeline
To configure multiple destinations within a pipeline, you need to provide the credentials for each destination in the secrets.toml file. This example demonstrates how to configure a BigQuery destination named destination_one:
[destination.destination_one]
location = "US"
[destination.destination_one.credentials]
project_id = "please set me up!"
private_key = "please set me up!"
client_email = "please set me up!"
You can then use this destination in your pipeline as follows:
import dlt
from dlt.common.destination import Destination
# Configure the pipeline to use the "destination_one" BigQuery destination
pipeline = dlt.pipeline(
pipeline_name='pipeline',
destination=Destination.from_reference(
"bigquery",
destination_name="destination_one"
),
dataset_name='dataset_name'
)
Similarly, you can assign multiple destinations to the same or different drivers.
Access a destination
When loading data, dlt will access the destination in two cases:
- At the beginning of the run method, to sync the pipeline state with the destination (or if you call pipeline.sync_destination explicitly).
- In the pipeline.load method, to migrate the schema and load the load package.
Obviously, dlt will access the destination when you instantiate sql_client.
dlt will not import the destination dependencies or access destination configuration if access is not needed. You can build multi-stage pipelines where steps are executed in separate processes or containers - the extract and normalize steps do not need destination dependencies, configuration or an actual connection.
import dlt
from dlt.destinations import filesystem
# just declare the destination.
pipeline = dlt.pipeline("pipeline", destination="filesystem")
# no destination credentials nor config needed to extract
pipeline.extract(["a", "b", "c"], table_name="letters")
# same for normalize
pipeline.normalize()
# here dependencies will be imported, secrets pulled and destination accessed
# we pass bucket_url explicitly and expect credentials passed by config provider
bucket_url = "az://dlt-azure-bucket"  # placeholder bucket URL, replace with your own
load_info = pipeline.load(destination=filesystem(bucket_url=bucket_url))
print(load_info)
Control how dlt creates table, column and other identifiers
dlt maps identifiers found in the source data into destination identifiers (i.e. table and column names) using naming conventions which ensure that the character set, identifier length and other properties fit into what a given destination can handle. For example, our default naming convention (snake case) converts all names in the source (e.g. JSON document fields) into snake case, case insensitive identifiers.
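As a rough illustration of what a snake case naming convention does, the sketch below converts source names into lower-case identifiers with underscores. It is a simplified stand-in written for this example, not dlt's snake case implementation, which also handles identifier length and other destination properties and may produce different results on edge-case inputs. The function name to_snake_case is hypothetical.

```python
import re


def to_snake_case(name: str) -> str:
    """Simplified snake case conversion, for illustration only (not dlt's implementation)."""
    # Insert an underscore at lower-case/digit to upper-case boundaries: "userID" -> "user_ID".
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Replace every run of characters outside [A-Za-z0-9] with a single underscore.
    name = re.sub(r"[^A-Za-z0-9]+", "_", name)
    # Drop leading/trailing underscores and fold to lower case.
    return name.strip("_").lower()


for source_name in ["CamelCase", "userID", "order-total", "created at"]:
    print(source_name, "->", to_snake_case(source_name))
```

Because the result is always lower case, two source names that differ only in casing map to the same identifier, which is why such a convention is described as case insensitive.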
Each destination declares its preferred naming convention, support for case sensitive identifiers and case folding function that case insensitive identifiers follow. For example:
- Redshift - by default does not support case sensitive identifiers and converts all of them to lower case.
- Snowflake - supports case sensitive identifiers and considers upper cased identifiers as case insensitive (which is the default case folding).
- DuckDB - does not support case sensitive identifiers but does not case fold them, so it preserves the original casing in the information schema.
- Athena - does not support case sensitive identifiers and converts all of them to lower case.
- BigQuery - all identifiers are case sensitive; there is no case insensitive mode available via case folding (but it can be enabled at the dataset level).
You can change the naming convention in many different ways. Below we set the preferred naming convention on the Snowflake destination to sql_cs_v1 to switch Snowflake to case sensitive mode:
import dlt
snow_ = dlt.destinations.snowflake(naming_convention="sql_cs_v1")
Setting the naming convention will impact all new schemas being created (i.e. on the first pipeline run) and will re-normalize all existing identifiers.
dlt prevents re-normalization of identifiers in tables that were already created at the destination. Use refresh mode to drop the data. You can also disable this behavior via configuration.
Destinations that support case sensitive identifiers but use case folding convention to enable case insensitive identifiers are configured in case insensitive mode by default. Examples: Postgres, Snowflake, Oracle.
If you use a case sensitive naming convention with a case insensitive destination, dlt will:
- Fail the load if it detects an identifier collision due to case folding.
- Warn if any case folding is applied by the destination.
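The two checks above can be pictured with a small standard-library sketch: fold every identifier the way the destination would, report names that end up identical (a collision), and list names that were changed by folding (a warning). This is an illustration under simplified assumptions, not dlt's implementation; the function names are hypothetical, and str.lower and str.upper stand in for a destination's case folding function.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List


def find_collisions(
    identifiers: Iterable[str], casefold: Callable[[str], str] = str.lower
) -> Dict[str, List[str]]:
    """Group identifiers by their folded form and keep groups with more than one member."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in identifiers:
        groups[casefold(name)].append(name)
    return {folded: names for folded, names in groups.items() if len(names) > 1}


def folded_names(
    identifiers: Iterable[str], casefold: Callable[[str], str] = str.lower
) -> List[str]:
    """Return identifiers whose casing would be changed by the destination."""
    return [name for name in identifiers if casefold(name) != name]


columns = ["UserId", "userid", "created_at"]

# Lower-case folding: "UserId" and "userid" collide.
collisions = find_collisions(columns)
# Only "UserId" is changed by lower-case folding.
changed = folded_names(columns)
# Upper-case folding finds the same collision under a different key.
upper_collisions = find_collisions(columns, casefold=str.upper)

print(collisions)
print(changed)
print(upper_collisions)
```

A non-empty collisions result corresponds to the failure case, and a non-empty changed list corresponds to the warning case.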
Enable case sensitive identifiers support
Selected destinations may be configured so they start accepting case sensitive identifiers. For example, it is possible to set a case sensitive collation on an mssql database and then tell dlt about it.
from dlt.destinations import mssql
dest_ = mssql(has_case_sensitive_identifiers=True, naming_convention="sql_cs_v1")
Above we can safely use a case sensitive naming convention without worrying about name collisions.
You can also configure case sensitivity in the configuration file as shown below, but configuring other destination capabilities this way is not currently supported.
[destination.mssql]
has_case_sensitive_identifiers=true
In most cases, setting the flag above just indicates to dlt that you switched on the case sensitive option on a destination. dlt will not do that for you. Refer to the destination documentation for details.
Create new destination
You have two ways to implement a new destination:
- You can use the @dlt.destination decorator and implement a sink function. This is a perfect way to implement reverse ETL destinations that push data back to REST APIs.
- You can implement a full destination where you have full control over load jobs and schema migration.