Açıklama Yok
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

agent_to_doris.go 4.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package service
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "git.x2erp.com/qdy/go-base/logger"
  7. "git.x2erp.com/qdy/go-base/types"
  8. "git.x2erp.com/qdy/go-db/factory/doris"
  9. "git.x2erp.com/qdy/go-db/factory/http"
  10. )
  11. // queryToCSVAndInsert 处理单次查询并将结果插入到Doris
  12. func queryToCSVAndInsert(queryRequest types.QueryRequest) *types.QueryResult[interface{}] {
  13. // 记录查询开始时间
  14. queryStartTime := time.Now()
  15. // 1. 获取HTTP工厂实例
  16. httpFactory, err := http.GetHTTPFactory()
  17. if err != nil {
  18. logger.Errorf("Failed to get HTTP factory: %v", err)
  19. return &types.QueryResult[interface{}]{
  20. Success: false,
  21. Error: fmt.Sprintf("failed to get HTTP factory: %v", err),
  22. }
  23. }
  24. logger.Debug("HTTP factory created successfully")
  25. // 2. 获取Doris工厂实例
  26. dorisFactory, err := doris.GetDorisFactory(httpFactory)
  27. if err != nil {
  28. logger.Errorf("Failed to get Doris factory: %v", err)
  29. return &types.QueryResult[interface{}]{
  30. Success: false,
  31. Error: fmt.Sprintf("failed to get Doris factory: %v", err),
  32. }
  33. }
  34. logger.Debug("Doris factory created successfully")
  35. // 3. 创建HTTP客户端
  36. httpClient := httpFactory.CreateClient()
  37. agentQueryRequest := types.QueryRequest{
  38. SQL: queryRequest.SQL,
  39. Params: queryRequest.Params,
  40. PositionalParams: queryRequest.PositionalParams,
  41. WriterHeader: queryRequest.WriterHeader,
  42. }
  43. // 4. 发送POST请求到 /api/query/csv 获取CSV格式数据
  44. resp, err := httpClient.PostWithAuth(
  45. queryRequest.AgentUrl,
  46. agentQueryRequest,
  47. queryRequest.AgentToken,
  48. nil,
  49. )
  50. queryEndTime := time.Now()
  51. queryDuration := queryEndTime.Sub(queryStartTime)
  52. if err != nil {
  53. logger.Errorf("Query request failed: %v", err)
  54. return &types.QueryResult[interface{}]{
  55. Success: false,
  56. Error: fmt.Sprintf("查询失败: %v", err),
  57. QueryTime: queryDuration,
  58. }
  59. }
  60. if resp.StatusCode() != 200 {
  61. logger.Errorf("Query request failed with status code: %d", resp.StatusCode())
  62. return &types.QueryResult[interface{}]{
  63. Success: false,
  64. Error: fmt.Sprintf("查询请求失败, 状态码: %d", resp.StatusCode()),
  65. QueryTime: queryDuration,
  66. }
  67. }
  68. // 5. 获取CSV数据
  69. csvData := string(resp.Body())
  70. if len(csvData) == 0 {
  71. logger.Warn("No data queried")
  72. return &types.QueryResult[interface{}]{
  73. Success: true,
  74. Error: "没有查询到数据",
  75. QueryTime: queryDuration,
  76. }
  77. }
  78. // 估算数据行数(CSV行数)
  79. var totalRows int
  80. if queryRequest.WriterHeader {
  81. // 如果有表头,需要减1
  82. lines := strings.Count(csvData, "\n")
  83. if lines > 0 {
  84. totalRows = lines - 1
  85. }
  86. } else {
  87. totalRows = strings.Count(csvData, "\n")
  88. }
  89. logger.Debug("Query successful, retrieved %d rows of data", totalRows)
  90. // 6. 插入数据到Doris
  91. database := queryRequest.DorisDatabase
  92. table := queryRequest.DorisTable
  93. skipHeader := !queryRequest.WriterHeader // 如果包含表头,则跳过
  94. saveStartTime := time.Now()
  95. err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
  96. saveEndTime := time.Now()
  97. saveDuration := saveEndTime.Sub(saveStartTime)
  98. if err != nil {
  99. logger.Errorf("Failed to insert data into Doris: %v", err)
  100. return &types.QueryResult[interface{}]{
  101. Success: false,
  102. Error: fmt.Sprintf("数据插入Doris失败: %v", err),
  103. QueryTime: queryDuration,
  104. SaveTime: saveDuration,
  105. Count: totalRows,
  106. }
  107. }
  108. logger.Debug("Data successfully inserted into Doris: database=%s, table=%s, rows=%d",
  109. database, table, totalRows)
  110. return &types.QueryResult[interface{}]{
  111. Success: true,
  112. QueryTime: queryDuration,
  113. SaveTime: saveDuration,
  114. Count: totalRows,
  115. }
  116. }
  117. // ServiceHandler 服务处理器(供router使用)
  118. func ServiceAgentToDoris(queryRequest types.QueryRequest) *types.QueryResult[interface{}] {
  119. // 添加调试日志,确认 queryRequest 是否有数据
  120. logger.Debug("Processing query request: SQL=%s, AgentUrl=%s, Params count=%d",
  121. queryRequest.SQL, queryRequest.AgentUrl, len(queryRequest.PositionalParams))
  122. // 执行查询并插入
  123. return queryToCSVAndInsert(queryRequest)
  124. }