---
name: data-sync-sql
description: 从业务数据库同步数据到数据仓库,生成全量、增量同步SQL语句
license: MIT
compatibility: opencode
metadata:
  audience: data engineers, ETL developers
  workflow: data synchronization
---

本技能根据业务数据库表结构,生成将数据同步到数据仓库的SQL语句。技能严格遵循以下原则:
- `sync_last_time`
- `sync-sql`目录下

在使用本技能前,请确保以下信息已明确:
- `sync-sql`目录)
- 使用`glob`和`read`工具读取业务表定义文件
- `updated_at`(更新时间)
- `created_at`(创建时间)
- `modified_date`(修改日期)
- `order_date`、`transaction_date`)
- `status`、`is_valid`、`is_active`、`approval_status`等
- `status > 1`)
- `LIMIT :page_size OFFSET :offset`
- `sync_last_time`
- `SELECT COUNT(*) FROM 业务表 WHERE 登账字段 > 1`
- `SELECT COUNT(*) FROM 业务表 WHERE 登账字段 > 1 AND 增量字段 > :sync_last_time`

json
{
"warehouse_table": "数据仓库表名称",
"business_tables": "业务数据表名称,多个用逗号隔开",
"description": "同步任务描述",
"fields": [
{
"name": "字段名",
"source_name": "源字段名",
"type": "字段类型",
"description": "字段中文描述",
"is_incremental": "是否增量字段",
"is_accounting": "是否登账字段"
}
],
"full_sync_sql": "全量同步SQL代码",
"incremental_sync_sql": "增量同步SQL代码",
"count_sql": {
"full_count": "全量统计SQL",
"incremental_count": "增量统计SQL"
},
"parameters": {
"incremental_param": "sync_last_time",
"page_size_param": "page_size",
"offset_param": "offset"
},
"config": {
"accounting_field": "登账字段名",
"accounting_condition": "登账条件",
"incremental_field": "增量字段名"
}
}
- `sync-sql`目录(如果不存在)
- `{业务表名}_sync_{数据仓库表名}.json`
- 使用`write`工具将JSON内容写入文件
- `{业务表名}_sync_{数据仓库表名}.sql`
- 使用`mysql --parse`进行基本语法检查

-- 全量同步:{业务表} -> {数据仓库表}
-- Full-sync page: copies one page of rows that pass the accounting filter
-- from {业务表} into {数据仓库表}.
-- Bind parameters: :page_size (rows per page), :offset (rows to skip).
INSERT INTO {数据仓库表} (
    {字段列表}
)
SELECT
    {字段映射列表}
FROM {业务表}
WHERE {登账字段} > 1
-- Sorting on the primary key gives every page a stable, repeatable boundary.
ORDER BY {主键字段} ASC
LIMIT :page_size OFFSET :offset;
-- Incremental-sync page: {业务表} -> {数据仓库表}.
-- Copies one page of rows that pass the accounting filter and whose
-- incremental column is strictly greater than the last sync watermark.
-- Bind parameters: :sync_last_time (watermark), :page_size, :offset.
-- NOTE(review): this is a plain INSERT; a row modified after an earlier sync
-- would be inserted again — confirm the warehouse table de-duplicates or
-- switch to an upsert if that matters.
INSERT INTO {数据仓库表} (
    {字段列表}
)
SELECT
    {字段映射列表}
FROM {业务表}
WHERE {登账字段} > 1
  AND {增量字段} > :sync_last_time
-- The primary key is the tie-breaker for rows sharing the same incremental value.
-- NOTE(review): rows written during a paged run can still shift OFFSET windows — confirm acceptable.
ORDER BY {增量字段} ASC, {主键字段} ASC
LIMIT :page_size OFFSET :offset;
-- Full count: number of rows in {业务表} that pass the accounting filter,
-- i.e. the rows a full sync will page through.
SELECT
    COUNT(*) AS total_count
FROM {业务表}
WHERE {登账字段} > 1;
-- Incremental count: number of rows in {业务表} that pass the accounting
-- filter and changed after the :sync_last_time watermark.
SELECT
    COUNT(*) AS incremental_count
