GridGain Developers Hub
GitHub logo GridGain iso GridGain.com
GridGain Software Documentation

Querying and Modifying Data

Overview

This page elaborates on how to connect to an Ignite cluster and execute a variety of SQL queries using Ignite’s ODBC driver.

At the implementation layer, Ignite’s ODBC driver uses Ignite SQL Fields queries to retrieve data from Apache Ignite. This means that from ODBC side you can access only those fields that are defined in cluster’s configuration.

Moreover, starting from Ignite 1.8, the ODBC driver supports DML (Data Modification Layer) which means that you can not only request data but modify Ignite Data Grid content as well using an ODBC connection.

Configuring Ignite Cluster

As the first step, you need to set up a configuration that will be used by the cluster nodes. The configuration should include caches configurations as well with properly defined QueryEntities properties. QueryEntities are essential for the cases when an application (or the ODBC driver in our scenario) is going to query and modify data using SQL statements. Alternatively you can create tables using DDL.

SQLHENV env;

// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);

// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);

SQLHDBC dbc;

// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);

// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";

// Connecting to Ignite Cluster.
SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query1[] = "CREATE TABLE Person ( "
    "id LONG PRIMARY KEY, "
    "firstName VARCHAR, "
    "lastName VARCHAR, "
  	"salary FLOAT) "
    "WITH \"template=partitioned\"";

SQLExecDirect(stmt, query1, SQL_NTS);

SQLCHAR query2[] = "CREATE TABLE Organization ( "
    "id LONG PRIMARY KEY, "
    "name VARCHAR) "
    "WITH \"template=partitioned\"";

SQLExecDirect(stmt, query2, SQL_NTS);

SQLCHAR query3[] = "CREATE INDEX idx_organization_name ON Organization (name)";

SQLExecDirect(stmt, query3, SQL_NTS);
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
  <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">

    <!-- Enabling ODBC. -->
    <property name="odbcConfiguration">
      <bean class="org.apache.ignite.configuration.OdbcConfiguration"/>
    </property>

    <!-- Configuring cache. -->
    <property name="cacheConfiguration">
      <list>
        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <property name="name" value="Person"/>
          <property name="cacheMode" value="PARTITIONED"/>
          <property name="atomicityMode" value="TRANSACTIONAL"/>
          <property name="writeSynchronizationMode" value="FULL_SYNC"/>

          <property name="queryEntities">
            <list>
              <bean class="org.apache.ignite.cache.QueryEntity">
                <property name="keyType" value="java.lang.Long"/>
                <property name="keyFieldName" value="id"/>
                <property name="valueType" value="Person"/>

                <property name="fields">
                  <map>
                    <entry key="firstName" value="java.lang.String"/>
                    <entry key="lastName" value="java.lang.String"/>
                    <entry key="salary" value="java.lang.Double"/>
                  </map>
                </property>
              </bean>
            </list>
          </property>
        </bean>

        <bean class="org.apache.ignite.configuration.CacheConfiguration">
          <property name="name" value="Organization"/>
          <property name="cacheMode" value="PARTITIONED"/>
          <property name="atomicityMode" value="TRANSACTIONAL"/>
          <property name="writeSynchronizationMode" value="FULL_SYNC"/>

          <property name="queryEntities">
            <list>
              <bean class="org.apache.ignite.cache.QueryEntity">
                <property name="keyType" value="java.lang.Long"/>
                <property name="keyFieldName" value="id"/>
                <property name="valueType" value="Organization"/>

                <property name="fields">
                  <map>
                    <entry key="id" value="java.lang.Integer"/>
                    <entry key="name" value="java.lang.String"/>
                  </map>
                </property>

                <property name="indexes">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryIndex">
                            <constructor-arg value="name"/>
                        </bean>
                    </list>
                </property>
              </bean>
            </list>
          </property>
        </bean>
      </list>
    </property>
  </bean>
</beans>

As you can see from the configuration, we defined two Ignite caches that will contain the data of Person and Organization types. For both of the types, we listed specific fields and indexes that will be read or updated using SQL.

Connecting to the Cluster

After the cluster is configured and started, we can connect to it from the ODBC driver side. To do this, you need to prepare a valid connection string and pass it as a parameter to the ODBC driver at the connection time. Refer to the Connection String page for more details.

Alternatively, you can also use a pre-configured DSN for connection purposes as shown in the example below.

SQLHENV env;

// Allocate an environment handle
SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);

// Use ODBC ver 3
SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, reinterpret_cast<void*>(SQL_OV_ODBC3), 0);

