package main

import (
	"fmt"
	"log"
	"sync"
	"testing"
	"time"

	"git.x2erp.com/qdy/go-base/config"
	"git.x2erp.com/qdy/go-base/types"
	"git.x2erp.com/qdy/go-db/factory/doris"
	"git.x2erp.com/qdy/go-db/factory/http"
)

// The DB_CONFIG_PATH environment variable must be set before running, e.g.:
//
//	export DB_CONFIG_PATH=/Users/kenqdy/Documents/v-bdx-workspace/db_doris.yaml
//
// For reference, an earlier local definition of the query request (the test
// now uses types.QueryRequest instead):
//
//	// QueryRequest is the query request payload.
//	type QueryRequest struct {
//		SQL              string                 `json:"sql" binding:"required"`
//		Params           map[string]interface{} `json:"params,omitempty"`           // named parameters
//		PositionalParams []interface{}          `json:"positionalParams,omitempty"` // positional parameters
//	}

// PageConfig describes how the source table is split into pages and how many
// workers process those pages concurrently.
type PageConfig struct {
	TotalRows  int // total number of rows to query
	PageSize   int // number of rows fetched per page
	MaxWorkers int // number of worker goroutines running concurrently
	StartRow   int // row offset at which the export starts
}

// QueryTask is one page of work. The generator fills in the page range and
// query; the worker that handles it fills in the outcome fields.
type QueryTask struct {
	Page        int           // 1-based page number
	StartRow    int           // first row of the page (inclusive)
	EndRow      int           // last row of the page (inclusive)
	QuerySQL    string        // SQL text sent to the query service
	QueryParams []interface{} // positional parameters for QuerySQL
	CSVData     string        // CSV body returned by the query service
	Error       error         // non-nil when the query or the insert failed
	QueryTime   time.Duration // time spent fetching the CSV
	SaveTime    time.Duration // time spent inserting the CSV into Doris
}

