累积技术沉淀经验 2012-02-25
在完成了Apache Cassandra的四个基本学习步骤之后,可以尝试下实战性的编码了。
如有必要,建议再简单回顾一下:
基于第四点的建模思路,接下来我们要做的,就是搭建一个叫做JTwissandra的实战性项目,就是所谓的Java版本的Twissandra了。
其目的是为了以Twitter为假想对象,使用最简约(或者直接说简陋得了)的建模和实现,表达采用Apache Cassandra作为NoSQL平台的基本实现过程。
JTwissandra的基本编码环境:
1. Maven来管理
2. JUnit来测试
3. 基于hector client来作为Apache Cassandra的Java 客户端
大家可以通过下面的Github链接,直接clone出来最新的代码:
JTwissandra: https://github.com/itstarting/jtwissandra
也欢迎大家Fork或在这里直接拍砖——反正咱在NoSQL也是新手,脸皮厚点不要紧啦:)
1. 首先需要一个HFactoryHelper来初始化并建立Cassandra的客户端连接池和必要的对象:
import java.io.IOException; import java.util.Properties; import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel; import me.prettyprint.hector.api.Cluster; import me.prettyprint.hector.api.HConsistencyLevel; import me.prettyprint.hector.api.Keyspace; import me.prettyprint.hector.api.factory.HFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Helper for Cassandra initialization * * @author bright_zheng * */ public class HFactoryHelper { private static Logger logger = LoggerFactory.getLogger(HFactoryHelper.class); private static Cluster cluster; private static Keyspace keyspace = initKeyspace(); private static Properties properties; private HFactoryHelper(){} public static Keyspace getKeyspace(){ return keyspace; } private static Keyspace initKeyspace() { properties = new Properties(); try { properties.load(HFactoryHelper.class.getResourceAsStream("/config.properties")); } catch (IOException ioe) { ioe.printStackTrace(); } String cluster_name = properties.getProperty("cluster.name", "Test Cluster"); logger.debug("cluster.name={}", cluster_name); String cluster_hosts = properties.getProperty("cluster.hosts", "127.0.0.1:9160"); logger.debug("cluster.hosts={}", cluster_hosts); String active_keyspace = properties.getProperty("keyspace", "JTWISSANDRA"); logger.debug("keyspace={}", active_keyspace); cluster = HFactory.getOrCreateCluster(cluster_name, cluster_hosts); ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel(); ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE); return HFactory.createKeyspace( active_keyspace, cluster, ccl); } }
2. 建立各项业务服务的基类BaseService。
import java.util.UUID;
import me.prettyprint.cassandra.serializers.LongSerializer; import me.prettyprint.cassandra.serializers.StringSerializer; import me.prettyprint.cassandra.service.clock.MicrosecondsClockResolution; import me.prettyprint.cassandra.utils.TimeUUIDUtils; import me.prettyprint.hector.api.ClockResolution; import me.prettyprint.hector.api.Keyspace; import me.prettyprint.hector.api.beans.HColumn; import me.prettyprint.hector.api.factory.HFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import bright.zheng.jtwissandra.HFactoryHelper; /** * Base service which all business services should extend * * @author bright_zheng * */ public class BaseService { protected Logger logger = LoggerFactory.getLogger(getClass()); protected static Keyspace KEYSPACE = HFactoryHelper.getKeyspace(); protected static final String CF_USER = "USER"; protected static final String CF_FRIEND = "FRIEND"; protected static final String CF_FOLLOWER = "FOLLOWER"; protected static final String CF_TWEET = "TWEET"; protected static final String CF_TIMELINE = "TIMELINE"; protected static final StringSerializer SERIALIZER_STRING = StringSerializer.get(); protected static final LongSerializer SERIALIZER_LONG = LongSerializer.get(); protected static final int TWEETS_LIMIT_DEFAULT = 10; protected static final int TWEETS_LIMIT_MAX = 50; protected HColumn<String, String> createColumn(String name, String value) { return HFactory.createColumn(name, value, SERIALIZER_STRING, SERIALIZER_STRING); } protected HColumn<String, Long> createColumn(String name, Long value) { return HFactory.createColumn(name, value, SERIALIZER_STRING, SERIALIZER_LONG); } protected HColumn<Long, String> createColumn(Long name, String value) { return HFactory.createColumn(name, value, SERIALIZER_LONG, SERIALIZER_STRING); } /** * REF: http://wiki.apache.org/cassandra/FAQ#working_with_timeuuid_in_java * * @return UUID */ public UUID getUUID(){ //TODO: which UUID should we use to make sure it's unique? 
ClockResolution clock = new MicrosecondsClockResolution(); return TimeUUIDUtils.getTimeUUID(clock); //return TimeUUIDUtils.getUniqueTimeUUIDinMillis(); } protected Long getTimestamp(UUID uuid){ //return uuid.timestamp(); return TimeUUIDUtils.getTimeFromUUID(uuid); } protected Long generateTimestamp(){ return getTimestamp(getUUID()); } }
3. 下面是各项业务服务代码:
3.1 UserService
package bright.zheng.jtwissandra.service;

import java.util.UUID;

import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import bright.zheng.jtwissandra.bean.User;

/**
 * User service.
 *
 * @author bright_zheng
 */
public class UserService extends BaseService {

    /**
     * Stores a new user as one row of the USER column family, keyed by a
     * freshly generated UUID.
     *
     * <p>Sample CLI commands equivalent to what this method writes:
     * <pre>
     * set USER['550e8400-e29b-41d4-a716-446655440000']['user_name'] = 'itstarting';
     * set USER['550e8400-e29b-41d4-a716-446655440000']['password'] = '111222';
     * set USER['550e8400-e29b-41d4-a716-446655440000']['create_timestamp'] = 1329836819890000;
     * </pre>
     *
     * @param user the user to store; its name and password are written as-is
     * @return the generated user UUID, i.e. the row key
     */
    public String addUser(User user) {
        Mutator<String> mutator = HFactory.createMutator(KEYSPACE, SERIALIZER_STRING);

        UUID uuid = this.getUUID();
        String user_uuid = uuid.toString();
        // Derived from the same UUID so that key and creation time stay consistent.
        Long create_timestamp = this.getTimestamp(uuid);

        logger.debug("user_uuid={}", user_uuid);
        logger.debug("user_name={}", user.getUser_name());
        // The password is deliberately not logged: credentials must not end up in log files.
        logger.debug("create_timestamp={}", create_timestamp);

        mutator.addInsertion(user_uuid, CF_USER,
                this.createColumn("user_name", user.getUser_name()));
        // NOTE(review): the schema script declares this column as 'user_password' while the
        // code writes 'password' -- confirm which name is intended before relying on the metadata.
        mutator.addInsertion(user_uuid, CF_USER,
                this.createColumn("password", user.getUser_password()));
        mutator.addInsertion(user_uuid, CF_USER,
                this.createColumn("create_timestamp", create_timestamp));

        mutator.execute();

        // return the generated uuid
        return user_uuid;
    }
}
3.2 FriendService
package bright.zheng.jtwissandra.service;

import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.MutationResult;
import me.prettyprint.hector.api.mutation.Mutator;

/**
 * Friend service.
 *
 * @author bright_zheng
 */
public class FriendService extends BaseService {

    /**
     * Records that {@code me} follows {@code friend}. Two columns are written
     * in one batch, both named by the same generated timestamp:
     * <ol>
     *   <li>the friend's uuid under my row in the FRIEND column family;</li>
     *   <li>my uuid under the friend's row in the FOLLOWER column family.</li>
     * </ol>
     *
     * <p>Equivalent CLI commands:
     * <pre>
     * set FRIEND['550e8400-e29b-41d4-a716-446655440000']['1329836819859000']
     *     = '550e8400-e29b-41d4-a716-446655440001';
     *
     * set FOLLOWER['550e8400-e29b-41d4-a716-446655440001']['1329836819859000']
     *     = '550e8400-e29b-41d4-a716-446655440000';
     * </pre>
     *
     * @param me uuid of the user who starts following
     * @param friend uuid of the user being followed
     * @return the result of executing the batch
     */
    public MutationResult followFriend(String me, String friend) {
        Mutator<String> batch = HFactory.createMutator(KEYSPACE, SERIALIZER_STRING);

        Long followedAt = this.generateTimestamp();
        logger.debug("timestamp={}", followedAt);

        // me -> friend
        batch.addInsertion(me, CF_FRIEND, this.createColumn(followedAt, friend));
        // friend <- me
        batch.addInsertion(friend, CF_FOLLOWER, this.createColumn(followedAt, me));

        MutationResult outcome = batch.execute();
        return outcome;
    }
}
3.3 TimelineService
package bright.zheng.jtwissandra.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;
import bright.zheng.jtwissandra.bean.Timeline;

/**
 * Timeline service: paged reads of a user's TIMELINE row.
 *
 * @author bright_zheng
 */
public class TimelineService extends BaseService {

    /**
     * Gets the first page of the given user's timeline with the default page size.
     *
     * @param user_uuid row key of the user
     * @return the first page; never {@code null}
     */
    public TimelineWrapper getTimeline(String user_uuid) {
        return getTimeline(user_uuid, 0L, TWEETS_LIMIT_DEFAULT);
    }

    /**
     * Gets one page of the given user's timeline with the default page size.
     *
     * @param user_uuid row key of the user
     * @param start column name (timestamp) at which the page starts, inclusive
     * @return the requested page; never {@code null}
     */
    public TimelineWrapper getTimeline(String user_uuid, long start) {
        return getTimeline(user_uuid, start, TWEETS_LIMIT_DEFAULT);
    }

    /**
     * Gets one page of the given user's timeline.
     *
     * <p>One column more than {@code limit} is requested; when it comes back,
     * its name is reported as the start of the next page instead of being
     * returned as an entry.
     *
     * @param user_uuid row key of the user
     * @param start column name (timestamp) at which the page starts, inclusive;
     *              negative values are treated as 0
     * @param limit maximum number of entries; negative values select
     *              {@code TWEETS_LIMIT_DEFAULT}, values above {@code TWEETS_LIMIT_MAX} are capped
     * @return the page; its entry list is never {@code null}, and
     *         {@code getNextTimeline()} is 0 when there is no further page
     */
    public TimelineWrapper getTimeline(String user_uuid, long start, int limit) {
        if (start < 0) {
            start = 0;
        }
        if (limit < 0) {
            limit = TWEETS_LIMIT_DEFAULT;
        }
        if (limit > TWEETS_LIMIT_MAX) {
            limit = TWEETS_LIMIT_MAX;
        }

        SliceQuery<String, Long, String> sliceQuery =
                HFactory.createSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_LONG, SERIALIZER_STRING);
        sliceQuery.setColumnFamily(CF_TIMELINE);
        sliceQuery.setKey(user_uuid);
        sliceQuery.setRange(start, Long.MAX_VALUE, false, limit + 1);

        QueryResult<ColumnSlice<Long, String>> result = sliceQuery.execute();
        List<HColumn<Long, String>> columns = result.get().getColumns();

        if (columns == null) {
            // Hand back an empty page rather than a null list so callers can iterate safely.
            return new TimelineWrapper(new ArrayList<Timeline>(), 0L);
        }
        if (columns.size() <= limit) {
            return new TimelineWrapper(convertToTimeline(columns), 0L);
        }

        // The extra column is only a marker: its name (a timestamp) is the next page's start.
        int markerIndex = columns.size() - 1;
        long next = columns.get(markerIndex).getName();
        // subList is a view, so the list owned by the query result is left unmodified.
        return new TimelineWrapper(convertToTimeline(columns.subList(0, markerIndex)), next);
    }

    /**
     * Converts raw columns (name = timestamp, value = tweet uuid) to Timeline beans.
     */
    private List<Timeline> convertToTimeline(List<HColumn<Long, String>> cols) {
        List<Timeline> result = new ArrayList<Timeline>(cols.size());
        for (HColumn<Long, String> col : cols) {
            result.add(new Timeline(col.getValue(), col.getName()));
        }
        return result;
    }

    /**
     * One page of timeline entries plus the start point of the following page.
     */
    public class TimelineWrapper {
        private final List<Timeline> timelines;
        private final long nextTimeline;

        public TimelineWrapper(List<Timeline> timelines, long nextTimeline) {
            this.timelines = timelines;
            this.nextTimeline = nextTimeline;
        }

        /**
         * @return the start value to pass for the next page, or 0 when there is none
         */
        public long getNextTimeline() {
            return nextTimeline;
        }

        /**
         * @return the entries of this page
         */
        public List<Timeline> getTimelines() {
            return timelines;
        }
    }
}
3.4 TweetService
package bright.zheng.jtwissandra.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.MultigetSliceQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;
import bright.zheng.jtwissandra.bean.Tweet;

/**
 * Tweet service.
 *
 * @author bright_zheng
 */
public class TweetService extends BaseService {

    /** Number of followers read per query while fanning a tweet out. */
    private static final int FOLLOWER_PAGE_SIZE = 500;

    /**
     * Publishes a tweet:
     * <ol>
     *   <li>saves the tweet in the TWEET column family;</li>
     *   <li>adds it to the author's own TIMELINE;</li>
     *   <li>adds it to the TIMELINE of every follower of the author.</li>
     * </ol>
     * All insertions are collected in one mutator and executed together.
     * Followers are read page by page, so the fan-out is not limited to the
     * first {@code FOLLOWER_PAGE_SIZE} followers.
     *
     * @param user_uuid uuid of the author
     * @param tweet_content text of the tweet
     * @return the uuid generated for the new tweet
     */
    public String addTweet(String user_uuid, String tweet_content) {
        Mutator<String> mutator = HFactory.createMutator(KEYSPACE, SERIALIZER_STRING);

        // the tweet uuid
        UUID uuid = this.getUUID();
        String tweet_uuid = uuid.toString();
        logger.debug("tweet_uuid={}", tweet_uuid);

        // the timestamp to build the timeline
        Long timestamp = this.getTimestamp(uuid);
        logger.debug("timestamp={}", timestamp);

        mutator.addInsertion(tweet_uuid, CF_TWEET, this.createColumn("user_uuid", user_uuid));
        mutator.addInsertion(tweet_uuid, CF_TWEET, this.createColumn("tweet_content", tweet_content));

        mutator.addInsertion(user_uuid, CF_TIMELINE, this.createColumn(timestamp, tweet_uuid));

        // get back all my followers and insert the tweet into each one's TIMELINE
        long start = Long.MIN_VALUE;
        boolean more = true;
        while (more) {
            List<HColumn<Long, String>> page = fetchFollowerPage(user_uuid, start);
            // One extra column is requested; its presence means another page exists.
            more = page.size() > FOLLOWER_PAGE_SIZE;
            int deliverable = more ? FOLLOWER_PAGE_SIZE : page.size();
            for (HColumn<Long, String> follower : page.subList(0, deliverable)) {
                String follower_uuid = follower.getValue();
                logger.debug("follower's uuid={}", follower_uuid);
                logger.debug("timestamp={}", follower.getName());
                // insert the tweet to the follower's TIMELINE
                mutator.addInsertion(follower_uuid, CF_TIMELINE, this.createColumn(timestamp, tweet_uuid));
            }
            if (more) {
                // The extra column was not processed above; the next page starts at it (inclusive).
                start = page.get(deliverable).getName();
            }
        }

        mutator.execute();

        // return the new generated tweet's uuid
        return tweet_uuid;
    }

    /**
     * Reads up to {@code FOLLOWER_PAGE_SIZE + 1} FOLLOWER columns of a user,
     * beginning at the given column name.
     */
    private List<HColumn<Long, String>> fetchFollowerPage(String user_uuid, long start) {
        SliceQuery<String, Long, String> sliceQuery =
                HFactory.createSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_LONG, SERIALIZER_STRING);
        sliceQuery.setColumnFamily(CF_FOLLOWER);
        sliceQuery.setKey(user_uuid);
        sliceQuery.setRange(start, Long.MAX_VALUE, false, FOLLOWER_PAGE_SIZE + 1);
        QueryResult<ColumnSlice<Long, String>> result = sliceQuery.execute();
        return result.get().getColumns();
    }

    /**
     * Loads a single tweet.
     *
     * @param tweet_uuid uuid of the tweet
     * @return the tweet, or {@code null} when {@code tweet_uuid} is {@code null}
     *         or no complete tweet is stored under it
     */
    public Tweet getTweet(String tweet_uuid) {
        if (tweet_uuid == null) {
            return null;
        }
        List<Tweet> found = getTweets(Collections.singletonList(tweet_uuid));
        return found.isEmpty() ? null : found.get(0);
    }

    /**
     * Loads several tweets with one multiget query.
     *
     * @param tweet_uuids uuids of the tweets to load
     * @return the tweets in the order of {@code tweet_uuids}; uuids without a
     *         complete stored tweet are skipped; empty when {@code tweet_uuids}
     *         is {@code null} or empty
     */
    public List<Tweet> getTweets(List<String> tweet_uuids) {
        if (tweet_uuids == null || tweet_uuids.isEmpty()) {
            // Nothing to look up; avoid sending a query without keys.
            return new ArrayList<Tweet>();
        }

        MultigetSliceQuery<String, String, String> multigetSlicesQuery =
                HFactory.createMultigetSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_STRING, SERIALIZER_STRING);
        multigetSlicesQuery.setColumnFamily(CF_TWEET);
        multigetSlicesQuery.setColumnNames("user_uuid", "tweet_content");
        multigetSlicesQuery.setKeys(tweet_uuids);
        QueryResult<Rows<String, String, String>> results = multigetSlicesQuery.execute();
        return convertRowsToTweets(tweet_uuids, results.get());
    }

    /**
     * Converts the multiget result to Tweet beans, walking the requested keys
     * so the caller's order (e.g. timeline order) is kept.
     */
    private List<Tweet> convertRowsToTweets(List<String> tweet_uuids, Rows<String, String, String> rows) {
        List<Tweet> list = new ArrayList<Tweet>(tweet_uuids.size());
        for (String tweet_uuid : tweet_uuids) {
            Row<String, String, String> row = rows.getByKey(tweet_uuid);
            if (row == null) {
                continue;
            }
            ColumnSlice<String, String> cs = row.getColumnSlice();
            HColumn<String, String> content = cs.getColumnByName("tweet_content");
            HColumn<String, String> author = cs.getColumnByName("user_uuid");
            if (content == null || author == null) {
                // A row without these columns would otherwise cause a NullPointerException below.
                logger.warn("tweet {} is missing or incomplete, skipped", tweet_uuid);
                continue;
            }
            list.add(new Tweet(row.getKey(), content.getValue(), author.getValue()));
        }
        return list;
    }
}
4. 当然少不了JUnit测试用例了:
package bright.zheng.jtwissandra;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import junit.framework.Assert;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bright.zheng.jtwissandra.bean.Timeline;
import bright.zheng.jtwissandra.bean.Tweet;
import bright.zheng.jtwissandra.bean.User;
import bright.zheng.jtwissandra.service.FriendService;
import bright.zheng.jtwissandra.service.TimelineService;
import bright.zheng.jtwissandra.service.TimelineService.TimelineWrapper;
import bright.zheng.jtwissandra.service.TweetService;
import bright.zheng.jtwissandra.service.UserService;

