| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 |
- package main
-
- import (
- "fmt"
- "log"
- "sync"
- "testing"
- "time"
-
- "git.x2erp.com/qdy/go-base/config"
- "git.x2erp.com/qdy/go-base/model/request"
-
- "git.x2erp.com/qdy/go-db/factory/doris"
- "git.x2erp.com/qdy/go-db/factory/http"
- )
-
// The DB_CONFIG_PATH environment variable must be set before running this test, e.g.:
// export DB_CONFIG_PATH=/Users/kenqdy/Documents/v-bdx-workspace/db_doris.yaml
-
- // // QueryRequest 查询请求结构体
- // type QueryRequest struct {
- // SQL string `json:"sql" binding:"required"`
- // Params map[string]interface{} `json:"params,omitempty"` // 名称参数
- // PositionalParams []interface{} `json:"positionalParams,omitempty"` // 位置参数
- // }
-
// PageConfig holds the paging settings for a bulk copy: how many rows to
// move in total, how they are split into pages, and how many workers run.
type PageConfig struct {
	// TotalRows is the total number of rows to query.
	TotalRows int
	// PageSize is the number of rows returned by each query.
	PageSize int
	// MaxWorkers is the number of workers running at the same time.
	MaxWorkers int
	// StartRow is the row offset at which querying starts.
	StartRow int
}
-
// QueryTask is one page of work. It is created with the page bounds and the
// query to run, and is handed back with either the fetched data and timings
// or Error filled in.
type QueryTask struct {
	// Page is the page number; StartRow and EndRow are the row bounds of
	// the page.
	Page, StartRow, EndRow int

	// QuerySQL is the statement to execute and QueryParams its positional
	// parameters.
	QuerySQL    string
	QueryParams []interface{}

	// CSVData is the CSV text returned by the query.
	CSVData string
	// Error is non-nil when processing the page failed.
	Error error

	// QueryTime and SaveTime record how long the query and the save took.
	QueryTime, SaveTime time.Duration
}
-
- func TestQueryAndInsertToDoris(t *testing.T) {
- // 记录总开始时间
- totalStartTime := time.Now()
-
- // 配置分页参数
- pageConfig := PageConfig{
- TotalRows: 400000, // 总共需要查询多少行
- PageSize: 3000, // 每次查询2000条
- MaxWorkers: 10, // 几条线程同时工作
- StartRow: 0, //从哪行开始查询
- }
-
- fmt.Printf("开始执行分页查询,总共%d行,每页%d条,%d线程工作\n", pageConfig.TotalRows, pageConfig.PageSize, pageConfig.MaxWorkers)
-
- // 1. 获取HTTP工厂实例
- httpFactory, err := http.GetHTTPFactory()
- if err != nil {
- t.Fatalf("Failed to get HTTP factory: %v", err)
- }
- fmt.Println("HTTP factory created successfully")
-
- // 7. 获取Doris工厂实例
- dorisFactory, err := doris.GetDorisFactory(httpFactory)
- if err != nil {
- t.Fatalf("Failed to get Doris factory: %v", err)
- }
- fmt.Println("Doris factory created successfully")
-
- // 检查Doris表结构
- fmt.Println("Checking Doris table structure...")
-
- // 获取Doris配置
- cfg := config.GetConfig()
-
- database := "X6_STOCK_DEV"
- table := "A3_CLOTHING"
- skipHeader := false // 改为true,跳过CSV头行
- url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", cfg.GetDorisConfig().FEHost, cfg.GetDorisConfig().FEPort, database, table)
- fmt.Printf("Doris stream load URL: %s\n", url)
-
- // 计算需要的页数
- totalPages := (pageConfig.TotalRows + pageConfig.PageSize - 1) / pageConfig.PageSize
- fmt.Printf("预计总共需要查询 %d 页\n", totalPages)
-
- // 创建任务通道和结果通道
- taskChan := make(chan QueryTask, totalPages)
- resultChan := make(chan QueryTask, totalPages)
- doneChan := make(chan bool)
-
- var wg sync.WaitGroup
- var mu sync.Mutex
-
- // 执行统计变量
- totalQueryTime := time.Duration(0)
- totalSaveTime := time.Duration(0)
- totalRowsInserted := 0
- completedTasks := 0
-
- // 启动工作线程
- for i := 0; i < pageConfig.MaxWorkers; i++ {
- wg.Add(1)
- go func(workerID int) {
- defer wg.Done()
-
- // 每个worker创建自己的HTTP客户端
- httpClient := httpFactory.CreateClient()
-
- for task := range taskChan {
- fmt.Printf("Worker %d 处理第 %d 页 (行 %d-%d, CLOTHING_ID > %s)...\n",
- workerID, task.Page, task.StartRow, task.EndRow)
-
- // 记录查询开始时间
- queryStartTime := time.Now()
-
- // 准备查询请求
- queryRequest := request.QueryRequest{
- SQL: task.QuerySQL,
- PositionalParams: task.QueryParams,
- WriterHeader: false,
- }
-
- // 发送POST请求到 /api/query/csv 获取CSV格式数据
- resp, err := httpClient.PostWithAuth(
- "http://localhost:8080/api/query/csv/param",
- queryRequest,
- "123", // Bearer Token
- nil,
- )
-
- if err != nil {
- task.Error = fmt.Errorf("第%d页查询失败: %v", task.Page, err)
- resultChan <- task
- continue
- }
-
- if resp.StatusCode() != 200 {
- task.Error = fmt.Errorf("第%d页查询请求失败, 状态码: %d", task.Page, resp.StatusCode())
- resultChan <- task
- continue
- }
-
- // 获取CSV数据
- csvData := string(resp.Body())
- if len(csvData) == 0 {
- task.Error = fmt.Errorf("第%d页没有数据", task.Page)
- resultChan <- task
- continue
- }
- log.Printf("csvData:\n%s", csvData)
- // 记录查询结束时间
- queryEndTime := time.Now()
- queryDuration := queryEndTime.Sub(queryStartTime)
- task.QueryTime = queryDuration
- task.CSVData = csvData
-
- // 插入数据到Doris
- saveStartTime := time.Now()
- err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
- if err != nil {
- task.Error = fmt.Errorf("第%d页数据插入Doris失败: %v", task.Page, err)
- resultChan <- task
- continue
- }
-
- // 记录保存结束时间
- saveEndTime := time.Now()
- saveDuration := saveEndTime.Sub(saveStartTime)
- task.SaveTime = saveDuration
-
- resultChan <- task
- }
- }(i + 1)
- }
-
- // 启动结果处理协程
- go func() {
- for task := range resultChan {
- mu.Lock()
-
- if task.Error != nil {
- fmt.Printf("❌ 第 %d 页处理失败: %v\n", task.Page, task.Error)
- } else {
- // 计算本页数据行数
- estimatedRows := task.EndRow - task.StartRow + 1
- // 如果数据不足,按实际估算
- if len(task.CSVData) < estimatedRows*50 { // 保守估计每行至少50字符
- estimatedRows = len(task.CSVData) / 50
- }
-
- totalQueryTime += task.QueryTime
- totalSaveTime += task.SaveTime
- totalRowsInserted += estimatedRows
- completedTasks++
-
- fmt.Printf("✅ Worker 完成第 %d 页 (行 %d-%d, CLOTHING_ID > %s)\n",
- task.Page, task.StartRow, task.EndRow)
- fmt.Printf(" 查询耗时: %v, 保存耗时: %v\n", task.QueryTime, task.SaveTime)
- fmt.Printf(" 估算数据行数: %d\n", estimatedRows)
-
- }
-
- mu.Unlock()
-
- // 如果所有任务都完成了,发送完成信号
- if completedTasks >= totalPages {
- doneChan <- true
- break
- }
- }
- }()
-
- // 生成任务并发送到任务通道
- fmt.Println("\n📋 开始生成查询任务...")
-
- for page := 1; page <= totalPages; page++ {
- startRow := pageConfig.StartRow + (page-1)*pageConfig.PageSize + 1
- endRow := pageConfig.StartRow + page*pageConfig.PageSize
-
- // 检查不超过要查询的总行数
- maxEndRow := pageConfig.StartRow + pageConfig.TotalRows
- if endRow > maxEndRow {
- endRow = maxEndRow
- }
-
- // 生成查询SQL(使用参数模式)
- querySQL, queryParams := getSQLWithPagination(startRow, endRow)
-
- task := QueryTask{
- Page: page,
- StartRow: startRow,
- EndRow: endRow,
- QuerySQL: querySQL,
- QueryParams: queryParams,
- }
-
- fmt.Printf("生成第 %d 页任务 (行 %d-%d), 使用CLOTHING_ID > '%s'\n", page, startRow, endRow)
-
- taskChan <- task
-
- }
-
- // 关闭任务通道,通知worker没有更多任务
- close(taskChan)
-
- // 等待所有worker完成
- wg.Wait()
-
- // 关闭结果通道
- close(resultChan)
-
- // 等待结果处理完成
- <-doneChan
-
- // 记录总结束时间
- totalEndTime := time.Now()
- totalDuration := totalEndTime.Sub(totalStartTime)
-
- // 打印性能统计
- fmt.Println("\n📊 性能统计:")
- fmt.Printf(" 完成页数: %d/%d\n", completedTasks, totalPages)
- fmt.Printf(" 总查询耗时: %v\n", totalQueryTime)
- fmt.Printf(" 总保存耗时: %v\n", totalSaveTime)
- fmt.Printf(" 总耗时: %v\n", totalDuration)
- fmt.Printf(" 估算插入总行数: %d\n", totalRowsInserted)
- if completedTasks > 0 {
- fmt.Printf(" 平均每页查询耗时: %v\n", totalQueryTime/time.Duration(completedTasks))
- fmt.Printf(" 平均每页保存耗时: %v\n", totalSaveTime/time.Duration(completedTasks))
- fmt.Printf(" 平均每秒处理行数: %.2f\n", float64(totalRowsInserted)/totalDuration.Seconds())
- }
-
- fmt.Println("✅ 所有数据成功插入到 Doris!")
- }
-
// getSQLWithPagination returns the ROWNUM-paginated SELECT over
// X6_STOCK_DEV.A3_CLOTHING (ordered by CLOTHING_ID) together with its
// positional parameters for the row window startRow..endRow.
//
// The returned slice is ordered to match the placeholders: the first value
// binds :1 (upper bound, endRow) and the second binds :2 (startRow-1).
func getSQLWithPagination(startRow, endRow int) (string, []interface{}) {
	const query = `SELECT
CLOTHING_ID,
CLOTHING_YEAR,
CLOTHING_NAME,
STYLECOLOR_ID,
STYLE_ID,
COLOR_ID,
SIZE_ID,
CREATE_DATE,
STYLE_GROUP,
J_PRICE,
X_PRICE,
V_PRICE,
CLERK_ROYALTYRATE,
CLERK_ROYALTYPRICE,
BRAND_CODE,
STYLEVER_ID,
J_COST,
CLOTHING_IMG,
STYLE_UNIT_CODE,
STYLE_SEX_CODE,
STYLE_KIND_CODE,
STYLE_CLASS_CODE,
STYLE_SUBCLASS_CODE,
STYLE_DESIGNER_CODE,
STYLE_PLATER_CODE,
STYLE_STYLES_CODE,
STYLE_LOCATE_CODE,
STYLE_SALETYPE_CODE,
STYLE_COLORSYSTEM_CODE,
STYLE_THEME_CODE,
STYLE_INDENTTYPE_CODE,
STYLE_PRICEBAND_CODE,
STYLE_MONTH_CODE,
STYLE_COMPOSITION_CODE,
STYLE_SUPPLIER_CODE,
STYLE_SPARE1_CODE,
STYLE_SPARE2_CODE,
STYLE_SPARE4_CODE,
STYLE_SPARE5_CODE,
CATEGORY_CODE,
BRAND_ID,
STYCOLVER_ID,
STYLE_SAME,
CLOTHING_BARCODE,
CLOTHING_HELPID,
CLOTHING_GBCODE,
CLOTHING_RFID,
STYLE_SUBJECT_ID,
SIZEGRP_ID,
STYLE_HELPID,
CLOTHING_GBCODE1,
COLOR_NAME,
STYLEVER_NAME,
SIZE_NAME,
STYLE_UNIT,
STYLE_SEX,
STYLE_KIND,
STYLE_CLASS,
STYLE_SUBCLASS,
STYLE_DESIGNER,
STYLE_PLATER,
STYLE_BAND,
STYLE_STYLES,
STYLE_LOCATE,
STYLE_SALETYPE,
STYLE_COLORSYSTEM,
STYLE_THEME,
STYLE_INDENTTYPE,
STYLE_PRICEBAND,
STYLE_MONTH,
STYLE_COMPOSITION,
STYLE_SUPPLIER,
STYLE_SPARE1,
STYLE_SPARE2,
STYLE_SPARE3,
STYLE_SPARE4,
STYLE_SPARE5,
CATEGORY_NAME,
BRAND_NAME,
STYLE_YEAR_NAME,
STYLE_SEARCH_KEY,
STYLE_SUBJECT_NAME,
CLOTHING_REMARK,
STYLE_SPARE3_CODE,
COST,
BRAND_GROUPCODE,
CLASS_GROUPCODE,
MONTH_GROUPCODE,
RETURNSUBJECT_ID,
PRODUCT_SORT,
CLOTHING_PARTITION
FROM (
SELECT a.*, ROWNUM as rn
FROM (
SELECT *
FROM X6_STOCK_DEV.A3_CLOTHING

ORDER BY CLOTHING_ID
) a
WHERE ROWNUM <= :1
)
WHERE rn > :2`

	// The outer filter is strict (rn > :2), so the bound passed for :2 is
	// one less than the first row wanted.
	exclusiveLower := startRow - 1

	return query, []interface{}{endRow, exclusiveLower}
}
|