如何在不使用InheritableThreadLocal的情况下为每个*进程/线程创建共享上下文?

时间:2022-10-25 20:39:56

I'd like to see if there's a good pattern for sharing a context across all classes and subthreads of a top-level thread without using InheritableThreadLocal.

我想看看是否有一个很好的模式,可以在不使用InheritableThreadLocal的情况下在*线程的所有类和子线程之间共享上下文。

I've got several top-level processes that each run in their own thread. These top-level processes often spawn temporary subthreads.

我有几个*进程,每个进程都在自己的线程中运行。这些*进程通常会产生临时子线程。

I want each top level process to have and manage it's own database connection.

我希望每个*流程都拥有并管理自己的数据库连接。

I do not want to pass around the database connection from class to class and from thread to subthread (my associate calls this the "community bicycle" pattern). These are big top-level processes and it would mean editing probably hundreds of method signatures to pass around this database connection.

我不想传递从类到类以及从线程到子线程的数据库连接(我的同事称之为“社区自行车”模式)。这些是*的大流程,这意味着可能会编辑数百个方法签名来传递这个数据库连接。

Right now I call a singleton to get the database connection manager. The singleton uses InheritableThreadLocal so that each top-level process has it's own version of it. While I know some people have problems with singletons, it means I can just say DBConnector.getDBConnection(args) (to paraphrase) whenever I need the correctly managed connection. I am not tied to this method if I can find a better and yet still-clean solution.

现在我调用单例来获取数据库连接管理器。单例使用InheritableThreadLocal,以便每个*进程都有自己的版本。虽然我知道有些人遇到单身人士问题,但这意味着只要我需要正确管理的连接,我就可以说DBConnector.getDBConnection(args)(用语)。如果我能找到一个更好但仍然干净的解决方案,我并不依赖于这种方法。

For various reasons InheritableThreadLocal is proving to be tricky. (See this question.)

由于各种原因,InheritableThreadLocal被证明是棘手的。 (见这个问题。)

Does anyone have a suggestion to handle this kind of thing that doesn't require either InheritableThreadLocal or passing around some context object all over the place?

有没有人有一个建议来处理这种不需要InheritableThreadLocal或在整个地方传递一些上下文对象的东西?

Thanks for any help!

谢谢你的帮助!


Update: I've managed to solve the immediate problem (see the linked question) but I'd still like to hear about other possible approaches. forty-two's suggestion below is good and does work (thanks!), but see the comments for why it's problematic. If people vote for jtahlborn's answer and tell me that I'm being obsessive for wanting to avoid passing around my database connection then I will relent, select that as my answer, and revise my world-view.

更新:我设法解决了眼前的问题(请参阅相关问题),但我仍然希望了解其他可能的方法。下面的四十二个建议是好的并且确实有效(谢谢!),但请参阅评论为什么它有问题。如果人们投票支持jtahlborn的答案,并告诉我,我因为想要避免绕过我的数据库连接而迷恋,那么我会松懈,选择那个作为我的答案,并修改我的世界观。

5 个解决方案

#1


3  

I haven't tested this, but the idea is to create a customized ThreadPoolExecutor that knows how to get the context object and use #beforeExecute() to transfer the context object to the thread that is going to execute the task. To be a nice citizen, you should also clear the context object in #afterEXecute(), but I leave that as an exercise.

我没有对此进行过测试,但我的想法是创建一个自定义的ThreadPoolExecutor,它知道如何获取上下文对象并使用#beforeExecute()将上下文对象传递给将要执行任务的线程。要成为一个好公民,你还应该清除#afterEXecute()中的上下文对象,但我把它留作练习。

public class XyzThreadPoolExecutor extends ThreadPoolExecutor  {

public XyzThreadPoolExecutor() {
    super(3, 3, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new MyThreadFactory());
}

@Override
public void execute(Runnable command) {
    /*
     * get the context object from the calling thread
     */
    Object context = null;
    super.execute(new MyRunnable(context, command));
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
    ((MyRunnable)r).updateThreadLocal((MyThread) t);
    super.beforeExecute(t, r);
}

private static class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        return new MyThread(r);
    }

}

private class MyRunnable implements Runnable {
    private final Object context;
    private final Runnable delegate;

    public MyRunnable(Object context, Runnable delegate) {
        super();
        this.context = context;
        this.delegate = delegate;
    }

    void updateThreadLocal(MyThread thread) {
        thread.setContext(context);
    }

    @Override
    public void run() {
        delegate.run();

    }
}

