PHP 7.0 使用的 mongodb 扩展与 PHP 5.6 时代的旧 mongo 扩展完全不兼容，这里对 mongodb 扩展的常用操作做一个简单的总结。
废话不多说，直接上代码：
/**
 * Create a MongoDB driver manager from a configuration array.
 *
 * Note: per the ext-mongodb documentation the Manager connects lazily, so a
 * successful return does not prove that the server is reachable.
 *
 * @param array $conf_arr Connection settings. Keys read: host, port, db_name,
 *                        username, password, read_preference,
 *                        connect_timeout_ms, socket_timeout_ms.
 *
 * @return \MongoDB\Driver\Manager|false The manager, or false when its
 *                                       construction throws.
 */
public function connect($conf_arr)
{
    try {
        $conn_str = 'mongodb://' . $conf_arr['host'] . ':' . $conf_arr['port'] . '/' . $conf_arr['db_name'];

        $options = [
            'username'         => $conf_arr['username'],
            'password'         => $conf_arr['password'],
            'readPreference'   => $conf_arr['read_preference'],
            'connectTimeoutMS' => intval($conf_arr['connect_timeout_ms']),
            'socketTimeoutMS'  => intval($conf_arr['socket_timeout_ms']),
        ];

        // Fully-qualified name: the namespace separators are required,
        // "MongoDBDriverManager" is not an existing class.
        return new \MongoDB\Driver\Manager($conn_str, $options);
    } catch (\Exception $e) {
        error_log('MongoDB connect failed: ' . $e->getMessage());

        return false;
    }
}
/**
 * Run a find query and return the matching documents as arrays.
 *
 * @param array  $query      Query filter.
 * @param array  $fields     List of field names to include in the projection;
 *                           empty means no projection is sent.
 * @param string $collection Namespace passed to executeQuery()
 *                           (the driver expects "database.collection").
 * @param array  $sort       Sort specification; omitted when empty.
 * @param int    $limit      Maximum number of documents; omitted when empty.
 * @param int    $skip       Number of documents to skip; omitted when empty.
 *
 * @return array|false List of documents (each cast to array), or false when
 *                     the collection is empty, the connection is unavailable
 *                     or the query throws.
 */
public function find($query = [], $fields = [], $collection = '', $sort = [], $limit = 0, $skip = 0)
{
    // Same guard as insert()/delete(); the default also removes the
    // "required parameter after optional parameter" declaration.
    if ('' === (string) $collection) {
        return false;
    }

    // NOTE(review): connect() is declared with a required $conf_arr argument
    // but none is passed here - confirm how the configuration is supplied.
    $conn = $this->connect();
    if (empty($conn)) {
        return false;
    }

    try {
        $options = [];

        // The projection depends on $fields (the original tested $query, so a
        // projection was dropped whenever the filter was empty).
        if (!empty($fields)) {
            $options['projection'] = array_fill_keys($fields, 1);
        }
        if (!empty($sort)) {
            $options['sort'] = $sort;
        }
        if (!empty($limit)) {
            $options['limit'] = $limit;
        }
        if (!empty($skip)) {
            $options['skip'] = $skip;
        }

        $mongoQuery     = new \MongoDB\Driver\Query($query, $options);
        $readPreference = new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::RP_SECONDARY);
        $cursor         = $conn->executeQuery($collection, $mongoQuery, $readPreference);

        $data = [];
        foreach ($cursor as $value) {
            $data[] = (array) $value;
        }

        return $data;
    } catch (\Exception $e) {
        error_log('MongoDB find failed: ' . $e->getMessage());
    }

    return false;
}
/**
 * Insert a single document.
 *
 * @param array  $add_arr    Document to insert; must be a non-empty array.
 * @param string $collection Namespace passed to executeBulkWrite()
 *                           (the driver expects "database.collection").
 *
 * @return bool True when the write result reports at least one inserted
 *              document; false on invalid input, connection failure,
 *              an exception, or a zero/unknown inserted count.
 */