// TestQueryAndInsertToDoris pages through X6_STOCK_DEV.A3_CLOTHING via the
// HTTP CSV query endpoint and inserts every page into Doris, using a pool of
// worker goroutines. It prints per-page progress and overall timing
// statistics, and fails if any page could not be queried or inserted.
func TestQueryAndInsertToDoris(t *testing.T) {
	const (
		queryEndpoint = "http://localhost:8080/api/query/csv/param"
		bearerToken   = "123"
	)

	totalStartTime := time.Now()

	pageConfig := PageConfig{
		TotalRows:  400000, // total number of rows to query
		PageSize:   3000,   // rows per page
		MaxWorkers: 10,     // concurrent workers
		StartRow:   0,      // row offset to start from
	}

	fmt.Printf("开始执行分页查询,总共%d行,每页%d条,%d线程工作\n",
		pageConfig.TotalRows, pageConfig.PageSize, pageConfig.MaxWorkers)

	// Obtain the HTTP factory.
	httpFactory, err := http.GetHTTPFactory()
	if err != nil {
		t.Fatalf("Failed to get HTTP factory: %v", err)
	}
	fmt.Println("HTTP factory created successfully")

	// Obtain the Doris factory, which is built on top of the HTTP factory.
	dorisFactory, err := doris.GetDorisFactory(httpFactory)
	if err != nil {
		t.Fatalf("Failed to get Doris factory: %v", err)
	}
	fmt.Println("Doris factory created successfully")

	fmt.Println("Checking Doris table structure...")

	// The stream load URL is only printed for diagnostics; the insert itself
	// goes through dorisFactory.InsertCSV.
	cfg := config.GetConfig()
	database := "X6_STOCK_DEV"
	table := "A3_CLOTHING"
	// The query below is sent with WriterHeader=false, so skipHeader is passed
	// as false — presumably the CSV carries no header row to skip.
	skipHeader := false
	url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load",
		cfg.GetDoris().FEHost, cfg.GetDoris().FEPort, database, table)
	fmt.Printf("Doris stream load URL: %s\n", url)

	// Number of pages, rounding up so a final partial page is included.
	totalPages := (pageConfig.TotalRows + pageConfig.PageSize - 1) / pageConfig.PageSize
	fmt.Printf("预计总共需要查询 %d 页\n", totalPages)

	// Both channels are buffered to totalPages so neither the task generator
	// nor the workers block on a send.
	taskChan := make(chan QueryTask, totalPages)
	resultChan := make(chan QueryTask, totalPages)
	doneChan := make(chan struct{})

	var wg sync.WaitGroup

	// Statistics. Only the result-collector goroutine writes these; the main
	// goroutine reads them after doneChan is closed, so no mutex is needed.
	var (
		totalQueryTime    time.Duration
		totalSaveTime     time.Duration
		totalRowsInserted int
		completedTasks    int
		failedTasks       int
	)

	// Start the workers.
	for i := 0; i < pageConfig.MaxWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			// Each worker uses its own HTTP client.
			httpClient := httpFactory.CreateClient()

			// runTask fetches one page as CSV and inserts it into Doris. It
			// returns the task with either Error set or the timings filled in.
			runTask := func(task QueryTask) QueryTask {
				queryStartTime := time.Now()

				queryRequest := types.QueryRequest{
					SQL:              task.QuerySQL,
					PositionalParams: task.QueryParams,
					WriterHeader:     false,
				}

				// POST the query to the CSV endpoint.
				resp, err := httpClient.PostWithAuth(
					queryEndpoint,
					queryRequest,
					bearerToken, // Bearer token
					nil,
				)
				if err != nil {
					task.Error = fmt.Errorf("第%d页查询失败: %w", task.Page, err)
					return task
				}
				if resp.StatusCode() != 200 {
					task.Error = fmt.Errorf("第%d页查询请求失败, 状态码: %d", task.Page, resp.StatusCode())
					return task
				}

				csvData := string(resp.Body())
				if len(csvData) == 0 {
					task.Error = fmt.Errorf("第%d页没有数据", task.Page)
					return task
				}
				log.Printf("csvData:\n%s", csvData)

				task.QueryTime = time.Since(queryStartTime)
				task.CSVData = csvData

				// Insert the page into Doris.
				saveStartTime := time.Now()
				if err := dorisFactory.InsertCSV(database, table, csvData, skipHeader); err != nil {
					task.Error = fmt.Errorf("第%d页数据插入Doris失败: %w", task.Page, err)
					return task
				}
				task.SaveTime = time.Since(saveStartTime)

				return task
			}

			for task := range taskChan {
				fmt.Printf("Worker %d 处理第 %d 页 (行 %d-%d)...\n",
					workerID, task.Page, task.StartRow, task.EndRow)
				resultChan <- runTask(task)
			}
		}(i + 1)
	}

	// Result collector: drains resultChan until it is closed, then closes
	// doneChan. Signalling on drain rather than on a success count means a
	// failed page cannot leave the main goroutine blocked forever.
	go func() {
		defer close(doneChan)

		for task := range resultChan {
			if task.Error != nil {
				failedTasks++
				fmt.Printf("❌ 第 %d 页处理失败: %v\n", task.Page, task.Error)
				continue
			}

			// Row count is an estimate: start from the page's row range, and
			// if the CSV is shorter than 50 bytes per expected row, fall back
			// to len/50 (assumes at least 50 bytes per row).
			estimatedRows := task.EndRow - task.StartRow + 1
			if len(task.CSVData) < estimatedRows*50 {
				estimatedRows = len(task.CSVData) / 50
			}

			totalQueryTime += task.QueryTime
			totalSaveTime += task.SaveTime
			totalRowsInserted += estimatedRows
			completedTasks++

			fmt.Printf("✅ Worker 完成第 %d 页 (行 %d-%d)\n", task.Page, task.StartRow, task.EndRow)
			fmt.Printf("   查询耗时: %v, 保存耗时: %v\n", task.QueryTime, task.SaveTime)
			fmt.Printf("   估算数据行数: %d\n", estimatedRows)
		}
	}()

	// Generate the tasks and feed them to the workers.
	fmt.Println("\n📋 开始生成查询任务...")
	maxEndRow := pageConfig.StartRow + pageConfig.TotalRows
	for page := 1; page <= totalPages; page++ {
		startRow := pageConfig.StartRow + (page-1)*pageConfig.PageSize + 1
		endRow := pageConfig.StartRow + page*pageConfig.PageSize

		// Clamp the last page so it does not run past the requested total.
		if endRow > maxEndRow {
			endRow = maxEndRow
		}

		// Build the parameterized SQL for this page.
		querySQL, queryParams := getSQLWithPagination(startRow, endRow)

		task := QueryTask{
			Page:        page,
			StartRow:    startRow,
			EndRow:      endRow,
			QuerySQL:    querySQL,
			QueryParams: queryParams,
		}

		fmt.Printf("生成第 %d 页任务 (行 %d-%d)\n", page, startRow, endRow)
		taskChan <- task
	}

	// No more tasks: let the workers' range loops terminate.
	close(taskChan)

	// Wait for all workers, then let the collector finish draining results.
	wg.Wait()
	close(resultChan)
	<-doneChan

	totalDuration := time.Since(totalStartTime)

	// Performance statistics.
	fmt.Println("\n📊 性能统计:")
	fmt.Printf("   完成页数: %d/%d\n", completedTasks, totalPages)
	fmt.Printf("   总查询耗时: %v\n", totalQueryTime)
	fmt.Printf("   总保存耗时: %v\n", totalSaveTime)
	fmt.Printf("   总耗时: %v\n", totalDuration)
	fmt.Printf("   估算插入总行数: %d\n", totalRowsInserted)

	if completedTasks > 0 {
		fmt.Printf("   平均每页查询耗时: %v\n", totalQueryTime/time.Duration(completedTasks))
		fmt.Printf("   平均每页保存耗时: %v\n", totalSaveTime/time.Duration(completedTasks))
		fmt.Printf("   平均每秒处理行数: %.2f\n", float64(totalRowsInserted)/totalDuration.Seconds())
	}

	// Report failures through the test instead of claiming success.
	if failedTasks > 0 {
		t.Fatalf("%d of %d pages failed to load into Doris", failedTasks, totalPages)
	}

	fmt.Println("✅ 所有数据成功插入到 Doris!")
}

