/* * Copyright (c) 2008 Intel Corporation * All rights reserved. * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * -- Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * -- Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * -- Neither the name of the Intel Corporation nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR ITS * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ using System; using System.Collections.Generic; using System.Net; using System.IO; using ExtensionLoader; using ExtensionLoader.Config; using OpenMetaverse; using OpenMetaverse.StructuredData; using LitS3; namespace AssetServer.AmazonS3Storage { public class AmazonS3Storage : IExtension, IStorageProvider { const string EXTENSION_NAME = "AmazonS3Storage"; // Used in metrics reporting AssetServer server; S3Service s3; string bucketName; string cloudFrontHost; public AmazonS3Storage() { } public void Start(AssetServer server) { this.server = server; bool useCloudFront = false; try { IConfig amazonConfig = server.ConfigFile.Configs["Amazon"]; s3 = new S3Service(); s3.AccessKeyID = amazonConfig.GetString("AccessKeyID"); s3.SecretAccessKey = amazonConfig.GetString("SecretAccessKey"); bucketName = amazonConfig.GetString("BucketName"); useCloudFront = amazonConfig.GetBoolean("UseCloudFront"); } catch (Exception) { Logger.Log.Error("Failed to load [Amazon] section from config file " + AssetServer.CONFIG_FILE); return; } try { s3.CreateBucket(bucketName); BucketAccess access = s3.QueryBucket(bucketName); if (access == BucketAccess.Accessible) { string bucketHost = bucketName + ".s3.amazonaws.com"; Logger.Log.Info("Using Amazon S3 bucket http://" + bucketHost); if (useCloudFront) { CloudFrontService cf = new CloudFrontService(); cf.AccessKeyID = s3.AccessKeyID; cf.SecretAccessKey = s3.SecretAccessKey; // List all of the distributions associated with our account foreach (Distribution distribution in cf.GetAllDistributions()) { // Check if any of the distributions are tied to the current S3 bucket if (distribution.Config.Origin == bucketHost && distribution.Config.Enabled) { if (WaitForCloudFront(cf, distribution)) cloudFrontHost = distribution.DomainName; break; } } if (cloudFrontHost == null) { // No valid distribution was found, create a new one DistributionConfig config = new DistributionConfig(); config.Origin = bucketHost; config.Enabled = true; Distribution distribution = cf.CreateDistribution(config); if (WaitForCloudFront(cf, distribution)) cloudFrontHost = distribution.DomainName; else Logger.Log.Error("Failed to create Amazon CloudFront distribution"); } if (cloudFrontHost != null) { Logger.Log.Info("Using Amazon CloudFront distribution http://" + cloudFrontHost); } } } else { Logger.Log.ErrorFormat("Cannot use Amazon S3 bucket {0}: {1}", bucketName, access); } } catch (Exception ex) { Logger.Log.ErrorFormat("Failed to create/use Amazon S3 bucket {0}: {1}", bucketName, ex.Message); } } public void Stop() { } public BackendResponse TryFetchMetadata(UUID assetID, out Metadata metadata) { metadata = null; BackendResponse ret; GetObjectRequest request = new GetObjectRequest(s3, bucketName, assetID.ToString(), true); try { using (GetObjectResponse response = request.GetResponse()) { metadata = new Metadata(); metadata.CreationDate = response.LastModified; metadata.Description = response.Metadata.Get("description"); metadata.ID = assetID; metadata.Name = response.Metadata.Get("name"); metadata.SHA1 = OpenMetaverse.Utils.HexStringToBytes(response.Metadata.Get("sha1"), false); Boolean.TryParse(response.Metadata.Get("temporary"), out metadata.Temporary); metadata.ContentType = response.ContentType; // Return the CloudFront URL if enabled if (cloudFrontHost != null) metadata.Methods["data"] = new Uri(String.Format("http://{0}/{1}", cloudFrontHost, assetID)); else metadata.Methods["data"] = new Uri(s3.GetUrl(bucketName, assetID.ToString())); ret = BackendResponse.Success; } } catch (WebException ex) { if (ex.Response != null && (ex.Response as HttpWebResponse).StatusCode == HttpStatusCode.NotFound) { ret = BackendResponse.NotFound; } else { Logger.Log.WarnFormat("Failed fetching metadata for {0}: {1}", assetID, ex.Message); ret = BackendResponse.Failure; } } server.MetricsProvider.LogAssetMetadataFetch(EXTENSION_NAME, ret, assetID, DateTime.Now); return ret; } public BackendResponse TryFetchData(UUID assetID, out byte[] assetData) { assetData = null; long contentLength = 0; string contentType; BackendResponse ret; try { using (Stream stream = s3.GetObjectStream(bucketName, assetID.ToString(), out contentLength, out contentType)) { assetData = new byte[contentLength]; int pos = 0; while (pos < contentLength) pos += stream.Read(assetData, pos, (int)contentLength - pos); } ret = BackendResponse.Success; } catch (WebException ex) { if (ex.Response != null && (ex.Response as HttpWebResponse).StatusCode == HttpStatusCode.NotFound) { ret = BackendResponse.NotFound; } else { Logger.Log.WarnFormat("Failed fetching data for {0}: {1}", assetID, ex.Message); ret = BackendResponse.Failure; } } server.MetricsProvider.LogAssetDataFetch(EXTENSION_NAME, ret, assetID, (int)contentLength, DateTime.Now); return ret; } public BackendResponse TryFetchDataMetadata(UUID assetID, out Metadata metadata, out byte[] assetData) { metadata = null; BackendResponse response = TryFetchData(assetID, out assetData); if (response == BackendResponse.Success) response = TryFetchMetadata(assetID, out metadata); return response; } public BackendResponse TryCreateAsset(Metadata metadata, byte[] assetData, out UUID assetID) { assetID = metadata.ID = UUID.Random(); return TryCreateAsset(metadata, assetData); } public BackendResponse TryCreateAsset(Metadata metadata, byte[] assetData) { BackendResponse ret; // Calculate the MD5 to compare against what AmazonS3 returns string md5String = BitConverter.ToString(OpenMetaverse.Utils.MD5(assetData)).Replace("-", String.Empty); AddObjectRequest request = new AddObjectRequest(s3, bucketName, metadata.ID.ToString()); request.CannedAcl = CannedAcl.PublicRead; request.ReadWriteTimeout = 1000 * 60; request.ContentLength = assetData.Length; request.ContentType = metadata.ContentType; //request.ContentDisposition = String.Format("attachment; filename={0}", metadata.ID); request.Metadata.Add("name", metadata.Name); request.Metadata.Add("description", metadata.Description); request.Metadata.Add("temporary", metadata.Temporary.ToString()); request.Metadata.Add("sha1", BitConverter.ToString(metadata.SHA1).Replace("-", String.Empty)); try { using (Stream stream = request.GetRequestStream()) { stream.Write(assetData, 0, assetData.Length); stream.Flush(); } AddObjectResponse response = request.GetResponse(); response.Close(); if (md5String.Equals(response.ETag.Trim('"'), StringComparison.OrdinalIgnoreCase)) { ret = BackendResponse.Success; } else { Logger.Log.ErrorFormat( "MD5 of uploaded asset ({0}) does not match match asset MD5 ({1}), deleting upload", response.ETag.Trim('"').ToLower(), md5String); // Delete the failed upload s3.DeleteObject(bucketName, metadata.ID.ToString()); ret = BackendResponse.Failure; } } catch (WebException ex) { Logger.Log.WarnFormat("Failed uploading asset {0}: {1}", metadata.ID, ex.Message); ret = BackendResponse.Failure; } server.MetricsProvider.LogAssetCreate(EXTENSION_NAME, ret, metadata.ID, assetData.Length, DateTime.Now); return ret; } public int ForEach(Action action, int start, int count) { string nextMarker = null; int pos = 0; int rowCount = 0; #region Move To Beginning // S3 does not allow us to say "start at asset 14", but we can say // "start at asset N", where N is a marker. We first find the // marker for the asset at our starting position try { while (pos < start) { ListObjectsArgs args = new ListObjectsArgs(); args.Marker = nextMarker; args.MaxKeys = start - pos; args.Delimiter = "/"; ListObjectsRequest request = new ListObjectsRequest(s3, bucketName, args); ListObjectsResponse response = request.GetResponse(); response.Close(); foreach (ObjectEntry entry in response.Entries) ++pos; if (response.IsTruncated) { nextMarker = response.NextMarker; } else { // We reached the end of the asset list without getting to the // start position Logger.Log.WarnFormat( "AmazonS3: Iterated through the entire asset list ({0} counted) without reaching start position {1}", pos, start); return 0; } } } catch (WebException ex) { Logger.Log.Warn("Failed fetching AmazonS3 asset list: " + ex.Message); return 0; } #endregion Move To Beginning // nextMarker is now set to the position of start, or null (indicating starting at the beginning) Logger.Log.DebugFormat("Starting AmazonS3 ForEach(), start={0}, pos={1}, nextMarker={2}", start, pos, nextMarker); try { ListObjectsArgs args = new ListObjectsArgs(); args.Marker = nextMarker; args.MaxKeys = count; args.Delimiter = "/"; ListObjectsRequest request = new ListObjectsRequest(s3, bucketName, args); ListObjectsResponse response = request.GetResponse(); response.Close(); foreach (ObjectEntry entry in response.Entries) { UUID assetID; Metadata metadata; if (UUID.TryParse(entry.Key, out assetID) && TryFetchMetadata(assetID, out metadata) == BackendResponse.Success) { action(metadata); ++rowCount; } } } catch (WebException ex) { Logger.Log.Warn("Failed fetching AmazonS3 asset list: " + ex.Message); } return rowCount; } static bool WaitForCloudFront(CloudFrontService cf, Distribution distribution) { while (distribution.Status == DistributionStatus.InProgress) { Logger.Log.Debug("Distribution is being deployed, waiting..."); System.Threading.Thread.Sleep(1000 * 10); try { distribution = cf.GetDistributionInfo(distribution.ID); } catch (Exception ex) { Logger.Log.Error("Error waiting for CloudFront distribution to come online: " + ex.Message); return false; } } return true; } } }