<?php
/*
  Copyright (C) 2008 by Phase2 Technology.
  Author(s): Frank Febbraro, Irakli Nadareishvili

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License.
  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY. See the LICENSE.txt file for more details.

  $Id: calais.module,v 1.3.2.16.2.22.2.16 2010/08/05 21:49:51 febbraro Exp $
 */
/**
 * @file
 */

// Request constants
define('CALAIS_REQUEST_NO',     0);
define('CALAIS_REQUEST_MANUAL', 1);
define('CALAIS_REQUEST_AUTO',   2);


// Constants for how to apply tags once retrieved
define('CALAIS_PROCESS_AUTO',       'AUTO');
define('CALAIS_PROCESS_AUTO_ONCE',  'AUTO_ONCE');
define('CALAIS_PROCESS_MANUAL',     'MANUAL');

define('CALAIS_RDF_REPOSITORY',     'calais_node');
define('CALAIS_WATCHDOG',           'Calais');

/**
 * Implementation of hook_init().
 */
function calais_init() {
  // Only load these if it is not a cached page
  module_load_include('inc', 'calais', 'calais.ui');
  module_load_include('inc', 'calais', 'calais.admin');
}

/**
 * Implementation of hook_perm().
 */
function calais_perm() {
  return array('administer calais', 'access calais', 'access calais rdf');
}

/**
 * Implementation of hook_menu().
 */
function calais_menu() {
  $items = array();

  $items['admin/settings/calais/calais-node'] = array(
    'title' => 'Calais Node Settings',
    'description' => 'Configurations for Calais Integration with content nodes',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('calais_admin_settings'),
    'access arguments' => array('administer calais'),
    'type' => MENU_LOCAL_TASK
  );

  $items['admin/settings/calais/queue'] = array(
    'title' => 'Calais Bulk Processing',
    'description' => 'Submits all Nodes of a specific type to Calais for processing',
    'page callback' => 'calais_bulk_process',
    'access arguments' => array('administer calais'),
    'weight' => 10,
    'type' => MENU_LOCAL_TASK
  );

  $items['admin/settings/calais/queue/clear/%'] = array(
    'title' => 'Clear Calais Bulk Queue',
    'description' => 'Clears the bulk queue for a particular content type',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('calais_queue_clear_confirm', 5),
    'access arguments' => array('administer calais'),
    'type' => MENU_CALLBACK
  );

  $items['admin/settings/calais/queue/enqueue/%'] = array(
    'title' => 'Calais Queue Processing',
    'description' => 'Submits all Content of the specified type to Calais for processing',
    'page callback' => 'calais_enqueue',
    'page arguments' => array(5),
    'access arguments' => array('administer calais'),
    'type' => MENU_CALLBACK
  );

  $items['node/%node/calais'] = array(
    'title' => 'Calais',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('calais_keywords_form', 1),
    'access callback' => 'calais_access',
    'access arguments' => array(1),
    'weight' => 2,
    'type' => MENU_LOCAL_TASK
  );

  $items['node/%/calais/rdf'] = array(
    'title' => 'Calais',
    'page callback' => 'calais_get_rdf',
    'page arguments' => array(1),
    'access callback' => 'user_access',
    'access arguments' => array('access calais rdf'),
    'type' => MENU_CALLBACK
  );

  return $items;
}

/**
 * Implementation of hook_preprocess_node().
 *
 * If RDF exists for a node, provide an autodiscovery link.
 */
function calais_preprocess_node(&$variables) {
  $node = $variables['node'];
  $count = rdf_count(NULL, NULL, NULL, calais_rdf_options($node->nid));
  
  // Add an RDF autodiscovery link when the node is displayed by itself as a page:
  if (user_access('access calais rdf') && $count) {
    rdf_add_autodiscovery_link(t('Calais RDF'), url("node/$node->nid/calais/rdf"), 'rdf+xml');
  }
}

/**
 * Provide the Calais RDF describing a node.
 *
 * @param $nid
 *    The node id
 */
function calais_get_rdf($nid) {
  $format = 'rdf+xml';
  $rdf_triples = rdf_query(NULL, NULL, NULL, calais_rdf_options($nid));
  $rdf = rdf_serialize($rdf_triples, array('format' => $format));

  $formats  = rdf_get_formats();
  $output_format = $formats[$format];
  drupal_set_header('Content-Type: '. $output_format->mime_type .'; charset='. $output_format->encoding);
  drupal_set_header('Content-Length: '. strlen($rdf));
  print $rdf;
}