/**
 * Test cases for all services currently provided.
 * Please drop and create the schema first and then run all cases as one round.
 * The 'me' and 'friend' users are created dynamically each round for easier testing.
 *
 * <p>The individual steps share state ({@code me}, {@code friend},
 * {@code nextTimeline}) and only make sense in a fixed sequence. JUnit 4 does
 * not promise to run {@code @Test} methods in declaration order, so the steps
 * are plain methods driven by the single test {@link #allServicesInOrder()}.
 *
 * @author bright_zheng
 */
public class ServiceTest {
    Logger logger = LoggerFactory.getLogger(ServiceTest.class);

    private static UserService SERVICE_USER = new UserService();
    private static FriendService SERVICE_FRIEND = new FriendService();
    private static TweetService SERVICE_TWEET = new TweetService();
    private static TimelineService SERVICE_TIMELINE = new TimelineService();

    private static String me;
    private static String friend;

    /** Start of the next timeline page as reported by the latest getTweets call; 0 = none. */
    private static long nextTimeline = 0L;

    @BeforeClass
    public static void setUp() {
        // nothing to prepare
    }

    /**
     * Runs every step in the order the scenario requires.
     */
    @Test
    public void allServicesInOrder() {
        addUser();
        followFriend();
        followedByFollower();
        addTweetByMe();
        addTweetByFriend();
        getTweetsByMe();
        getTweetsByMeForNextTimeline();
        getTweetsByMyFriend();
        getTweetsByMyFriendForNextTimeline();
    }