SQLHDBC dbc;

// Allocate a connection handle
SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);

// Prepare the connection string
SQLCHAR connectStr[] = "DSN=My Ignite DSN";

// Connecting to Ignite Cluster.
SQLRETURN ret = SQLDriverConnect(dbc, NULL, connectStr, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);

if (!SQL_SUCCEEDED(ret))
{
  SQLCHAR sqlstate[7] = { 0 };
  SQLINTEGER nativeCode;

  SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
  SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));

  SQLGetDiagRec(SQL_HANDLE_DBC, dbc, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);

  std::cerr << "Failed to connect to Apache Ignite: "
            << reinterpret_cast<char*>(sqlstate) << ": "
            << reinterpret_cast<char*>(errMsg) << ", "
            << "Native error code: " << nativeCode
            << std::endl;

  // Releasing allocated handles.
  SQLFreeHandle(SQL_HANDLE_DBC, dbc);
  SQLFreeHandle(SQL_HANDLE_ENV, env);

  return;
}

Querying Data

After everything is up and running, we’re ready to execute SQL SELECT queries using the ODBC API.

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query[] = "SELECT firstName, lastName, salary, Organization.name FROM Person "
  "INNER JOIN \"Organization\".Organization ON Person.orgId = Organization.id";
SQLSMALLINT queryLen = static_cast<SQLSMALLINT>(sizeof(queryLen));

SQLRETURN ret = SQLExecDirect(stmt, query, queryLen);

if (!SQL_SUCCEEDED(ret))
{
  SQLCHAR sqlstate[7] = { 0 };
  SQLINTEGER nativeCode;

  SQLCHAR errMsg[BUFFER_SIZE] = { 0 };
  SQLSMALLINT errMsgLen = static_cast<SQLSMALLINT>(sizeof(errMsg));

  SQLGetDiagRec(SQL_HANDLE_DBC, dbc, 1, sqlstate, &nativeCode, errMsg, errMsgLen, &errMsgLen);

  std::cerr << "Failed to perfrom SQL query upon Apache Ignite: "
            << reinterpret_cast<char*>(sqlstate) << ": "
            << reinterpret_cast<char*>(errMsg) << ", "
            << "Native error code: " << nativeCode
            << std::endl;
}
else
{
  // Printing the result set.
  struct OdbcStringBuffer
  {
    SQLCHAR buffer[BUFFER_SIZE];
    SQLLEN resLen;
  };

  // Getting a number of columns in the result set.
  SQLSMALLINT columnsCnt = 0;
  SQLNumResultCols(stmt, &columnsCnt);

  // Allocating buffers for columns.
  std::vector<OdbcStringBuffer> columns(columnsCnt);

  // Binding colums. For simplicity we are going to use only
  // string buffers here.
  for (SQLSMALLINT i = 0; i < columnsCnt; ++i)
    SQLBindCol(stmt, i + 1, SQL_C_CHAR, columns[i].buffer, BUFFER_SIZE, &columns[i].resLen);

  // Fetching and printing data in a loop.
  ret = SQLFetch(stmt);
  while (SQL_SUCCEEDED(ret))
  {
    for (size_t i = 0; i < columns.size(); ++i)
      std::cout << std::setw(16) << std::left << columns[i].buffer << " ";

    std::cout << std::endl;

    ret = SQLFetch(stmt);
  }
}

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

Inserting Data

To insert new data into the cluster, SQL INSERT statements can be used from the ODBC side.

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query[] =
	"INSERT INTO Person (id, orgId, firstName, lastName, resume, salary) "
	"VALUES (?, ?, ?, ?, ?, ?)";

SQLPrepare(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));

// Binding columns.
int64_t key = 0;
int64_t orgId = 0;
char name[1024] = { 0 };
SQLLEN nameLen = SQL_NTS;
double salary = 0.0;

SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, &orgId, 0, 0);
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,	sizeof(name), sizeof(name), name, 0, &nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);

// Filling cache.
key = 1;
orgId = 1;
strncpy(name, "John", sizeof(name));
salary = 2200.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 1;
strncpy(name, "Jane", sizeof(name));
salary = 1300.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 2;
strncpy(name, "Richard", sizeof(name));
salary = 900.0;

SQLExecute(stmt);
SQLMoreResults(stmt);

++key;
orgId = 2;
strncpy(name, "Mary", sizeof(name));
salary = 2400.0;

SQLExecute(stmt);

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

Next, we are going to insert additional organizations without the usage of prepared statements.

SQLHSTMT stmt;

