Ver código fonte

修复执行sql代码bugp

qdy 3 meses atrás
pai
commit
1d4a999786
11 arquivos alterados com 997 adições e 273 exclusões
  1. 0
    16
      functions/query.go
  2. 40
    10
      functions/query_csv.go
  3. 33
    0
      functions/query_json.go
  4. 3
    3
      go.mod
  5. 8
    8
      go.sum
  6. 1
    0
      main.go
  7. 328
    0
      test/my0_test.go
  8. 418
    0
      test/my_ora_clothingToDoris_test.go
  9. 0
    236
      test/my_test.go
  10. 69
    0
      test/mycsv_test.go
  11. 97
    0
      test/mysql_test.go

+ 0
- 16
functions/query.go Ver arquivo

@@ -1,16 +0,0 @@
1
-package functions
2
-
3
-import (
4
-	"github.com/gin-gonic/gin"
5
-
6
-	"git.x2erp.com/qdy/go-base/types"
7
-	"git.x2erp.com/qdy/go-db/factory"
8
-)
9
-
10
-// 执行查询,返回JSON格式数据
11
-func QueryToJSON(dbFactory *factory.DBFactory) func(c *gin.Context, req types.QueryRequest) {
12
-	return func(c *gin.Context, req types.QueryRequest) {
13
-		result := dbFactory.QueryToJSON(req.SQL)
14
-		c.JSON(200, result)
15
-	}
16
-}

+ 40
- 10
functions/query_csv.go Ver arquivo

@@ -8,23 +8,53 @@ import (
8 8
 	"git.x2erp.com/qdy/go-db/factory"
9 9
 )
10 10
 
11
-// 创建带 db 的 handler
11
+// 统一的错误响应处理
12
+func handleErrorResponseCSV(c *gin.Context, err error) {
13
+	errorCSV := mycsv.CreateStringToCSV(err.Error())
14
+	c.Header("Content-Type", "text/csv; charset=utf-8")
15
+	c.Header("X-Error", "true")
16
+	c.Data(400, "text/csv; charset=utf-8", errorCSV)
17
+}
18
+
19
+// 统一的成功响应处理
20
+func handleSuccessResponseCSV(c *gin.Context, csvData []byte) {
21
+	c.Header("Content-Type", "text/csv; charset=utf-8")
22
+	c.Data(200, "text/csv; charset=utf-8", csvData)
23
+}
24
+
25
+// 执行查询,返回CSV数据格式。无参数查询
12 26
 func QueryToCSV(dbFactory *factory.DBFactory) func(c *gin.Context, req types.QueryRequest) {
13 27
 	return func(c *gin.Context, req types.QueryRequest) {
28
+		csvData, err := dbFactory.QueryToCSV(req.SQL, req.WriterHeader)
29
+		if err != nil {
30
+			handleErrorResponseCSV(c, err)
31
+			return
32
+		}
33
+		handleSuccessResponseCSV(c, csvData)
34
+	}
35
+}
14 36
 
15
-		csvData, err := dbFactory.QueryToCSV(req.SQL)
37
+// 执行查询,返回CSV数据格式。带参数名称进行查询
38
+func QueryParamNameToCSV(dbFactory *factory.DBFactory) func(c *gin.Context, req types.QueryRequest) {
39
+	return func(c *gin.Context, req types.QueryRequest) {
16 40
 
41
+		csvData, err := dbFactory.QueryParamsNameToCSV(req.SQL, req.WriterHeader, req.Params)
17 42
 		if err != nil {
18
-			// 错误时也返回CSV格式,而不是JSON
19
-			errorCSV := mycsv.CreateStringToCSV(err.Error())
20
-			c.Header("Content-Type", "text/csv; charset=utf-8")
21
-			c.Header("X-Error", "true") // 可选:通过header标记这是错误响应
22
-			c.Data(400, "text/csv; charset=utf-8", errorCSV)
43
+			handleErrorResponseCSV(c, err)
23 44
 			return
24 45
 		}
46
+		handleSuccessResponseCSV(c, csvData)
47
+	}
48
+}
25 49
 
26
-		// 成功返回CSV数据
27
-		c.Header("Content-Type", "text/csv; charset=utf-8")
28
-		c.Data(200, "text/csv; charset=utf-8", csvData)
50
+// 执行查询,返回CSV数据格式。带占位参数进行查询
51
+func QueryPositionalToCSV(dbFactory *factory.DBFactory) func(c *gin.Context, req types.QueryRequest) {
52
+	return func(c *gin.Context, req types.QueryRequest) {
53
+		csvData, err := dbFactory.QueryPositionalToCSV(req.SQL, req.WriterHeader, req.PositionalParams)
54
+		if err != nil {
55
+			handleErrorResponseCSV(c, err)
56
+			return
57
+		}
58
+		handleSuccessResponseCSV(c, csvData)
29 59
 	}
30 60
 }

+ 33
- 0
functions/query_json.go Ver arquivo

@@ -0,0 +1,33 @@
1
+package functions
2
+
3
+import (
4
+	"github.com/gin-gonic/gin"
5
+
6
+	"git.x2erp.com/qdy/go-base/types"
7
+	"git.x2erp.com/qdy/go-db/factory"
8
+)
9
+
10
+// 执行查询,返回CSV数据格式。无参数查询
11
+func QueryToJSON(dbFactory *factory.DBFactory) func(c *gin.Context, req types.QueryRequest) {
12
+	return func(c *gin.Context, req types.QueryRequest) {
13
+		result := dbFactory.QueryToJSON(req.SQL)
14
+		c.JSON(200, result)
15
+	}
16
+}
17
+
18
+// 执行查询,返回CSV数据格式。带参数名称进行查询
19
+func QueryParamNameToJSON(dbFactory *factory.DBFactory) func(c *gin.Context, req types.QueryRequest) {
20
+	return func(c *gin.Context, req types.QueryRequest) {
21
+		result := dbFactory.QueryParamsNameToJSON(req.SQL, req.Params)
22
+		c.JSON(200, result)
23
+	}
24
+}
25
+
26
+// 执行查询,返回JSON数据格式。带占位参数进行查询
27
+func QueryPositionalToJSON(dbFactory *factory.DBFactory) func(c *gin.Context, req types.QueryRequest) {
28
+	return func(c *gin.Context, req types.QueryRequest) {
29
+		result := dbFactory.QueryPositionalToJSON(req.SQL, req.PositionalParams)
30
+
31
+		c.JSON(200, result)
32
+	}
33
+}

+ 3
- 3
go.mod Ver arquivo

@@ -3,9 +3,10 @@ module git.x2erp.com/qdy/go-service-agent
3 3
 go 1.25.4
4 4
 
5 5
 require (
6
-	git.x2erp.com/qdy/go-base v0.1.11
7
-	git.x2erp.com/qdy/go-db v0.1.20
6
+	git.x2erp.com/qdy/go-base v0.1.44
7
+	git.x2erp.com/qdy/go-db v0.1.42
8 8
 	github.com/gin-gonic/gin v1.11.0
9
+	github.com/go-sql-driver/mysql v1.9.3
9 10
 )
10 11
 
