Nav apraksta
Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domuzīmes ('-') un var būt līdz 35 simboliem gara.

my_ora_clothingToDoris_test.go 10KB

Lines 1–398 (line-number gutter from the web page; each line of the listing below carries its own number)
  1. package main
import (
	"fmt"
	"log"
	"strings"
	"sync"
	"testing"
	"time"

	"git.x2erp.com/qdy/go-base/config"
	"git.x2erp.com/qdy/go-base/types"
	"git.x2erp.com/qdy/go-db/factory/doris"
	"git.x2erp.com/qdy/go-db/factory/http"
)
  13. // 运行测试前必须先设置以下环境变量:
  14. // export DB_CONFIG_PATH=/Users/kenqdy/Documents/v-bdx-workspace/db_doris.yaml
  15. // // QueryRequest 查询请求结构体
  16. // type QueryRequest struct {
  17. // SQL string `json:"sql" binding:"required"`
  18. // Params map[string]interface{} `json:"params,omitempty"` // 名称参数
  19. // PositionalParams []interface{} `json:"positionalParams,omitempty"` // 位置参数
  20. // }
  21. // 分页配置
  22. type PageConfig struct {
  23. TotalRows int // 总共需要查询多少行
  24. PageSize int // 每次返回多少行
  25. MaxWorkers int // 几条线程同时工作
  26. StartRow int //从哪行开始查询
  27. }
  28. // 查询任务
  29. type QueryTask struct {
  30. Page int
  31. StartRow int
  32. EndRow int
  33. QuerySQL string
  34. QueryParams []interface{}
  35. CSVData string
  36. Error error
  37. QueryTime time.Duration
  38. SaveTime time.Duration
  39. }
  40. func TestQueryAndInsertToDoris(t *testing.T) {
  41. // 记录总开始时间
  42. totalStartTime := time.Now()
  43. // 配置分页参数
  44. pageConfig := PageConfig{
  45. TotalRows: 400000, // 总共需要查询多少行
  46. PageSize: 3000, // 每次查询2000条
  47. MaxWorkers: 10, // 几条线程同时工作
  48. StartRow: 0, //从哪行开始查询
  49. }
  50. fmt.Printf("开始执行分页查询,总共%d行,每页%d条,%d线程工作\n", pageConfig.TotalRows, pageConfig.PageSize, pageConfig.MaxWorkers)
  51. // 1. 获取HTTP工厂实例
  52. httpFactory, err := http.GetHTTPFactory()
  53. if err != nil {
  54. t.Fatalf("Failed to get HTTP factory: %v", err)
  55. }
  56. fmt.Println("HTTP factory created successfully")
  57. // 7. 获取Doris工厂实例
  58. dorisFactory, err := doris.GetDorisFactory(httpFactory)
  59. if err != nil {
  60. t.Fatalf("Failed to get Doris factory: %v", err)
  61. }
  62. fmt.Println("Doris factory created successfully")
  63. // 检查Doris表结构
  64. fmt.Println("Checking Doris table structure...")
  65. // 获取Doris配置
  66. cfg, err := config.GetConfig()
  67. if err != nil {
  68. fmt.Println("failed to load config: %v", err)
  69. return
  70. }
  71. database := "X6_STOCK_DEV"
  72. table := "A3_CLOTHING"
  73. skipHeader := false // 改为true,跳过CSV头行
  74. url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", cfg.GetDoris().FEHost, cfg.GetDoris().FEPort, database, table)
  75. fmt.Printf("Doris stream load URL: %s\n", url)
  76. // 计算需要的页数
  77. totalPages := (pageConfig.TotalRows + pageConfig.PageSize - 1) / pageConfig.PageSize
  78. fmt.Printf("预计总共需要查询 %d 页\n", totalPages)
  79. // 创建任务通道和结果通道
  80. taskChan := make(chan QueryTask, totalPages)
  81. resultChan := make(chan QueryTask, totalPages)
  82. doneChan := make(chan bool)
  83. var wg sync.WaitGroup
  84. var mu sync.Mutex
  85. // 执行统计变量
  86. totalQueryTime := time.Duration(0)
  87. totalSaveTime := time.Duration(0)
  88. totalRowsInserted := 0
  89. completedTasks := 0
  90. // 启动工作线程
  91. for i := 0; i < pageConfig.MaxWorkers; i++ {
  92. wg.Add(1)
  93. go func(workerID int) {
  94. defer wg.Done()
  95. // 每个worker创建自己的HTTP客户端
  96. httpClient := httpFactory.CreateClient()
  97. for task := range taskChan {
  98. fmt.Printf("Worker %d 处理第 %d 页 (行 %d-%d, CLOTHING_ID > %s)...\n",
  99. workerID, task.Page, task.StartRow, task.EndRow)
  100. // 记录查询开始时间
  101. queryStartTime := time.Now()
  102. // 准备查询请求
  103. queryRequest := types.QueryRequest{
  104. SQL: task.QuerySQL,
  105. PositionalParams: task.QueryParams,
  106. WriterHeader: false,
  107. }
  108. // 发送POST请求到 /api/query/csv 获取CSV格式数据
  109. resp, err := httpClient.PostWithAuth(
  110. "http://localhost:8080/api/query/csv/param",
  111. queryRequest,
  112. "123", // Bearer Token
  113. nil,
  114. )
  115. if err != nil {
  116. task.Error = fmt.Errorf("第%d页查询失败: %v", task.Page, err)
  117. resultChan <- task
  118. continue
  119. }
  120. if resp.StatusCode() != 200 {
  121. task.Error = fmt.Errorf("第%d页查询请求失败, 状态码: %d", task.Page, resp.StatusCode())
  122. resultChan <- task
  123. continue
  124. }
  125. // 获取CSV数据
  126. csvData := string(resp.Body())
  127. if len(csvData) == 0 {
  128. task.Error = fmt.Errorf("第%d页没有数据", task.Page)
  129. resultChan <- task
  130. continue
  131. }
  132. log.Printf("csvData:\n%s", csvData)
  133. // 记录查询结束时间
  134. queryEndTime := time.Now()
  135. queryDuration := queryEndTime.Sub(queryStartTime)
  136. task.QueryTime = queryDuration
  137. task.CSVData = csvData
  138. // 插入数据到Doris
  139. saveStartTime := time.Now()
  140. err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
  141. if err != nil {
  142. task.Error = fmt.Errorf("第%d页数据插入Doris失败: %v", task.Page, err)
  143. resultChan <- task
  144. continue
  145. }
  146. // 记录保存结束时间
  147. saveEndTime := time.Now()
  148. saveDuration := saveEndTime.Sub(saveStartTime)
  149. task.SaveTime = saveDuration
  150. resultChan <- task
  151. }
  152. }(i + 1)
  153. }
  154. // 启动结果处理协程
  155. go func() {
  156. for task := range resultChan {
  157. mu.Lock()
  158. if task.Error != nil {
  159. fmt.Printf("❌ 第 %d 页处理失败: %v\n", task.Page, task.Error)
  160. } else {
  161. // 计算本页数据行数
  162. estimatedRows := task.EndRow - task.StartRow + 1
  163. // 如果数据不足,按实际估算
  164. if len(task.CSVData) < estimatedRows*50 { // 保守估计每行至少50字符
  165. estimatedRows = len(task.CSVData) / 50
  166. }
  167. totalQueryTime += task.QueryTime
  168. totalSaveTime += task.SaveTime
  169. totalRowsInserted += estimatedRows
  170. completedTasks++
  171. fmt.Printf("✅ Worker 完成第 %d 页 (行 %d-%d, CLOTHING_ID > %s)\n",
  172. task.Page, task.StartRow, task.EndRow)
  173. fmt.Printf(" 查询耗时: %v, 保存耗时: %v\n", task.QueryTime, task.SaveTime)
  174. fmt.Printf(" 估算数据行数: %d\n", estimatedRows)
  175. }
  176. mu.Unlock()
  177. // 如果所有任务都完成了,发送完成信号
  178. if completedTasks >= totalPages {
  179. doneChan <- true
  180. break
  181. }
  182. }
  183. }()
  184. // 生成任务并发送到任务通道
  185. fmt.Println("\n📋 开始生成查询任务...")
  186. for page := 1; page <= totalPages; page++ {
  187. startRow := pageConfig.StartRow + (page-1)*pageConfig.PageSize + 1
  188. endRow := pageConfig.StartRow + page*pageConfig.PageSize
  189. // 检查不超过要查询的总行数
  190. maxEndRow := pageConfig.StartRow + pageConfig.TotalRows
  191. if endRow > maxEndRow {
  192. endRow = maxEndRow
  193. }
  194. // 生成查询SQL(使用参数模式)
  195. querySQL, queryParams := getSQLWithPagination(startRow, endRow)
  196. task := QueryTask{
  197. Page: page,
  198. StartRow: startRow,
  199. EndRow: endRow,
  200. QuerySQL: querySQL,
  201. QueryParams: queryParams,
  202. }
  203. fmt.Printf("生成第 %d 页任务 (行 %d-%d), 使用CLOTHING_ID > '%s'\n", page, startRow, endRow)
  204. taskChan <- task
  205. }
  206. // 关闭任务通道,通知worker没有更多任务
  207. close(taskChan)
  208. // 等待所有worker完成
  209. wg.Wait()
  210. // 关闭结果通道
  211. close(resultChan)
  212. // 等待结果处理完成
  213. <-doneChan
  214. // 记录总结束时间
  215. totalEndTime := time.Now()
  216. totalDuration := totalEndTime.Sub(totalStartTime)
  217. // 打印性能统计
  218. fmt.Println("\n📊 性能统计:")
  219. fmt.Printf(" 完成页数: %d/%d\n", completedTasks, totalPages)
  220. fmt.Printf(" 总查询耗时: %v\n", totalQueryTime)
  221. fmt.Printf(" 总保存耗时: %v\n", totalSaveTime)
  222. fmt.Printf(" 总耗时: %v\n", totalDuration)
  223. fmt.Printf(" 估算插入总行数: %d\n", totalRowsInserted)
  224. if completedTasks > 0 {
  225. fmt.Printf(" 平均每页查询耗时: %v\n", totalQueryTime/time.Duration(completedTasks))
  226. fmt.Printf(" 平均每页保存耗时: %v\n", totalSaveTime/time.Duration(completedTasks))
  227. fmt.Printf(" 平均每秒处理行数: %.2f\n", float64(totalRowsInserted)/totalDuration.Seconds())
  228. }
  229. fmt.Println("✅ 所有数据成功插入到 Doris!")
  230. }
  231. // getSQLWithPagination 生成带分页的SQL语句(参数模式)
  232. // 返回SQL语句和参数映射
  233. func getSQLWithPagination(startRow, endRow int) (string, []interface{}) {
  234. sql := `SELECT
  235. CLOTHING_ID,
  236. CLOTHING_YEAR,
  237. CLOTHING_NAME,
  238. STYLECOLOR_ID,
  239. STYLE_ID,
  240. COLOR_ID,
  241. SIZE_ID,
  242. CREATE_DATE,
  243. STYLE_GROUP,
  244. J_PRICE,
  245. X_PRICE,
  246. V_PRICE,
  247. CLERK_ROYALTYRATE,
  248. CLERK_ROYALTYPRICE,
  249. BRAND_CODE,
  250. STYLEVER_ID,
  251. J_COST,
  252. CLOTHING_IMG,
  253. STYLE_UNIT_CODE,
  254. STYLE_SEX_CODE,
  255. STYLE_KIND_CODE,
  256. STYLE_CLASS_CODE,
  257. STYLE_SUBCLASS_CODE,
  258. STYLE_DESIGNER_CODE,
  259. STYLE_PLATER_CODE,
  260. STYLE_STYLES_CODE,
  261. STYLE_LOCATE_CODE,
  262. STYLE_SALETYPE_CODE,
  263. STYLE_COLORSYSTEM_CODE,
  264. STYLE_THEME_CODE,
  265. STYLE_INDENTTYPE_CODE,
  266. STYLE_PRICEBAND_CODE,
  267. STYLE_MONTH_CODE,
  268. STYLE_COMPOSITION_CODE,
  269. STYLE_SUPPLIER_CODE,
  270. STYLE_SPARE1_CODE,
  271. STYLE_SPARE2_CODE,
  272. STYLE_SPARE4_CODE,
  273. STYLE_SPARE5_CODE,
  274. CATEGORY_CODE,
  275. BRAND_ID,
  276. STYCOLVER_ID,
  277. STYLE_SAME,
  278. CLOTHING_BARCODE,
  279. CLOTHING_HELPID,
  280. CLOTHING_GBCODE,
  281. CLOTHING_RFID,
  282. STYLE_SUBJECT_ID,
  283. SIZEGRP_ID,
  284. STYLE_HELPID,
  285. CLOTHING_GBCODE1,
  286. COLOR_NAME,
  287. STYLEVER_NAME,
  288. SIZE_NAME,
  289. STYLE_UNIT,
  290. STYLE_SEX,
  291. STYLE_KIND,
  292. STYLE_CLASS,
  293. STYLE_SUBCLASS,
  294. STYLE_DESIGNER,
  295. STYLE_PLATER,
  296. STYLE_BAND,
  297. STYLE_STYLES,
  298. STYLE_LOCATE,
  299. STYLE_SALETYPE,
  300. STYLE_COLORSYSTEM,
  301. STYLE_THEME,
  302. STYLE_INDENTTYPE,
  303. STYLE_PRICEBAND,
  304. STYLE_MONTH,
  305. STYLE_COMPOSITION,
  306. STYLE_SUPPLIER,
  307. STYLE_SPARE1,
  308. STYLE_SPARE2,
  309. STYLE_SPARE3,
  310. STYLE_SPARE4,
  311. STYLE_SPARE5,
  312. CATEGORY_NAME,
  313. BRAND_NAME,
  314. STYLE_YEAR_NAME,
  315. STYLE_SEARCH_KEY,
  316. STYLE_SUBJECT_NAME,
  317. CLOTHING_REMARK,
  318. STYLE_SPARE3_CODE,
  319. COST,
  320. BRAND_GROUPCODE,
  321. CLASS_GROUPCODE,
  322. MONTH_GROUPCODE,
  323. RETURNSUBJECT_ID,
  324. PRODUCT_SORT,
  325. CLOTHING_PARTITION
  326. FROM (
  327. SELECT a.*, ROWNUM as rn
  328. FROM (
  329. SELECT *
  330. FROM X6_STOCK_DEV.A3_CLOTHING
  331. ORDER BY CLOTHING_ID
  332. ) a
  333. WHERE ROWNUM <= :1
  334. )
  335. WHERE rn > :2`
  336. // 创建参数映射
  337. params := []interface{}{
  338. //lastClothingID,
  339. endRow,
  340. startRow - 1, // WHERE rn > :start_row 所以是startRow-1
  341. }
  342. return sql, params
  343. }