PHP的并发性问题,多个同时请求;互斥锁?
PHP的并发性问题,多个同时请求;互斥锁?
所以我意识到PHP可能同时运行多个请求。昨晚的日志似乎显示有两个请求同时进来并且在并行处理中;每个请求触发了从另一个服务器导入数据;每个请求都尝试向数据库插入记录。当其中一个线程插入了刚刚插入另一个线程的记录时,一个请求在尝试插入记录时失败了 (导入的数据带有PKs;我不使用增加ID):SQLSTATE[23000]: Integrity constraint violation: 1062 Duplicate entry \'865020\' for key \'PRIMARY\' ...
。
- 我对这个问题做出了正确的诊断吗?
- 我应该如何处理此问题?
以下是部分代码。我已剥离了大部分代码(日志记录、从数据创建其他实体之类的),但以下内容应该包括相关片段。请求打到import()方法,该方法为每个要导入的记录调用importOne()。请注意importOne()中的save方法,这是一个Eloquent方法(使用Laravel和Eloquent),它将生成适当的SQL来插入/更新记录。
public function import() { $now = Carbon::now(); // Get data from the other server in the time range from last import to current import $calls = $this->getCalls($this->getLastImport(), $now); // For each call to import, insert it into the DB (or update if it already exists) foreach ($calls as $call) { $this->importOne($call); } // Update the last import time to now so that the next import uses the correct range $this->setLastImport($now); } private function importOne($call) { // Get the existing patient for the call, or create a new one $patient = Patient::where('id', '=', $call['PatientID'])->first(); $isNewPatient = $patient === null; if ($isNewPatient) { $patient = new Patient(array('id' => $call['PatientID'])); } // Set the fields $patient->given_name = $call['PatientGivenName']; $patient->family_name = $call['PatientFamilyName']; // Save; will insert/update appropriately $patient->save(); }
我猜解决方案需要在整个导入块周围加上互斥锁吗?如果请求无法获得互斥锁,它只会继续执行其余的请求。你有什么想法?
编辑:只是想提醒一下,这不是一个致命的失败。异常被捕获和记录,然后请求按照平常一样响应。并且对于另一个请求,导入成功,然后响应了请求。用户对此毫不知情;他们甚至不知道导入,这也不是请求进来的主要重点。因此,我可以将其保持原样运行,除了偶尔的异常之外,不会发生任何不良情况。但如果有修复可以防止向该服务器发送额外的工作/不必要的多个请求,那可能值得追求。
编辑2:好吧,我试着用flock()实现了一个锁定机制,你们有何想法?以下代码是否可行?如何对这个添加进行单元测试?
public function import() { try { $fp = fopen('/tmp/lock.txt', 'w+'); if (flock($fp, LOCK_EX)) { $now = Carbon::now(); $calls = $this->getCalls($this->getLastImport(), $now); foreach ($calls as $call) { $this->importOne($call); } $this->setLastImport($now); flock($fp, LOCK_UN); // Log success. } else { // Could not acquire file lock. Log this. } fclose($fp); } catch (Exception $ex) { // Log failure. } }
编辑3:对于锁的替代实现,你们有何想法?
public function import() { try { if ($this->lock()) { $now = Carbon::now(); $calls = $this->getCalls($this->getLastImport(), $now); foreach ($calls as $call) { $this->importOne($call); } $this->setLastImport($now); $this->unlock(); // Log success } else { // Could not acquire DB lock. Log this. } } catch (Exception $ex) { // Log failure } } /** * Get a DB lock, returns true if successful. * * @return boolean */ public function lock() { return DB::SELECT("SELECT GET_LOCK('lock_name', 1) AS result")[0]->result === 1; } /** * Release a DB lock, returns true if successful. * * @return boolean */ public function unlock() { return DB::select("SELECT RELEASE_LOCK('lock_name') AS result")[0]->result === 1; }
看起来你不是遇到竞态条件,因为ID是来自导入文件的,如果你的导入算法正常工作,那么每个线程应该有自己的工作内容,并且不应该冲突。现在似乎2个线程都接收到了创建同一个患者的请求,并且由于算法有问题而发生了冲突。
确保每个生成的线程从导入文件中获取一行新数据,并且只在失败时重复执行。
如果你无法做到这一点,想要坚持使用互斥锁,使用文件锁似乎不是一个很好的解决方案,因为现在你在应用程序中解决了冲突,而实际上冲突发生在数据库中。使用数据库锁应该更快,也是一个更合理的解决方案。
请求数据库锁,像这样:
$db -> exec('LOCK TABLES table1
WRITE, table2
WRITE');
你可以期望在你写入被锁定的表时收到SQL错误,因此用try catch包围你的Patient->save()。
甚至更好的解决方案是使用条件原子查询。一个同时具有条件的DB查询。你可以使用如下查询:
INSERT INTO targetTable(field1) SELECT field1 FROM myTable WHERE NOT(field1 IN (SELECT field1 FROM targetTable))
你的示例代码会阻塞第二个请求,直到第一个请求完成。您需要为flock()
使用LOCK_NB
选项,以便立即返回错误而不是等待。
是的,您可以在文件系统级别或直接在数据库中使用锁定或信号量。
在您的情况下,当您只需要处理每个导入文件一次时,最好的解决方案是拥有一个SQL表,每个导入文件有一行。在导入开始时,插入导入正在进行的信息,这样其他线程就会知道不再处理它。导入完成后,将其标记为已完成。(然后几个小时后,您可以检查表以查看导入是否真的完成。)
此外,最好将这种一次性的长时间事物(例如导入)作为单独的脚本处理,而不是在为访问者提供正常网页服务时处理。例如,您可以安排每晚执行的cron作业,该作业将接收导入文件并进行处理。