package dbs

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"git.x2erp.com/qdy/go-svc-mcp/internal/mcp"
)

// init registers the "get_postgresql_columns" MCP tool.
//
// The tool accepts a JSON object with these fields:
//   - table_name (required): table whose columns are listed.
//   - schema: schema name; blank values are treated as "public".
//   - include_comments: whether column comments are joined in from
//     pg_description; treated as true when the field is omitted.
//   - database_key: passed unchanged to GetDBFactory to select the database.
//
// On success the handler returns a map holding the request context IDs, the
// database type and name, the resolved schema and table name, the table
// comment, the column rows (each extended with "full_data_type") and a
// timestamp. It returns an error when the input cannot be decoded, the
// selected database is not PostgreSQL, the table name is blank, the column
// query fails, or the query yields no rows.
func init() {
	mcp.Register("get_postgresql_columns", "根据表名称获取PostgreSQL表的所有字段相关信息",
		map[string]interface{}{
			"type": "object",
			"properties": map[string]interface{}{
				"table_name": map[string]interface{}{
					"type":        "string",
					"description": "表名称",
				},
				"schema": map[string]interface{}{
					"type":        "string",
					"description": "模式名称(默认为public)",
					"default":     "",
				},
				"include_comments": map[string]interface{}{
					"type":        "boolean",
					"description": "是否包含字段注释",
					"default":     true,
				},
				"database_key": map[string]interface{}{
					"type":        "string",
					"description": "数据库配置键名(如:business),可选,默认使用主数据库",
					"enum":        []string{"warehouse", "business"},
					"default":     "warehouse",
				},
			},
			"required": []string{"table_name"},
		},
		func(input json.RawMessage, deps *mcp.ToolDependencies) (interface{}, error) {
			// Seed IncludeComments with the default advertised in the schema:
			// json.Unmarshal leaves fields that are absent from the input
			// untouched, so an omitted "include_comments" stays true instead
			// of silently falling back to the bool zero value.
			params := struct {
				TableName       string `json:"table_name"`
				Schema          string `json:"schema"`
				IncludeComments bool   `json:"include_comments"`
				DatabaseKey     string `json:"database_key"`
			}{IncludeComments: true}

			if len(input) > 0 {
				if err := json.Unmarshal(input, &params); err != nil {
					return nil, err
				}
			}

			// Resolve the database factory for the requested key.
			dbFactory, err := GetDBFactory(params.DatabaseKey, deps)
			if err != nil {
				return nil, err
			}

			// The queries below rely on PostgreSQL catalogs, so reject any
			// other database type up front.
			dbType := dbFactory.GetDBType()
			if dbType != "postgresql" {
				return nil, fmt.Errorf("当前数据库类型为 %s,此工具仅支持PostgreSQL数据库", dbType)
			}

			// A blank schema falls back to "public".
			schema := strings.TrimSpace(params.Schema)
			if schema == "" {
				schema = "public"
			}

			tableName := strings.TrimSpace(params.TableName)
			if tableName == "" {
				return nil, fmt.Errorf("表名称不能为空")
			}

			// Build the column query. Both variants take $1 = schema and
			// $2 = table name; the first additionally joins pg_description
			// to expose column comments.
			var query string
			if params.IncludeComments {
				// classoid restricts the match to pg_class entries: objoid
				// values are only unique within a single system catalog.
				query = `
				SELECT
					c.ordinal_position as column_id,
					c.column_name,
					c.data_type,
					c.is_nullable,
					c.column_default,
					pgd.description as column_comment,
					c.character_maximum_length,
					c.numeric_precision,
					c.numeric_scale,
					c.udt_name as user_defined_type,
					CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END as is_primary_key
				FROM information_schema.columns c
				LEFT JOIN pg_catalog.pg_description pgd
					ON pgd.objsubid = c.ordinal_position
					AND pgd.classoid = 'pg_catalog.pg_class'::regclass
					AND pgd.objoid = (
						SELECT oid FROM pg_catalog.pg_class
						WHERE relname = $2
						AND relnamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = $1)
					)
				LEFT JOIN (
					SELECT ku.column_name, tc.table_schema, tc.table_name
					FROM information_schema.table_constraints tc
					JOIN information_schema.key_column_usage ku
						ON tc.constraint_name = ku.constraint_name
						AND tc.table_schema = ku.table_schema
					WHERE tc.constraint_type = 'PRIMARY KEY'
				) pk ON c.table_schema = pk.table_schema
					AND c.table_name = pk.table_name
					AND c.column_name = pk.column_name
				WHERE c.table_schema = $1 AND c.table_name = $2
				ORDER BY c.ordinal_position`
			} else {
				query = `
				SELECT
					c.ordinal_position as column_id,
					c.column_name,
					c.data_type,
					c.is_nullable,
					c.column_default,
					c.character_maximum_length,
					c.numeric_precision,
					c.numeric_scale,
					c.udt_name as user_defined_type,
					CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END as is_primary_key
				FROM information_schema.columns c
				LEFT JOIN (
					SELECT ku.column_name, tc.table_schema, tc.table_name
					FROM information_schema.table_constraints tc
					JOIN information_schema.key_column_usage ku
						ON tc.constraint_name = ku.constraint_name
						AND tc.table_schema = ku.table_schema
					WHERE tc.constraint_type = 'PRIMARY KEY'
				) pk ON c.table_schema = pk.table_schema
					AND c.table_name = pk.table_name
					AND c.column_name = pk.column_name
				WHERE c.table_schema = $1 AND c.table_name = $2
				ORDER BY c.ordinal_position`
			}

			// Run the column query.
			results, err := dbFactory.QuerySliceMapWithParams(query, schema, tableName)
			if err != nil {
				return nil, fmt.Errorf("查询表字段信息失败: %w", err)
			}

			if len(results) == 0 {
				// Best-effort probe to tell "no such table" apart from "no
				// column rows". If the probe itself fails, or the count does
				// not come back as int64, the generic message below is used.
				tableExistsQuery := `SELECT COUNT(*) as table_count
					FROM information_schema.tables
					WHERE table_schema = $1 AND table_name = $2`
				checkRows, checkErr := dbFactory.QuerySliceMapWithParams(tableExistsQuery, schema, tableName)
				if checkErr == nil && len(checkRows) > 0 {
					if count, ok := checkRows[0]["table_count"].(int64); ok && count == 0 {
						return nil, fmt.Errorf("表 '%s' 不存在于模式 '%s' 中", tableName, schema)
					}
				}
				return nil, fmt.Errorf("表 '%s' 存在但没有字段信息或表为空", tableName)
			}

			// Normalise each column row in place.
			for _, row := range results {
				// information_schema reports nullability as "YES"/"NO";
				// expose it as a boolean.
				if nullable, ok := row["is_nullable"].(string); ok {
					row["is_nullable"] = nullable == "YES"
				}

				// Replace a NULL comment with an empty string.
				if params.IncludeComments {
					if comment, ok := row["column_comment"]; ok && comment == nil {
						row["column_comment"] = ""
					}
				}

				// Replace a NULL default with an empty string.
				if defaultValue, ok := row["column_default"]; ok && defaultValue == nil {
					row["column_default"] = ""
				}

				// Derive a display type: a character length takes precedence,
				// then precision with scale, then precision alone. Values
				// that are not int64 are ignored.
				if dataType, ok := row["data_type"].(string); ok {
					fullType := dataType
					length, hasLength := row["character_maximum_length"].(int64)
					precision, hasPrecision := row["numeric_precision"].(int64)
					scale, hasScale := row["numeric_scale"].(int64)
					switch {
					case hasLength && length > 0:
						fullType = fmt.Sprintf("%s(%d)", dataType, length)
					case hasPrecision && precision > 0 && hasScale && scale > 0:
						fullType = fmt.Sprintf("%s(%d,%d)", dataType, precision, scale)
					case hasPrecision && precision > 0:
						fullType = fmt.Sprintf("%s(%d)", dataType, precision)
					}
					row["full_data_type"] = fullType
				}
			}

			// Fetch the table comment. This lookup is best-effort: on error,
			// or when the comment is NULL, the comment stays empty and the
			// column data is still returned.
			tableCommentQuery := `
				SELECT obj_description(pgc.oid, 'pg_class') as table_comment
				FROM pg_catalog.pg_class pgc
				JOIN pg_catalog.pg_namespace pgn ON pgn.oid = pgc.relnamespace
				WHERE pgc.relname = $2 AND pgn.nspname = $1`
			commentResults, err := dbFactory.QuerySliceMapWithParams(tableCommentQuery, schema, tableName)
			tableComment := ""
			if err == nil && len(commentResults) > 0 {
				if comment, ok := commentResults[0]["table_comment"].(string); ok {
					tableComment = comment
				}
			}

			return map[string]interface{}{
				"tenant_id":        deps.ReqCtx.TenantID,
				"user_id":          deps.ReqCtx.UserID,
				"database_type":    dbType,
				"database_name":    dbFactory.GetDatabaseName(),
				"schema":           schema,
				"table_name":       tableName,
				"table_comment":    tableComment,
				"include_comments": params.IncludeComments,
				"columns":          results,
				"total_columns":    len(results),
				"timestamp":        time.Now().Format(time.RFC3339),
			}, nil
		},
	)
}