Thursday, December 24, 2020

Speed up inserting billions of rows into tables using dbms_parallel_execute

I wrote an earlier example of how to delete millions of rows as part of a complex delete procedure; you can check it out here.
A decade later, I was tasked with a POC of Oracle In-Memory with billions of rows.
I needed to generate rows to simulate sensor data.
The simulation creates one reading per sensor per minute. That means, for one sensor, the data generated in a year is 60 minutes * 24 hours * 365 days = 525,600 rows.

To speed up the process, I used FORALL bulk binds and was able to cut the elapsed time by about 50%. However, I did not spend time looking at doing this with a "Create Table As Select" (CTAS), which would be much more efficient; a sketch of that approach is shown below for reference.
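A single parallel CTAS along these lines could generate the same shape of data in one statement. This is only an untested sketch: the sensor_data_ctas table name and the simplified temperature expression are illustrative, not the formula used in the procedure below.

create table sensor_data_ctas parallel 8 nologging as
select s.sensor_id,
       date '2019-01-01' + (m.minute_no / 1440) as time_id,           -- one row per minute of the year
       round(60 + 20 * dbms_random.value(-1, 1), 2) as temperature    -- simplified simulated reading
from   (select level as sensor_id from dual connect by level <= 200) s,
       (select level - 1 as minute_no from dual connect by level <= 60 * 24 * 365) m;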

Below is the full code used to generate billions of rows. You can use a similar framework to complete other tasks in parallel.

1) Create sensor_data table.


drop table sensor_data purge;
create table sensor_data (
  sensor_id   number,
  time_id     date,
  temperature number )
PARTITION BY RANGE (time_id)
INTERVAL (NUMTOYMINTERVAL(1,'MONTH'))
( PARTITION p0 VALUES LESS THAN (TO_DATE('1-1-2008', 'DD-MM-YYYY'))
);

2) Create a procedure to insert a year of data for the given range of sensor_ids. If a task named test_task is left over from a previous run, drop it first, and grant access to the table if other schemas need to query it:

 exec DBMS_PARALLEL_EXECUTE.drop_task('test_task');
 grant select on sensor_data to public;
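DBMS_PARALLEL_EXECUTE runs its chunks as Scheduler jobs, so the schema executing the task also needs the CREATE JOB privilege. SSB below is simply the schema used in this example; adjust it to your own.

 grant create job to ssb;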


create or replace PROCEDURE parallel_DML_PO (p_start_id IN NUMBER, p_end_id IN NUMBER) AS
  p_array_size number := 1440;   -- one row per minute of a day
  type array is table of number;
  l_data ARRAY;
  cursor c is select rownum seq_no from dual connect by rownum <= p_array_size;
BEGIN
  -- Insert a year of minute-level readings for every sensor_id in the given range
  for i in p_start_id..p_end_id loop
    for j in 1..365 loop
      open c;
      loop
        fetch c bulk collect into l_data limit p_array_size;
        -- simulated temperature: a base value plus sine variation and random noise
        forall k in 1 .. l_data.count
          insert into sensor_data (sensor_id, time_id, temperature)
          values (i,
                  to_date('2019-01-01','YYYY-MM-DD') + j + l_data(k)/1440,
                  round( 60 + 20*sin(trunc(DBMS_RANDOM.VALUE(0, 100)))
                       + 15*sin(2*(22/7)/j*trunc(i))
                       + 10*sin(2*(22/7)/l_data(k)*mod(i,1440)/1440)*(1 + trunc(DBMS_RANDOM.VALUE(0, 100)))/2000, 2));
        exit when c%notfound;
      end loop;
      close c;
      commit;
    end loop;
    commit;
  end loop;
end parallel_DML_PO;
/
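Optionally, run a quick single-sensor sanity check before launching the full parallel run; each sensor should produce 1440 * 365 = 525,600 rows.

exec parallel_DML_PO(1, 1);
select count(*) from sensor_data where sensor_id = 1;   -- expect 525,600 rows
delete from sensor_data where sensor_id = 1;            -- clean up before the full run
commit;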

 
3) Create a table to store all the sensor_ids. DBMS_PARALLEL_EXECUTE will split this table into chunks and run the above procedure on each chunk in parallel. For this example, I have created 200 sensor_ids, so the total row count should be 200 * 60 * 24 * 365 = 105,120,000, roughly 105M.



drop table po_parallel purge;
create table po_parallel nologging as
SELECT level AS sensor_id FROM dual CONNECT BY level <= 200;


4) Run the procedure in parallel. With 200 sensor_ids and a chunk_size of 20, the task is split into 10 chunks, which matches the parallel_level of 10 used below.

DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_sql_stmt VARCHAR2(32767);
  l_try      NUMBER;
  l_status   NUMBER;
BEGIN
   -- create a task
  DBMS_PARALLEL_EXECUTE.create_task (task_name => l_task);
   -- point to key column and set batch size
  DBMS_PARALLEL_EXECUTE.create_chunks_by_number_col (task_name    => l_task,
                                                     table_owner  => 'SSB',
                                                     table_name   => 'PO_PARALLEL',
                                                     table_column => 'SENSOR_ID',
                                                     chunk_size   => 20);

l_sql_stmt := 'BEGIN parallel_DML_PO(:start_id, :end_id); END;';

   -- run task in parallel
  DBMS_PARALLEL_EXECUTE.run_task(task_name=>l_task,
                                 sql_stmt  =>l_sql_stmt,
                                 language_flag => DBMS_SQL.NATIVE,
                                 parallel_level=> 10);

  -- If there is an error, resume the task at most 2 times.
  l_try := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  WHILE(l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED)
  Loop
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task(l_task);
    l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  END LOOP;

   -- Keep the task for now so its chunk details can still be queried; it is dropped separately below.
   -- DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/


5) Monitor the progress. Each chunk reports its chunk_id, status, start_id and end_id:
SELECT chunk_id, status, start_id, end_id
FROM   user_parallel_execute_chunks
WHERE  task_name = 'test_task'
ORDER BY chunk_id;
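If the task does not reach the FINISHED status, the same view records the error for each failed chunk, for example:

SELECT chunk_id, start_id, end_id, error_code, error_message
FROM   user_parallel_execute_chunks
WHERE  task_name = 'test_task'
AND    status = 'PROCESSED_WITH_ERROR';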



Once the job is complete you can drop the task, which also drops the associated chunk information.
BEGIN
  DBMS_PARALLEL_EXECUTE.drop_task('test_task');
END;
/
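As a final check, the total row count should match the estimate from step 3 (200 sensors * 525,600 rows per sensor):

select count(*) from sensor_data;   -- expect 105,120,000 rows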
