Consistency in PostgreSQL with locking and SELECT FOR UPDATE

Date: 2022-09-19 06:52:18

I have an application that can support a certain number of concurrent actions. This is represented by a table of "slots" in PostgreSQL. When nodes come online, they insert a number of rows into the table, one per slot. When a job claims a slot, it updates one row in the table to mark that slot as taken, and it releases the slot again when it finishes.

The slots table looks like this:

CREATE TABLE slots (
    id INT8 PRIMARY KEY DEFAULT nextval('slots_seq'),
    node_name TEXT NOT NULL,
    job_name TEXT
);

At any time it has a semi-fixed number of rows, each of which may or may not have job_name filled in.

When a new job wants to start up, it runs these queries to get the name of the node it should run on:

BEGIN;
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    LIMIT 1
    FOR UPDATE;

(the node_name and id are read out of the cursor)

UPDATE slots
    SET job_name = %(job_name)s
    WHERE id = %(slot_id)s;
COMMIT;

This is often able to claim rows without losing any updates, but at higher levels of concurrency only a few rows end up claimed even though many SELECT ... FOR UPDATE and UPDATE queries have been executed. The net result is that far more jobs are running than there are slots for them.

Am I making a locking error? Is there a better way to go about this? Something that doesn't use table locks?

The SERIALIZABLE transaction isolation level does not cut it either; only a handful of rows are ever filled.

I'm using PostgreSQL version 8.4.

3 solutions

#1


2  

Well, I wrote a program in Perl to simulate what was going on, since I didn't think that what you were describing was possible. Indeed, after running my simulation I didn't have any problems, even when I turned the table lock off (since SELECT ... FOR UPDATE and UPDATE should do the necessary row locking).

I ran this on PG 8.3 and PG 9.0 and it worked fine on both versions.

I urge you to try the program, and/or write a Python version, so that you have a nice tight test case which you can share with everyone here. If it works, you can investigate how it differs from your application; if it doesn't, you have something that other people can experiment with.

#!/usr/bin/perl
use DBI;
$numchild = 0;
$SIG{CHLD} = sub { if (wait) {$numchild--;} };

sub worker($)
{
  my ($i) = @_;
  my ($job);

  my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});

  my ($x) = 0;
  while(++$x)
  {
#    $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
    my @id = $dbh->selectrow_array("select id from slots where job_name is NULL LIMIT 1 FOR UPDATE;");

    if ($#id < 0)
    {
      $dbh->rollback;
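      select(undef, undef, undef, 0.5);  # built-in sleep() truncates 0.5 to 0 seconds, so use select for a sub-second pause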
      next;
    }
    $job = "$$-$i-($x)";
    $dbh->do("update slots set job_name='$job' where id=$id[0];") || die "Cannot update at $i\n";
    $dbh->commit || die "Cannot commit\n";
    last;
  }
  if (!$job)
  {
    print STDERR "Could not find slots in 5 attempts for $i $$\n" if ($ENV{'verbose'});
    return;
  }
  else
  {
    print STDERR "Got $job\n" if ($ENV{'verbose'} > 1);
  }
  sleep(rand(5));

#  $dbh->do("lock table slots in access exclusive mode;") || die "Cannot lock at $i\n";
  $dbh->do("update slots set usage=usage+1, job_name = NULL where job_name='$job';") || die "Cannot unlock $job";
  print STDERR "Unlocked $job\n" if ($ENV{'verbose'} > 2);
  $dbh->commit || die "Cannot commit";
}

my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});

$dbh->do("drop table slots;");
$dbh->commit;
$dbh->do("create table slots (id serial primary key, job_name text, usage int);") || die "Cannot create\n";
$dbh->do("insert into slots values (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0), (DEFAULT,NULL,0);") || die "Cannot insert";
$dbh->commit;

for(my $i=0;$i<200;$i++)
{
  if (!fork)
  {
    worker($i);
    exit(0);
  }

  if (++$numchild > 50)
  {
    sleep(1);
  }
}
while (wait > 0)
{
  $numchild--;
  print "Waiting numchild $numchild\n";
  sleep(1);
}
my $dbh = DBI->connect("dbi:Pg:host=localhost",undef,undef,{'RaiseError'=>0, 'AutoCommit'=>0});
my $slots = $dbh->selectall_arrayref("select * from slots;") || die "Cannot do final select";
my $sum=0;
foreach my $slot (@$slots)
{
  printf("%02d %3d %s\n",$slot->[0], $slot->[2], $slot->[1]);
  $sum += $slot->[2];
}
print "Successfully made $sum entries\n";
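
Since a Python version was suggested, here is a rough port of just the claim and release steps from the worker above. It is only a sketch under a few assumptions: `conn` is any DB-API connection whose driver accepts `%(name)s` placeholders (as the question's queries suggest), the table is the simplified one created by the script (id, job_name, usage), and the function names, `attempts` and `delay` are made up for illustration. Unlike the Perl loop, it gives up after a fixed number of attempts.

```python
import time


def claim_with_select_for_update(conn, job_name, attempts=5, delay=0.5):
    """Claim a free slot using SELECT ... FOR UPDATE followed by UPDATE.

    Returns the claimed slot id, or None if no slot was free in `attempts` tries.
    """
    for _ in range(attempts):
        cur = conn.cursor()
        try:
            cur.execute(
                "SELECT id FROM slots WHERE job_name IS NULL LIMIT 1 FOR UPDATE"
            )
            row = cur.fetchone()
            if row is None:
                # Nothing free (or the row we waited on was taken): end the
                # transaction so the next attempt sees fresh data.
                conn.rollback()
            else:
                cur.execute(
                    "UPDATE slots SET job_name = %(job_name)s WHERE id = %(slot_id)s",
                    {"job_name": job_name, "slot_id": row[0]},
                )
                conn.commit()
                return row[0]
        finally:
            cur.close()
        time.sleep(delay)
    return None


def release_slot(conn, job_name):
    """Free the slot held by `job_name` and bump its usage counter."""
    cur = conn.cursor()
    try:
        cur.execute(
            "UPDATE slots SET usage = usage + 1, job_name = NULL "
            "WHERE job_name = %(job_name)s",
            {"job_name": job_name},
        )
        conn.commit()
    finally:
        cur.close()
```

Forking a couple of hundred processes that each open their own connection, call these two functions with a random pause in between, and then summing the usage column would give the same kind of check as the Perl script.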

#2


2  

BEGIN; 
LOCK TABLE slots IN ACCESS EXCLUSIVE MODE; 
UPDATE slots SET job_name = '111' WHERE id IN (SELECT id FROM slots WHERE job_name IS NULL LIMIT 1) RETURNING *;
COMMIT;

This seems to work in Read Committed. It is plain SQL (same as your code) and can be executed in a single call, which is faster.

@Seth Robertson: It is not safe without LOCK TABLE and without a while loop.

Suppose transaction A and transaction B run at the same time: A selects the first row and B selects the same first row. A locks and updates the row, and B has to wait until A commits. Then B rechecks the condition job_name IS NULL. It is now false, so B does not update anything. B does not move on to the next free row; it only rechecks the row it already chose and returns an empty result.
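
To illustrate why the while loop matters, here is a minimal Python sketch of a single-statement claim wrapped in a retry. The assumptions: `conn` is a DB-API connection whose driver accepts `%(name)s` placeholders, the table lock is left out, and the outer `AND job_name IS NULL` is my own defensive addition so that the recheck after a lock wait looks at job_name on the updated row. `claim_slot`, `attempts` and `delay` are hypothetical names, not anything from the question.

```python
import time

# One statement: pick a free slot and mark it, returning what was claimed.
CLAIM_SQL = """
UPDATE slots
   SET job_name = %(job_name)s
 WHERE id IN (SELECT id FROM slots WHERE job_name IS NULL LIMIT 1)
   AND job_name IS NULL
RETURNING id, node_name
"""


def claim_slot(conn, job_name, attempts=5, delay=0.5):
    """Return (id, node_name) of the claimed slot, or None if every attempt came back empty."""
    for _ in range(attempts):
        cur = conn.cursor()
        try:
            cur.execute(CLAIM_SQL, {"job_name": job_name})
            row = cur.fetchone()  # None when the chosen row was taken by someone else
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()
        if row is not None:
            return row
        # An empty result does not prove the table is full: the row we picked
        # may simply have been claimed while we waited, so try again.
        time.sleep(delay)
    return None
```

An empty result here can mean either "no free slots" or "lost the race for that one row", which is why a caller should retry a few times before concluding that the node is full.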

@joegester: SELECT FOR UPDATE is not the problem, because the whole table is locked.

Maybe there is another way to do the job: deleting and inserting rows (in another table?) instead of setting job_name to NULL. But I am not sure how.

#3


1  

You might want to look into advisory locks.

I haven't tested this, but it might be possible to rewrite your locking query like so:

BEGIN;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    AND pg_try_advisory_lock('slots'::regclass::int, id::int)
    LIMIT 1;

or, since you're using a bigint in the first place (do you really need that many ids?!?), something like:

BEGIN;
SELECT id, node_name
    FROM slots
    WHERE job_name IS NULL
    AND pg_try_advisory_lock(hashtext('slots_' || id))
    LIMIT 1;

Be wary of the gotchas if you do: the advisory lock is held by the session, so it needs to be explicitly unlocked regardless of whether the transaction succeeds or not.

There is also a risk of collision in the case of hashtext(), but that's no big deal for you if you're processing jobs...
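
To make the unlocking gotcha concrete, here is an untested Python sketch of how the first query might be wrapped. Assumptions: `conn` is a DB-API connection using `%(name)s` placeholders, the function name is made up, the `AND job_name IS NULL` guard on the UPDATE is an extra safety check I added, and `pg_advisory_unlock_all()` is used instead of unlocking one key so that any lock picked up on a row the query did not return is released as well.

```python
PICK_SQL = """
SELECT id, node_name
  FROM slots
 WHERE job_name IS NULL
   AND pg_try_advisory_lock('slots'::regclass::int, id::int)
 LIMIT 1
"""

MARK_SQL = """
UPDATE slots
   SET job_name = %(job_name)s
 WHERE id = %(slot_id)s
   AND job_name IS NULL
"""


def claim_slot_advisory(conn, job_name):
    """Return (id, node_name) of the claimed slot, or None if nothing could be claimed."""
    cur = conn.cursor()
    try:
        cur.execute(PICK_SQL)
        row = cur.fetchone()
        if row is None:
            conn.rollback()
            return None
        cur.execute(MARK_SQL, {"job_name": job_name, "slot_id": row[0]})
        claimed = cur.rowcount == 1  # 0 if the slot was taken in the meantime
        conn.commit()
        return row if claimed else None
    except Exception:
        conn.rollback()
        raise
    finally:
        # Session-level advisory locks survive COMMIT and ROLLBACK, so release
        # them explicitly on every path before handing the connection back.
        cur.execute("SELECT pg_advisory_unlock_all()")
        conn.commit()
        cur.close()
```

The point of the `finally` block is that the unlock runs whether the claim succeeded, found nothing, or raised; once job_name is committed, the row itself records the claim and the advisory lock is no longer needed.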
