diff --git a/development/refresh_sources.sh b/development/refresh_sources.sh index b6ef51ef9..83518d7cc 100755 --- a/development/refresh_sources.sh +++ b/development/refresh_sources.sh @@ -12,7 +12,7 @@ git clone --depth=1 --branch=${SELFTESTING_BRANCH:-main} https://github.com/utPL rm -rf utPLSQL-cli/* # download latest release version of utPLSQL-cli -curl -Lk -o utPLSQL-cli.zip https://github.com/utPLSQL/utPLSQL-cli/releases/download/v${UTPLSQL_CLI_VERSION}/utPLSQL-cli.zip +curl -Lk -o utPLSQL-cli.zip https://github.com/utPLSQL/utPLSQL-cli/releases/download/${UTPLSQL_CLI_VERSION}/utPLSQL-cli.zip # unzip utPLSQL-cli and remove the zip file unzip utPLSQL-cli.zip && chmod u+x utPLSQL-cli/bin/utplsql && rm utPLSQL-cli.zip diff --git a/docs/userguide/install.md b/docs/userguide/install.md index ad3bfe97e..278bad788 100644 --- a/docs/userguide/install.md +++ b/docs/userguide/install.md @@ -121,7 +121,7 @@ The headless scripts accept three optional parameters that define: The scripts need to be executed by `SYSDBA`, in order to grant access to `DBMS_LOCK` and `DBMS_CRYPTO` system packages. !!! warning "Important" - - Grant on `DBMS_LOCK` is required only for installation on Oracle versions below 18c. For versions 18c and above, utPLSQL uses `DBMS_SESSION.SLEEP` so access to `DBMS_LOCK` package is no longer needed.
+ - `DBMS_LOCK` is required for session synchronization between main session and session consuming realtime reports.
- The user performing the installation must have the `ADMINISTER DATABASE TRIGGER` privilege. This is required for installation of trigger that is responsible for parsing annotations at at compile-time of a package.
- When installed with DDL trigger, utPLSQL will not be registering unit tests for any of oracle-maintained schemas.
- For Oracle 11g following users are excluded:
diff --git a/examples/developer_examples/RunExampleTestSuiteWithCompositeReporter.sql b/examples/developer_examples/RunExampleTestSuiteWithCompositeReporter.sql index 468d176ed..e713fa02d 100644 --- a/examples/developer_examples/RunExampleTestSuiteWithCompositeReporter.sql +++ b/examples/developer_examples/RunExampleTestSuiteWithCompositeReporter.sql @@ -20,6 +20,7 @@ begin ut_event_manager.initialize(); ut_event_manager.add_listener(l_doc_reporter); ut_event_manager.add_listener(l_tc_reporter); + ut_event_manager.trigger_event(ut_event_manager.gc_initialize, l_run); l_suite := ut_suite(user, 'ut_exampletest',a_line_no=>1); l_suite.description := 'Test Suite Name'; diff --git a/source/core/coverage/ut_coverage_reporter_base.tpb b/source/core/coverage/ut_coverage_reporter_base.tpb index aff4e6560..da2bd27ad 100644 --- a/source/core/coverage/ut_coverage_reporter_base.tpb +++ b/source/core/coverage/ut_coverage_reporter_base.tpb @@ -92,7 +92,6 @@ create or replace type body ut_coverage_reporter_base is ut_coverage_helper.cleanup_tmp_table(); (l_reporter as ut_output_reporter_base).before_calling_run(null); l_reporter.after_calling_run( ut_run( a_coverage_options => a_coverage_options, a_client_character_set => a_client_character_set ) ); - l_reporter.on_finalize(null); for i in (select /*+ no_parallel */ x.text from table(l_reporter.get_lines(1, 1)) x ) loop pipe row (i.text); end loop; @@ -106,7 +105,6 @@ create or replace type body ut_coverage_reporter_base is ut_coverage_helper.cleanup_tmp_table(); (l_reporter as ut_output_reporter_base).before_calling_run(null); l_reporter.after_calling_run( ut_run( a_coverage_options => a_coverage_options, a_client_character_set => a_client_character_set ) ); - l_reporter.on_finalize(null); open l_result for select /*+ no_parallel */ x.text from table(l_reporter.get_lines(1, 1)) x; return l_result; end; diff --git a/source/core/output_buffers/ut_output_buffer_base.tpb b/source/core/output_buffers/ut_output_buffer_base.tpb index 6d7964150..be4c92bd8 100644 --- a/source/core/output_buffers/ut_output_buffer_base.tpb +++ b/source/core/output_buffers/ut_output_buffer_base.tpb @@ -24,7 +24,7 @@ create or replace type body ut_output_buffer_base is self.self_type := coalesce(a_self_type,self.self_type); self.output_id := coalesce(a_output_id, self.output_id, sys_guid()); self.start_date := coalesce(self.start_date, sysdate); - self.last_message_id := 0; + self.last_write_message_id := 0; select /*+ no_parallel */ count(*) into l_exists from ut_output_buffer_info_tmp where output_id = self.output_id; if ( l_exists > 0 ) then update /*+ no_parallel */ ut_output_buffer_info_tmp set start_date = self.start_date where output_id = self.output_id; @@ -32,10 +32,86 @@ create or replace type body ut_output_buffer_base is insert /*+ no_parallel */ into ut_output_buffer_info_tmp(output_id, start_date) values (self.output_id, self.start_date); end if; commit; + dbms_lock.allocate_unique( self.output_id, self.lock_handle); self.is_closed := 0; end; - member function get_lines_cursor(a_initial_timeout natural := null, a_timeout_sec natural := null) return sys_refcursor is + member procedure lock_buffer(a_timeout_sec number := null) is + l_status integer; + begin + l_status := dbms_lock.request( self.lock_handle, dbms_lock.x_mode, 5, false ); + if l_status != 0 then + raise_application_error(-20000, 'Cannot allocate lock for output buffer of reporter. lock request status = '||l_status||', lock handle = '||self.lock_handle||', self.output_id ='||self.output_id); + end if; + end; + + member procedure close(self in out nocopy ut_output_buffer_base) is + l_status integer; + begin + l_status := dbms_lock.release( self.lock_handle ); + if l_status != 0 then + raise_application_error(-20000, 'Cannot release lock for output buffer of reporter. Lock_handle = '||self.lock_handle||' status = '||l_status); + end if; + self.is_closed := 1; + end; + + + member procedure remove_buffer_info(self in ut_output_buffer_base) is + pragma autonomous_transaction; + begin + delete from ut_output_buffer_info_tmp a + where a.output_id = self.output_id; + commit; + end; + + member function timeout_producer_not_started( a_producer_started boolean, a_already_waited_sec number, a_init_wait_sec number ) return boolean + is + l_result boolean := false; + begin + if not a_producer_started and a_already_waited_sec >= a_init_wait_sec then + if a_init_wait_sec > 0 then + self.remove_buffer_info(); + raise_application_error( + ut_utils.gc_out_buffer_timeout, + 'Timeout occurred while waiting for report data producer to start. Waited for: '||ut_utils.to_string( a_already_waited_sec )||' seconds.' + ); + else + l_result := true; + end if; + end if; + return l_result; + end; + + member function timeout_producer_not_finished(a_producer_finished boolean, a_already_waited_sec number, a_timeout_sec number) return boolean + is + l_result boolean := false; + begin + if not a_producer_finished and a_timeout_sec is not null and a_already_waited_sec >= a_timeout_sec then + if a_timeout_sec > 0 then + self.remove_buffer_info(); + raise_application_error( + ut_utils.gc_out_buffer_timeout, + 'Timeout occurred while waiting for more data from producer. Waited for: '||ut_utils.to_string( a_already_waited_sec )||' seconds.' + ); + else + l_result := true; + end if; + end if; + return l_result; + end; + + member function get_lock_status return integer is + l_result integer; + l_release_status integer; + begin + l_result := dbms_lock.request( self.lock_handle, dbms_lock.s_mode, 0, false ); + if l_result = 0 then + l_release_status := dbms_lock.release( self.lock_handle ); + end if; + return l_result; + end; + + member function get_lines_cursor(a_initial_timeout number := null, a_timeout_sec number := null) return sys_refcursor is l_lines sys_refcursor; begin open l_lines for @@ -44,7 +120,7 @@ create or replace type body ut_output_buffer_base is return l_lines; end; - member procedure lines_to_dbms_output(self in ut_output_buffer_base, a_initial_timeout natural := null, a_timeout_sec natural := null) is + member procedure lines_to_dbms_output(self in ut_output_buffer_base, a_initial_timeout number := null, a_timeout_sec number := null) is l_data sys_refcursor; l_clob clob; l_item_type varchar2(32767); @@ -54,16 +130,20 @@ create or replace type body ut_output_buffer_base is loop fetch l_data into l_clob, l_item_type; exit when l_data%notfound; - l_lines := ut_utils.clob_to_table(l_clob); - for i in 1 .. l_lines.count loop - dbms_output.put_line(l_lines(i)); - end loop; + if dbms_lob.getlength(l_clob) > 32767 then + l_lines := ut_utils.clob_to_table(l_clob); + for i in 1 .. l_lines.count loop + dbms_output.put_line(l_lines(i)); + end loop; + else + dbms_output.put_line(l_clob); + end if; end loop; close l_data; end; member procedure cleanup_buffer(self in ut_output_buffer_base, a_retention_time_sec natural := null) is - gc_buffer_retention_sec constant naturaln := coalesce(a_retention_time_sec, 60 * 60 * 24); -- 24 hours + gc_buffer_retention_sec constant naturaln := coalesce(a_retention_time_sec, 60 * 60 * 24 * 5); -- 5 days l_retention_days number := gc_buffer_retention_sec / (60 * 60 * 24); l_max_retention_date date := sysdate - l_retention_days; pragma autonomous_transaction; diff --git a/source/core/output_buffers/ut_output_buffer_base.tps b/source/core/output_buffers/ut_output_buffer_base.tps index 98a6847cd..53be365ae 100644 --- a/source/core/output_buffers/ut_output_buffer_base.tps +++ b/source/core/output_buffers/ut_output_buffer_base.tps @@ -19,16 +19,22 @@ create or replace type ut_output_buffer_base force authid definer as object( output_id raw(32), is_closed number(1,0), start_date date, - last_message_id number(38,0), + last_write_message_id number(38,0), + lock_handle varchar2(30 byte), self_type varchar2(250 byte), member procedure init(self in out nocopy ut_output_buffer_base, a_output_id raw := null, a_self_type varchar2 := null), - member function get_lines_cursor(a_initial_timeout natural := null, a_timeout_sec natural := null) return sys_refcursor, - member procedure lines_to_dbms_output(self in ut_output_buffer_base, a_initial_timeout natural := null, a_timeout_sec natural := null), + member procedure lock_buffer(a_timeout_sec number := null), + member function timeout_producer_not_started( a_producer_started boolean, a_already_waited_sec number, a_init_wait_sec number ) return boolean, + member function timeout_producer_not_finished(a_producer_finished boolean, a_already_waited_sec number, a_timeout_sec number) return boolean, + member function get_lock_status return integer, + member function get_lines_cursor(a_initial_timeout number := null, a_timeout_sec number := null) return sys_refcursor, + member procedure lines_to_dbms_output(self in ut_output_buffer_base, a_initial_timeout number := null, a_timeout_sec number := null), member procedure cleanup_buffer(self in ut_output_buffer_base, a_retention_time_sec natural := null), - not instantiable member procedure close(self in out nocopy ut_output_buffer_base), + member procedure remove_buffer_info(self in ut_output_buffer_base), + member procedure close(self in out nocopy ut_output_buffer_base), not instantiable member procedure send_line(self in out nocopy ut_output_buffer_base, a_text varchar2, a_item_type varchar2 := null), not instantiable member procedure send_lines(self in out nocopy ut_output_buffer_base, a_text_list ut_varchar2_rows, a_item_type varchar2 := null), not instantiable member procedure send_clob(self in out nocopy ut_output_buffer_base, a_text clob, a_item_type varchar2 := null), - not instantiable member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined + not instantiable member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined ) not final not instantiable / diff --git a/source/core/output_buffers/ut_output_clob_table_buffer.tpb b/source/core/output_buffers/ut_output_clob_table_buffer.tpb index 66ff71c62..c4cfdb059 100644 --- a/source/core/output_buffers/ut_output_clob_table_buffer.tpb +++ b/source/core/output_buffers/ut_output_clob_table_buffer.tpb @@ -22,23 +22,13 @@ create or replace type body ut_output_clob_table_buffer is return; end; - overriding member procedure close(self in out nocopy ut_output_clob_table_buffer) is - pragma autonomous_transaction; - begin - self.last_message_id := self.last_message_id + 1; - insert /*+ no_parallel */ into ut_output_clob_buffer_tmp(output_id, message_id, is_finished) - values (self.output_id, self.last_message_id, 1); - commit; - self.is_closed := 1; - end; - overriding member procedure send_line(self in out nocopy ut_output_clob_table_buffer, a_text varchar2, a_item_type varchar2 := null) is pragma autonomous_transaction; begin if a_text is not null or a_item_type is not null then - self.last_message_id := self.last_message_id + 1; + self.last_write_message_id := self.last_write_message_id + 1; insert /*+ no_parallel */ into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type) - values (self.output_id, self.last_message_id, a_text, a_item_type); + values (self.output_id, self.last_write_message_id, a_text, a_item_type); end if; commit; end; @@ -47,10 +37,10 @@ create or replace type body ut_output_clob_table_buffer is pragma autonomous_transaction; begin insert /*+ no_parallel */ into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type) - select /*+ no_parallel */ self.output_id, self.last_message_id + rownum, t.column_value, a_item_type + select /*+ no_parallel */ self.output_id, self.last_write_message_id + rownum, t.column_value, a_item_type from table(a_text_list) t where t.column_value is not null or a_item_type is not null; - self.last_message_id := self.last_message_id + SQL%rowcount; + self.last_write_message_id := self.last_write_message_id + SQL%rowcount; commit; end; @@ -58,98 +48,92 @@ create or replace type body ut_output_clob_table_buffer is pragma autonomous_transaction; begin if a_text is not null and a_text != empty_clob() or a_item_type is not null then - self.last_message_id := self.last_message_id + 1; + self.last_write_message_id := self.last_write_message_id + 1; insert /*+ no_parallel */ into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type) - values (self.output_id, self.last_message_id, a_text, a_item_type); + values (self.output_id, self.last_write_message_id, a_text, a_item_type); end if; commit; end; - overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined is - type t_rowid_tab is table of urowid; - l_message_rowids t_rowid_tab; - l_buffer_data ut_output_data_rows; - l_finished_flags ut_integer_list; - l_already_waited_for number(10,2) := 0; - l_finished boolean := false; - lc_init_wait_sec constant naturaln := coalesce(a_initial_timeout, 60 ); -- 1 minute - lc_max_wait_sec constant naturaln := coalesce(a_timeout_sec, 60 * 60 * 4); -- 4 hours - l_wait_for integer := lc_init_wait_sec; - lc_short_sleep_time constant number(1,1) := 0.1; --sleep for 100 ms between checks - lc_long_sleep_time constant number(1) := 1; --sleep for 1 s when waiting long - lc_long_wait_time constant number(1) := 1; --waiting more than 1 sec - l_sleep_time number(2,1) := lc_short_sleep_time; - lc_bulk_limit constant integer := 5000; - l_max_message_id integer := lc_bulk_limit; - - procedure remove_read_data(a_message_rowids t_rowid_tab) is - pragma autonomous_transaction; - begin - forall i in 1 .. a_message_rowids.count - delete from ut_output_clob_buffer_tmp a - where rowid = a_message_rowids(i); - commit; - end; - - procedure remove_buffer_info is - pragma autonomous_transaction; - begin - delete from ut_output_buffer_info_tmp a - where a.output_id = self.output_id; - commit; - end; - + overriding member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined is + lc_init_wait_sec constant number := coalesce(a_initial_timeout, 10 ); + l_buffer_rowids ut_varchar2_rows; + l_buffer_data ut_output_data_rows; + l_finished_flags ut_integer_list; + l_last_read_message_id integer; + l_already_waited_sec number(10,2) := 0; + l_finished boolean := false; + l_sleep_time number(2,1); + l_lock_status integer; + l_producer_started boolean := false; + l_producer_finished boolean := false; + procedure get_data_from_buffer_table( + a_last_read_message_id in out nocopy integer, + a_buffer_data out nocopy ut_output_data_rows, + a_buffer_rowids out nocopy ut_varchar2_rows, + a_finished_flags out nocopy ut_integer_list + ) is + lc_bulk_limit constant integer := 5000; begin - while not l_finished loop + a_last_read_message_id := coalesce(a_last_read_message_id, 0); with ordered_buffer as ( - select /*+ no_parallel index(a) */ a.rowid, ut_output_data_row(a.text, a.item_type), is_finished + select /*+ no_parallel index(a) */ ut_output_data_row(a.text, a.item_type), rowidtochar(a.rowid), is_finished from ut_output_clob_buffer_tmp a where a.output_id = self.output_id - and a.message_id <= l_max_message_id + and a.message_id <= a_last_read_message_id + lc_bulk_limit order by a.message_id ) select /*+ no_parallel */ b.* - bulk collect into l_message_rowids, l_buffer_data, l_finished_flags + bulk collect into a_buffer_data, a_buffer_rowids, a_finished_flags from ordered_buffer b; + a_last_read_message_id := a_last_read_message_id + a_finished_flags.count; + end; - --nothing fetched from output, wait and try again - if l_buffer_data.count = 0 then - $if dbms_db_version.version >= 18 $then - dbms_session.sleep(l_sleep_time); - $else - dbms_lock.sleep(l_sleep_time); - $end - l_already_waited_for := l_already_waited_for + l_sleep_time; - if l_already_waited_for > lc_long_wait_time then - l_sleep_time := lc_long_sleep_time; - end if; - else - --reset wait time - -- we wait lc_max_wait_sec for new message - l_wait_for := lc_max_wait_sec; - l_already_waited_for := 0; - l_sleep_time := lc_short_sleep_time; + procedure remove_read_data(a_buffer_rowids ut_varchar2_rows) is + pragma autonomous_transaction; + begin + forall i in 1 .. a_buffer_rowids.count + delete from ut_output_clob_buffer_tmp a + where rowid = chartorowid(a_buffer_rowids(i)); + commit; + end; + + begin + while not l_finished loop + + l_sleep_time := case when l_already_waited_sec >= 1 then 0.5 else 0.1 end; + l_lock_status := self.get_lock_status(); + get_data_from_buffer_table( l_last_read_message_id, l_buffer_data, l_buffer_rowids, l_finished_flags ); + + if l_buffer_data.count > 0 then + l_already_waited_sec := 0; for i in 1 .. l_buffer_data.count loop if l_buffer_data(i).text is not null then - pipe row(l_buffer_data(i)); + pipe row( l_buffer_data(i) ); elsif l_finished_flags(i) = 1 then l_finished := true; exit; end if; end loop; - remove_read_data(l_message_rowids); - l_max_message_id := l_max_message_id + lc_bulk_limit; - end if; - if l_finished or l_already_waited_for >= l_wait_for then - remove_buffer_info(); - if l_already_waited_for > 0 and l_already_waited_for >= l_wait_for then - raise_application_error( - ut_utils.gc_out_buffer_timeout, - 'Timeout occurred while waiting for output data. Waited for: '||l_already_waited_for||' seconds.' - ); - end if; + remove_read_data(l_buffer_rowids); + else + --nothing fetched from output, wait. + dbms_lock.sleep(l_sleep_time); + l_already_waited_sec := l_already_waited_sec + l_sleep_time; end if; + + l_producer_started := (l_lock_status <> 0 or l_buffer_data.count > 0) or l_producer_started; + l_producer_finished := (l_producer_started and l_lock_status = 0 and l_buffer_data.count = 0) or l_producer_finished; + + l_finished := + self.timeout_producer_not_finished(l_producer_finished, l_already_waited_sec, a_timeout_sec) + or self.timeout_producer_not_started(l_producer_started, l_already_waited_sec, lc_init_wait_sec) + or l_producer_finished + or l_finished; + end loop; + + self.remove_buffer_info(); return; end; diff --git a/source/core/output_buffers/ut_output_clob_table_buffer.tps b/source/core/output_buffers/ut_output_clob_table_buffer.tps index 7b98efaba..191e64c01 100644 --- a/source/core/output_buffers/ut_output_clob_table_buffer.tps +++ b/source/core/output_buffers/ut_output_clob_table_buffer.tps @@ -20,7 +20,6 @@ create or replace type ut_output_clob_table_buffer under ut_output_buffer_base ( overriding member procedure send_line(self in out nocopy ut_output_clob_table_buffer, a_text varchar2, a_item_type varchar2 := null), overriding member procedure send_lines(self in out nocopy ut_output_clob_table_buffer, a_text_list ut_varchar2_rows, a_item_type varchar2 := null), overriding member procedure send_clob(self in out nocopy ut_output_clob_table_buffer, a_text clob, a_item_type varchar2 := null), - overriding member procedure close(self in out nocopy ut_output_clob_table_buffer), - overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined + overriding member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined ) not final / diff --git a/source/core/output_buffers/ut_output_table_buffer.tpb b/source/core/output_buffers/ut_output_table_buffer.tpb index 1809a49d5..f38363e49 100644 --- a/source/core/output_buffers/ut_output_table_buffer.tpb +++ b/source/core/output_buffers/ut_output_table_buffer.tpb @@ -22,16 +22,6 @@ create or replace type body ut_output_table_buffer is return; end; - overriding member procedure close(self in out nocopy ut_output_table_buffer) is - pragma autonomous_transaction; - begin - self.last_message_id := self.last_message_id + 1; - insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, is_finished) - values (self.output_id, self.last_message_id, 1); - commit; - self.is_closed := 1; - end; - overriding member procedure send_line(self in out nocopy ut_output_table_buffer, a_text varchar2, a_item_type varchar2 := null) is pragma autonomous_transaction; begin @@ -44,9 +34,9 @@ create or replace type body ut_output_table_buffer is a_item_type ); else - self.last_message_id := self.last_message_id + 1; + self.last_write_message_id := self.last_write_message_id + 1; insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type) - values (self.output_id, self.last_message_id, a_text, a_item_type); + values (self.output_id, self.last_write_message_id, a_text, a_item_type); end if; commit; end if; @@ -56,10 +46,10 @@ create or replace type body ut_output_table_buffer is pragma autonomous_transaction; begin insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type) - select /*+ no_parallel */ self.output_id, self.last_message_id + rownum, t.column_value, a_item_type + select /*+ no_parallel */ self.output_id, self.last_write_message_id + rownum, t.column_value, a_item_type from table(a_text_list) t where t.column_value is not null or a_item_type is not null; - self.last_message_id := self.last_message_id + SQL%rowcount; + self.last_write_message_id := self.last_write_message_id + SQL%rowcount; commit; end; @@ -75,99 +65,103 @@ create or replace type body ut_output_table_buffer is a_item_type ); else - self.last_message_id := self.last_message_id + 1; + self.last_write_message_id := self.last_write_message_id + 1; insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type) - values (self.output_id, self.last_message_id, a_text, a_item_type); + values (self.output_id, self.last_write_message_id, a_text, a_item_type); end if; commit; end if; end; - overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined is - l_buffer_data ut_varchar2_rows; - l_item_types ut_varchar2_rows; - l_finished_flags ut_integer_list; - l_already_waited_for number(10,2) := 0; - l_finished boolean := false; - lc_init_wait_sec constant naturaln := coalesce(a_initial_timeout, 60 ); -- 1 minute - lc_max_wait_sec constant naturaln := coalesce(a_timeout_sec, 60 * 60 * 4); -- 4 hours - l_wait_for integer := lc_init_wait_sec; - lc_short_sleep_time constant number(1,1) := 0.1; --sleep for 100 ms between checks - lc_long_sleep_time constant number(1) := 1; --sleep for 1 s when waiting long - lc_long_wait_time constant number(1) := 1; --waiting more than 1 sec - l_sleep_time number(2,1) := lc_short_sleep_time; - lc_bulk_limit constant integer := 5000; - l_max_message_id integer := lc_bulk_limit; - - procedure get_data_from_buffer( - a_max_message_id integer, - a_buffer_data out nocopy ut_varchar2_rows, - a_item_types out nocopy ut_varchar2_rows, - a_finished_flags out nocopy ut_integer_list - ) is - pragma autonomous_transaction; - begin - delete /*+ no_parallel */ from ( - select /*+ no_parallel */ * - from ut_output_buffer_tmp o - where o.output_id = self.output_id - and o.message_id <= a_max_message_id - order by o.message_id - ) d - returning d.text, d.item_type, d.is_finished - bulk collect into a_buffer_data, a_item_types, a_finished_flags; - commit; - - end; - - procedure remove_buffer_info is - pragma autonomous_transaction; - begin - delete from ut_output_buffer_info_tmp a - where a.output_id = self.output_id; - commit; - end; + overriding member procedure lines_to_dbms_output(self in ut_output_table_buffer, a_initial_timeout number := null, a_timeout_sec number := null) is + l_data sys_refcursor; + l_text varchar2(32767); + l_item_type varchar2(32767); + begin + l_data := self.get_lines_cursor(a_initial_timeout, a_timeout_sec); + loop + fetch l_data into l_text, l_item_type; + exit when l_data%notfound; + dbms_output.put_line(l_text); + end loop; + close l_data; + end; - begin + /* Important note. + This function code is almost duplicated between two types for performance reasons. + The pipe row clause is much faster on VARCHAR2 then it is on clob. + That is the key reason for two implementations. + */ + overriding member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined is + lc_init_wait_sec constant number := coalesce(a_initial_timeout, 10 ); + l_buffer_texts ut_varchar2_rows; + l_buffer_item_types ut_varchar2_rows; + l_finished_flags ut_integer_list; + l_last_read_message_id integer; + l_already_waited_sec number(10,2) := 0; + l_finished boolean := false; + l_sleep_time number(2,1); + l_lock_status integer; + l_producer_started boolean := false; + l_producer_finished boolean := false; + + procedure get_data_from_buffer_table( + a_last_read_message_id in out nocopy integer, + a_buffer_texts out nocopy ut_varchar2_rows, + a_buffer_item_types out nocopy ut_varchar2_rows, + a_finished_flags out nocopy ut_integer_list + ) is + lc_bulk_limit constant integer := 20000; + pragma autonomous_transaction; + begin + a_last_read_message_id := coalesce(a_last_read_message_id,0); + delete /*+ no_parallel */ from ( + select /*+ no_parallel */ * + from ut_output_buffer_tmp o + where o.output_id = self.output_id + and o.message_id <= a_last_read_message_id + lc_bulk_limit + order by o.message_id + ) d + returning d.text, d.item_type, d.is_finished + bulk collect into a_buffer_texts, a_buffer_item_types, a_finished_flags; + a_last_read_message_id := a_last_read_message_id + a_finished_flags.count; + commit; + end; + begin while not l_finished loop - get_data_from_buffer( l_max_message_id, l_buffer_data, l_item_types, l_finished_flags); - --nothing fetched from output, wait and try again - if l_buffer_data.count = 0 then - $if dbms_db_version.version >= 18 $then - dbms_session.sleep(l_sleep_time); - $else - dbms_lock.sleep(l_sleep_time); - $end - l_already_waited_for := l_already_waited_for + l_sleep_time; - if l_already_waited_for > lc_long_wait_time then - l_sleep_time := lc_long_sleep_time; - end if; - else - --reset wait time - -- we wait lc_max_wait_sec for new message - l_wait_for := lc_max_wait_sec; - l_already_waited_for := 0; - l_sleep_time := lc_short_sleep_time; - for i in 1 .. l_buffer_data.count loop - if l_buffer_data(i) is not null then - pipe row(ut_output_data_row(l_buffer_data(i),l_item_types(i))); + + l_sleep_time := case when l_already_waited_sec >= 1 then 0.5 else 0.1 end; + l_lock_status := self.get_lock_status(); + get_data_from_buffer_table( l_last_read_message_id, l_buffer_texts, l_buffer_item_types, l_finished_flags ); + + if l_buffer_texts.count > 0 then + l_already_waited_sec := 0; + for i in 1 .. l_buffer_texts.count loop + if l_buffer_texts(i) is not null then + pipe row( ut_output_data_row(l_buffer_texts(i), l_buffer_item_types(i)) ); elsif l_finished_flags(i) = 1 then l_finished := true; exit; end if; end loop; - l_max_message_id := l_max_message_id + lc_bulk_limit; - end if; - if l_finished or l_already_waited_for >= l_wait_for then - remove_buffer_info(); - if l_already_waited_for > 0 and l_already_waited_for >= l_wait_for then - raise_application_error( - ut_utils.gc_out_buffer_timeout, - 'Timeout occurred while waiting for output data. Waited for: '||l_already_waited_for||' seconds.' - ); - end if; + else + --nothing fetched from output, wait. + dbms_lock.sleep(l_sleep_time); + l_already_waited_sec := l_already_waited_sec + l_sleep_time; end if; + + l_producer_started := (l_lock_status <> 0 or l_buffer_texts.count > 0) or l_producer_started; + l_producer_finished := (l_producer_started and l_lock_status = 0 and l_buffer_texts.count = 0) or l_producer_finished; + + l_finished := + self.timeout_producer_not_finished(l_producer_finished, l_already_waited_sec, a_timeout_sec) + or self.timeout_producer_not_started(l_producer_started, l_already_waited_sec, lc_init_wait_sec) + or l_producer_finished + or l_finished; + end loop; + + self.remove_buffer_info(); return; end; diff --git a/source/core/output_buffers/ut_output_table_buffer.tps b/source/core/output_buffers/ut_output_table_buffer.tps index 726b692f8..154ce4de6 100644 --- a/source/core/output_buffers/ut_output_table_buffer.tps +++ b/source/core/output_buffers/ut_output_table_buffer.tps @@ -20,7 +20,7 @@ create or replace type ut_output_table_buffer under ut_output_buffer_base ( overriding member procedure send_line(self in out nocopy ut_output_table_buffer, a_text varchar2, a_item_type varchar2 := null), overriding member procedure send_lines(self in out nocopy ut_output_table_buffer, a_text_list ut_varchar2_rows, a_item_type varchar2 := null), overriding member procedure send_clob(self in out nocopy ut_output_table_buffer, a_text clob, a_item_type varchar2 := null), - overriding member procedure close(self in out nocopy ut_output_table_buffer), - overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined + overriding member procedure lines_to_dbms_output(self in ut_output_table_buffer, a_initial_timeout number := null, a_timeout_sec number := null), + overriding member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined ) not final / diff --git a/source/core/types/ut_output_reporter_base.tpb b/source/core/types/ut_output_reporter_base.tpb index f6bb27b94..48970be5a 100644 --- a/source/core/types/ut_output_reporter_base.tpb +++ b/source/core/types/ut_output_reporter_base.tpb @@ -41,13 +41,6 @@ create or replace type body ut_output_reporter_base is return l_result; end; - overriding member procedure before_calling_run(self in out nocopy ut_output_reporter_base, a_run in ut_run) is - l_output_table_buffer ut_output_table_buffer; - begin - (self as ut_reporter_base).before_calling_run(a_run); - l_output_table_buffer := treat(self.output_buffer as ut_output_table_buffer); - end; - member procedure print_text(self in out nocopy ut_output_reporter_base, a_text varchar2, a_item_type varchar2 := null) is begin self.output_buffer.send_line(a_text, a_item_type); @@ -87,6 +80,7 @@ create or replace type body ut_output_reporter_base is overriding member procedure on_initialize(self in out nocopy ut_output_reporter_base, a_run in ut_run) is begin + self.output_buffer.lock_buffer(); self.output_buffer.send_line(null, 'initialize'); end; diff --git a/source/core/types/ut_output_reporter_base.tps b/source/core/types/ut_output_reporter_base.tps index 22f507f8d..21eed9957 100644 --- a/source/core/types/ut_output_reporter_base.tps +++ b/source/core/types/ut_output_reporter_base.tps @@ -20,8 +20,7 @@ create or replace type ut_output_reporter_base under ut_reporter_base( member procedure init(self in out nocopy ut_output_reporter_base, a_self_type varchar2, a_output_buffer ut_output_buffer_base := null), overriding member procedure set_reporter_id(self in out nocopy ut_output_reporter_base, a_reporter_id raw), member function set_reporter_id(self in ut_output_reporter_base, a_reporter_id raw) return ut_output_reporter_base, - overriding member procedure before_calling_run(self in out nocopy ut_output_reporter_base, a_run in ut_run), - + member procedure print_text(self in out nocopy ut_output_reporter_base, a_text varchar2, a_item_type varchar2 := null), member procedure print_text_lines(self in out nocopy ut_output_reporter_base, a_text_lines ut_varchar2_rows, a_item_type varchar2 := null), member procedure print_clob(self in out nocopy ut_output_reporter_base, a_clob clob, a_item_type varchar2 := null), diff --git a/source/create_utplsql_owner.sql b/source/create_utplsql_owner.sql index 64bcb52ce..d7e4f3040 100644 --- a/source/create_utplsql_owner.sql +++ b/source/create_utplsql_owner.sql @@ -31,15 +31,7 @@ create user &ut3_owner_schema identified by "&ut3_password" default tablespace & grant create session, create sequence, create procedure, create type, create table, create view, create synonym to &ut3_owner_schema; -begin - $if dbms_db_version.version < 18 $then - execute immediate 'grant execute on dbms_lock to &ut3_owner_schema'; - $else - null; - $end -end; -/ - +grant execute on dbms_lock to &ut3_owner_schema; grant execute on dbms_crypto to &ut3_owner_schema; grant execute on dbms_lob to &ut3_owner_schema; grant execute on dbms_xmlgen to &ut3_owner_schema; diff --git a/test/ut3_tester/core/test_output_buffer.pkb b/test/ut3_tester/core/test_output_buffer.pkb index 2e8b3337c..edb10e3e6 100644 --- a/test/ut3_tester/core/test_output_buffer.pkb +++ b/test/ut3_tester/core/test_output_buffer.pkb @@ -16,12 +16,13 @@ create or replace package body test_output_buffer is || chr(13) || chr(10) || to_clob(lpad('a text', 31000, ',a text')) || to_clob(lpad('a text', 31000, ',a text')); l_expected_item_type := lpad('some item type',1000,'-'); --Act + l_buffer.lock_buffer(); l_buffer.send_clob(l_expected_text, l_expected_item_type); l_buffer.close(); select text, item_type into l_actual_text, l_actual_item_type - from table(l_buffer.get_lines(0,0)); + from table(l_buffer.get_lines(0.1,0.1)); --Assert ut.expect(l_actual_text).to_equal(l_expected_text); @@ -32,7 +33,14 @@ create or replace package body test_output_buffer is ut.expect(l_remaining).to_equal(0); end; - + + procedure test_wait_for_producer is + l_buffer ut3_develop.ut_output_buffer_base; + begin + l_buffer := ut3_develop.ut_output_clob_table_buffer(); + ut.expect( l_buffer.get_lines_cursor(0.1) ).to_be_empty(); + end; + procedure test_doesnt_send_on_null_text is l_cur sys_refcursor; l_result integer; @@ -86,11 +94,12 @@ create or replace package body test_output_buffer is begin --Arrange l_expected := 'a text'; + l_buffer.lock_buffer(); l_buffer.send_line(l_expected); l_start := localtimestamp; --Act begin - select text into l_result from table(l_buffer.get_lines(1,1)); + select text into l_result from table(l_buffer.get_lines(0,0.3)); ut.fail('Expected a timeout exception but nothing was raised'); exception when others then @@ -101,7 +110,7 @@ create or replace package body test_output_buffer is --Throws a timeout exception ut.expect(dbms_utility.format_error_stack()).to_match('ORA'||ut3_develop.ut_utils.gc_out_buffer_timeout); --Waited for one second - ut.expect(l_duration).to_be_greater_than(interval '0.99' second); + ut.expect(l_duration).to_be_greater_or_equal(interval '0.3' second); end; select count(1) into l_remaining from table(ut3_tester_helper.run_helper.ut_output_buffer_tmp) where output_id = l_buffer.output_id; @@ -116,13 +125,15 @@ create or replace package body test_output_buffer is l_buffer ut3_develop.ut_output_buffer_base; begin --Arrange - l_stale_buffer.start_date := sysdate - 2; + l_stale_buffer.start_date := sysdate - 10; --initialize with new start date l_stale_buffer.init(); + l_stale_buffer.lock_buffer(); l_stale_buffer.send_line('some text'); l_stale_buffer.close(); l_fresh_buffer := ut3_develop.ut_output_table_buffer(); + l_fresh_buffer.lock_buffer(); l_fresh_buffer.send_line('some text'); l_fresh_buffer.close(); @@ -131,9 +142,9 @@ create or replace package body test_output_buffer is --Assert -- Data in "fresh" buffer remains - ut.expect( l_fresh_buffer.get_lines_cursor(0,0), l_buffer.self_type ).to_have_count(1); + ut.expect( l_fresh_buffer.get_lines_cursor(0,0), l_fresh_buffer.self_type ).to_have_count(1); -- Data in "stale" buffer is purged and so the call to get_lines_cursor throws ORA-20218 - ut.expect( l_stale_buffer.get_lines_cursor(0,0), l_buffer.self_type ).to_be_empty(); + ut.expect( l_stale_buffer.get_lines_cursor(0,0), l_stale_buffer.self_type ).to_be_empty(); end; procedure test_purge_text_buffer is diff --git a/test/ut3_tester/core/test_output_buffer.pks b/test/ut3_tester/core/test_output_buffer.pks index 24c2c01eb..feaa337f8 100644 --- a/test/ut3_tester/core/test_output_buffer.pks +++ b/test/ut3_tester/core/test_output_buffer.pks @@ -2,10 +2,33 @@ create or replace package test_output_buffer is --%suite(output_buffer) --%suitepath(utplsql.ut3_tester.core) + + + --%context(Read and write within the same session) + + + --%endcontext + --%context(Buffer is read in a different session than buffer write) + + --reader will wait for a_initial_timeout seconds for the writer process to start and then it will finish with error + + --reader will wait forever (beyond a_initial_timeout) if the writer process is started and end of data row was not received from the buffer + + --reader stops after reading the end of data signal from the buffer + + --reader stops when writer process ends and all data was read from the buffer + + + --%endcontext + --%test(Receives a line from buffer table and deletes) procedure test_receive; + --%test(Waits specified time for producer to lock the buffer ) + --%throws(-20218) + procedure test_wait_for_producer; + --%test(Does not send line if null text given) procedure test_doesnt_send_on_null_text; @@ -19,11 +42,9 @@ create or replace package test_output_buffer is procedure test_waiting_for_data; --%test(Purges text buffer data older than one day and leaves the rest) - --%throws(-20218) procedure test_purge_text_buffer; --%test(Purges clob buffer data older than one day and leaves the rest) - --%throws(-20218) procedure test_purge_clob_buffer; end test_output_buffer; diff --git a/test/ut3_tester_helper/coverage_helper.pkb b/test/ut3_tester_helper/coverage_helper.pkb index 99f26e9f8..2a508ca6a 100644 --- a/test/ut3_tester_helper/coverage_helper.pkb +++ b/test/ut3_tester_helper/coverage_helper.pkb @@ -290,11 +290,11 @@ create or replace package body coverage_helper is ut3_develop.ut_runner.coverage_stop(); end; - function get_job_status(a_job_name varchar2, a_job_started_after timestamp with time zone) return varchar2 is - l_status varchar2(1000); + function get_job_status(a_job_name varchar2, a_job_started_after timestamp with time zone) return user_scheduler_job_run_details%rowtype is + l_result user_scheduler_job_run_details%rowtype; begin begin - select status into l_status + select * into l_result from user_scheduler_job_run_details where job_name = upper(a_job_name) and req_start_date >= a_job_started_after; @@ -302,20 +302,11 @@ create or replace package body coverage_helper is when no_data_found then null; end; - return l_status; - end; - - procedure sleep(a_time number) is - begin - $if dbms_db_version.version >= 18 $then - dbms_session.sleep(a_time); - $else - dbms_lock.sleep(a_time ); - $end + return l_result; end; procedure run_job_and_wait_for_finish(a_job_action varchar2) is - l_status varchar2(1000); + l_job_run_info user_scheduler_job_run_details%rowtype; l_job_name varchar2(30); l_timestamp timestamp with time zone := current_timestamp; i integer := 0; @@ -323,7 +314,7 @@ create or replace package body coverage_helper is begin g_job_no := g_job_no + 1; l_job_name := 'utPLSQL_selftest_job_'||g_job_no; - sleep(0.15); + dbms_lock.sleep(0.15); dbms_scheduler.create_job( job_name => l_job_name, job_type => 'PLSQL_BLOCK', @@ -333,14 +324,14 @@ create or replace package body coverage_helper is auto_drop => TRUE, comments => 'one-time-job' ); - while (l_status is null or l_status not in ('SUCCEEDED','FAILED')) and i < 150 loop - l_status := get_job_status( l_job_name, l_timestamp ); - sleep(0.1); + while (l_job_run_info.status is null or l_job_run_info.status not in ('SUCCEEDED','FAILED')) and i < 6000 loop + l_job_run_info := get_job_status( l_job_name, l_timestamp ); + dbms_lock.sleep(0.1); i := i + 1; end loop; commit; - if l_status = 'FAILED' then - raise_application_error(-20000, 'Running a scheduler job failed'); + if nvl(l_job_run_info.status,'null') <> 'SUCCEEDED' then + raise_application_error(-20000, 'Scheduler job '''||l_job_name||''', status='''||l_job_run_info.status||'''. Additional info: '||l_job_run_info.additional_info); end if; end; @@ -378,7 +369,7 @@ create or replace package body coverage_helper is pragma autonomous_transaction; begin run_job_and_wait_for_finish( a_plsql_block ); - + dbms_lock.sleep(0.1); execute immediate q'[ declare l_results ut3_develop.ut_varchar2_list;