public function insert($add_arr, $collection)
{
    if (empty($add_arr) || !is_array($add_arr) || '' == $collection) {
        return false;
    }

    // NOTE(review): connect() is declared with a required $conf_arr argument
    // but none is passed here - confirm how the configuration is supplied.
    $conn = $this->connect();
    if (empty($conn)) {
        return false;
    }

    try {
        $bulk = new \MongoDB\Driver\BulkWrite();
        $bulk->insert($add_arr);

        // Majority write concern with a 6000 ms wtimeout.
        $writeConcern = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 6000);
        $result       = $conn->executeBulkWrite($collection, $bulk, $writeConcern);

        if ($result->getInsertedCount()) {
            return true;
        }
    } catch (\Exception $e) {
        error_log('MongoDB insert failed: ' . $e->getMessage());
    }

    return false;
}
/**
 * Delete the documents matching a filter.
 *
 * @param array  $where_arr  Filter; must be non-empty (an empty filter is
 *                           rejected rather than deleting everything).
 * @param array  $options    Delete options. The legacy "justOne" flag is
 *                           accepted and translated to the driver's "limit"
 *                           option; an explicit "limit" takes precedence.
 *                           Default: delete all matches.
 * @param string $collection Namespace passed to executeBulkWrite()
 *                           (the driver expects "database.collection").
 *
 * @return bool True when the bulk write executed without throwing (regardless
 *              of how many documents were removed), false otherwise.
 */
public function delete($where_arr, $options = [], $collection = '')
{
    if (empty($where_arr) || '' == $collection) {
        return false;
    }

    // BulkWrite::delete() understands "limit", not the old mongo extension's
    // "justOne"; passing justOne unchanged would always delete every match.
    if (!isset($options['limit'])) {
        $options['limit'] = !empty($options['justOne']);
    }
    unset($options['justOne']);

    // NOTE(review): connect() is declared with a required $conf_arr argument
    // but none is passed here - confirm how the configuration is supplied.
    $conn = $this->connect();
    if (empty($conn)) {
        return false;
    }

    try {
        $bulk = new \MongoDB\Driver\BulkWrite();
        $bulk->delete($where_arr, $options);

        // Majority write concern with a 30000 ms wtimeout.
        $writeConcern = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 30000);
        $conn->executeBulkWrite($collection, $bulk, $writeConcern);

        return true;
    } catch (\Exception $e) {
        error_log('MongoDB delete failed: ' . $e->getMessage());
    }

    return false;
}
/**
 * Execute a raw database command.
 *
 * @param array|object $params Command document, passed to the Command constructor.
 * @param string       $dbName Name of the database to run the command on.
 *
 * @return \MongoDB\Driver\Cursor|false The command cursor, or false when the
 *                                      connection is unavailable or the
 *                                      command throws.
 */
public function command($params, $dbName)
{
    // NOTE(review): connect() is declared with a required $conf_arr argument
    // but none is passed here - confirm how the configuration is supplied.
    $conn = $this->connect();
    if (empty($conn)) {
        return false;
    }

    try {
        $cmd = new \MongoDB\Driver\Command($params);

        return $conn->executeCommand($dbName, $cmd);
    } catch (\Exception $e) {
        error_log('MongoDB command failed: ' . $e->getMessage());
    }

    return false;
}
/**
 * Return the distinct values of a field among the matching documents.
 *
 * @param string $key        Field whose distinct values are requested.
 * @param array  $where      Query filter.
 * @param string $collection Collection name, or a "database.collection"
 *                           namespace when $dbName is not given.
 * @param string $dbName     Optional database name. When empty, it is taken
 *                           from the part of $collection before the first dot
 *                           (database names cannot contain dots). A bare
 *                           collection name that itself contains a dot
 *                           therefore requires $dbName to be passed.
 *
 * @return array|false The "values" list of the command reply, or false on failure.
 */