    /**
     * Creates the two users used by all later steps.
     */
    public void addUser() {
        logger.debug("=====================addUser{====================");
        //add user 1
        me = SERVICE_USER.addUser(new User("itstarting","1234"));
        logger.debug("This round of tesing, ME={}", me);
        Assert.assertNotNull(me);

        //add user 2
        friend = SERVICE_USER.addUser(new User("test1","1234"));
        logger.debug("This round of tesing, FRIEND={}", friend);
        Assert.assertNotNull(friend);
        logger.debug("=====================}//addUser====================");
    }

    /**
     * I'm following a friend
     */
    public void followFriend() {
        logger.debug("=====================followFriend{====================");
        SERVICE_FRIEND.followFriend(me, friend);
        logger.debug("=====================}//followFriend====================");
    }

    /**
     * I'm followed by a follower
     */
    public void followedByFollower() {
        logger.debug("=====================followedByFollower{====================");
        SERVICE_FRIEND.followFriend(friend, me);
        logger.debug("=====================}//followedByFollower====================");
    }

    /**
     * I'm twittering
     */
    public void addTweetByMe() {
        logger.debug("=====================addTweetByMe{====================");
        for (int i = 0; i < 100; i++) {
            String tweet_uuid = SERVICE_TWEET.addTweet(me, "Hellow JTWISSANDRA -- by itstarting:" + i);
            Assert.assertNotNull(tweet_uuid);
        }
        logger.debug("=====================}//addTweetByMe====================");
    }