// Allocate a statement handle
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query1[] = "INSERT INTO \"Organization\".Organization (id, name) VALUES (1L, 'Some company')";

SQLExecDirect(stmt, query1, static_cast<SQLSMALLINT>(sizeof(query1)));

SQLFreeStmt(stmt, SQL_CLOSE);

SQLCHAR query2[] = "INSERT INTO \"Organization\".Organization (id, name) VALUES (2L, 'Some other company')";

  SQLExecDirect(stmt, query2, static_cast<SQLSMALLINT>(sizeof(query2)));

// Releasing statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

Updating Data

Let’s now update the salary for some of the persons stored in the cluster using SQL UPDATE statement.

void AdjustSalary(SQLHDBC dbc, int64_t key, double salary)
{
  SQLHSTMT stmt;

  // Allocate a statement handle
  SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

  SQLCHAR query[] = "UPDATE Person SET salary=? WHERE id=?";

  SQLBindParameter(stmt, 1, SQL_PARAM_INPUT,
      SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, &salary, 0, 0);

  SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG,
      SQL_BIGINT, 0, 0, &key, 0, 0);

  SQLExecDirect(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));

  // Releasing statement handle.
  SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}

...
AdjustSalary(dbc, 3, 1200.0);
AdjustSalary(dbc, 1, 2500.0);

Deleting Data

Finally, let’s remove a few records with the help of SQL DELETE statement.

void DeletePerson(SQLHDBC dbc, int64_t key)
{
  SQLHSTMT stmt;

  // Allocate a statement handle
  SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

  SQLCHAR query[] = "DELETE FROM Person WHERE id=?";

  SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT,
      0, 0, &key, 0, 0);

  SQLExecDirect(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));

  // Releasing statement handle.
  SQLFreeHandle(SQL_HANDLE_STMT, stmt);
}

...
DeletePerson(dbc, 1);
DeletePerson(dbc, 4);

Batching With Arrays of Parameters

Ignite’s ODBC driver supports batching with arrays of parameters for DML statements.

Let’s try to insert the same records we did in the example above but now with a single SQLExecute call:

SQLHSTMT stmt;

// Allocating a statement handle.
SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);

SQLCHAR query[] =
	"INSERT INTO Person (id, orgId, firstName, lastName, resume, salary) "
	"VALUES (?, ?, ?, ?, ?, ?)";

SQLPrepare(stmt, query, static_cast<SQLSMALLINT>(sizeof(query)));

// Binding columns.
int64_t key[4] = {0};
int64_t orgId[4] = {0};
char name[1024 * 4] = {0};
SQLLEN nameLen[4] = {0};
double salary[4] = {0};

SQLBindParameter(stmt, 1, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, key, 0, 0);
SQLBindParameter(stmt, 2, SQL_PARAM_INPUT, SQL_C_SLONG, SQL_BIGINT, 0, 0, orgId, 0, 0);
SQLBindParameter(stmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,	1024, 1024, name, 0, &nameLen);
SQLBindParameter(stmt, 4, SQL_PARAM_INPUT, SQL_C_DOUBLE, SQL_DOUBLE, 0, 0, salary, 0, 0);

// Filling cache.
key[0] = 1;
orgId[0] = 1;
strncpy(name, "John", 1023);
salary[0] = 2200.0;
nameLen[0] = SQL_NTS;

key[1] = 2;
orgId[1] = 1;
strncpy(name + 1024, "Jane", 1023);
salary[1] = 1300.0;
nameLen[1] = SQL_NTS;

key[2] = 3;
orgId[2] = 2;
strncpy(name + 1024 * 2, "Richard", 1023);
salary[2] = 900.0;
nameLen[2] = SQL_NTS;

key[3] = 4;
orgId[3] = 2;
strncpy(name + 1024 * 3, "Mary", 1023);
salary[3] = 2400.0;
nameLen[3] = SQL_NTS;

// Asking the driver to store the total number of processed argument sets
// in the following variable.
SQLULEN setsProcessed = 0;
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &setsProcessed, SQL_IS_POINTER);

// Setting the size of the arguments array. This is 4 in our case.
SQLSetStmtAttr(stmt, SQL_ATTR_PARAMSET_SIZE, reinterpret_cast<SQLPOINTER>(4), 0);

// Executing the statement.
SQLExecute(stmt);

// Releasing the statement handle.
SQLFreeHandle(SQL_HANDLE_STMT, stmt);

Streaming

Ignite ODBC driver allows streaming data in bulk using the SET command. See the SET linkhttps://apacheignite-sql.readme.io/docs/set[command documentation] for more information.