diff --git a/development/refresh_sources.sh b/development/refresh_sources.sh
index b6ef51ef9..83518d7cc 100755
--- a/development/refresh_sources.sh
+++ b/development/refresh_sources.sh
@@ -12,7 +12,7 @@ git clone --depth=1 --branch=${SELFTESTING_BRANCH:-main} https://github.com/utPL
rm -rf utPLSQL-cli/*
# download latest release version of utPLSQL-cli
-curl -Lk -o utPLSQL-cli.zip https://github.com/utPLSQL/utPLSQL-cli/releases/download/v${UTPLSQL_CLI_VERSION}/utPLSQL-cli.zip
+curl -Lk -o utPLSQL-cli.zip https://github.com/utPLSQL/utPLSQL-cli/releases/download/${UTPLSQL_CLI_VERSION}/utPLSQL-cli.zip
# unzip utPLSQL-cli and remove the zip file
unzip utPLSQL-cli.zip && chmod u+x utPLSQL-cli/bin/utplsql && rm utPLSQL-cli.zip
diff --git a/docs/userguide/install.md b/docs/userguide/install.md
index ad3bfe97e..278bad788 100644
--- a/docs/userguide/install.md
+++ b/docs/userguide/install.md
@@ -121,7 +121,7 @@ The headless scripts accept three optional parameters that define:
The scripts need to be executed by `SYSDBA`, in order to grant access to `DBMS_LOCK` and `DBMS_CRYPTO` system packages.
!!! warning "Important"
- - Grant on `DBMS_LOCK` is required only for installation on Oracle versions below 18c. For versions 18c and above, utPLSQL uses `DBMS_SESSION.SLEEP` so access to `DBMS_LOCK` package is no longer needed.
+ - `DBMS_LOCK` is required for session synchronization between main session and session consuming realtime reports.
- The user performing the installation must have the `ADMINISTER DATABASE TRIGGER` privilege. This is required for installation of trigger that is responsible for parsing annotations at at compile-time of a package.
- When installed with DDL trigger, utPLSQL will not be registering unit tests for any of oracle-maintained schemas.
- For Oracle 11g following users are excluded:
diff --git a/examples/developer_examples/RunExampleTestSuiteWithCompositeReporter.sql b/examples/developer_examples/RunExampleTestSuiteWithCompositeReporter.sql
index 468d176ed..e713fa02d 100644
--- a/examples/developer_examples/RunExampleTestSuiteWithCompositeReporter.sql
+++ b/examples/developer_examples/RunExampleTestSuiteWithCompositeReporter.sql
@@ -20,6 +20,7 @@ begin
ut_event_manager.initialize();
ut_event_manager.add_listener(l_doc_reporter);
ut_event_manager.add_listener(l_tc_reporter);
+ ut_event_manager.trigger_event(ut_event_manager.gc_initialize, l_run);
l_suite := ut_suite(user, 'ut_exampletest',a_line_no=>1);
l_suite.description := 'Test Suite Name';
diff --git a/source/core/coverage/ut_coverage_reporter_base.tpb b/source/core/coverage/ut_coverage_reporter_base.tpb
index aff4e6560..da2bd27ad 100644
--- a/source/core/coverage/ut_coverage_reporter_base.tpb
+++ b/source/core/coverage/ut_coverage_reporter_base.tpb
@@ -92,7 +92,6 @@ create or replace type body ut_coverage_reporter_base is
ut_coverage_helper.cleanup_tmp_table();
(l_reporter as ut_output_reporter_base).before_calling_run(null);
l_reporter.after_calling_run( ut_run( a_coverage_options => a_coverage_options, a_client_character_set => a_client_character_set ) );
- l_reporter.on_finalize(null);
for i in (select /*+ no_parallel */ x.text from table(l_reporter.get_lines(1, 1)) x ) loop
pipe row (i.text);
end loop;
@@ -106,7 +105,6 @@ create or replace type body ut_coverage_reporter_base is
ut_coverage_helper.cleanup_tmp_table();
(l_reporter as ut_output_reporter_base).before_calling_run(null);
l_reporter.after_calling_run( ut_run( a_coverage_options => a_coverage_options, a_client_character_set => a_client_character_set ) );
- l_reporter.on_finalize(null);
open l_result for select /*+ no_parallel */ x.text from table(l_reporter.get_lines(1, 1)) x;
return l_result;
end;
diff --git a/source/core/output_buffers/ut_output_buffer_base.tpb b/source/core/output_buffers/ut_output_buffer_base.tpb
index 6d7964150..be4c92bd8 100644
--- a/source/core/output_buffers/ut_output_buffer_base.tpb
+++ b/source/core/output_buffers/ut_output_buffer_base.tpb
@@ -24,7 +24,7 @@ create or replace type body ut_output_buffer_base is
self.self_type := coalesce(a_self_type,self.self_type);
self.output_id := coalesce(a_output_id, self.output_id, sys_guid());
self.start_date := coalesce(self.start_date, sysdate);
- self.last_message_id := 0;
+ self.last_write_message_id := 0;
select /*+ no_parallel */ count(*) into l_exists from ut_output_buffer_info_tmp where output_id = self.output_id;
if ( l_exists > 0 ) then
update /*+ no_parallel */ ut_output_buffer_info_tmp set start_date = self.start_date where output_id = self.output_id;
@@ -32,10 +32,86 @@ create or replace type body ut_output_buffer_base is
insert /*+ no_parallel */ into ut_output_buffer_info_tmp(output_id, start_date) values (self.output_id, self.start_date);
end if;
commit;
+ dbms_lock.allocate_unique( self.output_id, self.lock_handle);
self.is_closed := 0;
end;
- member function get_lines_cursor(a_initial_timeout natural := null, a_timeout_sec natural := null) return sys_refcursor is
+ member procedure lock_buffer(a_timeout_sec number := null) is
+ l_status integer;
+ begin
+ l_status := dbms_lock.request( self.lock_handle, dbms_lock.x_mode, 5, false );
+ if l_status != 0 then
+ raise_application_error(-20000, 'Cannot allocate lock for output buffer of reporter. lock request status = '||l_status||', lock handle = '||self.lock_handle||', self.output_id ='||self.output_id);
+ end if;
+ end;
+
+ member procedure close(self in out nocopy ut_output_buffer_base) is
+ l_status integer;
+ begin
+ l_status := dbms_lock.release( self.lock_handle );
+ if l_status != 0 then
+ raise_application_error(-20000, 'Cannot release lock for output buffer of reporter. Lock_handle = '||self.lock_handle||' status = '||l_status);
+ end if;
+ self.is_closed := 1;
+ end;
+
+
+ member procedure remove_buffer_info(self in ut_output_buffer_base) is
+ pragma autonomous_transaction;
+ begin
+ delete from ut_output_buffer_info_tmp a
+ where a.output_id = self.output_id;
+ commit;
+ end;
+
+ member function timeout_producer_not_started( a_producer_started boolean, a_already_waited_sec number, a_init_wait_sec number ) return boolean
+ is
+ l_result boolean := false;
+ begin
+ if not a_producer_started and a_already_waited_sec >= a_init_wait_sec then
+ if a_init_wait_sec > 0 then
+ self.remove_buffer_info();
+ raise_application_error(
+ ut_utils.gc_out_buffer_timeout,
+ 'Timeout occurred while waiting for report data producer to start. Waited for: '||ut_utils.to_string( a_already_waited_sec )||' seconds.'
+ );
+ else
+ l_result := true;
+ end if;
+ end if;
+ return l_result;
+ end;
+
+ member function timeout_producer_not_finished(a_producer_finished boolean, a_already_waited_sec number, a_timeout_sec number) return boolean
+ is
+ l_result boolean := false;
+ begin
+ if not a_producer_finished and a_timeout_sec is not null and a_already_waited_sec >= a_timeout_sec then
+ if a_timeout_sec > 0 then
+ self.remove_buffer_info();
+ raise_application_error(
+ ut_utils.gc_out_buffer_timeout,
+ 'Timeout occurred while waiting for more data from producer. Waited for: '||ut_utils.to_string( a_already_waited_sec )||' seconds.'
+ );
+ else
+ l_result := true;
+ end if;
+ end if;
+ return l_result;
+ end;
+
+ member function get_lock_status return integer is
+ l_result integer;
+ l_release_status integer;
+ begin
+ l_result := dbms_lock.request( self.lock_handle, dbms_lock.s_mode, 0, false );
+ if l_result = 0 then
+ l_release_status := dbms_lock.release( self.lock_handle );
+ end if;
+ return l_result;
+ end;
+
+ member function get_lines_cursor(a_initial_timeout number := null, a_timeout_sec number := null) return sys_refcursor is
l_lines sys_refcursor;
begin
open l_lines for
@@ -44,7 +120,7 @@ create or replace type body ut_output_buffer_base is
return l_lines;
end;
- member procedure lines_to_dbms_output(self in ut_output_buffer_base, a_initial_timeout natural := null, a_timeout_sec natural := null) is
+ member procedure lines_to_dbms_output(self in ut_output_buffer_base, a_initial_timeout number := null, a_timeout_sec number := null) is
l_data sys_refcursor;
l_clob clob;
l_item_type varchar2(32767);
@@ -54,16 +130,20 @@ create or replace type body ut_output_buffer_base is
loop
fetch l_data into l_clob, l_item_type;
exit when l_data%notfound;
- l_lines := ut_utils.clob_to_table(l_clob);
- for i in 1 .. l_lines.count loop
- dbms_output.put_line(l_lines(i));
- end loop;
+ if dbms_lob.getlength(l_clob) > 32767 then
+ l_lines := ut_utils.clob_to_table(l_clob);
+ for i in 1 .. l_lines.count loop
+ dbms_output.put_line(l_lines(i));
+ end loop;
+ else
+ dbms_output.put_line(l_clob);
+ end if;
end loop;
close l_data;
end;
member procedure cleanup_buffer(self in ut_output_buffer_base, a_retention_time_sec natural := null) is
- gc_buffer_retention_sec constant naturaln := coalesce(a_retention_time_sec, 60 * 60 * 24); -- 24 hours
+ gc_buffer_retention_sec constant naturaln := coalesce(a_retention_time_sec, 60 * 60 * 24 * 5); -- 5 days
l_retention_days number := gc_buffer_retention_sec / (60 * 60 * 24);
l_max_retention_date date := sysdate - l_retention_days;
pragma autonomous_transaction;
diff --git a/source/core/output_buffers/ut_output_buffer_base.tps b/source/core/output_buffers/ut_output_buffer_base.tps
index 98a6847cd..53be365ae 100644
--- a/source/core/output_buffers/ut_output_buffer_base.tps
+++ b/source/core/output_buffers/ut_output_buffer_base.tps
@@ -19,16 +19,22 @@ create or replace type ut_output_buffer_base force authid definer as object(
output_id raw(32),
is_closed number(1,0),
start_date date,
- last_message_id number(38,0),
+ last_write_message_id number(38,0),
+ lock_handle varchar2(30 byte),
self_type varchar2(250 byte),
member procedure init(self in out nocopy ut_output_buffer_base, a_output_id raw := null, a_self_type varchar2 := null),
- member function get_lines_cursor(a_initial_timeout natural := null, a_timeout_sec natural := null) return sys_refcursor,
- member procedure lines_to_dbms_output(self in ut_output_buffer_base, a_initial_timeout natural := null, a_timeout_sec natural := null),
+ member procedure lock_buffer(a_timeout_sec number := null),
+ member function timeout_producer_not_started( a_producer_started boolean, a_already_waited_sec number, a_init_wait_sec number ) return boolean,
+ member function timeout_producer_not_finished(a_producer_finished boolean, a_already_waited_sec number, a_timeout_sec number) return boolean,
+ member function get_lock_status return integer,
+ member function get_lines_cursor(a_initial_timeout number := null, a_timeout_sec number := null) return sys_refcursor,
+ member procedure lines_to_dbms_output(self in ut_output_buffer_base, a_initial_timeout number := null, a_timeout_sec number := null),
member procedure cleanup_buffer(self in ut_output_buffer_base, a_retention_time_sec natural := null),
- not instantiable member procedure close(self in out nocopy ut_output_buffer_base),
+ member procedure remove_buffer_info(self in ut_output_buffer_base),
+ member procedure close(self in out nocopy ut_output_buffer_base),
not instantiable member procedure send_line(self in out nocopy ut_output_buffer_base, a_text varchar2, a_item_type varchar2 := null),
not instantiable member procedure send_lines(self in out nocopy ut_output_buffer_base, a_text_list ut_varchar2_rows, a_item_type varchar2 := null),
not instantiable member procedure send_clob(self in out nocopy ut_output_buffer_base, a_text clob, a_item_type varchar2 := null),
- not instantiable member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined
+ not instantiable member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined
) not final not instantiable
/
diff --git a/source/core/output_buffers/ut_output_clob_table_buffer.tpb b/source/core/output_buffers/ut_output_clob_table_buffer.tpb
index 66ff71c62..c4cfdb059 100644
--- a/source/core/output_buffers/ut_output_clob_table_buffer.tpb
+++ b/source/core/output_buffers/ut_output_clob_table_buffer.tpb
@@ -22,23 +22,13 @@ create or replace type body ut_output_clob_table_buffer is
return;
end;
- overriding member procedure close(self in out nocopy ut_output_clob_table_buffer) is
- pragma autonomous_transaction;
- begin
- self.last_message_id := self.last_message_id + 1;
- insert /*+ no_parallel */ into ut_output_clob_buffer_tmp(output_id, message_id, is_finished)
- values (self.output_id, self.last_message_id, 1);
- commit;
- self.is_closed := 1;
- end;
-
overriding member procedure send_line(self in out nocopy ut_output_clob_table_buffer, a_text varchar2, a_item_type varchar2 := null) is
pragma autonomous_transaction;
begin
if a_text is not null or a_item_type is not null then
- self.last_message_id := self.last_message_id + 1;
+ self.last_write_message_id := self.last_write_message_id + 1;
insert /*+ no_parallel */ into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type)
- values (self.output_id, self.last_message_id, a_text, a_item_type);
+ values (self.output_id, self.last_write_message_id, a_text, a_item_type);
end if;
commit;
end;
@@ -47,10 +37,10 @@ create or replace type body ut_output_clob_table_buffer is
pragma autonomous_transaction;
begin
insert /*+ no_parallel */ into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type)
- select /*+ no_parallel */ self.output_id, self.last_message_id + rownum, t.column_value, a_item_type
+ select /*+ no_parallel */ self.output_id, self.last_write_message_id + rownum, t.column_value, a_item_type
from table(a_text_list) t
where t.column_value is not null or a_item_type is not null;
- self.last_message_id := self.last_message_id + SQL%rowcount;
+ self.last_write_message_id := self.last_write_message_id + SQL%rowcount;
commit;
end;
@@ -58,98 +48,92 @@ create or replace type body ut_output_clob_table_buffer is
pragma autonomous_transaction;
begin
if a_text is not null and a_text != empty_clob() or a_item_type is not null then
- self.last_message_id := self.last_message_id + 1;
+ self.last_write_message_id := self.last_write_message_id + 1;
insert /*+ no_parallel */ into ut_output_clob_buffer_tmp(output_id, message_id, text, item_type)
- values (self.output_id, self.last_message_id, a_text, a_item_type);
+ values (self.output_id, self.last_write_message_id, a_text, a_item_type);
end if;
commit;
end;
- overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined is
- type t_rowid_tab is table of urowid;
- l_message_rowids t_rowid_tab;
- l_buffer_data ut_output_data_rows;
- l_finished_flags ut_integer_list;
- l_already_waited_for number(10,2) := 0;
- l_finished boolean := false;
- lc_init_wait_sec constant naturaln := coalesce(a_initial_timeout, 60 ); -- 1 minute
- lc_max_wait_sec constant naturaln := coalesce(a_timeout_sec, 60 * 60 * 4); -- 4 hours
- l_wait_for integer := lc_init_wait_sec;
- lc_short_sleep_time constant number(1,1) := 0.1; --sleep for 100 ms between checks
- lc_long_sleep_time constant number(1) := 1; --sleep for 1 s when waiting long
- lc_long_wait_time constant number(1) := 1; --waiting more than 1 sec
- l_sleep_time number(2,1) := lc_short_sleep_time;
- lc_bulk_limit constant integer := 5000;
- l_max_message_id integer := lc_bulk_limit;
-
- procedure remove_read_data(a_message_rowids t_rowid_tab) is
- pragma autonomous_transaction;
- begin
- forall i in 1 .. a_message_rowids.count
- delete from ut_output_clob_buffer_tmp a
- where rowid = a_message_rowids(i);
- commit;
- end;
-
- procedure remove_buffer_info is
- pragma autonomous_transaction;
- begin
- delete from ut_output_buffer_info_tmp a
- where a.output_id = self.output_id;
- commit;
- end;
-
+ overriding member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined is
+ lc_init_wait_sec constant number := coalesce(a_initial_timeout, 10 );
+ l_buffer_rowids ut_varchar2_rows;
+ l_buffer_data ut_output_data_rows;
+ l_finished_flags ut_integer_list;
+ l_last_read_message_id integer;
+ l_already_waited_sec number(10,2) := 0;
+ l_finished boolean := false;
+ l_sleep_time number(2,1);
+ l_lock_status integer;
+ l_producer_started boolean := false;
+ l_producer_finished boolean := false;
+ procedure get_data_from_buffer_table(
+ a_last_read_message_id in out nocopy integer,
+ a_buffer_data out nocopy ut_output_data_rows,
+ a_buffer_rowids out nocopy ut_varchar2_rows,
+ a_finished_flags out nocopy ut_integer_list
+ ) is
+ lc_bulk_limit constant integer := 5000;
begin
- while not l_finished loop
+ a_last_read_message_id := coalesce(a_last_read_message_id, 0);
with ordered_buffer as (
- select /*+ no_parallel index(a) */ a.rowid, ut_output_data_row(a.text, a.item_type), is_finished
+ select /*+ no_parallel index(a) */ ut_output_data_row(a.text, a.item_type), rowidtochar(a.rowid), is_finished
from ut_output_clob_buffer_tmp a
where a.output_id = self.output_id
- and a.message_id <= l_max_message_id
+ and a.message_id <= a_last_read_message_id + lc_bulk_limit
order by a.message_id
)
select /*+ no_parallel */ b.*
- bulk collect into l_message_rowids, l_buffer_data, l_finished_flags
+ bulk collect into a_buffer_data, a_buffer_rowids, a_finished_flags
from ordered_buffer b;
+ a_last_read_message_id := a_last_read_message_id + a_finished_flags.count;
+ end;
- --nothing fetched from output, wait and try again
- if l_buffer_data.count = 0 then
- $if dbms_db_version.version >= 18 $then
- dbms_session.sleep(l_sleep_time);
- $else
- dbms_lock.sleep(l_sleep_time);
- $end
- l_already_waited_for := l_already_waited_for + l_sleep_time;
- if l_already_waited_for > lc_long_wait_time then
- l_sleep_time := lc_long_sleep_time;
- end if;
- else
- --reset wait time
- -- we wait lc_max_wait_sec for new message
- l_wait_for := lc_max_wait_sec;
- l_already_waited_for := 0;
- l_sleep_time := lc_short_sleep_time;
+ procedure remove_read_data(a_buffer_rowids ut_varchar2_rows) is
+ pragma autonomous_transaction;
+ begin
+ forall i in 1 .. a_buffer_rowids.count
+ delete from ut_output_clob_buffer_tmp a
+ where rowid = chartorowid(a_buffer_rowids(i));
+ commit;
+ end;
+
+ begin
+ while not l_finished loop
+
+ l_sleep_time := case when l_already_waited_sec >= 1 then 0.5 else 0.1 end;
+ l_lock_status := self.get_lock_status();
+ get_data_from_buffer_table( l_last_read_message_id, l_buffer_data, l_buffer_rowids, l_finished_flags );
+
+ if l_buffer_data.count > 0 then
+ l_already_waited_sec := 0;
for i in 1 .. l_buffer_data.count loop
if l_buffer_data(i).text is not null then
- pipe row(l_buffer_data(i));
+ pipe row( l_buffer_data(i) );
elsif l_finished_flags(i) = 1 then
l_finished := true;
exit;
end if;
end loop;
- remove_read_data(l_message_rowids);
- l_max_message_id := l_max_message_id + lc_bulk_limit;
- end if;
- if l_finished or l_already_waited_for >= l_wait_for then
- remove_buffer_info();
- if l_already_waited_for > 0 and l_already_waited_for >= l_wait_for then
- raise_application_error(
- ut_utils.gc_out_buffer_timeout,
- 'Timeout occurred while waiting for output data. Waited for: '||l_already_waited_for||' seconds.'
- );
- end if;
+ remove_read_data(l_buffer_rowids);
+ else
+ --nothing fetched from output, wait.
+ dbms_lock.sleep(l_sleep_time);
+ l_already_waited_sec := l_already_waited_sec + l_sleep_time;
end if;
+
+ l_producer_started := (l_lock_status <> 0 or l_buffer_data.count > 0) or l_producer_started;
+ l_producer_finished := (l_producer_started and l_lock_status = 0 and l_buffer_data.count = 0) or l_producer_finished;
+
+ l_finished :=
+ self.timeout_producer_not_finished(l_producer_finished, l_already_waited_sec, a_timeout_sec)
+ or self.timeout_producer_not_started(l_producer_started, l_already_waited_sec, lc_init_wait_sec)
+ or l_producer_finished
+ or l_finished;
+
end loop;
+
+ self.remove_buffer_info();
return;
end;
diff --git a/source/core/output_buffers/ut_output_clob_table_buffer.tps b/source/core/output_buffers/ut_output_clob_table_buffer.tps
index 7b98efaba..191e64c01 100644
--- a/source/core/output_buffers/ut_output_clob_table_buffer.tps
+++ b/source/core/output_buffers/ut_output_clob_table_buffer.tps
@@ -20,7 +20,6 @@ create or replace type ut_output_clob_table_buffer under ut_output_buffer_base (
overriding member procedure send_line(self in out nocopy ut_output_clob_table_buffer, a_text varchar2, a_item_type varchar2 := null),
overriding member procedure send_lines(self in out nocopy ut_output_clob_table_buffer, a_text_list ut_varchar2_rows, a_item_type varchar2 := null),
overriding member procedure send_clob(self in out nocopy ut_output_clob_table_buffer, a_text clob, a_item_type varchar2 := null),
- overriding member procedure close(self in out nocopy ut_output_clob_table_buffer),
- overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined
+ overriding member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined
) not final
/
diff --git a/source/core/output_buffers/ut_output_table_buffer.tpb b/source/core/output_buffers/ut_output_table_buffer.tpb
index 1809a49d5..f38363e49 100644
--- a/source/core/output_buffers/ut_output_table_buffer.tpb
+++ b/source/core/output_buffers/ut_output_table_buffer.tpb
@@ -22,16 +22,6 @@ create or replace type body ut_output_table_buffer is
return;
end;
- overriding member procedure close(self in out nocopy ut_output_table_buffer) is
- pragma autonomous_transaction;
- begin
- self.last_message_id := self.last_message_id + 1;
- insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, is_finished)
- values (self.output_id, self.last_message_id, 1);
- commit;
- self.is_closed := 1;
- end;
-
overriding member procedure send_line(self in out nocopy ut_output_table_buffer, a_text varchar2, a_item_type varchar2 := null) is
pragma autonomous_transaction;
begin
@@ -44,9 +34,9 @@ create or replace type body ut_output_table_buffer is
a_item_type
);
else
- self.last_message_id := self.last_message_id + 1;
+ self.last_write_message_id := self.last_write_message_id + 1;
insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type)
- values (self.output_id, self.last_message_id, a_text, a_item_type);
+ values (self.output_id, self.last_write_message_id, a_text, a_item_type);
end if;
commit;
end if;
@@ -56,10 +46,10 @@ create or replace type body ut_output_table_buffer is
pragma autonomous_transaction;
begin
insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type)
- select /*+ no_parallel */ self.output_id, self.last_message_id + rownum, t.column_value, a_item_type
+ select /*+ no_parallel */ self.output_id, self.last_write_message_id + rownum, t.column_value, a_item_type
from table(a_text_list) t
where t.column_value is not null or a_item_type is not null;
- self.last_message_id := self.last_message_id + SQL%rowcount;
+ self.last_write_message_id := self.last_write_message_id + SQL%rowcount;
commit;
end;
@@ -75,99 +65,103 @@ create or replace type body ut_output_table_buffer is
a_item_type
);
else
- self.last_message_id := self.last_message_id + 1;
+ self.last_write_message_id := self.last_write_message_id + 1;
insert /*+ no_parallel */ into ut_output_buffer_tmp(output_id, message_id, text, item_type)
- values (self.output_id, self.last_message_id, a_text, a_item_type);
+ values (self.output_id, self.last_write_message_id, a_text, a_item_type);
end if;
commit;
end if;
end;
- overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined is
- l_buffer_data ut_varchar2_rows;
- l_item_types ut_varchar2_rows;
- l_finished_flags ut_integer_list;
- l_already_waited_for number(10,2) := 0;
- l_finished boolean := false;
- lc_init_wait_sec constant naturaln := coalesce(a_initial_timeout, 60 ); -- 1 minute
- lc_max_wait_sec constant naturaln := coalesce(a_timeout_sec, 60 * 60 * 4); -- 4 hours
- l_wait_for integer := lc_init_wait_sec;
- lc_short_sleep_time constant number(1,1) := 0.1; --sleep for 100 ms between checks
- lc_long_sleep_time constant number(1) := 1; --sleep for 1 s when waiting long
- lc_long_wait_time constant number(1) := 1; --waiting more than 1 sec
- l_sleep_time number(2,1) := lc_short_sleep_time;
- lc_bulk_limit constant integer := 5000;
- l_max_message_id integer := lc_bulk_limit;
-
- procedure get_data_from_buffer(
- a_max_message_id integer,
- a_buffer_data out nocopy ut_varchar2_rows,
- a_item_types out nocopy ut_varchar2_rows,
- a_finished_flags out nocopy ut_integer_list
- ) is
- pragma autonomous_transaction;
- begin
- delete /*+ no_parallel */ from (
- select /*+ no_parallel */ *
- from ut_output_buffer_tmp o
- where o.output_id = self.output_id
- and o.message_id <= a_max_message_id
- order by o.message_id
- ) d
- returning d.text, d.item_type, d.is_finished
- bulk collect into a_buffer_data, a_item_types, a_finished_flags;
- commit;
-
- end;
-
- procedure remove_buffer_info is
- pragma autonomous_transaction;
- begin
- delete from ut_output_buffer_info_tmp a
- where a.output_id = self.output_id;
- commit;
- end;
+ overriding member procedure lines_to_dbms_output(self in ut_output_table_buffer, a_initial_timeout number := null, a_timeout_sec number := null) is
+ l_data sys_refcursor;
+ l_text varchar2(32767);
+ l_item_type varchar2(32767);
+ begin
+ l_data := self.get_lines_cursor(a_initial_timeout, a_timeout_sec);
+ loop
+ fetch l_data into l_text, l_item_type;
+ exit when l_data%notfound;
+ dbms_output.put_line(l_text);
+ end loop;
+ close l_data;
+ end;
- begin
+ /* Important note.
+ This function code is almost duplicated between two types for performance reasons.
+ The pipe row clause is much faster on VARCHAR2 then it is on clob.
+ That is the key reason for two implementations.
+ */
+ overriding member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined is
+ lc_init_wait_sec constant number := coalesce(a_initial_timeout, 10 );
+ l_buffer_texts ut_varchar2_rows;
+ l_buffer_item_types ut_varchar2_rows;
+ l_finished_flags ut_integer_list;
+ l_last_read_message_id integer;
+ l_already_waited_sec number(10,2) := 0;
+ l_finished boolean := false;
+ l_sleep_time number(2,1);
+ l_lock_status integer;
+ l_producer_started boolean := false;
+ l_producer_finished boolean := false;
+
+ procedure get_data_from_buffer_table(
+ a_last_read_message_id in out nocopy integer,
+ a_buffer_texts out nocopy ut_varchar2_rows,
+ a_buffer_item_types out nocopy ut_varchar2_rows,
+ a_finished_flags out nocopy ut_integer_list
+ ) is
+ lc_bulk_limit constant integer := 20000;
+ pragma autonomous_transaction;
+ begin
+ a_last_read_message_id := coalesce(a_last_read_message_id,0);
+ delete /*+ no_parallel */ from (
+ select /*+ no_parallel */ *
+ from ut_output_buffer_tmp o
+ where o.output_id = self.output_id
+ and o.message_id <= a_last_read_message_id + lc_bulk_limit
+ order by o.message_id
+ ) d
+ returning d.text, d.item_type, d.is_finished
+ bulk collect into a_buffer_texts, a_buffer_item_types, a_finished_flags;
+ a_last_read_message_id := a_last_read_message_id + a_finished_flags.count;
+ commit;
+ end;
+ begin
while not l_finished loop
- get_data_from_buffer( l_max_message_id, l_buffer_data, l_item_types, l_finished_flags);
- --nothing fetched from output, wait and try again
- if l_buffer_data.count = 0 then
- $if dbms_db_version.version >= 18 $then
- dbms_session.sleep(l_sleep_time);
- $else
- dbms_lock.sleep(l_sleep_time);
- $end
- l_already_waited_for := l_already_waited_for + l_sleep_time;
- if l_already_waited_for > lc_long_wait_time then
- l_sleep_time := lc_long_sleep_time;
- end if;
- else
- --reset wait time
- -- we wait lc_max_wait_sec for new message
- l_wait_for := lc_max_wait_sec;
- l_already_waited_for := 0;
- l_sleep_time := lc_short_sleep_time;
- for i in 1 .. l_buffer_data.count loop
- if l_buffer_data(i) is not null then
- pipe row(ut_output_data_row(l_buffer_data(i),l_item_types(i)));
+
+ l_sleep_time := case when l_already_waited_sec >= 1 then 0.5 else 0.1 end;
+ l_lock_status := self.get_lock_status();
+ get_data_from_buffer_table( l_last_read_message_id, l_buffer_texts, l_buffer_item_types, l_finished_flags );
+
+ if l_buffer_texts.count > 0 then
+ l_already_waited_sec := 0;
+ for i in 1 .. l_buffer_texts.count loop
+ if l_buffer_texts(i) is not null then
+ pipe row( ut_output_data_row(l_buffer_texts(i), l_buffer_item_types(i)) );
elsif l_finished_flags(i) = 1 then
l_finished := true;
exit;
end if;
end loop;
- l_max_message_id := l_max_message_id + lc_bulk_limit;
- end if;
- if l_finished or l_already_waited_for >= l_wait_for then
- remove_buffer_info();
- if l_already_waited_for > 0 and l_already_waited_for >= l_wait_for then
- raise_application_error(
- ut_utils.gc_out_buffer_timeout,
- 'Timeout occurred while waiting for output data. Waited for: '||l_already_waited_for||' seconds.'
- );
- end if;
+ else
+ --nothing fetched from output, wait.
+ dbms_lock.sleep(l_sleep_time);
+ l_already_waited_sec := l_already_waited_sec + l_sleep_time;
end if;
+
+ l_producer_started := (l_lock_status <> 0 or l_buffer_texts.count > 0) or l_producer_started;
+ l_producer_finished := (l_producer_started and l_lock_status = 0 and l_buffer_texts.count = 0) or l_producer_finished;
+
+ l_finished :=
+ self.timeout_producer_not_finished(l_producer_finished, l_already_waited_sec, a_timeout_sec)
+ or self.timeout_producer_not_started(l_producer_started, l_already_waited_sec, lc_init_wait_sec)
+ or l_producer_finished
+ or l_finished;
+
end loop;
+
+ self.remove_buffer_info();
return;
end;
diff --git a/source/core/output_buffers/ut_output_table_buffer.tps b/source/core/output_buffers/ut_output_table_buffer.tps
index 726b692f8..154ce4de6 100644
--- a/source/core/output_buffers/ut_output_table_buffer.tps
+++ b/source/core/output_buffers/ut_output_table_buffer.tps
@@ -20,7 +20,7 @@ create or replace type ut_output_table_buffer under ut_output_buffer_base (
overriding member procedure send_line(self in out nocopy ut_output_table_buffer, a_text varchar2, a_item_type varchar2 := null),
overriding member procedure send_lines(self in out nocopy ut_output_table_buffer, a_text_list ut_varchar2_rows, a_item_type varchar2 := null),
overriding member procedure send_clob(self in out nocopy ut_output_table_buffer, a_text clob, a_item_type varchar2 := null),
- overriding member procedure close(self in out nocopy ut_output_table_buffer),
- overriding member function get_lines(a_initial_timeout natural := null, a_timeout_sec natural := null) return ut_output_data_rows pipelined
+ overriding member procedure lines_to_dbms_output(self in ut_output_table_buffer, a_initial_timeout number := null, a_timeout_sec number := null),
+ overriding member function get_lines(a_initial_timeout number := null, a_timeout_sec number := null) return ut_output_data_rows pipelined
) not final
/
diff --git a/source/core/types/ut_output_reporter_base.tpb b/source/core/types/ut_output_reporter_base.tpb
index f6bb27b94..48970be5a 100644
--- a/source/core/types/ut_output_reporter_base.tpb
+++ b/source/core/types/ut_output_reporter_base.tpb
@@ -41,13 +41,6 @@ create or replace type body ut_output_reporter_base is
return l_result;
end;
- overriding member procedure before_calling_run(self in out nocopy ut_output_reporter_base, a_run in ut_run) is
- l_output_table_buffer ut_output_table_buffer;
- begin
- (self as ut_reporter_base).before_calling_run(a_run);
- l_output_table_buffer := treat(self.output_buffer as ut_output_table_buffer);
- end;
-
member procedure print_text(self in out nocopy ut_output_reporter_base, a_text varchar2, a_item_type varchar2 := null) is
begin
self.output_buffer.send_line(a_text, a_item_type);
@@ -87,6 +80,7 @@ create or replace type body ut_output_reporter_base is
overriding member procedure on_initialize(self in out nocopy ut_output_reporter_base, a_run in ut_run) is
begin
+ self.output_buffer.lock_buffer();
self.output_buffer.send_line(null, 'initialize');
end;
diff --git a/source/core/types/ut_output_reporter_base.tps b/source/core/types/ut_output_reporter_base.tps
index 22f507f8d..21eed9957 100644
--- a/source/core/types/ut_output_reporter_base.tps
+++ b/source/core/types/ut_output_reporter_base.tps
@@ -20,8 +20,7 @@ create or replace type ut_output_reporter_base under ut_reporter_base(
member procedure init(self in out nocopy ut_output_reporter_base, a_self_type varchar2, a_output_buffer ut_output_buffer_base := null),
overriding member procedure set_reporter_id(self in out nocopy ut_output_reporter_base, a_reporter_id raw),
member function set_reporter_id(self in ut_output_reporter_base, a_reporter_id raw) return ut_output_reporter_base,
- overriding member procedure before_calling_run(self in out nocopy ut_output_reporter_base, a_run in ut_run),
-
+
member procedure print_text(self in out nocopy ut_output_reporter_base, a_text varchar2, a_item_type varchar2 := null),
member procedure print_text_lines(self in out nocopy ut_output_reporter_base, a_text_lines ut_varchar2_rows, a_item_type varchar2 := null),
member procedure print_clob(self in out nocopy ut_output_reporter_base, a_clob clob, a_item_type varchar2 := null),
diff --git a/source/create_utplsql_owner.sql b/source/create_utplsql_owner.sql
index 64bcb52ce..d7e4f3040 100644
--- a/source/create_utplsql_owner.sql
+++ b/source/create_utplsql_owner.sql
@@ -31,15 +31,7 @@ create user &ut3_owner_schema identified by "&ut3_password" default tablespace &
grant create session, create sequence, create procedure, create type, create table, create view, create synonym to &ut3_owner_schema;
-begin
- $if dbms_db_version.version < 18 $then
- execute immediate 'grant execute on dbms_lock to &ut3_owner_schema';
- $else
- null;
- $end
-end;
-/
-
+grant execute on dbms_lock to &ut3_owner_schema;
grant execute on dbms_crypto to &ut3_owner_schema;
grant execute on dbms_lob to &ut3_owner_schema;
grant execute on dbms_xmlgen to &ut3_owner_schema;
diff --git a/test/ut3_tester/core/test_output_buffer.pkb b/test/ut3_tester/core/test_output_buffer.pkb
index 2e8b3337c..edb10e3e6 100644
--- a/test/ut3_tester/core/test_output_buffer.pkb
+++ b/test/ut3_tester/core/test_output_buffer.pkb
@@ -16,12 +16,13 @@ create or replace package body test_output_buffer is
|| chr(13) || chr(10) || to_clob(lpad('a text', 31000, ',a text')) || to_clob(lpad('a text', 31000, ',a text'));
l_expected_item_type := lpad('some item type',1000,'-');
--Act
+ l_buffer.lock_buffer();
l_buffer.send_clob(l_expected_text, l_expected_item_type);
l_buffer.close();
select text, item_type
into l_actual_text, l_actual_item_type
- from table(l_buffer.get_lines(0,0));
+ from table(l_buffer.get_lines(0.1,0.1));
--Assert
ut.expect(l_actual_text).to_equal(l_expected_text);
@@ -32,7 +33,14 @@ create or replace package body test_output_buffer is
ut.expect(l_remaining).to_equal(0);
end;
-
+
+ procedure test_wait_for_producer is
+ l_buffer ut3_develop.ut_output_buffer_base;
+ begin
+ l_buffer := ut3_develop.ut_output_clob_table_buffer();
+ ut.expect( l_buffer.get_lines_cursor(0.1) ).to_be_empty();
+ end;
+
procedure test_doesnt_send_on_null_text is
l_cur sys_refcursor;
l_result integer;
@@ -86,11 +94,12 @@ create or replace package body test_output_buffer is
begin
--Arrange
l_expected := 'a text';
+ l_buffer.lock_buffer();
l_buffer.send_line(l_expected);
l_start := localtimestamp;
--Act
begin
- select text into l_result from table(l_buffer.get_lines(1,1));
+ select text into l_result from table(l_buffer.get_lines(0,0.3));
ut.fail('Expected a timeout exception but nothing was raised');
exception
when others then
@@ -101,7 +110,7 @@ create or replace package body test_output_buffer is
--Throws a timeout exception
ut.expect(dbms_utility.format_error_stack()).to_match('ORA'||ut3_develop.ut_utils.gc_out_buffer_timeout);
--Waited for one second
- ut.expect(l_duration).to_be_greater_than(interval '0.99' second);
+ ut.expect(l_duration).to_be_greater_or_equal(interval '0.3' second);
end;
select count(1) into l_remaining from table(ut3_tester_helper.run_helper.ut_output_buffer_tmp) where output_id = l_buffer.output_id;
@@ -116,13 +125,15 @@ create or replace package body test_output_buffer is
l_buffer ut3_develop.ut_output_buffer_base;
begin
--Arrange
- l_stale_buffer.start_date := sysdate - 2;
+ l_stale_buffer.start_date := sysdate - 10;
--initialize with new start date
l_stale_buffer.init();
+ l_stale_buffer.lock_buffer();
l_stale_buffer.send_line('some text');
l_stale_buffer.close();
l_fresh_buffer := ut3_develop.ut_output_table_buffer();
+ l_fresh_buffer.lock_buffer();
l_fresh_buffer.send_line('some text');
l_fresh_buffer.close();
@@ -131,9 +142,9 @@ create or replace package body test_output_buffer is
--Assert
-- Data in "fresh" buffer remains
- ut.expect( l_fresh_buffer.get_lines_cursor(0,0), l_buffer.self_type ).to_have_count(1);
+ ut.expect( l_fresh_buffer.get_lines_cursor(0,0), l_fresh_buffer.self_type ).to_have_count(1);
-- Data in "stale" buffer is purged and so the call to get_lines_cursor throws ORA-20218
- ut.expect( l_stale_buffer.get_lines_cursor(0,0), l_buffer.self_type ).to_be_empty();
+ ut.expect( l_stale_buffer.get_lines_cursor(0,0), l_stale_buffer.self_type ).to_be_empty();
end;
procedure test_purge_text_buffer is
diff --git a/test/ut3_tester/core/test_output_buffer.pks b/test/ut3_tester/core/test_output_buffer.pks
index 24c2c01eb..feaa337f8 100644
--- a/test/ut3_tester/core/test_output_buffer.pks
+++ b/test/ut3_tester/core/test_output_buffer.pks
@@ -2,10 +2,33 @@ create or replace package test_output_buffer is
--%suite(output_buffer)
--%suitepath(utplsql.ut3_tester.core)
+
+
+ --%context(Read and write within the same session)
+
+
+ --%endcontext
+ --%context(Buffer is read in a different session than buffer write)
+
+ --reader will wait for a_initial_timeout seconds for the writer process to start and then it will finish with error
+
+ --reader will wait forever (beyond a_initial_timeout) if the writer process is started and end of data row was not received from the buffer
+
+ --reader stops after reading the end of data signal from the buffer
+
+ --reader stops when writer process ends and all data was read from the buffer
+
+
+ --%endcontext
+
--%test(Receives a line from buffer table and deletes)
procedure test_receive;
+ --%test(Waits specified time for producer to lock the buffer )
+ --%throws(-20218)
+ procedure test_wait_for_producer;
+
--%test(Does not send line if null text given)
procedure test_doesnt_send_on_null_text;
@@ -19,11 +42,9 @@ create or replace package test_output_buffer is
procedure test_waiting_for_data;
--%test(Purges text buffer data older than one day and leaves the rest)
- --%throws(-20218)
procedure test_purge_text_buffer;
--%test(Purges clob buffer data older than one day and leaves the rest)
- --%throws(-20218)
procedure test_purge_clob_buffer;
end test_output_buffer;
diff --git a/test/ut3_tester_helper/coverage_helper.pkb b/test/ut3_tester_helper/coverage_helper.pkb
index 99f26e9f8..2a508ca6a 100644
--- a/test/ut3_tester_helper/coverage_helper.pkb
+++ b/test/ut3_tester_helper/coverage_helper.pkb
@@ -290,11 +290,11 @@ create or replace package body coverage_helper is
ut3_develop.ut_runner.coverage_stop();
end;
- function get_job_status(a_job_name varchar2, a_job_started_after timestamp with time zone) return varchar2 is
- l_status varchar2(1000);
+ function get_job_status(a_job_name varchar2, a_job_started_after timestamp with time zone) return user_scheduler_job_run_details%rowtype is
+ l_result user_scheduler_job_run_details%rowtype;
begin
begin
- select status into l_status
+ select * into l_result
from user_scheduler_job_run_details
where job_name = upper(a_job_name)
and req_start_date >= a_job_started_after;
@@ -302,20 +302,11 @@ create or replace package body coverage_helper is
when no_data_found then
null;
end;
- return l_status;
- end;
-
- procedure sleep(a_time number) is
- begin
- $if dbms_db_version.version >= 18 $then
- dbms_session.sleep(a_time);
- $else
- dbms_lock.sleep(a_time );
- $end
+ return l_result;
end;
procedure run_job_and_wait_for_finish(a_job_action varchar2) is
- l_status varchar2(1000);
+ l_job_run_info user_scheduler_job_run_details%rowtype;
l_job_name varchar2(30);
l_timestamp timestamp with time zone := current_timestamp;
i integer := 0;
@@ -323,7 +314,7 @@ create or replace package body coverage_helper is
begin
g_job_no := g_job_no + 1;
l_job_name := 'utPLSQL_selftest_job_'||g_job_no;
- sleep(0.15);
+ dbms_lock.sleep(0.15);
dbms_scheduler.create_job(
job_name => l_job_name,
job_type => 'PLSQL_BLOCK',
@@ -333,14 +324,14 @@ create or replace package body coverage_helper is
auto_drop => TRUE,
comments => 'one-time-job'
);
- while (l_status is null or l_status not in ('SUCCEEDED','FAILED')) and i < 150 loop
- l_status := get_job_status( l_job_name, l_timestamp );
- sleep(0.1);
+ while (l_job_run_info.status is null or l_job_run_info.status not in ('SUCCEEDED','FAILED')) and i < 6000 loop
+ l_job_run_info := get_job_status( l_job_name, l_timestamp );
+ dbms_lock.sleep(0.1);
i := i + 1;
end loop;
commit;
- if l_status = 'FAILED' then
- raise_application_error(-20000, 'Running a scheduler job failed');
+ if nvl(l_job_run_info.status,'null') <> 'SUCCEEDED' then
+ raise_application_error(-20000, 'Scheduler job '''||l_job_name||''', status='''||l_job_run_info.status||'''. Additional info: '||l_job_run_info.additional_info);
end if;
end;
@@ -378,7 +369,7 @@ create or replace package body coverage_helper is
pragma autonomous_transaction;
begin
run_job_and_wait_for_finish( a_plsql_block );
-
+ dbms_lock.sleep(0.1);
execute immediate q'[
declare
l_results ut3_develop.ut_varchar2_list;