Aucune description
Vous ne pouvez pas sélectionner plus de 25 sujets. Les noms de sujets doivent commencer par une lettre ou un chiffre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.

doris_factory.go 2.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package factory
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "git.x2erp.com/qdy/go-base/config"
  8. "github.com/go-resty/resty/v2"
  9. )
  10. // DorisFactory Doris HTTP客户端工厂
  11. type DorisFactory struct {
  12. client *resty.Client // 线程安全
  13. FEHost string // 只读
  14. FEPort int // 只读
  15. Username string // 只读
  16. Password string // 只读
  17. Timeout time.Duration // 只读
  18. isClosed bool // 需要原子操作或锁保护
  19. closedMu sync.RWMutex // 只保护 isClosed 字段
  20. }
  21. var (
  22. instance *DorisFactory
  23. instanceOnce sync.Once
  24. )
  25. // GetDorisFactory 获取Doris工厂单例
  26. func GetDorisFactory(httpFactory *HTTPFactory) (*DorisFactory, error) {
  27. var initErr error
  28. instanceOnce.Do(func() {
  29. cfg := config.GetConfig()
  30. if err := config.GetInitError(); err != nil {
  31. initErr = fmt.Errorf("failed to load config: %v", err)
  32. return
  33. }
  34. if !cfg.IsDorisConfigured() {
  35. initErr = fmt.Errorf("doris configuration is incomplete")
  36. return
  37. }
  38. dorisConfig := cfg.GetDoris()
  39. instance = &DorisFactory{
  40. client: httpFactory.CreateClient().client,
  41. FEHost: dorisConfig.FEHost,
  42. FEPort: dorisConfig.FEPort,
  43. Username: dorisConfig.FEUsername,
  44. Password: dorisConfig.FEPassword,
  45. Timeout: time.Duration(dorisConfig.StreamLoadTimeout) * time.Second,
  46. isClosed: false,
  47. }
  48. })
  49. if initErr != nil {
  50. return nil, initErr
  51. }
  52. return instance, nil
  53. }
  54. // InsertCSV 插入CSV数据到Doris
  55. func (f *DorisFactory) InsertCSV(database, table, csvData string, skipHeader bool) error {
  56. // 检查关闭状态
  57. if f.IsClosed() {
  58. return fmt.Errorf("doris client is closed")
  59. }
  60. url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", f.FEHost, f.FEPort, database, table)
  61. fmt.Print(url)
  62. // 使用 resty 的 API
  63. resp, err := f.client.R().
  64. SetBasicAuth(f.Username, f.Password).
  65. SetHeader("column_separator", ","). // 必需
  66. SetHeader("enclose", "\"").
  67. SetBody(csvData).
  68. Put(url)
  69. if skipHeader {
  70. resp.Request.SetHeader("skip_header", "1")
  71. }
  72. if err != nil {
  73. return fmt.Errorf("请求失败: %v", err)
  74. }
  75. if resp.StatusCode() != 200 {
  76. return fmt.Errorf("插入失败: %s", string(resp.Body()))
  77. }
  78. // 解析Stream Load响应
  79. var result struct {
  80. Status string `json:"Status"`
  81. Message string `json:"Message"`
  82. }
  83. if err := json.Unmarshal(resp.Body(), &result); err != nil {
  84. return fmt.Errorf("解析响应失败: %v", err)
  85. }
  86. if result.Status != "Success" {
  87. return fmt.Errorf("插入失败: %s", result.Message)
  88. }
  89. return nil
  90. }
  91. // Close 关闭Doris客户端
  92. func (f *DorisFactory) Close() error {
  93. f.closedMu.Lock()
  94. defer f.closedMu.Unlock()
  95. if f.isClosed {
  96. return nil
  97. }
  98. f.isClosed = true
  99. return nil
  100. }
  101. // IsClosed 检查客户端是否已关闭
  102. func (f *DorisFactory) IsClosed() bool {
  103. f.closedMu.RLock()
  104. defer f.closedMu.RUnlock()
  105. return f.isClosed
  106. }