English 中文(简体)
为多处理处理处理线程
原标题:Handling threads for multiprocessing

我在.Net 4.0 C#上管理线程时遇到了一个问题,我对线程的了解不足以解决这个问题,所以我把它发布在这里,希望有人能给我一些建议。

场景如下:

我们在C#框架4.0上有一个Windows服务,它(1)通过套接字连接到服务器以获取.PCM文件,(2)然后将其转换为.WAV文件,(3)通过电子邮件-SMTP发送,最后(4)通知初始服务器它已成功发送。

安装该服务的服务器有8个处理器和8 GB或RAM。

为了允许多处理,我用4个线程构建了服务,每个线程执行我前面提到的每个任务。

在代码中,我为每个任务都有类和方法,因此我创建线程并调用方法,如下所示:

 Thread eachThread = new Thread(object.PerformTask);

在每个方法中,我都有一个While,它检查套接字的连接是否有效,并根据它们的用途继续获取数据或处理数据。

while (_socket.Connected){ 
//perform task
}

问题是,随着越来越多的服务被安装(相同的windows服务被复制并指向服务器上的两个端点,以通过套接字获取文件),CPU消耗急剧增加,每个服务都继续运行和处理文件,但有时CPU消耗太高,服务器就会崩溃。

问题是:你建议我如何处理这种情况,我的意思是,一般来说,什么是处理这种高要求处理任务的好方法,以避免服务器在CPU消耗方面崩溃?

谢谢

PS:如果有人需要更多关于这个场景的细节,请告诉我。

编辑1

随着CPU崩溃,我的意思是服务器变得太慢,我们不得不重新启动它。

编辑2

在这里,我发布了部分代码,这样你就可以了解它是如何编程的:

while(true){
            //starting the service    
            try
            {
                IPEndPoint endPoint = conn.SettingConnection();
                string id = _objProp.Parametros.IdApp;

                using (socket = conn.Connect(endPoint))
                {
                    while (!socket.Connected)
                    {
                        _log.SetLog("INFO", "Conectando socket...");
                        socket = conn.Connect(endPoint);

                        //if the connection failed, wait 5 seconds for a new try.
                        if (!socket.Connected)
                        {
                            Thread.Sleep(5000);
                        }
                    }

                    proInThread = new Thread(proIn.ThreadRun);
                    conInThread = new Thread(conIn.ThreadRun);
                    conOutThread = new Thread(conOut.ThreadRun);

                    proInThread.Start();
                    conInThread.Start();
                    conOutThread.Start();

                    proInThread.Join();
                    conInThread.Join();
                    conOutThread.Join();
                }
          }
     }

编辑3

  • 线程1

    while (_socket.Connected) { try { var conn = new AppConection(ref _objPropiedades);

                    try
                    {
                        string message = conn.ReceiveMessage(_socket);
                        lock (((ICollection)_queue).SyncRoot)
                        {
                            _queue.Enqueue(message);
                            _syncEvents.NewItemEvent.Set();
                            _syncEvents.NewResetEvent.Set();
                        }
                        lock (((ICollection)_total_rec).SyncRoot)
                        {
    
                            _total_rec.Add("1");
                        }
                    }
                    catch (SocketException ex)
                    {
                        //log exception
    
                    }
                    catch (IndexOutOfRangeException ex)
                    {
                        //log exception
                    }
                    catch (Exception ex)
                    {
                       //log exception
    
                    }
                    //message received
    
                }
                catch (Exception ex)
                {
                   //logging error
                }
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            log = null;
    
  • 螺纹2

    while (_socket.Connected) { try{ _syncEvents.NewItemEventOut.WaitOne();

                        if (_socket.Connected)
                        {
                            lock (((ICollection)_queue).SyncRoot)
                            {
    
                                total_queue = _queue.Count();
    
                            }
    
                            int i = 0;
                            while (i < total_queue)
                            {
                                //EMail Emails;
                                string mail = "";
                                lock (((ICollection)_queue).SyncRoot)
                                {
    
                                    mail = _queue.Dequeue();
    
                                    i = i + 1;
                                }
                                try
                                {
                                    conn.SendMessage(_socket, mail);
                                    _syncEvents.NewResetEvent.Set();
                                }
                                catch (SocketException ex)
                                {
                                    //log exception
                                }
                            }
                        }
                        else
                        {
                            //log exception
    
                            _syncEvents.NewAbortEvent.Set();
                            Thread.CurrentThread.Abort();
                        }
                    }
                    catch (InvalidOperationException e)
                    {
                        //log exception
                    }
                    catch (Exception e)
                    {
                        //log exception
                    }
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            conn = null;
            log = null;
    
  • 螺纹3

    while (_socket.Connected) {

                    int total_queue = 0;
                    try
                {
                    _syncEvents.NewItemEvent.WaitOne();
                    lock (((ICollection) _queue).SyncRoot)
                    {
                        total_queue = _queue.Count();
                    }
                    int i = 0;
                    while (i < total_queue)
                    {
                        if (mgthreads.GetThreatdAct() <
    

    mgthreads.GetMaxThread()) { string message = ""; lock (((ICollection) _queue).SyncRoot) {

                                message = _queue.Dequeue();
                                i = i + 1;
    
                            }
                            count++;
                            lock (((ICollection) _queueO).SyncRoot)
                            {
                                app.SetParameters(_socket, _id,
    

    message, _queueO, _syncEvents, _total_Env, _total_err); }

                            Thread producerThread = new
    

    Thread(app.ThreadJob) { Name = "ProducerThread_" + DateTime.Now.ToString("ddMMyyyyhhmmss"), Priority = ThreadPriority.AboveNormal }; producerThread.Start();

                            producerThread.Join();
    
                            mgthreads.IncThreatdAct(producerThread);
                        }
                        mgthreads.DecThreatdAct();
                    }
                    mgthreads.DecThreatdAct();
                }
                catch (InvalidOperationException e)
                {
    
                }
                catch (Exception e)
                {
    
                }
                Thread.Sleep(500);
            }
    
            //release ANY instance that could be using memory
            _socket.Dispose();
            app = null;
            log = null;
            mgthreads = null;
    
  • 螺纹4

    MessageVO mesVo = fac.ParseMessageXml(_message);

最佳回答

我会降低线程优先级,让所有线程都通过一个信号量,该信号量将并发限制为Environment.ProcessorCount。这不是一个完美的解决方案,但在这种情况下听起来已经足够了,而且很容易修复。

编辑:考虑一下,您必须将10个服务合并到一个进程中,否则您将无法集中控制正在运行的线程。如果你有10个独立的过程,它们就无法协调。

问题回答

通常不会因为cpu使用率高而崩溃。当任何线程都在等待远程事件发生时(例如,远程服务器响应请求),该线程不使用cpu资源。但是,当它实际在做一些事情时,它会相应地使用cpu。在您提到的任务中,没有固有的高cpu使用率(因为将PCM文件保存为WAV不需要复杂的算法),因此高cpu使用量似乎是编程错误的迹象。





相关问题
Silverlight, Updating the UI during processing

I have a simple silverlight multifile upload application, and i want to provide the user with some feedback, right now its only in a test phase and i dont have the webservice. Somehow i cant get the ...

Is reading from an XmlDocument object thread safe?

I was wondering if i could safely read from an XmlDocument object using SelectNodes() and SelectSingleNode() from multiple threads with no problems. MSDN says that they are not guaranteed to be ...

Terminating a thread gracefully not using TerminateThread()

My application creates a thread and that runs in the background all the time. I can only terminate the thread manually, not from within the thread callback function. At the moment I am using ...

热门标签