Hey bros,today,I`m going toshare you how to build a cloud storage system based on cephand‘amazonS3 api for java’.咳咳,好了不装逼了,首先,对不起亚马逊和Ceph,why?因为我首字母没大写。
上篇文章里面我们介绍了如何自行搭建ceph集群,当然那是基于虚拟机的,如果你有物理机那就再好不过了:ceph-deploy搭建和上篇文章一样,当然你也可以使用docker,比deploy搭建稍微简单一点,网上都有教程。
云存储,这几年非常的火热,小到公司集群使用,大到像亚马逊这样世界级的云服务系统,它们的实现都离不开分布式的文件系统,而ceph就是其中之一,底层的东西就不再赘述,很多前辈写了很多原理教程,奇虎360有位工程师写的ceph存储原理非常细致易懂,大家可以去找一找(我记不得在哪里看的了)领略一番。
1.什么是ceph radosgw
ceph的核心存储过程就依赖于 rados,其实如果你学习过网络基础,知道什么是网关(gateway),那么这个逻辑就很好理解了,我们将网络系统中的数据与我们的分布式文件系统的数据进行交互,而这个交互的过程由gateway来完成,由于我们的侧重点是api,底层的东西就不做多的分享,有兴趣的可以点击超链接。
在S3中,我们称S3存储文件的容器为Bucket,也就是桶的意思,很形象,就像java的散列桶,使用的也是这个单词,我们可以将文件放入桶中,或者将桶放入桶中。在放入桶之前,rados系统会将文件分为很多对象,再对对象进行一系列的散列计算和分割并保存其地址,然后一个被称为pg的东东采用
crush算法对对象进行分割并保存其地址,
crush算法不需要借助任何中枢分布器便可以计算出新数据的存储分部地址,而这个地址就是对象被放在哪个osd的哪个位置,获取文件时再反向寻址,获得我们的文件,crush的强大使得分布式文件系统更加的快速,安全,可靠,我是来介绍crush算法的吗
。
Java代码实现:
package com.changhong.data.upload.service; * Created by 1Chow on 2017/4/5. @Service public class CephStorageService { protected static AmazonS3 amazonS3=null; private static String accessKey; private static String secretKey; private static String host; private static Logger logger = LoggerFactory.getLogger(CephStorageService.class); private static Properties pro = new Properties(); static{ try { pro.load(CephStorageService.class.getResourceAsStream("/application.properties")); } catch (IOException e) { logger.error("properties file not found!"); } accessKey =pro.getProperty("ceph.accessKey"); secretKey =pro.getProperty("ceph.secretKey"); host=pro.getProperty("ceph.gateway"); System.out.println("开始初始化cephp配置..."+accessKey+host+secretKey); AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); ClientConfiguration clientConfig = new ClientConfiguration(); clientConfig.setProtocol(Protocol.HTTP); amazonS3=new AmazonS3Client(credentials,clientConfig); amazonS3.setEndpoint(host);//This creates a connection so that you can interact with the server. System.out.println("ceph配置初始化成功!"); } * 创建目录 * @param bucketname * @return public Bucket createBucket(String bucketname) throws SocketTimeoutException { System.out.println("开始创建桶"+bucketname); Bucket bucket=null; bucket=amazonS3.createBucket(bucketname); System.out.println(amazonS3.toString()); return bucket; } * 查询已经创建的目录 * @return public List getBuckets(){ List list =new ArrayList(); List buckets = amazonS3.listBuckets(); for (Bucket bucket : buckets) { bucketEntity bucketEntity=new bucketEntity(); bucketEntity.setBucketName(bucket.getName()); bucketEntity.setCreationDate(StringUtils.fromDate(bucket.getCreationDate())); list.add(bucketEntity); System.out.println("已创建的bucket:"+bucket.getName() + "\t" + StringUtils.fromDate(bucket.getCreationDate())); } return list; } *根据指定的目录查询文件 * @param bucketName * @return public List getFiles(String bucketName){ System.out.println("开始查找文件:"); List list=new ArrayList(); ObjectListing objects = amazonS3.listObjects(bucketName); int i=0; do { for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) { bucketContentEntity bucketContent=new bucketContentEntity(); bucketContent.setFileName(objectSummary.getKey()); list.add(i,bucketContent); i++; } objects = amazonS3.listNextBatchOfObjects(objects); } while (objects.isTruncated()); System.out.println("File find end"); for(int x=0;x System.out.println(x+"getFiles方法2中的文件名:"+list.get(x).getFileName()); } return list; } * 获取指定文件返回输入流 * @param path * @param filename * @return public S3ObjectInputStream getObject(String path,String filename){ S3Object s3Object=amazonS3.getObject(new GetObjectRequest(path,filename)); S3ObjectInputStream s3ObjectInputStream=s3Object.getObjectContent(); return s3ObjectInputStream; } * 删除单个文件 * @param bucketname * @param filename public void deleteFile(String bucketname,String filename){ amazonS3.deleteObject(bucketname, filename); } * 删除目录,不为空时先将bucket里的所有对象删除再删除目录,否则会抛异常 * @param bucketname * @return public boolean deleteBucket(String bucketname) throws SocketTimeoutException{ if(this.getFiles(bucketname).isEmpty()){ amazonS3.deleteBucket(bucketname); return true; }else{ List list=this.getFiles(bucketname); for(bucketContentEntity bc:list){ amazonS3.deleteObject(bucketname,bc.getFileName()); } amazonS3.deleteBucket(bucketname); return true; } } * 该方法用于检测目录是否存在 * @param bucketName * @return public boolean isBucketExists(String bucketName)throws SocketTimeoutException{ List list=this.getBuckets(); for(bucketEntity bucket:list){ if(bucket.getBucketName().equals(bucketName)){ return true; } } return false; } * 该方法用于检测指定目录下的某文件是否已经存在 * @param path * @param fileName * @return public boolean isFileExists(String path,String fileName)throws SocketTimeoutException{ List files=this.getFiles(path); System.out.println("files.size:"+files.size()); for(int j=0;j System.out.println("遍历LIST FILES:"+files.get(j).getFileName()); } for(int i=0;i bucketContentEntity file = files.get(i); System.out.println("当前目录下的文件名:"+file.getFileName()); if (fileName.equals(file.getFileName())) { return true; } } return false; } * 流存储文件 * @param bucketName * @param fileName * @param inputStream * @return public boolean saveData(String bucketName,String fileName,InputStream inputStream)throws SocketTimeoutException{ System.out.println("开始写入数据"); PutObjectResult result=amazonS3.putObject(bucketName,fileName,inputStream,new ObjectMetadata()); return result.isRequesterCharged(); } } 依赖:
上面的的代码中包含了最基本的bucket创建,文件的增删查等功能,文件我们是以流的方式写入,笔者使用的是springboot框架,本地做的集群端口转发所以配置的是本地端口,ceph基本的配置信息如下:
#ceph configration# ceph.gateway=127.0.0.1:3399 ceph.accessKey=XPVF8TESA1X4SFU***** ceph.secretKey=hBBEFpV3qsyI7HAdCBzA2ZdAhuANJFRIUz****
然后就能对集群进行访问了,怎么变成原谅色了,上面代码中的注释由于新浪比较垃圾会被Html注释,所以为了省事我把斜杠去掉了,导包自己用快捷键导一下。
【多读书,多看报,少谈恋爱,多睡觉】
下篇文章分享Swift对radosgw的支持,原理差不多,我去研究研究----------------拜拜