/**###################################################################**/
/*                                                                     */
/*  Copyright Bartosz Jablonski, since October 2018.                   */
/*                                                                     */
/*  Code is free and open source. If you want - you can use it.        */
/*  But it comes with absolutely no warranty whatsoever.               */
/*  If you cause any damage or something - it will be your own fault.  */
/*  You've been warned! You are using it on your own risk.             */
/*  However, if you decide to use it don't forget to mention author:   */
/*  Bartosz Jablonski (yabwon@gmail.com)                               */
/*                                                                     */
/**###################################################################**/

/*
  The purpose of the following code is to show how to execute
  independent SAS codes in parallel.

  I had the opportunity to utilise this code in a production environment
  in several projects and every time it reduced processing time
  (in one case the reduction was ~92% :-)

  An additional advantage is that nothing more than "plain" BASE SAS
  is required to run it.

  One side note:
  Parallel execution improves performance and decreases processing time
  and is good in most cases. But before you use it, verify that the time
  overhead of dividing the task into smaller ones isn't longer than the
  overall processing ;-)

  contact info: Bartosz Jabłoński (yabwon@gmail.com)

  key words: PARALLEL, THREADS, SASROOT, DLCREATEDIR, DLGCDIR,
             PROC CATALOG, SYSTASK, NOWAIT
*/

/* Print the current setting of the option: XCMD/NOXCMD */
%put %sysfunc(getoption(XCMD));

/*
  Let's assume that we have this input dataset named HAVE which "drives"
  our calculations, e.g. each observation in HAVE contains a list of
  parameters for some bigger process (like some %macro to be run by
  Call Execute or DoSubL). In our case the "parameter" is just one
  (the variable I), but we have it this way only for code simplicity.
*/
data have;
  do I = 1 to 12;
    output;
  end;
run;

/*
  time_unit - it is here just for simulation purposes (i.e. to help
  simulate "long" lasting calculations), value 1 means one second
*/
%let un = 1;

/* Note: throughout the code I am using this "macro style" time measuring
   with the _macro_start_ and _macro_end_ variables.
*/

/* ===== Version 1: everything in one sequential data step ===== */

%let _macro_start_ = %sysfunc(datetime());
%put NOTE:[&SYSMACRONAME.] START at %sysfunc(abs(&_macro_start_.),datetime21.);

/*
  So, we have the HAVE dataset which contains parameters for a series
  of independent and small but "multiply repetitive" calculations which
  are to be performed for each observation in it.
*/
data _want_V1_ (drop = _rc_);
  set have;

  /*
    The lines below represent the "independent and small calculation"
    which will be repeated multiple times, for example:
    Call Execute('%SomeMacroWhichExecutesABlockOfCodeWithSome(Parameters)')
    [the sleep() function pretends such a call execute quite well]
  */
  _rc_ = sleep(1, &un.);
  j = i**2;

  put i= j=;
run;

%let _macro_end_ = %sysfunc(datetime());
%put NOTE:[&SYSMACRONAME.] END at %sysfunc(abs(&_macro_end_.),datetime21.);
%put NOTE:[&SYSMACRONAME.] Processing time: %sysevalf(&_macro_end_. - &_macro_start_.) sec.;
/* When you look into the log you will see that it takes
   ~ 12*time_unit seconds to finish */

proc print data = _want_V1_;
run;


/* ===== Version 2: the work divided into three separate data steps ===== */

%let _macro_start_ = %sysfunc(datetime());
%put NOTE:[&SYSMACRONAME.] START at %sysfunc(abs(&_macro_start_.),datetime21.);

/*
  Since we have a series of INDEPENDENT and small but "multiply repetitive"
  calculations we can divide the process into separate data steps.
  At this point there is no gain in time, but we have threads.
  Each step takes the observations with one remainder of I divided by 3.
*/
data _null_0 (drop = _rc_); /* Thread 0 */
  set have;
  where mod(i, 3) = 0;
  _rc_ = sleep(1, &un.);
  j = i**2;
run;

