| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package main
-
- import (
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "strings"
- "testing"
- "time"
- )
-
- // Doris Stream Load响应结构
- type DorisResponse struct {
- TxnID int `json:"TxnId"`
- Label string `json:"Label"`
- Status string `json:"Status"`
- Message string `json:"Message"`
- NumberTotalRows int `json:"NumberTotalRows"`
- NumberLoadedRows int `json:"NumberLoadedRows"`
- LoadBytes int `json:"LoadBytes"`
- ErrorURL string `json:"ErrorURL"`
- }
-
- func insertToDorisDirect(csvData string) error {
- url := "http://161.189.89.196:8040/api/X6_STOCK_DEV/A3_CLOTHING_LOG/_stream_load"
- username := "root"
- password := "mos8555"
-
- fmt.Printf("=== Doris Stream Load测试 ===\n")
- fmt.Printf("URL: %s\n", url)
- fmt.Printf("数据: %s\n", csvData)
-
- // 创建PUT请求
- req, err := http.NewRequest("PUT", url, strings.NewReader(csvData))
- if err != nil {
- return fmt.Errorf("创建请求失败: %v", err)
- }
-
- // 设置Basic Auth
- req.SetBasicAuth(username, password)
-
- // 设置headers - 与curl命令保持一致
- req.Header.Set("Expect", "100-continue")
- req.Header.Set("column_separator", ",")
- req.Header.Set("enclose", "\"")
-
- // 设置唯一的label用于追踪
- label := fmt.Sprintf("test_%d", time.Now().UnixNano())
- req.Header.Set("label", label)
-
- // 创建HTTP客户端,允许自动重定向
- client := &http.Client{
- Timeout: 30 * time.Second,
- }
-
- // 发送请求
- fmt.Println("发送请求到Doris...")
- resp, err := client.Do(req)
- if err != nil {
- return fmt.Errorf("请求失败: %v", err)
- }
- defer resp.Body.Close()
-
- // 处理响应
- body, err := io.ReadAll(resp.Body)
- if err != nil {
- return fmt.Errorf("读取响应失败: %v", err)
- }
-
- responseBody := string(body)
- fmt.Printf("HTTP状态码: %d\n", resp.StatusCode)
- fmt.Printf("响应内容: %s\n", responseBody)
-
- // 解析JSON响应
- var dorisResp DorisResponse
- if err := json.Unmarshal(body, &dorisResp); err != nil {
- return fmt.Errorf("解析响应失败: %v, 原始响应: %s", err, responseBody)
- }
-
- // 检查处理状态 - 修正状态检查逻辑
- if dorisResp.Status == "FAIL" {
- return fmt.Errorf("Stream Load失败: %s (TxnID: %d)", dorisResp.Message, dorisResp.TxnID)
- }
-
- // 修正:Doris返回的状态可能是 "Success" 而不是 "SUCCESS"
- if strings.ToUpper(dorisResp.Status) != "SUCCESS" {
- return fmt.Errorf("未知状态: %s, 消息: %s", dorisResp.Status, dorisResp.Message)
- }
-
- fmt.Printf("✅ Stream Load成功! \n")
- fmt.Printf(" TxnID: %d\n", dorisResp.TxnID)
- fmt.Printf(" Label: %s\n", dorisResp.Label)
- fmt.Printf(" 加载行数: %d/%d\n", dorisResp.NumberLoadedRows, dorisResp.NumberTotalRows)
- fmt.Printf(" 数据大小: %d bytes\n", dorisResp.LoadBytes)
-
- return nil
- }
-
- // 测试函数
- func TestDorisStreamLoad(t *testing.T) {
- fmt.Println("=== 测试Doris Stream Load ===")
-
- csvData := "EWE322Y20600491-03,2021,H-大部分"
-
- err := insertToDorisDirect(csvData)
- if err != nil {
- t.Fatalf("Stream Load失败: %v", err)
- }
-
- fmt.Println("✅ 数据成功插入Doris!")
- }
|