/**
 * Implementation of hook_access().
 */
function calais_access($node) {
  return user_access('access calais') && calais_get_request_type($node) != CALAIS_REQUEST_NO;
}

/**
 * Implementation of hook_theme();
 */
function calais_theme() {
  return array(
    'calais_suggestions' => array(
      'arguments' => array('vid', 'terms'),
      'file' => 'calais.ui.inc',
    ),
    'calais_nodetype_settings' => array(
      'arguments' => array('form' => array()),
      'file' => 'calais.admin.inc',
    ),
    'calais_enqueue_form' => array(
      'arguments' => array('form' => array()),
      'file' => 'calais.admin.inc',
    ),
  );
}

/**
 * Implementation of hook_nodeapi().
 *
 * Process node updates and if applicable, update the Calais terms.
 */
function calais_nodeapi(&$node, $op, $a3 = NULL, $a4 = NULL) {
  switch ($op) {
    case "delete":
      calais_delete_node($node);
      break;
    case "insert":
    case "update":
      if (calais_get_request_type($node) == CALAIS_REQUEST_AUTO) {
        $process_type = calais_get_processing_type($node);
        $threshold = calais_get_node_threshold($node->type);
        calais_process_node($node, $process_type, $threshold, $op);
      }
      break;
  }
}

/**
 * A node is getting removed so do some cleanup.
 *
 * @param $node The node for deletion
 */
function calais_delete_node($node) {
  calais_remove_rdf($node);
  db_query("DELETE FROM {calais_term_node} WHERE nid=%d", $node->nid);
}

/**
 * Process the provided node according to the processing type.
 *
 * @param $node 
 *    The node to process
 * @param $process_type 
 *    The processing type override for this node.
 * @param $threshold 
 *    The threshold override for this node.
 * @param $op 
 *    Is this an 'insert' (first time) or and 'update'
 */
function calais_process_node(&$node, $process_type = NULL, $threshold = NULL, $op = 'insert') {
  $request_type = calais_get_request_type($node);
  if ($request_type == CALAIS_REQUEST_NO || isset($node->calais->processor))
    return;

  // On feedapi runs, updates are missing vid so get it.
  if(!isset($node->vid)) {
    $node->vid = db_result(db_query("SELECT vid FROM {node} WHERE nid=%d", $node->nid));
  }

  $keywords = _calais_use_semanticproxy($node) 
                  ? calais_process_with_semanticproxy($node)
                  : calais_process_with_calais($node);

  if(empty($keywords))
    return;

  if (!isset($process_type)) {
    $process_type = calais_get_processing_type($node);
  }
  if (!isset($threshold)) {
    $threshold = calais_get_node_threshold($node->type);
  }

  db_query("DELETE FROM {calais_term_node} WHERE nid=%d", $node->nid);

  module_invoke_all('calais_preprocess', $node, $keywords);

  $vocabularies = calais_get_entity_vocabularies($node->type);
  foreach ($keywords as $cat => $metadata) {
    $vid = $vocabularies[$cat];
    if ($vid) { // Only process terms for vocabularies that we care about
      foreach ($metadata->terms as $term) {
        calais_associate_term($vid, $term, $node);
        if ( ($process_type == CALAIS_PROCESS_AUTO || ($process_type == CALAIS_PROCESS_AUTO_ONCE && $op == "insert")) 
            && round($term->relevance * 1000) >= round($threshold * 1000)) {
          calais_apply_to_node($vid, $term, $node);
        }
      }
    }
  }

  calais_store_rdf($node, $node->calais->service->flatTriples);
  module_invoke_all('calais_postprocess', $node, $keywords);
}

/**
 * Use the actual Calais Web Service to process this Node's content.
 *
 * @param $node The node to process
 * @return The Calais keywords for this node.
 */
function calais_process_with_calais(&$node) {
  // #412796: This was changing $node->body w/o the clone
  $loaded_node = node_build_content(drupal_clone($node));
  $body = strip_tags(drupal_render($loaded_node->content));
  $date = format_date($node->created, 'custom', 'r');

  // Allow modification of the content sent to Calais
  drupal_alter("calais_body", $body, $loaded_node);

  if (!is_calais_api_key_set()) {
    $node->calais->service = NULL;
    return array();
  }

  $calais = new Calais();
  $keywords = $calais->analyzeXML($node->title, $body, $date);
  $node->calais->service = $calais;

  return $keywords;
}