// getSQLWithPagination builds the parameterized, ROWNUM-paginated SELECT for
// rows startRow..endRow (1-based, inclusive) of X6_STOCK_DEV.A3_CLOTHING
// ordered by CLOTHING_ID.
//
// It returns the SQL text and its positional parameters: :1 is endRow (upper
// bound on ROWNUM) and :2 is startRow-1 (exclusive lower bound on rn).
func getSQLWithPagination(startRow, endRow int) (string, []interface{}) {
	sql := `SELECT
    CLOTHING_ID, CLOTHING_YEAR, CLOTHING_NAME, STYLECOLOR_ID, STYLE_ID, COLOR_ID, SIZE_ID,
    CREATE_DATE, STYLE_GROUP, J_PRICE, X_PRICE, V_PRICE, CLERK_ROYALTYRATE, CLERK_ROYALTYPRICE,
    BRAND_CODE, STYLEVER_ID, J_COST, CLOTHING_IMG, STYLE_UNIT_CODE, STYLE_SEX_CODE,
    STYLE_KIND_CODE, STYLE_CLASS_CODE, STYLE_SUBCLASS_CODE, STYLE_DESIGNER_CODE,
    STYLE_PLATER_CODE, STYLE_STYLES_CODE, STYLE_LOCATE_CODE, STYLE_SALETYPE_CODE,
    STYLE_COLORSYSTEM_CODE, STYLE_THEME_CODE, STYLE_INDENTTYPE_CODE, STYLE_PRICEBAND_CODE,
    STYLE_MONTH_CODE, STYLE_COMPOSITION_CODE, STYLE_SUPPLIER_CODE, STYLE_SPARE1_CODE,
    STYLE_SPARE2_CODE, STYLE_SPARE4_CODE, STYLE_SPARE5_CODE, CATEGORY_CODE, BRAND_ID,
    STYCOLVER_ID, STYLE_SAME, CLOTHING_BARCODE, CLOTHING_HELPID, CLOTHING_GBCODE,
    CLOTHING_RFID, STYLE_SUBJECT_ID, SIZEGRP_ID, STYLE_HELPID, CLOTHING_GBCODE1,
    COLOR_NAME, STYLEVER_NAME, SIZE_NAME, STYLE_UNIT, STYLE_SEX, STYLE_KIND, STYLE_CLASS,
    STYLE_SUBCLASS, STYLE_DESIGNER, STYLE_PLATER, STYLE_BAND, STYLE_STYLES, STYLE_LOCATE,
    STYLE_SALETYPE, STYLE_COLORSYSTEM, STYLE_THEME, STYLE_INDENTTYPE, STYLE_PRICEBAND,
    STYLE_MONTH, STYLE_COMPOSITION, STYLE_SUPPLIER, STYLE_SPARE1, STYLE_SPARE2,
    STYLE_SPARE3, STYLE_SPARE4, STYLE_SPARE5, CATEGORY_NAME, BRAND_NAME, STYLE_YEAR_NAME,
    STYLE_SEARCH_KEY, STYLE_SUBJECT_NAME, CLOTHING_REMARK, STYLE_SPARE3_CODE, COST,
    BRAND_GROUPCODE, CLASS_GROUPCODE, MONTH_GROUPCODE, RETURNSUBJECT_ID, PRODUCT_SORT,
    CLOTHING_PARTITION
FROM (
    SELECT a.*, ROWNUM as rn
    FROM (
        SELECT *
        FROM X6_STOCK_DEV.A3_CLOTHING
        ORDER BY CLOTHING_ID
    ) a
    WHERE ROWNUM <= :1
)
WHERE rn > :2`

	// Positional parameters, in placeholder order.
	params := []interface{}{
		endRow,       // :1 — inner filter ROWNUM <= endRow
		startRow - 1, // :2 — outer filter is "rn > :2", hence startRow-1
	}

	return sql, params
}