| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package factory
-
- import (
- "encoding/json"
- "fmt"
- "sync"
- "time"
-
- "git.x2erp.com/qdy/go-base/config"
- "github.com/go-resty/resty/v2"
- )
-
- // DorisFactory Doris HTTP客户端工厂
- type DorisFactory struct {
- client *resty.Client // 线程安全
- FEHost string // 只读
- FEPort int // 只读
- Username string // 只读
- Password string // 只读
- Timeout time.Duration // 只读
-
- isClosed bool // 需要原子操作或锁保护
- closedMu sync.RWMutex // 只保护 isClosed 字段
- }
-
- var (
- instance *DorisFactory
- instanceOnce sync.Once
- )
-
- // GetDorisFactory 获取Doris工厂单例
- func GetDorisFactory(httpFactory *HTTPFactory) (*DorisFactory, error) {
- var initErr error
- instanceOnce.Do(func() {
- cfg := config.GetConfig()
-
- if err := config.GetInitError(); err != nil {
- initErr = fmt.Errorf("failed to load config: %v", err)
- return
- }
-
- if !cfg.IsDorisConfigured() {
- initErr = fmt.Errorf("doris configuration is incomplete")
- return
- }
-
- dorisConfig := cfg.GetDoris()
- instance = &DorisFactory{
- client: httpFactory.CreateClient().client,
- FEHost: dorisConfig.FEHost,
- FEPort: dorisConfig.FEPort,
- Username: dorisConfig.FEUsername,
- Password: dorisConfig.FEPassword,
- Timeout: time.Duration(dorisConfig.StreamLoadTimeout) * time.Second,
- isClosed: false,
- }
- })
-
- if initErr != nil {
- return nil, initErr
- }
-
- return instance, nil
- }
-
- // InsertCSV 插入CSV数据到Doris
- func (f *DorisFactory) InsertCSV(database, table, csvData string, skipHeader bool) error {
- // 检查关闭状态
- if f.IsClosed() {
- return fmt.Errorf("doris client is closed")
- }
-
- url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", f.FEHost, f.FEPort, database, table)
-
- fmt.Print(url)
-
- // 使用 resty 的 API
- resp, err := f.client.R().
- SetBasicAuth(f.Username, f.Password).
- SetHeader("column_separator", ","). // 必需
- SetHeader("enclose", "\"").
- SetHeader("Expect", "100-continue").
- SetBody(csvData).
- Put(url)
-
- if skipHeader {
- resp.Request.SetHeader("skip_header", "1")
- }
-
- if err != nil {
- return fmt.Errorf("请求失败: %v", err)
- }
-
- if resp.StatusCode() != 200 {
- return fmt.Errorf("插入失败: %s", string(resp.Body()))
- }
-
- // 解析Stream Load响应
- var result struct {
- Status string `json:"Status"`
- Message string `json:"Message"`
- }
- if err := json.Unmarshal(resp.Body(), &result); err != nil {
- return fmt.Errorf("解析响应失败: %v", err)
- }
-
- if result.Status != "Success" {
- return fmt.Errorf("插入失败: %s", result.Message)
- }
-
- return nil
- }
-
- // Close 关闭Doris客户端
- func (f *DorisFactory) Close() error {
- f.closedMu.Lock()
- defer f.closedMu.Unlock()
-
- if f.isClosed {
- return nil
- }
-
- f.isClosed = true
- return nil
- }
-
- // IsClosed 检查客户端是否已关闭
- func (f *DorisFactory) IsClosed() bool {
- f.closedMu.RLock()
- defer f.closedMu.RUnlock()
- return f.isClosed
- }
|