ThreadPool процессов CLI


Мне нужно передавать сообщения в CLI PHP-процессы через stdin из Java. Я бы хотел, чтобы в пуле работало около 20 PHP-процессов, так что, когда я передаю сообщение в пул, он отправляет каждое сообщение в отдельный поток, сохраняя очередь сообщений для доставки. Я бы хотел, чтобы эти PHP-процессы оставались живыми как можно дольше, вызывая новый, если кто-то умирает. Я рассматривал это со статическим пулом потоков, но он, кажется, больше предназначен для задач, которые выполняются и просто умирают. Как я мог это сделать? это, с простым интерфейсом, чтобы передать сообщение в пул? Должен ли я реализовать свой собственный пользовательский "пул потоков"?

2 5

2 ответа:

Я предоставляю некоторый код с этим, поскольку я думаю, что это сделает вещи более ясными. В основном вам нужно держать пул объектов процесса вокруг. Имейте в виду, что каждый из этих процессов имеет вход, выход и поток ошибок, которыми вы должны управлять тем или иным образом. В моем примере я просто перенаправляю ошибку и вывод на консоль основных процессов. Вы можете настроить обратные вызовы и обработчики для получения выходных данных программы PHP, если это необходимо. Если вы просто обрабатываете задачи и вам все равно, что говорит PHP, тогда оставьте все как есть или перенаправьте в файл.

Я использую библиотеку Apache Commons Pool для ObjectPool. Нет необходимости изобретать его заново.

У вас будет пул из 20 процессов, которые запускают вашу программу PHP. Это само по себе не даст вам того, что вам нужно. Возможно, вы захотите обрабатывать задачи против всех 20 этих процессов "одновременно.- Значит, Вам также понадобится ThreadPool, который будет вытягивать процесс из вашего ObjectPool.

Вам также нужно понять, что если вы убьете, или CTRL-C ваш процесс Java процесс init возьмет на себя ваши процессы php, и они будут просто сидеть там. Вы, вероятно, захотите сохранить журнал всех pid процессов PHP, которые вы порождаете, а затем очистить их, если вы повторно запустите свою программу Java.

