package com.tandbergtv.metadatamanager.specimpl;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Transactional;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;

import com.sun.org.apache.xml.internal.serialize.OutputFormat;
import com.sun.org.apache.xml.internal.serialize.XMLSerializer;
import com.tandbergtv.metadatamanager.MetadataManagerDAO;
import com.tandbergtv.metadatamanager.exception.InvalidRevisionException;
import com.tandbergtv.metadatamanager.exception.MetadataException;
import com.tandbergtv.metadatamanager.exception.SearchException;
import com.tandbergtv.metadatamanager.exception.TranslationException;
import com.tandbergtv.metadatamanager.factoryImpl.IdentifierFactory;
import com.tandbergtv.metadatamanager.factoryImpl.SpecHandlerFactory;
import com.tandbergtv.metadatamanager.model.Asset;
import com.tandbergtv.metadatamanager.model.AssetState;
import com.tandbergtv.metadatamanager.model.Field;
import com.tandbergtv.metadatamanager.model.FieldRevision;
import com.tandbergtv.metadatamanager.model.File;
import com.tandbergtv.metadatamanager.model.Group;
import com.tandbergtv.metadatamanager.model.IField;
import com.tandbergtv.metadatamanager.model.Item;
import com.tandbergtv.metadatamanager.model.NextRevision;
import com.tandbergtv.metadatamanager.model.Relation;
import com.tandbergtv.metadatamanager.model.RootAssetRevision;
import com.tandbergtv.metadatamanager.model.SearchCriteria;
import com.tandbergtv.metadatamanager.model.Spec;
import com.tandbergtv.metadatamanager.model.Item.ItemType;
import com.tandbergtv.metadatamanager.search.AssetSearchService;
import com.tandbergtv.metadatamanager.search.CriteriaBuilder;
import com.tandbergtv.metadatamanager.spec.IIdentifier;
import com.tandbergtv.metadatamanager.spec.IRuleManager;
import com.tandbergtv.metadatamanager.spec.ISpecHandler;
import com.tandbergtv.metadatamanager.spec.ITranslator;
import com.tandbergtv.metadatamanager.spec.IValidator;
import com.tandbergtv.metadatamanager.specimpl.ttv.TTVId;
import com.tandbergtv.metadatamanager.util.AssetUtil;
import com.tandbergtv.metadatamanager.util.Binder;
import com.tandbergtv.metadatamanager.util.MappingFileParser;
import com.tandbergtv.metadatamanager.validation.ValidationError;
import com.tandbergtv.metadatamanager.validation.Schema.SchemaValidator;
import com.tandbergtv.metadatamanager.validation.Schematron.CustomSchematronBuilder;
import com.tandbergtv.metadatamanager.validation.Schematron.SchematronValidator;

/**
 * This is an abstract base for any spec handler that provides basic support.
 * 
 * @author spuranik
 * 
 */
public abstract class SpecHandlerBase implements ISpecHandler {

	public static final String EMPTY_STRING_VALUE = "";

	/* name suffix for schema validator */
	private static String SCHEMA_VALIDATOR = "SchemaValidator";

	/* metadata manager impl reference to invoke CRUD operations */
	protected MetadataManagerDAO metadataManagerDAO;

	/* search service */
	protected AssetSearchService searchService;

	protected final Logger logger = Logger.getLogger(this.getClass());

	/* used for converting doc from one spec to TTV. */
	protected ITranslator toTTV;

	/* used for converting doc from TTV to another. */
	protected ITranslator fromTTV;

	/* file which contains spec xpaths -> ttv xpaths */
	protected URL mappingResourceUrl;

	/*
	 * validator name -> validator instance.
	 */
	protected Map<String, IValidator> ruleValidators;

	/* file which contains the schema for this spec. */
	protected String schemaResource;

	/* used for identification and schema validator naming */
	protected Spec spec;

	protected String rootElementName;
	
	/* used for managing override rules */
	protected IRuleManager ruleManager;

	protected Map<String, Set<String>> specSpecificTTVXpaths;

	protected InputStream specSpecificTTVXpathsStream;

	public void setRuleManager(IRuleManager ruleManager) {
		this.ruleManager = ruleManager;
	}

	public IRuleManager getRuleManager() {
		return ruleManager;
	}

	/**
	 * @return the spec
	 */
	public Spec getSpec() {
		return spec;
	}

	/**
	 * @param spec
	 *            the spec to set
	 */
	public void setSpec(Spec spec) {
		this.spec = spec;
	}

	/**
	 * @return the schemaResource
	 */
	public String getSchemaResource() {
		return schemaResource;
	}

	/**
	 * @param schemaResource
	 *            the schemaResource to set
	 */
	public void setSchemaResource(String schemaResource) {
		this.schemaResource = schemaResource;
	}

	/**
	 * @return the validators
	 */
	public Map<String, IValidator> getRuleValidators() {
		return ruleValidators;
	}

	/**
	 * @param validators
	 *            the validators to set
	 */
	public void setRuleValidators(Map<String, IValidator> validators) {
		this.ruleValidators = validators;
	}

	/**
	 * @return the mappingResource
	 */
	public URL getMappingResourceUrl() {
		return mappingResourceUrl;
	}

	/**
	 * @param mappingResource
	 *            the mappingResource to set
	 */
	public void setMappingResourceUrl(URL mappingResourceUrl) {
		this.mappingResourceUrl = mappingResourceUrl;
	}

	/**
	 * @return the toTTV
	 */
	public ITranslator getToTTV() {
		return toTTV;
	}

	/**
	 * @param toTTV
	 *            the toTTV to set
	 */
	public void setToTTV(ITranslator toTTV) {
		this.toTTV = toTTV;
	}

	/**
	 * @return the fromTTV
	 */
	public ITranslator getFromTTV() {
		return fromTTV;
	}

	/**
	 * @param fromTTV
	 *            the fromTTV to set
	 */
	public void setFromTTV(ITranslator fromTTV) {
		this.fromTTV = fromTTV;
	}

	/**
	 * @return the metadataManagerDAO
	 */
	public MetadataManagerDAO getMetadataManagerDAO() {
		return metadataManagerDAO;
	}

	/**
	 * @param metadataManagerDAO
	 *            the metadataManagerDAO to set
	 */
	public void setMetadataManagerDAO(MetadataManagerDAO metadataManagerDAO) {
		this.metadataManagerDAO = metadataManagerDAO;
	}

	/**
	 * @return the searchService
	 */
	public AssetSearchService getSearchService() {
		return searchService;
	}

	/**
	 * @param searchService
	 *            the searchService to set
	 */
	public void setSearchService(AssetSearchService searchService) {
		this.searchService = searchService;
	}

