| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614 |
- package mongodb
-
- import (
- "context"
- "crypto/tls"
- "fmt"
- "log"
- "sync"
- "time"
-
- "git.x2erp.com/qdy/go-base/config"
- "git.x2erp.com/qdy/go-base/config/subconfigs"
- "git.x2erp.com/qdy/go-base/logger"
- "git.x2erp.com/qdy/go-base/model/response"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/mongo"
- "go.mongodb.org/mongo-driver/mongo/options"
- "go.mongodb.org/mongo-driver/mongo/readpref"
- )
-
- // MongoDBFactory MongoDB工厂(全局单例模式)
- type MongoDBFactory struct {
- client *mongo.Client
- db *mongo.Database
- config *subconfigs.MongoDBConfig
- mu sync.RWMutex // 添加读写锁保护
- }
-
- var (
- instanceMongodb *MongoDBFactory
- instanceMongodbOnce sync.Once
- initErr error
- )
-
- // CreateFactory 获取MongoDB工厂单例
- func CreateFactory(cfg config.IConfig) *MongoDBFactory {
-
- config := cfg.GetMongoDBConfig()
-
- instanceMongodbOnce.Do(func() {
- if config == nil {
- log.Fatal("配置未初始化,请先在yaml进行配置")
- }
-
- // 设置默认值
- if config.Timeout == 0 {
- config.Timeout = 30
- }
- if config.MaxPoolSize == 0 {
- config.MaxPoolSize = 100
- }
- if config.MinPoolSize == 0 {
- config.MinPoolSize = 10
- }
- if config.AuthSource == "" {
- config.AuthSource = "admin"
- }
-
- // 验证配置
- if config.URI == "" {
- initErr = fmt.Errorf("mongodb URI must be configured")
- return
- }
- if config.Database == "" {
- initErr = fmt.Errorf("mongodb database name must be configured")
- return
- }
-
- log.Printf("Creating MongoDB connection...")
-
- // 创建客户端选项
- clientOptions := options.Client().
- ApplyURI(config.URI)
- //SetConnectTimeout(config.Timeout).
- //SetSocketTimeout(config.Timeout).
- //SetServerSelectionTimeout(config.Timeout).
- //SetMaxPoolSize(config.MaxPoolSize).
- //SetMinPoolSize(config.MinPoolSize).
- //SetMaxConnIdleTime(5 * time.Minute).
- //SetHeartbeatInterval(10 * time.Second)
-
- // 设置认证
- if config.Username != "" && config.Password != "" {
- clientOptions.SetAuth(options.Credential{
- Username: config.Username,
- Password: config.Password,
- AuthSource: config.AuthSource,
- })
- }
-
- // 设置SSL
- if config.SSL {
- clientOptions.SetTLSConfig(&tls.Config{
- InsecureSkipVerify: false,
- })
- }
-
- // 创建上下文
- ctx, cancel := context.WithTimeout(context.Background(), config.GetTimeout())
- defer cancel()
-
- log.Printf(" context.WithTimeout ... successfully.")
-
- // 连接MongoDB
- client, err := mongo.Connect(ctx, clientOptions)
- if err != nil {
- initErr = fmt.Errorf("failed to connect to MongoDB: %v", err)
- return
- }
-
- log.Printf(" mongo.connect ... successfully.")
-
- // // 测试连接
- // if err := client.Ping(ctx, nil); err != nil {
- // initErr = fmt.Errorf("failed to ping MongoDB: %v", err)
- // return
- // }
-
- // 获取数据库
- database := client.Database(config.Database)
-
- log.Printf("MongoDBFactory is successfully created.\n")
-
- instanceMongodb = &MongoDBFactory{
- client: client,
- db: database,
- config: config,
- }
- })
-
- if initErr != nil {
- //logger.Errorf("MongoDBFactory is error:'%v'", initErr)
- log.Fatalf("MongoDBFactory is error:'%v'", initErr)
- //return nil
- }
-
- return instanceMongodb
- }
-
- func (c *MongoDBFactory) GetTimeout() time.Duration {
- return c.config.GetTimeout()
- }
-
- // TestConnection 测试连接
- func (f *MongoDBFactory) TestConnection() {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- if err := f.client.Ping(ctx, readpref.Primary()); err != nil {
- log.Printf("MongoDB连接测试失败: %v", err)
- } else {
- log.Printf("MongoDB连接测试通过.")
- }
-
- }
-
- // ========== MongoDBFactory 实例方法 ==========
-
- // GetClient 获取MongoDB客户端
- func (f *MongoDBFactory) GetClient() *mongo.Client {
- return f.client
- }
-
- // GetDatabase 获取数据库
- func (f *MongoDBFactory) GetDatabase() *mongo.Database {
- return f.db
- }
-
- // GetCollection 获取集合(类似获取数据库连接)
- func (f *MongoDBFactory) GetCollection(collectionName string) *mongo.Collection {
- return f.db.Collection(collectionName)
- }
-
- func (f *MongoDBFactory) GetName() string {
- return "MongoDBFactory"
- }
-
- // Close 关闭MongoDB连接
- func (f *MongoDBFactory) Close() {
- if f.client != nil {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- err := f.client.Disconnect(ctx)
- if err != nil {
- logger.Errorf("failed to disconnect MongoDB: %v", err)
- //return fmt.Errorf("failed to disconnect MongoDB: %v", err)
- }
-
- log.Printf("MongoDB connection closed gracefully\n")
- f.client = nil
- f.db = nil
- }
-
- }
-
- // GetConfig 获取配置信息
- func (f *MongoDBFactory) GetConfig() subconfigs.MongoDBConfig {
- return *f.config
- }
-
- // ========== 快捷操作方法(增强版) ==========
-
- // InsertOne 插入单个文档,返回是否成功
- func (f *MongoDBFactory) InsertOne(collectionName string, document interface{}) (*response.QueryResult[interface{}], error) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- result, err := collection.InsertOne(ctx, document)
- if err != nil {
- logger.Errorf("插入文档失败,集合:%s,错误:%v", collectionName, err)
- return &response.QueryResult[interface{}]{
- Success: false,
- Error: err.Error(),
- }, err
- }
- return &response.QueryResult[interface{}]{
- Success: true,
- Data: result.InsertedID,
- }, nil
- }
-
- // InsertOneWithResult 插入单个文档并返回结果
- func (f *MongoDBFactory) InsertOneWithResult(collectionName string, document interface{}) (*mongo.InsertOneResult, bool) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- result, err := collection.InsertOne(ctx, document)
- if err != nil {
- logger.Errorf("插入文档失败,集合:%s,错误:%v", collectionName, err)
- return nil, false
- }
- return result, true
- }
-
- // InsertMany 插入多个文档,返回是否成功
- func (f *MongoDBFactory) InsertMany(collectionName string, documents []interface{}) bool {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- _, err := collection.InsertMany(ctx, documents)
- if err != nil {
- logger.Errorf("批量插入文档失败,集合:%s,错误:%v", collectionName, err)
- return false
- }
- return true
- }
-
- // InsertManyWithResult 插入多个文档并返回结果
- func (f *MongoDBFactory) InsertManyWithResult(collectionName string, documents []interface{}) (*mongo.InsertManyResult, bool) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- result, err := collection.InsertMany(ctx, documents)
- if err != nil {
- logger.Errorf("批量插入文档失败,集合:%s,错误:%v", collectionName, err)
- return nil, false
- }
- return result, true
- }
-
- // FindOne 查询单个文档,返回解码后的对象
- func (f *MongoDBFactory) FindOne(collectionName string, filter interface{}, result interface{}) error {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- err := collection.FindOne(ctx, filter).Decode(result)
- if err != nil {
- if err != mongo.ErrNoDocuments {
- logger.Errorf("查询文档失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- }
- return err
- }
- return nil
- }
-
- // FindOneAndDecode 查询单个文档并自动解码(简化版)
- func (f *MongoDBFactory) FindOneAndDecode(collectionName string, filter interface{}) (interface{}, error) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- var result map[string]interface{}
- err := collection.FindOne(ctx, filter).Decode(&result)
- if err != nil {
- if err != mongo.ErrNoDocuments {
- logger.Errorf("查询文档失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- }
- return nil, err
- }
- return result, nil
- }
-
- // Find 查询多个文档,返回对象数组
- func (f *MongoDBFactory) Find(collectionName string, filter interface{}, results interface{}, opts ...*options.FindOptions) error {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- cursor, err := collection.Find(ctx, filter, opts...)
- if err != nil {
- logger.Errorf("查询文档失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- return err
- }
- defer cursor.Close(ctx)
-
- // 解码到传入的切片指针中
- if err = cursor.All(ctx, results); err != nil {
- logger.Errorf("解码查询结果失败,集合:%s,错误:%v", collectionName, err)
- return err
- }
- return nil
- }
-
- // FindAll 查询所有文档,返回对象数组
- func (f *MongoDBFactory) FindAll(collectionName string, results interface{}) error {
- return f.Find(collectionName, bson.M{}, results)
- }
-
- // UpdateOneWithResult 更新单个文档并返回结果
- func (f *MongoDBFactory) UpdateOneWithResult(collectionName string, filter interface{}, update interface{}) (*mongo.UpdateResult, bool) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- result, err := collection.UpdateOne(ctx, filter, update)
- if err != nil {
- logger.Errorf("更新文档失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- return nil, false
- }
- return result, true
- }
-
- // CountDocuments 统计文档数量,返回数量
- func (f *MongoDBFactory) CountDocuments(collectionName string, filter interface{}) (int64, error) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- count, err := collection.CountDocuments(ctx, filter)
- if err != nil {
- logger.Errorf("统计文档数量失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- return 0, err
- }
- return count, nil
- }
-
- // Aggregate 聚合查询,返回对象数组
- func (f *MongoDBFactory) Aggregate(collectionName string, pipeline interface{}, results interface{}) error {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- cursor, err := collection.Aggregate(ctx, pipeline)
- if err != nil {
- logger.Errorf("聚合查询失败,集合:%s,管道:%v,错误:%v", collectionName, pipeline, err)
- return err
- }
- defer cursor.Close(ctx)
-
- if err = cursor.All(ctx, results); err != nil {
- logger.Errorf("解码聚合结果失败,集合:%s,错误:%v", collectionName, err)
- return err
- }
- return nil
- }
-
- // FindOneAndUpdate 查找并更新,返回更新后的文档
- func (f *MongoDBFactory) FindOneAndUpdate(collectionName string, filter interface{}, update interface{}, result interface{}) error {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- err := collection.FindOneAndUpdate(ctx, filter, update).Decode(result)
- if err != nil {
- if err != mongo.ErrNoDocuments {
- logger.Errorf("查找并更新失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- }
- return err
- }
- return nil
- }
-
- // FindOneAndDelete 查找并删除,返回删除的文档
- func (f *MongoDBFactory) FindOneAndDelete(collectionName string, filter interface{}, result interface{}) error {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- err := collection.FindOneAndDelete(ctx, filter).Decode(result)
- if err != nil {
- if err != mongo.ErrNoDocuments {
- logger.Errorf("查找并删除失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- }
- return err
- }
- return nil
- }
-
- // BulkWrite 批量写入操作,返回是否成功
- func (f *MongoDBFactory) BulkWrite(collectionName string, operations []mongo.WriteModel) bool {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- _, err := collection.BulkWrite(ctx, operations)
- if err != nil {
- logger.Errorf("批量写入操作失败,集合:%s,错误:%v", collectionName, err)
- return false
- }
- return true
- }
-
- // CreateIndex 创建索引,返回是否成功
- func (f *MongoDBFactory) CreateIndex(collectionName string, keys interface{}, opts ...*options.IndexOptions) bool {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- indexModel := mongo.IndexModel{
- Keys: keys,
- }
-
- if len(opts) > 0 && opts[0] != nil {
- indexModel.Options = opts[0]
- }
-
- _, err := collection.Indexes().CreateOne(ctx, indexModel)
- if err != nil {
- logger.Errorf("创建索引失败,集合:%s,键:%v,错误:%v", collectionName, keys, err)
- return false
- }
- return true
- }
-
- // FindWithPagination 分页查询,返回对象数组
- func (f *MongoDBFactory) FindWithPagination(
- collectionName string,
- filter interface{},
- skip, limit int64,
- sort interface{},
- results interface{},
- ) error {
- //collection := f.GetCollection(collectionName)
- //ctx, cancel := context.WithTimeout(context.Background(), f.config.Timeout)
- //defer cancel()
-
- findOptions := options.Find().
- SetSkip(skip).
- SetLimit(limit)
-
- if sort != nil {
- findOptions.SetSort(sort)
- }
-
- return f.Find(collectionName, filter, results, findOptions)
- }
-
- // FindOneByID 根据ID查询文档
- func (f *MongoDBFactory) FindOneByID(collectionName string, id interface{}, result interface{}) error {
- return f.FindOne(collectionName, bson.M{"_id": id}, result)
- }
-
- // Exists 检查文档是否存在
- func (f *MongoDBFactory) Exists(collectionName string, filter interface{}) (bool, error) {
- count, err := f.CountDocuments(collectionName, filter)
- if err != nil {
- return false, err
- }
- return count > 0, nil
- }
-
- // FindAndCount 查询文档并返回总数(用于分页场景)
- func (f *MongoDBFactory) FindAndCount(
- collectionName string,
- filter interface{},
- skip, limit int64,
- sort interface{},
- results interface{},
- ) (int64, error) {
- // 查询数据
- err := f.FindWithPagination(collectionName, filter, skip, limit, sort, results)
- if err != nil {
- return 0, err
- }
-
- // 查询总数
- total, err := f.CountDocuments(collectionName, filter)
- if err != nil {
- return 0, err
- }
-
- return total, nil
- }
-
- // UpdateOne 更新单个文档,返回是否执行成功和影响记录数
- func (f *MongoDBFactory) UpdateOne(collectionName string, filter interface{}, update interface{}) (bool, int64) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- result, err := collection.UpdateOne(ctx, filter, update)
- if err != nil {
- logger.Errorf("更新文档失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- return false, 0
- }
-
- // 返回执行成功 和 实际更新的文档数
- return true, result.ModifiedCount
- }
-
- // UpdateOneWithMatch 更新单个文档,只有匹配到文档才返回成功
- func (f *MongoDBFactory) UpdateOneWithMatch(collectionName string, filter interface{}, update interface{}) bool {
- success, modifiedCount := f.UpdateOne(collectionName, filter, update)
- return success && modifiedCount > 0
- }
-
- // UpdateMany 更新多个文档,返回是否执行成功和影响记录数
- func (f *MongoDBFactory) UpdateMany(collectionName string, filter interface{}, update interface{}) (bool, int64) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- result, err := collection.UpdateMany(ctx, filter, update)
- if err != nil {
- logger.Errorf("批量更新文档失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- return false, 0
- }
-
- return true, result.ModifiedCount
- }
-
- // DeleteOne 删除单个文档,返回是否执行成功和删除的记录数
- func (f *MongoDBFactory) DeleteOne(collectionName string, filter interface{}) (bool, int64) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- result, err := collection.DeleteOne(ctx, filter)
- if err != nil {
- logger.Errorf("删除文档失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- return false, 0
- }
-
- // 返回执行成功 和 实际删除的文档数
- return true, result.DeletedCount
- }
-
- // DeleteOneWithMatch 删除单个文档,只有删除了文档才返回成功
- func (f *MongoDBFactory) DeleteOneWithMatch(collectionName string, filter interface{}) bool {
- success, deletedCount := f.DeleteOne(collectionName, filter)
- return success && deletedCount > 0
- }
-
- // DeleteMany 删除多个文档,返回是否执行成功和删除的记录数
- func (f *MongoDBFactory) DeleteMany(collectionName string, filter interface{}) (bool, int64) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- result, err := collection.DeleteMany(ctx, filter)
- if err != nil {
- logger.Errorf("批量删除文档失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- return false, 0
- }
-
- return true, result.DeletedCount
- }
-
- // UpsertOne 更新或插入文档(upsert操作)
- func (f *MongoDBFactory) UpsertOne(collectionName string, filter interface{}, update interface{}) (bool, interface{}) {
- collection := f.GetCollection(collectionName)
- ctx, cancel := context.WithTimeout(context.Background(), f.config.GetTimeout())
- defer cancel()
-
- // 设置 upsert 选项
- opts := options.Update().SetUpsert(true)
-
- result, err := collection.UpdateOne(ctx, filter, update, opts)
- if err != nil {
- logger.Errorf("Upsert操作失败,集合:%s,过滤条件:%v,错误:%v", collectionName, filter, err)
- return false, result
- }
-
- // 返回: 成功状态, 匹配数, 修改数, 插入数
- return true, result
- }
-
- // UpdateOneByID 根据ID更新文档
- func (f *MongoDBFactory) UpdateOneByID(collectionName string, id interface{}, update interface{}) (bool, int64) {
- return f.UpdateOne(collectionName, bson.M{"_id": id}, update)
- }
-
- // UpdateOneByIDWithMatch 根据ID更新文档,只有匹配到才返回成功
- func (f *MongoDBFactory) UpdateOneByIDWithMatch(collectionName string, id interface{}, update interface{}) bool {
- success, modifiedCount := f.UpdateOneByID(collectionName, id, update)
- return success && modifiedCount > 0
- }
-
- // DeleteOneByID 根据ID删除文档
- func (f *MongoDBFactory) DeleteOneByID(collectionName string, id interface{}) (bool, int64) {
- return f.DeleteOne(collectionName, bson.M{"_id": id})
- }
-
- // DeleteOneByIDWithMatch 根据ID删除文档,只有删除了才返回成功
- func (f *MongoDBFactory) DeleteOneByIDWithMatch(collectionName string, id interface{}) bool {
- success, deletedCount := f.DeleteOneByID(collectionName, id)
- return success && deletedCount > 0
- }
|