11 12
 require (
@@ -22,7 +23,6 @@ require (
22 23
 	github.com/go-playground/validator/v10 v10.27.0 // indirect
23 24
 	github.com/go-redis/redis/v8 v8.11.5 // indirect
24 25
 	github.com/go-resty/resty/v2 v2.17.0 // indirect
25
-	github.com/go-sql-driver/mysql v1.9.3 // indirect
26 26
 	github.com/goccy/go-json v0.10.2 // indirect
27 27
 	github.com/goccy/go-yaml v1.18.0 // indirect
28 28
 	github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect

+ 8
- 8
go.sum Ver arquivo

@@ -1,13 +1,13 @@
1 1
 filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
2 2
 filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
3
-git.x2erp.com/qdy/go-base v0.1.11 h1:STHT6z+zaN8kMIiXfggUdPaP4vcz+kpXaxHkw5ziXzA=
4
-git.x2erp.com/qdy/go-base v0.1.11/go.mod h1:Q+YLwpCoU8CVSnzATLdz2LAzVMlz/CEGzo8DePf7cug=
5
-git.x2erp.com/qdy/go-db v0.1.18 h1:jCNRCZpFuW1eN769R91YW49B+24Xl9t4qzS3Wtt6V40=
6
-git.x2erp.com/qdy/go-db v0.1.18/go.mod h1:BpoceA0ROuIp6N2aOplVNWKrwntDQeWGF63diEd1hp4=
7
-git.x2erp.com/qdy/go-db v0.1.19 h1:ydKKsu9XoZBYkJo81bat1IQuKnZVc7xpD/qahI1BSRE=
8
-git.x2erp.com/qdy/go-db v0.1.19/go.mod h1:BpoceA0ROuIp6N2aOplVNWKrwntDQeWGF63diEd1hp4=
9
-git.x2erp.com/qdy/go-db v0.1.20 h1:6+esB8b/KzF6WAKRe6lWIwUZqX/tPbqgBYJhpYW3uhc=
10
-git.x2erp.com/qdy/go-db v0.1.20/go.mod h1:BpoceA0ROuIp6N2aOplVNWKrwntDQeWGF63diEd1hp4=
3
+git.x2erp.com/qdy/go-base v0.1.31 h1:5YolS0/zJ1Tv+EfisREDa6+LEPrsA7uSIhh4Q5CrW04=
4
+git.x2erp.com/qdy/go-base v0.1.31/go.mod h1:Q+YLwpCoU8CVSnzATLdz2LAzVMlz/CEGzo8DePf7cug=
5
+git.x2erp.com/qdy/go-base v0.1.43 h1:O8aSfY5QpmunQkEdEU8Jbd/2NEs513QgTK2GKQvO8jU=
6
+git.x2erp.com/qdy/go-base v0.1.43/go.mod h1:Q+YLwpCoU8CVSnzATLdz2LAzVMlz/CEGzo8DePf7cug=
7
+git.x2erp.com/qdy/go-base v0.1.44 h1:xHpMppSNj79lqdUc+lmgGXOgEvHyNotUayoe5/hHWr4=
8
+git.x2erp.com/qdy/go-base v0.1.44/go.mod h1:Q+YLwpCoU8CVSnzATLdz2LAzVMlz/CEGzo8DePf7cug=
9
+git.x2erp.com/qdy/go-db v0.1.42 h1:WOnu5dPkKAoZXITLPUxFZO8OQhs1gliNW29HSlBsSi4=
10
+git.x2erp.com/qdy/go-db v0.1.42/go.mod h1:qTzxo5oTWLhNcaPHRX/WRuX0ROTSQGIXQFMmJHI6VKM=
11 11
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.0 h1:Gt0j3wceWMwPmiazCa8MzMA0MfhmPIz0Qp0FJ6qcM0U=
12 12
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.0/go.mod h1:Ot/6aikWnKWi4l9QB7qVSwa8iMphQNqkWALMoNT3rzM=
13 13
 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1 h1:B+blDbyVIG3WaikNxPnhPiJ1MThR03b3vKGtER95TP4=

+ 1
- 0
main.go Ver arquivo

@@ -59,6 +59,7 @@ func startHTTPServer() {
59 59
 	router.GET("/api/health", functions.HealthHandler(dbFactory, cfg.GetDatabase().Type))
60 60
 	router.POST("/api/query", auth.AuthMiddleware(), withQueryRequest(functions.QueryToJSON(dbFactory)))
61 61
 	router.POST("/api/query/csv", auth.AuthMiddleware(), withQueryRequest(functions.QueryToCSV(dbFactory)))
62
+	router.POST("/api/query/csv/param", auth.AuthMiddleware(), withQueryRequest(functions.QueryPositionalToCSV(dbFactory)))
62 63
 	router.GET("/api/info", functions.InfoHandler(dbFactory))
63 64
 
64 65
 	// 日志输出配置信息

+ 328
- 0
test/my0_test.go Ver arquivo

@@ -0,0 +1,328 @@
1
+package main
2
+
3
+import (
4
+	"fmt"
5
+	"log"
6
+	"strings"
7
+	"sync"
8
+	"testing"
9
+	"time"
10
+
11
+	"git.x2erp.com/qdy/go-base/config"
12
+	"git.x2erp.com/qdy/go-base/types"
13
+	"git.x2erp.com/qdy/go-db/factory"
14
+)
15
+
16
+// // QueryRequest 查询请求结构体
17
+// type QueryRequest struct {
18
+// 	SQL              string                 `json:"sql" binding:"required"`
19
+// 	Params           map[string]interface{} `json:"params,omitempty"`           // 名称参数
20
+// 	PositionalParams []interface{}          `json:"positionalParams,omitempty"` // 位置参数
21
+// }
22
+
23
+// 分页配置
24
+type PageConfig1 struct {
25
+	TotalRows  int // 总共需要查询多少行
26
+	PageSize   int // 每次返回多少行
27
+	MaxWorkers int // 几条线程同时工作
28
+	StartRow   int //从哪行开始查询
29
+}
30
+
31
+// 查询任务
32
+type QueryTask1 struct {
33
+	Page           int
34
+	StartRow       int
35
+	EndRow         int
36
+	QuerySQL       string
37
+	QueryParams    []interface{}
38
+	CSVData        string
39
+	Error          error
40
+	QueryTime      time.Duration
41
+	SaveTime       time.Duration
42
+	LastClothingID string // 上一页最后一条记录的CLOTHING_ID
43
+}
44
+
45
+func TestQueryAndInsertToDoris1(t *testing.T) {
46
+	// 记录总开始时间
47
+	totalStartTime := time.Now()
48
+
49
+	// 配置分页参数
50
+	pageConfig := PageConfig{
51
+		TotalRows:  100, // 总共需要查询多少行
52
+		PageSize:   50,  // 每次查询2000条
53
+		MaxWorkers: 2,   // 几条线程同时工作
54
+		StartRow:   0,   //从哪行开始查询
55
+	}
56
+
57
+	fmt.Printf("开始执行分页查询,总共%d行,每页%d条,%d线程工作\n", pageConfig.TotalRows, pageConfig.PageSize, pageConfig.MaxWorkers)
58
+
59
+	// 1. 获取HTTP工厂实例
60
+	httpFactory, err := factory.GetHTTPFactory()
61
+	if err != nil {
62
+		t.Fatalf("Failed to get HTTP factory: %v", err)
63
+	}
64
+	fmt.Println("HTTP factory created successfully")
65
+
66
+	// 7. 获取Doris工厂实例
67
+	dorisFactory, err := factory.GetDorisFactory(httpFactory)
68
+	if err != nil {
69
+		t.Fatalf("Failed to get Doris factory: %v", err)
70
+	}
71
+	fmt.Println("Doris factory created successfully")
72
+
73
+	// 检查Doris表结构
74
+	fmt.Println("Checking Doris table structure...")
75
+
76
+	// 获取Doris配置
77
+	cfg := config.GetConfig()
78
+	database := "X6_STOCK_DEV"
79
+	table := "A3_CLOTHING_LOG"
80
+	skipHeader := false // 改为true,跳过CSV头行
81
+	url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", cfg.GetDoris().FEHost, cfg.GetDoris().FEPort, database, table)
82
+	fmt.Printf("Doris stream load URL: %s\n", url)
83
+
84
+	// 计算需要的页数
85
+	totalPages := (pageConfig.TotalRows + pageConfig.PageSize - 1) / pageConfig.PageSize
86
+	fmt.Printf("预计总共需要查询 %d 页\n", totalPages)
87
+
88
+	// 创建任务通道和结果通道
89
+	taskChan := make(chan QueryTask, totalPages)
90
+	resultChan := make(chan QueryTask, totalPages)
91
+	doneChan := make(chan bool)
92
+
93
+	var wg sync.WaitGroup
94
+	var mu sync.Mutex
95
+
96
+	// 执行统计变量
97
+	totalQueryTime := time.Duration(0)
98
+	totalSaveTime := time.Duration(0)
99
+	totalRowsInserted := 0
100
+	completedTasks := 0
101
+
102
+	// 启动工作线程
103
+	for i := 0; i < pageConfig.MaxWorkers; i++ {
104
+		wg.Add(1)
105
+		go func(workerID int) {
106
+			defer wg.Done()
107
+
108
+			// 每个worker创建自己的HTTP客户端
109
+			httpClient := httpFactory.CreateClient()
110
+
111
+			for task := range taskChan {
112
+				fmt.Printf("Worker %d 处理第 %d 页 (行 %d-%d, CLOTHING_ID > %s)...\n",
113
+					workerID, task.Page, task.StartRow, task.EndRow, task.LastClothingID)
114
+
115
+				// 记录查询开始时间
116
+				queryStartTime := time.Now()
117
+
118
+				// 准备查询请求
119
+				queryRequest := types.QueryRequest{
120
+					SQL:              task.QuerySQL,
121
+					PositionalParams: task.QueryParams,
122
+					WriterHeader:     false,
123
+				}
124
+
125
+				// 发送POST请求到 /api/query/csv 获取CSV格式数据
126
+				resp, err := httpClient.PostWithAuth(
127
+					"http://localhost:8080/api/query/csv/param",
128
+					queryRequest,
129
+					"123", // Bearer Token
130
+					nil,
131
+				)
132
+
133
+				if err != nil {
134
+					task.Error = fmt.Errorf("第%d页查询失败: %v", task.Page, err)
135
+					resultChan <- task
136
+					continue
137
+				}
138
+
139
+				if resp.StatusCode() != 200 {
140
+					task.Error = fmt.Errorf("第%d页查询请求失败, 状态码: %d", task.Page, resp.StatusCode())
141
+					resultChan <- task
142
+					continue
143
+				}
144
+
145
+				// 获取CSV数据
146
+				csvData := string(resp.Body())
147
+				if len(csvData) == 0 {
148
+					task.Error = fmt.Errorf("第%d页没有数据", task.Page)
149
+					resultChan <- task
150
+					continue
151
+				}
152
+				log.Printf("csvData:\n%s", csvData)
153
+				// 记录查询结束时间
154
+				queryEndTime := time.Now()
155
+				queryDuration := queryEndTime.Sub(queryStartTime)
156
+				task.QueryTime = queryDuration
157
+				task.CSVData = csvData
158
+
159
+				// 插入数据到Doris
160
+				saveStartTime := time.Now()
161
+				err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
162
+				if err != nil {
163
+					task.Error = fmt.Errorf("第%d页数据插入Doris失败: %v", task.Page, err)
164
+					resultChan <- task
165
+					continue
166
+				}
167
+
168
+				// 记录保存结束时间
169
+				saveEndTime := time.Now()
170
+				saveDuration := saveEndTime.Sub(saveStartTime)
171
+				task.SaveTime = saveDuration
172
+
173
+				resultChan <- task
174
+			}
175
+		}(i + 1)
176
+	}
177
+
178
+	// 启动结果处理协程
179
+	go func() {
180
+		for task := range resultChan {
181
+			mu.Lock()
182
+
183
+			if task.Error != nil {
184
+				fmt.Printf("❌ 第 %d 页处理失败: %v\n", task.Page, task.Error)
185
+			} else {
186
+				// 计算本页数据行数
187
+				estimatedRows := task.EndRow - task.StartRow + 1
188
+				// 如果数据不足,按实际估算
189
+				if len(task.CSVData) < estimatedRows*50 { // 保守估计每行至少50字符
190
+					estimatedRows = len(task.CSVData) / 50
191
+				}
192
+
193
+				totalQueryTime += task.QueryTime
194
+				totalSaveTime += task.SaveTime
195
+				totalRowsInserted += estimatedRows
196
+				completedTasks++
197
+
198
+				fmt.Printf("✅ Worker 完成第 %d 页 (行 %d-%d, CLOTHING_ID > %s)\n",
199
+					task.Page, task.StartRow, task.EndRow, task.LastClothingID)
200
+				fmt.Printf("   查询耗时: %v, 保存耗时: %v\n", task.QueryTime, task.SaveTime)
201
+				fmt.Printf("   估算数据行数: %d\n", estimatedRows)
202
+
203
+				// 从CSV数据中提取最后一行的CLOTHING_ID
204
+				if task.CSVData != "" {
205
+					// 简单实现:取最后一行第一列的第一个记录
206
+					// 这里假设CSV格式是逗号分隔,没有标题行
207
+					lines := strings.Split(strings.TrimSpace(task.CSVData), "\n")
208
+					if len(lines) > 0 {
209
+						// 取最后一行(如果有多行数据)
210
+						lastLine := lines[len(lines)-1]
211
+						fields := strings.Split(lastLine, ",")
212
+						if len(fields) > 0 {
213
+							// 更新任务的LastClothingID
214
+							task.LastClothingID = strings.TrimSpace(fields[0]) // CLOTHING_ID是第一列
215
+							fmt.Printf("   提取到最后一个CLOTHING_ID: %s\n", task.LastClothingID)
216
+						}
217
+					}
218
+				}
219
+			}
220
+
221
+			mu.Unlock()
222
+
223
+			// 如果所有任务都完成了,发送完成信号
224
+			if completedTasks >= totalPages {
225
+				doneChan <- true
226
+				break
227
+			}
228
+		}
229
+	}()
230
+
231
+	// 生成任务并发送到任务通道
232
+	fmt.Println("\n📋 开始生成查询任务...")
233
+
234
+	// 初始CLOTHING_ID为"0"
235
+	lastClothingID := "A"
236
+
237
+	for page := 1; page <= totalPages; page++ {
238
+		startRow := pageConfig.StartRow + (page-1)*pageConfig.PageSize + 1
239
+		endRow := pageConfig.StartRow + page*pageConfig.PageSize
240
+
241
+		// 检查不超过要查询的总行数
242
+		maxEndRow := pageConfig.StartRow + pageConfig.TotalRows
243
+		if endRow > maxEndRow {
244
+			endRow = maxEndRow
245
+		}
246
+
247
+		// 生成查询SQL(使用参数模式)
248
+		querySQL, queryParams := getSQLWithPagination(startRow, endRow, lastClothingID)
249
+
250
+		task := QueryTask{
251
+			Page:           page,
252
+			StartRow:       startRow,
253
+			EndRow:         endRow,
254
+			QuerySQL:       querySQL,
255
+			QueryParams:    queryParams,
256
+			LastClothingID: lastClothingID,
257
+		}
258
+
259
+		fmt.Printf("生成第 %d 页任务 (行 %d-%d), 使用CLOTHING_ID > '%s'\n",
260
+			page, startRow, endRow, lastClothingID)
261
+		taskChan <- task
262
+
263
+		// 注意:在实际场景中,lastClothingID需要在任务完成后更新
264
+		// 这里简化处理,每个任务使用相同的lastClothingID
265
+		// 更好的做法是使用任务队列和结果回调来更新lastClothingID
266
+	}
267
+
268
+	// 关闭任务通道,通知worker没有更多任务
269
+	close(taskChan)
270
+
271
+	// 等待所有worker完成
272
+	wg.Wait()
273
+
274
+	// 关闭结果通道
275
+	close(resultChan)
276
+
277
+	// 等待结果处理完成
278
+	<-doneChan
279
+
280
+	// 记录总结束时间
281
+	totalEndTime := time.Now()
282
+	totalDuration := totalEndTime.Sub(totalStartTime)
283
+
284
+	// 打印性能统计
285
+	fmt.Println("\n📊 性能统计:")
286
+	fmt.Printf("   完成页数: %d/%d\n", completedTasks, totalPages)
287
+	fmt.Printf("   总查询耗时: %v\n", totalQueryTime)
288
+	fmt.Printf("   总保存耗时: %v\n", totalSaveTime)
289
+	fmt.Printf("   总耗时: %v\n", totalDuration)
290
+	fmt.Printf("   估算插入总行数: %d\n", totalRowsInserted)
291
+	if completedTasks > 0 {
292
+		fmt.Printf("   平均每页查询耗时: %v\n", totalQueryTime/time.Duration(completedTasks))
293
+		fmt.Printf("   平均每页保存耗时: %v\n", totalSaveTime/time.Duration(completedTasks))
294
+		fmt.Printf("   平均每秒处理行数: %.2f\n", float64(totalRowsInserted)/totalDuration.Seconds())
295
+	}
296
+
297
+	fmt.Println("✅ 所有数据成功插入到 Doris!")
298
+}
299
+
300
+// getSQLWithPagination 生成带分页的SQL语句(参数模式)
301
+// 返回SQL语句和参数映射
302
+func getSQLWithPagination1(startRow, endRow int, lastClothingID string) (string, []interface{}) {
303
+	sql := `SELECT
304
+    CLOTHING_ID,
305
+    CLOTHING_YEAR,
306
+    CLOTHING_NAME
307
+
308
+   FROM (
309
+    SELECT a.*, ROWNUM as rn
310
+    FROM (
311
+        SELECT *
312
+        FROM X6_STOCK_DEV.A3_CLOTHING 
313
+        WHERE CLOTHING_ID > :1
314
+        ORDER BY CLOTHING_ID
315
+    ) a
316
+    WHERE ROWNUM <= :2
317
+)
318
+WHERE rn > :3`
319
+
320
+	// 创建参数映射
321
+	params := []interface{}{
322
+		lastClothingID,
323
+		endRow,
324
+		startRow - 1, // WHERE rn > :start_row 所以是startRow-1
325
+	}
326
+
327
+	return sql, params
328
+}

+ 418
- 0
test/my_ora_clothingToDoris_test.go Ver arquivo

@@ -0,0 +1,418 @@
1
+package main
2
+
3
+import (
4
+	"fmt"
5
+	"log"
6
+	"strings"
7
+	"sync"
8
+	"testing"
9
+	"time"
10
+
11
+	"git.x2erp.com/qdy/go-base/config"
12
+	"git.x2erp.com/qdy/go-base/types"
13
+	"git.x2erp.com/qdy/go-db/factory"
14
+)
15
+
16
+// 一定要执行环境变量
17
+// export DB_CONFIG_PATH=/Users/kenqdy/Documents/v-bdx-workspace/db_doris.yaml
18
+
19
+// // QueryRequest 查询请求结构体
20
+// type QueryRequest struct {
21
+// 	SQL              string                 `json:"sql" binding:"required"`
22
+// 	Params           map[string]interface{} `json:"params,omitempty"`           // 名称参数
23
+// 	PositionalParams []interface{}          `json:"positionalParams,omitempty"` // 位置参数
24
+// }
25
+
26
+// 分页配置
27
+type PageConfig struct {
28
+	TotalRows  int // 总共需要查询多少行
29
+	PageSize   int // 每次返回多少行
30
+	MaxWorkers int // 几条线程同时工作
31
+	StartRow   int //从哪行开始查询
32
+}
33
+
34
+// 查询任务
35
+type QueryTask struct {
36
+	Page           int
37
+	StartRow       int
38
+	EndRow         int
39
+	QuerySQL       string
40
+	QueryParams    []interface{}
41
+	CSVData        string
42
+	Error          error
43
+	QueryTime      time.Duration
44
+	SaveTime       time.Duration
45
+	LastClothingID string // 上一页最后一条记录的CLOTHING_ID
46
+}
47
+
48
+func TestQueryAndInsertToDoris(t *testing.T) {
49
+	// 记录总开始时间
50
+	totalStartTime := time.Now()
51
+
52
+	// 配置分页参数
53
+	pageConfig := PageConfig{
54
+		TotalRows:  400000, // 总共需要查询多少行
55
+		PageSize:   3000,   // 每次查询2000条
56
+		MaxWorkers: 10,     // 几条线程同时工作
57
+		StartRow:   0,      //从哪行开始查询
58
+	}
59
+
60
+	fmt.Printf("开始执行分页查询,总共%d行,每页%d条,%d线程工作\n", pageConfig.TotalRows, pageConfig.PageSize, pageConfig.MaxWorkers)
61
+
62
+	// 1. 获取HTTP工厂实例
63
+	httpFactory, err := factory.GetHTTPFactory()
64
+	if err != nil {
65
+		t.Fatalf("Failed to get HTTP factory: %v", err)
66
+	}
67
+	fmt.Println("HTTP factory created successfully")
68
+
69
+	// 7. 获取Doris工厂实例
70
+	dorisFactory, err := factory.GetDorisFactory(httpFactory)
71
+	if err != nil {
72
+		t.Fatalf("Failed to get Doris factory: %v", err)
73
+	}
74
+	fmt.Println("Doris factory created successfully")
75
+
76
+	// 检查Doris表结构
77
+	fmt.Println("Checking Doris table structure...")
78
+
79
+	// 获取Doris配置
80
+	cfg := config.GetConfig()
81
+	database := "X6_STOCK_DEV"
82
+	table := "A3_CLOTHING"
83
+	skipHeader := false // 改为true,跳过CSV头行
84
+	url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", cfg.GetDoris().FEHost, cfg.GetDoris().FEPort, database, table)
85
+	fmt.Printf("Doris stream load URL: %s\n", url)
86
+
87
+	// 计算需要的页数
88
+	totalPages := (pageConfig.TotalRows + pageConfig.PageSize - 1) / pageConfig.PageSize
89
+	fmt.Printf("预计总共需要查询 %d 页\n", totalPages)
90
+
91
+	// 创建任务通道和结果通道
92
+	taskChan := make(chan QueryTask, totalPages)
93
+	resultChan := make(chan QueryTask, totalPages)
94
+	doneChan := make(chan bool)
95
+
96
+	var wg sync.WaitGroup
97
+	var mu sync.Mutex
98
+
99
+	// 执行统计变量
100
+	totalQueryTime := time.Duration(0)
101
+	totalSaveTime := time.Duration(0)
102
+	totalRowsInserted := 0
103
+	completedTasks := 0
104
+
105
+	// 启动工作线程
106
+	for i := 0; i < pageConfig.MaxWorkers; i++ {
107
+		wg.Add(1)
108
+		go func(workerID int) {
109
+			defer wg.Done()
110
+
111
+			// 每个worker创建自己的HTTP客户端
112
+			httpClient := httpFactory.CreateClient()
113
+
114
+			for task := range taskChan {
115
+				fmt.Printf("Worker %d 处理第 %d 页 (行 %d-%d, CLOTHING_ID > %s)...\n",
116
+					workerID, task.Page, task.StartRow, task.EndRow, task.LastClothingID)
117
+
118
+				// 记录查询开始时间
119
+				queryStartTime := time.Now()
120
+
121
+				// 准备查询请求
122
+				queryRequest := types.QueryRequest{
123
+					SQL:              task.QuerySQL,
124
+					PositionalParams: task.QueryParams,
125
+					WriterHeader:     false,
126
+				}
127
+
128
+				// 发送POST请求到 /api/query/csv 获取CSV格式数据
129
+				resp, err := httpClient.PostWithAuth(
130
+					"http://localhost:8080/api/query/csv/param",
131
+					queryRequest,
132
+					"123", // Bearer Token
133
+					nil,
134
+				)
135
+
136
+				if err != nil {
137
+					task.Error = fmt.Errorf("第%d页查询失败: %v", task.Page, err)
138
+					resultChan <- task
139
+					continue
140
+				}
141
+
142
+				if resp.StatusCode() != 200 {
143
+					task.Error = fmt.Errorf("第%d页查询请求失败, 状态码: %d", task.Page, resp.StatusCode())
144
+					resultChan <- task
145
+					continue
146
+				}
147
+
148
+				// 获取CSV数据
149
+				csvData := string(resp.Body())
150
+				if len(csvData) == 0 {
151
+					task.Error = fmt.Errorf("第%d页没有数据", task.Page)
152
+					resultChan <- task
153
+					continue
154
+				}
155
+				log.Printf("csvData:\n%s", csvData)
156
+				// 记录查询结束时间
157
+				queryEndTime := time.Now()
158
+				queryDuration := queryEndTime.Sub(queryStartTime)
159
+				task.QueryTime = queryDuration
160
+				task.CSVData = csvData
161
+
162
+				// 插入数据到Doris
163
+				saveStartTime := time.Now()
164
+				err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
165
+				if err != nil {
166
+					task.Error = fmt.Errorf("第%d页数据插入Doris失败: %v", task.Page, err)
167
+					resultChan <- task
168
+					continue
169
+				}
170
+
171
+				// 记录保存结束时间
172
+				saveEndTime := time.Now()
173
+				saveDuration := saveEndTime.Sub(saveStartTime)
174
+				task.SaveTime = saveDuration
175
+
176
+				resultChan <- task
177
+			}
178
+		}(i + 1)
179
+	}
180
+
181
+	// 启动结果处理协程
182
+	go func() {
183
+		for task := range resultChan {
184
+			mu.Lock()
185
+
186
+			if task.Error != nil {
187
+				fmt.Printf("❌ 第 %d 页处理失败: %v\n", task.Page, task.Error)
188
+			} else {
189
+				// 计算本页数据行数
190
+				estimatedRows := task.EndRow - task.StartRow + 1
191
+				// 如果数据不足,按实际估算
192
+				if len(task.CSVData) < estimatedRows*50 { // 保守估计每行至少50字符
193
+					estimatedRows = len(task.CSVData) / 50
194
+				}
195
+
196
+				totalQueryTime += task.QueryTime
197
+				totalSaveTime += task.SaveTime
198
+				totalRowsInserted += estimatedRows
199
+				completedTasks++
200
+
201
+				fmt.Printf("✅ Worker 完成第 %d 页 (行 %d-%d, CLOTHING_ID > %s)\n",
202
+					task.Page, task.StartRow, task.EndRow, task.LastClothingID)
203
+				fmt.Printf("   查询耗时: %v, 保存耗时: %v\n", task.QueryTime, task.SaveTime)
204
+				fmt.Printf("   估算数据行数: %d\n", estimatedRows)
205
+
206
+				// 从CSV数据中提取最后一行的CLOTHING_ID
207
+				if task.CSVData != "" {
208
+					// 简单实现:取最后一行第一列的第一个记录
209
+					// 这里假设CSV格式是逗号分隔,没有标题行
210
+					lines := strings.Split(strings.TrimSpace(task.CSVData), "\n")
211
+					if len(lines) > 0 {
212
+						// 取最后一行(如果有多行数据)
213
+						lastLine := lines[len(lines)-1]
214
+						fields := strings.Split(lastLine, ",")
215
+						if len(fields) > 0 {
216
+							// 更新任务的LastClothingID
217
+							task.LastClothingID = strings.TrimSpace(fields[0]) // CLOTHING_ID是第一列
218
+							fmt.Printf("   提取到最后一个CLOTHING_ID: %s\n", task.LastClothingID)
219
+						}
220
+					}
221
+				}
222
+			}
223
+
224
+			mu.Unlock()
225
+
226
+			// 如果所有任务都完成了,发送完成信号
227
+			if completedTasks >= totalPages {
228
+				doneChan <- true
229
+				break
230
+			}
231
+		}
232
+	}()
233
+
234
+	// 生成任务并发送到任务通道
235
+	fmt.Println("\n📋 开始生成查询任务...")
236
+
237
+	// 初始CLOTHING_ID为"0"
238
+	lastClothingID := "0"
239
+
240
+	for page := 1; page <= totalPages; page++ {
241
+		startRow := pageConfig.StartRow + (page-1)*pageConfig.PageSize + 1
242
+		endRow := pageConfig.StartRow + page*pageConfig.PageSize
243
+
244
+		// 检查不超过要查询的总行数
245
+		maxEndRow := pageConfig.StartRow + pageConfig.TotalRows
246
+		if endRow > maxEndRow {
247
+			endRow = maxEndRow
248
+		}
249
+
250
+		// 生成查询SQL(使用参数模式)
251
+		querySQL, queryParams := getSQLWithPagination(startRow, endRow, lastClothingID)
252
+
253
+		task := QueryTask{
254
+			Page:           page,
255
+			StartRow:       startRow,
256
+			EndRow:         endRow,
257
+			QuerySQL:       querySQL,
258
+			QueryParams:    queryParams,
259
+			LastClothingID: lastClothingID,
260
+		}
261
+
262
+		fmt.Printf("生成第 %d 页任务 (行 %d-%d), 使用CLOTHING_ID > '%s'\n",
263
+			page, startRow, endRow, lastClothingID)
264
+		taskChan <- task
265
+
266
+		// 注意:在实际场景中,lastClothingID需要在任务完成后更新
267
+		// 这里简化处理,每个任务使用相同的lastClothingID
268
+		// 更好的做法是使用任务队列和结果回调来更新lastClothingID
269
+	}
270
+
271
+	// 关闭任务通道,通知worker没有更多任务
272
+	close(taskChan)
273
+
274
+	// 等待所有worker完成
275
+	wg.Wait()
276
+
277
+	// 关闭结果通道
278
+	close(resultChan)
279
+
280
+	// 等待结果处理完成
281
+	<-doneChan
282
+
283
+	// 记录总结束时间
284
+	totalEndTime := time.Now()
285
+	totalDuration := totalEndTime.Sub(totalStartTime)
286
+
287
+	// 打印性能统计
288
+	fmt.Println("\n📊 性能统计:")
289
+	fmt.Printf("   完成页数: %d/%d\n", completedTasks, totalPages)
290
+	fmt.Printf("   总查询耗时: %v\n", totalQueryTime)
291
+	fmt.Printf("   总保存耗时: %v\n", totalSaveTime)
292
+	fmt.Printf("   总耗时: %v\n", totalDuration)
293
+	fmt.Printf("   估算插入总行数: %d\n", totalRowsInserted)
294
+	if completedTasks > 0 {
295
+		fmt.Printf("   平均每页查询耗时: %v\n", totalQueryTime/time.Duration(completedTasks))
296
+		fmt.Printf("   平均每页保存耗时: %v\n", totalSaveTime/time.Duration(completedTasks))
297
+		fmt.Printf("   平均每秒处理行数: %.2f\n", float64(totalRowsInserted)/totalDuration.Seconds())
298
+	}
299
+
300
+	fmt.Println("✅ 所有数据成功插入到 Doris!")
301
+}
302
+
303
+// getSQLWithPagination 生成带分页的SQL语句(参数模式)
304
+// 返回SQL语句和参数映射
305
+func getSQLWithPagination(startRow, endRow int, lastClothingID string) (string, []interface{}) {
306
+	sql := `SELECT
307
+    CLOTHING_ID,
308
+    CLOTHING_YEAR,
309
+    CLOTHING_NAME,
310
+    STYLECOLOR_ID,
311
+    STYLE_ID,
312
+    COLOR_ID,
313
+    SIZE_ID,
314
+    CREATE_DATE,
315
+    STYLE_GROUP,
316
+    J_PRICE,
317
+    X_PRICE,
318
+    V_PRICE,
319
+    CLERK_ROYALTYRATE,
320
+    CLERK_ROYALTYPRICE,
321
+    BRAND_CODE,
322
+    STYLEVER_ID,
323
+    J_COST,
324
+    CLOTHING_IMG,
325
+    STYLE_UNIT_CODE,
326
+    STYLE_SEX_CODE,
327
+    STYLE_KIND_CODE,
328
+    STYLE_CLASS_CODE,
329
+    STYLE_SUBCLASS_CODE,
330
+    STYLE_DESIGNER_CODE,
331
+    STYLE_PLATER_CODE,
332
+    STYLE_STYLES_CODE,
333
+    STYLE_LOCATE_CODE,
334
+    STYLE_SALETYPE_CODE,
335
+    STYLE_COLORSYSTEM_CODE,
336
+    STYLE_THEME_CODE,
337
+    STYLE_INDENTTYPE_CODE,
338
+    STYLE_PRICEBAND_CODE,
339
+    STYLE_MONTH_CODE,
340
+    STYLE_COMPOSITION_CODE,
341
+    STYLE_SUPPLIER_CODE,
342
+    STYLE_SPARE1_CODE,
343
+    STYLE_SPARE2_CODE,
344
+    STYLE_SPARE4_CODE,
345
+    STYLE_SPARE5_CODE,
346
+    CATEGORY_CODE,
347
+    BRAND_ID,
348
+    STYCOLVER_ID,
349
+    STYLE_SAME,
350
+    CLOTHING_BARCODE,
351
+    CLOTHING_HELPID,
352
+    CLOTHING_GBCODE,
353
+    CLOTHING_RFID,
354
+    STYLE_SUBJECT_ID,
355
+    SIZEGRP_ID,
356
+    STYLE_HELPID,
357
+    CLOTHING_GBCODE1,
358
+    COLOR_NAME,
359
+    STYLEVER_NAME,
360
+    SIZE_NAME,
361
+    STYLE_UNIT,
362
+    STYLE_SEX,
363
+    STYLE_KIND,
364
+    STYLE_CLASS,
365
+    STYLE_SUBCLASS,
366
+    STYLE_DESIGNER,
367
+    STYLE_PLATER,
368
+    STYLE_BAND,
369
+    STYLE_STYLES,
370
+    STYLE_LOCATE,
371
+    STYLE_SALETYPE,
372
+    STYLE_COLORSYSTEM,
373
+    STYLE_THEME,
374
+    STYLE_INDENTTYPE,
375
+    STYLE_PRICEBAND,
376
+    STYLE_MONTH,
377
+    STYLE_COMPOSITION,
378
+    STYLE_SUPPLIER,
379
+    STYLE_SPARE1,
380
+    STYLE_SPARE2,
381
+    STYLE_SPARE3,
382
+    STYLE_SPARE4,
383
+    STYLE_SPARE5,
384
+    CATEGORY_NAME,
385
+    BRAND_NAME,
386
+    STYLE_YEAR_NAME,
387
+    STYLE_SEARCH_KEY,
388
+    STYLE_SUBJECT_NAME,
389
+    CLOTHING_REMARK,
390
+    STYLE_SPARE3_CODE,
391
+    COST,
392
+    BRAND_GROUPCODE,
393
+    CLASS_GROUPCODE,
394
+    MONTH_GROUPCODE,
395
+    RETURNSUBJECT_ID,
396
+    PRODUCT_SORT,
397
+    CLOTHING_PARTITION
398
+   FROM (
399
+    SELECT a.*, ROWNUM as rn
400
+    FROM (
401
+        SELECT *
402
+        FROM X6_STOCK_DEV.A3_CLOTHING 
403
+       
404
+        ORDER BY CLOTHING_ID
405
+    ) a
406
+    WHERE ROWNUM <= :1
407
+)
408
+WHERE rn > :2`
409
+
410
+	// 创建参数映射
411
+	params := []interface{}{
412
+		//lastClothingID,
413
+		endRow,
414
+		startRow - 1, // WHERE rn > :start_row 所以是startRow-1
415
+	}
416
+
417
+	return sql, params
418
+}

+ 0
- 236
test/my_test.go Ver arquivo

@@ -1,236 +0,0 @@
1
-package main
2
-
3
-import (
4
-	"fmt"
5
-	"testing"
6
-	"time"
7
-
8
-	"git.x2erp.com/qdy/go-base/config"
9
-	"git.x2erp.com/qdy/go-db/factory"
10
-)
11
-
12
-// QueryRequest 查询请求结构体
13
-type QueryRequest struct {
14
-	SQL string `json:"sql"`
15
-}
16
-
17
-func TestQueryAndInsertToDoris(t *testing.T) {
18
-	// 记录总开始时间
19
-	totalStartTime := time.Now()
20
-	// 1. 获取HTTP工厂实例
21
-	httpFactory, err := factory.GetHTTPFactory()
22
-	if err != nil {
23
-		t.Fatalf("Failed to get HTTP factory: %v", err)
24
-	}
25
-	fmt.Println("HTTP factory created successfully")
26
-
27
-	// 2. 创建HTTP客户端
28
-	httpClient := httpFactory.CreateClient()
29
-
30
-	// 3. 准备查询SQL - 限制数据量用于测试
31
-	querySQL := getSQL()
32
-
33
-	// 4. 准备查询请求
34
-	queryRequest := QueryRequest{
35
-		SQL: querySQL,
36
-	}
37
-
38
-	// 记录查询开始时间
39
-	queryStartTime := time.Now()
40
-	// 5. 发送POST请求到 /api/query/csv 获取CSV格式数据
41
-	fmt.Println("Sending query request to localhost:8080/api/query/csv...")
42
-
43
-	resp, err := httpClient.PostWithAuth(
44
-		"http://localhost:8080/api/query/csv",
45
-		queryRequest,
46
-		"123", // Bearer Token
47
-		nil,
48
-	)
49
-
50
-	if err != nil {
51
-		t.Fatalf("Failed to send query request: %v", err)
52
-	}
53
-
54
-	if resp.StatusCode() != 200 {
55
-		t.Fatalf("Query request failed with status: %d, body: %s", resp.StatusCode(), string(resp.Body()))
56
-	}
57
-
58
-	// 6. 获取CSV数据
59
-	csvData := string(resp.Body())
60
-	if len(csvData) == 0 {
61
-		t.Fatalf("No CSV data received")
62
-	}
63
-
64
-	// 记录查询结束时间
65
-	queryEndTime := time.Now()
66
-	queryDuration := queryEndTime.Sub(queryStartTime)
67
-
68
-	fmt.Printf("✅ Query completed in %v\n", queryDuration)
69
-	fmt.Printf("Successfully retrieved CSV data, length: %d bytes\n", len(csvData))
70
-	//fmt.Printf("CSV data content:\n%s\n", csvData)
71
-
72
-	// 7. 获取Doris工厂实例
73
-	dorisFactory, err := factory.GetDorisFactory(httpFactory)
74
-	if err != nil {
75
-		t.Fatalf("Failed to get Doris factory: %v", err)
76
-	}
77
-	fmt.Println("Doris factory created successfully")
78
-
79
-	// 8. 先检查Doris表结构
80
-	fmt.Println("Checking Doris table structure...")
81
-
82
-	// 9. 插入数据到Doris
83
-	database := "X6_STOCK_DEV"
84
-	table := "A3_CLOTHING"
85
-	skipHeader := false // 改为true,跳过CSV头行
86
-
87
-	fmt.Printf("Inserting data to Doris database: %s, table: %s\n", database, table)
88
-	//fmt.Printf("CSV data to insert:\n%q\n", csvData)
89
-
90
-	cfg := config.GetConfig()
91
-	url := fmt.Sprintf("http://%s:%d/api/%s/%s/_stream_load", cfg.GetDoris().FEHost, cfg.GetDoris().FEPort, database, table)
92
-
93
-	fmt.Print(url + ".\n")
94
-
95
-	// 记录保存开始时间
96
-	saveStartTime := time.Now()
97
-	err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
98
-	if err != nil {
99
-		t.Fatalf("Failed to insert data to Doris: %v", err)
100
-	}
101
-
102
-	// 记录保存结束时间
103
-	saveEndTime := time.Now()
104
-	saveDuration := saveEndTime.Sub(saveStartTime)
105
-
106
-	fmt.Printf("✅ Data saved to Doris in %v\n", saveDuration)
107
-
108
-	// 记录总结束时间
109
-	totalEndTime := time.Now()
110
-	totalDuration := totalEndTime.Sub(totalStartTime)
111
-
112
-	// 打印性能统计
113
-	fmt.Println("\n📊 Performance Statistics:")
114
-	fmt.Printf("   Query Time: %v\n", queryDuration)
115
-	fmt.Printf("   Save Time:  %v\n", saveDuration)
116
-	fmt.Printf("   Total Time: %v\n", totalDuration)
117
-	fmt.Printf("   Data Size:  %d bytes\n", len(csvData))
118
-
119
-	fmt.Println("Data successfully inserted to Doris!")
120
-}
121
-
122
-func getSQLm() string {
123
-	return `SELECT
124
-    CLOTHING_ID,
125
-    CLOTHING_YEAR,
126
-    CLOTHING_NAME
127
-FROM X6_STOCK_DEV.A3_CLOTHING WHERE rownum <= 100000`
128
-}
129
-
130
-func getSQL() string {
131
-	// 3. 准备查询SQL
132
-	return `SELECT
133
-    CLOTHING_ID,
134
-    CLOTHING_YEAR,
135
-    CLOTHING_NAME,
136
-    STYLECOLOR_ID,
137
-    STYLE_ID,
138
-    COLOR_ID,
139
-    SIZE_ID,
140
-    CREATE_DATE,
141
-    STYLE_GROUP,
142
-    J_PRICE,
143
-    X_PRICE,
144
-    V_PRICE,
145
-    CLERK_ROYALTYRATE,
146
-    CLERK_ROYALTYPRICE,
147
-    BRAND_CODE,
148
-    STYLEVER_ID,
149
-    J_COST,
150
-    CLOTHING_IMG,
151
-    STYLE_UNIT_CODE,
152
-    STYLE_SEX_CODE,
153
-    STYLE_KIND_CODE,
154
-    STYLE_CLASS_CODE,
155
-    STYLE_SUBCLASS_CODE,
156
-    STYLE_DESIGNER_CODE,
157
-    STYLE_PLATER_CODE,
158
-    STYLE_STYLES_CODE,
159
-    STYLE_LOCATE_CODE,
160
-    STYLE_SALETYPE_CODE,
161
-    STYLE_COLORSYSTEM_CODE,
162
-    STYLE_THEME_CODE,
163
-    STYLE_INDENTTYPE_CODE,
164
-    STYLE_PRICEBAND_CODE,
165
-    STYLE_MONTH_CODE,
166
-    STYLE_COMPOSITION_CODE,
167
-    STYLE_SUPPLIER_CODE,
168
-    STYLE_SPARE1_CODE,
169
-    STYLE_SPARE2_CODE,
170
-    STYLE_SPARE4_CODE,
171
-    STYLE_SPARE5_CODE,
172
-    CATEGORY_CODE,
173
-    BRAND_ID,
174
-    STYCOLVER_ID,
175
-    STYLE_SAME,
176
-    CLOTHING_BARCODE,
177
-    CLOTHING_HELPID,
178
-    CLOTHING_GBCODE,
179
-    CLOTHING_RFID,
180
-    STYLE_SUBJECT_ID,
181
-    SIZEGRP_ID,
182
-    STYLE_HELPID,
183
-    CLOTHING_GBCODE1,
184
-    COLOR_NAME,
185
-    STYLEVER_NAME,
186
-    SIZE_NAME,
187
-    STYLE_UNIT,
188
-    STYLE_SEX,
189
-    STYLE_KIND,
190
-    STYLE_CLASS,
191
-    STYLE_SUBCLASS,
192
-    STYLE_DESIGNER,
193
-    STYLE_PLATER,
194
-    STYLE_BAND,
195
-    STYLE_STYLES,
196
-    STYLE_LOCATE,
197
-    STYLE_SALETYPE,
198
-    STYLE_COLORSYSTEM,
199
-    STYLE_THEME,
200
-    STYLE_INDENTTYPE,
201
-    STYLE_PRICEBAND,
202
-    STYLE_MONTH,
203
-    STYLE_COMPOSITION,
204
-    STYLE_SUPPLIER,
205
-    STYLE_SPARE1,
206
-    STYLE_SPARE2,
207
-    STYLE_SPARE3,
208
-    STYLE_SPARE4,
209
-    STYLE_SPARE5,
210
-    CATEGORY_NAME,
211
-    BRAND_NAME,
212
-    STYLE_YEAR_NAME,
213
-    STYLE_SEARCH_KEY,
214
-    STYLE_SUBJECT_NAME,
215
-    CLOTHING_REMARK,
216
-    STYLE_SPARE3_CODE,
217
-    COST,
218
-    BRAND_GROUPCODE,
219
-    CLASS_GROUPCODE,
220
-    MONTH_GROUPCODE,
221
-    RETURNSUBJECT_ID,
222
-    PRODUCT_SORT,
223
-    CLOTHING_PARTITION
224
-   FROM (
225
-    SELECT a.*, ROWNUM as rn
226
-    FROM (
227
-        SELECT *
228
-        FROM X6_STOCK_DEV.A3_CLOTHING 
229
-        ORDER BY CLOTHING_ID
230
-    ) a
231
-    WHERE ROWNUM <= 41000 
232
-)
233
-WHERE rn > 39000`
234
-
235
-	// FROM X6_STOCK_DEV.A3_CLOTHING WHERE rownum > 100 and rownum <= 20000`
236
-}

+ 69
- 0
test/mycsv_test.go Ver arquivo

@@ -0,0 +1,69 @@
1
+package main
2
+
3
+import (
4
+	"encoding/json"
5
+	"log"
6
+	"testing"
7
+
8
+	"git.x2erp.com/qdy/go-db/factory"
9
+)
10
+
11
+func TestNamedParamsQueryCSV(t *testing.T) {
12
+	factory, err := factory.GetDBFactory()
13
+	if err != nil {
14
+		t.Fatalf("Failed to get DB factory: %v", err)
15
+	}
16
+	defer factory.Close()
17
+
18
+	// 简化的SQL,只测试3个参数
19
+	sql := `
20
+		SELECT * FROM (
21
+			SELECT a.*, ROWNUM rn FROM (
22
+				SELECT CLOTHING_ID, CLOTHING_NAME 
23
+				FROM X6_STOCK_DEV.A3_CLOTHING 
24
+				where clothing_id>:1
25
+				ORDER BY CLOTHING_ID
26
+			) a WHERE ROWNUM <= :2
27
+		) WHERE rn > :3
28
+	`
29
+
30
+	// // 3个参数
31
+	params := []interface{}{
32
+		"A",
33
+		10,
34
+		0,
35
+	}
36
+
37
+	// // 3个参数
38
+	// params := map[string]interface{}{
39
+
40
+	// 	"end_row":   10,
41
+	// 	"start_row": 0,
42
+	// }
43
+	// // 3个参数
44
+	// params := map[string]interface{}{
45
+	// 	"clothing_id": "`0`",
46
+	// 	"end_row":     10,
47
+	// 	"start_row":   0,
48
+	// }
49
+
50
+	// 执行查询
51
+	result := factory.QueryPositionalToJSON(sql, params)
52
+
53
+	// 检查结果 - 根据你的错误信息,result.Error 是 string 类型
54
+	if err != nil { // 改成检查空字符串
55
+		t.Errorf("Named parameters query failed: %s", err)
56
+	} else {
57
+		// 将请求参数输出为格式化的 JSON 日志
58
+		reqJSON, err := json.MarshalIndent(result, "", "  ")
59
+		if err != nil {
60
+			log.Printf("无法序列化请求参数: %v", err)
61
+		} else {
62
+			log.Printf("QueryRequest 参数:\n%s", string(reqJSON))
63
+		}
64
+
65
+		// 或者使用 fmt 输出
66
+		//fmt.Printf("=== QueryRequest ===\n%s\n====================\n", string(reqJSON))
67
+	}
68
+
69
+}

+ 97
- 0
test/mysql_test.go Ver arquivo

@@ -0,0 +1,97 @@
1
+package main
2
+
3
+import (
4
+	"encoding/json"
5
+	"log"
6
+	"testing"
7
+
8
+	"git.x2erp.com/qdy/go-db/factory"
9
+)
10
+
11
+func TestNamedParamsQuery(t *testing.T) {
12
+	factory, err := factory.GetDBFactory()
13
+	if err != nil {
14
+		t.Fatalf("Failed to get DB factory: %v", err)
15
+	}
16
+	defer factory.Close()
17
+
18
+	// 简化的SQL,只测试3个参数
19
+	sql := `
20
+		SELECT * FROM (
21
+			SELECT a.*, ROWNUM rn FROM (
22
+				SELECT CLOTHING_ID, CLOTHING_NAME 
23
+				FROM X6_STOCK_DEV.A3_CLOTHING 
24
+				
25
+				ORDER BY CLOTHING_ID
26
+			) a WHERE ROWNUM <= :1
27
+		) WHERE rn > :2
28
+	`
29
+
30
+	// // 3个参数
31
+	params := []interface{}{
32
+		10,
33
+		0,
34
+	}
35
+
36
+	db := factory.GetDB()
37
+	rows, err := db.Query(sql, params...) // 获取行结果集
38
+
39
+	if err != nil {
40
+		log.Fatalf("查询失败: %v", err)
41
+	}
42
+	defer rows.Close()
43
+
44
+	// 1. 获取列信息
45
+	columns, err := rows.Columns()
46
+	if err != nil {
47
+		log.Fatalf("获取列失败: %v", err)
48
+	}
49
+
50
+	// 2. 准备存储结果的切片
51
+	var results []map[string]interface{}
52
+
53
+	// 3. 遍历每一行
54
+	for rows.Next() {
55
+		// 创建值的切片(每个列一个值)
56
+		values := make([]interface{}, len(columns))
57
+		valuePtrs := make([]interface{}, len(columns))
58
+		for i := range values {
59
+			valuePtrs[i] = &values[i]
60
+		}
61
+
62
+		// 扫描行数据
63
+		err := rows.Scan(valuePtrs...)
64
+		if err != nil {
65
+			log.Fatalf("扫描行失败: %v", err)
66
+		}
67
+
68
+		// 将当前行转换为 map
69
+		rowMap := make(map[string]interface{})
70
+		for i, col := range columns {
71
+			val := values[i]
72
+
73
+			// 处理特殊类型(如 []byte 转换为 string)
74
+			if b, ok := val.([]byte); ok {
75
+				rowMap[col] = string(b)
76
+			} else {
77
+				rowMap[col] = val
78
+			}
79
+		}
80
+
81
+		results = append(results, rowMap)
82
+	}
83
+
84
+	// 检查遍历过程中的错误
85
+	if err = rows.Err(); err != nil {
86
+		log.Fatalf("遍历行错误: %v", err)
87
+	}
88
+
89
+	// 4. 现在可以序列化为 JSON
90
+	reqJSON1, err := json.MarshalIndent(results, "", "  ")
91
+	if err != nil {
92
+		log.Fatalf("JSON序列化失败: %v", err)
93
+	}
94
+
95
+	log.Printf("查询结果:\n%s", string(reqJSON1))
96
+	log.Printf("共 %d 行数据", len(results))
97
+}

Carregando…
Cancelar
Salvar