private static class MyThread extends Thread {

    public MyThread(Runnable target) {
        super(target);
    }

    public void setContext(Object context) {
        // set the context object here using thread local
    }

}
}

#2


3  

the "community bicycle" solution (as you call it) is actually much better than the global (or pseudo global) singleton that you are currently using. it makes the code testable and it makes it very easy to choose which classes use which context. if done well, you don't need to add the context object to every method signature. you generally ensure that all the "major" classes have a reference to the current context, and that any "minor" classes have access to the relevant "major" class. one-off methods which may need access to the context will need their method signatures updated, but most classes should have the context available through a member variable.

“社区自行车”解决方案(正如您所说)实际上比您当前使用的全局(或伪全局)单例要好得多。它使代码可测试,并且可以很容易地选择哪些类使用哪个上下文。如果做得好,则不需要将上下文对象添加到每个方法签名。您通常会确保所有“主要”类都引用当前上下文,并且任何“次要”类都可以访问相关的“主要”类。可能需要访问上下文的一次性方法需要更新其方法签名,但大多数类应该通过成员变量提供上下文。

#3


2  

As a ThreadLocal is essentially a Map keyed on your thread, couldn't you implement a Map keyed on your thread name? All you then need is an effective naming strategy that meets your requirements.

由于ThreadLocal本质上是一个键入你的线程的Map,你不能实现一个键入你的线程名称的Map吗?您需要的只是一种符合您要求的有效命名策略。

#4


1  

As a Lisper, I very much agree with your worldview and would consider it a shame if you were to revise it. :-)

作为一个Lisper,我非常同意你的世界观,如果你要修改它,会认为这是一种耻辱。 :-)

If it were me, I would simply use a ThreadGroup for each top-level process, and associate each connection with the group the caller is running in. If using in conjunction with thread pools, just ensure the pools use threads in the correct thread group (for instance, by having a pool per thread group).

如果是我,我只需为每个*进程使用ThreadGroup,并将每个连接与调用者运行的组相关联。如果与线程池一起使用,只需确保池使用正确线程组中的线程(例如,通过为每个线程组创建一个池)。

Example implementation:

public class CachedConnection {
     /* Whatever */
}

public class ProcessContext extends ThreadGroup {
    private static final Map<ProcessContext, Map<Class, Object>> contexts = new WeakHashMap<ProcessContext, Map<Class, Object>>();

    public static T getContext(Class<T> cls) {
        ProcessContext tg = currentContext();
        Map<Class, Object> ctx;
        synchronized(contexts) {
            if((ctx = contexts.get(tg)) == null)
                contexts.put(tg, ctx = new HashMap<Class, Object>());
        }
        synchronized(ctx) {
            Object cur = ctx.get(cls);
            if(cur != null)
                return(cls.cast(cur));
            T new_t;
            try {
                new_t = cls.newInstance();
            } catch(Exception e) {
                throw(new RuntimeException(e));
            }
            ctx.put(cls, new_t);
            return(new_t);
        }
    }

    public static ProcessContext currentContext() {
        ThreadGroup tg = Thread.currentThread().getThreadGroup();
        while(true) {
            if(tg instanceof ProcessContext)
                return((ProcessContext)tg);
            tg = tg.getParent();
            if(tg == null)
                throw(new IllegalStateException("Not running in a ProcessContext"));
        }
    }
}

If you then simply make sure to run all your threads in a proper ProcessContext, you can get a CachedConnection anywhere by calling ProcessContext.getContext(CachedConnection.class).

如果您只是确保在适当的ProcessContext中运行所有线程,则可以通过调用ProcessContext.getContext(CachedConnection.class)在任何地方获取CachedConnection。

Of course, as mentioned above, you would have to make sure that any other threads you may delegate work to also run in the correct ProcessContext, but I'm pretty sure that problem is inherent in your description -- you would obviously need to specify somehow which one of multiple contexts your delegation workers run in. If anything, it could be conceivable to modify ProcessContext as follows:

当然,如上所述,您必须确保您委托的任何其他线程也能在正确的ProcessContext中运行,但我很确定您的描述中存在问题 - 您显然需要指定不知怎的,你的代表团工作人员遇到了多个上下文中的哪一个。如果有的话,可以想象修改ProcessContext如下:

public class ProcessContext extends ThreadGroup {
    /* getContext() as above */

    private static final ThreadLocal<ProcessContext> tempctx = new ThreadLocal<ProcessContext>();

