00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef _FILE_OFFSET_BITS
00025 #define _FILE_OFFSET_BITS 64
00026 #endif
00027
00028 #include "stx-execpipe.h"
00029
00030 #include <stdexcept>
00031 #include <sstream>
00032 #include <iostream>
00033
00034 #include <assert.h>
00035 #include <stdlib.h>
00036 #include <string.h>
00037 #include <unistd.h>
00038 #include <errno.h>
00039 #include <fcntl.h>
00040 #include <sys/types.h>
00041 #include <sys/wait.h>
00042 #include <sys/select.h>
00043
/// Emit a log line if the enclosing object's m_debug_level is at least
/// `level`. `msg` is a stream expression (e.g. "text " << value) and therefore
/// cannot be parenthesized. The line goes to the m_debug_output callback when
/// one is set and to std::cout otherwise.
///
/// The temporary stream uses a deliberately unusual name: `msg` is expanded
/// inside the macro's scope, so a short name such as `oss` would shadow a
/// caller's variable of the same name and silently log an empty string.
#define LOG_OUTPUT(msg, level)                                              \
    do {                                                                    \
        if (m_debug_level >= (level)) {                                     \
            std::ostringstream stx_execpipe_log_stream_;                    \
            stx_execpipe_log_stream_ << msg;                                \
            if (m_debug_output)                                             \
                m_debug_output(stx_execpipe_log_stream_.str().c_str());     \
            else                                                            \
                std::cout << stx_execpipe_log_stream_.str() << std::endl;   \
        }                                                                   \
    } while (0)

