DBMS_PIPE: an API for intersession communication

Methods for intersession communication:
(a) Requiring permanent or semipermanent structures
- Advanced Queuing (AQ, introduced in Oracle 8):
  - Use the DBMS_AQADM and DBMS_AQ packages.
  - AQ needs to be set up for each participant. Sessions exchange information as messages.
- Tables, grants, and synonyms used to exchange data between sessions.
  - Subject to transaction control limitations: commits are required before other sessions see the data.
(b) NOT requiring permanent or semipermanent structures
- DBMS_PIPE:
  - Uses dynamic memory structures in the SGA (pipes).
  - Similar to Unix pipes. Pipes may be local, private, or publicly accessible.
  - Pipes act as FIFO queues.
  - Not bound by transaction control.
  - Data is transmitted asynchronously.
- DBMS_ALERT:
  - Uses a memory structure in the SGA that works as a public pipe.
  - Populated by event triggers and subject to transaction control limits.
  - Alert pipes communicate between sessions asynchronously at the conclusion of an event.
  - Events are anything that you can build a trigger against, such as DML or a system action.
  - Unlike DBMS_PIPE, DBMS_ALERT works on a publish-and-subscribe paradigm.
  - It publishes notifications, and subscribers register their interest in the alert to receive them.
  - Alerts also support two or more sessions of a single user.
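
The publish-and-subscribe flow of DBMS_ALERT described above can be sketched with two sessions. This is a minimal sketch, not a definitive implementation: the alert name emp_changed and the message text are hypothetical, and both sessions are assumed to have EXECUTE permission on DBMS_ALERT. In practice the SIGNAL call would usually sit inside a trigger.

```sql
-- Session 1 (subscriber): register interest, then wait up to 60 seconds.
set serveroutput on
declare
  l_message varchar2(1800);   -- alert messages are limited to 1800 bytes
  l_status  integer;          -- 0 = alert received, 1 = timed out
begin
  dbms_alert.register('emp_changed');        -- hypothetical alert name
  dbms_alert.waitone(name    => 'emp_changed',
                     message => l_message,
                     status  => l_status,
                     timeout => 60);
  if l_status = 0 then
    dbms_output.put_line('Alert received: ' || l_message);
  else
    dbms_output.put_line('Timed out waiting for the alert.');
  end if;
  dbms_alert.remove('emp_changed');          -- unregister when done
end;
/

-- Session 2 (publisher): the signal is delivered only when this session commits.
begin
  dbms_alert.signal('emp_changed', 'salary updated');
  commit;
end;
/
```

If session 2 rolls back instead of committing, the subscriber is never notified, which is the transaction control limit mentioned in the list.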

Using public pipes with DBMS_PIPE
- Support communication between two or more sessions of a single user.
- Can also support communication between two or more users.
- DBMS_PIPE can help you mimic Unix pipes or POSIX-compliant threads. 
- Unix pipes allow you to move data between two active processes.
- Unix pipes control communication at the process level. 
- C/C++ also lets you control threading activities with mutex variables, which work at the process and thread levels. 
- Both provide higher programming language equivalents to operating system semaphores.
- DBMS_PIPE provides a non-secure mechanism for intersession messaging.
- It is non-secure because messages are lost from the pipe if the instance crashes or is shut down before they are processed.
- Advanced Queuing is arguably a better mechanism when secure messaging and greater flexibility are required.
- DBMS_PIPE can also be used to pass information to external processes that monitor or control system resources. With DBMS_PIPE you can:
- Use local pipes to control a single program’s execution.
- Use private pipes to control concurrent programs run by a single user.
- Use public pipes to control concurrent programs run by multiple users.
Pipes can be:
- Implicit pipes: created automatically when a message is sent to an unknown pipe name using the SEND_MESSAGE function.
  - Implicit pipes disappear when they are empty.
- Explicit pipes: created using the CREATE_PIPE function. They must be removed with the REMOVE_PIPE function.
Two levels of security:
- Public pipes: accessible by any user with EXECUTE permission on the DBMS_PIPE package.
  - Implicit pipes are always public.
  - Public pipes can also be created explicitly by calling the CREATE_PIPE function with the private parameter set to FALSE.
  - The domain of a public pipe is the schema in which it was created, either explicitly or implicitly.
  - A public pipe works asynchronously. Any number of schema users can write to it.
- Private pipes: accessible only by sessions with the same owner ID as the pipe creator, by stored programs owned by the pipe creator, or by users connected as SYSDBA.
V$DB_PIPES: displays information about pipes.
Writing to and reading from pipes:
- The sending session builds a message using one or more calls to the PACK_MESSAGE procedure. Each call adds an item to the session's local message buffer.
- SEND_MESSAGE then sends the contents of the local buffer to the named pipe.
- The receiving session calls RECEIVE_MESSAGE to copy a message from the pipe into its own local buffer, and UNPACK_MESSAGE to read each item.
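
The life cycle of an explicit private pipe can be sketched in a single session as follows. This is an illustrative sketch under stated assumptions: the pipe name demo_private_pipe and the packed values are hypothetical, and the session is assumed to have EXECUTE permission on DBMS_PIPE.

```sql
set serveroutput on
declare
  l_result integer;
  l_text   varchar2(100) := 'first item';
  l_num    number        := 42;
begin
  -- Explicit pipe; private => TRUE restricts it to the creating user.
  l_result := dbms_pipe.create_pipe(pipename => 'demo_private_pipe',
                                    private  => TRUE);

  -- Writing: pack items into the local buffer, then send the buffer.
  dbms_pipe.reset_buffer;
  dbms_pipe.pack_message(l_text);
  dbms_pipe.pack_message(l_num);
  l_result := dbms_pipe.send_message(pipename => 'demo_private_pipe',
                                     timeout  => 5);
  dbms_output.put_line('send_message returned ' || l_result);

  -- Reading: receive into the local buffer, then unpack in the same order
  -- and into variables of the same types that were packed.
  l_text := null;
  l_num  := null;
  l_result := dbms_pipe.receive_message(pipename => 'demo_private_pipe',
                                        timeout  => 5);
  if l_result = 0 then
    dbms_pipe.unpack_message(l_text);
    dbms_pipe.unpack_message(l_num);
    dbms_output.put_line('Received: ' || l_text || ' / ' || l_num);
  end if;

  -- Explicit pipes stay in the SGA until they are removed.
  l_result := dbms_pipe.remove_pipe('demo_private_pipe');
end;
/
```

Querying V$DB_PIPES between the send and the receive should list the pipe with TYPE = PRIVATE.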
Potential applications: 
- External service interface: communicate with user-written services external to the RDBMS. The services are available asynchronously.
- Independent transactions: communicate with a separate session that can perform an operation in an independent transaction (e.g. logging).
- Alerters (non-transactional).
- Debugging: triggers and stored procedures can send debug information to a pipe that another session keeps reading and displaying.
- Concentrator: multiplexing a large number of users over a smaller number of network connections.
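
The debugging use case can be sketched as a small helper procedure. This is only a sketch: the procedure name debug_msg and the pipe name debug_pipe are hypothetical, the owner is assumed to hold a direct (not role-based) EXECUTE grant on DBMS_PIPE, and the calling session is assumed to have nothing else pending in its local message buffer.

```sql
create or replace procedure debug_msg(p_text in varchar2) as
  l_result integer;
  l_line   varchar2(4000);
begin
  -- Prefix the text with a timestamp so the reader can order the lines.
  l_line := to_char(systimestamp, 'YYYY-MM-DD HH24:MI:SS.FF3')
            || ' ' || substr(p_text, 1, 3900);
  dbms_pipe.pack_message(l_line);
  -- timeout => 0: do not block the caller if the pipe is full;
  -- a non-zero return code simply means this debug line was not queued.
  l_result := dbms_pipe.send_message(pipename => 'debug_pipe',
                                     timeout  => 0);
end debug_msg;
/
```

A trigger or stored procedure would call debug_msg('entering step 1'), while a second session loops on RECEIVE_MESSAGE for debug_pipe and prints each line. Because pipes are not bound by transaction control, the lines reach the reader even if the caller later rolls back.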
Using DBMS_PIPE: (1) Sending to and receiving from the local pipe or buffer
 (a) Reset the local buffer, pack messages, and send them to an implicit public pipe.
 (b) Check the created pipe in V$DB_PIPES.
 (c) Read from the pipe, unpack the buffer, and print the contents of the pipe to the console.

-- (a) Resets the local buffer, packs messages, and sends them to the pipe. 
set serveroutput on
declare
 msg varchar2(30);
 success integer;
begin
 msg := dbms_pipe.unique_session_name;
 -- dbms_pipe.unique_session_name returns a VARCHAR2 string that 
 -- represents the current session.
 dbms_pipe.reset_buffer;
 -- PACK_MESSAGE takes the value of the actual parameter and puts it 
 -- into the buffer stack (FIFO)
 dbms_pipe.pack_message(msg);
 dbms_output.put_line('Written to pipe ['||msg||']');
 msg := 'Message body: read me';
 dbms_pipe.pack_message(msg);
 dbms_output.put_line('Written to pipe ['||msg||']');
 
 -- creates an IMPLICIT public pipe. The pipe will be deleted when the last 
 -- message in it is read.
 success := dbms_pipe.send_message(pipename => 'pipe1');
 if success = 0 then
   dbms_output.put_line('Contents of the buffer sent..');
 else
   dbms_output.put_line('Error. Return # ' || success);
 end if;
end;
/
-- (b) Check the created pipe on V$DB_PIPES:
SQL> SELECT * from v$DB_PIPES;
OWNERID  NAME   TYPE     PIPE_SIZE              
-------- ------ -------- ---------
         PIPE1  PUBLIC        4448                   
--(c) reads from the pipe, unpacks the buffer, reads the data, and prints the contents of the 
-- pipe to the console
set serveroutput on
declare
 msg varchar2(50);
 success integer;
 no_more_items exception;
 e_read_timeout exception;
 pragma exception_init(no_more_items, -06556);
begin
  -- read the message from the pipe. Wait max 5 seconds for 
  -- message in the pipe..
  success := dbms_pipe.receive_message(pipename => 'pipe1', 
                                       timeout => 5);
  if success = 0 then
    dbms_output.put_line('Message received...');
  elsif success = 1 then 
    raise e_read_timeout;
  else
    dbms_output.put_line('Error. Return # ' || success);
  end if;  
  
  begin
    loop    -- loop through all packed messages
      dbms_pipe.unpack_message(msg);
      dbms_output.put_line('Message ['||msg||']');
    end loop; 
  exception
      when no_more_items then 
        dbms_output.put_line('end of buffer reached. exiting..');
  end;
exception
  when e_read_timeout then 
    dbms_output.put_line('Receiving message timeout. Nothing in buffer..');        
end;
/
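
An alternative to catching ORA-06556 at the end of the buffer is to ask DBMS_PIPE what the next item is before unpacking it. The sketch below assumes the same pipe1 pipe and VARCHAR2-only messages used in step (a); NEXT_ITEM_TYPE returns 0 when the buffer holds no more items and 9 when the next item is a VARCHAR2.

```sql
set serveroutput on
declare
  l_result integer;
  l_msg    varchar2(4000);
begin
  l_result := dbms_pipe.receive_message(pipename => 'pipe1',
                                        timeout  => 5);
  if l_result = 0 then
    -- Unpack only while the next item in the local buffer is a VARCHAR2.
    while dbms_pipe.next_item_type = 9 loop
      dbms_pipe.unpack_message(l_msg);
      dbms_output.put_line('Message [' || l_msg || ']');
    end loop;
  elsif l_result = 1 then
    dbms_output.put_line('Receiving message timeout. Nothing in pipe..');
  else
    dbms_output.put_line('Error. Return # ' || l_result);
  end if;
end;
/
```

This variant also skips the unpack loop entirely when RECEIVE_MESSAGE returns an error code, instead of falling through to it.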
Intersession communication:
 Session UserA:
 (1) Creates an explicit public pipe (explicit_public_pipe).
 (2) Opens a cursor with the list of the highest-paid employees in each department [format: dept, last_name, salary].
 (3) Writes the contents of the cursor into the pipe. Each row is packed as one item of the message.
 
 Session UserB:
 (1) Reads the list of employees and salaries from explicit_public_pipe.
 (2) Uses UTL_FILE to create or append to an OS file with the contents of the pipe. Each item becomes one line.

-- USER A:
set serveroutput on
DECLARE 
  result integer;
  msg    varchar2(100);
  i      number;
  -- cursor returns the highest-paid employee(s) of each department
  cursor c1 is 
   select e1.department_id dept, last_name, salary
   from employees e1, 
       ( select department_id, max(salary) sal
         from employees
         group by department_id ) sel
   where e1.department_id = sel.department_id
   and   e1.salary = sel.sal
   order by 1;
  type t_emprec is record (dept employees.department_id%type,
                          ln employees.last_name%type, 
                          sal employees.salary%type);
  emprec t_emprec;
BEGIN
  -- (1) CREATE explicit public pipe
  result := dbms_pipe.remove_pipe('explicit_public_pipe');
  result := dbms_pipe.create_pipe(pipename => 'explicit_public_pipe',
                                  private  => FALSE);
  
  -- (2) PACK messages into the pipe (top earners of every department)
  open c1;
  i := 0;
  -- packs Session ID
  msg := i ||'$'|| dbms_pipe.unique_session_name;
  dbms_output.put_line('Packing: '|| msg);
  dbms_pipe.pack_message(msg);
  loop
    fetch c1 into emprec;
    exit when c1%notfound;
          i := i + 1;
          msg := i ||':'||emprec.dept||';'||emprec.ln ||';'||emprec.sal||'!';
          dbms_output.put_line('Packing: '|| msg);
          dbms_pipe.pack_message(msg);
  end loop;
  close c1;
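  result := dbms_pipe.send_message(pipename => 'explicit_public_pipe',
                                   timeout => 5);
  -- report success only when send_message returns 0
  if result = 0 then
    dbms_output.put_line('Message sent...');
  else
    dbms_output.put_line('Error. Return # ' || result);
  end if;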
END;
/
anonymous block completed
Packing: 0$ORA$PIPE$007D02160001
Packing: 1:10;Whalen;4400!
Packing: 2:20;Hartstein;13000!
Packing: 3:30;Raphaely;11000!
Packing: 4:40;Mavris;6500!
Packing: 5:50;Fripp;8200!
Packing: 6:60;Hunold;9000!
Packing: 7:70;Baer;10000!
Packing: 8:80;Russell;14000!
Packing: 9:90;King;24000!
Packing: 10:100;Greenberg;12008!
Packing: 11:100;Chen;12008!
Packing: 12:110;Higgins;12008!
Message sent...
-- USER B runs this code:
set serveroutput on
DECLARE
  msg   varchar2(200);
  vinfo varchar2(200);
  result integer;
  no_more_items exception;
  e_read_timeout exception;
  pragma exception_init(no_more_items, -06556);
  fdest utl_file.file_type;
  type FileAttrRec is Record (
                      vfilexists BOOLEAN,
                      vfilelength number,
                      vblocksize  binary_integer);
  vfilerec fileattrrec;
BEGIN
  -- read the message from the pipe. Wait max 5 seconds for 
  -- message in the pipe..
  result := dbms_pipe.receive_message(pipename => 'explicit_public_pipe', 
                                      timeout => 5);
  if result = 0 then
    dbms_output.put_line('Message received...');
  elsif result = 1 then 
    raise e_read_timeout;
  else
    dbms_output.put_line('Error. Return # ' || result);
  end if;  
  
  -- Open FILE for writing..
  -- Check: If file ALREADY Exists: Append. Otherwise: Write
  utl_file.fgetattr('USER_DIR', 'fpipeout.txt', vfilerec.vfilexists, 
                     vfilerec.vfilelength, vfilerec.vblocksize);
  If vfilerec.vfilexists THEN
    fdest := utl_file.fopen('USER_DIR', 'fpipeout.txt', 'A', 1024);
    dbms_output.put_line('Destination file exists. Appending..');
  Else
    fdest := utl_file.fopen('USER_DIR', 'fpipeout.txt', 'W', 1024);
  End if; 
  select sysdate into vinfo from dual;
  vinfo := 'Entering new data on ' || vinfo||':';
  utl_file.put_line(fdest, vinfo, true);
  
  -- iterate through the messages in the buffer. 
  -- Write each one into the file..
  begin
    loop
      dbms_pipe.unpack_message(msg);
      dbms_output.put_line('Message ['||msg||']');
      utl_file.put_line(fdest, msg, true);
    end loop; 
  exception
      when no_more_items then 
        dbms_output.put_line('end of buffer reached. exiting..');
  end;
  -- close file
  utl_file.fclose(fdest);
  -- remove the explicit pipe
  result := dbms_pipe.remove_pipe('explicit_public_pipe');
exception
  when e_read_timeout then 
    dbms_output.put_line('Receiving message timeout. Nothing in buffer..');        
end;
/
Check contents of file fpipeout.txt: 
$ cat fpipeout.txt 
Entering new data on 19-SEP-11:
0$ORA$PIPE$007D02160001
1:10;Whalen;4400!
2:20;Hartstein;13000!
3:30;Raphaely;11000!
4:40;Mavris;6500!
5:50;Fripp;8200!
6:60;Hunold;9000!
7:70;Baer;10000!
8:80;Russell;14000!
9:90;King;24000!
10:100;Greenberg;12008!
11:100;Chen;12008!
12:110;Higgins;12008!

Intersession communication: an API for communication
 Creates a package with send and receive procedures.

-- In a package
create or replace package message_api AS
  procedure send_max_sal(p_dept in employees.department_id%type);
  procedure receive;
end message_api;
/
create or replace package body message_api AS
  procedure send_max_sal(p_dept in employees.department_id%type) AS
    result integer;
    msg    varchar2(100);
    cursor c1(dept employees.department_id%type) is 
      select last_name, salary
      from employees
      where salary = ( select max(salary)
                         from employees
                         where department_id = dept)  -- use the cursor parameter, not a fixed dept
      and department_id = dept;
    vlastname employees.last_name%type;
    vsal      employees.salary%type;
  BEGIN  
    -- (2) PACK messages into the pipe.. 
    open c1(p_dept);
    fetch c1 into vlastname, vsal;
    close c1;
    msg := vlastname||':'||vsal;
    dbms_output.put_line('Packing: '|| msg);
    dbms_pipe.pack_message(msg);  
    result := dbms_pipe.send_message(pipename => 'message_pipe',
                                   timeout => 5);
    if result != 0 then
      raise_application_error(-20001, 'message_pipe error');
    end if;
    dbms_output.put_line('Message sent...');
  END send_max_sal;
    
  procedure receive as
    result integer;
    msg    varchar2(200);
  begin
    result := dbms_pipe.receive_message( pipename => 'message_pipe', 
                                         timeout  => dbms_pipe.maxwait);
    if result = 0 then
      dbms_pipe.unpack_message(msg);
      dbms_output.put_line('Message is: '|| msg);
    else
      raise_application_error(-20002, 'message_api.receive error');
    end if;
  end receive;
end message_api;
/
-- time t0: USER B: 
set serveroutput on
exec usera.message_api.receive;
(hangs)..
-- time t1: USER A:
begin 
 message_api.send_max_sal(20);
end;
/
anonymous block completed
Packing: Hartstein:13000
Message sent...
-- time t2: USER B:
anonymous block completed
Message is: Hartstein:13000
SQL> analyze table emp2 compute statistics;
table EMP2 analyzed.
SQL> set serveroutput on 
SQL> declare
 l_used_bytes number;
 l_alloc_bytes number;
begin
  dbms_space.create_index_cost
    ( ddl => 'create index emp_idx1 on emp2' ||
             '(first_name, department_id)',
      used_bytes => l_used_bytes,
      alloc_bytes => l_alloc_bytes);
  
  dbms_output.put_line('Used bytes: ' || l_used_bytes);
  dbms_output.put_line('Allocated bytes:  ' || l_alloc_bytes);
end;
/
Used bytes: 1753088
Allocated bytes:  6291456
SQL>
(1) Using UTL_FILE package
- UTL_FILE provides a restricted version of operating system stream file I/O.
- Files and directories accessible through UTL_FILE: controlled by a number of factors and database parameters. 
 The most important of these is the set of directory objects that have been granted to the user.
- Provides file access both on the client side (FORMS apps) and on the server side.
- Note that symbolic links are not supported.
Previous Oracle versions:
- In the past, accessible directories for UTL_FILE were specified in init.ora using the UTL_FILE_DIR parameter.
- UTL_FILE_DIR access is NO LONGER recommended.
- Oracle recommends that you instead use the DIRECTORY object feature, which replaces UTL_FILE_DIR.
- Directory objects:
 (a) offer more flexibility and granular control to the UTL_FILE application administrator,
 (b) can be maintained dynamically (that is, without shutting down the database), and
 (c) are consistent with other Oracle tools.
UTL_FILE.GET_LINE
- The len parameter of UTL_FILE.GET_LINE specifies the requested number of bytes of character data.
- The number of bytes actually returned to the user will be the least of:
  - the GET_LINE len parameter, or
  - the number of bytes until the next line terminator character, or
  - the max_linesize parameter specified by UTL_FILE.FOPEN.
- The FOPEN max_linesize parameter must be a number in the range 1 to 32767. 
  If unspecified, Oracle supplies a default value of 1024.
- The GET_LINE len parameter must be a number in the range 1 to 32767. 
  If unspecified, Oracle supplies the default value of max_linesize.
- If max_linesize and len are defined to be different values, then the lesser value takes precedence.
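
The interaction between max_linesize and len can be tried out with a short block. This is a sketch under assumptions: it reuses the USER_DIR directory object and the ftest1.txt file from the examples that follow, and the value 80 is arbitrary. With len smaller than max_linesize, each call is expected to return at most 80 bytes, so a longer line should come back in pieces over successive calls.

```sql
set serveroutput on
declare
  f    utl_file.file_type;
  line varchar2(32767);
begin
  -- max_linesize set to the upper limit so that len is the smaller value
  f := utl_file.fopen('USER_DIR', 'ftest1.txt', 'R', 32767);
  begin
    loop
      utl_file.get_line(f, line, 80);   -- len = 80: request at most 80 bytes
      dbms_output.put_line(nvl(lengthb(line), 0) || ' bytes: ' || line);
    end loop;
  exception
    when no_data_found then
      null;                             -- end of file reached
  end;
  utl_file.fclose(f);
end;
/
```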
Examples:
First create the directory objects that identify the accessible directories:
As System:
SQL> create directory user_dir AS '/home/oracle/fileio/user';
SQL> grant read on directory user_dir to public;
SQL> grant write on directory user_dir to public;
Note:
- Oracle does not check whether the directory exists when the directory object is created.
- After revoke write on directory user_dir from public:
  - A file can still be opened and read when the user has only READ permission on the directory.
- The directory name in utl_file.fopen is CASE SENSITIVE. Use uppercase (USER_DIR), because an unquoted directory object name is stored in uppercase.
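
Because the directory path is not validated at creation time, it can help to confirm what the database has recorded before calling UTL_FILE. The queries below are a sketch; the grantee usera in the last statement is taken from the pipe examples and stands in for whichever schema actually needs access.

```sql
-- Confirm the directory object exists and check the path it points to.
select directory_name, directory_path
from   all_directories
where  directory_name = 'USER_DIR';

-- List who has been granted READ or WRITE on it.
select grantee, privilege
from   all_tab_privs
where  table_name = 'USER_DIR';

-- A narrower alternative to granting to PUBLIC (run as a privileged user).
grant read, write on directory user_dir to usera;
```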
(a) Simple read:
 (1) Declare a file handle (utl_file.file_type).
 (2) Open the file. (The file is only accessible in the directory specified by the directory object.)
 (3) Read lines of up to 1 KB (the max_linesize passed to fopen) until EOF (no_data_found), then print them.

set serveroutput on 
declare 
  v1 varchar2(32767);
  v2 varchar2(32767);
  f1 utl_file.file_type;   -- declare a file handler.
begin
 f1 := utl_file.fopen('USER_DIR', 'ftest1.txt', 'R', 1024);
 begin
   loop 
     utl_file.get_line(f1, v1, 32767);
     v2 := v2 || chr(10)|| v1;
   end  loop;
 exception
   when no_data_found then 
     null;
 end;
 utl_file.fclose(f1);
 dbms_output.put_line(v2);
end;
/
anonymous block completed
"By midmorning the rain had stopped. 
Water dripped from the trees in the alameda and the crepe hung in soggy strings. 
He stood with the horses and watched the wedding party emerge from the church. 
The groom wore a dull black suit too large for him and he looked not uneasy but half 
desperate, as if unused to clothes at all. 
The bride was embarrassed and clung to him and they stood on the steps for their photograph to
be taken and in their antique formalwear posed there in front of the church they already had 
the look of old photos. 
In the sepia monochrome of a rainy day in the lost village they'd grown old instantly"
Cormac McCarthy
(b) Copying data from file A into file B.
 
 If the destination file exists, open it in append mode.

set serveroutput on 
declare 
  v1 varchar2(32767);
  v2 varchar2(32767);
  fsource utl_file.file_type;
  fdest utl_file.file_type;
  vinfo varchar2(1000);
  type FileAttrRec is Record (
                      vfilexists BOOLEAN,
                      vfilelength number,
                      vblocksize  binary_integer);
  vfilerec fileattrrec;
  
begin
  fsource := utl_file.fopen('USER_DIR', 'fsource.txt', 'R', 1024);
    utl_file.fgetattr('USER_DIR', 'fdest.txt', vfilerec.vfilexists, 
                     vfilerec.vfilelength, vfilerec.vblocksize);
  
  If vfilerec.vfilexists THEN 
      fdest   := utl_file.fopen('USER_DIR', 'fdest.txt', 'a', 1024);
      dbms_output.put_line('Destination file exists. Appending..');
  ELSE
      fdest   := utl_file.fopen('USER_DIR', 'fdest.txt', 'W', 1024);
  END IF;
  
  if utl_file.is_open(fsource) and utl_file.is_open(fdest) then 
     dbms_output.put_line('Files fsource and fdest opened successfully..');
  end if;
  begin
    select sysdate into vinfo from dual;
    vinfo := vinfo || '. Entering new data: ';
    utl_file.put_line(fdest, vinfo, true);
    loop 
      utl_file.get_line(fsource, v1, 32767);
      dbms_output.put_line('Transferring: ' || v1);
      utl_file.put_line(fdest, v1, TRUE);
      v2 := v2 || chr(10)|| v1;
    end  loop;
  exception
    when no_data_found then 
      null;
  end;
  utl_file.fclose(fsource);
  utl_file.fclose(fdest);
  dbms_output.put_line(v2);
end;
/
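
The examples above handle only no_data_found. A hedged sketch of fuller error handling follows; it assumes the same USER_DIR directory object and fsource.txt file, and uses the exceptions that the UTL_FILE package itself declares. The handle is closed in every handler so that a failed read does not leave the file open.

```sql
set serveroutput on
declare
  f    utl_file.file_type;
  line varchar2(32767);
begin
  f := utl_file.fopen('USER_DIR', 'fsource.txt', 'R', 32767);
  begin
    loop
      utl_file.get_line(f, line, 32767);
      dbms_output.put_line(line);
    end loop;
  exception
    when no_data_found then
      null;                                   -- normal end of file
  end;
  utl_file.fclose(f);
exception
  when utl_file.invalid_path then
    dbms_output.put_line('Directory object or file name is not valid.');
  when utl_file.invalid_operation then
    dbms_output.put_line('File could not be opened or operated on as requested.');
    if utl_file.is_open(f) then
      utl_file.fclose(f);
    end if;
  when utl_file.read_error then
    dbms_output.put_line('Operating system error while reading the file.');
    if utl_file.is_open(f) then
      utl_file.fclose(f);
    end if;
end;
/
```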