在__main__中调用While循环函数?

时间:2021-09-11 20:43:06

Here I have a program which polls the queue for an event, if found it executes an order to a REST API. In addition if an event is found, it prints the current price that I need to use as my stopLoss. This code runs exactly as I would like it to, however, the moment I try and call the function rates() inside the __main__ the program just stops running.

在这里,我有一个程序可以轮询队列中的事件,如果发现它执行了一个REST API的命令。此外,如果找到一个事件,它会打印我需要用作止损的当前价格。这段代码完全按照我的意愿运行,但是,当我尝试调用__main__中的函数rates()时,程序就会停止运行。

Remove the reference stopLoss = rates() and the program runs great just without a stopLoss, but I need the rate -.001 as my stopLoss.

删除引用stopLoss = rates()并且程序在没有stopLoss的情况下运行良好,但我需要将-.001作为我的stopLoss。

Code as follows:

代码如下:

import Queue
import threading
import time
import json
import oandapy

from execution import Execution
from settings import STREAM_DOMAIN, API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID
from strategy import TestRandomStrategy
from streaming import StreamingForexPrices

#polls API for Current Price
def stop():
        while True:
            oanda = oandapy.API(environment="practice", access_token="xxxxxx")
            response = oanda.get_prices(instruments="EUR_USD")
            prices = response.get("prices")
            asking_price = prices[0].get("ask")
            s = asking_price - .001
            return s

#Checks for events and executes order                
def trade(events, strategy, execution):
    while True:
        try:
            event = events.get(False)
        except Queue.Empty:
            pass
        else:
            if event is not None:
                if event.type == 'TICK':
                    strategy.calculate_signals(event)
                elif event.type == 'ORDER':
                    print 
                    execution.execute_order(event)

def rates(events):
            while True:
                try:
                    event = events.get(False)
                except Queue.Empty:
                    pass
                else:
                        if event.type == 'TICK':
                                r = stop()
                                print r

if __name__ == "__main__":
    heartbeat = 0  # Half a second between polling
    events = Queue.Queue()

# Trade 1 unit of EUR/USD
    instrument = "EUR_USD"
    units = 1
    stopLoss = rates()  #Problem area!!!!!!>>>>>>>>>>>>>>//////////////////////////

    prices = StreamingForexPrices(
        STREAM_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID,
        instrument, events
    )
    execution = Execution(API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID)

    strategy = TestRandomStrategy(instrument, units, events, stopLoss)


#Threads
    trade_thread = threading.Thread(target=trade, args=(events, strategy, execution))
    price_thread = threading.Thread(target=prices.stream_to_queue, args=[])
    stop_thread = threading.Thread(target=rates, args=(events,))      

# Start both threads
    trade_thread.start()
    price_thread.start()
    stop_thread.start()

1 个解决方案

#1


Okay no answers so far, so I'll try.
Your main problem seems to be, that you don't know how to interchange data between threads.
First the problem with the price.

好吧到目前为止还没有答案,所以我会试试。您的主要问题似乎是,您不知道如何在线程之间交换数据。首先是价格问题。

The loop here:

循环在这里:

while True:
    oanda = oandapy.API(environment="practice", access_token="xxxxxx")
    response = oanda.get_prices(instruments="EUR_USD")
    prices = response.get("prices")
    asking_price = prices[0].get("ask")
    s = asking_price - .001
    return s

Has no effect, because return s will automatically break out of it. So what you need is a shared variable where you store s. You can protect the access to it by using threading.Lock. The easiest way would be to subclass Thread and make s an instance attribute like this (I named it price):

没有效果,因为返回s会自动突破它。所以你需要的是一个存储s的共享变量。您可以使用threading.Lock保护对它的访问。最简单的方法是将Thread子类化并使其成为像这样的实例属性(我将其命名为price):

