GridGain™ 2.0.3
Java API Specification

org.gridgain.grid.spi.loadbalancing.affinity
Class GridAffinityLoadBalancingSpi

java.lang.Object
  extended by org.gridgain.grid.spi.GridSpiAdapter
      extended by org.gridgain.grid.spi.loadbalancing.affinity.GridAffinityLoadBalancingSpi
All Implemented Interfaces:
GridSpi, GridSpiManagementMBean, GridAffinityLoadBalancingSpiMBean, GridLoadBalancingSpi

@GridSpiInfo(author="GridGain Systems",
             url="www.gridgain.org",
             email="support@gridgain.com",
             version="x.x")
@GridSpiMultipleInstancesSupport(value=true)
public class GridAffinityLoadBalancingSpi
extends GridSpiAdapter
implements GridLoadBalancingSpi, GridAffinityLoadBalancingSpiMBean

Load balancing SPI which uses data affinity for routing jobs to remote nodes. It provides the ability to collocate computations with data. This SPI is best used with distributed caches for which it is important that each computation is routed exactly to the node on which its data is cached. Many data caching schemes, whether distributed or invalidation-based, can take advantage of this SPI. The real value is that you can now partition your database across data servers and hence load the whole database into memory for faster access.

Architecture and Deployment

The diagram below illustrates the difference between using data grids without and with GridGain. The left side shows the execution flow without GridGain, in which a remote data server is queried for data and the data is then delivered to the caller (master) node. This is faster than DB access, but results in unnecessary network traffic.

On the right-hand side, you can see the value that GridGain brings to the picture. The whole computation logic, together with the data access logic, is brought to the data server for local execution. Assuming that serializing computation logic is much lighter than serializing data, the network traffic in this case is minimal. Your computation may also access data from both Node 2 and Node 3. In this case, GridGain will split your computation into logical jobs and route each job to the corresponding data server to ensure that all computations remain local. If one of the data server nodes crashes, your jobs will be automatically failed over to other nodes, which allows you to fail over logic together with data (not just the data fail-over provided by data grids or distributed caches).
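The split-and-route step described above can be pictured as a toy model, assuming a hypothetical ownership function `ownerOf` that maps each key to the data server holding it. This is not GridGain's API; `AffinitySplit` and `groupByOwner` are illustrative names. Keys are grouped by owner so that each data server receives one job that touches only its local data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Toy model of affinity map/reduce (hypothetical, not GridGain's API):
 * group the keys a computation needs by the data server that owns them,
 * so each server receives one job that accesses only local data.
 */
class AffinitySplit {
    static Map<String, List<Integer>> groupByOwner(List<Integer> keys,
                                                   Function<Integer, String> ownerOf) {
        // Preserve first-seen order of owners for predictable output.
        Map<String, List<Integer>> jobs = new LinkedHashMap<>();
        for (Integer key : keys) {
            // One bucket (logical job) per owning data server.
            jobs.computeIfAbsent(ownerOf.apply(key), n -> new ArrayList<>()).add(key);
        }
        return jobs;
    }
}
```

For example, with keys 1 to 6 and an `ownerOf` that sends even keys to "Node 2" and odd keys to "Node 3", `groupByOwner` returns `{Node 3=[1, 3, 5], Node 2=[2, 4, 6]}`: one job per data server.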

[Figure: Data Partitioning and Affinity Map/Reduce]

Coding Example

To use load balancers for job routing, use the load balancer in your GridTask.map(List, Object) implementation to find the node each job should be routed to (see the GridLoadBalancerResource documentation for information on how a load balancer can be injected into your task). However, the preferred approach here is to use GridTaskSplitAdapter, as it handles the affinity assignment of jobs to nodes automatically. Note that when working with affinity load balancing, your task's map(..) or split(..) methods should return GridAffinityJob instances instead of GridJob ones. GridAffinityJob adds one method to a grid job, GridAffinityJob.getAffinityKey(), which allows GridGain to route jobs with the same key to the same grid node every time. If a regular GridJob is returned instead of a GridAffinityJob, it will be routed to a randomly picked node. Here is an example of a grid task that uses affinity load balancing. Note that load balancing of jobs is completely transparent to the user and is simply a matter of proper grid configuration.
 public class MyFooBarAffinityTask extends GridTaskSplitAdapter<List<Integer>,Object> {
    // For this example we receive a list of cache keys and for every key
    // create a job that accesses it.
    @Override
    protected Collection<? extends GridJob> split(int gridSize, List<Integer> cacheKeys) throws GridException {
        // One job per cache key, so size the list by the number of keys.
        List<MyGridAffinityJob> jobs = new ArrayList<MyGridAffinityJob>(cacheKeys.size());

        for (Integer cacheKey : cacheKeys) {
            jobs.add(new MyGridAffinityJob(cacheKey));
        }

        // Node assignment via load balancer
        // happens automatically.
        return jobs;
    }
    ...
 }
 
Here is an example of the grid job created by the task above:
 public class MyGridAffinityJob extends GridAffinityJobAdapter<Integer, Serializable> {
    public MyGridAffinityJob(Integer cacheKey) {
        // Pass the cache key to the adapter; it is later
        // returned by getAffinityKey().
        super(cacheKey);
    }

    public Serializable execute() throws GridException {
        ...
        // Access data by the same key returned
        // in 'getAffinityKey()' method.
        mycache.get(getAffinityKey());
        ...
    }
 }
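The routing rule described earlier (affinity jobs go to a node derived from their key, plain jobs go to a randomly picked node) can be modeled in a few lines. This is a sketch under stated assumptions: `Job`, `AffinityJob`, and `AffinityRouter` are hypothetical stand-ins for GridJob, GridAffinityJob, and the SPI, and modulo hashing substitutes for the SPI's Consistent Hashing only to keep the example short.

```java
import java.util.List;
import java.util.Random;

// Hypothetical stand-ins for GridJob / GridAffinityJob; not the GridGain types.
interface Job { }

interface AffinityJob extends Job {
    Object getAffinityKey();
}

/** Sketch of the documented routing rule (not the SPI's implementation). */
class AffinityRouter {
    private final Random random;

    AffinityRouter(Random random) {
        this.random = random;
    }

    String route(Job job, List<String> nodes) {
        if (job instanceof AffinityJob) {
            Object key = ((AffinityJob) job).getAffinityKey();
            // Deterministic: the same key maps to the same node for a fixed
            // topology. Modulo hashing is a simplified stand-in for the
            // SPI's Consistent Hashing.
            return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
        }
        // Plain jobs carry no affinity key, so any node will do.
        return nodes.get(random.nextInt(nodes.size()));
    }
}
```

Modulo hashing reassigns most keys whenever the node count changes, which is exactly what Consistent Hashing avoids; it appears here only to isolate the `instanceof` dispatch.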
 

Also note that there may be cases where your underlying cache product supports multiple caches and you need to cache data with identical keys in those caches. Although it may still be acceptable to return the same key from the GridAffinityJob.getAffinityKey() method for either cache, you may wish to change your affinity key method as follows to make sure that affinity load balancing for one cache is independent of the other:

 public class MyFooBarAffinityJob extends GridAffinityJobAdapter<String, Integer> {
    public MyFooBarAffinityJob(String cacheName, Integer cacheKey) {
        // Construct affinity key by concatenating cache name
        // and cache key. Note that we also pass cacheKey as
        // the job argument so that execute() can access it.
        super(cacheName + '.' + cacheKey, cacheKey);
    }

    @Override
    public Serializable execute() {
       ...
       // Access data from your cache by the cache key.
       // The main point to note here is that the same
       // affinity key always corresponds to the same
       // cache key.
       Integer cacheKey = getArgument();

       Object data = someCache.get(cacheKey);
       ...
       // Do computations.
    }
 }
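The key construction used in the constructor above can be isolated in a small helper. This is a hypothetical sketch; `AffinityKeys.scoped` is an illustrative name, not part of GridGain. It produces the same "cacheName.cacheKey" string, so identical cache keys in different caches yield different affinity keys.

```java
import java.util.Objects;

/** Hypothetical helper building cache-scoped affinity keys ("cacheName.cacheKey"). */
final class AffinityKeys {
    private AffinityKeys() { }

    static String scoped(String cacheName, Object cacheKey) {
        Objects.requireNonNull(cacheName, "cacheName");
        Objects.requireNonNull(cacheKey, "cacheKey");
        // Same concatenation as in MyFooBarAffinityJob's constructor.
        return cacheName + '.' + cacheKey;
    }
}
```

One design caveat: with '.' as the delimiter, cache "a.b" with key "c" and cache "a" with key "b.c" produce the same affinity key, so the two would no longer be balanced independently. If cache names may contain '.', a delimiter that cannot occur in cache names is a safer choice.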
 

Configuration

Mandatory

This SPI has no mandatory configuration parameters.

Optional

The following configuration parameters are optional: the number of virtual nodes (see GridAffinityLoadBalancingSpi.setVirtualNodeCount(int)) and the affinity node attributes (see GridAffinityLoadBalancingSpi.setAffinityNodeAttributes(Map)). Pay particular attention to the number of virtual nodes for the Consistent Hashing algorithm. The larger the virtual node count, the more evenly data is distributed across nodes. For best affinity distribution, the value should usually be larger than 500. The default value of 1000 is good enough for most grid deployments. Setting the value too large (larger than several thousand) may cause performance degradation. The Consistent Hashing algorithm generally yields a standard deviation of between 2% and 4% relative to an equal data affinity distribution.
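For reference, the textbook form of Consistent Hashing with virtual nodes looks roughly like the sketch below, in the style of Tom White's description. It is not GridGain's implementation (which, as noted below, was modified); `ConsistentHashRing`, the "node#i" point naming, and the MD5-based hash are assumptions made for illustration. Each real node places `virtualNodeCount` points on a ring, and a key belongs to the first point at or after its hash, wrapping around at the end.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.SortedMap;
import java.util.TreeMap;

/** Minimal consistent-hash ring with virtual nodes (illustrative sketch). */
class ConsistentHashRing {
    // Ring position -> real node name. Each real node owns many positions.
    private final SortedMap<Long, String> ring = new TreeMap<>();

    /** Places virtualNodeCount points for the given node on the ring. */
    void addNode(String node, int virtualNodeCount) {
        for (int i = 0; i < virtualNodeCount; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    /** Returns the node owning the first ring position at or after the key's hash. */
    String nodeFor(Object key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("No nodes on the ring");
        }
        SortedMap<Long, String> tail = ring.tailMap(hash(String.valueOf(key)));
        // Wrap around to the start of the ring if we fell off the end.
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }

    /** Maps a string to a 64-bit ring position using the first 8 bytes of MD5. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Because a new node only claims the ring segments just before its own points, adding a node moves keys only onto that node; keys never move between the existing nodes. More points per node shrink the variance of each node's share, which is why a larger virtual node count gives a more even distribution, at the cost of a larger ring to build and search.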

You can use the virtual node count to distribute load in an uneven (heterogeneous) grid. The larger a node's virtual node count, the more data will be stored on that node, which leads to more jobs being sent to it. Hence, nodes with higher memory or CPU capacity should have a larger virtual node count.

When configuring the virtual node count, it is common to assign a fixed number of virtual nodes per unit of some capacity characteristic. For example, if you have 3 nodes in the grid, N1, N2, and N3, where N1 and N2 have 2GB of memory and N3 has 3GB of memory, then, to simplify calculations, you could assign 500 virtual nodes for every 1GB of memory on a node. As a result, N1 and N2 should each be assigned 1000 virtual nodes and N3 should be assigned 1500 virtual nodes.
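The sizing rule in this example is plain arithmetic. A minimal sketch, assuming memory in whole gigabytes is the only capacity characteristic and using the hypothetical names `VirtualNodeSizing` and `VIRTUAL_NODES_PER_GB`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper deriving per-node virtual node counts from memory size. */
class VirtualNodeSizing {
    // Assumption taken from the example: 500 virtual nodes per 1GB of memory.
    static final int VIRTUAL_NODES_PER_GB = 500;

    static Map<String, Integer> virtualNodeCounts(Map<String, Integer> memoryGb) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : memoryGb.entrySet()) {
            counts.put(e.getKey(), e.getValue() * VIRTUAL_NODES_PER_GB);
        }
        return counts;
    }

    /** Approximate expected fraction of keys a node owns: its share of all virtual nodes. */
    static double expectedShare(Map<String, Integer> counts, String node) {
        int total = 0;
        for (int c : counts.values()) {
            total += c;
        }
        return (double) counts.get(node) / total;
    }
}
```

With N1 = 2GB, N2 = 2GB, and N3 = 3GB this yields 1000, 1000, and 1500 virtual nodes, so N3 is expected to own roughly 1500/3500 ≈ 43% of the keys and N1 and N2 about 29% each. Each node applies its own value through setVirtualNodeCount(int) in its local configuration.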

Below is a Java example of configuration for the affinity load balancing SPI:

 GridAffinityLoadBalancingSpi spi = new GridAffinityLoadBalancingSpi();

 // Change number of virtual nodes.
 spi.setVirtualNodeCount(1500);

 GridConfigurationAdapter cfg = new GridConfigurationAdapter();

 // Override default load balancing SPI.
 cfg.setLoadBalancingSpi(spi);

 // Start grid.
 GridFactory.start(cfg);
 
Here is a Spring XML configuration example:
 <property name="loadBalancingSpi">
     <bean class="org.gridgain.grid.spi.loadbalancing.affinity.GridAffinityLoadBalancingSpi">
         <property name="virtualNodeCount" value="1500"/>

         <!--
             If your grid is segmented via node attributes,
             then provide all attributes a node should have
             in order to be considered by affinity load balancer.
         -->
         <property name="affinityNodeAttributes">
             <map>
                 <entry key="node.segment" value="foobar"/>
             </map>
         </property>
     </bean>
 </property>
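The effect of `affinityNodeAttributes` can be pictured as a filter over the topology. A minimal sketch, assuming each node is represented by its name and an attribute map; `AffinityNodeFilter` is a hypothetical helper, not a GridGain class. A node qualifies only if it carries every configured attribute with an equal value, and a null configuration admits all nodes, matching the documented default.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of attribute-based node selection for affinity balancing. */
final class AffinityNodeFilter {
    private AffinityNodeFilter() { }

    /** Returns names of nodes carrying every required attribute; null admits all nodes. */
    static List<String> filter(Map<String, Map<String, Serializable>> nodeAttrs,
                               Map<String, ? extends Serializable> required) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Map<String, Serializable>> node : nodeAttrs.entrySet()) {
            if (required == null || matches(node.getValue(), required)) {
                result.add(node.getKey());
            }
        }
        return result;
    }

    /** True if the node's attributes contain each required key with an equal value. */
    private static boolean matches(Map<String, Serializable> attrs,
                                   Map<String, ? extends Serializable> required) {
        for (Map.Entry<String, ? extends Serializable> e : required.entrySet()) {
            if (!Objects.equals(attrs.get(e.getKey()), e.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```

With the XML above, a node started with node.segment=foobar would participate in affinity load balancing, while a client node with a different segment value would be skipped.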
 

The implementation uses the Consistent Hashing algorithm, which is best documented in Tom White's blog (we modified the algorithm to fit GridGain better).


For information about the Spring Framework, visit www.springframework.org.



See Also:

  Documentation
  Email Support
  Online Forums
  Issue Tracking

Author:   2005-2008 Copyright © GridGain Systems. All Rights Reserved. ver. 2.0.3

 

Field Summary
static int DFLT_VIRTUAL_NODE_COUNT
          Default virtual node count for Consistent Hashing algorithm (value is 1000).
static int RECOMMENDED_MAX_NODE_WEIGHT
          Recommended maximum node weight.
static String VIRTUAL_NODE_COUNT_ATTR_NAME
          Name of the node attribute that specifies the number of replicas (virtual nodes) for a node.
 
Constructor Summary
GridAffinityLoadBalancingSpi()
           
 
Method Summary
protected  void checkConfigurationConsistency(GridNode node)
          Checks remote node SPI configuration and prints warnings if necessary.
 Map<String,? extends Serializable> getAffinityNodeAttributes()
          Gets the attribute names and values that a node must have to participate in affinity load balancing.
 GridNode getBalancedNode(GridTaskSession ses, List<GridNode> top, GridJob job)
          Gets balanced node for specified job within given task session.
 Map<String,Serializable> getNodeAttributes()
          This method is called before the SPI starts (before the GridSpi.spiStart(String) method is called). It allows the SPI implementation to add attributes to the local node. The kernel collects these attributes from all loaded SPI implementations and then passes them to the discovery SPI so that they can be exchanged with other nodes.
 int getVirtualNodeCount()
          Gets number of virtual nodes for Consistent Hashing algorithm.
 void onContextDestroyed()
          Callback invoked before the grid is stopped, prior to destroying the SPI context. Once this method is complete, the grid will begin its shutdown sequence. Use this callback for de-initialization logic that may involve the SPI context. Note that invoking the SPI context after this callback is complete is considered illegal and may produce undefined results.

If GridSpiAdapter is used for the SPI implementation, then it will replace the actual context with a dummy no-op context, which is usually good enough since the grid is about to shut down.

 void onContextInitialized(GridSpiContext spiCtx)
          Callback invoked when SPI context is initialized. SPI implementation may store SPI context for future access.

This method is invoked after the GridSpi.spiStart(String) method completes, so the SPI should be fully functional at this point. Use this method for post-start initialization, such as subscribing a discovery listener or sending a message to a remote node.

 void setAffinityNodeAttributes(Map<String,? extends Serializable> affAttrs)
          Sets node attributes for the data affinity grid segment.
 void setVirtualNodeCount(int virtualNodeCnt)
          Sets number of virtual nodes for Consistent Hashing algorithm.
 void spiStart(String gridName)
          This method is called to start SPI.
 void spiStop()
          This method is called to stop SPI.
 String toString()
          
 
Methods inherited from class org.gridgain.grid.spi.GridSpiAdapter
assertParameter, configInfo, getAuthor, getGridGainHome, getLocalNodeId, getSpiContext, getStartTimestamp, getStartTimestampFormatted, getUpTime, getUpTimeFormatted, getVendorEmail, getVendorUrl, getVersion, registerMBean, setSpiContext, startInfo, startStopwatch, stopInfo, unregisterMBean, warnSpi, warnSpiParameter
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.gridgain.grid.spi.GridSpiManagementMBean
getAuthor, getGridGainHome, getLocalNodeId, getStartTimestamp, getStartTimestampFormatted, getUpTime, getUpTimeFormatted, getVendorEmail, getVendorUrl, getVersion
 

Field Detail

DFLT_VIRTUAL_NODE_COUNT

public static final int DFLT_VIRTUAL_NODE_COUNT
Default virtual node count for Consistent Hashing algorithm (value is 1000).

See Also:
Constant Field Values

VIRTUAL_NODE_COUNT_ATTR_NAME

public static final String VIRTUAL_NODE_COUNT_ATTR_NAME
Name of the node attribute that specifies the number of replicas (virtual nodes) for a node.

See Also:
Constant Field Values

RECOMMENDED_MAX_NODE_WEIGHT

public static final int RECOMMENDED_MAX_NODE_WEIGHT
Recommended maximum node weight.

See Also:
Constant Field Values
Constructor Detail

GridAffinityLoadBalancingSpi

public GridAffinityLoadBalancingSpi()
Method Detail

getVirtualNodeCount

public int getVirtualNodeCount()
Gets the number of virtual nodes for the Consistent Hashing algorithm. For more information about the algorithm, see Tom White's blog.

Specified by:
getVirtualNodeCount in interface GridAffinityLoadBalancingSpiMBean
Returns:
Number of virtual nodes.

setVirtualNodeCount

@GridSpiConfiguration(optional=true)
public void setVirtualNodeCount(int virtualNodeCnt)
Sets the number of virtual nodes for the Consistent Hashing algorithm. Generally, the larger the virtual node count, the more evenly data affinity is distributed across nodes. The value should generally be larger than 500. The default value of 1000 is good enough for most grid deployments. Consistent Hashing generally yields a standard deviation of between 2% and 4% relative to an equal data affinity distribution. Setting the value too large (larger than several thousand) may cause performance degradation.

You can use the virtual node count to distribute load in an uneven (heterogeneous) grid. The larger a node's virtual node count, the more data will be stored on that node, which leads to more jobs being sent to it. Hence, nodes with higher memory or CPU capacity should have a larger virtual node count.

For example, if you have 3 nodes in the grid, N1, N2, and N3, where N1 and N2 have 2GB of memory and N3 has 3GB of memory, then, to simplify calculations, you could assign 500 virtual nodes for every 1GB of memory on a node. As a result, N1 and N2 should each be assigned 1000 virtual nodes and N3 should be assigned 1500 virtual nodes.

Parameters:
virtualNodeCnt - Number of virtual nodes, i.e. the weight of the node.

getAffinityNodeAttributes

public Map<String,? extends Serializable> getAffinityNodeAttributes()
Gets the attribute names and values that a node must have to participate in affinity load balancing.

The default value is null, which means that all nodes are included.

Specified by:
getAffinityNodeAttributes in interface GridAffinityLoadBalancingSpiMBean
Returns:
Map of node attributes.

setAffinityNodeAttributes

@GridSpiConfiguration(optional=true)
public void setAffinityNodeAttributes(Map<String,? extends Serializable> affAttrs)
Sets node attributes for the data affinity grid segment. All nodes that should participate in affinity load balancing must have these attributes. This is useful when the grid is segmented, for example, into clients and servers, where client nodes never cache any data.

The default value is null, which means that all nodes are included.

Parameters:
affAttrs - Map of node attributes for affinity load balancing.

getNodeAttributes

public Map<String,Serializable> getNodeAttributes()
                                           throws GridSpiException
This method is called before the SPI starts (before the GridSpi.spiStart(String) method is called). It allows the SPI implementation to add attributes to the local node. The kernel collects these attributes from all loaded SPI implementations and then passes them to the discovery SPI so that they can be exchanged with other nodes.

Specified by:
getNodeAttributes in interface GridSpi
Overrides:
getNodeAttributes in class GridSpiAdapter
Throws:
GridSpiException - Thrown in case of any error.
Returns:
Map of local node attributes this SPI wants to add.

spiStart

public void spiStart(@Nullable
                     String gridName)
              throws GridSpiException
This method is called to start the SPI. After this method returns successfully, the kernel assumes that the SPI is fully operational.

Specified by:
spiStart in interface GridSpi
Throws:
GridSpiException - Thrown in case of any error during SPI start.
Parameters:
gridName - Name of grid instance this SPI is being started for (null for default grid).

spiStop

public void spiStop()
             throws GridSpiException
This method is called to stop the SPI. After this method returns, the kernel assumes that this SPI is finished and all resources acquired by it are released. Note that this method can be called at any point, including during recovery from a failed start. It should make no assumptions about the state the SPI will be in when this method is called.

Specified by:
spiStop in interface GridSpi
Throws:
GridSpiException - Thrown in case of any error during SPI stop.

onContextInitialized

public void onContextInitialized(GridSpiContext spiCtx)
                          throws GridSpiException
Callback invoked when SPI context is initialized. SPI implementation may store SPI context for future access.

This method is invoked after the GridSpi.spiStart(String) method completes, so the SPI should be fully functional at this point. Use this method for post-start initialization, such as subscribing a discovery listener or sending a message to a remote node.

Specified by:
onContextInitialized in interface GridSpi
Overrides:
onContextInitialized in class GridSpiAdapter
Throws:
GridSpiException - If context initialization failed (grid will be stopped).
Parameters:
spiCtx - SPI context.

onContextDestroyed

public void onContextDestroyed()
Callback invoked before the grid is stopped, prior to destroying the SPI context. Once this method is complete, the grid will begin its shutdown sequence. Use this callback for de-initialization logic that may involve the SPI context. Note that invoking the SPI context after this callback is complete is considered illegal and may produce undefined results.

If GridSpiAdapter is used for the SPI implementation, then it will replace the actual context with a dummy no-op context, which is usually good enough since the grid is about to shut down.

Specified by:
onContextDestroyed in interface GridSpi
Overrides:
onContextDestroyed in class GridSpiAdapter

getBalancedNode

public GridNode getBalancedNode(GridTaskSession ses,
                                List<GridNode> top,
                                GridJob job)
                         throws GridException
Gets balanced node for specified job within given task session.

Specified by:
getBalancedNode in interface GridLoadBalancingSpi
Throws:
GridException - If failed to get next balanced node.
Parameters:
ses - Grid task session for currently executing task.
top - Topology of task nodes from which to pick the best balanced node for given job.
job - Job for which to pick the best balanced node.
Returns:
Best balanced node for the given job within given task session.
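Putting the pieces together, a getBalancedNode-style lookup for an affinity job might look like the sketch below. This is an assumption-laden illustration, not the SPI's code: `NodeView`, `AffinityBalancerSketch`, and the attribute key `hypothetical.virtual.node.count` are made up (the real attribute name is given by VIRTUAL_NODE_COUNT_ATTR_NAME), and nodes that publish no count fall back to 1000, mirroring DFLT_VIRTUAL_NODE_COUNT.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical view of a grid node: an id plus its published attributes. */
final class NodeView {
    final String id;
    final Map<String, Object> attrs;

    NodeView(String id, Map<String, Object> attrs) {
        this.id = id;
        this.attrs = attrs;
    }
}

/** Sketch of picking a node for an affinity key from a topology (not GridGain code). */
final class AffinityBalancerSketch {
    // Placeholder key; the real name is VIRTUAL_NODE_COUNT_ATTR_NAME.
    static final String VNODE_ATTR = "hypothetical.virtual.node.count";
    // Mirrors DFLT_VIRTUAL_NODE_COUNT (1000).
    static final int DEFAULT_VNODES = 1000;

    private AffinityBalancerSketch() { }

    /** Builds a ring where each node contributes its advertised number of points. */
    static SortedMap<Long, NodeView> buildRing(List<NodeView> top) {
        SortedMap<Long, NodeView> ring = new TreeMap<>();
        for (NodeView n : top) {
            Object cnt = n.attrs.get(VNODE_ATTR);
            int vnodes = cnt instanceof Integer ? (Integer) cnt : DEFAULT_VNODES;
            for (int i = 0; i < vnodes; i++) {
                ring.put(hash(n.id + "#" + i), n);
            }
        }
        return ring;
    }

    /** Returns the owner of the first ring point at or after the key's hash. */
    static NodeView lookup(SortedMap<Long, NodeView> ring, Object affinityKey) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("No nodes on the ring");
        }
        SortedMap<Long, NodeView> tail = ring.tailMap(hash(String.valueOf(affinityKey)));
        return tail.isEmpty() ? ring.get(ring.firstKey()) : tail.get(tail.firstKey());
    }

    static NodeView balancedNode(List<NodeView> top, Object affinityKey) {
        return lookup(buildRing(top), affinityKey);
    }

    /** 64-bit ring position from the first 8 bytes of MD5. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Building the ring is the expensive part, so buildRing is kept separate from lookup; a real implementation would presumably reuse the ring until the topology changes rather than rebuild it for every job.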

checkConfigurationConsistency

protected void checkConfigurationConsistency(GridNode node)
Checks remote node SPI configuration and prints warnings if necessary.

Overrides:
checkConfigurationConsistency in class GridSpiAdapter
Parameters:
node - Remote node.

toString

public String toString()

Overrides:
toString in class Object


GridGain™ - Grid Computing Made Simple, ver. 2.0.3.20052008
2005-2008 Copyright © GridGain Systems. All Rights Reserved.