/**
 * Use SemanticProxy to process this Node.
 *
 * @param $node The node to process
 * @return The Calais keywords for this node.
 */
function calais_process_with_semanticproxy(&$node) {
  $key = drupal_strtolower($node->type);
  $spfield = variable_get("calais_semanticproxy_field_{$key}", '');
  
  if(!empty($spfield)) {
    if($spfield == 'calais_feedapi_node') {
      // On insert, $node->feedapi_node is available
      if(isset($node->feedapi_node)) {
        $url = $node->feedapi_node->feed_item->options->original_url;
      }
      else { // On update, we need to pull the URL from the DB. Should always be the same. #feedapifail
        $url = db_result(db_query('SELECT fi.url FROM {feedapi_node_item} fi WHERE fi.nid = %d', $node->nid));
      }
    }
    else {
      // Grab URL from link or textfield 
      $field = $node->{$spfield};
      if(isset($field)) {
       $url = isset($field[0]['url']) ? $field[0]['url'] : $field[0]['value'];
      }
    }
    
    drupal_alter('semanticproxy_url', $url);
    
    $sp = new SemanticProxy();
    $keywords = $sp->analyze($url);
    $node->calais->service = $sp;
    
    if($keywords) {
      // Store the document text if configured
      $docfield = variable_get("calais_semanticproxy_document_{$key}", '');
      if(!empty($docfield) && module_exists('content')) {
        $original_text = $sp->processor->document;
        $node->{$docfield}[0]['value'] = trim($original_text);
        if($node->is_new) {
          content_insert($node);
        }
        else {
          content_update($node);
        }
      }
      return $keywords;
    }
  } 
  
  return array(); 
}

/**
 * Implementation of hook_calais_preprocess().
 *
 * Make sure that a vocabulary exists for all entities returned, if not, create it.
 */
function calais_calais_preprocess(&$node, &$keywords) {
  $vocabularies = calais_get_entity_vocabularies();

  foreach ($keywords as $cat => $metadata) {
    $vid = $vocabularies[$cat];

    // Create a vocabulary if we come across an entity that we dont know of.
    if (!$vid) {
      $vocabularies[$cat] = calais_create_entity_vocabulary($metadata->readable_type());;
      variable_set('calais_vocabulary_names', $vocabularies);
      drupal_set_message(t('Added a Vocabulary for the new Calais Entity %entity. You may need to update your Calais Node Settings to take advantage of this new entity.', array('%entity' => $cat)));
    }
  }
}

/**
 * Associates a retrieved Calais term with a node.
 *
 * @param $vid  The vocabulary id of the term
 * @param $term The CalaisTerm object
 * @param $node The node for association
 */
function calais_associate_term($vid, &$term, $node) {
  $local_term = calais_get_term($vid, $term);

  $calais_term = $local_term ? $local_term : new CalaisTerm($term->guid, $term->name, $term->relevance);
  // We could array_merge, but defined properties with no value will 
  // overwrite the locals (tid, tdid, etc.), so just be explicit for now.
  $calais_term->vid = $vid; 
  $calais_term->relevance = $term->relevance;
  $calais_term->resolved_guid = $term->resolved_guid;
  $calais_term->resolved_name = $term->resolved_name;
  $calais_term->resolved_type = $term->resolved_type;
  $calais_term->extra = array_merge($calais_term->extra, $term->extra);
  
  calais_save_term($calais_term);
  calais_assign_term_to_node($node->nid, $calais_term);
  $term = $calais_term;
}

/**
 * Find the local Calais Term. First look it up by GUID, if not found, use Vocabulary ID and Name.
 *
 * @param $vid
 *    The Vocabulary ID of the calais term
 * @param $term
 *    The CalaisTerm object to lookup, or a term id
 * @return  
 *    The Calais Term object, false is the term is not found.
 */
function calais_get_term($vid, $term) {
  
  if(is_numeric($term)) {
    $local_term = db_fetch_object(db_query("SELECT * FROM {calais_term} WHERE tid = %d", $term));
  }
  else {
    $local_term = db_fetch_object(db_query("SELECT * FROM {calais_term} WHERE vid = %d AND guid = '%s'", $vid, $term->guid));
    if (!$local_term) {
      // Fall back for older terms with no GUID
      $local_term = db_fetch_object(db_query("SELECT * FROM {calais_term} WHERE vid = %d AND name = '%s'", $vid, $term->name));
    }
  }

  if ($local_term) {
    calais_load_term_extra($local_term);
    return $local_term;    
  }
  
  return FALSE;
}