FROM {业务表}
WHERE {登账字段} > 1
  AND {增量字段} > :sync_last_time;
如果同步需要多表关联,技能支持以下模式:
-- Multi-table join sync (incremental page): orders enriched with customer
-- and product names -> dw_order_details.
-- Bind parameters: :sync_last_time (watermark), :page_size, :offset.
-- Target columns are listed explicitly, in the same order as the SELECT list.
-- NOTE(review): assumes dw_order_details uses these column names — confirm.
-- NOTE(review): only o.updated_at drives the increment; changes made solely in
-- customers or products are not picked up — confirm this is intended.
INSERT INTO dw_order_details (
    order_id,
    order_date,
    customer_name,
    product_name,
    quantity,
    amount
)
SELECT
    o.order_id,
    o.order_date,
    c.customer_name,
    p.product_name,
    o.quantity,
    o.amount
FROM orders AS o
INNER JOIN customers AS c
    ON o.customer_id = c.customer_id
INNER JOIN products AS p
    ON o.product_id = p.product_id
WHERE o.status > 1
  AND o.updated_at > :sync_last_time
-- order_id breaks ties between rows with the same updated_at; without it
-- LIMIT/OFFSET pages may skip or repeat rows.
-- Assumes order_id is unique per order — confirm.
ORDER BY o.updated_at ASC, o.order_id ASC
LIMIT :page_size OFFSET :offset;
- `sync_last_time` - 上次同步时间点
- `page_size` - 每页记录数(建议值:1000)
- `offset` - 分页偏移量
- 登账条件:`> 1`(可根据需求调整)

{
"warehouse_table": "dw_user_info",
"business_tables": "user_info, user_profile",
"description": "用户信息同步到数据仓库",
"fields": [
{
"name": "user_id",
"source_name": "id",
"type": "BIGINT",
"description": "用户ID",
"is_incremental": false,
"is_accounting": false
},
{
"name": "user_name",
"source_name": "name",
"type": "VARCHAR(100)",
"description": "用户姓名",
"is_incremental": false,
"is_accounting": false
},
{
"name": "status",
"source_name": "status",
"type": "TINYINT",
"description": "用户状态",
"is_incremental": false,
"is_accounting": true
},
{
"name": "updated_at",
"source_name": "update_time",
"type": "DATETIME",
"description": "更新时间",
"is_incremental": true,
"is_accounting": false
}
],
"full_sync_sql": "INSERT INTO dw_user_info (user_id, user_name, status, updated_at)\nSELECT \n id as user_id,\n name as user_name,\n status,\n update_time as updated_at\nFROM user_info\nWHERE status > 1\nORDER BY id\nLIMIT :page_size OFFSET :offset;",
"incremental_sync_sql": "INSERT INTO dw_user_info (user_id, user_name, status, updated_at)\nSELECT \n id as user_id,\n name as user_name,\n status,\n update_time as updated_at\nFROM user_info\nWHERE status > 1\n AND update_time > :sync_last_time\nORDER BY update_time ASC, id\nLIMIT :page_size OFFSET :offset;",
"count_sql": {
"full_count": "SELECT COUNT(*) as total_count FROM user_info WHERE status > 1;",
"incremental_count": "SELECT COUNT(*) as incremental_count FROM user_info WHERE status > 1 AND update_time > :sync_last_time;"
},
"parameters": {
"incremental_param": "sync_last_time",
"page_size_param": "page_size",
"offset_param": "offset"
},
"config": {
"accounting_field": "status",
"accounting_condition": "> 1",
"incremental_field": "update_time"
}
}

- `sync_last_time` 作为增量参数名称
- `{登账字段} > 1` 条件

- `glob`:查找业务表定义文件
- `read`:读取文件内容
- `grep`:搜索字段模式和注释
- `write`:保存生成的JSON和SQL文件
- `bash`:测试SQL语法
- `question`:询问用户字段信息和确认