    public static ProcessContext currentContext() {
        if(tempctx.get() != null)
            return(tempctx.get());
        ThreadGroup tg = Thread.currentThread().getThreadGroup();
        while(true) {
            if(tg instanceof ProcessContext)
                return((ProcessContext)tg);
            tg = tg.getParent();
            if(tg == null)
                throw(new IllegalStateException("Not running in a ProcessContext"));
        }
    }

    public class RunnableInContext implements Runnable {
        private final Runnable delegate;
        public RunnableInContext(Runnable delegate) {this.delegate = delegate;}

        public void run() {
            ProcessContext old = tempctx.get();
            tempctx.set(ProcessContext.this);
            try {
                delegate.run();
            } finally {
                tempctx.set(old);
            }
        }
    }

    public static Runnable wrapInContext(Runnable delegate) {
        return(currentContext().new RunnableInContext(delegate));
    }
}

That way, you could use ProcessContext.wrapInContext() to pass a Runnable which, when run, inherits its context from where it was created.

这样,您可以使用ProcessContext.wrapInContext()来传递Runnable,该Runnable在运行时会从创建它的位置继承其上下文。

(Note that I haven't actually tried the above code, so it may well be full of typos.)

(请注意,我实际上没有尝试过上面的代码,所以它可能充满了拼写错误。)

#5


1  

I would not support your world-view and jthalborn's idea on the count that its more testable even.

我不支持你的世界观和jthalborn关于伯爵的想法,即使它更可测试。

Though paraphrasing first what I have understood from your problme statement is like this.

虽然从你的问题陈述中首先解释我所理解的是这样的。

  1. There are 3 or 4 top-level processes (and they are basically having a thread of their own). And connection object is what is diffrenet in them.
  2. 有3或4个*进程(它们基本上都有自己的线程)。连接对象就是diffrenet。

  3. You need some basic characteristic of Connection to be set up and done once.
  4. 您需要设置和完成一次Connection的一些基本特性。

  5. The child threads in no way change the Connection object passe to them from top-level threads.
  6. 子线程决不会将顶层线程中的Connection对象passe更改为它们。

Here is what I propose, you do need the one tim,e set-up of you Connection but then in each of your top-level process, you do 1) further processing of that Connection 2) keep a InheriatbleThreadLocal (and the child process of your top-level thread will have the modified connection object. 3) Pass these threasd implementing classes. MyThread1, MyThread2, MyThread3, ... MyThread4 in the Executor. (This is different from the other linked question of yours that if you need some gating, Semaphore is a better approach)

这是我的建议,你确实需要一个时间,你需要设置连接,但是在你的每个顶层进程中,你做1)进一步处理该连接2)保持一个InheriatbleThreadLocal(和子进程)您的*线程将具有已修改的连接对象.3)传递这些threasd实现类。执行者中的MyThread1,MyThread2,MyThread3,... MyThread4。 (这与你的另一个相关问题不同,如果你需要一些门控,信号量是一种更好的方法)

Why I said that its not less testable than jthalborn's view is that in that case also you anyway again needs to provide mocked Connection object. Here too. Plus conecptually passing the object and keeping the object in ThreadLocal is one and the same (InheritableThreadLocal is a map which gets passed by java inbuilt way, nothing bad here I believe).

为什么我说它不比jthalborn的观点更可测试的是,在这种情况下,你无论如何也需要提供模拟的Connection对象。这里也。另外,直接传递对象并将对象保留在ThreadLocal中是一样的(InheritableThreadLocal是一个以java内置方式传递的地图,我相信这里没什么不好的)。

EDIT: I did keep in account that its a closed system and we are not having "free" threads tempring with connection

编辑:我确实记得它是一个封闭的系统,我们没有“免费”线程临时连接

#1


3  

I haven't tested this, but the idea is to create a customized ThreadPoolExecutor that knows how to get the context object and use #beforeExecute() to transfer the context object to the thread that is going to execute the task. To be a nice citizen, you should also clear the context object in #afterEXecute(), but I leave that as an exercise.

我没有对此进行过测试,但我的想法是创建一个自定义的ThreadPoolExecutor,它知道如何获取上下文对象并使用#beforeExecute()将上下文对象传递给将要执行任务的线程。要成为一个好公民,你还应该清除#afterEXecute()中的上下文对象,但我把它留作练习。

