Açıklama Yok
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

doris_factory.go 2.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package doris
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "git.x2erp.com/qdy/go-base/config"
  8. "git.x2erp.com/qdy/go-db/factory/http"
  9. "github.com/go-resty/resty/v2"
  10. )
  11. // DorisFactory Doris HTTP客户端工厂
  12. type DorisFactory struct {
  13. client *resty.Client // 线程安全
  14. FEHost string // 只读
  15. FEPort int // 只读
  16. Username string // 只读
  17. Password string // 只读
  18. Timeout time.Duration // 只读
  19. isClosed bool // 需要原子操作或锁保护
  20. closedMu sync.RWMutex // 只保护 isClosed 字段
  21. }
  22. var (
  23. instanceDoris *DorisFactory
  24. instanceDorisOnce sync.Once
  25. )
  26. // GetDorisFactory 获取Doris工厂单例
  27. func GetDorisFactory(httpFactory *http.HTTPFactory) (*DorisFactory, error) {
  28. var initErr error
  29. instanceDorisOnce.Do(func() {
  30. cfg := config.GetConfig()
  31. if !cfg.IsDorisConfigured() {
  32. initErr = fmt.Errorf("doris configuration is incomplete")
  33. return
  34. }
  35. dorisConfig := cfg.GetDorisConfig()
  36. instanceDoris = &DorisFactory{
  37. client: httpFactory.GetUnderlyingClient(),
  38. FEHost: dorisConfig.FEHost,
  39. FEPort: dorisConfig.FEPort,
  40. Username: dorisConfig.FEUsername,
  41. Password: dorisConfig.FEPassword,
  42. Timeout: time.Duration(dorisConfig.StreamLoadTimeout) * time.Second,
  43. isClosed: false,
  44. }
  45. })
  46. if initErr != nil {
  47. return nil, initErr
  48. }
  49. return instanceDoris, nil
  50. }
  51. // InsertCSV 插入CSV数据到Doris
  52. func (f *DorisFactory) InsertCSV(database, table, csvData string, skipHeader bool) error {
  53. // 检查关闭状态
  54. if f.IsClosed() {
  55. return fmt.Errorf("doris client is closed")
  56. }
  57. url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", f.FEHost, f.FEPort, database, table)
  58. fmt.Print(url)
  59. // 使用 resty 的 API
  60. resp, err := f.client.R().
  61. SetBasicAuth(f.Username, f.Password).
  62. SetHeader("column_separator", ","). // 必需
  63. SetHeader("enclose", "\"").
  64. SetHeader("Expect", "100-continue").
  65. SetBody(csvData).
  66. Put(url)
  67. if skipHeader {
  68. resp.Request.SetHeader("skip_header", "1")
  69. }
  70. if err != nil {
  71. return fmt.Errorf("请求失败: %v", err)
  72. }
  73. if resp.StatusCode() != 200 {
  74. return fmt.Errorf("插入失败: %s", string(resp.Body()))
  75. }
  76. // 解析Stream Load响应
  77. var result struct {
  78. Status string `json:"Status"`
  79. Message string `json:"Message"`
  80. }
  81. if err := json.Unmarshal(resp.Body(), &result); err != nil {
  82. return fmt.Errorf("解析响应失败: %v", err)
  83. }
  84. if result.Status != "Success" {
  85. return fmt.Errorf("插入失败: %s", result.Message)
  86. }
  87. return nil
  88. }
  89. // Close 关闭Doris客户端
  90. func (f *DorisFactory) Close() error {
  91. f.closedMu.Lock()
  92. defer f.closedMu.Unlock()
  93. if f.isClosed {
  94. return nil
  95. }
  96. f.isClosed = true
  97. return nil
  98. }
  99. // IsClosed 检查客户端是否已关闭
  100. func (f *DorisFactory) IsClosed() bool {
  101. f.closedMu.RLock()
  102. defer f.closedMu.RUnlock()
  103. return f.isClosed
  104. }