    /**
     * My friend is twittering
     */
    public void addTweetByFriend() {
        logger.debug("=====================addTweetByFriend{====================");
        for (int i = 0; i < 100; i++) {
            String tweet_uuid = SERVICE_TWEET.addTweet(friend, "Hellow JTWISSANDRA -- by test1:" + i);
            Assert.assertNotNull(tweet_uuid);
        }
        logger.debug("=====================}//addTweetByFriend====================");
    }

    /**
     * Get tweets for me
     */
    public void getTweetsByMe() {
        logger.debug("=====================getTweetsByMe{====================");
        getTweets(me, 0);
        logger.debug("=====================}//getTweetsByMe====================");
    }

    /**
     * Get tweets at next Timeline (if any)
     */
    public void getTweetsByMeForNextTimeline() {
        logger.debug("=====================getTweetsByMeForNextTimeline{====================");
        if (nextTimeline > 0L) {
            getTweets(me, nextTimeline);
        }
        logger.debug("=====================}//getTweetsByMeForNextTimeline====================");
    }

    /**
     * Get tweets for my friend
     */
    public void getTweetsByMyFriend() {
        logger.debug("=====================getTweetsByMyFriend{====================");
        getTweets(friend, 0);
        logger.debug("=====================}//getTweetsByMyFriend====================");
    }