data _null_1 (drop = _rc_); /* Thread 1 */
  set have;
  where mod(i, 3) = 1;
  _rc_ = sleep(1, &un.);
  j = i**2;
run;

data _null_2 (drop = _rc_); /* Thread 2 */
  set have;
  where mod(i, 3) = 2;
  _rc_ = sleep(1, &un.);
  j = i**2;
run;

/* Here we are combining the threads' results, interleaved by I */
data _want_V2_;
  set
    _null_0
    _null_1
    _null_2
  ;
  by i;
run;

%let _macro_end_ = %sysfunc(datetime());
%put NOTE:[&SYSMACRONAME.] END at %sysfunc(abs(&_macro_end_.),datetime21.);
%put NOTE:[&SYSMACRONAME.] Processing time: %sysevalf(&_macro_end_. - &_macro_start_.) sec.;
/* Time is still ~ 12*time_unit seconds */

proc print data = _want_V2_;
run;


/* ===== Version 3: a thread "wrapped" into a small macro ===== */

/*
  %parallel - runs a single thread of the calculation.

  Threads_No - the total number of threads (divisor used in MOD)
  Thread     - the number of this thread, i.e. the remainder of I
               selected from HAVE; the result goes to _null_<Thread>
*/
%macro parallel(Threads_No, Thread);
  %local _macro_start_ _macro_end_;

  %let _macro_start_ = %sysfunc(datetime());
  %put NOTE:[&SYSMACRONAME.] START at %sysfunc(abs(&_macro_start_.),datetime21.);

  data _null_&Thread. (drop = _rc_);
    set have;
    where mod(i, &Threads_No.) = &Thread.;
    _rc_ = sleep(1, &un.);
    j = i**2;
  run;

  %let _macro_end_ = %sysfunc(datetime());
  %put NOTE:[&SYSMACRONAME.] END at %sysfunc(abs(&_macro_end_.),datetime21.);
  %put NOTE:[&SYSMACRONAME.] Processing time: %sysevalf(&_macro_end_. - &_macro_start_.) sec.;
%mend parallel;

%let _macro_start_ = %sysfunc(datetime());
%put NOTE:[&SYSMACRONAME.] START at %sysfunc(abs(&_macro_start_.),datetime21.);

/*
  And now we have the series of INDEPENDENT and small but "multiply
  repetitive" calculations divided into threads and wrapped into macros
*/
%parallel(4, 0);
%parallel(4, 1);
%parallel(4, 2);
%parallel(4, 3);

data _want_V3_;
  set
    _null_0
    _null_1
    _null_2
    _null_3
  ;
  by i;
run;

%let _macro_end_ = %sysfunc(datetime());
%put NOTE:[&SYSMACRONAME.] END at %sysfunc(abs(&_macro_end_.),datetime21.);
%put NOTE:[&SYSMACRONAME.] Processing time: %sysevalf(&_macro_end_. - &_macro_start_.) sec.;
/* Time is still ~ 12*time_unit seconds */

proc print data = _want_V3_;
run;


/*
 - How to run it in parallel instead of sequentially?
 - Use SAS/Connect to parallelize it via TCP/IP.
 - But I have only BASE SAS... :-/
 - Hmm... ... Ah, yes! BASE will do.
:-)
*/

/* Here are the steps you need to perform to run it in parallel */

/* 1) Locate your SAS.exe */
filename sasroot "!SASROOT";
%let SASROOT=%sysfunc(PATHNAME(sasroot));
%put *&=SASROOT.*;
filename sasroot;

%let SlasH = %qsysfunc(ifc(%bquote(&SYSSCP.)=WIN,\,/));
%let Ext = %qsysfunc(ifc(%bquote(&SYSSCP.)=WIN,.exe,));
%put *&=SYSSCP.*&=SlasH.*&=Ext.; /* folders separator and binary */

%let SASEXE=&SASROOT.&SlasH.sas&Ext.;
%put *&SASEXE.*;

/* 2) Define folders for data exchange ... */
%let SASWORK=%sysfunc(GETOPTION(work));
%put *&SASWORK.*;

