Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions src/db/src/Connection/Connection.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?php declare(strict_types=1);
<?php declare (strict_types = 1);

namespace Swoft\Db\Connection;

Expand All @@ -20,10 +20,9 @@
use Swoft\Db\Query\Expression;
use Swoft\Db\Query\Grammar\Grammar;
use Swoft\Db\Query\Processor\Processor;
use Swoft\Stdlib\Helper\StringHelper;
use Swoft\Log\Helper\CLog;
use Swoft\Stdlib\Helper\StringHelper;
use Throwable;
use function bean;

/**
* Class Connection
Expand Down Expand Up @@ -122,7 +121,7 @@ class Connection extends AbstractConnection implements ConnectionInterface
*/
public function initialize(Pool $pool, Database $database): void
{
$this->pool = $pool;
$this->pool = $pool;
$this->database = $database;
$this->lastTime = time();

Expand Down Expand Up @@ -215,7 +214,7 @@ public function reconnect(): bool
{
try {
switch ($this->pdoType) {
case self::TYPE_WRITE;
case self::TYPE_WRITE;
$this->createPdo();
break;
case self::TYPE_READ;
Expand Down Expand Up @@ -533,12 +532,13 @@ public function statement(string $query, array $bindings = []): bool
public function insertGetIdStatement(string $query, array $bindings = [], string $sequence = null): string
{
return $this->run($query, $bindings, function ($query, $bindings) use ($sequence) {
$statement = $this->getPdo()->prepare($query);
$pdo = $this->getPdo();
$statement = $pdo->prepare($query);

$this->bindValues($statement, $this->prepareBindings($bindings));
$statement->execute();

return $this->getPdo()->lastInsertId($sequence);
return $pdo->lastInsertId($sequence);
});
}

Expand Down Expand Up @@ -580,7 +580,7 @@ public function affectingStatement(string $query, array $bindings = []): int
*/
public function unprepared(string $query): bool
{
return (bool)$this->run($query, [], function ($query) {
return (bool) $this->run($query, [], function ($query) {

return $this->getPdo()->exec($query);
});
Expand All @@ -604,7 +604,7 @@ public function prepareBindings(array $bindings): array
if ($value instanceof DateTimeInterface) {
$bindings[$key] = $value->format($grammar->getDateFormat());
} elseif (is_bool($value)) {
$bindings[$key] = (int)$value;
$bindings[$key] = (int) $value;
}
}

Expand Down Expand Up @@ -680,7 +680,7 @@ protected function runQueryCallback(string $query, array $bindings, Closure $cal
$this->releaseOrRemove();

// Throw exception
throw new DbException($e->getMessage(), (int)$e->getCode(), $e);
throw new DbException($e->getMessage(), (int) $e->getCode(), $e);
}

// If an exception occurs when attempting to run a query, we'll format the error
Expand All @@ -698,7 +698,7 @@ protected function runQueryCallback(string $query, array $bindings, Closure $cal
CLog::error('Fail err=<error>%s</error> sql=%s', $e->getMessage(), $rawSql);

// Throw exception
throw new DbException($e->getMessage(), (int)$e->getCode(), $e);
throw new DbException($e->getMessage(), (int) $e->getCode(), $e);
}

$this->pdoType = self::TYPE_DEFAULT;
Expand Down Expand Up @@ -732,7 +732,7 @@ public function getRawSql(string $sql, array $bindings): string
} elseif ($value === null) {
$param = 'NULL';
} else {
$param = (string)$value;
$param = (string) $value;
}

$sql = StringHelper::replaceFirst($name, $param, $sql);
Expand All @@ -741,7 +741,6 @@ public function getRawSql(string $sql, array $bindings): string
return $sql;
}


/**
* Whether to reconnect
*
Expand Down Expand Up @@ -774,7 +773,7 @@ protected function reconnectIfMissingConnection(): void
protected function prepared(PDOStatement $statement): PDOStatement
{
if (!$this->fetchMode) {
$config = $this->database->getConfig();
$config = $this->database->getConfig();
$this->fetchMode = $config['fetchMode'] ?? self::DEFAULT_FETCH_MODE;
}

Expand Down Expand Up @@ -932,6 +931,12 @@ protected function getPdoForSelect($useReadPdo = true)
public function getPdo(): PDO
{
$this->pdoType = self::TYPE_WRITE;

// check pdo connection is alive or not
if (false === $this->pingPdo($this->pdo)) {
$this->reconnect();
}

return $this->pdo;
}

Expand All @@ -952,9 +957,37 @@ public function getReadPdo(): PDO
}

$this->pdoType = self::TYPE_READ;

// check pdo connection is alive or not
if (false === $this->pingPdo($this->readPdo)) {
$this->reconnect();
}

return $this->readPdo;
}

/**
* Check PDO connection is alive or not
*
* @param PDO $pdo
* @return boolean
*/
private function pingPdo(PDO $pdo): bool
{
try {
if (!is_object($pdo) || !method_exists($pdo, 'query')) {
return false;
}

// do ping
$pdo->query('do 1');
} catch (\Throwable $th) {
return false;
}

return true;
}

/**
* Bind values to their parameters in the given statement.
*
Expand Down Expand Up @@ -1063,7 +1096,6 @@ protected function getConMananger(): ConnectionManager
return BeanFactory::getBean(ConnectionManager::class);
}


/**
* Create pdo
*
Expand All @@ -1072,7 +1104,7 @@ protected function getConMananger(): ConnectionManager
private function createPdo()
{
$writes = $this->database->getWrites();
$write = $writes[array_rand($writes)];
$write = $writes[array_rand($writes)];

$dsn = $write['dsn'];
$this->parseDbName($dsn);
Expand Down Expand Up @@ -1128,7 +1160,7 @@ private function parseDbName(string $dns): void
private function selectDb(PDO $pdo, string $dbname): void
{
$useStmt = sprintf('use %s', $dbname);
$result = $pdo->exec($useStmt);
$result = $pdo->exec($useStmt);
if ($result !== false) {
return;
}
Expand Down