行云无鸣

2010-06-26

采用Spring AOP控制线程的非正常退出

Filed under: 乱语 — 标签:, , , — hellyguo @ 16:58

项目中碰到线程的非正常退出,导致某一块功能缺失,是非常危险的。
虽然可以采用定时刷新线程状态这种方式来处理监控,但总是很累赘。
昨日受到启发,可以采用AOP来控制线程的非异常退出。
基本思路:

  1. 当run方法正常结束时,判断是否是业务或系统主动要求的退出,主动要求的,不再重启;非主动要求的,等待此次退出后重启线程;
  2. 当run方法异常结束时,判断是否有必要重启,有必要,重启;没有必要,不再重启。

方法如下:
Spring配置文件

<bean id="demoThread" parent="singleExecThread">
<property name="target">
<ref local="demoThreadBody" />
</property>
</bean>

<bean id="demoThreadBody" class="CommonSingleThread"
autowire="byName">
<property name="runTarget">
<ref local="demoThreadTarget" />
</property>
<property name="idName" value="demoThread" />
</bean>

<bean id="demoThreadTarget" class="DemoThread"
autowire="byName">
<property name="id" value="0" />
</bean>

<bean id="throwsAdvice"
class="ThrowsFromCommonSingleThreadRunAdvice"></bean>

<bean id="throwsAdvisor"
class="org.springframework.aop.support.NameMatchMethodPointcutAdvisor">
<property name="advice">
<ref local="throwsAdvice" />
</property>
<property name="mappedName">
<value>run</value>
</property>
</bean>

<bean id="afterAdvice" class="AfterCommonSingleThreadRunAdvice"></bean>

<bean id="afterAdvisor"
class="org.springframework.aop.support.NameMatchMethodPointcutAdvisor">
<property name="advice">
<ref local="afterAdvice" />
</property>
<property name="mappedName">
<value>run</value>
</property>
</bean>

<bean id="singleExecThread" class="org.springframework.aop.framework.ProxyFactoryBean"
abstract="true">
<property name="proxyInterfaces">
<value>ICommonSingleThread</value>
</property>
<property name="interceptorNames">
<list>
<value>throwsAdvisor</value>
<value>afterAdvisor</value>
</list>
</property>
</bean>

DemoThread

public class DemoThread implements Runnable, Constants {
private int id;

public void setId(int id) {
this.id = id;
}

public void run() {
LOGGER.info("start");
if (id % 2 == 0) {
throw new RuntimeException();
}
LOGGER.info("quit");
}

}

ICommonSingleThread

/**
* 单线程控制接口
* ICommonSingleThread Jun 26, 2010 10:39:06 AM
*
* @author helly
*
*/
public interface ICommonSingleThread extends Runnable {

/**
* 设置线程控制标志
*
* @param active
*/
public abstract void setActive(boolean active);

/**
* 获取线程控制标志
*
*/
public abstract boolean isActive();

/**
* 设置线程执行主体
*
* @param target
*/
public abstract void setRunTarget(Runnable runTarget);

/**
* spring配置文件中唯一id
*
* @param id
*/
public abstract void setIdName(String beanId);

/**
* 获取线程状态
*
* @return
*/
public abstract boolean getThreadState();

/**
* 初始化
*/
public abstract void init();

/**
* 销毁
*/
public abstract void destory();

}

CommonSingleThread

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
* 通用线程线程
* CommonSingleThread Jun 23, 2010 8:57:04 PM
*
* @author helly
*
*/
public class CommonSingleThread implements Runnable, Constants,
ICommonSingleThread, ApplicationContextAware {
// Spring ApplicationContext
protected static ApplicationContext appcontext;
// 对象锁
protected Object lock = new Object();
// 线程控制标志
protected boolean active = true;
// 线程
protected Thread thread;
// 运行实例
protected Runnable runTarget;
// spring代理对象
protected Runnable proxyObject;
// spring配置文件中唯一id
protected String idName;

/**
* 设置线程控制标志
*
* @param active
*/
public void setActive(boolean active) {
synchronized (lock) {
this.active = active;
}
}

/**
* 获取线程控制标志
*
* @return
*/
public boolean isActive() {
return active;
}

/**
* 设置线程执行主体
*
* @param target
*/
public void setRunTarget(Runnable runTarget) {
this.runTarget = runTarget;
}

/**
* spring配置文件中唯一id
*
* @param id
*/
public void setIdName(String idName) {
this.idName = idName;
}

/**
* 获取线程状态
*
* @return
*/
public boolean getThreadState() {
synchronized (lock) {
if (thread == null) {
return false;
} else {
return thread.isAlive();
}
}
}

/**
* 初始化
*/
public void init() {
synchronized (lock) {
// 设置代理对象
if (proxyObject == null) {
setupProxyObject();
}
if (!getThreadState()) {
destory();
active = true;
createNewThreadAndStart();
}
}
}

/**
* 根据idName获取被spring代理过的对象
*/
private void setupProxyObject() {
proxyObject = (Runnable) appcontext.getBean(idName);
}

/**
* 创建新线程
*/
protected void createNewThreadAndStart() {
// 创建线程,Runnable主体必须用代理对象创建
// 原因:如果用内部的runTarget对象创建,无法调用到Advice
// 只有调用proxyObject对象,才能通过AOP调用Advice
thread = new Thread(proxyObject, runTarget.getClass().getSimpleName());
thread.start();
}

/**
* 线程执行体
*/
public void run() {
while (active) {
try {
runTarget();
} catch (Exception e) {
LOGGER.warn(e.getMessage(), e);
throw new RuntimeException(e);
}
break;
}
}

/**
* 执行
*/
protected void runTarget() {
runTarget.run();
}

/**
* 销毁
*/
public void destory() {
synchronized (lock) {
if (thread != null) {
active = false;
while (true) {
try {
thread.join();
active = true;
break;
} catch (InterruptedException e) {
continue;
}
}
}
}
}

/**
* 获取spirng上下文
*
* @param arg0
* @throws BeansException
*/
public void setApplicationContext(ApplicationContext context)
throws BeansException {
if (appcontext == null) {
appcontext = context;
}
}
}

