Data Management: RAI Data Streams
This guide explains what RAI data streams are and shows how to manage them: how to create, list, monitor, and delete them.
Introduction
What is a RAI Data Stream?
A RAI data stream synchronizes your data from a Snowflake database to a RAI database. It uses change data capture (CDC) to keep track of data changes and synchronize them with RAI.
You can think of a RAI data stream as a materialized view that connects your Snowflake data with RAI.
A RAI data stream originates from a Snowflake object, which can be a table or a view. Only one RAI data stream can originate from a given Snowflake object.
A RAI data stream is identified by its RAI database link and the fully qualified name of its source Snowflake object, in the form <dblink_database>.<dblink_schema>-<obj_database>.<obj_schema>.<obj_name>.
The target data object of the RAI data stream is a base relation located in a RAI database.
A RAI data stream is associated with and managed by a RAI database link.
Behind the Scenes
A RAI data stream consists of a Snowflake stream and a Snowflake task.
A stream object captures data manipulation language (DML) changes made to tables, such as inserts, updates, and deletes, along with associated metadata. This process is referred to as change data capture (CDC), and it enables actions to be taken on the modified data.
A table stream (or simply a “stream”) creates a “change table” that outlines the row-level changes between two transactional points in time for a table. See Change Tracking Using Table Streams for more details.
Meanwhile, the Snowflake task associated with the RAI data stream handles the data synchronization. Tasks are combined with streams for continuous Extract, Load, Transform (ELT) workflows to process recently changed table rows.
The synchronization between RAI and Snowflake occurs at regular intervals — typically once every minute.
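To make the pattern concrete, here is a minimal, purely illustrative sketch of a stream-plus-task pair in Snowflake. RAI creates and manages its own stream and task objects for you; the names (my_edges_stream, my_edges_task, my_edges_changes, my_warehouse) and the consuming statement below are hypothetical and only illustrate the general mechanism:
-- Hypothetical sketch: a stream records row-level changes on the source table...
CREATE OR REPLACE STREAM my_edges_stream ON TABLE my_sf_edges;

-- ...and a task consumes those changes on a schedule, but only when the stream
-- actually has data. Consuming the stream in a DML statement advances its offset.
CREATE OR REPLACE TASK my_edges_task
  WAREHOUSE = my_warehouse
  SCHEDULE = '1 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('my_edges_stream')
AS
  INSERT INTO my_edges_changes
  SELECT x, y, METADATA$ACTION FROM my_edges_stream;

-- Tasks are created in a suspended state and must be resumed to run.
ALTER TASK my_edges_task RESUME;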
Create a RAI Data Stream
You can create a RAI data stream between RAI and Snowflake using the create_data_stream procedure. The create_data_stream procedure takes a Snowflake data source as input and creates a stream between Snowflake and RAI.
Here is an example that creates a data stream between the my_sf_edges Snowflake table and the my_rai_edges RAI relation in the my_rai_db RAI database:
CREATE OR REPLACE TABLE my_sf_edges(x INT, y INT)
AS SELECT * FROM VALUES
(11, 12), (12, 13), (13, 13), (12, 43), (13, 50);
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db', 'my_rai_edges');
/*+------------------------------------------------+
| { "account": "***", ..., "state": "CREATED" } |
+------------------------------------------------+ */
The name of the RAI data object — a RAI relation — is case-sensitive, unlike the Snowflake objects.
Note that you can also use a version of create_data_stream where you don’t need to specify the RAI relation my_rai_edges. In this case, the RAI relation name is identical to the name of the Snowflake table:
-- equivalent to CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db', 'my_sf_edges');
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db');
/*+------------------------------------------------+
| { "account": "***", ..., "state": "CREATED" } |
+------------------------------------------------+ */
Similarly, you can also omit the RAI database my_rai_db but, in this case, the database is assumed to have been previously set using use_rai_database:
-- equivalent to CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db', 'my_sf_edges');
CALL RAI.use_rai_database('my_rai_db');
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges');
/*+------------------------------------------------+
| { "account": "***", ..., "state": "CREATED" } |
+------------------------------------------------+ */
If change tracking has not been enabled on the source SQL object using ALTER TABLE … SET CHANGE_TRACKING = TRUE, then only the object owner (i.e., the role that has the OWNERSHIP privilege on the object) can create a stream on the object.
Creating a data stream automatically enables change tracking on the stream’s source table or view.
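For example, if you want a non-owner role with the appropriate privileges to create the data stream, the object owner can enable change tracking ahead of time using standard Snowflake DDL, shown here for the example table (the verification step is optional):
-- As the object owner, enable change tracking on the source table.
ALTER TABLE my_sf_db.my_sf_schema.my_sf_edges SET CHANGE_TRACKING = TRUE;

-- Optional check: the change_tracking column in the output should now read ON.
SHOW TABLES LIKE 'my_sf_edges' IN SCHEMA my_sf_db.my_sf_schema;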
In addition to tables, you can also create data streams on views. Here is an example that creates a view my_sf_view on the my_sf_edges table and then creates a new data stream on the view:
CREATE OR REPLACE TABLE my_sf_edges(x INT, y INT)
AS SELECT * FROM VALUES
(11, 12), (12, 13), (13, 13), (12, 43), (13, 50);
-- Create a view of rows with x > 12.
CREATE OR REPLACE VIEW my_sf_view
AS SELECT x, y FROM my_sf_edges
WHERE x > 12;
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_view', 'my_rai_db', 'my_rai_view');
/*+------------------------------------------------+
| { "account": "***", ..., "state": "CREATED" } |
+------------------------------------------------+ */
Once the data stream is created, Snowflake returns a JSON object similar to the following:
{
  "account": "************",
  "createdBy": "************",
  "createdOn": "2023-06-26T11:24:09.710Z",
  "dbLink": "my_sf_db.my_sf_schema",
  "id": "************",
  "integration": "my_integration",
  "name": "my_sf_db.my_sf_schema.my_sf_view",
  "rai": {
    "database": "my_rai_db",
    "relation": "my_rai_view"
  },
  "snowflake": {
    "database": "my_sf_db",
    "object": "my_sf_db.my_sf_schema.my_sf_view",
    "schema": "my_sf_schema"
  },
  "state": "CREATED"
}
A data stream is identified by the fully qualified name of its source table or view, in the form <database>.<schema>.<object>.
Each SQL object can have only one data stream, i.e., creating multiple data streams that point to the same SQL object will result in an error.
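For example, assuming the my_sf_db.my_sf_schema.my_sf_edges data stream created earlier still exists, a second call targeting the same table fails; the relation name another_relation below is hypothetical and used only for illustration:
-- Fails because a data stream already exists for this object
-- (the exact error message may vary).
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db', 'another_relation');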
Monitor a Data Stream
View All Available Data Streams
You can list all available data streams using the list_data_streams function:
SELECT RAI.list_data_streams();
This function returns a JSON array describing the available data streams along with their status:
[
  {
    "account": "************",
    "createdBy": "************",
    "createdOn": "2023-06-26T14:27:19.514Z",
    "dbLink": "my_sf_db.my_sf_schema",
    "id": "************",
    "integration": "my_integration",
    "name": "my_sf_db.my_sf_schema.my_sf_edges",
    "rai": {
      "database": "my_rai_db",
      "relation": "my_rai_edges"
    },
    "snowflake": {
      "database": "my_sf_db",
      "object": "my_sf_db.my_sf_schema.my_sf_edges",
      "schema": "my_sf_schema"
    },
    "state": "CREATED"
  },
  {
    "account": "************",
    "createdBy": "************",
    "createdOn": "2023-06-26T16:22:11.312Z",
    "dbLink": "my_sf_db.my_sf_schema",
    "id": "************",
    "integration": "my_integration",
    "name": "my_sf_db.my_sf_schema.my_sf_view",
    "rai": {
      "database": "my_rai_db",
      "relation": "my_rai_view"
    },
    "snowflake": {
      "database": "my_sf_db",
      "object": "my_sf_db.my_sf_schema.my_sf_view",
      "schema": "my_sf_schema"
    },
    "state": "CREATED"
  }
]
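If you prefer a tabular view of this output, you can flatten the returned array with Snowflake’s FLATTEN table function. The sketch below assumes that the return value of list_data_streams can be passed to FLATTEN as a VARIANT array; if your deployment returns a JSON string instead, wrap the call in PARSE_JSON:
-- Tabulate the name, target RAI relation, and state of each data stream.
SELECT
    f.value:name::string         AS stream_name,
    f.value:rai.relation::string AS rai_relation,
    f.value:state::string        AS state
FROM TABLE(FLATTEN(input => RAI.list_data_streams())) f;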
Get Data Stream Information
You can get information about a specific data stream by calling the get_data_stream function:
SELECT RAI.get_data_stream('my_sf_db.my_sf_schema.my_sf_edges');
This returns the element of the previous JSON array that corresponds to the fully qualified data stream name passed as a parameter.
Get Data Stream Status
You can also check the status of a data stream by calling the get_data_stream_status function and specifying a fully qualified name.
Here is an example that checks the status of the previously created my_sf_db.my_sf_schema.my_sf_edges data stream:
SELECT RAI.get_data_stream_status('my_sf_db.my_sf_schema.my_sf_edges');
The get_data_stream_status function returns a JSON object with information about the specific data stream:
{
  "account": "************",
  "id": "************",
  "name": "my_integration.my_sf_db.my_sf_schema.my_sf_edges",
  "raiLoadEnd": 2686925039110,
  "snowflakeUnloadStart": 2686925028210
}
When a data stream is created, a task is simultaneously initiated in the background to handle the data synchronization. The synchronization occurs periodically, typically once every minute.
In certain cases, right after creating a data stream and checking its status using get_data_stream_status, you may receive a return value of NULL. This usually indicates that the stream is not ready yet, i.e., the data have not been fully synchronized.
In such cases, if you use a procedure such as create_graph, you will receive an error. It’s best to wait for get_data_stream_status to return a non-null value before proceeding with another task, like creating a graph.
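For example, here is a minimal polling sketch in Snowflake Scripting. It assumes that get_data_stream_status returns NULL until the first synchronization has completed; the number of attempts and the wait interval are arbitrary:
EXECUTE IMMEDIATE
$$
DECLARE
  stream_status VARIANT DEFAULT NULL;
BEGIN
  FOR i IN 1 TO 10 DO
    stream_status := (SELECT RAI.get_data_stream_status('my_sf_db.my_sf_schema.my_sf_edges'));
    IF (stream_status IS NOT NULL) THEN
      RETURN 'Data stream is ready.';
    END IF;
    -- Wait 30 seconds before checking again.
    SELECT SYSTEM$WAIT(30);
  END FOR;
  RETURN 'Data stream is still not ready.';
END;
$$;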
View Scheduled Tasks
You can check currently scheduled tasks and their respective schedules by querying the information_schema.task_history table function. The query below shows that there are currently two scheduled tasks, one for the my_sf_edges table and one for the my_sf_view view:
SELECT name, condition_text, scheduled_time FROM
TABLE(information_schema.task_history())
WHERE SCHEMA_NAME = 'RAI'
AND database_name = 'USER_EDUCATION'
AND state != 'SKIPPED'
ORDER BY SCHEDULED_TIME DESC
;
/*+---------------------------------------------------------------------------------------------------------------------------------------------------+
| NAME | CONDITION_TEXT | SCHEDULED_TIME |
+---------------------------------------------------------------------------------------------------------------------------------------------------+
| MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_VIEW_TASK | SYSTEM$STREAM_HAS_DATA('MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_VIEW_STREAM') | 2023-06-26 15:50:30.729 -0700 |
| MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_EDGES_TASK | SYSTEM$STREAM_HAS_DATA('MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_EDGES_STREAM') | 2023-06-26 14:20:25.338 -0700 |
+---------------------------------------------------------------------------------------------------------------------------------------------------+ */
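You can also list the task objects directly with SHOW TASKS. The command below assumes, as in the query above, that the data stream tasks live in the RAI schema of the USER_EDUCATION database:
-- List the tasks that back the data streams (schema and database assumed as above).
SHOW TASKS IN SCHEMA USER_EDUCATION.RAI;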
Delete a Data Stream
You can delete a data stream by calling the delete_data_stream procedure and specifying the fully qualified name of the data stream in the form <database>.<schema>.<object>.
Here is an example that deletes the my_sf_db.my_sf_schema.my_sf_view data stream that was previously created:
-- Delete the my_sf_db.my_sf_schema.my_sf_view data stream.
CALL RAI.delete_data_stream('my_sf_db.my_sf_schema.my_sf_view');
/*+------+
| "ok" |
+------+ */
If you attempt to delete a data stream that does not exist, the return value in Snowflake is null.
You can recreate a previously deleted data stream immediately after executing delete_data_stream.
When you delete a data stream, both the Snowflake SQL object and the RAI relation persist, i.e., they are not deleted. Additionally, it’s advisable to delete any graphs in RAI that have been created from the data stream before deleting the data stream.
Using the RAI CLI
You can also manage data streams through the RAI CLI. You can read more on how to install and set up the RAI CLI in the Install the RAI CLI section of the Quickstart: Admin Guide.
Create a Data Stream
You can create a data stream using the following command:
rai create-snowflake-data-stream \
<integration> \
<database-link> \
<objectName> \
--username <username> \
--password <password> \
--rai-database <rai_database> \
--rai-relation <rai_relation> \
--role <role> \
--warehouse <warehouse>
The table below explains the options for the rai create-snowflake-data-stream command:
Option | Value | Description |
---|---|---|
RAI integration name | <integration> | RAI integration name. It can only contain alphanumeric characters. |
Database link | <database-link> | The fully qualified database link to use. |
Object name | <objectName> | The fully qualified name of the object (i.e., table or view) that will be monitored by the data stream. |
Username | <username> | Your Snowflake username. |
Password | <password> | Your Snowflake password. |
RAI database | <rai_database> | The target RAI database for the data stream. |
RAI relation | <rai_relation> | The target RAI relation for the data stream. |
Role | <role> | The role to use that has the required privileges to create the data stream. |
Warehouse | <warehouse> | The Snowflake warehouse to use to create the data stream. |
Example:
rai create-snowflake-data-stream \
my_integration \
my_sf_db.my_sf_schema \
my_sf_db.my_sf_schema.my_sf_edges \
--rai-database my_rai_db \
--rai-relation my_rai_edges \
--role my_role \
--warehouse my_warehouse
If successful, you will receive a response that shows the data stream information in JSON format:
Create Snowflake data stream 'my_sf_db.my_sf_schema.my_sf_edges' (my_integration) ..
Ok (4.5s)
{
  "account": "******",
  "id": "******",
  "name": "my_sf_db.my_sf_schema.my_sf_edges",
  "integration": "my_integration",
  "dbLink": "my_sf_db.my_sf_schema",
  "createdBy": "******@****",
  "createdOn": "2023-06-26T19:48:55.650Z",
  "state": "CREATED",
  "snowflake": {
    "database": "my_sf_db",
    "schema": "my_sf_schema",
    "object": "my_sf_db.my_sf_schema.my_sf_edges"
  },
  "rai": {
    "database": "my_rai_db",
    "relation": "my_rai_edges"
  }
}
Monitor a Data Stream
You can list all available data streams in a given integration and database link by using the following command:
rai list-snowflake-data-streams \
<integration> \
<database-link>
Here are the options for the list-snowflake-data-streams command:
Option | Value | Description |
---|---|---|
RAI integration name | <integration> | RAI integration name. It can only contain alphanumeric characters. |
Database link | <database-link> | The fully qualified database link to use. |
Example:
rai list-snowflake-data-streams \
my_integration \
my_sf_db.my_sf_schema
If successful, you will receive a response that shows the data stream information in an array in JSON format:
List Snowflake dataStreams linked to my_sf_db.my_sf_schema (my_integration) ..
Ok (2.3s)
[
  {
    "account": "******",
    "id": "******",
    "name": "my_sf_db.my_sf_schema.my_sf_edges",
    "integration": "my_integration",
    "dbLink": "my_sf_db.my_sf_schema",
    "createdBy": "******@****",
    "createdOn": "2023-06-26T19:48:55.650Z",
    "state": "CREATED",
    "snowflake": {
      "database": "my_sf_db",
      "schema": "my_sf_schema",
      "object": "my_sf_db.my_sf_schema.my_sf_edges"
    },
    "rai": {
      "database": "my_rai_db",
      "relation": "my_rai_edges"
    }
  }
]
Similarly, you can get information for a specific data stream using the following command:
rai get-snowflake-data-stream \
<integration> \
<database-link> \
<objectName>
And, finally, you can query the status of a specific data stream using the following command:
rai get-snowflake-data-stream-status \
<integration> \
<database-link> \
<objectName>
Below are the options for the get-snowflake-data-stream and get-snowflake-data-stream-status commands:
Option | Value | Description |
---|---|---|
RAI integration name | <integration> | RAI integration name. It can only contain alphanumeric characters. |
Database link | <database-link> | The fully qualified database link to use. |
Object name | <objectName> | The fully qualified name of the data stream. |
Example:
rai get-snowflake-data-stream-status \
my_integration \
my_sf_db.my_sf_schema \
my_sf_db.my_sf_schema.my_sf_edges
If successful, you will receive a response that shows the data stream status:
Get Snowflake data stream status my_sf_db.my_sf_schema.my_sf_edges (my_integration) ..
{
  "account": "************",
  "id": "************",
  "name": "my_integration.my_sf_db.my_sf_schema.my_sf_edges",
  "raiLoadEnd": 2687825039110,
  "snowflakeUnloadStart": 2687825028210
}
Delete a Data Stream
You can delete a data stream using the following command:
rai delete-snowflake-data-stream \
<integration> \
<database-link> \
<objectName> \
--username <username> \
--password <password> \
--role <role> \
--warehouse <warehouse>
Below are the options for the rai delete-snowflake-data-stream command:
Option | Value | Description |
---|---|---|
RAI integration name | <integration> | RAI integration name. It can only contain alphanumeric characters. |
Database link | <database-link> | The fully qualified database link to use. |
Object name | <objectName> | The fully qualified name of the data stream that will be deleted. |
Username | <username> | Your Snowflake username. |
Password | <password> | Your Snowflake password. |
Role | <role> | The role to use that has the required privileges to delete the data stream. |
Warehouse | <warehouse> | The Snowflake warehouse to use to delete the data stream. |
Example:
rai delete-snowflake-data-stream \
my_integration \
my_sf_db.my_sf_schema \
my_sf_db.my_sf_schema.my_sf_edges \
--role my_role \
--warehouse my_warehouse
If successful, you will receive a response that shows that the data stream was successfully deleted:
Delete Snowflake data stream 'my_sf_db.my_sf_schema.my_sf_edges' (my_integration) ..
Ok (1.5s)