/**
 * Find the Calais Term by Taxonomy Term Data 
 *
 * @param $term_id
 *    The Term Data ID
 * @return  
 *    The Calais Term object, false is the term is not found.
 */
function calais_get_term_by_taxonomy($term_id) {
  $term = db_fetch_object(db_query("SELECT * FROM {calais_term} WHERE tdid = %d", $term_id));
  if ($term) {
    calais_load_term_extra($term);
    return $term;    
  }
  return FALSE;
}

/**
 * Get the CalaisTerm for the term name and node id. 
 * NOTE: Columns were renamed here so that this object has the same attributes as CalaisTerm for consistency
 *
 * @param $nid The node id to limit the calais term look up
 * @param $term_name The term name to look up
 * @return The Calais Term, or FALSE is it is not found.
 */
function calais_get_node_term_by_name($nid, $term_name) {
  return db_fetch_object(
            db_query("SELECT t.*, tn.relevance
                      FROM {calais_term} t
                      JOIN {calais_term_node} tn ON tn.tid = t.tid
                      WHERE tn.nid = %d and t.name = '%s'", $nid, $term_name));
}

/**
 * Save an existing Calais term
 *
 * @param $term
 *    The CalaisTerm to lookup extra data
 */
function calais_save_term(&$term) {
  if(empty($term->tid)) {
    drupal_write_record('calais_term', $term);        
  }
  else {
    drupal_write_record('calais_term', $term, 'tid');        
  }
  calais_save_term_extra($term);
}

/**
 * Loads the resolved data associated with the supplied Calais Term
 *
 * @param $term
 *    The CalaisTerm to lookup extra data
 */
function calais_load_term_extra(&$term) {
  if(isset($term->resolved_type)) {
    $table = 'calais_term_data_' . $term->resolved_type;
    $data = db_fetch_array(db_query("SELECT * FROM {" . $table . "} WHERE tid = %d", $term->tid));
    $term->extra = $data ? $data : array();
  }
  else {
    $term->extra = array();    
  }
}

/**
 * Saves the resolved data associated with the supplied Calais Term
 *
 * @param $term
 *    The CalaisTerm to lookup extra data
 */
function calais_save_term_extra(&$term) {
  if(isset($term->resolved_type) && !empty($term->extra)) {
    $table = 'calais_term_data_' . $term->resolved_type;
    $data = &$term->extra;
    $data['tid'] = $term->tid;        
    if(isset($data['did'])) {
      drupal_write_record($table, $data, 'did');        
    }
    else {
      drupal_write_record($table, $data);
    }
  }
}


/**
 * Assign a taxonomy term to the node based on the Calais Term. If no relationship
 * exists with the CalaisTerm and the target taxonomy term add it.
 *
 * @param $vid  The vocabulary id of the term
 * @param $term The CalaisTerm object
 * @param $node The node for association
 */
function calais_apply_to_node($vid, &$term, $node) {
  
  if(empty($term->tdid)) {
    $term->tdid = calais_find_taxonomy_term($vid, $term->name);
    if (!$term->tdid) {
      // An existing term was not found, create it
      $tax_term = array('vid' => $vid, 'name' => $term->name);
      taxonomy_save_term($tax_term);
      $term->tdid = $tax_term['tid'];
    }
    calais_save_term($term);
  }  
  calais_assign_taxonomyterm_to_node($node->nid, $node->vid, $term->tdid);
}

/**
 * Find a taxonomy term in the provided vocabulary by name.
 *
 * @param $vid
 *    The vocabulary id for the term to find.
 * @param $term_name
 *    The Term Name to find a matching taxonomy term
 * @return $tid 
 *    Term id from the {term_data} table, or FALSE is one is not found.
 */
function calais_find_taxonomy_term($vid, $term_name) {
  return db_result(db_query("SELECT tid FROM {term_data} WHERE vid = %d AND name = '%s' ORDER BY tid ASC", $vid, $term_name));
}

/**
 * Create the relationship between a Node and Calais term.
 *
 * @param $nid
 *    The node id
 * @param $term
 *    The CalaisTerm
 */
function calais_assign_term_to_node($nid, $term) {
  db_query("DELETE FROM {calais_term_node} where nid = %d and tid = %d", $nid, $term->tid);
  db_query("INSERT INTO {calais_term_node} (nid, tid, relevance) VALUES(%d, %d, %f)", $nid, $term->tid, $term->relevance);
}

/**
 * Create the relationship between a Node and a Taxonomy Term.
 *
 * @param $nid
 *    The Node ID
 * @param $vid
 *    The Node Revision ID
 * @param $tid
 *    The Taxonomy Term ID
 */
function calais_assign_taxonomyterm_to_node($nid, $vid, $tid) {
  db_query("DELETE FROM {term_node} where nid = %d and vid = %d and tid = %d", $nid, $vid, $tid);
  db_query("INSERT INTO {term_node} (nid, vid, tid) VALUES(%d, %d, %d)", $nid, $vid, $tid);
}

/**
 * Implementation of hook_taxonomy();
 *
 * Process the delete of taxonomy terms to make sure the proper cleanup 
 * happens to Calais Terms that reference them.
 */
function calais_taxonomy($op, $type, $data) {
  if ($type == 'term' && $op == 'delete') {
    db_query("UPDATE {calais_term} SET tdid = NULL WHERE tdid = %d", $data['tid']);
  }
}

/**
 * Determines which retrieval type (none, manual, auto) is
 * applicable to a node specific node instance passed as an argument.
 *
 * @param mixed $var either a Node object or a valid node_type String
 */
function calais_get_request_type($var) {
  $nodetype = is_object($var) ? $var->type : $var;
  $key = drupal_strtolower($nodetype);
  return variable_get("calais_node_{$key}_request", CALAIS_REQUEST_NO);
}

/**
 * Determines which processing type (none, manual, auto) is
 * applicable to a node specific node instance passed as an argument.
 *
 * @param mixed $var either a Node object or a valid node_type String
 */
function calais_get_processing_type($var) {
  $nodetype = is_object($var) ? $var->type : $var;
  $key = drupal_strtolower($nodetype);
  return variable_get("calais_node_{$key}_process", CALAIS_PROCESS_MANUAL);
}

/**
 * Return the relevancy threshold settings for the specific node type.
 *
 * @param $node_type Node type.
 */
function calais_get_node_threshold($node_type) {
  $type = drupal_strtolower($node_type);
  return variable_get("calais_threshold_{$type}", 0.0);
}


/**
 * Get a list of the entities that Calais API defines:
 * http://opencalais.com/APIresponses
 *
 * If $type is not specified it will return all known Calais Entities.
 * however if $type is specified it will return only those Entities that the
 * specific node type is interested in, or the global list if no specific node type
 * settings have been configures.
 *
 * @param $type
 *    A node type, if Entities need to be filtered.
 * @return 
 *    Associative array of [entity_name => vid]'s
 */
function calais_get_entity_vocabularies($type = NULL) {
  $all_vocabs = variable_get('calais_vocabulary_names', array());

  if ($type) {
    $applied_entities = variable_get('calais_applied_entities_global', array());
    if (!variable_get("calais_use_global_{$type}", TRUE)) {
      $applied_entities = variable_get("calais_applied_entities_{$type}", array());
    }

    foreach ($applied_entities as $entity => $apply_entity) {
      if ($apply_entity) {
        $applied_vocabs[$entity] = $all_vocabs[$entity];
      }
    }
  }
  else {
    $applied_vocabs = $all_vocabs;
  }

  return $applied_vocabs;
}

/**
 * Returns the Calais vocabularies that are enabled for this node type.
 * <p>Compare to: calais_api_get_all_entities() which gives entityname/vid pairs.
 * Vocabulary names can be updated by users. Entity names stay as defined
 * by Calais.
 *
 * @param $node_type The node type
 *
 * @return Array of vocabularies keyed on vid;
 */
function calais_get_vocabularies($type = NULL) {

  $vocs = calais_get_entity_vocabularies($type);
  
  if(empty($vocs))
    return array();
  
  $pattern = implode(",", $vocs);

  if ($type) {

    $result = db_query("SELECT v.vid, v.*, n.type
                        FROM {vocabulary} v
                        LEFT JOIN {vocabulary_node_types} n ON v.vid = n.vid
                        WHERE n.type = '%s' and v.vid in (%s)
                        ORDER BY v.weight, v.name",
      $type, $pattern);
  }
  else {
    $result = db_query('SELECT v.*
                        FROM {vocabulary} v
                        WHERE v.vid in (%s)
                        ORDER BY v.weight, v.name',
      $pattern);
  }

  $vocabularies = array();
  $node_types = array();
  while ($voc = db_fetch_object($result)) {

    // If no node types are associated with a vocabulary, the LEFT JOIN will
    // return a NULL value for type.
    if (isset($voc->type)) {
      $node_types[$voc->vid][$voc->type] = $voc->type;
      unset($voc->type);
      $voc->nodes = $node_types[$voc->vid];
    }
    elseif (!isset($voc->nodes)) {
      $voc->nodes = array();
    }
    $vocabularies[$voc->vid] = $voc;
  }

  return $vocabularies;
}

/**
 * Returns a map of suggested terms for a given vocabulary id. This will return
 * suggested terms for all vocabularies if no vid is specified.
 *
 * @param $nid The node id to get the calais keywords
 * @param $type The node type
 * @param $vid Optional, a specific vocabulary id to return terms
 * @param $threshold 
 *    Optional, A relevance threshold for suggestions. If not specified the default for the node type will be used.
 *
 * @return Array { $vid => array(termObj1, termObj2, termObj3) }
 */
function calais_get_keywords($nid, $type, $vid = NULL, $threshold = NULL) {
  $terms = array();

  if(!isset($threshold)) {
    $threshold = calais_get_node_threshold(drupal_strtolower($type));
  }

  if ($vid) {
    $res = db_query("SELECT t.*, tn.relevance
                     FROM {calais_term} t
                     JOIN {calais_term_node} tn ON tn.tid = t.tid
                     WHERE tn.nid = %d and t.vid = %d and tn.relevance >= %f
                     ORDER BY relevance DESC ", $nid, $vid, $threshold);
    $terms[$vid] = array();
    while ($obj = db_fetch_object($res)) {
      $terms[$vid][] = $obj;
    }
  }
  else {
    $vocabularies = calais_get_entity_vocabularies();
    foreach ($vocabularies as $vid) {
      $keys = calais_get_keywords($nid, $type, $vid, $threshold);
      $terms[$vid] = $keys[$vid];
    }
  }

  return $terms;
}

/**
 * Creates a new vocabulary for the supplied Calais entity name.
 */
function calais_create_entity_vocabulary($entity) {
    $description = t("Calais Entity Vocabulary: @name", array('@name' => $entity));
    db_query("INSERT INTO {vocabulary} (name,description,module,tags) 
              VALUES('%s','%s','calais',1)", $entity, $description);
    return db_last_insert_id('vocabulary', 'vid');
}

/**
 * Get the standard RDF options array.
 *
 * @param $nid The node id
 */
function calais_rdf_options($nid) {
  return array(
    'graph' => "node/$nid/calais/rdf", 
    'repository' => CALAIS_RDF_REPOSITORY,
  );
}

/**
 * Store the triples for this node in the local RDF store. Storing these allows for more
 * Semantic Web functionality to be developed later by not losing all of the rich metadata
 * returned by Calais. This could be used to batch process data that was missed originally
 * or by querying it later for use in RDFa rendering, etc.
 *
 * @param $node The node for association
 * @param $triples The flat set of triples
 */
function calais_store_rdf($node, $triples) {
  if(variable_get('calais_store_rdf', TRUE) && !empty($triples)) {
    calais_remove_rdf($node);
    rdf_insert_all($triples, calais_rdf_options($node->nid));  
  }
}

/**
 * Store the triples for this node in the local RDF store. Storing these allows for more
 * Semantic Web functionality to be developed later by not losing all of the rich metadata
 * returned by Calais. This could be used to batch process data that was missed originally
 * or by querying it later for use in RDFa rendering, etc.
 *
 * @param $node The node for association
 * @param $triples The flat set of triples
 */
function calais_remove_rdf($node) {
  if(variable_get('calais_store_rdf', TRUE)) {
    rdf_delete(NULL, NULL, NULL, calais_rdf_options($node->nid));
  }
}

/**
 * Is this node configured to use SemanticProxy?
 *
 * @param $node 
 *    The node to examine
 * @return 
 *    TRUE if this node is configured to use SemanticProxy, FALSE otherwise.
 */
function _calais_use_semanticproxy($node) {
  $key = drupal_strtolower($node->type);
  $spfield = variable_get("calais_semanticproxy_field_{$key}", '');
  return !empty($spfield);
}

/**
 * Implementation of hook_semanticproxy_url_alter().
 *
 * SemanticProxy obeys robots.txt and as such will not follow redirects, etc. For example,
 * RSS feeds for Google News Search will send you through Google News to redirect to the real source.
 * SemanticProxy will give a <em>Content Permissions Validator Exception</em> and not follow the 
 * link b/c Google's robots.txt disallows following /news.  We try to strip those out.
 *
 * TODO: Make this URL extraction configurable via admin ui, if possible
 *
 * @param $url 
 *    The URL to alter.
 */
function calais_semanticproxy_url_alter(&$url) {
  $host = parse_url($url, PHP_URL_HOST);

  // Google's robot.txt disallows the follow by bots, so lets extract the real source
  if ($host == 'news.google.com') {
    $querystring = parse_url($url, PHP_URL_QUERY);

    // Split on &, but not if the & is in an html entity. This is a better impl of parse_str().
    $args = preg_split('|&(?!.[a-z0-9]{1,6}+;)|ims', $querystring);
    if(!is_array($args)) {
      return;
    }

    // Process query string args, find the 'url' arg and return the value if it's a valid URL
    foreach($args as $arg) {
      list($key, $val) = explode('=', $arg, 2);
      if ($key == 'url') {
        $val = urldecode($val);
        if(valid_url($val, TRUE)) {
          $url = $val;
          return;
        }
      }
    }
  }
}

/**
 * Implementation of hook_cron();
 */
function calais_cron() {
  $count = variable_get('calais_cron_limit', 0);
  if ($count > 0) {
    calais_process_queue($count);
  }
}

/**
 * Truncate and rebuild the calais queue table.
 * This is how to force processing of legacy content or reprocess existing content.
 *
 * @param $type
 *   A single content type to be processed, leaving the others unaltered.
 */
function calais_build_queue($type = NULL, $process_type = NULL, $threshold = NULL) {
  if (isset($type)) {
    $request_type = calais_get_request_type($type);
    if ($request_type == CALAIS_REQUEST_NO) {
      watchdog(CALAIS_WATCHDOG, 'Node type @type is not configured to process with OpenCalais. Skipped adding to queue for bulk processing.', array('@type' => $type));
      return;
    }
  
    if (!isset($process_type)) {
      $process_type = calais_get_processing_type($type);
    }
    if (!isset($threshold)) {
      $threshold = calais_get_node_threshold($type);
    }
    db_query("DELETE FROM {calais_node_queue} WHERE type = '%s'", $type);
    db_query("INSERT INTO {calais_node_queue} (nid, type, changed, process_type, threshold)
              SELECT n.nid, n.type, n.changed, '%s', %f
              FROM {node} n WHERE n.status = 1 AND n.type = '%s'", array($process_type, $threshold, $type));
      
    return db_result(db_query("SELECT COUNT(*) FROM {calais_node_queue} WHERE type = '%s'", $type));    
  }
  
  return 0;
}

/**
 * Process the queue sending Nodes to Calais for tagging.
 *
 * @param $count
 *      The number of items to process from the queue
 * @param $type
 *      Optional. The node type from the queue to process. Used to limit the type of nodes that get sent.
 */
function calais_process_queue($count, $type = NULL) {
  if (isset($type)) {
    $result = db_query_range("SELECT * FROM {calais_node_queue} WHERE type = '%s' ORDER BY changed DESC, nid DESC", $type, 0, $count);
  }
  else {
    $result = db_query_range("SELECT * FROM {calais_node_queue} ORDER BY changed DESC, nid DESC", 0, $count);
  }
  
  while ($data = db_fetch_object($result)) {
    $node = node_load($data->nid, NULL, TRUE);
    calais_process_node($node, $data->process_type, $data->threshold);
    db_query("DELETE FROM {calais_node_queue} WHERE nid = %d", $data->nid);
    // Sleep for .1 seconds to avoid throttling by OpenCalais 
    usleep(100000); 
  }
  
  watchdog(CALAIS_WATCHDOG, "Processed @count nodes with Calais", array('@count' => $count));  
}


/**
 * Implementation of hook_views_api().
 */
function calais_views_api() {
  return array(
    'api' => 2.0,
    'path' => drupal_get_path('module', 'calais') .'/views',
  );
}
