GridGain™ 2.0.3
Java API Specification

org.gridgain.grid.spi.collision.jobstealing
Class GridJobStealingCollisionSpi

java.lang.Object
  extended by org.gridgain.grid.spi.GridSpiAdapter
      extended by org.gridgain.grid.spi.collision.jobstealing.GridJobStealingCollisionSpi
All Implemented Interfaces:
GridCollisionSpi, GridJobStealingCollisionSpiMBean, GridSpi, GridSpiManagementMBean

@GridSpiInfo(author="GridGain Systems",
             url="www.gridgain.org",
             email="support@gridgain.com",
             version="x.x")
@GridSpiMultipleInstancesSupport(value=true)
public class GridJobStealingCollisionSpi
extends GridSpiAdapter
implements GridCollisionSpi, GridJobStealingCollisionSpiMBean

Collision SPI that supports job stealing from over-utilized nodes to under-utilized nodes. This SPI is especially useful when some jobs within a task complete fast while others sit in the waiting queue on slower nodes. In that case, the waiting jobs will be stolen from the slower node and moved to the fast, under-utilized node.

The design and ideas for this SPI are significantly influenced by the Java Fork/Join Framework authored by Doug Lea and planned for Java 7. GridJobStealingCollisionSpi took similar concepts and applied them to the grid (as opposed to the in-VM support planned for Java 7).

Quite often grids are deployed across many computers, some of which will always be more powerful than others. This SPI helps you avoid jobs being stuck at a slower node, as they will be stolen by a faster node. In the following picture, when Node3 becomes free, it steals Job13 and Job23 from Node1 and Node2 respectively.

Note that this SPI must always be used in conjunction with GridJobStealingFailoverSpi. The responsibility of Job Stealing Failover SPI is to properly route stolen jobs to the nodes that initially requested (stole) these jobs. The SPI maintains a counter of how many times a job was stolen and hence traveled to another node. GridJobStealingCollisionSpi checks this counter and will not allow a job to be stolen if the counter exceeds the threshold set by GridJobStealingCollisionSpi.setMaximumStealingAttempts(int).
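
The counter check described above can be illustrated with a minimal, self-contained sketch. It does not use the GridGain API: the class StealAttemptSketch, the attribute key and the plain Map standing in for a job context are all hypothetical, and the exact boundary (refusing once the counter has reached the maximum) is an assumption rather than the documented behavior.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the per-job stealing counter; not part of GridGain. */
final class StealAttemptSketch {
    /** Illustrative key only; the real key is the value of STEALING_ATTEMPT_COUNT_ATTR. */
    static final String ATTEMPT_COUNT_KEY = "sketch.stealing.attempt.count";

    private StealAttemptSketch() {
        // Static helpers only.
    }

    /** Reads the current attempt count, treating a missing attribute as zero. */
    static int attempts(Map<String, Integer> jobAttrs) {
        Integer cnt = jobAttrs.get(ATTEMPT_COUNT_KEY);

        return cnt == null ? 0 : cnt;
    }

    /**
     * Refuses the steal once the counter has reached the maximum (assumed boundary);
     * otherwise increments the counter and allows the steal.
     */
    static boolean trySteal(Map<String, Integer> jobAttrs, int maxStealingAttempts) {
        int cnt = attempts(jobAttrs);

        if (cnt >= maxStealingAttempts)
            return false;

        jobAttrs.put(ATTEMPT_COUNT_KEY, cnt + 1);

        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> jobAttrs = new HashMap<>();

        // With a maximum of 2, the job may travel twice; the third request is refused.
        System.out.println(trySteal(jobAttrs, 2));
        System.out.println(trySteal(jobAttrs, 2));
        System.out.println(trySteal(jobAttrs, 2));
    }
}
```

Because the counter travels with the job, every node can apply the same check locally, which is one reason the maximum is required to be identical on all nodes.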

Configuration

In order to use this SPI, you should configure your grid instance to use GridJobStealingCollisionSpi either from Spring XML file or directly. The following configuration parameters are supported:

Mandatory

This SPI has no mandatory configuration parameters.

Optional

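The following configuration parameters are optional:

  Active jobs threshold (see setActiveJobsThreshold(int)).
  Wait jobs threshold (see setWaitJobsThreshold(int)).
  Message expire time (see setMessageExpireTime(long)).
  Maximum stealing attempts (see setMaximumStealingAttempts(int)).
  Stealing enabled flag (see setStealingEnabled(boolean)).
  Stealing attributes (see setStealingAttributes(Map)).

Below is an example of configuring this SPI from Java code: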
 GridJobStealingCollisionSpi spi = new GridJobStealingCollisionSpi();

 // Configure number of waiting jobs
 // in the queue for job stealing.
 spi.setWaitJobsThreshold(10);

 // Configure message expire time (in milliseconds).
 spi.setMessageExpireTime(500);

 // Configure stealing attempts number.
 spi.setMaximumStealingAttempts(10);

 // Configure number of active jobs that are allowed to execute
 // in parallel. This number should usually be equal to the number
 // of threads in the pool (default is 100).
 spi.setActiveJobsThreshold(50);

 // Enable stealing.
 spi.setStealingEnabled(true);

 // Set stealing attribute to steal from/to nodes that have it.
 spi.setStealingAttributes(Collections.singletonMap("node.segment", "foobar"));

 GridConfigurationAdapter cfg = new GridConfigurationAdapter();

 // Override default Collision SPI.
 cfg.setCollisionSpi(spi);
 
Here is an example of how this SPI can be configured from Spring XML configuration:
 <property name="collisionSpi">
     <bean class="org.gridgain.grid.spi.collision.jobstealing.GridJobStealingCollisionSpi">
         <property name="activeJobsThreshold" value="100"/>
         <property name="waitJobsThreshold" value="0"/>
         <property name="messageExpireTime" value="1000"/>
         <property name="maximumStealingAttempts" value="10"/>
         <property name="stealingEnabled" value="true"/>
         <property name="stealingAttributes">
             <map>
                 <entry key="node.segment" value="foobar"/>
             </map>
         </property>
     </bean>
 </property>
 


For information about Spring framework visit www.springframework.org



See Also:

  Documentation
  Email Support
  Online Forums
  Issue Tracking

Author:   2005-2008 Copyright © GridGain Systems. All Rights Reserved. ver. 2.0.3

 

Field Summary
static String ACTIVE_JOBS_THRESHOLD_NODE_ATTR
          Threshold of maximum jobs executing concurrently.
static int DFLT_ACTIVE_JOBS_THRESHOLD
          Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, to allow some extra threads for system processing).
static int DFLT_MAX_STEALING_ATTEMPTS
          Maximum number of attempts to steal job by another node (default is 5).
static long DFLT_MSG_EXPIRE_TIME
          Default steal message expire time in milliseconds (value is 1000).
static int DFLT_WAIT_JOBS_THRESHOLD
          Default threshold of waiting jobs.
static String MAX_STEALING_ATTEMPT_ATTR
          Maximum stealing attempts attribute name.
static String MSG_EXPIRE_TIME
          Stealing request expiration time attribute name.
static String STEALING_ATTEMPT_COUNT_ATTR
          Name of job context attribute containing current stealing attempt count.
static String THIEF_NODE_ATTR
          Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI).
static String WAIT_JOBS_THRESHOLD_NODE_ATTR
          Threshold of maximum jobs on waiting queue.
 
Constructor Summary
GridJobStealingCollisionSpi()
           
 
Method Summary
protected  void checkConfigurationConsistency(GridNode node)
          Checks remote node attributes.
 int getActiveJobsThreshold()
          Gets number of jobs that can be executed in parallel.
 int getCurrentActiveJobsCount()
          Gets current number of jobs that are being executed.
 int getCurrentJobsToStealCount()
          Gets current number of jobs to be stolen.
 int getCurrentWaitJobsCount()
          Gets current number of jobs that wait for the execution.
 int getMaximumStealingAttempts()
          Gets maximum number of attempts to steal job by another node.
 long getMessageExpireTime()
          Message expire time configuration parameter.
 Map<String,Serializable> getNodeAttributes()
          This method is called before SPI starts (before method GridSpi.spiStart(String) is called). It allows SPI implementation to add attributes to a local node. Kernel collects these attributes from all SPI implementations loaded up and then passes it to discovery SPI so that they can be exchanged with other nodes.
 Map<String,? extends Serializable> getStealingAttributes()
          Configuration parameter to enable stealing to/from only nodes that have these attributes set (see GridNode.getAttribute(String) and GridConfiguration.getUserAttributes() methods).
 int getTotalStolenJobsCount()
          Gets total number of stolen jobs.
 int getWaitJobsThreshold()
          Gets job count threshold at which this node will start stealing jobs from other nodes.
 boolean isStealingEnabled()
          Gets flag indicating whether this node should attempt to steal jobs from other nodes.
 void onCollision(Collection<GridCollisionJobContext> waitJobs, Collection<GridCollisionJobContext> activeJobs)
          This is a callback called when either new grid job arrived or executing job finished its execution.
 void onContextDestroyed()
          Callback invoked prior to stopping grid before SPI context is destroyed. Once this method is complete, grid will begin shutdown sequence. Use this callback for de-initialization logic that may involve SPI context. Note that invoking SPI context after this callback is complete is considered illegal and may produce unknown results.

If GridSpiAdapter is used for SPI implementation, then it will replace actual context with dummy no-op context which is usually good-enough since grid is about to shut down.

 void onContextInitialized(GridSpiContext spiCtx)
          Callback invoked when SPI context is initialized. SPI implementation may store SPI context for future access.

This method is invoked after GridSpi.spiStart(String) method is completed, so SPI should be fully functional at this point. Use this method for post-start initialization, such as subscribing a discovery listener or sending a message to a remote node.

 void setActiveJobsThreshold(int activeJobsThreshold)
          Sets number of jobs that are allowed to be executed in parallel on this node.
 void setExternalCollisionListener(GridCollisionExternalListener listener)
          Listener to be set for notification of external collision events (e.g. job stealing).
 void setMaximumStealingAttempts(int maxStealingAttempts)
          Sets maximum number of attempts to steal job by another node.
 void setMessageExpireTime(long msgExpireTime)
          Message expire time configuration parameter.
 void setStealingAttributes(Map<String,? extends Serializable> stealAttrs)
          Configuration parameter to enable stealing to/from only nodes that have these attributes set (see GridNode.getAttribute(String) and GridConfiguration.getUserAttributes() methods).
 void setStealingEnabled(boolean isStealingEnabled)
          Sets flag indicating whether this node should attempt to steal jobs from other nodes.
 void setWaitJobsThreshold(int waitJobsThreshold)
          Sets wait jobs threshold.
 void spiStart(String gridName)
          This method is called to start SPI.
 void spiStop()
          This method is called to stop SPI.
 String toString()
          
 
Methods inherited from class org.gridgain.grid.spi.GridSpiAdapter
assertParameter, configInfo, getAuthor, getGridGainHome, getLocalNodeId, getSpiContext, getStartTimestamp, getStartTimestampFormatted, getUpTime, getUpTimeFormatted, getVendorEmail, getVendorUrl, getVersion, registerMBean, setSpiContext, startInfo, startStopwatch, stopInfo, unregisterMBean, warnSpi, warnSpiParameter
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.gridgain.grid.spi.GridSpiManagementMBean
getAuthor, getGridGainHome, getLocalNodeId, getStartTimestamp, getStartTimestampFormatted, getUpTime, getUpTimeFormatted, getVendorEmail, getVendorUrl, getVersion
 

Field Detail

DFLT_MAX_STEALING_ATTEMPTS

public static final int DFLT_MAX_STEALING_ATTEMPTS
Maximum number of attempts to steal job by another node (default is 5).

See Also:
Constant Field Values

DFLT_ACTIVE_JOBS_THRESHOLD

public static final int DFLT_ACTIVE_JOBS_THRESHOLD
Default number of parallel jobs allowed (value is 95, which is slightly less than the default number of threads in the execution thread pool, to allow some extra threads for system processing).

See Also:
Constant Field Values

DFLT_MSG_EXPIRE_TIME

public static final long DFLT_MSG_EXPIRE_TIME
Default steal message expire time in milliseconds (value is 1000). Once this time is elapsed and no response for steal message is received, the message is considered lost and another steal message will be generated, potentially to another node.

See Also:
Constant Field Values

DFLT_WAIT_JOBS_THRESHOLD

public static final int DFLT_WAIT_JOBS_THRESHOLD
Default threshold of waiting jobs. If number of waiting jobs exceeds this threshold, then waiting jobs will become available to be stolen (value is 0).

See Also:
Constant Field Values

THIEF_NODE_ATTR

public static final String THIEF_NODE_ATTR
Job context attribute for storing thief node UUID (this attribute is used in job stealing failover SPI).

See Also:
Constant Field Values

WAIT_JOBS_THRESHOLD_NODE_ATTR

public static final String WAIT_JOBS_THRESHOLD_NODE_ATTR
Threshold of maximum jobs on waiting queue.

See Also:
Constant Field Values

ACTIVE_JOBS_THRESHOLD_NODE_ATTR

public static final String ACTIVE_JOBS_THRESHOLD_NODE_ATTR
Threshold of maximum jobs executing concurrently.

See Also:
Constant Field Values

STEALING_ATTEMPT_COUNT_ATTR

public static final String STEALING_ATTEMPT_COUNT_ATTR
Name of job context attribute containing current stealing attempt count. This count is incremented every time the same job gets stolen for execution.

See Also:
GridJobContext, Constant Field Values

MAX_STEALING_ATTEMPT_ATTR

public static final String MAX_STEALING_ATTEMPT_ATTR
Maximum stealing attempts attribute name.

See Also:
Constant Field Values

MSG_EXPIRE_TIME

public static final String MSG_EXPIRE_TIME
Stealing request expiration time attribute name.

See Also:
Constant Field Values
Constructor Detail

GridJobStealingCollisionSpi

public GridJobStealingCollisionSpi()
Method Detail

setActiveJobsThreshold

@GridSpiConfiguration(optional=true)
public void setActiveJobsThreshold(int activeJobsThreshold)
Sets number of jobs that are allowed to be executed in parallel on this node. Note that this attribute may be different for different grid nodes, as stronger nodes may be able to execute more jobs in parallel.

If not provided, default value is GridJobStealingCollisionSpi.DFLT_ACTIVE_JOBS_THRESHOLD.

Parameters:
activeJobsThreshold - Maximum number of jobs to be executed in parallel.

getActiveJobsThreshold

public int getActiveJobsThreshold()
Gets number of jobs that can be executed in parallel.

Specified by:
getActiveJobsThreshold in interface GridJobStealingCollisionSpiMBean
Returns:
Number of jobs that can be executed in parallel.

setWaitJobsThreshold

@GridSpiConfiguration(optional=true)
public void setWaitJobsThreshold(int waitJobsThreshold)
Sets wait jobs threshold. If the number of jobs in the waiting queue falls to or below this threshold, the implementation will attempt to steal jobs from other, more heavily loaded nodes.

Note this value may be different (but does not have to be) for different nodes in the grid. You may wish to give stronger nodes a smaller waiting threshold so they can start stealing jobs from other nodes sooner.

If not provided, default value is GridJobStealingCollisionSpi.DFLT_WAIT_JOBS_THRESHOLD.

Parameters:
waitJobsThreshold - Job count threshold at which this node will start stealing jobs from other nodes.
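
The threshold semantics described for setWaitJobsThreshold(int) and DFLT_WAIT_JOBS_THRESHOLD can be sketched in plain Java. The class WaitThresholdSketch and both method names are hypothetical, and treating the excess over the threshold as the number of jobs offered for stealing is an assumption made for illustration only.

```java
/** Hypothetical model of the wait jobs threshold; not part of GridGain. */
final class WaitThresholdSketch {
    private WaitThresholdSketch() {
        // Static helpers only.
    }

    /**
     * A node tries to steal only when stealing is enabled and its own waiting
     * queue has fallen to or below the threshold.
     */
    static boolean shouldAttemptSteal(boolean stealingEnabled, int waitingJobs, int waitJobsThreshold) {
        return stealingEnabled && waitingJobs <= waitJobsThreshold;
    }

    /**
     * Waiting jobs above the threshold become available to other nodes.
     * Offering exactly the excess is an assumption of this sketch.
     */
    static int jobsAvailableToSteal(int waitingJobs, int waitJobsThreshold) {
        return Math.max(0, waitingJobs - waitJobsThreshold);
    }

    public static void main(String[] args) {
        // An idle node with the default threshold of 0 starts stealing.
        System.out.println(shouldAttemptSteal(true, 0, 0));

        // A node with 13 waiting jobs and a threshold of 10 can give away 3.
        System.out.println(jobsAvailableToSteal(13, 10));
    }
}
```

Giving a stronger node a different threshold than a weaker one changes only the inputs to these two checks, which is why the value is allowed to differ between nodes.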

getWaitJobsThreshold

public int getWaitJobsThreshold()
Gets job count threshold at which this node will start stealing jobs from other nodes.

Specified by:
getWaitJobsThreshold in interface GridJobStealingCollisionSpiMBean
Returns:
Job count threshold.

setMessageExpireTime

@GridSpiConfiguration(optional=true)
public void setMessageExpireTime(long msgExpireTime)
Message expire time configuration parameter. If no response is received from a busy node to a job stealing request within this time, the implementation will assume that the message never got there, or that the remote node does not have this node included in the topology of any of the jobs it has. In either case, the job steal request will be resent (potentially to another node).

If not provided, default value is GridJobStealingCollisionSpi.DFLT_MSG_EXPIRE_TIME.

Parameters:
msgExpireTime - Message expire time.
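
As an illustration of the expiration rule, the following sketch models a single outstanding steal request. The class StealRequestSketch is hypothetical, timestamps are passed in explicitly so the example stays deterministic, and how the SPI actually tracks its requests is not described on this page.

```java
/** Hypothetical model of one outstanding steal request; not part of GridGain. */
final class StealRequestSketch {
    /** Time the request was sent, in milliseconds. */
    private final long sentAtMillis;

    /** Configured message expire time, in milliseconds. */
    private final long expireTimeMillis;

    StealRequestSketch(long sentAtMillis, long expireTimeMillis) {
        this.sentAtMillis = sentAtMillis;
        this.expireTimeMillis = expireTimeMillis;
    }

    /** The request is considered lost once the expire time has fully elapsed. */
    boolean isExpired(long nowMillis) {
        return nowMillis - sentAtMillis >= expireTimeMillis;
    }

    /** Creates the replacement request, restarting the expiration clock. */
    StealRequestSketch resend(long nowMillis) {
        return new StealRequestSketch(nowMillis, expireTimeMillis);
    }

    public static void main(String[] args) {
        StealRequestSketch req = new StealRequestSketch(1000L, 500L);

        // Still pending just before the deadline, expired at the deadline.
        System.out.println(req.isExpired(1499L));
        System.out.println(req.isExpired(1500L));
    }
}
```

A shorter expire time makes a thief node retry sooner after a lost request, at the cost of more steal messages on a slow network.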

getMessageExpireTime

public long getMessageExpireTime()
Message expire time configuration parameter. If no response is received from a busy node to a job stealing message within this time, the implementation will assume that the message never got there, or that the remote node does not have this node included in the topology of any of the jobs it has.

Specified by:
getMessageExpireTime in interface GridJobStealingCollisionSpiMBean
Returns:
Message expire time.

setStealingEnabled

@GridSpiConfiguration(optional=true)
public void setStealingEnabled(boolean isStealingEnabled)
Sets flag indicating whether this node should attempt to steal jobs from other nodes. If false, then this node will still allow jobs to be stolen from it, but won't attempt to steal any jobs from other nodes.

Default value is true.

Parameters:
isStealingEnabled - Flag indicating whether this node should attempt to steal jobs from other nodes

isStealingEnabled

public boolean isStealingEnabled()
Gets flag indicating whether this node should attempt to steal jobs from other nodes. If false, then this node will still allow jobs to be stolen from it, but won't attempt to steal any jobs from other nodes.

Default value is true.

Specified by:
isStealingEnabled in interface GridJobStealingCollisionSpiMBean
Returns:
Flag indicating whether this node should attempt to steal jobs from other nodes.

getMaximumStealingAttempts

public int getMaximumStealingAttempts()
Gets maximum number of attempts to steal job by another node. If not specified, GridJobStealingCollisionSpi.DFLT_MAX_STEALING_ATTEMPTS value will be used.

Specified by:
getMaximumStealingAttempts in interface GridJobStealingCollisionSpiMBean
Returns:
Maximum number of attempts to steal job by another node.

setMaximumStealingAttempts

@GridSpiConfiguration(optional=true)
public void setMaximumStealingAttempts(int maxStealingAttempts)
Sets maximum number of attempts to steal job by another node. If not specified, GridJobStealingCollisionSpi.DFLT_MAX_STEALING_ATTEMPTS value will be used.

Note this value must be identical for all grid nodes in the grid.

Parameters:
maxStealingAttempts - Maximum number of attempts to steal job by another node.

setStealingAttributes

@GridSpiConfiguration(optional=true)
public void setStealingAttributes(Map<String,? extends Serializable> stealAttrs)
Configuration parameter to enable stealing to/from only nodes that have these attributes set (see GridNode.getAttribute(String) and GridConfiguration.getUserAttributes() methods).

Parameters:
stealAttrs - Node attributes to enable job stealing for.
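
One plausible reading of the attribute filter is that a node takes part in stealing only if it carries every configured attribute with an equal value. The sketch below implements that reading in plain Java; the class StealingAttributesSketch is hypothetical, and both the equality-based match and the rule that an empty configuration means no restriction are assumptions.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of stealing attribute matching; not part of GridGain. */
final class StealingAttributesSketch {
    private StealingAttributesSketch() {
        // Static helpers only.
    }

    /**
     * Returns true if the node attributes contain every stealing attribute
     * with an equal value. No configured attributes means no restriction.
     */
    static boolean isEligible(Map<String, ? extends Serializable> stealAttrs,
        Map<String, ? extends Serializable> nodeAttrs) {
        if (stealAttrs == null || stealAttrs.isEmpty())
            return true;

        for (Map.Entry<String, ? extends Serializable> e : stealAttrs.entrySet()) {
            Object actual = nodeAttrs.get(e.getKey());

            // A missing attribute or a different value excludes the node.
            if (actual == null || !actual.equals(e.getValue()))
                return false;
        }

        return true;
    }

    public static void main(String[] args) {
        Map<String, String> stealAttrs = Collections.singletonMap("node.segment", "foobar");

        Map<String, String> nodeAttrs = new HashMap<>();
        nodeAttrs.put("node.segment", "foobar");

        System.out.println(isEligible(stealAttrs, nodeAttrs));
    }
}
```

In a real grid the node side of the comparison would come from the user attributes referenced above, for example a segment name shared by all nodes that are allowed to exchange jobs.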

getStealingAttributes

public Map<String,? extends Serializable> getStealingAttributes()
Configuration parameter to enable stealing to/from only nodes that have these attributes set (see GridNode.getAttribute(String) and GridConfiguration.getUserAttributes() methods).

Specified by:
getStealingAttributes in interface GridJobStealingCollisionSpiMBean
Returns:
Node attributes to enable job stealing for.

getCurrentWaitJobsCount

public int getCurrentWaitJobsCount()
Gets current number of jobs that wait for the execution.

Specified by:
getCurrentWaitJobsCount in interface GridJobStealingCollisionSpiMBean
Returns:
Number of jobs that wait for execution.

getCurrentActiveJobsCount

public int getCurrentActiveJobsCount()
Gets current number of jobs that are being executed.

Specified by:
getCurrentActiveJobsCount in interface GridJobStealingCollisionSpiMBean
Returns:
Number of active jobs.

getTotalStolenJobsCount

public int getTotalStolenJobsCount()
Gets total number of stolen jobs.

Specified by:
getTotalStolenJobsCount in interface GridJobStealingCollisionSpiMBean
Returns:
Number of stolen jobs.

getCurrentJobsToStealCount

public int getCurrentJobsToStealCount()
Gets current number of jobs to be stolen. This is the number of outstanding steal requests.

Specified by:
getCurrentJobsToStealCount in interface GridJobStealingCollisionSpiMBean
Returns:
Number of jobs to be stolen.

getNodeAttributes

public Map<String,Serializable> getNodeAttributes()
                                           throws GridSpiException
This method is called before SPI starts (before method GridSpi.spiStart(String) is called). It allows SPI implementation to add attributes to a local node. Kernel collects these attributes from all SPI implementations loaded up and then passes it to discovery SPI so that they can be exchanged with other nodes.

Specified by:
getNodeAttributes in interface GridSpi
Overrides:
getNodeAttributes in class GridSpiAdapter
Throws:
GridSpiException - Thrown in case of any error.
Returns:
Map of local node attributes this SPI wants to add.

spiStart

public void spiStart(String gridName)
              throws GridSpiException
This method is called to start SPI. After this method returns successfully kernel assumes that SPI is fully operational.

Specified by:
spiStart in interface GridSpi
Throws:
GridSpiException - Thrown in case of any error during SPI start.
Parameters:
gridName - Name of grid instance this SPI is being started for (null for default grid).

spiStop

public void spiStop()
             throws GridSpiException
This method is called to stop SPI. After this method returns kernel assumes that this SPI is finished and all resources acquired by it are released. Note that this method can be called at any point including during recovery of failed start. It should make no assumptions on what state SPI will be in when this method is called.

Specified by:
spiStop in interface GridSpi
Throws:
GridSpiException - Thrown in case of any error during SPI stop.

setExternalCollisionListener

public void setExternalCollisionListener(GridCollisionExternalListener listener)
Listener to be set for notification of external collision events (e.g. job stealing). Once grid receives such notification, it will immediately invoke collision SPI.

GridGain uses this listener to enable job stealing from overloaded to underloaded nodes. However, you can also utilize it, for instance, to provide time based collision resolution. To achieve this, you most likely would mark some job by setting a certain attribute in job context (see GridJobContext) for a job that requires time-based scheduling and set some timer in your SPI implementation that would wake up after a certain period of time. Once this period is reached, you would notify this listener that a collision resolution should take place. Then inside of your collision resolution logic, you would find the marked waiting job and activate it.

Note that most collision SPIs may not have external collisions. In that case, they should simply ignore this method and do nothing when the listener is set.

Specified by:
setExternalCollisionListener in interface GridCollisionSpi
Parameters:
listener - Listener for external collision events.
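
The time-based scenario described above can be sketched without the GridGain classes. Here SketchExternalListener is a hypothetical stand-in for GridCollisionExternalListener (whose methods are not listed on this page), and the timer is driven by an explicit onTimer call so the example stays deterministic; a real SPI would more likely use a scheduled task.

```java
/** Hypothetical stand-in for the external collision listener; not part of GridGain. */
interface SketchExternalListener {
    /** Asks the grid to run collision resolution now. */
    void onExternalCollision();
}

/** Hypothetical SPI fragment that requests collision resolution at a set time. */
final class TimedCollisionSketch {
    /** Listener supplied by the grid; may not be set yet. */
    private volatile SketchExternalListener listener;

    /** Time at which the marked job should be reconsidered, in milliseconds. */
    private final long wakeUpAtMillis;

    TimedCollisionSketch(long wakeUpAtMillis) {
        this.wakeUpAtMillis = wakeUpAtMillis;
    }

    /** Stores the listener so that the timer can use it later. */
    void setExternalCollisionListener(SketchExternalListener listener) {
        this.listener = listener;
    }

    /**
     * Called by a timer. Notifies the listener once the wake-up time is reached
     * and reports whether a notification was sent.
     */
    boolean onTimer(long nowMillis) {
        SketchExternalListener lsnr = listener;

        if (lsnr == null || nowMillis < wakeUpAtMillis)
            return false;

        lsnr.onExternalCollision();

        return true;
    }

    public static void main(String[] args) {
        TimedCollisionSketch spi = new TimedCollisionSketch(2000L);

        spi.setExternalCollisionListener(() -> System.out.println("collision resolution requested"));

        spi.onTimer(1000L); // Too early, nothing happens.
        spi.onTimer(2000L); // Wake-up time reached, listener is notified.
    }
}
```

After the notification, the collision resolution logic itself would look for the marked waiting job and activate it.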

onContextInitialized

public void onContextInitialized(GridSpiContext spiCtx)
                          throws GridSpiException
Callback invoked when SPI context is initialized. SPI implementation may store SPI context for future access.

This method is invoked after GridSpi.spiStart(String) method is completed, so SPI should be fully functional at this point. Use this method for post-start initialization, such as subscribing a discovery listener or sending a message to a remote node.

Specified by:
onContextInitialized in interface GridSpi
Overrides:
onContextInitialized in class GridSpiAdapter
Throws:
GridSpiException - If context initialization failed (grid will be stopped).
Parameters:
spiCtx - Spi context.

onContextDestroyed

public void onContextDestroyed()
Callback invoked prior to stopping grid before SPI context is destroyed. Once this method is complete, grid will begin shutdown sequence. Use this callback for de-initialization logic that may involve SPI context. Note that invoking SPI context after this callback is complete is considered illegal and may produce unknown results.

If GridSpiAdapter is used for SPI implementation, then it will replace actual context with dummy no-op context which is usually good-enough since grid is about to shut down.

Specified by:
onContextDestroyed in interface GridSpi
Overrides:
onContextDestroyed in class GridSpiAdapter

onCollision

public void onCollision(Collection<GridCollisionJobContext> waitJobs,
                        Collection<GridCollisionJobContext> activeJobs)
This callback is invoked when either a new grid job arrives or an executing job finishes. When a new job arrives, it is added to the end of the wait list and this method is called. When a job finishes its execution, it is removed from the active list and this method is called (i.e., a finished grid job will not appear in any list during collision resolution).

Implementation of this method should act on two lists, each of which contains collision job contexts that define a set of operations available during collision resolution. Refer to GridCollisionJobContext documentation for more information.

Specified by:
onCollision in interface GridCollisionSpi
Parameters:
waitJobs - Ordered collection of collision contexts for jobs that are currently waiting for execution. It can be empty but never null. Note that a newly arrived job, if any, will always be represented by the last item in this list.
activeJobs - Ordered collection of collision contexts for jobs that are currently executing. It can be empty but never null.
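
A minimal sketch of how a collision callback might act on the two lists is shown below. It covers only the activation part (no stealing) and uses a hypothetical SketchJobContext interface in place of GridCollisionJobContext, assuming only that a waiting job can be activated; the threshold is passed as a parameter rather than read from SPI configuration.

```java
import java.util.Collection;

/** Hypothetical stand-in for a collision job context; not part of GridGain. */
interface SketchJobContext {
    /** Moves the job from the waiting state to execution. */
    void activate();
}

/** Hypothetical collision resolution that fills free execution slots in order. */
final class CollisionSketch {
    private CollisionSketch() {
        // Static helpers only.
    }

    /**
     * Activates waiting jobs, oldest first, until the number of active jobs
     * would reach the threshold. Returns how many jobs were activated.
     */
    static int onCollision(Collection<? extends SketchJobContext> waitJobs,
        Collection<? extends SketchJobContext> activeJobs, int activeJobsThreshold) {
        // May be zero or negative if the node is already at or over capacity.
        int freeSlots = activeJobsThreshold - activeJobs.size();

        int activated = 0;

        for (SketchJobContext job : waitJobs) {
            if (activated >= freeSlots)
                break;

            job.activate();

            activated++;
        }

        return activated;
    }
}
```

For example, with five waiting jobs, one active job and a threshold of 3, this sketch activates the first two waiting jobs and leaves the remaining three in the queue, where a job stealing implementation could offer them to other nodes.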

checkConfigurationConsistency

protected void checkConfigurationConsistency(GridNode node)
Checks remote node attributes.

Overrides:
checkConfigurationConsistency in class GridSpiAdapter
Parameters:
node - Remote node.

toString

public String toString()

Overrides:
toString in class Object


GridGain™ - Grid Computing Made Simple, ver. 2.0.3.20052008
2005-2008 Copyright © GridGain Systems. All Rights Reserved.