options DLCREATEDIR; /* useful option - turns on creation of subdirectories */
libname FOR_PRLL "&SASWORK.&SlasH.PARALELL_IN";
libname OUT_PRLL "&SASWORK.&SlasH.PARALELL_OUT";

/* 2.5) ...and clear them */
proc datasets lib=FOR_PRLL KILL NOLIST MEMTYPE=DATA;
run;
quit;

proc datasets lib=OUT_PRLL KILL NOLIST MEMTYPE=DATA;
run;
quit;

/* 3) Copy your data (if needed) to the exchange library */
data FOR_PRLL.have;
  set have;
run;

/* 4) Prepare/extend the PARALLEL macro and compile it
      (to have it stored in the work.sasmacr catalog) */
libname ctlgmin "&SASWORK.";
options mstored sasmstore=ctlgmin; /* force storage in work.sasmacr catalog
                                      (it makes a difference if you use SAS EG) */

/*
  %parallel - the code executed by one parallel SAS session.

  Threads_No - the total number of threads (divisor used in MOD)
  Thread     - the number of this thread (0, 1, ...); selects the
               observations of FOR_PRLL.have where mod(i, Threads_No)
               equals Thread and names the output OUT_PRLL._null_<Thread>
  PATH=      - the folder which contains the PARALELL_IN and PARALELL_OUT
               subfolders; execute_in_parallel passes the work folder
               of the main session here.
               NOTE(review): the declared default looks like a single
               semicolon, so in practice PATH= must always be given - confirm.

  The macro is compiled with the STORE option, so it can be copied
  (as a catalog entry) to the parallel sessions.
*/
%macro parallel( Threads_No , Thread , PATH=; ) / store;
  %local Threads_No Thread PATH SlasH;
  %local _macro_start_ _macro_end_;

  %let _macro_start_ = %sysfunc(datetime());
  %put NOTE:[&SYSMACRONAME.] START at %sysfunc(abs(&_macro_start_.),datetime21.);

  /* the macro runs in a fresh session, so the separator is derived again */
  %let SlasH = %qsysfunc(ifc(%bquote(&SYSSCP.)=WIN,\,/));

  /* FIX: the input path had a stray trailing blank ("...PARALELL_IN "),
     now it is spelled exactly like the folder created by the main session */
  libname FOR_PRLL "&PATH.&SlasH.PARALELL_IN" ACCESS=READONLY;
  libname OUT_PRLL "&PATH.&SlasH.PARALELL_OUT";

  /*++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*/
  filename ttt ".";

  data OUT_PRLL._null_&Thread.;
    set FOR_PRLL.have (where=( mod(i,&Threads_No.) = &Thread. ));

    /* the sleep time is hard coded here (the main session's macro
       variable UN is not used in this version of the macro) */
    _rc_ = sleep(1,5);
    drop _rc_;

    j = i**2;

    /* Stuff below is collected just to show you
       that it is really done in parallel :-) */
    t  = &Thread.;
    w  = getoption("work");
    l  = getoption("log");
    al = getoption("altlog");
    p  = pathname("ttt");
    u  = symget('sysuserid');
    cz = put(datetime(), datetime21.);
  run;

  proc print;
  run;

  /* test macro showing that parallel can run other macros too */
  %little_test_macro()

  /* This part is to test WAITFOR and error handling (see below) */
  /* for Thread 0 - get an error            */
  /* for Thread 1 - get a too long session  */
  /* uncomment for tests */
  /*
  %if &Thread. = 0 OR &Thread. = 1 %then
    %do;
      data _null_;
        array a[&Thread.] (33);
        _rc_ = sleep(1,a[&Thread.]);
        stop;
      run;
    %end;
  */
  /*++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*/

  %let _macro_end_ = %sysfunc(datetime());
  %put NOTE:[&SYSMACRONAME.] END at %sysfunc(abs(&_macro_end_.),datetime21.);
  %put NOTE:[&SYSMACRONAME.] Processing time: %sysevalf(&_macro_end_. - &_macro_start_.) sec.;