    /**
     * Get my friend's tweets at next Timeline (if any)
     */
    public void getTweetsByMyFriendForNextTimeline() {
        logger.debug("=====================getTweetsByMyFriendForNextTimeline{====================");
        // Same guard as for 'me': without a next page there is nothing further to read.
        if (nextTimeline > 0L) {
            getTweets(friend, nextTimeline);
        }
        logger.debug("=====================}//getTweetsByMyFriendForNextTimeline====================");
    }

    /**
     * Reads one timeline page of the given user, loads the referenced tweets,
     * logs both, and remembers the start of the next page in {@code nextTimeline}.
     */
    private void getTweets(String user_uuid, long start) {
        TimelineWrapper wrapper = SERVICE_TIMELINE.getTimeline(user_uuid, start);
        Assert.assertNotNull(wrapper);
        List<Timeline> list = wrapper.getTimelines();
        // Fail with a clear assertion instead of a NullPointerException in the loop below.
        Assert.assertNotNull(list);

        List<String> tweet_uuids = new ArrayList<String>();
        for (Timeline timeline : list) {
            String tweet_uuid = timeline.getTweet_uuid();
            logger.debug("From Timeline: tweet_uuid={}, tweet_timestamp={}",
                    tweet_uuid, timeline.getTweet_timestamp());
            tweet_uuids.add(tweet_uuid);
        }

        List<Tweet> tweets = SERVICE_TWEET.getTweets(tweet_uuids);
        Assert.assertNotNull(tweets);
        for (Tweet tweet : tweets) {
            logger.debug("From Tweet: tweet_uuid={}, tweet_content={}, user_uuid={}",
                    new Object[]{tweet.getTweet_uuid(),
                                 tweet.getTweet_content(),
                                 tweet.getUser_uuid()
                    });
        }

        if (wrapper.getNextTimeline() > 0L) {
            logger.debug("The start timeline of next page is: {}", wrapper.getNextTimeline());
            nextTimeline = wrapper.getNextTimeline();
        } else {
            logger.debug("No next page available");
            nextTimeline = 0L;
        }
    }

