diff --git a/forkrun.bash b/forkrun.bash index 30223d0..10db5a2 100644 --- a/forkrun.bash +++ b/forkrun.bash @@ -22,22 +22,25 @@ forkrun() { ############################ BEGIN FUNCTION ############################ - trap - EXIT INT TERM HUP USR1 USR2 + trap - EXIT INT TERM HUP USR1 shopt -s extglob # make all variables local - local tmpDir fPath outStr delimiterVal delimiterReadStr delimiterRemoveStr exitTrapStr exitTrapStr_kill nLines0 nOrder nProcs nProcsMax nBytes tTimeout coprocSrcCode outCur tmpDirRoot returnVal tmpVar t0 readBytesProg nullDelimiterProg ddQuietStr trailingNullFlag inotifyFlag lseekFlag fallocateFlag nLinesAutoFlag nQueueFlag substituteStringFlag substituteStringIDFlag nOrderFlag readBytesFlag readBytesExactFlag nullDelimiterFlag subshellRunFlag stdinRunFlag pipeReadFlag rmTmpDirFlag exportOrderFlag noFuncFlag unescapeFlag optParseFlag continueFlag doneIndicatorFlag FORCE_allowCarriageReturnsFlag ddAvailableFlag fd_continue fd_inotify fd_inotify0 fd_nAuto fd_nAuto0 fd_nOrder fd_nOrder0 fd_read fd_read0 fd_write fd_stdout fd_stdin fd_stdin0 fd_stderr pWrite pOrder pAuto pQueue pWrite_PID pNotify_PID pOrder_PID pAuto_PID pQueue_PID fd_read_pos fd_read_pos_old fd_write_pos DEBUG_FORKRUN + local tmpDir fPath outStr delimiterVal delimiterReadStr delimiterRemoveStr exitTrapStr exitTrapStr_kill nLines0 nOrder nProcs nProcsMax nBytes tTimeout coprocSrcCode outCur tmpDirRoot returnVal tmpVar t0 readBytesProg nullDelimiterProg ddQuietStr fd_read_pos fd_read_pos_old fd_write_pos pWrite pOrder pAuto pQueue pWrite_PID pNotify_PID pOrder_PID pAuto_PID pQueue_PID DEBUG_FORKRUN + # trailingNullFlag inotifyFlag lseekFlag fallocateFlag nLinesAutoFlag nQueueFlag substituteStringFlag substituteStringIDFlag nOrderFlag readBytesFlag readBytesExactFlag nullDelimiterFlag subshellRunFlag stdinRunFlag pipeReadFlag rmTmpDirFlag exportOrderFlag noFuncFlag unescapeFlag optParseFlag continueFlag doneIndicatorFlag FORCE_allowCarriageReturnsFlag ddAvailableFlag + # fd_continue fd_inotify fd_inotify0 fd_nAuto fd_nAuto0 fd_nOrder fd_nOrder0 fd_read fd_read0 fd_write fd_stdout fd_stdin fd_stdin0 fd_stderr local -i PID0 nLines nLinesCur nLinesNew nLinesMax nRead nWait nOrder0 nBytesRead nQueue nQueueLast nQueueMin nQueueLastCount nCPU v9 kkMax kkCur kk kkProcs verboseLevel pLOAD_max pAdd local -a A p_PID runCmd outHave outPrint pLOADA + local -A flagsA fdA # # # # # PARSE OPTIONS # # # # # - : "${verboseLevel:=0}" "${returnVal:=0}" "${fd_stdin0:=0}" + : "${verboseLevel:=0}" "${returnVal:=0}" "${fdA['stdin0']:=0}" # check inputs and set defaults if needed - [[ $# == 0 ]] && optParseFlag=false || optParseFlag=true - while ${optParseFlag} && (( $# > 0 )) && [[ "$1" == [-+]* ]]; do + [[ $# == 0 ]] && flagsA['optParse']=false || flagsA['optParse']=true + while ${flagsA['optParse']} && (( $# > 0 )) && [[ "$1" == [-+]* ]]; do case "${1}" in -?(-)@([jP]|?(n)[Pp]roc?(s)?)?(?([= ])?([+-])*([0-9])@([0-9,])*([0-9])?(,*([0-9])))) @@ -52,10 +55,10 @@ forkrun() { -?(-)?(n)l?(ine?(s))?(?([= ])+([0-9]))) if [[ "${1}" == -?(-)?(n)l?(ine?(s))?([= ])+([0-9]) ]]; then nLines="${1##@(-?(-)?(n)l?(ine?(s))?([= ]))}" - nLinesAutoFlag=false + flagsA['nLinesAuto']=false elif [[ "${1}" == -?(-)?(n)l?(ine?(s)) ]] && [[ "${2}" == +([0-9]) ]]; then nLines="${2}" - nLinesAutoFlag=false + flagsA['nLinesAuto']=false shift 1 fi ;; @@ -63,10 +66,10 @@ forkrun() { -?(-)?(N)L?(INE?(S))?(?([= ])+([0-9])?(,+([0-9])))) if [[ "${1}" == -?(-)?(N)L?(INE?(S))?([= ])+([0-9])?(,+([0-9])) ]]; then nLines0="${1##@(-?(-)?(N)L?(INE?(S))?([= ]))}" - nLinesAutoFlag=true + flagsA['nLinesAuto']=true elif [[ "${1}" == -?(-)?(N)L?(INE?(S)) ]] && [[ "${2}" == +([0-9])?(,+([0-9])) ]]; then nLines0="${2}" - nLinesAutoFlag=true + flagsA['nLinesAuto']=true shift 1 else continue @@ -82,12 +85,12 @@ forkrun() { -?(-)b?(yte?(s))?(?([= ])+([0-9])?([KkMmGgTtPp])?(i)?([Bb]))) if [[ "${1}" == -?(-)b?(yte?(s))?([= ])+([0-9])?([KkMmGgTtPp])?(i)?([Bb]) ]]; then nBytes="${1##@(+([0-9])?([KkMmGgTtPp])?(i)?([Bb]))}" - readBytesFlag=true - readBytesExactFlag=false + flagsA['readBytes']=true + flagsA['readBytesExact']=false elif [[ "${1}" == -?(-)b?(yte?(s)) ]] && [[ "${2}" == +([0-9])?([KkMmGgTtPp])?(i)?([Bb]) ]]; then nBytes="${2}" - readBytesFlag=true - readBytesExactFlag=false + flagsA['readBytes']=true + flagsA['readBytesExact']=false shift 1 fi ;; @@ -95,12 +98,12 @@ forkrun() { -?(-)B?(YTE?(S))?(?([= ])+([0-9])?([KkMmGgTtPp])?(i)?([Bb])?(,+([0-9])?(.+([0-9]))))) if [[ "${1}" == -?(-)B?(YTE?(S))?([= ])+([0-9])?([KkMmGgTtPp])?(i)?([Bb])?(,+([0-9])?(.+([0-9]))) ]]; then nBytes="${1##@(+([0-9])?([KkMmGgTtPp])?(i)?([Bb])?(,+([0-9])?(.+([0-9]))))}" - readBytesFlag=true - readBytesExactFlag=true + flagsA['readBytes']=true + flagsA['readBytesExact']=true elif [[ "${1}" == -?(-)B?(YTE?(S)) ]] && [[ "${2}" == +([0-9])?([KkMmGgTtPp])?(i)?([Bb])?(,+([0-9])?(.+([0-9]))) ]]; then nBytes="${2}" - readBytesFlag=true - readBytesExactFlag=true + flagsA['readBytes']=true + flagsA['readBytesExact']=true shift 1 fi ;; @@ -120,65 +123,65 @@ forkrun() { if [[ "${1}" == -?(-)d?(elim?(iter))?([= ])@([[:graph:]])* ]]; then delimiterVal="${1##@(-?(-)d?(elim?(iter))?([= ]))}" (( ${#delimiterVal} > 1 )) && printf '\nWARNING: the delimiter must be a single character, and a multi-character string was given. Only using the 1st character.\n\n' >&2 - (( ${#delimiterVal} == 0 )) && nullDelimiterFlag=true || delimiterVal="${delimiterVal:0:1}" + (( ${#delimiterVal} == 0 )) && flagsA['nullDelimiter']=true || delimiterVal="${delimiterVal:0:1}" elif [[ "${1}" == -?(-)d?(elim?(iter)) ]] && [[ "${2}" == @([[:graph:]])* ]]; then (( ${#2} > 1 )) && printf '\nWARNING: the delimiter must be a single character, and a multi-character string was given. Only using the 1st character.\n\n' >&2 - (( ${#2} == 0 )) && nullDelimiterFlag=true || delimiterVal="${2:0:1}" + (( ${#2} == 0 )) && flagsA['nullDelimiter']=true || delimiterVal="${2:0:1}" shift 1 fi ;; -?(-)@(u|fd|file?(-)descriptor)?(?([= ])+([0-9]))) if [[ "${1}" == -?(-)@(u|fd|file?(-)descriptor)?([= ])+([0-9]) ]]; then - fd_stdin0="${1##@(-?(-)@(u|fd|file?(-)descriptor)?([= ]))}" + fdA['stdin0']="${1##@(-?(-)@(u|fd|file?(-)descriptor)?([= ]))}" elif [[ "${1}" == -?(-)@(u|fd|file?(-)descriptor) ]] && [[ "${2}" == +([0-9]) ]]; then - fd_stdin0="${2}" + fdA['stdin0']="${2}" shift 1 fi ;; [+-]?([+-])i?(nsert)) - [[ "${1:0:1}" == '-' ]] && substituteStringFlag=true || substituteStringFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['substituteString']=true || flagsA['substituteString']=false ;; [+-]?([+-])@(I?(D)|INSERT?(?(-)ID))) - [[ "${1:0:1}" == '-' ]] && substituteStringIDFlag=true || substituteStringIDFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['substituteStringID']=true || flagsA['substituteStringID']=false ;; [+-]?([+-])k?(eep?(?(-)order))) - [[ "${1:0:1}" == '-' ]] && nOrderFlag=true || nOrderFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['nOrder']=true || flagsA['nOrder']=false ;; [+-]?([+-])@(0|z?(ero)|null)) - [[ "${1:0:1}" == '-' ]] && nullDelimiterFlag=true || nullDelimiterFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['nullDelimiter']=true || flagsA['nullDelimiter']=false ;; [+-]?([+-])s?(ub)?(?(-)shell)?(?(-)run)) - [[ "${1:0:1}" == '-' ]] && subshellRunFlag=true || subshellRunFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['subshellRun']=true || flagsA['subshellRun']=false ;; [+-]?([+-])@(S|[Ss]tdin?(?(-)run))) - [[ "${1:0:1}" == '-' ]] && stdinRunFlag=true || stdinRunFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['stdinRun']=true || flagsA['stdinRun']=false ;; [+-]?([+-])p?(ipe)?(?(-)read)) - [[ "${1:0:1}" == '-' ]] && pipeReadFlag=true || pipeReadFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['pipeRead']=true || flagsA['pipeRead']=false ;; [+-]?([+-])@(D|[Dd]elete)) - [[ "${1:0:1}" == '-' ]] && rmTmpDirFlag=true || rmTmpDirFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['rmTmpDir']=true || flagsA['rmTmpDir']=false ;; [+-]?([+-])n?(umber)?(-)?(line?(s))) - [[ "${1:0:1}" == '-' ]] && exportOrderFlag=true || exportOrderFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['exportOrder']=true || flagsA['exportOrder']=false ;; [+-]?([+-])@(N?(O)|[Nn][Oo]?(-)func)) - [[ "${1:0:1}" == '-' ]] && noFuncFlag=true || noFuncFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['noFunc']=true || flagsA['noFunc']=false ;; [+-]?([+-])U?(NESCAPE)) - [[ "${1:0:1}" == '-' ]] && unescapeFlag=true || unescapeFlag=false + [[ "${1:0:1}" == '-' ]] && flagsA['unescape']=true || flagsA['unescape']=false ;; [+-]?([+-])@(+(v)|verbose)) @@ -200,7 +203,7 @@ forkrun() { ;; --) - optParseFlag=false + flagsA['optParse']=false ;; @([-+])?([-+])*@([[:graph:]])*) @@ -212,7 +215,7 @@ forkrun() { ;; *) - optParseFlag=false + flagsA['optParse']=false break ;; @@ -223,7 +226,7 @@ forkrun() { done - [ -t "${fd_stdin0}" ] && { + [ -t "${fdA['stdin0']}" ] && { (( ${verboseLevel} > 0 )) && printf '\n\nERROR: STDIN is a terminal. \n\nforkrun requires STDIN to be a pipe \n(containing the inputs to parallelize over); e.g.: \n\nprintf '"'"'%%s\\n'"'"' "${args[@]}" | forkrun parFunc \n\nABORTING! \n\n' returnVal=1 return 1 @@ -272,35 +275,35 @@ forkrun() { shopt -s nullglob # dynamically set defaults for a few flags - : "${noFuncFlag:=false}" "${FORCE_allowCarriageReturnsFlag:=false}" "${readBytesFlag:=false}" "${readBytesExactFlag:=false}" "${nullDelimiterFlag:=false}" + : "${flagsA['noFunc']:=false}" "${flagsA['FORCE_allowCarriageReturns']:=false}" "${flagsA['readBytes']:=false}" "${flagsA['readBytesExact']:=false}" "${flagsA['nullDelimiter']:=false}" if enable lseek &>/dev/null; then - : "${lseekFlag:=true}" + : "${flagsA['lseek']:=true}" else - : "${lseekFlag:=false}" + : "${flagsA['lseek']:=false}" fi # determine what forkrun is using lines on stdin for - if ${FORCE_allowCarriageReturnsFlag}; then + if ${flagsA['FORCE_allowCarriageReturns']}; then # NOTE: allowing carriage returns in parFunC (or its initial args) is DANGEROUS. Dont do this unless you know what you are doing. - # As such, `FORCE_allowCarriageReturnsFlag` can only be enabled (set to `true`) using the DEBUG_FORKRUN environment variable + # As such, flagsA['`FORCE_allowCarriageReturns']` can only be enabled (set to `true`) using the DEBUG_FORKRUN environment variable runCmd=("${@}") else runCmd=("${@//$'\r'/}") fi - (( ${#runCmd[@]} > 0 )) || ${noFuncFlag} || runCmd=(printf '%s\n') - (( ${#runCmd[@]} > 0 )) && noFuncFlag=false - ${noFuncFlag} && runCmd=('source' '/proc/self/fd/0') + (( ${#runCmd[@]} > 0 )) || ${flagsA['noFunc']} || runCmd=(printf '%s\n') + (( ${#runCmd[@]} > 0 )) && flagsA['noFunc']=false + ${flagsA['noFunc']} && runCmd=('source' '/proc/self/fd/0') hash "${runCmd[0]}" &>/dev/null || hash "${runCmd[0]%% *}" &>/dev/null # setup byte reading if passed -b or -B - if ${readBytesFlag}; then + if ${flagsA['readBytes']}; then # turn off nLinesAuto - nLinesAutoFlag=false + flagsA['nLinesAuto']=false # turn on passing data via stdin (to prevent mangling NULL's in binary data) by default when byte splitting - : "${stdinRunFlag:=true}" + : "${flagsA['stdinRun']:=true}" # parse read size (in bytes) nBytes="${nBytes,,}" @@ -325,15 +328,15 @@ forkrun() { # make sure nBytes is only digits [[ "${nBytes//[0-9]/}" ]] && (( ${verboseLevel} >= 0 )) && { - printf '\nERROR: the byte count passed to the ( -b | -B ) flag did not parse correctly. \nThis must consist solely of numbers, optionally followed by a standard si prefix. \nVALID EXAMPLES: 4 8b 16k 32mb 64G 128TB 256PiB \nNOTE: the count is always in bytes, never in bits, regardless of the case of the (optional) trailing "b" / "B"\n\n' >&${fd_stderr}; + printf '\nERROR: the byte count passed to the ( -b | -B ) flag did not parse correctly. \nThis must consist solely of numbers, optionally followed by a standard si prefix. \nVALID EXAMPLES: 4 8b 16k 32mb 64G 128TB 256PiB \nNOTE: the count is always in bytes, never in bits, regardless of the case of the (optional) trailing "b" / "B"\n\n' >&${fdA['stderr']}; returnVal=1 return 1 } # check for incompatible flags - { ${nullDelimiterFlag} || [[ ${delimiterVal} ]] || [[ ${delimiterRemoveStr} ]]; } && (( ${verboseLevel} >= 0 )) && { - (( ${verboseLevel} >= 0 )) && printf '\nWARNING: The flag to use a null or a custom delimiter (-z | -0 | -d ) and the flag to read by byte count ( -b | -B ) were both passed.\nThere are no delimiters required when reading by bytes, so the delimiter flag will be unset and ignored\n\n' >&${fd_stderr}; - nullDelimiterFlag=false + { ${flagsA['nullDelimiter']} || [[ ${delimiterVal} ]] || [[ ${delimiterRemoveStr} ]]; } && (( ${verboseLevel} >= 0 )) && { + (( ${verboseLevel} >= 0 )) && printf '\nWARNING: The flag to use a null or a custom delimiter (-z | -0 | -d ) and the flag to read by byte count ( -b | -B ) were both passed.\nThere are no delimiters required when reading by bytes, so the delimiter flag will be unset and ignored\n\n' >&${fdA['stderr']}; + flagsA['nullDelimiter']=false unset delimiterVal delimiterRemoveStr } @@ -345,38 +348,38 @@ forkrun() { readBytesProg='head' else readBytesProg='bash' - (( ${verboseLevel} >= 0 )) && printf '\nWARNING: neither "dd (GNU)" nor "head" are available. The `read` builtin will be used by the worker coprocs to read data. forkrun will run considerably slower. \n\n' >&${fd_stderr}; + (( ${verboseLevel} >= 0 )) && printf '\nWARNING: neither "dd (GNU)" nor "head" are available. The `read` builtin will be used by the worker coprocs to read data. forkrun will run considerably slower. \n\n' >&${fdA['stderr']}; fi } - ${stdinRunFlag} || (( ${verboseLevel} < 0 )) || printf '\nWARNING: data will be passed to coprocs via bash variables, which will drop NULLs and will probably mangle binary data (text data should still work). \nIt is not recommended to use the `+S` flag to prevent passing data to the coprocs stdin\n\n' >&${fd_stderr}; + ${flagsA['stdinRun']} || (( ${verboseLevel} < 0 )) || printf '\nWARNING: data will be passed to coprocs via bash variables, which will drop NULLs and will probably mangle binary data (text data should still work). \nIt is not recommended to use the `+S` flag to prevent passing data to the coprocs stdin\n\n' >&${fdA['stderr']}; - # TEMP FIX - force readBytesExactFlag if using bash and stdinRunFlag. Otherwise it gets stuck somewhere :/ - [[ ${readBytesProg} == 'bash' ]] && ${stdinRunFlag} && readBytesExactFlag=true + # TEMP FIX - force flagsA['readBytesExact'] if using bash and flagsA['stdinRun']. Otherwise it gets stuck somewhere :/ + [[ ${readBytesProg} == 'bash' ]] && ${flagsA['stdinRun']} && flagsA['readBytesExact']=true # read from pipe for -B if using dd/head. also dont need inotifywait - if ${readBytesExactFlag} && ! { [[ ${readBytesProg} == 'bash' ]] && ${stdinRunFlag}; }; then - pipeReadFlag=true - inotifyFlag=false + if ${flagsA['readBytesExact']} && ! { [[ ${readBytesProg} == 'bash' ]] && ${stdinRunFlag}; }; then + flagsA['pipeRead']=true + flagsA['inotify']=false else - pipeReadFlag=false + flagsA['pipeRead']=false fi [[ ${readBytesProg} == 'bash' ]] || hash "${readBytesProg}" else # set batch size - { [[ ${nLines} ]] && (( ${nLines} > 0 )) && : "${nLinesAutoFlag:=false}"; } || : "${nLinesAutoFlag:=true}" + { [[ ${nLines} ]] && (( ${nLines} > 0 )) && : "${flagsA['nLinesAuto']:=false}"; } || : "${flagsA['nLinesAuto']:=true}" { [[ -z ${nLines} ]] || [[ ${nLines} == 0 ]]; } && nLines=1 fi # set number of coproc workers and (if enabled) minimim worker read queue length [[ "${nProcs}" == '-'* ]] && { - : "${nQueueFlag:=true}" + : "${flagsA['nQueue']:=true}" nProcs="${nProcs#'-'}" } [[ "${nProcs}" == *','* ]] && { - : "${nQueueFlag:=true}" + : "${flagsA['nQueue']:=true}" nProcsMax="${nProcs#*,}" nProcs="${nProcs%%,*}" [[ "${nProcsMax}" == *','* ]] && { @@ -385,56 +388,56 @@ forkrun() { } } - : "${nQueueFlag:=false}" "${nQueueMin:=1}" + : "${flagsA['nQueue']:=false}" "${nQueueMin:=1}" local -i nProcs="${nProcs}" nProcsMax="${nProcsMax}" nCPU="$({ type -a nproc &>/dev/null && nproc; } || { type -a grep &>/dev/null && grep -cE '^processor.*: ' /proc/cpuinfo; } || { mapfile -t tmpA 0 )); } || { ${nQueueFlag} && nProcs=$(( ${nCPU} / 2 )) || nProcs=${nCPU}; } + { [[ ${nProcs} ]] && (( ${nProcs:-0} > 0 )); } || { ${flagsA['nQueue']} && nProcs=$(( ${nCPU} / 2 )) || nProcs=${nCPU}; } - ${nQueueFlag} && { + ${flagsA['nQueue']} && { [[ ${nProcsMax//0/} ]] || nProcsMax=$(( ${nCPU} * 2 )); [[ ${nQueueMin//0/} ]] || nQueueMin=1 } - { ${nQueueFlag} && (( ${nQueueMin:-0} > 0 )) && { [[ ${nProcsMax:-0} == '0' ]] || (( ${nProcs} < ${nProcsMax} )); }; } || : "${nQueueFlag:=false}" + { ${flagsA['nQueue']} && (( ${nQueueMin:-0} > 0 )) && { [[ ${nProcsMax:-0} == '0' ]] || (( ${nProcs} < ${nProcsMax} )); }; } || : "${flagsA['nQueue']:=false}" # if reading 1 line at a time (and not automatically adjusting it) skip saving the data in a tmpfile and read directly from stdin pipe - ${nLinesAutoFlag} || { [[ ${nLines} == 1 ]] && : "${pipeReadFlag:=true}"; } + ${flagsA['nLinesAuto']} || { [[ ${nLines} == 1 ]] && : "${flagsA['pipeRead']:=true}"; } # set defaults for control flags/parameters - : "${nOrderFlag:=false}" "${rmTmpDirFlag:=true}" "${nLinesMax:=1024}" "${subshellRunFlag:=false}" "${pipeReadFlag:=false}" "${substituteStringFlag:=false}" "${substituteStringIDFlag:=false}" "${exportOrderFlag:=false}" "${unescapeFlag:=false}" "${stdinRunFlag:=false}" - doneIndicatorFlag=false + : "${flagsA['nOrder']:=false}" "${flagsA['rmTmpDir']:=true}" "${nLinesMax:=1024}" "${flagsA['subshellRun']:=false}" "${flagsA['pipeRead']:=false}" "${flagsA['substituteString']:=false}" "${flagsA['substituteStringID']:=false}" "${flagsA['exportOrder']:=false}" "${flagsA['unescape']:=false}" "${flagsA['stdinRun']:=false}" + flagsA['doneIndicator']=false # check for inotifywait - type -a inotifywait &>/dev/null && ! ${pipeReadFlag} && : "${inotifyFlag:=true}" || : "${inotifyFlag:=false}" + type -a inotifywait &>/dev/null && ! ${flagsA['pipeRead']} && : "${flagsA['inotify']:=true}" || : "${flagsA['inotify']:=false}" # check for fallocate - type -a fallocate &>/dev/null && ! ${pipeReadFlag} && : "${fallocateFlag:=true}" || : "${fallocateFlag:=false}" + type -a fallocate &>/dev/null && ! ${flagsA['pipeRead']} && : "${flagsA['fallocate']:=true}" || : "${flagsA['fallocate']:=false}" # check for conflict in flags that were defined on the commandline when forkrun was called - ${pipeReadFlag} && ${nLinesAutoFlag} && (( ${verboseLevel} >= 0 )) && { printf '%s\n' '' 'WARNING: automatically adjusting number of lines used per function call not supported when reading directly from stdin pipe' ' Disabling reading directly from stdin pipe...a tmpfile will be used' '' >&${fd_stderr}; pipeReadFlag=false; } + ${flagsA['pipeRead']} && ${flagsA['nLinesAuto']} && (( ${verboseLevel} >= 0 )) && { printf '%s\n' '' 'WARNING: automatically adjusting number of lines used per function call not supported when reading directly from stdin pipe' ' Disabling reading directly from stdin pipe...a tmpfile will be used' '' >&${fdA['stderr']}; flagsA['pipeRead']=false; } # require -k to use -n - ${exportOrderFlag} && nOrderFlag=true + ${flagsA['exportOrder']} && flagsA['nOrder']=true # setup delimiter - ${readBytesFlag} || { - if ${nullDelimiterFlag}; then + ${flagsA['readBytes']} || { + if ${flagsA['nullDelimiter']}; then delimiterReadStr="-d ''" - ${lseekFlag} && : "${nullDelimiterProg:='lseek'}" - : "${nullDelimiterProg:=bash}" + ${flagsA['lseek']} && : "${nullDelimiterProg:='lseek'}" + : "${nullDelimiterProg:=bash}" "${flagsA['FORCE_allowUnsafeNullDelimiter']:=false}" if type -p dd &>/dev/null; then - ddAvailableFlag=true + flagsA['ddAvailable']=true if dd --version | grep -qF 'coreutils'; then ddQuietStr='status=none' else ddQuietStr='2>/dev/null' fi else - : "${ddAvailableFlag=false}" + : "${flagsA['ddAvailable']=false}" fi [[ "${nullDelimiterProg}" == @(dd|bash|lseek) ]] || { - if ${FORCE_allowUnsafeNullDelimiterFlag}; then + if ${flagsA['FORCE_allowUnsafeNullDelimiter']}; then nullDelimiterProg='' else nullDelimiterProg='bash' @@ -442,33 +445,33 @@ forkrun() { } elif [[ -z ${delimiterVal} ]]; then delimiterVal='$'"'"'\n'"'" - ${noFuncFlag} || ${lseekFlag} || delimiterRemoveStr='%$'"'"'\n'"'" + ${flagsA['noFunc']} || ${flagsA['lseek']} || delimiterRemoveStr='%$'"'"'\n'"'" else delimiterVal="$(printf '%q' "${delimiterVal}")" - ${lseekFlag} || { - ${noFuncFlag} && delimiterRemoveStr='//'"${delimiterVal}"'/\$'"'"'\n'"'" || delimiterRemoveStr="%${delimiterVal}" + ${flagsA['lseek']} || { + ${flagsA['noFunc']} && delimiterRemoveStr='//'"${delimiterVal}"'/\$'"'"'\n'"'" || delimiterRemoveStr="%${delimiterVal}" } delimiterReadStr="-d ${delimiterVal}" fi } # modify runCmd if '-i' '-I' or '-u' flags are set - if ${unescapeFlag}; then - ${substituteStringFlag} && { + if ${flagsA['unescape']}; then + ${flagsA['substituteString']} && { runCmd=("${runCmd[@]//'{}'/'"${A[@]%$'"'"'\n'"'"'}"'}") } - ${substituteStringIDFlag} && { + ${flagsA['substituteStringID']} && { runCmd=("${runCmd[@]//'{ID}'/'{<#>}'}") - ${nOrderFlag} && runCmd=("${runCmd[@]//'{IND}'/'${nOrder0}'}") + ${flagsA['nOrder']} && runCmd=("${runCmd[@]//'{IND}'/'${nOrder0}'}") } else mapfile -t runCmd < <(printf '%q\n' "${runCmd[@]}") - ${substituteStringFlag} && { + ${flagsA['substituteString']} && { runCmd=("${runCmd[@]//'\{\}'/'"${A[@]%$'"'"'\n'"'"'}"'}") } - ${substituteStringIDFlag} && { + ${flagsA['substituteStringID']} && { runCmd=("${runCmd[@]//'\{ID\}'/'{<#>}'}") - ${nOrderFlag} && runCmd=("${runCmd[@]//'\{IND\}'/'${nOrder0}'}") + ${flagsA['nOrder']} && runCmd=("${runCmd[@]//'\{IND\}'/'${nOrder0}'}") } fi @@ -477,35 +480,35 @@ forkrun() { mkdir -p "${tmpDir}"/.run # if keeping tmpDir print its location to stderr - ${rmTmpDirFlag} || (( ${verboseLevel} <= 0 )) || printf '\ntmpDir path: %s\n\n' "${tmpDir}" >&${fd_stderr} + ${flagsA['rmTmpDir']} || (( ${verboseLevel} <= 0 )) || printf '\ntmpDir path: %s\n\n' "${tmpDir}" >&${fdA['stderr']} (( ${verboseLevel} > 0 )) && { printf '\n\n------------------- FLAGS INFO -------------------\n\nCOMMAND TO PARALLELIZE: %s\n' "$(printf '%s ' "${runCmd[@]}")" - ${inotifyFlag} && echo 'using inotify to efficiently wait for slow inputs on stdin' - ${fallocateFlag} && echo 'using fallocate to shrink the tmpfile containing stdin as forkrun runs' - ${lseekFlag} && echo 'using "lseek" loadable builtin to read data faster and more efficiently' - ${nQueueFlag} && printf '(-j|-P) initial / max workers: %s / %s. workers will be dynamically spawned (up to a %s workers max) whenever read queue depth is less than %s\n' "${nProcs}" "${nProcsMax}" "${nProcsMax}" "${nQueueMin}" || printf '(-j|-P) using %s coproc workers\n' ${nProcs} - ${nLinesAutoFlag} && printf '(-L) automatically adjusting batch size (lines per function call). initial = %s line(s). maximum = %s line(s).\n' "${nLines}" "${nLinesMax}" + ${flagsA['inotify']} && echo 'using inotify to efficiently wait for slow inputs on stdin' + ${flagsA['fallocate']} && echo 'using fallocate to shrink the tmpfile containing stdin as forkrun runs' + ${flagsA['lseek']} && echo 'using "lseek" loadable builtin to read data faster and more efficiently' + ${flagsA['nQueue']} && printf '(-j|-P) initial / max workers: %s / %s. workers will be dynamically spawned (up to a %s workers max) whenever read queue depth is less than %s\n' "${nProcs}" "${nProcsMax}" "${nProcsMax}" "${nQueueMin}" || printf '(-j|-P) using %s coproc workers\n' ${nProcs} + ${flagsA['nLinesAuto']} && printf '(-L) automatically adjusting batch size (lines per function call). initial = %s line(s). maximum = %s line(s).\n' "${nLines}" "${nLinesMax}" printf '(-t) forkrun tmpdir will be under %s\n' "${tmpDirRoot}" - ${readBytesFlag} && printf '(-%s) data will be read in chunks of %s %s bytes using %s\n' "$(${readBytesExactFlag} && echo 'B' || echo 'b')" "$(${readBytesExactFlag} && echo 'exactly' || echo 'up to')" "${nBytes}" "${readBytesProg}" - ${nOrderFlag} && echo '(-k) output will be ordered the same as if the inputs were run sequentially' - ${exportOrderFlag} && echo '(-n) output batches will be numbered (index is per-batch, denoted using \\034:\\035\\n)' - ${substituteStringFlag} && echo '(-i) replacing {} with lines from stdin' - ${substituteStringIDFlag} && printf '%s %s\n' '(-I) replacing {ID} with coproc worker ID' "$(${nOrderFlag} && echo 'and replacing {IND} with output order INDex')" - ${unescapeFlag} && echo '(-u) not escaping special characters in ${runCmd}' - ${pipeReadFlag} && echo '(-p) worker coprocs will read directly from stdin pipe, not from a tmpfile' - if ${nullDelimiterFlag}; then + ${flagsA['readBytes']} && printf '(-%s) data will be read in chunks of %s %s bytes using %s\n' "$(${flagsA['readBytesExact']} && echo 'B' || echo 'b')" "$(${flagsA['readBytesExact']} && echo 'exactly' || echo 'up to')" "${nBytes}" "${readBytesProg}" + ${flagsA['nOrder']} && echo '(-k) output will be ordered the same as if the inputs were run sequentially' + ${flagsA['exportOrder']} && echo '(-n) output batches will be numbered (index is per-batch, denoted using \\034:\\035\\n)' + ${flagsA['substituteString']} && echo '(-i) replacing {} with lines from stdin' + ${flagsA['substituteStringID']} && printf '%s %s\n' '(-I) replacing {ID} with coproc worker ID' "$(${flagsA['nOrder']} && echo 'and replacing {IND} with output order INDex')" + ${flagsA['unescape']} && echo '(-u) not escaping special characters in ${runCmd}' + ${flagsA['pipeRead']} && echo '(-p) worker coprocs will read directly from stdin pipe, not from a tmpfile' + if ${flagsA['nullDelimiter']}; then printf '((-0|-z)|(-d)) stdin will be parsed using nulls as delimiter (instead of newlines) (helper: %s)\n' "${nullDelimiterProg}" else printf '(%s) delimiter: %s\n' "$([[ "${delimiterVal}" == '$'"'"'\n'"'" ]] && echo '--' || echo '-d')" "${delimiterVal}" fi - ${rmTmpDirFlag} || printf '(-r) tmpdir (%s) will NOT be automatically removed\n' "${tmpDir}" - ${subshellRunFlag} && echo '(-s) coproc workers will run each group of N lines in a subshell' - ${stdinRunFlag} && echo '(-S) coproc workers will pass lines to the command being parallelized via the command'"'"'s stdin' - ${noFuncFlag} && echo '(-N) no function mode enabled: commands should be included in stdin' || printf 'tmpdir: %s\n' "${tmpDir}" + ${flagsA['rmTmpDir']} || printf '(-r) tmpdir (%s) will NOT be automatically removed\n' "${tmpDir}" + ${flagsA['subshellRun']} && echo '(-s) coproc workers will run each group of N lines in a subshell' + ${flagsA['stdinRun']} && echo '(-S) coproc workers will pass lines to the command being parallelized via the command'"'"'s stdin' + ${flagsA['noFunc']} && echo '(-N) no function mode enabled: commands should be included in stdin' || printf 'tmpdir: %s\n' "${tmpDir}" echo "(-v) Verbosity Level: ${verboseLevel}" printf '\n------------------------------------------\n\n' - } >&${fd_stderr} + } >&${fdA['stderr']} # # # # # FORK "HELPER" PROCESSES # # # # # @@ -514,7 +517,7 @@ forkrun() { : >"'"${tmpDir}"'"/.quit; kill -USR1 $(cat /dev/null) 2>/dev/null; '$'\n' - ${pipeReadFlag} && { + ${flagsA['pipeRead']} && { # '.done' file makes no sense when reading from a pipe : >"${tmpDir}"/.done } || { @@ -530,12 +533,12 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ trap 'trap - TERM INT HUP USR1; kill -HUP '"${PID0}"' ${BASHPID}' HUP trap 'trap - TERM INT HUP USR1' USR1 - cat <&${fd_stdin} >&${fd_write} + cat <&${fdA['stdin']} >&${fdA['write']} : >"${tmpDir}"/.done - (( ${verboseLevel} > 1 )) && printf '\nINFO: pWrite has finished - all of stdin has been saved to the tmpfile at %s\n' "${fPath}" >&${fd_stderr} - ${inotifyFlag} && { + (( ${verboseLevel} > 1 )) && printf '\nINFO: pWrite has finished - all of stdin has been saved to the tmpfile at %s\n' "${fPath}" >&${fdA['stderr']} + ${flagsA['inotify']} && { for (( kk=0 ; kk<=nProcs ; kk++ )); do - : >&${fd_write} + : >&${fdA['write']} done } } @@ -545,14 +548,14 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ } # setup (ordered) output. This uses the same naming scheme as `split -d` to ensure a simple `cat /path/*` always orders things correctly. - if ${nOrderFlag}; then + if ${flagsA['nOrder']}; then mkdir -p "${tmpDir}"/.out outStr='>"'"${tmpDir}"'"/.out/x${nOrder}' - printf '%s\n' {10..89} >&${fd_nOrder} + printf '%s\n' {10..89} >&${fdA['nOrder']} - # fork coproc to populate a pipe (fd_nOrder) with ordered output file name indicies for the worker copropcs to use + # fork coproc to populate a pipe (fdA['nOrder']) with ordered output file name indicies for the worker copropcs to use { coproc pOrder { export LC_ALL=C LANG=C IFS= @@ -565,7 +568,7 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ # generate enough nOrder indices (~10000) to fill up 64 kb pipe buffer # start at 10 so that bash wont try to treat x0_ as an octal - printf '%s\n' {9000..9899} {990000..998999} >&${fd_nOrder} + printf '%s\n' {9000..9899} {990000..998999} >&${fdA['nOrder']} # now that pipe buffer is full, add additional indices 1000 at a time (as needed) v9='99' @@ -576,7 +579,7 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ for (( kk=0 ; kk<=kkMax ; kk++ )); do kkCur="$(printf '%0.'"${#kkMax}"'d' "$kk")" - { source /proc/self/fd/0 >&${fd_nOrder}; }<<<"printf '%s\n' {${v9}${kkCur}000..${v9}${kkCur}999}" + { source /proc/self/fd/0 >&${fdA['nOrder']}; }<<<"printf '%s\n' {${v9}${kkCur}000..${v9}${kkCur}999}" done done @@ -585,16 +588,16 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ exitTrapStr_kill+="${pOrder_PID} " else - outStr='>&'"${fd_stdout}"; + outStr='>&'"${fdA['stdout']}"; fi # setup automatic dynamic nLines adjustment and/or fallocate pre-truncation of (already processed) stdin - if ${nLinesAutoFlag} || ${fallocateFlag}; then + if ${flagsA['nLinesAuto']} || ${flagsA['fallocate']}; then printf '%s\n' ${nLines} >"${tmpDir}"/.nLines # LOGIC FOR DYNAMICALLY SETTING 'nLines': - # The avg_bytes_per_line is estimated by looking at the byte offset position of fd_read and having each coproc keep track of how many lines it has read + # The avg_bytes_per_line is estimated by looking at the byte offset position of fdA['read'] and having each coproc keep track of how many lines it has read # the new "proposed" 'nLines' is determined by estimating the average bytes per line, then taking the averge of the "current nLines" and "(numbedr unread bytes) / ( (avg bytes per line) * (nProcs) )" # --> if proposed new 'nLines' is greater than current 'nLines' then use it (use case: stdin is arriving fairly fast, increase 'nLines' to match the rate lines are coming in on stdin) # --> if proposed new 'nLines' is less than or equal to current 'nLines' ignore it (i.e., nLines can only ever increase...it will never decrease) @@ -609,34 +612,34 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ trap 'trap - TERM INT HUP USR1; kill -HUP '"${PID0}"' ${BASHPID}' HUP trap 'trap - TERM INT HUP USR1' USR1 - ${fallocateFlag} && { + ${flagsA['fallocate']} && { nWait=$(( 16 + ( ${nProcs} / 2 ) )) fd_read_pos_old=0 } - ${nLinesAutoFlag} && nRead=0 + ${flagsA['nLinesAuto']} && nRead=0 - while ${fallocateFlag} || ${nLinesAutoFlag}; do + while ${flagsA['fallocate']} || ${flagsA['nLinesAuto']}; do - read -u ${fd_nAuto} -t 0.1 + read -u ${fdA['nAuto']} -t 0.1 case ${REPLY} in 0) - nLinesAutoFlag=false - fallocateFlag=false + flagsA['nLinesAuto']=false + flagsA['fallocate']=false break ;; '') - nLinesAutoFlag=false + flagsA['nLinesAuto']=false ;; esac - read -r fd_read_pos /dev/null) 2>/dev/null; '$ (( ${nLinesNew} > ${nLinesCur} )) && { - (( ${nLinesNew} >= ${nLinesMax} )) && { nLinesNew=${nLinesMax}; nLinesAutoFlag=false; } + (( ${nLinesNew} >= ${nLinesMax} )) && { nLinesNew=${nLinesMax}; flagsA['nLinesAuto']=false; } printf '%s\n' ${nLinesNew} >"${tmpDir}"/.nLines # verbose output - (( ${verboseLevel} > 2 )) && printf '\nCHANGING nLines from %s to %s!!! -- ( nRead = %s ; write pos = %s ; read pos = %s )\n' ${nLinesCur} ${nLinesNew} ${nRead} ${fd_write_pos} ${fd_read_pos} >&${fd_stderr} + (( ${verboseLevel} > 2 )) && printf '\nCHANGING nLines from %s to %s!!! -- ( nRead = %s ; write pos = %s ; read pos = %s )\n' ${nLinesCur} ${nLinesNew} ${nRead} ${fd_write_pos} ${fd_read_pos} >&${fdA['stderr']} nLinesCur=${nLinesNew} } fi - if ${fallocateFlag}; then + if ${flagsA['fallocate']}; then case ${nWait} in 0) fd_read_pos=$(( 4096 * ( ${fd_read_pos} / 4096 ) )) (( ${fd_read_pos} > ${fd_read_pos_old} )) && { fallocate -p -o ${fd_read_pos_old} -l $(( ${fd_read_pos} - ${fd_read_pos_old} )) "${fPath}" - (( ${verboseLevel} > 2 )) && echo "Truncating $(( ${fd_read_pos} - ${fd_read_pos_old} )) bytes off the start of the tmp file storing stdin" >&${fd_stderr} + (( ${verboseLevel} > 2 )) && echo "Truncating $(( ${fd_read_pos} - ${fd_read_pos_old} )) bytes off the start of the tmp file storing stdin" >&${fdA['stderr']} fd_read_pos_old=${fd_read_pos} } nWait=$(( 16 + ( ${nProcs} / 2 ) )) @@ -674,25 +677,25 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ fi [[ -f "${tmpDir}"/.quit ]] && { - nLinesAutoFlag=false - fallocateFlag=false + flagsA['nLinesAuto']=false + flagsA['fallocate']=false } done - } 2>&${fd_stderr} + } 2>&${fdA['stderr']} } 2>/dev/null - exitTrapStr+='( printf '"'"'0\n'"'"' >&${fd_nAuto0}; ) {fd_nAuto0}>&'"${fd_nAuto}"'; '$'\n' + exitTrapStr+='( printf '"'"'0\n'"'"' >&${fdA['"'"'nAuto0'"'"']}; ) {fdA['"'"'nAuto0'"'"']}>&'"${fdA['nAuto']}"'; '$'\n' printf '%s\n' "${pAuto_PID}" > "${tmpDir}"/.run/pAuto fi # setup+fork inotifywait (if available) - ${inotifyFlag} && { + ${flagsA['inotify']} && { { - # initially add 1 newline for each coproc to fd_inotify - { source /proc/self/fd/0 >&${fd_inotify0}; }<<<"printf '%.0s\n' {0..${nProcs}}" + # initially add 1 newline for each coproc to fdA['inotify'] + { source /proc/self/fd/0 >&${fdA['inotify0']}; }<<<"printf '%.0s\n' {0..${nProcs}}" # run inotifywait ( @@ -703,16 +706,16 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ trap 'trap - TERM INT HUP USR1; kill -TERM '"${PID0}"' ${BASHPID}' TERM trap 'trap - TERM INT HUP USR1; kill -HUP '"${PID0}"' ${BASHPID}' HUP trap 'trap - TERM INT HUP USR1' USR1 - inotifywait -q -m -e modify,close --format '' "${fPath}" >&${fd_inotify0} & + inotifywait -q -m -e modify,close --format '' "${fPath}" >&${fdA['inotify0']} & printf '%s\n' "${!}" >"${tmpDir}"/.run/pNotify ) pNotify_PID="$(<"${tmpDir}"/.run/pNotify)" - } 2>/dev/null {fd_inotify0}>&${fd_inotify} + } 2>/dev/null {fdA['inotify0']}>&${fdA['inotify']} exitTrapStr+=': > "'"${tmpDir}"'"/.stdin; '$'\n' - ${nOrderFlag} && exitTrapStr+=': >"'"${tmpDir}"'"/.out/.quit; '$'\n' + ${flagsA['nOrder']} && exitTrapStr+=': >"'"${tmpDir}"'"/.out/.quit; '$'\n' exitTrapStr_kill+="${pNotify_PID} " } @@ -720,14 +723,14 @@ kill -USR1 $(cat /dev/null) 2>/dev/null; '$ # Due to how the coproc code is dynamically generated and sourced, it cannot directly contain comments. A very brief overview of their function is below. # - # on each loop, they will acquire a read lock by read {fd_continue}, which blocks them until they have exclusive read access - # they then read N lines with mapfile and check/fix a partial read (or read N bytes with $readBytesProg) and (if -k/-n) read the output order from {fd_nOrder} - # they then release the read lock by sending \n to {fd_continue} (so the next coproc can start to read) + # on each loop, they will acquire a read lock by read {fdA['continue']}, which blocks them until they have exclusive read access + # they then read N lines with mapfile and check/fix a partial read (or read N bytes with $readBytesProg) and (if -k/-n) read the output order from {fdA['nOrder']} + # they then release the read lock by sending \n to {fdA['continue']} (so the next coproc can start to read) # if no data was read, the coproc will either wait/continue or break, depending on if end conditions are met - # finally (assuming it read data) it will run it through whatever is being parallelized. If -k/-n write [x]$nOrder to {fd_nOrder0} to indicate that index has run / was empty + # finally (assuming it read data) it will run it through whatever is being parallelized. If -k/-n write [x]$nOrder to {fdA['nOrder0']} to indicate that index has run / was empty # - # NOTE: All coprocs share the same {fd_read} file descriptor ( defined just after the end of the main forkrun subshell ) - # This has the benefit of keeping the coprocs in sync with each other - when one reads data the {fd_read} used by *all* of them is advanced. + # NOTE: All coprocs share the same {fdA['read']} file descriptor ( defined just after the end of the main forkrun subshell ) + # This has the benefit of keeping the coprocs in sync with each other - when one reads data the {fdA['read']} used by *all* of them is advanced. # generate coproc source code template (which, in turn, allows you to then spawn many coprocs very quickly and have many "code branch selection" decisions already resolved) # this contains the code for the coprocs but has the worker ID represented using {<#>}. coprocs will be sourced via source<<<"${coprocSrcCode//'{<#>}'/${kk}}" @@ -747,7 +750,7 @@ echo \"\${BASH_PID}\" >\"${tmpDir}\"/.run/p{<#>} trap ': >\"${tmpDir}\"/.quit; [[ -f \"${tmpDir}\"/.run/p{<#>} ]] && \\rm -f \"${tmpDir}\"/.run/p{<#>}; -printf '\"'\"'\n'\"'\"' >&${fd_continue}' EXIT +printf '\"'\"'\n'\"'\"' >&${fdA['"'"'continue'"'"']}' EXIT trap 'trap - TERM INT HUP USR1; kill -INT ${PID0} \${BASHPID}' INT trap 'trap - TERM INT HUP USR1; kill -TERM ${PID0} \${BASHPID}' TERM @@ -755,63 +758,63 @@ trap 'trap - TERM INT HUP USR1; kill -HUP ${PID0} \${BASHPID}' HUP trap 'trap - TERM INT HUP USR1' USR1 while true; do""" -${nLinesAutoFlag} && echo "\${nLinesAutoFlag} && read -r <\"${tmpDir}\"/.nLines && [[ \${REPLY} == +([0-9]) ]] && nLinesCur=\${REPLY}" -${nQueueFlag} && echo "printf '%s' '+' >&${fd_nQueue}" +${flagsA['nLinesAuto']} && echo "\${flagsA['nLinesAuto']} && read -r <\"${tmpDir}\"/.nLines && [[ \${REPLY} == +([0-9]) ]] && nLinesCur=\${REPLY}" +${flagsA['nQueue']} && echo "printf '%s' '+' >&${fdA['nQueue']}" echo """ - read -u ${fd_continue} + read -u ${fdA['continue']} [[ -f \"${tmpDir}\"/.quit ]] && { - printf '\n' >&${fd_continue} + printf '\n' >&${fdA['continue']} break } - [[ -f \"${tmpDir}\"/.done ]] && doneIndicatorFlag=true""" -if ${readBytesFlag}; then + [[ -f \"${tmpDir}\"/.done ]] && flagsA['doneIndicator']=true""" +if ${flagsA['readBytes']}; then case "${readBytesProg}" in 'dd') printf 'dd bs=32768 count=%sB of="%s"/.stdin.tmp.{<#>} 2>"%s"/.stdin.tmp-status.{<#>} ' "${nBytes}" "${tmpDir}" "${tmpDir}" - ${pipeReadFlag} && printf 'iflag=fullblock <&%s\n' "${fd_stdin}" || printf '<&%s\n' "${fd_read}" + ${flagsA['pipeRead']} && printf 'iflag=fullblock <&%s\n' "${fdA['stdin']}" || printf '<&%s\n' "${fdA['read']}" printf '[[ "$(<"%s"/.stdin.tmp-status.{<#>})" == *$'"'"'\\n'"'"'"0 bytes"* ]] && A=() || A[0]=1\n' "${tmpDir}" ;; 'head') printf 'head -c %s ' "${nBytes}" - ${pipeReadFlag} && printf '<&%s ' "${fd_stdin}" || printf '<&%s ' "${fd_read}" + ${flagsA['pipeRead']} && printf '<&%s ' "${fdA['stdin']}" || printf '<&%s ' "${fdA['read']}" printf '>"%s"/.stdin.tmp.{<#>}\n' "${tmpDir}" printf '[[ $(<"%s"/.stdin.tmp.{<#>}) ]] 2>/dev/null && A[0]=1 || A=()\n' "${tmpDir}" ;; 'bash') - if ${stdinRunFlag}; then + if ${flagsA['stdinRun']}; then [[ ${tTimeout} ]] && echo "SECONDS=0" - printf 'if read -r -d '"''"' -n %s -u %s' "${nBytes}" "${fd_read}" + printf 'if read -r -d '"''"' -n %s -u %s' "${nBytes}" "${fdA['read']}" [[ ${tTimeout} ]] && printf ' -t %s' "${tTimeout}" echo """; then [[ \${REPLY} ]] && A=(\"\${REPLY}\") || A=('') - trailingNullFlag=true""" - ${readBytesExactFlag} && echo 'nBytesRead=1' + flagsA['trailingNull']=true""" + ${flagsA['readBytesExact']} && echo 'nBytesRead=1' echo """ else [[ \${REPLY} ]] && A=(\"\${REPLY}\") || A=() - trailingNullFlag=false""" - ${readBytesExactFlag} && echo 'nBytesRead=0' + flagsA['trailingNull']=false""" + ${flagsA['readBytesExact']} && echo 'nBytesRead=0' echo 'fi' - if ${readBytesExactFlag}; then + if ${flagsA['readBytesExact']}; then echo """ nBytesRead+=\${#REPLY} [[ \${nBytesRead} == 0 ]] || (( \${nBytesRead} >= ${nBytes} )) || {""" [[ ${tTimeout} ]] && echo "while (( \${SECONDS} < ${tTimeout} )); do" || echo "while true; do" - echo "[[ -f \"${tmpDir}\"/.done ]] && doneIndicatorFlag=true" + echo "[[ -f \"${tmpDir}\"/.done ]] && flagsA['doneIndicator']=true" - printf "if read -r -d '' -n \$(( ${nBytes} - \${nBytesRead} )) -u ${fd_read}" + printf "if read -r -d '' -n \$(( ${nBytes} - \${nBytesRead} )) -u ${fdA['read']}" [[ ${tTimeout} ]] && printf ' -t %s' "${tTimeout}" echo """; then ((nBytesRead++)) nBytesRead+=\${#REPLY} [[ \${REPLY} ]] && A+=(\"\${REPLY}\") || A+=('') - (( \${nBytesRead} >= ${nBytes} )) && { trailingNullFlag=true; break; } + (( \${nBytesRead} >= ${nBytes} )) && { flagsA['trailingNull']=true; break; } else - trailingNullFlag=false + flagsA['trailingNull']=false [[ \${REPLY} ]] && A+=(\"\${REPLY}\") - { (( \${nBytesRead} >= ${nBytes} )) || ${doneIndicatorFlag}; } && { trailingNullFlag=false; break; } + { (( \${nBytesRead} >= ${nBytes} )) || ${flagsA['doneIndicator']}; } && { flagsA['trailingNull']=false; break; } break fi done @@ -819,7 +822,7 @@ if ${readBytesFlag}; then fi echo """ { - if \${trailingNullFlag}; then + if \${flagsA['trailingNull']}; then printf '%s\0' \"\${A[@]}\" else printf '%s' \"\${A[0]}\" @@ -828,11 +831,11 @@ if ${readBytesFlag}; then } >\"${tmpDir}\"/.stdin.tmp.{<#>}""" else printf 'read -r -N %s -u ' "${nBytes}" - if ${readBytesExactFlag}; then - printf '%s ' "${fd_stdin}" + if ${flagsA['readBytesExact']}; then + printf '%s ' "${fdA['stdin']}" [[ ${tTimeout} ]] && printf '-t %s ' "${tTimeout} " else - printf '%s ' ${fd_read} + printf '%s ' ${fdA['read']} fi echo '-a A' fi @@ -840,47 +843,47 @@ if ${readBytesFlag}; then esac else printf '%s ' "mapfile" - ${lseekFlag} && printf '%s ' '-t' + ${flagsA['lseek']} && printf '%s ' '-t' printf '%s ' '-n' "\${nLinesCur}" '-u' - ${pipeReadFlag} && printf '%s ' ${fd_stdin} || printf '%s ' ${fd_read} - { ${pipeReadFlag} || ${nullDelimiterFlag}; } && printf '%s ' '-t' + ${flagsA['pipeRead']} && printf '%s ' ${fdA['stdin']} || printf '%s ' ${fdA['read']} + { ${flagsA['pipeRead']} || ${flagsA['nullDelimiter']}; } && printf '%s ' '-t' echo "${delimiterReadStr} A" - ${pipeReadFlag} || { ${nullDelimiterFlag} && [[ -z ${nullDelimiterProg} ]]; } || { - echo "[[ \${#A[@]} == 0 ]] || \${doneIndicatorFlag} || {" - if ${lseekFlag}; then + ${flagsA['pipeRead']} || { ${flagsA['nullDelimiter']} && [[ -z ${nullDelimiterProg} ]]; } || { + echo "[[ \${#A[@]} == 0 ]] || \${flagsA['doneIndicator']} || {" + if ${flagsA['lseek']}; then echo """ - lseek ${fd_read} -1 - read -r -u ${fd_read} -N 1""" - if ${nullDelimiterFlag}; then + lseek ${fdA['read']} -1 + read -r -u ${fdA['read']} -N 1""" + if ${flagsA['nullDelimiter']}; then echo "[[ \${#REPLY} == 0 ]] || {" else echo "[[ \"\${REPLY}\" == ${delimiterVal} ]] || {" fi - elif ${nullDelimiterFlag}; then + elif ${flagsA['nullDelimiter']}; then echo """ - read -r fd_read_pos 65535 )); then { dd if=\"${fPath}\" bs=1 count=1 ${ddQuietStr} skip=\$(( \${fd_read_pos##*\$'\t'} - 1 )) | read -t 1 -r -d ''; } else - read -r -u ${fd_read0} -N \${nBytes} _ - read -r -u ${fd_read0} -d '' + read -r -u ${fdA['read0']} -N \${nBytes} _ + read -r -u ${fdA['read0']} -d '' [[ \${#REPLY} == 0 ]] fi } || {""" else echo """ - read -r -u ${fd_read0} -N \${nBytes} _ - read -r -u ${fd_read0} -d '' + read -r -u ${fdA['read0']} -N \${nBytes} _ + read -r -u ${fdA['read0']} -d '' [[ \${#REPLY} == 0 ]] || {""" fi ;; @@ -889,82 +892,82 @@ else echo "[[ \"\${A[-1]: -1}\" == ${delimiterVal} ]] || {" fi (( ${verboseLevel} > 2 )) && echo """ - echo \"Partial read at: \${A[-1]}\" >&${fd_stderr}""" + echo \"Partial read at: \${A[-1]}\" >&${fdA['stderr']}""" echo """ - until read -r -u ${fd_read} ${delimiterReadStr}; do + until read -r -u ${fdA['read']} ${delimiterReadStr}; do A[-1]+=\"\${REPLY}\"; done""" printf '%s' "A[-1]+=\"\${REPLY}\"" - ${lseekFlag} && printf '\n' || printf '%s\n' "${delimiterVal}" - (( ${verboseLevel} > 2 )) && echo "echo \"Partial read fixed to: \${A[-1]}\" >&${fd_stderr}" + ${flagsA['lseek']} && printf '\n' || printf '%s\n' "${delimiterVal}" + (( ${verboseLevel} > 2 )) && echo "echo \"Partial read fixed to: \${A[-1]}\" >&${fdA['stderr']}" echo "}" } fi -${pipeReadFlag} || { ${nullDelimiterFlag} && [[ -z ${nullDelimiterProg} ]]; } || ${readBytesFlag} || echo "}" -${nOrderFlag} && echo "read -u ${fd_nOrder} nOrder" +${flagsA['pipeRead']} || { ${flagsA['nullDelimiter']} && [[ -z ${nullDelimiterProg} ]]; } || ${flagsA['readBytes']} || echo "}" +${flagsA['nOrder']} && echo "read -u ${fdA['nOrder']} nOrder" echo """ - printf '\\n' >&${fd_continue}""" -${nQueueFlag} && echo "printf '%s' '-' >&${fd_nQueue}" + printf '\\n' >&${fdA['continue']}""" +${flagsA['nQueue']} && echo "printf '%s' '-' >&${fdA['nQueue']}" echo """ [[ \${#A[@]} == 0 ]] && { - \${doneIndicatorFlag} || { + \${flagsA['doneIndicator']} || { [[ -f \"${tmpDir}\"/.done ]] && { - read -r fd_read_pos &\${fd_nAuto0}" -${nOrderFlag} && echo ": >\"${tmpDir}\"/.out/.quit{<#>}" -${nQueueFlag} && echo "\printf '%s' '0' >&${fd_nQueue}" -${inotifyFlag} && echo 'kill -9 '"${pNotify_PID}"' 2>/dev/null' + if \${flagsA['doneIndicator']} || [[ -f \"${tmpDir}\"/.quit ]]; then""" +${flagsA['nLinesAuto']} && echo "printf '\\n' >&\${fdA['nAuto0']}" +${flagsA['nOrder']} && echo ": >\"${tmpDir}\"/.out/.quit{<#>}" +${flagsA['nQueue']} && echo "\printf '%s' '0' >&${fdA['nQueue']}" +${flagsA['inotify']} && echo 'kill -9 '"${pNotify_PID}"' 2>/dev/null' echo """ : >\"${tmpDir}\"/.quit - printf '%.0s\\n' \"${tmpDir}\"/.run/p* >&${fd_continue} + printf '%.0s\\n' \"${tmpDir}\"/.run/p* >&${fdA['continue']} break""" -{ ${inotifyFlag} || ${nOrderFlag}; } && echo "else" -${nOrderFlag} && echo "printf 'x%s\n' \"\${nOrder}\" >&\${fd_nOrder0}" -${inotifyFlag} && echo "[[ -f \"${tmpDir}\"/.done ]] && doneIndicatorFlag=true || read -u ${fd_inotify}" +{ ${flagsA['inotify']} || ${flagsA['nOrder']}; } && echo "else" +${flagsA['nOrder']} && echo "printf 'x%s\n' \"\${nOrder}\" >&\${fdA['nOrder0']}" +${flagsA['inotify']} && echo "[[ -f \"${tmpDir}\"/.done ]] && flagsA['doneIndicator']=true || read -u ${fdA['inotify']}" echo """ fi continue }""" -${nLinesAutoFlag} && { printf '%s' """ - \${nLinesAutoFlag} && { - printf '%s\\n' \${#A[@]} >&\${fd_nAuto0} - (( \${nLinesCur} < ${nLinesMax} )) || nLinesAutoFlag=false +${flagsA['nLinesAuto']} && { printf '%s' """ + \${flagsA['nLinesAuto']} && { + printf '%s\\n' \${#A[@]} >&\${fdA['nAuto0']} + (( \${nLinesCur} < ${nLinesMax} )) || flagsA['nLinesAuto']=false }""" - ${fallocateFlag} && printf '%s' ' || ' || echo + ${flagsA['fallocate']} && printf '%s' ' || ' || echo } -${fallocateFlag} && echo "printf '\\n' >&\${fd_nAuto0}" -${pipeReadFlag} || ${nullDelimiterFlag} || ${readBytesFlag} || ${lseekFlag} || { +${flagsA['fallocate']} && echo "printf '\\n' >&\${fdA['nAuto0']}" +${flagsA['pipeRead']} || ${flagsA['nullDelimiter']} || ${flagsA['readBytes']} || ${flagsA['lseek']} || { echo """ { [[ \"\${A[*]##*${delimiterVal}}\" ]] || [[ -z \${A[0]} ]]; } && {""" - (( ${verboseLevel} > 2 )) && echo "echo \"FIXING SPLIT READ\" >&${fd_stderr}" + (( ${verboseLevel} > 2 )) && echo "echo \"FIXING SPLIT READ\" >&${fdA['stderr']}" echo """ A[-1]=\"\${A[-1]%${delimiterVal}}\" IFS= mapfile ${delimiterReadStr} A <<<\"\${A[*]}\" }""" } -${subshellRunFlag} && echo '(' || echo '{' -{ ${exportOrderFlag} || { ${nOrderFlag} && ${substituteStringIDFlag}; }; } && echo 'nOrder0="$(( ${nOrder##*(9)*(0)} + ${nOrder%%*(0)${nOrder##*(9)*(0)}}0 - 9 ))"' -${exportOrderFlag} && echo "printf '\034%s:\035\n' \"\${nOrder0}\"" -${noFuncFlag} && echo 'IFS=$'"'"'\n'"'" +${flagsA['subshellRun']} && echo '(' || echo '{' +{ ${flagsA['exportOrder']} || { ${flagsA['nOrder']} && ${flagsA['substituteStringID']}; }; } && echo 'nOrder0="$(( ${nOrder##*(9)*(0)} + ${nOrder%%*(0)${nOrder##*(9)*(0)}}0 - 9 ))"' +${flagsA['exportOrder']} && echo "printf '\034%s:\035\n' \"\${nOrder0}\"" +${flagsA['noFunc']} && echo 'IFS=$'"'"'\n'"'" printf '%s ' "${runCmd[@]}" -if ${readBytesFlag} && ! { [[ ${readBytesProg} == 'bash' ]] && ! ${stdinRunFlag}; }; then - if ${stdinRunFlag} || ${noFuncFlag}; then +if ${flagsA['readBytes']} && ! { [[ ${readBytesProg} == 'bash' ]] && ! ${flagsA['stdinRun']}; }; then + if ${flagsA['stdinRun']} || ${flagsA['noFunc']}; then printf '<"%s"/%s' "${tmpDir}" '.stdin.tmp.{<#>}' else printf '"$(<"%s"/%s)"' "${tmpDir}" '.stdin.tmp.{<#>}' fi else - if ${stdinRunFlag}; then + if ${flagsA['stdinRun']}; then printf '<<<%s' "\"\${A[@]${delimiterRemoveStr}}\"" - elif ${noFuncFlag}; then + elif ${flagsA['noFunc']}; then printf "<<<\"\${A[*]%s}\"" "${delimiterRemoveStr}" - elif ! ${substituteStringFlag}; then + elif ! ${flagsA['substituteString']}; then printf '%s' "\"\${A[@]${delimiterRemoveStr}}\"" fi fi @@ -972,29 +975,29 @@ fi { printf '\\n\\n----------------------------------------------\\n\\n' echo 'ERROR DURING \"${runCmd[*]}\" CALL' - declare -p A nLinesCur nLinesAutoFlag - echo 'fd_read:' - cat /proc/self/fdinfo/${fd_read} - echo 'fd_write:' - cat /proc/self/fdinfo/${fd_write} + declare -p A nLinesCur flagsA['nLinesAuto'] + echo \"fdA['read']:\" + cat /proc/self/fdinfo/${fdA['read']} + echo \"fdA['write']:\" + cat /proc/self/fdinfo/${fdA['write']} echo - } >&${fd_stderr} + } >&${fdA['stderr']} }""" -${readBytesFlag} && { [[ ${readBytesProg//bash/} ]] || ${stdinRunFlag}; } && printf '\n\\rm -f "'"${tmpDir}"'"/.stdin.tmp.{<#>}\n' -${noFuncFlag} && echo 'IFS=' -${subshellRunFlag} && printf '\n%s ' ')' || printf '\n%s ' '}' +${flagsA['readBytes']} && { [[ ${readBytesProg//bash/} ]] || ${flagsA['stdinRun']}; } && printf '\n\\rm -f "'"${tmpDir}"'"/.stdin.tmp.{<#>}\n' +${flagsA['noFunc']} && echo 'IFS=' +${flagsA['subshellRun']} && printf '\n%s ' ')' || printf '\n%s ' '}' echo "${outStr}" -${nOrderFlag} && echo "printf '%s\n' \"\${nOrder}\" >&${fd_nOrder0}" +${flagsA['nOrder']} && echo "printf '%s\n' \"\${nOrder}\" >&${fdA['nOrder0']}" echo """ done -} 2>&${fd_stderr} {fd_nAuto0}>&${fd_nAuto} +} 2>&${fdA['stderr']} {fdA['nAuto0']}>&${fdA['nAuto']} } 2>/dev/null p_PID+=(\${p{<#>}_PID})""" )" # set traps (dynamically determined based on which option flags were active) # if ordering output print the remaining ones in trap - ${nOrderFlag} && exitTrapStr+='cat &'"${fd_stdout}"'; '$'\n' + ${flagsA['nOrder']} && exitTrapStr+='cat &'"${fdA['stdout']}"'; '$'\n' # make sure all processes are dead exitTrapStr+='kill $(cat /dev/null) 2>/dev/null; @@ -1003,7 +1006,7 @@ p_PID+=(\${p{<#>}_PID})""" )" # if removiung tmpdir delete it in trap - ${rmTmpDirFlag} && exitTrapStr+='\rm -rf "'"${tmpDir}"'" 2>/dev/null; '$'\n' + ${flagsA['rmTmpDir']} && exitTrapStr+='\rm -rf "'"${tmpDir}"'" 2>/dev/null; '$'\n' exitTrapStr+='trap - INT TERM HUP USR1; return ${returnVal:-0}' @@ -1024,16 +1027,16 @@ p_PID+=(\${p{<#>}_PID})""" )" kill -USR1 $(cat /dev/null); kill -HUP $(cat /dev/null) '"${PID0}" HUP - (( ${verboseLevel} > 1 )) && printf '\n\nALL HELPER COPROCS FORKED\n\n' >&${fd_stderr} - (( ${verboseLevel} > 3 )) && { printf '\nSET TRAPS:\n\n'; trap -p; } >&${fd_stderr} + (( ${verboseLevel} > 1 )) && printf '\n\nALL HELPER COPROCS FORKED\n\n' >&${fdA['stderr']} + (( ${verboseLevel} > 3 )) && { printf '\nSET TRAPS:\n\n'; trap -p; } >&${fdA['stderr']} # # # # # FORK COPROC "WORKERS" # # # # # - # initialize read lock {fd_continue} will act as an exclusive read lock (so lines from stdin are read atomically): + # initialize read lock {fdA['continue']} will act as an exclusive read lock (so lines from stdin are read atomically): # when there is a '\n' the pipe buffer then nothing has a read lock - # a process reads 1 byte from {fd_continue} to get the read lock, and + # a process reads 1 byte from {fdA['continue']} to get the read lock, and # when that process writes a '\n' back to the pipe it releases the read lock - printf '\n' >&${fd_continue}; + printf '\n' >&${fdA['continue']}; # source the coproc code for each coproc worker @@ -1044,10 +1047,10 @@ p_PID+=(\${p{<#>}_PID})""" )" echo "${kkProcs}" >"${tmpDir}"/.nWorkers : >"${tmpDir}"/.spawned - (( ${verboseLevel} > 1 )) && printf '\n\n%s WORKER COPROCS FORKED\n\n' "${nProcs}" >&${fd_stderr} + (( ${verboseLevel} > 1 )) && printf '\n\n%s WORKER COPROCS FORKED\n\n' "${nProcs}" >&${fdA['stderr']} # setup dynamically coproc to spawn new workers based on read queue length - ${nQueueFlag} && ! [[ -f "${tmpDir}"/.quit ]] && { + ${flagsA['nQueue']} && ! [[ -f "${tmpDir}"/.quit ]] && { export -f _forkrun_get_load { coproc pQueue { @@ -1074,7 +1077,7 @@ p_PID+=(\${p{<#>}_PID})""" )" mapfile -t pLOADA < <(_forkrun_get_load -i) - (( ${verboseLevel} > 2 )) && printf 'pLOADA = ( %s %s %s %s )\n' "${pLOADA[@]}" >&${fd_stderr} + (( ${verboseLevel} > 2 )) && printf 'pLOADA = ( %s %s %s %s )\n' "${pLOADA[@]}" >&${fdA['stderr']} until [[ -f "${tmpDir}"/.quit ]] || (( ${kkProcs} >= ${nProcsMax} )); do nQueueLast=${nQueue} @@ -1083,7 +1086,7 @@ p_PID+=(\${p{<#>}_PID})""" )" # '+' --> increase queue depth by 1. # '-' --> decrease queue depth by 1. # '0' --> quit - read -r -u ${fd_nQueue} -N 1 + read -r -u ${fdA['nQueue']} -N 1 case "${REPLY}" in '+') ((nQueue++)) ;; @@ -1092,7 +1095,7 @@ p_PID+=(\${p{<#>}_PID})""" )" *) continue ;; esac - # (( ${verboseLevel} > 3 )) && { printf '\nnQueue = %s (nProcs = %s)\n' "${nQueue}" "${kkProcs}"; cat /proc/self/schedstat; } >&${fd_stderr} + # (( ${verboseLevel} > 3 )) && { printf '\nnQueue = %s (nProcs = %s)\n' "${nQueue}" "${kkProcs}"; cat /proc/self/schedstat; } >&${fdA['stderr']} if (( ( ${nQueue} + ${nQueueLast} ) < ( 2 * ${nQueueMin} ) )); then @@ -1103,7 +1106,7 @@ p_PID+=(\${p{<#>}_PID})""" )" mapfile -t pLOADA < <(_forkrun_get_load "${pLOADA[@]}") - (( ${verboseLevel} > 2 )) && printf 'pLOADA = ( %s %s %s %s )\n' "${pLOADA[@]}" >&${fd_stderr} + (( ${verboseLevel} > 2 )) && printf 'pLOADA = ( %s %s %s %s )\n' "${pLOADA[@]}" >&${fdA['stderr']} (( ${pLOADA} >= ${pLOAD_max} )) || { @@ -1111,7 +1114,7 @@ p_PID+=(\${p{<#>}_PID})""" )" pAdd=$(( 1 + ( ( ${nCPU} - ${kkProcs} ) * ( ${pLOAD_max} - ${pLOADA} ) ) / ( 1 + ${pLOADA} ) )) - (( ${verboseLevel} > 3 )) && printf '(pLOAD=%s -- initial pAdd: %s ' "${pLOADA}" "${pAdd}" >&${fd_stderr} + (( ${verboseLevel} > 3 )) && printf '(pLOAD=%s -- initial pAdd: %s ' "${pLOADA}" "${pAdd}" >&${fdA['stderr']} (( ${pAdd} > ( ( ${nProcsMax} - ${kkProcs} ) - ( ( ${nProcsMax} - ${kkProcs} ) / ( 1 + ( 3 * ${nQueueMin} ) - ( 2 * ${nQueue} ) - ${nQueueLast} ) ) ) )) && pAdd=$(( ( ${nProcsMax} - ${kkProcs} ) - ( ( ${nProcsMax} - ${kkProcs} ) / ( 1 + ( 3 * ${nQueueMin} ) - ( 2 * ${nQueue} ) - ${nQueueLast} ) ) )) (( ${pAdd} > ( 1 + ( ${nCPU} / 16 ) ) )) && pAdd=$(( 1 + ( ${nCPU} / 16 ) )) @@ -1121,11 +1124,11 @@ p_PID+=(\${p{<#>}_PID})""" )" pAdd=1 fi - (( ${verboseLevel} > 3 )) && printf 'final pAdd: %s \n' "${pAdd}" >&${fd_stderr} + (( ${verboseLevel} > 3 )) && printf 'final pAdd: %s \n' "${pAdd}" >&${fdA['stderr']} for (( kk=0; kk<${pAdd}; kk++ )); do source /proc/self/fd/0 <<<"${coprocSrcCode//'{<#>}'/"${kkProcs}"}" - (( ${verboseLevel} > 2 )) && printf '\nSPAWNING A NEW WORKER COPROC (%s/%s). There are now %s coprocs. (read queue depth = %s)\n' "${kk}" "${pAdd}" "${kkProcs}" "${nQueue}" >&${fd_stderr} + (( ${verboseLevel} > 2 )) && printf '\nSPAWNING A NEW WORKER COPROC (%s/%s). There are now %s coprocs. (read queue depth = %s)\n' "${kk}" "${pAdd}" "${kkProcs}" "${nQueue}" >&${fdA['stderr']} ((kkProcs++)) done echo "${kkProcs}" >"${tmpDir}"/.nWorkers @@ -1139,32 +1142,32 @@ p_PID+=(\${p{<#>}_PID})""" )" [[ ${#p_PID[@]} == 0 ]] || wait "${p_PID[@]}" - } 2>&${fd_stderr} + } 2>&${fdA['stderr']} } 2>/dev/null - exitTrapStr+='echo "0" >&'"${fd_nQueue}"'; '$'\n' + exitTrapStr+='echo "0" >&'"${fdA['nQueue']}"'; '$'\n' printf '%s\n' "${pQueue_PID}" > "${tmpDir}"/.run/pQueue } (( ${verboseLevel} > 3 )) && { printf '\n\nDYNAMICALLY GENERATED COPROC CODE:\n\n%s\n\n' "${coprocSrcCode}" - declare -p fd_continue fd_inotify fd_nAuto fd_nOrder fd_nOrder0 fd_nQueue fd_read fd_write fd_stdin fd_stdout fd_stderr - } >&${fd_stderr} + declare -p fdA + } >&${fdA['stderr']} # # # # # WAIT FOR THEM TO FINISH # # # # # # # # PRINT OUTPUT IF ORDERED # # # - if ${nOrderFlag}; then + if ${flagsA['nOrder']}; then # initialize real-time printing of ordered output as forkrun runs outCur=10 - continueFlag=true + flagsA['continue']=true outPrint=() - while ${continueFlag}; do + while ${flagsA['continue']}; do # read order indices that are done running. - read -r -u ${fd_nOrder0} + read -r -u ${fdA['nOrder0']} case "${REPLY}" in +([0-9])) # index has an output file @@ -1176,7 +1179,7 @@ p_PID+=(\${p{<#>}_PID})""" )" ;; '') # end condition was met - continueFlag=false + flagsA['continue']=false break ;; esac @@ -1208,25 +1211,25 @@ p_PID+=(\${p{<#>}_PID})""" )" } # check for end condition - [[ -f "${tmpDir}"/.quit ]] && { continueFlag=false; break; } + [[ -f "${tmpDir}"/.quit ]] && { flagsA['continue']=false; break; } done fi # wait for coprocs to finish - (( ${verboseLevel} > 1 )) && printf '\n\nWAITING FOR WORKER COPROCS TO FINISH\n\n' >&${fd_stderr} + (( ${verboseLevel} > 1 )) && printf '\n\nWAITING FOR WORKER COPROCS TO FINISH\n\n' >&${fdA['stderr']} #p_PID=($(_forkrun_rmdups "${p_PID[@]}" $(cat /dev/null))) p_PID+=($(cat /dev/null)) wait "${p_PID[@]}" "${pQueue_PID}" &>/dev/null; # print final nLines count (( ${verboseLevel} > 1 )) && { - ${nLinesAutoFlag} && printf 'nLines (final) = %s ( max = %s )\n' "$(<"${tmpDir}"/.nLines)" "${nLinesMax}" - ${nQueueFlag} && printf 'final worker process count: %s ( min read queue: %s )\n' "$(<"${tmpDir}"/.nWorkers)" "${nQueueMin}" - } >&${fd_stderr} + ${flagsA['nLinesAuto']} && printf 'nLines (final) = %s ( max = %s )\n' "$(<"${tmpDir}"/.nLines)" "${nLinesMax}" + ${flagsA['nQueue']} && printf 'final worker process count: %s ( min read queue: %s )\n' "$(<"${tmpDir}"/.nWorkers)" "${nQueueMin}" + } >&${fdA['stderr']} # open anonymous pipes + other misc file descriptors for the above code block - ) {fd_continue}<><(:) {fd_inotify}<><(:) {fd_nAuto}<><(:) {fd_nOrder}<><(:) {fd_nOrder0}<><(:) {fd_nQueue}<><(:) {fd_read}<"${fPath}" {fd_read0}<"${fPath}" {fd_write}>"${fPath}" {fd_stdin}<&${fd_stdin0} {fd_stdout}>&1 {fd_stderr}>&2 + ) {fdA['continue']}<><(:) {fdA['inotify']}<><(:) {fdA['nAuto']}<><(:) {fdA['nOrder']}<><(:) {fdA['nOrder0']}<><(:) {fdA['nQueue']}<><(:) {fdA['read']}<"${fPath}" {fdA['read0']}<"${fPath}" {fdA['write']}>"${fPath}" {fdA['stdin']}<&${fdA['stdin0']} {fdA['stdout']}>&1 {fdA['stderr']}>&2 }