package main import ( "encoding/json" "fmt" "io" "net/http" "strings" "testing" "time" ) // Doris Stream Load响应结构 type DorisResponse struct { TxnID int `json:"TxnId"` Label string `json:"Label"` Status string `json:"Status"` Message string `json:"Message"` NumberTotalRows int `json:"NumberTotalRows"` NumberLoadedRows int `json:"NumberLoadedRows"` LoadBytes int `json:"LoadBytes"` ErrorURL string `json:"ErrorURL"` } func insertToDorisDirect(csvData string) error { url := "http://161.189.89.196:8040/api/X6_STOCK_DEV/A3_CLOTHING_LOG/_stream_load" username := "root" password := "mos8555" fmt.Printf("=== Doris Stream Load测试 ===\n") fmt.Printf("URL: %s\n", url) fmt.Printf("数据: %s\n", csvData) // 创建PUT请求 req, err := http.NewRequest("PUT", url, strings.NewReader(csvData)) if err != nil { return fmt.Errorf("创建请求失败: %v", err) } // 设置Basic Auth req.SetBasicAuth(username, password) // 设置headers - 与curl命令保持一致 req.Header.Set("Expect", "100-continue") req.Header.Set("column_separator", ",") req.Header.Set("enclose", "\"") // 设置唯一的label用于追踪 label := fmt.Sprintf("test_%d", time.Now().UnixNano()) req.Header.Set("label", label) // 创建HTTP客户端,允许自动重定向 client := &http.Client{ Timeout: 30 * time.Second, } // 发送请求 fmt.Println("发送请求到Doris...") resp, err := client.Do(req) if err != nil { return fmt.Errorf("请求失败: %v", err) } defer resp.Body.Close() // 处理响应 body, err := io.ReadAll(resp.Body) if err != nil { return fmt.Errorf("读取响应失败: %v", err) } responseBody := string(body) fmt.Printf("HTTP状态码: %d\n", resp.StatusCode) fmt.Printf("响应内容: %s\n", responseBody) // 解析JSON响应 var dorisResp DorisResponse if err := json.Unmarshal(body, &dorisResp); err != nil { return fmt.Errorf("解析响应失败: %v, 原始响应: %s", err, responseBody) } // 检查处理状态 - 修正状态检查逻辑 if dorisResp.Status == "FAIL" { return fmt.Errorf("Stream Load失败: %s (TxnID: %d)", dorisResp.Message, dorisResp.TxnID) } // 修正:Doris返回的状态可能是 "Success" 而不是 "SUCCESS" if strings.ToUpper(dorisResp.Status) != "SUCCESS" { return fmt.Errorf("未知状态: %s, 消息: %s", dorisResp.Status, dorisResp.Message) } fmt.Printf("✅ Stream Load成功! \n") fmt.Printf(" TxnID: %d\n", dorisResp.TxnID) fmt.Printf(" Label: %s\n", dorisResp.Label) fmt.Printf(" 加载行数: %d/%d\n", dorisResp.NumberLoadedRows, dorisResp.NumberTotalRows) fmt.Printf(" 数据大小: %d bytes\n", dorisResp.LoadBytes) return nil } // 测试函数 func TestDorisStreamLoad(t *testing.T) { fmt.Println("=== 测试Doris Stream Load ===") csvData := "EWE322Y20600491-03,2021,H-大部分" err := insertToDorisDirect(csvData) if err != nil { t.Fatalf("Stream Load失败: %v", err) } fmt.Println("✅ 数据成功插入Doris!") }