zffj 2012-03-16
1、规划种子节点ip和Token值的对应
4个种子节点ip
192.168.0.231 192.168.0.232 192.168.0.233 192.168.0.234 进入python,计算Token #192.168.0.231对应的Token为 >>> print 2 ** 127 / 4 * 1 42535295865117307932921825928971026432 #192.168.0.232对应的Token为 >>> print 2 ** 127 / 4 * 2 85070591730234615865843651857942052864 #192.168.0.233对应的Token为 >>> print 2 ** 127 / 4 * 3 127605887595351923798765477786913079296 #192.168.0.234对应的Token为 >>> print 2 ** 127 / 4 * 4 170141183460469231731687303715884105728
或者
import java.math.BigInteger;

/**
 * Computes evenly spaced initial tokens for a Cassandra cluster using the
 * RandomPartitioner (token space 0 .. 2^127). Node i (1-based) gets the
 * token 2^127 / nodes * i, matching the plan in this document.
 */
public class InitToken {

    /** Upper bound of the RandomPartitioner token space: 2^127. */
    private static final BigInteger TOKEN_SPACE = BigInteger.valueOf(2).pow(127);

    /**
     * Returns the initial token for the i-th node of a {@code nodes}-node ring.
     *
     * @param nodes total number of nodes in the ring; must be positive
     * @param i     1-based node index; must satisfy 1 <= i <= nodes
     * @return 2^127 / nodes * i
     * @throws IllegalArgumentException if nodes or i is out of range
     */
    public static BigInteger token(int nodes, int i) {
        if (nodes <= 0 || i <= 0 || i > nodes) {
            throw new IllegalArgumentException("need 1 <= i <= nodes, got i=" + i + ", nodes=" + nodes);
        }
        return TOKEN_SPACE.divide(BigInteger.valueOf(nodes)).multiply(BigInteger.valueOf(i));
    }

    /**
     * Prints one token per line for each node. The node count may be passed
     * as the first command-line argument; it defaults to 4 (the cluster
     * described in this document) for backward compatibility.
     */
    public static void main(String[] args) {
        int nodes = (args.length > 0) ? Integer.parseInt(args[0]) : 4; // total number of nodes
        for (int i = 1; i <= nodes; i++) {
            System.out.println(token(nodes, i));
        }
    }
}
2、配置cassandra的每个节点
a、cassandra/bin/cassandra.in.sh配置本机的jdk安装路径
JAVA_HOME=/usr/local/jdk6
b、cassandra/conf/cassandra.yaml
cluster_name: 'ots' commitlog_directory: cassandra/data/commitlog saved_caches_directory: cassandra/data/saved_caches data_file_directories: - cassandra/data/data #配置种子节点ip列表 - seeds: "192.168.0.231,192.168.0.232,192.168.0.233,192.168.0.234" #上面的配置基本保持一致 #当前节点的ip(这个ip主要是用来节点和节点之间通讯的ip) listen_address: 192.168.0.231 #当前节点的ip(这个ip主要是用来响应客户端操作的ip) rpc_address: 192.168.0.231
3、启动每个节点的cassandra
nohup cassandra/bin/cassandra -f &
4、设置每个节点的Token值
进入每个节点,把启动时默认生成的Token值改变为我们规划的Token值
或者直接在配置文件cassandra.yaml中指定Token值来规划,就不用下面的动态规划了。
如下：./bin/nodetool -h 192.168.0.231 -p 7199 move 42535295865117307932921825928971026432 ./bin/nodetool -h 192.168.0.232 -p 7199 move 85070591730234615865843651857942052864 。。。。。。。。。。。。。。。。。。。
5、初始化数据存储结构
客户端连接到集群中的某一个节点,初始化数据结构。运行初始化脚本,如下
./bin/cassandra-cli -h 192.168.0.231 -p 9160
参考json数据结构模型
{ "key":{ "name":"140 bytes", "cardno":"140 bytes", "ticketno":"140 bytes", "traindate":"140 bytes", "startstation":"140 bytes", "endtstation":"140 bytes", "seatinfo":"140 bytes" } }
具体脚本:
create keyspace user_train; use user_train; create column family users with comparator=UTF8Type and column_metadata=[{column_name:name,validation_class:UTF8Type,index_type:KEYS}, {column_name:cardno,validation_class:UTF8Type,index_type:KEYS}, {column_name:ticketno,validation_class:UTF8Type,index_type:KEYS}, {column_name:traindate,validation_class:UTF8Type}, {column_name:startstation,validation_class:UTF8Type}, {column_name:endtstation,validation_class:UTF8Type}, {column_name:seatinfo,validation_class:UTF8Type}]; #分发策略,主要是将存放到一个节点的一份数据,分发到另一个节点一份,节点的选取由Cassandra和配置文件决定。 update keyspace user_train with strategy_options = {datacenter1:2};
6、使用hector客户端api操作cassandra数据库，观察数据分布
插入数据的时候观察每个节点的数据分布是否均衡。 首先需要登录每个节点,不停的使用如下命令,刷新数据的分布情况。
./bin/nodetool -h 192.168.0.232 -p 7199 ring
写入数据代码
// Connect to the "ots" cluster; Hector pools connections to every listed host.
Cluster cluster = HFactory.getOrCreateCluster("ots",
        "192.168.0.231:9160,192.168.0.232:9160,192.168.0.233:9160,192.168.0.234:9160");
Keyspace keyspace = HFactory.createKeyspace("user_train", cluster);

// 120-character padding so each column value carries a realistic payload.
StringBuilder padBuilder = new StringBuilder(120);
for (int p = 0; p < 120; p++) {
    padBuilder.append('x');
}
String padding = padBuilder.toString();

try {
    Mutator<String> mutator = HFactory.createMutator(keyspace, stringSerializer);
    long begin = System.currentTimeMillis();

    // Column names and the matching value prefixes. Note the "name" column's
    // value is prefixed "username" (as in the original data layout).
    String[] columnNames = {"name", "cardno", "ticketno", "traindate", "startstation", "endtstation", "seatinfo"};
    String[] valueTags   = {"username", "cardno", "ticketno", "traindate", "startstation", "endtstation", "seatinfo"};

    for (int i = 0; i < 100000; i++) {
        String rowKey = "username" + i;
        for (int c = 0; c < columnNames.length; c++) {
            mutator.addInsertion(rowKey, "users",
                    HFactory.createStringColumn(columnNames[c], padding + valueTags[c] + i));
        }
        // Flush in batches of 500 rows and log progress.
        if (i % 500 == 0) {
            mutator.execute();
            System.out.println(i);
        }
    }
    mutator.execute(); // flush the final partial batch
    System.out.println("insert time: " + (System.currentTimeMillis() - begin));
} catch (HectorException he) {
    he.printStackTrace();
}
cluster.getConnectionManager().shutdown();