public class StackOverflow_10037379 {

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName());

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {

        private String mProcessToRun;

        public CLIPoolableObjectFactory(String processToRun) {
            mProcessToRun = processToRun;
        }

        @Override
        public Process makeObject() throws Exception {
            ProcessBuilder builder = new ProcessBuilder();
            builder.redirectError(Redirect.INHERIT);
            // I am being lazy, but really the InputStream is where
            // you can get any output of the PHP Process. This setting
            // will make it output to the current processes console.
            builder.redirectOutput(Redirect.INHERIT);
            builder.redirectInput(Redirect.PIPE);
            builder.command(mProcessToRun);
            return builder.start();
        }

        @Override
        public boolean validateObject(Process process) {
            try {
                process.exitValue();
                return false;
            } catch (IllegalThreadStateException ex) {
                return true;
            }
        }

        @Override
        public void destroyObject(Process process) throws Exception {
            // If PHP has a way to stop it, do that instead of destroy
            process.destroy();
        }

        @Override
        public void passivateObject(Process process) throws Exception {
            // Should really try to read from the InputStream of the Process
            // to prevent lock-ups if Rediret.INHERIT is not used.
        }
    }

    public static class CLIWorkItem implements Runnable {

        private ObjectPool<Process> mPool;
        private String mWork;

        public CLIWorkItem(ObjectPool<Process> pool, String work) {
            mPool = pool;
            mWork = work;
        }

        @Override
        public void run() {
            Process workProcess = null;
            try {
                workProcess = mPool.borrowObject();
                OutputStream os = workProcess.getOutputStream();
                os.write(mWork.getBytes(Charset.forName("UTF-8")));
                os.flush();
                // Because of the INHERIT rule with the output stream
                // the console stream overwrites itself. REMOVE THIS in production.
                Thread.sleep(100);
            } catch (Exception ex) {
                sLogger.log(Level.SEVERE, null, ex);
            } finally {
                if (workProcess != null) {
                    try {
                        // Seriously.. so many exceptions.
                        mPool.returnObject(workProcess);
                    } catch (Exception ex) {
                        sLogger.log(Level.SEVERE, null, ex);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        // Change the 5 to 20 in your case. 
        // Also change mock_php.exe to /usr/bin/php or wherever.
        ObjectPool<Process> pool =
                new GenericObjectPool<>(
                new CLIPoolableObjectFactory("mock_php.exe"), 5);         

        // This will only allow you to queue 100 work items at a time. I would suspect
        // that if you only want 20 PHP processes running at a time and this queue
        // filled up you'll need to implement some other strategy as you are doing
        // more work than PHP can keep up with. You'll need to block at some point
        // or throw work away.
        BlockingQueue<Runnable> queue = 
            new ArrayBlockingQueue<>(100, true);

        ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

        // print some stuff out.
        executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));

        executor.shutdown();
        executor.awaitTermination(4000, TimeUnit.HOURS);

        pool.close();        
    }
}

Вывод выполнения программы:

12172 - Message 2
10568 - Message 1
4804 - Message 3
11916 - Message 4
11116 - Message 5
12172 - Message 6
4804 - Message 7
10568 - Message 8
11916 - Message 9
11116 - Message 10
12172 - Message 11

Код программы C++, чтобы просто вывести то, что было введено:

#include <windows.h>
#include <iostream>
#include <string>

int main(int argc, char* argv[])
{
    DWORD pid = GetCurrentProcessId();
    std::string line;
    while (true) {      
        std::getline (std::cin, line);
        std::cout << pid << " - " << line << std::endl;
    }

    return 0;
}

Обновить

Извините за задержку. Вот версия JDK 6 для тех, кто заинтересован. Вы должны будете запустить a отдельный поток для чтения всех входных данных из входного потока процесса. Я настроил этот код, чтобы порождать новый поток вдоль каждого нового процесса. Этот поток всегда считывается из процесса, пока он жив. Вместо того, чтобы выводить непосредственно в файл, я настроил его таким образом, что он использует структуру ведения журнала. Таким образом, вы можете настроить конфигурацию ведения журнала, чтобы перейти к файлу, перевернуться, перейти к консоли и т. д. без того, чтобы он был жестко закодирован, чтобы перейти к файлу.

Вы заметите, что я начинаю только один Gobbler для каждого процесса, даже если процесс имеет stdout и stderr. Я перенаправляю stderr в stdout, просто чтобы сделать вещи проще. Очевидно, jdk6 поддерживает только этот тип перенаправления.

public class StackOverflow_10037379_jdk6 {

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName());

    // Shamelessy taken from Google and modified. 
    // I don't know who the original Author is.
    public static class StreamGobbler extends Thread {

        InputStream is;
        Logger logger;
        Level level;

        StreamGobbler(String logName, Level level, InputStream is) {
            this.is = is;
            this.logger = Logger.getLogger(logName);
            this.level = level;
        }

        public void run() {
            try {
                InputStreamReader isr = new InputStreamReader(is);
                BufferedReader br = new BufferedReader(isr);
                String line = null;
                while ((line = br.readLine()) != null) {
                    logger.log(level, line);
                }
            } catch (IOException ex) {
                logger.log(Level.SEVERE, "Failed to read from Process.", ex);
            }
            logger.log(
                    Level.INFO, 
                    String.format("Exiting Gobbler for %s.", logger.getName()));
        }
    }

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {

        private String mProcessToRun;

        public CLIPoolableObjectFactory(String processToRun) {
            mProcessToRun = processToRun;
        }

        @Override
        public Process makeObject() throws Exception {
            ProcessBuilder builder = new ProcessBuilder();
            builder.redirectErrorStream(true);
            builder.command(mProcessToRun);
            Process process = builder.start();
            StreamGobbler loggingGobbler =
                    new StreamGobbler(
                    String.format("process.%s", process.hashCode()),
                    Level.INFO,
                    process.getInputStream());
            loggingGobbler.start();
            return process;
        }

        @Override
        public boolean validateObject(Process process) {
            try {
                process.exitValue();
                return false;
            } catch (IllegalThreadStateException ex) {
                return true;
            }
        }

        @Override
        public void destroyObject(Process process) throws Exception {
            // If PHP has a way to stop it, do that instead of destroy
            process.destroy();
        }

        @Override
        public void passivateObject(Process process) throws Exception {
            // Should really try to read from the InputStream of the Process
            // to prevent lock-ups if Rediret.INHERIT is not used.
        }
    }

    public static class CLIWorkItem implements Runnable {

        private ObjectPool<Process> mPool;
        private String mWork;

        public CLIWorkItem(ObjectPool<Process> pool, String work) {
            mPool = pool;
            mWork = work;
        }

        @Override
        public void run() {
            Process workProcess = null;
            try {
                workProcess = mPool.borrowObject();
                OutputStream os = workProcess.getOutputStream();
                os.write(mWork.getBytes(Charset.forName("UTF-8")));
                os.flush();
                // Because of the INHERIT rule with the output stream
                // the console stream overwrites itself. REMOVE THIS in production.
                Thread.sleep(100);
            } catch (Exception ex) {
                sLogger.log(Level.SEVERE, null, ex);
            } finally {
                if (workProcess != null) {
                    try {
                        // Seriously.. so many exceptions.
                        mPool.returnObject(workProcess);
                    } catch (Exception ex) {
                        sLogger.log(Level.SEVERE, null, ex);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        // Change the 5 to 20 in your case. 
        ObjectPool<Process> pool =
                new GenericObjectPool<Process>(
                new CLIPoolableObjectFactory("mock_php.exe"), 5);

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

        // print some stuff out.
        executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));

        executor.shutdown();
        executor.awaitTermination(4000, TimeUnit.HOURS);

        pool.close();
    }
}

вывод

Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 3
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 2
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 1
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 4
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 5
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 8
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 10
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 9
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 6
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 7
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 11
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.295131993.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.756434719.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.332711452.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1981440623.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1043636732.

Здесь лучше всего использовать функции pcntl для развилки процесса, но связь между процессами затруднена. Я бы рекомендовал создать очередь, из которой ваши процессы могут читать, а не пытаться передавать сообщения в командную строку.

Beanstalk имеет несколько PHP-клиентов, которые можно использовать для обработки сообщений между процессами.