public class XyzThreadPoolExecutor extends ThreadPoolExecutor  {

public XyzThreadPoolExecutor() {
    super(3, 3, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new MyThreadFactory());
}

@Override
public void execute(Runnable command) {
    /*
     * get the context object from the calling thread
     */
    Object context = null;
    super.execute(new MyRunnable(context, command));
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
    ((MyRunnable)r).updateThreadLocal((MyThread) t);
    super.beforeExecute(t, r);
}

private static class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        return new MyThread(r);
    }

}

private class MyRunnable implements Runnable {
    private final Object context;
    private final Runnable delegate;

    public MyRunnable(Object context, Runnable delegate) {
        super();
        this.context = context;
        this.delegate = delegate;
    }

    void updateThreadLocal(MyThread thread) {
        thread.setContext(context);
    }

    @Override
    public void run() {
        delegate.run();

    }
}

private static class MyThread extends Thread {

    public MyThread(Runnable target) {
        super(target);
    }

    public void setContext(Object context) {
        // set the context object here using thread local
    }

}
}

#2


3  

the "community bicycle" solution (as you call it) is actually much better than the global (or pseudo global) singleton that you are currently using. it makes the code testable and it makes it very easy to choose which classes use which context. if done well, you don't need to add the context object to every method signature. you generally ensure that all the "major" classes have a reference to the current context, and that any "minor" classes have access to the relevant "major" class. one-off methods which may need access to the context will need their method signatures updated, but most classes should have the context available through a member variable.

“社区自行车”解决方案(正如您所说)实际上比您当前使用的全局(或伪全局)单例要好得多。它使代码可测试,并且可以很容易地选择哪些类使用哪个上下文。如果做得好,则不需要将上下文对象添加到每个方法签名。您通常会确保所有“主要”类都引用当前上下文,并且任何“次要”类都可以访问相关的“主要”类。可能需要访问上下文的一次性方法需要更新其方法签名,但大多数类应该通过成员变量提供上下文。

#3


2  

As a ThreadLocal is essentially a Map keyed on your thread, couldn't you implement a Map keyed on your thread name? All you then need is an effective naming strategy that meets your requirements.

由于ThreadLocal本质上是一个键入你的线程的Map,你不能实现一个键入你的线程名称的Map吗?您需要的只是一种符合您要求的有效命名策略。

#4


1  

As a Lisper, I very much agree with your worldview and would consider it a shame if you were to revise it. :-)

作为一个Lisper,我非常同意你的世界观,如果你要修改它,会认为这是一种耻辱。 :-)

If it were me, I would simply use a ThreadGroup for each top-level process, and associate each connection with the group the caller is running in. If using in conjunction with thread pools, just ensure the pools use threads in the correct thread group (for instance, by having a pool per thread group).

如果是我,我只需为每个*进程使用ThreadGroup,并将每个连接与调用者运行的组相关联。如果与线程池一起使用,只需确保池使用正确线程组中的线程(例如,通过为每个线程组创建一个池)。

Example implementation:

public class CachedConnection {
     /* Whatever */
}

public class ProcessContext extends ThreadGroup {
    private static final Map<ProcessContext, Map<Class, Object>> contexts = new WeakHashMap<ProcessContext, Map<Class, Object>>();

    public static T getContext(Class<T> cls) {
        ProcessContext tg = currentContext();
        Map<Class, Object> ctx;
        synchronized(contexts) {
            if((ctx = contexts.get(tg)) == null)
                contexts.put(tg, ctx = new HashMap<Class, Object>());
        }
        synchronized(ctx) {
            Object cur = ctx.get(cls);
            if(cur != null)
                return(cls.cast(cur));
            T new_t;
            try {
                new_t = cls.newInstance();
            } catch(Exception e) {
                throw(new RuntimeException(e));
            }
            ctx.put(cls, new_t);
            return(new_t);
        }
    }

    public static ProcessContext currentContext() {
        ThreadGroup tg = Thread.currentThread().getThreadGroup();
        while(true) {
            if(tg instanceof ProcessContext)
                return((ProcessContext)tg);
            tg = tg.getParent();
            if(tg == null)
                throw(new IllegalStateException("Not running in a ProcessContext"));
        }
    }
}

If you then simply make sure to run all your threads in a proper ProcessContext, you can get a CachedConnection anywhere by calling ProcessContext.getContext(CachedConnection.class).

如果您只是确保在适当的ProcessContext中运行所有线程,则可以通过调用ProcessContext.getContext(CachedConnection.class)在任何地方获取CachedConnection。

