Sin descripción
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

doris_factory.go 2.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package factory
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strings"
  8. "time"
  9. "git.x2erp.com/qdy/go-base/config"
  10. )
// DorisFactory is a factory for Doris HTTP clients.
type DorisFactory struct {
	config     config.IConfig // configuration source; its GetDoris() settings are read when a client is created
	httpClient *http.Client   // HTTP client handed to every DorisClient this factory creates
}
  16. // NewDorisFactory 创建Doris HTTP客户端工厂
  17. func NewDorisFactory(httpFactory *HTTPFactory) (*DorisFactory, error) {
  18. cfg := config.GetConfig()
  19. if err := config.GetInitError(); err != nil {
  20. return nil, fmt.Errorf("failed to load config: %v", err)
  21. }
  22. if !cfg.IsDorisConfigured() {
  23. return nil, fmt.Errorf("doris configuration is incomplete")
  24. }
  25. return &DorisFactory{
  26. config: cfg,
  27. httpClient: httpFactory.CreateHTTPClient(),
  28. }, nil
  29. }
  30. // CreateDorisClient 创建Doris HTTP客户端
  31. func (f *DorisFactory) CreateDorisClient() *DorisClient {
  32. dorisConfig := f.config.GetDoris()
  33. return &DorisClient{
  34. httpClient: f.httpClient,
  35. FEHost: dorisConfig.FEHost,
  36. FEPort: dorisConfig.FEPort,
  37. Username: dorisConfig.FEUsername,
  38. Password: dorisConfig.FEPassword,
  39. Timeout: time.Duration(dorisConfig.StreamLoadTimeout) * time.Second,
  40. }
  41. }
  42. // DorisClient Doris HTTP客户端
  43. type DorisClient struct {
  44. httpClient *http.Client
  45. FEHost string
  46. FEPort int
  47. Username string
  48. Password string
  49. Timeout time.Duration
  50. }
  51. // InsertCSV 插入CSV数据到Doris 只认数据次序
  52. func (c *DorisClient) DorisInsertCSV(database, table, csvData string, skipHeader bool) error {
  53. url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", c.FEHost, c.FEPort, database, table)
  54. req, err := http.NewRequest("PUT", url, strings.NewReader(csvData))
  55. if err != nil {
  56. return fmt.Errorf("创建请求失败: %v", err)
  57. }
  58. req.SetBasicAuth(c.Username, c.Password)
  59. req.Header.Set("Content-Type", "text/plain")
  60. req.Header.Set("format", "csv")
  61. req.Header.Set("column_separator", ",")
  62. if skipHeader {
  63. req.Header.Set("skip_header", "1")
  64. }
  65. resp, err := c.httpClient.Do(req)
  66. if err != nil {
  67. return fmt.Errorf("请求失败: %v", err)
  68. }
  69. defer resp.Body.Close()
  70. body, _ := io.ReadAll(resp.Body)
  71. if resp.StatusCode != 200 {
  72. return fmt.Errorf("插入失败: %s", string(body))
  73. }
  74. // 解析Stream Load响应
  75. var result struct {
  76. Status string `json:"Status"`
  77. Message string `json:"Message"`
  78. }
  79. if err := json.Unmarshal(body, &result); err != nil {
  80. return fmt.Errorf("解析响应失败: %v", err)
  81. }
  82. if result.Status != "Success" {
  83. return fmt.Errorf("插入失败: %s", result.Message)
  84. }
  85. return nil
  86. }