AfterAdvice

import java.lang.reflect.Method;

import org.springframework.aop.AfterReturningAdvice;

/**
* CommonSingleThread正常退出善后类
* AfterCommonSingleThreadRunAdvice Jun 26, 2010 10:58:35 AM
*
* @author helly
*
*/
public class AfterCommonSingleThreadRunAdvice implements AfterReturningAdvice,
Constants {

public void afterReturning(Object retValue, Method method, Object[] args,
Object target) throws Throwable {
LOGGER.debug("进入AfterCommonSingleThreadRunAdvice");
// 获取对象
final ICommonSingleThread cst = (ICommonSingleThread) target;
// 获取线程名称
final String thName = Thread.currentThread().getName();
// 校验是否需要重新启动
// 如若active状态为true,需要重启
// 否则,不需要
if (cst.isActive()) {
LOGGER.info("需要重启" + thName);
Runnable runit = new Runnable() {
public void run() {
LOGGER.info("重启" + thName);
// 必须等待
// 原因:要预留时间给原异常线程退出。
// 如若不等待,原线程还存活,init方法无法启动新线程
try {
Thread.sleep(5000l);
} catch (InterruptedException e) {
}
// 启动新线程
cst.init();
LOGGER.info("重启" + thName + "完成");
}
};
new Thread(runit, "RestartNormalExitThreadFor:" + thName).start();
}
}

}

ThrowsAdvice

import java.lang.reflect.Method;

import org.springframework.aop.ThrowsAdvice;

/**
* CommonSingleThread异常退出善后类
* ThrowsFromCommonSingleThreadRunAdvice Jun 26, 2010 10:55:18 AM
*
* @author helly
*
*/
public class ThrowsFromCommonSingleThreadRunAdvice implements ThrowsAdvice,
Constants {

/**
* 当异常退出时,进入此方法
*
* @param method
* @param args
* @param target
* @param ex
*/
public void afterThrowing(Method method, Object[] args, Object target,
Exception ex) {
LOGGER.debug("进入ThrowsFromCommonSingleThreadRunAdvice");
// 获取对象
final ICommonSingleThread cst = (ICommonSingleThread) target;
// 获取线程名称
final String thName = Thread.currentThread().getName();
// 校验是否需要重新启动
// 如若active状态为true,需要重启
// 否则,不需要
if (cst.isActive()) {
LOGGER.info("需要重启" + thName);
Runnable runit = new Runnable() {
public void run() {
LOGGER.info("重启" + thName);
// 必须等待
// 原因:要预留时间给原异常线程退出。
// 如若不等待,原线程还存活,init方法无法启动新线程
try {
Thread.sleep(5000l);
} catch (InterruptedException e) {
}
// 启动新线程
cst.init();
LOGGER.info("重启" + thName + "完成");
}
};
new Thread(runit, "RestartExceptionExitThreadFor:" + thName).start();
}
}
}

Test

import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

public class DemoThreadTest {
private ICommonSingleThread th;

@Test
public void test() throws Exception {
th.init();
Thread.sleep(1000l);
}

@BeforeTest
public void beforeTest() {
Log4jLoader.loadLog4j();
SpringLoader.loadSpring();
th = (ICommonSingleThread) SpringLoader.context.getBean("demoThread");
}

}

2008-10-29

6个关于Java线程的demo

Filed under: 未分类 — 标签:, , — hellyguo @ 13:35

package thread.learn;

/**
* 该demo主要演示线程在遭遇RuntimeException时的状况
* 线程不会自己处理,直接中断线程执行
*
* @author hellyguo
*
*/
public class ThreadTest1 implements Runnable {
public static void main(String[] args) {
ThreadTest1 test1 = new ThreadTest1();
Thread th = new Thread(test1, "ThreadTest1");
th.start();
}

public void run() {
System.out.println("执行之");
if (new NullPointerException() instanceof RuntimeException) {
throw new RuntimeException();
}
System.out.println("永远不被执行");
}
}

