Rain 2020-01-04
读本节文档之前建议先查看[Fabric 1.4 源码分析 committer记账节点]章节。
Multi-Version Concurrency Control 多版本并发控制,MVCC 是一种并发控制的方法,一般在数据库管理系统中,实现对数据库的并发访问。在数据库系统中,锁机制可以控制并发操作,但是其系统开销较大,而MVCC可以在大多数情况下代替行级锁,使用MVCC,能降低其系统开销。MVCC是通过保存数据在某个时间点的快照来实现的. 不同存储引擎的MVCC. 不同存储引擎的MVCC实现是不同的,典型的有乐观并发控制和悲观并发控制.
InnoDB的MVCC,是通过在每行记录后面保存两个隐藏的列来实现的,这两个列,分别保存了这个行的创建时间,一个保存的是行的删除时间。这里存储的并不是实际的时间值,而是系统版本号(可以理解为事务的ID),每开始一个新的事务,系统版本号就会自动递增,事务开始时刻的系统版本号会作为事务的ID。其中MVCC只在 READ COMMITTED 和 REPEATABLE READ 两个隔离级别下工作。
SELECT
InnoDB会根据以下两个条件检查每行纪录:
InnoDB只查找版本早于当前事务版本的数据行,即,==行的系统版本号小于或等于事务的系统版本号==,这样可以确保事务读取的行,要么是在事务开始前已经存在的,要么是事务自身插入或者修改过的。
==行的删除版本,要么未定义,要么大于当前事务版本号==。这样可以确保事务读取到的行,在事务开始之前未被删除。
只有符合上述两个条件的纪录,才能作为查询结果返回。
UPDATE
InnoDB为插入一行新纪录,保存当前系统版本号作为行版本号,同时,保存当前系统版本号到原来的行作为行删除标识。
优点:
保存这两个额外系统版本号,使大多数读操作都可以不用加锁。这样设计使得读数据操作很简单,性能很好。
缺点:
每行纪录都需要额外的存储空间,需要做更多的行检查工作,以及一些额外的维护工作
这里回顾几个知识点:
读集中键的版本和世界状态中键的版本一致就认为该交易是 合法的。
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) { return false, nil } 版本数据结构 type Version struct { BlockNum uint64 TxNum uint64 }
当验证完一笔交易后,如果交易有效,会更新key版本,接着再验证下一笔交易。
committingTxHeight := version.NewHeight(block.Num, uint64(tx.IndexInBlock)) updates.ApplyWriteSet(tx.RWSet, committingTxHeight, v.db)
在此举例介绍,mycc链码a转账给b。
实例化链码交易在区块3中,则a、b版本为
{ "key": "a", "version": { "block_num": "3", "tx_num": "0" } }
发起一笔有效交易后,版本更新为
{ "key": "a", "version": { "block_num": "4", "tx_num": "0" } }
当读写集中包含一个或多个查询信息(query-info)时,需要执行额外的验证。这种额外的验证需要确保在根据查询信息获得的结果的超集(多个范围的合并)中没有插入、删除或者更新键。
"range_queries_info": [ { "end_key":"marble3", "itr_exhausted":true, "raw_reads":{ "kv_reads":[ { "key":"marble1", "version":{ "block_num":"8", "tx_num":"0" } }, { "key":"marble2", "version":{ "block_num":"9", "tx_num":"0" } } ] }, "start_key":"marble1" } ]
validate()方法会根据rangeQueryInfo是否包含了合法当梅克尔树摘要对象返回不同当验证方法。
if rangeQueryInfo.GetReadsMerkleHashes() != nil { logger.Debug(`Hashing results are present in the range query info hence, initiating hashing based validation`) // 暂时全局搜索只发现ReadsMerkleHashes读,没发现写 validator = &rangeQueryHashValidator{} } else { logger.Debug(`Hashing results are not present in the range query info hence, initiating raw KVReads based validation`) validator = &rangeQueryResultsValidator{} }
因此,在此只介绍rangeQueryResultsValidator;该方法会对读集key以及版本与查询结果进行一一比较。一致则返回true。
func (v *rangeQueryResultsValidator) validate() (bool, error) { rqResults := v.rqInfo.GetRawReads().GetKvReads() for i := 0; i < len(rqResults); i++ { versionedKV := result.(*statedb.VersionedKV) // versionedKV key验证 if versionedKV.Key != kvRead.Key { logger.Debugf("key name mismatch: Key in rwset = [%s], key in query results = [%s]", kvRead.Key, versionedKV.Key) return false, nil } // versionedKV版本验证 if !version.AreSame(versionedKV.Version, convertToVersionHeight(kvRead.Version)) { logger.Debugf(`Version mismatch for key [%s]: Version in rwset = [%#v], latest version = [%#v]`, versionedKV.Key, versionedKV.Version, kvRead.Version) return false, nil } if result, err = itr.Next(); err != nil { return false, err } } }
当读写集中存在collection_hashed_rwset,需要验证collHashedRWSet.HashedRwSet.HashedReads里面的KVReadHash.Version
{ "collection_hashed_rwset":[ { "collection_name":"collectionMarbles", "hashed_rwset":"CiYKIF4flG/gcV3gNm0J6EgLrXZyojVRVwKbDd+8lYUPBFcOEgIIDw==", "pvt_rwset_hash":null } ], "namespace":"marblesp", "rwset":null }
源码:
遍历collHashedRWSets,再遍历collHashedRWSet.HashedRwSet.HashedReads,最后对每个kvReadHash的版本进行验证。
for _, collHashedRWSet := range collHashedRWSets { if valid, err := v.validateCollHashedReadSet(ns, collHashedRWSet.CollectionName, collHashedRWSet.HashedRwSet.HashedReads, updates); !valid || err != nil { return valid, err } }
for _, kvReadHash := range kvReadHashes { if valid, err := v.validateKVReadHash(ns, coll, kvReadHash, updates); !valid || err != nil { return valid, err } }
验证代码与验证key类似
committedVersion, err := v.db.GetKeyHashVersion(ns, coll, kvReadHash.KeyHash) if err != nil { return false, err } if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvReadHash.Version)) { logger.Debugf("Version mismatch for key hash [%s:%s:%#v]. Committed version = [%s], Version in hashedReadSet [%s]", ns, coll, kvReadHash.KeyHash, committedVersion, kvReadHash.Version) return false, nil }