%mend parallel;

/*
  %little_test_macro - prints a three line banner to the log.
  Macros used in parallel runs should be STOREd, so that they
  will be saved in the sasmacr catalog.
*/
%macro little_test_macro()/STORE;
  %put *******************************;
  %put JUST A TEST MACRO %str(:-%));
  %put *******************************;
%mend little_test_macro;

/* 5) Prepare a macro to run the execution in parallel */

/*
  %execute_in_parallel - writes one SAS program per thread, starts every
  program in a separate SAS session with SYSTASK COMMAND ... NOWAIT,
  waits for all of them with WAITFOR and prints the statuses to the log.

  It uses the global macro variables SASWORK, SlasH, SASEXE and SASROOT
  set in steps 1) and 2), and requires the XCMD option to be on.

  Parameters: see the comments next to them.
*/
%macro execute_in_parallel(
  Number_of_threads=%sysfunc(max(%eval(&SYSNCPU.-1), 1)) /* This parameter sets the number of threads */
 ,listOfStoredMacros=parallel           /* The list of macros copied to parallel env. */
 ,sasmacrCatalog = sasmacr              /* The default catalog with macros */
 ,systaskTimeout = %sysevalf(2 * 3600)  /* Max waiting time for systask in seconds */
);
  %local _macro_start_ _macro_end_;
  %let _macro_start_ = %sysfunc(datetime());
  %put NOTE:[&SYSMACRONAME.] START at %sysfunc(abs(&_macro_start_.),datetime21.);

  %local Number_of_threads _Thr_;

  /* FIX: stop with a clear message when the session is not allowed to
     run operating system commands, instead of failing inside SYSTASK */
  %if %sysfunc(getoption(XCMD)) = NOXCMD %then
    %do;
      %put ERROR:[&SYSMACRONAME.] The XCMD option is off - SYSTASK COMMAND cannot start parallel sessions.;
      %return;
    %end;

  /* Create a place for the parallel sessions' codes */
  options DLCREATEDIR;
  libname TMP_CODE "&SASWORK.&SlasH.TMP CODE";
  filename PLK "%sysfunc(pathname(TMP_CODE))";

  /* Loop 1 - to prepare one copy of the macro catalog per thread */
  %do _Thr_ = 0 %to %sysevalf(&Number_of_threads. - 1);
    %if &_Thr_. = 0 %then
      %do;
        /* Copy the macro catalog to avoid catalogs
           blocking by parallel sessions */
        PROC CATALOG FORCE ENTRYTYPE=macro;
          COPY in=work.&sasmacrCatalog. OUT=work.prll_&_Thr_. NEW;
          SELECT &listOfStoredMacros.;
        run;
        quit;
      %end;
    %else
      %do;
        /* the remaining catalogs are byte copies of the file prll_0 */
        filename f_in  "%sysfunc(pathname(WORK))&SlasH.prll_0.sas7bcat" lrecl=1 recfm=F;
        filename f_out "%sysfunc(pathname(WORK))&SlasH.prll_&_Thr_..sas7bcat" lrecl=1 recfm=F;

        data _null_;
          length msg $ 256;
          rc = fcopy("f_in", "f_out");
          /* FIX: the return code was ignored; FCOPY returns 0 on success,
             anything else is reported together with the system message */
          if rc ne 0 then
            do;
              msg = sysmsg();
              put "ERROR: FCOPY of the macro catalog for thread &_Thr_. failed, " rc= msg=;
            end;
        run;

        filename f_in clear;
        filename f_out clear;
      %end;
  %end;

  %do _Thr_ = 0 %to %sysevalf(&Number_of_threads. - 1);
    /* Create the code to be executed by the parallel sessions.
       Text in double quotes is resolved by the macro processor now,
       when the file is written; text in single quotes is written as is
       and is resolved later, when the parallel session runs the file. */
    data _null_;
      file PLK(PRLL&_Thr_..SAS);
      put " ";
      put "proc printto log=""%sysfunc(pathname(TMP_CODE))&SlasH.log&_Thr_..log""; run;"; /* you can redirect log */
      put " ";
      put "libname ctlgin ""%sysfunc(pathname(work))""; ";   /* <- double QUOTES !!! */
      put 'libname ctlgout "%sysfunc(pathname(work))" ; ';   /* <- single QUOTES !!! */
      put " ";
      put "PROC CATALOG FORCE ENTRYTYPE=macro; ";            /* copy catalog with compiled macro */
      put " COPY in=ctlgin.prll_&_Thr_. OUT=ctlgout.sasmacr;";
      put " SELECT &listOfStoredMacros.;";
      put " run;";
      put "quit; ";
      put "options mstored sasmstore=ctlgout;";
      put " ";
      put '%parallel(';
      put " &Number_of_threads.";
      put ",&_Thr_.";                          /* Thread numbers, starting from 0 */
      put ",PATH=%sysfunc(pathname(work))";    /* Set to main session work folder */
      put ');';
      put " ";
    run;
  %end;

  /* Change current directory location */
  %put *%sysfunc(pathname(PLK))*;
  /*x "cd %sysfunc(pathname(PLK))";*/                   /*previous version 1*/
  /* -SASINITIALFOLDER ""%sysfunc(pathname(PLK))"" */   /*previous version 2*/
  %put *%sysfunc(DLGCDIR(%sysfunc(pathname(PLK))))*; /* DLGCDIR changes current directory */

  /* Time for SYSTASK */

  /* Loop 2 - clear the list of tasks to have names for threads */
  /* NOTE(review): the trailing WAIT is kept from the original code -
     confirm that it is accepted by SYSTASK KILL */
  systask kill
    %do _Thr_ = 0 %to %sysevalf(&Number_of_threads. - 1);
      sas&_Thr_.
    %end;
  wait
  ;

  /* Loop 3 - run parallel threads - with NOWAIT option - named sas0, ..., sasN */
  /* the relative paths work because of the DLGCDIR call above */
  %do _Thr_ = 0 %to %sysevalf(&Number_of_threads. - 1);
    systask command
      """&SASEXE."" -sysin "".&SlasH.PRLL&_Thr_..SAS"" -print "".&SlasH.lst&_Thr_..lst"" -log "".&SlasH.log&_Thr_..log"" -config ""&SASROOT.&SlasH.sasv9.cfg"" -noterminal -rsasuser"
      /* optionally add: -realmemsize, -memsize, -sortsize, -sumsize */
      taskname=sas&_Thr_.
      status=sasstat&_Thr_.
      NOWAIT
    ;
  %end;

  /* Loop 4 - wait N seconds (TIMEOUT) for ALL threads */
  waitfor _all_
    %do _Thr_ = 0 %to %sysevalf(&Number_of_threads. - 1);
      sas&_Thr_.
    %end;
    timeout = &systaskTimeout.
  ;
  %let waitfor_status = &SYSRC.; /* collect exit status of WAITFOR */

  /* Loop 5 - print out statuses */
  %put *waitfor_status*&waitfor_status.*;
  %do _Thr_ = 0 %to %sysevalf(&Number_of_threads. - 1);
    %put *sasstat&_Thr_.*&&sasstat&_Thr_.*;
  %end;

  filename PLK;

  %let _macro_end_ = %sysfunc(datetime());
  %put NOTE:[&SYSMACRONAME.] END at %sysfunc(abs(&_macro_end_.),datetime21.);
  %put NOTE:[&SYSMACRONAME.] Processing time: %sysevalf(&_macro_end_. - &_macro_start_.) sec.;
%mend execute_in_parallel;

options mprint nosymbolgen nomlogic;

%execute_in_parallel(
  Number_of_threads = 3
 ,listOfStoredMacros = parallel little_test_macro
 ,systaskTimeout = 30
);

/* Collect results (if needed) */
data _want_V4_;
  set OUT_PRLL._null_:;
  by i;
run;

proc print;
run;

/*
options nomstored sasmstore=work;
libname ctlgmin clear;
*/