/// Convenience wrappers binding LOG_OUTPUT to one ExecPipe debug level each.
#define LOG_ERROR(msg)  LOG_OUTPUT(msg, ExecPipe::DL_ERROR)
#define LOG_INFO(msg)   LOG_OUTPUT(msg, ExecPipe::DL_INFO)
#define LOG_DEBUG(msg)  LOG_OUTPUT(msg, ExecPipe::DL_DEBUG)
#define LOG_TRACE(msg)  LOG_OUTPUT(msg, ExecPipe::DL_TRACE)
00060
00061 namespace stx {
00062
00063 #ifndef _STX_RINGBUFFER_H_
00064 #define _STX_RINGBUFFER_H_
00065
00067 namespace {
00068
/**
 * Growable FIFO byte queue kept in a circular, malloc()-allocated array.
 *
 * Bytes are appended with write() and consumed from the front by reading the
 * contiguous region bottom() / bottomsize() and then calling advance(). The
 * storage grows (to 1024 bytes initially, then by doubling) whenever a write
 * does not fit; it never shrinks.
 *
 * The class owns its storage, so copying performs a deep copy of the queued
 * bytes. The implicit member-wise copy would share the pointer and free it
 * twice.
 */
class RingBuffer
{
private:
    /// malloc()-allocated storage, NULL until the first write
    char* m_data;

    /// allocated capacity of m_data in bytes
    unsigned int m_buffsize;

    /// number of queued (unread) bytes
    unsigned int m_size;

    /// offset in m_data of the first unread byte
    unsigned int m_bottom;

    /// Append all bytes queued in other to this buffer, in FIFO order.
    void append_from(const RingBuffer& other)
    {
        if (other.m_size == 0) return;

        // the queued data consists of at most two contiguous pieces
        const unsigned int first = other.bottomsize();
        write(other.m_data + other.m_bottom, first);
        write(other.m_data, other.m_size - first);
    }

public:
    /// Construct an empty buffer without allocating any storage.
    inline RingBuffer()
        : m_data(NULL),
          m_buffsize(0), m_size(0), m_bottom(0)
    {
    }

    /// Deep-copy the bytes queued in other (an empty source allocates nothing).
    inline RingBuffer(const RingBuffer& other)
        : m_data(NULL),
          m_buffsize(0), m_size(0), m_bottom(0)
    {
        try {
            append_from(other);
        }
        catch (...) {
            // the destructor does not run for a partially constructed object
            if (m_data) free(m_data);
            throw;
        }
    }

    /// Replace the contents with a deep copy of the bytes queued in other.
    inline RingBuffer& operator=(const RingBuffer& other)
    {
        if (this != &other)
        {
            clear();
            append_from(other);
        }
        return *this;
    }

    /// Release the storage.
    inline ~RingBuffer()
    {
        if (m_data) free(m_data);
    }

    /// Number of unread bytes currently queued.
    inline unsigned int size() const
    {
        return m_size;
    }

    /// Currently allocated capacity in bytes.
    inline unsigned int buffsize() const
    {
        return m_buffsize;
    }

    /// Drop all queued bytes; the allocated storage is kept.
    inline void clear()
    {
        m_size = m_bottom = 0;
    }

    /// Pointer to the first unread byte; bottomsize() bytes are readable from
    /// here without wrapping.
    inline char* bottom() const
    {
        return m_data + m_bottom;
    }

    /// Length of the contiguous readable region starting at bottom(). This is
    /// smaller than size() when the queued data wraps around the array end.
    inline unsigned int bottomsize() const
    {
        return (m_size > m_buffsize - m_bottom)
            ? (m_buffsize - m_bottom)
            : (m_size);
    }

    /// Consume n bytes from the front of the queue. n must not exceed size().
    inline void advance(unsigned int n)
    {
        assert(m_size >= n);
        m_bottom += n;
        m_size -= n;
        if (m_bottom >= m_buffsize) m_bottom -= m_buffsize;
    }

    /**
     * Append len bytes from src to the end of the queue, growing the storage
     * if necessary.
     *
     * @throws std::length_error  if the total size would exceed the range of
     *                            unsigned int.
     * @throws std::runtime_error if the storage cannot be enlarged; the
     *                            buffer contents are left unchanged.
     */
    void write(const void *src, unsigned int len)
    {
        if (len == 0) return;

        const unsigned int maxsize = static_cast<unsigned int>(-1);
        if (len > maxsize - m_size)
            throw std::length_error("RingBuffer: size overflow");

        const unsigned int needed = m_size + len;

        if (m_buffsize < needed)
        {
            // double the capacity until the new data fits
            unsigned int newbuffsize = m_buffsize ? m_buffsize : 1024;
            while (newbuffsize < needed)
            {
                if (newbuffsize > maxsize / 2)
                {
                    // doubling would overflow: allocate exactly what is needed
                    newbuffsize = needed;
                    break;
                }
                newbuffsize *= 2;
            }

            // keep the old pointer on failure so nothing is leaked or lost
            char* newdata = static_cast<char*>(realloc(m_data, newbuffsize));
            if (!newdata)
                throw std::runtime_error("RingBuffer: out of memory");
            m_data = newdata;

            if (m_size > m_buffsize - m_bottom)
            {
                // The data wrapped in the old array: move the piece between
                // m_bottom and the old end to the end of the enlarged array
                // so the wrapped part at the front stays valid. memmove is
                // used because the ranges can overlap when the array grew by
                // less than the tail length.
                const unsigned int taillen = m_buffsize - m_bottom;

                memmove(m_data + newbuffsize - taillen,
                        m_data + m_bottom, taillen);

                m_bottom = newbuffsize - taillen;
            }

            m_buffsize = newbuffsize;
        }

        if (m_size > m_buffsize - m_bottom)
        {
            // data already wraps: the free space is one contiguous piece
            // starting right after the wrapped part
            memcpy(m_data + (m_size - (m_buffsize - m_bottom)), src, len);
        }
        else
        {
            const unsigned int writepos = m_bottom + m_size;
            const unsigned int tailfit = m_buffsize - writepos;

            if (tailfit >= len)
            {
                // everything fits behind the current data
                memcpy(m_data + writepos, src, len);
            }
            else
            {
                // fill up to the array end, then wrap to the front
                memcpy(m_data + writepos, src, tailfit);
                memcpy(m_data, static_cast<const char*>(src) + tailfit,
                       len - tailfit);
            }
        }

        m_size += len;
    }
};
00247
00248 }
00249
00250 #endif // _STX_RINGBUFFER_H_
00251
00258 class ExecPipeImpl
00259 {
00260 private:
00261
00263 unsigned int m_refs;
00264
00265 private:
00266
00267
00268
00270 enum ExecPipe::DebugLevel m_debug_level;
00271
00273 void (*m_debug_output)(const char* line);
00274
00275 public:
00276
00278 void set_debug_level(enum ExecPipe::DebugLevel dl)
00279 {
00280 m_debug_level = dl;
00281 }
00282
00285 void set_debug_output(void (*output)(const char *line))
00286 {
00287 m_debug_output = output;
00288 }
00289
00290 private:
00291
00293 enum StreamType
00294 {
00295 ST_NONE = 0,
00296 ST_FD,
00297 ST_FILE,
00298 ST_STRING,
00299 ST_OBJECT
00300 };
00301
00303 StreamType m_input;
00304
00305
00306
00309 int m_input_fd;
00310
00312 const char* m_input_file;
00313
00316 const std::string* m_input_string;
00317
00319 std::string::size_type m_input_string_pos;
00320
00322 PipeSource* m_input_source;
00323
00325 RingBuffer m_input_rbuffer;
00326
00327
00328
00330 StreamType m_output;
00331
00334 int m_output_fd;
00335
00337 const char* m_output_file;
00338
00340 int m_output_file_mode;
00341
00344 std::string* m_output_string;
00345
00347 PipeSink* m_output_sink;
00348
00349
00350
00355 struct Stage
00356 {
00358 std::vector<std::string> args;
00359
00361 const char* prog;
00362
00364 const std::vector<std::string>* argsp;
00365
00367 const std::vector<std::string>* envp;
00368
00370 PipeFunction* func;
00371
00373 RingBuffer outbuffer;
00374
00375
00376
00378 bool withpath;
00379
00381 pid_t pid;
00382
00384 int retstatus;
00385
00387 int stdin_fd;
00388
00390 int stdout_fd;
00391
00393 Stage()
00394 : prog(NULL), argsp(NULL), envp(NULL), func(NULL),
00395 withpath(false), pid(0), retstatus(0),
00396 stdin_fd(-1), stdout_fd(-1)
00397 {
00398 }
00399 };
00400
00402 typedef std::vector<Stage> stagelist_type;
00403
00405 stagelist_type m_stages;
00406
00408 char m_buffer[4096];
00409
00410 public:
00411
00413 ExecPipeImpl()
00414 : m_refs(0),
00415 m_debug_level(ExecPipe::DL_ERROR),
00416 m_debug_output(NULL),
00417 m_input(ST_NONE),
00418 m_input_fd(-1),
00419 m_output(ST_NONE),
00420 m_output_fd(-1)
00421 {
00422 }
00423
00425 unsigned int& refs()
00426 {
00427 return m_refs;
00428 }
00429
00430
00431
00433
00438 void set_input_fd(int fd)
00439 {
00440 assert(m_input == ST_NONE);
00441 if (m_input != ST_NONE) return;
00442
00443 m_input = ST_FD;
00444 m_input_fd = fd;
00445 }
00446
00451 void set_input_file(const char* path)
00452 {
00453 assert(m_input == ST_NONE);
00454 if (m_input != ST_NONE) return;
00455
00456 m_input = ST_FILE;
00457 m_input_file = path;
00458 }
00459
00465 void set_input_string(const std::string* input)
00466 {
00467 assert(m_input == ST_NONE);
00468 if (m_input != ST_NONE) return;
00469
00470 m_input = ST_STRING;
00471 m_input_string = input;
00472 m_input_string_pos = 0;
00473 }
00474
00480 void set_input_source(PipeSource* source)
00481 {
00482 assert(m_input == ST_NONE);
00483 if (m_input != ST_NONE) return;
00484
00485 m_input = ST_OBJECT;
00486 m_input_source = source;
00487 source->m_impl = this;
00488 }
00489
00491
00496 void input_source_write(const void* data, unsigned int datalen)
00497 {
00498 m_input_rbuffer.write(data, datalen);
00499 }
00500
00501
00502
00504
00509 void set_output_fd(int fd)
00510 {
00511 assert(m_output == ST_NONE);
00512 if (m_output != ST_NONE) return;
00513
00514 m_output = ST_FD;
00515 m_output_fd = fd;
00516 }
00517
00522 void set_output_file(const char* path, int mode = 0666)
00523 {
00524 assert(m_output == ST_NONE);
00525 if (m_output != ST_NONE) return;
00526
00527 m_output = ST_FILE;
00528 m_output_file = path;
00529 m_output_file_mode = mode;
00530 }
00531
00537 void set_output_string(std::string* output)
00538 {
00539 assert(m_output == ST_NONE);
00540 if (m_output != ST_NONE) return;
00541
00542 m_output = ST_STRING;
00543 m_output_string = output;
00544 }
00545
00550 void set_output_sink(PipeSink* sink)
00551 {
00552 assert(m_output == ST_NONE);
00553 if (m_output != ST_NONE) return;
00554
00555 m_output = ST_OBJECT;
00556 m_output_sink = sink;
00557 }
00558
00560
00561
00562
00564
00568 unsigned int size() const
00569 {
00570 return m_stages.size();
00571 }
00572
00577 void add_exec(const char* prog)
00578 {
00579 struct Stage newstage;
00580 newstage.prog = prog;
00581 newstage.args.push_back(prog);
00582 m_stages.push_back(newstage);
00583 }
00584
00589 void add_exec(const char* prog, const char* arg1)
00590 {
00591 struct Stage newstage;
00592 newstage.prog = prog;
00593 newstage.args.push_back(prog);
00594 newstage.args.push_back(arg1);
00595 m_stages.push_back(newstage);
00596 }
00597
00602 void add_exec(const char* prog, const char* arg1, const char* arg2)
00603 {
00604 struct Stage newstage;
00605 newstage.prog = prog;
00606 newstage.args.push_back(prog);
00607 newstage.args.push_back(arg1);
00608 newstage.args.push_back(arg2);
00609 m_stages.push_back(newstage);
00610 }
00611
00616 void add_exec(const char* prog, const char* arg1, const char* arg2, const char* arg3)
00617 {
00618 struct Stage newstage;
00619 newstage.prog = prog;
00620 newstage.args.push_back(prog);
00621 newstage.args.push_back(arg1);
00622 newstage.args.push_back(arg2);
00623 newstage.args.push_back(arg3);
00624 m_stages.push_back(newstage);
00625 }
00626
00632 void add_exec(const std::vector<std::string>* args)
00633 {
00634 assert(args->size() > 0);
00635 if (args->size() == 0) return;
00636
00637 struct Stage newstage;
00638 newstage.prog = (*args)[0].c_str();
00639 newstage.argsp = args;
00640 m_stages.push_back(newstage);
00641 }
00642
00648 void add_execp(const char* prog)
00649 {
00650 struct Stage newstage;
00651 newstage.prog = prog;
00652 newstage.args.push_back(prog);
00653 newstage.withpath = true;
00654 m_stages.push_back(newstage);
00655 }
00656
00662 void add_execp(const char* prog, const char* arg1)
00663 {
00664 struct Stage newstage;
00665 newstage.prog = prog;
00666 newstage.args.push_back(prog);
00667 newstage.args.push_back(arg1);
00668 newstage.withpath = true;
00669 m_stages.push_back(newstage);
00670 }
00671
00677 void add_execp(const char* prog, const char* arg1, const char* arg2)
00678 {
00679 struct Stage newstage;
00680 newstage.prog = prog;
00681 newstage.args.push_back(prog);
00682 newstage.args.push_back(arg1);
00683 newstage.args.push_back(arg2);
00684 newstage.withpath = true;
00685 m_stages.push_back(newstage);
00686 }
00687
00693 void add_execp(const char* prog, const char* arg1, const char* arg2, const char* arg3)
00694 {
00695 struct Stage newstage;
00696 newstage.prog = prog;
00697 newstage.args.push_back(prog);
00698 newstage.args.push_back(arg1);
00699 newstage.args.push_back(arg2);
00700 newstage.args.push_back(arg3);
00701 newstage.withpath = true;
00702 m_stages.push_back(newstage);
00703 }
00704
00711 void add_execp(const std::vector<std::string>* args)
00712 {
00713 assert(args->size() > 0);
00714 if (args->size() == 0) return;
00715
00716 struct Stage newstage;
00717 newstage.prog = (*args)[0].c_str();
00718 newstage.argsp = args;
00719 newstage.withpath = true;
00720 m_stages.push_back(newstage);
00721 }
00722
00731 void add_exece(const char* path,
00732 const std::vector<std::string>* argsp,
00733 const std::vector<std::string>* envp)
00734 {
00735 assert(path && argsp);
00736 assert(argsp->size() > 0);
00737 if (argsp->size() == 0) return;
00738
00739 struct Stage newstage;
00740 newstage.prog = path;
00741 newstage.argsp = argsp;
00742 newstage.envp = envp;
00743 m_stages.push_back(newstage);
00744 }
00745
00751 void add_function(PipeFunction* func)
00752 {
00753 assert(func);
00754 if (!func) return;
00755
00756 func->m_impl = this;
00757 func->m_stageid = m_stages.size();
00758
00759 struct Stage newstage;
00760 newstage.func = func;
00761 m_stages.push_back(newstage);
00762 }
00763
00765
00770 void stage_function_write(unsigned int st, const void* data, unsigned int datalen)
00771 {
00772 assert(st < m_stages.size());
00773
00774 return m_stages[st].outbuffer.write(data, datalen);
00775 }
00776
00777
00778
00786 void run();
00787
00788
00789
00791
00796 int get_return_status(unsigned int stageid) const
00797 {
00798 assert(stageid < m_stages.size());
00799 assert(!m_stages[stageid].func);
00800
00801 return m_stages[stageid].retstatus;
00802 }
00803
00808 int get_return_code(unsigned int stageid) const
00809 {
00810 assert(stageid < m_stages.size());
00811 assert(!m_stages[stageid].func);
00812
00813 if (WIFEXITED(m_stages[stageid].retstatus))
00814 return WEXITSTATUS(m_stages[stageid].retstatus);
00815 else
00816 return -1;
00817 }
00818
00823 int get_return_signal(unsigned int stageid) const
00824 {
00825 assert(stageid < m_stages.size());
00826 assert(!m_stages[stageid].func);
00827
00828 if (WIFSIGNALED(m_stages[stageid].retstatus))
00829 return WTERMSIG(m_stages[stageid].retstatus);
00830 else
00831 return -1;
00832 }
00833
00837 bool all_return_codes_zero() const
00838 {
00839 for (unsigned int i = 0; i < m_stages.size(); ++i)
00840 {
00841 if (m_stages[i].func) continue;
00842
00843 if (get_return_code(i) != 0)
00844 return false;
00845 }
00846
00847 return true;
00848 }
00849
00851
00852 protected:
00853
00854
00855
00858 void exec_stage(const Stage& stage);
00859
00861 void print_exec(const std::vector<std::string>& args);
00862
00864 void sclose(int fd);
00865 };
00866
00867
00868
00869 void ExecPipeImpl::print_exec(const std::vector<std::string>& args)
00870 {
00871 std::ostringstream oss;
00872 oss << "Exec()";
00873 for (unsigned ai = 0; ai < args.size(); ++ai)
00874 {
00875 oss << " " << args[ai];
00876 }
00877 LOG_INFO(oss.str());
00878 }
00879
00880 void ExecPipeImpl::exec_stage(const Stage& stage)
00881 {
00882
00883 const std::vector<std::string>& args = stage.argsp ? *stage.argsp : stage.args;
00884
00885
00886
00887 const char* cargs[args.size()+1];
00888
00889 for (unsigned ai = 0; ai < args.size(); ++ai)
00890 {
00891 cargs[ai] = args[ai].c_str();
00892 }
00893 cargs[ args.size() ] = NULL;
00894
00895 if (!stage.envp)
00896 {
00897 if (stage.withpath)
00898 execvp(stage.prog, (char* const*)cargs);
00899 else
00900 execv(stage.prog, (char* const*)cargs);
00901 }
00902 else
00903 {
00904
00905
00906 const char* cenv[args.size()+1];
00907
00908 for (unsigned ei = 0; ei < stage.envp->size(); ++ei)
00909 {
00910 cenv[ei] = (*stage.envp)[ei].c_str();
00911 }
00912 cenv[ stage.envp->size() ] = NULL;
00913
00914 execve(stage.prog, (char* const*)cargs, (char* const*)cenv);
00915 }
00916
00917 LOG_ERROR("Error executing child process: " << strerror(errno));
00918 }
00919
00920 void ExecPipeImpl::sclose(int fd)
00921 {
00922 int r = close(fd);
00923
00924 if (r != 0) {
00925 LOG_ERROR("Could not correctly close fd: " << strerror(errno));
00926 }
00927 }
00928
00929
00930
00931 void ExecPipeImpl::run()
00932 {
00933 if (m_stages.size() == 0)
00934 throw(std::runtime_error("No stages to in exec pipe."));
00935
00936
00937
00938
00939 switch(m_input)
00940 {
00941 case ST_NONE:
00942
00943 m_stages[0].stdin_fd = -1;
00944 break;
00945
00946 case ST_STRING:
00947 case ST_OBJECT: {
00948
00949 int pipefd[2];
00950
00951 if (pipe(pipefd) != 0)
00952 throw(std::runtime_error(std::string("Could not create an input pipe: ") + strerror(errno)));
00953
00954 if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) != 0)
00955 throw(std::runtime_error(std::string("Could not set non-block mode on input pipe: ") + strerror(errno)));
00956
00957 m_input_fd = pipefd[1];
00958 m_stages[0].stdin_fd = pipefd[0];
00959 break;
00960 }
00961 case ST_FILE: {
00962
00963
00964 int infd = open(m_input_file, O_RDONLY);
00965 if (infd < 0)
00966 throw(std::runtime_error(std::string("Could not open input file: ") + strerror(errno)));
00967
00968 m_stages[0].stdin_fd = infd;
00969 break;
00970 }
00971 case ST_FD:
00972
00973 m_stages[0].stdin_fd = m_input_fd;
00974 m_input_fd = -1;
00975 break;
00976 }
00977
00978
00979 for (unsigned int i = 0; i < m_stages.size() - 1; ++i)
00980 {
00981 int pipefd[2];
00982
00983 if (pipe(pipefd) != 0)
00984 throw(std::runtime_error(std::string("Could not create a stage pipe: ") + strerror(errno)));
00985
00986 m_stages[i].stdout_fd = pipefd[1];
00987 m_stages[i+1].stdin_fd = pipefd[0];
00988
00989 if (m_stages[i].func)
00990 {
00991 if (fcntl(m_stages[i].stdout_fd, F_SETFL, O_NONBLOCK) != 0)
00992 throw(std::runtime_error(std::string("Could not set non-block mode on a stage pipe: ") + strerror(errno)));
00993 }
00994 if (m_stages[i+1].func)
00995 {
00996 if (fcntl(m_stages[i+1].stdin_fd, F_SETFL, O_NONBLOCK) != 0)
00997 throw(std::runtime_error(std::string("Could not set non-block mode on a stage pipe: ") + strerror(errno)));
00998 }
00999 }
01000
01001
01002 switch(m_output)
01003 {
01004 case ST_NONE:
01005
01006 m_stages.back().stdout_fd = -1;
01007 break;
01008
01009 case ST_STRING:
01010 case ST_OBJECT: {
01011
01012 int pipefd[2];
01013
01014 if (pipe(pipefd) != 0)
01015 throw(std::runtime_error(std::string("Could not create an output pipe: ") + strerror(errno)));
01016
01017 if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) != 0)
01018 throw(std::runtime_error(std::string("Could not set non-block mode on output pipe: ") + strerror(errno)));
01019
01020 m_stages.back().stdout_fd = pipefd[1];
01021 m_output_fd = pipefd[0];
01022 break;
01023 }
01024 case ST_FILE: {
01025
01026
01027 int outfd = open(m_output_file, O_WRONLY | O_CREAT | O_TRUNC, m_output_file_mode);
01028 if (outfd < 0)
01029 throw(std::runtime_error(std::string("Could not open output file: ") + strerror(errno)));
01030
01031 m_stages.back().stdout_fd = outfd;
01032 break;
01033 }
01034 case ST_FD:
01035
01036 m_stages.back().stdout_fd = m_output_fd;
01037 m_output_fd = -1;
01038 break;
01039 }
01040
01041
01042
01043 for (unsigned int i = 0; i < m_stages.size(); ++i)
01044 {
01045 if (m_stages[i].func) continue;
01046
01047 print_exec(m_stages[i].args);
01048
01049 pid_t child = fork();
01050 if (child == 0)
01051 {
01052
01053
01054
01055 if (m_input_fd >= 0)
01056 sclose(m_input_fd);
01057
01058 for (unsigned int j = 0; j < m_stages.size(); ++j)
01059 {
01060 if (i == j)
01061 {
01062
01063
01064 if (m_stages[i].stdin_fd >= 0)
01065 {
01066 if (dup2(m_stages[i].stdin_fd, STDIN_FILENO) == -1) {
01067 LOG_ERROR("Could not redirect file descriptor: " << strerror(errno));
01068 exit(255);
01069 }
01070 }
01071
01072 if (m_stages[i].stdout_fd >= 0)
01073 {
01074 if (dup2(m_stages[i].stdout_fd, STDOUT_FILENO) == -1) {
01075 LOG_ERROR("Could not redirect file descriptor: " << strerror(errno));
01076 exit(255);
01077 }
01078 }
01079 }
01080 else
01081 {
01082
01083
01084 if (m_stages[j].stdin_fd >= 0)
01085 sclose(m_stages[j].stdin_fd);
01086
01087 if (m_stages[j].stdout_fd >= 0)
01088 sclose(m_stages[j].stdout_fd);
01089 }
01090 }
01091
01092 if (m_output_fd >= 0)
01093 sclose(m_output_fd);
01094
01095
01096 exec_stage(m_stages[i]);
01097
01098 exit(255);
01099 }
01100
01101 m_stages[i].pid = child;
01102 }
01103
01104
01105
01106 for (stagelist_type::const_iterator st = m_stages.begin();
01107 st != m_stages.end(); ++st)
01108 {
01109 if (st->func) continue;
01110
01111 if (st->stdin_fd >= 0)
01112 sclose(st->stdin_fd);
01113
01114 if (st->stdout_fd >= 0)
01115 sclose(st->stdout_fd);
01116 }
01117
01118
01119
01120 while(1)
01121 {
01122
01123
01124 int max_fds = -1;
01125 fd_set read_fds, write_fds;
01126
01127 FD_ZERO(&read_fds);
01128 FD_ZERO(&write_fds);
01129
01130 if (m_input_fd >= 0)
01131 {
01132 if (m_input == ST_OBJECT)
01133 {
01134 assert(m_input_source);
01135
01136 if (!m_input_rbuffer.size() && !m_input_source->poll() && !m_input_rbuffer.size())
01137 {
01138 sclose(m_input_fd);
01139 m_input_fd = -1;
01140
01141 LOG_INFO("Closing input file descriptor: " << strerror(errno));
01142 }
01143 else
01144 {
01145 FD_SET(m_input_fd, &write_fds);
01146 if (max_fds < m_input_fd) max_fds = m_input_fd;
01147
01148 LOG_DEBUG("Select on input file descriptor");
01149 }
01150 }
01151 else
01152 {
01153 FD_SET(m_input_fd, &write_fds);
01154 if (max_fds < m_input_fd) max_fds = m_input_fd;
01155
01156 LOG_DEBUG("Select on input file descriptor");
01157 }
01158 }
01159
01160 for (unsigned int i = 0; i < m_stages.size(); ++i)
01161 {
01162 if (!m_stages[i].func) continue;
01163
01164 if (m_stages[i].stdin_fd >= 0)
01165 {
01166 FD_SET(m_stages[i].stdin_fd, &read_fds);
01167 if (max_fds < m_stages[i].stdin_fd) max_fds = m_stages[i].stdin_fd;
01168
01169 LOG_DEBUG("Select on stage input file descriptor");
01170 }
01171
01172 if (m_stages[i].stdout_fd >= 0)
01173 {
01174 if (m_stages[i].outbuffer.size())
01175 {
01176 FD_SET(m_stages[i].stdout_fd, &write_fds);
01177 if (max_fds < m_stages[i].stdout_fd) max_fds = m_stages[i].stdout_fd;
01178
01179 LOG_DEBUG("Select on stage output file descriptor");
01180 }
01181 else if (m_stages[i].stdin_fd < 0 && !m_stages[i].outbuffer.size())
01182 {
01183 sclose(m_stages[i].stdout_fd);
01184 m_stages[i].stdout_fd = -1;
01185
01186 LOG_INFO("Close stage output file descriptor");
01187 }
01188 }
01189 }
01190
01191 if (m_output_fd >= 0)
01192 {
01193 FD_SET(m_output_fd, &read_fds);
01194 if (max_fds < m_output_fd) max_fds = m_output_fd;
01195
01196 LOG_DEBUG("Select on output file descriptor");
01197 }
01198
01199
01200
01201 if (max_fds < 0)
01202 break;
01203
01204 int retval = select(max_fds+1, &read_fds, &write_fds, NULL, NULL);
01205 if (retval < 0)
01206 throw(std::runtime_error(std::string("Error during select() on file descriptors: ") + strerror(errno)));
01207
01208 LOG_TRACE("select() on " << retval << " file descriptors: " << strerror(errno));
01209
01210
01211
01212 if (m_input_fd >= 0 && FD_ISSET(m_input_fd, &write_fds))
01213 {
01214 if (m_input == ST_STRING)
01215 {
01216
01217
01218 assert(m_input_string);
01219 assert(m_input_string_pos < m_input_string->size());
01220
01221 ssize_t wb;
01222
01223 do
01224 {
01225 wb = write(m_input_fd,
01226 m_input_string->data() + m_input_string_pos,
01227 m_input_string->size() - m_input_string_pos);
01228
01229 LOG_TRACE("Write on input fd: " << wb);
01230
01231 if (wb < 0)
01232 {
01233 if (errno == EAGAIN || errno == EINTR)
01234 {
01235 }
01236 else
01237 {
01238 LOG_DEBUG("Error writing to input file descriptor: " << strerror(errno));
01239
01240 sclose(m_input_fd);
01241 m_input_fd = -1;
01242
01243 LOG_INFO("Closing input file descriptor: " << strerror(errno));
01244 }
01245 }
01246 else if (wb > 0)
01247 {
01248 m_input_string_pos += wb;
01249
01250 if (m_input_string_pos >= m_input_string->size())
01251 {
01252 sclose(m_input_fd);
01253 m_input_fd = -1;
01254
01255 LOG_INFO("Closing input file descriptor: " << strerror(errno));
01256 break;
01257 }
01258 }
01259 } while (wb > 0);
01260
01261 }
01262 else if (m_input == ST_OBJECT)
01263 {
01264
01265
01266 ssize_t wb;
01267
01268 do
01269 {
01270 wb = write(m_input_fd,
01271 m_input_rbuffer.bottom(),
01272 m_input_rbuffer.bottomsize());
01273
01274 LOG_TRACE("Write on input fd: " << wb);
01275
01276 if (wb < 0)
01277 {
01278 if (errno == EAGAIN || errno == EINTR)
01279 {
01280 }
01281 else
01282 {
01283 LOG_INFO("Error writing to input file descriptor: " << strerror(errno));
01284
01285 sclose(m_input_fd);
01286 m_input_fd = -1;
01287
01288 LOG_INFO("Closing input file descriptor: " << strerror(errno));
01289 }
01290 }
01291 else if (wb > 0)
01292 {
01293 m_input_rbuffer.advance(wb);
01294 }
01295 } while (wb > 0);
01296 }
01297 }
01298
01299 if (m_output_fd >= 0 && FD_ISSET(m_output_fd, &read_fds))
01300 {
01301
01302
01303 ssize_t rb;
01304
01305 do
01306 {
01307 errno = 0;
01308
01309 rb = read(m_output_fd,
01310 m_buffer, sizeof(m_buffer));
01311
01312 LOG_TRACE("Read on output fd: " << rb);
01313
01314 if (rb <= 0)
01315 {
01316 if (rb == 0 && errno == 0)
01317 {
01318
01319
01320 LOG_INFO("Closing output file descriptor: " << strerror(errno));
01321
01322 if (m_output == ST_OBJECT)
01323 {
01324 assert(m_output_sink);
01325 m_output_sink->eof();
01326 }
01327
01328 sclose(m_output_fd);
01329 m_output_fd = -1;
01330 }
01331 else if (errno == EAGAIN || errno == EINTR)
01332 {
01333 }
01334 else
01335 {
01336 LOG_ERROR("Error reading from output file descriptor: " << strerror(errno));
01337 }
01338 }
01339 else
01340 {
01341 if (m_output == ST_STRING)
01342 {
01343 assert(m_output_string);
01344 m_output_string->append(m_buffer, rb);
01345 }
01346 else if (m_output == ST_OBJECT)
01347 {
01348 assert(m_output_sink);
01349 m_output_sink->process(m_buffer, rb);
01350 }
01351 }
01352 } while (rb > 0);
01353 }
01354
01355 for (unsigned int i = 0; i < m_stages.size(); ++i)
01356 {
01357 if (!m_stages[i].func) continue;
01358
01359 if (m_stages[i].stdin_fd >= 0 && FD_ISSET(m_stages[i].stdin_fd, &read_fds))
01360 {
01361 ssize_t rb;
01362
01363 do
01364 {
01365 errno = 0;
01366
01367 rb = read(m_stages[i].stdin_fd,
01368 m_buffer, sizeof(m_buffer));
01369
01370 LOG_TRACE("Read on stage fd: " << rb);
01371
01372 if (rb <= 0)
01373 {
01374 if (rb == 0 && errno == 0)
01375 {
01376
01377
01378 LOG_INFO("Closing stage input file descriptor: " << strerror(errno));
01379
01380 m_stages[i].func->eof();
01381
01382 sclose(m_stages[i].stdin_fd);
01383 m_stages[i].stdin_fd = -1;
01384 }
01385 else if (errno == EAGAIN || errno == EINTR)
01386 {
01387 }
01388 else
01389 {
01390 LOG_ERROR("Error reading from stage input file descriptor: " << strerror(errno));
01391 }
01392 }
01393 else
01394 {
01395 m_stages[i].func->process(m_buffer, rb);
01396 }
01397 } while (rb > 0);
01398 }
01399
01400 if (m_stages[i].stdout_fd >= 0 && FD_ISSET(m_stages[i].stdout_fd, &write_fds))
01401 {
01402 while (m_stages[i].outbuffer.size() > 0)
01403 {
01404 ssize_t wb = write(m_stages[i].stdout_fd,
01405 m_stages[i].outbuffer.bottom(),
01406 m_stages[i].outbuffer.bottomsize());
01407
01408 LOG_TRACE("Write on stage fd: " << wb);
01409
01410 if (wb < 0)
01411 {
01412 if (errno == EAGAIN || errno == EINTR)
01413 {
01414 }
01415 else
01416 {
01417 LOG_INFO("Error writing to stage output file descriptor: " << strerror(errno));
01418 }
01419 break;
01420 }
01421 else if (wb > 0)
01422 {
01423 m_stages[i].outbuffer.advance(wb);
01424 }
01425 }
01426
01427 if (m_stages[i].stdin_fd < 0 && !m_stages[i].outbuffer.size())
01428 {
01429 LOG_INFO("Closing stage output file descriptor: " << strerror(errno));
01430
01431 sclose(m_stages[i].stdout_fd);
01432 m_stages[i].stdout_fd = -1;
01433 }
01434 }
01435 }
01436 }
01437
01438
01439
01440 unsigned int donepid = 0;
01441
01442 for (unsigned int i = 0; i < m_stages.size(); ++i)
01443 {
01444 if (!m_stages[i].func) continue;
01445 ++donepid;
01446 }
01447
01448 while (donepid != m_stages.size())
01449 {
01450 int status;
01451 int p = wait(&status);
01452
01453 if (p < 0)
01454 {
01455 LOG_ERROR("Error calling wait(): " << strerror(errno));
01456 break;
01457 }
01458
01459 bool found = false;
01460
01461 for (unsigned int i = 0; i < m_stages.size(); ++i)
01462 {
01463 if (p == m_stages[i].pid)
01464 {
01465 m_stages[i].retstatus = status;
01466
01467 if (WIFEXITED(status))
01468 {
01469 LOG_INFO("Finished exec() stage " << p << " with retcode " << WEXITSTATUS(status));
01470 }
01471 else if (WIFSIGNALED(status))
01472 {
01473 LOG_INFO("Finished exec() stage " << p << " with signal " << WTERMSIG(status));
01474 }
01475 else
01476 {
01477 LOG_ERROR("Error in wait(): unknown return status for pid " << p);
01478 }
01479
01480 ++donepid;
01481 found = true;
01482 break;
01483 }
01484 }
01485
01486 if (!found)
01487 {
01488 LOG_ERROR("Error in wait(): syscall returned an unknown child pid.");
01489 }
01490 }
01491
01492 LOG_INFO("Finished running pipe.");
01493 }
01494
01495
01496
01497 ExecPipe::ExecPipe()
01498 : m_impl(new ExecPipeImpl)
01499 {
01500 ++m_impl->refs();
01501 }
01502
01503 ExecPipe::~ExecPipe()
01504 {
01505 if (--m_impl->refs() == 0)
01506 delete m_impl;
01507 }
01508
01509 ExecPipe::ExecPipe(const ExecPipe& ep)
01510 : m_impl(ep.m_impl)
01511 {
01512 ++m_impl->refs();
01513 }
01514
01515 ExecPipe& ExecPipe::operator=(const ExecPipe& ep)
01516 {
01517 if (this != &ep)
01518 {
01519 if (--m_impl->refs() == 0)
01520 delete m_impl;
01521
01522 m_impl = ep.m_impl;
01523 ++m_impl->refs();
01524 }
01525 return *this;
01526 }
01527
01528 void ExecPipe::set_debug_level(enum DebugLevel dl)
01529 {
01530 return m_impl->set_debug_level(dl);
01531 }
01532
01533 void ExecPipe::set_debug_output(void (*output)(const char *line))
01534 {
01535 return m_impl->set_debug_output(output);
01536 }
01537
01538 void ExecPipe::set_input_fd(int fd)
01539 {
01540 return m_impl->set_input_fd(fd);
01541 }
01542
01543 void ExecPipe::set_input_file(const char* path)
01544 {
01545 return m_impl->set_input_file(path);
01546 }
01547
01548 void ExecPipe::set_input_string(const std::string* input)
01549 {
01550 return m_impl->set_input_string(input);
01551 }
01552
01553 void ExecPipe::set_input_source(PipeSource* source)
01554 {
01555 return m_impl->set_input_source(source);
01556 }
01557
01558 void ExecPipe::set_output_fd(int fd)
01559 {
01560 return m_impl->set_output_fd(fd);
01561 }
01562
01563 void ExecPipe::set_output_file(const char* path, int mode)
01564 {
01565 return m_impl->set_output_file(path, mode);
01566 }
01567
01568 void ExecPipe::set_output_string(std::string* output)
01569 {
01570 return m_impl->set_output_string(output);
01571 }
01572
01573 void ExecPipe::set_output_sink(PipeSink* sink)
01574 {
01575 return m_impl->set_output_sink(sink);
01576 }
01577
01578 unsigned int ExecPipe::size() const
01579 {
01580 return m_impl->size();
01581 }
01582
01583 void ExecPipe::add_exec(const char* prog)
01584 {
01585 return m_impl->add_exec(prog);
01586 }
01587
01588 void ExecPipe::add_exec(const char* prog, const char* arg1)
01589 {
01590 return m_impl->add_exec(prog, arg1);
01591 }
01592
01593 void ExecPipe::add_exec(const char* prog, const char* arg1, const char* arg2)
01594 {
01595 return m_impl->add_exec(prog, arg1, arg2);
01596 }
01597
01598 void ExecPipe::add_exec(const char* prog, const char* arg1, const char* arg2, const char* arg3)
01599 {
01600 return m_impl->add_exec(prog, arg1, arg2, arg3);
01601 }
01602
01603 void ExecPipe::add_exec(const std::vector<std::string>* args)
01604 {
01605 return m_impl->add_exec(args);
01606 }
01607
01608 void ExecPipe::add_execp(const char* prog)
01609 {
01610 return m_impl->add_execp(prog);
01611 }
01612
01613 void ExecPipe::add_execp(const char* prog, const char* arg1)
01614 {
01615 return m_impl->add_execp(prog, arg1);
01616 }
01617
01618 void ExecPipe::add_execp(const char* prog, const char* arg1, const char* arg2)
01619 {
01620 return m_impl->add_execp(prog, arg1, arg2);
01621 }
01622
01623 void ExecPipe::add_execp(const char* prog, const char* arg1, const char* arg2, const char* arg3)
01624 {
01625 return m_impl->add_execp(prog, arg1, arg2, arg3);
01626 }
01627
01628 void ExecPipe::add_execp(const std::vector<std::string>* args)
01629 {
01630 return m_impl->add_execp(args);
01631 }
01632
01633 void ExecPipe::add_exece(const char* path,
01634 const std::vector<std::string>* args,
01635 const std::vector<std::string>* env)
01636 {
01637 return m_impl->add_exece(path, args, env);
01638 }
01639
01640 void ExecPipe::add_function(PipeFunction* func)
01641 {
01642 return m_impl->add_function(func);
01643 }
01644
01645 ExecPipe& ExecPipe::run()
01646 {
01647 m_impl->run();
01648 return *this;
01649 }
01650
01651 int ExecPipe::get_return_status(unsigned int stageid) const
01652 {
01653 return m_impl->get_return_status(stageid);
01654 }
01655
01656 int ExecPipe::get_return_code(unsigned int stageid) const
01657 {
01658 return m_impl->get_return_code(stageid);
01659 }
01660
01661 int ExecPipe::get_return_signal(unsigned int stageid) const
01662 {
01663 return m_impl->get_return_signal(stageid);
01664 }
01665
01666 bool ExecPipe::all_return_codes_zero() const
01667 {
01668 return m_impl->all_return_codes_zero();
01669 }
01670
01671
01672
01673 PipeSource::PipeSource()
01674 : m_impl(NULL)
01675 {
01676 }
01677
01678 void PipeSource::write(const void* data, unsigned int datalen)
01679 {
01680 assert(m_impl);
01681 return m_impl->input_source_write(data, datalen);
01682 }
01683
01684
01685
01686 PipeFunction::PipeFunction()
01687 : m_impl(NULL), m_stageid(0)
01688 {
01689 }
01690
01691 void PipeFunction::write(const void* data, unsigned int datalen)
01692 {
01693 assert(m_impl);
01694 return m_impl->stage_function_write(m_stageid, data, datalen);
01695 }
01696
01697
01698
01699 }