	/**
	 * 
	 * @param specSpecificTTVXpathsFileName
	 */
	public void setSpecSpecificTTVXpathsStream(
			InputStream specSpecificTTVXpathsStream) {
		this.specSpecificTTVXpathsStream = specSpecificTTVXpathsStream;
		// Read the stream and populate the list
		BufferedReader br = new BufferedReader(new InputStreamReader(specSpecificTTVXpathsStream));
		String line;
		specSpecificTTVXpaths = new HashMap<String, Set<String>>();
		try {
			String type = EMPTY_STRING_VALUE;
			HashSet<String> set = null;
			while ((line = br.readLine()) != null) {
				if (!line.equals(EMPTY_STRING_VALUE)) {
					if (line.startsWith("[")) {
						type = line.replaceAll("\\[", EMPTY_STRING_VALUE);
						type = type.replaceAll("\\]", EMPTY_STRING_VALUE);
						type = type.trim().toLowerCase();

						if (!specSpecificTTVXpaths.containsKey(type)) {
							set = new HashSet<String>();
							specSpecificTTVXpaths.put(type, set);
						}
					} else {
						specSpecificTTVXpaths.get(type).add(line.trim());
					}
				}
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * 
	 * @return
	 */
	public Map<String, Set<String>> getSpecSpecificTTVXpaths() {
		return specSpecificTTVXpaths;
	}

	@Transactional
	public List<IIdentifier> deleteAll(List<IIdentifier> ids) {
		List<IIdentifier> deletedAssetIds = new ArrayList<IIdentifier>();
		for (IIdentifier id : ids) {
			try {
				deleteUnique(id);
				deletedAssetIds.add(id);
			} catch (Exception e) {
				logger.error("Error deleting asset with id: " + id.toString(),
						e);
			}
		}
		return deletedAssetIds;
	}

	@Transactional
	public void deleteUnique(IIdentifier id) throws SearchException {
		Asset asset = id.getAsset();
		logger.debug("Deleting asset - TTV Id for: " + id.toString() + " is = " + asset.getId());
		metadataManagerDAO.delete(asset);
	}

	private static Document kludge(Document doc) {
		if (doc == null) {
			return null;
		}

		try {
			OutputFormat format = new OutputFormat(doc);
			format.setIndenting(true);
			ByteArrayOutputStream baos = new ByteArrayOutputStream();
			XMLSerializer serializer = new XMLSerializer(baos, format);
			serializer.serialize(doc);

			byte docBytes[] = baos.toByteArray();
			ByteArrayInputStream bais = new ByteArrayInputStream(docBytes);

			DocumentBuilder builder;
			Document input = null;
			builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();

			input = builder.parse(bais);
			return input;
		} catch (ParserConfigurationException e) {
		} catch (SAXException e) {
		} catch (IOException e) {
		}
		return null;
	}

	public Document get(IIdentifier id) throws MetadataException,
			SearchException, TranslationException {
		return get(id,null);
	}

	/**
	 * given a doc, looks up the IDs from it
	 * 
	 * @param doc
	 * @return List<IIdentifier>
	 * @throws MetadataException
	 */
	@Transactional
	public List<IIdentifier> getIdentifiers(Document doc)
			throws MetadataException {
		logger.debug("Trying to extract identifiers from the given document");
		List<Asset> assets = convertDocumentToAssets(doc);

		List<IIdentifier> returnIdentifierList = new ArrayList<IIdentifier>();

		if (assets != null) {
			for (Asset a : assets) {
				IIdentifier identifier = extractId(a);
				returnIdentifierList.add(identifier);
			}
		}

		return returnIdentifierList;
	}
	

	/**
	 * {@inheritDoc}
	 */
	@Transactional
	public List<IIdentifier> put(Document doc) throws MetadataException {
		return put(doc, null, null, null);
	}
	


	@Override
	@Transactional
	public List<IIdentifier> put(Document doc, String revisionSource,
			String revisionComment, String externalRevision) throws MetadataException {
		logger.debug("In put()");
		RootAssetRevision rootAssetRevision = new RootAssetRevision(revisionSource, revisionComment, externalRevision);
		boolean prune = true;
		List<Asset> assetList = save(doc, prune, rootAssetRevision);

		List<IIdentifier> savedIdList = new ArrayList<IIdentifier>();
		for (Asset ma : assetList) {
			savedIdList.add(extractId(ma));
		}

		return savedIdList;
	}

	/**
	 * {@inheritDoc}
	 */
	@Transactional
	public List<Asset> save(Document doc) throws MetadataException {
		boolean prune = true;
		return save(doc, prune, new RootAssetRevision());
	}

	/**
	 * @param doc
	 * @param prune
	 * @return
	 * @throws MetadataException
	 */
	private List<Asset> save(Document doc, boolean prune, RootAssetRevision rootAssetRevision)
			throws MetadataException {
		List<Asset> assets = convertDocumentToAssets(doc);

		// NOTE: If there is a problem finding any one, none of them will be
		// saved.

		return mergeSaveAssets(assets, prune, rootAssetRevision, false, false);
	}

	/**
	 * @param doc
	 * @return
	 * @throws MetadataException
	 */
	private List<Asset> convertDocumentToAssets(Document doc)
			throws MetadataException {
		Document convertedDoc = null;

		// convert doc
		try {
			convertedDoc = getToTTV().translate(doc);
		} catch (TranslationException e) {
			logger.error(e);
			throw new MetadataException(e.getMessage(), e);
		}

		// bind doc to TTV assets
		List<Asset> assets = new Binder().bind(convertedDoc);
		return assets;
	}

	protected List<Asset> mergeSaveAssets(List<Asset> toBeMergedAssets, boolean prune,
			RootAssetRevision rootAssetRevision, boolean isRollback, boolean saveAsDraft) throws MetadataException {
		List<Asset> assetList = new ArrayList<Asset>();
		List<Asset> mergedAssets;
		boolean createRevision = true;
		
		for(Asset asset: toBeMergedAssets){
			Set<String> criteriaValueSet = new HashSet<String>();
			checkDuplicateChildItems(asset, criteriaValueSet);
		}
		
		if(saveAsDraft) {
			for(Asset asset: toBeMergedAssets) {
				try {
					Asset existingAsset = getMetadataManagerDAO().getAsset(asset.getTTVId(), true);
					if(existingAsset instanceof Group) {
						int revNumber = ((Group)existingAsset).getLatestRevisionNumber();
						RootAssetRevision revision = ((Group)existingAsset).getRevision(revNumber);
						if(revision.isDraft()) {
							//latest revision is already a draft revision. no need to create a new revision. just update the current revsion
							createRevision = false;
						} else {
							//create a new draft revision
							rootAssetRevision.setDraft(true);
							createRevision = true;
						}
					}
				} catch (SearchException e) {
					rootAssetRevision.setDraft(true);
					createRevision = true;
				}
			}
		} else {
			//if we are not saving a draft version, create a new revision always
			createRevision = true;
		}
		
		try {
			mergedAssets = mergeAssets(toBeMergedAssets, prune, createRevision);
		} catch (SearchException e) {
			logger.error(e);
			throw new MetadataException(e.getMessage(), e);
		}

		for (Asset ma : mergedAssets) {
				assetList.add(ma);
			// add the asset.
			if(ma instanceof Group){
				Group group = (Group)ma;
				if(createRevision) {
					if(!isRollback && !group.isRevisionNumberUpdated())  {
						logger.info("No new changes made to Asset:" + group
								+ ", abort saving a new revision!");
						continue;
					} else if(isRollback && !group.isRevisionNumberUpdated()){
						group.setLatestRevisionNumber(group.getLatestRevisionNumber() + 1);
					}
					group.addRootAssetRevision( rootAssetRevision);	
				}
				
				getMetadataManagerDAO().saveAsset(group);
			}
		}

		return assetList;
	}

	/**
	 * {@inheritDoc}
	 */
	@Transactional
	public List<Asset> incrementalSave(Document doc) throws MetadataException {
		boolean prune = false;
		return save(doc, prune, new RootAssetRevision());
	}

	public List<ValidationError> validateSchema(Document doc) {
		SchemaValidator validator = new SchemaValidator(
				getSchemaValidatorName(), schemaResource);
		return validator.validate(doc);
	}

	/**
	 * Validates a document using custom defined rules.
	 * 
	 * @param Document
	 *            doc to be validated
	 * @param IValidator
	 *            baseRules
	 * @param String
	 *            ruleSet
	 * @return List<ValidationError> list validation errors
	 */
	public List<ValidationError> customValidate(Document doc,
			IValidator baseRules, String ruleSet) {
		List<ValidationError> errors = baseRules.validate(doc);
		Map<String, Boolean> ruleMap = ruleManager.getRuleSet(ruleSet);
		errors = removeOverridenErrors(errors, ruleMap);
		errors.addAll(addRequiredErrors(doc, ruleMap));
		return errors;
	}

	private List<ValidationError> addRequiredErrors(Document doc,
			Map<String, Boolean> ruleMap) {
		InputStream is = CustomSchematronBuilder.buildSchematron(ruleMap);
		IValidator customValidator = new SchematronValidator("custom", is);
		return customValidator.validate(doc);
	}

	private List<ValidationError> removeOverridenErrors(
			List<ValidationError> errors, Map<String, Boolean> ruleMap) {
		List<ValidationError> toDelete = new ArrayList<ValidationError>();
		for (ValidationError ve : errors)
			if (ve.getErrorCode().equals("ERR-02")
					&& errorFoundInMap(ve, ruleMap))
				toDelete.add(ve);
		for (ValidationError ve : toDelete)
			errors.remove(ve);
		return errors;
	}

	private boolean errorFoundInMap(ValidationError ve,
			Map<String, Boolean> ruleMap) {
		for (Map.Entry<String, Boolean> entry : ruleMap.entrySet()) {
			try {
				if (compareXPaths(entry.getKey(), ve.getErrorLocation() + "/"
						+ ve.getErrorFields().get(0)))
					return true;
			} catch (IndexOutOfBoundsException e) {
			}
		}
		return false;
	}

	private boolean compareXPaths(String string1, String string2) {
		String str1 = string1.replaceAll("\\[[0-9]*\\]|@", EMPTY_STRING_VALUE);
		String str2 = string2.replaceAll("\\[[0-9]*\\]|@", EMPTY_STRING_VALUE);
		return str1.equals(str2);
	}

	/**
	 * Validates a document ignoring errors not for the field defined in the
	 * given xpath.
	 * 
	 * @param Document
	 *            doc to be validated
	 * @param IValidator
	 *            baseRules
	 * @param String
	 *            xpath
	 * @return List<ValidationError> list validation errors
	 */
	public List<ValidationError> validateField(Document doc,
			IValidator baseRules, String xpath) {
		List<ValidationError> errors = baseRules.validate(doc);
		List<ValidationError> results = new ArrayList<ValidationError>();
		for (ValidationError ve : errors) {
			try {
				if (compareXPaths(xpath, ve.getErrorLocation() + "/"
						+ ve.getErrorFields().get(0)))
					results.add(ve);
			} catch (IndexOutOfBoundsException e) {
			}
		}
		return results;
	}

	/**
	 * Concatenates the spec name and a constant suffix to generate this spec's
	 * schema validator name.
	 * 
	 * @return
	 */
	private String getSchemaValidatorName() {
		return getSpec().toString() + SCHEMA_VALIDATOR;
	}

	/**
	 * Iterates thru the list of top level assets and merges the fields. A
	 * problem in any one of the items will stop merging the rest of the list.
	 * 
	 * @throws MetadataException
	 */
	private List<Asset> mergeAssets(List<Asset> assets, boolean prune, boolean createRevision)
			throws SearchException, MetadataException {
		Iterator<Asset> assetIter = assets.iterator();
		List<Asset> mergedAssets = new ArrayList<Asset>();
		while (assetIter.hasNext()) {
			Asset currAsset = assetIter.next();
			currAsset = AssetUtil.deleteEmptyFields(currAsset);
			if (currAsset instanceof Group) {
				NextRevision nextRevision = new NextRevision();
				TTVId existingTTVId = getTTVId(currAsset, null);
				ifNonPrimarySpecIDExisting(existingTTVId,currAsset, null);
				Asset mergedAsset = currAsset;
				if (existingTTVId != null) {
					Asset existingAsset = getMetadataManagerDAO().getAsset(
							existingTTVId, false);
					if(createRevision) {
						nextRevision.setRevisionNumber(existingAsset.getLatestRevisionNumber() + 1);
					} else {
						nextRevision = null;
					}
					mergedAsset = mergeAsset(currAsset, existingAsset, prune, nextRevision);				
				}else{
//					mergedAsset.setRevisionNumberUpdated(true);
					AssetUtil.copyFieldsToFieldRevisions(mergedAsset, nextRevision);
				}
				mergedAssets.add(mergedAsset);
			} else {
				throw new MetadataException(
						"Adding an ITEM is not supported yet! Add a Group");
			}
		}
		return mergedAssets;
	}
	

	private void ifNonPrimarySpecIDExisting(TTVId primarySpecTTVId, Asset currAsset, Asset rootAsset) throws SearchException {
		Map<String, ISpecHandler> handlers = SpecHandlerFactory.getHandlers();
		String primarySpec = this.spec.toString();
		Map<String, TTVId> specTTVIdMap = new HashMap<String, TTVId>();
		specTTVIdMap.put(primarySpec, primarySpecTTVId);
		
		for(String specName: handlers.keySet()){
			
			if(!specName.equalsIgnoreCase(this.spec.toString())){
				ISpecHandler iSpecHandler = handlers.get(specName);
				TTVId existingTTVId = iSpecHandler.getTTVId(currAsset, rootAsset);
				if ((existingTTVId != null) && 
						((primarySpecTTVId == null )||(primarySpecTTVId != null && !primarySpecTTVId.equals(existingTTVId)))){
					throw new SearchException("Multiple assets found, for primary spec( "  +this.spec.toString() + "), the TTVID is: "
							+ primarySpecTTVId + "; for non-primary spec( "  +specName + "), the TTVID is: "
							+ existingTTVId );
				}
			}
			
		}
	}

	/**
	 * Merges the new asset tree into the existing asset tree;
	 * 
	 * @param newAsset
	 * @param oldAsset
	 * @param nextRevision TODO
	 * @return
	 * @throws SearchException
	 */
	protected Asset mergeAsset(Asset newAsset, Asset oldAsset, boolean prune, NextRevision nextRevision)
			throws SearchException {

		Asset rootAsset = oldAsset;

		oldAsset = mergeAndAddIncomingIntoExistingAsset(newAsset, oldAsset,
				rootAsset, nextRevision);
		
		if (prune) {
			oldAsset = pruneExistingAssetTree(newAsset, oldAsset, nextRevision);
		}

		return oldAsset;
	}

	/**
	 * Merges the fields for all assets in the existingAsset tree that came in
	 * the new asset tree. if new asset tree has new items that are not present
	 * in the existing tree, adds them as well.
	 * 
	 * @param newAsset
	 * @param oldAsset
	 * @param nextRevision 
	 * @return
	 * @throws SearchException
	 */
	protected Asset mergeAndAddIncomingIntoExistingAsset(Asset newAsset,
			Asset oldAsset, Asset rootAsset, NextRevision nextRevision) throws SearchException {
		String assetType = getAssetType(newAsset);

		mergeFields(newAsset, oldAsset, assetType, nextRevision);

		// merge all target assets
		for (Relation newRelation : newAsset.getRelations()) {
			Asset newTargetAsset = newRelation.getTargetAsset();
			if (requireSpecSpecificMerging(getAssetType(newTargetAsset))) {
				specSpecificMerging(oldAsset, nextRevision, newRelation,
						newTargetAsset);
			} else {
				mergeExistingTargetAsset(newTargetAsset, rootAsset, oldAsset, nextRevision);
			}
		}

		return oldAsset;
	}
	
	protected void mergeExistingTargetAsset(Asset newTargetAsset, Asset rootAsset, Asset oldAsset,
			NextRevision nextRevision) throws SearchException {
		
		NextRevision currentAssetRevision = null;
		
		if(nextRevision == null) {
			currentAssetRevision = new NextRevision();
			currentAssetRevision.setRevisionNumber(oldAsset.getLatestRevisionNumber());
		}
		
		boolean ifExistingTargetAsset = true;
		Asset existingTargetAsset = searchTargetAssetInMemory(newTargetAsset, rootAsset, oldAsset);

		if (existingTargetAsset == null) {
			TTVId existingTtvId = getTTVId(newTargetAsset, rootAsset);
			if (existingTtvId != null) {
				// if the target asset not a child asset of oldAsset
				// i.e. re-connecting an existing asset
				existingTargetAsset = getMetadataManagerDAO().getAsset(existingTtvId, false);
				if(nextRevision == null) {
					oldAsset.addChild(existingTargetAsset, currentAssetRevision, false);
				} else {
					oldAsset.addChild(existingTargetAsset, nextRevision, false);
				}
			} else {
				ifExistingTargetAsset = false;
				if(nextRevision == null) {
					oldAsset.addChild(newTargetAsset, currentAssetRevision, true);
				} else {
					oldAsset.addChild(newTargetAsset, nextRevision, true);
				}
			}
		} 
		if(ifExistingTargetAsset){
			mergeAndAddIncomingIntoExistingAsset(newTargetAsset, existingTargetAsset, rootAsset, nextRevision);
			if(nextRevision != null) {
				if (existingTargetAsset.isRevisionNumberUpdated()) {
					oldAsset.setLatestRevisionNumber(nextRevision.getRevisionNumber());
				}
			}
		}

	}

	protected Asset searchTargetAssetInMemory(Asset newTargetAsset, Asset rootAsset, Asset oldAsset){
		SearchCriteria criteria = prepareSearchPathById(newTargetAsset.getFields());
		return oldAsset.getAsset(criteria);
	}
	
	
	protected void specSpecificMerging(Asset oldAsset, NextRevision nextRevision,
			Relation newRelation, Asset targetAsset) {
	}

	protected void specSpecificMergingNoSave(Asset oldAsset, Relation newRelation, Asset targetAsset) {
	}

	/**
	 * goes through the existing asset tree and determines if any of them need
	 * to be deleted (coz they were not in the incoming asset tree)
	 * 
	 * @param newAsset
	 * @param oldAsset
	 * @param nextRevision TODO
	 * @return
	 * @throws SearchException
	 */
	protected Asset pruneExistingAssetTree(Asset newAsset, Asset oldAsset, NextRevision nextRevision)
			throws SearchException {

		ArrayList<Relation> oldRelationsToBeDeleted = new ArrayList<Relation>();

		for (Relation oldRelation : oldAsset.getRelations()) {
			Asset oldTargetAsset = oldRelation.getTargetAsset();
			
			//If newly added from mergeAndAddIncomingIntoExistingAsset()
			if(oldTargetAsset.getTTVId().getId()<=0){
				continue;
			}

			/*
			 * spechandler gets called for xml dom input. in this case, there
			 * will be no File present. Make sure we don't prune any existing
			 * file
			 */
			if (oldTargetAsset instanceof File) {
				continue;
			}

			
			String oldTargetAssetType = getAssetType(oldTargetAsset);
			if(!isAssetTypePartOfSpec(oldTargetAssetType)) {
				continue;
			}
			
			Asset newTargetAsset = searchNewTargetAsset(oldTargetAssetType, newAsset, oldTargetAsset);
			
			if (newTargetAsset == null) {
				oldRelationsToBeDeleted.add(oldRelation);
			} else {
				pruneExistingAssetTree(newTargetAsset, oldTargetAsset, nextRevision);
			}
		}

		removeOldRelations(oldAsset, nextRevision, oldRelationsToBeDeleted);
		return oldAsset;
	}

	protected Asset searchNewTargetAsset(String oldTargetAssetType, Asset newAsset, Asset oldTargetAsset){
		Asset newTargetAsset = null;
		if (requireSpecSpecificSearchForPruning(oldTargetAssetType)) {
			newTargetAsset  = specSpecificSearchForPruning(oldTargetAssetType, newAsset,
					oldTargetAsset);
		} else {
			SearchCriteria crit = prepareSearchPathById(oldTargetAsset
					.getFields());
			for (Relation rel : newAsset.getRelations()) {
				Asset currentTargetAsset = rel.getTargetAsset();

				List<Field> fields = currentTargetAsset.getFields();
				if (isSearchCriteriaPresent(crit, fields)){
					newTargetAsset = currentTargetAsset;
					break;
				}
			}
		
		}
		return newTargetAsset;
	}
	/*
	 * now determine the type of the item. then lookup the specspecific
	 * item types and if its not listed in it, then don;t even bother
	 * checking and deleting. we need to maintain this item.
	 * 
	 */
	protected boolean isAssetTypePartOfSpec(String assetType){
		return (specSpecificTTVXpaths.containsKey(assetType.toLowerCase()));
	}
	
	protected void removeOldRelations(Asset oldAsset,
			NextRevision nextRevision,
			ArrayList<Relation> oldRelationsToBeDeleted) throws SearchException {
		if(nextRevision != null) {
			for (Relation oldRelationToBeDeleted : oldRelationsToBeDeleted) {
				//set relation's DeleteRevison
				oldRelationToBeDeleted.setDeleteRevision(nextRevision.getRevisionNumber());
				
				//Remove all its targetAsset's Fields
				List<Field> oldTargetAssetFields = oldRelationToBeDeleted.getTargetAsset().getFields();
				oldTargetAssetFields.removeAll(oldTargetAssetFields);
			}
			//if any child relation is deleted, reset the asset's latestRevisionNumber
			if(oldRelationsToBeDeleted.size()>0){
				oldAsset.setLatestRevisionNumber(nextRevision.getRevisionNumber());
			}
		} 
	}

	/**
	 * Find out if the oldTargetAsset with assetType is found as one of newAsset's target asset
	 * Meant to be overridden by subclass along with {@link #requireSpecSpecificSearchForPruning(String)}
	 * @param assetType
	 * @param newAsset
	 * @param oldTargetAsset
	 * @return newTargetAsset if found; null otherwise
	 */
	protected Asset specSpecificSearchForPruning(String assetType, Asset newAsset,
			Asset oldTargetAsset) {
		return null;
	}

	/**
	 * Based on assetType, determine if it needs spec specific searching for Pruning
	 * Meant to be overridden by subclass if needed
	 * @param assetType
	 * @return false by default
	 */
	protected boolean requireSpecSpecificSearchForPruning(String assetType) {
		return false;
	}
	
	/**
	 * Based on assetType, determine if it needs spec specific merging
	 * Meant to be overridden by subclass if needed
	 * @param assetType
	 * @return false by default
	 */
	protected boolean requireSpecSpecificMerging(String assetType) {
		return false;
	}

	/**
	 * 
	 * @param targetAsset
	 * @return
	 */
	protected String getAssetType(Asset targetAsset) {
		String assetType = EMPTY_STRING_VALUE;
		targetAsset = new AssetUtil().unWrap(targetAsset);
		if (targetAsset instanceof Group) {
			Group g = (Group) targetAsset;
			assetType = g.getType();
		} else {
			Item i = (Item) targetAsset;
			assetType = i.getType();
		}
		return assetType;
	}

	/**
	 * for a given set of xpath and values int he searchCritreria, this method
	 * looks up the field list and returns true only if all xpaths and values
	 * are present
	 * 
	 * @param criteria
	 * @param fields
	 * @return boolean
	 */
	protected boolean isSearchCriteriaPresent(SearchCriteria criteria,
			List<Field> fields) {
		Iterator<Entry<String, String>> entryIterator = criteria.entrySet()
				.iterator();
		boolean fieldsFound = true;
		while (entryIterator.hasNext()) {
			Entry<String, String> entry = entryIterator.next();
			String xpath = entry.getKey();
			String value = entry.getValue();
			fieldsFound = false;
			for (Field f : fields) {
				if (f.getTtvXPath().equals(xpath) && f.getValue().equals(value)) {
					fieldsFound = true;
					break;
				}
			}
			if (fieldsFound == false) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Checks if the given asset already exists based on FieldRevisons,
	 * !!!Not Fields !!!
	 * @param asset
	 * @return
	 * @throws SearchException
	 */
	public TTVId getTTVId(Asset asset, Asset rootAsset)
			throws SearchException {
		SearchCriteria criteria = prepareSearchPathById(asset.getFields());
		if (rootAsset == null) {
			criteria.put("rootlevel", "true");
		}
		com.tandbergtv.workflow.util.SearchCriteria crit = CriteriaBuilder
				.buildFieldRevisionSearchCriteria(criteria, false);
		if (crit.getSearchList().size() == 0) {
			return null;
		}

		try {
			List<TTVId> ids = new ArrayList<TTVId>();
			Collection<Asset> assets = searchService.search(crit);

			if (rootAsset == null) {
				for (Asset a : assets) {
					ids.add(a.getTTVId());
				}
			} else {
				if (assets != null && assets.size() != 0) {
					long rootId = rootAsset.getId();

					for (Asset a : assets) {
						if (a.getRoot() != null) {
							if (a.getRoot().getId() == rootId) {
								ids.add(a.getTTVId());
							}
						}
					}
				}
			}
			if (ids.size() > 1) {
				throw new SearchException("Multiple assets with spec Id: "
						+ criteria.toString() + " were found.");
			}
			return (ids.size() == 1) ? ids.get(0) : null;
		} catch (SearchException e) {
			throw e;
		}
	}
	
	/**
	 * 
	 * @param asset
	 * @param criteriaValueSet Concatenation of all criteria values
	 * @throws SearchException 
	 */
	protected void checkDuplicateChildItems(Asset asset, Set<String> criteriaValueSet) throws MetadataException{		
		SearchCriteria criteria = prepareSearchPathById(asset.getFields());
		StringBuffer criteriaValues = new StringBuffer();
		for(String value: criteria.values()){
			criteriaValues.append(value);
		}
		
		if(criteriaValueSet.contains(criteriaValues.toString())){
			throw new MetadataException("Multiple assets with spec Id: "
					+ criteria.toString() + " were found.");
		}else{
			criteriaValueSet.add(criteriaValues.toString());
			for(Relation relation: asset.getRelations()){
				checkDuplicateChildItems(relation.getTargetAsset(), criteriaValueSet);
			}
		}
	}

	/**
	 * Prepares a map of field names-> TTVXpath
	 * 
	 * @param fields
	 * @return
	 */
	protected Map<String, String> translate(List<String> fields) {
		Map<String, String> fieldXpaths = new HashMap<String, String>();
		Map<String, String> fieldMappings = MappingFileParser
				.readFieldMapping(getMappingResourceUrl());
		for (String field : fields) {
			if (fieldMappings.containsKey(field)) {
				fieldXpaths.put(field, fieldMappings.get(field));
			} else {
				throw new RuntimeException("Cannot map field: " + field
						+ " to TTV equivalent.");
			}
		}
		return fieldXpaths;
	}

	/**
	 * Every implementation of the spec knows which field(s) comprise of its
	 * identifier and uses them to build the search xpath list.
	 * 
	 * @param fields
	 *            Complete Fields document for the asset.
	 * @return Search criteria which is a map of xpath and value
	 */
	protected SearchCriteria prepareSearchPathById(List<Field> fields){
		SearchCriteria criteria = new SearchCriteria();
		IdentifierBase identifer = getIdentifier();
		identifer.setMappingResourceUrl(getMappingResourceUrl());
		Map<String, String> idPaths = identifer.getTTVPaths();

		Iterator<Entry<String, String>> pathIter = idPaths.entrySet()
				.iterator();
		while (pathIter.hasNext()) {
			boolean isEntryFound = false;
			Entry<String, String> entry = pathIter.next();
			for (Field f : fields) {
				if (f.getTtvXPath().equals(entry.getValue())) {
					isEntryFound = true;
					criteria.put(entry.getValue(), f.getValue());
					break;
				}
			}
			if(!isEntryFound){
				criteria.put(entry.getValue(), EMPTY_STRING_VALUE);
			}
		}
		return criteria;
	}


	/**
	 * This method merges the two sets of fields together. - For fields that
	 * already exist in the database, and are also there in the incoming fields,
	 * the values in the existing field gets overwritten. - If the incoming
	 * field value is blank, the existing field gets deleted. - Finally, fields
	 * that are in the database but not in the incoming asset (and their xpaths
	 * are listed in the list of spec specific xpaths) get deleted . This way we
	 * make sure that fields in the database that are used only by another spec
	 * do not get deleted.
	 * 
	 * at the end, fields in oldAsset is the updated set of fields
	 * 
	 * @param newAsset
	 * @param oldAsset
	 * @param nextRevision TODO
	 */
	public void mergeFields(Asset newAsset, Asset oldAsset, String assetType, NextRevision nextRevision) {
		
		List<Field> oldFields = oldAsset.getFields();
		List<FieldRevision> oldFieldRevisions = oldAsset.getFieldRevisions();
		
		List<FieldRevision> toBeDeletedFieldRevisions = new ArrayList<FieldRevision>();
		List<Field> toBeDeletedFields = new ArrayList<Field>();

		// Take care of Revisions (in FieldRevision table)
		toBeDeletedFieldRevisions = getToBeDeleted(newAsset.getFieldsMap(), oldAsset.getFieldRevisions(), assetType);
		if(nextRevision == null) {
			//no need to do revisioning. 
			oldFieldRevisions.removeAll(toBeDeletedFieldRevisions);
		} else if (toBeDeletedFieldRevisions != null && toBeDeletedFieldRevisions.size() > 0) {
			removeFieldRevisions(oldAsset, toBeDeletedFieldRevisions, nextRevision);
		}

		// Take care Fields (in Field table)
		toBeDeletedFields = getToBeDeleted(newAsset.getFieldsMap(), oldFields, assetType);
		oldFields.removeAll(toBeDeletedFields);
		
		updateExistingFields(newAsset, oldAsset, nextRevision);
	}
	

	/**
	 * updates fields in old assets fields with values in new assets fields. IF
	 * field does not exist, adds the new field to old assets fields
	 * 
	 * @param newAsset
	 * @param oldAsset
	 * @param nextRevision TODO
	 */
	protected void updateExistingFields(Asset newAsset, Asset oldAsset,
			NextRevision nextRevision) {
		List<Field> newFields = newAsset.getFields();
		
		if(newFields != null) {
			for (Field newField : newFields) {
				// Take care of Revisions (in FieldRevision table)
				FieldRevision oldFieldRevision = getFieldForXpathAndIndex(
						oldAsset.getFieldRevisionsMap(), newField.getTtvXPath(), newField
								.getIndices());
				if (oldFieldRevision != null) {// update old fieldRevision
					if (!newField.getValue().equalsIgnoreCase(
							oldFieldRevision.getValue())) {
						if(nextRevision != null) {
							FieldRevision newFieldRevision = new FieldRevision(
									oldFieldRevision, nextRevision);
							newFieldRevision.setValue(newField.getValue());
							oldAsset.addFieldRevision(newFieldRevision, nextRevision);
						} else {
							//we are not creating a new version. just update the existing value
							oldFieldRevision.setValue(newField.getValue());
						}
					}
				} else {// add new field
					if(nextRevision == null) {
						// if we were passed a null nextRevision, the caller did not
						// want to create a new revision. So, in this case, we get
						// the current version number of the asset and use that to
						// create the new FieldRevision
						NextRevision tempNextRevision = new NextRevision();
						tempNextRevision.setRevisionNumber(oldAsset.getLatestRevisionNumber());

						FieldRevision newf = new FieldRevision(newField, tempNextRevision);
						newf.setDeleteRevision(NextRevision.NULL_REVISION_NUMBER);
						newf.setAddRevision(tempNextRevision.getRevisionNumber());
						newf.setRevisionNumber(tempNextRevision.getRevisionNumber());
						oldAsset.addFieldRevision(newf, tempNextRevision);
					} else {
						FieldRevision newf = new FieldRevision(newField, nextRevision);
						newf.setDeleteRevision(NextRevision.NULL_REVISION_NUMBER);
						newf.setAddRevision(nextRevision.getRevisionNumber());
						newf.setRevisionNumber(nextRevision.getRevisionNumber());
						oldAsset.addFieldRevision(newf, nextRevision);
					}
				}
				
				// Take care Fields (in Field table)
				Field oldField = getFieldForXpathAndIndex(oldAsset.getFieldsMap(), newField
						.getTtvXPath(), newField.getIndices());
				if (oldField != null) {
					oldField.setValue(newField.getValue());
				} else {
					Field newf = new Field(newField);
					oldAsset.addField(newf);
				}
			}	
		}

	}

	/**
	 * looks up the fieldList for a "xpath + indices"
	 * 
	 * @param fields
	 * @param xpath
	 * @param indices
	 * @return
	 */
	protected <T extends IField> T getFieldForXpathAndIndex(Map<String, T> iFieldsMap, String xpath, List<Integer> indices) {
//		for (T f : fields) {
//			if (f.getTtvXPath().trim().equals(xpath.trim())
//					&& f.getIndices().equals(indices)) {
//				return f;
//			}
//		}
//		return null;
		return iFieldsMap.get(xpath + indices);
	}
	
	
	
	@Override
	public Document get(IIdentifier id, String revision)
			throws MetadataException, SearchException, TranslationException {
		//TODO currently, id.getAsset() won't retrieve the whole asset tree, thus need to 
		//invoke getAssetTree to get again
		logger.debug("In get(id, revision)");
		TTVId assetId = id.getAssetTTVId();
		Document assetDoc = getRevision(assetId, revision, true);
		return assetDoc;
	}

	protected Document getRevision(TTVId ttvId, String revision, boolean isForReadOnly) throws SearchException, TranslationException {
		Asset asset = getAssetTree(ttvId, revision, isForReadOnly);
		// convert the asset to TTV doc
		
		Document tree = new Binder().bind(asset);
		// convert TTV doc to spec xml
		Document assetDoc = getFromTTV().translate(kludge(tree));
		
		return assetDoc;
	}
	
	
	protected int parseThenGetInternalRevisionNumber(String revision) throws SearchException{
		int delimiterIndex = revision.lastIndexOf(Asset.EXTERNAL_INTERNAL_REVISION_DELIMITER);
		if(delimiterIndex<0|| delimiterIndex==revision.length()-1){
			throw new SearchException("Revision: " + revision + " is not in the format of " + RootAssetRevision.REVISION_FORMAT);
		}else{
			return Integer.parseInt(revision.substring(delimiterIndex+1));
		}
	}
	
	protected String parseThenGetExternalRevisionNumber(String revision) throws SearchException{
		int delimiterIndex = revision.lastIndexOf(Asset.EXTERNAL_INTERNAL_REVISION_DELIMITER);
		if(delimiterIndex<0){
			throw new SearchException("Revision: " + revision + " is not in the format of " + RootAssetRevision.REVISION_FORMAT);
		}else{
			String externalRevision = revision.substring(0, delimiterIndex);
			if(externalRevision == null){
				externalRevision = EMPTY_STRING_VALUE;
			}
			return externalRevision;
		}
	}

	@Override
	public List<RootAssetRevision> getRevisions(IIdentifier id)
			throws MetadataException, SearchException, TranslationException {
		Asset asset = id.getAsset();
		if(!(asset instanceof Group)){
			throw new MetadataException("Asset identified by input id: " + id + " is not a top-level asset!");
		}
		 //TODO, this is work around to get the whole asset tree (after id.getAsset())
		 // needs to be moved to a Stored Procedure
		asset = metadataManagerDAO.getAsset(asset.getTTVId(), true);
		return ((Group)asset).getRevisions();
	}
	
	/**
	 * Given an Asset, we need to find out if it exists in a list of search targets
	 * @param searchCandidate the asset we are searching for
	 * @param searchTargets list of Assets to be searched
	 * @return the asset from searchTargets if found; null if not found
	 */
	protected Asset findAsset(Asset searchCandidate, List<Asset> searchTargets) {
		SearchCriteria crit = prepareSearchPathById(searchCandidate.getFields());
		boolean assetFound = false;
		for (Asset searchTarget : searchTargets) {

			List<Field> fields = searchTarget.getFields();
			assetFound = isSearchCriteriaPresent(crit, fields);
			if (assetFound == true)
				return searchTarget;
		}
		return null;
	}
	
	/**
	 * @param ttvId
	 * @param isForReadOnly TODO
	 * @param revisionNumber
	 * @return
	 * @throws SearchException
	 */
	protected Asset getAssetTree(TTVId ttvId, String revision, boolean isForReadOnly) throws SearchException{
		//TODO, this is work around to get the whole asset tree (after id.getAsset())
		 // needs to be moved to a Stored Procedure
		int internalRevisionNumber = NextRevision.NULL_REVISION_NUMBER;
		if (revision != null) {
			internalRevisionNumber= parseThenGetInternalRevisionNumber(revision);
			if(internalRevisionNumber<=NextRevision.NULL_REVISION_NUMBER){
				throw new SearchException("Version: " + revision + " is invalid, please provide a valid version.");
			}
		} 
		
		Asset asset;
		try {
			asset = metadataManagerDAO.getAsset(ttvId, internalRevisionNumber, isForReadOnly);
		} catch (InvalidRevisionException e) {
			throw new SearchException("Version: " + revision + " is invalid, please provide a valid version.");
		}
		return asset;
	}
	
	@Override
	public Document rollBackToRevision(TTVId ttvId, String revision)
			throws MetadataException, SearchException, TranslationException, InvalidRevisionException {
//		TODO uncomment the following line to verify that rollback revision is not currently the latest revision number
		verifyVersionNumber(ttvId, revision);
		Asset oldAsset = getAssetTree(ttvId, revision, true);
		List<Asset> toBeMergedAssets = new ArrayList<Asset>();
		toBeMergedAssets.add(oldAsset);
		RootAssetRevision rootAssetRevision = new RootAssetRevision("test", "Rollback to revision " + revision, parseThenGetExternalRevisionNumber(revision));
		mergeSaveAssets(toBeMergedAssets, true, rootAssetRevision, true, false);
		return getRevision(ttvId,null, true);
	}


	/**
	 * Verify that rollback revision is not currently the latest revision number
	 * @param ttvId
	 * @param revision
	 * @throws SearchException 
	 * @throws InvalidRevisionException 
	 */
	private void verifyVersionNumber(TTVId ttvId, String revision) throws SearchException, InvalidRevisionException{
		//get the latest revision
		Asset oldAsset = getAssetTree(ttvId, null, true);
		if(revision.equalsIgnoreCase(oldAsset.getVersion())){
			throw new InvalidRevisionException("Can't rollback to latest version: " + revision + ", please try a different version.");
		}
		
	}

	@Override
	public List<Asset> save(Document doc, String revisionSource,
			String revisionComment, String externalRevision)
			throws MetadataException {
		// TODO Auto-generated method stub
		return null;
	}

	public Spec getSpecName(Document doc) {
		String name = doc.getDocumentElement().getNodeName();
		if (name.equals(rootElementName)) {
			return spec;
		}
		return null;
	}

	
	/**
	 * Iterate through oldFields to find out any fields missing from newFields, i.e. to be deleted
	 * 
	 * @param <N>
	 * @param <O>
	 * @param newFields
	 * @param oldFields
	 * @param assetType
	 * @return
	 */
	protected  <N extends IField, O extends IField> List<O> getToBeDeleted(Map<String,N> newFieldsMap, List<O> oldFields, String assetType){
		List<O> toBeDeletedFields = new ArrayList<O>();
		
		if(oldFields != null) {
			for (O oldField : oldFields) {
				if(isXPathPartOfSpec(assetType, oldField.getTtvXPath())){
				// see if this existing field is in the incoming asset
				N newField = getFieldForXpathAndIndex(newFieldsMap, oldField.getTtvXPath(), oldField
						.getIndices());
				if (newField == null ) {
					toBeDeletedFields.add(oldField);
				}
				}
			}
		}
		return toBeDeletedFields;
	}
	
	/**
	 * check if this xpath is  in the spec specific ttv xpath list.
	 * 
	 * @param assetType
	 * @param ttvXPath
	 * @return
	 */
	protected boolean isXPathPartOfSpec(String assetType, String ttvXPath){
		assetType = assetType.toLowerCase();
		Set<String> xpathSet = null;
		if (specSpecificTTVXpaths.containsKey(assetType)) {
			xpathSet = specSpecificTTVXpaths.get(assetType);
		}
		
		return (xpathSet != null && xpathSet.contains(ttvXPath));
	}
	/**
	 * To remove a FieldRevision, we need to
	 * 1. Get all its previous revisions whose DeleteRevision is NULL_REVISION_NUMBER(in class NextRevision)
	 * 2. Reset all of them and the FieldRevison itself's DeleteRevision to {@param nextRevision}}
	 * @param toBeDeleted
	 * @param nextRevision
	 */
	protected void removeFieldRevisions(Asset oldAsset, List<FieldRevision> toBeDeleted, NextRevision nextRevision){
		if(toBeDeleted != null && toBeDeleted.size()>0){
			oldAsset.setLatestRevisionNumber(nextRevision.getRevisionNumber());
			for(FieldRevision field: toBeDeleted){
				field.setDeleteRevision(nextRevision.getRevisionNumber());
				List<FieldRevision> previousUnDeletedFields = metadataManagerDAO.getUnDeletedFieldRevsions(field);
				for(FieldRevision previousUnDeletedField : previousUnDeletedFields){
					previousUnDeletedField.setDeleteRevision(nextRevision.getRevisionNumber());
					oldAsset.addFieldRevision(previousUnDeletedField, nextRevision);
				}
			}
		}
		
	}
	
	/**
	 * Searches for the incoming doc in the database and merges the existing
	 * asset with the incoming doc and returns it (Merge target is the new
	 * asset). If the incoming doc was not found in the db, it converts it to
	 * asset model and returns it.
	 * 
	 * @param doc
	 * @return
	 * @throws MetadataException
	 */
	@Transactional
	public List<Asset> mergeWithoutSave(Document doc) throws MetadataException {
		List<Asset> assets = convertDocumentToAssets(doc);
		List<Asset> returnAssets = new ArrayList<Asset>();

		Iterator<Asset> assetIter = assets.iterator();

		while (assetIter.hasNext()) {
			Asset newAsset = assetIter.next();
			newAsset = AssetUtil.deleteEmptyFields(newAsset);
			Asset existingAsset = null;

			try {
				existingAsset = extractId(newAsset).getAsset();
			} catch (SearchException e) {
				// was not found. will use asset instead.
			}

			if (existingAsset == null || existingAsset.getState() == AssetState.INACTIVE) {
				NextRevision nextRevision = new NextRevision();
				AssetUtil.copyFieldsToFieldRevisions(newAsset, nextRevision);
				returnAssets.add(newAsset);
				continue;
			}

			// so we found this asset in db. now start doing the merge
			if (existingAsset instanceof Group) {
				mergeRecursively(newAsset, existingAsset);
				returnAssets.add(newAsset);
			} else {
				String assetType = getAssetType(newAsset);
				mergeFields(newAsset, existingAsset, assetType);
				newAsset.setTTVId(existingAsset.getTTVId());
				returnAssets.add(existingAsset);
			}
		}

		return returnAssets;
	}

	/**
	 * Assumes that the incoming document contains a single title, which needs to be
	 * merged with asset specified by assetId
	 * 
	 */
	@Transactional
	public List<Asset> mergeWithoutSave(Long assetId, Document doc) throws MetadataException {
		List<Asset> returnAssets = new ArrayList<Asset>();
		Asset existingAsset = null;
		TTVId ttvId = (TTVId) IdentifierFactory.getTTVIdentifier();
		ttvId.setId(assetId);		
		try {
			 existingAsset = getMetadataManagerDAO().getAsset(ttvId, false);
		} catch (SearchException e) {	
			throw new MetadataException("Unable to find asset with id=" + ttvId.getId());
		}
		
		List<Asset> assets = convertDocumentToAssets(doc);
		Iterator<Asset> assetIter = assets.iterator();

		while (assetIter.hasNext()) {
			Asset newAsset = assetIter.next();
			newAsset = AssetUtil.deleteEmptyFields(newAsset);
			
			if (existingAsset == null || existingAsset.getState() == AssetState.INACTIVE) {
				NextRevision nextRevision = new NextRevision();
				AssetUtil.copyFieldsToFieldRevisions(newAsset, nextRevision);
				returnAssets.add(newAsset);
				continue;
			}
			// so we found this asset in db. now start doing the merge
			if (existingAsset instanceof Group) {
				mergeRecursively(newAsset, existingAsset);
				returnAssets.add(newAsset);
			} else {
				String assetType = getAssetType(newAsset);
				mergeFields(newAsset, existingAsset, assetType);
				newAsset.setTTVId(existingAsset.getTTVId());
				returnAssets.add(existingAsset);
			}
		}
		
		return returnAssets;
	}

	
	/**
	 * merges the existingAsset into the newAsset. Note: No changes are made to
	 * the existingAsset. Also, since File is not pruned by TTVSPecHandler and
	 * since we don't have a Identifier for FILE, we just add all exsiting files.
	 * 
	 * @param newAsset
	 * @param existingAsset
	 */
	private void mergeRecursively(Asset newAsset, Asset existingAsset) {
		String assetType = getAssetType(newAsset);

		newAsset.setTTVId(existingAsset.getTTVId());
		mergeFields(newAsset, existingAsset, assetType);

		for (Relation r : existingAsset.getRelations()) {
			Asset existingTargetAsset = r.getTargetAsset();
			String existingAssetType = getAssetType(existingTargetAsset);
			if (requireSpecSpecificMerging(existingAssetType)) {
				// if newAsset has a title, let it be. else add this title to
				// that asset.
				specSpecificMergingNoSave(newAsset, r, existingTargetAsset);
			} else {
				Asset newTargetAsset = findAsset(existingTargetAsset, newAsset
						.getImmediateChildren());
				if (newTargetAsset == null) {
					if (!isAssetTypePartOfSpec(existingAssetType)) {
						// this means the existing Asset was a part of another
						// spec and thats why its not coming in this doc. Need
						// to add it.
						Item i = new Item();
						if (existingAssetType.equals(ItemType.FILE.toString())) {
							i = new File();
						}
						i.setType(existingAssetType);
						i.setFields(existingTargetAsset.getFields());
						i.setTTVId(existingTargetAsset.getTTVId());
						newAsset.addChild(i);
					}
				} else {
					mergeRecursively(newTargetAsset, existingTargetAsset);
				}
			}
		}
	}

	/**
	 * newFields is the latest field list. Only merge that needss to happen is
	 * the copy of fields from the existingAsset that were not a part of the
	 * currentSpec (hence they did not come in the current doc).
	 * 
	 * @param newAsset
	 * @param existingAsset
	 * @param assetType
	 */
	protected void mergeFields(Asset newAsset, Asset existingAsset,
			String assetType) {
		List<Field> newFields = newAsset.getFields();
		List<Field> oldFields = existingAsset.getFields();

		for (Field f : oldFields) {
			Field newField = getFieldForXpathAndIndex(newAsset.getFieldsMap(), f
					.getTtvXPath(), f.getIndices());
			if (newField == null) {
				// if field was not there in new asset and if the field in old
				// asset was not a part of this spec, then create a new field in
				// new asset
				if (!isXPathPartOfSpec(assetType, f.getTtvXPath())) {
					Field newf = new Field();
					newf.setTtvXPath(f.getTtvXPath());
					newf.setValue(f.getValue());
					newf.setIndices(f.getIndices());
					newFields.add(newf);
				}
			}
		}
	}

	/**
	 * {@inheritDoc}
	 */
	public Document convertAssetToXMLDocument(Asset asset) throws TranslationException {
		Document tree = new Binder().bind(asset);
		// convert TTV doc to spec xml
		Document assetDoc = getFromTTV().translate(kludge(tree));
		
		return assetDoc;
	}
}