package thread.learn;

/**
* 该demo主要演示多线程同步执行
* 可以看出,sleep不会放弃对对象的锁定
*
* @author hellyguo
*
*/
public class ThreadTest2 implements Runnable {

private final static Object LOCK = new Object();

public static void main(String[] args) {
Thread[] ths = new Thread[10];
for (int i = 0; i < ths.length; i++) {
ths[i] = new Thread(new ThreadTest2(), "ThreadTest2_" + i);
ths[i].start();
}
}

public void run() {
while (true) {
synchronized (LOCK) {
System.out.println(Thread.currentThread().getName() + "获得锁");
System.out.println(Thread.currentThread().getName() + "进入等待");
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ "退出等待,放弃锁");
}
}
}
}

package thread.learn;

import java.util.concurrent.locks.ReentrantLock;

/**
* 该demo主要演示多线程同步执行的新型方法
* 该方法依赖JDK5
* 效率,较synchronized高
* 但其重入顺序,需考虑
*
* @author hellyguo
*
*/
public class ThreadTest3 implements Runnable {
private final static ReentrantLock LOCK = new ReentrantLock();

public static void main(String[] args) {
Thread[] ths = new Thread[10];
for (int i = 0; i < ths.length; i++) {
ths[i] = new Thread(new ThreadTest3(), "ThreadTest3_" + i);
ths[i].start();
}
}

public void run() {
while (true) {
LOCK.lock();
try {
System.out.println(Thread.currentThread().getName() + "获得锁");
System.out.println(Thread.currentThread().getName() + "进入等待");
System.out.println(Thread.currentThread().getName()
+ "退出等待,放弃锁");
} finally {
LOCK.unlock();
}
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
}

package thread.learn;

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* 该demo主要演示多线程同步读写执行的新型方法
* 该方法依赖JDK5
* 效率,较synchronized高
* 但其重入顺序,需考虑
* 可分为读锁及写锁
* 读锁可共享锁定
* 写锁只能唯一锁定
*
* @author hellyguo
*
*/
public class ThreadTest4 {
private final static ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();

private final static String CDATA = "demo";

private final static String BLANK = " ";

private static String DATA = CDATA;

public static void main(String[] args) {
ThreadTest4 test4 = new ThreadTest4();
Thread[] ths = new Thread[5];
int i = 0;
for (i = 0; i 1) {
ths[i] = new Thread(test4.new ReadTarget(),
"ThreadTest4_ReadTarget_" + i);
} else {
ths[i] = new Thread(test4.new WriteTarget(),
"ThreadTest4_WriteTarget_" + i);
}
ths[i].start();
}
}

class ReadTarget implements Runnable {

public void run() {
while (true) {
LOCK.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()
+ "获得读锁");
System.out.println(DATA);
System.out.println(Thread.currentThread().getName()
+ "放弃读锁");
} finally {
LOCK.readLock().unlock();
}
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class WriteTarget implements Runnable {

public void run() {
while (true) {
LOCK.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()
+ "获得写锁");
DATA = Thread.currentThread().getName() + BLANK + CDATA;
System.out.println(DATA);
System.out.println(Thread.currentThread().getName()
+ "放弃写锁");
} finally {
LOCK.writeLock().unlock();
}
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

}

package thread.learn;

/**
* 该demo主要演示如何等待线程停止
*
* @author hellyguo
*
*/
public class ThreadTest5 implements Runnable {
public static void main(String[] args) {
ThreadTest5 test = new ThreadTest5();
Thread th = new Thread(test, "ThreadTest5");
System.out.println(th.getName() + "准备运行");
th.start();
System.out.println(th.getName() + "正在运行中");
try {
th.join();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(th.getName() + "运行结束");
}

public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("运行中:第" + (i + 1) + "次");
}
}
}

package thread.learn;

/**
* 该demo主要演示
* 1)中断只对休眠线程有效,运行中的线程不响应
* 2)在JVM中,所有前台线程退出后,不理会后台线程是否还有存活,JVM直接退出运行
*
* @author hellyguo
*
*/
public class ThreadTest6 implements Runnable {
private final static String BLANK = " ";

public static void main(String[] args) {
ThreadTest6 test = new ThreadTest6();
Thread th = new Thread(test, "ThreadTest6");
System.out.println(Thread.currentThread().getName() + "准备运行");
th.setDaemon(true);
th.start();
System.out.println(Thread.currentThread().getName() + "正在运行中");
for (int i = 0; i < 100; i++) {
System.out.println("尝试打断" + th.getName() + "休眠");
th.interrupt();
try {
Thread.sleep(100l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "运行结束");
}

public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + "运行中");
for (int j = 0; j < 10000; j++) {
System.out.println(Thread.currentThread().getName() + "运行中"
+ BLANK + j);
}
try {
Thread.sleep(1000l);
} catch (InterruptedException e) {
System.out.println("休眠被打断");
}
}
}
}

%d 博主赞过: