81294850 2016-12-05
标签
PostgreSQL , 门禁广告 , 数组 , 范围类型 , 抢购 , 排他约束 , 大盘分析 , 广告查询 , 火车票
背景
抢火车票是很有意思的一个课题,对IT人的智商以及IT系统的健壮性,尤其是数据库的功能和性能都是一种挑战。
我记得很多年前写过一篇PostgreSQL varbit和火车票相关的文章,翻出来看看,发现又和BIT有关。
回顾一下我之前写的
《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统》
《门禁广告销售系统需求剖析 与 PostgreSQL数据库实现》
PostgreSQL的bit功能还是很强大的,阿里云RDS PostgreSQL的bitpack也是用户实际应用中的需求提炼的新功能,大伙一起来给阿里云提需求。
打造属于国人的PostgreSQL.
正文
在PostgreSQL 中可以使用varbit存储比特位, 下面模拟一个简单的应用场景.
马上春节了, 火车票又到了销售旺季, 一票难求依旧.
下面就以火车票销售为例来介绍一下PostgreSQL varbit类型的用法.
测试环境 :
PostgreSQL 9.2.1
测试表 :
列车信息表 :
create table train
(id int primary key, --主键
go_date date, -- 发车日期
train_num name, -- 车次
station text[] -- 途径站点
);
车厢或bucket信息表 :
create table train_bucket
(id int primary key, --主键
tid int references train (id), -- 关联列车ID
bno int, -- 车厢或bucket号
sit_level text, -- 席别
sit_cnt int, -- 该车厢或bucket的座位总数
sit_remain int, -- 剩余座位
sit_bit varbit -- 座位BIT位, 已售座位用1表示, 未售座位用0表示
);
位置信息表 :
create table train_sit
(id serial8 primary key, -- 主键
tid int references train (id), --关联列车ID
tbid int references train_bucket(id), --关联bucket表ID
sit_no int, -- 座位号, 来自train_bucket.sit_bit的位置信息.
station_bit varbit -- 途径站点组成的BIT位信息, 已售站点用1表示, 未售站点用0表示.
);
创建索引 :
create index idx_train_bucket_sit_remain on train_bucket(sit_remain) where sit_remain>0;
create index idx_train_sit_station_bit on train_sit (station_bit) where station_bit<>repeat('1', 13)::varbit;
插入测试数据, 1趟火车, 途径14个站点.
insert into train values (1, '2013-01-20', 'D645', array['上海南','嘉兴','杭州南','诸暨','义乌','金华','衢州','上饶','鹰潭','新余','宜春','萍乡','株洲','长沙']);
插入测试数据, 共计200W个车厢或bucket, 每个车厢98个位置.
insert into train_bucket values (generate_series(1,1000000), 1, generate_series(1,1000000), '一等座', 98, 98, repeat('0',98)::varbit);
insert into train_bucket values (generate_series(1000001,2000000), 1, generate_series(1000001,2000000), '二等座', 98, 98, repeat('0',98)::varbit);
创建取数组中元素位置的函数 :
create or replace function array_pos (a anyarray, b anyelement) returns int as $$
declare
i int;
begin
for i in 1..array_length(a,1) loop
if b=a[i] then
return i;
end if;
i := i+1;
end loop;
return null;
end;
$$ language plpgsql;
创建购票函数 :
create or replace function buy
(
inout i_train_num name,
inout i_fstation text,
inout i_tstation text,
inout i_go_date date,
out o_slevel text,
out o_bucket_no int,
out o_sit_no int,
out o_order_status boolean
)
returns record as $$
declare
curs1 refcursor;
curs2 refcursor;
v_row int;
v_station text[];
v_train_id int;
v_train_bucket_id int;
v_train_sit_id int;
v_from_station_idx int;
v_to_station_idx int;
v_station_len int;
begin
set enable_seqscan=off;
v_row := 0;
o_order_status := false;
select array_length(station,1), station, id, array_pos(station, i_fstation), array_pos(station, i_tstation)
into v_station_len, v_station, v_train_id, v_from_station_idx, v_to_station_idx
from train where train_num=i_train_num and go_date = i_go_date;
if ( found and array_pos(v_station, i_fstation) is not null
and array_pos(v_station, i_tstation) is not null
and array_pos(v_station, i_fstation) < array_pos(v_station, i_tstation)
) then
else
o_order_status := false;
return;
end if;
open curs2 for select tid,tbid,sit_no from train_sit
where (station_bit & bitsetvarbit(repeat('0', v_station_len-1)::varbit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)) = repeat('0', v_station_len-1)::varbit
and station_bit <> repeat('1', v_station_len-1)::varbit
-- and ctid not in (select locked_row from pgrowlocks('train_sit')) -- 耗时约300毫秒, 用它来解决热点锁等待不划算.
limit 1
for update nowait; -- 也可不加nowait, 加了的话如果获取锁失败将返回55P03异常, 需要程序重新提交
loop
fetch curs2 into v_train_id,v_train_bucket_id,o_sit_no;
if found then
update train_sit set station_bit=bitsetvarbit(station_bit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)
where current of curs2;
GET DIAGNOSTICS v_row = ROW_COUNT;
if (v_row = 1) then
select sit_level, bno into o_slevel, o_bucket_no from train_bucket where id=v_train_bucket_id;
close curs2;
o_order_status := true;
return;
end if;
else
close curs2;
exit;
end if;
end loop;
v_row := 0;
open curs1 for select id, tid, strpos(sit_bit::text,'0'), sit_level, bno from train_bucket
where sit_remain>0
-- and ctid not in (select locked_row from pgrowlocks('train_bucket')) -- 耗时约300毫秒, 用它来解决热点锁等待不划算.
limit 1
for update nowait; -- 也可不加nowait, 加了的话如果获取锁失败将返回55P03异常, 需要程序重新提交.
loop
fetch curs1 into v_train_bucket_id, v_train_id, o_sit_no, o_slevel, o_bucket_no;
if found then
update train_bucket set sit_bit = set_bit(sit_bit, strpos(sit_bit::text,'0')-1, 1), sit_remain = sit_remain-1
where current of curs1;
GET DIAGNOSTICS v_row = ROW_COUNT;
if (v_row = 1) then
close curs1;
exit;
end if;
else
close curs1;
exit;
end if;
end loop;
if v_row = 1 then
insert into train_sit(tid,tbid,sit_no,station_bit)
values (
v_train_id,
v_train_bucket_id,
o_sit_no,
bitsetvarbit(repeat('0', v_station_len-1)::varbit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)
);
o_order_status := true;
return;
else
o_order_status := false;
return;
end if;
exception
when others then
o_order_status := false;
return;
end;
$$ language plpgsql;
测试 :
digoal=# select * from buy('D645','杭州南','宜春','2013-01-20');
i_train_num | i_fstation | i_tstation | i_go_date | o_slevel | o_bucket_no | o_sit_no | o_order_status
-------------+------------+------------+------------+----------+-------------+----------+----------------
D645 | 杭州南 | 宜春 | 2013-01-20 | 一等座 | 35356 | 9 | t
(1 row)
压力测试
vi test.sql
select * from buy('D645','上海南','长沙','2013-01-20');
不加nowait测试结果 :
ocz@db-172-16-3-150-> pgbench -M prepared -f ./test.sql -n -r -c 16 -j 8 -T 1200 -U postgres digoal
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 16
number of threads: 8
duration: 1200 s
number of transactions actually processed: 2197407
tps = 1831.143708 (including connections establishing)
tps = 1831.169308 (excluding connections establishing)
statement latencies in milliseconds:
8.734424 select * from buy('D645','上海南','长沙','2013-01-20');
加nowait测试结果 :
ocz@db-172-16-3-150-> pgbench -M prepared -f ./test.sql -n -r -c 16 -j 16 -T 12 -U postgres digoal
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 16
number of threads: 16
duration: 12 s
number of transactions actually processed: 93632
tps = 7800.056248 (including connections establishing)
tps = 7818.803904 (excluding connections establishing)
statement latencies in milliseconds:
2.042862 select * from buy('D645','上海南','长沙','2013-01-20');
小结
1. 需要解决更新热点, 降低等待, 提高并行处理几率.
本例的热点在 : update train_bucket set sit_bit = set_bit(sit_bit, strpos(sit_bit::text,'0')-1, 1), sit_remain = sit_remain-1
where current of curs1;
以及
update train_sit set station_bit=bitsetvarbit(station_bit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)
where current of curs2;
对应的游标 :
open curs2 for select tid,tbid,sit_no from train_sit
where (station_bit & bitsetvarbit(repeat('0', v_station_len-1)::varbit, v_from_station_idx-1, v_to_station_idx-v_from_station_idx, 1)) = repeat('0', v_station_len-1)::varbit
and station_bit <> repeat('1', v_station_len-1)::varbit
-- and ctid not in (select locked_row from pgrowlocks('train_sit')) -- 耗时约300毫秒, 用它来解决热点锁等待不划算.
limit 1
for update;
以及
open curs1 for select id, tid, strpos(sit_bit::text,'0'), sit_level, bno from train_bucket
where sit_remain>0
-- and ctid not in (select locked_row from pgrowlocks('train_bucket')) -- 耗时约300毫秒, 用它来解决热点锁等待不划算.
limit 1
for update;
解决的关键在这里.
如果不能解决热点的问题, 那就提高处理速度, 精简字段数量和长度, 精简索引. 提高更新速度.
2. 减少数据扫描的量.
partial index, 避免满座车厢的扫描, 以及全程占位位子的扫描.
3. 先查bucket 是否空闲, 再查sit是否空闲.
4. 还需要考虑优先级的问题 :
例如有111000和111100两个位子, 如果请求要最后两个站的票, 应该优先匹配111100, 这样更不容易浪费。如下 :
111000 | 000011 = 111011
111100 | 000011 = 111111
参考
1. setbitvarbit
http://blog.163.com/digoal@126/blog/static/163877040201302192427651/