Обработка большого объема данных в Drupal с помощью Batch API

Обрабатывать большой объем данных будем с помощью Queue API и Batch API, причем есть несколько вариантов их исполнения и некоторые отличия для 6й и 7й версий друпала.

Начнем с батча для Drupal 6 со шкалой прогресса:

И так, имеем какую-нибудь кнопку в форме для запуска обработки или какую-то другую функцию, пускай у нас будет кнопка:

  1. function mymodule_settings_form_submit($form, &$form_state) {
  2.  
  3. $values = $form_state['values'];
  4.  
  5. if ($values['op'] == $values['batch_process']) {
  6.  
  7. //получаем нужные нам данные для обработки
  8. $items = _mymodule_get_data_to_process();
  9.  
  10. //если данные получены, создаем батч
  11. if ($items) {
  12. $batch = array(
  13. 'title' => t('My Batch'),
  14. 'operations' => array(
  15. array('mymodule_batch_process', array($items)),
  16. //функция mymodule_batch_process() задает настройки шкалы прогресса,
  17. //указывает функции для обработки наших данных и собирает данные о их выполнении
  18. ),
  19. 'file' => drupal_get_path('module', 'mymodule') .'/mymodule.batch.inc', //для удобства сделаем отдельным файлом
  20. 'finished' => 'mymodule_batch_finished', //функция, вызываемая после работы батча
  21. 'init_message' => t('Обработка данных...'),
  22. 'progress_message' => t('Очередь @current из @total'),
  23. 'error_message' => t('An error occurred and some or all of the batch has failed.'),
  24. );
  25. batch_set($batch);
  26. batch_process();
  27. }
  28. }
  29. }

Теперь посмотрим содержимое файла mymodule.batch.inc

  1. function mymodule_batch_process($items, &$context) {
  2.  
  3. //задаем поток обрабатываемых данных за раз
  4. $limit = 50;
  5.  
  6. //обнуляем счетчики прогресса
  7. if (empty($context['sandbox']['progress'])) {
  8. $context['sandbox']['progress'] = 0;
  9. $context['sandbox']['max'] = count($items);
  10. $context['results']['import_success'] = 0;
  11. $context['results']['import_fail'] = 0;
  12. }
  13.  
  14. //задаем очередь элементов для обработки потоками
  15. if(empty($context['sandbox']['items'])) {
  16. $context['sandbox']['items'] = $items;
  17. }
  18.  
  19. $counter = 0;
  20. if(!empty($context['sandbox']['items'])) {
  21.  
  22. if ($context['sandbox']['progress'] != 0) {
  23. array_splice($context['sandbox']['items'], 0, $limit);
  24. }
  25.  
  26. foreach ($context['sandbox']['items'] as $item) {
  27. if ($counter != $limit) {
  28. //обрабатываем единицу данных
  29. $result = _mymodule_process_item($item);
  30.  
  31. //обновляем счетчики результатов работы батча
  32. if ($result) {
  33. $context['results']['import_success']++;
  34. } else {
  35. $context['results']['import_fail']++;
  36. }
  37. $counter++;
  38. $context['sandbox']['progress']++;
  39. $context['message'] = t('Now processing item %node of %count', array('%node' => $context['sandbox']['progress'], '%count' => $context['sandbox']['max']));
  40. $context['results']['nodes'] = $context['sandbox']['progress'];
  41. }
  42. }
  43. }
  44.  
  45. if ($context['sandbox']['progress'] != $context['sandbox']['max']) {
  46. $context['finished'] = $context['sandbox']['progress'] / $context['sandbox']['max'];
  47. }
  48. }
  49.  
  50. //функция, срабатывающая по завершению батча, используется например для записи лога и вывода ошибок
  51. function mymodule_batch_finished($success, $results, $operations) {
  52. if ($success) {
  53. _mymodule_write_log_messages($results['import_success'], $results['import_fail']);
  54. } else {
  55. $error_operation = reset($operations);
  56. $message = t('An error occurred while processing %error_operation with arguments: @arguments',
  57. array('%error_operation' => $error_operation[0], '@arguments' => print_r($error_operation[1], TRUE)));
  58. drupal_set_message($message);
  59. }
  60. }

Если нужно запускать батч в фоновом режиме, например при запуске крона, для этого вместо функции mymodule_settings_form_submit() пишем функцию:

  1. function mymodule_cron() {
  2. $items = _mymodule_get_items_to_process();
  3.  
  4. if ($items) {
  5. $batch = array(
  6. 'title' => t('My Batch'),
  7. 'operations' => array(
  8. array('mymodule_batch_process', array($items)),
  9. ),
  10. 'file' => drupal_get_path('module', 'legion_loader') .'/mymodule.batch.inc',
  11. 'finished' => 'mymodule_batch_finished',
  12. 'init_message' => t('Обработка данных...'),
  13. 'progress_message' => t('Очередь @current из @total'),
  14. 'error_message' => t('An error occurred and some or all of the batch has failed.'),
  15. );
  16. batch_set($batch);
  17. $batch = &batch_get();
  18. //отключаем шкалу прогресса
  19. $batch['progressive'] = FALSE;
  20. batch_process();
  21. }
  22. }

Батч для Drupal 7

Удалим для примера все ноды на сайте

  1. function mymodule_settings_form_submit($form, &$form_state) {
  2. $values = $form_state['values'];
  3.  
  4. if ($values['op'] == $values['batch_process']) {
  5.  
  6. $function = 'my_batch';
  7. $batch = $function();
  8. batch_set($batch);
  9. }
  10. }
  11.  
  12. function my_batch() {
  13.  
  14. $nodes = db_query("SELECT nid FROM {node}");
  15.  
  16. foreach ($nodes as $node) {
  17. $items[] = $node->nid;
  18. }
  19.  
  20. foreach ($items as $nid) {
  21. $operations[] = array('node_delete', array($nid));
  22. }
  23. $batch = array(
  24. 'operations' => $operations,
  25. 'finished' => 'my_batch_finished',
  26. 'title' => t('Deleting nodes'),
  27. 'init_message' => t('Batch is starting.'),
  28. 'progress_message' => t('Processed out of.'),
  29. 'error_message' => t('Batch has encountered an error.'),
  30. );
  31. return $batch;
  32. }
  33.  
  34. function my_batch_finished() {
  35. drupal_set_message('finished');
  36. }