PHP的并发性问题,多个同时请求;互斥锁?

6 浏览
0 Comments

PHP的并发性问题,多个同时请求;互斥锁?

所以我意识到PHP可能同时运行多个请求。昨晚的日志似乎显示有两个请求同时进来并且在并行处理中;每个请求触发了从另一个服务器导入数据;每个请求都尝试向数据库插入记录。当其中一个线程插入了刚刚插入另一个线程的记录时,一个请求在尝试插入记录时失败了 (导入的数据带有PKs;我不使用增加ID):SQLSTATE[23000]: Integrity constraint violation: 1062 Duplicate entry \'865020\' for key \'PRIMARY\' ...

  1. 我对这个问题做出了正确的诊断吗?
  2. 我应该如何处理此问题?

以下是部分代码。我已剥离了大部分代码(日志记录、从数据创建其他实体之类的),但以下内容应该包括相关片段。请求打到import()方法,该方法为每个要导入的记录调用importOne()。请注意importOne()中的save方法,这是一个Eloquent方法(使用Laravel和Eloquent),它将生成适当的SQL来插入/更新记录。

public function import()
{
        $now = Carbon::now();
        // Get data from the other server in the time range from last import to current import
        $calls = $this->getCalls($this->getLastImport(), $now);
        // For each call to import, insert it into the DB (or update if it already exists)
        foreach ($calls as $call) {
            $this->importOne($call);
        }
        // Update the last import time to now so that the next import uses the correct range
        $this->setLastImport($now);
}
private function importOne($call)
{
    // Get the existing patient for the call, or create a new one
    $patient = Patient::where('id', '=', $call['PatientID'])->first();
    $isNewPatient = $patient === null;
    if ($isNewPatient) {
        $patient = new Patient(array('id' => $call['PatientID']));
    }
    // Set the fields
    $patient->given_name = $call['PatientGivenName'];
    $patient->family_name = $call['PatientFamilyName'];
    // Save; will insert/update appropriately
    $patient->save();
}

我猜解决方案需要在整个导入块周围加上互斥锁吗?如果请求无法获得互斥锁,它只会继续执行其余的请求。你有什么想法?

编辑:只是想提醒一下,这不是一个致命的失败。异常被捕获和记录,然后请求按照平常一样响应。并且对于另一个请求,导入成功,然后响应了请求。用户对此毫不知情;他们甚至不知道导入,这也不是请求进来的主要重点。因此,我可以将其保持原样运行,除了偶尔的异常之外,不会发生任何不良情况。但如果有修复可以防止向该服务器发送额外的工作/不必要的多个请求,那可能值得追求。

编辑2:好吧,我试着用flock()实现了一个锁定机制,你们有何想法?以下代码是否可行?如何对这个添加进行单元测试?

public function import()
{
    try {
        $fp = fopen('/tmp/lock.txt', 'w+');
        if (flock($fp, LOCK_EX)) {
            $now = Carbon::now();
            $calls = $this->getCalls($this->getLastImport(), $now);
            foreach ($calls as $call) {
                $this->importOne($call);
            }
            $this->setLastImport($now);
            flock($fp, LOCK_UN);
            // Log success.
        } else {
            // Could not acquire file lock. Log this.
        }
        fclose($fp);
    } catch (Exception $ex) {
        // Log failure.
    }
}

编辑3:对于锁的替代实现,你们有何想法?

public function import()
{
    try {
        if ($this->lock()) {
            $now = Carbon::now();
            $calls = $this->getCalls($this->getLastImport(), $now);
            foreach ($calls as $call) {
                $this->importOne($call);
            }
            $this->setLastImport($now);
            $this->unlock();
            // Log success
        } else {
            // Could not acquire DB lock. Log this.
        }
    } catch (Exception $ex) {
        // Log failure
    }
}
/**
 * Get a DB lock, returns true if successful.
 *
 * @return boolean
 */
public function lock()
{
    return DB::SELECT("SELECT GET_LOCK('lock_name', 1) AS result")[0]->result === 1;
}
/**
 * Release a DB lock, returns true if successful.
 *
 * @return boolean
 */
public function unlock()
{
    return DB::select("SELECT RELEASE_LOCK('lock_name') AS result")[0]->result === 1;
}

admin 更改状态以发布 2023年5月20日
0
0 Comments

看起来你不是遇到竞态条件,因为ID是来自导入文件的,如果你的导入算法正常工作,那么每个线程应该有自己的工作内容,并且不应该冲突。现在似乎2个线程都接收到了创建同一个患者的请求,并且由于算法有问题而发生了冲突。

conflictfree

确保每个生成的线程从导入文件中获取一行新数据,并且只在失败时重复执行。

如果你无法做到这一点,想要坚持使用互斥锁,使用文件锁似乎不是一个很好的解决方案,因为现在你在应用程序中解决了冲突,而实际上冲突发生在数据库中。使用数据库锁应该更快,也是一个更合理的解决方案。

请求数据库锁,像这样:

$db -> exec('LOCK TABLES table1 WRITE, table2 WRITE');

你可以期望在你写入被锁定的表时收到SQL错误,因此用try catch包围你的Patient->save()。

甚至更好的解决方案是使用条件原子查询。一个同时具有条件的DB查询。你可以使用如下查询:

INSERT INTO targetTable(field1) 
SELECT field1
FROM myTable
WHERE NOT(field1 IN (SELECT field1 FROM targetTable))

0
0 Comments

你的示例代码会阻塞第二个请求,直到第一个请求完成。您需要为flock()使用LOCK_NB选项,以便立即返回错误而不是等待。

是的,您可以在文件系统级别或直接在数据库中使用锁定或信号量。

在您的情况下,当您只需要处理每个导入文件一次时,最好的解决方案是拥有一个SQL表,每个导入文件有一行。在导入开始时,插入导入正在进行的信息,这样其他线程就会知道不再处理它。导入完成后,将其标记为已完成。(然后几个小时后,您可以检查表以查看导入是否真的完成。)

此外,最好将这种一次性的长时间事物(例如导入)作为单独的脚本处理,而不是在为访问者提供正常网页服务时处理。例如,您可以安排每晚执行的cron作业,该作业将接收导入文件并进行处理。

0