# copy Task

## About
The `copy` task copies tables from one database to another. It can be used to automatically
ingest data from operational databases (e.g. PostgreSQL) into your analytics warehouse.
**Attention**

Copy tasks can only be defined in YAML groups in the `tasks` folder, not directly in `project.yaml`.
## Defining copy Tasks
A `copy` task is defined as follows:

```yaml
# tasks/base.yaml
task_copy:
  type: copy
  source:
    db: from_db
    schema: from_schema
    table: from_table
  destination:
    tmp_schema: staging_schema
    schema: schema
    table: table_name
```
`copy` tasks have the following parameters that need to be set:

- `type`: `copy`.
- `source`: the source details.
    - `db`: the source database.
    - `schema`: the (optional) source schema.
    - `table`: the name of the table to copy.
- `destination`: the destination details.
    - `tmp_schema`: the (optional) staging schema used in the process of copying data.
    - `schema`: the (optional) destination schema.
    - `table`: the name of the table to store data into.
    - `db`: the (optional) destination database.
**Info**

By default the destination is the database defined by `default_db` in `project.yaml`. `db` can be specified to change this, in which case the connection specified needs to:

- Be a credential from the `required_credentials` list in `project.yaml`.
- Be defined in your `settings.yaml`.
- Be one of the supported databases.
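For illustration, here is a hedged sketch of a copy task writing to a non-default database. The task name, the `warehouse_db` credential and the `analytics` schema are hypothetical; `warehouse_db` would need to appear in `required_credentials` in `project.yaml` and be defined in `settings.yaml`:

```yaml
# tasks/base.yaml (sketch; warehouse_db and analytics are hypothetical names)
task_copy_to_warehouse:
  type: copy
  source:
    db: from_db
    table: from_table
  destination:
    db: warehouse_db    # overrides the default_db set in project.yaml
    schema: analytics
    table: from_table
```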
The tables specified in `destination` and `source` are affected by prefixes, suffixes and overrides as
described in database objects. Note that these only apply to tables in the `default_db`
(typically the destination in extraction tasks and the source in reverse ETL tasks).
By default, tables are copied in full every time SAYN runs, replacing the destination table with the newly pulled data. This behaviour can be altered with the following:

- `incremental_key`: the column used to determine which data is new. The process will transfer any data in the source table with an `incremental_key` value greater than or equal to the maximum found in the destination, or with a `NULL` value.
- `delete_key`: the column used for deleting data in incremental loads. Before inserting, the process will delete any data in the destination table whose `delete_key` value is present in the newly obtained dataset.
- `append`: a boolean flag indicating whether data should be appended rather than replaced in the destination. In full load mode (`incremental_key` not specified) records will be appended rather than the table being recreated every time; in incremental mode records will not be removed, so `delete_key` shouldn't be specified. Additionally, an extra column `_sayn_load_ts` will be added to the destination table to help with de-duplication.
```yaml
# tasks/base.yaml
task_copy:
  type: copy
  source:
    db: from_db
    schema: from_schema
    table: from_table
  destination:
    tmp_schema: staging_schema
    schema: schema
    table: table_name
  incremental_key: updated_at
  delete_key: id
```
In this example, we use `updated_at`, a field updated every time a record changes (or is created)
on a hypothetical backend database, to select new records; we then replace all records in the target
whose `id` appears in this new dataset.
```yaml
# tasks/base.yaml
task_copy:
  type: copy
  source:
    db: from_db
    schema: from_schema
    table: from_table
  destination:
    tmp_schema: staging_schema
    schema: schema
    table: table_name
  incremental_key: updated_at
  append: True
```
In this other example, whenever the task runs it checks the latest value of `updated_at` and appends to the
destination table every record in the source with an `updated_at` greater than or equal to the maximum value
present in the destination.
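For completeness, here is a minimal sketch of the remaining mode described above: full load with `append: True` and no `incremental_key`, where every run appends the complete source table and the automatically added `_sayn_load_ts` column distinguishes one load from another. The task name is hypothetical:

```yaml
# tasks/base.yaml (sketch of full-load append mode; task name is hypothetical)
task_copy_snapshots:
  type: copy
  source:
    db: from_db
    schema: from_schema
    table: from_table
  destination:
    tmp_schema: staging_schema
    schema: schema
    table: table_name
  append: True    # full copy appended on every run; _sayn_load_ts identifies each load
```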
While the task is running, SAYN will get records from the source database and load them into a temporary table,
and will merge into the destination table once all records have been loaded. The frequency of loading
into this temporary table is determined by the value of `max_batch_rows` as defined in the credentials for the
destination database, which defaults to 50000. However, this behaviour can be changed with 2 properties:

- `max_batch_rows`: allows you to overwrite the value specified in the credential for this task only.
- `max_merge_rows`: changes the behaviour so that, instead of merging into the destination table once all rows have been loaded, SAYN will merge after this number of records have been loaded and then repeat the whole process. The advantage of this parameter is that for copies that take a long time, an error (e.g. losing the connection to the source) wouldn't force the process to start again from the beginning. See the sketch after the warning below.
**Warning**

When using `max_merge_rows` SAYN will loop through the load and merge process until the number
of records loaded is lower than the value of `max_merge_rows`. In order to avoid infinite loops, the
process will also stop after a maximum of 100 iterations. To avoid issues, `max_merge_rows` should be
set to a very large value (larger than `max_batch_rows`).
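As a hedged sketch, a task overriding both properties might look as follows; the task name and the specific values are illustrative only:

```yaml
# tasks/base.yaml (sketch; task name and values are illustrative)
task_copy_large:
  type: copy
  source:
    db: from_db
    schema: from_schema
    table: from_table
  destination:
    tmp_schema: staging_schema
    schema: schema
    table: table_name
  incremental_key: updated_at
  max_batch_rows: 100000     # overrides the credential value for this task only
  max_merge_rows: 1000000    # merge into the destination every ~1M records, then repeat
```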
## Data types and columns
`copy` tasks accept a `columns` field in the task definition in the same way that `autosql` does. With this
specification, we can override the default behaviour of `copy` when it comes to column types by enforcing
specific column types in the final table:
```yaml
# tasks/base.yaml
task_copy:
  type: copy
  source:
    db: from_db
    schema: from_schema
    table: from_table
  destination:
    tmp_schema: staging_schema
    schema: schema
    table: table_name
  incremental_key: updated_at
  delete_key: id
  columns:
    - id
    - name: updated_at
      type: timestamp
```
In this example we define 2 columns for `task_copy`: `id` and `updated_at`. This will make SAYN:

1. Copy only those 2 columns, disregarding any other columns present at source.
2. Infer the type of `id` based on the type of that column at source.
3. Enforce the destination table type for `updated_at` to be `TIMESTAMP`.
An additional property `dst_name` in `columns` is also supported. Specifying this property will
change the name of the column in the destination table. When using this property, `delete_key`
and `incremental_key` need to reference this new name.
```yaml
# tasks/base.yaml
task_copy:
  type: copy
  source:
    db: from_db
    schema: from_schema
    table: from_table
  destination:
    tmp_schema: staging_schema
    schema: schema
    table: table_name
  incremental_key: updated_ts
  delete_key: id
  columns:
    - id
    - name: updated_at
      dst_name: updated_ts
```
In this example, the `updated_at` column at source will be called `updated_ts` on the target.
Note that `incremental_key` uses the name on the target.

Additionally, in the `ddl` property we can specify indexes and permissions like in `autosql`.
Note that some databases support specific DDL beyond these.
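As a rough sketch of that `ddl` property, mirroring the `autosql` style; the index name and the `reporting_role` grantee are hypothetical, and the exact shape may vary by database:

```yaml
# tasks/base.yaml (sketch; index and role names are hypothetical)
task_copy:
  type: copy
  source:
    db: from_db
    schema: from_schema
    table: from_table
  destination:
    tmp_schema: staging_schema
    schema: schema
    table: table_name
  ddl:
    indexes:
      updated_at_idx:        # hypothetical index on the destination table
        columns:
          - updated_at
    permissions:
      reporting_role: SELECT # grant read access after the table is created
```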