Ingen beskrivning
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

batch_save_dic_tables.go 12KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package dicmanagement
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "git.x2erp.com/qdy/go-base/ctx"
  7. "git.x2erp.com/qdy/go-base/logger"
  8. "git.x2erp.com/qdy/go-base/model/response"
  9. "git.x2erp.com/qdy/go-base/util"
  10. "git.x2erp.com/qdy/go-db/factory/database"
  11. "github.com/google/uuid"
  12. "github.com/jmoiron/sqlx"
  13. )
  14. // BatchSaveDicTables 批量保存数据库表字典
  15. func BatchSaveDicTables(req *BatchSaveDicTablesRequest, ctx context.Context, dbFactory *database.DBFactory, reqCtx *ctx.RequestContext) *response.QueryResult[bool] {
  16. logger.Debug("BatchSaveDicTables-开始批量保存数据库表字典")
  17. // 参数验证
  18. if err := validateBatchSaveRequest(req); err != nil {
  19. logger.ErrorC(reqCtx, fmt.Sprintf("参数验证失败: %v", err))
  20. return util.CreateErrorResult[bool](fmt.Sprintf("参数验证失败: %v", err), reqCtx)
  21. }
  22. // 1.1: 从字段字典表中收集所有表名称,然后跟表集合中对比
  23. if err := validateTableReferences(req); err != nil {
  24. logger.ErrorC(reqCtx, fmt.Sprintf("表引用验证失败: %v", err))
  25. return util.CreateErrorResult[bool](fmt.Sprintf("表引用验证失败: %v", err), reqCtx)
  26. }
  27. // 获取数据库连接并开始事务
  28. db := dbFactory.GetDB()
  29. tx, err := db.BeginTxx(ctx, nil)
  30. if err != nil {
  31. logger.ErrorC(reqCtx, fmt.Sprintf("开始事务失败: %v", err))
  32. return util.CreateErrorResult[bool](fmt.Sprintf("开始事务失败: %v", err), reqCtx)
  33. }
  34. defer func() {
  35. if p := recover(); p != nil {
  36. tx.Rollback()
  37. panic(p)
  38. }
  39. }()
  40. // 获取当前用户
  41. creator := reqCtx.UserID
  42. if creator == "" {
  43. creator = "system"
  44. }
  45. // 2. 首先把这两个集合插入到内存表(数据库是mysql)
  46. // 使用临时表存储批量数据
  47. if err := batchProcessWithTempTables(ctx, tx, req, creator); err != nil {
  48. tx.Rollback()
  49. logger.ErrorC(reqCtx, fmt.Sprintf("批量处理失败: %v", err))
  50. return util.CreateErrorResult[bool](fmt.Sprintf("批量处理失败: %v", err), reqCtx)
  51. }
  52. // 提交事务
  53. if err := tx.Commit(); err != nil {
  54. logger.ErrorC(reqCtx, fmt.Sprintf("提交事务失败: %v", err))
  55. return util.CreateErrorResult[bool](fmt.Sprintf("提交事务失败: %v", err), reqCtx)
  56. }
  57. logger.Debug(fmt.Sprintf("成功批量保存数据库表字典: %d 个表, %d 个字段", len(req.Tables), len(req.Fields)))
  58. return util.CreateSuccessResultData[bool](true, reqCtx)
  59. }
  60. // validateBatchSaveRequest 验证批量保存请求
  61. func validateBatchSaveRequest(req *BatchSaveDicTablesRequest) error {
  62. if len(req.Tables) == 0 {
  63. return fmt.Errorf("表集合不能为空")
  64. }
  65. if len(req.Fields) == 0 {
  66. return fmt.Errorf("字段集合不能为空")
  67. }
  68. // 验证表集合
  69. tableIDs := make(map[string]bool)
  70. for i, table := range req.Tables {
  71. if table.TableID == "" {
  72. return fmt.Errorf("第%d个表的表ID不能为空", i+1)
  73. }
  74. if table.TableType == "" {
  75. return fmt.Errorf("第%d个表的表类型不能为空", i+1)
  76. }
  77. if table.Name == "" {
  78. return fmt.Errorf("第%d个表的表名称不能为空", i+1)
  79. }
  80. tableIDs[table.TableID] = true
  81. }
  82. // 验证字段集合
  83. for i, field := range req.Fields {
  84. if field.FieldID == "" {
  85. return fmt.Errorf("第%d个字段的字段ID不能为空", i+1)
  86. }
  87. if field.TableID == "" {
  88. return fmt.Errorf("第%d个字段的表ID不能为空", i+1)
  89. }
  90. if field.FieldName == "" {
  91. return fmt.Errorf("第%d个字段的字段名称不能为空", i+1)
  92. }
  93. if field.FiledType == "" {
  94. return fmt.Errorf("第%d个字段的字段类型不能为空", i+1)
  95. }
  96. if field.DataType == "" {
  97. return fmt.Errorf("第%d个字段的数据类型不能为空", i+1)
  98. }
  99. // 验证字段ID格式:table_id + "." + field_name
  100. expectedFieldID := field.TableID + "." + field.FieldName
  101. if field.FieldID != expectedFieldID {
  102. return fmt.Errorf("第%d个字段的字段ID格式错误:期望 '%s',实际 '%s'", i+1, expectedFieldID, field.FieldID)
  103. }
  104. }
  105. return nil
  106. }
  107. // validateTableReferences 验证表引用
  108. func validateTableReferences(req *BatchSaveDicTablesRequest) error {
  109. // 收集表集合中的所有表ID
  110. tableIDSet := make(map[string]bool)
  111. for _, table := range req.Tables {
  112. tableIDSet[table.TableID] = true
  113. }
  114. // 检查字段集合中的所有表ID是否都在表集合中存在
  115. for i, field := range req.Fields {
  116. if !tableIDSet[field.TableID] {
  117. return fmt.Errorf("第%d个字段引用的表ID '%s' 不在表集合中", i+1, field.TableID)
  118. }
  119. }
  120. return nil
  121. }
  122. // batchProcessWithTempTables 使用临时表批量处理数据
  123. func batchProcessWithTempTables(ctx context.Context, tx *sqlx.Tx, req *BatchSaveDicTablesRequest, creator string) error {
  124. // 创建临时表存储批量表数据
  125. tempTablesTable := "temp_dic_table_batch_" + uuid.New().String()[:8]
  126. if err := createTempTables(ctx, tx, tempTablesTable); err != nil {
  127. return fmt.Errorf("创建临时表失败: %w", err)
  128. }
  129. defer cleanupTempTable(ctx, tx, tempTablesTable)
  130. // 批量插入表数据到临时表
  131. if err := batchInsertTablesToTemp(ctx, tx, tempTablesTable, req.Tables, creator); err != nil {
  132. return fmt.Errorf("批量插入表数据到临时表失败: %w", err)
  133. }
  134. // 3. 跟dic_table.go对比,在dic_table.go中存在的,使用内存表更新;在dic_table.go不存在的插入
  135. if err := upsertTablesFromTemp(ctx, tx, tempTablesTable); err != nil {
  136. return fmt.Errorf("更新/插入表失败: %w", err)
  137. }
  138. // 创建临时表存储批量字段数据
  139. tempFieldsTable := "temp_dic_table_field_batch_" + uuid.New().String()[:8]
  140. if err := createTempFieldsTable(ctx, tx, tempFieldsTable); err != nil {
  141. return fmt.Errorf("创建临时字段表失败: %w", err)
  142. }
  143. defer cleanupTempTable(ctx, tx, tempFieldsTable)
  144. // 批量插入字段数据到临时表
  145. if err := batchInsertFieldsToTemp(ctx, tx, tempFieldsTable, req.Fields, creator); err != nil {
  146. return fmt.Errorf("批量插入字段数据到临时表失败: %w", err)
  147. }
  148. // 4. dic_table_field.go 字段字典处理:删除相关表的所有字段,然后批量插入
  149. if err := upsertFieldsFromTemp(ctx, tx, tempFieldsTable); err != nil {
  150. return fmt.Errorf("处理字段失败: %w", err)
  151. }
  152. return nil
  153. }
  154. // createTempTables 创建临时表存储批量表数据
  155. func createTempTables(ctx context.Context, tx *sqlx.Tx, tableName string) error {
  156. createSQL := fmt.Sprintf(`
  157. CREATE TEMPORARY TABLE %s (
  158. table_id VARCHAR(64) NOT NULL PRIMARY KEY,
  159. table_type VARCHAR(20) NOT NULL,
  160. table_name VARCHAR(100) NOT NULL,
  161. description VARCHAR(500) NOT NULL DEFAULT '',
  162. creator VARCHAR(32) NOT NULL
  163. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  164. `, tableName)
  165. _, err := tx.ExecContext(ctx, createSQL)
  166. return err
  167. }
  168. // createTempFieldsTable 创建临时字段表
  169. func createTempFieldsTable(ctx context.Context, tx *sqlx.Tx, tableName string) error {
  170. createSQL := fmt.Sprintf(`
  171. CREATE TEMPORARY TABLE %s (
  172. id VARCHAR(128) NOT NULL PRIMARY KEY,
  173. field_id VARCHAR(128) NOT NULL,
  174. table_id VARCHAR(64) NOT NULL,
  175. filed_type VARCHAR(20) NOT NULL,
  176. data_type VARCHAR(20) NOT NULL,
  177. field_name VARCHAR(64) NOT NULL,
  178. field_name_cn VARCHAR(64) NOT NULL DEFAULT '',
  179. description VARCHAR(500) NOT NULL DEFAULT '',
  180. creator VARCHAR(32) NOT NULL
  181. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
  182. `, tableName)
  183. _, err := tx.ExecContext(ctx, createSQL)
  184. return err
  185. }
  186. // cleanupTempTable 清理临时表
  187. func cleanupTempTable(ctx context.Context, tx *sqlx.Tx, tableName string) {
  188. dropSQL := fmt.Sprintf("DROP TEMPORARY TABLE IF EXISTS %s", tableName)
  189. tx.ExecContext(ctx, dropSQL)
  190. }
  191. // batchInsertTablesToTemp 批量插入表数据到临时表
  192. func batchInsertTablesToTemp(ctx context.Context, tx *sqlx.Tx, tempTableName string, tables []DicTableRequest, creator string) error {
  193. if len(tables) == 0 {
  194. return nil
  195. }
  196. query := fmt.Sprintf("INSERT INTO %s (table_id, table_type, table_name, description, creator) VALUES ", tempTableName)
  197. valueStrings := make([]string, 0, len(tables))
  198. valueArgs := make([]interface{}, 0, len(tables)*5)
  199. for _, table := range tables {
  200. valueStrings = append(valueStrings, "(?, ?, ?, ?, ?)")
  201. valueArgs = append(valueArgs, table.TableID)
  202. valueArgs = append(valueArgs, table.TableType)
  203. valueArgs = append(valueArgs, table.Name)
  204. valueArgs = append(valueArgs, table.Description)
  205. valueArgs = append(valueArgs, creator)
  206. }
  207. fullQuery := query + strings.Join(valueStrings, ", ")
  208. _, err := tx.ExecContext(ctx, fullQuery, valueArgs...)
  209. return err
  210. }
  211. // batchInsertFieldsToTemp 批量插入字段数据到临时表
  212. func batchInsertFieldsToTemp(ctx context.Context, tx *sqlx.Tx, tempTableName string, fields []DicTableFieldRequest, creator string) error {
  213. if len(fields) == 0 {
  214. return nil
  215. }
  216. query := fmt.Sprintf("INSERT INTO %s (id, field_id, table_id, filed_type, data_type, field_name, field_name_cn, description, creator) VALUES ", tempTableName)
  217. valueStrings := make([]string, 0, len(fields))
  218. valueArgs := make([]interface{}, 0, len(fields)*9)
  219. for _, field := range fields {
  220. valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?)")
  221. valueArgs = append(valueArgs, field.FieldID) // id字段
  222. valueArgs = append(valueArgs, field.FieldID) // field_id字段
  223. valueArgs = append(valueArgs, field.TableID)
  224. valueArgs = append(valueArgs, field.FiledType)
  225. valueArgs = append(valueArgs, field.DataType)
  226. valueArgs = append(valueArgs, field.FieldName)
  227. valueArgs = append(valueArgs, field.FieldNameCN)
  228. valueArgs = append(valueArgs, field.Description)
  229. valueArgs = append(valueArgs, creator)
  230. }
  231. fullQuery := query + strings.Join(valueStrings, ", ")
  232. _, err := tx.ExecContext(ctx, fullQuery, valueArgs...)
  233. return err
  234. }
  235. // upsertTablesFromTemp 从临时表更新或插入表数据
  236. func upsertTablesFromTemp(ctx context.Context, tx *sqlx.Tx, tempTableName string) error {
  237. // 更新已存在的记录
  238. updateSQL := fmt.Sprintf(`
  239. UPDATE dic_table dt
  240. JOIN %s tt ON dt.table_id = tt.table_id AND dt.deleted_at IS NULL
  241. SET dt.table_type = tt.table_type,
  242. dt.table_name = tt.table_name,
  243. dt.description = tt.description,
  244. dt.updated_at = CURRENT_TIMESTAMP
  245. `, tempTableName)
  246. if _, err := tx.ExecContext(ctx, updateSQL); err != nil {
  247. return fmt.Errorf("更新表记录失败: %w", err)
  248. }
  249. // 插入不存在的记录
  250. insertSQL := fmt.Sprintf(`
  251. INSERT INTO dic_table (table_id, table_type, table_name, description, creator, created_at, updated_at)
  252. SELECT tt.table_id, tt.table_type, tt.table_name, tt.description, tt.creator, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
  253. FROM %s tt
  254. WHERE NOT EXISTS (
  255. SELECT 1 FROM dic_table dt WHERE dt.table_id = tt.table_id AND dt.deleted_at IS NULL
  256. )
  257. `, tempTableName)
  258. if _, err := tx.ExecContext(ctx, insertSQL); err != nil {
  259. return fmt.Errorf("插入新表记录失败: %w", err)
  260. }
  261. return nil
  262. }
  263. // upsertFieldsFromTemp 从临时表删除并重新插入字段数据
  264. func upsertFieldsFromTemp(ctx context.Context, tx *sqlx.Tx, tempTableName string) error {
  265. // 1. 删除本次批量中所有表的所有字段记录
  266. deleteSQL := fmt.Sprintf(`
  267. DELETE dtf
  268. FROM dic_table_field dtf
  269. WHERE EXISTS (SELECT 1 FROM %s tt WHERE tt.table_id = dtf.table_id)
  270. AND dtf.deleted_at IS NULL
  271. `, tempTableName)
  272. if _, err := tx.ExecContext(ctx, deleteSQL); err != nil {
  273. return fmt.Errorf("删除字段记录失败: %w", err)
  274. }
  275. // 2. 批量插入字段记录
  276. insertSQL := fmt.Sprintf(`
  277. INSERT INTO dic_table_field
  278. (id, field_id, table_id, filed_type, data_type, field_name, field_name_cn, description, creator, created_at, updated_at)
  279. SELECT
  280. tt.field_id, tt.field_id, tt.table_id, tt.filed_type, tt.data_type, tt.field_name,
  281. tt.field_name_cn, tt.description, tt.creator, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
  282. FROM %s tt
  283. `, tempTableName)
  284. if _, err := tx.ExecContext(ctx, insertSQL); err != nil {
  285. return fmt.Errorf("插入字段记录失败: %w", err)
  286. }
  287. return nil
  288. }