暂无描述
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "testing"
  9. "time"
  10. )
  11. // Doris Stream Load响应结构
  12. type DorisResponse struct {
  13. TxnID int `json:"TxnId"`
  14. Label string `json:"Label"`
  15. Status string `json:"Status"`
  16. Message string `json:"Message"`
  17. NumberTotalRows int `json:"NumberTotalRows"`
  18. NumberLoadedRows int `json:"NumberLoadedRows"`
  19. LoadBytes int `json:"LoadBytes"`
  20. ErrorURL string `json:"ErrorURL"`
  21. }
  22. func insertToDorisDirect(csvData string) error {
  23. url := "http://161.189.89.196:8040/api/X6_STOCK_DEV/A3_CLOTHING_LOG/_stream_load"
  24. username := "root"
  25. password := "mos8555"
  26. fmt.Printf("=== Doris Stream Load测试 ===\n")
  27. fmt.Printf("URL: %s\n", url)
  28. fmt.Printf("数据: %s\n", csvData)
  29. // 创建PUT请求
  30. req, err := http.NewRequest("PUT", url, strings.NewReader(csvData))
  31. if err != nil {
  32. return fmt.Errorf("创建请求失败: %v", err)
  33. }
  34. // 设置Basic Auth
  35. req.SetBasicAuth(username, password)
  36. // 设置headers - 与curl命令保持一致
  37. req.Header.Set("Expect", "100-continue")
  38. req.Header.Set("column_separator", ",")
  39. req.Header.Set("enclose", "\"")
  40. // 设置唯一的label用于追踪
  41. label := fmt.Sprintf("test_%d", time.Now().UnixNano())
  42. req.Header.Set("label", label)
  43. // 创建HTTP客户端,允许自动重定向
  44. client := &http.Client{
  45. Timeout: 30 * time.Second,
  46. }
  47. // 发送请求
  48. fmt.Println("发送请求到Doris...")
  49. resp, err := client.Do(req)
  50. if err != nil {
  51. return fmt.Errorf("请求失败: %v", err)
  52. }
  53. defer resp.Body.Close()
  54. // 处理响应
  55. body, err := io.ReadAll(resp.Body)
  56. if err != nil {
  57. return fmt.Errorf("读取响应失败: %v", err)
  58. }
  59. responseBody := string(body)
  60. fmt.Printf("HTTP状态码: %d\n", resp.StatusCode)
  61. fmt.Printf("响应内容: %s\n", responseBody)
  62. // 解析JSON响应
  63. var dorisResp DorisResponse
  64. if err := json.Unmarshal(body, &dorisResp); err != nil {
  65. return fmt.Errorf("解析响应失败: %v, 原始响应: %s", err, responseBody)
  66. }
  67. // 检查处理状态 - 修正状态检查逻辑
  68. if dorisResp.Status == "FAIL" {
  69. return fmt.Errorf("Stream Load失败: %s (TxnID: %d)", dorisResp.Message, dorisResp.TxnID)
  70. }
  71. // 修正:Doris返回的状态可能是 "Success" 而不是 "SUCCESS"
  72. if strings.ToUpper(dorisResp.Status) != "SUCCESS" {
  73. return fmt.Errorf("未知状态: %s, 消息: %s", dorisResp.Status, dorisResp.Message)
  74. }
  75. fmt.Printf("✅ Stream Load成功! \n")
  76. fmt.Printf(" TxnID: %d\n", dorisResp.TxnID)
  77. fmt.Printf(" Label: %s\n", dorisResp.Label)
  78. fmt.Printf(" 加载行数: %d/%d\n", dorisResp.NumberLoadedRows, dorisResp.NumberTotalRows)
  79. fmt.Printf(" 数据大小: %d bytes\n", dorisResp.LoadBytes)
  80. return nil
  81. }
  82. // 测试函数
  83. func TestDorisStreamLoad(t *testing.T) {
  84. fmt.Println("=== 测试Doris Stream Load ===")
  85. csvData := "EWE322Y20600491-03,2021,H-大部分"
  86. err := insertToDorisDirect(csvData)
  87. if err != nil {
  88. t.Fatalf("Stream Load失败: %v", err)
  89. }
  90. fmt.Println("✅ 数据成功插入Doris!")
  91. }