class PricePoller(threading.Thread):

    def __init__(self, interval):
        super(PricePoller, self).__init__()
        # private attribute, will be accessed as property via
        # threadsafe getter and setter
        self._price = None
        # lock guarding access to _price
        self._dataLock = threading.Lock()
        # polling interval
        self.interval = interval
        # set this thread as deamon, so it will be killed when
        # the main thread dies
        self.deamon = True
        # create an event that allows us to exit the mainloop
        # and terminate the thread safely
        self._stopEvent = threading.Event()

    def getPrice(self):
        # use _dataLock to get threadsafe access to self._price
        with self._dataLock:
            return self._price

    def setPrice(self, price)
        # use _dataLock to get threadsafe access to self._price
        with self._dataLock:
            self._price = price

    price = property(getPrice, setPrice, None)

    def run(self):
        while not self.stopEvent.isSet():
            oanda = oandapy.API(environment="practice", access_token="xxxxxx")
            response = oanda.get_prices(instruments="EUR_USD")
            prices = response.get("prices")
            asking_price = prices[0].get("ask")
            self.price = asking_price - .001
            time.sleep(self.interval) # don't spam the server

    def stop(self):
        self._stopEvent.set()

It can then be started with:

然后可以开始:

poller = PricePoller(heartbeat)
poller.start()

And you can get the price with poller.price wherever you want! You can even pass the poller on to other threads if you like. BUT! If you try to get the price immediately after poller.start() you will certainly get a None. Why this? poller.start() does not block, therefore while your main thread is going on and tries to get the first price, your poller has not even finished starting!
How to solve this? Introduce another threading.Event and use its function wait to let the main thread wait until the poller thread has set it. I leave the implementation up to you.

你可以随时随地使用poller.price获得价格!如果愿意,您甚至可以将轮询器传递给其他线程。但!如果你试图在poller.start()之后立即获得价格,你肯定会得到一个无。为什么这个? poller.start()没有阻塞,因此当你的主线程正在进行并试图获得第一个价格时,你的轮询器甚至没有完成启动!怎么解决这个?引入另一个threading.Event并使用它的函数wait让主线程等到poller线程设置它为止。我将实施留给您。

I'm just guessing that this is what you want... looking only at your code you don't have to put the stop function in a thread at all and you can just replace stopLess = rates() with stopLess = stop(), because you're not updating the results from the price polling anywhere! But I think you want to do that at some point, otherwise it wouldn't make sense to put it into a thread.

我只是猜测这就是你想要的......只看你的代码,你根本不需要将stop函数放在一个线程中,你可以用stopLess = stop()替换stopLess = rates() ,因为你没有在任何地方更新价格轮询的结果!但我认为你想在某些时候这样做,否则将它放入一个线程是没有意义的。

Now to the queue and your 'event stream'.
This snippet:

现在到队列和你的'事件流'。这个片段:

try:
    event = events.get(False)
except Queue.Empty:
    pass

Can just as well be:

也可以是:

event = events.get()

You're doing nothing in the meantime anyway and it is better to let Queue deal with waiting for an event.
Then, as far as I can see, you have two threads calling Queue.get, but this function will delete the element from the queue after retrieving it! This means whoever obtains the event first, consumes it and the other thread will never see it. But with the above solution for the poller, I think you can get rid of the stop_thread, which also solves that problem.

无论如何,你在此期间什么也不做,最好让Queue处理等待事件。然后,据我所知,你有两个线程调用Queue.get,但是这个函数会在检索后删除队列中的元素!这意味着无论谁先获得事件,消耗它,而另一个线程永远不会看到它。但是对于poller的上述解决方案,我认为你可以摆脱stop_thread,这也解决了这个问题。

