|
|
@@ -8,6 +8,7 @@ import (
|
|
8
|
8
|
"sync"
|
|
9
|
9
|
"time"
|
|
10
|
10
|
|
|
|
11
|
+ "git.x2erp.com/qdy/go-base/config"
|
|
11
|
12
|
"git.x2erp.com/qdy/go-base/config/subconfigs"
|
|
12
|
13
|
"git.x2erp.com/qdy/go-base/logger"
|
|
13
|
14
|
"go.mongodb.org/mongo-driver/bson"
|
|
|
@@ -21,7 +22,7 @@ type MongoDBFactory struct {
|
|
21
|
22
|
client *mongo.Client
|
|
22
|
23
|
db *mongo.Database
|
|
23
|
24
|
config *subconfigs.MongoDBConfig
|
|
24
|
|
- //mu sync.RWMutex // 添加读写锁保护
|
|
|
25
|
+ mu sync.RWMutex // 添加读写锁保护
|
|
25
|
26
|
}
|
|
26
|
27
|
|
|
27
|
28
|
var (
|
|
|
@@ -30,8 +31,10 @@ var (
|
|
30
|
31
|
initErr error
|
|
31
|
32
|
)
|
|
32
|
33
|
|
|
33
|
|
-// GetMongoDBFactory 获取MongoDB工厂单例
|
|
34
|
|
-func GetMongoDBFactory(config *subconfigs.MongoDBConfig) *MongoDBFactory {
|
|
|
34
|
+// CreateFactory 获取MongoDB工厂单例
|
|
|
35
|
+func CreateFactory(cfg config.IConfig) *MongoDBFactory {
|
|
|
36
|
+
|
|
|
37
|
+ config := cfg.GetMongoDBConfig()
|
|
35
|
38
|
|
|
36
|
39
|
instanceMongodbOnce.Do(func() {
|
|
37
|
40
|
if config == nil {
|
|
|
@@ -40,7 +43,7 @@ func GetMongoDBFactory(config *subconfigs.MongoDBConfig) *MongoDBFactory {
|
|
40
|
43
|
|
|
41
|
44
|
// 设置默认值
|
|
42
|
45
|
if config.Timeout == 0 {
|
|
43
|
|
- config.Timeout = 20 * time.Second
|
|
|
46
|
+ config.Timeout = 30
|
|
44
|
47
|
}
|
|
45
|
48
|
if config.MaxPoolSize == 0 {
|
|
46
|
49
|
config.MaxPoolSize = 100
|
|
|
@@ -66,14 +69,14 @@ func GetMongoDBFactory(config *subconfigs.MongoDBConfig) *MongoDBFactory {
|
|
66
|
69
|
|
|
67
|
70
|
// 创建客户端选项
|
|
68
|
71
|
clientOptions := options.Client().
|
|
69
|
|
- ApplyURI(config.URI).
|
|
70
|
|
- SetConnectTimeout(config.Timeout).
|
|
71
|
|
- SetSocketTimeout(config.Timeout).
|
|
72
|
|
- SetServerSelectionTimeout(config.Timeout).
|
|
73
|
|
- SetMaxPoolSize(config.MaxPoolSize).
|
|
74
|
|
- SetMinPoolSize(config.MinPoolSize).
|
|
75
|
|
- SetMaxConnIdleTime(5 * time.Minute).
|
|
76
|
|
- SetHeartbeatInterval(10 * time.Second)
|
|
|
72
|
+ ApplyURI(config.URI)
|
|
|
73
|
+ //SetConnectTimeout(config.Timeout).
|
|
|
74
|
+ //SetSocketTimeout(config.Timeout).
|
|
|
75
|
+ //SetServerSelectionTimeout(config.Timeout).
|
|
|
76
|
+ //SetMaxPoolSize(config.MaxPoolSize).
|
|
|
77
|
+ //SetMinPoolSize(config.MinPoolSize).
|
|
|
78
|
+ //SetMaxConnIdleTime(5 * time.Minute).
|
|
|
79
|
+ //SetHeartbeatInterval(10 * time.Second)
|
|
77
|
80
|
|
|
78
|
81
|
// 设置认证
|
|
79
|
82
|
if config.Username != "" && config.Password != "" {
|
|
|
@@ -92,7 +95,7 @@ func GetMongoDBFactory(config *subconfigs.MongoDBConfig) *MongoDBFactory {
|
|
92
|
95
|
}
|
|
93
|
96
|
|
|
94
|
97
|
// 创建上下文
|
|
95
|
|
- ctx, cancel := context.WithTimeout(context.Background(), config.Timeout)
|
|
|
98
|
+ ctx, cancel := context.WithTimeout(context.Background(), config.GetTimeout())
|
|
96
|
99
|
defer cancel()
|
|
97
|
100
|
|
|
98
|
101
|
log.Printf(" context.WithTimeout ... successfully.")
|
|
|
@@ -133,6 +136,23 @@ func GetMongoDBFactory(config *subconfigs.MongoDBConfig) *MongoDBFactory {
|
|
133
|
136
|
return instanceMongodb
|
|
134
|
137
|
}
|
|
135
|
138
|
|
|
|
139
|
+func (c *MongoDBFactory) GetTimeout() time.Duration {
|
|
|
140
|
+ return c.config.GetTimeout()
|
|
|
141
|
+}
|
|
|
142
|
+
|
|
|
143
|
+// TestConnection 测试连接
|
|
|
144
|
+func (f *MongoDBFactory) TestConnection() {
|
|
|
145
|
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
|
146
|
+ defer cancel()
|
|
|
147
|
+
|
|
|
148
|
+ if err := f.client.Ping(ctx, readpref.Primary()); err != nil {
|
|
|
149
|
+ log.Printf("MongoDB连接测试失败: %v", err)
|
|
|
150
|
+ } else {
|
|
|
151
|
+ log.Printf("MongoDB连接测试通过.")
|
|
|
152
|
+ }
|
|
|
153
|
+
|
|
|
154
|
+}
|
|
|
155
|
+
|
|
136
|
156
|
// ========== MongoDBFactory 实例方法 ==========
|
|
137
|
157
|
|
|
138
|
158
|
// GetClient 获取MongoDB客户端
|
|
|
@@ -150,6 +170,10 @@ func (f *MongoDBFactory) GetCollection(collectionName string) *mongo.Collection
|
|
150
|
170
|
return f.db.Collection(collectionName)
|
|
151
|
171
|
}
|
|
152
|
172
|
|
|
|
173
|
+func (f *MongoDBFactory) GetName() string {
|
|
|
174
|
+ return "MongoDBFactory"
|
|
|
175
|
+}
|
|
|
176
|
+
|
|
153
|
177
|
// Close 关闭MongoDB连接
|
|
154
|
178
|
func (f *MongoDBFactory) Close() {
|
|
155
|
179
|
if f.client != nil {
|
|
|
@@ -174,20 +198,12 @@ func (f *MongoDBFactory) GetConfig() subconfigs.MongoDBConfig {
|
|
174
|
198
|
return *f.config
|
|
175
|
199
|
}
|
|
176
|
200
|
|
|
177
|
|
-// TestConnection 测试连接
|
|
178
|
|
-func (f *MongoDBFactory) TestConnection() error {
|
|
179
|
|
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
180
|
|
- defer cancel()
|
|
181
|
|
-
|
|
182
|
|
- return f.client.Ping(ctx, readpref.Primary())
|
|
183
|
|
-}
|
|
184
|
|
-
|
|
185
|
201
|
// ========== 快捷操作方法(增强版) ==========
|
|
186
|
202
|
|
|
187
|
203
|
// InsertOne 插入单个文档,返回是否成功
|
|
188
|
204
|
func (f *MongoDBFactory) InsertOne(collectionName string, document interface{}) bool {
|
|
189
|
205
|
collection := f.GetCollection(collectionName)
|
|
190
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
206
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
191
|
207
|
defer cancel()
|
|
192
|
208
|
|
|
193
|
209
|
_, err := collection.InsertOne(ctx, document)
|
|
|
@@ -201,7 +217,7 @@ func (f *MongoDBFactory) InsertOne(collectionName string, document interface{})
|
|
201
|
217
|
// InsertOneWithResult 插入单个文档并返回结果
|
|
202
|
218
|
func (f *MongoDBFactory) InsertOneWithResult(collectionName string, document interface{}) (*mongo.InsertOneResult, bool) {
|
|
203
|
219
|
collection := f.GetCollection(collectionName)
|
|
204
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
220
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
205
|
221
|
defer cancel()
|
|
206
|
222
|
|
|
207
|
223
|
result, err := collection.InsertOne(ctx, document)
|
|
|
@@ -215,7 +231,7 @@ func (f *MongoDBFactory) InsertOneWithResult(collectionName string, document int
|
|
215
|
231
|
// InsertMany 插入多个文档,返回是否成功
|
|
216
|
232
|
func (f *MongoDBFactory) InsertMany(collectionName string, documents []interface{}) bool {
|
|
217
|
233
|
collection := f.GetCollection(collectionName)
|
|
218
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
234
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
219
|
235
|
defer cancel()
|
|
220
|
236
|
|
|
221
|
237
|
_, err := collection.InsertMany(ctx, documents)
|
|
|
@@ -229,7 +245,7 @@ func (f *MongoDBFactory) InsertMany(collectionName string, documents []interface
|
|
229
|
245
|
// InsertManyWithResult 插入多个文档并返回结果
|
|
230
|
246
|
func (f *MongoDBFactory) InsertManyWithResult(collectionName string, documents []interface{}) (*mongo.InsertManyResult, bool) {
|
|
231
|
247
|
collection := f.GetCollection(collectionName)
|
|
232
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
248
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
233
|
249
|
defer cancel()
|
|
234
|
250
|
|
|
235
|
251
|
result, err := collection.InsertMany(ctx, documents)
|
|
|
@@ -243,7 +259,7 @@ func (f *MongoDBFactory) InsertManyWithResult(collectionName string, documents [
|
|
243
|
259
|
// FindOne 查询单个文档,返回解码后的对象
|
|
244
|
260
|
func (f *MongoDBFactory) FindOne(collectionName string, filter interface{}, result interface{}) error {
|
|
245
|
261
|
collection := f.GetCollection(collectionName)
|
|
246
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
262
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
247
|
263
|
defer cancel()
|
|
248
|
264
|
|
|
249
|
265
|
err := collection.FindOne(ctx, filter).Decode(result)
|
|
|
@@ -259,7 +275,7 @@ func (f *MongoDBFactory) FindOne(collectionName string, filter interface{}, resu
|
|
259
|
275
|
// FindOneAndDecode 查询单个文档并自动解码(简化版)
|
|
260
|
276
|
func (f *MongoDBFactory) FindOneAndDecode(collectionName string, filter interface{}) (interface{}, error) {
|
|
261
|
277
|
collection := f.GetCollection(collectionName)
|
|
262
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
278
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
263
|
279
|
defer cancel()
|
|
264
|
280
|
|
|
265
|
281
|
var result map[string]interface{}
|
|
|
@@ -276,7 +292,7 @@ func (f *MongoDBFactory) FindOneAndDecode(collectionName string, filter interfac
|
|
276
|
292
|
// Find 查询多个文档,返回对象数组
|
|
277
|
293
|
func (f *MongoDBFactory) Find(collectionName string, filter interface{}, results interface{}, opts ...*options.FindOptions) error {
|
|
278
|
294
|
collection := f.GetCollection(collectionName)
|
|
279
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
295
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
280
|
296
|
defer cancel()
|
|
281
|
297
|
|
|
282
|
298
|
cursor, err := collection.Find(ctx, filter, opts...)
|
|
|
@@ -302,7 +318,7 @@ func (f *MongoDBFactory) FindAll(collectionName string, results interface{}) err
|
|
302
|
318
|
// UpdateOneWithResult 更新单个文档并返回结果
|
|
303
|
319
|
func (f *MongoDBFactory) UpdateOneWithResult(collectionName string, filter interface{}, update interface{}) (*mongo.UpdateResult, bool) {
|
|
304
|
320
|
collection := f.GetCollection(collectionName)
|
|
305
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
321
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
306
|
322
|
defer cancel()
|
|
307
|
323
|
|
|
308
|
324
|
result, err := collection.UpdateOne(ctx, filter, update)
|
|
|
@@ -316,7 +332,7 @@ func (f *MongoDBFactory) UpdateOneWithResult(collectionName string, filter inter
|
|
316
|
332
|
// CountDocuments 统计文档数量,返回数量
|
|
317
|
333
|
func (f *MongoDBFactory) CountDocuments(collectionName string, filter interface{}) (int64, error) {
|
|
318
|
334
|
collection := f.GetCollection(collectionName)
|
|
319
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
335
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
320
|
336
|
defer cancel()
|
|
321
|
337
|
|
|
322
|
338
|
count, err := collection.CountDocuments(ctx, filter)
|
|
|
@@ -330,7 +346,7 @@ func (f *MongoDBFactory) CountDocuments(collectionName string, filter interface{
|
|
330
|
346
|
// Aggregate 聚合查询,返回对象数组
|
|
331
|
347
|
func (f *MongoDBFactory) Aggregate(collectionName string, pipeline interface{}, results interface{}) error {
|
|
332
|
348
|
collection := f.GetCollection(collectionName)
|
|
333
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
349
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
334
|
350
|
defer cancel()
|
|
335
|
351
|
|
|
336
|
352
|
cursor, err := collection.Aggregate(ctx, pipeline)
|
|
|
@@ -350,7 +366,7 @@ func (f *MongoDBFactory) Aggregate(collectionName string, pipeline interface{},
|
|
350
|
366
|
// FindOneAndUpdate 查找并更新,返回更新后的文档
|
|
351
|
367
|
func (f *MongoDBFactory) FindOneAndUpdate(collectionName string, filter interface{}, update interface{}, result interface{}) error {
|
|
352
|
368
|
collection := f.GetCollection(collectionName)
|
|
353
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
369
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
354
|
370
|
defer cancel()
|
|
355
|
371
|
|
|
356
|
372
|
err := collection.FindOneAndUpdate(ctx, filter, update).Decode(result)
|
|
|
@@ -366,7 +382,7 @@ func (f *MongoDBFactory) FindOneAndUpdate(collectionName string, filter interfac
|
|
366
|
382
|
// FindOneAndDelete 查找并删除,返回删除的文档
|
|
367
|
383
|
func (f *MongoDBFactory) FindOneAndDelete(collectionName string, filter interface{}, result interface{}) error {
|
|
368
|
384
|
collection := f.GetCollection(collectionName)
|
|
369
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
385
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
370
|
386
|
defer cancel()
|
|
371
|
387
|
|
|
372
|
388
|
err := collection.FindOneAndDelete(ctx, filter).Decode(result)
|
|
|
@@ -382,7 +398,7 @@ func (f *MongoDBFactory) FindOneAndDelete(collectionName string, filter interfac
|
|
382
|
398
|
// BulkWrite 批量写入操作,返回是否成功
|
|
383
|
399
|
func (f *MongoDBFactory) BulkWrite(collectionName string, operations []mongo.WriteModel) bool {
|
|
384
|
400
|
collection := f.GetCollection(collectionName)
|
|
385
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
401
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
386
|
402
|
defer cancel()
|
|
387
|
403
|
|
|
388
|
404
|
_, err := collection.BulkWrite(ctx, operations)
|
|
|
@@ -396,7 +412,7 @@ func (f *MongoDBFactory) BulkWrite(collectionName string, operations []mongo.Wri
|
|
396
|
412
|
// CreateIndex 创建索引,返回是否成功
|
|
397
|
413
|
func (f *MongoDBFactory) CreateIndex(collectionName string, keys interface{}, opts ...*options.IndexOptions) bool {
|
|
398
|
414
|
collection := f.GetCollection(collectionName)
|
|
399
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
415
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
400
|
416
|
defer cancel()
|
|
401
|
417
|
|
|
402
|
418
|
indexModel := mongo.IndexModel{
|
|
|
@@ -478,7 +494,7 @@ func (f *MongoDBFactory) FindAndCount(
|
|
478
|
494
|
// UpdateOne 更新单个文档,返回是否执行成功和影响记录数
|
|
479
|
495
|
func (f *MongoDBFactory) UpdateOne(collectionName string, filter interface{}, update interface{}) (bool, int64) {
|
|
480
|
496
|
collection := f.GetCollection(collectionName)
|
|
481
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
497
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
482
|
498
|
defer cancel()
|
|
483
|
499
|
|
|
484
|
500
|
result, err := collection.UpdateOne(ctx, filter, update)
|
|
|
@@ -500,7 +516,7 @@ func (f *MongoDBFactory) UpdateOneWithMatch(collectionName string, filter interf
|
|
500
|
516
|
// UpdateMany 更新多个文档,返回是否执行成功和影响记录数
|
|
501
|
517
|
func (f *MongoDBFactory) UpdateMany(collectionName string, filter interface{}, update interface{}) (bool, int64) {
|
|
502
|
518
|
collection := f.GetCollection(collectionName)
|
|
503
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
519
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
504
|
520
|
defer cancel()
|
|
505
|
521
|
|
|
506
|
522
|
result, err := collection.UpdateMany(ctx, filter, update)
|
|
|
@@ -515,7 +531,7 @@ func (f *MongoDBFactory) UpdateMany(collectionName string, filter interface{}, u
|
|
515
|
531
|
// DeleteOne 删除单个文档,返回是否执行成功和删除的记录数
|
|
516
|
532
|
func (f *MongoDBFactory) DeleteOne(collectionName string, filter interface{}) (bool, int64) {
|
|
517
|
533
|
collection := f.GetCollection(collectionName)
|
|
518
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
534
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
519
|
535
|
defer cancel()
|
|
520
|
536
|
|
|
521
|
537
|
result, err := collection.DeleteOne(ctx, filter)
|
|
|
@@ -537,7 +553,7 @@ func (f *MongoDBFactory) DeleteOneWithMatch(collectionName string, filter interf
|
|
537
|
553
|
// DeleteMany 删除多个文档,返回是否执行成功和删除的记录数
|
|
538
|
554
|
func (f *MongoDBFactory) DeleteMany(collectionName string, filter interface{}) (bool, int64) {
|
|
539
|
555
|
collection := f.GetCollection(collectionName)
|
|
540
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
556
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
541
|
557
|
defer cancel()
|
|
542
|
558
|
|
|
543
|
559
|
result, err := collection.DeleteMany(ctx, filter)
|
|
|
@@ -552,7 +568,7 @@ func (f *MongoDBFactory) DeleteMany(collectionName string, filter interface{}) (
|
|
552
|
568
|
// UpsertOne 更新或插入文档(upsert操作)
|
|
553
|
569
|
func (f *MongoDBFactory) UpsertOne(collectionName string, filter interface{}, update interface{}) (bool, interface{}) {
|
|
554
|
570
|
collection := f.GetCollection(collectionName)
|
|
555
|
|
- ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
|
|
|
571
|
+ ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
|
|
556
|
572
|
defer cancel()
|
|
557
|
573
|
|
|
558
|
574
|
// 设置 upsert 选项
|