Нема описа
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "sync"
  6. "testing"
  7. "time"
  8. "git.x2erp.com/qdy/go-base/config"
  9. "git.x2erp.com/qdy/go-base/types"
  10. "git.x2erp.com/qdy/go-db/factory/doris"
  11. "git.x2erp.com/qdy/go-db/factory/http"
  12. )
  13. // // QueryRequest 查询请求结构体
  14. // type QueryRequest struct {
  15. // SQL string `json:"sql" binding:"required"`
  16. // Params map[string]interface{} `json:"params,omitempty"` // 名称参数
  17. // PositionalParams []interface{} `json:"positionalParams,omitempty"` // 位置参数
  18. // }
  // PageConfig1 holds the paging parameters of a bulk query run.
  type PageConfig1 struct {
  	// TotalRows is the total number of rows to query.
  	TotalRows int
  	// PageSize is the number of rows returned per request.
  	PageSize int
  	// MaxWorkers is the number of workers running at the same time.
  	MaxWorkers int
  	// StartRow is the row the query starts from.
  	StartRow int
  }
  // QueryTask1 is a single query task: the page it covers, the SQL and
  // positional parameters to run, and the slots that record its outcome.
  //
  // NOTE(review): nothing in this file uses QueryTask1 directly; the test below
  // works with QueryTask, which presumably has the same fields — confirm.
  type QueryTask1 struct {
  	// Page number and the row range it spans.
  	Page, StartRow, EndRow int

  	// Statement to execute and its positional parameters.
  	QuerySQL    string
  	QueryParams []interface{}

  	// Outcome slots: CSV payload and error.
  	CSVData string
  	Error   error

  	// Time spent querying and time spent saving.
  	QueryTime, SaveTime time.Duration
  }
  38. func TestQueryAndInsertToDoris1(t *testing.T) {
  39. // 记录总开始时间
  40. totalStartTime := time.Now()
  41. // 配置分页参数
  42. pageConfig := PageConfig{
  43. TotalRows: 100, // 总共需要查询多少行
  44. PageSize: 50, // 每次查询2000条
  45. MaxWorkers: 2, // 几条线程同时工作
  46. StartRow: 0, //从哪行开始查询
  47. }
  48. fmt.Printf("开始执行分页查询,总共%d行,每页%d条,%d线程工作\n", pageConfig.TotalRows, pageConfig.PageSize, pageConfig.MaxWorkers)
  49. // 1. 获取HTTP工厂实例
  50. httpFactory, err := http.GetHTTPFactory()
  51. if err != nil {
  52. t.Fatalf("Failed to get HTTP factory: %v", err)
  53. }
  54. fmt.Println("HTTP factory created successfully")
  55. // 7. 获取Doris工厂实例
  56. dorisFactory, err := doris.GetDorisFactory(httpFactory)
  57. if err != nil {
  58. t.Fatalf("Failed to get Doris factory: %v", err)
  59. }
  60. fmt.Println("Doris factory created successfully")
  61. // 检查Doris表结构
  62. fmt.Println("Checking Doris table structure...")
  63. // 获取Doris配置
  64. cfg, err := config.GetConfig()
  65. if err != nil {
  66. t.Fatalf("failed to load config: %v", err)
  67. return
  68. }
  69. database := "X6_STOCK_DEV"
  70. table := "A3_CLOTHING_LOG"
  71. skipHeader := false // 改为true,跳过CSV头行
  72. url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", cfg.GetDoris().FEHost, cfg.GetDoris().FEPort, database, table)
  73. fmt.Printf("Doris stream load URL: %s\n", url)
  74. // 计算需要的页数
  75. totalPages := (pageConfig.TotalRows + pageConfig.PageSize - 1) / pageConfig.PageSize
  76. fmt.Printf("预计总共需要查询 %d 页\n", totalPages)
  77. // 创建任务通道和结果通道
  78. taskChan := make(chan QueryTask, totalPages)
  79. resultChan := make(chan QueryTask, totalPages)
  80. doneChan := make(chan bool)
  81. var wg sync.WaitGroup
  82. var mu sync.Mutex
  83. // 执行统计变量
  84. totalQueryTime := time.Duration(0)
  85. totalSaveTime := time.Duration(0)
  86. totalRowsInserted := 0
  87. completedTasks := 0
  88. // 启动工作线程
  89. for i := 0; i < pageConfig.MaxWorkers; i++ {
  90. wg.Add(1)
  91. go func(workerID int) {
  92. defer wg.Done()
  93. // 每个worker创建自己的HTTP客户端
  94. httpClient := httpFactory.CreateClient()
  95. for task := range taskChan {
  96. fmt.Printf("Worker %d 处理第 %d 页 (行 %d-%d, CLOTHING_ID > %s)...\n",
  97. workerID, task.Page, task.StartRow, task.EndRow)
  98. // 记录查询开始时间
  99. queryStartTime := time.Now()
  100. // 准备查询请求
  101. queryRequest := types.QueryRequest{
  102. SQL: task.QuerySQL,
  103. PositionalParams: task.QueryParams,
  104. WriterHeader: false,
  105. }
  106. // 发送POST请求到 /api/query/csv 获取CSV格式数据
  107. resp, err := httpClient.PostWithAuth(
  108. "http://localhost:8080/api/query/csv/param",
  109. queryRequest,
  110. "123", // Bearer Token
  111. nil,
  112. )
  113. if err != nil {
  114. task.Error = fmt.Errorf("第%d页查询失败: %v", task.Page, err)
  115. resultChan <- task
  116. continue
  117. }
  118. if resp.StatusCode() != 200 {
  119. task.Error = fmt.Errorf("第%d页查询请求失败, 状态码: %d", task.Page, resp.StatusCode())
  120. resultChan <- task
  121. continue
  122. }
  123. // 获取CSV数据
  124. csvData := string(resp.Body())
  125. if len(csvData) == 0 {
  126. task.Error = fmt.Errorf("第%d页没有数据", task.Page)
  127. resultChan <- task
  128. continue
  129. }
  130. log.Printf("csvData:\n%s", csvData)
  131. // 记录查询结束时间
  132. queryEndTime := time.Now()
  133. queryDuration := queryEndTime.Sub(queryStartTime)
  134. task.QueryTime = queryDuration
  135. task.CSVData = csvData
  136. // 插入数据到Doris
  137. saveStartTime := time.Now()
  138. err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
  139. if err != nil {
  140. task.Error = fmt.Errorf("第%d页数据插入Doris失败: %v", task.Page, err)
  141. resultChan <- task
  142. continue
  143. }
  144. // 记录保存结束时间
  145. saveEndTime := time.Now()
  146. saveDuration := saveEndTime.Sub(saveStartTime)
  147. task.SaveTime = saveDuration
  148. resultChan <- task
  149. }
  150. }(i + 1)
  151. }
  152. // 启动结果处理协程
  153. go func() {
  154. for task := range resultChan {
  155. mu.Lock()
  156. if task.Error != nil {
  157. fmt.Printf("❌ 第 %d 页处理失败: %v\n", task.Page, task.Error)
  158. } else {
  159. // 计算本页数据行数
  160. estimatedRows := task.EndRow - task.StartRow + 1
  161. // 如果数据不足,按实际估算
  162. if len(task.CSVData) < estimatedRows*50 { // 保守估计每行至少50字符
  163. estimatedRows = len(task.CSVData) / 50
  164. }
  165. totalQueryTime += task.QueryTime
  166. totalSaveTime += task.SaveTime
  167. totalRowsInserted += estimatedRows
  168. completedTasks++
  169. fmt.Printf("✅ Worker 完成第 %d 页 (行 %d-%d, CLOTHING_ID > %s)\n",
  170. task.Page, task.StartRow, task.EndRow)
  171. fmt.Printf(" 查询耗时: %v, 保存耗时: %v\n", task.QueryTime, task.SaveTime)
  172. fmt.Printf(" 估算数据行数: %d\n", estimatedRows)
  173. }
  174. mu.Unlock()
  175. // 如果所有任务都完成了,发送完成信号
  176. if completedTasks >= totalPages {
  177. doneChan <- true
  178. break
  179. }
  180. }
  181. }()
  182. // 生成任务并发送到任务通道
  183. fmt.Println("\n📋 开始生成查询任务...")
  184. for page := 1; page <= totalPages; page++ {
  185. startRow := pageConfig.StartRow + (page-1)*pageConfig.PageSize + 1
  186. endRow := pageConfig.StartRow + page*pageConfig.PageSize
  187. // 检查不超过要查询的总行数
  188. maxEndRow := pageConfig.StartRow + pageConfig.TotalRows
  189. if endRow > maxEndRow {
  190. endRow = maxEndRow
  191. }
  192. // 生成查询SQL(使用参数模式)
  193. querySQL, queryParams := getSQLWithPagination(startRow, endRow)
  194. task := QueryTask{
  195. Page: page,
  196. StartRow: startRow,
  197. EndRow: endRow,
  198. QuerySQL: querySQL,
  199. QueryParams: queryParams,
  200. }
  201. fmt.Printf("生成第 %d 页任务 (行 %d-%d), 使用CLOTHING_ID > '%s'\n", page, startRow, endRow)
  202. taskChan <- task
  203. }
  204. // 关闭任务通道,通知worker没有更多任务
  205. close(taskChan)
  206. // 等待所有worker完成
  207. wg.Wait()
  208. // 关闭结果通道
  209. close(resultChan)
  210. // 等待结果处理完成
  211. <-doneChan
  212. // 记录总结束时间
  213. totalEndTime := time.Now()
  214. totalDuration := totalEndTime.Sub(totalStartTime)
  215. // 打印性能统计
  216. fmt.Println("\n📊 性能统计:")
  217. fmt.Printf(" 完成页数: %d/%d\n", completedTasks, totalPages)
  218. fmt.Printf(" 总查询耗时: %v\n", totalQueryTime)
  219. fmt.Printf(" 总保存耗时: %v\n", totalSaveTime)
  220. fmt.Printf(" 总耗时: %v\n", totalDuration)
  221. fmt.Printf(" 估算插入总行数: %d\n", totalRowsInserted)
  222. if completedTasks > 0 {
  223. fmt.Printf(" 平均每页查询耗时: %v\n", totalQueryTime/time.Duration(completedTasks))
  224. fmt.Printf(" 平均每页保存耗时: %v\n", totalSaveTime/time.Duration(completedTasks))
  225. fmt.Printf(" 平均每秒处理行数: %.2f\n", float64(totalRowsInserted)/totalDuration.Seconds())
  226. }
  227. fmt.Println("✅ 所有数据成功插入到 Doris!")
  228. }
  // getSQLWithPagination1 builds the ROWNUM-paginated query over
  // X6_STOCK_DEV.A3_CLOTHING, ordered by CLOTHING_ID, together with its
  // positional parameters.
  //
  // startRow and endRow are 1-based and inclusive. The returned slice lines up
  // with the placeholders in order: :1 = endRow (upper ROWNUM bound) and
  // :2 = startRow-1 (exclusive lower bound on rn).
  //
  // The statement previously also contained "WHERE CLOTHING_ID > :1" and so had
  // three placeholders for two parameters. That filter has no value in this
  // signature to bind, so it was dropped and the placeholders renumbered.
  func getSQLWithPagination1(startRow, endRow int) (string, []interface{}) {
  	sql := `SELECT
      CLOTHING_ID,
      CLOTHING_YEAR,
      CLOTHING_NAME
  FROM (
      SELECT a.*, ROWNUM as rn
      FROM (
          SELECT *
          FROM X6_STOCK_DEV.A3_CLOTHING
          ORDER BY CLOTHING_ID
      ) a
      WHERE ROWNUM <= :1
  )
  WHERE rn > :2`

  	// Positional parameters, in placeholder order.
  	params := []interface{}{
  		endRow,
  		startRow - 1, // the outer filter is "rn > :2", hence startRow-1
  	}

  	return sql, params
  }
  253. }