Ingen beskrivning
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

doris_factory.go 2.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package doris
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "git.x2erp.com/qdy/go-base/config"
  8. "git.x2erp.com/qdy/go-db/factory/http"
  9. "github.com/go-resty/resty/v2"
  10. )
  11. // DorisFactory Doris HTTP客户端工厂
  12. type DorisFactory struct {
  13. client *resty.Client // 线程安全
  14. FEHost string // 只读
  15. FEPort int // 只读
  16. Username string // 只读
  17. Password string // 只读
  18. Timeout time.Duration // 只读
  19. isClosed bool // 需要原子操作或锁保护
  20. closedMu sync.RWMutex // 只保护 isClosed 字段
  21. }
  22. var (
  23. instance *DorisFactory
  24. instanceOnce sync.Once
  25. )
  26. // GetDorisFactory 获取Doris工厂单例
  27. func GetDorisFactory(httpFactory *http.HTTPFactory) (*DorisFactory, error) {
  28. var initErr error
  29. instanceOnce.Do(func() {
  30. cfg := config.GetConfig()
  31. if err := config.GetInitError(); err != nil {
  32. initErr = fmt.Errorf("failed to load config: %v", err)
  33. return
  34. }
  35. if !cfg.IsDorisConfigured() {
  36. initErr = fmt.Errorf("doris configuration is incomplete")
  37. return
  38. }
  39. dorisConfig := cfg.GetDoris()
  40. instance = &DorisFactory{
  41. client: httpFactory.GetUnderlyingClient(),
  42. FEHost: dorisConfig.FEHost,
  43. FEPort: dorisConfig.FEPort,
  44. Username: dorisConfig.FEUsername,
  45. Password: dorisConfig.FEPassword,
  46. Timeout: time.Duration(dorisConfig.StreamLoadTimeout) * time.Second,
  47. isClosed: false,
  48. }
  49. })
  50. if initErr != nil {
  51. return nil, initErr
  52. }
  53. return instance, nil
  54. }
  55. // InsertCSV 插入CSV数据到Doris
  56. func (f *DorisFactory) InsertCSV(database, table, csvData string, skipHeader bool) error {
  57. // 检查关闭状态
  58. if f.IsClosed() {
  59. return fmt.Errorf("doris client is closed")
  60. }
  61. url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", f.FEHost, f.FEPort, database, table)
  62. fmt.Print(url)
  63. // 使用 resty 的 API
  64. resp, err := f.client.R().
  65. SetBasicAuth(f.Username, f.Password).
  66. SetHeader("column_separator", ","). // 必需
  67. SetHeader("enclose", "\"").
  68. SetHeader("Expect", "100-continue").
  69. SetBody(csvData).
  70. Put(url)
  71. if skipHeader {
  72. resp.Request.SetHeader("skip_header", "1")
  73. }
  74. if err != nil {
  75. return fmt.Errorf("请求失败: %v", err)
  76. }
  77. if resp.StatusCode() != 200 {
  78. return fmt.Errorf("插入失败: %s", string(resp.Body()))
  79. }
  80. // 解析Stream Load响应
  81. var result struct {
  82. Status string `json:"Status"`
  83. Message string `json:"Message"`
  84. }
  85. if err := json.Unmarshal(resp.Body(), &result); err != nil {
  86. return fmt.Errorf("解析响应失败: %v", err)
  87. }
  88. if result.Status != "Success" {
  89. return fmt.Errorf("插入失败: %s", result.Message)
  90. }
  91. return nil
  92. }
  93. // Close 关闭Doris客户端
  94. func (f *DorisFactory) Close() error {
  95. f.closedMu.Lock()
  96. defer f.closedMu.Unlock()
  97. if f.isClosed {
  98. return nil
  99. }
  100. f.isClosed = true
  101. return nil
  102. }
  103. // IsClosed 检查客户端是否已关闭
  104. func (f *DorisFactory) IsClosed() bool {
  105. f.closedMu.RLock()
  106. defer f.closedMu.RUnlock()
  107. return f.isClosed
  108. }