public function distinct($key, $where, $collection, $dbName = '')
{
    // command() requires a database name; the original call omitted it.
    if ('' === (string) $dbName && false !== strpos($collection, '.')) {
        list($dbName, $collection) = explode('.', $collection, 2);
    }
    if ('' === (string) $dbName) {
        return false;
    }

    try {
        $cmd = [
            'distinct' => $collection,
            'key'      => $key,
            'query'    => $where,
        ];

        $res = $this->command($cmd, $dbName);
        // command() returns false on failure; calling toArray() on it would be fatal.
        if (!$res) {
            return false;
        }

        $result = $res->toArray();
        if (!isset($result[0]->values)) {
            return false;
        }

        return $result[0]->values;
    } catch (\Exception $e) {
        error_log('MongoDB distinct failed: ' . $e->getMessage());
    }

    return false;
}
/**
 * Count the documents matching a filter via the "count" command.
 *
 * @param array  $query      Query filter.
 * @param string $collection Collection name, or a "database.collection"
 *                           namespace when $dbName is not given.
 * @param string $dbName     Optional database name. When empty, it is taken
 *                           from the part of $collection before the first dot
 *                           (database names cannot contain dots). A bare
 *                           collection name that itself contains a dot
 *                           therefore requires $dbName to be passed.
 *
 * @return int|false The "n" field of the command reply, or false on failure.
 */
public function count($query, $collection, $dbName = '')
{
    // command() requires a database name; the original call omitted it.
    if ('' === (string) $dbName && false !== strpos($collection, '.')) {
        list($dbName, $collection) = explode('.', $collection, 2);
    }
    if ('' === (string) $dbName) {
        return false;
    }

    try {
        $cmd = [
            'count' => $collection,
            'query' => $query,
        ];

        $res = $this->command($cmd, $dbName);
        // command() returns false on failure; calling toArray() on it would be fatal.
        if (!$res) {
            return false;
        }

        $result = $res->toArray();
        if (!isset($result[0]->n)) {
            return false;
        }

        return $result[0]->n;
    } catch (\Exception $e) {
        error_log('MongoDB count failed: ' . $e->getMessage());
    }

    return false;
}
/**
 * Run a two-stage ($match, $group) aggregation and return the "total" field
 * of the first result document.
 *
 * @param array  $where      Filter used for the $match stage.
 * @param array  $group      Specification for the $group stage; it has to
 *                           produce a field named "total" for this method to
 *                           return a value.
 * @param string $collection Collection name, or a "database.collection"
 *                           namespace when $dbName is not given.
 * @param string $dbName     Optional database name. When empty, it is taken
 *                           from the part of $collection before the first dot
 *                           (database names cannot contain dots). A bare
 *                           collection name that itself contains a dot
 *                           therefore requires $dbName to be passed.
 *
 * @return mixed|false|null The "total" value of the first group; null when
 *                          the pipeline yields no document or no "total"
 *                          field; false on failure.
 */
public function aggregate($where, $group, $collection, $dbName = '')
{
    // command() requires a database name; the original call omitted it.
    if ('' === (string) $dbName && false !== strpos($collection, '.')) {
        list($dbName, $collection) = explode('.', $collection, 2);
    }
    if ('' === (string) $dbName) {
        return false;
    }

    try {
        $cmd = [
            'aggregate' => $collection,
            'pipeline'  => [
                ['$match' => $where],
                ['$group' => $group],
            ],
            'explain'   => false,
            // MongoDB 3.6+ rejects a non-explain aggregate command without
            // the cursor option; an empty document selects the default batch size.
            'cursor'    => new \stdClass(),
        ];

        $res = $this->command($cmd, $dbName);
        if (!$res) {
            return false;
        }

        $result = $res->toArray();

        // No matching group: avoid reading a property of a missing element.
        return isset($result[0]->total) ? $result[0]->total : null;
    } catch (\Exception $e) {
        error_log('MongoDB aggregate failed: ' . $e->getMessage());
    }

    return false;
}