    @AfterClass
    public static void shutdown() {
        // The connection pool is left open; the cluster object is not reachable from this test.
    }
}
这是个一锅端的测试用例,全部跑一次可以覆盖几乎所有的业务服务逻辑。
5. 最后,别忘了在跑之前创建必要的schema:
-- Schema for the JTWISSANDRA demo: recreates the keyspace and its five column families.
-- NOTE(review): on a cluster where the keyspace does not exist yet, the drop statement
-- presumably reports an error -- confirm and skip it on the first run if needed.
drop keyspace JTWISSANDRA;

create keyspace JTWISSANDRA
    with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy'
    and strategy_options = [{replication_factor:1}];

use JTWISSANDRA;

-- USER: one row per user, keyed by the user uuid string.
-- NOTE(review): UserService writes the password under the column name 'password', while
-- the metadata below declares 'user_password' -- confirm which name is intended.
create column family USER
    with comparator = UTF8Type
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type
    and column_metadata = [
        {column_name: user_name, validation_class: UTF8Type, index_name:user_name_idx, index_type:KEYS }
        {column_name: user_password, validation_class: UTF8Type}
        {column_name: create_timestamp, validation_class: LongType, index_name:create_timestamp_idx, index_type:KEYS}
    ];

-- FRIEND: row key = user uuid; column name = timestamp (long), value = uuid of the followed user.
create column family FRIEND
    with comparator = LongType
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type;

-- FOLLOWER: row key = user uuid; column name = timestamp (long), value = uuid of the follower.
create column family FOLLOWER
    with comparator = LongType
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type;

-- TWEET: row key = tweet uuid; columns user_uuid and tweet_content.
create column family TWEET
    with comparator = UTF8Type
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type
    and column_metadata = [
        {column_name: user_uuid, validation_class: UTF8Type}
        {column_name: tweet_content, validation_class: UTF8Type}
    ];

-- TIMELINE: row key = user uuid; column name = tweet timestamp (long), value = tweet uuid.
create column family TIMELINE
    with comparator = LongType
    and key_validation_class = UTF8Type
    and default_validation_class = UTF8Type;
6. 其他?
还有些杂项,就不贴了吧,略过……有兴趣的可从Github clone下来跑跑看。
==========所发现的问题,接下来再谈再讨论,最最希望有丰富经验的高手前来助阵,去我谜团,谢谢!==========