Jak zařadit do fronty v tabulce Oracle AQ při potvrzení s Java a konzumovat s klientem JMS

Podařilo se mi to splnit – musel jsem hádat mnoho částí Oracle API a sbírat rady z různých blogů. Pro každého, koho to zajímá, je způsob, jak jsem to zprovoznil -1. Vytvořil jsem objekt Oracle na Oracle Db2. S tímto objektem Oracle jsem vytvořil tabulky fronty typu objektu jako užitečné zatížení3. Nyní jsem schopen zařadit do fronty typy AQMessage s užitečným zatížením STRUCT, které obsahují data objektu4. A jsem schopen vyřadit z fronty se spotřebitelem JMS, který rozumí typu užitečného zatížení ADT (Díky článku na http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types )

Zde jsou kroky s kódem – Vytvořte Oracle object . Objekt může mít libovolná pole primárního datového typu jako VARCHAR, TIMESTAMP atd. a také BLOB, CLOB atd. V tomto případě jsem jeden ze sloupců poskytl jako blob, aby to bylo složitější.

create or replace type aq_event_obj as object
  id       varchar2(100),
  payload  BLOB

Nyní vytvořte tabulku fronty. Typ užitečného zatížení tabulky je objekt oracle.

private void setup(Connection conn) throws SQLException {
    doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
            + "   QUEUE_TABLE        =>  'OBJ_SINGLE_QUEUE_TABLE',  " + "   QUEUE_PAYLOAD_TYPE =>  'AQ_EVENT_OBJ', "
            + "   COMPATIBLE         =>  '10.0'); " + "END; ");
    doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE( " + "    QUEUE_NAME     =>   'OBJ_SINGLE_QUEUE', "
            + "    QUEUE_TABLE    =>   'OBJ_SINGLE_QUEUE_TABLE'); " + "END;  ");
    doUpdateDatabase(conn, "BEGIN " + "  DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); " + "END; ");

Nyní můžete zařadit do fronty typy AQMessage v Javě s instancí struktury objektu

public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception {
    // First create the message properties:
    AQMessageProperties aqMessageProperties = AQFactory.createAQMessageProperties();

    // Specify an agent as the sender:
    AQAgent aqAgent = AQFactory.createAQAgent();

    // Create the payload
    StructDescriptor structDescriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn);
    Map<String, Object> payloadMap = new HashMap<String, Object>();
    payloadMap.put("ID", correlationId);
    payloadMap.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData));
    STRUCT struct = new STRUCT(structDescriptor, conn, payloadMap);

    // Create the actual AQMessage instance:
    AQMessage aqMessage = AQFactory.createAQMessage(aqMessageProperties);

    AQEnqueueOptions opt = new AQEnqueueOptions();

    // execute the actual enqueue operation:
    conn.enqueue(QUEUE_NAME, opt, aqMessage);

Pole blob vyžadovalo speciální zpracování

public class OracleAQBLOBUtil {

    public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
        BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
        OutputStream outputStream = blob.setBinaryStream(1L);
        InputStream inputStream = new ByteArrayInputStream(payload);
        try {
            byte[] buffer = new byte[blob.getBufferSize()];
            int bytesRead = 0;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
            return blob;
        finally {

    public byte[] saveOutputStream(BLOB blob) throws Exception {
        InputStream inputStream = blob.getBinaryStream();
        int counter;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        while ((counter = inputStream.read()) > -1) {
        return byteArrayOutputStream.toByteArray();


Pro spotřebitele musíte poskytnout instanci ORADataFactory, která zákazníkovi umožní pochopit typ užitečného zatížení (váš vlastní objekt).

AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue, new OracleAQObjORADataFactory());

Kde je kód pro OracleAQObjORADataFactory

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;

import oracle.jdbc.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.BLOB;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;
import oracle.sql.STRUCT;

public class OracleAQObjORADataFactory  implements ORAData, ORADataFactory {

    public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    protected MutableStruct _struct;

    protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY };
    protected static ORADataFactory[] _factory = new ORADataFactory[2];
    protected static final OracleAQObjORADataFactory  _AqEventObjFactory = new OracleAQObjORADataFactory ();

    public static ORADataFactory getORADataFactory() {
        return _AqEventObjFactory;

    /* constructors */
    protected void _init_struct(boolean init) {
        if (init)
            _struct = new MutableStruct(new Object[2], _sqlType, _factory);

    public OracleAQObjORADataFactory () {

    public OracleAQObjORADataFactory (String id, byte[] payload) throws SQLException {

    /* ORAData interface */
    public Datum toDatum(Connection c) throws SQLException {
        return _struct.toDatum(c, EVENT_OBJECT);

    /* ORADataFactory interface */
    public ORAData create(Datum d, int sqlType) throws SQLException {
        return create(null, d, sqlType);

    protected ORAData create(OracleAQObjORADataFactory  o, Datum d, int sqlType) throws SQLException {
        if (d == null)
            return null;
        if (o == null)
            o = new OracleAQObjORADataFactory ();
        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return o;

    public String getId() throws SQLException {
        return (String) _struct.getAttribute(0);

    public void setId(String id) throws SQLException {
        _struct.setAttribute(0, id);

    public byte[] getPayload() throws SQLException {
        BLOB blob = (BLOB) _struct.getAttribute(1);
        InputStream inputStream = blob.getBinaryStream();
        return getBytes(inputStream);

    public byte[] getBytes(InputStream body) {
        int c;
        try {
            ByteArrayOutputStream f = new ByteArrayOutputStream();
            while ((c = body.read()) > -1) {
            byte[] result = f.toByteArray();
            return result;
        catch (Exception e) {
            System.err.println("Exception: " + e.getMessage());
            return null;

    public void setPayload(byte[] payload) throws SQLException {
        _struct.setAttribute(1, payload);


Pravděpodobně ve svém projektu používáte Camel nebo Spring, v takovém případě -1. Pokud používáte Camel 2.10.2 nebo novější, můžete vytvořit spotřebitele JMS s vlastním kontejnerem seznamu zpráv (CAMEL-5676)2. Pokud používáte předchozí verzi, možná nebudete moci používat způsob koncového bodu (nemohl jsem na to přijít), ale můžete použít posluchač požadavků JMS

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"

    <!-- this is just an example, you can also use a datasource as the ctor arg -->
    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
        <constructor-arg index="1" type="java.util.Properties">

    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        <property name="username">
        <property name="password">

    <!-- Definitions for JMS Listener classes that we have created -->
    <bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />

    <bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="oracleQueueName" value="BOZ_SINGLE_QUEUE" />

    <!-- The Spring DefaultMessageListenerContainer configuration. This bean is automatically loaded when the JMS application context is started -->
    <bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="destination" ref="aqEventQueue" />
        <property name="messageListener" ref="aqMessageListener" />
        <property name="sessionTransacted" value="false" />


Vlastní kontejner posluchače zpráv

public class AQMessageListenerContainer extends DefaultMessageListenerContainer {

    protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
        return ((AQjmsSession) session).createConsumer(destination, getMessageSelector(),
                OracleAQObjORADataFactory.getORADataFactory(), null, isPubSubNoLocal());

a metoda posluchače požadavků onMessage

public void onMessage(Message msg) {
    try {
        AQjmsAdtMessage aQjmsAdtMessage = (AQjmsAdtMessage) msg;
        OracleAQObjORADataFactory obj = (OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload();

        System.out.println("Datetime: " + obj.getId());
        System.out.println("Payload: " + new String(obj.getPayload(), Charset.forName("UTF-8")));
    catch (Exception jmsException) {
        if (logger.isErrorEnabled()) {