Now a note on Threads in general.
A thread has its own 'chain' of calls that starts within its run method (or the method which you supply as target if you don't subclass). That means whatever function is called by run is executed by this thread, and also all functions that are called in turn by this one (and so on). HOWEVER, it is perfectly possible that two threads execute the same function, at the same time! And there is no way to know which thread executes which part of the code at a certain time, if you do not use means of synchronisation (e.g. Events, Locks or Barriers). This is no problem if all variables used in a called function are local ore were local in the calling function:

现在一般关于线程的注释。一个线程有自己的“链”调用,它们在其run方法中启动(或者如果你不是子类,则作为目标提供的方法)。这意味着由此线程执行run所调用的任何函数,以及由此线程依次调用的所有函数(依此类推)。但是,两个线程完全可能同时执行相同的功能!如果你不使用同步手段(例如事件,锁或障碍),则无法知道哪个线程在某个时间执行代码的哪个部分。如果调用函数中使用的所有变量都是本地矿石在调用函数中是本地的,那么这不是问题:

def onlyLocal(x, n):
    if n == 0:
        return x
    return onlyLocal(x*2, n-1)

or are exclusively read:

或者专门阅读:

def onlyRead(myarray):
    t = time.time()
    return t - sum(myarray)

But as soon as you do both read from and write to a variable from multiple threads, you need to secure access to those because otherwise if you pass objects which are known by more than one thread (for example self):

但是,只要您同时从多个线程读取和写入变量,就需要保护对这些变量的访问权限,否则如果您传递多个线程已知的对象(例如self):

def setPrice(self, price):
    self._price = price

or if your function uses variables from an outer scope which is acessed by multiple threads:

或者如果您的函数使用来自多个线程所占据的外部作用域的变量:

def variableFromOutside(y):
    global x
    x += y
    return y

You can never be sure that there isn't a thread(2) changing a variable which you(1) have just read, while you are processing it and before you update it with a then invalid value.

你可以永远不会确定没有一个线程(2)更改你刚刚读过的变量,当你正在处理它时,在用一个无效的值更新它之前。

global x ; Thread1 ;  Thread2 ;
    2    ;  y = x  ;   z = x  ;
    2    ; y **= 3 ;  x = z+1 ;
    3    ; x = y-4 ;   return ;
    4    ;  return ;   ...    ;

This is why you have to secure the access to those variables with locks. With a Lock (l):

这就是您必须使用锁保护对这些变量的访问的原因。带锁(l):

global x ;    Thread1   ;   Thread2   ;
    2    ; l.acqcuire() ; l.acquire() ;
    2    ; y = x        ;      |      ;
    2    ; y **= 3      ;      |      ;
    2    ; x = y-4      ;      |      ;
    4    ; l.release()  ;      v      ;
    4    ; return       ; z = x       ;
    4    ; ...          ; x = z+1     ;
    5    ; ...          ; l.release() ;

Here Thread1 acquires the lock before Thread2. Thread2 therefore has to wait until Thread1 releases the lock again before its call to acquire returns.
acquire and release are automatically called when you use with lock:. Note also that in this toy example it could have been the case that Thread2 aqcuires the lock before Thread1, but at least they would still not interfere with each other.

这里Thread1在Thread2之前获取锁。因此,Thread2必须等到Thread1再次释放锁定,然后再调用获取返回。使用lock时自动调用获取和释放:还要注意,在这个玩具示例中,可能是Thread2在Thread1之前获得锁定的情况,但至少它们仍然不会相互干扰。

This was a brief introduction on a large topic, read a bit about thread parallelisation and play around with it. There is no better means for learning than practice.
I've written this code here in the browser and therefore it is not tested! If someone finds issues, please tell me so in the comments or feel free to change it directly.

这是一个关于大型主题的简要介绍,阅读一些关于线程并行化的内容并使用它。没有比练习更好的学习方法。我在浏览器中编写了这段代码,因此未经过测试!如果有人发现问题,请在评论中告诉我,或者随时直接更改。

#1


Okay no answers so far, so I'll try.
Your main problem seems to be, that you don't know how to interchange data between threads.
First the problem with the price.

好吧到目前为止还没有答案,所以我会试试。您的主要问题似乎是,您不知道如何在线程之间交换数据。首先是价格问题。

The loop here:

循环在这里:

while True:
    oanda = oandapy.API(environment="practice", access_token="xxxxxx")
    response = oanda.get_prices(instruments="EUR_USD")
    prices = response.get("prices")
    asking_price = prices[0].get("ask")
    s = asking_price - .001
    return s

Has no effect, because return s will automatically break out of it. So what you need is a shared variable where you store s. You can protect the access to it by using threading.Lock. The easiest way would be to subclass Thread and make s an instance attribute like this (I named it price):

没有效果,因为返回s会自动突破它。所以你需要的是一个存储s的共享变量。您可以使用threading.Lock保护对它的访问。最简单的方法是将Thread子类化并使其成为像这样的实例属性(我将其命名为price):

class PricePoller(threading.Thread):

    def __init__(self, interval):
        super(PricePoller, self).__init__()
        # private attribute, will be accessed as property via
        # threadsafe getter and setter
        self._price = None
        # lock guarding access to _price
        self._dataLock = threading.Lock()
        # polling interval
        self.interval = interval
        # set this thread as deamon, so it will be killed when
        # the main thread dies
        self.deamon = True
        # create an event that allows us to exit the mainloop
        # and terminate the thread safely
        self._stopEvent = threading.Event()

    def getPrice(self):
        # use _dataLock to get threadsafe access to self._price
        with self._dataLock:
            return self._price

    def setPrice(self, price)
        # use _dataLock to get threadsafe access to self._price
        with self._dataLock:
            self._price = price

    price = property(getPrice, setPrice, None)

    def run(self):
        while not self.stopEvent.isSet():
            oanda = oandapy.API(environment="practice", access_token="xxxxxx")
            response = oanda.get_prices(instruments="EUR_USD")
            prices = response.get("prices")
            asking_price = prices[0].get("ask")
            self.price = asking_price - .001
            time.sleep(self.interval) # don't spam the server

    def stop(self):
        self._stopEvent.set()

It can then be started with:

然后可以开始:

poller = PricePoller(heartbeat)
poller.start()

And you can get the price with poller.price wherever you want! You can even pass the poller on to other threads if you like. BUT! If you try to get the price immediately after poller.start() you will certainly get a None. Why this? poller.start() does not block, therefore while your main thread is going on and tries to get the first price, your poller has not even finished starting!
How to solve this? Introduce another threading.Event and use its function wait to let the main thread wait until the poller thread has set it. I leave the implementation up to you.

你可以随时随地使用poller.price获得价格!如果愿意,您甚至可以将轮询器传递给其他线程。但!如果你试图在poller.start()之后立即获得价格,你肯定会得到一个无。为什么这个? poller.start()没有阻塞,因此当你的主线程正在进行并试图获得第一个价格时,你的轮询器甚至没有完成启动!怎么解决这个?引入另一个threading.Event并使用它的函数wait让主线程等到poller线程设置它为止。我将实施留给您。

I'm just guessing that this is what you want... looking only at your code you don't have to put the stop function in a thread at all and you can just replace stopLess = rates() with stopLess = stop(), because you're not updating the results from the price polling anywhere! But I think you want to do that at some point, otherwise it wouldn't make sense to put it into a thread.

我只是猜测这就是你想要的......只看你的代码,你根本不需要将stop函数放在一个线程中,你可以用stopLess = stop()替换stopLess = rates() ,因为你没有在任何地方更新价格轮询的结果!但我认为你想在某些时候这样做,否则将它放入一个线程是没有意义的。

Now to the queue and your 'event stream'.
This snippet:

现在到队列和你的'事件流'。这个片段:

try:
    event = events.get(False)
except Queue.Empty:
    pass

Can just as well be:

也可以是:

event = events.get()

You're doing nothing in the meantime anyway and it is better to let Queue deal with waiting for an event.
Then, as far as I can see, you have two threads calling Queue.get, but this function will delete the element from the queue after retrieving it! This means whoever obtains the event first, consumes it and the other thread will never see it. But with the above solution for the poller, I think you can get rid of the stop_thread, which also solves that problem.

无论如何,你在此期间什么也不做,最好让Queue处理等待事件。然后,据我所知,你有两个线程调用Queue.get,但是这个函数会在检索后删除队列中的元素!这意味着无论谁先获得事件,消耗它,而另一个线程永远不会看到它。但是对于poller的上述解决方案,我认为你可以摆脱stop_thread,这也解决了这个问题。

Now a note on Threads in general.
A thread has its own 'chain' of calls that starts within its run method (or the method which you supply as target if you don't subclass). That means whatever function is called by run is executed by this thread, and also all functions that are called in turn by this one (and so on). HOWEVER, it is perfectly possible that two threads execute the same function, at the same time! And there is no way to know which thread executes which part of the code at a certain time, if you do not use means of synchronisation (e.g. Events, Locks or Barriers). This is no problem if all variables used in a called function are local ore were local in the calling function:

现在一般关于线程的注释。一个线程有自己的“链”调用,它们在其run方法中启动(或者如果你不是子类,则作为目标提供的方法)。这意味着由此线程执行run所调用的任何函数,以及由此线程依次调用的所有函数(依此类推)。但是,两个线程完全可能同时执行相同的功能!如果你不使用同步手段(例如事件,锁或障碍),则无法知道哪个线程在某个时间执行代码的哪个部分。如果调用函数中使用的所有变量都是本地矿石在调用函数中是本地的,那么这不是问题:

def onlyLocal(x, n):
    if n == 0:
        return x
    return onlyLocal(x*2, n-1)

or are exclusively read:

或者专门阅读:

def onlyRead(myarray):
    t = time.time()
    return t - sum(myarray)

But as soon as you do both read from and write to a variable from multiple threads, you need to secure access to those because otherwise if you pass objects which are known by more than one thread (for example self):

但是,只要您同时从多个线程读取和写入变量,就需要保护对这些变量的访问权限,否则如果您传递多个线程已知的对象(例如self):

def setPrice(self, price):
    self._price = price

or if your function uses variables from an outer scope which is acessed by multiple threads:

或者如果您的函数使用来自多个线程所占据的外部作用域的变量:

def variableFromOutside(y):
    global x
    x += y
    return y

You can never be sure that there isn't a thread(2) changing a variable which you(1) have just read, while you are processing it and before you update it with a then invalid value.

你可以永远不会确定没有一个线程(2)更改你刚刚读过的变量,当你正在处理它时,在用一个无效的值更新它之前。

global x ; Thread1 ;  Thread2 ;
    2    ;  y = x  ;   z = x  ;
    2    ; y **= 3 ;  x = z+1 ;
    3    ; x = y-4 ;   return ;
    4    ;  return ;   ...    ;

This is why you have to secure the access to those variables with locks. With a Lock (l):

这就是您必须使用锁保护对这些变量的访问的原因。带锁(l):

global x ;    Thread1   ;   Thread2   ;
    2    ; l.acqcuire() ; l.acquire() ;
    2    ; y = x        ;      |      ;
    2    ; y **= 3      ;      |      ;
    2    ; x = y-4      ;      |      ;
    4    ; l.release()  ;      v      ;
    4    ; return       ; z = x       ;
    4    ; ...          ; x = z+1     ;
    5    ; ...          ; l.release() ;

Here Thread1 acquires the lock before Thread2. Thread2 therefore has to wait until Thread1 releases the lock again before its call to acquire returns.
acquire and release are automatically called when you use with lock:. Note also that in this toy example it could have been the case that Thread2 aqcuires the lock before Thread1, but at least they would still not interfere with each other.

这里Thread1在Thread2之前获取锁。因此,Thread2必须等到Thread1再次释放锁定,然后再调用获取返回。使用lock时自动调用获取和释放:还要注意,在这个玩具示例中,可能是Thread2在Thread1之前获得锁定的情况,但至少它们仍然不会相互干扰。

This was a brief introduction on a large topic, read a bit about thread parallelisation and play around with it. There is no better means for learning than practice.
I've written this code here in the browser and therefore it is not tested! If someone finds issues, please tell me so in the comments or feel free to change it directly.

这是一个关于大型主题的简要介绍,阅读一些关于线程并行化的内容并使用它。没有比练习更好的学习方法。我在浏览器中编写了这段代码,因此未经过测试!如果有人发现问题,请在评论中告诉我,或者随时直接更改。