PHP - Manual: parallel\run

2025-01-10

parallel\run

(1.0.0)

parallel\run - Execution

Description

parallel\run(Closure $task): ?Future

Shall schedule task for execution in parallel.

parallel\run(Closure $task, array $argv): ?Future

Shall schedule task for execution in parallel, passing argv at execution time.
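
The two signatures above might be used as in the following minimal sketch. It assumes a thread-safe (ZTS) PHP build with the parallel extension loaded; the variable names are illustrative only.

```php
<?php
use function parallel\run;

// Form 1: schedule a task that takes no arguments.
$hello = run(function () {
    return "hello from a parallel task";
});

// Form 2: schedule a task and pass argv at execution time.
// The array elements are mapped to the task's parameters in order.
$sum = run(function (int $a, int $b) {
    return $a + $b;
}, [2, 3]);

// Future::value() blocks until the task has finished, then returns its result.
echo $hello->value(), "\n";
echo $sum->value(), "\n";
```

Both calls return immediately; the main thread only waits when it asks a Future for its value.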

Automatic Scheduling

If a \parallel\Runtime internally created and cached by a previous call to parallel\run() is idle, it will be used to execute the task. If no \parallel\Runtime is idle, parallel will create and cache a new \parallel\Runtime.

Note:

\parallel\Runtime objects created by the programmer are not used for automatic scheduling.
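
The difference between an explicitly created runtime and automatic scheduling can be sketched as follows. This is an illustration under the same assumptions (ZTS build, parallel loaded). Whether the second parallel\run() call actually reuses the cached runtime depends on that runtime being idle at the time, so the comments say "may".

```php
<?php
use parallel\Runtime;
use function parallel\run;

// Explicit runtime: created and owned by the programmer.
// parallel\run() never schedules tasks onto it.
$runtime  = new Runtime();
$explicit = $runtime->run(function () {
    return "explicit runtime";
});

// Automatic scheduling: no runtime is cached yet, so parallel creates one.
$first = run(function () {
    return "auto 1";
});
$first->value(); // wait for the task, after which the cached runtime can become idle

// This call may reuse the cached runtime if it is idle;
// otherwise parallel creates and caches another one.
$second = run(function () {
    return "auto 2";
});

echo $explicit->value(), "\n";
echo $second->value(), "\n";

// Only the explicit runtime has to be closed by the programmer.
$runtime->close();
```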

Parameters

task

A Closure with specific characteristics.

argv

An array of arguments with specific characteristics to be passed to task at execution time.

Task Characteristics

Closures scheduled for parallel execution must not:

  • accept or return by reference
  • accept or return internal objects (see notes)
  • execute any of the prohibited instructions listed below

Instructions prohibited in Closures intended for parallel execution are:

  • yield
  • use by-reference
  • declare class
  • declare named function

Note:

Nested closures may yield or use by-reference, but must not contain class or named function declarations.

Note:

No instructions are prohibited in the files which the task may include.
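
A short sketch of these rules, assuming the extension behaves as described above: the first task keeps the by-reference use inside a nested closure, which is allowed, while the second task contains yield directly and is expected to be rejected when it is scheduled.

```php
<?php
use parallel\Runtime\Error\IllegalInstruction;
use function parallel\run;

// Allowed: the by-reference use lives in a nested closure,
// not in the task itself.
$ok = run(function () {
    $total = 0;
    $add = function (int $n) use (&$total) {
        $total += $n;
    };
    foreach ([1, 2, 3] as $n) {
        $add($n);
    }
    return $total;
});
echo "total: ", $ok->value(), "\n";

// Prohibited: the task itself contains yield.
$rejected = false;
try {
    run(function () {
        yield 1;
    });
} catch (IllegalInstruction $e) {
    $rejected = true;
    echo "rejected: ", $e->getMessage(), "\n";
}
```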

Arguments Characteristics

Arguments must not:

  • contain references
  • contain resources
  • contain internal objects (see notes)

Note:

In the case of file stream resources, the resource is cast to the file descriptor and passed as int where possible. This is unsupported on Windows.
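
As an illustration of the argument rules, the sketch below passes plain scalars and arrays, which are copied by value, and then tries to pass a DateTime instance. DateTime is chosen here as an assumed example of an internal object with a custom structure; the catch block relies on the IllegalParameter behaviour documented on this page.

```php
<?php
use parallel\Runtime\Error\IllegalParameter;
use function parallel\run;

// Scalars and arrays of scalars are copied into the task.
$future = run(function (array $rows, string $label) {
    return $label . ": " . count($rows);
}, [[1, 2, 3], "rows"]);
echo $future->value(), "\n";

// An internal object in argv is expected to be refused at scheduling time.
$rejected = false;
try {
    run(function ($when) {
        return 1;
    }, [new DateTime()]);
} catch (IllegalParameter $e) {
    $rejected = true;
    echo "rejected: ", $e->getMessage(), "\n";
}
```

A common workaround is to pass a scalar representation instead, for example a timestamp or a formatted string, and rebuild the object inside the task.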

Internal Objects Notes

Internal objects generally use a custom structure which cannot be copied by value safely. PHP currently lacks the mechanics to do this (without serialization), so only objects that do not use a custom structure may be shared.

Some internal objects do not use a custom structure, for example parallel\Events\Event and so may be shared.

Closures are a special kind of internal object and support being copied by value, and so may be shared.

Channels are central to writing parallel code and support concurrent access and execution by necessity, and so may be shared.
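
Sharing a channel with a task might look like the following sketch. It uses an anonymous unbuffered parallel\Channel, so each send() waits for the matching recv(); the item values are made up for the example.

```php
<?php
use parallel\Channel;
use function parallel\run;

// Anonymous, unbuffered channel shared between the main thread and the task.
$channel = new Channel();

// The channel is passed in argv like any other permitted argument.
// The task has no return statement, so its Future is not needed here.
run(function ($out, array $items) {
    foreach ($items as $item) {
        $out->send($item * $item);
    }
}, [$channel, [1, 2, 3]]);

// Receive exactly as many values as the task sends.
$squares = [];
for ($i = 0; $i < 3; $i++) {
    $squares[] = $channel->recv();
}
$channel->close();

print_r($squares);
```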

Warning

A user class that extends an internal class may use a custom structure as defined by the internal class, in which case they cannot be copied by value safely, and so may not be shared.

Return Values

Warning

The returned parallel\Future must not be ignored when the task contains a return or throw statement.
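
The sketch below shows why the Future matters for such tasks: the return value and any uncaught exception only become visible through it. The negative input is a deliberate assumption to trigger the throw, and \Throwable is caught rather than a specific class to keep the example conservative.

```php
<?php
use function parallel\run;

// The task both returns and throws, so its Future must be kept.
$future = run(function (int $n) {
    if ($n < 0) {
        throw new RuntimeException("n must not be negative");
    }
    return $n * 2;
}, [-1]);

$message = null;
try {
    // value() waits for the task and surfaces an exception left uncaught in it.
    echo $future->value(), "\n";
} catch (\Throwable $e) {
    $message = $e->getMessage();
    echo "task failed: ", $message, "\n";
}
```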

Exceptions

Warning

Shall throw parallel\Runtime\Error\Closed if parallel\Runtime was closed.

Warning

Shall throw parallel\Runtime\Error\IllegalFunction if task is a closure created from an internal function.

Warning

Shall throw parallel\Runtime\Error\IllegalInstruction if task contains illegal instructions.

Warning

Shall throw parallel\Runtime\Error\IllegalParameter if task accepts or argv contains illegal variables.

Warning

Shall throw parallel\Runtime\Error\IllegalReturn if task returns illegally.
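
One way to handle these errors is to wrap scheduling in a small helper. scheduleOrReport() below is a hypothetical function written for this sketch, not part of parallel; it catches only the error classes named on this page.

```php
<?php
use parallel\Runtime\Error\{IllegalFunction, IllegalInstruction, IllegalParameter, IllegalReturn};
use function parallel\run;

// Hypothetical helper: returns the Future, or null if the task was refused.
function scheduleOrReport(Closure $task, array $argv = []): ?parallel\Future
{
    try {
        return run($task, $argv);
    } catch (IllegalFunction | IllegalInstruction | IllegalParameter | IllegalReturn $e) {
        echo get_class($e), ": ", $e->getMessage(), "\n";
        return null;
    }
}

// A closure created from an internal function is expected to be refused.
$rejected = scheduleOrReport(Closure::fromCallable('strlen'), ['abc']);

// Wrapping the same call in a user closure is accepted.
$accepted = scheduleOrReport(function (string $s) {
    return strlen($s);
}, ['abc']);

if ($accepted !== null) {
    echo "length: ", $accepted->value(), "\n";
}
```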


User Contributed Notes 3 notes

john_2885 at yahoo dot com
2 years ago
Here's a more substantial example of how to use the run functional API.

<?php
/*********************************************
 * Sample parallel functional API
 *
 * Scenario
 * -------------------------------------------
 * Given a large number of rows of
 * data to process, divide the work amongst
 * a set of workers.  Each worker is responsible
 * for finishing their assigned task.
 *
 * In the code below, assume we have arbitrary
 * start and end IDs (rows) - we will try to
 * divide the number of IDs (rows) evenly
 * across 8 workers.  The workers will get the
 * following batches to process to completion:
 *
 * Total number of IDs (rows): 1371129
 * Each worker will get 171392 IDs to process
 *
 * Worker 1: IDs from 11001 to 182393
 * Worker 2: IDs from 182393 to 353785
 * Worker 3: IDs from 353785 to 525177
 * Worker 4: IDs from 525177 to 696569
 * Worker 5: IDs from 696569 to 867961
 * Worker 6: IDs from 867961 to 1039353
 * Worker 7: IDs from 1039353 to 1210745
 * Worker 8: IDs from 1210745 to 1382130
 *
 * Each worker then processes 5000 rows at a time
 * until they are done with their assigned work
 *
 *********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers.
// ceil() returns a float, so cast to int for the typed task parameters.
$batchSize = (int) ceil($totalIds / $workers);
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) to divide the overall
// task into sub-batches
$rowsToFetch = 5000;

print "Total IDs: " . $totalIds . "\n";
print "Batch Size: " . $batchSize . "\n";
print "Last Batch: " . $lastBatch . "\n";

$producer = function (int $worker, int $startId, int $endId, int $fetchSize) {
    $tempMinId = $startId;
    $tempMaxId = $tempMinId + $fetchSize;
    $fetchCount = 1;

    print "Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";

    while ($tempMinId < $endId) {
        for ($i = $tempMinId; $i < $tempMaxId; $i++) {
            // Simulate the work for this sub-batch with a random delay
            $usleep = rand(500000, 1000000);
            usleep($usleep);
            print "Worker " . $worker . " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";
            // The simulated work stands in for the whole sub-batch, so break
            // after one pass instead of sleeping once per ID
            break;
        }

        // Now we move on to the next sub-batch for this worker
        $tempMinId = $tempMaxId;
        $tempMaxId = $tempMinId + $fetchSize;
        if ($tempMaxId > $endId) {
            $tempMaxId = $endId;
        }
        // Introduce some timing randomness
        $sleep = rand(1, 5);
        sleep($sleep);
        $fetchCount++;
    }

    // This worker has completed their entire batch
    print "Worker " . $worker . " finished\n";
};

// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for ($i = 0; $i < $workers; $i++) {
    $startId = $minId + ($i * $batchSize);
    $endId = $startId + $batchSize;
    if ($i == ($workers - 1)) {
        $endId = $maxId;
    }
    // The task has no return or throw statement, so the Future is not kept
    \parallel\run($producer, array(($i + 1), $startId, $endId, $rowsToFetch));
}

?>
anonymous user
1 year ago
Although function declarations are not allowed inside the code of a task, include is allowed. So if we want to declare a function, we can put it in a separate file and include that file from the task.
# main.php
<?php
$runtime = new parallel\Runtime();
$future = $runtime->run(function () {
    include "included.php";
    return add(1, 3);
}, []);
echo $future->value();
# output: 4
# included.php
<?php
function add($a, $b) {
    return $a + $b;
}
Thierry Kauffmann
7 months ago
<?php

/**
 * Sample parallel functional API
 * using a generator instead of a static list of items to process
 *
 * Items to process in parallel come from a generator
 * It could be anything: e.g. rows fetched from MySQL, a DirectoryIterator...
 * Thus the number of items to process in parallel is NOT known in advance
 *
 * This algorithm assigns items to each parallel thread dynamically
 * As soon as a thread has finished working
 * It is assigned a new item to process
 * until all items are processed (generator closes)
 *
 * In this example we process 50 items in 5 parallel threads
 * It produces output in this form (output changes at each run):
 *
 * ThreadId: 1 => Item: 1 (Start)
 * ThreadId: 2 => Item: 2 (Start)
 * ThreadId: 3 => Item: 3 (Start)
 * ThreadId: 4 => Item: 4 (Start)
 * ThreadId: 5 => Item: 5 (Start)
 * ThreadId: 5 => Item: 5 Sleep: 3s (End)
 * ThreadId: 5 => Item: 6 (Start)
 * ThreadId: 3 => Item: 3 Sleep: 4s (End)
 * ThreadId: 3 => Item: 7 (Start)
 * ThreadId: 2 => Item: 2 Sleep: 6s (End)
 * ThreadId: 2 => Item: 8 (Start)
 * ...
 * ThreadId: 4 => Item: 44 Sleep: 6s (End)
 * ThreadId: 4 => Item: 49 (Start)
 * ThreadId: 3 => Item: 46 Sleep: 5s (End)
 * ThreadId: 3 => Item: 50 (Start)
 * ThreadId: 2 => Item: 43 Sleep: 9s (End)
 * Destroy ThreadId: 2
 * ThreadId: 1 => Item: 47 Sleep: 5s (End)
 * Destroy ThreadId: 1
 * ThreadId: 4 => Item: 49 Sleep: 7s (End)
 * Destroy ThreadId: 4
 * ThreadId: 5 => Item: 48 Sleep: 10s (End)
 * Destroy ThreadId: 5
 * ThreadId: 3 => Item: 50 Sleep: 10s (End)
 * Destroy ThreadId: 3
 */

use \parallel\{Runtime, Future, Channel, Events};

// Generate list of items to process with a generator
function generator(int $item_count) {
    for ($i = 1; $i <= $item_count; $i++) {
        yield $i;
    }
}

function testConcurrency(int $concurrency, int $item_count) {

    $generator = generator($item_count);

    // Function executing in each thread. Take a nap for a random time, for example!
    $producer = function (int $item_id) {
        $seconds = rand(1, 10);
        sleep($seconds);
        return ['item_id' => $item_id, 'sleep_seconds' => $seconds];
    };

    // Fill up threads with initial 'inactive' state
    $threads = array_fill(1, $concurrency, ['is_active' => false]);

    while (true) {
        // Loop through threads until all threads are finished
        foreach ($threads as $thread_id => $thread) {
            if (!$thread['is_active'] and $generator->valid()) {
                // Thread is inactive and generator still has values: run something in the thread
                $item_id = $generator->current();
                $threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
                echo "ThreadId: $thread_id => Item: $item_id (Start)\n";
                $threads[$thread_id]['is_active'] = true;
                $generator->next();
            } elseif (!isset($threads[$thread_id]['run'])) {
                // Destroy surplus threads in case the generator closes before every thread got an item
                echo "Destroy ThreadId: $thread_id\n";
                unset($threads[$thread_id]);
            } elseif ($threads[$thread_id]['run']->done()) {
                // Thread finished. Get results
                $item = $threads[$thread_id]['run']->value();
                echo "ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

                if (!$generator->valid()) {
                    // Generator is closed, so destroy the thread
                    echo "Destroy ThreadId: $thread_id\n";
                    unset($threads[$thread_id]);
                } else {
                    // Thread is ready to run again
                    $threads[$thread_id]['is_active'] = false;
                }
            }
        }

        // Escape loop when all threads are destroyed
        if (empty($threads)) break;
    }
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>

Official page: https://www.php.net/manual/en/parallel.run.php