Of course, as mentioned above, you would have to make sure that any other threads you may delegate work to also run in the correct ProcessContext, but I'm pretty sure that problem is inherent in your description -- you would obviously need to specify somehow which one of multiple contexts your delegation workers run in. If anything, it could be conceivable to modify ProcessContext as follows:

当然,如上所述,您必须确保您委托的任何其他线程也能在正确的ProcessContext中运行,但我很确定您的描述中存在问题 - 您显然需要指定不知怎的,你的代表团工作人员遇到了多个上下文中的哪一个。如果有的话,可以想象修改ProcessContext如下:

public class ProcessContext extends ThreadGroup {
    /* getContext() as above */

    private static final ThreadLocal<ProcessContext> tempctx = new ThreadLocal<ProcessContext>();

    public static ProcessContext currentContext() {
        if(tempctx.get() != null)
            return(tempctx.get());
        ThreadGroup tg = Thread.currentThread().getThreadGroup();
        while(true) {
            if(tg instanceof ProcessContext)
                return((ProcessContext)tg);
            tg = tg.getParent();
            if(tg == null)
                throw(new IllegalStateException("Not running in a ProcessContext"));
        }
    }

    public class RunnableInContext implements Runnable {
        private final Runnable delegate;
        public RunnableInContext(Runnable delegate) {this.delegate = delegate;}

        public void run() {
            ProcessContext old = tempctx.get();
            tempctx.set(ProcessContext.this);
            try {
                delegate.run();
            } finally {
                tempctx.set(old);
            }
        }
    }

    public static Runnable wrapInContext(Runnable delegate) {
        return(currentContext().new RunnableInContext(delegate));
    }
}

That way, you could use ProcessContext.wrapInContext() to pass a Runnable which, when run, inherits its context from where it was created.

这样,您可以使用ProcessContext.wrapInContext()来传递Runnable,该Runnable在运行时会从创建它的位置继承其上下文。

(Note that I haven't actually tried the above code, so it may well be full of typos.)

(请注意,我实际上没有尝试过上面的代码,所以它可能充满了拼写错误。)

#5


1  

I would not support your world-view and jthalborn's idea on the count that its more testable even.

我不支持你的世界观和jthalborn关于伯爵的想法,即使它更可测试。

Though paraphrasing first what I have understood from your problme statement is like this.

虽然从你的问题陈述中首先解释我所理解的是这样的。

  1. There are 3 or 4 top-level processes (and they are basically having a thread of their own). And connection object is what is diffrenet in them.
  2. 有3或4个*进程(它们基本上都有自己的线程)。连接对象就是diffrenet。

  3. You need some basic characteristic of Connection to be set up and done once.
  4. 您需要设置和完成一次Connection的一些基本特性。

  5. The child threads in no way change the Connection object passe to them from top-level threads.
  6. 子线程决不会将顶层线程中的Connection对象passe更改为它们。

Here is what I propose, you do need the one tim,e set-up of you Connection but then in each of your top-level process, you do 1) further processing of that Connection 2) keep a InheriatbleThreadLocal (and the child process of your top-level thread will have the modified connection object. 3) Pass these threasd implementing classes. MyThread1, MyThread2, MyThread3, ... MyThread4 in the Executor. (This is different from the other linked question of yours that if you need some gating, Semaphore is a better approach)

这是我的建议,你确实需要一个时间,你需要设置连接,但是在你的每个顶层进程中,你做1)进一步处理该连接2)保持一个InheriatbleThreadLocal(和子进程)您的*线程将具有已修改的连接对象.3)传递这些threasd实现类。执行者中的MyThread1,MyThread2,MyThread3,... MyThread4。 (这与你的另一个相关问题不同,如果你需要一些门控,信号量是一种更好的方法)

Why I said that its not less testable than jthalborn's view is that in that case also you anyway again needs to provide mocked Connection object. Here too. Plus conecptually passing the object and keeping the object in ThreadLocal is one and the same (InheritableThreadLocal is a map which gets passed by java inbuilt way, nothing bad here I believe).

为什么我说它不比jthalborn的观点更可测试的是,在这种情况下,你无论如何也需要提供模拟的Connection对象。这里也。另外,直接传递对象并将对象保留在ThreadLocal中是一样的(InheritableThreadLocal是一个以java内置方式传递的地图,我相信这里没什么不好的)。

EDIT: I did keep in account that its a closed system and we are not having "free" threads tempring with connection

编辑:我确实记得它是一个封闭的系统,我们没有“免费”线程临时连接