package main import ( "database/sql" "fmt" "testing" _ "github.com/go-sql-driver/mysql" "github.com/google/uuid" ) const ( // 使用 svc-configure.yaml 中的配置 mysqlHost = "69.235.172.218:3306" mysqlUser = "x6_archives" mysqlPassword = "mosdev" mysqlDatabase = "x6_archives" ) // 创建测试表结构 func createTestTables(db *sql.DB) error { // 创建 dic_table 测试表 _, err := db.Exec(` CREATE TABLE IF NOT EXISTS test_dic_table ( table_id VARCHAR(64) NOT NULL PRIMARY KEY, table_type VARCHAR(20) NOT NULL, table_name VARCHAR(100) NOT NULL, description VARCHAR(500) NOT NULL DEFAULT '', creator VARCHAR(32) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, deleted_at TIMESTAMP NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 `) if err != nil { return fmt.Errorf("创建test_dic_table失败: %v", err) } // 创建 dic_table_field 测试表 _, err = db.Exec(` CREATE TABLE IF NOT EXISTS test_dic_table_field ( id VARCHAR(128) NOT NULL PRIMARY KEY, field_id VARCHAR(128) NOT NULL, table_id VARCHAR(64) NOT NULL, filed_type VARCHAR(20) NOT NULL, data_type VARCHAR(20) NOT NULL, field_name VARCHAR(64) NOT NULL, field_name_cn VARCHAR(64) NOT NULL DEFAULT '', description VARCHAR(500) NOT NULL DEFAULT '', creator VARCHAR(32) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, deleted_at TIMESTAMP NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 `) if err != nil { return fmt.Errorf("创建test_dic_table_field失败: %v", err) } return nil } // 清理测试数据 func cleanupTestTables(db *sql.DB) { db.Exec("DELETE FROM test_dic_table_field") db.Exec("DELETE FROM test_dic_table") } // TestTempTableUpsert 测试临时表更新插入操作 func TestTempTableUpsert(t *testing.T) { // 连接数据库 dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?parseTime=true&multiStatements=true", mysqlUser, mysqlPassword, mysqlHost, mysqlDatabase) db, err := sql.Open("mysql", dsn) if err != nil { t.Fatalf("连接数据库失败: %v", err) } defer db.Close() // 测试连接 if err := db.Ping(); err != nil { t.Fatalf("数据库连接失败: %v", err) } // 创建测试表 if err := createTestTables(db); err != nil { t.Fatalf("创建测试表失败: %v", err) } defer cleanupTestTables(db) // 插入一些基础数据到 test_dic_table_field _, err = db.Exec(` INSERT INTO test_dic_table_field (id, field_id, table_id, filed_type, data_type, field_name, field_name_cn, description, creator) VALUES ('table1.id', 'table1.id', 'table1', '实际字段', '数值型', 'id', '主键ID', '原始字段', 'test'), ('table1.name', 'table1.name', 'table1', '实际字段', '字符型', 'name', '名称', '原始名称字段', 'test'), ('table2.code', 'table2.code', 'table2', '实际字段', '字符型', 'code', '编码', '原始编码字段', 'test') `) if err != nil { t.Fatalf("插入基础数据失败: %v", err) } // 开始事务测试 tx, err := db.Begin() if err != nil { t.Fatalf("开始事务失败: %v", err) } defer func() { if err != nil { tx.Rollback() } }() // 创建临时表 tempTableName := "temp_dic_table_field_batch_" + uuid.New().String()[:8] createSQL := fmt.Sprintf(` CREATE TEMPORARY TABLE %s ( id VARCHAR(128) NOT NULL PRIMARY KEY, field_id VARCHAR(128) NOT NULL, table_id VARCHAR(64) NOT NULL, filed_type VARCHAR(20) NOT NULL, data_type VARCHAR(20) NOT NULL, field_name VARCHAR(64) NOT NULL, field_name_cn VARCHAR(64) NOT NULL DEFAULT '', description VARCHAR(500) NOT NULL DEFAULT '', creator VARCHAR(32) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 `, tempTableName) _, err = tx.Exec(createSQL) if err != nil { t.Fatalf("创建临时表失败: %v", err) } // 插入测试数据到临时表 insertSQL := fmt.Sprintf(` INSERT INTO %s (id, field_id, table_id, filed_type, data_type, field_name, field_name_cn, description, creator) VALUES ('table1.id', 'table1.id', 'table1', '实际字段', '数值型', 'id', '更新后的主键ID', '更新描述', 'test'), ('table1.status', 'table1.status', 'table1', '实际字段', '字符型', 'status', '状态', '新增状态字段', 'test') `, tempTableName) _, err = tx.Exec(insertSQL) if err != nil { t.Fatalf("插入临时表数据失败: %v", err) } // 执行UPDATE:存在则更新 t.Run("UPDATE: 存在更新", func(t *testing.T) { query := fmt.Sprintf(` UPDATE test_dic_table_field dtf INNER JOIN %s tt ON dtf.id = tt.id SET dtf.field_name_cn = tt.field_name_cn, dtf.description = tt.description, dtf.updated_at = CURRENT_TIMESTAMP WHERE dtf.deleted_at IS NULL `, tempTableName) result, err := tx.Exec(query) if err != nil { t.Fatalf("UPDATE执行失败: %v", err) } rowsAffected, err := result.RowsAffected() if err != nil { t.Fatalf("获取UPDATE影响行数失败: %v", err) } t.Logf("UPDATE成功,影响行数: %d", rowsAffected) if rowsAffected != 1 { t.Errorf("UPDATE影响行数错误: 期望1, 实际%d", rowsAffected) } }) // 执行INSERT:不存在则插入 t.Run("INSERT: 不存在插入", func(t *testing.T) { query := fmt.Sprintf(` INSERT INTO test_dic_table_field (id, field_id, table_id, filed_type, data_type, field_name, field_name_cn, description, creator) SELECT tt.id, tt.field_id, tt.table_id, tt.filed_type, tt.data_type, tt.field_name, tt.field_name_cn, tt.description, tt.creator FROM %s tt LEFT JOIN test_dic_table_field dtf ON tt.id = dtf.id AND dtf.deleted_at IS NULL WHERE dtf.id IS NULL `, tempTableName) result, err := tx.Exec(query) if err != nil { t.Fatalf("INSERT执行失败: %v", err) } rowsAffected, err := result.RowsAffected() if err != nil { t.Fatalf("获取INSERT影响行数失败: %v", err) } t.Logf("INSERT成功,插入行数: %d", rowsAffected) if rowsAffected != 1 { t.Errorf("INSERT影响行数错误: 期望1, 实际%d", rowsAffected) } }) // 提交事务 err = tx.Commit() if err != nil { t.Logf("提交事务失败: %v", err) } } func main() { // 仅用于直接运行测试 t := &testing.T{} TestTempTableUpsert(t) }